전송 계층

모델 컨텍스트 프로토콜(MCP)의 전송 계층은 클라이언트와 서버 간의 통신 기반을 제공합니다. 전송 계층은 메시지가 전송되고 수신되는 기본 메커니즘을 처리합니다.

메시지 형식

MCP는 JSON-RPC 2.0을 전송 형식으로 사용합니다. 전송 계층은 MCP 프로토콜 메시지를 JSON-RPC 형식으로 변환하여 전송하고, 수신된 JSON-RPC 메시지를 다시 MCP 프로토콜 메시지로 변환하는 책임을 집니다.

사용되는 JSON-RPC 메시지에는 세 가지 유형이 있습니다:

요청 (Requests)

{
  jsonrpc: "2.0",
  id: number | string,
  method: string,
  params?: object
}

응답 (Responses)

{
  jsonrpc: "2.0",
  id: number | string,
  result?: object,
  error?: {
    code: number,
    message: string,
    data?: unknown
  }
}

알림 (Notifications)

{
  jsonrpc: "2.0",
  method: string,
  params?: object
}

내장 전송 유형

MCP는 두 가지 표준 전송 구현을 포함합니다:

표준 입출력 (stdio)

stdio 전송은 표준 입출력 스트림을 통한 통신을 가능하게 합니다. 이는 로컬 통합 및 명령줄 도구에 특히 유용합니다.

stdio를 사용하는 경우:

  • 명령줄 도구 구축 시
  • 로컬 통합 구현 시
  • 간단한 프로세스 통신이 필요할 때
  • 셸 스크립트 작업 시
1
2
3
4
5
6
7
8
9
    const server = new Server({
      name: "example-server",
      version: "1.0.0"
    }, {
      capabilities: {}
    });

    const transport = new StdioServerTransport();
    await server.connect(transport);
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
    const client = new Client({
      name: "example-client",
      version: "1.0.0"
    }, {
      capabilities: {}
    });

    const transport = new StdioClientTransport({
      command: "./server",
      args: ["--option", "value"]
    });
    await client.connect(transport);
1
2
3
4
5
6
7
8
9

    app = Server("example-server")

    async with stdio_server() as streams:
        await app.run(
            streams[0],
            streams[1],
            app.create_initialization_options()
        )
1
2
3
4
5
6
7
8
    params = StdioServerParameters(
        command="./server",
        args=["--option", "value"]
    )

    async with stdio_client(params) as streams:
        async with ClientSession(streams[0], streams[1]) as session:
            await session.initialize()

서버 전송 이벤트 (SSE)

SSE 전송은 서버에서 클라이언트로의 스트리밍을 가능하게 하며, 클라이언트에서 서버로의 통신을 위해 HTTP POST 요청을 사용합니다.

SSE를 사용하는 경우:

  • 서버에서 클라이언트로의 스트리밍만 필요할 때
  • 제한된 네트워크 환경에서 작업할 때
  • 간단한 업데이트를 구현할 때
import express from "express";
    
const app = express();
    
const server = new Server({
  name: "example-server",
  version: "1.0.0"
}, {
  capabilities: {}
});
    
let transport: SSEServerTransport | null = null;

app.get("/sse", (req, res) => {
  transport = new SSEServerTransport("/messages", res);
  server.connect(transport);
});

app.post("/messages", (req, res) => {
  if (transport) {
    transport.handlePostMessage(req, res);
  }
});

app.listen(3000);
const client = new Client({
  name: "example-client",
  version: "1.0.0"
}, {
  capabilities: {}
});

const transport = new SSEClientTransport(
  new URL("http://localhost:3000/sse")
);
await client.connect(transport);
from mcp.server.sse import SseServerTransport
from starlette.applications import Starlette
from starlette.routing import Route

app = Server("example-server")
sse = SseServerTransport("/messages")

async def handle_sse(scope, receive, send):
    async with sse.connect_sse(scope, receive, send) as streams:
        await app.run(streams[0], streams[1], app.create_initialization_options())

async def handle_messages(scope, receive, send):
    await sse.handle_post_message(scope, receive, send)

starlette_app = Starlette(
    routes=[
        Route("/sse", endpoint=handle_sse),
        Route("/messages", endpoint=handle_messages, methods=["POST"]),
    ]
)
async with sse_client("http://localhost:8000/sse") as streams:
    async with ClientSession(streams[0], streams[1]) as session:
        await session.initialize()

사용자 정의 전송

MCP는 특정 요구사항을 위한 사용자 정의 전송 구현을 쉽게 만들 수 있습니다. 모든 전송 구현은 전송 인터페이스를 준수하기만 하면 됩니다:

