【仓颉 + 鸿蒙 + AI Agent】CangjieMagic框架(16):ReactExecutor
CangjieMagic框架:使用华为仓颉编程语言编写,专门用于开发AI Agent,支持鸿蒙、Windows、macOS等系统。
这篇文章剖析一下 CangjieMagic 框架中的 ReactExecutor。
这个执行器名字中的"React"代表"Reasoning and Acting"(推理和行动),它让Agent能够像人类一样思考问题、采取行动、观察结果,然后再思考下一步。
如果NaiveExecutor像是直接回答问题的学生,PlanReactExecutor像是制定详细计划的项目经理,那么ReactExecutor就像是一位善于边思考边行动的侦探,通过不断的观察和推理来解决问题!
1 ReactExecutor的工作原理
ReactExecutor采用了一种叫做"思考-行动-观察"的循环执行模式,就像我们在生活中解决问题的方式:
- 思考(Thought):分析问题,考虑可能的解决方案
- 行动(Action):采取一个具体行动,比如查询信息、使用工具
- 观察(Observation):查看行动的结果
- 继续思考:基于观察到的结果,进一步思考下一步行动
这就像我们烹饪一道从未做过的菜:先思考食谱,然后按步骤操作,观察结果(尝尝味道),再决定是否需要调整。
2 代码结构与初始化
让我们先看看ReactExecutor的构造函数:
public class ReactExecutor <: AgentExecutor {public ReactExecutor(private let loop!: Int64 = Config.maxReactNumber) { }// 其他成员...
}
这个构造函数看起来很简单,只有一个参数loop
,用于设置最大循环次数,默认值来自配置Config.maxReactNumber
。这个参数非常重要,它决定了Agent最多可以进行多少轮"思考-行动-观察"循环。
这就像给侦探设定一个破案时间限制,防止陷入无限的思考和调查中。在实际应用中,通常不需要手动设置这个参数,使用默认配置就足够了。
3 同步执行流程:像侦探破案一样
现在,让我们深入研究同步执行函数:
override public func run(agent: Agent, request: AgentRequest): AgentResponse {LogUtils.info("React executor runs ${agent.name}")let task = ReactTask(agent, request)for (_ in 0..this.loop) {if (let Some(resp) <- task.runOnce()) {return resp}}LogUtils.info("Exceed the max react loop")return task.summarize()
}
这段代码实现了ReactExecutor的核心功能。让我们用一个侦探破案的例子来理解这个过程:
代码解析:
-
记录日志:记录执行开始
LogUtils.info("React executor runs ${agent.name}")
这就像福尔摩斯在笔记本上写下"案件调查开始"。
-
创建任务:创建一个ReactTask对象
let task = ReactTask(agent, request)
这相当于福尔摩斯整理案件资料,准备开始调查。
-
循环执行:在最大循环次数内,不断尝试解决问题
for (_ in 0..this.loop) {// 尝试执行一次 }
这就像福尔摩斯进行一系列的调查活动,直到找到答案或达到时间限制。
-
单次执行:每次循环调用
task.runOnce()
if (let Some(resp) <- task.runOnce()) {return resp }
这相当于福尔摩斯进行一轮"思考-行动-观察",如果找到了答案,就立即返回;如果没有,继续下一轮。
-
达到限制:如果达到最大循环次数仍未解决
LogUtils.info("Exceed the max react loop") return task.summarize()
这就像福尔摩斯在规定时间内未能完全破案,但仍然需要提供一个基于已知线索的最佳推理。
这个过程既优雅又实用,通过不断的思考和行动,逐步接近问题的答案,就像侦探通过收集线索、推理和验证来解开谜团一样。
4 异步执行流程:实时追踪侦探的思路
有时候,我们不仅想知道侦探的最终结论,还想实时跟踪他的思考过程。这就是异步执行的作用:
override public func asyncRun(agent: Agent, request: AgentRequest): AsyncAgentResponse {LogUtils.info("React executor async runs ${agent.name}")let task = ReactTask(agent, request)// Create a thread to execute the react tasklet fut: Future<Iterator<String>> = spawn {for (_ in 0..this.loop) {if (let Some(asyncAnswer) <- task.asyncRunOnce()) {return asyncAnswer}}LogUtils.info("Exceed the max react loop")return task.asyncSummarize()}return AsyncAgentResponse(IteratorWrapper(task, fut), execInfo: task.execInfo)
}
这段代码实现了异步执行的功能。想象一下,这就像福尔摩斯边调查边通过对讲机实时向你汇报进展:
代码解析:
-
创建任务:与同步函数类似,创建ReactTask
let task = ReactTask(agent, request)
-
创建工作线程:使用
spawn
创建新线程let fut: Future<Iterator<String>> = spawn {// 线程内的执行代码 }
这就像福尔摩斯委派一名助手进行调查,同时自己可以处理其他事情。
-
线程内执行循环:在新线程中执行与同步函数类似的循环
for (_ in 0..this.loop) {if (let Some(asyncAnswer) <- task.asyncRunOnce()) {return asyncAnswer} }
这个循环与同步函数类似,但使用
asyncRunOnce()
函数,返回的是一个可以实时获取内容的迭代器。 -
封装返回:将Future包装成AsyncAgentResponse
return AsyncAgentResponse(IteratorWrapper(task, fut), execInfo: task.execInfo)
这就像给用户一个特殊的收音机,可以实时听到福尔摩斯的调查进展。
这种异步模式非常适合需要实时反馈的场景,用户可以看到Agent的思考过程,而不必等待最终结果。
5 深入IteratorWrapper:信息流的管理者
react_executor.cj中还定义了一个类IteratorWrapper
,它负责管理异步返回的信息流:
protected class IteratorWrapper <: Iterator<String> {protected IteratorWrapper(private let task: AgentTask,private let workerFut: Future<Iterator<String>>) { }/*** Concatenate all response content and log it finally*/private let buffer = StringBuilder()override public func next(): Option<String> {let asyncAnswer = workerFut.get()let data = asyncAnswer.next()match (data) {case Some(v) => buffer.append(v)case None =>LogUtils.info(this.task.agent.name, buffer.toString().withTag(ReactTag.ANSWER))}return data}
}
这个类看起来有点复杂,但它的作用很简单:收集和传递异步执行中产生的数据片段。它就像一位记者,实时记录并转播福尔摩斯的调查过程:
代码解析:
-
构造函数:接收任务和Future对象
protected IteratorWrapper(private let task: AgentTask,private let workerFut: Future<Iterator<String>> ) { }
这就像记者接收到一个特殊的通讯设备和采访对象。
-
缓冲区:使用StringBuilder收集所有数据
private let buffer = StringBuilder()
这就像记者随身携带的笔记本,记录所有听到的信息。
-
next函数:获取并传递下一块数据
override public func next(): Option<String> {let asyncAnswer = workerFut.get()let data = asyncAnswer.next()// ...处理数据...return data }
这就像记者不断询问:“然后呢?发生了什么?”,并将听到的内容转播给观众。
-
数据处理:根据有无数据采取不同行动
match (data) {case Some(v) => buffer.append(v)case None =>LogUtils.info(this.task.agent.name, buffer.toString().withTag(ReactTag.ANSWER)) }
当有新数据时,记录到笔记本;当没有更多数据时,整理笔记并提交完整报道。
这个类使得异步执行的结果能够以流式的方式传递给用户,同时还能保存完整的响应用于日志记录。这就像一场实况转播,既能让观众实时了解情况,又能在赛后提供完整回放。
6 总结
ReactExecutor就像一位善于动手实践的探索者,通过不断尝试和观察,逐步接近问题的答案。它特别适合那些需要工具使用、信息查询和多步推理的场景,能够让你的Agent表现得更像一个真实的人类助手。它平衡了简单直接的NaiveExecutor和复杂全面的PlanReactExecutor,为大多数实际应用场景提供了理想的解决方案。