diff options
| author | Leonard Kugis <leonard@kug.is> | 2026-04-12 02:43:43 +0200 |
|---|---|---|
| committer | Leonard Kugis <leonard@kug.is> | 2026-04-12 02:43:43 +0200 |
| commit | 8e39b34b708a9ef855261e1c5ab0f7664d2c89e6 (patch) | |
| tree | d58f181bc9cc8c01e3710ffd87faff126de73f98 | |
| parent | 6a578732f9bf45d7325fe2154496d73995659ae0 (diff) | |
| download | scripts-8e39b34b708a9ef855261e1c5ab0f7664d2c89e6.tar.gz | |
Added archival script
This script automatically distributes a set of files and directories
to multiple containers with given maximum capacity.
It stores them as SquashFS filesystems, with optional LUKS encryption.
| -rwxr-xr-x | archival.py | 281 |
1 file changed, 281 insertions, 0 deletions
#!/usr/bin/env python3
"""Archival script.

Distributes a set of files and directories across multiple SquashFS
containers with a given maximum capacity. Containers are written as
SquashFS images and can optionally be converted in place to
LUKS2-encrypted images via cryptsetup.
"""

import argparse
import os
import shlex
import shutil
import subprocess
import sys
from pathlib import Path
from typing import Dict, List


def get_file_size(file_path: str) -> int:
    """Return the size of *file_path* in bytes.

    For a directory, the sizes of all regular files below it are summed
    recursively.

    Raises:
        FileNotFoundError: if the path does not exist.
    """
    path = Path(file_path)
    if not path.exists():
        raise FileNotFoundError(f"Path does not exist: {file_path}")

    if path.is_file():
        return path.stat().st_size
    if path.is_dir():
        return sum(p.stat().st_size for p in path.rglob("*") if p.is_file())
    # Special files (sockets, FIFOs, ...) contribute nothing.
    return 0


def load_paths_from_file(file_path: str) -> List[str]:
    """Read newline-separated paths from *file_path*.

    Blank lines and lines starting with '#' are ignored.
    """
    paths: List[str] = []
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if line and not line.startswith('#'):
                paths.append(line)
    return paths


def split_files(
    file_paths: List[str],
    max_container_size: int,
    output_prefix: str
) -> Dict[int, List[str]]:
    """Distribute *file_paths* into containers of at most *max_container_size* bytes.

    First-fit-decreasing strategy: paths are sorted by size (largest
    first) and appended to the current container until it would overflow,
    at which point the container is sealed and a new one is started.

    Paths that cannot be sized (e.g. missing) are skipped with a warning,
    as are paths larger than *max_container_size* by themselves.

    *output_prefix* is unused here; kept for interface compatibility.

    Returns:
        Mapping of container index -> list of paths.
    """
    files_with_sizes = []
    for path in file_paths:
        try:
            files_with_sizes.append((path, get_file_size(path)))
        except Exception as e:
            print(f"Warnung: {e}", file=sys.stderr)

    files_with_sizes.sort(key=lambda item: item[1], reverse=True)

    containers: Dict[int, List[str]] = {}
    current_container: List[str] = []
    current_size = 0
    container_index = 0

    for file_path, file_size in files_with_sizes:
        if file_size > max_container_size:
            print(f"WARNING: File too large for container ({file_size} > {max_container_size}): {file_path}", file=sys.stderr)
            continue

        if current_size + file_size > max_container_size:
            # Current container is full -- seal it and start a new one.
            containers[container_index] = current_container.copy()
            container_index += 1
            current_container = [file_path]
            current_size = file_size
        else:
            current_container.append(file_path)
            current_size += file_size

    if current_container:
        containers[container_index] = current_container

    return containers


def encrypt_container(
    output_file: str,
    cryptsetup_args: str
) -> bool:
    """Encrypt *output_file* in place as a LUKS2 image.

    The image is grown by 32 MiB to make room for the LUKS2 header,
    re-encrypted with ``cryptsetup reencrypt --encrypt``, trimmed by
    16 MiB, and finally renamed to ``<output_file>.luks``.

    NOTE(review): growing by 32 MiB but trimming only 16 MiB leaves a net
    16 MiB of headroom; this mirrors the original command sequence --
    confirm the intended final size.

    Args:
        output_file: Path to the plain SquashFS image.
        cryptsetup_args: Extra cryptsetup arguments (may be None/empty).

    Returns:
        True on success, False if any step failed.
    """
    print("Encrypting container {}.".format(output_file))

    # Grow by 32 MiB for the LUKS header (os.truncate instead of
    # shelling out to GNU truncate).
    try:
        os.truncate(output_file, os.path.getsize(output_file) + 32 * 1024 * 1024)
    except OSError:
        print("ERROR: truncate")
        return False

    cmd = [
        "cryptsetup",
        "-q",
        "reencrypt",
        "--encrypt",
        "--type",
        "luks2",
        "--reduce-device-size",
        "32M"
    ]
    if cryptsetup_args:
        # shlex.split honours quoting and never yields empty tokens,
        # unlike the naive str.split(" "); also guards against None.
        cmd.extend(shlex.split(cryptsetup_args))
    cmd.append(output_file)
    if subprocess.run(cmd, text=True).returncode != 0:
        print("ERROR: cryptsetup")
        return False

    # Trim part of the added headroom again.
    try:
        os.truncate(output_file, os.path.getsize(output_file) - 16 * 1024 * 1024)
    except OSError:
        print("ERROR: truncate")
        return False

    # Rename atomically instead of shelling out to mv.
    try:
        os.replace(output_file, "{}.luks".format(output_file))
    except OSError:
        print("ERROR: mv")
        return False
    return True


