From 4897d862ae2adcde02b27a842b9122721277a479 Mon Sep 17 00:00:00 2001 From: JorisKohl Date: Tue, 21 Jan 2025 22:32:17 +0100 Subject: [PATCH 1/5] Added max_cached_per_user --- .env.example | 3 ++- src/app.service.ts | 49 +++++++++++++++++++++++++++++++++++++--------- 2 files changed, 42 insertions(+), 10 deletions(-) diff --git a/.env.example b/.env.example index 8ff01a7..c36d880 100644 --- a/.env.example +++ b/.env.example @@ -1,4 +1,5 @@ # All these are optional - uncomment to use # JELLYFIN_URL=http://your-jellyfin-url -# MAX_CONCURRENT_JOBS=1 \ No newline at end of file +# MAX_CONCURRENT_JOBS=1 +# MAX_CACHED_PER_USER=10 \ No newline at end of file diff --git a/src/app.service.ts b/src/app.service.ts index a204233..17e4084 100644 --- a/src/app.service.ts +++ b/src/app.service.ts @@ -31,6 +31,7 @@ export class AppService { private videoDurations: Map = new Map(); private jobQueue: string[] = []; private maxConcurrentJobs: number; + private maxCachedPerUser: number; private cacheDir: string; constructor( @@ -43,6 +44,10 @@ export class AppService { 1, ); + this.maxCachedPerUser = this.configService.get( + 'MAX_CACHED_PER_USER', + 10, + ); // Ensure the cache directory exists if (!fs.existsSync(this.cacheDir)) { fs.mkdirSync(this.cacheDir, { recursive: true }); @@ -220,18 +225,43 @@ export class AppService { } private checkQueue() { - let runningJobs = this.activeJobs.filter((job) => job.status === 'optimizing') - .length; - - while (runningJobs < this.maxConcurrentJobs && this.jobQueue.length > 0) { - const nextJobId = this.jobQueue.shift(); - if (nextJobId) { - this.startJob(nextJobId); - runningJobs++; // Now we track the newly started job + let runningJobs = this.activeJobs.filter((job) => job.status === 'optimizing').length; + + this.logger.log( + `${runningJobs} active jobs running and ${this.jobQueue.length} items in the queue`, + ); + + for (const index in this.jobQueue) { + if (runningJobs >= this.maxConcurrentJobs) { + break; // Stop if max concurrent jobs are reached + } + const nextJobId = this.jobQueue[index]; // Access job ID by index + if (!this.userTooManyCachedItems(nextJobId)) { + // Skip this job if user cache limits are reached + continue; } + // Start the job and remove it from the queue + this.startJob(nextJobId); + this.jobQueue.splice(Number(index), 1); // Remove the started job from the queue + runningJobs++; // Increment running jobs } } + private userTooManyCachedItems(jobid): boolean{ + if(this.maxCachedPerUser == 0){ + return false + } + const theNewJob: Job = this.activeJobs.find((job) => job.id === jobid) + let completedUserJobs = this.activeJobs.filter((job) => job.status === "completed" && job.deviceId === theNewJob.deviceId) + if((completedUserJobs.length >= this.maxCachedPerUser)){ + this.logger.log(`Waiting for items to be downloaded - device ${theNewJob.deviceId} has ${completedUserJobs.length} downloads waiting `); + return false + } + else{ + this.logger.log(`Optimizing - device ${theNewJob.deviceId} has ${completedUserJobs.length} downloads waiting`); + return true + } + } private startJob(jobId: string) { const job = this.activeJobs.find((job) => job.id === jobId); @@ -261,6 +291,7 @@ export class AppService { ]; } + private async startFFmpegProcess( jobId: string, ffmpegArgs: string[], @@ -329,7 +360,7 @@ export class AppService { } } - + private async getVideoDuration( inputUrl: string, jobId: string, From c73df8040cf128be70d7c0beee0850a0a040eddd Mon Sep 17 00:00:00 2001 From: JorisKohl Date: Thu, 23 Jan 2025 18:44:28 +0100 Subject: [PATCH 2/5] finished jobs are now called ready-for-removal --- src/app.service.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/app.service.ts b/src/app.service.ts index 908fe2a..e934dd4 100644 --- a/src/app.service.ts +++ b/src/app.service.ts @@ -265,7 +265,7 @@ export class AppService { } private getCompletedJobs(): number { - return this.activeJobs.filter((job) => job.status === 'completed').length; + return this.activeJobs.filter((job) => job.status === 'ready-for-removal').length; } private getUniqueDevices(): number { From 9abc3cc16c507f3f13dcbecf268bf3efe0cd6f34 Mon Sep 17 00:00:00 2001 From: JorisKohl Date: Thu, 23 Jan 2025 19:48:41 +0100 Subject: [PATCH 3/5] These lines will crash the application when killing an optimization. That should not be expected behaviour --- src/app.service.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/app.service.ts b/src/app.service.ts index e934dd4..308c10d 100644 --- a/src/app.service.ts +++ b/src/app.service.ts @@ -388,7 +388,7 @@ export class AppService { this.logger.error( `Job ${jobId} failed with exit code ${code}. Input URL: ${job.inputUrl}`, ); - reject(new Error(`FFmpeg process failed with exit code ${code}`)); + // reject(new Error(`FFmpeg process failed with exit code ${code}`)); } }); @@ -396,7 +396,7 @@ export class AppService { this.logger.error( `FFmpeg process error for job ${jobId}: ${error.message}`, ); - reject(error); + // reject(error); }); }); } catch (error) { From a0f67cdbc4d60f4080c88de3a6aece8c87f8b066 Mon Sep 17 00:00:00 2001 From: JorisKohl Date: Thu, 23 Jan 2025 21:26:51 +0100 Subject: [PATCH 4/5] Added a mechanism to see to it that each user that creates a job, gets equal optimization time. --- src/app.service.ts | 47 +++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 42 insertions(+), 5 deletions(-) diff --git a/src/app.service.ts b/src/app.service.ts index 308c10d..e289af8 100644 --- a/src/app.service.ts +++ b/src/app.service.ts @@ -15,7 +15,7 @@ import * as kill from 'tree-kill'; export interface Job { id: string; - status: 'queued' | 'optimizing' | 'completed' | 'failed' | 'cancelled' | 'ready-for-removal'; + status: 'queued' | 'optimizing' | 'pending downloads limit' | 'completed' | 'failed' | 'cancelled' | 'ready-for-removal'; progress: number; outputPath: string; inputUrl: string; @@ -30,6 +30,7 @@ export interface Job { @Injectable() export class AppService { private activeJobs: Job[] = []; + private optimizationHistory: Job[] = []; private ffmpegProcesses: Map = new Map(); private videoDurations: Map = new Map(); private jobQueue: string[] = []; @@ -133,7 +134,6 @@ export class AppService { const finalizeJobRemoval = () => { if (job) { this.jobQueue = this.jobQueue.filter(id => id !== jobId); - if (this.immediateRemoval === true || job.progress < 100) { this.fileRemoval.cleanupReadyForRemovalJobs([job]); this.activeJobs = this.activeJobs.filter(activeJob => activeJob.id !== jobId); @@ -143,6 +143,9 @@ export class AppService { this.logger.log('Immediate removal is not allowed, cleanup service will take care in due time') } } + this.activeJobs + .filter((nextjob) => nextjob.deviceId === job.deviceId && nextjob.status === 'pending downloads limit') + .forEach((job) => job.status = 'queued') this.checkQueue(); }; @@ -174,6 +177,7 @@ export class AppService { completeJob(jobId: string):void{ const job = this.activeJobs.find((job) => job.id === jobId); + if (job) { job.status = 'ready-for-removal'; job.timestamp = new Date() @@ -268,6 +272,34 @@ export class AppService { return this.activeJobs.filter((job) => job.status === 'ready-for-removal').length; } + private isDeviceIdInOptimizeHistory(job:Job){ + const uniqueDeviceIds: string[] = [...new Set(this.optimizationHistory.map((job: Job) => job.deviceId))]; + const result = uniqueDeviceIds.includes(job.deviceId); // Check if job.deviceId is in uniqueDeviceIds + this.logger.log(`Device ID ${job.deviceId} is ${result ? 'in' : 'not in'} the finished jobs. Optimizing ${result ? 'Allowed' : 'not Allowed'}`); + return result + } + + private getActiveJobDeviceIds(): string[]{ + const uniqueDeviceIds: string[] = [ + ...new Set( + this.activeJobs + .filter((job: Job) => job.status === 'queued') // Filter jobs with status 'queued' + .map((job: Job) => job.deviceId) // Extract deviceId + ) + ]; + return uniqueDeviceIds + } + + private handleOptimizationHistory(job: Job): void{ + // create a finished jobs list to make sure every device gets equal optimizing time + this.optimizationHistory.push(job) // push the newest job to the finished jobs list + const amountOfActiveDeviceIds = this.getActiveJobDeviceIds().length // get the amount of active queued job device ids + while(amountOfActiveDeviceIds <= this.optimizationHistory.length && this.optimizationHistory.length > 0){ // the finished jobs should always be lower than the amount of active jobs. This is to push out the last deviceid: FIFO + this.optimizationHistory.shift() // shift away the oldest job. + } + this.logger.log(`${this.optimizationHistory.length} deviceIDs have recently finished a job`) + } + private getUniqueDevices(): number { const devices = new Set(this.activeJobs.map((job) => job.deviceId)); return devices.size; @@ -285,8 +317,11 @@ export class AppService { break; // Stop if max concurrent jobs are reached } const nextJobId = this.jobQueue[index]; // Access job ID by index - if (!this.userTooManyCachedItems(nextJobId)) { - // Skip this job if user cache limits are reached + let nextJob: Job = this.activeJobs.find((job) => job.id === nextJobId); + + if (!this.userTooManyCachedItems(nextJobId) || this.isDeviceIdInOptimizeHistory(nextJob)) { + nextJob.status = 'pending downloads limit' + // Skip this job if user cache limits are reached or the deviceID is in the recently finished jobs continue; } // Start the job and remove it from the queue @@ -301,7 +336,7 @@ export class AppService { return false } const theNewJob: Job = this.activeJobs.find((job) => job.id === jobid) - let completedUserJobs = this.activeJobs.filter((job) => job.status === "completed" && job.deviceId === theNewJob.deviceId) + let completedUserJobs = this.activeJobs.filter((job) => (job.status === "completed" || job.status === 'optimizing') && job.deviceId === theNewJob.deviceId) if((completedUserJobs.length >= this.maxCachedPerUser)){ this.logger.log(`Waiting for items to be downloaded - device ${theNewJob.deviceId} has ${completedUserJobs.length} downloads waiting `); return false @@ -316,6 +351,7 @@ export class AppService { const job = this.activeJobs.find((job) => job.id === jobId); if (job) { job.status = 'optimizing'; + this.handleOptimizationHistory(job) const ffmpegArgs = this.getFfmpegArgs(job.inputUrl, job.outputPath); this.startFFmpegProcess(jobId, ffmpegArgs) .finally(() => { @@ -367,6 +403,7 @@ export class AppService { } if (code === 0) { + job.status = 'completed'; job.progress = 100; // Update the file size From fc00a5f4b0fb3ea276719dcfafa40a9f1ae3c27a Mon Sep 17 00:00:00 2001 From: JorisKohl Date: Thu, 23 Jan 2025 21:34:40 +0100 Subject: [PATCH 5/5] split the checks in checkqueue. --- src/app.service.ts | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/app.service.ts b/src/app.service.ts index e289af8..06b402b 100644 --- a/src/app.service.ts +++ b/src/app.service.ts @@ -319,11 +319,15 @@ export class AppService { const nextJobId = this.jobQueue[index]; // Access job ID by index let nextJob: Job = this.activeJobs.find((job) => job.id === nextJobId); - if (!this.userTooManyCachedItems(nextJobId) || this.isDeviceIdInOptimizeHistory(nextJob)) { + if (!this.userTooManyCachedItems(nextJobId) ) { nextJob.status = 'pending downloads limit' - // Skip this job if user cache limits are reached or the deviceID is in the recently finished jobs + // Skip this job if user cache limits are reached continue; } + if(this.isDeviceIdInOptimizeHistory(nextJob)){ + // Skip this job if deviceID is in the recently finished jobs + continue + } // Start the job and remove it from the queue this.startJob(nextJobId); this.jobQueue.splice(Number(index), 1); // Remove the started job from the queue