MCP Guide

Notes compiled from the original blogger

The blogger's sample code on GitHub

A Brief Introduction to MCP

TL;DR: MCP is a protocol that lets a model perceive its external environment (which functions around it can be called, and through them, what information the outside world can provide).

MCP stands for Model Context Protocol. Released by Anthropic in November 2024, it is an open communication standard. Simply put, it gives the AI a "super network cable" so it can connect seamlessly to external tools, data, and systems.

A large model on its own can only answer questions; it cannot use external tools.

  • Analogy: the AI is a smart but housebound bookworm, and MCP is its "delivery courier" that fetches data and runs errands for it.
  • Goal: let the AI do more than chat, so it can actually get things done, such as querying a database, sending email, or writing code.

Uses of MCP

MCP Host: essentially any piece of software that supports the MCP protocol. Common MCP Hosts include Claude Desktop, Cursor, Cline, Cherry Studio, and so on. (Everything below uses Cline as the example; configure the API Provider, API Key, and Model for both Plan Mode and Act Mode.)

MCP Server and Tool: an MCP Server has little to do with a traditional server. A traditional server is accessed over the network, whereas an MCP Server is just a program whose execution follows the MCP protocol. Most MCP Servers are launched locally via Node or Python (typically through npx or uvx), and they may or may not access the network while running. A better analogy is a mobile app: like an app, an MCP Server ships with a set of built-in functional modules for solving problems. In MCP terminology these modules are called Tools, and a Tool is really just a function in a programming language.


  • An MCP Server has two ways of communicating:
    • stdio: the MCP Server uses its standard input and standard output to talk to Cline (the MCP Host); most MCP Servers currently are of this type.
    • SSE (Server-Sent Events): the MCP Server runs as a long-lived HTTP service and pushes messages to the Host over an HTTP connection.
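The stdio transport boils down to newline-delimited JSON exchanged over a child process's standard input and output. The following is a toy sketch of that idea only; the child process here is a hypothetical stand-in "server", not a real MCP implementation:

```python
import json
import subprocess
import sys

# A stand-in "server": reads one JSON line from stdin, answers with one JSON line on stdout.
child_code = (
    "import sys, json\n"
    "req = json.loads(sys.stdin.readline())\n"
    "print(json.dumps({'id': req['id'], 'result': 'pong'}))\n"
)

proc = subprocess.Popen(
    [sys.executable, "-c", child_code],
    stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True,
)
# The "host" writes a request line and reads the response line back.
out, _ = proc.communicate(json.dumps({"id": 1, "method": "ping"}) + "\n")
response = json.loads(out)
print(response)  # {'id': 1, 'result': 'pong'}
```

This is exactly why an MCP Server "is just a program": the Host spawns it and talks to it through pipes, with no network listener required.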

The MCP Interaction Flow

The MCP protocol only governs the interaction between the MCP Server and the MCP Host; it does not specify how to interact with the model.

The MCP protocol mainly specifies two things, function registration and function invocation; no model is involved in either, so MCP can be used even without a large model:

  1. What functions each MCP Server exposes (and, for some MCP Servers, what internal resources are available)
  2. How to call those functions

In practice, different MCP Hosts interact with the model in very different ways: Cline communicates with the model via XML-style tags, while Cherry Studio uses Function Calling.
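The two styles can be compared side by side. This sketch is illustrative: the XML tag names are modeled on Cline's tool-use format, and the structured reply is modeled on the common OpenAI-style function-calling shape; both encode the same call:

```python
import json
import xml.etree.ElementTree as ET

# Cline style: the model emits an XML block inside its plain-text reply
# (tag names modeled on Cline's format, shown for illustration).
xml_reply = """<use_mcp_tool>
<server_name>weather</server_name>
<tool_name>get_alerts</tool_name>
<arguments>{"state": "CA"}</arguments>
</use_mcp_tool>"""

node = ET.fromstring(xml_reply)
xml_call = {
    "tool": node.find("tool_name").text,
    "args": json.loads(node.find("arguments").text),
}

# Function-Calling style: the model returns a structured field instead of text.
fc_reply = {
    "tool_calls": [{
        "type": "function",
        "function": {"name": "get_alerts", "arguments": '{"state": "CA"}'},
    }]
}
fn = fc_reply["tool_calls"][0]["function"]
fc_call = {"tool": fn["name"], "args": json.loads(fn["arguments"])}

print(xml_call == fc_call)  # True: same call, two different wire formats
```

Either way, the Host ends up with a tool name plus arguments that it forwards to the MCP Server; only the encoding between Host and model differs.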

Building an MCP Server

The official weather MCP Server

from typing import Any
import httpx
from mcp.server.fastmcp import FastMCP

# Initialize FastMCP server
mcp = FastMCP("weather", log_level="ERROR")

# Constants
NWS_API_BASE = "https://api.weather.gov"
USER_AGENT = "weather-app/1.0"


async def make_nws_request(url: str) -> dict[str, Any] | None:
    """Make a request to the NWS API with proper error handling."""
    headers = {
        "User-Agent": USER_AGENT,
        "Accept": "application/geo+json"
    }
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(url, headers=headers, timeout=30.0)
            response.raise_for_status()
            return response.json()
        except Exception:
            return None

def format_alert(feature: dict) -> str:
    """Format an alert feature into a readable string."""
    props = feature["properties"]
    return f"""
Event: {props.get('event', 'Unknown')}
Area: {props.get('areaDesc', 'Unknown')}
Severity: {props.get('severity', 'Unknown')}
Description: {props.get('description', 'No description available')}
Instructions: {props.get('instruction', 'No specific instructions provided')}
"""

# @mcp.tool(): extracts the tool's purpose and the meaning of each parameter from the
# function's docstring, so the model can decide the best moment to call it; all of this
# is turned into tool metadata and sent to the model during actual calls to help it decide
@mcp.tool()
async def get_alerts(state: str) -> str:
    """Get weather alerts for a US state.

    Args:
        state: Two-letter US state code (e.g. CA, NY)
    """
    url = f"{NWS_API_BASE}/alerts/active/area/{state}"
    data = await make_nws_request(url)

    if not data or "features" not in data:
        return "Unable to fetch alerts or no alerts found."

    if not data["features"]:
        return "No active alerts for this state."

    alerts = [format_alert(feature) for feature in data["features"]]
    return "\n---\n".join(alerts)

@mcp.tool()
async def get_forecast(latitude: float, longitude: float) -> str:
    """Get weather forecast for a location.

    Args:
        latitude: Latitude of the location
        longitude: Longitude of the location
    """
    # First get the forecast grid endpoint
    points_url = f"{NWS_API_BASE}/points/{latitude},{longitude}"
    points_data = await make_nws_request(points_url)

    if not points_data:
        return "Unable to fetch forecast data for this location."

    # Get the forecast URL from the points response
    forecast_url = points_data["properties"]["forecast"]
    forecast_data = await make_nws_request(forecast_url)

    if not forecast_data:
        return "Unable to fetch detailed forecast."

    # Format the periods into a readable forecast
    periods = forecast_data["properties"]["periods"]
    forecasts = []
    for period in periods[:5]:  # Only show next 5 periods
        forecast = f"""
{period['name']}:
Temperature: {period['temperature']}°{period['temperatureUnit']}
Wind: {period['windSpeed']} {period['windDirection']}
Forecast: {period['detailedForecast']}
"""
        forecasts.append(forecast)

    return "\n---\n".join(forecasts)

if __name__ == "__main__":
    # Initialize and run the server
    mcp.run(transport='stdio')  # transport: how the MCP server communicates with Cline

Configuring Cline's MCP Server settings

{
  "mcpServers": {
    "weather": {
      "command": "C:\\Users\\Lesh\\.local\\bin\\uv.exe",
      "args": [
        "--directory",
        "D:/Development/Workspace/mcp_server/weather",
        "run",
        "weather.py"
      ]
    }
  }
}

Principles and Methods of Analyzing the Underlying Protocol

Logging the traffic between the MCP Server and Cline

#!/usr/bin/env python3

import sys
import subprocess
import threading
import argparse
import os

# --- Configuration ---
LOG_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)), "mcp_io.log")
# --- End Configuration ---

# --- Argument Parsing ---
parser = argparse.ArgumentParser(
    description="Wrap a command, passing STDIN/STDOUT verbatim while logging them.",
    usage="%(prog)s <command> [args...]"
)
# Capture the command and all subsequent arguments
parser.add_argument('command', nargs=argparse.REMAINDER,
                    help='The command and its arguments to execute.')

# Truncate the log file on startup
open(LOG_FILE, 'w', encoding='utf-8').close()

if len(sys.argv) == 1:
    parser.print_help(sys.stderr)
    sys.exit(1)

args = parser.parse_args()

if not args.command:
    print("Error: No command provided.", file=sys.stderr)
    parser.print_help(sys.stderr)
    sys.exit(1)

target_command = args.command
# --- End Argument Parsing ---

# --- I/O Forwarding Functions ---
# These will run in separate threads

def forward_and_log_stdin(proxy_stdin, target_stdin, log_file):
    """Reads from proxy's stdin, logs it, writes to target's stdin."""
    try:
        while True:
            # Read line by line from the script's actual stdin
            line_bytes = proxy_stdin.readline()
            if not line_bytes:  # EOF reached
                break

            # Decode for logging (assuming UTF-8, adjust if needed)
            try:
                line_str = line_bytes.decode('utf-8')
            except UnicodeDecodeError:
                line_str = f"[Non-UTF8 data, {len(line_bytes)} bytes]\n"  # Log representation

            # Log with prefix
            log_file.write(f"STDIN: {line_str}")
            log_file.flush()  # Ensure log is written promptly

            # Write the original bytes to the target process's stdin
            target_stdin.write(line_bytes)
            target_stdin.flush()  # Ensure target receives it promptly

    except Exception as e:
        # Log errors happening during forwarding
        try:
            log_file.write(f"!!! STDIN Forwarding Error: {e}\n")
            log_file.flush()
        except: pass  # Avoid errors trying to log errors if log file is broken

    finally:
        # Important: close the target's stdin when the proxy's stdin closes.
        # This signals EOF to the target process.
        try:
            target_stdin.close()
            log_file.write("--- STDIN stream closed to target ---\n")
            log_file.flush()
        except Exception as e:
            try:
                log_file.write(f"!!! Error closing target STDIN: {e}\n")
                log_file.flush()
            except: pass


def forward_and_log_stdout(target_stdout, proxy_stdout, log_file):
    """Reads from target's stdout, logs it, writes to proxy's stdout."""
    try:
        while True:
            # Read line by line from the target process's stdout
            line_bytes = target_stdout.readline()
            if not line_bytes:  # EOF reached (process exited or closed stdout)
                break

            # Decode for logging
            try:
                line_str = line_bytes.decode('utf-8')
            except UnicodeDecodeError:
                line_str = f"[Non-UTF8 data, {len(line_bytes)} bytes]\n"

            # Log with prefix
            log_file.write(f"STDOUT: {line_str}")
            log_file.flush()

            # Write the original bytes to the script's actual stdout
            proxy_stdout.write(line_bytes)
            proxy_stdout.flush()  # Ensure output is seen promptly

    except Exception as e:
        try:
            log_file.write(f"!!! STDOUT Forwarding Error: {e}\n")
            log_file.flush()
        except: pass
    finally:
        try:
            log_file.flush()
        except: pass
        # Don't close proxy_stdout (sys.stdout) here


def forward_and_log_stderr(target_stderr, proxy_stderr, log_file):
    """Reads from target's stderr, logs it, writes to proxy's stderr."""
    try:
        while True:
            line_bytes = target_stderr.readline()
            if not line_bytes: break
            try: line_str = line_bytes.decode('utf-8')
            except UnicodeDecodeError: line_str = f"[Non-UTF8 data, {len(line_bytes)} bytes]\n"
            log_file.write(f"STDERR: {line_str}")  # Use STDERR prefix
            log_file.flush()
            proxy_stderr.write(line_bytes)
            proxy_stderr.flush()
    except Exception as e:
        try:
            log_file.write(f"!!! STDERR Forwarding Error: {e}\n")
            log_file.flush()
        except: pass
    finally:
        try:
            log_file.flush()
        except: pass

# --- Main Execution ---
process = None
log_f = None
exit_code = 1  # Default exit code in case of early failure

try:
    # Open log file in append mode ('a') for the threads
    log_f = open(LOG_FILE, 'a', encoding='utf-8')

    # Start the target process.
    # We use pipes for stdin/stdout/stderr and work with bytes
    # (bufsize=0 for unbuffered binary I/O; readline() still works).
    process = subprocess.Popen(
        target_command,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,  # Capture stderr too, good practice
        bufsize=0  # Use 0 for unbuffered binary I/O
    )

    # Pass binary streams to threads
    stdin_thread = threading.Thread(
        target=forward_and_log_stdin,
        args=(sys.stdin.buffer, process.stdin, log_f),
        daemon=True  # Allows main thread to exit even if this is stuck (e.g., waiting on stdin)
    )

    stdout_thread = threading.Thread(
        target=forward_and_log_stdout,
        args=(process.stdout, sys.stdout.buffer, log_f),
        daemon=True
    )

    # Handle stderr similarly (log and pass through)
    stderr_thread = threading.Thread(
        target=forward_and_log_stderr,
        args=(process.stderr, sys.stderr.buffer, log_f),
        daemon=True
    )

    # Start the forwarding threads
    stdin_thread.start()
    stdout_thread.start()
    stderr_thread.start()

    # Wait for the target process to complete
    process.wait()
    exit_code = process.returncode

    # Join the I/O threads so they can flush their last messages; since they are
    # daemons they would otherwise exit abruptly with the main thread.
    # process.wait() guarantees the target is dead, so its pipes close and the
    # blocking reads terminate naturally.
    stdin_thread.join(timeout=1.0)  # Timeout in case a thread hangs
    stdout_thread.join(timeout=1.0)
    stderr_thread.join(timeout=1.0)

except Exception as e:
    print(f"MCP Logger Error: {e}", file=sys.stderr)
    # Try to log the error too
    if log_f and not log_f.closed:
        try:
            log_f.write(f"!!! MCP Logger Main Error: {e}\n")
            log_f.flush()
        except: pass  # Ignore errors during final logging attempt
    exit_code = 1  # Indicate logger failure

finally:
    # Ensure the process is terminated if it's still running (e.g., if the logger crashed)
    if process and process.poll() is None:
        try:
            process.terminate()
            process.wait(timeout=1.0)  # Give it a moment to terminate
        except: pass  # Ignore errors during cleanup
        if process.poll() is None:  # Still running?
            try: process.kill()  # Force kill
            except: pass  # Ignore kill errors

    # Close the log file
    if log_f and not log_f.closed:
        try:
            log_f.close()
        except: pass

# Exit with the target process's exit code
sys.exit(exit_code)

Updating the configuration so that log interception takes effect

{
  "mcpServers": {
    "weather": {
      "command": "python",
      "args": [
        "D:\\Development\\Workspace\\mcp_server\\weather\\mcp_logger.py",
        "C:\\Users\\Lesh\\.local\\bin\\uv.exe",
        "--directory",
        "D:/Development/Workspace/mcp_server/weather",
        "run",
        "weather.py"
      ]
    }
  }
}

/mcp_io.log

  • Explanations of some important fields:
    • @mcp.tool(): think of it as a POST method in an API; these are the functions offered to the AI
    • @mcp.resource: similar to a GET method in an API
    • description: this is just the comment inside the function, which in Python is called a docstring, a special kind of comment; @mcp.tool() extracts it so the model can pick the tool that best matches the user's question
    • inputSchema: follows the JSON Schema specification, which describes the structure of another JSON document; here it describes the tool's input parameters, extracted by @mcp.tool() from the parameter definitions
    • outputSchema: same idea as inputSchema, except @mcp.tool() extracts it from the function's return value
    • resources/list: lists the available resources; a resource is something like a file or a report
    • resources/templates/list: lists the dynamic resources
    • result.content.text: the result returned by the tool function call inside the MCP Server
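As a sketch of how these fields fit together, a tools/list entry for get_alerts could look roughly like the dict below. The field values are illustrative (the exact schema FastMCP generates may differ), and the minimal check_args helper is hypothetical, included only to show how an inputSchema constrains a tool's arguments:

```python
# Illustrative sketch of a tools/list entry (not FastMCP's exact output)
tool_entry = {
    "name": "get_alerts",
    "description": "Get weather alerts for a US state.",  # taken from the docstring
    "inputSchema": {                                      # JSON Schema for the arguments
        "type": "object",
        "properties": {
            "state": {"type": "string", "title": "State"},
        },
        "required": ["state"],
    },
}

def check_args(schema: dict, args: dict) -> bool:
    """Minimal structural check: all required keys present, no unknown keys."""
    props = schema["properties"]
    return (all(k in args for k in schema.get("required", []))
            and all(k in props for k in args))

print(check_args(tool_entry["inputSchema"], {"state": "CA"}))  # True
print(check_args(tool_entry["inputSchema"], {}))               # False
```

A real MCP Host performs (or delegates) this kind of validation before forwarding a tool call to the server.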

You can also start an MCP Server directly in a terminal and talk to it yourself using the same message format seen in the intercepted log. As long as the input follows the protocol, no MCP Host is required at all.
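Assuming the standard MCP JSON-RPC handshake, the lines typed into the server's stdin would look roughly like the ones printed below, one JSON object per line (the protocolVersion value and clientInfo are illustrative):

```python
import json

# The messages to send, in order: initialize, the initialized notification,
# then list the tools, then call one of them.
handshake = [
    {"jsonrpc": "2.0", "id": 1, "method": "initialize",
     "params": {"protocolVersion": "2024-11-05",
                "capabilities": {},
                "clientInfo": {"name": "manual-test", "version": "0.1"}}},
    {"jsonrpc": "2.0", "method": "notifications/initialized"},
    {"jsonrpc": "2.0", "id": 2, "method": "tools/list"},
    {"jsonrpc": "2.0", "id": 3, "method": "tools/call",
     "params": {"name": "get_alerts", "arguments": {"state": "CA"}}},
]

for msg in handshake:
    print(json.dumps(msg))  # each line can be pasted into the server's stdin
```

After each request line with an id, the server writes one JSON-RPC response line to stdout, which is exactly what appears in mcp_io.log.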

How to Intercept the Requests and Responses between the MCP Host and the Model

Logging the traffic between the MCP Host and the model

import httpx
from fastapi import FastAPI, Request
from starlette.responses import StreamingResponse


class AppLogger:
    def __init__(self, log_file="llm.log"):
        """Initialize the logger with a file that will be cleared on startup."""
        self.log_file = log_file
        # Clear the log file on startup
        with open(self.log_file, 'w', encoding='utf-8') as f:
            f.write("")

    def log(self, message):
        """Log a message to both file and console."""

        # Log to file
        with open(self.log_file, 'a', encoding='utf-8') as f:
            f.write(message + "\n")

        # Log to console
        print(message)


app = FastAPI(title="LLM API Logger")
logger = AppLogger("llm.log")


@app.post("/chat/completions")
async def proxy_request(request: Request):

    body_bytes = await request.body()
    body_str = body_bytes.decode('utf-8')
    logger.log(f"Model request: {body_str}")
    body = await request.json()

    logger.log("Model response:\n")

    async def event_stream():
        async with httpx.AsyncClient(timeout=None) as client:
            async with client.stream(
                "POST",
                "https://api.deepseek.com/chat/completions",
                json=body,
                headers={
                    "Content-Type": "application/json",
                    "Accept": "text/event-stream",
                    "Authorization": request.headers.get("Authorization"),
                },
            ) as response:
                async for line in response.aiter_lines():
                    logger.log(line)
                    yield f"{line}\n"

    return StreamingResponse(event_stream(), media_type="text/event-stream")


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)

/llm.log

  • The basic request

    • In the request Cline sends, the system-level message contains the System Prompt, and the user-level message contains the user's input plus the environment information (environment_details)
    • In the SSE response, lines that start with a colon are comments; their purpose is to keep the connection alive, and their content is irrelevant and can be anything
    • Concatenating every content field in the model's streamed chunks yields the model's final answer; the stream ends with the data: [DONE] marker
  • A request involving an MCP Server

    • The model's second request carries the content streamed back from the first request, placed in a new message with the role assistant; if it were not sent back, the model would have no idea what had happened, because the model has no memory at all

      # The full flow:
      # Cline first sends the System Prompt, then the user's question, to the model; the model replies asking to use a tool; Cline then forwards the model's tool-call request to the MCP Server; finally, Cline sends the tool's result to the model
      # Only the tool result is a new message; everything else is history, and the history exists to tell the model what has happened so far
      # The final answer the model returns is therefore grounded in the MCP Server's result, and the conversation ends
    • Thought -> Action -> Observation -> Thought -> Action -> Observation -> Thought -> Final Answer
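The SSE rules above (comment lines, concatenating the content pieces, the data: [DONE] marker) can be sketched against a simplified captured stream:

```python
import json

# A captured stream, heavily simplified (the ":" line is a keep-alive comment)
sse_lines = [
    ": keep-alive",
    'data: {"choices": [{"delta": {"content": "Hello"}}]}',
    'data: {"choices": [{"delta": {"content": ", world"}}]}',
    "data: [DONE]",
]

answer = ""
for line in sse_lines:
    if line.startswith(":"):             # comment line: keeps the connection alive
        continue
    if not line.startswith("data: "):
        continue
    payload = line[len("data: "):]
    if payload == "[DONE]":              # end-of-stream marker
        break
    chunk = json.loads(payload)
    delta = chunk["choices"][0]["delta"]
    answer += delta.get("content", "")   # concatenate every content piece

print(answer)  # Hello, world
```

This is essentially what the MCP Host does with each streamed response before deciding whether the reply contains a tool call.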

The ReAct Pattern

ReAct: Synergizing Reasoning and Acting in Language Models

The ReAct idea from the paper lets a model think on its own and call all kinds of external tools autonomously, without human intervention; in plain terms, that is an Agent.

Agent: a program that keeps thinking and keeps calling external tools until the user's problem is solved.

To implement a Cline-like Agent, the prompt must cover: 1. the required response format; 2. the list of available tools; 3. an instruction to follow the ReAct pattern.
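A minimal sketch of that loop, with a scripted stand-in for the model (all names and text formats here are made up for illustration; a real agent would call a chat-completion API instead of fake_model):

```python
import re

def fake_model(history):
    """Scripted stand-in for the model: first asks for a tool, then answers."""
    if "Observation:" not in history:
        return "Thought: I need the weather.\nAction: get_weather(Beijing)"
    return "Thought: I have enough information.\nFinal Answer: Sunny in Beijing."

def get_weather(city):              # the only "tool" in this sketch
    return f"Sunny in {city}"

TOOLS = {"get_weather": get_weather}

history = "Question: What's the weather in Beijing?"
for _ in range(5):                              # cap the loop defensively
    reply = fake_model(history)
    history += "\n" + reply                     # every reply becomes history
    if "Final Answer:" in reply:                # the model decided it is done
        answer = reply.split("Final Answer:")[1].strip()
        break
    m = re.search(r"Action: (\w+)\((.*?)\)", reply)
    if m:                                       # run the tool, feed the result back
        result = TOOLS[m.group(1)](m.group(2))
        history += f"\nObservation: {result}"

print(answer)  # Sunny in Beijing.
```

The structure mirrors what the intercepted logs show Cline doing: parse the model's reply for an action, execute it, append the observation to the history, and loop until a final answer appears.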

How ReAct Works

Agent

Extensions

Each Concept in One Sentence

Function Calling => the tool-invocation protocol between an AI Agent and an AI Model
MCP => the tool discovery and invocation protocol between an AI Agent and AI Tools
A2A => the discovery and task-dispatch protocol between AI Agents