| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049 |
- """
- AI MCP Web UI - FastAPI 后端
- 提供聊天界面与 MCP 工具调用的桥梁
- """
- import os
- import asyncio
- import uuid
- import json as json_module
- from typing import Optional, Dict, List, Any
- from contextlib import asynccontextmanager
- from fastapi import FastAPI, Request, HTTPException, Header
- from fastapi.middleware.cors import CORSMiddleware
- from fastapi.responses import StreamingResponse, JSONResponse
- from fastapi.staticfiles import StaticFiles
- import httpx
- from anthropic import Anthropic
- from debug_logger import log_debug
- from config import MCP_SERVERS, ANTHROPIC_API_KEY, ANTHROPIC_BASE_URL, ANTHROPIC_MODEL
- from conversation_manager import ConversationManager
- from tool_handler import ToolCallHandler
- from tool_converter import ToolConverter
- from mcp_client import MCPClient
# In-memory store of authenticated sessions, keyed by session id.
# (A production deployment should use Redis or a database instead.)
auth_sessions: Dict[str, dict] = dict()
def create_anthropic_client(api_key: str, base_url: str) -> Anthropic:
    """Build an Anthropic client that authenticates with a Bearer token.

    The custom API proxy expects an ``Authorization: Bearer <token>`` header
    rather than the ``x-api-key`` header the Anthropic SDK uses by default,
    so the header is installed on a dedicated httpx client that is handed
    to the SDK.

    Args:
        api_key: Token placed in the ``Authorization`` header.
        base_url: Base URL passed to the ``Anthropic`` constructor.

    Returns:
        An ``Anthropic`` client using the custom httpx client.
    """
    bearer_headers = {"Authorization": f"Bearer {api_key}"}
    transport = httpx.Client(headers=bearer_headers, timeout=120.0)
    return Anthropic(base_url=base_url, http_client=transport)
