aboutsummaryrefslogtreecommitdiffstats
path: root/modules/general.py
diff options
context:
space:
mode:
Diffstat (limited to 'modules/general.py')
-rw-r--r--modules/general.py560
1 files changed, 560 insertions, 0 deletions
diff --git a/modules/general.py b/modules/general.py
new file mode 100644
index 0000000..f7eca38
--- /dev/null
+++ b/modules/general.py
@@ -0,0 +1,560 @@
+from typing import Any, Dict, List, Tuple
+
+import pandas as pd
+import matplotlib.pyplot as plt
+from matplotlib.axes import Axes
+from matplotlib.font_manager import FontProperties
+
+import numpy as np
+import matplotlib.dates as mdates
+from dataclasses import dataclass
+from typing import Optional
+
+from .base import Frame, BigFrame, ModuleResult
+
+
+MONEY_UNITS = {"€", "eur", "EUR", "euro", "EURO"}
+
+
+def _is_money_unit(u: str) -> bool:
+ return str(u).strip() in MONEY_UNITS
+
+
def compute_group_distribution(df: pd.DataFrame):
    """
    Compute the money distribution over distribution groups.

    Expects *df* with columns: 'dist_groups' (list of group names per row),
    'Distributionsflag' ('C' = contribution, 'U' = usage), 'Nutzer' (person),
    'value' (numeric amount) and 'unit'. Rows referenced in the error message
    additionally need 'Datum', 'Positionsbezeichnung', 'Positionswert'.

    Returns:
        summary: DataFrame, one row per group with participants, totals,
            usage units, u_count and mode ('usage' / 'equal')
        per_person: DataFrame with columns [person, contributed, share, balance]
        detail: per-(group, person) DataFrame with columns
            [group, person, contributed, usage, share, mode, balance]

    Raises:
        ValueError: if any contribution (C) row carries a non-money unit.
    """
    # Explode the group lists so each row belongs to exactly one group;
    # drop rows without a group.
    work = df.copy()
    work = work.explode("dist_groups")
    work["group"] = work["dist_groups"].fillna("").astype(str).str.strip()
    work = work[work["group"] != ""]

    # Normalise the C/U flag and the person name.
    work["flag"] = work["Distributionsflag"].astype(str).str.strip().str.upper()
    work["person"] = work["Nutzer"].astype(str).str.strip()

    # Contributions (money): every C row must carry a money unit.
    contrib = work[work["flag"] == "C"].copy()
    if len(contrib) > 0:
        bad_units = contrib[~contrib["unit"].apply(_is_money_unit)]
        if len(bad_units) > 0:
            raise ValueError(
                "Contribution (C) muss Geld-Einheit haben (z.B. € / EUR). "
                f"Problemzeilen:\n{bad_units[['Datum','Nutzer','group','Positionsbezeichnung','Positionswert','unit']]}"
            )

    # Usage (any unit; per group it should sensibly be one consistent unit).
    usage = work[work["flag"] == "U"].copy()

    # Sums per (group, person) and per group.
    contrib_by_gp = contrib.groupby(["group", "person"])["value"].sum().rename("contributed").reset_index()
    contrib_tot = contrib.groupby("group")["value"].sum().rename("total_contrib").reset_index()

    usage_by_gp = usage.groupby(["group", "person"])["value"].sum().rename("usage").reset_index()
    usage_tot = usage.groupby("group")["value"].sum().rename("total_usage").reset_index()
    usage_unit = usage.groupby("group")["unit"].agg(lambda s: s.dropna().astype(str).unique().tolist()).reset_index()
    usage_unit = usage_unit.rename(columns={"unit": "usage_units"})

    participants = work.groupby("group")["person"].agg(lambda s: sorted(set(s.tolist()))).reset_index()
    participants = participants.rename(columns={"person": "participants"})

    # Group summary: participants merged with the per-group totals.
    summary = (
        participants.merge(contrib_tot, on="group", how="left")
        .merge(usage_tot, on="group", how="left")
        .merge(usage_unit, on="group", how="left")
    )
    summary["total_contrib"] = summary["total_contrib"].fillna(0.0)
    summary["total_usage"] = summary["total_usage"].fillna(0.0)
    summary["has_usage"] = summary["total_usage"].apply(lambda x: x > 0)
    # NOTE(review): this 'mode' is provisional — it is recomputed from
    # u_count at the end of this function (any U row counts there, even
    # when total_usage == 0).
    summary["mode"] = summary.apply(lambda r: "usage" if r["has_usage"] else "equal", axis=1)

    # Detail per (group, person): cross product of all groups and persons ...
    detail = (
        pd.DataFrame({"group": work["group"].unique()})
        .assign(key=1)
        .merge(pd.DataFrame({"person": work["person"].unique()}).assign(key=1), on="key")
        .drop(columns=["key"])
    )
    # ... restricted to the pairs that actually occur in the data.
    gp_person = work[["group", "person"]].drop_duplicates()
    detail = detail.merge(gp_person, on=["group", "person"], how="inner")

    detail = detail.merge(contrib_by_gp, on=["group", "person"], how="left").merge(usage_by_gp, on=["group", "person"], how="left")
    detail["contributed"] = detail["contributed"].fillna(0.0)
    detail["usage"] = detail["usage"].fillna(0.0)

    # Compute the share (portion of the group's contributions) per group.
    shares = []
    for _, row in summary.iterrows():
        g = row["group"]
        total_c = float(row["total_contrib"] or 0.0)
        parts = row["participants"] or []
        n = len(parts) if parts else 0

        g_detail = detail[detail["group"] == g].copy()
        # Usage mode as soon as any U position exists (even when
        # total_usage == 0, which then falls back to an equal split).
        g_has_any_u = (usage["group"] == g).any()

        if g_has_any_u:
            total_u = float(g_detail["usage"].sum())
            if total_u > 0:
                g_detail["share"] = g_detail["usage"] / total_u * total_c
                mode = "usage"
            else:
                # Fallback: equal split among the group's participants.
                g_detail["share"] = (total_c / n) if n else 0.0
                mode = "equal(fallback)"
        else:
            g_detail["share"] = (total_c / n) if n else 0.0
            mode = "equal"

        g_detail["mode"] = mode
        shares.append(g_detail[["group", "person", "share", "mode"]])

    shares_df = pd.concat(shares, ignore_index=True) if shares else pd.DataFrame(columns=["group","person","share","mode"])
    detail = detail.merge(shares_df, on=["group", "person"], how="left")
    detail["share"] = detail["share"].fillna(0.0)
    # balance > 0 => person paid more than their share (should receive money).
    detail["balance"] = detail["contributed"] - detail["share"]

    # Per-person totals across all groups.
    per_person = detail.groupby("person")[["contributed", "share", "balance"]].sum().reset_index()
    per_person = per_person.sort_values("person")

    # Extend the summary: "as soon as there is any U position" counts,
    # even if total_usage == 0 (fallback case).
    has_any_u = usage.groupby("group").size().rename("u_count").reset_index()
    summary = summary.merge(has_any_u, on="group", how="left")
    summary["u_count"] = summary["u_count"].fillna(0).astype(int)
    summary["mode"] = summary["u_count"].apply(lambda c: "usage" if c > 0 else "equal")

    return summary, per_person, detail
