aboutsummaryrefslogtreecommitdiffstats
path: root/xembu.py
diff options
context:
space:
mode:
Diffstat (limited to 'xembu.py')
-rw-r--r--xembu.py489
1 files changed, 489 insertions, 0 deletions
diff --git a/xembu.py b/xembu.py
new file mode 100644
index 0000000..5c4c701
--- /dev/null
+++ b/xembu.py
@@ -0,0 +1,489 @@
+#!/usr/bin/env python3
+import argparse
+import os
+import tarfile
+import subprocess
+from typing import Dict, List, Optional
+
+import pandas as pd
+import matplotlib.pyplot as plt
+from matplotlib.backends.backend_pdf import PdfPages
+from matplotlib import font_manager
+
+from modules.base import Frame, BigFrame, Module, ModuleResult
+from modules.general import GeneralModule
+
+from datetime import datetime
+
# Expected column layout of the input CSV (semicolon-separated, German labels).
# Used both to validate a header row and as fallback names for header-less files.
CSV_COLUMNS = [
    "Datum",
    "Nutzer",
    "Distributionsgruppe",
    "Distributionsflag",
    "Positionsbezeichnung",
    "Positionswert",
    "Modules",
    "Parameters",
    "Beleg",
]
+
+
def _pick_mono_font(size: int = 8) -> font_manager.FontProperties:
    """Return a monospace FontProperties, preferring Inconsolata.

    Falls back through "DejaVu Sans Mono" to the generic "monospace"
    family, and finally to matplotlib's default font if nothing resolves.

    Bug fixed: FontProperties() never validates the family name, so the
    original try/except could not fail and always returned the first
    family even when it was not installed.  findfont() with
    fallback_to_default=False actually raises for unresolvable families.
    """
    for fam in ["Inconsolata", "DejaVu Sans Mono", "monospace"]:
        prop = font_manager.FontProperties(family=fam, size=size)
        try:
            # Probe the font cache; raises when the family cannot be found.
            font_manager.findfont(prop, fallback_to_default=False)
            return prop
        except Exception:
            continue
    # Nothing matched: fall back to the default font at the requested size.
    return font_manager.FontProperties(size=size)
+
def _decorate_figure(fig, mono_font, title: str, generated_at: str, page: int, total_pages: int):
    """Apply page margins plus a header line and a two-line footer to *fig*.

    Margins are 2 cm left/right and 1 cm top/bottom; extra bands keep the
    content clear of the header (title / timestamp) and the footer
    (license notice / page counter).
    """
    inch_per_cm = 1 / 2.54

    # Fixed page margins.
    side_margin_in = 2.0 * inch_per_cm
    vert_margin_in = 1.0 * inch_per_cm

    # Extra spacing bands between header/footer and the content area.
    header_band_in = 1.3 * inch_per_cm   # gap below the header
    footer_band_in = 2.0 * inch_per_cm   # gap above the two-line footer

    width_in, height_in = fig.get_size_inches()

    # Figure-relative coordinates; side margins are capped at 45 %.
    mx = min(0.45, side_margin_in / width_in)
    my = min(0.45, vert_margin_in / height_in)
    header_band = header_band_in / height_in
    footer_band = footer_band_in / height_in

    content_top = 1 - my - header_band
    content_bottom = my + footer_band
    if content_top <= content_bottom:
        # Degenerate geometry: drop the extra bands, keep bare margins.
        content_top = 1 - my
        content_bottom = my

    fig.subplots_adjust(left=mx, right=1 - mx, top=content_top, bottom=content_bottom)

    # Header and footer anchor on the inner edges of the margins.
    fig.text(mx, 1 - my, title, ha="left", va="top", fontproperties=mono_font, fontsize=9)
    fig.text(1 - mx, 1 - my, generated_at, ha="right", va="top", fontproperties=mono_font, fontsize=9)

    # Left footer (multi-line license notice).
    footer_left = (
        "xembu - eXtensiblE Multiuser Bookkeeping Utility\n"
        "Copyright (C) 2024 Leonard Kugis\n"
        "This program comes with ABSOLUTELY NO WARRANTY; for details see LICENSE.txt"
    )
    fig.text(mx, my, footer_left, ha="left", va="bottom",
             fontproperties=mono_font, fontsize=7, linespacing=1.1)

    # Right footer: page counter.
    fig.text(1 - mx, my, f"{page} / {total_pages}", ha="right", va="bottom",
             fontproperties=mono_font, fontsize=8)
