|
|
@@ -0,0 +1,313 @@
|
|
|
+"""
|
|
|
+Integration tests for CleaningPipeline.
|
|
|
+"""
|
|
|
+
|
|
|
+import pytest
|
|
|
+from pathlib import Path
|
|
|
+from src.cleaning.pipeline import CleaningPipeline, CleaningPipelineError
|
|
|
+from src.cleaning.reader import TxtReader
|
|
|
+from src.cleaning.cleaner import TextCleaner
|
|
|
+from src.cleaning.splitter import ChapterSplitter
|
|
|
+
|
|
|
+
|
|
|
@pytest.fixture
def sample_file(tmp_path):
    """Write a small two-chapter Chinese TXT novel into a temp dir and return its path."""
    text = """第一章 开始

这是第一章的内容,包含一些文字。

林风站在山顶,看着远方。

第二章 继续

这是第二章的内容。

他开始了新的旅程。
"""
    target = tmp_path / "sample.txt"
    target.write_text(text, encoding="utf-8")
    return target
|
|
|
+
|
|
|
+
|
|
|
@pytest.fixture
def pipeline():
    """Provide a CleaningPipeline built with default components."""
    default_pipeline = CleaningPipeline()
    return default_pipeline
|
|
|
+
|
|
|
+
|
|
|
class TestCleaningPipeline:
    """Test suite for CleaningPipeline.

    Exercises the full read -> clean -> split flow against small temp-dir
    TXT fixtures, plus configuration toggles (cleaning/splitting disabled,
    custom components), batch processing, and error paths.
    """

    def test_process_basic(self, pipeline, sample_file):
        """Test basic file processing."""
        chapters = pipeline.process(sample_file)
        # The sample fixture contains two chapter headings (第一章/第二章).
        assert len(chapters) >= 2
        assert "第一章" in chapters[0].title

    def test_process_with_info(self, pipeline, sample_file):
        """Test processing with info return."""
        # return_info=True switches the return to a (chapters, info) pair.
        chapters, info = pipeline.process(sample_file, return_info=True)
        assert len(chapters) >= 2
        # Info dict is expected to report detection/size metadata.
        assert 'encoding' in info
        assert 'original_size' in info
        assert 'cleaned_size' in info

    def test_process_to_result(self, pipeline, sample_file):
        """Test processing to CleaningResult."""
        result = pipeline.process_to_result(sample_file)
        assert result.chapter_count >= 2
        assert result.original_char_count > 0
        assert result.cleaned_char_count > 0

    def test_removal_rate_property(self, pipeline, sample_file):
        """Test removal rate calculation."""
        result = pipeline.process_to_result(sample_file)
        rate = result.removal_rate
        # Removal rate is a fraction of characters removed by cleaning.
        assert 0.0 <= rate <= 1.0

    def test_read_and_clean(self, pipeline, sample_file):
        """Test reading and cleaning without splitting."""
        # read_and_clean skips the chapter-split stage and returns raw text.
        content = pipeline.read_and_clean(sample_file)
        assert isinstance(content, str)
        assert len(content) > 0

    def test_get_file_info(self, pipeline, sample_file):
        """Test getting file information."""
        info = pipeline.get_file_info(sample_file)
        assert info['exists'] is True
        assert info['is_file'] is True
        assert 'size' in info
        assert 'encoding' in info

    def test_get_file_info_nonexistent(self, pipeline, tmp_path):
        """Test getting info for non-existent file."""
        # Should report gracefully rather than raise for a missing path.
        info = pipeline.get_file_info(tmp_path / "nonexistent.txt")
        assert info['exists'] is False
        assert info['is_file'] is False

    def test_custom_components(self, sample_file):
        """Test pipeline with custom components."""
        # Pipeline accepts injected reader/cleaner/splitter instances.
        custom_reader = TxtReader(default_encoding="utf-8")
        custom_cleaner = TextCleaner(remove_extra_whitespace=True)
        custom_splitter = ChapterSplitter(min_chapter_length=10)

        pipeline = CleaningPipeline(
            reader=custom_reader,
            cleaner=custom_cleaner,
            splitter=custom_splitter
        )

        chapters = pipeline.process(sample_file)
        assert len(chapters) >= 2

    def test_disable_cleaning(self, sample_file):
        """Test pipeline with cleaning disabled."""
        pipeline = CleaningPipeline(enable_cleaning=False)
        chapters, info = pipeline.process(sample_file, return_info=True)
        assert len(chapters) >= 2
        # With cleaning off, no characters should be reported as removed.
        assert info.get('removed_chars', 0) == 0

    def test_disable_splitting(self, sample_file):
        """Test pipeline with splitting disabled."""
        pipeline = CleaningPipeline(enable_splitting=False)
        chapters = pipeline.process(sample_file)
        # Whole text is returned as a single "全文" (full text) chapter.
        assert len(chapters) == 1
        assert chapters[0].title == "全文"

    def test_create_custom_splitter(self, pipeline):
        """Test creating custom splitter."""
        # Replaces the pipeline's splitter in place with the given options.
        pipeline.create_custom_splitter(
            min_chapter_length=50,
            merge_short_chapters=False
        )
        assert pipeline.splitter.min_chapter_length == 50

    def test_create_custom_cleaner(self, pipeline):
        """Test creating custom cleaner."""
        pipeline.create_custom_cleaner(
            remove_extra_whitespace=True,
            fix_punctuation=True
        )
        assert pipeline.cleaner.remove_extra_whitespace is True

    def test_batch_process(self, pipeline, tmp_path):
        """Test batch processing multiple files."""
        # Create multiple files
        files = []
        for i in range(3):
            file_path = tmp_path / f"file_{i}.txt"
            content = f"第{i+1}章\n内容{i}\n"
            file_path.write_text(content, encoding="utf-8")
            files.append(file_path)

        results = pipeline.batch_process(files)
        assert len(results) == 3

        # Each result is a (path, chapters) pair on success.
        for path, chapters in results:
            assert isinstance(chapters, list)

    def test_batch_process_with_errors(self, pipeline, tmp_path):
        """Test batch processing with some errors."""
        files = [
            tmp_path / "exists.txt",
            tmp_path / "nonexistent.txt"
        ]
        files[0].write_text("内容", encoding="utf-8")

        # With raise_on_error=False, failures are returned inline as
        # (path, Exception) pairs instead of aborting the batch.
        results = pipeline.batch_process(files, raise_on_error=False)
        assert len(results) == 2
        assert isinstance(results[0][1], list)  # Success
        assert isinstance(results[1][1], Exception)  # Error

    def test_batch_process_raise_on_error(self, pipeline, tmp_path):
        """Test batch processing raises on error."""
        files = [tmp_path / "nonexistent.txt"]

        with pytest.raises(CleaningPipelineError):
            pipeline.batch_process(files, raise_on_error=True)

    def test_process_nonexistent_file(self, pipeline):
        """Test processing non-existent file raises error."""
        with pytest.raises(CleaningPipelineError):
            pipeline.process("/nonexistent/file.txt")

    def test_process_empty_file(self, pipeline, tmp_path):
        """Test processing empty file."""
        empty_file = tmp_path / "empty.txt"
        empty_file.write_text("", encoding="utf-8")

        chapters = pipeline.process(empty_file)
        # Should handle gracefully - either empty list or single empty chapter
        assert isinstance(chapters, list)

    def test_result_properties(self, pipeline, sample_file):
        """Test CleaningResult properties."""
        result = pipeline.process_to_result(sample_file)

        # Test chapter_count property
        assert result.chapter_count == len(result.chapters)

        # Test chapters have content
        for chapter in result.chapters:
            assert hasattr(chapter, 'content')
            assert hasattr(chapter, 'char_count')

    def test_chapter_word_count_property(self, pipeline, sample_file):
        """Test chapter word_count property."""
        chapters = pipeline.process(sample_file)
        for chapter in chapters:
            assert chapter.word_count >= 0

    def test_chapter_len_operator(self, pipeline, sample_file):
        """Test len() operator on chapters."""
        chapters = pipeline.process(sample_file)
        # Chapter.__len__ should mirror its char_count attribute.
        for chapter in chapters:
            assert len(chapter) == chapter.char_count

    def test_full_pipeline_integration(self, pipeline, sample_file):
        """Test full integration of all components."""
        # This test verifies the entire pipeline works together
        result = pipeline.process_to_result(sample_file)

        # Verify all stages completed
        assert result.chapter_count > 0
        assert result.original_char_count > 0
        assert result.cleaned_char_count >= 0

        # Verify chapter structure
        for chapter in result.chapters:
            assert hasattr(chapter, 'index')
            assert hasattr(chapter, 'title')
            assert hasattr(chapter, 'content')
            assert chapter.index >= 0

    def test_chinese_encoding_detection(self, pipeline, tmp_path):
        """Test processing files with different Chinese encodings."""
        # GBK encoded file
        # NOTE(review): relies on the reader's encoding auto-detection;
        # only verifies the file is readable, not the detected codec name.
        gbk_file = tmp_path / "gbk.txt"
        content = "第一章 测试\n内容"
        gbk_file.write_bytes(content.encode("gbk"))

        chapters = pipeline.process(gbk_file)
        assert len(chapters) >= 1

    def test_large_file_handling(self, pipeline, tmp_path):
        """Test handling larger files."""
        large_file = tmp_path / "large.txt"
        # Create a file with many chapters
        lines = []
        for i in range(50):
            lines.append(f"第{i+1}章")
            lines.append("这是测试内容。" * 10)

        large_file.write_text("\n".join(lines), encoding="utf-8")

        chapters = pipeline.process(large_file)
        # Each heading is long enough that no chapters get merged away.
        assert len(chapters) == 50

    def test_no_chapters_detected(self, pipeline, tmp_path):
        """Test file without chapter titles."""
        no_chapter_file = tmp_path / "no_chapter.txt"
        no_chapter_file.write_text("这是一段没有章节标题的文本。\n第二行内容。", encoding="utf-8")

        chapters = pipeline.process(no_chapter_file)
        # Should return single chapter with "全文" title
        assert len(chapters) == 1
        assert chapters[0].title == "全文"

    def test_special_characters_in_file(self, pipeline, tmp_path):
        """Test handling files with special characters."""
        special_file = tmp_path / "special.txt"
        content = "第一章:测试!\n\"引号\"内容\n\t制表符\n多种标点:;,。!?"
        special_file.write_text(content, encoding="utf-8")

        chapters = pipeline.process(special_file)
        assert len(chapters) >= 1

    def test_cleaning_statistics(self, pipeline, sample_file):
        """Test that cleaning statistics are accurate."""
        result = pipeline.process_to_result(sample_file)

        # Verify statistics are consistent
        # (only checked when cleaning actually removed something)
        if result.original_char_count > result.cleaned_char_count:
            assert result.removed_char_count > 0
            assert result.removed_char_count == result.original_char_count - result.cleaned_char_count

    def test_pipeline_with_custom_patterns(self, tmp_path):
        """Test pipeline with custom chapter patterns."""
        custom_file = tmp_path / "custom.txt"
        # Make content longer to avoid merging
        content = """EPISODE 1 Start

This is episode one with enough content to avoid merging.

EPISODE 2 Middle

This is episode two with enough content to avoid merging as well.
"""
        custom_file.write_text(content, encoding="utf-8")

        pipeline = CleaningPipeline()
        # custom_patterns entries are (regex, priority) pairs — TODO confirm
        # the second tuple element's meaning against ChapterSplitter.
        pipeline.create_custom_splitter(
            min_chapter_length=10,
            merge_short_chapters=False,
            custom_patterns=[(r'^EPISODE\s+\d+', 1)]
        )

        chapters = pipeline.process(custom_file)
        assert len(chapters) >= 2

    def test_is_binary_detection(self, pipeline, tmp_path):
        """Test binary file detection."""
        text_file = tmp_path / "text.txt"
        text_file.write_text("文本内容", encoding="utf-8")

        # NUL-heavy payload should be classified as binary.
        binary_file = tmp_path / "binary.bin"
        binary_file.write_bytes(b"\x00\x01\x02\x03" * 100)

        text_info = pipeline.get_file_info(text_file)
        binary_info = pipeline.get_file_info(binary_file)

        assert text_info['is_binary'] is False
        assert binary_info['is_binary'] is True
|