Sources & Integrations
BigQuerySource
Polling connector for BigQuery tables - incremental reads via a cursor field, GCP service-account auth.
BigQuerySource reads from a BigQuery table incrementally, tracking a cursor field and fetching new rows on each poll.
Use case
- Event tables already landed in BigQuery (e.g. via Cloud Pub/Sub → BQ subscription)
- Slow-moving dimensional data in BigQuery used for SCD-style temporal joins
- Cross-cloud setups where the system of record is on GCP
Example
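```python
from datetime import datetime

from thyme import Secret
from thyme.connectors import BigQuerySource, source
# Assumption: dataset, Field and field also come from thyme; their import
# path is not shown on this page.

@source(
    BigQuerySource(
        dataset_id="analytics",
        table="orders",
        credentials_json=Secret(env="GCP_SA_JSON"),  # service-account JSON read from an env var
    ),
    cursor="timestamp",  # column whose last-seen value is tracked between polls
    every="5m",          # poll interval
    max_lateness="1h",
)
@dataset(index=True)
class Order:
    user_id: Field[str] = field(key=True)
    amount: Field[float] = field()
    timestamp: Field[datetime] = field(timestamp=True)
```

Parameters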
| Parameter | Required | Default / env var | Description |
|---|---|---|---|
| dataset_id | Yes | - | BigQuery dataset ID |
| table | Yes | - | Table name |
| project_id | No | THYME_BIGQUERY_PROJECT_ID | GCP project ID |
| credentials_json | No (accepts a Secret) | THYME_BIGQUERY_CREDENTIALS_JSON | Service-account JSON; Application Default Credentials (ADC) are used if empty |
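The "Default / env var" column implies a precedence: an explicit argument wins, otherwise the listed environment variable is read. The sketch below illustrates that resolution step. It is a hypothetical helper (resolve_bigquery_params and BigQueryParams are not part of thyme's API), it assumes that precedence order, and it treats credentials_json as a plain string rather than resolving a Secret.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class BigQueryParams:
    dataset_id: str
    table: str
    project_id: Optional[str]        # None when neither argument nor env var is set
    credentials_json: Optional[str]  # None means: fall back to ADC


def resolve_bigquery_params(
    dataset_id: str,
    table: str,
    project_id: Optional[str] = None,
    credentials_json: Optional[str] = None,
    env: Mapping[str, str] = os.environ,
) -> BigQueryParams:
    """Hypothetical helper: explicit arguments win, env vars are the fallback."""
    # The two required parameters have no env-var default.
    if not dataset_id or not table:
        raise ValueError("dataset_id and table are required")
    project = project_id or env.get("THYME_BIGQUERY_PROJECT_ID") or None
    creds = credentials_json or env.get("THYME_BIGQUERY_CREDENTIALS_JSON") or None
    return BigQueryParams(dataset_id, table, project, creds)
```

For example, resolve_bigquery_params("analytics", "orders", env={"THYME_BIGQUERY_PROJECT_ID": "my-proj"}) yields project_id "my-proj" and credentials_json None, i.e. the ADC path. Passing env explicitly keeps the helper testable without touching the process environment.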
Authentication
Two options:
- Application Default Credentials (ADC): leave credentials_json empty and rely on the engine pod's GCP identity (Workload Identity, the GCE metadata server, etc.).
- Explicit service-account JSON: pass it via Secret:
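```python
BigQuerySource(
    dataset_id="analytics",
    table="orders",
    credentials_json=Secret(env="GCP_SA_JSON"),
)
```

The service account needs bigquery.tables.getData on the dataset (included in, e.g., the BigQuery Data Viewer role) and bigquery.jobs.create on the project the query jobs run in (included in, e.g., the BigQuery Job User role). Job creation is a project-level permission and cannot be granted on a dataset.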
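One way to tell the two options apart is sketched below, assuming (as the parameters table states) that a missing or empty credentials_json selects ADC. auth_mode and load_credentials are hypothetical helpers, not thyme functions. load_credentials uses the google-auth library (google.auth.default and google.oauth2.service_account.Credentials.from_service_account_info) and only runs if google-auth is installed; auth_mode is pure Python.

```python
import json
from typing import Any, Dict, Optional, Tuple


def auth_mode(credentials_json: Optional[str]) -> Tuple[str, Optional[Dict[str, Any]]]:
    """Pick the auth path: ("adc", None) or ("service_account", parsed key)."""
    # Empty or missing JSON means "use Application Default Credentials".
    if credentials_json is None or not credentials_json.strip():
        return "adc", None
    info = json.loads(credentials_json)
    # A service-account key file is a JSON object with type "service_account"
    # plus client_email and private_key; fail fast on anything else.
    if not isinstance(info, dict) or info.get("type") != "service_account":
        raise ValueError("credentials_json is not a service-account key")
    missing = [k for k in ("client_email", "private_key") if k not in info]
    if missing:
        raise ValueError(f"service-account JSON is missing: {', '.join(missing)}")
    return "service_account", info


def load_credentials(credentials_json: Optional[str]):
    """Build google-auth credentials for either path (requires google-auth)."""
    mode, info = auth_mode(credentials_json)
    scopes = ["https://www.googleapis.com/auth/bigquery"]
    if mode == "adc":
        # ADC can resolve Workload Identity, the GCE metadata server, a gcloud ADC file, etc.
        import google.auth
        credentials, _project = google.auth.default(scopes=scopes)
        return credentials
    from google.oauth2 import service_account
    return service_account.Credentials.from_service_account_info(info, scopes=scopes)
```

Validating the key shape up front turns a misconfigured Secret into a clear error at startup rather than an authentication failure on the first poll.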
Limits
- The cursor field must be monotonically non-decreasing. When application clocks can't be trusted, use a TIMESTAMP column populated server-side (CURRENT_TIMESTAMP() at insert).
- Each poll runs a parameterised SELECT … WHERE cursor > @last. How many slots and bytes each poll consumes depends on the table's partitioning: partition the source table on the cursor column so BigQuery can prune partitions older than @last instead of scanning the whole table.
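To make the first limit concrete, here is an in-memory simulation of the cursor logic, assuming each poll keeps the largest cursor value it has seen and passes it as @last to the next poll. poll, demo and the row layout are hypothetical; the sketch does not talk to BigQuery and does not model max_lateness or any re-reading thyme may do.

```python
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple

Row = Dict[str, Any]


def poll(table: List[Row], cursor: str, last: Optional[datetime]) -> Tuple[List[Row], Optional[datetime]]:
    """One poll: the in-memory equivalent of SELECT ... WHERE cursor > @last.

    Returns the new rows sorted by cursor and the advanced high-water mark.
    """
    new_rows = [r for r in table if last is None or r[cursor] > last]
    new_rows.sort(key=lambda r: r[cursor])
    # If nothing new arrived, the high-water mark stays where it was.
    next_last = new_rows[-1][cursor] if new_rows else last
    return new_rows, next_last


def demo() -> List[List[int]]:
    t0 = datetime(2024, 1, 1, 12, 0)
    table: List[Row] = [
        {"id": 1, "timestamp": t0},
        {"id": 2, "timestamp": t0 + timedelta(minutes=1)},
    ]
    batches = []
    rows, last = poll(table, "timestamp", None)  # first poll: no cursor yet
    batches.append([r["id"] for r in rows])      # [1, 2]; last is now t0 + 1 min
    # A row whose cursor went backwards (e.g. stamped by a skewed client clock)
    # lands together with a normal row.
    table.append({"id": 3, "timestamp": t0 + timedelta(seconds=30)})
    table.append({"id": 4, "timestamp": t0 + timedelta(minutes=2)})
    rows, last = poll(table, "timestamp", last)
    batches.append([r["id"] for r in rows])      # [4]: row 3 is never read
    return batches
```

demo() returns [[1, 2], [4]]: row 3 sits below the stored cursor, so no later poll will ever select it. A server-side CURRENT_TIMESTAMP() cursor avoids this particular source of backwards-moving values.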