#!/usr/bin/env python3
"""xembu - eXtensible Event-based Multiuser Bookkeeping Utility.

Command-line entry point: reads a semicolon-separated bookkeeping CSV, runs
the registered evaluation modules over it, prints their summaries and
optionally writes a PDF report and a ``.tar.zst`` bundle containing the CSV,
the PDF and all referenced receipts.
"""
import argparse
import hashlib
import logging
import os
import re
import subprocess
import sys
import tarfile
from datetime import datetime
from typing import Dict, List, Optional

import matplotlib.pyplot as plt
import pandas as pd
from matplotlib import font_manager
from matplotlib.backends.backend_pdf import PdfPages

from modules.base import Frame, BigFrame, Module, ModuleResult
from modules.general import GeneralModule
from modules.drug import DrugModule

# Column order used when the CSV has no header row.
CSV_COLUMNS = [
    "date",
    "debitor",
    "group",
    "group_flag",
    "position",
    "value",
    "modules",
    "parameters",
    "receipt",
]

# Page size (inches) used for every generated figure.
_A4_PORTRAIT_IN = (8.27, 11.69)

# Matches one parenthesised, non-nested group such as "(1, 2.5, foo)".
_PARAM_TUPLE_RE = re.compile(r"\(([^()]*)\)")

logger = logging.getLogger("xembu")


def setup_logging(verbose: int = 0):
    """Configure the ``xembu`` logger to write to stderr.

    Args:
        verbose: 0 (or less) -> ERROR, 1 -> WARNING, 2 -> INFO, 3+ -> DEBUG.
    """
    level = logging.ERROR
    if verbose == 1:
        level = logging.WARNING
    elif verbose == 2:
        level = logging.INFO
    elif verbose >= 3:
        level = logging.DEBUG

    logger.setLevel(level)
    handler = logging.StreamHandler(sys.stderr)
    handler.setLevel(level)
    handler.setFormatter(logging.Formatter("[%(levelname)s] %(message)s"))
    # Replace (not stack) handlers so repeated calls do not duplicate output.
    logger.handlers.clear()
    logger.addHandler(handler)


def _pick_mono_font(size: int = 8) -> font_manager.FontProperties:
    """Return font properties for the first available monospace family.

    Constructing ``FontProperties`` never fails for an unknown family, so
    availability is probed with ``findfont(..., fallback_to_default=False)``,
    which raises ``ValueError`` when the family cannot be resolved.

    Args:
        size: Font size in points.

    Returns:
        Properties for the first resolvable candidate family, or default
        properties of the given size if none resolves.
    """
    for fam in ("Inconsolata", "DejaVu Sans Mono", "monospace"):
        props = font_manager.FontProperties(family=fam, size=size)
        try:
            font_manager.findfont(props, fallback_to_default=False)
        except ValueError:
            continue
        return props
    return font_manager.FontProperties(size=size)


def _decorate_figure(fig, mono_font, title: str, generated_at: str, page: int, total_pages: int):
    """Apply page margins and draw the header and footer onto ``fig``.

    The header shows ``title`` (left) and ``generated_at`` (right); the footer
    shows the program notice (left) and ``page / total_pages`` (right).
    """
    margin_lr_cm = 2.0
    margin_tb_cm = 1.0
    header_gap_cm = 1.3
    footer_gap_cm = 2.0

    cm_to_in = 1 / 2.54
    margin_lr_in = margin_lr_cm * cm_to_in
    margin_tb_in = margin_tb_cm * cm_to_in
    header_gap_in = header_gap_cm * cm_to_in
    footer_gap_in = footer_gap_cm * cm_to_in

    # Convert absolute sizes into figure fractions; cap margins at 45 %.
    w_in, h_in = fig.get_size_inches()
    mx = min(0.45, margin_lr_in / w_in)
    my = min(0.45, margin_tb_in / h_in)
    header_gap = header_gap_in / h_in
    footer_gap = footer_gap_in / h_in

    top = 1 - my - header_gap
    bottom = my + footer_gap
    if top <= bottom:
        # Figure too small for the header/footer gaps: drop them.
        top = 1 - my
        bottom = my

    fig.subplots_adjust(left=mx, right=1 - mx, top=top, bottom=bottom)

    left_x = mx
    right_x = 1 - mx
    header_y = 1 - my
    footer_y = my

    fig.text(left_x, header_y, title, ha="left", va="top", fontproperties=mono_font, fontsize=9)
    fig.text(right_x, header_y, generated_at, ha="right", va="top", fontproperties=mono_font, fontsize=9)

    footer_left = (
        "xembu - eXtensible Event-based Multiuser Bookkeeping Utility\n"
        "Copyright (C) 2024 Leonard Kugis\n"
        "This program comes with ABSOLUTELY NO WARRANTY; for details see LICENSE.txt"
    )
    fig.text(left_x, footer_y, footer_left, ha="left", va="bottom",
             fontproperties=mono_font, fontsize=7, linespacing=1.1)
    fig.text(right_x, footer_y, f"{page} / {total_pages}", ha="right", va="bottom",
             fontproperties=mono_font, fontsize=8)


