mirror of
https://github.com/sune-org/ORP.git
synced 2026-01-13 16:17:59 +00:00
Refactor: Implement row-based storage for RAM efficiency
This commit is contained in:
52
index.js
52
index.js
@@ -55,10 +55,10 @@ export class MyDurableObject {
|
|||||||
this.state = state;
|
this.state = state;
|
||||||
this.env = env;
|
this.env = env;
|
||||||
this.sockets = new Set();
|
this.sockets = new Set();
|
||||||
this.reset();
|
this.resetLocal();
|
||||||
}
|
}
|
||||||
|
|
||||||
reset() {
|
resetLocal() {
|
||||||
this.rid = null;
|
this.rid = null;
|
||||||
this.buffer = [];
|
this.buffer = [];
|
||||||
this.seq = -1;
|
this.seq = -1;
|
||||||
@@ -76,6 +76,11 @@ export class MyDurableObject {
|
|||||||
this.messages = [];
|
this.messages = [];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async resetStorage() {
|
||||||
|
await this.state.storage.deleteAll();
|
||||||
|
this.resetLocal();
|
||||||
|
}
|
||||||
|
|
||||||
corsJSON(obj, status = 200) {
|
corsJSON(obj, status = 200) {
|
||||||
return new Response(JSON.stringify(obj), { status, headers: { 'Content-Type': 'application/json', 'Cache-Control': 'no-store', ...CORS_HEADERS } });
|
return new Response(JSON.stringify(obj), { status, headers: { 'Content-Type': 'application/json', 'Cache-Control': 'no-store', ...CORS_HEADERS } });
|
||||||
}
|
}
|
||||||
@@ -84,13 +89,6 @@ export class MyDurableObject {
|
|||||||
|
|
||||||
bcast(obj) { this.sockets.forEach(ws => this.send(ws, obj)); }
|
bcast(obj) { this.sockets.forEach(ws => this.send(ws, obj)); }
|
||||||
|
|
||||||
getConversationText() {
|
|
||||||
const prompt = (this.messages || []).map(m => `## ${m.role}\n\n${this.extractTextFromMessage(m)}`).join('\n\n---\n\n');
|
|
||||||
const response = this.buffer.map(it => it.text).join('');
|
|
||||||
if (!prompt && !response) return '';
|
|
||||||
return `${prompt}\n\n---\n\n## assistant\n\n${response}`;
|
|
||||||
}
|
|
||||||
|
|
||||||
notify(msg, pri = 3, tags = []) {
|
notify(msg, pri = 3, tags = []) {
|
||||||
if (!this.env.NTFY_URL) return;
|
if (!this.env.NTFY_URL) return;
|
||||||
const headers = { Title: 'Sune ORP', Priority: `${pri}`, Tags: tags.join(',') };
|
const headers = { Title: 'Sune ORP', Priority: `${pri}`, Tags: tags.join(',') };
|
||||||
@@ -106,24 +104,31 @@ export class MyDurableObject {
|
|||||||
const snap = await this.state.storage.get('run').catch(() => null);
|
const snap = await this.state.storage.get('run').catch(() => null);
|
||||||
|
|
||||||
if (!snap || (Date.now() - (snap.savedAt || 0) >= TTL_MS)) {
|
if (!snap || (Date.now() - (snap.savedAt || 0) >= TTL_MS)) {
|
||||||
if (snap) await this.state.storage.delete('run').catch(() => {});
|
if (snap) await this.state.storage.deleteAll().catch(() => {});
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
this.rid = snap.rid || null;
|
this.rid = snap.rid || null;
|
||||||
this.buffer = Array.isArray(snap.buffer) ? snap.buffer : [];
|
|
||||||
this.seq = Number.isFinite(+snap.seq) ? +snap.seq : -1;
|
this.seq = Number.isFinite(+snap.seq) ? +snap.seq : -1;
|
||||||
this.age = snap.age || 0;
|
this.age = snap.age || 0;
|
||||||
this.phase = snap.phase || 'done';
|
this.phase = snap.phase || 'done';
|
||||||
this.error = snap.error || null;
|
this.error = snap.error || null;
|
||||||
this.messages = Array.isArray(snap.messages) ? snap.messages : [];
|
|
||||||
|
// Load prompt and deltas separately to keep memory footprint low during active runs
|
||||||
|
const [msgs, deltaMap] = await Promise.all([
|
||||||
|
this.state.storage.get('prompt').catch(() => []),
|
||||||
|
this.state.storage.list({ prefix: 'delta:' }).catch(() => new Map())
|
||||||
|
]);
|
||||||
|
|
||||||
|
this.messages = Array.isArray(msgs) ? msgs : [];
|
||||||
|
this.buffer = Array.from(deltaMap.values()).sort((a, b) => a.seq - b.seq);
|
||||||
this.pending = '';
|
this.pending = '';
|
||||||
this.pendingImages = [];
|
this.pendingImages = [];
|
||||||
|
|
||||||
if (this.phase === 'running') {
|
if (this.phase === 'running') {
|
||||||
this.phase = 'evicted';
|
this.phase = 'evicted';
|
||||||
this.error = 'The run was interrupted due to system eviction.';
|
this.error = 'The run was interrupted due to system eviction.';
|
||||||
this.saveSnapshot();
|
await this.saveSnapshot();
|
||||||
this.notify(`Run ${this.rid} evicted`, 4, ['warning']);
|
this.notify(`Run ${this.rid} evicted`, 4, ['warning']);
|
||||||
await this.stopHeartbeat();
|
await this.stopHeartbeat();
|
||||||
}
|
}
|
||||||
@@ -131,7 +136,14 @@ export class MyDurableObject {
|
|||||||
|
|
||||||
saveSnapshot() {
|
saveSnapshot() {
|
||||||
this.lastSavedAt = Date.now();
|
this.lastSavedAt = Date.now();
|
||||||
const snapshot = { rid: this.rid, buffer: this.buffer, seq: this.seq, age: this.age, phase: this.phase, error: this.error, savedAt: this.lastSavedAt, messages: this.messages };
|
const snapshot = {
|
||||||
|
rid: this.rid,
|
||||||
|
seq: this.seq,
|
||||||
|
age: this.age,
|
||||||
|
phase: this.phase,
|
||||||
|
error: this.error,
|
||||||
|
savedAt: this.lastSavedAt
|
||||||
|
};
|
||||||
return this.state.storage.put('run', snapshot).catch(() => {});
|
return this.state.storage.put('run', snapshot).catch(() => {});
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -150,6 +162,10 @@ export class MyDurableObject {
|
|||||||
this.buffer.push(item);
|
this.buffer.push(item);
|
||||||
this.bcast({ type: 'delta', seq: this.seq, text: this.pending, images: item.images });
|
this.bcast({ type: 'delta', seq: this.seq, text: this.pending, images: item.images });
|
||||||
|
|
||||||
|
// Save individual delta to storage (RAM efficient)
|
||||||
|
const key = `delta:${String(item.seq).padStart(10, '0')}`;
|
||||||
|
this.state.storage.put(key, item).catch(() => {});
|
||||||
|
|
||||||
this.pending = '';
|
this.pending = '';
|
||||||
this.pendingImages = [];
|
this.pendingImages = [];
|
||||||
this.lastFlushedAt = Date.now();
|
this.lastFlushedAt = Date.now();
|
||||||
@@ -208,12 +224,16 @@ export class MyDurableObject {
|
|||||||
|
|
||||||
if (rid === this.rid && this.phase !== 'idle') return this.replay(ws, Number.isFinite(+after) ? +after : -1);
|
if (rid === this.rid && this.phase !== 'idle') return this.replay(ws, Number.isFinite(+after) ? +after : -1);
|
||||||
|
|
||||||
this.reset();
|
await this.resetStorage();
|
||||||
this.rid = rid;
|
this.rid = rid;
|
||||||
this.phase = 'running';
|
this.phase = 'running';
|
||||||
this.controller = new AbortController();
|
this.controller = new AbortController();
|
||||||
this.messages = body.messages;
|
this.messages = body.messages;
|
||||||
await this.saveSnapshot();
|
|
||||||
|
await Promise.all([
|
||||||
|
this.state.storage.put('prompt', this.messages),
|
||||||
|
this.saveSnapshot()
|
||||||
|
]);
|
||||||
|
|
||||||
this.state.waitUntil(this.startHeartbeat());
|
this.state.waitUntil(this.startHeartbeat());
|
||||||
this.state.waitUntil(this.stream({ apiKey, body, provider: provider || 'openrouter' }));
|
this.state.waitUntil(this.stream({ apiKey, body, provider: provider || 'openrouter' }));
|
||||||
|
|||||||
Reference in New Issue
Block a user