需求部分
场景:
我们的飞机在拍摄完一张照片并生成后,APP 需要实现这么个流程:
- 飞机通过数传,回调给 APP 一个文件名
- APP 拿到文件名后, 要从 MediaServer 获取文件名对应的数据
- 将文件数据插入数据库
- 执行缩略图和源图的下载
- 对下载后的源图做美化
- 对下载后的源图加水印
- 存到相册
飞机录视频时(比如 4K),APP 端会同步将飞机发送预览 rtp 解码(720P)后也写入一个文件。
因为源视频太大,需要用户主动触发时才下载,需求上需要录一个视频作为给用户预览用
流程是飞机在录制完一段视频后:
- 飞机通过数传,回调给 APP 一个文件名
- APP 拿到文件名后, 要从 MediaServer 获取文件名对应的数据
- 将文件数据插入数据库
- APP 将自己录制的视频找合适的地方存放,并在数据库中建立关联
- 执行缩略图下载
在 APP 中,包括实时图传,相册等场景。 有很多很多种与上述类似,但不完全相同的流程。 并且其中部分步骤是一样的。
下载部分,因为涉及到两种通信(wifi,usb),导致具体下载实现上也会有不同
条件
基于上面的场景, 由于飞机的限制,对代码有这么些要求:
- 飞机与 APP 随时会断开,不能因为任何异常导致用户看不到自己拍的内容
(要求恢复连接/重开 APP 后。所有资源仍然走完完整的下载流程。) - 用户有开关,可以决定下载后的内容要不要美化,要不要加水印
(某个步骤执行与否是动态决定的) - 飞机带宽有限,在需要的时候要保证图传优先。会在相册暂停,删除下载中资源
(执行中的流程需要被管理) - 用户会短时间内拍摄多张照片,会在相册同时下载多个资源
(要求同类流程能同时进行多个)
现在, 让我们忽略每个步骤具体的细节, 考虑这一套业务怎么能较好的实现?
- 每一步的操作,执行过程中能被停下
- 某一步操作,可能出现在多个流程中
- 这些操作步骤,可能是同步,可能是异步的
- 如果流程中某一步出现异常,流程视情况重试或直接退出整个流程
- 流程中步骤会根据情况增删或跳过
- 流程在断了、杀了后还重新恢复到对应步骤
- 流程被管理
- 很容易就写出一个新的流程
分析部分
先看里面涉及到的元素, 考虑怎么抽象
首先最直观的两个:
- 有 ”执行步骤“ 概念:需求里执行的事情,都可以被归为”步骤“
- ”步骤“ 要能被中断
- 不同流程可能有相同的步骤, 要可复用
- 有 ”流程“ 概念: 特定的一些步骤拼装在一块就是流程, 可以理解为是步骤容器
- ”流程“ 有几种不同的种类
- 同一种 ”流程“, 同一时间可能存在多个
- 流程中的步骤,是按顺序执行的异步函数
因为要能对进行中的任务进行管理,(全部暂停,全部开始,任务恢复等), 所以在流程上面还有个 ”管理器“ 概念
而从需求描述可知,里面的步骤大多都是异步行为,所以执行流程时,需要有一套实现能在一个异步结束后,调用下一个异步操作。
我的实现
早期实现用的函数拼装,后面由同时改由改用 OC 的 RAC, 转到 RN 后我在 TypeScript 环境用的 rxjs 实现
大体思路差不多, 这里用我实现的方式做例子
回顾时发现, DownloadManager 和 PipeLineTask ,PipeLineStep 这一套还挺像 NSOperation 和 NSOperationQueue 的
步骤: MediaPipelineStep
以执行步骤本身的角度看,其可以理解为是执行单一行为函数。 与业务和流程都无关
而我们在 外部,对执行步骤提出了两个需求: 可组装,可中断
如何组装步骤
组装方面好处理, 很自然的联想到, 用一个数组,把所有的步骤都装进去。 然后从头往后执行,就是一个组装好的流程了。
//举个栗子
const pipeline.steps = [Step1,Step2,Step3,Step4],
pipeline.run() // 逐个执行步骤
实现方式有这几种:
- 定义一个类
BaseStep, 类里面包含通用函数run, 其他字段描述函数需要的参数
所有不同的执行步骤,是这个类不同的子类(DownloadStep,DBInsertStep)。
最终在执行时,把所有的子类对象,按需要的顺序放进数组,然后调用外界调用BaseStep的接口即可 - (我采用的方式)一个执行步骤是一个函数,所有函数用统一的入参
task(是个描述整个流程的对象,后面再谈)
然后在数组中,按需要的顺序,存放这些函数。 外部再加一个调用器(就是入参task),由调用器决定怎么执行
当然实现上有不少优化空间, 比如我实际数组中存的是函数对应的 enum 而不是函数本身,实现时使用了 Observer 的概念(rxjs)。
具体实现:
// 定义 enum 用来对应函数步骤
export enum MediaPipelineStepType {
Init = "Init",
CheckInProgressDuplication = "checkInProgressDuplication",
FetchMediaInfo = "fetchMediaInfo",
DownloadMediaThumbnailByHttp = "downloadMediaThumbnailByHttp",
}
// MediaPipelineStepType 与函数对应
const StepProcesses: { [key: string]: (task: MediaPipelineTask) => Observable<MediaPipelineTask> } = {
[MediaPipelineStepType.FetchMediaInfo]: fetchMediaInfo,
[MediaPipelineStepType.DownloadMediaThumbnailByHttp]: downloadMediaThumbnailByHttp,
}
// 描述一个流程就是在定义一个数组
const WiFiDownloadAndWritePhotoSteps = [
MediaPipelineStepType.FetchMediaInfo,
MediaPipelineStepType.DownloadMediaThumbnailByHttp,
MediaPipelineStepType.SaveMediaThumbnail,
MediaPipelineStepType.InsertMediaToDataBase,
...
]
如何管理步骤
中断方面,若希望”执行步骤“内部能不依赖外界
比如可以在入参中提供一个外部会调用的函数指针,由内部做函数实现,外部根据需要调用函数
我这里的实现方式是,在外部,有个管理流程 Class MediaPipelineTask. 当一个流程被创建时,其实就是在创建一个 task
task 内部有两个函数字段, didPause 和 disCancel。
执行时,task 会被传入执行步骤里作为参数。 然后执行步骤内部,只要实现了 didPause,在 task 触发 didPause 时,便会执行当前步骤需要的行为
function downloadMediaOriginByHttp(task: MediaPipelineTask) {
task.didPauseTask = () => { download.pause()}
task.didCancelTask = () => {download.cancel()}
}
内部使用了 rxjs 的 Observer, 目的是将一步执行的函数归纳为流的形式,利用起提供的 API 方便的管理流程
(其实用 Promise 组装或者自己管理异步也可以实现,我在另一个类似的叫 MediaEditPipeline的业务中使用 Promise 也能顺利完成。思路是一样的,只是实现上的不同)
具体实现:
- 每个函数各自定义好自己的逻辑,然后内部就是个带 Observerable 的执行函数
(传入的 Task 其实是外层 pipeline 的管理器,里面带了上下文:context,管理事件didPause,didCancel后面再聊)function downloadMediaOriginByHttp(task: MediaPipelineTask) { let {context} = task return Observable.create((observer: Observer<MediaPipelineTask>) => { if (!context.mediaInfo) {/*...*/ return} let downloadTask = new HTTPMediaDownloadTask(task.mediaName, downloadType) FileManager.isEnoughStorageForMedia(context.mediaInfo.size) .then(isEnough => { if (!isEnough) return Promise.reject(MediaPipelineErrorCode.shortOfStorageError) return downloadTask.start() }).then(filePath => { taskLog(task, `downloadMediaOriginByHttp succeed path = ${filePath}`) context.originDownloadedPath = filePath task.currentStage.updateProgress(1) observer.next(task) observer.complete() }).catch((error) => { taskErrorLog(task, 'downloadMediaOriginByHttp error:', error) observer.error(MediaPipelineErrorCode.downloadMediaOriginError) }) // 更新 task 的进度 downloadTask.progress = (received: number, total: number) => { task.currentStage.updateProgress(received / total) } task.didPauseTask = () => { downloadTask.pause() observer.error(MediaPipelineErrorCode.pause) } task.didCancelTask = () => { downloadTask.cancel() observer.error(MediaPipelineErrorCode.interrupt) } }) }
流程: MediaPipeline
基于上面步骤的实现方式,流程就比较简单了
- 流程有多个, 所以用不同的 enum 表示
export enum MediaPipelineType {
/** WiFi: 下载缩率图,源图,处理源图及写入数据库 */
WiFiDownloadAndWritePhoto = 'WiFiDownloadAndWritePhoto',
/** WiFi: 下载缩率图,源视频及写入数据库 */
WiFiDownloadAndWriteVideo = 'WiFiDownloadAndWriteVideo',
/** WiFi: 下载缩略图,保存预览流及写入数据库*/
WiFiWriteVideoWithPreview = 'WiFiWriteVideoWithPreview',
}
- 步骤是
enum MediaPipelineStepType,流程是包含MediaPipelineStepType的数组, 对应不同的流程,是不同的数组
/**
* 下载图片缩略图,写入图片数据,下载源图,处理源图,更新数据库的流程
*/
const WiFiDownloadAndWritePhotoSteps = [
MediaPipelineStepType.FetchMediaInfo,
MediaPipelineStepType.DownloadMediaThumbnailByHttp,
MediaPipelineStepType.SaveMediaThumbnail,
MediaPipelineStepType.InsertMediaToDataBase,
MediaPipelineStepType.DownloadMediaOriginByHttp,
MediaPipelineStepType.AddWaterMark,
MediaPipelineStepType.SaveMediaToAlbum,
MediaPipelineStepType.UpdateDataBaseMedia,
]
const WiFiDownloadAndWriteVideoSteps = [
MediaPipelineStepType.FetchMediaInfo,
MediaPipelineStepType.DownloadMediaThumbnailByHttp,
MediaPipelineStepType.SaveMediaThumbnail,
MediaPipelineStepType.InsertMediaToDataBase,
MediaPipelineStepType.DownloadMediaOriginByHttp,
MediaPipelineStepType.SaveMediaToAlbum,
MediaPipelineStepType.UpdateDataBaseMedia,
]
- 再让
MediaPipelineType和 数组 对应, 然后在使用时将MediaPipelineStepType转为对应函数即可
export const MediaPipelineTypeSteps: ZMap<MediaPipelineType, MediaPipelineStepType[]> = {
[MediaPipelineType.WiFiDownloadAndWritePhoto]: WiFiDownloadAndWritePhotoSteps,
[MediaPipelineType.WiFiWriteVideoWithPreview]: WiFiWriteVideoWithPreviewSteps,
...
}
// 使用前,将 enum 转为
public static pipeLine(type: MediaPipelineType) {
let steps: MediaPipelineStep[] = MediaPipelineTypeSteps[type].map(type => new MediaPipelineStep(type))
return new MediaPipeline(steps)
}
执行部分: MediaPipelineTask
上面提到, 流程本身其实只是个装了很多函数的数组。
现在来看怎么让数组里的函数跑起来,(这个实现是基于 rxjs,这是个非常好用的工具,这里不展开 Observerable 的概念)
类定义:
taskID: 用于外界管理的标识符
pipeLineType: 当前执行的流程类型
_state: BehaviorSubjec: 用于对外广播当前的执行状态, 便于外界管理
export default class MediaPipelineTask {
/** 任务 ID */
readonly taskID: number
readonly mediaName: string
/** 执行的流程类型 */
readonly pipeLineType: MediaPipelineType
readonly date: Date
/** 是否是恢复执行的 task */
private _isResume: boolean = false
/** 是否正在执行中 */
private _isWorking: boolean = false
/** 用于对外广播的当前状态 */
private _state: BehaviorSubject<MediaPipelineTaskState>
/** 当前执行的步骤名称 */
currentStage: MediaPipelineTaskStage
/** 当前 MediaPipeline 临时保存的上下文 */
context: MediaPipelineTaskContext
}
函数部分:
构造函数:记得前面提到过,即使在 APP 被杀掉时也要能恢复吗。 其实现就是 Task 从数据库把自己读出来,然后步骤,上下文就有了
static taskFromRecord(record: MediaDownloadRecord): MediaPipelineTask {}
控制函数:
get isResume(): boolean {}
get isWorking(): boolean {}
readonly pause = () => { }
readonly cancel = () => {}
private getTaskState = (): MediaPipelineTaskState => {}
执行函数:使用是调用 rxjs,这是核心部分
stepsForTask:之前 pipeline 中有提供把 step enum 转为函数的部分,而后又提供了从 task 读 MediaPipelineType 取得数组的部分。然后记得吗,函数内部是在返回 Observable, 这里就是把 Observable 拿出来。放到 pipe 中。
然后就能用流的方式,处理异步操作
ps.rxjs 这个工具真的非常便捷,但也要意识到,本质是在做异步任务处理,有很多种别的方式能实现
readonly runInPipeline = (pipeLine: MediaPipeline): BehaviorSubject<MediaPipelineTaskState> => {
if (this.isWorking) {...}
this._isWorking = true
this._state = new BehaviorSubject<MediaPipelineTaskState>(this.getTaskState())
Rx.of(this).pipe(
...pipeLine.stepsForTask(this)
).subscribe({
next: (task: MediaPipelineTask) => {},
error: (error:any) => {
this._isWorking = false
this.state.error(error)
},
complete: () => {
this._isWorking = false
this.state.complete()
},
})
return this.state
}
单个流程的执行部分,到这里基本就结束了。
Task 会在执行过程中被创建很多很多个, 每个 Task 有自己的 MediaPipelineType, 然后会用这个 Type 找到需要执行的MediaPipeline,再通过流程中描述的函数MediaPipelineStep执行具体的操作。
为了能被外界管理,对外暴露了管理接口 pause, cancel
为了让外界知道自己的状态, 用BehaviorSubject对外做了广播
然后到此为止, 这套东西实现了:
- 步骤可以复用
- 可以灵活拼装(可动态生成, 可随时写出一个新的流程)
- 外部可在任意步骤执行中断
- 步骤可动态的决定执行与否
还差这些没实现:
- 外界能统一管理所有执行中的任务
- 某个任务被暂停后,即使是杀 APP,也要能恢复
这一部分就只能依托于管理类
全局管理器:MediaDownloadManager
这部分其实已经是业务绑定很强的类, 会直接拿着业务状态,根据状态做内部处理
里面提供了一个管理当前所有 task 的数组, 用于查重,控制,恢复。对外暴露部分接口
private tasks: MediaPipelineTask[] = []
public readonly getTask = (mediaName: string): MediaPipelineTask | undefined => {}
public readonly getVideoTasks = (sort?: (a: MediaPipelineTask, b: MediaPipelineTask) => number) => {}
public readonly getTasks = (sort?: (a: MediaPipelineTask, b: MediaPipelineTask) => number) => {}
public restoreTasks = () => {}
public resumeTasks = (mediaNames: string[]) => {}
public pauseTasksWitPipelineType = (type: MediaPipelineType) => {}
public pauseTasks = (mediaNames: string[]) => {}
提供了业务接口,将底层的流程封装成直接的函数,供外界直接调用
public downloadAndAddPhotoRecord = (mediaName: string): MediaPipelineTask => {}
public downloadAndAddVideoRecord = (mediaName: string): MediaPipelineTask => {}
public downloadAndAddVideoRecordWithPreview = (...): MediaPipelineTask => {}
除了这些接口外, 比较重要的逻辑是, 承担了恢复 task 的工作
- 为了能在断线, APP 被杀情况恢复 task, 为 task 准备了一张数据库表
manager实时监听task的广播,实时更新数据库(记得前面 task 的广播吗,这是其中一个使用的地方)private runAndSubscribeTask = (task: MediaPipelineTask, pipeLine: MediaPipeline) => { if (!this.downloadable) {return} task.runInPipeline(pipeLine).subscribe({ next: (state) => { DBManager.updateDownloadRecord(task.taskID, { mediaName: task.mediaName, pipeLineType: task.pipeLineType, currentStep: state.stepType, currentStepProgress: state.stepProgress ? state.stepProgress : 0, context: state.context }) }, error: errorCode => {this.dealWithTaskError(task, errorCode)}, complete: () => {this.completeTask(task)}, }) }
最后,就是一些跟业务相关的维护工作,清理现场等
