|
|
@@ -0,0 +1,724 @@
|
|
|
+"""
|
|
|
+Unit tests for the fingerprint module.
|
|
|
+
|
|
|
+Tests cover FileFingerprint, FingerprintStore, FingerprintService,
|
|
|
+and BatchFingerprintChecker functionality.
|
|
|
+"""
|
|
|
+
|
|
|
+import tempfile
|
|
|
+from pathlib import Path
|
|
|
+
|
|
|
+import pytest
|
|
|
+
|
|
|
+from src.fingerprint.calculator import FileFingerprint
|
|
|
+from src.fingerprint.store import FingerprintStore
|
|
|
+from src.fingerprint.service import FingerprintService
|
|
|
+from src.fingerprint.batch import BatchFingerprintChecker
|
|
|
+from src.repository import Repository
|
|
|
+from src.repository.models import WorkItem, WorkStatus
|
|
|
+
|
|
|
+
|
|
|
class TestFileFingerprint:
    """Tests for the ``FileFingerprint`` calculator.

    Every test writes its own throw-away file(s) and removes them afterwards,
    so nothing here depends on files that already exist on disk.
    """

    @staticmethod
    def _write_temp_file(data, suffix=None):
        """Write ``data`` (bytes) to a new temporary file and return its path.

        The file is created with ``delete=False`` so it outlives the handle;
        the caller is responsible for unlinking it.
        """
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as f:
            f.write(data)
            return f.name

    def test_calculate_md5(self):
        """A known payload hashes to its well-known MD5 digest."""
        temp_path = self._write_temp_file(b"Hello, World!")

        try:
            calc = FileFingerprint()
            md5 = calc.calculate_md5(temp_path)

            # Known MD5 for "Hello, World!"
            assert md5 == "65a8e27d8879283831b664bd8b7f0ad4"
            assert len(md5) == 32
        finally:
            Path(temp_path).unlink()

    def test_calculate_md5_different_content(self):
        """Files with different content produce different hashes."""
        with tempfile.TemporaryDirectory() as tmpdir:
            file1 = Path(tmpdir) / "file1.txt"
            file2 = Path(tmpdir) / "file2.txt"

            file1.write_text("content one")
            file2.write_text("content two")

            calc = FileFingerprint()
            md5_1 = calc.calculate_md5(str(file1))
            md5_2 = calc.calculate_md5(str(file2))

            assert md5_1 != md5_2

    def test_calculate_md5_same_content(self):
        """Files with identical content produce the same hash."""
        with tempfile.TemporaryDirectory() as tmpdir:
            file1 = Path(tmpdir) / "file1.txt"
            file2 = Path(tmpdir) / "file2.txt"

            content = "identical content"
            file1.write_text(content)
            file2.write_text(content)

            calc = FileFingerprint()
            md5_1 = calc.calculate_md5(str(file1))
            md5_2 = calc.calculate_md5(str(file2))

            assert md5_1 == md5_2

    def test_calculate_md5_large_file(self):
        """A 100KB file hashes to the same digest as ``hashlib`` computes."""
        # Imported locally: only this test needs an independent reference.
        import hashlib

        payload = b"x" * 100_000
        temp_path = self._write_temp_file(payload)

        try:
            calc = FileFingerprint()
            md5 = calc.calculate_md5(temp_path)

            assert len(md5) == 32
            # Checking only the length would let a chunked-read bug slip by;
            # compare against a digest computed over the whole payload.
            assert md5 == hashlib.md5(payload).hexdigest()
        finally:
            Path(temp_path).unlink()

    def test_calculate_quick_hash(self):
        """The quick hash covers only the first ``sample_size`` bytes."""
        temp_path = self._write_temp_file(b"Hello, World!")

        try:
            calc = FileFingerprint()
            quick = calc.calculate_quick_hash(temp_path, sample_size=5)

            # With sample_size=5 only b"Hello" is hashed.
            # MD5 of "Hello" is 8b1a9953c4611296a827abf8c47804d7
            assert quick == "8b1a9953c4611296a827abf8c47804d7"
            assert len(quick) == 32

            # Quick hash should differ from full hash
            full_hash = calc.calculate_md5(temp_path)
            assert quick != full_hash
        finally:
            Path(temp_path).unlink()

    def test_get_file_size(self):
        """The reported size equals the number of bytes written."""
        content = b"Test content for size"
        temp_path = self._write_temp_file(content)

        try:
            calc = FileFingerprint()
            size = calc.get_file_size(temp_path)
            assert size == len(content)
        finally:
            Path(temp_path).unlink()

    def test_get_file_meta(self):
        """Metadata exposes name, size and modification time."""
        temp_path = self._write_temp_file(b"content", suffix=".txt")

        try:
            calc = FileFingerprint()
            meta = calc.get_file_meta(temp_path)

            assert "name" in meta
            assert "size" in meta
            assert "modified_time" in meta
            assert meta["size"] == 7  # len(b"content")
            assert meta["name"].endswith(".txt")
        finally:
            Path(temp_path).unlink()

    def test_file_not_found(self):
        """Every calculator method raises FileNotFoundError for a missing file."""
        calc = FileFingerprint()

        with pytest.raises(FileNotFoundError):
            calc.calculate_md5("/nonexistent/file.txt")

        with pytest.raises(FileNotFoundError):
            calc.calculate_quick_hash("/nonexistent/file.txt")

        with pytest.raises(FileNotFoundError):
            calc.get_file_size("/nonexistent/file.txt")

        with pytest.raises(FileNotFoundError):
            calc.get_file_meta("/nonexistent/file.txt")
