From d8631f127ea9a590532c7e30ec0bb7ffa53be10e Mon Sep 17 00:00:00 2001 From: multipleof4 Date: Mon, 29 Dec 2025 04:51:47 -0800 Subject: [PATCH] Feat: Support image output in proxy streaming --- index.js | 35 ++++++++++++++++++++++++----------- 1 file changed, 24 insertions(+), 11 deletions(-) diff --git a/index.js b/index.js index 1e41e2a..3035dab 100644 --- a/index.js +++ b/index.js @@ -68,6 +68,7 @@ export class MyDurableObject { this.controller = null; this.oaStream = null; this.pending = ''; + this.pendingImages = []; this.flushTimer = null; this.lastSavedAt = 0; this.lastFlushedAt = 0; @@ -118,6 +119,7 @@ export class MyDurableObject { this.error = snap.error || null; this.messages = Array.isArray(snap.messages) ? snap.messages : []; this.pending = ''; + this.pendingImages = []; if (this.phase === 'running') { this.phase = 'evicted'; @@ -135,26 +137,33 @@ export class MyDurableObject { } replay(ws, after) { - this.buffer.forEach(it => { if (it.seq > after) this.send(ws, { type: 'delta', seq: it.seq, text: it.text }); }); + this.buffer.forEach(it => { if (it.seq > after) this.send(ws, { type: 'delta', seq: it.seq, text: it.text, images: it.images }); }); if (this.phase === 'done') this.send(ws, { type: 'done' }); else if (['error', 'evicted'].includes(this.phase)) this.send(ws, { type: 'err', message: this.error || 'The run was terminated unexpectedly.' }); } flush(force = false) { if (this.flushTimer) { clearTimeout(this.flushTimer); this.flushTimer = null; } - if (this.pending) { - this.buffer.push({ seq: ++this.seq, text: this.pending }); - this.bcast({ type: 'delta', seq: this.seq, text: this.pending }); + if (this.pending || this.pendingImages.length > 0) { + const item = { seq: ++this.seq, text: this.pending }; + if (this.pendingImages.length > 0) item.images = [...this.pendingImages]; + + this.buffer.push(item); + this.bcast({ type: 'delta', seq: this.seq, text: this.pending, images: item.images }); + this.pending = ''; + this.pendingImages = []; this.lastFlushedAt = Date.now(); } if (force) this.saveSnapshot(); } - queueDelta(text) { - if (!text) return; - this.pending += text; - if (this.pending.length >= BATCH_BYTES) this.flush(false); + queueDelta(text, images) { + if (!text && (!images || !images.length)) return; + if (text) this.pending += text; + if (images) this.pendingImages.push(...images); + + if (this.pending.length >= BATCH_BYTES || this.pendingImages.length > 0) this.flush(false); else if (!this.flushTimer) this.flushTimer = setTimeout(() => this.flush(false), BATCH_MS); } @@ -173,9 +182,10 @@ export class MyDurableObject { if (req.method === 'GET') { await this.autopsy(); const text = this.buffer.map(it => it.text).join('') + this.pending; + const images = [...this.buffer.flatMap(it => it.images || []), ...this.pendingImages]; const isTerminal = ['done', 'error', 'evicted'].includes(this.phase); const isError = ['error', 'evicted'].includes(this.phase); - const payload = { rid: this.rid, seq: this.seq, phase: this.phase, done: isTerminal, error: isError ? (this.error || 'The run was terminated unexpectedly.') : null, text }; + const payload = { rid: this.rid, seq: this.seq, phase: this.phase, done: isTerminal, error: isError ? (this.error || 'The run was terminated unexpectedly.') : null, text, images }; return this.corsJSON(payload); } return this.corsJSON({ error: 'not allowed' }, 405); @@ -326,6 +336,8 @@ export class MyDurableObject { for await (const chunk of stream) { if (this.phase !== 'running') break; const delta = chunk?.choices?.[0]?.delta; + const images = delta?.images; + if (delta?.reasoning && body.reasoning?.exclude !== true) { this.queueDelta(delta.reasoning); hasReasoning = true; @@ -335,6 +347,9 @@ export class MyDurableObject { this.queueDelta(delta.content); hasContent = true; } + if (images) { + this.queueDelta('', images); + } } } @@ -434,5 +449,3 @@ export class MyDurableObject { return contents; } } - -