Write a Custom Connector¶
Connect FLOX to a new exchange or data source.
Overview¶
A connector:
1. Connects to an exchange API (WebSocket, REST, FIX, etc.)
2. Parses incoming messages
3. Converts them to FLOX event types
4. Emits events to the engine
1. Implement IExchangeConnector¶
Header: flox/connector/abstract_exchange_connector.h
#include "flox/connector/abstract_exchange_connector.h"
#include "flox/book/events/trade_event.h"
#include "flox/book/events/book_update_event.h"
#include <atomic>
#include <memory_resource>
#include <string>
#include <thread>
class MyExchangeConnector : public flox::IExchangeConnector
{
public:
MyExchangeConnector(const std::string& symbol, flox::SymbolId symbolId)
: _symbol(symbol), _symbolId(symbolId) {}
std::string exchangeId() const override {
return "myexchange";
}
void start() override {
_running = true;
_thread = std::thread(&MyExchangeConnector::run, this);
}
void stop() override {
_running = false;
if (_thread.joinable()) {
_thread.join();
}
}
private:
void run() {
// Connect to exchange
connect();
while (_running) {
// Receive and parse messages
auto msg = receiveMessage();
handleMessage(msg);
}
disconnect();
}
void handleMessage(const Message& msg) {
if (msg.type == MessageType::Trade) {
handleTrade(msg);
} else if (msg.type == MessageType::BookUpdate) {
handleBookUpdate(msg);
}
}
void handleTrade(const Message& msg) {
flox::TradeEvent ev;
ev.trade.symbol = _symbolId;
ev.trade.price = flox::Price::fromDouble(msg.price);
ev.trade.quantity = flox::Quantity::fromDouble(msg.qty);
ev.trade.isBuy = msg.side == "buy";
ev.trade.exchangeTsNs = msg.timestamp;
ev.recvNs = flox::nowNsMonotonic();
// Emit via base class method
emitTrade(ev);
}
void handleBookUpdate(const Message& msg) {
// For book updates, create event directly (simplified)
// In practice, you might use a pool for large events
flox::BookUpdateEvent ev(/* pmr allocator */);
ev.update.symbol = _symbolId;
ev.update.type = flox::BookUpdateType::SNAPSHOT;
for (const auto& bid : msg.bids) {
ev.update.bids.push_back({
flox::Price::fromDouble(bid.price),
flox::Quantity::fromDouble(bid.qty)
});
}
for (const auto& ask : msg.asks) {
ev.update.asks.push_back({
flox::Price::fromDouble(ask.price),
flox::Quantity::fromDouble(ask.qty)
});
}
ev.recvNs = flox::nowNsMonotonic();
emitBookUpdate(ev);
}
std::string _symbol;
flox::SymbolId _symbolId;
std::atomic<bool> _running{false};
std::thread _thread;
};
2. Wire Callbacks¶
Connect the connector to your buses:
#include "flox/book/bus/trade_bus.h"
#include "flox/book/bus/book_update_bus.h"
#include "flox/util/memory/pool.h"
// Create buses
auto tradeBus = std::make_unique<TradeBus>();
auto bookBus = std::make_unique<BookUpdateBus>();
// Create pool for book events
pool::Pool<BookUpdateEvent, 128> bookPool;
// Create connector
auto connector = std::make_shared<MyExchangeConnector>("BTCUSD", symbolId);
// Wire callbacks
connector->setCallbacks(
// Book update callback
[&bookBus, &bookPool](const BookUpdateEvent& ev) {
// Acquire from pool for variable-size events
if (auto handle = bookPool.acquire()) {
(*handle)->update = ev.update;
(*handle)->recvNs = ev.recvNs;
bookBus->publish(std::move(handle));
}
},
// Trade callback
[&tradeBus](const TradeEvent& ev) {
tradeBus->publish(ev);
}
);
3. Register with Engine¶
// Connectors are held through shared_ptr, so `connector` stays usable here
// after it has been added to the list.
std::vector<std::shared_ptr<IExchangeConnector>> connectors;
connectors.emplace_back(connector);

// Subsystems are held through unique_ptr: after these moves `tradeBus` and
// `bookBus` are null and must not be dereferenced any more.
std::vector<std::unique_ptr<ISubsystem>> subsystems;
subsystems.emplace_back(std::move(tradeBus));
subsystems.emplace_back(std::move(bookBus));
// ... add strategies, etc.

