diff --git a/worker.js b/worker.js index f2e54c0..ad2bd87 100644 --- a/worker.js +++ b/worker.js @@ -16,10 +16,12 @@ const BALANCE_POLL_MS = 30000; let isSettling = false; async function lockSettling() { - while (isSettling) await new Promise(r => setTimeout(r, 50)); + while (isSettling) await new Promise((r) => setTimeout(r, 50)); isSettling = true; } -function unlockSettling() { isSettling = false; } +function unlockSettling() { + isSettling = false; +} async function main() { console.log('=== Kalbot Worker Starting ==='); @@ -35,7 +37,7 @@ async function main() { // Dynamically load all strategies const strategies = []; const strategiesDir = path.join(__dirname, 'lib', 'strategies'); - const stratFiles = fs.readdirSync(strategiesDir).filter(f => f.endsWith('.js') && f !== 'base.js'); + const stratFiles = fs.readdirSync(strategiesDir).filter((f) => f.endsWith('.js') && f !== 'base.js'); for (const file of stratFiles) { try { @@ -56,13 +58,17 @@ async function main() { paper._getAccount(s.name); } - console.log(`[Worker] Loaded ${strategies.length} strategies: ${strategies.map(s => s.name).join(', ')}`); + console.log(`[Worker] Loaded ${strategies.length} strategies: ${strategies.map((s) => s.name).join(', ')}`); let latestMarketState = null; async function pollBalance() { - try { await live.fetchBalance(); } catch {} - try { await live.fetchPositions(); } catch {} + try { + await live.fetchBalance(); + } catch {} + try { + await live.fetchPositions(); + } catch {} } await pollBalance(); setInterval(pollBalance, BALANCE_POLL_MS); @@ -72,7 +78,10 @@ async function main() { await lockSettling(); try { const { settled, expired } = await paper.checkOrphans(getMarket); - const allResolved = [...settled, ...expired]; + const settledLive = await live.checkOrphans(getMarket); + + const allResolved = [...settled, ...expired, ...settledLive]; + if (allResolved.length > 0) { for (const strategy of strategies) { for (const trade of allResolved) { @@ -91,7 +100,7 @@ async function main() { } await processOrphans(); - setInterval(processOrphans, 60000); + setInterval(processOrphans, 10000); const tracker = new MarketTracker(); let heartbeatTimer = null; @@ -121,7 +130,9 @@ async function main() { if (live.isStrategyEnabled(strategy.name) && !live.hasOpenPositionForStrategy(strategy.name)) { const liveSignal = strategy.evaluate(state, 'live'); if (liveSignal) { - console.log(`[Worker] LIVE signal from ${strategy.name}: ${liveSignal.side} @ ${liveSignal.price}¢ (max: ${liveSignal.maxPrice}¢)`); + console.log( + `[Worker] LIVE signal from ${strategy.name}: ${liveSignal.side} @ ${liveSignal.price}¢ (max: ${liveSignal.maxPrice}¢)` + ); await live.executeTrade(liveSignal, state); } } @@ -148,16 +159,30 @@ async function main() { } const settledLive = await live.settle(ticker, result); + if (settledLive) { + for (const strategy of strategies) { + for (const trade of settledLive) { + strategy.onSettlement(trade.result, trade); + } + } + } if (settledPaper || settledLive) { await notify( `Market ${ticker} settled: ${result.toUpperCase()}`, - 'Market Settled', 'default', 'chart_with_upwards_trend' + 'Market Settled', + 'default', + 'chart_with_upwards_trend' ); } } finally { unlockSettling(); } + } else { + // Market rotated but result not posted yet. Recover via orphan reconciliation shortly. + setTimeout(() => { + processOrphans().catch((e) => console.error('[Worker] delayed orphan reconcile failed:', e.message)); + }, 4000); } writeState(latestMarketState, paper, live, strategies); @@ -255,7 +280,7 @@ function writeState(marketState, paper, live, strategies) { market: marketState ? { ...marketState, orderbook: undefined } : null, paper: paper.getStats(), paperByStrategy: paper.getPerStrategyStats(), - strategies: strategies.map(s => s.toJSON()), + strategies: strategies.map((s) => s.toJSON()), workerUptime: process.uptime(), lastUpdate: Date.now() }; @@ -263,13 +288,17 @@ function writeState(marketState, paper, live, strategies) { const liveData = { market: marketState ? { ...marketState, orderbook: undefined } : null, live: live.getStats(), - strategies: strategies.map(s => s.toJSON()), + strategies: strategies.map((s) => s.toJSON()), workerUptime: process.uptime(), lastUpdate: Date.now() }; - try { fs.writeFileSync(STATE_FILE, JSON.stringify(paperData)); } catch {} - try { fs.writeFileSync(LIVE_STATE_FILE, JSON.stringify(liveData)); } catch {} + try { + fs.writeFileSync(STATE_FILE, JSON.stringify(paperData)); + } catch {} + try { + fs.writeFileSync(LIVE_STATE_FILE, JSON.stringify(liveData)); + } catch {} } main().catch((err) => {