当前位置: 首页 > news >正文

【Dify(v1.2) 核心源码深入解析】Agent 模块

重磅推荐专栏:
《大模型AIGC》
《课程大纲》
《知识星球》

本专栏致力于探索和讨论当今最前沿的技术趋势和应用领域,包括但不限于ChatGPT和Stable Diffusion等。我们将深入研究大型模型的开发和应用,以及与之相关的人工智能生成内容(AIGC)技术。通过深入的技术解析和实践经验分享,旨在帮助读者更好地理解和应用这些领域的最新进展

引言

Dify 是一个强大的 AI 应用开发平台,它提供了丰富的功能来帮助开发者构建智能代理(Agent)。在 Dify 中,Agent 模块是核心组件之一,它负责处理用户请求、调用工具、生成响应以及管理整个交互流程。本文将深入解析 Dify 的 Agent 模块,从架构设计到代码实现,帮助你全面理解这个模块的工作原理。

1. Agent 模块概述

Agent 模块是 Dify 中负责处理用户请求的核心组件。它的主要功能包括:

  • 接收用户输入并解析请求。
  • 根据请求内容调用合适的工具或模型。
  • 生成响应并返回给用户。
  • 管理整个交互流程,包括工具调用、消息处理和状态管理。

1.1 核心功能

功能模块描述
策略(Strategy)定义 Agent 的行为逻辑,如 Chain-of-Thought 和 Function-Calling。
工具(Tools)提供各种功能,如数据检索、文件处理等。
消息处理处理用户输入和生成响应。
状态管理管理 Agent 的状态和上下文。

1.2 设计目标

  • 灵活性:支持多种策略和工具,适应不同的应用场景。
  • 可扩展性:方便开发者扩展新的功能和工具。
  • 高效性:优化性能,确保快速响应用户请求。

2. Agent 模块架构

Agent 模块的架构设计如下图所示:

Chain-of-Thought
Function-Calling
用户请求
Agent 策略
选择策略
CoT 处理器
FC 处理器
调用工具
生成响应
返回用户

Agent 模块主要由以下几个部分组成:

  1. 策略(Strategy):定义 Agent 的行为逻辑。
  2. 工具(Tools):提供各种功能。
  3. 消息处理(Message Processing):处理用户输入和生成响应。
  4. 状态管理(State Management):管理 Agent 的状态和上下文。

3. 策略(Strategy)

策略是 Agent 模块的核心,它决定了 Agent 如何处理用户请求。Dify 提供了两种主要的策略:

  • Chain-of-Thought (CoT):通过逐步推理解决问题。
  • Function-Calling (FC):直接调用工具或函数解决问题。

3.1 Chain-of-Thought (CoT)

CoT 策略通过逐步推理解决问题,它的工作流程如下:

用户 Agent 工具 模型 发送请求 生成初步思考 调用工具 返回结果 更新思考 loop [逐步推理] 返回最终响应 用户 Agent 工具 模型

3.1.1 关键代码解析

class CotAgentRunner(BaseAgentRunner, ABC):def run(self, message: Message, query: str, inputs: Mapping[str, str]) -> Generator:"""运行 CoT 策略"""# 初始化 Agent 状态self._init_react_state(query)# 循环处理用户请求while function_call_state and iteration_step <= max_iteration_steps:# 组织提示消息prompt_messages = self._organize_prompt_messages()# 调用模型生成响应chunks = model_instance.invoke_llm(prompt_messages=prompt_messages,model_parameters=app_generate_entity.model_conf.parameters,tools=[],stop=app_generate_entity.model_conf.stop,stream=True,user=self.user_id,callbacks=[],)# 处理模型输出for chunk in chunks:# 解析模型输出scratchpad = AgentScratchpadUnit(agent_response="",thought="",action_str="",observation="",action=None,)# 检查是否需要调用工具if scratchpad.action and not scratchpad.is_final():# 调用工具tool_invoke_response, tool_invoke_meta = self._handle_invoke_action(action=scratchpad.action,tool_instances=tool_instances,message_file_ids=message_file_ids,trace_manager=trace_manager,)# 更新 Agent 状态self.save_agent_thought(agent_thought=agent_thought,tool_name=scratchpad.action.action_name,tool_input={scratchpad.action.action_name: scratchpad.action.action_input},thought=scratchpad.thought or "",observation={scratchpad.action.action_name: tool_invoke_response},tool_invoke_meta={scratchpad.action.action_name: tool_invoke_meta.to_dict()},answer=scratchpad.agent_response,messages_ids=message_file_ids,llm_usage=usage_dict["usage"],)

