new things
This commit is contained in:
@@ -0,0 +1,9 @@
|
||||
ALTER TABLE "recommendations" ADD COLUMN "use_validator" boolean DEFAULT false NOT NULL;
|
||||
--> statement-breakpoint
|
||||
ALTER TABLE "recommendations" ADD COLUMN "hard_requirements" boolean DEFAULT false NOT NULL;
|
||||
--> statement-breakpoint
|
||||
ALTER TABLE "recommendations" ADD COLUMN "self_expansive" boolean DEFAULT false NOT NULL;
|
||||
--> statement-breakpoint
|
||||
ALTER TABLE "recommendations" ADD COLUMN "expansive_passes" integer DEFAULT 1 NOT NULL;
|
||||
--> statement-breakpoint
|
||||
ALTER TABLE "recommendations" ADD COLUMN "expansive_mode" text DEFAULT 'soft' NOT NULL;
|
||||
@@ -15,6 +15,7 @@ export async function runRanking(
|
||||
interpreter: InterpreterOutput,
|
||||
retrieval: RetrievalOutput,
|
||||
mediaType: MediaType = 'tv_show',
|
||||
hardRequirements = false,
|
||||
): Promise<RankingOutput> {
|
||||
const mediaLabel = mediaType === 'movie' ? 'movie' : 'TV show';
|
||||
|
||||
@@ -57,7 +58,7 @@ Tags:
|
||||
- "questionable": Partial alignment, some aspects don't match
|
||||
- "will_not_like": Likely mismatch, conflicts with preferences or avoidance criteria
|
||||
|
||||
Every ${mediaLabel} in the input must appear in exactly one tag. Use the title exactly as given.`,
|
||||
Every ${mediaLabel} in the input must appear in exactly one tag. Use the title exactly as given.${hardRequirements ? '\n\nHARD REQUIREMENTS MODE: Any candidate that does not satisfy every stated requirement must be placed in "will_not_like", regardless of other qualities.' : ''}`,
|
||||
input: `User preferences:
|
||||
Liked ${mediaLabel}s: ${interpreter.liked.join(', ') || '(none)'}
|
||||
Themes: ${interpreter.themes.join(', ') || '(none)'}
|
||||
|
||||
@@ -15,6 +15,8 @@ export async function runRetrieval(
|
||||
brainstormCount = 100,
|
||||
mediaType: MediaType = 'tv_show',
|
||||
useWebSearch = false,
|
||||
hardRequirements = false,
|
||||
previousFullMatches: string[] = [],
|
||||
): Promise<RetrievalOutput> {
|
||||
const mediaLabel = mediaType === 'movie' ? 'movie' : 'TV show';
|
||||
const mediaLabelPlural = mediaType === 'movie' ? 'movies' : 'TV shows';
|
||||
@@ -34,7 +36,7 @@ Rules:
|
||||
- Each "reason" should briefly explain why the ${mediaLabel} matches the preferences
|
||||
- Avoid duplicates
|
||||
- Include ${mediaLabelPlural} from different decades, countries${mediaType === 'tv_show' ? ', and networks' : ', and directors'}
|
||||
- Aim for ${brainstormCount} candidates minimum`,
|
||||
- Aim for ${brainstormCount} candidates minimum${previousFullMatches.length > 0 ? '\n- Do NOT suggest titles already in the Previous Full Matches list — generate NEW candidates inspired by what made those successful' : ''}${hardRequirements ? '\n\nIMPORTANT: Strictly follow ALL requirements. Exclude any candidate that does not meet every stated requirement.' : ''}`,
|
||||
input: `Structured preferences:
|
||||
Liked ${mediaLabelPlural}: ${input.liked.join(', ') || '(none)'}
|
||||
Disliked ${mediaLabelPlural}: ${input.disliked.join(', ') || '(none)'}
|
||||
@@ -42,7 +44,7 @@ Themes: ${input.themes.join(', ') || '(none)'}
|
||||
Character preferences: ${input.character_preferences.join(', ') || '(none)'}
|
||||
Tone: ${input.tone.join(', ') || '(none)'}
|
||||
Avoid: ${input.avoid.join(', ') || '(none)'}
|
||||
Requirements: ${input.requirements.join(', ') || '(none)'}
|
||||
Requirements: ${input.requirements.join(', ') || '(none)'}${previousFullMatches.length > 0 ? `\n\nPrevious Full Match titles (DO NOT repeat these; use them as inspiration for NEW candidates with similar qualities): ${previousFullMatches.join(', ')}` : ''}
|
||||
|
||||
Generate a large, diverse pool of ${mediaLabel} candidates.`,
|
||||
});
|
||||
|
||||
67
packages/backend/src/agents/validator.ts
Normal file
67
packages/backend/src/agents/validator.ts
Normal file
@@ -0,0 +1,67 @@
|
||||
import { openai, defaultModel, serviceOptions, supportsWebSearch } from '../agent.js';
|
||||
import type { RetrievalCandidate, ValidatorOutput, MediaType } from '../types/agents.js';
|
||||
import { z } from 'zod';
|
||||
import { zodTextFormat } from 'openai/helpers/zod';
|
||||
|
||||
const ValidatorSchema = z.object({
|
||||
candidates: z.array(z.object({
|
||||
title: z.string(),
|
||||
reason: z.string(),
|
||||
isTrash: z.boolean(),
|
||||
})),
|
||||
});
|
||||
|
||||
const CHUNK_SIZE = 30;
|
||||
|
||||
function splitIntoChunks<T>(items: T[], size: number): T[][] {
|
||||
const chunks: T[][] = [];
|
||||
for (let i = 0; i < items.length; i += size) {
|
||||
chunks.push(items.slice(i, i + size));
|
||||
}
|
||||
return chunks;
|
||||
}
|
||||
|
||||
async function runValidatorChunk(
|
||||
candidates: RetrievalCandidate[],
|
||||
mediaLabel: string,
|
||||
): Promise<ValidatorOutput> {
|
||||
const list = candidates.map((c) => `- ${c.title}: ${c.reason}`).join('\n');
|
||||
|
||||
const response = await openai.responses.parse({
|
||||
model: defaultModel,
|
||||
temperature: 0.1,
|
||||
...serviceOptions,
|
||||
...(supportsWebSearch ? { tools: [{ type: 'web_search' as const }] } : {}),
|
||||
text: { format: zodTextFormat(ValidatorSchema, 'validation') },
|
||||
instructions: `You are a ${mediaLabel} metadata validator. For each candidate in the list, use web search to verify:
|
||||
1. The title actually exists as a real, produced ${mediaLabel} (not a made-up or hallucinated title)
|
||||
2. Correct the "reason" field with accurate metadata (actual genres, tone, year) if it contains errors
|
||||
|
||||
Set isTrash: true for entries that:
|
||||
- Do not exist as a real ${mediaLabel}
|
||||
- Are clearly hallucinated or fictional titles
|
||||
- Are so incorrect that no real match can be identified
|
||||
|
||||
Set isTrash: false for real, verifiable ${mediaLabel}s, even if minor metadata corrections are needed.
|
||||
Return every candidate — do not drop any entries from the output.`,
|
||||
input: `Validate these ${mediaLabel} candidates:\n${list}`,
|
||||
});
|
||||
|
||||
return (response.output_parsed as ValidatorOutput) ?? { candidates: [] };
|
||||
}
|
||||
|
||||
export async function runValidator(
|
||||
candidates: RetrievalCandidate[],
|
||||
mediaType: MediaType = 'tv_show',
|
||||
): Promise<ValidatorOutput> {
|
||||
const mediaLabel = mediaType === 'movie' ? 'movie' : 'TV show';
|
||||
const chunks = splitIntoChunks(candidates, CHUNK_SIZE);
|
||||
|
||||
const chunkResults = await Promise.all(
|
||||
chunks.map((chunk) => runValidatorChunk(chunk, mediaLabel))
|
||||
);
|
||||
|
||||
return {
|
||||
candidates: chunkResults.flatMap((r) => r.candidates),
|
||||
};
|
||||
}
|
||||
@@ -11,6 +11,11 @@ export const recommendations = pgTable('recommendations', {
|
||||
brainstorm_count: integer('brainstorm_count').notNull().default(100),
|
||||
media_type: text('media_type').notNull().default('tv_show'),
|
||||
use_web_search: boolean('use_web_search').notNull().default(false),
|
||||
use_validator: boolean('use_validator').notNull().default(false),
|
||||
hard_requirements: boolean('hard_requirements').notNull().default(false),
|
||||
self_expansive: boolean('self_expansive').notNull().default(false),
|
||||
expansive_passes: integer('expansive_passes').notNull().default(1),
|
||||
expansive_mode: text('expansive_mode').notNull().default('soft'),
|
||||
recommendations: jsonb('recommendations').$type<CuratorOutput[]>(),
|
||||
status: text('status').notNull().default('pending'),
|
||||
created_at: timestamp('created_at').defaultNow().notNull(),
|
||||
|
||||
@@ -4,6 +4,7 @@ import postgres from 'postgres';
|
||||
import * as dotenv from 'dotenv';
|
||||
import path from 'path';
|
||||
import { fileURLToPath } from 'url';
|
||||
import fs from 'fs/promises';
|
||||
|
||||
const __filename = fileURLToPath(import.meta.url);
|
||||
const __dirname = path.dirname(__filename);
|
||||
@@ -25,6 +26,10 @@ const runMigrations = async () => {
|
||||
console.log('Running database migrations...');
|
||||
try {
|
||||
const folder = path.join(__dirname, '../drizzle');
|
||||
// print all migrations
|
||||
const migrations = await fs.readdir(folder);
|
||||
console.log('Migrations:', JSON.stringify(migrations));
|
||||
|
||||
await migrate(db, { migrationsFolder: folder });
|
||||
console.log('Migrations completed successfully.');
|
||||
} catch (err) {
|
||||
|
||||
@@ -3,14 +3,16 @@ import { db } from '../db.js';
|
||||
import { recommendations } from '../db/schema.js';
|
||||
import { runInterpreter } from '../agents/interpreter.js';
|
||||
import { runRetrieval } from '../agents/retrieval.js';
|
||||
import { runValidator } from '../agents/validator.js';
|
||||
import { runRanking } from '../agents/ranking.js';
|
||||
import { runCurator } from '../agents/curator.js';
|
||||
import type { CuratorOutput, MediaType, RankingOutput, RetrievalCandidate, SSEEvent } from '../types/agents.js';
|
||||
import type { CuratorOutput, InterpreterOutput, MediaType, RankingOutput, RetrievalCandidate, SSEEvent } from '../types/agents.js';
|
||||
import { generateTitle } from '../agents/titleGenerator.js';
|
||||
|
||||
/* -- Agent pipeline --
|
||||
[1] Interpreter -> gets user input, transforms into structured data
|
||||
[2] Retrieval -> gets candidates from OpenAI (high temperature)
|
||||
[2.5] Validator (optional) -> verifies candidates exist, removes trash
|
||||
[3] Ranking -> ranks candidates based on user input
|
||||
[4] Curator -> curates candidates based on user input
|
||||
*/
|
||||
@@ -24,8 +26,8 @@ function getBucketCount(count: number): number {
|
||||
return 4;
|
||||
}
|
||||
|
||||
function deduplicateCandidates(candidates: RetrievalCandidate[]): RetrievalCandidate[] {
|
||||
const seen = new Set<string>();
|
||||
function deduplicateCandidates(candidates: RetrievalCandidate[], seenTitles?: Set<string>): RetrievalCandidate[] {
|
||||
const seen = seenTitles ?? new Set<string>();
|
||||
return candidates.filter((c) => {
|
||||
const key = c.title.toLowerCase();
|
||||
if (seen.has(key)) return false;
|
||||
@@ -40,6 +42,11 @@ function splitIntoBuckets<T>(items: T[], n: number): T[][] {
|
||||
.filter((b) => b.length > 0);
|
||||
}
|
||||
|
||||
function mergeCuratorOutputs(a: CuratorOutput[], b: CuratorOutput[]): CuratorOutput[] {
|
||||
const seen = new Set(a.map((x) => x.title.toLowerCase()));
|
||||
return [...a, ...b.filter((x) => !seen.has(x.title.toLowerCase()))];
|
||||
}
|
||||
|
||||
function log(recId: string, msg: string, data?: unknown) {
|
||||
const ts = new Date().toISOString();
|
||||
if (data !== undefined) {
|
||||
@@ -49,6 +56,123 @@ function log(recId: string, msg: string, data?: unknown) {
|
||||
}
|
||||
}
|
||||
|
||||
interface SubPipelineCtx {
|
||||
recId: string;
|
||||
interpreterOutput: InterpreterOutput;
|
||||
mediaType: MediaType;
|
||||
useWebSearch: boolean;
|
||||
useValidator: boolean;
|
||||
useHardRequirements: boolean;
|
||||
brainstormCount: number;
|
||||
previousFullMatches: string[];
|
||||
allSeenTitles: Set<string>;
|
||||
stagePrefix: string;
|
||||
sseWrite: (event: SSEEvent) => void;
|
||||
}
|
||||
|
||||
async function runSubPipeline(ctx: SubPipelineCtx): Promise<CuratorOutput[]> {
|
||||
const {
|
||||
recId, interpreterOutput, mediaType, useWebSearch, useValidator,
|
||||
useHardRequirements, brainstormCount, previousFullMatches,
|
||||
allSeenTitles, stagePrefix, sseWrite,
|
||||
} = ctx;
|
||||
|
||||
const p = (stage: string) => (stagePrefix + stage) as SSEEvent['stage'];
|
||||
|
||||
// --- Retrieval (bucketed) ---
|
||||
log(recId, `${stagePrefix}Retrieval: start`);
|
||||
sseWrite({ stage: p('retrieval'), status: 'start' });
|
||||
const t1 = Date.now();
|
||||
const retrievalBucketCount = getBucketCount(brainstormCount);
|
||||
const perBucketCount = Math.ceil(brainstormCount / retrievalBucketCount);
|
||||
const retrievalBuckets = await Promise.all(
|
||||
Array.from({ length: retrievalBucketCount }, () =>
|
||||
runRetrieval(interpreterOutput, perBucketCount, mediaType, useWebSearch, useHardRequirements, previousFullMatches)
|
||||
)
|
||||
);
|
||||
const allCandidates = retrievalBuckets.flatMap((r) => r.candidates);
|
||||
const dedupedCandidates = deduplicateCandidates(allCandidates, allSeenTitles);
|
||||
log(recId, `${stagePrefix}Retrieval: done (${Date.now() - t1}ms) — ${dedupedCandidates.length} candidates (${retrievalBucketCount} buckets, ${allCandidates.length} before dedup)`, {
|
||||
titles: dedupedCandidates.map((c) => c.title),
|
||||
});
|
||||
sseWrite({ stage: p('retrieval'), status: 'done', data: { candidates: dedupedCandidates } });
|
||||
|
||||
// --- Validator (optional) ---
|
||||
let candidatesForRanking = dedupedCandidates;
|
||||
if (useValidator) {
|
||||
log(recId, `${stagePrefix}Validator: start`);
|
||||
sseWrite({ stage: p('validator'), status: 'start' });
|
||||
const tV = Date.now();
|
||||
const validatorOutput = await runValidator(dedupedCandidates, mediaType);
|
||||
const verified = validatorOutput.candidates.filter((c) => !c.isTrash);
|
||||
const trashCount = validatorOutput.candidates.length - verified.length;
|
||||
candidatesForRanking = verified.map(({ title, reason }) => ({ title, reason }));
|
||||
log(recId, `${stagePrefix}Validator: done (${Date.now() - tV}ms) — removed ${trashCount} trash entries`);
|
||||
sseWrite({ stage: p('validator'), status: 'done', data: { removed: trashCount } });
|
||||
} else {
|
||||
sseWrite({ stage: p('validator'), status: 'done', data: { skipped: true } });
|
||||
}
|
||||
|
||||
// --- Ranking (bucketed) ---
|
||||
log(recId, `${stagePrefix}Ranking: start`);
|
||||
sseWrite({ stage: p('ranking'), status: 'start' });
|
||||
const t2 = Date.now();
|
||||
const rankBucketCount = getBucketCount(candidatesForRanking.length);
|
||||
const candidateBuckets = splitIntoBuckets(candidatesForRanking, rankBucketCount);
|
||||
const rankingBuckets = await Promise.all(
|
||||
candidateBuckets.map((bucket) =>
|
||||
runRanking(interpreterOutput, { candidates: bucket }, mediaType, useHardRequirements)
|
||||
)
|
||||
);
|
||||
const rankingOutput: RankingOutput = {
|
||||
full_match: rankingBuckets.flatMap((r) => r.full_match),
|
||||
definitely_like: rankingBuckets.flatMap((r) => r.definitely_like),
|
||||
might_like: rankingBuckets.flatMap((r) => r.might_like),
|
||||
questionable: rankingBuckets.flatMap((r) => r.questionable),
|
||||
will_not_like: rankingBuckets.flatMap((r) => r.will_not_like),
|
||||
};
|
||||
log(recId, `${stagePrefix}Ranking: done (${Date.now() - t2}ms) — ${rankBucketCount} buckets`, {
|
||||
full_match: rankingOutput.full_match.length,
|
||||
definitely_like: rankingOutput.definitely_like.length,
|
||||
might_like: rankingOutput.might_like.length,
|
||||
questionable: rankingOutput.questionable.length,
|
||||
will_not_like: rankingOutput.will_not_like.length,
|
||||
});
|
||||
sseWrite({ stage: p('ranking'), status: 'done', data: rankingOutput });
|
||||
|
||||
// --- Curator (bucketed) ---
|
||||
log(recId, `${stagePrefix}Curator: start`);
|
||||
sseWrite({ stage: p('curator'), status: 'start' });
|
||||
const t3 = Date.now();
|
||||
type CategorizedItem = { title: string; category: keyof RankingOutput };
|
||||
const categorizedItems: CategorizedItem[] = [
|
||||
...rankingOutput.full_match.map((t) => ({ title: t, category: 'full_match' as const })),
|
||||
...rankingOutput.definitely_like.map((t) => ({ title: t, category: 'definitely_like' as const })),
|
||||
...rankingOutput.might_like.map((t) => ({ title: t, category: 'might_like' as const })),
|
||||
...rankingOutput.questionable.map((t) => ({ title: t, category: 'questionable' as const })),
|
||||
...rankingOutput.will_not_like.map((t) => ({ title: t, category: 'will_not_like' as const })),
|
||||
];
|
||||
const curatorBucketCount = getBucketCount(categorizedItems.length);
|
||||
const curatorItemBuckets = splitIntoBuckets(categorizedItems, curatorBucketCount);
|
||||
const curatorBucketRankings: RankingOutput[] = curatorItemBuckets.map((bucket) => ({
|
||||
full_match: bucket.filter((i) => i.category === 'full_match').map((i) => i.title),
|
||||
definitely_like: bucket.filter((i) => i.category === 'definitely_like').map((i) => i.title),
|
||||
might_like: bucket.filter((i) => i.category === 'might_like').map((i) => i.title),
|
||||
questionable: bucket.filter((i) => i.category === 'questionable').map((i) => i.title),
|
||||
will_not_like: bucket.filter((i) => i.category === 'will_not_like').map((i) => i.title),
|
||||
}));
|
||||
const curatorBucketOutputs = await Promise.all(
|
||||
curatorBucketRankings.map((ranking) =>
|
||||
runCurator(ranking, interpreterOutput, mediaType, useWebSearch)
|
||||
)
|
||||
);
|
||||
const curatorOutput = curatorBucketOutputs.flat();
|
||||
log(recId, `${stagePrefix}Curator: done (${Date.now() - t3}ms) — ${curatorOutput.length} items curated (${curatorBucketCount} buckets)`);
|
||||
sseWrite({ stage: p('curator'), status: 'done', data: curatorOutput });
|
||||
|
||||
return curatorOutput;
|
||||
}
|
||||
|
||||
export async function runPipeline(
|
||||
rec: RecommendationRecord,
|
||||
sseWrite: (event: SSEEvent) => void,
|
||||
@@ -58,8 +182,11 @@ export async function runPipeline(
|
||||
const startTime = Date.now();
|
||||
const mediaType = (rec.media_type ?? 'tv_show') as MediaType;
|
||||
const useWebSearch = rec.use_web_search ?? false;
|
||||
const useValidator = rec.use_validator ?? false;
|
||||
const useHardRequirements = rec.hard_requirements ?? false;
|
||||
const selfExpansive = rec.self_expansive ?? false;
|
||||
|
||||
log(rec.id, `Starting pipeline for "${rec.title}" [${mediaType}${useWebSearch ? ', web_search' : ''}]${feedbackContext ? ' (with feedback context)' : ''}`);
|
||||
log(rec.id, `Starting pipeline for "${rec.title}" [${mediaType}${useWebSearch ? ', web_search' : ''}${useValidator ? ', validator' : ''}${useHardRequirements ? ', hard_req' : ''}${selfExpansive ? `, expansive×${rec.expansive_passes}(${rec.expansive_mode})` : ''}]${feedbackContext ? ' (with feedback context)' : ''}`);
|
||||
|
||||
try {
|
||||
// Set status to running
|
||||
@@ -91,84 +218,69 @@ export async function runPipeline(
|
||||
});
|
||||
sseWrite({ stage: 'interpreter', status: 'done', data: interpreterOutput });
|
||||
|
||||
// --- Retrieval (bucketed) ---
|
||||
// --- Pass 1: Retrieval → [Validator?] → Ranking → Curator ---
|
||||
currentStage = 'retrieval';
|
||||
log(rec.id, 'Retrieval: start');
|
||||
sseWrite({ stage: 'retrieval', status: 'start' });
|
||||
const t1 = Date.now();
|
||||
const retrievalBucketCount = getBucketCount(rec.brainstorm_count);
|
||||
const perBucketCount = Math.ceil(rec.brainstorm_count / retrievalBucketCount);
|
||||
const retrievalBuckets = await Promise.all(
|
||||
Array.from({ length: retrievalBucketCount }, () =>
|
||||
runRetrieval(interpreterOutput, perBucketCount, mediaType, useWebSearch)
|
||||
)
|
||||
);
|
||||
const allCandidates = retrievalBuckets.flatMap((r) => r.candidates);
|
||||
const dedupedCandidates = deduplicateCandidates(allCandidates);
|
||||
const retrievalOutput = { candidates: dedupedCandidates };
|
||||
log(rec.id, `Retrieval: done (${Date.now() - t1}ms) — ${dedupedCandidates.length} candidates (${retrievalBucketCount} buckets, ${allCandidates.length} before dedup)`, {
|
||||
titles: dedupedCandidates.map((c) => c.title),
|
||||
const allSeenTitles = new Set<string>();
|
||||
const pass1Output = await runSubPipeline({
|
||||
recId: rec.id,
|
||||
interpreterOutput,
|
||||
mediaType,
|
||||
useWebSearch,
|
||||
useValidator,
|
||||
useHardRequirements,
|
||||
brainstormCount: rec.brainstorm_count,
|
||||
previousFullMatches: [],
|
||||
allSeenTitles,
|
||||
stagePrefix: '',
|
||||
sseWrite: (event) => {
|
||||
currentStage = event.stage;
|
||||
sseWrite(event);
|
||||
},
|
||||
});
|
||||
sseWrite({ stage: 'retrieval', status: 'done', data: retrievalOutput });
|
||||
|
||||
// --- Ranking (bucketed) ---
|
||||
currentStage = 'ranking';
|
||||
log(rec.id, 'Ranking: start');
|
||||
sseWrite({ stage: 'ranking', status: 'start' });
|
||||
const t2 = Date.now();
|
||||
const rankBucketCount = getBucketCount(dedupedCandidates.length);
|
||||
const candidateBuckets = splitIntoBuckets(dedupedCandidates, rankBucketCount);
|
||||
const rankingBuckets = await Promise.all(
|
||||
candidateBuckets.map((bucket) =>
|
||||
runRanking(interpreterOutput, { candidates: bucket }, mediaType)
|
||||
)
|
||||
);
|
||||
const rankingOutput: RankingOutput = {
|
||||
full_match: rankingBuckets.flatMap((r) => r.full_match),
|
||||
definitely_like: rankingBuckets.flatMap((r) => r.definitely_like),
|
||||
might_like: rankingBuckets.flatMap((r) => r.might_like),
|
||||
questionable: rankingBuckets.flatMap((r) => r.questionable),
|
||||
will_not_like: rankingBuckets.flatMap((r) => r.will_not_like),
|
||||
};
|
||||
log(rec.id, `Ranking: done (${Date.now() - t2}ms) — ${rankBucketCount} buckets`, {
|
||||
full_match: rankingOutput.full_match.length,
|
||||
definitely_like: rankingOutput.definitely_like.length,
|
||||
might_like: rankingOutput.might_like.length,
|
||||
questionable: rankingOutput.questionable.length,
|
||||
will_not_like: rankingOutput.will_not_like.length,
|
||||
});
|
||||
sseWrite({ stage: 'ranking', status: 'done', data: rankingOutput });
|
||||
let mergedOutput = pass1Output;
|
||||
|
||||
// --- Curator (bucketed) ---
|
||||
currentStage = 'curator';
|
||||
log(rec.id, 'Curator: start');
|
||||
sseWrite({ stage: 'curator', status: 'start' });
|
||||
const t3 = Date.now();
|
||||
type CategorizedItem = { title: string; category: keyof RankingOutput };
|
||||
const categorizedItems: CategorizedItem[] = [
|
||||
...rankingOutput.full_match.map((t) => ({ title: t, category: 'full_match' as const })),
|
||||
...rankingOutput.definitely_like.map((t) => ({ title: t, category: 'definitely_like' as const })),
|
||||
...rankingOutput.might_like.map((t) => ({ title: t, category: 'might_like' as const })),
|
||||
...rankingOutput.questionable.map((t) => ({ title: t, category: 'questionable' as const })),
|
||||
...rankingOutput.will_not_like.map((t) => ({ title: t, category: 'will_not_like' as const })),
|
||||
];
|
||||
const curatorBucketCount = getBucketCount(categorizedItems.length);
|
||||
const curatorItemBuckets = splitIntoBuckets(categorizedItems, curatorBucketCount);
|
||||
const curatorBucketRankings: RankingOutput[] = curatorItemBuckets.map((bucket) => ({
|
||||
full_match: bucket.filter((i) => i.category === 'full_match').map((i) => i.title),
|
||||
definitely_like: bucket.filter((i) => i.category === 'definitely_like').map((i) => i.title),
|
||||
might_like: bucket.filter((i) => i.category === 'might_like').map((i) => i.title),
|
||||
questionable: bucket.filter((i) => i.category === 'questionable').map((i) => i.title),
|
||||
will_not_like: bucket.filter((i) => i.category === 'will_not_like').map((i) => i.title),
|
||||
}));
|
||||
const curatorBucketOutputs = await Promise.all(
|
||||
curatorBucketRankings.map((ranking) =>
|
||||
runCurator(ranking, interpreterOutput, mediaType, useWebSearch)
|
||||
)
|
||||
);
|
||||
const curatorOutput = curatorBucketOutputs.flat();
|
||||
log(rec.id, `Curator: done (${Date.now() - t3}ms) — ${curatorOutput.length} items curated (${curatorBucketCount} buckets)`);
|
||||
sseWrite({ stage: 'curator', status: 'done', data: curatorOutput });
|
||||
// --- Self Expansive: extra passes ---
|
||||
if (selfExpansive && rec.expansive_passes > 0) {
|
||||
const allFullMatches = pass1Output
|
||||
.filter((c) => c.category === 'Full Match')
|
||||
.map((c) => c.title);
|
||||
|
||||
for (let i = 0; i < rec.expansive_passes; i++) {
|
||||
const passNum = i + 2;
|
||||
const passCount = rec.expansive_mode === 'extreme' ? rec.brainstorm_count : 60;
|
||||
const passPrefix = `pass${passNum}:` as const;
|
||||
|
||||
log(rec.id, `Self Expansive Pass ${passNum}: start (${passCount} candidates, ${allFullMatches.length} full matches as context)`);
|
||||
currentStage = `${passPrefix}retrieval` as SSEEvent['stage'];
|
||||
|
||||
const passOutput = await runSubPipeline({
|
||||
recId: rec.id,
|
||||
interpreterOutput,
|
||||
mediaType,
|
||||
useWebSearch,
|
||||
useValidator,
|
||||
useHardRequirements,
|
||||
brainstormCount: passCount,
|
||||
previousFullMatches: [...allFullMatches],
|
||||
allSeenTitles,
|
||||
stagePrefix: passPrefix,
|
||||
sseWrite: (event) => {
|
||||
currentStage = event.stage;
|
||||
sseWrite(event);
|
||||
},
|
||||
});
|
||||
|
||||
mergedOutput = mergeCuratorOutputs(mergedOutput, passOutput);
|
||||
|
||||
const newFullMatches = passOutput
|
||||
.filter((c) => c.category === 'Full Match')
|
||||
.map((c) => c.title);
|
||||
allFullMatches.push(...newFullMatches);
|
||||
|
||||
log(rec.id, `Self Expansive Pass ${passNum}: done — ${passOutput.length} new items, ${mergedOutput.length} total`);
|
||||
}
|
||||
}
|
||||
|
||||
// Generate AI title
|
||||
let aiTitle: string = rec.title;
|
||||
@@ -180,17 +292,27 @@ export async function runPipeline(
|
||||
log(rec.id, `Title generation failed, keeping initial title: ${String(err)}`);
|
||||
}
|
||||
|
||||
// Sort by category order before saving
|
||||
const CATEGORY_ORDER: Record<string, number> = {
|
||||
'Full Match': 0,
|
||||
'Definitely Like': 1,
|
||||
'Might Like': 2,
|
||||
'Questionable': 3,
|
||||
'Will Not Like': 4,
|
||||
};
|
||||
mergedOutput.sort((a, b) => (CATEGORY_ORDER[a.category] ?? 99) - (CATEGORY_ORDER[b.category] ?? 99));
|
||||
|
||||
// Save results to DB
|
||||
log(rec.id, 'Saving results to DB');
|
||||
await db
|
||||
.update(recommendations)
|
||||
.set({ recommendations: curatorOutput, status: 'done', title: aiTitle })
|
||||
.set({ recommendations: mergedOutput, status: 'done', title: aiTitle })
|
||||
.where(eq(recommendations.id, rec.id));
|
||||
|
||||
sseWrite({ stage: 'complete', status: 'done', data: { title: aiTitle } });
|
||||
|
||||
log(rec.id, `Pipeline complete (total: ${Date.now() - startTime}ms)`);
|
||||
return curatorOutput;
|
||||
return mergedOutput;
|
||||
} catch (err) {
|
||||
const message = err instanceof Error ? err.message : String(err);
|
||||
log(rec.id, `Pipeline error at stage "${currentStage}": ${message}`);
|
||||
|
||||
@@ -4,6 +4,7 @@ import { db } from '../db.js';
|
||||
import { recommendations, feedback } from '../db/schema.js';
|
||||
import { runPipeline } from '../pipelines/recommendation.js';
|
||||
import type { MediaType, SSEEvent } from '../types/agents.js';
|
||||
import { supportsWebSearch } from '../agent.js';
|
||||
|
||||
export default async function recommendationsRoute(fastify: FastifyInstance) {
|
||||
// POST /recommendations — create record, return { id }
|
||||
@@ -16,6 +17,11 @@ export default async function recommendationsRoute(fastify: FastifyInstance) {
|
||||
brainstorm_count?: number;
|
||||
media_type?: string;
|
||||
use_web_search?: boolean;
|
||||
use_validator?: boolean;
|
||||
hard_requirements?: boolean;
|
||||
self_expansive?: boolean;
|
||||
expansive_passes?: number;
|
||||
expansive_mode?: string;
|
||||
};
|
||||
|
||||
const title = (body.main_prompt ?? '')
|
||||
@@ -28,6 +34,12 @@ export default async function recommendationsRoute(fastify: FastifyInstance) {
|
||||
const brainstorm_count = Number.isFinite(rawCount) ? Math.min(200, Math.max(50, rawCount)) : 100;
|
||||
const media_type: MediaType = body.media_type === 'movie' ? 'movie' : 'tv_show';
|
||||
const use_web_search = body.use_web_search === true;
|
||||
const use_validator = body.use_validator === true && supportsWebSearch;
|
||||
const hard_requirements = body.hard_requirements === true;
|
||||
const self_expansive = body.self_expansive === true;
|
||||
const rawPasses = Number(body.expansive_passes ?? 2);
|
||||
const expansive_passes = Number.isFinite(rawPasses) ? Math.min(5, Math.max(1, rawPasses)) : 2;
|
||||
const expansive_mode = body.expansive_mode === 'extreme' ? 'extreme' : 'soft';
|
||||
|
||||
const [rec] = await db
|
||||
.insert(recommendations)
|
||||
@@ -40,6 +52,11 @@ export default async function recommendationsRoute(fastify: FastifyInstance) {
|
||||
brainstorm_count,
|
||||
media_type,
|
||||
use_web_search,
|
||||
use_validator,
|
||||
hard_requirements,
|
||||
self_expansive,
|
||||
expansive_passes,
|
||||
expansive_mode,
|
||||
status: 'pending',
|
||||
})
|
||||
.returning({ id: recommendations.id });
|
||||
|
||||
@@ -19,6 +19,16 @@ export interface RetrievalOutput {
|
||||
candidates: RetrievalCandidate[];
|
||||
}
|
||||
|
||||
export interface ValidatorCandidate {
|
||||
title: string;
|
||||
reason: string;
|
||||
isTrash: boolean;
|
||||
}
|
||||
|
||||
export interface ValidatorOutput {
|
||||
candidates: ValidatorCandidate[];
|
||||
}
|
||||
|
||||
export interface RankingOutput {
|
||||
full_match: string[];
|
||||
definitely_like: string[];
|
||||
@@ -38,7 +48,18 @@ export interface CuratorOutput {
|
||||
cons: string[];
|
||||
}
|
||||
|
||||
export type PipelineStage = 'interpreter' | 'retrieval' | 'ranking' | 'curator' | 'complete';
|
||||
export type PipelineStage =
|
||||
| 'interpreter'
|
||||
| 'retrieval'
|
||||
| 'validator'
|
||||
| 'ranking'
|
||||
| 'curator'
|
||||
| 'complete'
|
||||
| `pass${number}:retrieval`
|
||||
| `pass${number}:validator`
|
||||
| `pass${number}:ranking`
|
||||
| `pass${number}:curator`;
|
||||
|
||||
export type SSEStatus = 'start' | 'done' | 'error';
|
||||
|
||||
export interface SSEEvent {
|
||||
|
||||
Reference in New Issue
Block a user