Pipelines
Continuous windowed aggregations that are always on.
A pipeline is what turns raw events into feature state. It is a windowed aggregation that runs continuously - not on a schedule, not in response to a deploy. Once you commit it, it stays on until you change it.
A pipeline reads one or more input datasets, groups their events by entity, applies one or more time-windowed aggregations, and writes the result into a new dataset. That output dataset is what your featureset reads at query time.
Here is the smallest possible example:
```python
from datetime import datetime

from thyme.pipeline import pipeline, inputs, Avg, Count

# dataset, field and the Transaction input dataset are defined elsewhere in
# your project; their imports are not shown here.


@dataset(index=True)
class UserStats:
    user_id: str = field(key=True)          # entity key the pipeline groups by
    ts: datetime = field(timestamp=True)    # event-time column
    avg_amount_7d: float
    txn_count_30d: int

    @pipeline(version=1)
    @inputs(Transaction)
    def compute(cls, t: Transaction):
        return (
            t.groupby("user_id")
            .aggregate(
                avg_amount_7d=Avg(of="amount", window="7d"),      # 7-day rolling mean
                txn_count_30d=Count(of="user_id", window="30d"),  # 30-day event count
            )
        )
```

Notice: the pipeline is defined on the output dataset. The output class declares what the pipeline produces; the method declares how.
@inputs
@inputs(*datasets) declares which datasets flow into this pipeline. Pass the dataset classes directly:
```python
@inputs(Transaction)          # single input
@inputs(Transaction, Click)   # multiple inputs (temporal join)
```

The decorator passes an instance of each input type as positional arguments to the pipeline method.
The builder
The pipeline body uses a fluent builder to declare the computation. It is lazy - it returns a description of the DAG, not data. No records are processed at import or commit time.
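To make "lazy" concrete, here is a toy builder in plain Python. Plan and its methods are hypothetical stand-ins, not thyme's builder: each call returns a new, immutable description with one more step appended, and no records are read.

```python
from dataclasses import dataclass
from typing import Any, Tuple


@dataclass(frozen=True)
class Plan:
    """Hypothetical lazy builder: an immutable description of a computation."""

    source: str
    steps: Tuple[Tuple[str, Any], ...] = ()

    def groupby(self, *keys: str) -> "Plan":
        # Record the step and return a new Plan; no data is touched.
        return Plan(self.source, self.steps + (("groupby", keys),))

    def aggregate(self, **aggs: Any) -> "Plan":
        # Each keyword becomes a named output; the values are just specs here.
        return Plan(self.source, self.steps + (("aggregate", tuple(aggs.items())),))


plan = (
    Plan("Transaction")
    .groupby("user_id")
    .aggregate(avg_amount_7d="avg over 7d", txn_count_30d="count over 30d")
)
print(plan.steps)
```

This is the general benefit of a lazy builder: because each step only extends a description, the whole graph can be inspected before anything runs.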
.groupby(*keys)
Groups events by one or more field names. Typically this is the entity key:
```python
t.groupby("user_id")
t.groupby("user_id", "product_category")  # composite group key
```

.aggregate(**kwargs)
Applies named aggregations. Each keyword argument becomes a field in the output dataset:
```python
t.groupby("user_id").aggregate(
    avg_amount_7d=Avg(of="amount", window="7d"),      # output field avg_amount_7d
    txn_count_30d=Count(of="user_id", window="30d"),  # output field txn_count_30d
)
```

.join(other, on=..., fields=[...])
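Enriches events by temporally joining another input dataset. For each event, the join attaches the requested fields from the latest value of other, matched on the on key(s), as of that event's timestamp. Because the lookup depends only on event timestamps, training and serving see the same enrichment.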
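The temporal join semantics can be sketched in plain Python as an as-of lookup. build_index and asof_join are hypothetical helpers, not thyme API; their on and fields parameters only mirror the join signature above. Treating a row whose timestamp equals the event's as visible (<=) is an assumption of this sketch.

```python
import bisect
from collections import defaultdict


def build_index(rows, key, ts="ts"):
    """Group `other` rows by join key, each group sorted by timestamp."""
    index = defaultdict(list)
    for row in rows:
        index[row[key]].append(row)
    for versions in index.values():
        versions.sort(key=lambda r: r[ts])
    return dict(index)


def asof_join(events, index, on, fields, ts="ts"):
    """Attach `fields` from the latest matching row with timestamp <= the event's."""
    out = []
    for event in events:
        versions = index.get(event[on], [])
        times = [r[ts] for r in versions]
        i = bisect.bisect_right(times, event[ts])  # number of versions at or before the event
        match = versions[i - 1] if i > 0 else None
        enriched = dict(event)
        for f in fields:
            enriched[f] = None if match is None else match[f]
        out.append(enriched)
    return out


users = [
    {"user_id": "u1", "ts": 1, "tier": "free"},
    {"user_id": "u1", "ts": 5, "tier": "pro"},
]
txns = [
    {"user_id": "u1", "ts": 3, "amount": 10.0},
    {"user_id": "u1", "ts": 7, "amount": 20.0},
]
joined = asof_join(txns, build_index(users, "user_id"), on="user_id", fields=["tier"])
print([row["tier"] for row in joined])  # ['free', 'pro']
```

The transaction at ts=3 sees the tier as it was then, not the later upgrade, which is exactly the property that keeps training rows free of future information.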
Aggregation operators
| Operator | Description | Memory |
|---|---|---|
| Sum(of, window) | Rolling sum of a numeric field | O(1) - invertible |
| Count(of, window) | Rolling count of events | O(1) - invertible |
| Avg(of, window) | Rolling mean of a numeric field | O(1) - invertible |
| Min(of, window) | Rolling minimum of a numeric field | O(events in window) - sorted set |
| Max(of, window) | Rolling maximum of a numeric field | O(events in window) - sorted set |
| ApproxPercentile(of, window, pct) | Approximate value at percentile pct of a numeric field, via tiled t-digest sketches | O(tiles) |
Invertible operators keep only a running total and count. When an old record falls out of the window, they subtract its contribution instead of recomputing over the whole window. This is why Sum/Count/Avg over a 180-day window needs the same aggregation state as over a one-hour window.
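A minimal sketch of an invertible rolling window, assuming events arrive in timestamp order and the window covers (now - window, now]. InvertibleWindow is a hypothetical class, not thyme's implementation. The aggregate itself is two numbers; the deque of in-window events exists only so this sketch knows what to subtract, and how a real engine recovers expired values is not covered here.

```python
from collections import deque


class InvertibleWindow:
    """Rolling Sum/Count/Avg over the last `window` time units of event time."""

    def __init__(self, window):
        self.window = window
        self.total = 0.0         # aggregate state: running sum
        self.count = 0           # aggregate state: running count
        self._events = deque()   # (ts, value) still in the window; sketch-only bookkeeping

    def add(self, ts, value):
        self._events.append((ts, value))
        self.total += value
        self.count += 1
        self._expire(now=ts)

    def _expire(self, now):
        # An event leaves the window once it is `window` or more older than `now`.
        while self._events and self._events[0][0] <= now - self.window:
            _, old = self._events.popleft()
            self.total -= old    # invert: subtract instead of recomputing
            self.count -= 1

    def avg(self):
        return self.total / self.count if self.count else None


w = InvertibleWindow(window=10)
for ts, amount in [(0, 5.0), (4, 3.0), (10, 2.0)]:
    w.add(ts, amount)
print(w.total, w.count, w.avg())  # 5.0 2 2.5
```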
Non-invertible operators (Min/Max/ApproxPercentile) use sorted sets or sketches to track enough state to answer the query without scanning the window.
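For Min and Max, subtraction does not work: once the current minimum expires, you need the next smallest value. Here is a sketch of the sorted-set approach, with a sorted Python list standing in for the sorted set (a real sorted set would typically be a balanced tree or skip list). MinMaxWindow is a hypothetical name, and the sketch assumes in-order arrival.

```python
import bisect
from collections import deque


class MinMaxWindow:
    """Rolling Min/Max over the last `window` time units of event time."""

    def __init__(self, window):
        self.window = window
        self._sorted = []        # in-window values, ascending (the "sorted set")
        self._events = deque()   # (ts, value) in arrival order, used for expiry

    def add(self, ts, value):
        self._events.append((ts, value))
        bisect.insort(self._sorted, value)
        while self._events and self._events[0][0] <= ts - self.window:
            _, old = self._events.popleft()
            # Remove exactly one copy of the expired value; duplicates survive.
            del self._sorted[bisect.bisect_left(self._sorted, old)]

    def min(self):
        return self._sorted[0] if self._sorted else None

    def max(self):
        return self._sorted[-1] if self._sorted else None


w = MinMaxWindow(window=10)
for ts, amount in [(0, 9.0), (3, 1.0), (5, 4.0), (10, 6.0)]:
    w.add(ts, amount)
print(w.min(), w.max())  # 1.0 6.0
```

When 9.0 expires at ts=10, the next maximum is already at the end of the sorted list, so no scan of the window is needed.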
See the aggregations reference for full parameter details.
Time windows
Windows are specified as strings: "7d" (7 days), "24h" (24 hours), "30m" (30 minutes).
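A hypothetical helper showing how such strings map to durations. It accepts only the d, h and m units used above; whether thyme supports other units is not assumed.

```python
import re
from datetime import timedelta

# Only the units shown in the examples above.
_UNITS = {"d": "days", "h": "hours", "m": "minutes"}


def parse_window(spec: str) -> timedelta:
    """Convert a window string such as "7d" into a timedelta (hypothetical helper)."""
    match = re.fullmatch(r"(\d+)([dhm])", spec)
    if match is None:
        raise ValueError(f"unrecognised window: {spec!r}")
    amount, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(amount)})


print(parse_window("7d"), parse_window("24h"), parse_window("30m"))
# 7 days, 0:00:00 1 day, 0:00:00 0:30:00
```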
All windows use event time, not processing time. That means:
- An event timestamped yesterday is counted in yesterday's window, regardless of when it arrived.
- Late events (within the max_lateness tolerance set on the source) are included correctly.
- Historical backfills produce the same results as real-time processing.
- A point-in-time query at ts=T returns the windowed aggregation as it would have been computed at T.
This is the structural reason training features and serving features agree.
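The event-time guarantees above can be sketched in a few lines. windowed_sum answers a point-in-time query from event timestamps alone, so arrival order does not change the result. accept_late models the max_lateness tolerance against the highest event time seen so far. Both function names, that watermark rule, and dropping events later than the tolerance are assumptions of this sketch; the text above does not say what happens to such events.

```python
def accept_late(arrivals, max_lateness):
    """Keep events at most `max_lateness` behind the highest event time seen.

    Events beyond the tolerance are dropped (an assumption of this sketch).
    """
    accepted, watermark = [], float("-inf")
    for event in arrivals:  # arrivals are in processing-time order
        if event["ts"] >= watermark - max_lateness:
            accepted.append(event)
        watermark = max(watermark, event["ts"])
    return accepted


def windowed_sum(events, field, window, as_of):
    """Sum of `field` over events whose event time lies in (as_of - window, as_of]."""
    return sum(e[field] for e in events if as_of - window < e["ts"] <= as_of)


# The same three events, in event-time order (backfill) and in arrival order (real time).
backfill = [{"ts": 1, "amount": 10.0}, {"ts": 4, "amount": 5.0}, {"ts": 8, "amount": 2.0}]
realtime = [backfill[1], backfill[2], backfill[0]]  # ts=1 arrives last, within tolerance

live = accept_late(realtime, max_lateness=10)
print(windowed_sum(live, "amount", window=7, as_of=8),
      windowed_sum(backfill, "amount", window=7, as_of=8))  # 7.0 7.0
```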
Versioning
@pipeline(version=N) tracks schema evolution for the pipeline's output. Increment N when you change the set of aggregations or their semantics. The engine handles the migration, so your downstream consumers keep working.