app_fastapi.py 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049
  1. """
  2. AI MCP Web UI - FastAPI 后端
  3. 提供聊天界面与 MCP 工具调用的桥梁
  4. """
  5. import os
  6. import asyncio
  7. import uuid
  8. import json as json_module
  9. from typing import Optional, Dict, List, Any
  10. from contextlib import asynccontextmanager
  11. from fastapi import FastAPI, Request, HTTPException, Header
  12. from fastapi.middleware.cors import CORSMiddleware
  13. from fastapi.responses import StreamingResponse, JSONResponse
  14. from fastapi.staticfiles import StaticFiles
  15. import httpx
  16. from anthropic import Anthropic
  17. from debug_logger import log_debug
  18. from config import MCP_SERVERS, ANTHROPIC_API_KEY, ANTHROPIC_BASE_URL, ANTHROPIC_MODEL
  19. from conversation_manager import ConversationManager
  20. from tool_handler import ToolCallHandler
  21. from tool_converter import ToolConverter
  22. from mcp_client import MCPClient
  23. # 存储认证会话 (生产环境应使用 Redis 或数据库)
  24. auth_sessions: Dict[str, dict] = {}
  25. def create_anthropic_client(api_key: str, base_url: str) -> Anthropic:
  26. """
  27. 创建 Anthropic 客户端,支持自定义认证格式
  28. 自定义 API 代理需要 'Authorization: Bearer <token>' 格式,
  29. 而不是 Anthropic SDK 默认的 'x-api-key' header。
  30. """
  31. # 创建自定义 httpx client,设置正确的 Authorization header
  32. http_client = httpx.Client(
  33. headers={"Authorization": f"Bearer {api_key}"},
  34. timeout=120.0
  35. )
  36. return Anthropic(base_url=base_url, http_client=http_client)
  37. # 初始化 Claude 客户端(使用自定义认证格式)
  38. client = create_anthropic_client(
  39. api_key=ANTHROPIC_API_KEY,
  40. base_url=ANTHROPIC_BASE_URL
  41. )
  42. @asynccontextmanager
  43. async def lifespan(app: FastAPI):
  44. """应用生命周期管理"""
  45. # 启动时执行
  46. print(f"FastAPI 应用启动 - 模型: {ANTHROPIC_MODEL}")
  47. print(f"MCP 服务器: {list(MCP_SERVERS.keys())}")
  48. yield
  49. # 关闭时执行
  50. print("FastAPI 应用关闭")
  51. # 创建 FastAPI 应用
  52. app = FastAPI(
  53. title="AI MCP Web UI Backend",
  54. description="AI MCP Web UI 后端服务 - 支持 Claude AI 和 MCP 工具调用",
  55. version="2.0.0",
  56. lifespan=lifespan
  57. )
  58. # CORS 配置
  59. app.add_middleware(
  60. CORSMiddleware,
  61. allow_origins=["*"],
  62. allow_credentials=True,
  63. allow_methods=["*"],
  64. allow_headers=["*"],
  65. )
  66. # 挂载静态文件 - 支持 frontend-v2 (Next.js 静态导出)
  67. frontend_v2_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "frontend-v2-static")
  68. frontend_v1_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "frontend")
  69. # 优先使用 frontend-v2,如果不存在则回退到 frontend
  70. frontend_path = frontend_v2_path if os.path.exists(frontend_v2_path) else frontend_v1_path
  71. # 挂载静态资源目录 (_next 等)
  72. app.mount("/_next", StaticFiles(directory=os.path.join(frontend_v2_path, "_next")), name="next_static")
  73. # ========== 根路由 ==========
  74. @app.get("/")
  75. async def index():
  76. """返回前端主页 (frontend-v2)"""
  77. from fastapi.responses import FileResponse
  78. index_path = os.path.join(frontend_path, "index.html")
  79. return FileResponse(index_path)
  80. @app.get("/auth")
  81. async def auth_page():
  82. """返回登录页面 (frontend-v2)"""
  83. from fastapi.responses import FileResponse
  84. auth_path = os.path.join(frontend_path, "auth.html")
  85. if os.path.exists(auth_path):
  86. return FileResponse(auth_path)
  87. # 回退到主页
  88. return await index()
  89. # ========== 健康检查 ==========
  90. @app.get("/api/health")
  91. async def health():
  92. """健康检查端点"""
  93. return {
  94. "status": "ok",
  95. "model": ANTHROPIC_MODEL,
  96. "mcp_servers": list(MCP_SERVERS.keys())
  97. }
  98. # ========== 聊天 API ==========
  99. @app.post("/api/chat")
  100. async def chat(request: Request):
  101. """
  102. 聊天端点 - 接收用户消息,返回 Claude 响应(支持 MCP 工具调用)
  103. 支持 MCP 认证:通过 X-MCP-Tokens header 传递 JWT tokens
  104. """
  105. try:
  106. data = await request.json()
  107. message = data.get('message', '')
  108. conversation_history = data.get('history', [])
  109. session_id = request.headers.get('X-Session-ID')
  110. mcp_tokens = request.headers.get('X-MCP-Tokens') # MCP tokens (JSON string)
  111. if not message:
  112. raise HTTPException(status_code=400, detail="Message is required")
  113. # 解析 MCP tokens
  114. parsed_tokens = {}
  115. if mcp_tokens:
  116. if isinstance(mcp_tokens, str):
  117. try:
  118. parsed_tokens = json_module.loads(mcp_tokens)
  119. except:
  120. parsed_tokens = {}
  121. else:
  122. parsed_tokens = mcp_tokens
  123. # DEBUG: 打印收到的 token
  124. print(f"[DEBUG /api/chat] Received mcp_tokens: {list(parsed_tokens.keys()) if parsed_tokens else 'None'}")
  125. for k, v in parsed_tokens.items():
  126. print(f"[DEBUG /api/chat] {k}: {v[:30] if v else 'None'}...")
  127. # 获取前端发送的组件列表(实现动态组件注册)
  128. available_components = data.get('availableComponents')
  129. enabled_mcp_list = data.get('enabledMcpList') # 前端发送的已启用 MCP 列表
  130. if available_components:
  131. print(f"[DEBUG /api/chat] Using dynamic components from frontend ({len(available_components)} chars)")
  132. # DEBUG: 打印已启用的 MCP 列表
  133. if enabled_mcp_list is not None:
  134. print(f"[DEBUG /api/chat] Enabled MCP list from frontend: {enabled_mcp_list}")
  135. else:
  136. print(f"[DEBUG /api/chat] No enabled MCP list from frontend, using all configured MCPs")
  137. # 创建对话管理器(带 token 和组件提示)
  138. conv_manager = ConversationManager(
  139. api_key=ANTHROPIC_API_KEY,
  140. base_url=ANTHROPIC_BASE_URL,
  141. model=ANTHROPIC_MODEL,
  142. session_id=session_id,
  143. mcp_tokens=parsed_tokens,
  144. components_prompt=available_components, # 动态组件提示
  145. enabled_mcp_list=enabled_mcp_list # 前端传递的已启用 MCP 列表
  146. )
  147. # 格式化对话历史
  148. formatted_history = ConversationManager.format_history_for_claude(conversation_history)
  149. # 执行多轮对话(自动处理工具调用)
  150. result = await conv_manager.chat(
  151. user_message=message,
  152. conversation_history=formatted_history,
  153. max_turns=5
  154. )
  155. # 提取响应文本
  156. response_text = result.get("response", "")
  157. tool_calls = result.get("tool_calls", [])
  158. return {
  159. "response": response_text,
  160. "model": ANTHROPIC_MODEL,
  161. "tool_calls": tool_calls,
  162. "has_tools": len(tool_calls) > 0
  163. }
  164. except HTTPException:
  165. raise
  166. except Exception as e:
  167. import traceback
  168. return JSONResponse(
  169. status_code=500,
  170. content={
  171. "error": str(e),
  172. "traceback": traceback.format_exc()
  173. }
  174. )
  175. async def generate_chat_stream(
  176. message: str,
  177. conversation_history: List[Dict[str, Any]],
  178. session_id: Optional[str],
  179. mcp_tokens: Optional[Dict[str, str]] = None,
  180. available_components: Optional[str] = None, # 新增:前端发送的组件列表
  181. enabled_mcp_list: Optional[List[str]] = None # 新增:前端发送的已启用 MCP 列表
  182. ):
  183. """生成 SSE 流式响应的异步生成器"""
  184. try:
  185. # 发送开始事件
  186. yield f"event: start\ndata: {json_module.dumps({'status': 'started'})}\n\n"
  187. # 解析 MCP tokens (从 JSON 字符串)
  188. parsed_tokens = {}
  189. if mcp_tokens:
  190. if isinstance(mcp_tokens, str):
  191. try:
  192. parsed_tokens = json_module.loads(mcp_tokens)
  193. except Exception as e:
  194. parsed_tokens = {}
  195. else:
  196. parsed_tokens = mcp_tokens
  197. # DEBUG: 打印解析后的 token
  198. print(f"[DEBUG generate_chat_stream] Parsed mcp_tokens keys: {list(parsed_tokens.keys())}")
  199. for k, v in parsed_tokens.items():
  200. print(f"[DEBUG generate_chat_stream] {k}: {v[:30] if v else 'None'}...")
  201. # DEBUG: 打印组件提示
  202. if available_components:
  203. print(f"[DEBUG generate_chat_stream] Using dynamic components from frontend ({len(available_components)} chars)")
  204. else:
  205. print(f"[DEBUG generate_chat_stream] Using default components")
  206. # DEBUG: 打印已启用的 MCP 列表
  207. if enabled_mcp_list is not None:
  208. print(f"[DEBUG generate_chat_stream] Enabled MCP list from frontend: {enabled_mcp_list}")
  209. else:
  210. print(f"[DEBUG generate_chat_stream] No enabled MCP list from frontend, using all configured MCPs")
  211. # 创建对话管理器(带 token 和组件提示)
  212. conv_manager = ConversationManager(
  213. api_key=ANTHROPIC_API_KEY,
  214. base_url=ANTHROPIC_BASE_URL,
  215. model=ANTHROPIC_MODEL,
  216. session_id=session_id,
  217. mcp_tokens=parsed_tokens,
  218. components_prompt=available_components, # 动态组件提示
  219. enabled_mcp_list=enabled_mcp_list # 前端传递的已启用 MCP 列表
  220. )
  221. # 格式化对话历史
  222. formatted_history = ConversationManager.format_history_for_claude(conversation_history)
  223. messages = formatted_history + [{"role": "user", "content": message}]
  224. current_messages = messages
  225. tool_calls_info = []
  226. for turn in range(5): # 最多 5 轮
  227. # 获取可用工具
  228. tools = await conv_manager.get_available_tools()
  229. # 发送工具列表
  230. yield f"event: tools\ndata: {json_module.dumps({'count': len(tools), 'tools': [t['name'] for t in tools[:5]]})}\n\n"
  231. # 调用 Claude API(流式)
  232. if tools:
  233. response_stream = conv_manager.client.messages.create(
  234. model=conv_manager.model,
  235. max_tokens=4096,
  236. system=conv_manager.system_prompt, # 使用动态系统提示
  237. messages=current_messages,
  238. tools=tools,
  239. stream=True
  240. )
  241. else:
  242. response_stream = conv_manager.client.messages.create(
  243. model=conv_manager.model,
  244. max_tokens=4096,
  245. system=conv_manager.system_prompt, # 使用动态系统提示
  246. messages=current_messages,
  247. stream=True
  248. )
  249. # 处理流式响应
  250. content_blocks = []
  251. tool_use_blocks = []
  252. response_text = ""
  253. current_block_type = None
  254. current_tool_index = -1
  255. partial_json = ""
  256. for event in response_stream:
  257. # 处理内容块开始 - 检查是否是工具调用
  258. if event.type == "content_block_start":
  259. # 检查块的类型
  260. if hasattr(event, "content_block"):
  261. current_block_type = getattr(event.content_block, "type", None)
  262. if current_block_type == "tool_use":
  263. # 这是工具调用块的开始
  264. tool_use_id = getattr(event.content_block, "id", "")
  265. # content_block 包含 name
  266. tool_name = getattr(event.content_block, "name", "")
  267. tool_use_blocks.append({
  268. "type": "tool_use",
  269. "id": tool_use_id,
  270. "name": tool_name,
  271. "input": {}
  272. })
  273. current_tool_index = len(tool_use_blocks) - 1
  274. partial_json = ""
  275. # 处理内容块增量
  276. elif event.type == "content_block_delta":
  277. delta_type = getattr(event.delta, "type", "")
  278. # 文本增量
  279. if delta_type == "text_delta":
  280. text = event.delta.text
  281. response_text += text
  282. yield f"event: token\ndata: {json_module.dumps({'text': text})}\n\n"
  283. # 工具名称增量
  284. elif delta_type == "tool_use_delta":
  285. # 获取工具名称和参数增量
  286. delta_name = getattr(event.delta, "name", None)
  287. delta_input = getattr(event.delta, "input", None)
  288. if current_tool_index >= 0 and current_tool_index < len(tool_use_blocks):
  289. if delta_name is not None:
  290. tool_use_blocks[current_tool_index]["name"] = delta_name
  291. if delta_input is not None:
  292. # 更新输入参数
  293. current_input = tool_use_blocks[current_tool_index]["input"]
  294. if isinstance(delta_input, dict):
  295. current_input.update(delta_input)
  296. tool_use_blocks[current_tool_index]["input"] = current_input
  297. # 工具参数增量 - input_json_delta
  298. elif delta_type == "input_json_delta":
  299. # 累积 partial_json 构建完整参数
  300. partial_json_str = getattr(event.delta, "partial_json", "")
  301. if partial_json_str:
  302. partial_json += partial_json_str
  303. try:
  304. # 尝试解析累积的 JSON
  305. parsed_input = json_module.loads(partial_json)
  306. if current_tool_index >= 0 and current_tool_index < len(tool_use_blocks):
  307. tool_use_blocks[current_tool_index]["input"] = parsed_input
  308. except json_module.JSONDecodeError:
  309. # JSON 还不完整,继续累积
  310. pass
  311. # 处理内容块停止
  312. elif event.type == "content_block_stop":
  313. current_block_type = None
  314. current_tool_index = -1
  315. partial_json = ""
  316. # 如果没有工具调用,发送完成事件
  317. if not tool_use_blocks:
  318. yield f"event: complete\ndata: {json_module.dumps({'response': response_text, 'tool_calls': tool_calls_info})}\n\n"
  319. return
  320. # 处理工具调用
  321. yield f"event: tools_start\ndata: {json_module.dumps({'count': len(tool_use_blocks)})}\n\n"
  322. # 为每个工具调用发送 tool_call 事件
  323. for tool_block in tool_use_blocks:
  324. yield f"event: tool_call\ndata: {json_module.dumps({'tool': tool_block['name'], 'args': tool_block['input'], 'tool_id': tool_block['id']})}\n\n"
  325. tool_results = await conv_manager.tool_handler.process_tool_use_blocks(
  326. tool_use_blocks,
  327. conv_manager._tool_to_server_map
  328. )
  329. for tr in tool_results:
  330. tool_name = tr.get("tool_name", "")
  331. tool_result = tr.get("result", {})
  332. tool_use_id = tr.get("tool_use_id", "")
  333. # 发送工具完成事件
  334. if "error" in tool_result:
  335. yield f"event: tool_error\ndata: {json_module.dumps({'tool': tool_name, 'tool_id': tool_use_id, 'error': tool_result['error']})}\n\n"
  336. else:
  337. result_data = tool_result.get('result', '')
  338. # 限制结果长度避免传输过大
  339. if isinstance(result_data, str) and len(result_data) > 500:
  340. result_data = result_data[:500] + '...'
  341. yield f"event: tool_done\ndata: {json_module.dumps({'tool': tool_name, 'tool_id': tool_use_id, 'result': result_data})}\n\n"
  342. tool_calls_info.append({
  343. "tool": tool_name,
  344. "result": tool_result
  345. })
  346. # 构建工具结果消息
  347. tool_result_message = ToolCallHandler.create_tool_result_message(
  348. tool_results
  349. )
  350. # 添加到消息历史
  351. current_messages.append({
  352. "role": "assistant",
  353. "content": content_blocks
  354. })
  355. current_messages.append(tool_result_message)
  356. # 达到最大轮数
  357. yield f"event: complete\ndata: {json_module.dumps({'response': response_text, 'tool_calls': tool_calls_info, 'warning': '达到最大对话轮数'})}\n\n"
  358. except Exception as e:
  359. import traceback
  360. yield f"event: error\ndata: {json_module.dumps({'error': str(e), 'traceback': traceback.format_exc()})}\n\n"
  361. @app.post("/api/chat/stream")
  362. async def chat_stream(request: Request):
  363. """
  364. 聊天端点 - 流式输出版本(解决超时问题)
  365. 使用 Server-Sent Events (SSE) 实时返回:
  366. 1. Claude 的思考过程
  367. 2. 工具调用状态
  368. 3. 最终响应
  369. 支持 MCP 认证:通过 X-MCP-Tokens header 传递 JWT tokens
  370. """
  371. try:
  372. data = await request.json()
  373. message = data.get('message', '')
  374. conversation_history = data.get('history', [])
  375. session_id = request.headers.get('X-Session-ID')
  376. mcp_tokens = request.headers.get('X-MCP-Tokens') # MCP tokens (JSON string)
  377. available_components = data.get('availableComponents') # 前端发送的组件列表
  378. enabled_mcp_list = data.get('enabledMcpList') # 前端发送的已启用 MCP 列表
  379. # 记录到 JSONL 文件
  380. log_debug("app_fastapi.api_chat_stream_received", {
  381. "message_preview": message[:50] if message else None,
  382. "enabled_mcp_list": enabled_mcp_list,
  383. "enabled_mcp_list_type": str(type(enabled_mcp_list))
  384. })
  385. # DEBUG: 打印收到的 token
  386. print(f"[DEBUG /api/chat/stream] mcp_tokens type: {type(mcp_tokens)}")
  387. print(f"[DEBUG /api/chat/stream] mcp_tokens value: {mcp_tokens[:150] if mcp_tokens else 'None'}...")
  388. print(f"[DEBUG /api/chat/stream] available_components: {len(available_components) if available_components else 0} chars")
  389. print(f"[DEBUG /api/chat/stream] enabled_mcp_list: {enabled_mcp_list}")
  390. if not message:
  391. raise HTTPException(status_code=400, detail="Message is required")
  392. return StreamingResponse(
  393. generate_chat_stream(message, conversation_history, session_id, mcp_tokens, available_components, enabled_mcp_list),
  394. media_type="text/event-stream",
  395. headers={
  396. 'Cache-Control': 'no-cache',
  397. 'X-Accel-Buffering': 'no' # 禁用 Nginx 缓冲
  398. }
  399. )
  400. except HTTPException:
  401. raise
  402. except Exception as e:
  403. import traceback
  404. return JSONResponse(
  405. status_code=500,
  406. content={
  407. "error": str(e),
  408. "traceback": traceback.format_exc()
  409. }
  410. )
  411. # ========== MCP API ==========
  412. @app.get("/api/mcp/servers")
  413. async def list_mcp_servers():
  414. """获取已配置的 MCP 服务器列表"""
  415. servers = []
  416. for name, server in MCP_SERVERS.items():
  417. servers.append({
  418. "id": name,
  419. "name": server.get("name", name),
  420. "url": server.get("url", ""),
  421. "auth_type": server.get("auth_type", "none"),
  422. "enabled": server.get("enabled", False)
  423. })
  424. return {"servers": servers}
  425. @app.get("/api/mcp/tools")
  426. async def list_mcp_tools(
  427. x_session_id: Optional[str] = Header(None, alias='X-Session-ID'),
  428. x_mcp_tokens: Optional[str] = Header(None, alias='X-MCP-Tokens')
  429. ):
  430. """获取可用的 MCP 工具列表(支持带 token 的认证)"""
  431. try:
  432. # 解析 MCP tokens
  433. parsed_tokens = {}
  434. if x_mcp_tokens:
  435. try:
  436. parsed_tokens = json_module.loads(x_mcp_tokens)
  437. except:
  438. parsed_tokens = {}
  439. # 使用带 token 的方法获取工具
  440. tools = await MCPClient.get_all_tools_with_tokens_async(
  441. session_id=x_session_id,
  442. mcp_tokens=parsed_tokens
  443. )
  444. claude_tools = ToolConverter.convert_mcp_tools(tools)
  445. return {
  446. "tools": claude_tools,
  447. "count": len(claude_tools)
  448. }
  449. except Exception as e:
  450. import traceback
  451. return JSONResponse(
  452. status_code=500,
  453. content={
  454. "error": str(e),
  455. "traceback": traceback.format_exc(),
  456. "tools": []
  457. }
  458. )
  459. @app.get("/api/mcp/health/{mcp_type}")
  460. async def check_mcp_health(mcp_type: str):
  461. """
  462. 检查 MCP 服务器健康状态
  463. 使用 HEAD 请求检查 MCP 服务器是否在线
  464. 返回健康状态和响应延迟
  465. """
  466. import time
  467. import urllib.parse
  468. try:
  469. # 查找 MCP 服务器配置
  470. target_server = MCP_SERVERS.get(mcp_type)
  471. if not target_server:
  472. return JSONResponse(
  473. status_code=404,
  474. content={"status": "error", "message": f"Unknown MCP type: {mcp_type}"}
  475. )
  476. # 获取 MCP URL
  477. mcp_url = target_server.get('url', '')
  478. if not mcp_url:
  479. return JSONResponse(
  480. status_code=400,
  481. content={"status": "error", "message": f"No URL configured for {mcp_type}"}
  482. )
  483. # 使用 HEAD 请求检查服务器是否在线
  484. start_time = time.time()
  485. try:
  486. async with httpx.AsyncClient(timeout=10.0) as http_client:
  487. # 使用 HEAD 请求检查服务器可达性
  488. response = await http_client.head(
  489. mcp_url,
  490. timeout=10.0
  491. )
  492. latency = int((time.time() - start_time) * 1000)
  493. # 只要能收到响应(无论什么状态码),说明服务器在线
  494. # MCP SSE 端点可能返回 405 (不允许 HEAD),但服务器仍健康
  495. return {
  496. "status": "healthy",
  497. "healthy": True,
  498. "mcp_type": mcp_type,
  499. "latency": latency,
  500. "url": mcp_url,
  501. "http_status": response.status_code
  502. }
  503. except httpx.TimeoutException:
  504. return {
  505. "status": "timeout",
  506. "healthy": False,
  507. "mcp_type": mcp_type,
  508. "error": "Connection timeout",
  509. "latency": 10000
  510. }
  511. except httpx.ConnectError as e:
  512. return {
  513. "status": "unreachable",
  514. "healthy": False,
  515. "mcp_type": mcp_type,
  516. "error": f"Connection error: {str(e)}",
  517. "latency": 0
  518. }
  519. except httpx.ConnectTimeout as e:
  520. return {
  521. "status": "timeout",
  522. "healthy": False,
  523. "mcp_type": mcp_type,
  524. "error": f"Connection timeout: {str(e)}",
  525. "latency": 10000
  526. }
  527. except Exception as e:
  528. import traceback
  529. return JSONResponse(
  530. status_code=500,
  531. content={
  532. "status": "error",
  533. "healthy": False,
  534. "mcp_type": mcp_type,
  535. "error": str(e),
  536. "traceback": traceback.format_exc()
  537. }
  538. )
  539. # ========== 认证 API ==========
  540. @app.post("/api/auth/login")
  541. async def login(request: Request):
  542. """
  543. Novel Platform 用户登录
  544. 代理到实际的登录端点并返回 JWT Token
  545. """
  546. try:
  547. data = await request.json()
  548. # 支持 email 和 username 两种参数名
  549. email = data.get('email') or data.get('username')
  550. password = data.get('password')
  551. if not email or not password:
  552. raise HTTPException(status_code=400, detail="Email and password are required")
  553. # 查找用户 MCP 服务器
  554. target_server = MCP_SERVERS.get('novel-platform-user')
  555. if not target_server:
  556. raise HTTPException(status_code=400, detail="Novel Platform User server not configured")
  557. # 构建登录 URL
  558. base_url = target_server.get('base_url', '')
  559. login_path = target_server.get('login_url', '/api/v1/auth/login')
  560. login_url = f"{base_url}{login_path}"
  561. # 调用实际的登录接口(异步版本)
  562. async with httpx.AsyncClient(timeout=30.0) as http_client:
  563. response = await http_client.post(
  564. login_url,
  565. json={"email": email, "password": password}
  566. )
  567. if response.status_code == 200:
  568. result = response.json()
  569. session_id = str(uuid.uuid4())
  570. # Novel Platform API 返回 access_token 和 user 对象
  571. access_token = result.get("access_token")
  572. user_info = result.get("user", {})
  573. # 获取用户角色
  574. user_role = user_info.get("role", "reader")
  575. # 存储会话信息
  576. auth_sessions[session_id] = {
  577. "username": user_info.get("username") or user_info.get("email", email),
  578. "email": email,
  579. "role": user_role,
  580. "token": access_token,
  581. "refresh_token": result.get("refresh_token"),
  582. "server": target_server.get("name")
  583. }
  584. return {
  585. "success": True,
  586. "session_id": session_id,
  587. "username": user_info.get("username") or user_info.get("email", email),
  588. "role": user_role,
  589. "server": target_server.get("name"),
  590. "token": access_token
  591. }
  592. else:
  593. raise HTTPException(
  594. status_code=response.status_code,
  595. detail=f"Login failed: {response.text}"
  596. )
  597. except HTTPException:
  598. raise
  599. except Exception as e:
  600. raise HTTPException(status_code=500, detail=str(e))
  601. @app.post("/api/auth/admin-login")
  602. async def admin_login(request: Request):
  603. """
  604. Novel Platform 管理员登录
  605. 代理到实际的管理员登录端点并返回 JWT Token
  606. """
  607. try:
  608. data = await request.json()
  609. # 支持 email 和 username 两种参数名
  610. email = data.get('email') or data.get('username')
  611. password = data.get('password')
  612. if not email or not password:
  613. raise HTTPException(status_code=400, detail="Email and password are required")
  614. # 查找管理员 MCP 服务器
  615. target_server = MCP_SERVERS.get('novel-platform-admin')
  616. if not target_server:
  617. raise HTTPException(status_code=400, detail="Admin server not configured")
  618. # 构建登录 URL
  619. base_url = target_server.get('base_url', '')
  620. login_path = target_server.get('login_url', '/api/v1/auth/admin-login')
  621. login_url = f"{base_url}{login_path}"
  622. # 调用实际的登录接口(异步版本)
  623. async with httpx.AsyncClient(timeout=30.0) as http_client:
  624. response = await http_client.post(
  625. login_url,
  626. json={"email": email, "password": password}
  627. )
  628. if response.status_code == 200:
  629. result = response.json()
  630. session_id = str(uuid.uuid4())
  631. # Novel Platform API 返回 access_token 和 user 对象
  632. access_token = result.get("access_token")
  633. user_info = result.get("user", {})
  634. auth_sessions[session_id] = {
  635. "username": user_info.get("username") or user_info.get("email", email),
  636. "email": email,
  637. "token": access_token,
  638. "refresh_token": result.get("refresh_token"),
  639. "server": target_server.get("name"),
  640. "role": "admin"
  641. }
  642. return {
  643. "success": True,
  644. "session_id": session_id,
  645. "username": user_info.get("username") or user_info.get("email", email),
  646. "server": target_server.get("name"),
  647. "role": "admin",
  648. "token": access_token
  649. }
  650. else:
  651. raise HTTPException(
  652. status_code=response.status_code,
  653. detail=f"Admin login failed: {response.text}"
  654. )
  655. except HTTPException:
  656. raise
  657. except Exception as e:
  658. raise HTTPException(status_code=500, detail=str(e))
  659. @app.post("/api/auth/register")
  660. async def register(request: Request):
  661. """
  662. Novel Platform 用户注册
  663. 代理到实际的注册端点
  664. """
  665. try:
  666. data = await request.json()
  667. email = data.get('email')
  668. username = data.get('username')
  669. password = data.get('password')
  670. if not email or not username or not password:
  671. raise HTTPException(status_code=400, detail="Email, username and password are required")
  672. # 查找用户 MCP 服务器
  673. target_server = MCP_SERVERS.get('novel-platform-user')
  674. if not target_server:
  675. # 如果没有专门的用户服务器,尝试找到任何需要 JWT 认证的服务器
  676. for server_id, config in MCP_SERVERS.items():
  677. if config.get('auth_type') == 'jwt' and 'base_url' in config:
  678. if 'user' in server_id:
  679. target_server = config
  680. break
  681. elif target_server is None:
  682. target_server = config
  683. if not target_server:
  684. raise HTTPException(status_code=400, detail="No JWT-authenticated server configured")
  685. # 构建注册 URL
  686. base_url = target_server.get('base_url', '')
  687. register_url = f"{base_url}/api/v1/auth/register"
  688. # 调用实际的注册接口(异步版本)
  689. async with httpx.AsyncClient(timeout=30.0) as http_client:
  690. response = await http_client.post(
  691. register_url,
  692. json={"email": email, "username": username, "password": password}
  693. )
  694. if response.status_code in (200, 201):
  695. result = response.json()
  696. # Novel Platform API 返回用户对象
  697. return {
  698. "success": True,
  699. "message": "注册成功",
  700. "user": {
  701. "id": result.get("id"),
  702. "email": result.get("email"),
  703. "username": result.get("username"),
  704. "role": result.get("role")
  705. }
  706. }
  707. else:
  708. # 尝试解析错误响应
  709. try:
  710. error_detail = response.json()
  711. error_msg = error_detail.get("detail", response.text)
  712. except:
  713. error_msg = response.text
  714. raise HTTPException(
  715. status_code=response.status_code,
  716. detail=error_msg
  717. )
  718. except HTTPException:
  719. raise
  720. except Exception as e:
  721. raise HTTPException(status_code=500, detail=str(e))
  722. @app.post("/api/auth/logout")
  723. async def logout(request: Request):
  724. """登出并清除会话"""
  725. try:
  726. data = await request.json()
  727. session_id = data.get('session_id')
  728. if session_id and session_id in auth_sessions:
  729. del auth_sessions[session_id]
  730. return {"success": True}
  731. except Exception as e:
  732. raise HTTPException(status_code=500, detail=str(e))
  733. @app.get("/api/auth/status")
  734. async def auth_status(x_session_id: Optional[str] = Header(None, alias='X-Session-ID')):
  735. """检查认证状态"""
  736. if x_session_id and x_session_id in auth_sessions:
  737. session = auth_sessions[x_session_id]
  738. return {
  739. "authenticated": True,
  740. "username": session.get("username"),
  741. "server": session.get("server"),
  742. "role": session.get("role", "user")
  743. }
  744. return {"authenticated": False}
  745. # ========== 测试 API ==========
  746. @app.get("/api/test-mcp")
  747. async def test_mcp_get(
  748. tool_name: str = "get_system_stats",
  749. server_id: str = "novel-platform-admin",
  750. auth_token: Optional[str] = None
  751. ):
  752. """
  753. GET 方式测试 MCP 工具调用(用于 curl 测试)
  754. 参数:
  755. - tool_name: 工具名称 (默认: get_system_stats)
  756. - server_id: 服务器 ID (默认: novel-platform-admin)
  757. - auth_token: JWT token (通过 query 参数或 header 传递)
  758. """
  759. try:
  760. # 如果 query 参数没有 token,尝试从 header 获取
  761. if not auth_token:
  762. # 这个处理会在实际请求时通过 FastAPI 的 Header 参数处理
  763. pass
  764. print("\n" + "="*60)
  765. print("[TEST-MCP GET] MCP 工具调用测试")
  766. print("="*60)
  767. print(f"[TEST-MCP GET] server_id: {server_id}")
  768. print(f"[TEST-MCP GET] tool_name: {tool_name}")
  769. print(f"[TEST-MCP GET] auth_token present: {bool(auth_token)}")
  770. if auth_token:
  771. print(f"[TEST-MCP GET] auth_token (前50字符): {auth_token[:50]}...")
  772. print("="*60 + "\n")
  773. # 创建 MCP 客户端
  774. client = MCPClient(
  775. server_id=server_id,
  776. session_id="test-session",
  777. auth_token=auth_token
  778. )
  779. # 调用工具(无参数)
  780. print(f"[TEST-MCP GET] 开始调用工具...")
  781. result = await client.call_tool(tool_name, {})
  782. print(f"[TEST-MCP GET] 调用完成")
  783. print(f"[TEST-MCP GET] success: {result.get('success', False)}")
  784. print("="*60 + "\n")
  785. return {
  786. "success": True,
  787. "server_id": server_id,
  788. "tool_name": tool_name,
  789. "result": result
  790. }
  791. except Exception as e:
  792. import traceback
  793. print(f"[TEST-MCP GET] 异常: {e}")
  794. traceback.print_exc()
  795. return JSONResponse(
  796. status_code=500,
  797. content={
  798. "success": False,
  799. "error": str(e),
  800. "traceback": traceback.format_exc()
  801. }
  802. )
  803. @app.post("/api/test-mcp")
  804. async def test_mcp_call(request: Request):
  805. """
  806. 直接测试 MCP 工具调用(绕过 CORS,用于调试)
  807. 请求体:
  808. {
  809. "server_id": "novel-platform-admin", // 可选,默认使用 admin
  810. "tool_name": "get_system_stats", // 工具名称
  811. "arguments": {}, // 工具参数
  812. "auth_token": "jwt-token" // JWT 认证 token
  813. }
  814. """
  815. try:
  816. data = await request.json()
  817. server_id = data.get('server_id', 'novel-platform-admin')
  818. tool_name = data.get('tool_name', '')
  819. arguments = data.get('arguments', {})
  820. auth_token = data.get('auth_token')
  821. print("\n" + "="*60)
  822. print("[TEST-MCP] MCP 工具调用测试")
  823. print("="*60)
  824. print(f"[TEST-MCP] server_id: {server_id}")
  825. print(f"[TEST-MCP] tool_name: {tool_name}")
  826. print(f"[TEST-MCP] arguments: {arguments}")
  827. print(f"[TEST-MCP] auth_token present: {bool(auth_token)}")
  828. if auth_token:
  829. print(f"[TEST-MCP] auth_token (前50字符): {auth_token[:50]}...")
  830. print("="*60 + "\n")
  831. if not tool_name:
  832. raise HTTPException(status_code=400, detail="tool_name is required")
  833. # 创建 MCP 客户端
  834. client = MCPClient(
  835. server_id=server_id,
  836. session_id="test-session",
  837. auth_token=auth_token
  838. )
  839. # 调用工具
  840. print(f"[TEST-MCP] 开始调用工具...")
  841. result = await client.call_tool(tool_name, arguments)
  842. print(f"[TEST-MCP] 调用结果:")
  843. print(f"[TEST-MCP] success: {result.get('success', False)}")
  844. print(f"[TEST-MCP] has_error: {'error' in result}")
  845. if 'error' in result:
  846. print(f"[TEST-MCP] error: {result['error']}")
  847. else:
  848. result_preview = result.get('result', '')[:100]
  849. print(f"[TEST-MCP] result (预览): {result_preview}...")
  850. print("="*60 + "\n")
  851. return {
  852. "success": True,
  853. "server_id": server_id,
  854. "tool_name": tool_name,
  855. "arguments": arguments,
  856. "result": result,
  857. "debug": {
  858. "auth_token_present": bool(auth_token),
  859. "auth_token_length": len(auth_token) if auth_token else 0
  860. }
  861. }
  862. except HTTPException:
  863. raise
  864. except Exception as e:
  865. import traceback
  866. print(f"[TEST-MCP] 异常: {e}")
  867. traceback.print_exc()
  868. return JSONResponse(
  869. status_code=500,
  870. content={
  871. "success": False,
  872. "error": str(e),
  873. "traceback": traceback.format_exc()
  874. }
  875. )
  876. # ========== 主程序入口 ==========
  877. if __name__ == '__main__':
  878. import uvicorn
  879. port = int(os.getenv('PORT', 8081)) # 改为 8081,Next.js 使用 8080
  880. debug = os.getenv('DEBUG', 'False').lower() == 'true'
  881. uvicorn.run(
  882. "app_fastapi:app",
  883. host='0.0.0.0',
  884. port=port,
  885. reload=debug
  886. )