| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724 |
- """
- Unit tests for the fingerprint module.
- Tests cover FileFingerprint, FingerprintStore, FingerprintService,
- and BatchFingerprintChecker functionality.
- """
- import tempfile
- from pathlib import Path
- import pytest
- from src.fingerprint.calculator import FileFingerprint
- from src.fingerprint.store import FingerprintStore
- from src.fingerprint.service import FingerprintService
- from src.fingerprint.batch import BatchFingerprintChecker
- from src.repository import Repository
- from src.repository.models import WorkItem, WorkStatus
class TestFileFingerprint:
    """Exercise the hashing and metadata helpers of FileFingerprint."""

    @staticmethod
    def _make_temp_file(payload, suffix=None):
        """Write *payload* to a persistent temporary file and return its path.

        The file is created with ``delete=False`` so it survives closing the
        handle; each test removes it in its own ``finally`` clause.
        """
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as handle:
            handle.write(payload)
            return handle.name

    def test_calculate_md5(self):
        """The digest of a known payload equals its reference MD5."""
        path = self._make_temp_file(b"Hello, World!")
        try:
            digest = FileFingerprint().calculate_md5(path)
            # Reference MD5 of the bytes "Hello, World!"
            assert digest == "65a8e27d8879283831b664bd8b7f0ad4"
            assert len(digest) == 32
        finally:
            Path(path).unlink()

    def test_calculate_md5_different_content(self):
        """Two files with different text yield different digests."""
        with tempfile.TemporaryDirectory() as workdir:
            base = Path(workdir)
            first, second = base / "file1.txt", base / "file2.txt"
            first.write_text("content one")
            second.write_text("content two")

            hasher = FileFingerprint()
            digest_a = hasher.calculate_md5(str(first))
            digest_b = hasher.calculate_md5(str(second))

            assert digest_a != digest_b

    def test_calculate_md5_same_content(self):
        """Two files with identical text yield the same digest."""
        with tempfile.TemporaryDirectory() as workdir:
            base = Path(workdir)
            text = "identical content"
            targets = [base / "file1.txt", base / "file2.txt"]
            for target in targets:
                target.write_text(text)

            hasher = FileFingerprint()
            digest_a = hasher.calculate_md5(str(targets[0]))
            digest_b = hasher.calculate_md5(str(targets[1]))

            assert digest_a == digest_b

    def test_calculate_md5_large_file(self):
        """A 100 KB file still produces a 32-character digest."""
        path = self._make_temp_file(b"x" * 100_000)
        try:
            digest = FileFingerprint().calculate_md5(path)
            assert len(digest) == 32
        finally:
            Path(path).unlink()

    def test_calculate_quick_hash(self):
        """The quick hash covers only the first ``sample_size`` bytes."""
        path = self._make_temp_file(b"Hello, World!")
        try:
            hasher = FileFingerprint()
            sampled = hasher.calculate_quick_hash(path, sample_size=5)
            # With sample_size=5 only "Hello" is hashed; its MD5 is
            # 8b1a9953c4611296a827abf8c47804d7.
            assert sampled == "8b1a9953c4611296a827abf8c47804d7"
            assert len(sampled) == 32

            # The sampled digest must not coincide with the whole-file digest.
            whole = hasher.calculate_md5(path)
            assert sampled != whole
        finally:
            Path(path).unlink()

    def test_get_file_size(self):
        """The reported size equals the number of bytes written."""
        payload = b"Test content for size"
        path = self._make_temp_file(payload)
        try:
            reported = FileFingerprint().get_file_size(path)
            assert reported == len(payload)
        finally:
            Path(path).unlink()

    def test_get_file_meta(self):
        """Metadata exposes name, size and modified_time for the file."""
        path = self._make_temp_file(b"content", suffix=".txt")
        try:
            details = FileFingerprint().get_file_meta(path)
            for key in ("name", "size", "modified_time"):
                assert key in details
            assert details["size"] == 7
            assert details["name"].endswith(".txt")
        finally:
            Path(path).unlink()

    def test_file_not_found(self):
        """Every calculator entry point raises FileNotFoundError on a missing path."""
        hasher = FileFingerprint()
        operations = (
            hasher.calculate_md5,
            hasher.calculate_quick_hash,
            hasher.get_file_size,
            hasher.get_file_meta,
        )
        for operation in operations:
            with pytest.raises(FileNotFoundError):
                operation("/nonexistent/file.txt")
