Optimized extraction operation and improved CLI UI.
All checks were successful
backup.py / unit-tests (push) Successful in 2m11s

This commit is contained in:
2026-04-17 10:28:08 +02:00
parent ebb4547dff
commit 59376db19b
3 changed files with 164 additions and 82 deletions

View File

@@ -24,23 +24,18 @@ Then, you can start the backup process with the following command:
```sh ```sh
$ sudo ./backup.py --verbose --checksum --backup sources.ini $PWD "very_bad_pw" $ sudo ./backup.py --verbose --checksum --backup sources.ini $PWD "very_bad_pw"
Copying photos (1/3)...DONE (0.02s) Copying photos (1/3)...DONE (0 seconds)
Computing checksums...DONE (0.01s) Computing checksums...DONE (0.01s)
computing [██████████████████████████████] 100.0% (5/5): 'Screenshot From 2026-01-22....png' computing [██████████████████████████████] 100.0% (5/5): 'Screenshot From 2026-01-22....png'
Copying documents (2/3)...DONE (3 seconds)
Copying documents (2/3)...DONE (3.39s) Computing checksums...DONE (1 seconds)
Computing checksums...DONE (1.26s)
computing [██████████████████████████████] 100.0% (7881/7881): 'master' computing [██████████████████████████████] 100.0% (7881/7881): 'master'
Copying wireguard (3/3)...DONE (0 seconds)
Copying wireguard (3/3)...DONE (0.00s) Computing checksums...DONE (0 seconds)
Computing checksums...DONE (0.00s)
computing [██████████████████████████████] 100.0% (1/1): 'wg0.conf' computing [██████████████████████████████] 100.0% (1/1): 'wg0.conf'
Compressing backup...DONE (22 seconds)
Compressing backup...DONE (22.52s)
compressing [██████████████████████████████] 100.0% (8355/8354): 'rec2.jpg' compressing [██████████████████████████████] 100.0% (8355/8354): 'rec2.jpg'
Encrypting backup...DONE (0 seconds)
Encrypting backup...DONE (0.90s)
+---------------+------------------------------------------------------------------+ +---------------+------------------------------------------------------------------+
| File name | '/home/marco/Projects/backup.py/backup-wood-20260129.tar.gz.enc' | | File name | '/home/marco/Projects/backup.py/backup-wood-20260129.tar.gz.enc' |
+---------------+------------------------------------------------------------------+ +---------------+------------------------------------------------------------------+
@@ -59,10 +54,10 @@ To extract an existing backup, you can instead issue the following command:
```sh ```sh
$ ./backup.py --verbose --checksum --extract backup-wood-20260129.tar.gz.enc "very_bad_pw" backup-wood-20260129.sha256 $ ./backup.py --verbose --checksum --extract backup-wood-20260129.tar.gz.enc "very_bad_pw" backup-wood-20260129.sha256
Decrypting backup...DONE (0.76s) Decrypting backup...DONE (0 seconds)
Extracting backup...DONE (6.93s) Extracting backup...DONE (6 seconds)
extracting [██████████████████████████████] 100.0% (8355/8355): 'rec2.jpg' extracting [██████████████████████████████] 100.0% (8355/8355): 'rec2.jpg'
Verifying backup...DONE (0.89s) Verifying backup...DONE (0 seconds)
verifying [██████████████████████████████] 100.0% (7887/7887): 'master' verifying [██████████████████████████████] 100.0% (7887/7887): 'master'
Backup extracted to: '/home/marco/Projects/backup.py/backup.py.tmp' Backup extracted to: '/home/marco/Projects/backup.py/backup.py.tmp'
@@ -119,13 +114,55 @@ it follows the procedure listed below:
1. **Copy phase**: uses Python `shutil.copytree()` to copy files while preserving metadata and 1. **Copy phase**: uses Python `shutil.copytree()` to copy files while preserving metadata and
symlinks (without following them) and by ignoring special files; symlinks (without following them) and by ignoring special files;
2. **Compression**: creates a gzip-compressed tar archive using GNU tar; 2. **Compression**: creates a gzip-compressed tar archive using GNU tar and GZIP;
3. **Encryption**: encrypts the archive with GPG using AES-256 symmetric encryption; 3. **Encryption**: encrypts the archive with GPG using AES-256 symmetric encryption;
4. **Checksum** (optional): computes SHA256 hashes for each file in the backup archive. 4. **Checksum** (optional): computes SHA256 hashes for each file in the backup archive.
The backup process creates temporary files in `backup.py.tmp` and `backup.py.tar.gz`, which are The backup process creates temporary files in `backup.py.tmp` and `backup.py.tar.gz`, which are
automatically cleaned up on completion or interruption (i.e., `C-c`). automatically cleaned up on completion or interruption (i.e., `C-c`).
The final backup file consists of two separate parts: a *header* and a *payload*. The former is used
to determine whether a given backup archive is valid and to store various metadata values, while the latter
is used to store the actual encrypted content. In particular, the header (16 bytes) has the following layout:
| Offset | Size | Field |
|--------|-----------------------|--------------|
| 0 | 8 Bytes (`uint64`) | Magic number |
| 8 | 8 Bytes (`uint64`) | Element count |
The magic number, which is used to recognize whether a backup file is valid, is equal to `0x424B5F50595F4844` (that is, the ASCII string `BK_PY_HD`). The last
8 bytes of the header instead store the element count of the backup file (which is then used during the extraction phase). In other words, this is the
structure of a backup file:
```shell
$ xxd -l 42 backup-foo-20260417.tar.gz.enc
00000000: 424b 5f50 595f 4844 0000 0000 0000 000a BK_PY_HD........
00000010: 2d2d 2d2d 2d42 4547 494e 2050 4750 204d -----BEGIN PGP M
00000020: 4553 5341 4745 2d2d 2d2d ESSAGE----
```
As you can see, the first 8 bytes (`424b 5f50 595f 4844`) represent the magic number, while the following 8 bytes (`0000 0000 0000 000a`) represent the number of elements inside
the backup file, which in this example is equal to `0xA` (that is, 10). On extraction, we should see exactly this number of elements:
```shell
$ ./backup.py -Vce backup-arxtop-20260417.tar.gz.enc "test" backup-arxtop-20260417.sha256
Decrypting backup...DONE (0 seconds)
Extracting backup...DONE (0 seconds)
extracting [██████████████████████████████] 100.0% (10/10): 'ufetch.c'
Verifying backup...DONE (0 seconds)
verifying [██████████████████████████████] 100.0% (7/7): 'ufetch.c'
Backup extracted to: '/home/marco/Projects/backup.py/backup.py.tmp'
Elapsed time: 0 seconds
```
You may also be wondering why the counts of the two operations (*extracting* and *verifying*) differ. This is due to the fact that the former
also includes the directories, while the latter includes only the actual files.
## Security concerns
This tool is not intended to provide advanced security features such as plausible deniability. Many of the steps of the backup process invalidate
this concept *by design*. Additionally, the structure of the final backup file exposes some metadata that can be used to infer information
about the backup (namely, the number of elements).
## Old version ## Old version
This implementation of `backup.py` is a porting of an old backup script originally written in Bash This implementation of `backup.py` is a porting of an old backup script originally written in Bash
that I developed back in 2018. While this new version should be compatible with old backup archives, that I developed back in 2018. While this new version should be compatible with old backup archives,