def _read_csv_flexible(path: str) -> pd.DataFrame:
    """Read a ``;``-separated CSV with or without a header row.

    The file is first read assuming a header; if the expected columns are not
    all present it is re-read as header-less using ``CSV_COLUMNS`` as names.
    """
    df = pd.read_csv(path, sep=";", encoding="utf-8", header=0)
    if not set(CSV_COLUMNS).issubset(set(df.columns)):
        df = pd.read_csv(path, sep=";", encoding="utf-8", header=None, names=CSV_COLUMNS)
    return df


def _is_missing(value) -> bool:
    """Return True for ``None`` and float NaN (how pandas marks empty cells)."""
    return value is None or (isinstance(value, float) and pd.isna(value))


def _to_float(text: str) -> float:
    """Parse a number that may use a decimal comma and/or contain a euro sign."""
    return float(text.strip().replace(",", ".").replace("€", "").strip())


def parse_value_unit(s: str):
    """Split a value cell such as ``"12,50 EUR"`` into ``(number, unit)``.

    The last whitespace-separated token is taken as the unit when there are at
    least two tokens; a single token is parsed as a unit-less number.

    Returns:
        ``(float, str)``; ``(0.0, "")`` for missing or blank input.

    Raises:
        ValueError: If the numeric part cannot be parsed as a float.
    """
    if _is_missing(s):
        return 0.0, ""
    txt = str(s).strip()
    if not txt:
        return 0.0, ""

    parts = txt.split()
    if len(parts) < 2:
        return _to_float(txt), ""
    return _to_float(" ".join(parts[:-1])), parts[-1].strip()


def _split_comma_list(s) -> List[str]:
    """Split a comma-separated cell into stripped, non-empty items."""
    if _is_missing(s):
        return []
    return [item for item in (part.strip() for part in str(s).split(",")) if item]


def parse_modules_list(s: str) -> List[str]:
    """Return the module names listed (comma-separated) in a ``modules`` cell."""
    return _split_comma_list(s)


def parse_groups_list(s: str) -> List[str]:
    """Return the group names listed (comma-separated) in a ``group`` cell."""
    return _split_comma_list(s)


def _coerce_scalar(token: str):
    """Convert ``token`` to int if possible, else float, else leave it a str."""
    try:
        return int(token)
    except ValueError:
        pass
    try:
        return float(token)
    except ValueError:
        return token


def parse_parameters_list(s: str) -> List[tuple]:
    """Parse a ``parameters`` cell like ``"(1, 2.5), (), (foo)"`` into tuples.

    Every non-nested parenthesised group becomes one tuple; its comma-separated
    items are converted to int or float where possible and kept as strings
    otherwise. Empty items are dropped; ``()`` yields an empty tuple.

    Returns:
        A list of tuples, empty for missing or blank input.
    """
    if _is_missing(s):
        return []
    txt = str(s).strip()
    if not txt:
        return []

    tuples = []
    for match in _PARAM_TUPLE_RE.finditer(txt):
        inner = match.group(1).strip()
        tokens = [tok.strip() for tok in inner.split(",") if tok.strip() != ""]
        tuples.append(tuple(_coerce_scalar(tok) for tok in tokens))
    return tuples


def parse_csv(path: str) -> pd.DataFrame:
    """Load the bookkeeping CSV and add normalised/derived columns.

    Dates are parsed with format ``%Y-%m-%d-%H-%M-%S``; unparsable dates become
    NaT. Added columns: ``dist_groups``, ``modules_list``, ``params_list``,
    ``val`` and ``unit``. Missing receipts are replaced by ``""``.

    Raises:
        ValueError: If a ``value`` cell holds an unparsable number.
    """
    df = _read_csv_flexible(path)
    df["date"] = pd.to_datetime(df["date"], format="%Y-%m-%d-%H-%M-%S", errors="coerce")
    df["debitor"] = df["debitor"].astype(str).str.strip()
    df["group_flag"] = df["group_flag"].astype(str).str.strip().str.upper()
    df["position"] = df["position"].astype(str).str.strip()

    df["dist_groups"] = df["group"].apply(parse_groups_list)
    df["modules_list"] = df["modules"].apply(parse_modules_list)
    df["params_list"] = df["parameters"].apply(parse_parameters_list)

    vals_units = df["value"].apply(parse_value_unit)
    df["val"] = vals_units.apply(lambda x: x[0])
    df["unit"] = vals_units.apply(lambda x: x[1])

    df["receipt"] = df["receipt"].where(df["receipt"].notna(), "")
    return df


