Aggregations Reference
Complete reference for windowed aggregation operators.
Aggregation operators are used inside .aggregate() calls within a @pipeline method. Each operator computes a rolling windowed statistic over the events in an input dataset.
Usage
.aggregate(
    output_field_name=OperatorClass(of="input_field", window="7d"),
)
The keyword argument name becomes a field in the output dataset. Multiple aggregations can be combined in a single .aggregate() call.
Common parameters
All aggregation operators share these parameters:
| Parameter | Type | Description |
|---|---|---|
| of | str | The input dataset field to aggregate over |
| window | str | Time window size: integer + unit (d, h, m) |
Window format examples: "7d" (7 days), "24h" (24 hours), "30m" (30 minutes).
All windows use event time, not processing time.
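To make the window format concrete, here is a minimal sketch of how a string such as "7d" could be turned into a duration. The helper name `parse_window` and the unit mapping are assumptions made for illustration; this is not how the library necessarily parses windows internally.

```python
import re
from datetime import timedelta

# Unit suffixes from the parameter table; mapping them to timedelta
# keyword arguments is an assumption of this sketch.
_UNITS = {"d": "days", "h": "hours", "m": "minutes"}


def parse_window(window: str) -> timedelta:
    """Convert a window string such as "7d" into a timedelta (hypothetical helper)."""
    match = re.fullmatch(r"(\d+)([dhm])", window)
    if match is None:
        raise ValueError(f"invalid window: {window!r}")
    amount, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(amount)})


seven_days = parse_window("7d")
half_hour = parse_window("30m")
```

Anything that is not an integer followed by one of the three units raises `ValueError` in this sketch.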
Operators
Avg
Rolling mean of a numeric field.
from thyme.pipeline import Avg
avg_spend_7d=Avg(of="amount", window="7d")
- of: numeric field (float or int)
- Returns: float
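The semantics of a rolling mean over event time can be sketched in plain Python. The function `rolling_avg`, the half-open window `(as_of - window, as_of]`, and the `None` result for an empty window are all assumptions of this sketch, not documented library behavior.

```python
from datetime import datetime, timedelta


def rolling_avg(events, as_of, window):
    """Mean of values whose event time lies in (as_of - window, as_of]."""
    values = [value for ts, value in events if as_of - window < ts <= as_of]
    # True division always yields a float, even for int inputs.
    return sum(values) / len(values) if values else None


events = [
    (datetime(2024, 1, 1), 10),
    (datetime(2024, 1, 5), 20),
    (datetime(2024, 1, 9), 60),
]
# Only the Jan 5 and Jan 9 events fall inside the 7-day window.
avg = rolling_avg(events, as_of=datetime(2024, 1, 9), window=timedelta(days=7))
```

Here `avg` is `40.0`: the inputs are integers, but the result is a float, matching the return type listed above.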
Count
Rolling count of events within the window.
from thyme.pipeline import Count
txn_count_30d=Count(of="user_id", window="30d")
- of: any field (used to identify which events to count)
- Returns: int
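As a rough illustration, a rolling count depends only on event timestamps, so it can be computed with two binary searches over a sorted list. The helper `rolling_count` and the half-open window `(as_of - window, as_of]` are assumptions of this sketch rather than the library's implementation.

```python
from bisect import bisect_right
from datetime import datetime, timedelta


def rolling_count(sorted_ts, as_of, window):
    """Count timestamps in (as_of - window, as_of]; sorted_ts must be ascending."""
    lo = bisect_right(sorted_ts, as_of - window)  # number of events at or before the lower bound
    hi = bisect_right(sorted_ts, as_of)           # number of events at or before as_of
    return hi - lo


timestamps = [
    datetime(2024, 1, 1),
    datetime(2024, 1, 10),
    datetime(2024, 1, 20),
    datetime(2024, 2, 5),
]
# The 30-day window ending Feb 5 starts just after Jan 6, so Jan 1 is excluded.
count = rolling_count(timestamps, datetime(2024, 2, 5), timedelta(days=30))
```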
Sum
Rolling sum of a numeric field.
from thyme.pipeline import Sum
total_spend_7d=Sum(of="amount", window="7d")
- of: numeric field (float or int)
- Returns: same type as the input field
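A rolling sum can be maintained incrementally by adding each new value and subtracting values as they leave the window. The class `RollingSum` below is a hypothetical sketch of that technique; it assumes events arrive in event-time order and a half-open window `(ts - window, ts]`, neither of which is stated by the reference.

```python
from collections import deque
from datetime import datetime, timedelta


class RollingSum:
    """Incrementally maintained sum over (ts - window, ts]; events must arrive in time order."""

    def __init__(self, window: timedelta):
        self.window = window
        self.events = deque()  # (timestamp, value) pairs still inside the window
        self.total = 0         # starts as int, so int inputs keep an int total

    def add(self, ts: datetime, value):
        self.events.append((ts, value))
        self.total += value
        # Evict events that have fallen out of the window.
        while self.events and self.events[0][0] <= ts - self.window:
            _, expired = self.events.popleft()
            self.total -= expired
        return self.total


rolling = RollingSum(timedelta(days=7))
totals = [
    rolling.add(datetime(2024, 1, 1), 10),
    rolling.add(datetime(2024, 1, 5), 20),
    rolling.add(datetime(2024, 1, 9), 5),  # the Jan 1 event expires here
]
```

With integer inputs the running total stays an `int`; with float inputs it becomes a `float`, mirroring the "same type as the input field" rule. Repeated float subtraction can accumulate rounding error, which a production implementation would need to consider.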
Min
Rolling minimum of a numeric field.
from thyme.pipeline import Min
min_amount_30d=Min(of="amount", window="30d")
- of: numeric field (float or int)
- Returns: same type as the input field
Max
Rolling maximum of a numeric field.
from thyme.pipeline import Max
max_amount_7d=Max(of="amount", window="7d")
- of: numeric field (float or int)
- Returns: same type as the input field
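Rolling minimum and maximum cannot be updated by simple subtraction the way a sum can. One common technique is a monotonic deque, sketched below. The class `RollingExtreme` is hypothetical, assumes events arrive in event-time order and a half-open window `(ts - window, ts]`, and is not presented as the library's own algorithm.

```python
from collections import deque
from datetime import datetime, timedelta


class RollingExtreme:
    """Rolling max (or min) over (ts - window, ts] using a monotonic deque."""

    def __init__(self, window: timedelta, keep_max: bool = True):
        if window <= timedelta(0):
            raise ValueError("window must be positive")
        self.window = window
        self.keep_max = keep_max
        self.candidates = deque()  # (timestamp, value); front holds the current answer

    def _dominates(self, new, old):
        return new >= old if self.keep_max else new <= old

    def add(self, ts: datetime, value):
        # Older candidates beaten by the new value can never be the answer again.
        while self.candidates and self._dominates(value, self.candidates[-1][1]):
            self.candidates.pop()
        self.candidates.append((ts, value))
        # Drop candidates that have left the window.
        while self.candidates[0][0] <= ts - self.window:
            self.candidates.popleft()
        return self.candidates[0][1]


events = [
    (datetime(2024, 1, 1), 50),
    (datetime(2024, 1, 3), 30),
    (datetime(2024, 1, 9), 40),
    (datetime(2024, 1, 11), 10),
]
rolling_max = RollingExtreme(timedelta(days=7), keep_max=True)
rolling_min = RollingExtreme(timedelta(days=7), keep_max=False)
max_results = [rolling_max.add(ts, v) for ts, v in events]  # [50, 50, 40, 40]
min_results = [rolling_min.add(ts, v) for ts, v in events]  # [50, 30, 30, 10]
```

Values are returned unchanged, so the result has the same type as the input field.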
Complete example
from datetime import datetime
from thyme.pipeline import Avg, Count, Max, Min, Sum
# dataset, field, pipeline, inputs, and the Transaction dataset are
# assumed to be imported or defined elsewhere.
@dataset(index=True)
class UserStats:
    user_id: str = field(key=True)
    ts: datetime = field(timestamp=True)
    avg_amount_7d: float
    total_spend_30d: float
    txn_count_30d: int
    max_amount_7d: float
    min_amount_7d: float
    @pipeline(version=1)
    @inputs(Transaction)
    def compute(cls, t: Transaction):
        # Group by user, then compute every rolling statistic in one call.
        return (
            t.groupby("user_id")
            .aggregate(
                avg_amount_7d=Avg(of="amount", window="7d"),
                total_spend_30d=Sum(of="amount", window="30d"),
                txn_count_30d=Count(of="user_id", window="30d"),
                max_amount_7d=Max(of="amount", window="7d"),
                min_amount_7d=Min(of="amount", window="7d"),
            )
        )
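To show what values this pipeline would be expected to produce, the following plain-Python sketch emulates the same five aggregations on a small in-memory list. It does not use the library at all. The function `user_stats`, the dictionary-shaped transactions with a `ts` key, the half-open window `(as_of - window, as_of]`, and `None` for empty windows are assumptions made for illustration.

```python
from collections import defaultdict
from datetime import datetime, timedelta


def user_stats(transactions, as_of):
    """Emulate the UserStats aggregations per user as of a point in event time."""
    by_user = defaultdict(list)
    for txn in transactions:
        by_user[txn["user_id"]].append(txn)

    def amounts_in_window(txns, days):
        start = as_of - timedelta(days=days)
        return [t["amount"] for t in txns if start < t["ts"] <= as_of]

    stats = {}
    for user_id, txns in by_user.items():
        last_7d = amounts_in_window(txns, 7)
        last_30d = amounts_in_window(txns, 30)
        stats[user_id] = {
            "avg_amount_7d": sum(last_7d) / len(last_7d) if last_7d else None,
            "total_spend_30d": sum(last_30d),
            "txn_count_30d": len(last_30d),
            "max_amount_7d": max(last_7d, default=None),
            "min_amount_7d": min(last_7d, default=None),
        }
    return stats


transactions = [
    {"user_id": "u1", "amount": 10.0, "ts": datetime(2024, 1, 2)},
    {"user_id": "u1", "amount": 30.0, "ts": datetime(2024, 1, 25)},
    {"user_id": "u1", "amount": 50.0, "ts": datetime(2024, 1, 28)},
    {"user_id": "u2", "amount": 7.5, "ts": datetime(2024, 1, 27)},
]
stats = user_stats(transactions, as_of=datetime(2024, 1, 30))
```

For user `u1`, the 7-day statistics cover only the Jan 25 and Jan 28 transactions, while the 30-day sum and count also include the Jan 2 transaction.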