diff --git a/docs/sw.js b/docs/sw.js index eed712c..1efa785 100644 --- a/docs/sw.js +++ b/docs/sw.js @@ -1,123 +1,31 @@ -importScripts("https://cdn.jsdelivr.net/npm/localforage@1.10.0/dist/localforage.min.js"); -const THREADS_KEY = "threads_v1"; const TARGET_SUBSTRING = "openrouter.ai/api/v1/chat/completions"; -const LOG = (...a) => { - console.log("[sw-debug]", ...a); +const STATE_TTL_MS = 24 * 60 * 60 * 1e3; +const state = { + totalIntercepted: 0, + activeStreams: {}, + // streamId -> { url, startedAt, bytes, lastProgressAt, status } + lastStreamSummary: null + // summary of last finished stream }; const gid = () => Math.random().toString(36).slice(2, 9) + "-" + Date.now().toString(36); -async function readThreads() { +async function broadcast(msg) { try { - const v = await localforage.getItem(THREADS_KEY); - return Array.isArray(v) ? v : []; - } catch (e) { - LOG("readThreads err", e); - return []; - } -} -async function writeThreads(arr) { - try { - await localforage.setItem(THREADS_KEY, arr); - } catch (e) { - LOG("writeThreads err", e); - throw e; - } -} -function pickThread(threads) { - if (!threads || !threads.length) return null; - threads.sort((a, b) => (b.updatedAt || 0) - (a.updatedAt || 0)); - return threads[0]; -} -async function upsertStreamMessage(streamId, text, meta = {}) { - const threads = await readThreads(); - let th = pickThread(threads); - const now = Date.now(); - if (!th) { - th = { id: "sw-" + gid(), title: "Missed while backgrounded", pinned: false, updatedAt: now, messages: [] }; - threads.unshift(th); - } - let msgIndex = -1; - for (let i = th.messages.length - 1; i >= 0; i--) { - if (th.messages[i] && th.messages[i].sw_streamId === streamId) { - msgIndex = i; - break; - } - } - const contentParts = [{ type: "text", text }]; - if (msgIndex >= 0) { - const ex = th.messages[msgIndex]; - ex.content = text; - ex.contentParts = contentParts; - ex.updatedAt = now; - ex._sw_lastSave = now; - ex._sw_meta = Object.assign({}, ex._sw_meta || {}, meta); - } else { - th.messages.push({ - id: "swmsg-" + gid(), - role: "assistant", - content: text, - contentParts, - kind: "assistant", - sw_saved: true, - sw_streamId: streamId, - createdAt: now, - updatedAt: now, - _sw_meta: meta - }); - } - th.updatedAt = now; - await writeThreads(threads); - return { threadId: th.id }; -} -async function finalizeStream(streamId, meta = {}) { - const threads = await readThreads(); - const th = pickThread(threads); - if (!th) return; - for (let i = th.messages.length - 1; i >= 0; i--) { - const m = th.messages[i]; - if (m && m.sw_streamId === streamId) { - m._sw_meta = Object.assign({}, m._sw_meta || {}, meta, { completeAt: Date.now() }); - m.updatedAt = Date.now(); - th.updatedAt = Date.now(); - break; - } - } - await writeThreads(threads); - const clientsList = await self.clients.matchAll({ includeUncontrolled: true, type: "window" }); - for (const c of clientsList) { - try { - c.postMessage({ type: "stream-saved", streamId, meta }); - } catch (e) { - } - } -} -async function listSwStreams() { - const threads = await readThreads(); - const found = []; - for (const t of threads || []) { - for (const m of t.messages || []) { - if (m && m.sw_streamId) found.push({ - threadId: t.id, - threadTitle: t.title, - messageId: m.id, - sw_streamId: m.sw_streamId, - snippet: (m.content || "").slice(0, 200), - updatedAt: m.updatedAt - }); - } - } - return found; -} -async function notifyClients(msg) { - try { - const list = await self.clients.matchAll({ includeUncontrolled: true, type: "window" }); - for (const c of list) { + const clientsList = await self.clients.matchAll({ includeUncontrolled: true, type: "window" }); + for (const c of clientsList) { try { c.postMessage(msg); } catch (e) { } } } catch (e) { - LOG("notifyClients err", e); + } +} +function cleanupState() { + const now = Date.now(); + for (const k of Object.keys(state.activeStreams)) { + if (now - (state.activeStreams[k].lastProgressAt || state.activeStreams[k].startedAt) > STATE_TTL_MS) { + delete state.activeStreams[k]; + } } } self.addEventListener("install", (e) => { @@ -127,107 +35,125 @@ self.addEventListener("activate", (e) => { e.waitUntil(self.clients.claim()); }); self.addEventListener("fetch", (event) => { - const reqUrl = event.request.url || ""; - if (new URL(reqUrl).pathname === "/__sw_tee_test") { + try { + const url = String(event.request.url || ""); + if (!url.includes(TARGET_SUBSTRING)) return; event.respondWith((async () => { - const probeId = new URL(reqUrl).searchParams.get("probeId") || gid(); - LOG("Received tee-test probe", probeId); - const encoder = new TextEncoder(); - const stream = new ReadableStream({ - start(ctrl) { - let count = 0; - const id = setInterval(() => { - count++; - const chunk = `probe(${probeId}) chunk ${count} -`; - ctrl.enqueue(encoder.encode(chunk)); - if (count >= 6) { - clearInterval(id); - ctrl.close(); - } - }, 300); - } - }); - const [clientBranch, swBranch] = stream.tee(); - (async () => { + const upstream = await fetch(event.request); + if (!upstream || !upstream.body) return upstream; + const streamId = "sw-" + gid(); + const meta = { url, startedAt: Date.now(), bytes: 0, lastProgressAt: Date.now(), status: "started" }; + state.totalIntercepted = (state.totalIntercepted || 0) + 1; + state.activeStreams[streamId] = meta; + broadcast({ type: "sw-intercept-start", streamId, meta }); + const [clientStream, swStream] = upstream.body.tee(); + const savePromise = (async () => { try { - const reader = swBranch.getReader(); - const dec = new TextDecoder("utf-8"); - let collected = ""; - let bytes = 0; + const reader = swStream.getReader(); + const decoder = new TextDecoder("utf-8"); + let decodedSoFar = ""; + let chunkCount = 0; + let lastBroadcastMs = 0; + const BROADCAST_THROTTLE_MS = 800; + const BROADCAST_BYTES = 16 * 1024; while (true) { const { value, done } = await reader.read(); if (done) break; - const chunkText = dec.decode(value, { stream: true }); - collected += chunkText; - bytes += value && value.byteLength ? value.byteLength : chunkText.length; - await upsertStreamMessage("probe-" + probeId, collected, { probeId, bytesSoFar: bytes }); - await notifyClients({ type: "tee-probe-chunk", probeId, bytes, snippet: chunkText.slice(0, 200) }); + chunkCount++; + const bytes = value ? value.byteLength || 0 : 0; + meta.bytes += bytes; + meta.lastProgressAt = Date.now(); + try { + decodedSoFar += decoder.decode(value, { stream: true }); + } catch (e) { + } + const now = Date.now(); + if (now - lastBroadcastMs > BROADCAST_THROTTLE_MS || meta.bytes >= (meta._lastBroadcastBytes || 0) + BROADCAST_BYTES) { + meta._lastBroadcastBytes = meta.bytes; + lastBroadcastMs = now; + broadcast({ + type: "sw-intercept-progress", + streamId, + meta: { bytes: meta.bytes, lastProgressAt: meta.lastProgressAt, snippet: decodedSoFar.slice(-1024) } + }); + } } - await finalizeStream("probe-" + probeId, { totalBytes: bytes, probeId }); - LOG("tee-probe: save complete", probeId, "bytes", bytes); - await notifyClients({ type: "tee-probe-complete", probeId, totalBytes: bytes }); + meta.status = "finished"; + meta.endedAt = Date.now(); + state.lastStreamSummary = { + streamId, + url, + startedAt: meta.startedAt, + endedAt: meta.endedAt, + totalBytes: meta.bytes + }; + delete state.activeStreams[streamId]; + broadcast({ type: "sw-intercept-end", streamId, meta: { totalBytes: meta.bytes, endedAt: meta.endedAt } }); } catch (err) { - LOG("tee-probe save error", err); - await notifyClients({ type: "tee-probe-error", probeId, error: String(err) }); + meta.status = "error"; + meta.error = String(err && err.message ? err.message : err); + meta.lastProgressAt = Date.now(); + delete state.activeStreams[streamId]; + broadcast({ type: "sw-intercept-error", streamId, meta: { error: meta.error } }); + console.error("sw: stream save error", err); } })(); - return new Response(clientBranch, { status: 200, headers: { "Content-Type": "text/plain; charset=utf-8" } }); + event.waitUntil(savePromise); + return new Response(clientStream, { + status: upstream.status, + statusText: upstream.statusText, + headers: upstream.headers + }); })()); - return; - } - if (reqUrl.includes(TARGET_SUBSTRING)) { - event.respondWith(fetch(event.request)); - return; + } catch (e) { + console.error("sw: fetch handler error", e); + } finally { + cleanupState(); } }); self.addEventListener("message", (event) => { const data = event.data || {}; try { if (data && data.type === "PING") { - (async () => { - const streams = await listSwStreams(); - const info = { - type: "PONG", - ts: Date.now(), - ok: true, - canTeeProbe: true, - savedStreamCount: streams.length, - lastSaved: streams[0] || null - }; - if (event.ports && event.ports[0]) { - event.ports[0].postMessage(info); - } else if (event.source && typeof event.source.postMessage === "function") { - event.source.postMessage(info); - } else { - await notifyClients(info); - } - })(); - return; - } - if (data && data.type === "list-sw-streams") { - (async () => { - const streams = await listSwStreams(); - const payload = { type: "sw-streams-list", streams }; - if (event.ports && event.ports[0]) event.ports[0].postMessage(payload); - else if (event.source && typeof event.source.postMessage === "function") event.source.postMessage(payload); - else await notifyClients(payload); - })(); - return; - } - if (data && data.type === "run-tee-probe" && data.probeId) { - (async () => { + if (event.ports && event.ports[0]) { + event.ports[0].postMessage({ type: "PONG", ts: Date.now(), ok: true }); + } else if (event.source && typeof event.source.postMessage === "function") { try { - const probeUrl = "/__sw_tee_test?probeId=" + encodeURIComponent(data.probeId); - if (event.ports && event.ports[0]) event.ports[0].postMessage({ type: "run-tee-probe-ok", probeId: data.probeId, probeUrl }); - else if (event.source && typeof event.source.postMessage === "function") event.source.postMessage({ type: "run-tee-probe-ok", probeId: data.probeId, probeUrl }); + event.source.postMessage({ type: "PONG", ts: Date.now(), ok: true }); } catch (e) { - if (event.ports && event.ports[0]) event.ports[0].postMessage({ type: "run-tee-probe-error", error: String(e) }); } - })(); + } else { + broadcast({ type: "PONG", ts: Date.now(), ok: true }); + } return; } - } catch (err) { - LOG("message handler error", err); + if (data && data.type === "PING_STATUS") { + const reply = { + type: "PONG_STATUS", + ts: Date.now(), + totalIntercepted: state.totalIntercepted || 0, + activeStreams: Object.entries(state.activeStreams).map(([id, m]) => ({ streamId: id, url: m.url, bytes: m.bytes, status: m.status, startedAt: m.startedAt })), + lastStreamSummary: state.lastStreamSummary || null + }; + if (event.ports && event.ports[0]) { + event.ports[0].postMessage(reply); + } else if (event.source && typeof event.source.postMessage === "function") { + try { + event.source.postMessage(reply); + } catch (e) { + } + } else { + broadcast(reply); + } + return; + } + if (data && data.type === "GET_STATE") { + const snapshot = { totalIntercepted: state.totalIntercepted || 0, activeCount: Object.keys(state.activeStreams).length, last: state.lastStreamSummary || null }; + if (event.ports && event.ports[0]) event.ports[0].postMessage({ type: "STATE", snapshot }); + else if (event.source && event.source.postMessage) event.source.postMessage({ type: "STATE", snapshot }); + return; + } + } catch (e) { + console.error("sw: message handler error", e); } });