import WebSocket from 'ws'; import { signRequest } from './auth.js'; import { EventEmitter } from 'events'; const WS_URL = 'wss://api.elections.kalshi.com/trade-api/ws/v2'; export class KalshiWS extends EventEmitter { constructor() { super(); this.ws = null; this.subscribedTickers = new Set(); this.alive = false; this.reconnectTimer = null; this.pingInterval = null; } connect() { if (this.ws) this.disconnect(); const path = '/trade-api/ws/v2'; const headers = signRequest('GET', path); this.ws = new WebSocket(WS_URL, { headers }); this.ws.on('open', () => { console.log('[WS] Connected to Kalshi'); this.alive = true; this._startPing(); // Resubscribe to any tickers we were watching for (const ticker of this.subscribedTickers) { this._sendSubscribe(ticker); } this.emit('connected'); }); this.ws.on('message', (raw) => { try { const msg = JSON.parse(raw.toString()); this._handleMessage(msg); } catch (e) { console.error('[WS] Parse error:', e.message); } }); this.ws.on('close', (code) => { console.log(`[WS] Disconnected (code: ${code}). Reconnecting in 3s...`); this.alive = false; this._stopPing(); this._scheduleReconnect(); }); this.ws.on('error', (err) => { console.error('[WS] Error:', err.message); }); } subscribeTicker(ticker) { this.subscribedTickers.add(ticker); if (this.alive) this._sendSubscribe(ticker); } unsubscribeTicker(ticker) { this.subscribedTickers.delete(ticker); if (this.alive) { this.ws.send(JSON.stringify({ id: Date.now(), cmd: 'unsubscribe', params: { channels: ['orderbook_delta', 'ticker'], market_tickers: [ticker] } })); } } disconnect() { this._stopPing(); clearTimeout(this.reconnectTimer); if (this.ws) { this.ws.removeAllListeners(); this.ws.close(); this.ws = null; } this.alive = false; } _sendSubscribe(ticker) { this.ws.send(JSON.stringify({ id: Date.now(), cmd: 'subscribe', params: { channels: ['orderbook_delta', 'ticker'], market_tickers: [ticker] } })); } _handleMessage(msg) { const { type } = msg; if (type === 'orderbook_snapshot' || type === 'orderbook_delta') { this.emit('orderbook', msg); } else if (type === 'ticker') { this.emit('ticker', msg); } else if (type === 'subscribed') { console.log(`[WS] Subscribed to: ${msg.msg?.channels || 'unknown'}`); } } _startPing() { this.pingInterval = setInterval(() => { if (this.alive && this.ws?.readyState === WebSocket.OPEN) { this.ws.ping(); } }, 15000); } _stopPing() { clearInterval(this.pingInterval); } _scheduleReconnect() { clearTimeout(this.reconnectTimer); this.reconnectTimer = setTimeout(() => this.connect(), 3000); } }