Files
marco 59376db19b
backup.py / unit-tests (push) Successful in 2m11s
Optimized extraction operation and improved CLI UI.
2026-04-17 10:28:08 +02:00

934 lines
31 KiB
Python
Executable File

#!/usr/bin/env python3
# backup.py: modular and lightweight backup utility
# Developed by Marco Cetica (c) 2018-2026
#
import argparse
import shutil
import sys
import os
import time
import subprocess
import hashlib
import signal
import struct
from pathlib import Path
from datetime import datetime
from dataclasses import dataclass
from enum import Enum
from typing import Any, Generic, TypeVar, Union, Optional, List
HEADER_SIZE = 16
HEADER_FORMAT = ">QQ"
HEADER_MAGIC_NUMBER = 0x424B5F50595F4844 # BK_PY_HD
WORKDIR_NAME = "backup.py.tmp"
TARBALL_NAME = "backup.py.tar.gz"
T = TypeVar("T")
@dataclass(frozen=True)
class Ok(Generic[T]):
"""Sum type to represent results"""
value: T
@dataclass(frozen=True)
class Err:
error: str
Result = Union[Ok[T], Err]
@dataclass
class BackupSource:
"""Struct to represent a mapping between a label and a path"""
label: str
path: Path
@dataclass
class BackupState:
"""Struct to represent a backup state"""
sources: List[BackupSource]
output_path: Path
password: str
checksum: bool
verbose: bool
class SignalHandler:
"""Gracefully handle SIGINT (C-c)"""
def __init__(self) -> None:
self.interrupted = False
self.output_path: Optional[Path] = None
self.checksum_file: Optional[Path] = None
def setup(self, output_path: Path, checksum_file: Optional[Path] = None) -> None:
"""Configure signal handler with cleanup paths"""
self.output_path = output_path
self.checksum_file = checksum_file
signal.signal(signal.SIGINT, self.handle_interrupt)
def handle_interrupt(self, _sig_num: int, _frame: Any) -> None:
"""Handle SIGINT signal"""
# Second C-c: just exit without cleanup
if self.interrupted:
print("\nForced exit. temporary files NOT cleaned.", file=sys.stderr)
sys.exit(130) # that is, 128 + SIGINT(2)
# First C-c: cleanup and set flag
self.interrupted = True
print(
"\nBackup interrupted.\nCleaning up temporary files (press C-c again to force exit)...",
file=sys.stderr,
end='',
flush=True
)
if self.output_path:
temp_files = [
self.output_path / WORKDIR_NAME,
self.output_path / TARBALL_NAME
]
if self.checksum_file:
temp_files.append(self.checksum_file)
Backup.cleanup_files(*temp_files)
print("DONE", file=sys.stderr)
sys.exit(130)
class EscapeChar(Enum):
"""Enumeration for escape characters"""
RESET = '\033[0m'
GRAY = '\033[90m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
BLUE = '\033[94m'
CYAN = '\033[96m'
LINE_UP = '\033[A'
ERASE_LINE = '\033[2K'
class BackupProgress:
"""Progress indicator for backup operations"""
def __init__(self, total: int, operation: str, status_msg: str) -> None:
self.total = total
self.current = 0
self.operation = operation
self.status_msg = status_msg
self.start_time = 0
def start_time_tracking(self, existing_time = None) -> None:
"""Initialize time tracking"""
self.start_time = time.time() if not existing_time else existing_time
def log_operation(self) -> None:
"""Print the Backup operation to stdout"""
self.start_time_tracking()
print(self.operation)
def draw_progress_bar(self, filename: str = "") -> None:
"""draw progress bar"""
self.current += 1
actual = min(self.current, self.total)
percentage = (actual / self.total) * 100 if self.total > 0 else 0
# Create a CLI progress bar
bar_width = 30
filled = int(bar_width * actual / self.total)
bar = f"{EscapeChar.GRAY.value}{'' * filled}{'' * (bar_width - filled)}{EscapeChar.RESET.value}"
# Truncate filename if it's too long to display
# by keeping the first 30 characters + extension (if available)
# This prevents UI disruption
filename_max_len = 35
ext_max_len = 10
if len(filename) > filename_max_len:
ext_idx = filename.rfind('.')
if ext_idx > 0 and len(filename) - ext_idx <= ext_max_len:
ext = filename[ext_idx:]
filename = filename[:filename_max_len - len(ext) - 5] + "..." + ext
else:
filename = filename[:filename_max_len - 5]
progress_bar = (f"\r {self.status_msg} [{bar}] "
f"{EscapeChar.YELLOW.value}{percentage:.1f}%{EscapeChar.RESET.value} "
f"({actual}/{self.total}): "
f"{EscapeChar.BLUE.value}'{filename}'{EscapeChar.RESET.value}")
print(f"{EscapeChar.ERASE_LINE.value}{progress_bar}", end='', flush=True)
def complete_task(self) -> None:
"""Complete a task"""
# To complete a task, we do the following:
# 1. Move the cursor one line upwards
# 2. Move the cursor at end of operation message (i.e., rewrite the message)
# 3. Add duration there
# 4. Move the cursor downwards one line
duration = Backup.prettify_timestamp(time.time() - self.start_time)
print(f"{EscapeChar.LINE_UP.value}\r{self.operation}{EscapeChar.GREEN.value}DONE{EscapeChar.RESET.value} "
f"({EscapeChar.CYAN.value}{duration}{EscapeChar.RESET.value})\n")
class Backup:
@staticmethod
def build_header(entry_count: int) -> bytes:
"""Build an header containing backup metadata"""
# Header format:
# big endian (>), 1 uint64 (Q, 8 bytes) + 1 uint64 (Q, 8 bytes) = 16 bytes
return struct.pack(
HEADER_FORMAT,
HEADER_MAGIC_NUMBER,
entry_count
)
@staticmethod
def parse_header(data: bytes) -> Result[int]:
"""Parse metadata from a backup file"""
if len(data) < HEADER_SIZE:
return Err("File too small to contain a valid header.")
try:
magic, entry_count = struct.unpack(HEADER_FORMAT, data[:HEADER_SIZE])
except struct.error as err:
return Err(f"Malformed header: {err}.")
if magic != HEADER_MAGIC_NUMBER:
return Err("Invalid magic number.")
return Ok(entry_count)
@staticmethod
def check_deps() -> Result[None]:
"""Check whether dependencies are installed"""
missing_deps = []
for dep in ["gpg", "tar"]:
if not shutil.which(dep):
missing_deps.append(dep)
if missing_deps:
return Err(f"Missing dependencies: {', '.join(missing_deps)}.")
return Ok(None)
@staticmethod
def prettify_size(byte_size: int) -> str:
"""Convert byte_size in powers of 1024"""
units = ["B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB"]
idx = 0
size = float(byte_size)
while size >= 1024.0 and idx < (len(units) - 1):
size /= 1024.0
idx += 1
if size.is_integer():
return f"{int(size)} {units[idx]}"
return f"{size:.2f} {units[idx]}"
@staticmethod
def prettify_timestamp(timestamp: float) -> str:
"""Convert a timestamp in seconds to human-readable format"""
timestamp_int = int(timestamp)
hours = timestamp_int // 3600
minutes = (timestamp_int % 3600) // 60
seconds = timestamp_int % 60
parts = []
if hours > 0:
parts.append(f"{hours} hour{'s' if hours != 1 else ''}")
if minutes > 0:
parts.append(f"{minutes} minute{'s' if minutes != 1 else ''}")
if seconds > 0 or not parts: # show seconds if other parts are zero
parts.append(f"{seconds} second{'s' if seconds != 1 else ''}")
return ", ".join(parts)
@staticmethod
def parse_sources_file(sources_file: Path) -> Result[List[BackupSource]]:
"""Parse the sources file returning a list of BackupSource elements"""
if not sources_file.exists():
return Err("Sources file does not exist.")
sources: List[BackupSource] = []
try:
with open(sources_file, 'r') as f:
for pos, line in enumerate(f, 1):
line = line.strip()
if not line or line.startswith('#'):
continue
if '=' not in line:
return Err(f"invalid format at line {pos}: '{line}'.")
label, path_str = line.split('=', 1)
path = Path(path_str.strip())
if not path.exists():
return Err(f"Path does not exist: '{path}'.")
sources.append(BackupSource(label.strip(), path))
except IOError as err:
return Err(f"Failed to read sources file: '{err}'.")
if not sources:
return Err(f"No valid sources found in file.")
return Ok(sources)
@staticmethod
def should_ignore_file(path: Path) -> bool:
"""Check whether a file should be ignored"""
try:
# Skip UNIX sockets
if path.is_socket():
return True
# Skip broken symlinks
if path.is_symlink() and not path.exists():
return True
# Skip named pipes (FIFOs)
if (path.stat().st_mode & 0o170000) == 0o010000:
return True
return False
except (OSError, IOError):
# Skip files that can't be checked
return True
@staticmethod
def ignore_special_files(directory: str, contents: List[str]) -> List[str]:
"""Return a list of files to ignore"""
ignored_files: List[str] = []
dir_path = Path(directory)
for item in contents:
item_path = dir_path / item
if Backup.should_ignore_file(item_path):
ignored_files.append(item)
return ignored_files
@staticmethod
def copy_files(source: Path, destination: Path) -> Result[None]:
"""Copy files and directories preserving their metadata"""
try:
# Handle single file
if source.is_file():
# Parent directory might not exists, so we try to create it first
destination.parent.mkdir(parents=True, exist_ok=True)
# Copy file and its metadata
shutil.copy2(source, destination)
return Ok(None)
# Handle directory
if source.is_dir():
# If destination directory exists, we remove it
# This approach mimics rsync's --delete option
if destination.exists():
shutil.rmtree(destination)
# Copy directory and its metadata.
# We also ignore special files and we preserves links instead
# of following them.
shutil.copytree(
source,
destination,
symlinks=True, # True = preserve symlinks
copy_function=shutil.copy2,
ignore=Backup.ignore_special_files,
dirs_exist_ok=False
)
return Ok(None)
return Err(f"The following source element is neither a file nor a directory: '{source}'.")
except (IOError, OSError, shutil.Error) as err:
return Err(f"Copy failed: {err}.")
@staticmethod
def cleanup_files(*paths: Path) -> None:
"""Clean up temporary files and directories"""
for path in paths:
if path is None or not path.exists():
continue
if path.is_dir():
shutil.rmtree(path, ignore_errors=True)
else:
path.unlink(missing_ok=True)
@staticmethod
def collect_files(directory: Path) -> List[Path]:
"""Collect all files in a directory (recursively)"""
files = []
for item in directory.rglob('*'):
if item.is_file() and not item.is_symlink():
files.append(item)
return files
@staticmethod
def compute_file_hash(file_path: Path) -> Result[str]:
"""Compute SHA256 hash of a given file"""
try:
hash_obj = hashlib.sha256()
with open(file_path, "rb") as f:
for byte_block in iter(lambda: f.read(4096), b""):
hash_obj.update(byte_block)
return Ok(hash_obj.hexdigest())
except IOError as e:
return Err(f"Failed to read file '{file_path}': {e}.")
@staticmethod
def count_tar_entries(source_dir: Path) -> int:
"""Count all entries (files, dirs) processed by tar including the root directory"""
return sum(1 for _ in source_dir.rglob('*')) + 1
@staticmethod
def create_tarball(source_dir: Path, output_file: Path, verbose: bool) -> Result[int]:
"""Create a compressed tar archive of the backup directory"""
total_entries = Backup.count_tar_entries(source_dir)
progress: BackupProgress | None = None
if verbose:
progress = BackupProgress(total_entries, "Compressing backup...", "compressing")
progress.log_operation()
cmd = [
"tar",
"-czf",
str(output_file),
"-C",
str(source_dir.parent),
source_dir.name
]
if verbose:
cmd.insert(1, "-v")
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1
)
# Read subprocess output from pipe in buffered mode
if verbose and progress is not None:
if process.stdout is None:
return Err("Failed to capture output.")
for line in process.stdout:
line = line.strip()
if line:
# Extract filename from path
filename = Path(line).name
progress.draw_progress_bar(filename)
progress.complete_task()
# Wait for subprocess to complete
process.wait()
if process.returncode != 0:
return Err("Cannot create compressed archive.")
return Ok(total_entries)
@staticmethod
def encrypt_file(input_file: Path, output_file: Path, password: str,
entry_count: int, verbose: bool) -> Result[None]:
"""Encrypt a file with GPG and prepend an header"""
start_time = time.time()
if output_file.exists():
return Err("Encryption failed: archive already exists.")
if verbose:
print("Encrypting backup...", end='', flush=True)
# Write the encrypted file to a temporary file first. Then prepend
# the header afterward
tmp_enc = output_file.with_suffix(".enc.tmp")
cmd = [
"gpg", "-a",
"--symmetric",
"--cipher-algo=AES256",
"--no-symkey-cache",
"--pinentry-mode=loopback",
"--batch",
"--passphrase-fd", "0",
"--output", str(tmp_enc),
str(input_file)
]
result = subprocess.run(
cmd,
input=password.encode(),
capture_output=not verbose
)
if result.returncode != 0:
tmp_enc.unlink(missing_ok=True)
return Err(f"Encryption failed: {result.stderr.decode()}.")
try:
header = Backup.build_header(entry_count)
with open(output_file, "wb") as out, open(tmp_enc, "rb") as enc:
out.write(header) # Write header on the first 16 bytes of the file
shutil.copyfileobj(enc, out)
except IOError as err:
return Err(f"Failed to write encrypted backup file: {err}.")
finally:
tmp_enc.unlink(missing_ok=True)
if verbose:
duration = Backup.prettify_timestamp(time.time() - start_time)
print(f"{EscapeChar.GREEN.value}DONE{EscapeChar.RESET.value}"
f" ({EscapeChar.CYAN.value}{duration}{EscapeChar.RESET.value})")
return Ok(None)
def make_backup(self, config: BackupState) -> Result[None]:
"""Create an encrypted backup from specified sources file"""
start_time = time.time()
date_str = datetime.now().strftime("%Y%m%d")
hostname = os.uname().nodename
# Create working directory
work_dir = config.output_path / "backup.py.tmp"
if not work_dir.exists():
work_dir.mkdir(parents=True, exist_ok=True)
# Format output files
backup_archive = config.output_path / f"backup-{hostname}-{date_str}.tar.gz.enc"
checksum_file = config.output_path / f"backup-{hostname}-{date_str}.sha256"
temp_tarball = config.output_path / TARBALL_NAME
# Backup each source
sources_count = len(config.sources)
for idx, source in enumerate(config.sources, 1):
if config.verbose:
start_time = time.time()
print(f"Copying {source.label} ({idx}/{sources_count})...", end='', flush=True)
# Create source subdirectory
source_dir = work_dir / f"backup-{source.label}-{date_str}"
if not source_dir.exists():
source_dir.mkdir(parents=True, exist_ok=True)
# Copy files
copy_res = self.copy_files(source.path, source_dir)
match copy_res:
case Err():
self.cleanup_files(work_dir, temp_tarball)
return copy_res
case Ok():
if config.verbose:
duration = Backup.prettify_timestamp(time.time() - start_time)
print(f"{EscapeChar.GREEN.value}DONE{EscapeChar.RESET.value}"
f" ({EscapeChar.CYAN.value}{duration}{EscapeChar.RESET.value})")
# Compute checksum when requested
if config.checksum:
files = self.collect_files(source_dir)
backup_progress: BackupProgress | None = None
if config.verbose:
backup_progress = BackupProgress(len(files), "Computing checksums...", "computing")
backup_progress.log_operation()
with open(checksum_file, 'a') as checksum_fd:
for file in files:
hash_result = self.compute_file_hash(file)
match hash_result:
case Err():
checksum_fd.close()
self.cleanup_files(work_dir, temp_tarball)
return hash_result
case Ok(value=v):
checksum_fd.write(f"{v}\n")
if config.verbose and backup_progress is not None:
backup_progress.draw_progress_bar(str(file.name))
if config.verbose and backup_progress is not None:
backup_progress.complete_task()
# Create compressed archive
entry_count: int = 0
archive_res = self.create_tarball(work_dir, temp_tarball, config.verbose)
match archive_res:
case Err():
self.cleanup_files(work_dir, temp_tarball)
return archive_res
case Ok(value=entry_count): pass
# Encrypt the archive
encrypt_res = self.encrypt_file(temp_tarball, backup_archive, config.password, entry_count, config.verbose)
match encrypt_res:
case Err():
self.cleanup_files(work_dir, temp_tarball)
return encrypt_res
case Ok(): pass
# Cleanup temporary files
self.cleanup_files(work_dir, temp_tarball)
# Compute file size
if not backup_archive.exists():
return Err("Unable to create backup archive.")
elapsed_time = time.time() - start_time
file_size = backup_archive.stat().st_size
file_size_hr = self.prettify_size(file_size)
# Print a table containing some information about the backup
if config.verbose:
rows = [
("File name", f"'{backup_archive}'"),
("File size", f"{file_size} bytes ({file_size_hr})"),
("Elapsed time", f"{self.prettify_timestamp(elapsed_time)}")
]
if config.checksum:
rows.insert(1, ("Checksum file", f"'{checksum_file}'"))
# Compute column widths
max_label_width = max(len(label) for label, _ in rows)
max_value_width = max(len(value) for _, value in rows)
separator = f"+{'-' * (max_label_width + 2)}+{'-' * (max_value_width + 2)}+"
print(separator)
for label, value in rows:
print(f"| {label:<{max_label_width}} | {value:<{max_value_width}} |")
print(separator)
return Ok(None)
@staticmethod
def decrypt_file(input_file: Path, output_file: Path, password: str, verbose: bool) -> Result[int]:
"""Strip header, decrypt the backup file and return entry count"""
start_time = 0
if verbose:
start_time = time.time()
print("Decrypting backup...", end='', flush=True)
try:
with open(input_file, "rb") as file:
header_data = file.read(HEADER_SIZE)
except IOError as err:
return Err(f"Failed to read encrypted backup file: {err}.")
header_res = Backup.parse_header(header_data)
match header_res:
case Err():
return header_res
case Ok(value=entry_res): pass
tmp_payload = input_file.with_suffix(".payload.tmp")
try:
with open(input_file, "rb") as src, open(tmp_payload, "wb") as dest:
src.seek(HEADER_SIZE)
shutil.copyfileobj(src, dest)
except IOError as err:
tmp_payload.unlink(missing_ok=True)
return Err(f"Failed to strip header from backup file: {err}.")
cmd = [
"gpg", "-a",
"--quiet",
"--decrypt",
"--no-symkey-cache",
"--pinentry-mode=loopback",
"--batch",
"--passphrase-fd", "0",
"--output", str(output_file),
str(tmp_payload)
]
result = subprocess.run(
cmd,
input=password.encode(),
capture_output=True
)
tmp_payload.unlink(missing_ok=True)
if result.returncode != 0:
return Err(f"Decryption failed: {result.stderr.decode()}.")
if verbose:
duration = Backup.prettify_timestamp(time.time() - start_time)
print(f"{EscapeChar.GREEN.value}DONE{EscapeChar.RESET.value}"
f" ({EscapeChar.CYAN.value}{duration}{EscapeChar.RESET.value})")
return Ok(entry_res)
@staticmethod
def extract_tarball(archive_file: Path, entry_count: int, verbose: bool) -> Result[Path]:
"""Extract a tar archive and return the extracted path"""
start_time = 0
if verbose:
start_time = time.time()
print("Extracting backup...")
cmd = [
"tar",
"-xzf",
str(archive_file),
"-C",
str(archive_file.parent)
]
progress: BackupProgress | None = None
if verbose:
cmd.insert(1, "-v")
progress = BackupProgress(entry_count, "Extracting backup...", "extracting")
progress.start_time_tracking(start_time)
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1
)
if verbose and progress is not None:
if process.stdout is None:
return Err("Failed to capture output.")
for line in process.stdout:
line = line.strip()
if line:
filename = Path(line).name
progress.draw_progress_bar(filename)
progress.complete_task()
# Wait for process to complete
process.wait()
if process.returncode != 0:
return Err("Unable to extract compressed archive.")
root_path = archive_file.parent / WORKDIR_NAME
if not root_path.exists():
return Err(f"Extracted '{root_path}' not found.")
return Ok(root_path)
@staticmethod
def verify_backup(extracted_dir: Path, checksum_file: Path, verbose: bool) -> Result[None]:
"""Verify the integrity of a backup archive"""
try:
with open(checksum_file, 'r') as cf:
expected_hashes = set(line.strip() for line in cf if line.strip())
except IOError as err:
return Err(f"Failed to load checksums file: {err}.")
files = Backup.collect_files(extracted_dir)
progress = None
if verbose:
progress = BackupProgress(len(files), "Verifying backup...", "verifying")
progress.log_operation()
for file in files:
hash_res = Backup.compute_file_hash(file)
match hash_res:
case Err():
return hash_res
case Ok(value=file_hash):
if file_hash not in expected_hashes:
return Err(f"{'\n' if verbose else ''}!! Integrity error for '{file}' !!")
if verbose and progress is not None:
progress.draw_progress_bar(file.name)
if verbose and progress is not None:
progress.complete_task()
return Ok(None)
def extract_backup(self, archive_file: Path, password: str, checksum_file: Optional[Path], verbose: bool) -> Result[None]:
"""Extract and verify a backup archive"""
start_time = time.time()
temp_tarball = archive_file.parent / TARBALL_NAME
entry_count = 0
decrypt_res = self.decrypt_file(archive_file, temp_tarball, password, verbose)
match decrypt_res:
case Err():
self.cleanup_files(temp_tarball)
return decrypt_res
case Ok(value=count):
entry_count = count
extracted_dir: Path | None = None
extract_res = self.extract_tarball(temp_tarball, entry_count, verbose)
match extract_res:
case Err():
self.cleanup_files(temp_tarball)
return extract_res
case Ok(value=root_dir):
extracted_dir = root_dir
# Verify checksums when required
if checksum_file:
checksums_res = self.verify_backup(extracted_dir, checksum_file, verbose)
match checksums_res:
case Err():
self.cleanup_files(temp_tarball, extracted_dir)
return checksums_res
case Ok(): pass
self.cleanup_files(temp_tarball)
elapsed_time = time.time() - start_time
if verbose:
print(f"Backup extracted to: '{extracted_dir.parent.resolve() / extracted_dir}'")
print(f"Elapsed time: {self.prettify_timestamp(elapsed_time)}")
return Ok(None)
def main():
signal_handler = SignalHandler()
parser = argparse.ArgumentParser(
description="backup.py - modular and lightweight backup utility"
)
parser.add_argument(
"-b", "--backup",
nargs=3,
metavar=("SOURCES", "DEST", "PASS"),
help="Backup files from SOURCES path to DEST directory with password PASS"
)
parser.add_argument(
"-e", "--extract",
nargs="+",
metavar="ARCHIVE",
help="Extract ARCHIVE (optionally with PASS and SHA256 file)"
)
parser.add_argument(
"-c", "--checksum",
action="store_true",
help="Generate or check SHA256 checksums"
)
parser.add_argument(
"-V", "--verbose",
action="store_true",
help="Enable verbose mode"
)
args = parser.parse_args()
if not (args.backup or args.extract):
parser.error("specify either --backup or --extract.")
# Check whether dependencies are installed
deps_res = Backup.check_deps()
match deps_res:
case Err(error=e):
print(f"{e}", file=sys.stderr)
sys.exit(1)
case Ok(): pass
backup = Backup()
if args.backup:
# Check root permissions
if os.geteuid() != 0:
print("The '--backup' option requires root permissions.", file=sys.stderr)
sys.exit(1)
sources_file, output_path, encryption_pass = args.backup
sources_path = Path(sources_file)
output_dir = Path(output_path)
# Determine checksum file if requested
date_str = datetime.now().strftime("%Y%m%d")
hostname = os.uname().nodename
checksum_file = output_dir / f"backup-{hostname}-{date_str}.sha256" if args.checksum else None
signal_handler.setup(output_dir, checksum_file)
# Check whether output directory exists
if not output_dir.exists():
print(f"Output directory '{output_dir}' does not exist.", file=sys.stderr)
sys.exit(1)
# Parse sources file
sources_res = Backup.parse_sources_file(sources_path)
config: BackupState
match sources_res:
case Err(error=e):
print(f"{e}", file=sys.stderr)
sys.exit(1)
case Ok(value=v):
# Create a backup state
config = BackupState(
sources=v,
output_path=output_dir,
password=encryption_pass,
checksum=args.checksum,
verbose=args.verbose
)
backup_res = backup.make_backup(config)
match backup_res:
case Err(error=e):
print(f"{e}", file=sys.stderr)
sys.exit(1)
elif args.extract:
archive_file = Path(args.extract[0])
signal_handler.setup(archive_file.parent)
if not archive_file.exists():
print(f"Archive file '{archive_file}' does not exist.", file=sys.stderr)
sys.exit(1)
decryption_pass: str = ""
checksum_file: Path | None = None
if len(args.extract) >= 2:
decryption_pass = args.extract[1]
else:
print("--extract flag requires decryption password as second argument.", file=sys.stderr)
sys.exit(1)
if args.checksum:
if len(args.extract) >= 3:
checksum_file = Path(args.extract[2])
else:
print("--checksum flag requires SHA256 file as third argument.", file=sys.stderr)
sys.exit(1)
if not checksum_file.exists():
print(f"Checksums file '{checksum_file}' does not exist.", file=sys.stderr)
sys.exit(1)
extract_res = backup.extract_backup(archive_file, decryption_pass, checksum_file, args.verbose)
match extract_res:
case Err(error=e):
print(f"{e}", file=sys.stderr)
sys.exit(1)
else:
parser.print_help()
sys.exit(1)
if __name__ == "__main__":
main()