From e7198fede5d814144b398fee8a331082c48df67f Mon Sep 17 00:00:00 2001 From: multipleof4 Date: Mon, 16 Mar 2026 11:31:35 -0700 Subject: [PATCH] Feat: Worker runs both paper + live engines --- worker.js | 155 +++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 106 insertions(+), 49 deletions(-) diff --git a/worker.js b/worker.js index 5f2df98..760c5ac 100644 --- a/worker.js +++ b/worker.js @@ -1,5 +1,6 @@ import { MarketTracker } from './lib/market/tracker.js'; import { PaperEngine } from './lib/paper/engine.js'; +import { LiveEngine } from './lib/live/engine.js'; import { getMarket } from './lib/kalshi/rest.js'; import { db } from './lib/db.js'; import { notify } from './lib/notify.js'; @@ -9,19 +10,16 @@ import { fileURLToPath, pathToFileURL } from 'url'; const __dirname = path.dirname(fileURLToPath(import.meta.url)); const STATE_FILE = '/tmp/kalbot-state.json'; +const LIVE_STATE_FILE = '/tmp/kalbot-live-state.json'; const HEARTBEAT_MS = 2000; +const BALANCE_POLL_MS = 30000; -// Mutex lock to prevent evaluate() running before onSettlement() callbacks finish let isSettling = false; async function lockSettling() { - while (isSettling) { - await new Promise(r => setTimeout(r, 50)); - } + while (isSettling) await new Promise(r => setTimeout(r, 50)); isSettling = true; } -function unlockSettling() { - isSettling = false; -} +function unlockSettling() { isSettling = false; } async function main() { console.log('=== Kalbot Worker Starting ==='); @@ -31,7 +29,10 @@ async function main() { const paper = new PaperEngine(1000); await paper.init(); - // Dynamically load all strategies from the strategies directory! + const live = new LiveEngine(); + await live.init(); + + // Dynamically load all strategies const strategies = []; const strategiesDir = path.join(__dirname, 'lib', 'strategies'); const stratFiles = fs.readdirSync(strategiesDir).filter(f => f.endsWith('.js') && f !== 'base.js'); @@ -40,11 +41,9 @@ async function main() { try { const fileUrl = pathToFileURL(path.join(strategiesDir, file)).href; const mod = await import(fileUrl); - - // Grab the first exported class/function for (const key in mod) { if (typeof mod[key] === 'function') { - strategies.push(new mod[key]()); // Instances use their own internal default configs! + strategies.push(new mod[key]()); break; } } @@ -57,10 +56,19 @@ async function main() { paper._getAccount(s.name); } - console.log(`[Worker] Dynamically loaded ${strategies.length} strategies: ${strategies.map((s) => s.name).join(', ')}`); + console.log(`[Worker] Loaded ${strategies.length} strategies: ${strategies.map(s => s.name).join(', ')}`); let latestMarketState = null; + // Fetch Kalshi balance periodically + async function pollBalance() { + try { await live.fetchBalance(); } catch {} + try { await live.fetchPositions(); } catch {} + } + await pollBalance(); + setInterval(pollBalance, BALANCE_POLL_MS); + + // Orphan checker for paper async function processOrphans() { if (paper._resetting) return; await lockSettling(); @@ -75,7 +83,7 @@ async function main() { } } } - writeState(latestMarketState, paper, strategies); + writeState(latestMarketState, paper, live, strategies); } } catch (e) { console.error('[Worker] Orphan check error:', e.message); @@ -90,75 +98,85 @@ async function main() { const tracker = new MarketTracker(); let heartbeatTimer = null; - writeState(latestMarketState, paper, strategies); + writeState(latestMarketState, paper, live, strategies); tracker.on('update', async (state) => { latestMarketState = state || null; - writeState(latestMarketState, paper, strategies); + writeState(latestMarketState, paper, live, strategies); - // Skip evaluating strategies if we are settling to avoid race conditions! if (!state || paper._resetting || isSettling) return; for (const strategy of strategies) { if (!strategy.enabled) continue; - const acct = paper._getAccount(strategy.name); - if (acct.openPositions.size > 0) { - continue; + // Paper trading (all strategies, always on) + const paperAcct = paper._getAccount(strategy.name); + if (paperAcct.openPositions.size === 0) { + const signal = strategy.evaluate(state); + if (signal) { + console.log(`[Worker] Paper signal from ${strategy.name}: ${signal.side} @ ${signal.price}¢`); + if (strategy.mode === 'paper') { + await paper.executeTrade(signal, state); + } + } } - const signal = strategy.evaluate(state); - if (signal) { - console.log(`[Worker] Signal from ${strategy.name}: ${signal.side} @ ${signal.price}¢ — ${signal.reason}`); - - if (strategy.mode === 'paper') { - await paper.executeTrade(signal, state); + // Live trading (only enabled strategies) + if (live.isStrategyEnabled(strategy.name) && !live.hasOpenPositionForStrategy(strategy.name)) { + const liveSignal = strategy.evaluate(state); + if (liveSignal) { + console.log(`[Worker] LIVE signal from ${strategy.name}: ${liveSignal.side} @ ${liveSignal.price}¢`); + await live.executeTrade(liveSignal, state); } } } - writeState(latestMarketState, paper, strategies); + writeState(latestMarketState, paper, live, strategies); }); tracker.on('settled', async ({ ticker, result }) => { - console.log(`[Worker] Market ${ticker} rotated/closed. Result: ${result || 'pending'}`); + console.log(`[Worker] Market ${ticker} settled. Result: ${result || 'pending'}`); if (paper._resetting) return; if (result) { await lockSettling(); try { - const settledPositions = await paper.settle(ticker, result); - if (settledPositions) { + // Settle paper positions + const settledPaper = await paper.settle(ticker, result); + if (settledPaper) { for (const strategy of strategies) { - for (const trade of settledPositions) { + for (const trade of settledPaper) { strategy.onSettlement(trade.result, trade); } } + } + + // Settle live positions + const settledLive = await live.settle(ticker, result); + + if (settledPaper || settledLive) { await notify( `Market ${ticker} settled: ${result.toUpperCase()}`, - 'Market Settled', - 'default', - 'chart_with_upwards_trend' + 'Market Settled', 'default', 'chart_with_upwards_trend' ); } } finally { unlockSettling(); } - } else { - console.log(`[Worker] Result for ${ticker} pending.`); } - writeState(latestMarketState, paper, strategies); + writeState(latestMarketState, paper, live, strategies); }); await tracker.start(); await notify(`🤖 Kalbot Worker started with ${strategies.length} strats!`, 'Kalbot Online', 'low', 'robot,green_circle'); heartbeatTimer = setInterval(() => { - writeState(latestMarketState, paper, strategies); + writeState(latestMarketState, paper, live, strategies); }, HEARTBEAT_MS); + // Poll for paper reset flag setInterval(async () => { try { if (fs.existsSync('/tmp/kalbot-reset-flag')) { @@ -177,17 +195,51 @@ async function main() { s.lastTradeTicker = null; s.lastTradeTime = 0; } - writeState(latestMarketState, paper, strategies); + writeState(latestMarketState, paper, live, strategies); await notify('🔄 Paper trading reset by admin', 'Kalbot Reset', 'default', 'recycle'); } finally { unlockSettling(); } } - } catch { - // ignore - } + } catch {} }, 1000); + // Poll for live commands (enable/disable/pause/resume) + setInterval(async () => { + try { + if (fs.existsSync('/tmp/kalbot-live-cmd')) { + const raw = fs.readFileSync('/tmp/kalbot-live-cmd', 'utf-8'); + fs.unlinkSync('/tmp/kalbot-live-cmd'); + const cmd = JSON.parse(raw); + + switch (cmd.action) { + case 'enable': + if (cmd.strategy) { + live.enableStrategy(cmd.strategy); + await notify(`⚡ Strategy "${cmd.strategy}" ENABLED for live trading`, 'Live Enable', 'high', 'zap'); + } + break; + case 'disable': + if (cmd.strategy) { + live.disableStrategy(cmd.strategy); + await notify(`🔴 Strategy "${cmd.strategy}" DISABLED`, 'Live Disable', 'default', 'red_circle'); + } + break; + case 'pause': + live.pause(); + await notify('⏸ Live trading PAUSED by admin', 'Live Paused', 'urgent', 'double_vertical_bar'); + break; + case 'resume': + live.resume(); + await notify('▶️ Live trading RESUMED by admin', 'Live Resumed', 'high', 'arrow_forward'); + break; + } + + writeState(latestMarketState, paper, live, strategies); + } + } catch {} + }, 500); + console.log('[Worker] Running. Press Ctrl+C to stop.'); const shutdown = async (signal) => { @@ -202,21 +254,26 @@ async function main() { process.on('SIGTERM', () => shutdown('SIGTERM')); } -function writeState(marketState, paper, strategies) { - const data = { +function writeState(marketState, paper, live, strategies) { + const paperData = { market: marketState, paper: paper.getStats(), paperByStrategy: paper.getPerStrategyStats(), - strategies: strategies.map((s) => s.toJSON()), + strategies: strategies.map(s => s.toJSON()), workerUptime: process.uptime(), lastUpdate: Date.now() }; - try { - fs.writeFileSync(STATE_FILE, JSON.stringify(data)); - } catch (e) { - // Non-critical - } + const liveData = { + market: marketState, + live: live.getStats(), + strategies: strategies.map(s => s.toJSON()), + workerUptime: process.uptime(), + lastUpdate: Date.now() + }; + + try { fs.writeFileSync(STATE_FILE, JSON.stringify(paperData)); } catch {} + try { fs.writeFileSync(LIVE_STATE_FILE, JSON.stringify(liveData)); } catch {} } main().catch((err) => {