【设计总结】灵活拼装随时结束、暂停、重启的 Pipeline

需求部分

场景:

我们的飞机在拍摄完一张照片并生成后,APP 需要实现这么个流程:

  1. 飞机通过数传,回调给 APP 一个文件名
  2. APP 拿到文件名后, 要从 MediaServer 获取文件名对应的数据
  3. 将文件数据插入数据库
  4. 执行缩略图和源图的下载
  5. 对下载后的源图做美化
  6. 对下载后的源图加水印
  7. 存到相册



飞机录视频时(比如 4K),APP 端会同步将飞机发送预览 rtp 解码(720P)后也写入一个文件。
因为源视频太大,需要用户主动触发时才下载,需求上需要录一个视频作为给用户预览用
流程是飞机在录制完一段视频后:

  1. 飞机通过数传,回调给 APP 一个文件名
  2. APP 拿到文件名后, 要从 MediaServer 获取文件名对应的数据
  3. 将文件数据插入数据库
  4. APP 将自己录制的视频找合适的地方存放,并在数据库中建立关联
  5. 执行缩略图下载



在 APP 中,包括实时图传,相册等场景。 有很多很多种与上述类似,但不完全相同的流程。 并且其中部分步骤是一样的。
下载部分,因为涉及到两种通信(wifi,usb),导致具体下载实现上也会有不同

条件

基于上面的场景, 由于飞机的限制,对代码有这么些要求:

  • 飞机与 APP 随时会断开,不能因为任何异常导致用户看不到自己拍的内容
    (要求恢复连接/重开 APP 后。所有资源仍然走完完整的下载流程。)
  • 用户有开关,可以决定下载后的内容要不要美化,要不要加水印
    (某个步骤执行与否是动态决定的)
  • 飞机带宽有限,在需要的时候要保证图传优先。会在相册暂停,删除下载中资源
    (执行中的流程需要被管理)
  • 用户会短时间内拍摄多张照片,会在相册同时下载多个资源
    (要求同类流程能同时进行多个)



现在, 让我们忽略每个步骤具体的细节, 考虑这一套业务怎么能较好的实现?

  • 每一步的操作,执行过程中能被停下
  • 某一步操作,可能出现在多个流程中
  • 这些操作步骤,可能是同步,可能是异步的
  • 如果流程中某一步出现异常,流程视情况重试或直接退出整个流程
  • 流程中步骤会根据情况增删或跳过
  • 流程在断了、杀了后还重新恢复到对应步骤
  • 流程被管理
  • 很容易就写出一个新的流程

分析部分

先看里面涉及到的元素, 考虑怎么抽象
首先最直观的两个:

  • 有 ”执行步骤“ 概念:需求里执行的事情,都可以被归为”步骤“
    • ”步骤“ 要能被中断
    • 不同流程可能有相同的步骤, 要可复用
  • 有 ”流程“ 概念: 特定的一些步骤拼装在一块就是流程, 可以理解为是步骤容器
    • ”流程“ 有几种不同的种类
    • 同一种 ”流程“, 同一时间可能存在多个
  • 流程中的步骤,是按顺序执行的异步函数

因为要能对进行中的任务进行管理,(全部暂停,全部开始,任务恢复等), 所以在流程上面还有个 ”管理器“ 概念
而从需求描述可知,里面的步骤大多都是异步行为,所以执行流程时,需要有一套实现能在一个异步结束后,调用下一个异步操作。


我的实现

早期实现用的函数拼装,后面由同时改由改用 OC 的 RAC, 转到 RN 后我在 TypeScript 环境用的 rxjs 实现
大体思路差不多, 这里用我实现的方式做例子

回顾时发现, DownloadManager 和 PipeLineTask ,PipeLineStep 这一套还挺像 NSOperation 和 NSOperationQueue 的

MediaPipeline 示范代码

步骤: MediaPipelineStep

以执行步骤本身的角度看,其可以理解为是执行单一行为函数。 与业务和流程都无关
而我们在 外部,对执行步骤提出了两个需求: 可组装可中断

如何组装步骤

组装方面好处理, 很自然的联想到, 用一个数组,把所有的步骤都装进去。 然后从头往后执行,就是一个组装好的流程了。

//举个栗子  
const pipeline.steps = [Step1,Step2,Step3,Step4],   
pipeline.run() // 逐个执行步骤  