+
@dataclass
class GroupTimeSeries:
    """Cumulative per-person time series for one distribution group."""
    group: str
    times: pd.DatetimeIndex       # shared timeline (incl. the padded x-limits)
    participants: List[str]       # sorted, distinct person names
    usage_units: List[str]        # distinct units of the group's U rows
    xlim_start: pd.Timestamp      # left plot limit (timeline min minus margin)
    xlim_end: pd.Timestamp        # right plot limit (timeline max plus margin)
    contrib_cum: Dict[str, pd.Series]  # € cumulative
    usage_cum: Dict[str, pd.Series]    # unit cumulative (e.g. km, pieces)
    share_cum: Dict[str, pd.Series]    # € cumulative (share)
    ratio: Dict[str, pd.Series]        # share / contributed
+
+
+def _auto_time_limits(tmin: pd.Timestamp, tmax: pd.Timestamp) -> tuple[pd.Timestamp, pd.Timestamp]:
+ # +/- 5% Intervall, bei 0 Intervall fallback 30 Minuten
+ dt = tmax - tmin
+ if dt <= pd.Timedelta(0):
+ margin = pd.Timedelta(minutes=30)
+ else:
+ margin = dt * 0.05
+ return (tmin - margin, tmax + margin)
+
+
def _prepare_group_timeseries(df: pd.DataFrame, group: str) -> Optional[GroupTimeSeries]:
    """
    Build the cumulative per-person time series (contributions, usage,
    share, ratio) for a single distribution group.

    Returns None when the group has no rows with a valid 'Datum'.
    """
    # Explode the group lists and keep only rows belonging to this group.
    work = df.copy().explode("dist_groups")
    work["group"] = work["dist_groups"].fillna("").astype(str).str.strip()
    work = work[work["group"] == group].copy()

    work = work[pd.notna(work["Datum"])]
    if work.empty:
        return None

    work["person"] = work["Nutzer"].astype(str).str.strip()
    work["flag"] = work["Distributionsflag"].astype(str).str.strip().str.upper()

    participants = sorted(work["person"].unique().tolist())

    # Timeline: every timestamp occurring in the group (unique, sorted).
    times = pd.DatetimeIndex(sorted(work["Datum"].unique()))
    tmin, tmax = times.min(), times.max()
    x0, x1 = _auto_time_limits(tmin, tmax)

    # Add the padded x-limits so the step plots span the whole axes width.
    times = times.union(pd.DatetimeIndex([x0, x1])).sort_values()

    # Usage units (may be empty, or several — then rendered e.g. as "km/stk").
    usage_units = sorted(
        work.loc[work["flag"] == "U", "unit"]
        .dropna()
        .astype(str)
        .str.strip()
        .unique()
        .tolist()
    )

    # Per person: contributions (C) and usage (U) as cumulative step series
    # on the shared timeline.
    contrib_cum: Dict[str, pd.Series] = {}
    usage_cum: Dict[str, pd.Series] = {}

    for p in participants:
        c = work[(work["person"] == p) & (work["flag"] == "C")].copy()
        u = work[(work["person"] == p) & (work["flag"] == "U")].copy()

        # Contributions: aggregate per date, reindex onto timeline, cumulate.
        c_by_t = c.groupby("Datum")["value"].sum() if not c.empty else pd.Series(dtype=float)
        c_by_t = c_by_t.reindex(times, fill_value=0.0)
        contrib_cum[p] = c_by_t.cumsum()

        # Usage: aggregate per date, reindex onto timeline, cumulate.
        u_by_t = u.groupby("Datum")["value"].sum() if not u.empty else pd.Series(dtype=float)
        u_by_t = u_by_t.reindex(times, fill_value=0.0)
        usage_cum[p] = u_by_t.cumsum()

    # Share over time: the cumulative total contributions, distributed.
    total_contrib = sum((contrib_cum[p] for p in participants), start=pd.Series(0.0, index=times))
    total_usage = sum((usage_cum[p] for p in participants), start=pd.Series(0.0, index=times))

    has_any_u = (work["flag"] == "U").any()
    n = len(participants) if participants else 1

    share_cum: Dict[str, pd.Series] = {}
    if has_any_u:
        # Usage mode as soon as any U row exists; while total_usage == 0
        # the share falls back to an equal split.
        for p in participants:
            # share = total_contrib * usage_p / total_usage, else total_contrib / n
            usage_p = usage_cum[p]
            with np.errstate(divide="ignore", invalid="ignore"):
                share_usage = total_contrib * (usage_p / total_usage.replace(0.0, np.nan))
            share_equal = total_contrib / float(n)
            share = share_usage.where(total_usage > 0, share_equal)
            share_cum[p] = share.fillna(0.0)
    else:
        # Equal mode throughout.
        equal = total_contrib / float(n)
        for p in participants:
            share_cum[p] = equal

    # Ratio share / contributed; 0 where the person has not contributed yet.
    ratio: Dict[str, pd.Series] = {}
    for p in participants:
        denom = contrib_cum[p].astype(float)
        r = share_cum[p].astype(float) / denom.where(denom > 0, np.nan)
        ratio[p] = r.fillna(0.0)

    return GroupTimeSeries(
        group=group,
        times=times,
        participants=participants,
        usage_units=usage_units,
        xlim_start=x0,
        xlim_end=x1,
        contrib_cum=contrib_cum,
        usage_cum=usage_cum,
        share_cum=share_cum,
        ratio=ratio,
    )
