
PostgresSource

Polling connector for Postgres tables - incremental reads via a cursor field.

PostgresSource reads from a Postgres table incrementally, tracking a cursor field (typically a timestamp or auto-incrementing id) and fetching only new rows on each poll.
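The polling model can be pictured with a small in-memory sketch. It is not Thyme's implementation: rows are plain dicts, `poll_once` is a hypothetical helper, and the `limit` argument stands in for the batch size of each poll. Each call returns only rows whose cursor is strictly greater than the last value seen, then advances the cursor to the newest row it returned.

```python
from datetime import datetime
from typing import Any, Optional

Row = dict[str, Any]


def poll_once(
    table: list[Row], cursor_field: str, last_cursor: Optional[Any], limit: int
) -> tuple[list[Row], Optional[Any]]:
    """Hypothetical helper: fetch up to `limit` rows newer than `last_cursor`,
    oldest first, and return them with the advanced cursor value."""
    newer = [r for r in table if last_cursor is None or r[cursor_field] > last_cursor]
    newer.sort(key=lambda r: r[cursor_field])
    batch = newer[:limit]
    # Keep the old cursor when nothing new arrived.
    new_cursor = batch[-1][cursor_field] if batch else last_cursor
    return batch, new_cursor


orders = [
    {"order_id": "o1", "timestamp": datetime(2024, 1, 1, 12, 0)},
    {"order_id": "o2", "timestamp": datetime(2024, 1, 1, 12, 5)},
    {"order_id": "o3", "timestamp": datetime(2024, 1, 1, 12, 10)},
]
batch, cursor = poll_once(orders, "timestamp", None, limit=2)    # o1, o2; cursor 12:05
batch, cursor = poll_once(orders, "timestamp", cursor, limit=2)  # o3; cursor 12:10
batch, cursor = poll_once(orders, "timestamp", cursor, limit=2)  # []; cursor unchanged
```

Because the cursor only moves forward, each row is read once as long as rows arrive in cursor order; the Limits section below covers what happens when they do not.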

Use case

  • Application transaction logs (orders, payments, signups)
  • User profile tables read in cdc="upsert" mode for SCD-style joins
  • Any table with a monotonically advancing cursor field

For logical-replication CDC rather than cursor polling, run Debezium upstream and consume with KafkaSource + cdc="debezium".

Example

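from datetime import datetime

from thyme import Secret
from thyme.connectors import PostgresSource, source
# Assumed import path: this page does not show where dataset, field and Field are defined.
from thyme import Field, dataset, field

@source(
    # cdc is not set, so the default cdc="append" applies: every polled row is a new event.
    PostgresSource(table="orders", password=Secret(env="PG_PASSWORD")),
    # Poll every 5 seconds, fetching only rows whose timestamp is past the last one read.
    cursor="timestamp", every="5s", max_lateness="1h",
)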
@dataset(index=True)
class Order:
    user_id:   Field[str]      = field(key=True)
    order_id:  Field[str]      = field()
    amount:    Field[float]    = field()
    timestamp: Field[datetime] = field(timestamp=True)

Parameters

| Parameter | Required | Default / env var | Description |
|---|---|---|---|
| table | Yes | - | Table name |
| host | No | THYME_POSTGRES_HOST ("localhost") | Postgres host |
| port | No | THYME_POSTGRES_PORT (5432) | Postgres port |
| database | No | THYME_POSTGRES_DATABASE | Database name |
| user | No | THYME_POSTGRES_USER | Username |
| password | No (Secret-capable) | THYME_POSTGRES_PASSWORD | Password |
| schema | No | THYME_POSTGRES_SCHEMA ("public") | Schema name |
| sslmode | No | THYME_POSTGRES_SSLMODE ("prefer") | SSL mode |
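One reading of the "Default / env var" column is a three-step lookup: an explicitly passed argument wins, otherwise the THYME_POSTGRES_* variable is used, otherwise the default in parentheses. The sketch below assumes that precedence (this page lists the variables but does not spell out the order), and `resolve_postgres_settings` is a hypothetical helper, not part of Thyme's API. In real usage the password would typically be a Secret; here it is a plain string.

```python
import os
from typing import Any, Mapping, Optional

# Documented env vars and fallback defaults from the parameter table (None = no default listed).
_ENV_DEFAULTS = {
    "host": ("THYME_POSTGRES_HOST", "localhost"),
    "port": ("THYME_POSTGRES_PORT", "5432"),
    "database": ("THYME_POSTGRES_DATABASE", None),
    "user": ("THYME_POSTGRES_USER", None),
    "password": ("THYME_POSTGRES_PASSWORD", None),
    "schema": ("THYME_POSTGRES_SCHEMA", "public"),
    "sslmode": ("THYME_POSTGRES_SSLMODE", "prefer"),
}


def resolve_postgres_settings(
    table: str, env: Optional[Mapping[str, str]] = None, **overrides: Any
) -> dict[str, Any]:
    """Hypothetical helper: explicit argument > env var > documented default."""
    unknown = set(overrides) - set(_ENV_DEFAULTS)
    if unknown:
        raise TypeError(f"unknown parameters: {sorted(unknown)}")
    env = os.environ if env is None else env
    settings: dict[str, Any] = {"table": table}
    for name, (var, default) in _ENV_DEFAULTS.items():
        value = overrides.get(name)
        settings[name] = value if value is not None else env.get(var, default)
    settings["port"] = int(settings["port"])  # env vars arrive as strings
    return settings


# Example: host comes from the environment, sslmode is passed explicitly.
cfg = resolve_postgres_settings(
    "orders", env={"THYME_POSTGRES_HOST": "db.internal"}, sslmode="require"
)
# cfg["host"] == "db.internal", cfg["port"] == 5432, cfg["schema"] == "public"
```

Passing `env` explicitly keeps the example independent of the machine it runs on; omitting it falls back to `os.environ`.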

CDC modes

| Mode | Behaviour |
|---|---|
| cdc="append" (default) | Each row is a new event. Use for insert-only tables (transaction logs, event streams). |
| cdc="upsert" | Each row is the latest state for its key. Use for tables that update in place (user profiles, inventory snapshots). |
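The two modes differ in what the same sequence of polled rows turns into. The sketch below is a simplified model, not Thyme internals: `apply_append` and `apply_upsert` are hypothetical helpers, rows are processed in the order they were polled, and in upsert mode a later row simply replaces the earlier state for its key.

```python
from typing import Any

Row = dict[str, Any]


def apply_append(rows: list[Row]) -> list[Row]:
    # append: every row is an independent event; nothing is overwritten.
    return list(rows)


def apply_upsert(rows: list[Row], key: str) -> dict[Any, Row]:
    # upsert: each row replaces whatever state was previously held for its key.
    state: dict[Any, Row] = {}
    for row in rows:
        state[row[key]] = row
    return state


rows = [
    {"user_id": "u1", "city": "Oslo", "ts": 1},
    {"user_id": "u2", "city": "Lima", "ts": 2},
    {"user_id": "u1", "city": "Rome", "ts": 3},  # u1's profile updated in place
]
events = apply_append(rows)               # 3 events, including both u1 rows
profiles = apply_upsert(rows, "user_id")  # u1 -> Rome row, u2 -> Lima row
```

Append mode preserves the history (useful for aggregating transactions), while upsert mode keeps only the current state per key (useful for joining against a profile).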

cdc="debezium" is not supported on PostgresSource directly - for full INSERT/UPDATE/DELETE CDC, run a Debezium connector against your Postgres WAL and consume via Kafka.

Limits

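  • The cursor field must be monotonically non-decreasing within a (cursor, primary key) ordering. Rows written out of order within the cursor's resolution (for example, a row that commits after a poll has already read rows with the same or a later cursor value) may be missed.
  • Each poll runs a SELECT … WHERE cursor > $1 ORDER BY cursor LIMIT N, where $1 is the last cursor value read. Index the cursor column so this query does not have to scan the whole table.
  • Tune every to trade freshness against database load: 5s suits hot tables, while 1m is gentler on shared production databases (about 17,280 polls per day per source at 5s versus 1,440 at 1m).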
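The first two limits can be reproduced locally. The sketch below uses Python's built-in sqlite3 as a stand-in for Postgres (so the placeholder is `?` rather than `$1`), and the table, column and index names are illustrative. It indexes the cursor column, runs the same shape of query, and shows that a row committed late with a cursor value already passed, or tied with it, is never returned by `ts > ?`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id TEXT PRIMARY KEY, ts INTEGER NOT NULL)")
# Index the cursor column so each poll can use the index instead of scanning the table.
conn.execute("CREATE INDEX orders_ts_idx ON orders (ts)")


def poll(last_ts, limit=100):
    """Fetch rows strictly after last_ts, oldest first; return (rows, new cursor)."""
    rows = conn.execute(
        "SELECT order_id, ts FROM orders WHERE ts > ? ORDER BY ts LIMIT ?",
        (last_ts, limit),
    ).fetchall()
    return rows, (rows[-1][1] if rows else last_ts)


conn.executemany("INSERT INTO orders VALUES (?, ?)", [("o1", 10), ("o2", 20)])
rows, cursor = poll(0)  # [("o1", 10), ("o2", 20)], cursor = 20

# A slow transaction commits a row behind the cursor, and another ties with it.
conn.execute("INSERT INTO orders VALUES ('late', 15)")
conn.execute("INSERT INTO orders VALUES ('tie', 20)")
conn.execute("INSERT INTO orders VALUES ('o3', 30)")
rows, cursor = poll(cursor)  # [("o3", 30)]: 'late' and 'tie' are skipped
```

A full re-read from the beginning would still see the skipped rows, which is why the cursor has to advance in the same order that rows become visible.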
