"""General cost-distribution module.

Splits money contributions ("C" rows) among the participants of each
distribution group, either proportionally to recorded usage ("U" rows) or
equally when no usage is available, and produces a text summary, text
frames, charts and per-group detail pages.
"""

from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple

import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from matplotlib.axes import Axes
from matplotlib.font_manager import FontProperties

from .base import Frame, BigFrame, ModuleResult

# Unit strings that are accepted as "money" for contribution rows.
MONEY_UNITS = {"€", "eur", "EUR", "euro", "EURO"}


def _is_money_unit(u: str) -> bool:
    """Return True if *u* (stringified and stripped) is in ``MONEY_UNITS``."""
    return str(u).strip() in MONEY_UNITS


def _mode_label(u_count: int) -> str:
    """Return ``"usage"`` if a group has at least one usage row, else ``"equal"``."""
    return "usage" if u_count > 0 else "equal"


def compute_group_distribution(df: pd.DataFrame):
    """Compute per-group and per-debitor contribution shares.

    Columns read from *df*: ``dist_groups`` (exploded, one row per group),
    ``group_flag`` ("C" = contribution, "U" = usage, compared upper-cased),
    ``debitor``, ``val`` and ``unit``; ``date`` and ``position`` are only
    used in the error message.

    For every group the total contribution is split among its participants:
    proportionally to usage when the group has usage rows with a positive
    total, otherwise equally.

    Returns:
        A tuple ``(summary, per_debitor, detail)``:
        ``summary`` has one row per group, ``per_debitor`` holds the summed
        ``contributed`` / ``share`` / ``balance`` per debitor (sorted by
        debitor) and ``detail`` has one row per (group, debitor) pair.

    Raises:
        ValueError: if a contribution row has a unit that is not a money unit.
    """
    work = df.copy()
    work = work.explode("dist_groups")
    work["group"] = work["dist_groups"].fillna("").astype(str).str.strip()
    # Copy after filtering so the column assignments below operate on an
    # independent frame instead of a slice of the exploded one.
    work = work[work["group"] != ""].copy()
    work["flag"] = work["group_flag"].astype(str).str.strip().str.upper()
    work["debitor"] = work["debitor"].astype(str).str.strip()

    contrib = work[work["flag"] == "C"].copy()
    if len(contrib) > 0:
        bad_units = contrib[~contrib["unit"].apply(_is_money_unit)]
        if len(bad_units) > 0:
            raise ValueError(
                "Contribution (C) muss Geld-Einheit haben (z.B. € / EUR). "
                f"Problemzeilen:\n{bad_units[['date','debitor','group','position','val','unit']]}"
            )

    usage = work[work["flag"] == "U"].copy()

    group_debitor_keys = ["group", "debitor"]
    contrib_by_gp = (
        contrib.groupby(group_debitor_keys)["val"].sum().rename("contributed").reset_index()
    )
    contrib_tot = contrib.groupby("group")["val"].sum().rename("total_contrib").reset_index()
    usage_by_gp = usage.groupby(group_debitor_keys)["val"].sum().rename("usage").reset_index()
    usage_tot = usage.groupby("group")["val"].sum().rename("total_usage").reset_index()

    usage_unit = (
        usage.groupby("group")["unit"]
        .agg(lambda s: s.dropna().astype(str).unique().tolist())
        .reset_index()
    )
    usage_unit = usage_unit.rename(columns={"unit": "usage_units"})

    participants = (
        work.groupby("group")["debitor"]
        .agg(lambda s: sorted(set(s.tolist())))
        .reset_index()
    )
    participants = participants.rename(columns={"debitor": "participants"})

    summary = (
        participants.merge(contrib_tot, on="group", how="left")
        .merge(usage_tot, on="group", how="left")
        .merge(usage_unit, on="group", how="left")
    )
    summary["total_contrib"] = summary["total_contrib"].fillna(0.0)
    summary["total_usage"] = summary["total_usage"].fillna(0.0)
    summary["has_usage"] = summary["total_usage"] > 0
    # Provisional value: "mode" is overwritten from "u_count" at the end of
    # this function. It is assigned here so the column keeps its position.
    summary["mode"] = np.where(summary["has_usage"], "usage", "equal")

    # All (group, debitor) combinations, then restricted to the pairs that
    # actually occur. The cross join is kept because it determines the row
    # order of the returned ``detail`` frame.
    detail = (
        pd.DataFrame({"group": work["group"].unique()})
        .assign(key=1)
        .merge(pd.DataFrame({"debitor": work["debitor"].unique()}).assign(key=1), on="key")
        .drop(columns=["key"])
    )
    gp_debitor = work[group_debitor_keys].drop_duplicates()
    detail = detail.merge(gp_debitor, on=group_debitor_keys, how="inner")
    detail = detail.merge(contrib_by_gp, on=group_debitor_keys, how="left").merge(
        usage_by_gp, on=group_debitor_keys, how="left"
    )
    detail["contributed"] = detail["contributed"].fillna(0.0)
    detail["usage"] = detail["usage"].fillna(0.0)

    # Groups that have at least one usage row (computed once, not per group).
    groups_with_usage = set(usage["group"].unique())

    shares = []
    for _, row in summary.iterrows():
        g = row["group"]
        total_c = float(row["total_contrib"] or 0.0)
        parts = row["participants"] or []
        n = len(parts)
        equal_share = (total_c / n) if n else 0.0

        g_detail = detail[detail["group"] == g].copy()
        if g in groups_with_usage:
            total_u = float(g_detail["usage"].sum())
            if total_u > 0:
                g_detail["share"] = g_detail["usage"] / total_u * total_c
                mode = "usage"
            else:
                # Fallback: split equally among the group's participants.
                g_detail["share"] = equal_share
                mode = "equal(fallback)"
        else:
            g_detail["share"] = equal_share
            mode = "equal"
        g_detail["mode"] = mode
        shares.append(g_detail[["group", "debitor", "share", "mode"]])

    if shares:
        shares_df = pd.concat(shares, ignore_index=True)
    else:
        shares_df = pd.DataFrame(columns=["group", "debitor", "share", "mode"])

    detail = detail.merge(shares_df, on=group_debitor_keys, how="left")
    detail["share"] = detail["share"].fillna(0.0)
    # Positive balance: the debitor contributed more than their share.
    detail["balance"] = detail["contributed"] - detail["share"]

    per_debitor = (
        detail.groupby("debitor")[["contributed", "share", "balance"]].sum().reset_index()
    )
    per_debitor = per_debitor.sort_values("debitor")

    has_any_u = usage.groupby("group").size().rename("u_count").reset_index()
    summary = summary.merge(has_any_u, on="group", how="left")
    summary["u_count"] = summary["u_count"].fillna(0).astype(int)
    # NOTE(review): this reports "usage" whenever usage rows exist, even if
    # their total is 0 and the shares above fell back to an equal split
    # ("equal(fallback)" in ``detail``) — confirm whether that is intended.
    summary["mode"] = summary["u_count"].apply(_mode_label)

    return summary, per_debitor, detail


