MCP初探

Database and Ruby, Python, History


前阵子同事介绍了一下MCP,自己趁有时间了解了一下。

关于MCP,大家就看官网的介绍吧。在我看来,MCP其实解决意图识别的问题,再按照Domain划分成为多个MCP server而已。

搭建MCP server

MCP server postgres为例,这段代码启动了一个标准输入。

async function runServer() {
  const transport = new StdioServerTransport();
  await server.connect(transport);
}

runServer().catch(console.error);

本地执行yarn build && node dist/index.js postgres://localhost/postgres之后,会出现一个标准输入输出界面。

然后输入{"jsonrpc":"2.0","id":"1","method":"tools/list","params":{}},就可以

{"result":{"tools":[{"name":"query","description":"Run a read-only SQL query","inputSchema":{"type":"object","properties":{"sql":{"type":"string"}}}}]},"jsonrpc":"2.0","id":"1"}

至于为啥是jsonrpc格式,需要看Transports

Zod又提供了格式的验证,对于不是这种格式的消息,直接就不处理。

export const JSONRPCRequestSchema = z
  .object({
    jsonrpc: z.literal(JSONRPC_VERSION),
    id: RequestIdSchema,
  })
  .merge(RequestSchema)
  .strict();

一旦输入了jsonrpc消息之后,就会走到_onrequest方法,

  async connect(transport: Transport): Promise<void> {
    this._transport = transport;
    this._transport.onclose = () => {
      this._onclose();
    };

    this._transport.onerror = (error: Error) => {
      this._onerror(error);
    };

    this._transport.onmessage = (message) => {
      if (!("method" in message)) {
        this._onresponse(message);
      } else if ("id" in message) {
        this._onrequest(message);
      } else {
        this._onnotification(message);
      }
    };

    await this._transport.start();
  }

