"""
Unit tests for the fingerprint module.

Tests cover FileFingerprint, FingerprintStore, FingerprintService,
and BatchFingerprintChecker functionality.

Every test works inside its own ``tempfile.TemporaryDirectory`` so that no
repository or index files are left behind and no state leaks between tests.
"""

import hashlib
import tempfile
from pathlib import Path

import pytest

from src.fingerprint.calculator import FileFingerprint
from src.fingerprint.store import FingerprintStore
from src.fingerprint.service import FingerprintService
from src.fingerprint.batch import BatchFingerprintChecker
from src.repository import Repository
from src.repository.models import WorkItem, WorkStatus


def _import_completed_work(repo, service, file_path, **work_kwargs):
    """Create a work for ``file_path``, mark it COMPLETED and register it.

    The tests below rely on the fact that ``check_before_import`` only
    reports a duplicate when the owning work is completed (see
    ``test_check_duplicate_incomplete_work``), so most duplicate scenarios
    need this exact sequence.

    Args:
        repo: Repository in which the work is created and updated.
        service: FingerprintService used to register the file's fingerprint.
        file_path: Path (as ``str``) of the file being imported.
        **work_kwargs: Extra keyword arguments forwarded to
            ``repo.create_work`` (e.g. ``title``).

    Returns:
        The work object returned by ``repo.create_work``, after its status
        was set to ``WorkStatus.COMPLETED`` and persisted.
    """
    work = repo.create_work(file_path, **work_kwargs)
    work.status = WorkStatus.COMPLETED
    repo.update_work(work)
    service.register_import(work.work_id, file_path)
    return work


class TestFileFingerprint:
    """Test FileFingerprint calculator."""

    def test_calculate_md5(self):
        """Test MD5 calculation against a known digest."""
        with tempfile.TemporaryDirectory() as tmpdir:
            test_file = Path(tmpdir) / "hello.bin"
            test_file.write_bytes(b"Hello, World!")

            md5 = FileFingerprint().calculate_md5(str(test_file))

            # Known MD5 for "Hello, World!"
            assert md5 == "65a8e27d8879283831b664bd8b7f0ad4"
            assert len(md5) == 32

    def test_calculate_md5_different_content(self):
        """Test that different content produces different hashes."""
        with tempfile.TemporaryDirectory() as tmpdir:
            file1 = Path(tmpdir) / "file1.txt"
            file2 = Path(tmpdir) / "file2.txt"
            file1.write_text("content one")
            file2.write_text("content two")

            calc = FileFingerprint()
            md5_1 = calc.calculate_md5(str(file1))
            md5_2 = calc.calculate_md5(str(file2))

            assert md5_1 != md5_2

    def test_calculate_md5_same_content(self):
        """Test that same content produces same hash."""
        with tempfile.TemporaryDirectory() as tmpdir:
            file1 = Path(tmpdir) / "file1.txt"
            file2 = Path(tmpdir) / "file2.txt"
            content = "identical content"
            file1.write_text(content)
            file2.write_text(content)

            calc = FileFingerprint()
            md5_1 = calc.calculate_md5(str(file1))
            md5_2 = calc.calculate_md5(str(file2))

            assert md5_1 == md5_2

    def test_calculate_md5_large_file(self):
        """Test MD5 calculation for larger files."""
        # 100KB of data
        payload = b"x" * 100_000

        with tempfile.TemporaryDirectory() as tmpdir:
            test_file = Path(tmpdir) / "large.bin"
            test_file.write_bytes(payload)

            md5 = FileFingerprint().calculate_md5(str(test_file))

            assert len(md5) == 32
            # A length check alone cannot catch a read loop that drops or
            # repeats data, so compare against a reference digest as well.
            assert md5 == hashlib.md5(payload).hexdigest()

    def test_calculate_quick_hash(self):
        """Test quick hash calculation over a leading sample of the file."""
        with tempfile.TemporaryDirectory() as tmpdir:
            test_file = Path(tmpdir) / "hello.bin"
            test_file.write_bytes(b"Hello, World!")

            calc = FileFingerprint()
            quick = calc.calculate_quick_hash(str(test_file), sample_size=5)

            # With sample_size=5 only the first 5 bytes ("Hello") are hashed.
            # MD5 of "Hello" is 8b1a9953c4611296a827abf8c47804d7
            assert quick == "8b1a9953c4611296a827abf8c47804d7"
            assert len(quick) == 32

            # Quick hash should differ from full hash
            full_hash = calc.calculate_md5(str(test_file))
            assert quick != full_hash

    def test_get_file_size(self):
        """Test getting file size."""
        content = b"Test content for size"

        with tempfile.TemporaryDirectory() as tmpdir:
            test_file = Path(tmpdir) / "sized.bin"
            test_file.write_bytes(content)

            size = FileFingerprint().get_file_size(str(test_file))

            assert size == len(content)

    def test_get_file_meta(self):
        """Test getting file metadata."""
        with tempfile.TemporaryDirectory() as tmpdir:
            test_file = Path(tmpdir) / "meta.txt"
            test_file.write_bytes(b"content")

            meta = FileFingerprint().get_file_meta(str(test_file))

            assert "name" in meta
            assert "size" in meta
            assert "modified_time" in meta
            assert meta["size"] == 7
            assert meta["name"].endswith(".txt")

    def test_file_not_found(self):
        """Test FileNotFoundError for non-existent file."""
        calc = FileFingerprint()
        missing = "/nonexistent/file.txt"

        with pytest.raises(FileNotFoundError):
            calc.calculate_md5(missing)

        with pytest.raises(FileNotFoundError):
            calc.calculate_quick_hash(missing)

        with pytest.raises(FileNotFoundError):
            calc.get_file_size(missing)

        with pytest.raises(FileNotFoundError):
            calc.get_file_meta(missing)