3.1.2 CoT 的优势

优势描述
逐步推理通过多步思考解决问题,适合复杂任务。
灵活性高可以根据需要调整思考步骤和工具调用。
透明性好每一步思考和工具调用都有记录,便于调试和优化。

3.2 Function-Calling (FC)

FC 策略通过直接调用工具或函数解决问题,它的工作流程如下:

用户 Agent 工具 模型 发送请求 生成工具调用 调用工具 返回结果 返回响应 用户 Agent 工具 模型

3.2.1 关键代码解析

class FunctionCallAgentRunner(BaseAgentRunner):def run(self, message: Message, query: str, **kwargs: Any) -> Generator[LLMResultChunk, None, None]:"""运行 Function-Calling 策略"""# 初始化工具和提示消息tool_instances, prompt_messages_tools = self._init_prompt_tools()# 循环处理用户请求while function_call_state and iteration_step <= max_iteration_steps:# 组织提示消息prompt_messages = self._organize_prompt_messages()# 调用模型生成响应chunks = model_instance.invoke_llm(prompt_messages=prompt_messages,model_parameters=app_generate_entity.model_conf.parameters,tools=prompt_messages_tools,stop=app_generate_entity.model_conf.stop,stream=self.stream_tool_call,user=self.user_id,callbacks=[],)# 处理模型输出for chunk in chunks:# 检查是否需要调用工具if self.check_tool_calls(chunk):# 提取工具调用tool_calls = self.extract_tool_calls(chunk)# 调用工具for tool_call_id, tool_call_name, tool_call_args in tool_calls:tool_instance = tool_instances.get(tool_call_name)if tool_instance:tool_invoke_response, message_files, tool_invoke_meta = ToolEngine.agent_invoke(tool=tool_instance,tool_parameters=tool_call_args,user_id=self.user_id,tenant_id=self.tenant_id,message=self.message,invoke_from=self.application_generate_entity.invoke_from,agent_tool_callback=self.agent_callback,trace_manager=trace_manager,app_id=self.application_generate_entity.app_config.app_id,message_id=self.message.id,conversation_id=self.conversation.id,)# 更新 Agent 状态self.save_agent_thought(agent_thought=agent_thought,tool_name=tool_call_name,tool_input=tool_call_args,thought=response,observation=tool_invoke_response,tool_invoke_meta=tool_invoke_meta.to_dict(),answer=response,messages_ids=message_file_ids,llm_usage=current_llm_usage,)

3.2.2 FC 的优势

优势描述
高效性直接调用工具,减少中间步骤,适合简单任务。
响应速度快减少推理步骤,提高响应速度。
简单易用适合直接调用工具的场景,易于实现。

4. 工具(Tools)

工具是 Agent 模块的重要组成部分,它提供了各种功能来帮助 Agent 完成任务。Dify 提供了多种工具,包括数据检索、文件处理、模型调用等。

4.1 工具的分类

工具分类描述
数据检索工具用于从数据集中检索信息。
文件处理工具用于处理用户上传的文件。
模型调用工具用于调用其他 AI 模型。
自定义工具用户可以自定义工具来扩展功能。

4.2 工具的实现

工具的实现基于 Tool 类,它定义了工具的基本行为。以下是一个简单的工具实现示例:

class ExampleTool(Tool):def __init__(self, name: str, description: str):super().__init__(name, description)def run(self, parameters: dict) -> str:"""运行工具"""# 处理参数input_text = parameters.get("input_text", "")# 执行工具逻辑result = self.process_input(input_text)return resultdef process_input(self, input_text: str) -> str:"""处理输入文本"""# 示例逻辑:将输入文本转换为大写return input_text.upper()

4.3 工具的调用

工具的调用流程如下:

Agent 工具管理器 工具 请求工具 初始化工具 调用工具 返回结果 Agent 工具管理器 工具

5. 消息处理

消息处理是 Agent 模块的核心功能之一,它负责处理用户输入和生成响应。消息处理的主要步骤包括:

  • 解析用户输入:将用户输入转换为内部表示。
  • 生成提示消息:根据用户输入和上下文生成提示消息。
  • 调用模型:将提示消息传递给模型,生成响应。
  • 处理模型输出:解析模型输出,生成最终响应。

