
Polars UDFs in Pipelines

Use .transform() and .filter(callable) when closed-form expressions can't express your logic.

Pipeline .filter(expr) and .assign(col, expr) evaluate closed-form expressions. They are fast, but limited to the operators the engine's expression schema implements: comparison, null, arithmetic, and logical operators. When expressions aren't enough, .transform(fn) is an opt-in escape hatch that runs a user-written Python function taking and returning a polars.DataFrame.

UDFs are opt-in. Expression-only pipelines stay on the zero-overhead fast path.

When to reach for .transform()

Use expressions when you can. Reach for .transform() when you need:

  • Conditional logic - pl.when().then().otherwise() chains, or Python if/elif branches.
  • Library calls - regex (str.extract), date math, domain helpers, custom scoring.
  • Multi-column derivations - expressions that involve more than one column in non-trivial ways.
  • Stateless per-row logic that closed-form expressions can't cover today.

Prerequisites

Engine pods must be configured with a Python worker pool to run UDFs. If a pipeline contains a .transform() op and the engine has no workers configured, it will fail at startup with a clear error. Expression-only pipelines are unaffected.

Example

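import polars as pl
from datetime import datetime
from thyme import dataset, field, pipeline, inputs, Avg, Count
# Field and the upstream Order dataset are assumed to be defined or imported elsewhere (not shown).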

def compute_unit_price(df: pl.DataFrame) -> pl.DataFrame:
    return df.with_columns(
        (pl.col("total") / pl.col("quantity")).alias("unit_price")
    )

@dataset(version=1, index=True)
class UserOrderStats:
    user_id: Field[str] = field(key=True)
    avg_unit_price_24h: Field[float] = field()
    order_count_24h: Field[int] = field()
    timestamp: Field[datetime] = field(timestamp=True)

    @pipeline(version=1)
    @inputs(Order)
    def aggregate_stats(cls, orders):
        return (orders
            .transform(compute_unit_price, output_columns={"unit_price": float})
            .groupby("user_id")
            .aggregate(
                avg_unit_price_24h=Avg(of="unit_price", window="24h"),
                order_count_24h=Count(window="24h"),
            ))

Contract

  • Signature: fn(df: pl.DataFrame) -> pl.DataFrame.
  • Named top-level function. Lambdas are rejected (the SDK captures source via inspect.getsource).
  • No closures over local state. Depend only on module-level imports and function arguments.
  • .transform() is row-count preserving. To drop rows, use .filter(callable).
  • JSON-serializable output types. Strings, numbers, booleans, None. Convert Polars datetimes back to ISO strings before returning.
  • Deterministic. Non-deterministic UDFs won't break correctness, but a backfill may then produce results that differ from what the live pipeline originally computed.

Op ordering

Ops apply in declaration order:

(orders
    .filter(col("amount") > 0)          # closed-form, fast path
    .transform(compute_unit_price)      # Python UDF
    .assign(taxed=col("amount") * 1.1)  # closed-form, fast path
    .groupby("user_id")
    .aggregate(...))

.filter() drops rows from the batch before the .transform() UDF sees them. The UDF then returns a transformed batch with the same number of rows, and .assign() runs on that transformed batch.

Row-dropping UDFs - .filter(callable)

.filter() accepts either an expression or a callable. A callable has the same pl.DataFrame -> pl.DataFrame signature as a .transform() UDF, but may return fewer rows than it receives:

import polars as pl

def drop_test_traffic(df: pl.DataFrame) -> pl.DataFrame:
    return df.filter(~pl.col("user_id").str.starts_with("test_"))

(orders
    .filter(drop_test_traffic)
    .groupby("user_id")
    .aggregate(order_count_1h=Count(window="1h")))

Dropped records have their offsets committed, so they are not replayed on restart.

Performance notes

  • Each UDF call processes a whole batch at once, not one record at a time. Batching is handled by the engine.
  • Throughput budget for UDF pipelines: aim for at least 40% of an expression-only baseline on the same hardware. In local development measurements, typical UDF pipelines reached 82–88% of that baseline.
  • Keep UDFs vectorised. Use pl.col(...) expressions inside the function body, not Python for loops.

Failure handling

When a UDF fails repeatedly (user exception, timeout, or worker crash), the engine doesn't halt. Once retries are exhausted, the failing batch is routed to a per-dataset dead-letter topic (<output_dataset>.udf_dead_letter) so the pipeline keeps moving. Dead-letter records are surfaced in the Events feed and on the Jobs page. To inspect the raw payloads while debugging a failing UDF, ask your Thyme administrator to pull them.

See also

  • Pipelines - how pipelines compose datasets and operators.
  • Decorators - .filter(), .assign(), .transform() signatures.