+
def _read_csv_flexible(path: str) -> pd.DataFrame:
    """Read a semicolon-separated CSV, with or without a header row.

    First assume the file begins with a header; if the expected columns
    are not all present, re-read it as header-less data named after
    CSV_COLUMNS.
    """
    frame = pd.read_csv(path, sep=";", encoding="utf-8", header=0)
    if set(CSV_COLUMNS) <= set(frame.columns):
        return frame
    return pd.read_csv(path, sep=";", encoding="utf-8", header=None, names=CSV_COLUMNS)
+
+
def parse_value_unit(s: str):
    """Split a position value like '12,5 €' into a (float, unit) pair.

    Decimal commas are normalized to dots and any '€' sign is stripped
    from the numeric part.  Missing or empty input yields (0.0, "").
    When the text has at least two whitespace-separated tokens, the last
    token is taken as the unit.
    """
    if s is None or (isinstance(s, float) and pd.isna(s)):
        return 0.0, ""
    text = str(s).strip()
    if not text:
        return 0.0, ""

    def to_number(raw: str) -> float:
        # Normalize German decimal comma and drop a euro sign if present.
        return float(raw.replace(",", ".").replace("€", "").strip())

    tokens = text.split()
    if len(tokens) < 2:
        return to_number(text), ""
    return to_number(" ".join(tokens[:-1])), tokens[-1].strip()
+
+
def parse_modules_list(s: str) -> List[str]:
    """Parse a comma-separated module-names cell into a list of strings.

    Missing values (None/NaN) yield an empty list; blank entries between
    commas are dropped.
    """
    if s is None or (isinstance(s, float) and pd.isna(s)):
        return []
    return [token for token in (part.strip() for part in str(s).split(",")) if token]
+
+
def parse_groups_list(s: str) -> List[str]:
    """Parse a comma-separated distribution-groups cell into a list.

    Missing values (None/NaN) yield an empty list; blank entries between
    commas are dropped.
    """
    if s is None or (isinstance(s, float) and pd.isna(s)):
        return []
    groups: List[str] = []
    for part in str(s).split(","):
        part = part.strip()
        if part:
            groups.append(part)
    return groups
+
+
def parse_parameters_list(s: str) -> List[tuple]:
    """Parse a cell like '(1, 2.5) (a, b)' into a list of tuples.

    Each parenthesized group becomes one tuple.  Elements are coerced to
    int when they carry no dot, then to float, and kept as raw strings
    when neither conversion succeeds.  Missing/empty input yields [].
    """
    if s is None or (isinstance(s, float) and pd.isna(s)):
        return []
    text = str(s).strip()
    if not text:
        return []
    import re

    def coerce(token: str):
        # int for dot-less tokens, otherwise float; fall back to float
        # (covers e.g. scientific notation), then to the raw string.
        try:
            return float(token) if "." in token else int(token)
        except Exception:
            try:
                return float(token)
            except Exception:
                return token

    result: List[tuple] = []
    for match in re.finditer(r"\(([^()]*)\)", text):
        inner = match.group(1).strip()
        if not inner:
            result.append(tuple())
            continue
        tokens = [t.strip() for t in inner.split(",") if t.strip()]
        result.append(tuple(coerce(t) for t in tokens))
    return result
