diff --git a/public/sw.js b/public/sw.js index 420ff9f..eab7faa 100644 --- a/public/sw.js +++ b/public/sw.js @@ -1,42 +1,101 @@ // /sw.js -// Enhanced: tracks whether streaming fetches are being teed and reports status on ping. -// Drop this at the root: /sw.js +// Service worker that tees streaming responses and continuously overwrites the latest +// thread in localForage (key: 'threads_v1') with the accumulating assistant text. +// Keeps ping/pong and PING_STATUS support and broadcasts live events. +// +// Requirements: place this at root (/sw.js). No changes to index required. -const TARGET_SUBSTRING = 'openrouter.ai/api/v1/chat/completions'; // adjust if different -const STATE_TTL_MS = 24 * 60 * 60 * 1000; +importScripts('https://cdn.jsdelivr.net/npm/localforage@1.10.0/dist/localforage.min.js'); -const state = { - totalIntercepted: 0, - activeStreams: {}, // streamId -> { url, startedAt, bytes, lastProgressAt, status } - lastStreamSummary: null // summary of last finished stream -}; +const TARGET_SUBSTRING = 'openrouter.ai/api/v1/chat/completions'; // change if needed +const THREADS_KEY = 'threads_v1'; +const SAVE_BYTES_THRESHOLD = 8 * 1024; // flush every ~8KB of new text +const SAVE_TIME_THRESHOLD = 1000; // or at least every 1s +const BROADCAST_THROTTLE_MS = 700; // throttle progress broadcasts -const gid = () => Math.random().toString(36).slice(2, 9) + '-' + Date.now().toString(36); +/* --- Utilities --- */ +const gid = () => Math.random().toString(36).slice(2,9) + '-' + Date.now().toString(36); +const now = () => Date.now(); -async function broadcast(msg) { +async function readThreads() { try { - const clientsList = await self.clients.matchAll({ includeUncontrolled: true, type: 'window' }); - for (const c of clientsList) { - try { c.postMessage(msg); } catch (e) { /* ignore client errors */ } - } - } catch(e) { /* ignore */ } + const v = await localforage.getItem(THREADS_KEY); + return Array.isArray(v) ? v : []; + } catch (e) { + console.error('sw: readThreads error', e); + return []; + } } - -function cleanupState() { - const now = Date.now(); - for (const k of Object.keys(state.activeStreams)) { - if ((now - (state.activeStreams[k].lastProgressAt || state.activeStreams[k].startedAt)) > STATE_TTL_MS) { - delete state.activeStreams[k]; - } +async function writeThreads(arr) { + try { + await localforage.setItem(THREADS_KEY, arr); + } catch (e) { + console.error('sw: writeThreads error', e); + throw e; } } -// install/activate -self.addEventListener('install', (e) => { self.skipWaiting(); }); -self.addEventListener('activate', (e) => { e.waitUntil(self.clients.claim()); }); +/* pick last thread heuristic: newest updatedAt, fallback to first */ +function pickLastThread(threads) { + if (!threads || threads.length === 0) return null; + let sorted = [...threads].sort((a,b) => (b.updatedAt||0) - (a.updatedAt||0)); + return sorted[0]; +} -// fetch: attempt to tee target streaming responses and track progress -self.addEventListener('fetch', (event) => { +/* Upsert assistant message in a thread by sw_streamId (overwrite content) */ +function upsertAssistantInThreadObj(threadObj, streamId, text) { + threadObj.updatedAt = now(); + // look for existing message with sw_streamId (search from end) + for (let i = threadObj.messages.length - 1; i >= 0; i--) { + const m = threadObj.messages[i]; + if (m && m.sw_streamId === streamId) { + m.content = text; + m.contentParts = [{ type: 'text', text }]; + m.updatedAt = now(); + m._sw_savedAt = now(); + return threadObj; + } + } + // not found: append a new assistant message + const msg = { + id: 'swmsg-' + gid(), + role: 'assistant', + content: text, + contentParts: [{ type: 'text', text }], + kind: 'assistant', + sw_saved: true, + sw_streamId: streamId, + createdAt: now(), + updatedAt: now(), + _sw_savedAt: now() + }; + threadObj.messages.push(msg); + return threadObj; +} + +/* Broadcast helpers */ +async function broadcast(msg) { + try { + const cl = await self.clients.matchAll({ includeUncontrolled: true, type: 'window' }); + for (const c of cl) { + try { c.postMessage(msg); } catch (e) { /* ignore */ } + } + } catch (e) { /* ignore */ } +} + +/* --- Worker lifecycle --- */ +self.addEventListener('install', e => self.skipWaiting()); +self.addEventListener('activate', e => e.waitUntil(self.clients.claim())); + +/* --- Stream tracking state (in-memory) --- */ +const state = { + totalIntercepted: 0, + activeStreams: {}, // streamId => meta + lastStream: null +}; + +/* --- Main fetch handler: tee + continuously overwrite latest thread --- */ +self.addEventListener('fetch', event => { try { const url = String(event.request.url || ''); if (!url.includes(TARGET_SUBSTRING)) return; // not our target @@ -44,117 +103,148 @@ self.addEventListener('fetch', (event) => { event.respondWith((async () => { const upstream = await fetch(event.request); - // if there's no body (or not a readable stream), just forward + // nothing to do if no stream body if (!upstream || !upstream.body) return upstream; - // create a stream id and register active stream const streamId = 'sw-' + gid(); - const meta = { url, startedAt: Date.now(), bytes: 0, lastProgressAt: Date.now(), status: 'started' }; + const meta = { url, startedAt: now(), bytes: 0, status: 'started' }; state.totalIntercepted = (state.totalIntercepted || 0) + 1; state.activeStreams[streamId] = meta; - // notify clients broadcast({ type: 'sw-intercept-start', streamId, meta }); - // tee the body: one goes to client, one we consume in SW + // tee the stream const [clientStream, swStream] = upstream.body.tee(); + // background saving task (continually overwrite latest thread) const savePromise = (async () => { - try { - const reader = swStream.getReader(); - const decoder = new TextDecoder('utf-8'); - let decodedSoFar = ''; - let chunkCount = 0; - let lastBroadcastMs = 0; - const BROADCAST_THROTTLE_MS = 800; - const BROADCAST_BYTES = 16 * 1024; // also broadcast every ~16KB + const reader = swStream.getReader(); + const decoder = new TextDecoder('utf-8'); + let accumulated = ''; // full text accumulated for this stream + let sinceLastSaveBytes = 0; + let lastSaveAt = 0; + let lastBroadcastAt = 0; - while (true) { - const { value, done } = await reader.read(); - if (done) break; - chunkCount++; - // count bytes - const bytes = value ? value.byteLength || 0 : 0; - meta.bytes += bytes; - meta.lastProgressAt = Date.now(); - - // append decoded snippet for quick preview - try { decodedSoFar += decoder.decode(value, { stream: true }); } catch (e) { /* ignore decode */ } - - // occasional broadcasts (throttle) - const now = Date.now(); - if (now - lastBroadcastMs > BROADCAST_THROTTLE_MS || meta.bytes >= (meta._lastBroadcastBytes || 0) + BROADCAST_BYTES) { - meta._lastBroadcastBytes = meta.bytes; - lastBroadcastMs = now; + // Helper to save accumulated text into last thread + async function flushToLastThread(force = false) { + try { + const nowMs = now(); + if (!force && sinceLastSaveBytes < SAVE_BYTES_THRESHOLD && (nowMs - lastSaveAt) < SAVE_TIME_THRESHOLD) return; + // read latest threads + const threads = await readThreads(); + let thread = pickLastThread(threads); + const createdAt = nowMs; + if (!thread) { + // create fallback thread if none exists + thread = { + id: 'sw-thread-' + gid(), + title: 'Missed while backgrounded', + pinned: false, + updatedAt: createdAt, + messages: [] + }; + threads.unshift(thread); + } + // upsert message + upsertAssistantInThreadObj(thread, streamId, accumulated); + // write back (this will overwrite whole array, which matches page reading expectation) + await writeThreads(threads); + sinceLastSaveBytes = 0; + lastSaveAt = nowMs; + // broadcast progress summary + const now2 = Date.now(); + if (now2 - lastBroadcastAt > BROADCAST_THROTTLE_MS) { + lastBroadcastAt = now2; broadcast({ type: 'sw-intercept-progress', streamId, - meta: { bytes: meta.bytes, lastProgressAt: meta.lastProgressAt, snippet: decodedSoFar.slice(-1024) } + meta: { bytes: meta.bytes, savedAt: lastSaveAt, snippet: accumulated.slice(-1024) } }); } + } catch (err) { + console.error('sw: flushToLastThread error', err); + } + } + + try { + while (true) { + const { value, done } = await reader.read(); + if (done) break; + // value is Uint8Array (may be chunked). decode and append + let chunkText = ''; + try { + chunkText = decoder.decode(value, { stream: true }); + } catch (e) { + // fallback: best-effort text conversion + try { chunkText = String(value); } catch (ee) { chunkText = ''; } + } + accumulated += chunkText; + const bytes = value ? (value.byteLength || 0) : chunkText.length; + meta.bytes += bytes; + meta.lastProgressAt = now(); + + // accumulate for thresholded saves + sinceLastSaveBytes += bytes; + // flush condition: size or time + await flushToLastThread(false); } - // finalize + // final flush and finalize + await flushToLastThread(true); + meta.status = 'finished'; - meta.endedAt = Date.now(); - state.lastStreamSummary = { - streamId, url, startedAt: meta.startedAt, endedAt: meta.endedAt, totalBytes: meta.bytes - }; - // remove from active + meta.endedAt = now(); + state.lastStream = { streamId, url: meta.url, startedAt: meta.startedAt, endedAt: meta.endedAt, totalBytes: meta.bytes }; delete state.activeStreams[streamId]; broadcast({ type: 'sw-intercept-end', streamId, meta: { totalBytes: meta.bytes, endedAt: meta.endedAt } }); } catch (err) { meta.status = 'error'; meta.error = String(err && err.message ? err.message : err); - meta.lastProgressAt = Date.now(); delete state.activeStreams[streamId]; broadcast({ type: 'sw-intercept-error', streamId, meta: { error: meta.error } }); - console.error('sw: stream save error', err); + console.error('sw: savePromise error', err); } })(); - // keep the SW alive while we process the swStream + // keep SW alive while saving event.waitUntil(savePromise); - // forward the response to the page using the clientStream + // respond to page return new Response(clientStream, { status: upstream.status, statusText: upstream.statusText, headers: upstream.headers }); })()); - } catch (e) { - console.error('sw: fetch handler error', e); - } finally { - cleanupState(); + } catch (err) { + console.error('sw: fetch handler error', err); } }); -// message handler: support PING (simple) and PING_STATUS (detailed) -self.addEventListener('message', (event) => { +/* --- Messaging: PING / PING_STATUS / GET_STATE --- */ +self.addEventListener('message', event => { const data = event.data || {}; try { + // simple ping (original behavior) if (data && data.type === 'PING') { - // original ping behavior: support MessageChannel if (event.ports && event.ports[0]) { - event.ports[0].postMessage({ type: 'PONG', ts: Date.now(), ok: true }); + event.ports[0].postMessage({ type: 'PONG', ts: now(), ok: true }); } else if (event.source && typeof event.source.postMessage === 'function') { - try { event.source.postMessage({ type: 'PONG', ts: Date.now(), ok: true }); } catch(e) {} + try { event.source.postMessage({ type: 'PONG', ts: now(), ok: true }); } catch(e) {} } else { - broadcast({ type: 'PONG', ts: Date.now(), ok: true }); + broadcast({ type: 'PONG', ts: now(), ok: true }); } return; } + // status ping that returns internal state if (data && data.type === 'PING_STATUS') { - // return current SW status: activeStreams summary + lastStreamSummary + totalIntercepted const reply = { type: 'PONG_STATUS', - ts: Date.now(), + ts: now(), totalIntercepted: state.totalIntercepted || 0, - activeStreams: Object.entries(state.activeStreams).map(([id, m]) => ({ streamId: id, url: m.url, bytes: m.bytes, status: m.status, startedAt: m.startedAt })), - lastStreamSummary: state.lastStreamSummary || null + activeStreams: Object.entries(state.activeStreams).map(([id,m]) => ({ streamId: id, url: m.url, bytes: m.bytes, status: m.status, startedAt: m.startedAt })), + lastStream: state.lastStream || null }; - // reply on MessageChannel port if present, else try source, else broadcast if (event.ports && event.ports[0]) { event.ports[0].postMessage(reply); } else if (event.source && typeof event.source.postMessage === 'function') { @@ -165,11 +255,20 @@ self.addEventListener('message', (event) => { return; } - // support a request for the sw to return its current state (no port) - if (data && data.type === 'GET_STATE') { - const snapshot = { totalIntercepted: state.totalIntercepted || 0, activeCount: Object.keys(state.activeStreams).length, last: state.lastStreamSummary || null }; - if (event.ports && event.ports[0]) event.ports[0].postMessage({ type:'STATE', snapshot }); - else if (event.source && event.source.postMessage) event.source.postMessage({ type:'STATE', snapshot }); + // optional: client requests list of SW-saved streams/messages + if (data && data.type === 'LIST_SW_SAVED') { + (async () => { + const threads = await readThreads(); + const found = []; + for (const t of (threads || [])) { + for (const m of (t.messages || [])) { + if (m && m.sw_streamId) found.push({ threadId: t.id, threadTitle: t.title, messageId: m.id, sw_streamId: m.sw_streamId, snippet: (m.content||'').slice(0,200), updatedAt: m.updatedAt }); + } + } + if (event.ports && event.ports[0]) event.ports[0].postMessage({ type: 'LIST_SW_SAVED_RESULT', streams: found }); + else if (event.source && typeof event.source.postMessage === 'function') event.source.postMessage({ type: 'LIST_SW_SAVED_RESULT', streams: found }); + else broadcast({ type: 'LIST_SW_SAVED_RESULT', streams: found }); + })(); return; } } catch (e) {