Feat: Add image generation streaming support to ORP

This commit is contained in:
2025-11-15 19:27:57 -08:00
parent 5f437b7bcf
commit f385ab34c4

View File

@@ -66,7 +66,7 @@ export class MyDurableObject {
this.error = null; this.error = null;
this.controller = null; this.controller = null;
this.oaStream = null; this.oaStream = null;
this.pending = ''; this.pending = { text: '', images: [] };
this.flushTimer = null; this.flushTimer = null;
this.lastSavedAt = 0; this.lastSavedAt = 0;
this.lastFlushedAt = 0; this.lastFlushedAt = 0;
@@ -85,7 +85,13 @@ export class MyDurableObject {
getConversationText() { getConversationText() {
const prompt = (this.messages || []).map(m => `## ${m.role}\n\n${this.extractTextFromMessage(m)}`).join('\n\n---\n\n'); const prompt = (this.messages || []).map(m => `## ${m.role}\n\n${this.extractTextFromMessage(m)}`).join('\n\n---\n\n');
const response = this.buffer.map(it => it.text).join(''); const response = this.buffer.map(it => {
let content = it.text || '';
if (it.images && it.images.length > 0) {
content += '\n' + it.images.map(img => `![generated image](${String(img?.image_url?.url || '').substring(0, 60)}...)`).join('\n');
}
return content;
}).join('');
if (!prompt && !response) return ''; if (!prompt && !response) return '';
return `${prompt}\n\n---\n\n## assistant\n\n${response}`; return `${prompt}\n\n---\n\n## assistant\n\n${response}`;
} }
@@ -124,7 +130,7 @@ export class MyDurableObject {
this.phase = snap.phase || 'done'; this.phase = snap.phase || 'done';
this.error = snap.error || null; this.error = snap.error || null;
this.messages = Array.isArray(snap.messages) ? snap.messages : []; this.messages = Array.isArray(snap.messages) ? snap.messages : [];
this.pending = ''; this.pending = { text: '', images: [] };
if (this.phase === 'running') { if (this.phase === 'running') {
this.phase = 'evicted'; this.phase = 'evicted';
@@ -142,27 +148,35 @@ export class MyDurableObject {
} }
replay(ws, after) { replay(ws, after) {
this.buffer.forEach(it => { if (it.seq > after) this.send(ws, { type: 'delta', seq: it.seq, text: it.text }); }); this.buffer.forEach(it => { if (it.seq > after) this.send(ws, { type: 'delta', seq: it.seq, text: it.text, images: it.images }); });
if (this.phase === 'done') this.send(ws, { type: 'done' }); if (this.phase === 'done') this.send(ws, { type: 'done' });
else if (['error', 'evicted'].includes(this.phase)) this.send(ws, { type: 'err', message: this.error || 'The run was terminated unexpectedly.' }); else if (['error', 'evicted'].includes(this.phase)) this.send(ws, { type: 'err', message: this.error || 'The run was terminated unexpectedly.' });
} }
flush(force = false) { flush(force = false) {
if (this.flushTimer) { clearTimeout(this.flushTimer); this.flushTimer = null; } if (this.flushTimer) { clearTimeout(this.flushTimer); this.flushTimer = null; }
if (this.pending) { if (this.pending.text || (this.pending.images && this.pending.images.length > 0)) {
this.buffer.push({ seq: ++this.seq, text: this.pending }); this.buffer.push({ seq: ++this.seq, ...this.pending });
this.bcast({ type: 'delta', seq: this.seq, text: this.pending }); this.bcast({ type: 'delta', seq: this.seq, ...this.pending });
this.pending = ''; this.pending = { text: '', images: [] };
this.lastFlushedAt = Date.now(); this.lastFlushedAt = Date.now();
} }
if (force) this.saveSnapshot(); if (force) this.saveSnapshot();
} }
queueDelta(text) { queueUpdate({ text, images }) {
if (!text) return; const hasImages = images && images.length > 0;
this.pending += text; if (text) this.pending.text += text;
if (this.pending.length >= BATCH_BYTES) this.flush(false); if (hasImages) {
else if (!this.flushTimer) this.flushTimer = setTimeout(() => this.flush(false), BATCH_MS); if (!this.pending.images) this.pending.images = [];
this.pending.images.push(...images);
}
if (hasImages || (this.pending.text && this.pending.text.length >= BATCH_BYTES)) {
this.flush(false);
} else if (!this.flushTimer && this.pending.text) {
this.flushTimer = setTimeout(() => this.flush(false), BATCH_MS);
}
} }
async fetch(req) { async fetch(req) {
@@ -179,7 +193,7 @@ export class MyDurableObject {
if (req.method === 'GET') { if (req.method === 'GET') {
await this.autopsy(); await this.autopsy();
const text = this.buffer.map(it => it.text).join('') + this.pending; const text = this.buffer.map(it => it.text).join('') + (this.pending.text || '');
const isTerminal = ['done', 'error', 'evicted'].includes(this.phase); const isTerminal = ['done', 'error', 'evicted'].includes(this.phase);
const isError = ['error', 'evicted'].includes(this.phase); const isError = ['error', 'evicted'].includes(this.phase);
const payload = { rid: this.rid, seq: this.seq, phase: this.phase, done: isTerminal, error: isError ? (this.error || 'The run was terminated unexpectedly.') : null, text }; const payload = { rid: this.rid, seq: this.seq, phase: this.phase, done: isTerminal, error: isError ? (this.error || 'The run was terminated unexpectedly.') : null, text };
@@ -242,7 +256,7 @@ export class MyDurableObject {
try { try {
for await (const event of this.oaStream) { for await (const event of this.oaStream) {
if (this.phase !== 'running') break; if (this.phase !== 'running') break;
if (event.type.endsWith('.delta') && event.delta) this.queueDelta(event.delta); if (event.type.endsWith('.delta') && event.delta) this.queueUpdate({ text: event.delta });
} }
} finally { } finally {
try { this.oaStream?.controller?.abort(); } catch {} try { this.oaStream?.controller?.abort(); } catch {}
@@ -276,7 +290,7 @@ export class MyDurableObject {
if (body.reasoning?.enabled) payload.extended_thinking = { enabled: true, ...(body.reasoning.budget && { max_thinking_tokens: body.reasoning.budget }) }; if (body.reasoning?.enabled) payload.extended_thinking = { enabled: true, ...(body.reasoning.budget && { max_thinking_tokens: body.reasoning.budget }) };
const stream = client.messages.stream(payload); const stream = client.messages.stream(payload);
stream.on('text', text => { if (this.phase === 'running') this.queueDelta(text); }); stream.on('text', text => { if (this.phase === 'running') this.queueUpdate({ text }); });
await stream.finalMessage(); await stream.finalMessage();
} }
@@ -311,12 +325,12 @@ export class MyDurableObject {
try { try {
JSON.parse(line.substring(6))?.candidates?.[0]?.content?.parts?.forEach(p => { JSON.parse(line.substring(6))?.candidates?.[0]?.content?.parts?.forEach(p => {
if (p.thought?.thought) { if (p.thought?.thought) {
this.queueDelta(p.thought.thought); this.queueUpdate({ text: p.thought.thought });
hasReasoning = true; hasReasoning = true;
} }
if (p.text) { if (p.text) {
if (hasReasoning && !hasContent) this.queueDelta('\n'); if (hasReasoning && !hasContent) this.queueUpdate({ text: '\n' });
this.queueDelta(p.text); this.queueUpdate({ text: p.text });
hasContent = true; hasContent = true;
} }
}); });
@@ -333,13 +347,26 @@ export class MyDurableObject {
for await (const chunk of stream) { for await (const chunk of stream) {
if (this.phase !== 'running') break; if (this.phase !== 'running') break;
const delta = chunk?.choices?.[0]?.delta; const delta = chunk?.choices?.[0]?.delta;
if (delta?.reasoning && body.reasoning?.exclude !== true) { if (!delta) continue;
this.queueDelta(delta.reasoning);
if (delta.reasoning && body.reasoning?.exclude !== true) {
this.queueUpdate({ text: delta.reasoning });
hasReasoning = true; hasReasoning = true;
} }
if (delta?.content) {
if (hasReasoning && !hasContent) this.queueDelta('\n'); const hasNewContent = delta.content || (delta.images && delta.images.length > 0);
this.queueDelta(delta.content); if (hasNewContent) {
const update = {};
if (hasReasoning && !hasContent) {
update.text = '\n';
}
if (delta.content) {
update.text = (update.text || '') + delta.content;
}
if (delta.images) {
update.images = delta.images;
}
this.queueUpdate(update);
hasContent = true; hasContent = true;
} }
} }
@@ -407,7 +434,7 @@ export class MyDurableObject {
mapContentPartToResponses(part) { mapContentPartToResponses(part) {
const type = part?.type || 'text'; const type = part?.type || 'text';
if (['image_url', 'input_image'].includes(type)) return (part?.image_url?.url || part?.image_url) ? { type: 'input_image', image_url: String(part?.image_url?.url || part?.image_url) } : null; if (['image_url', 'input_image'].includes(type)) return (part?.image_url?.url || part?.image_url) ? { type: 'input_image', image_url: String(part?.image_url?.url || part?.image_url) } : null;
if (['text', 'input_text'].includes(type)) return { type: 'input_text', text: String(type === 'text' ? (part.text ?? part.content ?? '') : (part.text ?? '')) }; if (['text', 'input_text'].includes(type)) return { type: 'input_text', text: String(type === 'text' ? (p.text ?? p.content ?? '') : (part.text ?? '')) };
return { type: 'input_text', text: `[${type}:${part?.file?.filename || 'file'}]` }; return { type: 'input_text', text: `[${type}:${part?.file?.filename || 'file'}]` };
} }
@@ -441,4 +468,3 @@ export class MyDurableObject {
return contents; return contents;
} }
} }