5.1 消息处理流程

用户输入
解析输入
生成提示消息
调用模型
处理输出
生成响应
返回用户

5.2 消息处理代码示例

class BaseAgentRunner(AppRunner):def _organize_prompt_messages(self) -> list[PromptMessage]:"""组织提示消息"""# 初始化系统消息system_message = self._organize_system_prompt()# 组织用户查询query_messages = self._organize_user_query(self._query, [])# 组织历史消息historic_messages = self._organize_historic_prompt_messages([system_message, *query_messages])# 组织当前 Agent 思考过程assistant_messages = []if self._agent_scratchpad:assistant_message = AssistantPromptMessage(content="")for unit in self._agent_scratchpad:if unit.is_final():assistant_message.content += f"Final Answer: {unit.agent_response}"else:assistant_message.content += f"Thought: {unit.thought}\n\n"if unit.action_str:assistant_message.content += f"Action: {unit.action_str}\n\n"if unit.observation:assistant_message.content += f"Observation: {unit.observation}\n\n"assistant_messages = [assistant_message]# 合并所有消息messages = [system_message,*historic_messages,*query_messages,*assistant_messages,UserPromptMessage(content="continue"),]return messages

6. 状态管理

状态管理是 Agent 模块的重要功能,它负责记录和管理 Agent 的状态和上下文。状态管理的主要任务包括:

  • 记录 Agent 思考过程:保存 Agent 的每一步思考和工具调用结果。
  • 管理上下文:维护用户会话的上下文信息。
  • 优化性能:通过状态管理减少重复计算和资源浪费。

6.1 状态管理流程

用户请求
记录初始状态
处理请求
更新状态
保存状态
返回响应

6.2 状态管理代码示例

class BaseAgentRunner(AppRunner):def save_agent_thought(self,agent_thought: MessageAgentThought,tool_name: str | None,tool_input: Union[str, dict, None],thought: str | None,observation: Union[str, dict, None],tool_invoke_meta: Union[str, dict, None],answer: str | None,messages_ids: list[str],llm_usage: LLMUsage | None = None,):"""保存 Agent 思考过程"""# 更新 Agent 思考记录updated_agent_thought = (db.session.query(MessageAgentThought).filter(MessageAgentThought.id == agent_thought.id).first())if not updated_agent_thought:raise ValueError("agent thought not found")# 更新思考内容if thought:updated_agent_thought.thought += thought# 更新工具信息if tool_name:updated_agent_thought.tool = tool_nameif tool_input:if isinstance(tool_input, dict):try:tool_input = json.dumps(tool_input, ensure_ascii=False)except Exception:tool_input = json.dumps(tool_input)updated_agent_thought.tool_input = tool_input# 更新观察结果if observation:if isinstance(observation, dict):try:observation = json.dumps(observation, ensure_ascii=False)except Exception:observation = json.dumps(observation)updated_agent_thought.observation = observation# 更新回答内容if answer:updated_agent_thought.answer = answer# 更新文件信息if messages_ids is not None and len(messages_ids) > 0:updated_agent_thought.message_files = json.dumps(messages_ids)# 更新 LLM 使用情况if llm_usage:updated_agent_thought.message_token = llm_usage.prompt_tokensupdated_agent_thought.message_price_unit = llm_usage.prompt_price_unitupdated_agent_thought.message_unit_price = llm_usage.prompt_unit_priceupdated_agent_thought.answer_token = llm_usage.completion_tokensupdated_agent_thought.answer_price_unit = llm_usage.completion_price_unitupdated_agent_thought.answer_unit_price = llm_usage.completion_unit_priceupdated_agent_thought.tokens = llm_usage.total_tokensupdated_agent_thought.total_price = llm_usage.total_price# 提交更改db.session.commit()db.session.close()

7. 实际应用示例

为了更好地理解 Agent 模块的工作原理,我们通过一个简单的例子来展示如何使用 Dify 的 Agent 模块。

7.1 示例场景

假设我们有一个问答应用,用户可以上传文件并提问。Agent 需要解析用户问题,调用文件处理工具提取信息,并生成回答。

7.2 示例代码