+
+
@dataclass
class GroupChartBigFrame(BigFrame):
    """
    Full-width chart of one group's per-person time series.

    kind:
        - 'usage_cum'   : cumulative usage (group unit)
        - 'contrib_cum' : cumulative contributions (€)
        - 'share_cum'   : cumulative share (€)
        - 'ratio'       : share / contributed, drawn on a log scale
    """
    gts: GroupTimeSeries
    kind: str

    def render(self, ax: Axes, mono_font: FontProperties) -> None:
        """Plot the selected series for every participant into *ax*."""
        ax.axis("on")

        # Concise date ticks on the x-axis.
        locator = mdates.AutoDateLocator(minticks=3, maxticks=7)
        formatter = mdates.ConciseDateFormatter(locator)
        ax.xaxis.set_major_locator(locator)
        ax.xaxis.set_major_formatter(formatter)
        ax.xaxis.get_offset_text().set_visible(False)  # hide the "2025-Dec" offset label

        ax.set_xlim(self.gts.xlim_start, self.gts.xlim_end)

        # Select the series map and axis label for the requested chart kind.
        if self.kind == "usage_cum":
            series_map = self.gts.usage_cum
            unit = "/".join(self.gts.usage_units) if self.gts.usage_units else ""
            ax.set_ylabel(f"Verbrauch kumulativ {unit}".strip(), fontproperties=mono_font)

        elif self.kind == "contrib_cum":
            series_map = self.gts.contrib_cum
            ax.set_ylabel("Contributions kumulativ €", fontproperties=mono_font)

        elif self.kind == "share_cum":
            series_map = self.gts.share_cum
            ax.set_ylabel("Anteil kumulativ €", fontproperties=mono_font)

        elif self.kind == "ratio":
            series_map = self.gts.ratio
            ax.set_ylabel("Anteil / Ausgelegt", fontproperties=mono_font)
            ax.set_yscale("log")  # ratios vary multiplicatively → log scale

        else:
            raise ValueError(f"Unknown kind: {self.kind}")

        # Plot, collecting all values for robust y-limits afterwards.
        all_vals = []

        min_ratio = 1e-3  # "effectively zero" on the log axis, so curves start at the bottom

        for p in self.gts.participants:
            y = series_map[p].copy()

            if self.kind == "ratio":
                # Sanitise NaN/0/inf so the curve exists from the very start.
                y = y.replace([np.inf, -np.inf], np.nan)
                y = y.fillna(min_ratio)
                y = y.clip(lower=min_ratio)
            else:
                y = y.replace([np.inf, -np.inf], np.nan).fillna(0.0)

            # Step drawing is usually cleaner for cumulative curves.
            ax.plot(self.gts.times, y.values, label=p, linewidth=1, drawstyle="steps-post")

            v = y.values
            v = v[np.isfinite(v)]
            if v.size:
                all_vals.append(v)

        # Set y-limits so that really ALL plotted values are visible.
        if all_vals:
            vv = np.concatenate(all_vals)

            if self.kind in ("usage_cum", "contrib_cum", "share_cum"):
                vmax = float(np.nanmax(vv)) if vv.size else 0.0
                if vmax <= 0:
                    ax.set_ylim(0, 1)
                else:
                    ax.set_ylim(0, vmax * 1.08)  # small headroom

            elif self.kind == "ratio":
                vpos = vv[vv > 0]
                if vpos.size:
                    vmin = float(np.nanmin(vpos))
                    vmax = float(np.nanmax(vpos))
                    ax.set_ylim(vmin / 1.5, vmax * 1.5)  # log: multiplicative padding

        ax.grid(True, alpha=0.2)

        leg = ax.legend(prop=mono_font, fontsize=7, loc="best", ncols=2)
        if leg:
            for t in leg.get_texts():
                t.set_fontproperties(mono_font)

        # Monospace fonts on the tick labels as well.
        for tick in ax.get_xticklabels() + ax.get_yticklabels():
            tick.set_fontproperties(mono_font)