@dataclass
class GroupTimeSeries:
    """Cumulative per-participant time series of one distribution group."""

    group: str
    times: pd.DatetimeIndex  # event dates plus the two padded axis limits
    participants: List[str]
    usage_units: List[str]
    xlim_start: pd.Timestamp
    xlim_end: pd.Timestamp
    contrib_cum: Dict[str, pd.Series]  # participant -> cumulative contributions
    usage_cum: Dict[str, pd.Series]  # participant -> cumulative usage
    share_cum: Dict[str, pd.Series]  # participant -> share of cumulative contributions
    ratio: Dict[str, pd.Series]  # participant -> share / own contribution (0 where undefined)


def _auto_time_limits(tmin: pd.Timestamp, tmax: pd.Timestamp) -> tuple[pd.Timestamp, pd.Timestamp]:
    """Return ``(tmin, tmax)`` widened by a margin on both sides.

    The margin is 5 % of the span, or 30 minutes when the span is not positive.
    """
    dt = tmax - tmin
    if dt <= pd.Timedelta(0):
        margin = pd.Timedelta(minutes=30)
    else:
        margin = dt * 0.05
    return (tmin - margin, tmax + margin)


def _cumulative_by_time(rows: pd.DataFrame, times: pd.DatetimeIndex) -> pd.Series:
    """Sum ``val`` per ``date`` in *rows*, align to *times* (missing = 0), and cumulate."""
    by_time = rows.groupby("date")["val"].sum() if not rows.empty else pd.Series(dtype=float)
    return by_time.reindex(times, fill_value=0.0).cumsum()


