鸿蒙中的并发线程间通信、线程间通信对象
目录
- 并发线程间通信
- 1. 线程间通信对象
- 1.1 普通对象
- 1.2 ArrayBuufer对象
- 1.3 SharedArrayBuffer对象
- 1.4 Transferable对象(NativeBinding对象)
- 1.5 Sendable对象
- 简介
- 异步锁
- ASON解析与生成
- 共享容器
- 共享模块
- Sendable对象冻结
- 2 线程间通信场景
- 2.1 使用TaskPool执行独立的耗时任务
- 2.2 使用TaskPool执行多个耗时任务
- 2.3 TaskPool任务与宿主线程通信
- 2.4 Worker和宿主线程的即时消息通信
- 2.5 Worker同步调用宿主线程的接口
并发线程间通信
线程间通信指的是并发多线程间存在的数据交换行为。
对于不同的数据对象,在ArkTS线程间通信的行为是有差异的,比如普通JS对象、ArrayBuffer对象、SharedArrayBuffer对象等,跨线程的行为是不一致的,包括序列化反序列化拷贝、数据转移、数据共享等不同行为。
以JS对象为例,其在并发任务间的通信采用了标准的Structure Clone算法(序列化反序列化),通过序列化将JS对象转成与引擎无关的数据(字符串或内存块等),在另一个并发实例通过反序列化,还原成与原JS对象内容一致的新对象,因此通常需要经过深拷贝,效率较低。
ArkTS目前主要提供两种并发能力支持线程间通信:TaskPool和Worker。
- Worker是Actor并发模型标准的跨线程通信API,与Web Worker或者Node.js Worker的使用方式基本一致。
- TaskPool提供了功能更强、并发编程更简易的任务池API。其中TaskPool涉及跨并发实例的对象传递行为与Worker一致,还是采用了标准的Structured Clone算法,并发通信的对象越大,耗时就越长。
1. 线程间通信对象
1.1 普通对象
普通对象跨线程时通过拷贝形式传递,两个线程的对象内容一致,但是指向各自线程的隔离内存区间,被分配在各自线程的虚拟机本地堆(LocalHeap)。
说明
普通类实例对象跨线程通过拷贝形式传递,只能传递数据,类实例上的方法会丢失。可以使用@Sendable装饰器标识为Sendable类,类实例对象跨线程传递后,可携带类方法。
1.2 ArrayBuufer对象
ArrayBuffer内部包含一块Native内存,该ArrayBuffer的JS对象壳被分配在虚拟机本地堆(LocalHeap)。与普通对象一样,需要经过序列化与反序列化拷贝传递,但是Native内存有两种传输方式:拷贝和转移。
传输时采用拷贝的话,需要经过深拷贝(递归遍历),传输后两个线程都可以独立访问ArrayBuffer。通信过程如下图所示:
如果采用转移的方式,则原线程无法使用此ArrayBuffer对象,跨线程时只需重建JS壳,Native内存无需拷贝,效率更高。通信过程如下图所示:
ArrayBuffer可以用来表示图片等资源,在应用开发中,会遇到需要进行图片处理的场景(比如需要调整一张图片的亮度、饱和度、大小等),为了避免阻塞UI主线程,可以将图片传递到子线程中执行这些操作。转移方式性能更高,但是原线程不能再访问ArrayBuffer对象,如果两个线程都需要访问,则需要采用拷贝方式,否则建议采用转移方式,提升性能。
1.3 SharedArrayBuffer对象
SharedArrayBuffer内部包含一块Native内存,其JS对象壳被分配在虚拟机本地堆(LocalHeap)。支持跨并发实例间共享,但是访问及修改需要采用Atomics类,防止数据竞争。SharedArrayBuffer可以用于多个并发实例间的状态共享或者数据共享。通信过程如下图所示:
1.4 Transferable对象(NativeBinding对象)
Transferable对象(也称为NativeBinding对象)指的是一个JS对象,绑定了一个C++对象,且主体功能由C++提供,其JS对象壳被分配在虚拟机本地堆(LocalHeap)。跨线程传输时可以直接复用同一个C++对象,相比于JS对象的拷贝模式,传输效率较高。因此,可共享或转移的NativeBinding对象也被称为Transferable对象。
共享模式
如果C++实现能够保证线程安全性,则这个NativeBinding对象的C++部分可以支持共享传输。此时,NativeBinding对象跨线程传输后,只需要重新创建JS壳,就可以桥接到相同的C++对象上。通信过程如下图所示:
常见的共享模式NativeBinding对象包括Context,Context对象包含应用程序组件的上下文信息,它提供了一种访问系统服务和资源的方式,使得应用程序组件可以与系统进行交互。
转移模式
如果C++实现包含了数据,且无法保证线程安全性,则这个NativeBinding对象的C++部分需要采用转移方式传输。此时,NativeBinding对象跨线程传输后,只需要重新创建JS壳,就可以桥接到C++对象上,不过原对象需要移除对此对象的绑定关系。通信过程如下图所示:
1.5 Sendable对象
简介
Sendable对象为可共享的,其跨线程前后指向同一个JS对象,如果其包含了JS或者Native内容,均可以直接共享,如果底层是Native实现的,则需要考虑线程安全性。通信过程如下图所示:
与其它ArkTS对象不一样的是,符合Sendable协议的数据对象在运行时必须是类型固定的对象。
实现原理
当多个并发实例尝试同时更新Sendable数据时,会发生数据竞争,例如ArkTS共享容器的多线程操作。因此,ArkTS提供了异步锁的机制来避免不同并发实例间的数据竞争。同时,还可以通过对象冻结接口冻结对象,将其变为只读对象,就可以不用考虑数据的竞争问题。
Sendable对象提供了并发实例间高效的通信效率,即引用传递的能力,一般适用于开发者自定义大对象需要线程间通信的场景,例如子线程读取数据库的数据返回宿主线程。
异步锁
为了解决多线程并发任务间的数据竞争问题,ArkTS引入了异步锁能力。由于ArkTS语言支持异步操作,阻塞锁容易产生死锁问题,因此在ArkTS中仅支持异步锁(非阻塞式锁)
ASON解析与生成
ASON则提供了Sendable对象的序列化、反序列化能力。可以通过ASON.stringify方法将对象转换成字符串,也可以通过ASON.parse方法将字符串转成Sendable对象,以便此对象在并发任务间进行高性能引用传递。
说明
ASON.parse默认生成的对象为Sendable对象,布局不可变,不支持增删属性。如果需要支持返回对象的布局可变,可以指定返回类型为MAP,此时会全部返回collections.Map对象,支持增删属性。
共享容器
ArkTS共享容器是一种在并发任务间共享传输的容器类,可以用于并发场景下的高性能数据传递。ArkTS共享容器在多个并发任务间传递时,其默认行为是引用传递,支持多个并发任务可以操作同一个容器实例。另外,也支持拷贝传递,即每个并发任务持有一个ArkTS容器实例。ArkTS共享容器并不是线程安全的,内部使用了fail-fast(快速失败)机制,即当检测多个并发实例同时对容器进行结构性改变时,会触发异常。
共享模块
共享模块是进程内只会加载一次的模块,使用"use shared"这一指令来标记一个模块是否为共享模块。
非共享模块在同一线程内只加载一次,在不同线程间会加载多次,在不同的线程内都会产生新的模块对象。因此可以使用共享模块来实现进程单例。
Sendable对象冻结
Sendable对象支持冻结操作,冻结后的对象变成只读对象,不能增删改属性,因此在多个并发实例间访问均不需要加锁,可以通过调用Object.freeze接口冻结对象。
2 线程间通信场景
2.1 使用TaskPool执行独立的耗时任务
对于一个独立运行的耗时任务,只需要在任务执行完毕后将结果返回给宿主线程,没有上下文依赖,可以通过以下方式实现。
加载图片
-
实现子线程需要执行的任务。
// IconItemSource.ets export class IconItemSource {image: string | Resource = '';text: string | Resource = '';constructor(image: string | Resource = '', text: string | Resource = '') {this.image = image;this.text = text;} }
// IndependentTask.ets import { IconItemSource } from './IconItemSource';// 在Task中执行的方法,需要添加@Concurrent注解,否则无法正常调用。 @Concurrent export function loadPicture(count: number): IconItemSource[] {let iconItemSourceList: IconItemSource[] = [];// 遍历添加6*count个IconItem的数据for (let index = 0; index < count; index++) {const numStart: number = index * 6;// 此处循环使用6张图片资源iconItemSourceList.push(new IconItemSource('$media:startIcon', `item${numStart + 1}`));iconItemSourceList.push(new IconItemSource('$media:background', `item${numStart + 2}`));iconItemSourceList.push(new IconItemSource('$media:foreground', `item${numStart + 3}`));iconItemSourceList.push(new IconItemSource('$media:startIcon', `item${numStart + 4}`));iconItemSourceList.push(new IconItemSource('$media:background', `item${numStart + 5}`));iconItemSourceList.push(new IconItemSource('$media:foreground', `item${numStart + 6}`));}return iconItemSourceList; }
-
通过TaskPool中的execute方法执行上述任务,即加载图片。
// Index.ets import { taskpool } from '@kit.ArkTS'; import { IconItemSource } from './IconItemSource'; import { loadPicture } from './IndependentTask';@Entry @Component struct Index {@State message: string = 'Hello World';build() {Row() {Column() {Text(this.message).fontSize(50).fontWeight(FontWeight.Bold).onClick(() => {let iconItemSourceList: IconItemSource[] = [];// 创建Tasklet lodePictureTask: taskpool.Task = new taskpool.Task(loadPicture, 30);// 执行Task,并返回结果taskpool.execute(lodePictureTask).then((res: object) => {// loadPicture方法的执行结果iconItemSourceList = res as IconItemSource[];})})}.width('100%')}.height('100%')} }
2.2 使用TaskPool执行多个耗时任务
如果有多个任务同时执行,由于任务的复杂度不同,执行时间会不一样,返回数据的时间也是不可控的。如果宿主线程需要所有任务执行完毕的数据,那么可以通过下面这种方式实现。
除此以外,如果需要处理的数据量较大(比如一个列表中有10000条数据),把这些数据都放在一个Task中处理也是比较耗时的。那么就可以将原始数据拆分成多个列表,并将每个子列表分配给一个独立的Task进行执行,并且等待全部执行完毕后拼成完整的数据,这样可以节省处理时间,提升用户体验。
多个任务进行图片加载
-
实现子线程需要执行的任务。
// IconItemSource.ets export class IconItemSource {image: string | Resource = '';text: string | Resource = '';constructor(image: string | Resource = '', text: string | Resource = '') {this.image = image;this.text = text;} }
// IndependentTask.ets import { IconItemSource } from './IconItemSource';// 在Task中执行的方法,需要添加@Concurrent注解,否则无法正常调用。 @Concurrent export function loadPicture(count: number): IconItemSource[] {let iconItemSourceList: IconItemSource[] = [];// 遍历添加6*count个IconItem的数据for (let index = 0; index < count; index++) {const numStart: number = index * 6;// 此处循环使用6张图片资源iconItemSourceList.push(new IconItemSource('$media:startIcon', `item${numStart + 1}`));iconItemSourceList.push(new IconItemSource('$media:background', `item${numStart + 2}`));iconItemSourceList.push(new IconItemSource('$media:foreground', `item${numStart + 3}`));iconItemSourceList.push(new IconItemSource('$media:startIcon', `item${numStart + 4}`));iconItemSourceList.push(new IconItemSource('$media:background', `item${numStart + 5}`));iconItemSourceList.push(new IconItemSource('$media:foreground', `item${numStart + 6}`));}return iconItemSourceList; }
-
将需要执行的Task放到了一个TaskGroup里面,当TaskGroup中所有的Task都执行完毕后,会把每个Task运行的结果都放在一个数组中返回到宿主线程,而不是每执行完一个Task就返回一次,这样就可以在返回的数据里拿到所有的Task执行结果,方便宿主线程使用。
// MultiTask.ets import { taskpool } from '@kit.ArkTS'; import { IconItemSource } from './IconItemSource'; import { loadPicture } from './IndependentTask';let iconItemSourceList: IconItemSource[][];let taskGroup: taskpool.TaskGroup = new taskpool.TaskGroup(); taskGroup.addTask(new taskpool.Task(loadPicture, 30)); taskGroup.addTask(new taskpool.Task(loadPicture, 20)); taskGroup.addTask(new taskpool.Task(loadPicture, 10)); taskpool.execute(taskGroup).then((ret: object) => {let tmpLength = (ret as IconItemSource[][]).lengthfor (let i = 0; i < tmpLength; i++) {for (let j = 0; j < ret[i].length; j++) {iconItemSourceList.push(ret[i][j]);}} })
2.3 TaskPool任务与宿主线程通信
如果一个Task,不仅需要返回最后的执行结果,而且需要定时通知宿主线程状态、数据的变化,或者需要分段返回数量级较大的数据(比如从数据库中读取大量数据),可以通过下面这种方式实现。
多个图片加载任务结果实时返回
-
首先,实现一个方法,用来接收Task发送的消息。
// TaskSendDataUsage.ets function notice(data: number): void {console.info("子线程任务已执行完,共加载图片: ", data); }
-
然后,在Task需要执行的任务中,添加sendData()接口将消息发送给宿主线程。
// IconItemSource.ets export class IconItemSource {image: string | Resource = '';text: string | Resource = '';constructor(image: string | Resource = '', text: string | Resource = '') {this.image = image;this.text = text;} }
// TaskSendDataUsage.ets import { taskpool } from '@kit.ArkTS'; import { IconItemSource } from './IconItemSource';// 通过Task的sendData方法,即时通知宿主线程信息 @Concurrent export function loadPictureSendData(count: number): IconItemSource[] {let iconItemSourceList: IconItemSource[] = [];// 遍历添加6*count个IconItem的数据for (let index = 0; index < count; index++) {const numStart: number = index * 6;// 此处循环使用6张图片资源iconItemSourceList.push(new IconItemSource('$media:startIcon', `item${numStart + 1}`));iconItemSourceList.push(new IconItemSource('$media:background', `item${numStart + 2}`));iconItemSourceList.push(new IconItemSource('$media:foreground', `item${numStart + 3}`));iconItemSourceList.push(new IconItemSource('$media:startIcon', `item${numStart + 4}`));iconItemSourceList.push(new IconItemSource('$media:background', `item${numStart + 5}`));iconItemSourceList.push(new IconItemSource('$media:foreground', `item${numStart + 6}`));taskpool.Task.sendData(iconItemSourceList.length);}return iconItemSourceList; }
-
最后,在宿主线程通过onReceiveData()接口接收消息。
这样宿主线程就可以通过notice()接口接收到Task发送的数据。
// TaskSendDataUsage.ets @Entry @Component struct Index {@State message: string = 'Hello World';build() {Row() {Column() {Text(this.message).fontSize(50).fontWeight(FontWeight.Bold).onClick(() => {let iconItemSourceList: IconItemSource[];let lodePictureTask: taskpool.Task = new taskpool.Task(loadPictureSendData, 30);// 设置notice方法接收Task发送的消息lodePictureTask.onReceiveData(notice);taskpool.execute(lodePictureTask).then((res: object) => {iconItemSourceList = res as IconItemSource[];})})}.width('100%')}.height('100%')} }
2.4 Worker和宿主线程的即时消息通信
在ArkTS中,Worker相对于Taskpool存在一定的差异性,有数量限制但是可以长时间存在。一个Worker中可能会执行多个不同的任务,每个任务执行的时长或者返回的结果可能都不相同,宿主线程需要根据情况调用Worker中的不同方法,Worker则需要及时地将结果返回给宿主线程。
Worker响应"hello world"请求
-
首先,创建一个执行多个任务Worker。
// Worker.ets import { ErrorEvent, MessageEvents, ThreadWorkerGlobalScope, worker } from '@kit.ArkTS';const workerPort: ThreadWorkerGlobalScope = worker.workerPort; // Worker接收宿主线程的消息,做相应的处理 workerPort.onmessage = (e: MessageEvents): void => {if (e.data === 'hello world') {workerPort.postMessage('success');} }
-
这里的宿主线程为UI主线程,在宿主线程中创建这个Worker的对象,在点击Button的时候调用postmessage向Worker发送消息,通过Worker的onmessage方法接收Worker返回的数据。
// Index.ets import { worker } from '@kit.ArkTS'; import { BusinessError } from '@kit.BasicServicesKit';function promiseCase() {let p: Promise<void> = new Promise<void>((resolve: Function, reject: Function) => {setTimeout(() => {resolve(1);}, 100)}).then(undefined, (error: BusinessError) => {})return p; }async function postMessageTest() {let ss = new worker.ThreadWorker("entry/ets/workers/Worker.ets");let res = undefined;let flag = false;let isTerminate = false;ss.onexit = () => {isTerminate = true;}// 接收Worker线程发送的消息ss.onmessage = (e) => {res = e.data;flag = true;console.info("worker:: res is " + res);}// 给Worker线程发送消息ss.postMessage("hello world");while (!flag) {await promiseCase();}ss.terminate();while (!isTerminate) {await promiseCase();} }@Entry @Component struct Index {@State message: string = 'Hello World';build() {Row() {Column() {Text(this.message).fontSize(50).fontWeight(FontWeight.Bold).onClick(() => {postMessageTest();})}.width('100%')}.height('100%')} }
2.5 Worker同步调用宿主线程的接口
如果一个接口在主线程中已经实现了,Worker需要调用该接口,那么可以使用下面这种方式实现。
-
首先,在宿主线程实现需要调用的接口,并且创建Worker对象,在Worker上注册需要调用的接口。
// IconItemSource.ets export class IconItemSource {image: string | Resource = '';text: string | Resource = '';constructor(image: string | Resource = '', text: string | Resource = '') {this.image = image;this.text = text;} }
// WorkerCallGlobalUsage.ets import worker from '@ohos.worker'; import { IconItemSource } from './IconItemSource';// 创建Worker对象 const workerInstance: worker.ThreadWorker = new worker.ThreadWorker("entry/ets/pages/workers/Worker.ts");class PicData {public iconItemSourceList: IconItemSource[] = [];public setUp(): string {for (let index = 0; index < 20; index++) {const numStart: number = index * 6;// 此处循环使用6张图片资源this.iconItemSourceList.push(new IconItemSource('$media:startIcon', `item${numStart + 1}`));this.iconItemSourceList.push(new IconItemSource('$media:background', `item${numStart + 2}`));this.iconItemSourceList.push(new IconItemSource('$media:foreground', `item${numStart + 3}`));this.iconItemSourceList.push(new IconItemSource('$media:startIcon', `item${numStart + 4}`));this.iconItemSourceList.push(new IconItemSource('$media:background', `item${numStart + 5}`));this.iconItemSourceList.push(new IconItemSource('$media:foreground', `item${numStart + 6}`));}return "setUpIconItemSourceList success!";} }let picData = new PicData(); // 在Worker上注册需要调用的对象 workerInstance.registerGlobalCallObject("picData", picData); workerInstance.postMessage("run setUp in picData");
-
然后,在Worker中通过callGlobalCallObjectMethod接口就可以调用宿主线程中的setUp()方法了。
// Worker.ets import { ErrorEvent, MessageEvents, ThreadWorkerGlobalScope, worker } from '@kit.ArkTS'; const workerPort: ThreadWorkerGlobalScope = worker.workerPort; try {// 调用方法无入参let res: string = workerPort.callGlobalCallObjectMethod("picData", "setUp", 0) as string;console.error("worker: ", res); } catch (error) {// 异常处理console.error("worker: error code is " + error.code + " error message is " + error.message); }