CandleAggregator
CandleAggregator converts a stream of trade events into time-boxed OHLCV candles and publishes them via CandleBus.
class CandleAggregator {
public:
  using Trait = traits::CandleAggregatorTrait;
  using Allocator = PoolAllocator<Trait, 8>;
  CandleAggregator(std::chrono::seconds interval, CandleBusRef bus);
  ~CandleAggregator();
  void start();
  void stop();
  SubscriberId id() const;
  SubscriberMode mode() const;
  void onTrade(const TradeEvent& ev);
  void onBookUpdate(const BookUpdateEvent&) {}
  void onCandle(const CandleEvent&) {}
private:
  struct PartialCandle {
    Candle candle;
    bool initialized = false;
  };
  std::chrono::seconds _interval{1};
  CandleBusRef _bus;
  std::vector<std::optional<PartialCandle>> _candles;
  std::chrono::steady_clock::time_point alignToInterval(std::chrono::steady_clock::time_point tp);
};
Purpose
- Aggregate raw trades into fixed-interval candles (1 s, 5 s, 1 m, …).
- Bridge market-data ingestion and strategy layers through a lightweight bus message.
Responsibilities
Aspect | Details |
---|---|
Interval | _interval defines candle length; constructor asserts interval > 0. |
Event handling | Consumes TradeEvent; snapshot / delta events are ignored. |
Publishing | Emits CandleEvent to every subscriber via CandleBusRef. |
Lifecycle | start() / stop() wired through SubsystemTrait for deterministic init / tear-down. |
Internal Behaviour
- Time alignment – incoming trade timestamp is rounded down by alignToInterval().
- Partial buffer – one PartialCandle per symbol (pre-sized std::vector<std::optional<…>>).
- Roll-over – when a trade crosses into a new slot, the previous candle is finalised and sent.
- Memory – no dynamic allocations in the hot path after vector capacity is reserved.
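The alignment, per-symbol buffer and roll-over steps above can be sketched as follows. This is a minimal illustration under stated assumptions, not the actual implementation: SketchCandle, SketchTrade, SketchAggregator and alignDown are hypothetical stand-ins, symbols are assumed to be dense indices into the buffer, trade timestamps are assumed to be non-decreasing per symbol, and the finished candle is returned to the caller instead of being published on a bus. It assumes C++17 for std::optional.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <cstddef>
#include <optional>
#include <vector>

using Clock = std::chrono::steady_clock;

// Hypothetical, simplified stand-ins for the real Candle / TradeEvent types.
struct SketchCandle {
  Clock::time_point start{};
  double open = 0, high = 0, low = 0, close = 0, volume = 0;
};

struct SketchTrade {
  std::size_t symbol;    // assumed: dense index into the per-symbol buffer
  Clock::time_point ts;  // assumed: non-decreasing per symbol
  double price;
  double qty;
};

// Round a timestamp down to the start of the interval slot that contains it.
inline Clock::time_point alignDown(Clock::time_point tp,
                                   std::chrono::seconds interval) {
  assert(interval.count() > 0);
  const auto step = std::chrono::duration_cast<Clock::duration>(interval);
  auto rem = tp.time_since_epoch() % step;
  if (rem < Clock::duration::zero()) rem += step;  // floor for pre-epoch values
  return tp - rem;
}

class SketchAggregator {
public:
  SketchAggregator(std::chrono::seconds interval, std::size_t symbolCount)
      : _interval(interval), _slots(symbolCount) {  // buffer sized once, up front
    assert(interval.count() > 0);
  }

  // Folds one trade into its symbol's partial candle. Returns the previous
  // candle when this trade opens a new slot, otherwise std::nullopt.
  std::optional<SketchCandle> onTrade(const SketchTrade& t) {
    std::optional<SketchCandle> finished;
    auto& slot = _slots.at(t.symbol);
    const auto start = alignDown(t.ts, _interval);

    if (slot && slot->start != start) {  // roll-over: finalise the old candle
      finished = *slot;
      slot.reset();
    }
    if (!slot) {  // first trade of the slot sets open/high/low/close
      slot = SketchCandle{start, t.price, t.price, t.price, t.price, 0.0};
    }
    slot->high = std::max(slot->high, t.price);
    slot->low = std::min(slot->low, t.price);
    slot->close = t.price;
    slot->volume += t.qty;
    return finished;
  }

private:
  std::chrono::seconds _interval;
  std::vector<std::optional<SketchCandle>> _slots;  // one entry per symbol
};
```

With a 5 s interval, trades at 1 s and 4 s fold into the same candle, and a trade at 6 s makes onTrade return that candle while starting a new one at the 5 s boundary. Assigning into an existing std::optional slot does not allocate, which is consistent with the no-allocation goal for the hot path.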
Notes
- Dispatch uses the static v-table generated by CandleAggregatorTrait; zero virtual calls.
- onBookUpdate and onCandle are no-ops – extend if mid-bar book stats are ever required.
- Thread safety: rely on the surrounding EventBus queues; the class itself is single-threaded.
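For readers unfamiliar with static v-tables, the sketch below shows one common way to dispatch events through a table of plain function pointers built at compile time, with no virtual functions. It is only an illustration of the general technique: how CandleAggregatorTrait actually generates its table is not shown in this document, and SketchVTable, SketchTrait, SketchHandle, makeHandle and CountingSubscriber are hypothetical names. It assumes C++17, where a static constexpr data member is implicitly inline.

```cpp
#include <cassert>

// Hypothetical event types, standing in for TradeEvent / BookUpdateEvent.
struct SketchTradeEvent { double price; double qty; };
struct SketchBookEvent {};

// One table of plain function pointers per subscriber type.
struct SketchVTable {
  void (*onTrade)(void* self, const SketchTradeEvent&);
  void (*onBookUpdate)(void* self, const SketchBookEvent&);
};

// Builds the table for T at compile time; each thunk forwards to T's member.
template <typename T>
struct SketchTrait {
  static void trade(void* s, const SketchTradeEvent& e) {
    static_cast<T*>(s)->onTrade(e);
  }
  static void book(void* s, const SketchBookEvent& e) {
    static_cast<T*>(s)->onBookUpdate(e);
  }
  static constexpr SketchVTable table{&trade, &book};
};

// Type-erased, non-owning handle that a bus could store per subscriber.
struct SketchHandle {
  void* self;
  const SketchVTable* vt;

  void trade(const SketchTradeEvent& e) const {
    assert(self && vt);
    vt->onTrade(self, e);  // indirect call through the static table
  }
  void book(const SketchBookEvent& e) const {
    assert(self && vt);
    vt->onBookUpdate(self, e);
  }
};

template <typename T>
SketchHandle makeHandle(T& obj) {
  return SketchHandle{&obj, &SketchTrait<T>::table};
}

// A subscriber with no virtual functions and a no-op book handler,
// mirroring the shape of the handlers in the class above.
struct CountingSubscriber {
  int trades = 0;
  void onTrade(const SketchTradeEvent&) { ++trades; }
  void onBookUpdate(const SketchBookEvent&) {}
};
```

A caller would wrap a subscriber with makeHandle(sub) and invoke h.trade(...) or h.book(...). The subscriber type itself stays non-polymorphic, and an empty inline handler such as onBookUpdate costs at most one indirect call per event.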