This commit is contained in:
2025-09-29 19:02:18 -07:00
parent 7fc5ffe9f0
commit ec22f61c90

View File

@@ -5,7 +5,6 @@ const BATCH_MS = 800;
const BATCH_BYTES = 3400; const BATCH_BYTES = 3400;
const HB_INTERVAL_MS = 3000; const HB_INTERVAL_MS = 3000;
const MAX_RUN_MS = 7 * 60 * 1000; const MAX_RUN_MS = 7 * 60 * 1000;
const WS_TIMEOUT_MS = 10 * 60 * 1000;
const CORS_HEADERS = { const CORS_HEADERS = {
'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Origin': '*',
@@ -54,7 +53,7 @@ export class MyDurableObject {
constructor(state, env) { constructor(state, env) {
this.state = state; this.state = state;
this.env = env; this.env = env;
this.sockets = new Map(); this.sockets = new Set();
this.reset(); this.reset();
} }
@@ -137,28 +136,15 @@ export class MyDurableObject {
else if (!this.flushTimer) this.flushTimer = setTimeout(() => this.flush(false), BATCH_MS); else if (!this.flushTimer) this.flushTimer = setTimeout(() => this.flush(false), BATCH_MS);
} }
closeSocket(ws, reason = '') {
const meta = this.sockets.get(ws);
if (meta?.timer) clearTimeout(meta.timer);
this.sockets.delete(ws);
try { ws.close(1000, reason); } catch {}
}
async fetch(req) { async fetch(req) {
if (req.method === 'OPTIONS') return new Response(null, { status: 204, headers: CORS_HEADERS }); if (req.method === 'OPTIONS') return new Response(null, { status: 204, headers: CORS_HEADERS });
if (req.headers.get('Upgrade') === 'websocket') { if (req.headers.get('Upgrade') === 'websocket') {
const [client, server] = Object.values(new WebSocketPair()); const [client, server] = Object.values(new WebSocketPair());
server.accept(); server.accept();
const timer = setTimeout(() => this.closeSocket(server, 'timeout'), WS_TIMEOUT_MS); this.sockets.add(server);
this.sockets.set(server, { timer, connectedAt: Date.now() }); server.addEventListener('close', () => this.sockets.delete(server));
server.addEventListener('close', () => this.closeSocket(server)); server.addEventListener('message', e => this.state.waitUntil(this.onMessage(server, e)));
server.addEventListener('message', e => {
const meta = this.sockets.get(server);
if (meta?.timer) { clearTimeout(meta.timer); meta.timer = setTimeout(() => this.closeSocket(server, 'timeout'), WS_TIMEOUT_MS); }
this.state.waitUntil(this.onMessage(server, e));
});
this.state.waitUntil(this.checkIdleEviction());
return new Response(null, { status: 101, webSocket: client }); return new Response(null, { status: 101, webSocket: client });
} }
@@ -360,27 +346,17 @@ export class MyDurableObject {
async stopHeartbeat() { async stopHeartbeat() {
this.hbActive = false; this.hbActive = false;
await this.state.storage.deleteAlarm().catch(() => {}); await this.state.storage.setAlarm(null).catch(() => {});
} }
async Heart() { async Heart() {
if (this.phase !== 'running' || !this.hbActive) return this.stopHeartbeat(); if (this.phase !== 'running' || !this.hbActive) return this.stopHeartbeat();
if (++this.age * HB_INTERVAL_MS >= MAX_RUN_MS) return this.fail('Run timed out after 7 minutes.'); if (++this.age * HB_INTERVAL_MS >= MAX_RUN_MS) return this.fail('Run timed out after 15 minutes.');
await this.state.storage.setAlarm(Date.now() + HB_INTERVAL_MS).catch(() => {}); await this.state.storage.setAlarm(Date.now() + HB_INTERVAL_MS).catch(() => {});
} }
async checkIdleEviction() {
if (this.phase !== 'idle' || this.sockets.size > 0) return;
await new Promise(r => setTimeout(r, 30000));
if (this.phase === 'idle' && this.sockets.size === 0) {
await this.state.storage.deleteAlarm().catch(() => {});
this.sockets.forEach(ws => this.closeSocket(ws, 'idle'));
}
}
async alarm() { async alarm() {
await this.autopsy(); await this.autopsy();
if (this.phase === 'idle' && this.sockets.size === 0) return this.stopHeartbeat();
await this.Heart(); await this.Heart();
} }