Fixed many bugs, added CLI progress bar and completed backup method

This commit is contained in:
2026-01-20 17:21:15 +01:00
parent 6e97943cb7
commit 8c1e9748bf

122
backup.py
View File

@@ -1,6 +1,6 @@
#!/usr/bin/env python3
# backup.py: modular and lightweight backup utility
# Developed by Marco Cetica (c) 2018, 2023, 2024, 2026
# Developed by Marco Cetica (c) 2018-2026
#
import argparse
@@ -18,7 +18,6 @@ from dataclasses import dataclass
from typing import Generic, TypeVar, Union, Literal, List
T = TypeVar("T")
@dataclass(frozen=True)
class Ok(Generic[T]):
"""Sum type to represent results"""
@@ -49,20 +48,25 @@ class BackupState:
class BackupProgress:
"""Progress indicator for backup operations"""
def __init__(self, total: int, operation: str):
def __init__(self, total: int, operation: str) -> None:
self.total = total
self.current = 0
self.operation = operation
def update(self, message: str = ""):
"""Update progress"""
def draw_progress_bar(self, message: str = "") -> None:
"""draw progress bar"""
self.current += 1
percentage = (self.current / self.total) * 100 if self.total > 0 else 0
status = f"\r{self.operation}: [{self.current}/{self.total}] {percentage: .1f}% - {message}"
# Create a CLI prograss bar
bar_width = 30
filled = int(bar_width * self.current / self.total)
bar = '' * filled + '' * (bar_width - filled)
status = f"\r :: {self.operation} [{bar}] {percentage:.1f}% ({self.current}/{self.total}) - (processing '{message}') ::"
print(f"\r\033[K{status}", end='', flush=True)
def finish(self):
def finish(self) -> None:
"""Print new line"""
print()
@@ -111,7 +115,7 @@ class Backup:
continue
if '=' not in line:
print(f"Warning: invalid format at line {pos}: {line}")
return Err(f"invalid format at line {pos}: '{line}'")
label, path_str = line.split('=', 1)
path = Path(path_str.strip())
@@ -141,7 +145,7 @@ class Backup:
return True
# Skip named pipes (FIFOs)
if path.stat().st_mode & 0o170000 == 0o010000:
if (path.stat().st_mode & 0o170000) == 0o010000:
return True
return False
@@ -163,7 +167,7 @@ class Backup:
return ignored_files
@staticmethod
def copy_files(source: Path, destination: Path, verbose: bool) -> Result[None]:
def copy_files(source: Path, destination: Path) -> Result[None]:
"""Copy files and directories preserving their metadata"""
try:
# Handle single file
@@ -171,9 +175,6 @@ class Backup:
# Parent directory might not exists, so we try to create it first
destination.parent.mkdir(parents=True, exist_ok=True)
if verbose:
print(f"Copying file {source} -> {destination}")
# Copy file and its metadata
shutil.copy2(source, destination)
@@ -186,9 +187,6 @@ class Backup:
if destination.exists():
shutil.rmtree(destination)
if verbose:
print(f"Copying directory {source} -> {destination}")
# Copy directory and its metadata.
# We also ignore special files and we preserves links instead
# of following them.
@@ -234,10 +232,11 @@ class Backup:
def compute_file_hash(file_path: Path) -> Result[str]:
"""Compute SHA256 hash of a given file"""
try:
hash_obj = hashlib.sha256()
with open(file_path, "rb") as f:
for byte_block in iter(lambda: f.read(4096), b""):
hashlib.sha256().update(byte_block)
return Ok(hashlib.sha256().hexdigest())
hash_obj.update(byte_block)
return Ok(hash_obj.hexdigest())
except IOError as e:
return Err(f"Failed to read file {file_path}: {e}")
@@ -245,7 +244,7 @@ class Backup:
def create_tarball(source_dir: Path, output_file: Path, verbose: bool) -> Result[None]:
"""Create a compressed tar archive of the backup directory"""
if verbose:
print("Compressing backup...")
print("> Compressing backup...")
cmd = [
"tar",
@@ -256,14 +255,14 @@ class Backup:
source_dir.name
]
if verbose:
cmd[1] += "-v"
# if verbose:
# cmd.insert(1, "-v")
# capture here means suppress it/holding it
result = subprocess.run(cmd, capture_output=not verbose, text=None)
result = subprocess.run(cmd, capture_output=not verbose, text=True)
if result.returncode != 0:
error_msg = f"tar failed: {result.stderr if result.stderr else 'Unknown error code'}"
error_msg = f"tar failed: {result.stderr if result.stderr else 'unknown error code'}"
return Err(error_msg)
@@ -272,8 +271,12 @@ class Backup:
@staticmethod
def encrypt_file(input_file: Path, output_file: Path, password: str, verbose: bool) -> Result[None]:
"""Encrypt a file with GPG in symmetric mode (using AES256)"""
if output_file.exists():
return Err("Encryption failed: archive already exists")
if verbose:
print("Encrypting backup...")
print("> Encrypting backup...", end='', flush=True)
cmd = [
"gpg", "-a",
@@ -296,6 +299,7 @@ class Backup:
if result.returncode != 0:
return Err(f"Encryption failed: {result.stderr.decode()}")
print("DONE")
return Ok(None)
def make_backup(self, config: BackupState) -> Result[None]:
@@ -321,7 +325,7 @@ class Backup:
# Backup each source
sources_count = len(config.sources)
for idx, source in enumerate(config.sources, 1):
print(f"Copying {source.label} ({idx}/{sources_count})")
print(f"> Copying {source.label} ({idx}/{sources_count})")
# Create source subdirectory
source_dir = work_dir / f"backup-{source.label}-{date_str}"
@@ -329,7 +333,7 @@ class Backup:
source_dir.mkdir(parents=True, exist_ok=True)
# Copy files
copy_res = self.copy_files(source.path, work_dir, config.verbose)
copy_res = self.copy_files(source.path, source_dir)
match copy_res:
case Err():
self.cleanup_files(work_dir, temp_tarball)
@@ -343,23 +347,21 @@ class Backup:
backup_progress: BackupProgress | None = None
if config.verbose:
backup_progress = BackupProgress(len(files), "Computing checksums")
backup_progress = BackupProgress(len(files), "Computing checksum")
checksum_fd = open(checksum_file, 'a')
for file in files:
hash_result = self.compute_file_hash(file)
match hash_result:
case Err():
checksum_fd.close()
self.cleanup_files(work_dir, temp_tarball)
return hash_result
case Ok(value=v):
checksum_fd.write(f"{v}\n")
with open(checksum_file, 'a') as checksum_fd:
for file in files:
hash_result = self.compute_file_hash(file)
match hash_result:
case Err():
checksum_fd.close()
self.cleanup_files(work_dir, temp_tarball)
return hash_result
case Ok(value=v):
checksum_fd.write(f"{v}\n")
if config.verbose and backup_progress is not None:
backup_progress.update(str(file.name))
checksum_fd.close()
if config.verbose and backup_progress is not None:
backup_progress.draw_progress_bar(str(file.name))
if config.verbose and backup_progress is not None:
backup_progress.finish()
@@ -390,9 +392,8 @@ class Backup:
elapsed_time = time.time() - start_time
file_size = backup_archive.stat().st_size
file_size_hr = Backup.prettify_size(file_size)
print(f"\nBackup complete")
print(f"File name: {backup_archive}")
print(f"File name: '{backup_archive}'")
if config.checksum:
print(f"Checksum file: {checksum_file}")
print(f"File size: {file_size} bytes ({file_size_hr})")
@@ -402,22 +403,7 @@ class Backup:
def main():
parser = argparse.ArgumentParser(
description="backup.py - modular and lightweight backup utility",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Usage:
> Create a backup
sudo ./backup.py --backup sources.bk /home/user $PASS
> Create backup with checksum
sudo ./backup.py --checksum --backup sources.bk /home/user $PASS
> Extract and verify checksum
./backup.py --checksum --extract backup.tar.gz.enc $PASS hashes.sha256
For more information visit: https://git.marcocetica.com/marco/backup.py
or issue `man backup.py` on your terminal.
"""
description="backup.py - modular and lightweight backup utility"
)
parser.add_argument(
@@ -448,10 +434,15 @@ Usage:
args = parser.parse_args()
if not (args.backup or args.extract):
parser.error("specify either --backup or --extract")
# Check whether dependencies are installed
deps_res = Backup.check_deps()
match deps_res:
case Err(error=e): print(f"Error: {e}", file=sys.stderr)
case Err(error=e):
print(f"{e}", file=sys.stderr)
sys.exit(1)
case Ok(): pass
backup = Backup()
@@ -471,7 +462,7 @@ Usage:
config: BackupState
match sources_res:
case Err(error=e):
print(f"Error: {e}", file=sys.stderr)
print(f"{e}", file=sys.stderr)
sys.exit(1)
case Ok(value=v):
# Create a backup state
@@ -482,7 +473,12 @@ Usage:
checksum=args.checksum,
verbose=args.verbose
)
backup_res = backup.make_backup(config)
match backup_res:
case Err(error=e):
print(f"{e}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()