BigQuerySource

Polling connector for BigQuery tables - incremental reads via a cursor field, GCP service-account auth.

BigQuerySource reads a BigQuery table incrementally: it tracks the last value it has read from a cursor field and, on each poll, fetches only the rows whose cursor is greater than that value.

Use cases

  • Event tables already landed in BigQuery (e.g. via Cloud Pub/Sub → BQ subscription)
  • Slow-moving dimensional data in BigQuery used for SCD-style temporal joins
  • Cross-cloud setups where the system of record is on GCP

Example

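from datetime import datetime

from thyme import Secret
from thyme.connectors import BigQuerySource, source
# dataset, field, and Field also come from Thyme; import them as in your other dataset definitions.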

@source(
    BigQuerySource(
        dataset_id="analytics",
        table="orders",
        credentials_json=Secret(env="GCP_SA_JSON"),
    ),
    cursor="timestamp", every="5m", max_lateness="1h",
)
@dataset(index=True)
class Order:
    user_id:   Field[str]      = field(key=True)
    amount:    Field[float]    = field()
    timestamp: Field[datetime] = field(timestamp=True)

Parameters

Parameter         Required             Default / env var                Description
dataset_id        Yes                  -                                BigQuery dataset ID
table             Yes                  -                                Table name
project_id        No                   THYME_BIGQUERY_PROJECT_ID        GCP project ID
credentials_json  No (Secret-capable)  THYME_BIGQUERY_CREDENTIALS_JSON  Service-account JSON (uses ADC if empty)
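
The fallback behaviour in the table can be pictured with a small sketch. The env var names come from the table; the resolve_param helper is hypothetical, and the precedence it encodes (an explicit argument wins over the env var) is an assumption rather than documented Thyme behaviour.

```python
import os

# Env var names are taken from the parameter table above.
ENV_DEFAULTS = {
    "project_id": "THYME_BIGQUERY_PROJECT_ID",
    "credentials_json": "THYME_BIGQUERY_CREDENTIALS_JSON",
}


def resolve_param(name, explicit=None, environ=None):
    """Return the explicit value if given, else the env var value, else None."""
    environ = os.environ if environ is None else environ
    if explicit:  # assumption: an explicit argument takes precedence
        return explicit
    return environ.get(ENV_DEFAULTS[name]) or None


# Illustrative environment: only the project ID is set.
env = {"THYME_BIGQUERY_PROJECT_ID": "my-gcp-project"}
project_id = resolve_param("project_id", environ=env)
credentials = resolve_param("credentials_json", environ=env)

# An empty credentials_json means the connector falls back to ADC.
auth_mode = "service-account JSON" if credentials else "ADC"
print(project_id, auth_mode)
```

Passing environ explicitly keeps the sketch testable without touching the real process environment.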

Authentication

Two options:

Application Default Credentials (ADC) - leave credentials_json empty and rely on the engine pod's GCP identity (Workload Identity, GCE metadata, etc.).

Explicit service-account JSON - pass via Secret:

BigQuerySource(
    dataset_id="analytics",
    table="orders",
    credentials_json=Secret(env="GCP_SA_JSON"),
)

The service account needs bigquery.tables.getData on the table (or its dataset) and bigquery.jobs.create on the project that runs the query jobs.
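
Before wiring a key into a Secret, it can help to check that the JSON is well formed. The following is a minimal sketch, not part of Thyme: check_service_account_json is a hypothetical helper, and it only checks a few fields that GCP service-account key files normally carry.

```python
import json

# Fields normally present in a GCP service-account key file.
REQUIRED_KEYS = {"type", "project_id", "private_key", "client_email"}


def check_service_account_json(raw):
    """Return a list of problems; an empty list means the key looks usable."""
    try:
        info = json.loads(raw)
    except json.JSONDecodeError as exc:
        return [f"not valid JSON: {exc.msg}"]
    if not isinstance(info, dict):
        return ["top-level value is not a JSON object"]
    problems = [f"missing field: {k}" for k in sorted(REQUIRED_KEYS - info.keys())]
    if info.get("type") not in (None, "service_account"):
        problems.append(f"unexpected type: {info['type']!r}")
    return problems


# Placeholder values only; a real key comes from the GCP console or gcloud.
sample = json.dumps({
    "type": "service_account",
    "project_id": "my-gcp-project",
    "private_key": "-----BEGIN PRIVATE KEY-----\nPLACEHOLDER\n-----END PRIVATE KEY-----\n",
    "client_email": "thyme-reader@my-gcp-project.iam.gserviceaccount.com",
})
print(check_service_account_json(sample))
```

The check is structural only: it does not verify that the key is valid or that the account holds the permissions listed above.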

Limits

  • The cursor field must be monotonically non-decreasing. Use a TIMESTAMP column populated server-side (CURRENT_TIMESTAMP() at insert) when application clocks can't be trusted.
  • Each poll runs a parameterised SELECT … WHERE cursor > @last. How much data a poll scans, and therefore how many slots it consumes, depends on table partitioning: partition the source table on the cursor column so that incremental reads can skip old partitions.
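
To see why the cursor must never go backwards, the polling rule can be modelled in memory. This is a sketch of the cursor > @last filter only, assuming the cursor advances to the largest value returned. It is not Thyme's implementation, and the poll function and sample rows are made up for illustration.

```python
from datetime import datetime, timedelta


def poll(table, last_cursor):
    """Return rows with timestamp > last_cursor (oldest first) and the new cursor."""
    new_rows = sorted(
        (r for r in table if r["timestamp"] > last_cursor),
        key=lambda r: r["timestamp"],
    )
    if new_rows:
        last_cursor = new_rows[-1]["timestamp"]
    return new_rows, last_cursor


t0 = datetime(2024, 1, 1)
table = [
    {"user_id": "a", "amount": 10.0, "timestamp": t0 + timedelta(minutes=1)},
    {"user_id": "b", "amount": 20.0, "timestamp": t0 + timedelta(minutes=2)},
]

# First poll: both rows are newer than the starting cursor.
first, cursor = poll(table, t0)

# A row stamped before the stored cursor arrives late (a non-monotonic write),
# followed by a row with a properly increasing timestamp.
table.append({"user_id": "c", "amount": 5.0, "timestamp": t0 + timedelta(seconds=90)})
table.append({"user_id": "d", "amount": 7.0, "timestamp": t0 + timedelta(minutes=3)})

# Second poll: only the row past the cursor is returned; "c" is never read.
second, cursor = poll(table, cursor)

print([r["user_id"] for r in first])
print([r["user_id"] for r in second])
```

In this model the late row "c" is silently skipped. That is the failure a server-side CURRENT_TIMESTAMP() cursor is meant to prevent.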
