mirror of
https://github.com/sune-org/ORP.git
synced 2026-01-13 16:17:59 +00:00
Fix: Add WS timeout, alarm cleanup, and eviction guards
This commit is contained in:
36
index.js
36
index.js
@@ -5,6 +5,7 @@ const BATCH_MS = 800;
|
||||
const BATCH_BYTES = 3400;
|
||||
const HB_INTERVAL_MS = 3000;
|
||||
const MAX_RUN_MS = 7 * 60 * 1000;
|
||||
const WS_TIMEOUT_MS = 10 * 60 * 1000;
|
||||
|
||||
const CORS_HEADERS = {
|
||||
'Access-Control-Allow-Origin': '*',
|
||||
@@ -53,7 +54,7 @@ export class MyDurableObject {
|
||||
constructor(state, env) {
|
||||
this.state = state;
|
||||
this.env = env;
|
||||
this.sockets = new Set();
|
||||
this.sockets = new Map();
|
||||
this.reset();
|
||||
}
|
||||
|
||||
@@ -136,15 +137,28 @@ export class MyDurableObject {
|
||||
else if (!this.flushTimer) this.flushTimer = setTimeout(() => this.flush(false), BATCH_MS);
|
||||
}
|
||||
|
||||
closeSocket(ws, reason = '') {
|
||||
const meta = this.sockets.get(ws);
|
||||
if (meta?.timer) clearTimeout(meta.timer);
|
||||
this.sockets.delete(ws);
|
||||
try { ws.close(1000, reason); } catch {}
|
||||
}
|
||||
|
||||
async fetch(req) {
|
||||
if (req.method === 'OPTIONS') return new Response(null, { status: 204, headers: CORS_HEADERS });
|
||||
|
||||
if (req.headers.get('Upgrade') === 'websocket') {
|
||||
const [client, server] = Object.values(new WebSocketPair());
|
||||
server.accept();
|
||||
this.sockets.add(server);
|
||||
server.addEventListener('close', () => this.sockets.delete(server));
|
||||
server.addEventListener('message', e => this.state.waitUntil(this.onMessage(server, e)));
|
||||
const timer = setTimeout(() => this.closeSocket(server, 'timeout'), WS_TIMEOUT_MS);
|
||||
this.sockets.set(server, { timer, connectedAt: Date.now() });
|
||||
server.addEventListener('close', () => this.closeSocket(server));
|
||||
server.addEventListener('message', e => {
|
||||
const meta = this.sockets.get(server);
|
||||
if (meta?.timer) { clearTimeout(meta.timer); meta.timer = setTimeout(() => this.closeSocket(server, 'timeout'), WS_TIMEOUT_MS); }
|
||||
this.state.waitUntil(this.onMessage(server, e));
|
||||
});
|
||||
this.state.waitUntil(this.checkIdleEviction());
|
||||
return new Response(null, { status: 101, webSocket: client });
|
||||
}
|
||||
|
||||
@@ -346,17 +360,27 @@ export class MyDurableObject {
|
||||
|
||||
async stopHeartbeat() {
|
||||
this.hbActive = false;
|
||||
await this.state.storage.setAlarm(null).catch(() => {});
|
||||
await this.state.storage.deleteAlarm().catch(() => {});
|
||||
}
|
||||
|
||||
async Heart() {
|
||||
if (this.phase !== 'running' || !this.hbActive) return this.stopHeartbeat();
|
||||
if (++this.age * HB_INTERVAL_MS >= MAX_RUN_MS) return this.fail('Run timed out after 15 minutes.');
|
||||
if (++this.age * HB_INTERVAL_MS >= MAX_RUN_MS) return this.fail('Run timed out after 7 minutes.');
|
||||
await this.state.storage.setAlarm(Date.now() + HB_INTERVAL_MS).catch(() => {});
|
||||
}
|
||||
|
||||
async checkIdleEviction() {
|
||||
if (this.phase !== 'idle' || this.sockets.size > 0) return;
|
||||
await new Promise(r => setTimeout(r, 30000));
|
||||
if (this.phase === 'idle' && this.sockets.size === 0) {
|
||||
await this.state.storage.deleteAlarm().catch(() => {});
|
||||
this.sockets.forEach(ws => this.closeSocket(ws, 'idle'));
|
||||
}
|
||||
}
|
||||
|
||||
async alarm() {
|
||||
await this.autopsy();
|
||||
if (this.phase === 'idle' && this.sockets.size === 0) return this.stopHeartbeat();
|
||||
await this.Heart();
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user