|
@@ -0,0 +1,349 @@
|
|
|
|
|
+"""
|
|
|
|
|
+Unit tests for the failure list manager module.
|
|
|
|
|
+
|
|
|
|
|
+Tests cover failure list recording, export, and retry functionality.
|
|
|
|
|
+"""
|
|
|
|
|
+
|
|
|
|
|
+import sys
|
|
|
|
|
+from unittest.mock import Mock
|
|
|
|
|
+from pathlib import Path
|
|
|
|
|
+import tempfile
|
|
|
|
|
+import json
|
|
|
|
|
+
|
|
|
|
|
+# Mock torch and transformers before importing
|
|
|
|
|
# Stub out the heavy ML dependencies so importing the module under test
# does not require torch/transformers to be installed.
sys_mock = Mock()
for _stubbed_module in ("torch", "transformers"):
    sys.modules[_stubbed_module] = sys_mock
|
|
|
|
|
+
|
|
|
|
|
+import pytest
|
|
|
|
|
+
|
|
|
|
|
+from src.repository.failure_list import (
|
|
|
|
|
+ FailureListManager,
|
|
|
|
|
+ FailedTranslation,
|
|
|
|
|
+)
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
class TestFailedTranslation:
    """Unit tests for the FailedTranslation dataclass."""

    def test_create_failed_translation(self):
        """A new record stores its fields and starts unresolved with zero retries."""
        record = FailedTranslation(
            work_id="test_work",
            chapter_index=5,
            error_type="ValueError",
            error_message="Test error",
        )

        expected = {
            "work_id": "test_work",
            "chapter_index": 5,
            "error_type": "ValueError",
            "error_message": "Test error",
        }
        for attr, value in expected.items():
            assert getattr(record, attr) == value
        # Defaults for fields not passed to the constructor.
        assert record.resolved is False
        assert record.retry_count == 0

    def test_to_dict(self):
        """to_dict() exposes the record fields and includes a timestamp."""
        record = FailedTranslation(
            work_id="test_work",
            chapter_index=5,
            error_type="ValueError",
            error_message="Test error",
            source_text="Test content",
        )

        payload = record.to_dict()

        assert payload["work_id"] == "test_work"
        assert payload["chapter_index"] == 5
        assert payload["error_type"] == "ValueError"
        assert payload["source_text"] == "Test content"
        assert "timestamp" in payload

    def test_from_dict(self):
        """from_dict() reconstructs a record from its serialized form."""
        serialized = {
            "work_id": "test_work",
            "chapter_index": 5,
            "error_type": "ValueError",
            "error_message": "Test error",
            "timestamp": "2024-01-01T00:00:00",
            "retry_count": 2,
            "source_text": "Test content",
            "resolved": False,
        }

        record = FailedTranslation.from_dict(serialized)

        assert record.work_id == "test_work"
        assert record.chapter_index == 5
        assert record.retry_count == 2
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
class TestFailureListManager:
    """Unit tests for the FailureListManager class."""

    @pytest.fixture
    def temp_dir(self):
        """Yield a throwaway directory that is removed after the test."""
        with tempfile.TemporaryDirectory() as tmp:
            yield Path(tmp)

    @pytest.fixture
    def manager(self, temp_dir):
        """Provide a manager backed by the temporary directory."""
        return FailureListManager(temp_dir)

    def test_init(self, temp_dir):
        """The manager derives its storage paths from the given directory."""
        mgr = FailureListManager(temp_dir)

        assert mgr.storage_dir == temp_dir
        assert mgr.failure_list_path == temp_dir / "translate_failed.jsonl"

    def test_record_failure(self, manager):
        """Recording a failure returns the record and persists it to disk."""
        recorded = manager.record_failure(
            work_id="test_work",
            chapter_index=5,
            error=ValueError("Test error"),
            source_text="Test content"
        )

        assert recorded.work_id == "test_work"
        assert recorded.chapter_index == 5
        assert recorded.error_type == "ValueError"
        assert recorded.source_text == "Test content"
        # Persisting should have created the backing JSONL file.
        assert manager.failure_list_path.exists()

    def test_load_failures(self, manager):
        """load_failures() yields every previously recorded failure."""
        manager.record_failure("work1", 1, ValueError("Error 1"))
        manager.record_failure("work2", 2, RuntimeError("Error 2"))

        loaded = list(manager.load_failures())

        assert len(loaded) == 2
        seen_ids = {entry.work_id for entry in loaded}
        assert "work1" in seen_ids
        assert "work2" in seen_ids

    def test_load_failures_exclude_resolved(self, manager):
        """Resolved entries are filtered out when include_resolved=False."""
        manager.record_failure("work1", 1, ValueError("Error 1"))
        manager.record_failure("work2", 2, RuntimeError("Error 2"))
        manager.mark_resolved("work1", 1)

        remaining = list(manager.load_failures(include_resolved=False))

        assert len(remaining) == 1
        assert remaining[0].work_id == "work2"

    def test_get_failures(self, manager):
        """get_failures() returns the recorded failures as a list."""
        manager.record_failure("test_work", 1, ValueError("Test error"))

        result = manager.get_failures()

        assert len(result) == 1
        assert result[0].work_id == "test_work"

    def test_get_failures_for_work(self, manager):
        """get_failures_for_work() filters entries by work id."""
        exc = ValueError("Test error")
        manager.record_failure("work1", 1, exc)
        manager.record_failure("work2", 2, exc)

        filtered = manager.get_failures_for_work("work1")

        assert len(filtered) == 1
        assert filtered[0].work_id == "work1"

    def test_mark_resolved(self, manager):
        """mark_resolved() flips the resolved flag on the matching entry."""
        manager.record_failure("test_work", 1, ValueError("Test error"))

        manager.mark_resolved("test_work", 1)

        entries = manager.get_failures(include_resolved=True)
        assert len(entries) == 1
        assert entries[0].resolved is True

    def test_export_failure_list_jsonl(self, manager, temp_dir):
        """Exporting as JSONL writes one line per failure."""
        manager.record_failure("test_work", 1, ValueError("Test error"))

        target = temp_dir / "export.jsonl"
        exported = manager.export_failure_list(target, format="jsonl")

        assert exported == target
        assert exported.exists()
        with open(exported, "r") as handle:
            assert len(handle.readlines()) == 1

    def test_export_failure_list_json(self, manager, temp_dir):
        """Exporting as JSON writes a single list of records."""
        manager.record_failure("test_work", 1, ValueError("Test error"))

        target = temp_dir / "export.json"
        exported = manager.export_failure_list(target, format="json")

        assert exported == target
        assert exported.exists()
        with open(exported, "r") as handle:
            payload = json.load(handle)
            assert isinstance(payload, list)
            assert len(payload) == 1

    def test_export_failure_list_csv(self, manager, temp_dir):
        """Exporting as CSV creates the requested file."""
        manager.record_failure("test_work", 1, ValueError("Test error"))

        target = temp_dir / "export.csv"
        exported = manager.export_failure_list(target, format="csv")

        assert exported == target
        assert exported.exists()

    def test_export_failure_list_unsupported_format(self, manager, temp_dir):
        """Requesting an unknown export format raises ValueError."""
        manager.record_failure("test_work", 1, ValueError("Test error"))

        with pytest.raises(ValueError, match="Unsupported format"):
            manager.export_failure_list(format="xml")

    def test_get_failure_summary(self, manager):
        """The summary aggregates counts by status, error type, and work."""
        manager.record_failure("work1", 1, ValueError("Error 1"))
        manager.record_failure("work1", 2, RuntimeError("Error 2"))
        manager.record_failure("work2", 1, ValueError("Error 3"))

        summary = manager.get_failure_summary()

        assert summary["total_failures"] == 3
        assert summary["resolved_count"] == 0
        assert summary["unresolved_count"] == 3
        assert summary["by_error_type"]["ValueError"] == 2
        assert summary["by_error_type"]["RuntimeError"] == 1
        assert summary["by_work"]["work1"] == 2
        assert summary["by_work"]["work2"] == 1

    def test_get_retry_list(self, manager):
        """All unresolved failures appear in the retry list."""
        exc = ValueError("Test error")
        manager.record_failure("work1", 1, exc)
        manager.record_failure("work2", 2, exc)

        assert len(manager.get_retry_list()) == 2

    def test_get_retry_list_for_work(self, manager):
        """The retry list can be narrowed to a single work."""
        exc = ValueError("Test error")
        manager.record_failure("work1", 1, exc)
        manager.record_failure("work2", 2, exc)

        narrowed = manager.get_retry_list(work_id="work1")

        assert len(narrowed) == 1
        assert narrowed[0].work_id == "work1"

    def test_increment_retry_count(self, manager):
        """increment_retry_count() bumps the counter on the matching entry."""
        manager.record_failure("test_work", 1, ValueError("Test error"))

        manager.increment_retry_count("test_work", 1)

        assert manager.get_failures()[0].retry_count == 1

    def test_clear_resolved(self, manager):
        """clear_resolved() drops resolved entries and reports how many."""
        manager.record_failure("work1", 1, ValueError("Error 1"))
        manager.record_failure("work2", 2, RuntimeError("Error 2"))
        manager.mark_resolved("work1", 1)

        removed_count = manager.clear_resolved()

        assert removed_count == 1
        # Only the unresolved entry should survive.
        survivors = manager.get_failures(include_resolved=False)
        assert len(survivors) == 1
        assert survivors[0].work_id == "work2"

    def test_clear_all(self, manager):
        """clear_all() deletes the backing failure-list file."""
        manager.record_failure("test_work", 1, ValueError("Test error"))
        assert manager.failure_list_path.exists()

        manager.clear_all()

        assert not manager.failure_list_path.exists()

    def test_empty_failure_list(self, manager):
        """Queries against an empty list return empty results, not errors."""
        assert not manager.get_failures()

        assert manager.get_failure_summary()["total_failures"] == 0

        assert not manager.get_retry_list()

    def test_corrupted_line_handling(self, manager, temp_dir):
        """Unparseable lines in the JSONL file are skipped on load."""
        manager.record_failure("test_work", 1, ValueError("Test error"))

        # Corrupt the file by appending a non-JSON line.
        with open(manager.failure_list_path, "a") as handle:
            handle.write("this is not valid json\n")

        survivors = manager.get_failures()
        assert len(survivors) == 1
        assert survivors[0].work_id == "test_work"
|