diff options
Diffstat (limited to 'modules/general.py')
| -rw-r--r-- | modules/general.py | 560 |
1 files changed, 560 insertions, 0 deletions
# Origin: modules/general.py (added as a new file, blob f7eca38).
"""General module: split contributions across distribution groups.

The module computes, per distribution group and per person, how much was
contributed (flag ``C``), how much was used (flag ``U``) and which share of
the group's contributions each person has to carry.  From the resulting
balances it derives settlement payments and builds text frames, charts and
detail pages.
"""

from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple

import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from matplotlib.axes import Axes
from matplotlib.font_manager import FontProperties

from .base import Frame, BigFrame, ModuleResult


MONEY_UNITS = {"€", "eur", "EUR", "euro", "EURO"}


def _is_money_unit(u: str) -> bool:
    """Return True if *u* (stringified and stripped) is one of MONEY_UNITS."""
    return str(u).strip() in MONEY_UNITS


def _mode_label(u_count: int) -> str:
    """Return the displayed distribution mode for a group with *u_count* U rows."""
    return "usage" if u_count > 0 else "equal"


def compute_group_distribution(df: pd.DataFrame):
    """Distribute each group's contributions among its participants.

    Reads the columns ``dist_groups``, ``Distributionsflag``, ``Nutzer``,
    ``unit`` and ``value`` of *df* (plus ``Datum``, ``Positionsbezeichnung``
    and ``Positionswert`` for the error message).

    Returns:
        A tuple ``(summary, per_person, detail)``:

        * ``summary``: one row per group with the columns group,
          participants, total_contrib, total_usage, usage_units, has_usage,
          mode, u_count.
        * ``per_person``: columns person, contributed, share, balance,
          summed over all groups and sorted by person.
        * ``detail``: one row per (group, person) with the columns group,
          person, contributed, usage, share, mode, balance.

    Raises:
        ValueError: if a contribution (C) row has a unit that is not in
            MONEY_UNITS.
    """
    # One row per (entry, group); rows without a group name are dropped.
    # The explicit copy avoids assigning new columns on a filtered slice.
    work = df.explode("dist_groups")
    work["group"] = work["dist_groups"].fillna("").astype(str).str.strip()
    work = work[work["group"] != ""].copy()

    # Normalise the C/U flag and the person name.
    work["flag"] = work["Distributionsflag"].astype(str).str.strip().str.upper()
    work["person"] = work["Nutzer"].astype(str).str.strip()

    # Contributions must be money.
    contrib = work[work["flag"] == "C"].copy()
    if len(contrib) > 0:
        bad_units = contrib[~contrib["unit"].apply(_is_money_unit)]
        if len(bad_units) > 0:
            raise ValueError(
                "Contribution (C) muss Geld-Einheit haben (z.B. € / EUR). "
                f"Problemzeilen:\n{bad_units[['Datum','Nutzer','group','Positionsbezeichnung','Positionswert','unit']]}"
            )

    # Usage may carry any unit (ideally a consistent one per group).
    usage = work[work["flag"] == "U"].copy()

    # Sums per (group, person) and per group.
    contrib_by_gp = contrib.groupby(["group", "person"])["value"].sum().rename("contributed").reset_index()
    contrib_tot = contrib.groupby("group")["value"].sum().rename("total_contrib").reset_index()

    usage_by_gp = usage.groupby(["group", "person"])["value"].sum().rename("usage").reset_index()
    usage_tot = usage.groupby("group")["value"].sum().rename("total_usage").reset_index()
    usage_unit = usage.groupby("group")["unit"].agg(lambda s: s.dropna().astype(str).unique().tolist()).reset_index()
    usage_unit = usage_unit.rename(columns={"unit": "usage_units"})

    participants = work.groupby("group")["person"].agg(lambda s: sorted(set(s.tolist()))).reset_index()
    participants = participants.rename(columns={"person": "participants"})

    # Group summary.
    summary = (
        participants.merge(contrib_tot, on="group", how="left")
        .merge(usage_tot, on="group", how="left")
        .merge(usage_unit, on="group", how="left")
    )
    summary["total_contrib"] = summary["total_contrib"].fillna(0.0)
    summary["total_usage"] = summary["total_usage"].fillna(0.0)
    summary["has_usage"] = summary["total_usage"] > 0

    # A group counts as "usage" as soon as it has any U row, even if the
    # usage total is 0 (the shares then fall back to an equal split).
    # Derived column-wise: a row-wise DataFrame.apply returns a DataFrame on
    # an empty frame, which cannot be assigned to a single column.
    u_rows = summary["group"].map(usage.groupby("group").size()).fillna(0).astype(int)
    summary["mode"] = np.where(u_rows > 0, "usage", "equal")
    summary["u_count"] = u_rows

    # Detail per (group, person): cross product of groups and persons ...
    detail = (
        pd.DataFrame({"group": work["group"].unique()})
        .assign(key=1)
        .merge(pd.DataFrame({"person": work["person"].unique()}).assign(key=1), on="key")
        .drop(columns=["key"])
    )
    # ... restricted to the pairs that actually occur in the data.
    gp_person = work[["group", "person"]].drop_duplicates()
    detail = detail.merge(gp_person, on=["group", "person"], how="inner")

    detail = (
        detail.merge(contrib_by_gp, on=["group", "person"], how="left")
        .merge(usage_by_gp, on=["group", "person"], how="left")
    )
    detail["contributed"] = detail["contributed"].fillna(0.0)
    detail["usage"] = detail["usage"].fillna(0.0)

    # Shares per group.
    groups_with_usage = set(usage["group"])
    shares = []
    for _, row in summary.iterrows():
        g = row["group"]
        total_c = float(row["total_contrib"] or 0.0)
        n = len(row["participants"] or [])
        equal_share = (total_c / n) if n else 0.0

        g_detail = detail[detail["group"] == g].copy()

        if g in groups_with_usage:
            total_u = float(g_detail["usage"].sum())
            if total_u > 0:
                g_detail["share"] = g_detail["usage"] / total_u * total_c
                mode = "usage"
            else:
                # Fallback: equal split among the group's participants.
                g_detail["share"] = equal_share
                mode = "equal(fallback)"
        else:
            g_detail["share"] = equal_share
            mode = "equal"

        g_detail["mode"] = mode
        shares.append(g_detail[["group", "person", "share", "mode"]])

    if shares:
        shares_df = pd.concat(shares, ignore_index=True)
    else:
        shares_df = pd.DataFrame(columns=["group", "person", "share", "mode"])
    detail = detail.merge(shares_df, on=["group", "person"], how="left")
    detail["share"] = detail["share"].fillna(0.0)
    detail["balance"] = detail["contributed"] - detail["share"]

    # Totals per person over all groups.
    per_person = detail.groupby("person")[["contributed", "share", "balance"]].sum().reset_index()
    per_person = per_person.sort_values("person")

    return summary, per_person, detail