class TestFingerprintStore:
    """Tests for FingerprintStore: index loading, duplicate lookup, removal, stats."""

    def test_init_creates_index(self):
        """A store created on a fresh directory exposes an empty dict index."""
        with tempfile.TemporaryDirectory() as tmpdir:
            repo = Repository(Path(tmpdir))
            store = FingerprintStore(repo)

            assert isinstance(store.index, dict)
            assert len(store.index) == 0

    def test_load_existing_index(self):
        """An index file already on disk is loaded when the store is constructed."""
        with tempfile.TemporaryDirectory() as tmpdir:
            repo = Repository(Path(tmpdir))
            storage_dir = Path(tmpdir)

            # Seed the directory with an index before the store is built.
            # NOTE(review): assumes the store reads "fingerprints.json" from the
            # repository directory; confirm against FingerprintStore.
            index_file = storage_dir / "fingerprints.json"
            index_file.write_text('{"abc123": {"work_id": "work1"}}')

            store = FingerprintStore(repo)

            assert "abc123" in store.index

    def test_add_fingerprint(self):
        """Adding a file stores exactly one entry, keyed by the file's MD5."""
        with tempfile.TemporaryDirectory() as tmpdir:
            test_file = Path(tmpdir) / "test.txt"
            test_file.write_text("test content")

            repo = Repository(Path(tmpdir))
            store = FingerprintStore(repo)
            store.add_fingerprint("work123", str(test_file), {"name": "test.txt"})

            assert len(store.index) == 1

            # The index key is expected to be the MD5 that FileFingerprint
            # computes for the same file (module-level import is reused here).
            fp = FileFingerprint().calculate_md5(str(test_file))
            assert fp in store.index

    def test_check_duplicate(self):
        """check_duplicate returns None before registration and the work id after."""
        with tempfile.TemporaryDirectory() as tmpdir:
            test_file = Path(tmpdir) / "test.txt"
            test_file.write_text("test content")

            repo = Repository(Path(tmpdir))
            store = FingerprintStore(repo)

            # Nothing registered yet, so no owning work is reported.
            assert store.check_duplicate(str(test_file)) is None

            store.add_fingerprint("work123", str(test_file), {})

            # The same file now resolves to the work it was registered under.
            assert store.check_duplicate(str(test_file)) == "work123"

    def test_check_duplicate_copy(self):
        """A second file with identical content resolves to the original's work id."""
        with tempfile.TemporaryDirectory() as tmpdir:
            file1 = Path(tmpdir) / "original.txt"
            file2 = Path(tmpdir) / "copy.txt"
            content = "same content"
            file1.write_text(content)
            file2.write_text(content)

            repo = Repository(Path(tmpdir))
            store = FingerprintStore(repo)

            # Only the original is registered; the copy is merely looked up.
            store.add_fingerprint("work123", str(file1), {})

            assert store.check_duplicate(str(file2)) == "work123"

    def test_get_work_history(self):
        """History for a work lists every fingerprint registered under it."""
        with tempfile.TemporaryDirectory() as tmpdir:
            repo = Repository(Path(tmpdir))
            store = FingerprintStore(repo)

            # Three files with distinct content, all attached to one work.
            for i in range(3):
                test_file = Path(tmpdir) / f"file{i}.txt"
                test_file.write_text(f"content {i}")
                store.add_fingerprint("work123", str(test_file), {"index": i})

            history = store.get_work_history("work123")
            assert len(history) == 3

    def test_remove_fingerprint(self):
        """Removing a registered file returns True and empties the index."""
        with tempfile.TemporaryDirectory() as tmpdir:
            test_file = Path(tmpdir) / "test.txt"
            test_file.write_text("content")

            repo = Repository(Path(tmpdir))
            store = FingerprintStore(repo)
            store.add_fingerprint("work123", str(test_file), {})
            assert len(store.index) == 1

            removed = store.remove_fingerprint(str(test_file))

            assert removed is True
            assert len(store.index) == 0

    def test_remove_nonexistent_fingerprint(self):
        """Removing a file that was never registered returns False."""
        with tempfile.TemporaryDirectory() as tmpdir:
            test_file = Path(tmpdir) / "test.txt"
            test_file.write_text("content")

            repo = Repository(Path(tmpdir))
            store = FingerprintStore(repo)

            removed = store.remove_fingerprint(str(test_file))
            assert removed is False

    def test_clear(self):
        """clear() drops every entry from the index."""
        with tempfile.TemporaryDirectory() as tmpdir:
            repo = Repository(Path(tmpdir))
            store = FingerprintStore(repo)

            for i in range(3):
                test_file = Path(tmpdir) / f"file{i}.txt"
                test_file.write_text(f"content {i}")
                store.add_fingerprint(f"work{i}", str(test_file), {})
            assert len(store.index) == 3

            store.clear()
            assert len(store.index) == 0

    def test_get_stats(self):
        """Stats report the fingerprint count and the number of distinct works."""
        with tempfile.TemporaryDirectory() as tmpdir:
            repo = Repository(Path(tmpdir))
            store = FingerprintStore(repo)

            # Files 0-2 belong to "work1", files 3-4 to "work2".
            for i in range(5):
                test_file = Path(tmpdir) / f"file{i}.txt"
                test_file.write_text(f"content {i}")
                work_id = "work1" if i < 3 else "work2"
                store.add_fingerprint(work_id, str(test_file), {})

            stats = store.get_stats()
            assert stats["total_fingerprints"] == 5
            assert stats["unique_works"] == 2

    def test_persistence(self):
        """A second store on the same repository sees fingerprints added by the first."""
        with tempfile.TemporaryDirectory() as tmpdir:
            test_file = Path(tmpdir) / "test.txt"
            test_file.write_text("content")

            repo = Repository(Path(tmpdir))

            store1 = FingerprintStore(repo)
            store1.add_fingerprint("work123", str(test_file), {})

            # A fresh instance shares no in-memory state with store1.
            store2 = FingerprintStore(repo)

            assert store2.check_duplicate(str(test_file)) == "work123"