|
|
|
+
|
|
|
+
|
|
|
class TestFingerprintStore:
    """Tests for ``FingerprintStore``.

    Each test builds its own ``Repository`` inside a fresh temporary
    directory, so no state is shared between tests.
    """

    def test_init_creates_index(self):
        """A store on an empty repository starts with an empty dict index."""
        with tempfile.TemporaryDirectory() as tmpdir:
            repo = Repository(Path(tmpdir))
            store = FingerprintStore(repo)

            assert isinstance(store.index, dict)
            assert len(store.index) == 0

    def test_load_existing_index(self):
        """An index file already on disk is loaded when the store is created."""
        with tempfile.TemporaryDirectory() as tmpdir:
            repo = Repository(Path(tmpdir))
            storage_dir = Path(tmpdir)

            # Create a pre-existing index
            # NOTE(review): assumes the store reads "fingerprints.json" from
            # the repository root — confirm against FingerprintStore.
            index_file = storage_dir / "fingerprints.json"
            index_file.write_text('{"abc123": {"work_id": "work1"}}')

            store = FingerprintStore(repo)
            assert "abc123" in store.index

    def test_add_fingerprint(self):
        """Adding a file indexes it under its MD5 fingerprint."""
        with tempfile.TemporaryDirectory() as tmpdir:
            # Create test file
            test_file = Path(tmpdir) / "test.txt"
            test_file.write_text("test content")

            repo = Repository(Path(tmpdir))
            store = FingerprintStore(repo)

            store.add_fingerprint("work123", str(test_file), {"name": "test.txt"})

            assert len(store.index) == 1

            # The index key is expected to be the file's MD5; FileFingerprint
            # is already imported at module level, so no local import needed.
            calc = FileFingerprint()
            fp = calc.calculate_md5(str(test_file))
            assert fp in store.index

    def test_check_duplicate(self):
        """A file is reported as a duplicate only after it was added."""
        with tempfile.TemporaryDirectory() as tmpdir:
            # Create test file
            test_file = Path(tmpdir) / "test.txt"
            test_file.write_text("test content")

            repo = Repository(Path(tmpdir))
            store = FingerprintStore(repo)

            # Not duplicate initially
            work_id = store.check_duplicate(str(test_file))
            assert work_id is None

            # Add fingerprint
            store.add_fingerprint("work123", str(test_file), {})

            # Now it's a duplicate
            work_id = store.check_duplicate(str(test_file))
            assert work_id == "work123"

    def test_check_duplicate_copy(self):
        """A different file with identical content is detected as a duplicate."""
        with tempfile.TemporaryDirectory() as tmpdir:
            # Create original and copy
            file1 = Path(tmpdir) / "original.txt"
            file2 = Path(tmpdir) / "copy.txt"
            content = "same content"
            file1.write_text(content)
            file2.write_text(content)

            repo = Repository(Path(tmpdir))
            store = FingerprintStore(repo)

            # Register first file
            store.add_fingerprint("work123", str(file1), {})

            # Check second file
            work_id = store.check_duplicate(str(file2))
            assert work_id == "work123"

    def test_get_work_history(self):
        """All fingerprints registered for a work show up in its history."""
        with tempfile.TemporaryDirectory() as tmpdir:
            repo = Repository(Path(tmpdir))
            store = FingerprintStore(repo)

            # Add multiple files (distinct content) for the same work
            for i in range(3):
                test_file = Path(tmpdir) / f"file{i}.txt"
                test_file.write_text(f"content {i}")
                store.add_fingerprint("work123", str(test_file), {"index": i})

            history = store.get_work_history("work123")
            assert len(history) == 3

    def test_remove_fingerprint(self):
        """Removing a known fingerprint returns True and empties the index."""
        with tempfile.TemporaryDirectory() as tmpdir:
            test_file = Path(tmpdir) / "test.txt"
            test_file.write_text("content")

            repo = Repository(Path(tmpdir))
            store = FingerprintStore(repo)

            store.add_fingerprint("work123", str(test_file), {})
            assert len(store.index) == 1

            removed = store.remove_fingerprint(str(test_file))
            assert removed is True
            assert len(store.index) == 0

    def test_remove_nonexistent_fingerprint(self):
        """Removing a file that was never added returns False."""
        with tempfile.TemporaryDirectory() as tmpdir:
            test_file = Path(tmpdir) / "test.txt"
            test_file.write_text("content")

            repo = Repository(Path(tmpdir))
            store = FingerprintStore(repo)

            removed = store.remove_fingerprint(str(test_file))
            assert removed is False

    def test_clear(self):
        """``clear`` drops every fingerprint from the index."""
        with tempfile.TemporaryDirectory() as tmpdir:
            repo = Repository(Path(tmpdir))
            store = FingerprintStore(repo)

            # Add some fingerprints
            for i in range(3):
                test_file = Path(tmpdir) / f"file{i}.txt"
                test_file.write_text(f"content {i}")
                store.add_fingerprint(f"work{i}", str(test_file), {})

            assert len(store.index) == 3

            store.clear()
            assert len(store.index) == 0

    def test_get_stats(self):
        """Stats report the fingerprint count and the number of distinct works."""
        with tempfile.TemporaryDirectory() as tmpdir:
            repo = Repository(Path(tmpdir))
            store = FingerprintStore(repo)

            # Five distinct files: the first three belong to work1,
            # the remaining two to work2.
            for i in range(5):
                test_file = Path(tmpdir) / f"file{i}.txt"
                test_file.write_text(f"content {i}")
                work_id = "work1" if i < 3 else "work2"
                store.add_fingerprint(work_id, str(test_file), {})

            stats = store.get_stats()
            assert stats["total_fingerprints"] == 5
            assert stats["unique_works"] == 2

    def test_persistence(self):
        """A fingerprint added by one store is visible to a new store instance."""
        with tempfile.TemporaryDirectory() as tmpdir:
            test_file = Path(tmpdir) / "test.txt"
            test_file.write_text("content")

            repo = Repository(Path(tmpdir))

            # Create store and add fingerprint
            store1 = FingerprintStore(repo)
            store1.add_fingerprint("work123", str(test_file), {})

            # Create new store instance on the same repository
            store2 = FingerprintStore(repo)

            # Should have the fingerprint
            work_id = store2.check_duplicate(str(test_file))
            assert work_id == "work123"
