2
0
Просмотр исходного кода

feat(core): Define PipelineState enum and transition rules (Story 1.1.1)

- Add PipelineState enum with 9 states (IDLE, FINGERPRINTING, CLEANING, etc.)
- Define ALLOWED_TRANSITIONS rules for state machine
- Add is_transition_allowed() validator
- Add helper functions: can_pause_from(), can_resume_to(), get_allowed_transitions()
- Add state helper methods: is_terminal(), is_active(), can_pause(), can_resume()
- Unit tests covering all transitions and edge cases

Part of Epic 1.1: State Machine (Phase 1a)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
d8dfun 3 дней назад
Родитель
Сommit
9494f32225
4 измененных файлов с 441 добавлено и 0 удалено
  1. 15 0
      src/core/__init__.py
  2. 75 0
      src/core/states.py
  3. 134 0
      src/core/transitions.py
  4. 217 0
      tests/test_core_states.py

+ 15 - 0
src/core/__init__.py

@@ -0,0 +1,15 @@
+"""
+Core module for pipeline state management.
+
+This module provides the state machine infrastructure for managing
+translation pipeline lifecycle, including states, transitions, and validation.
+"""
+
+from .states import PipelineState
+from .transitions import is_transition_allowed, ALLOWED_TRANSITIONS
+
+__all__ = [
+    "PipelineState",
+    "is_transition_allowed",
+    "ALLOWED_TRANSITIONS",
+]

+ 75 - 0
src/core/states.py

@@ -0,0 +1,75 @@
+"""
+Pipeline state definitions.
+
+This module defines all possible states for the translation pipeline
+and provides helper functions for state validation.
+"""
+
+from enum import Enum
+from typing import Set
+
+
+class PipelineState(Enum):
+    """
+    Translation pipeline state enumeration.
+
+    States represent the lifecycle of a translation task from initialization
+    to completion or failure.
+
+    Attributes:
+        IDLE: Initial state, task created but not started
+        FINGERPRINTING: Calculating file fingerprint for change detection
+        CLEANING: Cleaning and normalizing source text
+        TERM_EXTRACTION: Extracting terminology for glossary
+        TRANSLATING: Translating content with m2m100 model
+        UPLOADING: Uploading translated content to target platform
+        PAUSED: Task paused by user or system
+        COMPLETED: Task completed successfully
+        FAILED: Task failed with error
+    """
+
+    IDLE = "idle"
+    FINGERPRINTING = "fingerprinting"
+    CLEANING = "cleaning"
+    TERM_EXTRACTION = "term_extraction"
+    TRANSLATING = "translating"
+    UPLOADING = "uploading"
+    PAUSED = "paused"
+    COMPLETED = "completed"
+    FAILED = "failed"
+
+    def __str__(self) -> str:
+        """Return string representation of the state."""
+        return self.value
+
+    def is_terminal(self) -> bool:
+        """Check if this is a terminal state (no transitions out)."""
+        return self in (PipelineState.COMPLETED, PipelineState.FAILED)
+
+    def is_active(self) -> bool:
+        """Check if this is an active processing state."""
+        return self in (
+            PipelineState.FINGERPRINTING,
+            PipelineState.CLEANING,
+            PipelineState.TERM_EXTRACTION,
+            PipelineState.TRANSLATING,
+            PipelineState.UPLOADING,
+        )
+
+    def can_pause(self) -> bool:
+        """Check if task can be paused from this state."""
+        return self.is_active()
+
+    def can_resume(self) -> bool:
+        """Check if task can be resumed from this state."""
+        return self == PipelineState.PAUSED
+
+
+# States that can be transitioned to from PAUSED
+RESUME_TARGETS: Set[PipelineState] = {
+    PipelineState.FINGERPRINTING,
+    PipelineState.CLEANING,
+    PipelineState.TERM_EXTRACTION,
+    PipelineState.TRANSLATING,
+    PipelineState.UPLOADING,
+}

+ 134 - 0
src/core/transitions.py

