async def make_nws_request(url: str) -> dict[str, Any] | None:
    """Make a request to the NWS API with proper error handling.

    Args:
        url: Full NWS API endpoint URL to fetch.

    Returns:
        The parsed JSON body as a dict, or None on any request/parse
        failure. Swallowing all errors here is deliberate best-effort:
        the tool functions translate None into a user-facing message.
    """
    headers = {
        "User-Agent": USER_AGENT,
        "Accept": "application/geo+json",
    }
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(url, headers=headers, timeout=30.0)
            response.raise_for_status()
            return response.json()
        except Exception:
            # Best-effort: callers check for None and report a friendly error.
            return None
def format_alert(feature: dict) -> str:
    """Format an alert feature into a readable string.

    Args:
        feature: A GeoJSON alert feature; values are read from its
            "properties" mapping, with safe fallbacks for missing keys.

    Returns:
        A multi-line human-readable summary of the alert.
    """
    props = feature["properties"]
    return f"""
Event: {props.get('event', 'Unknown')}
Area: {props.get('areaDesc', 'Unknown')}
Severity: {props.get('severity', 'Unknown')}
Description: {props.get('description', 'No description available')}
Instructions: {props.get('instruction', 'No specific instructions provided')}
"""
# @mcp.tool(): extracts the function's purpose and each parameter's meaning
# from the docstring so the model can decide the best moment to call it;
# this metadata becomes the tool description passed to the model to help
# it make decisions.
@mcp.tool()
async def get_alerts(state: str) -> str:
    """Get weather alerts for a US state.

    Args:
        state: Two-letter US state code (e.g. CA, NY)
    """
    url = f"{NWS_API_BASE}/alerts/active/area/{state}"
    data = await make_nws_request(url)

    # make_nws_request returns None on any failure; also guard a
    # malformed payload that lacks the "features" key.
    if not data or "features" not in data:
        return "Unable to fetch alerts or no alerts found."

    if not data["features"]:
        return "No active alerts for this state."

    alerts = [format_alert(feature) for feature in data["features"]]
    return "\n---\n".join(alerts)
@mcp.tool()
async def get_forecast(latitude: float, longitude: float) -> str:
    """Get weather forecast for a location.

    Args:
        latitude: Latitude of the location
        longitude: Longitude of the location
    """
    # First get the forecast grid endpoint
    points_url = f"{NWS_API_BASE}/points/{latitude},{longitude}"
    points_data = await make_nws_request(points_url)

    if not points_data:
        return "Unable to fetch forecast data for this location."

    # Get the forecast URL from the points response
    forecast_url = points_data["properties"]["forecast"]
    forecast_data = await make_nws_request(forecast_url)

    if not forecast_data:
        return "Unable to fetch detailed forecast."

    # Format the periods into a readable forecast
    periods = forecast_data["properties"]["periods"]
    forecasts = []
    for period in periods[:5]:  # Only show next 5 periods
        forecast = f"""
{period['name']}:
Temperature: {period['temperature']}°{period['temperatureUnit']}
Wind: {period['windSpeed']} {period['windDirection']}
Forecast: {period['detailedForecast']}
"""
        forecasts.append(forecast)

    return "\n---\n".join(forecasts)
if __name__ == "__main__":
    # Initialize and run the server.
    # transport='stdio': the MCP server communicates with the client
    # (e.g. Cline) over standard input/output.
    mcp.run(transport='stdio')
# --- Argument Parsing ---
parser = argparse.ArgumentParser(
    description="Wrap a command, passing STDIN/STDOUT verbatim while logging them.",
    usage="%(prog)s <command> [args...]"
)
# Capture the command and all subsequent arguments
parser.add_argument('command', nargs=argparse.REMAINDER,
                    help='The command and its arguments to execute.')

# BUG FIX: `args` was referenced below but parse_args() was never called.
args = parser.parse_args()

if not args.command:
    print("Error: No command provided.", file=sys.stderr)
    parser.print_help(sys.stderr)
    sys.exit(1)

