aboutsummaryrefslogtreecommitdiffstats
path: root/archival.py
diff options
context:
space:
mode:
Diffstat (limited to 'archival.py')
-rwxr-xr-x archival.py 281
1 files changed, 281 insertions, 0 deletions
diff --git a/archival.py b/archival.py
new file mode 100755
index 0000000..7ba4309
--- /dev/null
+++ b/archival.py
@@ -0,0 +1,281 @@
+#!/usr/bin/env python3
+
+import os
+import sys
+import argparse
+from pathlib import Path
+from typing import List, Tuple, Dict
+import subprocess
+import shutil
+
def get_file_size(file_path: str) -> int:
    """Return the size in bytes of a file, or the summed size of a directory tree.

    Args:
        file_path: Path to a regular file or a directory.

    Returns:
        The file's ``st_size``; for a directory, the sum of ``st_size`` over
        every regular file found recursively below it; ``0`` for anything
        that exists but is neither a file nor a directory.

    Raises:
        FileNotFoundError: If ``file_path`` does not exist.
    """
    target = Path(file_path)

    if not target.exists():
        raise FileNotFoundError(f"Path does not exist: {file_path}")

    if target.is_file():
        return target.stat().st_size

    if not target.is_dir():
        # Exists, but is neither a regular file nor a directory.
        return 0

    # Directory: add up every regular file anywhere below it.
    return sum(
        entry.stat().st_size
        for entry in target.rglob("*")
        if entry.is_file()
    )
+
+
def load_paths_from_file(file_path: str) -> List[str]:
    """Read a newline-separated list of paths from a UTF-8 text file.

    Surrounding whitespace is stripped from every line. Lines that are empty
    after stripping, or that start with ``#``, are skipped.

    Args:
        file_path: Path of the list file to read.

    Returns:
        The remaining entries, in file order.
    """
    with open(file_path, 'r', encoding='utf-8') as handle:
        stripped_lines = (raw.strip() for raw in handle)
        return [
            entry
            for entry in stripped_lines
            if entry and not entry.startswith('#')
        ]
+
+
def split_files(
    file_paths: List[str],
    max_container_size: int,
    output_prefix: str
) -> Dict[int, List[str]]:
    """Distribute paths over containers of at most ``max_container_size`` bytes.

    Each path is sized with ``get_file_size``. Paths are then packed with a
    first-fit-decreasing strategy: largest first, each one placed into the
    first container that still has room, opening a new container only when
    none fits. On the same ordering this never needs more containers than
    closing a container for good as soon as one path does not fit.

    Args:
        file_paths: Files and/or directories to distribute.
        max_container_size: Capacity of one container in bytes.
        output_prefix: Unused here; kept so existing callers keep working.

    Returns:
        Mapping of container index (0..n-1) to the paths assigned to it.
        Paths that cannot be sized, or that are larger than one container,
        are reported on stderr and left out.
    """
    files_with_sizes = []
    for path in file_paths:
        try:
            files_with_sizes.append((path, get_file_size(path)))
        except (OSError, ValueError) as e:
            # Missing/unreadable/malformed path: skip it, keep going.
            print(f"WARNING: {e}", file=sys.stderr)

    files_with_sizes.sort(key=lambda item: item[1], reverse=True)

    packed: List[List[str]] = []   # paths per container
    remaining: List[int] = []      # free bytes per container, same indexing

    for file_path, file_size in files_with_sizes:
        if file_size > max_container_size:
            print(f"WARNING: File too large for container ({file_size} > {max_container_size}): {file_path}", file=sys.stderr)
            continue

        for index, free_bytes in enumerate(remaining):
            if file_size <= free_bytes:
                packed[index].append(file_path)
                remaining[index] -= file_size
                break
        else:
            # No open container has room: start a new one.
            packed.append([file_path])
            remaining.append(max_container_size - file_size)

    return dict(enumerate(packed))
