diff --git a/index.js b/index.js index 4447b49..21157e5 100644 --- a/index.js +++ b/index.js @@ -27,16 +27,12 @@ export default { const url = new URL(req.url); const method = req.method.toUpperCase(); - if (method === 'OPTIONS') { - return new Response(null, { status: 204, headers: CORS_HEADERS }); - } + if (method === 'OPTIONS') return new Response(null, { status: 204, headers: CORS_HEADERS }); if (url.pathname === '/ws') { const isGet = method === 'GET'; const isWs = req.headers.get('Upgrade') === 'websocket'; - if (!isGet && !isWs) { - return withCORS(new Response('method not allowed', { status: 405 })); - } + if (!isGet && !isWs) return withCORS(new Response('method not allowed', { status: 405 })); const rawUid = url.searchParams.get('uid') || 'anon'; const uid = String(rawUid).slice(0, 64).replace(/[^a-zA-Z0-9_-]/g, '') || 'anon'; @@ -76,22 +72,14 @@ export class MyDurableObject { } corsJSON(obj, status = 200) { - return new Response(JSON.stringify(obj), { - status, - headers: { 'Content-Type': 'application/json', 'Cache-Control': 'no-store', ...CORS_HEADERS } - }); + return new Response(JSON.stringify(obj), { status, headers: { 'Content-Type': 'application/json', 'Cache-Control': 'no-store', ...CORS_HEADERS } }); } - send(ws, obj) { - try { ws.send(JSON.stringify(obj)); } catch {} - } + send(ws, obj) { try { ws.send(JSON.stringify(obj)); } catch {} } - bcast(obj) { - const message = JSON.stringify(obj); - this.sockets.forEach(ws => { try { ws.send(message); } catch {} }); - } + bcast(obj) { this.sockets.forEach(ws => this.send(ws, obj)); } - async restoreIfCold() { + async autopsy() { if (this.rid) return; const snap = await this.state.storage.get('run').catch(() => null); @@ -118,59 +106,35 @@ export class MyDurableObject { saveSnapshot() { this.lastSavedAt = Date.now(); - const data = { - rid: this.rid, - buffer: this.buffer, - seq: this.seq, - age: this.age, - phase: this.phase, - error: this.error, - savedAt: this.lastSavedAt, - }; - this.state.storage.put('run', data).catch(() => {}); + this.state.storage.put('run', { rid: this.rid, buffer: this.buffer, seq: this.seq, age: this.age, phase: this.phase, error: this.error, savedAt: this.lastSavedAt }).catch(() => {}); } replay(ws, after) { - this.buffer.forEach(it => { - if (it.seq > after) this.send(ws, { type: 'delta', seq: it.seq, text: it.text }); - }); - - if (this.phase === 'done') { - this.send(ws, { type: 'done' }); - } else if (this.phase === 'error' || this.phase === 'evicted') { - this.send(ws, { type: 'err', message: this.error || 'The run was terminated unexpectedly.' }); - } + this.buffer.forEach(it => { if (it.seq > after) this.send(ws, { type: 'delta', seq: it.seq, text: it.text }); }); + if (this.phase === 'done') this.send(ws, { type: 'done' }); + else if (['error', 'evicted'].includes(this.phase)) this.send(ws, { type: 'err', message: this.error || 'The run was terminated unexpectedly.' }); } flush(force = false) { - if (this.flushTimer) { - clearTimeout(this.flushTimer); - this.flushTimer = null; - } + if (this.flushTimer) { clearTimeout(this.flushTimer); this.flushTimer = null; } if (this.pending) { this.buffer.push({ seq: ++this.seq, text: this.pending }); this.bcast({ type: 'delta', seq: this.seq, text: this.pending }); this.pending = ''; this.lastFlushedAt = Date.now(); } - // Only save the snapshot if explicitly forced (i.e., at the end of a run). if (force) this.saveSnapshot(); } queueDelta(text) { if (!text) return; this.pending += text; - if (this.pending.length >= BATCH_BYTES) { - this.flush(false); - } else if (!this.flushTimer) { - this.flushTimer = setTimeout(() => this.flush(false), BATCH_MS); - } + if (this.pending.length >= BATCH_BYTES) this.flush(false); + else if (!this.flushTimer) this.flushTimer = setTimeout(() => this.flush(false), BATCH_MS); } async fetch(req) { - if (req.method === 'OPTIONS') { - return new Response(null, { status: 204, headers: CORS_HEADERS }); - } + if (req.method === 'OPTIONS') return new Response(null, { status: 204, headers: CORS_HEADERS }); if (req.headers.get('Upgrade') === 'websocket') { const [client, server] = Object.values(new WebSocketPair()); server.accept(); @@ -180,57 +144,37 @@ export class MyDurableObject { return new Response(null, { status: 101, webSocket: client }); } if (req.method === 'GET') { - await this.restoreIfCold(); + await this.autopsy(); const text = this.buffer.map(it => it.text).join('') + this.pending; const isTerminal = ['done', 'error', 'evicted'].includes(this.phase); const isError = ['error', 'evicted'].includes(this.phase); - return this.corsJSON({ - rid: this.rid, - seq: this.seq, - phase: this.phase, - done: isTerminal, - error: isError ? (this.error || 'The run was terminated unexpectedly.') : null, - text, - }); + return this.corsJSON({ rid: this.rid, seq: this.seq, phase: this.phase, done: isTerminal, error: isError ? (this.error || 'The run was terminated unexpectedly.') : null, text }); } return this.corsJSON({ error: 'not allowed' }, 405); } async onMessage(ws, evt) { - await this.restoreIfCold(); + await this.autopsy(); let msg; - try { - msg = JSON.parse(String(evt.data || '')); - } catch { - return this.send(ws, { type: 'err', message: 'bad_json' }); - } - - if (msg.type === 'stop') { - if (msg.rid && msg.rid === this.rid) this.stop(); - return; - } + try { msg = JSON.parse(String(evt.data || '')); } + catch { return this.send(ws, { type: 'err', message: 'bad_json' }); } + if (msg.type === 'stop') { if (msg.rid && msg.rid === this.rid) this.stop(); return; } if (msg.type !== 'begin') return this.send(ws, { type: 'err', message: 'bad_type' }); const { rid, apiKey, or_body, model, messages, after, provider } = msg; const body = or_body || (model && Array.isArray(messages) ? { model, messages, stream: true, ...msg } : null); - if (!rid || !apiKey || !body || !Array.isArray(body.messages) || body.messages.length === 0) { - return this.send(ws, { type: 'err', message: 'missing_fields' }); - } - if (this.phase === 'running' && rid !== this.rid) { - return this.send(ws, { type: 'err', message: 'busy' }); - } - if (rid === this.rid && this.phase !== 'idle') { - return this.replay(ws, Number.isFinite(+after) ? +after : -1); - } + if (!rid || !apiKey || !body || !Array.isArray(body.messages) || body.messages.length === 0) return this.send(ws, { type: 'err', message: 'missing_fields' }); + if (this.phase === 'running' && rid !== this.rid) return this.send(ws, { type: 'err', message: 'busy' }); + if (rid === this.rid && this.phase !== 'idle') return this.replay(ws, Number.isFinite(+after) ? +after : -1); this.reset(); this.rid = rid; this.phase = 'running'; this.controller = new AbortController(); - this.saveSnapshot(); // Save the initial state of the run. + this.saveSnapshot(); this.state.waitUntil(this.startHeartbeat()); this.state.waitUntil(this.stream({ apiKey, body, provider: provider || 'openrouter' })); @@ -238,75 +182,45 @@ export class MyDurableObject { async stream({ apiKey, body, provider }) { try { - if (provider === 'openai') { - await this.streamOpenAI({ apiKey, body }); - } else { - await this.streamOpenRouter({ apiKey, body }); - } + if (provider === 'openai') await this.streamOpenAI({ apiKey, body }); + else await this.streamOpenRouter({ apiKey, body }); if (this.phase === 'running') this.stop(); } catch (e) { if (this.phase === 'running') { const msg = String(e?.message || 'stream_failed'); - if (!((e && e.name === 'AbortError') || /abort/i.test(msg))) { - this.fail(msg); - } + if (!((e && e.name === 'AbortError') || /abort/i.test(msg))) this.fail(msg); } } } - isMultimodalMessage(m) { - if (!m || !Array.isArray(m.content)) return false; - return m.content.some(p => p?.type && p.type !== 'text' && p.type !== 'input_text'); - } + isMultimodalMessage(m) { return m && Array.isArray(m.content) && m.content.some(p => p?.type && p.type !== 'text' && p.type !== 'input_text'); } extractTextFromMessage(m) { if (!m) return ''; if (typeof m.content === 'string') return String(m.content); if (!Array.isArray(m.content)) return ''; - return m.content - .filter(p => p && (p.type === 'text' || p.type === 'input_text')) - .map(p => String(p.type === 'text' ? (p.text ?? p.content ?? '') : (p.text ?? ''))) - .join(''); + return m.content.filter(p => p && ['text', 'input_text'].includes(p.type)).map(p => String(p.type === 'text' ? (p.text ?? p.content ?? '') : (p.text ?? ''))).join(''); } mapContentPartToResponses(part) { const type = part?.type || 'text'; - if (type === 'image_url' || type === 'input_image') { - const url = part?.image_url?.url || part?.image_url || ''; - return url ? { type: 'input_image', image_url: String(url) } : null; - } - if (type === 'text' || type === 'input_text') { - const text = type === 'text' ? (part.text ?? part.content ?? '') : (part.text ?? ''); - return { type: 'input_text', text: String(text) }; - } + if (['image_url', 'input_image'].includes(type)) return (part?.image_url?.url || part?.image_url) ? { type: 'input_image', image_url: String(part?.image_url?.url || part?.image_url) } : null; + if (['text', 'input_text'].includes(type)) return { type: 'input_text', text: String(type === 'text' ? (part.text ?? part.content ?? '') : (part.text ?? '')) }; return { type: 'input_text', text: `[${type}:${part?.file?.filename || 'file'}]` }; } buildInputForResponses(messages) { if (!Array.isArray(messages) || messages.length === 0) return ''; - const isMulti = messages.some(m => this.isMultimodalMessage(m)); - - if (!isMulti) { + if (!messages.some(m => this.isMultimodalMessage(m))) { if (messages.length === 1) return this.extractTextFromMessage(messages[0]); return messages.map(m => ({ role: m.role, content: this.extractTextFromMessage(m) })); } - return messages.map(m => { - let content = [{ type: 'input_text', text: String(m.content || '') }]; - if (Array.isArray(m.content)) { - content = m.content.map(p => this.mapContentPartToResponses(p)).filter(Boolean); - } - return { role: m.role, content }; - }); + return messages.map(m => ({ role: m.role, content: Array.isArray(m.content) ? m.content.map(p => this.mapContentPartToResponses(p)).filter(Boolean) : [{ type: 'input_text', text: String(m.content || '') }] })); } async streamOpenAI({ apiKey, body }) { const client = new OpenAI({ apiKey }); - const params = { - model: body.model, - input: this.buildInputForResponses(body.messages || []), - temperature: body.temperature, - stream: true, - }; + const params = { model: body.model, input: this.buildInputForResponses(body.messages || []), temperature: body.temperature, stream: true }; if (Number.isFinite(+body.max_tokens) && +body.max_tokens > 0) params.max_output_tokens = +body.max_tokens; if (Number.isFinite(+body.top_p)) params.top_p = +body.top_p; if (body.reasoning?.effort) params.reasoning = { effort: body.reasoning.effort }; @@ -326,10 +240,7 @@ export class MyDurableObject { async streamOpenRouter({ apiKey, body }) { const client = new OpenAI({ apiKey, baseURL: 'https://openrouter.ai/api/v1' }); - const stream = await client.chat.completions.create( - { ...body, stream: true }, - { signal: this.controller.signal } - ); + const stream = await client.chat.completions.create({ ...body, stream: true }, { signal: this.controller.signal }); for await (const chunk of stream) { if (this.phase !== 'running') break; const delta = chunk?.choices?.[0]?.delta?.content ?? ''; @@ -339,24 +250,24 @@ export class MyDurableObject { stop() { if (this.phase !== 'running') return; - this.flush(true); // Flush and force a final save. + this.flush(true); this.phase = 'done'; this.error = null; try { this.controller?.abort(); } catch {} try { this.oaStream?.controller?.abort(); } catch {} - this.saveSnapshot(); // Explicitly save final 'done' state. + this.saveSnapshot(); this.bcast({ type: 'done' }); this.state.waitUntil(this.stopHeartbeat()); } fail(message) { if (this.phase === 'error') return; - this.flush(true); // Flush and force a final save. + this.flush(true); this.phase = 'error'; this.error = String(message || 'stream_failed'); try { this.controller?.abort(); } catch {} try { this.oaStream?.controller?.abort(); } catch {} - this.saveSnapshot(); // Explicitly save final 'error' state. + this.saveSnapshot(); this.bcast({ type: 'err', message: this.error }); this.state.waitUntil(this.stopHeartbeat()); } @@ -364,16 +275,12 @@ export class MyDurableObject { async startHeartbeat() { if (this.hbActive || this.phase !== 'running') return; this.hbActive = true; - try { - await this.state.storage.setAlarm(Date.now() + HB_INTERVAL_MS); - } catch {} + try { await this.state.storage.setAlarm(Date.now() + HB_INTERVAL_MS); } catch {} } async stopHeartbeat() { this.hbActive = false; - try { - await this.state.storage.setAlarm(null); - } catch {} + try { await this.state.storage.setAlarm(null); } catch {} } async Heart() { @@ -383,7 +290,7 @@ export class MyDurableObject { } async alarm() { - await this.restoreIfCold(); + await this.autopsy(); await this.Heart(); } }