Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions python/benchmarks/bench_eval_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -1041,6 +1041,51 @@ class GroupedAggPandasUDFPeakmemBench(_GroupedAggPandasBenchMixin, _PeakmemBench
pass


# -- SQL_GROUPED_AGG_PANDAS_ITER_UDF -------------------------------------------
# UDF receives an iterator of ``pd.Series`` columns (or tuples of them) per
# group, returns scalar.


class _GroupedAggPandasIterBenchMixin(_GroupedAggPandasBenchMixin):
"""Provides _write_scenario for SQL_GROUPED_AGG_PANDAS_ITER_UDF.

Inherits ``_build_scenario`` and ``_write_scenario`` from the Pandas
sibling; only the eval type and the UDFs differ. The UDF consumes the
per-group batches lazily through an iterator instead of receiving a single
concatenated column.
"""

def _grouped_agg_pandas_iter_sum(series_iter):
"""Sum across batches via iterator."""
total = 0
for col in series_iter:
total += col.sum() or 0
return total

def _grouped_agg_pandas_iter_mean_multi(series_iter):
"""Mean across batches of tuples via iterator."""
total = 0.0
for col0, col1 in series_iter:
total += (col0.mean() or 0) + (col1.mean() or 0)
return total

_eval_type = PythonEvalType.SQL_GROUPED_AGG_PANDAS_ITER_UDF
_udfs = {
"sum_udf": _grouped_agg_pandas_iter_sum,
"mean_multi_udf": _grouped_agg_pandas_iter_mean_multi,
}
params = [list(_GroupedAggArrowBenchMixin._scenario_configs), list(_udfs)]
param_names = ["scenario", "udf"]


class GroupedAggPandasIterUDFTimeBench(_GroupedAggPandasIterBenchMixin, _TimeBenchBase):
pass


class GroupedAggPandasIterUDFPeakmemBench(_GroupedAggPandasIterBenchMixin, _PeakmemBenchBase):
pass


# -- SQL_GROUPED_MAP_ARROW_UDF ------------------------------------------------
# UDF receives ``pa.Table``, returns ``pa.Table``.

Expand Down