Prerequisite:
- Please ensure you’ve installed RabbitMQ in your local machine.
- Alternatively, you may run RabbitMQ docker container
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.9-management
- We will using RabbitMQ Javascript client amqplib. Please ensure you’ve setup a new project with this package installed.
- Alternatively, you can also clone this project to have application-ready demonstrations.
Code demonstration
We will run publisher.js
and comsumer.js
separately.
publisher.js
const amqp = require("amqplib");
async function topicExchangePublisher() {
try {
const rabbitmqUrl = "amqp://localhost:3031";
const connection = await amqp.connect(rabbitmqUrl);
const exchange = "animals";
const exchangeType = "topic";
const routingKey = "animal.mammal.tiger";
const options = {};
const payload = {
numberOfAnimal: 10,
};
let channel = await connection.createChannel();
await channel.assertExchange(exchange, exchangeType, options);
channel.publish(
exchange,
routingKey,
Buffer.from(JSON.stringify(payload)),
options
);
} catch (error) {
console.error(error);
}
}
topicExchangePublisher();
consumer.js
const amqp = require("amqplib");
async function topicExchangeConsumer() {
try {
const rabbitmqUrl = "amqp://localhost:3031";
const connection = await amqp.connect(rabbitmqUrl);
const exchange = "animals";
const exchangeType = "topic";
const routingKey = process.argv[2];
const options = {};
let channel = await connection.createChannel();
await channel.assertExchange(exchange, exchangeType, options);
const { queue } = await channel.assertQueue("", options);
channel.bindQueue(queue, exchange, routingKey);
channel.consume(queue, (data) => {
console.log("Received", JSON.parse(data.content.toString()));
channel.ack(data, false, true);
});
} catch (error) {
console.error(error);
}
}
topicExchangeConsumer();
Before we start:
- To have better visualization, please run the
publisher.js
in 1 terminal andconsumer.js
in 2 terminals. You can either use the split terminal in Visual Studio Code or run the files in your machine terminals. - If you’re not using the example from the provided Git Repository. Please ensure you run the
consumer.js
in 2 terminals followed by runningpublisher.js
in 1 terminal as shown in below:node consumer.js animal.#
node consumer.js "animal.*.tiger"
node producder
Result for this example
Steps to demo from Git Repository
-
Open a new terminal
-
npm run dev
to start the server -
Open 2 new terminals and run the following command:
npm run consumer
{ "payload":{ "numberOfAnimal": 10 }, "properties":{ "routing_key":"animal.mammal.tiger" } }
-
Make a POST request to
http://localhost:3000/api/topic/exchange/animals
with the payload above. -
Note : If you wanted to add your new custom data such as
routing_key
,exchange name
and etc, please ensure you have read the instructions in ReadMe
Result
Explanations
-
Publisher
- Before process started, we need to first connect to the RabbitMQ.
- The next step is to make a connection with the channel and start creating our desired exchange by using
assertExchange()
method- In here we specify the following argument’s value:
- exchange name is
animals
in this example - exchange type is
topic
- we didn’t specify the options in this example but feel free to have a look at the available options at here
- exchange name is
- In here we specify the following argument’s value:
- And now, we can start publishing our message to the exchange by using
publish()
method with the following arguments:- we can use back the exchange name we specified earlier ‘animals’
routingKey
in this case played an important property as in the consumer binding queue process, the broker would need to refer to this value and bind to the corresponding queue.payload
, Publish message only accepts buffer payload. To achieve this, we can first stringify our payload if it’s an object and convert it into a Buffer by usingBuffer.from
options
, we didn’t specify the options in this example but feel free to have a look at the available options at here
-
Comsumer
- Before the process is started, we need to first connect to the RabbitMQ.
- The next step is to make a connection with the channel and start creating our desired exchange by using
assertExchange()
method- In here we specify the following argument’s value:
- exchange name is
animals
in this example - exchange type is
topic
- we didn’t specify the options in this example but feel free to have a look at the available options at here
- exchange name is
- In here we specify the following argument’s value:
- And now we can start to creating our queue and bind with the exchange we created earlier.
- As you can observe from the code, we are putting it as an empty string when creating the queue
channel.assertQueue("",options)
. This is because we do not need to bother with the queue name when binding the exchange and queue. By putting an empty string for the queue name, it will return us a unique and random queue name such asamq.gen-JzTY20BRgKO-HjmUJj0wLg
- There are some useful options we can take note of when creating the queue :
durable
: if true, the queue still stays alive even if we restart the broker.exclusive
: if true, queue will be deleted when the connection is closed.expires
: specify the time in a millisecond to delete the queue when there is no consumer connecting to the queue.- checkout other options here
routingKey
property is very important fortopic
exchange as it will refer to the message properties sent from the publisher and bind the exchange with its queue if theirroutingKey
condition is matched.
- As you can observe from the code, we are putting it as an empty string when creating the queue
- There are few important rule we need to adhere for
topic
exchange:- Topic exchange routing key must be a list of words, delimited by dots
*
(star) can substitute for exactly one word.#
(hash) can substitute for zero or more words.
- From example above, the
routingKey
used by the publisher is known asanimal.mammal.tiger
. We will be able to receive the message if the routing key for consumer isanimal.*.tiger
oranimal.#
, and not receiving any message if the routing key for consumer isanimal.insect.*
. This is because:animal.*.tiger
: The*
(star) could be replaced by any kind of string. Even if the publisher provideanimal.huge.tiger
and it will still works.animal.#
: This will works because as long as the routing key from publisher is containsanimal
as the first character in the words.animal.insect.*
: This will not work because the characterinsect
does not fulfilled the condition foranimal.mammal.tiger
unless theroutingKey
used by the publisher is updated toanimal.insect.tiger
.
- After the binding process is finished, we can now consume the message whenever there is an incoming message from the broker by using
.consume()
method - When we received the message, we can now acknowledge the message by using
ack()
method so that the broker will know we successfully retrieve the message.