+
def encrypt_container(
    output_file: str,
    cryptsetup_args: str
) -> bool:
    """Encrypt a container image in place with LUKS2 and rename it to ``*.luks``.

    Steps:
      1. Grow the image by 32 MiB.
      2. Run ``cryptsetup -q reencrypt --encrypt --type luks2
         --reduce-device-size 32M <extra args> <image>``.
      3. Shrink the image by 16 MiB again.
         NOTE(review): presumably the LUKS2 header only occupies half of the
         reserved space, leaving 16 MiB of slack at the end -- confirm against
         cryptsetup-reencrypt(8).
      4. Rename ``<image>`` to ``<image>.luks``.

    Args:
        output_file: Path of the plain container image.
        cryptsetup_args: Extra cryptsetup arguments as one shell-style string
            (quotes are honoured). ``None`` or an empty string means none.

    Returns:
        True on success, False if any step failed (reported on stderr).
    """
    import shlex  # local import: only this helper needs shell-style splitting

    mib = 1024 * 1024

    print("Encrypting container {}.".format(output_file))

    try:
        _resize_file(output_file, 32 * mib)
    except OSError as e:
        print(f"ERROR: truncate: {e}", file=sys.stderr)
        return False

    cmd = [
        "cryptsetup",
        "-q",
        "reencrypt",
        "--encrypt",
        "--type",
        "luks2",
        "--reduce-device-size",
        "32M"
    ]
    # shlex.split tolerates None/empty input via the `or ""` and never yields
    # empty argv entries, unlike str.split(" ").
    cmd.extend(shlex.split(cryptsetup_args or ""))
    cmd.append(output_file)

    try:
        result = subprocess.run(cmd, text=True)
    except OSError as e:
        # e.g. cryptsetup is not installed
        print(f"ERROR: cryptsetup: {e}", file=sys.stderr)
        return False
    if result.returncode != 0:
        print("ERROR: cryptsetup", file=sys.stderr)
        return False

    try:
        _resize_file(output_file, -16 * mib)
    except OSError as e:
        print(f"ERROR: truncate: {e}", file=sys.stderr)
        return False

    try:
        os.replace(output_file, "{}.luks".format(output_file))
    except OSError as e:
        print(f"ERROR: mv: {e}", file=sys.stderr)
        return False

    return True


def _resize_file(path: str, delta: int) -> None:
    """Grow (delta > 0) or shrink (delta < 0) ``path`` by ``delta`` bytes, never below 0."""
    os.truncate(path, max(0, os.path.getsize(path) + delta))
+
def write_container_files(
    containers: Dict[int, List[str]],
    source: str,
    mksquashfs_args: str,
    cryptsetup: bool,
    cryptsetup_args: str,
    output_prefix: str
) -> bool:
    """Write one descriptor file and one SquashFS image per container.

    First, ``<output_prefix>-<index>.txt`` is written for every container; it
    lists the absolute paths assigned to that container, one per line.
    Then ``mksquashfs <source> <output_prefix>-<index>.squashfs`` is run for
    every container, passing the descriptor files of all *other* containers
    as ``-ef`` exclude files. If ``cryptsetup`` is true, each finished image
    is handed to ``encrypt_container``.

    NOTE(review): paths below ``source`` that appear in no descriptor (for
    example entries skipped during splitting) are excluded from no image --
    confirm that this is intended.

    Args:
        containers: Mapping of container index to the paths assigned to it.
        source: Source directory passed to mksquashfs.
        mksquashfs_args: Extra mksquashfs arguments as one shell-style string
            (quotes are honoured). ``None`` or an empty string means none.
        cryptsetup: Whether to encrypt each image after writing it.
        cryptsetup_args: Extra arguments forwarded to ``encrypt_container``.
        output_prefix: Prefix for descriptor and image file names.

    Returns:
        True if every container was written (and encrypted, if requested);
        False as soon as one step fails (reported on stderr).
    """
    import shlex  # local import: only needed to split the extra-args string

    # Tolerates None (option not given) and never yields empty argv entries.
    extra_args = shlex.split(mksquashfs_args or "")

    descriptors = {}
    for container_index, file_paths in containers.items():
        output_desc = f"{output_prefix}-{container_index}.txt"
        print("Writing descriptor file {}.".format(output_desc))

        with open(output_desc, 'w', encoding='utf-8') as f:
            f.writelines(f"{os.path.abspath(file_path)}\n" for file_path in file_paths)

        print("Descriptor file written.")
        descriptors[container_index] = output_desc

    for container_index, file_paths in containers.items():
        output_file = f"{output_prefix}-{container_index}.squashfs"
        print("Writing plain container {}".format(output_file))

        cmd = ["mksquashfs", source, output_file]
        cmd.extend(extra_args)

        # Exclude everything that belongs to any other container.
        for other_index, descriptor in descriptors.items():
            if other_index != container_index:
                cmd.extend(["-ef", descriptor])

        try:
            result = subprocess.run(cmd, text=True)
        except OSError as e:
            # e.g. mksquashfs is not installed
            print(f"ERROR: mksquashfs: {e}", file=sys.stderr)
            return False

        if result.returncode != 0:
            print("ERROR: mksquashfs", file=sys.stderr)
            return False

        print("Plain container written. Index: {}, No. files: {}".format(container_index, len(file_paths)))

        if cryptsetup:
            if not encrypt_container(output_file, cryptsetup_args):
                return False

    return True