class TestFingerprintService:
    """Tests for FingerprintService: pre-import checks, registration, file info."""

    def test_check_before_import_new_file(self):
        """A file that was never registered is reported as not a duplicate."""
        with tempfile.TemporaryDirectory() as tmpdir:
            test_file = Path(tmpdir) / "new.txt"
            test_file.write_text("new content")

            repo = Repository(Path(tmpdir))
            service = FingerprintService(repo)

            is_dup, work_id = service.check_before_import(str(test_file))

            assert is_dup is False
            assert work_id is None

    def test_check_before_import_duplicate(self):
        """A file registered under a COMPLETED work is reported as a duplicate."""
        with tempfile.TemporaryDirectory() as tmpdir:
            repo = Repository(Path(tmpdir))

            source_file = Path(tmpdir) / "source.txt"
            source_file.write_text("content")

            # The owning work is marked COMPLETED before the file is registered.
            work = repo.create_work(str(source_file), title="Test")
            work.status = WorkStatus.COMPLETED
            repo.update_work(work)

            service = FingerprintService(repo)
            service.register_import(work.work_id, str(source_file))

            is_dup, work_id = service.check_before_import(str(source_file))

            assert is_dup is True
            assert work_id == work.work_id

    def test_check_duplicate_incomplete_work(self):
        """A file registered under a non-completed work is not a duplicate."""
        with tempfile.TemporaryDirectory() as tmpdir:
            test_file = Path(tmpdir) / "test.txt"
            test_file.write_text("content")

            repo = Repository(Path(tmpdir))

            # The work keeps whatever status create_work assigns; it is never
            # switched to COMPLETED in this test.
            work = repo.create_work(str(test_file))

            service = FingerprintService(repo)
            service.register_import(work.work_id, str(test_file))

            is_dup, _ = service.check_before_import(str(test_file))
            assert is_dup is False

    def test_register_import(self):
        """register_import records the fingerprint even without a completed work."""
        with tempfile.TemporaryDirectory() as tmpdir:
            test_file = Path(tmpdir) / "test.txt"
            test_file.write_text("content")

            repo = Repository(Path(tmpdir))
            service = FingerprintService(repo)
            service.register_import("work123", str(test_file))

            # No completed work backs "work123", so the import check stays negative.
            is_dup, _ = service.check_before_import(str(test_file))
            assert is_dup is False

            # The underlying store nevertheless maps the file to the work id.
            owner = service.store.check_duplicate(str(test_file))
            assert owner == "work123"

    def test_register_batch_import(self):
        """Registering a batch stores one fingerprint per distinct file."""
        with tempfile.TemporaryDirectory() as tmpdir:
            files = []
            for i in range(3):
                f = Path(tmpdir) / f"file{i}.txt"
                f.write_text(f"content {i}")
                files.append(str(f))

            repo = Repository(Path(tmpdir))
            service = FingerprintService(repo)
            service.register_batch_import("work123", files)

            stats = service.store.get_stats()
            assert stats["total_fingerprints"] == 3

    def test_get_fingerprint(self):
        """get_fingerprint returns a 32-character string."""
        # A private directory keeps the Repository out of the shared system
        # temp dir, so nothing it may persist leaks into later runs.
        with tempfile.TemporaryDirectory() as tmpdir:
            target = Path(tmpdir) / "known.bin"
            target.write_bytes(b"known content")

            repo = Repository(Path(tmpdir))
            service = FingerprintService(repo)

            fp = service.get_fingerprint(str(target))

            assert len(fp) == 32
            assert isinstance(fp, str)

    def test_get_file_info(self):
        """get_file_info bundles fingerprint, metadata and duplicate status."""
        # Same isolation rationale as above: never root a Repository in the
        # shared system temp directory.
        with tempfile.TemporaryDirectory() as tmpdir:
            target = Path(tmpdir) / "info.txt"
            target.write_bytes(b"test content")

            repo = Repository(Path(tmpdir))
            service = FingerprintService(repo)

            info = service.get_file_info(str(target))

            assert "fingerprint" in info
            assert "metadata" in info
            assert "is_duplicate" in info
            assert "existing_work_id" in info
            assert len(info["fingerprint"]) == 32
            # b"test content" is 12 bytes long.
            assert info["metadata"]["size"] == 12
