Sources & Integrations
KafkaSource
Streaming connector for Apache Kafka topics: continuous consumption with SASL/SSL authentication and Debezium CDC support.
KafkaSource consumes an Apache Kafka topic continuously. Because it is a streaming connector, it does not poll and does not accept the cursor or every parameters.
Use case
- High-volume event streams (clickstreams, IoT telemetry, application events)
- CDC pipelines fed by Debezium connectors (use cdc="debezium")
- Any system already producing to Kafka with sub-second freshness requirements
Example
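```python
from datetime import datetime

from thyme import Secret
from thyme.connectors import KafkaSource, source

# dataset, Field and field are also part of thyme; import them from wherever
# your thyme version exposes them (the import path is not shown here).

@source(
    # brokers, security_protocol and the other SASL settings fall back to the
    # THYME_KAFKA_* environment variables when not passed explicitly.
    KafkaSource(topic="orders", sasl_password=Secret(env="KAFKA_PW")),
    max_lateness="5m",
    cdc="append",
)
@dataset(index=True)
class Order:
    user_id: Field[str] = field(key=True)
    amount: Field[float] = field()
    timestamp: Field[datetime] = field(timestamp=True)
```
Parameters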
| Parameter | Required | Env var (default) | Description |
|---|---|---|---|
| topic | Yes | - | Topic name |
| brokers | No | THYME_KAFKA_BROKERS | Comma-separated broker addresses |
| security_protocol | No | THYME_KAFKA_SECURITY_PROTOCOL ("PLAINTEXT") | PLAINTEXT, SSL, SASL_PLAINTEXT or SASL_SSL |
| sasl_mechanism | No | THYME_KAFKA_SASL_MECHANISM | SASL mechanism (PLAIN, SCRAM-SHA-256, etc.) |
| sasl_username | No | THYME_KAFKA_SASL_USERNAME | SASL username |
| sasl_password | No (Secret-capable) | THYME_KAFKA_SASL_PASSWORD | SASL password |
| format | No | THYME_KAFKA_FORMAT ("json") | Message format: json, avro or protobuf |
| group_id | No | THYME_KAFKA_GROUP_ID | Consumer group ID (auto-generated if unset) |
| schema_registry_url | No | THYME_KAFKA_SCHEMA_REGISTRY_URL | Schema registry URL (required for Avro / Protobuf) |
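The env var column implies a fallback chain for each parameter. The sketch below shows one way that resolution could work. It assumes an explicit argument wins over its environment variable, which in turn wins over the built-in default. `resolve_kafka_settings`, `ENV_FALLBACKS` and the generated group ID format are hypothetical names chosen for illustration; this is not thyme's implementation. The allowed values and the schema-registry requirement are taken from the table.

```python
import os
import uuid
from typing import Mapping, Optional

# Parameter -> (environment variable, built-in default), copied from the
# Parameters table. Hypothetical resolver, for illustration only.
ENV_FALLBACKS = {
    "brokers": ("THYME_KAFKA_BROKERS", None),
    "security_protocol": ("THYME_KAFKA_SECURITY_PROTOCOL", "PLAINTEXT"),
    "sasl_mechanism": ("THYME_KAFKA_SASL_MECHANISM", None),
    "sasl_username": ("THYME_KAFKA_SASL_USERNAME", None),
    "sasl_password": ("THYME_KAFKA_SASL_PASSWORD", None),
    "format": ("THYME_KAFKA_FORMAT", "json"),
    "group_id": ("THYME_KAFKA_GROUP_ID", None),
    "schema_registry_url": ("THYME_KAFKA_SCHEMA_REGISTRY_URL", None),
}
PROTOCOLS = {"PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"}
FORMATS = {"json", "avro", "protobuf"}


def resolve_kafka_settings(
    explicit: Mapping[str, Optional[str]],
    env: Mapping[str, str] = os.environ,
) -> dict:
    """Assumed precedence: explicit argument, then env var, then default."""
    if not explicit.get("topic"):
        raise ValueError("topic is required")
    settings = {"topic": explicit["topic"]}
    for name, (env_var, default) in ENV_FALLBACKS.items():
        value = explicit.get(name)
        if value is None:
            # Unset or empty env vars fall through to the default.
            value = env.get(env_var) or default
        settings[name] = value

    # brokers is a comma-separated string; split it into a clean list.
    raw = settings["brokers"] or ""
    settings["brokers"] = [b.strip() for b in raw.split(",") if b.strip()]

    if settings["security_protocol"] not in PROTOCOLS:
        raise ValueError(f"unknown security_protocol: {settings['security_protocol']}")
    if settings["format"] not in FORMATS:
        raise ValueError(f"unknown format: {settings['format']}")
    # The table marks schema_registry_url as required for Avro / Protobuf.
    if settings["format"] in ("avro", "protobuf") and not settings["schema_registry_url"]:
        raise ValueError(f"format={settings['format']!r} needs schema_registry_url")

    if not settings["group_id"]:
        # Hypothetical naming scheme; the real auto-generated ID is not documented.
        settings["group_id"] = f"thyme-{uuid.uuid4().hex[:8]}"
    return settings


settings = resolve_kafka_settings(
    {"topic": "orders"},
    env={"THYME_KAFKA_BROKERS": "kafka-1:9092, kafka-2:9092"},
)
print(settings["brokers"])            # ['kafka-1:9092', 'kafka-2:9092']
print(settings["security_protocol"])  # PLAINTEXT
```

Passing `env` explicitly keeps the example deterministic. In a real process, the resolver would read `os.environ`.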
Authentication
Three common configurations:
Local / dev (no auth):
```python
# brokers come from THYME_KAFKA_BROKERS; security_protocol defaults to PLAINTEXT
KafkaSource(topic="orders")
```
SASL_SSL with SCRAM (managed Kafka):
```python
KafkaSource(
    topic="orders",
    security_protocol="SASL_SSL",
    sasl_mechanism="SCRAM-SHA-256",
    sasl_username="thyme-app",
    sasl_password=Secret(env="KAFKA_PW"),
)
```
Schema-registry-backed Avro:
```python
KafkaSource(
    topic="orders",
    format="avro",
    # required whenever format is avro or protobuf
    schema_registry_url="https://schema-registry.internal:8081",
)
```
CDC modes
| Mode | Behaviour |
|---|---|
| cdc="append" | Each message is a new event. |
| cdc="upsert" | Each message replaces the prior state for its dataset key. |
| cdc="debezium" | Messages are Debezium envelopes (before, after, op). Honors INSERT, UPDATE and DELETE. |
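The sketch below makes the three modes concrete by replaying a list of messages into an in-memory view. It is not thyme's implementation. `apply_cdc` is a hypothetical helper, and messages are plain dicts. Debezium envelopes are assumed to arrive already unwrapped, with `before`, `after` and `op` fields using Debezium's op codes (`c` create, `u` update, `d` delete, `r` snapshot read).

```python
from typing import Any, Dict, List, Optional, Tuple

Row = Dict[str, Any]


def apply_cdc(
    messages: List[Optional[Row]], mode: str, key: str
) -> Tuple[List[Row], Dict[Any, Row]]:
    """Replay messages under one CDC mode (hypothetical illustration).

    Returns (log, state): log collects every event in append mode;
    state holds the latest row per key in upsert and debezium modes.
    """
    log: List[Row] = []
    state: Dict[Any, Row] = {}
    for msg in messages:
        if msg is None:
            # Null-value messages (e.g. the tombstone Debezium emits after a
            # delete by default) carry no row; this sketch skips them.
            continue
        if mode == "append":
            log.append(msg)
        elif mode == "upsert":
            state[msg[key]] = msg
        elif mode == "debezium":
            op = msg["op"]
            if op in ("c", "r", "u"):  # create, snapshot read, update
                row = msg["after"]
                state[row[key]] = row
            elif op == "d":  # delete: only "before" is populated
                state.pop(msg["before"][key], None)
            else:
                raise ValueError(f"unsupported Debezium op: {op!r}")
        else:
            raise ValueError(f"unknown cdc mode: {mode!r}")
    return log, state


events = [
    {"op": "c", "before": None, "after": {"user_id": "u1", "amount": 10.0}},
    {"op": "u", "before": {"user_id": "u1", "amount": 10.0},
     "after": {"user_id": "u1", "amount": 12.5}},
    {"op": "c", "before": None, "after": {"user_id": "u2", "amount": 3.0}},
    {"op": "d", "before": {"user_id": "u2", "amount": 3.0}, "after": None},
    None,  # tombstone following the delete
]
_, state = apply_cdc(events, "debezium", key="user_id")
print(state)  # {'u1': {'user_id': 'u1', 'amount': 12.5}}
```

Under append, the same rows would all be kept as separate events. Under upsert and debezium, only the latest row per key survives, and a Debezium delete removes the key entirely.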
Setting cursor or every on KafkaSource raises a validation error: streaming connectors consume continuously and do not poll.
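The rule above can be pictured as a check that runs when the source is configured. This is a hypothetical sketch: `validate_source_options` and `STREAMING_CONNECTORS` are illustrative names, and `ValueError` stands in for whatever validation error thyme actually raises.

```python
# Hypothetical registry of connectors that consume continuously.
STREAMING_CONNECTORS = {"KafkaSource"}
# Options that only make sense for connectors that poll on a schedule.
POLLING_ONLY_OPTIONS = ("cursor", "every")


def validate_source_options(connector: str, **options):
    """Reject (rather than silently ignore) polling options on streaming connectors."""
    if connector in STREAMING_CONNECTORS:
        bad = [opt for opt in POLLING_ONLY_OPTIONS if options.get(opt) is not None]
        if bad:
            raise ValueError(
                f"{connector} is a streaming connector and does not accept: {', '.join(bad)}"
            )
    return options


validate_source_options("KafkaSource", topic="orders")  # accepted
try:
    validate_source_options("KafkaSource", topic="orders", every="1m")
except ValueError as err:
    print(err)  # KafkaSource is a streaming connector and does not accept: every
```

Failing loudly makes a misconfiguration visible immediately, whereas silently ignoring cursor or every would hide it.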