Thyme
Concepts

Datasets

Schema, keys, timestamps, and indexing for Thyme datasets.

A dataset is a named, schema'd stream of events. It's the fundamental data container in Thyme — everything flows through datasets.

Anatomy of a dataset

from datetime import datetime
from thyme.dataset import dataset, field

@dataset(index=True, version=1)
class Transaction:
    user_id: str      = field(key=True)
    amount:  float
    ts:      datetime = field(timestamp=True)

Every dataset class is a Python dataclass under the hood. You define fields as class annotations with optional field() descriptors for key and timestamp metadata.


Key field

Every dataset must have exactly one key field, marked with field(key=True).

The key is the entity identifier — the thing the event is about. All windowed aggregations group by the key. When you query a feature, you provide a key value.

user_id: str = field(key=True)

Rules:

  • Exactly one key per dataset (required).
  • The key field cannot be Optional.
  • The key can be any non-optional type (str, int, etc.).

Timestamp field

Every dataset must have exactly one timestamp field, marked with field(timestamp=True).

The timestamp is the event time — when the event happened, not when it was processed. Thyme uses event time (not processing time) for all windowed aggregations. This gives you point-in-time correctness: a query at time T returns the feature value as it would have been known at T.

ts: datetime = field(timestamp=True)

The index parameter

@dataset(index=True) tells Thyme to maintain a fast lookup index on this dataset in RocksDB. Set this to True for any dataset that will be queried (either directly or via an extractor).

@dataset(index=True)   # queryable — keep a keyed index
class UserStats: ...

@dataset(index=False)  # write-only intermediate — no index needed
class RawClick: ...

Dataset versioning

@dataset(version=N) allows schema evolution. Increment the version when you add, remove, or change fields. Thyme uses the version to manage migrations.

@dataset(index=True, version=2)  # bumped from 1 after adding a field
class UserStats:
    user_id:       str      = field(key=True)
    ts:            datetime = field(timestamp=True)
    avg_amount_7d: float
    txn_count_30d: int
    p99_amount_7d: float    # new in version 2

Datasets as pipeline outputs

A dataset can be both a source of raw events (fed by a @source) and a sink for aggregated data (produced by a @pipeline). The compute pipeline method in the example below is defined on the output dataset class:

@dataset(index=True)
class UserStats:
    user_id: str      = field(key=True)
    ts:      datetime = field(timestamp=True)
    count:   int

    @pipeline(version=1)
    @inputs(Transaction)
    def compute(cls, t: Transaction): ...

See Pipelines for details.

On this page