Files
KalBot/worker.js

226 lines
6.7 KiB
JavaScript

import { MarketTracker } from './lib/market/tracker.js';
import { PaperEngine } from './lib/paper/engine.js';
import { getMarket } from './lib/kalshi/rest.js';
import { db } from './lib/db.js';
import { notify } from './lib/notify.js';
import fs from 'fs';
import path from 'path';
import { fileURLToPath, pathToFileURL } from 'url';
const __dirname = path.dirname(fileURLToPath(import.meta.url));
const STATE_FILE = '/tmp/kalbot-state.json';
const HEARTBEAT_MS = 2000;
// Mutex lock to prevent evaluate() running before onSettlement() callbacks finish
let isSettling = false;
async function lockSettling() {
while (isSettling) {
await new Promise(r => setTimeout(r, 50));
}
isSettling = true;
}
function unlockSettling() {
isSettling = false;
}
async function main() {
console.log('=== Kalbot Worker Starting ===');
await db.connect();
const paper = new PaperEngine(1000);
await paper.init();
// Dynamically load all strategies from the strategies directory!
const strategies = [];
const strategiesDir = path.join(__dirname, 'lib', 'strategies');
const stratFiles = fs.readdirSync(strategiesDir).filter(f => f.endsWith('.js') && f !== 'base.js');
for (const file of stratFiles) {
try {
const fileUrl = pathToFileURL(path.join(strategiesDir, file)).href;
const mod = await import(fileUrl);
// Grab the first exported class/function
for (const key in mod) {
if (typeof mod[key] === 'function') {
strategies.push(new mod[key]()); // Instances use their own internal default configs!
break;
}
}
} catch (err) {
console.error(`[Worker] Failed to load strategy ${file}:`, err.message);
}
}
for (const s of strategies) {
paper._getAccount(s.name);
}
console.log(`[Worker] Dynamically loaded ${strategies.length} strategies: ${strategies.map((s) => s.name).join(', ')}`);
let latestMarketState = null;
async function processOrphans() {
if (paper._resetting) return;
await lockSettling();
try {
const { settled, expired } = await paper.checkOrphans(getMarket);
const allResolved = [...settled, ...expired];
if (allResolved.length > 0) {
for (const strategy of strategies) {
for (const trade of allResolved) {
if (trade.strategy === strategy.name) {
strategy.onSettlement(trade.result, trade);
}
}
}
writeState(latestMarketState, paper, strategies);
}
} catch (e) {
console.error('[Worker] Orphan check error:', e.message);
} finally {
unlockSettling();
}
}
await processOrphans();
setInterval(processOrphans, 60000);
const tracker = new MarketTracker();
let heartbeatTimer = null;
writeState(latestMarketState, paper, strategies);
tracker.on('update', async (state) => {
latestMarketState = state || null;
writeState(latestMarketState, paper, strategies);
// Skip evaluating strategies if we are settling to avoid race conditions!
if (!state || paper._resetting || isSettling) return;
for (const strategy of strategies) {
if (!strategy.enabled) continue;
const acct = paper._getAccount(strategy.name);
if (acct.openPositions.size > 0) {
continue;
}
const signal = strategy.evaluate(state);
if (signal) {
console.log(`[Worker] Signal from ${strategy.name}: ${signal.side} @ ${signal.price}¢ — ${signal.reason}`);
if (strategy.mode === 'paper') {
await paper.executeTrade(signal, state);
}
}
}
writeState(latestMarketState, paper, strategies);
});
tracker.on('settled', async ({ ticker, result }) => {
console.log(`[Worker] Market ${ticker} rotated/closed. Result: ${result || 'pending'}`);
if (paper._resetting) return;
if (result) {
await lockSettling();
try {
const settledPositions = await paper.settle(ticker, result);
if (settledPositions) {
for (const strategy of strategies) {
for (const trade of settledPositions) {
strategy.onSettlement(trade.result, trade);
}
}
await notify(
`Market ${ticker} settled: ${result.toUpperCase()}`,
'Market Settled',
'default',
'chart_with_upwards_trend'
);
}
} finally {
unlockSettling();
}
} else {
console.log(`[Worker] Result for ${ticker} pending.`);
}
writeState(latestMarketState, paper, strategies);
});
await tracker.start();
await notify(`🤖 Kalbot Worker started with ${strategies.length} strats!`, 'Kalbot Online', 'low', 'robot,green_circle');
heartbeatTimer = setInterval(() => {
writeState(latestMarketState, paper, strategies);
}, HEARTBEAT_MS);
setInterval(async () => {
try {
if (fs.existsSync('/tmp/kalbot-reset-flag')) {
fs.unlinkSync('/tmp/kalbot-reset-flag');
console.log('[Worker] Reset flag detected — resetting all paper data');
await lockSettling();
try {
await paper.resetAll();
for (const s of strategies) {
if (s.consecutiveLosses !== undefined) s.consecutiveLosses = 0;
if (s.currentBetSize !== undefined) s.currentBetSize = s.config.baseBet;
if (s.round !== undefined) s.round = 0;
if (s.cycleWins !== undefined) s.cycleWins = 0;
if (s.cycleLosses !== undefined) s.cycleLosses = 0;
if (s.totalCycles !== undefined) s.totalCycles = 0;
s.lastTradeTicker = null;
s.lastTradeTime = 0;
}
writeState(latestMarketState, paper, strategies);
await notify('🔄 Paper trading reset by admin', 'Kalbot Reset', 'default', 'recycle');
} finally {
unlockSettling();
}
}
} catch {
// ignore
}
}, 1000);
console.log('[Worker] Running. Press Ctrl+C to stop.');
const shutdown = async (signal) => {
console.log(`\n[Worker] ${signal} received. Shutting down...`);
clearInterval(heartbeatTimer);
tracker.stop();
await notify('🔴 Kalbot Worker stopped', 'Kalbot Offline', 'high', 'robot,red_circle');
process.exit(0);
};
process.on('SIGINT', () => shutdown('SIGINT'));
process.on('SIGTERM', () => shutdown('SIGTERM'));
}
function writeState(marketState, paper, strategies) {
const data = {
market: marketState,
paper: paper.getStats(),
paperByStrategy: paper.getPerStrategyStats(),
strategies: strategies.map((s) => s.toJSON()),
workerUptime: process.uptime(),
lastUpdate: Date.now()
};
try {
fs.writeFileSync(STATE_FILE, JSON.stringify(data));
} catch (e) {
// Non-critical
}
}
main().catch((err) => {
console.error('[Worker] Fatal error:', err);
process.exit(1);
});