def _prepare_group_timeseries(df: pd.DataFrame, group: str) -> Optional[GroupTimeSeries]:
    """Build the cumulative time series for *group*.

    Rows without a ``date`` are ignored. Returns ``None`` when no row of the
    group remains.
    """
    work = df.copy().explode("dist_groups")
    work["group"] = work["dist_groups"].fillna("").astype(str).str.strip()
    work = work[work["group"] == group].copy()
    # Copy so the column assignments below do not write into a slice.
    work = work[pd.notna(work["date"])].copy()
    if work.empty:
        return None

    work["debitor"] = work["debitor"].astype(str).str.strip()
    work["flag"] = work["group_flag"].astype(str).str.strip().str.upper()

    participants = sorted(work["debitor"].unique().tolist())

    times = pd.DatetimeIndex(sorted(work["date"].unique()))
    tmin, tmax = times.min(), times.max()
    x0, x1 = _auto_time_limits(tmin, tmax)
    # Include the axis limits so every step line spans the full x range.
    times = times.union(pd.DatetimeIndex([x0, x1])).sort_values()

    usage_units = sorted(
        work.loc[work["flag"] == "U", "unit"]
        .dropna()
        .astype(str)
        .str.strip()
        .unique()
        .tolist()
    )

    contrib_cum: Dict[str, pd.Series] = {}
    usage_cum: Dict[str, pd.Series] = {}
    for p in participants:
        own_rows = work["debitor"] == p
        contrib_cum[p] = _cumulative_by_time(work[own_rows & (work["flag"] == "C")], times)
        usage_cum[p] = _cumulative_by_time(work[own_rows & (work["flag"] == "U")], times)

    total_contrib = sum(
        (contrib_cum[p] for p in participants), start=pd.Series(0.0, index=times)
    )
    total_usage = sum(
        (usage_cum[p] for p in participants), start=pd.Series(0.0, index=times)
    )

    has_any_u = (work["flag"] == "U").any()
    n = len(participants) if participants else 1
    share_equal = total_contrib / float(n)

    share_cum: Dict[str, pd.Series] = {}
    if has_any_u:
        for p in participants:
            with np.errstate(divide="ignore", invalid="ignore"):
                share_usage = total_contrib * (usage_cum[p] / total_usage.replace(0.0, np.nan))
            # Until any usage has accumulated, fall back to an equal split.
            share = share_usage.where(total_usage > 0, share_equal)
            share_cum[p] = share.fillna(0.0)
    else:
        for p in participants:
            # Each participant gets an independent Series so that mutating
            # one entry cannot affect the others.
            share_cum[p] = share_equal.copy()

    ratio: Dict[str, pd.Series] = {}
    for p in participants:
        denom = contrib_cum[p].astype(float)
        # Undefined where the participant has not contributed yet -> 0.
        r = share_cum[p].astype(float) / denom.where(denom > 0, np.nan)
        ratio[p] = r.fillna(0.0)

    return GroupTimeSeries(
        group=group,
        times=times,
        participants=participants,
        usage_units=usage_units,
        xlim_start=x0,
        xlim_end=x1,
        contrib_cum=contrib_cum,
        usage_cum=usage_cum,
        share_cum=share_cum,
        ratio=ratio,
    )


