#!/usr/bin/env python3
"""Command-line front end of xembu.

Reads a semicolon-separated bookkeeping CSV, hands the rows to the registered
modules, prints each module's summary and optionally writes a PDF report
and/or a ``.tar.zst`` bundle containing the CSV, the PDF and all receipts.
"""
import argparse
import hashlib
import os
import re
import subprocess
import tarfile
import tempfile
from datetime import datetime
from typing import Dict, List, Optional, Tuple

import matplotlib.pyplot as plt
import pandas as pd
from matplotlib import font_manager
from matplotlib.backends.backend_pdf import PdfPages

from modules.base import Frame, BigFrame, Module, ModuleResult
from modules.general import GeneralModule

CSV_COLUMNS = [
    "Datum",
    "Nutzer",
    "Distributionsgruppe",
    "Distributionsflag",
    "Positionsbezeichnung",
    "Positionswert",
    "Modules",
    "Parameters",
    "Beleg",
]

# Page size used for every generated figure (A4 portrait, in inches).
_A4_PORTRAIT_IN = (8.27, 11.69)

# Header row of the positions table in the PDF.
_POSITIONS_TABLE_COLUMNS = [
    "Datum",
    "Nutzer",
    "Distributionsgruppe",
    "Flag",
    "Positionsbezeichnung",
    "Positionswert",
    "Modules",
    "Parameters",
    "Beleg",
    "SHA1",
]

# Number of data rows placed on one positions-table page.
_ROWS_PER_TABLE_PAGE = 16

# Matches one parenthesised, non-nested group such as "(1, 2.5, abc)".
_PARAM_TUPLE_RE = re.compile(r"\(([^()]*)\)")


def _pick_mono_font(size: int = 8) -> font_manager.FontProperties:
    """Return font properties for the first available monospace family.

    ``FontProperties(family=...)`` never fails for an unknown family, so the
    availability is probed with ``findfont(..., fallback_to_default=False)``,
    which raises ``ValueError`` when no matching font file exists.

    Args:
        size: Font size stored in the returned properties.

    Returns:
        Properties for the first resolvable candidate, or default-family
        properties of the requested size if none resolves.
    """
    for family in ("Inconsolata", "DejaVu Sans Mono", "monospace"):
        props = font_manager.FontProperties(family=family, size=size)
        try:
            font_manager.findfont(props, fallback_to_default=False)
        except ValueError:
            continue
        return props
    return font_manager.FontProperties(size=size)


def _decorate_figure(fig, mono_font, title: str, generated_at: str, page: int, total_pages: int):
    """Apply page margins and draw the header and footer onto ``fig``.

    Args:
        fig: Figure to decorate in place.
        mono_font: Font properties used for all header/footer text.
        title: Text shown at the top left.
        generated_at: Text shown at the top right.
        page: Number of this page (shown bottom right).
        total_pages: Total page count (shown bottom right).
    """
    # Margins: left/right 2 cm, top/bottom 1 cm.
    margin_lr_cm = 2.0
    margin_tb_cm = 1.0
    # Extra bands between header/footer and the content area.
    header_gap_cm = 1.3
    footer_gap_cm = 2.0  # larger, to leave room for the multi-line footer

    cm_to_in = 1 / 2.54
    margin_lr_in = margin_lr_cm * cm_to_in
    margin_tb_in = margin_tb_cm * cm_to_in
    header_gap_in = header_gap_cm * cm_to_in
    footer_gap_in = footer_gap_cm * cm_to_in

    # Convert to figure fractions; margins are capped so they cannot overlap.
    w_in, h_in = fig.get_size_inches()
    mx = min(0.45, margin_lr_in / w_in)
    my = min(0.45, margin_tb_in / h_in)
    header_gap = header_gap_in / h_in
    footer_gap = footer_gap_in / h_in

    # Content area: inside the margins, minus the header/footer bands.
    top = 1 - my - header_gap
    bottom = my + footer_gap
    if top <= bottom:
        # Fallback for very small figures: drop the bands.
        top = 1 - my
        bottom = my
    fig.subplots_adjust(left=mx, right=1 - mx, top=top, bottom=bottom)

    # Header and footer sit on the inner edge of the margins.
    left_x = mx
    right_x = 1 - mx
    header_y = 1 - my
    footer_y = my

    # Header
    fig.text(left_x, header_y, title, ha="left", va="top",
             fontproperties=mono_font, fontsize=9)
    fig.text(right_x, header_y, generated_at, ha="right", va="top",
             fontproperties=mono_font, fontsize=9)

    # Footer, left (multi-line)
    footer_left = (
        "xembu - eXtensiblE Multiuser Bookkeeping Utility\n"
        "Copyright (C) 2024 Leonard Kugis\n"
        "This program comes with ABSOLUTELY NO WARRANTY; for details see LICENSE.txt"
    )
    fig.text(left_x, footer_y, footer_left, ha="left", va="bottom",
             fontproperties=mono_font, fontsize=7, linespacing=1.1)

    # Footer, right
    fig.text(right_x, footer_y, f"{page} / {total_pages}", ha="right", va="bottom",
             fontproperties=mono_font, fontsize=8)


