Prerequisite:
- Please ensure you’ve installed RabbitMQ in your local machine.
- Alternatively, you may run RabbitMQ docker container
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.9-management
- We will using RabbitMQ Javascript client amqplib. Please ensure you’ve setup a new project with this package installed.
- Alternatively, you can also clone this project to have application-ready demonstrations.
We are going to demonstrate the Fanout exchange
message queue flow as shown in the figure below
Code demonstration
We will run publisher.js
and comsumer.js
separately.
publisher.js
const amqp = require("amqplib");
async function fanoutExchange(){
try{
const rabbitmqUrl = "amqp://localhost:5672";
const connection = await amqp.connect(rabbitmqUrl);
const exchange = "transports";
const exchangeType = "fanout";
const routingKey = "";
const options = {};
const payload = {
vehicleType: "car",
numberOfPassenger: 3,
};
let channel = await connection.createChannel();
await channel.assertExchange(exchange, exchangeType, options);
channel.publish(
exchange,
routingKey,
Buffer.from(JSON.stringify(payload)),
options
);
}catch(error){
console.error(error)
}
}
fanoutExchange()
consumer.js
const amqp = require("amqplib");
async function fanoutExchangeConsumer(){
try{
const rabbitmqUrl = "amqp://localhost:5672";
const connection = await amqp.connect(rabbitmqUrl);
const exchange = "transports";
const exchangeType = "fanout";
const routingKey = "";
const options = {};
let channel = await connection.createChannel();
await channel.assertExchange(exchange, exchangeType, options);
const { queue } = await channel.assertQueue("", options);
channel.bindQueue(queue, exchange, routingKey);
channel.consume(queue, (data) => {
console.log("Received", JSON.parse(data.content.toString()));
channel.ack(data, false, true);
});
}catch(error){
console.error(error)
}
}
fanoutExchangeConsumer()
Before we start:
- To have better visualization, please run the
publisher.js
in 1 terminal andconsumer.js
in 2 terminals. You can either use the split terminal in Visual Studio Code or run the files in your machine terminals. - If you’re not using the example from the provided Git Repository. Please ensure you run the
consumer.js
in 2 terminals followed by runningpublisher.js
in 1 terminal
Result for this example
Steps to demo from Git Repository
- Open a new terminal
npm run dev
to start the server- Open 2 new terminals and run the following command:
npm run consumer
- Make a POST request to
http://localhost:3000/api/fanout/exchange/transports
with your custom payload.
Result
Explanations
-
Publisher
- Before the process started, we need to first connect to the RabbitMQ.
- The next step is to make connection with the channel and start creating our desired exchange by using
assertExchange()
method- In here we specify the following argument’s value:
- exchange name is
transports
in this example - exchange type is
fanout
- we didn’t specify the options in this example but feel free to have a look on the available options at here
- exchange name is
- In here we specify the following argument’s value:
- And now, we can start publishing our message to the exchange by using
publish()
method with the following arguments:- we can use back the exchange name we specified earlier ‘transports’
routingKey
, we could just leave it as empty as it doesn’t apply anything forfanout
exchange typepayload
, Publish message only accepts buffer payload. To achieve this, we can first stringify our payload if it’s an object and convert it into a Buffer by usingBuffer.from
options
, we didn’t specify the options in this example but feel free to have a look at the available options at here
-
Comsumer
- Before the process started, we need to first connect to the RabbitMQ.
- The next step is to make connection with the channel and start creating our desired exchange by using
assertExchange()
method- In here we specify the following argument’s value:
- exchange name is
transports
in this example - exchange type is
fanout
- we didn’t specify the options in this example but feel free to have a look at the available options at here
- exchange name is
- In here we specify the following argument’s value:
- And now we can start to create our queue and bind with the exchange we created earlier.
- As you can observe from the code, we are putting it as an empty string when creating the queue
channel.assertQueue("",options)
. This is because we do not need to bother with the queue name when binding the exchange and queue. By putting an empty string for the queue name, it will return us a unique and random queue name such asamq.gen-JzTY20BRgKO-HjmUJj0wLg
- There are some useful options we can take note of when creating the queue:
durable
: if true, the queue still stays alive even if we restart the broker.exclusive
: if true, the queue will be deleted when connection is closed.expires
: specify the time in a millisecond to delete the queue when no consumer is connecting to the queue.- checkout other options here
- We could just leave the
routing key
an empty for the binding between exchange and queue because it does not apply anything for thefanout
exchange. - Check out the direct and topic exchange for the use case of
routing key
- As you can observe from the code, we are putting it as an empty string when creating the queue
- After the binding process is finished, we can now consume the message whenever there is an incoming message from the broker by using
.consume()
method - When we received the message, we can now acknowledge the message by using
ack()
method so that the broker will know we successfully retrieve the message.