+
@dataclass
class TextFrame(Frame):
    """Frame that renders a fixed, preformatted text block."""

    # Full (possibly multi-line) text to display.
    text: str

    def render(self, ax: Axes, mono_font: FontProperties) -> None:
        """Draw the text anchored at the axes' top-left corner."""
        anchor_x, anchor_y = 0, 1
        ax.text(
            anchor_x,
            anchor_y,
            self.text,
            va="top",
            ha="left",
            fontproperties=mono_font,
        )
+
@dataclass
class PlotBigFrame(BigFrame):
    """Bar chart comparing each person's contributions with their share."""

    # Expected columns: person, contributed, share.
    per_person: pd.DataFrame

    def render(self, ax: Axes, mono_font: FontProperties) -> None:
        """Plot grouped bars into the prepared axes, applying the mono font."""
        ax.axis("on")

        data = self.per_person.set_index("person")[["contributed", "share"]]
        data.plot.bar(ax=ax)
        ax.tick_params(axis="x", rotation=0)

        legend = ax.legend(prop=mono_font)
        if legend:
            for entry in legend.get_texts():
                entry.set_fontproperties(mono_font)

        for label in ax.get_xticklabels() + ax.get_yticklabels():
            label.set_fontproperties(mono_font)

        ax.xaxis.label.set_fontproperties(mono_font)
        ax.yaxis.label.set_fontproperties(mono_font)
