Thyme
Concepts

Pipelines

Windowed aggregations that continuously transform datasets.

A pipeline is a windowed aggregation that continuously transforms one dataset into another. The engine executes pipelines in real time as events arrive.

Defining a pipeline

Pipelines are defined as methods on the output dataset class, decorated with @pipeline and @inputs:

from datetime import datetime

from thyme.pipeline import pipeline, inputs, Avg, Count

@dataset(index=True)
class UserStats:
    user_id:       str      = field(key=True)
    ts:            datetime = field(timestamp=True)
    avg_amount_7d: float
    txn_count_30d: int

    @pipeline(version=1)
    @inputs(Transaction)
    def compute(cls, t: Transaction):
        return (
            t.groupby("user_id")
             .aggregate(
                 avg_amount_7d=Avg(of="amount", window="7d"),
                 txn_count_30d=Count(of="user_id", window="30d"),
             )
        )

@inputs

@inputs(*datasets) declares which datasets flow into this pipeline. Pass the dataset classes directly:

@inputs(Transaction)           # single input
@inputs(Transaction, Click)    # multiple inputs (join)

The decorator passes an instance of each input type to the pipeline method as positional arguments, in the order the inputs are declared.


The builder pattern

The pipeline body uses a fluent builder to declare the computation:

.groupby(*keys)

Groups events by one or more field names. Typically this is the entity key:

t.groupby("user_id")
t.groupby("user_id", "product_category")  # composite group key
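As a plain-Python analogue (not Thyme's implementation), grouping by a composite key means that events sharing the same tuple of key values land in the same group:

```python
from collections import defaultdict

# Illustrative events; field names mirror the examples above.
events = [
    {"user_id": "u1", "product_category": "books", "amount": 10.0},
    {"user_id": "u1", "product_category": "games", "amount": 25.0},
    {"user_id": "u1", "product_category": "books", "amount": 5.0},
]

# Composite group key: the (user_id, product_category) tuple.
groups = defaultdict(list)
for e in events:
    groups[(e["user_id"], e["product_category"])].append(e["amount"])

print(dict(groups))
# {('u1', 'books'): [10.0, 5.0], ('u1', 'games'): [25.0]}
```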

.aggregate(**kwargs)

Applies named aggregations. Each keyword argument becomes a field in the output dataset. The right-hand side is an aggregation operator:

.aggregate(
    avg_amount_7d=Avg(of="amount", window="7d"),
    txn_count_30d=Count(of="user_id", window="30d"),
)

The method body is lazy: it returns a PipelineNode description of the computation. No data is processed at import or commit time. The engine compiles the DAG and executes it as a streaming job.
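The lazy-description idea can be sketched in a few lines of plain Python. `PlanNode` below is a hypothetical stand-in, unrelated to Thyme's actual `PipelineNode` internals: each builder call only records what to compute and returns a new description.

```python
from dataclasses import dataclass, field as dc_field

@dataclass
class PlanNode:
    """Describes one step of the computation; nothing is executed."""
    source: str
    group_keys: tuple = ()
    aggregations: dict = dc_field(default_factory=dict)

    def groupby(self, *keys):
        # Record the grouping keys and return a new description node.
        return PlanNode(self.source, keys, dict(self.aggregations))

    def aggregate(self, **aggs):
        # Record the named aggregations; still no data is touched.
        return PlanNode(self.source, self.group_keys, {**self.aggregations, **aggs})

node = PlanNode("Transaction").groupby("user_id").aggregate(
    avg_amount_7d=("Avg", "amount", "7d"),
)
print(node.group_keys)    # ('user_id',)
print(node.aggregations)  # {'avg_amount_7d': ('Avg', 'amount', '7d')}
```

Because the result is only a description, the engine is free to compile it into whatever streaming topology it chooses.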


Aggregation operators

Operator            Description
Avg(of, window)     Rolling mean of a numeric field
Count(of, window)   Rolling count of events
Sum(of, window)     Rolling sum of a numeric field
Min(of, window)     Rolling minimum of a numeric field
Max(of, window)     Rolling maximum of a numeric field

See the Aggregations reference for full parameter details.


Time windows

Windows are specified as strings: "7d" (7 days), "24h" (24 hours), "30m" (30 minutes).
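For intuition, a window string of this shape maps onto a `datetime.timedelta`. The `parse_window` helper below is purely illustrative, not part of Thyme's API:

```python
import re
from datetime import timedelta

# Map window-suffix characters to timedelta keyword arguments.
UNITS = {"d": "days", "h": "hours", "m": "minutes"}

def parse_window(spec: str) -> timedelta:
    """Convert a window string like '7d' into a timedelta."""
    m = re.fullmatch(r"(\d+)([dhm])", spec)
    if not m:
        raise ValueError(f"bad window spec: {spec!r}")
    return timedelta(**{UNITS[m.group(2)]: int(m.group(1))})

print(parse_window("7d"))   # 7 days, 0:00:00
print(parse_window("24h"))  # 1 day, 0:00:00
print(parse_window("30m"))  # 0:30:00
```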

All windows use event time, not processing time. This means:

  • An event timestamped yesterday is counted in yesterday's window, regardless of when it arrived.
  • Late events (within the disorder tolerance set on the source) are included correctly.
  • Historical backfills produce the same results as real-time processing.
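Event-time semantics can be sketched in plain Python (`events_in_window` is a hypothetical helper, not part of Thyme): membership in a window depends only on the event's own timestamp, so a late-arriving event timestamped inside the window is still counted, and one timestamped outside it is not.

```python
from datetime import datetime, timedelta

def events_in_window(events, as_of, window):
    # Count events whose *event* timestamp falls within [as_of - window, as_of],
    # regardless of the order in which they arrived.
    return sum(1 for ts, _ in events if as_of - window <= ts <= as_of)

as_of = datetime(2024, 1, 8)
events = [
    (datetime(2024, 1, 7), "on-time"),
    (datetime(2024, 1, 7, 6), "late-arriving"),  # delivered late, but timestamped inside the window
    (datetime(2023, 12, 1), "too old"),          # timestamped outside the 7-day window
]
print(events_in_window(events, as_of, timedelta(days=7)))  # 2
```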

Pipeline versioning

@pipeline(version=N) tracks schema evolution for the pipeline's output. Increment the version whenever you change the set of aggregations or their semantics.
