Polars UDFs in Pipelines
Use .transform() and .filter(callable) when closed-form expressions can't express your logic.
Pipeline .filter(expr) and .assign(col, expr) evaluate closed-form expressions. They are fast, but limited to the operators the engine's expression schema implements (comparison, null, arithmetic, and logical operators). When expressions aren't enough, .transform(fn) is an opt-in escape hatch that runs a user-supplied Python function taking and returning a polars.DataFrame.
UDFs are opt-in. Expression-only pipelines stay on the zero-overhead fast path.
When to reach for .transform()
Use expressions when you can. Reach for .transform() when you need:
- Conditional logic - pl.when().then().otherwise() chains, or Python if/elif branches.
- Library calls - regex (str.extract), date math, domain helpers, custom scoring.
- Multi-column derivations - expressions that involve more than one column in non-trivial ways.
- Stateless per-row logic that closed-form expressions can't cover today.
Prerequisites
Engine pods must be configured with a Python worker pool to run UDFs. If a pipeline contains a .transform() op and the engine has no workers configured, the pipeline fails at startup with a clear error. Expression-only pipelines are unaffected.
Example
```python
from datetime import datetime

import polars as pl
from thyme import dataset, field, pipeline, inputs, Avg, Count
from thyme import Field  # assumption: Field is exported by thyme alongside field()

# Order is an upstream dataset defined elsewhere in your project; it must
# provide the user_id, total and quantity columns used below.


def compute_unit_price(df: pl.DataFrame) -> pl.DataFrame:
    # Vectorised: one expression over the whole batch, row count unchanged
    return df.with_columns(
        (pl.col("total") / pl.col("quantity")).alias("unit_price")
    )


@dataset(version=1, index=True)
class UserOrderStats:
    user_id: Field[str] = field(key=True)
    avg_unit_price_24h: Field[float] = field()
    order_count_24h: Field[int] = field()
    timestamp: Field[datetime] = field(timestamp=True)

    @pipeline(version=1)
    @inputs(Order)
    def aggregate_stats(cls, orders):
        return (
            orders
            .transform(compute_unit_price, output_columns={"unit_price": float})
            .groupby("user_id")
            .aggregate(
                avg_unit_price_24h=Avg(of="unit_price", window="24h"),
                order_count_24h=Count(window="24h"),
            )
        )
```

Contract
- Signature: fn(df: pl.DataFrame) -> pl.DataFrame.
- Named top-level function. Lambdas are rejected (the SDK captures source via inspect.getsource).
- No closures over local state. Depend only on module-level imports and function arguments.
- Row-count preserving. .transform() must return as many rows as it receives. To drop rows, use .filter(callable).
- JSON-serializable output types. Strings, numbers, booleans, None. Convert Polars datetimes back to ISO strings before returning.
- Deterministic. Non-deterministic UDFs won't break correctness, but they make backfill results surprising.
Op ordering
Ops apply in declaration order:
```python
(orders
    .filter(col("amount") > 0)          # closed-form, fast path
    .transform(compute_unit_price)      # Python UDF
    .assign(taxed=col("amount") * 1.1)  # closed-form, fast path
    .groupby("user_id")
    .aggregate(...))
```

.filter() drops rows from the batch before .transform() sees them. .transform() then returns a modified batch with the same row count, and .assign() runs on that transformed batch.
Row-dropping UDFs - .filter(callable)
.filter() accepts either an expression or a callable. A callable filter has the same pl.DataFrame -> pl.DataFrame shape as a .transform() UDF, except that it may return fewer rows than it received:
```python
import polars as pl


def drop_test_traffic(df: pl.DataFrame) -> pl.DataFrame:
    return df.filter(~pl.col("user_id").str.starts_with("test_"))


(orders
    .filter(drop_test_traffic)
    .groupby("user_id")
    .aggregate(order_count_1h=Count(window="1h")))
```

Dropped records have their offsets committed so they're not replayed on restart.
Performance notes
- Each UDF call processes a whole batch at once, not one record at a time. Batching is handled by the engine.
- Throughput budget for UDF pipelines: aim for ≥40% of an expression-only baseline on the same hardware. On a local development setup, typical UDF pipelines measured at 82–88% of the expression-only baseline.
- Keep UDFs vectorised. Use pl.col(...) expressions inside the function body, not Python for loops.
Failure handling
When a UDF fails repeatedly (user exception, timeout, worker crash), the engine doesn't halt. After exhausting retries, the failing batch is routed to a per-dataset dead-letter topic (<output_dataset>.udf_dead_letter) so the pipeline keeps moving. Dead-letter records are surfaced in the Events feed and on the Jobs page; if you need to inspect the raw payloads to debug a failing UDF, ask your Thyme administrator to pull them.
Related
- Pipelines - how pipelines compose datasets and operators.
- Decorators - .filter(), .assign(), .transform() signatures.