A while back a colleague introduced me to MCP, and I took some time to dig into it. For what MCP is, just read the official docs. The way I see it, MCP essentially tackles intent recognition, and then simply splits things into multiple MCP servers by domain.
Building an MCP server
Take the postgres MCP server as an example. The following code hooks the server up to a stdio transport:
async function runServer() {
const transport = new StdioServerTransport();
await server.connect(transport);
}
runServer().catch(console.error);
Run yarn build && node dist/index.js postgres://localhost/postgres locally and you get an interactive stdin/stdout session. Type
{"jsonrpc":"2.0","id":"1","method":"tools/list","params":{}}
and you get back:
{"result":{"tools":[{"name":"query","description":"Run a read-only SQL query","inputSchema":{"type":"object","properties":{"sql":{"type":"string"}}}}]},"jsonrpc":"2.0","id":"1"}
As for why the messages have to be JSON-RPC, that is defined by the Transports part of the spec. Zod provides the schema validation, and any message that doesn't match the format is simply never processed.
export const JSONRPCRequestSchema = z
.object({
jsonrpc: z.literal(JSONRPC_VERSION),
id: RequestIdSchema,
})
.merge(RequestSchema)
.strict();
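You can see that validation in isolation with a small sketch (assuming the schema is imported from the SDK's types.js, where it is exported):
import { JSONRPCRequestSchema } from "@modelcontextprotocol/sdk/types.js";

// A well-formed JSON-RPC 2.0 request parses successfully...
const ok = JSONRPCRequestSchema.safeParse({
  jsonrpc: "2.0",
  id: "1",
  method: "tools/list",
  params: {},
});
console.log(ok.success); // true

// ...while anything else is rejected before it ever reaches a handler.
const bad = JSONRPCRequestSchema.safeParse({ hello: "mcp" });
console.log(bad.success); // false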
Once a JSON-RPC request has been typed in, it ends up in the _onrequest method:
async connect(transport: Transport): Promise<void> {
this._transport = transport;
this._transport.onclose = () => {
this._onclose();
};
this._transport.onerror = (error: Error) => {
this._onerror(error);
};
this._transport.onmessage = (message) => {
if (!("method" in message)) {
this._onresponse(message);
} else if ("id" in message) {
this._onrequest(message);
} else {
this._onnotification(message);
}
};
await this._transport.start();
}
In the onmessage handler above, a message with both a method and an id is a request, a method without an id is a notification, and an id without a method is a response. For a request, _onrequest looks up the handler registered for request.method and invokes it:
private _onrequest(request: JSONRPCRequest): void {
const handler =
this._requestHandlers.get(request.method) ?? this.fallbackRequestHandler;
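Those handlers land in _requestHandlers via server.setRequestHandler, which keys each handler by its schema's method literal; the postgres server registers its tools/list handler like this (the same handler shows up again in full in the SSE section below):
server.setRequestHandler(ListToolsRequestSchema, async () => {
  return {
    tools: [
      {
        name: "query",
        description: "Run a read-only SQL query",
        inputSchema: {
          type: "object",
          properties: { sql: { type: "string" } },
        },
      },
    ],
  };
});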
Once the handler resolves, this._transport?.send is called to return the result:
Promise.resolve()
.then(() => handler(request, extra))
.then(
(result) => {
if (abortController.signal.aborted) {
return;
}
return this._transport?.send({
result,
jsonrpc: "2.0",
id: request.id,
});
},
(error) => {
if (abortController.signal.aborted) {
return;
}
return this._transport?.send({
jsonrpc: "2.0",
id: request.id,
error: {
code: Number.isSafeInteger(error["code"])
? error["code"]
: ErrorCode.InternalError,
message: error.message ?? "Internal error",
},
});
},
)
As for StdioServerTransport.send, it just serializes the message and writes it straight to STDOUT:
send(message: JSONRPCMessage): Promise<void> {
return new Promise((resolve) => {
const json = serializeMessage(message);
if (this._stdout.write(json)) {
resolve();
} else {
this._stdout.once("drain", resolve);
}
});
}
Adding the MCP server
In Cursor, add an MCP server and set its command to
node dist/index.js postgres://localhost/postgres
and you're done.
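In recent Cursor versions this ends up as a JSON config entry rather than a form; roughly something like the following (the file location, typically ~/.cursor/mcp.json or .cursor/mcp.json, and the exact fields may vary by version):
{
  "mcpServers": {
    "postgres": {
      "command": "node",
      "args": ["dist/index.js", "postgres://localhost/postgres"]
    }
  }
}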
MCP client
With the server covered, let's look at the client. The official docs just tell you to use Cursor or Claude Desktop, or to use the Anthropic package directly, which hides what actually happens in between.
For example, the snippet below uses the Anthropic package, and you can't tell what it does under the hood:
class MCPClient {
private mcp: Client;
private anthropic: Anthropic;
private transport: StdioClientTransport | null = null;
private tools: Tool[] = [];
constructor() {
this.anthropic = new Anthropic({
apiKey: ANTHROPIC_API_KEY,
});
this.mcp = new Client({ name: "mcp-client-cli", version: "1.0.0" });
}
// methods will go here
}
I chose the openai package instead and rebuilt the client, to unpack what it actually does.
import { OpenAI } from "openai";
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";
import readline from "readline/promises";
import dotenv from "dotenv";
dotenv.config();
class MCPClient {
private mcp: Client;
private openai: OpenAI;
private transport: StdioClientTransport | null = null;
private tools: any[] = [];
constructor() {
this.openai = new OpenAI({
apiKey: process.env.OPENAI_API_KEY,
baseURL: process.env.OPENAI_BASE_URL,
});
this.mcp = new Client({ name: "mcp-client-cli", version: "1.0.0" });
}
async connectToServer(serverScriptPath: string, dbUrl?: string) {
try {
const command = process.execPath;
// Build the argument array; append the database URL if one was provided
const args = [serverScriptPath];
if (dbUrl) {
args.push(dbUrl);
}
this.transport = new StdioClientTransport({
command,
args,
});
await this.mcp.connect(this.transport);
const toolsResult = await this.mcp.listTools();
this.tools = toolsResult.tools.map((tool) => {
return {
type: "function",
function: {
name: tool.name,
description: tool.description,
parameters: tool.inputSchema,
}
};
});
console.log(
"Connected to server with tools:",
this.tools
);
const resourcesResult = await this.mcp.listResources();
console.log(
"Connected to server with resources:",
resourcesResult.resources
);
} catch (e) {
console.log("Failed to connect to MCP server: ", e);
throw e;
}
}
async chatLoop() {
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
});
try {
console.log("\nMCP Client Started!");
console.log("Type your queries or 'quit' to exit.");
while (true) {
const message = await rl.question("\nQuery: ");
console.log("Received message:", message);
if (message.toLowerCase() === "quit") {
break;
}
const response = await this.processQuery(message);
console.log("\n" + response);
}
} finally {
rl.close();
}
}
async cleanup() {
await this.mcp.close();
}
async processQuery(query: string) {
console.log("Processing query:", query);
const messages: any[] = [
{
role: "user",
content: query,
},
];
let response;
try {
response = await this.openai.chat.completions.create({
model: "qwen-max",
max_tokens: 1000,
messages,
tools: this.tools,
tool_choice: "auto", // let the model decide whether to call a tool
});
} catch (error) {
console.error("Error calling OpenAI API:", error);
return "Sorry, there was an error processing your request. Please try again.";
}
console.log(response);
const finalText = [];
const responseMessage = response.choices[0].message;
console.log('responseMessage', responseMessage);
// Handle tool calls
if (responseMessage.tool_calls && responseMessage.tool_calls.length > 0) {
// Append the model's response to the message history
messages.push(responseMessage);
// Handle each tool call
for (const toolCall of responseMessage.tool_calls) {
console.log(`Executing tool call: ${toolCall.function.name}`);
console.log('toolCall', JSON.stringify(toolCall, null, 2));
try {
// Parse the tool-call arguments
const args = JSON.parse(toolCall.function.arguments);
console.log('args', args);
// Execute the tool call via MCP
const toolResult = await this.mcp.callTool({
name: toolCall.function.name,
arguments: args,
});
console.log('toolResult', toolResult);
const toolResultMessage = {
role: "tool",
tool_call_id: toolCall.id,
name: toolCall.function.name,
content: JSON.stringify(toolResult.content),
};
console.log('toolResultMessage', toolResultMessage);
// Append the tool result to the message history
messages.push(toolResultMessage);
finalText.push(`Tool ${toolCall.function.name} result: ${JSON.stringify(toolResult.content, null, 2)}`);
} catch (error) {
console.error(`Tool call failed: ${error}`);
messages.push({
role: "tool",
tool_call_id: toolCall.id,
name: toolCall.function.name,
content: JSON.stringify({ error: `Tool call failed: ${error}` }),
});
finalText.push(`Tool ${toolCall.function.name} failed: ${error}`);
}
}
// Get the model's final response to the tool results
const finalResponse = await this.openai.chat.completions.create({
model: "qwen-max",
max_tokens: 1000,
messages,
});
const finalContent = finalResponse.choices[0].message.content;
if (finalContent) {
finalText.push(finalContent);
}
} else if (responseMessage.content) {
// No tool calls; return the content directly
finalText.push(responseMessage.content);
}
return finalText.join("\n\n");
}
}
async function main() {
if (process.argv.length < 3) {
console.log("Usage: node index.ts <path_to_server_script> [database_url]");
return;
}
const serverScriptPath = process.argv[2];
const dbUrl = process.argv.length > 3 ? process.argv[3] : undefined;
const mcpClient = new MCPClient();
try {
await mcpClient.connectToServer(serverScriptPath, dbUrl);
await mcpClient.chatLoop();
} finally {
await mcpClient.cleanup();
process.exit(0);
}
}
main();
First, an MCP Client is created:
this.mcp = new Client({ name: "mcp-client-cli", version: "1.0.0" });
Then this client connects to the MCP server:
const command = process.execPath;
// Build the argument array; append the database URL if one was provided
const args = [serverScriptPath];
if (dbUrl) {
args.push(dbUrl);
}
this.transport = new StdioClientTransport({
command,
args,
});
await this.mcp.connect(this.transport);
That call goes on to Protocol.connect, which in turn calls Transport.start:
override async connect(transport: Transport): Promise<void> {
await super.connect(transport);
async connect(transport: Transport): Promise<void> {
this._transport = transport;
this._transport.onclose = () => {
this._onclose();
};
this._transport.onerror = (error: Error) => {
this._onerror(error);
};
this._transport.onmessage = (message) => {
if (!("method" in message)) {
this._onresponse(message);
} else if ("id" in message) {
this._onrequest(message);
} else {
this._onnotification(message);
}
};
await this._transport.start();
}
Finally, it spawns a new process and talks to it over STDIO:
async start(): Promise<void> {
if (this._process) {
throw new Error(
"StdioClientTransport already started! If using Client class, note that connect() calls start() automatically."
);
}
return new Promise((resolve, reject) => {
this._process = spawn(
this._serverParams.command,
this._serverParams.args ?? [],
{
env: this._serverParams.env ?? getDefaultEnvironment(),
stdio: ["pipe", "pipe", this._serverParams.stderr ?? "inherit"],
shell: false,
signal: this._abortController.signal,
windowsHide: process.platform === "win32" && isElectron(),
cwd: this._serverParams.cwd,
}
);
Run node dist/index.js ../postgres/dist/index.js postgres://localhost/postgres on the command line and the MCP client connects to the MCP server. Note that ../postgres/dist/index.js is the path to the MCP server script.
After it starts, you get another stdin prompt. This time there's no need to type JSON-RPC payloads; you can just type a SQL statement directly.
async chatLoop() {
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
});
try {
console.log("\nMCP Client Started!");
console.log("Type your queries or 'quit' to exit.");
while (true) {
const message = await rl.question("\nQuery: ");
console.log("Received message:", message);
if (message.toLowerCase() === "quit") {
break;
}
const response = await this.processQuery(message);
console.log("\n" + response);
}
} finally {
rl.close();
}
}
Next comes handling the user's input. The client first asks the MCP server for all the tools it supports, then assembles them into a request and calls the LLM with function calling, which is effectively a round of intent recognition:
const toolsResult = await this.mcp.listTools();
this.tools = toolsResult.tools.map((tool) => {
return {
type: "function",
function: {
name: tool.name,
description: tool.description,
parameters: tool.inputSchema,
}
};
});
response = await this.openai.chat.completions.create({
model: "qwen-max",
max_tokens: 1000,
messages,
tools: this.tools,
tool_choice: "auto", // 允许模型选择是否使用工具
});
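For the postgres server there is exactly one tool, so after the mapping above the tools array handed to the model looks roughly like this (shape taken from the tools/list response shown earlier):
const tools = [
  {
    type: "function",
    function: {
      name: "query",
      description: "Run a read-only SQL query",
      parameters: {
        type: "object",
        properties: { sql: { type: "string" } },
      },
    },
  },
];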
Since there is only one tool here, the model goes straight for the MCP server's query tool:
const toolResult = await this.mcp.callTool({
name: toolCall.function.name,
arguments: args,
});
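What comes back is MCP content rather than raw rows, which is why the client wraps toolResult.content in JSON.stringify before handing it to the model. For the query tool it is roughly the following (the server pretty-prints the rows with JSON.stringify(result.rows, null, 2); see its handler in the SSE section below):
{
  content: [{ type: "text", text: "[{\"n\":1}]" }],
  isError: false,
}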
Finally, the tool result is handed back to the LLM so it can rephrase it into a proper answer:
const toolResultMessage = {
role: "tool",
tool_call_id: toolCall.id,
name: toolCall.function.name,
content: JSON.stringify(toolResult.content),
};
messages.push(toolResultMessage);
const finalResponse = await this.openai.chat.completions.create({
model: "qwen-max",
max_tokens: 1000,
messages,
});
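The client above only ever does a single round of tool calls before asking for the final answer. A more general loop (a sketch of my own, not part of the code above) would keep calling the model until it stops requesting tools:
// Sketch: iterate until the model produces a final answer with no tool_calls.
async function runToolLoop(
  openai: OpenAI,
  mcp: Client,
  messages: any[],
  tools: any[],
): Promise<string> {
  const finalText: string[] = [];
  while (true) {
    const resp = await openai.chat.completions.create({
      model: "qwen-max",
      max_tokens: 1000,
      messages,
      tools,
      tool_choice: "auto",
    });
    const msg = resp.choices[0].message;
    if (!msg.tool_calls?.length) {
      // No more tool calls: this is the final answer.
      if (msg.content) finalText.push(msg.content);
      return finalText.join("\n\n");
    }
    messages.push(msg);
    for (const tc of msg.tool_calls) {
      // Run each requested tool through MCP and feed the result back.
      const result = await mcp.callTool({
        name: tc.function.name,
        arguments: JSON.parse(tc.function.arguments),
      });
      messages.push({
        role: "tool",
        tool_call_id: tc.id,
        content: JSON.stringify(result.content),
      });
    }
  }
}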
Switching from STDIO to SSE
I noticed the SDK also supports SSE, so I gave it a try, and it turns out the new stuff is full of pitfalls.
#!/usr/bin/env node
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
import {
CallToolRequestSchema,
ListResourcesRequestSchema,
ListToolsRequestSchema,
ReadResourceRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
import pg from "pg";
import express, { Request, Response } from "express";
import cors from "cors";
const app = express();
app.use(cors());
const server = new Server(
{
name: "example-servers/postgres",
version: "0.1.0",
},
{
capabilities: {
resources: {},
tools: {},
},
},
);
const args = process.argv.slice(2);
if (args.length === 0) {
console.error("Please provide a database URL as a command-line argument");
process.exit(1);
}
const databaseUrl = args[0];
const resourceBaseUrl = new URL(databaseUrl);
resourceBaseUrl.protocol = "postgres:";
resourceBaseUrl.password = "";
console.log('resourceBaseUrl', resourceBaseUrl);
const pool = new pg.Pool({
connectionString: databaseUrl,
});
const SCHEMA_PATH = "public";
server.setRequestHandler(ListResourcesRequestSchema, async () => {
const client = await pool.connect();
try {
const result = await client.query(
"SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'",
);
return {
resources: result.rows.map((row) => ({
uri: new URL(`${row.table_name}/${SCHEMA_PATH}`, resourceBaseUrl).href,
mimeType: "application/json",
name: `"${row.table_name}" database schema`,
})),
};
} finally {
client.release();
}
});
server.setRequestHandler(ReadResourceRequestSchema, async (request) => {
const resourceUrl = new URL(request.params.uri);
const pathComponents = resourceUrl.pathname.split("/");
const schema = pathComponents.pop();
const tableName = pathComponents.pop();
if (schema !== SCHEMA_PATH) {
throw new Error("Invalid resource URI");
}
const client = await pool.connect();
try {
const result = await client.query(
"SELECT column_name, data_type FROM information_schema.columns WHERE table_name = $1",
[tableName],
);
return {
contents: [
{
uri: request.params.uri,
mimeType: "application/json",
text: JSON.stringify(result.rows, null, 2),
},
],
};
} finally {
client.release();
}
});
server.setRequestHandler(ListToolsRequestSchema, async () => {
return {
tools: [
{
name: "query",
description: "Run a read-only SQL query",
inputSchema: {
type: "object",
properties: {
sql: { type: "string" },
},
},
},
],
};
});
server.setRequestHandler(CallToolRequestSchema, async (request) => {
console.log('request', request);
if (request.params.name === "query") {
const sql = request.params.arguments?.sql as string;
const client = await pool.connect();
try {
await client.query("BEGIN TRANSACTION READ ONLY");
const result = await client.query(sql);
return {
content: [{ type: "text", text: JSON.stringify(result.rows, null, 2) }],
isError: false,
};
} catch (error) {
throw error;
} finally {
client
.query("ROLLBACK")
.catch((error) =>
console.warn("Could not roll back transaction:", error),
);
client.release();
}
}
throw new Error(`Unknown tool: ${request.params.name}`);
});
let transport: SSEServerTransport;
app.get("/sse", (req: Request, res: Response) => {
transport = new SSEServerTransport("/messages", res);
console.log(`SSE connection established: ${transport.sessionId}`);
// Handle client disconnect
req.on('close', () => {
console.log(`SSE connection closed: ${transport.sessionId}`);
transport.close();
});
server.connect(transport).catch(error => {
console.error(`Error connecting transport ${transport.sessionId}:`, error);
});
});
app.post("/messages", async (req: Request, res: Response) => {
try {
await transport.handlePostMessage(req, res);
} catch (error) {
console.error("Error handling message:", error);
res.status(500).json({
error: "Internal server error",
details: error instanceof Error ? error.message : String(error)
});
}
});
const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
console.log(`Server is running on http://localhost:${PORT}`);
});
Establish an SSE connection:
curl http://localhost:3000/sse
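The first thing the stream sends back is an endpoint event carrying the sessionId to POST to (illustrative output; the sessionId is generated per connection):
event: endpoint
data: /messages?sessionId=c5b35009-3d5e-4548-b21f-cd0a97a921fc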
Then open a second connection and POST the request. The POST itself only returns 202 Accepted; the actual JSON-RPC response arrives on the SSE stream:
curl -X POST --location 'http://localhost:3000/messages?sessionId=c5b35009-3d5e-4548-b21f-cd0a97a921fc' \
--header 'Content-Type: application/json' \
--data '{"jsonrpc":"2.0","id":"1","method":"tools/list","params":{}}'
You can also call a tool the same way:
curl -X POST --location 'http://localhost:3000/messages?sessionId=c7145401-8cbb-409c-8ece-af779188f7f8' \
--header 'Content-Type: application/json' \
--data '{"jsonrpc":"2.0","id":"1","method":"tools/call","params":{"name": "query", "arguments": {"sql":"SELECT 1 AS n"} }}'
SSE doesn't support multiple client connections
While we're at it, a gripe: this SSE setup doesn't support multiple client connections, because the code does nothing with the sessionId at all (there is an open issue about it).
Even if you modify the code above to keep a pool of transports and look the right one up by sessionId, you'll find it doesn't help at all.
When mcpServer.connect(transport) is called, the transport is assigned to mcpServer._transport:
async connect(transport: Transport): Promise<void> {
this._transport = transport;
this._transport.onclose = () => {
this._onclose();
};
this._transport.onerror = (error: Error) => {
this._onerror(error);
};
this._transport.onmessage = (message) => {
if (!("method" in message)) {
this._onresponse(message);
} else if ("id" in message) {
this._onrequest(message);
} else {
this._onnotification(message);
}
};
await this._transport.start();
}
Later, when await transport.handlePostMessage(req, res) is handled, it ends up calling onmessage:
async handlePostMessage(
req: IncomingMessage,
res: ServerResponse,
parsedBody?: unknown,
): Promise<void> {
if (!this._sseResponse) {
const message = "SSE connection not established";
res.writeHead(500).end(message);
throw new Error(message);
}
let body: string | unknown;
try {
const ct = contentType.parse(req.headers["content-type"] ?? "");
if (ct.type !== "application/json") {
throw new Error(`Unsupported content-type: ${ct}`);
}
body = parsedBody ?? await getRawBody(req, {
limit: MAXIMUM_MESSAGE_SIZE,
encoding: ct.parameters.charset ?? "utf-8",
});
} catch (error) {
res.writeHead(400).end(String(error));
this.onerror?.(error as Error);
return;
}
try {
await this.handleMessage(typeof body === 'string' ? JSON.parse(body) : body);
} catch {
res.writeHead(400).end(`Invalid message: ${body}`);
return;
}
res.writeHead(202).end("Accepted");
}
async handleMessage(message: unknown): Promise<void> {
let parsedMessage: JSONRPCMessage;
try {
parsedMessage = JSONRPCMessageSchema.parse(message);
} catch (error) {
this.onerror?.(error as Error);
throw error;
}
this.onmessage?.(parsedMessage);
}
And onmessage is the handler that was assigned back when mcpServer.connect(transport) was called:
async connect(transport: Transport): Promise<void> {
this._transport = transport;
this._transport.onclose = () => {
this._onclose();
};
this._transport.onerror = (error: Error) => {
this._onerror(error);
};
this._transport.onmessage = (message) => {
if (!("method" in message)) {
this._onresponse(message);
} else if ("id" in message) {
this._onrequest(message);
} else {
this._onnotification(message);
}
};
await this._transport.start();
}
which in turn calls _onrequest. After the tool call completes, the result is sent with this._transport?.send. Note that this here is the mcpServer, and since we only ever have one mcpServer, mcpServer._transport always points at whichever SSE connection was established last:
// https://github.com/modelcontextprotocol/typescript-sdk/blob/main/src/shared/protocol.ts#L330
this._transport?.send({
result,
jsonrpc: "2.0",
id: request.id,
});
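For reference, a way around this (a sketch of my own, untested, assuming a buildServer() helper that recreates the Server and all of its handlers) is to stop sharing a single Server and create one per SSE connection, so each connection keeps its own _transport:
// Hypothetical multi-client variant: one Server (and transport) per sessionId.
const sessions = new Map<string, SSEServerTransport>();

app.get("/sse", async (req: Request, res: Response) => {
  const transport = new SSEServerTransport("/messages", res);
  sessions.set(transport.sessionId, transport);
  req.on("close", () => sessions.delete(transport.sessionId));
  // buildServer() is assumed to construct a fresh Server with the same handlers as above.
  await buildServer().connect(transport);
});

app.post("/messages", async (req: Request, res: Response) => {
  const transport = sessions.get(String(req.query.sessionId));
  if (!transport) {
    res.status(400).send("Unknown sessionId");
    return;
  }
  await transport.handlePostMessage(req, res);
});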
Closing thoughts
In the end, MCP still leaves a lot of problems unsolved; to me it's still largely a buzzword.
- When there are too many intents, all of the tools get fed to the LLM, which can easily lead to inaccurate intent recognition.
- It doesn't solve the problem of injecting business knowledge. Today that is still mostly done with RAG: retrieve the closest documents and hand them to the LLM to rephrase.
- It also doesn't address how to decompose intents, combine them, or orchestrate a flow of intents.
New things always come with a bubble. Whether this one takes off is anyone's guess.