Sources
Connect external data systems to Thyme datasets.
A source connects an external data system to a Thyme dataset. The engine reads new rows from the source (polling sources on a schedule, streaming sources continuously), converts them into events, and publishes them to Kafka for downstream pipelines.
Attaching a source
Use @source to attach a connector to a dataset class. Apply it outside (above) the @dataset decorator:
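```python
from datetime import datetime

from thyme.connectors import IcebergSource, source
# dataset and field also come from the Thyme SDK (import not shown here).

@source(
    IcebergSource(catalog="prod", database="events", table="transactions"),
    cursor="ts",        # field used for incremental reads
    every="1m",         # poll once a minute
    max_lateness="5m",  # tolerate events up to 5 minutes late
    cdc="append",       # each row is a new event
)
@dataset(index=True)
class Transaction:
    user_id: str = field(key=True)
    amount: float
    ts: datetime = field(timestamp=True)
```

@source parameters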
| Parameter | Type | Default | Description |
|---|---|---|---|
| connector | connector object | required | The data source to read from |
| cursor | str | "" | Field used for incremental reads (high-water mark) |
| every | str | "" | Poll interval, e.g. "1m", "5m", "1h" |
| max_lateness | str | "" | Maximum late-arrival tolerance, e.g. "5m", "1h", "1d" |
| cdc | str | "append" | Change data capture mode: "append", "upsert", or "debezium" (see CDC modes) |
cursor
The field the engine uses to track read progress. On each poll, the engine fetches only rows where cursor > last_seen_value, so it reads incrementally instead of rescanning the full table.
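To make the high-water-mark idea concrete, here is a minimal sketch of incremental reads against an in-memory SQLite table. It is an illustration, not Thyme's engine code: the table layout, the integer ts column, and the poll_once helper are all hypothetical. The cursor value is passed as a bound parameter, and the new mark is the largest cursor value in the batch.

```python
import sqlite3
from typing import List, Tuple


def poll_once(conn: sqlite3.Connection, last_seen: int) -> Tuple[List[tuple], int]:
    """Fetch rows with ts > last_seen; return them with the new high-water mark."""
    rows = conn.execute(
        "SELECT user_id, amount, ts FROM transactions WHERE ts > ? ORDER BY ts",
        (last_seen,),
    ).fetchall()
    # Keep the old mark if nothing new arrived; otherwise take the last (largest) ts.
    new_mark = rows[-1][2] if rows else last_seen
    return rows, new_mark


# Hypothetical source table with an integer ts cursor column.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE transactions (user_id TEXT, amount REAL, ts INTEGER)")
conn.executemany(
    "INSERT INTO transactions VALUES (?, ?, ?)",
    [("u1", 10.0, 100), ("u2", 5.0, 200)],
)

rows1, mark1 = poll_once(conn, 0)      # both rows; mark becomes 200
conn.execute("INSERT INTO transactions VALUES ('u1', 7.5, 300)")
rows2, mark2 = poll_once(conn, mark1)  # only the ts=300 row
rows3, mark3 = poll_once(conn, mark2)  # nothing new; mark stays 300
```

Because the comparison is strict, a row that appears later with a cursor value equal to the stored mark is not returned by the next poll.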
every
How often the engine polls the source. The value is an integer followed by a unit: m (minutes), h (hours), or d (days), for example "5m".
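The interval format can be validated with a small parser. This is a hypothetical helper, not part of Thyme's API; it accepts only the three units listed above, and whether Thyme accepts any other units is not covered on this page.

```python
import re

# Seconds per unit, covering only the units documented for `every`.
_UNIT_SECONDS = {"m": 60, "h": 60 * 60, "d": 24 * 60 * 60}


def parse_interval(spec: str) -> int:
    """Convert an interval such as "5m" or "1h" into a number of seconds."""
    match = re.fullmatch(r"([0-9]+)([mhd])", spec)
    if match is None:
        raise ValueError(f"invalid interval {spec!r}: expected <integer><m|h|d>")
    value, unit = match.groups()
    return int(value) * _UNIT_SECONDS[unit]
```

For example, parse_interval("5m") returns 300, while "90s" or "1.5h" raise ValueError.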
max_lateness
The maximum expected delay between an event's timestamp and the time it is ingested. An event whose timestamp is earlier than (current_watermark - max_lateness) is treated as too late and discarded. This bound lets the engine advance the watermark and emit results without waiting indefinitely for stragglers.
Choose this value with care: too small and late events are dropped; too large and features are delayed unnecessarily.
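A minimal sketch of the discard rule, assuming the current watermark is already known: an event is dropped when its timestamp is earlier than watermark - max_lateness. The split_late function and the event dict shape are hypothetical, and how Thyme computes and advances the watermark itself is not modeled here.

```python
from datetime import datetime, timedelta


def split_late(events, watermark: datetime, max_lateness: timedelta):
    """Partition events into (accepted, dropped) by the lateness cutoff."""
    cutoff = watermark - max_lateness
    accepted, dropped = [], []
    for event in events:
        # Strictly earlier than the cutoff means too late; the cutoff itself is kept.
        if event["ts"] < cutoff:
            dropped.append(event)
        else:
            accepted.append(event)
    return accepted, dropped


watermark = datetime(2024, 1, 1, 12, 0)
events = [
    {"id": "a", "ts": datetime(2024, 1, 1, 11, 58)},  # 2 minutes behind: kept
    {"id": "b", "ts": datetime(2024, 1, 1, 11, 55)},  # exactly at the cutoff: kept
    {"id": "c", "ts": datetime(2024, 1, 1, 11, 50)},  # 10 minutes behind: dropped
]
accepted, dropped = split_late(events, watermark, timedelta(minutes=5))
```

With max_lateness="5m" and a watermark of 12:00, the cutoff is 11:55, so only the 11:50 event is discarded.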
CDC modes
The cdc parameter tells the engine how to interpret incoming rows.
append (default)
Each row is a new event, and the table is insert-only. Use this for event logs, transaction records, and clickstreams:
```python
cdc="append"
```
upsert
Each row is an upsert keyed on the dataset's key field. The engine interprets rows as the latest state for that key. Use this for tables where rows are updated in place (e.g., a user profile table):
```python
cdc="upsert"
```
debezium
Rows arrive as full Debezium CDC envelopes with before, after, and op fields. Use this when reading from a Debezium Kafka connector upstream of Thyme:
```python
cdc="debezium"
```
In this mode the engine applies INSERT, UPDATE, and DELETE operations based on each envelope's op field.
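The three modes can be contrasted with a small sketch that applies incoming rows to an in-memory view. Everything here is illustrative rather than Thyme's implementation: apply_row, the state dict, and the event log list are hypothetical stand-ins, the key field defaults to user_id to match the Transaction example, and Debezium rows are assumed to be already-unwrapped envelopes. Debezium's op codes are c (create), u (update), d (delete), and r (snapshot read); the sketch treats r like an insert.

```python
def apply_row(state: dict, log: list, row: dict, mode: str, key: str = "user_id") -> None:
    """Apply one incoming row to an event log (append) or a key -> state map."""
    if mode == "append":
        # Insert-only: every row is a new, independent event.
        log.append(row)
    elif mode == "upsert":
        # The row is the latest state for its key and replaces any earlier row.
        state[row[key]] = row
    elif mode == "debezium":
        op = row["op"]
        if op in ("c", "r", "u"):
            # Insert, snapshot read, or update: the `after` image is the new state.
            after = row["after"]
            state[after[key]] = after
        elif op == "d":
            # Delete: `after` is null, so the key comes from the `before` image.
            state.pop(row["before"][key], None)
        else:
            raise ValueError(f"unsupported Debezium op {op!r}")
    else:
        raise ValueError(f"unknown cdc mode {mode!r}")


# append: two rows for the same user stay two events.
events: list = []
apply_row({}, events, {"user_id": "u1", "amount": 10.0}, "append")
apply_row({}, events, {"user_id": "u1", "amount": 12.0}, "append")

# upsert: the second row for u1 replaces the first.
profiles: dict = {}
apply_row(profiles, [], {"user_id": "u1", "tier": "free"}, "upsert")
apply_row(profiles, [], {"user_id": "u1", "tier": "pro"}, "upsert")

# debezium: create, update, then delete u2.
cdc_state: dict = {}
apply_row(cdc_state, [], {"op": "c", "before": None,
                          "after": {"user_id": "u2", "tier": "free"}}, "debezium")
apply_row(cdc_state, [], {"op": "u", "before": {"user_id": "u2", "tier": "free"},
                          "after": {"user_id": "u2", "tier": "pro"}}, "debezium")
after_update = dict(cdc_state)
apply_row(cdc_state, [], {"op": "d", "before": {"user_id": "u2", "tier": "pro"},
                          "after": None}, "debezium")
```

The contrast is the point: append keeps history, upsert keeps only the latest row per key, and debezium additionally lets a delete remove a key.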
Connectors
Connectors fall into two families: streaming sources consume continuously from a message bus, and polling sources read incrementally from a table using a cursor field.
Streaming
| Connector | Use for |
|---|---|
| KafkaSource | Topics on Apache Kafka |
| KinesisSource | AWS Kinesis streams |
Streaming sources ignore every: they consume continuously, as fast as the engine can read from the topic or stream.
Polling
| Connector | Use for |
|---|---|
| IcebergSource | Apache Iceberg tables |
| PostgresSource | Postgres tables (incremental via cursor) |
| S3JsonSource | JSON / JSONL files on S3 |
| BigQuerySource | BigQuery tables |
| SnowflakeSource | Snowflake tables |
On each poll, a polling source reads rows where cursor > last_seen, publishes them to the dataset's Kafka topic, and advances the cursor.
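The read, publish, advance order can be sketched as a single poll cycle. This is a hypothetical illustration, not Thyme's implementation: a Python list stands in for the source table and a callback stands in for the Kafka producer. Advancing the cursor only after the whole batch is published means a publish failure leaves the cursor untouched, so the batch is re-read on the next poll (at-least-once delivery in this sketch; this page does not state Thyme's delivery guarantee).

```python
from typing import Any, Callable, Dict, List, Optional

Row = Dict[str, Any]


class PollingReader:
    """Hypothetical poll cycle: read rows past the cursor, publish them, advance."""

    def __init__(self, table: List[Row], cursor_field: str,
                 publish: Callable[[Row], None]):
        self.table = table          # stand-in for the source table
        self.cursor_field = cursor_field
        self.publish = publish      # stand-in for sending to the dataset's Kafka topic
        self.last_seen: Optional[Any] = None

    def poll(self) -> int:
        # 1. Read: rows whose cursor value is strictly greater than the last one seen.
        new_rows = sorted(
            (r for r in self.table
             if self.last_seen is None or r[self.cursor_field] > self.last_seen),
            key=lambda r: r[self.cursor_field],
        )
        # 2. Publish in cursor order. An exception here propagates before step 3.
        for row in new_rows:
            self.publish(row)
        # 3. Advance the cursor to the largest value in the batch.
        if new_rows:
            self.last_seen = new_rows[-1][self.cursor_field]
        return len(new_rows)


topic: List[Row] = []
table: List[Row] = [{"id": 1, "ts": 100}, {"id": 2, "ts": 200}]
reader = PollingReader(table, "ts", topic.append)

first = reader.poll()   # publishes ids 1 and 2
table.append({"id": 3, "ts": 300})
second = reader.poll()  # publishes only id 3
third = reader.poll()   # nothing new
```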
Example: Iceberg
```python
from thyme.connectors import IcebergSource

IcebergSource(
    catalog="prod",
    database="events",
    table="transactions",
)
```

| Parameter | Type | Description |
|---|---|---|
| catalog | str | Iceberg catalog name |
| database | str | Database or namespace |
| table | str | Table name |
See Sources & Integrations for per-connector parameter references.