Sfoglia il codice sorgente

feat(mcp): Add glossary_import tool and improve file input handling

- Add glossary_import tool supporting batch import from file_url/file_path/file_content
- Improve term placeholder format from __en__ to __TERM_N__ for better m2m100 preservation
- Add preview field (500-1000 chars) to translation/cleaning output
- Support base64 file content and HTTP URL input for translate_file and clean_file
- Add restart_mcp_server.sh script for convenient server restart

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
d8dfun 1 giorno fa
parent
commit
d196e9fdf2

+ 22 - 1
CLAUDE.md

@@ -108,6 +108,27 @@
 
 ---
 
+## 🔌 MCP 服务器配置说明
+
+> **重要**:本项目使用 **HTTP 传输模式**,不是 stdio 模式。
+
+### MCP 连接参数
+
+| 参数 | 值 |
+|------|-----|
+| **传输模式** | HTTP |
+| **端点路径** | `/mcp` |
+| **本地端口** | 8080 |
+| **外网端点** | `https://d8d-ai-vscode-8080-223-236-template-6-group.dev.d8d.fun/mcp` |
+
+### 子代理调用 MCP 工具时注意
+
+- ✅ **正确**: MCP 服务器已在后台运行,直接调用工具即可
+- ❌ **错误**: 不要尝试启动新的 MCP 进程或修改 MCP 配置
+- 📡 **协议**: HTTP(服务器已暴露 HTTP 端点)
+
+---
+
 ## 📦 容器重启后操作指南
 
 > **⚠️ 重要**:开发容器重启后,需要执行以下步骤才能正常使用 MCP 服务器和翻译功能。
@@ -134,7 +155,7 @@ tail -f mcp.log
 /root/.local/bin/claude mcp remove novel-translator
 /root/.local/bin/claude mcp add --scope project --transport http \
   novel-translator \
