Compare commits

..

2 Commits

View File

@@ -1,20 +1,28 @@
import { MarketTracker } from './lib/market/tracker.js'; import { MarketTracker } from './lib/market/tracker.js';
import { PaperEngine } from './lib/paper/engine.js'; import { PaperEngine } from './lib/paper/engine.js';
import { MartingaleStrategy } from './lib/strategies/martingale.js';
import { MartingaleAlphaStrategy } from './lib/strategies/martingale-alpha.js';
import { ThresholdStrategy } from './lib/strategies/threshold.js';
import { BullDipBuyer } from './lib/strategies/bull-dip-buyer.js';
import { SniperReversalStrategy } from './lib/strategies/sniper-reversal.js';
import { MomentumRiderStrategy } from './lib/strategies/momentum-rider.js';
import { DontDoubtBullStrategy } from './lib/strategies/dont-doubt-bull.js';
import { getMarket } from './lib/kalshi/rest.js'; import { getMarket } from './lib/kalshi/rest.js';
import { db } from './lib/db.js'; import { db } from './lib/db.js';
import { notify } from './lib/notify.js'; import { notify } from './lib/notify.js';
import fs from 'fs'; import fs from 'fs';
import path from 'path';
import { fileURLToPath, pathToFileURL } from 'url';
const __dirname = path.dirname(fileURLToPath(import.meta.url));
const STATE_FILE = '/tmp/kalbot-state.json'; const STATE_FILE = '/tmp/kalbot-state.json';
const HEARTBEAT_MS = 2000; const HEARTBEAT_MS = 2000;
// Mutex lock to prevent evaluate() running before onSettlement() callbacks finish
let isSettling = false;
async function lockSettling() {
while (isSettling) {
await new Promise(r => setTimeout(r, 50));
}
isSettling = true;
}
function unlockSettling() {
isSettling = false;
}
async function main() { async function main() {
console.log('=== Kalbot Worker Starting ==='); console.log('=== Kalbot Worker Starting ===');
@@ -23,27 +31,39 @@ async function main() {
const paper = new PaperEngine(1000); const paper = new PaperEngine(1000);
await paper.init(); await paper.init();
// Load all 7 strategies! // Dynamically load all strategies from the strategies directory!
const strategies = [ const strategies = [];
new MartingaleStrategy({ threshold: 70, baseBet: 1, maxDoublings: 5 }), const strategiesDir = path.join(__dirname, 'lib', 'strategies');
new MartingaleAlphaStrategy({ minPct: 40, maxPct: 60, baseBet: 1, maxRounds: 3 }), const stratFiles = fs.readdirSync(strategiesDir).filter(f => f.endsWith('.js') && f !== 'base.js');
new ThresholdStrategy({ triggerPct: 65, betSize: 1 }),
new BullDipBuyer({ maxYesPrice: 45, minYesPrice: 15, betSize: 2 }), for (const file of stratFiles) {
new SniperReversalStrategy({ triggerPct: 95, minsLeft: 3, betSize: 1 }), try {
new MomentumRiderStrategy({ triggerPct: 75, betSize: 2 }), const fileUrl = pathToFileURL(path.join(strategiesDir, file)).href;
new DontDoubtBullStrategy({ minYesPct: 30, maxYesPct: 40, betSize: 2 }) const mod = await import(fileUrl);
];
// Grab the first exported class/function
for (const key in mod) {
if (typeof mod[key] === 'function') {
strategies.push(new mod[key]()); // Instances use their own internal default configs!
break;
}
}
} catch (err) {
console.error(`[Worker] Failed to load strategy ${file}:`, err.message);
}
}
for (const s of strategies) { for (const s of strategies) {
paper._getAccount(s.name); paper._getAccount(s.name);
} }
console.log(`[Worker] Loaded ${strategies.length} strategies: ${strategies.map((s) => s.name).join(', ')}`); console.log(`[Worker] Dynamically loaded ${strategies.length} strategies: ${strategies.map((s) => s.name).join(', ')}`);
let latestMarketState = null; let latestMarketState = null;
async function processOrphans() { async function processOrphans() {
if (paper._resetting) return; if (paper._resetting) return;
await lockSettling();
try { try {
const { settled, expired } = await paper.checkOrphans(getMarket); const { settled, expired } = await paper.checkOrphans(getMarket);
const allResolved = [...settled, ...expired]; const allResolved = [...settled, ...expired];
@@ -59,6 +79,8 @@ async function main() {
} }
} catch (e) { } catch (e) {
console.error('[Worker] Orphan check error:', e.message); console.error('[Worker] Orphan check error:', e.message);
} finally {
unlockSettling();
} }
} }
@@ -74,7 +96,8 @@ async function main() {
latestMarketState = state || null; latestMarketState = state || null;
writeState(latestMarketState, paper, strategies); writeState(latestMarketState, paper, strategies);
if (!state || paper._resetting) return; // Skip evaluating strategies if we are settling to avoid race conditions!
if (!state || paper._resetting || isSettling) return;
for (const strategy of strategies) { for (const strategy of strategies) {
if (!strategy.enabled) continue; if (!strategy.enabled) continue;
@@ -103,6 +126,8 @@ async function main() {
if (paper._resetting) return; if (paper._resetting) return;
if (result) { if (result) {
await lockSettling();
try {
const settledPositions = await paper.settle(ticker, result); const settledPositions = await paper.settle(ticker, result);
if (settledPositions) { if (settledPositions) {
for (const strategy of strategies) { for (const strategy of strategies) {
@@ -117,6 +142,9 @@ async function main() {
'chart_with_upwards_trend' 'chart_with_upwards_trend'
); );
} }
} finally {
unlockSettling();
}
} else { } else {
console.log(`[Worker] Result for ${ticker} pending.`); console.log(`[Worker] Result for ${ticker} pending.`);
} }
@@ -125,7 +153,7 @@ async function main() {
}); });
await tracker.start(); await tracker.start();
await notify('🤖 Kalbot Worker started with 7 strats!', 'Kalbot Online', 'low', 'robot,green_circle'); await notify(`🤖 Kalbot Worker started with ${strategies.length} strats!`, 'Kalbot Online', 'low', 'robot,green_circle');
heartbeatTimer = setInterval(() => { heartbeatTimer = setInterval(() => {
writeState(latestMarketState, paper, strategies); writeState(latestMarketState, paper, strategies);
@@ -136,6 +164,8 @@ async function main() {
if (fs.existsSync('/tmp/kalbot-reset-flag')) { if (fs.existsSync('/tmp/kalbot-reset-flag')) {
fs.unlinkSync('/tmp/kalbot-reset-flag'); fs.unlinkSync('/tmp/kalbot-reset-flag');
console.log('[Worker] Reset flag detected — resetting all paper data'); console.log('[Worker] Reset flag detected — resetting all paper data');
await lockSettling();
try {
await paper.resetAll(); await paper.resetAll();
for (const s of strategies) { for (const s of strategies) {
if (s.consecutiveLosses !== undefined) s.consecutiveLosses = 0; if (s.consecutiveLosses !== undefined) s.consecutiveLosses = 0;
@@ -149,6 +179,9 @@ async function main() {
} }
writeState(latestMarketState, paper, strategies); writeState(latestMarketState, paper, strategies);
await notify('🔄 Paper trading reset by admin', 'Kalbot Reset', 'default', 'recycle'); await notify('🔄 Paper trading reset by admin', 'Kalbot Reset', 'default', 'recycle');
} finally {
unlockSettling();
}
} }
} catch { } catch {
// ignore // ignore