
KafkaSource

Streaming connector for Apache Kafka topics: continuous consumption, SASL/SSL authentication, and Debezium CDC support.

KafkaSource consumes from an Apache Kafka topic continuously. Because it is a streaming connector, it does not support the polling parameters cursor and every.

Use case

  • High-volume event streams (clickstreams, IoT telemetry, application events)
  • CDC pipelines fed by Debezium connectors (use cdc="debezium")
  • Any system that already produces to Kafka and needs sub-second freshness

Example

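from datetime import datetime

from thyme import Secret
from thyme import Field, dataset, field  # import path assumed; adjust to your Thyme version
from thyme.connectors import KafkaSource, source

@source(
    # Settings not passed here (brokers, security_protocol, ...) fall back to THYME_KAFKA_* env vars.
    KafkaSource(topic="orders", sasl_password=Secret(env="KAFKA_PW")),
    max_lateness="5m",
    cdc="append",  # each Kafka message is a new Order event
)
@dataset(index=True)
class Order:
    user_id:   Field[str]      = field(key=True)
    amount:    Field[float]    = field()
    timestamp: Field[datetime] = field(timestamp=True)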

Parameters

| Parameter           | Required            | Env var (default)                           | Description                                         |
|---------------------|---------------------|---------------------------------------------|-----------------------------------------------------|
| topic               | Yes                 | -                                           | Topic name                                          |
| brokers             | No                  | THYME_KAFKA_BROKERS                         | Comma-separated broker addresses                    |
| security_protocol   | No                  | THYME_KAFKA_SECURITY_PROTOCOL ("PLAINTEXT") | One of PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL     |
| sasl_mechanism      | No                  | THYME_KAFKA_SASL_MECHANISM                  | SASL mechanism (PLAIN, SCRAM-SHA-256, etc.)         |
| sasl_username       | No                  | THYME_KAFKA_SASL_USERNAME                   | SASL username                                       |
| sasl_password       | No (accepts Secret) | THYME_KAFKA_SASL_PASSWORD                   | SASL password                                       |
| format              | No                  | THYME_KAFKA_FORMAT ("json")                 | Message format: json, avro, or protobuf             |
| group_id            | No                  | THYME_KAFKA_GROUP_ID                        | Consumer group ID (auto-generated if empty)         |
| schema_registry_url | No                  | THYME_KAFKA_SCHEMA_REGISTRY_URL             | Schema registry URL (required for avro and protobuf) |
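The parameter table pairs most settings with an environment variable, and two of them also have a built-in default. The sketch below shows one way that lookup could be resolved. It assumes a specific precedence that this page does not spell out: an explicit argument wins over its environment variable, which in turn wins over the default. `resolve_kafka_params` and `ENV_FALLBACKS` are hypothetical names used for illustration and are not part of Thyme.

```python
import os
from typing import Mapping, Optional

# Hypothetical: (env var, built-in default) per KafkaSource parameter, copied from the table.
ENV_FALLBACKS = {
    "brokers": ("THYME_KAFKA_BROKERS", None),
    "security_protocol": ("THYME_KAFKA_SECURITY_PROTOCOL", "PLAINTEXT"),
    "sasl_mechanism": ("THYME_KAFKA_SASL_MECHANISM", None),
    "sasl_username": ("THYME_KAFKA_SASL_USERNAME", None),
    "sasl_password": ("THYME_KAFKA_SASL_PASSWORD", None),
    "format": ("THYME_KAFKA_FORMAT", "json"),
    "group_id": ("THYME_KAFKA_GROUP_ID", None),
    "schema_registry_url": ("THYME_KAFKA_SCHEMA_REGISTRY_URL", None),
}


def resolve_kafka_params(explicit: Mapping[str, Optional[str]],
                         env: Mapping[str, str] = os.environ) -> dict:
    """Resolve each setting with an ASSUMED order: explicit arg, then env var, then default."""
    resolved = {}
    for name, (env_var, default) in ENV_FALLBACKS.items():
        value = explicit.get(name)
        if value is None:
            value = env.get(env_var, default)
        resolved[name] = value
    # brokers is documented as comma-separated; split it into a list of host:port strings.
    if resolved["brokers"]:
        resolved["brokers"] = [b.strip() for b in resolved["brokers"].split(",") if b.strip()]
    # The table marks schema_registry_url as required for avro and protobuf.
    if resolved["format"] in ("avro", "protobuf") and not resolved["schema_registry_url"]:
        raise ValueError(f"format={resolved['format']!r} requires schema_registry_url")
    return resolved
```

For example, `resolve_kafka_params({}, env={"THYME_KAFKA_BROKERS": "b1:9092, b2:9092"})` yields `["b1:9092", "b2:9092"]` for brokers, along with the documented defaults `"PLAINTEXT"` and `"json"`.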

Authentication

Three common configurations follow. Any parameter you omit falls back to its THYME_KAFKA_* environment variable.

Local / dev (no auth):

KafkaSource(topic="orders")

SASL_SSL with SCRAM (managed Kafka):

KafkaSource(
    topic="orders",
    security_protocol="SASL_SSL",
    sasl_mechanism="SCRAM-SHA-256",
    sasl_username="thyme-app",
    sasl_password=Secret(env="KAFKA_PW"),
)
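When debugging a SASL_SSL connection outside Thyme, it can help to express the same settings with a plain Kafka client. This page does not say whether Thyme uses librdkafka internally. The sketch below only translates KafkaSource-style arguments into standard librdkafka property names (`bootstrap.servers`, `security.protocol`, `sasl.mechanism`, `sasl.username`, `sasl.password`, `group.id`). `to_librdkafka_config` is a hypothetical helper, and it covers only username/password mechanisms such as PLAIN and SCRAM-SHA-256.

```python
from typing import Optional


def to_librdkafka_config(
    brokers: str,
    security_protocol: str = "PLAINTEXT",
    sasl_mechanism: Optional[str] = None,
    sasl_username: Optional[str] = None,
    sasl_password: Optional[str] = None,
    group_id: Optional[str] = None,
) -> dict:
    """Map KafkaSource-style settings onto librdkafka configuration properties."""
    config = {
        "bootstrap.servers": brokers,  # comma-separated host:port list
        "security.protocol": security_protocol,
    }
    if security_protocol.startswith("SASL_"):
        # This sketch only handles username/password mechanisms (PLAIN, SCRAM-*).
        if not (sasl_mechanism and sasl_username and sasl_password):
            raise ValueError(
                f"{security_protocol} needs sasl_mechanism, sasl_username and sasl_password"
            )
        config.update({
            "sasl.mechanism": sasl_mechanism,
            "sasl.username": sasl_username,
            "sasl.password": sasl_password,
        })
    if group_id:
        config["group.id"] = group_id
    return config
```

With the confluent-kafka package installed, the resulting dict can be passed to `confluent_kafka.Consumer`, which also requires `group.id` to be set.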

Schema-registry-backed Avro:

KafkaSource(
    topic="orders",
    format="avro",
    schema_registry_url="https://schema-registry.internal:8081",
)
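Messages written by Confluent's Schema Registry serializers usually start with a five-byte header: a zero "magic" byte, then the schema ID as a 4-byte big-endian integer. The consumer uses that ID to look up the writer schema in the registry. This page does not say which framing Thyme expects, so the sketch below assumes Confluent framing. It shows how a raw message splits into schema ID and payload. `split_confluent_frame` is a hypothetical name. For Protobuf, Confluent's format also inserts message indexes after the schema ID, and this sketch does not parse them.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0  # first byte of every Confluent-framed message


def split_confluent_frame(message: bytes) -> Tuple[int, bytes]:
    """Return (schema_id, payload) for a message in Confluent wire format."""
    if len(message) < 5:
        raise ValueError("message shorter than the 5-byte Confluent header")
    # ">BI": big-endian, 1-byte unsigned magic + 4-byte unsigned schema ID.
    magic, schema_id = struct.unpack(">BI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte {magic}")
    return schema_id, message[5:]
```

For instance, `split_confluent_frame(b"\x00\x00\x00\x00\x2a" + b"avro-bytes")` returns `(42, b"avro-bytes")`. Schema 42 would then be fetched from the registry, for example via its `GET /schemas/ids/42` endpoint.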

CDC modes

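| Mode           | Behaviour                                                                      |
|----------------|--------------------------------------------------------------------------------|
| cdc="append"   | Each message is a new event.                                                   |
| cdc="upsert"   | Each message replaces prior state for its dataset key.                         |
| cdc="debezium" | Messages are Debezium envelopes (before, after, op). Honors INSERT, UPDATE, and DELETE. |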
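To make the three modes concrete, the conceptual sketch below applies a list of messages to in-memory keyed state. It is not Thyme's implementation. `apply_messages` is a hypothetical helper. The sketch assumes Debezium envelopes arrive already unwrapped, without a `schema`/`payload` wrapper, and that delete tombstones (null-value messages) have been filtered out. The op codes are Debezium's own: `c` create, `u` update, `d` delete, `r` snapshot read.

```python
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]


def apply_messages(mode: str, key_field: str,
                   messages: List[Row]) -> Tuple[List[Row], Dict[Any, Row]]:
    """Return (event_log, state_by_key) after applying messages in arrival order."""
    events: List[Row] = []
    state: Dict[Any, Row] = {}
    for msg in messages:
        if mode == "append":
            events.append(msg)  # every message is an independent event
        elif mode == "upsert":
            state[msg[key_field]] = msg  # the latest message for a key replaces the previous one
        elif mode == "debezium":
            op = msg["op"]
            if op in ("c", "r", "u"):  # create, snapshot read, update: keep the "after" image
                row = msg["after"]
                state[row[key_field]] = row
            elif op == "d":  # delete: only "before" is populated
                state.pop(msg["before"][key_field], None)
            else:
                raise ValueError(f"unknown Debezium op {op!r}")
        else:
            raise ValueError(f"unknown cdc mode {mode!r}")
    return events, state
```

Feeding the same two messages for `user_id="u1"` gives different results per mode. In append mode both are kept as events. In upsert mode only the second survives as that key's state. In Debezium mode, a later `op="d"` envelope removes the key entirely.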

Setting cursor or every on KafkaSource raises a validation error: streaming connectors consume continuously and do not poll.
