Published
Queueing with BullMQ

I’ve been working with BullMQ for a bit, so I wanted to take a closer look at the library and provide some clarifications, share my structure and boilerplate code. I hope someone finds it useful.
What and Why
For those unfamilliar, BullMQ is a Node.js library for queueing. You may know RabbitMQ or Kafka, which are similar tools. Keep in mind that BullMQ works on top of Redis.
My usecase was I had an Express API and needed to process background jobs which were too long to be done in a single Request-Response model.
BullMQ classes
BullMQ has 4 base classes from the documentation that provide all the functionallity. Those are:
And I will add from myself a 5th class which is essential: Jobs
Basic flow
A queue and can be used for adding jobs to the queue as well as some other basic manipulation such as pausing, cleaning or getting data from the queue.
Jobs in BullMQ are basically a user created data structure that can be stored in the queue. Jobs are processed by workers. Workers are instances capable of processing jobs. You can have many workers, either running in the same Node.js process, or in separate processes as well as in different machines. They will all consume jobs from the queue and mark the jobs as completed or failed.
All classes emit events that inform on the lifecycles of the jobs that are running in the queue. Every class is an EventEmitter and emits different events.
import { QueueEvents } from "bullmq";
const queueEvents = new QueueEvents("Paint");
queueEvents.on("completed", ({ jobId: string }) => {
// Called every time a job is completed in any worker.
});
queueEvents.on(
"progress",
({ jobId, data }: { jobId: string; data: number | object }) => {
// jobId received a progress event
}
);
Flows are useful for parent - child relationships between jobs. The basic idea is that a parent job will not be moved to the wait status (i.e. where it could be picked up by a worker) until all its children jobs have been processed successfully.
- Add Flows using FlowProducer class
// FlowJob interface that FlowProducer.add() expects
interface FlowJob {
name: string;
queueName: string;
data?: any;
prefix?: string;
opts?: Omit<JobsOptions, "parent" | "repeat">;
children?: FlowJob[];
}
import { FlowProducer } from "bullmq";
const flowProducer = new FlowProducer();
const flow = await flowProducer.add({
name: "renovate-interior",
queueName: "renovate",
children: [
{ name: "paint", data: { place: "ceiling" }, queueName: "steps" },
{ name: "paint", data: { place: "walls" }, queueName: "steps" },
{ name: "fix", data: { place: "floor" }, queueName: "steps" },
],
});
BullMQ structure
Now, let’s dive more how I came up with a structure that works relatively well in my current Express API.
P.S. I store all BullMQ queueing stuff under src/lib/queue
Redis Connection
As we know, BullMQ works on top of Redis, so it needs a Redis connection in order to work. You can either have a local running Redis, or in a Docker. I prefer dockerized solution. Basic docker-compose file may look like this:
name: redis-example
services:
redis:
image: redis:7.4.1
restart: always
ports:
- "6379:6379"
volumes:
- redis-data:/data
command: [
"redis-server",
"--appendonly",
"yes",
"--maxmemory",
"512mb",
"--maxmemory-policy",
"noeviction", # Important to have in order to avoid automatic removal of keys which would cause unexpected errors in BullMQ
]
healthcheck:
test: ["CMD-SHELL", "redis-cli ping"]
interval: 10s
timeout: 5s
retries: 5
And for connecting in your Node.js app, I prefer ioredis package, although you may use node-redis
import { Redis } from "ioredis";
export const redisConnection = new Redis({
host: process.env.REDIS_HOST,
port: process.env.REDIS_PORT,
maxRetriesPerRequest: null,
});
export const rootConfig = {
connection: redisConnection,
};
Types and Constants
An example of constants.ts
and types.ts
file:
// constants.ts
export const QUEUE_TYPES = {
REPORTS: "reports",
KPI: "kpi",
} as const;
export const JOB_TYPES = {
DOWNLOAD_REPORT: "download-report",
DOWNLOAD_KPI: "download-kpi",
} as const;
Here we just define our job and queue types, since we can have many queues that process different stuff, as well as many jobs that do a specific task.
// types.ts
import { ConnectionOptions, DefaultJobOptions, Job } from "bullmq";
import { QUEUE_TYPES, JOB_TYPES } from "./constants";
export type QueueType = (typeof QUEUE_TYPES)[keyof typeof QUEUE_TYPES];
export type JobType = (typeof JOB_TYPES)[keyof typeof JOB_TYPES];
export type BaseConfig = {
queueName: QueueType;
connection: ConnectionOptions;
};
export type WorkerConfig = BaseConfig & {
isSandboxed?: boolean;
concurrency?: number;
useWorkerThreads?: boolean;
};
export type QueueConfig = BaseConfig & {
defaultJobOptions?: DefaultJobOptions;
};
export type JobProcessor = {
handle: (job: Job) => Promise<any>;
completed: (job: Job) => void;
failed: (job: Job | undefined, error: Error) => void;
};
It’s just a nice structure I figured, to have customizeable configs, Queue and Job types and JobProcessor (will talk later) for handling typescript.
Queues
For Queues, I decided to have a QueueFactory since queues don’t differ very much (at least in my case)
// queues/queue-factory.ts
import { Queue } from "bullmq";
import { rootConfig } from "../config";
import { QueueConfig, QueueType } from "../types";
export function createQueue(queueName: QueueType): Queue {
const queueConfig: QueueConfig = {
...rootConfig,
queueName,
defaultJobOptions: {
removeOnComplete: {
age: 3600,
count: 200,
},
removeOnFail: {
age: 24 * 3600,
count: 1000,
},
attempts: 3,
backoff: {
type: "exponential",
delay: 1000,
},
},
};
const { queueName: name, ...config } = queueConfig;
return new Queue(name, config);
}
Here we just extend our rootConfig, pass a queueName
and give default job options (you could also pass it as a second param to a function)
And for creating a queue, it’s as simple as:
// queues/reports/index.ts
import { QUEUE_TYPES } from "../../constants";
import { createQueue } from "../queue-factory";
export const ReportsQueue = createQueue(QUEUE_TYPES.REPORTS);
Jobs
Jobs are a bit more complicated. For my case, I have each job follow a strict structure:
queues
jobs
- job_name
- job_name.config.ts
- job_name.processor.ts
- job_name.slave.ts
- job_name.worker.ts
- index.ts
- another_job_name
.
.
index.ts
worker-factory.ts
Let’s dive more into each file and in worker-factory.ts
. Examples are for download-report job.
config.ts
import { rootConfig } from "../../config";
import { QUEUE_TYPES } from "../../constants";
import { WorkerConfig } from "../../types";
export const downloadReportConfig: WorkerConfig = {
...rootConfig,
queueName: QUEUE_TYPES.REPORTS,
isSandboxed: true,
concurrency: 1,
};
export type DownloadReportJobData = {
vendorId: string;
userId: string;
filename: string;
vendorName: string;
data: any;
};
processor.ts
import { Job } from "bullmq";
import { JobProcessor } from "../../types";
import { DownloadReportJobData } from "./download-report.config";
export class DownloadReportProcessor implements JobProcessor {
constructor() {}
async handle(job: Job<DownloadReportJobData>) {
try {
// TIP: Use updateProgress to indicate job progress
await job.updateProgress(10);
// ...
await job.updateProgress(80);
// ...
await job.updateProgress(100);
return someData;
} catch (error) {
// Implement error handling
}
}
completed(job: Job) {
// Do something when completed
console.log("Download report completed", job.id);
}
async failed(job: Job<DownloadReportJobData> | undefined, error: Error) {
// Cleanup if failed
console.error("Download report failed", job?.id, error);
}
}
slave.ts
import { DownloadReportProcessor } from "./download-report.processor";
const instance = new DownloadReportProcessor();
export default instance.handle.bind(instance);
worker.ts
import { createWorker } from "../worker-factory";
import { downloadReportConfig } from "./download-report.config";
import { DownloadReportProcessor } from "./download-report.processor";
const dirname = import.meta.dirname;
const instance = new DownloadReportProcessor();
export const DownloadReportWorker = createWorker(
downloadReportConfig.queueName,
downloadReportConfig,
instance,
`${dirname}/download-report.slave`
);
index.ts
export { DownloadReportWorker } from "./download-report.worker";
export type { DownloadReportJobData } from "./download-report.config";
And as for Worker Factory:
worker-factory.ts
import { Worker } from "bullmq";
import { QueueType, WorkerConfig, JobProcessor } from "../types";
export function createWorker(
queueName: QueueType,
config: WorkerConfig,
processorInstance: JobProcessor,
slavePath: string
): Worker {
const { isSandboxed, ...workerConfig } = config;
const processor = isSandboxed
? `${slavePath}.${env.NODE_ENV === "production" ? "js" : "ts"}` // when we build an app, all files are .js
: processorInstance.handle.bind(processorInstance);
const worker = new Worker(queueName, processor, workerConfig);
worker.on("completed", processorInstance.completed.bind(processorInstance));
worker.on("failed", processorInstance.failed.bind(processorInstance));
return worker;
}
Initializing Workers
Finally, to start workers so that they are monitoring jobs and processing them, my lib.ts
file looks like this:
import { Worker } from "bullmq";
import { JOB_TYPES } from "./constants";
import { DownloadKpiWorker, DownloadReportWorker } from "./jobs";
const WorkerMap = new Map<string, Worker<any, any, string>>([
[JOB_TYPES.DOWNLOAD_REPORT, DownloadReportWorker],
[JOB_TYPES.DOWNLOAD_KPI, DownloadKpiWorker],
]);
export function initializeJobs() {
WorkerMap.forEach((worker, jobType) => {
worker.on("error", (error) => {
console.error(`Worker ${jobType} error:`, error);
});
});
}
export async function stopJobs() {
return Promise.all(
Array.from(WorkerMap.values()).map((worker) => worker.close())
);
}
And then in the worker entrypoint file I have:
// src/worker.ts
import { redisConnection } from "./lib";
import { initializeJobs, stopJobs } from "./lib/queue";
function startWorkers() {
initializeJobs();
console.log("Worker service started");
}
async function stopWorkers() {
await stopJobs();
await redisConnection.quit();
console.log("Worker service stopped");
process.exit(0);
}
startWorkers();
process.on("SIGTERM", stopWorkers);
process.on("SIGINT", stopWorkers);
You can also have it when you start your express server like this:
import { app } from "./app";
import { initializeJobs, redisConnection, stopJobs } from "./lib";
const server = app.listen(process.env.API_PORT, () => {
console.log(`Server is running on port ${process.env.API_PORT}`);
initializeJobs();
});
const shutdown = async () => {
server.close(() => {
console.log("Server shutdown");
});
await stopJobs();
await redisConnection.quit();
process.exit(0);
};
process.on("SIGTERM", shutdown);
process.on("SIGINT", shutdown);
But for my usecase I have separate docker services running: 1 for express API, 1 for workers
Using BullMQ in Express API routes
And, for the final demonstration: how do you actually create a job for worker to process? It’s actually pretty easy.
In your route handler, you may write something like this:
router.get("/", (req, res, next) => {
try {
// generate a filename that will be generated when background processing is finished
const filename = generateFilename();
await ReportsQueue.add(JOB_TYPES.DOWNLOAD_REPORT, {
vendorId,
vendorName,
userId,
filename,
data,
});
res.json({ data: { filename } });
} catch (error) {
next(error);
}
});
And that’s it.
Conclusions
Working with queueing taught me a lot, I think every developer should know how to manage and work with a queueing system. Of course, BullMQ is not Kafka, but you pick a right tool for a job, and for me BullMQ satisfies my project requirements.