fixing disconnection issue
This commit is contained in:
@@ -101,20 +101,7 @@ export default async function recommendationsRoute(fastify: FastifyInstance) {
|
|||||||
|
|
||||||
if (!rec) return reply.code(404).send({ error: 'Not found' });
|
if (!rec) return reply.code(404).send({ error: 'Not found' });
|
||||||
|
|
||||||
// Load all feedback to potentially inject as context
|
// Set SSE headers and hijack before any branching
|
||||||
const feedbackRows = await db.select().from(feedback);
|
|
||||||
const mediaLabel = rec.media_type === 'movie' ? 'Movie' : 'Show';
|
|
||||||
const feedbackContext =
|
|
||||||
feedbackRows.length > 0
|
|
||||||
? feedbackRows
|
|
||||||
.map(
|
|
||||||
(f) =>
|
|
||||||
`${mediaLabel}: "${f.item_name}" — Rating: ${f.stars}/3 stars${f.feedback ? ` — Comment: ${f.feedback}` : ''}`,
|
|
||||||
)
|
|
||||||
.join('\n')
|
|
||||||
: undefined;
|
|
||||||
|
|
||||||
// Set SSE headers and hijack
|
|
||||||
reply.raw.setHeader('Content-Type', 'text/event-stream');
|
reply.raw.setHeader('Content-Type', 'text/event-stream');
|
||||||
reply.raw.setHeader('Cache-Control', 'no-cache');
|
reply.raw.setHeader('Cache-Control', 'no-cache');
|
||||||
reply.raw.setHeader('Connection', 'keep-alive');
|
reply.raw.setHeader('Connection', 'keep-alive');
|
||||||
@@ -122,11 +109,72 @@ export default async function recommendationsRoute(fastify: FastifyInstance) {
|
|||||||
reply.raw.flushHeaders();
|
reply.raw.flushHeaders();
|
||||||
reply.hijack();
|
reply.hijack();
|
||||||
|
|
||||||
|
// Resilient write — swallows errors so a disconnected client never crashes
|
||||||
|
// an in-flight pipeline that is still running server-side.
|
||||||
const sseWrite = (event: SSEEvent) => {
|
const sseWrite = (event: SSEEvent) => {
|
||||||
reply.raw.write(`data: ${JSON.stringify(event)}\n\n`);
|
try {
|
||||||
|
reply.raw.write(`data: ${JSON.stringify(event)}\n\n`);
|
||||||
|
} catch {
|
||||||
|
// Client disconnected — pipeline continues, writes are silently dropped
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
// Already finished — send a synthetic completion event and close immediately.
|
||||||
|
if (rec.status === 'done') {
|
||||||
|
sseWrite({ stage: 'complete', status: 'done', data: { title: rec.title } });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Already errored — send a synthetic error event and close immediately.
|
||||||
|
if (rec.status === 'error') {
|
||||||
|
sseWrite({ stage: 'curator', status: 'error', data: { message: 'Pipeline failed' } });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Already running — the pipeline is executing on a previous connection.
|
||||||
|
// Poll the DB until it reaches a terminal state, then report the result.
|
||||||
|
// This prevents starting a duplicate pipeline run on page reload.
|
||||||
|
if (rec.status === 'running') {
|
||||||
|
const POLL_INTERVAL_MS = 2000;
|
||||||
|
const TIMEOUT_MS = 20 * 60 * 1000; // 20 minutes hard ceiling
|
||||||
|
const start = Date.now();
|
||||||
|
while (Date.now() - start < TIMEOUT_MS) {
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS));
|
||||||
|
const [current] = await db
|
||||||
|
.select({ status: recommendations.status, title: recommendations.title })
|
||||||
|
.from(recommendations)
|
||||||
|
.where(eq(recommendations.id, id));
|
||||||
|
if (!current || current.status === 'done') {
|
||||||
|
sseWrite({ stage: 'complete', status: 'done', data: { title: current?.title ?? rec.title } });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (current.status === 'error') {
|
||||||
|
sseWrite({ stage: 'curator', status: 'error', data: { message: 'Pipeline failed' } });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// Still running — keep polling
|
||||||
|
}
|
||||||
|
// Timed out waiting — report as error
|
||||||
|
sseWrite({ stage: 'curator', status: 'error', data: { message: 'Pipeline timed out' } });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// status === 'pending' — start the pipeline normally.
|
||||||
|
|
||||||
|
// Load all feedback to potentially inject as context
|
||||||
|
const feedbackRows = await db.select().from(feedback);
|
||||||
|
const mediaLabel = rec.media_type === 'movie' ? 'Movie' : 'Show';
|
||||||
|
const feedbackContext =
|
||||||
|
feedbackRows.length > 0
|
||||||
|
? feedbackRows
|
||||||
|
.map(
|
||||||
|
(f) =>
|
||||||
|
`${mediaLabel}: "${f.item_name}" — Rating: ${f.stars}/3 stars${f.feedback ? ` — Comment: ${f.feedback}` : ''}`,
|
||||||
|
)
|
||||||
|
.join('\n')
|
||||||
|
: undefined;
|
||||||
|
|
||||||
await runPipeline(rec, sseWrite, feedbackContext);
|
await runPipeline(rec, sseWrite, feedbackContext);
|
||||||
} finally {
|
} finally {
|
||||||
reply.raw.end();
|
reply.raw.end();
|
||||||
|
|||||||
Reference in New Issue
Block a user