Feat: Implement streaming of thoughts for Google & OR

This commit is contained in:
2025-09-12 20:22:50 -07:00
parent 046dde720a
commit fb6a5daa97

View File

@@ -265,6 +265,7 @@ export class MyDurableObject {
async streamGoogle({ apiKey, body }) { async streamGoogle({ apiKey, body }) {
const generationConfig = Object.entries({ temperature: body.temperature, topP: body.top_p, maxOutputTokens: body.max_tokens }).reduce((acc, [k, v]) => (Number.isFinite(+v) && +v >= 0 ? { ...acc, [k]: +v } : acc), {}); const generationConfig = Object.entries({ temperature: body.temperature, topP: body.top_p, maxOutputTokens: body.max_tokens }).reduce((acc, [k, v]) => (Number.isFinite(+v) && +v >= 0 ? { ...acc, [k]: +v } : acc), {});
if (body.reasoning?.exclude !== true) generationConfig.thinkingConfig = { includeThoughts: true };
const model = (body.model ?? '').replace(/:online$/, ''); const model = (body.model ?? '').replace(/:online$/, '');
const payload = { contents: this.mapToGoogleContents(body.messages), ...(Object.keys(generationConfig).length && { generationConfig }), ...((body.model ?? '').endsWith(':online') && { tools: [{ google_search: {} }] }) }; const payload = { contents: this.mapToGoogleContents(body.messages), ...(Object.keys(generationConfig).length && { generationConfig }), ...((body.model ?? '').endsWith(':online') && { tools: [{ google_search: {} }] }) };
const resp = await fetch(`https://generativelanguage.googleapis.com/v1beta/models/${model}:streamGenerateContent?alt=sse`, { method: 'POST', headers: { 'Content-Type': 'application/json', 'x-goog-api-key': apiKey }, body: JSON.stringify(payload), signal: this.controller.signal }); const resp = await fetch(`https://generativelanguage.googleapis.com/v1beta/models/${model}:streamGenerateContent?alt=sse`, { method: 'POST', headers: { 'Content-Type': 'application/json', 'x-goog-api-key': apiKey }, body: JSON.stringify(payload), signal: this.controller.signal });
@@ -278,7 +279,7 @@ export class MyDurableObject {
buffer += decoder.decode(value, { stream: true }); buffer += decoder.decode(value, { stream: true });
for (const line of buffer.split('\n')) { for (const line of buffer.split('\n')) {
if (!line.startsWith('data: ')) continue; if (!line.startsWith('data: ')) continue;
try { this.queueDelta(JSON.parse(line.substring(6))?.candidates?.[0]?.content?.parts?.[0]?.text ?? ''); } catch {} try { JSON.parse(line.substring(6))?.candidates?.[0]?.content?.parts?.forEach(p => p.text && this.queueDelta(p.text)); } catch {}
} }
buffer = buffer.slice(buffer.lastIndexOf('\n') + 1); buffer = buffer.slice(buffer.lastIndexOf('\n') + 1);
} }
@@ -289,8 +290,9 @@ export class MyDurableObject {
const stream = await client.chat.completions.create({ ...body, stream: true }, { signal: this.controller.signal }); const stream = await client.chat.completions.create({ ...body, stream: true }, { signal: this.controller.signal });
for await (const chunk of stream) { for await (const chunk of stream) {
if (this.phase !== 'running') break; if (this.phase !== 'running') break;
const delta = chunk?.choices?.[0]?.delta?.content ?? ''; const delta = chunk?.choices?.[0]?.delta;
if (delta) this.queueDelta(delta); if (delta?.reasoning) this.queueDelta(delta.reasoning);
if (delta?.content) this.queueDelta(delta.content);
} }
} }