diff --git a/worker.js b/worker.js index 9966df4..e0c837e 100644 --- a/worker.js +++ b/worker.js @@ -15,6 +15,18 @@ import fs from 'fs'; const STATE_FILE = '/tmp/kalbot-state.json'; const HEARTBEAT_MS = 2000; +// Mutex lock to prevent evaluate() running before onSettlement() callbacks finish +let isSettling = false; +async function lockSettling() { + while (isSettling) { + await new Promise(r => setTimeout(r, 50)); + } + isSettling = true; +} +function unlockSettling() { + isSettling = false; +} + async function main() { console.log('=== Kalbot Worker Starting ==='); @@ -44,6 +56,7 @@ async function main() { async function processOrphans() { if (paper._resetting) return; + await lockSettling(); try { const { settled, expired } = await paper.checkOrphans(getMarket); const allResolved = [...settled, ...expired]; @@ -59,6 +72,8 @@ async function main() { } } catch (e) { console.error('[Worker] Orphan check error:', e.message); + } finally { + unlockSettling(); } } @@ -74,7 +89,8 @@ async function main() { latestMarketState = state || null; writeState(latestMarketState, paper, strategies); - if (!state || paper._resetting) return; + // Skip evaluating strategies if we are settling to avoid race conditions! + if (!state || paper._resetting || isSettling) return; for (const strategy of strategies) { if (!strategy.enabled) continue; @@ -103,19 +119,24 @@ async function main() { if (paper._resetting) return; if (result) { - const settledPositions = await paper.settle(ticker, result); - if (settledPositions) { - for (const strategy of strategies) { - for (const trade of settledPositions) { - strategy.onSettlement(trade.result, trade); + await lockSettling(); + try { + const settledPositions = await paper.settle(ticker, result); + if (settledPositions) { + for (const strategy of strategies) { + for (const trade of settledPositions) { + strategy.onSettlement(trade.result, trade); + } } + await notify( + `Market ${ticker} settled: ${result.toUpperCase()}`, + 'Market Settled', + 'default', + 'chart_with_upwards_trend' + ); } - await notify( - `Market ${ticker} settled: ${result.toUpperCase()}`, - 'Market Settled', - 'default', - 'chart_with_upwards_trend' - ); + } finally { + unlockSettling(); } } else { console.log(`[Worker] Result for ${ticker} pending.`); @@ -136,19 +157,24 @@ async function main() { if (fs.existsSync('/tmp/kalbot-reset-flag')) { fs.unlinkSync('/tmp/kalbot-reset-flag'); console.log('[Worker] Reset flag detected — resetting all paper data'); - await paper.resetAll(); - for (const s of strategies) { - if (s.consecutiveLosses !== undefined) s.consecutiveLosses = 0; - if (s.currentBetSize !== undefined) s.currentBetSize = s.config.baseBet; - if (s.round !== undefined) s.round = 0; - if (s.cycleWins !== undefined) s.cycleWins = 0; - if (s.cycleLosses !== undefined) s.cycleLosses = 0; - if (s.totalCycles !== undefined) s.totalCycles = 0; - s.lastTradeTicker = null; - s.lastTradeTime = 0; + await lockSettling(); + try { + await paper.resetAll(); + for (const s of strategies) { + if (s.consecutiveLosses !== undefined) s.consecutiveLosses = 0; + if (s.currentBetSize !== undefined) s.currentBetSize = s.config.baseBet; + if (s.round !== undefined) s.round = 0; + if (s.cycleWins !== undefined) s.cycleWins = 0; + if (s.cycleLosses !== undefined) s.cycleLosses = 0; + if (s.totalCycles !== undefined) s.totalCycles = 0; + s.lastTradeTicker = null; + s.lastTradeTime = 0; + } + writeState(latestMarketState, paper, strategies); + await notify('🔄 Paper trading reset by admin', 'Kalbot Reset', 'default', 'recycle'); + } finally { + unlockSettling(); } - writeState(latestMarketState, paper, strategies); - await notify('🔄 Paper trading reset by admin', 'Kalbot Reset', 'default', 'recycle'); } } catch { // ignore