当前位置: 首页 > news >正文

MCP实践第一步--磕磕碰碰搭环境

由于deepseek-r1不支持function calling,所以我们采用了deepseek-v3进行实践,模型名称为deepseek-chat,在deepseek官网获取api-key。

一、参照MCP官网设置环境

创建项目目录

uv init mcp-client # 若没有uv,则先通过pip install uv进行安装
cd mcp-client

创建虚拟环境

uv venv # 这里可以指定python版本,版本不低于3.10

激活虚拟环境

在 Windows 上:

.venv\Scripts\activate

在 Unix 或 MacOS 上:

source .venv/bin/activate

安装所需的包

uv add mcp openai python-dotenv

在这里插入图片描述
在这里插入图片描述

修改main.py文件为mcp-client.py文件

在这里插入图片描述


二、环境变量配置

1. 创建一个 .env 文件

在这里插入图片描述

2. 配置环境变量

创建.env文件存储必要参数:

API_KEY=sk-2470be72b342438eb14d240e0fb31xxx  # 此处需替换
BASE_URL=https://api.deepseek.com
MODEL_NAME=deepseek-chat

三、MCP Server实现:打造你的工具箱

好的,我将按照代码的功能模块进行分块讲解。以下是分块后的代码解析:


1. 导入模块与配置日志