class TestFingerprintStore:
    """Test FingerprintStore."""

    def test_init_creates_index(self):
        """Test that initialization creates an empty index."""
        with tempfile.TemporaryDirectory() as tmpdir:
            repo = Repository(Path(tmpdir))
            store = FingerprintStore(repo)

            assert isinstance(store.index, dict)
            assert len(store.index) == 0

    def test_load_existing_index(self):
        """Test loading an existing index."""
        with tempfile.TemporaryDirectory() as tmpdir:
            repo = Repository(Path(tmpdir))
            storage_dir = Path(tmpdir)

            # Create a pre-existing index before the store is constructed
            index_file = storage_dir / "fingerprints.json"
            index_file.write_text('{"abc123": {"work_id": "work1"}}')

            store = FingerprintStore(repo)

            assert "abc123" in store.index

    def test_add_fingerprint(self):
        """Test adding a fingerprint."""
        with tempfile.TemporaryDirectory() as tmpdir:
            # Create test file
            test_file = Path(tmpdir) / "test.txt"
            test_file.write_text("test content")

            repo = Repository(Path(tmpdir))
            store = FingerprintStore(repo)
            store.add_fingerprint("work123", str(test_file), {"name": "test.txt"})

            assert len(store.index) == 1

            # The index is expected to be keyed by the file's MD5
            fingerprint = FileFingerprint().calculate_md5(str(test_file))
            assert fingerprint in store.index

    def test_check_duplicate(self):
        """Test checking for duplicates."""
        with tempfile.TemporaryDirectory() as tmpdir:
            # Create test file
            test_file = Path(tmpdir) / "test.txt"
            test_file.write_text("test content")

            repo = Repository(Path(tmpdir))
            store = FingerprintStore(repo)

            # Not duplicate initially
            assert store.check_duplicate(str(test_file)) is None

            # Add fingerprint
            store.add_fingerprint("work123", str(test_file), {})

            # Now it's a duplicate
            assert store.check_duplicate(str(test_file)) == "work123"

    def test_check_duplicate_copy(self):
        """Test that file copies are detected as duplicates."""
        with tempfile.TemporaryDirectory() as tmpdir:
            # Create original and copy
            file1 = Path(tmpdir) / "original.txt"
            file2 = Path(tmpdir) / "copy.txt"
            content = "same content"
            file1.write_text(content)
            file2.write_text(content)

            repo = Repository(Path(tmpdir))
            store = FingerprintStore(repo)

            # Register first file
            store.add_fingerprint("work123", str(file1), {})

            # Check second file
            assert store.check_duplicate(str(file2)) == "work123"

    def test_get_work_history(self):
        """Test getting fingerprint history for a work."""
        with tempfile.TemporaryDirectory() as tmpdir:
            repo = Repository(Path(tmpdir))
            store = FingerprintStore(repo)

            # Add multiple files for same work
            for i in range(3):
                test_file = Path(tmpdir) / f"file{i}.txt"
                test_file.write_text(f"content {i}")
                store.add_fingerprint("work123", str(test_file), {"index": i})

            history = store.get_work_history("work123")

            assert len(history) == 3

    def test_remove_fingerprint(self):
        """Test removing a fingerprint."""
        with tempfile.TemporaryDirectory() as tmpdir:
            test_file = Path(tmpdir) / "test.txt"
            test_file.write_text("content")

            repo = Repository(Path(tmpdir))
            store = FingerprintStore(repo)
            store.add_fingerprint("work123", str(test_file), {})
            assert len(store.index) == 1

            removed = store.remove_fingerprint(str(test_file))

            assert removed is True
            assert len(store.index) == 0

    def test_remove_nonexistent_fingerprint(self):
        """Test removing a non-existent fingerprint."""
        with tempfile.TemporaryDirectory() as tmpdir:
            test_file = Path(tmpdir) / "test.txt"
            test_file.write_text("content")

            repo = Repository(Path(tmpdir))
            store = FingerprintStore(repo)

            removed = store.remove_fingerprint(str(test_file))

            assert removed is False

    def test_clear(self):
        """Test clearing all fingerprints."""
        with tempfile.TemporaryDirectory() as tmpdir:
            repo = Repository(Path(tmpdir))
            store = FingerprintStore(repo)

            # Add some fingerprints
            for i in range(3):
                test_file = Path(tmpdir) / f"file{i}.txt"
                test_file.write_text(f"content {i}")
                store.add_fingerprint(f"work{i}", str(test_file), {})
            assert len(store.index) == 3

            store.clear()

            assert len(store.index) == 0

    def test_get_stats(self):
        """Test getting store statistics."""
        with tempfile.TemporaryDirectory() as tmpdir:
            repo = Repository(Path(tmpdir))
            store = FingerprintStore(repo)

            # Add fingerprints: files 0-2 belong to work1, files 3-4 to work2
            for i in range(5):
                test_file = Path(tmpdir) / f"file{i}.txt"
                test_file.write_text(f"content {i}")
                work_id = "work1" if i < 3 else "work2"
                store.add_fingerprint(work_id, str(test_file), {})

            stats = store.get_stats()

            assert stats["total_fingerprints"] == 5
            assert stats["unique_works"] == 2

    def test_persistence(self):
        """Test that index persists across store instances."""
        with tempfile.TemporaryDirectory() as tmpdir:
            test_file = Path(tmpdir) / "test.txt"
            test_file.write_text("content")

            repo = Repository(Path(tmpdir))

            # Create store and add fingerprint
            store1 = FingerprintStore(repo)
            store1.add_fingerprint("work123", str(test_file), {})

            # Create new store instance
            store2 = FingerprintStore(repo)

            # Should have the fingerprint
            assert store2.check_duplicate(str(test_file)) == "work123"


