""" Integration tests for CleaningPipeline. """ import pytest from pathlib import Path from src.cleaning.pipeline import CleaningPipeline, CleaningPipelineError from src.cleaning.reader import TxtReader from src.cleaning.cleaner import TextCleaner from src.cleaning.splitter import ChapterSplitter @pytest.fixture def sample_file(tmp_path): """Create a sample TXT file for testing.""" file_path = tmp_path / "sample.txt" content = """第一章 开始 这是第一章的内容,包含一些文字。 林风站在山顶,看着远方。 第二章 继续 这是第二章的内容。 他开始了新的旅程。 """ file_path.write_text(content, encoding="utf-8") return file_path @pytest.fixture def pipeline(): """Create a CleaningPipeline instance.""" return CleaningPipeline() class TestCleaningPipeline: """Test suite for CleaningPipeline.""" def test_process_basic(self, pipeline, sample_file): """Test basic file processing.""" chapters = pipeline.process(sample_file) assert len(chapters) >= 2 assert "第一章" in chapters[0].title def test_process_with_info(self, pipeline, sample_file): """Test processing with info return.""" chapters, info = pipeline.process(sample_file, return_info=True) assert len(chapters) >= 2 assert 'encoding' in info assert 'original_size' in info assert 'cleaned_size' in info def test_process_to_result(self, pipeline, sample_file): """Test processing to CleaningResult.""" result = pipeline.process_to_result(sample_file) assert result.chapter_count >= 2 assert result.original_char_count > 0 assert result.cleaned_char_count > 0 def test_removal_rate_property(self, pipeline, sample_file): """Test removal rate calculation.""" result = pipeline.process_to_result(sample_file) rate = result.removal_rate assert 0.0 <= rate <= 1.0 def test_read_and_clean(self, pipeline, sample_file): """Test reading and cleaning without splitting.""" content = pipeline.read_and_clean(sample_file) assert isinstance(content, str) assert len(content) > 0 def test_get_file_info(self, pipeline, sample_file): """Test getting file information.""" info = pipeline.get_file_info(sample_file) assert info['exists'] is True assert info['is_file'] is True assert 'size' in info assert 'encoding' in info def test_get_file_info_nonexistent(self, pipeline, tmp_path): """Test getting info for non-existent file.""" info = pipeline.get_file_info(tmp_path / "nonexistent.txt") assert info['exists'] is False assert info['is_file'] is False def test_custom_components(self, sample_file): """Test pipeline with custom components.""" custom_reader = TxtReader(default_encoding="utf-8") custom_cleaner = TextCleaner(remove_extra_whitespace=True) custom_splitter = ChapterSplitter(min_chapter_length=10) pipeline = CleaningPipeline( reader=custom_reader, cleaner=custom_cleaner, splitter=custom_splitter ) chapters = pipeline.process(sample_file) assert len(chapters) >= 2 def test_disable_cleaning(self, sample_file): """Test pipeline with cleaning disabled.""" pipeline = CleaningPipeline(enable_cleaning=False) chapters, info = pipeline.process(sample_file, return_info=True) assert len(chapters) >= 2 assert info.get('removed_chars', 0) == 0 def test_disable_splitting(self, sample_file): """Test pipeline with splitting disabled.""" pipeline = CleaningPipeline(enable_splitting=False) chapters = pipeline.process(sample_file) assert len(chapters) == 1 assert chapters[0].title == "全文" def test_create_custom_splitter(self, pipeline): """Test creating custom splitter.""" pipeline.create_custom_splitter( min_chapter_length=50, merge_short_chapters=False ) assert pipeline.splitter.min_chapter_length == 50 def test_create_custom_cleaner(self, pipeline): """Test creating custom cleaner.""" pipeline.create_custom_cleaner( remove_extra_whitespace=True, fix_punctuation=True ) assert pipeline.cleaner.remove_extra_whitespace is True def test_batch_process(self, pipeline, tmp_path): """Test batch processing multiple files.""" # Create multiple files files = [] for i in range(3): file_path = tmp_path / f"file_{i}.txt" content = f"第{i+1}章\n内容{i}\n" file_path.write_text(content, encoding="utf-8") files.append(file_path) results = pipeline.batch_process(files) assert len(results) == 3 for path, chapters in results: assert isinstance(chapters, list) def test_batch_process_with_errors(self, pipeline, tmp_path): """Test batch processing with some errors.""" files = [ tmp_path / "exists.txt", tmp_path / "nonexistent.txt" ] files[0].write_text("内容", encoding="utf-8") results = pipeline.batch_process(files, raise_on_error=False) assert len(results) == 2 assert isinstance(results[0][1], list) # Success assert isinstance(results[1][1], Exception) # Error def test_batch_process_raise_on_error(self, pipeline, tmp_path): """Test batch processing raises on error.""" files = [tmp_path / "nonexistent.txt"] with pytest.raises(CleaningPipelineError): pipeline.batch_process(files, raise_on_error=True) def test_process_nonexistent_file(self, pipeline): """Test processing non-existent file raises error.""" with pytest.raises(CleaningPipelineError): pipeline.process("/nonexistent/file.txt") def test_process_empty_file(self, pipeline, tmp_path): """Test processing empty file.""" empty_file = tmp_path / "empty.txt" empty_file.write_text("", encoding="utf-8") chapters = pipeline.process(empty_file) # Should handle gracefully - either empty list or single empty chapter assert isinstance(chapters, list) def test_result_properties(self, pipeline, sample_file): """Test CleaningResult properties.""" result = pipeline.process_to_result(sample_file) # Test chapter_count property assert result.chapter_count == len(result.chapters) # Test chapters have content for chapter in result.chapters: assert hasattr(chapter, 'content') assert hasattr(chapter, 'char_count') def test_chapter_word_count_property(self, pipeline, sample_file): """Test chapter word_count property.""" chapters = pipeline.process(sample_file) for chapter in chapters: assert chapter.word_count >= 0 def test_chapter_len_operator(self, pipeline, sample_file): """Test len() operator on chapters.""" chapters = pipeline.process(sample_file) for chapter in chapters: assert len(chapter) == chapter.char_count def test_full_pipeline_integration(self, pipeline, sample_file): """Test full integration of all components.""" # This test verifies the entire pipeline works together result = pipeline.process_to_result(sample_file) # Verify all stages completed assert result.chapter_count > 0 assert result.original_char_count > 0 assert result.cleaned_char_count >= 0 # Verify chapter structure for chapter in result.chapters: assert hasattr(chapter, 'index') assert hasattr(chapter, 'title') assert hasattr(chapter, 'content') assert chapter.index >= 0 def test_chinese_encoding_detection(self, pipeline, tmp_path): """Test processing files with different Chinese encodings.""" # GBK encoded file gbk_file = tmp_path / "gbk.txt" content = "第一章 测试\n内容" gbk_file.write_bytes(content.encode("gbk")) chapters = pipeline.process(gbk_file) assert len(chapters) >= 1 def test_large_file_handling(self, pipeline, tmp_path): """Test handling larger files.""" large_file = tmp_path / "large.txt" # Create a file with many chapters lines = [] for i in range(50): lines.append(f"第{i+1}章") lines.append("这是测试内容。" * 10) large_file.write_text("\n".join(lines), encoding="utf-8") chapters = pipeline.process(large_file) assert len(chapters) == 50 def test_no_chapters_detected(self, pipeline, tmp_path): """Test file without chapter titles.""" no_chapter_file = tmp_path / "no_chapter.txt" no_chapter_file.write_text("这是一段没有章节标题的文本。\n第二行内容。", encoding="utf-8") chapters = pipeline.process(no_chapter_file) # Should return single chapter with "全文" title assert len(chapters) == 1 assert chapters[0].title == "全文" def test_special_characters_in_file(self, pipeline, tmp_path): """Test handling files with special characters.""" special_file = tmp_path / "special.txt" content = "第一章:测试!\n\"引号\"内容\n\t制表符\n多种标点:;,。!?" special_file.write_text(content, encoding="utf-8") chapters = pipeline.process(special_file) assert len(chapters) >= 1 def test_cleaning_statistics(self, pipeline, sample_file): """Test that cleaning statistics are accurate.""" result = pipeline.process_to_result(sample_file) # Verify statistics are consistent if result.original_char_count > result.cleaned_char_count: assert result.removed_char_count > 0 assert result.removed_char_count == result.original_char_count - result.cleaned_char_count def test_pipeline_with_custom_patterns(self, tmp_path): """Test pipeline with custom chapter patterns.""" custom_file = tmp_path / "custom.txt" # Make content longer to avoid merging content = """EPISODE 1 Start This is episode one with enough content to avoid merging. EPISODE 2 Middle This is episode two with enough content to avoid merging as well. """ custom_file.write_text(content, encoding="utf-8") pipeline = CleaningPipeline() pipeline.create_custom_splitter( min_chapter_length=10, merge_short_chapters=False, custom_patterns=[(r'^EPISODE\s+\d+', 1)] ) chapters = pipeline.process(custom_file) assert len(chapters) >= 2 def test_is_binary_detection(self, pipeline, tmp_path): """Test binary file detection.""" text_file = tmp_path / "text.txt" text_file.write_text("文本内容", encoding="utf-8") binary_file = tmp_path / "binary.bin" binary_file.write_bytes(b"\x00\x01\x02\x03" * 100) text_info = pipeline.get_file_info(text_file) binary_info = pipeline.get_file_info(binary_file) assert text_info['is_binary'] is False assert binary_info['is_binary'] is True