MCP实践第一步--磕磕碰碰搭环境
由于deepseek-r1不支持function calling,所以我们采用了deepseek-v3进行实践,模型名称为deepseek-chat,在deepseek官网获取api-key。
一、参照MCP官网设置环境
创建项目目录
uv init mcp-client # 若没有uv,则先通过pip install uv进行安装
cd mcp-client
创建虚拟环境
uv venv # 这里可以指定python版本,版本不低于3.10
激活虚拟环境
在 Windows 上:
.venv\Scripts\activate
在 Unix 或 MacOS 上:
source .venv/bin/activate
安装所需的包
uv add mcp openai python-dotenv
修改main.py文件为mcp-client.py文件
二、环境变量配置
1. 创建一个 .env 文件
2. 配置环境变量
创建.env
文件存储必要参数:
API_KEY=sk-2470be72b342438eb14d240e0fb31xxx # 此处需替换
BASE_URL=https://api.deepseek.com
MODEL_NAME=deepseek-chat
三、MCP Server实现:打造你的工具箱
好的,我将按照代码的功能模块进行分块讲解。以下是分块后的代码解析:
1. 导入模块与配置日志
import asyncio
import logging
from mcp.server.fastmcp import FastMCP
from mcp.server import InitializationOptions, NotificationOptions
from mcp.server.stdio import stdio_server # 直接导入 stdio_server 函数# 定义服务器名称
MCP_SERVER_NAME = "math-stdio-server"# 配置日志
logging.basicConfig(level=logging.INFO,format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(MCP_SERVER_NAME)
功能说明:
- 导入模块:引入所需的异步库(
asyncio
)、日志库(logging
),以及MCP协议相关的类和函数。 - 定义服务器名称:
MCP_SERVER_NAME
用于标识服务器实例。 - 配置日志:
logging.basicConfig
设置日志级别为INFO
,并定义日志格式(时间、名称、级别、消息)。logger
是服务器的专用日志记录器,方便后续输出日志信息。
2. 初始化 FastMCP 实例
# 初始化 FastMCP 实例
mcp = FastMCP(MCP_SERVER_NAME)
功能说明:
- 创建 FastMCP 对象:通过
FastMCP
类初始化一个 MCP(Math Calculation Protocol)服务器实例,参数为服务器名称MCP_SERVER_NAME
。 - 后续操作:通过装饰器
@mcp.tool()
注册工具函数,这些函数将暴露给客户端调用。
3. 注册数学工具函数
# 注册加法工具
@mcp.tool()
def add(a: float, b: float) -> float:"""..."""return a + b# 注册减法工具
@mcp.tool()
def subtract(a: float, b: float) -> float:"""..."""return a - b# 注册乘法工具
@mcp.tool()
def multiply(a: float, b: float) -> float:"""..."""return a * b# 注册除法工具
@mcp.tool()
def divide(a: float, b: float) -> float:"""..."""if b == 0:raise ValueError("除数不能为零")return a / b
功能说明:
- 装饰器
@mcp.tool()
:将函数注册为MCP服务器的可用工具,客户端可通过协议调用这些函数。 - 每个工具函数:
- 参数与返回值:明确指定参数类型(
float
)和返回值类型(float
),便于客户端验证。 - 文档字符串:描述工具功能、参数和返回值,这些信息会被MCP协议自动生成为工具的元数据。
- 错误处理:例如
divide
函数检查除数是否为零,抛出ValueError
。
- 参数与返回值:明确指定参数类型(
4. 主函数:启动异步 STDIO 服务器
async def main():# 使用 stdio_server 建立 STDIO 通信async with stdio_server() as (read_stream, write_stream):# 构造初始化选项init_options = InitializationOptions(server_name=MCP_SERVER_NAME,server_version="1.0.0",capabilities=mcp._mcp_server.get_capabilities(notification_options=NotificationOptions(),experimental_capabilities={}))logger.info("通过 STDIO 模式启动 MCP Server ...")# 使用内部的 _mcp_server 运行服务await mcp._mcp_server.run(read_stream, write_stream, init_options)
功能说明:
- 异步上下文管理器
stdio_server()
:- 建立通过标准输入输出(STDIO)的通信通道,
read_stream
和write_stream
分别用于读取客户端请求和发送响应。
- 建立通过标准输入输出(STDIO)的通信通道,
- 初始化选项
init_options
:- 设置服务器名称、版本号。
- 通过
get_capabilities()
获取服务器支持的功能(如通知选项、实验性功能)。
- 运行服务器:
- 调用
mcp._mcp_server.run()
启动服务,传入通信流和初始化选项。 - 服务器会监听STDIO的输入,并根据客户端请求调用注册的工具函数。
- 调用
5. 程序入口
if __name__ == "__main__":asyncio.run(main())
功能说明:
- 入口条件:当脚本直接运行时(而非被导入),执行
asyncio.run(main())
。 - 启动异步事件循环:
asyncio.run()
启动异步主函数main()
,从而启动整个MCP服务器。
代码整体流程
- 初始化:配置日志、创建MCP实例、注册工具函数。
- 启动服务器:
- 通过STDIO建立通信通道。
- 发送初始化信息(名称、版本、功能)给客户端。
- 运行服务:监听STDIO输入,处理客户端请求(如调用
add
、divide
等工具函数),并返回结果。
关键点说明
-
STDIO 通信:
- 服务器通过标准输入输出(STDIO)与客户端通信,适合在进程间或通过管道传递数据。
stdio_server()
返回的流对象用于异步读写。
-
MCP 协议:
FastMCP
是MCP协议的实现,通过装饰器注册工具函数,客户端可通过协议调用这些函数。- 工具函数的元数据(如参数类型、文档)会被自动暴露给客户端。
-
错误处理:
- 工具函数中显式抛出异常(如
divide
中的ValueError
),客户端会收到错误信息。
- 工具函数中显式抛出异常(如
-
异步编程:
- 使用
async/await
实现异步IO,适合处理高并发或长时间运行的任务。
- 使用
四、MCP Client客户端实现
好的,我将基于您提供的代码重新进行分块讲解。以下是代码的模块化解析:
1. 导入模块与环境变量配置
import asyncio
import json
import os
import sys
from typing import List
from contextlib import AsyncExitStackfrom mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from openai import AsyncOpenAI
from dotenv import load_dotenvload_dotenv() # 从 .env 加载环境变量
功能说明:
- 导入模块:
asyncio
:异步编程核心。json
:处理JSON数据。os
/sys
:系统操作和命令行参数处理。typing.List
:类型注解。contextlib.AsyncExitStack
:管理异步资源。mcp
:MCP协议客户端工具。openai
:与OpenAI API交互。dotenv
:加载环境变量。
- 环境变量加载:通过
load_dotenv()
加载.env
中的配置(如API密钥)。
2. MCPClient 类定义
class MCPClient:def __init__(self, model_name: str, base_url: str, api_key: str, server_scripts: List[str]):"""初始化 MCP 客户端,支持多个 stdio 服务器。"""self.model_name = model_nameself.base_url = base_urlself.api_key = api_keyself.server_scripts = server_scriptsself.sessions = {} # server_id -> (session, session_ctx, stdio_ctx)self.tool_mapping = {} # 带前缀的工具名 -> (session, 原始工具名)self.client = AsyncOpenAI(base_url=base_url, api_key=api_key)self.exit_stack = AsyncExitStack()
功能说明:
- 初始化方法:
- 参数:模型名称、OpenAI API地址、API密钥、MCP服务器脚本路径列表。
- 属性:
sessions
:存储所有连接的MCP服务器会话(键为服务器ID,值为会话对象和上下文)。tool_mapping
:将工具名称映射到具体会话和原始工具名,解决命名冲突。client
:OpenAI异步客户端实例。exit_stack
:管理异步资源(如会话、STDIO连接)。
3. 初始化会话与资源管理
async def initialize_sessions(self):"""初始化所有 stdio 服务器连接,并收集工具映射。"""for i, script in enumerate(self.server_scripts):if not (os.path.exists(script) and script.endswith(".py")):print(f"脚本 {script} 不存在或不是 .py 文件,跳过。")continueserver_id = f"server{i}"params = StdioServerParameters(command="python", args=[script], env=None)try:stdio_ctx = stdio_client(params)stdio = await self.exit_stack.enter_async_context(stdio_ctx)session_ctx = ClientSession(*stdio)session = await self.exit_stack.enter_async_context(session_ctx)await session.initialize()self.sessions[server_id] = (session, session_ctx, stdio_ctx)response = await session.list_tools()for tool in response.tools:self.tool_mapping[f"{server_id}_{tool.name}"] = (session, tool.name)print(f"已连接到 {script},工具:{[tool.name for tool in response.tools]}")except Exception as e:print(f"连接 {script} 失败:{e}")
功能说明:
- 初始化步骤:
- 遍历服务器脚本:检查脚本是否存在且为
.py
文件。 - 创建服务器ID:如
server0
,server1
。 - 启动STDIO连接:通过
stdio_client
启动子进程运行脚本。 - 创建会话:通过
ClientSession
建立与MCP服务器的通信。 - 注册工具:调用
list_tools()
获取工具列表,并将工具名称前缀化。 - 异常处理:捕获连接错误并跳过无效脚本。
- 遍历服务器脚本:检查脚本是否存在且为
4. 资源清理方法
async def cleanup(self):"""释放所有资源。"""try:await self.exit_stack.aclose()print("所有连接资源已释放")except asyncio.CancelledError:passexcept Exception as e:print(f"清理资源时异常:{e}")
功能说明:
- 释放资源:
- 使用
exit_stack.aclose()
关闭所有通过enter_async_context
管理的资源(如会话、STDIO连接)。 - 捕获异常并输出错误信息。
- 使用
5. 工具收集与映射
async def _gather_available_tools(self):"""汇总所有服务器的工具列表。"""tools = []for server_id, (session, _, _) in self.sessions.items():response = await session.list_tools()for tool in response.tools:tools.append({"type": "function","function": {"name": f"{server_id}_{tool.name}","description": tool.description,"parameters": tool.inputSchema,}})return tools
功能说明:
- 工具汇总:
- 遍历所有连接的服务器会话,收集工具元数据。
- 将工具名称前缀化(如
server0_add
),避免命名冲突。 - 返回工具列表供OpenAI API使用。
6. 查询处理与工具调用
async def process_query(self, query: str) -> str:"""处理查询,调用 OpenAI API 和相应工具后返回结果。"""messages = [{"role": "user", "content": query}]available_tools = await self._gather_available_tools()try:response = await self.client.chat.completions.create(model=self.model_name, messages=messages, tools=available_tools)except Exception as e:return f"调用 OpenAI API 失败:{e}"final_text = [response.choices[0].message.content or ""]message = response.choices[0].messagewhile message.tool_calls:for call in message.tool_calls:tool_name = call.function.nameif tool_name not in self.tool_mapping:final_text.append(f"未找到工具:{tool_name}")continuesession, original_tool = self.tool_mapping[tool_name]tool_args = json.loads(call.function.arguments)try:result = await session.call_tool(original_tool, tool_args)final_text.append(f"[调用 {tool_name} 参数: {tool_args}]")final_text.append(f"工具结果: {result.content}")except Exception as e:final_text.append(f"调用 {tool_name} 出错:{e}")continuemessages += [{"role": "assistant", "tool_calls": [{"id": call.id,"type": "function","function": {"name": tool_name, "arguments": json.dumps(tool_args)}}]},{"role": "tool", "tool_call_id": call.id, "content": str(result.content)}]try:response = await self.client.chat.completions.create(model=self.model_name, messages=messages, tools=available_tools)except Exception as e:final_text.append(f"调用 OpenAI API 失败:{e}")breakmessage = response.choices[0].messageif message.content:final_text.append(message.content)return "\n".join(final_text)
功能说明:
- 处理用户查询:
- 调用OpenAI API:将用户问题发送给模型,并获取可能的工具调用指令。
- 循环处理工具调用:
- 解析工具调用指令(如
server0_add
)。 - 通过
tool_mapping
定位具体会话和工具,调用call_tool()
执行。 - 将工具结果反馈给模型,继续生成后续响应。
- 解析工具调用指令(如
- 错误处理:捕获工具调用失败或API错误,返回错误信息。
7. 交互式对话循环
async def chat_loop(self):"""交互式对话循环,捕获中断平滑退出。"""print("MCP 客户端已启动,输入问题,输入 'quit' 退出。")while True:try:query = input("问题: ").strip()if query.lower() == "quit":breakresult = await self.process_query(query)print("\n" + result)except KeyboardInterrupt:print("\n检测到中断信号,退出。")breakexcept Exception as e:print(f"发生错误:{e}")
功能说明:
- 交互式循环:
- 用户输入问题,调用
process_query()
处理。 - 支持输入
quit
退出循环。 - 捕获
KeyboardInterrupt
(如Ctrl+C)并优雅退出。
- 用户输入问题,调用
8. 主函数与命令行参数处理
async def main():model_name = os.getenv("MODEL_NAME", "deepseek-chat")base_url = os.getenv("BASE_URL", "https://api.deepseek.com/v1")api_key = os.getenv("API_KEY")if not api_key:print("未设置 API_KEY 环境变量")sys.exit(1)if len(sys.argv) < 2:print("使用方法: python mcp-client.py <path_to_server_script>") # 支持多个脚本,用逗号分隔sys.exit(1)# 从命令行参数解析服务器脚本路径server_scripts = sys.argv[1].split(',')if not isinstance(server_scripts, list):print("使用方法: python mcp-client.py server1.py,server2.py")sys.exit(1)client = MCPClient(model_name, base_url, api_key, server_scripts)try:await client.initialize_sessions()await client.chat_loop()except KeyboardInterrupt:print("\n收到中断信号")finally:await client.cleanup()
功能说明:
- 主函数流程:
- 环境变量读取:从
.env
获取模型名称、API地址、API密钥。 - 参数验证:
- 检查
API_KEY
是否存在。 - 检查命令行参数是否包含至少一个脚本路径(通过
sys.argv
)。 - 将参数按逗号分割为脚本列表(如
server1.py,server2.py
)。
- 检查
- 初始化客户端:传入配置参数和服务器脚本路径列表。
- 启动流程:
initialize_sessions()
连接所有MCP服务器。chat_loop()
启动交互式对话。
- 异常处理:捕获中断信号并清理资源。
- 环境变量读取:从
9. 程序入口
if __name__ == "__main__":try:asyncio.run(main())except KeyboardInterrupt:print("程序已终止。")
功能说明:
- 启动异步主函数:
- 使用
asyncio.run(main())
启动异步主函数。 - 捕获
KeyboardInterrupt
(如Ctrl+C)并优雅退出。
- 使用
关键差异与增强点
-
命令行参数支持:
- 新版本支持通过命令行传递多个服务器脚本路径(用逗号分隔),例如:
python mcp-client.py server1.py,server2.py
- 旧版本的脚本路径是硬编码的
["server.py"]
,而新版本改为从命令行读取。
- 新版本支持通过命令行传递多个服务器脚本路径(用逗号分隔),例如:
-
参数验证增强:
- 新增对命令行参数的检查,确保用户输入了正确的脚本路径。
- 如果参数格式错误,会打印使用说明并退出。
-
代码结构优化:
- 主函数中明确处理脚本路径的解析和类型检查,避免潜在的错误。
代码整体流程
- 启动客户端:加载环境变量,实例化
MCPClient
。 - 连接服务器:通过
initialize_sessions()
启动STDIO服务器并注册工具。 - 对话循环:
- 用户输入问题,调用OpenAI模型生成响应。
- 若需要工具调用,通过
tool_mapping
调用对应服务器的工具。 - 将工具结果反馈给模型,生成最终答案。
- 资源释放:退出时通过
cleanup()
释放所有连接。
使用示例
- 运行客户端:
输入问题后,客户端会调用OpenAI模型并执行MCP服务器中的工具(如加法、减法等)。python mcp-client.py server1.py,server2.py
两个源码文件已上传云盘,感兴趣的测试宝子们快去试试吧~