target_command = args.command
# --- End Argument Parsing ---
# --- I/O Forwarding Functions --- # These will run in separate threads
def forward_and_log_stdin(proxy_stdin, target_stdin, log_file):
    """Reads from proxy's stdin, logs it, writes to target's stdin.

    Runs in a dedicated thread. Forwards the stream byte-for-byte while
    logging a decoded copy of each line; closes the target's stdin on EOF
    so the child process sees end-of-input.
    """
    try:
        while True:
            # Read line by line from the script's actual stdin
            line_bytes = proxy_stdin.readline()
            if not line_bytes:  # EOF reached
                break

            # BUG FIX: line_str was used below without ever being defined —
            # decode the raw bytes for logging (mirrors the stderr forwarder).
            try:
                line_str = line_bytes.decode('utf-8')
            except UnicodeDecodeError:
                line_str = f"[Non-UTF8 data, {len(line_bytes)} bytes]\n"

            # Log with prefix
            log_file.write(f"输入: {line_str}")
            log_file.flush()  # Ensure log is written promptly

            # Write the original bytes to the target process's stdin
            target_stdin.write(line_bytes)
            target_stdin.flush()  # Ensure target receives it promptly
    except Exception as e:
        # Log errors happening during forwarding
        try:
            log_file.write(f"!!! STDIN Forwarding Error: {e}\n")
            log_file.flush()
        except Exception:
            pass  # Avoid errors trying to log errors if log file is broken
    finally:
        # Important: close the target's stdin when proxy's stdin closes.
        # This signals EOF to the target process (like test.sh's read loop).
        try:
            target_stdin.close()
            log_file.write("--- STDIN stream closed to target ---\n")
            log_file.flush()
        except Exception as e:
            try:
                log_file.write(f"!!! Error closing target STDIN: {e}\n")
                log_file.flush()
            except Exception:
                pass
def forward_and_log_stdout(target_stdout, proxy_stdout, log_file):
    """Reads from target's stdout, logs it, writes to proxy's stdout.

    Runs in a dedicated thread. Forwards bytes verbatim and logs a decoded
    copy of each line. Does NOT close proxy_stdout (it is sys.stdout).
    """
    try:
        while True:
            # Read line by line from the target process's stdout
            line_bytes = target_stdout.readline()
            if not line_bytes:  # EOF reached (process exited or closed stdout)
                break

            # BUG FIX: line_str was used below without ever being defined —
            # decode the raw bytes for logging (mirrors the stderr forwarder).
            try:
                line_str = line_bytes.decode('utf-8')
            except UnicodeDecodeError:
                line_str = f"[Non-UTF8 data, {len(line_bytes)} bytes]\n"

            # Log with prefix
            log_file.write(f"输出: {line_str}")
            log_file.flush()

            # Write the original bytes to the script's actual stdout
            proxy_stdout.write(line_bytes)
            proxy_stdout.flush()  # Ensure output is seen promptly
    except Exception as e:
        try:
            log_file.write(f"!!! STDOUT Forwarding Error: {e}\n")
            log_file.flush()
        except Exception:
            pass
    finally:
        try:
            log_file.flush()
        except Exception:
            pass
        # Don't close proxy_stdout (sys.stdout) here
# --- Main Execution ---
process = None
log_f = None
exit_code = 1  # Default exit code in case of early failure


def forward_and_log_stderr(target_stderr, proxy_stderr, log_file):
    """Reads from target's stderr, logs it (STDERR prefix), writes to proxy's stderr."""
    try:
        while True:
            line_bytes = target_stderr.readline()
            if not line_bytes:
                break
            try:
                line_str = line_bytes.decode('utf-8')
            except UnicodeDecodeError:
                line_str = f"[Non-UTF8 data, {len(line_bytes)} bytes]\n"
            log_file.write(f"STDERR: {line_str}")  # Use STDERR prefix
            log_file.flush()
            proxy_stderr.write(line_bytes)
            proxy_stderr.flush()
    except Exception as e:
        try:
            log_file.write(f"!!! STDERR Forwarding Error: {e}\n")
            log_file.flush()
        except Exception:
            pass
    finally:
        try:
            log_file.flush()
        except Exception:
            pass


