
Your First Feature

A detailed walkthrough building your first feature end-to-end.

This walkthrough builds the same example as the Quick Start, but explains why each piece exists.


Step 1: Define a dataset

A dataset is a named stream of events with a fixed schema. Every dataset needs a key (the entity the event is about) and a timestamp (when the event occurred).

from datetime import datetime
from thyme.dataset import dataset, field

@dataset(index=True)
class Transaction:
    user_id: str      = field(key=True)   # entity key
    amount:  float
    ts:      datetime = field(timestamp=True)  # event time
  • index=True tells Thyme to maintain a fast lookup index on this dataset.
  • field(key=True) marks the entity identifier. There must be exactly one key per dataset.
  • field(timestamp=True) marks the event time. This drives windowed aggregations and point-in-time correctness.
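To make "point-in-time correctness" concrete, here is a plain-Python sketch of what a point-in-time lookup means; this is illustrative only, not a Thyme API, and the `events` list and `as_of` helper are hypothetical.

```python
from datetime import datetime

# Hypothetical in-memory event log; each row mirrors the Transaction schema.
events = [
    {"user_id": "u1", "amount": 10.0, "ts": datetime(2024, 1, 1)},
    {"user_id": "u1", "amount": 25.0, "ts": datetime(2024, 1, 5)},
    {"user_id": "u1", "amount": 40.0, "ts": datetime(2024, 1, 9)},
]

def as_of(events, key, ts):
    """Return the latest event for `key` whose event time is <= ts."""
    candidates = [e for e in events if e["user_id"] == key and e["ts"] <= ts]
    return max(candidates, key=lambda e: e["ts"], default=None)

# A lookup "as of" Jan 7 sees the Jan 5 event, never the later Jan 9 one —
# training data generated this way cannot leak future information.
row = as_of(events, "u1", datetime(2024, 1, 7))  # row["amount"] == 25.0
```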

Step 2: Attach a source

A source tells the engine where raw events come from. Thyme polls the source and feeds new rows into Kafka.

from thyme.connectors import IcebergSource, source

@source(
    IcebergSource(catalog="prod", database="events", table="transactions"),
    cursor="ts",      # incremental column
    every="1m",       # poll interval
    disorder="5m",    # max late-arrival tolerance
    cdc="append",     # insert-only stream
)
@dataset(index=True)
class Transaction:
    ...

The disorder parameter sets the watermark lag. Events arriving more than 5 minutes behind the current watermark are discarded. This keeps windowed aggregations correct without waiting indefinitely for late data.
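The acceptance rule can be sketched in a few lines of plain Python (a simplification, assuming the `disorder="5m"` setting above; not engine code):

```python
from datetime import datetime, timedelta

DISORDER = timedelta(minutes=5)  # mirrors disorder="5m" in the @source config

def accept(event_ts: datetime, watermark: datetime) -> bool:
    """An event is kept only if it lags the watermark by at most DISORDER."""
    return event_ts >= watermark - DISORDER

wm = datetime(2024, 1, 1, 12, 0)
accept(datetime(2024, 1, 1, 11, 57), wm)  # True: 3 minutes late, within bound
accept(datetime(2024, 1, 1, 11, 50), wm)  # False: 10 minutes late, discarded
```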


Step 3: Define a pipeline

A pipeline reads from one or more input datasets and produces a new dataset via windowed aggregations. You define it as a method on the output dataset class.

from thyme.pipeline import pipeline, inputs, Avg, Count

@dataset(index=True)
class UserStats:
    user_id:       str      = field(key=True)
    ts:            datetime = field(timestamp=True)
    avg_amount_7d: float
    txn_count_30d: int

    @pipeline(version=1)
    @inputs(Transaction)
    def compute(cls, t: Transaction):
        return (
            t.groupby("user_id")
             .aggregate(
                 avg_amount_7d=Avg(of="amount", window="7d"),
                 txn_count_30d=Count(of="user_id", window="30d"),
             )
        )
  • @inputs(Transaction) declares the upstream dataset(s).
  • .groupby("user_id") groups events by the entity key.
  • .aggregate(...) applies named aggregations. Each one becomes a column in the output dataset.
  • Avg(of="amount", window="7d") computes a rolling 7-day average of the amount field.
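As a mental model for the rolling window, here is what `Avg(of="amount", window="7d")` computes, written as a naive scan in plain Python (illustrative only; the engine maintains these aggregates incrementally rather than rescanning events):

```python
from datetime import datetime, timedelta

def rolling_avg(events, key, now, window=timedelta(days=7)):
    """Average of `amount` for `key` over events inside [now - window, now]."""
    vals = [e["amount"] for e in events
            if e["user_id"] == key and now - window <= e["ts"] <= now]
    return sum(vals) / len(vals) if vals else None

events = [
    {"user_id": "u1", "amount": 10.0, "ts": datetime(2024, 1, 1)},
    {"user_id": "u1", "amount": 30.0, "ts": datetime(2024, 1, 6)},
]
rolling_avg(events, "u1", datetime(2024, 1, 7))  # 20.0: both events in window
rolling_avg(events, "u1", datetime(2024, 1, 9))  # 30.0: Jan 1 has aged out
```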

The method body is lazy: it returns a PipelineNode DAG that the engine compiles and executes. No computation happens at import time.
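The lazy-building idea can be sketched with a toy node class; `Node` here stands in for Thyme's PipelineNode and is not its actual API:

```python
class Node:
    """Each chained call records an operation instead of computing anything."""
    def __init__(self, op, parent=None, **kwargs):
        self.op, self.parent, self.kwargs = op, parent, kwargs

    def groupby(self, *cols):
        return Node("groupby", parent=self, cols=cols)

    def aggregate(self, **aggs):
        return Node("aggregate", parent=self, aggs=aggs)

# Building the plan touches no data — it just links nodes together.
plan = Node("source").groupby("user_id").aggregate(avg_amount_7d="Avg(7d)")

def ops(node):
    """Walk parent links to recover the plan, source-first."""
    chain = []
    while node:
        chain.append(node.op)
        node = node.parent
    return chain[::-1]

ops(plan)  # ["source", "groupby", "aggregate"]
```

This is why importing the module is cheap: the engine receives the recorded plan and decides how and where to execute it.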


Step 4: Define a featureset

A featureset is a named collection of features that your models consume. It uses extractors to compose raw aggregated values into model-ready features.

from thyme.featureset import featureset, feature, extractor
from thyme.featureset import extractor_inputs, extractor_outputs

@featureset
class UserFeatures:
    uid:           str   = feature(id=1)
    avg_spend_7d:  float = feature(id=2)
    txn_count_30d: int   = feature(id=3)

    @extractor
    @extractor_inputs("uid")
    @extractor_outputs("avg_spend_7d", "txn_count_30d")
    def from_stats(cls, ts, inputs):
        uid = inputs["uid"]
        row = UserStats.lookup(ts, user_id=uid)
        return row["avg_amount_7d"], row["txn_count_30d"]
  • feature(id=N) — integer IDs are required. They provide a stable identity across renames and schema changes.
  • @extractor_inputs — the features this extractor needs as input (resolved by the query planner).
  • @extractor_outputs — the features this extractor produces.
  • The extractor body runs in Python at query time, inside the query server.
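To illustrate what the query server does with these declarations, here is a hypothetical sketch of query-time resolution (not the real planner): given the requested features and the provided inputs, run any extractor whose outputs overlap the request, then assemble the response. The `stats` dict stands in for the indexed UserStats dataset.

```python
# Stand-in for the indexed UserStats store backing UserStats.lookup().
stats = {"user_42": {"avg_amount_7d": 31.5, "txn_count_30d": 12}}

def from_stats(ts, inputs):
    row = stats[inputs["uid"]]
    return row["avg_amount_7d"], row["txn_count_30d"]

# Registry built from @extractor_inputs / @extractor_outputs declarations.
EXTRACTORS = [
    {"inputs": ("uid",),
     "outputs": ("avg_spend_7d", "txn_count_30d"),
     "fn": from_stats},
]

def query(requested, given, ts=None):
    resolved = dict(given)
    for ext in EXTRACTORS:
        if set(requested) & set(ext["outputs"]):
            values = ext["fn"](ts, {k: resolved[k] for k in ext["inputs"]})
            resolved.update(zip(ext["outputs"], values))
    return {f: resolved[f] for f in requested}

query(["avg_spend_7d"], {"uid": "user_42"})  # {"avg_spend_7d": 31.5}
```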

Step 5: Commit and query

# Send all definitions to the control plane
thyme commit features.py

# Query features for a user
curl "http://localhost:8081/features?featureset=UserFeatures&uid=user_42"

The control plane creates Kafka topics and persists the execution plan. The engine picks up the new job within seconds and begins consuming from the source.


What happens next

Once committed:

  1. The engine polls IcebergSource every minute for new Transaction rows
  2. It publishes them to a Kafka topic
  3. It consumes the topic, applies the compute pipeline, and writes results to RocksDB
  4. The query server reads from RocksDB and runs extractors on demand

Feature values are updated continuously. The next query always returns the latest state.