@dataclass
class GroupTimeSeries:
    """Cumulative per-person time series of one distribution group."""

    group: str
    times: pd.DatetimeIndex
    participants: List[str]
    usage_units: List[str]
    xlim_start: pd.Timestamp
    xlim_end: pd.Timestamp
    contrib_cum: Dict[str, pd.Series]  # cumulative contributions (€)
    usage_cum: Dict[str, pd.Series]  # cumulative usage in the usage unit
    share_cum: Dict[str, pd.Series]  # cumulative share (€)
    ratio: Dict[str, pd.Series]  # share / contributed


def _auto_time_limits(tmin: pd.Timestamp, tmax: pd.Timestamp) -> Tuple[pd.Timestamp, pd.Timestamp]:
    """Return (tmin, tmax) widened by 5 % of the span on both sides.

    For an empty or negative span the margin falls back to 30 minutes.
    """
    dt = tmax - tmin
    if dt <= pd.Timedelta(0):
        margin = pd.Timedelta(minutes=30)
    else:
        margin = dt * 0.05
    return (tmin - margin, tmax + margin)


def _cumulative_by_time(rows: pd.DataFrame, times: pd.DatetimeIndex) -> pd.Series:
    """Sum ``value`` per ``Datum``, align to *times* (gaps = 0) and accumulate."""
    by_t = rows.groupby("Datum")["value"].sum() if not rows.empty else pd.Series(dtype=float)
    return by_t.reindex(times, fill_value=0.0).cumsum()