class TestBatchFingerprintChecker:
    """Exercise the multi-file helpers of BatchFingerprintChecker."""

    @staticmethod
    def _register_completed(repo, service, path):
        """Create a work for *path*, mark it COMPLETED, and register the file."""
        work = repo.create_work(path)
        work.status = WorkStatus.COMPLETED
        repo.update_work(work)
        service.register_import(work.work_id, path)
        return work

    @staticmethod
    def _write_numbered_files(directory, count):
        """Create ``file<i>.txt`` holding ``content <i>``; return the paths as strings."""
        created = []
        for idx in range(count):
            target = Path(directory) / f"file{idx}.txt"
            target.write_text(f"content {idx}")
            created.append(str(target))
        return created

    def test_check_files(self):
        """Two unregistered files both come back as (False, None)."""
        with tempfile.TemporaryDirectory() as workdir:
            first = Path(workdir) / "file1.txt"
            second = Path(workdir) / "file2.txt"
            first.write_text("content 1")
            second.write_text("content 2")

            repo = Repository(Path(workdir))
            checker = BatchFingerprintChecker(FingerprintService(repo))

            outcome = checker.check_files([str(first), str(second)])

            assert len(outcome) == 2
            assert str(first) in outcome
            assert str(second) in outcome
            # Neither file has been registered anywhere.
            assert outcome[str(first)] == (False, None)
            assert outcome[str(second)] == (False, None)

    def test_check_files_with_duplicate(self):
        """A registered file and its identical copy are both flagged."""
        with tempfile.TemporaryDirectory() as workdir:
            first = Path(workdir) / "file1.txt"
            second = Path(workdir) / "file2.txt"
            first.write_text("same")
            second.write_text("same")

            repo = Repository(Path(workdir))
            service = FingerprintService(repo)
            self._register_completed(repo, service, str(first))

            checker = BatchFingerprintChecker(service)
            outcome = checker.check_files([str(first), str(second)])

            # The copy shares the registered file's content, so both match.
            assert outcome[str(first)][0] is True
            assert outcome[str(second)][0] is True

    def test_filter_new_files(self):
        """With nothing registered, every file passes the new-file filter."""
        with tempfile.TemporaryDirectory() as workdir:
            candidates = self._write_numbered_files(workdir, 3)

            repo = Repository(Path(workdir))
            checker = BatchFingerprintChecker(FingerprintService(repo))

            fresh = checker.filter_new_files(candidates)
            assert len(fresh) == 3

    def test_filter_new_files_with_duplicate(self):
        """The new-file filter drops a file that is already registered."""
        with tempfile.TemporaryDirectory() as workdir:
            first = Path(workdir) / "file1.txt"
            second = Path(workdir) / "file2.txt"
            first.write_text("same")
            second.write_text("different")

            repo = Repository(Path(workdir))
            service = FingerprintService(repo)
            self._register_completed(repo, service, str(first))

            checker = BatchFingerprintChecker(service)
            fresh = checker.filter_new_files([str(first), str(second)])

            # Only the unregistered file survives.
            assert len(fresh) == 1
            assert str(second) in fresh

    def test_filter_duplicate_files(self):
        """The duplicate filter keeps only the already-registered file."""
        with tempfile.TemporaryDirectory() as workdir:
            first = Path(workdir) / "file1.txt"
            second = Path(workdir) / "file2.txt"
            first.write_text("same content")
            second.write_text("different")

            repo = Repository(Path(workdir))
            service = FingerprintService(repo)
            self._register_completed(repo, service, str(first))

            checker = BatchFingerprintChecker(service)
            repeats = checker.filter_duplicate_files([str(first), str(second)])

            assert len(repeats) == 1
            assert str(first) in repeats

    def test_categorize_files(self):
        """Files are bucketed into duplicate, new and error."""
        with tempfile.TemporaryDirectory() as workdir:
            first = Path(workdir) / "file1.txt"
            second = Path(workdir) / "file2.txt"
            # Never written to disk, so it should land in the error bucket.
            missing = Path(workdir) / "nonexistent.txt"
            first.write_text("same")
            second.write_text("different")

            repo = Repository(Path(workdir))
            service = FingerprintService(repo)
            self._register_completed(repo, service, str(first))

            checker = BatchFingerprintChecker(service)
            buckets = checker.categorize_files(
                [str(first), str(second), str(missing)]
            )

            assert len(buckets["duplicate"]) == 1
            assert len(buckets["new"]) == 1
            assert len(buckets["error"]) == 1
            assert str(first) in buckets["duplicate"]
            assert str(second) in buckets["new"]
            assert str(missing) in buckets["error"]

    def test_get_summary(self):
        """The summary counts new, duplicate and errored files plus the total."""
        with tempfile.TemporaryDirectory() as workdir:
            candidates = self._write_numbered_files(workdir, 5)
            # One path that does not exist on disk.
            candidates.append("/nonexistent/file.txt")

            repo = Repository(Path(workdir))
            checker = BatchFingerprintChecker(FingerprintService(repo))

            report = checker.get_summary(candidates)

            assert report["total"] == 6
            assert report["new"] == 5  # every file that exists is unregistered
            assert report["duplicate"] == 0
            assert report["error"] == 1  # the missing path
