当前位置: 首页 > news >正文

鸿蒙中的并发线程间通信、线程间通信对象

目录

  • 并发线程间通信
    • 1. 线程间通信对象
      • 1.1 普通对象
      • 1.2 ArrayBuufer对象
      • 1.3 SharedArrayBuffer对象
      • 1.4 Transferable对象(NativeBinding对象)
      • 1.5 Sendable对象
        • 简介
        • 异步锁
        • ASON解析与生成
        • 共享容器
        • 共享模块
        • Sendable对象冻结
    • 2 线程间通信场景
      • 2.1 使用TaskPool执行独立的耗时任务
      • 2.2 使用TaskPool执行多个耗时任务
      • 2.3 TaskPool任务与宿主线程通信
      • 2.4 Worker和宿主线程的即时消息通信
      • 2.5 Worker同步调用宿主线程的接口

并发线程间通信

线程间通信指的是并发多线程间存在的数据交换行为。

对于不同的数据对象,在ArkTS线程间通信的行为是有差异的,比如普通JS对象、ArrayBuffer对象、SharedArrayBuffer对象等,跨线程的行为是不一致的,包括序列化反序列化拷贝、数据转移、数据共享等不同行为。

以JS对象为例,其在并发任务间的通信采用了标准的Structure Clone算法(序列化反序列化),通过序列化将JS对象转成与引擎无关的数据(字符串或内存块等),在另一个并发实例通过反序列化,还原成与原JS对象内容一致的新对象,因此通常需要经过深拷贝,效率较低。

ArkTS目前主要提供两种并发能力支持线程间通信:TaskPool和Worker。

  • Worker是Actor并发模型标准的跨线程通信API,与Web Worker或者Node.js Worker的使用方式基本一致。
  • TaskPool提供了功能更强、并发编程更简易的任务池API。其中TaskPool涉及跨并发实例的对象传递行为与Worker一致,还是采用了标准的Structured Clone算法,并发通信的对象越大,耗时就越长。

1. 线程间通信对象

1.1 普通对象

普通对象跨线程时通过拷贝形式传递,两个线程的对象内容一致,但是指向各自线程的隔离内存区间,被分配在各自线程的虚拟机本地堆(LocalHeap)。

img

说明

普通类实例对象跨线程通过拷贝形式传递,只能传递数据,类实例上的方法会丢失。可以使用@Sendable装饰器标识为Sendable类,类实例对象跨线程传递后,可携带类方法。

1.2 ArrayBuufer对象

ArrayBuffer内部包含一块Native内存,该ArrayBuffer的JS对象壳被分配在虚拟机本地堆(LocalHeap)。与普通对象一样,需要经过序列化与反序列化拷贝传递,但是Native内存有两种传输方式:拷贝和转移。

传输时采用拷贝的话,需要经过深拷贝(递归遍历),传输后两个线程都可以独立访问ArrayBuffer。通信过程如下图所示:

img

如果采用转移的方式,则原线程无法使用此ArrayBuffer对象,跨线程时只需重建JS壳,Native内存无需拷贝,效率更高。通信过程如下图所示:

img

ArrayBuffer可以用来表示图片等资源,在应用开发中,会遇到需要进行图片处理的场景(比如需要调整一张图片的亮度、饱和度、大小等),为了避免阻塞UI主线程,可以将图片传递到子线程中执行这些操作。转移方式性能更高,但是原线程不能再访问ArrayBuffer对象,如果两个线程都需要访问,则需要采用拷贝方式,否则建议采用转移方式,提升性能。

1.3 SharedArrayBuffer对象

SharedArrayBuffer内部包含一块Native内存,其JS对象壳被分配在虚拟机本地堆(LocalHeap)。支持跨并发实例间共享,但是访问及修改需要采用Atomics类,防止数据竞争。SharedArrayBuffer可以用于多个并发实例间的状态共享或者数据共享。通信过程如下图所示:

img

1.4 Transferable对象(NativeBinding对象)

Transferable对象(也称为NativeBinding对象)指的是一个JS对象,绑定了一个C++对象,且主体功能由C++提供,其JS对象壳被分配在虚拟机本地堆(LocalHeap)。跨线程传输时可以直接复用同一个C++对象,相比于JS对象的拷贝模式,传输效率较高。因此,可共享或转移的NativeBinding对象也被称为Transferable对象。

共享模式

如果C++实现能够保证线程安全性,则这个NativeBinding对象的C++部分可以支持共享传输。此时,NativeBinding对象跨线程传输后,只需要重新创建JS壳,就可以桥接到相同的C++对象上。通信过程如下图所示:

