From 9b6d2a4c4821018e06f8af76a171fc9e49007b7f Mon Sep 17 00:00:00 2001 From: multipleof4 Date: Fri, 26 Sep 2025 16:37:46 -0700 Subject: [PATCH] Update index.js --- index.js | 122 ++++++++++++++++++++++++------------------------------- 1 file changed, 54 insertions(+), 68 deletions(-) diff --git a/index.js b/index.js index 97f4197..528e560 100644 --- a/index.js +++ b/index.js @@ -3,9 +3,6 @@ import OpenAI from 'openai'; const TTL_MS = 20 * 60 * 1000; const BATCH_MS = 800; const BATCH_BYTES = 3400; -// SNAPSHOT_MIN_MS is no longer needed as we removed throttled saving. - -// Heartbeat configuration: run every 4s while streaming to prevent eviction. const HB_INTERVAL_MS = 3000; const MAX_RUN_MS = 15 * 60 * 1000; @@ -40,7 +37,7 @@ export default { const uid = String(rawUid).slice(0, 64).replace(/[^a-zA-Z0-9_-]/g, '') || 'anon'; const id = env.MY_DURABLE_OBJECT.idFromName(uid); const stub = env.MY_DURABLE_OBJECT.get(id); - + const resp = await stub.fetch(req); return isWs ? resp : withCORS(resp); } @@ -83,13 +80,11 @@ export class MyDurableObject { async autopsy() { if (this.rid) return; - const snap = await this.state.storage.get('run').catch(() => null); if (!snap || (Date.now() - (snap.savedAt || 0) >= TTL_MS)) { if (snap) await this.state.storage.delete('run').catch(() => {}); return; } - this.rid = snap.rid || null; this.buffer = Array.isArray(snap.buffer) ? snap.buffer : []; this.seq = Number.isFinite(+snap.seq) ? +snap.seq : -1; @@ -97,12 +92,11 @@ export class MyDurableObject { this.phase = snap.phase || 'done'; this.error = snap.error || null; this.pending = ''; - if (this.phase === 'running') { this.phase = 'evicted'; this.error = 'The run was interrupted due to system eviction.'; - this.saveSnapshot(); // Persist the final 'evicted' state. - this.stopHeartbeat().catch(() => {}); + this.saveSnapshot(); + await this.stopHeartbeat(); } } @@ -150,7 +144,6 @@ export class MyDurableObject { const text = this.buffer.map(it => it.text).join('') + this.pending; const isTerminal = ['done', 'error', 'evicted'].includes(this.phase); const isError = ['error', 'evicted'].includes(this.phase); - return this.corsJSON({ rid: this.rid, seq: this.seq, phase: this.phase, done: isTerminal, error: isError ? (this.error || 'The run was terminated unexpectedly.') : null, text }); } return this.corsJSON({ error: 'not allowed' }, 405); @@ -161,23 +154,18 @@ export class MyDurableObject { let msg; try { msg = JSON.parse(String(evt.data || '')); } catch { return this.send(ws, { type: 'err', message: 'bad_json' }); } - if (msg.type === 'stop') { if (msg.rid && msg.rid === this.rid) this.stop(); return; } if (msg.type !== 'begin') return this.send(ws, { type: 'err', message: 'bad_type' }); - const { rid, apiKey, or_body, model, messages, after, provider } = msg; const body = or_body || (model && Array.isArray(messages) ? { model, messages, stream: true, ...msg } : null); - if (!rid || !apiKey || !body || !Array.isArray(body.messages) || body.messages.length === 0) return this.send(ws, { type: 'err', message: 'missing_fields' }); if (this.phase === 'running' && rid !== this.rid) return this.send(ws, { type: 'err', message: 'busy' }); if (rid === this.rid && this.phase !== 'idle') return this.replay(ws, Number.isFinite(+after) ? +after : -1); - this.reset(); this.rid = rid; this.phase = 'running'; this.controller = new AbortController(); await this.saveSnapshot(); - this.state.waitUntil(this.startHeartbeat()); this.state.waitUntil(this.stream({ apiKey, body, provider: provider || 'openrouter' })); } @@ -196,53 +184,6 @@ export class MyDurableObject { } } - isMultimodalMessage(m) { return m && Array.isArray(m.content) && m.content.some(p => p?.type && p.type !== 'text' && p.type !== 'input_text'); } - - extractTextFromMessage(m) { - if (!m) return ''; - if (typeof m.content === 'string') return String(m.content); - if (!Array.isArray(m.content)) return ''; - return m.content.filter(p => p && ['text', 'input_text'].includes(p.type)).map(p => String(p.type === 'text' ? (p.text ?? p.content ?? '') : (p.text ?? ''))).join(''); - } - - mapContentPartToResponses(part) { - const type = part?.type || 'text'; - if (['image_url', 'input_image'].includes(type)) return (part?.image_url?.url || part?.image_url) ? { type: 'input_image', image_url: String(part?.image_url?.url || part?.image_url) } : null; - if (['text', 'input_text'].includes(type)) return { type: 'input_text', text: String(type === 'text' ? (part.text ?? part.content ?? '') : (part.text ?? '')) }; - return { type: 'input_text', text: `[${type}:${part?.file?.filename || 'file'}]` }; - } - - buildInputForResponses(messages) { - if (!Array.isArray(messages) || messages.length === 0) return ''; - if (!messages.some(m => this.isMultimodalMessage(m))) { - if (messages.length === 1) return this.extractTextFromMessage(messages[0]); - return messages.map(m => ({ role: m.role, content: this.extractTextFromMessage(m) })); - } - return messages.map(m => ({ role: m.role, content: Array.isArray(m.content) ? m.content.map(p => this.mapContentPartToResponses(p)).filter(Boolean) : [{ type: 'input_text', text: String(m.content || '') }] })); - } - - mapToGoogleContents(messages) { - const contents = messages.reduce((acc, m) => { - const role = m.role === 'assistant' ? 'model' : 'user'; - const msgContent = Array.isArray(m.content) ? m.content : [{ type: 'text', text: String(m.content ?? '') }]; - const parts = msgContent.map(p => { - if (p.type === 'text') return { text: p.text || '' }; - if (p.type === 'image_url' && p.image_url?.url) { - const match = p.image_url.url.match(/^data:(image\/\w+);base64,(.*)$/); - if (match) return { inline_data: { mime_type: match[1], data: match[2] } }; - } - return null; - }).filter(Boolean); - - if (!parts.length) return acc; - if (acc.length > 0 && acc.at(-1).role === role) acc.at(-1).parts.push(...parts); - else acc.push({ role, parts }); - return acc; - }, []); - if (contents.at(-1)?.role !== 'user') contents.pop(); - return contents; - } - async streamOpenAI({ apiKey, body }) { const client = new OpenAI({ apiKey }); const params = { model: body.model, input: this.buildInputForResponses(body.messages || []), temperature: body.temperature, stream: true }; @@ -250,7 +191,6 @@ export class MyDurableObject { if (Number.isFinite(+body.top_p)) params.top_p = +body.top_p; if (body.reasoning?.effort) params.reasoning = { effort: body.reasoning.effort }; if (body.verbosity) params.text = { verbosity: body.verbosity }; - this.oaStream = await client.responses.stream(params); try { for await (const event of this.oaStream) { @@ -335,23 +275,69 @@ export class MyDurableObject { async startHeartbeat() { if (this.hbActive || this.phase !== 'running') return; this.hbActive = true; - try { await this.state.storage.setAlarm(Date.now() + HB_INTERVAL_MS); } catch {} + await this.state.storage.setAlarm(Date.now() + HB_INTERVAL_MS).catch(() => {}); } async stopHeartbeat() { + if (!this.hbActive) return; this.hbActive = false; - try { await this.state.storage.setAlarm(null); } catch {} + await this.state.storage.setAlarm(null).catch(() => {}); } async Heart() { - if (this.phase !== 'running' || !this.hbActive) return await this.stopHeartbeat(); + if (this.phase !== 'running' || !this.hbActive) return this.stopHeartbeat(); if (++this.age * HB_INTERVAL_MS >= MAX_RUN_MS) return this.fail('Run timed out after 15 minutes.'); - try { await this.state.storage.setAlarm(Date.now() + HB_INTERVAL_MS); } catch {} + await this.state.storage.setAlarm(Date.now() + HB_INTERVAL_MS).catch(() => {}); } async alarm() { await this.autopsy(); await this.Heart(); } -} + isMultimodalMessage(m) { return m && Array.isArray(m.content) && m.content.some(p => p?.type && p.type !== 'text' && p.type !== 'input_text'); } + + extractTextFromMessage(m) { + if (!m) return ''; + if (typeof m.content === 'string') return String(m.content); + if (!Array.isArray(m.content)) return ''; + return m.content.filter(p => p && ['text', 'input_text'].includes(p.type)).map(p => String(p.type === 'text' ? (p.text ?? p.content ?? '') : (p.text ?? ''))).join(''); + } + + mapContentPartToResponses(part) { + const type = part?.type || 'text'; + if (['image_url', 'input_image'].includes(type)) return (part?.image_url?.url || part?.image_url) ? { type: 'input_image', image_url: String(part?.image_url?.url || part?.image_url) } : null; + if (['text', 'input_text'].includes(type)) return { type: 'input_text', text: String(type === 'text' ? (part.text ?? part.content ?? '') : (part.text ?? '')) }; + return { type: 'input_text', text: `[${type}:${part?.file?.filename || 'file'}]` }; + } + + buildInputForResponses(messages) { + if (!Array.isArray(messages) || messages.length === 0) return ''; + if (!messages.some(m => this.isMultimodalMessage(m))) { + if (messages.length === 1) return this.extractTextFromMessage(messages[0]); + return messages.map(m => ({ role: m.role, content: this.extractTextFromMessage(m) })); + } + return messages.map(m => ({ role: m.role, content: Array.isArray(m.content) ? m.content.map(p => this.mapContentPartToResponses(p)).filter(Boolean) : [{ type: 'input_text', text: String(m.content || '') }] })); + } + + mapToGoogleContents(messages) { + const contents = messages.reduce((acc, m) => { + const role = m.role === 'assistant' ? 'model' : 'user'; + const msgContent = Array.isArray(m.content) ? m.content : [{ type: 'text', text: String(m.content ?? '') }]; + const parts = msgContent.map(p => { + if (p.type === 'text') return { text: p.text || '' }; + if (p.type === 'image_url' && p.image_url?.url) { + const match = p.image_url.url.match(/^data:(image\/\w+);base64,(.*)$/); + if (match) return { inline_data: { mime_type: match[1], data: match[2] } }; + } + return null; + }).filter(Boolean); + if (!parts.length) return acc; + if (acc.length > 0 && acc.at(-1).role === role) acc.at(-1).parts.push(...parts); + else acc.push({ role, parts }); + return acc; + }, []); + if (contents.at(-1)?.role !== 'user') contents.pop(); + return contents; + } +} \ No newline at end of file