Added signal handler for graceful exit and timestamp prettifier

This commit is contained in:
2026-01-22 11:04:10 +01:00
parent a8d67af00b
commit 7098286b43

146
backup.py
View File

@@ -10,10 +10,11 @@ import os
import time import time
import subprocess import subprocess
import hashlib import hashlib
import signal
from pathlib import Path from pathlib import Path
from datetime import datetime from datetime import datetime
from dataclasses import dataclass from dataclasses import dataclass
from typing import Generic, TypeVar, Union, Optional, List from typing import Any, Generic, TypeVar, Union, Optional, List
T = TypeVar("T") T = TypeVar("T")
@dataclass(frozen=True) @dataclass(frozen=True)
@@ -42,6 +43,44 @@ class BackupState:
checksum: bool checksum: bool
verbose: bool verbose: bool
class SignalHandler:
"""Gracefully handle SIGINT (C-c)"""
def __init__(self) -> None:
self.interrupted = False
self.output_path: Optional[Path] = None
def setup(self, output_path: Path) -> None:
"""Configure signal handler with cleanup paths"""
self.output_path = output_path
signal.signal(signal.SIGINT, self.handle_interrupt)
def handle_interrupt(self, _sig_num: int, _frame: Any) -> None:
"""Handle SIGINT signal"""
# Second C-c: just exit without cleanup
if self.interrupted:
print("\nForced exit. temporary files NOT cleaned.", file=sys.stderr)
sys.exit(130) # that is, 128 + SIGINT(2)
# First C-c: cleanup and set flag
self.interrupted = True
print(
"\nBackup interrupted.\nCleaning up temporary files (press C-c again to force exit)...",
file=sys.stderr,
end='',
flush=True
)
if self.output_path:
temp_files = [
self.output_path / "backup.py.tmp",
self.output_path / "backup.py.tar.gz"
]
Backup.cleanup_files(*temp_files)
print("DONE.", file=sys.stderr)
sys.exit(130)
class BackupProgress: class BackupProgress:
"""Progress indicator for backup operations""" """Progress indicator for backup operations"""
def __init__(self, total: int, operation: str, status_msg: str) -> None: def __init__(self, total: int, operation: str, status_msg: str) -> None:
@@ -54,7 +93,7 @@ class BackupProgress:
"""Print the Backup operation to stdout""" """Print the Backup operation to stdout"""
print(self.operation) print(self.operation)
def draw_progress_bar(self, message: str = "") -> None: def draw_progress_bar(self, filename: str = "") -> None:
"""draw progress bar""" """draw progress bar"""
self.current += 1 self.current += 1
percentage = (self.current / self.total) * 100 if self.total > 0 else 0 percentage = (self.current / self.total) * 100 if self.total > 0 else 0
@@ -64,7 +103,20 @@ class BackupProgress:
filled = int(bar_width * self.current / self.total) filled = int(bar_width * self.current / self.total)
bar = '' * filled + '' * (bar_width - filled) bar = '' * filled + '' * (bar_width - filled)
status = f"\r└──{self.operation} [{bar}] {percentage:.1f}% ({self.current}/{self.total}) - (processing '{message}')" # Truncate filename if it's too long to display
# by keeping the first 30 characters + extension (if available)
# This prevents UI disruption
filename_max_len = 35
ext_max_len = 10
if len(filename) > filename_max_len:
ext_idx = filename.rfind('.')
if ext_idx > 0 and len(filename) - ext_idx <= ext_max_len:
ext = filename[ext_idx:]
filename = filename[:filename_max_len - len(ext) - 5] + "..." + ext
else:
filename = filename[:filename_max_len - 5]
status = f"\r└──{self.operation} [{bar}] {percentage:.1f}% ({self.current}/{self.total}) - (processing '{filename}')"
print(f"\r\033[K{status}", end='', flush=True) print(f"\r\033[K{status}", end='', flush=True)
def complete_task(self) -> None: def complete_task(self) -> None:
@@ -86,7 +138,7 @@ class Backup:
missing_deps.append(dep) missing_deps.append(dep)
if missing_deps: if missing_deps:
return Err(f"Missing dependencies: {', '.join(missing_deps)}") return Err(f"Missing dependencies: {', '.join(missing_deps)}.")
return Ok(None) return Ok(None)
@@ -106,11 +158,30 @@ class Backup:
return f"{size:.2f} {units[idx]}" return f"{size:.2f} {units[idx]}"
@staticmethod
def prettify_timestamp(timestamp: float) -> str:
"""Convert a timestamp in seconds to human-readable format"""
timestamp_int = int(timestamp)
hours = timestamp_int // 3600
minutes = (timestamp_int % 3600) // 60
seconds = timestamp_int % 60
parts = []
if hours > 0:
parts.append(f"{hours} hour{'s' if hours != 1 else ''}")
if minutes > 0:
parts.append(f"{minutes} minute{'s' if minutes != 1 else ''}")
if seconds > 0 or not parts: # show seconds if other parts are zero
parts.append(f"{seconds} second{'s' if seconds != 1 else ''}")
return ", ".join(parts)
@staticmethod @staticmethod
def parse_sources_file(sources_file: Path) -> Result[List[BackupSource]]: def parse_sources_file(sources_file: Path) -> Result[List[BackupSource]]:
"""Parse the sources file returning a list of BackupSource elements""" """Parse the sources file returning a list of BackupSource elements"""
if not sources_file.exists(): if not sources_file.exists():
return Err("Sources file does not exist") return Err("Sources file does not exist.")
sources: List[BackupSource] = [] sources: List[BackupSource] = []
try: try:
@@ -121,20 +192,20 @@ class Backup:
continue continue
if '=' not in line: if '=' not in line:
return Err(f"invalid format at line {pos}: '{line}'") return Err(f"invalid format at line {pos}: '{line}'.")
label, path_str = line.split('=', 1) label, path_str = line.split('=', 1)
path = Path(path_str.strip()) path = Path(path_str.strip())
if not path.exists(): if not path.exists():
return Err(f"Path does not exist: {path}") return Err(f"Path does not exist: {path}.")
sources.append(BackupSource(label.strip(), path)) sources.append(BackupSource(label.strip(), path))
except IOError as err: except IOError as err:
return Err(f"Failed to read sources file: {err}") return Err(f"Failed to read sources file: {err}.")
if not sources: if not sources:
return Err(f"No valid sources found in file") return Err(f"No valid sources found in file.")
return Ok(sources) return Ok(sources)
@@ -207,10 +278,10 @@ class Backup:
return Ok(None) return Ok(None)
return Err(f"The following source element is neither a file nor a directory: {source}") return Err(f"The following source element is neither a file nor a directory: {source}.")
except (IOError, OSError, shutil.Error) as err: except (IOError, OSError, shutil.Error) as err:
return Err(f"Copy failed: {err}") return Err(f"Copy failed: {err}.")
@staticmethod @staticmethod
def cleanup_files(*paths: Path) -> None: def cleanup_files(*paths: Path) -> None:
@@ -244,7 +315,7 @@ class Backup:
hash_obj.update(byte_block) hash_obj.update(byte_block)
return Ok(hash_obj.hexdigest()) return Ok(hash_obj.hexdigest())
except IOError as e: except IOError as e:
return Err(f"Failed to read file {file_path}: {e}") return Err(f"Failed to read file {file_path}: {e}.")
@staticmethod @staticmethod
def count_tar_entries(source_dir: Path) -> int: def count_tar_entries(source_dir: Path) -> int:
@@ -283,7 +354,7 @@ class Backup:
# Read subprocess output from pipe in buffered mode # Read subprocess output from pipe in buffered mode
if verbose and progress is not None: if verbose and progress is not None:
if process.stdout is None: if process.stdout is None:
return Err("Failed to capture output") return Err("Failed to capture output.")
for line in process.stdout: for line in process.stdout:
line = line.strip() line = line.strip()
@@ -297,7 +368,7 @@ class Backup:
process.wait() process.wait()
if process.returncode != 0: if process.returncode != 0:
return Err("Cannot create compressed archive") return Err("Cannot create compressed archive.")
return Ok(None) return Ok(None)
@@ -306,7 +377,7 @@ class Backup:
"""Encrypt a file with GPG in symmetric mode (using AES256)""" """Encrypt a file with GPG in symmetric mode (using AES256)"""
if output_file.exists(): if output_file.exists():
return Err("Encryption failed: archive already exists") return Err("Encryption failed: archive already exists.")
if verbose: if verbose:
print("Encrypting backup...", end='', flush=True) print("Encrypting backup...", end='', flush=True)
@@ -330,7 +401,7 @@ class Backup:
) )
if result.returncode != 0: if result.returncode != 0:
return Err(f"Encryption failed: {result.stderr.decode()}") return Err(f"Encryption failed: {result.stderr.decode()}.")
if verbose: if verbose:
print("DONE") print("DONE")
@@ -341,7 +412,7 @@ class Backup:
"""Create an encrypted backup from specified sources file""" """Create an encrypted backup from specified sources file"""
# Check root permissions # Check root permissions
if os.geteuid() != 0: if os.geteuid() != 0:
return Err("Run this program as root!") return Err("Run this program as root.")
start_time = time.time() start_time = time.time()
date_str = datetime.now().strftime("%Y%m%d") date_str = datetime.now().strftime("%Y%m%d")
@@ -423,17 +494,17 @@ class Backup:
# Compute file size # Compute file size
if not backup_archive.exists(): if not backup_archive.exists():
return Err("Unable to create backup archive") return Err("Unable to create backup archive.")
elapsed_time = time.time() - start_time elapsed_time = time.time() - start_time
file_size = backup_archive.stat().st_size file_size = backup_archive.stat().st_size
file_size_hr = Backup.prettify_size(file_size) file_size_hr = self.prettify_size(file_size)
print(f"File name: '{backup_archive}'") print(f"File name: '{backup_archive}'")
if config.checksum: if config.checksum:
print(f"Checksum file: '{checksum_file}'") print(f"Checksum file: '{checksum_file}'")
print(f"File size: {file_size} bytes ({file_size_hr})") print(f"File size: {file_size} bytes ({file_size_hr})")
print(f"Elapsed time: {elapsed_time:.2f} seconds") print(f"Elapsed time: {self.prettify_timestamp(elapsed_time)}")
return Ok(None) return Ok(None)
@@ -462,7 +533,7 @@ class Backup:
) )
if result.returncode != 0: if result.returncode != 0:
return Err(f"Decryption failed: {result.stderr.decode()}") return Err(f"Decryption failed: {result.stderr.decode()}.")
if verbose: if verbose:
print("DONE") print("DONE")
@@ -488,13 +559,13 @@ class Backup:
) )
entries = list_res.stdout.strip().split('\n') entries = list_res.stdout.strip().split('\n')
if not entries or not entries[0]: if not entries or not entries[0]:
return Err("Archive is empty or corrupted") return Err("Archive is empty or corrupted.")
# Retrieve root directory from first entry # Retrieve root directory from first entry
extracted_root = entries[0].split('/')[0] extracted_root = entries[0].split('/')[0]
except subprocess.CalledProcessError as err: except subprocess.CalledProcessError as err:
return Err(f"Failed to list archive content: {err}") return Err(f"Failed to list archive content: {err}.")
cmd = [ cmd = [
"tar", "tar",
@@ -520,7 +591,7 @@ class Backup:
if verbose and progress is not None: if verbose and progress is not None:
if process.stdout is None: if process.stdout is None:
return Err("Failed to capture output") return Err("Failed to capture output.")
for line in process.stdout: for line in process.stdout:
line = line.strip() line = line.strip()
@@ -533,12 +604,12 @@ class Backup:
process.wait() process.wait()
if process.returncode != 0: if process.returncode != 0:
return Err("Unable to extract compressed archive") return Err("Unable to extract compressed archive.")
root_path = archive_file.parent / extracted_root root_path = archive_file.parent / extracted_root
if not root_path.exists(): if not root_path.exists():
return Err(f"Extracted '{root_path}' not found") return Err(f"Extracted '{root_path}' not found.")
return Ok(root_path) return Ok(root_path)
@@ -549,7 +620,7 @@ class Backup:
with open(checksum_file, 'r') as cf: with open(checksum_file, 'r') as cf:
expected_hashes = set(line.strip() for line in cf if line.strip()) expected_hashes = set(line.strip() for line in cf if line.strip())
except IOError as err: except IOError as err:
return Err(f"Failed to load checksum file: {err}") return Err(f"Failed to load checksum file: {err}.")
files = Backup.collect_files(extracted_dir) files = Backup.collect_files(extracted_dir)
progress = None progress = None
@@ -577,6 +648,8 @@ class Backup:
def extract_backup(self, archive_file: Path, password: str, checksum_file: Optional[Path], verbose: bool) -> Result[None]: def extract_backup(self, archive_file: Path, password: str, checksum_file: Optional[Path], verbose: bool) -> Result[None]:
"""Extract and verify a backup archive""" """Extract and verify a backup archive"""
start_time = time.time()
temp_tarball = archive_file.parent / Path("backup.py.tar.gz") temp_tarball = archive_file.parent / Path("backup.py.tar.gz")
decrypt_res = self.decrypt_file(archive_file, temp_tarball, password, verbose) decrypt_res = self.decrypt_file(archive_file, temp_tarball, password, verbose)
@@ -605,11 +678,17 @@ class Backup:
case Ok(): pass case Ok(): pass
self.cleanup_files(temp_tarball) self.cleanup_files(temp_tarball)
elapsed_time = time.time() - start_time
print(f"Backup extracted to: '{extracted_dir.parent.resolve() / extracted_dir}'") print(f"Backup extracted to: '{extracted_dir.parent.resolve() / extracted_dir}'")
print(f"Elapsed time: {self.prettify_timestamp(elapsed_time)}")
return Ok(None) return Ok(None)
def main(): def main():
signal_handler = SignalHandler()
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description="backup.py - modular and lightweight backup utility" description="backup.py - modular and lightweight backup utility"
) )
@@ -643,7 +722,7 @@ def main():
args = parser.parse_args() args = parser.parse_args()
if not (args.backup or args.extract): if not (args.backup or args.extract):
parser.error("specify either --backup or --extract") parser.error("specify either --backup or --extract.")
# Check whether dependencies are installed # Check whether dependencies are installed
deps_res = Backup.check_deps() deps_res = Backup.check_deps()
@@ -657,9 +736,9 @@ def main():
if args.backup: if args.backup:
sources_file, output_path, encryption_pass = args.backup sources_file, output_path, encryption_pass = args.backup
sources_path = Path(sources_file) sources_path = Path(sources_file)
output_dir = Path(output_path) output_dir = Path(output_path)
signal_handler.setup(output_dir)
# Create output directory if it doesn't exist # Create output directory if it doesn't exist
if not output_dir.exists(): if not output_dir.exists():
@@ -690,9 +769,10 @@ def main():
elif args.extract: elif args.extract:
archive_file = Path(args.extract[0]) archive_file = Path(args.extract[0])
signal_handler.setup(archive_file.parent)
if not archive_file.exists(): if not archive_file.exists():
print(f"Archive file '{archive_file}' does not exist", file=sys.stderr) print(f"Archive file '{archive_file}' does not exist.", file=sys.stderr)
sys.exit(1) sys.exit(1)
decryption_pass: str = "" decryption_pass: str = ""
@@ -701,18 +781,18 @@ def main():
if len(args.extract) >= 2: if len(args.extract) >= 2:
decryption_pass = args.extract[1] decryption_pass = args.extract[1]
else: else:
print("--extract flag requires decryption password as second argument", file=sys.stderr) print("--extract flag requires decryption password as second argument.", file=sys.stderr)
sys.exit(1) sys.exit(1)
if args.checksum: if args.checksum:
if len(args.extract) >= 3: if len(args.extract) >= 3:
checksum_file = Path(args.extract[2]) checksum_file = Path(args.extract[2])
else: else:
print("--checksum flag requires SHA256 file as third argument", file=sys.stderr) print("--checksum flag requires SHA256 file as third argument.", file=sys.stderr)
sys.exit(1) sys.exit(1)
if not checksum_file.exists(): if not checksum_file.exists():
print(f"Checksum file '{checksum_file}' does not exist", file=sys.stderr) print(f"Checksum file '{checksum_file}' does not exist.", file=sys.stderr)
sys.exit(1) sys.exit(1)
extract_res = backup.extract_backup(archive_file, decryption_pass, checksum_file, args.verbose) extract_res = backup.extract_backup(archive_file, decryption_pass, checksum_file, args.verbose)