Segment Operations¶
Utilities for merging, splitting, exporting, validating, and partitioning binary log data.
Merge¶
merge(input_paths, output_dir, output_name="merged", compression="none", sort=True) -> dict¶
Merge multiple segment files into one.
result = flox.merge(
input_paths=["seg1.flox", "seg2.flox"],
output_dir="./merged",
output_name="combined",
compression="lz4",
sort=True,
)
print(f"Merged {result['segments_merged']} segments, {result['events_written']} events")
Returns dict:
| Key | Type | Description |
|---|---|---|
success |
bool |
Whether merge succeeded |
output_path |
str |
Output file path |
segments_merged |
int |
Input segments processed |
events_written |
int |
Events in output |
bytes_written |
int |
Output size |
errors |
list |
Error messages |
merge_dir(input_dir, output_dir) -> dict¶
Merge all segments in a directory (shorthand for merge).
Split¶
split(input_path, output_dir, mode="time", time_interval_ns=3600000000000, events_per_file=1000000) -> dict¶
Split a segment file into multiple files.
result = flox.split(
input_path="large_file.flox",
output_dir="./split",
mode="time",
time_interval_ns=3600_000_000_000, # 1 hour
)
| Parameter | Type | Default | Description |
|---|---|---|---|
mode |
str |
"time" |
Split mode: "time", "event_count", "size", "symbol" |
time_interval_ns |
int |
1 hour | Time interval for "time" mode |
events_per_file |
int |
1000000 | Events per file for "event_count" mode |
Returns dict:
| Key | Type | Description |
|---|---|---|
success |
bool |
Whether split succeeded |
output_paths |
list[str] |
Output file paths |
segments_created |
int |
Files created |
events_written |
int |
Total events |
errors |
list |
Error messages |
Export¶
export_data(input_path, output_path, format="csv", from_ns=None, to_ns=None, symbols=None) -> dict¶
Export binary data to CSV, JSON, or JSONLines.
result = flox.export_data(
input_path="data.flox",
output_path="trades.csv",
format="csv",
symbols=[1, 2],
)
| Parameter | Type | Default | Description |
|---|---|---|---|
format |
str |
"csv" |
Output format: "csv", "json", "jsonlines", "binary" |
from_ns |
int |
None |
Start timestamp filter |
to_ns |
int |
None |
End timestamp filter |
symbols |
list[int] |
None |
Symbol ID filter |
Returns dict:
| Key | Type | Description |
|---|---|---|
success |
bool |
Whether export succeeded |
output_path |
str |
Output file path |
events_exported |
int |
Events exported |
bytes_written |
int |
Output size |
errors |
list |
Error messages |
Other Operations¶
recompress(input_path, output_path, compression="lz4") -> bool¶
Re-compress a segment file. Returns True on success.
extract_symbols(input_path, output_path, symbols) -> int¶
Extract specific symbols to a new file. Returns event count.
extract_time_range(input_path, output_path, from_ns, to_ns) -> int¶
Extract a time range to a new file. Returns event count.
Validation¶
validate(segment_path, verify_crc=True, verify_timestamps=True) -> dict¶
Validate a single segment file.
result = flox.validate("data.flox")
if not result['valid']:
for issue in result['issues']:
print(f"[{issue['severity']}] {issue['message']}")
Returns dict:
| Key | Type | Description |
|---|---|---|
path |
str |
File path |
valid |
bool |
Overall validity |
header_valid |
bool |
Header integrity |
reported_event_count |
int |
Events per header |
actual_event_count |
int |
Events found |
has_index |
bool |
Has index section |
index_valid |
bool |
Index integrity |
trades_found |
int |
Trade count |
book_updates_found |
int |
Book update count |
crc_errors |
int |
CRC failures |
timestamp_anomalies |
int |
Out-of-order timestamps |
has_errors |
bool |
Has error-level issues |
has_critical |
bool |
Has critical issues |
issues |
list[dict] |
Issue details |
validate_dataset(data_dir) -> dict¶
Validate all segments in a directory.
result = flox.validate_dataset("./data")
print(f"{result['valid_segments']}/{result['total_segments']} valid")
Partitioner¶
Partition datasets for parallel processing or walk-forward backtesting.
Methods¶
partition_by_time(num_partitions, warmup_ns=0) -> list[dict]¶
Split into N equal time-based partitions.
partition_by_duration(duration_ns, warmup_ns=0) -> list[dict]¶
Partition by fixed time duration.
partition_by_calendar(unit, warmup_ns=0) -> list[dict]¶
Partition by calendar unit.
| Unit | Description |
|---|---|
"hour" |
Hourly partitions |
"day" |
Daily partitions |
"week" |
Weekly partitions |
"month" |
Monthly partitions |
partition_by_symbol(num_partitions) -> list[dict]¶
Group symbols into N partitions.
partition_per_symbol() -> list[dict]¶
One partition per symbol.
partition_by_event_count(num_partitions) -> list[dict]¶
Split by event count.
Partition Dict¶
Each partition is a dict:
| Key | Type | Description |
|---|---|---|
partition_id |
int |
Partition index |
from_ns |
int |
Start timestamp |
to_ns |
int |
End timestamp |
warmup_from_ns |
int |
Warmup start timestamp |
estimated_events |
int |
Estimated event count |
estimated_bytes |
int |
Estimated data size |
symbols |
list[int] |
Symbols in partition |