+
+
def parse_csv(path: str) -> pd.DataFrame:
    """Load and normalize the bookkeeping CSV into a typed DataFrame.

    Adds derived columns: dist_groups / modules_list / params_list
    (parsed list values) plus value / unit (numeric amount and its unit
    string).  Unparseable dates become NaT; missing receipts become "".
    """
    df = _read_csv_flexible(path)

    # Normalize the raw text columns.
    df["Datum"] = pd.to_datetime(df["Datum"], format="%Y-%m-%d-%H-%M-%S", errors="coerce")
    for col in ("Nutzer", "Positionsbezeichnung"):
        df[col] = df[col].astype(str).str.strip()
    df["Distributionsflag"] = df["Distributionsflag"].astype(str).str.strip().str.upper()

    # Structured columns parsed out of their textual representations.
    df["dist_groups"] = df["Distributionsgruppe"].apply(parse_groups_list)
    df["modules_list"] = df["Modules"].apply(parse_modules_list)
    df["params_list"] = df["Parameters"].apply(parse_parameters_list)

    # Split '12,5 €'-style cells into a numeric amount and a unit.
    parsed = df["Positionswert"].apply(parse_value_unit)
    df["value"] = parsed.apply(lambda pair: pair[0])
    df["unit"] = parsed.apply(lambda pair: pair[1])

    # Missing receipt references become empty strings.
    df["Beleg"] = df["Beleg"].where(df["Beleg"].notna(), "")
    return df
+
+
def compute_hash(filepath: str, base_dir: str = ".") -> Optional[str]:
    """Return the SHA-1 hex digest of *filepath*, resolved against *base_dir*.

    Returns None for empty paths and for files that cannot be opened or
    read, so a missing receipt never aborts report generation.

    Improvements: the file is hashed in chunks instead of being read
    whole into memory, and only OSError is swallowed (the original broad
    `except Exception` could hide genuine programming errors).
    """
    import hashlib

    if not filepath:
        return None
    # os.path.join ignores base_dir when filepath is absolute.
    full_path = os.path.join(base_dir, filepath)
    digest = hashlib.sha1()
    try:
        # Stream in 64 KiB chunks so large receipts stay cheap to hash.
        with open(full_path, "rb") as f:
            for chunk in iter(lambda: f.read(65536), b""):
                digest.update(chunk)
    except OSError:
        return None
    return digest.hexdigest()
