  1. """
  2. Integration tests for CleaningPipeline.
  3. """
  4. import pytest
  5. from pathlib import Path
  6. from src.cleaning.pipeline import CleaningPipeline, CleaningPipelineError
  7. from src.cleaning.reader import TxtReader
  8. from src.cleaning.cleaner import TextCleaner
  9. from src.cleaning.splitter import ChapterSplitter
  10. @pytest.fixture
  11. def sample_file(tmp_path):
  12. """Create a sample TXT file for testing."""
  13. file_path = tmp_path / "sample.txt"
  14. content = """第一章 开始
  15. 这是第一章的内容,包含一些文字。
  16. 林风站在山顶,看着远方。
  17. 第二章 继续
  18. 这是第二章的内容。
  19. 他开始了新的旅程。
  20. """
  21. file_path.write_text(content, encoding="utf-8")
  22. return file_path
  23. @pytest.fixture
  24. def pipeline():
  25. """Create a CleaningPipeline instance."""
  26. return CleaningPipeline()
  27. class TestCleaningPipeline:
  28. """Test suite for CleaningPipeline."""
  29. def test_process_basic(self, pipeline, sample_file):
  30. """Test basic file processing."""
  31. chapters = pipeline.process(sample_file)
  32. assert len(chapters) >= 2
  33. assert "第一章" in chapters[0].title
  34. def test_process_with_info(self, pipeline, sample_file):
  35. """Test processing with info return."""
  36. chapters, info = pipeline.process(sample_file, return_info=True)
  37. assert len(chapters) >= 2
  38. assert 'encoding' in info
  39. assert 'original_size' in info
  40. assert 'cleaned_size' in info
  41. def test_process_to_result(self, pipeline, sample_file):
  42. """Test processing to CleaningResult."""
  43. result = pipeline.process_to_result(sample_file)
  44. assert result.chapter_count >= 2
  45. assert result.original_char_count > 0
  46. assert result.cleaned_char_count > 0
  47. def test_removal_rate_property(self, pipeline, sample_file):
  48. """Test removal rate calculation."""
  49. result = pipeline.process_to_result(sample_file)
  50. rate = result.removal_rate
  51. assert 0.0 <= rate <= 1.0
  52. def test_read_and_clean(self, pipeline, sample_file):
  53. """Test reading and cleaning without splitting."""
  54. content = pipeline.read_and_clean(sample_file)
  55. assert isinstance(content, str)
  56. assert len(content) > 0
  57. def test_get_file_info(self, pipeline, sample_file):
  58. """Test getting file information."""
  59. info = pipeline.get_file_info(sample_file)
  60. assert info['exists'] is True
  61. assert info['is_file'] is True
  62. assert 'size' in info
  63. assert 'encoding' in info
  64. def test_get_file_info_nonexistent(self, pipeline, tmp_path):
  65. """Test getting info for non-existent file."""
  66. info = pipeline.get_file_info(tmp_path / "nonexistent.txt")
  67. assert info['exists'] is False
  68. assert info['is_file'] is False
  69. def test_custom_components(self, sample_file):
  70. """Test pipeline with custom components."""
  71. custom_reader = TxtReader(default_encoding="utf-8")
  72. custom_cleaner = TextCleaner(remove_extra_whitespace=True)
  73. custom_splitter = ChapterSplitter(min_chapter_length=10)
  74. pipeline = CleaningPipeline(
  75. reader=custom_reader,
  76. cleaner=custom_cleaner,
  77. splitter=custom_splitter
  78. )
  79. chapters = pipeline.process(sample_file)
  80. assert len(chapters) >= 2
  81. def test_disable_cleaning(self, sample_file):
  82. """Test pipeline with cleaning disabled."""
  83. pipeline = CleaningPipeline(enable_cleaning=False)
  84. chapters, info = pipeline.process(sample_file, return_info=True)
  85. assert len(chapters) >= 2
  86. assert info.get('removed_chars', 0) == 0
  87. def test_disable_splitting(self, sample_file):
  88. """Test pipeline with splitting disabled."""
  89. pipeline = CleaningPipeline(enable_splitting=False)
  90. chapters = pipeline.process(sample_file)
  91. assert len(chapters) == 1
  92. assert chapters[0].title == "全文"
  93. def test_create_custom_splitter(self, pipeline):
  94. """Test creating custom splitter."""
  95. pipeline.create_custom_splitter(
  96. min_chapter_length=50,
  97. merge_short_chapters=False
  98. )
  99. assert pipeline.splitter.min_chapter_length == 50
  100. def test_create_custom_cleaner(self, pipeline):
  101. """Test creating custom cleaner."""
  102. pipeline.create_custom_cleaner(
  103. remove_extra_whitespace=True,
  104. fix_punctuation=True
  105. )
  106. assert pipeline.cleaner.remove_extra_whitespace is True
  107. def test_batch_process(self, pipeline, tmp_path):
  108. """Test batch processing multiple files."""
  109. # Create multiple files
  110. files = []
  111. for i in range(3):
  112. file_path = tmp_path / f"file_{i}.txt"
  113. content = f"第{i+1}章\n内容{i}\n"
  114. file_path.write_text(content, encoding="utf-8")
  115. files.append(file_path)
  116. results = pipeline.batch_process(files)
  117. assert len(results) == 3
  118. for path, chapters in results:
  119. assert isinstance(chapters, list)
  120. def test_batch_process_with_errors(self, pipeline, tmp_path):
  121. """Test batch processing with some errors."""
  122. files = [
  123. tmp_path / "exists.txt",
  124. tmp_path / "nonexistent.txt"
  125. ]
  126. files[0].write_text("内容", encoding="utf-8")
  127. results = pipeline.batch_process(files, raise_on_error=False)
  128. assert len(results) == 2
  129. assert isinstance(results[0][1], list) # Success
  130. assert isinstance(results[1][1], Exception) # Error
  131. def test_batch_process_raise_on_error(self, pipeline, tmp_path):
  132. """Test batch processing raises on error."""
  133. files = [tmp_path / "nonexistent.txt"]
  134. with pytest.raises(CleaningPipelineError):
  135. pipeline.batch_process(files, raise_on_error=True)
  136. def test_process_nonexistent_file(self, pipeline):
  137. """Test processing non-existent file raises error."""
  138. with pytest.raises(CleaningPipelineError):
  139. pipeline.process("/nonexistent/file.txt")
  140. def test_process_empty_file(self, pipeline, tmp_path):
  141. """Test processing empty file."""
  142. empty_file = tmp_path / "empty.txt"
  143. empty_file.write_text("", encoding="utf-8")
  144. chapters = pipeline.process(empty_file)
  145. # Should handle gracefully - either empty list or single empty chapter
  146. assert isinstance(chapters, list)
  147. def test_result_properties(self, pipeline, sample_file):
  148. """Test CleaningResult properties."""
  149. result = pipeline.process_to_result(sample_file)
  150. # Test chapter_count property
  151. assert result.chapter_count == len(result.chapters)
  152. # Test chapters have content
  153. for chapter in result.chapters:
  154. assert hasattr(chapter, 'content')
  155. assert hasattr(chapter, 'char_count')
  156. def test_chapter_word_count_property(self, pipeline, sample_file):
  157. """Test chapter word_count property."""
  158. chapters = pipeline.process(sample_file)
  159. for chapter in chapters:
  160. assert chapter.word_count >= 0
  161. def test_chapter_len_operator(self, pipeline, sample_file):
  162. """Test len() operator on chapters."""
  163. chapters = pipeline.process(sample_file)
  164. for chapter in chapters:
  165. assert len(chapter) == chapter.char_count
  166. def test_full_pipeline_integration(self, pipeline, sample_file):
  167. """Test full integration of all components."""
  168. # This test verifies the entire pipeline works together
  169. result = pipeline.process_to_result(sample_file)
  170. # Verify all stages completed
  171. assert result.chapter_count > 0
  172. assert result.original_char_count > 0
  173. assert result.cleaned_char_count >= 0
  174. # Verify chapter structure
  175. for chapter in result.chapters:
  176. assert hasattr(chapter, 'index')
  177. assert hasattr(chapter, 'title')
  178. assert hasattr(chapter, 'content')
  179. assert chapter.index >= 0
  180. def test_chinese_encoding_detection(self, pipeline, tmp_path):
  181. """Test processing files with different Chinese encodings."""
  182. # GBK encoded file
  183. gbk_file = tmp_path / "gbk.txt"
  184. content = "第一章 测试\n内容"
  185. gbk_file.write_bytes(content.encode("gbk"))
  186. chapters = pipeline.process(gbk_file)
  187. assert len(chapters) >= 1
  188. def test_large_file_handling(self, pipeline, tmp_path):
  189. """Test handling larger files."""
  190. large_file = tmp_path / "large.txt"
  191. # Create a file with many chapters
  192. lines = []
  193. for i in range(50):
  194. lines.append(f"第{i+1}章")
  195. lines.append("这是测试内容。" * 10)
  196. large_file.write_text("\n".join(lines), encoding="utf-8")
  197. chapters = pipeline.process(large_file)
  198. assert len(chapters) == 50
  199. def test_no_chapters_detected(self, pipeline, tmp_path):
  200. """Test file without chapter titles."""
  201. no_chapter_file = tmp_path / "no_chapter.txt"
  202. no_chapter_file.write_text("这是一段没有章节标题的文本。\n第二行内容。", encoding="utf-8")
  203. chapters = pipeline.process(no_chapter_file)
  204. # Should return single chapter with "全文" title
  205. assert len(chapters) == 1
  206. assert chapters[0].title == "全文"
  207. def test_special_characters_in_file(self, pipeline, tmp_path):
  208. """Test handling files with special characters."""
  209. special_file = tmp_path / "special.txt"
  210. content = "第一章:测试!\n\"引号\"内容\n\t制表符\n多种标点:;,。!?"
  211. special_file.write_text(content, encoding="utf-8")
  212. chapters = pipeline.process(special_file)
  213. assert len(chapters) >= 1
  214. def test_cleaning_statistics(self, pipeline, sample_file):
  215. """Test that cleaning statistics are accurate."""
  216. result = pipeline.process_to_result(sample_file)
  217. # Verify statistics are consistent
  218. if result.original_char_count > result.cleaned_char_count:
  219. assert result.removed_char_count > 0
  220. assert result.removed_char_count == result.original_char_count - result.cleaned_char_count
  221. def test_pipeline_with_custom_patterns(self, tmp_path):
  222. """Test pipeline with custom chapter patterns."""
  223. custom_file = tmp_path / "custom.txt"
  224. # Make content longer to avoid merging
  225. content = """EPISODE 1 Start
  226. This is episode one with enough content to avoid merging.
  227. EPISODE 2 Middle
  228. This is episode two with enough content to avoid merging as well.
  229. """
  230. custom_file.write_text(content, encoding="utf-8")
  231. pipeline = CleaningPipeline()
  232. pipeline.create_custom_splitter(
  233. min_chapter_length=10,
  234. merge_short_chapters=False,
  235. custom_patterns=[(r'^EPISODE\s+\d+', 1)]
  236. )
  237. chapters = pipeline.process(custom_file)
  238. assert len(chapters) >= 2
  239. def test_is_binary_detection(self, pipeline, tmp_path):
  240. """Test binary file detection."""
  241. text_file = tmp_path / "text.txt"
  242. text_file.write_text("文本内容", encoding="utf-8")
  243. binary_file = tmp_path / "binary.bin"
  244. binary_file.write_bytes(b"\x00\x01\x02\x03" * 100)
  245. text_info = pipeline.get_file_info(text_file)
  246. binary_info = pipeline.get_file_info(binary_file)
  247. assert text_info['is_binary'] is False
  248. assert binary_info['is_binary'] is True