""" Unit tests for the failure list manager module. Tests cover failure list recording, export, and retry functionality. """ import sys from unittest.mock import Mock from pathlib import Path import tempfile import json # Mock torch and transformers before importing sys_mock = Mock() sys.modules["torch"] = sys_mock sys.modules["transformers"] = sys_mock import pytest from src.repository.failure_list import ( FailureListManager, FailedTranslation, ) class TestFailedTranslation: """Test cases for FailedTranslation dataclass.""" def test_create_failed_translation(self): """Test creating a failed translation record.""" failure = FailedTranslation( work_id="test_work", chapter_index=5, error_type="ValueError", error_message="Test error", ) assert failure.work_id == "test_work" assert failure.chapter_index == 5 assert failure.error_type == "ValueError" assert failure.error_message == "Test error" assert failure.resolved is False assert failure.retry_count == 0 def test_to_dict(self): """Test converting failed translation to dictionary.""" failure = FailedTranslation( work_id="test_work", chapter_index=5, error_type="ValueError", error_message="Test error", source_text="Test content", ) data = failure.to_dict() assert data["work_id"] == "test_work" assert data["chapter_index"] == 5 assert data["error_type"] == "ValueError" assert data["source_text"] == "Test content" assert "timestamp" in data def test_from_dict(self): """Test creating failed translation from dictionary.""" data = { "work_id": "test_work", "chapter_index": 5, "error_type": "ValueError", "error_message": "Test error", "timestamp": "2024-01-01T00:00:00", "retry_count": 2, "source_text": "Test content", "resolved": False, } failure = FailedTranslation.from_dict(data) assert failure.work_id == "test_work" assert failure.chapter_index == 5 assert failure.retry_count == 2 class TestFailureListManager: """Test cases for FailureListManager class.""" @pytest.fixture def temp_dir(self): """Create a temporary directory for testing.""" with tempfile.TemporaryDirectory() as tmp: yield Path(tmp) @pytest.fixture def manager(self, temp_dir): """Create a failure list manager for testing.""" return FailureListManager(temp_dir) def test_init(self, temp_dir): """Test FailureListManager initialization.""" manager = FailureListManager(temp_dir) assert manager.storage_dir == temp_dir assert manager.failure_list_path == temp_dir / "translate_failed.jsonl" def test_record_failure(self, manager): """Test recording a failure.""" error = ValueError("Test error") failure = manager.record_failure( work_id="test_work", chapter_index=5, error=error, source_text="Test content" ) assert failure.work_id == "test_work" assert failure.chapter_index == 5 assert failure.error_type == "ValueError" assert failure.source_text == "Test content" # Check file was created assert manager.failure_list_path.exists() def test_load_failures(self, manager): """Test loading failures from file.""" # Record some failures error1 = ValueError("Error 1") error2 = RuntimeError("Error 2") manager.record_failure("work1", 1, error1) manager.record_failure("work2", 2, error2) # Load them back failures = list(manager.load_failures()) assert len(failures) == 2 assert any(f.work_id == "work1" for f in failures) assert any(f.work_id == "work2" for f in failures) def test_load_failures_exclude_resolved(self, manager): """Test loading failures excluding resolved ones.""" # Record failures error1 = ValueError("Error 1") error2 = RuntimeError("Error 2") manager.record_failure("work1", 1, error1) manager.record_failure("work2", 2, error2) # Mark one as resolved manager.mark_resolved("work1", 1) # Load excluding resolved failures = list(manager.load_failures(include_resolved=False)) assert len(failures) == 1 assert failures[0].work_id == "work2" def test_get_failures(self, manager): """Test getting failures as a list.""" error = ValueError("Test error") manager.record_failure("test_work", 1, error) failures = manager.get_failures() assert len(failures) == 1 assert failures[0].work_id == "test_work" def test_get_failures_for_work(self, manager): """Test getting failures for a specific work.""" error = ValueError("Test error") manager.record_failure("work1", 1, error) manager.record_failure("work2", 2, error) work1_failures = manager.get_failures_for_work("work1") assert len(work1_failures) == 1 assert work1_failures[0].work_id == "work1" def test_mark_resolved(self, manager): """Test marking a failure as resolved.""" error = ValueError("Test error") manager.record_failure("test_work", 1, error) manager.mark_resolved("test_work", 1) # Load failures failures = manager.get_failures(include_resolved=True) assert len(failures) == 1 assert failures[0].resolved is True def test_export_failure_list_jsonl(self, manager, temp_dir): """Test exporting failure list in JSONL format.""" error = ValueError("Test error") manager.record_failure("test_work", 1, error) output_path = temp_dir / "export.jsonl" result = manager.export_failure_list(output_path, format="jsonl") assert result == output_path assert result.exists() # Verify content with open(result, "r") as f: lines = f.readlines() assert len(lines) == 1 def test_export_failure_list_json(self, manager, temp_dir): """Test exporting failure list in JSON format.""" error = ValueError("Test error") manager.record_failure("test_work", 1, error) output_path = temp_dir / "export.json" result = manager.export_failure_list(output_path, format="json") assert result == output_path assert result.exists() # Verify content with open(result, "r") as f: data = json.load(f) assert isinstance(data, list) assert len(data) == 1 def test_export_failure_list_csv(self, manager, temp_dir): """Test exporting failure list in CSV format.""" error = ValueError("Test error") manager.record_failure("test_work", 1, error) output_path = temp_dir / "export.csv" result = manager.export_failure_list(output_path, format="csv") assert result == output_path assert result.exists() def test_export_failure_list_unsupported_format(self, manager, temp_dir): """Test exporting with unsupported format.""" error = ValueError("Test error") manager.record_failure("test_work", 1, error) with pytest.raises(ValueError, match="Unsupported format"): manager.export_failure_list(format="xml") def test_get_failure_summary(self, manager): """Test getting failure summary.""" error1 = ValueError("Error 1") error2 = RuntimeError("Error 2") error3 = ValueError("Error 3") manager.record_failure("work1", 1, error1) manager.record_failure("work1", 2, error2) manager.record_failure("work2", 1, error3) summary = manager.get_failure_summary() assert summary["total_failures"] == 3 assert summary["resolved_count"] == 0 assert summary["unresolved_count"] == 3 assert summary["by_error_type"]["ValueError"] == 2 assert summary["by_error_type"]["RuntimeError"] == 1 assert summary["by_work"]["work1"] == 2 assert summary["by_work"]["work2"] == 1 def test_get_retry_list(self, manager): """Test getting retry list.""" error = ValueError("Test error") manager.record_failure("work1", 1, error) manager.record_failure("work2", 2, error) retry_list = manager.get_retry_list() assert len(retry_list) == 2 def test_get_retry_list_for_work(self, manager): """Test getting retry list for specific work.""" error = ValueError("Test error") manager.record_failure("work1", 1, error) manager.record_failure("work2", 2, error) retry_list = manager.get_retry_list(work_id="work1") assert len(retry_list) == 1 assert retry_list[0].work_id == "work1" def test_increment_retry_count(self, manager): """Test incrementing retry count.""" error = ValueError("Test error") manager.record_failure("test_work", 1, error) manager.increment_retry_count("test_work", 1) failures = manager.get_failures() assert failures[0].retry_count == 1 def test_clear_resolved(self, manager): """Test clearing resolved failures.""" error1 = ValueError("Error 1") error2 = RuntimeError("Error 2") manager.record_failure("work1", 1, error1) manager.record_failure("work2", 2, error2) # Mark one as resolved manager.mark_resolved("work1", 1) # Clear resolved removed = manager.clear_resolved() assert removed == 1 # Only unresolved should remain failures = manager.get_failures(include_resolved=False) assert len(failures) == 1 assert failures[0].work_id == "work2" def test_clear_all(self, manager): """Test clearing all failures.""" error = ValueError("Test error") manager.record_failure("test_work", 1, error) assert manager.failure_list_path.exists() manager.clear_all() assert not manager.failure_list_path.exists() def test_empty_failure_list(self, manager): """Test operations on empty failure list.""" failures = manager.get_failures() assert len(failures) == 0 summary = manager.get_failure_summary() assert summary["total_failures"] == 0 retry_list = manager.get_retry_list() assert len(retry_list) == 0 def test_corrupted_line_handling(self, manager, temp_dir): """Test handling of corrupted lines in failure list.""" # Record a valid failure error = ValueError("Test error") manager.record_failure("test_work", 1, error) # Add a corrupted line with open(manager.failure_list_path, "a") as f: f.write("this is not valid json\n") # Should skip corrupted line failures = manager.get_failures() assert len(failures) == 1 assert failures[0].work_id == "test_work"