diff --git a/backend/src/ffmpeg-worker/worker.ts b/backend/src/ffmpeg-worker/worker.ts index 505e232..3eb2b54 100644 --- a/backend/src/ffmpeg-worker/worker.ts +++ b/backend/src/ffmpeg-worker/worker.ts @@ -11,7 +11,7 @@ * 4. Encrypt segments with AES-128 * 5. Upload HLS output to MinIO * 6. Store encryption key in PostgreSQL - * 7. Update API with job status + * 7. Update content status in database */ import { Worker, Job } from 'bullmq'; @@ -23,11 +23,19 @@ import { join } from 'node:path'; import { tmpdir } from 'node:os'; import { Client } from 'pg'; +/** + * Must match the Transcode type the API sends from + * backend/src/contents/types/transcode.ts + */ interface TranscodeJobData { - contentId: string; - sourceKey: string; - outputPrefix: string; - bucket?: string; + inputBucket: string; + outputBucket: string; + inputKey: string; + outputKey: string; + correlationId: string; + callbackUrl: string; + drmContentId?: string; + drmMediaId?: string; } const s3 = new S3Client({ @@ -42,8 +50,18 @@ const s3 = new S3Client({ const privateBucket = process.env.S3_PRIVATE_BUCKET_NAME || 'indeedhub-private'; -async function downloadFromS3(key: string, destPath: string) { - const command = new GetObjectCommand({ Bucket: privateBucket, Key: key }); +function getPgClient(): Client { + return new Client({ + host: process.env.DATABASE_HOST || 'postgres', + port: Number(process.env.DATABASE_PORT || '5432'), + user: process.env.DATABASE_USER || 'indeedhub', + password: process.env.DATABASE_PASSWORD || 'indeedhub_dev_2026', + database: process.env.DATABASE_NAME || 'indeedhub', + }); +} + +async function downloadFromS3(key: string, destPath: string, bucket?: string) { + const command = new GetObjectCommand({ Bucket: bucket || privateBucket, Key: key }); const response = await s3.send(command); const chunks: Buffer[] = []; for await (const chunk of response.Body as any) { @@ -52,10 +70,10 @@ async function downloadFromS3(key: string, destPath: string) { writeFileSync(destPath, 
Buffer.concat(chunks)); } -async function uploadToS3(filePath: string, key: string, contentType: string) { +async function uploadToS3(filePath: string, key: string, contentType: string, bucket?: string) { const body = readFileSync(filePath); const command = new PutObjectCommand({ - Bucket: privateBucket, + Bucket: bucket || privateBucket, Key: key, Body: body, ContentType: contentType, @@ -64,14 +82,7 @@ async function uploadToS3(filePath: string, key: string, contentType: string) { } async function storeKey(contentId: string, keyData: Buffer, iv: Buffer) { - const client = new Client({ - host: process.env.DATABASE_HOST || 'postgres', - port: Number(process.env.DATABASE_PORT || '5432'), - user: process.env.DATABASE_USER || 'indeedhub', - password: process.env.DATABASE_PASSWORD || 'indeedhub_dev_2026', - database: process.env.DATABASE_NAME || 'indeedhub', - }); - + const client = getPgClient(); await client.connect(); try { await client.query( @@ -85,18 +96,55 @@ async function storeKey(contentId: string, keyData: Buffer, iv: Buffer) { } } +/** + * Update the content status directly in PostgreSQL. + * This replaces the API callback approach so the worker + * doesn't need authentication tokens. + */ +async function updateContentStatus( + contentId: string, + status: 'completed' | 'failed', + metadata?: Record<string, unknown>, +) { + const client = getPgClient(); + await client.connect(); + try { + await client.query( + `UPDATE contents SET status = $1, metadata = $2, updated_at = NOW() WHERE id = $3`, + [status, metadata ? 
JSON.stringify(metadata) : null, contentId], + ); + console.log(`[transcode] Content ${contentId} status updated to '${status}'`); + } finally { + await client.end(); + } +} + async function processJob(job: Job) { - const { contentId, sourceKey, outputPrefix } = job.data; + const { + correlationId: contentId, + inputKey, + outputKey, + inputBucket, + } = job.data; const workDir = mkdtempSync(join(tmpdir(), 'transcode-')); try { console.log(`[transcode] Starting job for content: ${contentId}`); + console.log(`[transcode] Input: ${inputBucket}/${inputKey}`); + console.log(`[transcode] Output prefix: ${outputKey}`); await job.updateProgress(5); - // Step 1: Download source video + // Resolve which bucket to upload HLS output to. + // The API specifies outputBucket (public bucket) — encrypted segments + // are safe to serve publicly since the AES key requires authentication. + const destBucket = job.data.outputBucket + || process.env.S3_PUBLIC_BUCKET_NAME + || 'indeedhub-public'; + + // Step 1: Download source video from MinIO const sourcePath = join(workDir, 'source.mp4'); - console.log(`[transcode] Downloading ${sourceKey}...`); - await downloadFromS3(sourceKey, sourcePath); + console.log(`[transcode] Downloading ${inputKey} from ${inputBucket}...`); + await downloadFromS3(inputKey, sourcePath, inputBucket); await job.updateProgress(20); // Step 2: Generate AES-128 key and IV @@ -121,6 +169,8 @@ async function processJob(job: Job) { execSync(`mkdir -p ${hlsDir}`); // Multi-quality HLS with encryption + // Output as file.m3u8 to match the convention in helper.ts + // (getTranscodedFileRoute returns 'transcoded/file.m3u8') const ffmpegCmd = [ 'ffmpeg', '-i', sourcePath, '-hide_banner', '-loglevel', 'warning', @@ -134,22 +184,24 @@ async function processJob(job: Job) { '-hls_key_info_file', keyInfoPath, '-hls_segment_filename', join(hlsDir, 'seg_%03d.ts'), '-f', 'hls', - join(hlsDir, 'index.m3u8'), + join(hlsDir, 'file.m3u8'), ].join(' '); execSync(ffmpegCmd, { stdio: 
'pipe', timeout: 3600_000 }); await job.updateProgress(75); - // Step 5: Upload HLS output to MinIO - console.log(`[transcode] Uploading HLS segments to MinIO...`); + // Step 5: Upload HLS output to MinIO public bucket + // Segments are AES-128 encrypted, so public access is safe. + // The decryption key is served via /api/contents/:id/key (requires auth). + console.log(`[transcode] Uploading HLS segments to ${destBucket}...`); const files = readdirSync(hlsDir); for (const file of files) { const filePath = join(hlsDir, file); - const s3Key = `${outputPrefix}/${file}`; + const s3Key = `${outputKey}/${file}`; const ct = file.endsWith('.m3u8') ? 'application/vnd.apple.mpegurl' : 'video/MP2T'; - await uploadToS3(filePath, s3Key, ct); + await uploadToS3(filePath, s3Key, ct, destBucket); } await job.updateProgress(90); @@ -157,13 +209,24 @@ async function processJob(job: Job) { console.log(`[transcode] Storing encryption key...`); await storeKey(contentId, keyData, iv); + // Step 7: Mark content as completed in the database + const manifestKey = `${outputKey}/file.m3u8`; + await updateContentStatus(contentId, 'completed', { + manifestKey, + segments: files.length, + }); + await job.updateProgress(100); console.log(`[transcode] Job complete for content: ${contentId}`); - return { - manifestKey: `${outputPrefix}/index.m3u8`, - segments: files.length, - }; + return { manifestKey, segments: files.length }; + } catch (err) { + // Mark content as failed so it can be retried + console.error(`[transcode] Job failed for content ${contentId}:`, err); + await updateContentStatus(contentId, 'failed').catch((e) => + console.error(`[transcode] Failed to update status:`, e), + ); + throw err; } finally { // Clean up temp directory rmSync(workDir, { recursive: true, force: true }); diff --git a/docker-compose.yml b/docker-compose.yml index cc2e57f..d75f3f1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -179,7 +179,7 @@ services: context: ./backend dockerfile: 
Dockerfile.ffmpeg args: - CACHEBUST: "9" + CACHEBUST: "12" restart: unless-stopped environment: ENVIRONMENT: production