Improved CLI user interface
All checks were successful
backup.py / unit-tests (push) Successful in 23s

This commit is contained in:
2026-01-28 17:05:26 +01:00
parent 3d194cc91e
commit 1e644c9133
3 changed files with 83 additions and 22 deletions

Binary file not shown.

Before

Width:  |  Height:  |  Size: 685 KiB

After

Width:  |  Height:  |  Size: 491 KiB

View File

@@ -33,7 +33,7 @@ File size: 5533818904 bytes (5.15 GiB)
Elapsed time: 2 minutes, 12 seconds
```
The `--checsum` (optional) is used to generate a checksum file containing the hashes of each single of the backup.
The `--checksum` (optional) is used to generate a checksum file containing the hashes of each single of the backup.
To extract an existing backup, you can instead issue the following command:

103
backup.py
View File

@@ -83,19 +83,33 @@ class SignalHandler:
Backup.cleanup_files(*temp_files)
print("DONE.", file=sys.stderr)
print("DONE", file=sys.stderr)
sys.exit(130)
class BackupProgress:
"""Progress indicator for backup operations"""
RESET = '\033[0m'
GRAY = '\033[90m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
BLUE = '\033[94m'
CYAN = '\033[96m'
def __init__(self, total: int, operation: str, status_msg: str) -> None:
self.total = total
self.current = 0
self.operation = operation
self.status_msg = status_msg
self.start_time = 0
def start_time_tracking(self, existing_time = None) -> None:
"""Initialize time tracking"""
self.start_time = time.time() if not existing_time else existing_time
def log_operation(self) -> None:
"""Print the Backup operation to stdout"""
self.start_time_tracking()
print(self.operation)
def draw_progress_bar(self, filename: str = "") -> None:
@@ -106,7 +120,7 @@ class BackupProgress:
# Create a CLI prograss bar
bar_width = 30
filled = int(bar_width * self.current / self.total)
bar = '' * filled + '' * (bar_width - filled)
bar = f"{self.GRAY}{'' * filled}{'' * (bar_width - filled)}{self.RESET}"
# Truncate filename if it's too long to display
# by keeping the first 30 characters + extension (if available)
@@ -121,17 +135,22 @@ class BackupProgress:
else:
filename = filename[:filename_max_len - 5]
status = f"\r└──{self.operation} [{bar}] {percentage:.1f}% ({self.current}/{self.total}) - (processing '{filename}')"
print(f"\r\033[K{status}", end='', flush=True)
progress_bar = (f"\r {self.status_msg} [{bar}] "
f"{self.YELLOW}{percentage:.1f}%{self.RESET} "
f"({self.current}/{self.total}): "
f"{self.BLUE}'{filename}'{self.RESET}")
print(f"\r\033[K{progress_bar}", end='', flush=True)
def complete_task(self) -> None:
"""Complete a task"""
# To complete a task, we do the following:
# 1. Move the cursor one line upwards
# 2. Move the cursor at end of operation message (i.e., rewrite the message)
# 3. Add 'DONE' there
# 3. Add duration there
# 4. Move the cursor downwards one line
print(f'\033[A\r{self.operation}DONE\n')
duration = time.time() - self.start_time
print(f"\033[A\r{self.operation}{self.GREEN}DONE{self.RESET} "
f"({self.CYAN}{duration:.2f}s{self.RESET})\n")
class Backup:
@staticmethod
@@ -380,6 +399,7 @@ class Backup:
@staticmethod
def encrypt_file(input_file: Path, output_file: Path, password: str, verbose: bool) -> Result[None]:
"""Encrypt a file with GPG in symmetric mode (using AES256)"""
start_time = time.time()
if output_file.exists():
return Err("Encryption failed: archive already exists.")
@@ -409,7 +429,9 @@ class Backup:
return Err(f"Encryption failed: {result.stderr.decode()}.")
if verbose:
print("DONE")
duration = time.time() - start_time
print(f"{BackupProgress.GREEN}DONE{BackupProgress.RESET}"
f" ({BackupProgress.CYAN}{duration:.2f}s{BackupProgress.RESET})")
return Ok(None)
@@ -436,7 +458,9 @@ class Backup:
# Backup each source
sources_count = len(config.sources)
for idx, source in enumerate(config.sources, 1):
print(f"Copying {source.label} ({idx}/{sources_count})")
if config.verbose:
start_time = time.time()
print(f"Copying {source.label} ({idx}/{sources_count})...", end='', flush=True)
# Create source subdirectory
source_dir = work_dir / f"backup-{source.label}-{date_str}"
@@ -449,7 +473,11 @@ class Backup:
case Err():
self.cleanup_files(work_dir, temp_tarball)
return copy_res
case Ok(): pass
case Ok():
if config.verbose:
duration = time.time() - start_time
print(f"{BackupProgress.GREEN}DONE{BackupProgress.RESET}"
f" ({BackupProgress.CYAN}{duration:.2f}s{BackupProgress.RESET})")
# Compute checksum when requested
if config.checksum:
@@ -478,13 +506,19 @@ class Backup:
if config.verbose and backup_progress is not None:
backup_progress.complete_task()
# Add a blank line between each backup entry (on verbose mode)
if config.verbose:
print("")
# Create compressed archive
archive_res = self.create_tarball(work_dir, temp_tarball, config.verbose)
match archive_res:
case Err():
self.cleanup_files(work_dir, temp_tarball)
return archive_res
case Ok(): pass
case Ok():
if config.verbose:
print("")
# Encrypt the archive
encrypt_res = self.encrypt_file(temp_tarball, backup_archive, config.password, config.verbose)
@@ -492,7 +526,9 @@ class Backup:
case Err():
self.cleanup_files(work_dir, temp_tarball)
return encrypt_res
case Ok(): pass
case Ok():
if config.verbose:
print("")
# Cleanup temporary files
self.cleanup_files(work_dir, temp_tarball)
@@ -504,19 +540,36 @@ class Backup:
elapsed_time = time.time() - start_time
file_size = backup_archive.stat().st_size
file_size_hr = self.prettify_size(file_size)
print(f"File name: '{backup_archive}'")
if config.checksum:
print(f"Checksums file: '{checksum_file}'")
print(f"File size: {file_size} bytes ({file_size_hr})")
print(f"Elapsed time: {self.prettify_timestamp(elapsed_time)}")
# Print a table containing some information about the backup
if config.verbose:
rows = [
("File name", f"'{backup_archive}'"),
("File size", f"{file_size} bytes ({file_size_hr})"),
("Elapsed time", f"{self.prettify_timestamp(elapsed_time)}")
]
if config.checksum:
rows.insert(1, ("Checksum file", f"'{checksum_file}'"))
# Compute column widths
max_label_width = max(len(label) for label, _ in rows)
max_value_width = max(len(value) for _, value in rows)
separator = f"+{'-' * (max_label_width + 2)}+{'-' * (max_value_width + 2)}+"
print(separator)
for label, value in rows:
print(f"| {label:<{max_label_width}} | {value:<{max_value_width}} |")
print(separator)
return Ok(None)
@staticmethod
def decrypt_file(input_file: Path, output_file: Path, password: str, verbose: bool) -> Result[None]:
"""Decrypt an encrypted backup archive"""
start_time = 0
if verbose:
start_time = time.time()
print("Decrypting backup...", end='', flush=True)
cmd = [
@@ -541,14 +594,18 @@ class Backup:
return Err(f"Decryption failed: {result.stderr.decode()}.")
if verbose:
print("DONE")
duration = time.time() - start_time
print(f"{BackupProgress.GREEN}DONE{BackupProgress.RESET}"
f" ({BackupProgress.CYAN}{duration:.2f}s{BackupProgress.RESET})")
return Ok(None)
@staticmethod
def extract_tarball(archive_file: Path, verbose: bool) -> Result[Path]:
"""Extract a tar archive and return the extracted path"""
start_time = 0
if verbose:
start_time = time.time()
print("Extracting backup...")
extracted_root: str = ""
@@ -585,6 +642,7 @@ class Backup:
if verbose:
cmd.insert(1, "-v")
progress = BackupProgress(len(entries), "Extracting backup...", "extracting")
progress.start_time_tracking(start_time)
process = subprocess.Popen(
cmd,
@@ -680,14 +738,17 @@ class Backup:
case Err():
self.cleanup_files(temp_tarball, extracted_dir)
return checksums_res
case Ok(): pass
case Ok():
if verbose:
print("")
self.cleanup_files(temp_tarball)
elapsed_time = time.time() - start_time
print(f"Backup extracted to: '{extracted_dir.parent.resolve() / extracted_dir}'")
print(f"Elapsed time: {self.prettify_timestamp(elapsed_time)}")
if verbose:
print(f"Backup extracted to: '{extracted_dir.parent.resolve() / extracted_dir}'")
print(f"Elapsed time: {self.prettify_timestamp(elapsed_time)}")
return Ok(None)