Real-Time Purchase Intent for a Travel Marketplace
Real-time purchase intent for a 500M-visitor travel marketplace using Kinesis clickstream, temporal joins, and composite Python extractors.
The problem
A travel marketplace with 500M monthly visitors lists thousands of experiences - adventure tours, food walks, cultural immersions, wellness retreats. Users browse, click, linger, bounce. The business question is continuous: right now, which users are about to book?
That signal drives search ranking, personalized CTAs, and inventory allocation. It has to separate three kinds of users in milliseconds:
- Active browsers - many clicks in the last hour, deep dwell time, pace accelerating
- Casual scrollers - a handful of short glances before they leave
- Dormant returners - quiet for a day, used to be active last week
Batch pipelines that refresh hourly can't answer this. By the time the aggregation lands, the user is gone.
Why this feature is hard
The clickstream is big and noisy: 5,000 events/sec sustained, peaking at 10,000 EPS. Each event is ~250 bytes of (user_id, experience_id, category, dwell_time_sec, timestamp).
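To make that scale concrete, here is a back-of-envelope calculation. It reuses only the figures above (5,000 EPS sustained, 10,000 EPS peak, ~250 bytes per event) and assumes 1 MB = 10^6 bytes. The last figure matters most for feature design: at sustained load, a 7-day window spans roughly three billion raw events, far too many to rescan on every request.

```python
# Back-of-envelope clickstream volume from the figures quoted above.
SUSTAINED_EPS = 5_000
PEAK_EPS = 10_000
EVENT_BYTES = 250          # approximate size of one click event
SECONDS_PER_DAY = 86_400


def mb_per_sec(eps: int, event_bytes: int = EVENT_BYTES) -> float:
    """Ingest bandwidth in MB/s (1 MB = 1,000,000 bytes)."""
    return eps * event_bytes / 1_000_000


def events_in_window(eps: int, window_sec: int) -> int:
    """Raw events a window of the given length covers at a steady rate."""
    return eps * window_sec


sustained_bw = mb_per_sec(SUSTAINED_EPS)
peak_bw = mb_per_sec(PEAK_EPS)
events_per_day = events_in_window(SUSTAINED_EPS, SECONDS_PER_DAY)
gb_per_day = events_per_day * EVENT_BYTES / 1e9
events_7d = events_in_window(SUSTAINED_EPS, 7 * SECONDS_PER_DAY)

print(f"sustained {sustained_bw} MB/s, peak {peak_bw} MB/s")
print(f"{events_per_day:,} events/day ({gb_per_day} GB/day)")
print(f"a 7d window spans {events_7d:,} raw events")
```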
Naive approaches all miss:
- Batch in Snowflake/Spark - too slow; creates training-serving skew between daily training data and streaming serving features
- Redis counters - require hand-rolled windowed aggregation; no temporal joins; awkward to scale with user cardinality
- Analytics warehouses (Redshift, BigQuery) - not built for streaming ingestion paired with sub-5 ms point lookups
What you actually need is windowed aggregations across 1h, 24h, 7d, temporal-joined against user profiles, producing a composite intent signal - at ranking-request latency, with zero offline/online drift.
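A temporal (as-of) join attaches to each click the profile version that was valid at the click's timestamp, not the current one. The sketch below is a plain-Python illustration with toy data, not Thyme's implementation. The ProfileVersion and as_of names are hypothetical, and the sketch assumes each user's profile history is sorted by timestamp.

```python
from __future__ import annotations

from bisect import bisect_right
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class ProfileVersion:
    ts: datetime      # when this profile version became valid
    segment: str


def as_of(history: list[ProfileVersion], event_ts: datetime) -> ProfileVersion | None:
    """Latest profile version with ts <= event_ts; history must be sorted by ts."""
    timestamps = [p.ts for p in history]
    idx = bisect_right(timestamps, event_ts)
    return history[idx - 1] if idx > 0 else None


# Toy history: the user moved from "new" to "loyal" on June 10.
history = [
    ProfileVersion(datetime(2024, 6, 1), "new"),
    ProfileVersion(datetime(2024, 6, 10), "loyal"),
]
for click_ts in [datetime(2024, 5, 30), datetime(2024, 6, 5), datetime(2024, 6, 12)]:
    version = as_of(history, click_ts)
    print(click_ts.date(), version.segment if version else None)
```

A join against only the latest profile would label the June 5 click "loyal", leaking a later state into training data. The as-of lookup labels it "new", which is what the online system could actually have known at that moment.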
Why Thyme's architecture fits
Thyme's kappa architecture pulls this into a single pipeline definition:
- Temporal joins - every click is enriched with the user's profile as of the click's timestamp, not the latest profile. Guarantees training data matches online serving.
- Invertible operators - Count and Sum evict expired window contents in O(1). 7-day windows aren't meaningfully more expensive than 1-hour windows.
- Derived extractors at query time - composite signals like is_high_intent are plain Python, executed at read time over the pre-aggregated values. No extra storage, no pre-materialization.
- One definition, both paths - the same features.py serves online ranking and offline training sets.
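The invertible-operator idea can be sketched in plain Python. The SlidingWindow class below is hypothetical, not Thyme's implementation. It keeps a running Count and Sum per window, adds each arriving event, and subtracts events as they expire, so each expired event costs O(1) and Avg falls out as Sum / Count. The sketch assumes events arrive in timestamp order and keeps every in-window event in memory. A real engine would also have to handle late data and would likely bucket events to bound state.

```python
from __future__ import annotations

from collections import deque
from dataclasses import dataclass, field


@dataclass
class SlidingWindow:
    """Hypothetical windowed Count/Sum/Avg with O(1) work per expired event."""

    window_sec: float
    events: deque = field(default_factory=deque)  # (ts, value) pairs inside the window
    count: int = 0
    total: float = 0.0

    def add(self, ts: float, value: float) -> None:
        # Apply the new event, then drop anything that fell out of the window.
        self.events.append((ts, value))
        self.count += 1
        self.total += value
        self.evict(ts)

    def evict(self, now: float) -> None:
        # Invert Count and Sum for each expired event instead of recomputing.
        while self.events and self.events[0][0] <= now - self.window_sec:
            _, old_value = self.events.popleft()
            self.count -= 1
            self.total -= old_value

    def avg(self) -> float | None:
        return self.total / self.count if self.count else None


# One window per horizon, all fed the same (timestamp_sec, dwell_sec) events.
windows = {
    "1h": SlidingWindow(3_600),
    "24h": SlidingWindow(86_400),
    "7d": SlidingWindow(7 * 86_400),
}
for ts, dwell in [(0, 10.0), (1_800, 20.0), (4_000, 30.0)]:
    for w in windows.values():
        w.add(ts, dwell)

for name, w in windows.items():
    print(name, w.count, w.total, w.avg())
```

Calling evict(now) before a read also ages out events for users who have gone quiet. This is how a dormant user's 1h and 24h counts fall to zero while the 7d count persists.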
The Thyme solution
The feature definition, trimmed from demos/experience_discovery/features.py:
```python
from datetime import datetime

from thyme import (
    Avg, Config, Count, Sum, dataset, extractor, extractor_inputs,
    extractor_outputs, feature, featureset, field, inputs, pipeline, source,
)
from thyme.connectors import KinesisSource
from thyme.dataset import Field

config = Config.load()

# Raw clickstream from Kinesis, starting from the oldest record still in the stream.
click_source = KinesisSource(
    stream_arn=config.kinesis.stream_arn,
    region=config.kinesis.region,
    init_position="trim_horizon",
    format="json",
)

# User profiles come from a Postgres table.
profile_source = config.postgres_source(table="user_profiles")


@source(click_source, max_lateness="1h")
@dataset(version=1)
class ClickEvent:
    user_id: Field[str] = field(key=True)
    event_type: Field[str] = field()
    experience_id: Field[str] = field()
    category: Field[str] = field()
    location_id: Field[str] = field()
    dwell_time_sec: Field[float] = field()
    timestamp: Field[datetime] = field(timestamp=True)


# Profiles are pulled using the timestamp column as the cursor, every 60s.
@source(profile_source, cursor="timestamp", every="60s", max_lateness="1h")
@dataset(version=1, index=True)
class UserProfile:
    user_id: Field[str] = field(key=True)
    segment: Field[str] = field()
    home_country: Field[str] = field()
    account_age_days: Field[int] = field()
    timestamp: Field[datetime] = field(timestamp=True)


@dataset(version=2, index=True)
class UserEngagementStats:
    user_id: Field[str] = field(key=True)
    event_count_1h: Field[int] = field()
    event_count_24h: Field[int] = field()
    event_count_7d: Field[int] = field()
    avg_dwell_sec_1h: Field[float] = field()
    avg_dwell_sec_24h: Field[float] = field()
    avg_dwell_sec_7d: Field[float] = field()
    sum_dwell_sec_1h: Field[float] = field()
    sum_dwell_sec_24h: Field[float] = field()
    timestamp: Field[datetime] = field(timestamp=True)

    # Enrich each click with the profile as of its timestamp,
    # then aggregate per user over the 1h, 24h and 7d windows.
    @pipeline(version=1)
    @inputs(ClickEvent, UserProfile)
    def compute_engagement(cls, clicks):
        return (
            clicks
            .join(UserProfile, on="user_id", fields=["segment", "account_age_days"])
            .groupby("user_id")
            .aggregate(
                event_count_1h=Count(window="1h"),
                event_count_24h=Count(window="24h"),
                event_count_7d=Count(window="7d"),
                avg_dwell_sec_1h=Avg(of="dwell_time_sec", window="1h"),
                avg_dwell_sec_24h=Avg(of="dwell_time_sec", window="24h"),
                avg_dwell_sec_7d=Avg(of="dwell_time_sec", window="7d"),
                sum_dwell_sec_1h=Sum(of="dwell_time_sec", window="1h"),
                sum_dwell_sec_24h=Sum(of="dwell_time_sec", window="24h"),
            )
        )


@featureset
class DiscoverySignals:
    user_id: str = feature()

    # Raw aggregations - auto-generated lookup extractors share state reads
    # across all eight features.
    event_count_1h: int = feature(ref=UserEngagementStats.event_count_1h)
    event_count_24h: int = feature(ref=UserEngagementStats.event_count_24h)
    event_count_7d: int = feature(ref=UserEngagementStats.event_count_7d)
    avg_dwell_sec_1h: float = feature(ref=UserEngagementStats.avg_dwell_sec_1h)
    avg_dwell_sec_24h: float = feature(ref=UserEngagementStats.avg_dwell_sec_24h)
    avg_dwell_sec_7d: float = feature(ref=UserEngagementStats.avg_dwell_sec_7d)
    sum_dwell_sec_1h: float = feature(ref=UserEngagementStats.sum_dwell_sec_1h)
    sum_dwell_sec_24h: float = feature(ref=UserEngagementStats.sum_dwell_sec_24h)

    # Derived signals
    engagement_velocity: float = feature()
    dwell_depth_score: float = feature()
    session_intensity: float = feature()
    activity_trend_7d: float = feature()
    is_high_intent: bool = feature()
    browsing_momentum: float = feature()

    @extractor
    @extractor_inputs(
        "event_count_1h", "event_count_24h", "event_count_7d",
        "avg_dwell_sec_1h", "avg_dwell_sec_24h", "avg_dwell_sec_7d",
        "sum_dwell_sec_1h",
    )
    @extractor_outputs(
        "engagement_velocity", "dwell_depth_score", "session_intensity",
        "activity_trend_7d", "is_high_intent", "browsing_momentum",
    )
    def compute_derived_signals(
        cls, ts, count_1h, count_24h, count_7d,
        avg_dwell_1h, avg_dwell_24h, avg_dwell_7d, sum_dwell_1h,
    ):
        # Baselines are floored so new or dormant users never divide by zero.
        hourly_rate_24h = max(count_24h / 24.0, 1.0)
        daily_rate_7d = max(count_7d / 7.0, 1.0)
        hourly_avg_7d = max(count_7d / 168.0, 0.01)

        # Last hour vs. the 24h hourly baseline.
        engagement_velocity = count_1h / hourly_rate_24h
        dwell_depth_score = (avg_dwell_24h or 0.0) / max(avg_dwell_7d or 1.0, 1.0)
        session_intensity = (sum_dwell_1h or 0.0) / max(count_1h, 1)
        # Last 24h vs. the 7-day daily average.
        activity_trend_7d = count_24h / daily_rate_7d
        is_high_intent = (
            engagement_velocity > 2.0
            and (avg_dwell_1h or 0.0) >= 30.0
            and activity_trend_7d > 1.0
        )
        # Last hour vs. the 7-day hourly average.
        browsing_momentum = count_1h / hourly_avg_7d
        return {
            "engagement_velocity": engagement_velocity,
            "dwell_depth_score": dwell_depth_score,
            "session_intensity": session_intensity,
            "activity_trend_7d": activity_trend_7d,
            "is_high_intent": is_high_intent,
            "browsing_momentum": browsing_momentum,
        }
```
Architecture at a glance
Write path: Kinesis click events flow through the engine's join + windowed aggregation, materializing UserEngagementStats.
Read path: the query server executes the derived extractor over the pre-aggregated values and returns 15 features. Server-side P50 latency stays below 5 ms. End-to-end latency, including network, stays below 50 ms.
The is_high_intent signal
The composite extractor sets is_high_intent to true only when all three conditions hold:
- engagement_velocity > 2 - clicking more than 2× faster than their 24-hour baseline
- avg_dwell_sec_1h ≥ 30 - actually reading listings (30+ seconds on average), not scrolling
- activity_trend_7d > 1 - more active over the last 24 hours than their 7-day daily average
All three are computed in Python at query time over the pre-aggregated state.
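Written out from compute_derived_signals, the rule is as follows. Here $c_{1h}, c_{24h}, c_{7d}$ are the event counts, $\bar{d}_{1h}$ is the 1-hour average dwell in seconds, $v$ is engagement_velocity and $a$ is activity_trend_7d.

$$
v = \frac{c_{1h}}{\max(c_{24h}/24,\ 1)}, \qquad
a = \frac{c_{24h}}{\max(c_{7d}/7,\ 1)}, \qquad
\text{high intent} \iff v > 2 \ \land\ \bar{d}_{1h} \ge 30 \ \land\ a > 1
$$

Both baselines are floored at 1. For a user with fewer than 24 events in the last day, this gives $v = c_{1h}$, so the velocity test alone requires at least 3 clicks in the past hour. Similarly, with fewer than 7 events in the last week, $a = c_{24h}$.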
Production results
Smoke test: persona verification
| Entity | event_count_1h | avg_dwell_1h | engagement_velocity | is_high_intent |
|---|---|---|---|---|
| u_active_browser | 20 | ~30s | ~16.0 | true |
| u_casual_visitor | 2 | ~4s | ~baseline | false |
| u_dormant_returner | 0 | - | 0.0 | false |
u_dormant_returner still has 20 events in its 7-day window. As soon as the user returns, engagement_velocity and is_high_intent update on the next flush.
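To sanity-check the persona rows, the sketch below re-implements the inputs to is_high_intent from compute_derived_signals in plain Python. The function name intent_inputs is hypothetical. The 1-hour counts and dwell times follow the table. The 24-hour and 7-day counts, and the "after returning" scenario (5 clicks averaging 40 s), are assumed values chosen for illustration.

```python
from __future__ import annotations


def intent_inputs(count_1h: int, count_24h: int, count_7d: int,
                  avg_dwell_1h: float | None) -> tuple[float, float, bool]:
    """Same formulas as compute_derived_signals, limited to is_high_intent's inputs."""
    hourly_rate_24h = max(count_24h / 24.0, 1.0)
    daily_rate_7d = max(count_7d / 7.0, 1.0)
    engagement_velocity = count_1h / hourly_rate_24h
    activity_trend_7d = count_24h / daily_rate_7d
    is_high_intent = (
        engagement_velocity > 2.0
        and (avg_dwell_1h or 0.0) >= 30.0
        and activity_trend_7d > 1.0
    )
    return engagement_velocity, activity_trend_7d, is_high_intent


# (count_1h, count_24h, count_7d, avg_dwell_1h); 24h/7d counts are assumed.
personas = {
    "u_active_browser": (20, 30, 35, 30.0),
    "u_dormant_returner": (0, 0, 20, None),
    "u_dormant_returner, after returning": (5, 5, 25, 40.0),
}
for name, args in personas.items():
    velocity, trend, high = intent_inputs(*args)
    print(f"{name}: velocity={velocity:.1f} trend={trend:.2f} high_intent={high}")
```

With these assumed counts, the active browser's velocity comes out at exactly 16.0, in line with the table. The returning user flips to high intent because the 24h baseline is floored at 1, so five fresh clicks are enough to clear the velocity test.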
AWS production results (10k users, 5k EPS live load)
Tested on EKS (us-east-1), Kinesis on-demand (5–10 shards active), RDS Postgres for profiles, c7g Graviton nodes.
| Metric | Value |
|---|---|
| Sustained write throughput | 5,000 EPS |
| Peak write throughput | 10,000 EPS |
| Unique users under live load | 10,000 |
| Read latency (server-side P50) | < 5 ms |
| Read latency (end-to-end incl. network) | < 50 ms |
| Features served per query | 15 (user_id + 8 raw aggregations + 6 derived) |
| Time windows | 3 (1h, 24h, 7d) |
| Online/offline parity | 100% |
| Lines of Python | ~40 |
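The "5–10 shards active" figure in the test setup matches Kinesis Data Streams' per-shard write limits of 1,000 records/s and 1 MB/s. The sketch below assumes one Kinesis record per click (no record aggregation) and the ~250-byte event size quoted earlier. The min_shards helper is hypothetical, and it only computes a floor: on-demand mode adds shards on its own.

```python
import math

# Kinesis Data Streams per-shard write limits.
SHARD_MAX_RECORDS_PER_SEC = 1_000
SHARD_MAX_BYTES_PER_SEC = 1_000_000  # "1 MB/s"; decimal MB is the conservative reading


def min_shards(events_per_sec: int, event_bytes: int) -> int:
    """Lower bound on shards, assuming one Kinesis record per click event."""
    by_records = math.ceil(events_per_sec / SHARD_MAX_RECORDS_PER_SEC)
    by_bytes = math.ceil(events_per_sec * event_bytes / SHARD_MAX_BYTES_PER_SEC)
    return max(by_records, by_bytes, 1)


for label, eps in [("sustained", 5_000), ("peak", 10_000)]:
    print(label, min_shards(eps, 250))
```

At 250 bytes per event, the record-count limit, not bandwidth, sets the shard count. That gives 5 shards at sustained load and 10 at peak.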
Comparison with other platforms
| Capability | Thyme | Tecton | Fennel | Feast |
|---|---|---|---|---|
| Kinesis streaming connector | Built-in | Built-in | Built-in | Requires external |
| Temporal join (SCD-style) | Native (.join(..., fields=[...])) | Native | Native | Not supported |
| Multi-window aggregation | Single pipeline definition | Batch + streaming pipelines | Single definition (lambda internally) | Batch only |
| Composite Python extractors | Query-time | On-demand features | Python transforms | Not supported |
| Online/offline consistency | Guaranteed (kappa) | Reconciled (lambda) | Reconciled (lambda) | Manual |
Reproducing this on your own data
Point ClickEvent at your own Kinesis stream and UserProfile at your user-profile table, commit, and the 15-feature DiscoverySignals set is live within seconds.
```bash
thyme commit features.py
curl -H "Authorization: Bearer $THYME_API_KEY" \
  "$THYME_BASE_URL/features?entity_id=u_active_browser&featureset=DiscoverySignals"
```
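To call the same endpoint from a Python service, here is a minimal standard-library sketch that mirrors the curl command: same URL path, query parameters and bearer header. The function names are hypothetical. The response is assumed to be a JSON object, since its schema isn't shown here.

```python
import json
import os
import urllib.parse
import urllib.request


def build_feature_request(base_url: str, api_key: str, entity_id: str,
                          featureset: str = "DiscoverySignals") -> urllib.request.Request:
    """Build the same GET request as the curl example above."""
    query = urllib.parse.urlencode({"entity_id": entity_id, "featureset": featureset})
    return urllib.request.Request(
        f"{base_url.rstrip('/')}/features?{query}",
        headers={"Authorization": f"Bearer {api_key}"},
    )


def fetch_features(entity_id: str) -> dict:
    """Send the request; assumes the server answers with a JSON object."""
    req = build_feature_request(
        os.environ["THYME_BASE_URL"], os.environ["THYME_API_KEY"], entity_id
    )
    with urllib.request.urlopen(req, timeout=5) as resp:
        return json.load(resp)
```

With THYME_BASE_URL and THYME_API_KEY exported, call fetch_features("u_active_browser") to issue the same lookup as the curl command.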