import {
  ECSClient,
  UpdateServiceCommand,
  DescribeServicesCommand,
} from '@aws-sdk/client-ecs';
import {
  AutoScalingClient,
  UpdateAutoScalingGroupCommand,
  DescribeAutoScalingGroupsCommand,
} from '@aws-sdk/client-auto-scaling';
import { InjectQueue } from '@nestjs/bullmq';
import { Injectable, Logger } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { Queue } from 'bullmq';
import { Transcode } from 'src/contents/types/transcode';

/**
 * Scales the transcoding ECS service and its backing Auto Scaling group up or
 * down based on the number of waiting and active jobs in the `transcode`
 * BullMQ queue.
 */
@Injectable()
export class TranscodingServerService {
  /** Logger context attached to every message emitted by this service. */
  private static readonly logContext = 'TRANSCODING SERVER';

  /** AWS region both clients are created in. */
  private static readonly region = 'us-east-1';

  ecs: ECSClient;
  autoScaling: AutoScalingClient;

  clusterName = 'indeehub-cluster-transcoding';
  serviceName = 'transcoding-api-service-production';
  autoScalingGroup = 'transcoding-api-ecs-asg-production';

  /** Floor applied to both the ECS desired count and the ASG capacity. */
  minTasks = 0;
  /** Ceiling applied to both the ECS desired count and the ASG capacity. */
  maxTasks = 1;
  /** Scale out once at least this many jobs are waiting. */
  scaleOutThreshold = 1;
  /** Scale in once waiting + active jobs drop to this value or below. */
  scaleInThreshold = 0;

  constructor(
    @InjectQueue('transcode')
    private transcodeQueue: Queue,
  ) {
    this.ecs = new ECSClient(TranscodingServerService.buildClientConfig());
    this.autoScaling = new AutoScalingClient(
      TranscodingServerService.buildClientConfig(),
    );
  }

  /**
   * Builds the configuration shared by the ECS and Auto Scaling clients.
   *
   * Static credentials are passed only when both AWS_ACCESS_KEY and
   * AWS_SECRET_KEY are set. Otherwise `credentials` is omitted so the SDK
   * resolves them itself instead of receiving undefined key material.
   */
  private static buildClientConfig(): {
    region: string;
    credentials?: { accessKeyId: string; secretAccessKey: string };
  } {
    const accessKeyId = process.env.AWS_ACCESS_KEY;
    const secretAccessKey = process.env.AWS_SECRET_KEY;
    const region = TranscodingServerService.region;

    if (!accessKeyId || !secretAccessKey) {
      return { region };
    }
    return { region, credentials: { accessKeyId, secretAccessKey } };
  }

  /**
   * Returns the number of jobs in the `waiting` state of the transcode queue.
   *
   * @returns The waiting job count, or 0 when the queue reports none.
   */
  async getQueueSize(): Promise<number> {
    const counts = await this.transcodeQueue.getJobCounts('waiting');
    return counts['waiting'] ?? 0;
  }

  /**
   * Returns the number of jobs in the `active` state of the transcode queue.
   *
   * @returns The active job count, or 0 when the queue reports none.
   */
  async getProcessingCount(): Promise<number> {
    const counts = await this.transcodeQueue.getJobCounts('active');
    return counts['active'] ?? 0;
  }

  /**
   * Reads the desired task count of an ECS service.
   *
   * @param cluster - Name of the ECS cluster that owns the service.
   * @param service - Name of the ECS service to describe.
   * @returns The service's desired count, or 0 when the response contains no
   *   matching service.
   */
  async describeService(cluster: string, service: string): Promise<number> {
    const response = await this.ecs.send(
      new DescribeServicesCommand({ cluster, services: [service] }),
    );
    return response.services?.[0]?.desiredCount ?? 0;
  }

  /**
   * Sets the desired task count of an ECS service.
   *
   * @param cluster - Name of the ECS cluster that owns the service.
   * @param service - Name of the ECS service to update.
   * @param desiredCount - New desired number of tasks.
   */
  async scaleEcsService(
    cluster: string,
    service: string,
    desiredCount: number,
  ): Promise<void> {
    await this.ecs.send(
      new UpdateServiceCommand({ cluster, service, desiredCount }),
    );
    Logger.log(
      `ECS service scaled to ${desiredCount} tasks`,
      TranscodingServerService.logContext,
    );
  }

