
ParallelReader

ParallelReader processes multiple segments concurrently using a thread pool. When `sort_output` is true (the default), it merges the per-segment results into a single timestamp-sorted stream for delivery.

/// Options that control which segments a ParallelReader reads, how many
/// worker threads it uses, and how events are filtered and delivered.
///
/// Field order is part of the interface: C++20 designated initializers
/// (e.g. `{.data_dir = ..., .num_threads = 4}`) must follow declaration order.
struct ParallelReaderConfig
{
  /// Directory containing the segment files.
  std::filesystem::path data_dir;

  /// Worker thread count; 0 means auto-detect.
  uint32_t num_threads = 0;

  /// Number of segments to prefetch ahead.
  size_t prefetch_segments = 2;

  /// Events per segment buffer.
  size_t event_buffer_size = 10000;

  /// Optional start-timestamp filter; unset by default.
  std::optional<int64_t> from_ns;

  /// Optional end-timestamp filter; unset by default.
  std::optional<int64_t> to_ns;

  /// Symbol IDs to include. Empty by default.
  /// NOTE(review): presumably empty means "no symbol filter" — confirm.
  std::set<uint32_t> symbols;

  /// Verify CRC32 checksums while reading.
  bool verify_crc = true;

  /// Merge output from all segments into timestamp order.
  bool sort_output = true;
};

/// Reads the segments described by a ParallelReaderConfig using multiple worker
/// threads and hands the resulting events to caller-supplied callbacks.
class ParallelReader
{
public:
  /// @param config Reader options (taken by value).
  explicit ParallelReader(ParallelReaderConfig config);

  /// Per-event callback.
  /// NOTE(review): the examples always return true; presumably returning
  /// false stops iteration early — confirm against the implementation.
  using EventCallback = std::function<bool(const ReplayEvent&)>;

  /// Invokes @p callback for each event read.
  /// @return A uint64_t count (presumably the number of events delivered — confirm).
  uint64_t forEach(EventCallback callback);

  /// Callback that receives a whole vector of events per call.
  using BatchCallback = std::function<bool(const std::vector<ReplayEvent>&)>;

  /// Invokes @p callback with batches of events instead of single events.
  /// @return A uint64_t count (presumably the number of events delivered — confirm).
  uint64_t forEachBatch(BatchCallback callback);

  /// Map function applied to one segment: receives that segment's events and
  /// its SegmentInfo, and returns a per-segment Result.
  template <typename Result>
  using SegmentProcessor = std::function<Result(const std::vector<ReplayEvent>&,
                                                  const SegmentInfo&)>;

  /// Applies @p processor per segment and returns the collected Results.
  ///
  /// Result sits inside std::function, so it cannot be deduced from a lambda
  /// argument; callers must name it explicitly, e.g.
  /// `reader.mapSegments<MyStats>([](const auto& events, const auto& info) {...})`.
  template <typename Result>
  std::vector<Result> mapSegments(SegmentProcessor<Result> processor);

  /// @return A copy of the reader's statistics. Discarding it is always a mistake.
  [[nodiscard]] ParallelReaderStats stats() const;

  /// @return The segment metadata known to this reader.
  [[nodiscard]] const std::vector<SegmentInfo>& segments() const;

  /// @return The number of worker threads.
  /// NOTE(review): presumably the resolved count when num_threads was 0 — confirm.
  [[nodiscard]] uint32_t numThreads() const;
};

Purpose

  • Maximize throughput by reading multiple segments in parallel.
  • Merge events from different segments into correct timestamp order.
  • Enable map-reduce style processing over segments.

Configuration

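| Field | Type | Default | Description |
|---|---|---|---|
| `data_dir` | `std::filesystem::path` | (empty) | Directory containing segments |
| `num_threads` | `uint32_t` | `0` | Worker thread count (0 = `std::thread::hardware_concurrency()`) |
| `prefetch_segments` | `size_t` | `2` | Number of segments to prefetch ahead |
| `event_buffer_size` | `size_t` | `10000` | Events per segment buffer |
| `from_ns` | `std::optional<int64_t>` | unset | Start timestamp filter |
| `to_ns` | `std::optional<int64_t>` | unset | End timestamp filter |
| `symbols` | `std::set<uint32_t>` | (empty) | Symbol IDs to include |
| `verify_crc` | `bool` | `true` | Verify CRC32 checksums |
| `sort_output` | `bool` | `true` | Merge output from all segments into timestamp order |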

Statistics

/// Counters and timing collected by a ParallelReader.
///
/// Every field has a zero default so that a default-initialized object
/// (`ParallelReaderStats s;`) never holds indeterminate values. Reading an
/// uninitialized integer member is undefined behaviour.
struct ParallelReaderStats
{
  uint64_t segments_processed{0};
  uint64_t events_read{0};
  uint64_t trades_read{0};
  uint64_t book_updates_read{0};
  uint64_t bytes_read{0};
  uint64_t crc_errors{0};

  // NOTE(review): presumably the run's start/end times in nanoseconds, used
  // as the elapsed-time denominator of the rate helpers below — confirm clock.
  int64_t start_time_ns{0};
  int64_t end_time_ns{0};

  /// Event rate in events per second (definition not shown here).
  double eventsPerSecond() const;

  /// Data rate in MB/s (presumably derived from bytes_read — confirm).
  double throughputMBps() const;
};

Usage

Basic Iteration

replay::ParallelReaderConfig config{
    .data_dir = "/data/market",
    .num_threads = 4  // leaving this at 0 (the default) auto-detects the count
};

replay::ParallelReader reader(config);

reader.forEach([]([[maybe_unused]] const replay::ReplayEvent& event) {
    // sort_output defaults to true, so events arrive in timestamp order here.
    // With sort_output = false, no ordering is documented.
    // NOTE(review): true appears to mean "keep going"; confirm that returning
    // false stops iteration early.
    return true;
});

auto stats = reader.stats();
std::cout << "Throughput: " << stats.eventsPerSecond() << " events/sec\n";

Batch Processing

reader.forEachBatch([]([[maybe_unused]] const std::vector<replay::ReplayEvent>& events) {
    // Work on the whole batch in one call rather than event by event.
    // This guide describes each batch as an entire segment.
    return true;
});

Map-Reduce Pattern

// Per-segment summary produced by the map step of the example.
// Members default to zero, so the struct is safe even without `{}` at the use site.
struct SegmentStats {
    uint64_t trades{0};   // number of Trade events
    uint64_t books{0};    // number of non-Trade events
    double volume{0.0};   // summed trade quantity
};

auto results = reader.mapSegments<SegmentStats>(
    [](const std::vector<replay::ReplayEvent>& events, const replay::SegmentInfo& info) {
        SegmentStats stats{};
        for (const auto& e : events) {
            if (e.type == replay::EventType::Trade) {
                stats.trades++;
                stats.volume += e.trade.qty_raw / 1e9;
            } else {
                stats.books++;
            }
        }
        return stats;
    });

Convenience Functions

// One-call parallel iteration over the directory (4 worker threads, presumably)
const uint64_t delivered = replay::parallelForEach("/data/market", callback, 4);

// One-call parallel count of the events under the directory
const uint64_t event_total = replay::parallelCount("/data/market");

Architecture

┌─────────────────────────────────────────────────────────┐
│                    ParallelReader                       │
├─────────────────────────────────────────────────────────┤
│  ┌──────────┐  ┌──────────┐  ┌──────────┐              │
│  │ Worker 0 │  │ Worker 1 │  │ Worker N │              │
│  │          │  │          │  │          │              │
│  │ Seg 0    │  │ Seg 1    │  │ Seg N    │              │
│  └────┬─────┘  └────┬─────┘  └────┬─────┘              │
│       │             │             │                     │
│       └─────────────┼─────────────┘                     │
│                     ▼                                   │
│            ┌────────────────┐                           │
│            │  Merge Queue   │  (k-way merge)            │
│            └────────┬───────┘                           │
│                     ▼                                   │
│            ┌────────────────┐                           │
│            │   Callback     │  (timestamp order)        │
│            └────────────────┘                           │
└─────────────────────────────────────────────────────────┘

Notes

  • Workers read segments independently and buffer events in memory.
  • K-way merge produces globally sorted output when sort_output=true.
  • When num_threads is 0 (the default), the thread count is taken from std::thread::hardware_concurrency().
  • Best for bulk processing where I/O bandwidth is the bottleneck.