# test_fingerprint.py
"""
Unit tests for the fingerprint module.

Tests cover FileFingerprint, FingerprintStore, FingerprintService,
and BatchFingerprintChecker functionality.
"""
import hashlib
import tempfile
from pathlib import Path

import pytest

from src.fingerprint.batch import BatchFingerprintChecker
from src.fingerprint.calculator import FileFingerprint
from src.fingerprint.service import FingerprintService
from src.fingerprint.store import FingerprintStore
from src.repository import Repository
from src.repository.models import WorkItem, WorkStatus
  15. class TestFileFingerprint:
  16. """Test FileFingerprint calculator."""
  17. def test_calculate_md5(self):
  18. """Test MD5 calculation."""
  19. with tempfile.NamedTemporaryFile(delete=False) as f:
  20. f.write(b"Hello, World!")
  21. temp_path = f.name
  22. try:
  23. calc = FileFingerprint()
  24. md5 = calc.calculate_md5(temp_path)
  25. # Known MD5 for "Hello, World!"
  26. assert md5 == "65a8e27d8879283831b664bd8b7f0ad4"
  27. assert len(md5) == 32
  28. finally:
  29. Path(temp_path).unlink()
  30. def test_calculate_md5_different_content(self):
  31. """Test that different content produces different hashes."""
  32. with tempfile.TemporaryDirectory() as tmpdir:
  33. file1 = Path(tmpdir) / "file1.txt"
  34. file2 = Path(tmpdir) / "file2.txt"
  35. file1.write_text("content one")
  36. file2.write_text("content two")
  37. calc = FileFingerprint()
  38. md5_1 = calc.calculate_md5(str(file1))
  39. md5_2 = calc.calculate_md5(str(file2))
  40. assert md5_1 != md5_2
  41. def test_calculate_md5_same_content(self):
  42. """Test that same content produces same hash."""
  43. with tempfile.TemporaryDirectory() as tmpdir:
  44. file1 = Path(tmpdir) / "file1.txt"
  45. file2 = Path(tmpdir) / "file2.txt"
  46. content = "identical content"
  47. file1.write_text(content)
  48. file2.write_text(content)
  49. calc = FileFingerprint()
  50. md5_1 = calc.calculate_md5(str(file1))
  51. md5_2 = calc.calculate_md5(str(file2))
  52. assert md5_1 == md5_2
  53. def test_calculate_md5_large_file(self):
  54. """Test MD5 calculation for larger files."""
  55. with tempfile.NamedTemporaryFile(delete=False) as f:
  56. # Write 100KB of data
  57. f.write(b"x" * 100_000)
  58. temp_path = f.name
  59. try:
  60. calc = FileFingerprint()
  61. md5 = calc.calculate_md5(temp_path)
  62. assert len(md5) == 32
  63. finally:
  64. Path(temp_path).unlink()
  65. def test_calculate_quick_hash(self):
  66. """Test quick hash calculation."""
  67. with tempfile.NamedTemporaryFile(delete=False) as f:
  68. f.write(b"Hello, World!")
  69. temp_path = f.name
  70. try:
  71. calc = FileFingerprint()
  72. quick = calc.calculate_quick_hash(temp_path, sample_size=5)
  73. # Hash of first 5 bytes "Hello" is different from full hash
  74. # MD5 of "Hello" is 8b1a9953c4611296a827abf8c47804d7
  75. assert quick == "8b1a9953c4611296a827abf8c47804d7"
  76. assert len(quick) == 32
  77. # Quick hash should differ from full hash
  78. full_hash = calc.calculate_md5(temp_path)
  79. assert quick != full_hash
  80. finally:
  81. Path(temp_path).unlink()
  82. def test_get_file_size(self):
  83. """Test getting file size."""
  84. with tempfile.NamedTemporaryFile(delete=False) as f:
  85. content = b"Test content for size"
  86. f.write(content)
  87. temp_path = f.name
  88. try:
  89. calc = FileFingerprint()
  90. size = calc.get_file_size(temp_path)
  91. assert size == len(content)
  92. finally:
  93. Path(temp_path).unlink()
  94. def test_get_file_meta(self):
  95. """Test getting file metadata."""
  96. with tempfile.NamedTemporaryFile(delete=False, suffix=".txt") as f:
  97. f.write(b"content")
  98. temp_path = f.name
  99. try:
  100. calc = FileFingerprint()
  101. meta = calc.get_file_meta(temp_path)
  102. assert "name" in meta
  103. assert "size" in meta
  104. assert "modified_time" in meta
  105. assert meta["size"] == 7
  106. assert meta["name"].endswith(".txt")
  107. finally:
  108. Path(temp_path).unlink()
  109. def test_file_not_found(self):
  110. """Test FileNotFoundError for non-existent file."""
  111. calc = FileFingerprint()
  112. with pytest.raises(FileNotFoundError):
  113. calc.calculate_md5("/nonexistent/file.txt")
  114. with pytest.raises(FileNotFoundError):
  115. calc.calculate_quick_hash("/nonexistent/file.txt")
  116. with pytest.raises(FileNotFoundError):
  117. calc.get_file_size("/nonexistent/file.txt")
  118. with pytest.raises(FileNotFoundError):
  119. calc.get_file_meta("/nonexistent/file.txt")
  120. class TestFingerprintStore:
  121. """Test FingerprintStore."""
  122. def test_init_creates_index(self):
  123. """Test that initialization creates an empty index."""
  124. with tempfile.TemporaryDirectory() as tmpdir:
  125. repo = Repository(Path(tmpdir))
  126. store = FingerprintStore(repo)
  127. assert isinstance(store.index, dict)
  128. assert len(store.index) == 0
  129. def test_load_existing_index(self):
  130. """Test loading an existing index."""
  131. with tempfile.TemporaryDirectory() as tmpdir:
  132. repo = Repository(Path(tmpdir))
  133. storage_dir = Path(tmpdir)
  134. # Create a pre-existing index
  135. index_file = storage_dir / "fingerprints.json"
  136. index_file.write_text('{"abc123": {"work_id": "work1"}}')
  137. store = FingerprintStore(repo)
  138. assert "abc123" in store.index
  139. def test_add_fingerprint(self):
  140. """Test adding a fingerprint."""
  141. with tempfile.TemporaryDirectory() as tmpdir:
  142. # Create test file
  143. test_file = Path(tmpdir) / "test.txt"
  144. test_file.write_text("test content")
  145. repo = Repository(Path(tmpdir))
  146. store = FingerprintStore(repo)
  147. store.add_fingerprint("work123", str(test_file), {"name": "test.txt"})
  148. assert len(store.index) == 1
  149. # Get the fingerprint
  150. from src.fingerprint.calculator import FileFingerprint
  151. calc = FileFingerprint()
  152. fp = calc.calculate_md5(str(test_file))
  153. assert fp in store.index
  154. def test_check_duplicate(self):
  155. """Test checking for duplicates."""
  156. with tempfile.TemporaryDirectory() as tmpdir:
  157. # Create test file
  158. test_file = Path(tmpdir) / "test.txt"
  159. test_file.write_text("test content")
  160. repo = Repository(Path(tmpdir))
  161. store = FingerprintStore(repo)
  162. # Not duplicate initially
  163. work_id = store.check_duplicate(str(test_file))
  164. assert work_id is None
  165. # Add fingerprint
  166. store.add_fingerprint("work123", str(test_file), {})
  167. # Now it's a duplicate
  168. work_id = store.check_duplicate(str(test_file))
  169. assert work_id == "work123"
  170. def test_check_duplicate_copy(self):
  171. """Test that file copies are detected as duplicates."""
  172. with tempfile.TemporaryDirectory() as tmpdir:
  173. # Create original and copy
  174. file1 = Path(tmpdir) / "original.txt"
  175. file2 = Path(tmpdir) / "copy.txt"
  176. content = "same content"
  177. file1.write_text(content)
  178. file2.write_text(content)
  179. repo = Repository(Path(tmpdir))
  180. store = FingerprintStore(repo)
  181. # Register first file
  182. store.add_fingerprint("work123", str(file1), {})
  183. # Check second file
  184. work_id = store.check_duplicate(str(file2))
  185. assert work_id == "work123"
  186. def test_get_work_history(self):
  187. """Test getting fingerprint history for a work."""
  188. with tempfile.TemporaryDirectory() as tmpdir:
  189. repo = Repository(Path(tmpdir))
  190. store = FingerprintStore(repo)
  191. # Add multiple files for same work
  192. for i in range(3):
  193. test_file = Path(tmpdir) / f"file{i}.txt"
  194. test_file.write_text(f"content {i}")
  195. store.add_fingerprint("work123", str(test_file), {"index": i})
  196. history = store.get_work_history("work123")
  197. assert len(history) == 3
  198. def test_remove_fingerprint(self):
  199. """Test removing a fingerprint."""
  200. with tempfile.TemporaryDirectory() as tmpdir:
  201. test_file = Path(tmpdir) / "test.txt"
  202. test_file.write_text("content")
  203. repo = Repository(Path(tmpdir))
  204. store = FingerprintStore(repo)
  205. store.add_fingerprint("work123", str(test_file), {})
  206. assert len(store.index) == 1
  207. removed = store.remove_fingerprint(str(test_file))
  208. assert removed is True
  209. assert len(store.index) == 0
  210. def test_remove_nonexistent_fingerprint(self):
  211. """Test removing a non-existent fingerprint."""
  212. with tempfile.TemporaryDirectory() as tmpdir:
  213. test_file = Path(tmpdir) / "test.txt"
  214. test_file.write_text("content")
  215. repo = Repository(Path(tmpdir))
  216. store = FingerprintStore(repo)
  217. removed = store.remove_fingerprint(str(test_file))
  218. assert removed is False
  219. def test_clear(self):
  220. """Test clearing all fingerprints."""
  221. with tempfile.TemporaryDirectory() as tmpdir:
  222. repo = Repository(Path(tmpdir))
  223. store = FingerprintStore(repo)
  224. # Add some fingerprints
  225. for i in range(3):
  226. test_file = Path(tmpdir) / f"file{i}.txt"
  227. test_file.write_text(f"content {i}")
  228. store.add_fingerprint(f"work{i}", str(test_file), {})
  229. assert len(store.index) == 3
  230. store.clear()
  231. assert len(store.index) == 0
  232. def test_get_stats(self):
  233. """Test getting store statistics."""
  234. with tempfile.TemporaryDirectory() as tmpdir:
  235. repo = Repository(Path(tmpdir))
  236. store = FingerprintStore(repo)
  237. # Add fingerprints
  238. for i in range(5):
  239. test_file = Path(tmpdir) / f"file{i}.txt"
  240. test_file.write_text(f"content {i}")
  241. work_id = "work1" if i < 3 else "work2"
  242. store.add_fingerprint(work_id, str(test_file), {})
  243. stats = store.get_stats()
  244. assert stats["total_fingerprints"] == 5
  245. assert stats["unique_works"] == 2
  246. def test_persistence(self):
  247. """Test that index persists across store instances."""
  248. with tempfile.TemporaryDirectory() as tmpdir:
  249. test_file = Path(tmpdir) / "test.txt"
  250. test_file.write_text("content")
  251. repo = Repository(Path(tmpdir))
  252. # Create store and add fingerprint
  253. store1 = FingerprintStore(repo)
  254. store1.add_fingerprint("work123", str(test_file), {})
  255. # Create new store instance
  256. store2 = FingerprintStore(repo)
  257. # Should have the fingerprint
  258. work_id = store2.check_duplicate(str(test_file))
  259. assert work_id == "work123"
  260. class TestFingerprintService:
  261. """Test FingerprintService."""
  262. def test_check_before_import_new_file(self):
  263. """Test checking a new file before import."""
  264. with tempfile.TemporaryDirectory() as tmpdir:
  265. test_file = Path(tmpdir) / "new.txt"
  266. test_file.write_text("new content")
  267. repo = Repository(Path(tmpdir))
  268. service = FingerprintService(repo)
  269. is_dup, work_id = service.check_before_import(str(test_file))
  270. assert is_dup is False
  271. assert work_id is None
  272. def test_check_before_import_duplicate(self):
  273. """Test checking a duplicate file before import."""
  274. with tempfile.TemporaryDirectory() as tmpdir:
  275. test_file = Path(tmpdir) / "test.txt"
  276. test_file.write_text("content")
  277. repo = Repository(Path(tmpdir))
  278. # Create a completed work
  279. test_file_path = Path(tmpdir) / "source.txt"
  280. test_file_path.write_text("content")
  281. work = repo.create_work(str(test_file_path), title="Test")
  282. work.status = WorkStatus.COMPLETED
  283. repo.update_work(work)
  284. service = FingerprintService(repo)
  285. service.register_import(work.work_id, str(test_file_path))
  286. # Check duplicate
  287. is_dup, work_id = service.check_before_import(str(test_file_path))
  288. assert is_dup is True
  289. assert work_id == work.work_id
  290. def test_check_duplicate_incomplete_work(self):
  291. """Test that incomplete works don't count as duplicates."""
  292. with tempfile.TemporaryDirectory() as tmpdir:
  293. test_file = Path(tmpdir) / "test.txt"
  294. test_file.write_text("content")
  295. repo = Repository(Path(tmpdir))
  296. # Create an incomplete work
  297. work = repo.create_work(str(test_file))
  298. # Status is PENDING, not COMPLETED
  299. service = FingerprintService(repo)
  300. service.register_import(work.work_id, str(test_file))
  301. # Should not be a duplicate
  302. is_dup, work_id = service.check_before_import(str(test_file))
  303. assert is_dup is False
  304. def test_register_import(self):
  305. """Test registering an import."""
  306. with tempfile.TemporaryDirectory() as tmpdir:
  307. test_file = Path(tmpdir) / "test.txt"
  308. test_file.write_text("content")
  309. repo = Repository(Path(tmpdir))
  310. service = FingerprintService(repo)
  311. service.register_import("work123", str(test_file))
  312. # Verify it's now tracked
  313. is_dup, work_id = service.check_before_import(str(test_file))
  314. # Note: won't be duplicate until work is completed
  315. assert is_dup is False
  316. # But fingerprint is in store
  317. fp = service.store.check_duplicate(str(test_file))
  318. assert fp == "work123"
  319. def test_register_batch_import(self):
  320. """Test registering multiple files."""
  321. with tempfile.TemporaryDirectory() as tmpdir:
  322. files = []
  323. for i in range(3):
  324. f = Path(tmpdir) / f"file{i}.txt"
  325. f.write_text(f"content {i}")
  326. files.append(str(f))
  327. repo = Repository(Path(tmpdir))
  328. service = FingerprintService(repo)
  329. service.register_batch_import("work123", files)
  330. stats = service.store.get_stats()
  331. assert stats["total_fingerprints"] == 3
  332. def test_get_fingerprint(self):
  333. """Test getting file fingerprint."""
  334. with tempfile.NamedTemporaryFile(delete=False) as f:
  335. f.write(b"known content")
  336. temp_path = f.name
  337. try:
  338. repo = Repository(Path(temp_path).parent)
  339. service = FingerprintService(repo)
  340. fp = service.get_fingerprint(temp_path)
  341. assert len(fp) == 32
  342. assert isinstance(fp, str)
  343. finally:
  344. Path(temp_path).unlink()
  345. def test_get_file_info(self):
  346. """Test getting comprehensive file info."""
  347. with tempfile.NamedTemporaryFile(delete=False, suffix=".txt") as f:
  348. f.write(b"test content")
  349. temp_path = f.name
  350. try:
  351. repo = Repository(Path(temp_path).parent)
  352. service = FingerprintService(repo)
  353. info = service.get_file_info(temp_path)
  354. assert "fingerprint" in info
  355. assert "metadata" in info
  356. assert "is_duplicate" in info
  357. assert "existing_work_id" in info
  358. assert len(info["fingerprint"]) == 32
  359. assert info["metadata"]["size"] == 12
  360. finally:
  361. Path(temp_path).unlink()
  362. class TestBatchFingerprintChecker:
  363. """Test BatchFingerprintChecker."""
  364. def test_check_files(self):
  365. """Test checking multiple files."""
  366. with tempfile.TemporaryDirectory() as tmpdir:
  367. # Create files
  368. file1 = Path(tmpdir) / "file1.txt"
  369. file2 = Path(tmpdir) / "file2.txt"
  370. file1.write_text("content 1")
  371. file2.write_text("content 2")
  372. repo = Repository(Path(tmpdir))
  373. service = FingerprintService(repo)
  374. checker = BatchFingerprintChecker(service)
  375. results = checker.check_files([str(file1), str(file2)])
  376. assert len(results) == 2
  377. assert str(file1) in results
  378. assert str(file2) in results
  379. # Both should be non-duplicate
  380. assert results[str(file1)] == (False, None)
  381. assert results[str(file2)] == (False, None)
  382. def test_check_files_with_duplicate(self):
  383. """Test checking files with one duplicate."""
  384. with tempfile.TemporaryDirectory() as tmpdir:
  385. file1 = Path(tmpdir) / "file1.txt"
  386. file2 = Path(tmpdir) / "file2.txt"
  387. file1.write_text("same")
  388. file2.write_text("same")
  389. repo = Repository(Path(tmpdir))
  390. service = FingerprintService(repo)
  391. # Register first file
  392. work = repo.create_work(str(file1))
  393. work.status = WorkStatus.COMPLETED
  394. repo.update_work(work)
  395. service.register_import(work.work_id, str(file1))
  396. checker = BatchFingerprintChecker(service)
  397. results = checker.check_files([str(file1), str(file2)])
  398. # file1 should be duplicate, file2 should be too (same content)
  399. assert results[str(file1)][0] is True
  400. assert results[str(file2)][0] is True
  401. def test_filter_new_files(self):
  402. """Test filtering new files."""
  403. with tempfile.TemporaryDirectory() as tmpdir:
  404. files = []
  405. for i in range(3):
  406. f = Path(tmpdir) / f"file{i}.txt"
  407. f.write_text(f"content {i}")
  408. files.append(str(f))
  409. repo = Repository(Path(tmpdir))
  410. service = FingerprintService(repo)
  411. checker = BatchFingerprintChecker(service)
  412. new_files = checker.filter_new_files(files)
  413. assert len(new_files) == 3
  414. def test_filter_new_files_with_duplicate(self):
  415. """Test filtering removes duplicates."""
  416. with tempfile.TemporaryDirectory() as tmpdir:
  417. file1 = Path(tmpdir) / "file1.txt"
  418. file2 = Path(tmpdir) / "file2.txt"
  419. file1.write_text("same")
  420. file2.write_text("different")
  421. repo = Repository(Path(tmpdir))
  422. service = FingerprintService(repo)
  423. # Register file1
  424. work = repo.create_work(str(file1))
  425. work.status = WorkStatus.COMPLETED
  426. repo.update_work(work)
  427. service.register_import(work.work_id, str(file1))
  428. checker = BatchFingerprintChecker(service)
  429. new_files = checker.filter_new_files([str(file1), str(file2)])
  430. # Only file2 should be new
  431. assert len(new_files) == 1
  432. assert str(file2) in new_files
  433. def test_filter_duplicate_files(self):
  434. """Test filtering to get only duplicates."""
  435. with tempfile.TemporaryDirectory() as tmpdir:
  436. file1 = Path(tmpdir) / "file1.txt"
  437. file2 = Path(tmpdir) / "file2.txt"
  438. file1.write_text("same content")
  439. file2.write_text("different")
  440. repo = Repository(Path(tmpdir))
  441. service = FingerprintService(repo)
  442. # Register file1
  443. work = repo.create_work(str(file1))
  444. work.status = WorkStatus.COMPLETED
  445. repo.update_work(work)
  446. service.register_import(work.work_id, str(file1))
  447. checker = BatchFingerprintChecker(service)
  448. duplicates = checker.filter_duplicate_files([str(file1), str(file2)])
  449. assert len(duplicates) == 1
  450. assert str(file1) in duplicates
  451. def test_categorize_files(self):
  452. """Test categorizing files."""
  453. with tempfile.TemporaryDirectory() as tmpdir:
  454. file1 = Path(tmpdir) / "file1.txt"
  455. file2 = Path(tmpdir) / "file2.txt"
  456. file3 = Path(tmpdir) / "nonexistent.txt"
  457. file1.write_text("same")
  458. file2.write_text("different")
  459. repo = Repository(Path(tmpdir))
  460. service = FingerprintService(repo)
  461. # Register file1
  462. work = repo.create_work(str(file1))
  463. work.status = WorkStatus.COMPLETED
  464. repo.update_work(work)
  465. service.register_import(work.work_id, str(file1))
  466. checker = BatchFingerprintChecker(service)
  467. result = checker.categorize_files([str(file1), str(file2), str(file3)])
  468. assert len(result["duplicate"]) == 1
  469. assert len(result["new"]) == 1
  470. assert len(result["error"]) == 1
  471. assert str(file1) in result["duplicate"]
  472. assert str(file2) in result["new"]
  473. assert str(file3) in result["error"]
  474. def test_get_summary(self):
  475. """Test getting summary statistics."""
  476. with tempfile.TemporaryDirectory() as tmpdir:
  477. files = []
  478. for i in range(5):
  479. f = Path(tmpdir) / f"file{i}.txt"
  480. f.write_text(f"content {i}")
  481. files.append(str(f))
  482. # Add one non-existent file
  483. files.append("/nonexistent/file.txt")
  484. repo = Repository(Path(tmpdir))
  485. service = FingerprintService(repo)
  486. checker = BatchFingerprintChecker(service)
  487. summary = checker.get_summary(files)
  488. assert summary["total"] == 6
  489. assert summary["new"] == 5 # All existing files are new
  490. assert summary["duplicate"] == 0
  491. assert summary["error"] == 1 # Non-existent file
  492. class TestIntegration:
  493. """Integration tests for fingerprint module."""
  494. def test_full_duplicate_detection_workflow(self):
  495. """Test complete duplicate detection workflow."""
  496. with tempfile.TemporaryDirectory() as tmpdir:
  497. # Original file
  498. original = Path(tmpdir) / "novel.txt"
  499. original.write_text("This is a novel content.")
  500. repo = Repository(Path(tmpdir))
  501. service = FingerprintService(repo)
  502. # Import original file
  503. work = repo.create_work(str(original), title="My Novel")
  504. service.register_import(work.work_id, str(original))
  505. # Mark as completed
  506. work.status = WorkStatus.COMPLETED
  507. repo.update_work(work)
  508. # Try to import duplicate (copy with same content)
  509. copy = Path(tmpdir) / "novel_copy.txt"
  510. copy.write_text("This is a novel content.")
  511. is_dup, existing_work_id = service.check_before_import(str(copy))
  512. assert is_dup is True
  513. assert existing_work_id == work.work_id
  514. def test_batch_import_with_duplicates(self):
  515. """Test batch import workflow with duplicates."""
  516. with tempfile.TemporaryDirectory() as tmpdir:
  517. # Create files with some duplicates
  518. content_sets = [
  519. ("unique1.txt", "content 1"),
  520. ("unique2.txt", "content 2"),
  521. ("unique3.txt", "content 3"), # Will be duplicated
  522. ("copy3.txt", "content 3"), # Duplicate of unique3
  523. ("unique4.txt", "content 4"),
  524. ]
  525. files = []
  526. for name, content in content_sets:
  527. f = Path(tmpdir) / name
  528. f.write_text(content)
  529. files.append(str(f))
  530. repo = Repository(Path(tmpdir))
  531. service = FingerprintService(repo)
  532. checker = BatchFingerprintChecker(service)
  533. # First batch - import unique1-3
  534. first_batch = files[:3]
  535. for file_path in first_batch:
  536. work = repo.create_work(file_path)
  537. work.status = WorkStatus.COMPLETED
  538. repo.update_work(work)
  539. service.register_import(work.work_id, file_path)
  540. # Check second batch
  541. summary = checker.get_summary(files)
  542. assert summary["total"] == 5
  543. # 1 duplicate (copy3), 4 new (unique1, unique2, unique4, copy3 detected as dup)
  544. assert summary["duplicate"] >= 1
  545. def test_fingerprint_survives_repository_restart(self):
  546. """Test that fingerprints persist across repository restarts."""
  547. with tempfile.TemporaryDirectory() as tmpdir:
  548. test_file = Path(tmpdir) / "test.txt"
  549. test_file.write_text("persistent content")
  550. storage_dir = Path(tmpdir) / "storage"
  551. # First session
  552. repo1 = Repository(storage_dir)
  553. service1 = FingerprintService(repo1)
  554. work1 = repo1.create_work(str(test_file))
  555. work1.status = WorkStatus.COMPLETED
  556. repo1.update_work(work1)
  557. service1.register_import(work1.work_id, str(test_file))
  558. # Second session (new instances)
  559. repo2 = Repository(storage_dir)
  560. service2 = FingerprintService(repo2)
  561. is_dup, work_id = service2.check_before_import(str(test_file))
  562. assert is_dup is True
  563. assert work_id == work1.work_id