CandleAggregator¶
CandleAggregator transforms incoming TradeEvents into time-aligned OHLCV candles and broadcasts them via CandleBus.
class CandleAggregator : public ISubsystem, public IMarketDataSubscriber {
public:
CandleAggregator(std::chrono::seconds interval, CandleBus* bus);
void start() override;
void stop() override;
SubscriberId id() const override;
SubscriberMode mode() const override;
void onTrade(const TradeEvent& trade) override;
private:
struct PartialCandle {
Candle candle;
bool initialized = false;
};
std::chrono::seconds _interval;
CandleBus* _bus;
std::vector<std::optional<PartialCandle>> _candles;
TimePoint alignToInterval(TimePoint tp);
};
Purpose¶
- Buffer and roll trades into interval-based candles, suitable for downstream analytics or strategy inputs.
Responsibilities¶
| Aspect | Details |
|---|---|
| Interval | Candle size defined by _interval, validated at construction. |
| Event input | Consumes only TradeEvent; no handling for books or candles. |
| Event output | Emits CandleEvent to all subscribers via CandleBus. |
| Lifecycle | Hooks into engine via ISubsystem::start() and stop(). |
| Subscriber ID | Uses object pointer as a unique SubscriberId. |
| Mode | Operates in PUSH mode for direct event delivery. |
Internal Behavior¶
-
Time Slot Alignment Trade timestamps are aligned using
alignToInterval()to find the start of the containing interval. -
Per-Symbol Buffering
_candlesis astd::vector<std::optional<PartialCandle>>, indexed bySymbolId, pre-sized for all known symbols. -
Candle Rollover If a trade belongs to a new interval, the previous candle is finalized and sent; a new
PartialCandleis started. -
No Hot Allocations Once the vector is initialized, the hot path is allocation-free; avoids
unordered_maplookup cost.
Notes¶
- Designed for maximum cache-friendliness and fan-out throughput.
- Ignores out-of-order or backdated trades — assumes input stream is clean and ordered.
- Fully decoupled via
CandleBus; downstream consumers remain unaware of source logic.