stepsCompleted: [1, 2, 3, 4, 5]
inputDocuments: ['prd.md']
workflowType: 'architecture'
project_name: '223-236-template-6'
user_name: 'User'
date: '2026-03-13'
status: 'complete'
This document is built collaboratively through step-by-step discovery. Sections are appended as we work through each architectural decision together.
Functional Requirements:
The project has 52 functional requirements, organized into six core modules:
Non-Functional Requirements:
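| Category | Key requirement | Architectural impact |
|---|---|---|
| Performance | 3,000-5,000 words/minute (RTX 3060) | Requires batching optimization and GPU memory management |
| Reliability | Crash-safe atomic writes | All persistence must use the .tmp + fsync + rename pattern |
| Security | Zero data leakage | All processing stays local; sending data off the machine is prohibited |
| Compatibility | NVIDIA GTX 1650+ (4GB+ VRAM) | Requires a graceful GPU degradation strategy |
| Licensing | No dependencies with licensing fees | Every dependency must be standard library or MIT-licensed |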
Scale & Complexity:
Hard constraints:
External dependencies:
Integration interfaces:
Python Desktop Application (PyQt6 + CTranslate2 GPU Inference)
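Based on the requirements analysis, this is a local desktop application that needs:
Because the Python desktop application space has no unified "starter template" ecosystem, we evaluated the following options:
| Option | Pros | Cons | Fit |
|---|---|---|---|
| Build from scratch | Full control, no technical debt | Every tool must be configured by hand | ✅ Recommended: the project has many specific requirements |
| Python Boilerplate | Standard structure, includes testing and code-quality tooling | Optimized for web/server-side projects | ⚠️ Partial fit |
| Cookiecutter template | Fast start, best practices | Needs customization | ⚠️ Partial fit |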
Rationale for Selection:
This project has the following unusual constraints that standard templates cannot satisfy:
Project initialization commands:
# 1. Create the project directory structure
mkdir -p xling-matrix-assistant/src/xling_matrix/{core,modules,ui,infrastructure}
mkdir -p xling-matrix-assistant/tests/{unit,integration}
mkdir -p xling-matrix-assistant/{data,models,logs,docs}
# 2. Create a virtual environment
python -m venv venv
source venv/bin/activate # Windows: venv\Scripts\activate
# 3. Install core dependencies
pip install PyQt6 ctranslate2 torch numpy requests pyyaml
# 4. Install development tools
pip install pytest pytest-qt pytest-cov black ruff mypy
Architectural Decisions Established:
Language & Runtime:
Project structure (src layout):
xling-matrix-assistant/
├── src/
│   └── xling_matrix/
│       ├── __init__.py
│       ├── __main__.py          # Application entry point
│       ├── core/                # Core domain models
│       │   ├── __init__.py
│       │   ├── models.py        # Data models
│       │   ├── state.py         # State machine
│       │   └── pipeline.py      # Pipeline orchestration
│       ├── modules/             # Six core modules
│       │   ├── __init__.py
│       │   ├── fingerprint/     # FR1-FR8
│       │   ├── cleaning/        # FR9-FR16
│       │   ├── terminology/     # FR17-FR24
│       │   ├── translation/     # FR25-FR33
│       │   └── upload/          # FR34-FR40
│       ├── ui/                  # PyQt6 GUI
│       │   ├── __init__.py
│       │   ├── main_window.py
│       │   ├── widgets/         # Custom widgets
│       │   └── dialogs/         # Dialogs
│       └── infrastructure/      # Infrastructure layer
│           ├── __init__.py
│           ├── storage.py       # Crash-safe persistence
│           ├── gpu_manager.py   # GPU resource management
│           ├── api_client.py    # External API client
│           └── logger.py        # Logging
├── tests/
│   ├── unit/
│   └── integration/
├── models/                      # Translation model storage
├── data/                        # User data directory
├── logs/                        # Log directory
├── pyproject.toml               # Project and packaging configuration
└── README.md
Build Tooling & Packaging:
# pyproject.toml
[project]
name = "xling-matrix-assistant"
version = "0.1.0"
requires-python = ">=3.11"
dependencies = [
"PyQt6>=6.6.0",
"ctranslate2>=4.0.0",
"torch>=2.1.0",
"numpy>=1.24.0",
"requests>=2.31.0",
"pyyaml>=6.0.0",
]
[project.optional-dependencies]
dev = ["pytest>=7.4.0", "pytest-qt>=4.2.0", "pytest-cov>=4.1.0", "black>=23.12.0", "ruff>=0.1.0", "mypy>=1.7.0"]
[project.scripts]
xling-matrix = "xling_matrix.__main__:main"
[tool.black]
line-length = 100
target-version = ["py311"]
[tool.ruff]
line-length = 100
select = ["E", "F", "I", "N", "W"]
[tool.mypy]
python_version = "3.11"
strict = true
Testing Framework:
Development Experience:
GPU Inference Configuration (CTranslate2):
# Recommended configuration
import ctranslate2
translator = ctranslate2.Translator(
    "models/m2m100_418m_ct2/",
    device="cuda",            # GPU acceleration
    device_index=0,           # Primary GPU
    compute_type="float16",   # Tensor Core optimization
    inter_threads=4,          # Batches translated in parallel
)
# Batching
batch_size = 16  # Tune to available VRAM (RTX 3060: 16-32)
Note: Project initialization should be carried out as the first implementation story.
Critical Decisions (Block Implementation):
Important Decisions (Shape Architecture):
Deferred Decisions (Post-MVP):
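Data storage strategy:
| Data file | Format | Access pattern | Crash-safe implementation |
|---|---|---|---|
| progress.json | JSON | Frequent reads and writes | Atomic replace + lock |
| novel_cleaned.json | JSON | Written once | Atomic write |
| terms_temp.json | JSON | Frequent reads and writes | Atomic replace + lock |
| novel_translated.json | JSON | Written once | Atomic write |
| upload_failed.jsonl | JSONL | Append-only | Atomic append + lock |
| terms_library.json | JSON | Frequent reads and writes | Atomic replace + lock |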
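The table lists "atomic append + lock" for upload_failed.jsonl without showing what that could look like. The following is a minimal sketch under stated assumptions: `append_jsonl` and `read_jsonl` are hypothetical helper names, the lock is an in-process `threading.Lock` (a cross-process file lock would need a different mechanism), and a crash mid-write is handled on the read side by ignoring an incomplete last line.

```python
import json
import os
import threading

# Hypothetical helper state: one lock shared by all writers in this process
_append_lock = threading.Lock()


def append_jsonl(filepath: str, record: dict) -> None:
    """Append one JSON object as a single line, then force it to disk."""
    line = json.dumps(record, ensure_ascii=False) + "\n"
    with _append_lock:
        # "a" mode positions every write at the end of the file
        with open(filepath, "a", encoding="utf-8") as f:
            f.write(line)
            f.flush()
            os.fsync(f.fileno())


def read_jsonl(filepath: str) -> list:
    """Read records back, ignoring a torn (incomplete) last line if present."""
    records = []
    with open(filepath, "r", encoding="utf-8") as f:
        for raw in f:
            if not raw.endswith("\n"):
                break  # partial line left behind by a crash mid-write
            records.append(json.loads(raw))
    return records
```

Writing each record as one complete line is what lets the reader tell a finished record from a torn one.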
Data validation strategy:
Crash-safe persistence implementation:
# infrastructure/storage.py
import json
import os

class AtomicWriter:
    """Crash-safe atomic write helper"""

    @staticmethod
    def write(filepath: str, data: dict | str) -> None:
        tmp_path = f"{filepath}.tmp"
        # Write to a temporary file next to the target
        with open(tmp_path, 'w', encoding='utf-8') as f:
            if isinstance(data, dict):
                json.dump(data, f, ensure_ascii=False, indent=2)
            else:
                f.write(data)
            f.flush()              # Push Python's buffer to the OS
            os.fsync(f.fileno())   # Force the OS to write it to disk
        # Atomic rename over the target
        os.replace(tmp_path, filepath)
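One case the .tmp + fsync + rename pattern leaves open is what happens when the write itself fails partway through. The sketch below is a variant, not the document's `AtomicWriter`: `atomic_write_json` is a hypothetical name, and it assumes a uniquely named temp file from `tempfile.mkstemp` in the target directory, removed again if anything goes wrong.

```python
import json
import os
import tempfile


def atomic_write_json(filepath: str, data: dict) -> None:
    """Write JSON through a uniquely named temp file in the target directory."""
    directory = os.path.dirname(os.path.abspath(filepath))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, filepath)  # atomic when both paths share a filesystem
    except BaseException:
        # The previous file is untouched; remove the partial temp file
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
```

A failed write therefore leaves the last good version of the file in place and no stray .tmp files behind.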
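Not applicable: this is a local desktop application, so no authentication/authorization mechanism is needed
Data security:
External API integration: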
# infrastructure/api_client.py
import requests
from typing import Dict, Optional
class PlatformAPIClient:
"""Platform API client"""
def __init__(self, base_url: str, api_key: str):
self.base_url = base_url
self.api_key = api_key
self.timeout = 30 # 30-second timeout
def check_fingerprint(self, text: str) -> Dict:
"""Fingerprint duplicate-check API"""
response = requests.post(
f"{self.base_url}/api/fingerprint/check",
json={"text": text},
headers={"Authorization": f"Bearer {self.api_key}"},
timeout=self.timeout
)
response.raise_for_status()
return response.json()
def upload_chapter(self, chapter_data: Dict) -> Dict:
"""Chapter upload API"""
response = requests.post(
f"{self.base_url}/api/chapters",
json=chapter_data,
headers={"Authorization": f"Bearer {self.api_key}"},
timeout=self.timeout
)
response.raise_for_status()
return response.json()
def deduct_cu(self, word_count: int) -> Dict:
"""CU deduction API"""
response = requests.post(
f"{self.base_url}/api/cu/deduct",
json={"words": word_count},
headers={"Authorization": f"Bearer {self.api_key}"},
timeout=self.timeout
)
response.raise_for_status()
return response.json()
Retry strategy:
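The retry parameters are not spelled out at this point, so the following is only a sketch of one common approach: exponential backoff with a fixed number of attempts. `call_with_retry`, its defaults (three attempts, 1-second base delay), and the choice of retryable exceptions are all assumptions; with `requests`, the caller would pass `requests.ConnectionError` and `requests.Timeout` as `retry_on`.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_retry(
    func: Callable[[], T],
    max_attempts: int = 3,
    base_delay: float = 1.0,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying on the listed exceptions with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except retry_on:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            sleep(base_delay * 2 ** (attempt - 1))  # 1s, 2s, 4s, ...
    raise ValueError("max_attempts must be at least 1")
```

The `sleep` parameter exists so tests can record delays instead of waiting.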
PyQt6 Model/View architecture:
# ui/models/task_model.py
from PyQt6.QtCore import QAbstractTableModel, Qt

class TaskModel(QAbstractTableModel):
    """Task data model"""

    # Column order; each task is a dict keyed by these names
    COLUMNS = ["work_id", "status", "progress", "start_time", "end_time"]

    def __init__(self):
        super().__init__()
        self._tasks: list[dict] = []

    def rowCount(self, parent=None):
        return len(self._tasks)

    def columnCount(self, parent=None):
        return len(self.COLUMNS)

    def data(self, index, role=Qt.ItemDataRole.DisplayRole):
        if not index.isValid() or role != Qt.ItemDataRole.DisplayRole:
            return None
        return self._tasks[index.row()].get(self.COLUMNS[index.column()])

    def _find_row(self, work_id: str) -> int | None:
        """Return the row of the task with this work_id, or None"""
        for row, task in enumerate(self._tasks):
            if task["work_id"] == work_id:
                return row
        return None

    def update_task(self, work_id: str, status: str, progress: int):
        """Update a task's status"""
        row = self._find_row(work_id)
        if row is not None:
            self._tasks[row]['status'] = status
            self._tasks[row]['progress'] = progress
            self.dataChanged.emit(self.index(row, 0), self.index(row, len(self.COLUMNS) - 1))
# ui/main_window.py
from PyQt6.QtWidgets import QMainWindow, QTableView
from ui.models.task_model import TaskModel
class MainWindow(QMainWindow):
def __init__(self):
super().__init__()
self.task_model = TaskModel()
self.task_table = QTableView()
self.task_table.setModel(self.task_model)
Progress notification mechanism (Observer pattern):
# core/events.py
from PyQt6.QtCore import QObject, pyqtSignal

class ProgressEmitter(QObject):
    """Progress event emitter"""
    stage_progress = pyqtSignal(str, str, int)   # (work_id, stage_name, percentage)
    stage_completed = pyqtSignal(str, str)       # (work_id, stage_name)
    stage_failed = pyqtSignal(str, str, str)     # (work_id, stage_name, error)
    task_finished = pyqtSignal(str, str)         # (work_id, final_state)

# modules/cleaning/cleaner.py
from core.events import ProgressEmitter

class TextCleaner:
    def __init__(self, emitter: ProgressEmitter):
        self.emitter = emitter

    def clean(self, text: str, work_id: str) -> str:
        # Run the cleaning rules
        cleaned = self._apply_rules(text)
        # Emit progress notifications
        self.emitter.stage_progress.emit(work_id, "cleaning", 100)
        self.emitter.stage_completed.emit(work_id, "cleaning")
        return cleaned
Packaging strategy:
# pyproject.toml
[build-system]
requires = ["setuptools>=68.0", "wheel", "pyinstaller>=6.0"]
build-backend = "setuptools.build_meta"
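# Note: PyInstaller takes its options from the command line or a .spec file,
# so this table assumes a project build script that forwards these values.
[tool.pyinstaller]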
name = "序灵Matrix助手"
console = true
onefile = true
icon = "assets/icon.ico"
add-data = [
("models/*", "models/"),
("assets/*", "assets/")
]
hiddenimports = [
"PyQt6.sip",
"ctranslate2"
]
Environment configuration:
| Item | Location | Description |
|---|---|---|
| Config file | ~/.config/xling-matrix/config.yaml | API key, GPU settings |
| Data directory | ~/Documents/xling-matrix/ | Input/output files |
| Log directory | ~/Documents/xling-matrix/logs/ | Runtime logs |
| Model directory | ~/.local/share/xling-matrix/models/ | Translation models |
Logging system:
# infrastructure/logger.py
import logging
from pathlib import Path

def setup_logger(name: str, log_dir: Path) -> logging.Logger:
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    if logger.handlers:
        return logger  # Already configured; avoid duplicate handlers
    log_dir.mkdir(parents=True, exist_ok=True)
    # File handler
    file_handler = logging.FileHandler(
        log_dir / f"{name}.log",
        encoding='utf-8'
    )
    file_handler.setFormatter(
        logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    )
    logger.addHandler(file_handler)
    # Console handler
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(
        logging.Formatter('%(levelname)s: %(message)s')
    )
    logger.addHandler(console_handler)
    return logger
Implementation Sequence:
Cross-Component Dependencies:
┌─────────────┐
│ GUI UI │
└──────┬──────┘
│ Observer
┌──────▼──────┐
│ Scheduler │
└──────┬──────┘
│
┌──────────────────┼──────────────────┐
│ │ │
┌────▼────┐ ┌────▼────┐ ┌────▼────┐
│Fingerprint│ │Cleaning │ │Translation│
└────┬────┘ └────┬────┘ └────┬────┘
│ │ │
└────────┬────────┴──────────────────┘
│
┌───────▼────────┐
│ Storage Layer │
│ (Crash-Safe) │
└─────────────────┘
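Critical Conflict Points Identified: 8 areas need consistency rules to keep code written by different AI agents compatible
1. Pipeline pattern (translation pipeline)
Every translation task must run through the unified Pipeline: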
# core/pipeline.py
from dataclasses import dataclass, field
from typing import Protocol

@dataclass
class PipelineContext:
    """Pipeline context"""
    work_id: str
    input_file: str
    output_dir: str
    current_stage: str
    error: str | None = None
    metadata: dict = field(default_factory=dict)

class PipelineStage(Protocol):
    """Pipeline stage protocol"""
    def name(self) -> str:
        """Return the stage name"""
        ...
    def execute(self, context: PipelineContext) -> PipelineContext:
        """Run the stage logic"""
        ...

class TranslationPipeline:
    """Translation pipeline"""
    def __init__(self):
        self.stages: list[PipelineStage] = []

    def add_stage(self, stage: PipelineStage) -> None:
        self.stages.append(stage)

    def execute(self, context: PipelineContext) -> PipelineContext:
        for stage in self.stages:
            context.current_stage = stage.name()
            try:
                context = stage.execute(context)
                if context.error:
                    # Stop at the first failing stage and tag the error with its name
                    context.error = f"{stage.name()}: {context.error}"
                    return context
            except Exception as e:
                context.error = f"{stage.name()}: {str(e)}"
                return context
        return context
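To make the stop-at-first-failure behavior concrete, here is a small self-contained sketch. The three stage classes and `run_stages` are hypothetical stand-ins written for this demo (with a trimmed-down context), not the project's real stages.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class PipelineContext:
    """Trimmed-down context for this demo only."""
    work_id: str
    current_stage: str = ""
    error: Optional[str] = None
    metadata: dict = field(default_factory=dict)


class CleaningStage:
    def name(self) -> str:
        return "cleaning"

    def execute(self, context: PipelineContext) -> PipelineContext:
        context.metadata["cleaned"] = True
        return context


class BrokenTranslationStage:
    def name(self) -> str:
        return "translation"

    def execute(self, context: PipelineContext) -> PipelineContext:
        raise RuntimeError("model not loaded")


class UploadStage:
    def name(self) -> str:
        return "upload"

    def execute(self, context: PipelineContext) -> PipelineContext:
        context.metadata["uploaded"] = True
        return context


def run_stages(stages: list, context: PipelineContext) -> PipelineContext:
    """Run stages in order and stop at the first failure."""
    for stage in stages:
        context.current_stage = stage.name()
        try:
            context = stage.execute(context)
        except Exception as e:
            context.error = f"{stage.name()}: {e}"
            return context
        if context.error:
            return context
    return context


result = run_stages(
    [CleaningStage(), BrokenTranslationStage(), UploadStage()],
    PipelineContext(work_id="abc123"),
)
print(result.current_stage, "|", result.error)
```

The upload stage never runs, and `current_stage` still names the stage that failed, which is what a resume-from-failure feature would need.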
2. State Machine (task state)
Task state transitions must follow the state machine rules:
# core/state.py
from enum import Enum
from dataclasses import dataclass
class TaskState(Enum):
"""Task state enumeration"""
PENDING = "pending"
RUNNING = "running"
PAUSED = "paused"
SUCCESS = "success"
FAILED = "failed"
@dataclass
class TaskTransition:
"""State transition"""
from_state: TaskState
to_state: TaskState
is_valid: bool
error: str | None = None
class TaskStateMachine:
"""Task state machine"""
# Allowed state transitions
VALID_TRANSITIONS = {
TaskState.PENDING: [TaskState.RUNNING],
TaskState.RUNNING: [TaskState.PAUSED, TaskState.SUCCESS, TaskState.FAILED],
TaskState.PAUSED: [TaskState.RUNNING, TaskState.FAILED],
TaskState.SUCCESS: [], # Terminal state
TaskState.FAILED: [TaskState.PENDING], # Can be retried
}
def can_transition(self, from_state: TaskState, to_state: TaskState) -> bool:
return to_state in self.VALID_TRANSITIONS.get(from_state, [])
def transition(self, current: TaskState, target: TaskState) -> TaskTransition:
if not self.can_transition(current, target):
valid_targets = ", ".join(s.value for s in self.VALID_TRANSITIONS.get(current, []))
return TaskTransition(current, target, False,
f"Invalid transition: {current.value} -> {target.value}. Valid targets: {valid_targets}")
return TaskTransition(current, target, True)
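A short walk through the transition table shows which sequences are accepted. This sketch copies the table from above into a standalone module; the `walk` helper is hypothetical and exists only to trace a sequence of requested transitions.

```python
from enum import Enum


class TaskState(Enum):
    PENDING = "pending"
    RUNNING = "running"
    PAUSED = "paused"
    SUCCESS = "success"
    FAILED = "failed"


VALID_TRANSITIONS = {
    TaskState.PENDING: [TaskState.RUNNING],
    TaskState.RUNNING: [TaskState.PAUSED, TaskState.SUCCESS, TaskState.FAILED],
    TaskState.PAUSED: [TaskState.RUNNING, TaskState.FAILED],
    TaskState.SUCCESS: [],
    TaskState.FAILED: [TaskState.PENDING],
}


def walk(start: TaskState, targets: list) -> tuple:
    """Apply targets in order; skip invalid ones and record them."""
    current = start
    rejected = []
    for target in targets:
        if target in VALID_TRANSITIONS[current]:
            current = target
        else:
            rejected.append(f"{current.value} -> {target.value}")
    return current, rejected


# Pause and resume, then finish: every step is accepted
print(walk(TaskState.PENDING, [TaskState.RUNNING, TaskState.PAUSED,
                               TaskState.RUNNING, TaskState.SUCCESS]))
# Jumping straight from pending to success is rejected
print(walk(TaskState.PENDING, [TaskState.SUCCESS]))
```

Note that a failed task goes back to pending before it can run again; there is no direct failed-to-running edge.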
3. Repository pattern (data persistence)
All data access must go through the Repository interface:
# core/repository.py
from abc import ABC, abstractmethod
from typing import TypeVar, Generic
T = TypeVar('T')
class Repository(ABC, Generic[T]):
"""Repository interface"""
@abstractmethod
def save(self, entity: T) -> None:
"""Save an entity"""
pass
@abstractmethod
def load(self, id: str) -> T | None:
"""Load an entity"""
pass
# infrastructure/repositories/progress_repository.py
import json
import os

from core.models import Progress
from core.repository import Repository
from infrastructure.storage import AtomicWriter

class CrashSafeProgressRepository(Repository[Progress]):
    """Crash-safe progress repository (one progress file per task)"""

    def __init__(self, file_path: str):
        self.file_path = file_path

    def save(self, progress: Progress) -> None:
        AtomicWriter.write(self.file_path, progress.to_dict())

    def load(self, work_id: str) -> Progress | None:
        if not os.path.exists(self.file_path):
            return None
        with open(self.file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        # save() writes a single task's dict, so match on its work_id
        if data.get("work_id") != work_id:
            return None
        return Progress.from_dict(data)
4. Observer pattern (progress notification)
Progress notifications use PyQt6's signal/slot mechanism:
# core/events.py
from PyQt6.QtCore import QObject, pyqtSignal
from typing import Protocol

class ProgressObserver(Protocol):
    """Progress observer protocol"""
    def on_stage_start(self, work_id: str, stage: str) -> None:
        """A stage started"""
        ...
    def on_stage_progress(self, work_id: str, stage: str, percent: int) -> None:
        """A stage made progress"""
        ...
    def on_stage_complete(self, work_id: str, stage: str) -> None:
        """A stage completed"""
        ...
    def on_stage_error(self, work_id: str, stage: str, error: str) -> None:
        """A stage failed"""
        ...

class ProgressEmitter(QObject):
    """Progress event emitter"""
    # Signal definitions
    stage_started = pyqtSignal(str, str)         # (work_id, stage)
    stage_progress = pyqtSignal(str, str, int)   # (work_id, stage, percent)
    stage_completed = pyqtSignal(str, str)       # (work_id, stage)
    stage_failed = pyqtSignal(str, str, str)     # (work_id, stage, error)
    task_finished = pyqtSignal(str, str)         # (work_id, final_state)

# Usage example
class TranslationStage:
    def __init__(self, emitter: ProgressEmitter):
        self.emitter = emitter

    def execute(self, context: PipelineContext) -> PipelineContext:
        self.emitter.stage_started.emit(context.work_id, "translation")
        try:
            # _load_batches is a placeholder for however the stage obtains its batches
            batches = self._load_batches(context)
            for i, batch in enumerate(batches):
                # Translate one batch
                self._translate_batch(batch)
                progress = int((i + 1) / len(batches) * 100)
                self.emitter.stage_progress.emit(context.work_id, "translation", progress)
            self.emitter.stage_completed.emit(context.work_id, "translation")
            return context
        except Exception as e:
            self.emitter.stage_failed.emit(context.work_id, "translation", str(e))
            context.error = str(e)
            return context
Code naming conventions:
| Category | Convention | Examples |
|---|---|---|
| Classes | PascalCase | TranslationPipeline, TaskStateMachine |
| Functions | snake_case | execute_pipeline(), load_progress() |
| Variables | snake_case | work_id, batch_size |
| Constants | UPPER_SNAKE_CASE | MAX_BATCH_SIZE, DEFAULT_TIMEOUT |
| Private members | Leading underscore | _internal_state, _helper() |
| Protocols/interfaces | PascalCase, named for the role | ProgressObserver, Repository |
File naming conventions:
| Type | Naming | Examples |
|---|---|---|
| Module files | snake_case.py | translation_stage.py, progress_repository.py |
| Test files | test_<module>.py | test_pipeline.py, test_translation.py |
| Package directories | snake_case | translation/, cleaning/ |
| Level | Use | Example |
|---|---|---|
| DEBUG | Detailed debugging information | "Batch size: 16, GPU memory: 3.2GB" |
| INFO | Normal operation flow | "Stage 'translation' started for work_id: abc123" |
| WARNING | Recoverable problems | "GPU memory low, reducing batch size to 8" |
| ERROR | An operation failed but can be recovered | "API request failed, retrying (1/3)" |
| CRITICAL | Severe errors that need human intervention | "GPU OOM, cannot continue" |
Crash-safe write pattern (mandatory):
# Every persistence operation must use this pattern
import json
from infrastructure.storage import AtomicWriter

# Correct
def save_progress(progress: Progress) -> None:
    AtomicWriter.write("progress.json", progress.to_dict())

# Wrong: writing the file directly is forbidden
def save_progress_WRONG(progress: Progress) -> None:
    with open("progress.json", "w") as f:
        json.dump(progress.to_dict(), f)  # ❌ Not crash-safe
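Error handling pattern:
# Unified error handling flow
# (do_work, handle_gpu_oom, retry_with_backoff and the exception types are placeholders)
import logging

logger = logging.getLogger(__name__)

def execute_stage(context: PipelineContext) -> PipelineContext:
    try:
        # Business logic
        result = do_work(context)
        return context
    except GPUOutOfMemoryError as e:
        # Specific error handling
        return handle_gpu_oom(context, e)
    except APIError as e:
        # Retry logic
        return retry_with_backoff(context, e)
    except Exception as e:
        # Generic error handling
        context.error = str(e)
        logger.error(f"Stage failed: {e}", exc_info=True)
        return context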
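GPU resource management pattern:
# infrastructure/gpu/manager.py
import ctranslate2

class GPUManager:
    """GPU resource manager (singleton)"""
    _instance = None
    _translator = None

    @classmethod
    def get_instance(cls) -> 'GPUManager':
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    def initialize(self, model_path: str) -> None:
        """Initialize the translator, preferring the GPU"""
        if self._translator is None:
            device = self._detect_device()
            self._translator = ctranslate2.Translator(
                model_path,
                device=device,
                device_index=0,
                # float16 targets CUDA; the CPU fallback uses int8 instead
                compute_type="float16" if device == "cuda" else "int8",
                inter_threads=4
            )

    def _detect_device(self) -> str:
        """Detect the available device"""
        try:
            import torch
            if torch.cuda.is_available():
                return "cuda"
        except ImportError:
            pass
        return "cpu"  # Fall back to CPU

    def translate_batch(self, tokens: list[list[str]]) -> list[list[str]]:
        """Translate a batch of tokenized sentences"""
        if self._translator is None:
            raise RuntimeError("GPUManager.initialize() must be called first")
        results = self._translator.translate_batch(tokens)
        # Each result holds ranked hypotheses; keep the best one
        return [result.hypotheses[0] for result in results]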
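All AI Agents MUST:
- Use AtomicWriter for all persistence operations
- Validate state transitions through TaskStateMachine
- Access data through the Repository protocol
- Send progress events through ProgressEmitter
- Report errors using the ErrorInfo structure

xling-matrix-assistant/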
├── src/
│   └── xling_matrix/
│       ├── __init__.py
│       ├── __main__.py
│       │
│       ├── core/  # Core domain layer
│       │   ├── __init__.py
│       │   ├── models.py  # Data model definitions
│       │   ├── state.py  # State machine implementation
│       │   ├── pipeline.py  # Pipeline orchestration
│       │   ├── events.py  # Event system
│       │   └── repository.py  # Repository interface
│       │
│       ├── modules/  # Business module layer
│       │   ├── __init__.py
│       │   │
│       │   ├── fingerprint/  # Fingerprint module (FR1-FR8)
│       │   │   ├── __init__.py
│       │   │   ├── fingerprint_stage.py  # Fingerprint duplicate-check stage
│       │   │   ├── fingerprint_service.py  # Fingerprint service
│       │   │   └── models.py  # Fingerprint data models
│       │   │
│       │   ├── cleaning/  # Cleaning module (FR9-FR16)
│       │   │   ├── __init__.py
│       │   │   ├── cleaning_stage.py
│       │   │   ├── rule_engine.py  # Regex replacement engine
│       │   │   ├── formatter.py  # Format normalization
│       │   │   └── models.py
│       │   │
│       │   ├── terminology/  # Terminology module (FR17-FR24)
│       │   │   ├── __init__.py
│       │   │   ├── terminology_stage.py
│       │   │   ├── extractor.py  # Term extractor
│       │   │   ├── library.py  # Term library management
│       │   │   └── models.py
│       │   │
│       │   ├── translation/  # Translation module (FR25-FR33)
│       │   │   ├── __init__.py
│       │   │   ├── translation_stage.py
│       │   │   ├── translator.py  # CTranslate2 wrapper
│       │   │   ├── batch_processor.py  # Batching optimization
│       │   │   └── models.py
│       │   │
│       │   └── upload/  # Upload module (FR34-FR40)
│       │       ├── __init__.py
│       │       ├── upload_stage.py
│       │       ├── uploader.py  # Platform upload
│       │       └── models.py
│       │
│       ├── ui/  # Presentation layer
│       │   ├── __init__.py
│       │   ├── main_window.py  # Main window
│       │   ├── widgets/
│       │   │   ├── __init__.py
│       │   │   ├── task_list_widget.py  # Task list
│       │   │   ├── progress_widget.py  # Progress display
│       │   │   └── log_widget.py  # Log display
│       │   ├── dialogs/
│       │   │   ├── __init__.py
│       │   │   ├── new_task_dialog.py  # New task dialog
│       │   │   ├── settings_dialog.py  # Settings dialog
│       │   │   └── fingerprint_dialog.py  # Fingerprint review dialog
│       │   └── models/
│       │       ├── __init__.py
│       │       └── task_model.py  # Task data model
│       │
│       └── infrastructure/  # Infrastructure layer
│           ├── __init__.py
│           ├── storage/
│           │   ├── __init__.py
│           │   ├── atomic_writer.py  # Crash-safe writes
│           │   └── file_lock.py  # File locking
│           ├── gpu/
│           │   ├── __init__.py
│           │   └── manager.py  # GPU resource management
│           ├── network/
│           │   ├── __init__.py
│           │   ├── api_client.py  # Platform API client
│           │   └── retry.py  # Retry strategy
│           └── logging/
│               ├── __init__.py
│               └── logger.py  # Logging configuration
│
├── tests/
│   ├── __init__.py
│   ├── conftest.py  # pytest configuration
│   │
│   ├── unit/
│   │   ├── test_core/
│   │   │   ├── __init__.py
│   │   │   ├── test_pipeline.py
│   │   │   ├── test_state.py
│   │   │   └── test_events.py
│   │   ├── test_modules/
│   │   │   ├── test_fingerprint.py
│   │   │   ├── test_cleaning.py
│   │   │   ├── test_terminology.py
│   │   │   ├── test_translation.py
│   │   │   └── test_upload.py
│   │   └── test_infrastructure/
│   │       ├── test_storage.py
│   │       ├── test_gpu_manager.py
│   │       └── test_api_client.py
│   │
│   ├── integration/
│   │   ├── __init__.py
│   │   ├── test_workflow.py  # Full workflow test
│   │   └── test_api_integration.py
│   │
│   └── fixtures/
│       ├── novels/
│       │   └── sample_chinese.txt
│       └── expected/
│           └── sample_translated.json
│
├── models/  # Translation model files
│   └── m2m100_418m_ct2/
│
├── assets/  # Resource files
│   ├── icons/
│   │   └── app_icon.ico
│   └── config/
│       └── default_config.yaml
│
├── docs/
│   ├── architecture.md  # Architecture document
│   ├── api.md  # API documentation
│   └── user_guide.md  # User guide
│
├── pyproject.toml  # Project configuration
├── README.md
├── LICENSE
└── .gitignore
┌─────────────────────────────────────────────────────────────┐
│ UI Layer │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ MainWindow │ │ Widgets │ │ Dialogs │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
└─────────┼──────────────────┼──────────────────┼─────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────────────────────────────────────────────────┐
│ Application Layer │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Scheduler │ │ Workflows │ │ State Machine│ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
└─────────┼──────────────────┼──────────────────┼─────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────────────────────────────────────────────────┐
│ Domain Layer │
│ ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌─────────┐ │
│ │Fingerprint │ │ Cleaning │ │Terminology │ │Translation│ │
│ └────┬───────┘ └────┬───────┘ └────┬───────┘ └────┬────┘ │
│ └────────────────┴────────────────┴────────┘ │
│ │ │
│ ┌─────▼─────┐ │
│ │ Core │ │
│ │(Pipeline, │ │
│ │ Events, │ │
│ │ Models) │ │
│ └───────────┘ │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Infrastructure Layer │
│ ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌─────────┐ │
│ │ Storage │ │ GPU │ │ Network │ │ Logging │ │
│ │(Crash-Safe)│ │ Manager │ │ API Client│ │ │ │
│ └────────────┘ └────────────┘ └────────────┘ └─────────┘ │
└─────────────────────────────────────────────────────────────┘
Pipeline Stage interface:
# core/pipeline.py
class PipelineStage(Protocol):
    """Pipeline stage protocol; every stage must implement it"""

    def name(self) -> str:
        """Return the stage's unique identifier"""
        ...

    def execute(self, context: PipelineContext) -> PipelineContext:
        """Run the stage logic

        Args:
            context: the pipeline context
        Returns:
            The updated context. On failure, context.error is set
        """
        ...

    def estimate_progress(self, context: PipelineContext) -> int:
        """Estimate the current progress as a percentage"""
        ...
Repository interface:
# core/repository.py
class Repository(ABC, Generic[T]):
    """Data repository interface; all data access must implement it"""

    @abstractmethod
    def save(self, entity: T) -> None:
        """Save an entity (using crash-safe writes)"""
        pass

    @abstractmethod
    def load(self, id: str) -> T | None:
        """Load an entity"""
        pass

    @abstractmethod
    def delete(self, id: str) -> bool:
        """Delete an entity"""
        pass
Platform API client interface:
# infrastructure/network/api_client.py
class PlatformAPIClient:
    """Platform API client; talks to the external platform"""
    BASE_URL: str
    API_KEY: str
    TIMEOUT: int = 30

    # Fingerprint duplicate-check API
    def check_fingerprint(self, text: str) -> FingerprintResult:
        """Check a text's fingerprint

        Args:
            text: the text to check
        Returns:
            FingerprintResult: similarity, matching chapters, and so on
        Raises:
            APIConnectionError: the network connection failed
            APITimeoutError: the request timed out
            APIError: the API returned an error
        """
        ...

    # Chapter upload API
    def upload_chapter(self, chapter: ChapterData) -> UploadResult:
        """Upload a translated chapter

        Args:
            chapter: chapter data (title, content, word count, and so on)
        Returns:
            UploadResult: chapter ID, URL, and so on
        Raises:
            APIConnectionError, APITimeoutError, APIError
        """
        ...

    # CU deduction API
    def deduct_cu(self, word_count: int) -> DeductResult:
        """Deduct CU

        Args:
            word_count: number of words
        Returns:
            DeductResult: includes the remaining CU
        """
        ...

    # Health check
    def health_check(self) -> bool:
        """Check the API connection status"""
        ...
API request/response formats:
1. Fingerprint duplicate-check API
POST /api/fingerprint/check
Content-Type: application/json
Authorization: Bearer {api_key}
# Request
{
"fingerprint": "md5hash",
"sample": "第一章样本文本...",
"work_id": "uuid"
}
# Response
{
"exists": false,
"work_id": "uuid",
"similarity": 0.0,
"matches": []
}
2. Chapter upload API
POST /api/chapters
Content-Type: application/json
Authorization: Bearer {api_key}
# Request
{
"work_id": "uuid",
"chapter_id": "Chapter 0001",
"title": "第一章 开始",
"content_en": "Chapter 1 The Beginning...",
"word_count": 1234,
"source_language": "zh",
"target_language": "en"
}
# Response
{
"success": true,
"chapter_id": "Chapter 0001",
"chapter_url": "https://platform.com/novels/uuid/chapters/Chapter%200001",
"uploaded_at": "2026-03-13T12:00:00Z"
}
3. CU deduction API
POST /api/cu/deduct
Content-Type: application/json
Authorization: Bearer {api_key}
# Request
{
"work_id": "uuid",
"words": 1234,
"chapter_id": "Chapter 0001"
}
# Response
{
"success": true,
"deducted": 12.34,
"balance": 987.66,
"transaction_id": "txn_abc123"
}
Error response format:
# Error Response
{
"error": {
"code": "INVALID_API_KEY",
"message": "The API key is invalid or has expired",
"detail": "Contact customer support to obtain a new API key"
}
}
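A client needs to turn this error envelope into an exception. The sketch below shows one way to do that; it reuses the `APIError` name from this document, but the constructor fields, the `parse_error` helper, and the "UNKNOWN" fallback code are assumptions made for illustration.

```python
from typing import Optional


class APIError(Exception):
    """Error returned by the platform API."""

    def __init__(self, code: str, message: str, detail: str = ""):
        super().__init__(f"{code}: {message}")
        self.code = code
        self.message = message
        self.detail = detail


def parse_error(body: dict) -> Optional[APIError]:
    """Return an APIError if body carries an "error" object, else None."""
    error = body.get("error")
    if not isinstance(error, dict):
        return None
    return APIError(
        code=error.get("code", "UNKNOWN"),
        message=error.get("message", ""),
        detail=error.get("detail", ""),
    )


err = parse_error({
    "error": {
        "code": "INVALID_API_KEY",
        "message": "The API key is invalid or has expired",
        "detail": "Contact customer support to obtain a new API key",
    }
})
print(err)
```

Keeping `code` as a separate attribute lets callers branch on it, for example treating INVALID_API_KEY as non-retryable.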
# core/models.py
from dataclasses import dataclass, field
from datetime import datetime
from typing import Literal
from enum import Enum
class TaskState(Enum):
PENDING = "pending"
RUNNING = "running"
PAUSED = "paused"
SUCCESS = "success"
FAILED = "failed"
class StageStatus(Enum):
PENDING = "pending"
RUNNING = "running"
SUCCESS = "success"
FAILED = "failed"
SKIPPED = "skipped"
@dataclass
class StageProgress:
"""Stage progress"""
status: StageStatus
progress: int # 0-100
error: str | None = None
started_at: datetime | None = None
completed_at: datetime | None = None
@dataclass
class Progress:
"""Task progress"""
work_id: str
state: TaskState
current_stage: str
stages: dict[str, StageProgress] = field(default_factory=dict)
input_file: str = ""
output_dir: str = ""
created_at: datetime = field(default_factory=datetime.now)
updated_at: datetime = field(default_factory=datetime.now)
def to_dict(self) -> dict:
"""Serialize to a dict"""
return {
"version": "1.0",
"work_id": self.work_id,
"state": self.state.value,
"current_stage": self.current_stage,
"stages": {
name: {
"status": stage.status.value,
"progress": stage.progress,
"error": stage.error,
"started_at": stage.started_at.isoformat() if stage.started_at else None,
"completed_at": stage.completed_at.isoformat() if stage.completed_at else None,
}
for name, stage in self.stages.items()
},
"input_file": self.input_file,
"output_dir": self.output_dir,
"created_at": self.created_at.isoformat(),
"updated_at": self.updated_at.isoformat(),
}
@classmethod
def from_dict(cls, data: dict) -> 'Progress':
"""Deserialize from a dict"""
stages = {
name: StageProgress(
status=StageStatus(stage["status"]),
progress=stage["progress"],
error=stage.get("error"),
started_at=datetime.fromisoformat(stage["started_at"]) if stage.get("started_at") else None,
completed_at=datetime.fromisoformat(stage["completed_at"]) if stage.get("completed_at") else None,
)
for name, stage in data.get("stages", {}).items()
}
return cls(
work_id=data["work_id"],
state=TaskState(data["state"]),
current_stage=data["current_stage"],
stages=stages,
input_file=data.get("input_file", ""),
output_dir=data.get("output_dir", ""),
created_at=datetime.fromisoformat(data["created_at"]),
updated_at=datetime.fromisoformat(data["updated_at"]),
)
@dataclass
class Term:
"""Terminology entry"""
source: str # Source text
target: str # Translation
category: str = "" # Category
locked: bool = False # Whether the entry is locked
@dataclass
class TerminologyLibrary:
"""Terminology library"""
version: str = "1.0"
terms: list[Term] = field(default_factory=list)
@dataclass
class ChapterData:
"""Chapter data"""
title: str
content: str
word_count: int
source_language: str = "zh"
target_language: str = "en"
@dataclass
class Chapter:
"""Chapter entity"""
chapter_id: str # "Chapter 0001"
part_index: int # Volume index
title_src: str # Source-language title
content: str # Source-language content
content_en: str | None = None # Translated content
word_count: int = 0
translated_at: datetime | None = None
@dataclass
class Term:
"""Terminology entry"""
source: str # Source text
translation: str | None # Translation
count: int = 0 # Number of occurrences
chapters: int = 0 # Number of chapters it appears in
locked: bool = False # Whether the entry is locked
def to_dict(self) -> dict:
return {
"source": self.source,
"translation": self.translation,
"count": self.count,
"chapters": self.chapters,
"locked": self.locked
}
@classmethod
def from_dict(cls, data: dict) -> 'Term':
return cls(
source=data["source"],
translation=data.get("translation"),
count=data.get("count", 0),
chapters=data.get("chapters", 0),
locked=data.get("locked", False)
)
Fingerprint data models:
@dataclass
class FingerprintData:
"""Fingerprint duplicate-check data"""
work_id: str
fingerprint: str # MD5 hash
sample: str # Sample text (first 1,000 characters)
exists: bool = False
similarity: float = 0.0
matches: list[str] = field(default_factory=list) # List of matching work_ids
@dataclass
class FingerprintResult:
"""Fingerprint duplicate-check result"""
exists: bool
work_id: str
similarity: float
matches: list[dict] = field(default_factory=list)
# matches format: [{"work_id": "uuid", "similarity": 0.95, "chapter": "Chapter 0001"}]
Upload queue models:
@dataclass
class UploadQueueItem:
"""Upload queue item"""
work_id: str
chapter_id: str
title: str
content_en: str
word_count: int
retry_count: int = 0
max_retries: int = 3
created_at: datetime = field(default_factory=datetime.now)
@dataclass
class UploadFailedItem:
"""Failed upload item (JSONL format)"""
work_id: str
chapter_id: str
error_code: str
error_message: str
failed_at: datetime = field(default_factory=datetime.now)
retry_count: int = 0
1. Local data storage strategy:
2. API key management:
# infrastructure/security/secret_manager.py
import keyring
from typing import Optional

class SecretManager:
    """Secret manager backed by the system keyring"""
    SERVICE_NAME = "xling-matrix-assistant"

    def set_api_key(self, key: str) -> None:
        """Store the API key"""
        keyring.set_password(self.SERVICE_NAME, "platform_api", key)

    def get_api_key(self) -> Optional[str]:
        """Retrieve the API key"""
        return keyring.get_password(self.SERVICE_NAME, "platform_api")

    def delete_api_key(self) -> None:
        """Delete the API key"""
        keyring.delete_password(self.SERVICE_NAME, "platform_api")
3. File permission control:
# infrastructure/storage/permissions.py
import os
import stat

def set_secure_permissions(filepath: str) -> None:
    """Set secure file permissions (read/write for the owner only)"""
    os.chmod(filepath, stat.S_IRUSR | stat.S_IWUSR)
Dependency license verification:
# tools/license_checker.py
from importlib.metadata import distributions

ALLOWED_LICENSES = {"MIT", "Apache-2.0", "BSD-3-Clause", "PSF-2.0"}
BANNED_LICENSES = {"GPL", "AGPL", "LGPL", "SSPL", "CPAL"}

def check_dependency_licenses() -> dict:
    """Check the license of every installed distribution"""
    issues = []
    # `pip show` has no JSON output, so read package metadata directly.
    # Some packages declare their license only in classifiers; those show up as UNKNOWN here.
    for dist in distributions():
        name = dist.metadata.get("Name", "UNKNOWN")
        license_ = dist.metadata.get("License") or "UNKNOWN"
        if any(banned in license_ for banned in BANNED_LICENSES):
            issues.append({
                "package": name,
                "license": license_,
                "severity": "BLOCKING",
                "reason": "Contains GPL contamination"
            })
        elif license_ not in ALLOWED_LICENSES and license_ != "UNKNOWN":
            issues.append({
                "package": name,
                "license": license_,
                "severity": "WARNING",
                "reason": "License not in whitelist"
            })
    return {"valid": len(issues) == 0, "issues": issues}
4. License management (Growth phase):
# infrastructure/security/license_manager.py
import hashlib
import platform
import uuid

import requests

class LicenseManager:
    """License manager: hardware binding and online activation checks"""

    def generate_fingerprint(self) -> str:
        """Generate a hardware fingerprint (used to bind the software to a machine)"""
        # Collect hardware information
        machine_id = platform.node()
        cpu_info = platform.processor()
        mac_address = self._get_mac_address()
        # Combine it into a fingerprint
        fingerprint_data = f"{machine_id}:{cpu_info}:{mac_address}"
        return hashlib.md5(fingerprint_data.encode()).hexdigest()

    def _get_mac_address(self) -> str:
        """Return this machine's MAC address as six colon-separated hex bytes"""
        node = uuid.getnode()  # 48-bit integer
        # Walk the six bytes from most to least significant
        return ':'.join(f'{(node >> shift) & 0xff:02x}' for shift in range(40, -8, -8))

    def verify_activation(self, activation_key: str) -> bool:
        """Verify an activation key online

        Args:
            activation_key: the activation key entered by the user
        Returns:
            bool: whether the activation is valid
        """
        try:
            fingerprint = self.generate_fingerprint()
            response = requests.post(
                "https://license.xling-matrix.com/verify",
                json={
                    "activation_key": activation_key,
                    "fingerprint": fingerprint,
                    "version": "0.1.0"
                },
                timeout=10
            )
            response.raise_for_status()
            return response.json().get("valid", False)
        except Exception:
            return False

    def check_expiration(self, activation_key: str) -> dict | None:
        """Check whether the activation has expired"""
        try:
            response = requests.post(
                "https://license.xling-matrix.com/check",
                json={"activation_key": activation_key},
                timeout=10
            )
            response.raise_for_status()
            data = response.json()
            return {
                "expired": data.get("expired", False),
                "expires_at": data.get("expires_at"),
                "days_remaining": data.get("days_remaining")
            }
        except Exception:
            return None

    def activate_offline(self, activation_key: str, max_credits: int = 1000) -> bool:
        """Offline activation (local signature verification)"""
        # TODO: implement offline activation (requires a server-generated signing key pair)
        return True
Activation state storage:
# infrastructure/storage/license_storage.py
import json
from datetime import datetime
from pathlib import Path

from infrastructure.security.license_manager import LicenseManager
from infrastructure.storage import AtomicWriter

class LicenseStorage:
    """Activation state storage"""
    ACTIVATION_FILE = Path.home() / ".config" / "xling-matrix" / "activation.json"

    def save_activation(self, activation_key: str, fingerprint: str) -> None:
        """Save the activation record"""
        data = {
            "activation_key": activation_key,
            "fingerprint": fingerprint,
            "activated_at": datetime.now().isoformat(),
            "version": "0.1.0"
        }
        self.ACTIVATION_FILE.parent.mkdir(parents=True, exist_ok=True)
        AtomicWriter.write(str(self.ACTIVATION_FILE), data)

    def load_activation(self) -> dict | None:
        """Load the activation record"""
        if not self.ACTIVATION_FILE.exists():
            return None
        with open(self.ACTIVATION_FILE, 'r', encoding='utf-8') as f:
            return json.load(f)

    def is_activated(self) -> bool:
        """Check whether the software is activated"""
        activation = self.load_activation()
        if not activation:
            return False
        # Check that the hardware fingerprint still matches
        current_fingerprint = LicenseManager().generate_fingerprint()
        return activation.get("fingerprint") == current_fingerprint
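License verification flow:
Start → Check local activation → [no activation] Show activation dialog
            ↓
    [activated] Check hardware fingerprint → [mismatch] Re-activate
            ↓
    [match] Check expiration → [expired] Prompt to renew
            ↓
    [valid] Verify CU balance → [insufficient] Prompt to top up
            ↓
    [valid] Allow use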
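The flow above is a chain of checks that stops at the first failure. A minimal sketch of that ordering follows; the `LicenseState` enum, the `resolve_license_state` function, and the rule that a CU balance of zero or less counts as insufficient are assumptions made for illustration, not part of the modules defined in this document.

```python
from enum import Enum
from typing import Optional


class LicenseState(Enum):
    """Outcome of the startup license check (hypothetical names)."""
    NOT_ACTIVATED = "show activation dialog"
    FINGERPRINT_MISMATCH = "re-activate"
    EXPIRED = "prompt to renew"
    INSUFFICIENT_CREDITS = "prompt to top up"
    VALID = "allow use"


def resolve_license_state(
    activation: Optional[dict],
    current_fingerprint: str,
    expired: bool,
    cu_balance: int,
) -> LicenseState:
    """Run the checks in the documented order and stop at the first failure."""
    if not activation:
        return LicenseState.NOT_ACTIVATED
    if activation.get("fingerprint") != current_fingerprint:
        return LicenseState.FINGERPRINT_MISMATCH
    if expired:
        return LicenseState.EXPIRED
    # Assumption: any balance of zero or less counts as insufficient.
    if cu_balance <= 0:
        return LicenseState.INSUFFICIENT_CREDITS
    return LicenseState.VALID
```

Returning a single state keeps the UI layer simple: it can map each state to one dialog without repeating the ordering logic.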
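1. Batching strategy:
# modules/translation/batch_processor.py
class BatchProcessor:
    """Batching optimizer."""

    def __init__(self, max_tokens: int = 4096):
        self.max_tokens = max_tokens

    def create_batches(self, texts: list[str]) -> list[list[str]]:
        """Split texts into token-bounded batches.

        Strategy:
        1. Group texts by estimated token count.
        2. Fill each batch as close to max_tokens as possible without exceeding it.
        3. Keep adjacent texts in the same batch where possible (preserves context).

        A single text longer than max_tokens still gets a batch of its own,
        so that one batch can exceed the limit.
        """
        batches = []
        current_batch = []
        current_tokens = 0
        for text in texts:
            tokens = self._count_tokens(text)
            if current_tokens + tokens > self.max_tokens and current_batch:
                # Adding this text would overflow the batch: close it and start a new one.
                batches.append(current_batch)
                current_batch = [text]
                current_tokens = tokens
            else:
                current_batch.append(text)
                current_tokens += tokens
        if current_batch:
            batches.append(current_batch)
        return batches

    def _count_tokens(self, text: str) -> int:
        """Estimate the token count (roughly 1.5 characters per token for Chinese)."""
        # Count at least 1 so very short texts still consume batch budget.
        return max(1, int(len(text) / 1.5))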
2. Dynamic batch size adjustment:
# infrastructure/gpu/batch_optimizer.py
import torch


class BatchSizeOptimizer:
    """Dynamic batch size optimizer."""

    def __init__(self, initial_size: int = 16):
        self.current_size = initial_size
        self.min_size = 4
        self.max_size = 32

    def adjust_for_memory(self, oom_occurred: bool) -> int:
        """Adjust the batch size based on whether the last batch ran out of GPU memory."""
        if oom_occurred:
            self.current_size = max(self.min_size, self.current_size // 2)
        else:
            # Grow gradually to find the best size. Always add at least 1:
            # int(4 * 1.2) == 4, so pure scaling would stall at small sizes.
            grown = max(self.current_size + 1, int(self.current_size * 1.2))
            self.current_size = min(self.max_size, grown)
        return self.current_size

    def get_memory_info(self) -> dict:
        """Return GPU memory information."""
        if not torch.cuda.is_available():
            return {"available": False}
        # mem_get_info reports device-wide free/total bytes, so memory held by
        # CTranslate2 (which does not use PyTorch's allocator) is counted too.
        free_bytes, total_bytes = torch.cuda.mem_get_info(0)
        return {
            "available": True,
            "total_gb": total_bytes / 1e9,
            "allocated_gb": torch.cuda.memory_allocated(0) / 1e9,  # PyTorch allocations only
            "free_gb": free_bytes / 1e9,
        }
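To show how an optimizer like this might be driven, here is a minimal sketch of a retry loop that halves the slice size after an out-of-memory error and retries the same position. `OutOfMemory` and `translate_with_backoff` are hypothetical names, and the `translate_batch` callable stands in for the real inference call; the exception type actually raised by the GPU backend would need to be checked and mapped accordingly.

```python
from typing import Callable


class OutOfMemory(RuntimeError):
    """Stand-in for the backend's out-of-memory error (hypothetical)."""


def translate_with_backoff(
    texts: list[str],
    translate_batch: Callable[[list[str]], list[str]],
    initial_size: int = 16,
    min_size: int = 4,
) -> list[str]:
    """Translate texts in slices, halving the slice size after an OOM."""
    size = initial_size
    results: list[str] = []
    i = 0
    while i < len(texts):
        chunk = texts[i:i + size]
        try:
            results.extend(translate_batch(chunk))
        except OutOfMemory:
            if size <= min_size:
                raise  # already at the floor; give up rather than loop forever
            size = max(min_size, size // 2)
            continue  # retry the same position with a smaller slice
        i += len(chunk)
    return results
```

Because a failed slice is retried rather than skipped, the output order always matches the input order, which matters when paragraphs are written back to a chapter.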
1. Incremental progress saving:
# infrastructure/storage/incremental.py
class IncrementalProgressSaver:
    """Incremental progress saver that reduces disk writes."""

    def __init__(self, threshold: int = 5):
        self.threshold = threshold  # save only when progress has changed by at least 5 percentage points
        self.last_saved = 0

    def should_save(self, current_progress: int) -> bool:
        return abs(current_progress - self.last_saved) >= self.threshold

    def mark_saved(self, progress: int) -> None:
        self.last_saved = progress
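One consequence of a threshold rule is that the final update can be skipped when it lands within the threshold of the last save. The sketch below traces which progress values would trigger a save, under the assumption that the caller also forces a save on the last update; `plan_saves` is a hypothetical helper written for this illustration only.

```python
def plan_saves(progress_values: list[int], threshold: int = 5) -> list[int]:
    """Return the progress values at which a save would be triggered."""
    saved: list[int] = []
    last_saved = 0
    for i, progress in enumerate(progress_values):
        is_last = i == len(progress_values) - 1
        crossed = abs(progress - last_saved) >= threshold
        # Assumption: the final value is always persisted, even below the threshold.
        if crossed or (is_last and progress != last_saved):
            saved.append(progress)
            last_saved = progress
    return saved


print(plan_saves([1, 3, 5, 6, 9, 12, 98, 100]))  # [5, 12, 98, 100]
```

Without the forced final save, the last entry (100) would be dropped because it is only 2 points past 98.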
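2. File reading optimization:
# infrastructure/storage/chunked_reader.py
class ChunkedFileReader:
    """Chunked file reader with support for large files."""

    def __init__(self, chunk_size: int = 8192):
        self.chunk_size = chunk_size  # read buffer size in bytes

    def read_by_paragraphs(self, filepath: str) -> list[str]:
        """Read a file paragraph by paragraph (a better fit for novels)."""
        paragraphs: list[str] = []
        current: list[str] = []
        # Stream line by line through a chunk_size buffer instead of loading the whole file.
        with open(filepath, 'r', encoding='utf-8', buffering=self.chunk_size) as f:
            for line in f:
                if line.strip():
                    current.append(line.rstrip('\n'))
                elif current:
                    # A blank line ends the current paragraph.
                    paragraphs.append('\n'.join(current).strip())
                    current = []
        if current:
            paragraphs.append('\n'.join(current).strip())
        return paragraphs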
1. PyInstaller configuration:
# build/pyinstaller_spec.py
# PyInstaller injects Analysis, PYZ and EXE when it executes a spec file.
# It only treats files with a .spec extension as spec files, so save this file with that extension.
# Bytecode encryption (block_cipher / cipher) was removed in PyInstaller 6.0; drop those arguments there.
block_cipher = None

datas = [
    ('models', 'models'),
    ('assets', 'assets'),
]

hiddenimports = [
    'PyQt6.sip',
    'ctranslate2',
    'torch',
]

a = Analysis(
    ['src/xling_matrix/__main__.py'],
    pathex=[],
    binaries=[],
    datas=datas,
    hiddenimports=hiddenimports,
    hookspath=[],
    hooksconfig={},
    runtime_hooks=[],
    excludes=[],
    win_no_prefer_redirects=False,
    win_private_assemblies=False,
    cipher=block_cipher,
    noarchive=False,
)

pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher)

exe = EXE(
    pyz,
    a.scripts,
    a.binaries,
    a.zipfiles,
    a.datas,
    [],
    name='序灵Matrix助手',
    debug=False,
    bootloader_ignore_signals=False,
    strip=False,
    upx=True,
    upx_exclude=[],
    runtime_tmpdir=None,
    console=False,  # GUI application: no console window (set to True while debugging)
    disable_windowed_traceback=False,
    argv_emulation=False,
    target_arch=None,
    codesign_identity=None,
    entitlements_file=None,
    icon='assets/icons/app_icon.ico',
)
2. Installer configuration:
Installation layout:
序灵Matrix助手/
├── 序灵Matrix助手.exe          # Main executable
├── models/                     # Translation models (downloaded on first run)
│   └── m2m100_418m_ct2/
├── configs/
│   └── default_config.yaml
└── README.txt
User data directory:
~/Documents/xling-matrix/ # Windows
~/Documents/xling-matrix/ # macOS
~/xling-matrix/ # Linux
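The three locations above can be resolved with one small helper. This is a minimal sketch; the function name `user_data_dir` and its parameters are assumptions made for illustration, and the `platform` and `home` arguments exist only so the logic can be exercised without depending on the host machine.

```python
import sys
from pathlib import Path
from typing import Optional


def user_data_dir(platform: str = sys.platform, home: Optional[Path] = None) -> Path:
    """Return the per-user data directory for the given platform."""
    base = home if home is not None else Path.home()
    # Windows ("win32") and macOS ("darwin") keep user data under Documents.
    if platform.startswith("win") or platform == "darwin":
        return base / "Documents" / "xling-matrix"
    # Linux and other platforms use a directory directly under the home folder.
    return base / "xling-matrix"
```

Calling `user_data_dir()` with no arguments uses the current platform and home directory.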
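1. Version management:
# core/version.py
__version__ = "0.1.0"
__build__ = "20260313"


def get_version() -> str:
    """Return the version with the build date as a local version label, e.g. 0.1.0+20260313."""
    return f"{__version__}+{__build__}"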
2. Update check (Growth phase):
# infrastructure/update/update_checker.py
import requests
from packaging import version


class UpdateChecker:
    """Checks for application updates."""

    UPDATE_URL = "https://updates.xling-matrix.com/version.json"

    def check_for_updates(self, current_version: str) -> dict | None:
        """Return update details if a newer version exists, otherwise None."""
        try:
            response = requests.get(self.UPDATE_URL, timeout=5)
            response.raise_for_status()
            data = response.json()
            if self._is_newer(current_version, data["latest_version"]):
                return {
                    "has_update": True,
                    "latest_version": data["latest_version"],
                    "download_url": data["download_url"],
                    "release_notes": data.get("release_notes", ""),
                }
        except (requests.RequestException, ValueError, KeyError):
            # Update checks are best-effort: network errors and malformed responses are ignored.
            pass
        return None

    def _is_newer(self, current: str, latest: str) -> bool:
        """Compare version strings."""
        return version.parse(latest) > version.parse(current)
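The `packaging` library is not among the dependencies installed in the initialization commands earlier in this document. If adding it is undesirable, a standard-library comparison could cover plain `X.Y.Z` versions. The sketch below is a hypothetical fallback, not a replacement for `packaging`: it ignores the `+build` suffix and does not understand pre-release tags such as `rc1`.

```python
def parse_version(text: str) -> tuple[int, ...]:
    """Parse 'X.Y.Z' (optionally followed by '+build') into a tuple of ints."""
    release = text.split("+", 1)[0]
    return tuple(int(part) for part in release.split("."))


def is_newer(current: str, latest: str) -> bool:
    """Return True when latest is a strictly higher version than current."""
    a, b = parse_version(current), parse_version(latest)
    # Pad with zeros so that "0.1" and "0.1.0" compare as equal.
    width = max(len(a), len(b))
    a += (0,) * (width - len(a))
    b += (0,) * (width - len(b))
    return b > a


print(is_newer("0.1.0+20260313", "0.2.0"))  # True
print(is_newer("0.10.0", "0.9.1"))          # False
```

Comparing integer tuples avoids the classic pitfall of string comparison, where "0.9.1" sorts after "0.10.0".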
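This architecture document defines the complete technical architecture of 序灵 Matrix 助手, including:
This architecture ensures that all AI agents can work together and write consistent, compatible code.
Document version: 1.0 Last updated: 2026-03-13 Status: Complete ✅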