try:
    # Open log file in append mode ('a') for the threads
    log_f = open(LOG_FILE, 'a', encoding='utf-8')

    # Start the target process with pipes for stdin/stdout/stderr.
    # bufsize=0 gives unbuffered binary I/O; readline() still works.
    process = subprocess.Popen(
        target_command,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,  # Capture stderr too, good practice
        bufsize=0,
    )

    # Pass binary streams to the forwarding threads. Daemon threads let the
    # main thread exit even if one is stuck (e.g. blocked on stdin).
    stdin_thread = threading.Thread(
        target=forward_and_log_stdin,
        args=(sys.stdin.buffer, process.stdin, log_f),
        daemon=True,
    )
    # BUG FIX: stdout_thread was started and joined below but never created.
    stdout_thread = threading.Thread(
        target=forward_and_log_stdout,
        args=(process.stdout, sys.stdout.buffer, log_f),
        daemon=True,
    )
    # BUG FIX: the stderr thread previously reused forward_and_log_stdout,
    # which mislabels stderr lines in the log; use the dedicated forwarder
    # so they get the "STDERR:" prefix.
    stderr_thread = threading.Thread(
        target=forward_and_log_stderr,
        args=(process.stderr, sys.stderr.buffer, log_f),
        daemon=True,
    )

    # Start the forwarding threads
    stdin_thread.start()
    stdout_thread.start()
    stderr_thread.start()

    # Wait for the target process to complete
    process.wait()
    exit_code = process.returncode

    # Join the I/O threads so the last messages get flushed. process.wait()
    # guarantees the child is dead, so its pipes close and the reads
    # terminate naturally; timeouts guard against a hung thread.
    stdin_thread.join(timeout=1.0)
    stdout_thread.join(timeout=1.0)
    stderr_thread.join(timeout=1.0)

except Exception as e:
    print(f"MCP Logger Error: {e}", file=sys.stderr)
    # Try to log the error too
    if log_f and not log_f.closed:
        try:
            log_f.write(f"!!! MCP Logger Main Error: {e}\n")
            log_f.flush()
        except Exception:
            pass  # Ignore errors during final logging attempt
    exit_code = 1  # Indicate logger failure

finally:
    # Ensure the process is terminated if it's still running
    # (e.g., if the logger crashed).
    if process and process.poll() is None:
        try:
            process.terminate()
            process.wait(timeout=1.0)  # Give it a moment to terminate
        except Exception:
            pass  # Ignore errors during cleanup
        if process.poll() is None:  # Still running?
            try:
                process.kill()  # Force kill
            except Exception:
                pass  # Ignore kill errors

    if log_f and not log_f.closed:
        try:
            log_f.close()
        except Exception:
            pass  # Ignore errors during final close attempt

# Exit with the target process's exit code
sys.exit(exit_code)
import httpx from fastapi import FastAPI, Request from starlette.responses import StreamingResponse
class AppLogger:
    """Simple logger that mirrors every message to a file and the console."""

    def __init__(self, log_file="llm.log"):
        """Initialize the logger with a file that will be cleared on startup.

        Args:
            log_file: Path of the log file to (re)create.
        """
        self.log_file = log_file
        # Clear the log file on startup
        with open(self.log_file, 'w', encoding='utf-8') as f:
            f.write("")

    def log(self, message):
        """Log a message to both file and console.

        Args:
            message: Text to append to the log file (newline added) and print.
        """
        # Log to file
        with open(self.log_file, 'a', encoding='utf-8') as f:
            f.write(message + "\n")

        # Log to console
        print(message)
# Application entry objects: the FastAPI app and the shared logger
# (AppLogger truncates llm.log on startup).
app = FastAPI(title="LLM API Logger")
logger = AppLogger("llm.log")
Function Calling => AI Agent 和 AI Model 之间的工具调用协议 MCP => AI Agent 和 AI Tools 之间的工具发现与调用协议 A2A => AI Agent 和 AI Agent 之间的发现与任务配发协议
sequenceDiagram
participant Agent1 as 🤖 AI Agent
participant Model as 🧠 AI Model
participant Tool as 🛠️ AI Tool
participant Agent2 as 🤖 Other AI Agent
participant User as 🧑 User
%% 用户请求
Note over Agent1: 用户发起请求
%% Function Calling 阶段
Agent1->>Model: Function Calling 请求 (解析用户意图)
Model-->>Agent1: 返回调用计划 (函数名+参数)
%% MCP 阶段
Agent1->>Tool: MCP 调用 (执行具体工具)
Tool-->>Agent1: 工具执行结果
%% A2A 阶段 (若需要协作)
Agent1->>Agent2: A2A 请求 (任务分发/协作)
Agent2-->>Agent1: 返回任务处理结果
%% 最终响应
Agent1-->>User: 汇总结果,返回给用户