from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple

import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from matplotlib.axes import Axes
from matplotlib.font_manager import FontProperties

from .base import Frame, BigFrame, ModuleResult

MONEY_UNITS = {"€", "eur", "EUR", "euro", "EURO"}

# Case-folded view of MONEY_UNITS so spellings such as "Euro" or "Eur" match.
_MONEY_UNITS_FOLDED = frozenset(unit.casefold() for unit in MONEY_UNITS)

# (title suffix, chart kind) for the charts rendered per distribution group.
_GROUP_CHARTS = (
    ("Kumulativer Verbrauch pro Person", "usage_cum"),
    ("Kumulative Contributions pro Person", "contrib_cum"),
    ("Anteil pro Person (zeitlicher Verlauf)", "share_cum"),
    ("Verhältnis Anteil/Ausgelegt (zeitlicher Verlauf)", "ratio"),
)


def _is_money_unit(u: str) -> bool:
    """Return True if *u* names a money unit (see ``MONEY_UNITS``).

    Surrounding whitespace is ignored and the comparison is case-insensitive.
    """
    unit = str(u).strip()
    return unit in MONEY_UNITS or unit.casefold() in _MONEY_UNITS_FOLDED


def compute_group_distribution(
    df: pd.DataFrame,
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Distribute the contributions of every distribution group among its members.

    Rows flagged ``C`` are money contributions, rows flagged ``U`` are usage.
    A group that has at least one ``U`` row is split proportionally to usage
    (falling back to an equal split while the summed usage is not positive);
    all other groups are split equally among the group's participants.

    Args:
        df: Input rows. The columns read here are ``dist_groups`` (list-like,
            exploded), ``Distributionsflag``, ``Nutzer``, ``unit`` and
            ``value``.

    Returns:
        A tuple ``(summary, per_person, detail)``:

        * ``summary``: one row per group with ``group``, ``participants``,
          ``total_contrib``, ``total_usage``, ``usage_units``, ``has_usage``,
          ``mode`` and ``u_count``.
        * ``per_person``: one row per person with ``person``, ``contributed``,
          ``share`` and ``balance``, summed over all groups.
        * ``detail``: one row per (group, person) with ``group``, ``person``,
          ``contributed``, ``usage``, ``share``, ``mode`` and ``balance``.

    Raises:
        ValueError: If a contribution row does not carry a money unit.
    """
    # One row per (entry, group); entries without a group are dropped.
    work = df.copy().explode("dist_groups")
    work["group"] = work["dist_groups"].fillna("").astype(str).str.strip()
    # Copy so the column assignments below do not write into a filtered view.
    work = work[work["group"] != ""].copy()

    # Normalise the C/U flag and the person name.
    work["flag"] = work["Distributionsflag"].astype(str).str.strip().str.upper()
    work["person"] = work["Nutzer"].astype(str).str.strip()

    # Contributions must be money.
    contrib = work[work["flag"] == "C"].copy()
    if not contrib.empty:
        bad_units = contrib[~contrib["unit"].apply(_is_money_unit)]
        if not bad_units.empty:
            raise ValueError(
                "Contribution (C) muss Geld-Einheit haben (z.B. € / EUR). "
                f"Problemzeilen:\n{bad_units[['Datum','Nutzer','group','Positionsbezeichnung','Positionswert','unit']]}"
            )

    # Usage may have any unit; it should be consistent within a group.
    usage = work[work["flag"] == "U"].copy()

    # Sums per (group, person) and per group.
    contrib_by_gp = (
        contrib.groupby(["group", "person"])["value"].sum().rename("contributed").reset_index()
    )
    contrib_tot = contrib.groupby("group")["value"].sum().rename("total_contrib").reset_index()
    usage_by_gp = usage.groupby(["group", "person"])["value"].sum().rename("usage").reset_index()
    usage_tot = usage.groupby("group")["value"].sum().rename("total_usage").reset_index()
    usage_unit = (
        usage.groupby("group")["unit"]
        .agg(lambda s: s.dropna().astype(str).unique().tolist())
        .reset_index()
        .rename(columns={"unit": "usage_units"})
    )
    participants = (
        work.groupby("group")["person"]
        .agg(lambda s: sorted(set(s.tolist())))
        .reset_index()
        .rename(columns={"person": "participants"})
    )

    # Group summary.
    summary = (
        participants.merge(contrib_tot, on="group", how="left")
        .merge(usage_tot, on="group", how="left")
        .merge(usage_unit, on="group", how="left")
    )
    summary["total_contrib"] = summary["total_contrib"].fillna(0.0)
    summary["total_usage"] = summary["total_usage"].fillna(0.0)
    summary["has_usage"] = summary["total_usage"] > 0
    # A group counts as usage mode as soon as it has any U row, even if the
    # summed usage is 0 (the shares then fall back to an equal split).
    u_count = summary["group"].map(usage.groupby("group").size()).fillna(0).astype(int)
    summary["mode"] = u_count.gt(0).map({True: "usage", False: "equal"})
    summary["u_count"] = u_count

    # Detail per (group, person): only pairs that actually occur. Rows are
    # ordered by first appearance of the group, then of the person.
    group_rank = {g: i for i, g in enumerate(work["group"].unique())}
    person_rank = {p: i for i, p in enumerate(work["person"].unique())}
    pairs = work[["group", "person"]].drop_duplicates()
    pairs = (
        pairs.assign(
            _group_rank=pairs["group"].map(group_rank),
            _person_rank=pairs["person"].map(person_rank),
        )
        .sort_values(["_group_rank", "_person_rank"])
        .drop(columns=["_group_rank", "_person_rank"])
    )
    detail = pairs.merge(contrib_by_gp, on=["group", "person"], how="left").merge(
        usage_by_gp, on=["group", "person"], how="left"
    )
    detail["contributed"] = detail["contributed"].fillna(0.0)
    detail["usage"] = detail["usage"].fillna(0.0)

    # Shares per group.
    groups_with_usage = set(usage["group"])
    share_frames = []
    for _, row in summary.iterrows():
        group = row["group"]
        total_c = float(row["total_contrib"] or 0.0)
        n = len(row["participants"] or [])
        equal_share = (total_c / n) if n else 0.0

        g_detail = detail.loc[detail["group"] == group, ["group", "person", "usage"]].copy()
        if group in groups_with_usage:
            total_u = float(g_detail["usage"].sum())
            if total_u > 0:
                g_detail["share"] = g_detail["usage"] / total_u * total_c
                mode = "usage"
            else:
                # Fallback: equal split among the group's participants.
                g_detail["share"] = equal_share
                mode = "equal(fallback)"
        else:
            g_detail["share"] = equal_share
            mode = "equal"
        g_detail["mode"] = mode
        share_frames.append(g_detail[["group", "person", "share", "mode"]])

    if share_frames:
        shares_df = pd.concat(share_frames, ignore_index=True)
    else:
        shares_df = pd.DataFrame(columns=["group", "person", "share", "mode"])

    detail = detail.merge(shares_df, on=["group", "person"], how="left")
    detail["share"] = detail["share"].fillna(0.0)
    detail["balance"] = detail["contributed"] - detail["share"]

    # Totals per person over all groups.
    per_person = (
        detail.groupby("person")[["contributed", "share", "balance"]]
        .sum()
        .reset_index()
        .sort_values("person")
    )

    return summary, per_person, detail


@dataclass
class GroupTimeSeries:
    """Cumulative per-person time series of one distribution group."""

    group: str
    times: pd.DatetimeIndex
    participants: List[str]
    usage_units: List[str]
    xlim_start: pd.Timestamp
    xlim_end: pd.Timestamp
    contrib_cum: Dict[str, pd.Series]  # cumulative contributions (€)
    usage_cum: Dict[str, pd.Series]  # cumulative usage in the usage unit (e.g. km, stk)
    share_cum: Dict[str, pd.Series]  # cumulative share (€)
    ratio: Dict[str, pd.Series]  # share / contributed


def _auto_time_limits(tmin: pd.Timestamp, tmax: pd.Timestamp) -> Tuple[pd.Timestamp, pd.Timestamp]:
    """Return ``(tmin, tmax)`` widened by 5 % of the interval on each side.

    If the interval is empty (``tmax <= tmin``) a fixed margin of 30 minutes
    is used instead.
    """
    span = tmax - tmin
    if span <= pd.Timedelta(0):
        margin = pd.Timedelta(minutes=30)
    else:
        margin = span * 0.05
    return (tmin - margin, tmax + margin)


def _cumulative_by_time(rows: pd.DataFrame, times: pd.DatetimeIndex) -> pd.Series:
    """Sum ``value`` per ``Datum``, align to *times* (gaps = 0) and accumulate."""
    if rows.empty:
        per_time = pd.Series(dtype=float)
    else:
        per_time = rows.groupby("Datum")["value"].sum()
    return per_time.reindex(times, fill_value=0.0).cumsum()


def _prepare_group_timeseries(df: pd.DataFrame, group: str) -> Optional[GroupTimeSeries]:
    """Build the cumulative time series of *group* for plotting.

    Args:
        df: Input rows; reads ``dist_groups``, ``Datum``, ``Nutzer``,
            ``Distributionsflag``, ``unit`` and ``value``.
        group: Name of the distribution group (compared after stripping).

    Returns:
        The time series of the group, or ``None`` if the group has no row
        with a non-null ``Datum``.
    """
    # Explode the groups and keep only the requested one.
    work = df.copy().explode("dist_groups")
    work["group"] = work["dist_groups"].fillna("").astype(str).str.strip()
    work = work[work["group"] == group]
    # Copy so the column assignments below do not write into a filtered view.
    work = work[pd.notna(work["Datum"])].copy()
    if work.empty:
        return None

    work["person"] = work["Nutzer"].astype(str).str.strip()
    work["flag"] = work["Distributionsflag"].astype(str).str.strip().str.upper()
    participants = sorted(work["person"].unique().tolist())

    # Timeline: all distinct timestamps of the group plus the padded x-limits.
    times = pd.DatetimeIndex(sorted(work["Datum"].unique()))
    x0, x1 = _auto_time_limits(times.min(), times.max())
    times = times.union(pd.DatetimeIndex([x0, x1])).sort_values()

    # Usage units (may be empty or several; several are shown as e.g. "km/stk").
    usage_units = sorted(
        work.loc[work["flag"] == "U", "unit"].dropna().astype(str).str.strip().unique().tolist()
    )

    # Per person: contributions (C) and usage (U) as cumulative step series.
    contrib_cum: Dict[str, pd.Series] = {}
    usage_cum: Dict[str, pd.Series] = {}
    for p in participants:
        of_person = work["person"] == p
        contrib_cum[p] = _cumulative_by_time(work[of_person & (work["flag"] == "C")], times)
        usage_cum[p] = _cumulative_by_time(work[of_person & (work["flag"] == "U")], times)

    # Share over time: the cumulative total contribution is distributed.
    total_contrib = sum(
        (contrib_cum[p] for p in participants), start=pd.Series(0.0, index=times)
    )
    total_usage = sum((usage_cum[p] for p in participants), start=pd.Series(0.0, index=times))
    has_any_u = (work["flag"] == "U").any()
    n = len(participants) if participants else 1
    equal_share = total_contrib / float(n)

    share_cum: Dict[str, pd.Series] = {}
    if has_any_u:
        # Usage mode as soon as any U row exists; while the cumulative total
        # usage is still 0 the equal split is used as fallback.
        for p in participants:
            with np.errstate(divide="ignore", invalid="ignore"):
                by_usage = total_contrib * (usage_cum[p] / total_usage.replace(0.0, np.nan))
            share_cum[p] = by_usage.where(total_usage > 0, equal_share).fillna(0.0)
    else:
        # Equal mode throughout. Every participant gets an own copy so that
        # mutating one series cannot affect the others.
        for p in participants:
            share_cum[p] = equal_share.copy()

    ratio: Dict[str, pd.Series] = {}
    for p in participants:
        contributed = contrib_cum[p].astype(float)
        # Undefined (no contribution yet) is reported as 0.
        quotient = share_cum[p].astype(float) / contributed.where(contributed > 0, np.nan)
        ratio[p] = quotient.fillna(0.0)

    return GroupTimeSeries(
        group=group,
        times=times,
        participants=participants,
        usage_units=usage_units,
        xlim_start=x0,
        xlim_end=x1,
        contrib_cum=contrib_cum,
        usage_cum=usage_cum,
        share_cum=share_cum,
        ratio=ratio,
    )


@dataclass
class GroupChartBigFrame(BigFrame):
    """Step chart of one per-person time series of a distribution group.

    kind:
      - 'usage_cum'
      - 'contrib_cum'
      - 'share_cum'
      - 'ratio'
    """

    gts: GroupTimeSeries
    kind: str

    def render(self, ax: Axes, mono_font: FontProperties) -> None:
        """Draw the chart selected by ``kind`` into *ax*.

        Raises:
            ValueError: If ``kind`` is not one of the supported kinds.
        """
        ax.axis("on")
        locator = mdates.AutoDateLocator(minticks=3, maxticks=7)
        formatter = mdates.ConciseDateFormatter(locator)
        ax.xaxis.set_major_locator(locator)
        ax.xaxis.set_major_formatter(formatter)
        # Hide the offset text (e.g. "2025-Dec").
        ax.xaxis.get_offset_text().set_visible(False)
        ax.set_xlim(self.gts.xlim_start, self.gts.xlim_end)

        if self.kind == "usage_cum":
            series_map = self.gts.usage_cum
            unit = "/".join(self.gts.usage_units) if self.gts.usage_units else ""
            ax.set_ylabel(f"Verbrauch kumulativ {unit}".strip(), fontproperties=mono_font)
        elif self.kind == "contrib_cum":
            series_map = self.gts.contrib_cum
            ax.set_ylabel("Contributions kumulativ €", fontproperties=mono_font)
        elif self.kind == "share_cum":
            series_map = self.gts.share_cum
            ax.set_ylabel("Anteil kumulativ €", fontproperties=mono_font)
        elif self.kind == "ratio":
            series_map = self.gts.ratio
            ax.set_ylabel("Anteil / Ausgelegt", fontproperties=mono_font)
            ax.set_yscale("log")
        else:
            raise ValueError(f"Unknown kind: {self.kind}")

        is_ratio = self.kind == "ratio"
        # "Practically 0" on the log axis, so curves exist from the start
        # instead of beginning somewhere in the middle.
        min_ratio = 1e-3

        # Plot and collect the values for robust y-limits.
        all_vals = []
        for p in self.gts.participants:
            y = series_map[p].replace([np.inf, -np.inf], np.nan)
            if is_ratio:
                y = y.fillna(min_ratio).clip(lower=min_ratio)
            else:
                y = y.fillna(0.0)

            # Steps read cleaner for cumulative curves.
            ax.plot(self.gts.times, y.values, label=p, linewidth=1, drawstyle="steps-post")

            finite = y.values[np.isfinite(y.values)]
            if finite.size:
                all_vals.append(finite)

        # Set the y-limits so that all values are visible.
        if all_vals:
            values = np.concatenate(all_vals)
            if is_ratio:
                positive = values[values > 0]
                if positive.size:
                    # Log axis: multiplicative padding.
                    ax.set_ylim(float(positive.min()) / 1.5, float(positive.max()) * 1.5)
            else:
                vmax = float(values.max())
                if vmax <= 0:
                    ax.set_ylim(0, 1)
                else:
                    ax.set_ylim(0, vmax * 1.08)  # small headroom

        ax.grid(True, alpha=0.2)

        # matplotlib ignores ``fontsize`` when ``prop`` is given, so the
        # intended 7 pt legend needs a sized copy of the monospace font.
        # ``prop`` already applies the font to every legend text.
        legend_font = mono_font.copy()
        legend_font.set_size(7)
        ax.legend(prop=legend_font, loc="best", ncols=2)

        # Monospace tick labels.
        for tick in ax.get_xticklabels() + ax.get_yticklabels():
            tick.set_fontproperties(mono_font)


@dataclass
class TextFrame(Frame):
    """Frame that shows a block of monospace text."""

    text: str

    def render(self, ax: Axes, mono_font: FontProperties) -> None:
        """Write the text anchored at the top-left corner of *ax*."""
        ax.text(0, 1, self.text, va="top", ha="left", fontproperties=mono_font)


@dataclass
class PlotBigFrame(BigFrame):
    """Bar chart of contributed amount vs. share per person."""

    per_person: pd.DataFrame  # expects columns: person, contributed, share

    def render(self, ax: Axes, mono_font: FontProperties) -> None:
        """Draw the bar chart directly into the existing *ax*."""
        ax.axis("on")
        plot_df = self.per_person.set_index("person")[["contributed", "share"]]
        plot_df.plot.bar(ax=ax)
        ax.tick_params(axis="x", rotation=0)

        # ``prop`` already applies the font to every legend text.
        ax.legend(prop=mono_font)
        for tick in ax.get_xticklabels() + ax.get_yticklabels():
            tick.set_fontproperties(mono_font)
        ax.xaxis.label.set_fontproperties(mono_font)
        ax.yaxis.label.set_fontproperties(mono_font)


class GeneralModule:
    """Module that splits contributions over distribution groups and reports them."""

    name = "general"

    def process(self, df: pd.DataFrame, context: Dict[str, Any]) -> ModuleResult:
        """Compute the distribution and build the text summary and PDF parts.

        Args:
            df: Input rows, see ``compute_group_distribution``.
            context: Options; reads ``want_pdf`` (default ``True``) and
                ``mono_font`` (default: DejaVu Sans Mono, size 8).

        Returns:
            A ``ModuleResult`` with the console summary and, if ``want_pdf``
            is set, the frames, big frames and pages for the PDF.
        """
        want_pdf = bool(context.get("want_pdf", True))
        mono_font = context.get("mono_font") or FontProperties(family="DejaVu Sans Mono", size=8)

        group_summary, per_person, detail = compute_group_distribution(df)
        balance = {
            person: float(amount)
            for person, amount in zip(per_person["person"], per_person["balance"])
        }
        payments = self._minimize_payments(balance)

        # Text summary for the console.
        summary_lines = ["General – Verteilung über Distributionsgruppen", "", "Gruppen:"]
        summary_lines.extend(self._group_overview_lines(group_summary, " - "))
        summary_lines.append("")
        summary_lines.append("Personen (Summe über alle Gruppen):")
        for _, r in per_person.sort_values("person").iterrows():
            summary_lines.append(
                f" - {r['person']}: ausgelegt={r['contributed']:.2f} €; anteil={r['share']:.2f} €; saldo={r['balance']:.2f} €"
            )
        summary_lines.append("")
        summary_lines.append("Ausgleich (minimiert):")
        if payments:
            for p, r, a in payments:
                summary_lines.append(f" - {p} → {r}: {a:.2f} €")
        else:
            summary_lines.append(" (keine Zahlungen nötig)")
        summary_text = "\n".join(summary_lines)

        frames: List[Frame] = []
        bigframes: List[BigFrame] = []
        pages: List[plt.Figure] = []

        if want_pdf:
            frames.extend(self._make_frames(group_summary, per_person, payments))

            # Overall bar chart.
            bigframes.append(
                PlotBigFrame(
                    title="General – Ausgelegt vs Anteil (Summe über Gruppen)",
                    per_person=per_person.copy(),
                )
            )

            # Four charts per distribution group.
            for g in sorted(group_summary["group"].unique().tolist()):
                gts = _prepare_group_timeseries(df, g)
                if gts is None:
                    continue
                for title, kind in _GROUP_CHARTS:
                    bigframes.append(GroupChartBigFrame(title=f"{g} – {title}", gts=gts, kind=kind))

            # Pages: detail pages only, no separate bar chart page.
            pages.extend(self._make_pages(group_summary, per_person, detail, mono_font))

        return ModuleResult(
            summary_text=summary_text, frames=frames, bigframes=bigframes, pages=pages
        )

    @staticmethod
    def _group_overview_lines(group_summary: pd.DataFrame, bullet: str) -> List[str]:
        """Return one overview line per group (sorted by group), prefixed with *bullet*."""
        lines = []
        for _, r in group_summary.sort_values("group").iterrows():
            total_c = float(r.get("total_contrib", 0.0))
            mode = "usage" if int(r.get("u_count", 0)) > 0 else "equal"
            participants = r.get("participants", []) or []
            lines.append(
                f"{bullet}{r['group']}: {total_c:.2f} €; mode={mode}; teilnehmer={len(participants)}"
            )
        return lines

    def _minimize_payments(self, balance: Dict[str, float]) -> List[Tuple[str, str, float]]:
        """Settle the balances greedily.

        Args:
            balance: Person -> balance; positive means the person gets money
                back, negative means the person owes money. Balances are
                rounded to cents; zero balances are ignored.

        Returns:
            A list of ``(payer, receiver, amount)`` with amounts in whole cents.
        """
        receivers = []
        payers = []
        for person, amount in balance.items():
            rounded = round(float(amount), 2)
            if rounded > 0:
                receivers.append([person, rounded])
            elif rounded < 0:
                payers.append([person, -rounded])

        out: List[Tuple[str, str, float]] = []
        i = j = 0
        while i < len(payers) and j < len(receivers):
            payer, avail = payers[i]
            recv, need = receivers[j]
            pay = round(min(avail, need), 2)
            out.append((payer, recv, pay))
            # Re-round the remainders so float drift cannot leave sub-cent
            # residues; at least one side reaches exactly 0 in every step.
            payers[i][1] = round(avail - pay, 2)
            receivers[j][1] = round(need - pay, 2)
            if payers[i][1] == 0:
                i += 1
            if receivers[j][1] == 0:
                j += 1
        return out

    def _make_frames(
        self,
        group_summary: pd.DataFrame,
        per_person: pd.DataFrame,
        payments: List[Tuple[str, str, float]],
    ) -> List[Frame]:
        """Build the three text frames: groups, per-person totals, settlement."""
        # Frame 1: group overview.
        lines = ["Gruppenübersicht:"]
        lines.extend(self._group_overview_lines(group_summary, "- "))
        f1 = TextFrame(title="General: Gruppen", text="\n".join(lines))

        # Frame 2: per-person totals.
        lines = ["Personen (Summe über alle Gruppen):", "Person | contributed | share | balance"]
        for _, r in per_person.iterrows():
            lines.append(
                f"{r['person']}: {r['contributed']:.2f} €; {r['share']:.2f} €; {r['balance']:.2f} €"
            )
        f2 = TextFrame(title="General: Personen", text="\n".join(lines))

        # Frame 3: settlement.
        lines = ["Ausgleich (minimiert):"]
        if payments:
            for p, r, a in payments:
                lines.append(f"{p} → {r}: {a:.2f} €")
        else:
            lines.append("(keine Zahlungen nötig)")
        f3 = TextFrame(title="General: Ausgleich", text="\n".join(lines))

        return [f1, f2, f3]

    def _make_pages(self, group_summary, per_person, detail, mono_font) -> List[plt.Figure]:
        """Create one A4-sized text page per group with the per-person detail.

        The returned figures are created through pyplot and are not closed
        here.
        """
        pages: List[plt.Figure] = []

        # Text pages: one page per group.
        for g in sorted(detail["group"].unique().tolist()):
            gdet = detail[detail["group"] == g].sort_values("person")
            summary_row = group_summary[group_summary["group"] == g]
            if summary_row.empty:
                total_c = 0.0
                u_count = 0
            else:
                total_c = float(summary_row["total_contrib"].iloc[0])
                u_count = int(summary_row["u_count"].iloc[0])
            mode = "usage" if u_count > 0 else "equal"

            lines = [
                f"Gruppe: {g}",
                f"Total Contribution: {total_c:.2f} €",
                f"Mode: {mode}",
                "",
                "Person | contributed | usage | share | balance",
            ]
            for _, r in gdet.iterrows():
                lines.append(
                    f"{r['person']}: {r['contributed']:.2f} €; {r['usage']:.4f}; {r['share']:.2f} €; {r['balance']:.2f} €"
                )

            fig, ax = plt.subplots(figsize=(8.27, 11.69))
            ax.axis("off")
            ax.text(0, 1, "\n".join(lines), va="top", ha="left", fontproperties=mono_font)
            pages.append(fig)

        # Optional (not implemented): usage curves for groups whose U rows use
        # the unit "km". That needs time data, which `detail` does not carry,
        # so it would have to be built from the original DataFrame.
        return pages