实现方式有这几种:

  1. 定义一个类BaseStep, 类里面包含通用函数 run, 其他字段描述函数需要的参数
    所有不同的执行步骤,是这个类不同的子类(DownloadStepDBInsertStep)。
    最终在执行时,把所有的子类对象,按需要的顺序放进数组,然后调用外界调用 BaseStep 的接口即可
  2. (我采用的方式)一个执行步骤是一个函数,所有函数用统一的入参task(是个描述整个流程的对象,后面再谈)
    然后在数组中,按需要的顺序,存放这些函数。 外部再加一个调用器(就是入参task),由调用器决定怎么执行

当然实现上有不少优化空间, 比如我实际数组中存的是函数对应的 enum 而不是函数本身,实现时使用了 Observer 的概念(rxjs)。

具体实现

// 定义 enum 用来对应函数步骤  
export enum MediaPipelineStepType {  
  Init = "Init",  
  CheckInProgressDuplication = "checkInProgressDuplication",  
	FetchMediaInfo = "fetchMediaInfo",  
  DownloadMediaThumbnailByHttp = "downloadMediaThumbnailByHttp",  
}  
// MediaPipelineStepType 与函数对应  
const StepProcesses: { [key: string]: (task: MediaPipelineTask) => Observable<MediaPipelineTask> } = {  
	[MediaPipelineStepType.FetchMediaInfo]: fetchMediaInfo,  
	[MediaPipelineStepType.DownloadMediaThumbnailByHttp]: downloadMediaThumbnailByHttp,  
}  
// 描述一个流程就是在定义一个数组  
const WiFiDownloadAndWritePhotoSteps = [  
  MediaPipelineStepType.FetchMediaInfo,  
  MediaPipelineStepType.DownloadMediaThumbnailByHttp,  
  MediaPipelineStepType.SaveMediaThumbnail,  
  MediaPipelineStepType.InsertMediaToDataBase,  
	...  
]  

如何管理步骤

中断方面,若希望”执行步骤“内部能不依赖外界
比如可以在入参中提供一个外部会调用的函数指针,由内部做函数实现,外部根据需要调用函数

我这里的实现方式是,在外部,有个管理流程 Class MediaPipelineTask. 当一个流程被创建时,其实就是在创建一个 task
task 内部有两个函数字段, didPausedisCancel
执行时,task 会被传入执行步骤里作为参数。 然后执行步骤内部,只要实现了 didPause,在 task 触发 didPause 时,便会执行当前步骤需要的行为

function downloadMediaOriginByHttp(task: MediaPipelineTask) {  
 task.didPauseTask = () => { download.pause()}  
	task.didCancelTask = () => {download.cancel()}  
}  

内部使用了 rxjs 的 Observer, 目的是将一步执行的函数归纳为流的形式,利用起提供的 API 方便的管理流程
(其实用 Promise 组装或者自己管理异步也可以实现,我在另一个类似的叫 MediaEditPipeline的业务中使用 Promise 也能顺利完成。思路是一样的,只是实现上的不同)

具体实现

  • 每个函数各自定义好自己的逻辑,然后内部就是个带 Observerable 的执行函数
    (传入的 Task 其实是外层 pipeline 的管理器,里面带了上下文:context,管理事件didPausedidCancel后面再聊)
    function downloadMediaOriginByHttp(task: MediaPipelineTask) {  
    let {context} = task  
    return Observable.create((observer: Observer<MediaPipelineTask>) => {  
      if (!context.mediaInfo) {/*...*/ return}  
      let downloadTask = new HTTPMediaDownloadTask(task.mediaName, downloadType)  
      FileManager.isEnoughStorageForMedia(context.mediaInfo.size)  
        .then(isEnough => {  
          if (!isEnough) return Promise.reject(MediaPipelineErrorCode.shortOfStorageError)  
          return downloadTask.start()  
        }).then(filePath => {  
        taskLog(task, `downloadMediaOriginByHttp succeed path = ${filePath}`)  
        context.originDownloadedPath = filePath  
        task.currentStage.updateProgress(1)  
        observer.next(task)  
        observer.complete()  
      }).catch((error) => {  
        taskErrorLog(task, 'downloadMediaOriginByHttp error:', error)  
           observer.error(MediaPipelineErrorCode.downloadMediaOriginError)  
      })  
      
      // 更新 task 的进度  
      downloadTask.progress = (received: number, total: number) => {  
        task.currentStage.updateProgress(received / total)  
      }  
      task.didPauseTask = () => {  
        downloadTask.pause()  
        observer.error(MediaPipelineErrorCode.pause)  
      }  
      task.didCancelTask = () => {  
        downloadTask.cancel()  
        observer.error(MediaPipelineErrorCode.interrupt)  
      }  
    })  
    }  
    

