diff --git a/lib/kalshi/websocket.js b/lib/kalshi/websocket.js index 9daef7c..8488a1b 100644 --- a/lib/kalshi/websocket.js +++ b/lib/kalshi/websocket.js @@ -4,6 +4,19 @@ import { EventEmitter } from 'events'; const WS_URL = 'wss://api.elections.kalshi.com/trade-api/ws/v2'; +function unwrapPacket(packet) { + if (!packet || typeof packet !== 'object') return { type: null, payload: null }; + + const type = packet.type || null; + const meta = { id: packet.id, sid: packet.sid, seq: packet.seq, type }; + + if (packet.msg && typeof packet.msg === 'object') { + return { type, payload: { ...meta, ...packet.msg } }; + } + + return { type, payload: { ...meta, ...packet } }; +} + export class KalshiWS extends EventEmitter { constructor() { super(); @@ -12,11 +25,14 @@ export class KalshiWS extends EventEmitter { this.alive = false; this.reconnectTimer = null; this.pingInterval = null; + this.shouldReconnect = true; } connect() { if (this.ws) this.disconnect(); + this.shouldReconnect = true; + const path = '/trade-api/ws/v2'; const headers = signRequest('GET', path); @@ -26,27 +42,29 @@ export class KalshiWS extends EventEmitter { console.log('[WS] Connected to Kalshi'); this.alive = true; this._startPing(); - // Resubscribe to any tickers we were watching - for (const ticker of this.subscribedTickers) { - this._sendSubscribe(ticker); - } + + for (const ticker of this.subscribedTickers) this._sendSubscribe(ticker); this.emit('connected'); }); this.ws.on('message', (raw) => { try { - const msg = JSON.parse(raw.toString()); - this._handleMessage(msg); + const packet = JSON.parse(raw.toString()); + this._handleMessage(packet); } catch (e) { console.error('[WS] Parse error:', e.message); } }); this.ws.on('close', (code) => { - console.log(`[WS] Disconnected (code: ${code}). Reconnecting in 3s...`); + console.log(`[WS] Disconnected (code: ${code}).`); this.alive = false; this._stopPing(); - this._scheduleReconnect(); + + if (this.shouldReconnect) { + console.log('[WS] Reconnecting in 3s...'); + this._scheduleReconnect(); + } }); this.ws.on('error', (err) => { @@ -55,13 +73,17 @@ export class KalshiWS extends EventEmitter { } subscribeTicker(ticker) { + if (!ticker) return; this.subscribedTickers.add(ticker); if (this.alive) this._sendSubscribe(ticker); } unsubscribeTicker(ticker) { + if (!ticker) return; + this.subscribedTickers.delete(ticker); - if (this.alive) { + + if (this.alive && this.ws?.readyState === WebSocket.OPEN) { this.ws.send(JSON.stringify({ id: Date.now(), cmd: 'unsubscribe', @@ -71,17 +93,22 @@ export class KalshiWS extends EventEmitter { } disconnect() { + this.shouldReconnect = false; this._stopPing(); clearTimeout(this.reconnectTimer); + if (this.ws) { this.ws.removeAllListeners(); this.ws.close(); this.ws = null; } + this.alive = false; } _sendSubscribe(ticker) { + if (!this.ws || this.ws.readyState !== WebSocket.OPEN) return; + this.ws.send(JSON.stringify({ id: Date.now(), cmd: 'subscribe', @@ -89,27 +116,38 @@ export class KalshiWS extends EventEmitter { })); } - _handleMessage(msg) { - const { type } = msg; + _handleMessage(packet) { + const { type, payload } = unwrapPacket(packet); + if (!type || !payload) return; + if (type === 'orderbook_snapshot' || type === 'orderbook_delta') { - this.emit('orderbook', msg); - } else if (type === 'ticker') { - this.emit('ticker', msg); - } else if (type === 'subscribed') { - console.log(`[WS] Subscribed to: ${msg.msg?.channels || 'unknown'}`); + this.emit('orderbook', payload); + return; + } + + if (type === 'ticker') { + this.emit('ticker', payload); + return; + } + + if (type === 'subscribed') { + const channel = payload.channel || payload.channels || 'unknown'; + const text = Array.isArray(channel) ? channel.join(', ') : channel; + console.log(`[WS] Subscribed to: ${text}`); } } _startPing() { + this._stopPing(); + this.pingInterval = setInterval(() => { - if (this.alive && this.ws?.readyState === WebSocket.OPEN) { - this.ws.ping(); - } + if (this.alive && this.ws?.readyState === WebSocket.OPEN) this.ws.ping(); }, 15000); } _stopPing() { clearInterval(this.pingInterval); + this.pingInterval = null; } _scheduleReconnect() {