Added extract feature and improved CLI user interface

This commit is contained in:
2026-01-21 18:10:28 +01:00
parent 8c1e9748bf
commit 60556f0655

284
backup.py
View File

@@ -8,14 +8,12 @@ import shutil
import sys
import os
import time
import tarfile
import subprocess
import hashlib
import getpass
from pathlib import Path
from datetime import datetime
from dataclasses import dataclass
from typing import Generic, TypeVar, Union, Literal, List
from typing import Generic, TypeVar, Union, Literal, Optional, List
T = TypeVar("T")
@dataclass(frozen=True)
@@ -63,11 +61,13 @@ class BackupProgress:
filled = int(bar_width * self.current / self.total)
bar = '' * filled + '' * (bar_width - filled)
status = f"\r :: {self.operation} [{bar}] {percentage:.1f}% ({self.current}/{self.total}) - (processing '{message}') ::"
status = f"\r└──{self.operation} [{bar}] {percentage:.1f}% ({self.current}/{self.total}) - (processing '{message}')"
print(f"\r\033[K{status}", end='', flush=True)
def finish(self) -> None:
"""Print new line"""
def finish(self, initial_message: str, new_line: bool) -> None:
"""Close the CLI UI"""
esc_char = 'L' if new_line else 'A'
print(f'\033[{esc_char}\r{initial_message}DONE')
print()
class Backup:
@@ -210,7 +210,7 @@ class Backup:
def cleanup_files(*paths: Path) -> None:
"""Clean up temporary files and directories"""
for path in paths:
if not path.exists():
if path is None or not path.exists():
continue
if path.is_dir():
@@ -240,11 +240,19 @@ class Backup:
except IOError as e:
return Err(f"Failed to read file {file_path}: {e}")
@staticmethod
def count_tar_entries(source_dir: Path) -> int:
"""Count all entries (files, dirs) that tar processes"""
return sum(1 for _ in source_dir.rglob('*'))
@staticmethod
def create_tarball(source_dir: Path, output_file: Path, verbose: bool) -> Result[None]:
"""Create a compressed tar archive of the backup directory"""
progress: BackupProgress | None = None
if verbose:
print("> Compressing backup...")
print("Compressing backup...")
total_entries = Backup.count_tar_entries(source_dir)
progress = BackupProgress(total_entries, "compressing")
cmd = [
"tar",
@@ -255,16 +263,35 @@ class Backup:
source_dir.name
]
# if verbose:
# cmd.insert(1, "-v")
if verbose:
cmd.insert(1, "-v")
# capture here means suppress it/holding it
result = subprocess.run(cmd, capture_output=not verbose, text=True)
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1
)
if result.returncode != 0:
error_msg = f"tar failed: {result.stderr if result.stderr else 'unknown error code'}"
# Read subprocess output from pipe in buffered mode
if verbose and progress is not None:
if process.stdout is None:
return Err("Failed to capture output")
return Err(error_msg)
for line in process.stdout:
line = line.strip()
if line:
# Extract filename from path
filename = Path(line).name
progress.draw_progress_bar(filename)
progress.finish("Compressing backup...", False)
# Wait for subprocess to complete
process.wait()
if process.returncode != 0:
return Err("Cannot create compressed archive")
return Ok(None)
@@ -276,7 +303,7 @@ class Backup:
return Err("Encryption failed: archive already exists")
if verbose:
print("> Encrypting backup...", end='', flush=True)
print("Encrypting backup...", end='', flush=True)
cmd = [
"gpg", "-a",
@@ -313,19 +340,19 @@ class Backup:
hostname = os.uname().nodename
# Create working directory
work_dir = config.output_path / "backup.sh.tmp"
work_dir = config.output_path / "backup.py.tmp"
if not work_dir.exists():
work_dir.mkdir(parents=True, exist_ok=True)
# Format output files
backup_archive = config.output_path / f"backup-{hostname}-{date_str}.tar.gz.enc"
checksum_file = config.output_path / f"backup-{hostname}-{date_str}.sha256"
temp_tarball = config.output_path / "backup.sh.tar.gz"
temp_tarball = config.output_path / "backup.py.tar.gz"
# Backup each source
sources_count = len(config.sources)
for idx, source in enumerate(config.sources, 1):
print(f"> Copying {source.label} ({idx}/{sources_count})")
print(f"Copying {source.label} ({idx}/{sources_count})")
# Create source subdirectory
source_dir = work_dir / f"backup-{source.label}-{date_str}"
@@ -347,7 +374,7 @@ class Backup:
backup_progress: BackupProgress | None = None
if config.verbose:
backup_progress = BackupProgress(len(files), "Computing checksum")
backup_progress = BackupProgress(len(files), "computing")
with open(checksum_file, 'a') as checksum_fd:
for file in files:
@@ -364,7 +391,7 @@ class Backup:
backup_progress.draw_progress_bar(str(file.name))
if config.verbose and backup_progress is not None:
backup_progress.finish()
backup_progress.finish("Computing checksums...", True)
# Create compressed archive
archive_res = self.create_tarball(work_dir, temp_tarball, config.verbose)
@@ -395,12 +422,185 @@ class Backup:
print(f"File name: '{backup_archive}'")
if config.checksum:
print(f"Checksum file: {checksum_file}")
print(f"Checksum file: '{checksum_file}'")
print(f"File size: {file_size} bytes ({file_size_hr})")
print(f"Elapsed time: {elapsed_time:.2f} seconds")
return Ok(None)
@staticmethod
def decrypt_file(input_file: Path, output_file: Path, password: str, verbose: bool) -> Result[None]:
"""Decrypt an encrypted backup archive"""
if verbose:
print("Decrypting backup...", end='', flush=True)
cmd = [
"gpg", "-a",
"--quiet",
"--decrypt",
"--no-symkey-cache",
"--pinentry-mode=loopback",
"--batch",
"--passphrase-fd", "0",
"--output", str(output_file),
str(input_file)
]
result = subprocess.run(
cmd,
input=password.encode(),
capture_output=True
)
if result.returncode != 0:
return Err(f"Decryption failed: {result.stderr.decode()}")
if verbose:
print("DONE")
return Ok(None)
@staticmethod
def extract_tarball(archive_file: Path, verbose: bool) -> Result[Path]:
"""Extract a tar archive and return the extracted path"""
if verbose:
print("Extracting backup...")
extracted_root: str = ""
# Count archive content
list_cmd = ["tar", "-tzf", str(archive_file)]
try:
list_res = subprocess.run(
list_cmd,
capture_output=True,
text=True,
check=True
)
entries = list_res.stdout.strip().split('\n')
if not entries or not entries[0]:
return Err("Archive is empty or corrupted")
# Retrieve root directory from first entry
extracted_root = entries[0].split('/')[0]
except subprocess.CalledProcessError as err:
return Err(f"Failed to list archive content: {err}")
cmd = [
"tar",
"-xzf",
str(archive_file),
"-C",
str(archive_file.parent)
]
progress: BackupProgress | None = None
if verbose:
cmd.insert(1, "-v")
progress = BackupProgress(len(entries), "extracting")
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1
)
if verbose and progress is not None:
if process.stdout is None:
return Err("Failed to capture output")
for line in process.stdout:
line = line.strip()
if line:
filename = Path(line).name
progress.draw_progress_bar(filename)
progress.finish("Extracting backup...", False)
# Wait for process to complete
process.wait()
if process.returncode != 0:
return Err("Unable to extract compressed archive")
root_path = archive_file.parent / extracted_root
if not root_path.exists():
return Err(f"Extracted '{root_path}' not found")
return Ok(root_path)
@staticmethod
def verify_backup(extracted_dir: Path, checksum_file: Path, verbose: bool) -> Result[None]:
"""Verify the integrity of a backup archive"""
if verbose:
print("Verifying backup...")
try:
with open(checksum_file, 'r') as cf:
expected_hashes = set(line.strip() for line in cf if line.strip())
except IOError as err:
return Err(f"Failed to load checksum file: {err}")
files = Backup.collect_files(extracted_dir)
progress = None
if verbose:
progress = BackupProgress(len(files), "verifying")
for file in files:
hash_res = Backup.compute_file_hash(file)
match hash_res:
case Err():
return hash_res
case Ok(value=file_hash):
if file_hash not in expected_hashes:
return Err(f"Integrity error for '{file}'")
if verbose and progress is not None:
progress.draw_progress_bar(file.name)
if verbose and progress is not None:
progress.finish("Verifying backup...", False)
return Ok(None)
def extract_backup(self, archive_file: Path, password: str, checksum_file: Optional[Path], verbose: bool) -> Result[None]:
"""Extract and verify a backup archive"""
temp_tarball = archive_file.parent / Path("backup.py.tar.gz")
decrypt_res = self.decrypt_file(archive_file, temp_tarball, password, verbose)
match decrypt_res:
case Err():
self.cleanup_files(temp_tarball)
return decrypt_res
case Ok(): pass
extracted_dir: Path | None = None
extract_res = self.extract_tarball(temp_tarball, verbose)
match extract_res:
case Err():
self.cleanup_files(temp_tarball)
return extract_res
case Ok(value=root_dir):
extracted_dir = root_dir
# Verify checksums when required
if checksum_file:
checksums_res = self.verify_backup(extracted_dir, checksum_file, verbose)
match checksums_res:
case Err():
self.cleanup_files(temp_tarball, extracted_dir)
return checksums_res
case Ok(): pass
self.cleanup_files(temp_tarball)
print(f"Backup extracted to: '{extracted_dir.parent.resolve() / extracted_dir}'")
return Ok(None)
def main():
parser = argparse.ArgumentParser(
description="backup.py - modular and lightweight backup utility"
@@ -448,7 +648,7 @@ def main():
backup = Backup()
if args.backup:
sources_file, output_path, password = args.backup
sources_file, output_path, encryption_pass = args.backup
sources_path = Path(sources_file)
output_dir = Path(output_path)
@@ -469,7 +669,7 @@ def main():
config = BackupState(
sources=v,
output_path=output_dir,
password=password,
password=encryption_pass,
checksum=args.checksum,
verbose=args.verbose
)
@@ -480,5 +680,41 @@ def main():
print(f"{e}", file=sys.stderr)
sys.exit(1)
elif args.extract:
archive_file = Path(args.extract[0])
if not archive_file.exists():
print(f"Archive file '{archive_file}' does not exist", file=sys.stderr)
sys.exit(1)
decryption_pass: str = ""
checksum_file: Path | None = None
if len(args.extract) >= 2:
decryption_pass = args.extract[1]
else:
print("--extract flag requires decryption password as second argument", file=sys.stderr)
sys.exit(1)
if args.checksum:
if len(args.extract) >= 3:
checksum_file = Path(args.extract[2])
else:
print("--checksum flag requires SHA256 file as third argument", file=sys.stderr)
sys.exit(1)
if not checksum_file.exists():
print(f"Checksum file '{checksum_file}' does not exist", file=sys.stderr)
sys.exit(1)
extract_res = backup.extract_backup(archive_file, decryption_pass, checksum_file, args.verbose)
match extract_res:
case Err(error=e):
print(f"{e}", file=sys.stderr)
sys.exit(1)
else:
parser.print_help()
sys.exit(1)
if __name__ == "__main__":
main()