流程: MediaPipeline

基于上面步骤的实现方式,流程就比较简单了

  • 流程有多个, 所以用不同的 enum 表示
export enum MediaPipelineType {  
  /** WiFi: 下载缩率图,源图,处理源图及写入数据库 */  
  WiFiDownloadAndWritePhoto = 'WiFiDownloadAndWritePhoto',  
  /** WiFi: 下载缩率图,源视频及写入数据库 */  
  WiFiDownloadAndWriteVideo = 'WiFiDownloadAndWriteVideo',  
  /** WiFi: 下载缩略图,保存预览流及写入数据库*/  
  WiFiWriteVideoWithPreview = 'WiFiWriteVideoWithPreview',  
}  
  • 步骤是 enum MediaPipelineStepType,流程是包含 MediaPipelineStepType 的数组, 对应不同的流程,是不同的数组
/**  
 * 下载图片缩略图,写入图片数据,下载源图,处理源图,更新数据库的流程  
 */  
const WiFiDownloadAndWritePhotoSteps = [  
  MediaPipelineStepType.FetchMediaInfo,  
  MediaPipelineStepType.DownloadMediaThumbnailByHttp,  
  MediaPipelineStepType.SaveMediaThumbnail,  
  MediaPipelineStepType.InsertMediaToDataBase,  
  MediaPipelineStepType.DownloadMediaOriginByHttp,  
  MediaPipelineStepType.AddWaterMark,  
  MediaPipelineStepType.SaveMediaToAlbum,  
  MediaPipelineStepType.UpdateDataBaseMedia,  
]  
const WiFiDownloadAndWriteVideoSteps = [  
  MediaPipelineStepType.FetchMediaInfo,  
  MediaPipelineStepType.DownloadMediaThumbnailByHttp,  
  MediaPipelineStepType.SaveMediaThumbnail,  
  MediaPipelineStepType.InsertMediaToDataBase,  
  MediaPipelineStepType.DownloadMediaOriginByHttp,  
  MediaPipelineStepType.SaveMediaToAlbum,  
  MediaPipelineStepType.UpdateDataBaseMedia,  
]  
  • 再让 MediaPipelineType 和 数组 对应, 然后在使用时将 MediaPipelineStepType 转为对应函数即可
export const MediaPipelineTypeSteps: ZMap<MediaPipelineType, MediaPipelineStepType[]> = {  
  [MediaPipelineType.WiFiDownloadAndWritePhoto]: WiFiDownloadAndWritePhotoSteps,  
  [MediaPipelineType.WiFiWriteVideoWithPreview]: WiFiWriteVideoWithPreviewSteps,  
	...  
}  
// 使用前,将 enum 转为   
public static pipeLine(type: MediaPipelineType) {  
	let steps: MediaPipelineStep[] = MediaPipelineTypeSteps[type].map(type => new MediaPipelineStep(type))  
	return new MediaPipeline(steps)  
}  

执行部分: MediaPipelineTask

上面提到, 流程本身其实只是个装了很多函数的数组。
现在来看怎么让数组里的函数跑起来,(这个实现是基于 rxjs,这是个非常好用的工具,这里不展开 Observerable 的概念)

类定义:

taskID: 用于外界管理的标识符
pipeLineType: 当前执行的流程类型
_state: BehaviorSubjec: 用于对外广播当前的执行状态, 便于外界管理

export default class MediaPipelineTask {  
	/** 任务 ID */  
  readonly taskID: number  
  readonly mediaName: string  
	/** 执行的流程类型 */  
  readonly pipeLineType: MediaPipelineType  
  readonly date: Date  
  /** 是否是恢复执行的 task */  
  private _isResume: boolean = false  
  /** 是否正在执行中 */  
  private _isWorking: boolean = false  
	/** 用于对外广播的当前状态 */  
  private _state: BehaviorSubject<MediaPipelineTaskState>  
  /** 当前执行的步骤名称 */  
  currentStage: MediaPipelineTaskStage  
  /** 当前 MediaPipeline 临时保存的上下文 */  
  context: MediaPipelineTaskContext  
}  

函数部分:

构造函数:记得前面提到过,即使在 APP 被杀掉时也要能恢复吗。 其实现就是 Task 从数据库把自己读出来,然后步骤,上下文就有了

 static taskFromRecord(record: MediaDownloadRecord): MediaPipelineTask {}  

控制函数:

