import { MarketTracker } from './lib/market/tracker.js'; import { PaperEngine } from './lib/paper/engine.js'; import { LiveEngine } from './lib/live/engine.js'; import { getMarket } from './lib/kalshi/rest.js'; import { db } from './lib/db.js'; import { notify } from './lib/notify.js'; import fs from 'fs'; import path from 'path'; import { fileURLToPath, pathToFileURL } from 'url'; const __dirname = path.dirname(fileURLToPath(import.meta.url)); const STATE_FILE = '/tmp/kalbot-state.json'; const LIVE_STATE_FILE = '/tmp/kalbot-live-state.json'; const HEARTBEAT_MS = 2000; const BALANCE_POLL_MS = 30000; let isSettling = false; async function lockSettling() { while (isSettling) await new Promise((r) => setTimeout(r, 50)); isSettling = true; } function unlockSettling() { isSettling = false; } async function main() { console.log('=== Kalbot Worker Starting ==='); await db.connect(); const paper = new PaperEngine(1000); await paper.init(); const live = new LiveEngine(); await live.init(); // Dynamically load all strategies const strategies = []; const strategiesDir = path.join(__dirname, 'lib', 'strategies'); const stratFiles = fs.readdirSync(strategiesDir).filter((f) => f.endsWith('.js') && f !== 'base.js'); for (const file of stratFiles) { try { const fileUrl = pathToFileURL(path.join(strategiesDir, file)).href; const mod = await import(fileUrl); for (const key in mod) { if (typeof mod[key] === 'function') { strategies.push(new mod[key]()); break; } } } catch (err) { console.error(`[Worker] Failed to load strategy ${file}:`, err.message); } } for (const s of strategies) { paper._getAccount(s.name); } console.log(`[Worker] Loaded ${strategies.length} strategies: ${strategies.map((s) => s.name).join(', ')}`); let latestMarketState = null; async function pollBalance() { try { await live.fetchBalance(); } catch {} try { await live.fetchPositions(); } catch {} } await pollBalance(); setInterval(pollBalance, BALANCE_POLL_MS); async function processOrphans() { if (paper._resetting) return; await lockSettling(); try { const { settled, expired } = await paper.checkOrphans(getMarket); const settledLive = await live.checkOrphans(getMarket); const allResolved = [...settled, ...expired, ...settledLive]; if (allResolved.length > 0) { for (const strategy of strategies) { for (const trade of allResolved) { if (trade.strategy === strategy.name) { strategy.onSettlement(trade.result, trade); } } } writeState(latestMarketState, paper, live, strategies); } } catch (e) { console.error('[Worker] Orphan check error:', e.message); } finally { unlockSettling(); } } await processOrphans(); setInterval(processOrphans, 10000); const tracker = new MarketTracker(); let heartbeatTimer = null; writeState(latestMarketState, paper, live, strategies); tracker.on('update', async (state) => { latestMarketState = state || null; writeState(latestMarketState, paper, live, strategies); if (!state || paper._resetting || isSettling) return; for (const strategy of strategies) { if (!strategy.enabled) continue; // ===== PAPER TRADING (isolated evaluation) ===== const paperAcct = paper._getAccount(strategy.name); if (paperAcct.openPositions.size === 0) { const paperSignal = strategy.evaluate(state, 'paper'); if (paperSignal) { console.log(`[Worker] Paper signal from ${strategy.name}: ${paperSignal.side} @ ${paperSignal.price}¢`); await paper.executeTrade(paperSignal, state); } } // ===== LIVE TRADING (separate evaluation, won't poison paper) ===== if (live.isStrategyEnabled(strategy.name) && !live.hasOpenPositionForStrategy(strategy.name)) { const liveSignal = strategy.evaluate(state, 'live'); if (liveSignal) { console.log( `[Worker] LIVE signal from ${strategy.name}: ${liveSignal.side} @ ${liveSignal.price}¢ (max: ${liveSignal.maxPrice}¢)` ); await live.executeTrade(liveSignal, state); } } } writeState(latestMarketState, paper, live, strategies); }); tracker.on('settled', async ({ ticker, result }) => { console.log(`[Worker] Market ${ticker} settled. Result: ${result || 'pending'}`); if (paper._resetting) return; if (result) { await lockSettling(); try { const settledPaper = await paper.settle(ticker, result); if (settledPaper) { for (const strategy of strategies) { for (const trade of settledPaper) { strategy.onSettlement(trade.result, trade); } } } const settledLive = await live.settle(ticker, result); if (settledLive) { for (const strategy of strategies) { for (const trade of settledLive) { strategy.onSettlement(trade.result, trade); } } } if (settledPaper || settledLive) { await notify( `Market ${ticker} settled: ${result.toUpperCase()}`, 'Market Settled', 'default', 'chart_with_upwards_trend' ); } } finally { unlockSettling(); } } else { // Market rotated but result not posted yet. Recover via orphan reconciliation shortly. setTimeout(() => { processOrphans().catch((e) => console.error('[Worker] delayed orphan reconcile failed:', e.message)); }, 4000); } writeState(latestMarketState, paper, live, strategies); }); await tracker.start(); await notify(`🤖 Kalbot Worker started with ${strategies.length} strats!`, 'Kalbot Online', 'low', 'robot,green_circle'); heartbeatTimer = setInterval(() => { writeState(latestMarketState, paper, live, strategies); }, HEARTBEAT_MS); // Poll for paper reset flag setInterval(async () => { try { if (fs.existsSync('/tmp/kalbot-reset-flag')) { fs.unlinkSync('/tmp/kalbot-reset-flag'); console.log('[Worker] Reset flag detected — resetting all paper data'); await lockSettling(); try { await paper.resetAll(); for (const s of strategies) { if (s.consecutiveLosses !== undefined) s.consecutiveLosses = 0; if (s.currentBetSize !== undefined) s.currentBetSize = s.config.baseBet; if (s.round !== undefined) s.round = 0; if (s.cycleWins !== undefined) s.cycleWins = 0; if (s.cycleLosses !== undefined) s.cycleLosses = 0; if (s.totalCycles !== undefined) s.totalCycles = 0; if (s._lastTrade) { s._lastTrade.paper = { time: 0, ticker: null }; s._lastTrade.live = { time: 0, ticker: null }; } } writeState(latestMarketState, paper, live, strategies); await notify('🔄 Paper trading reset by admin', 'Kalbot Reset', 'default', 'recycle'); } finally { unlockSettling(); } } } catch {} }, 1000); // Poll for live commands (enable/disable/pause/resume) setInterval(async () => { try { if (fs.existsSync('/tmp/kalbot-live-cmd')) { const raw = fs.readFileSync('/tmp/kalbot-live-cmd', 'utf-8'); fs.unlinkSync('/tmp/kalbot-live-cmd'); const cmd = JSON.parse(raw); switch (cmd.action) { case 'enable': if (cmd.strategy) { live.enableStrategy(cmd.strategy); await notify(`⚡ Strategy "${cmd.strategy}" ENABLED for live trading`, 'Live Enable', 'high', 'zap'); } break; case 'disable': if (cmd.strategy) { live.disableStrategy(cmd.strategy); await notify(`🔴 Strategy "${cmd.strategy}" DISABLED`, 'Live Disable', 'default', 'red_circle'); } break; case 'pause': live.pause(); await notify('⏸ Live trading PAUSED by admin', 'Live Paused', 'urgent', 'double_vertical_bar'); break; case 'resume': live.resume(); await notify('▶️ Live trading RESUMED by admin', 'Live Resumed', 'high', 'arrow_forward'); break; } writeState(latestMarketState, paper, live, strategies); } } catch {} }, 500); console.log('[Worker] Running. Press Ctrl+C to stop.'); const shutdown = async (signal) => { console.log(`\n[Worker] ${signal} received. Shutting down...`); clearInterval(heartbeatTimer); tracker.stop(); await notify('🔴 Kalbot Worker stopped', 'Kalbot Offline', 'high', 'robot,red_circle'); process.exit(0); }; process.on('SIGINT', () => shutdown('SIGINT')); process.on('SIGTERM', () => shutdown('SIGTERM')); } function writeState(marketState, paper, live, strategies) { const paperData = { market: marketState ? { ...marketState, orderbook: undefined } : null, paper: paper.getStats(), paperByStrategy: paper.getPerStrategyStats(), strategies: strategies.map((s) => s.toJSON()), workerUptime: process.uptime(), lastUpdate: Date.now() }; const liveData = { market: marketState ? { ...marketState, orderbook: undefined } : null, live: live.getStats(), strategies: strategies.map((s) => s.toJSON()), workerUptime: process.uptime(), lastUpdate: Date.now() }; try { fs.writeFileSync(STATE_FILE, JSON.stringify(paperData)); } catch {} try { fs.writeFileSync(LIVE_STATE_FILE, JSON.stringify(liveData)); } catch {} } main().catch((err) => { console.error('[Worker] Fatal error:', err); process.exit(1); });