+
+
def calculate_statistics(
    containers: Dict[int, List[str]],
    max_container_size: int
) -> None:
    """Print per-container and overall size statistics to stdout.

    For every container the number of entries, the summed size (as reported
    by ``get_file_size``) and the utilisation relative to
    ``max_container_size`` are printed, followed by a grand total.

    Args:
        containers: Mapping of container index to the paths assigned to it.
        max_container_size: Capacity of one container in bytes. For values
            <= 0 the utilisation is reported as 0.0% instead of dividing
            by zero.
    """
    total_files = 0
    total_size = 0

    print("\n--- Stats ---")
    for container_index, file_paths in containers.items():
        container_size = 0
        for file_path in file_paths:
            try:
                container_size += get_file_size(file_path)
            except OSError as e:
                # A path that vanished meanwhile must not abort the report.
                print(f"WARNING: {e}", file=sys.stderr)

        if max_container_size > 0:
            utilization = (container_size / max_container_size) * 100
        else:
            utilization = 0.0

        print(f"Container {container_index}:")
        print(f" Files: {len(file_paths)}")
        print(f" Size: {container_size} Bytes ({container_size / 1024**2:.2f} MB)")
        print(f" Usage: {utilization:.1f}% ({container_size}/{max_container_size})")

        total_files += len(file_paths)
        total_size += container_size

    print(f"\nTotal: {total_files} Files, {total_size} Bytes ({total_size / 1024**2:.2f} MB)")
+
+
def main():
    """Command-line entry point.

    Parses the arguments, collects the paths given via ``--files`` and/or
    ``--from-file``, distributes them over containers, writes the descriptor
    files and SquashFS images (optionally encrypted) and, with ``--verbose``,
    prints statistics.

    Exits with status 2 on invalid arguments and with status 1 when no paths
    were given, a required external tool is missing, nothing could be packed,
    or writing a container failed.
    """
    parser = argparse.ArgumentParser(
        description="Distribute archival files across multiple SquashFS containers of fixed maximum size."
    )

    parser.add_argument(
        "--source",
        "-s",
        required=True,
        help="Source directory"
    )
    parser.add_argument(
        "--files",
        "-f",
        nargs="+",
        help="List of files and directories (space separated)"
    )
    parser.add_argument(
        "--from-file",
        "-i",
        help="Read list of files and directories from file (newline separated)"
    )
    parser.add_argument(
        "--container-size",
        "-c",
        type=int,
        required=True,
        help="Maximum container size in Bytes"
    )
    parser.add_argument(
        "--output-prefix",
        "-o",
        required=True,
        help="Output prefix"
    )
    parser.add_argument(
        "--mksquashfs-args",
        "-m",
        help="Arguments for mksquashfs"
    )
    parser.add_argument(
        "--cryptsetup",
        "-e",
        action="store_true",
        help="Encrypt containers using cryptsetup"
    )
    parser.add_argument(
        "--cryptsetup-args",
        "-a",
        help="Arguments for cryptsetup"
    )
    parser.add_argument(
        "--verbose",
        "-v",
        action="store_true",
        help="Increased verbosity"
    )

    args = parser.parse_args()

    # A non-positive capacity can never hold anything; reject it up front.
    if args.container_size <= 0:
        parser.error("--container-size must be a positive number of bytes")

    file_paths = []

    if args.files:
        file_paths.extend(args.files)

    if args.from_file:
        try:
            file_paths.extend(load_paths_from_file(args.from_file))
        except (OSError, UnicodeError) as e:
            print(f"Error with reading file: {e}", file=sys.stderr)
            sys.exit(1)

    if not file_paths:
        print("No paths provided.", file=sys.stderr)
        parser.print_help()
        sys.exit(1)

    # Check external tools before the potentially slow size scan.
    if not shutil.which("mksquashfs"):
        print("mksquashfs not available. Please install it first.", file=sys.stderr)
        sys.exit(1)

    if args.cryptsetup and not shutil.which("cryptsetup"):
        print("cryptsetup not available. Please install it first.", file=sys.stderr)
        sys.exit(1)

    print(f"Processing {len(file_paths)} files / directories with container size {args.container_size} Bytes...")

    containers = split_files(file_paths, args.container_size, args.output_prefix)

    if not containers:
        print("None of the given paths could be placed into a container.", file=sys.stderr)
        sys.exit(1)

    written = write_container_files(
        containers,
        args.source,
        args.mksquashfs_args,
        args.cryptsetup,
        args.cryptsetup_args,
        args.output_prefix,
    )
    if not written:
        # The failing step has already been reported; do not claim success.
        print("Aborted: writing the containers failed.", file=sys.stderr)
        sys.exit(1)

    if args.verbose:
        calculate_statistics(containers, args.container_size)

    print(f"\nDone. Created {len(containers)} containers.")
+
+
+if __name__ == "__main__":
+ main()