核心架構

模型上下文協議(MCP)建立在靈活、可擴展的架構之上,可以實現 LLM 應用程序和集成之間的無縫通信。本文檔介紹核心架構組件和概念。

概述

MCP 遵循客戶端-服務器架構,其中:

  • 宿主 是啟動連接的 LLM 應用程序(如 Claude Desktop 或 IDE)
  • 客戶端 在宿主應用程序內部與服務器保持 1:1 連接
  • 服務器 為客戶端提供上下文、工具和提示
  flowchart LR
    subgraph " 宿主 (如 Claude Desktop) "
        client1[MCP 客戶端]
        client2[MCP 客戶端]
    end
    subgraph "服務器進程"
        server1[MCP 服務器]
    end
    subgraph "服務器進程"
        server2[MCP 服務器]
    end

    client1 <-->|傳輸層| server1
    client2 <-->|傳輸層| server2

核心組件

協議層

協議層處理消息幀、請求/響應鏈接和高級通信模式。

    class Protocol<Request, Notification, Result> {
        // Handle incoming requests
        setRequestHandler<T>(schema: T, handler: (request: T, extra: RequestHandlerExtra) => Promise<Result>): void

        // Handle incoming notifications
        setNotificationHandler<T>(schema: T, handler: (notification: T) => Promise<void>): void

        // Send requests and await responses
        request<T>(request: Request, schema: T, options?: RequestOptions): Promise<T>

        // Send one-way notifications
        notification(notification: Notification): Promise<void>
    }
    class Session(BaseSession[RequestT, NotificationT, ResultT]):
        async def send_request(
            self,
            request: RequestT,
            result_type: type[Result]
        ) -> Result:
            """
            Send request and wait for response. Raises McpError if response contains error.
            """
            # Request handling implementation

        async def send_notification(
            self,
            notification: NotificationT
        ) -> None:
            """Send one-way notification that doesn't expect response."""
            # Notification handling implementation

        async def _received_request(
            self,
            responder: RequestResponder[ReceiveRequestT, ResultT]
        ) -> None:
            """Handle incoming request from other side."""
            # Request handling implementation

        async def _received_notification(
            self,
            notification: ReceiveNotificationT
        ) -> None:
            """Handle incoming notification from other side."""
            # Notification handling implementation

主要類包括:

  • Protocol
  • Client
  • Server

傳輸層

傳輸層處理客戶端和服務器之間的實際通信。MCP 支持多種傳輸機制:

  1. 標準輸入輸出傳輸

    • 使用標準輸入/輸出進行通信
    • 適用於本地進程
  2. 帶 SSE 的 HTTP 傳輸

    • 使用服務器發送事件(SSE)進行服務器到客戶端的消息傳輸
    • 使用 HTTP POST 進行客戶端到服務器的消息傳輸

所有傳輸都使用 JSON-RPC 2.0 來交換消息。有關模型上下文協議消息格式的詳細信息,請參閱規範

消息類型

MCP 有以下主要消息類型:

  1. 請求 期望從另一方得到響應:

    interface Request {
      method: string;
      params?: { ... };
    }
  2. 通知 是不期望響應的單向消息:

    interface Notification {
      method: string;
      params?: { ... };
    }
  3. 結果 是對請求的成功響應:

    interface Result {
      [key: string]: unknown;
    }
  4. 錯誤 表示請求失敗:

    interface Error {
      code: number;
      message: string;
      data?: unknown;
    }

連接生命週期

1. 初始化

  sequenceDiagram
    participant Client
    participant Server

    Client->>Server: 初始化請求
    Server->>Client: 初始化響應
    Client->>Server: 已初始化通知

    Note over Client,Server: 連接準備就緒可以使用
  1. 客戶端發送帶有協議版本和功能的 initialize 請求
  2. 服務器響應其協議版本和功能
  3. 客戶端發送 initialized 通知作為確認
  4. 開始正常的消息交換

2. 消息交換

初始化後,支持以下模式:

  • 請求-響應: 客戶端或服務器發送請求,另一方響應
  • 通知: 任何一方發送單向消息

3. 終止

任何一方都可以終止連接:

  • 通過 close() 進行乾淨關閉
  • 傳輸斷開
  • 錯誤條件

錯誤處理

MCP 定義了這些標準錯誤代碼:

enum ErrorCode {
  // 標準 JSON-RPC 錯誤代碼
  ParseError = -32700,
  InvalidRequest = -32600,
  MethodNotFound = -32601,
  InvalidParams = -32602,
  InternalError = -32603
}

SDK 和應用程序可以定義自己的大於 -32000 的錯誤代碼。

錯誤通過以下方式傳播:

  • 對請求的錯誤響應
  • 傳輸上的錯誤事件
  • 協議級錯誤處理程序

實現示例

這是實現 MCP 服務器的基本示例:

    import { Server } from "@modelcontextprotocol/sdk/server/index.js";
    import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";

    const server = new Server({
      name: "example-server",
      version: "1.0.0"
    }, {
      capabilities: {
        resources: {}
      }
    });

    // Handle requests
    server.setRequestHandler(ListResourcesRequestSchema, async () => {
      return {
        resources: [
          {
            uri: "example://resource",
            name: "Example Resource"
          }
        ]
      };
    });

    // Connect transport
    const transport = new StdioServerTransport();
    await server.connect(transport);
    import asyncio
    import mcp.types as types
    from mcp.server import Server
    from mcp.server.stdio import stdio_server

    app = Server("example-server")

    @app.list_resources()
    async def list_resources() -> list[types.Resource]:
        return [
            types.Resource(
                uri="example://resource",
                name="Example Resource"
            )
        ]

    async def main():
        async with stdio_server() as streams:
            await app.run(
                streams[0],
                streams[1],
                app.create_initialization_options()
            )

    if __name__ == "__main__":
        asyncio.run(main)

最佳實踐

傳輸選擇

  1. 本地通信

    • 對本地進程使用標準輸入輸出傳輸
    • 適用於同一機器上的通信
    • 簡單的進程管理
  2. 遠程通信

    • 在需要 HTTP 兼容性的場景中使用 SSE
    • 考慮包括身份驗證和授權在內的安全影響

消息處理

  1. 請求處理

    • 徹底驗證輸入
    • 使用類型安全的模式
    • 優雅地處理錯誤
    • 實現超時
  2. 進度報告

    • 對長時間操作使用進度令牌
    • 逐步報告進度
    • 在已知時包含總進度
  3. 錯誤管理

    • 使用適當的錯誤代碼
    • 包含有用的錯誤消息
    • 出錯時清理資源

安全考慮

  1. 傳輸安全

    • 遠程連接使用 TLS
    • 驗證連接來源
    • 在需要時實現身份驗證
  2. 消息驗證

    • 驗證所有傳入消息
    • 淨化輸入
    • 檢查消息大小限制
    • 驗證 JSON-RPC 格式
  3. 資源保護

    • 實現訪問控制
    • 驗證資源路徑
    • 監控資源使用
    • 限制請求速率
  4. 錯誤處理

    • 不要洩露敏感信息
    • 記錄安全相關錯誤
    • 實現正確的清理
    • 處理 DoS 場景

調試和監控

  1. 日誌記錄

    • 記錄協議事件
    • 跟蹤消息流
    • 監控性能
    • 記錄錯誤
  2. 診斷

    • 實現健康檢查
    • 監控連接狀態
    • 跟蹤資源使用
    • 分析性能
  3. 測試

    • 測試不同的傳輸
    • 驗證錯誤處理
    • 檢查邊界情況
    • 負載測試服務器