def compute_hash(filepath: str, base_dir: str = ".") -> Optional[str]:
    """Return the SHA-1 hex digest of ``base_dir/filepath``.

    Returns:
        The digest, or ``None`` if ``filepath`` is empty or the file cannot be
        read (the failure is logged as a warning instead of being swallowed).
    """
    if not filepath:
        return None
    try:
        full_path = os.path.join(base_dir, filepath)
        digest = hashlib.sha1()
        with open(full_path, "rb") as f:
            # Hash in chunks so large receipts are not loaded into memory.
            for chunk in iter(lambda: f.read(65536), b""):
                digest.update(chunk)
    except (OSError, TypeError, ValueError) as exc:
        logger.warning("Could not hash receipt %s: %s", filepath, exc)
        return None
    return digest.hexdigest()


def _text_width_px(ax, renderer, text, prop) -> float:
    """Measure the rendered width (pixels) of ``text`` using a temporary artist."""
    artist = ax.text(0, 0, str(text), fontproperties=prop)
    width = artist.get_window_extent(renderer=renderer).width
    artist.remove()
    return width


def _build_positions_table_figs(df: pd.DataFrame, base_dir: str, mono_font):
    """Render all CSV rows (sorted by date) as table pages of 16 rows each.

    Column widths are proportional to the widest cell of each column, measured
    with ``mono_font`` - the same font the cells are rendered in.

    Returns:
        A list of matplotlib figures, one per page (empty if ``df`` is empty).
    """
    figures = []
    columns = [
        "Date", "Debitor", "Group", "Flag", "Position",
        "Value", "Modules", "Parameters", "Receipt", "SHA1",
    ]

    table_data = []
    for _, row in df.sort_values("date").iterrows():
        sha1 = compute_hash(str(row["receipt"]), base_dir=base_dir) if row["receipt"] else None
        sha1_fmt = ""
        if sha1:
            # Break the digest into two lines to keep the column narrow.
            half = len(sha1) // 2
            sha1_fmt = sha1[:half] + "\n" + sha1[half:]

        groups_str = ", ".join(row["dist_groups"]) if isinstance(row["dist_groups"], list) else str(row["group"])
        mods_str = ", ".join(row["modules_list"]) if isinstance(row["modules_list"], list) else str(row["modules"])
        params_str = str(row["params_list"]) if isinstance(row["params_list"], list) else str(row["parameters"])

        table_data.append([
            row["date"].strftime("%Y-%m-%d %H:%M:%S") if pd.notna(row["date"]) else "INVALID",
            row["debitor"],
            groups_str,
            row["group_flag"],
            row["position"],
            f"{row['val']:.4f} {row['unit']}".strip(),
            mods_str,
            params_str,
            str(row["receipt"]) if row["receipt"] else "",
            sha1_fmt,
        ])

    chunk_size = 16
    for start in range(0, len(table_data), chunk_size):
        fig, ax = plt.subplots(figsize=_A4_PORTRAIT_IN)
        ax.axis("off")
        chunk = table_data[start:start + chunk_size]

        renderer = fig.canvas.get_renderer()
        col_widths = []
        for col_idx, header in enumerate(columns):
            max_w = _text_width_px(ax, renderer, header, mono_font)
            for r in chunk:
                max_w = max(max_w, _text_width_px(ax, renderer, r[col_idx], mono_font))
            col_widths.append(max_w)

        col_widths_inches = [w / fig.dpi for w in col_widths]
        total_w = sum(col_widths_inches) or 1.0
        scaled = [w / total_w for w in col_widths_inches]

        table = ax.table(cellText=chunk, colLabels=columns, loc="center", cellLoc="left")
        table.auto_set_font_size(False)
        for cell in table.get_celld().values():
            cell.get_text().set_fontproperties(mono_font)
            cell.PAD = 0.03
        for (_, c), cell in table.get_celld().items():
            if c < len(scaled):
                cell.set_width(scaled[c])
        table.scale(1, 2.0)

        figures.append(fig)
    return figures