def _prepare_group_timeseries(df: pd.DataFrame, group: str) -> Optional[GroupTimeSeries]:
    """Build the cumulative time series for *group*.

    Returns None when the group has no row with a non-null ``Datum``.
    """
    # Explode the groups and keep only the rows of the requested group.
    work = df.explode("dist_groups")
    work["group"] = work["dist_groups"].fillna("").astype(str).str.strip()
    work = work[work["group"] == group].copy()

    work = work[pd.notna(work["Datum"])].copy()
    if work.empty:
        return None

    work["person"] = work["Nutzer"].astype(str).str.strip()
    work["flag"] = work["Distributionsflag"].astype(str).str.strip().str.upper()

    participants = sorted(work["person"].unique().tolist())

    # Timeline: all distinct timestamps of the group plus the padded limits.
    times = pd.DatetimeIndex(sorted(work["Datum"].unique()))
    tmin, tmax = times.min(), times.max()
    x0, x1 = _auto_time_limits(tmin, tmax)

    times = times.union(pd.DatetimeIndex([x0, x1])).sort_values()

    # Usage units: may be empty or several (shown joined, e.g. "km/stk").
    usage_units = sorted(
        work.loc[work["flag"] == "U", "unit"]
        .dropna()
        .astype(str)
        .str.strip()
        .unique()
        .tolist()
    )

    # Per person: contributions (C) and usage (U) as cumulative step series.
    contrib_cum: Dict[str, pd.Series] = {}
    usage_cum: Dict[str, pd.Series] = {}

    for p in participants:
        of_person = work["person"] == p
        contrib_cum[p] = _cumulative_by_time(work[of_person & (work["flag"] == "C")], times)
        usage_cum[p] = _cumulative_by_time(work[of_person & (work["flag"] == "U")], times)

    # Share over time: the cumulative group contributions are distributed.
    total_contrib = sum((contrib_cum[p] for p in participants), start=pd.Series(0.0, index=times))
    total_usage = sum((usage_cum[p] for p in participants), start=pd.Series(0.0, index=times))

    has_any_u = (work["flag"] == "U").any()
    n = len(participants) if participants else 1

    share_cum: Dict[str, pd.Series] = {}
    if has_any_u:
        # Usage mode as soon as any U row exists; while the cumulative usage
        # is still 0 the share falls back to an equal split.
        for p in participants:
            usage_p = usage_cum[p]
            with np.errstate(divide="ignore", invalid="ignore"):
                share_usage = total_contrib * (usage_p / total_usage.replace(0.0, np.nan))
            share_equal = total_contrib / float(n)
            share = share_usage.where(total_usage > 0, share_equal)
            share_cum[p] = share.fillna(0.0)
    else:
        # Equal mode throughout.
        equal = total_contrib / float(n)
        for p in participants:
            share_cum[p] = equal

    ratio: Dict[str, pd.Series] = {}
    for p in participants:
        denom = contrib_cum[p].astype(float)
        # No contribution yet -> ratio undefined -> reported as 0.
        r = share_cum[p].astype(float) / denom.where(denom > 0, np.nan)
        ratio[p] = r.fillna(0.0)

    return GroupTimeSeries(
        group=group,
        times=times,
        participants=participants,
        usage_units=usage_units,
        xlim_start=x0,
        xlim_end=x1,
        contrib_cum=contrib_cum,
        usage_cum=usage_cum,
        share_cum=share_cum,
        ratio=ratio,
    )