// Both lists are moved into the engine before it is started.
EngineConfig config{};
Engine engine(config, std::move(subsystems), std::move(connectors));
engine.start();
4. Use ConnectorFactory (Optional)¶
For dynamic connector creation:
#include "flox/connector/connector_factory.h"
// Register factory
ConnectorFactory::instance().registerConnector("myexchange",
    // The explicit return type lets both `nullptr` and the concrete connector
    // be returned from the same lambda.
    [registry](const std::string& symbol) -> std::shared_ptr<IExchangeConnector> {
      const auto symbolId = registry->getSymbolId("myexchange", symbol);
      if (!symbolId) {
        // Unknown symbol: dereferencing an empty lookup result would be
        // undefined behaviour, so report failure instead.
        return nullptr;
      }
      return std::make_shared<MyExchangeConnector>(symbol, *symbolId);
    }
);
// Create connector
// NOTE(review): presumably `conn` is null when the symbol is not registered;
// check it before use.
auto conn = ConnectorFactory::instance().createConnector("myexchange", "BTCUSD");
5. Best Practices¶
Thread Safety¶
- Connector runs its own thread(s)
- emitTrade() and emitBookUpdate() are thread-safe
- Callbacks may execute on the connector thread
Timestamps¶
Capture timestamps at the right points:
void handleMessage(const RawMessage& raw) {
MonoNanos recvNs = nowNsMonotonic(); // Capture immediately
// Parse message...
auto parsed = parse(raw);
TradeEvent ev;
ev.trade.exchangeTsNs = parsed.exchangeTimestamp; // From exchange
ev.recvNs = recvNs; // When we received it
ev.publishTsNs = nowNsMonotonic(); // Right before emit
emitTrade(ev);
}
Error Handling¶
void run() {
while (_running) {
try {
if (!_connected) {
connect();
}
auto msg = receiveMessage();
handleMessage(msg);
} catch (const ConnectionError& e) {
FLOX_LOG("Connection lost: " << e.what());
_connected = false;
reconnect();
}
}
}
Sequence Numbers¶
Track exchange sequence numbers for gap detection:
// Checks that book updates arrive with consecutive sequence numbers and asks
// for a fresh snapshot when one is missing. The current message is still
// processed after a gap is reported.
void handleBookUpdate(const Message& msg) {
  // _lastSeq == 0 means nothing has been seen yet, so there is no gap to report.
  const bool haveBaseline = _lastSeq > 0;
  if (haveBaseline && msg.seq != _lastSeq + 1) {
    FLOX_LOG("Gap detected: " << _lastSeq << " -> " << msg.seq);
    requestSnapshot();  // Request full book snapshot
  }

  _lastSeq = msg.seq;
  // ... create and emit event
}
Pooled Book Events¶
For high-frequency book updates, use pooled events:
// Variant of the connector that takes its book events from a fixed-size pool
// (64 slots) instead of constructing one per message.
class MyExchangeConnector : public IExchangeConnector
{
  pool::Pool<BookUpdateEvent, 64> _bookPool;

  // Fills a pooled event from `msg` and emits it; drops the update (with a
  // log line) when the pool has no free slot.
  void handleBookUpdate(const Message& msg) {
    if (auto pooled = _bookPool.acquire()) {
      auto& handle = *pooled;
      // ... populate *handle
      emitBookUpdate(*handle);
      // emitBookUpdate() receives the event by reference, so `pooled` still
      // holds the slot until it goes out of scope at the end of this branch.
      // NOTE(review): presumably the slot returns to the pool at that point.
    } else {
      FLOX_LOG("Pool exhausted, dropping book update");
    }
  }
};
6. Complete Example¶
#include "flox/connector/abstract_exchange_connector.h"
#include "flox/book/events/trade_event.h"
#include "flox/book/events/book_update_event.h"
#include "flox/util/memory/pool.h"
#include "flox/util/base/time.h"
#include "flox/log/log.h"
#include <thread>
#include <atomic>
namespace myexchange
{

using namespace flox;

/// Connector that publishes straight to the buses it is given instead of
/// going through callbacks.
///
/// The buses are held by reference, so they must outlive the connector.
class MyConnector : public IExchangeConnector
{
 public:
  /// @param symbol   FLOX symbol id stamped on every event.
  /// @param tradeBus Destination for TradeEvent.
  /// @param bookBus  Destination for pooled BookUpdateEvent handles.
  MyConnector(SymbolId symbol, TradeBus& tradeBus, BookUpdateBus& bookBus)
      : _symbol(symbol)
      , _tradeBus(tradeBus)
      , _bookBus(bookBus)
  {}

  /// Joins the worker thread if the owner never called stop(); destroying a
  /// joinable std::thread would otherwise call std::terminate().
  /// The call is qualified because virtual dispatch is not wanted here.
  ~MyConnector() { MyConnector::stop(); }

  std::string exchangeId() const override { return "myexchange"; }

  /// Starts the worker thread; a second call while running is a no-op.
  void start() override {
    if (_running.exchange(true)) return;
    _thread = std::thread(&MyConnector::run, this);
  }

  /// Signals the worker to exit and joins it; a no-op when not running.
  void stop() override {
    if (!_running.exchange(false)) return;
    if (_thread.joinable()) _thread.join();
  }

 private:
  /// Worker thread body.
  void run() {
    FLOX_LOG("[myexchange] Connecting...");
    // ... connect to exchange
    while (_running) {
      // ... receive and process messages
    }
    FLOX_LOG("[myexchange] Disconnected");
  }

  /// Builds a TradeEvent from an exchange trade and publishes it.
  void onTradeMessage(/* ... */) {
    TradeEvent ev;
    ev.trade.symbol = _symbol;
    ev.trade.price = Price::fromDouble(/* ... */);
    ev.trade.quantity = Quantity::fromDouble(/* ... */);
    ev.trade.isBuy = /* ... */;
    ev.trade.exchangeTsNs = /* ... */;
    ev.recvNs = nowNsMonotonic();
    _tradeBus.publish(ev);
  }

  /// Fills a pooled BookUpdateEvent and moves its handle to the book bus.
  /// The update is dropped when the pool has no free slot.
  void onBookMessage(/* ... */) {
    auto evOpt = _bookPool.acquire();
    if (!evOpt) return;
    auto& ev = *evOpt;
    ev->update.symbol = _symbol;
    ev->update.type = BookUpdateType::SNAPSHOT;
    // ... populate bids/asks
    ev->recvNs = nowNsMonotonic();
    _bookBus.publish(std::move(ev));
  }

  SymbolId _symbol;
  TradeBus& _tradeBus;
  BookUpdateBus& _bookBus;
  pool::Pool<BookUpdateEvent, 64> _bookPool;
  std::atomic<bool> _running{false};  // toggled by start()/stop(), polled by run()
  std::thread _thread;
};

}  // namespace myexchange
See Also¶
- Architecture Overview — How connectors fit
- First Strategy Tutorial — Consuming events