|
|
|
+
|
|
|
+
|
|
|
class TestFingerprintService:
    """Tests for ``FingerprintService``.

    Every test roots its ``Repository`` in a private temporary directory so
    that no state leaks between tests or into the shared system temp folder.
    """

    def test_check_before_import_new_file(self):
        """A file that was never registered is not reported as a duplicate."""
        with tempfile.TemporaryDirectory() as tmpdir:
            test_file = Path(tmpdir) / "new.txt"
            test_file.write_text("new content")

            repo = Repository(Path(tmpdir))
            service = FingerprintService(repo)

            is_dup, work_id = service.check_before_import(str(test_file))
            assert is_dup is False
            assert work_id is None

    def test_check_before_import_duplicate(self):
        """A file registered for a completed work is reported as a duplicate."""
        with tempfile.TemporaryDirectory() as tmpdir:
            repo = Repository(Path(tmpdir))

            # Create a completed work
            test_file_path = Path(tmpdir) / "source.txt"
            test_file_path.write_text("content")
            work = repo.create_work(str(test_file_path), title="Test")
            work.status = WorkStatus.COMPLETED
            repo.update_work(work)

            service = FingerprintService(repo)
            service.register_import(work.work_id, str(test_file_path))

            # Check duplicate
            is_dup, work_id = service.check_before_import(str(test_file_path))
            assert is_dup is True
            assert work_id == work.work_id

    def test_check_duplicate_incomplete_work(self):
        """A fingerprint owned by a non-completed work is not a duplicate."""
        with tempfile.TemporaryDirectory() as tmpdir:
            test_file = Path(tmpdir) / "test.txt"
            test_file.write_text("content")

            repo = Repository(Path(tmpdir))

            # Create an incomplete work
            work = repo.create_work(str(test_file))
            # Status is left at its initial value (not COMPLETED)

            service = FingerprintService(repo)
            service.register_import(work.work_id, str(test_file))

            # Should not be a duplicate
            is_dup, _ = service.check_before_import(str(test_file))
            assert is_dup is False

    def test_register_import(self):
        """Registering stores the fingerprint even if the work is not completed."""
        with tempfile.TemporaryDirectory() as tmpdir:
            test_file = Path(tmpdir) / "test.txt"
            test_file.write_text("content")

            repo = Repository(Path(tmpdir))
            service = FingerprintService(repo)

            service.register_import("work123", str(test_file))

            # Verify it's now tracked
            is_dup, _ = service.check_before_import(str(test_file))
            # Note: won't be duplicate until work is completed
            assert is_dup is False

            # But fingerprint is in store
            fp = service.store.check_duplicate(str(test_file))
            assert fp == "work123"

    def test_register_batch_import(self):
        """Batch registration stores one fingerprint per distinct file."""
        with tempfile.TemporaryDirectory() as tmpdir:
            files = []
            for i in range(3):
                f = Path(tmpdir) / f"file{i}.txt"
                f.write_text(f"content {i}")
                files.append(str(f))

            repo = Repository(Path(tmpdir))
            service = FingerprintService(repo)

            service.register_batch_import("work123", files)

            stats = service.store.get_stats()
            assert stats["total_fingerprints"] == 3

    def test_get_fingerprint(self):
        """``get_fingerprint`` returns a 32-character string."""
        # Use a private directory rather than the parent of a
        # NamedTemporaryFile: the latter is the shared system temp dir, which
        # would make the repository non-hermetic and could leave files behind.
        with tempfile.TemporaryDirectory() as tmpdir:
            test_file = Path(tmpdir) / "known.bin"
            test_file.write_bytes(b"known content")

            repo = Repository(Path(tmpdir))
            service = FingerprintService(repo)

            fp = service.get_fingerprint(str(test_file))
            assert isinstance(fp, str)
            assert len(fp) == 32

    def test_get_file_info(self):
        """``get_file_info`` bundles fingerprint, metadata and duplicate status."""
        # Private directory for the same isolation reason as above.
        with tempfile.TemporaryDirectory() as tmpdir:
            test_file = Path(tmpdir) / "info.txt"
            test_file.write_bytes(b"test content")

            repo = Repository(Path(tmpdir))
            service = FingerprintService(repo)

            info = service.get_file_info(str(test_file))

            assert "fingerprint" in info
            assert "metadata" in info
            assert "is_duplicate" in info
            assert "existing_work_id" in info
            assert len(info["fingerprint"]) == 32
            assert info["metadata"]["size"] == 12  # len(b"test content")
