| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350 |
- """
- Crash recovery management for the pipeline scheduler.
- This module provides checkpoint and recovery functionality
- to ensure the pipeline can resume after crashes.
- """
- import json
- import threading
- from pathlib import Path
- from typing import Optional, Dict, Any, List
- from datetime import datetime
- import hashlib
- from .models import CheckpointData, SchedulerState, ChapterTask, TaskStatus
class RecoveryManager:
    """
    Manage crash recovery through checkpoints.

    This class handles:
    - Saving checkpoints during pipeline execution
    - Loading checkpoints after a crash
    - Determining recovery state
    - Cleaning up old checkpoints

    Thread-safe: a single lock guards all checkpoint file operations.
    """

    CHECKPOINT_FILE = "checkpoint.json"
    CHECKPOINT_BACKUP_FILE = "checkpoint.json.bak"

    def __init__(self, work_dir: str | Path):
        """
        Initialize the recovery manager.

        Args:
            work_dir: Working directory for checkpoint storage
        """
        self.work_dir = Path(work_dir)
        self.checkpoint_file = self.work_dir / self.CHECKPOINT_FILE
        self.backup_file = self.work_dir / self.CHECKPOINT_BACKUP_FILE
        self._lock = threading.Lock()

    def save_checkpoint(self, checkpoint: CheckpointData) -> None:
        """
        Save a checkpoint to disk.

        Uses an atomic write (write to a temp file, then rename) so a crash
        mid-write never leaves a truncated checkpoint, and preserves the
        previous checkpoint as a backup for corruption fallback.

        Args:
            checkpoint: The checkpoint data to save

        Raises:
            IOError: If the checkpoint could not be written; the original
                exception is attached as the cause.
        """
        with self._lock:
            # Ensure directory exists
            self.work_dir.mkdir(parents=True, exist_ok=True)

            # Serialize checkpoint to plain JSON-compatible types
            data = {
                "work_id": checkpoint.work_id,
                "current_chapter_index": checkpoint.current_chapter_index,
                "completed_indices": checkpoint.completed_indices,
                "failed_indices": checkpoint.failed_indices,
                "timestamp": checkpoint.timestamp.isoformat(),
                "scheduler_state": checkpoint.scheduler_state.value,
                "metadata": checkpoint.metadata,
            }

            # Atomic write: write to temp file first
            temp_file = self.checkpoint_file.with_suffix(".tmp")
            try:
                with open(temp_file, "w", encoding="utf-8") as f:
                    json.dump(data, f, indent=2, ensure_ascii=False)
                # Keep the previous checkpoint as a fallback copy so
                # load_checkpoint() can recover from a corrupted main file.
                if self.checkpoint_file.exists():
                    self.checkpoint_file.replace(self.backup_file)
                # Atomic rename: readers see either the old or the new
                # checkpoint, never a partially written one.
                temp_file.replace(self.checkpoint_file)
            except Exception as e:
                # Clean up the temp file, then surface the failure while
                # chaining the original exception for diagnosability.
                temp_file.unlink(missing_ok=True)
                raise IOError(f"Failed to save checkpoint: {e}") from e

    def load_checkpoint(self) -> Optional[CheckpointData]:
        """
        Load a checkpoint from disk.

        Tries to load the main checkpoint file, and falls back
        to the backup if the main file is missing or corrupted.

        Returns:
            The loaded CheckpointData, or None if no valid checkpoint exists
        """
        with self._lock:
            # Try main checkpoint first, then the backup copy.
            checkpoint = self._load_checkpoint_file(self.checkpoint_file)
            if checkpoint:
                return checkpoint
            checkpoint = self._load_checkpoint_file(self.backup_file)
            if checkpoint:
                return checkpoint
            return None

    def _load_checkpoint_file(self, file_path: Path) -> Optional[CheckpointData]:
        """
        Load a checkpoint from a specific file.

        Args:
            file_path: The file to load from

        Returns:
            The loaded CheckpointData, or None if the file doesn't exist,
            is not valid JSON, or is missing required fields
        """
        if not file_path.exists():
            return None
        try:
            with open(file_path, "r", encoding="utf-8") as f:
                data = json.load(f)

            # Validate required fields; anything else is treated as corrupt.
            if "work_id" not in data or "current_chapter_index" not in data:
                return None

            # Parse timestamp; fall back to "now" on a malformed value so a
            # slightly damaged checkpoint is still usable.
            timestamp = datetime.now()
            if "timestamp" in data:
                try:
                    timestamp = datetime.fromisoformat(data["timestamp"])
                except (ValueError, TypeError):
                    pass

            # Parse state; unknown values degrade to RUNNING.
            state = SchedulerState.RUNNING
            if "scheduler_state" in data:
                try:
                    state = SchedulerState(data["scheduler_state"])
                except ValueError:
                    state = SchedulerState.RUNNING

            return CheckpointData(
                work_id=data["work_id"],
                current_chapter_index=data["current_chapter_index"],
                completed_indices=data.get("completed_indices", []),
                failed_indices=data.get("failed_indices", []),
                timestamp=timestamp,
                scheduler_state=state,
                metadata=data.get("metadata", {}),
            )
        except (json.JSONDecodeError, IOError, KeyError):
            return None

    def has_checkpoint(self) -> bool:
        """
        Check if a checkpoint exists.

        Returns:
            True if a main or backup checkpoint file exists, False otherwise
        """
        return self.checkpoint_file.exists() or self.backup_file.exists()

    def delete_checkpoint(self) -> None:
        """Delete all checkpoint files (main and backup)."""
        with self._lock:
            self.checkpoint_file.unlink(missing_ok=True)
            self.backup_file.unlink(missing_ok=True)

    def get_checkpoint_age(self) -> Optional[float]:
        """
        Get the age of the checkpoint in seconds.

        Returns:
            Age in seconds, or None if no checkpoint exists
        """
        checkpoint = self.load_checkpoint()
        if checkpoint:
            return (datetime.now() - checkpoint.timestamp).total_seconds()
        return None

    def create_checkpoint_from_progress(
        self,
        work_id: str,
        current_index: int,
        completed_indices: List[int],
        failed_indices: List[int],
        state: SchedulerState = SchedulerState.RUNNING,
        metadata: Optional[Dict[str, Any]] = None
    ) -> CheckpointData:
        """
        Create and persist a checkpoint from progress data.

        Args:
            work_id: Work item ID
            current_index: Current chapter index
            completed_indices: List of completed chapter indices
            failed_indices: List of failed chapter indices
            state: Current scheduler state
            metadata: Optional metadata

        Returns:
            The created CheckpointData (already saved to disk)
        """
        checkpoint = CheckpointData(
            work_id=work_id,
            current_chapter_index=current_index,
            # Defensive copies so later caller mutations don't leak in.
            completed_indices=list(completed_indices),
            failed_indices=list(failed_indices),
            timestamp=datetime.now(),
            scheduler_state=state,
            metadata=metadata or {},
        )
        self.save_checkpoint(checkpoint)
        return checkpoint

    def get_recovery_state(self) -> Optional[Dict[str, Any]]:
        """
        Get the recovery state from the checkpoint.

        Returns:
            None if no checkpoint exists; otherwise a dictionary with:
            - recoverable: Whether recovery is possible
            - work_id: Work ID to resume
            - resume_index: Chapter index to resume from
            - completed_count: Number of completed chapters
            - failed_count: Number of failed chapters
            - checkpoint_age: Age of checkpoint in seconds
            - state: Scheduler state value stored in the checkpoint
        """
        checkpoint = self.load_checkpoint()
        if not checkpoint:
            return None
        return {
            "recoverable": True,
            "work_id": checkpoint.work_id,
            "resume_index": checkpoint.current_chapter_index,
            "completed_count": len(checkpoint.completed_indices),
            "failed_count": len(checkpoint.failed_indices),
            "checkpoint_age": (datetime.now() - checkpoint.timestamp).total_seconds(),
            "state": checkpoint.scheduler_state.value,
        }

    def can_resume(self) -> bool:
        """
        Check if the pipeline can be resumed from a checkpoint.

        Returns:
            True if a checkpoint exists and is recent (within 24 hours)
        """
        recovery_state = self.get_recovery_state()
        if not recovery_state:
            return False
        # Reject stale checkpoints older than 24 hours (86400 seconds).
        if recovery_state["checkpoint_age"] > 86400:
            return False
        return True

    def get_work_id_from_checkpoint(self) -> Optional[str]:
        """
        Get the work ID from the checkpoint.

        Returns:
            The work ID, or None if no checkpoint exists
        """
        checkpoint = self.load_checkpoint()
        return checkpoint.work_id if checkpoint else None
