Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,7 @@

# File-removal
# REMOVE_FILE_AFTER_RIGHT_DOWNLOAD=true
# TIME_TO_KEEP_FILES=8 # Hours. Non-option when REMOVE_FILE_AFTER_RIGHT_DOWNLOAD=false
# TIME_TO_KEEP_FILES=8 # Hours. Non-option when REMOVE_FILE_AFTER_RIGHT_DOWNLOAD=false

# File management
# MAX_CACHED_PER_USER=10
100 changes: 86 additions & 14 deletions src/app.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import * as kill from 'tree-kill';

export interface Job {
id: string;
status: 'queued' | 'optimizing' | 'completed' | 'failed' | 'cancelled' | 'ready-for-removal';
status: 'queued' | 'optimizing' | 'pending downloads limit' | 'completed' | 'failed' | 'cancelled' | 'ready-for-removal';
progress: number;
outputPath: string;
inputUrl: string;
Expand All @@ -30,10 +30,12 @@ export interface Job {
@Injectable()
export class AppService {
private activeJobs: Job[] = [];
private optimizationHistory: Job[] = [];
private ffmpegProcesses: Map<string, ChildProcess> = new Map();
private videoDurations: Map<string, number> = new Map();
private jobQueue: string[] = [];
private maxConcurrentJobs: number;
private maxCachedPerUser: number;
private cacheDir: string;
private immediateRemoval: boolean;

Expand All @@ -48,6 +50,10 @@ export class AppService {
'MAX_CONCURRENT_JOBS',
1,
);
this.maxCachedPerUser = this.configService.get<number>(
'MAX_CACHED_PER_USER',
10,
);
this.immediateRemoval = this.configService.get<boolean>(
'REMOVE_FILE_AFTER_RIGHT_DOWNLOAD',
true,
Expand Down Expand Up @@ -128,7 +134,6 @@ export class AppService {
const finalizeJobRemoval = () => {
if (job) {
this.jobQueue = this.jobQueue.filter(id => id !== jobId);

if (this.immediateRemoval === true || job.progress < 100) {
this.fileRemoval.cleanupReadyForRemovalJobs([job]);
this.activeJobs = this.activeJobs.filter(activeJob => activeJob.id !== jobId);
Expand All @@ -138,6 +143,9 @@ export class AppService {
this.logger.log('Immediate removal is not allowed, cleanup service will take care in due time')
}
}
this.activeJobs
.filter((nextjob) => nextjob.deviceId === job.deviceId && nextjob.status === 'pending downloads limit')
.forEach((job) => job.status = 'queued')
this.checkQueue();
};

Expand Down Expand Up @@ -169,6 +177,7 @@ export class AppService {

completeJob(jobId: string):void{
const job = this.activeJobs.find((job) => job.id === jobId);

if (job) {
job.status = 'ready-for-removal';
job.timestamp = new Date()
Expand Down Expand Up @@ -260,7 +269,35 @@ export class AppService {
}

private getCompletedJobs(): number {
return this.activeJobs.filter((job) => job.status === 'completed').length;
return this.activeJobs.filter((job) => job.status === 'ready-for-removal').length;
}

private isDeviceIdInOptimizeHistory(job:Job){
const uniqueDeviceIds: string[] = [...new Set(this.optimizationHistory.map((job: Job) => job.deviceId))];
const result = uniqueDeviceIds.includes(job.deviceId); // Check if job.deviceId is in uniqueDeviceIds
this.logger.log(`Device ID ${job.deviceId} is ${result ? 'in' : 'not in'} the finished jobs. Optimizing ${result ? 'Allowed' : 'not Allowed'}`);
return result
}

private getActiveJobDeviceIds(): string[]{
const uniqueDeviceIds: string[] = [
...new Set(
this.activeJobs
.filter((job: Job) => job.status === 'queued') // Filter jobs with status 'queued'
.map((job: Job) => job.deviceId) // Extract deviceId
)
];
return uniqueDeviceIds
}

private handleOptimizationHistory(job: Job): void{
// create a finished jobs list to make sure every device gets equal optimizing time
this.optimizationHistory.push(job) // push the newest job to the finished jobs list
const amountOfActiveDeviceIds = this.getActiveJobDeviceIds().length // get the amount of active queued job device ids
while(amountOfActiveDeviceIds <= this.optimizationHistory.length && this.optimizationHistory.length > 0){ // the finished jobs should always be lower than the amount of active jobs. This is to push out the last deviceid: FIFO
this.optimizationHistory.shift() // shift away the oldest job.
}
this.logger.log(`${this.optimizationHistory.length} deviceIDs have recently finished a job`)
}

private getUniqueDevices(): number {
Expand All @@ -269,23 +306,56 @@ export class AppService {
}

private checkQueue() {
let runningJobs = this.activeJobs.filter((job) => job.status === 'optimizing')
.length;

while (runningJobs < this.maxConcurrentJobs && this.jobQueue.length > 0) {
const nextJobId = this.jobQueue.shift();
if (nextJobId) {
this.startJob(nextJobId);
runningJobs++; // Now we track the newly started job
let runningJobs = this.activeJobs.filter((job) => job.status === 'optimizing').length;

this.logger.log(
`${runningJobs} active jobs running and ${this.jobQueue.length} items in the queue`,
);

for (const index in this.jobQueue) {
if (runningJobs >= this.maxConcurrentJobs) {
break; // Stop if max concurrent jobs are reached
}
const nextJobId = this.jobQueue[index]; // Access job ID by index
let nextJob: Job = this.activeJobs.find((job) => job.id === nextJobId);

if (!this.userTooManyCachedItems(nextJobId) ) {
nextJob.status = 'pending downloads limit'
// Skip this job if user cache limits are reached
continue;
}
if(this.isDeviceIdInOptimizeHistory(nextJob)){
// Skip this job if deviceID is in the recently finished jobs
continue
}
// Start the job and remove it from the queue
this.startJob(nextJobId);
this.jobQueue.splice(Number(index), 1); // Remove the started job from the queue
runningJobs++; // Increment running jobs
}
}

private userTooManyCachedItems(jobid): boolean{
if(this.maxCachedPerUser == 0){
return false
}
const theNewJob: Job = this.activeJobs.find((job) => job.id === jobid)
let completedUserJobs = this.activeJobs.filter((job) => (job.status === "completed" || job.status === 'optimizing') && job.deviceId === theNewJob.deviceId)
if((completedUserJobs.length >= this.maxCachedPerUser)){
this.logger.log(`Waiting for items to be downloaded - device ${theNewJob.deviceId} has ${completedUserJobs.length} downloads waiting `);
return false
}
else{
this.logger.log(`Optimizing - device ${theNewJob.deviceId} has ${completedUserJobs.length} downloads waiting`);
return true
}
}

private startJob(jobId: string) {
const job = this.activeJobs.find((job) => job.id === jobId);
if (job) {
job.status = 'optimizing';
this.handleOptimizationHistory(job)
const ffmpegArgs = this.getFfmpegArgs(job.inputUrl, job.outputPath);
this.startFFmpegProcess(jobId, ffmpegArgs)
.finally(() => {
Expand All @@ -310,6 +380,7 @@ export class AppService {
];
}


private async startFFmpegProcess(
jobId: string,
ffmpegArgs: string[],
Expand All @@ -336,6 +407,7 @@ export class AppService {
}

if (code === 0) {

job.status = 'completed';
job.progress = 100;
// Update the file size
Expand All @@ -357,15 +429,15 @@ export class AppService {
this.logger.error(
`Job ${jobId} failed with exit code ${code}. Input URL: ${job.inputUrl}`,
);
reject(new Error(`FFmpeg process failed with exit code ${code}`));
// reject(new Error(`FFmpeg process failed with exit code ${code}`));
}
});

ffmpegProcess.on('error', (error) => {
this.logger.error(
`FFmpeg process error for job ${jobId}: ${error.message}`,
);
reject(error);
// reject(error);
});
});
} catch (error) {
Expand All @@ -377,7 +449,7 @@ export class AppService {
}
}


private async getVideoDuration(
inputUrl: string,
jobId: string,
Expand Down
Loading