Skip to content

EventBus

EventBus is a lock-free fan-out messaging system for publishing typed events to multiple subscribers. It supports both push (threaded delivery) and pull (manual draining) modes, and is configurable for sync or async semantics via policy injection.

template <typename Event, typename Policy, size_t QueueSize = config::DEFAULT_EVENTBUS_QUEUE_SIZE>
class EventBus;

Purpose

  • Deliver high-frequency events (market data, orders, etc.) to multiple subscribers with minimal latency and zero allocations.

Supported Policies

Policy Description
AsyncPolicy Lock-free delivery in per-subscriber threads with no tick synchronization.
SyncPolicy Uses TickBarrier to coordinate delivery across all subscribers per tick.

Key Responsibilities

Method Description
subscribe() Registers a new subscriber with individual queue and mode.
publish() Broadcasts an event to all subscribers; sync-mode waits for completion.
start() / stop() Starts or stops all push-mode subscriber threads.
getQueue(id) Provides direct access to a subscriber’s queue (for pull-mode).
enableDrainOnStop() Ensures any remaining events are dispatched before shutdown.

Design Highlights

  • Queue per subscriber: Each listener has a dedicated SPSCQueue to avoid contention.
  • Thread-per-subscriber: Only for PUSH mode; PULL consumers must drain manually.
  • RAII-controlled sync: TickGuard ensures deterministic behavior in SyncPolicy.
  • Zero dynamic allocations in hot path (emplace() used directly).
  • Tick-sequenced events: tickSequence field is automatically set if present.

Internal Types

Name Description
QueueItem Depends on Policy: either raw Event or (Event, TickBarrier*).
Listener Inferred from Event::Listener.
Queue Lock-free ring buffer (single producer/consumer).
Entry Subscription record: listener, queue, thread, and mode.

Notes

  • Sync-mode (SyncPolicy) is ideal for deterministic backtesting or simulation.
  • Async-mode (AsyncPolicy) is best suited for ultra-low-latency live systems.
  • EventBus is fully generic — works with any event type that defines a Listener.

Example Usage

using BookBus = EventBus<pool::Handle<BookUpdateEvent>, AsyncPolicy<pool::Handle<BookUpdateEvent>>>;
BookBus bus;
bus.subscribe(bookHandler);
bus.publish(bookUpdateHandle);