def write_container_files(
    containers: Dict[int, List[str]],
    source: str,
    mksquashfs_args: str,
    cryptsetup: bool,
    cryptsetup_args: str,
    output_prefix: str
) -> bool:
    """Write one descriptor file and one SquashFS image per container.

    All descriptor files (``<prefix>-<i>.txt``, one absolute path per
    line) are written first: every mksquashfs invocation excludes the
    other containers' contents via ``-ef <descriptor>``, so all
    descriptors must exist before the first image is built.

    Args:
        containers: Mapping of container index -> list of paths.
        source: Source directory handed to mksquashfs.
        mksquashfs_args: Extra mksquashfs arguments (may be None/empty).
        cryptsetup: Whether to LUKS-encrypt each finished image.
        cryptsetup_args: Extra cryptsetup arguments (may be None/empty).
        output_prefix: Prefix for descriptor and image file names.

    Returns:
        True if every container was written (and encrypted, if requested).
    """
    for container_index, file_paths in containers.items():
        output_desc = f"{output_prefix}-{container_index}.txt"
        print("Writing descriptor file {}.".format(output_desc))

        with open(output_desc, 'w', encoding='utf-8') as f:
            for file_path in file_paths:
                f.write(f"{os.path.abspath(file_path)}\n")

        print("Descriptor file written.")

    for container_index, file_paths in containers.items():
        output_file = f"{output_prefix}-{container_index}.squashfs"
        print("Writing plain container {}".format(output_file))

        cmd = [
            "mksquashfs",
            source,
            output_file
        ]
        if mksquashfs_args:
            # Guard against None and honour shell-style quoting.
            cmd.extend(shlex.split(mksquashfs_args))

        # Exclude everything that belongs to the other containers.
        for i in range(len(containers)):
            if i != container_index:
                cmd.append("-ef")
                cmd.append("{}-{}.txt".format(output_prefix, i))

        if subprocess.run(cmd, text=True).returncode != 0:
            print("ERROR: mksquashfs")
            return False

        print("Plain container written. Index: {}, No. files: {}".format(container_index, len(file_paths)))

        if cryptsetup:
            if not encrypt_container(output_file, cryptsetup_args):
                return False

    return True


def calculate_statistics(
    containers: Dict[int, List[str]],
    max_container_size: int
) -> None:
    """Print per-container and total size statistics to stdout."""
    total_files = 0
    total_size = 0

    print("\n--- Stats ---")
    for container_index, file_paths in containers.items():
        container_size = sum(get_file_size(fp) for fp in file_paths)

        # Guard against division by zero for a (nonsensical) size of 0.
        utilization = (container_size / max_container_size) * 100 if max_container_size else 0.0
        print(f"Container {container_index}:")
        print(f"  Files: {len(file_paths)}")
        print(f"  Size: {container_size} Bytes ({container_size / 1024**2:.2f} MB)")
        print(f"  Usage: {utilization:.1f}% ({container_size}/{max_container_size})")

        total_files += len(file_paths)
        total_size += container_size

    print(f"\nTotal: {total_files} Files, {total_size} Bytes ({total_size / 1024**2:.2f} MB)")


def main():
    """Parse arguments, bin-pack the inputs and write the containers."""
    parser = argparse.ArgumentParser(
        description="Distribute archival files across multiple SquashFS containers of fixed maximum size."
    )

    parser.add_argument(
        "--source",
        "-s",
        required=True,
        help="Source directory"
    )
    parser.add_argument(
        "--files",
        "-f",
        nargs="+",
        help="List of files and directories (space separated)"
    )
    parser.add_argument(
        "--from-file",
        "-i",
        help="Read list of files and directories from file (newline separated)"
    )
    parser.add_argument(
        "--container-size",
        "-c",
        type=int,
        required=True,
        help="Maximum container size in Bytes"
    )
    parser.add_argument(
        "--output-prefix",
        "-o",
        required=True,
        help="Output prefix"
    )
    parser.add_argument(
        "--mksquashfs-args",
        "-m",
        help="Arguments for mksquashfs"
    )
    parser.add_argument(
        "--cryptsetup",
        "-e",
        action="store_true",
        help="Encrypt containers using cryptsetup"
    )
    parser.add_argument(
        "--cryptsetup-args",
        "-a",
        help="Arguments for cryptsetup"
    )
    parser.add_argument(
        "--verbose",
        "-v",
        action="store_true",
        help="Increased verbosity"
    )

    args = parser.parse_args()

    if args.container_size <= 0:
        print("Container size must be positive.", file=sys.stderr)
        sys.exit(1)

    file_paths = []

    if args.files:
        file_paths.extend(args.files)

    if args.from_file:
        try:
            file_paths.extend(load_paths_from_file(args.from_file))
        except Exception as e:
            print(f"Error with reading file: {e}", file=sys.stderr)
            sys.exit(1)

    if not file_paths:
        print("No paths provided.", file=sys.stderr)
        parser.print_help()
        sys.exit(1)

    print(f"Processing {len(file_paths)} files / directories with container size {args.container_size} Bytes...")

    containers = split_files(file_paths, args.container_size, args.output_prefix)

    if not shutil.which("mksquashfs"):
        print("mksquashfs not available. Please install it first.", file=sys.stderr)
        sys.exit(1)

    if args.cryptsetup and not shutil.which("cryptsetup"):
        print("cryptsetup not available. Please install it first.", file=sys.stderr)
        sys.exit(1)

    # Propagate failure to the exit code (the original ignored the result).
    if not write_container_files(containers, args.source, args.mksquashfs_args, args.cryptsetup, args.cryptsetup_args, args.output_prefix):
        sys.exit(1)

    if args.verbose:
        calculate_statistics(containers, args.container_size)

    print(f"\nDone. Created {len(containers)} containers.")


if __name__ == "__main__":
    main()