img

常见的共享模式NativeBinding对象包括Context,Context对象包含应用程序组件的上下文信息,它提供了一种访问系统服务和资源的方式,使得应用程序组件可以与系统进行交互。

转移模式

如果C++实现包含了数据,且无法保证线程安全性,则这个NativeBinding对象的C++部分需要采用转移方式传输。此时,NativeBinding对象跨线程传输后,只需要重新创建JS壳,就可以桥接到C++对象上,不过原对象需要移除对此对象的绑定关系。通信过程如下图所示:

img

1.5 Sendable对象

简介

Sendable对象为可共享的,其跨线程前后指向同一个JS对象,如果其包含了JS或者Native内容,均可以直接共享,如果底层是Native实现的,则需要考虑线程安全性。通信过程如下图所示:

img

与其它ArkTS对象不一样的是,符合Sendable协议的数据对象在运行时必须是类型固定的对象。

实现原理

当多个并发实例尝试同时更新Sendable数据时,会发生数据竞争,例如ArkTS共享容器的多线程操作。因此,ArkTS提供了异步锁的机制来避免不同并发实例间的数据竞争。同时,还可以通过对象冻结接口冻结对象,将其变为只读对象,就可以不用考虑数据的竞争问题。

Sendable对象提供了并发实例间高效的通信效率,即引用传递的能力,一般适用于开发者自定义大对象需要线程间通信的场景,例如子线程读取数据库的数据返回宿主线程。

img

异步锁

为了解决多线程并发任务间的数据竞争问题,ArkTS引入了异步锁能力。由于ArkTS语言支持异步操作,阻塞锁容易产生死锁问题,因此在ArkTS中仅支持异步锁(非阻塞式锁)

ASON解析与生成

ASON则提供了Sendable对象的序列化、反序列化能力。可以通过ASON.stringify方法将对象转换成字符串,也可以通过ASON.parse方法将字符串转成Sendable对象,以便此对象在并发任务间进行高性能引用传递。

说明

ASON.parse默认生成的对象为Sendable对象,布局不可变,不支持增删属性。如果需要支持返回对象的布局可变,可以指定返回类型为MAP,此时会全部返回collections.Map对象,支持增删属性。

共享容器

ArkTS共享容器是一种在并发任务间共享传输的容器类,可以用于并发场景下的高性能数据传递。ArkTS共享容器在多个并发任务间传递时,其默认行为是引用传递,支持多个并发任务可以操作同一个容器实例。另外,也支持拷贝传递,即每个并发任务持有一个ArkTS容器实例。ArkTS共享容器并不是线程安全的,内部使用了fail-fast(快速失败)机制,即当检测多个并发实例同时对容器进行结构性改变时,会触发异常。

共享模块

共享模块是进程内只会加载一次的模块,使用"use shared"这一指令来标记一个模块是否为共享模块。

非共享模块在同一线程内只加载一次,在不同线程间会加载多次,在不同的线程内都会产生新的模块对象。因此可以使用共享模块来实现进程单例。

Sendable对象冻结

Sendable对象支持冻结操作,冻结后的对象变成只读对象,不能增删改属性,因此在多个并发实例间访问均不需要加锁,可以通过调用Object.freeze接口冻结对象。

2 线程间通信场景

2.1 使用TaskPool执行独立的耗时任务

对于一个独立运行的耗时任务,只需要在任务执行完毕后将结果返回给宿主线程,没有上下文依赖,可以通过以下方式实现。

