- """
- Integration tests for CleaningPipeline.
- """
- import pytest
- from pathlib import Path
- from src.cleaning.pipeline import CleaningPipeline, CleaningPipelineError
- from src.cleaning.reader import TxtReader
- from src.cleaning.cleaner import TextCleaner
- from src.cleaning.splitter import ChapterSplitter
- @pytest.fixture
- def sample_file(tmp_path):
- """Create a sample TXT file for testing."""
- file_path = tmp_path / "sample.txt"
- content = """第一章 开始
- 这是第一章的内容,包含一些文字。
- 林风站在山顶,看着远方。
- 第二章 继续
- 这是第二章的内容。
- 他开始了新的旅程。
- """
- file_path.write_text(content, encoding="utf-8")
- return file_path
- @pytest.fixture
- def pipeline():
- """Create a CleaningPipeline instance."""
- return CleaningPipeline()
- class TestCleaningPipeline:
- """Test suite for CleaningPipeline."""
- def test_process_basic(self, pipeline, sample_file):
- """Test basic file processing."""
- chapters = pipeline.process(sample_file)
- assert len(chapters) >= 2
- assert "第一章" in chapters[0].title
- def test_process_with_info(self, pipeline, sample_file):
- """Test processing with info return."""
- chapters, info = pipeline.process(sample_file, return_info=True)
- assert len(chapters) >= 2
- assert 'encoding' in info
- assert 'original_size' in info
- assert 'cleaned_size' in info
- def test_process_to_result(self, pipeline, sample_file):
- """Test processing to CleaningResult."""
- result = pipeline.process_to_result(sample_file)
- assert result.chapter_count >= 2
- assert result.original_char_count > 0
- assert result.cleaned_char_count > 0
- def test_removal_rate_property(self, pipeline, sample_file):
- """Test removal rate calculation."""
- result = pipeline.process_to_result(sample_file)
- rate = result.removal_rate
- assert 0.0 <= rate <= 1.0
- def test_read_and_clean(self, pipeline, sample_file):
- """Test reading and cleaning without splitting."""
- content = pipeline.read_and_clean(sample_file)
- assert isinstance(content, str)
- assert len(content) > 0
- def test_get_file_info(self, pipeline, sample_file):
- """Test getting file information."""
- info = pipeline.get_file_info(sample_file)
- assert info['exists'] is True
- assert info['is_file'] is True
- assert 'size' in info
- assert 'encoding' in info
- def test_get_file_info_nonexistent(self, pipeline, tmp_path):
- """Test getting info for non-existent file."""
- info = pipeline.get_file_info(tmp_path / "nonexistent.txt")
- assert info['exists'] is False
- assert info['is_file'] is False
- def test_custom_components(self, sample_file):
- """Test pipeline with custom components."""
- custom_reader = TxtReader(default_encoding="utf-8")
- custom_cleaner = TextCleaner(remove_extra_whitespace=True)
- custom_splitter = ChapterSplitter(min_chapter_length=10)
- pipeline = CleaningPipeline(
- reader=custom_reader,
- cleaner=custom_cleaner,
- splitter=custom_splitter
- )
- chapters = pipeline.process(sample_file)
- assert len(chapters) >= 2
- def test_disable_cleaning(self, sample_file):
- """Test pipeline with cleaning disabled."""
- pipeline = CleaningPipeline(enable_cleaning=False)
- chapters, info = pipeline.process(sample_file, return_info=True)
- assert len(chapters) >= 2
- assert info.get('removed_chars', 0) == 0
- def test_disable_splitting(self, sample_file):
- """Test pipeline with splitting disabled."""
- pipeline = CleaningPipeline(enable_splitting=False)
- chapters = pipeline.process(sample_file)
- assert len(chapters) == 1
- assert chapters[0].title == "全文"
- def test_create_custom_splitter(self, pipeline):
- """Test creating custom splitter."""
- pipeline.create_custom_splitter(
- min_chapter_length=50,
- merge_short_chapters=False
- )
- assert pipeline.splitter.min_chapter_length == 50
- def test_create_custom_cleaner(self, pipeline):
- """Test creating custom cleaner."""
- pipeline.create_custom_cleaner(
- remove_extra_whitespace=True,
- fix_punctuation=True
- )
- assert pipeline.cleaner.remove_extra_whitespace is True
- def test_batch_process(self, pipeline, tmp_path):
- """Test batch processing multiple files."""
- # Create multiple files
- files = []
- for i in range(3):
- file_path = tmp_path / f"file_{i}.txt"
- content = f"第{i+1}章\n内容{i}\n"
- file_path.write_text(content, encoding="utf-8")
- files.append(file_path)
- results = pipeline.batch_process(files)
- assert len(results) == 3
- for path, chapters in results:
- assert isinstance(chapters, list)
- def test_batch_process_with_errors(self, pipeline, tmp_path):
- """Test batch processing with some errors."""
- files = [
- tmp_path / "exists.txt",
- tmp_path / "nonexistent.txt"
- ]
- files[0].write_text("内容", encoding="utf-8")
- results = pipeline.batch_process(files, raise_on_error=False)
- assert len(results) == 2
- assert isinstance(results[0][1], list) # Success
- assert isinstance(results[1][1], Exception) # Error
- def test_batch_process_raise_on_error(self, pipeline, tmp_path):
- """Test batch processing raises on error."""
- files = [tmp_path / "nonexistent.txt"]
- with pytest.raises(CleaningPipelineError):
- pipeline.batch_process(files, raise_on_error=True)
- def test_process_nonexistent_file(self, pipeline):
- """Test processing non-existent file raises error."""
- with pytest.raises(CleaningPipelineError):
- pipeline.process("/nonexistent/file.txt")
- def test_process_empty_file(self, pipeline, tmp_path):
- """Test processing empty file."""
- empty_file = tmp_path / "empty.txt"
- empty_file.write_text("", encoding="utf-8")
- chapters = pipeline.process(empty_file)
- # Should handle gracefully - either empty list or single empty chapter
- assert isinstance(chapters, list)
- def test_result_properties(self, pipeline, sample_file):
- """Test CleaningResult properties."""
- result = pipeline.process_to_result(sample_file)
- # Test chapter_count property
- assert result.chapter_count == len(result.chapters)
- # Test chapters have content
- for chapter in result.chapters:
- assert hasattr(chapter, 'content')
- assert hasattr(chapter, 'char_count')
- def test_chapter_word_count_property(self, pipeline, sample_file):
- """Test chapter word_count property."""
- chapters = pipeline.process(sample_file)
- for chapter in chapters:
- assert chapter.word_count >= 0
- def test_chapter_len_operator(self, pipeline, sample_file):
- """Test len() operator on chapters."""
- chapters = pipeline.process(sample_file)
- for chapter in chapters:
- assert len(chapter) == chapter.char_count
- def test_full_pipeline_integration(self, pipeline, sample_file):
- """Test full integration of all components."""
- # This test verifies the entire pipeline works together
- result = pipeline.process_to_result(sample_file)
- # Verify all stages completed
- assert result.chapter_count > 0
- assert result.original_char_count > 0
- assert result.cleaned_char_count >= 0
- # Verify chapter structure
- for chapter in result.chapters:
- assert hasattr(chapter, 'index')
- assert hasattr(chapter, 'title')
- assert hasattr(chapter, 'content')
- assert chapter.index >= 0
- def test_chinese_encoding_detection(self, pipeline, tmp_path):
- """Test processing files with different Chinese encodings."""
- # GBK encoded file
- gbk_file = tmp_path / "gbk.txt"
- content = "第一章 测试\n内容"
- gbk_file.write_bytes(content.encode("gbk"))
- chapters = pipeline.process(gbk_file)
- assert len(chapters) >= 1
- def test_large_file_handling(self, pipeline, tmp_path):
- """Test handling larger files."""
- large_file = tmp_path / "large.txt"
- # Create a file with many chapters
- lines = []
- for i in range(50):
- lines.append(f"第{i+1}章")
- lines.append("这是测试内容。" * 10)
- large_file.write_text("\n".join(lines), encoding="utf-8")
- chapters = pipeline.process(large_file)
- assert len(chapters) == 50
- def test_no_chapters_detected(self, pipeline, tmp_path):
- """Test file without chapter titles."""
- no_chapter_file = tmp_path / "no_chapter.txt"
- no_chapter_file.write_text("这是一段没有章节标题的文本。\n第二行内容。", encoding="utf-8")
- chapters = pipeline.process(no_chapter_file)
- # Should return single chapter with "全文" title
- assert len(chapters) == 1
- assert chapters[0].title == "全文"
- def test_special_characters_in_file(self, pipeline, tmp_path):
- """Test handling files with special characters."""
- special_file = tmp_path / "special.txt"
- content = "第一章:测试!\n\"引号\"内容\n\t制表符\n多种标点:;,。!?"
- special_file.write_text(content, encoding="utf-8")
- chapters = pipeline.process(special_file)
- assert len(chapters) >= 1
- def test_cleaning_statistics(self, pipeline, sample_file):
- """Test that cleaning statistics are accurate."""
- result = pipeline.process_to_result(sample_file)
- # Verify statistics are consistent
- if result.original_char_count > result.cleaned_char_count:
- assert result.removed_char_count > 0
- assert result.removed_char_count == result.original_char_count - result.cleaned_char_count
- def test_pipeline_with_custom_patterns(self, tmp_path):
- """Test pipeline with custom chapter patterns."""
- custom_file = tmp_path / "custom.txt"
- # Make content longer to avoid merging
- content = """EPISODE 1 Start
- This is episode one with enough content to avoid merging.
- EPISODE 2 Middle
- This is episode two with enough content to avoid merging as well.
- """
- custom_file.write_text(content, encoding="utf-8")
- pipeline = CleaningPipeline()
- pipeline.create_custom_splitter(
- min_chapter_length=10,
- merge_short_chapters=False,
- custom_patterns=[(r'^EPISODE\s+\d+', 1)]
- )
- chapters = pipeline.process(custom_file)
- assert len(chapters) >= 2
- def test_is_binary_detection(self, pipeline, tmp_path):
- """Test binary file detection."""
- text_file = tmp_path / "text.txt"
- text_file.write_text("文本内容", encoding="utf-8")
- binary_file = tmp_path / "binary.bin"
- binary_file.write_bytes(b"\x00\x01\x02\x03" * 100)
- text_info = pipeline.get_file_info(text_file)
- binary_info = pipeline.get_file_info(binary_file)
- assert text_info['is_binary'] is False
- assert binary_info['is_binary'] is True