|
|
|
+
|
|
|
+
|
|
|
class TestBatchFingerprintChecker:
    """Tests for ``BatchFingerprintChecker`` on top of a real service."""

    @staticmethod
    def _register_completed(repo, service, path):
        """Create a work for ``path``, mark it COMPLETED and register the file.

        Returns the work object so tests can refer to it.
        """
        work = repo.create_work(path)
        work.status = WorkStatus.COMPLETED
        repo.update_work(work)
        service.register_import(work.work_id, path)
        return work

    def test_check_files(self):
        """Unregistered files are all reported as non-duplicates."""
        with tempfile.TemporaryDirectory() as workdir:
            root = Path(workdir)
            # Create files
            first = root / "file1.txt"
            second = root / "file2.txt"
            first.write_text("content 1")
            second.write_text("content 2")

            repo = Repository(root)
            service = FingerprintService(repo)
            checker = BatchFingerprintChecker(service)

            first_key, second_key = str(first), str(second)
            results = checker.check_files([first_key, second_key])

            assert len(results) == 2
            assert first_key in results
            assert second_key in results
            # Neither file has been registered, so both are non-duplicates
            assert results[first_key] == (False, None)
            assert results[second_key] == (False, None)

    def test_check_files_with_duplicate(self):
        """A registered file and its same-content copy are both duplicates."""
        with tempfile.TemporaryDirectory() as workdir:
            root = Path(workdir)
            first = root / "file1.txt"
            second = root / "file2.txt"
            first.write_text("same")
            second.write_text("same")

            repo = Repository(root)
            service = FingerprintService(repo)

            # Only the first file is registered (under a completed work)
            self._register_completed(repo, service, str(first))

            checker = BatchFingerprintChecker(service)
            results = checker.check_files([str(first), str(second)])

            # The second file has identical content, so it is flagged as well
            assert results[str(first)][0] is True
            assert results[str(second)][0] is True

    def test_filter_new_files(self):
        """With nothing registered, every file passes the new-file filter."""
        with tempfile.TemporaryDirectory() as workdir:
            root = Path(workdir)
            paths = []
            for i in range(3):
                target = root / f"file{i}.txt"
                target.write_text(f"content {i}")
                paths.append(str(target))

            repo = Repository(root)
            service = FingerprintService(repo)
            checker = BatchFingerprintChecker(service)

            fresh = checker.filter_new_files(paths)
            assert len(fresh) == 3

    def test_filter_new_files_with_duplicate(self):
        """The new-file filter drops files that are already registered."""
        with tempfile.TemporaryDirectory() as workdir:
            root = Path(workdir)
            first = root / "file1.txt"
            second = root / "file2.txt"
            first.write_text("same")
            second.write_text("different")

            repo = Repository(root)
            service = FingerprintService(repo)

            # Register the first file only
            self._register_completed(repo, service, str(first))

            checker = BatchFingerprintChecker(service)
            fresh = checker.filter_new_files([str(first), str(second)])

            # Only the unregistered second file is left
            assert len(fresh) == 1
            assert str(second) in fresh

    def test_filter_duplicate_files(self):
        """The duplicate filter keeps only files that are already registered."""
        with tempfile.TemporaryDirectory() as workdir:
            root = Path(workdir)
            first = root / "file1.txt"
            second = root / "file2.txt"
            first.write_text("same content")
            second.write_text("different")

            repo = Repository(root)
            service = FingerprintService(repo)

            # Register the first file only
            self._register_completed(repo, service, str(first))

            checker = BatchFingerprintChecker(service)
            dupes = checker.filter_duplicate_files([str(first), str(second)])

            assert len(dupes) == 1
            assert str(first) in dupes

    def test_categorize_files(self):
        """Files are split into duplicate, new and error buckets."""
        with tempfile.TemporaryDirectory() as workdir:
            root = Path(workdir)
            first = root / "file1.txt"
            second = root / "file2.txt"
            # Deliberately never written, so it lands in the error bucket
            missing = root / "nonexistent.txt"
            first.write_text("same")
            second.write_text("different")

            repo = Repository(root)
            service = FingerprintService(repo)

            # Register the first file only
            self._register_completed(repo, service, str(first))

            checker = BatchFingerprintChecker(service)
            buckets = checker.categorize_files(
                [str(first), str(second), str(missing)]
            )

            assert len(buckets["duplicate"]) == 1
            assert len(buckets["new"]) == 1
            assert len(buckets["error"]) == 1
            assert str(first) in buckets["duplicate"]
            assert str(second) in buckets["new"]
            assert str(missing) in buckets["error"]

    def test_get_summary(self):
        """The summary counts total, new, duplicate and error files."""
        with tempfile.TemporaryDirectory() as workdir:
            root = Path(workdir)
            paths = []
            for i in range(5):
                target = root / f"file{i}.txt"
                target.write_text(f"content {i}")
                paths.append(str(target))

            # One path that does not exist on disk
            paths.append("/nonexistent/file.txt")

            repo = Repository(root)
            service = FingerprintService(repo)
            checker = BatchFingerprintChecker(service)

            summary = checker.get_summary(paths)

            assert summary["total"] == 6
            assert summary["new"] == 5  # every existing file is unregistered
            assert summary["duplicate"] == 0
            assert summary["error"] == 1  # the missing path