def _separator_page(pdf: PdfPages, title: str, mono_font):
    """Write a page containing only the centred ``title`` to ``pdf``."""
    fig, ax = plt.subplots(figsize=_A4_PORTRAIT_IN)
    ax.axis("off")
    ax.text(0.5, 0.5, title, ha="center", va="center", fontproperties=mono_font, fontsize=18)
    pdf.savefig(fig)
    plt.close(fig)


def _build_grid_figs(items, rows: int, cols: int, title_size: int, mono_font):
    """Lay ``items`` out on pages of ``rows`` x ``cols`` axes, in row-major order.

    Each item must expose ``title`` and ``render(ax, mono_font)``. Unused axes
    on the last page stay blank.
    """
    per_page = rows * cols
    figures = []
    for start in range(0, len(items), per_page):
        fig, axs = plt.subplots(rows, cols, figsize=_A4_PORTRAIT_IN, squeeze=False)
        page_items = items[start:start + per_page]
        for slot, ax in enumerate(axs.flatten()):
            ax.axis("off")
            if slot < len(page_items):
                item = page_items[slot]
                # One call sets family and size together; a later
                # set_fontproperties() would replace the size again.
                ax.set_title(item.title, fontproperties=mono_font, fontsize=title_size)
                item.render(ax, mono_font)
        figures.append(fig)
    return figures


def _build_frame_figs(frames: List[Frame], mono_font):
    """Render frames six per page (3 x 2 grid) and return the figures."""
    return _build_grid_figs(frames, rows=3, cols=2, title_size=10, mono_font=mono_font)


def _build_bigframe_figs(bigframes: List[BigFrame], mono_font):
    """Render big frames two per page (2 x 1 grid) and return the figures."""
    return _build_grid_figs(bigframes, rows=2, cols=1, title_size=11, mono_font=mono_font)


def _render_pages(pdf: PdfPages, pages: List[plt.Figure]):
    """Save every figure in ``pages`` to ``pdf`` and close it."""
    for fig in pages:
        pdf.savefig(fig)
        plt.close(fig)


def create_pdf(
    df: pd.DataFrame,
    module_frames: List[Frame],
    module_bigframes: List[BigFrame],
    module_pages: List[plt.Figure],
    pdf_path: str,
    mono_font,
    base_dir: str,
    title: str,
):
    """Write the PDF report to ``pdf_path``.

    Page order: positions table, module frames, module big frames, then the
    module-supplied pages. Every page gets the common header and footer.
    The target directory is created if needed.
    """
    os.makedirs(os.path.dirname(os.path.abspath(pdf_path)) or ".", exist_ok=True)
    generated_at = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")

    # Collect all figures first so the total page count is known for the footer.
    figs: List[plt.Figure] = []
    figs.extend(_build_positions_table_figs(df, base_dir=base_dir, mono_font=mono_font))
    figs.extend(_build_frame_figs(module_frames, mono_font=mono_font))
    figs.extend(_build_bigframe_figs(module_bigframes, mono_font=mono_font))
    figs.extend(module_pages)

    total_pages = len(figs)
    with PdfPages(pdf_path) as pdf:
        for i, fig in enumerate(figs, start=1):
            _decorate_figure(fig, mono_font, title=title, generated_at=generated_at,
                             page=i, total_pages=total_pages)
            pdf.savefig(fig)
            plt.close(fig)


def _intermediate_tar_path(archive_path: str) -> str:
    """Derive the temporary ``.tar`` path for a bundle at ``archive_path``.

    The result is guaranteed to differ from ``archive_path`` so that zstd never
    reads from and writes to the same file.
    """
    tar_path = archive_path[:-4] if archive_path.endswith(".zst") else archive_path
    if not tar_path.endswith(".tar"):
        tar_path += ".tar"
    if os.path.abspath(tar_path) == os.path.abspath(archive_path):
        tar_path += ".tmp"
    return tar_path