def _read_csv_flexible(path: str) -> pd.DataFrame:
    """Read the CSV at ``path`` whether or not it has a header row.

    The file is first read with its first line as header; if the expected
    columns are not all present it is re-read without a header and the
    names from ``CSV_COLUMNS`` are assigned.
    """
    df = pd.read_csv(path, sep=";", encoding="utf-8", header=0)
    if not set(CSV_COLUMNS).issubset(df.columns):
        df = pd.read_csv(path, sep=";", encoding="utf-8", header=None, names=CSV_COLUMNS)
    return df


def _is_missing(value) -> bool:
    """Return True for ``None`` and for float NaN (pandas' empty cell)."""
    return value is None or (isinstance(value, float) and pd.isna(value))


def _split_comma_list(value) -> List[str]:
    """Split a comma-separated cell into stripped, non-empty items."""
    if _is_missing(value):
        return []
    return [item for item in (part.strip() for part in str(value).split(",")) if item]


def parse_value_unit(s: str) -> Tuple[float, str]:
    """Split a cell such as ``"12,50 EUR"`` into ``(12.5, "EUR")``.

    The last whitespace-separated token is the unit; everything before it is
    the number. A single token is treated as a number without unit. Decimal
    commas are accepted and any "€" sign inside the number part is dropped.

    Returns:
        ``(value, unit)``; ``(0.0, "")`` for missing or blank input.

    Raises:
        ValueError: If the number part cannot be converted to ``float``.
    """
    if _is_missing(s):
        return 0.0, ""
    txt = str(s).strip()
    if not txt:
        return 0.0, ""

    parts = txt.split()
    if len(parts) < 2:
        number_src, unit = txt, ""
    else:
        number_src, unit = " ".join(parts[:-1]), parts[-1].strip()

    number = number_src.strip().replace(",", ".").replace("€", "").strip()
    return float(number), unit


def parse_modules_list(s: str) -> List[str]:
    """Parse the comma-separated "Modules" cell into a list of names."""
    return _split_comma_list(s)


def parse_groups_list(s: str) -> List[str]:
    """Parse the comma-separated "Distributionsgruppe" cell into a list."""
    return _split_comma_list(s)


def _coerce_param(token: str):
    """Convert one parameter token to ``int``/``float`` if possible, else keep the string."""
    try:
        return float(token) if "." in token else int(token)
    except ValueError:
        pass
    try:
        # Catches forms without a dot that int() rejects, e.g. "1e3".
        return float(token)
    except ValueError:
        return token


