From 8c1e9748bf7dca09d076728b60b444877ddc1053 Mon Sep 17 00:00:00 2001 From: Marco Cetica Date: Tue, 20 Jan 2026 17:21:15 +0100 Subject: [PATCH] Fixed many bugs, added CLI progress bar and completed backup method --- backup.py | 122 ++++++++++++++++++++++++++---------------------------- 1 file changed, 59 insertions(+), 63 deletions(-) diff --git a/backup.py b/backup.py index 9eb198b..2cc6f61 100755 --- a/backup.py +++ b/backup.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # backup.py: modular and lightweight backup utility -# Developed by Marco Cetica (c) 2018, 2023, 2024, 2026 +# Developed by Marco Cetica (c) 2018-2026 # import argparse @@ -18,7 +18,6 @@ from dataclasses import dataclass from typing import Generic, TypeVar, Union, Literal, List T = TypeVar("T") - @dataclass(frozen=True) class Ok(Generic[T]): """Sum type to represent results""" @@ -49,20 +48,25 @@ class BackupState: class BackupProgress: """Progress indicator for backup operations""" - def __init__(self, total: int, operation: str): + def __init__(self, total: int, operation: str) -> None: self.total = total self.current = 0 self.operation = operation - def update(self, message: str = ""): - """Update progress""" + def draw_progress_bar(self, message: str = "") -> None: + """draw progress bar""" self.current += 1 percentage = (self.current / self.total) * 100 if self.total > 0 else 0 - status = f"\r{self.operation}: [{self.current}/{self.total}] {percentage: .1f}% - {message}" + # Create a CLI prograss bar + bar_width = 30 + filled = int(bar_width * self.current / self.total) + bar = '█' * filled + '░' * (bar_width - filled) + + status = f"\r :: {self.operation} [{bar}] {percentage:.1f}% ({self.current}/{self.total}) - (processing '{message}') ::" print(f"\r\033[K{status}", end='', flush=True) - def finish(self): + def finish(self) -> None: """Print new line""" print() @@ -111,7 +115,7 @@ class Backup: continue if '=' not in line: - print(f"Warning: invalid format at line {pos}: {line}") + return Err(f"invalid format at line {pos}: '{line}'") label, path_str = line.split('=', 1) path = Path(path_str.strip()) @@ -141,7 +145,7 @@ class Backup: return True # Skip named pipes (FIFOs) - if path.stat().st_mode & 0o170000 == 0o010000: + if (path.stat().st_mode & 0o170000) == 0o010000: return True return False @@ -163,7 +167,7 @@ class Backup: return ignored_files @staticmethod - def copy_files(source: Path, destination: Path, verbose: bool) -> Result[None]: + def copy_files(source: Path, destination: Path) -> Result[None]: """Copy files and directories preserving their metadata""" try: # Handle single file @@ -171,9 +175,6 @@ class Backup: # Parent directory might not exists, so we try to create it first destination.parent.mkdir(parents=True, exist_ok=True) - if verbose: - print(f"Copying file {source} -> {destination}") - # Copy file and its metadata shutil.copy2(source, destination) @@ -186,9 +187,6 @@ class Backup: if destination.exists(): shutil.rmtree(destination) - if verbose: - print(f"Copying directory {source} -> {destination}") - # Copy directory and its metadata. # We also ignore special files and we preserves links instead # of following them. @@ -234,10 +232,11 @@ class Backup: def compute_file_hash(file_path: Path) -> Result[str]: """Compute SHA256 hash of a given file""" try: + hash_obj = hashlib.sha256() with open(file_path, "rb") as f: for byte_block in iter(lambda: f.read(4096), b""): - hashlib.sha256().update(byte_block) - return Ok(hashlib.sha256().hexdigest()) + hash_obj.update(byte_block) + return Ok(hash_obj.hexdigest()) except IOError as e: return Err(f"Failed to read file {file_path}: {e}") @@ -245,7 +244,7 @@ class Backup: def create_tarball(source_dir: Path, output_file: Path, verbose: bool) -> Result[None]: """Create a compressed tar archive of the backup directory""" if verbose: - print("Compressing backup...") + print("> Compressing backup...") cmd = [ "tar", @@ -256,14 +255,14 @@ class Backup: source_dir.name ] - if verbose: - cmd[1] += "-v" + # if verbose: + # cmd.insert(1, "-v") # capture here means suppress it/holding it - result = subprocess.run(cmd, capture_output=not verbose, text=None) + result = subprocess.run(cmd, capture_output=not verbose, text=True) if result.returncode != 0: - error_msg = f"tar failed: {result.stderr if result.stderr else 'Unknown error code'}" + error_msg = f"tar failed: {result.stderr if result.stderr else 'unknown error code'}" return Err(error_msg) @@ -272,8 +271,12 @@ class Backup: @staticmethod def encrypt_file(input_file: Path, output_file: Path, password: str, verbose: bool) -> Result[None]: """Encrypt a file with GPG in symmetric mode (using AES256)""" + + if output_file.exists(): + return Err("Encryption failed: archive already exists") + if verbose: - print("Encrypting backup...") + print("> Encrypting backup...", end='', flush=True) cmd = [ "gpg", "-a", @@ -296,6 +299,7 @@ class Backup: if result.returncode != 0: return Err(f"Encryption failed: {result.stderr.decode()}") + print("DONE") return Ok(None) def make_backup(self, config: BackupState) -> Result[None]: @@ -321,7 +325,7 @@ class Backup: # Backup each source sources_count = len(config.sources) for idx, source in enumerate(config.sources, 1): - print(f"Copying {source.label} ({idx}/{sources_count})") + print(f"> Copying {source.label} ({idx}/{sources_count})") # Create source subdirectory source_dir = work_dir / f"backup-{source.label}-{date_str}" @@ -329,7 +333,7 @@ class Backup: source_dir.mkdir(parents=True, exist_ok=True) # Copy files - copy_res = self.copy_files(source.path, work_dir, config.verbose) + copy_res = self.copy_files(source.path, source_dir) match copy_res: case Err(): self.cleanup_files(work_dir, temp_tarball) @@ -343,23 +347,21 @@ class Backup: backup_progress: BackupProgress | None = None if config.verbose: - backup_progress = BackupProgress(len(files), "Computing checksums") + backup_progress = BackupProgress(len(files), "Computing checksum") - checksum_fd = open(checksum_file, 'a') - for file in files: - hash_result = self.compute_file_hash(file) - match hash_result: - case Err(): - checksum_fd.close() - self.cleanup_files(work_dir, temp_tarball) - return hash_result - case Ok(value=v): - checksum_fd.write(f"{v}\n") + with open(checksum_file, 'a') as checksum_fd: + for file in files: + hash_result = self.compute_file_hash(file) + match hash_result: + case Err(): + checksum_fd.close() + self.cleanup_files(work_dir, temp_tarball) + return hash_result + case Ok(value=v): + checksum_fd.write(f"{v}\n") - if config.verbose and backup_progress is not None: - backup_progress.update(str(file.name)) - - checksum_fd.close() + if config.verbose and backup_progress is not None: + backup_progress.draw_progress_bar(str(file.name)) if config.verbose and backup_progress is not None: backup_progress.finish() @@ -390,9 +392,8 @@ class Backup: elapsed_time = time.time() - start_time file_size = backup_archive.stat().st_size file_size_hr = Backup.prettify_size(file_size) - - print(f"\nBackup complete") - print(f"File name: {backup_archive}") + + print(f"File name: '{backup_archive}'") if config.checksum: print(f"Checksum file: {checksum_file}") print(f"File size: {file_size} bytes ({file_size_hr})") @@ -402,22 +403,7 @@ class Backup: def main(): parser = argparse.ArgumentParser( - description="backup.py - modular and lightweight backup utility", - formatter_class=argparse.RawDescriptionHelpFormatter, - epilog=""" -Usage: - > Create a backup - sudo ./backup.py --backup sources.bk /home/user $PASS - - > Create backup with checksum - sudo ./backup.py --checksum --backup sources.bk /home/user $PASS - - > Extract and verify checksum - ./backup.py --checksum --extract backup.tar.gz.enc $PASS hashes.sha256 - - For more information visit: https://git.marcocetica.com/marco/backup.py - or issue `man backup.py` on your terminal. - """ + description="backup.py - modular and lightweight backup utility" ) parser.add_argument( @@ -448,10 +434,15 @@ Usage: args = parser.parse_args() + if not (args.backup or args.extract): + parser.error("specify either --backup or --extract") + # Check whether dependencies are installed deps_res = Backup.check_deps() match deps_res: - case Err(error=e): print(f"Error: {e}", file=sys.stderr) + case Err(error=e): + print(f"{e}", file=sys.stderr) + sys.exit(1) case Ok(): pass backup = Backup() @@ -471,7 +462,7 @@ Usage: config: BackupState match sources_res: case Err(error=e): - print(f"Error: {e}", file=sys.stderr) + print(f"{e}", file=sys.stderr) sys.exit(1) case Ok(value=v): # Create a backup state @@ -482,7 +473,12 @@ Usage: checksum=args.checksum, verbose=args.verbose ) - + + backup_res = backup.make_backup(config) + match backup_res: + case Err(error=e): + print(f"{e}", file=sys.stderr) + sys.exit(1) if __name__ == "__main__": main()