Files
KalBot/lib/kalshi/websocket.js

164 lines
4.3 KiB
JavaScript

import WebSocket from 'ws';
import { signRequest } from './auth.js';
import { EventEmitter } from 'events';
const WS_URL = 'wss://api.elections.kalshi.com/trade-api/ws/v2';
function unwrapPacket(packet) {
if (!packet || typeof packet !== 'object') return { type: null, payload: null };
const type = packet.type || null;
const meta = { id: packet.id, sid: packet.sid, seq: packet.seq, type };
if (packet.msg && typeof packet.msg === 'object') {
return { type, payload: { ...meta, ...packet.msg } };
}
return { type, payload: { ...meta, ...packet } };
}
export class KalshiWS extends EventEmitter {
constructor() {
super();
this.ws = null;
this.subscribedTickers = new Set();
this.alive = false;
this.reconnectTimer = null;
this.pingInterval = null;
this.shouldReconnect = true;
this._cmdId = 1;
}
connect() {
if (this.ws) this.disconnect();
this.shouldReconnect = true;
const path = '/trade-api/ws/v2';
const headers = signRequest('GET', path);
this.ws = new WebSocket(WS_URL, { headers });
this.ws.on('open', () => {
console.log('[WS] Connected to Kalshi');
this.alive = true;
this._startPing();
for (const ticker of this.subscribedTickers) this._sendSubscribe(ticker);
this.emit('connected');
});
this.ws.on('message', (raw) => {
try {
const packet = JSON.parse(raw.toString());
this._handleMessage(packet);
} catch (e) {
console.error('[WS] Parse error:', e.message);
}
});
this.ws.on('close', (code) => {
console.log(`[WS] Disconnected (code: ${code}).`);
this.alive = false;
this._stopPing();
if (this.shouldReconnect) {
console.log('[WS] Reconnecting in 3s...');
this._scheduleReconnect();
}
});
this.ws.on('error', (err) => {
console.error('[WS] Error:', err.message);
});
}
subscribeTicker(ticker) {
if (!ticker) return;
this.subscribedTickers.add(ticker);
if (this.alive) this._sendSubscribe(ticker);
}
unsubscribeTicker(ticker) {
if (!ticker) return;
this.subscribedTickers.delete(ticker);
if (this.alive && this.ws?.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify({
id: this._cmdId++,
cmd: 'unsubscribe',
params: { channels: ['orderbook_delta'], market_ticker: ticker }
}));
this.ws.send(JSON.stringify({
id: this._cmdId++,
cmd: 'unsubscribe',
params: { channels: ['ticker'], market_ticker: ticker }
}));
}
}
disconnect() {
this.shouldReconnect = false;
this._stopPing();
clearTimeout(this.reconnectTimer);
if (this.ws) {
this.ws.removeAllListeners();
this.ws.close();
this.ws = null;
}
this.alive = false;
}
_sendSubscribe(ticker) {
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) return;
// Subscribe orderbook_delta (private channel) with market_ticker
this.ws.send(JSON.stringify({
id: this._cmdId++,
cmd: 'subscribe',
params: { channels: ['orderbook_delta'], market_ticker: ticker }
}));
// Subscribe ticker (public channel) with market_ticker
this.ws.send(JSON.stringify({
id: this._cmdId++,
cmd: 'subscribe',
params: { channels: ['ticker'], market_ticker: ticker }
}));
}
_handleMessage(packet) {
const { type, payload } = unwrapPacket(packet);
if (!type || !payload) return;
if (type === 'orderbook_snapshot' || type === 'orderbook_delta') {
this.emit('orderbook', payload);
return;
}
if (type === 'ticker') {
this.emit('ticker', payload);
return;
}
if (type === 'subscribed' || type === 'ok') {
console.log(`[WS] ${type}:`, JSON.stringify(payload).slice(0, 200));
}
if (type === 'error') {
console.error('[WS] Server error:', JSON.stringify(payload).slice(0, 300));
}
}
_startPing() {
this._stopPing();
this.pingInterval = setInterval(() => {
if (this.alive && this.ws?.readyState === WebSocket.OPEN) this.ws.ping();
}, 15000);
}
_stopPing() {
clearInterval(this.pingInterval);
this.pingInterval = null;
}
_scheduleReconnect() {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = setTimeout(() => this.connect(), 3000);
}
}