initial commit
This commit is contained in:
118
packages/backend/src/pipelines/recommendation.ts
Normal file
118
packages/backend/src/pipelines/recommendation.ts
Normal file
@@ -0,0 +1,118 @@
|
||||
import { eq } from 'drizzle-orm';
|
||||
import { db } from '../db.js';
|
||||
import { recommendations } from '../db/schema.js';
|
||||
import { runInterpreter } from '../agents/interpreter.js';
|
||||
import { runRetrieval } from '../agents/retrieval.js';
|
||||
import { runRanking } from '../agents/ranking.js';
|
||||
import { runCurator } from '../agents/curator.js';
|
||||
import type { CuratorOutput, SSEEvent } from '../types/agents.js';
|
||||
|
||||
type RecommendationRecord = typeof recommendations.$inferSelect;
|
||||
|
||||
function log(recId: string, msg: string, data?: unknown) {
|
||||
const ts = new Date().toISOString();
|
||||
if (data !== undefined) {
|
||||
console.log(`[pipeline] [${ts}] [${recId}] ${msg}`, data);
|
||||
} else {
|
||||
console.log(`[pipeline] [${ts}] [${recId}] ${msg}`);
|
||||
}
|
||||
}
|
||||
|
||||
export async function runPipeline(
|
||||
rec: RecommendationRecord,
|
||||
sseWrite: (event: SSEEvent) => void,
|
||||
feedbackContext?: string,
|
||||
): Promise<CuratorOutput[]> {
|
||||
let currentStage: SSEEvent['stage'] = 'interpreter';
|
||||
const startTime = Date.now();
|
||||
|
||||
log(rec.id, `Starting pipeline for "${rec.title}"${feedbackContext ? ' (with feedback context)' : ''}`);
|
||||
|
||||
try {
|
||||
// Set status to running
|
||||
log(rec.id, 'Setting status → running');
|
||||
await db
|
||||
.update(recommendations)
|
||||
.set({ status: 'running' })
|
||||
.where(eq(recommendations.id, rec.id));
|
||||
|
||||
// --- Interpreter ---
|
||||
currentStage = 'interpreter';
|
||||
log(rec.id, 'Interpreter: start');
|
||||
sseWrite({ stage: 'interpreter', status: 'start' });
|
||||
const t0 = Date.now();
|
||||
const interpreterOutput = await runInterpreter({
|
||||
main_prompt: rec.main_prompt,
|
||||
liked_shows: rec.liked_shows,
|
||||
disliked_shows: rec.disliked_shows,
|
||||
themes: rec.themes,
|
||||
...(feedbackContext !== undefined ? { feedback_context: feedbackContext } : {}),
|
||||
});
|
||||
log(rec.id, `Interpreter: done (${Date.now() - t0}ms)`, {
|
||||
liked: interpreterOutput.liked,
|
||||
disliked: interpreterOutput.disliked,
|
||||
themes: interpreterOutput.themes,
|
||||
tone: interpreterOutput.tone,
|
||||
avoid: interpreterOutput.avoid,
|
||||
});
|
||||
sseWrite({ stage: 'interpreter', status: 'done', data: interpreterOutput });
|
||||
|
||||
// --- Retrieval ---
|
||||
currentStage = 'retrieval';
|
||||
log(rec.id, 'Retrieval: start');
|
||||
sseWrite({ stage: 'retrieval', status: 'start' });
|
||||
const t1 = Date.now();
|
||||
const retrievalOutput = await runRetrieval(interpreterOutput);
|
||||
log(rec.id, `Retrieval: done (${Date.now() - t1}ms) — ${retrievalOutput.candidates.length} candidates`, {
|
||||
titles: retrievalOutput.candidates.map((c) => c.title),
|
||||
});
|
||||
sseWrite({ stage: 'retrieval', status: 'done', data: retrievalOutput });
|
||||
|
||||
// --- Ranking ---
|
||||
currentStage = 'ranking';
|
||||
log(rec.id, 'Ranking: start');
|
||||
sseWrite({ stage: 'ranking', status: 'start' });
|
||||
const t2 = Date.now();
|
||||
const rankingOutput = await runRanking(interpreterOutput, retrievalOutput);
|
||||
log(rec.id, `Ranking: done (${Date.now() - t2}ms)`, {
|
||||
definitely_like: rankingOutput.definitely_like.length,
|
||||
might_like: rankingOutput.might_like.length,
|
||||
questionable: rankingOutput.questionable.length,
|
||||
will_not_like: rankingOutput.will_not_like.length,
|
||||
});
|
||||
sseWrite({ stage: 'ranking', status: 'done', data: rankingOutput });
|
||||
|
||||
// --- Curator ---
|
||||
currentStage = 'curator';
|
||||
log(rec.id, 'Curator: start');
|
||||
sseWrite({ stage: 'curator', status: 'start' });
|
||||
const t3 = Date.now();
|
||||
const curatorOutput = await runCurator(rankingOutput, interpreterOutput);
|
||||
log(rec.id, `Curator: done (${Date.now() - t3}ms) — ${curatorOutput.length} shows curated`);
|
||||
sseWrite({ stage: 'curator', status: 'done', data: curatorOutput });
|
||||
|
||||
// Save results to DB
|
||||
log(rec.id, 'Saving results to DB');
|
||||
await db
|
||||
.update(recommendations)
|
||||
.set({ recommendations: curatorOutput, status: 'done' })
|
||||
.where(eq(recommendations.id, rec.id));
|
||||
|
||||
sseWrite({ stage: 'complete', status: 'done' });
|
||||
|
||||
log(rec.id, `Pipeline complete (total: ${Date.now() - startTime}ms)`);
|
||||
return curatorOutput;
|
||||
} catch (err) {
|
||||
const message = err instanceof Error ? err.message : String(err);
|
||||
log(rec.id, `Pipeline error at stage "${currentStage}": ${message}`);
|
||||
|
||||
sseWrite({ stage: currentStage, status: 'error', data: { message } });
|
||||
|
||||
await db
|
||||
.update(recommendations)
|
||||
.set({ status: 'error' })
|
||||
.where(eq(recommendations.id, rec.id));
|
||||
|
||||
return [];
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user