ProjectX Broker Architecture
Adapter internals · SignalR real-time fills · HTTP poll fallback · circuit breaker · account sync · Common Architecture →
1
ProjectX Adapter Internal Structure
classDiagram
class Adapter {
+Config cfg
+Logger logger
-RWMutex tokenMu
-string token
-AtomicBool connected
-Once stopOnce
-Channel stopCh
-Pool bufPool
-HttpClient httpClient
+Connect(ctx) error
+PlaceOrder(ctx, order) Execution
+Subscribe(ctx, eventsCh) error
+IsConnected() bool
+Disconnect() error
-authenticate(ctx) error
-pollFallback(ctx, eventsCh, accountID) error
-fetchFilledOrders(ctx, accountID, startTime) orderItem
}
class Config {
+string BaseURL
+string HubURL
+string UserName
+string APIKey
+string AccountID
}
class fillReceiver {
+SignalRHub hub
-Channel events
-int accountID
-Logger logger
-Channel disconnectCh
+GatewayUserOrder(msg) void
+GatewayLogout(reason) void
}
class placeOrderReq {
+int accountId
+string contractId
+int orderType
+int side
+int size
+float64 limitPrice
+float64 stopPrice
+string customTag
}
class signalROrderData {
+int id
+int accountId
+string contractId
+int status
+int side
+int size
+int fillVolume
+float64 filledPrice
}
Adapter --> Config : uses
Adapter ..> fillReceiver : creates in Subscribe
Adapter ..> placeOrderReq : builds in PlaceOrder
fillReceiver --> signalROrderData : parses GatewayUserOrder
2
ProjectX Adapter Lifecycle
stateDiagram-v2
[*] --> Created : New(cfg, logger)
Created --> Connecting : Connect(ctx)
Connecting --> Authenticated : POST /Auth/loginKey → Bearer token\nstored in tokenMu-guarded field
Authenticated --> Connected : connected.Store(true)
Connected --> Subscribing : Subscribe(ctx, events)
Subscribing --> SignalRActive : NewHTTPConnection() + NewClient() OK\nSubscribeOrders(accountID) sent
Subscribing --> PollFallback : NewHTTPConnection() FAILS\nor NewClient() FAILS
SignalRActive --> PollFallback : GatewayLogout received\nor WaitForState(ClientClosed)
SignalRActive --> Disconnected : ctx.Done()
PollFallback --> Disconnected : ctx.Done()
Connected --> Disconnected : Disconnect()
Disconnected --> [*]
note right of Authenticated
token: RWMutex-protected
bufPool: sync.Pool of *bytes.Buffer
httpClient: persistent TCP, 10 conns/host
end note
note right of PollFallback
Seed seen map: fetch 24h window
Retry seed up to 5× on error
Ticker: 100ms
Heartbeat log: 30s
Runtime window: last 2 min only
end note
3
ProjectX Authentication Flow
sequenceDiagram
box rgb(234,243,222) Go Adapter
participant A as Adapter.authenticate()
participant P as sync.Pool bufPool
end
box rgb(238,237,254) ProjectX API
participant PX as POST /Auth/loginKey
end
A->>P: Get() *bytes.Buffer
A->>A: json.Encode({userName, apiKey})
A->>PX: POST https://api.thefuturesdesk.projectx.com/api/Auth/loginKey\nContent-Type: application/json\n{userName, apiKey}
P-->>A: Put() buffer back
PX-->>A: 200 {success:true, token:"eyJ..."}
A->>A: tokenMu.Lock()\ntoken = response.Token\ntokenMu.Unlock()
Note over A,PX: On 401 during PlaceOrder — re-authenticate once inline
A->>PX: POST /Auth/loginKey (retry)
PX-->>A: new token
A->>A: update token under RWMutex
A->>A: retry original request with new Bearer
4
Fill Detection — Dual-Path Strategy
flowchart TD
classDef primary fill:#EAF3DE,stroke:#3B6D11,color:#27500A
classDef fallback fill:#FFF3CD,stroke:#856404,color:#533F03
classDef event fill:#EEEDFE,stroke:#534AB7,color:#3C3489
classDef filter fill:#E6F1FB,stroke:#185FA5,color:#0C447C
classDef discard fill:#F8D7DA,stroke:#721C24,color:#491217
START(["Subscribe(ctx, events chan) called"])
subgraph PRIMARY["PRIMARY PATH — SignalR WebSocket"]
P1["HubURL + ?access_token=token\nsignalr.NewHTTPConnection(ctx, url)"]:::primary
P2["signalr.NewClient(ctx,\n WithConnection(conn),\n WithReceiver(&fillReceiver{}))"]:::primary
P3["client.Start()\nWaitForState(ClientConnected)"]:::primary
P4["client.Send('SubscribeOrders', accountID)"]:::primary
P5{"Select:\nctx.Done |\nreceiver.disconnectCh |\nWaitForState(ClientClosed)"}:::primary
P6["GatewayUserOrder(msg string) callback\nJSON unmarshal signalROrderEvent\n{action, data:{id, accountId, status, side, size, filledPrice}}"]:::primary
PF{"status==2 AND\naccountID matches?"}:::filter
end
subgraph FALLBACK["FALLBACK PATH — 100ms HTTP Poll"]
F1["Seed seen map:\nfetchFilledOrders(ctx, accountID, now-24h)\nRetry up to 5× on error"]:::fallback
F2["ticker = 100ms\nheartbeat = 30s"]:::fallback
F3["fetchFilledOrders(ctx, accountID, pollStart-2min)\nPOST /Order/search {accountId, startTimestamp}"]:::fallback
F4{"seen[orderID]?"}:::filter
F5{"creationTimestamp\n< pollStart?"}:::filter
F6["emit BrokerEvent{type:fill,\n payload:Fill{OrderID,Symbol,\n Side,Qty,Price}}"]:::event
end
START --> P1
P1 -->|handshake OK| P2 --> P3 --> P4 --> P5
P1 -->|handshake FAIL| F1
P2 -->|client error| F1
P5 -->|ctx.Done| DONE(["✓ clean shutdown"])
P5 -->|disconnectCh closed\n= GatewayLogout| F1
P5 -->|ClientClosed| F1
P5 -.->|"Hub pushes\nGatewayUserOrder"| P6
P6 --> PF
PF -->|yes| F6
PF -->|no| DISCARD1(["⊘ wrong account\nor not a fill"]):::discard
F1 --> F2 --> F3 --> F4
F4 -->|already seen| F3
F4 -->|new| F5
F5 -->|before service start| DISCARD2(["⊘ replay guard\nskip historical fill"]):::discard
F5 -->|after service start| F6
F6 -->|"events <- BrokerEvent"| KAFKA(["Ingestion Service\nnormalize() → TradeSignal\n→ trade.signals Kafka"]):::event
5
PlaceOrder Flow (Follower Account Execution)
sequenceDiagram
box rgb(234,243,222) Router Fan-Out
participant FO as fanout goroutine\n(2.5s deadline)
participant CB as Circuit Breaker\n(gobreaker)
end
box rgb(234,243,222) ProjectX Adapter
participant A as Adapter.PlaceOrder()
participant BP as bufPool
end
box rgb(238,237,254) ProjectX REST
participant PX as POST /Order/place
end
FO->>CB: cb.Execute(func() { adapter.PlaceOrder(ctx, order) })
CB->>A: PlaceOrder(ctx, order)
Note over A: order.ID = uuid.NewString()\n(unique customTag — required per account)
A->>A: tokenMu.RLock() → read token\n tokenMu.RUnlock()
A->>A: map Side string → int (BUY=1, SELL=0)\nmap Type string → int (MARKET=2, LIMIT=1, STOP=3)
A->>BP: Get() *bytes.Buffer
A->>A: json.Encode(placeOrderReq{\n accountId, contractId,\n type, side, size,\n customTag: order.ID })
A->>BP: Put() buffer back
A->>PX: POST /api/Order/place\nAuthorization: Bearer {token}\nContent-Type: application/json
alt HTTP 200
PX-->>A: {orderId: 12345, success: true}
A-->>CB: Execution{BrokerOrderID:"12345",\n Status:"WORKING", LatencyMs:Xms}
CB-->>FO: Execution (circuit stays CLOSED)
else HTTP 401 — token expired
PX-->>A: 401 Unauthorized
A->>A: authenticate(ctx) — re-auth once
A->>PX: retry POST /Order/place with new token
PX-->>A: 200 {orderId, success}
A-->>CB: Execution
CB-->>FO: Execution
else HTTP 4xx/5xx or success=false
PX-->>A: {success:false, errorCode:N}
A-->>CB: Execution{Status:"REJECTED", ErrorMsg:"errorCode=N"}
CB-->>FO: Execution (counts as failure toward OPEN threshold)
else timeout / network error
A-->>CB: error
CB->>CB: ConsecutiveFailures++\nif ≥5 → state=OPEN
CB-->>FO: gobreaker.ErrOpenState
FO->>FO: Execution{Status:"REJECTED"}
end
6
ProjectX Account Sync Flow (Frontend → DB)
flowchart LR
classDef ui fill:#E6F1FB,stroke:#185FA5,color:#0C447C
classDef api fill:#E6F1FB,stroke:#185FA5,color:#0C447C
classDef px fill:#EEEDFE,stroke:#534AB7,color:#3C3489
classDef go fill:#EAF3DE,stroke:#3B6D11,color:#27500A
classDef db fill:#FCEBEB,stroke:#A32D2D,color:#791F1F
U["👤 User\nclicks + Add Connection\nselects ProjectX"]:::ui
M["AddConnectionModal\nusername + API key\n(no password field)"]:::ui
R["POST /api/projectx/auth\nNext.js server route\n(server-side, key never\nhits browser)"]:::api
subgraph PXApi["ProjectX API"]
A1["POST /Auth/loginKey\n{userName, apiKey}\n→ Bearer token"]:::px
A2["POST /Account/search\n{Authorization: Bearer}\n→ [{id, name, balance,...}, ...]"]:::px
end
subgraph Import["Parallel Import"]
C1["Promise.all(accounts.map(\n a => POST /api/accounts {\n name: a.name,\n broker: 'projectx',\n account_ref: a.id.toString(),\n is_master: false,\n is_enabled: true\n }\n))"]:::go
end
DB[("accounts table\ncredentials table\nAPI key encrypted\nwith pgcrypto")]:::db
P["Reload accounts list\nin browser\n(revalidatePath)"]:::ui
U --> M --> R --> A1 --> A2 --> Import --> DB --> P
7
ProjectX SignalR Protocol Detail
sequenceDiagram
box rgb(234,243,222) Go Adapter
participant C as signalr.Client
participant R as fillReceiver hub
end
box rgb(238,237,254) ProjectX SignalR Hub
participant H as wss://rtc.thefuturesdesk.projectx.com/hubs/user
end
C->>H: GET /hubs/user?access_token={Bearer}\nUpgrade: websocket
H-->>C: 101 Switching Protocols
C->>H: {"protocol":"json","version":1}
H-->>C: {"type":1} (handshake ack)
C->>H: {"type":1,"target":"SubscribeOrders",\n"arguments":[{accountID}]}
H-->>C: {"type":1,"target":"GatewayUserOrder",\n"arguments":['{"action":1,"data":{...}}']}
Note over C,H: Every fill on the account triggers a push
H-->>R: GatewayUserOrder(msg string)\nmsg = '{"action":1,"data":{"id":99,"accountId":123,\n"contractId":"CON.F.US.EP.H25","status":2,\n"side":1,"size":1,"fillVolume":1,"filledPrice":5432.50}}'
R->>R: json.Unmarshal → signalROrderEvent\nCheck: status==2 AND accountId matches
R->>R: events <- BrokerEvent{type:"fill",\n payload: Fill{...}}
alt Session displaced (trader logs in via browser)
H-->>R: GatewayUserOrder or GatewayLogout("session displaced")
R->>R: close(disconnectCh)
C->>C: select: disconnectCh closed\n→ switch to pollFallback()
end
alt Connection drop
H--xC: WS connection closed
C->>C: WaitForState(ClientClosed) fires\n→ switch to pollFallback()
end
8
HTTP Poll Fallback — Deduplication Logic
flowchart TD
classDef step fill:#E6F1FB,stroke:#185FA5,color:#0C447C
classDef store fill:#FCEBEB,stroke:#A32D2D,color:#791F1F
classDef ok fill:#EAF3DE,stroke:#3B6D11,color:#27500A
classDef discard fill:#F8D7DA,stroke:#721C24,color:#491217
classDef warn fill:#FFF3CD,stroke:#856404,color:#533F03
START(["pollFallback() entered\npollStart = time.Now()"])
SEED["Seed Phase\nfetchFilledOrders(ctx, accountID, pollStart - 24h)\nMark ALL returned IDs in seen map\nRetry up to 5× with linear backoff"]:::store
LOOP["Tick every 100ms\nfetchFilledOrders(ctx, accountID, pollStart - 2min)\n(small window — seen map covers older orders)"]:::step
CHECK1{"seen[order.ID]?"}
MARK["seen[order.ID] = true"]:::store
CHECK2{"order.CreationTimestamp\n< pollStart?"}
EMIT["emit BrokerEvent{type:fill}\nIngestion normalizes → TradeSignal\nPublish to trade.signals"]:::ok
HBEAT["Heartbeat every 30s\nlog: poll alive, seen_orders count"]:::warn
START --> SEED --> LOOP
LOOP --> CHECK1
CHECK1 -->|already seen| LOOP
CHECK1 -->|new| MARK --> CHECK2
CHECK2 -->|before service start\nreplay guard| DISCARD(["⊘ skip — historical fill\nwould double-copy"]):::discard
CHECK2 -->|after service start| EMIT --> LOOP
LOOP -.->|every 30s| HBEAT
LOOP -->|ctx.Done| END(["✓ clean shutdown"])
9
Circuit Breaker State Machine (per broker)
stateDiagram-v2
[*] --> CLOSED
CLOSED --> CLOSED : PlaceOrder success\nConsecutiveFailures reset to 0
CLOSED --> OPEN : ConsecutiveFailures ≥ 5\n(any error or timeout)
OPEN --> HALF_OPEN : 30 seconds elapsed
HALF_OPEN --> CLOSED : 1 probe request succeeds
HALF_OPEN --> OPEN : probe request fails
note right of CLOSED
MaxRequests: unlimited
Interval: 30s (count reset)
end note
note right of OPEN
All calls return immediately with
gobreaker.ErrOpenState
Execution.Status = "REJECTED"
end note
note right of HALF_OPEN
MaxRequests: 1 (only 1 probe allowed)
Timeout: 30s
end note
10
ProjectX API Endpoints Reference
| Endpoint | Method | Auth | Purpose |
|---|---|---|---|
/api/Auth/loginKey |
POST | none | Obtain Bearer token from username + API key |
/api/Account/search |
POST | Bearer | List all accounts for the authenticated user |
/api/Order/place |
POST | Bearer | Submit a market/limit/stop order |
/api/Order/search |
POST | Bearer | Query filled orders (poll fallback) |
wss://rtc…/hubs/user?access_token= |
WS | query param | SignalR hub for real-time order events |
SignalR Methods (sent by adapter)
SubscribeOrders(accountID int)subscribe to order events for one account
SignalR Callbacks (received)
GatewayUserOrder(msg string)order state change push; status 2 = filledGatewayLogout(reason string)session displaced; triggers fallback
Order Type Mapping
| TradeNova | ProjectX int |
|---|---|
| MARKET | 2 |
| LIMIT | 1 |
| STOP | 3 |
Side Mapping
| TradeNova | ProjectX int |
|---|---|
| BUY | 1 |
| SELL | 0 |
★
Color Legend
Blue
Browser / Next.js / Frontend
Green
Go backend services
Amber
Kafka topics & messaging
Red
Datastores (PostgreSQL, Redis)
Purple
ProjectX / external broker APIs
Gray
Observability / Monitoring
Yellow
Fallback / warning paths
Red (light)
Rejected / discarded paths