Fix: Add mutex lock to prevent trade evaluation race condition during settlement

This commit is contained in:
2026-03-16 00:29:06 -07:00
parent 7ba11ecdcb
commit 29fd889acb

View File

@@ -15,6 +15,18 @@ import fs from 'fs';
const STATE_FILE = '/tmp/kalbot-state.json';
const HEARTBEAT_MS = 2000;
// Mutex lock to prevent evaluate() running before onSettlement() callbacks finish
let isSettling = false;
async function lockSettling() {
while (isSettling) {
await new Promise(r => setTimeout(r, 50));
}
isSettling = true;
}
function unlockSettling() {
isSettling = false;
}
async function main() {
console.log('=== Kalbot Worker Starting ===');
@@ -44,6 +56,7 @@ async function main() {
async function processOrphans() {
if (paper._resetting) return;
await lockSettling();
try {
const { settled, expired } = await paper.checkOrphans(getMarket);
const allResolved = [...settled, ...expired];
@@ -59,6 +72,8 @@ async function main() {
}
} catch (e) {
console.error('[Worker] Orphan check error:', e.message);
} finally {
unlockSettling();
}
}
@@ -74,7 +89,8 @@ async function main() {
latestMarketState = state || null;
writeState(latestMarketState, paper, strategies);
if (!state || paper._resetting) return;
// Skip evaluating strategies if we are settling to avoid race conditions!
if (!state || paper._resetting || isSettling) return;
for (const strategy of strategies) {
if (!strategy.enabled) continue;
@@ -103,19 +119,24 @@ async function main() {
if (paper._resetting) return;
if (result) {
const settledPositions = await paper.settle(ticker, result);
if (settledPositions) {
for (const strategy of strategies) {
for (const trade of settledPositions) {
strategy.onSettlement(trade.result, trade);
await lockSettling();
try {
const settledPositions = await paper.settle(ticker, result);
if (settledPositions) {
for (const strategy of strategies) {
for (const trade of settledPositions) {
strategy.onSettlement(trade.result, trade);
}
}
await notify(
`Market ${ticker} settled: ${result.toUpperCase()}`,
'Market Settled',
'default',
'chart_with_upwards_trend'
);
}
await notify(
`Market ${ticker} settled: ${result.toUpperCase()}`,
'Market Settled',
'default',
'chart_with_upwards_trend'
);
} finally {
unlockSettling();
}
} else {
console.log(`[Worker] Result for ${ticker} pending.`);
@@ -136,19 +157,24 @@ async function main() {
if (fs.existsSync('/tmp/kalbot-reset-flag')) {
fs.unlinkSync('/tmp/kalbot-reset-flag');
console.log('[Worker] Reset flag detected — resetting all paper data');
await paper.resetAll();
for (const s of strategies) {
if (s.consecutiveLosses !== undefined) s.consecutiveLosses = 0;
if (s.currentBetSize !== undefined) s.currentBetSize = s.config.baseBet;
if (s.round !== undefined) s.round = 0;
if (s.cycleWins !== undefined) s.cycleWins = 0;
if (s.cycleLosses !== undefined) s.cycleLosses = 0;
if (s.totalCycles !== undefined) s.totalCycles = 0;
s.lastTradeTicker = null;
s.lastTradeTime = 0;
await lockSettling();
try {
await paper.resetAll();
for (const s of strategies) {
if (s.consecutiveLosses !== undefined) s.consecutiveLosses = 0;
if (s.currentBetSize !== undefined) s.currentBetSize = s.config.baseBet;
if (s.round !== undefined) s.round = 0;
if (s.cycleWins !== undefined) s.cycleWins = 0;
if (s.cycleLosses !== undefined) s.cycleLosses = 0;
if (s.totalCycles !== undefined) s.totalCycles = 0;
s.lastTradeTicker = null;
s.lastTradeTime = 0;
}
writeState(latestMarketState, paper, strategies);
await notify('🔄 Paper trading reset by admin', 'Kalbot Reset', 'default', 'recycle');
} finally {
unlockSettling();
}
writeState(latestMarketState, paper, strategies);
await notify('🔄 Paper trading reset by admin', 'Kalbot Reset', 'default', 'recycle');
}
} catch {
// ignore