다음과 같은 경우에 사용자 정의 전송을 구현할 수 있습니다:

  • 사용자 정의 네트워크 프로토콜
  • 특수화된 통신 채널
  • 기존 시스템과의 통합
  • 성능 최적화
    interface Transport {
      // 메시지 처리 시작
      start(): Promise<void>;

      // JSON-RPC 메시지 전송
      send(message: JSONRPCMessage): Promise<void>;

      // 연결 종료
      close(): Promise<void>;

      // 콜백
      onclose?: () => void;
      onerror?: (error: Error) => void;
      onmessage?: (message: JSONRPCMessage) => void;
    }
MCP 서버가 종종 asyncio로 구현되지만, 더 넓은 호환성을 위해 전송과 같은
저수준 인터페이스는 `anyio`로 구현하는 것을 권장합니다.
```python
@contextmanager
async def create_transport(
    read_stream: MemoryObjectReceiveStream[JSONRPCMessage | Exception],
    write_stream: MemoryObjectSendStream[JSONRPCMessage]
):
    """
    MCP용 전송 인터페이스

    Args:
        read_stream: 수신 메시지를 읽을 스트림
        write_stream: 송신 메시지를 쓸 스트림
    """
    async with anyio.create_task_group() as tg:
        try:
            # 메시지 처리 시작
            tg.start_soon(lambda: process_messages(read_stream))

            # 메시지 전송
            async with write_stream:
                yield write_stream

        except Exception as exc:
            # 오류 처리
            raise exc
        finally:
            # 정리
            tg.cancel_scope.cancel()
            await write_stream.aclose()
            await read_stream.aclose()

오류 처리

전송 구현은 다양한 오류 시나리오를 처리해야 합니다:

  1. 연결 오류
  2. 메시지 파싱 오류
  3. 프로토콜 오류
  4. 네트워크 타임아웃
  5. 리소스 정리

오류 처리 예시:

    class ExampleTransport implements Transport {
      async start() {
        try {
          // 연결 로직
        } catch (error) {
          this.onerror?.(new Error(`연결 실패: ${error}`));
          throw error;
        }
      }

      async send(message: JSONRPCMessage) {
        try {
          // 전송 로직
        } catch (error) {
          this.onerror?.(new Error(`메시지 전송 실패: ${error}`));
          throw error;
        }
      }
    }

MCP 서버가 종종 asyncio로 구현되지만, 더 넓은 호환성을 위해 전송과 같은 저수준 인터페이스는 anyio로 구현하는 것을 권장합니다.

    @contextmanager
    async def example_transport(scope: Scope, receive: Receive, send: Send):
        try:
            # 양방향 통신을 위한 스트림 생성
            read_stream_writer, read_stream = anyio.create_memory_object_stream(0)
            write_stream, write_stream_reader = anyio.create_memory_object_stream(0)

            async def message_handler():
                try:
                    async with read_stream_writer:
                        # 메시지 처리 로직
                        pass
                except Exception as exc:
                    logger.error(f"메시지 처리 실패: {exc}")
                    raise exc

            async with anyio.create_task_group() as tg:
                tg.start_soon(message_handler)
                try:
                    # 통신을 위한 스트림 제공
                    yield read_stream, write_stream
                except Exception as exc:
                    logger.error(f"전송 오류: {exc}")
                    raise exc
                finally:
                    tg.cancel_scope.cancel()
                    await write_stream.aclose()
                    await read_stream.aclose()
        except Exception as exc:
            logger.error(f"전송 초기화 실패: {exc}")
            raise exc

모범 사례

MCP 전송을 구현하거나 사용할 때:

  1. 연결 수명 주기를 적절하게 처리
  2. 적절한 오류 처리 구현
  3. 연결 종료 시 리소스 정리
  4. 적절한 타임아웃 사용
  5. 전송 전 메시지 검증
  6. 디버깅을 위한 전송 이벤트 로깅
  7. 적절한 경우 재연결 로직 구현
  8. 메시지 큐의 백프레셔 처리
  9. 연결 상태 모니터링
  10. 적절한 보안 조치 구현

보안 고려사항

전송을 구현할 때:

인증 및 권한 부여

  • 적절한 인증 메커니즘 구현
  • 클라이언트 자격 증명 검증
  • 보안 토큰 처리 사용
  • 권한 부여 검사 구현

데이터 보안

  • 네트워크 전송을 위해 TLS 사용
  • 민감한 데이터 암호화
  • 메시지 무결성 검증
  • 메시지 크기 제한 구현
  • 입력 데이터 정화

네트워크 보안

  • 속도 제한 구현
  • 적절한 타임아웃 사용
  • 서비스 거부 시나리오 처리
  • 비정상적인 패턴 모니터링
  • 적절한 방화벽 규칙 구현

전송 디버깅

전송 문제 디버깅을 위한 팁:

  1. 디버그 로깅 활성화
  2. 메시지 흐름 모니터링
  3. 연결 상태 확인
  4. 메시지 형식 검증
  5. 오류 시나리오 테스트
  6. 네트워크 분석 도구 사용
  7. 상태 확인 구현
  8. 리소스 사용량 모니터링
  9. 엣지 케이스 테스트
  10. 적절한 오류 추적 사용