  /**
   * Reads the desired capacity of an Auto Scaling group.
   *
   * @param asgName - Name of the Auto Scaling group to describe.
   * @returns The group's desired capacity, or 0 when the response contains no
   *   matching group.
   */
  async describeAutoScalingGroup(asgName: string): Promise<number> {
    const response = await this.autoScaling.send(
      new DescribeAutoScalingGroupsCommand({
        AutoScalingGroupNames: [asgName],
      }),
    );
    return response.AutoScalingGroups?.[0]?.DesiredCapacity ?? 0;
  }

  /**
   * Sets the desired capacity of an Auto Scaling group.
   *
   * @param asgName - Name of the Auto Scaling group to update.
   * @param desiredCount - New desired number of instances.
   */
  async scaleAutoScalingGroup(
    asgName: string,
    desiredCount: number,
  ): Promise<void> {
    await this.autoScaling.send(
      new UpdateAutoScalingGroupCommand({
        AutoScalingGroupName: asgName,
        DesiredCapacity: desiredCount,
      }),
    );
    Logger.log(
      `Auto Scaling group scaled to ${desiredCount} instances`,
      TranscodingServerService.logContext,
    );
  }

  /**
   * Runs the autoscaling logic every minute (AWS ECS only).
   *
   * Does nothing unless ENVIRONMENT is `production` and TRANSCODING_API_URL is
   * set (the latter is unset when the local FFmpeg worker is used).
   *
   * - Scale in: when waiting + active jobs are at or below `scaleInThreshold`
   *   and either resource is above `minTasks`, both are set to `minTasks`.
   * - Scale out: when waiting jobs reach `scaleOutThreshold`, every resource
   *   still below `maxTasks` is raised by one, capped at `maxTasks`.
   *
   * Errors are logged and swallowed so a failed run does not break the
   * schedule; the next tick retries from the freshly observed state.
   */
  @Cron(CronExpression.EVERY_MINUTE)
  async autoscaleEcsService(): Promise<void> {
    if (process.env.ENVIRONMENT !== 'production') return;
    if (!process.env.TRANSCODING_API_URL) return;

    const context = TranscodingServerService.logContext;

    try {
      // The four reads are independent, so fetch them concurrently.
      const [queueSize, currentTaskCount, activeTranscodings, currentAsgCount] =
        await Promise.all([
          this.getQueueSize(),
          this.describeService(this.clusterName, this.serviceName),
          this.getProcessingCount(),
          this.describeAutoScalingGroup(this.autoScalingGroup),
        ]);

      Logger.log(
        `Queue size: ${queueSize}, Current task count: ${currentTaskCount}, Active transcodings: ${activeTranscodings}, ASG desired count: ${currentAsgCount}`,
        context,
      );

      const outstandingJobs = activeTranscodings + queueSize;
      const aboveFloor =
        currentTaskCount > this.minTasks || currentAsgCount > this.minTasks;

      // Nothing waiting or running: scale both ECS and the ASG to the floor.
      if (outstandingJobs <= this.scaleInThreshold && aboveFloor) {
        await this.scaleEcsService(
          this.clusterName,
          this.serviceName,
          this.minTasks,
        );
        await this.scaleAutoScalingGroup(this.autoScalingGroup, this.minTasks);
        Logger.log(
          `Scaled down to ${this.minTasks} tasks and ${this.minTasks} instances in the Auto Scaling group.`,
          context,
        );
        return;
      }

      const ecsBelowCeiling = currentTaskCount < this.maxTasks;
      const asgBelowCeiling = currentAsgCount < this.maxTasks;

      // Checking both resources lets a partially failed scale-out (ECS
      // updated, ASG update failed) recover on the next run instead of
      // leaving the tasks without any instance to be placed on.
      if (
        queueSize >= this.scaleOutThreshold &&
        (ecsBelowCeiling || asgBelowCeiling)
      ) {
        Logger.log('Scaling out ECS and ASG to handle increased load', context);

        // Only touch a resource that is below the ceiling, so a scale-out can
        // never lower a count that is already at or above `maxTasks`.
        if (ecsBelowCeiling) {
          await this.scaleEcsService(
            this.clusterName,
            this.serviceName,
            Math.min(currentTaskCount + 1, this.maxTasks),
          );
        }
        if (asgBelowCeiling) {
          await this.scaleAutoScalingGroup(
            this.autoScalingGroup,
            Math.min(currentAsgCount + 1, this.maxTasks),
          );
        }
      }
    } catch (error) {
      // Pass a string stack plus an explicit context so the error object is
      // not mistaken for the logger context.
      const stack =
        error instanceof Error ? error.stack ?? error.message : String(error);
      Logger.error('Error during autoscaling:', stack, context);
    }
  }
}