@dataclass
class GroupChartBigFrame(BigFrame):
    """Step chart of one per-person time series of a group.

    kind:
      - 'usage_cum'
      - 'contrib_cum'
      - 'share_cum'
      - 'ratio'
    """

    gts: GroupTimeSeries
    kind: str

    def render(self, ax: Axes, mono_font: FontProperties) -> None:
        """Draw the chart selected by ``kind`` into *ax*.

        Raises:
            ValueError: if ``kind`` is not one of the four supported values.
        """
        ax.axis("on")

        locator = mdates.AutoDateLocator(minticks=3, maxticks=7)
        formatter = mdates.ConciseDateFormatter(locator)
        ax.xaxis.set_major_locator(locator)
        ax.xaxis.set_major_formatter(formatter)
        # Hide the offset text (e.g. "2025-Dec") of the concise formatter.
        ax.xaxis.get_offset_text().set_visible(False)

        ax.set_xlim(self.gts.xlim_start, self.gts.xlim_end)

        if self.kind == "usage_cum":
            series_map = self.gts.usage_cum
            unit = "/".join(self.gts.usage_units) if self.gts.usage_units else ""
            ax.set_ylabel(f"Verbrauch kumulativ {unit}".strip(), fontproperties=mono_font)
        elif self.kind == "contrib_cum":
            series_map = self.gts.contrib_cum
            ax.set_ylabel("Contributions kumulativ €", fontproperties=mono_font)
        elif self.kind == "share_cum":
            series_map = self.gts.share_cum
            ax.set_ylabel("Anteil kumulativ €", fontproperties=mono_font)
        elif self.kind == "ratio":
            series_map = self.gts.ratio
            ax.set_ylabel("Anteil / Ausgelegt", fontproperties=mono_font)
            ax.set_yscale("log")
        else:
            raise ValueError(f"Unknown kind: {self.kind}")

        is_ratio = self.kind == "ratio"
        # "Practically zero" on the log axis, so curves exist from the start.
        min_ratio = 1e-3

        # Plot and collect the values for robust y limits.
        all_vals = []
        for p in self.gts.participants:
            y = series_map[p].replace([np.inf, -np.inf], np.nan)
            if is_ratio:
                y = y.fillna(min_ratio).clip(lower=min_ratio)
            else:
                y = y.fillna(0.0)

            # Steps suit cumulative curves.
            ax.plot(self.gts.times, y.values, label=p, linewidth=1, drawstyle="steps-post")

            v = y.values
            v = v[np.isfinite(v)]
            if v.size:
                all_vals.append(v)

        # Choose the y limits so that all values are visible.
        if all_vals:
            vv = np.concatenate(all_vals)
            if is_ratio:
                vpos = vv[vv > 0]
                if vpos.size:
                    vmin = float(np.nanmin(vpos))
                    vmax = float(np.nanmax(vpos))
                    # Log axis: multiplicative padding.
                    ax.set_ylim(vmin / 1.5, vmax * 1.5)
            else:
                vmax = float(np.nanmax(vv)) if vv.size else 0.0
                if vmax <= 0:
                    ax.set_ylim(0, 1)
                else:
                    ax.set_ylim(0, vmax * 1.08)  # small headroom

        ax.grid(True, alpha=0.2)

        # `fontsize` is ignored by Axes.legend when `prop` is given, so the
        # intended size 7 is set on a copy of the monospace font instead.
        legend_font = mono_font.copy()
        legend_font.set_size(7)
        ax.legend(prop=legend_font, loc="best", ncols=2)

        # Monospace tick labels.
        for tick in ax.get_xticklabels() + ax.get_yticklabels():
            tick.set_fontproperties(mono_font)


@dataclass
class TextFrame(Frame):
    """Frame that shows a block of text anchored at the top-left corner."""

    text: str

    def render(self, ax: Axes, mono_font: FontProperties) -> None:
        """Write ``text`` into *ax* using *mono_font*."""
        ax.text(0, 1, self.text, va="top", ha="left", fontproperties=mono_font)


@dataclass
class PlotBigFrame(BigFrame):
    """Bar chart of contributed vs. share per person."""

    per_person: pd.DataFrame  # expects the columns person, contributed, share

    def render(self, ax: Axes, mono_font: FontProperties) -> None:
        """Draw the grouped bar chart directly into the given *ax*."""
        ax.axis("on")
        plot_df = self.per_person.set_index("person")[["contributed", "share"]]
        plot_df.plot.bar(ax=ax)
        ax.tick_params(axis="x", rotation=0)
        leg = ax.legend(prop=mono_font)
        if leg:
            for t in leg.get_texts():
                t.set_fontproperties(mono_font)

        for tick in ax.get_xticklabels() + ax.get_yticklabels():
            tick.set_fontproperties(mono_font)

        ax.xaxis.label.set_fontproperties(mono_font)
        ax.yaxis.label.set_fontproperties(mono_font)


