傳輸

模型上下文協議(MCP)中的傳輸為客戶端和服務器之間的通信提供基礎。傳輸層負責處理消息如何發送和接收的底層機制。

消息格式

MCP 使用 JSON-RPC 2.0 作為其傳輸格式。傳輸層負責將 MCP 協議消息轉換為 JSON-RPC 格式進行傳輸,並將接收到的 JSON-RPC 消息轉換回 MCP 協議消息。

有三種類型的 JSON-RPC 消息:

請求

{
  jsonrpc: "2.0",
  id: number | string,
  method: string,
  params?: object
}

響應

{
  jsonrpc: "2.0",
  id: number | string,
  result?: object,
  error?: {
    code: number,
    message: string,
    data?: unknown
  }
}

通知

{
  jsonrpc: "2.0",
  method: string,
  params?: object
}

內置傳輸類型

MCP 包含兩種標準傳輸實現:

標準輸入/輸出 (stdio)

stdio 傳輸通過標準輸入和輸出流實現通信。這對於本地集成和命令行工具特別有用。

使用 stdio 的場景:

  • 構建命令行工具
  • 實現本地集成
  • 需要簡單的進程間通信
  • 使用 shell 腳本
1
2
3
4
5
6
7
8
9
    const server = new Server({
      name: "example-server",
      version: "1.0.0"
    }, {
      capabilities: {}
    });

    const transport = new StdioServerTransport();
    await server.connect(transport);
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
    const client = new Client({
      name: "example-client",
      version: "1.0.0"
    }, {
      capabilities: {}
    });

    const transport = new StdioClientTransport({
      command: "./server",
      args: ["--option", "value"]
    });
    await client.connect(transport);
    async def stdio_server():
        try:
            # 創建用於雙向通信的流
            read_stream_writer, read_stream = anyio.create_memory_object_stream(0)
            write_stream, write_stream_reader = anyio.create_memory_object_stream(0)

            async def message_handler():
                try:
                    async with read_stream_writer:
                        # 消息處理邏輯
                        pass
                except Exception as exc:
                    logger.error(f"消息處理失敗:{exc}")
                    raise exc

            async with anyio.create_task_group() as tg:
                tg.start_soon(message_handler)
                try:
                    # 返回用於通信的流
                    yield read_stream, write_stream
                except Exception as exc:
                    logger.error(f"傳輸錯誤:{exc}")
                    raise exc
                finally:
                    tg.cancel_scope.cancel()
                    await write_stream.aclose()
                    await read_stream.aclose()
        except Exception as exc:
            logger.error(f"初始化傳輸失敗:{exc}")
            raise exc

服務器發送事件 (SSE)

SSE傳輸支持服務器到客戶端的流式傳輸,同時使用HTTP POST請求實現客戶端到服務器的通信。

適用場景:

  • 僅需要服務器到客戶端的流式傳輸
  • 在受限網絡環境下工作
  • 實現簡單的更新操作
const server = new Server({
  name: "example-server",
  version: "1.0.0"
}, {
  capabilities: {}
});

const transport = new SSEServerTransport("/message", response);
await server.connect(transport);
const client = new Client({
  name: "example-client",
  version: "1.0.0"
}, {
  capabilities: {}
});

const transport = new SSEClientTransport(
  new URL("http://localhost:3000/sse")
);
await client.connect(transport);
from mcp.server.sse import SseServerTransport
from starlette.applications import Starlette
from starlette.routing import Route

app = Server("example-server")
sse = SseServerTransport("/messages")

async def handle_sse(scope, receive, send):
    async with sse.connect_sse(scope, receive, send) as streams:
        await app.run(streams[0], streams[1], app.create_initialization_options())

async def handle_messages(scope, receive, send):
    await sse.handle_post_message(scope, receive, send)

starlette_app = Starlette(
    routes=[
        Route("/sse", endpoint=handle_sse),
        Route("/messages", endpoint=handle_messages, methods=["POST"]),
    ]
)
async with sse_client("http://localhost:8000/sse") as streams:
    async with ClientSession(streams[0], streams[1]) as session:
        await session.initialize()

自定義傳輸

MCP讓實現自定義傳輸變得簡單。任何傳輸實現只需要符合Transport接口即可:

可以實現自定義傳輸用於:

  • 自定義網絡協議
  • 專用通信通道
  • 與現有系統集成
  • 性能優化
    interface Transport {
      // Start processing messages
      start(): Promise<void>;

      // Send a JSON-RPC message
      send(message: JSONRPCMessage): Promise<void>;

      // Close the connection
      close(): Promise<void>;

      // Callbacks
      onclose?: () => void;
      onerror?: (error: Error) => void;
      onmessage?: (message: JSONRPCMessage) => void;
    }
Note that while MCP Servers are often implemented with asyncio, we recommend
implementing low-level interfaces like transports with `anyio` for wider compatibility.
```python
@contextmanager
async def create_transport(
    read_stream: MemoryObjectReceiveStream[JSONRPCMessage | Exception],
    write_stream: MemoryObjectSendStream[JSONRPCMessage]
):
    """
    Transport interface for MCP.

    Args:
        read_stream: Stream to read incoming messages from
        write_stream: Stream to write outgoing messages to
    """
    async with anyio.create_task_group() as tg:
        try:
            # Start processing messages
            tg.start_soon(lambda: process_messages(read_stream))

            # Send messages
            async with write_stream:
                yield write_stream

        except Exception as exc:
            # Handle errors
            raise exc
        finally:
            # Clean up
            tg.cancel_scope.cancel()
            await write_stream.aclose()
            await read_stream.aclose()

錯誤處理

傳輸實現應該處理各種錯誤場景:

  1. 連接錯誤
  2. 消息解析錯誤
  3. 協議錯誤
  4. 網絡超時
  5. 資源清理
    class ExampleTransport implements Transport {
      async start() {
        try {
          // Connection logic
        } catch (error) {
          this.onerror?.(new Error(`Failed to connect: ${error}`));
          throw error;
        }
      }

      async send(message: JSONRPCMessage) {
        try {
          // Sending logic
        } catch (error) {
          this.onerror?.(new Error(`Failed to send message: ${error}`));
          throw error;
        }
      }
    }

Note that while MCP Servers are often implemented with asyncio, we recommend implementing low-level interfaces like transports with anyio for wider compatibility.

    @contextmanager
    async def example_transport(scope: Scope, receive: Receive, send: Send):
        try:
            # Create streams for bidirectional communication
            read_stream_writer, read_stream = anyio.create_memory_object_stream(0)
            write_stream, write_stream_reader = anyio.create_memory_object_stream(0)

            async def message_handler():
                try:
                    async with read_stream_writer:
                        # Message handling logic
                        pass
                except Exception as exc:
                    logger.error(f"Failed to handle message: {exc}")
                    raise exc

            async with anyio.create_task_group() as tg:
                tg.start_soon(message_handler)
                try:
                    # Yield streams for communication
                    yield read_stream, write_stream
                except Exception as exc:
                    logger.error(f"Transport error: {exc}")
                    raise exc
                finally:
                    tg.cancel_scope.cancel()
                    await write_stream.aclose()
                    await read_stream.aclose()
        except Exception as exc:
            logger.error(f"Failed to initialize transport: {exc}")
            raise exc

最佳實踐

在實現或使用MCP傳輸時:

  1. 正確處理連接生命週期
  2. 實現適當的錯誤處理
  3. 在連接關閉時清理資源
  4. 使用適當的超時設置
  5. 發送前驗證消息
  6. 記錄傳輸事件以便調試
  7. 在適當情況下實現重連邏輯
  8. 處理消息隊列中的背壓
  9. 監控連接健康狀況
  10. 實現適當的安全措施

安全注意事項

在實現傳輸時:

身份驗證和授權

  • 實現適當的身份驗證機制
  • 驗證客戶端憑據
  • 使用安全的令牌處理
  • 實現授權檢查

數據安全

  • 使用TLS進行網絡傳輸
  • 加密敏感數據
  • 驗證消息完整性
  • 實現消息大小限制
  • 淨化輸入數據

網絡安全

  • 實現速率限制
  • 使用適當的超時設置
  • 處理拒絕服務場景
  • 監控異常模式
  • 實現適當的防火牆規則

傳輸調試

調試傳輸問題的建議:

  1. 啟用調試日誌
  2. 監控消息流
  3. 檢查連接狀態
  4. 驗證消息格式
  5. 測試錯誤場景
  6. 使用網絡分析工具
  7. 實現健康檢查
  8. 監控資源使用
  9. 測試邊緣情況
  10. 使用適當的錯誤跟蹤