
Aggregations Reference

Complete reference for windowed aggregation operators.

Aggregation operators are passed to .aggregate() calls inside a @pipeline method. Each operator computes a statistic (such as a mean or a count) over the input dataset's events that fall within a rolling time window.

Usage

.aggregate(
    output_field_name=OperatorClass(of="input_field", window="7d"),
)

The keyword argument name becomes a field in the output dataset. Multiple aggregations can be combined in a single .aggregate() call.


Common parameters

All aggregation operators share these parameters:

Parameter   Type   Description
of          str    The input dataset field to aggregate over
window      str    Time window size: an integer followed by a unit (d, h, or m)

Window format examples: "7d" (7 days), "24h" (24 hours), "30m" (30 minutes).

All windows use event time, not processing time: an event is assigned to a window by its own timestamp, not by when it arrives for processing.
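The difference can be illustrated with a small selection function that looks only at each event's timestamp and ignores arrival order. This is a sketch, not Thyme's implementation: events_in_window is a hypothetical name, and the half-open interval (as_of - window, as_of] is an assumed boundary convention that this reference does not specify.

```python
from datetime import datetime, timedelta

def events_in_window(events, as_of, window):
    """Return events whose event timestamp falls in (as_of - window, as_of].

    `events` holds (ts, value) pairs in arrival order; membership depends
    only on each event's own timestamp, never on its position in the list.
    """
    start = as_of - window
    return [(ts, v) for ts, v in events if start < ts <= as_of]

# Listed in arrival order: the last event to arrive carries an old timestamp.
arrivals = [
    (datetime(2024, 1, 5), 10.0),
    (datetime(2024, 1, 8), 20.0),
    (datetime(2023, 12, 20), 99.0),  # arrives last, but happened weeks earlier
]

# Keeps the 5 Jan and 8 Jan events; the 20 Dec event is outside the 7-day window.
print(events_in_window(arrivals, datetime(2024, 1, 8), timedelta(days=7)))
```

Under processing time, the 20 Dec event would count as one of the most recent arrivals; under event time it is excluded.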


Operators

Avg

Rolling mean of a numeric field.

from thyme.pipeline import Avg

avg_spend_7d=Avg(of="amount", window="7d")
  • of: numeric field (float or int)
  • Returns: float
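One common way to maintain a rolling mean incrementally is to keep a running sum and the events currently in the window, evicting expired events as new ones arrive. The sketch below assumes events are added in timestamp order and that the window is (ts - window, ts]; the RollingAvg class is hypothetical and is not how Thyme is documented to work internally.

```python
from collections import deque
from datetime import datetime, timedelta

class RollingAvg:
    """Hypothetical incremental rolling mean over an event-time window.

    Simplification: events must be added in non-decreasing timestamp order.
    """

    def __init__(self, window: timedelta):
        if window <= timedelta(0):
            raise ValueError("window must be positive")
        self.window = window
        self.events = deque()  # (ts, value) pairs currently inside the window
        self.total = 0.0       # running sum; starting at 0.0 keeps results float

    def add(self, ts: datetime, value: float) -> float:
        self.events.append((ts, value))
        self.total += value
        # Evict events that are no longer in (ts - window, ts].
        while self.events[0][0] <= ts - self.window:
            _, old = self.events.popleft()
            self.total -= old
        return self.total / len(self.events)

avg = RollingAvg(timedelta(days=7))
print(avg.add(datetime(2024, 1, 1), 10))  # 10.0
print(avg.add(datetime(2024, 1, 3), 20))  # 15.0
print(avg.add(datetime(2024, 1, 9), 30))  # 25.0 (the 1 Jan event has expired)
```

Even with integer inputs the result is a float, matching the documented return type.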

Count

Rolling count of events within the window.

from thyme.pipeline import Count

txn_count_30d=Count(of="user_id", window="30d")
  • of: any field (used to identify which events to count)
  • Returns: int
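Because aggregations are typically computed per group key (see the groupby in the complete example below), a rolling count amounts to one window of timestamps per key. The RollingCount class below is a hypothetical sketch that assumes each key's events arrive in timestamp order and that the window is (ts - window, ts].

```python
from collections import defaultdict, deque
from datetime import datetime, timedelta

class RollingCount:
    """Hypothetical per-key rolling event count over an event-time window.

    Simplification: events for each key arrive in non-decreasing timestamp order.
    """

    def __init__(self, window: timedelta):
        self.window = window
        self.timestamps = defaultdict(deque)  # key -> timestamps in the window

    def add(self, key: str, ts: datetime) -> int:
        q = self.timestamps[key]
        q.append(ts)
        # Drop timestamps that are no longer in (ts - window, ts].
        while q and q[0] <= ts - self.window:
            q.popleft()
        return len(q)

counts = RollingCount(timedelta(days=30))
print(counts.add("u1", datetime(2024, 1, 1)))   # 1
print(counts.add("u2", datetime(2024, 1, 2)))   # 1 (separate key)
print(counts.add("u1", datetime(2024, 1, 20)))  # 2
print(counts.add("u1", datetime(2024, 2, 5)))   # 2 (the 1 Jan event has expired)
```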

Sum

Rolling sum of a numeric field.

from thyme.pipeline import Sum

total_spend_7d=Sum(of="amount", window="7d")
  • of: numeric field (float or int)
  • Returns: same type as the input field
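The "same type as the input field" rule can be mirrored in plain Python, where summing integers yields an int and summing floats yields a float. In this sketch, rolling_sum is a hypothetical name and the (as_of - window, as_of] boundary is an assumption. How Thyme types an empty window is not covered here; Python's sum returns the int 0 in that case.

```python
from datetime import datetime, timedelta

def rolling_sum(events, as_of, window):
    """Sum the values of events with timestamps in (as_of - window, as_of]."""
    start = as_of - window
    return sum(v for ts, v in events if start < ts <= as_of)

as_of = datetime(2024, 1, 7)
int_events = [
    (datetime(2023, 12, 20), 100),  # outside the 7-day window, ignored
    (datetime(2024, 1, 2), 3),
    (datetime(2024, 1, 5), 4),
]
float_events = [(datetime(2024, 1, 2), 2.5), (datetime(2024, 1, 5), 1.25)]

print(rolling_sum(int_events, as_of, timedelta(days=7)))    # 7
print(rolling_sum(float_events, as_of, timedelta(days=7)))  # 3.75
```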

Min

Rolling minimum of a numeric field.

from thyme.pipeline import Min

min_amount_30d=Min(of="amount", window="30d")
  • of: numeric field (float or int)
  • Returns: same type as the input field

Max

Rolling maximum of a numeric field.

from thyme.pipeline import Max

max_amount_7d=Max(of="amount", window="7d")
  • of: numeric field (float or int)
  • Returns: same type as the input field
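Rolling Min and Max cannot be updated by simple subtraction the way a sum can. A standard technique is a monotonic deque, sketched below for both operators. This is a general-purpose illustration, not a description of Thyme's internals; the RollingExtreme class is hypothetical, and it assumes events are added in timestamp order with a (ts - window, ts] window.

```python
import operator
from collections import deque
from datetime import datetime, timedelta

class RollingExtreme:
    """Hypothetical rolling min or max over an event-time window.

    Pass operator.le for a rolling minimum or operator.ge for a rolling maximum.
    Simplification: events are added in non-decreasing timestamp order.
    """

    def __init__(self, window: timedelta, keep):
        if window <= timedelta(0):
            raise ValueError("window must be positive")
        self.window = window
        self.keep = keep  # keep(older, newer) is True if older may still win
        self.q = deque()  # (ts, value) pairs, monotonic in value front to back

    def add(self, ts: datetime, value):
        # Older entries beaten by the new value can never be the answer again.
        while self.q and not self.keep(self.q[-1][1], value):
            self.q.pop()
        self.q.append((ts, value))
        # Expire entries that have left (ts - window, ts].
        while self.q[0][0] <= ts - self.window:
            self.q.popleft()
        return self.q[0][1]  # the front holds the current min or max

data = [
    (datetime(2024, 1, 1), 50.0),
    (datetime(2024, 1, 3), 20.0),
    (datetime(2024, 1, 6), 35.0),
    (datetime(2024, 1, 10), 40.0),
]
mins = RollingExtreme(timedelta(days=7), operator.le)
maxs = RollingExtreme(timedelta(days=7), operator.ge)
for ts, amount in data:
    print(ts.date(), mins.add(ts, amount), maxs.add(ts, amount))
```

For the sample data this prints a rolling minimum of 50.0, 20.0, 20.0, 35.0 and a rolling maximum of 50.0, 50.0, 50.0, 40.0. Each value is pushed and popped at most once per deque, so updates are amortized O(1), whereas rescanning the window on every event would cost O(n).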

Complete example

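from datetime import datetime

from thyme.pipeline import Avg, Count, Max, Min, Sum

# dataset, field, pipeline, and inputs are assumed to be imported elsewhere.
# Transaction is assumed to be an input dataset with user_id, amount, and a
# timestamp field.

@dataset(index=True)
class UserStats:
    user_id:         str      = field(key=True)
    ts:              datetime = field(timestamp=True)
    avg_amount_7d:   float
    total_spend_30d: float
    txn_count_30d:   int
    max_amount_7d:   float
    min_amount_7d:   float

    @pipeline(version=1)
    @inputs(Transaction)
    def compute(cls, t: Transaction):
        # Group transactions by user, then compute each windowed feature per user.
        return (
            t.groupby("user_id")
             .aggregate(
                 avg_amount_7d=Avg(of="amount", window="7d"),
                 total_spend_30d=Sum(of="amount", window="30d"),
                 txn_count_30d=Count(of="user_id", window="30d"),
                 max_amount_7d=Max(of="amount", window="7d"),
                 min_amount_7d=Min(of="amount", window="7d"),
             )
        )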
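To check expected feature values by hand, it can help to recompute them outside Thyme. The plain-Python sketch below recomputes the five UserStats fields for one user at a chosen event time. It is not how Thyme evaluates pipelines: the sample data and the user_stats helper are hypothetical, and the sketch assumes each window is the half-open interval (as_of - window, as_of] and contains at least one event.

```python
from datetime import datetime, timedelta

# Hypothetical sample transactions: (user_id, ts, amount).
transactions = [
    ("alice", datetime(2024, 1, 1), 40.0),
    ("alice", datetime(2024, 1, 20), 10.0),
    ("alice", datetime(2024, 1, 25), 30.0),
    ("bob", datetime(2024, 1, 24), 5.0),
]

def user_stats(rows, user_id, as_of):
    """Recompute the UserStats fields for one user as of a given event time."""

    def in_window(ts, days):
        return as_of - timedelta(days=days) < ts <= as_of

    mine = [(ts, amt) for uid, ts, amt in rows if uid == user_id]
    last_7d = [amt for ts, amt in mine if in_window(ts, 7)]
    last_30d = [amt for ts, amt in mine if in_window(ts, 30)]
    return {
        "avg_amount_7d": sum(last_7d) / len(last_7d),
        "total_spend_30d": sum(last_30d),
        "txn_count_30d": len(last_30d),
        "max_amount_7d": max(last_7d),
        "min_amount_7d": min(last_7d),
    }

print(user_stats(transactions, "alice", datetime(2024, 1, 25)))
```

For alice as of 25 Jan, the 7-day window holds the 10.0 and 30.0 transactions (average 20.0, minimum 10.0, maximum 30.0), while the 30-day window also includes the 1 Jan transaction (total 80.0, count 3).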