加载图片

  1. 实现子线程需要执行的任务。

    // IconItemSource.ets
    export class IconItemSource {image: string | Resource = '';text: string | Resource = '';constructor(image: string | Resource = '', text: string | Resource = '') {this.image = image;this.text = text;}
    }
    
    // IndependentTask.ets
    import { IconItemSource } from './IconItemSource';// 在Task中执行的方法,需要添加@Concurrent注解,否则无法正常调用。
    @Concurrent
    export function loadPicture(count: number): IconItemSource[] {let iconItemSourceList: IconItemSource[] = [];// 遍历添加6*count个IconItem的数据for (let index = 0; index < count; index++) {const numStart: number = index * 6;// 此处循环使用6张图片资源iconItemSourceList.push(new IconItemSource('$media:startIcon', `item${numStart + 1}`));iconItemSourceList.push(new IconItemSource('$media:background', `item${numStart + 2}`));iconItemSourceList.push(new IconItemSource('$media:foreground', `item${numStart + 3}`));iconItemSourceList.push(new IconItemSource('$media:startIcon', `item${numStart + 4}`));iconItemSourceList.push(new IconItemSource('$media:background', `item${numStart + 5}`));iconItemSourceList.push(new IconItemSource('$media:foreground', `item${numStart + 6}`));}return iconItemSourceList;
    }
    
  2. 通过TaskPool中的execute方法执行上述任务,即加载图片。

    // Index.ets
    import { taskpool } from '@kit.ArkTS';
    import { IconItemSource } from './IconItemSource';
    import { loadPicture } from './IndependentTask';@Entry
    @Component
    struct Index {@State message: string = 'Hello World';build() {Row() {Column() {Text(this.message).fontSize(50).fontWeight(FontWeight.Bold).onClick(() => {let iconItemSourceList: IconItemSource[] = [];// 创建Tasklet lodePictureTask: taskpool.Task = new taskpool.Task(loadPicture, 30);// 执行Task,并返回结果taskpool.execute(lodePictureTask).then((res: object) => {// loadPicture方法的执行结果iconItemSourceList = res as IconItemSource[];})})}.width('100%')}.height('100%')}
    }
    

2.2 使用TaskPool执行多个耗时任务

如果有多个任务同时执行,由于任务的复杂度不同,执行时间会不一样,返回数据的时间也是不可控的。如果宿主线程需要所有任务执行完毕的数据,那么可以通过下面这种方式实现。

除此以外,如果需要处理的数据量较大(比如一个列表中有10000条数据),把这些数据都放在一个Task中处理也是比较耗时的。那么就可以将原始数据拆分成多个列表,并将每个子列表分配给一个独立的Task进行执行,并且等待全部执行完毕后拼成完整的数据,这样可以节省处理时间,提升用户体验。

多个任务进行图片加载

  1. 实现子线程需要执行的任务。

    // IconItemSource.ets
    export class IconItemSource {image: string | Resource = '';text: string | Resource = '';constructor(image: string | Resource = '', text: string | Resource = '') {this.image = image;this.text = text;}
    }
    
    // IndependentTask.ets
    import { IconItemSource } from './IconItemSource';// 在Task中执行的方法,需要添加@Concurrent注解,否则无法正常调用。
    @Concurrent
    export function loadPicture(count: number): IconItemSource[] {let iconItemSourceList: IconItemSource[] = [];// 遍历添加6*count个IconItem的数据for (let index = 0; index < count; index++) {const numStart: number = index * 6;// 此处循环使用6张图片资源iconItemSourceList.push(new IconItemSource('$media:startIcon', `item${numStart + 1}`));iconItemSourceList.push(new IconItemSource('$media:background', `item${numStart + 2}`));iconItemSourceList.push(new IconItemSource('$media:foreground', `item${numStart + 3}`));iconItemSourceList.push(new IconItemSource('$media:startIcon', `item${numStart + 4}`));iconItemSourceList.push(new IconItemSource('$media:background', `item${numStart + 5}`));iconItemSourceList.push(new IconItemSource('$media:foreground', `item${numStart + 6}`));}return iconItemSourceList;
    }
    
  2. 将需要执行的Task放到了一个TaskGroup里面,当TaskGroup中所有的Task都执行完毕后,会把每个Task运行的结果都放在一个数组中返回到宿主线程,而不是每执行完一个Task就返回一次,这样就可以在返回的数据里拿到所有的Task执行结果,方便宿主线程使用。

    // MultiTask.ets
    import { taskpool } from '@kit.ArkTS';
    import { IconItemSource } from './IconItemSource';
    import { loadPicture } from './IndependentTask';let iconItemSourceList: IconItemSource[][];let taskGroup: taskpool.TaskGroup = new taskpool.TaskGroup();
    taskGroup.addTask(new taskpool.Task(loadPicture, 30));
    taskGroup.addTask(new taskpool.Task(loadPicture, 20));
    taskGroup.addTask(new taskpool.Task(loadPicture, 10));
    taskpool.execute(taskGroup).then((ret: object) => {let tmpLength = (ret as IconItemSource[][]).lengthfor (let i = 0; i < tmpLength; i++) {for (let j = 0; j < ret[i].length; j++) {iconItemSourceList.push(ret[i][j]);}}
    })
    

2.3 TaskPool任务与宿主线程通信

如果一个Task,不仅需要返回最后的执行结果,而且需要定时通知宿主线程状态、数据的变化,或者需要分段返回数量级较大的数据(比如从数据库中读取大量数据),可以通过下面这种方式实现。

多个图片加载任务结果实时返回

  1. 首先,实现一个方法,用来接收Task发送的消息。

    // TaskSendDataUsage.ets
    function notice(data: number): void {console.info("子线程任务已执行完,共加载图片: ", data);
    }
    
  2. 然后,在Task需要执行的任务中,添加sendData()接口将消息发送给宿主线程。

    // IconItemSource.ets
    export class IconItemSource {image: string | Resource = '';text: string | Resource = '';constructor(image: string | Resource = '', text: string | Resource = '') {this.image = image;this.text = text;}
    }
    
    // TaskSendDataUsage.ets
    import { taskpool } from '@kit.ArkTS';
    import { IconItemSource } from './IconItemSource';// 通过Task的sendData方法,即时通知宿主线程信息
    @Concurrent
    export function loadPictureSendData(count: number): IconItemSource[] {let iconItemSourceList: IconItemSource[] = [];// 遍历添加6*count个IconItem的数据for (let index = 0; index < count; index++) {const numStart: number = index * 6;// 此处循环使用6张图片资源iconItemSourceList.push(new IconItemSource('$media:startIcon', `item${numStart + 1}`));iconItemSourceList.push(new IconItemSource('$media:background', `item${numStart + 2}`));iconItemSourceList.push(new IconItemSource('$media:foreground', `item${numStart + 3}`));iconItemSourceList.push(new IconItemSource('$media:startIcon', `item${numStart + 4}`));iconItemSourceList.push(new IconItemSource('$media:background', `item${numStart + 5}`));iconItemSourceList.push(new IconItemSource('$media:foreground', `item${numStart + 6}`));taskpool.Task.sendData(iconItemSourceList.length);}return iconItemSourceList;
    }
    
  3. 最后,在宿主线程通过onReceiveData()接口接收消息。

    这样宿主线程就可以通过notice()接口接收到Task发送的数据。

    // TaskSendDataUsage.ets
    @Entry
    @Component
    struct Index {@State message: string = 'Hello World';build() {Row() {Column() {Text(this.message).fontSize(50).fontWeight(FontWeight.Bold).onClick(() => {let iconItemSourceList: IconItemSource[];let lodePictureTask: taskpool.Task = new taskpool.Task(loadPictureSendData, 30);// 设置notice方法接收Task发送的消息lodePictureTask.onReceiveData(notice);taskpool.execute(lodePictureTask).then((res: object) => {iconItemSourceList = res as IconItemSource[];})})}.width('100%')}.height('100%')}
    }
    

2.4 Worker和宿主线程的即时消息通信

在ArkTS中,Worker相对于Taskpool存在一定的差异性,有数量限制但是可以长时间存在。一个Worker中可能会执行多个不同的任务,每个任务执行的时长或者返回的结果可能都不相同,宿主线程需要根据情况调用Worker中的不同方法,Worker则需要及时地将结果返回给宿主线程。

Worker响应"hello world"请求

  1. 首先,创建一个执行多个任务Worker。

    // Worker.ets
    import { ErrorEvent, MessageEvents, ThreadWorkerGlobalScope, worker } from '@kit.ArkTS';const workerPort: ThreadWorkerGlobalScope = worker.workerPort;
    // Worker接收宿主线程的消息,做相应的处理
    workerPort.onmessage = (e: MessageEvents): void => {if (e.data === 'hello world') {workerPort.postMessage('success');}
    }
    
  2. 这里的宿主线程为UI主线程,在宿主线程中创建这个Worker的对象,在点击Button的时候调用postmessage向Worker发送消息,通过Worker的onmessage方法接收Worker返回的数据。

    // Index.ets
    import { worker } from '@kit.ArkTS';
    import { BusinessError } from '@kit.BasicServicesKit';function promiseCase() {let p: Promise<void> = new Promise<void>((resolve: Function, reject: Function) => {setTimeout(() => {resolve(1);}, 100)}).then(undefined, (error: BusinessError) => {})return p;
    }async function postMessageTest() {let ss = new worker.ThreadWorker("entry/ets/workers/Worker.ets");let res = undefined;let flag = false;let isTerminate = false;ss.onexit = () => {isTerminate = true;}// 接收Worker线程发送的消息ss.onmessage = (e) => {res = e.data;flag = true;console.info("worker:: res is  " + res);}// 给Worker线程发送消息ss.postMessage("hello world");while (!flag) {await promiseCase();}ss.terminate();while (!isTerminate) {await promiseCase();}
    }@Entry
    @Component
    struct Index {@State message: string = 'Hello World';build() {Row() {Column() {Text(this.message).fontSize(50).fontWeight(FontWeight.Bold).onClick(() => {postMessageTest();})}.width('100%')}.height('100%')}
    }
    

2.5 Worker同步调用宿主线程的接口

如果一个接口在主线程中已经实现了,Worker需要调用该接口,那么可以使用下面这种方式实现。

  1. 首先,在宿主线程实现需要调用的接口,并且创建Worker对象,在Worker上注册需要调用的接口。

    // IconItemSource.ets
    export class IconItemSource {image: string | Resource = '';text: string | Resource = '';constructor(image: string | Resource = '', text: string | Resource = '') {this.image = image;this.text = text;}
    }
    
    // WorkerCallGlobalUsage.ets
    import worker from '@ohos.worker';
    import { IconItemSource } from './IconItemSource';// 创建Worker对象
    const workerInstance: worker.ThreadWorker = new worker.ThreadWorker("entry/ets/pages/workers/Worker.ts");class PicData {public iconItemSourceList: IconItemSource[] = [];public setUp(): string {for (let index = 0; index < 20; index++) {const numStart: number = index * 6;// 此处循环使用6张图片资源this.iconItemSourceList.push(new IconItemSource('$media:startIcon', `item${numStart + 1}`));this.iconItemSourceList.push(new IconItemSource('$media:background', `item${numStart + 2}`));this.iconItemSourceList.push(new IconItemSource('$media:foreground', `item${numStart + 3}`));this.iconItemSourceList.push(new IconItemSource('$media:startIcon', `item${numStart + 4}`));this.iconItemSourceList.push(new IconItemSource('$media:background', `item${numStart + 5}`));this.iconItemSourceList.push(new IconItemSource('$media:foreground', `item${numStart + 6}`));}return "setUpIconItemSourceList success!";}
    }let picData = new PicData();
    // 在Worker上注册需要调用的对象
    workerInstance.registerGlobalCallObject("picData", picData);
    workerInstance.postMessage("run setUp in picData");
    
  2. 然后,在Worker中通过callGlobalCallObjectMethod接口就可以调用宿主线程中的setUp()方法了。

    // Worker.ets
    import { ErrorEvent, MessageEvents, ThreadWorkerGlobalScope, worker } from '@kit.ArkTS';
    const workerPort: ThreadWorkerGlobalScope = worker.workerPort;
    try {// 调用方法无入参let res: string = workerPort.callGlobalCallObjectMethod("picData", "setUp", 0) as string;console.error("worker: ", res);
    } catch (error) {// 异常处理console.error("worker: error code is " + error.code + " error message is " + error.message);
    }
    

相关文章:

  • 状态模式(State Pattern)详解
  • Python | 分层线性模型的实现及示例
  • 什么是鸿蒙南向开发?什么是北向开发?
  • PHP 反序列化原生类 TIPS字符串逃逸CVE 绕过漏洞属性类型特征
  • 集结号海螺捕鱼游戏源码解析(第二篇):水浒传捕鱼模块逻辑与服务器帧同步详解
  • 2025山东省职业院校技能大赛网络安全赛项样题
  • node.js 实战——(path模块 知识点学习)
  • 解决重装idea后破解jerbel的问题
  • (一)单机架构、应用数据分离架构、应用服务集群架构
  • JavaScript学习教程,从入门到精通,Ajax与Node.js Web服务器开发全面指南(24)
  • 基于javaweb的SpringBoot扶农助农平台管理系统设计与实现(源码+文档+部署讲解)
  • 前端面试场景题
  • JVM学习笔记
  • MCP 协议:AI 时代的 “USB-C” 革命——从接口统一到生态重构的技术哲学
  • URP-UGUI交互功能实现
  • Git 远程操作全攻略:从基础到实战
  • SOA(半导体光放大器)在BOTDR(布里渊光时域反射计)系统中的应用
  • [计算机科学#1]:计算机的前世今生,从算盘到IBM的演变之路
  • 「ES数据迁移可视化工具(Python实现)」支持7.x索引数据互传
  • 在 UniApp 中获取当前页面地址
  • 研讨会丨明清区域史研究的比较与对话
  • 继续免费通行!五一假期全国高速公路日均流量约6200万辆
  • 新《火灾统计管理规定》即将施行,火灾死亡统计时限延长
  • 央行副行长陆磊:国际化程度有效提升是上海国际金融中心建设的一个主要方向
  • 首开股份:去年亏损约81.4亿元,是公司发展史上极其困难的一年
  • 秦洪看盘|热点凌乱难抑多头雄心