class TestIntegration:
    """End-to-end tests combining Repository, FingerprintService and the batch checker."""

    def test_full_duplicate_detection_workflow(self):
        """A copy of an imported, completed file is detected before import."""
        with tempfile.TemporaryDirectory() as tmpdir:
            original = Path(tmpdir) / "novel.txt"
            original.write_text("This is a novel content.")

            repo = Repository(Path(tmpdir))
            service = FingerprintService(repo)

            # Import the original, then mark its work as completed.
            work = repo.create_work(str(original), title="My Novel")
            service.register_import(work.work_id, str(original))
            work.status = WorkStatus.COMPLETED
            repo.update_work(work)

            # A different path with byte-identical content.
            copy = Path(tmpdir) / "novel_copy.txt"
            copy.write_text("This is a novel content.")

            is_dup, existing_work_id = service.check_before_import(str(copy))

            assert is_dup is True
            assert existing_work_id == work.work_id

    def test_batch_import_with_duplicates(self):
        """A batch check flags registered files and an unregistered copy of one."""
        with tempfile.TemporaryDirectory() as tmpdir:
            content_sets = [
                ("unique1.txt", "content 1"),
                ("unique2.txt", "content 2"),
                ("unique3.txt", "content 3"),  # registered; copy3 mirrors it
                ("copy3.txt", "content 3"),  # never registered, same content
                ("unique4.txt", "content 4"),  # never registered, new content
            ]
            files = []
            for name, content in content_sets:
                f = Path(tmpdir) / name
                f.write_text(content)
                files.append(str(f))
            copy3_path, unique4_path = files[3], files[4]

            repo = Repository(Path(tmpdir))
            service = FingerprintService(repo)
            checker = BatchFingerprintChecker(service)

            # First batch: import unique1-3 as completed works.
            for file_path in files[:3]:
                work = repo.create_work(file_path)
                work.status = WorkStatus.COMPLETED
                repo.update_work(work)
                service.register_import(work.work_id, file_path)

            # Check the full list. The three registered files re-check as
            # duplicates themselves, so a bare ">= 1" would pass even if the
            # copy went undetected; pin the exact split instead.
            summary = checker.get_summary(files)
            assert summary["total"] == 5
            assert summary["duplicate"] == 4  # unique1-3 plus copy3
            assert summary["new"] == 1  # unique4 only
            assert summary["error"] == 0

            # Verify the two interesting files individually.
            categories = checker.categorize_files(files)
            assert copy3_path in categories["duplicate"]
            assert unique4_path in categories["new"]

    def test_fingerprint_survives_repository_restart(self):
        """Fingerprints registered in one session are visible to new instances."""
        with tempfile.TemporaryDirectory() as tmpdir:
            test_file = Path(tmpdir) / "test.txt"
            test_file.write_text("persistent content")

            # The storage directory is not created here; it is handed to
            # Repository as-is.
            storage_dir = Path(tmpdir) / "storage"

            # First session.
            repo1 = Repository(storage_dir)
            service1 = FingerprintService(repo1)
            work1 = repo1.create_work(str(test_file))
            work1.status = WorkStatus.COMPLETED
            repo1.update_work(work1)
            service1.register_import(work1.work_id, str(test_file))

            # Second session: brand-new objects on the same storage directory.
            repo2 = Repository(storage_dir)
            service2 = FingerprintService(repo2)

            is_dup, work_id = service2.check_before_import(str(test_file))

            assert is_dup is True
            assert work_id == work1.work_id
|