
How often have you needed to decouple processes using a queue, only to find that you didn’t have one due to limitations such as resource constraints or the inability to install additional software?
In this series, I’ll show you how to solve this problem with different libraries and tools, even when you do not have a real queue. These solutions are versatile and can be applied to a variety of scenarios.
Before getting started, I want to reassure you: these approaches are not just for when you cannot have a real queue due to limitations. They are also flexible enough to be used even when you can have one, without any problem.
By exploring the examples in this series, you’ll be able to evaluate which approach is best for your case.
Fastq
The first post is about fastq, an npm package for Node.js built by Matteo Collina.
Since it is a Node.js package, this solution only works in a Node.js environment.
Using this package, you can create one or more in-memory queues.
This means that when the process is killed, you will lose all the data in the queue, and you will need a strategy to recreate the queue on the application’s startup.
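One way to approach that startup strategy is to keep the pending tasks in some durable storage and push them again when the application boots. The sketch below is only an illustration under that assumption: PendingTaskStore, restoreQueue, and the push parameter (any function that enqueues a task and returns a promise, like the push method of a promise-based queue) are hypothetical names, not part of fastq.

```typescript
// Hypothetical storage contract: the place where pending tasks survive a restart
// (a database table, a file, and so on).
interface PendingTaskStore<T> {
  loadPending(): Promise<T[]>;
  markDone(task: T): Promise<void>;
}

// Any function that enqueues a task and settles when the task is done.
type Push<T> = (task: T) => Promise<unknown>;

// Hypothetical startup routine: re-enqueue what was still pending
// when the previous process stopped.
async function restoreQueue<T>(store: PendingTaskStore<T>, push: Push<T>): Promise<number> {
  const pending = await store.loadPending();
  await Promise.allSettled(
    pending.map(async (task) => {
      await push(task);
      // Only tasks that completed are removed from the pending set;
      // failed ones stay there for a later attempt.
      await store.markDone(task);
    }),
  );
  // Number of tasks that were re-enqueued.
  return pending.length;
}
```

How the store is implemented is up to you; the only point of the sketch is that the queue content has to live somewhere outside the process memory.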
Getting started with fastq
To get started with fastq, you must install it in your Node.js project.
I have already created a simple project for you to follow along with the post; you can find it here.
So, after cloning the repository and running npm install, the first step is to run the command npm install fastq, which adds the dependency to your project.
With that, you are ready to use fastq in your project.
As you can see, this package’s setup is smooth and easy to do.
My first queue
Now, it’s time to create your first queue.
Fastq has a simple API. You must create a function that acts as the worker for your queue. Its first argument is the item to handle, and its second argument depends on how you want to signal the end of the item’s execution. With the callback approach, the second argument is the done function to call at the end of the worker: on success, the callback must be invoked with null as the first parameter; otherwise, you must pass the error as the first parameter. If you prefer promises instead, you can skip this argument and signal a failure by throwing the error, as you would in any async function.
You’ll follow the promise approach for this post, but feel free to write the code using callbacks if you prefer.
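For comparison, a callback-style worker could look like the sketch below. The Done type, the userInsertCallbackHandler name, and the shouldFail flag are made up for this illustration; the flag replaces any randomness so the outcome is predictable. With fastq, a worker of this shape would be passed to fastq(worker, concurrency) rather than to fastq.promise.

```typescript
// Shape of an item in the queue.
interface UserCreatedTask {
  id: string;
  // Hypothetical flag, only here to force a failure in this sketch.
  shouldFail?: boolean;
}

// Hypothetical local type for the callback that the worker receives.
type Done = (error: Error | null, result?: string) => void;

// Callback-style worker: it signals completion by calling `done`.
function userInsertCallbackHandler(task: UserCreatedTask, done: Done): void {
  // Simulate an asynchronous call to an external service.
  setTimeout(() => {
    if (task.shouldFail) {
      // Failure: pass the error as the first parameter.
      done(new Error(`User created task got error with id: ${task.id}`));
      return;
    }
    // Success: pass null as the first parameter.
    done(null, task.id);
  }, 10);
}

// Calling the worker directly, just to show the contract.
userInsertCallbackHandler({ id: 'user-1' }, (error, result) => {
  console.log(error ? `failed: ${error.message}` : `completed: ${result}`);
});
```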
Let’s create a new file, src/queue.ts, containing our queue worker.
To simplify this post, this queue will emulate a user’s sign-up in a real system, and its responsibility will be to send a fake email asking the user to confirm their email address. You won’t dive into all these details; instead, you’ll fake some services to get an accurate idea of what you can do with fastq.
So now it’s time to build your first queue! In the queue.ts file, let’s start with the interface definition of the task.
interface UserCreatedTask {
  id: string;
}
This interface describes what your item in the queue looks like.
Now, you need to use it to handle the process in the queue, so first, you need to import some dependencies.
import type { queueAsPromised } from 'fastq';
import * as fastq from 'fastq';
import logger from 'logger.js';
import { setTimeout } from 'node:timers/promises';
The first two imports are used for the queue; the logger prints some messages to the terminal during the execution, and setTimeout fakes the execution time of an external service (in this case, a fake SMTP server).
With all these dependencies in place, you can now create your worker.
async function userInsertHandler(arg: UserCreatedTask) {
  logger.info(arg, 'User created task received');
  // Fake the outcome: roughly 20% of the tasks succeed, the others fail.
  const fakeImplementation = Math.random() > 0.8 ? 'success' : 'error';
  const timeout = fakeImplementation === 'success' ? 2000 : 1000;
  // Simulate the time spent calling the external service.
  await setTimeout(timeout);
  if (fakeImplementation === 'error') {
    throw new Error(`User created task got error with id: ${arg.id}`);
  }
}
As you can see, this function is simple, but it gives you all the ideas required to handle a queue with fastq.
First, the argument is of type UserCreatedTask, which means that you expect the item to have that shape.
Then, you fake a success or error result using a simple Math.random(). If the result is a success, the execution waits for 2 seconds and then resolves the promise; otherwise, it waits for 1 second and throws an error.
Pretty easy.
Now you need to associate this worker with a real fastq queue; to do that, add this code.
const queue: queueAsPromised<UserCreatedTask> = fastq.promise(userInsertHandler, 1);
export default queue;
Using the fastq.promise method, you create your queue and associate your userInsertHandler worker with it. Last but not least, the second parameter is the number of concurrent workers you want to have; in this case, there is just one worker for this queue, but you can increase it if you prefer.
The last line exports the queue from this module, so that other modules can push items into it.
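To get a feel for what the concurrency parameter controls, here is a tiny hand-written limiter. It is not how fastq is implemented; createLimitedQueue and everything inside it are names invented for this sketch. It only shows the idea that at most N workers run at the same time while the remaining items wait for their turn.

```typescript
type Worker<T, R> = (item: T) => Promise<R>;

// Hypothetical helper: runs `worker` on pushed items, at most `concurrency` at a time.
function createLimitedQueue<T, R>(worker: Worker<T, R>, concurrency: number) {
  // Items that have been pushed but not started yet.
  const waiting: Array<() => void> = [];
  let running = 0;

  // Start the next waiting item if a slot is free.
  const next = () => {
    if (running >= concurrency) return;
    const start = waiting.shift();
    if (start) start();
  };

  return {
    push(item: T): Promise<R> {
      return new Promise<R>((resolve, reject) => {
        waiting.push(() => {
          running++;
          worker(item)
            .then(resolve, reject)
            .finally(() => {
              // Free the slot and let the next item start.
              running--;
              next();
            });
        });
        next();
      });
    },
  };
}

// Usage: with a concurrency of 1 the items are processed one after another.
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));
const demoQueue = createLimitedQueue(async (id: string) => {
  await sleep(10);
  return id.toUpperCase();
}, 1);

Promise.all(['a', 'b', 'c'].map((id) => demoQueue.push(id))).then((results) => {
  console.log(results);
});
```

Raising the second argument lets more items be in flight at once, which helps when the worker mostly waits on I/O, such as the fake SMTP call in this post.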

Pushing data inside the queue
The last puzzle piece is about pushing data inside the queue.
To do that, you need to import the queue in the module that uses it and invoke the push method.
So, move to the src/index.ts file and import the queue.
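// The logger and randomUUID are also needed by the code in this file.
import logger from 'logger.js';
import { randomUUID } from 'node:crypto';
import userInsertQueue from './queue.js';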
Now, you’ll create a run function that generates some random user IDs and pushes them inside the queue.
async function run() {
  const usersToHandle = new Array(5).fill(undefined).map(() => randomUUID());
  for (const id of usersToHandle) {
    const task = { id };
    userInsertQueue.push(task)
      .then(() => {
        logger.info(task, `Task with id ${task.id} has been completed`);
      })
      .catch((error) => {
        logger.error(task, `Task with id ${task.id} has failed`, error);
      });
    logger.info(task, `Task with id ${task.id} has been pushed`);
  }
}
run();
As you can see, the push method returns a promise, so you can handle its result to know when the task is resolved or rejected.
Please remember this because it will be helpful when discussing the retry strategy.
Okay, let’s return to the example. In this run function, you create a list of user IDs using the randomUUID method. Then, looping over the list, you push each element inside the queue. Using the then and catch methods, you log the outcome of the execution for that ID.
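Because push returns a promise, you could also collect all the promises and wait for the whole batch to settle. The sketch below assumes a hypothetical pushAll helper that accepts any push function with the same shape as the queue’s push method; it relies only on the standard Promise.allSettled.

```typescript
interface UserCreatedTask {
  id: string;
}

// Any function that enqueues a task and settles when the task is done.
type Push = (task: UserCreatedTask) => Promise<unknown>;

// Hypothetical helper: pushes every task and reports how the batch went.
async function pushAll(push: Push, tasks: UserCreatedTask[]) {
  // allSettled never rejects: it waits for every task, failed or not.
  const results = await Promise.allSettled(tasks.map((task) => push(task)));
  const failedIds = tasks
    .filter((_, index) => results[index].status === 'rejected')
    .map((task) => task.id);
  return { completed: tasks.length - failedIds.length, failedIds };
}
```

In the project from this post, it could be called as pushAll((task) => userInsertQueue.push(task), tasks), and the returned failedIds would tell you which users still need attention.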
Now it’s time to check the result, so move to the terminal and run the command npm run start.
You should see something like this in your terminal:
[14:35:13.067] INFO (59175): User created task received
id: "a966759c-f385-4651-a0c1-55742ceb4017"
[14:35:13.067] INFO (59175): Task with id a966759c-f385-4651-a0c1-55742ceb4017 has been pushed
id: "a966759c-f385-4651-a0c1-55742ceb4017"
[14:35:13.067] INFO (59175): Task with id 24d9d735-2399-471e-b64c-55ee0c769daa has been pushed
id: "24d9d735-2399-471e-b64c-55ee0c769daa"
[14:35:13.067] INFO (59175): Task with id 726b4bc5-794d-4581-a96c-e41a474c446d has been pushed
id: "726b4bc5-794d-4581-a96c-e41a474c446d"
[14:35:13.067] INFO (59175): Task with id 1a6dd1b4-2123-410b-a2ad-f420a14d53e9 has been pushed
id: "1a6dd1b4-2123-410b-a2ad-f420a14d53e9"
[14:35:13.067] INFO (59175): Task with id 6a8681e4-bba4-42e0-b484-976de8be9bcb has been pushed
id: "6a8681e4-bba4-42e0-b484-976de8be9bcb"
[14:35:14.069] INFO (59175): User created task received
id: "24d9d735-2399-471e-b64c-55ee0c769daa"
[14:35:14.070] ERROR (59175): Task with id a966759c-f385-4651-a0c1-55742ceb4017 has failed
id: "a966759c-f385-4651-a0c1-55742ceb4017"
[14:35:15.070] INFO (59175): User created task received
id: "726b4bc5-794d-4581-a96c-e41a474c446d"
[14:35:15.071] ERROR (59175): Task with id 24d9d735-2399-471e-b64c-55ee0c769daa has failed
id: "24d9d735-2399-471e-b64c-55ee0c769daa"
[14:35:16.071] INFO (59175): User created task received
id: "1a6dd1b4-2123-410b-a2ad-f420a14d53e9"
[14:35:16.071] ERROR (59175): Task with id 726b4bc5-794d-4581-a96c-e41a474c446d has failed
id: "726b4bc5-794d-4581-a96c-e41a474c446d"
[14:35:17.072] INFO (59175): User created task received
id: "6a8681e4-bba4-42e0-b484-976de8be9bcb"
[14:35:17.073] ERROR (59175): Task with id 1a6dd1b4-2123-410b-a2ad-f420a14d53e9 has failed
id: "1a6dd1b4-2123-410b-a2ad-f420a14d53e9"
[14:35:19.074] INFO (59175): Task with id 6a8681e4-bba4-42e0-b484-976de8be9bcb has been completed
id: "6a8681e4-bba4-42e0-b484-976de8be9bcb"
Looking closely at the result, you can notice that some tasks have failed and others have completed. This shows that fastq does not provide any retry strategy out of the box: the failed tasks remain failed at the end of the execution.
So, how can you build your retry strategy?
In this example, a map is used to save each task’s status and its number of retries.
Let’s see the implementation:
import logger from 'logger.js';
import { randomUUID } from 'node:crypto';
import userInsertQueue from './queue.js';

type Status = 'not-handle' | 'error' | 'success';

async function run() {
  const usersToHandle = new Array(5).fill(undefined).map(() => randomUUID());
  // Tracks the status and the number of retries for each user ID.
  const userProcessStatus: Record<string, { status: Status, retryValue: number }> = {};
  for (const id of usersToHandle) {
    userProcessStatus[id] = {
      status: 'not-handle',
      retryValue: 0
    };
    const task = { id };
    const pushMessage = () => userInsertQueue.push(task)
      .then(() => {
        logger.info(`Task with id ${task.id} has been processed`);
        userProcessStatus[id].status = 'success';
      })
      .catch(err => {
        const state = userProcessStatus[id];
        logger.error(err, `Task with id ${task.id} got error after retry ${state.retryValue}`);
        state.status = 'error';
        // Push the same task again, up to three retries.
        if (state.retryValue < 3) {
          state.retryValue++;
          pushMessage();
        }
      });
    pushMessage();
    logger.info(task, `Task with id ${task.id} has been pushed`);
  }
  // Recap the final status of every task right before the process exits.
  process.on('exit', () => {
    for (const [id, { status, retryValue }] of Object.entries(userProcessStatus)) {
      logger.info(`Id ${id} is ${status} after ${retryValue} retries`);
    }
  });
}
run();
In this approach, the userProcessStatus map uses the user IDs as keys, and each value is an object with two fields: the status of the task and the number of retries for that ID.
Then, pushMessage is the function that keeps this map up to date. It pushes the element into the queue and waits for the result, so that it can handle the retry in case of failure. If the task succeeds, the element is marked as success in the map. If it fails, the status is set to error and, if the retry number is less than three, the retry number is increased and pushMessage is invoked again to retry that specific element.
I also added a hook before exiting the process to recap the tasks’ results just for debugging.
Now, you can run npm run start again and look at the result. You will notice that the program tries to handle the failed tasks before ending.
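The same idea can be packed into a small reusable helper. This is one possible variation, not something fastq provides: pushWithRetry, its parameters, and the growing delay between attempts are assumptions made for this sketch. The push parameter is any function that enqueues a task and returns a promise.

```typescript
// Any function that enqueues a task and settles when the task is done.
type Push<T> = (task: T) => Promise<unknown>;

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Hypothetical helper: re-pushes a failed task up to `maxRetries` times.
async function pushWithRetry<T>(
  push: Push<T>,
  task: T,
  maxRetries = 3,
  delayMs = 100,
): Promise<{ status: 'success' | 'error'; retries: number }> {
  for (let retries = 0; ; retries++) {
    try {
      await push(task);
      return { status: 'success', retries };
    } catch {
      // Give up once the retry budget is spent.
      if (retries >= maxRetries) {
        return { status: 'error', retries };
      }
      // Wait a little longer after each failure before pushing again.
      await sleep(delayMs * (retries + 1));
    }
  }
}
```

With the queue from this post, the call would be pushWithRetry((task) => userInsertQueue.push(task), { id }). The delay is optional, but it avoids hammering an external service that is temporarily down.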
Conclusion
It’s time to wrap up this blog post.
In this article, you learnt:
- how to create your worker
- how to create a queue with fastq and associate this queue with your worker
- how to push items in the queue
- how to build a retry strategy for your item in the queue
As you have seen, this is a good solution in some scenarios and is easy to use, but it has some complications you must handle. First, if you need a retry strategy, you must implement it yourself; second, this solution is entirely in memory, so you must persist the status of your tasks somewhere; otherwise, if the process dies, you’ll lose the status of your system.
That’s it from this blog post; I hope you’ve enjoyed it, and see you soon for the next chapter.
You can find the result of this post at this link.