From 7098286b43435721c058f52d920d6548cf3d6a4a Mon Sep 17 00:00:00 2001 From: Marco Cetica Date: Thu, 22 Jan 2026 11:04:10 +0100 Subject: [PATCH] Added signal handler for graceful exit and timestamp prettifier --- backup.py | 146 ++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 113 insertions(+), 33 deletions(-) diff --git a/backup.py b/backup.py index ff373b5..0fe9d45 100755 --- a/backup.py +++ b/backup.py @@ -10,10 +10,11 @@ import os import time import subprocess import hashlib +import signal from pathlib import Path from datetime import datetime from dataclasses import dataclass -from typing import Generic, TypeVar, Union, Optional, List +from typing import Any, Generic, TypeVar, Union, Optional, List T = TypeVar("T") @dataclass(frozen=True) @@ -42,6 +43,44 @@ class BackupState: checksum: bool verbose: bool +class SignalHandler: + """Gracefully handle SIGINT (C-c)""" + def __init__(self) -> None: + self.interrupted = False + self.output_path: Optional[Path] = None + + def setup(self, output_path: Path) -> None: + """Configure signal handler with cleanup paths""" + self.output_path = output_path + signal.signal(signal.SIGINT, self.handle_interrupt) + + def handle_interrupt(self, _sig_num: int, _frame: Any) -> None: + """Handle SIGINT signal""" + # Second C-c: just exit without cleanup + if self.interrupted: + print("\nForced exit. temporary files NOT cleaned.", file=sys.stderr) + sys.exit(130) # that is, 128 + SIGINT(2) + + # First C-c: cleanup and set flag + self.interrupted = True + print( + "\nBackup interrupted.\nCleaning up temporary files (press C-c again to force exit)...", + file=sys.stderr, + end='', + flush=True + ) + + if self.output_path: + temp_files = [ + self.output_path / "backup.py.tmp", + self.output_path / "backup.py.tar.gz" + ] + + Backup.cleanup_files(*temp_files) + + print("DONE.", file=sys.stderr) + sys.exit(130) + class BackupProgress: """Progress indicator for backup operations""" def __init__(self, total: int, operation: str, status_msg: str) -> None: @@ -54,7 +93,7 @@ class BackupProgress: """Print the Backup operation to stdout""" print(self.operation) - def draw_progress_bar(self, message: str = "") -> None: + def draw_progress_bar(self, filename: str = "") -> None: """draw progress bar""" self.current += 1 percentage = (self.current / self.total) * 100 if self.total > 0 else 0 @@ -64,7 +103,20 @@ class BackupProgress: filled = int(bar_width * self.current / self.total) bar = '█' * filled + '░' * (bar_width - filled) - status = f"\r└──{self.operation} [{bar}] {percentage:.1f}% ({self.current}/{self.total}) - (processing '{message}')" + # Truncate filename if it's too long to display + # by keeping the first 30 characters + extension (if available) + # This prevents UI disruption + filename_max_len = 35 + ext_max_len = 10 + if len(filename) > filename_max_len: + ext_idx = filename.rfind('.') + if ext_idx > 0 and len(filename) - ext_idx <= ext_max_len: + ext = filename[ext_idx:] + filename = filename[:filename_max_len - len(ext) - 5] + "..." + ext + else: + filename = filename[:filename_max_len - 5] + + status = f"\r└──{self.operation} [{bar}] {percentage:.1f}% ({self.current}/{self.total}) - (processing '{filename}')" print(f"\r\033[K{status}", end='', flush=True) def complete_task(self) -> None: @@ -86,7 +138,7 @@ class Backup: missing_deps.append(dep) if missing_deps: - return Err(f"Missing dependencies: {', '.join(missing_deps)}") + return Err(f"Missing dependencies: {', '.join(missing_deps)}.") return Ok(None) @@ -106,11 +158,30 @@ class Backup: return f"{size:.2f} {units[idx]}" + @staticmethod + def prettify_timestamp(timestamp: float) -> str: + """Convert a timestamp in seconds to human-readable format""" + timestamp_int = int(timestamp) + + hours = timestamp_int // 3600 + minutes = (timestamp_int % 3600) // 60 + seconds = timestamp_int % 60 + + parts = [] + if hours > 0: + parts.append(f"{hours} hour{'s' if hours != 1 else ''}") + if minutes > 0: + parts.append(f"{minutes} minute{'s' if minutes != 1 else ''}") + if seconds > 0 or not parts: # show seconds if other parts are zero + parts.append(f"{seconds} second{'s' if seconds != 1 else ''}") + + return ", ".join(parts) + @staticmethod def parse_sources_file(sources_file: Path) -> Result[List[BackupSource]]: """Parse the sources file returning a list of BackupSource elements""" if not sources_file.exists(): - return Err("Sources file does not exist") + return Err("Sources file does not exist.") sources: List[BackupSource] = [] try: @@ -121,20 +192,20 @@ class Backup: continue if '=' not in line: - return Err(f"invalid format at line {pos}: '{line}'") + return Err(f"invalid format at line {pos}: '{line}'.") label, path_str = line.split('=', 1) path = Path(path_str.strip()) if not path.exists(): - return Err(f"Path does not exist: {path}") + return Err(f"Path does not exist: {path}.") sources.append(BackupSource(label.strip(), path)) except IOError as err: - return Err(f"Failed to read sources file: {err}") + return Err(f"Failed to read sources file: {err}.") if not sources: - return Err(f"No valid sources found in file") + return Err(f"No valid sources found in file.") return Ok(sources) @@ -207,10 +278,10 @@ class Backup: return Ok(None) - return Err(f"The following source element is neither a file nor a directory: {source}") + return Err(f"The following source element is neither a file nor a directory: {source}.") except (IOError, OSError, shutil.Error) as err: - return Err(f"Copy failed: {err}") + return Err(f"Copy failed: {err}.") @staticmethod def cleanup_files(*paths: Path) -> None: @@ -244,7 +315,7 @@ class Backup: hash_obj.update(byte_block) return Ok(hash_obj.hexdigest()) except IOError as e: - return Err(f"Failed to read file {file_path}: {e}") + return Err(f"Failed to read file {file_path}: {e}.") @staticmethod def count_tar_entries(source_dir: Path) -> int: @@ -283,7 +354,7 @@ class Backup: # Read subprocess output from pipe in buffered mode if verbose and progress is not None: if process.stdout is None: - return Err("Failed to capture output") + return Err("Failed to capture output.") for line in process.stdout: line = line.strip() @@ -297,7 +368,7 @@ class Backup: process.wait() if process.returncode != 0: - return Err("Cannot create compressed archive") + return Err("Cannot create compressed archive.") return Ok(None) @@ -306,7 +377,7 @@ class Backup: """Encrypt a file with GPG in symmetric mode (using AES256)""" if output_file.exists(): - return Err("Encryption failed: archive already exists") + return Err("Encryption failed: archive already exists.") if verbose: print("Encrypting backup...", end='', flush=True) @@ -330,7 +401,7 @@ class Backup: ) if result.returncode != 0: - return Err(f"Encryption failed: {result.stderr.decode()}") + return Err(f"Encryption failed: {result.stderr.decode()}.") if verbose: print("DONE") @@ -341,7 +412,7 @@ class Backup: """Create an encrypted backup from specified sources file""" # Check root permissions if os.geteuid() != 0: - return Err("Run this program as root!") + return Err("Run this program as root.") start_time = time.time() date_str = datetime.now().strftime("%Y%m%d") @@ -423,17 +494,17 @@ class Backup: # Compute file size if not backup_archive.exists(): - return Err("Unable to create backup archive") + return Err("Unable to create backup archive.") elapsed_time = time.time() - start_time file_size = backup_archive.stat().st_size - file_size_hr = Backup.prettify_size(file_size) + file_size_hr = self.prettify_size(file_size) print(f"File name: '{backup_archive}'") if config.checksum: print(f"Checksum file: '{checksum_file}'") print(f"File size: {file_size} bytes ({file_size_hr})") - print(f"Elapsed time: {elapsed_time:.2f} seconds") + print(f"Elapsed time: {self.prettify_timestamp(elapsed_time)}") return Ok(None) @@ -462,7 +533,7 @@ class Backup: ) if result.returncode != 0: - return Err(f"Decryption failed: {result.stderr.decode()}") + return Err(f"Decryption failed: {result.stderr.decode()}.") if verbose: print("DONE") @@ -488,13 +559,13 @@ class Backup: ) entries = list_res.stdout.strip().split('\n') if not entries or not entries[0]: - return Err("Archive is empty or corrupted") + return Err("Archive is empty or corrupted.") # Retrieve root directory from first entry extracted_root = entries[0].split('/')[0] except subprocess.CalledProcessError as err: - return Err(f"Failed to list archive content: {err}") + return Err(f"Failed to list archive content: {err}.") cmd = [ "tar", @@ -520,7 +591,7 @@ class Backup: if verbose and progress is not None: if process.stdout is None: - return Err("Failed to capture output") + return Err("Failed to capture output.") for line in process.stdout: line = line.strip() @@ -533,12 +604,12 @@ class Backup: process.wait() if process.returncode != 0: - return Err("Unable to extract compressed archive") + return Err("Unable to extract compressed archive.") root_path = archive_file.parent / extracted_root if not root_path.exists(): - return Err(f"Extracted '{root_path}' not found") + return Err(f"Extracted '{root_path}' not found.") return Ok(root_path) @@ -549,7 +620,7 @@ class Backup: with open(checksum_file, 'r') as cf: expected_hashes = set(line.strip() for line in cf if line.strip()) except IOError as err: - return Err(f"Failed to load checksum file: {err}") + return Err(f"Failed to load checksum file: {err}.") files = Backup.collect_files(extracted_dir) progress = None @@ -577,6 +648,8 @@ class Backup: def extract_backup(self, archive_file: Path, password: str, checksum_file: Optional[Path], verbose: bool) -> Result[None]: """Extract and verify a backup archive""" + start_time = time.time() + temp_tarball = archive_file.parent / Path("backup.py.tar.gz") decrypt_res = self.decrypt_file(archive_file, temp_tarball, password, verbose) @@ -605,11 +678,17 @@ class Backup: case Ok(): pass self.cleanup_files(temp_tarball) + + elapsed_time = time.time() - start_time + print(f"Backup extracted to: '{extracted_dir.parent.resolve() / extracted_dir}'") + print(f"Elapsed time: {self.prettify_timestamp(elapsed_time)}") return Ok(None) def main(): + signal_handler = SignalHandler() + parser = argparse.ArgumentParser( description="backup.py - modular and lightweight backup utility" ) @@ -643,7 +722,7 @@ def main(): args = parser.parse_args() if not (args.backup or args.extract): - parser.error("specify either --backup or --extract") + parser.error("specify either --backup or --extract.") # Check whether dependencies are installed deps_res = Backup.check_deps() @@ -657,9 +736,9 @@ def main(): if args.backup: sources_file, output_path, encryption_pass = args.backup - sources_path = Path(sources_file) output_dir = Path(output_path) + signal_handler.setup(output_dir) # Create output directory if it doesn't exist if not output_dir.exists(): @@ -690,9 +769,10 @@ def main(): elif args.extract: archive_file = Path(args.extract[0]) + signal_handler.setup(archive_file.parent) if not archive_file.exists(): - print(f"Archive file '{archive_file}' does not exist", file=sys.stderr) + print(f"Archive file '{archive_file}' does not exist.", file=sys.stderr) sys.exit(1) decryption_pass: str = "" @@ -701,18 +781,18 @@ def main(): if len(args.extract) >= 2: decryption_pass = args.extract[1] else: - print("--extract flag requires decryption password as second argument", file=sys.stderr) + print("--extract flag requires decryption password as second argument.", file=sys.stderr) sys.exit(1) if args.checksum: if len(args.extract) >= 3: checksum_file = Path(args.extract[2]) else: - print("--checksum flag requires SHA256 file as third argument", file=sys.stderr) + print("--checksum flag requires SHA256 file as third argument.", file=sys.stderr) sys.exit(1) if not checksum_file.exists(): - print(f"Checksum file '{checksum_file}' does not exist", file=sys.stderr) + print(f"Checksum file '{checksum_file}' does not exist.", file=sys.stderr) sys.exit(1) extract_res = backup.extract_backup(archive_file, decryption_pass, checksum_file, args.verbose)