def create_bundle(archive_path: str, csv_path: str, df: pd.DataFrame, base_dir: str,
                  pdf_path: Optional[str] = None):
    """Pack the CSV, the optional PDF and all receipts into a zstd-compressed tar.

    A temporary tar file is written next to ``archive_path``, compressed with
    the external ``zstd`` program and removed afterwards (also on failure).
    Receipts that do not exist are skipped and listed on stdout.

    Raises:
        subprocess.CalledProcessError: If ``zstd`` exits with a non-zero status.
        FileNotFoundError: If the ``zstd`` executable is not available.
    """
    os.makedirs(os.path.dirname(os.path.abspath(archive_path)) or ".", exist_ok=True)
    tar_path = _intermediate_tar_path(archive_path)

    beleg_paths = [p.strip() for p in df["receipt"].astype(str).tolist() if p.strip()]

    missing = []
    try:
        with tarfile.open(tar_path, "w") as tar:
            tar.add(csv_path, arcname=os.path.basename(csv_path))
            if pdf_path and os.path.exists(pdf_path):
                tar.add(pdf_path, arcname=os.path.basename(pdf_path))

            for rel in sorted(set(beleg_paths)):
                abs_path = rel if os.path.isabs(rel) else os.path.join(base_dir, rel)
                if os.path.exists(abs_path):
                    # Absolute receipts are stored flat; relative ones keep their path.
                    arcname = os.path.basename(rel) if os.path.isabs(rel) else rel
                    tar.add(abs_path, arcname=arcname)
                else:
                    missing.append(rel)

        subprocess.run(["zstd", "-T0", "-o", archive_path, tar_path], check=True)
    finally:
        try:
            os.remove(tar_path)
        except FileNotFoundError:
            pass
        except OSError as exc:
            logger.warning("Could not remove temporary archive %s: %s", tar_path, exc)

    if missing:
        print("\n[WARN] Fehlende Belege (nicht im Bundle):")
        for m in missing:
            print(f" - {m}")


def main():
    """Parse the command line, evaluate the CSV and write the requested outputs.

    Raises:
        ValueError: If any row of the CSV has an unparsable date.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("csv", help="CSV path")
    parser.add_argument("--title", "-t", help="PDF header title (optional)")
    parser.add_argument("--pdf", "-p", help="PDF path (optional)")
    parser.add_argument("--bundle", "-b",
                        help="Path to bundle (.tar.zst), containing CSV, PDF and receipts (optional)")
    parser.add_argument("-v", "--verbose", action="count", default=0,
                        help="Logging verbosity (-v=warning, -vv=info, -vvv=debug)")
    args = parser.parse_args()

    setup_logging(args.verbose)

    csv_path = os.path.abspath(args.csv)
    base_dir = os.path.dirname(csv_path) or "."
    title = args.title if args.title else os.path.basename(csv_path)

    df = parse_csv(csv_path)
    if df["date"].isna().any():
        bad = df[df["date"].isna()][CSV_COLUMNS]
        raise ValueError(f"Invalid dates:\n{bad}")

    want_pdf = bool(args.pdf)
    mono_font = _pick_mono_font(size=8)
    context = {"base_dir": base_dir, "want_pdf": want_pdf, "mono_font": mono_font}

    modules: Dict[str, Module] = {
        "general": GeneralModule(),
        "drug": DrugModule(),
    }

    rows_for_module: Dict[str, List[int]] = {}
    for idx, row in df.iterrows():
        # dict.fromkeys de-duplicates while keeping order, so a row naming the
        # same module twice is not handed to that module twice.
        for m in dict.fromkeys(row["modules_list"]):
            rows_for_module.setdefault(m, []).append(idx)

    results: List[ModuleResult] = []
    # The general module always runs, and always over the complete data set.
    results.append(modules["general"].process(df, context=dict(context)))

    for mod_name, indices in rows_for_module.items():
        if mod_name == "general":
            continue
        mod = modules.get(mod_name)
        if mod is None:
            logger.warning("Unknown module %s - ignoring", mod_name)
            continue
        subdf = df.loc[indices].copy()
        results.append(mod.process(subdf, context=dict(context)))

    print("\n===== Auswertung =====")
    for r in results:
        print(r.summary_text)
        print("")

    if args.pdf:
        module_frames: List[Frame] = []
        module_bigframes: List[BigFrame] = []
        module_pages: List[plt.Figure] = []
        for r in results:
            module_frames.extend(r.frames)
            module_bigframes.extend(r.bigframes)
            module_pages.extend(r.pages)

        create_pdf(df, module_frames, module_bigframes, module_pages, args.pdf, mono_font,
                   base_dir=base_dir, title=title)
        logger.info("PDF written to %s", args.pdf)

    if args.bundle:
        create_bundle(args.bundle, csv_path, df, base_dir=base_dir,
                      pdf_path=args.pdf if args.pdf else None)
        logger.info("Bundle written to %s", args.bundle)


if __name__ == "__main__":
    main()