#!/usr/bin/env python3
"""Distribute files across multiple SquashFS containers of a fixed maximum size.

The script collects the files named on the command line (or in a list file),
groups them into containers no larger than ``--container-size`` bytes, and
runs ``mksquashfs`` once per container over ``--source``. For every container
a descriptor file ``<prefix>-<index>.txt`` is written; container *i* is built
by passing the descriptor files of all *other* containers to ``mksquashfs``
as ``-ef`` exclude files. Optionally each container is then encrypted in
place with ``cryptsetup``.
"""
import argparse
import os
import shlex
import shutil
import subprocess
import sys
from pathlib import Path
from typing import Dict, List, Optional, Tuple


def _split_extra_args(extra_args: Optional[str]) -> List[str]:
    """Split a user-supplied argument string into an argv list.

    ``None`` and empty/blank strings yield an empty list, so optional
    command-line options that were not given never inject a bogus empty
    argument into the child command. Quoting is honoured via ``shlex``.
    """
    return shlex.split(extra_args) if extra_args else []


def collect_files_from_paths(paths: List[str]) -> List[Tuple[str, int]]:
    """Expand files and directories into a list of ``(absolute_path, size)``.

    Directories are walked recursively. Paths that do not exist are reported
    on stderr and skipped. Each file is returned at most once even when the
    given paths overlap; a file listed twice could otherwise end up in two
    containers and be excluded from both by the other's exclude list.

    Args:
        paths: File or directory paths, relative or absolute.

    Returns:
        ``(absolute path, size in bytes)`` tuples in discovery order.
    """
    files_with_sizes: List[Tuple[str, int]] = []
    seen = set()

    def add(candidate: Path) -> None:
        abs_path = str(candidate.absolute())
        # Normalise ("a/../a/f" == "a/f") only for the duplicate check; the
        # returned path keeps the same form as before.
        key = os.path.abspath(abs_path)
        if key in seen:
            return
        seen.add(key)
        files_with_sizes.append((abs_path, candidate.stat().st_size))

    for path_str in paths:
        path = Path(path_str)
        if not path.exists():
            print(f"WARNING: Path does not exist: {path_str}", file=sys.stderr)
            continue
        if path.is_file():
            add(path)
        elif path.is_dir():
            for item in path.rglob("*"):
                if item.is_file():
                    add(item)
    return files_with_sizes


def load_paths_from_file(file_path: str) -> List[str]:
    """Read a newline-separated list of paths from a UTF-8 text file.

    Blank lines and lines starting with ``#`` are ignored; surrounding
    whitespace is stripped from every entry.
    """
    paths = []
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if line and not line.startswith('#'):
                paths.append(line)
    return paths


def split_files(
    files_with_sizes: List[Tuple[str, int]],
    max_container_size: int,
    output_prefix: str
) -> Dict[int, List[str]]:
    """Group files into containers whose summed size stays within the limit.

    Files are processed largest first. A file is appended to the current
    container while it still fits; otherwise the current container is closed
    and a new one is started. Files larger than ``max_container_size`` are
    reported on stderr and left out.

    The input list is not modified.

    Args:
        files_with_sizes: ``(path, size in bytes)`` tuples.
        max_container_size: Maximum summed file size per container, in bytes.
        output_prefix: Unused; kept for interface compatibility.

    Returns:
        Mapping of container index (starting at 0) to the file paths assigned
        to that container.
    """
    # sorted() instead of list.sort() so the caller's list keeps its order.
    ordered = sorted(files_with_sizes, key=lambda entry: entry[1], reverse=True)

    containers: Dict[int, List[str]] = {}
    current_container: List[str] = []
    current_size = 0
    container_index = 0

    for file_path, file_size in ordered:
        if file_size > max_container_size:
            print(f"WARNING: file too large for container ({file_size} > {max_container_size}): {file_path}", file=sys.stderr)
            continue

        # Close the current container only if it actually holds something;
        # an empty container always accepts a file that passed the check above.
        if current_container and current_size + file_size > max_container_size:
            containers[container_index] = current_container
            container_index += 1
            current_container = []
            current_size = 0

        current_container.append(file_path)
        current_size += file_size

    if current_container:
        containers[container_index] = current_container

    return containers


