전송 계층
모델 컨텍스트 프로토콜(MCP)의 전송 계층은 클라이언트와 서버 간의 통신 기반을 제공합니다. 전송 계층은 메시지가 전송되고 수신되는 기본 메커니즘을 처리합니다.
메시지 형식
MCP는 JSON-RPC 2.0을 전송 형식으로 사용합니다. 전송 계층은 MCP 프로토콜 메시지를 JSON-RPC 형식으로 변환하여 전송하고, 수신된 JSON-RPC 메시지를 다시 MCP 프로토콜 메시지로 변환하는 책임을 집니다.
사용되는 JSON-RPC 메시지에는 세 가지 유형이 있습니다:
요청 (Requests)
{
jsonrpc: "2.0",
id: number | string,
method: string,
params?: object
}응답 (Responses)
{
jsonrpc: "2.0",
id: number | string,
result?: object,
error?: {
code: number,
message: string,
data?: unknown
}
}알림 (Notifications)
{
jsonrpc: "2.0",
method: string,
params?: object
}내장 전송 유형
MCP는 두 가지 표준 전송 구현을 포함합니다:
표준 입출력 (stdio)
stdio 전송은 표준 입출력 스트림을 통한 통신을 가능하게 합니다. 이는 로컬 통합 및 명령줄 도구에 특히 유용합니다.
stdio를 사용하는 경우:
- 명령줄 도구 구축 시
- 로컬 통합 구현 시
- 간단한 프로세스 통신이 필요할 때
- 셸 스크립트 작업 시
| |
| |
| |
| |
서버 전송 이벤트 (SSE)
SSE 전송은 서버에서 클라이언트로의 스트리밍을 가능하게 하며, 클라이언트에서 서버로의 통신을 위해 HTTP POST 요청을 사용합니다.
SSE를 사용하는 경우:
- 서버에서 클라이언트로의 스트리밍만 필요할 때
- 제한된 네트워크 환경에서 작업할 때
- 간단한 업데이트를 구현할 때
import express from "express";
const app = express();
const server = new Server({
name: "example-server",
version: "1.0.0"
}, {
capabilities: {}
});
let transport: SSEServerTransport | null = null;
app.get("/sse", (req, res) => {
transport = new SSEServerTransport("/messages", res);
server.connect(transport);
});
app.post("/messages", (req, res) => {
if (transport) {
transport.handlePostMessage(req, res);
}
});
app.listen(3000);const client = new Client({
name: "example-client",
version: "1.0.0"
}, {
capabilities: {}
});
const transport = new SSEClientTransport(
new URL("http://localhost:3000/sse")
);
await client.connect(transport);from mcp.server.sse import SseServerTransport
from starlette.applications import Starlette
from starlette.routing import Route
app = Server("example-server")
sse = SseServerTransport("/messages")
async def handle_sse(scope, receive, send):
async with sse.connect_sse(scope, receive, send) as streams:
await app.run(streams[0], streams[1], app.create_initialization_options())
async def handle_messages(scope, receive, send):
await sse.handle_post_message(scope, receive, send)
starlette_app = Starlette(
routes=[
Route("/sse", endpoint=handle_sse),
Route("/messages", endpoint=handle_messages, methods=["POST"]),
]
)async with sse_client("http://localhost:8000/sse") as streams:
async with ClientSession(streams[0], streams[1]) as session:
await session.initialize()사용자 정의 전송
MCP는 특정 요구사항을 위한 사용자 정의 전송 구현을 쉽게 만들 수 있습니다. 모든 전송 구현은 전송 인터페이스를 준수하기만 하면 됩니다:
다음과 같은 경우에 사용자 정의 전송을 구현할 수 있습니다:
- 사용자 정의 네트워크 프로토콜
- 특수화된 통신 채널
- 기존 시스템과의 통합
- 성능 최적화
interface Transport {
// 메시지 처리 시작
start(): Promise<void>;
// JSON-RPC 메시지 전송
send(message: JSONRPCMessage): Promise<void>;
// 연결 종료
close(): Promise<void>;
// 콜백
onclose?: () => void;
onerror?: (error: Error) => void;
onmessage?: (message: JSONRPCMessage) => void;
}MCP 서버가 종종 asyncio로 구현되지만, 더 넓은 호환성을 위해 전송과 같은
저수준 인터페이스는 `anyio`로 구현하는 것을 권장합니다.
```python
@contextmanager
async def create_transport(
read_stream: MemoryObjectReceiveStream[JSONRPCMessage | Exception],
write_stream: MemoryObjectSendStream[JSONRPCMessage]
):
"""
MCP용 전송 인터페이스
Args:
read_stream: 수신 메시지를 읽을 스트림
write_stream: 송신 메시지를 쓸 스트림
"""
async with anyio.create_task_group() as tg:
try:
# 메시지 처리 시작
tg.start_soon(lambda: process_messages(read_stream))
# 메시지 전송
async with write_stream:
yield write_stream
except Exception as exc:
# 오류 처리
raise exc
finally:
# 정리
tg.cancel_scope.cancel()
await write_stream.aclose()
await read_stream.aclose()
오류 처리
전송 구현은 다양한 오류 시나리오를 처리해야 합니다:
- 연결 오류
- 메시지 파싱 오류
- 프로토콜 오류
- 네트워크 타임아웃
- 리소스 정리
오류 처리 예시:
class ExampleTransport implements Transport {
async start() {
try {
// 연결 로직
} catch (error) {
this.onerror?.(new Error(`연결 실패: ${error}`));
throw error;
}
}
async send(message: JSONRPCMessage) {
try {
// 전송 로직
} catch (error) {
this.onerror?.(new Error(`메시지 전송 실패: ${error}`));
throw error;
}
}
}MCP 서버가 종종 asyncio로 구현되지만, 더 넓은 호환성을 위해 전송과 같은
저수준 인터페이스는 anyio로 구현하는 것을 권장합니다.
@contextmanager
async def example_transport(scope: Scope, receive: Receive, send: Send):
try:
# 양방향 통신을 위한 스트림 생성
read_stream_writer, read_stream = anyio.create_memory_object_stream(0)
write_stream, write_stream_reader = anyio.create_memory_object_stream(0)
async def message_handler():
try:
async with read_stream_writer:
# 메시지 처리 로직
pass
except Exception as exc:
logger.error(f"메시지 처리 실패: {exc}")
raise exc
async with anyio.create_task_group() as tg:
tg.start_soon(message_handler)
try:
# 통신을 위한 스트림 제공
yield read_stream, write_stream
except Exception as exc:
logger.error(f"전송 오류: {exc}")
raise exc
finally:
tg.cancel_scope.cancel()
await write_stream.aclose()
await read_stream.aclose()
except Exception as exc:
logger.error(f"전송 초기화 실패: {exc}")
raise exc모범 사례
MCP 전송을 구현하거나 사용할 때:
- 연결 수명 주기를 적절하게 처리
- 적절한 오류 처리 구현
- 연결 종료 시 리소스 정리
- 적절한 타임아웃 사용
- 전송 전 메시지 검증
- 디버깅을 위한 전송 이벤트 로깅
- 적절한 경우 재연결 로직 구현
- 메시지 큐의 백프레셔 처리
- 연결 상태 모니터링
- 적절한 보안 조치 구현
보안 고려사항
전송을 구현할 때:
인증 및 권한 부여
- 적절한 인증 메커니즘 구현
- 클라이언트 자격 증명 검증
- 보안 토큰 처리 사용
- 권한 부여 검사 구현
데이터 보안
- 네트워크 전송을 위해 TLS 사용
- 민감한 데이터 암호화
- 메시지 무결성 검증
- 메시지 크기 제한 구현
- 입력 데이터 정화
네트워크 보안
- 속도 제한 구현
- 적절한 타임아웃 사용
- 서비스 거부 시나리오 처리
- 비정상적인 패턴 모니터링
- 적절한 방화벽 규칙 구현
전송 디버깅
전송 문제 디버깅을 위한 팁:
- 디버그 로깅 활성화
- 메시지 흐름 모니터링
- 연결 상태 확인
- 메시지 형식 검증
- 오류 시나리오 테스트
- 네트워크 분석 도구 사용
- 상태 확인 구현
- 리소스 사용량 모니터링
- 엣지 케이스 테스트
- 적절한 오류 추적 사용