diff --git a/index.js b/index.js index 2702ebd..8c01d77 100644 --- a/index.js +++ b/index.js @@ -5,7 +5,6 @@ const BATCH_MS = 800; const BATCH_BYTES = 3400; const HB_INTERVAL_MS = 3000; const MAX_RUN_MS = 7 * 60 * 1000; -const WS_TIMEOUT_MS = 10 * 60 * 1000; const CORS_HEADERS = { 'Access-Control-Allow-Origin': '*', @@ -54,7 +53,7 @@ export class MyDurableObject { constructor(state, env) { this.state = state; this.env = env; - this.sockets = new Map(); + this.sockets = new Set(); this.reset(); } @@ -137,28 +136,15 @@ export class MyDurableObject { else if (!this.flushTimer) this.flushTimer = setTimeout(() => this.flush(false), BATCH_MS); } - closeSocket(ws, reason = '') { - const meta = this.sockets.get(ws); - if (meta?.timer) clearTimeout(meta.timer); - this.sockets.delete(ws); - try { ws.close(1000, reason); } catch {} - } - async fetch(req) { if (req.method === 'OPTIONS') return new Response(null, { status: 204, headers: CORS_HEADERS }); if (req.headers.get('Upgrade') === 'websocket') { const [client, server] = Object.values(new WebSocketPair()); server.accept(); - const timer = setTimeout(() => this.closeSocket(server, 'timeout'), WS_TIMEOUT_MS); - this.sockets.set(server, { timer, connectedAt: Date.now() }); - server.addEventListener('close', () => this.closeSocket(server)); - server.addEventListener('message', e => { - const meta = this.sockets.get(server); - if (meta?.timer) { clearTimeout(meta.timer); meta.timer = setTimeout(() => this.closeSocket(server, 'timeout'), WS_TIMEOUT_MS); } - this.state.waitUntil(this.onMessage(server, e)); - }); - this.state.waitUntil(this.checkIdleEviction()); + this.sockets.add(server); + server.addEventListener('close', () => this.sockets.delete(server)); + server.addEventListener('message', e => this.state.waitUntil(this.onMessage(server, e))); return new Response(null, { status: 101, webSocket: client }); } @@ -360,27 +346,17 @@ export class MyDurableObject { async stopHeartbeat() { this.hbActive = false; - await this.state.storage.deleteAlarm().catch(() => {}); + await this.state.storage.setAlarm(null).catch(() => {}); } async Heart() { if (this.phase !== 'running' || !this.hbActive) return this.stopHeartbeat(); - if (++this.age * HB_INTERVAL_MS >= MAX_RUN_MS) return this.fail('Run timed out after 7 minutes.'); + if (++this.age * HB_INTERVAL_MS >= MAX_RUN_MS) return this.fail('Run timed out after 15 minutes.'); await this.state.storage.setAlarm(Date.now() + HB_INTERVAL_MS).catch(() => {}); } - async checkIdleEviction() { - if (this.phase !== 'idle' || this.sockets.size > 0) return; - await new Promise(r => setTimeout(r, 30000)); - if (this.phase === 'idle' && this.sockets.size === 0) { - await this.state.storage.deleteAlarm().catch(() => {}); - this.sockets.forEach(ws => this.closeSocket(ws, 'idle')); - } - } - async alarm() { await this.autopsy(); - if (this.phase === 'idle' && this.sockets.size === 0) return this.stopHeartbeat(); await this.Heart(); }