adding continuous
This commit is contained in:
@@ -23,13 +23,21 @@ export const miniModel = isGeneric ? (process.env.MODEL_NAME ?? 'default') : 'gp
|
||||
export const serviceOptions = isGeneric ? {} : { service_tier: 'flex' as const };
|
||||
export const supportsWebSearch = !isGeneric;
|
||||
|
||||
export async function parseWithRetry<T>(fn: () => Promise<T>, retries = 2): Promise<T> {
|
||||
function isJsonParseError(err: unknown): boolean {
|
||||
if (err instanceof SyntaxError) return true;
|
||||
if (!(err instanceof Error)) return false;
|
||||
|
||||
// Some providers wrap JSON parsing failures in plain Error objects.
|
||||
return /not valid json|unexpected token|json/i.test(err.message);
|
||||
}
|
||||
|
||||
export async function parseWithRetry<T>(fn: () => Promise<T>, retries = 3): Promise<T> {
|
||||
let lastErr: unknown;
|
||||
for (let attempt = 0; attempt <= retries; attempt++) {
|
||||
try {
|
||||
return await fn();
|
||||
} catch (err) {
|
||||
if (err instanceof SyntaxError && attempt < retries) {
|
||||
if (isJsonParseError(err) && attempt < retries) {
|
||||
lastErr = err;
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -64,7 +64,6 @@ Rules:
|
||||
const response = await parseWithRetry(() => openai.responses.parse({
|
||||
model: defaultModel,
|
||||
temperature: 0.5,
|
||||
max_completion_tokens: 16384,
|
||||
...serviceOptions,
|
||||
...(canSearch ? { tools: [{ type: 'web_search' as const }] } : {}),
|
||||
text: { format: zodTextFormat(CuratorSchema, "shows") },
|
||||
|
||||
@@ -47,7 +47,6 @@ export async function runRanking(
|
||||
const response = await parseWithRetry(() => openai.responses.parse({
|
||||
model: defaultModel,
|
||||
temperature: 0.2,
|
||||
max_completion_tokens: 16384,
|
||||
...serviceOptions,
|
||||
text: { format: zodTextFormat(RankingSchema, "ranking") },
|
||||
instructions: `You are a ${mediaLabel} ranking critic. Assign each ${mediaLabel} to exactly one of five confidence tags based on how well it matches the user's preferences.
|
||||
|
||||
@@ -25,7 +25,6 @@ export async function runRetrieval(
|
||||
const response = await parseWithRetry(() => openai.responses.parse({
|
||||
model: defaultModel,
|
||||
temperature: 0.9,
|
||||
max_completion_tokens: 16384,
|
||||
...serviceOptions,
|
||||
...(canSearch ? { tools: [{ type: 'web_search' as const }] } : {}),
|
||||
text: { format: zodTextFormat(RetrievalSchema, "candidates") },
|
||||
|
||||
116
packages/backend/src/agents/retrievalContinuous.ts
Normal file
116
packages/backend/src/agents/retrievalContinuous.ts
Normal file
@@ -0,0 +1,116 @@
|
||||
import { openai, defaultModel, serviceOptions, supportsWebSearch, parseWithRetry } from '../agent.js';
|
||||
import type { InterpreterOutput, RetrievalCandidate, MediaType } from '../types/agents.js';
|
||||
import { z } from 'zod';
|
||||
import { zodTextFormat } from 'openai/helpers/zod';
|
||||
|
||||
const RetrievalBatchSchema = z.object({
|
||||
candidates: z.array(z.object({
|
||||
title: z.string(),
|
||||
type: z.enum(['movie', 'tv']),
|
||||
year: z.string(),
|
||||
reason: z.string()
|
||||
}))
|
||||
});
|
||||
|
||||
export interface RetrievalBatchOutput {
|
||||
candidates: Array<{
|
||||
title: string;
|
||||
type: 'movie' | 'tv';
|
||||
year: string;
|
||||
reason: string;
|
||||
}>;
|
||||
}
|
||||
|
||||
const RetrievalCandidateSchema = z.object({
|
||||
title: z.string(),
|
||||
reason: z.string()
|
||||
});
|
||||
|
||||
export interface RetrievalOutput {
|
||||
candidates: RetrievalCandidate[];
|
||||
}
|
||||
|
||||
function buildSystemPrompt(
|
||||
interpreterOutput: InterpreterOutput,
|
||||
mediaType: MediaType,
|
||||
useWebSearch: boolean,
|
||||
hardRequirements: boolean,
|
||||
seenTitles: string[]
|
||||
): string {
|
||||
const mediaLabel = mediaType === 'movie' ? 'movie' : 'TV show';
|
||||
const mediaLabelPlural = mediaType === 'movie' ? 'movies' : 'TV shows';
|
||||
|
||||
let prompt = `You are a ${mediaLabel} recommendation specialist. Your task is to recommend titles that match the user's taste profile.
|
||||
|
||||
USER TASTE PROFILE:
|
||||
- Liked ${mediaLabelPlural}: ${interpreterOutput.liked.join(', ') || '(none)'}
|
||||
- Disliked ${mediaLabelPlural}: ${interpreterOutput.disliked.join(', ') || '(none)'}
|
||||
- Themes: ${interpreterOutput.themes.join(', ') || '(none)'}
|
||||
- Tone: ${interpreterOutput.tone.join(', ') || '(none)'}
|
||||
- Requirements: ${interpreterOutput.requirements.join(', ') || '(none)'}
|
||||
- Avoid: ${interpreterOutput.avoid.join(', ') || '(none)'}
|
||||
|
||||
RESPONSE FORMAT:
|
||||
Output exactly 10 recommendations in the following JSON format. No additional text.
|
||||
|
||||
[
|
||||
{ "title": "Title Name", "type": "movie"|"tv", "year": "2024", "reason": "1-sentence why it matches" },
|
||||
...
|
||||
]
|
||||
|
||||
REQUIREMENTS:
|
||||
- Output exactly 10 titles, no more, no less
|
||||
- Each title must include: title, type (movie/tv), year, and a concise reason
|
||||
- Prioritize variety across themes and decades
|
||||
- Do not include titles the user already mentioned as liked/disliked
|
||||
${seenTitles.length > 0 ? `- Do NOT repeat the following titles that have already been suggested: ${seenTitles.join(', ')}` : ''}
|
||||
${useWebSearch ? '\n- Use web search to find recent and accurate titles, including newer releases' : ''}
|
||||
${hardRequirements ? '\n- Strictly follow ALL requirements. Exclude any candidate that does not meet every stated requirement.' : ''}
|
||||
- If asked for "more 10" or "more recommendations", provide 10 new recommendations following the same format`;
|
||||
|
||||
return prompt;
|
||||
}
|
||||
|
||||
export async function runRetrievalBatch(
|
||||
interpreterOutput: InterpreterOutput,
|
||||
mediaType: MediaType,
|
||||
useWebSearch: boolean,
|
||||
hardRequirements: boolean,
|
||||
seenTitles: string[],
|
||||
previousResponseId: string | null
|
||||
): Promise<{ candidates: RetrievalCandidate[]; responseId: string }> {
|
||||
const canSearch = useWebSearch && supportsWebSearch;
|
||||
const systemPrompt = buildSystemPrompt(interpreterOutput, mediaType, useWebSearch, hardRequirements, seenTitles);
|
||||
|
||||
let input: string;
|
||||
|
||||
if (previousResponseId === null) {
|
||||
input = 'Please recommend 10 titles that match my taste profile. Output exactly 10 recommendations in the specified JSON format.';
|
||||
} else {
|
||||
input = 'Give me 10 more recommendations following the same JSON format as before.';
|
||||
}
|
||||
|
||||
const response = await parseWithRetry(() => openai.responses.parse({
|
||||
model: defaultModel,
|
||||
...serviceOptions,
|
||||
...(canSearch ? { tools: [{ type: 'web_search' as const }] } : {}),
|
||||
...(previousResponseId ? { previous_response_id: previousResponseId } : {}),
|
||||
text: { format: zodTextFormat(RetrievalBatchSchema, "candidates") },
|
||||
instructions: systemPrompt,
|
||||
input: input,
|
||||
}));
|
||||
|
||||
const parsed = response.output_parsed as RetrievalBatchOutput | undefined;
|
||||
const responseId = response.id;
|
||||
|
||||
if (!parsed || !parsed.candidates) {
|
||||
return { candidates: [], responseId };
|
||||
}
|
||||
|
||||
const candidates: RetrievalCandidate[] = parsed.candidates.map((c) => ({
|
||||
title: c.title,
|
||||
reason: c.reason,
|
||||
}));
|
||||
|
||||
return { candidates, responseId };
|
||||
}
|
||||
@@ -30,7 +30,6 @@ async function runValidatorChunk(
|
||||
const response = await parseWithRetry(() => openai.responses.parse({
|
||||
model: defaultModel,
|
||||
temperature: 0.1,
|
||||
max_completion_tokens: 16384,
|
||||
...serviceOptions,
|
||||
...(supportsWebSearch ? { tools: [{ type: 'web_search' as const }] } : {}),
|
||||
text: { format: zodTextFormat(ValidatorSchema, 'validation') },
|
||||
|
||||
93
packages/backend/src/pipelineStreams.ts
Normal file
93
packages/backend/src/pipelineStreams.ts
Normal file
@@ -0,0 +1,93 @@
|
||||
import type { SSEEvent } from './types/agents.js';
|
||||
|
||||
type Listener = (event: SSEEvent) => void;
|
||||
|
||||
interface PipelineSession {
|
||||
history: SSEEvent[];
|
||||
listeners: Set<Listener>;
|
||||
terminal: boolean;
|
||||
cleanupTimer: NodeJS.Timeout | null;
|
||||
}
|
||||
|
||||
const sessions = new Map<string, PipelineSession>();
|
||||
const CLEANUP_DELAY_MS = 10 * 60 * 1000;
|
||||
|
||||
function getOrCreateSession(recId: string): PipelineSession {
|
||||
const existing = sessions.get(recId);
|
||||
if (existing) {
|
||||
if (existing.cleanupTimer) {
|
||||
clearTimeout(existing.cleanupTimer);
|
||||
existing.cleanupTimer = null;
|
||||
}
|
||||
return existing;
|
||||
}
|
||||
|
||||
const session: PipelineSession = {
|
||||
history: [],
|
||||
listeners: new Set(),
|
||||
terminal: false,
|
||||
cleanupTimer: null,
|
||||
};
|
||||
sessions.set(recId, session);
|
||||
return session;
|
||||
}
|
||||
|
||||
function scheduleCleanup(recId: string, session: PipelineSession): void {
|
||||
if (session.cleanupTimer) {
|
||||
clearTimeout(session.cleanupTimer);
|
||||
}
|
||||
|
||||
session.cleanupTimer = setTimeout(() => {
|
||||
const current = sessions.get(recId);
|
||||
if (current === session && current.listeners.size === 0) {
|
||||
sessions.delete(recId);
|
||||
}
|
||||
}, CLEANUP_DELAY_MS);
|
||||
}
|
||||
|
||||
export function resetPipelineSession(recId: string): void {
|
||||
const existing = sessions.get(recId);
|
||||
if (existing?.cleanupTimer) {
|
||||
clearTimeout(existing.cleanupTimer);
|
||||
}
|
||||
sessions.delete(recId);
|
||||
}
|
||||
|
||||
export function publishPipelineEvent(recId: string, event: SSEEvent): void {
|
||||
const session = getOrCreateSession(recId);
|
||||
session.history.push(event);
|
||||
|
||||
for (const listener of session.listeners) {
|
||||
listener(event);
|
||||
}
|
||||
|
||||
if (event.stage === 'complete' || event.status === 'error') {
|
||||
session.terminal = true;
|
||||
if (session.listeners.size === 0) {
|
||||
scheduleCleanup(recId, session);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export function getPipelineHistory(recId: string): SSEEvent[] {
|
||||
return sessions.get(recId)?.history ?? [];
|
||||
}
|
||||
|
||||
export function hasPipelineSession(recId: string): boolean {
|
||||
return sessions.has(recId);
|
||||
}
|
||||
|
||||
export function subscribeToPipelineEvents(recId: string, listener: Listener): () => void {
|
||||
const session = getOrCreateSession(recId);
|
||||
session.listeners.add(listener);
|
||||
|
||||
return () => {
|
||||
const current = sessions.get(recId);
|
||||
if (!current) return;
|
||||
|
||||
current.listeners.delete(listener);
|
||||
if (current.terminal && current.listeners.size === 0) {
|
||||
scheduleCleanup(recId, current);
|
||||
}
|
||||
};
|
||||
}
|
||||
300
packages/backend/src/pipelines/continuous.ts
Normal file
300
packages/backend/src/pipelines/continuous.ts
Normal file
@@ -0,0 +1,300 @@
|
||||
import { eq } from 'drizzle-orm';
|
||||
import { db } from '../db.js';
|
||||
import { recommendations } from '../db/schema.js';
|
||||
import { runInterpreter } from '../agents/interpreter.js';
|
||||
import { runRetrievalBatch } from '../agents/retrievalContinuous.js';
|
||||
import { runValidator } from '../agents/validator.js';
|
||||
import { runRanking } from '../agents/ranking.js';
|
||||
import { runCurator } from '../agents/curator.js';
|
||||
import type { CuratorOutput, InterpreterOutput, MediaType, RankingOutput, RetrievalCandidate, SSEEvent } from '../types/agents.js';
|
||||
import { generateTitle } from '../agents/titleGenerator.js';
|
||||
|
||||
function log(msg: string, data?: unknown) {
|
||||
const ts = new Date().toISOString();
|
||||
if (data !== undefined) {
|
||||
console.log(`[continuous-pipeline] [${ts}] ${msg}`, data);
|
||||
} else {
|
||||
console.log(`[continuous-pipeline] [${ts}] ${msg}`);
|
||||
}
|
||||
}
|
||||
|
||||
function deduplicateCandidates(candidates: RetrievalCandidate[], seenTitles: Set<string>): RetrievalCandidate[] {
|
||||
return candidates.filter((c) => {
|
||||
const key = c.title.toLowerCase();
|
||||
if (seenTitles.has(key)) return false;
|
||||
seenTitles.add(key);
|
||||
return true;
|
||||
});
|
||||
}
|
||||
|
||||
function splitIntoBuckets<T>(items: T[], n: number): T[][] {
|
||||
const size = Math.ceil(items.length / n);
|
||||
return Array.from({ length: n }, (_, i) => items.slice(i * size, (i + 1) * size))
|
||||
.filter((b) => b.length > 0);
|
||||
}
|
||||
|
||||
function mergeCuratorOutputs(a: CuratorOutput[], b: CuratorOutput[]): CuratorOutput[] {
|
||||
const seen = new Set(a.map((x) => x.title.toLowerCase()));
|
||||
return [...a, ...b.filter((x) => !seen.has(x.title.toLowerCase()))];
|
||||
}
|
||||
|
||||
interface ContinuousPipelineInput {
|
||||
likedShows: string;
|
||||
dislikedShows?: string;
|
||||
themes?: string;
|
||||
requirements?: string;
|
||||
avoid?: string;
|
||||
totalCount: number;
|
||||
useWebSearch: boolean;
|
||||
validateResults: boolean;
|
||||
mediaType: MediaType;
|
||||
}
|
||||
|
||||
export async function runContinuousPipeline(
|
||||
recId: string,
|
||||
input: ContinuousPipelineInput,
|
||||
sseWrite: (event: SSEEvent) => void,
|
||||
): Promise<CuratorOutput[]> {
|
||||
const startTime = Date.now();
|
||||
const {
|
||||
likedShows,
|
||||
dislikedShows = '',
|
||||
themes = '',
|
||||
requirements = '',
|
||||
avoid = '',
|
||||
totalCount,
|
||||
useWebSearch,
|
||||
validateResults,
|
||||
mediaType,
|
||||
} = input;
|
||||
|
||||
const totalBatches = Math.ceil(totalCount / 10);
|
||||
const allSeenTitles = new Set<string>();
|
||||
let previousResponseId: string | null = null;
|
||||
let interpreterOutput: InterpreterOutput | null = null;
|
||||
const accumulatedCandidates: RetrievalCandidate[] = [];
|
||||
|
||||
log(`Starting continuous pipeline: ${totalCount} titles (${totalBatches} batches), mediaType=${mediaType}, webSearch=${useWebSearch}, validate=${validateResults}`);
|
||||
|
||||
try {
|
||||
// --- Interpreter (runs once) ---
|
||||
log('Interpreter: start');
|
||||
sseWrite({ stage: 'interpreter', status: 'start' });
|
||||
const t0 = Date.now();
|
||||
|
||||
interpreterOutput = await runInterpreter({
|
||||
main_prompt: themes || 'recommend shows based on user preferences',
|
||||
liked_shows: likedShows,
|
||||
disliked_shows: dislikedShows,
|
||||
themes: themes,
|
||||
media_type: mediaType,
|
||||
});
|
||||
|
||||
log(`Interpreter: done (${Date.now() - t0}ms)`, {
|
||||
liked: interpreterOutput.liked,
|
||||
themes: interpreterOutput.themes,
|
||||
});
|
||||
sseWrite({ stage: 'interpreter', status: 'done', data: interpreterOutput });
|
||||
|
||||
// --- Retrieval (batched, chained) ---
|
||||
log(`Retrieval: start (${totalBatches} batches)`);
|
||||
sseWrite({ stage: 'retrieval', status: 'start' });
|
||||
|
||||
const hardRequirements = requirements.length > 0;
|
||||
|
||||
for (let batchIndex = 0; batchIndex < totalBatches; batchIndex++) {
|
||||
log(`Retrieval batch ${batchIndex + 1}/${totalBatches}: start`);
|
||||
sseWrite({
|
||||
stage: 'retrieval',
|
||||
status: 'start',
|
||||
data: {
|
||||
batch: batchIndex + 1,
|
||||
totalBatches,
|
||||
totalCandidates: accumulatedCandidates.length,
|
||||
},
|
||||
});
|
||||
|
||||
const seenTitlesArray = Array.from(allSeenTitles);
|
||||
const result = await runRetrievalBatch(
|
||||
interpreterOutput,
|
||||
mediaType,
|
||||
useWebSearch,
|
||||
hardRequirements,
|
||||
seenTitlesArray,
|
||||
previousResponseId
|
||||
);
|
||||
|
||||
previousResponseId = result.responseId;
|
||||
|
||||
// Deduplicate against previously seen titles
|
||||
const deduped = deduplicateCandidates(result.candidates, allSeenTitles);
|
||||
accumulatedCandidates.push(...deduped);
|
||||
|
||||
log(`Retrieval batch ${batchIndex + 1}/${totalBatches}: done, ${deduped.length} new candidates (total: ${accumulatedCandidates.length})`);
|
||||
}
|
||||
|
||||
log(`Retrieval: complete, ${accumulatedCandidates.length} total candidates`);
|
||||
sseWrite({
|
||||
stage: 'retrieval',
|
||||
status: 'done',
|
||||
data: {
|
||||
totalBatches,
|
||||
totalCandidates: accumulatedCandidates.length,
|
||||
},
|
||||
});
|
||||
|
||||
// --- Validator (optional, per batch already done during retrieval) ---
|
||||
// Note: In continuous mode, we could run validator after each batch
|
||||
// For now, we'll run it once after all batches if validateResults is true
|
||||
let candidatesForRanking = accumulatedCandidates;
|
||||
|
||||
if (validateResults && candidatesForRanking.length > 0) {
|
||||
log(`Validator: start (${candidatesForRanking.length} candidates)`);
|
||||
sseWrite({ stage: 'validator', status: 'start' });
|
||||
|
||||
const tV = Date.now();
|
||||
const validatorOutput = await runValidator(candidatesForRanking, mediaType);
|
||||
const verified = validatorOutput.candidates.filter((c) => !c.isTrash);
|
||||
const trashCount = validatorOutput.candidates.length - verified.length;
|
||||
|
||||
candidatesForRanking = verified.map(({ title, reason }) => ({ title, reason }));
|
||||
allSeenTitles.clear();
|
||||
candidatesForRanking.forEach((c) => allSeenTitles.add(c.title.toLowerCase()));
|
||||
|
||||
log(`Validator: done (${Date.now() - tV}ms) — removed ${trashCount} trash entries`);
|
||||
sseWrite({ stage: 'validator', status: 'done', data: { removed: trashCount, remaining: candidatesForRanking.length } });
|
||||
} else {
|
||||
sseWrite({ stage: 'validator', status: 'done', data: { skipped: !validateResults } });
|
||||
}
|
||||
|
||||
// --- Ranking (bucketed) ---
|
||||
log(`Ranking: start (${candidatesForRanking.length} candidates)`);
|
||||
sseWrite({ stage: 'ranking', status: 'start' });
|
||||
const t2 = Date.now();
|
||||
|
||||
const rankBucketCount = Math.ceil(candidatesForRanking.length / 15) || 1;
|
||||
const candidateBuckets = splitIntoBuckets(candidatesForRanking, rankBucketCount);
|
||||
|
||||
const rankingBuckets = await Promise.all(
|
||||
candidateBuckets.map((bucket) =>
|
||||
runRanking(interpreterOutput!, { candidates: bucket }, mediaType, hardRequirements)
|
||||
)
|
||||
);
|
||||
|
||||
const dedupTitles = (titles: string[]) => [...new Map(titles.map((t) => [t.toLowerCase(), t])).values()];
|
||||
const rankingOutput: RankingOutput = {
|
||||
full_match: dedupTitles(rankingBuckets.flatMap((r) => r.full_match)),
|
||||
definitely_like: dedupTitles(rankingBuckets.flatMap((r) => r.definitely_like)),
|
||||
might_like: dedupTitles(rankingBuckets.flatMap((r) => r.might_like)),
|
||||
questionable: dedupTitles(rankingBuckets.flatMap((r) => r.questionable)),
|
||||
will_not_like: dedupTitles(rankingBuckets.flatMap((r) => r.will_not_like)),
|
||||
};
|
||||
|
||||
log(`Ranking: done (${Date.now() - t2}ms)`, {
|
||||
full_match: rankingOutput.full_match.length,
|
||||
definitely_like: rankingOutput.definitely_like.length,
|
||||
might_like: rankingOutput.might_like.length,
|
||||
});
|
||||
sseWrite({ stage: 'ranking', status: 'done', data: rankingOutput });
|
||||
|
||||
// print all ranked titles for debuggings
|
||||
log('Ranked titles:', {
|
||||
full_match: rankingOutput.full_match,
|
||||
definitely_like: rankingOutput.definitely_like,
|
||||
might_like: rankingOutput.might_like,
|
||||
questionable: rankingOutput.questionable,
|
||||
will_not_like: rankingOutput.will_not_like,
|
||||
});
|
||||
|
||||
// --- Curator (bucketed) ---
|
||||
log(`Curator: start`);
|
||||
sseWrite({ stage: 'curator', status: 'start' });
|
||||
const t3 = Date.now();
|
||||
|
||||
type CategorizedItem = { title: string; category: keyof RankingOutput };
|
||||
const categorizedItems: CategorizedItem[] = [
|
||||
...rankingOutput.full_match.map((t) => ({ title: t, category: 'full_match' as const })),
|
||||
...rankingOutput.definitely_like.map((t) => ({ title: t, category: 'definitely_like' as const })),
|
||||
...rankingOutput.might_like.map((t) => ({ title: t, category: 'might_like' as const })),
|
||||
...rankingOutput.questionable.map((t) => ({ title: t, category: 'questionable' as const })),
|
||||
...rankingOutput.will_not_like.map((t) => ({ title: t, category: 'will_not_like' as const })),
|
||||
];
|
||||
|
||||
const curatorBucketCount = Math.ceil(categorizedItems.length / 15) || 1;
|
||||
const curatorItemBuckets = splitIntoBuckets(categorizedItems, curatorBucketCount);
|
||||
const curatorBucketRankings: RankingOutput[] = curatorItemBuckets.map((bucket) => ({
|
||||
full_match: bucket.filter((i) => i.category === 'full_match').map((i) => i.title),
|
||||
definitely_like: bucket.filter((i) => i.category === 'definitely_like').map((i) => i.title),
|
||||
might_like: bucket.filter((i) => i.category === 'might_like').map((i) => i.title),
|
||||
questionable: bucket.filter((i) => i.category === 'questionable').map((i) => i.title),
|
||||
will_not_like: bucket.filter((i) => i.category === 'will_not_like').map((i) => i.title),
|
||||
}));
|
||||
|
||||
const curatorBucketOutputs = await Promise.all(
|
||||
curatorBucketRankings.map((ranking) =>
|
||||
runCurator(ranking, interpreterOutput!, mediaType, useWebSearch)
|
||||
)
|
||||
);
|
||||
|
||||
const curatorOutput = curatorBucketOutputs.reduce((acc, bucket) => mergeCuratorOutputs(acc, bucket), [] as CuratorOutput[]);
|
||||
log(`Curator: done (${Date.now() - t3}ms) — ${curatorOutput.length} items curated`);
|
||||
sseWrite({ stage: 'curator', status: 'done', data: curatorOutput });
|
||||
|
||||
// Generate AI title
|
||||
let aiTitle = 'Continuous Recommendations';
|
||||
try {
|
||||
log('Title generation: start');
|
||||
aiTitle = await generateTitle(interpreterOutput, mediaType);
|
||||
log(`Title generation: done — "${aiTitle}"`);
|
||||
} catch (err) {
|
||||
log(`Title generation failed: ${String(err)}`);
|
||||
}
|
||||
|
||||
// Sort by category order
|
||||
const CATEGORY_ORDER: Record<string, number> = {
|
||||
'Full Match': 0,
|
||||
'Definitely Like': 1,
|
||||
'Might Like': 2,
|
||||
'Questionable': 3,
|
||||
'Will Not Like': 4,
|
||||
};
|
||||
curatorOutput.sort((a, b) => (CATEGORY_ORDER[a.category] ?? 99) - (CATEGORY_ORDER[b.category] ?? 99));
|
||||
|
||||
// Save to database (update existing record)
|
||||
log('Saving results to DB');
|
||||
await db
|
||||
.update(recommendations)
|
||||
.set({
|
||||
title: aiTitle,
|
||||
main_prompt: themes || 'Continuous recommendations',
|
||||
liked_shows: likedShows,
|
||||
disliked_shows: dislikedShows,
|
||||
themes: themes,
|
||||
brainstorm_count: totalCount,
|
||||
media_type: mediaType,
|
||||
use_web_search: useWebSearch,
|
||||
use_validator: validateResults,
|
||||
status: 'done',
|
||||
recommendations: curatorOutput,
|
||||
})
|
||||
.where(eq(recommendations.id, recId));
|
||||
|
||||
sseWrite({ stage: 'complete', status: 'done', data: { id: recId, title: aiTitle, batchCount: totalBatches } });
|
||||
|
||||
log(`Pipeline complete (total: ${Date.now() - startTime}ms), saved as ${recId}`);
|
||||
return curatorOutput;
|
||||
|
||||
} catch (err) {
|
||||
const message = err instanceof Error ? err.message : String(err);
|
||||
log(`Pipeline error: ${message}`);
|
||||
|
||||
// Update status to error
|
||||
await db
|
||||
.update(recommendations)
|
||||
.set({ status: 'error' })
|
||||
.where(eq(recommendations.id, recId));
|
||||
|
||||
sseWrite({ stage: 'curator', status: 'error', data: { message } });
|
||||
return [];
|
||||
}
|
||||
}
|
||||
@@ -3,8 +3,26 @@ import { eq, desc } from 'drizzle-orm';
|
||||
import { db } from '../db.js';
|
||||
import { recommendations, feedback } from '../db/schema.js';
|
||||
import { runPipeline } from '../pipelines/recommendation.js';
|
||||
import { runContinuousPipeline } from '../pipelines/continuous.js';
|
||||
import type { MediaType, SSEEvent } from '../types/agents.js';
|
||||
import { supportsWebSearch } from '../agent.js';
|
||||
import {
|
||||
getPipelineHistory,
|
||||
hasPipelineSession,
|
||||
publishPipelineEvent,
|
||||
resetPipelineSession,
|
||||
subscribeToPipelineEvents,
|
||||
} from '../pipelineStreams.js';
|
||||
|
||||
const UUID_RE = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;
|
||||
|
||||
function isUuid(id: string): boolean {
|
||||
return UUID_RE.test(id);
|
||||
}
|
||||
|
||||
function writeSSE(raw: typeof import('node:http').ServerResponse.prototype, event: SSEEvent): void {
|
||||
raw.write(`data: ${JSON.stringify(event)}\n\n`);
|
||||
}
|
||||
|
||||
export default async function recommendationsRoute(fastify: FastifyInstance) {
|
||||
// POST /recommendations — create record, return { id }
|
||||
@@ -64,6 +82,94 @@ export default async function recommendationsRoute(fastify: FastifyInstance) {
|
||||
return reply.code(201).send({ id: rec?.id });
|
||||
});
|
||||
|
||||
// POST /recommendations/continuous — create record and run continuous pipeline with SSE
|
||||
fastify.post('/recommendations/continuous', async (request, reply) => {
|
||||
const body = request.body as {
|
||||
liked_shows: string;
|
||||
disliked_shows?: string;
|
||||
themes?: string;
|
||||
requirements?: string;
|
||||
avoid?: string;
|
||||
total_count: number;
|
||||
media_type?: string;
|
||||
use_web_search?: boolean;
|
||||
validate_results?: boolean;
|
||||
};
|
||||
|
||||
const mediaType: MediaType = body.media_type === 'movie' ? 'movie' : 'tv_show';
|
||||
const useWebSearch = body.use_web_search === true;
|
||||
const validateResults = body.validate_results === true && supportsWebSearch;
|
||||
const totalCount = Number.isFinite(body.total_count) ? Math.min(100, Math.max(10, body.total_count)) : 30;
|
||||
|
||||
// Create record first with pending status
|
||||
const title = body.themes?.trim().split(/\s+/).slice(0, 5).join(' ') || 'Continuous Recommendations';
|
||||
const [rec] = await db
|
||||
.insert(recommendations)
|
||||
.values({
|
||||
title,
|
||||
main_prompt: body.themes ?? 'Continuous recommendations',
|
||||
liked_shows: body.liked_shows,
|
||||
disliked_shows: body.disliked_shows ?? '',
|
||||
themes: body.themes ?? '',
|
||||
brainstorm_count: totalCount,
|
||||
media_type: mediaType,
|
||||
use_web_search: useWebSearch,
|
||||
use_validator: validateResults,
|
||||
status: 'pending',
|
||||
})
|
||||
.returning({ id: recommendations.id });
|
||||
|
||||
if (!rec) {
|
||||
return reply.code(500).send({ error: 'Failed to create recommendation record' });
|
||||
}
|
||||
|
||||
const recId = rec.id;
|
||||
resetPipelineSession(recId);
|
||||
|
||||
// Set SSE headers and hijack
|
||||
reply.raw.setHeader('Content-Type', 'text/event-stream');
|
||||
reply.raw.setHeader('Cache-Control', 'no-cache');
|
||||
reply.raw.setHeader('Connection', 'keep-alive');
|
||||
reply.raw.setHeader('Access-Control-Allow-Origin', '*');
|
||||
reply.raw.flushHeaders();
|
||||
reply.hijack();
|
||||
|
||||
// Immediately send the record ID so frontend can navigate to it
|
||||
reply.raw.write(`data: ${JSON.stringify({ type: 'created', id: recId })}\n\n`);
|
||||
|
||||
const sseWrite = (event: SSEEvent) => {
|
||||
publishPipelineEvent(recId, event);
|
||||
try {
|
||||
writeSSE(reply.raw, event);
|
||||
} catch {
|
||||
// Client disconnected
|
||||
}
|
||||
};
|
||||
|
||||
try {
|
||||
// Update status to running
|
||||
await db
|
||||
.update(recommendations)
|
||||
.set({ status: 'running' })
|
||||
.where(eq(recommendations.id, recId));
|
||||
|
||||
// Run the continuous pipeline (it will now update the existing record)
|
||||
await runContinuousPipeline(recId, {
|
||||
likedShows: body.liked_shows,
|
||||
dislikedShows: body.disliked_shows ?? '',
|
||||
themes: body.themes ?? '',
|
||||
requirements: body.requirements ?? '',
|
||||
avoid: body.avoid ?? '',
|
||||
totalCount,
|
||||
useWebSearch,
|
||||
validateResults,
|
||||
mediaType,
|
||||
}, sseWrite);
|
||||
} finally {
|
||||
reply.raw.end();
|
||||
}
|
||||
});
|
||||
|
||||
// GET /recommendations — list all
|
||||
fastify.get('/recommendations', async (_request, reply) => {
|
||||
const rows = await db
|
||||
@@ -82,6 +188,7 @@ export default async function recommendationsRoute(fastify: FastifyInstance) {
|
||||
// GET /recommendations/:id — full record
|
||||
fastify.get('/recommendations/:id', async (request, reply) => {
|
||||
const { id } = request.params as { id: string };
|
||||
if (!isUuid(id)) return reply.code(404).send({ error: 'Not found' });
|
||||
const [rec] = await db
|
||||
.select()
|
||||
.from(recommendations)
|
||||
@@ -94,6 +201,7 @@ export default async function recommendationsRoute(fastify: FastifyInstance) {
|
||||
// GET /recommendations/:id/stream — SSE pipeline stream
|
||||
fastify.get('/recommendations/:id/stream', async (request, reply) => {
|
||||
const { id } = request.params as { id: string };
|
||||
if (!isUuid(id)) return reply.code(404).send({ error: 'Not found' });
|
||||
const [rec] = await db
|
||||
.select()
|
||||
.from(recommendations)
|
||||
@@ -113,7 +221,7 @@ export default async function recommendationsRoute(fastify: FastifyInstance) {
|
||||
// an in-flight pipeline that is still running server-side.
|
||||
const sseWrite = (event: SSEEvent) => {
|
||||
try {
|
||||
reply.raw.write(`data: ${JSON.stringify(event)}\n\n`);
|
||||
writeSSE(reply.raw, event);
|
||||
} catch {
|
||||
// Client disconnected — pipeline continues, writes are silently dropped
|
||||
}
|
||||
@@ -136,6 +244,34 @@ export default async function recommendationsRoute(fastify: FastifyInstance) {
|
||||
// Poll the DB until it reaches a terminal state, then report the result.
|
||||
// This prevents starting a duplicate pipeline run on page reload.
|
||||
if (rec.status === 'running') {
|
||||
if (hasPipelineSession(id)) {
|
||||
const history = getPipelineHistory(id);
|
||||
for (const event of history) {
|
||||
sseWrite(event);
|
||||
}
|
||||
|
||||
const lastEvent = history[history.length - 1];
|
||||
if (lastEvent && (lastEvent.stage === 'complete' || lastEvent.status === 'error')) {
|
||||
return;
|
||||
}
|
||||
|
||||
await new Promise<void>((resolve) => {
|
||||
const unsubscribe = subscribeToPipelineEvents(id, (event) => {
|
||||
sseWrite(event);
|
||||
if (event.stage === 'complete' || event.status === 'error') {
|
||||
unsubscribe();
|
||||
resolve();
|
||||
}
|
||||
});
|
||||
|
||||
request.raw.on('close', () => {
|
||||
unsubscribe();
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
const POLL_INTERVAL_MS = 2000;
|
||||
const TIMEOUT_MS = 20 * 60 * 1000; // 20 minutes hard ceiling
|
||||
const start = Date.now();
|
||||
@@ -161,6 +297,7 @@ export default async function recommendationsRoute(fastify: FastifyInstance) {
|
||||
}
|
||||
|
||||
// status === 'pending' — start the pipeline normally.
|
||||
resetPipelineSession(id);
|
||||
|
||||
// Load all feedback to potentially inject as context
|
||||
const feedbackRows = await db.select().from(feedback);
|
||||
@@ -175,7 +312,10 @@ export default async function recommendationsRoute(fastify: FastifyInstance) {
|
||||
.join('\n')
|
||||
: undefined;
|
||||
|
||||
await runPipeline(rec, sseWrite, feedbackContext);
|
||||
await runPipeline(rec, (event) => {
|
||||
publishPipelineEvent(id, event);
|
||||
sseWrite(event);
|
||||
}, feedbackContext);
|
||||
} finally {
|
||||
reply.raw.end();
|
||||
}
|
||||
@@ -184,6 +324,7 @@ export default async function recommendationsRoute(fastify: FastifyInstance) {
|
||||
// DELETE /recommendations/:id — delete a recommendation
|
||||
fastify.delete('/recommendations/:id', async (request, reply) => {
|
||||
const { id } = request.params as { id: string };
|
||||
if (!isUuid(id)) return reply.code(404).send({ error: 'Not found' });
|
||||
const [rec] = await db
|
||||
.select({ id: recommendations.id })
|
||||
.from(recommendations)
|
||||
@@ -199,6 +340,7 @@ export default async function recommendationsRoute(fastify: FastifyInstance) {
|
||||
// POST /recommendations/:id/rerank — reset status so client can re-open SSE stream
|
||||
fastify.post('/recommendations/:id/rerank', async (request, reply) => {
|
||||
const { id } = request.params as { id: string };
|
||||
if (!isUuid(id)) return reply.code(404).send({ error: 'Not found' });
|
||||
const [rec] = await db
|
||||
.select({ id: recommendations.id })
|
||||
.from(recommendations)
|
||||
@@ -210,6 +352,7 @@ export default async function recommendationsRoute(fastify: FastifyInstance) {
|
||||
.update(recommendations)
|
||||
.set({ status: 'pending' })
|
||||
.where(eq(recommendations.id, id));
|
||||
resetPipelineSession(id);
|
||||
|
||||
return reply.send({ ok: true });
|
||||
});
|
||||
|
||||
@@ -67,3 +67,28 @@ export interface SSEEvent {
|
||||
status: SSEStatus;
|
||||
data?: unknown;
|
||||
}
|
||||
|
||||
export interface ContinuousSession {
|
||||
sessionId: string;
|
||||
mediaType: MediaType;
|
||||
interpreterOutput: InterpreterOutput;
|
||||
accumulatedCandidates: RetrievalCandidate[];
|
||||
previousResponseId: string | null;
|
||||
batchCount: number;
|
||||
totalCount: number;
|
||||
useWebSearch: boolean;
|
||||
validateResults: boolean;
|
||||
allSeenTitles: Set<string>;
|
||||
}
|
||||
|
||||
export interface ContinuousStartRequest {
|
||||
mediaType: 'tv_show' | 'movie';
|
||||
likedShows: string;
|
||||
dislikedShows?: string;
|
||||
themes?: string;
|
||||
requirements?: string;
|
||||
avoid?: string;
|
||||
totalCount: number;
|
||||
useWebSearch: boolean;
|
||||
validateResults: boolean;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user