傳輸
模型上下文協議(MCP)中的傳輸為客戶端和服務器之間的通信提供基礎。傳輸層負責處理消息如何發送和接收的底層機制。
消息格式
MCP 使用 JSON-RPC 2.0 作為其傳輸格式。傳輸層負責將 MCP 協議消息轉換為 JSON-RPC 格式進行傳輸,並將接收到的 JSON-RPC 消息轉換回 MCP 協議消息。
有三種類型的 JSON-RPC 消息:
請求
{
jsonrpc: "2.0",
id: number | string,
method: string,
params?: object
}響應
{
jsonrpc: "2.0",
id: number | string,
result?: object,
error?: {
code: number,
message: string,
data?: unknown
}
}通知
{
jsonrpc: "2.0",
method: string,
params?: object
}內置傳輸類型
MCP 包含兩種標準傳輸實現:
標準輸入/輸出 (stdio)
stdio 傳輸通過標準輸入和輸出流實現通信。這對於本地集成和命令行工具特別有用。
使用 stdio 的場景:
- 構建命令行工具
- 實現本地集成
- 需要簡單的進程間通信
- 使用 shell 腳本
| |
| |
async def stdio_server():
try:
# 創建用於雙向通信的流
read_stream_writer, read_stream = anyio.create_memory_object_stream(0)
write_stream, write_stream_reader = anyio.create_memory_object_stream(0)
async def message_handler():
try:
async with read_stream_writer:
# 消息處理邏輯
pass
except Exception as exc:
logger.error(f"消息處理失敗:{exc}")
raise exc
async with anyio.create_task_group() as tg:
tg.start_soon(message_handler)
try:
# 返回用於通信的流
yield read_stream, write_stream
except Exception as exc:
logger.error(f"傳輸錯誤:{exc}")
raise exc
finally:
tg.cancel_scope.cancel()
await write_stream.aclose()
await read_stream.aclose()
except Exception as exc:
logger.error(f"初始化傳輸失敗:{exc}")
raise exc服務器發送事件 (SSE)
SSE傳輸支持服務器到客戶端的流式傳輸,同時使用HTTP POST請求實現客戶端到服務器的通信。
適用場景:
- 僅需要服務器到客戶端的流式傳輸
- 在受限網絡環境下工作
- 實現簡單的更新操作
const server = new Server({
name: "example-server",
version: "1.0.0"
}, {
capabilities: {}
});
const transport = new SSEServerTransport("/message", response);
await server.connect(transport);const client = new Client({
name: "example-client",
version: "1.0.0"
}, {
capabilities: {}
});
const transport = new SSEClientTransport(
new URL("http://localhost:3000/sse")
);
await client.connect(transport);from mcp.server.sse import SseServerTransport
from starlette.applications import Starlette
from starlette.routing import Route
app = Server("example-server")
sse = SseServerTransport("/messages")
async def handle_sse(scope, receive, send):
async with sse.connect_sse(scope, receive, send) as streams:
await app.run(streams[0], streams[1], app.create_initialization_options())
async def handle_messages(scope, receive, send):
await sse.handle_post_message(scope, receive, send)
starlette_app = Starlette(
routes=[
Route("/sse", endpoint=handle_sse),
Route("/messages", endpoint=handle_messages, methods=["POST"]),
]
)async with sse_client("http://localhost:8000/sse") as streams:
async with ClientSession(streams[0], streams[1]) as session:
await session.initialize()自定義傳輸
MCP讓實現自定義傳輸變得簡單。任何傳輸實現只需要符合Transport接口即可:
可以實現自定義傳輸用於:
- 自定義網絡協議
- 專用通信通道
- 與現有系統集成
- 性能優化
interface Transport {
// Start processing messages
start(): Promise<void>;
// Send a JSON-RPC message
send(message: JSONRPCMessage): Promise<void>;
// Close the connection
close(): Promise<void>;
// Callbacks
onclose?: () => void;
onerror?: (error: Error) => void;
onmessage?: (message: JSONRPCMessage) => void;
}Note that while MCP Servers are often implemented with asyncio, we recommend
implementing low-level interfaces like transports with `anyio` for wider compatibility.
```python
@contextmanager
async def create_transport(
read_stream: MemoryObjectReceiveStream[JSONRPCMessage | Exception],
write_stream: MemoryObjectSendStream[JSONRPCMessage]
):
"""
Transport interface for MCP.
Args:
read_stream: Stream to read incoming messages from
write_stream: Stream to write outgoing messages to
"""
async with anyio.create_task_group() as tg:
try:
# Start processing messages
tg.start_soon(lambda: process_messages(read_stream))
# Send messages
async with write_stream:
yield write_stream
except Exception as exc:
# Handle errors
raise exc
finally:
# Clean up
tg.cancel_scope.cancel()
await write_stream.aclose()
await read_stream.aclose()
錯誤處理
傳輸實現應該處理各種錯誤場景:
- 連接錯誤
- 消息解析錯誤
- 協議錯誤
- 網絡超時
- 資源清理
class ExampleTransport implements Transport {
async start() {
try {
// Connection logic
} catch (error) {
this.onerror?.(new Error(`Failed to connect: ${error}`));
throw error;
}
}
async send(message: JSONRPCMessage) {
try {
// Sending logic
} catch (error) {
this.onerror?.(new Error(`Failed to send message: ${error}`));
throw error;
}
}
}Note that while MCP Servers are often implemented with asyncio, we recommend
implementing low-level interfaces like transports with anyio for wider compatibility.
@contextmanager
async def example_transport(scope: Scope, receive: Receive, send: Send):
try:
# Create streams for bidirectional communication
read_stream_writer, read_stream = anyio.create_memory_object_stream(0)
write_stream, write_stream_reader = anyio.create_memory_object_stream(0)
async def message_handler():
try:
async with read_stream_writer:
# Message handling logic
pass
except Exception as exc:
logger.error(f"Failed to handle message: {exc}")
raise exc
async with anyio.create_task_group() as tg:
tg.start_soon(message_handler)
try:
# Yield streams for communication
yield read_stream, write_stream
except Exception as exc:
logger.error(f"Transport error: {exc}")
raise exc
finally:
tg.cancel_scope.cancel()
await write_stream.aclose()
await read_stream.aclose()
except Exception as exc:
logger.error(f"Failed to initialize transport: {exc}")
raise exc最佳實踐
在實現或使用MCP傳輸時:
- 正確處理連接生命週期
- 實現適當的錯誤處理
- 在連接關閉時清理資源
- 使用適當的超時設置
- 發送前驗證消息
- 記錄傳輸事件以便調試
- 在適當情況下實現重連邏輯
- 處理消息隊列中的背壓
- 監控連接健康狀況
- 實現適當的安全措施
安全注意事項
在實現傳輸時:
身份驗證和授權
- 實現適當的身份驗證機制
- 驗證客戶端憑據
- 使用安全的令牌處理
- 實現授權檢查
數據安全
- 使用TLS進行網絡傳輸
- 加密敏感數據
- 驗證消息完整性
- 實現消息大小限制
- 淨化輸入數據
網絡安全
- 實現速率限制
- 使用適當的超時設置
- 處理拒絕服務場景
- 監控異常模式
- 實現適當的防火牆規則
傳輸調試
調試傳輸問題的建議:
- 啟用調試日誌
- 監控消息流
- 檢查連接狀態
- 驗證消息格式
- 測試錯誤場景
- 使用網絡分析工具
- 實現健康檢查
- 監控資源使用
- 測試邊緣情況
- 使用適當的錯誤跟蹤