Python-MCPServer开发-模拟版本
Python-MCPServer开发-模拟版本
原则,简化业务的逻辑部分,学习核心业务逻辑(所有的业务逻辑都是模拟出来的)
1)空工程初始化环境
mkdir my_project
cd my_project
python -m venv .venv
2)激活环境
source .venv/bin/activate
3)添加依赖
对应的依赖是在激活的环境中
pip install uv httpx mcp
4)创建Python天气服务
import logging# 假设 mcp 已经正确导入
try:from mcp import tool
except ImportError:# 如果 mcp 未找到,模拟一个 tool 装饰器def tool(func):return func# 配置日志打印级别
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)# 定义天气服务
class WeatherDataProvider:# 模拟的天气数据WEATHER_DATA = {"北京": {"condition": "晴","temperature": 25,"humidity": 40},"上海": {"condition": "多云","temperature": 27,"humidity": 60},"广州": {"condition": "雨","temperature": 30,"humidity": 80},"深圳": {"condition": "多云","temperature": 29,"humidity": 70},"杭州": {"condition": "晴","temperature": 26,"humidity": 50}}@toolasync def get_weather(self, city: str) -> str:"""获取指定城市的天气信息。参数:city (str): 城市名称返回:str: 天气信息描述"""logging.info(f"获取天气信息: {city}")if city in self.WEATHER_DATA:weather = self.WEATHER_DATA[city]return f"{city} : {weather['condition']} , {weather['temperature']} °C,湿度 {weather['humidity']} %"else:return f"抱歉,未找到 {city} 的天气信息"
5)调用Python天气服务
import asynciofrom weather_01_server import WeatherDataProviderclass WeatherInfoUser:def __init__(self):self.weather_provider = WeatherDataProvider()async def get_city_weather(self, city: str):result = await self.weather_provider.get_weather(city)return resultasync def main():user = WeatherInfoUser()city = "北京"weather_info = await user.get_city_weather(city)print(weather_info)if __name__ == "__main__":asyncio.run(main())
6)创建Python城市服务
import logging# 假设 mcp 已经正确导入
try:from mcp import tool
except ImportError:# 如果 mcp 未找到,模拟一个 tool 装饰器def tool(func):return func# 配置日志打印级别
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)# 定义城市服务
class CityDataProvider:# 模拟城市的天气数据CITY_WEATHER_DATA = {"北京": {"condition": "晴","temperature": 25,"humidity": 40},"上海": {"condition": "多云","temperature": 27,"humidity": 60},"广州": {"condition": "雨","temperature": 30,"humidity": 80},"深圳": {"condition": "多云","temperature": 29,"humidity": 70},"杭州": {"condition": "晴","temperature": 26,"humidity": 50}}@toolasync def get_weather(self, city: str) -> str:"""获取指定城市的天气信息。参数:city (str): 城市名称返回:str: 天气信息描述"""logging.info(f"获取天气信息: {city}")if city in self.CITY_WEATHER_DATA:weather = self.CITY_WEATHER_DATA[city]return f"{city} : {weather['condition']} , {weather['temperature']} °C,湿度 {weather['humidity']} %"else:return f"抱歉,未找到 {city} 的天气信息"@toolasync def get_city_list(self) -> list:"""获取所有的城市信息。返回:str: 所有的城市信息列表"""logging.info(f"获取所有的城市信息")city_list = []for city in self.CITY_WEATHER_DATA:city_list.append(city)return city_list@toolasync def get_city_detail(self, city: str) -> str:"""获取指定城市的信息。参数:city (str): 城市名称返回:str: 城市信息"""logging.info(f"获取指定城市的信息: {city}")if city in await self.get_city_list():return f"{city} : 一个风景秀丽的城市,你值得去玩一把"else:return f"抱歉,未找到 {city} 的城市信息"
7)调用Python城市服务
import asynciofrom city_03_server import CityDataProviderclass CityInfoTest:def __init__(self):self.city_provider = CityDataProvider()# 获取指定城市的天气信息async def get_city_weather(self, city: str):result = await self.city_provider.get_weather(city)return result# 获取所有城市列表async def get_city_list(self):result = await self.city_provider.get_city_list()return result# 获取指定城市的信息async def get_city_detail(self, city: str):result = await self.city_provider.get_city_detail(city)return resultasync def main():user = CityInfoTest()city = "北京"city_weather_info = await user.get_city_weather(city)print("\n", city_weather_info)city_list = await user.get_city_list()print("\n", city_list)city_info = await user.get_city_detail(city)print("\n", city_info)if __name__ == "__main__":asyncio.run(main())
8)MCP Inspector调试
1-安装MCP Inspector
pip install mcp[cli]
2-运行MCP Inspector服务
mcp dev city_06_mcp_server.py
3-访问MCP Inspector网页
- http://127.0.0.1:6274