diff options
Diffstat (limited to 'xembu.py')
| -rw-r--r-- | xembu.py | 489 |
1 file changed, 489 insertions, 0 deletions
#!/usr/bin/env python3
"""xembu - eXtensiblE Multiuser Bookkeeping Utility.

Reads a semicolon-separated bookkeeping CSV, dispatches rows to pluggable
modules, optionally renders an A4 PDF report (positions table + module
frames/pages) and optionally packs CSV, PDF and receipt files into a
.tar.zst bundle (compression via the external ``zstd`` binary).
"""
import argparse
import hashlib
import os
import re
import subprocess
import tarfile
from datetime import datetime
from typing import Dict, List, Optional

import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
from matplotlib import font_manager

from modules.base import Frame, BigFrame, Module, ModuleResult
from modules.general import GeneralModule

# Expected CSV column order (header row is optional, see _read_csv_flexible).
CSV_COLUMNS = [
    "Datum",
    "Nutzer",
    "Distributionsgruppe",
    "Distributionsflag",
    "Positionsbezeichnung",
    "Positionswert",
    "Modules",
    "Parameters",
    "Beleg",
]


def _pick_mono_font(size: int = 8) -> font_manager.FontProperties:
    """Return a FontProperties for the first *installed* monospace family.

    NOTE: constructing ``FontProperties(family=...)`` never raises for an
    unknown family, so the original try/except around it could not fall
    through.  ``findfont(..., fallback_to_default=False)`` actually raises
    when the family is missing, which makes the fallback chain work.
    """
    for fam in ("Inconsolata", "DejaVu Sans Mono", "monospace"):
        try:
            font_manager.findfont(
                font_manager.FontProperties(family=fam),
                fallback_to_default=False,
            )
            return font_manager.FontProperties(family=fam, size=size)
        except Exception:
            continue
    # Last resort: matplotlib's default font at the requested size.
    return font_manager.FontProperties(size=size)


def _decorate_figure(fig, mono_font, title: str, generated_at: str, page: int, total_pages: int):
    """Apply page margins and draw header/footer onto *fig* in place.

    Margins: 2 cm left/right, 1 cm top/bottom; extra gap bands keep the
    content area clear of the header (1.3 cm) and the multi-line footer
    (2.0 cm).
    """
    margin_lr_cm = 2.0
    margin_tb_cm = 1.0

    # Extra spacing (bands) between header/footer and the content area.
    header_gap_cm = 1.3  # more space below the header
    footer_gap_cm = 2.0  # more space above the (two-line) footer

    cm_to_in = 1 / 2.54
    margin_lr_in = margin_lr_cm * cm_to_in
    margin_tb_in = margin_tb_cm * cm_to_in
    header_gap_in = header_gap_cm * cm_to_in
    footer_gap_in = footer_gap_cm * cm_to_in

    w_in, h_in = fig.get_size_inches()

    # Convert to figure-relative coordinates; cap at 0.45 so tiny figures
    # never end up with left >= right or bottom >= top.
    mx = min(0.45, margin_lr_in / w_in)
    my = min(0.45, margin_tb_in / h_in)
    header_gap = header_gap_in / h_in
    footer_gap = footer_gap_in / h_in

    # Content area: inside the margins, plus room for header/footer bands.
    top = 1 - my - header_gap
    bottom = my + footer_gap
    if top <= bottom:
        # Fallback if the page is too small for the gap bands.
        top = 1 - my
        bottom = my

    fig.subplots_adjust(left=mx, right=1 - mx, top=top, bottom=bottom)

    # Header/footer anchor positions: at the inner edge of the margins.
    left_x = mx
    right_x = 1 - mx
    header_y = 1 - my
    footer_y = my

    # Header line.
    fig.text(left_x, header_y, title, ha="left", va="top", fontproperties=mono_font, fontsize=9)
    fig.text(right_x, header_y, generated_at, ha="right", va="top", fontproperties=mono_font, fontsize=9)

    # Footer, left (multi-line license notice).
    footer_left = (
        "xembu - eXtensiblE Multiuser Bookkeeping Utility\n"
        "Copyright (C) 2024 Leonard Kugis\n"
        "This program comes with ABSOLUTELY NO WARRANTY; for details see LICENSE.txt"
    )
    fig.text(left_x, footer_y, footer_left, ha="left", va="bottom",
             fontproperties=mono_font, fontsize=7, linespacing=1.1)

    # Footer, right (page counter).
    fig.text(right_x, footer_y, f"{page} / {total_pages}", ha="right", va="bottom",
             fontproperties=mono_font, fontsize=8)


