From 25a522c8101481927c55734e4f826f8d9bec2636 Mon Sep 17 00:00:00 2001 From: multipleof4 Date: Mon, 29 Dec 2025 05:55:25 -0800 Subject: [PATCH] Refactor: Implement row-based storage for RAM efficiency --- index.js | 52 ++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 36 insertions(+), 16 deletions(-) diff --git a/index.js b/index.js index 9d98420..08d2240 100644 --- a/index.js +++ b/index.js @@ -55,10 +55,10 @@ export class MyDurableObject { this.state = state; this.env = env; this.sockets = new Set(); - this.reset(); + this.resetLocal(); } - reset() { + resetLocal() { this.rid = null; this.buffer = []; this.seq = -1; @@ -76,6 +76,11 @@ export class MyDurableObject { this.messages = []; } + async resetStorage() { + await this.state.storage.deleteAll(); + this.resetLocal(); + } + corsJSON(obj, status = 200) { return new Response(JSON.stringify(obj), { status, headers: { 'Content-Type': 'application/json', 'Cache-Control': 'no-store', ...CORS_HEADERS } }); } @@ -84,13 +89,6 @@ export class MyDurableObject { bcast(obj) { this.sockets.forEach(ws => this.send(ws, obj)); } - getConversationText() { - const prompt = (this.messages || []).map(m => `## ${m.role}\n\n${this.extractTextFromMessage(m)}`).join('\n\n---\n\n'); - const response = this.buffer.map(it => it.text).join(''); - if (!prompt && !response) return ''; - return `${prompt}\n\n---\n\n## assistant\n\n${response}`; - } - notify(msg, pri = 3, tags = []) { if (!this.env.NTFY_URL) return; const headers = { Title: 'Sune ORP', Priority: `${pri}`, Tags: tags.join(',') }; @@ -106,24 +104,31 @@ export class MyDurableObject { const snap = await this.state.storage.get('run').catch(() => null); if (!snap || (Date.now() - (snap.savedAt || 0) >= TTL_MS)) { - if (snap) await this.state.storage.delete('run').catch(() => {}); + if (snap) await this.state.storage.deleteAll().catch(() => {}); return; } this.rid = snap.rid || null; - this.buffer = Array.isArray(snap.buffer) ? snap.buffer : []; this.seq = Number.isFinite(+snap.seq) ? +snap.seq : -1; this.age = snap.age || 0; this.phase = snap.phase || 'done'; this.error = snap.error || null; - this.messages = Array.isArray(snap.messages) ? snap.messages : []; + + // Load prompt and deltas separately to keep memory footprint low during active runs + const [msgs, deltaMap] = await Promise.all([ + this.state.storage.get('prompt').catch(() => []), + this.state.storage.list({ prefix: 'delta:' }).catch(() => new Map()) + ]); + + this.messages = Array.isArray(msgs) ? msgs : []; + this.buffer = Array.from(deltaMap.values()).sort((a, b) => a.seq - b.seq); this.pending = ''; this.pendingImages = []; if (this.phase === 'running') { this.phase = 'evicted'; this.error = 'The run was interrupted due to system eviction.'; - this.saveSnapshot(); + await this.saveSnapshot(); this.notify(`Run ${this.rid} evicted`, 4, ['warning']); await this.stopHeartbeat(); } @@ -131,7 +136,14 @@ export class MyDurableObject { saveSnapshot() { this.lastSavedAt = Date.now(); - const snapshot = { rid: this.rid, buffer: this.buffer, seq: this.seq, age: this.age, phase: this.phase, error: this.error, savedAt: this.lastSavedAt, messages: this.messages }; + const snapshot = { + rid: this.rid, + seq: this.seq, + age: this.age, + phase: this.phase, + error: this.error, + savedAt: this.lastSavedAt + }; return this.state.storage.put('run', snapshot).catch(() => {}); } @@ -150,6 +162,10 @@ export class MyDurableObject { this.buffer.push(item); this.bcast({ type: 'delta', seq: this.seq, text: this.pending, images: item.images }); + // Save individual delta to storage (RAM efficient) + const key = `delta:${String(item.seq).padStart(10, '0')}`; + this.state.storage.put(key, item).catch(() => {}); + this.pending = ''; this.pendingImages = []; this.lastFlushedAt = Date.now(); @@ -208,12 +224,16 @@ export class MyDurableObject { if (rid === this.rid && this.phase !== 'idle') return this.replay(ws, Number.isFinite(+after) ? +after : -1); - this.reset(); + await this.resetStorage(); this.rid = rid; this.phase = 'running'; this.controller = new AbortController(); this.messages = body.messages; - await this.saveSnapshot(); + + await Promise.all([ + this.state.storage.put('prompt', this.messages), + this.saveSnapshot() + ]); this.state.waitUntil(this.startHeartbeat()); this.state.waitUntil(this.stream({ apiKey, body, provider: provider || 'openrouter' }));