+
+
class GeneralModule:
    """Evaluates the distribution groups: computes the per-group and
    per-person money distribution, a minimised settlement plan, and renders
    console text plus PDF frames/pages."""

    # Module identifier used by the surrounding framework.
    name = "general"

    def process(self, df: pd.DataFrame, context: Dict[str, Any]) -> ModuleResult:
        """Run the full evaluation on the parsed ledger *df*.

        context keys used:
            'want_pdf'  (bool, default True): also build frames/bigframes/pages
            'mono_font' (FontProperties, optional): falls back to
                        DejaVu Sans Mono, 8pt

        Returns a ModuleResult with summary text, frames, bigframes and pages.
        """
        want_pdf = bool(context.get("want_pdf", True))

        mono_font = context.get("mono_font") or FontProperties(family="DejaVu Sans Mono", size=8)

        group_summary, per_person, detail = compute_group_distribution(df)

        # Net balance per person (positive => person should receive money).
        balance = {r["person"]: float(r["balance"]) for _, r in per_person.iterrows()}
        payments = self._minimize_payments(balance)

        # ---- Text evaluation for the console.
        summary_lines = []
        summary_lines.append("General – Verteilung über Distributionsgruppen")
        summary_lines.append("")
        summary_lines.append("Gruppen:")
        for _, r in group_summary.sort_values("group").iterrows():
            g = r["group"]
            total_c = float(r.get("total_contrib", 0.0))
            u_count = int(r.get("u_count", 0))
            mode = "usage" if u_count > 0 else "equal"
            participants = r.get("participants", []) or []
            summary_lines.append(f" - {g}: {total_c:.2f} €; mode={mode}; teilnehmer={len(participants)}")

        summary_lines.append("")
        summary_lines.append("Personen (Summe über alle Gruppen):")
        for _, r in per_person.sort_values("person").iterrows():
            summary_lines.append(
                f" - {r['person']}: ausgelegt={r['contributed']:.2f} €; anteil={r['share']:.2f} €; saldo={r['balance']:.2f} €"
            )

        summary_lines.append("")
        summary_lines.append("Ausgleich (minimiert):")
        if payments:
            for p, r, a in payments:
                summary_lines.append(f" - {p} → {r}: {a:.2f} €")
        else:
            summary_lines.append(" (keine Zahlungen nötig)")

        summary_text = "\n".join(summary_lines)

        frames: List[Frame] = []
        bigframes: List[BigFrame] = []
        pages: List[plt.Figure] = []

        if want_pdf:
            frames.extend(self._make_frames(group_summary, per_person, payments))

            # BigFrame: the overall bar plot (behaviour unchanged).
            bigframes.append(
                PlotBigFrame(
                    title="General – Ausgelegt vs Anteil (Summe über Gruppen)",
                    per_person=per_person.copy(),
                )
            )

            # Four BigFrame charts per distribution group.
            for g in sorted(group_summary["group"].unique().tolist()):
                gts = _prepare_group_timeseries(df, g)
                if not gts:
                    continue

                bigframes.append(GroupChartBigFrame(
                    title=f"{g} – Kumulativer Verbrauch pro Person",
                    gts=gts,
                    kind="usage_cum",
                ))
                bigframes.append(GroupChartBigFrame(
                    title=f"{g} – Kumulative Contributions pro Person",
                    gts=gts,
                    kind="contrib_cum",
                ))
                bigframes.append(GroupChartBigFrame(
                    title=f"{g} – Anteil pro Person (zeitlicher Verlauf)",
                    gts=gts,
                    kind="share_cum",
                ))
                bigframes.append(GroupChartBigFrame(
                    title=f"{g} – Verhältnis Anteil/Ausgelegt (zeitlicher Verlauf)",
                    gts=gts,
                    kind="ratio",
                ))

            # Pages: detail pages only, no separate bar-plot page anymore.
            pages.extend(self._make_pages(group_summary, per_person, detail, mono_font))

        return ModuleResult(summary_text=summary_text, frames=frames, bigframes=bigframes, pages=pages)

    def _minimize_payments(self, balance: Dict[str, float]):
        """Greedy settlement of the balances.

        Splits persons into payers (negative balance) and receivers
        (positive balance), then matches them pairwise: each step settles
        min(open payer amount, open receiver amount). Amounts are rounded
        to cents. Returns a list of (payer, receiver, amount) tuples.
        """
        receivers = []
        payers = []
        for p, amt in balance.items():
            a = round(float(amt), 2)
            if a > 0:
                receivers.append([p, a])
            elif a < 0:
                payers.append([p, -a])

        out = []
        i = j = 0
        while i < len(payers) and j < len(receivers):
            payer, avail = payers[i]
            recv, need = receivers[j]
            pay = min(avail, need)
            out.append((payer, recv, pay))
            payers[i][1] -= pay
            receivers[j][1] -= pay
            # Advance past anyone whose remainder is settled (to the cent).
            if round(payers[i][1], 2) == 0:
                i += 1
            if round(receivers[j][1], 2) == 0:
                j += 1
        return out

    def _make_frames(self, group_summary: pd.DataFrame, per_person: pd.DataFrame, payments: List[Tuple[str,str,float]]) -> List[Frame]:
        """Build the three small text frames: groups, persons, settlement."""
        # Frame 1: group overview.
        lines = ["Gruppenübersicht:"]
        for _, r in group_summary.sort_values("group").iterrows():
            g = r["group"]
            total_c = float(r.get("total_contrib", 0.0))
            u_count = int(r.get("u_count", 0))
            parts = r.get("participants", [])
            mode = "usage" if u_count > 0 else "equal"
            lines.append(f"- {g}: {total_c:.2f} €; mode={mode}; teilnehmer={len(parts)}")

        f1 = TextFrame(title="General: Gruppen", text="\n".join(lines))

        # Frame 2: per-person totals.
        lines = ["Personen (Summe über alle Gruppen):", "Person | contributed | share | balance"]
        for _, r in per_person.iterrows():
            lines.append(f"{r['person']}: {r['contributed']:.2f} €; {r['share']:.2f} €; {r['balance']:.2f} €")
        f2 = TextFrame(title="General: Personen", text="\n".join(lines))

        # Frame 3: settlement plan.
        lines = ["Ausgleich (minimiert):"]
        if payments:
            for p, r, a in payments:
                lines.append(f"{p} → {r}: {a:.2f} €")
        else:
            lines.append("(keine Zahlungen nötig)")
        f3 = TextFrame(title="General: Ausgleich", text="\n".join(lines))

        return [f1, f2, f3]

    def _make_pages(self, group_summary, per_person, detail, mono_font) -> List[plt.Figure]:
        """Render one A4 text page per group with its per-person detail."""
        pages: List[plt.Figure] = []

        # Text pages: one detail page per group.
        for g in sorted(detail["group"].unique().tolist()):
            gdet = detail[detail["group"] == g].sort_values("person")
            total_c = float(group_summary[group_summary["group"] == g]["total_contrib"].iloc[0]) if (group_summary["group"] == g).any() else 0.0
            u_count = int(group_summary[group_summary["group"] == g]["u_count"].iloc[0]) if (group_summary["group"] == g).any() else 0
            mode = "usage" if u_count > 0 else "equal"

            lines = [
                f"Gruppe: {g}",
                f"Total Contribution: {total_c:.2f} €",
                f"Mode: {mode}",
                "",
                "Person | contributed | usage | share | balance",
            ]
            for _, r in gdet.iterrows():
                lines.append(
                    f"{r['person']}: {r['contributed']:.2f} €; {r['usage']:.4f}; {r['share']:.2f} €; {r['balance']:.2f} €"
                )

            # A4 portrait in inches (8.27 x 11.69).
            fig, ax = plt.subplots(figsize=(8.27, 11.69))
            ax.axis("off")
            ax.text(0, 1, "\n".join(lines), va="top", ha="left", fontproperties=mono_font)
            pages.append(fig)

        # Possible extension: usage-over-time pages for groups with unit "km"
        # (only where U rows exist). That needs the time data from the
        # original df — `detail` has no dates — so it is not implemented here.

        return pages
+