# Rolling top-K thresholds for extreme-event filters
A common pattern in research strategies is "fire when this bar's metric is in the top K of the trailing N-bar window." Range breakouts, volume spikes, absolute-return tails, and count-of-prints filters all reduce to this same primitive once the per-bar metric has been computed.
The naive Python loop is fine at hourly resolution:
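```python
import numpy as np

WINDOW = 30 * 24  # trailing window: 30 days of 1h bars
K = 3             # threshold = K-th largest value in the window
# metric: 1-D float array with one precomputed value per bar
n = len(metric)
fired = np.zeros(n, dtype=bool)
for i in range(WINDOW, n):
    win = metric[i - WINDOW : i]     # trailing window, excludes bar i
    thr = np.partition(win, -K)[-K]  # O(WINDOW) selection, once per bar
    fired[i] = metric[i] >= thr      # fire on the extreme bar
```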
At 17,520 bars (two years of 1h bars) it runs in about 7 seconds. At 15-minute resolution (~70k bars) it takes more than a minute, and at 1-second resolution (~60M bars) more than an hour. `flox_py.rolling.top_k_threshold` collapses the same computation into a single vectorized `np.partition` call along the window axis.
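A minimal sketch of what a single vectorized `np.partition` call along the window axis can look like, assuming NumPy 1.20+ for `sliding_window_view`. The name `top_k_threshold_sketch` is hypothetical, and this is not claimed to be how `flox_py.rolling` is implemented; it only reproduces the documented contract: `out[i]` is the `k`-th largest of `values[i - window : i]`, and the first `window` slots are NaN.

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view


def top_k_threshold_sketch(values, *, window, k=1):
    """Hypothetical re-implementation of the trailing top-K contract.

    out[i] is the k-th largest of values[i - window : i]; out[:window] is NaN.
    """
    values = np.asarray(values, dtype=np.float64)
    # Assumption for this sketch: reject k outside [1, window] up front.
    if not 1 <= k <= window:
        raise ValueError("need 1 <= k <= window")
    n = values.shape[0]
    out = np.full(n, np.nan)
    if n <= window:
        return out  # every bar is still in warmup
    # Row j of the view is values[j : j + window]; no data is copied yet.
    windows = sliding_window_view(values, window)
    # Drop the last row so row j feeds bar i = j + window, which keeps
    # bar i out of its own threshold.
    trailing = windows[:-1]
    # One partition along the window axis; column -k of each row holds
    # that row's k-th largest value.
    out[window:] = np.partition(trailing, -k, axis=1)[:, -k]
    return out


if __name__ == "__main__":
    rng = np.random.default_rng(0)
    metric = np.abs(rng.standard_t(df=4, size=2_000)) * 10.0
    thr = top_k_threshold_sketch(metric, window=100, k=3)
    fired = metric[100:] >= thr[100:]
    print(f"fired on {int(fired.sum())} of {fired.size} bars")
```

The design trades memory for speed: `np.partition` returns an `(n - window) × window` copy of the view, so for very long series with wide windows the rows would need to be processed in chunks.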
## Function
```python
flox_py.rolling.top_k_threshold(
    values: ArrayLike,
    *,
    window: int,
    k: int = 1,
    out_dtype: np.dtype | None = None,
) -> np.ndarray
```
`thr[i]` is the `k`-th largest value in `values[i - window : i]`. The bar at index `i` is excluded, so a bar's own value never feeds into its threshold and the helper does not leak future information. The first `window` slots (`thr[:window]`) are filled with NaN, so callers can mask out the warmup with `~np.isnan(thr)`.
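A tiny worked example of that contract. It uses a plain reference loop (the name `reference_threshold` is hypothetical) instead of the helper itself, so it runs without flox_py; with `window=3` and `k=2`, each threshold is the second-largest of the three bars before it.

```python
import numpy as np


def reference_threshold(values, window, k):
    """Plain-loop reference for the documented contract (hypothetical helper)."""
    values = np.asarray(values, dtype=np.float64)
    thr = np.full(values.shape[0], np.nan)  # warmup slots stay NaN
    for i in range(window, values.shape[0]):
        # Trailing window only: bar i is excluded from its own threshold.
        thr[i] = np.partition(values[i - window : i], -k)[-k]
    return thr


values = np.array([1.0, 5.0, 2.0, 8.0, 3.0, 7.0])
thr = reference_threshold(values, window=3, k=2)
print(thr[3:])  # [2. 5. 3.]  (2nd-largest of [1,5,2], [5,2,8], [2,8,3])

valid = ~np.isnan(thr)  # mask out the warmup
fired = np.zeros_like(valid)
fired[valid] = values[valid] >= thr[valid]
print(np.flatnonzero(fired))  # [3 5]
```

Bar 4 does not fire because its value (3.0) is below the second-largest of the three bars before it (5.0).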
## Example
The script below is the same one CI runs on every push. It computes a top-3 threshold over a 100-bar trailing window on a synthetic per-bar metric and checks the result against an explicit `np.partition` call:
```python
"""Sliding top-K threshold round-trip: generate a synthetic per-bar
metric, compute the rolling K-th largest value across the trailing
window, and print a short summary to confirm the helper does what the
doc claims.

This is the CI-runnable companion to
[Rolling top-K thresholds](../how-to/rolling-thresholds.md). It runs
without flox_py's C extension because ``flox_py.rolling`` is pure
numpy; the import still goes through the package so the example
also smoke-tests that the module ships.

Usage:
    cd /path/to/flox
    PYTHONPATH=build/python python3 docs/examples/python_rolling_top_k.py
"""

from __future__ import annotations

import numpy as np

from flox_py import rolling


def main() -> None:
    rng = np.random.default_rng(0)
    n = 1_000

    # Synthetic per-bar metric: range in bps. Heavier tail so the
    # top-K threshold has a meaningful shape.
    metric = np.abs(rng.standard_t(df=4, size=n)) * 10.0

    window = 100  # last 100 bars
    k = 3         # 3rd-largest of the trailing window
    thr = rolling.top_k_threshold(metric, window=window, k=k)

    # Pick a bar past the warmup and confirm the helper matches what
    # the explicit np.partition call returns for the same window.
    i = 500
    win = metric[i - window : i]
    expected = float(np.partition(win, -k)[-k])
    got = float(thr[i])
    assert abs(got - expected) < 1e-12, (got, expected)
    print(
        f"top-{k} threshold over trailing {window} bars at bar {i}: "
        f"{got:.4f} (matches np.partition)."
    )

    # Number of bars whose metric meets or exceeds the trailing K-th
    # largest: the kind of count an extreme-event filter cares about.
    fired = np.sum(metric[window:] >= thr[window:])
    print(f"bars firing the threshold: {int(fired)} / {n - window}")


if __name__ == "__main__":
    main()
```
## Performance
Benchmark on 17,520 bars (BTC 1h × 2 years), `window=720`, `k=3`:

| Implementation | Time |
|---|---|
| Naive Python loop with `np.partition` per bar | ~7 s |
| `flox_py.rolling.top_k_threshold` | ~25 ms |
At 1-minute or finer resolution, the difference is between a script that finishes and one that, in practice, does not. The helper makes those finer timeframes usable without writing per-call boilerplate around `np.lib.stride_tricks.sliding_window_view`.
## When to use a C++ aggregator instead
If the per-bar metric is itself derived from a tape rather than from a precomputed numpy array, pair `top_k_threshold` with the `OHLCBinAggregator` so the metric and the threshold are produced in a single pass over the tape. The threshold computation stays in numpy because its cost is bounded by the OHLC output (one row per bar, not one row per trade); pushing it down to C++ would not save anything measurable next to the I/O.
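To illustrate why the threshold is cheap once the tape has been reduced to bars, here is a pure-numpy stand-in for the tape-to-bar step. It is not the `OHLCBinAggregator` API; the function name, the sorted integer timestamps, and the `bar_len` parameter are assumptions for this sketch. It reduces a non-empty, time-sorted tape to one range-in-bps value and one print count per non-empty bar.

```python
import numpy as np


def tape_to_bar_metrics(ts, price, bar_len):
    """Hypothetical stand-in for a tape aggregator (not OHLCBinAggregator).

    ts: non-empty, ascending integer timestamps; bar_len uses the same unit.
    Bars with no trades are skipped rather than emitted as empty rows.
    """
    ts = np.asarray(ts, dtype=np.int64)
    price = np.asarray(price, dtype=np.float64)
    bar_id = ts // bar_len
    # Index of the first trade in each non-empty bar.
    starts = np.flatnonzero(np.r_[True, bar_id[1:] != bar_id[:-1]])
    ends = np.r_[starts[1:], price.size]  # one past each bar's last trade
    high = np.maximum.reduceat(price, starts)
    low = np.minimum.reduceat(price, starts)
    close = price[ends - 1]
    range_bps = (high - low) / close * 1e4  # range-breakout metric
    prints = ends - starts                  # count-of-prints metric
    return bar_id[starts], range_bps, prints


bars, range_bps, prints = tape_to_bar_metrics(
    ts=[0, 1, 2, 10, 11, 25],
    price=[100.0, 101.0, 99.0, 100.0, 102.0, 100.0],
    bar_len=10,
)
print(bars, prints)  # [0 1 2] [3 2 1]
```

Either output array has one entry per bar, so passing it as `values` to a trailing top-K threshold costs work proportional to the bar count, not the trade count.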
A streaming per-trade top-K (a tape-side filter that emits only the buckets above a trailing-window threshold, without materialising every bucket) is tracked as a separate aggregator task, to be picked up once a use case actually needs it.