Rolling top-K thresholds for extreme-event filters

A common pattern in research strategies: "fire when this bar's metric is in the top-K of the trailing N-bar window." Range breakout, volume spike, abs-return tail, count-of-prints all reduce to the same primitive once the per-bar metric is computed.
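As a rough illustration of "once the per-bar metric is computed", the sketch below derives two of the metrics named above from OHLC arrays. The array names and values are hypothetical and only show the shape of the input: one float per bar.

```python
import numpy as np

# Hypothetical bar arrays; names and values are illustrative only.
open_ = np.array([100.0, 101.0, 100.5, 102.0])
high = np.array([101.5, 101.8, 102.4, 103.0])
low = np.array([99.5, 100.2, 100.1, 101.0])
close = np.array([101.0, 100.5, 102.0, 101.2])

# Range breakout: high-low range of each bar, in basis points of the open.
range_bps = (high - low) / open_ * 1e4

# Abs-return tail: absolute open-to-close return of each bar.
abs_ret = np.abs(close / open_ - 1.0)
```

Either array can then play the role of `metric` in the rest of this page.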

The naive Python loop is fine at hourly resolution:

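import numpy as np

# metric: 1-D float array holding one value per bar
WARMUP = 30 * 24   # 30 days of 1h bars
K = 3              # fire when the bar is in the top 3 of the window
n = len(metric)
for i in range(WARMUP, n):
    win = metric[i - WARMUP : i]       # trailing window, bar i excluded
    thr = np.partition(win, -K)[-K]    # K-th largest value in the window
    if metric[i] >= thr:
        ...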

At about 17,500 bars (two years of hourly data) it runs in ~7 seconds. At 15-minute resolution (~70k bars) it takes over a minute. At 1-second resolution (~60M bars) it takes over an hour. flox_py.rolling.top_k_threshold collapses the same computation to a single vectorized np.partition call along the window axis.
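To make "a single vectorized np.partition call along the window axis" concrete, here is a minimal sketch of how such a computation can be written in plain numpy. The function name `top_k_threshold_sketch` is hypothetical and this is not claimed to be flox_py's actual implementation; it only assumes the semantics described on this page (trailing window, current bar excluded, NaN during warmup).

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view


def top_k_threshold_sketch(values, window, k=1):
    """Hypothetical stand-in, not flox_py's actual implementation."""
    values = np.asarray(values, dtype=np.float64)
    n = values.shape[0]
    if not 1 <= k <= window:
        raise ValueError("k must satisfy 1 <= k <= window")
    thr = np.full(n, np.nan)          # warmup slots stay NaN
    if n <= window:
        return thr                    # no bar has a full trailing window yet
    # Row j is values[j : j + window]. Dropping the last row leaves exactly
    # the windows that end just before bars window .. n - 1.
    wins = sliding_window_view(values, window)[:-1]
    # One partition along the window axis; column -k holds the k-th largest.
    thr[window:] = np.partition(wins, -k, axis=1)[:, -k]
    return thr
```

One design consequence of this approach: np.partition copies its input, so the call materialises an array of roughly (n - window) × window floats. That is fine for hourly data but worth keeping in mind for long series with wide windows.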

Function

flox_py.rolling.top_k_threshold(
    values: ArrayLike,
    *,
    window: int,
    k: int = 1,
    out_dtype: np.dtype | None = None,
) -> np.ndarray

thr[i] is the k-th largest value in values[i - window : i]. The bar at index i itself is excluded, so the helper does not leak future information into the threshold. The first window slots are filled with NaN so callers can mask out the warmup with ~np.isnan(thr).
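The semantics above can be checked by hand on a tiny input. The sketch below uses a hypothetical loop helper, `reference_threshold`, written only to mirror the stated definition; it is not part of flox_py.

```python
import numpy as np


def reference_threshold(values, window, k):
    """Loop form of the stated semantics (hypothetical, for illustration)."""
    values = np.asarray(values, dtype=np.float64)
    thr = np.full(values.shape[0], np.nan)    # warmup slots stay NaN
    for i in range(window, values.shape[0]):
        trailing = values[i - window : i]     # bar i itself is excluded
        thr[i] = np.sort(trailing)[-k]        # k-th largest of the window
    return thr


values = np.array([1.0, 5.0, 3.0, 4.0, 2.0, 6.0])
thr = reference_threshold(values, window=3, k=2)
# thr -> [nan, nan, nan, 3.0, 4.0, 3.0]

valid = ~np.isnan(thr)                        # mask out the warmup
fired = np.zeros(values.shape[0], dtype=bool)
fired[valid] = values[valid] >= thr[valid]
# fired -> [False, False, False, True, False, True]
```

Bar 4 (value 2.0) does not fire because its trailing window [5.0, 3.0, 4.0] has a second-largest value of 4.0; bar 5 (value 6.0) fires against a threshold of 3.0 that was computed without looking at bar 5 itself.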

Example

The script below is the same one CI runs on every push. It computes a top-3 threshold over a 100-bar trailing window on a synthetic per-bar metric and confirms the result matches an explicit np.partition call:

"""Sliding top-K threshold round-trip — generate a synthetic price
series, derive a per-bar metric, compute the rolling K-th largest
value across the trailing window, and print a few entries to confirm
the helper does what the doc claims.

This is the CI-runnable companion to
[Rolling top-K thresholds](../how-to/rolling-thresholds.md). It runs
without flox_py's C extension because ``flox_py.rolling`` is pure
numpy — the import still goes through the package so the example
also smoke-tests that the module ships.

Usage:
    cd /path/to/flox
    PYTHONPATH=build/python python3 docs/examples/python_rolling_top_k.py
"""
from __future__ import annotations

import numpy as np

from flox_py import rolling


def main() -> None:
    rng = np.random.default_rng(0)
    n = 1_000
    # Synthetic per-bar metric: range in bps. Heavier tail so the
    # top-K threshold has a meaningful shape.
    metric = np.abs(rng.standard_t(df=4, size=n)) * 10.0

    window = 100   # last 100 bars
    k = 3          # 3rd-largest of the trailing window
    thr = rolling.top_k_threshold(metric, window=window, k=k)

    # Pick a bar past the warmup and confirm the helper matches what
    # the explicit np.partition call returns for the same window.
    i = 500
    win = metric[i - window : i]
    expected = float(np.partition(win, -k)[-k])
    got = float(thr[i])
    assert abs(got - expected) < 1e-12, (got, expected)
    print(
        f"top-{k} threshold over trailing {window} bars at bar {i}: "
        f"{got:.4f} (matches np.partition)."
    )

    # Number of bars whose metric meets or exceeds the trailing K-th
    # largest: the kind of count an extreme-event filter cares about.
    fired = np.sum(metric[window:] >= thr[window:])
    print(f"bars firing the threshold: {int(fired)} / {n - window}")


if __name__ == "__main__":
    main()

Performance

Benchmark on 17,520 bars (BTC 1h × 2 years), window=720, k=3:

| Implementation | Time |
|---|---|
| Naive Python loop with np.partition per bar | ~7 s |
| flox_py.rolling.top_k_threshold | ~25 ms |

At 1-minute (or finer) resolution the difference is between a script that finishes and one that does not. The helper makes the lower timeframes accessible without writing per-call boilerplate around np.lib.stride_tricks.sliding_window_view.
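A comparison of this kind can be reproduced with a small timing harness. The sketch below is an assumption-laden stand-in: it runs on synthetic data, uses a smaller series than the benchmark above, and times a plain-numpy `vectorized` function (hypothetical, written here) rather than flox_py itself. Where flox_py is installed, `rolling.top_k_threshold` can be swapped in for it. Timings depend on the machine, so none are quoted.

```python
import time

import numpy as np
from numpy.lib.stride_tricks import sliding_window_view


def naive(metric, window, k):
    # One np.partition call per bar, as in the loop at the top of the page.
    thr = np.full(metric.shape[0], np.nan)
    for i in range(window, metric.shape[0]):
        thr[i] = np.partition(metric[i - window : i], -k)[-k]
    return thr


def vectorized(metric, window, k):
    # Hypothetical stand-in for the helper: one partition over all windows.
    thr = np.full(metric.shape[0], np.nan)
    wins = sliding_window_view(metric, window)[:-1]
    thr[window:] = np.partition(wins, -k, axis=1)[:, -k]
    return thr


def time_call(fn, *args):
    start = time.perf_counter()
    out = fn(*args)
    return out, time.perf_counter() - start


rng = np.random.default_rng(0)
metric = np.abs(rng.standard_t(df=4, size=3_000)) * 10.0
slow, t_slow = time_call(naive, metric, 720, 3)
fast, t_fast = time_call(vectorized, metric, 720, 3)

# Both paths must agree, NaN warmup included, before timings mean anything.
assert np.array_equal(slow, fast, equal_nan=True)
print(f"naive: {t_slow:.3f} s, vectorized: {t_fast:.3f} s")
```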

When to use a C++ aggregator instead

If the per-bar metric is itself derived from a tape (not from a pre-computed numpy array), pair top_k_threshold with the OHLCBinAggregator so the metric and the threshold are produced in a single pass over the tape. The threshold computation stays in numpy because the work is bounded by the OHLC output (one row per bar, not one row per trade); the savings from pushing it down to C++ would not be measurable next to the I/O.

A streaming per-trade top-K (a tape-side filter that emits only the buckets above some trailing-window threshold, without materialising every bucket) is tracked as a separate aggregator task, to be picked up when a use case actually needs it.