Fix: unwrap WS payloads + safer reconnect

This commit is contained in:
2026-03-15 15:06:16 -07:00
parent 1c57c60770
commit 3c48e2bd50

View File

@@ -4,6 +4,19 @@ import { EventEmitter } from 'events';
const WS_URL = 'wss://api.elections.kalshi.com/trade-api/ws/v2'; const WS_URL = 'wss://api.elections.kalshi.com/trade-api/ws/v2';
function unwrapPacket(packet) {
if (!packet || typeof packet !== 'object') return { type: null, payload: null };
const type = packet.type || null;
const meta = { id: packet.id, sid: packet.sid, seq: packet.seq, type };
if (packet.msg && typeof packet.msg === 'object') {
return { type, payload: { ...meta, ...packet.msg } };
}
return { type, payload: { ...meta, ...packet } };
}
export class KalshiWS extends EventEmitter { export class KalshiWS extends EventEmitter {
constructor() { constructor() {
super(); super();
@@ -12,11 +25,14 @@ export class KalshiWS extends EventEmitter {
this.alive = false; this.alive = false;
this.reconnectTimer = null; this.reconnectTimer = null;
this.pingInterval = null; this.pingInterval = null;
this.shouldReconnect = true;
} }
connect() { connect() {
if (this.ws) this.disconnect(); if (this.ws) this.disconnect();
this.shouldReconnect = true;
const path = '/trade-api/ws/v2'; const path = '/trade-api/ws/v2';
const headers = signRequest('GET', path); const headers = signRequest('GET', path);
@@ -26,27 +42,29 @@ export class KalshiWS extends EventEmitter {
console.log('[WS] Connected to Kalshi'); console.log('[WS] Connected to Kalshi');
this.alive = true; this.alive = true;
this._startPing(); this._startPing();
// Resubscribe to any tickers we were watching
for (const ticker of this.subscribedTickers) { for (const ticker of this.subscribedTickers) this._sendSubscribe(ticker);
this._sendSubscribe(ticker);
}
this.emit('connected'); this.emit('connected');
}); });
this.ws.on('message', (raw) => { this.ws.on('message', (raw) => {
try { try {
const msg = JSON.parse(raw.toString()); const packet = JSON.parse(raw.toString());
this._handleMessage(msg); this._handleMessage(packet);
} catch (e) { } catch (e) {
console.error('[WS] Parse error:', e.message); console.error('[WS] Parse error:', e.message);
} }
}); });
this.ws.on('close', (code) => { this.ws.on('close', (code) => {
console.log(`[WS] Disconnected (code: ${code}). Reconnecting in 3s...`); console.log(`[WS] Disconnected (code: ${code}).`);
this.alive = false; this.alive = false;
this._stopPing(); this._stopPing();
if (this.shouldReconnect) {
console.log('[WS] Reconnecting in 3s...');
this._scheduleReconnect(); this._scheduleReconnect();
}
}); });
this.ws.on('error', (err) => { this.ws.on('error', (err) => {
@@ -55,13 +73,17 @@ export class KalshiWS extends EventEmitter {
} }
subscribeTicker(ticker) { subscribeTicker(ticker) {
if (!ticker) return;
this.subscribedTickers.add(ticker); this.subscribedTickers.add(ticker);
if (this.alive) this._sendSubscribe(ticker); if (this.alive) this._sendSubscribe(ticker);
} }
unsubscribeTicker(ticker) { unsubscribeTicker(ticker) {
if (!ticker) return;
this.subscribedTickers.delete(ticker); this.subscribedTickers.delete(ticker);
if (this.alive) {
if (this.alive && this.ws?.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify({ this.ws.send(JSON.stringify({
id: Date.now(), id: Date.now(),
cmd: 'unsubscribe', cmd: 'unsubscribe',
@@ -71,17 +93,22 @@ export class KalshiWS extends EventEmitter {
} }
disconnect() { disconnect() {
this.shouldReconnect = false;
this._stopPing(); this._stopPing();
clearTimeout(this.reconnectTimer); clearTimeout(this.reconnectTimer);
if (this.ws) { if (this.ws) {
this.ws.removeAllListeners(); this.ws.removeAllListeners();
this.ws.close(); this.ws.close();
this.ws = null; this.ws = null;
} }
this.alive = false; this.alive = false;
} }
_sendSubscribe(ticker) { _sendSubscribe(ticker) {
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) return;
this.ws.send(JSON.stringify({ this.ws.send(JSON.stringify({
id: Date.now(), id: Date.now(),
cmd: 'subscribe', cmd: 'subscribe',
@@ -89,27 +116,38 @@ export class KalshiWS extends EventEmitter {
})); }));
} }
_handleMessage(msg) { _handleMessage(packet) {
const { type } = msg; const { type, payload } = unwrapPacket(packet);
if (!type || !payload) return;
if (type === 'orderbook_snapshot' || type === 'orderbook_delta') { if (type === 'orderbook_snapshot' || type === 'orderbook_delta') {
this.emit('orderbook', msg); this.emit('orderbook', payload);
} else if (type === 'ticker') { return;
this.emit('ticker', msg); }
} else if (type === 'subscribed') {
console.log(`[WS] Subscribed to: ${msg.msg?.channels || 'unknown'}`); if (type === 'ticker') {
this.emit('ticker', payload);
return;
}
if (type === 'subscribed') {
const channel = payload.channel || payload.channels || 'unknown';
const text = Array.isArray(channel) ? channel.join(', ') : channel;
console.log(`[WS] Subscribed to: ${text}`);
} }
} }
_startPing() { _startPing() {
this._stopPing();
this.pingInterval = setInterval(() => { this.pingInterval = setInterval(() => {
if (this.alive && this.ws?.readyState === WebSocket.OPEN) { if (this.alive && this.ws?.readyState === WebSocket.OPEN) this.ws.ping();
this.ws.ping();
}
}, 15000); }, 15000);
} }
_stopPing() { _stopPing() {
clearInterval(this.pingInterval); clearInterval(this.pingInterval);
this.pingInterval = null;
} }
_scheduleReconnect() { _scheduleReconnect() {