def _read_csv_flexible(path: str) -> pd.DataFrame:
    """Read the CSV with or without a header row.

    First attempt treats row 0 as a header; if the expected columns are not
    all present, re-read headerless with CSV_COLUMNS as the column names.
    """
    df = pd.read_csv(path, sep=";", encoding="utf-8", header=0)
    if not set(CSV_COLUMNS).issubset(set(df.columns)):
        df = pd.read_csv(path, sep=";", encoding="utf-8", header=None, names=CSV_COLUMNS)
    return df


def parse_value_unit(s: str):
    """Split a position value like ``"12,50 €"`` into ``(12.5, "€")``.

    Decimal commas are converted to dots; a bare number yields an empty
    unit.  Empty/NaN input yields ``(0.0, "")``.  Raises ValueError for
    non-numeric input (matching the original behavior).
    """
    if s is None or (isinstance(s, float) and pd.isna(s)):
        return 0.0, ""
    txt = str(s).strip()
    if not txt:
        return 0.0, ""
    parts = txt.split()
    if len(parts) < 2:
        num = txt.replace(",", ".").replace("€", "").strip()
        return float(num), ""
    unit = parts[-1].strip()
    num_str = " ".join(parts[:-1]).strip().replace(",", ".").replace("€", "").strip()
    return float(num_str), unit


def parse_modules_list(s: str) -> List[str]:
    """Parse a comma-separated module list; empty/NaN yields []."""
    if s is None or (isinstance(s, float) and pd.isna(s)):
        return []
    mods = [m.strip() for m in str(s).split(",")]
    return [m for m in mods if m]


def parse_groups_list(s: str) -> List[str]:
    """Parse a comma-separated distribution-group list; empty/NaN yields []."""
    if s is None or (isinstance(s, float) and pd.isna(s)):
        return []
    gs = [g.strip() for g in str(s).split(",")]
    return [g for g in gs if g]


def parse_parameters_list(s: str) -> List[tuple]:
    """Parse parameter tuples written as ``(a, b) (c)`` into Python tuples.

    Each parenthesized group becomes one tuple; elements are coerced to
    int, then float, falling back to the raw string.
    """
    if s is None or (isinstance(s, float) and pd.isna(s)):
        return []
    txt = str(s).strip()
    if not txt:
        return []

    tuples = []
    for m in re.finditer(r"\(([^()]*)\)", txt):
        inner = m.group(1).strip()
        if inner == "":
            tuples.append(tuple())
            continue
        parts = [p.strip() for p in inner.split(",") if p.strip() != ""]
        vals = []
        for p in parts:
            try:
                if "." in p:
                    vals.append(float(p))
                else:
                    vals.append(int(p))
            except Exception:
                try:
                    vals.append(float(p))
                except Exception:
                    vals.append(p)
        tuples.append(tuple(vals))
    return tuples


def parse_csv(path: str) -> pd.DataFrame:
    """Load and normalize the bookkeeping CSV.

    Adds derived columns: ``dist_groups``, ``modules_list``, ``params_list``,
    numeric ``value`` and ``unit``.  Invalid dates become NaT (checked by
    the caller).
    """
    df = _read_csv_flexible(path)

    df["Datum"] = pd.to_datetime(df["Datum"], format="%Y-%m-%d-%H-%M-%S", errors="coerce")
    df["Nutzer"] = df["Nutzer"].astype(str).str.strip()
    df["Distributionsflag"] = df["Distributionsflag"].astype(str).str.strip().str.upper()
    df["Positionsbezeichnung"] = df["Positionsbezeichnung"].astype(str).str.strip()

    df["dist_groups"] = df["Distributionsgruppe"].apply(parse_groups_list)
    df["modules_list"] = df["Modules"].apply(parse_modules_list)
    df["params_list"] = df["Parameters"].apply(parse_parameters_list)

    vals_units = df["Positionswert"].apply(parse_value_unit)
    df["value"] = vals_units.apply(lambda x: x[0])
    df["unit"] = vals_units.apply(lambda x: x[1])

    df["Beleg"] = df["Beleg"].where(df["Beleg"].notna(), "")
    return df


