diff --git a/run.js b/run.js index ba0cc44..8780653 100644 --- a/run.js +++ b/run.js @@ -4,6 +4,8 @@ import { streamOpenRouter, streamOpenAI, streamClaude, streamGoogle } from './pr const BATCH_MS = 800 const BATCH_BYTES = 3400 +const MAX_RUN_MS = 9 * 60 * 1000 +const CLEANUP_INTERVAL_MS = 60_000 const runs = new Map() @@ -22,6 +24,8 @@ function meta(rid) { pendingImages: [], flushTimer: null, controller: null, + startedAt: snap.startedAt ?? 0, + timeoutTimer: null, } runs.set(rid, r) return r @@ -40,6 +44,8 @@ function ensure(rid) { pendingImages: [], flushTimer: null, controller: null, + startedAt: 0, + timeoutTimer: null, } runs.set(rid, r) return r @@ -51,6 +57,7 @@ function saveSnapshot(r) { seq: r.seq, phase: r.phase, error: r.error, + startedAt: r.startedAt, }) } @@ -97,8 +104,13 @@ function replay(r, ws, after) { else if (['error', 'evicted'].includes(r.phase)) send(ws, { type: 'err', message: r.error || 'The run was terminated unexpectedly.' }) } +function clearTimeoutTimer(r) { + if (r.timeoutTimer) { clearTimeout(r.timeoutTimer); r.timeoutTimer = null } +} + function stop(r) { if (r.phase !== 'running') return + clearTimeoutTimer(r) flush(r, true) r.phase = 'done' r.error = null @@ -110,6 +122,7 @@ function stop(r) { function fail(r, message) { if (r.phase !== 'running') return + clearTimeoutTimer(r) const err = String(message || 'stream_failed') queueDelta(r, `\n\nRun failed: ${err}`) flush(r, true) @@ -157,6 +170,22 @@ async function beginStream(r, { apiKey, body, provider }) { } } +// Periodic cleanup: remove terminal runs with no sockets from the Map +setInterval(() => { + const now = Date.now() + for (const [uid, r] of runs) { + if (r.phase === 'running') { + // Safety: if startedAt is set and exceeded MAX_RUN_MS, force-fail + if (r.startedAt && now - r.startedAt > MAX_RUN_MS) { + fail(r, `Run timed out after ${MAX_RUN_MS / 60000} minutes.`) + } + continue + } + // Terminal run with no connected sockets — safe to evict from Map + if (r.sockets.size === 0) runs.delete(uid) + } +}, CLEANUP_INTERVAL_MS) + export function addSocket(rid, ws) { const r = ensure(rid) r.sockets.add(ws) @@ -208,6 +237,12 @@ export function handleMessage(rid, ws, msg) { r.pending = '' r.pendingImages = [] r.controller = new AbortController() + r.startedAt = Date.now() + + // Hard timeout safety net + r.timeoutTimer = setTimeout(() => { + if (r.phase === 'running') fail(r, `Run timed out after ${MAX_RUN_MS / 60000} minutes.`) + }, MAX_RUN_MS) kv.set(`prompt:${r.rid}`, body.messages) saveSnapshot(r)