Refactor: Use official Anthropic SDK for Claude streaming

This commit is contained in:
2025-10-03 16:41:44 -07:00
parent ec22f61c90
commit b346389d85

View File

@@ -1,4 +1,5 @@
import OpenAI from 'openai'; import OpenAI from 'openai';
import Anthropic from '@anthropic-ai/sdk';
const TTL_MS = 20 * 60 * 1000; const TTL_MS = 20 * 60 * 1000;
const BATCH_MS = 800; const BATCH_MS = 800;
@@ -221,6 +222,7 @@ export class MyDurableObject {
} }
async streamClaude({ apiKey, body }) { async streamClaude({ apiKey, body }) {
const client = new Anthropic({ apiKey });
const system = body.messages const system = body.messages
.filter(m => m.role === 'system') .filter(m => m.role === 'system')
.map(m => this.extractTextFromMessage(m)) .map(m => this.extractTextFromMessage(m))
@@ -238,35 +240,15 @@ export class MyDurableObject {
}).filter(Boolean) }).filter(Boolean)
})).filter(m => m.content.length), })).filter(m => m.content.length),
max_tokens: body.max_tokens || 64000, max_tokens: body.max_tokens || 64000,
stream: true,
}; };
if (system) payload.system = system; if (system) payload.system = system;
if (Number.isFinite(+body.temperature)) payload.temperature = +body.temperature; if (Number.isFinite(+body.temperature)) payload.temperature = +body.temperature;
if (Number.isFinite(+body.top_p)) payload.top_p = +body.top_p; if (Number.isFinite(+body.top_p)) payload.top_p = +body.top_p;
if (body.reasoning?.enabled) payload.extended_thinking = { enabled: true, ...(body.reasoning.budget && { max_thinking_tokens: body.reasoning.budget }) }; if (body.reasoning?.enabled) payload.extended_thinking = { enabled: true, ...(body.reasoning.budget && { max_thinking_tokens: body.reasoning.budget }) };
const resp = await fetch('https://api.anthropic.com/v1/messages', { method: 'POST', headers: { 'Content-Type': 'application/json', 'x-api-key': apiKey, 'anthropic-version': '2023-06-01' }, body: JSON.stringify(payload), signal: this.controller.signal }); const stream = client.messages.stream(payload);
if (!resp.ok) throw new Error(`Claude API error: ${resp.status} ${await resp.text()}`); stream.on('text', text => { if (this.phase === 'running') this.queueDelta(text); });
await stream.finalMessage();
const reader = resp.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (this.phase === 'running') {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop() || '';
for (const line of lines) {
if (!line.startsWith('data: ')) continue;
const data = line.slice(6).trim();
if (data === '[DONE]') break;
try {
const event = JSON.parse(data);
if (event.type === 'content_block_delta' && event.delta?.text) this.queueDelta(event.delta.text);
} catch {}
}
}
} }
async streamGoogle({ apiKey, body }) { async streamGoogle({ apiKey, body }) {