mirror of
https://github.com/sune-org/ORP.git
synced 2026-01-13 16:17:59 +00:00
Fix: Add debug logging to stream output
This commit is contained in:
288
index.js
288
index.js
@@ -25,28 +25,19 @@ export default {
|
|||||||
async fetch(req, env) {
|
async fetch(req, env) {
|
||||||
const url = new URL(req.url);
|
const url = new URL(req.url);
|
||||||
const method = req.method.toUpperCase();
|
const method = req.method.toUpperCase();
|
||||||
|
|
||||||
if (method === 'OPTIONS') return new Response(null, { status: 204, headers: CORS_HEADERS });
|
if (method === 'OPTIONS') return new Response(null, { status: 204, headers: CORS_HEADERS });
|
||||||
|
|
||||||
if ((h => h !== 'sune.planetrenox.com' && h !== 'sune.chat' && !h.endsWith('.github.io'))(new URL(req.headers.get('Origin') || 'null').hostname)) {
|
if ((h => h !== 'sune.planetrenox.com' && h !== 'sune.chat' && !h.endsWith('.github.io'))(new URL(req.headers.get('Origin') || 'null').hostname)) {
|
||||||
return withCORS(new Response('Forbidden', { status: 403 }));
|
return withCORS(new Response('Forbidden', { status: 403 }));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (url.pathname === '/ws') {
|
if (url.pathname === '/ws') {
|
||||||
const isGet = method === 'GET';
|
const isGet = method === 'GET', isWs = req.headers.get('Upgrade') === 'websocket';
|
||||||
const isWs = req.headers.get('Upgrade') === 'websocket';
|
|
||||||
if (!isGet && !isWs) return withCORS(new Response('method not allowed', { status: 405 }));
|
if (!isGet && !isWs) return withCORS(new Response('method not allowed', { status: 405 }));
|
||||||
|
|
||||||
const uid = (url.searchParams.get('uid') || '').slice(0, 64).replace(/[^a-zA-Z0-9_-]/g, '');
|
const uid = (url.searchParams.get('uid') || '').slice(0, 64).replace(/[^a-zA-Z0-9_-]/g, '');
|
||||||
if (!uid) return withCORS(new Response('uid is required', { status: 400 }));
|
if (!uid) return withCORS(new Response('uid is required', { status: 400 }));
|
||||||
|
const id = env.MY_DURABLE_OBJECT.idFromName(uid), stub = env.MY_DURABLE_OBJECT.get(id);
|
||||||
const id = env.MY_DURABLE_OBJECT.idFromName(uid);
|
|
||||||
const stub = env.MY_DURABLE_OBJECT.get(id);
|
|
||||||
|
|
||||||
const resp = await stub.fetch(req);
|
const resp = await stub.fetch(req);
|
||||||
return isWs ? resp : withCORS(resp);
|
return isWs ? resp : withCORS(resp);
|
||||||
}
|
}
|
||||||
|
|
||||||
return withCORS(new Response('not found', { status: 404 }));
|
return withCORS(new Response('not found', { status: 404 }));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -60,21 +51,10 @@ export class MyDurableObject {
|
|||||||
}
|
}
|
||||||
|
|
||||||
reset() {
|
reset() {
|
||||||
this.rid = null;
|
this.rid = null; this.buffer = []; this.seq = -1; this.phase = 'idle'; this.error = null;
|
||||||
this.buffer = [];
|
this.controller = null; this.oaStream = null; this.pending = ''; this.pendingImages = [];
|
||||||
this.seq = -1;
|
this.flushTimer = null; this.lastSavedAt = 0; this.lastFlushedAt = 0; this.hbActive = false;
|
||||||
this.phase = 'idle';
|
this.age = 0; this.messages = [];
|
||||||
this.error = null;
|
|
||||||
this.controller = null;
|
|
||||||
this.oaStream = null;
|
|
||||||
this.pending = '';
|
|
||||||
this.pendingImages = [];
|
|
||||||
this.flushTimer = null;
|
|
||||||
this.lastSavedAt = 0;
|
|
||||||
this.lastFlushedAt = 0;
|
|
||||||
this.hbActive = false;
|
|
||||||
this.age = 0;
|
|
||||||
this.messages = [];
|
|
||||||
}
|
}
|
||||||
|
|
||||||
corsJSON(obj, status = 200) {
|
corsJSON(obj, status = 200) {
|
||||||
@@ -82,64 +62,41 @@ export class MyDurableObject {
|
|||||||
}
|
}
|
||||||
|
|
||||||
send(ws, obj) { try { ws.send(JSON.stringify(obj)); } catch {} }
|
send(ws, obj) { try { ws.send(JSON.stringify(obj)); } catch {} }
|
||||||
|
|
||||||
bcast(obj) { this.sockets.forEach(ws => this.send(ws, obj)); }
|
bcast(obj) { this.sockets.forEach(ws => this.send(ws, obj)); }
|
||||||
|
|
||||||
getConversationText() {
|
|
||||||
const prompt = (this.messages || []).map(m => `## ${m.role}\n\n${this.extractTextFromMessage(m)}`).join('\n\n---\n\n');
|
|
||||||
const response = this.buffer.map(it => it.text).join('');
|
|
||||||
if (!prompt && !response) return '';
|
|
||||||
return `${prompt}\n\n---\n\n## assistant\n\n${response}`;
|
|
||||||
}
|
|
||||||
|
|
||||||
notify(msg, pri = 3, tags = []) {
|
notify(msg, pri = 3, tags = []) {
|
||||||
if (!this.env.NTFY_URL) return;
|
if (!this.env.NTFY_URL) return;
|
||||||
const headers = { Title: 'Sune ORP', Priority: `${pri}`, Tags: tags.join(',') };
|
|
||||||
this.state.waitUntil(fetch(this.env.NTFY_URL, {
|
this.state.waitUntil(fetch(this.env.NTFY_URL, {
|
||||||
method: 'POST',
|
method: 'POST', body: msg, headers: { Title: 'Sune ORP', Priority: `${pri}`, Tags: tags.join(',') }
|
||||||
body: msg,
|
|
||||||
headers,
|
|
||||||
}).catch(e => console.error('ntfy failed:', e)));
|
}).catch(e => console.error('ntfy failed:', e)));
|
||||||
}
|
}
|
||||||
|
|
||||||
async autopsy() {
|
async autopsy() {
|
||||||
if (this.rid) return;
|
if (this.rid) return;
|
||||||
const snap = await this.state.storage.get('run').catch(() => null);
|
const snap = await this.state.storage.get('run').catch(() => null);
|
||||||
|
|
||||||
if (!snap || (Date.now() - (snap.savedAt || 0) >= TTL_MS)) {
|
if (!snap || (Date.now() - (snap.savedAt || 0) >= TTL_MS)) {
|
||||||
if (snap) await this.state.storage.delete('run').catch(() => {});
|
if (snap) await this.state.storage.delete('run').catch(() => {});
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
this.rid = snap.rid; this.buffer = snap.buffer || []; this.seq = +snap.seq || -1;
|
||||||
this.rid = snap.rid || null;
|
this.age = snap.age || 0; this.phase = snap.phase || 'done'; this.error = snap.error;
|
||||||
this.buffer = Array.isArray(snap.buffer) ? snap.buffer : [];
|
this.messages = snap.messages || []; this.pending = ''; this.pendingImages = [];
|
||||||
this.seq = Number.isFinite(+snap.seq) ? +snap.seq : -1;
|
|
||||||
this.age = snap.age || 0;
|
|
||||||
this.phase = snap.phase || 'done';
|
|
||||||
this.error = snap.error || null;
|
|
||||||
this.messages = Array.isArray(snap.messages) ? snap.messages : [];
|
|
||||||
this.pending = '';
|
|
||||||
this.pendingImages = [];
|
|
||||||
|
|
||||||
if (this.phase === 'running') {
|
if (this.phase === 'running') {
|
||||||
this.phase = 'evicted';
|
this.phase = 'evicted'; this.error = 'System eviction interrupted the run.';
|
||||||
this.error = 'The run was interrupted due to system eviction.';
|
this.saveSnapshot(); this.notify(`Run ${this.rid} evicted`, 4, ['warning']);
|
||||||
this.saveSnapshot();
|
|
||||||
this.notify(`Run ${this.rid} evicted`, 4, ['warning']);
|
|
||||||
await this.stopHeartbeat();
|
await this.stopHeartbeat();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
saveSnapshot() {
|
saveSnapshot() {
|
||||||
this.lastSavedAt = Date.now();
|
this.lastSavedAt = Date.now();
|
||||||
const snapshot = { rid: this.rid, buffer: this.buffer, seq: this.seq, age: this.age, phase: this.phase, error: this.error, savedAt: this.lastSavedAt, messages: this.messages };
|
return this.state.storage.put('run', { rid: this.rid, buffer: this.buffer, seq: this.seq, age: this.age, phase: this.phase, error: this.error, savedAt: this.lastSavedAt, messages: this.messages }).catch(() => {});
|
||||||
return this.state.storage.put('run', snapshot).catch(() => {});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
replay(ws, after) {
|
replay(ws, after) {
|
||||||
this.buffer.forEach(it => { if (it.seq > after) this.send(ws, { type: 'delta', seq: it.seq, text: it.text, images: it.images }); });
|
this.buffer.forEach(it => { if (it.seq > after) this.send(ws, { type: 'delta', seq: it.seq, text: it.text, images: it.images }); });
|
||||||
if (this.phase === 'done') this.send(ws, { type: 'done' });
|
if (this.phase === 'done') this.send(ws, { type: 'done' });
|
||||||
else if (['error', 'evicted'].includes(this.phase)) this.send(ws, { type: 'err', message: this.error || 'The run was terminated unexpectedly.' });
|
else if (['error', 'evicted'].includes(this.phase)) this.send(ws, { type: 'err', message: this.error || 'Terminated unexpectedly.' });
|
||||||
}
|
}
|
||||||
|
|
||||||
flush(force = false) {
|
flush(force = false) {
|
||||||
@@ -147,13 +104,9 @@ export class MyDurableObject {
|
|||||||
if (this.pending || this.pendingImages.length > 0) {
|
if (this.pending || this.pendingImages.length > 0) {
|
||||||
const item = { seq: ++this.seq, text: this.pending };
|
const item = { seq: ++this.seq, text: this.pending };
|
||||||
if (this.pendingImages.length > 0) item.images = [...this.pendingImages];
|
if (this.pendingImages.length > 0) item.images = [...this.pendingImages];
|
||||||
|
|
||||||
this.buffer.push(item);
|
this.buffer.push(item);
|
||||||
this.bcast({ type: 'delta', seq: this.seq, text: this.pending, images: item.images });
|
this.bcast({ type: 'delta', seq: this.seq, text: this.pending, images: item.images });
|
||||||
|
this.pending = ''; this.pendingImages = []; this.lastFlushedAt = Date.now();
|
||||||
this.pending = '';
|
|
||||||
this.pendingImages = [];
|
|
||||||
this.lastFlushedAt = Date.now();
|
|
||||||
}
|
}
|
||||||
if (force) this.saveSnapshot();
|
if (force) this.saveSnapshot();
|
||||||
}
|
}
|
||||||
@@ -162,60 +115,41 @@ export class MyDurableObject {
|
|||||||
if (!text && (!images || !images.length)) return;
|
if (!text && (!images || !images.length)) return;
|
||||||
if (text) this.pending += text;
|
if (text) this.pending += text;
|
||||||
if (images) this.pendingImages.push(...images);
|
if (images) this.pendingImages.push(...images);
|
||||||
|
|
||||||
if (this.pending.length >= BATCH_BYTES || this.pendingImages.length > 0) this.flush(false);
|
if (this.pending.length >= BATCH_BYTES || this.pendingImages.length > 0) this.flush(false);
|
||||||
else if (!this.flushTimer) this.flushTimer = setTimeout(() => this.flush(false), BATCH_MS);
|
else if (!this.flushTimer) this.flushTimer = setTimeout(() => this.flush(false), BATCH_MS);
|
||||||
}
|
}
|
||||||
|
|
||||||
async fetch(req) {
|
async fetch(req) {
|
||||||
if (req.method === 'OPTIONS') return new Response(null, { status: 204, headers: CORS_HEADERS });
|
if (req.method === 'OPTIONS') return new Response(null, { status: 204, headers: CORS_HEADERS });
|
||||||
|
|
||||||
if (req.headers.get('Upgrade') === 'websocket') {
|
if (req.headers.get('Upgrade') === 'websocket') {
|
||||||
const [client, server] = Object.values(new WebSocketPair());
|
const [client, server] = Object.values(new WebSocketPair());
|
||||||
server.accept();
|
server.accept(); this.sockets.add(server);
|
||||||
this.sockets.add(server);
|
|
||||||
server.addEventListener('close', () => this.sockets.delete(server));
|
server.addEventListener('close', () => this.sockets.delete(server));
|
||||||
server.addEventListener('message', e => this.state.waitUntil(this.onMessage(server, e)));
|
server.addEventListener('message', e => this.state.waitUntil(this.onMessage(server, e)));
|
||||||
return new Response(null, { status: 101, webSocket: client });
|
return new Response(null, { status: 101, webSocket: client });
|
||||||
}
|
}
|
||||||
|
|
||||||
if (req.method === 'GET') {
|
if (req.method === 'GET') {
|
||||||
await this.autopsy();
|
await this.autopsy();
|
||||||
const text = this.buffer.map(it => it.text).join('') + this.pending;
|
const text = this.buffer.map(it => it.text).join('') + this.pending;
|
||||||
const images = [...this.buffer.flatMap(it => it.images || []), ...this.pendingImages];
|
const images = [...this.buffer.flatMap(it => it.images || []), ...this.pendingImages];
|
||||||
const isTerminal = ['done', 'error', 'evicted'].includes(this.phase);
|
const isTerminal = ['done', 'error', 'evicted'].includes(this.phase);
|
||||||
const isError = ['error', 'evicted'].includes(this.phase);
|
return this.corsJSON({ rid: this.rid, seq: this.seq, phase: this.phase, done: isTerminal, error: ['error', 'evicted'].includes(this.phase) ? (this.error || 'Terminated.') : null, text, images });
|
||||||
const payload = { rid: this.rid, seq: this.seq, phase: this.phase, done: isTerminal, error: isError ? (this.error || 'The run was terminated unexpectedly.') : null, text, images };
|
|
||||||
return this.corsJSON(payload);
|
|
||||||
}
|
}
|
||||||
return this.corsJSON({ error: 'not allowed' }, 405);
|
return this.corsJSON({ error: 'not allowed' }, 405);
|
||||||
}
|
}
|
||||||
|
|
||||||
async onMessage(ws, evt) {
|
async onMessage(ws, evt) {
|
||||||
await this.autopsy();
|
await this.autopsy();
|
||||||
let msg;
|
let msg; try { msg = JSON.parse(String(evt.data || '')); } catch { return this.send(ws, { type: 'err', message: 'bad_json' }); }
|
||||||
try { msg = JSON.parse(String(evt.data || '')); }
|
if (msg.type === 'stop') { if (msg.rid === this.rid) this.stop(); return; }
|
||||||
catch { return this.send(ws, { type: 'err', message: 'bad_json' }); }
|
|
||||||
|
|
||||||
if (msg.type === 'stop') { if (msg.rid && msg.rid === this.rid) this.stop(); return; }
|
|
||||||
if (msg.type !== 'begin') return this.send(ws, { type: 'err', message: 'bad_type' });
|
if (msg.type !== 'begin') return this.send(ws, { type: 'err', message: 'bad_type' });
|
||||||
|
|
||||||
const { rid, apiKey, or_body, model, messages, after, provider } = msg;
|
const { rid, apiKey, or_body, model, messages, after, provider } = msg;
|
||||||
const body = or_body || (model && Array.isArray(messages) ? { model, messages, stream: true, ...msg } : null);
|
const body = or_body || (model && Array.isArray(messages) ? { model, messages, stream: true, ...msg } : null);
|
||||||
|
if (!rid || !apiKey || !body || !body.messages?.length) return this.send(ws, { type: 'err', message: 'missing_fields' });
|
||||||
if (!rid || !apiKey || !body || !Array.isArray(body.messages) || body.messages.length === 0) return this.send(ws, { type: 'err', message: 'missing_fields' });
|
|
||||||
|
|
||||||
if (this.phase === 'running' && rid !== this.rid) return this.send(ws, { type: 'err', message: 'busy' });
|
if (this.phase === 'running' && rid !== this.rid) return this.send(ws, { type: 'err', message: 'busy' });
|
||||||
|
|
||||||
if (rid === this.rid && this.phase !== 'idle') return this.replay(ws, Number.isFinite(+after) ? +after : -1);
|
if (rid === this.rid && this.phase !== 'idle') return this.replay(ws, Number.isFinite(+after) ? +after : -1);
|
||||||
|
this.reset(); this.rid = rid; this.phase = 'running'; this.controller = new AbortController(); this.messages = body.messages;
|
||||||
this.reset();
|
|
||||||
this.rid = rid;
|
|
||||||
this.phase = 'running';
|
|
||||||
this.controller = new AbortController();
|
|
||||||
this.messages = body.messages;
|
|
||||||
await this.saveSnapshot();
|
await this.saveSnapshot();
|
||||||
|
|
||||||
this.state.waitUntil(this.startHeartbeat());
|
this.state.waitUntil(this.startHeartbeat());
|
||||||
this.state.waitUntil(this.stream({ apiKey, body, provider: provider || 'openrouter' }));
|
this.state.waitUntil(this.stream({ apiKey, body, provider: provider || 'openrouter' }));
|
||||||
}
|
}
|
||||||
@@ -229,82 +163,48 @@ export class MyDurableObject {
|
|||||||
const msg = String(e?.message || 'stream_failed');
|
const msg = String(e?.message || 'stream_failed');
|
||||||
if (!((e && e.name === 'AbortError') || /abort/i.test(msg))) this.fail(msg);
|
if (!((e && e.name === 'AbortError') || /abort/i.test(msg))) this.fail(msg);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally { if (this.phase === 'running') this.stop(); }
|
||||||
if (this.phase === 'running') this.stop();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async streamOpenAI({ apiKey, body }) {
|
async streamOpenAI({ apiKey, body }) {
|
||||||
const client = new OpenAI({ apiKey });
|
const client = new OpenAI({ apiKey });
|
||||||
const params = { model: body.model, input: this.buildInputForResponses(body.messages || []), temperature: body.temperature, stream: true };
|
const params = { model: body.model, input: this.buildInputForResponses(body.messages || []), temperature: body.temperature, stream: true };
|
||||||
if (Number.isFinite(+body.max_tokens) && +body.max_tokens > 0) params.max_output_tokens = +body.max_tokens;
|
|
||||||
if (Number.isFinite(+body.top_p)) params.top_p = +body.top_p;
|
|
||||||
if (body.reasoning?.effort) params.reasoning = { effort: body.reasoning.effort };
|
if (body.reasoning?.effort) params.reasoning = { effort: body.reasoning.effort };
|
||||||
if (body.verbosity) params.text = { verbosity: body.verbosity };
|
|
||||||
this.oaStream = await client.responses.stream(params);
|
this.oaStream = await client.responses.stream(params);
|
||||||
try {
|
|
||||||
for await (const event of this.oaStream) {
|
for await (const event of this.oaStream) {
|
||||||
if (this.phase !== 'running') break;
|
if (this.phase !== 'running') break;
|
||||||
if (event.type.endsWith('.delta') && event.delta) this.queueDelta(event.delta);
|
if (event.type.endsWith('.delta') && event.delta) this.queueDelta(event.delta);
|
||||||
}
|
}
|
||||||
} finally {
|
|
||||||
try { this.oaStream?.controller?.abort(); } catch {}
|
|
||||||
this.oaStream = null;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async streamClaude({ apiKey, body }) {
|
async streamClaude({ apiKey, body }) {
|
||||||
const client = new Anthropic({ apiKey });
|
const client = new Anthropic({ apiKey });
|
||||||
const system = body.messages
|
const system = body.messages.filter(m => m.role === 'system').map(m => this.extractTextFromMessage(m)).join('\n\n') || body.system;
|
||||||
.filter(m => m.role === 'system')
|
|
||||||
.map(m => this.extractTextFromMessage(m))
|
|
||||||
.join('\n\n') || body.system;
|
|
||||||
const payload = {
|
const payload = {
|
||||||
model: body.model,
|
model: body.model, max_tokens: body.max_tokens || 64000,
|
||||||
messages: body.messages.filter(m => m.role !== 'system').map(m => ({
|
messages: body.messages.filter(m => m.role !== 'system').map(m => ({
|
||||||
role: m.role,
|
role: m.role, content: (Array.isArray(m.content) ? m.content : [{type:'text',text:String(m.content)}]).map(p => {
|
||||||
content: typeof m.content === 'string' ? m.content : (m.content || []).map(p => {
|
if (p.type === 'text') return { type: 'text', text: p.text };
|
||||||
if (p.type === 'text' && p.text) return { type: 'text', text: p.text };
|
|
||||||
if (p.type === 'image_url') {
|
if (p.type === 'image_url') {
|
||||||
const match = String(p.image_url?.url || p.image_url || '').match(/^data:(image\/\w+);base64,(.*)$/);
|
const m = String(p.image_url?.url || '').match(/^data:(image\/\w+);base64,(.*)$/);
|
||||||
if (match) return { type: 'image', source: { type: 'base64', media_type: match[1], data: match[2] } };
|
if (m) return { type: 'image', source: { type: 'base64', media_type: m[1], data: m[2] } };
|
||||||
}
|
}
|
||||||
}).filter(Boolean)
|
}).filter(Boolean)
|
||||||
})).filter(m => m.content.length),
|
})).filter(m => m.content.length)
|
||||||
max_tokens: body.max_tokens || 64000,
|
|
||||||
};
|
};
|
||||||
if (system) payload.system = system;
|
if (system) payload.system = system;
|
||||||
if (Number.isFinite(+body.temperature)) payload.temperature = +body.temperature;
|
|
||||||
if (Number.isFinite(+body.top_p)) payload.top_p = +body.top_p;
|
|
||||||
if (body.reasoning?.enabled) payload.extended_thinking = { enabled: true, ...(body.reasoning.budget && { max_thinking_tokens: body.reasoning.budget }) };
|
|
||||||
|
|
||||||
const stream = client.messages.stream(payload);
|
const stream = client.messages.stream(payload);
|
||||||
stream.on('text', text => { if (this.phase === 'running') this.queueDelta(text); });
|
stream.on('text', text => { if (this.phase === 'running') this.queueDelta(text); });
|
||||||
await stream.finalMessage();
|
await stream.finalMessage();
|
||||||
}
|
}
|
||||||
|
|
||||||
async streamGoogle({ apiKey, body }) {
|
async streamGoogle({ apiKey, body }) {
|
||||||
const generationConfig = Object.entries({ temperature: body.temperature, topP: body.top_p, maxOutputTokens: body.max_tokens }).reduce((acc, [k, v]) => (Number.isFinite(+v) && +v >= 0 ? { ...acc, [k]: +v } : acc), {});
|
|
||||||
if (body.reasoning) generationConfig.thinkingConfig = { includeThoughts: body.reasoning.exclude !== true, ...(body.reasoning.effort && body.reasoning.effort !== 'default' && { thinkingLevel: body.reasoning.effort }) };
|
|
||||||
if (body.response_format?.type?.startsWith('json')) {
|
|
||||||
generationConfig.responseMimeType = 'application/json';
|
|
||||||
if (body.response_format.json_schema) {
|
|
||||||
const translate = s => {
|
|
||||||
if (typeof s !== 'object' || s === null) return s;
|
|
||||||
const n = Array.isArray(s) ? [] : {};
|
|
||||||
for (const k in s) if (Object.hasOwn(s, k)) n[k] = (k === 'type' && typeof s[k] === 'string') ? s[k].toUpperCase() : translate(s[k]);
|
|
||||||
return n;
|
|
||||||
};
|
|
||||||
generationConfig.responseSchema = translate(body.response_format.json_schema.schema || body.response_format.json_schema);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
const model = (body.model ?? '').replace(/:online$/, '');
|
const model = (body.model ?? '').replace(/:online$/, '');
|
||||||
const payload = { contents: this.mapToGoogleContents(body.messages), ...(Object.keys(generationConfig).length && { generationConfig }), ...((body.model ?? '').endsWith(':online') && { tools: [{ google_search: {} }] }) };
|
const payload = { contents: this.mapToGoogleContents(body.messages) };
|
||||||
const resp = await fetch(`https://generativelanguage.googleapis.com/v1beta/models/${model}:streamGenerateContent?alt=sse`, { method: 'POST', headers: { 'Content-Type': 'application/json', 'x-goog-api-key': apiKey }, body: JSON.stringify(payload), signal: this.controller.signal });
|
const resp = await fetch(`https://generativelanguage.googleapis.com/v1beta/models/${model}:streamGenerateContent?alt=sse`, { method: 'POST', headers: { 'Content-Type': 'application/json', 'x-goog-api-key': apiKey }, body: JSON.stringify(payload), signal: this.controller.signal });
|
||||||
if (!resp.ok) throw new Error(`Google API error: ${resp.status} ${await resp.text()}`);
|
if (!resp.ok) throw new Error(`Google API error: ${resp.status}`);
|
||||||
const reader = resp.body.getReader();
|
const reader = resp.body.getReader(), decoder = new TextDecoder();
|
||||||
const decoder = new TextDecoder();
|
let buffer = '';
|
||||||
let buffer = '', hasReasoning = false, hasContent = false;
|
|
||||||
while (this.phase === 'running') {
|
while (this.phase === 'running') {
|
||||||
const { done, value } = await reader.read();
|
const { done, value } = await reader.read();
|
||||||
if (done) break;
|
if (done) break;
|
||||||
@@ -313,15 +213,8 @@ export class MyDurableObject {
|
|||||||
if (!line.startsWith('data: ')) continue;
|
if (!line.startsWith('data: ')) continue;
|
||||||
try {
|
try {
|
||||||
JSON.parse(line.substring(6))?.candidates?.[0]?.content?.parts?.forEach(p => {
|
JSON.parse(line.substring(6))?.candidates?.[0]?.content?.parts?.forEach(p => {
|
||||||
if (p.thought?.thought) {
|
if (p.thought?.thought) this.queueDelta(p.thought.thought);
|
||||||
this.queueDelta(p.thought.thought);
|
if (p.text) this.queueDelta(p.text);
|
||||||
hasReasoning = true;
|
|
||||||
}
|
|
||||||
if (p.text) {
|
|
||||||
if (hasReasoning && !hasContent) this.queueDelta('\n');
|
|
||||||
this.queueDelta(p.text);
|
|
||||||
hasContent = true;
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
} catch {}
|
} catch {}
|
||||||
}
|
}
|
||||||
@@ -331,121 +224,50 @@ export class MyDurableObject {
|
|||||||
|
|
||||||
async streamOpenRouter({ apiKey, body }) {
|
async streamOpenRouter({ apiKey, body }) {
|
||||||
const client = new OpenRouter({ apiKey, defaultHeaders: { 'HTTP-Referer': 'https://sune.chat', 'X-Title': 'Sune' } });
|
const client = new OpenRouter({ apiKey, defaultHeaders: { 'HTTP-Referer': 'https://sune.chat', 'X-Title': 'Sune' } });
|
||||||
|
this.queueDelta(`> [DEBUG] Requesting ${body.model} (modalities: ${JSON.stringify(body.modalities || 'default')})\n\n`);
|
||||||
|
try {
|
||||||
const stream = await client.chat.send({ ...body, stream: true });
|
const stream = await client.chat.send({ ...body, stream: true });
|
||||||
let hasReasoning = false, hasContent = false;
|
let hasReasoning = false, hasContent = false, collectedImages = [];
|
||||||
const collectedImages = [];
|
|
||||||
for await (const chunk of stream) {
|
for await (const chunk of stream) {
|
||||||
if (this.phase !== 'running') break;
|
if (this.phase !== 'running') break;
|
||||||
const delta = chunk?.choices?.[0]?.delta;
|
const delta = chunk?.choices?.[0]?.delta;
|
||||||
if (!delta) continue;
|
if (!delta) continue;
|
||||||
|
if (delta.reasoning && body.reasoning?.exclude !== true) { this.queueDelta(delta.reasoning); hasReasoning = true; }
|
||||||
if (delta.reasoning && body.reasoning?.exclude !== true) {
|
if (delta.content) { if (hasReasoning && !hasContent) this.queueDelta('\n'); this.queueDelta(delta.content); hasContent = true; }
|
||||||
this.queueDelta(delta.reasoning);
|
|
||||||
hasReasoning = true;
|
|
||||||
}
|
|
||||||
if (delta.content) {
|
|
||||||
if (hasReasoning && !hasContent) this.queueDelta('\n');
|
|
||||||
this.queueDelta(delta.content);
|
|
||||||
hasContent = true;
|
|
||||||
}
|
|
||||||
if (Array.isArray(delta.images)) collectedImages.push(...delta.images);
|
if (Array.isArray(delta.images)) collectedImages.push(...delta.images);
|
||||||
}
|
}
|
||||||
if (collectedImages.length) this.queueDelta('', collectedImages);
|
if (collectedImages.length) this.queueDelta('', collectedImages);
|
||||||
|
else if (!hasContent) this.queueDelta(`> [DEBUG] Stream ended with no content/images. Check if model supports streaming for this modality.`);
|
||||||
|
} catch (e) { this.queueDelta(`\n\n> [DEBUG] OR Error: ${e.message}`); throw e; }
|
||||||
}
|
}
|
||||||
|
|
||||||
stop() {
|
stop() {
|
||||||
if (this.phase !== 'running') return;
|
if (this.phase !== 'running') return;
|
||||||
this.flush(true);
|
this.flush(true); this.phase = 'done'; this.error = null;
|
||||||
this.phase = 'done';
|
|
||||||
this.error = null;
|
|
||||||
try { this.controller?.abort(); } catch {}
|
try { this.controller?.abort(); } catch {}
|
||||||
try { this.oaStream?.controller?.abort(); } catch {}
|
try { this.oaStream?.controller?.abort(); } catch {}
|
||||||
this.saveSnapshot();
|
this.saveSnapshot(); this.bcast({ type: 'done' });
|
||||||
this.bcast({ type: 'done' });
|
|
||||||
this.state.waitUntil(this.stopHeartbeat());
|
this.state.waitUntil(this.stopHeartbeat());
|
||||||
}
|
}
|
||||||
|
|
||||||
fail(message) {
|
fail(message) {
|
||||||
if (this.phase !== 'running') return;
|
if (this.phase !== 'running') return;
|
||||||
this.flush(true);
|
this.flush(true); this.phase = 'error'; this.error = String(message || 'stream_failed');
|
||||||
this.phase = 'error';
|
|
||||||
this.error = String(message || 'stream_failed');
|
|
||||||
try { this.controller?.abort(); } catch {}
|
try { this.controller?.abort(); } catch {}
|
||||||
try { this.oaStream?.controller?.abort(); } catch {}
|
try { this.oaStream?.controller?.abort(); } catch {}
|
||||||
this.saveSnapshot();
|
this.saveSnapshot(); this.bcast({ type: 'err', message: this.error });
|
||||||
this.bcast({ type: 'err', message: this.error });
|
|
||||||
this.notify(`Run ${this.rid} failed: ${this.error}`, 3, ['rotating_light']);
|
this.notify(`Run ${this.rid} failed: ${this.error}`, 3, ['rotating_light']);
|
||||||
this.state.waitUntil(this.stopHeartbeat());
|
this.state.waitUntil(this.stopHeartbeat());
|
||||||
}
|
}
|
||||||
|
|
||||||
async startHeartbeat() {
|
async startHeartbeat() { if (!this.hbActive && this.phase === 'running') { this.hbActive = true; await this.state.storage.setAlarm(Date.now() + HB_INTERVAL_MS).catch(() => {}); } }
|
||||||
if (this.hbActive || this.phase !== 'running') return;
|
async stopHeartbeat() { if (this.hbActive) { this.hbActive = false; this.notify(`Run ${this.rid} ended. Age: ${((this.age * HB_INTERVAL_MS) / 1000).toFixed(1)}s.`, 3, ['stop_sign']); await this.state.storage.setAlarm(null).catch(() => {}); } }
|
||||||
this.hbActive = true;
|
async Heart() { if (this.phase !== 'running' || !this.hbActive) return this.stopHeartbeat(); if (++this.age * HB_INTERVAL_MS >= MAX_RUN_MS) return this.fail(`Timeout.`); await this.state.storage.setAlarm(Date.now() + HB_INTERVAL_MS).catch(() => {}); }
|
||||||
await this.state.storage.setAlarm(Date.now() + HB_INTERVAL_MS).catch(() => {});
|
async alarm() { await this.autopsy(); await this.Heart(); }
|
||||||
}
|
|
||||||
|
|
||||||
async stopHeartbeat() {
|
isMultimodalMessage(m) { return m && Array.isArray(m.content) && m.content.some(p => p?.type && !['text', 'input_text'].includes(p.type)); }
|
||||||
if (!this.hbActive) return;
|
extractTextFromMessage(m) { if (!m) return ''; if (typeof m.content === 'string') return m.content; if (!Array.isArray(m.content)) return ''; return m.content.filter(p => p && ['text', 'input_text'].includes(p.type)).map(p => String(p.text ?? p.content ?? '')).join(''); }
|
||||||
this.hbActive = false;
|
mapContentPartToResponses(p) { const t = p?.type || 'text'; if (['image_url', 'input_image'].includes(t)) return (p?.image_url?.url || p?.image_url) ? { type: 'input_image', image_url: String(p?.image_url?.url || p?.image_url) } : null; return { type: 'input_text', text: String(t === 'text' ? (p.text ?? p.content ?? '') : (p.text ?? '')) }; }
|
||||||
const ageSeconds = (this.age * HB_INTERVAL_MS) / 1000;
|
buildInputForResponses(msgs) { if (!Array.isArray(msgs) || !msgs.length) return ''; if (!msgs.some(m => this.isMultimodalMessage(m))) return msgs.length === 1 ? this.extractTextFromMessage(msgs[0]) : msgs.map(m => ({ role: m.role, content: this.extractTextFromMessage(m) })); return msgs.map(m => ({ role: m.role, content: Array.isArray(m.content) ? m.content.map(p => this.mapContentPartToResponses(p)).filter(Boolean) : [{ type: 'input_text', text: String(m.content || '') }] })); }
|
||||||
this.notify(`Run ${this.rid} ended. Phase: ${this.phase}. Age: ${ageSeconds.toFixed(1)}s.`, 3, ['stop_sign']);
|
mapToGoogleContents(msgs) { const c = msgs.reduce((acc, m) => { const r = m.role === 'assistant' ? 'model' : 'user', p = (Array.isArray(m.content) ? m.content : [{ type: 'text', text: String(m.content ?? '') }]).map(p => { if (p.type === 'text') return { text: p.text || '' }; if (p.type === 'image_url' && p.image_url?.url) { const m = p.image_url.url.match(/^data:(image\/\w+);base64,(.*)$/); if (m) return { inline_data: { mime_type: m[1], data: m[2] } }; } return null; }).filter(Boolean); if (!p.length) return acc; if (acc.length && acc.at(-1).role === r) acc.at(-1).parts.push(...p); else acc.push({ role: r, parts: p }); return acc; }, []); if (c.at(-1)?.role !== 'user') c.pop(); return c; }
|
||||||
await this.state.storage.setAlarm(null).catch(() => {});
|
|
||||||
}
|
|
||||||
|
|
||||||
async Heart() {
|
|
||||||
if (this.phase !== 'running' || !this.hbActive) return this.stopHeartbeat();
|
|
||||||
if (++this.age * HB_INTERVAL_MS >= MAX_RUN_MS) return this.fail(`Run timed out after ${MAX_RUN_MS / 60000} minutes.`);
|
|
||||||
await this.state.storage.setAlarm(Date.now() + HB_INTERVAL_MS).catch(() => {});
|
|
||||||
}
|
|
||||||
|
|
||||||
async alarm() {
|
|
||||||
await this.autopsy();
|
|
||||||
await this.Heart();
|
|
||||||
}
|
|
||||||
|
|
||||||
isMultimodalMessage(m) { return m && Array.isArray(m.content) && m.content.some(p => p?.type && p.type !== 'text' && p.type !== 'input_text'); }
|
|
||||||
|
|
||||||
extractTextFromMessage(m) {
|
|
||||||
if (!m) return '';
|
|
||||||
if (typeof m.content === 'string') return String(m.content);
|
|
||||||
if (!Array.isArray(m.content)) return '';
|
|
||||||
return m.content.filter(p => p && ['text', 'input_text'].includes(p.type)).map(p => String(p.type === 'text' ? (p.text ?? p.content ?? '') : (p.text ?? ''))).join('');
|
|
||||||
}
|
|
||||||
|
|
||||||
mapContentPartToResponses(part) {
|
|
||||||
const type = part?.type || 'text';
|
|
||||||
if (['image_url', 'input_image'].includes(type)) return (part?.image_url?.url || part?.image_url) ? { type: 'input_image', image_url: String(part?.image_url?.url || part?.image_url) } : null;
|
|
||||||
if (['text', 'input_text'].includes(type)) return { type: 'input_text', text: String(type === 'text' ? (part.text ?? part.content ?? '') : (part.text ?? '')) };
|
|
||||||
return { type: 'input_text', text: `[${type}:${part?.file?.filename || 'file'}]` };
|
|
||||||
}
|
|
||||||
|
|
||||||
buildInputForResponses(messages) {
|
|
||||||
if (!Array.isArray(messages) || messages.length === 0) return '';
|
|
||||||
if (!messages.some(m => this.isMultimodalMessage(m))) {
|
|
||||||
if (messages.length === 1) return this.extractTextFromMessage(messages[0]);
|
|
||||||
return messages.map(m => ({ role: m.role, content: this.extractTextFromMessage(m) }));
|
|
||||||
}
|
|
||||||
return messages.map(m => ({ role: m.role, content: Array.isArray(m.content) ? m.content.map(p => this.mapContentPartToResponses(p)).filter(Boolean) : [{ type: 'input_text', text: String(m.content || '') }] }));
|
|
||||||
}
|
|
||||||
|
|
||||||
mapToGoogleContents(messages) {
|
|
||||||
const contents = messages.reduce((acc, m) => {
|
|
||||||
const role = m.role === 'assistant' ? 'model' : 'user';
|
|
||||||
const msgContent = Array.isArray(m.content) ? m.content : [{ type: 'text', text: String(m.content ?? '') }];
|
|
||||||
const parts = msgContent.map(p => {
|
|
||||||
if (p.type === 'text') return { text: p.text || '' };
|
|
||||||
if (p.type === 'image_url' && p.image_url?.url) {
|
|
||||||
const match = p.image_url.url.match(/^data:(image\/\w+);base64,(.*)$/);
|
|
||||||
if (match) return { inline_data: { mime_type: match[1], data: match[2] } };
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}).filter(Boolean);
|
|
||||||
if (!parts.length) return acc;
|
|
||||||
if (acc.length > 0 && acc.at(-1).role === role) acc.at(-1).parts.push(...parts);
|
|
||||||
else acc.push({ role, parts });
|
|
||||||
return acc;
|
|
||||||
}, []);
|
|
||||||
if (contents.at(-1)?.role !== 'user') contents.pop();
|
|
||||||
return contents;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user