Every large consumer product has a surface that answers the question of what is hot right now. Trending hashtags on a social network, most-viewed videos on a streaming site, hot search terms in a search box, and top products on a storefront all look different to users, yet behind each of them sits the same service, one that watches an enormous stream of access events and continuously maintains, for several time windows, the K items with the highest counts. The windows are part of the product rather than an implementation detail, because trending in the last five minutes and most viewed today are different features with different freshness promises, so a real design tracks something like the last 1 minute, 5 minutes, 1 hour, and 1 day, with K between 10 and 100. A user glancing at the trending panel should see it move within seconds of a genuine surge, while an analyst pulling yesterday's top 100 should get numbers that survive an audit, and those two expectations pull the design in opposite directions.
Interviewers reach for this question because the obvious answer does not survive contact with the numbers, and the standard escape, which is accepting approximation on the hot path, requires explaining a probabilistic data structure carefully rather than waving at it. The hard parts live in the memory math, in the count-min sketch's error guarantees, and in a subtle distributed problem, namely that combining per-node top-K lists is not as sound as it looks.
Scope and requirements
Functionally, the system ingests access events, each carrying an item ID and a timestamp, and serves a query of the form "give me the top K items for window W," where W is one of the fixed windows above. Results should be fresh within a few seconds for the short windows, and the service should also answer historical questions such as yesterday's top videos, which implies keeping exact daily results around rather than letting them expire with the stream. K stays fixed and small, and arbitrary ad hoc windows stay out of scope, a restriction worth defending out loud, because fixed windows are what the product actually shows and they are what makes precomputation possible at all. A system that promised top K over any user-chosen interval would have to keep per-item counts at fine granularity forever, which is a different and far more expensive product.
Non-functionally, assume one million events per second at peak, query latency of a few tens of milliseconds since the lists render on home pages, and high availability on the read side. The crucial relaxation to negotiate early is accuracy. Short-window lists may be approximate, because no user can tell whether the fifth trending hashtag really has 1,002,113 views or 1,001,890, and nothing downstream breaks if ranks nine and ten briefly swap. The daily results that feed reporting and payouts should be exact, because money and contracts hang off them. That split in accuracy requirements is the design, in the sense that once an interviewer agrees the five-minute list can be a little wrong, the two-path architecture below follows almost mechanically.
Sizing the problem
Start with why exact counting in memory fails, because the failure is quantitative rather than conceptual. At one million events per second, a day sees 86.4 billion events, and a popular platform can plausibly touch 500 million distinct items in that day across videos, hashtags, and queries. A hash table entry holding an 8-byte item identifier, an 8-byte counter, and roughly 34 bytes of pointer and bucket overhead costs about 50 bytes, so an exact table over a day's distinct items is 500 million times 50 bytes, about 25 GB, for one window on one machine. The 1-hour, 5-minute, and 1-minute windows each need their own counts, and sliding those windows means keeping per-interval tables, so the total comfortably exceeds any single node's memory, while sharding it everywhere makes every query a scatter-gather over hundreds of gigabytes of state that must still answer in tens of milliseconds. Exactness is affordable in batch on cheap storage, where the same 86 billion events stream from disk across many workers, but not in RAM on the hot path.
Bandwidth, by contrast, stays manageable. A million events per second at roughly 100 bytes each is 100 MB/s into the ingestion tier, which one well-provisioned queue cluster absorbs without drama, and the read side is lighter still, since users fetch a few kilobytes of precomputed list rather than anything proportional to event volume. The bottleneck is memory for counting rather than the pipe, and that observation points directly at the two-path architecture, where the hot path holds a compressed summary of the stream and the cold path holds the stream itself.
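The sizing arithmetic above is easy to reproduce. The short Python sketch below uses only the figures quoted in this section (one million events per second, 500 million distinct items per day, about 50 bytes per hash-table entry, about 100 bytes per event); the variable names are illustrative.

```python
# Back-of-envelope sizing with the figures quoted in the text.
EVENTS_PER_SECOND = 1_000_000
SECONDS_PER_DAY = 86_400
DISTINCT_ITEMS_PER_DAY = 500_000_000
BYTES_PER_ENTRY = 8 + 8 + 34   # item ID + counter + pointer and bucket overhead
BYTES_PER_EVENT = 100          # rough size of one access event on the wire

events_per_day = EVENTS_PER_SECOND * SECONDS_PER_DAY
exact_table_bytes = DISTINCT_ITEMS_PER_DAY * BYTES_PER_ENTRY
ingest_bytes_per_second = EVENTS_PER_SECOND * BYTES_PER_EVENT

print(f"events per day:   {events_per_day / 1e9:.1f} billion")
print(f"exact day table:  {exact_table_bytes / 1e9:.0f} GB for one window")
print(f"ingest bandwidth: {ingest_bytes_per_second / 1e6:.0f} MB/s")
```

The memory figure grows with the number of distinct items, while the bandwidth figure grows only with event rate, which is why the counting state is the part that needs compressing.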
The query interface
The read side is a small HTTP API over precomputed lists, which keeps query latency independent of event volume, since a request reads a stored answer instead of computing one. Writes are not an API at all from the product's perspective, because services simply emit access events to a queue topic, and everything downstream of that topic is this system's internals, free to be rebuilt without any caller noticing.
GET /v1/top?window=5m&k=10
→ 200 {
  "window": "5m",
  "as_of": "2025-04-18T17:42:30Z",
  "items": [
    { "item_id": "video-8841", "count": 1250410 },
    { "item_id": "video-1029", "count": 983112 }
  ],
  "exact": false
}

GET /v1/top?window=1d&date=2025-04-17&k=100 → 200 { ..., "exact": true }
The exact flag is product-facing because the two paths carry different guarantees, and downstream consumers such as reporting jobs should be able to insist on the exact variant rather than discovering months later that a revenue dashboard was built on estimates.
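A consumer that must never run on estimates can enforce that at the point where it parses the response. The sketch below is one hypothetical way to do so: the function name require_exact and the sample body are made up for illustration, and only the field names (items, item_id, count, exact) come from the example response above.

```python
import json

def require_exact(response_body: str) -> list[tuple[str, int]]:
    """Parse a /v1/top response and refuse it unless it is marked exact."""
    payload = json.loads(response_body)
    if payload.get("exact") is not True:
        raise ValueError("reporting requires an exact list, got an estimate")
    return [(entry["item_id"], entry["count"]) for entry in payload["items"]]

# Illustrative body only; the values are invented for this example.
sample_body = json.dumps({
    "window": "1d",
    "items": [{"item_id": "video-8841", "count": 1250410}],
    "exact": True,
})
daily_top = require_exact(sample_body)
```

Failing loudly on a missing or false flag keeps an approximate list from slipping into a report by default.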
The high-level architecture
All events flow into a message queue, which fans them out to two consumers with opposite trade-offs. The fast path is a fleet of stream counters that maintain approximate counts in fixed memory and push fresh top-K lists to a serving store every second or two, so the trending panel moves while the surge is still happening. The slow path holds the same events durably in the queue or an archive, and batch jobs aggregate them into exact per-window counts hourly and daily, overwriting the approximate results for those windows after the fact, so the permanent record is always the audited one. Queries always hit the serving store and never the counters, which keeps reads cheap and means the counting tier can crash, restart, or be rebuilt without a user-visible event. This is the lambda-architecture shape, a low-latency approximate layer and a high-latency exact layer converging on one serving view, and it earns its keep here precisely because the product itself distinguishes fresh-and-rough from final-and-exact. A single path that is both fresh and exact does exist, and the slow-path section returns to when a stream processor delivers it.
Events flow once into the queue and twice out of it, with the stream counters publishing approximate top-K lists within seconds and the batch aggregator replacing them with exact lists hours later. Queries only ever read the store.
The fast path pairs a count-min sketch with a heap
The fast path needs approximate counts for every item in bounded memory, and the standard tool is the count-min sketch. A count-min sketch is a small two-dimensional array of counters, d rows by w columns, with one independent hash function per row. Recording an event for item x means computing each row's hash of x, which picks one column in that row, and incrementing that single counter in each of the d rows, so an update costs d hashes and d additions no matter how many distinct items exist. Estimating x's count means reading the same d counters and taking the minimum. Every counter is shared by many items, so each cell only ever over-counts, since collisions add and never subtract, and the minimum across rows is the best single cell to trust because it is the cell where x collided with the least extra traffic. Estimates therefore never undershoot the true count and overshoot by a bounded amount, a one-sided error that matters for the product, since a tail item can be promoted a little but a genuinely hot item can never be hidden.
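The update and estimate rules described above fit in a few lines. The following is a minimal Python sketch, not a production implementation: it assumes string item IDs, and it assumes that salting BLAKE2b with the row index is an acceptable stand-in for d independent hash functions. The class and method names are illustrative.

```python
import hashlib

class CountMinSketch:
    """d rows by w columns of counters, one hash function per row."""

    def __init__(self, width: int, depth: int) -> None:
        self.width = width
        self.depth = depth
        self.rows = [[0] * width for _ in range(depth)]

    def _column(self, row: int, item: str) -> int:
        # Assumption: a per-row salt approximates independent hash functions.
        salt = row.to_bytes(8, "little")
        digest = hashlib.blake2b(item.encode(), digest_size=8, salt=salt).digest()
        return int.from_bytes(digest, "little") % self.width

    def add(self, item: str, count: int = 1) -> None:
        # One increment per row, at the column that row's hash selects.
        for row in range(self.depth):
            self.rows[row][self._column(row, item)] += count

    def estimate(self, item: str) -> int:
        # Every cell over-counts, so the smallest cell is the tightest bound.
        return min(self.rows[row][self._column(row, item)]
                   for row in range(self.depth))

sketch = CountMinSketch(width=1024, depth=7)
for _ in range(500):
    sketch.add("video-8841")
for i in range(2000):
    sketch.add(f"tail-{i}")

hot_estimate = sketch.estimate("video-8841")   # never below the true 500
```

The sketch is deliberately small here so that collisions actually occur; the estimate for the hot item can exceed 500 but cannot fall below it.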
The dimensions come from two knobs, and walking the arithmetic shows how cheaply the guarantee is bought. The width controls the error, in that with total event count N the estimate exceeds the truth by at most e divided by w, times N, where e is Euler's number, about 2.718. The depth controls the confidence, since each row on its own misses that bound with probability at most 1 over e, so the failure probability shrinks by a factor of e with each added row, and the convenient form is d equal to the natural log of 1 over delta for failure probability delta. Choosing an error of 0.001 percent of N means w must be 2.718 divided by 0.00001, about 272,000 columns, and choosing delta of 0.1 percent means d is the natural log of 1,000, about 7 rows. Multiplying gives 272,000 times 7, roughly 1.9 million counters, and at 4 bytes each the whole sketch is about 7.6 MB. Against the 25 GB exact table that is more than three orders of magnitude smaller, with a price that is quantified rather than vague. Over a one-minute window of 60 million events, the error bound is 0.00001 times 60 million, about 600 counts, and an item contending for a top-10 slot in such a window has hundreds of thousands of accesses, so 600 counts of noise cannot confuse a genuinely hot item with the long tail.
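The same arithmetic can be checked mechanically. This Python sketch applies the standard sizing rules, w = e / epsilon and d = ln(1 / delta), to the numbers used above; the function name and the 4-byte counter size are taken as assumptions from the text, not from any particular library.

```python
import math

def sketch_dimensions(epsilon: float, delta: float) -> tuple[int, int]:
    """Return (width, depth) for error epsilon * N with failure probability delta."""
    width = math.ceil(math.e / epsilon)
    depth = math.ceil(math.log(1.0 / delta))
    return width, depth

EPSILON = 1e-5          # error of 0.001 percent of N
DELTA = 1e-3            # failure probability of 0.1 percent
BYTES_PER_COUNTER = 4

width, depth = sketch_dimensions(EPSILON, DELTA)
counters = width * depth
sketch_megabytes = counters * BYTES_PER_COUNTER / 1e6
error_bound = EPSILON * 60_000_000   # one-minute window at 1M events/s

print(f"width={width}, depth={depth}, counters={counters}")
print(f"sketch size: {sketch_megabytes:.1f} MB, error bound: {error_bound:.0f} counts")
```

Halving epsilon doubles the width and the memory, while tightening delta by another factor of 1,000 adds only about seven more rows, so confidence is the cheaper of the two knobs.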
One event for item x increments exactly one counter per row, at the column chosen by that row's hash. Reading takes the minimum of the same cells, so collisions can only inflate the answer, never hide it.
The sketch alone answers how many times x was accessed but not which items were accessed most, because a sketch can only be probed, never enumerated. The companion structure is a min-heap of the current K best candidates, maintained beside the sketch. For every incoming event, the counter estimates that item's count from the sketch, and if the estimate exceeds the smallest count in the heap, the item is inserted or updated and the minimum evicted, so the heap converges on the heavy hitters without ever scanning anything. A heap of K equal to 100 entries costs a few kilobytes, and the pair of structures per window per counter node stays under 10 MB, which is why each counting node can maintain all four windows simultaneously rather than needing a fleet per window. Windows are handled by keeping one sketch-plus-heap per interval, for example sixty 1-second sketches whose contents merge for the rolling minute, or more simply one sketch per tumbling window that resets on the boundary, which is what most products actually ship because users cannot tell a sliding minute from a minute that ticks. The named alternative to this whole arrangement is the space-saving family of counters, which keeps a fixed table of a few thousand candidate items and evicts the smallest entry when a new item arrives, answering the same product question in even less memory. The sketch-plus-heap pair wins here because sketches merge by simple addition across intervals and machines while merging space-saving tables is fussier, and because the sketch's error bound is easier to state, monitor, and defend in a design review.
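The pairing of sketch and candidate set can be sketched as follows. This is an illustrative Python version under stated assumptions: the class names are hypothetical, the hash scheme is a salted BLAKE2b stand-in for independent hashes, and for brevity a dict with a linear scan for the weakest entry stands in for the min-heap, which is affordable because K is at most about 100.

```python
import hashlib
import random

class CountMinSketch:
    def __init__(self, width: int, depth: int) -> None:
        self.width, self.depth = width, depth
        self.rows = [[0] * width for _ in range(depth)]

    def _column(self, row: int, item: str) -> int:
        salt = row.to_bytes(8, "little")
        digest = hashlib.blake2b(item.encode(), digest_size=8, salt=salt).digest()
        return int.from_bytes(digest, "little") % self.width

    def add(self, item: str) -> None:
        for row in range(self.depth):
            self.rows[row][self._column(row, item)] += 1

    def estimate(self, item: str) -> int:
        return min(self.rows[r][self._column(r, item)] for r in range(self.depth))

class TopKTracker:
    """Keeps the K candidates with the largest sketch estimates."""

    def __init__(self, k: int, sketch: CountMinSketch) -> None:
        self.k = k
        self.sketch = sketch
        self.candidates: dict[str, int] = {}   # item -> latest estimate

    def observe(self, item: str) -> None:
        self.sketch.add(item)
        estimate = self.sketch.estimate(item)
        if item in self.candidates or len(self.candidates) < self.k:
            self.candidates[item] = estimate
            return
        # Dict scan in place of a heap pop: find the current weakest candidate.
        weakest = min(self.candidates, key=self.candidates.get)
        if estimate > self.candidates[weakest]:
            del self.candidates[weakest]
            self.candidates[item] = estimate

    def top(self) -> list[tuple[str, int]]:
        return sorted(self.candidates.items(), key=lambda kv: kv[1], reverse=True)

events = ["a"] * 300 + ["b"] * 200 + ["c"] * 100 + [f"tail-{i}" for i in range(500)]
random.Random(7).shuffle(events)

tracker = TopKTracker(k=3, sketch=CountMinSketch(width=2048, depth=5))
for item in events:
    tracker.observe(item)
print(tracker.top())
```

The candidate set stores identities and the sketch stores counts; a real deployment would swap the dict scan for a heap with an index so that updates stay logarithmic at larger K.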
Distribution introduces the subtle flaw worth naming unprompted, because it looks like a routing detail and quietly decides correctness. With events partitioned across, say, three counter nodes by item hash, per-node top-K lists merge cleanly, since every event for a given item lands on one node and that node's estimate is therefore the global estimate. But if events are partitioned by source or arrive at whatever node is nearest, an item's count is split across nodes, and merging per-node top-K lists becomes approximate in a way no sketch math fixes. An item that ranks eleventh on each of three nodes is invisible in every top-10 list they report, yet its summed count could place it fifth globally. Concretely, if each node's tenth item has 1,000 local accesses and item X has 900 on every node, X's true total of 2,700 beats any item that appears on only one node with 1,500, but no merger of the three top-10 lists will ever see X, and no metric will reveal the omission. The fix is to partition events by item ID so each item is counted whole on one node, and that single routing decision is the difference between a sound design and a quietly wrong one.
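The numbers in that example can be replayed directly. The Python sketch below builds three hypothetical nodes, each with ten local items at 1,000 or more accesses plus item X at 900, then compares a naive merge of the per-node top-10 lists with the true global totals. The item names are invented for illustration.

```python
from collections import Counter

K = 10

def local_counts(node_index: int) -> Counter:
    # Ten node-local items at 1,000+ accesses, plus X at 900 on every node.
    counts = Counter({f"node{node_index}-item{i}": 1000 + i for i in range(K)})
    counts["X"] = 900
    return counts

nodes = [local_counts(n) for n in range(3)]

# Naive merge: each node reports only its top K, and the merger sums those.
merged = Counter()
for counts in nodes:
    for item, count in counts.most_common(K):
        merged[item] += count
naive_top = [item for item, _ in merged.most_common(K)]

# Ground truth: sum the full per-node counts before ranking.
truth = Counter()
for counts in nodes:
    truth.update(counts)
true_top = [item for item, _ in truth.most_common(K)]

print("X in naive merge:", "X" in naive_top)
print("true leader:", true_top[0], truth[true_top[0]])
```

X never appears in any reported list, so the merger has no way to know it exists, which is why the remedy has to be applied at routing time and not at merge time.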
The slow path and the serving store
The slow path exists because the product also needs exact, auditable numbers and a recovery story. The queue retains raw events for a few days, and an archival consumer copies them into cheap blob storage partitioned by hour, where a month of history costs less than a single counting node's RAM. Batch jobs in the MapReduce style then compute exact counts per item per window, with the map phase emitting an item-and-one pair from each event in each file, the shuffle grouping by item, and the reduce phase summing and taking the true top K with a full sort of counts. That work is easy offline even at 86 billion events because it streams from disk across many workers rather than living in RAM, which is the same arithmetic that ruled out exact online counting now working in the design's favor. Hourly jobs finalize the 1-hour windows and daily jobs finalize the days, writing results into the same serving store keyed by window, where they overwrite the fast path's approximations. Reconciliation falls out for free, since the difference between the approximate and exact lists for the same window is a continuous measurement of sketch quality, and a growing gap is the alert that the fast path needs wider sketches or has developed a routing defect.
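The map, shuffle, and reduce steps described above can be mimicked in a single process to show their shape. This is a toy Python sketch, assuming events are (item_id, timestamp) tuples already loaded in memory; a real batch job would stream them from blob storage across workers, and the function names here are illustrative.

```python
from collections import defaultdict
from typing import Iterable, Iterator

Event = tuple[str, int]   # (item_id, timestamp)

def map_phase(events: Iterable[Event]) -> Iterator[tuple[str, int]]:
    # Emit an (item, 1) pair for every event.
    for item_id, _timestamp in events:
        yield item_id, 1

def shuffle(pairs: Iterable[tuple[str, int]]) -> dict[str, list[int]]:
    # Group all values that share an item ID.
    groups: dict[str, list[int]] = defaultdict(list)
    for item_id, one in pairs:
        groups[item_id].append(one)
    return groups

def reduce_phase(groups: dict[str, list[int]]) -> dict[str, int]:
    # Sum each group into an exact count.
    return {item_id: sum(ones) for item_id, ones in groups.items()}

def exact_top_k(events: Iterable[Event], k: int) -> list[tuple[str, int]]:
    totals = reduce_phase(shuffle(map_phase(events)))
    # Full sort by count, breaking ties by item ID for a stable result.
    return sorted(totals.items(), key=lambda kv: (-kv[1], kv[0]))[:k]

events = [("video-1", 0), ("video-2", 1), ("video-1", 2),
          ("video-3", 3), ("video-1", 4), ("video-2", 5)]
print(exact_top_k(events, k=2))
```

Nothing here is approximate, which is the point: the cost is paid in time and disk throughput instead of in RAM.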
The serving store itself is a small key-value table, keyed by window identifier with the ordered list as the value, so a five-minute window's key maps to a list like [(video-8841, 1250410), ...]. Each entry is a few kilobytes, queries are single-key gets, and a cache in front handles the home-page read storm. An alternative framing replaces both hand-built paths with a stream processor such as Flink consuming the same topic, keyed by item, where windowed aggregations and exactly-once state checkpoints produce exact short-window counts too, holding per-key state in managed RocksDB on local disk rather than in RAM. The stream processor wins when the team already operates Flink and the distinct-item cardinality per window is modest, while the sketch design wins at extreme cardinality or when an operationally minimal fast path matters more than exact short windows, and naming the assumption that decides between them is more useful than picking a favorite. Either way, event time needs care, because events arrive late and out of order from buffering mobile clients and slow networks. The aggregator therefore tracks a watermark, which is a moving estimate that no events older than time T are still coming, closes windows only when the watermark passes them, and either drops later stragglers or issues corrections to the store.
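The watermark rule in the last two sentences can be illustrated with a small tumbling-window counter. The Python sketch below is a simplification under explicit assumptions: event times are integer seconds, the watermark is taken to be the largest event time seen minus a fixed lateness allowance, and events for already closed windows are dropped and counted, not corrected. It is not how Flink or any specific stream processor implements watermarks, and the class name is hypothetical.

```python
from collections import Counter, defaultdict

class WatermarkedWindows:
    def __init__(self, window_seconds: int, allowed_lateness: int) -> None:
        self.window_seconds = window_seconds
        self.allowed_lateness = allowed_lateness
        self.open_windows: dict[int, Counter] = defaultdict(Counter)
        self.max_event_time = 0
        self.dropped_late = 0

    @property
    def watermark(self) -> int:
        # Assumption: nothing older than this is still expected to arrive.
        return self.max_event_time - self.allowed_lateness

    def add(self, item_id: str, event_time: int) -> list[tuple[int, Counter]]:
        """Record one event and return any windows this event closed."""
        start = event_time - event_time % self.window_seconds
        if start + self.window_seconds <= self.watermark:
            self.dropped_late += 1            # its window is already closed
        else:
            self.open_windows[start][item_id] += 1
        self.max_event_time = max(self.max_event_time, event_time)

        closed = []
        for window_start in sorted(self.open_windows):
            if window_start + self.window_seconds <= self.watermark:
                closed.append((window_start, self.open_windows.pop(window_start)))
        return closed

windows = WatermarkedWindows(window_seconds=60, allowed_lateness=10)
windows.add("a", 5)
windows.add("b", 62)
windows.add("a", 58)                 # out of order, but still inside the allowance
closed = windows.add("b", 71)        # watermark reaches 61 and closes window [0, 60)
windows.add("a", 30)                 # too late: window [0, 60) is already closed
```

A larger allowance catches more stragglers at the cost of publishing each window later, which is the freshness trade the watermark setting encodes.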
A latency budget from event to screen
Following one access event through the fast path shows where its few seconds of freshness go and what a user can actually expect. The client's view event reaches the ingestion endpoint and lands in the queue within about 50 ms, almost all of it network. The counter node's consumer fetches it on the next poll, typically a few hundred milliseconds behind the head of the partition under normal lag, and the sketch update itself costs microseconds, being seven hashes and seven increments. The counter publishes its current heaps to the serving store every second or two, so the worst-case path from event to readable list is one publish interval plus the read, and a home page fetching the list pays a single cached key-value get of a few kilobytes, a millisecond or two at the store plus whatever the CDN adds. Summed up, a surge begins influencing the public list within two to four seconds, which is why the requirements said a few seconds rather than milliseconds, and the queue's buffering is what keeps that promise during a spike, since a counter that falls behind catches up by draining backlog instead of dropping events.
The same trace explains what degradation looks like before anything is fully down. If a counter node restarts, its windows sit empty until it replays the last few minutes from the queue, so the store briefly serves the previous publish, a list a few seconds staler than usual, and nobody notices. If one partition lags because a single item went viral and concentrated traffic, that partition's contribution to the lists goes stale while the rest stay fresh, which argues for a per-partition lag alert rather than a single global health check. And if the serving store itself becomes unreachable, the product falls back to whatever list the CDN last cached, which for a trending panel is a perfectly serviceable answer for several minutes.
Scaling, failures, and operations
Each tier scales separately, which is most of the reason the architecture splits into tiers at all. The queue scales by partitions, and partitioning by item ID is already required for correctness, so adding counter nodes means adding partitions and consumers in step. Counter nodes hold only sketches and heaps, so a crashed node restarts empty and rebuilds its current windows by replaying the last few minutes from the queue, which is the great operational virtue of keeping the fast path's state small and reconstructible from a durable stream. The batch tier scales like any MapReduce workload, by adding workers, and a failed job simply reruns since its inputs are immutable files in blob storage. The serving store and query API scale with read replicas and caching, and they are the only tiers users can see fail, so they get the redundancy budget while the counters get almost none.
The failure modes worth rehearsing are quieter than crashes. A hot item concentrated on one queue partition can lag that partition's counter, which shows up as that partition's items going stale in the published lists while every health check stays green, and the mitigation is monitoring per-partition consumer lag and over-partitioning so a single hot key is a small fraction of one node's load. Watermark misconfiguration silently undercounts items whose traffic arrives late from mobile clients, and its symptom is a daily exact list that consistently ranks certain items higher than the fast path ever did, one more reason to treat the reconciliation diff as a first-class metric. The sketch's one-sided error means collisions can only promote tail items and never demote hot ones, so the worst user-visible failure is a marginal item appearing at rank K for a while, which is exactly the kind of defect the exact path's reconciliation catches within the hour.
Follow-up questions
- Why not just count exactly in Redis? A day of 500 million distinct counters is about 25 GB before replication, per window, with a million increments per second pounding against it. It can be sharded into existence, but the sketch gives the same product answer in about 8 MB per window, the operational surface of a sharded counter fleet disappears, and the exact batch path already covers every use that genuinely needs precision.
- Why does the count-min sketch never undercount? Every occurrence of item x increments all d of x's counters, so each one is at least x's true count, and collisions only ever add other items' counts on top. The minimum across rows is therefore guaranteed to be at least the truth and at most the truth plus bounded noise, which is the one-sided behavior the product quietly depends on.
- How do you get a top-K list out of a sketch at all? A sketch cannot be enumerated, so a K-entry min-heap is maintained beside it and updated on every event using the sketch's estimate. The heap remembers identities while the sketch remembers counts, and neither structure alone answers the product's question.
- What goes wrong merging top-K lists from nodes that each saw part of the traffic? An item just below the cutoff on every node is dropped from every list even though its global sum belongs in the top K, like the item with 900 accesses on each of three nodes whose 2,700 total goes unseen, and widening the per-node lists only shrinks that gap without closing it. Partitioning events by item ID removes the problem outright by making each node's count global for its items.
- How are late events handled in the windowed counts? The stream path closes a window when the watermark passes it, then either drops late events or emits corrections, while the batch path naturally includes everything that arrived by the time the job runs. The exact daily list is the system of record for anything that matters, and the product copy can say plainly that short-window trends are estimates.
- What changes for "top K in the last N minutes" sliding continuously? Keep fine-grained tumbling sketches, for example one per second, and answer a query by summing the last N times 60 sketches for the heap's candidates. Count-min sketches merge by cell-wise addition as long as they share hash functions, and that closure under addition is the property that makes the whole decomposition work.
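The merge property behind the last answer can be shown in miniature. The Python sketch below keeps one small sketch per second in a bounded deque and answers a rolling query by adding the buckets cell by cell. It assumes all buckets share dimensions and the same salted BLAKE2b hashes (a stand-in for independent hash functions), and the names are illustrative.

```python
import hashlib
from collections import deque
from typing import Iterable

class CountMinSketch:
    def __init__(self, width: int, depth: int) -> None:
        self.width, self.depth = width, depth
        self.rows = [[0] * width for _ in range(depth)]

    def _column(self, row: int, item: str) -> int:
        salt = row.to_bytes(8, "little")
        digest = hashlib.blake2b(item.encode(), digest_size=8, salt=salt).digest()
        return int.from_bytes(digest, "little") % self.width

    def add(self, item: str, count: int = 1) -> None:
        for row in range(self.depth):
            self.rows[row][self._column(row, item)] += count

    def estimate(self, item: str) -> int:
        return min(self.rows[r][self._column(r, item)] for r in range(self.depth))

def merge(sketches: Iterable[CountMinSketch], width: int, depth: int) -> CountMinSketch:
    """Cell-wise sum; valid only when every sketch uses the same hashes."""
    merged = CountMinSketch(width, depth)
    for sketch in sketches:
        if (sketch.width, sketch.depth) != (width, depth):
            raise ValueError("sketches must share dimensions and hash functions")
        for r in range(depth):
            for c, value in enumerate(sketch.rows[r]):
                merged.rows[r][c] += value
    return merged

WINDOW_SECONDS = 3
buckets: deque = deque(maxlen=WINDOW_SECONDS)   # oldest bucket falls off automatically

for views_this_second in [2, 5, 1, 4]:
    bucket = CountMinSketch(width=256, depth=4)
    bucket.add("video-8841", views_this_second)
    buckets.append(bucket)

window = merge(buckets, width=256, depth=4)
rolling_count = window.estimate("video-8841")   # covers only the last 3 buckets
```

Because the merged sketch is identical to one that had seen all the events directly, the same function also combines sketches from different machines.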
References
- Cormode and Muthukrishnan, An Improved Data Stream Summary: The Count-Min Sketch and its Applications (2005).
- Kleppmann, Designing Data-Intensive Applications (2017), chapters on batch and stream processing.
- Marz, How to beat the CAP theorem (2011), the original sketch of the lambda architecture.
- Apache Flink, Windows documentation, on keyed windows, triggers, and lateness.