diff --git a/lib/kalshi/websocket.js b/lib/kalshi/websocket.js index 8488a1b..97c47f5 100644 --- a/lib/kalshi/websocket.js +++ b/lib/kalshi/websocket.js @@ -6,14 +6,11 @@ const WS_URL = 'wss://api.elections.kalshi.com/trade-api/ws/v2'; function unwrapPacket(packet) { if (!packet || typeof packet !== 'object') return { type: null, payload: null }; - const type = packet.type || null; const meta = { id: packet.id, sid: packet.sid, seq: packet.seq, type }; - if (packet.msg && typeof packet.msg === 'object') { return { type, payload: { ...meta, ...packet.msg } }; } - return { type, payload: { ...meta, ...packet } }; } @@ -26,11 +23,11 @@ export class KalshiWS extends EventEmitter { this.reconnectTimer = null; this.pingInterval = null; this.shouldReconnect = true; + this._cmdId = 1; } connect() { if (this.ws) this.disconnect(); - this.shouldReconnect = true; const path = '/trade-api/ws/v2'; @@ -60,7 +57,6 @@ export class KalshiWS extends EventEmitter { console.log(`[WS] Disconnected (code: ${code}).`); this.alive = false; this._stopPing(); - if (this.shouldReconnect) { console.log('[WS] Reconnecting in 3s...'); this._scheduleReconnect(); @@ -80,14 +76,17 @@ export class KalshiWS extends EventEmitter { unsubscribeTicker(ticker) { if (!ticker) return; - this.subscribedTickers.delete(ticker); - if (this.alive && this.ws?.readyState === WebSocket.OPEN) { this.ws.send(JSON.stringify({ - id: Date.now(), + id: this._cmdId++, cmd: 'unsubscribe', - params: { channels: ['orderbook_delta', 'ticker'], market_tickers: [ticker] } + params: { channels: ['orderbook_delta'], market_ticker: ticker } + })); + this.ws.send(JSON.stringify({ + id: this._cmdId++, + cmd: 'unsubscribe', + params: { channels: ['ticker'], market_ticker: ticker } })); } } @@ -96,23 +95,29 @@ export class KalshiWS extends EventEmitter { this.shouldReconnect = false; this._stopPing(); clearTimeout(this.reconnectTimer); - if (this.ws) { this.ws.removeAllListeners(); this.ws.close(); this.ws = null; } - this.alive = false; } _sendSubscribe(ticker) { if (!this.ws || this.ws.readyState !== WebSocket.OPEN) return; + // Subscribe orderbook_delta (private channel) with market_ticker this.ws.send(JSON.stringify({ - id: Date.now(), + id: this._cmdId++, cmd: 'subscribe', - params: { channels: ['orderbook_delta', 'ticker'], market_tickers: [ticker] } + params: { channels: ['orderbook_delta'], market_ticker: ticker } + })); + + // Subscribe ticker (public channel) with market_ticker + this.ws.send(JSON.stringify({ + id: this._cmdId++, + cmd: 'subscribe', + params: { channels: ['ticker'], market_ticker: ticker } })); } @@ -130,16 +135,17 @@ export class KalshiWS extends EventEmitter { return; } - if (type === 'subscribed') { - const channel = payload.channel || payload.channels || 'unknown'; - const text = Array.isArray(channel) ? channel.join(', ') : channel; - console.log(`[WS] Subscribed to: ${text}`); + if (type === 'subscribed' || type === 'ok') { + console.log(`[WS] ${type}:`, JSON.stringify(payload).slice(0, 200)); + } + + if (type === 'error') { + console.error('[WS] Server error:', JSON.stringify(payload).slice(0, 300)); } } _startPing() { this._stopPing(); - this.pingInterval = setInterval(() => { if (this.alive && this.ws?.readyState === WebSocket.OPEN) this.ws.ping(); }, 15000);