def _apply_mono_font(ax: Axes, legend, mono_font: FontProperties) -> None:
    """Set *mono_font* on the legend texts (if any) and on all tick labels of *ax*."""
    if legend:
        for text in legend.get_texts():
            text.set_fontproperties(mono_font)
    for tick in ax.get_xticklabels() + ax.get_yticklabels():
        tick.set_fontproperties(mono_font)


@dataclass
class GroupChartBigFrame(BigFrame):
    """Step chart of one ``GroupTimeSeries`` quantity, one line per participant.

    ``kind`` selects the quantity: ``"usage_cum"``, ``"contrib_cum"``,
    ``"share_cum"`` or ``"ratio"``.
    """

    gts: GroupTimeSeries
    kind: str

    def render(self, ax: Axes, mono_font: FontProperties) -> None:
        """Draw the chart onto *ax*.

        Raises:
            ValueError: if ``self.kind`` is not one of the supported kinds.
        """
        ax.axis("on")

        locator = mdates.AutoDateLocator(minticks=3, maxticks=7)
        formatter = mdates.ConciseDateFormatter(locator)
        ax.xaxis.set_major_locator(locator)
        ax.xaxis.set_major_formatter(formatter)
        ax.xaxis.get_offset_text().set_visible(False)
        ax.set_xlim(self.gts.xlim_start, self.gts.xlim_end)

        if self.kind == "usage_cum":
            series_map = self.gts.usage_cum
            unit = "/".join(self.gts.usage_units) if self.gts.usage_units else ""
            ax.set_ylabel(f"Usage cumulative {unit}".strip(), fontproperties=mono_font)
        elif self.kind == "contrib_cum":
            series_map = self.gts.contrib_cum
            ax.set_ylabel("Contribution cumulative €", fontproperties=mono_font)
        elif self.kind == "share_cum":
            series_map = self.gts.share_cum
            ax.set_ylabel("Share cumulative €", fontproperties=mono_font)
        elif self.kind == "ratio":
            series_map = self.gts.ratio
            ax.set_ylabel("Share / Contribution ratio (logarithmic)", fontproperties=mono_font)
            ax.set_yscale("log")
        else:
            raise ValueError(f"Unknown kind: {self.kind}")

        is_ratio = self.kind == "ratio"
        # Lower bound for ratio values: a log axis cannot display 0.
        min_ratio = 1e-3
        all_vals = []
        for p in self.gts.participants:
            y = series_map[p].copy()
            y = y.replace([np.inf, -np.inf], np.nan)
            if is_ratio:
                y = y.fillna(min_ratio).clip(lower=min_ratio)
            else:
                y = y.fillna(0.0)

            ax.plot(self.gts.times, y.values, label=p, linewidth=1, drawstyle="steps-post")

            v = y.values
            v = v[np.isfinite(v)]
            if v.size:
                all_vals.append(v)

        if all_vals:
            vv = np.concatenate(all_vals)
            if self.kind in ("usage_cum", "contrib_cum", "share_cum"):
                vmax = float(np.nanmax(vv)) if vv.size else 0.0
                if vmax <= 0:
                    ax.set_ylim(0, 1)
                else:
                    ax.set_ylim(0, vmax * 1.08)
            elif is_ratio:
                vpos = vv[vv > 0]
                if vpos.size:
                    vmin = float(np.nanmin(vpos))
                    vmax = float(np.nanmax(vpos))
                    ax.set_ylim(vmin / 1.5, vmax * 1.5)

        ax.grid(True, alpha=0.2)

        # No ``fontsize`` argument: matplotlib ignores it whenever ``prop``
        # is given, so the legend size always comes from ``mono_font``.
        leg = ax.legend(prop=mono_font, loc="best", ncols=2)
        _apply_mono_font(ax, leg, mono_font)


@dataclass
class TextFrame(Frame):
    """Frame that renders a block of monospaced text in the top-left corner."""

    text: str

    def render(self, ax: Axes, mono_font: FontProperties) -> None:
        """Draw ``self.text`` onto *ax*."""
        ax.text(0, 1, self.text, va="top", ha="left", fontproperties=mono_font)