|
|
|
+
|
|
|
+
|
|
|
class TestIntegration:
    """End-to-end tests combining repository, service and batch checker."""

    def test_full_duplicate_detection_workflow(self):
        """A same-content copy of a completed import is flagged as a duplicate."""
        with tempfile.TemporaryDirectory() as tmpdir:
            # Original file
            original = Path(tmpdir) / "novel.txt"
            original.write_text("This is a novel content.")

            repo = Repository(Path(tmpdir))
            service = FingerprintService(repo)

            # Import original file
            work = repo.create_work(str(original), title="My Novel")
            service.register_import(work.work_id, str(original))

            # Mark as completed
            work.status = WorkStatus.COMPLETED
            repo.update_work(work)

            # Try to import duplicate (copy with same content)
            copy = Path(tmpdir) / "novel_copy.txt"
            copy.write_text("This is a novel content.")

            is_dup, existing_work_id = service.check_before_import(str(copy))

            assert is_dup is True
            assert existing_work_id == work.work_id

    def test_batch_import_with_duplicates(self):
        """Summarising a batch counts registered files and their copies as duplicates."""
        with tempfile.TemporaryDirectory() as tmpdir:
            # Create files with some duplicates
            content_sets = [
                ("unique1.txt", "content 1"),
                ("unique2.txt", "content 2"),
                ("unique3.txt", "content 3"),  # Will be duplicated
                ("copy3.txt", "content 3"),  # Duplicate of unique3
                ("unique4.txt", "content 4"),
            ]

            files = []
            for name, content in content_sets:
                f = Path(tmpdir) / name
                f.write_text(content)
                files.append(str(f))

            repo = Repository(Path(tmpdir))
            service = FingerprintService(repo)
            checker = BatchFingerprintChecker(service)

            # First batch - import unique1-3 as completed works
            first_batch = files[:3]
            for file_path in first_batch:
                work = repo.create_work(file_path)
                work.status = WorkStatus.COMPLETED
                repo.update_work(work)
                service.register_import(work.work_id, file_path)

            # Summarise the whole file list, including the already-imported files
            summary = checker.get_summary(files)

            assert summary["total"] == 5
            # unique1-3 are registered under completed works and copy3 shares
            # unique3's content, so four files are duplicates; only unique4
            # is new.
            assert summary["duplicate"] == 4
            assert summary["new"] == 1

    def test_fingerprint_survives_repository_restart(self):
        """Fingerprints remain detectable through fresh repository/service objects."""
        with tempfile.TemporaryDirectory() as tmpdir:
            test_file = Path(tmpdir) / "test.txt"
            test_file.write_text("persistent content")

            # NOTE(review): the directory is not created here; presumably
            # Repository creates it on first use — confirm.
            storage_dir = Path(tmpdir) / "storage"

            # First session
            repo1 = Repository(storage_dir)
            service1 = FingerprintService(repo1)

            work1 = repo1.create_work(str(test_file))
            work1.status = WorkStatus.COMPLETED
            repo1.update_work(work1)
            service1.register_import(work1.work_id, str(test_file))

            # Second session (new instances over the same storage directory)
            repo2 = Repository(storage_dir)
            service2 = FingerprintService(repo2)

            is_dup, work_id = service2.check_before_import(str(test_file))
            assert is_dup is True
            assert work_id == work1.work_id
|