From d849b67f3d5dd57f6e1e18b82bc673b747a4f2ff Mon Sep 17 00:00:00 2001 From: Jose Henrique Date: Thu, 2 Apr 2026 20:12:11 -0300 Subject: [PATCH] fixing disconnection issue --- .../backend/src/routes/recommendations.ts | 78 +++++++++++++++---- 1 file changed, 63 insertions(+), 15 deletions(-) diff --git a/packages/backend/src/routes/recommendations.ts b/packages/backend/src/routes/recommendations.ts index debadc6..64fe4ff 100644 --- a/packages/backend/src/routes/recommendations.ts +++ b/packages/backend/src/routes/recommendations.ts @@ -101,20 +101,7 @@ export default async function recommendationsRoute(fastify: FastifyInstance) { if (!rec) return reply.code(404).send({ error: 'Not found' }); - // Load all feedback to potentially inject as context - const feedbackRows = await db.select().from(feedback); - const mediaLabel = rec.media_type === 'movie' ? 'Movie' : 'Show'; - const feedbackContext = - feedbackRows.length > 0 - ? feedbackRows - .map( - (f) => - `${mediaLabel}: "${f.item_name}" — Rating: ${f.stars}/3 stars${f.feedback ? ` — Comment: ${f.feedback}` : ''}`, - ) - .join('\n') - : undefined; - - // Set SSE headers and hijack + // Set SSE headers and hijack before any branching reply.raw.setHeader('Content-Type', 'text/event-stream'); reply.raw.setHeader('Cache-Control', 'no-cache'); reply.raw.setHeader('Connection', 'keep-alive'); @@ -122,11 +109,72 @@ export default async function recommendationsRoute(fastify: FastifyInstance) { reply.raw.flushHeaders(); reply.hijack(); + // Resilient write — swallows errors so a disconnected client never crashes + // an in-flight pipeline that is still running server-side. const sseWrite = (event: SSEEvent) => { - reply.raw.write(`data: ${JSON.stringify(event)}\n\n`); + try { + reply.raw.write(`data: ${JSON.stringify(event)}\n\n`); + } catch { + // Client disconnected — pipeline continues, writes are silently dropped + } }; try { + // Already finished — send a synthetic completion event and close immediately. + if (rec.status === 'done') { + sseWrite({ stage: 'complete', status: 'done', data: { title: rec.title } }); + return; + } + + // Already errored — send a synthetic error event and close immediately. + if (rec.status === 'error') { + sseWrite({ stage: 'curator', status: 'error', data: { message: 'Pipeline failed' } }); + return; + } + + // Already running — the pipeline is executing on a previous connection. + // Poll the DB until it reaches a terminal state, then report the result. + // This prevents starting a duplicate pipeline run on page reload. + if (rec.status === 'running') { + const POLL_INTERVAL_MS = 2000; + const TIMEOUT_MS = 20 * 60 * 1000; // 20 minutes hard ceiling + const start = Date.now(); + while (Date.now() - start < TIMEOUT_MS) { + await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS)); + const [current] = await db + .select({ status: recommendations.status, title: recommendations.title }) + .from(recommendations) + .where(eq(recommendations.id, id)); + if (!current || current.status === 'done') { + sseWrite({ stage: 'complete', status: 'done', data: { title: current?.title ?? rec.title } }); + return; + } + if (current.status === 'error') { + sseWrite({ stage: 'curator', status: 'error', data: { message: 'Pipeline failed' } }); + return; + } + // Still running — keep polling + } + // Timed out waiting — report as error + sseWrite({ stage: 'curator', status: 'error', data: { message: 'Pipeline timed out' } }); + return; + } + + // status === 'pending' — start the pipeline normally. + + // Load all feedback to potentially inject as context + const feedbackRows = await db.select().from(feedback); + const mediaLabel = rec.media_type === 'movie' ? 'Movie' : 'Show'; + const feedbackContext = + feedbackRows.length > 0 + ? feedbackRows + .map( + (f) => + `${mediaLabel}: "${f.item_name}" — Rating: ${f.stars}/3 stars${f.feedback ? ` — Comment: ${f.feedback}` : ''}`, + ) + .join('\n') + : undefined; + await runPipeline(rec, sseWrite, feedbackContext); } finally { reply.raw.end();