class TestFingerprintService:
    """Test FingerprintService."""

    def test_check_before_import_new_file(self):
        """Test checking a new file before import."""
        with tempfile.TemporaryDirectory() as tmpdir:
            test_file = Path(tmpdir) / "new.txt"
            test_file.write_text("new content")

            repo = Repository(Path(tmpdir))
            service = FingerprintService(repo)

            is_dup, work_id = service.check_before_import(str(test_file))

            assert is_dup is False
            assert work_id is None

    def test_check_before_import_duplicate(self):
        """Test checking a duplicate file before import."""
        with tempfile.TemporaryDirectory() as tmpdir:
            source_file = Path(tmpdir) / "source.txt"
            source_file.write_text("content")

            repo = Repository(Path(tmpdir))
            service = FingerprintService(repo)

            # Create a completed work and register its file
            work = _import_completed_work(
                repo, service, str(source_file), title="Test"
            )

            # Check duplicate
            is_dup, work_id = service.check_before_import(str(source_file))

            assert is_dup is True
            assert work_id == work.work_id

    def test_check_duplicate_incomplete_work(self):
        """Test that incomplete works don't count as duplicates."""
        with tempfile.TemporaryDirectory() as tmpdir:
            test_file = Path(tmpdir) / "test.txt"
            test_file.write_text("content")

            repo = Repository(Path(tmpdir))

            # Create an incomplete work
            work = repo.create_work(str(test_file))
            # Status is PENDING, not COMPLETED

            service = FingerprintService(repo)
            service.register_import(work.work_id, str(test_file))

            # Should not be a duplicate
            is_dup, _ = service.check_before_import(str(test_file))

            assert is_dup is False

    def test_register_import(self):
        """Test registering an import."""
        with tempfile.TemporaryDirectory() as tmpdir:
            test_file = Path(tmpdir) / "test.txt"
            test_file.write_text("content")

            repo = Repository(Path(tmpdir))
            service = FingerprintService(repo)
            service.register_import("work123", str(test_file))

            # Verify it's now tracked
            is_dup, _ = service.check_before_import(str(test_file))
            # Note: won't be duplicate until work is completed
            assert is_dup is False

            # But fingerprint is in store, mapped to the registering work
            registered_work_id = service.store.check_duplicate(str(test_file))
            assert registered_work_id == "work123"

    def test_register_batch_import(self):
        """Test registering multiple files."""
        with tempfile.TemporaryDirectory() as tmpdir:
            files = []
            for i in range(3):
                f = Path(tmpdir) / f"file{i}.txt"
                f.write_text(f"content {i}")
                files.append(str(f))

            repo = Repository(Path(tmpdir))
            service = FingerprintService(repo)
            service.register_batch_import("work123", files)

            stats = service.store.get_stats()

            assert stats["total_fingerprints"] == 3

    def test_get_fingerprint(self):
        """Test getting file fingerprint."""
        # The repository gets its own directory; rooting it at the parent of
        # a NamedTemporaryFile would place it in the shared system temp dir.
        with tempfile.TemporaryDirectory() as tmpdir:
            test_file = Path(tmpdir) / "known.bin"
            test_file.write_bytes(b"known content")

            repo = Repository(Path(tmpdir))
            service = FingerprintService(repo)

            fp = service.get_fingerprint(str(test_file))

            assert isinstance(fp, str)
            assert len(fp) == 32

    def test_get_file_info(self):
        """Test getting comprehensive file info."""
        # Isolated directory for the same reason as in test_get_fingerprint.
        with tempfile.TemporaryDirectory() as tmpdir:
            test_file = Path(tmpdir) / "info.txt"
            test_file.write_bytes(b"test content")

            repo = Repository(Path(tmpdir))
            service = FingerprintService(repo)

            info = service.get_file_info(str(test_file))

            assert "fingerprint" in info
            assert "metadata" in info
            assert "is_duplicate" in info
            assert "existing_work_id" in info
            assert len(info["fingerprint"]) == 32
            # b"test content" is 12 bytes long
            assert info["metadata"]["size"] == 12


