From 60556f065543df489c116fcaa9fb701d7816af0f Mon Sep 17 00:00:00 2001 From: Marco Cetica Date: Wed, 21 Jan 2026 18:10:28 +0100 Subject: [PATCH] Added extract feature and improved CLI user interface --- backup.py | 284 +++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 260 insertions(+), 24 deletions(-) diff --git a/backup.py b/backup.py index 2cc6f61..8ac9d5e 100755 --- a/backup.py +++ b/backup.py @@ -8,14 +8,12 @@ import shutil import sys import os import time -import tarfile import subprocess import hashlib -import getpass from pathlib import Path from datetime import datetime from dataclasses import dataclass -from typing import Generic, TypeVar, Union, Literal, List +from typing import Generic, TypeVar, Union, Literal, Optional, List T = TypeVar("T") @dataclass(frozen=True) @@ -63,11 +61,13 @@ class BackupProgress: filled = int(bar_width * self.current / self.total) bar = '█' * filled + '░' * (bar_width - filled) - status = f"\r :: {self.operation} [{bar}] {percentage:.1f}% ({self.current}/{self.total}) - (processing '{message}') ::" + status = f"\r└──{self.operation} [{bar}] {percentage:.1f}% ({self.current}/{self.total}) - (processing '{message}')" print(f"\r\033[K{status}", end='', flush=True) - def finish(self) -> None: - """Print new line""" + def finish(self, initial_message: str, new_line: bool) -> None: + """Close the CLI UI""" + esc_char = 'L' if new_line else 'A' + print(f'\033[{esc_char}\r{initial_message}DONE') print() class Backup: @@ -210,7 +210,7 @@ class Backup: def cleanup_files(*paths: Path) -> None: """Clean up temporary files and directories""" for path in paths: - if not path.exists(): + if path is None or not path.exists(): continue if path.is_dir(): @@ -240,11 +240,19 @@ class Backup: except IOError as e: return Err(f"Failed to read file {file_path}: {e}") + @staticmethod + def count_tar_entries(source_dir: Path) -> int: + """Count all entries (files, dirs) that tar processes""" + return sum(1 for _ in source_dir.rglob('*')) + @staticmethod def create_tarball(source_dir: Path, output_file: Path, verbose: bool) -> Result[None]: """Create a compressed tar archive of the backup directory""" + progress: BackupProgress | None = None if verbose: - print("> Compressing backup...") + print("Compressing backup...") + total_entries = Backup.count_tar_entries(source_dir) + progress = BackupProgress(total_entries, "compressing") cmd = [ "tar", @@ -255,16 +263,35 @@ class Backup: source_dir.name ] - # if verbose: - # cmd.insert(1, "-v") + if verbose: + cmd.insert(1, "-v") - # capture here means suppress it/holding it - result = subprocess.run(cmd, capture_output=not verbose, text=True) + process = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + bufsize=1 + ) - if result.returncode != 0: - error_msg = f"tar failed: {result.stderr if result.stderr else 'unknown error code'}" + # Read subprocess output from pipe in buffered mode + if verbose and progress is not None: + if process.stdout is None: + return Err("Failed to capture output") - return Err(error_msg) + for line in process.stdout: + line = line.strip() + if line: + # Extract filename from path + filename = Path(line).name + progress.draw_progress_bar(filename) + progress.finish("Compressing backup...", False) + + # Wait for subprocess to complete + process.wait() + + if process.returncode != 0: + return Err("Cannot create compressed archive") return Ok(None) @@ -276,7 +303,7 @@ class Backup: return Err("Encryption failed: archive already exists") if verbose: - print("> Encrypting backup...", end='', flush=True) + print("Encrypting backup...", end='', flush=True) cmd = [ "gpg", "-a", @@ -313,19 +340,19 @@ class Backup: hostname = os.uname().nodename # Create working directory - work_dir = config.output_path / "backup.sh.tmp" + work_dir = config.output_path / "backup.py.tmp" if not work_dir.exists(): work_dir.mkdir(parents=True, exist_ok=True) # Format output files backup_archive = config.output_path / f"backup-{hostname}-{date_str}.tar.gz.enc" checksum_file = config.output_path / f"backup-{hostname}-{date_str}.sha256" - temp_tarball = config.output_path / "backup.sh.tar.gz" + temp_tarball = config.output_path / "backup.py.tar.gz" # Backup each source sources_count = len(config.sources) for idx, source in enumerate(config.sources, 1): - print(f"> Copying {source.label} ({idx}/{sources_count})") + print(f"Copying {source.label} ({idx}/{sources_count})") # Create source subdirectory source_dir = work_dir / f"backup-{source.label}-{date_str}" @@ -347,7 +374,7 @@ class Backup: backup_progress: BackupProgress | None = None if config.verbose: - backup_progress = BackupProgress(len(files), "Computing checksum") + backup_progress = BackupProgress(len(files), "computing") with open(checksum_file, 'a') as checksum_fd: for file in files: @@ -364,7 +391,7 @@ class Backup: backup_progress.draw_progress_bar(str(file.name)) if config.verbose and backup_progress is not None: - backup_progress.finish() + backup_progress.finish("Computing checksums...", True) # Create compressed archive archive_res = self.create_tarball(work_dir, temp_tarball, config.verbose) @@ -395,12 +422,185 @@ class Backup: print(f"File name: '{backup_archive}'") if config.checksum: - print(f"Checksum file: {checksum_file}") + print(f"Checksum file: '{checksum_file}'") print(f"File size: {file_size} bytes ({file_size_hr})") print(f"Elapsed time: {elapsed_time:.2f} seconds") return Ok(None) + + @staticmethod + def decrypt_file(input_file: Path, output_file: Path, password: str, verbose: bool) -> Result[None]: + """Decrypt an encrypted backup archive""" + if verbose: + print("Decrypting backup...", end='', flush=True) + + cmd = [ + "gpg", "-a", + "--quiet", + "--decrypt", + "--no-symkey-cache", + "--pinentry-mode=loopback", + "--batch", + "--passphrase-fd", "0", + "--output", str(output_file), + str(input_file) + ] + + result = subprocess.run( + cmd, + input=password.encode(), + capture_output=True + ) + + if result.returncode != 0: + return Err(f"Decryption failed: {result.stderr.decode()}") + + if verbose: + print("DONE") + + return Ok(None) + + @staticmethod + def extract_tarball(archive_file: Path, verbose: bool) -> Result[Path]: + """Extract a tar archive and return the extracted path""" + if verbose: + print("Extracting backup...") + + extracted_root: str = "" + + # Count archive content + list_cmd = ["tar", "-tzf", str(archive_file)] + try: + list_res = subprocess.run( + list_cmd, + capture_output=True, + text=True, + check=True + ) + entries = list_res.stdout.strip().split('\n') + if not entries or not entries[0]: + return Err("Archive is empty or corrupted") + + # Retrieve root directory from first entry + extracted_root = entries[0].split('/')[0] + + except subprocess.CalledProcessError as err: + return Err(f"Failed to list archive content: {err}") + + cmd = [ + "tar", + "-xzf", + str(archive_file), + "-C", + str(archive_file.parent) + ] + + progress: BackupProgress | None = None + + if verbose: + cmd.insert(1, "-v") + progress = BackupProgress(len(entries), "extracting") + + process = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + bufsize=1 + ) + + if verbose and progress is not None: + if process.stdout is None: + return Err("Failed to capture output") + + for line in process.stdout: + line = line.strip() + if line: + filename = Path(line).name + progress.draw_progress_bar(filename) + progress.finish("Extracting backup...", False) + + # Wait for process to complete + process.wait() + + if process.returncode != 0: + return Err("Unable to extract compressed archive") + + root_path = archive_file.parent / extracted_root + + if not root_path.exists(): + return Err(f"Extracted '{root_path}' not found") + + return Ok(root_path) + + @staticmethod + def verify_backup(extracted_dir: Path, checksum_file: Path, verbose: bool) -> Result[None]: + """Verify the integrity of a backup archive""" + if verbose: + print("Verifying backup...") + + try: + with open(checksum_file, 'r') as cf: + expected_hashes = set(line.strip() for line in cf if line.strip()) + except IOError as err: + return Err(f"Failed to load checksum file: {err}") + + files = Backup.collect_files(extracted_dir) + progress = None + if verbose: + progress = BackupProgress(len(files), "verifying") + + for file in files: + hash_res = Backup.compute_file_hash(file) + match hash_res: + case Err(): + return hash_res + case Ok(value=file_hash): + if file_hash not in expected_hashes: + return Err(f"Integrity error for '{file}'") + + if verbose and progress is not None: + progress.draw_progress_bar(file.name) + + if verbose and progress is not None: + progress.finish("Verifying backup...", False) + + return Ok(None) + def extract_backup(self, archive_file: Path, password: str, checksum_file: Optional[Path], verbose: bool) -> Result[None]: + """Extract and verify a backup archive""" + temp_tarball = archive_file.parent / Path("backup.py.tar.gz") + + decrypt_res = self.decrypt_file(archive_file, temp_tarball, password, verbose) + match decrypt_res: + case Err(): + self.cleanup_files(temp_tarball) + return decrypt_res + case Ok(): pass + + extracted_dir: Path | None = None + extract_res = self.extract_tarball(temp_tarball, verbose) + match extract_res: + case Err(): + self.cleanup_files(temp_tarball) + return extract_res + case Ok(value=root_dir): + extracted_dir = root_dir + + # Verify checksums when required + if checksum_file: + checksums_res = self.verify_backup(extracted_dir, checksum_file, verbose) + match checksums_res: + case Err(): + self.cleanup_files(temp_tarball, extracted_dir) + return checksums_res + case Ok(): pass + + self.cleanup_files(temp_tarball) + print(f"Backup extracted to: '{extracted_dir.parent.resolve() / extracted_dir}'") + + return Ok(None) + def main(): parser = argparse.ArgumentParser( description="backup.py - modular and lightweight backup utility" @@ -448,7 +648,7 @@ def main(): backup = Backup() if args.backup: - sources_file, output_path, password = args.backup + sources_file, output_path, encryption_pass = args.backup sources_path = Path(sources_file) output_dir = Path(output_path) @@ -469,7 +669,7 @@ def main(): config = BackupState( sources=v, output_path=output_dir, - password=password, + password=encryption_pass, checksum=args.checksum, verbose=args.verbose ) @@ -480,5 +680,41 @@ def main(): print(f"{e}", file=sys.stderr) sys.exit(1) + elif args.extract: + archive_file = Path(args.extract[0]) + + if not archive_file.exists(): + print(f"Archive file '{archive_file}' does not exist", file=sys.stderr) + sys.exit(1) + + decryption_pass: str = "" + checksum_file: Path | None = None + + if len(args.extract) >= 2: + decryption_pass = args.extract[1] + else: + print("--extract flag requires decryption password as second argument", file=sys.stderr) + sys.exit(1) + + if args.checksum: + if len(args.extract) >= 3: + checksum_file = Path(args.extract[2]) + else: + print("--checksum flag requires SHA256 file as third argument", file=sys.stderr) + sys.exit(1) + + if not checksum_file.exists(): + print(f"Checksum file '{checksum_file}' does not exist", file=sys.stderr) + sys.exit(1) + + extract_res = backup.extract_backup(archive_file, decryption_pass, checksum_file, args.verbose) + match extract_res: + case Err(error=e): + print(f"{e}", file=sys.stderr) + sys.exit(1) + else: + parser.print_help() + sys.exit(1) + if __name__ == "__main__": main()