import asyncio
import logging
from mcp.server.fastmcp import FastMCP
from mcp.server import InitializationOptions, NotificationOptions
from mcp.server.stdio import stdio_server  # 直接导入 stdio_server 函数# 定义服务器名称
MCP_SERVER_NAME = "math-stdio-server"# 配置日志
logging.basicConfig(level=logging.INFO,format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(MCP_SERVER_NAME)
功能说明:
  • 导入模块:引入所需的异步库(asyncio)、日志库(logging),以及MCP协议相关的类和函数。
  • 定义服务器名称MCP_SERVER_NAME 用于标识服务器实例。
  • 配置日志
    • logging.basicConfig 设置日志级别为 INFO,并定义日志格式(时间、名称、级别、消息)。
    • logger 是服务器的专用日志记录器,方便后续输出日志信息。

2. 初始化 FastMCP 实例

# 初始化 FastMCP 实例
mcp = FastMCP(MCP_SERVER_NAME)
功能说明:
  • 创建 FastMCP 对象:通过 FastMCP 类初始化一个 MCP(Math Calculation Protocol)服务器实例,参数为服务器名称 MCP_SERVER_NAME
  • 后续操作:通过装饰器 @mcp.tool() 注册工具函数,这些函数将暴露给客户端调用。

3. 注册数学工具函数

# 注册加法工具
@mcp.tool()
def add(a: float, b: float) -> float:"""..."""return a + b# 注册减法工具
@mcp.tool()
def subtract(a: float, b: float) -> float:"""..."""return a - b# 注册乘法工具
@mcp.tool()
def multiply(a: float, b: float) -> float:"""..."""return a * b# 注册除法工具
@mcp.tool()
def divide(a: float, b: float) -> float:"""..."""if b == 0:raise ValueError("除数不能为零")return a / b
功能说明:
  • 装饰器 @mcp.tool():将函数注册为MCP服务器的可用工具,客户端可通过协议调用这些函数。
  • 每个工具函数
    • 参数与返回值:明确指定参数类型(float)和返回值类型(float),便于客户端验证。
    • 文档字符串:描述工具功能、参数和返回值,这些信息会被MCP协议自动生成为工具的元数据。
    • 错误处理:例如 divide 函数检查除数是否为零,抛出 ValueError

4. 主函数:启动异步 STDIO 服务器

async def main():# 使用 stdio_server 建立 STDIO 通信async with stdio_server() as (read_stream, write_stream):# 构造初始化选项init_options = InitializationOptions(server_name=MCP_SERVER_NAME,server_version="1.0.0",capabilities=mcp._mcp_server.get_capabilities(notification_options=NotificationOptions(),experimental_capabilities={}))logger.info("通过 STDIO 模式启动 MCP Server ...")# 使用内部的 _mcp_server 运行服务await mcp._mcp_server.run(read_stream, write_stream, init_options)
功能说明:
  • 异步上下文管理器 stdio_server()
    • 建立通过标准输入输出(STDIO)的通信通道,read_streamwrite_stream 分别用于读取客户端请求和发送响应。
  • 初始化选项 init_options
    • 设置服务器名称、版本号。
    • 通过 get_capabilities() 获取服务器支持的功能(如通知选项、实验性功能)。
  • 运行服务器
    • 调用 mcp._mcp_server.run() 启动服务,传入通信流和初始化选项。
    • 服务器会监听STDIO的输入,并根据客户端请求调用注册的工具函数。

5. 程序入口

if __name__ == "__main__":asyncio.run(main())
功能说明:
  • 入口条件:当脚本直接运行时(而非被导入),执行 asyncio.run(main())
  • 启动异步事件循环asyncio.run() 启动异步主函数 main(),从而启动整个MCP服务器。

代码整体流程

  1. 初始化:配置日志、创建MCP实例、注册工具函数。
  2. 启动服务器
    • 通过STDIO建立通信通道。
    • 发送初始化信息(名称、版本、功能)给客户端。
  3. 运行服务:监听STDIO输入,处理客户端请求(如调用 adddivide 等工具函数),并返回结果。

关键点说明

  1. STDIO 通信

    • 服务器通过标准输入输出(STDIO)与客户端通信,适合在进程间或通过管道传递数据。
    • stdio_server() 返回的流对象用于异步读写。
  2. MCP 协议

    • FastMCP 是MCP协议的实现,通过装饰器注册工具函数,客户端可通过协议调用这些函数。
    • 工具函数的元数据(如参数类型、文档)会被自动暴露给客户端。
  3. 错误处理

    • 工具函数中显式抛出异常(如 divide 中的 ValueError),客户端会收到错误信息。
  4. 异步编程

    • 使用 async/await 实现异步IO,适合处理高并发或长时间运行的任务。

四、MCP Client客户端实现

好的,我将基于您提供的代码重新进行分块讲解。以下是代码的模块化解析:


1. 导入模块与环境变量配置

import asyncio
import json
import os
import sys
from typing import List
from contextlib import AsyncExitStackfrom mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from openai import AsyncOpenAI
from dotenv import load_dotenvload_dotenv()  # 从 .env 加载环境变量
功能说明:
  • 导入模块
    • asyncio:异步编程核心。
    • json:处理JSON数据。
    • os/sys:系统操作和命令行参数处理。
    • typing.List:类型注解。
    • contextlib.AsyncExitStack:管理异步资源。
    • mcp:MCP协议客户端工具。
    • openai:与OpenAI API交互。
    • dotenv:加载环境变量。
  • 环境变量加载:通过 load_dotenv() 加载 .env 中的配置(如API密钥)。

2. MCPClient 类定义

class MCPClient:def __init__(self, model_name: str, base_url: str, api_key: str, server_scripts: List[str]):"""初始化 MCP 客户端,支持多个 stdio 服务器。"""self.model_name = model_nameself.base_url = base_urlself.api_key = api_keyself.server_scripts = server_scriptsself.sessions = {}         # server_id -> (session, session_ctx, stdio_ctx)self.tool_mapping = {}     # 带前缀的工具名 -> (session, 原始工具名)self.client = AsyncOpenAI(base_url=base_url, api_key=api_key)self.exit_stack = AsyncExitStack()
功能说明:
  • 初始化方法
    • 参数:模型名称、OpenAI API地址、API密钥、MCP服务器脚本路径列表。
    • 属性
      • sessions:存储所有连接的MCP服务器会话(键为服务器ID,值为会话对象和上下文)。
      • tool_mapping:将工具名称映射到具体会话和原始工具名,解决命名冲突。
      • client:OpenAI异步客户端实例。
      • exit_stack:管理异步资源(如会话、STDIO连接)。

3. 初始化会话与资源管理

    async def initialize_sessions(self):"""初始化所有 stdio 服务器连接,并收集工具映射。"""for i, script in enumerate(self.server_scripts):if not (os.path.exists(script) and script.endswith(".py")):print(f"脚本 {script} 不存在或不是 .py 文件,跳过。")continueserver_id = f"server{i}"params = StdioServerParameters(command="python", args=[script], env=None)try:stdio_ctx = stdio_client(params)stdio = await self.exit_stack.enter_async_context(stdio_ctx)session_ctx = ClientSession(*stdio)session = await self.exit_stack.enter_async_context(session_ctx)await session.initialize()self.sessions[server_id] = (session, session_ctx, stdio_ctx)response = await session.list_tools()for tool in response.tools:self.tool_mapping[f"{server_id}_{tool.name}"] = (session, tool.name)print(f"已连接到 {script},工具:{[tool.name for tool in response.tools]}")except Exception as e:print(f"连接 {script} 失败:{e}")
功能说明:
  • 初始化步骤
    1. 遍历服务器脚本:检查脚本是否存在且为 .py 文件。
    2. 创建服务器ID:如 server0, server1
    3. 启动STDIO连接:通过 stdio_client 启动子进程运行脚本。
    4. 创建会话:通过 ClientSession 建立与MCP服务器的通信。
    5. 注册工具:调用 list_tools() 获取工具列表,并将工具名称前缀化。
    6. 异常处理:捕获连接错误并跳过无效脚本。

4. 资源清理方法

    async def cleanup(self):"""释放所有资源。"""try:await self.exit_stack.aclose()print("所有连接资源已释放")except asyncio.CancelledError:passexcept Exception as e:print(f"清理资源时异常:{e}")
功能说明:
  • 释放资源
    • 使用 exit_stack.aclose() 关闭所有通过 enter_async_context 管理的资源(如会话、STDIO连接)。
    • 捕获异常并输出错误信息。

5. 工具收集与映射

    async def _gather_available_tools(self):"""汇总所有服务器的工具列表。"""tools = []for server_id, (session, _, _) in self.sessions.items():response = await session.list_tools()for tool in response.tools:tools.append({"type": "function","function": {"name": f"{server_id}_{tool.name}","description": tool.description,"parameters": tool.inputSchema,}})return tools
功能说明:
  • 工具汇总
    • 遍历所有连接的服务器会话,收集工具元数据。
    • 将工具名称前缀化(如 server0_add),避免命名冲突。
    • 返回工具列表供OpenAI API使用。

6. 查询处理与工具调用

    async def process_query(self, query: str) -> str:"""处理查询,调用 OpenAI API 和相应工具后返回结果。"""messages = [{"role": "user", "content": query}]available_tools = await self._gather_available_tools()try:response = await self.client.chat.completions.create(model=self.model_name, messages=messages, tools=available_tools)except Exception as e:return f"调用 OpenAI API 失败:{e}"final_text = [response.choices[0].message.content or ""]message = response.choices[0].messagewhile message.tool_calls:for call in message.tool_calls:tool_name = call.function.nameif tool_name not in self.tool_mapping:final_text.append(f"未找到工具:{tool_name}")continuesession, original_tool = self.tool_mapping[tool_name]tool_args = json.loads(call.function.arguments)try:result = await session.call_tool(original_tool, tool_args)final_text.append(f"[调用 {tool_name} 参数: {tool_args}]")final_text.append(f"工具结果: {result.content}")except Exception as e:final_text.append(f"调用 {tool_name} 出错:{e}")continuemessages += [{"role": "assistant", "tool_calls": [{"id": call.id,"type": "function","function": {"name": tool_name, "arguments": json.dumps(tool_args)}}]},{"role": "tool", "tool_call_id": call.id, "content": str(result.content)}]try:response = await self.client.chat.completions.create(model=self.model_name, messages=messages, tools=available_tools)except Exception as e:final_text.append(f"调用 OpenAI API 失败:{e}")breakmessage = response.choices[0].messageif message.content:final_text.append(message.content)return "\n".join(final_text)
功能说明:
  • 处理用户查询
    1. 调用OpenAI API:将用户问题发送给模型,并获取可能的工具调用指令。
    2. 循环处理工具调用
      • 解析工具调用指令(如 server0_add)。
      • 通过 tool_mapping 定位具体会话和工具,调用 call_tool() 执行。
      • 将工具结果反馈给模型,继续生成后续响应。
    3. 错误处理:捕获工具调用失败或API错误,返回错误信息。

7. 交互式对话循环

    async def chat_loop(self):"""交互式对话循环,捕获中断平滑退出。"""print("MCP 客户端已启动,输入问题,输入 'quit' 退出。")while True:try:query = input("问题: ").strip()if query.lower() == "quit":breakresult = await self.process_query(query)print("\n" + result)except KeyboardInterrupt:print("\n检测到中断信号,退出。")breakexcept Exception as e:print(f"发生错误:{e}")
功能说明:
  • 交互式循环
    • 用户输入问题,调用 process_query() 处理。
    • 支持输入 quit 退出循环。
    • 捕获 KeyboardInterrupt(如Ctrl+C)并优雅退出。

8. 主函数与命令行参数处理

async def main():model_name = os.getenv("MODEL_NAME", "deepseek-chat")base_url = os.getenv("BASE_URL", "https://api.deepseek.com/v1")api_key = os.getenv("API_KEY")if not api_key:print("未设置 API_KEY 环境变量")sys.exit(1)if len(sys.argv) < 2:print("使用方法: python mcp-client.py <path_to_server_script>")  # 支持多个脚本,用逗号分隔sys.exit(1)# 从命令行参数解析服务器脚本路径server_scripts = sys.argv[1].split(',')if not isinstance(server_scripts, list):print("使用方法: python mcp-client.py server1.py,server2.py")sys.exit(1)client = MCPClient(model_name, base_url, api_key, server_scripts)try:await client.initialize_sessions()await client.chat_loop()except KeyboardInterrupt:print("\n收到中断信号")finally:await client.cleanup()
功能说明:
  • 主函数流程
    1. 环境变量读取:从 .env 获取模型名称、API地址、API密钥。
    2. 参数验证
      • 检查 API_KEY 是否存在。
      • 检查命令行参数是否包含至少一个脚本路径(通过 sys.argv)。
      • 将参数按逗号分割为脚本列表(如 server1.py,server2.py)。
    3. 初始化客户端:传入配置参数和服务器脚本路径列表。
    4. 启动流程
      • initialize_sessions() 连接所有MCP服务器。
      • chat_loop() 启动交互式对话。
    5. 异常处理:捕获中断信号并清理资源。

9. 程序入口

if __name__ == "__main__":try:asyncio.run(main())except KeyboardInterrupt:print("程序已终止。")
功能说明:
  • 启动异步主函数
    • 使用 asyncio.run(main()) 启动异步主函数。
    • 捕获 KeyboardInterrupt(如Ctrl+C)并优雅退出。

关键差异与增强点

  1. 命令行参数支持

    • 新版本支持通过命令行传递多个服务器脚本路径(用逗号分隔),例如:
      python mcp-client.py server1.py,server2.py
      
    • 旧版本的脚本路径是硬编码的 ["server.py"],而新版本改为从命令行读取。
  2. 参数验证增强

    • 新增对命令行参数的检查,确保用户输入了正确的脚本路径。
    • 如果参数格式错误,会打印使用说明并退出。
  3. 代码结构优化

    • 主函数中明确处理脚本路径的解析和类型检查,避免潜在的错误。

代码整体流程

  1. 启动客户端:加载环境变量,实例化 MCPClient
  2. 连接服务器:通过 initialize_sessions() 启动STDIO服务器并注册工具。
  3. 对话循环
    • 用户输入问题,调用OpenAI模型生成响应。
    • 若需要工具调用,通过 tool_mapping 调用对应服务器的工具。
    • 将工具结果反馈给模型,生成最终答案。
  4. 资源释放:退出时通过 cleanup() 释放所有连接。

使用示例

  1. 运行客户端
    python mcp-client.py server1.py,server2.py
    
    输入问题后,客户端会调用OpenAI模型并执行MCP服务器中的工具(如加法、减法等)。

在这里插入图片描述
在这里插入图片描述
两个源码文件已上传云盘,感兴趣的测试宝子们快去试试吧~

相关文章:

  • Precision Machine Dynamics/Mechatronics Design - 6
  • 20242817李臻《Linux⾼级编程实践》第8周
  • 精准评估机器学习模型:从混淆矩阵到核心指标的深度解析+面试常见问题及解析(看这篇就够了)
  • Emacs入门篇2:安装evil插件以便vi老用户可以使用VI快捷键来快速使用Emacs
  • 笔记:react中 父组件怎么获取子组件中的属性或方法
  • 进程和线程(1)
  • RIP动态路由,实现两台PC互通三个路由器,两台电脑
  • 8086微机原理与接口技术复习(1)存储器(2)接口
  • Java 多态
  • 怎么安装python3.5-以及怎么在这个环境下安装包
  • 【解决】Vue + Vite + TS 配置路径别名成功仍爆红
  • Linux常见压缩格式详解
  • Python Cookbook-6.7 有命名子项的元组
  • 量化研究---小果全球大类低相关性动量趋势增强轮动策略实盘设置
  • RHCSA Linux系统 用户和组的管理
  • Kubernetes相关的名词解释Service(15)
  • 海事局发布《船舶智能监控系统技术指南(1.0)》,解读智驱力产品为何成为最佳选择!
  • Linux系统管理与编程13:基于CentOS7.x的LAMP环境部署
  • 高校如何通过打造数字人生态实训室,实现教学改革
  • Java 排序梳理 sort
  • 受贿超8.22亿,新疆维吾尔自治区党委原副书记李鹏新一审被判死缓
  • 视频丨普京称积极对待任何和平倡议
  • 致敬劳动者!今年拟表彰2426名全国劳动模范和先进工作者
  • 观察|首个半马落幕:人形机器人场景应用才刚站上起点
  • 针对“二选一”,美团再次辟谣
  • 美方因涉港问题对中国官员滥施非法单边制裁,外交部:强烈谴责,对等反制