Aggregate tape events in a single pass¶
DataReader.run([...]) walks a .floxlog once and forwards every event to a panel of streaming aggregators. The aggregators run entirely in C++; the dispatch loop never crosses into Python or JS during the walk.
The six native aggregators:
| Aggregator | Per-cell value | Typical use |
|---|---|---|
EventTypeStatsAggregator |
Counter of trades, book snapshots, book deltas | Quick overview of a tape's contents |
BinCountAggregator |
Event count per time bucket | Trades per minute, per second, etc. |
VolumeBinAggregator |
Sum of trade qty_raw per time bucket |
Volume profile over time |
OHLCBinAggregator |
Open / high / low / close of trade price_raw per bucket |
Price-time series for returns and candle reconstruction |
PeakAggregator |
Top-N busiest fixed-width windows per scale | Finding bursts at chosen scales |
QuantileAggregator |
Window-count distribution to quantiles | Activity baseline at a given scale |
The filter parameters work the same way on all six:
event_filter:Trades,BooksOnly, orBoth. Aggregators that only make sense for trades (volume, OHLC, peaks, quantiles) default toTrades.symbol_filter: a list of symbol ids. The default (empty) admits every symbol.
run([]) is a no-op. It does not decompress anything; useful as a sanity check that the framework is wired up.
Python¶
import flox_py
reader = flox_py.DataReader("./tape")
stats = flox_py.EventTypeStatsAggregator()
counts = flox_py.BinCountAggregator(
bucket_ns=60_000_000_000, # 1-minute buckets
by_side=True,
)
volume = flox_py.VolumeBinAggregator(
bucket_ns=60_000_000_000,
by_side=True,
)
ohlc = flox_py.OHLCBinAggregator(
bucket_ns=60_000_000_000,
)
peaks = flox_py.PeakAggregator(
window_ns_list=[1_000_000, 10_000_000, 100_000_000, 1_000_000_000],
top_n=10,
)
quantiles = flox_py.QuantileAggregator(
window_ns_list=[1_000_000, 100_000_000],
quantiles=[0.5, 0.95, 0.99],
)
reader.run([stats, counts, volume, ohlc, peaks, quantiles])
# .result() is populated once run() returns.
print(stats.result()) # structured numpy array
print(counts.result()) # structured numpy array
print(volume.result()) # structured numpy array
print(ohlc.result()) # structured numpy array
print(peaks.result()) # dict[window_ns, list[(count, start_ns)]]
print(quantiles.result()) # dict[window_ns, dict[quantile, count]]
The tabular aggregators (stats, counts, volume, ohlc) return structured numpy arrays. Field names match the row struct, so arr["count"], arr["bucket_ts_ns"], arr["close_raw"] work directly.
side encoding in BinCount and VolumeBin rows: 0 means aggregate (no side split), 1 is BUY, 2 is SELL. symbol_id = 0 means aggregate; set by_symbol=True to get one row per (bucket, symbol_id) cell.
Node.js¶
import { DataReader, EventTypeStatsAggregator, BinCountAggregator,
VolumeBinAggregator, OHLCBinAggregator, PeakAggregator,
QuantileAggregator, AggregatorEventFilter }
from '@flox-foundation/flox';
const reader = new DataReader('./tape');
const F = AggregatorEventFilter;
const stats = new EventTypeStatsAggregator(F.Both, []);
const counts = new BinCountAggregator(60_000_000_000n, /*bySide=*/true);
const volume = new VolumeBinAggregator(60_000_000_000n, /*bySide=*/true);
const ohlc = new OHLCBinAggregator(60_000_000_000n);
const peaks = new PeakAggregator(
[1_000_000n, 10_000_000n, 100_000_000n, 1_000_000_000n],
/*topN=*/10
);
const quantiles = new QuantileAggregator(
[1_000_000n, 100_000_000n],
[0.5, 0.95, 0.99]
);
reader.run([stats, counts, volume, ohlc, peaks, quantiles]);
console.log(stats.result());
console.log(counts.result());
console.log(volume.result());
console.log(ohlc.result());
console.log(peaks.result());
console.log(quantiles.result());
Node returns BigInt for ns timestamps and 64-bit counters, so values past 2^53 keep precision. The Peak result is Array<{windowNs, peaks: Array<{count, startNs}>}> rather than a Map; iterate with for (const entry of result).
Merged tapes¶
MergedTapeReader.run([...]) takes the same aggregator panel and walks the merged stream from N input tapes in a single pass. Aggregators see events with global-rewritten symbol ids; the originating tape index is not surfaced. Use streamEvents() directly when the tape index matters.
merged = flox_py.MergedTapeReader(["./tape-bybit", "./tape-bitget"])
peaks = flox_py.PeakAggregator(window_ns_list=[1_000_000_000], top_n=20)
merged.run([peaks])
Aggregator semantics¶
EventTypeStats¶
Result is sorted by symbol_id ascending. Symbols with zero matching events don't appear in the output. Pass an explicit symbol_filter for a stable shape across runs.
BinCount and VolumeBin¶
The bucket for an event at time t is floor(t / bucket_ns) * bucket_ns. Empty buckets produce no rows. by_side=True only splits trades (book events stay in side=0); by_symbol=True produces one row per (bucket, symbol_id) cell.
VolumeBin ignores book events. Pass event_filter=BooksOnly and you get an empty result by design.
OHLCBin¶
Same bucketing as BinCount / VolumeBin. For every (bucket, optional symbol) cell, four values come out of the trades that fell into it:
open_raw:price_rawof the trade with the smallestexchange_ts_ns.close_raw:price_rawof the trade with the largestexchange_ts_ns.high_raw/low_raw: max and min ofprice_rawacross the cell.
Books are ignored — no scalar price per book event. Empty buckets produce no row; forward-fill on the caller side if a dense series is needed.
There is no by_side parameter. An "open price for buys" is not a generally useful primitive; pair with VolumeBinAggregator(by_side=True) when per-side flow has to sit next to the price series.
*_raw fields are fixed-point int64. Divide by Price::SCALE (= 1e8) to get floats. Parallel runs preserve open/close ordering by comparing the per-cell first/last timestamps on merge.
Peak¶
For each window_ns, finds the top-N intervals of width window_ns that contain the most events. After the walk, finalize() removes peaks that fall within 3 × window_ns of a stronger one already kept; this keeps the top-N from being filled with adjacent samples of one burst.
oversample_factor (default 100) sets the in-flight candidate heap size to top_n × oversample_factor. Raise it when tapes have many distinct bursts clustered within 3 × window_ns of an earlier candidate.
start_ns in the result is t - window_ns, where t is the arrival time of the last event in the window. The window starts window_ns earlier and ends at the event.
Quantile¶
For each window_ns, the aggregator records the sliding-window event count at every event arrival and accumulates those observations into a histogram. finalize() then resolves each requested quantile q to the smallest count value C for which fraction(observed ≤ C) ≥ q.
Quantiles must be in (0.0, 1.0]. The result row count is len(window_ns_list) × len(quantiles). When event_filter admits no events, rows still appear with count=0 so the shape stays stable.
Parallel execution¶
reader.run(panel) takes an optional n_threads argument:
n_threads policy:
0(default): auto, resolved tomin(blocks_per_segment / 2, hardware_concurrency()). Small captures stay single-threaded; multi-block segments use all available cores. The caller does not need to know the tape layout or core count.1: explicit single-thread. Useful for benchmarking the base path, or for environments where threading is undesirable.>1: explicit worker count, capped to the effective block count per segment.n_threads=100on a 4-block segment allocates 4 panel clones.
Partitioning is intra-segment at the compressed-block level. Each worker holds its own panel, cloned from the caller's via IAggregator::cloneEmpty(), and walks its assigned block range; results merge into the caller's originals via IAggregator::merge() before finalize(). Captures with one large active segment (md_collector style) benefit from this layout; segment-level partitioning would leave workers idle when one segment dominates.
Aggregators with associative state (EventTypeStats, BinCount, VolumeBin, Quantile) produce identical results between single-thread and parallel runs.
PeakAggregator and QuantileAggregator are sliding-window aggregators. With N workers there are up to N-1 partition seams per segment, each at most max(window_ns) wide; windows that span a seam are not seen by any single worker. The trade-off is documented on IAggregator::merge. Results match the single-thread reference when window_ns is much wider than the seam. At sub-millisecond scales on a tape with 16 workers, expect a small fraction of cross-seam windows to be miscounted.
MergedTapeReader.run() is single-threaded (per-tape symbol rekey is instance-local, and parallel workers would not share a global symbol-id space).
Out-of-order tapes and the reorder buffer¶
For segments without the Sorted flag (tapes produced by external writers such as md_collector or third-party recorders), the reader applies two ordering stages:
- Intra-block sort runs on decompress, removing the ~1–2 ms exchange-WS jitter that accounts for most exchange-feed reordering. Cost: one linear scan of the block buffer per Sorted=false block.
- Cross-block reorder buffer is a bounded min-heap of in-flight events keyed by
exchange_ts_ns. As events arrive (each block already intra-sorted), the heap drains anything whose timestamp falls outsidewatermark - reorder_window_ns. Memory:O(reorder_window_ns × peak_event_rate).
reorder_window_ns is a constructor argument on DataReader:
The 10s default covers ~99% of cross-block inversions observed on real md_collector captures (1–25s tail, p99 around 5s). If an event arrives with exchange_ts_ns < watermark - reorder_window_ns, the buffer cannot slot it in sorted order and the reader raises FloxError(code="E_DATA_002") carrying the observed delta. The caller can raise reorder_window_ns and retry, or pre-sort the tape through BinaryLogWriter, which sets the Sorted flag and bypasses the reorder buffer.
Performance notes¶
onEvent runs once per event for each aggregator in the panel. Adding aggregators raises the per-event cost; it does not change how many times the tape is walked.
Aggregators don't do I/O and don't call into Python. The only hot-path allocation is the bounded sliding deque used by Peak and Quantile.
For multi-tape captures use MergedTapeReader.run(). The C++ N-way heap merge keeps peak memory at O(N_tapes) regardless of total event count.
The Quantile histogram is keyed by count_value in an unordered_map. There are at most a few dozen distinct count values (bounded by the peak burst size), so the histogram stays small even when the tape contains billions of observations.
Long-running tapes: progress callback¶
DataReader.run() on a multi-day capture can sit silently for minutes. progress_callback lets the caller report status without changing the aggregator surface.
def progress(pct: float, ts_ns: int) -> bool:
print(f" {pct * 100:5.1f}% ts={ts_ns}", flush=True)
return True # return False to cancel
reader.run([agg],
n_threads=1,
progress_callback=progress,
progress_interval_ms=1000)
The callback fires from inside the run loop at most once per progress_interval_ms (default 1 s) with the bytes-of-events pct in [0.0, 1.0] and the exchange timestamp of the most recent dispatched event. Returning False cancels the run: run() returns False, aggregators are still finalised with the partial state they had accumulated, and the caller can call .result() on them. Returning True (or None) lets the run continue. Raising inside the callback also cancels, and the exception re-raises after run() finalises so the caller sees it instead of a silent partial.
Progress is reported only on the single-thread path (n_threads=1). Multi-thread runs ignore the callback because per-event GIL reentry would defeat the parallelism gained from cloning aggregators across workers; for those, run a smaller probe slice first to estimate throughput, then let the full run complete without progress.
// Node.js: { onProgress, progressIntervalMs, nThreads }
reader.run([agg], {
nThreads: 1,
progressIntervalMs: 1000,
onProgress: (pct, tsNs) => { console.log(`${(pct*100).toFixed(1)}%`); },
});
See also¶
- Record and replay market data with
flox tapefor capturing the.floxlog. - Merged tape consumption for
MergedTapeReaderreference when streamingrun()is not enough.