_onrequest方法会从注册了的方法里面找到对应的方法,然后调用。

  private _onrequest(request: JSONRPCRequest): void {
    const handler =
      this._requestHandlers.get(request.method) ?? this.fallbackRequestHandler;

调用完了之后就会调用this._transport?.send返回结果。

Promise.resolve()
      .then(() => handler(request, extra))
      .then(
        (result) => {
          if (abortController.signal.aborted) {
            return;
          }

          return this._transport?.send({
            result,
            jsonrpc: "2.0",
            id: request.id,
          });
        },
        (error) => {
          if (abortController.signal.aborted) {
            return;
          }

          return this._transport?.send({
            jsonrpc: "2.0",
            id: request.id,
            error: {
              code: Number.isSafeInteger(error["code"])
                ? error["code"]
                : ErrorCode.InternalError,
              message: error.message ?? "Internal error",
            },
          });
        },
      )

至于StdioServerTransport.send,就是直接把结果输出到STDOUT里面。

send(message: JSONRPCMessage): Promise<void> {
    return new Promise((resolve) => {
      const json = serializeMessage(message);
      if (this._stdout.write(json)) {
        resolve();
      } else {
        this._stdout.once("drain", resolve);
      }
    });
  }

添加MCP server

在Cursor里面,添加MCP server,指定command node dist/index.js postgres://localhost/postgres即可。

MCP client

说完Server,可以来看看Client了。官网的介绍,直接就让你用Cursor或者Claude Desktop,或者直接用Anthropic的包,让你不知道中间发生了什么。

比如,下面直接用了Anthropic包,不知道里面干了些啥。

class MCPClient {
  private mcp: Client;
  private anthropic: Anthropic;
  private transport: StdioClientTransport | null = null;
  private tools: Tool[] = [];

  constructor() {
    this.anthropic = new Anthropic({
      apiKey: ANTHROPIC_API_KEY,
    });
    this.mcp = new Client({ name: "mcp-client-cli", version: "1.0.0" });
  }
  // methods will go here
}

我选择用openai包,再来拆解这个client干了啥。

import { OpenAI } from "openai";
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";
import readline from "readline/promises";
import dotenv from "dotenv";

dotenv.config();

class MCPClient {
  private mcp: Client;
  private openai: OpenAI;
  private transport: StdioClientTransport | null = null;
  private tools: any[] = [];


  constructor() {
    this.openai = new OpenAI({
      apiKey: process.env.OPENAI_API_KEY,
      baseURL: process.env.OPENAI_BASE_URL,
    });
    this.mcp = new Client({ name: "mcp-client-cli", version: "1.0.0" });
  }

  async connectToServer(serverScriptPath: string, dbUrl?: string) {
    try {
      const command = process.execPath;

      // 准备参数数组,如果提供了数据库URL,则添加到参数中
      const args = [serverScriptPath];
      if (dbUrl) {
        args.push(dbUrl);
      }

      this.transport = new StdioClientTransport({
        command,
        args,
      });
      this.mcp.connect(this.transport);

      const toolsResult = await this.mcp.listTools();
      this.tools = toolsResult.tools.map((tool) => {
        return {
          type: "function",
          function: {
            name: tool.name,
            description: tool.description,
            parameters: tool.inputSchema,
          }
        };
      });
      console.log(
        "Connected to server with tools:",
        this.tools
      );

      const resourcesResult = await this.mcp.listResources();
      console.log(
        "Connected to server with resources:",
        resourcesResult.resources
      );
    } catch (e) {
      console.log("Failed to connect to MCP server: ", e);
      throw e;
    }
  }


  async chatLoop() {
    const rl = readline.createInterface({
      input: process.stdin,
      output: process.stdout,
    });

    try {
      console.log("\nMCP Client Started!");
      console.log("Type your queries or 'quit' to exit.");

      while (true) {
        const message = await rl.question("\nQuery: ");
        console.log("Received message:", message);
        if (message.toLowerCase() === "quit") {
          break;
        }
        const response = await this.processQuery(message);
        console.log("\n" + response);
      }
    } finally {
      rl.close();
    }
  }

  async cleanup() {
    await this.mcp.close();
  }

  async processQuery(query: string) {
    console.log("Processing query:", query);
    const messages: any[] = [
      {
        role: "user",
        content: query,
      },
    ];

    let response;
    try {
      response = await this.openai.chat.completions.create({
        model: "qwen-max",
        max_tokens: 1000,
        messages,
        tools: this.tools,
        tool_choice: "auto",  // 允许模型选择是否使用工具
      });
    } catch (error) {
      console.error("Error calling OpenAI API:", error);
      return "Sorry, there was an error processing your request. Please try again.";
    }

    console.log(response);

    const finalText = [];
    const responseMessage = response.choices[0].message;

    console.log('responseMessage', responseMessage);

    // 处理工具调用
    if (responseMessage.tool_calls && responseMessage.tool_calls.length > 0) {
      // 将原始响应添加到消息历史
      messages.push(responseMessage);

      // 处理每个工具调用
      for (const toolCall of responseMessage.tool_calls) {
        console.log(`执行工具调用: ${toolCall.function.name}`);

        console.log('toolCall', JSON.stringify(toolCall, null, 2));

        try {
          // 解析工具调用参数
          const args = JSON.parse(toolCall.function.arguments);

          console.log('args', args);

          // 通过MCP执行工具调用


          const toolResult = await this.mcp.callTool({
            name: toolCall.function.name,
            arguments: args,
          });


          console.log('toolResult', toolResult);
          const toolResultMessage = {
            role: "tool",
            tool_call_id: toolCall.id,
            name: toolCall.function.name,
            content: JSON.stringify(toolResult.content),
          };

          console.log('toolResultMessage', toolResultMessage);
          // 将工具结果添加到消息历史
          messages.push(toolResultMessage);

          finalText.push(`工具 ${toolCall.function.name} 执行结果: ${JSON.stringify(toolResult.content, null, 2)}`);
        } catch (error) {
          console.error(`工具调用失败: ${error}`);
          messages.push({
            role: "tool",
            tool_call_id: toolCall.id,
            name: toolCall.function.name,
            content: JSON.stringify({ error: `工具调用失败: ${error}` }),
          });
          finalText.push(`工具 ${toolCall.function.name} 执行失败: ${error}`);
        }
      }

      // 获取模型对工具结果的最终响应
      const finalResponse = await this.openai.chat.completions.create({
        model: "qwen-max",
        max_tokens: 1000,
        messages,
      });

      const finalContent = finalResponse.choices[0].message.content;
      if (finalContent) {
        finalText.push(finalContent);
      }
    } else if (responseMessage.content) {
      // 如果没有工具调用,直接返回内容
      finalText.push(responseMessage.content);
    }

    return finalText.join("\n\n");
  }
}



async function main() {
  if (process.argv.length < 3) {
    console.log("Usage: node index.ts <path_to_server_script> [database_url]");
    return;
  }

  const serverScriptPath = process.argv[2];
  const dbUrl = process.argv.length > 3 ? process.argv[3] : undefined;

  const mcpClient = new MCPClient();
  try {
    await mcpClient.connectToServer(serverScriptPath, dbUrl);
    await mcpClient.chatLoop();
  } finally {
    await mcpClient.cleanup();
    process.exit(0);
  }
}

main();

首先创建一个mcpClient

this.mcp = new Client({ name: "mcp-client-cli", version: "1.0.0" });

然后这个mcpClient连上mcpServer

      const command = process.execPath;

      // 准备参数数组,如果提供了数据库URL,则添加到参数中
      const args = [serverScriptPath];
      if (dbUrl) {
        args.push(dbUrl);
      }

      this.transport = new StdioClientTransport({
        command,
        args,
      });
      this.mcp.connect(this.transport);

又会继续调用Protocol.connect,再调用Transport.start

  override async connect(transport: Transport): Promise<void> {
    await super.connect(transport);
  async connect(transport: Transport): Promise<void> {
    this._transport = transport;
    this._transport.onclose = () => {
      this._onclose();
    };

    this._transport.onerror = (error: Error) => {
      this._onerror(error);
    };

    this._transport.onmessage = (message) => {
      if (!("method" in message)) {
        this._onresponse(message);
      } else if ("id" in message) {
        this._onrequest(message);
      } else {
        this._onnotification(message);
      }
    };

    await this._transport.start();
  }

最后会起一个新的进程,通过STDIO进行通信。

  async start(): Promise<void> {
    if (this._process) {
      throw new Error(
        "StdioClientTransport already started! If using Client class, note that connect() calls start() automatically."
      );
    }

    return new Promise((resolve, reject) => {
      this._process = spawn(
        this._serverParams.command,
        this._serverParams.args ?? [],
        {
          env: this._serverParams.env ?? getDefaultEnvironment(),
          stdio: ["pipe", "pipe", this._serverParams.stderr ?? "inherit"],
          shell: false,
          signal: this._abortController.signal,
          windowsHide: process.platform === "win32" && isElectron(),
          cwd: this._serverParams.cwd,
        }
      );

在命令行执行node dist/index.js ../postgres/dist/index.js postgres://localhost/postgres,就可以通过MCP client连上MCP server了。需要注意的是,../postgres/dist/index.js是MCP server的脚本地址。

在执行之后,同样又是一个标准输入框。这次不需要输入jsonrpc格式的数据了,直接输入sql语句就可以了。

  async chatLoop() {
    const rl = readline.createInterface({
      input: process.stdin,
      output: process.stdout,
    });

    try {
      console.log("\nMCP Client Started!");
      console.log("Type your queries or 'quit' to exit.");

      while (true) {
        const message = await rl.question("\nQuery: ");
        console.log("Received message:", message);
        if (message.toLowerCase() === "quit") {
          break;
        }
        const response = await this.processQuery(message);
        console.log("\n" + response);
      }
    } finally {
      rl.close();
    }
  }

接着就是去处理用户的输入了。它先会去调MCP server,查询所有支持的tool,然后在拼成一个请求,调用LLM进行function calling,也就是进行一次意图识别。

const toolsResult = await this.mcp.listTools();
      this.tools = toolsResult.tools.map((tool) => {
        return {
          type: "function",
          function: {
            name: tool.name,
            description: tool.description,
            parameters: tool.inputSchema,
          }
        };
      });

response = await this.openai.chat.completions.create({
        model: "qwen-max",
        max_tokens: 1000,
        messages,
        tools: this.tools,
        tool_choice: "auto",  // 允许模型选择是否使用工具
      });

由于我这里就一个tool,所以直接就调用MCP server的query工具。

const toolResult = await this.mcp.callTool({
            name: toolCall.function.name,
            arguments: args,
          });

最后再把执行结果交给LLM,重新组织一下语言。

const toolResultMessage = {
  role: "tool",
  tool_call_id: toolCall.id,
  name: toolCall.function.name,
  content: JSON.stringify(toolResult.content),
};

messages.push(toolResultMessage);

const finalResponse = await this.openai.chat.completions.create({
        model: "qwen-max",
        max_tokens: 1000,
        messages,
      });

更改STDIO为SSE

我看里面还支持SSE,就拿来玩一下,结果新的东西都是坑。

#!/usr/bin/env node

import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js";
import {
  CallToolRequestSchema,
  ListResourcesRequestSchema,
  ListToolsRequestSchema,
  ReadResourceRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
import pg from "pg";
import express, { Request, Response } from "express";
import cors from "cors";

const app = express();
app.use(cors());

const server = new Server(
  {
    name: "example-servers/postgres",
    version: "0.1.0",
  },
  {
    capabilities: {
      resources: {},
      tools: {},
    },
  },
);

const args = process.argv.slice(2);
if (args.length === 0) {
  console.error("Please provide a database URL as a command-line argument");
  process.exit(1);
}

const databaseUrl = args[0];

const resourceBaseUrl = new URL(databaseUrl);
resourceBaseUrl.protocol = "postgres:";
resourceBaseUrl.password = "";

console.log('resourceBaseUrl', resourceBaseUrl);

const pool = new pg.Pool({
  connectionString: databaseUrl,
});

const SCHEMA_PATH = "public";

server.setRequestHandler(ListResourcesRequestSchema, async () => {
  const client = await pool.connect();
  try {
    const result = await client.query(
      "SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'",
    );
    return {
      resources: result.rows.map((row) => ({
        uri: new URL(`${row.table_name}/${SCHEMA_PATH}`, resourceBaseUrl).href,
        mimeType: "application/json",
        name: `"${row.table_name}" database schema`,
      })),
    };
  } finally {
    client.release();
  }
});

server.setRequestHandler(ReadResourceRequestSchema, async (request) => {
  const resourceUrl = new URL(request.params.uri);

  const pathComponents = resourceUrl.pathname.split("/");
  const schema = pathComponents.pop();
  const tableName = pathComponents.pop();

  if (schema !== SCHEMA_PATH) {
    throw new Error("Invalid resource URI");
  }

  const client = await pool.connect();
  try {
    const result = await client.query(
      "SELECT column_name, data_type FROM information_schema.columns WHERE table_name = $1",
      [tableName],
    );

    return {
      contents: [
        {
          uri: request.params.uri,
          mimeType: "application/json",
          text: JSON.stringify(result.rows, null, 2),
        },
      ],
    };
  } finally {
    client.release();
  }
});

server.setRequestHandler(ListToolsRequestSchema, async () => {
  return {
    tools: [
      {
        name: "query",
        description: "Run a read-only SQL query",
        inputSchema: {
          type: "object",
          properties: {
            sql: { type: "string" },
          },
        },
      },
    ],
  };
});

server.setRequestHandler(CallToolRequestSchema, async (request) => {
  console.log('request', request);
  if (request.params.name === "query") {
    const sql = request.params.arguments?.sql as string;

    const client = await pool.connect();
    try {
      await client.query("BEGIN TRANSACTION READ ONLY");
      const result = await client.query(sql);
      return {
        content: [{ type: "text", text: JSON.stringify(result.rows, null, 2) }],
        isError: false,
      };
    } catch (error) {
      throw error;
    } finally {
      client
        .query("ROLLBACK")
        .catch((error) =>
          console.warn("Could not roll back transaction:", error),
        );

      client.release();
    }
  }
  throw new Error(`Unknown tool: ${request.params.name}`);
});

let transport: SSEServerTransport;

app.get("/sse", (req: Request, res: Response) => {
  transport = new SSEServerTransport("/messages", res);

  console.log(`SSE connection established: ${transport.sessionId}`);

  // Handle client disconnect
  req.on('close', () => {
    console.log(`SSE connection closed: ${transport.sessionId}`);
    transport.close();
  });

  server.connect(transport).catch(error => {
    console.error(`Error connecting transport ${transport.sessionId}:`, error);
  });
});

app.post("/messages", async (req: Request, res: Response) => {
  try {
    await transport.handlePostMessage(req, res);
  } catch (error) {
    console.error("Error handling message:", error);
    res.status(500).json({
      error: "Internal server error",
      details: error instanceof Error ? error.message : String(error)
    });
  }
});

const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
  console.log(`Server is running on http://localhost:${PORT}`);
});

建立一个SSE 连接

curl http://localhost:3000/sse

然后再新建一个连接发送请求,就可以拿到response。

curl -X POST --location 'http://localhost:3000/messages?sessionId=c5b35009-3d5e-4548-b21f-cd0a97a921fc' \
--header 'Content-Type: application/json' \
--data '{"jsonrpc":"2.0","id":"1","method":"tools/list","params":{}}'

也可以调用工具

curl -X POST --location 'http://localhost:3000/sse?sessionId=c7145401-8cbb-409c-8ece-af779188f7f8' \
--header 'Content-Type: application/json' \
--data '{"jsonrpc":"2.0","id":"1","method":"tools/call","params":{"name": "browser_navigate", "arguments": {"url":"https://www.google.com"} }}'

SSE 不支持多client连接

顺势吐槽一句,这里的SSE不支持多client连接,因为代码根本就没有对sessionId的处理issue

如果把上面的代码做个修改,根据sessionId从连接池里面取到transport,你也会发现没有任何卵用。

当调用mcpServer.connect(transport)的时候,transport会被赋值给mcpServer._transport1

async connect(transport: Transport): Promise<void> {
    this._transport = transport;
    this._transport.onclose = () => {
      this._onclose();
    };

    this._transport.onerror = (error: Error) => {
      this._onerror(error);
    };

    this._transport.onmessage = (message) => {
      if (!("method" in message)) {
        this._onresponse(message);
      } else if ("id" in message) {
        this._onrequest(message);
      } else {
        this._onnotification(message);
      }
    };

    await this._transport.start();
  }

在后来处理await transport.handlePostMessage(req, res);的时候,会调用onMessage2

  async handlePostMessage(
    req: IncomingMessage,
    res: ServerResponse,
    parsedBody?: unknown,
  ): Promise<void> {
    if (!this._sseResponse) {
      const message = "SSE connection not established";
      res.writeHead(500).end(message);
      throw new Error(message);
    }

    let body: string | unknown;
    try {
      const ct = contentType.parse(req.headers["content-type"] ?? "");
      if (ct.type !== "application/json") {
        throw new Error(`Unsupported content-type: ${ct}`);
      }

      body = parsedBody ?? await getRawBody(req, {
        limit: MAXIMUM_MESSAGE_SIZE,
        encoding: ct.parameters.charset ?? "utf-8",
      });
    } catch (error) {
      res.writeHead(400).end(String(error));
      this.onerror?.(error as Error);
      return;
    }

    try {
      await this.handleMessage(typeof body === 'string' ? JSON.parse(body) : body);
    } catch {
      res.writeHead(400).end(`Invalid message: ${body}`);
      return;
    }

    res.writeHead(202).end("Accepted");
  }

  async handleMessage(message: unknown): Promise<void> {
    let parsedMessage: JSONRPCMessage;
    try {
      parsedMessage = JSONRPCMessageSchema.parse(message);
    } catch (error) {
      this.onerror?.(error as Error);
      throw error;
    }

    this.onmessage?.(parsedMessage);
  }

onMessage是当初mcpServer.connect(transport)的时候被赋值的3

  async connect(transport: Transport): Promise<void> {
    this._transport = transport;
    this._transport.onclose = () => {
      this._onclose();
    };

    this._transport.onerror = (error: Error) => {
      this._onerror(error);
    };

    this._transport.onmessage = (message) => {
      if (!("method" in message)) {
        this._onresponse(message);
      } else if ("id" in message) {
        this._onrequest(message);
      } else {
        this._onnotification(message);
      }
    };

    await this._transport.start();
  }

会调用_onrequest4。在调用完工具之后,会调用this._transport?.send。注意,这里的this是mcpServer,而我们一直只有一个mcpServer,所以这个mcpServer.transport永远都是最后一个SSE连接的。

// https://github.com/modelcontextprotocol/typescript-sdk/blob/main/src/shared/protocol.ts#L330
this._transport?.send({
            result,
            jsonrpc: "2.0",
            id: request.id,
          });

写在最后

其实MCP还是没有解决很多问题,在我看来还是一个噱头。

  1. 如果意图过多的时候,其实是把所有的tools都喂给了LLM,很有可能会出现意图识别不准的情况。
  2. 没有解决注入业务知识的问题。现在注入业务知识,基本都是靠RAG找到最接近的文档,然后再一起丢给LLM重新组织语言的。
  3. 如何实现意图拆解、意图组合,以及意图的流程,MCP也没有解决。

新的东西,总是伴随着泡沫。能不能起飞,就看造化了。