# Module-level Claude client, created with the custom (Bearer) auth format.
client = create_anthropic_client(ANTHROPIC_API_KEY, ANTHROPIC_BASE_URL)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan hook: print a banner on startup and on shutdown."""
    # Startup phase: report the configured model and MCP server ids.
    startup_banner = f"FastAPI 应用启动 - 模型: {ANTHROPIC_MODEL}"
    print(startup_banner)
    configured_servers = list(MCP_SERVERS.keys())
    print(f"MCP 服务器: {configured_servers}")

    yield

    # Shutdown phase.
    print("FastAPI 应用关闭")
# Create the FastAPI application.
app = FastAPI(
    lifespan=lifespan,
    title="AI MCP Web UI Backend",
    description="AI MCP Web UI 后端服务 - 支持 Claude AI 和 MCP 工具调用",
    version="2.0.0",
)

# CORS configuration: every origin, method and header is allowed.
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive — confirm this is intended outside of development.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_credentials=True,
    allow_origins=["*"],
)
# Static frontend files: prefer the frontend-v2 static export (Next.js) and
# fall back to the legacy "frontend" directory when the export is absent.
_project_root = os.path.dirname(os.path.dirname(__file__))
frontend_v2_path = os.path.join(_project_root, "frontend-v2-static")
frontend_v1_path = os.path.join(_project_root, "frontend")
frontend_path = frontend_v2_path if os.path.exists(frontend_v2_path) else frontend_v1_path

# Mount the Next.js asset directory (_next) only when it actually exists.
# StaticFiles validates its directory when it is constructed, so mounting
# unconditionally would abort startup in exactly the fallback case above.
_next_static_path = os.path.join(frontend_v2_path, "_next")
if os.path.isdir(_next_static_path):
    app.mount("/_next", StaticFiles(directory=_next_static_path), name="next_static")
- # ========== 根路由 ==========
@app.get("/")
async def index():
    """Serve the frontend entry page (``index.html`` under ``frontend_path``)."""
    from fastapi.responses import FileResponse

    return FileResponse(os.path.join(frontend_path, "index.html"))
@app.get("/auth")
async def auth_page():
    """Serve the login page, falling back to the main page when it is missing."""
    from fastapi.responses import FileResponse

    auth_path = os.path.join(frontend_path, "auth.html")
    if not os.path.exists(auth_path):
        # No dedicated login page on disk: serve the main page instead.
        return await index()
    return FileResponse(auth_path)
- # ========== 健康检查 ==========
@app.get("/api/health")
async def health():
    """Health-check endpoint reporting the model and configured MCP server ids."""
    server_ids = list(MCP_SERVERS.keys())
    return {"status": "ok", "model": ANTHROPIC_MODEL, "mcp_servers": server_ids}
- # ========== 聊天 API ==========
@app.post("/api/chat")
async def chat(request: Request):
    """Chat endpoint: accept a user message and return Claude's reply.

    The JSON body supplies ``message`` (required), ``history``,
    ``availableComponents`` and ``enabledMcpList``. MCP authentication
    tokens arrive as a JSON object in the ``X-MCP-Tokens`` header and the
    session id in ``X-Session-ID``. The conversation itself is delegated to
    ``ConversationManager.chat`` with at most 5 turns.

    Returns:
        A dict with ``response``, ``model``, ``tool_calls`` and
        ``has_tools``; or a 500 ``JSONResponse`` carrying ``error`` and
        ``traceback`` when an unexpected exception occurs.

    Raises:
        HTTPException: 400 when ``message`` is missing or empty.
    """
    try:
        data = await request.json()
        message = data.get('message', '')
        conversation_history = data.get('history', [])
        session_id = request.headers.get('X-Session-ID')
        mcp_tokens = request.headers.get('X-MCP-Tokens')  # JSON string or None

        if not message:
            raise HTTPException(status_code=400, detail="Message is required")

        # Parse the MCP tokens. A malformed or non-object payload is treated
        # as "no tokens" (best effort) instead of failing the whole request.
        parsed_tokens = {}
        if mcp_tokens:
            try:
                parsed_tokens = json_module.loads(mcp_tokens)
            except (ValueError, RecursionError):
                parsed_tokens = {}
            if not isinstance(parsed_tokens, dict):
                parsed_tokens = {}

        # DEBUG: show which tokens were received (values are truncated).
        print(f"[DEBUG /api/chat] Received mcp_tokens: {list(parsed_tokens.keys()) if parsed_tokens else 'None'}")
        for k, v in parsed_tokens.items():
            # str() keeps the preview from raising on non-string values.
            print(f"[DEBUG /api/chat] {k}: {str(v)[:30] if v else 'None'}...")

        # Component list sent by the frontend (dynamic component registration).
        available_components = data.get('availableComponents')
        # List of MCPs the frontend has enabled.
        enabled_mcp_list = data.get('enabledMcpList')

        if available_components:
            print(f"[DEBUG /api/chat] Using dynamic components from frontend ({len(available_components)} chars)")

        # DEBUG: report the enabled MCP list.
        if enabled_mcp_list is not None:
            print(f"[DEBUG /api/chat] Enabled MCP list from frontend: {enabled_mcp_list}")
        else:
            print(f"[DEBUG /api/chat] No enabled MCP list from frontend, using all configured MCPs")

        # Create the conversation manager (with tokens and component prompt).
        conv_manager = ConversationManager(
            api_key=ANTHROPIC_API_KEY,
            base_url=ANTHROPIC_BASE_URL,
            model=ANTHROPIC_MODEL,
            session_id=session_id,
            mcp_tokens=parsed_tokens,
            components_prompt=available_components,
            enabled_mcp_list=enabled_mcp_list
        )

        # Format the conversation history for Claude.
        formatted_history = ConversationManager.format_history_for_claude(conversation_history)

        # Run the multi-turn conversation.
        result = await conv_manager.chat(
            user_message=message,
            conversation_history=formatted_history,
            max_turns=5
        )

        response_text = result.get("response", "")
        tool_calls = result.get("tool_calls", [])

        return {
            "response": response_text,
            "model": ANTHROPIC_MODEL,
            "tool_calls": tool_calls,
            "has_tools": len(tool_calls) > 0
        }

    except HTTPException:
        raise
    except Exception as e:
        import traceback
        # NOTE(review): the traceback is returned to the caller; confirm this
        # is acceptable outside of debugging.
        return JSONResponse(
            status_code=500,
            content={
                "error": str(e),
                "traceback": traceback.format_exc()
            }
        )
def _sse_event(event: str, payload: Dict[str, Any]) -> str:
    """Format one Server-Sent Events frame with a JSON-encoded data line."""
    return f"event: {event}\ndata: {json_module.dumps(payload)}\n\n"


async def generate_chat_stream(
    message: str,
    conversation_history: List[Dict[str, Any]],
    session_id: Optional[str],
    mcp_tokens: Optional[Dict[str, str]] = None,
    available_components: Optional[str] = None,  # component list sent by the frontend
    enabled_mcp_list: Optional[List[str]] = None  # MCPs enabled by the frontend
):
    """Async generator producing the SSE stream for one chat request.

    Emits ``start``, then for each of at most 5 turns a ``tools`` event,
    ``token`` events for streamed text, and — when the model requests tools —
    ``tools_start``, ``tool_call`` and ``tool_done``/``tool_error`` events.
    The stream ends with ``complete`` (or ``error`` on any exception).

    Args:
        message: The user's message for this request.
        conversation_history: Prior messages, formatted through
            ``ConversationManager.format_history_for_claude``.
        session_id: Session id forwarded to ``ConversationManager``.
        mcp_tokens: MCP tokens, either a mapping or a JSON string encoding
            one (the streaming endpoint passes the raw header string).
        available_components: Optional component prompt from the frontend.
        enabled_mcp_list: Optional list of enabled MCPs from the frontend.

    Yields:
        SSE frames as strings (``event: <name>\\ndata: <json>\\n\\n``).
    """
    try:
        # Announce the start of the stream.
        yield _sse_event("start", {'status': 'started'})

        # Parse the MCP tokens (possibly a JSON string). Malformed or
        # non-object payloads are treated as "no tokens" (best effort).
        parsed_tokens = {}
        if mcp_tokens:
            if isinstance(mcp_tokens, str):
                try:
                    parsed_tokens = json_module.loads(mcp_tokens)
                except (ValueError, RecursionError):
                    parsed_tokens = {}
            else:
                parsed_tokens = mcp_tokens
            if not isinstance(parsed_tokens, dict):
                parsed_tokens = {}

        # DEBUG: show the parsed tokens (values are truncated).
        print(f"[DEBUG generate_chat_stream] Parsed mcp_tokens keys: {list(parsed_tokens.keys())}")
        for k, v in parsed_tokens.items():
            # str() keeps the preview from raising on non-string values.
            print(f"[DEBUG generate_chat_stream] {k}: {str(v)[:30] if v else 'None'}...")

        # DEBUG: report the component prompt.
        if available_components:
            print(f"[DEBUG generate_chat_stream] Using dynamic components from frontend ({len(available_components)} chars)")
        else:
            print(f"[DEBUG generate_chat_stream] Using default components")

        # DEBUG: report the enabled MCP list.
        if enabled_mcp_list is not None:
            print(f"[DEBUG generate_chat_stream] Enabled MCP list from frontend: {enabled_mcp_list}")
        else:
            print(f"[DEBUG generate_chat_stream] No enabled MCP list from frontend, using all configured MCPs")

        # Create the conversation manager (with tokens and component prompt).
        conv_manager = ConversationManager(
            api_key=ANTHROPIC_API_KEY,
            base_url=ANTHROPIC_BASE_URL,
            model=ANTHROPIC_MODEL,
            session_id=session_id,
            mcp_tokens=parsed_tokens,
            components_prompt=available_components,
            enabled_mcp_list=enabled_mcp_list
        )

        # Format the history and append the new user message.
        formatted_history = ConversationManager.format_history_for_claude(conversation_history)
        current_messages = formatted_history + [{"role": "user", "content": message}]

        tool_calls_info = []

        for _turn in range(5):  # at most 5 turns
            # Fetch the tools currently available.
            tools = await conv_manager.get_available_tools()

            # Report the tool list (names of the first five only).
            yield _sse_event("tools", {'count': len(tools), 'tools': [t['name'] for t in tools[:5]]})

            # Call the Claude API in streaming mode; `tools` is only sent
            # when at least one tool is available.
            request_kwargs = {
                "model": conv_manager.model,
                "max_tokens": 4096,
                "system": conv_manager.system_prompt,  # dynamic system prompt
                "messages": current_messages,
                "stream": True,
            }
            if tools:
                request_kwargs["tools"] = tools
            response_stream = conv_manager.client.messages.create(**request_kwargs)

            # Per-turn stream state. `content_blocks` records the assistant's
            # text and tool_use blocks in stream order so the turn can be
            # echoed back to the API ahead of the tool results.
            content_blocks = []
            tool_use_blocks = []
            response_text = ""
            current_text_block = None
            current_tool_block = None
            partial_json = ""

            # NOTE(review): this is a plain (synchronous) iteration inside an
            # async generator; if `conv_manager.client` is a blocking client
            # it will hold the event loop while streaming — confirm.
            for event in response_stream:
                # Start of a content block: text or tool call.
                if event.type == "content_block_start":
                    block = getattr(event, "content_block", None)
                    block_type = getattr(block, "type", None)

                    if block_type == "tool_use":
                        # The start event carries the tool id and name.
                        current_tool_block = {
                            "type": "tool_use",
                            "id": getattr(block, "id", ""),
                            "name": getattr(block, "name", ""),
                            "input": {}
                        }
                        tool_use_blocks.append(current_tool_block)
                        content_blocks.append(current_tool_block)
                        partial_json = ""
                    elif block_type == "text":
                        current_text_block = {"type": "text", "text": ""}
                        content_blocks.append(current_text_block)

                # Incremental content for the current block.
                elif event.type == "content_block_delta":
                    delta_type = getattr(event.delta, "type", "")

                    # Text increment.
                    if delta_type == "text_delta":
                        text = event.delta.text
                        response_text += text
                        if current_text_block is None:
                            # Defensive: a text delta without a start event.
                            current_text_block = {"type": "text", "text": ""}
                            content_blocks.append(current_text_block)
                        current_text_block["text"] += text
                        yield _sse_event("token", {'text': text})

                    # Tool name / argument increment delivered as objects.
                    elif delta_type == "tool_use_delta":
                        if current_tool_block is not None:
                            delta_name = getattr(event.delta, "name", None)
                            delta_input = getattr(event.delta, "input", None)
                            if delta_name is not None:
                                current_tool_block["name"] = delta_name
                            if isinstance(delta_input, dict):
                                current_tool_block["input"].update(delta_input)

                    # Tool arguments delivered as partial JSON text.
                    elif delta_type == "input_json_delta":
                        partial_json_str = getattr(event.delta, "partial_json", "")
                        if partial_json_str:
                            partial_json += partial_json_str
                            try:
                                # Succeeds only once the JSON is complete.
                                parsed_input = json_module.loads(partial_json)
                            except json_module.JSONDecodeError:
                                # Still incomplete; keep accumulating.
                                pass
                            else:
                                if current_tool_block is not None:
                                    current_tool_block["input"] = parsed_input

                # End of the current content block.
                elif event.type == "content_block_stop":
                    current_text_block = None
                    current_tool_block = None
                    partial_json = ""

            # No tool calls: the answer is final.
            if not tool_use_blocks:
                yield _sse_event("complete", {'response': response_text, 'tool_calls': tool_calls_info})
                return

            # Handle the requested tool calls.
            yield _sse_event("tools_start", {'count': len(tool_use_blocks)})

            # One tool_call event per requested tool.
            for tool_block in tool_use_blocks:
                yield _sse_event("tool_call", {'tool': tool_block['name'], 'args': tool_block['input'], 'tool_id': tool_block['id']})

            tool_results = await conv_manager.tool_handler.process_tool_use_blocks(
                tool_use_blocks,
                conv_manager._tool_to_server_map
            )

            for tr in tool_results:
                tool_name = tr.get("tool_name", "")
                tool_result = tr.get("result", {})
                tool_use_id = tr.get("tool_use_id", "")

                # Report the outcome of each tool.
                if "error" in tool_result:
                    yield _sse_event("tool_error", {'tool': tool_name, 'tool_id': tool_use_id, 'error': tool_result['error']})
                else:
                    result_data = tool_result.get('result', '')
                    # Truncate long string results to keep the event small.
                    if isinstance(result_data, str) and len(result_data) > 500:
                        result_data = result_data[:500] + '...'
                    yield _sse_event("tool_done", {'tool': tool_name, 'tool_id': tool_use_id, 'result': result_data})

                tool_calls_info.append({
                    "tool": tool_name,
                    "result": tool_result
                })

            # Build the tool-result message.
            tool_result_message = ToolCallHandler.create_tool_result_message(
                tool_results
            )

            # Echo the assistant turn (text + tool_use blocks) before the tool
            # results: the tool results refer to these tool_use ids. Empty
            # text blocks are dropped.
            assistant_content = [
                block for block in content_blocks
                if block["type"] != "text" or block["text"]
            ]
            current_messages.append({
                "role": "assistant",
                "content": assistant_content
            })
            current_messages.append(tool_result_message)

        # Maximum number of turns reached.
        yield _sse_event("complete", {'response': response_text, 'tool_calls': tool_calls_info, 'warning': '达到最大对话轮数'})

    except Exception as e:
        import traceback
        yield _sse_event("error", {'error': str(e), 'traceback': traceback.format_exc()})
@app.post("/api/chat/stream")
async def chat_stream(request: Request):
    """Streaming chat endpoint (works around request timeouts).

    Uses Server-Sent Events (SSE) to return, in real time:
    1. Claude's thinking process
    2. tool call status
    3. the final response

    MCP authentication is supported: JWT tokens are passed through the
    ``X-MCP-Tokens`` header.

    Raises:
        HTTPException: 400 when ``message`` is missing or empty.
    """
    try:
        payload = await request.json()
        message = payload.get('message', '')
        history = payload.get('history', [])
        session_id = request.headers.get('X-Session-ID')
        # Raw header value (a JSON string); it is parsed by the generator.
        mcp_tokens = request.headers.get('X-MCP-Tokens')
        available_components = payload.get('availableComponents')
        enabled_mcp_list = payload.get('enabledMcpList')

        # Record the request through the debug logger.
        debug_record = {
            "message_preview": message[:50] if message else None,
            "enabled_mcp_list": enabled_mcp_list,
            "enabled_mcp_list_type": str(type(enabled_mcp_list))
        }
        log_debug("app_fastapi.api_chat_stream_received", debug_record)

        # DEBUG: print what was received.
        print(f"[DEBUG /api/chat/stream] mcp_tokens type: {type(mcp_tokens)}")
        print(f"[DEBUG /api/chat/stream] mcp_tokens value: {mcp_tokens[:150] if mcp_tokens else 'None'}...")
        print(f"[DEBUG /api/chat/stream] available_components: {len(available_components) if available_components else 0} chars")
        print(f"[DEBUG /api/chat/stream] enabled_mcp_list: {enabled_mcp_list}")

        if not message:
            raise HTTPException(status_code=400, detail="Message is required")

        event_stream = generate_chat_stream(
            message,
            history,
            session_id,
            mcp_tokens,
            available_components,
            enabled_mcp_list,
        )
        sse_headers = {
            'Cache-Control': 'no-cache',
            'X-Accel-Buffering': 'no'  # disable Nginx buffering
        }
        return StreamingResponse(
            event_stream,
            media_type="text/event-stream",
            headers=sse_headers
        )

    except HTTPException:
        raise
    except Exception as e:
        import traceback
        failure = {"error": str(e), "traceback": traceback.format_exc()}
        return JSONResponse(status_code=500, content=failure)
- # ========== MCP API ==========
@app.get("/api/mcp/servers")
async def list_mcp_servers():
    """Return the configured MCP servers (id, name, url, auth type, enabled)."""
    return {
        "servers": [
            {
                "id": server_id,
                "name": config.get("name", server_id),
                "url": config.get("url", ""),
                "auth_type": config.get("auth_type", "none"),
                "enabled": config.get("enabled", False)
            }
            for server_id, config in MCP_SERVERS.items()
        ]
    }
@app.get("/api/mcp/tools")
async def list_mcp_tools(
    x_session_id: Optional[str] = Header(None, alias='X-Session-ID'),
    x_mcp_tokens: Optional[str] = Header(None, alias='X-MCP-Tokens')
):
    """Return the available MCP tools converted for Claude.

    Args:
        x_session_id: Value of the ``X-Session-ID`` header, if any.
        x_mcp_tokens: Value of the ``X-MCP-Tokens`` header: a JSON object of
            MCP tokens. Malformed or non-object values are ignored.

    Returns:
        A dict with ``tools`` and ``count``; or a 500 ``JSONResponse`` with
        ``error``, ``traceback`` and an empty ``tools`` list on failure.
    """
    try:
        # Parse the MCP tokens; bad input means "no tokens" (best effort).
        parsed_tokens = {}
        if x_mcp_tokens:
            try:
                parsed_tokens = json_module.loads(x_mcp_tokens)
            except (ValueError, RecursionError):
                parsed_tokens = {}
            if not isinstance(parsed_tokens, dict):
                parsed_tokens = {}

        # Fetch the tools using the token-aware client method.
        tools = await MCPClient.get_all_tools_with_tokens_async(
            session_id=x_session_id,
            mcp_tokens=parsed_tokens
        )
        claude_tools = ToolConverter.convert_mcp_tools(tools)

        return {
            "tools": claude_tools,
            "count": len(claude_tools)
        }
    except Exception as e:
        import traceback
        return JSONResponse(
            status_code=500,
            content={
                "error": str(e),
                "traceback": traceback.format_exc(),
                "tools": []
            }
        )
@app.get("/api/mcp/health/{mcp_type}")
async def check_mcp_health(mcp_type: str):
    """Check whether an MCP server is reachable.

    Sends a HEAD request to the server's configured URL and reports the
    health status together with the response latency in milliseconds.

    Args:
        mcp_type: Key of the server in ``MCP_SERVERS``.

    Returns:
        A dict with ``status`` (``healthy``/``timeout``/``unreachable``),
        ``healthy``, ``mcp_type`` and ``latency``; a 404/400 ``JSONResponse``
        for an unknown server or a missing URL; a 500 ``JSONResponse`` on any
        other failure.
    """
    import time

    try:
        # Look up the MCP server configuration.
        target_server = MCP_SERVERS.get(mcp_type)

        if not target_server:
            return JSONResponse(
                status_code=404,
                content={"status": "error", "message": f"Unknown MCP type: {mcp_type}"}
            )

        mcp_url = target_server.get('url', '')
        if not mcp_url:
            return JSONResponse(
                status_code=400,
                content={"status": "error", "message": f"No URL configured for {mcp_type}"}
            )

        # perf_counter is monotonic, so the latency cannot be skewed by
        # wall-clock adjustments.
        start_time = time.perf_counter()
        try:
            async with httpx.AsyncClient(timeout=10.0) as http_client:
                response = await http_client.head(mcp_url)

            latency = int((time.perf_counter() - start_time) * 1000)

            # Any response at all (whatever the status code) means the server
            # is up; an MCP SSE endpoint may answer 405 (HEAD not allowed)
            # and still be healthy.
            return {
                "status": "healthy",
                "healthy": True,
                "mcp_type": mcp_type,
                "latency": latency,
                "url": mcp_url,
                "http_status": response.status_code
            }
        except httpx.TimeoutException:
            # Covers every timeout kind, including connect timeouts
            # (httpx.ConnectTimeout is a subclass of TimeoutException).
            return {
                "status": "timeout",
                "healthy": False,
                "mcp_type": mcp_type,
                "error": "Connection timeout",
                "latency": 10000
            }
        except httpx.ConnectError as e:
            return {
                "status": "unreachable",
                "healthy": False,
                "mcp_type": mcp_type,
                "error": f"Connection error: {str(e)}",
                "latency": 0
            }

    except Exception as e:
        import traceback
        return JSONResponse(
            status_code=500,
            content={
                "status": "error",
                "healthy": False,
                "mcp_type": mcp_type,
                "error": str(e),
                "traceback": traceback.format_exc()
            }
        )
- # ========== 认证 API ==========
@app.post("/api/auth/login")
async def login(request: Request):
    """Novel Platform user login.

    Proxies the credentials to the login endpoint of the
    ``novel-platform-user`` server, stores a new session in
    ``auth_sessions`` and returns the JWT token.

    Raises:
        HTTPException: 400 for missing credentials or a missing server
            configuration; the upstream status code when the login is
            rejected; 500 on any other failure.
    """
    try:
        payload = await request.json()
        # Both `email` and `username` are accepted as the login identifier.
        email = payload.get('email') or payload.get('username')
        password = payload.get('password')

        if not (email and password):
            raise HTTPException(status_code=400, detail="Email and password are required")

        # Look up the user MCP server.
        target_server = MCP_SERVERS.get('novel-platform-user')
        if not target_server:
            raise HTTPException(status_code=400, detail="Novel Platform User server not configured")

        # Build the login URL.
        login_url = "{}{}".format(
            target_server.get('base_url', ''),
            target_server.get('login_url', '/api/v1/auth/login'),
        )

        # Call the real login endpoint (async).
        credentials = {"email": email, "password": password}
        async with httpx.AsyncClient(timeout=30.0) as http_client:
            response = await http_client.post(login_url, json=credentials)

        if response.status_code != 200:
            raise HTTPException(
                status_code=response.status_code,
                detail=f"Login failed: {response.text}"
            )

        result = response.json()
        session_id = str(uuid.uuid4())

        # The Novel Platform API returns `access_token` and a `user` object.
        access_token = result.get("access_token")
        user_info = result.get("user", {})
        user_role = user_info.get("role", "reader")
        display_name = user_info.get("username") or user_info.get("email", email)
        server_name = target_server.get("name")

        # Remember the session.
        auth_sessions[session_id] = {
            "username": display_name,
            "email": email,
            "role": user_role,
            "token": access_token,
            "refresh_token": result.get("refresh_token"),
            "server": server_name
        }

        return {
            "success": True,
            "session_id": session_id,
            "username": display_name,
            "role": user_role,
            "server": server_name,
            "token": access_token
        }

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/auth/admin-login")
async def admin_login(request: Request):
    """Novel Platform administrator login.

    Proxies the credentials to the admin login endpoint of the
    ``novel-platform-admin`` server, stores a new ``admin`` session in
    ``auth_sessions`` and returns the JWT token.

    Raises:
        HTTPException: 400 for missing credentials or a missing server
            configuration; the upstream status code when the login is
            rejected; 500 on any other failure.
    """
    try:
        body = await request.json()
        # Both `email` and `username` are accepted as the login identifier.
        email = body.get('email') or body.get('username')
        password = body.get('password')

        if not (email and password):
            raise HTTPException(status_code=400, detail="Email and password are required")

        # Look up the admin MCP server.
        target_server = MCP_SERVERS.get('novel-platform-admin')
        if not target_server:
            raise HTTPException(status_code=400, detail="Admin server not configured")

        # Build the login URL.
        url_base = target_server.get('base_url', '')
        url_path = target_server.get('login_url', '/api/v1/auth/admin-login')
        login_url = url_base + url_path

        # Call the real login endpoint (async).
        async with httpx.AsyncClient(timeout=30.0) as http_client:
            response = await http_client.post(
                login_url,
                json={"email": email, "password": password}
            )

        if response.status_code != 200:
            raise HTTPException(
                status_code=response.status_code,
                detail=f"Admin login failed: {response.text}"
            )

        result = response.json()
        session_id = str(uuid.uuid4())

        # The Novel Platform API returns `access_token` and a `user` object.
        access_token = result.get("access_token")
        user_info = result.get("user", {})
        shown_name = user_info.get("username") or user_info.get("email", email)
        server_name = target_server.get("name")

        auth_sessions[session_id] = {
            "username": shown_name,
            "email": email,
            "token": access_token,
            "refresh_token": result.get("refresh_token"),
            "server": server_name,
            "role": "admin"
        }

        return {
            "success": True,
            "session_id": session_id,
            "username": shown_name,
            "server": server_name,
            "role": "admin",
            "token": access_token
        }

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/auth/register")
async def register(request: Request):
    """Novel Platform user registration.

    Proxies the request to ``/api/v1/auth/register`` on the
    ``novel-platform-user`` server, or, when that server is not configured,
    on a JWT-authenticated server that has a ``base_url`` (preferring one
    whose id contains ``user``).

    Returns:
        A dict with ``success``, ``message`` and the created ``user``.

    Raises:
        HTTPException: 400 for missing fields or when no suitable server is
            configured; the upstream status code (with its ``detail`` when
            available) when registration is rejected; 500 on other failures.
    """
    try:
        data = await request.json()
        email = data.get('email')
        username = data.get('username')
        password = data.get('password')

        if not email or not username or not password:
            raise HTTPException(status_code=400, detail="Email, username and password are required")

        # Look up the user MCP server.
        target_server = MCP_SERVERS.get('novel-platform-user')
        if not target_server:
            # No dedicated user server: fall back to any JWT-authenticated
            # server, preferring one whose id mentions "user".
            for server_id, config in MCP_SERVERS.items():
                if config.get('auth_type') == 'jwt' and 'base_url' in config:
                    if 'user' in server_id:
                        target_server = config
                        break
                    elif target_server is None:
                        target_server = config

        if not target_server:
            raise HTTPException(status_code=400, detail="No JWT-authenticated server configured")

        # Build the registration URL.
        base_url = target_server.get('base_url', '')
        register_url = f"{base_url}/api/v1/auth/register"

        # Call the real registration endpoint (async).
        async with httpx.AsyncClient(timeout=30.0) as http_client:
            response = await http_client.post(
                register_url,
                json={"email": email, "username": username, "password": password}
            )

        if response.status_code in (200, 201):
            result = response.json()
            # The Novel Platform API returns the user object.
            return {
                "success": True,
                "message": "注册成功",
                "user": {
                    "id": result.get("id"),
                    "email": result.get("email"),
                    "username": result.get("username"),
                    "role": result.get("role")
                }
            }

        # Try to extract `detail` from a JSON error body; anything that is
        # not a JSON object falls back to the raw response text.
        try:
            error_detail = response.json()
        except ValueError:
            error_msg = response.text
        else:
            if isinstance(error_detail, dict):
                error_msg = error_detail.get("detail", response.text)
            else:
                error_msg = response.text
        raise HTTPException(
            status_code=response.status_code,
            detail=error_msg
        )

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/auth/logout")
async def logout(request: Request):
    """Log out: drop the session named by ``session_id`` in the JSON body.

    Unknown or missing session ids are ignored; the call still succeeds.

    Raises:
        HTTPException: 500 when the request cannot be processed.
    """
    try:
        body = await request.json()
        session_id = body.get('session_id')
        if session_id:
            # pop() with a default removes the session only if it exists.
            auth_sessions.pop(session_id, None)
        return {"success": True}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/auth/status")
async def auth_status(x_session_id: Optional[str] = Header(None, alias='X-Session-ID')):
    """Report whether the ``X-Session-ID`` header names a stored session."""
    if not x_session_id or x_session_id not in auth_sessions:
        return {"authenticated": False}

    session = auth_sessions[x_session_id]
    return {
        "authenticated": True,
        "username": session.get("username"),
        "server": session.get("server"),
        "role": session.get("role", "user")
    }
- # ========== 测试 API ==========
@app.get("/api/test-mcp")
async def test_mcp_get(
    tool_name: str = "get_system_stats",
    server_id: str = "novel-platform-admin",
    auth_token: Optional[str] = None
):
    """Test an MCP tool call via GET (convenient for curl).

    Args:
        tool_name: Tool to call (default: ``get_system_stats``).
        server_id: Server id (default: ``novel-platform-admin``).
        auth_token: JWT token, read from the query string.

    Returns:
        A dict with ``success``, ``server_id``, ``tool_name`` and the tool
        ``result``; or a 500 ``JSONResponse`` with ``error`` and
        ``traceback`` when the call raises.
    """
    banner = "=" * 60
    try:
        print("\n" + banner)
        print("[TEST-MCP GET] MCP 工具调用测试")
        print(banner)
        print(f"[TEST-MCP GET] server_id: {server_id}")
        print(f"[TEST-MCP GET] tool_name: {tool_name}")
        print(f"[TEST-MCP GET] auth_token present: {bool(auth_token)}")
        if auth_token:
            print(f"[TEST-MCP GET] auth_token (前50字符): {auth_token[:50]}...")
        print(banner + "\n")

        # Create the MCP client.
        mcp_client = MCPClient(
            server_id=server_id,
            session_id="test-session",
            auth_token=auth_token
        )

        # Call the tool without arguments.
        print("[TEST-MCP GET] 开始调用工具...")
        result = await mcp_client.call_tool(tool_name, {})

        print("[TEST-MCP GET] 调用完成")
        print(f"[TEST-MCP GET] success: {result.get('success', False)}")
        print(banner + "\n")

        return {
            "success": True,
            "server_id": server_id,
            "tool_name": tool_name,
            "result": result
        }

    except Exception as e:
        import traceback
        print(f"[TEST-MCP GET] 异常: {e}")
        traceback.print_exc()
        failure = {
            "success": False,
            "error": str(e),
            "traceback": traceback.format_exc()
        }
        return JSONResponse(status_code=500, content=failure)
@app.post("/api/test-mcp")
async def test_mcp_call(request: Request):
    """Call an MCP tool directly (bypasses CORS; for debugging).

    Request body:
    {
        "server_id": "novel-platform-admin",  // optional, defaults to admin
        "tool_name": "get_system_stats",      // tool name (required)
        "arguments": {},                      // tool arguments
        "auth_token": "jwt-token"             // JWT auth token
    }

    Returns:
        A dict with ``success``, the echoed request fields, the tool
        ``result`` and a ``debug`` section; or a 500 ``JSONResponse`` with
        ``error`` and ``traceback`` when the call raises.

    Raises:
        HTTPException: 400 when ``tool_name`` is missing or empty.
    """
    try:
        data = await request.json()
        server_id = data.get('server_id', 'novel-platform-admin')
        tool_name = data.get('tool_name', '')
        arguments = data.get('arguments', {})
        auth_token = data.get('auth_token')

        print("\n" + "="*60)
        print("[TEST-MCP] MCP 工具调用测试")
        print("="*60)
        print(f"[TEST-MCP] server_id: {server_id}")
        print(f"[TEST-MCP] tool_name: {tool_name}")
        print(f"[TEST-MCP] arguments: {arguments}")
        print(f"[TEST-MCP] auth_token present: {bool(auth_token)}")
        if auth_token:
            print(f"[TEST-MCP] auth_token (前50字符): {auth_token[:50]}...")
        print("="*60 + "\n")

        if not tool_name:
            raise HTTPException(status_code=400, detail="tool_name is required")

        # Create the MCP client.
        client = MCPClient(
            server_id=server_id,
            session_id="test-session",
            auth_token=auth_token
        )

        # Call the tool.
        print(f"[TEST-MCP] 开始调用工具...")
        result = await client.call_tool(tool_name, arguments)

        print(f"[TEST-MCP] 调用结果:")
        print(f"[TEST-MCP] success: {result.get('success', False)}")
        print(f"[TEST-MCP] has_error: {'error' in result}")
        if 'error' in result:
            print(f"[TEST-MCP] error: {result['error']}")
        else:
            # The tool result is not necessarily a string; str() keeps the
            # preview from raising and failing an otherwise successful call.
            result_preview = str(result.get('result', ''))[:100]
            print(f"[TEST-MCP] result (预览): {result_preview}...")
        print("="*60 + "\n")

        return {
            "success": True,
            "server_id": server_id,
            "tool_name": tool_name,
            "arguments": arguments,
            "result": result,
            "debug": {
                "auth_token_present": bool(auth_token),
                "auth_token_length": len(auth_token) if auth_token else 0
            }
        }

    except HTTPException:
        raise
    except Exception as e:
        import traceback
        print(f"[TEST-MCP] 异常: {e}")
        traceback.print_exc()
        return JSONResponse(
            status_code=500,
            content={
                "success": False,
                "error": str(e),
                "traceback": traceback.format_exc()
            }
        )
- # ========== 主程序入口 ==========
if __name__ == '__main__':
    import uvicorn

    # Default port is 8081 because Next.js uses 8080.
    listen_port = int(os.getenv('PORT', 8081))
    # Auto-reload is enabled only when DEBUG is "true" (case-insensitive).
    reload_enabled = os.getenv('DEBUG', 'False').lower() == 'true'

    uvicorn.run("app_fastapi:app", host='0.0.0.0', port=listen_port, reload=reload_enabled)
|