This commit is contained in:
2025-09-06 09:12:25 -07:00
parent b9d503490e
commit 51540b8ea4

177
index.js
View File

@@ -27,16 +27,12 @@ export default {
const url = new URL(req.url); const url = new URL(req.url);
const method = req.method.toUpperCase(); const method = req.method.toUpperCase();
if (method === 'OPTIONS') { if (method === 'OPTIONS') return new Response(null, { status: 204, headers: CORS_HEADERS });
return new Response(null, { status: 204, headers: CORS_HEADERS });
}
if (url.pathname === '/ws') { if (url.pathname === '/ws') {
const isGet = method === 'GET'; const isGet = method === 'GET';
const isWs = req.headers.get('Upgrade') === 'websocket'; const isWs = req.headers.get('Upgrade') === 'websocket';
if (!isGet && !isWs) { if (!isGet && !isWs) return withCORS(new Response('method not allowed', { status: 405 }));
return withCORS(new Response('method not allowed', { status: 405 }));
}
const rawUid = url.searchParams.get('uid') || 'anon'; const rawUid = url.searchParams.get('uid') || 'anon';
const uid = String(rawUid).slice(0, 64).replace(/[^a-zA-Z0-9_-]/g, '') || 'anon'; const uid = String(rawUid).slice(0, 64).replace(/[^a-zA-Z0-9_-]/g, '') || 'anon';
@@ -76,22 +72,14 @@ export class MyDurableObject {
} }
corsJSON(obj, status = 200) { corsJSON(obj, status = 200) {
return new Response(JSON.stringify(obj), { return new Response(JSON.stringify(obj), { status, headers: { 'Content-Type': 'application/json', 'Cache-Control': 'no-store', ...CORS_HEADERS } });
status,
headers: { 'Content-Type': 'application/json', 'Cache-Control': 'no-store', ...CORS_HEADERS }
});
} }
send(ws, obj) { send(ws, obj) { try { ws.send(JSON.stringify(obj)); } catch {} }
try { ws.send(JSON.stringify(obj)); } catch {}
}
bcast(obj) { bcast(obj) { this.sockets.forEach(ws => this.send(ws, obj)); }
const message = JSON.stringify(obj);
this.sockets.forEach(ws => { try { ws.send(message); } catch {} });
}
async restoreIfCold() { async autopsy() {
if (this.rid) return; if (this.rid) return;
const snap = await this.state.storage.get('run').catch(() => null); const snap = await this.state.storage.get('run').catch(() => null);
@@ -118,59 +106,35 @@ export class MyDurableObject {
saveSnapshot() { saveSnapshot() {
this.lastSavedAt = Date.now(); this.lastSavedAt = Date.now();
const data = { this.state.storage.put('run', { rid: this.rid, buffer: this.buffer, seq: this.seq, age: this.age, phase: this.phase, error: this.error, savedAt: this.lastSavedAt }).catch(() => {});
rid: this.rid,
buffer: this.buffer,
seq: this.seq,
age: this.age,
phase: this.phase,
error: this.error,
savedAt: this.lastSavedAt,
};
this.state.storage.put('run', data).catch(() => {});
} }
replay(ws, after) { replay(ws, after) {
this.buffer.forEach(it => { this.buffer.forEach(it => { if (it.seq > after) this.send(ws, { type: 'delta', seq: it.seq, text: it.text }); });
if (it.seq > after) this.send(ws, { type: 'delta', seq: it.seq, text: it.text }); if (this.phase === 'done') this.send(ws, { type: 'done' });
}); else if (['error', 'evicted'].includes(this.phase)) this.send(ws, { type: 'err', message: this.error || 'The run was terminated unexpectedly.' });
if (this.phase === 'done') {
this.send(ws, { type: 'done' });
} else if (this.phase === 'error' || this.phase === 'evicted') {
this.send(ws, { type: 'err', message: this.error || 'The run was terminated unexpectedly.' });
}
} }
flush(force = false) { flush(force = false) {
if (this.flushTimer) { if (this.flushTimer) { clearTimeout(this.flushTimer); this.flushTimer = null; }
clearTimeout(this.flushTimer);
this.flushTimer = null;
}
if (this.pending) { if (this.pending) {
this.buffer.push({ seq: ++this.seq, text: this.pending }); this.buffer.push({ seq: ++this.seq, text: this.pending });
this.bcast({ type: 'delta', seq: this.seq, text: this.pending }); this.bcast({ type: 'delta', seq: this.seq, text: this.pending });
this.pending = ''; this.pending = '';
this.lastFlushedAt = Date.now(); this.lastFlushedAt = Date.now();
} }
// Only save the snapshot if explicitly forced (i.e., at the end of a run).
if (force) this.saveSnapshot(); if (force) this.saveSnapshot();
} }
queueDelta(text) { queueDelta(text) {
if (!text) return; if (!text) return;
this.pending += text; this.pending += text;
if (this.pending.length >= BATCH_BYTES) { if (this.pending.length >= BATCH_BYTES) this.flush(false);
this.flush(false); else if (!this.flushTimer) this.flushTimer = setTimeout(() => this.flush(false), BATCH_MS);
} else if (!this.flushTimer) {
this.flushTimer = setTimeout(() => this.flush(false), BATCH_MS);
}
} }
async fetch(req) { async fetch(req) {
if (req.method === 'OPTIONS') { if (req.method === 'OPTIONS') return new Response(null, { status: 204, headers: CORS_HEADERS });
return new Response(null, { status: 204, headers: CORS_HEADERS });
}
if (req.headers.get('Upgrade') === 'websocket') { if (req.headers.get('Upgrade') === 'websocket') {
const [client, server] = Object.values(new WebSocketPair()); const [client, server] = Object.values(new WebSocketPair());
server.accept(); server.accept();
@@ -180,57 +144,37 @@ export class MyDurableObject {
return new Response(null, { status: 101, webSocket: client }); return new Response(null, { status: 101, webSocket: client });
} }
if (req.method === 'GET') { if (req.method === 'GET') {
await this.restoreIfCold(); await this.autopsy();
const text = this.buffer.map(it => it.text).join('') + this.pending; const text = this.buffer.map(it => it.text).join('') + this.pending;
const isTerminal = ['done', 'error', 'evicted'].includes(this.phase); const isTerminal = ['done', 'error', 'evicted'].includes(this.phase);
const isError = ['error', 'evicted'].includes(this.phase); const isError = ['error', 'evicted'].includes(this.phase);
return this.corsJSON({ return this.corsJSON({ rid: this.rid, seq: this.seq, phase: this.phase, done: isTerminal, error: isError ? (this.error || 'The run was terminated unexpectedly.') : null, text });
rid: this.rid,
seq: this.seq,
phase: this.phase,
done: isTerminal,
error: isError ? (this.error || 'The run was terminated unexpectedly.') : null,
text,
});
} }
return this.corsJSON({ error: 'not allowed' }, 405); return this.corsJSON({ error: 'not allowed' }, 405);
} }
async onMessage(ws, evt) { async onMessage(ws, evt) {
await this.restoreIfCold(); await this.autopsy();
let msg; let msg;
try { try { msg = JSON.parse(String(evt.data || '')); }
msg = JSON.parse(String(evt.data || '')); catch { return this.send(ws, { type: 'err', message: 'bad_json' }); }
} catch {
return this.send(ws, { type: 'err', message: 'bad_json' });
}
if (msg.type === 'stop') {
if (msg.rid && msg.rid === this.rid) this.stop();
return;
}
if (msg.type === 'stop') { if (msg.rid && msg.rid === this.rid) this.stop(); return; }
if (msg.type !== 'begin') return this.send(ws, { type: 'err', message: 'bad_type' }); if (msg.type !== 'begin') return this.send(ws, { type: 'err', message: 'bad_type' });
const { rid, apiKey, or_body, model, messages, after, provider } = msg; const { rid, apiKey, or_body, model, messages, after, provider } = msg;
const body = or_body || (model && Array.isArray(messages) ? { model, messages, stream: true, ...msg } : null); const body = or_body || (model && Array.isArray(messages) ? { model, messages, stream: true, ...msg } : null);
if (!rid || !apiKey || !body || !Array.isArray(body.messages) || body.messages.length === 0) { if (!rid || !apiKey || !body || !Array.isArray(body.messages) || body.messages.length === 0) return this.send(ws, { type: 'err', message: 'missing_fields' });
return this.send(ws, { type: 'err', message: 'missing_fields' }); if (this.phase === 'running' && rid !== this.rid) return this.send(ws, { type: 'err', message: 'busy' });
} if (rid === this.rid && this.phase !== 'idle') return this.replay(ws, Number.isFinite(+after) ? +after : -1);
if (this.phase === 'running' && rid !== this.rid) {
return this.send(ws, { type: 'err', message: 'busy' });
}
if (rid === this.rid && this.phase !== 'idle') {
return this.replay(ws, Number.isFinite(+after) ? +after : -1);
}
this.reset(); this.reset();
this.rid = rid; this.rid = rid;
this.phase = 'running'; this.phase = 'running';
this.controller = new AbortController(); this.controller = new AbortController();
this.saveSnapshot(); // Save the initial state of the run. this.saveSnapshot();
this.state.waitUntil(this.startHeartbeat()); this.state.waitUntil(this.startHeartbeat());
this.state.waitUntil(this.stream({ apiKey, body, provider: provider || 'openrouter' })); this.state.waitUntil(this.stream({ apiKey, body, provider: provider || 'openrouter' }));
@@ -238,75 +182,45 @@ export class MyDurableObject {
async stream({ apiKey, body, provider }) { async stream({ apiKey, body, provider }) {
try { try {
if (provider === 'openai') { if (provider === 'openai') await this.streamOpenAI({ apiKey, body });
await this.streamOpenAI({ apiKey, body }); else await this.streamOpenRouter({ apiKey, body });
} else {
await this.streamOpenRouter({ apiKey, body });
}
if (this.phase === 'running') this.stop(); if (this.phase === 'running') this.stop();
} catch (e) { } catch (e) {
if (this.phase === 'running') { if (this.phase === 'running') {
const msg = String(e?.message || 'stream_failed'); const msg = String(e?.message || 'stream_failed');
if (!((e && e.name === 'AbortError') || /abort/i.test(msg))) { if (!((e && e.name === 'AbortError') || /abort/i.test(msg))) this.fail(msg);
this.fail(msg);
}
} }
} }
} }
isMultimodalMessage(m) { isMultimodalMessage(m) { return m && Array.isArray(m.content) && m.content.some(p => p?.type && p.type !== 'text' && p.type !== 'input_text'); }
if (!m || !Array.isArray(m.content)) return false;
return m.content.some(p => p?.type && p.type !== 'text' && p.type !== 'input_text');
}
extractTextFromMessage(m) { extractTextFromMessage(m) {
if (!m) return ''; if (!m) return '';
if (typeof m.content === 'string') return String(m.content); if (typeof m.content === 'string') return String(m.content);
if (!Array.isArray(m.content)) return ''; if (!Array.isArray(m.content)) return '';
return m.content return m.content.filter(p => p && ['text', 'input_text'].includes(p.type)).map(p => String(p.type === 'text' ? (p.text ?? p.content ?? '') : (p.text ?? ''))).join('');
.filter(p => p && (p.type === 'text' || p.type === 'input_text'))
.map(p => String(p.type === 'text' ? (p.text ?? p.content ?? '') : (p.text ?? '')))
.join('');
} }
mapContentPartToResponses(part) { mapContentPartToResponses(part) {
const type = part?.type || 'text'; const type = part?.type || 'text';
if (type === 'image_url' || type === 'input_image') { if (['image_url', 'input_image'].includes(type)) return (part?.image_url?.url || part?.image_url) ? { type: 'input_image', image_url: String(part?.image_url?.url || part?.image_url) } : null;
const url = part?.image_url?.url || part?.image_url || ''; if (['text', 'input_text'].includes(type)) return { type: 'input_text', text: String(type === 'text' ? (part.text ?? part.content ?? '') : (part.text ?? '')) };
return url ? { type: 'input_image', image_url: String(url) } : null;
}
if (type === 'text' || type === 'input_text') {
const text = type === 'text' ? (part.text ?? part.content ?? '') : (part.text ?? '');
return { type: 'input_text', text: String(text) };
}
return { type: 'input_text', text: `[${type}:${part?.file?.filename || 'file'}]` }; return { type: 'input_text', text: `[${type}:${part?.file?.filename || 'file'}]` };
} }
buildInputForResponses(messages) { buildInputForResponses(messages) {
if (!Array.isArray(messages) || messages.length === 0) return ''; if (!Array.isArray(messages) || messages.length === 0) return '';
const isMulti = messages.some(m => this.isMultimodalMessage(m)); if (!messages.some(m => this.isMultimodalMessage(m))) {
if (!isMulti) {
if (messages.length === 1) return this.extractTextFromMessage(messages[0]); if (messages.length === 1) return this.extractTextFromMessage(messages[0]);
return messages.map(m => ({ role: m.role, content: this.extractTextFromMessage(m) })); return messages.map(m => ({ role: m.role, content: this.extractTextFromMessage(m) }));
} }
return messages.map(m => { return messages.map(m => ({ role: m.role, content: Array.isArray(m.content) ? m.content.map(p => this.mapContentPartToResponses(p)).filter(Boolean) : [{ type: 'input_text', text: String(m.content || '') }] }));
let content = [{ type: 'input_text', text: String(m.content || '') }];
if (Array.isArray(m.content)) {
content = m.content.map(p => this.mapContentPartToResponses(p)).filter(Boolean);
}
return { role: m.role, content };
});
} }
async streamOpenAI({ apiKey, body }) { async streamOpenAI({ apiKey, body }) {
const client = new OpenAI({ apiKey }); const client = new OpenAI({ apiKey });
const params = { const params = { model: body.model, input: this.buildInputForResponses(body.messages || []), temperature: body.temperature, stream: true };
model: body.model,
input: this.buildInputForResponses(body.messages || []),
temperature: body.temperature,
stream: true,
};
if (Number.isFinite(+body.max_tokens) && +body.max_tokens > 0) params.max_output_tokens = +body.max_tokens; if (Number.isFinite(+body.max_tokens) && +body.max_tokens > 0) params.max_output_tokens = +body.max_tokens;
if (Number.isFinite(+body.top_p)) params.top_p = +body.top_p; if (Number.isFinite(+body.top_p)) params.top_p = +body.top_p;
if (body.reasoning?.effort) params.reasoning = { effort: body.reasoning.effort }; if (body.reasoning?.effort) params.reasoning = { effort: body.reasoning.effort };
@@ -326,10 +240,7 @@ export class MyDurableObject {
async streamOpenRouter({ apiKey, body }) { async streamOpenRouter({ apiKey, body }) {
const client = new OpenAI({ apiKey, baseURL: 'https://openrouter.ai/api/v1' }); const client = new OpenAI({ apiKey, baseURL: 'https://openrouter.ai/api/v1' });
const stream = await client.chat.completions.create( const stream = await client.chat.completions.create({ ...body, stream: true }, { signal: this.controller.signal });
{ ...body, stream: true },
{ signal: this.controller.signal }
);
for await (const chunk of stream) { for await (const chunk of stream) {
if (this.phase !== 'running') break; if (this.phase !== 'running') break;
const delta = chunk?.choices?.[0]?.delta?.content ?? ''; const delta = chunk?.choices?.[0]?.delta?.content ?? '';
@@ -339,24 +250,24 @@ export class MyDurableObject {
stop() { stop() {
if (this.phase !== 'running') return; if (this.phase !== 'running') return;
this.flush(true); // Flush and force a final save. this.flush(true);
this.phase = 'done'; this.phase = 'done';
this.error = null; this.error = null;
try { this.controller?.abort(); } catch {} try { this.controller?.abort(); } catch {}
try { this.oaStream?.controller?.abort(); } catch {} try { this.oaStream?.controller?.abort(); } catch {}
this.saveSnapshot(); // Explicitly save final 'done' state. this.saveSnapshot();
this.bcast({ type: 'done' }); this.bcast({ type: 'done' });
this.state.waitUntil(this.stopHeartbeat()); this.state.waitUntil(this.stopHeartbeat());
} }
fail(message) { fail(message) {
if (this.phase === 'error') return; if (this.phase === 'error') return;
this.flush(true); // Flush and force a final save. this.flush(true);
this.phase = 'error'; this.phase = 'error';
this.error = String(message || 'stream_failed'); this.error = String(message || 'stream_failed');
try { this.controller?.abort(); } catch {} try { this.controller?.abort(); } catch {}
try { this.oaStream?.controller?.abort(); } catch {} try { this.oaStream?.controller?.abort(); } catch {}
this.saveSnapshot(); // Explicitly save final 'error' state. this.saveSnapshot();
this.bcast({ type: 'err', message: this.error }); this.bcast({ type: 'err', message: this.error });
this.state.waitUntil(this.stopHeartbeat()); this.state.waitUntil(this.stopHeartbeat());
} }
@@ -364,16 +275,12 @@ export class MyDurableObject {
async startHeartbeat() { async startHeartbeat() {
if (this.hbActive || this.phase !== 'running') return; if (this.hbActive || this.phase !== 'running') return;
this.hbActive = true; this.hbActive = true;
try { try { await this.state.storage.setAlarm(Date.now() + HB_INTERVAL_MS); } catch {}
await this.state.storage.setAlarm(Date.now() + HB_INTERVAL_MS);
} catch {}
} }
async stopHeartbeat() { async stopHeartbeat() {
this.hbActive = false; this.hbActive = false;
try { try { await this.state.storage.setAlarm(null); } catch {}
await this.state.storage.setAlarm(null);
} catch {}
} }
async Heart() { async Heart() {
@@ -383,7 +290,7 @@ export class MyDurableObject {
} }
async alarm() { async alarm() {
await this.restoreIfCold(); await this.autopsy();
await this.Heart(); await this.Heart();
} }
} }