Convert AI tagging to background job with progress tracking
- Add TaggingJob model for tracking tagging progress - Convert batch tagging to background job processing (prevents timeouts) - Add real-time progress polling in UI with percentage/count display - Add admin notifications when tagging job completes or fails - Export getTaggingSettings and getAvailableTags functions After deployment, run: npx prisma migrate deploy Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -1,14 +1,173 @@
|
||||
import { z } from 'zod'
|
||||
import { TRPCError } from '@trpc/server'
|
||||
import { router, adminProcedure, protectedProcedure } from '../trpc'
|
||||
import { prisma } from '@/lib/prisma'
|
||||
import {
|
||||
tagProject,
|
||||
batchTagProjects,
|
||||
batchTagProgramProjects,
|
||||
getTagSuggestions,
|
||||
addProjectTag,
|
||||
removeProjectTag,
|
||||
getTaggingSettings,
|
||||
getAvailableTags,
|
||||
} from '../services/ai-tagging'
|
||||
import {
|
||||
createNotification,
|
||||
notifyAdmins,
|
||||
NotificationTypes,
|
||||
} from '../services/in-app-notification'
|
||||
|
||||
// Background job runner for tagging
|
||||
async function runTaggingJob(jobId: string, userId: string) {
|
||||
const job = await prisma.taggingJob.findUnique({
|
||||
where: { id: jobId },
|
||||
})
|
||||
|
||||
if (!job) {
|
||||
console.error(`[AI Tagging Job] Job not found: ${jobId}`)
|
||||
return
|
||||
}
|
||||
|
||||
console.log(`[AI Tagging Job] Starting job ${jobId}...`)
|
||||
|
||||
// Mark as running
|
||||
await prisma.taggingJob.update({
|
||||
where: { id: jobId },
|
||||
data: { status: 'RUNNING', startedAt: new Date() },
|
||||
})
|
||||
|
||||
try {
|
||||
// Get settings and tags
|
||||
const settings = await getTaggingSettings()
|
||||
if (!settings.enabled) {
|
||||
throw new Error('AI tagging is not enabled')
|
||||
}
|
||||
|
||||
const availableTags = await getAvailableTags()
|
||||
if (availableTags.length === 0) {
|
||||
throw new Error('No expertise tags configured')
|
||||
}
|
||||
|
||||
// Get projects to tag
|
||||
const whereClause = job.programId
|
||||
? { round: { programId: job.programId } }
|
||||
: { roundId: job.roundId! }
|
||||
|
||||
const allProjects = await prisma.project.findMany({
|
||||
where: whereClause,
|
||||
select: { id: true, title: true, tags: true },
|
||||
})
|
||||
|
||||
const untaggedProjects = allProjects.filter(p => p.tags.length === 0)
|
||||
const skippedCount = allProjects.length - untaggedProjects.length
|
||||
|
||||
console.log(`[AI Tagging Job] Found ${untaggedProjects.length} untagged projects (${skippedCount} already tagged)`)
|
||||
|
||||
await prisma.taggingJob.update({
|
||||
where: { id: jobId },
|
||||
data: {
|
||||
totalProjects: untaggedProjects.length,
|
||||
skippedCount,
|
||||
},
|
||||
})
|
||||
|
||||
if (untaggedProjects.length === 0) {
|
||||
await prisma.taggingJob.update({
|
||||
where: { id: jobId },
|
||||
data: {
|
||||
status: 'COMPLETED',
|
||||
completedAt: new Date(),
|
||||
},
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
let taggedCount = 0
|
||||
let failedCount = 0
|
||||
const errors: string[] = []
|
||||
const startTime = Date.now()
|
||||
|
||||
for (let i = 0; i < untaggedProjects.length; i++) {
|
||||
const project = untaggedProjects[i]
|
||||
console.log(`[AI Tagging Job] Processing ${i + 1}/${untaggedProjects.length}: "${project.title.substring(0, 40)}..."`)
|
||||
|
||||
try {
|
||||
const result = await tagProject(project.id, userId)
|
||||
taggedCount++
|
||||
console.log(`[AI Tagging Job] ✓ Tagged with ${result.applied.length} tags`)
|
||||
} catch (error) {
|
||||
failedCount++
|
||||
const errorMsg = error instanceof Error ? error.message : 'Unknown error'
|
||||
errors.push(`${project.title}: ${errorMsg}`)
|
||||
console.error(`[AI Tagging Job] ✗ Failed: ${errorMsg}`)
|
||||
}
|
||||
|
||||
// Update progress
|
||||
await prisma.taggingJob.update({
|
||||
where: { id: jobId },
|
||||
data: {
|
||||
processedCount: i + 1,
|
||||
taggedCount,
|
||||
failedCount,
|
||||
errorsJson: errors.length > 0 ? errors.slice(0, 20) : undefined, // Keep last 20 errors
|
||||
},
|
||||
})
|
||||
|
||||
// Log progress every 10 projects
|
||||
if ((i + 1) % 10 === 0) {
|
||||
const elapsed = ((Date.now() - startTime) / 1000).toFixed(0)
|
||||
const avgTime = (Date.now() - startTime) / (i + 1) / 1000
|
||||
const remaining = avgTime * (untaggedProjects.length - i - 1)
|
||||
console.log(`[AI Tagging Job] Progress: ${i + 1}/${untaggedProjects.length} (${elapsed}s elapsed, ~${remaining.toFixed(0)}s remaining)`)
|
||||
}
|
||||
}
|
||||
|
||||
const totalTime = ((Date.now() - startTime) / 1000).toFixed(1)
|
||||
console.log(`[AI Tagging Job] Complete: ${taggedCount} tagged, ${failedCount} failed in ${totalTime}s`)
|
||||
|
||||
// Mark as completed
|
||||
await prisma.taggingJob.update({
|
||||
where: { id: jobId },
|
||||
data: {
|
||||
status: 'COMPLETED',
|
||||
completedAt: new Date(),
|
||||
errorsJson: errors.length > 0 ? errors : undefined,
|
||||
},
|
||||
})
|
||||
|
||||
// Send notification to admins
|
||||
await notifyAdmins({
|
||||
type: NotificationTypes.AI_SUGGESTIONS_READY,
|
||||
title: 'AI Tagging Complete',
|
||||
message: `Tagged ${taggedCount} projects${failedCount > 0 ? `, ${failedCount} failed` : ''}${skippedCount > 0 ? `, ${skippedCount} already had tags` : ''}.`,
|
||||
linkUrl: '/admin/projects',
|
||||
linkLabel: 'View Projects',
|
||||
priority: 'normal',
|
||||
metadata: { jobId, taggedCount, failedCount, skippedCount },
|
||||
})
|
||||
|
||||
} catch (error) {
|
||||
const errorMsg = error instanceof Error ? error.message : 'Unknown error'
|
||||
console.error(`[AI Tagging Job] Job failed: ${errorMsg}`)
|
||||
|
||||
await prisma.taggingJob.update({
|
||||
where: { id: jobId },
|
||||
data: {
|
||||
status: 'FAILED',
|
||||
completedAt: new Date(),
|
||||
errorMessage: errorMsg,
|
||||
},
|
||||
})
|
||||
|
||||
// Notify about failure
|
||||
await notifyAdmins({
|
||||
type: NotificationTypes.SYSTEM_ERROR,
|
||||
title: 'AI Tagging Failed',
|
||||
message: errorMsg,
|
||||
linkUrl: '/admin/projects',
|
||||
priority: 'high',
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
export const tagRouter = router({
|
||||
/**
|
||||
@@ -468,59 +627,190 @@ export const tagRouter = router({
|
||||
}),
|
||||
|
||||
/**
|
||||
* Batch tag all untagged projects in a round
|
||||
* Start a background tagging job for a round
|
||||
*/
|
||||
startTaggingJob: adminProcedure
|
||||
.input(z.object({
|
||||
roundId: z.string().optional(),
|
||||
programId: z.string().optional(),
|
||||
}))
|
||||
.mutation(async ({ ctx, input }) => {
|
||||
if (!input.roundId && !input.programId) {
|
||||
throw new TRPCError({
|
||||
code: 'BAD_REQUEST',
|
||||
message: 'Either roundId or programId is required',
|
||||
})
|
||||
}
|
||||
|
||||
// Check for existing running job
|
||||
const existingJob = await ctx.prisma.taggingJob.findFirst({
|
||||
where: {
|
||||
OR: [
|
||||
{ roundId: input.roundId, status: { in: ['PENDING', 'RUNNING'] } },
|
||||
{ programId: input.programId, status: { in: ['PENDING', 'RUNNING'] } },
|
||||
],
|
||||
},
|
||||
})
|
||||
|
||||
if (existingJob) {
|
||||
throw new TRPCError({
|
||||
code: 'CONFLICT',
|
||||
message: 'A tagging job is already running',
|
||||
})
|
||||
}
|
||||
|
||||
// Create the job
|
||||
const job = await ctx.prisma.taggingJob.create({
|
||||
data: {
|
||||
roundId: input.roundId,
|
||||
programId: input.programId,
|
||||
status: 'PENDING',
|
||||
},
|
||||
})
|
||||
|
||||
// Audit log
|
||||
await ctx.prisma.auditLog.create({
|
||||
data: {
|
||||
userId: ctx.user.id,
|
||||
action: 'START_AI_TAG_JOB',
|
||||
entityType: input.programId ? 'Program' : 'Round',
|
||||
entityId: input.programId || input.roundId!,
|
||||
detailsJson: { jobId: job.id },
|
||||
ipAddress: ctx.ip,
|
||||
userAgent: ctx.userAgent,
|
||||
},
|
||||
})
|
||||
|
||||
// Start job in background (don't await)
|
||||
runTaggingJob(job.id, ctx.user.id).catch((error) => {
|
||||
console.error('[AI Tagging] Background job error:', error)
|
||||
})
|
||||
|
||||
return { jobId: job.id }
|
||||
}),
|
||||
|
||||
/**
|
||||
* Get tagging job status
|
||||
*/
|
||||
getTaggingJobStatus: adminProcedure
|
||||
.input(z.object({ jobId: z.string() }))
|
||||
.query(async ({ ctx, input }) => {
|
||||
const job = await ctx.prisma.taggingJob.findUnique({
|
||||
where: { id: input.jobId },
|
||||
})
|
||||
|
||||
if (!job) {
|
||||
throw new TRPCError({
|
||||
code: 'NOT_FOUND',
|
||||
message: 'Job not found',
|
||||
})
|
||||
}
|
||||
|
||||
return {
|
||||
id: job.id,
|
||||
status: job.status,
|
||||
totalProjects: job.totalProjects,
|
||||
processedCount: job.processedCount,
|
||||
taggedCount: job.taggedCount,
|
||||
skippedCount: job.skippedCount,
|
||||
failedCount: job.failedCount,
|
||||
errorMessage: job.errorMessage,
|
||||
errors: job.errorsJson as string[] | null,
|
||||
startedAt: job.startedAt,
|
||||
completedAt: job.completedAt,
|
||||
}
|
||||
}),
|
||||
|
||||
/**
|
||||
* Get latest tagging job for a round or program
|
||||
*/
|
||||
getLatestTaggingJob: adminProcedure
|
||||
.input(z.object({
|
||||
roundId: z.string().optional(),
|
||||
programId: z.string().optional(),
|
||||
}))
|
||||
.query(async ({ ctx, input }) => {
|
||||
const job = await ctx.prisma.taggingJob.findFirst({
|
||||
where: {
|
||||
OR: [
|
||||
input.roundId ? { roundId: input.roundId } : {},
|
||||
input.programId ? { programId: input.programId } : {},
|
||||
].filter(o => Object.keys(o).length > 0),
|
||||
},
|
||||
orderBy: { createdAt: 'desc' },
|
||||
})
|
||||
|
||||
if (!job) {
|
||||
return null
|
||||
}
|
||||
|
||||
return {
|
||||
id: job.id,
|
||||
status: job.status,
|
||||
totalProjects: job.totalProjects,
|
||||
processedCount: job.processedCount,
|
||||
taggedCount: job.taggedCount,
|
||||
skippedCount: job.skippedCount,
|
||||
failedCount: job.failedCount,
|
||||
errorMessage: job.errorMessage,
|
||||
errors: job.errorsJson as string[] | null,
|
||||
startedAt: job.startedAt,
|
||||
completedAt: job.completedAt,
|
||||
createdAt: job.createdAt,
|
||||
}
|
||||
}),
|
||||
|
||||
// Legacy endpoints kept for backward compatibility (redirect to job-based)
|
||||
/**
|
||||
* @deprecated Use startTaggingJob instead
|
||||
*/
|
||||
batchTagProjects: adminProcedure
|
||||
.input(z.object({ roundId: z.string() }))
|
||||
.mutation(async ({ ctx, input }) => {
|
||||
const result = await batchTagProjects(input.roundId, ctx.user.id)
|
||||
|
||||
// Audit log
|
||||
await ctx.prisma.auditLog.create({
|
||||
// Start job and return immediately with placeholder
|
||||
const job = await ctx.prisma.taggingJob.create({
|
||||
data: {
|
||||
userId: ctx.user.id,
|
||||
action: 'BATCH_AI_TAG',
|
||||
entityType: 'Round',
|
||||
entityId: input.roundId,
|
||||
detailsJson: {
|
||||
processed: result.processed,
|
||||
failed: result.failed,
|
||||
skipped: result.skipped,
|
||||
},
|
||||
ipAddress: ctx.ip,
|
||||
userAgent: ctx.userAgent,
|
||||
roundId: input.roundId,
|
||||
status: 'PENDING',
|
||||
},
|
||||
})
|
||||
|
||||
return result
|
||||
runTaggingJob(job.id, ctx.user.id).catch(console.error)
|
||||
|
||||
return {
|
||||
processed: 0,
|
||||
failed: 0,
|
||||
skipped: 0,
|
||||
errors: [],
|
||||
jobId: job.id,
|
||||
message: 'Tagging job started in background. Check job status for progress.',
|
||||
}
|
||||
}),
|
||||
|
||||
/**
|
||||
* Batch tag all untagged projects in an entire program (edition)
|
||||
* @deprecated Use startTaggingJob instead
|
||||
*/
|
||||
batchTagProgramProjects: adminProcedure
|
||||
.input(z.object({ programId: z.string() }))
|
||||
.mutation(async ({ ctx, input }) => {
|
||||
const result = await batchTagProgramProjects(input.programId, ctx.user.id)
|
||||
|
||||
// Audit log
|
||||
await ctx.prisma.auditLog.create({
|
||||
// Start job and return immediately with placeholder
|
||||
const job = await ctx.prisma.taggingJob.create({
|
||||
data: {
|
||||
userId: ctx.user.id,
|
||||
action: 'BATCH_AI_TAG',
|
||||
entityType: 'Program',
|
||||
entityId: input.programId,
|
||||
detailsJson: {
|
||||
processed: result.processed,
|
||||
failed: result.failed,
|
||||
skipped: result.skipped,
|
||||
},
|
||||
ipAddress: ctx.ip,
|
||||
userAgent: ctx.userAgent,
|
||||
programId: input.programId,
|
||||
status: 'PENDING',
|
||||
},
|
||||
})
|
||||
|
||||
return result
|
||||
runTaggingJob(job.id, ctx.user.id).catch(console.error)
|
||||
|
||||
return {
|
||||
processed: 0,
|
||||
failed: 0,
|
||||
skipped: 0,
|
||||
errors: [],
|
||||
jobId: job.id,
|
||||
message: 'Tagging job started in background. Check job status for progress.',
|
||||
}
|
||||
}),
|
||||
|
||||
/**
|
||||
|
||||
@@ -94,7 +94,7 @@ Rules:
|
||||
/**
|
||||
* Get system settings for AI tagging
|
||||
*/
|
||||
async function getTaggingSettings(): Promise<{
|
||||
export async function getTaggingSettings(): Promise<{
|
||||
enabled: boolean
|
||||
maxTags: number
|
||||
}> {
|
||||
@@ -125,7 +125,7 @@ async function getTaggingSettings(): Promise<{
|
||||
/**
|
||||
* Get all active expertise tags
|
||||
*/
|
||||
async function getAvailableTags(): Promise<AvailableTag[]> {
|
||||
export async function getAvailableTags(): Promise<AvailableTag[]> {
|
||||
return prisma.expertiseTag.findMany({
|
||||
where: { isActive: true },
|
||||
select: {
|
||||
|
||||
Reference in New Issue
Block a user