【MCP Node.js SDK 全栈进阶指南】高级篇(1):MCP多服务器协作架构
随着业务规模的不断扩大和系统复杂度的提升,单一服务器架构往往无法满足高并发、高可用性和弹性扩展的需求。在MCP生态系统中,多服务器协作架构成为构建大规模应用的必然选择。本文将深入探讨MCP TypeScript-SDK在多服务器环境下的部署、协作和管理,以及如何构建高可用、高性能、易扩展的分布式MCP应用。
目录
-
多服务器架构基础
1.1 MCP多服务器架构概述
1.2 分布式系统的挑战与解决方案
1.3 MCP服务器分类与职责划分
1.4 多服务器架构的常见模式 -
服务发现与路由
2.1 服务注册与发现机制
2.2 基于DNS的服务发现
2.3 动态路由与负载均衡
2.4 服务健康检查与自动恢复 -
状态同步与一致性
3.1 分布式状态管理策略
3.2 事件驱动的状态同步
3.3 实现最终一致性
3.4 数据分区与冲突解决 -
负载均衡与容错设计
4.1 负载均衡策略与实现
4.2 故障检测与自动恢复
4.3 优雅降级与熔断机制
4.4 构建高可用MCP集群
1. 多服务器架构基础
在深入探讨MCP多服务器协作之前,我们需要先了解多服务器架构的核心概念、挑战以及MCP SDK提供的解决方案。
1.1 MCP多服务器架构概述
MCP(Model Context Protocol)作为一种为大型语言模型(LLM)交互设计的协议,其服务架构需要适应复杂多变的业务场景和负载模式。多服务器MCP架构是指将MCP服务分布在多个物理或虚拟服务器上,通过协作方式提供统一的服务能力。
// MCP服务器实例类型
interface McpServerInstance {id: string; // 服务器唯一标识host: string; // 主机地址port: number; // 端口号role: 'master' | 'worker' | 'specialized'; // 服务器角色status: 'online' | 'offline' | 'degraded'; // 服务器状态capacity: { // 服务器容量信息maxConcurrentRequests: number;currentLoad: number;};features: string[]; // 支持的特性startTime: Date; // 启动时间
}// 多服务器集群配置
interface McpClusterConfig {serviceName: string; // 服务名称serviceVersion: string;// 服务版本discoveryMethod: 'static' | 'dns' | 'registry'; // 服务发现方式heartbeatInterval: number; // 心跳间隔syncStrategy: 'event-based' | 'polling' | 'hybrid'; // 同步策略loadBalanceStrategy: 'round-robin' | 'least-connections' | 'consistent-hash'; // 负载均衡策略failoverPolicy: { // 故障转移策略retryAttempts: number;retryDelay: number;circuitBreakerThreshold: number;};
}
多服务器MCP架构具有以下几个核心特点:
- 水平扩展性:通过增加服务器数量来线性提升系统整体处理能力
- 高可用性:即使部分服务器故障,系统整体仍可持续提供服务
- 负载分散:将请求负载分散到多个服务器,避免单点压力过大
- 资源隔离:可按业务功能或客户需求隔离资源,提高安全性和稳定性
- 灵活部署:支持混合云、多区域、边缘计算等多样化部署模式
1.2 分布式系统的挑战与解决方案
构建分布式MCP服务面临一系列挑战,MCP TypeScript-SDK针对这些挑战提供了相应的解决方案:
挑战 | 表现 | MCP SDK解决方案 |
---|---|---|
网络不可靠 | 网络延迟、分区、丢包 | 重试机制、异步通信、断线重连 |
一致性问题 | 数据不一致、冲突 | 事件驱动同步、版本控制、冲突解决策略 |
服务协调 | 服务发现、路由 | 服务注册表、服务健康检查、动态路由 |
故障处理 | 节点故障、服务降级 | 故障检测、自动恢复、熔断机制 |
性能瓶颈 | 请求拥塞、资源竞争 | 负载均衡、请求限流、资源隔离 |
// MCP服务器故障处理配置
interface McpFailoverConfig {detection: {method: 'heartbeat' | 'ping' | 'health-endpoint';interval: number; // 检测间隔timeout: number; // 超时时间thresholds: {warning: number; // 警告阈值critical: number; // 临界阈值}};recovery: {strategy: 'restart' | 'replace' | 'redirect';cooldownPeriod: number; // 冷却时间maxAttempts: number; // 最大尝试次数};notification: {channels: string[]; // 通知渠道templates: Record<string, string>; // 通知模板}
}
1.3 MCP服务器分类与职责划分
在多服务器架构中,不同的MCP服务器可以承担不同的角色和职责,实现功能分离和专业化:
1.3.1 按角色分类
-
主服务器(Master):
- 管理集群状态和配置
- 协调跨服务器的资源分配
- 监控集群健康状态
- 通常部署较少数量,配置较高
-
工作服务器(Worker):
- 处理客户端的实际请求
- 执行MCP资源访问和工具调用
- 可大规模部署,构成系统处理能力的主体
- 通常是无状态的,便于横向扩展
-
专用服务器(Specialized):
- 专注于特定功能或业务场景
- 例如:AI推理服务器、数据处理服务器等
- 可根据需求定制化配置
- 适合资源密集型或安全敏感型业务
// MCP服务器角色定义和职责配置
import { McpServer } from '@modelcontextprotocol/sdk';// 主服务器配置
const masterConfig = {name: "mcp-master-server",description: "MCP集群主服务器",version: "1.0.0",cluster: {role: "master",workers: ["worker-1", "worker-2", "worker-3"],electionStrategy: "fixed", // 固定主服务器stateSync: {method: "push",interval: 5000}}
};// 工作服务器配置
const workerConfig = {name: "mcp-worker-server",description: "MCP集群工作服务器",version: "1.0.0",cluster: {role: "worker",masterId: "master-1",maxConcurrentRequests: 1000,resourceCacheSize: 500,reportInterval: 2000}
};// 专用服务器配置
const specializedConfig = {name: "mcp-specialized-server",description: "MCP图像处理专用服务器",version: "1.0.0",cluster: {role: "specialized",serviceType: "image-processing",supportedOperations: ["resize", "filter", "recognize"],resourceRequirements: {gpu: true,minMemory: "16G",cpuCores: 8}}
};
1.3.2 按功能分类
-
API网关服务器:
- 请求路由和负载均衡
- 认证和授权处理
- 请求限流和缓存
- 请求/响应转换
-
资源服务器:
- 管理MCP资源模板
- 处理资源访问请求
- 资源缓存和优化
- 资源版本控制
-
工具服务器:
- 托管和执行MCP工具
- 工具依赖管理
- 工具执行环境隔离
- 工具性能监控
-
状态同步服务器:
- 维护全局状态
- 协调跨服务器状态同步
- 处理分布式事务
- 解决数据冲突
// 按功能划分的MCP服务器实现示例
import { McpServer } from '@modelcontextprotocol/sdk';
import express from 'express';
import { createProxyMiddleware } from 'http-proxy-middleware';// API网关服务器
const apiGatewayApp = express();
const apiGatewayServer = new McpServer({name: "mcp-api-gateway",description: "MCP API网关服务器",version: "1.0.0"
});// 路由配置
const routeConfig = {'/api/resources': { target: 'http://resource-server:3001', pathRewrite: {'^/api': ''} },'/api/tools': { target: 'http://tool-server:3002', pathRewrite: {'^/api': ''} }
};// 设置API代理
Object.entries(routeConfig).forEach(([path, config]) => {apiGatewayApp.use(path, createProxyMiddleware(config));
});// 资源服务器
const resourceServer = new McpServer({name: "mcp-resource-server",description: "MCP资源服务器",version: "1.0.0"
});// 注册资源模板
resourceServer.registerResourceTemplate({name: "users",description: "用户资源",schema: userSchema
});// 工具服务器
const toolServer = new McpServer({name: "mcp-tool-server",description: "MCP工具服务器",version: "1.0.0"
});// 注册工具
toolServer.registerTool({name: "calculator",description: "计算工具",parameters: calculatorSchema,execute: async (params) => {// 计算逻辑return { result: calculateExpression(params.expression) };}
});
1.4 多服务器架构的常见模式
在实际应用中,MCP多服务器架构可以采用多种模式,根据业务需求和资源条件灵活选择:
1.4.1 主从模式(Master-Slave)
最常见的分布式架构模式,一个主服务器协调多个从服务器的工作。
┌────────────┐ ┌────────────┐
│ │ │ │
│ Master │◄────►│ Slave 1 │
│ │ │ │
└────────────┘ └────────────┘▲│▼
┌────────────┐ ┌────────────┐
│ │ │ │
│ Slave 2 │◄────►│ Slave 3 │
│ │ │ │
└────────────┘ └────────────┘
// 主从模式实现示例
import { McpServer, EventEmitter } from '@modelcontextprotocol/sdk';
import { createClient } from 'redis';// 创建Redis客户端用于服务器间通信
const pubClient = createClient({ url: process.env.REDIS_URL });
const subClient = pubClient.duplicate();// 主服务器实现
class MasterMcpServer extends McpServer {private slaves: Map<string, SlaveInfo> = new Map();constructor(config) {super(config);// 订阅从服务器状态更新subClient.subscribe('slave-status', (message) => {const slaveStatus = JSON.parse(message);this.updateSlaveStatus(slaveStatus);});// 定期检查从服务器健康状态setInterval(() => this.checkSlavesHealth(), 10000);}// 注册从服务器registerSlave(slaveId: string, info: SlaveInfo) {this.slaves.set(slaveId, { ...info, lastHeartbeat: Date.now() });this.emit('slave-registered', { slaveId, info });}// 更新从服务器状态updateSlaveStatus(status: SlaveStatus) {const slave = this.slaves.get(status.slaveId);if (slave) {this.slaves.set(status.slaveId, { ...slave, ...status, lastHeartbeat: Date.now() });}}// 检查从服务器健康状态checkSlavesHealth() {const now = Date.now();for (const [slaveId, info] of this.slaves.entries()) {if (now - info.lastHeartbeat > 30000) { // 30秒无心跳this.emit('slave-offline', { slaveId });this.slaves.delete(slaveId);}}}
}// 从服务器实现
class SlaveMcpServer extends McpServer {private masterId: string;constructor(config) {super(config);this.masterId = config.masterId;// 定期向主服务器发送心跳setInterval(() => this.sendHeartbeat(), 5000);// 订阅主服务器指令subClient.subscribe(`slave-command:${this.id}`, (message) => {this.handleMasterCommand(JSON.parse(message));});}// 发送心跳sendHeartbeat() {const status = {slaveId: this.id,load: this.getCurrentLoad(),memory: process.memoryUsage(),status: 'online'};pubClient.publish('slave-status', JSON.stringify(status));}// 处理主服务器指令handleMasterCommand(command) {switch (command.type) {case 'restart':this.restart();break;case 'update-config':this.updateConfig(command.config);break;// 其他指令处理...}}
}
1.4.2 去中心化模式(Peer-to-Peer)
所有服务器地位平等,通过协作提供服务,无单点故障风险。
┌────────────┐ ┌────────────┐
│ │◄────►│ │
│ Node 1 │ │ Node 2 │
│ │◄────►│ │
└────────────┘ └────────────┘▲ ▲ ▲│ └───────┐ ┌───┘│ ▼ ▼
┌────────────┐ ┌────────────┐
│ │◄────►│ │
│ Node 3 │ │ Node 4 │
│ │◄────►│ │
└────────────┘ └────────────┘
// 去中心化模式实现示例
import { McpServer } from '@modelcontextprotocol/sdk';
import * as IPFS from 'ipfs-core';
import { v4 as uuidv4 } from 'uuid';// P2P MCP服务器
class P2PMcpServer extends McpServer {private ipfs;private peerId: string;private peers: Map<string, PeerInfo> = new Map();private eventTopic = 'mcp-p2p-events';constructor(config) {super(config);this.peerId = uuidv4();this.initializeP2P();}// 初始化P2P网络async initializeP2P() {// 创建IPFS节点this.ipfs = await IPFS.create();// 获取本节点IDconst nodeInfo = await this.ipfs.id();console.log(`P2P节点ID: ${nodeInfo.id}`);// 订阅事件主题await this.ipfs.pubsub.subscribe(this.eventTopic, (msg) => {this.handleP2PEvent(msg);});// 定期发布状态setInterval(() => this.broadcastStatus(), 5000);// 发布上线事件this.broadcastEvent({type: 'node-online',peerId: this.peerId,timestamp: Date.now(),info: {resources: this.getResourceNames(),tools: this.getToolNames(),capacity: this.getCapacity()}});}// 处理P2P事件handleP2PEvent(message) {try {const event = JSON.parse(message.data.toString());// 忽略自己发出的消息if (event.peerId === this.peerId) return;switch (event.type) {case 'node-online':this.addPeer(event.peerId, event.info);// 向新节点发送欢迎消息this.sendDirectEvent(event.peerId, {type: 'welcome',peerId: this.peerId,timestamp: Date.now()});break;case 'node-offline':this.removePeer(event.peerId);break;case 'status-update':this.updatePeerStatus(event.peerId, event.status);break;case 'resource-request':this.handleResourceRequest(event);break;case 'tool-execution':this.handleToolExecution(event);break;}} catch (error) {console.error('处理P2P事件错误:', error);}}// 广播事件到所有节点async broadcastEvent(event) {await this.ipfs.pubsub.publish(this.eventTopic,Buffer.from(JSON.stringify(event)));}// 发送事件到特定节点async sendDirectEvent(targetPeerId, event) {const peer = this.peers.get(targetPeerId);if (!peer || !peer.directTopic) return;await this.ipfs.pubsub.publish(peer.directTopic,Buffer.from(JSON.stringify(event)));}// 定期广播状态broadcastStatus() {this.broadcastEvent({type: 'status-update',peerId: this.peerId,timestamp: Date.now(),status: {load: this.getCurrentLoad(),memory: process.memoryUsage(),uptime: process.uptime()}});}// 添加对等节点addPeer(peerId: string, info: any) {this.peers.set(peerId, {...info,directTopic: `mcp-p2p-direct:${peerId}`,lastSeen: Date.now()});// 订阅此节点的直接通信主题this.ipfs.pubsub.subscribe(`mcp-p2p-direct:${this.peerId}`, (msg) => {this.handleDirectMessage(msg);});}// 移除对等节点removePeer(peerId: string) {this.peers.delete(peerId);}// 更新对等节点状态updatePeerStatus(peerId: string, status: any) {const peer = this.peers.get(peerId);if (peer) {this.peers.set(peerId, { ...peer, ...status, lastSeen: Date.now() });}}
}
1.4.3 微服务模式(Microservices)
将MCP功能分解为多个独立服务,每个服务专注于特定功能域。
┌─────────────────────────────────────────────┐
│ API Gateway │
└─────────────┬─────────────┬─────────────────┘│ │ ┌─────────▼───┐ ┌─────▼───────┐ ┌─────────────┐│ Resource │ │ Tool │ │ Identity ││ Service │ │ Service │ │ Service │└─────────────┘ └─────────────┘ └─────────────┘│ │ │┌─────────▼───┐ ┌─────▼───────┐ ┌─────▼───────┐│ Storage │ │ Execution │ │ Security ││ Service │ │ Service │ │ Service │└─────────────┘ └─────────────┘ └─────────────┘
// 微服务模式实现示例
import { McpServer } from '@modelcontextprotocol/sdk';
import express from 'express';
import axios from 'axios';
import { MongoClient } from 'mongodb';
import { v4 as uuidv4 } from 'uuid';
import amqp from 'amqplib';// 服务发现客户端
class ServiceDiscovery {private serviceRegistry: Map<string, ServiceInfo[]> = new Map();private consulUrl: string;constructor(consulUrl: string) {this.consulUrl = consulUrl;this.refreshRegistry();setInterval(() => this.refreshRegistry(), 30000);}// 刷新服务注册表async refreshRegistry() {try {const response = await axios.get(`${this.consulUrl}/v1/catalog/services`);for (const [serviceName, tags] of Object.entries(response.data)) {if (tags.includes('mcp')) {const instances = await this.getServiceInstances(serviceName);this.serviceRegistry.set(serviceName, instances);}}} catch (error) {console.error('刷新服务注册表失败:', error);}}// 获取服务实例列表async getServiceInstances(serviceName: string): Promise<ServiceInfo[]> {try {const response = await axios.get(`${this.consulUrl}/v1/health/service/${serviceName}?passing=true`);return response.data.map(entry => ({id: entry.Service.ID,address: entry.Service.Address,port: entry.Service.Port,tags: entry.Service.Tags,meta: entry.Service.Meta}));} catch (error) {console.error(`获取服务${serviceName}实例失败:`, error);return [];}}// 获取服务实例getService(serviceName: string, tags: string[] = []): ServiceInfo | null {const instances = this.serviceRegistry.get(serviceName) || [];// 筛选满足标签要求的实例const eligibleInstances = instances.filter(instance => tags.every(tag => instance.tags.includes(tag)));if (eligibleInstances.length === 0) return null;// 简单负载均衡:随机选择一个实例const randomIndex = Math.floor(Math.random() * eligibleInstances.length);return eligibleInstances[randomIndex];}
}// MCP资源服务
class ResourceMcpService {private server: McpServer;private app = express();private mongoClient: MongoClient;private serviceId: string;private discovery: ServiceDiscovery;private messageBroker: amqp.Connection;private channel: amqp.Channel;constructor(config) {this.server = new McpServer({name: "mcp-resource-service",description: "MCP资源微服务",version: "1.0.0"});this.serviceId = uuidv4();this.discovery = new ServiceDiscovery(config.consulUrl);// 初始化Express中间件this.initializeExpress();// 连接MongoDBthis.connectMongo(config.mongoUrl);// 连接消息代理this.connectMessageBroker(config.rabbitMqUrl);// 注册服务this.registerService(config.consulUrl);}// 初始化ExpressinitializeExpress() {this.app.use(express.json());// 资源API端点this.app.get('/resources/:name', this.getResource.bind(this));this.app.post('/resources/:name', this.createResource.bind(this));this.app.put('/resources/:name/:id', this.updateResource.bind(this));this.app.delete('/resources/:name/:id', this.deleteResource.bind(this));// 健康检查端点this.app.get('/health', (req, res) => {res.status(200).json({ status: 'healthy', serviceId: this.serviceId });});// 启动服务器const port = process.env.PORT || 3000;this.app.listen(port, () => {console.log(`资源服务运行在端口 ${port}`);});}// 连接MongoDBasync connectMongo(mongoUrl: string) {try {this.mongoClient = new MongoClient(mongoUrl);await this.mongoClient.connect();console.log('已连接到MongoDB');} catch (error) {console.error('MongoDB连接失败:', error);process.exit(1);}}// 连接消息代理async connectMessageBroker(rabbitMqUrl: string) {try {this.messageBroker = await amqp.connect(rabbitMqUrl);this.channel = await this.messageBroker.createChannel();// 声明交换机和队列await this.channel.assertExchange('mcp-events', 'topic', { durable: true });const { queue } = await this.channel.assertQueue(`resource-service-${this.serviceId}`,{ exclusive: true });// 绑定感兴趣的主题await this.channel.bindQueue(queue, 'mcp-events', 'resource.*');// 消费消息this.channel.consume(queue, (msg) => {if (msg) {this.handleMessage(msg);this.channel.ack(msg);}});console.log('已连接到消息代理');} catch (error) {console.error('消息代理连接失败:', error);}}// 处理消息handleMessage(msg: amqp.ConsumeMessage) {try {const content = JSON.parse(msg.content.toString());const routingKey = msg.fields.routingKey;console.log(`收到消息: ${routingKey}`, content);// 根据路由键处理不同类型的消息switch (routingKey) {case 'resource.created':this.handleResourceCreated(content);break;case 'resource.updated':this.handleResourceUpdated(content);break;case 'resource.deleted':this.handleResourceDeleted(content);break;}} catch (error) {console.error('处理消息错误:', error);}}// 发布事件publishEvent(routingKey: string, data: any) {this.channel.publish('mcp-events',routingKey,Buffer.from(JSON.stringify({...data,serviceId: this.serviceId,timestamp: Date.now()})));}// API处理器async getResource(req, res) {try {const { name } = req.params;const filter = req.query.filter ? JSON.parse(req.query.filter) : {};const db = this.mongoClient.db('mcp-resources');const collection = db.collection(name);const resources = await collection.find(filter).toArray();res.json({ success: true, data: resources });} catch (error) {res.status(500).json({success: false,error: error.message});}}async createResource(req, res) {try {const { name } = req.params;const resourceData = req.body;const db = this.mongoClient.db('mcp-resources');const collection = db.collection(name);const result = await collection.insertOne({...resourceData,_id: uuidv4(),createdAt: new Date(),updatedAt: new Date()});// 发布资源创建事件this.publishEvent('resource.created', {resourceName: name,resourceId: result.insertedId,data: resourceData});res.status(201).json({success: true,data: { id: result.insertedId }});} catch (error) {res.status(500).json({success: false,error: error.message});}}// 其他API处理器...// 注册服务到Consulasync registerService(consulUrl: string) {try {await axios.put(`${consulUrl}/v1/agent/service/register`, {ID: this.serviceId,Name: 'mcp-resource-service',Tags: ['mcp', 'resource'],Address: process.env.SERVICE_HOST || 'localhost',Port: parseInt(process.env.PORT || '3000'),Check: {HTTP: `http://${process.env.SERVICE_HOST || 'localhost'}:${process.env.PORT || '3000'}/health`,Interval: '15s',Timeout: '5s'}});console.log(`服务已注册,ID: ${this.serviceId}`);// 注册服务注销钩子process.on('SIGINT', () => this.deregisterService(consulUrl));process.on('SIGTERM', () => this.deregisterService(consulUrl));} catch (error) {console.error('服务注册失败:', error);}}// 注销服务async deregisterService(consulUrl: string) {try {await axios.put(`${consulUrl}/v1/agent/service/deregister/${this.serviceId}`);console.log('服务已注销');process.exit(0);} catch (error) {console.error('服务注销失败:', error);process.exit(1);}}
}
多服务器架构为MCP应用提供了更高的可用性、可伸缩性和灵活性,但同时也带来了系统复杂度的提升。在后续章节中,我们将深入探讨如何应对这些挑战,构建稳健的多服务器MCP系统。
2. 服务发现与路由
在多服务器MCP架构中,服务发现和路由机制是确保系统高效运行的关键组件。它们使客户端能够动态找到可用的服务实例,并将请求路由到最合适的服务器上。
2.1 服务注册与发现机制
服务发现的核心是让服务消费者能够在不需要硬编码服务提供者地址的情况下,动态找到并调用所需的服务。在MCP系统中,服务发现机制通常涉及以下几个关键组件:
2.1.1 服务注册表
服务注册表是服务发现的核心数据存储,记录了所有可用服务实例的关键信息:
// 服务注册表接口
interface ServiceRegistry {// 注册服务实例register(instance: ServiceInstance): Promise<void>;// 注销服务实例deregister(instanceId: string): Promise<void>;// 获取指定服务的所有实例getInstances(serviceName: string): Promise<ServiceInstance[]>;// 查询符合特定条件的服务实例findServices(query: ServiceQuery): Promise<ServiceInstance[]>;// 监听服务变更watchService(serviceName: string, callback: (instances: ServiceInstance[]) => void): void;
}// 服务实例数据结构
interface ServiceInstance {id: string; // 实例唯一标识serviceName: string; // 服务名称host: string; // 主机地址port: number; // 端口号status: 'UP' | 'DOWN' | 'STARTING' | 'OUT_OF_SERVICE'; // 实例状态metadata: Record<string, string>; // 元数据,如版本、环境等healthCheckUrl?: string; // 健康检查URLregistrationTime: number; // 注册时间lastUpdateTime: number; // 最后更新时间
}
2.1.2 基于MCP实现的分布式服务注册表
以下是一个使用MCP服务器实现的分布式服务注册表示例:
import { McpServer } from '@modelcontextprotocol/sdk';
import { createClient } from 'redis';
import express from 'express';// 基于Redis的MCP服务注册表实现
class McpServiceRegistry implements ServiceRegistry {private redisClient;private readonly KEY_PREFIX = 'mcp:registry:';private readonly TTL = 60; // 服务记录过期时间(秒)constructor(redisUrl: string) {this.redisClient = createClient({ url: redisUrl });this.redisClient.connect();}// 注册服务实例async register(instance: ServiceInstance): Promise<void> {const key = `${this.KEY_PREFIX}${instance.serviceName}:${instance.id}`;const now = Date.now();// 更新注册时间或最后更新时间instance.lastUpdateTime = now;if (!instance.registrationTime) {instance.registrationTime = now;}// 将服务实例信息存储到Redisawait this.redisClient.set(key, JSON.stringify(instance), { EX: this.TTL });// 添加到服务名称集合中,方便按服务名查询await this.redisClient.sAdd(`${this.KEY_PREFIX}${instance.serviceName}`, instance.id);}// 注销服务实例async deregister(instanceId: string): Promise<void> {// 获取服务实例信息const pattern = `${this.KEY_PREFIX}*:${instanceId}`;const keys = await this.redisClient.keys(pattern);if (keys.length > 0) {const key = keys[0];const serviceName = key.split(':')[2];// 从Redis中删除服务实例await this.redisClient.del(key);// 从服务名称集合中移除await this.redisClient.sRem(`${this.KEY_PREFIX}${serviceName}`, instanceId);}}// 获取指定服务的所有实例async getInstances(serviceName: string): Promise<ServiceInstance[]> {const serviceKey = `${this.KEY_PREFIX}${serviceName}`;const instanceIds = await this.redisClient.sMembers(serviceKey);if (instanceIds.length === 0) {return [];}// 批量获取所有服务实例信息const keys = instanceIds.map(id => `${this.KEY_PREFIX}${serviceName}:${id}`);const instancesData = await this.redisClient.mGet(keys);// 过滤并解析有效的实例数据return instancesData.filter(Boolean).map(data => JSON.parse(data)).filter(instance => instance.status !== 'DOWN');}// 查询符合特定条件的服务实例async findServices(query: ServiceQuery): Promise<ServiceInstance[]> {const { serviceName, status, metadata } = query;// 获取所有匹配服务名的实例const instances = await this.getInstances(serviceName);// 应用过滤条件return instances.filter(instance => {// 状态过滤if (status && instance.status !== status) {return false;}// 元数据过滤if (metadata) {for (const [key, value] of Object.entries(metadata)) {if (instance.metadata[key] !== value) {return false;}}}return true;});}// 监听服务变更watchService(serviceName: string, callback: (instances: ServiceInstance[]) => void): void {// 创建订阅客户端const subClient = this.redisClient.duplicate();// 订阅服务变更事件subClient.subscribe(`${this.KEY_PREFIX}${serviceName}:changes`, (message) => {this.getInstances(serviceName).then(callback);});}
}// 服务注册表API服务器
function createRegistryServer(registry: McpServiceRegistry): express.Express {const app = express();app.use(express.json());// 服务注册端点app.post('/services', async (req, res) => {try {const instance = req.body as ServiceInstance;await registry.register(instance);res.status(201).json({ success: true, message: '服务实例已注册' });} catch (error) {res.status(500).json({ success: false, error: error.message });}});// 服务注销端点app.delete('/services/:instanceId', async (req, res) => {try {await registry.deregister(req.params.instanceId);res.status(200).json({ success: true, message: '服务实例已注销' });} catch (error) {res.status(500).json({ success: false, error: error.message });}});// 获取服务实例端点app.get('/services/:serviceName', async (req, res) => {try {const instances = await registry.getInstances(req.params.serviceName);res.status(200).json({ success: true, data: instances });} catch (error) {res.status(500).json({ success: false, error: error.message });}});// 服务查询端点app.get('/services', async (req, res) => {try {const query = {serviceName: req.query.serviceName as string,status: req.query.status as 'UP' | 'DOWN' | 'STARTING' | 'OUT_OF_SERVICE',metadata: req.query.metadata ? JSON.parse(req.query.metadata as string) : undefined};const instances = await registry.findServices(query);res.status(200).json({ success: true, data: instances });} catch (error) {res.status(500).json({ success: false, error: error.message });}});return app;
}// 使用MCP服务器封装服务注册表
class McpRegistryServer extends McpServer {private registry: McpServiceRegistry;private httpServer: express.Express;constructor(config: {name: string;description: string;version: string;redisUrl: string;port: number;}) {super({name: config.name,description: config.description,version: config.version});// 创建服务注册表this.registry = new McpServiceRegistry(config.redisUrl);// 创建HTTP服务器this.httpServer = createRegistryServer(this.registry);// 启动HTTP服务器this.httpServer.listen(config.port, () => {console.log(`注册表服务器运行在端口 ${config.port}`);});// 作为MCP资源暴露服务注册表this.exposeRegistryAsResource();}// 将注册表作为MCP资源暴露private exposeRegistryAsResource() {// 定义服务实例资源模板this.registerResourceTemplate({name: "serviceInstances",description: "MCP服务实例资源",schema: {type: "object",properties: {id: { type: "string" },serviceName: { type: "string" },host: { type: "string" },port: { type: "number" },status: { type: "string", enum: ["UP", "DOWN", "STARTING", "OUT_OF_SERVICE"] },metadata: { type: "object" },healthCheckUrl: { type: "string" },registrationTime: { type: "number" },lastUpdateTime: { type: "number" }},required: ["id", "serviceName", "host",