"""Tool-call handler: executes the ``tool_use`` blocks returned by Claude."""

import asyncio
import json
import logging
from typing import Any, Dict, List, Optional

from mcp_client import MCPClient
from tool_converter import ToolConverter

logger = logging.getLogger(__name__)


class ToolCallHandler:
    """Run Claude ``tool_use`` content blocks through MCP and format the results."""

    def __init__(
        self,
        session_id: Optional[str] = None,
        mcp_tokens: Optional[dict] = None,
    ):
        """Store the session id and the per-server token mapping.

        Args:
            session_id: Passed through unchanged to every ``MCPClient``.
            mcp_tokens: Mapping of server id -> auth token. ``None`` (or any
                falsy value) is replaced by an empty dict.
        """
        self.session_id = session_id
        self.mcp_tokens = mcp_tokens or {}  # server id -> auth token

    async def process_tool_use_block(
        self,
        tool_use_block: Dict[str, Any],
        tool_to_server_map: Optional[dict] = None,
    ) -> Dict[str, Any]:
        """Execute a single ``tool_use`` content block.

        Args:
            tool_use_block: A block of the form
                ``{"type": "tool_use", "id": ..., "name": ..., "input": {...}}``.
            tool_to_server_map: Mapping of tool name -> server id. Tools that
                are not in the map are called with ``server_id=None``.

        Returns:
            ``{"tool_use_id", "tool_name", "result", "server_id"}`` where
            ``result`` is whatever ``MCPClient.call_tool`` returned, or
            ``{"tool_use_id", "error"}`` when the block has no tool name.
        """
        tool_name = tool_use_block.get("name", "")
        tool_input = tool_use_block.get("input", {})
        tool_id = tool_use_block.get("id", "")

        if not tool_name:
            return {
                "tool_use_id": tool_id,
                "error": "工具名称为空",
            }

        # Resolve which server owns the tool, then the token for that server.
        server_id = (tool_to_server_map or {}).get(tool_name)
        auth_token = self.mcp_tokens.get(server_id) if server_id else None

        # Only record whether a token exists: the token itself (even a prefix
        # of it) is a credential and must not end up in logs or on stdout.
        logger.debug(
            "process_tool_use_block: tool=%s server_id=%s "
            "auth_token_present=%s available_token_servers=%s",
            tool_name,
            server_id,
            bool(auth_token),
            list(self.mcp_tokens),
        )

        client = MCPClient(
            session_id=self.session_id,
            server_id=server_id,
            auth_token=auth_token,
        )
        result = await client.call_tool(tool_name, tool_input)

        return {
            "tool_use_id": tool_id,
            "tool_name": tool_name,
            "result": result,
            "server_id": server_id,
        }

    async def process_tool_use_blocks(
        self,
        content_blocks: List[Dict[str, Any]],
        tool_to_server_map: Optional[dict] = None,
    ) -> List[Dict[str, Any]]:
        """Execute every ``tool_use`` block in ``content_blocks`` concurrently.

        Args:
            content_blocks: Content blocks returned by Claude; blocks whose
                ``type`` is not ``"tool_use"`` are ignored.
            tool_to_server_map: Mapping of tool name -> server id.

        Returns:
            One result dict per ``tool_use`` block, in the same order as the
            blocks appear. A call that raised (or was cancelled) is reported
            as ``{"tool_use_id", "tool_name", "error"}`` instead of
            propagating.
        """
        tool_use_blocks = [
            block for block in content_blocks if block.get("type") == "tool_use"
        ]

        logger.debug(
            "process_tool_use_blocks: tool_to_server_map keys=%s mcp_tokens keys=%s",
            list(tool_to_server_map) if tool_to_server_map else None,
            list(self.mcp_tokens),
        )

        if not tool_use_blocks:
            return []

        outcomes = await asyncio.gather(
            *(
                self.process_tool_use_block(block, tool_to_server_map)
                for block in tool_use_blocks
            ),
            return_exceptions=True,
        )

        formatted_results = []
        for block, outcome in zip(tool_use_blocks, outcomes):
            # BaseException rather than Exception: asyncio.CancelledError is a
            # BaseException since Python 3.8 and gather() hands it back as a
            # plain value when return_exceptions=True.
            if isinstance(outcome, BaseException):
                formatted_results.append({
                    "tool_use_id": block.get("id", ""),
                    "tool_name": block.get("name", ""),
                    # Some exceptions stringify to "" (e.g. TimeoutError()).
                    "error": str(outcome) or repr(outcome),
                })
            else:
                formatted_results.append(outcome)

        return formatted_results

    @staticmethod
    def create_tool_result_message(tool_results: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Convert tool execution results into a Claude API ``user`` message.

        Args:
            tool_results: Result dicts as produced by
                ``process_tool_use_blocks``.

        Returns:
            ``{"role": "user", "content": [...]}`` with one ``tool_result``
            block per entry. Entries carrying an ``"error"`` key, either at
            the top level or inside their ``"result"`` dict, are emitted with
            ``"is_error": True``.
        """

        def error_block(tool_use_id: str, error: Any) -> Dict[str, Any]:
            # Single place that defines the shape of a failed tool_result.
            return {
                "type": "tool_result",
                "tool_use_id": tool_use_id,
                "content": f"错误: {error}",
                "is_error": True,
            }

        content = []
        for result in tool_results:
            tool_use_id = result.get("tool_use_id", "")
            payload = result.get("result")

            # Handler-level failure (empty tool name, raised exception): these
            # entries have no "result" key, so they must be detected here or
            # they would be reported to the model as an empty success.
            if "error" in result:
                content.append(error_block(tool_use_id, result["error"]))
                continue

            if isinstance(payload, dict):
                if "error" in payload:
                    content.append(error_block(tool_use_id, payload["error"]))
                    continue
                result_text = payload.get("result", "")
            elif payload is None:
                result_text = ""
            else:
                # Non-dict payloads are passed on as text rather than crashing.
                result_text = str(payload)

            content.append({
                "type": "tool_result",
                "tool_use_id": tool_use_id,
                "content": result_text,
            })

        return {
            "role": "user",
            "content": content,
        }