# Pipelines

Windowed aggregations that continuously transform datasets.

A pipeline is a windowed aggregation that continuously transforms one dataset into another. The engine executes pipelines in real time as events arrive.
## Defining a pipeline

Pipelines are defined as methods on the output dataset class, decorated with `@pipeline` and `@inputs`:
```python
from datetime import datetime

from thyme.pipeline import pipeline, inputs, Avg, Count

@dataset(index=True)
class UserStats:
    user_id: str = field(key=True)
    ts: datetime = field(timestamp=True)
    avg_amount_7d: float
    txn_count_30d: int

    @pipeline(version=1)
    @inputs(Transaction)
    def compute(cls, t: Transaction):
        return (
            t.groupby("user_id")
            .aggregate(
                avg_amount_7d=Avg(of="amount", window="7d"),
                txn_count_30d=Count(of="user_id", window="30d"),
            )
        )
```

### `@inputs`
`@inputs(*datasets)` declares which datasets flow into this pipeline. Pass the dataset classes directly:

```python
@inputs(Transaction)         # single input
@inputs(Transaction, Click)  # multiple inputs (join)
```

The decorator passes an instance of each input type as positional arguments to the pipeline method.
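The mechanics can be illustrated with a self-contained sketch (this is illustrative only, not thyme's implementation): a decorator that records the declared dataset classes as metadata on the method, which an engine can later inspect to construct and pass the inputs.

```python
# Toy sketch of an @inputs-style decorator (illustrative, not thyme's code).
# It stashes the declared dataset classes on the function so an engine
# could look them up and pass one instance of each as an argument.

def inputs(*datasets):
    def wrap(fn):
        fn._inputs = datasets  # record the declared input types as metadata
        return fn
    return wrap

class Transaction: pass
class Click: pass

@inputs(Transaction, Click)
def compute(cls, t, c):
    return (t, c)

# The engine reads the metadata and calls the method accordingly:
args = [d() for d in compute._inputs]
result = compute(None, *args)
print([type(a).__name__ for a in args])  # ['Transaction', 'Click']
```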
## The builder pattern

The pipeline body uses a fluent builder to declare the computation:

### `.groupby(*keys)`

Groups events by one or more field names. Typically this is the entity key:

```python
t.groupby("user_id")
t.groupby("user_id", "product_category")  # composite group key
```

### `.aggregate(**kwargs)`
Applies named aggregations. Each keyword argument becomes a field in the output dataset. The right-hand side is an aggregation operator:

```python
.aggregate(
    avg_amount_7d=Avg(of="amount", window="7d"),
    txn_count_30d=Count(of="user_id", window="30d"),
)
```

The method body is lazy: it returns a `PipelineNode` description of the computation. No data is processed at import or commit time. The engine compiles the DAG and executes it as a streaming job.
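The lazy-builder pattern itself is easy to demonstrate in isolation. The sketch below is an assumption-laden toy (the `PipelineNode` internals shown are not thyme's actual classes): each fluent call returns a new node that records the operation rather than executing it.

```python
# Minimal sketch of a lazy fluent builder (illustrative only; the
# internals of thyme's real PipelineNode are assumptions here).

class PipelineNode:
    """Records the computation instead of running it."""
    def __init__(self, source, ops=()):
        self.source = source
        self.ops = list(ops)

    def groupby(self, *keys):
        # Return a NEW node; nothing is computed here.
        return PipelineNode(self.source, self.ops + [("groupby", keys)])

    def aggregate(self, **aggs):
        return PipelineNode(self.source, self.ops + [("aggregate", aggs)])

node = PipelineNode("Transaction").groupby("user_id").aggregate(
    avg_amount_7d=("Avg", "amount", "7d"),
)
print(node.ops)
# [('groupby', ('user_id',)), ('aggregate', {'avg_amount_7d': ('Avg', 'amount', '7d')})]
```

An engine can walk `node.ops` later to compile the description into a streaming job, which is why no data moves at import or commit time.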
## Aggregation operators

| Operator | Description |
|---|---|
| `Avg(of, window)` | Rolling mean of a numeric field |
| `Count(of, window)` | Rolling count of events |
| `Sum(of, window)` | Rolling sum of a numeric field |
| `Min(of, window)` | Rolling minimum of a numeric field |
| `Max(of, window)` | Rolling maximum of a numeric field |
See the Aggregations reference for full parameter details.
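The rolling semantics shared by these operators can be sketched in plain Python (a toy model, not the engine): for a given "as of" time, aggregate the events whose timestamps fall inside the window ending at that time.

```python
from datetime import datetime, timedelta

# Toy model of event-time rolling aggregation (illustrative only).
def rolling(events, of, window, agg, as_of):
    """events: list of (ts, row) pairs; window: a timedelta."""
    vals = [row[of] for ts, row in events if as_of - window < ts <= as_of]
    return agg(vals)

ts = datetime(2024, 1, 10)
events = [
    (datetime(2024, 1, 2), {"amount": 10.0}),  # outside a 7d window ending Jan 10
    (datetime(2024, 1, 5), {"amount": 30.0}),
    (datetime(2024, 1, 9), {"amount": 50.0}),
]
week = timedelta(days=7)
print(rolling(events, "amount", week, len, ts))                      # Count -> 2
print(rolling(events, "amount", week, sum, ts))                      # Sum   -> 80.0
print(rolling(events, "amount", week, lambda v: sum(v)/len(v), ts))  # Avg   -> 40.0
```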
## Time windows

Windows are specified as strings: `"7d"` (7 days), `"24h"` (24 hours), `"30m"` (30 minutes).
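A parser for this string format is straightforward; the helper below is a hypothetical sketch (thyme parses these internally, and `parse_window` is not part of its API):

```python
from datetime import timedelta

# Hypothetical parser for window strings like "7d", "24h", "30m".
_UNITS = {"d": "days", "h": "hours", "m": "minutes"}

def parse_window(spec: str) -> timedelta:
    unit = spec[-1]
    if unit not in _UNITS:
        raise ValueError(f"unknown window unit: {spec!r}")
    return timedelta(**{_UNITS[unit]: int(spec[:-1])})

print(parse_window("7d"))   # 7 days, 0:00:00
print(parse_window("24h"))  # 1 day, 0:00:00
print(parse_window("30m"))  # 0:30:00
```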
All windows use event time, not processing time. This means:

- An event timestamped yesterday is counted in yesterday's window, regardless of when it arrived.
- Late events (within the disorder tolerance set on the source) are included correctly.
- Historical backfills produce the same results as real-time processing.
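Event-time semantics mean window assignment depends only on the event's own timestamp, never on when it arrived. A toy illustration (not engine code), where a late-arriving event still lands in the correct daily bucket:

```python
from collections import defaultdict
from datetime import datetime

# Toy illustration of event-time bucketing: events are assigned to daily
# windows by their own timestamp, regardless of arrival order.
def daily_counts(arrivals):
    counts = defaultdict(int)
    for event_ts in arrivals:  # arrival order is irrelevant
        counts[event_ts.date()] += 1
    return dict(counts)

# The second event arrives "late" (after a newer event) but is still
# counted in its own day's window.
arrivals = [
    datetime(2024, 1, 2, 9, 0),
    datetime(2024, 1, 1, 23, 59),  # late event, timestamped yesterday
    datetime(2024, 1, 2, 10, 0),
]
print(daily_counts(arrivals))
# {datetime.date(2024, 1, 2): 2, datetime.date(2024, 1, 1): 1}
```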
## Pipeline versioning

`@pipeline(version=N)` tracks schema evolution for the pipeline's output. Increment the version whenever you change the set of aggregations or their semantics.