Fix: Parse new Kalshi WS dollar-string format

This commit is contained in:
2026-03-15 15:47:45 -07:00
parent aedb6aeda5
commit c2f878b23d

View File

@@ -6,14 +6,11 @@ const WS_URL = 'wss://api.elections.kalshi.com/trade-api/ws/v2';
function unwrapPacket(packet) { function unwrapPacket(packet) {
if (!packet || typeof packet !== 'object') return { type: null, payload: null }; if (!packet || typeof packet !== 'object') return { type: null, payload: null };
const type = packet.type || null; const type = packet.type || null;
const meta = { id: packet.id, sid: packet.sid, seq: packet.seq, type }; const meta = { id: packet.id, sid: packet.sid, seq: packet.seq, type };
if (packet.msg && typeof packet.msg === 'object') { if (packet.msg && typeof packet.msg === 'object') {
return { type, payload: { ...meta, ...packet.msg } }; return { type, payload: { ...meta, ...packet.msg } };
} }
return { type, payload: { ...meta, ...packet } }; return { type, payload: { ...meta, ...packet } };
} }
@@ -26,11 +23,11 @@ export class KalshiWS extends EventEmitter {
this.reconnectTimer = null; this.reconnectTimer = null;
this.pingInterval = null; this.pingInterval = null;
this.shouldReconnect = true; this.shouldReconnect = true;
this._cmdId = 1;
} }
connect() { connect() {
if (this.ws) this.disconnect(); if (this.ws) this.disconnect();
this.shouldReconnect = true; this.shouldReconnect = true;
const path = '/trade-api/ws/v2'; const path = '/trade-api/ws/v2';
@@ -60,7 +57,6 @@ export class KalshiWS extends EventEmitter {
console.log(`[WS] Disconnected (code: ${code}).`); console.log(`[WS] Disconnected (code: ${code}).`);
this.alive = false; this.alive = false;
this._stopPing(); this._stopPing();
if (this.shouldReconnect) { if (this.shouldReconnect) {
console.log('[WS] Reconnecting in 3s...'); console.log('[WS] Reconnecting in 3s...');
this._scheduleReconnect(); this._scheduleReconnect();
@@ -80,14 +76,17 @@ export class KalshiWS extends EventEmitter {
unsubscribeTicker(ticker) { unsubscribeTicker(ticker) {
if (!ticker) return; if (!ticker) return;
this.subscribedTickers.delete(ticker); this.subscribedTickers.delete(ticker);
if (this.alive && this.ws?.readyState === WebSocket.OPEN) { if (this.alive && this.ws?.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify({ this.ws.send(JSON.stringify({
id: Date.now(), id: this._cmdId++,
cmd: 'unsubscribe', cmd: 'unsubscribe',
params: { channels: ['orderbook_delta', 'ticker'], market_tickers: [ticker] } params: { channels: ['orderbook_delta'], market_ticker: ticker }
}));
this.ws.send(JSON.stringify({
id: this._cmdId++,
cmd: 'unsubscribe',
params: { channels: ['ticker'], market_ticker: ticker }
})); }));
} }
} }
@@ -96,23 +95,29 @@ export class KalshiWS extends EventEmitter {
this.shouldReconnect = false; this.shouldReconnect = false;
this._stopPing(); this._stopPing();
clearTimeout(this.reconnectTimer); clearTimeout(this.reconnectTimer);
if (this.ws) { if (this.ws) {
this.ws.removeAllListeners(); this.ws.removeAllListeners();
this.ws.close(); this.ws.close();
this.ws = null; this.ws = null;
} }
this.alive = false; this.alive = false;
} }
_sendSubscribe(ticker) { _sendSubscribe(ticker) {
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) return; if (!this.ws || this.ws.readyState !== WebSocket.OPEN) return;
// Subscribe orderbook_delta (private channel) with market_ticker
this.ws.send(JSON.stringify({ this.ws.send(JSON.stringify({
id: Date.now(), id: this._cmdId++,
cmd: 'subscribe', cmd: 'subscribe',
params: { channels: ['orderbook_delta', 'ticker'], market_tickers: [ticker] } params: { channels: ['orderbook_delta'], market_ticker: ticker }
}));
// Subscribe ticker (public channel) with market_ticker
this.ws.send(JSON.stringify({
id: this._cmdId++,
cmd: 'subscribe',
params: { channels: ['ticker'], market_ticker: ticker }
})); }));
} }
@@ -130,16 +135,17 @@ export class KalshiWS extends EventEmitter {
return; return;
} }
if (type === 'subscribed') { if (type === 'subscribed' || type === 'ok') {
const channel = payload.channel || payload.channels || 'unknown'; console.log(`[WS] ${type}:`, JSON.stringify(payload).slice(0, 200));
const text = Array.isArray(channel) ? channel.join(', ') : channel; }
console.log(`[WS] Subscribed to: ${text}`);
if (type === 'error') {
console.error('[WS] Server error:', JSON.stringify(payload).slice(0, 300));
} }
} }
_startPing() { _startPing() {
this._stopPing(); this._stopPing();
this.pingInterval = setInterval(() => { this.pingInterval = setInterval(() => {
if (this.alive && this.ws?.readyState === WebSocket.OPEN) this.ws.ping(); if (this.alive && this.ws?.readyState === WebSocket.OPEN) this.ws.ping();
}, 15000); }, 15000);