class TestBatchFingerprintChecker:
    """Test BatchFingerprintChecker."""

    def test_check_files(self):
        """Test checking multiple files."""
        with tempfile.TemporaryDirectory() as tmpdir:
            # Create files
            file1 = Path(tmpdir) / "file1.txt"
            file2 = Path(tmpdir) / "file2.txt"
            file1.write_text("content 1")
            file2.write_text("content 2")

            repo = Repository(Path(tmpdir))
            service = FingerprintService(repo)
            checker = BatchFingerprintChecker(service)

            results = checker.check_files([str(file1), str(file2)])

            assert len(results) == 2
            assert str(file1) in results
            assert str(file2) in results
            # Both should be non-duplicate
            assert results[str(file1)] == (False, None)
            assert results[str(file2)] == (False, None)

    def test_check_files_with_duplicate(self):
        """Test checking files with one duplicate."""
        with tempfile.TemporaryDirectory() as tmpdir:
            file1 = Path(tmpdir) / "file1.txt"
            file2 = Path(tmpdir) / "file2.txt"
            file1.write_text("same")
            file2.write_text("same")

            repo = Repository(Path(tmpdir))
            service = FingerprintService(repo)

            # Register first file
            _import_completed_work(repo, service, str(file1))

            checker = BatchFingerprintChecker(service)
            results = checker.check_files([str(file1), str(file2)])

            # file1 should be duplicate, file2 should be too (same content)
            assert results[str(file1)][0] is True
            assert results[str(file2)][0] is True

    def test_filter_new_files(self):
        """Test filtering new files."""
        with tempfile.TemporaryDirectory() as tmpdir:
            files = []
            for i in range(3):
                f = Path(tmpdir) / f"file{i}.txt"
                f.write_text(f"content {i}")
                files.append(str(f))

            repo = Repository(Path(tmpdir))
            service = FingerprintService(repo)
            checker = BatchFingerprintChecker(service)

            new_files = checker.filter_new_files(files)

            assert len(new_files) == 3

    def test_filter_new_files_with_duplicate(self):
        """Test filtering removes duplicates."""
        with tempfile.TemporaryDirectory() as tmpdir:
            file1 = Path(tmpdir) / "file1.txt"
            file2 = Path(tmpdir) / "file2.txt"
            file1.write_text("same")
            file2.write_text("different")

            repo = Repository(Path(tmpdir))
            service = FingerprintService(repo)

            # Register file1
            _import_completed_work(repo, service, str(file1))

            checker = BatchFingerprintChecker(service)
            new_files = checker.filter_new_files([str(file1), str(file2)])

            # Only file2 should be new
            assert len(new_files) == 1
            assert str(file2) in new_files

    def test_filter_duplicate_files(self):
        """Test filtering to get only duplicates."""
        with tempfile.TemporaryDirectory() as tmpdir:
            file1 = Path(tmpdir) / "file1.txt"
            file2 = Path(tmpdir) / "file2.txt"
            file1.write_text("same content")
            file2.write_text("different")

            repo = Repository(Path(tmpdir))
            service = FingerprintService(repo)

            # Register file1
            _import_completed_work(repo, service, str(file1))

            checker = BatchFingerprintChecker(service)
            duplicates = checker.filter_duplicate_files([str(file1), str(file2)])

            assert len(duplicates) == 1
            assert str(file1) in duplicates

    def test_categorize_files(self):
        """Test categorizing files."""
        with tempfile.TemporaryDirectory() as tmpdir:
            file1 = Path(tmpdir) / "file1.txt"
            file2 = Path(tmpdir) / "file2.txt"
            # file3 is intentionally never written so it lands in "error"
            file3 = Path(tmpdir) / "nonexistent.txt"
            file1.write_text("same")
            file2.write_text("different")

            repo = Repository(Path(tmpdir))
            service = FingerprintService(repo)

            # Register file1
            _import_completed_work(repo, service, str(file1))

            checker = BatchFingerprintChecker(service)
            result = checker.categorize_files([str(file1), str(file2), str(file3)])

            assert len(result["duplicate"]) == 1
            assert len(result["new"]) == 1
            assert len(result["error"]) == 1
            assert str(file1) in result["duplicate"]
            assert str(file2) in result["new"]
            assert str(file3) in result["error"]

    def test_get_summary(self):
        """Test getting summary statistics."""
        with tempfile.TemporaryDirectory() as tmpdir:
            files = []
            for i in range(5):
                f = Path(tmpdir) / f"file{i}.txt"
                f.write_text(f"content {i}")
                files.append(str(f))

            # Add one non-existent file
            files.append("/nonexistent/file.txt")

            repo = Repository(Path(tmpdir))
            service = FingerprintService(repo)
            checker = BatchFingerprintChecker(service)

            summary = checker.get_summary(files)

            assert summary["total"] == 6
            assert summary["new"] == 5  # All existing files are new
            assert summary["duplicate"] == 0
            assert summary["error"] == 1  # Non-existent file


