
KinesisSource

Streaming connector for AWS Kinesis Data Streams - continuous shard consumption with cross-account IAM support.

KinesisSource continuously consumes records from an AWS Kinesis Data Stream. Each shard maps to one Thyme worker, so ingestion throughput scales with the stream's shard count.

Use case

  • AWS-native streaming workloads where Kinesis is already in use
  • Clickstreams and telemetry produced by AWS services (CloudFront real-time logs, IoT Core rules, etc.)
  • Cross-account ingestion where the source stream lives in a different AWS account

Example

import os
from datetime import datetime

from thyme import Secret, dataset, field, Field  # assumed: dataset helpers are top-level exports
from thyme.connectors import KinesisSource, source

@source(
    KinesisSource(
        stream_arn=os.environ["MY_KINESIS_STREAM_ARN"],
        init_position="trim_horizon",                # start from the oldest retained record
        role_arn=Secret(env="MY_KINESIS_ROLE_ARN"),  # role ARN resolved from an env var
    ),
    max_lateness="5m",
)
@dataset(index=True)
class ClickEvent:
    user_id:   Field[str]      = field(key=True)
    page:      Field[str]      = field()
    timestamp: Field[datetime] = field(timestamp=True)

Parameters

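| Parameter     | Required              | Default / env var                          | Description                                         |
|---------------|-----------------------|--------------------------------------------|-----------------------------------------------------|
| stream_arn    | Yes                   | -                                          | Kinesis stream ARN                                  |
| init_position | No                    | "latest" (per-stream, not env-defaulted)   | "latest", "trim_horizon", or an ISO-8601 timestamp  |
| format        | No                    | "json" (per-stream, not env-defaulted)     | Message format: json                                |
| role_arn      | No (Secret-capable)   | THYME_KINESIS_ROLE_ARN                     | IAM role ARN for cross-account access               |
| region        | No                    | THYME_KINESIS_REGION ("us-east-1")         | AWS region                                          |
| endpoint_url  | No                    | THYME_KINESIS_ENDPOINT_URL                 | LocalStack / custom endpoint override               |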
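The table separates per-stream settings (init_position, format), which come only from the constructor, from settings that fall back to THYME_KINESIS_* environment variables. The sketch below models that resolution in plain Python. It is a hypothetical helper, not Thyme's implementation: the names check_init_position and resolve_kinesis_options are illustrative, role_arn is treated as a plain string rather than a Secret, and the precedence (explicit argument first, then the environment variable, then the built-in default) is an assumption.

```python
import os
from datetime import datetime
from typing import Mapping, Optional

# Hypothetical sketch of the parameter table above; not Thyme's actual code.
NAMED_POSITIONS = {"latest", "trim_horizon"}


def check_init_position(value: str) -> str:
    """Accept "latest", "trim_horizon", or an ISO-8601 timestamp string."""
    if value in NAMED_POSITIONS:
        return value
    try:
        # Before Python 3.11, fromisoformat accepts only a narrow ISO-8601 subset
        # (for example, no trailing "Z").
        datetime.fromisoformat(value)
    except ValueError:
        raise ValueError(
            "init_position must be 'latest', 'trim_horizon', "
            f"or an ISO-8601 timestamp; got {value!r}"
        ) from None
    return value


def resolve_kinesis_options(
    stream_arn: str,
    init_position: str = "latest",
    format: str = "json",
    role_arn: Optional[str] = None,
    region: Optional[str] = None,
    endpoint_url: Optional[str] = None,
    env: Optional[Mapping[str, str]] = None,
) -> dict:
    env = os.environ if env is None else env
    return {
        "stream_arn": stream_arn,
        # Per-stream settings: only the argument (or its default) counts,
        # never the environment.
        "init_position": check_init_position(init_position),
        "format": format,
        # Env-defaulted settings (assumed precedence): explicit argument,
        # then THYME_KINESIS_* variable, then the built-in default.
        "role_arn": role_arn or env.get("THYME_KINESIS_ROLE_ARN"),
        "region": region or env.get("THYME_KINESIS_REGION", "us-east-1"),
        "endpoint_url": endpoint_url or env.get("THYME_KINESIS_ENDPOINT_URL"),
    }
```

For local testing against LocalStack, for example, setting THYME_KINESIS_ENDPOINT_URL=http://localhost:4566 (LocalStack's default edge port) would populate endpoint_url without touching the dataset definition.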

Authentication

By default, the connector authenticates with the engine pod's own IAM identity (for example IRSA or an EC2 instance role). For cross-account ingestion, supply a role_arn for the connector to assume:

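from thyme import Secret
from thyme.connectors import KinesisSource

KinesisSource(
    stream_arn="arn:aws:kinesis:us-east-1:111122223333:stream/click-events",
    role_arn=Secret(env="CROSS_ACCOUNT_ROLE"),  # role to assume, typically in the stream owner's account
)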
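Whether role_arn is needed comes down to which account owns the stream, and that account ID is embedded in the stream ARN. ARNs follow the documented arn:partition:service:region:account-id:resource layout, so a few lines of Python can extract it. The helpers parse_arn and is_cross_account below are hypothetical and not part of Thyme; the caller's own account ID is taken as an input (in practice it could come from STS GetCallerIdentity, e.g. boto3's sts.get_caller_identity()["Account"]).

```python
from typing import NamedTuple


class Arn(NamedTuple):
    partition: str
    service: str
    region: str   # empty for global services such as IAM
    account: str
    resource: str


def parse_arn(arn: str) -> Arn:
    # Layout: arn:partition:service:region:account-id:resource.
    # maxsplit=5 keeps any ':' inside the resource part intact.
    parts = arn.split(":", 5)
    if len(parts) != 6 or parts[0] != "arn":
        raise ValueError(f"not an ARN: {arn!r}")
    return Arn(*parts[1:])


def is_cross_account(stream_arn: str, caller_account: str) -> bool:
    """True when the stream lives in an account other than the caller's."""
    return parse_arn(stream_arn).account != caller_account
```

Because the stream ARN also carries the stream's region, comparing parse_arn(stream_arn).region with the connector's region setting (which defaults to us-east-1) may be a cheap sanity check.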

Throughput

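Each Kinesis shard accepts up to 1 MB/s or 1,000 records/s of writes and serves up to 2 MB/s of reads, shared across all standard (non-enhanced-fan-out) consumers of the stream. If your stream is shard-limited, increase the shard count on the source side; Thyme will scale workers to match.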
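As a rough sizing aid, the per-shard quotas give a lower bound on the shard count, and since each shard maps to one Thyme worker, on the worker count too. The sketch below is a back-of-the-envelope calculation for provisioned-mode streams, not a Thyme API: min_shards is a hypothetical helper, it assumes every standard consumer reads the full stream, and it ignores the per-shard limit of 5 GetRecords calls per second, enhanced fan-out, and on-demand mode. Whether Thyme itself reads as a standard or enhanced fan-out consumer is not stated here.

```python
import math

# Published per-shard quotas for Kinesis Data Streams (provisioned mode).
WRITE_MB_PER_SHARD = 1.0
WRITE_RECORDS_PER_SHARD = 1_000
READ_MB_PER_SHARD = 2.0  # shared by all standard (non-enhanced-fan-out) consumers


def min_shards(write_mb_s: float, write_records_s: float, standard_consumers: int = 1) -> int:
    """Rough lower bound on shards needed for a given ingest rate."""
    by_write_bytes = write_mb_s / WRITE_MB_PER_SHARD
    by_write_records = write_records_s / WRITE_RECORDS_PER_SHARD
    # Each standard consumer reads everything written, and they share the read quota.
    by_read = write_mb_s * standard_consumers / READ_MB_PER_SHARD
    return max(1, math.ceil(max(by_write_bytes, by_write_records, by_read)))
```

For example, 3 MB/s at 2,500 records/s needs 3 shards with a single consumer (the write-bytes quota dominates), but 5 shards if three standard consumers share the stream, because the read quota then requires 4.5 shards' worth of capacity.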

Limits

  • Setting cursor or every on KinesisSource raises a validation error: streaming connectors consume continuously and don't poll.

  • Backfilling a Kinesis-sourced dataset on commit is not currently supported. Rely on the live ingestion path for Kinesis sources.