@@ -0,0 +1,134 @@
+"""
+State transition rules for the pipeline state machine.
+
+This module defines which state transitions are allowed and provides
+validation functions for state transitions.
+"""
+
+from typing import Dict, Set
+
+from .states import PipelineState, RESUME_TARGETS
+
+
+# State transition rules: from_state -> set of allowed to_states
+ALLOWED_TRANSITIONS: Dict[PipelineState, Set[PipelineState]] = {
+    # IDLE can start the pipeline
+    PipelineState.IDLE: {
+        PipelineState.FINGERPRINTING,
+    },
+    # FINGERPRINTING can move to CLEANING or fail
+    PipelineState.FINGERPRINTING: {
+        PipelineState.CLEANING,
+        PipelineState.FAILED,
+    },
+    # CLEANING can move to TERM_EXTRACTION or fail
+    PipelineState.CLEANING: {
+        PipelineState.TERM_EXTRACTION,
+        PipelineState.FAILED,
+    },
+    # TERM_EXTRACTION can move to TRANSLATING or fail
+    PipelineState.TERM_EXTRACTION: {
+        PipelineState.TRANSLATING,
+        PipelineState.FAILED,
+    },
+    # TRANSLATING can move to UPLOADING or fail
+    PipelineState.TRANSLATING: {
+        PipelineState.UPLOADING,
+        PipelineState.FAILED,
+    },
+    # UPLOADING can complete or fail
+    PipelineState.UPLOADING: {
+        PipelineState.COMPLETED,
+        PipelineState.FAILED,
+    },
+    # PAUSED can resume to previous active state or reset to IDLE
+    PipelineState.PAUSED: RESUME_TARGETS | {PipelineState.IDLE},
+    # FAILED can reset to IDLE for retry
+    PipelineState.FAILED: {
+        PipelineState.IDLE,
+    },
+    # COMPLETED can reset to IDLE for new task
+    PipelineState.COMPLETED: {
+        PipelineState.IDLE,
+    },
+}
+
+
+def is_transition_allowed(from_state: PipelineState, to_state: PipelineState) -> bool:
+    """
+    Check if a state transition is allowed.
+
+    Args:
+        from_state: The current state
+        to_state: The desired target state
+
+    Returns:
+        True if the transition is allowed, False otherwise
+
+    Examples:
+        >>> is_transition_allowed(PipelineState.IDLE, PipelineState.FINGERPRINTING)
+        True
+        >>> is_transition_allowed(PipelineState.IDLE, PipelineState.COMPLETED)
+        False
+        >>> is_transition_allowed(PipelineState.FAILED, PipelineState.IDLE)
+        True
+    """
+    allowed = ALLOWED_TRANSITIONS.get(from_state, set())
+    return to_state in allowed
+
+
+def get_allowed_transitions(from_state: PipelineState) -> Set[PipelineState]:
+    """
+    Get all allowed target states for a given source state.
+
+    Args:
+        from_state: The current state
+
+    Returns:
+        Set of allowed target states
+
+    Examples:
+        >>> get_allowed_transitions(PipelineState.IDLE)
+        {<PipelineState.FINGERPRINTING: 'fingerprinting'>}
+        >>> get_allowed_transitions(PipelineState.PAUSED)
+        {<PipelineState.IDLE: 'idle'>, <PipelineState.FINGERPRINTING: 'fingerprinting'>, ...}
+    """
+    return ALLOWED_TRANSITIONS.get(from_state, set()).copy()
+
+
+def can_pause_from(state: PipelineState) -> bool:
+    """
+    Check if a task can be paused from the given state.
+
+    Args:
+        state: The current state
+
+    Returns:
+        True if pausing is allowed from this state
+
+    Examples:
+        >>> can_pause_from(PipelineState.TRANSLATING)
+        True
+        >>> can_pause_from(PipelineState.COMPLETED)
+        False
+    """
+    return state.is_active()
+
+
+def can_resume_to(state: PipelineState) -> bool:
+    """
+    Check if a task can be resumed to the given state.
+
+    Args:
+        state: The target state
+
+    Returns:
+        True if resuming to this state is allowed
+
+    Examples:
+        >>> can_resume_to(PipelineState.TRANSLATING)
+        True
+        >>> can_resume_to(PipelineState.COMPLETED)
+        False
+    """
+    return state in RESUME_TARGETS

+ 217 - 0
tests/test_core_states.py