def parse_parameters_list(s: str) -> List[tuple]:
    """Parse a cell like ``"(1, 2.5) (abc)"`` into ``[(1, 2.5), ("abc",)]``.

    Every non-nested parenthesised group becomes one tuple; its
    comma-separated items are converted to numbers where possible. Empty
    groups yield an empty tuple, and text outside parentheses is ignored.
    """
    if _is_missing(s):
        return []
    txt = str(s).strip()
    if not txt:
        return []

    result = []
    for match in _PARAM_TUPLE_RE.finditer(txt):
        tokens = (part.strip() for part in match.group(1).split(","))
        result.append(tuple(_coerce_param(tok) for tok in tokens if tok != ""))
    return result


def parse_csv(path: str) -> pd.DataFrame:
    """Load the CSV and add the normalised helper columns.

    Adds ``dist_groups``, ``modules_list``, ``params_list``, ``value`` and
    ``unit``. ``Datum`` is parsed with the format ``%Y-%m-%d-%H-%M-%S``;
    unparsable dates become ``NaT`` rather than raising.
    """
    df = _read_csv_flexible(path)

    df["Datum"] = pd.to_datetime(df["Datum"], format="%Y-%m-%d-%H-%M-%S", errors="coerce")
    df["Nutzer"] = df["Nutzer"].astype(str).str.strip()
    df["Distributionsflag"] = df["Distributionsflag"].astype(str).str.strip().str.upper()
    df["Positionsbezeichnung"] = df["Positionsbezeichnung"].astype(str).str.strip()

    df["dist_groups"] = df["Distributionsgruppe"].apply(parse_groups_list)
    df["modules_list"] = df["Modules"].apply(parse_modules_list)
    df["params_list"] = df["Parameters"].apply(parse_parameters_list)

    vals_units = df["Positionswert"].apply(parse_value_unit)
    df["value"] = vals_units.apply(lambda pair: pair[0])
    df["unit"] = vals_units.apply(lambda pair: pair[1])

    # Empty receipt cells become "" so truthiness checks work downstream.
    df["Beleg"] = df["Beleg"].where(df["Beleg"].notna(), "")
    return df


def compute_hash(filepath: str, base_dir: str = ".") -> Optional[str]:
    """Return the SHA-1 hex digest of ``base_dir/filepath``.

    Returns:
        The digest, or ``None`` if ``filepath`` is empty or the file cannot
        be read.
    """
    if not filepath:
        return None
    digest = hashlib.sha1()
    try:
        with open(os.path.join(base_dir, filepath), "rb") as handle:
            # Hash in chunks so large receipts are not loaded into memory at once.
            for chunk in iter(lambda: handle.read(1 << 20), b""):
                digest.update(chunk)
    except (OSError, ValueError):
        # ValueError covers paths that open() rejects outright (e.g. NUL bytes).
        return None
    return digest.hexdigest()


def _positions_table_rows(df: pd.DataFrame, base_dir: str) -> List[list]:
    """Build the cell texts of the positions table, sorted by ``Datum``."""
    rows = []
    for _, row in df.sort_values("Datum").iterrows():
        beleg = row["Beleg"]
        sha1 = compute_hash(str(beleg), base_dir=base_dir) if beleg else None
        sha1_fmt = ""
        if sha1:
            # Break the digest into two lines to keep the column narrow.
            half = len(sha1) // 2
            sha1_fmt = sha1[:half] + "\n" + sha1[half:]

        if isinstance(row["dist_groups"], list):
            groups_str = ", ".join(row["dist_groups"])
        else:
            groups_str = str(row["Distributionsgruppe"])
        if isinstance(row["modules_list"], list):
            mods_str = ", ".join(row["modules_list"])
        else:
            mods_str = str(row["Modules"])
        if isinstance(row["params_list"], list):
            params_str = str(row["params_list"])
        else:
            params_str = str(row["Parameters"])

        rows.append([
            row["Datum"].strftime("%Y-%m-%d %H:%M:%S") if pd.notna(row["Datum"]) else "INVALID",
            row["Nutzer"],
            groups_str,
            row["Distributionsflag"],
            row["Positionsbezeichnung"],
            f"{row['value']:.4f} {row['unit']}".strip(),
            mods_str,
            params_str,
            str(beleg) if beleg else "",
            sha1_fmt,
        ])
    return rows


