diff --git a/index.js b/index.js index 527b61a..ecdc348 100644 --- a/index.js +++ b/index.js @@ -66,7 +66,7 @@ export class MyDurableObject { this.error = null; this.controller = null; this.oaStream = null; - this.pending = ''; + this.pending = { text: '', images: [] }; this.flushTimer = null; this.lastSavedAt = 0; this.lastFlushedAt = 0; @@ -85,7 +85,13 @@ export class MyDurableObject { getConversationText() { const prompt = (this.messages || []).map(m => `## ${m.role}\n\n${this.extractTextFromMessage(m)}`).join('\n\n---\n\n'); - const response = this.buffer.map(it => it.text).join(''); + const response = this.buffer.map(it => { + let content = it.text || ''; + if (it.images && it.images.length > 0) { + content += '\n' + it.images.map(img => `![generated image](${String(img?.image_url?.url || '').substring(0, 60)}...)`).join('\n'); + } + return content; + }).join(''); if (!prompt && !response) return ''; return `${prompt}\n\n---\n\n## assistant\n\n${response}`; } @@ -124,7 +130,7 @@ export class MyDurableObject { this.phase = snap.phase || 'done'; this.error = snap.error || null; this.messages = Array.isArray(snap.messages) ? snap.messages : []; - this.pending = ''; + this.pending = { text: '', images: [] }; if (this.phase === 'running') { this.phase = 'evicted'; @@ -142,27 +148,35 @@ export class MyDurableObject { } replay(ws, after) { - this.buffer.forEach(it => { if (it.seq > after) this.send(ws, { type: 'delta', seq: it.seq, text: it.text }); }); + this.buffer.forEach(it => { if (it.seq > after) this.send(ws, { type: 'delta', seq: it.seq, text: it.text, images: it.images }); }); if (this.phase === 'done') this.send(ws, { type: 'done' }); else if (['error', 'evicted'].includes(this.phase)) this.send(ws, { type: 'err', message: this.error || 'The run was terminated unexpectedly.' }); } flush(force = false) { if (this.flushTimer) { clearTimeout(this.flushTimer); this.flushTimer = null; } - if (this.pending) { - this.buffer.push({ seq: ++this.seq, text: this.pending }); - this.bcast({ type: 'delta', seq: this.seq, text: this.pending }); - this.pending = ''; + if (this.pending.text || (this.pending.images && this.pending.images.length > 0)) { + this.buffer.push({ seq: ++this.seq, ...this.pending }); + this.bcast({ type: 'delta', seq: this.seq, ...this.pending }); + this.pending = { text: '', images: [] }; this.lastFlushedAt = Date.now(); } if (force) this.saveSnapshot(); } - queueDelta(text) { - if (!text) return; - this.pending += text; - if (this.pending.length >= BATCH_BYTES) this.flush(false); - else if (!this.flushTimer) this.flushTimer = setTimeout(() => this.flush(false), BATCH_MS); + queueUpdate({ text, images }) { + const hasImages = images && images.length > 0; + if (text) this.pending.text += text; + if (hasImages) { + if (!this.pending.images) this.pending.images = []; + this.pending.images.push(...images); + } + + if (hasImages || (this.pending.text && this.pending.text.length >= BATCH_BYTES)) { + this.flush(false); + } else if (!this.flushTimer && this.pending.text) { + this.flushTimer = setTimeout(() => this.flush(false), BATCH_MS); + } } async fetch(req) { @@ -179,7 +193,7 @@ export class MyDurableObject { if (req.method === 'GET') { await this.autopsy(); - const text = this.buffer.map(it => it.text).join('') + this.pending; + const text = this.buffer.map(it => it.text).join('') + (this.pending.text || ''); const isTerminal = ['done', 'error', 'evicted'].includes(this.phase); const isError = ['error', 'evicted'].includes(this.phase); const payload = { rid: this.rid, seq: this.seq, phase: this.phase, done: isTerminal, error: isError ? (this.error || 'The run was terminated unexpectedly.') : null, text }; @@ -242,7 +256,7 @@ export class MyDurableObject { try { for await (const event of this.oaStream) { if (this.phase !== 'running') break; - if (event.type.endsWith('.delta') && event.delta) this.queueDelta(event.delta); + if (event.type.endsWith('.delta') && event.delta) this.queueUpdate({ text: event.delta }); } } finally { try { this.oaStream?.controller?.abort(); } catch {} @@ -276,7 +290,7 @@ export class MyDurableObject { if (body.reasoning?.enabled) payload.extended_thinking = { enabled: true, ...(body.reasoning.budget && { max_thinking_tokens: body.reasoning.budget }) }; const stream = client.messages.stream(payload); - stream.on('text', text => { if (this.phase === 'running') this.queueDelta(text); }); + stream.on('text', text => { if (this.phase === 'running') this.queueUpdate({ text }); }); await stream.finalMessage(); } @@ -311,12 +325,12 @@ export class MyDurableObject { try { JSON.parse(line.substring(6))?.candidates?.[0]?.content?.parts?.forEach(p => { if (p.thought?.thought) { - this.queueDelta(p.thought.thought); + this.queueUpdate({ text: p.thought.thought }); hasReasoning = true; } if (p.text) { - if (hasReasoning && !hasContent) this.queueDelta('\n'); - this.queueDelta(p.text); + if (hasReasoning && !hasContent) this.queueUpdate({ text: '\n' }); + this.queueUpdate({ text: p.text }); hasContent = true; } }); @@ -333,13 +347,26 @@ export class MyDurableObject { for await (const chunk of stream) { if (this.phase !== 'running') break; const delta = chunk?.choices?.[0]?.delta; - if (delta?.reasoning && body.reasoning?.exclude !== true) { - this.queueDelta(delta.reasoning); + if (!delta) continue; + + if (delta.reasoning && body.reasoning?.exclude !== true) { + this.queueUpdate({ text: delta.reasoning }); hasReasoning = true; } - if (delta?.content) { - if (hasReasoning && !hasContent) this.queueDelta('\n'); - this.queueDelta(delta.content); + + const hasNewContent = delta.content || (delta.images && delta.images.length > 0); + if (hasNewContent) { + const update = {}; + if (hasReasoning && !hasContent) { + update.text = '\n'; + } + if (delta.content) { + update.text = (update.text || '') + delta.content; + } + if (delta.images) { + update.images = delta.images; + } + this.queueUpdate(update); hasContent = true; } } @@ -407,7 +434,7 @@ export class MyDurableObject { mapContentPartToResponses(part) { const type = part?.type || 'text'; if (['image_url', 'input_image'].includes(type)) return (part?.image_url?.url || part?.image_url) ? { type: 'input_image', image_url: String(part?.image_url?.url || part?.image_url) } : null; - if (['text', 'input_text'].includes(type)) return { type: 'input_text', text: String(type === 'text' ? (part.text ?? part.content ?? '') : (part.text ?? '')) }; + if (['text', 'input_text'].includes(type)) return { type: 'input_text', text: String(type === 'text' ? (p.text ?? p.content ?? '') : (part.text ?? '')) }; return { type: 'input_text', text: `[${type}:${part?.file?.filename || 'file'}]` }; } @@ -441,4 +468,3 @@ export class MyDurableObject { return contents; } } -