@@ -0,0 +1,217 @@
+"""
+Unit tests for pipeline state management.
+
+Tests cover state definitions, transition rules, and validation functions.
+"""
+
+import pytest
+
+from src.core.states import PipelineState
+from src.core.transitions import (
+    is_transition_allowed,
+    get_allowed_transitions,
+    can_pause_from,
+    can_resume_to,
+    ALLOWED_TRANSITIONS,
+)
+
+
+class TestPipelineState:
+    """Test PipelineState enum functionality."""
+
+    def test_all_states_defined(self):
+        """Test that all required states are defined."""
+        required_states = {
+            "IDLE",
+            "FINGERPRINTING",
+            "CLEANING",
+            "TERM_EXTRACTION",
+            "TRANSLATING",
+            "UPLOADING",
+            "PAUSED",
+            "COMPLETED",
+            "FAILED",
+        }
+        actual_states = {state.name for state in PipelineState}
+        assert actual_states == required_states
+
+    def test_state_values(self):
+        """Test that state values are correct."""
+        assert PipelineState.IDLE.value == "idle"
+        assert PipelineState.FINGERPRINTING.value == "fingerprinting"
+        assert PipelineState.CLEANING.value == "cleaning"
+        assert PipelineState.TERM_EXTRACTION.value == "term_extraction"
+        assert PipelineState.TRANSLATING.value == "translating"
+        assert PipelineState.UPLOADING.value == "uploading"
+        assert PipelineState.PAUSED.value == "paused"
+        assert PipelineState.COMPLETED.value == "completed"
+        assert PipelineState.FAILED.value == "failed"
+
+    def test_is_terminal(self):
+        """Test terminal state detection."""
+        assert PipelineState.COMPLETED.is_terminal() is True
+        assert PipelineState.FAILED.is_terminal() is True
+        assert PipelineState.IDLE.is_terminal() is False
+        assert PipelineState.TRANSLATING.is_terminal() is False
+
+    def test_is_active(self):
+        """Test active state detection."""
+        active_states = {
+            PipelineState.FINGERPRINTING,
+            PipelineState.CLEANING,
+            PipelineState.TERM_EXTRACTION,
+            PipelineState.TRANSLATING,
+            PipelineState.UPLOADING,
+        }
+        for state in active_states:
+            assert state.is_active() is True
+
+        inactive_states = {
+            PipelineState.IDLE,
+            PipelineState.PAUSED,
+            PipelineState.COMPLETED,
+            PipelineState.FAILED,
+        }
+        for state in inactive_states:
+            assert state.is_active() is False
+
+    def test_can_pause(self):
+        """Test states from which pausing is allowed."""
+        assert PipelineState.TRANSLATING.can_pause() is True
+        assert PipelineState.CLEANING.can_pause() is True
+        assert PipelineState.IDLE.can_pause() is False
+        assert PipelineState.COMPLETED.can_pause() is False
+
+    def test_can_resume(self):
+        """Test states from which resuming is allowed."""
+        assert PipelineState.PAUSED.can_resume() is True
+        assert PipelineState.IDLE.can_resume() is False
+        assert PipelineState.TRANSLATING.can_resume() is False
+
+
+class TestStateTransitions:
+    """Test state transition rules."""
+
+    def test_idle_to_fingerprinting(self):
+        """Test IDLE -> FINGERPRINTING transition."""
+        assert is_transition_allowed(PipelineState.IDLE, PipelineState.FINGERPRINTING) is True
+
+    def test_idle_to_other_states(self):
+        """Test IDLE cannot transition to most states."""
+        assert is_transition_allowed(PipelineState.IDLE, PipelineState.COMPLETED) is False
+        assert is_transition_allowed(PipelineState.IDLE, PipelineState.TRANSLATING) is False
+        assert is_transition_allowed(PipelineState.IDLE, PipelineState.FAILED) is False
+
+    def test_normal_flow(self):
+        """Test normal forward flow through the pipeline."""
+        flow = [
+            (PipelineState.IDLE, PipelineState.FINGERPRINTING),
+            (PipelineState.FINGERPRINTING, PipelineState.CLEANING),
+            (PipelineState.CLEANING, PipelineState.TERM_EXTRACTION),
+            (PipelineState.TERM_EXTRACTION, PipelineState.TRANSLATING),
+            (PipelineState.TRANSLATING, PipelineState.UPLOADING),
+            (PipelineState.UPLOADING, PipelineState.COMPLETED),
+        ]
+        for from_state, to_state in flow:
+            assert is_transition_allowed(from_state, to_state) is True, \
+                f"Failed: {from_state} -> {to_state}"
+
+    def test_failure_transitions(self):
+        """Test transitions to FAILED state."""
+        failure_sources = {
+            PipelineState.FINGERPRINTING,
+            PipelineState.CLEANING,
+            PipelineState.TERM_EXTRACTION,
+            PipelineState.TRANSLATING,
+            PipelineState.UPLOADING,
+        }
+        for state in failure_sources:
+            assert is_transition_allowed(state, PipelineState.FAILED) is True
+
+    def test_pause_to_resume(self):
+        """Test PAUSED can resume to active states."""
+        assert is_transition_allowed(PipelineState.PAUSED, PipelineState.FINGERPRINTING) is True
+        assert is_transition_allowed(PipelineState.PAUSED, PipelineState.TRANSLATING) is True
+        assert is_transition_allowed(PipelineState.PAUSED, PipelineState.UPLOADING) is True
+
+    def test_failed_to_idle(self):
+        """Test FAILED can reset to IDLE."""
+        assert is_transition_allowed(PipelineState.FAILED, PipelineState.IDLE) is True
+
+    def test_completed_to_idle(self):
+        """Test COMPLETED can reset to IDLE."""
+        assert is_transition_allowed(PipelineState.COMPLETED, PipelineState.IDLE) is True
+
+    def test_invalid_transitions(self):
+        """Test some invalid transitions."""
+        # Cannot skip states
+        assert is_transition_allowed(PipelineState.IDLE, PipelineState.TRANSLATING) is False
+        # Cannot go backwards (except via FAILED/COMPLETED -> IDLE)
+        assert is_transition_allowed(PipelineState.TRANSLATING, PipelineState.CLEANING) is False
+        # Cannot go from COMPLETED to active states directly
+        assert is_transition_allowed(PipelineState.COMPLETED, PipelineState.TRANSLATING) is False
+
+
+class TestTransitionHelpers:
+    """Test helper functions for state transitions."""
+
+    def test_get_allowed_transitions(self):
+        """Test getting allowed transitions from a state."""
+        allowed = get_allowed_transitions(PipelineState.IDLE)
+        assert allowed == {PipelineState.FINGERPRINTING}
+
+        allowed = get_allowed_transitions(PipelineState.TRANSLATING)
+        assert PipelineState.UPLOADING in allowed
+        assert PipelineState.FAILED in allowed
+        assert PipelineState.CLEANING not in allowed
+
+    def test_can_pause_from(self):
+        """Test can_pause_from helper."""
+        assert can_pause_from(PipelineState.TRANSLATING) is True
+        assert can_pause_from(PipelineState.UPLOADING) is True
+        assert can_pause_from(PipelineState.IDLE) is False
+        assert can_pause_from(PipelineState.COMPLETED) is False
+
+    def test_can_resume_to(self):
+        """Test can_resume_to helper."""
+        assert can_resume_to(PipelineState.TRANSLATING) is True
+        assert can_resume_to(PipelineState.CLEANING) is True
+        assert can_resume_to(PipelineState.COMPLETED) is False
+        assert can_resume_to(PipelineState.FAILED) is False
+
+    def test_allowed_transitions_completeness(self):
+        """Test that all states have transition rules defined."""
+        all_states = set(PipelineState)
+        defined_states = set(ALLOWED_TRANSITIONS.keys())
+        assert all_states == defined_states, \
+            f"Missing transitions for: {all_states - defined_states}"
+
+
+class TestTransitionEdgeCases:
+    """Test edge cases in state transitions."""
+
+    def test_same_state_transition(self):
+        """Test that staying in same state is not a transition."""
+        # Transitions to same state should not be explicitly allowed
+        # (they're handled as "no transition needed")
+        result = is_transition_allowed(PipelineState.IDLE, PipelineState.IDLE)
+        # This should be False since we didn't define it
+        assert result is False
+
+    def test_all_states_can_reach_terminal(self):
+        """Test that all active states can reach a terminal state."""
+        active_states = {
+            PipelineState.FINGERPRINTING,
+            PipelineState.CLEANING,
+            PipelineState.TERM_EXTRACTION,
+            PipelineState.TRANSLATING,
+            PipelineState.UPLOADING,
+        }
+        for state in active_states:
+            # Can reach FAILED
+            assert is_transition_allowed(state, PipelineState.FAILED), \
+                f"{state} cannot reach FAILED"
+            # Can reach COMPLETED via normal flow
+            assert state == PipelineState.UPLOADING or \
+                   any(is_transition_allowed(state, s) for s in PipelineState), \
+                f"{state} cannot progress"