# recovery.py
  1. """
  2. Crash recovery management for the pipeline scheduler.
  3. This module provides checkpoint and recovery functionality
  4. to ensure the pipeline can resume after crashes.
  5. """
  6. import json
  7. import threading
  8. from pathlib import Path
  9. from typing import Optional, Dict, Any, List
  10. from datetime import datetime
  11. import hashlib
  12. from .models import CheckpointData, SchedulerState, ChapterTask, TaskStatus
  13. class RecoveryManager:
  14. """
  15. Manage crash recovery through checkpoints.
  16. This class handles:
  17. - Saving checkpoints during pipeline execution
  18. - Loading checkpoints after a crash
  19. - Determining recovery state
  20. - Cleaning up old checkpoints
  21. Thread-safe: Uses locks for checkpoint operations.
  22. """
  23. CHECKPOINT_FILE = "checkpoint.json"
  24. CHECKPOINT_BACKUP_FILE = "checkpoint.json.bak"
  25. def __init__(self, work_dir: str | Path):
  26. """
  27. Initialize the recovery manager.
  28. Args:
  29. work_dir: Working directory for checkpoint storage
  30. """
  31. self.work_dir = Path(work_dir)
  32. self.checkpoint_file = self.work_dir / self.CHECKPOINT_FILE
  33. self.backup_file = self.work_dir / self.CHECKPOINT_BACKUP_FILE
  34. self._lock = threading.Lock()
  35. def save_checkpoint(self, checkpoint: CheckpointData) -> None:
  36. """
  37. Save a checkpoint to disk.
  38. This method uses atomic write (write to temp then rename)
  39. to ensure checkpoint integrity.
  40. Args:
  41. checkpoint: The checkpoint data to save
  42. """
  43. with self._lock:
  44. # Ensure directory exists
  45. self.work_dir.mkdir(parents=True, exist_ok=True)
  46. # Serialize checkpoint
  47. data = {
  48. "work_id": checkpoint.work_id,
  49. "current_chapter_index": checkpoint.current_chapter_index,
  50. "completed_indices": checkpoint.completed_indices,
  51. "failed_indices": checkpoint.failed_indices,
  52. "timestamp": checkpoint.timestamp.isoformat(),
  53. "scheduler_state": checkpoint.scheduler_state.value,
  54. "metadata": checkpoint.metadata
  55. }
  56. # Atomic write: write to temp file first
  57. temp_file = self.checkpoint_file.with_suffix(".tmp")
  58. try:
  59. with open(temp_file, "w", encoding="utf-8") as f:
  60. json.dump(data, f, indent=2, ensure_ascii=False)
  61. # Backup existing checkpoint if it exists
  62. if self.checkpoint_file.exists():
  63. self.checkpoint_file.replace(self.backup_file)
  64. # Atomic rename
  65. temp_file.replace(self.checkpoint_file)
  66. except Exception as e:
  67. # Clean up temp file on error
  68. if temp_file.exists():
  69. temp_file.unlink()
  70. raise IOError(f"Failed to save checkpoint: {e}")
  71. def load_checkpoint(self) -> Optional[CheckpointData]:
  72. """
  73. Load a checkpoint from disk.
  74. Tries to load the main checkpoint file, and falls back
  75. to the backup if the main file is corrupted.
  76. Returns:
  77. The loaded CheckpointData, or None if no checkpoint exists
  78. """
  79. with self._lock:
  80. # Try main checkpoint
  81. checkpoint = self._load_checkpoint_file(self.checkpoint_file)
  82. if checkpoint:
  83. return checkpoint
  84. # Try backup checkpoint
  85. checkpoint = self._load_checkpoint_file(self.backup_file)
  86. if checkpoint:
  87. return checkpoint
  88. return None
  89. def _load_checkpoint_file(self, file_path: Path) -> Optional[CheckpointData]:
  90. """
  91. Load checkpoint from a specific file.
  92. Args:
  93. file_path: The file to load from
  94. Returns:
  95. The loaded CheckpointData, or None if file doesn't exist/is invalid
  96. """
  97. if not file_path.exists():
  98. return None
  99. try:
  100. with open(file_path, "r", encoding="utf-8") as f:
  101. data = json.load(f)
  102. # Validate required fields
  103. if "work_id" not in data or "current_chapter_index" not in data:
  104. return None
  105. # Parse timestamp
  106. timestamp = datetime.now()
  107. if "timestamp" in data:
  108. try:
  109. timestamp = datetime.fromisoformat(data["timestamp"])
  110. except (ValueError, TypeError):
  111. pass
  112. # Parse state
  113. state = SchedulerState.RUNNING
  114. if "scheduler_state" in data:
  115. try:
  116. state = SchedulerState(data["scheduler_state"])
  117. except ValueError:
  118. state = SchedulerState.RUNNING
  119. return CheckpointData(
  120. work_id=data["work_id"],
  121. current_chapter_index=data["current_chapter_index"],
  122. completed_indices=data.get("completed_indices", []),
  123. failed_indices=data.get("failed_indices", []),
  124. timestamp=timestamp,
  125. scheduler_state=state,
  126. metadata=data.get("metadata", {})
  127. )
  128. except (json.JSONDecodeError, IOError, KeyError):
  129. return None
  130. def has_checkpoint(self) -> bool:
  131. """
  132. Check if a checkpoint exists.
  133. Returns:
  134. True if a checkpoint file exists, False otherwise
  135. """
  136. return self.checkpoint_file.exists() or self.backup_file.exists()
  137. def delete_checkpoint(self) -> None:
  138. """Delete all checkpoint files."""
  139. with self._lock:
  140. if self.checkpoint_file.exists():
  141. self.checkpoint_file.unlink()
  142. if self.backup_file.exists():
  143. self.backup_file.unlink()
  144. def get_checkpoint_age(self) -> Optional[float]:
  145. """
  146. Get the age of the checkpoint in seconds.
  147. Returns:
  148. Age in seconds, or None if no checkpoint exists
  149. """
  150. checkpoint = self.load_checkpoint()
  151. if checkpoint:
  152. return (datetime.now() - checkpoint.timestamp).total_seconds()
  153. return None
  154. def create_checkpoint_from_progress(
  155. self,
  156. work_id: str,
  157. current_index: int,
  158. completed_indices: List[int],
  159. failed_indices: List[int],
  160. state: SchedulerState = SchedulerState.RUNNING,
  161. metadata: Optional[Dict[str, Any]] = None
  162. ) -> CheckpointData:
  163. """
  164. Create a checkpoint from progress data.
  165. Args:
  166. work_id: Work item ID
  167. current_index: Current chapter index
  168. completed_indices: List of completed chapter indices
  169. failed_indices: List of failed chapter indices
  170. state: Current scheduler state
  171. metadata: Optional metadata
  172. Returns:
  173. The created CheckpointData
  174. """
  175. checkpoint = CheckpointData(
  176. work_id=work_id,
  177. current_chapter_index=current_index,
  178. completed_indices=list(completed_indices),
  179. failed_indices=list(failed_indices),
  180. timestamp=datetime.now(),
  181. scheduler_state=state,
  182. metadata=metadata or {}
  183. )
  184. self.save_checkpoint(checkpoint)
  185. return checkpoint
  186. def get_recovery_state(self) -> Optional[Dict[str, Any]]:
  187. """
  188. Get the recovery state from checkpoint.
  189. Returns:
  190. Dictionary with recovery information:
  191. - recoverable: Whether recovery is possible
  192. - work_id: Work ID to resume
  193. - resume_index: Chapter index to resume from
  194. - completed_count: Number of completed chapters
  195. - failed_count: Number of failed chapters
  196. - checkpoint_age: Age of checkpoint in seconds
  197. """
  198. checkpoint = self.load_checkpoint()
  199. if not checkpoint:
  200. return None
  201. return {
  202. "recoverable": True,
  203. "work_id": checkpoint.work_id,
  204. "resume_index": checkpoint.current_chapter_index,
  205. "completed_count": len(checkpoint.completed_indices),
  206. "failed_count": len(checkpoint.failed_indices),
  207. "checkpoint_age": (datetime.now() - checkpoint.timestamp).total_seconds(),
  208. "state": checkpoint.scheduler_state.value
  209. }
  210. def can_resume(self) -> bool:
  211. """
  212. Check if the pipeline can be resumed from a checkpoint.
  213. Returns:
  214. True if resumable, False otherwise
  215. """
  216. recovery_state = self.get_recovery_state()
  217. if not recovery_state:
  218. return False
  219. # Check if checkpoint is recent (within 24 hours)
  220. if recovery_state["checkpoint_age"] > 86400:
  221. return False
  222. return True
  223. def get_work_id_from_checkpoint(self) -> Optional[str]:
  224. """
  225. Get the work ID from the checkpoint.
  226. Returns:
  227. The work ID, or None if no checkpoint exists
  228. """
  229. checkpoint = self.load_checkpoint()
  230. return checkpoint.work_id if checkpoint else None
  231. class AutoCheckpointMixin:
  232. """
  233. Mixin for automatic checkpointing.
  234. Classes can inherit from this to get automatic checkpoint
  235. saving at regular intervals.
  236. """
  237. def __init__(self, *args, checkpoint_interval: int = 5, **kwargs):
  238. """
  239. Initialize with auto-checkpoint.
  240. Args:
  241. checkpoint_interval: Save checkpoint every N chapters
  242. """
  243. super().__init__(*args, **kwargs)
  244. self.checkpoint_interval = checkpoint_interval
  245. self._chapters_since_checkpoint = 0
  246. def should_checkpoint(self) -> bool:
  247. """
  248. Check if it's time to save a checkpoint.
  249. Returns:
  250. True if checkpoint should be saved
  251. """
  252. self._chapters_since_checkpoint += 1
  253. return self._chapters_since_checkpoint >= self.checkpoint_interval
  254. def reset_checkpoint_counter(self) -> None:
  255. """Reset the checkpoint counter."""
  256. self._chapters_since_checkpoint = 0
  257. def compute_work_fingerprint(file_path: str | Path) -> str:
  258. """
  259. Compute a fingerprint for a work file.
  260. This fingerprint can be used to detect if the source file
  261. has changed since a checkpoint was created.
  262. Args:
  263. file_path: Path to the file
  264. Returns:
  265. Hexadecimal fingerprint string
  266. """
  267. path = Path(file_path)
  268. if not path.exists():
  269. return ""
  270. # Use file size, mtime, and first/last 1KB for fingerprint
  271. stat = path.stat()
  272. # Read first and last 1KB
  273. first_kb = b""
  274. last_kb = b""
  275. try:
  276. with open(path, "rb") as f:
  277. first_kb = f.read(1024)
  278. if stat.st_size > 2048:
  279. f.seek(-1024, 2)
  280. last_kb = f.read(1024)
  281. except (IOError, OSError):
  282. pass
  283. # Compute hash
  284. content = f"{stat.st_size}{stat.st_mtime}{first_kb}{last_kb}".encode()
  285. return hashlib.sha256(content).hexdigest()