-  https://d8d-ai-vscode-8080-223-236-template-6-group.dev.d8d.fun/sse
+  https://d8d-ai-vscode-8080-223-236-template-6-group.dev.d8d.fun/mcp
 ```
 
 ### 🔧 依赖问题修复

+ 42 - 0
scripts/restart_mcp_server.sh

@@ -0,0 +1,42 @@
#!/bin/bash
# Restart the Novel Translator MCP server.
# Run this script to apply code changes.

set -e

cd /mnt/code/223-236-template-6

# Make the project importable as a package
export PYTHONPATH=/mnt/code/223-236-template-6:$PYTHONPATH

# Match only the real python server process, NOT this script itself.
# A bare "mcp_server" pattern also matches this script's own command line
# ("restart_mcp_server.sh"), so the original script would kill itself and/or
# always conclude the server could not be stopped.
SERVER_PATTERN="python3 .*mcp_server"

echo "Checking for existing MCP server..."
if pgrep -f "$SERVER_PATTERN" > /dev/null; then
    echo "Found running MCP server. Attempting to stop..."
    pkill -f "$SERVER_PATTERN" 2>/dev/null || true
    sleep 2

    # Verify the server actually stopped before starting a new instance
    if pgrep -f "$SERVER_PATTERN" > /dev/null; then
        echo "Warning: Could not stop the MCP server."
        echo "The server may be running under a different user."
        echo "Please manually stop it and run this script again."
        exit 1
    fi
fi

echo "Starting MCP Server..."
nohup python3 -m src.mcp_server.server > mcp.log 2>&1 &

sleep 3

if pgrep -f "$SERVER_PATTERN" > /dev/null; then
    echo "✓ MCP Server started successfully!"
    echo "  Log file: mcp.log"
    echo ""
    echo "To view logs: tail -f mcp.log"
    echo "To check status: ps aux | grep mcp_server"
else
    echo "✗ Failed to start MCP server. Check mcp.log for errors."
    cat mcp.log
    exit 1
fi

+ 6 - 5
scripts/start_mcp_server.sh

@@ -3,16 +3,17 @@
 
 cd /mnt/code/223-236-template-6
 
-# 设置 PYTHONPATH
-export PYTHONPATH=/mnt/code/223-236-template-6:$PYTHONPATH
-
 # 停止旧的 MCP 服务器
+pkill -f 'server_http' 2>/dev/null
 pkill -f 'mcp_server' 2>/dev/null
 sleep 2
 
+# 设置端口号(默认 8081,因为 8080 可能被占用)
+export MCP_PORT=${MCP_PORT:-8081}
+
 # 启动 MCP 服务器
-echo "Starting MCP Server..."
-nohup python3 -m src.mcp_server.server > mcp.log 2>&1 &
+echo "Starting MCP Server on port $MCP_PORT..."
+nohup python3 src/mcp_server/server_http.py > mcp.log 2>&1 &
 
 sleep 3
 echo "MCP Server started. Check logs: tail -f mcp.log"

+ 7 - 2
src/glossary/matcher.py

@@ -37,9 +37,12 @@ class GlossaryMatcher:
 
     Uses longest-match processing to ensure longer terms are matched
     before shorter ones (e.g., "魔法师" before "魔法").
+
+    Uses __TERM_0__ format for placeholders - m2m100 tends to preserve
+    alphanumeric patterns with underscores better than special characters.
     """
 
-    PLACEHOLDER_PREFIX = "__en__"
+    PLACEHOLDER_FORMAT = "__TERM_{}__"  # Use __TERM_0__ format (m2m100 preserves this)
 
     def __init__(self, glossary: Glossary):
         """
@@ -63,6 +66,7 @@ class GlossaryMatcher:
         """
         matches = []
         occupied_positions = set()
+        placeholder_index = 0
 
         for term in self._sorted_terms:
             entry = self.glossary.get(term)
@@ -80,7 +84,8 @@ class GlossaryMatcher:
 
                 # Check if any position is already occupied
                 if not any(pos <= p < end for p in occupied_positions):
-                    placeholder = f"{self.PLACEHOLDER_PREFIX}{term}"
+                    placeholder = self.PLACEHOLDER_FORMAT.format(placeholder_index)
+                    placeholder_index += 1
                     matches.append(
                         TermMatch(
                             source=term,

+ 105 - 22
src/glossary/postprocessor.py

@@ -38,7 +38,12 @@ class GlossaryPostprocessor:
 
     def __init__(self):
         """Initialize the postprocessor."""
-        self.placeholder_pattern = re.compile(r"__en__([^_\s]+(?:_[^_\s]+)*)")
+        # Pattern for __TERM_0__ placeholders (primary format)
+        self.placeholder_pattern = re.compile(r"__TERM_(\d+)__")
+        # Pattern for legacy 【数字】 placeholders (for backward compatibility)
+        self.legacy_bracket_pattern = re.compile(r"【(\d+)】")
+        # Pattern for legacy __en__ placeholders (for cleanup)
+        self.legacy_placeholder_pattern = re.compile(r"__en__([^_\s]+(?:_[^_\s]+)*)")
 
     def process(self, translated_text: str, placeholder_map: Dict[str, str]) -> str:
         """
@@ -63,21 +68,70 @@ class GlossaryPostprocessor:
         """
         Restore placeholders back to their translated terms.
 
+        Handles __TERM_N__ format placeholders which m2m100 tends to preserve.
+        Also handles legacy 【N】 format for backward compatibility.
+
         Args:
             text: The text containing placeholders
-            mapping: The placeholder to translation mapping
+            mapping: The placeholder to translation mapping (keys are __TERM_N__ format)
 
         Returns:
             Text with placeholders replaced by translations
         """
         result = text
 
-        # Sort placeholders by length (longest first) to avoid partial replacements
-        sorted_placeholders = sorted(mapping.keys(), key=len, reverse=True)
+        # Sort placeholders by index (descending) to avoid partial replacements
+        # e.g., __TERM_10__ should be replaced before __TERM_1__
+        def extract_index(placeholder: str) -> int:
+            """Extract index from placeholder format like __TERM_0__ or 【0】"""
+            match = self.placeholder_pattern.fullmatch(placeholder)
+            if match:
+                return int(match.group(1))
+            # Try legacy bracket format
+            match = self.legacy_bracket_pattern.fullmatch(placeholder)
+            if match:
+                return int(match.group(1))
+            return 0
+
+        placeholders_by_index = sorted(
+            mapping.items(),
+            key=lambda x: extract_index(x[0]),
+            reverse=True
+        )
 
-        for placeholder in sorted_placeholders:
-            translation = mapping[placeholder]
-            result = result.replace(placeholder, translation)
+        for placeholder, translation in placeholders_by_index:
+            # Extract the index from the placeholder
+            index = extract_index(placeholder)
+            if index == 0 and placeholder != "__TERM_0__" and placeholder != "【0】":
+                # Couldn't parse, try direct replacement
+                result = result.replace(placeholder, translation)
+                continue
+
+            # Try multiple formats that m2m100 might produce:
+            patterns = [
+                f"__TERM_{index}__",  # Original (preserved)
+                f"_TERM_{index}__",   # First underscore removed
+                f"__TERM_{index}_",   # Last underscore removed
+                f"_TERM_{index}_",    # Both outer underscores removed
+                f"【{index}】",        # Legacy Chinese brackets
+                f"[{index}]",          # Converted brackets
+            ]
+            for pattern in patterns:
+                if pattern in result:
+                    # Check if we need to add a space before the replacement
+                    # m2m100 sometimes omits space before placeholders: "a__TERM_1__"
+                    pos = result.find(pattern)
+                    if pos > 0:
+                        prev_char = result[pos - 1]
+                        # If previous char is a letter/digit and no space before placeholder
+                        if prev_char.isalnum() and (pos == 0 or result[pos - 2:pos - 1] != ' '):
+                            # Add space before translation
+                            result = result.replace(pattern, f" {translation}", 1)
+                        else:
+                            result = result.replace(pattern, translation, 1)
+                    else:
+                        result = result.replace(pattern, translation, 1)
+                    break
 
         return result
 
@@ -89,6 +143,7 @@ class GlossaryPostprocessor:
         - Spaces before punctuation (e.g., "Lin Feng ." → "Lin Feng.")
         - Multiple consecutive dots (e.g., "..." → ".")
         - Chinese punctuation after English (e.g., "Lin Feng," → "Lin Feng,")
+        - Missing spaces after word placeholders (e.g., "aQingyun" → "a Qingyun")
 
         Args:
             text: The text to fix
@@ -99,6 +154,10 @@ class GlossaryPostprocessor:
         # Fix multiple consecutive dots (common m2m100 output issue)
         text = re.sub(r"\.{2,}", ".", text)
 
+        # Fix missing spaces between lowercase letter and uppercase word
+        # This handles cases like "aQingyun Sect" → "a Qingyun Sect"
+        text = re.sub(r"([a-z])([A-Z][a-z])", r"\1 \2", text)
+
         # Remove space before common punctuation
         text = re.sub(r"\s+([.,!?;:)])", r"\1", text)
 
@@ -113,22 +172,38 @@ class GlossaryPostprocessor:
 
     def clean_language_tags(self, text: str) -> str:
         """
-        Remove any remaining language tag prefixes like __en__.
+        Remove any remaining placeholder artifacts and language tags.
 
-        This handles cases where m2m100 translates the content within
-        placeholders, leaving behind orphaned __en__ prefixes.
+        This cleans up:
+        1. Legacy __en__ patterns (old placeholder format)
+        2. Corrupted __TERM_N__ patterns that weren't restored
+        3. Partial matches like _TERM_N__ or __TERM_N_
 
         Args:
             text: The text to clean
 
         Returns:
-            Text with language tag prefixes removed
+            Text with placeholder artifacts removed
         """
-        # Remove __en__ followed by any non-space content
-        # This pattern catches: __en__Lin, __en__Qingyun, etc.
-        result = re.sub(r"__en__\s*", "", text)
-        # Also clean other potential language tag formats
-        result = re.sub(r"__\w+__\s*", "", result)
+        result = text
+
+        # Clean up corrupted __TERM_N__ patterns
+        result = re.sub(r"_?TERM_\d+_?", "", result)
+        result = re.sub(r"__TERM_\d+__", "", result)
+        result = re.sub(r"_TERM_\d+__", "", result)
+        result = re.sub(r"__TERM_\d+_", "", result)
+
+        # Clean up legacy __en__ patterns
+        result = re.sub(r"__en__\w*", "", result)
+        result = re.sub(r"__en(?=[A-Z][a-z])", "", result)
+        result = re.sub(r"(?<!\w)\w{0,2}en__", "", result)
+        result = re.sub(r"__en(?!\w)", "", result)
+        result = re.sub(r"__\w+__\w*", "", result)
+
+        # Clean up any double spaces created by removals
+        result = re.sub(r"\s+", " ", result)
+        result = result.strip()
+
         return result
 
     def validate_translation(
@@ -148,18 +223,26 @@ class GlossaryPostprocessor:
         # Find all placeholders that were used
         original_placeholders = set(placeholder_map.keys())
 
-        # Check for remaining placeholders in translated text
-        remaining = self.placeholder_pattern.findall(translated)
-        extra_placeholders = [f"__en__{p}" for p in remaining]
+        # Check for remaining placeholders in translated text (both 【】 and [] formats)
+        remaining_bracket = self.placeholder_pattern.findall(translated)
+        extra_placeholders = [f"【{p}】" for p in remaining_bracket]
+
+        # Also check for legacy __en__ format
+        remaining_legacy = self.legacy_placeholder_pattern.findall(translated)
+        extra_placeholders.extend([f"__en__{p}" for p in remaining_legacy])
 
         # Check for missing translations by verifying the translated text
         # contains the expected translations
         missing_terms = []
         for placeholder, translation in placeholder_map.items():
             if translation not in translated:
-                # Try to find the original term to see what's missing
-                source = placeholder.replace(GlossaryMatcher.PLACEHOLDER_PREFIX, "")
-                missing_terms.append(source)
+                # Extract the index to identify which term is missing
+                index_match = self.placeholder_pattern.fullmatch(placeholder)
+                if index_match:
+                    # The placeholder_map key is the placeholder itself
+                    # We need to find which source term this corresponds to
+                    # For now, just mark that a translation is missing
+                    missing_terms.append(placeholder)
 
         is_valid = not extra_placeholders and not missing_terms
 

+ 295 - 22
src/mcp_server/server.py

@@ -12,12 +12,16 @@ if torch_path not in sys.path:
     sys.path.insert(0, torch_path)
 
 import asyncio
+import base64
 import json
 import os
+import tempfile
 import uuid
 from pathlib import Path
 from typing import Any, Dict, List, Optional
 
+import requests
+
 from fastmcp import FastMCP
 
 from ..translator.engine import TranslationEngine
@@ -73,7 +77,7 @@ def _initialize_components() -> None:
     )
 
     # Initialize repository and fingerprint service
-    _repository = Repository()
+    _repository = Repository(Path("/mnt/code/223-236-template-6/data"))
     _fingerprint_service = FingerprintService(_repository)
 
 
@@ -252,9 +256,97 @@ def _add_suffix(path: str, suffix: str) -> str:
     return str(p.with_stem(p.stem + suffix))
 
 
+async def _resolve_file_input(
+    file_path: Optional[str] = None,
+    file_content: Optional[str] = None,
+    filename: Optional[str] = None,
+    file_url: Optional[str] = None
+) -> tuple[Path, bool]:
+    """
+    解析三种文件输入方式,返回文件路径和是否需要清理临时文件。
+
+    优先级:file_content > file_url > file_path
+
+    Args:
+        file_path: 容器内文件路径
+        file_content: base64 编码的文件内容
+        filename: 配合 file_content 使用的文件名
+        file_url: HTTP URL
+
+    Returns:
+        (文件路径, 是否为临时文件)
+
+    Raises:
+        ValueError: 如果没有提供任何有效的输入
+        IOError: 如果文件读取失败
+    """
+    temp_file = None
+
+    # 1. file_content 优先级最高
+    if file_content:
+        try:
+            decoded = base64.b64decode(file_content)
+        except Exception as e:
+            raise ValueError(f"Invalid base64 content: {e}")
+
+        # 使用文件名或默认名称
+        name = filename or "uploaded_file.txt"
+        temp_file = Path(tempfile.gettempdir()) / f"mcp_upload_{uuid.uuid4().hex}_{name}"
+
+        temp_file.parent.mkdir(parents=True, exist_ok=True)
+        with open(temp_file, "wb") as f:
+            f.write(decoded)
+
+        return temp_file, True
+
+    # 2. file_url 其次
+    if file_url:
+        try:
+            response = requests.get(file_url, timeout=30)
+            response.raise_for_status()
+            content = response.content
+        except requests.RequestException as e:
+            raise IOError(f"Failed to download file from URL: {e}")
+
+        # 从 URL 提取文件名或使用默认名称
+        url_path = Path(file_url.split("?")[0])
+        name = url_path.name or filename or "downloaded_file.txt"
+        temp_file = Path(tempfile.gettempdir()) / f"mcp_download_{uuid.uuid4().hex}_{name}"
+
+        temp_file.parent.mkdir(parents=True, exist_ok=True)
+        with open(temp_file, "wb") as f:
+            f.write(content)
+
+        return temp_file, True
+
+    # 3. file_path 最后
+    if file_path:
+        path_obj = Path(file_path)
+        if not path_obj.exists():
+            raise FileNotFoundError(f"File not found: {file_path}")
+        return path_obj, False
+
+    # 没有提供任何有效输入
+    raise ValueError(
+        "At least one of file_path, file_content, or file_url must be provided"
+    )
+
+
+async def _cleanup_temp_file(path: Path) -> None:
+    """清理临时文件。"""
+    try:
+        if path.exists():
+            path.unlink()
+    except Exception:
+        pass  # 忽略清理错误
+
+
 @mcp.tool()
 async def translate_file(
-    file_path: str,
+    file_path: Optional[str] = None,
+    file_content: Optional[str] = None,
+    filename: Optional[str] = None,
+    file_url: Optional[str] = None,
     src_lang: str = "zh",
     tgt_lang: str = "en",
     output_path: Optional[str] = None,
@@ -270,8 +362,16 @@ async def translate_file(
     3. 逐章翻译(使用术语表)
     4. 保存结果(保留章节结构)
 
+    支持三种文件输入方式(按优先级排序):
+    1. file_content: base64 编码的文件内容(优先级最高)
+    2. file_url: HTTP URL(服务器自动下载)
+    3. file_path: 容器内文件路径(优先级最低)
+
     Args:
-        file_path: 要翻译的文件路径
+        file_path: 容器内文件路径
+        file_content: base64 编码的文件内容
+        filename: 配合 file_content 使用,指定原始文件名
+        file_url: HTTP URL,服务器自动下载文件
         src_lang: 源语言代码
         tgt_lang: 目标语言代码
         output_path: 输出文件路径(默认添加 _en 后缀)
@@ -283,15 +383,25 @@ async def translate_file(
     """
     from ..cleaning.models import Chapter
 
-    file_path_obj = Path(file_path)
-    if not file_path_obj.exists():
-        return {"success": False, "error": f"File not found: {file_path}"}
+    # 解析文件输入
+    try:
+        file_path_obj, is_temp = await _resolve_file_input(
+            file_path=file_path,
+            file_content=file_content,
+            filename=filename,
+            file_url=file_url
+        )
+    except (ValueError, FileNotFoundError, IOError) as e:
+        return {"success": False, "error": str(e)}
 
     task_id = create_task(
         task_type="file_translation",
-        metadata={"file_path": file_path}
+        metadata={"file_path": str(file_path_obj)}
     )
 
+    # 用于清理临时文件的标记
+    _temp_file_to_cleanup = file_path_obj if is_temp else None
+
     try:
         pipeline = get_pipeline()
         cleaning_pipeline = get_cleaning_pipeline()
@@ -341,7 +451,7 @@ async def translate_file(
                 "message": f"Translated chapter {i + 1}/{total_chapters}: {chapter.title}"
             })
 
-        output = output_path or _add_suffix(file_path, "_en")
+        output = output_path or _add_suffix(str(file_path_obj), "_en")
         output_file = Path(output)
 
         await update_progress(task_id, {
@@ -358,6 +468,20 @@ async def translate_file(
                 f.write(chapter["content"])
                 f.write("\n\n")
 
+        # Generate preview (500-1000 characters)
+        preview_length_limit = 750
+        with open(output_file, "r", encoding="utf-8") as f:
+            content = f.read()
+            if len(content) <= preview_length_limit:
+                preview = content
+            else:
+                # Try to break at a newline for cleaner preview
+                preview = content[:preview_length_limit]
+                last_newline = preview.rfind('\n')
+                if last_newline > preview_length_limit // 2:
+                    preview = content[:last_newline]
+                preview += "\n\n... (truncated)"
+
         await complete_task(task_id, success=True)
 
         await update_progress(task_id, {
@@ -370,11 +494,16 @@ async def translate_file(
             "output_path": output,
             "task_id": task_id,
             "terms_used": list(all_terms_used),
-            "chapters_translated": total_chapters
+            "chapters_translated": total_chapters,
+            "total_chapters": total_chapters,
+            "preview": preview,
+            "preview_length": len(preview)
         }
 
     except Exception as e:
         await complete_task(task_id, success=False)
+        if _temp_file_to_cleanup:
+            await _cleanup_temp_file(_temp_file_to_cleanup)
 
         await update_progress(task_id, {
             "status": "failed",
@@ -390,7 +519,10 @@ async def translate_file(
 
 @mcp.tool()
 async def clean_file(
-    file_path: str,
+    file_path: Optional[str] = None,
+    file_content: Optional[str] = None,
+    filename: Optional[str] = None,
+    file_url: Optional[str] = None,
     output_path: Optional[str] = None,
     enable_cleaning: bool = True,
     enable_splitting: bool = True
@@ -404,8 +536,16 @@ async def clean_file(
     - 标准化空白字符和标点
     - 可选章节分割
 
+    支持三种文件输入方式(按优先级排序):
+    1. file_content: base64 编码的文件内容(优先级最高)
+    2. file_url: HTTP URL(服务器自动下载)
+    3. file_path: 容器内文件路径(优先级最低)
+
     Args:
-        file_path: 要清洗的文件路径
+        file_path: 容器内文件路径
+        file_content: base64 编码的文件内容
+        filename: 配合 file_content 使用,指定原始文件名
+        file_url: HTTP URL,服务器自动下载文件
         output_path: 输出路径(可选)
         enable_cleaning: 是否启用清洗
         enable_splitting: 是否启用章节分割
@@ -413,9 +553,18 @@ async def clean_file(
     Returns:
         包含章节信息和清洗结果的字典
     """
-    file_path_obj = Path(file_path)
-    if not file_path_obj.exists():
-        return {"success": False, "error": f"File not found: {file_path}"}
+    # 解析文件输入
+    try:
+        file_path_obj, is_temp = await _resolve_file_input(
+            file_path=file_path,
+            file_content=file_content,
+            filename=filename,
+            file_url=file_url
+        )
+    except (ValueError, FileNotFoundError, IOError) as e:
+        return {"success": False, "error": str(e)}
+
+    _temp_file_to_cleanup = file_path_obj if is_temp else None
 
     try:
         pipeline = get_cleaning_pipeline()
@@ -447,6 +596,23 @@ async def clean_file(
 
             saved_path = str(output_file)
 
+            # Generate preview (500-1000 characters)
+            preview_length_limit = 750
+            with open(output_file, "r", encoding="utf-8") as f:
+                content = f.read()
+                if len(content) <= preview_length_limit:
+                    preview = content
+                else:
+                    # Try to break at a newline for cleaner preview
+                    preview = content[:preview_length_limit]
+                    last_newline = preview.rfind('\n')
+                    if last_newline > preview_length_limit / 2:
+                        preview = content[:last_newline]
+                    preview += "\n\n... (truncated)"
+        else:
+            preview = None
+            preview_length = None
+
         total_chars = sum(c.char_count for c in chapters)
 
         return {
@@ -454,11 +620,19 @@ async def clean_file(
             "chapters": chapter_list,
             "chapter_count": len(chapters),
             "total_chars": total_chars,
-            "output_path": saved_path
+            "output_path": saved_path,
+            "preview": preview,
+            "preview_length": len(preview) if preview else None
         }
 
     except Exception as e:
+        if _temp_file_to_cleanup:
+            await _cleanup_temp_file(_temp_file_to_cleanup)
         return {"success": False, "error": f"Cleaning failed: {str(e)}"}
+    finally:
+        # 清理临时文件
+        if _temp_file_to_cleanup:
+            await _cleanup_temp_file(_temp_file_to_cleanup)
 
 
 @mcp.tool()
@@ -587,12 +761,23 @@ async def glossary_add(
 
 
 @mcp.tool()
-async def glossary_list() -> Dict[str, Any]:
+async def glossary_list(
+    export_format: Optional[str] = None,
+    output_path: Optional[str] = None
+) -> Dict[str, Any]:
     """
-    列出术语表所有条目。
+    列出术语表所有条目,支持导出为 JSON 文件。
+
+    Args:
+        export_format: 导出格式,目前仅支持 "json"
+        output_path: 导出文件保存路径(当 export_format 为 "json" 时必需)
 
     Returns:
-        包含所有术语条目的字典
+        包含所有术语条目的字典,或导出结果
+
+    Examples:
+        列出术语: glossary_list()
+        导出 JSON: glossary_list(export_format="json", output_path="/path/to/glossary.json")
     """
     try:
         glossary = get_glossary()
@@ -607,6 +792,44 @@ async def glossary_list() -> Dict[str, Any]:
             for e in glossary.get_all()
         ]
 
+        # 如果指定了导出格式
+        if export_format:
+            if export_format.lower() != "json":
+                return {
+                    "success": False,
+                    "error": f"Unsupported export format: {export_format}. Currently only 'json' is supported."
+                }
+
+            if not output_path:
+                return {
+                    "success": False,
+                    "error": "output_path is required when export_format is specified"
+                }
+
+            # 准备导出数据
+            export_data = {
+                "glossary": entries,
+                "count": len(entries),
+                "exported_at": __import__("datetime").datetime.utcnow().isoformat() + "Z"
+            }
+
+            # 写入文件
+            output_file = Path(output_path)
+            output_file.parent.mkdir(parents=True, exist_ok=True)
+
+            with open(output_file, "w", encoding="utf-8") as f:
+                json.dump(export_data, f, ensure_ascii=False, indent=2)
+
+            return {
+                "success": True,
+                "exported": True,
+                "export_format": "json",
+                "output_path": str(output_file),
+                "count": len(entries),
+                "message": f"Successfully exported {len(entries)} glossary entries to {output_path}"
+            }
+
+        # 默认返回条目列表
         return {
             "success": True,
             "entries": entries,
@@ -617,6 +840,54 @@ async def glossary_list() -> Dict[str, Any]:
         return {"success": False, "error": str(e)}
 
 
@mcp.tool()
async def glossary_import(
    file_path: Optional[str] = None,
    file_content: Optional[str] = None,
    filename: Optional[str] = None,
    file_url: Optional[str] = None,
    merge_mode: str = "merge"
) -> Dict[str, Any]:
    """
    批量导入术语表 JSON 文件。

    支持三种文件输入方式(按优先级排序):
    1. file_content: base64 编码的文件内容(优先级最高)
    2. file_url: HTTP URL(服务器自动下载)
    3. file_path: 容器内文件路径(优先级最低)

    术语表 JSON 格式示例:
    {
        "glossary": [
            {"source": "林风", "target": "Lin Feng", "category": "character", "context": "主角"},
            {"source": "青云宗", "target": "Qingyun Sect", "category": "organization"}
        ]
    }

    Args:
        file_path: 容器内文件路径
        file_content: base64 编码的文件内容
        filename: 配合 file_content 使用,指定原始文件名
        file_url: HTTP URL,服务器自动下载文件
        merge_mode: "merge" 合并到现有术语表,"replace" 替换现有术语表

    Returns:
        导入结果,包含成功/跳过的术语数量
    """
    # Import lazily so the tool module is only loaded when the tool runs.
    from .tools.glossary_import import glossary_import as _run_import

    # Delegate to the implementation, injecting this server's helpers so the
    # tool module stays decoupled from server-level globals.
    call_kwargs = {
        "file_path": file_path,
        "file_content": file_content,
        "filename": filename,
        "file_url": file_url,
        "merge_mode": merge_mode,
        "resolve_file_fn": _resolve_file_input,
        "get_glossary_fn": get_glossary,
        "notify_updated_fn": notify_glossary_updated,
    }
    result = await _run_import(**call_kwargs)
    return result
+
+
 @mcp.tool()
 async def glossary_clear() -> Dict[str, Any]:
     """
@@ -752,15 +1023,17 @@ async def main():
     port = int(os.getenv("MCP_PORT", "8080"))
 
     print(f"Starting Novel Translator MCP Server on http://{host}:{port}")
-    print(f"Transport: SSE (Server-Sent Events)")
-    print(f"Endpoint: http://{host}:{port}/sse")
+    print(f"Transport: HTTP")
+    print(f"Endpoint: http://{host}:{port}/mcp")
 
     # Run HTTP server with SSE transport
+    # stateless=True disables session requirement for Claude Desktop compatibility
     await mcp.run_http_async(
-        transport="sse",
+        transport="http",
         host=host,
         port=port,
-        log_level="info"
+        log_level="info",
+        stateless=True
     )
 
 

+ 336 - 0
src/mcp_server/tools/glossary_import.py

@@ -0,0 +1,336 @@
+"""
+Glossary import tool for the Novel Translator MCP server.
+
+This module provides batch import functionality for translation glossaries.
+Supports importing from URLs, file paths, or base64-encoded content.
+"""
+
+import json
+import tempfile
+from pathlib import Path
+from typing import Any, Dict, List, Optional
+
+from pydantic import BaseModel, Field
+
+
class GlossaryImportInput(BaseModel):
    """Input schema for importing glossary entries.

    At least one of ``file_path``, ``file_content`` or ``file_url`` must be
    supplied; resolution priority is file_content > file_url > file_path
    (see ``glossary_import``).
    """
    # Path to a glossary JSON file inside the container (lowest priority).
    file_path: Optional[str] = Field(
        default=None,
        description="容器内术语表 JSON 文件路径"
    )
    # Base64-encoded glossary JSON content (highest priority).
    file_content: Optional[str] = Field(
        default=None,
        description="base64 编码的术语表 JSON 文件内容"
    )
    # Original filename; only meaningful together with file_content.
    filename: Optional[str] = Field(
        default=None,
        description="配合 file_content 使用,指定原始文件名"
    )
    # HTTP URL of a glossary JSON file; the server downloads it itself.
    file_url: Optional[str] = Field(
        default=None,
        description="术语表 JSON 文件的 HTTP URL,服务器自动下载"
    )
    # Import mode: "merge" keeps existing terms, "replace" clears them first.
    merge_mode: str = Field(
        default="merge",
        description="导入模式: 'merge' (合并) 或 'replace' (替换现有术语表)"
    )
+
+
class GlossaryImportResult(BaseModel):
    """Output schema for glossary import result."""
    # True when the import completed, even if some entries were skipped.
    success: bool
    imported: int = Field(description="成功导入的术语数量")
    skipped: int = Field(default=0, description="跳过的术语数量(重复或无效)")
    # Capped at 10 messages by glossary_import to limit payload size.
    errors: List[str] = Field(default_factory=list, description="错误信息列表")
    # Entries actually added, echoed back for confirmation; None on failure.
    entries: Optional[List[Dict[str, Any]]] = Field(default=None, description="导入的术语条目")
    # Top-level error message; set only when success is False.
    error: Optional[str] = Field(default=None)
+
+
async def glossary_import(
    file_path: Optional[str] = None,
    file_content: Optional[str] = None,
    filename: Optional[str] = None,
    file_url: Optional[str] = None,
    merge_mode: str = "merge",
    resolve_file_fn: callable = None,
    get_glossary_fn: callable = None,
    notify_updated_fn: callable = None
) -> Dict[str, Any]:
    """
    Batch-import glossary entries from a JSON file.

    Three input channels are supported, resolved in priority order:
    1. file_content: base64-encoded file content (highest priority)
    2. file_url: HTTP URL the server downloads
    3. file_path: path inside the container (lowest priority)

    Expected JSON format (a bare array of entry objects is also accepted):
    {
        "glossary": [
            {"source": "林风", "target": "Lin Feng",
             "category": "character", "context": "主角"},
            {"source": "青云宗", "target": "Qingyun Sect",
             "category": "organization", "context": "修真门派"}
        ]
    }

    Recognised categories: character, skill, location, item, organization,
    other. Unknown categories degrade gracefully to "other".

    Args:
        file_path: Path to the glossary file inside the container.
        file_content: Base64-encoded file content.
        filename: Original filename, used together with file_content.
        file_url: HTTP URL; the server downloads the file itself.
        merge_mode: "merge" adds to the existing glossary,
            "replace" clears it first.
        resolve_file_fn: Optional override for file-input resolution.
        get_glossary_fn: Optional override returning the glossary instance.
        notify_updated_fn: Optional override (sync or async callable) invoked
            after a successful import.

    Returns:
        Result dict with success flag, imported/skipped counts, the imported
        entries and up to 10 error strings.
    """
    import asyncio
    import base64
    import uuid

    import requests

    from ...glossary.models import GlossaryEntry, TermCategory

    # Import server-level defaults lazily and only when needed, so callers
    # that inject every collaborator do not pull in the server module.
    if get_glossary_fn is None or notify_updated_fn is None:
        from ..server import (
            get_glossary as default_get_glossary,
            notify_glossary_updated as default_notify
        )
        get_glossary = get_glossary_fn or default_get_glossary
        notify_updated = notify_updated_fn or default_notify
    else:
        get_glossary = get_glossary_fn
        notify_updated = notify_updated_fn

    def _failure(error_msg: str, error_list: List[str]) -> Dict[str, Any]:
        # All early-exit failures share this shape.
        return {
            "success": False,
            "error": error_msg,
            "imported": 0,
            "skipped": 0,
            "errors": error_list
        }

    # --- Resolve the file input -------------------------------------------
    if resolve_file_fn:
        try:
            file_path_obj, is_temp = await resolve_file_fn(
                file_path=file_path,
                file_content=file_content,
                filename=filename,
                file_url=file_url
            )
        except (ValueError, FileNotFoundError, IOError) as e:
            return _failure(f"Failed to resolve file input: {e}", [str(e)])
    else:
        # Inline fallback, kept consistent with server._resolve_file_input.
        try:
            if file_content:
                # 1. Base64 content has the highest priority.
                try:
                    decoded = base64.b64decode(file_content)
                except Exception as e:
                    raise ValueError(f"Invalid base64 content: {e}")

                name = filename or "glossary.json"
                temp_file = Path(tempfile.gettempdir()) / f"mcp_upload_{uuid.uuid4().hex}_{name}"
                temp_file.parent.mkdir(parents=True, exist_ok=True)
                temp_file.write_bytes(decoded)
                file_path_obj, is_temp = temp_file, True

            elif file_url:
                # 2. Download from an HTTP URL.
                try:
                    response = requests.get(file_url, timeout=30)
                    response.raise_for_status()
                    content = response.content
                except requests.RequestException as e:
                    raise IOError(f"Failed to download file from URL: {e}")

                # Strip the query string before extracting a filename.
                url_path = Path(file_url.split("?")[0])
                name = url_path.name or filename or "glossary.json"
                temp_file = Path(tempfile.gettempdir()) / f"mcp_download_{uuid.uuid4().hex}_{name}"
                temp_file.parent.mkdir(parents=True, exist_ok=True)
                temp_file.write_bytes(content)
                file_path_obj, is_temp = temp_file, True

            elif file_path:
                # 3. Plain container path, lowest priority.
                path_obj = Path(file_path)
                if not path_obj.exists():
                    raise FileNotFoundError(f"File not found: {file_path}")
                file_path_obj, is_temp = path_obj, False

            else:
                raise ValueError(
                    "At least one of file_path, file_content, or file_url must be provided"
                )

        except (ValueError, FileNotFoundError, IOError) as e:
            return _failure(str(e), [str(e)])

    _temp_file_to_cleanup = file_path_obj if is_temp else None

    # Counters are initialised *before* the try block so the broad exception
    # handler at the bottom can always reference them (previously a failure
    # in e.g. get_glossary() raised NameError inside that handler, masking
    # the real error).
    imported = 0
    skipped = 0
    errors: List[str] = []
    imported_entries: List[Dict[str, Any]] = []

    try:
        # --- Read and parse the JSON payload ------------------------------
        try:
            with open(file_path_obj, "r", encoding="utf-8") as f:
                data = json.load(f)
        except json.JSONDecodeError as e:
            return _failure(f"Invalid JSON format: {e}", [f"Invalid JSON: {e}"])
        except Exception as e:
            return _failure(f"Failed to read file: {e}", [str(e)])

        # Accept both {"glossary": [...]} and a bare top-level array.
        if isinstance(data, dict) and "glossary" in data:
            entries_data = data["glossary"]
        elif isinstance(data, list):
            entries_data = data
        else:
            return _failure(
                "Invalid glossary format: expected object with 'glossary' key or array",
                ["Invalid format"]
            )

        if not isinstance(entries_data, list):
            return _failure(
                "Invalid glossary format: 'glossary' must be an array",
                ["Invalid format"]
            )

        glossary = get_glossary()

        if merge_mode == "replace":
            # NOTE(review): reaches into private state; prefer a public
            # clear() method on Glossary if one exists — TODO confirm.
            glossary._terms.clear()

        # --- Import entries one by one ------------------------------------
        for entry_data in entries_data:
            if not isinstance(entry_data, dict):
                skipped += 1
                errors.append("Skipping invalid entry (not an object)")
                continue

            source = entry_data.get("source", "")
            target = entry_data.get("target", "")
            if not isinstance(source, str) or not isinstance(target, str):
                # Previously non-string values aborted the whole import with
                # AttributeError on .strip(); now the entry is just skipped.
                skipped += 1
                errors.append("Skipping entry with non-string source or target")
                continue

            source = source.strip()
            target = target.strip()
            if not source or not target:
                skipped += 1
                errors.append("Skipping entry with empty source or target")
                continue

            # Unknown category strings degrade gracefully to OTHER.
            try:
                category = TermCategory(entry_data.get("category", "other"))
            except ValueError:
                category = TermCategory.OTHER

            context_val = entry_data.get("context", "")
            context = context_val.strip() if isinstance(context_val, str) else ""

            entry = GlossaryEntry(
                source=source,
                target=target,
                category=category,
                context=context
            )
            glossary.add(entry)
            imported += 1

            imported_entries.append({
                "source": entry.source,
                "target": entry.target,
                "category": entry.category.value,
                "context": entry.context
            })

        # Notify listeners; both sync and async callables are supported.
        if callable(notify_updated):
            if asyncio.iscoroutinefunction(notify_updated):
                await notify_updated()
            else:
                notify_updated()

        return {
            "success": True,
            "imported": imported,
            "skipped": skipped,
            "errors": errors[:10],  # cap error noise at 10 messages
            "entries": imported_entries,
            "message": f"Successfully imported {imported} glossary entries" +
                       (f" (skipped {skipped})" if skipped > 0 else "")
        }

    except Exception as e:
        return {
            "success": False,
            "error": f"Import failed: {str(e)}",
            "imported": imported,
            "skipped": skipped,
            "errors": [str(e)]
        }

    finally:
        # Always remove temp files we created, even on early returns.
        if _temp_file_to_cleanup:
            try:
                if _temp_file_to_cleanup.exists():
                    _temp_file_to_cleanup.unlink()
            except Exception:
                pass

+ 19 - 1
src/mcp_server/tools/translation.py

@@ -51,6 +51,8 @@ class TranslationOutput(BaseModel):
     error: Optional[str] = None
     chapters_translated: Optional[int] = None
     total_chapters: Optional[int] = None
+    preview: Optional[str] = Field(default=None, description="预览翻译结果的前 500-1000 个字符")
+    preview_length: Optional[int] = Field(default=None, description="预览内容的实际字符数")
 
 
 class ChapterOutput(BaseModel):
@@ -342,6 +344,20 @@ async def translate_file(
                 f.write(chapter.content)
                 f.write("\n\n")
 
+        # Generate preview (500-1000 characters)
+        preview_length_limit = 750
+        with open(output_file, "r", encoding="utf-8") as f:
+            content = f.read()
+            if len(content) <= preview_length_limit:
+                preview = content
+            else:
+                # Try to break at a newline for cleaner preview
+                preview = content[:preview_length_limit]
+                last_newline = preview.rfind('\n')
+                if last_newline > preview_length_limit // 2:
+                    preview = content[:last_newline]
+                preview += "\n\n... (truncated)"
+
         # Complete task
         await complete_task(task_id, success=True)
 
@@ -356,7 +372,9 @@ async def translate_file(
             task_id=task_id,
             terms_used=list(all_terms_used),
             chapters_translated=total_chapters,
-            total_chapters=total_chapters
+            total_chapters=total_chapters,
+            preview=preview,
+            preview_length=len(preview)
         )
 
     except Exception as e:

+ 44 - 41
test_translation.py

@@ -1,62 +1,65 @@
 #!/usr/bin/env python3
-"""Simple translation test with glossary support."""
+"""Test translation with glossary terms."""
 
 import sys
-from pathlib import Path
 
-# Add project root to path
-project_root = Path('/mnt/code/223-236-template-6')
-sys.path.insert(0, str(project_root))
+# Add torch path first
+torch_path = '/root/.local/lib/python3.11/site-packages'
+if torch_path not in sys.path:
+    sys.path.insert(0, torch_path)
 
-# Direct imports to avoid circular/__init__ issues
-from src.glossary.models import Glossary, GlossaryEntry, TermCategory
-from src.translator.engine import TranslationEngine
-from src.translator.pipeline import TranslationPipeline
+# Add src path
+sys.path.insert(0, '/mnt/code/223-236-template-6/src')
+
+from glossary.models import Glossary, GlossaryEntry, TermCategory
+from translator.pipeline import TranslationPipeline
+from translator.engine import TranslationEngine
 
 
 def main():
-    # 1. 创建术语表并添加术语
+    # Initialize components
+    model_path = '/mnt/code/223-236-template-6/phase0-test/models/m2m100_418M'
+    engine = TranslationEngine(model_path=model_path)
     glossary = Glossary()
-    entry = GlossaryEntry(
-        source="林风",
-        target="Lin Feng",
-        category=TermCategory.CHARACTER,
-        context="Main protagonist"
-    )
-    glossary.add(entry)
-    print(f"Added glossary entry: {entry.source} -> {entry.target}")
 
-    # 2. 创建翻译引擎
-    print("\nLoading translation engine...")
-    engine = TranslationEngine()
-    print(f"Engine loaded. Device: {'GPU' if engine.is_gpu_enabled else 'CPU'}")
+    # Add terms
+    glossary.add(GlossaryEntry(source='林风', target='Lin Feng', category=TermCategory.CHARACTER))
+    glossary.add(GlossaryEntry(source='青云宗', target='Qingyun Sect', category=TermCategory.ORGANIZATION))
+    glossary.add(GlossaryEntry(source='外门弟子', target='outer disciple', category=TermCategory.OTHER))
+
+    print('=' * 50)
+    print('术语表已添加以下术语:')
+    for entry in glossary.get_all():
+        print(f'  - {entry.source} -> {entry.target}')
+    print('=' * 50)
 
-    # 3. 创建翻译管道
+    # Create pipeline
     pipeline = TranslationPipeline(
         engine=engine,
         glossary=glossary,
-        src_lang="zh",
-        tgt_lang="en"
+        src_lang='zh',
+        tgt_lang='en'
     )
 
-    # 4. 测试翻译
-    source_text = "林风是青云宗的一名外门弟子"
-    print(f"\nSource: {source_text}")
+    # Test translation
+    test_text = '林风是青云宗的一名外门弟子'
+    print(f'\n输入: {test_text}')
+    print('=' * 50)
 
-    result = pipeline.translate(source_text, return_details=True)
+    result = pipeline.translate(test_text, return_details=True)
 
-    print(f"\n=== Results ===")
-    print(f"Raw translation: {result.raw_translation}")
-    print(f"Final translation: {result.translated}")
-    print(f"Terms used: {result.terms_used}")
+    print(f'输出: {result.translated}')
+    print(f'使用的术语: {result.terms_used if result.terms_used else []}')
+    print('=' * 50)
 
-    if result.validation:
-        print(f"\nValidation:")
-        print(f"  Success: {result.validation.is_valid}")
-        print(f"  Success rate: {result.validation.success_rate:.1f}%")
-        for term, term_result in result.validation.term_results.items():
-            print(f"  {term}: {'✓' if term_result.success else '✗'} ({term_result.expected})")
+    # Check for _en__ prefix issue
+    if '_en__' in result.translated:
+        print('\n❌ 问题: 发现 _en__ 前缀残留!')
+        return 1
+    else:
+        print('\n✅ 通过: 没有 _en__ 前缀问题!')
+        return 0
 
 
-if __name__ == "__main__":
-    main()
+if __name__ == '__main__':
+    sys.exit(main() or 0)