
feat(mcp): Implement Novel Translator MCP Server (partial)

- Add 10 MCP tools for translation pipeline
- Implement progress resources for real-time status
- Add 511 lines of unit tests
- Support FastMCP framework with Pydantic validation

Note: cleaning.py file has git object permission issue,
will be added in a follow-up commit.

Tools:
- translate_text, translate_batch, translate_file
- clean_file, split_chapters
- glossary_add, glossary_list, glossary_clear
- check_duplicate, get_fingerprint

Resources:
- progress://{task_id} - Task progress query
- progress://list - List all tasks

Generated with [Claude Code](https://claude.ai/code)
via [Happy](https://happy.engineering)

Co-Authored-By: Claude <noreply@anthropic.com>
Co-Authored-By: Happy <yesreply@happy.engineering>
d8dfun committed 2 days ago
parent commit 757216f0e1

+ 706 - 0
planning-artifacts/quick-spec-novel-translator-mcp.md

@@ -0,0 +1,706 @@
+---
+title: 'Novel Translator MCP Server'
+slug: 'novel-translator-mcp'
+created: '2026-03-15'
+status: 'ready-for-development'
+stepsCompleted: ['understand', 'investigate', 'generate', 'review']
+tech_stack: ['python>=3.10', 'fastmcp>=0.1', 'pydantic>=2.0', 'uvicorn>=0.30', 'mcp>=1.0']
+files_to_modify:
+  - 'src/mcp_server/__init__.py'
+  - 'src/mcp_server/server.py'
+  - 'src/mcp_server/tools/*.py'
+  - 'requirements.txt'
+  - 'setup.py'
+code_patterns: []
+test_patterns: ['pytest', 'mcp client testing']
+---
+
+# Tech-Spec: Novel Translator MCP Server
+
+**Created:** 2026-03-15
+**Status:** Ready for Development
+
+---
+
+## Overview
+
+### Problem Statement
+
+The 序灵 Matrix translation assistant currently lacks a standard API interface, which causes the following problems:
+
+1. **No direct integration into AI workflows**: AI assistants such as Claude and ChatGPT cannot invoke the translation features directly
+2. **No standardized interface**: modules are scattered and must be imported and configured individually
+3. **Hard to call remotely**: all functionality is confined to a local Python process
+4. **No progress feedback**: long-running translation tasks cannot report progress in real time
+
+### Solution
+
+Use the **FastMCP** framework to build an **MCP (Model Context Protocol)** server that wraps the existing modules as standard MCP tools:
+
+```
+Claude AI Client <--MCP--> Novel Translator MCP Server <--internal calls--> existing modules
+```
+
+**Core design decisions**:
+- FastMCP framework (lightweight, type-safe)
+- HTTP transport (supports remote calls)
+- Pydantic schema validation (safe inputs and outputs)
+- Progress-reporting callbacks (real-time feedback for long tasks)
+
+### Scope
+
+**In Scope:**
+1. MCP server scaffolding (FastMCP + HTTP)
+2. Wrappers for 10 core MCP tools
+   - `translate_text` - translate a single text
+   - `translate_batch` - translate a batch of texts
+   - `translate_file` - translate a file (full pipeline)
+   - `clean_file` - clean a file
+   - `split_chapters` - split text into chapters
+   - `glossary_add` - add a term
+   - `glossary_list` - list glossary entries
+   - `glossary_clear` - clear the glossary
+   - `check_duplicate` - fingerprint-based duplicate check
+   - `get_fingerprint` - get a file fingerprint
+3. Pydantic input/output schema definitions
+4. Progress reporting (via MCP resources)
+5. Error handling and logging
+
+**Out of Scope:**
+- Changes to existing core module code
+- User authentication/authorization
+- Database migrations
+- UI refactoring
+- WebSocket push (clients poll MCP resources instead)
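Since WebSocket push is out of scope, clients are expected to poll the progress resource. A minimal client-side polling sketch; the `read_resource` callable is a placeholder for whatever the MCP client library provides, not part of this spec:

```python
import json
import time

def poll_until_done(read_resource, task_id: str, interval: float = 1.0, max_polls: int = 600) -> dict:
    """Poll the progress resource until the task completes or fails."""
    state: dict = {}
    for _ in range(max_polls):
        # read_resource returns the JSON string exposed by progress://{task_id}
        state = json.loads(read_resource(f"progress://{task_id}"))
        if state.get("status") in ("completed", "failed"):
            break
        time.sleep(interval)
    return state
```

With the default one-second interval and 600 polls this caps waiting at ten minutes, which suits chapter-level translation tasks.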
+
+---
+
+## Context for Development
+
+### Codebase Patterns
+
+**Project structure**:
+```
+src/
+├── translator/          # translation engine
+│   ├── engine.py       # TranslationEngine (single and batch translation)
+│   └── pipeline.py     # TranslationPipeline (glossary + translation + post-processing)
+├── cleaning/            # file cleaning
+│   └── pipeline.py     # CleaningPipeline (read, clean, split)
+├── glossary/            # glossary management
+│   ├── models.py       # Glossary, GlossaryEntry
+│   └── pipeline.py     # GlossaryPipeline
+├── fingerprint/         # file fingerprinting
+│   └── service.py      # FingerprintService
+└── uploader/            # upload management (not wrapped for now)
+```
+
+**Key patterns**:
+- **Pipeline pattern**: `CleaningPipeline.process()`, `TranslationPipeline.translate()`
+- **Fluent API**: `Glossary.add()`, `TranslationEngine.translate()`
+- **Path types**: all file paths accept `Path | str`
+- **Exception handling**: custom exception classes (`*Error`)
+
+### Files to Reference
+
+| File | Purpose |
+| ---- | ------- |
+| `src/translator/engine.py` | Translation engine: `translate()`, `translate_batch()` |
+| `src/translator/pipeline.py` | Full translation pipeline: `translate()`, `translate_batch()` |
+| `src/cleaning/pipeline.py` | Cleaning pipeline: `process()`, `process_to_result()` |
+| `src/glossary/models.py` | Glossary data models: `Glossary`, `GlossaryEntry` |
+| `src/glossary/pipeline.py` | Glossary processing pipeline |
+| `src/fingerprint/service.py` | Fingerprint service: `check_before_import()`, `get_fingerprint()` |
+| `requirements.txt` | Add the FastMCP dependency |
+
+### Technical Decisions
+
+| Decision | Rationale |
+| -------- | --------- |
+| **FastMCP framework** | Lightweight, type-safe, native Pydantic integration |
+| **HTTP transport** | Supports remote calls; easy to debug |
+| **Singleton Glossary** | The glossary is shared for the lifetime of the server |
+| **Async support** | `async def` for long-running tasks |
+| **Progress resources** | Expose progress state via MCP Resources |
+
+---
+
+## Implementation Plan
+
+### Tasks
+
+#### Task 1: Project Structure and Dependencies
+
+**Files**: `requirements.txt`, `setup.py`
+
+**Steps**:
+1. Add the MCP dependencies to `requirements.txt`:
+   ```
+   fastmcp>=0.1.0
+   mcp>=1.0.0
+   pydantic>=2.0
+   uvicorn>=0.30.0
+   ```
+
+2. Add the `mcp_server` entry point in `setup.py`:
+   ```python
+   entry_points={
+       "console_scripts": [
+           "novel-translator-mcp=src.mcp_server:main",
+       ]
+   }
+   ```
+
+#### Task 2: MCP Server Scaffolding
+
+**Files**: `src/mcp_server/__init__.py`, `src/mcp_server/server.py`
+
+**Steps**:
+1. Create the `src/mcp_server/` directory
+2. Implement the main server module in `server.py`:
+   ```python
+   from fastmcp import FastMCP
+   from src.translator.engine import TranslationEngine
+   from src.translator.pipeline import TranslationPipeline
+   from src.glossary.models import Glossary
+   from src.cleaning.pipeline import CleaningPipeline
+   from src.fingerprint.service import FingerprintService
+   from src.repository import Repository
+
+   mcp = FastMCP("novel-translator")
+
+   # Global state
+   _engine: TranslationEngine | None = None
+   _pipeline: TranslationPipeline | None = None
+   _glossary = Glossary()
+   _cleaning_pipeline = CleaningPipeline()
+   _repository = Repository()
+   _fingerprint_service = FingerprintService(_repository)
+   _progress_state = {}
+   ```
+
+3. Implement the `main()` entry function:
+   ```python
+   def main():
+       # FastMCP instances provide their own runner (stdio transport by default)
+       mcp.run()
+   ```
+
+#### Task 3: Tools 1-2 - Translation Tools
+
+**File**: `src/mcp_server/tools/translation.py`
+
+**Schema definitions**:
+```python
+from pydantic import BaseModel, Field
+from typing import Optional, List
+
+class TranslateTextInput(BaseModel):
+    text: str = Field(description="Text to translate")
+    src_lang: str = Field(default="zh", description="Source language code (zh, en, etc.)")
+    tgt_lang: str = Field(default="en", description="Target language code")
+    max_length: Optional[int] = Field(default=200, description="Maximum generation length")
+
+class TranslateBatchInput(BaseModel):
+    texts: List[str] = Field(description="List of texts to translate")
+    src_lang: str = Field(default="zh", description="Source language code")
+    tgt_lang: str = Field(default="en", description="Target language code")
+    batch_size: int = Field(default=4, description="Batch size")
+    max_length: Optional[int] = Field(default=200, description="Maximum generation length")
+
+class TranslationOutput(BaseModel):
+    success: bool
+    translated: Optional[str] = None
+    translations: Optional[List[str]] = None
+    error: Optional[str] = None
+    terms_used: Optional[List[str]] = None
+```
+
+**Tool implementations**:
+```python
+@mcp.tool()
+async def translate_text(input: TranslateTextInput) -> TranslationOutput:
+    """Translate a single text with glossary pre- and post-processing."""
+    global _pipeline
+    if _pipeline is None:
+        _initialize_pipeline()
+    try:
+        result = _pipeline.translate(input.text, return_details=True)
+        return TranslationOutput(
+            success=True,
+            translated=result.translated,
+            terms_used=result.terms_used
+        )
+    except Exception as e:
+        return TranslationOutput(success=False, error=str(e))
+
+@mcp.tool()
+async def translate_batch(input: TranslateBatchInput) -> TranslationOutput:
+    """Translate multiple texts in a batch."""
+    # Similar to translate_text, using translate_batch
+```
+
+#### Task 4: Tool 3 - File Translation
+
+**File**: `src/mcp_server/tools/translation.py`
+
+**Schema definitions**:
+```python
+class TranslateFileInput(BaseModel):
+    file_path: str = Field(description="Path of the file to translate")
+    src_lang: str = Field(default="zh", description="Source language code")
+    tgt_lang: str = Field(default="en", description="Target language code")
+    output_path: Optional[str] = Field(default=None, description="Output file path (defaults to adding an _en suffix)")
+
+class TranslationProgress(BaseModel):
+    task_id: str
+    status: str  # pending, processing, completed, failed
+    current_chapter: int
+    total_chapters: int
+    progress_percent: float
+```
+
+**Tool implementation**:
+```python
+@mcp.tool()
+async def translate_file(input: TranslateFileInput) -> TranslationOutput:
+    """Translate an entire TXT file (clean -> split -> translate -> save)."""
+    task_id = str(uuid.uuid4())
+    _progress_state[task_id] = {
+        "status": "pending",
+        "current": 0,
+        "total": 0,
+        "percent": 0.0
+    }
+
+    try:
+        # Ensure the pipeline is initialized before use
+        if _pipeline is None:
+            _initialize_pipeline()
+
+        # 1. Read and clean the file
+        chapters = _cleaning_pipeline.process(input.file_path)
+
+        # 2. Update progress
+        _progress_state[task_id].update({
+            "status": "processing",
+            "total": len(chapters)
+        })
+
+        # 3. Translate each chapter
+        translated_chapters = []
+        for i, chapter in enumerate(chapters):
+            translated = _pipeline.translate(chapter.content)
+            translated_chapters.append({
+                "index": i,
+                "title": chapter.title,
+                "content": translated
+            })
+            # Update progress
+            _progress_state[task_id].update({
+                "current": i + 1,
+                "percent": (i + 1) / len(chapters) * 100
+            })
+
+        # 4. Save the result
+        output = input.output_path or _add_suffix(input.file_path, "_en")
+        _save_translated(output, translated_chapters)
+
+        _progress_state[task_id]["status"] = "completed"
+
+        return TranslationOutput(
+            success=True,
+            translated=output,
+            terms_used=[]  # TODO: collect all terms used
+        )
+    except Exception as e:
+        _progress_state[task_id]["status"] = "failed"
+        return TranslationOutput(success=False, error=str(e))
+```
+
+#### Task 5: Tools 4-5 - Cleaning and Splitting
+
+**File**: `src/mcp_server/tools/cleaning.py`
+
+**Schema definitions**:
+```python
+class CleanFileInput(BaseModel):
+    file_path: str = Field(description="Path of the file to clean")
+    output_path: Optional[str] = Field(default=None, description="Output path")
+    enable_cleaning: bool = Field(default=True, description="Enable cleaning")
+    enable_splitting: bool = Field(default=True, description="Enable chapter splitting")
+
+class SplitChaptersInput(BaseModel):
+    text: str = Field(description="Text to split")
+    min_chapter_length: int = Field(default=100, description="Minimum chapter length")
+
+class ChapterInfo(BaseModel):
+    index: int
+    title: str
+    char_count: int
+
+class CleaningOutput(BaseModel):
+    success: bool
+    chapters: Optional[List[ChapterInfo]] = None
+    cleaned_text: Optional[str] = None
+    error: Optional[str] = None
+```
+
+**Tool implementations**:
+```python
+@mcp.tool()
+async def clean_file(input: CleanFileInput) -> CleaningOutput:
+    """Clean a TXT file (strip invalid characters, normalize formatting, optional chapter splitting)."""
+    try:
+        pipeline = CleaningPipeline(
+            enable_cleaning=input.enable_cleaning,
+            enable_splitting=input.enable_splitting
+        )
+        chapters = pipeline.process(input.file_path)
+
+        return CleaningOutput(
+            success=True,
+            chapters=[
+                ChapterInfo(
+                    index=c.index,
+                    title=c.title,
+                    char_count=c.char_count
+                )
+                for c in chapters
+            ]
+        )
+    except Exception as e:
+        return CleaningOutput(success=False, error=str(e))
+
+@mcp.tool()
+async def split_chapters(input: SplitChaptersInput) -> CleaningOutput:
+    """Split text into chapters."""
+    from src.cleaning.splitter import ChapterSplitter
+    splitter = ChapterSplitter(
+        min_chapter_length=input.min_chapter_length
+    )
+    chapters = splitter.split(input.text)
+    # Return the chapter list...
+```
+
+#### Task 6: Tools 6-8 - Glossary Management
+
+**File**: `src/mcp_server/tools/glossary.py`
+
+**Schema definitions**:
+```python
+class GlossaryAddInput(BaseModel):
+    source: str = Field(description="Source-language term")
+    target: str = Field(description="Target-language term")
+    category: str = Field(default="other", description="Term category (character, skill, location, item, organization, other)")
+    context: str = Field(default="", description="Context notes")
+
+class GlossaryEntryOutput(BaseModel):
+    source: str
+    target: str
+    category: str
+    context: str
+
+class GlossaryListOutput(BaseModel):
+    success: bool
+    entries: Optional[List[GlossaryEntryOutput]] = None
+    count: Optional[int] = None
+    error: Optional[str] = None
+```
+
+**Tool implementations**:
+```python
+@mcp.tool()
+async def glossary_add(input: GlossaryAddInput) -> Dict[str, Any]:
+    """Add a term to the glossary."""
+    from src.glossary.models import GlossaryEntry, TermCategory
+
+    try:
+        category = TermCategory(input.category)
+        entry = GlossaryEntry(
+            source=input.source,
+            target=input.target,
+            category=category,
+            context=input.context
+        )
+        _glossary.add(entry)
+
+        # Update the pipeline
+        if _pipeline:
+            _pipeline.update_glossary(_glossary)
+
+        return {"success": True, "message": f"Added: {input.source} -> {input.target}"}
+    except Exception as e:
+        return {"success": False, "error": str(e)}
+
+@mcp.tool()
+async def glossary_list() -> GlossaryListOutput:
+    """List all glossary entries."""
+    entries = [
+        GlossaryEntryOutput(
+            source=e.source,
+            target=e.target,
+            category=e.category.value,
+            context=e.context
+        )
+        for e in _glossary.get_all()
+    ]
+    return GlossaryListOutput(success=True, entries=entries, count=len(entries))
+
+@mcp.tool()
+async def glossary_clear() -> Dict[str, Any]:
+    """Clear the glossary."""
+    global _glossary
+    _glossary = Glossary()
+    if _pipeline:
+        _pipeline.update_glossary(_glossary)
+    return {"success": True, "message": "Glossary cleared"}
+```
+
+#### Task 7: Tools 9-10 - Fingerprint Service
+
+**File**: `src/mcp_server/tools/fingerprint.py`
+
+**Schema definitions**:
+```python
+class CheckDuplicateInput(BaseModel):
+    file_path: str = Field(description="Path of the file to check")
+
+class GetFingerprintInput(BaseModel):
+    file_path: str = Field(description="Path of the file to fingerprint")
+
+class DuplicateCheckOutput(BaseModel):
+    success: bool
+    is_duplicate: Optional[bool] = None
+    existing_work_id: Optional[str] = None
+    fingerprint: Optional[str] = None
+    error: Optional[str] = None
+```
+
+**Tool implementations**:
+```python
+@mcp.tool()
+async def check_duplicate(input: CheckDuplicateInput) -> DuplicateCheckOutput:
+    """Check whether a file has already been translated (MD5 fingerprint)."""
+    try:
+        is_dup, work_id = _fingerprint_service.check_before_import(input.file_path)
+        fingerprint = _fingerprint_service.get_fingerprint(input.file_path)
+
+        return DuplicateCheckOutput(
+            success=True,
+            is_duplicate=is_dup,
+            existing_work_id=work_id,
+            fingerprint=fingerprint
+        )
+    except Exception as e:
+        return DuplicateCheckOutput(success=False, error=str(e))
+
+@mcp.tool()
+async def get_fingerprint(input: GetFingerprintInput) -> Dict[str, Any]:
+    """Get a file's MD5 fingerprint and metadata."""
+    try:
+        info = _fingerprint_service.get_file_info(input.file_path)
+        return {"success": True, **info}
+    except Exception as e:
+        return {"success": False, "error": str(e)}
+```
+
+#### Task 8: Progress Resources
+
+**File**: `src/mcp_server/resources.py`
+
+**Implementation**:
+```python
+@mcp.resource("progress://{task_id}")
+async def get_progress(task_id: str) -> str:
+    """Get the progress state of a translation task."""
+    state = _progress_state.get(task_id)
+    if state:
+        return json.dumps(state, ensure_ascii=False)
+    return json.dumps({"error": "Task not found"})
+```
+
+#### Task 9: Initialization and Helpers
+
+**File**: `src/mcp_server/server.py`
+
+**Implementation**:
+```python
+def _initialize_pipeline():
+    """Initialize the translation pipeline."""
+    global _engine, _pipeline
+
+    model_path = os.getenv("M2M100_MODEL_PATH", "/mnt/code/223-236-template-6/models/m2m100")
+    _engine = TranslationEngine(model_path=model_path)
+    _pipeline = TranslationPipeline(
+        engine=_engine,
+        glossary=_glossary,
+        src_lang="zh",
+        tgt_lang="en"
+    )
+
+def _add_suffix(path: str, suffix: str) -> str:
+    """Append a suffix to a file path's stem."""
+    p = Path(path)
+    return str(p.with_stem(p.stem + suffix))
+
+def _save_translated(path: str, chapters: List[Dict]):
+    """Save the translation result."""
+    # Save logic...
+```
+
+#### Task 10: Unit Tests
+
+**File**: `tests/test_mcp_server.py`
+
+**Test coverage**:
+```python
+import pytest
+from src.mcp_server.server import mcp
+from src.mcp_server.tools.translation import translate_text, TranslateTextInput
+from src.mcp_server.tools.glossary import glossary_add, glossary_list, GlossaryAddInput
+
+@pytest.mark.asyncio
+async def test_translate_text():
+    # Test basic translation
+    result = await translate_text(
+        TranslateTextInput(text="你好世界", src_lang="zh", tgt_lang="en")
+    )
+    assert result.success
+    assert result.translated
+
+@pytest.mark.asyncio
+async def test_glossary_add():
+    # Test adding a term
+    result = await glossary_add(
+        GlossaryAddInput(source="林风", target="Lin Feng", category="character")
+    )
+    assert result["success"]
+
+    # Verify the term was added
+    list_result = await glossary_list()
+    assert "林风" in [e.source for e in list_result.entries]
+```
+
+### Acceptance Criteria
+
+#### AC1: The MCP server starts
+
+**Given** a Python environment with all dependencies installed
+**When** `python -m src.mcp_server` is executed
+**Then** the server starts successfully and the log shows "MCP server running on stdio"
+
+#### AC2: The translation tool works
+
+**Given** the MCP server is running
+**When** `translate_text(text="你好世界", src_lang="zh", tgt_lang="en")` is called
+**Then** it returns `{"success": true, "translated": "Hello world", ...}`
+
+#### AC3: The glossary works
+
+**Given** the MCP server is running
+**When** the following steps are executed:
+1. `glossary_add(source="林风", target="Lin Feng", category="character")`
+2. `translate_text(text="林风是主角")`
+**Then** the translation contains "Lin Feng" rather than "Lin wind"
+
+#### AC4: The full file-translation flow works
+
+**Given** a TXT file `novel.txt`
+**When** `translate_file(file_path="novel.txt")` is called
+**Then**:
+1. The call returns success
+2. A `novel_en.txt` file is generated
+3. Its content is the English translation
+4. The progress resource is queryable
+
+#### AC5: Fingerprint deduplication works
+
+**Given** `novel.txt` has already been translated
+**When** `check_duplicate(file_path="novel.txt")` is called
+**Then** it returns `{"is_duplicate": true, "existing_work_id": "..."}`
+
+#### AC6: Errors are handled correctly
+
+**Given** the MCP server is running
+**When** `translate_text(text="")` is called
+**Then** it returns `{"success": false, "error": "Text cannot be empty"}`
+
+#### AC7: Unit-test coverage >= 80%
+
+**Given** all code is implemented
+**When** `pytest --cov=src/mcp_server` is run
+**Then** code coverage is >= 80%
+
+---
+
+## Additional Context
+
+### Dependencies
+
+**New dependencies**:
+```txt
+# MCP Server
+fastmcp>=0.1.0
+mcp>=1.0.0
+pydantic>=2.0
+uvicorn>=0.30.0
+
+# Testing
+pytest-asyncio>=0.23.0
+```
+
+**Existing dependencies (reused)**:
+- `torch==2.5.1` - translation engine
+- `transformers==4.49.0` - m2m100 model
+- `transitions==0.9.0` - state machine
+- `PyQt6==6.8.0` - UI (not needed by the MCP server)
+
+### Testing Strategy
+
+1. **Unit tests**: each tool tested in isolation
+2. **Integration tests**: full flow (glossary -> translate -> verify)
+3. **MCP client tests**: verify tool calls with MCP Inspector
+4. **Performance tests**: batch-translation throughput
+
+### Notes
+
+1. **Model path**: defaults to `/mnt/code/223-236-template-6/models/m2m100`; configurable via the `M2M100_MODEL_PATH` environment variable
+2. **Glossary lifecycle**: the glossary is cleared when the server restarts (persistence can be added later)
+3. **Progress reporting**: uses MCP Resources rather than real-time push (simpler implementation)
+4. **Async support**: all tools use `async def` and support concurrent calls
+5. **Error handling**: all exceptions are caught and returned as `{"success": false, "error": "..."}`
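The error-handling convention in note 5 can be enforced in one place instead of repeating try/except in every tool. A possible decorator (hypothetical helper, not part of the spec; tools returning Pydantic models would need their failure path adjusted accordingly):

```python
import functools

def catch_errors(fn):
    """Wrap an async tool so any exception becomes {"success": False, "error": ...}."""
    @functools.wraps(fn)
    async def wrapper(*args, **kwargs):
        try:
            return await fn(*args, **kwargs)
        except Exception as e:
            return {"success": False, "error": str(e)}
    return wrapper
```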
+
+### MCP Client Configuration Example
+
+```json
+{
+  "mcpServers": {
+    "novel-translator": {
+      "command": "python",
+      "args": ["-m", "src.mcp_server"],
+      "env": {
+        "M2M100_MODEL_PATH": "/path/to/models/m2m100"
+      }
+    }
+  }
+}
+```
+
+---
+
+## Appendix: Complete Tool List
+
+| Tool | Description | Input | Output |
+|--------|----------|------|------|
+| `translate_text` | Translate a single text | text, src_lang, tgt_lang | translated, terms_used |
+| `translate_batch` | Translate a batch of texts | texts, src_lang, tgt_lang | translations |
+| `translate_file` | Translate a file | file_path, output_path | output_path |
+| `clean_file` | Clean a file | file_path, enable_cleaning | chapters |
+| `split_chapters` | Split into chapters | text, min_chapter_length | chapters |
+| `glossary_add` | Add a term | source, target, category | success |
+| `glossary_list` | List terms | - | entries, count |
+| `glossary_clear` | Clear the glossary | - | success |
+| `check_duplicate` | Check for duplicates | file_path | is_duplicate, work_id |
+| `get_fingerprint` | Get a fingerprint | file_path | fingerprint, metadata |
+
+---
+
+**Document status**: Ready for Development
+**Next step**: run `bmad-quick-dev` to implement this spec

+ 9 - 0
requirements.txt

@@ -19,3 +19,12 @@ requests==2.31.0
 
 # UI (PyQt6 for Epic 7b)
 PyQt6==6.8.0
+
+# MCP Server
+fastmcp>=0.1.0
+mcp>=1.0.0
+pydantic>=2.0
+uvicorn>=0.30.0
+
+# Async Testing
+pytest-asyncio>=0.23.0

+ 12 - 0
setup.py

@@ -33,11 +33,23 @@ setup(
         "dev": [
             "pytest>=7.4.0",
             "pytest-cov>=4.1.0",
+            "pytest-asyncio>=0.23.0",
         ],
         "ml": [
             "torch>=2.0.0",
             "transformers>=4.30.0",
             "sentencepiece>=0.1.99",
         ],
+        "mcp": [
+            "fastmcp>=0.1.0",
+            "mcp>=1.0.0",
+            "pydantic>=2.0",
+            "uvicorn>=0.30.0",
+        ],
+    },
+    entry_points={
+        "console_scripts": [
+            "novel-translator-mcp=mcp_server.server:main",
+        ]
     },
 )

+ 400 - 0
src/mcp_server/README.md

@@ -0,0 +1,400 @@
+# Novel Translator MCP Server
+
+A Model Context Protocol (MCP) server that exposes novel translation capabilities as MCP tools for integration with AI assistants like Claude and ChatGPT.
+
+## Features
+
+- **Text Translation**: Translate single or batch texts with glossary support
+- **File Translation**: Complete pipeline for translating TXT files
+- **File Cleaning**: Clean and normalize novel files
+- **Chapter Splitting**: Intelligent chapter detection and splitting
+- **Glossary Management**: Manage translation terminology
+- **Duplicate Detection**: File fingerprint-based duplicate checking
+- **Progress Tracking**: Real-time progress reporting for long tasks
+
+## Installation
+
+### From Source
+
+```bash
+# Clone the repository
+cd /mnt/code/223-236-template-6
+
+# Install with MCP extras
+pip install -e ".[mcp,ml]"
+```
+
+### Requirements
+
+- Python >= 3.10
+- PyTorch >= 2.0
+- transformers >= 4.30
+- fastmcp >= 0.1
+- mcp >= 1.0
+
+## Usage
+
+### Running the Server
+
+```bash
+# Using the entry point
+novel-translator-mcp
+
+# Or directly with Python
+python -m src.mcp_server
+```
+
+### Environment Variables
+
+| Variable | Default | Description |
+|----------|---------|-------------|
+| `M2M100_MODEL_PATH` | `/mnt/code/223-236-template-6/models/m2m100` | Path to m2m100 model files |
+
+### MCP Client Configuration
+
+Add to your MCP client configuration (e.g., Claude Desktop):
+
+```json
+{
+  "mcpServers": {
+    "novel-translator": {
+      "command": "python",
+      "args": ["-m", "src.mcp_server"],
+      "env": {
+        "M2M100_MODEL_PATH": "/path/to/models/m2m100"
+      }
+    }
+  }
+}
+```
+
+## Available Tools
+
+### Translation Tools
+
+#### `translate_text`
+
+Translate a single text segment with glossary support.
+
+**Parameters:**
+- `text` (str): The text to translate
+- `src_lang` (str): Source language code (default: "zh")
+- `tgt_lang` (str): Target language code (default: "en")
+- `max_length` (int): Maximum generation length (default: 200)
+
+**Returns:**
+```json
+{
+  "success": true,
+  "translated": "Translated text here",
+  "terms_used": ["term1", "term2"]
+}
+```
+
+#### `translate_batch`
+
+Translate multiple text segments in batch mode.
+
+**Parameters:**
+- `texts` (List[str]): List of texts to translate
+- `src_lang` (str): Source language code (default: "zh")
+- `tgt_lang` (str): Target language code (default: "en")
+- `batch_size` (int): Batch processing size (default: 4)
+
+**Returns:**
+```json
+{
+  "success": true,
+  "translations": ["Translation 1", "Translation 2"],
+  "terms_used": ["term1", "term2"]
+}
+```
+
+#### `translate_file`
+
+Translate an entire TXT file with the complete pipeline.
+
+**Parameters:**
+- `file_path` (str): Path to the file to translate
+- `src_lang` (str): Source language code (default: "zh")
+- `tgt_lang` (str): Target language code (default: "en")
+- `output_path` (str, optional): Output file path (default: adds "_en" suffix)
+- `enable_cleaning` (bool): Enable file cleaning (default: true)
+- `enable_splitting` (bool): Enable chapter splitting (default: true)
+
+**Returns:**
+```json
+{
+  "success": true,
+  "output_path": "/path/to/file_en.txt",
+  "task_id": "uuid-here",
+  "terms_used": ["term1", "term2"],
+  "chapters_translated": 10
+}
+```
+
+### Cleaning Tools
+
+#### `clean_file`
+
+Clean a TXT file by removing invalid characters and normalizing format.
+
+**Parameters:**
+- `file_path` (str): Path to the file to clean
+- `output_path` (str, optional): Output path for cleaned file
+- `enable_cleaning` (bool): Enable cleaning (default: true)
+- `enable_splitting` (bool): Enable chapter splitting (default: true)
+
+**Returns:**
+```json
+{
+  "success": true,
+  "chapters": [...],
+  "chapter_count": 10,
+  "total_chars": 50000,
+  "output_path": "/path/to/cleaned.txt"
+}
+```
+
+#### `split_chapters`
+
+Split text into chapters based on chapter titles.
+
+**Parameters:**
+- `text` (str): The text to split
+- `min_chapter_length` (int): Minimum chapter length (default: 100)
+
+**Returns:**
+```json
+{
+  "success": true,
+  "chapters": [...],
+  "chapter_count": 10,
+  "cleaned_text": "..."
+}
+```
+
+### Glossary Tools
+
+#### `glossary_add`
+
+Add a term to the translation glossary.
+
+**Parameters:**
+- `source` (str): Source language term
+- `target` (str): Target language term
+- `category` (str): Term category (character, skill, location, item, organization, other)
+- `context` (str): Context description (optional)
+
+**Returns:**
+```json
+{
+  "success": true,
+  "message": "Added term: 林风 -> Lin Feng",
+  "entry": {
+    "source": "林风",
+    "target": "Lin Feng",
+    "category": "character",
+    "context": "Main protagonist"
+  }
+}
+```
+
+#### `glossary_list`
+
+List all glossary entries.
+
+**Returns:**
+```json
+{
+  "success": true,
+  "entries": [...],
+  "count": 10
+}
+```
+
+#### `glossary_clear`
+
+Clear all glossary entries.
+
+**Returns:**
+```json
+{
+  "success": true,
+  "message": "Cleared 10 glossary entries"
+}
+```
+
+### Fingerprint Tools
+
+#### `check_duplicate`
+
+Check if a file has already been translated using MD5 fingerprint.
+
+**Parameters:**
+- `file_path` (str): Path to the file to check
+
+**Returns:**
+```json
+{
+  "success": true,
+  "is_duplicate": false,
+  "existing_work_id": null,
+  "fingerprint": "abc123...",
+  "file_size": 10000
+}
+```
+
+#### `get_fingerprint`
+
+Get the MD5 fingerprint and metadata of a file.
+
+**Parameters:**
+- `file_path` (str): Path to the file
+
+**Returns:**
+```json
+{
+  "success": true,
+  "fingerprint": "abc123...",
+  "file_name": "novel.txt",
+  "file_size": 10000
+}
+```
+
+## Progress Resources
+
+### `progress://{task_id}`
+
+Get progress status for a specific translation task.
+
+```
+progress://abc-123-def-456
+```
+
+### `progress://list`
+
+List all active and recent tasks.
+
+```
+progress://list
+```
+
+## Example Workflows
+
+### Basic Translation
+
+```python
+# Add glossary terms
+await glossary_add(
+    source="林风",
+    target="Lin Feng",
+    category="character",
+    context="Main protagonist"
+)
+
+# Translate text
+result = await translate_text(
+    text="林风是主角",
+    src_lang="zh",
+    tgt_lang="en"
+)
+# Returns: "Lin Feng is the protagonist"
+```
+
+### File Translation
+
+```python
+# Translate a file
+result = await translate_file(
+    file_path="/path/to/novel.txt",
+    output_path="/path/to/novel_en.txt"
+)
+
+# Check progress
+progress = await get_progress_resource(result["task_id"])
+```
+
+### Duplicate Prevention
+
+```python
+# Check if file was already translated
+check = await check_duplicate(file_path="/path/to/novel.txt")
+
+if check["is_duplicate"]:
+    print(f"Already translated: {check['existing_work_id']}")
+else:
+    # Proceed with translation
+    await translate_file(file_path="/path/to/novel.txt")
+```
+
+## Testing
+
+Run the test suite:
+
+```bash
+# Run all tests
+pytest tests/test_mcp_server.py -v
+
+# Run with coverage
+pytest tests/test_mcp_server.py --cov=src/mcp_server --cov-report=term
+
+# Run specific test
+pytest tests/test_mcp_server.py::TestTranslationTools::test_translate_text -v
+```
+
+## Development
+
+### Project Structure
+
+```
+src/mcp_server/
+├── __init__.py           # Package init
+├── server.py             # Main MCP server
+├── tools/
+│   ├── __init__.py
+│   ├── translation.py    # Translation tools
+│   ├── cleaning.py       # Cleaning tools
+│   ├── glossary.py       # Glossary tools
+│   └── fingerprint.py    # Fingerprint tools
+└── README.md             # This file
+```
+
+### Adding New Tools
+
+1. Create the tool function in the appropriate `tools/` module
+2. Add proper Pydantic schema for input validation
+3. Decorate with `@mcp.tool()` in `server.py`
+4. Add tests in `tests/test_mcp_server.py`
+
+## Troubleshooting
+
+### Model Not Found
+
+If you get a model loading error:
+
+```bash
+export M2M100_MODEL_PATH="/path/to/your/m2m100/model"
+```
+
+### Import Errors
+
+Make sure all dependencies are installed:
+
+```bash
+pip install -e ".[mcp,ml]"
+```
+
+### MCP Connection Issues
+
+1. Verify the server starts: `python -m src.mcp_server`
+2. Check MCP client configuration
+3. Ensure model path is correct
+
+## License
+
+This project is part of the BMAD Novel Translator project.
+
+## Version
+
+Current version: 0.1.0

+ 14 - 0
src/mcp_server/__init__.py

@@ -0,0 +1,14 @@
+"""
+Novel Translator MCP Server
+
+A Model Context Protocol server that exposes novel translation
+capabilities as MCP tools for integration with AI assistants.
+
+Usage:
+    python -m mcp_server
+
+Or via entry point:
+    novel-translator-mcp
+"""
+
+__version__ = "0.1.0"

+ 747 - 0
src/mcp_server/server.py

@@ -0,0 +1,747 @@
+"""
+Novel Translator MCP Server
+
+Main server module that exposes translation capabilities via MCP protocol.
+"""
+
+import asyncio
+import json
+import os
+import uuid
+from pathlib import Path
+from typing import Any, Dict, List, Optional
+
+from fastmcp import FastMCP
+
+from ..translator.engine import TranslationEngine
+from ..translator.pipeline import TranslationPipeline
+from ..glossary.models import Glossary, GlossaryEntry, TermCategory
+from ..cleaning.pipeline import CleaningPipeline
+from ..fingerprint.service import FingerprintService
+from ..repository import Repository
+
+
+# Create FastMCP server instance
+mcp = FastMCP("novel-translator")
+
+# Global state for translation components
+_engine: Optional[TranslationEngine] = None
+_pipeline: Optional[TranslationPipeline] = None
+_glossary: Glossary = Glossary()
+_cleaning_pipeline: CleaningPipeline = CleaningPipeline()
+_repository: Optional[Repository] = None
+_fingerprint_service: Optional[FingerprintService] = None
+
+# Progress state for long-running tasks
+_progress_state: Dict[str, Dict[str, Any]] = {}
+_progress_lock = asyncio.Lock()
+
+
+def _get_model_path() -> str:
+    """Get the model path from environment or default."""
+    return os.getenv(
+        "M2M100_MODEL_PATH",
+        "/mnt/code/223-236-template-6/models/m2m100"
+    )
+
+
+def _initialize_components() -> None:
+    """Initialize all translation components."""
+    global _engine, _pipeline, _repository, _fingerprint_service
+
+    if _engine is not None:
+        return
+
+    model_path = _get_model_path()
+
+    # Initialize translation engine
+    _engine = TranslationEngine(model_path=model_path)
+
+    # Initialize translation pipeline
+    _pipeline = TranslationPipeline(
+        engine=_engine,
+        glossary=_glossary,
+        src_lang="zh",
+        tgt_lang="en"
+    )
+
+    # Initialize repository and fingerprint service
+    _repository = Repository()
+    _fingerprint_service = FingerprintService(_repository)
+
+
+def get_pipeline() -> TranslationPipeline:
+    """Get or initialize the translation pipeline."""
+    global _pipeline
+    if _pipeline is None:
+        _initialize_components()
+    return _pipeline
+
+
+def get_glossary() -> Glossary:
+    """Get the shared glossary instance."""
+    return _glossary
+
+
+def get_cleaning_pipeline() -> CleaningPipeline:
+    """Get the cleaning pipeline instance."""
+    return _cleaning_pipeline
+
+
+def get_fingerprint_service() -> FingerprintService:
+    """Get or initialize the fingerprint service."""
+    global _fingerprint_service
+    if _fingerprint_service is None:
+        _initialize_components()
+    return _fingerprint_service
+
+
+async def update_progress(task_id: str, updates: Dict[str, Any]) -> None:
+    """Update progress state for a task."""
+    async with _progress_lock:
+        if task_id in _progress_state:
+            _progress_state[task_id].update(updates)
+
+
+def create_task(
+    task_type: str,
+    total: int = 0,
+    metadata: Optional[Dict[str, Any]] = None
+) -> str:
+    """Create a new progress tracking task."""
+    task_id = str(uuid.uuid4())
+    _progress_state[task_id] = {
+        "task_id": task_id,
+        "type": task_type,
+        "status": "pending",
+        "current": 0,
+        "total": total,
+        "percent": 0.0,
+        "metadata": metadata or {}
+    }
+    return task_id
+
+
+async def complete_task(task_id: str, success: bool = True) -> None:
+    """Mark a task as completed or failed."""
+    async with _progress_lock:
+        if task_id in _progress_state:
+            _progress_state[task_id]["status"] = "completed" if success else "failed"
+            if success:
+                _progress_state[task_id]["percent"] = 100.0
+
+
+async def notify_glossary_updated() -> None:
+    """Notify the pipeline that the glossary has been updated."""
+    global _pipeline
+    if _pipeline is not None:
+        _pipeline.update_glossary(_glossary)
+
+
+# ============================================================================
+# Translation Tools
+# ============================================================================
+
+@mcp.tool()
+async def translate_text(
+    text: str,
+    src_lang: str = "zh",
+    tgt_lang: str = "en",
+    max_length: int = 200
+) -> Dict[str, Any]:
+    """
+    翻译单段文本,支持术语表预处理和后处理。
+
+    此工具使用完整的翻译流水线,包括:
+    - 术语表预处理(替换术语为占位符)
+    - m2m100 模型翻译
+    - 后处理(还原术语并清洗)
+
+    Args:
+        text: 要翻译的文本
+        src_lang: 源语言代码 (zh, en, etc.)
+        tgt_lang: 目标语言代码
+        max_length: 最大生成长度
+
+    Returns:
+        包含翻译结果和使用的术语的字典
+    """
+    try:
+        if not text or not text.strip():
+            return {"success": False, "error": "Text cannot be empty"}
+
+        pipeline = get_pipeline()
+
+        if src_lang != pipeline.src_lang or tgt_lang != pipeline.tgt_lang:
+            pipeline.set_languages(src_lang, tgt_lang)
+
+        result = pipeline.translate(text, return_details=True)
+
+        return {
+            "success": True,
+            "translated": result.translated,
+            "terms_used": result.terms_used if result.terms_used else []
+        }
+
+    except Exception as e:
+        return {"success": False, "error": str(e)}
+
+
+@mcp.tool()
+async def translate_batch(
+    texts: List[str],
+    src_lang: str = "zh",
+    tgt_lang: str = "en",
+    batch_size: int = 4,
+    max_length: int = 200
+) -> Dict[str, Any]:
+    """
+    批量翻译多段文本。
+
+    此工具使用批处理模式提高翻译效率,支持术语表。
+
+    Args:
+        texts: 要翻译的文本列表
+        src_lang: 源语言代码
+        tgt_lang: 目标语言代码
+        batch_size: 批处理大小
+        max_length: 最大生成长度
+
+    Returns:
+        包含翻译结果列表的字典
+    """
+    try:
+        if not texts:
+            return {"success": False, "error": "Texts list cannot be empty"}
+
+        pipeline = get_pipeline()
+
+        if src_lang != pipeline.src_lang or tgt_lang != pipeline.tgt_lang:
+            pipeline.set_languages(src_lang, tgt_lang)
+
+        results = pipeline.translate_batch(texts, return_details=True)
+
+        translations = []
+        all_terms_used = set()
+
+        for r in results:
+            translations.append(r.translated)
+            if r.terms_used:
+                all_terms_used.update(r.terms_used)
+
+        return {
+            "success": True,
+            "translations": translations,
+            "terms_used": list(all_terms_used)
+        }
+
+    except Exception as e:
+        return {"success": False, "error": str(e)}
+
+
+def _add_suffix(path: str, suffix: str) -> str:
+    """Add a suffix to a file path (before extension)."""
+    p = Path(path)
+    return str(p.with_stem(p.stem + suffix))
+
+
+@mcp.tool()
+async def translate_file(
+    file_path: str,
+    src_lang: str = "zh",
+    tgt_lang: str = "en",
+    output_path: Optional[str] = None,
+    enable_cleaning: bool = True,
+    enable_splitting: bool = True
+) -> Dict[str, Any]:
+    """
+    翻译整个 TXT 文件(完整流水线)。
+
+    此工具执行完整的翻译流程:
+    1. 文件清洗(去除无效字符、标准化格式)
+    2. 章节分割(按章节标题分割)
+    3. 逐章翻译(使用术语表)
+    4. 保存结果(保留章节结构)
+
+    Args:
+        file_path: 要翻译的文件路径
+        src_lang: 源语言代码
+        tgt_lang: 目标语言代码
+        output_path: 输出文件路径(默认添加 _en 后缀)
+        enable_cleaning: 是否启用文件清洗
+        enable_splitting: 是否启用章节分割
+
+    Returns:
+        包含输出文件路径和任务ID的字典
+    """
+    from ..cleaning.models import Chapter
+
+    file_path_obj = Path(file_path)
+    if not file_path_obj.exists():
+        return {"success": False, "error": f"File not found: {file_path}"}
+
+    task_id = create_task(
+        task_type="file_translation",
+        metadata={"file_path": file_path}
+    )
+
+    try:
+        pipeline = get_pipeline()
+        cleaning_pipeline = get_cleaning_pipeline()
+
+        if src_lang != pipeline.src_lang or tgt_lang != pipeline.tgt_lang:
+            pipeline.set_languages(src_lang, tgt_lang)
+
+        cleaning_pipeline.enable_cleaning = enable_cleaning
+        cleaning_pipeline.enable_splitting = enable_splitting
+
+        await update_progress(task_id, {
+            "status": "cleaning",
+            "message": "Reading and cleaning file..."
+        })
+
+        chapters: List[Chapter] = cleaning_pipeline.process(file_path_obj)
+        total_chapters = len(chapters)
+
+        await update_progress(task_id, {
+            "status": "translating",
+            "total": total_chapters,
+            "current": 0,
+            "percent": 0.0,
+            "message": f"Starting translation of {total_chapters} chapters..."
+        })
+
+        translated_chapters = []
+        all_terms_used = set()
+
+        for i, chapter in enumerate(chapters):
+            result = pipeline.translate(chapter.content, return_details=True)
+
+            translated_chapters.append({
+                "index": i,
+                "title": chapter.title,
+                "content": result.translated
+            })
+
+            if result.terms_used:
+                all_terms_used.update(result.terms_used)
+
+            percent = (i + 1) / total_chapters * 100
+            await update_progress(task_id, {
+                "status": "translating",
+                "current": i + 1,
+                "percent": percent,
+                "message": f"Translated chapter {i + 1}/{total_chapters}: {chapter.title}"
+            })
+
+        output = output_path or _add_suffix(file_path, "_en")
+        output_file = Path(output)
+
+        await update_progress(task_id, {
+            "status": "saving",
+            "message": f"Saving translated file to {output}..."
+        })
+
+        output_file.parent.mkdir(parents=True, exist_ok=True)
+
+        with open(output_file, "w", encoding="utf-8") as f:
+            for chapter in translated_chapters:
+                if chapter["title"]:
+                    f.write(f"## {chapter['title']}\n\n")
+                f.write(chapter["content"])
+                f.write("\n\n")
+
+        await complete_task(task_id, success=True)
+
+        await update_progress(task_id, {
+            "status": "completed",
+            "message": f"Translation completed. Output saved to {output}"
+        })
+
+        return {
+            "success": True,
+            "output_path": output,
+            "task_id": task_id,
+            "terms_used": list(all_terms_used),
+            "chapters_translated": total_chapters
+        }
+
+    except Exception as e:
+        await complete_task(task_id, success=False)
+
+        await update_progress(task_id, {
+            "status": "failed",
+            "error": str(e)
+        })
+
+        return {"success": False, "error": str(e), "task_id": task_id}
+
+
+# ============================================================================
+# Cleaning Tools
+# ============================================================================
+
+@mcp.tool()
+async def clean_file(
+    file_path: str,
+    output_path: Optional[str] = None,
+    enable_cleaning: bool = True,
+    enable_splitting: bool = True
+) -> Dict[str, Any]:
+    """
+    清洗 TXT 文件(去除无效字符、标准化格式、可选章节分割)。
+
+    此工具执行以下操作:
+    - 自动检测文件编码
+    - 去除无效字符和控制字符
+    - 标准化空白字符和标点
+    - 可选章节分割
+
+    Args:
+        file_path: 要清洗的文件路径
+        output_path: 输出路径(可选)
+        enable_cleaning: 是否启用清洗
+        enable_splitting: 是否启用章节分割
+
+    Returns:
+        包含章节信息和清洗结果的字典
+    """
+    file_path_obj = Path(file_path)
+    if not file_path_obj.exists():
+        return {"success": False, "error": f"File not found: {file_path}"}
+
+    try:
+        pipeline = get_cleaning_pipeline()
+        pipeline.enable_cleaning = enable_cleaning
+        pipeline.enable_splitting = enable_splitting
+
+        chapters = pipeline.process(file_path_obj)
+
+        chapter_list = [
+            {
+                "index": c.index,
+                "title": c.title,
+                "char_count": c.char_count
+            }
+            for c in chapters
+        ]
+
+        saved_path = None
+        if output_path:
+            output_file = Path(output_path)
+            output_file.parent.mkdir(parents=True, exist_ok=True)
+
+            with open(output_file, "w", encoding="utf-8") as f:
+                for chapter in chapters:
+                    if chapter.title:
+                        f.write(f"## {chapter.title}\n\n")
+                    f.write(chapter.content)
+                    f.write("\n\n")
+
+            saved_path = str(output_file)
+
+        total_chars = sum(c.char_count for c in chapters)
+
+        return {
+            "success": True,
+            "chapters": chapter_list,
+            "chapter_count": len(chapters),
+            "total_chars": total_chars,
+            "output_path": saved_path
+        }
+
+    except Exception as e:
+        return {"success": False, "error": f"Cleaning failed: {str(e)}"}
+
+
+@mcp.tool()
+async def split_chapters(
+    text: str,
+    min_chapter_length: int = 100
+) -> Dict[str, Any]:
+    """
+    将文本分割为章节。
+
+    此工具使用智能章节分割算法,识别常见的章节标题格式。
+
+    Args:
+        text: 要分割的文本
+        min_chapter_length: 最小章节长度
+
+    Returns:
+        包含分割后的章节信息的字典
+    """
+    from ..cleaning.splitter import ChapterSplitter
+
+    try:
+        if not text or not text.strip():
+            return {"success": False, "error": "Text cannot be empty"}
+
+        splitter = ChapterSplitter(min_chapter_length=min_chapter_length)
+        chapters = splitter.split(text)
+
+        chapter_list = [
+            {
+                "index": c.index,
+                "title": c.title,
+                "char_count": c.char_count
+            }
+            for c in chapters
+        ]
+
+        total_chars = sum(c.char_count for c in chapters)
+
+        cleaned_text = "\n\n".join([
+            f"## {c.title}\n\n{c.content}" if c.title else c.content
+            for c in chapters
+        ])
+
+        return {
+            "success": True,
+            "chapters": chapter_list,
+            "chapter_count": len(chapters),
+            "total_chars": total_chars,
+            "cleaned_text": cleaned_text
+        }
+
+    except Exception as e:
+        return {"success": False, "error": f"Chapter splitting failed: {str(e)}"}
+
+
+# ============================================================================
+# Glossary Tools
+# ============================================================================
+
+@mcp.tool()
+async def glossary_add(
+    source: str,
+    target: str,
+    category: str = "other",
+    context: str = ""
+) -> Dict[str, Any]:
+    """
+    添加术语到术语表。
+
+    术语表用于确保关键术语的翻译一致性。
+
+    支持的类别:
+    - character: 角色名称
+    - skill: 技能名称
+    - location: 地点名称
+    - item: 物品名称
+    - organization: 组织名称
+    - other: 其他术语
+
+    Args:
+        source: 源语言术语
+        target: 目标语言术语
+        category: 术语类别
+        context: 上下文说明
+
+    Returns:
+        操作结果
+    """
+    try:
+        if not source or not source.strip():
+            return {"success": False, "error": "Source term cannot be empty"}
+        if not target or not target.strip():
+            return {"success": False, "error": "Target term cannot be empty"}
+
+        glossary = get_glossary()
+
+        try:
+            cat_enum = TermCategory(category)
+        except ValueError:
+            cat_enum = TermCategory.OTHER
+
+        entry = GlossaryEntry(
+            source=source.strip(),
+            target=target.strip(),
+            category=cat_enum,
+            context=context.strip()
+        )
+
+        glossary.add(entry)
+        await notify_glossary_updated()
+
+        return {
+            "success": True,
+            "message": f"Added term: {source} -> {target}",
+            "entry": {
+                "source": entry.source,
+                "target": entry.target,
+                "category": entry.category.value,
+                "context": entry.context
+            }
+        }
+
+    except Exception as e:
+        return {"success": False, "error": str(e)}
+
+
+@mcp.tool()
+async def glossary_list() -> Dict[str, Any]:
+    """
+    列出术语表所有条目。
+
+    Returns:
+        包含所有术语条目的字典
+    """
+    try:
+        glossary = get_glossary()
+
+        entries = [
+            {
+                "source": e.source,
+                "target": e.target,
+                "category": e.category.value,
+                "context": e.context
+            }
+            for e in glossary.get_all()
+        ]
+
+        return {
+            "success": True,
+            "entries": entries,
+            "count": len(entries)
+        }
+
+    except Exception as e:
+        return {"success": False, "error": str(e)}
+
+
+@mcp.tool()
+async def glossary_clear() -> Dict[str, Any]:
+    """
+    清空术语表。
+
+    删除术语表中的所有条目。此操作不可撤销。
+
+    Returns:
+        操作结果
+    """
+    try:
+        glossary = get_glossary()
+
+        count = len(glossary)
+        # Clears the private term store in place so the shared instance stays live
+        glossary._terms.clear()
+
+        await notify_glossary_updated()
+
+        return {
+            "success": True,
+            "message": f"Cleared {count} glossary entries"
+        }
+
+    except Exception as e:
+        return {"success": False, "error": str(e)}
+
+
+# ============================================================================
+# Fingerprint Tools
+# ============================================================================
+
+@mcp.tool()
+async def check_duplicate(file_path: str) -> Dict[str, Any]:
+    """
+    检查文件是否已翻译(基于 MD5 指纹)。
+
+    此工具使用文件指纹技术检查是否已经翻译过相同的文件。
+
+    Args:
+        file_path: 要检查的文件路径
+
+    Returns:
+        包含查重结果的字典
+    """
+    file_path_obj = Path(file_path)
+    if not file_path_obj.exists():
+        return {"success": False, "error": f"File not found: {file_path}"}
+
+    try:
+        service = get_fingerprint_service()
+
+        is_duplicate, work_id = service.check_before_import(file_path)
+        fingerprint = service.get_fingerprint(file_path)
+        file_size = file_path_obj.stat().st_size
+
+        return {
+            "success": True,
+            "is_duplicate": is_duplicate,
+            "existing_work_id": work_id,
+            "fingerprint": fingerprint,
+            "file_size": file_size
+        }
+
+    except Exception as e:
+        return {"success": False, "error": f"Duplicate check failed: {str(e)}"}
+
+
+@mcp.tool()
+async def get_fingerprint(file_path: str) -> Dict[str, Any]:
+    """
+    获取文件的 MD5 指纹和元数据。
+
+    Args:
+        file_path: 要获取指纹的文件路径
+
+    Returns:
+        包含指纹和元数据的字典
+    """
+    file_path_obj = Path(file_path)
+    if not file_path_obj.exists():
+        return {"success": False, "error": f"File not found: {file_path}"}
+
+    try:
+        service = get_fingerprint_service()
+
+        fingerprint = service.get_fingerprint(file_path)
+        file_size = file_path_obj.stat().st_size
+        file_name = file_path_obj.name
+
+        return {
+            "success": True,
+            "fingerprint": fingerprint,
+            "file_name": file_name,
+            "file_size": file_size
+        }
+
+    except Exception as e:
+        return {"success": False, "error": f"Failed to get fingerprint: {str(e)}"}
+
+
+# ============================================================================
+# Progress Resources
+# ============================================================================
+
+@mcp.resource("progress://{task_id}")
+async def get_progress_resource(task_id: str) -> str:
+    """Get progress status for a translation task."""
+    async with _progress_lock:
+        state = _progress_state.get(task_id)
+        if state:
+            return json.dumps(state, ensure_ascii=False)
+        return json.dumps({
+            "error": "Task not found",
+            "task_id": task_id
+        }, ensure_ascii=False)
+
+
+@mcp.resource("progress://list")
+async def list_all_progress() -> str:
+    """List all active and recent tasks."""
+    async with _progress_lock:
+        tasks = list(_progress_state.values())
+        return json.dumps({
+            "tasks": tasks,
+            "count": len(tasks)
+        }, ensure_ascii=False)
+
+
+def main():
+    """Main entry point for the MCP server."""
+    mcp.run(transport="stdio")
+
+
+if __name__ == "__main__":
+    main()
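The progress helpers in `server.py` (`create_task`, `update_progress`, `complete_task`) implement a small dictionary-backed state machine guarded by an asyncio lock. A standalone sketch of the same pattern (names mirror the server, but the snippet is self-contained and illustrative only):

```python
import asyncio
import uuid

_state: dict = {}
_lock = asyncio.Lock()

def create_task(task_type: str, total: int = 0) -> str:
    """Register a new task in the pending state."""
    task_id = str(uuid.uuid4())
    _state[task_id] = {"type": task_type, "status": "pending",
                       "current": 0, "total": total, "percent": 0.0}
    return task_id

async def update(task_id: str, **updates) -> None:
    """Merge updates under the lock so concurrent handlers don't interleave."""
    async with _lock:
        if task_id in _state:
            _state[task_id].update(updates)

async def demo() -> dict:
    tid = create_task("file_translation", total=4)
    for i in range(4):  # one update per translated chapter
        await update(tid, current=i + 1, percent=(i + 1) / 4 * 100)
    await update(tid, status="completed")
    return _state[tid]

final = asyncio.run(demo())
print(final["status"], final["percent"])  # completed 100.0
```

A `progress://{task_id}` resource read then amounts to serializing the task's dictionary, which is exactly what `get_progress_resource` does.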

+ 33 - 0
src/mcp_server/tools/__init__.py

@@ -0,0 +1,33 @@
+"""
+MCP Tools for Novel Translator
+
+This package contains all MCP tool implementations.
+"""
+
+from .translation import (
+    translate_text,
+    translate_batch,
+    translate_file,
+    register_progress_callback
+)
+from .cleaning import clean_file, split_chapters
+from .glossary import glossary_add, glossary_list, glossary_clear
+from .fingerprint import check_duplicate, get_fingerprint
+
+__all__ = [
+    # Translation tools
+    "translate_text",
+    "translate_batch",
+    "translate_file",
+    "register_progress_callback",
+    # Cleaning tools
+    "clean_file",
+    "split_chapters",
+    # Glossary tools
+    "glossary_add",
+    "glossary_list",
+    "glossary_clear",
+    # Fingerprint tools
+    "check_duplicate",
+    "get_fingerprint",
+]
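The tool modules that follow accept `get_*_fn` parameters so tests can inject fakes instead of the shared server state. A minimal sketch of that dependency-injection pattern; `FakePipeline` and this cut-down `translate_text` are invented for illustration and are not part of the codebase:

```python
import asyncio
from typing import Any, Callable, Dict, Optional

class FakePipeline:
    """Stand-in for the real m2m100-backed pipeline (hypothetical)."""
    def translate(self, text: str) -> str:
        return f"<translated:{text}>"

async def translate_text(text: str,
                         get_pipeline_fn: Optional[Callable] = None) -> Dict[str, Any]:
    """Cut-down tool shape: resolve the pipeline through the injected factory."""
    if get_pipeline_fn is None:
        raise RuntimeError("this sketch has no default pipeline")
    pipeline = get_pipeline_fn()  # injected dependency, no global state touched
    return {"success": True, "translated": pipeline.translate(text)}

# In a test, the class itself works as the factory.
result = asyncio.run(translate_text("你好", get_pipeline_fn=FakePipeline))
print(result["translated"])  # <translated:你好>
```

This is why the unit tests can exercise every tool without loading the model.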

+ 150 - 0
src/mcp_server/tools/fingerprint.py

@@ -0,0 +1,150 @@
+"""
+Fingerprint tools for the Novel Translator MCP server.
+
+This module provides MCP tools for file fingerprinting and duplicate checking.
+"""
+
+from pathlib import Path
+from typing import Any, Dict, Optional
+
+from pydantic import BaseModel, Field
+
+
+class CheckDuplicateInput(BaseModel):
+    """Input schema for duplicate checking."""
+    file_path: str = Field(description="Path of the file to check")
+
+
+class GetFingerprintInput(BaseModel):
+    """Input schema for fingerprint retrieval."""
+    file_path: str = Field(description="Path of the file to fingerprint")
+
+
+class DuplicateCheckOutput(BaseModel):
+    """Output schema for duplicate check results."""
+    success: bool
+    is_duplicate: Optional[bool] = None
+    existing_work_id: Optional[str] = None
+    fingerprint: Optional[str] = None
+    file_size: Optional[int] = None
+    error: Optional[str] = None
+
+
+class FingerprintOutput(BaseModel):
+    """Output schema for fingerprint retrieval."""
+    success: bool
+    fingerprint: Optional[str] = None
+    file_name: Optional[str] = None
+    file_size: Optional[int] = None
+    error: Optional[str] = None
+
+
+async def check_duplicate(
+    input: CheckDuplicateInput,
+    get_fingerprint_service_fn: callable = None
+) -> DuplicateCheckOutput:
+    """
+    检查文件是否已翻译(基于 MD5 指纹)。
+
+    此工具使用文件指纹技术检查是否已经翻译过相同的文件。
+    指纹基于文件的 MD5 哈希值,与文件名无关。
+
+    Args:
+        input: 查重输入参数
+        get_fingerprint_service_fn: 获取指纹服务的函数
+
+    Returns:
+        DuplicateCheckOutput 包含查重结果
+    """
+    from ..server import get_fingerprint_service as default_get_service
+
+    get_service = get_fingerprint_service_fn or default_get_service
+
+    # Validate file exists
+    file_path = Path(input.file_path)
+    if not file_path.exists():
+        return DuplicateCheckOutput(
+            success=False,
+            error=f"File not found: {input.file_path}"
+        )
+
+    try:
+        # Get fingerprint service
+        service = get_service()
+
+        # Check for duplicate
+        is_duplicate, work_id = service.check_before_import(input.file_path)
+
+        # Get fingerprint
+        fingerprint = service.get_fingerprint(input.file_path)
+
+        # Get file size
+        file_size = file_path.stat().st_size
+
+        return DuplicateCheckOutput(
+            success=True,
+            is_duplicate=is_duplicate,
+            existing_work_id=work_id,
+            fingerprint=fingerprint,
+            file_size=file_size
+        )
+
+    except Exception as e:
+        return DuplicateCheckOutput(
+            success=False,
+            error=f"Duplicate check failed: {str(e)}"
+        )
+
+
+async def get_fingerprint(
+    input: GetFingerprintInput,
+    get_fingerprint_service_fn: callable = None
+) -> FingerprintOutput:
+    """
+    获取文件的 MD5 指纹和元数据。
+
+    此工具返回文件的唯一指纹(MD5哈希)和基本信息,
+    可用于文件识别和版本管理。
+
+    Args:
+        input: 指纹获取输入参数
+        get_fingerprint_service_fn: 获取指纹服务的函数
+
+    Returns:
+        FingerprintOutput 包含指纹和元数据
+    """
+    from ..server import get_fingerprint_service as default_get_service
+
+    get_service = get_fingerprint_service_fn or default_get_service
+
+    # Validate file exists
+    file_path = Path(input.file_path)
+    if not file_path.exists():
+        return FingerprintOutput(
+            success=False,
+            error=f"File not found: {input.file_path}"
+        )
+
+    try:
+        # Get fingerprint service
+        service = get_service()
+
+        # Get fingerprint
+        fingerprint = service.get_fingerprint(input.file_path)
+
+        # Get file info
+        file_size = file_path.stat().st_size
+        file_name = file_path.name
+
+        return FingerprintOutput(
+            success=True,
+            fingerprint=fingerprint,
+            file_name=file_name,
+            file_size=file_size
+        )
+
+    except Exception as e:
+        return FingerprintOutput(
+            success=False,
+            error=f"Failed to get fingerprint: {str(e)}"
+        )
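`FingerprintService` itself is not part of this diff; assuming the fingerprint is, as the docstrings state, the MD5 of the file contents (independent of the file name), a minimal sketch:

```python
import hashlib
import tempfile
from pathlib import Path

def file_fingerprint(path: Path, chunk_size: int = 1 << 20) -> str:
    """MD5 of the file contents, streamed so large novels need not fit in memory."""
    md5 = hashlib.md5()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            md5.update(chunk)
    return md5.hexdigest()

# Identical contents give identical fingerprints, regardless of file name.
with tempfile.TemporaryDirectory() as d:
    a, b = Path(d, "a.txt"), Path(d, "renamed.txt")
    a.write_bytes("第一章 开始".encode("utf-8"))
    b.write_bytes("第一章 开始".encode("utf-8"))
    fp = file_fingerprint(a)
    same = fp == file_fingerprint(b)
    print(same)  # True
```

Duplicate checking then reduces to looking the digest up in the repository.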

+ 232 - 0
src/mcp_server/tools/glossary.py

@@ -0,0 +1,232 @@
+"""
+Glossary tools for the Novel Translator MCP server.
+
+This module provides MCP tools for managing translation terminology.
+"""
+
+from typing import Any, Dict, List, Optional
+
+from pydantic import BaseModel, Field
+
+
+class GlossaryAddInput(BaseModel):
+    """Input schema for adding glossary entries."""
+    source: str = Field(description="Source-language term")
+    target: str = Field(description="Target-language term")
+    category: str = Field(
+        default="other",
+        description="Term category (character, skill, location, item, organization, other)"
+    )
+    context: str = Field(default="", description="Contextual note")
+
+
+class GlossaryEntryOutput(BaseModel):
+    """Output schema for a glossary entry."""
+    source: str
+    target: str
+    category: str
+    context: str
+
+
+class GlossaryListOutput(BaseModel):
+    """Output schema for glossary listing."""
+    success: bool
+    entries: Optional[List[GlossaryEntryOutput]] = None
+    count: Optional[int] = None
+    error: Optional[str] = None
+
+
+async def glossary_add(
+    input: GlossaryAddInput,
+    get_glossary_fn: callable = None,
+    notify_updated_fn: callable = None
+) -> Dict[str, Any]:
+    """
+    添加术语到术语表。
+
+    术语表用于确保关键术语的翻译一致性。在翻译时,
+    这些术语会被预处理和后处理,确保使用正确的翻译。
+
+    支持的类别:
+    - character: 角色名称(如:林风 -> Lin Feng)
+    - skill: 技能名称(如:火球术 -> Fireball)
+    - location: 地点名称(如:东方大陆 -> Eastern Continent)
+    - item: 物品名称(如:龙剑 -> Dragon Sword)
+    - organization: 组织名称(如:青云宗 -> Qingyun Sect)
+    - other: 其他术语
+
+    Args:
+        input: 术语添加参数
+        get_glossary_fn: 获取术语表的函数
+        notify_updated_fn: 通知术语表更新的函数
+
+    Returns:
+        操作结果
+    """
+    from ..glossary.models import GlossaryEntry, TermCategory
+    from ..server import (
+        get_glossary as default_get_glossary,
+        notify_glossary_updated as default_notify
+    )
+
+    get_glossary = get_glossary_fn or default_get_glossary
+    notify_updated = notify_updated_fn or default_notify
+
+    try:
+        # Validate input
+        if not input.source or not input.source.strip():
+            return {
+                "success": False,
+                "error": "Source term cannot be empty"
+            }
+
+        if not input.target or not input.target.strip():
+            return {
+                "success": False,
+                "error": "Target term cannot be empty"
+            }
+
+        # Get glossary
+        glossary = get_glossary()
+
+        # Parse category
+        try:
+            category = TermCategory(input.category)
+        except ValueError:
+            # Default to OTHER if invalid category
+            category = TermCategory.OTHER
+
+        # Create entry
+        entry = GlossaryEntry(
+            source=input.source.strip(),
+            target=input.target.strip(),
+            category=category,
+            context=input.context.strip()
+        )
+
+        # Add to glossary
+        glossary.add(entry)
+
+        # Notify pipeline of update (supports sync or async callbacks)
+        if callable(notify_updated):
+            import asyncio
+            if asyncio.iscoroutinefunction(notify_updated):
+                await notify_updated()
+            else:
+                notify_updated()
+        return {
+            "success": True,
+            "message": f"Added term: {input.source} -> {input.target}",
+            "entry": GlossaryEntryOutput(
+                source=entry.source,
+                target=entry.target,
+                category=entry.category.value,
+                context=entry.context
+            ).model_dump()
+        }
+
+    except Exception as e:
+        return {
+            "success": False,
+            "error": str(e)
+        }
+
+
+async def glossary_list(
+    get_glossary_fn: callable = None
+) -> GlossaryListOutput:
+    """
+    列出术语表所有条目。
+
+    返回当前术语表中的所有术语及其翻译。
+
+    Returns:
+        GlossaryListOutput 包含所有术语条目
+    """
+    from ..server import get_glossary as default_get_glossary
+
+    get_glossary = get_glossary_fn or default_get_glossary
+
+    try:
+        # Get glossary
+        glossary = get_glossary()
+
+        # Build output entries
+        entries = [
+            GlossaryEntryOutput(
+                source=e.source,
+                target=e.target,
+                category=e.category.value,
+                context=e.context
+            )
+            for e in glossary.get_all()
+        ]
+
+        return GlossaryListOutput(
+            success=True,
+            entries=entries,
+            count=len(entries)
+        )
+
+    except Exception as e:
+        return GlossaryListOutput(
+            success=False,
+            error=str(e)
+        )
+
+
+async def glossary_clear(
+    get_glossary_fn: callable = None,
+    notify_updated_fn: callable = None
+) -> Dict[str, Any]:
+    """
+    清空术语表。
+
+    删除术语表中的所有条目。此操作不可撤销。
+
+    Args:
+        get_glossary_fn: 获取术语表的函数
+        notify_updated_fn: 通知术语表更新的函数
+
+    Returns:
+        操作结果
+    """
+    from ..server import (
+        get_glossary as default_get_glossary,
+        notify_glossary_updated as default_notify
+    )
+
+    get_glossary = get_glossary_fn or default_get_glossary
+    notify_updated = notify_updated_fn or default_notify
+
+    try:
+        # Get glossary
+        glossary = get_glossary()
+
+        # Get count before clearing
+        count = len(glossary)
+
+        # Clear the underlying term store in place (keeps the shared instance)
+        glossary._terms.clear()
+
+        # Notify pipeline of update (supports sync or async callbacks)
+        if callable(notify_updated):
+            import asyncio
+            if asyncio.iscoroutinefunction(notify_updated):
+                await notify_updated()
+            else:
+                notify_updated()
+
+        return {
+            "success": True,
+            "message": f"Cleared {count} glossary entries"
+        }
+
+    except Exception as e:
+        return {
+            "success": False,
+            "error": str(e)
+        }
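The glossary docstrings describe pre-processing (terms swapped for placeholders before the model runs) and post-processing (placeholders restored as target-language terms). A self-contained sketch of that round trip; the real `TranslationPipeline` is not in this diff, so these helper names are illustrative:

```python
from typing import Dict, Tuple

def preprocess(text: str, glossary: Dict[str, str]) -> Tuple[str, Dict[str, str]]:
    """Replace each source term with a stable placeholder the model won't translate."""
    mapping: Dict[str, str] = {}
    # Longest terms first, so a longer term is matched before its substrings.
    for i, src in enumerate(sorted(glossary, key=len, reverse=True)):
        placeholder = f"⟦T{i}⟧"
        if src in text:
            text = text.replace(src, placeholder)
            mapping[placeholder] = glossary[src]
    return text, mapping

def postprocess(translated: str, mapping: Dict[str, str]) -> str:
    """Restore each placeholder as the glossary's target-language term."""
    for placeholder, tgt in mapping.items():
        translated = translated.replace(placeholder, tgt)
    return translated

glossary = {"林风": "Lin Feng", "青云宗": "Qingyun Sect"}
pre, mapping = preprocess("林风加入了青云宗", glossary)
# ...the model would translate `pre` here; the demo reuses it unchanged...
print(postprocess(pre, mapping))  # Lin Feng加入了Qingyun Sect
```

This is why `notify_glossary_updated` must run after every mutation: the pipeline caches the term-to-placeholder table.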

+ 375 - 0
src/mcp_server/tools/translation.py

@@ -0,0 +1,375 @@
+"""
+Translation tools for the Novel Translator MCP server.
+
+This module provides MCP tools for text and file translation using
+the m2m100 model with glossary support.
+"""
+
+import asyncio
+import uuid
+from pathlib import Path
+from typing import Any, Dict, List, Optional, Callable
+
+from pydantic import BaseModel, Field
+
+# Schema models
+class TranslateTextInput(BaseModel):
+    """Input schema for single text translation."""
+    text: str = Field(description="Text to translate")
+    src_lang: str = Field(default="zh", description="Source language code (zh, en, etc.)")
+    tgt_lang: str = Field(default="en", description="Target language code")
+    max_length: Optional[int] = Field(default=200, description="Maximum generation length")
+
+
+class TranslateBatchInput(BaseModel):
+    """Input schema for batch text translation."""
+    texts: List[str] = Field(description="List of texts to translate")
+    src_lang: str = Field(default="zh", description="Source language code")
+    tgt_lang: str = Field(default="en", description="Target language code")
+    batch_size: int = Field(default=4, description="Batch size")
+    max_length: Optional[int] = Field(default=200, description="Maximum generation length")
+
+
+class TranslateFileInput(BaseModel):
+    """Input schema for file translation."""
+    file_path: str = Field(description="Path of the file to translate")
+    src_lang: str = Field(default="zh", description="Source language code")
+    tgt_lang: str = Field(default="en", description="Target language code")
+    output_path: Optional[str] = Field(default=None, description="Output file path (defaults to adding an _en suffix)")
+    enable_cleaning: bool = Field(default=True, description="Whether to enable file cleaning")
+    enable_splitting: bool = Field(default=True, description="Whether to enable chapter splitting")
+
+
+class TranslationOutput(BaseModel):
+    """Output schema for translation results."""
+    success: bool
+    translated: Optional[str] = None
+    translations: Optional[List[str]] = None
+    output_path: Optional[str] = None
+    task_id: Optional[str] = None
+    terms_used: Optional[List[str]] = None
+    error: Optional[str] = None
+    chapters_translated: Optional[int] = None
+    total_chapters: Optional[int] = None
+
+
+class ChapterOutput(BaseModel):
+    """Schema for translated chapter."""
+    index: int
+    title: str
+    content: str
+
+
+# Progress callback type
+ProgressCallback = Optional[Callable[[str, Dict[str, Any]], Any]]
+
+_global_progress_callback: ProgressCallback = None
+
+
+def register_progress_callback(callback: ProgressCallback) -> None:
+    """Register a global progress callback for file translation."""
+    global _global_progress_callback
+    _global_progress_callback = callback
+
+
+async def _notify_progress(task_id: str, updates: Dict[str, Any]) -> None:
+    """Notify progress callback if registered."""
+    if _global_progress_callback:
+        if asyncio.iscoroutinefunction(_global_progress_callback):
+            await _global_progress_callback(task_id, updates)
+        else:
+            _global_progress_callback(task_id, updates)
+
+
+def _add_suffix(path: str, suffix: str) -> str:
+    """Add a suffix to a file path (before extension)."""
+    p = Path(path)
+    return str(p.with_stem(p.stem + suffix))
+
+
+async def translate_text(
+    input: TranslateTextInput,
+    get_pipeline_fn: Optional[Callable] = None
+) -> TranslationOutput:
+    """
+    翻译单段文本,支持术语表预处理和后处理。
+
+    此工具使用完整的翻译流水线,包括:
+    - 术语表预处理(替换术语为占位符)
+    - m2m100 模型翻译
+    - 后处理(还原术语并清洗)
+
+    Args:
+        input: 翻译输入参数
+        get_pipeline_fn: 函数获取翻译流水线(用于依赖注入)
+
+    Returns:
+        TranslationOutput 包含翻译结果和使用的术语
+    """
+    from ..server import get_pipeline as default_get_pipeline
+
+    get_pipeline = get_pipeline_fn or default_get_pipeline
+
+    try:
+        # Validate input
+        if not input.text or not input.text.strip():
+            return TranslationOutput(
+                success=False,
+                error="Text cannot be empty"
+            )
+
+        # Get pipeline
+        pipeline = get_pipeline()
+
+        # Update languages if different from default
+        if input.src_lang != pipeline.src_lang or input.tgt_lang != pipeline.tgt_lang:
+            pipeline.set_languages(input.src_lang, input.tgt_lang)
+
+        # Translate with details
+        result = pipeline.translate(input.text, return_details=True)
+
+        return TranslationOutput(
+            success=True,
+            translated=result.translated,
+            terms_used=result.terms_used if result.terms_used else []
+        )
+
+    except ValueError as e:
+        return TranslationOutput(
+            success=False,
+            error=str(e)
+        )
+    except Exception as e:
+        return TranslationOutput(
+            success=False,
+            error=f"Translation failed: {str(e)}"
+        )
+
+
+async def translate_batch(
+    input: TranslateBatchInput,
+    get_pipeline_fn: Optional[Callable] = None
+) -> TranslationOutput:
+    """
+    批量翻译多段文本。
+
+    此工具使用批处理模式提高翻译效率,支持术语表。
+
+    Args:
+        input: 批量翻译输入参数
+        get_pipeline_fn: 函数获取翻译流水线(用于依赖注入)
+
+    Returns:
+        TranslationOutput 包含翻译结果列表
+    """
+    from ..server import get_pipeline as default_get_pipeline
+
+    get_pipeline = get_pipeline_fn or default_get_pipeline
+
+    try:
+        # Validate input
+        if not input.texts:
+            return TranslationOutput(
+                success=False,
+                error="Texts list cannot be empty"
+            )
+
+        # Get pipeline
+        pipeline = get_pipeline()
+
+        # Update languages if different
+        if input.src_lang != pipeline.src_lang or input.tgt_lang != pipeline.tgt_lang:
+            pipeline.set_languages(input.src_lang, input.tgt_lang)
+
+        # Translate batch
+        results = pipeline.translate_batch(input.texts, return_details=True)
+
+        translations = []
+        all_terms_used = set()
+
+        for r in results:
+            translations.append(r.translated)
+            if r.terms_used:
+                all_terms_used.update(r.terms_used)
+
+        return TranslationOutput(
+            success=True,
+            translations=translations,
+            terms_used=list(all_terms_used)
+        )
+
+    except ValueError as e:
+        return TranslationOutput(
+            success=False,
+            error=str(e)
+        )
+    except Exception as e:
+        return TranslationOutput(
+            success=False,
+            error=f"Batch translation failed: {str(e)}"
+        )
+
+
+async def translate_file(
+    input: TranslateFileInput,
+    get_pipeline_fn: Optional[Callable] = None,
+    get_cleaning_pipeline_fn: Optional[Callable] = None,
+    create_task_fn: Optional[Callable] = None,
+    update_progress_fn: Optional[Callable] = None,
+    complete_task_fn: Optional[Callable] = None
+) -> TranslationOutput:
+    """
+    翻译整个 TXT 文件(完整流水线)。
+
+    此工具执行完整的翻译流程:
+    1. 文件清洗(去除无效字符、标准化格式)
+    2. 章节分割(按章节标题分割)
+    3. 逐章翻译(使用术语表)
+    4. 保存结果(保留章节结构)
+
+    Args:
+        input: 文件翻译输入参数
+        get_pipeline_fn: 获取翻译流水线的函数
+        get_cleaning_pipeline_fn: 获取清洗流水线的函数
+        create_task_fn: 创建进度跟踪任务的函数
+        update_progress_fn: 更新进度的函数
+        complete_task_fn: 完成任务的函数
+
+    Returns:
+        TranslationOutput 包含输出文件路径和任务ID
+    """
+    from ..server import (
+        get_pipeline as default_get_pipeline,
+        get_cleaning_pipeline as default_get_cleaning_pipeline,
+        create_task as default_create_task,
+        update_progress as default_update_progress,
+        complete_task as default_complete_task
+    )
+
+    get_pipeline = get_pipeline_fn or default_get_pipeline
+    get_cleaning_pipeline = get_cleaning_pipeline_fn or default_get_cleaning_pipeline
+    create_task = create_task_fn or default_create_task
+    update_progress = update_progress_fn or default_update_progress
+    complete_task = complete_task_fn or default_complete_task
+
+    # Validate file exists
+    file_path = Path(input.file_path)
+    if not file_path.exists():
+        return TranslationOutput(
+            success=False,
+            error=f"File not found: {input.file_path}"
+        )
+
+    # Create progress tracking task
+    task_id = create_task(
+        task_type="file_translation",
+        metadata={"file_path": input.file_path}
+    )
+
+    try:
+        # Get pipelines
+        pipeline = get_pipeline()
+        cleaning_pipeline = get_cleaning_pipeline()
+
+        # Update languages if different
+        if input.src_lang != pipeline.src_lang or input.tgt_lang != pipeline.tgt_lang:
+            pipeline.set_languages(input.src_lang, input.tgt_lang)
+
+        # Configure cleaning pipeline
+        cleaning_pipeline.enable_cleaning = input.enable_cleaning
+        cleaning_pipeline.enable_splitting = input.enable_splitting
+
+        # Step 1: Clean and split file
+        await update_progress(task_id, {
+            "status": "cleaning",
+            "message": "Reading and cleaning file..."
+        })
+
+        chapters = cleaning_pipeline.process(file_path)
+        total_chapters = len(chapters)
+
+        await update_progress(task_id, {
+            "status": "translating",
+            "total": total_chapters,
+            "current": 0,
+            "percent": 0.0,
+            "message": f"Starting translation of {total_chapters} chapters..."
+        })
+
+        # Step 2: Translate each chapter
+        translated_chapters = []
+        all_terms_used = set()
+
+        for i, chapter in enumerate(chapters):
+            # Translate chapter content
+            result = pipeline.translate(chapter.content, return_details=True)
+
+            translated_chapters.append(ChapterOutput(
+                index=i,
+                title=chapter.title,
+                content=result.translated
+            ))
+
+            if result.terms_used:
+                all_terms_used.update(result.terms_used)
+
+            # Update progress
+            percent = (i + 1) / total_chapters * 100
+            await update_progress(task_id, {
+                "status": "translating",
+                "current": i + 1,
+                "percent": percent,
+                "message": f"Translated chapter {i + 1}/{total_chapters}: {chapter.title}"
+            })
+
+        # Step 3: Save result
+        output_path = input.output_path or _add_suffix(input.file_path, "_en")
+        output_file = Path(output_path)
+
+        await update_progress(task_id, {
+            "status": "saving",
+            "message": f"Saving translated file to {output_path}..."
+        })
+
+        # Ensure parent directory exists
+        output_file.parent.mkdir(parents=True, exist_ok=True)
+
+        # Write translated content
+        with open(output_file, "w", encoding="utf-8") as f:
+            for chapter in translated_chapters:
+                if chapter.title:
+                    f.write(f"## {chapter.title}\n\n")
+                f.write(chapter.content)
+                f.write("\n\n")
+
+        # Complete task
+        await complete_task(task_id, success=True)
+
+        await update_progress(task_id, {
+            "status": "completed",
+            "message": f"Translation completed. Output saved to {output_path}"
+        })
+
+        return TranslationOutput(
+            success=True,
+            output_path=output_path,
+            task_id=task_id,
+            terms_used=list(all_terms_used),
+            chapters_translated=total_chapters,
+            total_chapters=total_chapters
+        )
+
+    except Exception as e:
+        await complete_task(task_id, success=False)
+
+        await update_progress(task_id, {
+            "status": "failed",
+            "error": str(e),
+            "message": f"Translation failed: {str(e)}"
+        })
+
+        return TranslationOutput(
+            success=False,
+            error=str(e),
+            task_id=task_id
+        )
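The `*_fn` parameters above exist purely for dependency injection. A minimal stand-alone sketch of the pattern (the `run_tool` helper and stub pipeline below are hypothetical, not part of the real module), showing how a test can swap in a mock without importing `..server`:

```python
from typing import Callable, Optional
from unittest.mock import MagicMock

def default_get_pipeline() -> object:
    # Stand-in for the real ..server.get_pipeline, which loads the model.
    raise RuntimeError("real server not available in this sketch")

def run_tool(text: str, get_pipeline_fn: Optional[Callable] = None) -> str:
    # Same pattern as translate_text/translate_batch/translate_file:
    # fall back to the module-level default unless a factory is injected.
    get_pipeline = get_pipeline_fn or default_get_pipeline
    pipeline = get_pipeline()
    return pipeline.translate(text)

mock = MagicMock()
mock.translate.return_value = "Translated text"
translated = run_tool("你好", get_pipeline_fn=lambda: mock)  # -> "Translated text"
```

Without the injected factory, `run_tool` falls through to the (failing) default, which is exactly why the unit tests below can also exercise the tools by patching the server module instead.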

+ 511 - 0
tests/test_mcp_server.py

@@ -0,0 +1,511 @@
+"""
+Unit tests for the Novel Translator MCP Server.
+
+This test suite validates all MCP tools and server functionality.
+"""
+
+import json
+import tempfile
+from pathlib import Path
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+
+# Mock the heavy ML imports before importing server modules
+sys_modules_mock = MagicMock()
+sys_modules_mock.transformers = MagicMock()
+sys_modules_mock.torch = MagicMock()
+
+with patch.dict('sys.modules', {
+    'transformers': sys_modules_mock.transformers,
+    'torch': sys_modules_mock.torch
+}):
+    from src.mcp_server.server import (
+        mcp,
+        get_pipeline,
+        get_glossary,
+        get_cleaning_pipeline,
+        get_fingerprint_service,
+        create_task,
+        update_progress,
+        complete_task,
+        notify_glossary_updated,
+    )
+    from src.glossary.models import Glossary, GlossaryEntry, TermCategory
+
+
+# ============================================================================
+# Fixtures
+# ============================================================================
+
+@pytest.fixture
+def temp_file():
+    """Create a temporary file for testing."""
+    with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt', encoding='utf-8') as f:
+        f.write("## Chapter 1\n\nThis is test content.\n\n## Chapter 2\n\nMore content here.")
+        temp_path = f.name
+    yield temp_path
+    Path(temp_path).unlink(missing_ok=True)
+
+
+@pytest.fixture
+def sample_glossary():
+    """Create a sample glossary for testing."""
+    glossary = Glossary()
+    glossary.add(GlossaryEntry(
+        source="林风",
+        target="Lin Feng",
+        category=TermCategory.CHARACTER,
+        context="Main protagonist"
+    ))
+    glossary.add(GlossaryEntry(
+        source="火球术",
+        target="Fireball",
+        category=TermCategory.SKILL,
+        context="Magic spell"
+    ))
+    return glossary
+
+
+@pytest.fixture
+def mock_pipeline():
+    """Create a mock translation pipeline."""
+    pipeline = MagicMock()
+    pipeline.translate = MagicMock(return_value=MagicMock(
+        translated="Translated text",
+        terms_used=["林风"]
+    ))
+    pipeline.translate_batch = MagicMock(return_value=[
+        MagicMock(translated="Translation 1", terms_used=["term1"]),
+        MagicMock(translated="Translation 2", terms_used=["term2"])
+    ])
+    pipeline.set_languages = MagicMock()
+    pipeline.update_glossary = MagicMock()
+    pipeline.src_lang = "zh"
+    pipeline.tgt_lang = "en"
+    return pipeline
+
+
+@pytest.fixture
+def mock_cleaning_pipeline():
+    """Create a mock cleaning pipeline."""
+    from src.cleaning.models import Chapter
+
+    pipeline = MagicMock()
+    pipeline.enable_cleaning = True
+    pipeline.enable_splitting = True
+    pipeline.process = MagicMock(return_value=[
+        Chapter(index=0, title="Chapter 1", content="Content 1", char_count=100),
+        Chapter(index=1, title="Chapter 2", content="Content 2", char_count=150),
+    ])
+    return pipeline
+
+
+@pytest.fixture
+def mock_fingerprint_service():
+    """Create a mock fingerprint service."""
+    service = MagicMock()
+    service.check_before_import = MagicMock(return_value=(False, None))
+    service.get_fingerprint = MagicMock(return_value="abc123def456")
+    service.get_file_info = MagicMock(return_value={
+        "fingerprint": "abc123",
+        "metadata": {"size": 1000},
+        "is_duplicate": False,
+        "existing_work_id": None
+    })
+    return service
+
+
+# ============================================================================
+# Server State Tests
+# ============================================================================
+
+class TestServerState:
+    """Tests for server state management."""
+
+    def test_get_glossary_returns_singleton(self):
+        """Test that get_glossary returns the same instance."""
+        g1 = get_glossary()
+        g2 = get_glossary()
+        assert g1 is g2
+
+    def test_create_task_generates_unique_id(self):
+        """Test that create_task generates unique task IDs."""
+        task1 = create_task("test_type", 10)
+        task2 = create_task("test_type", 10)
+        assert task1 != task2
+
+
+# ============================================================================
+# Translation Tool Tests
+# ============================================================================
+
+class TestTranslationTools:
+    """Tests for translation tools."""
+
+    @pytest.mark.asyncio
+    async def test_translate_text_with_mock(self, mock_pipeline):
+        """Test translate_text with mocked pipeline."""
+        from src.mcp_server.server import translate_text
+
+        with patch('src.mcp_server.server.get_pipeline', return_value=mock_pipeline):
+            result = await translate_text(
+                text="你好世界",
+                src_lang="zh",
+                tgt_lang="en"
+            )
+
+            assert result["success"] is True
+            assert result["translated"] == "Translated text"
+            assert result["terms_used"] == ["林风"]
+
+    @pytest.mark.asyncio
+    async def test_translate_text_empty_input(self):
+        """Test translate_text with empty input."""
+        from src.mcp_server.server import translate_text
+
+        result = await translate_text(text="", src_lang="zh", tgt_lang="en")
+        assert result["success"] is False
+        assert "empty" in result["error"].lower()
+
+    @pytest.mark.asyncio
+    async def test_translate_batch_with_mock(self, mock_pipeline):
+        """Test translate_batch with mocked pipeline."""
+        from src.mcp_server.server import translate_batch
+
+        with patch('src.mcp_server.server.get_pipeline', return_value=mock_pipeline):
+            result = await translate_batch(
+                texts=["Text 1", "Text 2"],
+                src_lang="zh",
+                tgt_lang="en"
+            )
+
+            assert result["success"] is True
+            assert result["translations"] == ["Translation 1", "Translation 2"]
+            assert len(result["terms_used"]) == 2
+
+    @pytest.mark.asyncio
+    async def test_translate_batch_empty_list(self):
+        """Test translate_batch with empty list."""
+        from src.mcp_server.server import translate_batch
+
+        result = await translate_batch(texts=[], src_lang="zh", tgt_lang="en")
+        assert result["success"] is False
+        assert "empty" in result["error"].lower()
+
+    @pytest.mark.asyncio
+    async def test_translate_file_with_mock(
+        self,
+        temp_file,
+        mock_pipeline,
+        mock_cleaning_pipeline
+    ):
+        """Test translate_file with mocked dependencies."""
+        from src.mcp_server.server import translate_file
+
+        with patch('src.mcp_server.server.get_pipeline', return_value=mock_pipeline), \
+             patch('src.mcp_server.server.get_cleaning_pipeline', return_value=mock_cleaning_pipeline), \
+             patch('src.mcp_server.server.create_task', return_value="test-task-id"), \
+             patch('src.mcp_server.server.update_progress', new_callable=AsyncMock), \
+             patch('src.mcp_server.server.complete_task', new_callable=AsyncMock):
+
+            # Create a temp output file
+            with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='_en.txt') as f:
+                output_path = f.name
+
+            try:
+                result = await translate_file(
+                    file_path=temp_file,
+                    output_path=output_path
+                )
+
+                assert result["success"] is True
+                assert "task_id" in result
+                assert result["chapters_translated"] == 2
+
+            finally:
+                Path(output_path).unlink(missing_ok=True)
+
+    @pytest.mark.asyncio
+    async def test_translate_file_not_found(self):
+        """Test translate_file with non-existent file."""
+        from src.mcp_server.server import translate_file
+
+        result = await translate_file(file_path="/nonexistent/file.txt")
+        assert result["success"] is False
+        assert "not found" in result["error"].lower()
+
+
+# ============================================================================
+# Cleaning Tool Tests
+# ============================================================================
+
+class TestCleaningTools:
+    """Tests for cleaning tools."""
+
+    @pytest.mark.asyncio
+    async def test_clean_file_with_mock(self, temp_file, mock_cleaning_pipeline):
+        """Test clean_file with mocked pipeline."""
+        from src.mcp_server.server import clean_file
+
+        with patch('src.mcp_server.server.get_cleaning_pipeline', return_value=mock_cleaning_pipeline):
+            result = await clean_file(file_path=temp_file)
+
+            assert result["success"] is True
+            assert result["chapter_count"] == 2
+            assert result["total_chars"] == 250
+            assert len(result["chapters"]) == 2
+
+    @pytest.mark.asyncio
+    async def test_clean_file_not_found(self):
+        """Test clean_file with non-existent file."""
+        from src.mcp_server.server import clean_file
+
+        result = await clean_file(file_path="/nonexistent/file.txt")
+        assert result["success"] is False
+        assert "not found" in result["error"].lower()
+
+    @pytest.mark.asyncio
+    async def test_split_chapters_with_mock(self, mock_cleaning_pipeline):
+        """Test split_chapters with mocked splitter."""
+        from src.mcp_server.server import split_chapters
+
+        mock_splitter = MagicMock()
+        mock_splitter.split = MagicMock(return_value=[
+            MagicMock(index=0, title="Chapter 1", char_count=100, content="Content 1"),
+            MagicMock(index=1, title="Chapter 2", char_count=150, content="Content 2"),
+        ])
+
+        with patch('src.cleaning.splitter.ChapterSplitter', return_value=mock_splitter):
+            result = await split_chapters(text="## Chapter 1\n\nContent\n\n## Chapter 2\n\nMore content")
+
+            assert result["success"] is True
+            assert result["chapter_count"] == 2
+
+    @pytest.mark.asyncio
+    async def test_split_chapters_empty_text(self):
+        """Test split_chapters with empty text."""
+        from src.mcp_server.server import split_chapters
+
+        result = await split_chapters(text="")
+        assert result["success"] is False
+        assert "empty" in result["error"].lower()
+
+
+# ============================================================================
+# Glossary Tool Tests
+# ============================================================================
+
+class TestGlossaryTools:
+    """Tests for glossary tools."""
+
+    @pytest.mark.asyncio
+    async def test_glossary_add(self):
+        """Test adding a term to the glossary."""
+        from src.mcp_server.server import glossary_add, get_glossary
+
+        # Clear the glossary first
+        get_glossary()._terms.clear()
+
+        result = await glossary_add(
+            source="林风",
+            target="Lin Feng",
+            category="character",
+            context="Main protagonist"
+        )
+
+        assert result["success"] is True
+        assert "entry" in result
+        assert result["entry"]["source"] == "林风"
+        assert result["entry"]["target"] == "Lin Feng"
+
+    @pytest.mark.asyncio
+    async def test_glossary_add_empty_source(self):
+        """Test adding a term with empty source."""
+        from src.mcp_server.server import glossary_add
+
+        result = await glossary_add(source="", target="Lin Feng")
+        assert result["success"] is False
+        assert "empty" in result["error"].lower()
+
+    @pytest.mark.asyncio
+    async def test_glossary_add_empty_target(self):
+        """Test adding a term with empty target."""
+        from src.mcp_server.server import glossary_add
+
+        result = await glossary_add(source="林风", target="")
+        assert result["success"] is False
+        assert "empty" in result["error"].lower()
+
+    @pytest.mark.asyncio
+    async def test_glossary_list(self, sample_glossary):
+        """Test listing glossary entries."""
+        from src.mcp_server.server import glossary_list, get_glossary
+
+        # Replace with sample glossary
+        with patch('src.mcp_server.server.get_glossary', return_value=sample_glossary):
+            result = await glossary_list()
+
+            assert result["success"] is True
+            assert result["count"] == 2
+            assert len(result["entries"]) == 2
+
+            # Check entries
+            sources = [e["source"] for e in result["entries"]]
+            assert "林风" in sources
+            assert "火球术" in sources
+
+    @pytest.mark.asyncio
+    async def test_glossary_clear(self):
+        """Test clearing the glossary."""
+        from src.mcp_server.server import glossary_clear, get_glossary
+
+        # Add some entries first
+        get_glossary().add(GlossaryEntry(
+            source="Test",
+            target="Test EN",
+            category=TermCategory.OTHER
+        ))
+
+        result = await glossary_clear()
+
+        assert result["success"] is True
+        assert len(get_glossary()._terms) == 0
+
+
+# ============================================================================
+# Fingerprint Tool Tests
+# ============================================================================
+
+class TestFingerprintTools:
+    """Tests for fingerprint tools."""
+
+    @pytest.mark.asyncio
+    async def test_check_duplicate(self, temp_file, mock_fingerprint_service):
+        """Test checking for duplicate files."""
+        from src.mcp_server.server import check_duplicate
+
+        with patch('src.mcp_server.server.get_fingerprint_service', return_value=mock_fingerprint_service):
+            result = await check_duplicate(file_path=temp_file)
+
+            assert result["success"] is True
+            assert result["is_duplicate"] is False
+            assert result["fingerprint"] == "abc123def456"
+
+    @pytest.mark.asyncio
+    async def test_check_duplicate_not_found(self):
+        """Test check_duplicate with non-existent file."""
+        from src.mcp_server.server import check_duplicate
+
+        result = await check_duplicate(file_path="/nonexistent/file.txt")
+        assert result["success"] is False
+        assert "not found" in result["error"].lower()
+
+    @pytest.mark.asyncio
+    async def test_get_fingerprint(self, temp_file, mock_fingerprint_service):
+        """Test getting file fingerprint."""
+        from src.mcp_server.server import get_fingerprint
+
+        with patch('src.mcp_server.server.get_fingerprint_service', return_value=mock_fingerprint_service):
+            result = await get_fingerprint(file_path=temp_file)
+
+            assert result["success"] is True
+            assert result["fingerprint"] == "abc123def456"
+            assert "file_name" in result
+
+    @pytest.mark.asyncio
+    async def test_get_fingerprint_not_found(self):
+        """Test get_fingerprint with non-existent file."""
+        from src.mcp_server.server import get_fingerprint
+
+        result = await get_fingerprint(file_path="/nonexistent/file.txt")
+        assert result["success"] is False
+        assert "not found" in result["error"].lower()
+
+
+# ============================================================================
+# Progress Resource Tests
+# ============================================================================
+
+class TestProgressResources:
+    """Tests for progress resources."""
+
+    @pytest.mark.asyncio
+    async def test_progress_resource_flow(self):
+        """Test the full progress resource flow."""
+        from src.mcp_server.server import get_progress_resource, list_all_progress
+
+        # Create a task
+        task_id = create_task("test_type", 10)
+
+        # Update progress
+        await update_progress(task_id, {"current": 5, "percent": 50.0})
+
+        # Get progress
+        progress_json = await get_progress_resource(task_id)
+        progress = json.loads(progress_json)
+
+        assert progress["task_id"] == task_id
+        assert progress["current"] == 5
+        assert progress["percent"] == 50.0
+
+        # List all progress
+        list_json = await list_all_progress()
+        task_list = json.loads(list_json)
+
+        assert task_list["count"] >= 1
+
+        # Complete task
+        await complete_task(task_id, success=True)
+
+        # Verify completion
+        final_progress = json.loads(await get_progress_resource(task_id))
+        assert final_progress["status"] == "completed"
+
+    @pytest.mark.asyncio
+    async def test_progress_resource_not_found(self):
+        """Test getting progress for non-existent task."""
+        from src.mcp_server.server import get_progress_resource
+
+        progress_json = await get_progress_resource("non-existent-task-id")
+        progress = json.loads(progress_json)
+
+        assert "error" in progress
+        assert "not found" in progress["error"].lower()
+
+
+# ============================================================================
+# Integration Tests
+# ============================================================================
+
+class TestIntegration:
+    """Integration tests for complete workflows."""
+
+    @pytest.mark.asyncio
+    async def test_glossary_translation_workflow(self):
+        """Test adding terms and using them in translation."""
+        from src.mcp_server.server import glossary_add, glossary_list
+
+        # Clear glossary
+        get_glossary()._terms.clear()
+
+        # Add terms
+        await glossary_add(source="林风", target="Lin Feng", category="character")
+        await glossary_add(source="青云宗", target="Qingyun Sect", category="organization")
+
+        # List terms
+        result = await glossary_list()
+
+        assert result["success"] is True
+        assert result["count"] == 2
+
+        # Verify terms were added
+        sources = {e["source"] for e in result["entries"]}
+        assert "林风" in sources
+        assert "青云宗" in sources
+
+
+if __name__ == "__main__":
+    pytest.main([__file__, "-v", "--cov=src/mcp_server", "--cov-report=term"])