173
backup.py
View File

@@ -11,12 +11,19 @@ import time
import subprocess import subprocess
import hashlib import hashlib
import signal import signal
import struct
from pathlib import Path from pathlib import Path
from datetime import datetime from datetime import datetime
from dataclasses import dataclass from dataclasses import dataclass
from enum import Enum from enum import Enum
from typing import Any, Generic, TypeVar, Union, Optional, List from typing import Any, Generic, TypeVar, Union, Optional, List
# Layout of the fixed-size header prepended to every backup file:
# big endian (>), magic number (uint64, Q) followed by entry count (uint64, Q)
HEADER_FORMAT = ">QQ"
# Derived from the format so that size and layout can never drift apart
# (2 * 8 bytes = 16 bytes)
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
HEADER_MAGIC_NUMBER = 0x424B5F50595F4844 # BK_PY_HD
# Names of the temporary working directory and of the temporary tarball
WORKDIR_NAME = "backup.py.tmp"
TARBALL_NAME = "backup.py.tar.gz"
T = TypeVar("T") T = TypeVar("T")
@dataclass(frozen=True) @dataclass(frozen=True)
class Ok(Generic[T]): class Ok(Generic[T]):
@@ -75,8 +82,8 @@ class SignalHandler:
if self.output_path: if self.output_path:
temp_files = [ temp_files = [
self.output_path / "backup.py.tmp", self.output_path / WORKDIR_NAME,
self.output_path / "backup.py.tar.gz" self.output_path / TARBALL_NAME
] ]
if self.checksum_file: if self.checksum_file:
@@ -123,7 +130,7 @@ class BackupProgress:
actual = min(self.current, self.total) actual = min(self.current, self.total)
percentage = (actual / self.total) * 100 if self.total > 0 else 0 percentage = (actual / self.total) * 100 if self.total > 0 else 0
# Create a CLI prograss bar # Create a CLI progress bar
bar_width = 30 bar_width = 30
filled = int(bar_width * actual / self.total) filled = int(bar_width * actual / self.total)
bar = f"{EscapeChar.GRAY.value}{'' * filled}{'' * (bar_width - filled)}{EscapeChar.RESET.value}" bar = f"{EscapeChar.GRAY.value}{'' * filled}{'' * (bar_width - filled)}{EscapeChar.RESET.value}"
@@ -154,11 +161,39 @@ class BackupProgress:
# 2. Move the cursor at end of operation message (i.e., rewrite the message) # 2. Move the cursor at end of operation message (i.e., rewrite the message)
# 3. Add duration there # 3. Add duration there
# 4. Move the cursor downwards one line # 4. Move the cursor downwards one line
duration = time.time() - self.start_time duration = Backup.prettify_timestamp(time.time() - self.start_time)
print(f"{EscapeChar.LINE_UP.value}\r{self.operation}{EscapeChar.GREEN.value}DONE{EscapeChar.RESET.value} " print(f"{EscapeChar.LINE_UP.value}\r{self.operation}{EscapeChar.GREEN.value}DONE{EscapeChar.RESET.value} "
f"({EscapeChar.CYAN.value}{duration:.2f}s{EscapeChar.RESET.value})\n") f"({EscapeChar.CYAN.value}{duration}{EscapeChar.RESET.value})\n")
class Backup: class Backup:
@staticmethod
def build_header(entry_count: int) -> bytes:
    """Serialize the backup header (magic number + entry count) to bytes"""
    # Packed according to HEADER_FORMAT:
    # big endian (>), 1 uint64 (Q, 8 bytes) + 1 uint64 (Q, 8 bytes) = 16 bytes
    header_fields = (HEADER_MAGIC_NUMBER, entry_count)
    return struct.pack(HEADER_FORMAT, *header_fields)
