from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple

import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from matplotlib.axes import Axes
from matplotlib.font_manager import FontProperties

from .base import BigFrame, Frame, ModuleResult

# Unit spellings accepted as currency for contribution ("C") rows.
MONEY_UNITS = {"€", "eur", "EUR", "euro", "EURO"}

# Label used in the summary "mode" column, keyed by "has usage" booleans.
_MODE_BY_HAS_USAGE = {True: "usage", False: "equal"}


def _is_money_unit(u: str) -> bool:
    """Return True if *u*, stringified and stripped, is in ``MONEY_UNITS``."""
    return str(u).strip() in MONEY_UNITS


def compute_group_distribution(
    df: pd.DataFrame,
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Distribute each group's contributions among its participants.

    Reads the columns ``dist_groups`` (exploded, one row per group),
    ``group_flag`` ("C" = contribution, "U" = usage), ``debitor``, ``val``
    and ``unit``. For every group the summed contributions are split
    proportionally to usage if the group has usage rows with a positive
    total, otherwise equally among the group's participants.

    Returns:
        ``(summary, per_debitor, detail)`` where

        * ``summary`` has one row per group (participants, totals, usage
          units, ``has_usage``, ``mode``, ``u_count``),
        * ``per_debitor`` has ``contributed``, ``share`` and ``balance``
          summed over all groups, sorted by debitor,
        * ``detail`` has one row per (group, debitor) pair with
          ``contributed``, ``usage``, ``share``, ``mode`` and ``balance``.

    Raises:
        ValueError: if a contribution row has a unit that is not a money unit.
    """
    work = df.copy()
    work = work.explode("dist_groups")
    work["group"] = work["dist_groups"].fillna("").astype(str).str.strip()
    # Copy the filtered slice so the column assignments below act on an
    # independent frame.
    work = work[work["group"] != ""].copy()
    work["flag"] = work["group_flag"].astype(str).str.strip().str.upper()
    work["debitor"] = work["debitor"].astype(str).str.strip()

    contrib = work[work["flag"] == "C"].copy()
    if len(contrib) > 0:
        bad_units = contrib[~contrib["unit"].apply(_is_money_unit)]
        if len(bad_units) > 0:
            raise ValueError(
                "Contribution (C) needs to have currency (e.g. € / EUR). "
                f"Lines:\n{bad_units[['date','debitor','group','position','val','unit']]}"
            )

    usage = work[work["flag"] == "U"].copy()

    contrib_by_gp = contrib.groupby(["group", "debitor"])["val"].sum().rename("contributed").reset_index()
    contrib_tot = contrib.groupby("group")["val"].sum().rename("total_contrib").reset_index()
    usage_by_gp = usage.groupby(["group", "debitor"])["val"].sum().rename("usage").reset_index()
    usage_tot = usage.groupby("group")["val"].sum().rename("total_usage").reset_index()

    usage_unit = usage.groupby("group")["unit"].agg(lambda s: s.dropna().astype(str).unique().tolist()).reset_index()
    usage_unit = usage_unit.rename(columns={"unit": "usage_units"})

    participants = work.groupby("group")["debitor"].agg(lambda s: sorted(set(s.tolist()))).reset_index()
    participants = participants.rename(columns={"debitor": "participants"})

    summary = (
        participants.merge(contrib_tot, on="group", how="left")
        .merge(usage_tot, on="group", how="left")
        .merge(usage_unit, on="group", how="left")
    )
    summary["total_contrib"] = summary["total_contrib"].fillna(0.0)
    summary["total_usage"] = summary["total_usage"].fillna(0.0)
    summary["has_usage"] = summary["total_usage"] > 0
    # Provisional value; "mode" is overwritten from u_count at the end.
    # A Series.map is used instead of a row-wise DataFrame.apply so that an
    # empty summary still yields a Series that can be assigned to a column.
    summary["mode"] = summary["has_usage"].map(_MODE_BY_HAS_USAGE)

    # All (group, debitor) combinations, then restricted to the pairs that
    # actually occur. The cross join determines the row order of `detail`
    # (groups, then debitors, each in order of first appearance).
    detail = (
        pd.DataFrame({"group": work["group"].unique()})
        .assign(key=1)
        .merge(pd.DataFrame({"debitor": work["debitor"].unique()}).assign(key=1), on="key")
        .drop(columns=["key"])
    )
    gp_debitor = work[["group", "debitor"]].drop_duplicates()
    detail = detail.merge(gp_debitor, on=["group", "debitor"], how="inner")
    detail = detail.merge(contrib_by_gp, on=["group", "debitor"], how="left").merge(
        usage_by_gp, on=["group", "debitor"], how="left"
    )
    detail["contributed"] = detail["contributed"].fillna(0.0)
    detail["usage"] = detail["usage"].fillna(0.0)

    shares = []
    for _, row in summary.iterrows():
        g = row["group"]
        total_c = float(row["total_contrib"] or 0.0)
        parts = row["participants"] or []
        n = len(parts) if parts else 0

        g_detail = detail[detail["group"] == g].copy()
        g_has_any_u = (usage["group"] == g).any()

        if g_has_any_u:
            total_u = float(g_detail["usage"].sum())
            if total_u > 0:
                g_detail["share"] = g_detail["usage"] / total_u * total_c
                mode = "usage"
            else:
                # Fallback: split equally among the group's participants.
                g_detail["share"] = (total_c / n) if n else 0.0
                mode = "equal(fallback)"
        else:
            g_detail["share"] = (total_c / n) if n else 0.0
            mode = "equal"

        g_detail["mode"] = mode
        shares.append(g_detail[["group", "debitor", "share", "mode"]])

    shares_df = (
        pd.concat(shares, ignore_index=True)
        if shares
        else pd.DataFrame(columns=["group", "debitor", "share", "mode"])
    )
    detail = detail.merge(shares_df, on=["group", "debitor"], how="left")
    detail["share"] = detail["share"].fillna(0.0)
    detail["balance"] = detail["contributed"] - detail["share"]

    per_debitor = detail.groupby("debitor")[["contributed", "share", "balance"]].sum().reset_index()
    per_debitor = per_debitor.sort_values("debitor")

    # Number of usage rows per group; decides the final "mode" label.
    has_any_u = usage.groupby("group").size().rename("u_count").reset_index()
    summary = summary.merge(has_any_u, on="group", how="left")
    summary["u_count"] = summary["u_count"].fillna(0).astype(int)
    summary["mode"] = (summary["u_count"] > 0).map(_MODE_BY_HAS_USAGE)

    return summary, per_debitor, detail


@dataclass
class GroupTimeSeries:
    """Per-participant cumulative time series of one group, ready to plot."""

    group: str
    times: pd.DatetimeIndex
    participants: List[str]
    usage_units: List[str]
    xlim_start: pd.Timestamp
    xlim_end: pd.Timestamp
    contrib_cum: Dict[str, pd.Series]
    usage_cum: Dict[str, pd.Series]
    share_cum: Dict[str, pd.Series]
    ratio: Dict[str, pd.Series]


def _auto_time_limits(tmin: pd.Timestamp, tmax: pd.Timestamp) -> tuple[pd.Timestamp, pd.Timestamp]:
    """Return ``(tmin - margin, tmax + margin)``.

    The margin is 5 % of the span, or 30 minutes if the span is not positive.
    """
    dt = tmax - tmin
    if dt <= pd.Timedelta(0):
        margin = pd.Timedelta(minutes=30)
    else:
        margin = dt * 0.05
    return (tmin - margin, tmax + margin)


def _prepare_group_timeseries(df: pd.DataFrame, group: str) -> Optional[GroupTimeSeries]:
    """Build the cumulative series of *group*, or None if it has no dated rows.

    For every participant the contributions ("C") and usages ("U") are summed
    per date and accumulated over the group's time axis (all dates plus the
    padded axis limits). The cumulative share at each time is the cumulative
    total contribution split by cumulative usage where the total usage is
    positive and equally otherwise; ``ratio`` is share / own contribution
    (0 where the own contribution is not positive).
    """
    work = df.copy().explode("dist_groups")
    work["group"] = work["dist_groups"].fillna("").astype(str).str.strip()
    work = work[work["group"] == group].copy()
    # Copy the filtered slice so the column assignments below act on an
    # independent frame.
    work = work[pd.notna(work["date"])].copy()
    if work.empty:
        return None

    work["debitor"] = work["debitor"].astype(str).str.strip()
    work["flag"] = work["group_flag"].astype(str).str.strip().str.upper()

    participants = sorted(work["debitor"].unique().tolist())

    times = pd.DatetimeIndex(sorted(work["date"].unique()))
    tmin, tmax = times.min(), times.max()
    x0, x1 = _auto_time_limits(tmin, tmax)
    # Include the axis limits so the step curves span the whole x range.
    times = times.union(pd.DatetimeIndex([x0, x1])).sort_values()

    usage_units = sorted(
        work.loc[work["flag"] == "U", "unit"]
        .dropna()
        .astype(str)
        .str.strip()
        .unique()
        .tolist()
    )

    contrib_cum: Dict[str, pd.Series] = {}
    usage_cum: Dict[str, pd.Series] = {}

    for p in participants:
        c = work[(work["debitor"] == p) & (work["flag"] == "C")].copy()
        u = work[(work["debitor"] == p) & (work["flag"] == "U")].copy()

        c_by_t = c.groupby("date")["val"].sum() if not c.empty else pd.Series(dtype=float)
        c_by_t = c_by_t.reindex(times, fill_value=0.0)
        contrib_cum[p] = c_by_t.cumsum()

        u_by_t = u.groupby("date")["val"].sum() if not u.empty else pd.Series(dtype=float)
        u_by_t = u_by_t.reindex(times, fill_value=0.0)
        usage_cum[p] = u_by_t.cumsum()

    total_contrib = sum((contrib_cum[p] for p in participants), start=pd.Series(0.0, index=times))
    total_usage = sum((usage_cum[p] for p in participants), start=pd.Series(0.0, index=times))

    has_any_u = (work["flag"] == "U").any()
    n = len(participants) if participants else 1

    share_cum: Dict[str, pd.Series] = {}
    if has_any_u:
        for p in participants:
            usage_p = usage_cum[p]
            with np.errstate(divide="ignore", invalid="ignore"):
                share_usage = total_contrib * (usage_p / total_usage.replace(0.0, np.nan))
            share_equal = total_contrib / float(n)
            # Before any usage has accumulated, fall back to the equal split.
            share = share_usage.where(total_usage > 0, share_equal)
            share_cum[p] = share.fillna(0.0)
    else:
        equal = total_contrib / float(n)
        for p in participants:
            # Each participant gets an own copy so the dict entries do not
            # alias one shared Series object.
            share_cum[p] = equal.copy()

    ratio: Dict[str, pd.Series] = {}
    for p in participants:
        denom = contrib_cum[p].astype(float)
        r = share_cum[p].astype(float) / denom.where(denom > 0, np.nan)
        ratio[p] = r.fillna(0.0)

    return GroupTimeSeries(
        group=group,
        times=times,
        participants=participants,
        usage_units=usage_units,
        xlim_start=x0,
        xlim_end=x1,
        contrib_cum=contrib_cum,
        usage_cum=usage_cum,
        share_cum=share_cum,
        ratio=ratio,
    )


@dataclass
class GroupChartBigFrame(BigFrame):
    """Step chart of one ``GroupTimeSeries`` quantity per participant.

    ``kind`` selects the plotted series: ``"usage_cum"``, ``"contrib_cum"``,
    ``"share_cum"`` or ``"ratio"`` (drawn on a logarithmic y axis).
    """

    gts: GroupTimeSeries
    kind: str

    def render(self, ax: Axes, mono_font: FontProperties) -> None:
        """Draw the chart selected by ``self.kind`` onto *ax*.

        Raises:
            ValueError: if ``self.kind`` is not one of the supported kinds.
        """
        ax.axis("on")

        locator = mdates.AutoDateLocator(minticks=3, maxticks=7)
        formatter = mdates.ConciseDateFormatter(locator)
        ax.xaxis.set_major_locator(locator)
        ax.xaxis.set_major_formatter(formatter)
        ax.xaxis.get_offset_text().set_visible(False)
        ax.set_xlim(self.gts.xlim_start, self.gts.xlim_end)

        if self.kind == "usage_cum":
            series_map = self.gts.usage_cum
            unit = "/".join(self.gts.usage_units) if self.gts.usage_units else ""
            # Omit the brackets entirely when no usage unit is known.
            label = f"Usage cumulative [{unit}]" if unit else "Usage cumulative"
            ax.set_ylabel(label, fontproperties=mono_font)
        elif self.kind == "contrib_cum":
            series_map = self.gts.contrib_cum
            ax.set_ylabel("Contribution cumulative [€]", fontproperties=mono_font)
        elif self.kind == "share_cum":
            series_map = self.gts.share_cum
            ax.set_ylabel("Share cumulative [€]", fontproperties=mono_font)
        elif self.kind == "ratio":
            series_map = self.gts.ratio
            ax.set_ylabel("Share / Contribution ratio (logarithmic)", fontproperties=mono_font)
            ax.set_yscale("log")
        else:
            raise ValueError(f"Unknown kind: {self.kind}")

        all_vals = []
        # Lower bound for ratio values; a log axis cannot show zero.
        min_ratio = 1e-3

        for p in self.gts.participants:
            y = series_map[p].copy()

            if self.kind == "ratio":
                y = y.replace([np.inf, -np.inf], np.nan)
                y = y.fillna(min_ratio)
                y = y.clip(lower=min_ratio)
            else:
                y = y.replace([np.inf, -np.inf], np.nan).fillna(0.0)

            line, = ax.plot(self.gts.times, y.values, label=p, linewidth=1, drawstyle="steps-post")

            # A log plot cannot fill down to 0.
            base = (min_ratio if self.kind == "ratio" else 0.0)
            ax.fill_between(
                self.gts.times,
                y.values,
                base,
                step="post",
                alpha=0.18,
                color=line.get_color(),
                zorder=line.get_zorder() - 1,
            )

            v = y.values
            v = v[np.isfinite(v)]
            if v.size:
                all_vals.append(v)

        if all_vals:
            vv = np.concatenate(all_vals)
            if self.kind in ("usage_cum", "contrib_cum", "share_cum"):
                vmax = float(np.nanmax(vv)) if vv.size else 0.0
                if vmax <= 0:
                    ax.set_ylim(0, 1)
                else:
                    ax.set_ylim(0, vmax * 1.08)
            elif self.kind == "ratio":
                vpos = vv[vv > 0]
                if vpos.size:
                    vmin = float(np.nanmin(vpos))
                    vmax = float(np.nanmax(vpos))
                    ax.set_ylim(vmin / 1.5, vmax * 1.5)

        ax.grid(True, alpha=0.2)

        leg = ax.legend(prop=mono_font, fontsize=7, loc="best", ncols=2)
        if leg:
            for t in leg.get_texts():
                t.set_fontproperties(mono_font)

        for tick in ax.get_xticklabels() + ax.get_yticklabels():
            tick.set_fontproperties(mono_font)


@dataclass
class TextFrame(Frame):
    """Frame that renders a block of text in the top-left corner."""

    text: str

    def render(self, ax: Axes, mono_font: FontProperties) -> None:
        """Write ``self.text`` onto *ax* using *mono_font*."""
        ax.text(0, 1, self.text, va="top", ha="left", fontproperties=mono_font)


@dataclass
class PlotBigFrame(BigFrame):
    """Bar chart of ``contributed`` versus ``share`` per debitor."""

    per_debitor: pd.DataFrame

    def render(self, ax: Axes, mono_font: FontProperties) -> None:
        """Draw the grouped bar chart onto *ax* and apply *mono_font*."""
        ax.axis("on")
        plot_df = self.per_debitor.set_index("debitor")[["contributed", "share"]]
        plot_df.plot.bar(ax=ax)
        ax.tick_params(axis="x", rotation=0)

        leg = ax.legend(prop=mono_font)
        if leg:
            for t in leg.get_texts():
                t.set_fontproperties(mono_font)

        for tick in ax.get_xticklabels() + ax.get_yticklabels():
            tick.set_fontproperties(mono_font)

        ax.xaxis.label.set_fontproperties(mono_font)
        ax.yaxis.label.set_fontproperties(mono_font)


def _fmt_groups_table(group_summary: pd.DataFrame) -> List[str]:
    """Format the group summary as fixed-width text table lines."""
    gs = group_summary.sort_values("group").copy()

    g_list = gs["group"].astype(str).tolist()
    g_w = max([len(g) for g in g_list] + [5])

    header = f"{'group':<{g_w}} | {'total [€]':>12} | {'mode':<12} | {'participants':>12}"
    sep = "-" * len(header)

    lines = [header, sep]
    for _, r in gs.iterrows():
        g = str(r["group"])
        total_c = float(r.get("total_contrib", 0.0) or 0.0)
        u_count = int(r.get("u_count", 0) or 0)
        mode = "usage" if u_count > 0 else "equal"
        participants = r.get("participants", []) or []
        lines.append(f"{g:<{g_w}} | {total_c:>12.2f} | {mode:<12} | {len(participants):>12d}")
    return lines


def _fmt_debitors_table(per_debitor: pd.DataFrame) -> List[str]:
    """Format contribution, share and balance per debitor as table lines."""
    pdv = per_debitor.sort_values("debitor").copy()

    name_list = pdv["debitor"].astype(str).tolist()
    name_w = max([len(n) for n in name_list] + [7])

    header = f"{'debitor':<{name_w}} | {'contrib [€]':>12} | {'share [€]':>12} | {'balance [€]':>12}"
    sep = "-" * len(header)

    lines = [header, sep]
    for _, r in pdv.iterrows():
        deb = str(r["debitor"])
        lines.append(
            f"{deb:<{name_w}} | {float(r['contributed']):>12.2f} | {float(r['share']):>12.2f} | {float(r['balance']):>12.2f}"
        )
    return lines


def _fmt_compensation_table(payments: List[Tuple[str, str, float]]) -> List[str]:
    """Format ``(payer, receiver, amount)`` tuples as table lines.

    Returns a single placeholder line if *payments* is empty.
    """
    if not payments:
        return ["(No compensation required)"]

    payer_w = max([len(p) for p, _, _ in payments] + [5])
    recv_w = max([len(r) for _, r, _ in payments] + [8])

    header = f"{'payer':<{payer_w}} | {'receiver':<{recv_w}} | {'amount [€]':>12}"
    sep = "-" * len(header)

    lines = [header, sep]
    for p, r, a in payments:
        lines.append(f"{p:<{payer_w}} | {r:<{recv_w}} | {float(a):>12.2f}")
    return lines


def _fmt_group_detail_block(group: str, group_summary: pd.DataFrame, detail: pd.DataFrame) -> List[str]:
    """Format the header lines and per-debitor table of one group.

    If *group* is missing from *group_summary*, zero totals and mode
    ``"equal"`` are shown.
    """
    gdet = detail[detail["group"] == group].sort_values("debitor")

    if (group_summary["group"] == group).any():
        gs = group_summary[group_summary["group"] == group].iloc[0]
        total_c = float(gs.get("total_contrib", 0.0) or 0.0)
        total_u = float(gs.get("total_usage", 0.0) or 0.0)
        u_count = int(gs.get("u_count", 0) or 0)
        mode = "usage" if u_count > 0 else "equal"
        participants = gs.get("participants", []) or []
        usage_units = gs.get("usage_units", []) or []
        # The value is NaN when the left merge found no usage rows.
        if isinstance(usage_units, float) and pd.isna(usage_units):
            usage_units = []
        units_str = "/".join(usage_units) if isinstance(usage_units, list) else str(usage_units)
    else:
        total_c = 0.0
        total_u = 0.0
        mode = "equal"
        participants = []
        units_str = ""

    # Header
    lines: List[str] = []
    lines.append(f"Group: {group}")
    lines.append(f"Total contribution: {total_c:.2f} €")
    lines.append(f"Total usage: {total_u:.4f} {units_str}".rstrip())
    lines.append(f"Mode: {mode}")
    lines.append(f"Participants: {len(participants)}")
    lines.append("")

    # Table
    name_w = max([len(str(x)) for x in gdet["debitor"].astype(str).tolist()] + [7])
    header = f"{'debitor':<{name_w}} | {'contrib [€]':>10} | {'usage':>12} | {'share [€]':>10} | {'balance [€]':>10}"
    sep = "-" * len(header)
    lines.append(header)
    lines.append(sep)

    for _, r in gdet.iterrows():
        deb = str(r["debitor"])
        lines.append(
            f"{deb:<{name_w}} | {r['contributed']:>10.2f} | {r['usage']:>12.4f} | {r['share']:>10.2f} | {r['balance']:>10.2f}"
        )

    return lines


class GeneralModule:
    """Module that reports group distributions and compensation payments."""

    name = "general"

    def process(self, df: pd.DataFrame, context: Dict[str, Any]) -> ModuleResult:
        """Compute the distribution for *df* and assemble the module result.

        Reads ``want_pdf`` (default True) and ``mono_font`` from *context*.
        The text summary is always built; frames, charts and pages are only
        produced when ``want_pdf`` is truthy.
        """
        want_pdf = bool(context.get("want_pdf", True))
        mono_font = context.get("mono_font") or FontProperties(family="DejaVu Sans Mono", size=8)

        group_summary, per_debitor, detail = compute_group_distribution(df)

        balance = {r["debitor"]: float(r["balance"]) for _, r in per_debitor.iterrows()}
        payments = self._minimize_payments(balance)

        summary_lines = []
        summary_lines.append("# GeneralModule")
        summary_lines.append("")
        summary_lines.append("## Groups")
        summary_lines.append("")
        summary_lines.extend(_fmt_groups_table(group_summary))
        summary_lines.append("")

        summary_lines.append("## Group details")
        summary_lines.append("")
        for g in sorted(detail["group"].unique().tolist()):
            # Optional: additional Markdown heading per group.
            summary_lines.append(f"### {g}")
            summary_lines.append("")
            summary_lines.extend(_fmt_group_detail_block(g, group_summary, detail))
            summary_lines.append("")
        summary_lines.append("")

        summary_lines.append("## Debitors (total)")
        summary_lines.append("")
        summary_lines.extend(_fmt_debitors_table(per_debitor))
        summary_lines.append("")

        summary_lines.append("## Compensation (minimized)")
        summary_lines.append("")
        summary_lines.extend(_fmt_compensation_table(payments))

        summary_text = "\n".join(summary_lines)

        frames: List[Frame] = []
        bigframes: List[BigFrame] = []
        pages: List[plt.Figure] = []

        if want_pdf:
            frames.extend(self._make_frames(group_summary, per_debitor, payments))
            bigframes.append(
                PlotBigFrame(
                    title="General – Shares vs. Contributions (total)",
                    per_debitor=per_debitor.copy(),
                )
            )

            for g in sorted(group_summary["group"].unique().tolist()):
                gts = _prepare_group_timeseries(df, g)
                # None means the group has no dated rows to plot.
                if gts is None:
                    continue

                bigframes.append(GroupChartBigFrame(
                    title=f"{g} – Cumulative usage per debitor",
                    gts=gts,
                    kind="usage_cum",
                ))
                bigframes.append(GroupChartBigFrame(
                    title=f"{g} – Cumulative contributions per debitor",
                    gts=gts,
                    kind="contrib_cum",
                ))
                bigframes.append(GroupChartBigFrame(
                    title=f"{g} – Share per debitor",
                    gts=gts,
                    kind="share_cum",
                ))
                bigframes.append(GroupChartBigFrame(
                    title=f"{g} – Share / Contribution ratio (logarithmic)",
                    gts=gts,
                    kind="ratio",
                ))

            pages.extend(self._make_pages(group_summary, per_debitor, detail, mono_font))

        return ModuleResult(summary_text=summary_text, frames=frames, bigframes=bigframes, pages=pages)

    def _minimize_payments(self, balance: Dict[str, float]) -> List[Tuple[str, str, float]]:
        """Greedily match debtors to creditors.

        Balances are rounded to two decimals; positive balances receive,
        negative balances pay. Payers and receivers are matched in dict
        order, each payment being the smaller of the two open amounts.

        Returns:
            A list of ``(payer, receiver, amount)`` tuples.
        """
        receivers = []
        payers = []
        for p, amt in balance.items():
            a = round(float(amt), 2)
            if a > 0:
                receivers.append([p, a])
            elif a < 0:
                payers.append([p, -a])

        out = []
        i = j = 0
        while i < len(payers) and j < len(receivers):
            payer, avail = payers[i]
            recv, need = receivers[j]
            # Re-round after every step so binary float subtraction noise
            # (e.g. 0.3 - 0.1 -> 0.19999999999999998) never reaches the
            # emitted amounts or the open remainders.
            pay = round(min(avail, need), 2)
            out.append((payer, recv, pay))
            payers[i][1] = round(avail - pay, 2)
            receivers[j][1] = round(need - pay, 2)
            if payers[i][1] == 0:
                i += 1
            if receivers[j][1] == 0:
                j += 1
        return out

    def _make_frames(
        self,
        group_summary: pd.DataFrame,
        per_debitor: pd.DataFrame,
        payments: List[Tuple[str, str, float]],
    ) -> List[Frame]:
        """Build the three text frames: groups, debitors and compensation."""
        lines = ["Groups:", ""]
        lines.extend(_fmt_groups_table(group_summary))
        f1 = TextFrame(title="General: Groups", text="\n".join(lines))

        lines = ["Debitors (total):", ""]
        lines.extend(_fmt_debitors_table(per_debitor))
        f2 = TextFrame(title="General: Debitors", text="\n".join(lines))

        lines = ["Compensation (minimized):", ""]
        lines.extend(_fmt_compensation_table(payments))
        f3 = TextFrame(title="General: Compensation", text="\n".join(lines))

        return [f1, f2, f3]

    def _make_pages(self, group_summary, per_debitor, detail, mono_font) -> List[plt.Figure]:
        """Create one text-only figure per group with its detail block.

        ``per_debitor`` is accepted for interface compatibility but not used.
        The figures are returned open; this method does not close them.
        """
        pages: List[plt.Figure] = []

        for g in sorted(detail["group"].unique().tolist()):
            lines = _fmt_group_detail_block(g, group_summary, detail)

            # 8.27 x 11.69 inches is A4 portrait.
            fig, ax = plt.subplots(figsize=(8.27, 11.69))
            ax.axis("off")
            ax.text(0, 1, "\n".join(lines), va="top", ha="left", fontproperties=mono_font)
            pages.append(fig)

        return pages