消費 Registry 數據
消費 Registry 數據
通過 REST API 構建消費 MCP Registry 數據的應用程序的集成模式和最佳實踐。
基本信息
基礎 URL: https://registry.modelcontextprotocol.io
身份驗證: 只讀訪問無需身份驗證
核心端點:
GET /v0/servers- 分頁列出所有服務器GET /v0/servers/{id}- 按 UUID 獲取服務器詳情
查看 交互式 API 文檔 瞭解完整的請求/響應模式。
快速開始
基本服務器列表
# 獲取前 10 個服務器
curl "https://registry.modelcontextprotocol.io/v0/servers?limit=10"{
"servers": [
{
"name": "io.modelcontextprotocol/filesystem",
"description": "文件系統操作服務器",
"status": "active",
"version": "1.0.2"
}
],
"metadata": {
"count": 10,
"next_cursor": "eyJ..."
}
}搜索服務器
# 搜索特定功能
curl "https://registry.modelcontextprotocol.io/v0/servers?search=filesystem"
# 按狀態過濾
curl "https://registry.modelcontextprotocol.io/v0/servers?status=active"
# 組合查詢
curl "https://registry.modelcontextprotocol.io/v0/servers?search=weather&limit=5"獲取服務器詳情
# 獲取特定服務器的完整信息
curl "https://registry.modelcontextprotocol.io/v0/servers/{server-uuid}"構建子註冊中心
創建增強的註冊中心 - ETL 官方 Registry 數據並添加您自己的元數據,如評級、安全掃描或兼容性信息。
ETL 流程
目前我們建議定期抓取 GET /v0/servers 端點。未來我們可能會提供 updated_at 過濾器(#291)以僅獲取最近更改的服務器。
import requests
import time
from datetime import datetime
class RegistryETL:
def __init__(self, base_url="https://registry.modelcontextprotocol.io"):
self.base_url = base_url
def fetch_all_servers(self):
"""獲取所有服務器,處理分頁"""
servers = []
cursor = None
while True:
params = {"limit": 100}
if cursor:
params["cursor"] = cursor
response = requests.get(f"{self.base_url}/v0/servers", params=params)
response.raise_for_status()
data = response.json()
servers.extend(data["servers"])
# 檢查是否有更多頁面
if not data.get("metadata", {}).get("next_cursor"):
break
cursor = data["metadata"]["next_cursor"]
return servers
def process_servers(self, servers):
"""處理服務器數據並添加增強信息"""
enhanced_servers = []
for server in servers:
# 跳過非活躍服務器
if server.get("status") != "active":
continue
# 添加自定義元數據
enhanced_server = {
**server,
"_meta": {
**server.get("_meta", {}),
"com.yourregistry/enhanced": {
"last_processed": datetime.utcnow().isoformat(),
"popularity_score": self.calculate_popularity(server),
"security_scan": self.perform_security_scan(server)
}
}
}
enhanced_servers.append(enhanced_server)
return enhanced_servers
def calculate_popularity(self, server):
"""計算服務器受歡迎程度(示例邏輯)"""
# 實現您的評分邏輯
base_score = 1.0
# 基於描述質量調整
if len(server.get("description", "")) > 100:
base_score += 0.2
# 基於包數量調整
package_count = len(server.get("packages", []))
if package_count > 1:
base_score += 0.1 * package_count
return min(base_score, 5.0)
def perform_security_scan(self, server):
"""執行安全掃描(示例)"""
return {
"last_scanned": datetime.utcnow().isoformat(),
"vulnerabilities_found": 0,
"scan_status": "passed"
}
# 使用示例
etl = RegistryETL()
servers = etl.fetch_all_servers()
enhanced_servers = etl.process_servers(servers)狀態管理
服務器通常是不可變的,除了 status 字段可以更新為 deleted(以及其他狀態)。對於這些包,我們建議您也快速將狀態字段更新為 deleted 或從您的註冊中心中刪除包。這是因為此狀態通常表明它違反了我們寬鬆的 審核指南,表明它是非法的、惡意軟件或垃圾郵件。
def sync_server_status(self, local_server, remote_server):
"""同步服務器狀態變更"""
if remote_server["status"] == "deleted":
# 立即從本地註冊中心刪除或標記
self.mark_server_deleted(local_server["id"])
elif remote_server["status"] != local_server.get("status"):
# 更新其他狀態變更
self.update_server_status(
local_server["id"],
remote_server["status"]
)過濾和增強
官方 Registry 有 寬鬆的審核政策,因此您可能希望在 Registry 數據基礎上實施自己的過濾。
def apply_quality_filters(self, servers):
"""應用質量過濾器"""
filtered_servers = []
for server in servers:
# 基本質量檢查
if not self.meets_quality_standards(server):
continue
# 安全檢查
if not self.passes_security_scan(server):
continue
# 許可證檢查
if not self.has_acceptable_license(server):
continue
filtered_servers.append(server)
return filtered_servers
def enhance_with_metadata(self, server):
"""為服務器添加增強元數據"""
enhanced = {
**server,
"_meta": {
**server.get("_meta", {}),
"com.yourregistry/enhanced": {
"user_rating": self.get_user_rating(server["name"]),
"download_count": self.get_download_count(server["name"]),
"last_updated": self.get_last_update_time(server["name"]),
"tags": self.generate_tags(server),
"compatibility": self.check_compatibility(server)
}
}
}
return enhanced提供 API
我們建議您的子註冊中心提供符合 Registry API 規範的 API,以便客戶端可以輕鬆在註冊中心之間切換。參見 Registry API 文檔 瞭解詳情。
from flask import Flask, jsonify, request
app = Flask(__name__)
@app.route('/v0/servers')
def list_servers():
"""實現與官方 Registry 兼容的服務器列表端點"""
# 解析查詢參數
limit = min(int(request.args.get('limit', 20)), 100)
cursor = request.args.get('cursor')
search = request.args.get('search')
status = request.args.get('status', 'active')
# 應用過濾器
servers = self.get_filtered_servers(
limit=limit,
cursor=cursor,
search=search,
status=status
)
# 計算下一頁遊標
next_cursor = None
if len(servers) == limit:
next_cursor = self.generate_cursor(servers[-1])
return jsonify({
"servers": servers,
"metadata": {
"count": len(servers),
"next_cursor": next_cursor
}
})
@app.route('/v0/servers/<server_id>')
def get_server(server_id):
"""獲取特定服務器詳情"""
server = self.get_server_by_id(server_id)
if not server:
return jsonify({"error": "Server not found"}), 404
return jsonify(server)MCP 客戶端集成
將 Registry 數據轉換為客戶端配置 - 獲取服務器並將包信息轉換為您的 MCP 客戶端的配置格式。
基本集成
import requests
from typing import List, Dict, Any
class MCPRegistryClient:
def __init__(self, registry_url="https://registry.modelcontextprotocol.io"):
self.registry_url = registry_url
def discover_servers(self, search_query: str = None) -> List[Dict[str, Any]]:
"""發現可用的 MCP 服務器"""
params = {"status": "active", "limit": 50}
if search_query:
params["search"] = search_query
response = requests.get(f"{self.registry_url}/v0/servers", params=params)
response.raise_for_status()
return response.json()["servers"]
def get_server_details(self, server_id: str) -> Dict[str, Any]:
"""獲取服務器詳細信息"""
response = requests.get(f"{self.registry_url}/v0/servers/{server_id}")
response.raise_for_status()
return response.json()
def convert_to_client_config(self, server: Dict[str, Any]) -> Dict[str, Any]:
"""將 Registry 服務器信息轉換為客戶端配置"""
if "packages" in server:
return self.convert_package_server(server)
elif "remotes" in server:
return self.convert_remote_server(server)
else:
raise ValueError(f"Server {server['name']} has no packages or remotes")
def convert_package_server(self, server: Dict[str, Any]) -> Dict[str, Any]:
"""轉換包類型服務器"""
# 選擇首選包(例如優先選擇 npm)
package = self.select_preferred_package(server["packages"])
config = {
"name": server["name"],
"description": server["description"],
"type": "package",
"package": {
"registry": package["registry_type"],
"identifier": package["identifier"],
"version": package["version"]
}
}
# 添加運行時參數
if "package_arguments" in package:
config["package"]["arguments"] = package["package_arguments"]
if "environment_variables" in package:
config["package"]["environment"] = package["environment_variables"]
return config
def convert_remote_server(self, server: Dict[str, Any]) -> Dict[str, Any]:
"""轉換遠程類型服務器"""
# 選擇首選傳輸(例如優先選擇 SSE)
remote = self.select_preferred_remote(server["remotes"])
config = {
"name": server["name"],
"description": server["description"],
"type": "remote",
"remote": {
"transport": remote["type"],
"url": remote["url"]
}
}
# 添加頭部
if "headers" in remote:
config["remote"]["headers"] = remote["headers"]
return config
def select_preferred_package(self, packages: List[Dict[str, Any]]) -> Dict[str, Any]:
"""選擇首選包類型"""
# 優先級順序
priority_order = ["npm", "pypi", "nuget", "oci", "mcpb"]
for registry_type in priority_order:
for package in packages:
if package["registry_type"] == registry_type:
return package
# 如果沒有找到首選類型,返回第一個
return packages[0]
def select_preferred_remote(self, remotes: List[Dict[str, Any]]) -> Dict[str, Any]:
"""選擇首選遠程傳輸"""
# 優先選擇 SSE
for remote in remotes:
if remote["type"] == "sse":
return remote
# 回退到第一個可用的
return remotes[0]
# 使用示例
client = MCPRegistryClient()
# 發現服務器
servers = client.discover_servers("filesystem")
# 轉換為客戶端配置
configs = []
for server in servers:
try:
config = client.convert_to_client_config(server)
configs.append(config)
except ValueError as e:
print(f"跳過服務器 {server['name']}: {e}")
print(f"找到 {len(configs)} 個可用服務器")高級過濾
我們強烈建議使用子註冊中心而不是直接從官方 Registry 獲取數據。您可能希望使其可配置,以便客戶端用戶可以選擇他們首選的註冊中心,例如我們預期一些企業用戶可能有自己的註冊中心。
class AdvancedMCPClient:
def __init__(self, preferred_registries=None):
self.preferred_registries = preferred_registries or [
"https://registry.modelcontextprotocol.io",
"https://enterprise.registry.com/api/v0",
"https://curated.mcpregistry.org/api/v0"
]
def discover_servers_multi_registry(self, query: str) -> List[Dict[str, Any]]:
"""從多個註冊中心發現服務器"""
all_servers = []
for registry_url in self.preferred_registries:
try:
client = MCPRegistryClient(registry_url)
servers = client.discover_servers(query)
# 添加來源信息
for server in servers:
server["_source_registry"] = registry_url
all_servers.extend(servers)
except Exception as e:
print(f"從 {registry_url} 獲取失敗: {e}")
continue
# 去重(基於服務器名稱)
seen = set()
unique_servers = []
for server in all_servers:
if server["name"] not in seen:
seen.add(server["name"])
unique_servers.append(server)
return unique_servers
def filter_by_criteria(self, servers: List[Dict[str, Any]], criteria: Dict[str, Any]) -> List[Dict[str, Any]]:
"""根據標準過濾服務器"""
filtered = []
for server in servers:
# 狀態過濾
if criteria.get("status") and server.get("status") != criteria["status"]:
continue
# 包類型過濾
if criteria.get("package_types"):
server_types = {pkg["registry_type"] for pkg in server.get("packages", [])}
if not server_types.intersection(set(criteria["package_types"])):
continue
# 評分過濾
if criteria.get("min_rating"):
rating = server.get("_meta", {}).get("user_rating", 0)
if rating < criteria["min_rating"]:
continue
filtered.append(server)
return filtered您的客戶端應該優雅地處理滿足最低規範的註冊中心,即避免對 _meta 字段的硬依賴。
運行服務器
您可以使用 packages 或 remotes 字段來確定如何運行服務器。這些字段的更多詳細信息在 server.json 文檔 中。
def execute_server(self, config: Dict[str, Any]) -> subprocess.Popen:
"""執行 MCP 服務器"""
if config["type"] == "package":
return self.execute_package_server(config)
elif config["type"] == "remote":
return self.connect_remote_server(config)
else:
raise ValueError(f"不支持的服務器類型: {config['type']}")
def execute_package_server(self, config: Dict[str, Any]) -> subprocess.Popen:
"""執行包類型服務器"""
package = config["package"]
# 構建命令
if package["registry"] == "npm":
cmd = ["npx", package["identifier"]]
elif package["registry"] == "pypi":
cmd = ["uvx" if "runtime_hint" in package else "python", "-m", package["identifier"]]
elif package["registry"] == "oci":
cmd = ["docker", "run", package["identifier"]]
else:
raise ValueError(f"不支持的包註冊中心: {package['registry']}")
# 添加參數
if "arguments" in package:
for arg in package["arguments"]:
if arg["type"] == "positional":
cmd.append(arg["value"])
elif arg["type"] == "named":
cmd.extend([arg["name"], arg["value"]])
# 設置環境變量
env = os.environ.copy()
if "environment" in package:
for env_var in package["environment"]:
if env_var.get("is_required") and env_var["name"] not in env:
raise ValueError(f"缺少必需的環境變量: {env_var['name']}")
env[env_var["name"]] = env.get(env_var["name"], env_var.get("default", ""))
return subprocess.Popen(cmd, env=env, stdin=subprocess.PIPE, stdout=subprocess.PIPE)最佳實踐
1. 緩存策略
import redis
import json
from datetime import timedelta
class CachedRegistryClient:
def __init__(self, redis_client=None, cache_ttl=3600):
self.redis = redis_client or redis.Redis()
self.cache_ttl = cache_ttl
def get_servers_cached(self, **params):
"""帶緩存的服務器獲取"""
cache_key = f"registry:servers:{hash(frozenset(params.items()))}"
# 嘗試從緩存獲取
cached = self.redis.get(cache_key)
if cached:
return json.loads(cached)
# 從 Registry 獲取
servers = self.fetch_servers(**params)
# 緩存結果
self.redis.setex(
cache_key,
timedelta(seconds=self.cache_ttl),
json.dumps(servers)
)
return servers2. 錯誤處理
import time
import random
class RobustRegistryClient:
def __init__(self, max_retries=3, base_delay=1):
self.max_retries = max_retries
self.base_delay = base_delay
def fetch_with_retry(self, url, **kwargs):
"""帶重試的請求"""
for attempt in range(self.max_retries):
try:
response = requests.get(url, timeout=30, **kwargs)
response.raise_for_status()
return response.json()
except requests.RequestException as e:
if attempt == self.max_retries - 1:
raise
# 指數退避
delay = self.base_delay * (2 ** attempt) + random.uniform(0, 1)
time.sleep(delay)3. 增量更新
class IncrementalRegistrySync:
def __init__(self, last_sync_file="last_sync.txt"):
self.last_sync_file = last_sync_file
def get_last_sync_time(self):
"""獲取上次同步時間"""
try:
with open(self.last_sync_file, 'r') as f:
return datetime.fromisoformat(f.read().strip())
except FileNotFoundError:
return datetime.min
def save_sync_time(self, sync_time):
"""保存同步時間"""
with open(self.last_sync_file, 'w') as f:
f.write(sync_time.isoformat())
def sync_changes(self):
"""同步變更(一旦 Registry 支持 updated_at 過濾)"""
last_sync = self.get_last_sync_time()
# 將來的 API 調用
# servers = self.fetch_servers(updated_after=last_sync)
# 目前獲取所有服務器並在本地過濾
all_servers = self.fetch_all_servers()
self.save_sync_time(datetime.utcnow())
return all_servers