def _text_width_px(ax, renderer, text, prop) -> float:
    """Return the rendered width of ``text`` in pixels, leaving ``ax`` unchanged."""
    probe = ax.text(0, 0, str(text), fontproperties=prop)
    try:
        return probe.get_window_extent(renderer=renderer).width
    finally:
        probe.remove()


def _build_positions_table_figs(df: pd.DataFrame, base_dir: str, mono_font):
    """Render all positions as table pages.

    Args:
        df: Parsed positions (output of ``parse_csv``).
        base_dir: Directory against which receipt paths are resolved for hashing.
        mono_font: Font properties used for the cells.

    Returns:
        A list of figures with at most ``_ROWS_PER_TABLE_PAGE`` rows each;
        empty when ``df`` has no rows.
    """
    columns = _POSITIONS_TABLE_COLUMNS
    table_data = _positions_table_rows(df, base_dir)

    figures = []
    for start in range(0, len(table_data), _ROWS_PER_TABLE_PAGE):
        chunk = table_data[start:start + _ROWS_PER_TABLE_PAGE]

        fig, ax = plt.subplots(figsize=_A4_PORTRAIT_IN)
        ax.axis("off")
        renderer = fig.canvas.get_renderer()

        # Measure with the font the cells are actually drawn in, so the
        # relative column widths match the rendered text.
        col_widths = [
            max(_text_width_px(ax, renderer, cells[col_idx], mono_font)
                for cells in [columns] + chunk)
            for col_idx in range(len(columns))
        ]
        total_w = sum(col_widths) or 1.0
        scaled = [w / total_w for w in col_widths]

        table = ax.table(cellText=chunk, colLabels=columns, loc="center", cellLoc="left")
        table.auto_set_font_size(False)
        for (_, col), cell in table.get_celld().items():
            cell.get_text().set_fontproperties(mono_font)
            cell.PAD = 0.03
            if col < len(scaled):
                cell.set_width(scaled[col])
        table.scale(1, 2.0)

        figures.append(fig)
    return figures


def _separator_page(pdf: PdfPages, title: str, mono_font):
    """Write a page containing only the centred ``title`` to ``pdf``."""
    fig, ax = plt.subplots(figsize=_A4_PORTRAIT_IN)
    ax.axis("off")
    ax.text(0.5, 0.5, title, ha="center", va="center", fontproperties=mono_font, fontsize=18)
    pdf.savefig(fig)
    plt.close(fig)


def _build_grid_figs(items: list, nrows: int, ncols: int, title_fontsize: int, mono_font):
    """Lay out ``items`` on pages of ``nrows`` x ``ncols`` axes.

    Each item must offer ``title`` and ``render(ax, mono_font)``. Unused
    slots on the last page stay blank.
    """
    per_page = nrows * ncols
    figures = []
    for start in range(0, len(items), per_page):
        page_items = items[start:start + per_page]
        fig, axs = plt.subplots(nrows, ncols, figsize=_A4_PORTRAIT_IN, squeeze=False)
        for slot, ax in enumerate(axs.flatten()):
            ax.axis("off")
            if slot >= len(page_items):
                continue
            item = page_items[slot]
            ax.set_title(item.title, fontproperties=mono_font)
            # Set the size last: applying font properties replaces the size
            # with the font's own, which would discard the requested one.
            ax.title.set_fontsize(title_fontsize)
            item.render(ax, mono_font)
        figures.append(fig)
    return figures


def _build_frame_figs(frames: List[Frame], mono_font):
    """Render frames six per page (3 rows x 2 columns) and return the figures."""
    return _build_grid_figs(frames, 3, 2, 10, mono_font)