class TestIntegration:
    """Integration tests for fingerprint module."""

    def test_full_duplicate_detection_workflow(self):
        """Test complete duplicate detection workflow."""
        with tempfile.TemporaryDirectory() as tmpdir:
            # Original file
            original = Path(tmpdir) / "novel.txt"
            original.write_text("This is a novel content.")

            repo = Repository(Path(tmpdir))
            service = FingerprintService(repo)

            # Import original file. Kept inline (not via the helper) because
            # this workflow registers the fingerprint BEFORE the work is
            # marked completed.
            work = repo.create_work(str(original), title="My Novel")
            service.register_import(work.work_id, str(original))

            # Mark as completed
            work.status = WorkStatus.COMPLETED
            repo.update_work(work)

            # Try to import duplicate (copy with same content)
            copy = Path(tmpdir) / "novel_copy.txt"
            copy.write_text("This is a novel content.")

            is_dup, existing_work_id = service.check_before_import(str(copy))

            assert is_dup is True
            assert existing_work_id == work.work_id

    def test_batch_import_with_duplicates(self):
        """Test batch import workflow with duplicates."""
        with tempfile.TemporaryDirectory() as tmpdir:
            # Create files with some duplicates
            content_sets = [
                ("unique1.txt", "content 1"),
                ("unique2.txt", "content 2"),
                ("unique3.txt", "content 3"),  # Will be duplicated
                ("copy3.txt", "content 3"),  # Duplicate of unique3
                ("unique4.txt", "content 4"),
            ]

            files = []
            for name, content in content_sets:
                f = Path(tmpdir) / name
                f.write_text(content)
                files.append(str(f))

            repo = Repository(Path(tmpdir))
            service = FingerprintService(repo)
            checker = BatchFingerprintChecker(service)

            # First batch - import unique1-3 as completed works
            for file_path in files[:3]:
                _import_completed_work(repo, service, file_path)

            # Summarize ALL five files, including the three just imported
            summary = checker.get_summary(files)

            assert summary["total"] == 5
            # unique1-3 are already imported and copy3 shares unique3's
            # content, so four files are duplicates; only unique4 is new.
            assert summary["duplicate"] == 4
            assert summary["new"] == 1
            assert summary["error"] == 0

    def test_fingerprint_survives_repository_restart(self):
        """Test that fingerprints persist across repository restarts."""
        with tempfile.TemporaryDirectory() as tmpdir:
            test_file = Path(tmpdir) / "test.txt"
            test_file.write_text("persistent content")

            storage_dir = Path(tmpdir) / "storage"

            # First session
            repo1 = Repository(storage_dir)
            service1 = FingerprintService(repo1)
            work1 = _import_completed_work(repo1, service1, str(test_file))

            # Second session (new instances over the same storage directory)
            repo2 = Repository(storage_dir)
            service2 = FingerprintService(repo2)

            is_dup, work_id = service2.check_before_import(str(test_file))

            assert is_dup is True
            assert work_id == work1.work_id