get isResume(): boolean {}  
get isWorking(): boolean {}  
readonly pause = () => { }  
readonly cancel = () => {}  
private getTaskState = (): MediaPipelineTaskState => {}  

执行函数:使用是调用 rxjs,这是核心部分
stepsForTask:之前 pipeline 中有提供把 step enum 转为函数的部分,而后又提供了从 task 读 MediaPipelineType 取得数组的部分。然后记得吗,函数内部是在返回 Observable, 这里就是把 Observable 拿出来。放到 pipe 中。
然后就能用流的方式,处理异步操作
ps.rxjs 这个工具真的非常便捷,但也要意识到,本质是在做异步任务处理,有很多种别的方式能实现

  readonly runInPipeline = (pipeLine: MediaPipeline): BehaviorSubject<MediaPipelineTaskState> => {  
    if (this.isWorking) {...}  
    this._isWorking = true  
    this._state = new BehaviorSubject<MediaPipelineTaskState>(this.getTaskState())  
    Rx.of(this).pipe(  
      ...pipeLine.stepsForTask(this)  
    ).subscribe({  
      next: (task: MediaPipelineTask) => {},  
      error: (error:any) => {  
        this._isWorking = false  
        this.state.error(error)  
      },  
      complete: () => {  
        this._isWorking = false  
        this.state.complete()  
      },  
    })   
    return this.state  
  }  

单个流程的执行部分,到这里基本就结束了。
Task 会在执行过程中被创建很多很多个, 每个 Task 有自己的 MediaPipelineType, 然后会用这个 Type 找到需要执行的MediaPipeline,再通过流程中描述的函数MediaPipelineStep执行具体的操作。
为了能被外界管理,对外暴露了管理接口 pausecancel
为了让外界知道自己的状态, 用BehaviorSubject对外做了广播

然后到此为止, 这套东西实现了:

  • 步骤可以复用
  • 可以灵活拼装(可动态生成, 可随时写出一个新的流程)
  • 外部可在任意步骤执行中断
  • 步骤可动态的决定执行与否

还差这些没实现:

  • 外界能统一管理所有执行中的任务
  • 某个任务被暂停后,即使是杀 APP,也要能恢复

这一部分就只能依托于管理类

全局管理器:MediaDownloadManager

这部分其实已经是业务绑定很强的类, 会直接拿着业务状态,根据状态做内部处理
里面提供了一个管理当前所有 task 的数组, 用于查重,控制,恢复。对外暴露部分接口

private tasks: MediaPipelineTask[] = []  
public readonly getTask = (mediaName: string): MediaPipelineTask | undefined => {}  
public readonly getVideoTasks = (sort?: (a: MediaPipelineTask, b: MediaPipelineTask) => number) => {}  
public readonly getTasks = (sort?: (a: MediaPipelineTask, b: MediaPipelineTask) => number) => {}  
public restoreTasks = () => {}  
public resumeTasks = (mediaNames: string[]) => {}  
public pauseTasksWitPipelineType = (type: MediaPipelineType) => {}  
public pauseTasks = (mediaNames: string[]) => {}  

提供了业务接口,将底层的流程封装成直接的函数,供外界直接调用

public downloadAndAddPhotoRecord = (mediaName: string): MediaPipelineTask => {}  
public downloadAndAddVideoRecord = (mediaName: string): MediaPipelineTask => {}  
public downloadAndAddVideoRecordWithPreview = (...): MediaPipelineTask => {}  

除了这些接口外, 比较重要的逻辑是, 承担了恢复 task 的工作

  • 为了能在断线, APP 被杀情况恢复 task, 为 task 准备了一张数据库表
  • manager 实时监听task的广播,实时更新数据库(记得前面 task 的广播吗,这是其中一个使用的地方)
    private runAndSubscribeTask = (task: MediaPipelineTask, pipeLine: MediaPipeline) => {  
      if (!this.downloadable) {return}  
      task.runInPipeline(pipeLine).subscribe({  
        next: (state) => {  
          DBManager.updateDownloadRecord(task.taskID, {  
            mediaName: task.mediaName,  
            pipeLineType: task.pipeLineType,  
            currentStep: state.stepType,  
            currentStepProgress: state.stepProgress ? state.stepProgress : 0,  
            context: state.context  
          })  
        },  
        error: errorCode => {this.dealWithTaskError(task, errorCode)},  
        complete: () => {this.completeTask(task)},  
      })  
    }  
    

最后,就是一些跟业务相关的维护工作,清理现场等