def encrypt_container(
    output_file: str,
    cryptsetup_args: Optional[str]
) -> bool:
    """Encrypt a container image in place as LUKS2 and rename it to ``*.luks``.

    Steps, in order: grow the file by 32M with ``truncate``, run
    ``cryptsetup reencrypt --encrypt --type luks2 --reduce-device-size 32M``
    on it, shrink the file by 16M again, then rename it to
    ``<output_file>.luks``.
    NOTE(review): the 32M/16M figures presumably reflect the default LUKS2
    header size -- confirm against the cryptsetup-reencrypt documentation.

    Args:
        output_file: Path of the plain container image.
        cryptsetup_args: Extra arguments for ``cryptsetup`` as a single
            string, or ``None``/empty for none.

    Returns:
        ``True`` on success, ``False`` as soon as any step fails (the error
        is reported on stderr).
    """
    print("Encrypting container {}.".format(output_file))

    result = subprocess.run(["truncate", "-s", "+32M", output_file], text=True)
    if result.returncode != 0:
        print("ERROR: truncate", file=sys.stderr)
        return False

    cmd = [
        "cryptsetup", "-q", "reencrypt", "--encrypt",
        "--type", "luks2", "--reduce-device-size", "32M"
    ]
    cmd.extend(_split_extra_args(cryptsetup_args))
    cmd.append(output_file)
    result = subprocess.run(cmd, text=True)
    if result.returncode != 0:
        print("ERROR: cryptsetup", file=sys.stderr)
        return False

    result = subprocess.run(["truncate", "-s", "-16M", output_file], text=True)
    if result.returncode != 0:
        print("ERROR: truncate", file=sys.stderr)
        return False

    try:
        shutil.move(output_file, "{}.luks".format(output_file))
    except OSError as exc:
        print("ERROR: mv ({})".format(exc), file=sys.stderr)
        return False

    return True


def write_container_files(
    containers: Dict[int, List[str]],
    source: str,
    mksquashfs_args: Optional[str],
    cryptsetup: bool,
    cryptsetup_args: Optional[str],
    output_prefix: str
) -> bool:
    """Write descriptor files and build (optionally encrypt) every container.

    First a descriptor file ``<output_prefix>-<index>.txt`` listing the
    absolute paths of the container's files is written for each container.
    Then ``mksquashfs`` is run once per container on ``source``, with the
    descriptor files of all other containers passed as ``-ef`` exclude files.

    Args:
        containers: Mapping of container index to file paths.
        source: Source directory handed to ``mksquashfs``.
        mksquashfs_args: Extra ``mksquashfs`` arguments as a single string,
            or ``None``/empty for none.
        cryptsetup: Whether to encrypt each container after building it.
        cryptsetup_args: Extra ``cryptsetup`` arguments, or ``None``.
        output_prefix: Prefix for descriptor and container file names.

    Returns:
        ``True`` if every container was built (and encrypted, if requested),
        ``False`` on the first failure.
    """
    for container_index, file_paths in containers.items():
        output_desc = f"{output_prefix}-{container_index}.txt"
        print("Writing descriptor file {}.".format(output_desc))
        with open(output_desc, 'w', encoding='utf-8') as f:
            f.writelines(f"{os.path.abspath(file_path)}\n" for file_path in file_paths)
        print("Descriptor file written.")

    extra_args = _split_extra_args(mksquashfs_args)

    for container_index, file_paths in containers.items():
        output_file = f"{output_prefix}-{container_index}.squashfs"
        print("Writing plain container {}".format(output_file))

        cmd = ["mksquashfs", source, output_file]
        cmd.extend(extra_args)
        # Exclude everything that belongs to another container. Iterate the
        # actual keys so the names match the descriptor files written above.
        for other_index in containers:
            if other_index != container_index:
                cmd.append("-ef")
                cmd.append("{}-{}.txt".format(output_prefix, other_index))

        result = subprocess.run(cmd, text=True)
        if result.returncode != 0:
            print("ERROR: mksquashfs", file=sys.stderr)
            return False
        print("Plain container written. Index: {}, No. files: {}".format(container_index, len(file_paths)))

        if cryptsetup:
            if not encrypt_container(output_file, cryptsetup_args):
                return False

    return True


