diff options
| author | Leonard Kugis <leonard@kug.is> | 2026-04-12 03:23:26 +0200 |
|---|---|---|
| committer | Leonard Kugis <leonard@kug.is> | 2026-04-12 03:23:26 +0200 |
| commit | 50438b7012805e61cb4263cfe411a42455164b79 (patch) | |
| tree | 5d825d14a0fc6ef67478395ea5ebb7c754818285 /archival.py | |
| parent | be333abac6d927af0de3dbaddccf1c478efe154a (diff) | |
| download | scripts-50438b7012805e61cb4263cfe411a42455164b79.tar.gz | |
Directories can now be split across containers if specified directly in args.
Previously, the size of a directory (and its contents) was calculated as a whole and
checked against the container capacity.
Diffstat (limited to 'archival.py')
| -rwxr-xr-x | archival.py | 96 |
1 file changed, 52 insertions, 44 deletions
diff --git a/archival.py b/archival.py index 7ba4309..f259c7f 100755 --- a/archival.py +++ b/archival.py @@ -8,20 +8,24 @@ from typing import List, Tuple, Dict import subprocess import shutil -def get_file_size(file_path: str) -> int: - path = Path(file_path) - if not path.exists(): - raise FileNotFoundError(f"Path does not exist: {file_path}") +def collect_files_from_paths(paths: List[str]) -> List[Tuple[str, int]]: + files_with_sizes = [] + + for path_str in paths: + path = Path(path_str) + if not path.exists(): + print(f"WARNING: Path does not exist: {path_str}", file=sys.stderr) + continue + + if path.is_file(): + files_with_sizes.append((str(path.absolute()), path.stat().st_size)) + + elif path.is_dir(): + for item in path.rglob("*"): + if item.is_file(): + files_with_sizes.append((str(item.absolute()), item.stat().st_size)) - if path.is_file(): - return path.stat().st_size - elif path.is_dir(): - total_size = 0 - for item in path.rglob("*"): - if item.is_file(): - total_size += item.stat().st_size - return total_size - return 0 + return files_with_sizes def load_paths_from_file(file_path: str) -> List[str]: @@ -35,18 +39,10 @@ def load_paths_from_file(file_path: str) -> List[str]: def split_files( - file_paths: List[str], + files_with_sizes: List[Tuple[str, int]], max_container_size: int, output_prefix: str ) -> Dict[int, List[str]]: - files_with_sizes = [] - for path in file_paths: - try: - size = get_file_size(path) - files_with_sizes.append((path, size)) - except Exception as e: - print(f"Warnung: {e}", file=sys.stderr) - files_with_sizes.sort(key=lambda x: x[1], reverse=True) containers = {} @@ -56,14 +52,18 @@ def split_files( for file_path, file_size in files_with_sizes: if file_size > max_container_size: - print(f"WARNING: File too large for container ({file_size} > {max_container_size}): {file_path}", file=sys.stderr) + print(f"WARNING: file too large for container ({file_size} > {max_container_size}): {file_path}", file=sys.stderr) continue if 
current_size + file_size > max_container_size: - containers[container_index] = current_container.copy() - container_index += 1 - current_container = [file_path] - current_size = file_size + if current_container: + containers[container_index] = current_container.copy() + container_index += 1 + current_container = [file_path] + current_size = file_size + else: + current_container.append(file_path) + current_size = file_size else: current_container.append(file_path) current_size += file_size @@ -156,7 +156,7 @@ def write_container_files( return False return True - + def calculate_statistics( containers: Dict[int, List[str]], @@ -167,11 +167,9 @@ def calculate_statistics( print("\n--- Stats ---") for container_index, file_paths in containers.items(): - container_size = 0 - for file_path in file_paths: - container_size += get_file_size(file_path) + container_size = sum(os.path.getsize(f) for f in file_paths) - utilization = (container_size / max_container_size) * 100 + utilization = (container_size / max_container_size) * 100 if max_container_size > 0 else 0 print(f"Container {container_index}:") print(f" Files: {len(file_paths)}") print(f" Size: {container_size} Bytes ({container_size / 1024**2:.2f} MB)") @@ -180,7 +178,10 @@ def calculate_statistics( total_files += len(file_paths) total_size += container_size - print(f"\nTotal: {total_files} Files, {total_size} Bytes ({total_size / 1024**2:.2f} MB)") + print(f"\nTotal: {total_files} files, {total_size} bytes ({total_size / 1024**2:.2f} MB)") + if containers: + avg_utilization = total_size / (len(containers) * max_container_size) * 100 if max_container_size > 0 else 0 + print(f"Avg. 
container usage: {avg_utilization:.1f}%") def main(): @@ -243,32 +244,39 @@ def main(): ) args = parser.parse_args() - - file_paths = [] + input_paths = [] if args.files: - file_paths.extend(args.files) + input_paths.extend(args.files) if args.from_file: try: - file_paths.extend(load_paths_from_file(args.from_file)) + input_paths.extend(load_paths_from_file(args.from_file)) except Exception as e: - print(f"Error with reading file: {e}", file=sys.stderr) + print(f"Error reading args from file: {e}", file=sys.stderr) sys.exit(1) - if not file_paths: - print("No paths provided.", file=sys.stderr) + if not input_paths: + print("No paths given.", file=sys.stderr) parser.print_help() sys.exit(1) - print(f"Processing {len(file_paths)} files / directories with container size {args.container_size} Bytes...") + print(f"Processing {len(input_paths)} paths.") + + all_files = collect_files_from_paths(input_paths) + + if not all_files: + print("No files found.", file=sys.stderr) + sys.exit(1) + + print(f"Found: {len(all_files)} files with total size {sum(size for _, size in all_files) / 1024**2:.2f} MB") + + containers = split_files(all_files, args.container_size, args.output_prefix) - containers = split_files(file_paths, args.container_size, args.output_prefix) - if not shutil.which("mksquashfs"): print("mksquashfs not available. Please install it first.", file=sys.stderr) sys.exit(1) - + write_container_files(containers, args.source, args.mksquashfs_args, args.cryptsetup, args.cryptsetup_args, args.output_prefix) if args.verbose: |
