architecture.md 65 KB


stepsCompleted: [1, 2, 3, 4, 5] inputDocuments: ['prd.md'] workflowType: 'architecture' project_name: '223-236-template-6' user_name: 'User' date: '2026-03-13' status: 'complete'

version: '1.0'

Architecture Decision Document

This document builds collaboratively through step-by-step discovery. Sections are appended as we work through each architectural decision together.

Project Context Analysis

Requirements Overview

Functional Requirements:

项目包含 52 个功能需求,分为六个核心模块:

  1. 指纹模块 (FR1-FR8): 章节指纹查重,支持批量检测与人工审核
  2. 清洗模块 (FR9-FR16): 正则替换规则引擎、格式标准化、HTML/Markdown 处理
  3. 术语模块 (FR17-FR24): 术语库管理、智能提取、锁定标记 (§Ti§)
  4. 翻译模块 (FR25-FR33): M2M100 模型推理、GPU/CPU 自适应、批处理优化
  5. 上传模块 (FR34-FR40): 平台 API 对接、失败重试、CU 扣费
  6. 任务调度器 (FR41-FR47): 流水线编排、并发控制、断点续传
  7. 系统集成 (FR48-FR52): 配置管理、日志系统、版本检测

Non-Functional Requirements:

类别 关键要求 架构影响
性能 3000-5000 词/分钟 (RTX 3060) 需要批处理优化、GPU 内存管理
可靠性 Crash-Safe 原子写 所有持久化操作需使用 .tmp + fsync + rename 模式
安全性 零数据泄露 全流程本地处理,禁止数据外传
兼容性 NVIDIA GTX 1650+ (4GB+ VRAM) 需要优雅的 GPU 降级策略
许可证 零授权费依赖 所有依赖必须为标准库或 MIT 协议

Scale & Complexity:

  • Primary domain: Desktop Application + AI Inference
  • Complexity level: Medium
  • Estimated architectural components: 7 major components

Technical Constraints & Dependencies

硬约束:

  • 必须使用 MIT 协议库(排除 GPL 污染)
  • 必须 100% 本地处理(无云 API 调用)
  • 必须支持 Crash-Safe 原子写

外部依赖:

  • CTranslate2 (MIT): 模型推理引擎
  • facebook/m2m100_418M: 翻译模型
  • PyQt6: GUI 框架
  • PyTorch (CUDA): GPU 加速

集成接口:

  • 指纹查重 API: POST /api/fingerprint/check
  • 平台上传 API: 章节提交接口
  • CU 扣费 API: 按字数计费

Cross-Cutting Concerns Identified

  1. Crash-Safe 持久化: 影响所有写操作(进度、清洗结果、翻译结果)
  2. GPU 资源管理: 翻译模块独占,需协调与其他模块的并发
  3. 术语一致性: 术语锁定机制需跨越清洗→翻译流程传递
  4. 进度可见性: 六个阶段进度需统一展示
  5. 错误恢复: 每个模块的失败处理与断点续传
  6. 许可证合规性: 所有新增依赖需验证许可证类型

Starter Template Evaluation

Primary Technology Domain

Python Desktop Application (PyQt6 + CTranslate2 GPU Inference)

基于项目需求分析,这是一个本地桌面应用,需要:

  • GUI框架:PyQt6
  • AI推理引擎:CTranslate2 (GPU加速)
  • 系统集成:文件I/O、网络API调用

Starter Options Considered

由于Python桌面应用领域没有统一的"启动模板"生态系统,我们评估了以下选项:

选项 优点 缺点 适用性
从零构建 完全控制,无技术债 需要手动配置所有工具 ✅ 推荐 - 特定需求较多
Python Boilerplate 标准结构,包含测试/代码质量 针对Web/服务端优化 ⚠️ 部分适用
Cookiecutter模板 快速启动,最佳实践 需要定制化修改 ⚠️ 部分适用

Selected Approach: 自定义项目结构 (基于2025年最佳实践)

Rationale for Selection:

本项目有以下独特约束,标准模板无法满足:

  1. Crash-Safe 原子写机制:需要在所有持久化点实现
  2. GPU 资源管理:CTranslate2 需要特定配置
  3. 零授权费约束:需要严格验证所有依赖的许可证
  4. 六模块流水线架构:需要特定的模块划分

项目初始化命令:

# 1. 创建项目目录结构
mkdir -p xling-matrix-assistant/src/xling_matrix/{core,modules,ui,infrastructure}
mkdir -p xling-matrix-assistant/tests/{unit,integration}
mkdir -p xling-matrix-assistant/{data,models,logs,docs}

# 2. 创建虚拟环境
python -m venv venv
source venv/bin/activate  # Windows: venv\Scripts\activate

# 3. 安装核心依赖
pip install PyQt6 ctranslate2 torch numpy requests pyyaml

# 4. 安装开发工具
pip install pytest pytest-qt pytest-cov black ruff mypy

Architectural Decisions Established:

Language & Runtime:

  • Python 3.11+ (类型注解支持,性能优化)
  • 类型检查:mypy (严格模式)
  • 代码格式化:black
  • 代码检查:ruff

项目结构 (src layout):

xling-matrix-assistant/
├── src/
│   └── xling_matrix/
│       ├── __init__.py
│       ├── __main__.py           # 应用入口点
│       ├── core/                 # 核心领域模型
│       │   ├── __init__.py
│       │   ├── models.py         # 数据模型
│       │   ├── state.py          # 状态机
│       │   └── pipeline.py       # 流水线编排
│       ├── modules/              # 六大核心模块
│       │   ├── __init__.py
│       │   ├── fingerprint/      # FR1-FR8
│       │   ├── cleaning/         # FR9-FR16
│       │   ├── terminology/      # FR17-FR24
│       │   ├── translation/      # FR25-FR33
│       │   └── upload/           # FR34-FR40
│       ├── ui/                   # PyQt6 GUI
│       │   ├── __init__.py
│       │   ├── main_window.py
│       │   ├── widgets/          # 自定义控件
│       │   └── dialogs/          # 对话框
│       └── infrastructure/       # 基础设施层
│           ├── __init__.py
│           ├── storage.py        # Crash-Safe 持久化
│           ├── gpu_manager.py    # GPU 资源管理
│           ├── api_client.py     # 外部 API 客户端
│           └── logger.py         # 日志系统
├── tests/
│   ├── unit/
│   └── integration/
├── models/                       # 翻译模型存储
├── data/                         # 用户数据目录
├── logs/                         # 日志目录
├── pyproject.toml                # 项目配置
├── pyproject.toml                # 打包配置
└── README.md

Build Tooling & Packaging:

# pyproject.toml
[project]
name = "xling-matrix-assistant"
version = "0.1.0"
requires-python = ">=3.11"
dependencies = [
    "PyQt6>=6.6.0",
    "ctranslate2>=4.0.0",
    "torch>=2.1.0",
    "numpy>=1.24.0",
    "requests>=2.31.0",
    "pyyaml>=6.0.0",
]

[project.optional-dependencies]
dev = ["pytest>=7.4.0", "pytest-qt>=4.2.0", "pytest-cov>=4.1.0", "black>=23.12.0", "ruff>=0.1.0", "mypy>=1.7.0"]

[project.scripts]
xling-matrix = "xling_matrix.__main__:main"

[tool.black]
line-length = 100
target-version = ["py311"]

[tool.ruff]
line-length = 100
select = ["E", "F", "I", "N", "W"]

