For a while people have looked for ways of implementing delayed messaging with RabbitMQ. So far the accepted solution was to use a mix of message TTL and Dead Letter Exchanges as proposed by James Carr here. Since a while we have thought to offer an out-of-the-box solution for this, and these past month we had the time to implement it as a plugin. Enter RabbitMQ Delayed Message Plugin.
The RabbitMQ Delayed Message Plugin adds a new exchange type to RabbitMQ where messages routed by that exchange can be delayed if the users choses to do so. Let’s see how it works.
To install the plugin go to our Community Plugins page and download the corresponding .ez files for your RabbitMQ installation. Copy the plugin into RabbitMQ’s plugin folder and then enable it by running the following command:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Once the plugin has been enabled, we are ready to start using it.
To use the Delayed Message Exchange you just need to declare an
exchange providing the
"x-delayed-message" exchange type as follows:
// ... elided code ... Map<String, Object> args = new HashMap<String, Object>(); args.put("x-delayed-type", "direct"); channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args); // ... more code ...
Later on we will explain the meaning of the special argument
"x-delayed-type" that we provided in our exchange declaration.
To delay a message a user must publish the message with the special
x-delay which takes an integer representing the number
of milliseconds the message should be delayed by RabbitMQ. It’s worth
noting that here delay means: delay message routing to queues or to
The exchange has no concept of consumers. So once the delay expired,
the plugin will attempt to route the message to the queues matching
the routing rules of the exchange and the once assigned to the
message. Be aware that if the message can’t be routed to any queue,
then it will be discarded, as is specified by AMQP with unroutable
Here’s some sample code that adds the
x-delay header to a message
and publishes to our exchange.
// ... elided code ... byte messageBodyBytes = "delayed payload".getBytes(); AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder(); headers = new HashMap<String, Object>(); headers.put("x-delay", 5000); props.headers(headers); channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);
In the previous example, the message will be delayed for five seconds before it gets routed by the plugin. That example assumes you have established a connection to RabbitMQ and obtained a channel.
When we declared the exchange above, we provided an
argument set to
direct. What that does is to tell the exchange what
kind of behaviour we want it to have when routing messages, creating
bindings, and so on. In the example, our exchange will behave like the
direct exchange, but we could pass there topic, fanout, or a custom
exchange type provided by some other plugin. By doing this we don’t
limit the user on what kind of routing behaviour the delayed message
Once we receive a message on the consumer side, how can we tell if the
message was delayed or not? The plugin will keep the
header, but will negate the passed value. So if you published a
message with a
5000 milliseconds delay, the consumer receiving said
message will find the
x-delay header set to
Written by: Alvaro Videla
Categories: New Features