from core.agent.cot_agent_runner import CotAgentRunner
from core.model_runtime.entities import Message, PromptMessage, UserPromptMessage, AssistantPromptMessageclass ExampleAgentRunner(CotAgentRunner):def __init__(self, app_config, model_config, conversation, message):super().__init__(tenant_id="example_tenant",application_generate_entity=app_config,conversation=conversation,app_config=app_config,model_config=model_config,config=app_config.agent,queue_manager=None,message=message,user_id="example_user",model_instance=None,memory=None,prompt_messages=None,)def run(self, query: str, inputs: dict):# 初始化 Agent 状态self._init_react_state(query)# 组织提示消息prompt_messages = self._organize_prompt_messages()# 调用模型生成响应response = self.model_instance.invoke_llm(prompt_messages=prompt_messages,model_parameters=self.app_config.model_conf.parameters,tools=[],stop=self.app_config.model_conf.stop,stream=True,user=self.user_id,callbacks=[],)# 处理模型输出for chunk in response:# 解析模型输出scratchpad = AgentScratchpadUnit(agent_response="",thought="",action_str="",observation="",action=None,)# 检查是否需要调用工具if scratchpad.action and not scratchpad.is_final():# 调用工具tool_invoke_response, tool_invoke_meta = self._handle_invoke_action(action=scratchpad.action,tool_instances=self._tool_instances,message_file_ids=[],trace_manager=None,)# 更新 Agent 状态self.save_agent_thought(agent_thought=self.create_agent_thought(message_id=self.message.id,message="",tool_name="",tool_input="",messages_ids=[],),tool_name=scratchpad.action.action_name,tool_input={scratchpad.action.action_name: scratchpad.action.action_input},thought=scratchpad.thought or "",observation={scratchpad.action.action_name: tool_invoke_response},tool_invoke_meta={scratchpad.action.action_name: tool_invoke_meta.to_dict()},answer=scratchpad.agent_response,messages_ids=[],llm_usage=None,)return scratchpad.agent_response

8. 总结

通过本文的详细解析,我们深入了解了 Dify 的 Agent 模块的架构设计和实现细节。Agent 模块通过灵活的策略、丰富的工具、高效的消息处理和状态管理,为开发者提供了一个强大的 AI 应用开发平台。希望本文能帮助你更好地理解和使用 Dify 的 Agent 模块。

Dify 的 Agent 模块仍在不断发展,未来可能会引入以下新功能:

  • 更智能的策略:支持更多复杂的推理和决策逻辑。
  • 更丰富的工具:扩展更多实用工具,如自然语言处理和图像识别。
  • 更好的性能优化:进一步提升响应速度和资源利用率。

相关文章:

  • 深入讲解 CSS 选择器权重及实战
  • 【刷题2025】单指针双指针+滑动窗口+二分法三分法+区间问题
  • 如何一键检查网页里的失效链接和废弃域名?
  • 【加密算法】SM2密钥生成与转换详解:从原理到代码实现
  • ecovadis分为哪些类别,要进行ecovadis认证有什么要求
  • 榕壹云场馆预定系统:基于ThinkPHP+MySQL+UniApp打造的全能运动馆智慧运营解决方案
  • 解锁Grok-3的极致潜能:高阶应用与创新实践
  • 多模态大模型文字识别 vs OCR识别模型
  • 【Python进阶】断言(assert)的十大核心应用场景解析
  • RelativeLayout(相对布局)
  • Mac电脑交叉编译iphone设备可以运行的redsocks, openssl, libsevent
  • Rust + WebAssembly 性能剖析指南
  • 辛格迪客户案例 | 厦门三维丝实施SAP系统
  • js ES6箭头函数的作用
  • 0415-批量删除操作
  • ERR_PNPM_DLX_NO_BIN No binaries found in tailwindcss
  • ClickHouse 数据库中的 “超时”
  • 游戏引擎学习第227天
  • Java微服务线程隔离技术对比:线程池隔离 vs 信号量隔离
  • union all 关联查询
  • 生态环境部:我国核电规模全球第一,总体安全可控
  • 巴基斯坦航天员选拔工作正在进行,1人将以载荷专家身份参加联合飞行
  • 对话|四代中国缘的麦肯锡前高管:在混乱中制定规则,而不是复制旧秩序
  • 杭州萧山区两宗地块收金约44.73亿元,最高溢价率74.4%
  • 经济日报:锚定重点领域和关键环节,上海浦东谋划高水平对外开放
  • 文甦任四川乐山市委副书记,曾赴外交部挂职副司长