[tool.mypy]
python_version = "3.11"
strict = true

Testing Framework:

  • pytest (单元测试)
  • pytest-qt (PyQt6 测试工具)
  • pytest-cov (覆盖率报告)

Development Experience:

  • 虚拟环境隔离
  • 类型检查 (mypy strict)
  • 即时重载 (开发模式)
  • 调试配置 (VS Code / PyCharm)

GPU Inference Configuration (CTranslate2):

# 推荐配置
import ctranslate2

translator = ctranslate2.Translator(
    "models/m2m100_418m_ct2/",
    device="cuda",                    # GPU 加速
    device_index=0,                   # 主 GPU
    compute_type="float16",           # Tensor Core 优化
    inter_threads=4,                  # 并发批处理
)

# 批处理优化
batch_size = 16  # 根据显存调整 (RTX 3060: 16-32)

Note: 项目初始化应作为第一个实现故事执行。

Core Architectural Decisions

Decision Priority Analysis

Critical Decisions (Block Implementation):

  1. Crash-Safe 原子写机制: 采用 .tmp + fsync + rename 模式,所有持久化操作必须遵循
  2. 数据文件格式: 使用 JSON 格式存储进度、清洗结果、翻译结果、术语库
  3. GPU 推理配置: CTranslate2 + float16 + 批处理优化
  4. 六模块流水线架构: Fingerprint → Cleaning → Terminology → Translation → Upload

Important Decisions (Shape Architecture):

  1. PyQt6 ModelView 架构: 使用 Qt Model/View 分离,实现数据驱动UI更新
  2. Repository 模式: 抽象数据持久化层,统一 Crash-Safe 机制
  3. Observer 模式: 进度事件通知机制,解耦业务逻辑与UI
  4. 打包策略: PyInstaller 打包为可执行文件

Deferred Decisions (Post-MVP):

  1. 自动更新机制: Growth 阶段功能,使用第三方库 (如 PyUpdater)
  2. 插件系统: Vision 阶段功能,允许扩展自定义模块
  3. 云同步: Vision 阶段功能,可选的云端备份

Data Architecture

数据存储策略:

数据文件 格式 访问模式 Crash-Safe 实现
progress.json JSON 读写频繁 原子替换 + 锁机制
novel_cleaned.json JSON 写入一次 原子写入
terms_temp.json JSON 读写频繁 原子替换 + 锁机制
novel_translated.json JSON 写入一次 原子写入
upload_failed.jsonl JSONL 追加写入 原子追加 + 锁机制
terms_library.json JSON 读写频繁 原子替换 + 锁机制

数据验证策略:

  • Pydantic 模型: 定义数据模型的类型约束
  • 运行时验证: 所有外部输入必须经过验证
  • Schema 迁移: 版本化数据格式,支持自动升级

Crash-Safe 持久化实现:

# infrastructure/storage.py
import os
import fcntl

class AtomicWriter:
    """Crash-Safe 原子写工具"""

    @staticmethod
    def write(filepath: str, data: dict | str) -> None:
        tmp_path = f"{filepath}.tmp"

        # 写入临时文件
        with open(tmp_path, 'w', encoding='utf-8') as f:
            if isinstance(data, dict):
                json.dump(data, f, ensure_ascii=False, indent=2)
            else:
                f.write(data)
            f.flush()  # 强制写入磁盘
            os.fsync(f.fileno())  # 强制同步

        # 原子重命名
        os.replace(tmp_path, filepath)

Authentication & Security

不适用: 本地桌面应用,无需认证/授权机制

数据安全:

  • 所有数据 100% 本地存储
  • 禁止任何网络数据上传(除平台API上传外)
  • GPU 模型本地推理,无云端API调用

API & Communication Patterns

外部 API 集成:

# infrastructure/api_client.py
import requests
from typing import Dict, Optional

class PlatformAPIClient:
    """平台 API 客户端"""

    def __init__(self, base_url: str, api_key: str):
        self.base_url = base_url
        self.api_key = api_key
        self.timeout = 30  # 30秒超时

    def check_fingerprint(self, text: str) -> Dict:
        """指纹查重 API"""
        response = requests.post(
            f"{self.base_url}/api/fingerprint/check",
            json={"text": text},
            headers={"Authorization": f"Bearer {self.api_key}"},
            timeout=self.timeout
        )
        response.raise_for_status()
        return response.json()

    def upload_chapter(self, chapter_data: Dict) -> Dict:
        """章节上传 API"""
        response = requests.post(
            f"{self.base_url}/api/chapters",
            json=chapter_data,
            headers={"Authorization": f"Bearer {self.api_key}"},
            timeout=self.timeout
        )
        response.raise_for_status()
        return response.json()

    def deduct_cu(self, word_count: int) -> Dict:
        """CU 扣费 API"""
        response = requests.post(
            f"{self.base_url}/api/cu/deduct",
            json={"words": word_count},
            headers={"Authorization": f"Bearer {self.api_key}"},
            timeout=self.timeout
        )
        response.raise_for_status()
        return response.json()

重试策略:

  • 指数退避重试
  • 最大重试次数:3次
  • 超时配置:30秒

Frontend Architecture

PyQt6 ModelView 架构:

# ui/models/task_model.py
from PyQt6.QtCore import QAbstractTableModel, Qt

class TaskModel(QAbstractTableModel):
    """任务数据模型"""

    def __init__(self):
        super().__init__()
        self._tasks = []

    def rowCount(self, parent=None):
        return len(self._tasks)

    def columnCount(self, parent=None):
        return 5  # work_id, status, progress, start_time, end_time

    def data(self, index, role=Qt.ItemDataRole.DisplayRole):
        if not index.isValid() or role != Qt.ItemDataRole.DisplayRole:
            return None
        return self._tasks[index.row()][index.column()]

    def update_task(self, work_id: str, status: str, progress: int):
        """更新任务状态"""
        row = self._find_row(work_id)
        if row is not None:
            self._tasks[row]['status'] = status
            self._tasks[row]['progress'] = progress
            self.dataChanged.emit(self.index(row, 0), self.index(row, 4))

# ui/main_window.py
from PyQt6.QtWidgets import QMainWindow, QTableView
from ui.models.task_model import TaskModel

class MainWindow(QMainWindow):
    def __init__(self):
        super().__init__()
        self.task_model = TaskModel()

        self.task_table = QTableView()
        self.task_table.setModel(self.task_model)

进度通知机制 (Observer 模式):

# core/events.py
from PyQt6.QtCore import QObject, pyqtSignal

class ProgressEmitter(QObject):
    """进度事件发射器"""

    stage_progress = pyqtSignal(str, int)  # (work_id, percentage)
    stage_completed = pyqtSignal(str, str)    # (work_id, stage_name)
    stage_failed = pyqtSignal(str, str, str)  # (work_id, stage_name, error)
    task_finished = pyqtSignal(str)           # (work_id)

# modules/cleaning/cleaner.py
from core.events import ProgressEmitter

class TextCleaner:
    def __init__(self, emitter: ProgressEmitter):
        self.emitter = emitter

    def clean(self, text: str, work_id: str) -> str:
        # 执行清洗
        cleaned = self._apply_rules(text)

        # 发送进度通知
        self.emitter.stage_progress.emit(work_id, 100)
        self.emitter.stage_completed.emit(work_id, "cleaning")

        return cleaned

Infrastructure & Deployment

打包策略:

# pyproject.toml
[build-system]
requires = ["setuptools>=68.0", "wheel", "pyinstaller>=6.0"]
build-backend = "setuptools.build_meta"

[tool.pyinstaller]
name = "序灵Matrix助手"
console = true
onefile = true
icon = "assets/icon.ico"
add-data = [
    ("models/*", "models/"),
    ("assets/*", "assets/")
]
hiddenimports = [
    "PyQt6.sip",
    "ctranslate2"
]

环境配置:

配置项 位置 说明
配置文件 ~/.config/xling-matrix/config.yaml API密钥、GPU设置
数据目录 ~/Documents/xling-matrix/ 输入/输出文件
日志目录 ~/Documents/xling-matrix/logs/ 运行日志
模型目录 ~/.local/share/xling-matrix/models/ 翻译模型

日志系统:

# infrastructure/logger.py
import logging
from pathlib import Path

def setup_logger(name: str, log_dir: Path) -> logging.Logger:
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)

    # 文件处理器
    file_handler = logging.FileHandler(
        log_dir / f"{name}.log",
        encoding='utf-8'
    )
    file_handler.setFormatter(
        logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    )
    logger.addHandler(file_handler)

    # 控制台处理器
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(
        logging.Formatter('%(levelname)s: %(message)s')
    )
    logger.addHandler(console_handler)

    return logger

Decision Impact Analysis

Implementation Sequence:

  1. Crash-Safe 持久化层 → 所有模块的基础
  2. PyQt6 ModelView 架构 → UI 层的基础
  3. 六个核心模块 → 业务逻辑实现
  4. GPU 推理优化 → 性能优化
  5. API 集成与上传 → 外部对接

Cross-Component Dependencies:

                    ┌─────────────┐
                    │   GUI UI   │
                    └──────┬──────┘
                           │ Observer
                    ┌──────▼──────┐
                    │  Scheduler   │
                    └──────┬──────┘
                           │
        ┌──────────────────┼──────────────────┐
        │                  │                  │
   ┌────▼────┐     ┌────▼────┐     ┌────▼────┐
   │Fingerprint│    │Cleaning  │    │Translation│
   └────┬────┘     └────┬────┘     └────┬────┘
        │                  │                  │
        └────────┬────────┴──────────────────┘
                 │
         ┌───────▼────────┐
         │  Storage Layer  │
         │ (Crash-Safe)  │
         └─────────────────┘

Implementation Patterns & Consistency Rules

Pattern Categories Defined

Critical Conflict Points Identified: 8 个领域需要一致性规则以确保 AI 代理代码兼容

Core Design Patterns

1. Pipeline 模式(翻译流水线)

所有翻译任务必须通过统一的 Pipeline 执行:

# core/pipeline.py
from dataclasses import dataclass
from typing import Protocol

@dataclass
class PipelineContext:
    """流水线上下文"""
    work_id: str
    input_file: str
    output_dir: str
    current_stage: str
    error: str | None = None
    metadata: dict = None

class PipelineStage(Protocol):
    """流水线阶段协议"""

    def name(self) -> str:
        """返回阶段名称"""
        ...

    def execute(self, context: PipelineContext) -> PipelineContext:
        """执行阶段逻辑"""
        ...

class TranslationPipeline:
    """翻译流水线"""

    def __init__(self):
        self.stages: list[PipelineStage] = []

    def add_stage(self, stage: PipelineStage) -> None:
        self.stages.append(stage)

    def execute(self, context: PipelineContext) -> PipelineContext:
        for stage in self.stages:
            context.current_stage = stage.name()
            try:
                context = stage.execute(context)
                if context.error:
                    context.error = f"{stage.name()}: {context.error}"
                    return context
            except Exception as e:
                context.error = f"{stage.name()}: {str(e)}"
                return context
        return context

2. State Machine(任务状态)

任务状态转换必须遵循状态机规则:

# core/state.py
from enum import Enum
from dataclasses import dataclass

class TaskState(Enum):
    """任务状态枚举"""
    PENDING = "pending"
    RUNNING = "running"
    PAUSED = "paused"
    SUCCESS = "success"
    FAILED = "failed"

@dataclass
class TaskTransition:
    """状态转换"""
    from_state: TaskState
    to_state: TaskState
    is_valid: bool
    error: str | None = None

class TaskStateMachine:
    """任务状态机"""

    # 允许的状态转换
    VALID_TRANSITIONS = {
        TaskState.PENDING: [TaskState.RUNNING],
        TaskState.RUNNING: [TaskState.PAUSED, TaskState.SUCCESS, TaskState.FAILED],
        TaskState.PAUSED: [TaskState.RUNNING, TaskState.FAILED],
        TaskState.SUCCESS: [],  # 终态
        TaskState.FAILED: [TaskState.PENDING],  # 可重试
    }

    def can_transition(self, from_state: TaskState, to_state: TaskState) -> bool:
        return to_state in self.VALID_TRANSITIONS.get(from_state, [])

    def transition(self, current: TaskState, target: TaskState) -> TaskTransition:
        if not self.can_transition(current, target):
            valid_targets = ", ".join(s.value for s in self.VALID_TRANSITIONS.get(current, []))
            return TaskTransition(current, target, False,
                f"Invalid transition: {current.value} -> {target.value}. Valid targets: {valid_targets}")
        return TaskTransition(current, target, True)

3. Repository 模式(数据持久化)

所有数据访问必须通过 Repository 接口:

# core/repository.py
from abc import ABC, abstractmethod
from typing import TypeVar, Generic

T = TypeVar('T')

class Repository(ABC, Generic[T]):
    """Repository 接口"""

    @abstractmethod
    def save(self, entity: T) -> None:
        """保存实体"""
        pass

    @abstractmethod
    def load(self, id: str) -> T | None:
        """加载实体"""
        pass

# infrastructure/repositories/progress_repository.py
from core.repository import Repository
from infrastructure.storage import AtomicWriter

