KinesisSource
Streaming connector for AWS Kinesis Data Streams - continuous shard consumption with cross-account IAM support.
KinesisSource consumes from an AWS Kinesis Data Stream continuously. Each shard maps to a Thyme worker, and Kinesis throughput scales with shard count.
Use case
- AWS-native streaming workloads where Kinesis is already in use
- Clickstreams and telemetry produced by AWS services (CloudFront logs, IoT Core, etc.)
- Cross-account ingestion where the source stream lives in a different AWS account
Example
import os
from datetime import datetime

from thyme import Secret
from thyme.connectors import KinesisSource, source
# dataset, field, and Field are also Thyme imports; their import line is not shown here.

@source(
    KinesisSource(
        stream_arn=os.environ["MY_KINESIS_STREAM_ARN"],
        init_position="trim_horizon",  # start from the oldest retained record
        role_arn=Secret(env="MY_KINESIS_ROLE_ARN"),
    ),
    max_lateness="5m",
)
@dataset(index=True)
class ClickEvent:
    user_id: Field[str] = field(key=True)
    page: Field[str] = field()
    timestamp: Field[datetime] = field(timestamp=True)
Parameters
| Parameter | Required | Default / env var | Description |
|---|---|---|---|
| stream_arn | Yes | - | Kinesis stream ARN |
| init_position | No | "latest" (per-stream, not env-defaulted) | "latest", "trim_horizon", or ISO-8601 timestamp |
| format | No | "json" (per-stream, not env-defaulted) | Message format: json |
| role_arn | No (Secret-capable) | THYME_KINESIS_ROLE_ARN | IAM role ARN for cross-account access |
| region | No | THYME_KINESIS_REGION ("us-east-1") | AWS region |
| endpoint_url | No | THYME_KINESIS_ENDPOINT_URL | LocalStack / custom endpoint override |
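Because init_position accepts two keywords or an ISO-8601 timestamp, it can help to check the value before it reaches the connector. The following is a minimal sketch of such a pre-check, assuming only the three accepted forms listed in the table; normalize_init_position is a hypothetical helper, not part of Thyme, and Thyme's own validation may differ.

```python
from datetime import datetime

# The two keyword values listed in the parameters table.
KEYWORDS = ("latest", "trim_horizon")


def normalize_init_position(value: str) -> str:
    """Hypothetical helper: validate an init_position string before use."""
    text = value.strip()
    if text in KEYWORDS:
        return text
    # Rewrite a trailing "Z" as an explicit UTC offset, because
    # datetime.fromisoformat rejects "Z" on older Python versions.
    if text.endswith("Z"):
        text = text[:-1] + "+00:00"
    try:
        parsed = datetime.fromisoformat(text)
    except ValueError:
        raise ValueError(
            "init_position must be 'latest', 'trim_horizon', "
            f"or an ISO-8601 timestamp, got {value!r}"
        ) from None
    return parsed.isoformat()


print(normalize_init_position("trim_horizon"))
print(normalize_init_position("2024-05-01T12:00:00Z"))
```

The returned string can then be passed as init_position; anything that is neither a keyword nor a parseable timestamp raises ValueError early, before deployment.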
Authentication
By default, the connector uses the engine pod's IAM identity (IRSA, instance role, etc.). For cross-account ingestion, supply a role_arn to assume:
KinesisSource(
    stream_arn="arn:aws:kinesis:us-east-1:111122223333:stream/click-events",
    role_arn=Secret(env="CROSS_ACCOUNT_ROLE"),
)
Throughput
Each Kinesis shard accepts up to 1 MB/s or 1,000 records/s on writes and serves up to 2 MB/s on reads, shared across consumers unless enhanced fan-out is enabled. If your stream is shard-limited, increase the shard count on the source side; Thyme will scale workers to match.
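To estimate how many shards a workload needs, divide the expected peak rate by each per-shard limit and take the larger result. The sketch below assumes the 1 MB/s and 1,000 records/s per-shard figures quoted above; the function name and its parameters are illustrative and not part of Thyme or the AWS SDK.

```python
import math


def shards_needed(
    peak_mb_per_s: float,
    peak_records_per_s: float,
    shard_mb_per_s: float = 1.0,          # per-shard byte limit quoted above
    shard_records_per_s: float = 1000.0,  # per-shard record limit quoted above
) -> int:
    """Return the smallest shard count that satisfies both per-shard limits."""
    if peak_mb_per_s < 0 or peak_records_per_s < 0:
        raise ValueError("rates must be non-negative")
    by_bytes = math.ceil(peak_mb_per_s / shard_mb_per_s)
    by_records = math.ceil(peak_records_per_s / shard_records_per_s)
    # A stream always has at least one shard.
    return max(1, by_bytes, by_records)


# 4.5 MB/s needs 5 shards by bytes; 12,000 records/s needs 12 by records.
print(shards_needed(peak_mb_per_s=4.5, peak_records_per_s=12_000))  # 12
```

In this example the record rate, not the byte rate, is the binding constraint, so small, frequent events can require more shards than their total volume suggests.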
Limits
- Setting cursor or every on KinesisSource raises a validation error - streaming connectors consume continuously and don't poll.
- Backfilling a Kinesis-sourced dataset on commit is not currently supported. Rely on the live ingestion path for Kinesis sources.