@dataclass
class PlotBigFrame(BigFrame):
    """Bar chart comparing ``contributed`` and ``share`` per debitor."""

    per_debitor: pd.DataFrame

    def render(self, ax: Axes, mono_font: FontProperties) -> None:
        """Draw the bar chart onto *ax*."""
        ax.axis("on")
        plot_df = self.per_debitor.set_index("debitor")[["contributed", "share"]]
        plot_df.plot.bar(ax=ax)
        ax.tick_params(axis="x", rotation=0)

        leg = ax.legend(prop=mono_font)
        _apply_mono_font(ax, leg, mono_font)
        ax.xaxis.label.set_fontproperties(mono_font)
        ax.yaxis.label.set_fontproperties(mono_font)


class GeneralModule:
    """Module that reports group distributions, balances and compensations."""

    name = "general"

    def process(self, df: pd.DataFrame, context: Dict[str, Any]) -> ModuleResult:
        """Run the distribution on *df* and assemble the module result.

        Context keys read: ``want_pdf`` (default ``True``) and ``mono_font``
        (default: DejaVu Sans Mono, size 8). Frames, big frames and pages are
        only produced when ``want_pdf`` is truthy; the summary text is always
        produced.
        """
        want_pdf = bool(context.get("want_pdf", True))
        mono_font = context.get("mono_font") or FontProperties(family="DejaVu Sans Mono", size=8)

        group_summary, per_debitor, detail = compute_group_distribution(df)

        balance = {
            debitor: float(amount)
            for debitor, amount in zip(per_debitor["debitor"], per_debitor["balance"])
        }
        payments = self._minimize_payments(balance)

        summary_lines = ["General", "", "Groups:"]
        for _, r in group_summary.sort_values("group").iterrows():
            g = r["group"]
            total_c = float(r.get("total_contrib", 0.0))
            mode = _mode_label(int(r.get("u_count", 0)))
            participants = r.get("participants", []) or []
            summary_lines.append(
                f" - {g}: {total_c:.2f} €; mode={mode}; participants={len(participants)}"
            )

        summary_lines.append("")
        summary_lines.append("Debitors (total):")
        for _, r in per_debitor.sort_values("debitor").iterrows():
            summary_lines.append(
                f" - {r['debitor']}: contributed={r['contributed']:.2f} €; share={r['share']:.2f} €; balance={r['balance']:.2f} €"
            )

        summary_lines.append("")
        summary_lines.append("Compensation (minimized):")
        if payments:
            for payer, receiver, amount in payments:
                summary_lines.append(f" - {payer} → {receiver}: {amount:.2f} €")
        else:
            summary_lines.append(" (No compensation required)")

        summary_text = "\n".join(summary_lines)

        frames: List[Frame] = []
        bigframes: List[BigFrame] = []
        pages: List[plt.Figure] = []

        if want_pdf:
            frames.extend(self._make_frames(group_summary, per_debitor, payments))

            bigframes.append(
                PlotBigFrame(
                    title="General – Shares vs. Contributions (total)",
                    per_debitor=per_debitor.copy(),
                )
            )

            chart_specs = (
                ("Cumulative usage per debitor", "usage_cum"),
                ("Cumulative contributions per debitor", "contrib_cum"),
                ("Share per debitor", "share_cum"),
                ("Share / Contribution ratio (logarithmic)", "ratio"),
            )
            for g in sorted(group_summary["group"].unique().tolist()):
                gts = _prepare_group_timeseries(df, g)
                if gts is None:
                    # No dated rows for this group: nothing to chart.
                    continue
                for caption, kind in chart_specs:
                    bigframes.append(
                        GroupChartBigFrame(title=f"{g} – {caption}", gts=gts, kind=kind)
                    )

            pages.extend(self._make_pages(group_summary, per_debitor, detail, mono_font))

        return ModuleResult(
            summary_text=summary_text, frames=frames, bigframes=bigframes, pages=pages
        )

    def _minimize_payments(self, balance: Dict[str, float]) -> List[Tuple[str, str, float]]:
        """Greedily match debitors with negative balance to those with positive balance.

        Balances are rounded to cents first; zero balances are skipped.

        Returns:
            A list of ``(payer, receiver, amount)`` tuples, amounts in cents
            precision.
        """
        receivers = []
        payers = []
        for person, amt in balance.items():
            cents = round(float(amt), 2)
            if cents > 0:
                receivers.append([person, cents])
            elif cents < 0:
                payers.append([person, -cents])

        out = []
        i = j = 0
        while i < len(payers) and j < len(receivers):
            payer, avail = payers[i]
            recv, need = receivers[j]
            pay = min(avail, need)
            # Round the recorded amount so float noise from the repeated
            # subtractions below does not leak into the result.
            out.append((payer, recv, round(pay, 2)))
            payers[i][1] -= pay
            receivers[j][1] -= pay
            # ``pay`` equals one of the two remainders, so at least one
            # index advances on every iteration.
            if round(payers[i][1], 2) == 0:
                i += 1
            if round(receivers[j][1], 2) == 0:
                j += 1
        return out

    def _make_frames(
        self,
        group_summary: pd.DataFrame,
        per_debitor: pd.DataFrame,
        payments: List[Tuple[str, str, float]],
    ) -> List[Frame]:
        """Build the three text frames: groups, debitor totals and compensation."""
        lines = ["Groups:"]
        for _, r in group_summary.sort_values("group").iterrows():
            g = r["group"]
            total_c = float(r.get("total_contrib", 0.0))
            parts = r.get("participants", [])
            mode = _mode_label(int(r.get("u_count", 0)))
            lines.append(f"- {g}: {total_c:.2f} €; mode={mode}; participants={len(parts)}")
        f1 = TextFrame(title="General: Groups", text="\n".join(lines))

        lines = ["Debitor total:", "debitor | contributed | share | balance"]
        for _, r in per_debitor.iterrows():
            lines.append(
                f"{r['debitor']} | {r['contributed']:.2f} € | {r['share']:.2f} € | {r['balance']:.2f} €"
            )
        f2 = TextFrame(title="General: Debitors", text="\n".join(lines))

        lines = ["Compensation (minimized):"]
        if payments:
            for payer, receiver, amount in payments:
                lines.append(f"{payer} → {receiver}: {amount:.2f} €")
        else:
            lines.append("(No compensation required)")
        f3 = TextFrame(title="General: Compensation", text="\n".join(lines))

        return [f1, f2, f3]

    def _make_pages(self, group_summary, per_debitor, detail, mono_font) -> List[plt.Figure]:
        """Create one A4-sized text figure per group listing its per-debitor detail.

        The returned figures are left open; the caller owns them.
        """
        pages: List[plt.Figure] = []
        for g in sorted(detail["group"].unique().tolist()):
            gdet = detail[detail["group"] == g].sort_values("debitor")

            # Look the group up once; fall back to zeros when it is missing.
            group_rows = group_summary[group_summary["group"] == g]
            if group_rows.empty:
                total_c, u_count = 0.0, 0
            else:
                total_c = float(group_rows["total_contrib"].iloc[0])
                u_count = int(group_rows["u_count"].iloc[0])
            mode = _mode_label(u_count)

            lines = [
                f"Group: {g}",
                f"Total Contribution: {total_c:.2f} €",
                f"Mode: {mode}",
                "",
                "debitor | contributed | usage | share | balance",
            ]
            for _, r in gdet.iterrows():
                lines.append(
                    f"{r['debitor']} | {r['contributed']:.2f} € | {r['usage']:.4f} | {r['share']:.2f} € | {r['balance']:.2f} €"
                )

            # 8.27 x 11.69 inches = A4 portrait.
            fig, ax = plt.subplots(figsize=(8.27, 11.69))
            ax.axis("off")
            ax.text(0, 1, "\n".join(lines), va="top", ha="left", fontproperties=mono_font)
            pages.append(fig)
        return pages