Improved CLI user interface
All checks were successful
backup.py / unit-tests (push) Successful in 20s
All checks were successful
backup.py / unit-tests (push) Successful in 20s
This commit is contained in:
BIN
.usage.gif
BIN
.usage.gif
Binary file not shown.
|
Before Width: | Height: | Size: 685 KiB After Width: | Height: | Size: 369 KiB |
49
README.md
49
README.md
@@ -23,24 +23,50 @@ wireguard=/etc/wireguard/wg0.conf
|
||||
Then, you can start the backup process with the following command:
|
||||
|
||||
```sh
|
||||
$ sudo ./backup.py --checksum --backup sources.ini $PWD "very_bad_pw"
|
||||
Copying photos (1/3)
|
||||
Copying documents (2/3)
|
||||
Copying wireguard (3/3)
|
||||
File name: '/home/marco/Projects/backup.py/backup-wood-20260122.tar.gz.enc'
|
||||
Checksums file: '/home/marco/Projects/backup.py/backup-wood-20260122.sha256'
|
||||
File size: 5533818904 bytes (5.15 GiB)
|
||||
Elapsed time: 2 minutes, 12 seconds
|
||||
$ sudo ./backup.py --verbose --checksum --backup sources.ini $PWD "very_bad_pw"
|
||||
Copying photos (1/3)...DONE (0.02s)
|
||||
Computing checksums...DONE (0.01s)
|
||||
computing [██████████████████████████████] 100.0% (5/5): 'Screenshot From 2026-01-22....png'
|
||||
|
||||
Copying documents (2/3)...DONE (3.39s)
|
||||
Computing checksums...DONE (1.26s)
|
||||
computing [██████████████████████████████] 100.0% (7881/7881): 'master'
|
||||
|
||||
Copying wireguard (3/3)...DONE (0.00s)
|
||||
Computing checksums...DONE (0.00s)
|
||||
computing [██████████████████████████████] 100.0% (1/1): 'wg0.conf'
|
||||
|
||||
Compressing backup...DONE (22.52s)
|
||||
compressing [██████████████████████████████] 100.0% (8355/8354): 'rec2.jpg'
|
||||
|
||||
Encrypting backup...DONE (0.90s)
|
||||
|
||||
+---------------+------------------------------------------------------------------+
|
||||
| File name | '/home/marco/Projects/backup.py/backup-wood-20260129.tar.gz.enc' |
|
||||
+---------------+------------------------------------------------------------------+
|
||||
| Checksum file | '/home/marco/Projects/backup.py/backup-wood-20260129.sha256' |
|
||||
+---------------+------------------------------------------------------------------+
|
||||
| File size | 344165145 bytes (328.22 MiB) |
|
||||
+---------------+------------------------------------------------------------------+
|
||||
| Elapsed time | 23 seconds |
|
||||
+---------------+------------------------------------------------------------------+
|
||||
```
|
||||
|
||||
The `--checsum` (optional) is used to generate a checksum file containing the hashes of each single of the backup.
|
||||
The `--checksum` (optional) is used to generate a checksum file containing the hashes of each single of the backup.
|
||||
You can also omit the `--verbose` flag to run the program in quiet mode.
|
||||
|
||||
To extract an existing backup, you can instead issue the following command:
|
||||
|
||||
```sh
|
||||
$ ./backup.py -c --extract backup-wood-20260122.tar.gz.enc "very_bad_pw" backup-wood-20260122.sha256
|
||||
$ ./backup.py --verbose --checksum --extract backup-wood-20260129.tar.gz.enc "very_bad_pw" backup-wood-20260129.sha256
|
||||
Decrypting backup...DONE (0.76s)
|
||||
Extracting backup...DONE (6.93s)
|
||||
extracting [██████████████████████████████] 100.0% (8355/8355): 'rec2.jpg'
|
||||
Verifying backup...DONE (0.89s)
|
||||
verifying [██████████████████████████████] 100.0% (7887/7887): 'master'
|
||||
|
||||
Backup extracted to: '/home/marco/Projects/backup.py/backup.py.tmp'
|
||||
Elapsed time: 1 minute, 3 seconds
|
||||
Elapsed time: 8 seconds
|
||||
```
|
||||
|
||||
This will create a new directory named `backup.py.tmp` on your local path. Just like before,
|
||||
@@ -56,7 +82,6 @@ the following command:
|
||||
|
||||
```sh
|
||||
$ sudo cp -Rv "$(pwd)/backup.py" /usr/bin/backup.py
|
||||
'/home/marco/Projects/backup.py/backup.py' -> '/usr/bin/backup.py'
|
||||
```
|
||||
|
||||
## Technical details
|
||||
|
||||
106
backup.py
106
backup.py
@@ -14,6 +14,7 @@ import signal
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
from typing import Any, Generic, TypeVar, Union, Optional, List
|
||||
|
||||
T = TypeVar("T")
|
||||
@@ -83,19 +84,37 @@ class SignalHandler:
|
||||
|
||||
Backup.cleanup_files(*temp_files)
|
||||
|
||||
print("DONE.", file=sys.stderr)
|
||||
print("DONE", file=sys.stderr)
|
||||
sys.exit(130)
|
||||
|
||||
class EscapeChar(Enum):
|
||||
"""Enumeration for escape characters"""
|
||||
RESET = '\033[0m'
|
||||
GRAY = '\033[90m'
|
||||
GREEN = '\033[92m'
|
||||
YELLOW = '\033[93m'
|
||||
BLUE = '\033[94m'
|
||||
CYAN = '\033[96m'
|
||||
LINE_UP = '\033[A'
|
||||
ERASE_LINE = '\033[2K'
|
||||
|
||||
class BackupProgress:
|
||||
"""Progress indicator for backup operations"""
|
||||
|
||||
def __init__(self, total: int, operation: str, status_msg: str) -> None:
|
||||
self.total = total
|
||||
self.current = 0
|
||||
self.operation = operation
|
||||
self.status_msg = status_msg
|
||||
self.start_time = 0
|
||||
|
||||
def start_time_tracking(self, existing_time = None) -> None:
|
||||
"""Initialize time tracking"""
|
||||
self.start_time = time.time() if not existing_time else existing_time
|
||||
|
||||
def log_operation(self) -> None:
|
||||
"""Print the Backup operation to stdout"""
|
||||
self.start_time_tracking()
|
||||
print(self.operation)
|
||||
|
||||
def draw_progress_bar(self, filename: str = "") -> None:
|
||||
@@ -106,7 +125,7 @@ class BackupProgress:
|
||||
# Create a CLI prograss bar
|
||||
bar_width = 30
|
||||
filled = int(bar_width * self.current / self.total)
|
||||
bar = '█' * filled + '░' * (bar_width - filled)
|
||||
bar = f"{EscapeChar.GRAY.value}{'█' * filled}{'░' * (bar_width - filled)}{EscapeChar.RESET.value}"
|
||||
|
||||
# Truncate filename if it's too long to display
|
||||
# by keeping the first 30 characters + extension (if available)
|
||||
@@ -121,17 +140,22 @@ class BackupProgress:
|
||||
else:
|
||||
filename = filename[:filename_max_len - 5]
|
||||
|
||||
status = f"\r└──{self.operation} [{bar}] {percentage:.1f}% ({self.current}/{self.total}) - (processing '{filename}')"
|
||||
print(f"\r\033[K{status}", end='', flush=True)
|
||||
progress_bar = (f"\r {self.status_msg} [{bar}] "
|
||||
f"{EscapeChar.YELLOW.value}{percentage:.1f}%{EscapeChar.RESET.value} "
|
||||
f"({self.current}/{self.total}): "
|
||||
f"{EscapeChar.BLUE.value}'{filename}'{EscapeChar.RESET.value}")
|
||||
print(f"{EscapeChar.ERASE_LINE.value}{progress_bar}", end='', flush=True)
|
||||
|
||||
def complete_task(self) -> None:
|
||||
"""Complete a task"""
|
||||
# To complete a task, we do the following:
|
||||
# 1. Move the cursor one line upwards
|
||||
# 2. Move the cursor at end of operation message (i.e., rewrite the message)
|
||||
# 3. Add 'DONE' there
|
||||
# 3. Add duration there
|
||||
# 4. Move the cursor downwards one line
|
||||
print(f'\033[A\r{self.operation}DONE\n')
|
||||
duration = time.time() - self.start_time
|
||||
print(f"{EscapeChar.LINE_UP.value}\r{self.operation}{EscapeChar.GREEN.value}DONE{EscapeChar.RESET.value} "
|
||||
f"({EscapeChar.CYAN.value}{duration:.2f}s{EscapeChar.RESET.value})\n")
|
||||
|
||||
class Backup:
|
||||
@staticmethod
|
||||
@@ -380,6 +404,7 @@ class Backup:
|
||||
@staticmethod
|
||||
def encrypt_file(input_file: Path, output_file: Path, password: str, verbose: bool) -> Result[None]:
|
||||
"""Encrypt a file with GPG in symmetric mode (using AES256)"""
|
||||
start_time = time.time()
|
||||
|
||||
if output_file.exists():
|
||||
return Err("Encryption failed: archive already exists.")
|
||||
@@ -409,7 +434,9 @@ class Backup:
|
||||
return Err(f"Encryption failed: {result.stderr.decode()}.")
|
||||
|
||||
if verbose:
|
||||
print("DONE")
|
||||
duration = time.time() - start_time
|
||||
print(f"{EscapeChar.GREEN.value}DONE{EscapeChar.RESET.value}"
|
||||
f" ({EscapeChar.CYAN.value}{duration:.2f}s{EscapeChar.RESET.value})")
|
||||
|
||||
return Ok(None)
|
||||
|
||||
@@ -436,7 +463,9 @@ class Backup:
|
||||
# Backup each source
|
||||
sources_count = len(config.sources)
|
||||
for idx, source in enumerate(config.sources, 1):
|
||||
print(f"Copying {source.label} ({idx}/{sources_count})")
|
||||
if config.verbose:
|
||||
start_time = time.time()
|
||||
print(f"Copying {source.label} ({idx}/{sources_count})...", end='', flush=True)
|
||||
|
||||
# Create source subdirectory
|
||||
source_dir = work_dir / f"backup-{source.label}-{date_str}"
|
||||
@@ -449,7 +478,11 @@ class Backup:
|
||||
case Err():
|
||||
self.cleanup_files(work_dir, temp_tarball)
|
||||
return copy_res
|
||||
case Ok(): pass
|
||||
case Ok():
|
||||
if config.verbose:
|
||||
duration = time.time() - start_time
|
||||
print(f"{EscapeChar.GREEN.value}DONE{EscapeChar.RESET.value}"
|
||||
f" ({EscapeChar.CYAN.value}{duration:.2f}s{EscapeChar.RESET.value})")
|
||||
|
||||
# Compute checksum when requested
|
||||
if config.checksum:
|
||||
@@ -478,13 +511,19 @@ class Backup:
|
||||
if config.verbose and backup_progress is not None:
|
||||
backup_progress.complete_task()
|
||||
|
||||
# Add a blank line between each backup entry (on verbose mode)
|
||||
if config.verbose:
|
||||
print("")
|
||||
|
||||
# Create compressed archive
|
||||
archive_res = self.create_tarball(work_dir, temp_tarball, config.verbose)
|
||||
match archive_res:
|
||||
case Err():
|
||||
self.cleanup_files(work_dir, temp_tarball)
|
||||
return archive_res
|
||||
case Ok(): pass
|
||||
case Ok():
|
||||
if config.verbose:
|
||||
print("")
|
||||
|
||||
# Encrypt the archive
|
||||
encrypt_res = self.encrypt_file(temp_tarball, backup_archive, config.password, config.verbose)
|
||||
@@ -492,7 +531,9 @@ class Backup:
|
||||
case Err():
|
||||
self.cleanup_files(work_dir, temp_tarball)
|
||||
return encrypt_res
|
||||
case Ok(): pass
|
||||
case Ok():
|
||||
if config.verbose:
|
||||
print("")
|
||||
|
||||
# Cleanup temporary files
|
||||
self.cleanup_files(work_dir, temp_tarball)
|
||||
@@ -505,18 +546,35 @@ class Backup:
|
||||
file_size = backup_archive.stat().st_size
|
||||
file_size_hr = self.prettify_size(file_size)
|
||||
|
||||
print(f"File name: '{backup_archive}'")
|
||||
if config.checksum:
|
||||
print(f"Checksums file: '{checksum_file}'")
|
||||
print(f"File size: {file_size} bytes ({file_size_hr})")
|
||||
print(f"Elapsed time: {self.prettify_timestamp(elapsed_time)}")
|
||||
# Print a table containing some information about the backup
|
||||
if config.verbose:
|
||||
rows = [
|
||||
("File name", f"'{backup_archive}'"),
|
||||
("File size", f"{file_size} bytes ({file_size_hr})"),
|
||||
("Elapsed time", f"{self.prettify_timestamp(elapsed_time)}")
|
||||
]
|
||||
|
||||
if config.checksum:
|
||||
rows.insert(1, ("Checksum file", f"'{checksum_file}'"))
|
||||
|
||||
# Compute column widths
|
||||
max_label_width = max(len(label) for label, _ in rows)
|
||||
max_value_width = max(len(value) for _, value in rows)
|
||||
|
||||
separator = f"+{'-' * (max_label_width + 2)}+{'-' * (max_value_width + 2)}+"
|
||||
print(separator)
|
||||
for label, value in rows:
|
||||
print(f"| {label:<{max_label_width}} | {value:<{max_value_width}} |")
|
||||
print(separator)
|
||||
|
||||
return Ok(None)
|
||||
|
||||
@staticmethod
|
||||
def decrypt_file(input_file: Path, output_file: Path, password: str, verbose: bool) -> Result[None]:
|
||||
"""Decrypt an encrypted backup archive"""
|
||||
start_time = 0
|
||||
if verbose:
|
||||
start_time = time.time()
|
||||
print("Decrypting backup...", end='', flush=True)
|
||||
|
||||
cmd = [
|
||||
@@ -541,14 +599,18 @@ class Backup:
|
||||
return Err(f"Decryption failed: {result.stderr.decode()}.")
|
||||
|
||||
if verbose:
|
||||
print("DONE")
|
||||
duration = time.time() - start_time
|
||||
print(f"{EscapeChar.GREEN.value}DONE{EscapeChar.RESET.value}"
|
||||
f" ({EscapeChar.CYAN.value}{duration:.2f}s{EscapeChar.RESET.value})")
|
||||
|
||||
return Ok(None)
|
||||
|
||||
@staticmethod
|
||||
def extract_tarball(archive_file: Path, verbose: bool) -> Result[Path]:
|
||||
"""Extract a tar archive and return the extracted path"""
|
||||
start_time = 0
|
||||
if verbose:
|
||||
start_time = time.time()
|
||||
print("Extracting backup...")
|
||||
|
||||
extracted_root: str = ""
|
||||
@@ -585,6 +647,7 @@ class Backup:
|
||||
if verbose:
|
||||
cmd.insert(1, "-v")
|
||||
progress = BackupProgress(len(entries), "Extracting backup...", "extracting")
|
||||
progress.start_time_tracking(start_time)
|
||||
|
||||
process = subprocess.Popen(
|
||||
cmd,
|
||||
@@ -680,14 +743,17 @@ class Backup:
|
||||
case Err():
|
||||
self.cleanup_files(temp_tarball, extracted_dir)
|
||||
return checksums_res
|
||||
case Ok(): pass
|
||||
case Ok():
|
||||
if verbose:
|
||||
print("")
|
||||
|
||||
self.cleanup_files(temp_tarball)
|
||||
|
||||
elapsed_time = time.time() - start_time
|
||||
|
||||
print(f"Backup extracted to: '{extracted_dir.parent.resolve() / extracted_dir}'")
|
||||
print(f"Elapsed time: {self.prettify_timestamp(elapsed_time)}")
|
||||
if verbose:
|
||||
print(f"Backup extracted to: '{extracted_dir.parent.resolve() / extracted_dir}'")
|
||||
print(f"Elapsed time: {self.prettify_timestamp(elapsed_time)}")
|
||||
|
||||
return Ok(None)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user