def _build_bigframe_figs(bigframes: List[BigFrame], mono_font):
    """Render big frames two per page (stacked vertically) and return the figures."""
    return _build_grid_figs(bigframes, 2, 1, 11, mono_font)


def _render_pages(pdf: PdfPages, pages: List[plt.Figure]):
    """Save every figure in ``pages`` to ``pdf`` and close it."""
    for fig in pages:
        pdf.savefig(fig)
        plt.close(fig)


def create_pdf(
    df: pd.DataFrame,
    module_frames: List[Frame],
    module_bigframes: List[BigFrame],
    module_pages: List[plt.Figure],
    pdf_path: str,
    mono_font,
    base_dir: str,
    title: str,
):
    """Write the PDF report to ``pdf_path``.

    Page order: positions table, module frames, module big frames, then the
    ready-made pages supplied by modules. Every page gets a header/footer
    with ``title``, the generation timestamp and "page / total".

    Args:
        df: Parsed positions (output of ``parse_csv``).
        module_frames: Frames collected from all module results.
        module_bigframes: Big frames collected from all module results.
        module_pages: Finished figures collected from all module results.
        pdf_path: Destination file; missing parent directories are created.
        mono_font: Font properties used throughout the document.
        base_dir: Directory against which receipt paths are resolved.
        title: Header title.
    """
    os.makedirs(os.path.dirname(os.path.abspath(pdf_path)) or ".", exist_ok=True)
    generated_at = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")

    # 1) Collect all pages first so the total page count is known.
    figs: List[plt.Figure] = []
    figs.extend(_build_positions_table_figs(df, base_dir=base_dir, mono_font=mono_font))
    figs.extend(_build_frame_figs(module_frames, mono_font=mono_font))
    figs.extend(_build_bigframe_figs(module_bigframes, mono_font=mono_font))
    figs.extend(module_pages)  # finished figures coming from modules
    total_pages = len(figs)

    # 2) Decorate and save; close every figure even if saving fails.
    try:
        with PdfPages(pdf_path) as pdf:
            for page_no, fig in enumerate(figs, start=1):
                _decorate_figure(fig, mono_font, title=title, generated_at=generated_at,
                                 page=page_no, total_pages=total_pages)
                pdf.savefig(fig)
    finally:
        for fig in figs:
            plt.close(fig)


def create_bundle(archive_path: str, csv_path: str, df: pd.DataFrame, base_dir: str,
                  pdf_path: Optional[str] = None):
    """Write a zstd-compressed tar bundle to ``archive_path``.

    The bundle contains the CSV, the PDF (if ``pdf_path`` is given and
    exists) and every receipt named in ``df["Beleg"]``. Receipt paths are
    resolved relative to ``base_dir``. Missing receipts are skipped and
    listed in a warning on stdout. Compression is done by the external
    ``zstd`` program; an existing archive is overwritten.

    Raises:
        FileNotFoundError: If the ``zstd`` executable cannot be started.
        subprocess.CalledProcessError: If ``zstd`` exits with an error.
    """
    archive_abs = os.path.abspath(archive_path)
    out_dir = os.path.dirname(archive_abs)
    os.makedirs(out_dir, exist_ok=True)

    receipts = sorted({p.strip() for p in df["Beleg"].astype(str)} - {""})
    missing = []

    # The intermediate tar lives in its own temporary directory next to the
    # archive. That way it can never collide with the archive itself or with
    # an unrelated file, and it is removed even when tar or zstd fails.
    with tempfile.TemporaryDirectory(prefix=".bundle-", dir=out_dir) as tmp_dir:
        tar_path = os.path.join(tmp_dir, "bundle.tar")

        with tarfile.open(tar_path, "w") as tar:
            tar.add(csv_path, arcname=os.path.basename(csv_path))
            if pdf_path and os.path.exists(pdf_path):
                tar.add(pdf_path, arcname=os.path.basename(pdf_path))

            for rel in receipts:
                is_abs = os.path.isabs(rel)
                abs_path = rel if is_abs else os.path.join(base_dir, rel)
                if not os.path.exists(abs_path):
                    missing.append(rel)
                    continue
                # Keep the relative path inside the archive where there is one.
                tar.add(abs_path, arcname=os.path.basename(rel) if is_abs else rel)

        # -f: overwrite an existing archive instead of failing or prompting.
        subprocess.run(["zstd", "-T0", "-f", "-o", archive_abs, tar_path], check=True)

    if missing:
        print("\n[WARN] Fehlende Belege (nicht im Bundle):")
        for m in missing:
            print(f" - {m}")