@staticmethod
def parse_header(data: bytes) -> Result[int]:
    """Validate the header of a backup file and return its entry count"""
    # Not enough bytes to hold a complete header
    if len(data) < HEADER_SIZE:
        return Err("File too small to contain a valid header.")

    # Only the leading HEADER_SIZE bytes belong to the header
    raw_header = data[:HEADER_SIZE]
    try:
        fields = struct.unpack(HEADER_FORMAT, raw_header)
    except struct.error as err:
        return Err(f"Malformed header: {err}.")

    magic, entry_count = fields

    # Reject any file that does not start with the expected magic number
    if magic == HEADER_MAGIC_NUMBER:
        return Ok(entry_count)

    return Err("Invalid magic number.")
@staticmethod @staticmethod
def check_deps() -> Result[None]: def check_deps() -> Result[None]:
"""Check whether dependencies are installed""" """Check whether dependencies are installed"""
@@ -353,11 +388,11 @@ class Backup:
return sum(1 for _ in source_dir.rglob('*')) + 1 return sum(1 for _ in source_dir.rglob('*')) + 1
@staticmethod @staticmethod
def create_tarball(source_dir: Path, output_file: Path, verbose: bool) -> Result[None]: def create_tarball(source_dir: Path, output_file: Path, verbose: bool) -> Result[int]:
"""Create a compressed tar archive of the backup directory""" """Create a compressed tar archive of the backup directory"""
total_entries = Backup.count_tar_entries(source_dir)
progress: BackupProgress | None = None progress: BackupProgress | None = None
if verbose: if verbose:
total_entries = Backup.count_tar_entries(source_dir)
progress = BackupProgress(total_entries, "Compressing backup...", "compressing") progress = BackupProgress(total_entries, "Compressing backup...", "compressing")
progress.log_operation() progress.log_operation()
@@ -400,11 +435,12 @@ class Backup:
if process.returncode != 0: if process.returncode != 0:
return Err("Cannot create compressed archive.") return Err("Cannot create compressed archive.")
return Ok(None) return Ok(total_entries)
@staticmethod @staticmethod
def encrypt_file(input_file: Path, output_file: Path, password: str, verbose: bool) -> Result[None]: def encrypt_file(input_file: Path, output_file: Path, password: str,
"""Encrypt a file with GPG in symmetric mode (using AES256)""" entry_count: int, verbose: bool) -> Result[None]:
"""Encrypt a file with GPG and prepend an header"""
start_time = time.time() start_time = time.time()
if output_file.exists(): if output_file.exists():
@@ -413,6 +449,10 @@ class Backup:
if verbose: if verbose:
print("Encrypting backup...", end='', flush=True) print("Encrypting backup...", end='', flush=True)
# Write the encrypted file to a temporary file first. Then prepend
# the header afterward
tmp_enc = output_file.with_suffix(".enc.tmp")
cmd = [ cmd = [
"gpg", "-a", "gpg", "-a",
"--symmetric", "--symmetric",
@@ -421,7 +461,7 @@ class Backup:
"--pinentry-mode=loopback", "--pinentry-mode=loopback",
"--batch", "--batch",
"--passphrase-fd", "0", "--passphrase-fd", "0",
"--output", str(output_file), "--output", str(tmp_enc),
str(input_file) str(input_file)
] ]
@@ -432,12 +472,23 @@ class Backup:
) )
if result.returncode != 0: if result.returncode != 0:
tmp_enc.unlink(missing_ok=True)
return Err(f"Encryption failed: {result.stderr.decode()}.") return Err(f"Encryption failed: {result.stderr.decode()}.")
try:
header = Backup.build_header(entry_count)
with open(output_file, "wb") as out, open(tmp_enc, "rb") as enc:
out.write(header) # Write header on the first 16 bytes of the file
shutil.copyfileobj(enc, out)
except IOError as err:
return Err(f"Failed to write encrypted backup file: {err}.")
finally:
tmp_enc.unlink(missing_ok=True)
if verbose: if verbose:
duration = time.time() - start_time duration = Backup.prettify_timestamp(time.time() - start_time)
print(f"{EscapeChar.GREEN.value}DONE{EscapeChar.RESET.value}" print(f"{EscapeChar.GREEN.value}DONE{EscapeChar.RESET.value}"
f" ({EscapeChar.CYAN.value}{duration:.2f}s{EscapeChar.RESET.value})") f" ({EscapeChar.CYAN.value}{duration}{EscapeChar.RESET.value})")
return Ok(None) return Ok(None)
@@ -455,7 +506,7 @@ class Backup:
# Format output files # Format output files
backup_archive = config.output_path / f"backup-{hostname}-{date_str}.tar.gz.enc" backup_archive = config.output_path / f"backup-{hostname}-{date_str}.tar.gz.enc"
checksum_file = config.output_path / f"backup-{hostname}-{date_str}.sha256" checksum_file = config.output_path / f"backup-{hostname}-{date_str}.sha256"
temp_tarball = config.output_path / "backup.py.tar.gz" temp_tarball = config.output_path / TARBALL_NAME
# Backup each source # Backup each source
sources_count = len(config.sources) sources_count = len(config.sources)
@@ -477,9 +528,9 @@ class Backup:
return copy_res return copy_res
case Ok(): case Ok():
if config.verbose: if config.verbose:
duration = time.time() - start_time duration = Backup.prettify_timestamp(time.time() - start_time)
print(f"{EscapeChar.GREEN.value}DONE{EscapeChar.RESET.value}" print(f"{EscapeChar.GREEN.value}DONE{EscapeChar.RESET.value}"
f" ({EscapeChar.CYAN.value}{duration:.2f}s{EscapeChar.RESET.value})") f" ({EscapeChar.CYAN.value}{duration}{EscapeChar.RESET.value})")
# Compute checksum when requested # Compute checksum when requested
if config.checksum: if config.checksum:
@@ -508,29 +559,22 @@ class Backup:
if config.verbose and backup_progress is not None: if config.verbose and backup_progress is not None:
backup_progress.complete_task() backup_progress.complete_task()
# Add a blank line between each backup entry (on verbose mode)
if config.verbose:
print("")
# Create compressed archive # Create compressed archive
entry_count: int = 0
archive_res = self.create_tarball(work_dir, temp_tarball, config.verbose) archive_res = self.create_tarball(work_dir, temp_tarball, config.verbose)
match archive_res: match archive_res:
case Err(): case Err():
self.cleanup_files(work_dir, temp_tarball) self.cleanup_files(work_dir, temp_tarball)
return archive_res return archive_res
case Ok(): case Ok(value=entry_count): pass
if config.verbose:
print("")
# Encrypt the archive # Encrypt the archive
encrypt_res = self.encrypt_file(temp_tarball, backup_archive, config.password, config.verbose) encrypt_res = self.encrypt_file(temp_tarball, backup_archive, config.password, entry_count, config.verbose)
match encrypt_res: match encrypt_res:
case Err(): case Err():
self.cleanup_files(work_dir, temp_tarball) self.cleanup_files(work_dir, temp_tarball)
return encrypt_res return encrypt_res
case Ok(): case Ok(): pass
if config.verbose:
print("")
# Cleanup temporary files # Cleanup temporary files
self.cleanup_files(work_dir, temp_tarball) self.cleanup_files(work_dir, temp_tarball)
@@ -567,13 +611,34 @@ class Backup:
return Ok(None) return Ok(None)
@staticmethod @staticmethod
def decrypt_file(input_file: Path, output_file: Path, password: str, verbose: bool) -> Result[None]: def decrypt_file(input_file: Path, output_file: Path, password: str, verbose: bool) -> Result[int]:
"""Decrypt an encrypted backup archive""" """Strip header, decrypt the backup file and return entry count"""
start_time = 0 start_time = 0
if verbose: if verbose:
start_time = time.time() start_time = time.time()
print("Decrypting backup...", end='', flush=True) print("Decrypting backup...", end='', flush=True)
try:
with open(input_file, "rb") as file:
header_data = file.read(HEADER_SIZE)
except IOError as err:
return Err(f"Failed to read encrypted backup file: {err}.")
header_res = Backup.parse_header(header_data)
match header_res:
case Err():
return header_res
case Ok(value=entry_res): pass
tmp_payload = input_file.with_suffix(".payload.tmp")
try:
with open(input_file, "rb") as src, open(tmp_payload, "wb") as dest:
src.seek(HEADER_SIZE)
shutil.copyfileobj(src, dest)
except IOError as err:
tmp_payload.unlink(missing_ok=True)
return Err(f"Failed to strip header from backup file: {err}.")
cmd = [ cmd = [
"gpg", "-a", "gpg", "-a",
"--quiet", "--quiet",
@@ -583,7 +648,7 @@ class Backup:
"--batch", "--batch",
"--passphrase-fd", "0", "--passphrase-fd", "0",
"--output", str(output_file), "--output", str(output_file),
str(input_file) str(tmp_payload)
] ]
result = subprocess.run( result = subprocess.run(
@@ -592,45 +657,26 @@ class Backup:
capture_output=True capture_output=True
) )
tmp_payload.unlink(missing_ok=True)
if result.returncode != 0: if result.returncode != 0:
return Err(f"Decryption failed: {result.stderr.decode()}.") return Err(f"Decryption failed: {result.stderr.decode()}.")
if verbose: if verbose:
duration = time.time() - start_time duration = Backup.prettify_timestamp(time.time() - start_time)
print(f"{EscapeChar.GREEN.value}DONE{EscapeChar.RESET.value}" print(f"{EscapeChar.GREEN.value}DONE{EscapeChar.RESET.value}"
f" ({EscapeChar.CYAN.value}{duration:.2f}s{EscapeChar.RESET.value})") f" ({EscapeChar.CYAN.value}{duration}{EscapeChar.RESET.value})")
return Ok(None) return Ok(entry_res)
@staticmethod @staticmethod
def extract_tarball(archive_file: Path, verbose: bool) -> Result[Path]: def extract_tarball(archive_file: Path, entry_count: int, verbose: bool) -> Result[Path]:
"""Extract a tar archive and return the extracted path""" """Extract a tar archive and return the extracted path"""
start_time = 0 start_time = 0
if verbose: if verbose:
start_time = time.time() start_time = time.time()
print("Extracting backup...") print("Extracting backup...")
extracted_root: str = ""
# Count archive content
list_cmd = ["tar", "-tzf", str(archive_file)]
try:
list_res = subprocess.run(
list_cmd,
capture_output=True,
text=True,
check=True
)
entries = list_res.stdout.strip().split('\n')
if not entries or not entries[0]:
return Err("Archive is empty or corrupted.")
# Retrieve root directory from first entry
extracted_root = entries[0].split('/')[0]
except subprocess.CalledProcessError as err:
return Err(f"Failed to list archive content: {err}.")
cmd = [ cmd = [
"tar", "tar",
"-xzf", "-xzf",
@@ -643,7 +689,7 @@ class Backup:
if verbose: if verbose:
cmd.insert(1, "-v") cmd.insert(1, "-v")
progress = BackupProgress(len(entries), "Extracting backup...", "extracting") progress = BackupProgress(entry_count, "Extracting backup...", "extracting")
progress.start_time_tracking(start_time) progress.start_time_tracking(start_time)
process = subprocess.Popen( process = subprocess.Popen(
@@ -671,7 +717,7 @@ class Backup:
if process.returncode != 0: if process.returncode != 0:
return Err("Unable to extract compressed archive.") return Err("Unable to extract compressed archive.")
root_path = archive_file.parent / extracted_root root_path = archive_file.parent / WORKDIR_NAME
if not root_path.exists(): if not root_path.exists():
return Err(f"Extracted '{root_path}' not found.") return Err(f"Extracted '{root_path}' not found.")
@@ -715,17 +761,18 @@ class Backup:
"""Extract and verify a backup archive""" """Extract and verify a backup archive"""
start_time = time.time() start_time = time.time()
temp_tarball = archive_file.parent / Path("backup.py.tar.gz") temp_tarball = archive_file.parent / TARBALL_NAME
entry_count = 0
decrypt_res = self.decrypt_file(archive_file, temp_tarball, password, verbose) decrypt_res = self.decrypt_file(archive_file, temp_tarball, password, verbose)
match decrypt_res: match decrypt_res:
case Err(): case Err():
self.cleanup_files(temp_tarball) self.cleanup_files(temp_tarball)
return decrypt_res return decrypt_res
case Ok(): pass case Ok(value=count):
entry_count = count
extracted_dir: Path | None = None extracted_dir: Path | None = None
extract_res = self.extract_tarball(temp_tarball, verbose) extract_res = self.extract_tarball(temp_tarball, entry_count, verbose)
match extract_res: match extract_res:
case Err(): case Err():
self.cleanup_files(temp_tarball) self.cleanup_files(temp_tarball)
@@ -740,9 +787,7 @@ class Backup:
case Err(): case Err():
self.cleanup_files(temp_tarball, extracted_dir) self.cleanup_files(temp_tarball, extracted_dir)
return checksums_res return checksums_res
case Ok(): case Ok(): pass
if verbose:
print("")
self.cleanup_files(temp_tarball) self.cleanup_files(temp_tarball)

View File

@@ -7,6 +7,6 @@
web_server=/var/www/ web_server=/var/www/
ssh=/etc/ssh/ ssh=/etc/ssh/
# while individual files do not # ...while individual files do not
wireguard=/etc/wireguard/wg0.conf wireguard=/etc/wireguard/wg0.conf
firewall=/etc/nftables.conf firewall=/etc/nftables.conf