
Queueing with BullMQ

I’ve been working with BullMQ for a while, so I wanted to take a closer look at the library, clarify a few things, and share my structure and boilerplate code. I hope someone finds it useful.

What and Why

For those unfamiliar, BullMQ is a Node.js library for queueing. You may know RabbitMQ or Kafka, which are similar tools. Keep in mind that BullMQ works on top of Redis.

My use case: I had an Express API and needed to process background jobs that took too long to handle within a single request-response cycle.

BullMQ classes

BullMQ has 4 base classes, described in the documentation, that provide all the functionality. Those are:

- Queue
- Worker
- QueueEvents
- FlowProducer

And I will add a 5th one myself which is essential: Jobs

Basic flow

A queue can be used for adding jobs as well as for some other basic manipulation such as pausing, cleaning, or getting data from the queue.

Jobs in BullMQ are basically user-created data structures that can be stored in the queue. Jobs are processed by workers. Workers are instances capable of processing jobs. You can have many workers, either running in the same Node.js process, in separate processes, or on different machines. They will all consume jobs from the queue and mark them as completed or failed.
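
The producer/consumer flow above can be sketched with a toy in-memory queue. This is an illustration of the concept only, not BullMQ’s API (in real BullMQ the queue lives in Redis, which is what lets workers in other processes or machines consume from it):

```typescript
// Toy sketch: a producer pushes jobs into a queue,
// a worker consumes them and marks each one completed or failed.
type ToyJob = {
  id: number;
  data: { ok: boolean };
  status: "waiting" | "completed" | "failed";
};

const toyQueue: ToyJob[] = [];

// Producer side: add a job to the queue
function addJob(id: number, data: { ok: boolean }): void {
  toyQueue.push({ id, data, status: "waiting" });
}

// Worker side: consume every waiting job
function runWorker(handler: (data: { ok: boolean }) => void): void {
  for (const job of toyQueue) {
    if (job.status !== "waiting") continue;
    try {
      handler(job.data);
      job.status = "completed";
    } catch {
      job.status = "failed";
    }
  }
}
```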

All classes emit events that inform on the lifecycles of the jobs that are running in the queue. Every class is an EventEmitter and emits different events.

import { QueueEvents } from "bullmq";

const queueEvents = new QueueEvents("Paint");

queueEvents.on("completed", ({ jobId }: { jobId: string }) => {
  // Called every time a job is completed in any worker.
});

queueEvents.on(
  "progress",
  ({ jobId, data }: { jobId: string; data: number | object }) => {
    // jobId received a progress event
  }
);

Flows are useful for parent-child relationships between jobs. The basic idea is that a parent job will not be moved to the wait state (i.e. where it could be picked up by a worker) until all of its child jobs have been processed successfully.

// FlowJob interface that FlowProducer.add() expects (JobsOptions comes from bullmq)
interface FlowJob {
  name: string;
  queueName: string;
  data?: any;
  prefix?: string;
  opts?: Omit<JobsOptions, "parent" | "repeat">;
  children?: FlowJob[];
}

import { FlowProducer } from "bullmq";

const flowProducer = new FlowProducer();

const flow = await flowProducer.add({
  name: "renovate-interior",
  queueName: "renovate",
  children: [
    { name: "paint", data: { place: "ceiling" }, queueName: "steps" },
    { name: "paint", data: { place: "walls" }, queueName: "steps" },
    { name: "fix", data: { place: "floor" }, queueName: "steps" },
  ],
});

BullMQ structure

Now, let’s dive into how I came up with a structure that works relatively well in my current Express API.

P.S. I store all BullMQ queueing stuff under src/lib/queue

Redis Connection

As we know, BullMQ works on top of Redis, so it needs a Redis connection in order to work. You can either run Redis locally or in Docker; I prefer the dockerized solution. A basic docker-compose file may look like this:

name: redis-example
services:
  redis:
    image: redis:7.4.1
    restart: always
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data
    command: [
        "redis-server",
        "--appendonly",
        "yes",
        "--maxmemory",
        "512mb",
        "--maxmemory-policy",
        "noeviction", # Important to have in order to avoid automatic removal of keys which would cause unexpected errors in BullMQ
      ]
    healthcheck:
      test: ["CMD-SHELL", "redis-cli ping"]
      interval: 10s
      timeout: 5s
      retries: 5

volumes:
  redis-data:

And for connecting in your Node.js app, I prefer the ioredis package, although you may use node-redis:

import { Redis } from "ioredis";

export const redisConnection = new Redis({
  host: process.env.REDIS_HOST,
  port: Number(process.env.REDIS_PORT), // env vars are strings; ioredis expects a number
  maxRetriesPerRequest: null, // required by BullMQ workers
});

export const rootConfig = {
  connection: redisConnection,
};

Types and Constants

An example of constants.ts and types.ts file:

// constants.ts

export const QUEUE_TYPES = {
  REPORTS: "reports",
  KPI: "kpi",
} as const;

export const JOB_TYPES = {
  DOWNLOAD_REPORT: "download-report",
  DOWNLOAD_KPI: "download-kpi",
} as const;

Here we just define our job and queue types, since we can have many queues that process different kinds of work, as well as many jobs that each do a specific task.

// types.ts

import { ConnectionOptions, DefaultJobOptions, Job } from "bullmq";
import { QUEUE_TYPES, JOB_TYPES } from "./constants";

export type QueueType = (typeof QUEUE_TYPES)[keyof typeof QUEUE_TYPES];
export type JobType = (typeof JOB_TYPES)[keyof typeof JOB_TYPES];

export type BaseConfig = {
  queueName: QueueType;
  connection: ConnectionOptions;
};

export type WorkerConfig = BaseConfig & {
  isSandboxed?: boolean;
  concurrency?: number;
  useWorkerThreads?: boolean;
};

export type QueueConfig = BaseConfig & {
  defaultJobOptions?: DefaultJobOptions;
};

export type JobProcessor = {
  handle: (job: Job) => Promise<any>;
  completed: (job: Job) => void;
  failed: (job: Job | undefined, error: Error) => void;
};

It’s just a nice structure I came up with: customizable configs, Queue and Job types, and a JobProcessor (more on that later) for TypeScript support.
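
One nice property of the `(typeof X)[keyof typeof X]` pattern is that it derives a string-literal union from the object’s values, so adding a new entry to QUEUE_TYPES automatically widens QueueType. A standalone sketch, with the constants inlined:

```typescript
// Inlined copy of the constants from constants.ts
const QUEUE_TYPES = {
  REPORTS: "reports",
  KPI: "kpi",
} as const;

// Resolves to "reports" | "kpi" — derived from the object's values
type QueueType = (typeof QUEUE_TYPES)[keyof typeof QUEUE_TYPES];

const queue: QueueType = "reports"; // OK
// const bad: QueueType = "emails"; // compile-time error: not in the union
```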

Queues

For Queues, I decided to have a QueueFactory, since queues don’t differ very much (at least in my case):

// queues/queue-factory.ts

import { Queue } from "bullmq";
import { rootConfig } from "../config";
import { QueueConfig, QueueType } from "../types";

export function createQueue(queueName: QueueType): Queue {
  const queueConfig: QueueConfig = {
    ...rootConfig,
    queueName,
    defaultJobOptions: {
      removeOnComplete: {
        age: 3600,
        count: 200,
      },
      removeOnFail: {
        age: 24 * 3600,
        count: 1000,
      },
      attempts: 3,
      backoff: {
        type: "exponential",
        delay: 1000,
      },
    },
  };

  const { queueName: name, ...config } = queueConfig;
  return new Queue(name, config);
}

Here we just extend our rootConfig, pass a queueName, and set default job options (you could also pass them as a second parameter to the function).
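
With attempts: 3 and the exponential backoff above, a failing job is retried with growing waits: per the BullMQ docs, the built-in exponential strategy computes the delay as 2^(attemptsMade - 1) * delay. A quick sketch of that formula (not BullMQ’s internal code):

```typescript
// Retry delay of the built-in "exponential" backoff strategy (per the docs)
function exponentialDelay(baseDelayMs: number, attemptsMade: number): number {
  return 2 ** (attemptsMade - 1) * baseDelayMs;
}

// With delay: 1000, the first retry waits 1s, the second 2s
console.log(exponentialDelay(1000, 1)); // 1000
console.log(exponentialDelay(1000, 2)); // 2000
```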

And for creating a queue, it’s as simple as:

// queues/reports/index.ts
import { QUEUE_TYPES } from "../../constants";
import { createQueue } from "../queue-factory";

export const ReportsQueue = createQueue(QUEUE_TYPES.REPORTS);

Jobs

Jobs are a bit more complicated. For my case, I have each job follow a strict structure:

queues
jobs
  - job_name
    - job_name.config.ts
    - job_name.processor.ts
    - job_name.slave.ts
    - job_name.worker.ts
    - index.ts
  - another_job_name
  .
  .
  index.ts
  worker-factory.ts

Let’s dive into each file, and into worker-factory.ts. Examples are for the download-report job.

config.ts

import { rootConfig } from "../../config";
import { QUEUE_TYPES } from "../../constants";
import { WorkerConfig } from "../../types";

export const downloadReportConfig: WorkerConfig = {
  ...rootConfig,
  queueName: QUEUE_TYPES.REPORTS,
  isSandboxed: true,
  concurrency: 1,
};

export type DownloadReportJobData = {
  vendorId: string;
  userId: string;
  filename: string;
  vendorName: string;
  data: any;
};

processor.ts

import { Job } from "bullmq";
import { JobProcessor } from "../../types";
import { DownloadReportJobData } from "./download-report.config";

export class DownloadReportProcessor implements JobProcessor {
  constructor() {}

  async handle(job: Job<DownloadReportJobData>) {
    try {
      // TIP: Use updateProgress to indicate job progress

      await job.updateProgress(10);
      // ...
      await job.updateProgress(80);
      // ...
      await job.updateProgress(100);
      const someData = {}; // the result of your processing
      return someData;
    } catch (error) {
      // Implement error handling, then rethrow so BullMQ marks the job as failed
      throw error;
    }
  }

  completed(job: Job) {
    // Do something when completed
    console.log("Download report completed", job.id);
  }

  async failed(job: Job<DownloadReportJobData> | undefined, error: Error) {
    // Cleanup if failed
    console.error("Download report failed", job?.id, error);
  }
}

slave.ts

The slave file is the entry point for sandboxed execution: it exports the processor’s handle method as the default export, so BullMQ can run it in a separate process.

import { DownloadReportProcessor } from "./download-report.processor";

const instance = new DownloadReportProcessor();

export default instance.handle.bind(instance);

worker.ts

import { createWorker } from "../worker-factory";
import { downloadReportConfig } from "./download-report.config";
import { DownloadReportProcessor } from "./download-report.processor";

const dirname = import.meta.dirname;
const instance = new DownloadReportProcessor();

export const DownloadReportWorker = createWorker(
  downloadReportConfig.queueName,
  downloadReportConfig,
  instance,
  `${dirname}/download-report.slave`
);

index.ts

export { DownloadReportWorker } from "./download-report.worker";
export type { DownloadReportJobData } from "./download-report.config";

And as for Worker Factory:

worker-factory.ts

import { Worker } from "bullmq";
import { QueueType, WorkerConfig, JobProcessor } from "../types";

export function createWorker(
  queueName: QueueType,
  config: WorkerConfig,
  processorInstance: JobProcessor,
  slavePath: string
): Worker {
  const { isSandboxed, ...workerConfig } = config;

  const processor = isSandboxed
    ? `${slavePath}.${process.env.NODE_ENV === "production" ? "js" : "ts"}` // when we build the app, all files are .js
    : processorInstance.handle.bind(processorInstance);

  const worker = new Worker(queueName, processor, workerConfig);

  worker.on("completed", processorInstance.completed.bind(processorInstance));
  worker.on("failed", processorInstance.failed.bind(processorInstance));

  return worker;
}

Initializing Workers

Finally, to start the workers so that they are monitoring and processing jobs, my lib.ts file looks like this:

import { Worker } from "bullmq";
import { JOB_TYPES } from "./constants";
import { DownloadKpiWorker, DownloadReportWorker } from "./jobs";

const WorkerMap = new Map<string, Worker<any, any, string>>([
  [JOB_TYPES.DOWNLOAD_REPORT, DownloadReportWorker],
  [JOB_TYPES.DOWNLOAD_KPI, DownloadKpiWorker],
]);

export function initializeJobs() {
  WorkerMap.forEach((worker, jobType) => {
    worker.on("error", (error) => {
      console.error(`Worker ${jobType} error:`, error);
    });
  });
}

export async function stopJobs() {
  return Promise.all(
    Array.from(WorkerMap.values()).map((worker) => worker.close())
  );
}

And then in the worker entrypoint file I have:

// src/worker.ts
import { redisConnection } from "./lib";
import { initializeJobs, stopJobs } from "./lib/queue";

function startWorkers() {
  initializeJobs();
  console.log("Worker service started");
}

async function stopWorkers() {
  await stopJobs();
  await redisConnection.quit();

  console.log("Worker service stopped");

  process.exit(0);
}

startWorkers();

process.on("SIGTERM", stopWorkers);
process.on("SIGINT", stopWorkers);

You can also initialize the workers when you start your Express server, like this:

import { app } from "./app";
import { initializeJobs, redisConnection, stopJobs } from "./lib";

const server = app.listen(process.env.API_PORT, () => {
  console.log(`Server is running on port ${process.env.API_PORT}`);
  initializeJobs();
});

const shutdown = async () => {
  server.close(() => {
    console.log("Server shutdown");
  });

  await stopJobs();
  await redisConnection.quit();

  process.exit(0);
};

process.on("SIGTERM", shutdown);
process.on("SIGINT", shutdown);

But for my use case I have separate Docker services running: one for the Express API, one for the workers.

Using BullMQ in Express API routes

And, for the final demonstration: how do you actually create a job for a worker to process? It’s actually pretty easy.

In your route handler, you may write something like this:

router.get("/", async (req, res, next) => {
  try {
    // generate the filename under which the report will be available once background processing finishes
    const filename = generateFilename();
    await ReportsQueue.add(JOB_TYPES.DOWNLOAD_REPORT, {
      vendorId,
      vendorName,
      userId,
      filename,
      data,
    });

    res.json({ data: { filename } });
  } catch (error) {
    next(error);
  }
});

And that’s it.

Conclusions

Working with queueing taught me a lot; I think every developer should know how to manage and work with a queueing system. Of course, BullMQ is not Kafka, but you pick the right tool for the job, and for me BullMQ satisfies my project’s requirements.

Do you notice any problems with this post?

Feel free to make edits on GitHub, as any improvement, no matter how minor, is welcome!


© 2025 Volodymyr Mokhun