class CrashSafeProgressRepository(Repository[Progress]):
    """Crash-Safe 进度仓储"""

    def __init__(self, file_path: str):
        self.file_path = file_path

    def save(self, progress: Progress) -> None:
        AtomicWriter.write(self.file_path, progress.to_dict())

    def load(self, work_id: str) -> Progress | None:
        if not os.path.exists(self.file_path):
            return None
        with open(self.file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
            return Progress.from_dict(data.get(work_id))

4. Observer 模式(进度通知)

使用 PyQt6 信号槽机制实现进度通知:

# core/events.py
from PyQt6.QtCore import QObject, pyqtSignal
from typing import Protocol

class ProgressObserver(Protocol):
    """进度观察者协议"""

    def on_stage_start(self, work_id: str, stage: str) -> None:
        """阶段开始"""
        ...

    def on_stage_progress(self, work_id: str, stage: str, percent: int) -> None:
        """阶段进度"""
        ...

    def on_stage_complete(self, work_id: str, stage: str) -> None:
        """阶段完成"""
        ...

    def on_stage_error(self, work_id: str, stage: str, error: str) -> None:
        """阶段错误"""
        ...

class ProgressEmitter(QObject):
    """进度事件发射器"""

    # 定义信号
    stage_started = pyqtSignal(str, str)      # (work_id, stage)
    stage_progress = pyqtSignal(str, str, int)  # (work_id, stage, percent)
    stage_completed = pyqtSignal(str, str)    # (work_id, stage)
    stage_failed = pyqtSignal(str, str, str)  # (work_id, stage, error)
    task_finished = pyqtSignal(str, str)      # (work_id, final_state)

# 使用示例
class TranslationStage:
    def __init__(self, emitter: ProgressEmitter):
        self.emitter = emitter

    def execute(self, context: PipelineContext) -> PipelineContext:
        self.emitter.stage_started.emit(context.work_id, "translation")

        try:
            for i, batch in enumerate(batches):
                # 执行翻译
                self._translate_batch(batch)
                progress = int((i + 1) / len(batches) * 100)
                self.emitter.stage_progress.emit(context.work_id, "translation", progress)

            self.emitter.stage_completed.emit(context.work_id, "translation")
            return context
        except Exception as e:
            self.emitter.stage_failed.emit(context.work_id, "translation", str(e))
            context.error = str(e)
            return context

Naming Patterns

代码命名约定:

类别 约定 示例
类名 PascalCase TranslationPipeline, TaskStateMachine
函数名 snake_case execute_pipeline(), load_progress()
变量名 snake_case work_id, batch_size
常量 UPPER_SNAKE_CASE MAX_BATCH_SIZE, DEFAULT_TIMEOUT
私有成员 前缀下划线 _internal_state, _helper()
协议/接口 PascalCase + Protocol 后缀 ProgressObserver, Repository

文件命名约定:

Structure Patterns

项目组织原则:

src/xling_matrix/
├── core/              # 核心领域模型(无依赖)
│   ├── models.py      # 数据模型
│   ├── state.py       # 状态机
│   ├── pipeline.py    # 流水线
│   ├── events.py      # 事件系统
│   └── repository.py  # Repository 接口
│
├── modules/           # 业务模块(依赖 core)
│   └── <module>/
│       ├── __init__.py
│       ├── <module>_stage.py    # 阶段实现
│       ├── <module>_service.py  # 服务逻辑
│       └── models.py            # 模块特定模型
│
├── ui/                # UI 层(依赖 core)
│   ├── main_window.py
│   ├── widgets/
│   └── dialogs/
│
└── infrastructure/    # 基础设施(可依赖任何层)
    ├── storage/
    ├── gpu/
    ├── network/
    └── logging/

测试组织原则:

tests/
├── unit/              # 单元测试
│   ├── test_core/
│   │   ├── test_pipeline.py
│   │   ├── test_state.py
│   │   └── test_events.py
│   └── test_modules/
│       ├── test_translation.py
│       └── test_cleaning.py
│
├── integration/       # 集成测试
│   ├── test_workflow_integration.py
│   └── test_api_integration.py
│
└── fixtures/          # 测试数据
    ├── sample_novels/
    └── expected_outputs/

Format Patterns

数据文件格式:

所有 JSON 文件必须遵循以下格式:

# 通用 JSON 结构
{
    "version": "1.0",           # 数据版本
    "work_id": "uuid",          # 工作ID
    "timestamp": "ISO-8601",    # 时间戳
    "data": { ... }             # 实际数据
}

进度文件格式 (progress.json):

{
    "version": "1.0",
    "work_id": "abc123",
    "state": "running",
    "current_stage": "translation",
    "stages": {
        "fingerprint": {"status": "success", "progress": 100},
        "cleaning": {"status": "success", "progress": 100},
        "terminology": {"status": "success", "progress": 100},
        "translation": {"status": "running", "progress": 45},
        "upload": {"status": "pending", "progress": 0}
    },
    "created_at": "2026-03-13T12:00:00Z",
    "updated_at": "2026-03-13T12:30:00Z"
}

错误响应格式:

# 统一错误格式
@dataclass
class ErrorInfo:
    code: str           # 错误代码 (如 "STAGE_FAILED", "GPU_OOM")
    message: str        # 用户友好的错误消息
    detail: str | None  # 详细错误信息(日志级别)
    stage: str | None   # 失败的阶段

# 错误代码规范
class ErrorCode:
    # 通用错误
    UNKNOWN_ERROR = "UNKNOWN_ERROR"
    INVALID_INPUT = "INVALID_INPUT"
    FILE_NOT_FOUND = "FILE_NOT_FOUND"

    # 阶段错误
    FINGERPRINT_FAILED = "FINGERPRINT_FAILED"
    CLEANING_FAILED = "CLEANING_FAILED"
    TERMINOLOGY_FAILED = "TERMINOLOGY_FAILED"
    TRANSLATION_FAILED = "TRANSLATION_FAILED"
    UPLOAD_FAILED = "UPLOAD_FAILED"

    # GPU 错误
    GPU_NOT_AVAILABLE = "GPU_NOT_AVAILABLE"
    GPU_OOM = "GPU_OOM"

    # 网络错误
    API_CONNECTION_FAILED = "API_CONNECTION_FAILED"
    API_TIMEOUT = "API_TIMEOUT"

Communication Patterns

事件命名约定:

# 事件命名格式: <subject>_<action>
class Events:
    # 阶段事件
    STAGE_STARTED = "stage.started"
    STAGE_PROGRESS = "stage.progress"
    STAGE_COMPLETED = "stage.completed"
    STAGE_FAILED = "stage.failed"

    # 任务事件
    TASK_CREATED = "task.created"
    TASK_STARTED = "task.started"
    TASK_PAUSED = "task.paused"
    TASK_RESUMED = "task.resumed"
    TASK_FINISHED = "task.finished"

日志级别使用:

类型 命名 示例
模块文件 snake_case.py translation_stage.py, progress_repository.py
测试文件 test_.py test_pipeline.py, test_translation.py
包目录 snake_case translation/, cleaning/
级别 用途 示例
DEBUG 详细调试信息 "Batch size: 16, GPU memory: 3.2GB"
INFO 正常操作流程 "Stage 'translation' started for work_id: abc123"
WARNING 可恢复的问题 "GPU memory low, reducing batch size to 8"
ERROR 操作失败但可恢复 "API request failed, retrying (1/3)"
CRITICAL 严重错误需人工介入 "GPU OOM, cannot continue"

Process Patterns

Crash-Safe 写入模式(强制执行):

# 所有持久化操作必须使用此模式
from infrastructure.storage import AtomicWriter

# 正确示例
def save_progress(progress: Progress) -> None:
    AtomicWriter.write("progress.json", progress.to_dict())

# 错误示例 - 禁止直接写入
def save_progress_WRONG(progress: Progress) -> None:
    with open("progress.json", "w") as f:
        json.dump(progress.to_dict(), f)  # ❌ 非 Crash-Safe

错误处理模式:

# 统一错误处理流程
def execute_stage(context: PipelineContext) -> PipelineContext:
    try:
        # 业务逻辑
        result = do_work(context)
        return context
    except GPUOutOfMemoryError as e:
        # 特定错误处理
        return handle_gpu_oom(context, e)
    except APIError as e:
        # 重试逻辑
        return retry_with_backoff(context, e)
    except Exception as e:
        # 通用错误处理
        context.error = str(e)
        logger.error(f"Stage failed: {e}", exc_info=True)
        return context

GPU 资源管理模式:

# infrastructure/gpu/manager.py
import ctranslate2
from typing import ContextManager

class GPUManager:
    """GPU 资源管理器"""

    _instance = None
    _translator = None

    @classmethod
    def get_instance(cls) -> 'GPUManager':
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    def initialize(self, model_path: str) -> None:
        """初始化 GPU 翻译器"""
        if self._translator is None:
            self._translator = ctranslate2.Translator(
                model_path,
                device=self._detect_device(),
                device_index=0,
                compute_type="float16",
                inter_threads=4
            )

    def _detect_device(self) -> str:
        """检测可用设备"""
        try:
            import torch
            if torch.cuda.is_available():
                return "cuda"
        except:
            pass
        return "cpu"  # 降级到 CPU

    def translate_batch(self, tokens: list[list[str]]) -> list[list[str]]:
        """执行批处理翻译"""
        return self._translator.translate_batch(tokens)

Enforcement Guidelines

All AI Agents MUST:

  1. 使用 Crash-Safe 写入: 所有持久化操作必须通过 AtomicWriter
  2. 遵循状态机规则: 状态转换必须通过 TaskStateMachine 验证
  3. 使用 Repository 接口: 数据访问必须实现 Repository 协议
  4. 通过信号通知进度: 使用 ProgressEmitter 发送进度事件
  5. 遵循命名约定: 代码命名必须符合定义的约定
  6. 返回统一错误格式: 所有错误必须返回 ErrorInfo 结构

Complete Project Structure

Directory Layout

xling-matrix-assistant/
├── src/
│   └── xling_matrix/
│       ├── __init__.py
│       ├── __main__.py
│       │
│       ├── core/                          # 核心领域层
│       │   ├── __init__.py
│       │   ├── models.py                  # 数据模型定义
│       │   ├── state.py                   # 状态机实现
│       │   ├── pipeline.py                # 流水线编排
│       │   ├── events.py                  # 事件系统
│       │   └── repository.py              # Repository 接口
│       │
│       ├── modules/                       # 业务模块层
│       │   ├── __init__.py
│       │   │
│       │   ├── fingerprint/               # 指纹模块 (FR1-FR8)
│       │   │   ├── __init__.py
│       │   │   ├── fingerprint_stage.py   # 指纹查重阶段
│       │   │   ├── fingerprint_service.py # 指纹服务
│       │   │   └── models.py              # 指纹数据模型
│       │   │
│       │   ├── cleaning/                  # 清洗模块 (FR9-FR16)
│       │   │   ├── __init__.py
│       │   │   ├── cleaning_stage.py
│       │   │   ├── rule_engine.py         # 正则替换引擎
│       │   │   ├── formatter.py           # 格式标准化
│       │   │   └── models.py
│       │   │
│       │   ├── terminology/               # 术语模块 (FR17-FR24)
│       │   │   ├── __init__.py
│       │   │   ├── terminology_stage.py
│       │   │   ├── extractor.py           # 术语提取器
│       │   │   ├── library.py             # 术语库管理
│       │   │   └── models.py
│       │   │
│       │   ├── translation/               # 翻译模块 (FR25-FR33)
│       │   │   ├── __init__.py
│       │   │   ├── translation_stage.py
│       │   │   ├── translator.py          # CTranslate2 封装
│       │   │   ├── batch_processor.py     # 批处理优化
│       │   │   └── models.py
│       │   │
│       │   └── upload/                    # 上传模块 (FR34-FR40)
│       │       ├── __init__.py
│       │       ├── upload_stage.py
│       │       ├── uploader.py            # 平台上传
│       │       └── models.py
│       │
│       ├── ui/                            # 表示层
│       │   ├── __init__.py
│       │   ├── main_window.py             # 主窗口
│       │   ├── widgets/
│       │   │   ├── __init__.py
│       │   │   ├── task_list_widget.py    # 任务列表
│       │   │   ├── progress_widget.py     # 进度显示
│       │   │   └── log_widget.py          # 日志显示
│       │   ├── dialogs/
│       │   │   ├── __init__.py
│       │   │   ├── new_task_dialog.py     # 新建任务对话框
│       │   │   ├── settings_dialog.py     # 设置对话框
│       │   │   └── fingerprint_dialog.py  # 指纹审核对话框
│       │   └── models/
│       │       ├── __init__.py
│       │       └── task_model.py          # 任务数据模型
│       │
│       └── infrastructure/                # 基础设施层
│           ├── __init__.py
│           ├── storage/
│           │   ├── __init__.py
│           │   ├── atomic_writer.py       # Crash-Safe 写入
│           │   └── file_lock.py           # 文件锁机制
│           ├── gpu/
│           │   ├── __init__.py
│           │   └── manager.py             # GPU 资源管理
│           ├── network/
│           │   ├── __init__.py
│           │   ├── api_client.py          # 平台 API 客户端
│           │   └── retry.py               # 重试策略
│           └── logging/
│               ├── __init__.py
│               └── logger.py              # 日志配置
│
├── tests/
│   ├── __init__.py
│   ├── conftest.py                        # pytest 配置
│   │
│   ├── unit/
│   │   ├── test_core/
│   │   │   ├── __init__.py
│   │   │   ├── test_pipeline.py
│   │   │   ├── test_state.py
│   │   │   └── test_events.py
│   │   ├── test_modules/
│   │   │   ├── test_fingerprint.py
│   │   │   ├── test_cleaning.py
│   │   │   ├── test_terminology.py
│   │   │   ├── test_translation.py
│   │   │   └── test_upload.py
│   │   └── test_infrastructure/
│   │       ├── test_storage.py
│   │       ├── test_gpu_manager.py
│   │       └── test_api_client.py
│   │
│   ├── integration/
│   │   ├── __init__.py
│   │   ├── test_workflow.py               # 完整工作流测试
│   │   └── test_api_integration.py
│   │
│   └── fixtures/
│       ├── novels/
│       │   └── sample_chinese.txt
│       └── expected/
│           └── sample_translated.json
│
├── models/                                # 翻译模型文件
│   └── m2m100_418m_ct2/
│
├── assets/                                # 资源文件
│   ├── icons/
│   │   └── app_icon.ico
│   └── config/
│       └── default_config.yaml
│
├── docs/
│   ├── architecture.md                    # 架构文档
│   ├── api.md                             # API 文档
│   └── user_guide.md                      # 用户指南
│
├── pyproject.toml                         # 项目配置
├── README.md
├── LICENSE
└── .gitignore

Module Dependencies

┌─────────────────────────────────────────────────────────────┐
│                        UI Layer                             │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐      │
│  │ MainWindow   │  │   Widgets    │  │   Dialogs    │      │
│  └──────┬───────┘  └──────┬───────┘  └──────┬───────┘      │
└─────────┼──────────────────┼──────────────────┼─────────────┘
          │                  │                  │
          ▼                  ▼                  ▼
┌─────────────────────────────────────────────────────────────┐
│                     Application Layer                        │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐      │
│  │  Scheduler   │  │  Workflows   │  │ State Machine│      │
│  └──────┬───────┘  └──────┬───────┘  └──────┬───────┘      │
└─────────┼──────────────────┼──────────────────┼─────────────┘
          │                  │                  │
          ▼                  ▼                  ▼
┌─────────────────────────────────────────────────────────────┐
│                      Domain Layer                            │
│  ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌─────────┐  │
│  │Fingerprint │ │  Cleaning  │ │Terminology │ │Translation│ │
│  └────┬───────┘ └────┬───────┘ └────┬───────┘ └────┬────┘  │
│       └────────────────┴────────────────┴────────┘         │
│                          │                                  │
│                    ┌─────▼─────┐                            │
│                    │   Core    │                            │
│                    │(Pipeline, │                            │
│                    │ Events,   │                            │
│                    │ Models)   │                            │
│                    └───────────┘                            │
└─────────────────────────────────────────────────────────────┘
                          │
                          ▼
┌─────────────────────────────────────────────────────────────┐
│                  Infrastructure Layer                        │
│  ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌─────────┐  │
│  │  Storage   │ │     GPU    │ │  Network   │ │ Logging │  │
│  │(Crash-Safe)│ │  Manager   │ │  API Client│ │         │  │
│  └────────────┘ └────────────┘ └────────────┘ └─────────┘  │
└─────────────────────────────────────────────────────────────┘

API Interface Design

Internal Module APIs

Pipeline Stage 接口:

# core/pipeline.py
class PipelineStage(Protocol):
    """流水线阶段协议 - 所有阶段必须实现"""

    def name(self) -> str:
        """返回阶段唯一标识符"""
        ...

    def execute(self, context: PipelineContext) -> PipelineContext:
        """执行阶段逻辑

        Args:
            context: 流水线上下文

        Returns:
            更新后的上下文。如果失败,设置 context.error
        """
        ...

    def estimate_progress(self, context: PipelineContext) -> int:
        """估算当前进度百分比"""
        ...

Repository 接口:

# core/repository.py
class Repository(ABC, Generic[T]):
    """数据仓储接口 - 所有数据访问必须实现"""

    @abstractmethod
    def save(self, entity: T) -> None:
        """保存实体(使用 Crash-Safe 写入)"""
        pass

    @abstractmethod
    def load(self, id: str) -> T | None:
        """加载实体"""
        pass

    @abstractmethod
    def delete(self, id: str) -> bool:
        """删除实体"""
        pass

External API Integration

平台 API 客户端接口:

# infrastructure/network/platform_api.py
class PlatformAPIClient:
    """平台 API 客户端 - 对接外部平台"""

    BASE_URL: str
    API_KEY: str
    TIMEOUT: int = 30

    # 指纹查重 API
    def check_fingerprint(self, text: str) -> FingerprintResult:
        """检查文本指纹

        Args:
            text: 待检查的文本

        Returns:
            FingerprintResult: 包含相似度、匹配章节等

        Raises:
            APIConnectionError: 网络连接失败
            APITimeoutError: 请求超时
            APIError: API 返回错误
        """
        ...

    # 章节上传 API
    def upload_chapter(self, chapter: ChapterData) -> UploadResult:
        """上传翻译章节

        Args:
            chapter: 章节数据(标题、内容、字数等)

        Returns:
            UploadResult: 包含章节 ID、URL 等

        Raises:
            APIConnectionError, APITimeoutError, APIError
        """
        ...

    # CU 扣费 API
    def deduct_cu(self, word_count: int) -> DeductResult:
        """扣除 CU

        Args:
            word_count: 字数

        Returns:
            DeductResult: 包含剩余 CU
        """
        ...

    # 健康检查
    def health_check(self) -> bool:
        """检查 API 连接状态"""
        ...

API 请求/响应格式:

1. 指纹查重 API

POST /api/fingerprint/check
Content-Type: application/json
Authorization: Bearer {api_key}

# Request
{
  "fingerprint": "md5hash",
  "sample": "第一章样本文本...",
  "work_id": "uuid"
}

# Response
{
  "exists": false,
  "work_id": "uuid",
  "similarity": 0.0,
  "matches": []
}

2. 上传章节 API

POST /api/chapters
Content-Type: application/json
Authorization: Bearer {api_key}

# Request
{
  "work_id": "uuid",
  "chapter_id": "Chapter 0001",
  "title": "第一章 开始",
  "content_en": "Chapter 1 The Beginning...",
  "word_count": 1234,
  "source_language": "zh",
  "target_language": "en"
}

# Response
{
  "success": true,
  "chapter_id": "Chapter 0001",
  "chapter_url": "https://platform.com/novels/uuid/chapters/Chapter%200001",
  "uploaded_at": "2026-03-13T12:00:00Z"
}

3. CU 扣费 API

POST /api/cu/deduct
Content-Type: application/json
Authorization: Bearer {api_key}

# Request
{
  "work_id": "uuid",
  "words": 1234,
  "chapter_id": "Chapter 0001"
}

# Response
{
  "success": true,
  "deducted": 12.34,
  "balance": 987.66,
  "transaction_id": "txn_abc123"
}

错误响应格式:

# Error Response
{
  "error": {
    "code": "INVALID_API_KEY",
    "message": "API密钥无效或已过期",
    "detail": "请联系客服获取新的API密钥"
  }
}

Data Model Design

Core Data Models

# core/models.py
from dataclasses import dataclass, field
from datetime import datetime
from typing import Literal
from enum import Enum

class TaskState(Enum):
    PENDING = "pending"
    RUNNING = "running"
    PAUSED = "paused"
    SUCCESS = "success"
    FAILED = "failed"

class StageStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    SKIPPED = "skipped"

@dataclass
class StageProgress:
    """阶段进度"""
    status: StageStatus
    progress: int  # 0-100
    error: str | None = None
    started_at: datetime | None = None
    completed_at: datetime | None = None

@dataclass
class Progress:
    """任务进度"""
    work_id: str
    state: TaskState
    current_stage: str
    stages: dict[str, StageProgress] = field(default_factory=dict)
    input_file: str = ""
    output_dir: str = ""
    created_at: datetime = field(default_factory=datetime.now)
    updated_at: datetime = field(default_factory=datetime.now)

    def to_dict(self) -> dict:
        """序列化为字典"""
        return {
            "version": "1.0",
            "work_id": self.work_id,
            "state": self.state.value,
            "current_stage": self.current_stage,
            "stages": {
                name: {
                    "status": stage.status.value,
                    "progress": stage.progress,
                    "error": stage.error,
                    "started_at": stage.started_at.isoformat() if stage.started_at else None,
                    "completed_at": stage.completed_at.isoformat() if stage.completed_at else None,
                }
                for name, stage in self.stages.items()
            },
            "input_file": self.input_file,
            "output_dir": self.output_dir,
            "created_at": self.created_at.isoformat(),
            "updated_at": self.updated_at.isoformat(),
        }

    @classmethod
    def from_dict(cls, data: dict) -> 'Progress':
        """从字典反序列化"""
        stages = {
            name: StageProgress(
                status=StageStatus(stage["status"]),
                progress=stage["progress"],
                error=stage.get("error"),
                started_at=datetime.fromisoformat(stage["started_at"]) if stage.get("started_at") else None,
                completed_at=datetime.fromisoformat(stage["completed_at"]) if stage.get("completed_at") else None,
            )
            for name, stage in data.get("stages", {}).items()
        }
        return cls(
            work_id=data["work_id"],
            state=TaskState(data["state"]),
            current_stage=data["current_stage"],
            stages=stages,
            input_file=data.get("input_file", ""),
            output_dir=data.get("output_dir", ""),
            created_at=datetime.fromisoformat(data["created_at"]),
            updated_at=datetime.fromisoformat(data["updated_at"]),
        )

@dataclass
class Term:
    """术语条目"""
    source: str           # 原文
    target: str           # 译文
    category: str = ""    # 分类
    locked: bool = False  # 是否锁定

@dataclass
class TerminologyLibrary:
    """术语库"""
    version: str = "1.0"
    terms: list[Term] = field(default_factory=list)

@dataclass
class ChapterData:
    """章节数据"""
    title: str
    content: str
    word_count: int
    source_language: str = "zh"
    target_language: str = "en"

@dataclass
class Chapter:
    """章节实体"""
    chapter_id: str        # "Chapter 0001"
    part_index: int        # 卷索引
    title_src: str         # 原文标题
    content: str           # 原文内容
    content_en: str | None = None  # 译文内容
    word_count: int = 0
    translated_at: datetime | None = None

@dataclass
class Term:
    """术语条目"""
    source: str              # 原文
    translation: str | None  # 译文
    count: int = 0           # 出现次数
    chapters: int = 0        # 涉及章节数
    locked: bool = False     # 是否锁定

    def to_dict(self) -> dict:
        return {
            "source": self.source,
            "translation": self.translation,
            "count": self.count,
            "chapters": self.chapters,
            "locked": self.locked
        }

    @classmethod
    def from_dict(cls, data: dict) -> 'Term':
        return cls(
            source=data["source"],
            translation=data.get("translation"),
            count=data.get("count", 0),
            chapters=data.get("chapters", 0),
            locked=data.get("locked", False)
        )

Extended Data Models

指纹数据模型:

@dataclass
class FingerprintData:
    """指纹查重数据"""
    work_id: str
    fingerprint: str       # MD5 hash
    sample: str            # 样本文本(前1000字)
    exists: bool = False
    similarity: float = 0.0
    matches: list[str] = field(default_factory=list)  # 匹配的 work_id 列表

@dataclass
class FingerprintResult:
    """指纹查重结果"""
    exists: bool
    work_id: str
    similarity: float
    matches: list[dict] = field(default_factory=list)
    # matches format: [{"work_id": "uuid", "similarity": 0.95, "chapter": "Chapter 0001"}]

上传队列模型:

@dataclass
class UploadQueueItem:
    """上传队列项"""
    work_id: str
    chapter_id: str
    title: str
    content_en: str
    word_count: int
    retry_count: int = 0
    max_retries: int = 3
    created_at: datetime = field(default_factory=datetime.now)

@dataclass
class UploadFailedItem:
    """上传失败项(JSONL 格式)"""
    work_id: str
    chapter_id: str
    error_code: str
    error_message: str
    failed_at: datetime = field(default_factory=datetime.now)
    retry_count: int = 0

Security Design

Data Protection

1. 本地数据存储策略:

  • 所有用户数据 100% 存储在本地
  • 不上传任何原文到云端(除平台 API 上传翻译结果外)
  • 配置文件(API 密钥)使用系统密钥环存储

2. API 密钥管理:

# infrastructure/security/secret_manager.py
import keyring
from typing import Optional

class SecretManager:
    """密钥管理器 - 使用系统密钥环"""

    SERVICE_NAME = "xling-matrix-assistant"

    def set_api_key(self, key: str) -> None:
        """存储 API 密钥"""
        keyring.set_password(self.SERVICE_NAME, "platform_api", key)

    def get_api_key(self) -> Optional[str]:
        """获取 API 密钥"""
        return keyring.get_password(self.SERVICE_NAME, "platform_api")

    def delete_api_key(self) -> None:
        """删除 API 密钥"""
        keyring.delete_password(self.SERVICE_NAME, "platform_api")

3. 文件权限控制:

# infrastructure/storage/permissions.py
import os
import stat

def set_secure_permissions(filepath: str) -> None:
    """设置安全的文件权限(仅用户可读写)"""
    os.chmod(filepath, stat.S_IRUSR | stat.S_IWUSR)

License Compliance

依赖许可证验证:

# tools/license_checker.py
import subprocess
import json

ALLOWED_LICENSES = {"MIT", "Apache-2.0", "BSD-3-Clause", "PSF-2.0"}
BANNED_LICENSES = {"GPL", "AGPL", "LGPL", "SSPL", "CPAL"}

def check_dependency_licenses() -> dict:
    """检查所有依赖的许可证"""
    result = subprocess.run(
        ["pip", "show", "--json"],
        capture_output=True,
        text=True
    )
    packages = json.loads(result.stdout)

    issues = []
    for pkg in packages:
        license_ = pkg.get("License", "UNKNOWN")
        if any(banned in license_ for banned in BANNED_LICENSES):
            issues.append({
                "package": pkg["Name"],
                "license": license_,
                "severity": "BLOCKING",
                "reason": "Contains GPL contamination"
            })
        elif license_ not in ALLOWED_LICENSES and license_ != "UNKNOWN":
            issues.append({
                "package": pkg["Name"],
                "license": license_,
                "severity": "WARNING",
                "reason": "License not in whitelist"
            })

    return {"valid": len(issues) == 0, "issues": issues}

4. 许可证管理(Growth 阶段):

# infrastructure/security/license_manager.py
import hashlib
import platform
import requests

class LicenseManager:
    """许可证管理器 - 硬件绑定与在线激活验证"""

    def generate_fingerprint(self) -> str:
        """生成硬件指纹(用于软件绑定)"""
        # 获取硬件信息
        machine_id = platform.node()
        cpu_info = platform.processor()
        mac_address = self._get_mac_address()

        # 组合生成指纹
        fingerprint_data = f"{machine_id}:{cpu_info}:{mac_address}"
        return hashlib.md5(fingerprint_data.encode()).hexdigest()

    def _get_mac_address(self) -> str:
        """获取本机 MAC 地址"""
        try:
            import uuid
            return ':'.join(['{:02x}'.format((uuid.getnode() >> elements) & 0xff)
                                   for elements in range(0, 2*6, 8)][::-1])
        except:
            return "unknown"

    def verify_activation(self, activation_key: str) -> bool:
        """在线验证激活密钥

        Args:
            activation_key: 用户输入的激活密钥

        Returns:
            bool: 激活是否有效
        """
        try:
            fingerprint = self.generate_fingerprint()
            response = requests.post(
                "https://license.xling-matrix.com/verify",
                json={
                    "activation_key": activation_key,
                    "fingerprint": fingerprint,
                    "version": "0.1.0"
                },
                timeout=10
            )
            response.raise_for_status()
            return response.json().get("valid", False)
        except Exception:
            return False

    def check_expiration(self, activation_key: str) -> dict | None:
        """检查激活是否过期"""
        try:
            response = requests.post(
                "https://license.xling-matrix.com/check",
                json={"activation_key": activation_key},
                timeout=10
            )
            response.raise_for_status()
            data = response.json()
            return {
                "expired": data.get("expired", False),
                "expires_at": data.get("expires_at"),
                "days_remaining": data.get("days_remaining")
            }
        except Exception:
            return None

    def activate_offline(self, activation_key: str, max_credits: int = 1000) -> bool:
        """离线激活(本地验证签名)"""
        # TODO: 实现离线激活逻辑(需要服务器生成签名密钥对)
        return True

激活状态存储:

# infrastructure/storage/license_storage.py
import json
from pathlib import Path

class LicenseStorage:
    """激活状态存储"""

    ACTIVATION_FILE = Path.home() / ".config" / "xling-matrix" / "activation.json"

    def save_activation(self, activation_key: str, fingerprint: str) -> None:
        """保存激活信息"""
        data = {
            "activation_key": activation_key,
            "fingerprint": fingerprint,
            "activated_at": datetime.now().isoformat(),
            "version": "0.1.0"
        }
        self.ACTIVATION_FILE.parent.mkdir(parents=True, exist_ok=True)
        AtomicWriter.write(str(self.ACTIVATION_FILE), data)

    def load_activation(self) -> dict | None:
        """加载激活信息"""
        if not self.ACTIVATION_FILE.exists():
            return None
        with open(self.ACTIVATION_FILE, 'r', encoding='utf-8') as f:
            return json.load(f)

    def is_activated(self) -> bool:
        """检查是否已激活"""
        activation = self.load_activation()
        if not activation:
            return False
        # 检查硬件指纹是否匹配
        current_fingerprint = LicenseManager().generate_fingerprint()
        return activation.get("fingerprint") == current_fingerprint

许可证验证流程:

启动 → 检查本地激活 → [无激活] 显示激活对话框
                ↓
         [有激活] 检查硬件指纹 → [不匹配] 重新激活
                ↓
         [匹配] 检查过期 → [已过期] 提示续费
                ↓
         [有效] 验证 CU 余额 → [不足] 提示充值
                ↓
         [有效] 允许使用

Performance Optimization

GPU Optimization

1. 批处理策略:

# modules/translation/batch_processor.py
class BatchProcessor:
    """批处理优化器"""

    def __init__(self, max_tokens: int = 4096):
        self.max_tokens = max_tokens

    def create_batches(self, texts: list[str]) -> list[list[str]]:
        """将文本分割为最优批次

        策略:
        1. 按 token 数量分组
        2. 每批接近 max_tokens 但不超过
        3. 相邻文本尽量在同一批(保持上下文)
        """
        batches = []
        current_batch = []
        current_tokens = 0

        for text in texts:
            tokens = self._count_tokens(text)
            if current_tokens + tokens > self.max_tokens and current_batch:
                batches.append(current_batch)
                current_batch = [text]
                current_tokens = tokens
            else:
                current_batch.append(text)
                current_tokens += tokens

        if current_batch:
            batches.append(current_batch)

        return batches

    def _count_tokens(self, text: str) -> int:
        """估算 token 数量(中文约 1.5 字符/token)"""
        return int(len(text) / 1.5)

2. 动态批次大小调整:

# infrastructure/gpu/batch_optimizer.py
import torch

class BatchSizeOptimizer:
    """动态批次大小优化器"""

    def __init__(self, initial_size: int = 16):
        self.current_size = initial_size
        self.min_size = 4
        self.max_size = 32

    def adjust_for_memory(self, oom_occurred: bool) -> int:
        """根据显存使用情况调整批次大小"""
        if oom_occurred:
            self.current_size = max(self.min_size, self.current_size // 2)
        else:
            # 逐步增加以找到最优值
            self.current_size = min(self.max_size, int(self.current_size * 1.2))
        return self.current_size

    def get_memory_info(self) -> dict:
        """获取 GPU 显存信息"""
        if not torch.cuda.is_available():
            return {"available": False}
        return {
            "available": True,
            "total_gb": torch.cuda.get_device_properties(0).total_memory / 1e9,
            "allocated_gb": torch.cuda.memory_allocated(0) / 1e9,
            "free_gb": (torch.cuda.get_device_properties(0).total_memory -
                      torch.cuda.memory_allocated(0)) / 1e9,
        }

I/O Optimization

1. 增量进度保存:

# infrastructure/storage/incremental.py
class IncrementalProgressSaver:
    """增量进度保存器 - 减少磁盘写入"""

    def __init__(self, threshold: int = 5):
        self.threshold = threshold  # 进度变化超过 5% 才保存
        self.last_saved = 0

    def should_save(self, current_progress: int) -> bool:
        return abs(current_progress - self.last_saved) >= self.threshold

    def mark_saved(self, progress: int) -> None:
        self.last_saved = progress

2. 文件读取优化:

# infrastructure/storage/chunked_reader.py
class ChunkedFileReader:
    """分块文件读取器 - 支持大文件"""

    def __init__(self, chunk_size: int = 8192):
        self.chunk_size = chunk_size

    def read_by_paragraphs(self, filepath: str) -> list[str]:
        """按段落读取文件(更适合小说)"""
        with open(filepath, 'r', encoding='utf-8') as f:
            content = f.read()
        # 按双换行符分割段落
        paragraphs = [p.strip() for p in content.split('\n\n') if p.strip()]
        return paragraphs

Deployment Architecture

Application Packaging

1. PyInstaller 配置:

# build/pyinstaller_spec.py
import sys
from PyInstaller.utils.hooks import collect_data_files, collect_submodules

block_cipher = None

datas = [
    ('models', 'models'),
    ('assets', 'assets'),
]

hiddenimports = [
    'PyQt6.sip',
    'ctranslate2',
    'torch',
]

a = Analysis(
    ['src/xling_matrix/__main__.py'],
    pathex=[],
    binaries=[],
    datas=datas,
    hiddenimports=hiddenimports,
    hookspath=[],
    hooksconfig={},
    runtime_hooks=[],
    excludes=[],
    win_no_prefer_redirects=False,
    win_private_assemblies=False,
    cipher=block_cipher,
    noarchive=False,
)

pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher)

exe = EXE(
    pyz,
    a.scripts,
    a.binaries,
    a.zipfiles,
    a.datas,
    [],
    name='序灵Matrix助手',
    debug=False,
    bootloader_ignore_signals=False,
    strip=False,
    upx=True,
    upx_exclude=[],
    runtime_tmpdir=None,
    console=True,
    disable_windowed_traceback=False,
    argv_emulation=False,
    target_arch=None,
    codesign_identity=None,
    entitlements_file=None,
    icon='assets/icons/app_icon.ico',
)

2. 安装程序配置:

安装结构:
序灵Matrix助手/
├── 序灵Matrix助手.exe          # 主程序
├── models/                      # 翻译模型(首次运行时下载)
│   └── m2m100_418m_ct2/
├── configs/
│   └── default_config.yaml
└── README.txt

用户数据目录:
~/Documents/xling-matrix/        # Windows
~/Documents/xling-matrix/        # macOS
~/xling-matrix/                  # Linux

Distribution Strategy

1. 版本管理:

# core/version.py
__version__ = "0.1.0"
__build__ = "20260313"

def get_version() -> str:
    return f"{__version__}+{__build__}"

2. 更新检查 (Growth 阶段):

# infrastructure/update/update_checker.py
import requests

class UpdateChecker:
    """更新检查器"""

    UPDATE_URL = "https://updates.xling-matrix.com/version.json"

    def check_for_updates(self, current_version: str) -> dict | None:
        """检查是否有新版本"""
        try:
            response = requests.get(self.UPDATE_URL, timeout=5)
            response.raise_for_status()
            data = response.json()

            if self._is_newer(current_version, data["latest_version"]):
                return {
                    "has_update": True,
                    "latest_version": data["latest_version"],
                    "download_url": data["download_url"],
                    "release_notes": data.get("release_notes", ""),
                }
        except Exception:
            pass
        return None

    def _is_newer(self, current: str, latest: str) -> bool:
        """比较版本号"""
        from packaging import version
        return version.parse(latest) > version.parse(current)

Architecture Summary

本架构设计文档定义了序灵 Matrix 助手的完整技术架构,包括:

  • 分层架构: Presentation / Application / Domain / Infrastructure
  • 核心设计模式: Pipeline, State Machine, Repository, Observer
  • Crash-Safe 机制: 原子写入确保数据安全
  • GPU 加速: CTranslate2 + 动态批处理优化
  • 六模块流水线: Fingerprint → Cleaning → Terminology → Translation → Upload
  • 本地优先: 100% 本地处理,零数据泄露
  • 零授权费: 仅使用 MIT 协议依赖

本架构确保所有 AI 代理可以协同工作,编写一致、兼容的代码。


文档版本: 1.0 最后更新: 2026-03-13 状态: 完成 ✅