Feat: Worker runs both paper + live engines

This commit is contained in:
2026-03-16 11:31:35 -07:00
parent c1bc4750bc
commit e7198fede5

155
worker.js
View File

@@ -1,5 +1,6 @@
import { MarketTracker } from './lib/market/tracker.js'; import { MarketTracker } from './lib/market/tracker.js';
import { PaperEngine } from './lib/paper/engine.js'; import { PaperEngine } from './lib/paper/engine.js';
import { LiveEngine } from './lib/live/engine.js';
import { getMarket } from './lib/kalshi/rest.js'; import { getMarket } from './lib/kalshi/rest.js';
import { db } from './lib/db.js'; import { db } from './lib/db.js';
import { notify } from './lib/notify.js'; import { notify } from './lib/notify.js';
@@ -9,19 +10,16 @@ import { fileURLToPath, pathToFileURL } from 'url';
const __dirname = path.dirname(fileURLToPath(import.meta.url)); const __dirname = path.dirname(fileURLToPath(import.meta.url));
const STATE_FILE = '/tmp/kalbot-state.json'; const STATE_FILE = '/tmp/kalbot-state.json';
const LIVE_STATE_FILE = '/tmp/kalbot-live-state.json';
const HEARTBEAT_MS = 2000; const HEARTBEAT_MS = 2000;
const BALANCE_POLL_MS = 30000;
// Mutex lock to prevent evaluate() running before onSettlement() callbacks finish
let isSettling = false; let isSettling = false;
async function lockSettling() { async function lockSettling() {
while (isSettling) { while (isSettling) await new Promise(r => setTimeout(r, 50));
await new Promise(r => setTimeout(r, 50));
}
isSettling = true; isSettling = true;
} }
function unlockSettling() { function unlockSettling() { isSettling = false; }
isSettling = false;
}
async function main() { async function main() {
console.log('=== Kalbot Worker Starting ==='); console.log('=== Kalbot Worker Starting ===');
@@ -31,7 +29,10 @@ async function main() {
const paper = new PaperEngine(1000); const paper = new PaperEngine(1000);
await paper.init(); await paper.init();
// Dynamically load all strategies from the strategies directory! const live = new LiveEngine();
await live.init();
// Dynamically load all strategies
const strategies = []; const strategies = [];
const strategiesDir = path.join(__dirname, 'lib', 'strategies'); const strategiesDir = path.join(__dirname, 'lib', 'strategies');
const stratFiles = fs.readdirSync(strategiesDir).filter(f => f.endsWith('.js') && f !== 'base.js'); const stratFiles = fs.readdirSync(strategiesDir).filter(f => f.endsWith('.js') && f !== 'base.js');
@@ -40,11 +41,9 @@ async function main() {
try { try {
const fileUrl = pathToFileURL(path.join(strategiesDir, file)).href; const fileUrl = pathToFileURL(path.join(strategiesDir, file)).href;
const mod = await import(fileUrl); const mod = await import(fileUrl);
// Grab the first exported class/function
for (const key in mod) { for (const key in mod) {
if (typeof mod[key] === 'function') { if (typeof mod[key] === 'function') {
strategies.push(new mod[key]()); // Instances use their own internal default configs! strategies.push(new mod[key]());
break; break;
} }
} }
@@ -57,10 +56,19 @@ async function main() {
paper._getAccount(s.name); paper._getAccount(s.name);
} }
console.log(`[Worker] Dynamically loaded ${strategies.length} strategies: ${strategies.map((s) => s.name).join(', ')}`); console.log(`[Worker] Loaded ${strategies.length} strategies: ${strategies.map(s => s.name).join(', ')}`);
let latestMarketState = null; let latestMarketState = null;
// Fetch Kalshi balance periodically
async function pollBalance() {
try { await live.fetchBalance(); } catch {}
try { await live.fetchPositions(); } catch {}
}
await pollBalance();
setInterval(pollBalance, BALANCE_POLL_MS);
// Orphan checker for paper
async function processOrphans() { async function processOrphans() {
if (paper._resetting) return; if (paper._resetting) return;
await lockSettling(); await lockSettling();
@@ -75,7 +83,7 @@ async function main() {
} }
} }
} }
writeState(latestMarketState, paper, strategies); writeState(latestMarketState, paper, live, strategies);
} }
} catch (e) { } catch (e) {
console.error('[Worker] Orphan check error:', e.message); console.error('[Worker] Orphan check error:', e.message);
@@ -90,75 +98,85 @@ async function main() {
const tracker = new MarketTracker(); const tracker = new MarketTracker();
let heartbeatTimer = null; let heartbeatTimer = null;
writeState(latestMarketState, paper, strategies); writeState(latestMarketState, paper, live, strategies);
tracker.on('update', async (state) => { tracker.on('update', async (state) => {
latestMarketState = state || null; latestMarketState = state || null;
writeState(latestMarketState, paper, strategies); writeState(latestMarketState, paper, live, strategies);
// Skip evaluating strategies if we are settling to avoid race conditions!
if (!state || paper._resetting || isSettling) return; if (!state || paper._resetting || isSettling) return;
for (const strategy of strategies) { for (const strategy of strategies) {
if (!strategy.enabled) continue; if (!strategy.enabled) continue;
const acct = paper._getAccount(strategy.name); // Paper trading (all strategies, always on)
if (acct.openPositions.size > 0) { const paperAcct = paper._getAccount(strategy.name);
continue; if (paperAcct.openPositions.size === 0) {
const signal = strategy.evaluate(state);
if (signal) {
console.log(`[Worker] Paper signal from ${strategy.name}: ${signal.side} @ ${signal.price}¢`);
if (strategy.mode === 'paper') {
await paper.executeTrade(signal, state);
}
}
} }
const signal = strategy.evaluate(state); // Live trading (only enabled strategies)
if (signal) { if (live.isStrategyEnabled(strategy.name) && !live.hasOpenPositionForStrategy(strategy.name)) {
console.log(`[Worker] Signal from ${strategy.name}: ${signal.side} @ ${signal.price}¢ — ${signal.reason}`); const liveSignal = strategy.evaluate(state);
if (liveSignal) {
if (strategy.mode === 'paper') { console.log(`[Worker] LIVE signal from ${strategy.name}: ${liveSignal.side} @ ${liveSignal.price}¢`);
await paper.executeTrade(signal, state); await live.executeTrade(liveSignal, state);
} }
} }
} }
writeState(latestMarketState, paper, strategies); writeState(latestMarketState, paper, live, strategies);
}); });
tracker.on('settled', async ({ ticker, result }) => { tracker.on('settled', async ({ ticker, result }) => {
console.log(`[Worker] Market ${ticker} rotated/closed. Result: ${result || 'pending'}`); console.log(`[Worker] Market ${ticker} settled. Result: ${result || 'pending'}`);
if (paper._resetting) return; if (paper._resetting) return;
if (result) { if (result) {
await lockSettling(); await lockSettling();
try { try {
const settledPositions = await paper.settle(ticker, result); // Settle paper positions
if (settledPositions) { const settledPaper = await paper.settle(ticker, result);
if (settledPaper) {
for (const strategy of strategies) { for (const strategy of strategies) {
for (const trade of settledPositions) { for (const trade of settledPaper) {
strategy.onSettlement(trade.result, trade); strategy.onSettlement(trade.result, trade);
} }
} }
}
// Settle live positions
const settledLive = await live.settle(ticker, result);
if (settledPaper || settledLive) {
await notify( await notify(
`Market ${ticker} settled: ${result.toUpperCase()}`, `Market ${ticker} settled: ${result.toUpperCase()}`,
'Market Settled', 'Market Settled', 'default', 'chart_with_upwards_trend'
'default',
'chart_with_upwards_trend'
); );
} }
} finally { } finally {
unlockSettling(); unlockSettling();
} }
} else {
console.log(`[Worker] Result for ${ticker} pending.`);
} }
writeState(latestMarketState, paper, strategies); writeState(latestMarketState, paper, live, strategies);
}); });
await tracker.start(); await tracker.start();
await notify(`🤖 Kalbot Worker started with ${strategies.length} strats!`, 'Kalbot Online', 'low', 'robot,green_circle'); await notify(`🤖 Kalbot Worker started with ${strategies.length} strats!`, 'Kalbot Online', 'low', 'robot,green_circle');
heartbeatTimer = setInterval(() => { heartbeatTimer = setInterval(() => {
writeState(latestMarketState, paper, strategies); writeState(latestMarketState, paper, live, strategies);
}, HEARTBEAT_MS); }, HEARTBEAT_MS);
// Poll for paper reset flag
setInterval(async () => { setInterval(async () => {
try { try {
if (fs.existsSync('/tmp/kalbot-reset-flag')) { if (fs.existsSync('/tmp/kalbot-reset-flag')) {
@@ -177,17 +195,51 @@ async function main() {
s.lastTradeTicker = null; s.lastTradeTicker = null;
s.lastTradeTime = 0; s.lastTradeTime = 0;
} }
writeState(latestMarketState, paper, strategies); writeState(latestMarketState, paper, live, strategies);
await notify('🔄 Paper trading reset by admin', 'Kalbot Reset', 'default', 'recycle'); await notify('🔄 Paper trading reset by admin', 'Kalbot Reset', 'default', 'recycle');
} finally { } finally {
unlockSettling(); unlockSettling();
} }
} }
} catch { } catch {}
// ignore
}
}, 1000); }, 1000);
// Poll for live commands (enable/disable/pause/resume)
setInterval(async () => {
try {
if (fs.existsSync('/tmp/kalbot-live-cmd')) {
const raw = fs.readFileSync('/tmp/kalbot-live-cmd', 'utf-8');
fs.unlinkSync('/tmp/kalbot-live-cmd');
const cmd = JSON.parse(raw);
switch (cmd.action) {
case 'enable':
if (cmd.strategy) {
live.enableStrategy(cmd.strategy);
await notify(`⚡ Strategy "${cmd.strategy}" ENABLED for live trading`, 'Live Enable', 'high', 'zap');
}
break;
case 'disable':
if (cmd.strategy) {
live.disableStrategy(cmd.strategy);
await notify(`🔴 Strategy "${cmd.strategy}" DISABLED`, 'Live Disable', 'default', 'red_circle');
}
break;
case 'pause':
live.pause();
await notify('⏸ Live trading PAUSED by admin', 'Live Paused', 'urgent', 'double_vertical_bar');
break;
case 'resume':
live.resume();
await notify('▶️ Live trading RESUMED by admin', 'Live Resumed', 'high', 'arrow_forward');
break;
}
writeState(latestMarketState, paper, live, strategies);
}
} catch {}
}, 500);
console.log('[Worker] Running. Press Ctrl+C to stop.'); console.log('[Worker] Running. Press Ctrl+C to stop.');
const shutdown = async (signal) => { const shutdown = async (signal) => {
@@ -202,21 +254,26 @@ async function main() {
process.on('SIGTERM', () => shutdown('SIGTERM')); process.on('SIGTERM', () => shutdown('SIGTERM'));
} }
function writeState(marketState, paper, strategies) { function writeState(marketState, paper, live, strategies) {
const data = { const paperData = {
market: marketState, market: marketState,
paper: paper.getStats(), paper: paper.getStats(),
paperByStrategy: paper.getPerStrategyStats(), paperByStrategy: paper.getPerStrategyStats(),
strategies: strategies.map((s) => s.toJSON()), strategies: strategies.map(s => s.toJSON()),
workerUptime: process.uptime(), workerUptime: process.uptime(),
lastUpdate: Date.now() lastUpdate: Date.now()
}; };
try { const liveData = {
fs.writeFileSync(STATE_FILE, JSON.stringify(data)); market: marketState,
} catch (e) { live: live.getStats(),
// Non-critical strategies: strategies.map(s => s.toJSON()),
} workerUptime: process.uptime(),
lastUpdate: Date.now()
};
try { fs.writeFileSync(STATE_FILE, JSON.stringify(paperData)); } catch {}
try { fs.writeFileSync(LIVE_STATE_FILE, JSON.stringify(liveData)); } catch {}
} }
main().catch((err) => { main().catch((err) => {