diff --git a/index.js b/index.js index 8c01d77..2702ebd 100644 --- a/index.js +++ b/index.js @@ -5,6 +5,7 @@ const BATCH_MS = 800; const BATCH_BYTES = 3400; const HB_INTERVAL_MS = 3000; const MAX_RUN_MS = 7 * 60 * 1000; +const WS_TIMEOUT_MS = 10 * 60 * 1000; const CORS_HEADERS = { 'Access-Control-Allow-Origin': '*', @@ -53,7 +54,7 @@ export class MyDurableObject { constructor(state, env) { this.state = state; this.env = env; - this.sockets = new Set(); + this.sockets = new Map(); this.reset(); } @@ -136,15 +137,28 @@ export class MyDurableObject { else if (!this.flushTimer) this.flushTimer = setTimeout(() => this.flush(false), BATCH_MS); } + closeSocket(ws, reason = '') { + const meta = this.sockets.get(ws); + if (meta?.timer) clearTimeout(meta.timer); + this.sockets.delete(ws); + try { ws.close(1000, reason); } catch {} + } + async fetch(req) { if (req.method === 'OPTIONS') return new Response(null, { status: 204, headers: CORS_HEADERS }); if (req.headers.get('Upgrade') === 'websocket') { const [client, server] = Object.values(new WebSocketPair()); server.accept(); - this.sockets.add(server); - server.addEventListener('close', () => this.sockets.delete(server)); - server.addEventListener('message', e => this.state.waitUntil(this.onMessage(server, e))); + const timer = setTimeout(() => this.closeSocket(server, 'timeout'), WS_TIMEOUT_MS); + this.sockets.set(server, { timer, connectedAt: Date.now() }); + server.addEventListener('close', () => this.closeSocket(server)); + server.addEventListener('message', e => { + const meta = this.sockets.get(server); + if (meta?.timer) { clearTimeout(meta.timer); meta.timer = setTimeout(() => this.closeSocket(server, 'timeout'), WS_TIMEOUT_MS); } + this.state.waitUntil(this.onMessage(server, e)); + }); + this.state.waitUntil(this.checkIdleEviction()); return new Response(null, { status: 101, webSocket: client }); } @@ -346,17 +360,27 @@ export class MyDurableObject { async stopHeartbeat() { this.hbActive = false; - await this.state.storage.setAlarm(null).catch(() => {}); + await this.state.storage.deleteAlarm().catch(() => {}); } async Heart() { if (this.phase !== 'running' || !this.hbActive) return this.stopHeartbeat(); - if (++this.age * HB_INTERVAL_MS >= MAX_RUN_MS) return this.fail('Run timed out after 15 minutes.'); + if (++this.age * HB_INTERVAL_MS >= MAX_RUN_MS) return this.fail('Run timed out after 7 minutes.'); await this.state.storage.setAlarm(Date.now() + HB_INTERVAL_MS).catch(() => {}); } + async checkIdleEviction() { + if (this.phase !== 'idle' || this.sockets.size > 0) return; + await new Promise(r => setTimeout(r, 30000)); + if (this.phase === 'idle' && this.sockets.size === 0) { + await this.state.storage.deleteAlarm().catch(() => {}); + this.sockets.forEach(ws => this.closeSocket(ws, 'idle')); + } + } + async alarm() { await this.autopsy(); + if (this.phase === 'idle' && this.sockets.size === 0) return this.stopHeartbeat(); await this.Heart(); }