mirror of
https://github.com/sune-org/ORP.git
synced 2026-01-13 16:17:59 +00:00
Refactor: Use raw SSE fetch for OpenRouter parity
This commit is contained in:
94
index.js
94
index.js
@@ -1,6 +1,5 @@
|
|||||||
import OpenAI from 'openai';
|
import OpenAI from 'openai';
|
||||||
import Anthropic from '@anthropic-ai/sdk';
|
import Anthropic from '@anthropic-ai/sdk';
|
||||||
import { OpenRouter } from '@openrouter/sdk';
|
|
||||||
|
|
||||||
const TTL_MS = 20 * 60 * 1000;
|
const TTL_MS = 20 * 60 * 1000;
|
||||||
const BATCH_MS = 800;
|
const BATCH_MS = 800;
|
||||||
@@ -44,10 +43,7 @@ export default {
|
|||||||
|
|
||||||
export class MyDurableObject {
|
export class MyDurableObject {
|
||||||
constructor(state, env) {
|
constructor(state, env) {
|
||||||
this.state = state;
|
this.state = state; this.env = env; this.sockets = new Set(); this.reset();
|
||||||
this.env = env;
|
|
||||||
this.sockets = new Set();
|
|
||||||
this.reset();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
reset() {
|
reset() {
|
||||||
@@ -68,16 +64,13 @@ export class MyDurableObject {
|
|||||||
if (!this.env.NTFY_URL) return;
|
if (!this.env.NTFY_URL) return;
|
||||||
this.state.waitUntil(fetch(this.env.NTFY_URL, {
|
this.state.waitUntil(fetch(this.env.NTFY_URL, {
|
||||||
method: 'POST', body: msg, headers: { Title: 'Sune ORP', Priority: `${pri}`, Tags: tags.join(',') }
|
method: 'POST', body: msg, headers: { Title: 'Sune ORP', Priority: `${pri}`, Tags: tags.join(',') }
|
||||||
}).catch(e => console.error('ntfy failed:', e)));
|
}).catch(() => {}));
|
||||||
}
|
}
|
||||||
|
|
||||||
async autopsy() {
|
async autopsy() {
|
||||||
if (this.rid) return;
|
if (this.rid) return;
|
||||||
const snap = await this.state.storage.get('run').catch(() => null);
|
const snap = await this.state.storage.get('run').catch(() => null);
|
||||||
if (!snap || (Date.now() - (snap.savedAt || 0) >= TTL_MS)) {
|
if (!snap || (Date.now() - (snap.savedAt || 0) >= TTL_MS)) { if (snap) await this.state.storage.delete('run'); return; }
|
||||||
if (snap) await this.state.storage.delete('run').catch(() => {});
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
this.rid = snap.rid; this.buffer = snap.buffer || []; this.seq = +snap.seq || -1;
|
this.rid = snap.rid; this.buffer = snap.buffer || []; this.seq = +snap.seq || -1;
|
||||||
this.age = snap.age || 0; this.phase = snap.phase || 'done'; this.error = snap.error;
|
this.age = snap.age || 0; this.phase = snap.phase || 'done'; this.error = snap.error;
|
||||||
this.messages = snap.messages || []; this.pending = ''; this.pendingImages = [];
|
this.messages = snap.messages || []; this.pending = ''; this.pendingImages = [];
|
||||||
@@ -96,7 +89,7 @@ export class MyDurableObject {
|
|||||||
replay(ws, after) {
|
replay(ws, after) {
|
||||||
this.buffer.forEach(it => { if (it.seq > after) this.send(ws, { type: 'delta', seq: it.seq, text: it.text, images: it.images }); });
|
this.buffer.forEach(it => { if (it.seq > after) this.send(ws, { type: 'delta', seq: it.seq, text: it.text, images: it.images }); });
|
||||||
if (this.phase === 'done') this.send(ws, { type: 'done' });
|
if (this.phase === 'done') this.send(ws, { type: 'done' });
|
||||||
else if (['error', 'evicted'].includes(this.phase)) this.send(ws, { type: 'err', message: this.error || 'Terminated unexpectedly.' });
|
else if (['error', 'evicted'].includes(this.phase)) this.send(ws, { type: 'err', message: this.error || 'Terminated.' });
|
||||||
}
|
}
|
||||||
|
|
||||||
flush(force = false) {
|
flush(force = false) {
|
||||||
@@ -132,8 +125,7 @@ export class MyDurableObject {
|
|||||||
await this.autopsy();
|
await this.autopsy();
|
||||||
const text = this.buffer.map(it => it.text).join('') + this.pending;
|
const text = this.buffer.map(it => it.text).join('') + this.pending;
|
||||||
const images = [...this.buffer.flatMap(it => it.images || []), ...this.pendingImages];
|
const images = [...this.buffer.flatMap(it => it.images || []), ...this.pendingImages];
|
||||||
const isTerminal = ['done', 'error', 'evicted'].includes(this.phase);
|
return this.corsJSON({ rid: this.rid, seq: this.seq, phase: this.phase, done: ['done', 'error', 'evicted'].includes(this.phase), error: ['error', 'evicted'].includes(this.phase) ? (this.error || 'Terminated.') : null, text, images });
|
||||||
return this.corsJSON({ rid: this.rid, seq: this.seq, phase: this.phase, done: isTerminal, error: ['error', 'evicted'].includes(this.phase) ? (this.error || 'Terminated.') : null, text, images });
|
|
||||||
}
|
}
|
||||||
return this.corsJSON({ error: 'not allowed' }, 405);
|
return this.corsJSON({ error: 'not allowed' }, 405);
|
||||||
}
|
}
|
||||||
@@ -168,9 +160,7 @@ export class MyDurableObject {
|
|||||||
|
|
||||||
async streamOpenAI({ apiKey, body }) {
|
async streamOpenAI({ apiKey, body }) {
|
||||||
const client = new OpenAI({ apiKey });
|
const client = new OpenAI({ apiKey });
|
||||||
const params = { model: body.model, input: this.buildInputForResponses(body.messages || []), temperature: body.temperature, stream: true };
|
this.oaStream = await client.responses.stream({ model: body.model, input: this.buildInputForResponses(body.messages || []), temperature: body.temperature, stream: true });
|
||||||
if (body.reasoning?.effort) params.reasoning = { effort: body.reasoning.effort };
|
|
||||||
this.oaStream = await client.responses.stream(params);
|
|
||||||
for await (const event of this.oaStream) {
|
for await (const event of this.oaStream) {
|
||||||
if (this.phase !== 'running') break;
|
if (this.phase !== 'running') break;
|
||||||
if (event.type.endsWith('.delta') && event.delta) this.queueDelta(event.delta);
|
if (event.type.endsWith('.delta') && event.delta) this.queueDelta(event.delta);
|
||||||
@@ -178,10 +168,9 @@ export class MyDurableObject {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async streamClaude({ apiKey, body }) {
|
async streamClaude({ apiKey, body }) {
|
||||||
const client = new Anthropic({ apiKey });
|
const client = new Anthropic({ apiKey }), sys = body.messages.filter(m => m.role === 'system').map(m => this.extractTextFromMessage(m)).join('\n\n') || body.system;
|
||||||
const system = body.messages.filter(m => m.role === 'system').map(m => this.extractTextFromMessage(m)).join('\n\n') || body.system;
|
const stream = client.messages.stream({
|
||||||
const payload = {
|
model: body.model, max_tokens: body.max_tokens || 64000, system: sys || undefined,
|
||||||
model: body.model, max_tokens: body.max_tokens || 64000,
|
|
||||||
messages: body.messages.filter(m => m.role !== 'system').map(m => ({
|
messages: body.messages.filter(m => m.role !== 'system').map(m => ({
|
||||||
role: m.role, content: (Array.isArray(m.content) ? m.content : [{type:'text',text:String(m.content)}]).map(p => {
|
role: m.role, content: (Array.isArray(m.content) ? m.content : [{type:'text',text:String(m.content)}]).map(p => {
|
||||||
if (p.type === 'text') return { type: 'text', text: p.text };
|
if (p.type === 'text') return { type: 'text', text: p.text };
|
||||||
@@ -191,54 +180,52 @@ export class MyDurableObject {
|
|||||||
}
|
}
|
||||||
}).filter(Boolean)
|
}).filter(Boolean)
|
||||||
})).filter(m => m.content.length)
|
})).filter(m => m.content.length)
|
||||||
};
|
});
|
||||||
if (system) payload.system = system;
|
|
||||||
const stream = client.messages.stream(payload);
|
|
||||||
stream.on('text', text => { if (this.phase === 'running') this.queueDelta(text); });
|
stream.on('text', text => { if (this.phase === 'running') this.queueDelta(text); });
|
||||||
await stream.finalMessage();
|
await stream.finalMessage();
|
||||||
}
|
}
|
||||||
|
|
||||||
async streamGoogle({ apiKey, body }) {
|
async streamGoogle({ apiKey, body }) {
|
||||||
const model = (body.model ?? '').replace(/:online$/, '');
|
const resp = await fetch(`https://generativelanguage.googleapis.com/v1beta/models/${(body.model ?? '').replace(/:online$/, '')}:streamGenerateContent?alt=sse`, { method: 'POST', headers: { 'Content-Type': 'application/json', 'x-goog-api-key': apiKey }, body: JSON.stringify({ contents: this.mapToGoogleContents(body.messages) }), signal: this.controller.signal });
|
||||||
const payload = { contents: this.mapToGoogleContents(body.messages) };
|
if (!resp.ok) throw new Error(`Google error: ${resp.status}`);
|
||||||
const resp = await fetch(`https://generativelanguage.googleapis.com/v1beta/models/${model}:streamGenerateContent?alt=sse`, { method: 'POST', headers: { 'Content-Type': 'application/json', 'x-goog-api-key': apiKey }, body: JSON.stringify(payload), signal: this.controller.signal });
|
|
||||||
if (!resp.ok) throw new Error(`Google API error: ${resp.status}`);
|
|
||||||
const reader = resp.body.getReader(), decoder = new TextDecoder();
|
const reader = resp.body.getReader(), decoder = new TextDecoder();
|
||||||
let buffer = '';
|
let buffer = '';
|
||||||
while (this.phase === 'running') {
|
while (this.phase === 'running') {
|
||||||
const { done, value } = await reader.read();
|
const { done, value } = await reader.read(); if (done) break;
|
||||||
if (done) break;
|
|
||||||
buffer += decoder.decode(value, { stream: true });
|
buffer += decoder.decode(value, { stream: true });
|
||||||
for (const line of buffer.split('\n')) {
|
const lines = buffer.split('\n'); buffer = lines.pop();
|
||||||
|
for (const line of lines) {
|
||||||
if (!line.startsWith('data: ')) continue;
|
if (!line.startsWith('data: ')) continue;
|
||||||
try {
|
try { JSON.parse(line.substring(6))?.candidates?.[0]?.content?.parts?.forEach(p => { if (p.thought?.thought) this.queueDelta(p.thought.thought); if (p.text) this.queueDelta(p.text); }); } catch {}
|
||||||
JSON.parse(line.substring(6))?.candidates?.[0]?.content?.parts?.forEach(p => {
|
|
||||||
if (p.thought?.thought) this.queueDelta(p.thought.thought);
|
|
||||||
if (p.text) this.queueDelta(p.text);
|
|
||||||
});
|
|
||||||
} catch {}
|
|
||||||
}
|
}
|
||||||
buffer = buffer.slice(buffer.lastIndexOf('\n') + 1);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async streamOpenRouter({ apiKey, body }) {
|
async streamOpenRouter({ apiKey, body }) {
|
||||||
const client = new OpenRouter({ apiKey, defaultHeaders: { 'HTTP-Referer': 'https://sune.chat', 'X-Title': 'Sune' } });
|
const resp = await fetch("https://openrouter.ai/api/v1/chat/completions", {
|
||||||
this.queueDelta(`> [DEBUG] Requesting ${body.model} (modalities: ${JSON.stringify(body.modalities || 'default')})\n\n`);
|
method: 'POST', headers: { 'Authorization': `Bearer ${apiKey}`, 'Content-Type': 'application/json', 'HTTP-Referer': 'https://sune.chat', 'X-Title': 'Sune' },
|
||||||
try {
|
body: JSON.stringify(body), signal: this.controller.signal
|
||||||
const stream = await client.chat.send({ ...body, stream: true });
|
});
|
||||||
let hasReasoning = false, hasContent = false, collectedImages = [];
|
if (!resp.ok) throw new Error(`OR Error: ${resp.status} ${await resp.text()}`);
|
||||||
for await (const chunk of stream) {
|
const reader = resp.body.getReader(), decoder = new TextDecoder();
|
||||||
if (this.phase !== 'running') break;
|
let buf = '', hasR = false, hasC = false, imgC = 0;
|
||||||
const delta = chunk?.choices?.[0]?.delta;
|
while (this.phase === 'running') {
|
||||||
if (!delta) continue;
|
const { done, value } = await reader.read(); if (done) break;
|
||||||
if (delta.reasoning && body.reasoning?.exclude !== true) { this.queueDelta(delta.reasoning); hasReasoning = true; }
|
buf += decoder.decode(value, { stream: true });
|
||||||
if (delta.content) { if (hasReasoning && !hasContent) this.queueDelta('\n'); this.queueDelta(delta.content); hasContent = true; }
|
const lines = buf.split('\n'); buf = lines.pop();
|
||||||
if (Array.isArray(delta.images)) collectedImages.push(...delta.images);
|
for (const line of lines) {
|
||||||
|
const d = line.startsWith('data: ') ? line.slice(6).trim() : null;
|
||||||
|
if (!d || d === '[DONE]') continue;
|
||||||
|
try {
|
||||||
|
const j = JSON.parse(d), delta = j.choices?.[0]?.delta;
|
||||||
|
if (!delta) continue;
|
||||||
|
if (delta.reasoning && body.reasoning?.exclude !== true) { this.queueDelta(delta.reasoning); hasR = true; }
|
||||||
|
if (delta.content) { if (hasR && !hasC) this.queueDelta('\n'); this.queueDelta(delta.content); hasC = true; }
|
||||||
|
if (Array.isArray(delta.images)) { this.queueDelta('', delta.images); imgC += delta.images.length; }
|
||||||
|
} catch {}
|
||||||
}
|
}
|
||||||
if (collectedImages.length) this.queueDelta('', collectedImages);
|
}
|
||||||
else if (!hasContent) this.queueDelta(`> [DEBUG] Stream ended with no content/images. Check if model supports streaming for this modality.`);
|
if (!hasC && imgC === 0) this.queueDelta(`> [DEBUG] Stream finished. Content: ${hasC}, Images: ${imgC}. Raw buffer check recommended.`);
|
||||||
} catch (e) { this.queueDelta(`\n\n> [DEBUG] OR Error: ${e.message}`); throw e; }
|
|
||||||
}
|
}
|
||||||
|
|
||||||
stop() {
|
stop() {
|
||||||
@@ -252,7 +239,7 @@ export class MyDurableObject {
|
|||||||
|
|
||||||
fail(message) {
|
fail(message) {
|
||||||
if (this.phase !== 'running') return;
|
if (this.phase !== 'running') return;
|
||||||
this.flush(true); this.phase = 'error'; this.error = String(message || 'stream_failed');
|
this.flush(true); this.phase = 'error'; this.error = String(message || 'failed');
|
||||||
try { this.controller?.abort(); } catch {}
|
try { this.controller?.abort(); } catch {}
|
||||||
try { this.oaStream?.controller?.abort(); } catch {}
|
try { this.oaStream?.controller?.abort(); } catch {}
|
||||||
this.saveSnapshot(); this.bcast({ type: 'err', message: this.error });
|
this.saveSnapshot(); this.bcast({ type: 'err', message: this.error });
|
||||||
@@ -271,3 +258,4 @@ export class MyDurableObject {
|
|||||||
buildInputForResponses(msgs) { if (!Array.isArray(msgs) || !msgs.length) return ''; if (!msgs.some(m => this.isMultimodalMessage(m))) return msgs.length === 1 ? this.extractTextFromMessage(msgs[0]) : msgs.map(m => ({ role: m.role, content: this.extractTextFromMessage(m) })); return msgs.map(m => ({ role: m.role, content: Array.isArray(m.content) ? m.content.map(p => this.mapContentPartToResponses(p)).filter(Boolean) : [{ type: 'input_text', text: String(m.content || '') }] })); }
|
buildInputForResponses(msgs) { if (!Array.isArray(msgs) || !msgs.length) return ''; if (!msgs.some(m => this.isMultimodalMessage(m))) return msgs.length === 1 ? this.extractTextFromMessage(msgs[0]) : msgs.map(m => ({ role: m.role, content: this.extractTextFromMessage(m) })); return msgs.map(m => ({ role: m.role, content: Array.isArray(m.content) ? m.content.map(p => this.mapContentPartToResponses(p)).filter(Boolean) : [{ type: 'input_text', text: String(m.content || '') }] })); }
|
||||||
mapToGoogleContents(msgs) { const c = msgs.reduce((acc, m) => { const r = m.role === 'assistant' ? 'model' : 'user', p = (Array.isArray(m.content) ? m.content : [{ type: 'text', text: String(m.content ?? '') }]).map(p => { if (p.type === 'text') return { text: p.text || '' }; if (p.type === 'image_url' && p.image_url?.url) { const m = p.image_url.url.match(/^data:(image\/\w+);base64,(.*)$/); if (m) return { inline_data: { mime_type: m[1], data: m[2] } }; } return null; }).filter(Boolean); if (!p.length) return acc; if (acc.length && acc.at(-1).role === r) acc.at(-1).parts.push(...p); else acc.push({ role: r, parts: p }); return acc; }, []); if (c.at(-1)?.role !== 'user') c.pop(); return c; }
|
mapToGoogleContents(msgs) { const c = msgs.reduce((acc, m) => { const r = m.role === 'assistant' ? 'model' : 'user', p = (Array.isArray(m.content) ? m.content : [{ type: 'text', text: String(m.content ?? '') }]).map(p => { if (p.type === 'text') return { text: p.text || '' }; if (p.type === 'image_url' && p.image_url?.url) { const m = p.image_url.url.match(/^data:(image\/\w+);base64,(.*)$/); if (m) return { inline_data: { mime_type: m[1], data: m[2] } }; } return null; }).filter(Boolean); if (!p.length) return acc; if (acc.length && acc.at(-1).role === r) acc.at(-1).parts.push(...p); else acc.push({ role: r, parts: p }); return acc; }, []); if (c.at(-1)?.role !== 'user') c.pop(); return c; }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user