+
def _build_positions_table_figs(df: pd.DataFrame, base_dir: str, mono_font):
    """Render the raw positions as A4 table pages and return the figures.

    Rows are sorted by date, 16 per page.  Each receipt ('Beleg') gets a
    SHA-1 digest so the printed report can be checked against the bundled
    files.  Column widths are sized from the widest rendered cell text.
    """
    figures = []

    columns = [
        "Datum", "Nutzer", "Distributionsgruppe", "Flag",
        "Positionsbezeichnung", "Positionswert",
        "Modules", "Parameters", "Beleg", "SHA1",
    ]

    table_data = []
    for _, row in df.sort_values("Datum").iterrows():
        # Hash the receipt file (if any); None when missing or unreadable.
        sha1 = compute_hash(str(row["Beleg"]), base_dir=base_dir) if row["Beleg"] else None
        sha1_fmt = ""
        if sha1:
            # Split the 40-char digest onto two lines to keep the column narrow.
            sha1_fmt = sha1[: len(sha1) // 2] + "\n" + sha1[len(sha1) // 2 :]

        # Prefer the pre-parsed list columns; fall back to the raw cells.
        groups_str = ", ".join(row["dist_groups"]) if isinstance(row["dist_groups"], list) else str(row["Distributionsgruppe"])
        mods_str = ", ".join(row["modules_list"]) if isinstance(row["modules_list"], list) else str(row["Modules"])
        params_str = str(row["params_list"]) if isinstance(row["params_list"], list) else str(row["Parameters"])

        table_data.append([
            row["Datum"].strftime("%Y-%m-%d %H:%M:%S") if pd.notna(row["Datum"]) else "INVALID",
            row["Nutzer"],
            groups_str,
            row["Distributionsflag"],
            row["Positionsbezeichnung"],
            f"{row['value']:.4f} {row['unit']}".strip(),
            mods_str,
            params_str,
            str(row["Beleg"]) if row["Beleg"] else "",
            sha1_fmt,
        ])

    # 16 data rows per A4 page.
    chunk_size = 16
    # NOTE(review): widths are measured with this default-family font, but the
    # cells are later rendered with mono_font — sizing may be slightly off for
    # a true monospace face; confirm whether fontprops should be mono_font.
    fontprops = font_manager.FontProperties(size=8)

    for start in range(0, len(table_data), chunk_size):
        fig, ax = plt.subplots(figsize=(8.27, 11.69))  # A4 portrait, in inches
        ax.axis("off")
        chunk = table_data[start:start + chunk_size]

        # NOTE(review): get_renderer() assumes a backend that exposes it
        # (e.g. Agg) — confirm for the deployment backend.
        renderer = fig.canvas.get_renderer()

        def get_text_width(text, prop):
            # Measure the rendered pixel width via a temporary probe text.
            t = plt.text(0, 0, str(text), fontproperties=prop)
            bb = t.get_window_extent(renderer=renderer)
            t.remove()
            return bb.width

        # Widest cell per column (header included), in display pixels.
        col_widths = []
        for col_idx in range(len(columns)):
            max_w = get_text_width(columns[col_idx], fontprops)
            for r in chunk:
                max_w = max(max_w, get_text_width(r[col_idx], fontprops))
            col_widths.append(max_w)

        # Normalize pixel widths to fractions of the total table width.
        col_widths_inches = [w / fig.dpi for w in col_widths]
        total_w = sum(col_widths_inches) if sum(col_widths_inches) else 1.0
        scaled = [w / total_w for w in col_widths_inches]

        table = ax.table(cellText=chunk, colLabels=columns, loc="center", cellLoc="left")
        table.auto_set_font_size(False)

        # Monospace font and a small uniform padding for every cell.
        for cell in table.get_celld().values():
            cell.get_text().set_fontproperties(mono_font)
            cell.PAD = 0.03

        # Apply the measured relative widths column by column.
        for (_, c), cell in table.get_celld().items():
            if c < len(scaled):
                cell.set_width(scaled[c])

        table.scale(1, 2.0)  # double the row height for readability
        figures.append(fig)

    return figures
+
+
def _separator_page(pdf: PdfPages, title: str, mono_font):
    """Append a single A4 page with *title* centered to *pdf* and close it."""
    fig = plt.figure(figsize=(8.27, 11.69))
    ax = fig.add_subplot(111)
    ax.axis("off")
    ax.text(0.5, 0.5, title, ha="center", va="center",
            fontproperties=mono_font, fontsize=18)
    pdf.savefig(fig)
    plt.close(fig)
+
def _build_frame_figs(frames: List[Frame], mono_font):
    """Render *frames* six per page (3x2 grid on A4) and return the figures.

    Frames fill the grid in reading order; unused cells on the last page
    stay blank (axes hidden).

    Bug fixed: the title was set three times — the second set_title()
    dropped the mono font, and the trailing set_fontproperties() then
    clobbered the intended 10pt size with mono_font's own size.  A single
    set_title() with both fontproperties and fontsize yields the intended
    monospace 10pt title.
    """
    figures = []
    for start in range(0, len(frames), 6):
        fig, axs = plt.subplots(3, 2, figsize=(8.27, 11.69))
        axs = axs.flatten()
        for i, ax in enumerate(axs):
            ax.axis("off")
            if start + i < len(frames):
                fr = frames[start + i]
                # One call: mono font at the intended 10pt size.
                ax.set_title(fr.title, fontproperties=mono_font, fontsize=10)
                fr.render(ax, mono_font)
        figures.append(fig)
    return figures
+
def _build_bigframe_figs(bigframes: List[BigFrame], mono_font):
    """Render *bigframes* two per page (2x1 on A4) and return the figures.

    Unused cells on the last page stay blank (axes hidden).

    Bug fixed: as in _build_frame_figs, the title was set three times and
    ended up at mono_font's own size instead of the intended 11pt; a
    single set_title() with fontproperties and fontsize fixes this.
    """
    figures = []
    for start in range(0, len(bigframes), 2):
        fig, axs = plt.subplots(2, 1, figsize=(8.27, 11.69))
        # With a 2x1 grid axs is an ndarray; keep the defensive fallback.
        axs = axs.flatten() if hasattr(axs, "flatten") else [axs]
        for i, ax in enumerate(axs):
            ax.axis("off")
            if start + i < len(bigframes):
                bf = bigframes[start + i]
                # One call: mono font at the intended 11pt size.
                ax.set_title(bf.title, fontproperties=mono_font, fontsize=11)
                bf.render(ax, mono_font)
        figures.append(fig)
    return figures
+
def _render_pages(pdf: PdfPages, pages: List[plt.Figure]):
    """Save each figure in *pages* to *pdf*, then release its memory.

    NOTE(review): not referenced elsewhere in this file — presumably kept
    as a convenience for module authors; confirm before removing.
    """
    for page_fig in pages:
        pdf.savefig(page_fig)
        plt.close(page_fig)
+
def create_pdf(
    df: pd.DataFrame,
    module_frames: List[Frame],
    module_bigframes: List[BigFrame],
    module_pages: List[plt.Figure],
    pdf_path: str,
    mono_font,
    base_dir: str,
    title: str,
):
    """Write the full report PDF: positions table, frames, big frames, module pages.

    All pages are collected first so each one can be stamped with a
    correct "page / total" counter before anything is written.
    """
    os.makedirs(os.path.dirname(os.path.abspath(pdf_path)) or ".", exist_ok=True)

    generated_at = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")

    # 1) Collect every page up front (total page count needed for footers).
    figs: List[plt.Figure] = [
        *_build_positions_table_figs(df, base_dir=base_dir, mono_font=mono_font),
        *_build_frame_figs(module_frames, mono_font=mono_font),
        *_build_bigframe_figs(module_bigframes, mono_font=mono_font),
        *module_pages,  # figures produced directly by the modules
    ]

    total_pages = len(figs)

    # 2) Stamp header/footer (incl. page counter) and write everything out.
    with PdfPages(pdf_path) as pdf:
        for page_no, fig in enumerate(figs, start=1):
            _decorate_figure(fig, mono_font, title=title, generated_at=generated_at,
                             page=page_no, total_pages=total_pages)
            pdf.savefig(fig)
            plt.close(fig)
+
def create_bundle(archive_path: str, csv_path: str, df: pd.DataFrame, base_dir: str, pdf_path: Optional[str] = None):
    """Create a .tar.zst bundle with the CSV, the PDF (if any) and all receipts.

    Receipt paths come from the 'Beleg' column and are resolved relative
    to *base_dir*; compression is delegated to the external `zstd` binary.
    Missing receipts are reported on stdout but do not abort bundling.

    Bug fixed: the intermediate .tar used to leak when tar creation or
    the zstd call (check=True) raised — cleanup now runs in a finally.
    """
    os.makedirs(os.path.dirname(os.path.abspath(archive_path)) or ".", exist_ok=True)

    # Build an uncompressed .tar next to the target, compress it afterwards.
    tar_path = archive_path
    if tar_path.endswith(".zst"):
        tar_path = tar_path[:-4]  # strip ".zst"
    if not tar_path.endswith(".tar"):
        tar_path = tar_path + ".tar"

    # Collect the non-empty receipt references.
    beleg_paths = [p.strip() for p in df["Beleg"].astype(str).tolist() if p.strip()]

    missing = []
    try:
        with tarfile.open(tar_path, "w") as tar:
            # CSV is always included.
            tar.add(csv_path, arcname=os.path.basename(csv_path))

            # PDF only when it was actually produced.
            if pdf_path and os.path.exists(pdf_path):
                tar.add(pdf_path, arcname=os.path.basename(pdf_path))

            # Receipts, deduplicated; keep relative paths inside the archive.
            for rel in sorted(set(beleg_paths)):
                abs_path = rel if os.path.isabs(rel) else os.path.join(base_dir, rel)
                if os.path.exists(abs_path):
                    arcname = os.path.basename(rel) if os.path.isabs(rel) else rel
                    tar.add(abs_path, arcname=arcname)
                else:
                    missing.append(rel)

        # Compress to archive_path; zstd keeps the input .tar in place.
        subprocess.run(["zstd", "-T0", "-o", archive_path, tar_path], check=True)
    finally:
        # Remove the intermediate tar even if tar creation or zstd failed.
        try:
            os.remove(tar_path)
        except OSError:
            pass

    if missing:
        print("\n[WARN] Fehlende Belege (nicht im Bundle):")
        for m in missing:
            print(f" - {m}")
+
+
def main():
    """CLI entry point: parse the CSV, run the modules, optionally emit PDF/bundle."""
    parser = argparse.ArgumentParser()
    parser.add_argument("csv", help="Pfad zur CSV-Datei")
    parser.add_argument("--title", "-t", help="Titel für PDF-Kopfzeile (optional)")
    parser.add_argument("--pdf", "-p", help="Pfad zur Ziel-PDF (optional)")
    parser.add_argument("--bundle", "-b", help="Pfad zum Bundle (.tar.zst), enthält CSV, PDF (falls erzeugt) und Belege (optional)")
    args = parser.parse_args()

    csv_path = os.path.abspath(args.csv)
    # Receipt paths in the CSV are resolved relative to the CSV's directory.
    base_dir = os.path.dirname(csv_path) or "."

    # The PDF header title defaults to the CSV file name.
    title = args.title if args.title else os.path.basename(csv_path)

    df = parse_csv(csv_path)
    # Abort early on unparseable dates (parse_csv coerces them to NaT).
    if df["Datum"].isna().any():
        bad = df[df["Datum"].isna()][CSV_COLUMNS]
        raise ValueError(f"Ungültige Datumsangaben in folgenden Zeilen:\n{bad}")

    want_pdf = bool(args.pdf)
    mono_font = _pick_mono_font(size=8)

    # Module registry: name -> processor instance.
    modules: Dict[str, Module] = {
        "general": GeneralModule(),
        # register additional modules here later
    }

    # Module assignment from the CSV: module name -> list of row indices.
    rows_for_module: Dict[str, List[int]] = {}
    for idx, row in df.iterrows():
        for m in row["modules_list"]:
            rows_for_module.setdefault(m, []).append(idx)

    results: List[ModuleResult] = []

    # The 'general' module always processes the complete DataFrame.
    results.append(modules["general"].process(df, context={"base_dir": base_dir, "want_pdf": want_pdf, "mono_font": mono_font}))

    # Other modules run only on the rows that reference them.
    for mod_name, indices in rows_for_module.items():
        if mod_name == "general":
            continue
        mod = modules.get(mod_name)
        if not mod:
            print(f"[INFO] Unbekanntes Modul '{mod_name}' – ignoriert (noch nicht registriert).")
            continue
        subdf = df.loc[indices].copy()
        results.append(mod.process(subdf, context={"base_dir": base_dir, "want_pdf": want_pdf, "mono_font": mono_font}))

    # Console summary, one block per module result.
    print("\n===== Auswertung =====")
    for r in results:
        print(r.summary_text)
        print("")

    # PDF output is optional.
    if args.pdf:
        module_frames: List[Frame] = []
        module_bigframes: List[BigFrame] = []
        module_pages: List[plt.Figure] = []
        # Collect all renderables produced by the module results.
        for r in results:
            module_frames.extend(r.frames)
            module_bigframes.extend(r.bigframes)
            module_pages.extend(r.pages)

        create_pdf(df, module_frames, module_bigframes, module_pages, args.pdf, mono_font, base_dir=base_dir, title=title)
        print(f"[OK] PDF geschrieben: {args.pdf}")

    # Bundle is optional (contains CSV + PDF if generated + receipts).
    if args.bundle:
        create_bundle(args.bundle, csv_path, df, base_dir=base_dir, pdf_path=args.pdf if args.pdf else None)
        print(f"[OK] Bundle geschrieben: {args.bundle}")
+
+
# Script entry point.
if __name__ == "__main__":
    main()
+