Datawhale干货
作者:牧小熊,Datawhale成员
mcp最近很火,但在实际的应用环境中,并没有详细的资料讲解如何使用如何部署,增加初学者的学习成本,本文希望直观的展示mcp工具的具体使用实践。
一、mcp是什么?大语言模型,例如DeepSeek,如果不能联网、不能操作外部工具,只能是聊天机器人。除了聊天没什么可做的。而一旦大语言模型能操作工具,例如:联网/地图/查天气/函数/插件/API接口/代码解释器/机械臂/灵巧手,它就升级成为智能体Agent,能更好地帮助人类。今年爆火的Manus就是这样的智能体。
在以前,如果想让大模型调用外部工具,需要通过写大段提示词的方法,实现“Function Call”,这样其实就非常的不友好。
Anthropic公司(就是发布Claude大模型的公司),在2024年11月,发布了Model Context Protocol协议,简称MCP。MCP协议就像Type-C扩展坞,让海量的软件和工具,能够插在大语言模型上,供大模型调用。
总的来说,mcp就是一个框架,能帮助大模型调用工具
二、mcp协议通信MCP采用客户端-服务器的分布式架构,它将 LLM 与资源之间的通信划分为三个主要部分:客户端、服务器和资源。
MCP Hos:Hosts 是指 LLM 启动连接的应用程序,像Cursor、Claude、Desktop、Cline 这样的应用程序。
MCP Client:客户端是用来在 Hosts 应用程序内维护与 Server 之间 1:1 连接。一个主机应用中可以运行多个MCP客户端,从而同时连接多个不同的服务器。
MCP Server(服务器):独立运行的轻量程序,通过标准化的协议,为客户端提供上下文、工具和提示,是MCP服务的核心。
目前配置 MCP服务主要有种模式:
Stdio 模式: 这个主要是用来连接你本地电脑上的软件或文件。比如你想让 AI 控制 Blender 这种没有在线服务的软件,就得用 Stdio,它的配置相对复杂一些。
SSE 模式: 这个用来连接线上的、本身就有 API 的服务。比如访问你的谷歌邮箱、谷歌日历等等。SSE 的配置超级简单,基本上就是一个链接搞定。
在 MCP 框架中,SSE 模式是为了支持流式生成(如 LLM 的分词响应)而设计的一种 模型响应协议形式,其主要特征如下:
特点:
服务端推送:服务端可以不断发送生成的 token,客户端实时接收并显示。
兼容 Chat Completions 接口:通常和 OpenAI 的 stream=True接口兼容。
性能更高:相比于完整生成后一次返回,流式响应能提升用户体验和响应速度。
sse模型一般是推荐使用异步函数,那么为什么 SSE 模型要用异步函数?
1. SSE 本质是“流式”通信,需要持续等待数据SSE 是服务端持续推送数据,客户端需要一直监听这个连接,直到服务端关闭或中止。这种长时间等待、读取的过程非常适合用 async实现,而不是阻塞式的 requests.get。如果用同步函数,会卡住整个线程,阻塞后续逻辑或 UI。
2. 异步 I/O 更高效,占用资源更少 在异步模式下,await会在数据没到的时候挂起任务,释放执行权给其他协程,而不是死等。这对于聊天机器人、Web 服务或多用户同时请求来说,性能提升非常明显。
三、mcp实践
为了方便演示,我写了一个mcp的工具demo
# 创建一个FastMCP应用实例,名称为"demo"# 这将作为所有工具的统一服务入口app = FastMCP( "demo")
# 定义一个名为"weather"的工具,用于查询城市天气# 该工具接收一个字符串类型的城市名称作为参数@app.tool(name="weather", deion="城市天气查询")defget_weather( city: str): # 定义一个包含部分城市天气信息的字典# 实际应用中这里可能会调用真实的天气APIweather_data = {"北京": { "temp": 25, "condition": "晴"}, "上海": { "temp": 28, "condition": "多云"} }# 返回对应城市的天气信息,如果城市不存在则返回错误信息returnweather_data.get(city, { "error": "未找到该城市"})
if__name__ == "__main__": # 启动应用,使用标准输入输出作为传输方式# 这意味着可以通过命令行与工具进行交互app.run(transport= "stdio")
run(transport="stdio")以子进程方式等待客户端通过标准输入输出发送调用指令
这里为了演示方便,我们直接调用阿里的api接口进行模型与mcp工具的交互
参考链接:通义千问API参考(https://help.aliyun.com/zh/model-studio/use-qwen-by-calling-api)
# 配置OpenAI API参数,使用兼容模式接入阿里云DashScope服务OPENAI_API_KEY = "sk-xxxxxxxxxxxxxxxxxxxxxxxxxx"OPENAI_API_BASE = "https://dashscope.aliyuncs.com/compatible-mode/v1"
classMCPClientDemo: def__init__( self, server_path: str): """初始化MCP客户端:param server_path: MCP服务端脚本路径"""self.server_path = server_path# 创建OpenAI客户端,连接到兼容API的阿里云DashScope服务self.llm = OpenAI(api_key=OPENAI_API_KEY, base_url=OPENAI_API_BASE)asyncdefrun( self, user_query: str): """执行用户查询,对比使用工具和不使用工具的结果:param user_query: 用户问题:return: 对比结果字典"""# 配置标准IO通信的服务端参数server_params = StdioServerParameters(command= "python", args=[self.server_path]) # 建立与MCP服务端的连接asyncwithstdio_client(server=server_params) as(read_stream, write_stream): # 创建客户端会话asyncwithClientSession(read_stream, write_stream) assession: awaitsession.initialize # 获取服务端注册的所有工具信息tools = ( awaitsession.list_tools).tools # 将MCP工具格式转换为OpenAI函数调用格式functions = []fortool intools: functions.append({"name": tool.name, "deion": tool.deion or"", # 使用工具的输入模式或默认模式"parameters": tool.inputSchema or{ "type": "object", "properties": { "city_name": { "type": "string", "deion": "城市名称"} },"required": [ "city_name"] }})# -------------------------------# 模型调用 + MCP 工具路径# -------------------------------# 调用Qwen-max模型,启用函数调用功能response_with_tool = self.llm.chat.completions.create(model= "qwen-max", messages=[{ "role": "user", "content": user_query}], functions=functions,function_call= "auto")message_with_tool = response_with_tool.choices[ 0].message result_with_tool = {"model_reply": message_with_tool.content, "tool_called": None, "tool_result": None}# 如果模型决定调用工具ifmessage_with_tool.function_call: tool_name = message_with_tool.function_call.namearguments = json.loads(message_with_tool.function_call.arguments)# 通过MCP会话调用实际工具tool_result = awaitsession.call_tool(tool_name, arguments) result_with_tool.update({"tool_called": tool_name, "tool_arguments": arguments, "tool_result": tool_result })# -------------------------------# 模型不使用 MCP 工具的路径# -------------------------------# 调用相同模型,但不提供工具信息response_no_tool = self.llm.chat.completions.create(model= "qwen-max", messages=[{ "role": "user", "content": user_query}], # 不传入 functions 参数,模型无法使用工具)message_no_tool = response_no_tool.choices[ 0].message result_no_tool = {"model_reply": message_no_tool.content }# 返回两种调用方式的对比结果return{ "user_query": user_query, "with_mcp_tool": result_with_tool, "without_tool": result_no_tool }
asyncdefmain: """主函数,演示工具使用与不使用的对比"""# 创建MCP客户端,连接到指定服务端client = MCPClientDemo(server_path= "./stdio_mcp.py") # 执行天气查询示例result = awaitclient.run( "北京的天气怎么样") # 格式化输出对比结果print( ">>> 用户提问:", result[ "user_query"]) print( "\n【使用 MCP 工具】") print( "模型回复:", result[ "with_mcp_tool"][ "model_reply"]) ifresult[ "with_mcp_tool"][ "tool_called"]: print( "调用工具:", result[ "with_mcp_tool"][ "tool_called"]) print( "工具参数:", result[ "with_mcp_tool"][ "tool_arguments"]) print( "工具结果:", result[ "with_mcp_tool"][ "tool_result"]) else: print( "未调用任何工具") print( "\n【不使用工具】") print( "模型回复:", result[ "without_tool"][ "model_reply"])
if__name__ == "__main__": # 运行异步主函数asyncio.run(main)
那如果我开发不同的工具,模型能够准确使用,那是不是就能大幅度扩展模型的能力范围,进一步提升模型的效率呢?
四、本地化mcp实践
本节演示使用vllm本地化部署qwen系统的模型,并与本地化的mcp工具进行交互。在实际的应用场景中,我们肯定会开发各种不同的工具,那每次使用stdio这样的形式肯定是不够方便,是不是可以直接在本地的服务器上开一个端口,然后注册各种mcp的工具,如果模型要使用就直接通过mcp协议调用即可。
部署mcp服务,服务放在4200端口上
# 创建FastMCP应用实例,"demo"为应用名称app = FastMCP( "demo")
# 注册天气查询工具,用于获取指定城市的天气信息@app.tool(name="weather", deion="城市天气查询")defget_weather( city: str): # 预设的天气数据(实际应用中可替换为API调用)weather_data = {"北京": { "temp": 25, "condition": "晴"}, "上海": { "temp": 28, "condition": "多云"} }# 返回对应城市的天气,不存在则返回错误信息returnweather_data.get(city, { "error": "未找到该城市"})
if__name__ == "__main__": # 启动HTTP服务,支持流式响应app.run(transport= "streamable-http", # 使用支持流式传输的HTTP协议host= "127.0.0.1", # 监听本地地址port= 4200, # 服务端口path= "/demo", # 服务路径前缀log_level= "debug", # 调试日志级别)
测试mcp服务是否可以正常运行
asyncdeftest_mcp_service: """测试FastMCP服务的异步函数"""# 定义服务URL,与服务端配置保持一致SERVICE_URL = "http://127.0.0.1:4200/demo"
try: # 创建基于HTTP的流传输客户端transport = StreamableHttpTransport(url=SERVICE_URL)# 使用上下文管理器创建客户端会话asyncwithClient(transport) asclient: print( f"成功连接到MCP服务: {SERVICE_URL}")
# 发送ping请求测试服务连通性awaitclient.ping print( "服务心跳检测成功")
# 获取服务端注册的所有工具tools = awaitclient.list_tools tool_names = [tool.name fortool intools] print( f"可用工具列表: {', '.join(tool_names)}")
# ==== 工具调用示例 ====
# 1. 调用天气工具查询北京天气weather_results = awaitclient.call_tool( "weather", { "city": "北京"}) # 提取第一个结果的字典数据(假设服务端返回结构化数据)weather_data = weather_results[ 0].text print( f"北京天气: 温度={weather_data['temp']}℃, 天气={weather_data['condition']}")
# 3. 测试错误处理(查询不存在的城市)try: error_results = awaitclient.call_tool( "weather", { "city": "东京"}) # 检查错误信息是否符合预期iferror_results andhasattr(error_results[ 0], 'error'): print( f"错误处理测试: {error_results[0].error}- 符合预期行为") exceptException ase: print( f"意外错误: {str(e)}")
# 处理连接失败异常excepthttpx.ConnectError: print( f"连接失败!请检查服务是否运行在 {SERVICE_URL}") # 处理其他未知异常exceptException ase: print( f"测试失败: {str(e)}")
if__name__ == "__main__": # 脚本入口点print( "="* 50) print( "FastMCP服务测试脚本") print( "="* 50) # 运行异步测试函数asyncio.run(test_mcp_service)
可以看到可以正常的访问mcp服务
我们使用vllm部署模型, 把模型打到8000接口
接下来我们启动服务对大模型进行提问
asyncdefquery_mcp_tool( tool_name: str, params: dict): """调用MCP工具的统一入口:param tool_name: 工具名称:param params: 工具参数:return: 工具执行结果"""asyncwithClient( "http://127.0.0.1:4200/demo") asclient: returnawaitclient.call_tool(tool_name, params)
asyncdefchat_with_tools: """实现支持工具调用的聊天功能1. 连接本地vLLM服务2. 获取可用工具列表并转换为OpenAI函数调用格式3. 根据用户问题调用适当工具4. 整合工具结果生成最终回复"""# 连接本地部署的vLLM服务(兼容OpenAI API)llm_client = AsyncOpenAI(base_url= "http://localhost:8000/v1", api_key= "EMPTY"# 本地服务不需要API密钥)# 动态获取MCP服务提供的工具列表asyncwithClient( "http://127.0.0.1:4200/demo") asmcp_client: tools = awaitmcp_client.list_tools
# 将MCP工具模式转换为OpenAI函数调用格式tool_schemas = [{"type": "function", "function": { "name": tool.name, "deion": tool.deion, "parameters": { "type": tool.inputSchema.get( "type", "object"), "properties": { prop_name: prop_def forprop_name, prop_def intool.inputSchema[ "properties"].items },"required": tool.inputSchema.get( "required", []) }}} fortool intools] # 用户提问示例user_query = "查询北京天气和贵州茅台股价"
# 第一次调用模型,允许模型决定是否需要调用工具response = awaitllm_client.chat.completions.create( model= "qwen3-1.7b", messages=[{ "role": "user", "content": user_query}], tools=tool_schemas,tool_choice= "auto"# 让模型自动选择工具)# 处理工具调用请求message = response.choices[ 0].message print(message.tool_calls)
ifmessage.tool_calls: print( "检测到工具调用请求:")
# 按顺序执行模型请求的所有工具forcall inmessage.tool_calls: print( f"正在执行 {call.function.name}...") # 调用MCP工具并获取结果result = awaitquery_mcp_tool( call.function.name,eval(call.function.arguments) # 将参数字符串转换为字典)print( f"工具返回: {result}") # 第二次调用模型,结合工具结果生成最终回复final_response = awaitllm_client.chat.completions.create( model= "qwen3-1.7b", messages=[{ "role": "user", "content": user_query}, # 原始问题message, # 模型的工具调用计划*[{ # 每个工具的执行结果"role": "tool", "name": call.function.name, "content": str(result) } forcall inmessage.tool_calls] ])print( "\n最终回复:", final_response.choices[ 0].message.content) else: # 如果模型认为不需要工具,直接返回模型回复print( "直接回复:", message.content)
if__name__ == "__main__": # 运行异步聊天函数asyncio.run(chat_with_tools)
看一下最后的结果
可以看到模型明确的表示 我先调用了天气和股票的函数 然后得到了结果~~~~~
说明本地的模型已经与mcp进行了交互
完美下车~