
Import multi-exchange public archives

flox_py.archives collects the per-exchange importers under one namespace. Each submodule exposes the same two entry points and the matching CLI subcommand:

| Exchange | Module | Trade entry point | CLI |
| --- | --- | --- | --- |
| Binance | flox_py.archives.binance | aggtrades_to_floxlog | flox archive binance ... |
| Bybit | flox_py.archives.bybit | trades_to_floxlog | flox archive bybit ... |
| OKX | flox_py.archives.okx | trades_to_floxlog | flox archive okx ... |
| Bitget | flox_py.archives.bitget | trades_to_floxlog | flox archive bitget ... |
| Deribit | flox_py.archives.deribit | trades_to_floxlog | flox archive deribit ... |

Adding a new venue means writing one self-contained module under flox_py/archives/ that implements trades_to_floxlog and range_to_floxlog, matching the ArchiveReader Protocol.

Bybit

Bybit publishes daily trade ticks at https://public.bybit.com/ going back two-plus years. The on-disk CSV layout (post-2022):

timestamp, symbol, side, size, price, tickDirection, trdMatchID,
grossValue, homeNotional, foreignNotional

timestamp is Unix seconds with a decimal fraction (microsecond precision). side is the active flow as a Buy / Sell string; the importer maps it to floxlog's Side::BUY / Side::SELL. trdMatchID is the exchange-assigned trade id, used as the floxlog trade_id for append-safe dedup.
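The column semantics above can be sketched as a small row normaliser. This is an illustration only, not the importer's code: the helper names are hypothetical, the nanosecond target unit is an assumption about the tape's time base, and the 0 / 1 side codes follow the round-trip example further down.

```python
from decimal import Decimal

# Hypothetical helpers for illustration; not part of flox_py.
_SIDE_CODES = {"Buy": 0, "Sell": 1}  # 0 = Side::BUY, 1 = Side::SELL


def bybit_ts_to_ns(ts: str) -> int:
    """Convert Bybit's decimal Unix-seconds string to integer nanoseconds.

    Decimal keeps the microsecond digits exact; parsing through float
    can lose them at this magnitude. Nanoseconds are an assumed unit.
    """
    return int(Decimal(ts) * 1_000_000_000)


def normalise_row(row: dict) -> tuple:
    """Return (timestamp_ns, side_code, trade_id) for one CSV row."""
    return (
        bybit_ts_to_ns(row["timestamp"]),
        _SIDE_CODES[row["side"]],
        row["trdMatchID"],
    )
```

Rejecting unknown side strings with a KeyError, rather than defaulting them, keeps a layout change in the archive from silently corrupting the tape.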

URL layout

  • spot: public.bybit.com/spot/<SYMBOL>/<SYMBOL><YYYY-MM-DD>.csv.gz
  • linear: public.bybit.com/trading/<SYMBOL>/<SYMBOL><YYYY-MM-DD>.csv.gz
  • inverse: public.bybit.com/trading/<SYMBOL>/<SYMBOL><YYYY-MM-DD>.csv.gz
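As a rough sketch of the layout above, a per-day URL can be derived from the market, symbol, and date. The function name is hypothetical and the path templates are taken from the list as written; this is not the importer's own download code.

```python
from datetime import date

# Path segment per market, as listed above: linear and inverse share "trading".
_BYBIT_PREFIX = {"spot": "spot", "linear": "trading", "inverse": "trading"}


def bybit_day_url(market: str, symbol: str, day: date) -> str:
    """Build the archive URL for one symbol and one day (hypothetical helper)."""
    prefix = _BYBIT_PREFIX[market]  # KeyError on an unknown market
    return (
        f"https://public.bybit.com/{prefix}/{symbol}/"
        f"{symbol}{day.isoformat()}.csv.gz"
    )
```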

The converter accepts the symbol verbatim; cross-exchange symbol normalisation is a W5 connectors concern, not an importer one. Each tape keys its own (metadata.exchange, name) so MergedTapeReader treats binance:BTCUSDT and bybit:BTCUSDT as distinct global symbols.

Example

The script below builds a synthetic Bybit-style gzipped CSV in memory, runs trades_to_floxlog, then reads the produced tape back through DataReader.read_trades:

"""Bybit public archive round-trip — build a synthetic gzipped CSV
matching Bybit's published column layout, push it through
`bybit.trades_to_floxlog`, and read the resulting `.floxlog` tape
back via `DataReader` to confirm the trade stream round-trips.

CI-runnable companion to
[Import multi-exchange archives](../how-to/import-multi-exchange-archives.md).
No network — the fixture is built in memory.

Usage:
    cd /path/to/flox
    PYTHONPATH=build/python python3 docs/examples/python_bybit_archive.py
"""
from __future__ import annotations

import gzip
import io
import shutil
import tempfile
from pathlib import Path

import flox_py
from flox_py.archives import bybit


_ROWS = [
    (1_700_000_000.500, "BTCUSDT", "Buy",  0.01, 42_100.5,
     "ZeroPlusTick",  "abc123", "", "", ""),
    (1_700_000_001.250, "BTCUSDT", "Sell", 0.02, 42_101.0,
     "ZeroMinusTick", "def456", "", "", ""),
    (1_700_000_002.000, "BTCUSDT", "Buy",  0.03, 42_100.7,
     "PlusTick",      "789012", "", "", ""),
]


def _build_gz(dest: Path) -> Path:
    buf = io.StringIO()
    buf.write("timestamp,symbol,side,size,price,tickDirection,"
              "trdMatchID,grossValue,homeNotional,foreignNotional\n")
    for r in _ROWS:
        buf.write(",".join(str(x) for x in r) + "\n")
    with gzip.open(dest, "wt", encoding="utf-8") as f:
        f.write(buf.getvalue())
    return dest


def main() -> None:
    workdir = Path(tempfile.mkdtemp(prefix="flox-bybit-"))
    try:
        gz_path = workdir / "BTCUSDT2024-01-15.csv.gz"
        _build_gz(gz_path)

        tape = workdir / "tape"
        stats = bybit.trades_to_floxlog(
            gz_path, tape,
            symbol_id=1, symbol_name="BTCUSDT", market="linear",
        )
        trades = flox_py.DataReader(str(tape)).read_trades()
        print(
            f"converted: rows_read={stats.rows_read} "
            f"trades_written={stats.trades_written} "
            f"tape_trades={int(trades.size)}"
        )
        # Buy → Side::BUY (0); Sell → Side::SELL (1).
        sides = [int(t["side"]) for t in trades]
        assert sides == [0, 1, 0], sides
    finally:
        shutil.rmtree(workdir, ignore_errors=True)


if __name__ == "__main__":
    main()

Range form

For multi-day backfills, range_to_floxlog downloads missing files from the public archive and reuses anything already in the local mirror cache:

flox_py.archives.bybit.range_to_floxlog(
    symbol="BTCUSDT",
    market="linear",
    date_from="2024-01-01",
    date_to="2024-12-31",
    out_tape="/path/floxlog/BTCUSDT_bybit",
    mirror=None,        # default = ~/.flox/archive-cache/bybit
    parallel=4,
    skip_missing=False,
)

CLI form:

flox archive bybit \
  --symbol BTCUSDT --market linear \
  --from 2024-01-01 --to 2024-12-31 \
  --out ./tapes/bybit-linear-BTCUSDT \
  --parallel 4
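The cache-reuse behaviour of the range form can be pictured with the sketch below. It is an assumption-laden illustration: the helper names are hypothetical, date_to is treated as inclusive, and the mirror is assumed to hold one flat file per day named like the upstream archive.

```python
from datetime import date, timedelta
from pathlib import Path


def days_in_range(date_from: str, date_to: str):
    """Yield every calendar day from date_from to date_to (assumed inclusive)."""
    day, end = date.fromisoformat(date_from), date.fromisoformat(date_to)
    while day <= end:
        yield day
        day += timedelta(days=1)


def plan_downloads(symbol: str, date_from: str, date_to: str, mirror: Path):
    """Split the requested days into files already cached and files to fetch."""
    cached, missing = [], []
    for day in days_in_range(date_from, date_to):
        # Assumed flat layout: <mirror>/<SYMBOL><YYYY-MM-DD>.csv.gz
        path = mirror / f"{symbol}{day.isoformat()}.csv.gz"
        (cached if path.exists() else missing).append(path)
    return cached, missing
```

Only the missing list would need network access, which is why re-running the same range after an interrupted backfill is cheap.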

OKX

OKX publishes daily trade ticks on www.okx.com/cdn/okex/traderecords/ for spot, swap (perpetual), futures, and options. The on-disk CSV columns:

trade_id, side, size, price, timestamp_ms

trade_id is an integer exchange-assigned id, used directly as the floxlog trade_id for dedup. side is the active flow as buy / sell lowercase, mapped to floxlog's Side::BUY / Side::SELL.
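To make the column layout concrete, the sketch below reads an OKX-style zipped CSV with the standard library and yields typed tuples. The function name is hypothetical, the archive is assumed to contain a single CSV member, and the 0 / 1 side codes mirror the round-trip example; the real importer may parse differently.

```python
import csv
import io
import zipfile

_SIDE_CODES = {"buy": 0, "sell": 1}  # 0 = Side::BUY, 1 = Side::SELL


def iter_okx_rows(zip_bytes: bytes):
    """Yield (trade_id, side_code, size, price, timestamp_ms) per trade."""
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
        member = zf.namelist()[0]  # assumption: one CSV per daily archive
        with zf.open(member) as raw:
            text = io.TextIOWrapper(raw, encoding="utf-8", newline="")
            # DictReader keys by header name, so column order does not matter.
            for row in csv.DictReader(text):
                yield (
                    int(row["trade_id"]),
                    _SIDE_CODES[row["side"]],
                    float(row["size"]),
                    float(row["price"]),
                    int(row["timestamp_ms"]),
                )
```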

URL layout

  • spot: cdn/okex/traderecords/spot/daily/<YYYYMMDD>/<SYMBOL>-trades-<YYYY-MM-DD>.zip
  • swap: cdn/okex/traderecords/swap/daily/<YYYYMMDD>/<SYMBOL>-trades-<YYYY-MM-DD>.zip
  • futures: cdn/okex/traderecords/futures/daily/<YYYYMMDD>/<SYMBOL>-trades-<YYYY-MM-DD>.zip
  • option: cdn/okex/traderecords/option/daily/<YYYYMMDD>/<SYMBOL>-trades-<YYYY-MM-DD>.zip
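Note that the directory segment uses a compact date while the file name uses a dashed one. A hypothetical URL helper that follows the layout above might look like this; the https scheme and the www.okx.com host prefix are assumptions combined from the paths listed here.

```python
from datetime import date

_OKX_MARKETS = ("spot", "swap", "futures", "option")


def okx_day_url(market: str, symbol: str, day: date) -> str:
    """Build the archive URL for one symbol and one day (hypothetical helper)."""
    if market not in _OKX_MARKETS:
        raise ValueError(f"unknown OKX market: {market!r}")
    return (
        "https://www.okx.com/cdn/okex/traderecords/"
        f"{market}/daily/{day:%Y%m%d}/"           # compact date in the directory
        f"{symbol}-trades-{day.isoformat()}.zip"  # dashed date in the file name
    )
```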

Symbol naming follows OKX convention (BTC-USDT for spot, BTC-USDT-SWAP for perp, BTC-USD-240329-50000-C for option-chain instruments). The converter accepts the symbol verbatim; cross-exchange normalisation is out of scope.

Example

The script below builds a synthetic OKX-style zipped CSV in memory and round-trips it through the converter:

"""OKX public archive round-trip — build a synthetic OKX-format CSV
in memory, run `okx.trades_to_floxlog`, then read the resulting
`.floxlog` back through `DataReader` to confirm the trade stream
round-trips.

CI-runnable companion to
[Import multi-exchange archives](../how-to/import-multi-exchange-archives.md).
No network — the fixture is built in memory.

Usage:
    cd /path/to/flox
    PYTHONPATH=build/python python3 docs/examples/python_okx_archive.py
"""
from __future__ import annotations

import io
import shutil
import tempfile
import zipfile
from pathlib import Path

import flox_py
from flox_py.archives import okx


_ROWS = [
    (100, "buy",  0.01, 42_100.5, 1_700_000_000_500),
    (101, "sell", 0.02, 42_101.0, 1_700_000_001_250),
    (102, "buy",  0.03, 42_100.7, 1_700_000_002_000),
]


def _build_zip(dest: Path) -> Path:
    buf = io.StringIO()
    buf.write("trade_id,side,size,price,timestamp_ms\n")
    for r in _ROWS:
        buf.write(",".join(str(x) for x in r) + "\n")
    with zipfile.ZipFile(dest, "w", zipfile.ZIP_DEFLATED) as zf:
        zf.writestr(dest.with_suffix(".csv").name, buf.getvalue())
    return dest


def main() -> None:
    workdir = Path(tempfile.mkdtemp(prefix="flox-okx-"))
    try:
        z = workdir / "BTC-USDT-SWAP-trades-2024-01-15.zip"
        _build_zip(z)

        tape = workdir / "tape"
        stats = okx.trades_to_floxlog(
            z, tape,
            symbol_id=1, symbol_name="BTC-USDT-SWAP", market="swap",
        )
        trades = flox_py.DataReader(str(tape)).read_trades()
        print(
            f"converted: rows_read={stats.rows_read} "
            f"trades_written={stats.trades_written} "
            f"tape_trades={int(trades.size)}"
        )
        sides = [int(t["side"]) for t in trades]
        assert sides == [0, 1, 0], sides
    finally:
        shutil.rmtree(workdir, ignore_errors=True)


if __name__ == "__main__":
    main()

CLI

# Single day from a local CSV / zip
flox archive okx \
  --csv ./BTC-USDT-SWAP-trades-2024-01-15.zip \
  --out ./tapes/okx-swap-BTC-USDT-SWAP \
  --symbol BTC-USDT-SWAP --market swap

# Multi-day range with download
flox archive okx \
  --symbol BTC-USDT-SWAP --market swap \
  --from 2024-01-01 --to 2024-12-31 \
  --out ./tapes/okx-swap-BTC-USDT-SWAP \
  --parallel 4

Bitget

Bitget publishes daily trade ticks on its public archive S3 / CDN mirror. The on-disk CSV columns:

trade_id, price, size, side, timestamp_ms

trade_id is an integer exchange-assigned id, used directly for append-safe dedup. side is the active flow as buy / sell lowercase. timestamp_ms is Unix milliseconds.
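The idea behind append-safe dedup can be shown in a few lines: rows whose trade id is already on the tape are skipped, so importing the same day twice adds nothing. This is a conceptual sketch with a hypothetical function name, not a description of how DataWriter tracks ids internally.

```python
def append_new_trades(seen_ids: set, rows: list) -> list:
    """Return only rows whose trade_id (first field) has not been seen.

    seen_ids is updated in place, standing in for the ids already on tape.
    """
    written = []
    for row in rows:
        trade_id = row[0]
        if trade_id in seen_ids:
            continue  # duplicate from an earlier import; skip it
        seen_ids.add(trade_id)
        written.append(row)
    return written


# Rows in Bitget column order: trade_id, price, size, side, timestamp_ms.
day = [
    (100, 42_100.5, 0.01, "buy", 1_700_000_000_500),
    (101, 42_101.0, 0.02, "sell", 1_700_000_001_250),
]
seen: set = set()
first_pass = append_new_trades(seen, day)   # both rows are new
second_pass = append_new_trades(seen, day)  # re-import writes nothing
```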

Market codes follow Bitget's own API naming: spot, umcbl (USDT-margined perpetual), cmcbl (USDC-margined perpetual). The converter accepts them verbatim.

Example

"""Bitget public archive round-trip — build a synthetic Bitget-format
CSV in memory, run `bitget.trades_to_floxlog`, then read the
resulting `.floxlog` back through `DataReader`.

CI-runnable companion to
[Import multi-exchange archives](../how-to/import-multi-exchange-archives.md).
No network — the fixture is built in memory.

Usage:
    cd /path/to/flox
    PYTHONPATH=build/python python3 docs/examples/python_bitget_archive.py
"""
from __future__ import annotations

import io
import shutil
import tempfile
import zipfile
from pathlib import Path

import flox_py
from flox_py.archives import bitget


_ROWS = [
    (100, 42_100.5, 0.01, "buy",  1_700_000_000_500),
    (101, 42_101.0, 0.02, "sell", 1_700_000_001_250),
    (102, 42_100.7, 0.03, "buy",  1_700_000_002_000),
]


def _build_zip(dest: Path) -> Path:
    buf = io.StringIO()
    buf.write("trade_id,price,size,side,timestamp_ms\n")
    for r in _ROWS:
        buf.write(",".join(str(x) for x in r) + "\n")
    with zipfile.ZipFile(dest, "w", zipfile.ZIP_DEFLATED) as zf:
        zf.writestr(dest.with_suffix(".csv").name, buf.getvalue())
    return dest


def main() -> None:
    workdir = Path(tempfile.mkdtemp(prefix="flox-bitget-"))
    try:
        z = workdir / "BTCUSDT-trades-2024-01-15.zip"
        _build_zip(z)

        tape = workdir / "tape"
        stats = bitget.trades_to_floxlog(
            z, tape,
            symbol_id=1, symbol_name="BTCUSDT", market="umcbl",
        )
        trades = flox_py.DataReader(str(tape)).read_trades()
        print(
            f"converted: rows_read={stats.rows_read} "
            f"trades_written={stats.trades_written} "
            f"tape_trades={int(trades.size)}"
        )
        sides = [int(t["side"]) for t in trades]
        assert sides == [0, 1, 0], sides
    finally:
        shutil.rmtree(workdir, ignore_errors=True)


if __name__ == "__main__":
    main()

CLI

# Single day from a local file
flox archive bitget \
  --csv ./BTCUSDT-trades-2024-01-15.zip \
  --out ./tapes/bitget-umcbl-BTCUSDT \
  --symbol BTCUSDT --market umcbl

# Multi-day range with download
flox archive bitget \
  --symbol BTCUSDT --market umcbl \
  --from 2024-01-01 --to 2024-12-31 \
  --out ./tapes/bitget-umcbl-BTCUSDT \
  --parallel 4

Bitget matters specifically for production reproduction: md_collector deployments in Singapore default to Bitget feeds for multi-symbol fixtures, so the archive lets researchers re-run with the same (exchange, name) keying that the live capture used.

Deribit

Deribit dominates crypto options volume and is the only venue of this set whose public archive carries options trades as a first-class event type. The on-disk CSV columns (post-2022):

trade_id, timestamp_ms, instrument, side, price, amount,
mark_price, iv, index_price

trade_id is an integer, used directly for append-safe dedup. side is buy / sell lowercase. mark_price, iv, and index_price are dropped at read time because the floxlog TradeRecord schema does not represent them; keep the source CSV alongside the tape and re-join in numpy when needed.
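The re-join mentioned above amounts to a lookup keyed on trade_id. The sketch below uses only the standard library for brevity; the same lookup could be vectorised in numpy. It assumes the trade ids can be recovered from the tape (the field name is not confirmed here), and both function names are hypothetical.

```python
import csv
import io


def load_extras(csv_text: str) -> dict:
    """Map trade_id -> (mark_price, iv, index_price) from the source CSV."""
    extras = {}
    for row in csv.DictReader(io.StringIO(csv_text)):
        extras[int(row["trade_id"])] = (
            float(row["mark_price"]),
            float(row["iv"]),
            float(row["index_price"]),
        )
    return extras


def rejoin(tape_trade_ids, extras: dict) -> list:
    """Align the dropped columns with tape order; None where no source row exists."""
    return [extras.get(trade_id) for trade_id in tape_trade_ids]
```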

Instrument naming

  • perpetual: BTC-PERPETUAL, ETH-PERPETUAL, ...
  • future: BTC-29MAR24, ETH-28JUN24, ... (date-encoded expiry)
  • option: BTC-29MAR24-50000-C, BTC-29MAR24-50000-P (date-encoded expiry + strike + C/P)
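The naming scheme above is regular enough to parse mechanically. The following is a minimal sketch with hypothetical names, assuming two-digit years in the 2000s and integer strikes; it is not part of the importer, which takes the instrument name verbatim.

```python
import re
from datetime import date
from typing import NamedTuple, Optional

_MONTHS = {m: i for i, m in enumerate(
    "JAN FEB MAR APR MAY JUN JUL AUG SEP OCT NOV DEC".split(), start=1)}
_EXPIRY = re.compile(r"(\d{1,2})([A-Z]{3})(\d{2})")


class Instrument(NamedTuple):
    underlying: str
    kind: str                 # "perpetual", "future", or "option"
    expiry: Optional[date]
    strike: Optional[int]
    right: Optional[str]      # "C" or "P" for options


def _parse_expiry(token: str) -> date:
    m = _EXPIRY.fullmatch(token)
    if m is None or m.group(2) not in _MONTHS:
        raise ValueError(f"bad expiry: {token!r}")
    day, month, year = m.groups()
    return date(2000 + int(year), _MONTHS[month], int(day))


def parse_instrument(name: str) -> Instrument:
    """Classify a Deribit-style instrument name into its components."""
    parts = name.split("-")
    if len(parts) == 2 and parts[1] == "PERPETUAL":
        return Instrument(parts[0], "perpetual", None, None, None)
    if len(parts) == 2:
        return Instrument(parts[0], "future", _parse_expiry(parts[1]), None, None)
    if len(parts) == 4 and parts[3] in ("C", "P"):
        return Instrument(parts[0], "option", _parse_expiry(parts[1]),
                          int(parts[2]), parts[3])
    raise ValueError(f"unrecognised instrument: {name!r}")
```

A parser like this is handy when choosing the market argument (perpetual, future, option) from a list of instrument names, or when grouping single-instrument tapes by expiry.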

The converter takes one instrument per tape. Multi-instrument option-chain aggregation (one tape covering every strike at a given expiry) is left as a follow-up — backtests that pin to a specific strike or roll through a known series sequentially are well served by the single-instrument path.

Example

"""Deribit public archive round-trip — build synthetic Deribit-format
CSVs in memory for a perpetual and an option-chain instrument, run
`deribit.trades_to_floxlog` on each, and read the resulting
`.floxlog` tapes back through `DataReader`.

CI-runnable companion to
[Import multi-exchange archives](../how-to/import-multi-exchange-archives.md).

Usage:
    cd /path/to/flox
    PYTHONPATH=build/python python3 docs/examples/python_deribit_archive.py
"""
from __future__ import annotations

import gzip
import io
import shutil
import tempfile
from pathlib import Path

import flox_py
from flox_py.archives import deribit


_ROWS_PERP = [
    (100, 1_700_000_000_500, "BTC-PERPETUAL", "buy",  42_100.5, 0.1, 42_100.0, 0.0, 42_098.0),
    (101, 1_700_000_001_250, "BTC-PERPETUAL", "sell", 42_101.0, 0.2, 42_101.0, 0.0, 42_099.5),
    (102, 1_700_000_002_000, "BTC-PERPETUAL", "buy",  42_100.7, 0.3, 42_100.7, 0.0, 42_098.5),
]

_ROWS_OPT = [
    (200, 1_700_000_000_000, "BTC-29MAR24-50000-C", "buy",  0.0500, 10.0, 0.0498, 0.55, 42_000.0),
    (201, 1_700_000_001_000, "BTC-29MAR24-50000-C", "sell", 0.0510, 5.0,  0.0511, 0.56, 42_010.0),
]


def _build_gz(dest: Path, rows) -> Path:
    buf = io.StringIO()
    buf.write("trade_id,timestamp_ms,instrument,side,price,amount,"
              "mark_price,iv,index_price\n")
    for r in rows:
        buf.write(",".join(str(x) for x in r) + "\n")
    with gzip.open(dest, "wt", encoding="utf-8") as f:
        f.write(buf.getvalue())
    return dest


def main() -> None:
    workdir = Path(tempfile.mkdtemp(prefix="flox-deribit-"))
    try:
        perp_gz = workdir / "BTC-PERPETUAL-2024-01-15.csv.gz"
        _build_gz(perp_gz, _ROWS_PERP)
        perp_tape = workdir / "tape-perp"
        ps = deribit.trades_to_floxlog(
            perp_gz, perp_tape,
            symbol_id=1, symbol_name="BTC-PERPETUAL", market="perpetual",
        )
        print(f"perp: rows_read={ps.rows_read} trades_written={ps.trades_written}")

        opt_gz = workdir / "BTC-29MAR24-50000-C-2024-01-15.csv.gz"
        _build_gz(opt_gz, _ROWS_OPT)
        opt_tape = workdir / "tape-option"
        opt_stats = deribit.trades_to_floxlog(
            opt_gz, opt_tape,
            symbol_id=1, symbol_name="BTC-29MAR24-50000-C", market="option",
        )
        print(f"option: rows_read={opt_stats.rows_read} trades_written={opt_stats.trades_written}")

        for label, tape in (("perp", perp_tape), ("option", opt_tape)):
            r = flox_py.DataReader(str(tape))
            trades = r.read_trades()
            print(f"  {label}: {int(trades.size)} trades in tape")
    finally:
        shutil.rmtree(workdir, ignore_errors=True)


if __name__ == "__main__":
    main()

CLI

# Single day from a local file
flox archive deribit \
  --csv ./BTC-PERPETUAL-2024-01-15.csv.gz \
  --out ./tapes/deribit-perp-BTC \
  --symbol BTC-PERPETUAL --market perpetual

# Multi-day option-chain instrument
flox archive deribit \
  --symbol BTC-29MAR24-50000-C --market option \
  --from 2024-01-01 --to 2024-03-29 \
  --out ./tapes/deribit-opt-BTC-29MAR24-50000-C \
  --parallel 4

Shared download cache

All exchange-specific importers share one on-disk cache, rooted by default at ~/.flox/archive-cache/. Override with FLOX_ARCHIVE_CACHE=/some/path (env var) or by passing mirror=... to the range form. Each exchange writes under its own subdirectory (bybit/, binance/, etc.) so the cache layout stays predictable.

The cache has no auto-eviction; it grows monotonically. Wipe it whenever you want — the download path is idempotent and will refetch what is missing.
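The override rules can be summarised in a short resolver. This is a sketch under assumptions: the function name is hypothetical, an explicit mirror is assumed to take precedence over the environment variable, and mirror is treated as the per-exchange directory itself (matching the default shown in the Bybit range example).

```python
import os
from pathlib import Path
from typing import Mapping, Optional


def resolve_cache_dir(
    exchange: str,
    mirror: Optional[str] = None,
    env: Optional[Mapping[str, str]] = None,
) -> Path:
    """Pick the directory an exchange's downloads would be cached in."""
    env = os.environ if env is None else env
    if mirror is not None:
        return Path(mirror)  # assumed: explicit argument wins
    root = env.get("FLOX_ARCHIVE_CACHE")
    base = Path(root) if root else Path.home() / ".flox" / "archive-cache"
    return base / exchange   # one subdirectory per exchange
```

Passing env explicitly keeps the helper testable without mutating the process environment.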

Cross-exchange research

Tapes from different exchanges sit side by side in MergedTapeReader. The reader assigns one global symbol id per (exchange, name) pair, so a strategy or analysis that consumes both binance:BTCUSDT and bybit:BTCUSDT sees two distinct streams with the right exchange tag on every event.

import flox_py
reader = flox_py.MergedTapeReader([
    "./tapes/binance-um-BTCUSDT",
    "./tapes/bybit-linear-BTCUSDT",
])
trades = reader.read_trades()
sym_table = reader.symbol_table()   # [{global_id, exchange, name, ...}, ...]

The (exchange, name) keying is the contract: every cross-exchange analysis that wants the right interpretation of an event must hold the exchange tag alongside the symbol id.
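The keying contract can be illustrated without flox_py at all. The sketch below assigns one id per (exchange, name) pair in first-seen order; the function name, the zero-based ids, and the ordering are assumptions for illustration, not MergedTapeReader's actual behaviour.

```python
def build_symbol_table(tapes) -> dict:
    """Map each distinct (exchange, name) pair to a global id."""
    table = {}
    for exchange, name in tapes:
        key = (exchange, name)
        if key not in table:
            table[key] = len(table)  # next unused id
    return table


table = build_symbol_table([
    ("binance", "BTCUSDT"),
    ("bybit", "BTCUSDT"),    # same name, different venue: its own id
    ("binance", "BTCUSDT"),  # repeated pair: reuses the first id
])
```

Keying on the name alone would collapse the first two entries into one stream, which is exactly the ambiguity the contract rules out.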

Adding a new exchange

A new exchange importer is one module under flox_py/archives/<exchange>.py exposing:

  • trades_to_floxlog(csv_path, out_tape, *, symbol_id, symbol_name, market, ...) — parse one day's CSV, write trades via DataWriter, append-safe by the venue's trade id.
  • range_to_floxlog(symbol, market, date_from, date_to, out_tape, *, mirror, parallel, ...) — download a date range, hand each day to the single-day function, merge metadata.json with the union counters.

Register the new module in archives/__init__.py, add a flox archive <exchange> subparser in flox_py/cli.py, and write a synthetic-fixture test under python/tests/. The CLI and tests work generically against the ArchiveReader Protocol; no changes elsewhere in the framework are required.
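For orientation, the two entry points can be written down as a typing.Protocol. The real ArchiveReader Protocol is not reproduced here, so the class below is a hypothetical stand-in built from the signatures listed above; the return types and default values are assumptions.

```python
from pathlib import Path
from types import SimpleNamespace
from typing import Any, Optional, Protocol, Union, runtime_checkable

PathLike = Union[str, Path]


@runtime_checkable
class ArchiveReaderSketch(Protocol):
    """Hypothetical stand-in for the ArchiveReader Protocol."""

    def trades_to_floxlog(
        self, csv_path: PathLike, out_tape: PathLike, *,
        symbol_id: int, symbol_name: str, market: str,
    ) -> Any: ...

    def range_to_floxlog(
        self, symbol: str, market: str, date_from: str, date_to: str,
        out_tape: PathLike, *,
        mirror: Optional[PathLike] = None, parallel: int = 1,
    ) -> Any: ...


def _stub(*args: Any, **kwargs: Any) -> None:
    """Placeholder body for the demonstration below."""


# A namespace stands in for a new exchange module here. The runtime check
# only confirms that both names exist; it does not validate signatures.
fake_module = SimpleNamespace(trades_to_floxlog=_stub, range_to_floxlog=_stub)
incomplete = SimpleNamespace(trades_to_floxlog=_stub)
conforms = isinstance(fake_module, ArchiveReaderSketch)
rejected = not isinstance(incomplete, ArchiveReaderSketch)
```

A structural check like this is one way a generic test could verify that a newly registered module exposes both entry points before exercising it with a synthetic fixture.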

See also