
Real-Time Purchase Intent for a Travel Marketplace

Real-time purchase intent for a 500M-visitor travel marketplace using Kinesis clickstream, temporal joins, and composite Python extractors.

The problem

A travel marketplace with 500M monthly visitors lists thousands of experiences - adventure tours, food walks, cultural immersions, wellness retreats. Users browse, click, linger, bounce. The business question is continuous: right now, which users are about to book?

That signal drives search ranking, personalized CTAs, and inventory allocation. It has to separate three kinds of users in milliseconds:

  • Active browsers - many clicks in the last hour, deep dwell time, pace accelerating
  • Casual scrollers - a handful of short glances before they leave
  • Dormant returners - quiet for the last day, but active last week

Batch pipelines that refresh hourly can't answer this. By the time the aggregation lands, the user is gone.

Why this feature is hard

The clickstream is big and noisy: 5,000 events/sec sustained, peaking at 10,000 EPS. Each event is roughly 250 bytes and carries user_id, experience_id, category, dwell_time_sec, and timestamp, along with event_type and location_id.
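A quick back-of-the-envelope check of what that load means for ingestion. This is a sketch that assumes the standard Kinesis Data Streams per-shard write limits (1 MB/s, treated here as 10^6 bytes, and 1,000 records/s) and takes the ~250-byte event size at face value; `ingest_profile` is a hypothetical helper, not part of Thyme.

```python
import math

# Assumed Kinesis Data Streams per-shard write limits.
SHARD_MAX_BYTES_PER_SEC = 1_000_000   # 1 MB/s, treated as 10^6 bytes
SHARD_MAX_RECORDS_PER_SEC = 1_000

EVENT_BYTES = 250  # approximate event size from the text


def ingest_profile(events_per_sec: int, event_bytes: int = EVENT_BYTES) -> tuple[int, int]:
    """Return (bytes per second, minimum shard count) for a given event rate."""
    bytes_per_sec = events_per_sec * event_bytes
    shards_by_bytes = math.ceil(bytes_per_sec / SHARD_MAX_BYTES_PER_SEC)
    shards_by_records = math.ceil(events_per_sec / SHARD_MAX_RECORDS_PER_SEC)
    # Whichever limit binds first determines the shard count.
    return bytes_per_sec, max(shards_by_bytes, shards_by_records)


for eps in (5_000, 10_000):
    bps, shards = ingest_profile(eps)
    print(f"{eps:>6} EPS -> {bps / 1e6:.2f} MB/s, at least {shards} shards")
```

With events this small, the record-count limit binds well before the byte limit: 5,000 EPS is only 1.25 MB/s but needs at least 5 shards, and 10,000 EPS needs at least 10, in line with the 5 to 10 active on-demand shards reported in the production results.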

Naive approaches all miss:

  • Batch in Snowflake/Spark - too slow; creates training-serving skew between daily training data and streaming serving features
  • Redis counters - require hand-rolled windowed aggregation; no temporal joins; awkward to scale with user cardinality
  • Analytics warehouses (Redshift, BigQuery) - not built for streaming ingestion paired with sub-5 ms point lookups

What you actually need is windowed aggregations over 1h, 24h, and 7d, temporally joined against user profiles and combined into a composite intent signal, all served at ranking-request latency with zero offline/online drift.
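To make "temporal join" concrete: each click should see the profile version that was current at the click's timestamp, not the latest one. Below is a minimal, framework-free sketch of that as-of lookup, assuming each user's profile versions are kept sorted by timestamp. `ProfileVersion`, `profile_as_of`, and the sample history are hypothetical illustrations, not Thyme's API or data.

```python
from bisect import bisect_right
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass(frozen=True)
class ProfileVersion:
    timestamp: datetime
    segment: str
    account_age_days: int


def profile_as_of(history: list[ProfileVersion], ts: datetime) -> Optional[ProfileVersion]:
    """Return the latest profile version with timestamp <= ts, or None.

    `history` must be sorted by timestamp, ascending.
    """
    times = [v.timestamp for v in history]  # O(n) per call; fine for a sketch
    idx = bisect_right(times, ts)           # number of versions at or before ts
    return history[idx - 1] if idx > 0 else None


# Hypothetical history: the user's segment changed on March 1.
history = [
    ProfileVersion(datetime(2024, 1, 1), "new", 0),
    ProfileVersion(datetime(2024, 3, 1), "explorer", 60),
]
click_ts = datetime(2024, 2, 15)
print(profile_as_of(history, click_ts).segment)  # -> new
```

A join against the latest profile would instead label this February click with "explorer", leaking a future state into training data that the online system never saw at serving time.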

Why Thyme's architecture fits

Thyme's kappa architecture pulls this into a single pipeline definition:

  • Temporal joins - every click is enriched with the user's profile as of the click's timestamp, not the latest profile, so training data matches what online serving saw at that moment.
  • Invertible operators - Count and Sum subtract an expired event's contribution instead of recomputing the window, so eviction costs O(1) per event. 7-day windows aren't meaningfully more expensive to maintain than 1-hour windows.
  • Derived extractors at query time - composite signals like is_high_intent are Python, executed at read time over the pre-aggregated values. No extra storage, no pre-materialization.
  • One definition, both paths - the same features.py serves online ranking and offline training sets.
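The invertible-operator point is what keeps long windows cheap: when an event ages out, its contribution is subtracted rather than the whole window being recomputed. A minimal single-key sketch of that idea follows; `SlidingWindowAgg` is hypothetical, not Thyme's implementation, and it assumes events arrive in timestamp order. The average falls out as sum / count.

```python
from collections import deque
from datetime import datetime, timedelta
from typing import Optional


class SlidingWindowAgg:
    """Running Count/Sum/Avg over a time window, maintained by add and subtract."""

    def __init__(self, window: timedelta):
        self.window = window
        self.events: deque[tuple[datetime, float]] = deque()
        self.count = 0
        self.total = 0.0

    def _evict(self, now: datetime) -> None:
        # Invert expired events: subtract each one's contribution, O(1) per event.
        while self.events and self.events[0][0] <= now - self.window:
            _, value = self.events.popleft()
            self.count -= 1
            self.total -= value

    def add(self, ts: datetime, value: float) -> None:
        # Assumes in-order arrival; late events would need extra handling.
        self.events.append((ts, value))
        self.count += 1
        self.total += value
        self._evict(ts)

    def read(self, now: datetime) -> tuple[int, float, Optional[float]]:
        self._evict(now)
        avg = self.total / self.count if self.count else None
        return self.count, self.total, avg


t0 = datetime(2024, 1, 1, 12, 0)
w = SlidingWindowAgg(timedelta(hours=1))
w.add(t0, 30.0)                          # dwell seconds for one event
w.add(t0 + timedelta(minutes=30), 10.0)
print(w.read(t0 + timedelta(minutes=45)))  # -> (2, 40.0, 20.0)
print(w.read(t0 + timedelta(minutes=75)))  # first event expired -> (1, 10.0, 10.0)
```

In this sketch the deque still holds raw events so it knows what to subtract; invertibility removes the recomputation, which is why the per-update cost does not grow with window length.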

The Thyme solution

The feature definition, trimmed from demos/experience_discovery/features.py:

```python
from datetime import datetime

from thyme import (
    Avg, Config, Count, Sum, dataset, extractor, extractor_inputs,
    extractor_outputs, feature, featureset, field, inputs, pipeline, source,
)
from thyme.connectors import KinesisSource
from thyme.dataset import Field

config = Config.load()

# Clickstream: JSON events on Kinesis, read from the oldest retained record.
click_source = KinesisSource(
    stream_arn=config.kinesis.stream_arn,
    region=config.kinesis.region,
    init_position="trim_horizon",
    format="json",
)
# User profiles live in a Postgres table.
profile_source = config.postgres_source(table="user_profiles")

# Raw click events, keyed by user; tolerates events arriving up to 1h late.
@source(click_source, max_lateness="1h")
@dataset(version=1)
class ClickEvent:
    user_id:        Field[str]      = field(key=True)
    event_type:     Field[str]      = field()
    experience_id:  Field[str]      = field()
    category:       Field[str]      = field()
    location_id:    Field[str]      = field()
    dwell_time_sec: Field[float]    = field()
    timestamp:      Field[datetime] = field(timestamp=True)

# Slowly changing profiles, polled every 60s using `timestamp` as the cursor.
@source(profile_source, cursor="timestamp", every="60s", max_lateness="1h")
@dataset(version=1, index=True)
class UserProfile:
    user_id:          Field[str]      = field(key=True)
    segment:          Field[str]      = field()
    home_country:     Field[str]      = field()
    account_age_days: Field[int]      = field()
    timestamp:        Field[datetime] = field(timestamp=True)

# Per-user windowed aggregates, materialized on the write path.
@dataset(version=2, index=True)
class UserEngagementStats:
    user_id:           Field[str]      = field(key=True)
    event_count_1h:    Field[int]      = field()
    event_count_24h:   Field[int]      = field()
    event_count_7d:    Field[int]      = field()
    avg_dwell_sec_1h:  Field[float]    = field()
    avg_dwell_sec_24h: Field[float]    = field()
    avg_dwell_sec_7d:  Field[float]    = field()
    sum_dwell_sec_1h:  Field[float]    = field()
    sum_dwell_sec_24h: Field[float]    = field()
    timestamp:         Field[datetime] = field(timestamp=True)

    @pipeline(version=1)
    @inputs(ClickEvent, UserProfile)
    def compute_engagement(cls, clicks):
        return (
            clicks
            # Temporal join: each click picks up the profile as of its own timestamp.
            .join(UserProfile, on="user_id", fields=["segment", "account_age_days"])
            .groupby("user_id")
            .aggregate(
                event_count_1h=Count(window="1h"),
                event_count_24h=Count(window="24h"),
                event_count_7d=Count(window="7d"),
                avg_dwell_sec_1h=Avg(of="dwell_time_sec", window="1h"),
                avg_dwell_sec_24h=Avg(of="dwell_time_sec", window="24h"),
                avg_dwell_sec_7d=Avg(of="dwell_time_sec", window="7d"),
                sum_dwell_sec_1h=Sum(of="dwell_time_sec", window="1h"),
                sum_dwell_sec_24h=Sum(of="dwell_time_sec", window="24h"),
            )
        )

@featureset
class DiscoverySignals:
    user_id: str = feature()

    # Raw aggregations - auto-generated lookup extractors share state reads
    # across all eight features.
    event_count_1h:    int   = feature(ref=UserEngagementStats.event_count_1h)
    event_count_24h:   int   = feature(ref=UserEngagementStats.event_count_24h)
    event_count_7d:    int   = feature(ref=UserEngagementStats.event_count_7d)
    avg_dwell_sec_1h:  float = feature(ref=UserEngagementStats.avg_dwell_sec_1h)
    avg_dwell_sec_24h: float = feature(ref=UserEngagementStats.avg_dwell_sec_24h)
    avg_dwell_sec_7d:  float = feature(ref=UserEngagementStats.avg_dwell_sec_7d)
    sum_dwell_sec_1h:  float = feature(ref=UserEngagementStats.sum_dwell_sec_1h)
    sum_dwell_sec_24h: float = feature(ref=UserEngagementStats.sum_dwell_sec_24h)

    # Derived signals, computed at query time by the extractor below.
    engagement_velocity: float = feature()
    dwell_depth_score:   float = feature()
    session_intensity:   float = feature()
    activity_trend_7d:   float = feature()
    is_high_intent:      bool  = feature()
    browsing_momentum:   float = feature()

    @extractor
    @extractor_inputs(
        "event_count_1h", "event_count_24h", "event_count_7d",
        "avg_dwell_sec_1h", "avg_dwell_sec_24h", "avg_dwell_sec_7d",
        "sum_dwell_sec_1h",
    )
    @extractor_outputs(
        "engagement_velocity", "dwell_depth_score", "session_intensity",
        "activity_trend_7d", "is_high_intent", "browsing_momentum",
    )
    def compute_derived_signals(
        cls, ts, count_1h, count_24h, count_7d,
        avg_dwell_1h, avg_dwell_24h, avg_dwell_7d, sum_dwell_1h,
    ):
        # Treat a missing count (e.g. a user with no events yet) as zero.
        count_1h, count_24h, count_7d = count_1h or 0, count_24h or 0, count_7d or 0

        # Baselines, floored so sparse histories don't blow up the ratios below.
        hourly_rate_24h = max(count_24h / 24.0, 1.0)   # events/hour over 24h
        daily_rate_7d = max(count_7d / 7.0, 1.0)       # events/day over 7d
        hourly_avg_7d = max(count_7d / 168.0, 0.01)    # events/hour over 7d (168 h)

        # Last hour vs. the 24h hourly pace.
        engagement_velocity = count_1h / hourly_rate_24h
        # Recent dwell vs. the weekly dwell.
        dwell_depth_score = (avg_dwell_24h or 0.0) / max(avg_dwell_7d or 1.0, 1.0)
        # Dwell seconds per event in the last hour.
        session_intensity = (sum_dwell_1h or 0.0) / max(count_1h, 1)
        # Last day vs. the 7d daily pace.
        activity_trend_7d = count_24h / daily_rate_7d
        is_high_intent = (
            engagement_velocity > 2.0
            and (avg_dwell_1h or 0.0) >= 30.0
            and activity_trend_7d > 1.0
        )
        # Last hour vs. the 7d hourly pace.
        browsing_momentum = count_1h / hourly_avg_7d

        return {
            "engagement_velocity": engagement_velocity,
            "dwell_depth_score": dwell_depth_score,
            "session_intensity": session_intensity,
            "activity_trend_7d": activity_trend_7d,
            "is_high_intent": is_high_intent,
            "browsing_momentum": browsing_momentum,
        }
```

Architecture at a glance

  • Source: click events from Kinesis (5k EPS)
  • Source: user profiles from Postgres
  • Pipeline: temporal join plus 1h, 24h, and 7d windows
  • Dataset: UserEngagementStats, 8 raw aggregates
  • Pipeline: Python extractor computing composite signals
  • Featureset: DiscoverySignals, 15 features served

Write path: Kinesis click events flow through the engine's temporal join and windowed aggregation, materializing UserEngagementStats. Read path: the query server runs the derived extractor over the pre-aggregated values and returns all 15 features. Server-side P50 latency is below 5 ms; end-to-end latency, including network, is below 50 ms.

The is_high_intent signal

is_high_intent is true only when all three conditions hold:

  • engagement_velocity > 2 - more than 2× the user's hourly event rate over the last 24 hours
  • avg_dwell_sec_1h ≥ 30 - at least 30 seconds per listing on average, i.e. actually reading listings, not scrolling
  • activity_trend_7d > 1 - more events in the last 24 hours than their daily average over the last 7 days

All three are computed in Python at query time over the pre-aggregated state.
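Because the extractor body is ordinary Python, its arithmetic can be checked without a running deployment. The sketch below copies the formulas from compute_derived_signals into a free function (`derive_signals`, a hypothetical name) and evaluates three hypothetical inputs: a burst of activity, a user with 20 events in the 7-day window but none in the last day, and that same user after coming back with 6 events in an hour.

```python
def derive_signals(count_1h, count_24h, count_7d,
                   avg_dwell_1h, avg_dwell_24h, avg_dwell_7d, sum_dwell_1h):
    """Same formulas as DiscoverySignals.compute_derived_signals, outside Thyme."""
    hourly_rate_24h = max(count_24h / 24.0, 1.0)   # floored at 1 event/hour
    daily_rate_7d = max(count_7d / 7.0, 1.0)       # floored at 1 event/day
    hourly_avg_7d = max(count_7d / 168.0, 0.01)

    engagement_velocity = count_1h / hourly_rate_24h
    activity_trend_7d = count_24h / daily_rate_7d
    return {
        "engagement_velocity": engagement_velocity,
        "dwell_depth_score": (avg_dwell_24h or 0.0) / max(avg_dwell_7d or 1.0, 1.0),
        "session_intensity": (sum_dwell_1h or 0.0) / max(count_1h, 1),
        "activity_trend_7d": activity_trend_7d,
        "is_high_intent": (
            engagement_velocity > 2.0
            and (avg_dwell_1h or 0.0) >= 30.0
            and activity_trend_7d > 1.0
        ),
        "browsing_momentum": count_1h / hourly_avg_7d,
    }


# Hypothetical inputs, not measured data.
burst = derive_signals(20, 30, 35, 31.0, 28.0, 20.0, 620.0)
print(burst["engagement_velocity"], burst["is_high_intent"])        # 16.0 True

dormant = derive_signals(0, 0, 20, None, None, 45.0, None)
print(dormant["engagement_velocity"], dormant["is_high_intent"])    # 0.0 False

returned = derive_signals(6, 6, 26, 40.0, 40.0, 42.0, 240.0)
print(returned["engagement_velocity"], returned["is_high_intent"])  # 6.0 True
```

The `max(..., 1.0)` floors compare a quiet user against at least one event per hour, which is why 6 events after a quiet day yield a velocity of 6.0 rather than an unbounded ratio.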

Production results

Smoke test: persona verification

| Entity | event_count_1h | avg_dwell_1h | engagement_velocity | is_high_intent |
|---|---|---|---|---|
| u_active_browser | 20 | ~30 s | ~16.0 | true |
| u_casual_visitor | 2 | ~4 s | ~baseline | false |
| u_dormant_returner | 0 | - | 0.0 | false |

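u_dormant_returner still has 20 events in its 7-day window. As soon as the user returns, the 1h and 24h aggregates pick up the new events with the next flush, and engagement_velocity and is_high_intent, which are computed at read time, reflect the change on the next query.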

AWS production results (10k users, 5k EPS live load)

Tested on EKS (us-east-1), Kinesis on-demand (5–10 shards active), RDS Postgres for profiles, c7g Graviton nodes.

| Metric | Value |
|---|---|
| Sustained write throughput | 5,000 EPS |
| Peak write throughput | 10,000 EPS |
| Unique users under live load | 10,000 |
| Read latency (server-side P50) | < 5 ms |
| Read latency (end-to-end, incl. network) | < 50 ms |
| Features served per query | 15 (user_id + 8 raw + 6 derived) |
| Time windows | 3 (1h, 24h, 7d) |
| Online/offline parity | 100% |
| Lines of Python | ~40 |

Comparison with other platforms

| Capability | Thyme | Tecton | Fennel | Feast |
|---|---|---|---|---|
| Kinesis streaming connector | Built-in | Built-in | Built-in | Requires external |
| Temporal join (SCD-style) | Native (.join(..., fields=[...])) | Native | Native | Not supported |
| Multi-window aggregation | Single pipeline definition | Batch + streaming pipelines | Single definition (lambda internally) | Batch only |
| Composite Python extractors | Query-time | On-demand features | Python transforms | Not supported |
| Online/offline consistency | Guaranteed (kappa) | Reconciled (lambda) | Reconciled (lambda) | Manual |

Reproducing this on your own data

Point ClickEvent at your own Kinesis stream and UserProfile at your user-profile table, commit, and the 15-feature DiscoverySignals set is live within seconds.

```shell
thyme commit features.py
curl -H "Authorization: Bearer $THYME_API_KEY" \
    "$THYME_BASE_URL/features?entity_id=u_active_browser&featureset=DiscoverySignals"
```
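The same lookup from Python, as a sketch using only the standard library. It mirrors the curl call (same path, query parameters, and bearer header, read from the same THYME_BASE_URL and THYME_API_KEY variables); the response is treated as opaque JSON because its schema isn't documented here, and `build_feature_request` / `fetch_features` are hypothetical helper names.

```python
import json
import os
import urllib.parse
import urllib.request
from typing import Any


def build_feature_request(base_url: str, api_key: str, entity_id: str,
                          featureset: str = "DiscoverySignals") -> urllib.request.Request:
    """Build the GET request for one entity's features (no network I/O)."""
    query = urllib.parse.urlencode({"entity_id": entity_id, "featureset": featureset})
    url = f"{base_url.rstrip('/')}/features?{query}"
    return urllib.request.Request(url, headers={"Authorization": f"Bearer {api_key}"})


def fetch_features(entity_id: str) -> Any:
    """Send the request using THYME_BASE_URL and THYME_API_KEY from the environment."""
    req = build_feature_request(
        os.environ["THYME_BASE_URL"], os.environ["THYME_API_KEY"], entity_id
    )
    with urllib.request.urlopen(req, timeout=5) as resp:
        return json.load(resp)  # response shape is not assumed


# Usage (requires a reachable deployment):
# features = fetch_features("u_active_browser")
```

Keeping request construction separate from the network call makes the URL and auth header easy to check in tests without a live server.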
