diff --git a/lib/kalshi/websocket.js b/lib/kalshi/websocket.js new file mode 100644 index 0000000..9daef7c --- /dev/null +++ b/lib/kalshi/websocket.js @@ -0,0 +1,119 @@ +import WebSocket from 'ws'; +import { signRequest } from './auth.js'; +import { EventEmitter } from 'events'; + +const WS_URL = 'wss://api.elections.kalshi.com/trade-api/ws/v2'; + +export class KalshiWS extends EventEmitter { + constructor() { + super(); + this.ws = null; + this.subscribedTickers = new Set(); + this.alive = false; + this.reconnectTimer = null; + this.pingInterval = null; + } + + connect() { + if (this.ws) this.disconnect(); + + const path = '/trade-api/ws/v2'; + const headers = signRequest('GET', path); + + this.ws = new WebSocket(WS_URL, { headers }); + + this.ws.on('open', () => { + console.log('[WS] Connected to Kalshi'); + this.alive = true; + this._startPing(); + // Resubscribe to any tickers we were watching + for (const ticker of this.subscribedTickers) { + this._sendSubscribe(ticker); + } + this.emit('connected'); + }); + + this.ws.on('message', (raw) => { + try { + const msg = JSON.parse(raw.toString()); + this._handleMessage(msg); + } catch (e) { + console.error('[WS] Parse error:', e.message); + } + }); + + this.ws.on('close', (code) => { + console.log(`[WS] Disconnected (code: ${code}). Reconnecting in 3s...`); + this.alive = false; + this._stopPing(); + this._scheduleReconnect(); + }); + + this.ws.on('error', (err) => { + console.error('[WS] Error:', err.message); + }); + } + + subscribeTicker(ticker) { + this.subscribedTickers.add(ticker); + if (this.alive) this._sendSubscribe(ticker); + } + + unsubscribeTicker(ticker) { + this.subscribedTickers.delete(ticker); + if (this.alive) { + this.ws.send(JSON.stringify({ + id: Date.now(), + cmd: 'unsubscribe', + params: { channels: ['orderbook_delta', 'ticker'], market_tickers: [ticker] } + })); + } + } + + disconnect() { + this._stopPing(); + clearTimeout(this.reconnectTimer); + if (this.ws) { + this.ws.removeAllListeners(); + this.ws.close(); + this.ws = null; + } + this.alive = false; + } + + _sendSubscribe(ticker) { + this.ws.send(JSON.stringify({ + id: Date.now(), + cmd: 'subscribe', + params: { channels: ['orderbook_delta', 'ticker'], market_tickers: [ticker] } + })); + } + + _handleMessage(msg) { + const { type } = msg; + if (type === 'orderbook_snapshot' || type === 'orderbook_delta') { + this.emit('orderbook', msg); + } else if (type === 'ticker') { + this.emit('ticker', msg); + } else if (type === 'subscribed') { + console.log(`[WS] Subscribed to: ${msg.msg?.channels || 'unknown'}`); + } + } + + _startPing() { + this.pingInterval = setInterval(() => { + if (this.alive && this.ws?.readyState === WebSocket.OPEN) { + this.ws.ping(); + } + }, 15000); + } + + _stopPing() { + clearInterval(this.pingInterval); + } + + _scheduleReconnect() { + clearTimeout(this.reconnectTimer); + this.reconnectTimer = setTimeout(() => this.connect(), 3000); + } +}