class AutoCheckpointMixin:
    """
    Mixin providing interval-based checkpoint bookkeeping.

    Inheriting classes gain a counter that signals, every
    ``checkpoint_interval`` chapters, that a checkpoint should be saved.
    """

    def __init__(self, *args, checkpoint_interval: int = 5, **kwargs):
        """
        Initialize the auto-checkpoint counter.

        Args:
            checkpoint_interval: Save a checkpoint every N chapters
        """
        super().__init__(*args, **kwargs)
        self.checkpoint_interval = checkpoint_interval
        self._chapters_since_checkpoint = 0

    def should_checkpoint(self) -> bool:
        """
        Record one more processed chapter and report whether it's time
        to save a checkpoint.

        Returns:
            True once the configured interval has been reached
        """
        # NOTE: calling this advances the counter — one call per chapter.
        progressed = self._chapters_since_checkpoint + 1
        self._chapters_since_checkpoint = progressed
        return progressed >= self.checkpoint_interval

    def reset_checkpoint_counter(self) -> None:
        """Restart the interval count (call after saving a checkpoint)."""
        self._chapters_since_checkpoint = 0
- def compute_work_fingerprint(file_path: str | Path) -> str:
- """
- Compute a fingerprint for a work file.
- This fingerprint can be used to detect if the source file
- has changed since a checkpoint was created.
- Args:
- file_path: Path to the file
- Returns:
- Hexadecimal fingerprint string
- """
- path = Path(file_path)
- if not path.exists():
- return ""
- # Use file size, mtime, and first/last 1KB for fingerprint
- stat = path.stat()
- # Read first and last 1KB
- first_kb = b""
- last_kb = b""
- try:
- with open(path, "rb") as f:
- first_kb = f.read(1024)
- if stat.st_size > 2048:
- f.seek(-1024, 2)
- last_kb = f.read(1024)
- except (IOError, OSError):
- pass
- # Compute hash
- content = f"{stat.st_size}{stat.st_mtime}{first_kb}{last_kb}".encode()
- return hashlib.sha256(content).hexdigest()
|