diff --git a/public/sw.js b/public/sw.js index f92f830..28f700a 100644 --- a/public/sw.js +++ b/public/sw.js @@ -1,84 +1,50 @@ -// sw.js -// Drop-in service worker to tee streaming chat responses and write progress to localForage (threads_v1). -// - Adjust TARGET_SUBSTRING if your stream URL differs. -// - The SW writes messages tagged with `sw_streamId` to avoid clobbering unrelated messages. - +// sw.js — debug-friendly service worker +// Replace your existing worker with this (or merge the relevant parts). importScripts('https://cdn.jsdelivr.net/npm/localforage@1.10.0/dist/localforage.min.js'); -const TARGET_SUBSTRING = 'openrouter.ai/api/v1/chat/completions'; // change if needed -const THREADS_KEY = 'threads_v1'; // matches your index -const BUFFER_SAVE_BYTES = 32 * 1024; // save every ~32KB of new data (tune) -const SAVE_INTERVAL_MS = 2000; // or at least every 2s even if buffer < size +const THREADS_KEY = 'threads_v1'; +const TARGET_SUBSTRING = 'openrouter.ai/api/v1/chat/completions'; // keep your real target here +const LOG = (...a) => { console.log('[sw-debug]', ...a) }; -// utils -const gid = () => Math.random().toString(36).slice(2, 9) + '-' + Date.now().toString(36); - -function now() { return Date.now(); } +const gid = () => Math.random().toString(36).slice(2,9) + '-' + Date.now().toString(36); +// ---------- small idb helpers using localforage ---------- async function readThreads() { - try { - const v = await localforage.getItem(THREADS_KEY); - return Array.isArray(v) ? v : []; - } catch (e) { - console.error('sw: idb read error', e); - return []; - } + try { const v = await localforage.getItem(THREADS_KEY); return Array.isArray(v) ? v : []; } + catch (e) { LOG('readThreads err', e); return []; } } async function writeThreads(arr) { - try { - await localforage.setItem(THREADS_KEY, arr); - } catch (e) { - console.error('sw: idb write error', e); - throw e; - } + try { await localforage.setItem(THREADS_KEY, arr); } + catch (e) { LOG('writeThreads err', e); throw e; } } - -// choose a thread to attach progress to function pickThread(threads) { - if (!threads || threads.length === 0) return null; + if (!threads || !threads.length) return null; // prefer newest updatedAt threads.sort((a,b) => (b.updatedAt||0) - (a.updatedAt||0)); return threads[0]; } - -// update or append assistant message for streamId async function upsertStreamMessage(streamId, text, meta = {}) { - // read-modify-write const threads = await readThreads(); let th = pickThread(threads); - const createdNow = now(); - + const now = Date.now(); if (!th) { - // create a new thread if none found - th = { - id: 'sw-' + gid(), - title: 'Missed while backgrounded', - pinned: false, - updatedAt: createdNow, - messages: [] - }; + th = { id: 'sw-' + gid(), title: 'Missed while backgrounded', pinned: false, updatedAt: now, messages: [] }; threads.unshift(th); } - - // look for existing message with sw_streamId let msgIndex = -1; for (let i = th.messages.length - 1; i >= 0; i--) { - const m = th.messages[i]; - if (m && m.sw_streamId === streamId) { msgIndex = i; break; } + if (th.messages[i] && th.messages[i].sw_streamId === streamId) { msgIndex = i; break; } } - const contentParts = [{ type: 'text', text }]; if (msgIndex >= 0) { - // update message content - const existing = th.messages[msgIndex]; - existing.content = text; - existing.contentParts = contentParts; - existing.updatedAt = createdNow; - existing._sw_lastSave = createdNow; - existing._sw_meta = Object.assign({}, existing._sw_meta || {}, meta); + const ex = th.messages[msgIndex]; + ex.content = text; + ex.contentParts = contentParts; + ex.updatedAt = now; + ex._sw_lastSave = now; + ex._sw_meta = Object.assign({}, ex._sw_meta || {}, meta); } else { - // append new assistant message (tag with sw_streamId) - const msg = { + th.messages.push({ id: 'swmsg-' + gid(), role: 'assistant', content: text, @@ -86,190 +52,194 @@ async function upsertStreamMessage(streamId, text, meta = {}) { kind: 'assistant', sw_saved: true, sw_streamId: streamId, - createdAt: createdNow, - updatedAt: createdNow, - _sw_meta: Object.assign({}, meta) - }; - th.messages.push(msg); + createdAt: now, + updatedAt: now, + _sw_meta: meta + }); } - th.updatedAt = createdNow; - - // write back + th.updatedAt = now; await writeThreads(threads); return { threadId: th.id }; } - async function finalizeStream(streamId, meta = {}) { - // mark the message as complete; put complete flag in _sw_meta const threads = await readThreads(); const th = pickThread(threads); if (!th) return; for (let i = th.messages.length - 1; i >= 0; i--) { const m = th.messages[i]; if (m && m.sw_streamId === streamId) { - m._sw_meta = Object.assign({}, m._sw_meta || {}, meta, { completeAt: now() }); - m.updatedAt = now(); - th.updatedAt = now(); + m._sw_meta = Object.assign({}, m._sw_meta || {}, meta, { completeAt: Date.now() }); + m.updatedAt = Date.now(); + th.updatedAt = Date.now(); break; } } await writeThreads(threads); - // inform clients + // notify clients const clientsList = await self.clients.matchAll({ includeUncontrolled: true, type: 'window' }); - clientsList.forEach(c => { - try { c.postMessage({ type: 'stream-saved', streamId, meta }); } catch(e){ } - }); + for (const c of clientsList) { + try { c.postMessage({ type: 'stream-saved', streamId, meta }); } catch(e) {} + } } -// notify utility +// ---------- helpers ---------- +async function listSwStreams() { + const threads = await readThreads(); + const found = []; + for (const t of (threads || [])) { + for (const m of (t.messages || [])) { + if (m && m.sw_streamId) found.push({ + threadId: t.id, + threadTitle: t.title, + messageId: m.id, + sw_streamId: m.sw_streamId, + snippet: (m.content||'').slice(0,200), + updatedAt: m.updatedAt + }); + } + } + return found; +} async function notifyClients(msg) { try { const list = await self.clients.matchAll({ includeUncontrolled: true, type: 'window' }); for (const c of list) { - try { c.postMessage(msg); } catch (e) {} + try { c.postMessage(msg); } catch(e) {} } - } catch (e) { - /* ignore */ - } + } catch(e) { LOG('notifyClients err', e); } } -// fetch handler: tee and save +// ---------- install/activate ---------- +self.addEventListener('install', e => { self.skipWaiting(); }); +self.addEventListener('activate', e => { e.waitUntil(self.clients.claim()); }); + +// ---------- fetch: keep existing behavior for your real target, plus a debug probe ---------- self.addEventListener('fetch', event => { - try { - const url = event.request.url || ''; - if (!url.includes(TARGET_SUBSTRING)) { - return; // not our target; let it pass through - } - + const reqUrl = event.request.url || ''; + // debug probe route — the sune will call /__sw_tee_test to force a tee test: + if (new URL(reqUrl).pathname === '/__sw_tee_test') { event.respondWith((async () => { - // perform the upstream fetch - const upstream = await fetch(event.request); + const probeId = new URL(reqUrl).searchParams.get('probeId') || gid(); + LOG('Received tee-test probe', probeId); - // if no streaming body, return upstream - if (!upstream || !upstream.body) return upstream; + // create a simulated streaming source inside the SW + const encoder = new TextEncoder(); + const stream = new ReadableStream({ + start(ctrl) { + let count = 0; + const id = setInterval(() => { + count++; + const chunk = `probe(${probeId}) chunk ${count}\n`; + ctrl.enqueue(encoder.encode(chunk)); + // after 6 chunks stop + if (count >= 6) { clearInterval(id); ctrl.close(); } + }, 300); + } + }); - const streamId = 'swstream-' + gid(); - const headers = new Headers(upstream.headers); + // tee the generated stream: one branch to client, one branch for SW consumption/save + const [clientBranch, swBranch] = stream.tee(); - // tee so one stream goes to client, one consumed by SW - const [clientStream, swStream] = upstream.body.tee(); - - // background save logic: - const savePromise = (async () => { + // in SW consume swBranch and write to localforage + (async () => { try { - const reader = swStream.getReader(); + const reader = swBranch.getReader(); const dec = new TextDecoder('utf-8'); - let bufferText = ''; - let bufferedBytes = 0; - let lastSaveAt = 0; - - const saveIfNeeded = async (force = false) => { - const nowMs = Date.now(); - if (!force && bufferedBytes < BUFFER_SAVE_BYTES && (nowMs - lastSaveAt) < SAVE_INTERVAL_MS) return; - // upsert into threads - try { - await upsertStreamMessage(streamId, bufferText, { partialBytes: bufferedBytes, savedAt: Date.now() }); - lastSaveAt = nowMs; - } catch (e) { - console.error('sw: upsert save error', e); - } - }; - + let collected = ''; + let bytes = 0; while (true) { const { value, done } = await reader.read(); if (done) break; - // value is Uint8Array; decode incrementally const chunkText = dec.decode(value, { stream: true }); - bufferText += chunkText; - bufferedBytes += (value && value.byteLength) ? value.byteLength : chunkText.length; - // try saving periodically - await saveIfNeeded(false); + collected += chunkText; + bytes += (value && value.byteLength) ? value.byteLength : chunkText.length; + // periodic save: write the collected so far into threads_v1 under probe stream id + await upsertStreamMessage('probe-' + probeId, collected, { probeId, bytesSoFar: bytes }); + // notify clients incrementally + await notifyClients({ type: 'tee-probe-chunk', probeId, bytes, snippet: chunkText.slice(0,200) }); } - - // final save + finalize - await saveIfNeeded(true); - await finalizeStream(streamId, { totalBytes: bufferedBytes }); + await finalizeStream('probe-' + probeId, { totalBytes: bytes, probeId }); + LOG('tee-probe: save complete', probeId, 'bytes', bytes); + await notifyClients({ type: 'tee-probe-complete', probeId, totalBytes: bytes }); } catch (err) { - console.error('sw: error saving stream', err); - // try to mark failure - try { await finalizeStream(streamId, { error: String(err) }); } catch(e){ } + LOG('tee-probe save error', err); + await notifyClients({ type: 'tee-probe-error', probeId, error: String(err) }); } })(); - // keep worker alive while saving - event.waitUntil(savePromise); - - // return response to client using the clientStream - return new Response(clientStream, { - status: upstream.status, - statusText: upstream.statusText, - headers - }); + // return the client branch as a streaming response + return new Response(clientBranch, { status: 200, headers: { 'Content-Type': 'text/plain; charset=utf-8' } }); })()); - } catch (err) { - // if anything goes wrong, let the request fallback - console.error('sw: fetch handler error', err); + return; } + + // your production streaming intercept — minimal example: + if (reqUrl.includes(TARGET_SUBSTRING)) { + // let your existing logic run — simple pass-through or tee logic you already had + // For demonstration, do a straight throughfetch (or optionally tee/save like earlier example) + event.respondWith(fetch(event.request)); + return; + } + + // not handled by SW: let it go }); -// ping/pong: support both port reply and broadcast reply +// ---------- message handling: richer PING/DIAG ---------- self.addEventListener('message', event => { const data = event.data || {}; try { if (data && data.type === 'PING') { - // prefer replying on message port if provided - if (event.ports && event.ports[0]) { - try { - event.ports[0].postMessage({ type: 'PONG', ts: Date.now(), ok: true }); - } catch (e) { /* ignore */ } - } else { - // fallback: send message back to the source (if possible) or broadcast to clients - if (event.source && typeof event.source.postMessage === 'function') { - try { event.source.postMessage({ type: 'PONG', ts: Date.now(), ok: true }); } catch(e) {} - } else { - notifyClients({ type: 'PONG', ts: Date.now(), ok: true }); - } - } - return; - } - - // support a client request to list SW-saved streams or threads - if (data && data.type === 'list-sw-streams') { + // reply with diagnostic info (prefer port) (async () => { - const threads = await readThreads(); - // collect messages that have sw_streamId - const found = []; - for (const t of (threads||[])) { - for (const m of (t.messages||[])) { - if (m && m.sw_streamId) found.push({ threadId: t.id, threadTitle: t.title, messageId: m.id, sw_streamId: m.sw_streamId, summary: (m.content||'').slice(0,200), updatedAt: m.updatedAt }); - } - } - // reply to the source if possible, otherwise broadcast + const streams = await listSwStreams(); + const info = { + type: 'PONG', + ts: Date.now(), + ok: true, + canTeeProbe: true, + savedStreamCount: streams.length, + lastSaved: streams[0] || null + }; if (event.ports && event.ports[0]) { - event.ports[0].postMessage({ type: 'sw-streams-list', streams: found }); + event.ports[0].postMessage(info); } else if (event.source && typeof event.source.postMessage === 'function') { - event.source.postMessage({ type: 'sw-streams-list', streams: found }); + event.source.postMessage(info); } else { - notifyClients({ type: 'sw-streams-list', streams: found }); + await notifyClients(info); } })(); return; } - // other messages — ignore or log - } catch (e) { - console.error('sw: message handler error', e); + if (data && data.type === 'list-sw-streams') { + (async () => { + const streams = await listSwStreams(); + const payload = { type: 'sw-streams-list', streams }; + if (event.ports && event.ports[0]) event.ports[0].postMessage(payload); + else if (event.source && typeof event.source.postMessage === 'function') event.source.postMessage(payload); + else await notifyClients(payload); + })(); + return; + } + + // allow direct command to run a tee probe from client (alternative path) + if (data && data.type === 'run-tee-probe' && data.probeId) { + // reply that probe will be run — the fetch handler will simulate the stream + (async () => { + try { + const probeUrl = '/__sw_tee_test?probeId=' + encodeURIComponent(data.probeId); + // perform fetch from SW to itself to cause internal handling (not required — client can call the endpoint) + // but we simply reply OK and let the client hit the endpoint so the client receives the response body + if (event.ports && event.ports[0]) event.ports[0].postMessage({ type:'run-tee-probe-ok', probeId: data.probeId, probeUrl }); + else if (event.source && typeof event.source.postMessage === 'function') event.source.postMessage({ type:'run-tee-probe-ok', probeId: data.probeId, probeUrl }); + } catch (e) { + if (event.ports && event.ports[0]) event.ports[0].postMessage({ type:'run-tee-probe-error', error: String(e) }); + } + })(); + return; + } + + } catch (err) { + LOG('message handler error', err); } }); - -// basic install/activate to claim clients quickly -self.addEventListener('install', event => { - self.skipWaiting(); -}); -self.addEventListener('activate', event => { - event.waitUntil((async () => { - try { - await self.clients.claim(); - } catch(e) { /* ignore */ } - })()); -});