From 3d9aafc65d2c2b851e9a10b858710df38a1ac75c Mon Sep 17 00:00:00 2001 From: xaxxoo Date: Sun, 29 Mar 2026 14:17:53 +0100 Subject: [PATCH] mount waveform --- .../src/mount-waveform/tracks.service.spec.ts | 103 +++++++++ backend/src/mount-waveform/tracks.service.ts | 54 +++++ .../waveform-generator.service.spec.ts | 62 ++++++ .../waveform-generator.service.ts | 63 ++++++ .../src/mount-waveform/waveform.constants.ts | 12 ++ .../waveform.controller.spec.ts | 81 +++++++ .../src/mount-waveform/waveform.controller.ts | 62 ++++++ backend/src/mount-waveform/waveform.dto.ts | 33 +++ backend/src/mount-waveform/waveform.entity.ts | 49 +++++ backend/src/mount-waveform/waveform.module.ts | 26 +++ .../src/mount-waveform/waveform.service.ts | 204 ++++++++++++++++++ backend/src/waveform.processor.ts | 69 ++++++ 12 files changed, 818 insertions(+) create mode 100644 backend/src/mount-waveform/tracks.service.spec.ts create mode 100644 backend/src/mount-waveform/tracks.service.ts create mode 100644 backend/src/mount-waveform/waveform-generator.service.spec.ts create mode 100644 backend/src/mount-waveform/waveform-generator.service.ts create mode 100644 backend/src/mount-waveform/waveform.constants.ts create mode 100644 backend/src/mount-waveform/waveform.controller.spec.ts create mode 100644 backend/src/mount-waveform/waveform.controller.ts create mode 100644 backend/src/mount-waveform/waveform.dto.ts create mode 100644 backend/src/mount-waveform/waveform.entity.ts create mode 100644 backend/src/mount-waveform/waveform.module.ts create mode 100644 backend/src/mount-waveform/waveform.service.ts create mode 100644 backend/src/waveform.processor.ts diff --git a/backend/src/mount-waveform/tracks.service.spec.ts b/backend/src/mount-waveform/tracks.service.spec.ts new file mode 100644 index 0000000..aec7802 --- /dev/null +++ b/backend/src/mount-waveform/tracks.service.spec.ts @@ -0,0 +1,103 @@ +import { NotFoundException } from '@nestjs/common'; +import { Test, TestingModule } from '@nestjs/testing'; +import { getRepositoryToken } from '@nestjs/typeorm'; +import { WaveformService } from '../waveform/waveform.service'; +import { TrackEntity } from './track.entity'; +import { TracksService } from './tracks.service'; + +// ─── Mocks ──────────────────────────────────────────────────────────────────── + +const trackRepoMock = { + create: jest.fn(), + save: jest.fn(), + findOne: jest.fn(), + find: jest.fn(), +}; + +const waveformServiceMock = { + enqueueForTrack: jest.fn(), +}; + +const makeTrack = (overrides: Partial = {}): TrackEntity => + Object.assign(new TrackEntity(), { + id: 'track-uuid', + title: 'Test Track', + audioFilePath: '/uploads/test.mp3', + createdAt: new Date(), + updatedAt: new Date(), + ...overrides, + }); + +// ─── Tests ──────────────────────────────────────────────────────────────────── + +describe('TracksService', () => { + let service: TracksService; + + beforeEach(async () => { + jest.clearAllMocks(); + + const module: TestingModule = await Test.createTestingModule({ + providers: [ + TracksService, + { provide: getRepositoryToken(TrackEntity), useValue: trackRepoMock }, + { provide: WaveformService, useValue: waveformServiceMock }, + ], + }).compile(); + + service = module.get(TracksService); + }); + + // ── createTrack ───────────────────────────────────────────────────────────── + + describe('createTrack', () => { + it('persists the track and enqueues waveform generation', async () => { + const track = makeTrack(); + trackRepoMock.create.mockReturnValue(track); + trackRepoMock.save.mockResolvedValue(track); + waveformServiceMock.enqueueForTrack.mockResolvedValue('job-1'); + + const result = await service.createTrack({ + title: 'Test Track', + audioFilePath: '/uploads/test.mp3', + }); + + expect(trackRepoMock.save).toHaveBeenCalledWith(track); + expect(waveformServiceMock.enqueueForTrack).toHaveBeenCalledWith( + 'track-uuid', + '/uploads/test.mp3', + ); + expect(result).toEqual(track); + }); + + it('does NOT use setTimeout (no timer mocking needed)', async () => { + const setTimeoutSpy = jest.spyOn(global, 'setTimeout'); + const track = makeTrack(); + trackRepoMock.create.mockReturnValue(track); + trackRepoMock.save.mockResolvedValue(track); + waveformServiceMock.enqueueForTrack.mockResolvedValue('job-1'); + + await service.createTrack({ title: 'T', audioFilePath: '/a.mp3' }); + + // Durable queue – no in-process timers + expect(setTimeoutSpy).not.toHaveBeenCalled(); + setTimeoutSpy.mockRestore(); + }); + }); + + // ── findOne ───────────────────────────────────────────────────────────────── + + describe('findOne', () => { + it('returns the track when found', async () => { + const track = makeTrack(); + trackRepoMock.findOne.mockResolvedValue(track); + + const result = await service.findOne('track-uuid'); + expect(result).toEqual(track); + }); + + it('throws NotFoundException when track is missing', async () => { + trackRepoMock.findOne.mockResolvedValue(null); + await expect(service.findOne('ghost')).rejects.toThrow(NotFoundException); + }); + }); +}); diff --git a/backend/src/mount-waveform/tracks.service.ts b/backend/src/mount-waveform/tracks.service.ts new file mode 100644 index 0000000..5b45581 --- /dev/null +++ b/backend/src/mount-waveform/tracks.service.ts @@ -0,0 +1,54 @@ +import { + Injectable, + Logger, + NotFoundException, +} from '@nestjs/common'; +import { InjectRepository } from '@nestjs/typeorm'; +import { Repository } from 'typeorm'; + +import { WaveformService } from '../waveform/waveform.service'; +import { CreateTrackDto } from './dto/create-track.dto'; +import { TrackEntity } from './track.entity'; + +/** + * TracksService owns the track lifecycle and delegates waveform generation + * to WaveformService – no direct queue interaction here. + * + * Key change: `createTrack` now calls `waveformService.enqueueForTrack` + * after persisting the track, replacing any fire-and-forget setTimeout that + * may have existed. + */ +@Injectable() +export class TracksService { + private readonly logger = new Logger(TracksService.name); + + constructor( + @InjectRepository(TrackEntity) + private readonly trackRepo: Repository, + + // Injected from WaveformModule (exported). + private readonly waveformService: WaveformService, + ) {} + + async createTrack(dto: CreateTrackDto): Promise { + const track = this.trackRepo.create(dto); + await this.trackRepo.save(track); + + this.logger.log(`Track ${track.id} created – enqueuing waveform generation`); + + // Durable – survives process restarts. No setTimeout. + await this.waveformService.enqueueForTrack(track.id, track.audioFilePath); + + return track; + } + + async findOne(id: string): Promise { + const track = await this.trackRepo.findOne({ where: { id } }); + if (!track) throw new NotFoundException(`Track ${id} not found`); + return track; + } + + async findAll(): Promise { + return this.trackRepo.find(); + } +} diff --git a/backend/src/mount-waveform/waveform-generator.service.spec.ts b/backend/src/mount-waveform/waveform-generator.service.spec.ts new file mode 100644 index 0000000..3412956 --- /dev/null +++ b/backend/src/mount-waveform/waveform-generator.service.spec.ts @@ -0,0 +1,62 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import * as fs from 'fs'; +import * as path from 'path'; +import * as os from 'os'; +import { WaveformGeneratorService } from './waveform-generator.service'; + +describe('WaveformGeneratorService', () => { + let service: WaveformGeneratorService; + let tmpFile: string; + + beforeEach(async () => { + const module: TestingModule = await Test.createTestingModule({ + providers: [WaveformGeneratorService], + }).compile(); + + service = module.get(WaveformGeneratorService); + }); + + afterEach(() => { + if (tmpFile && fs.existsSync(tmpFile)) { + fs.unlinkSync(tmpFile); + } + }); + + it('should be defined', () => { + expect(service).toBeDefined(); + }); + + describe('generateFromFile', () => { + it('returns peaks array and durationSeconds for a valid file', async () => { + tmpFile = path.join(os.tmpdir(), `test-${Date.now()}.mp3`); + // Write 10 KB of fake audio data so peakCount > 100 + fs.writeFileSync(tmpFile, Buffer.alloc(10 * 1024, 0xaa)); + + const result = await service.generateFromFile(tmpFile); + + expect(result.peaks).toBeInstanceOf(Array); + expect(result.peaks.length).toBeGreaterThanOrEqual(100); + expect(result.durationSeconds).toBeGreaterThan(0); + result.peaks.forEach((p) => { + expect(p).toBeGreaterThanOrEqual(-1); + expect(p).toBeLessThanOrEqual(1); + }); + }); + + it('produces deterministic peaks for the same file path', async () => { + tmpFile = path.join(os.tmpdir(), `det-test.mp3`); + fs.writeFileSync(tmpFile, Buffer.alloc(5 * 1024, 0xbb)); + + const a = await service.generateFromFile(tmpFile); + const b = await service.generateFromFile(tmpFile); + + expect(a.peaks).toEqual(b.peaks); + }); + + it('throws when the file does not exist', async () => { + await expect( + service.generateFromFile('/nonexistent/path/audio.mp3'), + ).rejects.toThrow('Audio file not found'); + }); + }); +}); diff --git a/backend/src/mount-waveform/waveform-generator.service.ts b/backend/src/mount-waveform/waveform-generator.service.ts new file mode 100644 index 0000000..af07603 --- /dev/null +++ b/backend/src/mount-waveform/waveform-generator.service.ts @@ -0,0 +1,63 @@ +import { Injectable, Logger } from '@nestjs/common'; +import * as fs from 'fs'; +import * as path from 'path'; + +export interface GeneratedWaveform { + peaks: number[]; + /** Duration in seconds derived from the audio file. */ + durationSeconds: number; +} + +/** + * Responsible solely for reading an audio file and producing normalised + * peak-amplitude data. All retry / persistence logic lives elsewhere. + * + * Production swap-in: replace `generateFromFile` with a call to ffprobe / + * audiowaveform CLI or a cloud-based audio analysis service. + */ +@Injectable() +export class WaveformGeneratorService { + private readonly logger = new Logger(WaveformGeneratorService.name); + + /** + * Generates waveform peaks from a local file path. + * + * @throws {Error} when the file cannot be read or parsed. + */ + async generateFromFile(filePath: string): Promise { + this.logger.log(`Generating waveform for: ${filePath}`); + + if (!fs.existsSync(filePath)) { + throw new Error(`Audio file not found: ${filePath}`); + } + + // ------------------------------------------------------------------ + // Real implementation would shell out to `audiowaveform` or `ffprobe`. + // The stub below produces deterministic fake data so the rest of the + // stack (queue, persistence, API) can be exercised in tests. + // ------------------------------------------------------------------ + const stats = fs.statSync(filePath); + const peakCount = Math.max(100, Math.floor(stats.size / 1024)); + const peaks = this.buildFakePeaks(peakCount, filePath); + const durationSeconds = peakCount * 0.1; // 100 ms per sample – rough stub + + return { peaks, durationSeconds }; + } + + // ------------------------------------------------------------------------- + // Private helpers + // ------------------------------------------------------------------------- + + private buildFakePeaks(count: number, seed: string): number[] { + // Simple seeded PRNG so tests get deterministic output. + let s = seed.split('').reduce((acc, c) => acc + c.charCodeAt(0), 0); + const rand = () => { + s = (s * 1664525 + 1013904223) & 0xffffffff; + return (s >>> 0) / 0xffffffff; + }; + + return Array.from({ length: count }, () => + parseFloat((rand() * 2 - 1).toFixed(4)), + ); + } +} diff --git a/backend/src/mount-waveform/waveform.constants.ts b/backend/src/mount-waveform/waveform.constants.ts new file mode 100644 index 0000000..147d6a8 --- /dev/null +++ b/backend/src/mount-waveform/waveform.constants.ts @@ -0,0 +1,12 @@ +export const WAVEFORM_QUEUE = 'waveform'; + +export const WAVEFORM_JOBS = { + GENERATE: 'generate', +} as const; + +export const WAVEFORM_JOB_DEFAULTS = { + /** Max BullMQ attempts before moving to failed */ + ATTEMPTS: 5, + /** Exponential back-off: 30 s, 60 s, 120 s, 240 s, 480 s */ + BACKOFF_DELAY_MS: 30_000, +} as const; diff --git a/backend/src/mount-waveform/waveform.controller.spec.ts b/backend/src/mount-waveform/waveform.controller.spec.ts new file mode 100644 index 0000000..d57a21a --- /dev/null +++ b/backend/src/mount-waveform/waveform.controller.spec.ts @@ -0,0 +1,81 @@ +import { NotFoundException } from '@nestjs/common'; +import { Test, TestingModule } from '@nestjs/testing'; +import { WaveformStatus } from './dto/waveform.dto'; +import { WaveformController } from './waveform.controller'; +import { WaveformService } from './waveform.service'; + +const waveformServiceMock = { + getStatus: jest.fn(), + regenerate: jest.fn(), +}; + +describe('WaveformController', () => { + let controller: WaveformController; + + beforeEach(async () => { + jest.clearAllMocks(); + + const module: TestingModule = await Test.createTestingModule({ + controllers: [WaveformController], + providers: [{ provide: WaveformService, useValue: waveformServiceMock }], + }).compile(); + + controller = module.get(WaveformController); + }); + + it('should be defined', () => expect(controller).toBeDefined()); + + // ── GET /tracks/:trackId/waveform ───────────────────────────────────────── + + describe('getStatus', () => { + it('delegates to waveformService.getStatus', async () => { + const dto = { + status: WaveformStatus.DONE, + peaks: [0.1], + attempts: 1, + updatedAt: new Date().toISOString(), + }; + waveformServiceMock.getStatus.mockResolvedValue(dto); + + const result = await controller.getStatus('track-uuid'); + + expect(waveformServiceMock.getStatus).toHaveBeenCalledWith('track-uuid'); + expect(result).toEqual(dto); + }); + + it('propagates NotFoundException', async () => { + waveformServiceMock.getStatus.mockRejectedValue( + new NotFoundException('not found'), + ); + await expect(controller.getStatus('ghost')).rejects.toThrow( + NotFoundException, + ); + }); + }); + + // ── POST /tracks/:trackId/waveform/regenerate ───────────────────────────── + + describe('regenerate', () => { + it('returns queued result', async () => { + waveformServiceMock.regenerate.mockResolvedValue({ + result: 'queued', + jobId: 'j-42', + }); + + const result = await controller.regenerate('track-uuid'); + + expect(waveformServiceMock.regenerate).toHaveBeenCalledWith('track-uuid'); + expect(result.result).toBe('queued'); + }); + + it('returns already_processing when job is in-flight', async () => { + waveformServiceMock.regenerate.mockResolvedValue({ + result: 'already_processing', + jobId: 'in-flight', + }); + + const result = await controller.regenerate('track-uuid'); + expect(result.result).toBe('already_processing'); + }); + }); +}); diff --git a/backend/src/mount-waveform/waveform.controller.ts b/backend/src/mount-waveform/waveform.controller.ts new file mode 100644 index 0000000..50c3ee1 --- /dev/null +++ b/backend/src/mount-waveform/waveform.controller.ts @@ -0,0 +1,62 @@ +import { + Controller, + Get, + HttpCode, + HttpStatus, + Param, + ParseUUIDPipe, + Post, + Version, +} from '@nestjs/common'; +import { + ApiConflictResponse, + ApiNotFoundResponse, + ApiOkResponse, + ApiOperation, + ApiTags, +} from '@nestjs/swagger'; + +import { RegenerateResponseDto, WaveformStatusDto } from './dto/waveform.dto'; +import { WaveformService } from './waveform.service'; + +/** + * Routes are mounted by AppModule under the global versioned prefix. + * Final URL: /api/v1/tracks/:trackId/waveform + * + * The old hardcoded `/api/waveform` prefix has been removed – routing is + * driven entirely by the global prefix + version set in main.ts. + */ +@ApiTags('waveform') +@Controller({ path: 'tracks/:trackId/waveform', version: '1' }) +export class WaveformController { + constructor(private readonly waveformService: WaveformService) {} + + // GET /api/v1/tracks/:trackId/waveform + @Get() + @ApiOperation({ summary: 'Get waveform status and peak data for a track' }) + @ApiOkResponse({ type: WaveformStatusDto }) + @ApiNotFoundResponse({ description: 'No waveform record exists for track' }) + async getStatus( + @Param('trackId', ParseUUIDPipe) trackId: string, + ): Promise { + return this.waveformService.getStatus(trackId); + } + + // POST /api/v1/tracks/:trackId/waveform/regenerate + @Post('regenerate') + @HttpCode(HttpStatus.OK) + @ApiOperation({ + summary: 'Force-regenerate the waveform for a track', + description: + 'Enqueues a durable BullMQ job. Returns 200 + already_processing if ' + + 'a job is already in flight.', + }) + @ApiOkResponse({ type: RegenerateResponseDto }) + @ApiNotFoundResponse({ description: 'No waveform record exists for track' }) + @ApiConflictResponse({ description: 'Waveform generation already in progress' }) + async regenerate( + @Param('trackId', ParseUUIDPipe) trackId: string, + ): Promise { + return this.waveformService.regenerate(trackId); + } +} diff --git a/backend/src/mount-waveform/waveform.dto.ts b/backend/src/mount-waveform/waveform.dto.ts new file mode 100644 index 0000000..b5b8a89 --- /dev/null +++ b/backend/src/mount-waveform/waveform.dto.ts @@ -0,0 +1,33 @@ +import { ApiProperty, ApiPropertyOptional } from '@nestjs/swagger'; + +export enum WaveformStatus { + PENDING = 'pending', + PROCESSING = 'processing', + DONE = 'done', + FAILED = 'failed', +} + +export class WaveformStatusDto { + @ApiProperty({ enum: WaveformStatus }) + status: WaveformStatus; + + @ApiPropertyOptional({ description: 'Peak amplitude data (mono, normalised -1…1)', type: [Number] }) + peaks?: number[]; + + @ApiPropertyOptional({ description: 'Number of BullMQ attempts consumed so far' }) + attempts?: number; + + @ApiPropertyOptional({ description: 'Human-readable failure reason for operators' }) + failReason?: string; + + @ApiPropertyOptional({ description: 'ISO timestamp of last status change' }) + updatedAt?: string; +} + +export class RegenerateResponseDto { + @ApiProperty({ example: 'queued' }) + result: 'queued' | 'already_processing'; + + @ApiPropertyOptional({ description: 'BullMQ job id' }) + jobId?: string; +} diff --git a/backend/src/mount-waveform/waveform.entity.ts b/backend/src/mount-waveform/waveform.entity.ts new file mode 100644 index 0000000..80ab0d1 --- /dev/null +++ b/backend/src/mount-waveform/waveform.entity.ts @@ -0,0 +1,49 @@ +import { + Column, + CreateDateColumn, + Entity, + Index, + PrimaryGeneratedColumn, + UpdateDateColumn, +} from 'typeorm'; +import { WaveformStatus } from './dto/waveform.dto'; + +@Entity('waveforms') +export class WaveformEntity { + @PrimaryGeneratedColumn('uuid') + id: string; + + /** Foreign-key to the tracks table (loose coupling – no FK constraint). */ + @Index() + @Column({ name: 'track_id', type: 'uuid', unique: true }) + trackId: string; + + @Column({ + type: 'enum', + enum: WaveformStatus, + default: WaveformStatus.PENDING, + }) + status: WaveformStatus; + + /** Normalised peak data stored as a JSON array. */ + @Column({ type: 'jsonb', nullable: true }) + peaks: number[] | null; + + /** How many BullMQ attempts have been consumed. */ + @Column({ name: 'attempt_count', default: 0 }) + attemptCount: number; + + /** Last failure reason forwarded to operators. */ + @Column({ name: 'fail_reason', type: 'text', nullable: true }) + failReason: string | null; + + /** BullMQ job id – lets us correlate queue state with DB row. */ + @Column({ name: 'bull_job_id', type: 'text', nullable: true }) + bullJobId: string | null; + + @CreateDateColumn({ name: 'created_at' }) + createdAt: Date; + + @UpdateDateColumn({ name: 'updated_at' }) + updatedAt: Date; +} diff --git a/backend/src/mount-waveform/waveform.module.ts b/backend/src/mount-waveform/waveform.module.ts new file mode 100644 index 0000000..11c4893 --- /dev/null +++ b/backend/src/mount-waveform/waveform.module.ts @@ -0,0 +1,26 @@ +import { BullModule } from '@nestjs/bullmq'; +import { Module } from '@nestjs/common'; +import { TypeOrmModule } from '@nestjs/typeorm'; + +import { WaveformGeneratorService } from './waveform-generator.service'; +import { WaveformController } from './waveform.controller'; +import { WAVEFORM_QUEUE } from './waveform.constants'; +import { WaveformEntity } from './waveform.entity'; +import { WaveformProcessor } from './waveform.processor'; +import { WaveformService } from './waveform.service'; + +@Module({ + imports: [ + TypeOrmModule.forFeature([WaveformEntity]), + + // Register the BullMQ queue. The shared Redis connection is configured + // once in AppModule via BullModule.forRootAsync. + BullModule.registerQueue({ name: WAVEFORM_QUEUE }), + ], + controllers: [WaveformController], + providers: [WaveformService, WaveformGeneratorService, WaveformProcessor], + // Export WaveformService so TracksModule can call enqueueForTrack on + // track creation without creating a circular dependency. + exports: [WaveformService], +}) +export class WaveformModule {} diff --git a/backend/src/mount-waveform/waveform.service.ts b/backend/src/mount-waveform/waveform.service.ts new file mode 100644 index 0000000..031f745 --- /dev/null +++ b/backend/src/mount-waveform/waveform.service.ts @@ -0,0 +1,204 @@ +import { + ConflictException, + Injectable, + Logger, + NotFoundException, +} from '@nestjs/common'; +import { InjectQueue } from '@nestjs/bullmq'; +import { InjectRepository } from '@nestjs/typeorm'; +import { Queue } from 'bullmq'; +import { Repository } from 'typeorm'; + +import { + RegenerateResponseDto, + WaveformStatus, + WaveformStatusDto, +} from './dto/waveform.dto'; +import { WaveformEntity } from './waveform.entity'; +import { + WAVEFORM_JOB_DEFAULTS, + WAVEFORM_JOBS, + WAVEFORM_QUEUE, +} from './waveform.constants'; + +export interface WaveformJobPayload { + trackId: string; + audioFilePath: string; +} + +@Injectable() +export class WaveformService { + private readonly logger = new Logger(WaveformService.name); + + constructor( + @InjectRepository(WaveformEntity) + private readonly waveformRepo: Repository, + + @InjectQueue(WAVEFORM_QUEUE) + private readonly waveformQueue: Queue, + ) {} + + // ------------------------------------------------------------------------- + // Public API + // ------------------------------------------------------------------------- + + /** + * Enqueues a waveform generation job for a newly created track. + * Safe to call multiple times – idempotent when a job is already queued + * or processing. + */ + async enqueueForTrack( + trackId: string, + audioFilePath: string, + ): Promise { + let record = await this.waveformRepo.findOne({ where: { trackId } }); + + if ( + record && + [WaveformStatus.PENDING, WaveformStatus.PROCESSING].includes( + record.status, + ) + ) { + this.logger.log( + `Waveform already ${record.status} for track ${trackId} – skipping enqueue`, + ); + return record.bullJobId ?? ''; + } + + if (!record) { + record = this.waveformRepo.create({ + trackId, + status: WaveformStatus.PENDING, + }); + } else { + record.status = WaveformStatus.PENDING; + record.failReason = null; + record.peaks = null; + } + + await this.waveformRepo.save(record); + + const job = await this.waveformQueue.add( + WAVEFORM_JOBS.GENERATE, + { trackId, audioFilePath }, + { + jobId: `waveform:${trackId}`, + attempts: WAVEFORM_JOB_DEFAULTS.ATTEMPTS, + backoff: { + type: 'exponential', + delay: WAVEFORM_JOB_DEFAULTS.BACKOFF_DELAY_MS, + }, + removeOnComplete: true, + removeOnFail: false, // keep failed jobs in BullMQ dashboard + }, + ); + + record.bullJobId = job.id ?? null; + await this.waveformRepo.save(record); + + this.logger.log( + `Waveform job ${job.id} enqueued for track ${trackId}`, + ); + + return job.id ?? ''; + } + + /** + * Force-regenerates the waveform for an existing track. + * Returns 409 if a generation is already in flight. + */ + async regenerate(trackId: string): Promise { + const record = await this.waveformRepo.findOne({ where: { trackId } }); + + if (!record) { + throw new NotFoundException( + `No waveform record found for track ${trackId}`, + ); + } + + if ( + [WaveformStatus.PENDING, WaveformStatus.PROCESSING].includes( + record.status, + ) + ) { + return { result: 'already_processing', jobId: record.bullJobId ?? undefined }; + } + + // Derive audio path stored during original enqueue – in a real app you + // would look it up from the tracks table. Here we re-use the job payload + // that BullMQ might still hold, or fall back to a placeholder so the + // worker can re-resolve it. + const existingJob = record.bullJobId + ? await this.waveformQueue.getJob(record.bullJobId) + : null; + + const audioFilePath = + (existingJob?.data as WaveformJobPayload | null)?.audioFilePath ?? + `__RESOLVE_FROM_TRACK__${trackId}`; + + const jobId = await this.enqueueForTrack(trackId, audioFilePath); + return { result: 'queued', jobId }; + } + + /** + * Returns the current status + peaks for a given track. + */ + async getStatus(trackId: string): Promise { + const record = await this.waveformRepo.findOne({ where: { trackId } }); + + if (!record) { + throw new NotFoundException( + `No waveform record found for track ${trackId}`, + ); + } + + return { + status: record.status, + peaks: record.peaks ?? undefined, + attempts: record.attemptCount, + failReason: record.failReason ?? undefined, + updatedAt: record.updatedAt.toISOString(), + }; + } + + // ------------------------------------------------------------------------- + // Called by the BullMQ processor – not part of the public HTTP surface + // ------------------------------------------------------------------------- + + async markProcessing(trackId: string): Promise { + await this.waveformRepo.update( + { trackId }, + { status: WaveformStatus.PROCESSING }, + ); + } + + async markDone(trackId: string, peaks: number[]): Promise { + await this.waveformRepo.update( + { trackId }, + { status: WaveformStatus.DONE, peaks, failReason: null }, + ); + this.logger.log(`Waveform done for track ${trackId} (${peaks.length} peaks)`); + } + + async markFailed( + trackId: string, + reason: string, + attemptsMade: number, + ): Promise { + await this.waveformRepo.update( + { trackId }, + { + status: WaveformStatus.FAILED, + failReason: reason, + attemptCount: attemptsMade, + }, + ); + this.logger.error( + `Waveform FAILED for track ${trackId} after ${attemptsMade} attempts: ${reason}`, + ); + } + + async incrementAttemptCount(trackId: string): Promise { + await this.waveformRepo.increment({ trackId }, 'attemptCount', 1); + } +} diff --git a/backend/src/waveform.processor.ts b/backend/src/waveform.processor.ts new file mode 100644 index 0000000..6a142e2 --- /dev/null +++ b/backend/src/waveform.processor.ts @@ -0,0 +1,69 @@ +import { Processor, WorkerHost } from '@nestjs/bullmq'; +import { Logger } from '@nestjs/common'; +import { Job } from 'bullmq'; + +import { WaveformGeneratorService } from './waveform-generator.service'; +import { WaveformService, WaveformJobPayload } from './waveform.service'; +import { WAVEFORM_JOBS, WAVEFORM_QUEUE } from './waveform.constants'; + +/** + * BullMQ worker that replaces the old `setTimeout`-based retry mechanism. + * + * BullMQ persists jobs in Redis, so: + * - Restarts do NOT lose queued or in-flight work. + * - Exponential back-off with a configurable number of attempts is handled + * by the queue, not application code. + * - Failed jobs remain visible in the BullMQ dashboard for inspection / + * manual retry without restarting the process. + */ +@Processor(WAVEFORM_QUEUE) +export class WaveformProcessor extends WorkerHost { + private readonly logger = new Logger(WaveformProcessor.name); + + constructor( + private readonly generatorService: WaveformGeneratorService, + private readonly waveformService: WaveformService, + ) { + super(); + } + + async process(job: Job): Promise { + if (job.name !== WAVEFORM_JOBS.GENERATE) { + this.logger.warn(`Unknown job name received: ${job.name}`); + return; + } + + const { trackId, audioFilePath } = job.data; + this.logger.log( + `Processing waveform job ${job.id} for track ${trackId} ` + + `(attempt ${job.attemptsMade + 1} / ${job.opts.attempts})`, + ); + + await this.waveformService.markProcessing(trackId); + await this.waveformService.incrementAttemptCount(trackId); + + try { + const { peaks } = await this.generatorService.generateFromFile( + audioFilePath, + ); + await this.waveformService.markDone(trackId, peaks); + } catch (err: unknown) { + const message = err instanceof Error ? err.message : String(err); + const isLastAttempt = + job.attemptsMade + 1 >= (job.opts.attempts ?? 1); + + if (isLastAttempt) { + // Persist terminal failure so the API can surface it. + await this.waveformService.markFailed( + trackId, + message, + job.attemptsMade + 1, + ); + } + + // Re-throw so BullMQ applies exponential back-off on non-final attempts + // and moves the job to the 'failed' set on the final attempt. + throw err; + } + } +}