mirror of
https://github.com/sune-org/ORP.git
synced 2026-01-13 16:17:59 +00:00
Feat: Separate reasoning and content streams
This commit is contained in:
43
index.js
43
index.js
@@ -66,7 +66,8 @@ export class MyDurableObject {
|
|||||||
this.error = null;
|
this.error = null;
|
||||||
this.controller = null;
|
this.controller = null;
|
||||||
this.oaStream = null;
|
this.oaStream = null;
|
||||||
this.pending = '';
|
this.pendingContent = '';
|
||||||
|
this.pendingReasoning = '';
|
||||||
this.flushTimer = null;
|
this.flushTimer = null;
|
||||||
this.lastSavedAt = 0;
|
this.lastSavedAt = 0;
|
||||||
this.lastFlushedAt = 0;
|
this.lastFlushedAt = 0;
|
||||||
@@ -106,7 +107,8 @@ export class MyDurableObject {
|
|||||||
this.age = snap.age || 0;
|
this.age = snap.age || 0;
|
||||||
this.phase = snap.phase || 'done';
|
this.phase = snap.phase || 'done';
|
||||||
this.error = snap.error || null;
|
this.error = snap.error || null;
|
||||||
this.pending = '';
|
this.pendingContent = '';
|
||||||
|
this.pendingReasoning = '';
|
||||||
|
|
||||||
if (this.phase === 'running') {
|
if (this.phase === 'running') {
|
||||||
this.phase = 'evicted';
|
this.phase = 'evicted';
|
||||||
@@ -124,26 +126,29 @@ export class MyDurableObject {
|
|||||||
}
|
}
|
||||||
|
|
||||||
replay(ws, after) {
|
replay(ws, after) {
|
||||||
this.buffer.forEach(it => { if (it.seq > after) this.send(ws, { type: 'delta', seq: it.seq, text: it.text }); });
|
this.buffer.forEach(it => { if (it.seq > after) this.send(ws, { type: 'delta', seq: it.seq, content: it.text, reasoning: '' }); });
|
||||||
if (this.phase === 'done') this.send(ws, { type: 'done' });
|
if (this.phase === 'done') this.send(ws, { type: 'done' });
|
||||||
else if (['error', 'evicted'].includes(this.phase)) this.send(ws, { type: 'err', message: this.error || 'The run was terminated unexpectedly.' });
|
else if (['error', 'evicted'].includes(this.phase)) this.send(ws, { type: 'err', message: this.error || 'The run was terminated unexpectedly.' });
|
||||||
}
|
}
|
||||||
|
|
||||||
flush(force = false) {
|
flush(force = false) {
|
||||||
if (this.flushTimer) { clearTimeout(this.flushTimer); this.flushTimer = null; }
|
if (this.flushTimer) { clearTimeout(this.flushTimer); this.flushTimer = null; }
|
||||||
if (this.pending) {
|
if (this.pendingContent || this.pendingReasoning) {
|
||||||
this.buffer.push({ seq: ++this.seq, text: this.pending });
|
this.buffer.push({ seq: ++this.seq, text: this.pendingContent });
|
||||||
this.bcast({ type: 'delta', seq: this.seq, text: this.pending });
|
this.bcast({ type: 'delta', seq: this.seq, content: this.pendingContent, reasoning: this.pendingReasoning });
|
||||||
this.pending = '';
|
this.pendingContent = '';
|
||||||
|
this.pendingReasoning = '';
|
||||||
this.lastFlushedAt = Date.now();
|
this.lastFlushedAt = Date.now();
|
||||||
}
|
}
|
||||||
if (force) this.saveSnapshot();
|
if (force) this.saveSnapshot();
|
||||||
}
|
}
|
||||||
|
|
||||||
queueDelta(text) {
|
queueDelta(text, type = 'content') {
|
||||||
if (!text) return;
|
if (!text) return;
|
||||||
this.pending += text;
|
if (type === 'reasoning') this.pendingReasoning += text;
|
||||||
if (this.pending.length >= BATCH_BYTES) this.flush(false);
|
else this.pendingContent += text;
|
||||||
|
const totalLength = this.pendingContent.length + this.pendingReasoning.length;
|
||||||
|
if (totalLength >= BATCH_BYTES) this.flush(false);
|
||||||
else if (!this.flushTimer) this.flushTimer = setTimeout(() => this.flush(false), BATCH_MS);
|
else if (!this.flushTimer) this.flushTimer = setTimeout(() => this.flush(false), BATCH_MS);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -161,7 +166,7 @@ export class MyDurableObject {
|
|||||||
|
|
||||||
if (req.method === 'GET') {
|
if (req.method === 'GET') {
|
||||||
await this.autopsy();
|
await this.autopsy();
|
||||||
const text = this.buffer.map(it => it.text).join('') + this.pending;
|
const text = this.buffer.map(it => it.text).join('') + this.pendingContent;
|
||||||
const isTerminal = ['done', 'error', 'evicted'].includes(this.phase);
|
const isTerminal = ['done', 'error', 'evicted'].includes(this.phase);
|
||||||
const isError = ['error', 'evicted'].includes(this.phase);
|
const isError = ['error', 'evicted'].includes(this.phase);
|
||||||
const payload = { rid: this.rid, seq: this.seq, phase: this.phase, done: isTerminal, error: isError ? (this.error || 'The run was terminated unexpectedly.') : null, text };
|
const payload = { rid: this.rid, seq: this.seq, phase: this.phase, done: isTerminal, error: isError ? (this.error || 'The run was terminated unexpectedly.') : null, text };
|
||||||
@@ -223,7 +228,7 @@ export class MyDurableObject {
|
|||||||
try {
|
try {
|
||||||
for await (const event of this.oaStream) {
|
for await (const event of this.oaStream) {
|
||||||
if (this.phase !== 'running') break;
|
if (this.phase !== 'running') break;
|
||||||
if (event.type.endsWith('.delta') && event.delta) this.queueDelta(event.delta);
|
if (event.type.endsWith('.delta') && event.delta) this.queueDelta(event.delta, 'content');
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
try { this.oaStream?.controller?.abort(); } catch {}
|
try { this.oaStream?.controller?.abort(); } catch {}
|
||||||
@@ -257,7 +262,7 @@ export class MyDurableObject {
|
|||||||
if (body.reasoning?.enabled) payload.extended_thinking = { enabled: true, ...(body.reasoning.budget && { max_thinking_tokens: body.reasoning.budget }) };
|
if (body.reasoning?.enabled) payload.extended_thinking = { enabled: true, ...(body.reasoning.budget && { max_thinking_tokens: body.reasoning.budget }) };
|
||||||
|
|
||||||
const stream = client.messages.stream(payload);
|
const stream = client.messages.stream(payload);
|
||||||
stream.on('text', text => { if (this.phase === 'running') this.queueDelta(text); });
|
stream.on('text', text => { if (this.phase === 'running') this.queueDelta(text, 'content'); });
|
||||||
await stream.finalMessage();
|
await stream.finalMessage();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -292,12 +297,12 @@ export class MyDurableObject {
|
|||||||
try {
|
try {
|
||||||
JSON.parse(line.substring(6))?.candidates?.[0]?.content?.parts?.forEach(p => {
|
JSON.parse(line.substring(6))?.candidates?.[0]?.content?.parts?.forEach(p => {
|
||||||
if (p.thought?.thought) {
|
if (p.thought?.thought) {
|
||||||
this.queueDelta(p.thought.thought);
|
this.queueDelta(p.thought.thought, 'reasoning');
|
||||||
hasReasoning = true;
|
hasReasoning = true;
|
||||||
}
|
}
|
||||||
if (p.text) {
|
if (p.text) {
|
||||||
if (hasReasoning && !hasContent) this.queueDelta('\n');
|
if (hasReasoning && !hasContent) this.queueDelta('\n', 'content');
|
||||||
this.queueDelta(p.text);
|
this.queueDelta(p.text, 'content');
|
||||||
hasContent = true;
|
hasContent = true;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@@ -315,12 +320,12 @@ export class MyDurableObject {
|
|||||||
if (this.phase !== 'running') break;
|
if (this.phase !== 'running') break;
|
||||||
const delta = chunk?.choices?.[0]?.delta;
|
const delta = chunk?.choices?.[0]?.delta;
|
||||||
if (delta?.reasoning && body.reasoning?.exclude !== true) {
|
if (delta?.reasoning && body.reasoning?.exclude !== true) {
|
||||||
this.queueDelta(delta.reasoning);
|
this.queueDelta(delta.reasoning, 'reasoning');
|
||||||
hasReasoning = true;
|
hasReasoning = true;
|
||||||
}
|
}
|
||||||
if (delta?.content) {
|
if (delta?.content) {
|
||||||
if (hasReasoning && !hasContent) this.queueDelta('\n');
|
if (hasReasoning && !hasContent) this.queueDelta('\n', 'content');
|
||||||
this.queueDelta(delta.content);
|
this.queueDelta(delta.content, 'content');
|
||||||
hasContent = true;
|
hasContent = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user