class GeneralModule:
    """Module "general": distribution over groups, balances and settlement."""

    name = "general"

    def process(self, df: pd.DataFrame, context: Dict[str, Any]) -> ModuleResult:
        """Compute the distribution of *df* and assemble the module result.

        *context* may provide ``want_pdf`` (default True) and ``mono_font``
        (default: DejaVu Sans Mono, size 8).  The text summary is always
        produced; frames, big frames and pages only when ``want_pdf`` is set.
        """
        want_pdf = bool(context.get("want_pdf", True))

        mono_font = context.get("mono_font") or FontProperties(family="DejaVu Sans Mono", size=8)

        group_summary, per_person, detail = compute_group_distribution(df)

        balance = {r["person"]: float(r["balance"]) for _, r in per_person.iterrows()}
        payments = self._minimize_payments(balance)

        # Text summary (e.g. for console output).
        summary_lines = [
            "General – Verteilung über Distributionsgruppen",
            "",
            "Gruppen:",
        ]
        summary_lines.extend(self._group_lines(group_summary, " - "))

        summary_lines.append("")
        summary_lines.append("Personen (Summe über alle Gruppen):")
        for _, r in per_person.sort_values("person").iterrows():
            summary_lines.append(
                f" - {r['person']}: ausgelegt={r['contributed']:.2f} €; anteil={r['share']:.2f} €; saldo={r['balance']:.2f} €"
            )

        summary_lines.append("")
        summary_lines.append("Ausgleich (minimiert):")
        if payments:
            for p, r, a in payments:
                summary_lines.append(f" - {p} → {r}: {a:.2f} €")
        else:
            summary_lines.append(" (keine Zahlungen nötig)")

        summary_text = "\n".join(summary_lines)

        frames: List[Frame] = []
        bigframes: List[BigFrame] = []
        pages: List[plt.Figure] = []

        # NOTE(review): everything below is treated as PDF-only output and
        # therefore kept inside this branch — confirm against the original
        # indentation.
        if want_pdf:
            frames.extend(self._make_frames(group_summary, per_person, payments))

            # Overall bar chart: contributed vs. share.
            bigframes.append(
                PlotBigFrame(
                    title="General – Ausgelegt vs Anteil (Summe über Gruppen)",
                    per_person=per_person.copy(),
                )
            )

            # Four charts per distribution group.
            chart_specs = [
                ("Kumulativer Verbrauch pro Person", "usage_cum"),
                ("Kumulative Contributions pro Person", "contrib_cum"),
                ("Anteil pro Person (zeitlicher Verlauf)", "share_cum"),
                ("Verhältnis Anteil/Ausgelegt (zeitlicher Verlauf)", "ratio"),
            ]
            for g in sorted(group_summary["group"].unique().tolist()):
                gts = _prepare_group_timeseries(df, g)
                if not gts:
                    continue
                for label, kind in chart_specs:
                    bigframes.append(GroupChartBigFrame(title=f"{g} – {label}", gts=gts, kind=kind))

            # Pages: detail pages only (no separate bar-chart page).
            pages.extend(self._make_pages(group_summary, per_person, detail, mono_font))

        return ModuleResult(summary_text=summary_text, frames=frames, bigframes=bigframes, pages=pages)

    @staticmethod
    def _group_lines(group_summary: pd.DataFrame, bullet: str) -> List[str]:
        """Return one overview line per group, each prefixed with *bullet*."""
        lines = []
        for _, r in group_summary.sort_values("group").iterrows():
            total_c = float(r.get("total_contrib", 0.0))
            mode = _mode_label(int(r.get("u_count", 0)))
            parts = r.get("participants", []) or []
            lines.append(f"{bullet}{r['group']}: {total_c:.2f} €; mode={mode}; teilnehmer={len(parts)}")
        return lines

    def _minimize_payments(self, balance: Dict[str, float]) -> List[Tuple[str, str, float]]:
        """Greedily match debtors with creditors.

        *balance* maps person -> balance; positive means the person receives
        money, negative means the person owes money.  Balances are rounded to
        cents, and all remainders are kept on whole cents so no float drift
        accumulates.

        Returns:
            A list of ``(payer, receiver, amount)`` tuples.
        """
        receivers = []
        payers = []
        for person, amt in balance.items():
            a = round(float(amt), 2)
            if a > 0:
                receivers.append([person, a])
            elif a < 0:
                payers.append([person, -a])

        out = []
        i = j = 0
        while i < len(payers) and j < len(receivers):
            payer, avail = payers[i]
            recv, need = receivers[j]
            pay = round(min(avail, need), 2)
            out.append((payer, recv, pay))
            payers[i][1] = round(avail - pay, 2)
            receivers[j][1] = round(need - pay, 2)
            # At least one side is settled exactly, so the loop advances.
            if payers[i][1] == 0:
                i += 1
            if receivers[j][1] == 0:
                j += 1
        return out

    def _make_frames(
        self,
        group_summary: pd.DataFrame,
        per_person: pd.DataFrame,
        payments: List[Tuple[str, str, float]],
    ) -> List[Frame]:
        """Build the three text frames: groups, person totals, settlement."""
        # Frame 1: group overview.
        lines = ["Gruppenübersicht:"]
        lines.extend(self._group_lines(group_summary, "- "))
        f1 = TextFrame(title="General: Gruppen", text="\n".join(lines))

        # Frame 2: totals per person.
        lines = ["Personen (Summe über alle Gruppen):", "Person | contributed | share | balance"]
        for _, r in per_person.iterrows():
            lines.append(f"{r['person']}: {r['contributed']:.2f} €; {r['share']:.2f} €; {r['balance']:.2f} €")
        f2 = TextFrame(title="General: Personen", text="\n".join(lines))

        # Frame 3: settlement payments.
        lines = ["Ausgleich (minimiert):"]
        if payments:
            for p, r, a in payments:
                lines.append(f"{p} → {r}: {a:.2f} €")
        else:
            lines.append("(keine Zahlungen nötig)")
        f3 = TextFrame(title="General: Ausgleich", text="\n".join(lines))

        return [f1, f2, f3]

    def _make_pages(self, group_summary, per_person, detail, mono_font) -> List[plt.Figure]:
        """Build one A4 text page (8.27 x 11.69 in) per group with its details.

        The figures are created through pyplot, so they stay registered
        there until the caller closes them.
        """
        pages: List[plt.Figure] = []

        # First summary row per group, looked up once instead of per field.
        info = group_summary.drop_duplicates("group").set_index("group")

        for g in sorted(detail["group"].unique().tolist()):
            gdet = detail[detail["group"] == g].sort_values("person")
            if g in info.index:
                total_c = float(info.at[g, "total_contrib"])
                u_count = int(info.at[g, "u_count"])
            else:
                total_c = 0.0
                u_count = 0
            mode = _mode_label(u_count)

            lines = [
                f"Gruppe: {g}",
                f"Total Contribution: {total_c:.2f} €",
                f"Mode: {mode}",
                "",
                "Person | contributed | usage | share | balance",
            ]
            for _, r in gdet.iterrows():
                lines.append(
                    f"{r['person']}: {r['contributed']:.2f} €; {r['usage']:.4f}; {r['share']:.2f} €; {r['balance']:.2f} €"
                )

            fig, ax = plt.subplots(figsize=(8.27, 11.69))
            ax.axis("off")
            ax.text(0, 1, "\n".join(lines), va="top", ha="left", fontproperties=mono_font)
            pages.append(fig)

        # TODO: optional usage-over-time pages for groups whose usage unit is
        # "km" (only when U rows with that unit exist).  This needs
        # time-resolved data, which `detail` does not carry; it would have to
        # be built from the original DataFrame.

        return pages
