【Dify(v1.2) 核心源码深入解析】Agent 模块
重磅推荐专栏:
《大模型AIGC》
《课程大纲》
《知识星球》
本专栏致力于探索和讨论当今最前沿的技术趋势和应用领域,包括但不限于ChatGPT和Stable Diffusion等。我们将深入研究大型模型的开发和应用,以及与之相关的人工智能生成内容(AIGC)技术。通过深入的技术解析和实践经验分享,旨在帮助读者更好地理解和应用这些领域的最新进展
引言
Dify 是一个强大的 AI 应用开发平台,它提供了丰富的功能来帮助开发者构建智能代理(Agent)。在 Dify 中,Agent 模块是核心组件之一,它负责处理用户请求、调用工具、生成响应以及管理整个交互流程。本文将深入解析 Dify 的 Agent 模块,从架构设计到代码实现,帮助你全面理解这个模块的工作原理。
1. Agent 模块概述
Agent 模块是 Dify 中负责处理用户请求的核心组件。它的主要功能包括:
- 接收用户输入并解析请求。
- 根据请求内容调用合适的工具或模型。
- 生成响应并返回给用户。
- 管理整个交互流程,包括工具调用、消息处理和状态管理。
1.1 核心功能
功能模块 | 描述 |
---|---|
策略(Strategy) | 定义 Agent 的行为逻辑,如 Chain-of-Thought 和 Function-Calling。 |
工具(Tools) | 提供各种功能,如数据检索、文件处理等。 |
消息处理 | 处理用户输入和生成响应。 |
状态管理 | 管理 Agent 的状态和上下文。 |
1.2 设计目标
- 灵活性:支持多种策略和工具,适应不同的应用场景。
- 可扩展性:方便开发者扩展新的功能和工具。
- 高效性:优化性能,确保快速响应用户请求。
2. Agent 模块架构
Agent 模块的架构设计如下图所示:
Agent 模块主要由以下几个部分组成:
- 策略(Strategy):定义 Agent 的行为逻辑。
- 工具(Tools):提供各种功能。
- 消息处理(Message Processing):处理用户输入和生成响应。
- 状态管理(State Management):管理 Agent 的状态和上下文。
3. 策略(Strategy)
策略是 Agent 模块的核心,它决定了 Agent 如何处理用户请求。Dify 提供了两种主要的策略:
- Chain-of-Thought (CoT):通过逐步推理解决问题。
- Function-Calling (FC):直接调用工具或函数解决问题。
3.1 Chain-of-Thought (CoT)
CoT 策略通过逐步推理解决问题,它的工作流程如下:
3.1.1 关键代码解析
class CotAgentRunner(BaseAgentRunner, ABC):def run(self, message: Message, query: str, inputs: Mapping[str, str]) -> Generator:"""运行 CoT 策略"""# 初始化 Agent 状态self._init_react_state(query)# 循环处理用户请求while function_call_state and iteration_step <= max_iteration_steps:# 组织提示消息prompt_messages = self._organize_prompt_messages()# 调用模型生成响应chunks = model_instance.invoke_llm(prompt_messages=prompt_messages,model_parameters=app_generate_entity.model_conf.parameters,tools=[],stop=app_generate_entity.model_conf.stop,stream=True,user=self.user_id,callbacks=[],)# 处理模型输出for chunk in chunks:# 解析模型输出scratchpad = AgentScratchpadUnit(agent_response="",thought="",action_str="",observation="",action=None,)# 检查是否需要调用工具if scratchpad.action and not scratchpad.is_final():# 调用工具tool_invoke_response, tool_invoke_meta = self._handle_invoke_action(action=scratchpad.action,tool_instances=tool_instances,message_file_ids=message_file_ids,trace_manager=trace_manager,)# 更新 Agent 状态self.save_agent_thought(agent_thought=agent_thought,tool_name=scratchpad.action.action_name,tool_input={scratchpad.action.action_name: scratchpad.action.action_input},thought=scratchpad.thought or "",observation={scratchpad.action.action_name: tool_invoke_response},tool_invoke_meta={scratchpad.action.action_name: tool_invoke_meta.to_dict()},answer=scratchpad.agent_response,messages_ids=message_file_ids,llm_usage=usage_dict["usage"],)
3.1.2 CoT 的优势
优势 | 描述 |
---|---|
逐步推理 | 通过多步思考解决问题,适合复杂任务。 |
灵活性高 | 可以根据需要调整思考步骤和工具调用。 |
透明性好 | 每一步思考和工具调用都有记录,便于调试和优化。 |
3.2 Function-Calling (FC)
FC 策略通过直接调用工具或函数解决问题,它的工作流程如下:
3.2.1 关键代码解析
class FunctionCallAgentRunner(BaseAgentRunner):def run(self, message: Message, query: str, **kwargs: Any) -> Generator[LLMResultChunk, None, None]:"""运行 Function-Calling 策略"""# 初始化工具和提示消息tool_instances, prompt_messages_tools = self._init_prompt_tools()# 循环处理用户请求while function_call_state and iteration_step <= max_iteration_steps:# 组织提示消息prompt_messages = self._organize_prompt_messages()# 调用模型生成响应chunks = model_instance.invoke_llm(prompt_messages=prompt_messages,model_parameters=app_generate_entity.model_conf.parameters,tools=prompt_messages_tools,stop=app_generate_entity.model_conf.stop,stream=self.stream_tool_call,user=self.user_id,callbacks=[],)# 处理模型输出for chunk in chunks:# 检查是否需要调用工具if self.check_tool_calls(chunk):# 提取工具调用tool_calls = self.extract_tool_calls(chunk)# 调用工具for tool_call_id, tool_call_name, tool_call_args in tool_calls:tool_instance = tool_instances.get(tool_call_name)if tool_instance:tool_invoke_response, message_files, tool_invoke_meta = ToolEngine.agent_invoke(tool=tool_instance,tool_parameters=tool_call_args,user_id=self.user_id,tenant_id=self.tenant_id,message=self.message,invoke_from=self.application_generate_entity.invoke_from,agent_tool_callback=self.agent_callback,trace_manager=trace_manager,app_id=self.application_generate_entity.app_config.app_id,message_id=self.message.id,conversation_id=self.conversation.id,)# 更新 Agent 状态self.save_agent_thought(agent_thought=agent_thought,tool_name=tool_call_name,tool_input=tool_call_args,thought=response,observation=tool_invoke_response,tool_invoke_meta=tool_invoke_meta.to_dict(),answer=response,messages_ids=message_file_ids,llm_usage=current_llm_usage,)
3.2.2 FC 的优势
优势 | 描述 |
---|---|
高效性 | 直接调用工具,减少中间步骤,适合简单任务。 |
响应速度快 | 减少推理步骤,提高响应速度。 |
简单易用 | 适合直接调用工具的场景,易于实现。 |
4. 工具(Tools)
工具是 Agent 模块的重要组成部分,它提供了各种功能来帮助 Agent 完成任务。Dify 提供了多种工具,包括数据检索、文件处理、模型调用等。
4.1 工具的分类
工具分类 | 描述 |
---|---|
数据检索工具 | 用于从数据集中检索信息。 |
文件处理工具 | 用于处理用户上传的文件。 |
模型调用工具 | 用于调用其他 AI 模型。 |
自定义工具 | 用户可以自定义工具来扩展功能。 |
4.2 工具的实现
工具的实现基于 Tool
类,它定义了工具的基本行为。以下是一个简单的工具实现示例:
class ExampleTool(Tool):def __init__(self, name: str, description: str):super().__init__(name, description)def run(self, parameters: dict) -> str:"""运行工具"""# 处理参数input_text = parameters.get("input_text", "")# 执行工具逻辑result = self.process_input(input_text)return resultdef process_input(self, input_text: str) -> str:"""处理输入文本"""# 示例逻辑:将输入文本转换为大写return input_text.upper()
4.3 工具的调用
工具的调用流程如下:
5. 消息处理
消息处理是 Agent 模块的核心功能之一,它负责处理用户输入和生成响应。消息处理的主要步骤包括:
- 解析用户输入:将用户输入转换为内部表示。
- 生成提示消息:根据用户输入和上下文生成提示消息。
- 调用模型:将提示消息传递给模型,生成响应。
- 处理模型输出:解析模型输出,生成最终响应。
5.1 消息处理流程
5.2 消息处理代码示例
class BaseAgentRunner(AppRunner):def _organize_prompt_messages(self) -> list[PromptMessage]:"""组织提示消息"""# 初始化系统消息system_message = self._organize_system_prompt()# 组织用户查询query_messages = self._organize_user_query(self._query, [])# 组织历史消息historic_messages = self._organize_historic_prompt_messages([system_message, *query_messages])# 组织当前 Agent 思考过程assistant_messages = []if self._agent_scratchpad:assistant_message = AssistantPromptMessage(content="")for unit in self._agent_scratchpad:if unit.is_final():assistant_message.content += f"Final Answer: {unit.agent_response}"else:assistant_message.content += f"Thought: {unit.thought}\n\n"if unit.action_str:assistant_message.content += f"Action: {unit.action_str}\n\n"if unit.observation:assistant_message.content += f"Observation: {unit.observation}\n\n"assistant_messages = [assistant_message]# 合并所有消息messages = [system_message,*historic_messages,*query_messages,*assistant_messages,UserPromptMessage(content="continue"),]return messages
6. 状态管理
状态管理是 Agent 模块的重要功能,它负责记录和管理 Agent 的状态和上下文。状态管理的主要任务包括:
- 记录 Agent 思考过程:保存 Agent 的每一步思考和工具调用结果。
- 管理上下文:维护用户会话的上下文信息。
- 优化性能:通过状态管理减少重复计算和资源浪费。
6.1 状态管理流程
6.2 状态管理代码示例
class BaseAgentRunner(AppRunner):def save_agent_thought(self,agent_thought: MessageAgentThought,tool_name: str | None,tool_input: Union[str, dict, None],thought: str | None,observation: Union[str, dict, None],tool_invoke_meta: Union[str, dict, None],answer: str | None,messages_ids: list[str],llm_usage: LLMUsage | None = None,):"""保存 Agent 思考过程"""# 更新 Agent 思考记录updated_agent_thought = (db.session.query(MessageAgentThought).filter(MessageAgentThought.id == agent_thought.id).first())if not updated_agent_thought:raise ValueError("agent thought not found")# 更新思考内容if thought:updated_agent_thought.thought += thought# 更新工具信息if tool_name:updated_agent_thought.tool = tool_nameif tool_input:if isinstance(tool_input, dict):try:tool_input = json.dumps(tool_input, ensure_ascii=False)except Exception:tool_input = json.dumps(tool_input)updated_agent_thought.tool_input = tool_input# 更新观察结果if observation:if isinstance(observation, dict):try:observation = json.dumps(observation, ensure_ascii=False)except Exception:observation = json.dumps(observation)updated_agent_thought.observation = observation# 更新回答内容if answer:updated_agent_thought.answer = answer# 更新文件信息if messages_ids is not None and len(messages_ids) > 0:updated_agent_thought.message_files = json.dumps(messages_ids)# 更新 LLM 使用情况if llm_usage:updated_agent_thought.message_token = llm_usage.prompt_tokensupdated_agent_thought.message_price_unit = llm_usage.prompt_price_unitupdated_agent_thought.message_unit_price = llm_usage.prompt_unit_priceupdated_agent_thought.answer_token = llm_usage.completion_tokensupdated_agent_thought.answer_price_unit = llm_usage.completion_price_unitupdated_agent_thought.answer_unit_price = llm_usage.completion_unit_priceupdated_agent_thought.tokens = llm_usage.total_tokensupdated_agent_thought.total_price = llm_usage.total_price# 提交更改db.session.commit()db.session.close()
7. 实际应用示例
为了更好地理解 Agent 模块的工作原理,我们通过一个简单的例子来展示如何使用 Dify 的 Agent 模块。
7.1 示例场景
假设我们有一个问答应用,用户可以上传文件并提问。Agent 需要解析用户问题,调用文件处理工具提取信息,并生成回答。
7.2 示例代码
from core.agent.cot_agent_runner import CotAgentRunner
from core.model_runtime.entities import Message, PromptMessage, UserPromptMessage, AssistantPromptMessageclass ExampleAgentRunner(CotAgentRunner):def __init__(self, app_config, model_config, conversation, message):super().__init__(tenant_id="example_tenant",application_generate_entity=app_config,conversation=conversation,app_config=app_config,model_config=model_config,config=app_config.agent,queue_manager=None,message=message,user_id="example_user",model_instance=None,memory=None,prompt_messages=None,)def run(self, query: str, inputs: dict):# 初始化 Agent 状态self._init_react_state(query)# 组织提示消息prompt_messages = self._organize_prompt_messages()# 调用模型生成响应response = self.model_instance.invoke_llm(prompt_messages=prompt_messages,model_parameters=self.app_config.model_conf.parameters,tools=[],stop=self.app_config.model_conf.stop,stream=True,user=self.user_id,callbacks=[],)# 处理模型输出for chunk in response:# 解析模型输出scratchpad = AgentScratchpadUnit(agent_response="",thought="",action_str="",observation="",action=None,)# 检查是否需要调用工具if scratchpad.action and not scratchpad.is_final():# 调用工具tool_invoke_response, tool_invoke_meta = self._handle_invoke_action(action=scratchpad.action,tool_instances=self._tool_instances,message_file_ids=[],trace_manager=None,)# 更新 Agent 状态self.save_agent_thought(agent_thought=self.create_agent_thought(message_id=self.message.id,message="",tool_name="",tool_input="",messages_ids=[],),tool_name=scratchpad.action.action_name,tool_input={scratchpad.action.action_name: scratchpad.action.action_input},thought=scratchpad.thought or "",observation={scratchpad.action.action_name: tool_invoke_response},tool_invoke_meta={scratchpad.action.action_name: tool_invoke_meta.to_dict()},answer=scratchpad.agent_response,messages_ids=[],llm_usage=None,)return scratchpad.agent_response
8. 总结
通过本文的详细解析,我们深入了解了 Dify 的 Agent 模块的架构设计和实现细节。Agent 模块通过灵活的策略、丰富的工具、高效的消息处理和状态管理,为开发者提供了一个强大的 AI 应用开发平台。希望本文能帮助你更好地理解和使用 Dify 的 Agent 模块。
Dify 的 Agent 模块仍在不断发展,未来可能会引入以下新功能:
- 更智能的策略:支持更多复杂的推理和决策逻辑。
- 更丰富的工具:扩展更多实用工具,如自然语言处理和图像识别。
- 更好的性能优化:进一步提升响应速度和资源利用率。