Fix: reconcile delayed live settlements after rotation

This commit is contained in:
2026-03-16 13:51:55 -07:00
parent 1c8dec1f17
commit 0437bdb1db

View File

@@ -16,10 +16,12 @@ const BALANCE_POLL_MS = 30000;
let isSettling = false;
async function lockSettling() {
while (isSettling) await new Promise(r => setTimeout(r, 50));
while (isSettling) await new Promise((r) => setTimeout(r, 50));
isSettling = true;
}
function unlockSettling() { isSettling = false; }
function unlockSettling() {
isSettling = false;
}
async function main() {
console.log('=== Kalbot Worker Starting ===');
@@ -35,7 +37,7 @@ async function main() {
// Dynamically load all strategies
const strategies = [];
const strategiesDir = path.join(__dirname, 'lib', 'strategies');
const stratFiles = fs.readdirSync(strategiesDir).filter(f => f.endsWith('.js') && f !== 'base.js');
const stratFiles = fs.readdirSync(strategiesDir).filter((f) => f.endsWith('.js') && f !== 'base.js');
for (const file of stratFiles) {
try {
@@ -56,13 +58,17 @@ async function main() {
paper._getAccount(s.name);
}
console.log(`[Worker] Loaded ${strategies.length} strategies: ${strategies.map(s => s.name).join(', ')}`);
console.log(`[Worker] Loaded ${strategies.length} strategies: ${strategies.map((s) => s.name).join(', ')}`);
let latestMarketState = null;
async function pollBalance() {
try { await live.fetchBalance(); } catch {}
try { await live.fetchPositions(); } catch {}
try {
await live.fetchBalance();
} catch {}
try {
await live.fetchPositions();
} catch {}
}
await pollBalance();
setInterval(pollBalance, BALANCE_POLL_MS);
@@ -72,7 +78,10 @@ async function main() {
await lockSettling();
try {
const { settled, expired } = await paper.checkOrphans(getMarket);
const allResolved = [...settled, ...expired];
const settledLive = await live.checkOrphans(getMarket);
const allResolved = [...settled, ...expired, ...settledLive];
if (allResolved.length > 0) {
for (const strategy of strategies) {
for (const trade of allResolved) {
@@ -91,7 +100,7 @@ async function main() {
}
await processOrphans();
setInterval(processOrphans, 60000);
setInterval(processOrphans, 10000);
const tracker = new MarketTracker();
let heartbeatTimer = null;
@@ -121,7 +130,9 @@ async function main() {
if (live.isStrategyEnabled(strategy.name) && !live.hasOpenPositionForStrategy(strategy.name)) {
const liveSignal = strategy.evaluate(state, 'live');
if (liveSignal) {
console.log(`[Worker] LIVE signal from ${strategy.name}: ${liveSignal.side} @ ${liveSignal.price}¢ (max: ${liveSignal.maxPrice}¢)`);
console.log(
`[Worker] LIVE signal from ${strategy.name}: ${liveSignal.side} @ ${liveSignal.price}¢ (max: ${liveSignal.maxPrice}¢)`
);
await live.executeTrade(liveSignal, state);
}
}
@@ -148,16 +159,30 @@ async function main() {
}
const settledLive = await live.settle(ticker, result);
if (settledLive) {
for (const strategy of strategies) {
for (const trade of settledLive) {
strategy.onSettlement(trade.result, trade);
}
}
}
if (settledPaper || settledLive) {
await notify(
`Market ${ticker} settled: ${result.toUpperCase()}`,
'Market Settled', 'default', 'chart_with_upwards_trend'
'Market Settled',
'default',
'chart_with_upwards_trend'
);
}
} finally {
unlockSettling();
}
} else {
// Market rotated but result not posted yet. Recover via orphan reconciliation shortly.
setTimeout(() => {
processOrphans().catch((e) => console.error('[Worker] delayed orphan reconcile failed:', e.message));
}, 4000);
}
writeState(latestMarketState, paper, live, strategies);
@@ -255,7 +280,7 @@ function writeState(marketState, paper, live, strategies) {
market: marketState ? { ...marketState, orderbook: undefined } : null,
paper: paper.getStats(),
paperByStrategy: paper.getPerStrategyStats(),
strategies: strategies.map(s => s.toJSON()),
strategies: strategies.map((s) => s.toJSON()),
workerUptime: process.uptime(),
lastUpdate: Date.now()
};
@@ -263,13 +288,17 @@ function writeState(marketState, paper, live, strategies) {
const liveData = {
market: marketState ? { ...marketState, orderbook: undefined } : null,
live: live.getStats(),
strategies: strategies.map(s => s.toJSON()),
strategies: strategies.map((s) => s.toJSON()),
workerUptime: process.uptime(),
lastUpdate: Date.now()
};
try { fs.writeFileSync(STATE_FILE, JSON.stringify(paperData)); } catch {}
try { fs.writeFileSync(LIVE_STATE_FILE, JSON.stringify(liveData)); } catch {}
try {
fs.writeFileSync(STATE_FILE, JSON.stringify(paperData));
} catch {}
try {
fs.writeFileSync(LIVE_STATE_FILE, JSON.stringify(liveData));
} catch {}
}
main().catch((err) => {