import { MarketTracker } from './lib/market/tracker.js'; import { PaperEngine } from './lib/paper/engine.js'; import { MartingaleStrategy } from './lib/strategies/martingale.js'; import { MartingaleAlphaStrategy } from './lib/strategies/martingale-alpha.js'; import { ThresholdStrategy } from './lib/strategies/threshold.js'; import { getMarket } from './lib/kalshi/rest.js'; import { db } from './lib/db.js'; import { notify } from './lib/notify.js'; import fs from 'fs'; const STATE_FILE = '/tmp/kalbot-state.json'; const HEARTBEAT_MS = 2000; async function main() { console.log('=== Kalbot Worker Starting ==='); await db.connect(); const paper = new PaperEngine(1000); await paper.init(); const strategies = [ new MartingaleStrategy({ threshold: 70, baseBet: 1, maxDoublings: 5 }), new MartingaleAlphaStrategy({ minPct: 40, maxPct: 60, baseBet: 1, maxRounds: 3 }), new ThresholdStrategy({ triggerPct: 65, betSize: 1 }) ]; for (const s of strategies) { paper._getAccount(s.name); } console.log(`[Worker] Loaded ${strategies.length} strategies: ${strategies.map((s) => s.name).join(', ')}`); let latestMarketState = null; async function processOrphans() { if (paper._resetting) return; try { const { settled, expired } = await paper.checkOrphans(getMarket); const allResolved = [...settled, ...expired]; if (allResolved.length > 0) { for (const strategy of strategies) { for (const trade of allResolved) { if (trade.strategy === strategy.name) { strategy.onSettlement(trade.result, trade); } } } writeState(latestMarketState, paper, strategies); } } catch (e) { console.error('[Worker] Orphan check error:', e.message); } } // Settle delayed positions before continuing await processOrphans(); // Continuously check open positions every 60s for delayed results setInterval(processOrphans, 60000); const tracker = new MarketTracker(); let heartbeatTimer = null; writeState(latestMarketState, paper, strategies); tracker.on('update', async (state) => { latestMarketState = state || null; writeState(latestMarketState, paper, strategies); if (!state || paper._resetting) return; for (const strategy of strategies) { if (!strategy.enabled) continue; const signal = strategy.evaluate(state); if (signal) { console.log(`[Worker] Signal from ${strategy.name}: ${signal.side} @ ${signal.price}ยข โ€” ${signal.reason}`); if (strategy.mode === 'paper') { await paper.executeTrade(signal, state); } } } writeState(latestMarketState, paper, strategies); }); tracker.on('settled', async ({ ticker, result }) => { console.log(`[Worker] Market ${ticker} rotated/closed. Result: ${result || 'pending'}`); if (paper._resetting) return; // Only attempt to settle and notify if the result is already available if (result) { const settledPositions = await paper.settle(ticker, result); if (settledPositions) { for (const strategy of strategies) { for (const trade of settledPositions) { strategy.onSettlement(trade.result, trade); } } await notify( `Market ${ticker} settled: ${result.toUpperCase()}`, 'Market Settled', 'default', 'chart_with_upwards_trend' ); } } else { console.log(`[Worker] Result for ${ticker} pending. Background poller will handle it when Kalshi publishes the outcome.`); } writeState(latestMarketState, paper, strategies); }); await tracker.start(); await notify('๐Ÿค– Kalbot Worker started!', 'Kalbot Online', 'low', 'robot,green_circle'); heartbeatTimer = setInterval(() => { writeState(latestMarketState, paper, strategies); }, HEARTBEAT_MS); setInterval(async () => { try { if (fs.existsSync('/tmp/kalbot-reset-flag')) { fs.unlinkSync('/tmp/kalbot-reset-flag'); console.log('[Worker] Reset flag detected โ€” resetting all paper data'); await paper.resetAll(); for (const s of strategies) { if (s.consecutiveLosses !== undefined) s.consecutiveLosses = 0; if (s.currentBetSize !== undefined) s.currentBetSize = s.config.baseBet; if (s.round !== undefined) s.round = 0; if (s.cycleWins !== undefined) s.cycleWins = 0; if (s.cycleLosses !== undefined) s.cycleLosses = 0; if (s.totalCycles !== undefined) s.totalCycles = 0; s.lastTradeTicker = null; s.lastTradeTime = 0; } writeState(latestMarketState, paper, strategies); await notify('๐Ÿ”„ Paper trading reset by admin', 'Kalbot Reset', 'default', 'recycle'); } } catch { // ignore } }, 1000); console.log('[Worker] Running. Press Ctrl+C to stop.'); const shutdown = async (signal) => { console.log(`\n[Worker] ${signal} received. Shutting down...`); clearInterval(heartbeatTimer); tracker.stop(); await notify('๐Ÿ”ด Kalbot Worker stopped', 'Kalbot Offline', 'high', 'robot,red_circle'); process.exit(0); }; process.on('SIGINT', () => shutdown('SIGINT')); process.on('SIGTERM', () => shutdown('SIGTERM')); } function writeState(marketState, paper, strategies) { const data = { market: marketState, paper: paper.getStats(), paperByStrategy: paper.getPerStrategyStats(), strategies: strategies.map((s) => s.toJSON()), workerUptime: process.uptime(), lastUpdate: Date.now() }; try { fs.writeFileSync(STATE_FILE, JSON.stringify(data)); } catch (e) { // Non-critical } } main().catch((err) => { console.error('[Worker] Fatal error:', err); process.exit(1); });