Sources & Integrations
PostgresSource
Polling connector for Postgres tables - incremental reads via a cursor field.
PostgresSource reads from a Postgres table incrementally, tracking a cursor field (typically a timestamp or auto-incrementing id) and fetching only new rows on each poll.
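The incremental read described above can be approximated with a small polling function. This is a minimal sketch and not thyme's implementation: it uses Python's built-in sqlite3 in place of Postgres so that it runs anywhere, and the `poll_once` helper, the `orders` table layout, and the `ts` cursor column are hypothetical names chosen for illustration.

```python
import sqlite3


def poll_once(conn, last_cursor, limit=100):
    """Fetch rows whose cursor is strictly greater than last_cursor.

    Hypothetical helper, not part of thyme. Returns (rows, new_cursor).
    """
    rows = conn.execute(
        "SELECT order_id, ts FROM orders WHERE ts > ? ORDER BY ts LIMIT ?",
        (last_cursor, limit),
    ).fetchall()
    # Advance the cursor to the largest value read; keep it if nothing was new.
    new_cursor = rows[-1][1] if rows else last_cursor
    return rows, new_cursor


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id TEXT PRIMARY KEY, ts INTEGER)")
conn.execute("CREATE INDEX orders_ts_idx ON orders (ts)")  # index the cursor column
conn.executemany("INSERT INTO orders VALUES (?, ?)", [("a", 1), ("b", 2)])

# First poll reads everything after the initial cursor position.
first_batch, cursor = poll_once(conn, last_cursor=0)

# A new row arrives; the second poll returns only that row.
conn.execute("INSERT INTO orders VALUES (?, ?)", ("c", 3))
second_batch, cursor = poll_once(conn, last_cursor=cursor)
```

Each poll only needs the last cursor value it saw, which is why the cursor column has to advance as new rows are written.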
Use case
- Application transaction logs (orders, payments, signups)
- User profile tables read in cdc="upsert" mode for SCD-style joins
- Any table with a monotonically advancing cursor field
For logical-replication CDC rather than cursor polling, run Debezium upstream and consume with KafkaSource + cdc="debezium".
Example
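from datetime import datetime

from thyme import Secret
from thyme.connectors import PostgresSource, source
# dataset, field, and Field also come from thyme; their import path is not shown in this example.

# Poll the orders table every 5 seconds, using the timestamp column as the cursor.
@source(
    PostgresSource(table="orders", password=Secret(env="PG_PASSWORD")),
    cursor="timestamp", every="5s", max_lateness="1h",
)
@dataset(index=True)
class Order:
    user_id: Field[str] = field(key=True)
    order_id: Field[str] = field()
    amount: Field[float] = field()
    timestamp: Field[datetime] = field(timestamp=True)

Parameters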
| Parameter | Required | Default / env var | Description |
|---|---|---|---|
| table | Yes | - | Table name |
| host | No | THYME_POSTGRES_HOST ("localhost") | Postgres host |
| port | No | THYME_POSTGRES_PORT (5432) | Postgres port |
| database | No | THYME_POSTGRES_DATABASE | Database name |
| user | No | THYME_POSTGRES_USER | Username |
| password | No (Secret-capable) | THYME_POSTGRES_PASSWORD | Password |
| schema | No | THYME_POSTGRES_SCHEMA ("public") | Schema name |
| sslmode | No | THYME_POSTGRES_SSLMODE ("prefer") | SSL mode |
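One way to read the "Default / env var" column is as a fallback chain. The sketch below assumes the order is explicit argument first, then the environment variable, then the built-in default. That precedence is an assumption, not something this page confirms, and `resolve_setting` is a hypothetical helper rather than a thyme function.

```python
import os


def resolve_setting(explicit, env_var, default=None):
    """Assumed precedence: explicit argument, then env var, then default."""
    if explicit is not None:
        return explicit
    return os.environ.get(env_var, default)


# Simulate an environment where only the host is configured.
os.environ["THYME_POSTGRES_HOST"] = "db.internal"
os.environ.pop("THYME_POSTGRES_SSLMODE", None)

host = resolve_setting(None, "THYME_POSTGRES_HOST", "localhost")      # from env var
sslmode = resolve_setting(None, "THYME_POSTGRES_SSLMODE", "prefer")   # built-in default
schema = resolve_setting("analytics", "THYME_POSTGRES_SCHEMA", "public")  # explicit wins
```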
CDC modes
| Mode | Behaviour |
|---|---|
| cdc="append" (default) | Each row is a new event. Use for insert-only tables (transaction logs, event streams). |
| cdc="upsert" | Each row is the latest state for its key. Use for tables that update in place (user profiles, inventory snapshots). |
cdc="debezium" is not supported on PostgresSource directly - for full INSERT/UPDATE/DELETE CDC, run a Debezium connector against your Postgres WAL and consume via Kafka.
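The difference between the two supported modes can be illustrated in plain Python. This is a conceptual sketch only: `apply_append` and `apply_upsert` are hypothetical helpers, and thyme's internal handling of polled rows may differ.

```python
# Rows as they might arrive from successive polls of a user profile table.
rows = [
    {"user_id": "u1", "plan": "free"},
    {"user_id": "u2", "plan": "free"},
    {"user_id": "u1", "plan": "pro"},  # later row for the same key
]


def apply_append(rows):
    """Append semantics: every polled row is kept as its own event."""
    return list(rows)


def apply_upsert(rows, key):
    """Upsert semantics: a later row replaces the earlier state for its key."""
    state = {}
    for row in rows:
        state[row[key]] = row
    return state


events = apply_append(rows)                  # three separate events
latest = apply_upsert(rows, key="user_id")   # one current row per user
```

With append, the two `u1` rows are independent events. With upsert, only the most recent `u1` row remains as that key's state.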
Limits
- The cursor field must be monotonically non-decreasing within a (cursor, primary key) ordering. Out-of-order writes within the cursor's resolution may be missed.
- Each poll runs a SELECT … WHERE cursor > $1 ORDER BY cursor LIMIT N; ensure cursor is indexed.
- Set every to balance freshness vs. database load. 5s is fine for hot tables; 1m is gentler for shared production databases.
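The first limit is easiest to see with a small simulation. The sketch below mimics the strict `cursor > $1` comparison over an in-memory list. The `poll` function and the row layout are hypothetical and only model the query shape quoted above, not thyme's actual reader.

```python
def poll(table, last_cursor, limit):
    """Mimic: WHERE cursor > last_cursor ORDER BY cursor LIMIT limit."""
    matching = sorted(
        (row for row in table if row["cursor"] > last_cursor),
        key=lambda row: row["cursor"],
    )
    batch = matching[:limit]
    new_cursor = batch[-1]["cursor"] if batch else last_cursor
    return batch, new_cursor


table = [
    {"id": "a", "cursor": 10},
    {"id": "b", "cursor": 20},
]
seen = []

batch, last = poll(table, last_cursor=0, limit=10)
seen += [row["id"] for row in batch]

# A late writer commits a row with the same cursor value the reader already
# passed (for example, the same timestamp at the column's resolution).
table.append({"id": "late", "cursor": 20})
table.append({"id": "c", "cursor": 30})

batch, last = poll(table, last_cursor=last, limit=10)
seen += [row["id"] for row in batch]
# "late" is never returned, because 20 > 20 is false on every later poll.
```

A cursor column with finer resolution, or one that is assigned only at commit time, narrows the window in which this can happen.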