def main():
    """Parse the command line, evaluate the CSV and write the requested outputs."""
    parser = argparse.ArgumentParser()
    parser.add_argument("csv", help="Pfad zur CSV-Datei")
    parser.add_argument("--title", "-t", help="Titel für PDF-Kopfzeile (optional)")
    parser.add_argument("--pdf", "-p", help="Pfad zur Ziel-PDF (optional)")
    parser.add_argument("--bundle", "-b", help="Pfad zum Bundle (.tar.zst), enthält CSV, PDF (falls erzeugt) und Belege (optional)")
    args = parser.parse_args()

    csv_path = os.path.abspath(args.csv)
    base_dir = os.path.dirname(csv_path) or "."
    title = args.title if args.title else os.path.basename(csv_path)

    df = parse_csv(csv_path)
    if df["Datum"].isna().any():
        bad = df[df["Datum"].isna()][CSV_COLUMNS]
        raise ValueError(f"Ungültige Datumsangaben in folgenden Zeilen:\n{bad}")

    want_pdf = bool(args.pdf)
    mono_font = _pick_mono_font(size=8)

    # Module registry; register further modules here.
    modules: Dict[str, Module] = {
        "general": GeneralModule(),
    }

    # Map each module name to the rows that reference it. A name repeated
    # within one row counts once, so the row is not duplicated downstream.
    rows_for_module: Dict[str, List[int]] = {}
    for idx, row_modules in df["modules_list"].items():
        for mod_name in dict.fromkeys(row_modules):
            rows_for_module.setdefault(mod_name, []).append(idx)

    base_context = {"base_dir": base_dir, "want_pdf": want_pdf, "mono_font": mono_font}
    results: List[ModuleResult] = []

    # The general module always runs, on all rows.
    results.append(modules["general"].process(df, context=dict(base_context)))

    # Other modules run only on the rows that name them.
    for mod_name, indices in rows_for_module.items():
        if mod_name == "general":
            continue
        mod = modules.get(mod_name)
        if not mod:
            print(f"[INFO] Unbekanntes Modul '{mod_name}' – ignoriert (noch nicht registriert).")
            continue
        subdf = df.loc[indices].copy()
        results.append(mod.process(subdf, context=dict(base_context)))

    # Console summary per module.
    print("\n===== Auswertung =====")
    for r in results:
        print(r.summary_text)
        print("")

    # Optional PDF.
    if args.pdf:
        module_frames: List[Frame] = []
        module_bigframes: List[BigFrame] = []
        module_pages: List[plt.Figure] = []
        for r in results:
            module_frames.extend(r.frames)
            module_bigframes.extend(r.bigframes)
            module_pages.extend(r.pages)
        create_pdf(df, module_frames, module_bigframes, module_pages, args.pdf, mono_font,
                   base_dir=base_dir, title=title)
        print(f"[OK] PDF geschrieben: {args.pdf}")

    # Optional bundle (CSV + PDF if written + receipts).
    if args.bundle:
        create_bundle(args.bundle, csv_path, df, base_dir=base_dir,
                      pdf_path=args.pdf if args.pdf else None)
        print(f"[OK] Bundle geschrieben: {args.bundle}")


if __name__ == "__main__":
    main()