def calculate_statistics(
    containers: Dict[int, List[str]],
    max_container_size: int
) -> None:
    """Print per-container and overall size/usage statistics to stdout.

    File sizes are re-read from disk with ``os.path.getsize``.
    """
    total_files = 0
    total_size = 0

    print("\n--- Stats ---")
    for container_index, file_paths in containers.items():
        container_size = sum(os.path.getsize(f) for f in file_paths)
        utilization = (container_size / max_container_size) * 100 if max_container_size > 0 else 0

        print(f"Container {container_index}:")
        print(f" Files: {len(file_paths)}")
        print(f" Size: {container_size} Bytes ({container_size / 1024**2:.2f} MB)")
        print(f" Usage: {utilization:.1f}% ({container_size}/{max_container_size})")

        total_files += len(file_paths)
        total_size += container_size

    print(f"\nTotal: {total_files} files, {total_size} bytes ({total_size / 1024**2:.2f} MB)")
    if containers:
        avg_utilization = total_size / (len(containers) * max_container_size) * 100 if max_container_size > 0 else 0
        print(f"Avg. container usage: {avg_utilization:.1f}%")


def main():
    """Parse the command line, plan the containers and build them.

    Exits with status 1 on invalid input, missing tools, or when building
    or encrypting a container fails.
    """
    parser = argparse.ArgumentParser(
        description="Distribute archival files across multiple SquashFS containers of fixed maximum size."
    )
    parser.add_argument(
        "--source", "-s",
        required=True,
        help="Source directory"
    )
    parser.add_argument(
        "--files", "-f",
        nargs="+",
        help="List of files and directories (space separated)"
    )
    parser.add_argument(
        "--from-file", "-i",
        help="Read list of files and directories from file (newline separated)"
    )
    parser.add_argument(
        "--container-size", "-c",
        type=int,
        required=True,
        help="Maximum container size in Bytes"
    )
    parser.add_argument(
        "--output-prefix", "-o",
        required=True,
        help="Output prefix"
    )
    parser.add_argument(
        "--mksquashfs-args", "-m",
        help="Arguments for mksquashfs"
    )
    parser.add_argument(
        "--cryptsetup", "-e",
        action="store_true",
        help="Encrypt containers using cryptsetup"
    )
    parser.add_argument(
        "--cryptsetup-args", "-a",
        help="Arguments for cryptsetup"
    )
    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Increased verbosity"
    )

    args = parser.parse_args()

    # A non-positive limit would silently reject every file as "too large".
    if args.container_size <= 0:
        parser.error("--container-size must be a positive number of bytes")

    input_paths = []
    if args.files:
        input_paths.extend(args.files)
    if args.from_file:
        try:
            input_paths.extend(load_paths_from_file(args.from_file))
        except (OSError, UnicodeError) as e:
            print(f"Error reading args from file: {e}", file=sys.stderr)
            sys.exit(1)

    if not input_paths:
        print("No paths given.", file=sys.stderr)
        parser.print_help()
        sys.exit(1)

    print(f"Processing {len(input_paths)} paths.")
    all_files = collect_files_from_paths(input_paths)

    if not all_files:
        print("No files found.", file=sys.stderr)
        sys.exit(1)

    print(f"Found: {len(all_files)} files with total size {sum(size for _, size in all_files) / 1024**2:.2f} MB")

    containers = split_files(all_files, args.container_size, args.output_prefix)

    if not shutil.which("mksquashfs"):
        print("mksquashfs not available. Please install it first.", file=sys.stderr)
        sys.exit(1)
    if args.cryptsetup:
        # Encryption shells out to both tools; fail before building anything.
        for tool in ("cryptsetup", "truncate"):
            if not shutil.which(tool):
                print(f"{tool} not available. Please install it first.", file=sys.stderr)
                sys.exit(1)

    succeeded = write_container_files(
        containers,
        args.source,
        args.mksquashfs_args,
        args.cryptsetup,
        args.cryptsetup_args,
        args.output_prefix,
    )
    if not succeeded:
        # Do not report success (or exit 0) when a container failed.
        print("Container creation failed.", file=sys.stderr)
        sys.exit(1)

    if args.verbose:
        calculate_statistics(containers, args.container_size)

    print(f"\nDone. Created {len(containers)} containers.")


if __name__ == "__main__":
    main()