def compute_hash(filepath: str, base_dir: str = ".") -> Optional[str]:
    """Return the SHA-1 hex digest of *filepath* (relative to *base_dir*).

    Returns None for empty paths and on any I/O error (best effort — the
    receipt may legitimately be missing).
    """
    try:
        if not filepath:
            return None
        full_path = os.path.join(base_dir, filepath)
        with open(full_path, "rb") as f:
            return hashlib.sha1(f.read()).hexdigest()
    except Exception:
        return None


def _build_positions_table_figs(df: pd.DataFrame, base_dir: str, mono_font):
    """Render the positions table as a list of A4-portrait figures.

    Rows are sorted by date and paginated 16 per page; column widths are
    auto-sized by measuring rendered text extents.
    """
    figures = []

    columns = [
        "Datum", "Nutzer", "Distributionsgruppe", "Flag",
        "Positionsbezeichnung", "Positionswert",
        "Modules", "Parameters", "Beleg", "SHA1",
    ]

    table_data = []
    for _, row in df.sort_values("Datum").iterrows():
        sha1 = compute_hash(str(row["Beleg"]), base_dir=base_dir) if row["Beleg"] else None
        sha1_fmt = ""
        if sha1:
            # Break the 40-char digest into two lines to keep the column narrow.
            sha1_fmt = sha1[: len(sha1) // 2] + "\n" + sha1[len(sha1) // 2 :]

        groups_str = ", ".join(row["dist_groups"]) if isinstance(row["dist_groups"], list) else str(row["Distributionsgruppe"])
        mods_str = ", ".join(row["modules_list"]) if isinstance(row["modules_list"], list) else str(row["Modules"])
        params_str = str(row["params_list"]) if isinstance(row["params_list"], list) else str(row["Parameters"])

        table_data.append([
            row["Datum"].strftime("%Y-%m-%d %H:%M:%S") if pd.notna(row["Datum"]) else "INVALID",
            row["Nutzer"],
            groups_str,
            row["Distributionsflag"],
            row["Positionsbezeichnung"],
            f"{row['value']:.4f} {row['unit']}".strip(),
            mods_str,
            params_str,
            str(row["Beleg"]) if row["Beleg"] else "",
            sha1_fmt,
        ])

    chunk_size = 16

    for start in range(0, len(table_data), chunk_size):
        fig, ax = plt.subplots(figsize=(8.27, 11.69))  # A4 portrait, inches
        ax.axis("off")
        chunk = table_data[start:start + chunk_size]

        # NOTE(review): get_renderer is backend-specific (Agg provides it);
        # assumed fine because we only ever save to PDF — confirm backend.
        renderer = fig.canvas.get_renderer()

        def get_text_width(text, prop):
            # Measure by drawing a throwaway text artist and reading its extent.
            t = ax.text(0, 0, str(text), fontproperties=prop)
            bb = t.get_window_extent(renderer=renderer)
            t.remove()
            return bb.width

        # Measure with the SAME font the cells are rendered in (mono_font).
        # The original measured with a default-family FontProperties, so the
        # computed widths did not match the rendered glyphs.
        col_widths = []
        for col_idx in range(len(columns)):
            max_w = get_text_width(columns[col_idx], mono_font)
            for r in chunk:
                max_w = max(max_w, get_text_width(r[col_idx], mono_font))
            col_widths.append(max_w)

        col_widths_inches = [w / fig.dpi for w in col_widths]
        total_w = sum(col_widths_inches) if sum(col_widths_inches) else 1.0
        scaled = [w / total_w for w in col_widths_inches]

        table = ax.table(cellText=chunk, colLabels=columns, loc="center", cellLoc="left")
        table.auto_set_font_size(False)

        # Single pass: set font, padding and proportional width per cell.
        for (_, c), cell in table.get_celld().items():
            cell.get_text().set_fontproperties(mono_font)
            cell.PAD = 0.03
            if c < len(scaled):
                cell.set_width(scaled[c])

        table.scale(1, 2.0)
        figures.append(fig)

    return figures


def _separator_page(pdf: PdfPages, title: str, mono_font):
    """Append a centered-title separator page directly to *pdf*."""
    fig, ax = plt.subplots(figsize=(8.27, 11.69))
    ax.axis("off")
    ax.text(0.5, 0.5, title, ha="center", va="center", fontproperties=mono_font, fontsize=18)
    pdf.savefig(fig)
    plt.close(fig)


def _build_frame_figs(frames: List[Frame], mono_font):
    """Lay out module frames 3x2 per A4 page; returns the list of figures."""
    figures = []
    idx = 0
    while idx < len(frames):
        fig, axs = plt.subplots(3, 2, figsize=(8.27, 11.69))
        axs = axs.flatten()

        for i in range(6):
            ax = axs[i]
            ax.clear()
            ax.axis("off")
            if idx + i < len(frames):
                fr = frames[idx + i]
                # Original set the title three times; one call suffices.
                ax.set_title(fr.title, fontproperties=mono_font, fontsize=10)
                fr.render(ax, mono_font)

        figures.append(fig)
        idx += 6
    return figures


def _build_bigframe_figs(bigframes: List[BigFrame], mono_font):
    """Lay out module big-frames 2 per A4 page; returns the list of figures."""
    figures = []
    idx = 0
    while idx < len(bigframes):
        fig, axs = plt.subplots(2, 1, figsize=(8.27, 11.69))
        # A 2x1 grid returns an ndarray; guard anyway for robustness.
        axs = axs.flatten() if hasattr(axs, "flatten") else [axs]

        for i in range(2):
            ax = axs[i]
            ax.clear()
            ax.axis("off")
            if idx + i < len(bigframes):
                bf = bigframes[idx + i]
                ax.set_title(bf.title, fontproperties=mono_font, fontsize=11)
                bf.render(ax, mono_font)

        figures.append(fig)
        idx += 2
    return figures


def _render_pages(pdf: PdfPages, pages: List[plt.Figure]):
    """Save each figure into *pdf* and close it."""
    for fig in pages:
        pdf.savefig(fig)
        plt.close(fig)


def create_pdf(
    df: pd.DataFrame,
    module_frames: List[Frame],
    module_bigframes: List[BigFrame],
    module_pages: List[plt.Figure],
    pdf_path: str,
    mono_font,
    base_dir: str,
    title: str,
):
    """Assemble and write the full PDF report to *pdf_path*.

    All pages are built first so the total page count is known, then each
    figure gets header/footer decoration (with "page i / total") on save.
    """
    os.makedirs(os.path.dirname(os.path.abspath(pdf_path)) or ".", exist_ok=True)

    generated_at = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")

    # 1) Collect all pages as figures (so total_pages is known up front).
    figs: List[plt.Figure] = []
    figs.extend(_build_positions_table_figs(df, base_dir=base_dir, mono_font=mono_font))
    figs.extend(_build_frame_figs(module_frames, mono_font=mono_font))
    figs.extend(_build_bigframe_figs(module_bigframes, mono_font=mono_font))
    figs.extend(module_pages)  # already-finished figures from modules

    total_pages = len(figs)

    # 2) Save with header/footer + page numbering.
    with PdfPages(pdf_path) as pdf:
        for i, fig in enumerate(figs, start=1):
            _decorate_figure(fig, mono_font, title=title, generated_at=generated_at, page=i, total_pages=total_pages)
            pdf.savefig(fig)
            plt.close(fig)


def create_bundle(archive_path: str, csv_path: str, df: pd.DataFrame, base_dir: str, pdf_path: Optional[str] = None):
    """Pack CSV, optional PDF and all receipts into a .tar.zst bundle.

    Receipt paths come from the 'Beleg' column (relative to *base_dir*).
    A plain .tar is written next to the target and compressed with the
    external ``zstd`` binary; missing receipts are reported, not fatal.
    """
    os.makedirs(os.path.dirname(os.path.abspath(archive_path)) or ".", exist_ok=True)

    # Build a temporary .tar alongside the target, compress afterwards.
    tar_path = archive_path
    if tar_path.endswith(".zst"):
        tar_path = tar_path[:-4]  # strip ".zst"
    if not tar_path.endswith(".tar"):
        tar_path = tar_path + ".tar"

    # Collect receipt paths.
    beleg_paths = []
    for p in df["Beleg"].astype(str).tolist():
        p = p.strip()
        if p:
            beleg_paths.append(p)

    with tarfile.open(tar_path, "w") as tar:
        # CSV
        tar.add(csv_path, arcname=os.path.basename(csv_path))

        # PDF, if one was generated
        if pdf_path and os.path.exists(pdf_path):
            tar.add(pdf_path, arcname=os.path.basename(pdf_path))

        # Receipts
        missing = []
        for rel in sorted(set(beleg_paths)):
            abs_path = rel if os.path.isabs(rel) else os.path.join(base_dir, rel)
            if os.path.exists(abs_path):
                # arcname: keep the relative path where possible
                arcname = os.path.basename(rel) if os.path.isabs(rel) else rel
                tar.add(abs_path, arcname=arcname)
            else:
                missing.append(rel)

    # zstd-compress -> archive_path.  "-f" is required so re-running with an
    # existing bundle does not fail (zstd refuses to overwrite otherwise and
    # check=True would raise).
    subprocess.run(["zstd", "-T0", "-f", "-o", archive_path, tar_path], check=True)

    # Remove the tar (zstd writes a compressed copy).
    try:
        os.remove(tar_path)
    except Exception:
        pass

    if missing:
        print("\n[WARN] Fehlende Belege (nicht im Bundle):")
        for m in missing:
            print(f"  - {m}")


def main():
    """CLI entry point: parse CSV, run modules, optionally emit PDF/bundle."""
    parser = argparse.ArgumentParser()
    parser.add_argument("csv", help="Pfad zur CSV-Datei")
    parser.add_argument("--title", "-t", help="Titel für PDF-Kopfzeile (optional)")
    parser.add_argument("--pdf", "-p", help="Pfad zur Ziel-PDF (optional)")
    parser.add_argument("--bundle", "-b", help="Pfad zum Bundle (.tar.zst), enthält CSV, PDF (falls erzeugt) und Belege (optional)")
    args = parser.parse_args()

    csv_path = os.path.abspath(args.csv)
    base_dir = os.path.dirname(csv_path) or "."

    title = args.title if args.title else os.path.basename(csv_path)

    df = parse_csv(csv_path)
    if df["Datum"].isna().any():
        bad = df[df["Datum"].isna()][CSV_COLUMNS]
        raise ValueError(f"Ungültige Datumsangaben in folgenden Zeilen:\n{bad}")

    want_pdf = bool(args.pdf)
    mono_font = _pick_mono_font(size=8)

    # Module registry.
    modules: Dict[str, Module] = {
        "general": GeneralModule(),
        # register further modules here later
    }

    # Map module name -> row indices, from the CSV's Modules column.
    rows_for_module: Dict[str, List[int]] = {}
    for idx, row in df.iterrows():
        for m in row["modules_list"]:
            rows_for_module.setdefault(m, []).append(idx)

    results: List[ModuleResult] = []

    # The "general" module always runs on the full frame.
    results.append(modules["general"].process(df, context={"base_dir": base_dir, "want_pdf": want_pdf, "mono_font": mono_font}))

    # Other modules run only on their assigned rows.
    for mod_name, indices in rows_for_module.items():
        if mod_name == "general":
            continue
        mod = modules.get(mod_name)
        if not mod:
            print(f"[INFO] Unbekanntes Modul '{mod_name}' – ignoriert (noch nicht registriert).")
            continue
        subdf = df.loc[indices].copy()
        results.append(mod.process(subdf, context={"base_dir": base_dir, "want_pdf": want_pdf, "mono_font": mono_font}))

    # Console summary per module.
    print("\n===== Auswertung =====")
    for r in results:
        print(r.summary_text)
        print("")

    # Optional PDF.
    if args.pdf:
        module_frames: List[Frame] = []
        module_bigframes: List[BigFrame] = []
        module_pages: List[plt.Figure] = []
        for r in results:
            module_frames.extend(r.frames)
            module_bigframes.extend(r.bigframes)
            module_pages.extend(r.pages)

        create_pdf(df, module_frames, module_bigframes, module_pages, args.pdf, mono_font, base_dir=base_dir, title=title)
        print(f"[OK] PDF geschrieben: {args.pdf}")

    # Optional bundle (CSV + PDF if generated + receipts).
    if args.bundle:
        create_bundle(args.bundle, csv_path, df, base_dir=base_dir, pdf_path=args.pdf if args.pdf else None)
        print(f"[OK] Bundle geschrieben: {args.bundle}")


if __name__ == "__main__":
    main()
