import {
  Injectable,
  OnApplicationBootstrap,
  OnModuleDestroy,
  OnModuleInit,
} from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { JobsOptions, Queue, Worker } from 'bullmq';
import { AppLoggerService } from '../logging/app-logger.service';
import { RedisService } from '../redis/redis.service';

/** Handler invoked with a job's payload. A rejection is treated as a job failure. */
type JobProcessor = (payload: Record<string, unknown>) => Promise<void>;

/**
 * Application job queue.
 *
 * When the queue is enabled in configuration and Redis is available, jobs are
 * added to a BullMQ queue and executed by a worker created in this process.
 * Otherwise `enqueue` falls back to running the registered processor
 * in-process on a microtask, with no retry or backoff.
 */
@Injectable()
export class AppQueueService
  implements OnModuleInit, OnApplicationBootstrap, OnModuleDestroy
{
  /** Processors keyed by job name. */
  private readonly processors = new Map<string, JobProcessor>();
  private queue: Queue | null = null;
  private worker: Worker | null = null;

  constructor(
    private readonly configService: ConfigService,
    private readonly redisService: RedisService,
    private readonly logger: AppLoggerService,
  ) {}

  onModuleInit(): void {
    // Intentionally empty. Processors are usually registered by other providers before bootstrap.
  }

  /**
   * Creates the BullMQ queue and worker.
   *
   * Does nothing (leaving the service in in-process fallback mode) when the
   * queue is disabled, Redis is disabled, or a Redis client cannot be created.
   */
  onApplicationBootstrap(): void {
    if (!this.isQueueEnabled() || !this.redisService.isEnabled()) {
      return;
    }

    const queueName = this.getQueueName();
    const queueConnection = this.redisService.createQueueClient();
    const workerConnection = this.redisService.createQueueClient();

    // NOTE(review): if exactly one of the two clients was created, it is not
    // released here. Whether that leaks a connection depends on what
    // RedisService.createQueueClient() returns and who owns it - confirm.
    if (!queueConnection || !workerConnection) {
      return;
    }

    this.queue = new Queue(queueName, {
      connection: queueConnection,
      defaultJobOptions: this.getDefaultJobOptions(),
    });
    // An 'error' event with no listener is thrown by the EventEmitter and
    // would surface as an uncaught exception, so always attach one.
    this.queue.on('error', (error: Error) => {
      this.logTransportError(queueName, 'queue', error);
    });

    this.worker = new Worker(
      queueName,
      async (job) => {
        const processor = this.processors.get(job.name);

        if (!processor) {
          // Throwing fails the job so it goes through the configured retry policy.
          throw new Error(`No processor registered for job "${job.name}"`);
        }

        await processor(job.data as Record<string, unknown>);
      },
      {
        connection: workerConnection,
        concurrency:
          this.configService.get('queue.workerConcurrency', { infer: true }) ?? 5,
      },
    );

    this.worker.on('error', (error: Error) => {
      this.logTransportError(queueName, 'worker', error);
    });

    this.worker.on('failed', (job, error) => {
      this.logger.error(
        {
          queue: queueName,
          jobName: job?.name,
          jobId: job?.id,
          error: error.message,
        },
        error.stack,
        AppQueueService.name,
      );
    });
  }

  /**
   * Registers the processor for a job name.
   * A later registration for the same name replaces the earlier one.
   */
  registerProcessor(jobName: string, processor: JobProcessor): void {
    this.processors.set(jobName, processor);
  }

  /**
   * Submits a job.
   *
   * With an active queue, the job is added to BullMQ and the returned promise
   * settles once the add call completes; `options` override the configured
   * defaults key by key (shallow merge).
   *
   * Without a queue, the registered processor is scheduled on a microtask and
   * this method resolves without waiting for it. Processor failures are logged,
   * not propagated, and `options` are ignored. If no processor is registered
   * the job is dropped without any log entry.
   *
   * @param jobName Name used to look up the processor.
   * @param payload Data handed to the processor.
   * @param options Per-job BullMQ options.
   */
  async enqueue(
    jobName: string,
    payload: Record<string, unknown>,
    options: JobsOptions = {},
  ): Promise<void> {
    if (this.queue) {
      await this.queue.add(jobName, payload, {
        ...this.getDefaultJobOptions(),
        ...options,
      });
      return;
    }

    const processor = this.processors.get(jobName);

    if (!processor) {
      // NOTE(review): the job is silently discarded here - confirm this is intended.
      return;
    }

    queueMicrotask(() => {
      void this.runInProcess(jobName, payload, processor);
    });
  }

  /**
   * Closes the worker, then the queue, and clears both references.
   * The queue is closed even when closing the worker fails.
   */
  async onModuleDestroy(): Promise<void> {
    // NOTE(review): the Redis clients obtained from RedisService are not closed
    // here; confirm whether BullMQ or RedisService is responsible for them.
    try {
      await this.worker?.close();
    } finally {
      try {
        await this.queue?.close();
      } finally {
        this.worker = null;
        this.queue = null;
      }
    }
  }

  /**
   * Runs a processor for the in-process fallback and logs any failure.
   * Synchronous throws and non-Error rejection values are handled as well.
   */
  private async runInProcess(
    jobName: string,
    payload: Record<string, unknown>,
    processor: JobProcessor,
  ): Promise<void> {
    try {
      await processor(payload);
    } catch (error: unknown) {
      const failure = error instanceof Error ? error : new Error(String(error));

      this.logger.error(
        {
          jobName,
          payload,
          error: failure.message,
        },
        failure.stack,
        AppQueueService.name,
      );
    }
  }

  /** Logs an 'error' event emitted by the queue or worker instance. */
  private logTransportError(
    queueName: string,
    source: 'queue' | 'worker',
    error: Error,
  ): void {
    this.logger.error(
      {
        queue: queueName,
        source,
        error: error.message,
      },
      error.stack,
      AppQueueService.name,
    );
  }

  /** Reads `queue.enabled`; defaults to `false` when unset. */
  private isQueueEnabled(): boolean {
    return this.configService.get('queue.enabled', { infer: true }) ?? false;
  }

  /** Reads `queue.name`; defaults to `'app-jobs'` when unset. */
  private getQueueName(): string {
    return this.configService.get('queue.name', { infer: true }) ?? 'app-jobs';
  }

  /**
   * Builds the default job options from configuration: 3 attempts,
   * exponential backoff starting at 1000 ms, and removal on completion,
   * each used only when the corresponding config key is unset.
   */
  private getDefaultJobOptions(): JobsOptions {
    return {
      attempts:
        this.configService.get('queue.defaultJobAttempts', { infer: true }) ?? 3,
      backoff: {
        type: 'exponential',
        delay:
          this.configService.get('queue.defaultJobBackoffMs', { infer: true }) ??
          1000,
      },
      removeOnComplete:
        this.configService.get('queue.removeOnComplete', { infer: true }) ?? true,
    };
  }
}