Fix FFmpeg worker: align job data format with API and update content status

The worker was completely broken because of 4 mismatches with the API:

1. Field names: API sends {correlationId, inputKey, outputKey, inputBucket,
   outputBucket} but worker expected {contentId, sourceKey, outputPrefix}.
   All fields were undefined, so jobs silently failed.

2. No status callback: Worker never updated content status to 'completed',
   so projects never appeared as published (content stuck in 'processing').
   The worker now updates the status directly in PostgreSQL.

3. Wrong bucket: Worker uploaded HLS to private bucket, but the stream
   controller checks the public bucket. Now uploads to outputBucket (public).

4. Wrong manifest name: Worker wrote index.m3u8 but the codebase expects
   file.m3u8. Aligned with the helper.ts convention.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
Dorian
2026-02-13 21:07:24 +00:00
parent f6c4b9f06c
commit 7f78ac9ba6
2 changed files with 94 additions and 31 deletions

View File

@@ -11,7 +11,7 @@
* 4. Encrypt segments with AES-128 * 4. Encrypt segments with AES-128
* 5. Upload HLS output to MinIO * 5. Upload HLS output to MinIO
* 6. Store encryption key in PostgreSQL * 6. Store encryption key in PostgreSQL
* 7. Update API with job status * 7. Update content status in database
*/ */
import { Worker, Job } from 'bullmq'; import { Worker, Job } from 'bullmq';
@@ -23,11 +23,19 @@ import { join } from 'node:path';
import { tmpdir } from 'node:os'; import { tmpdir } from 'node:os';
import { Client } from 'pg'; import { Client } from 'pg';
/**
* Must match the Transcode type the API sends from
* backend/src/contents/types/transcode.ts
*/
interface TranscodeJobData { interface TranscodeJobData {
contentId: string; inputBucket: string;
sourceKey: string; outputBucket: string;
outputPrefix: string; inputKey: string;
bucket?: string; outputKey: string;
correlationId: string;
callbackUrl: string;
drmContentId?: string;
drmMediaId?: string;
} }
const s3 = new S3Client({ const s3 = new S3Client({
@@ -42,8 +50,18 @@ const s3 = new S3Client({
const privateBucket = process.env.S3_PRIVATE_BUCKET_NAME || 'indeedhub-private'; const privateBucket = process.env.S3_PRIVATE_BUCKET_NAME || 'indeedhub-private';
async function downloadFromS3(key: string, destPath: string) { function getPgClient(): Client {
const command = new GetObjectCommand({ Bucket: privateBucket, Key: key }); return new Client({
host: process.env.DATABASE_HOST || 'postgres',
port: Number(process.env.DATABASE_PORT || '5432'),
user: process.env.DATABASE_USER || 'indeedhub',
password: process.env.DATABASE_PASSWORD || 'indeedhub_dev_2026',
database: process.env.DATABASE_NAME || 'indeedhub',
});
}
async function downloadFromS3(key: string, destPath: string, bucket?: string) {
const command = new GetObjectCommand({ Bucket: bucket || privateBucket, Key: key });
const response = await s3.send(command); const response = await s3.send(command);
const chunks: Buffer[] = []; const chunks: Buffer[] = [];
for await (const chunk of response.Body as any) { for await (const chunk of response.Body as any) {
@@ -52,10 +70,10 @@ async function downloadFromS3(key: string, destPath: string) {
writeFileSync(destPath, Buffer.concat(chunks)); writeFileSync(destPath, Buffer.concat(chunks));
} }
async function uploadToS3(filePath: string, key: string, contentType: string) { async function uploadToS3(filePath: string, key: string, contentType: string, bucket?: string) {
const body = readFileSync(filePath); const body = readFileSync(filePath);
const command = new PutObjectCommand({ const command = new PutObjectCommand({
Bucket: privateBucket, Bucket: bucket || privateBucket,
Key: key, Key: key,
Body: body, Body: body,
ContentType: contentType, ContentType: contentType,
@@ -64,14 +82,7 @@ async function uploadToS3(filePath: string, key: string, contentType: string) {
} }
async function storeKey(contentId: string, keyData: Buffer, iv: Buffer) { async function storeKey(contentId: string, keyData: Buffer, iv: Buffer) {
const client = new Client({ const client = getPgClient();
host: process.env.DATABASE_HOST || 'postgres',
port: Number(process.env.DATABASE_PORT || '5432'),
user: process.env.DATABASE_USER || 'indeedhub',
password: process.env.DATABASE_PASSWORD || 'indeedhub_dev_2026',
database: process.env.DATABASE_NAME || 'indeedhub',
});
await client.connect(); await client.connect();
try { try {
await client.query( await client.query(
@@ -85,18 +96,55 @@ async function storeKey(contentId: string, keyData: Buffer, iv: Buffer) {
} }
} }
/**
* Update the content status directly in PostgreSQL.
* This replaces the API callback approach so the worker
* doesn't need authentication tokens.
*/
async function updateContentStatus(
contentId: string,
status: 'completed' | 'failed',
metadata?: Record<string, any>,
) {
const client = getPgClient();
await client.connect();
try {
await client.query(
`UPDATE contents SET status = $1, metadata = $2, updated_at = NOW() WHERE id = $3`,
[status, metadata ? JSON.stringify(metadata) : null, contentId],
);
console.log(`[transcode] Content ${contentId} status updated to '${status}'`);
} finally {
await client.end();
}
}
async function processJob(job: Job<TranscodeJobData>) { async function processJob(job: Job<TranscodeJobData>) {
const { contentId, sourceKey, outputPrefix } = job.data; const {
correlationId: contentId,
inputKey,
outputKey,
inputBucket,
} = job.data;
const workDir = mkdtempSync(join(tmpdir(), 'transcode-')); const workDir = mkdtempSync(join(tmpdir(), 'transcode-'));
try { try {
console.log(`[transcode] Starting job for content: ${contentId}`); console.log(`[transcode] Starting job for content: ${contentId}`);
console.log(`[transcode] Input: ${inputBucket}/${inputKey}`);
console.log(`[transcode] Output prefix: ${outputKey}`);
await job.updateProgress(5); await job.updateProgress(5);
// Step 1: Download source video // Resolve which bucket to upload HLS output to.
// The API specifies outputBucket (public bucket) — encrypted segments
// are safe to serve publicly since the AES key requires authentication.
const destBucket = job.data.outputBucket
|| process.env.S3_PUBLIC_BUCKET_NAME
|| 'indeedhub-public';
// Step 1: Download source video from MinIO
const sourcePath = join(workDir, 'source.mp4'); const sourcePath = join(workDir, 'source.mp4');
console.log(`[transcode] Downloading ${sourceKey}...`); console.log(`[transcode] Downloading ${inputKey} from ${inputBucket}...`);
await downloadFromS3(sourceKey, sourcePath); await downloadFromS3(inputKey, sourcePath, inputBucket);
await job.updateProgress(20); await job.updateProgress(20);
// Step 2: Generate AES-128 key and IV // Step 2: Generate AES-128 key and IV
@@ -121,6 +169,8 @@ async function processJob(job: Job<TranscodeJobData>) {
execSync(`mkdir -p ${hlsDir}`); execSync(`mkdir -p ${hlsDir}`);
// Multi-quality HLS with encryption // Multi-quality HLS with encryption
// Output as file.m3u8 to match the convention in helper.ts
// (getTranscodedFileRoute returns 'transcoded/file.m3u8')
const ffmpegCmd = [ const ffmpegCmd = [
'ffmpeg', '-i', sourcePath, 'ffmpeg', '-i', sourcePath,
'-hide_banner', '-loglevel', 'warning', '-hide_banner', '-loglevel', 'warning',
@@ -134,22 +184,24 @@ async function processJob(job: Job<TranscodeJobData>) {
'-hls_key_info_file', keyInfoPath, '-hls_key_info_file', keyInfoPath,
'-hls_segment_filename', join(hlsDir, 'seg_%03d.ts'), '-hls_segment_filename', join(hlsDir, 'seg_%03d.ts'),
'-f', 'hls', '-f', 'hls',
join(hlsDir, 'index.m3u8'), join(hlsDir, 'file.m3u8'),
].join(' '); ].join(' ');
execSync(ffmpegCmd, { stdio: 'pipe', timeout: 3600_000 }); execSync(ffmpegCmd, { stdio: 'pipe', timeout: 3600_000 });
await job.updateProgress(75); await job.updateProgress(75);
// Step 5: Upload HLS output to MinIO // Step 5: Upload HLS output to MinIO public bucket
console.log(`[transcode] Uploading HLS segments to MinIO...`); // Segments are AES-128 encrypted, so public access is safe.
// The decryption key is served via /api/contents/:id/key (requires auth).
console.log(`[transcode] Uploading HLS segments to ${destBucket}...`);
const files = readdirSync(hlsDir); const files = readdirSync(hlsDir);
for (const file of files) { for (const file of files) {
const filePath = join(hlsDir, file); const filePath = join(hlsDir, file);
const s3Key = `${outputPrefix}/${file}`; const s3Key = `${outputKey}/${file}`;
const ct = file.endsWith('.m3u8') const ct = file.endsWith('.m3u8')
? 'application/vnd.apple.mpegurl' ? 'application/vnd.apple.mpegurl'
: 'video/MP2T'; : 'video/MP2T';
await uploadToS3(filePath, s3Key, ct); await uploadToS3(filePath, s3Key, ct, destBucket);
} }
await job.updateProgress(90); await job.updateProgress(90);
@@ -157,13 +209,24 @@ async function processJob(job: Job<TranscodeJobData>) {
console.log(`[transcode] Storing encryption key...`); console.log(`[transcode] Storing encryption key...`);
await storeKey(contentId, keyData, iv); await storeKey(contentId, keyData, iv);
// Step 7: Mark content as completed in the database
const manifestKey = `${outputKey}/file.m3u8`;
await updateContentStatus(contentId, 'completed', {
manifestKey,
segments: files.length,
});
await job.updateProgress(100); await job.updateProgress(100);
console.log(`[transcode] Job complete for content: ${contentId}`); console.log(`[transcode] Job complete for content: ${contentId}`);
return { return { manifestKey, segments: files.length };
manifestKey: `${outputPrefix}/index.m3u8`, } catch (err) {
segments: files.length, // Mark content as failed so it can be retried
}; console.error(`[transcode] Job failed for content ${contentId}:`, err);
await updateContentStatus(contentId, 'failed').catch((e) =>
console.error(`[transcode] Failed to update status:`, e),
);
throw err;
} finally { } finally {
// Clean up temp directory // Clean up temp directory
rmSync(workDir, { recursive: true, force: true }); rmSync(workDir, { recursive: true, force: true });

View File

@@ -179,7 +179,7 @@ services:
context: ./backend context: ./backend
dockerfile: Dockerfile.ffmpeg dockerfile: Dockerfile.ffmpeg
args: args:
CACHEBUST: "9" CACHEBUST: "12"
restart: unless-stopped restart: unless-stopped
environment: environment:
ENVIRONMENT: production ENVIRONMENT: production