Feat: Support image output in proxy streaming

This commit is contained in:
2025-12-29 04:51:47 -08:00
parent 55db6032d4
commit d8631f127e

View File

@@ -68,6 +68,7 @@ export class MyDurableObject {
this.controller = null;
this.oaStream = null;
this.pending = '';
this.pendingImages = [];
this.flushTimer = null;
this.lastSavedAt = 0;
this.lastFlushedAt = 0;
@@ -118,6 +119,7 @@ export class MyDurableObject {
this.error = snap.error || null;
this.messages = Array.isArray(snap.messages) ? snap.messages : [];
this.pending = '';
this.pendingImages = [];
if (this.phase === 'running') {
this.phase = 'evicted';
@@ -135,26 +137,33 @@ export class MyDurableObject {
}
replay(ws, after) {
this.buffer.forEach(it => { if (it.seq > after) this.send(ws, { type: 'delta', seq: it.seq, text: it.text }); });
this.buffer.forEach(it => { if (it.seq > after) this.send(ws, { type: 'delta', seq: it.seq, text: it.text, images: it.images }); });
if (this.phase === 'done') this.send(ws, { type: 'done' });
else if (['error', 'evicted'].includes(this.phase)) this.send(ws, { type: 'err', message: this.error || 'The run was terminated unexpectedly.' });
}
flush(force = false) {
if (this.flushTimer) { clearTimeout(this.flushTimer); this.flushTimer = null; }
if (this.pending) {
this.buffer.push({ seq: ++this.seq, text: this.pending });
this.bcast({ type: 'delta', seq: this.seq, text: this.pending });
if (this.pending || this.pendingImages.length > 0) {
const item = { seq: ++this.seq, text: this.pending };
if (this.pendingImages.length > 0) item.images = [...this.pendingImages];
this.buffer.push(item);
this.bcast({ type: 'delta', seq: this.seq, text: this.pending, images: item.images });
this.pending = '';
this.pendingImages = [];
this.lastFlushedAt = Date.now();
}
if (force) this.saveSnapshot();
}
queueDelta(text) {
if (!text) return;
this.pending += text;
if (this.pending.length >= BATCH_BYTES) this.flush(false);
queueDelta(text, images) {
if (!text && (!images || !images.length)) return;
if (text) this.pending += text;
if (images) this.pendingImages.push(...images);
if (this.pending.length >= BATCH_BYTES || this.pendingImages.length > 0) this.flush(false);
else if (!this.flushTimer) this.flushTimer = setTimeout(() => this.flush(false), BATCH_MS);
}
@@ -173,9 +182,10 @@ export class MyDurableObject {
if (req.method === 'GET') {
await this.autopsy();
const text = this.buffer.map(it => it.text).join('') + this.pending;
const images = [...this.buffer.flatMap(it => it.images || []), ...this.pendingImages];
const isTerminal = ['done', 'error', 'evicted'].includes(this.phase);
const isError = ['error', 'evicted'].includes(this.phase);
const payload = { rid: this.rid, seq: this.seq, phase: this.phase, done: isTerminal, error: isError ? (this.error || 'The run was terminated unexpectedly.') : null, text };
const payload = { rid: this.rid, seq: this.seq, phase: this.phase, done: isTerminal, error: isError ? (this.error || 'The run was terminated unexpectedly.') : null, text, images };
return this.corsJSON(payload);
}
return this.corsJSON({ error: 'not allowed' }, 405);
@@ -326,6 +336,8 @@ export class MyDurableObject {
for await (const chunk of stream) {
if (this.phase !== 'running') break;
const delta = chunk?.choices?.[0]?.delta;
const images = delta?.images;
if (delta?.reasoning && body.reasoning?.exclude !== true) {
this.queueDelta(delta.reasoning);
hasReasoning = true;
@@ -335,6 +347,9 @@ export class MyDurableObject {
this.queueDelta(delta.content);
hasContent = true;
}
if (images) {
this.queueDelta('', images);
}
}
}
@@ -434,5 +449,3 @@ export class MyDurableObject {
return contents;
}
}