AMQP 1.0 Client Libraries

This page documents the usage of AMQP 1.0 client libraries for RabbitMQ 4.0 or later.

The RabbitMQ team supports the following libraries:

Application developers will find here how to use the libraries for the most common use cases. For other information like licensing, downloading, dependency management, advanced and specific usage and configuration, please see the README page in the repository of the respective libraries.

Overview

The RabbitMQ team maintains a set of AMQP 1.0 client libraries designed and optimized for RabbitMQ. They offer a simple and safe, yet powerful API on top of AMQP 1.0. Applications can publish and consume messages with these libraries, as well as manage the server topology in a consistent way across programming languages. The libraries also offer advanced features like automatic connection and topology recovery, and connection affinity with queues.

note

RabbitMQ is compatible with any AMQP-1.0-compliant client library. It is not mandatory to use the RabbitMQ AMQP 1.0 client libraries with RabbitMQ, but applications are strongly encouraged to do so for the best experience.

Safety

RabbitMQ AMQP 1.0 client libraries are safe by default: they always create durable entities and always publish persistent messages.

Guarantees

RabbitMQ AMQP 1.0 client libraries provide at-least-once guarantees.

The broker always confirms the proper handling of published messages. Publishers achieve this by using the unsettled sender settle mode and the first receiver settle mode when they get created.

Consumers must always signal the result of message processing to the broker. Consumers use the same settings as publishers when they get created (first receiver settle mode and unsettled sender settle mode).

Client API

This section covers how to use the RabbitMQ AMQP 1.0 client libraries to connect to a cluster, and publish and consume messages.

Connecting

Libraries provide an entry point to a node or a cluster of nodes, called the "environment". The environment allows creating connections. It can contain infrastructure-related configuration settings shared between connections (e.g. pools of threads for Java). Here is how to create the environment:

Creating the environment
import com.rabbitmq.client.amqp.*;
import com.rabbitmq.client.amqp.impl.AmqpEnvironmentBuilder;

// ...

// create the environment instance
Environment environment = new AmqpEnvironmentBuilder()
    .build();
// ...
// close the environment when the application stops
environment.close();

There is usually one environment instance for an application process. The application must close the environment to release its resources when it exits.

Applications open connections from the environment. They must specify appropriate settings to connect to the cluster nodes (URI, credentials).

Opening a connection
// open a connection from the environment
Connection connection = environment.connectionBuilder()
    .uri("amqp://admin:admin@localhost:5672/%2f")
    .build();
// ...
// close the connection when it is no longer necessary
connection.close();

Libraries use the ANONYMOUS SASL authentication mechanism by default. Connections are expected to be long-lived objects, so applications should avoid connection churn. Connections must be closed when they are no longer needed.
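The URI in the example above packs the node address, the credentials, and the virtual host into a single string. The following sketch uses only the JDK to show how these parts can be read. The `AmqpUriParts` class is hypothetical and is not part of the client library, whose own parsing may differ. Note that `%2f` is the percent-encoded form of `/`, the name of the default virtual host.

```java
import java.net.URI;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only, not part of the client library.
final class AmqpUriParts {
    final String host;
    final int port; // -1 when the URI does not specify a port
    final String username;
    final String password;
    final String virtualHost;

    private AmqpUriParts(String host, int port, String username,
                         String password, String virtualHost) {
        this.host = host;
        this.port = port;
        this.username = username;
        this.password = password;
        this.virtualHost = virtualHost;
    }

    static AmqpUriParts parse(String uri) {
        URI parsed = URI.create(uri);
        String username = null;
        String password = null;
        String userInfo = parsed.getRawUserInfo(); // e.g. "admin:admin", null if absent
        if (userInfo != null) {
            String[] parts = userInfo.split(":", 2);
            username = decode(parts[0]);
            password = parts.length > 1 ? decode(parts[1]) : null;
        }
        // the path without its leading slash is the percent-encoded virtual host
        String rawPath = parsed.getRawPath();
        String virtualHost = (rawPath == null || rawPath.length() <= 1)
            ? null
            : decode(rawPath.substring(1));
        return new AmqpUriParts(parsed.getHost(), parsed.getPort(),
            username, password, virtualHost);
    }

    private static String decode(String value) {
        // URLDecoder treats '+' as a space, so protect literal '+' characters first
        return URLDecoder.decode(value.replace("+", "%2B"), StandardCharsets.UTF_8);
    }
}
```

Calling `AmqpUriParts.parse("amqp://admin:admin@localhost:5672/%2f")` yields the host `localhost`, the port `5672`, the `admin` username and password, and the virtual host `/`.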

Publishing

A publisher must be created to publish messages. The target a publisher will publish messages to is usually set on creation, but it is also possible to set it on a per-message basis.

Here is how to declare a publisher with the target set at creation time:

Creating a publisher
Publisher publisher = connection.publisherBuilder()
    .exchange("foo").key("bar")
    .build();
// ...
// close the publisher when it is no longer necessary
publisher.close();

In the previous example, every message published with the publisher will go to the foo exchange with the bar routing key.

info

RabbitMQ uses the AMQ 0.9.1 model comprising exchanges, queues, and bindings.

Messages are created from the publisher instance. They follow the AMQP 1.0 message format. It is possible to define the body (as an array of bytes), standard properties, and application properties.

When a message is published, the broker indicates how it dealt with it in an asynchronous callback. The client application takes appropriate measures depending on the status ("outcome" in AMQP terms) the broker returned for the message (e.g. store the message in another place if the message has not been accepted).

The following snippet shows how to create a message, publish it, and deal with the response from the broker:

Publishing a message
import java.nio.charset.StandardCharsets;

// ...

// create the message
Message message = publisher
    .message("hello".getBytes(StandardCharsets.UTF_8))
    .messageId(1L);

// publish the message and deal with broker feedback
publisher.publish(message, context -> {
    // asynchronous feedback from the broker
    if (context.status() == Publisher.Status.ACCEPTED) {
        // the broker accepted (confirmed) the message
    } else {
        // deal with possible failure
    }
});
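One possible way to "take appropriate measures" is to keep track of messages until the broker reports an outcome for them. The sketch below is library-independent: `OutcomeStatus` and `OutstandingMessages` are hypothetical names, and the status values are an assumption standing in for whatever the publish callback reports. An application would call `beforePublish` before publishing and `onOutcome` from the asynchronous callback.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the status reported in the publish callback.
enum OutcomeStatus { ACCEPTED, REJECTED, RELEASED }

// Hypothetical tracker: remembers messages until the broker reports an outcome.
final class OutstandingMessages {
    private final Map<Long, byte[]> outstanding = new ConcurrentHashMap<>();
    private final Map<Long, byte[]> failed = new ConcurrentHashMap<>();

    // call before publishing the message
    void beforePublish(long messageId, byte[] body) {
        outstanding.put(messageId, body);
    }

    // call from the asynchronous publish callback
    void onOutcome(long messageId, OutcomeStatus status) {
        byte[] body = outstanding.remove(messageId);
        if (body == null) {
            return; // unknown or already handled message
        }
        if (status != OutcomeStatus.ACCEPTED) {
            // keep the message so the application can store or republish it
            failed.put(messageId, body);
        }
    }

    int outstandingCount() {
        return outstanding.size();
    }

    int failedCount() {
        return failed.size();
    }
}
```

The tracker is thread-safe because callbacks may run on a different thread than the one that publishes. What to do with failed messages (republish, persist elsewhere, alert) remains an application decision.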

The publisher example above sends messages to a given exchange with a given routing key, but this is not the only supported target for a publisher. Here are the supported non-null targets for a publisher:

Creating publishers with different targets
// publish to an exchange with a routing key
Publisher publisher1 = connection.publisherBuilder()
    .exchange("foo").key("bar") // /exchanges/foo/bar
    .build();

// publish to an exchange without a routing key
Publisher publisher2 = connection.publisherBuilder()
    .exchange("foo") // /exchanges/foo
    .build();

// publish to a queue
Publisher publisher3 = connection.publisherBuilder()
    .queue("some-queue") // /queues/some-queue
    .build();

info

Libraries translate the API calls into the address format v2.
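The comments in the snippet above show the addresses the builder calls translate to. As a rough illustration, the sketch below builds the same three address shapes with the JDK only. `AddressV2` is a hypothetical helper, not a library class, and the percent-encoding of names is an assumption about the address format that should be checked against the address format documentation.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only, not part of the client library.
final class AddressV2 {

    private AddressV2() { }

    // exchange with a routing key: /exchanges/<exchange>/<key>
    static String exchange(String exchange, String key) {
        return "/exchanges/" + encode(exchange) + "/" + encode(key);
    }

    // exchange without a routing key: /exchanges/<exchange>
    static String exchange(String exchange) {
        return "/exchanges/" + encode(exchange);
    }

    // queue: /queues/<queue>
    static String queue(String queue) {
        return "/queues/" + encode(queue);
    }

    private static String encode(String name) {
        // URLEncoder produces form encoding ('+' for a space), so turn it into "%20";
        // a literal '+' in the input is already encoded as "%2B" at this point
        return URLEncoder.encode(name, StandardCharsets.UTF_8).replace("+", "%20");
    }
}
```

With these assumptions, `AddressV2.exchange("foo", "bar")` returns `/exchanges/foo/bar` and `AddressV2.queue("some-queue")` returns `/queues/some-queue`, matching the comments in the snippet above.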

It is also possible to define the target on a per-message basis. The publisher must be defined without any target and each message defines its target in the to field of the properties section. Libraries provide helpers in the message creation API to define the message target, which avoids dealing with the address format.

The following snippet shows how to create a publisher without a target and define messages with different target types:

Setting the target in messages
// no target defined on publisher creation
Publisher publisher = connection.publisherBuilder()
    .build();

// publish to an exchange with a routing key
Message message1 = publisher.message()
    .toAddress().exchange("foo").key("bar")
    .message();

// publish to an exchange without a routing key
Message message2 = publisher.message()
    .toAddress().exchange("foo")
    .message();

// publish to a queue
Message message3 = publisher.message()
    .toAddress().queue("my-queue")
    .message();

Consuming

Consumer Creation

Creating a consumer consists of specifying the queue to consume from and the callback to process messages:

Creating a consumer
Consumer consumer = connection.consumerBuilder()
    .queue("some-queue")
    .messageHandler((context, message) -> {
        byte[] body = message.body();
        // ...
        context.accept(); // settle the message
    })
    .build(); // do not forget to build the instance!

Once the application is done processing a message, it must settle it. This indicates to the broker the result of the processing and what it should do with the message (e.g. deleting the message). Applications must settle messages or they will run out of credits and the broker will stop dispatching messages to them.

The next section covers the semantics of message settlement.

Message Processing Result (Outcome)

Libraries allow applications to settle messages in different ways. They use terms as explicit as possible in the context of messaging applications. Each term maps to a given outcome in the AMQP specification.

  • accept: the application successfully processed the message and it can be deleted from the queue (accepted outcome)
  • discard: the application cannot process the message because it is invalid, the broker can drop it or dead-letter it if it is configured (rejected outcome)
  • requeue: the application did not process the message, the broker can requeue it and deliver it to the same or a different consumer (released outcome)

discard and requeue have an optional message annotation argument to combine with the existing ones held in the message header section. Such message annotations can be used to provide details on the discard or requeue reason. Application-specific annotation keys must start with the x-opt- prefix whereas annotation keys the broker understands start only with x-. Both discard and requeue use the modified outcome with the message annotation parameter.

Only quorum queues support the modification of message annotations with the modified outcome.
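The mapping between settlement terms and AMQP outcomes described in this section can be summarized in a few lines of code. The sketch below is only a restatement of the rules above: `Settlement` and `Outcomes` are hypothetical names, and the outcomes are returned as plain strings rather than as library types.

```java
import java.util.Map;

// Hypothetical names for the three settlement terms described above.
enum Settlement { ACCEPT, DISCARD, REQUEUE }

// Hypothetical helper restating the term-to-outcome mapping.
final class Outcomes {

    private Outcomes() { }

    // returns the name of the AMQP outcome used for a settlement
    static String outcomeFor(Settlement settlement, Map<String, Object> annotations) {
        boolean annotated = annotations != null && !annotations.isEmpty();
        switch (settlement) {
            case ACCEPT:
                return "accepted"; // accept takes no annotations
            case DISCARD:
                return annotated ? "modified" : "rejected";
            case REQUEUE:
                return annotated ? "modified" : "released";
            default:
                throw new IllegalArgumentException("Unknown settlement: " + settlement);
        }
    }

    // application-specific annotation keys must start with "x-opt-"
    static boolean isApplicationAnnotationKey(String key) {
        return key != null && key.startsWith("x-opt-");
    }
}
```

For instance, `Outcomes.outcomeFor(Settlement.REQUEUE, Map.of("x-opt-reason", "busy"))` returns `modified`, whereas the same call with an empty map returns `released`. The `x-opt-reason` key is an arbitrary example.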

Consumer Graceful Shutdown

A consumer settles a message by accepting it, discarding it, or requeuing it.

Unsettled messages are requeued when a consumer gets closed. This can lead to duplicate processing of messages.

Here is an example:

  • A consumer executes a database operation for a given message.
  • The consumer gets closed before it accepts (settles) the message.
  • The message is requeued.
  • Another consumer gets the message and executes the database operation again.

It is difficult to completely avoid duplicate messages, which is why processing should be idempotent. The consumer API provides a way to avoid duplicate messages when a consumer gets closed: pause the delivery of messages, check the number of unsettled messages until it reaches 0, and then close the consumer. This ensures the consumer has quiesced and all the received messages have been processed.
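As an illustration of idempotent processing, the sketch below skips an operation when a message with the same identifier has already been processed. `IdempotentProcessor` is a hypothetical class, and it assumes messages carry a unique ID (such as the message ID set at publishing time). It keeps IDs in memory, which is a simplification: a real application would typically record them in the same durable store as the operation itself.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory deduplication helper, for illustration only.
final class IdempotentProcessor {
    private final Set<Object> processedIds = ConcurrentHashMap.newKeySet();

    // Runs the operation unless the message ID was already seen.
    // Returns true if the operation ran, false for a duplicate.
    boolean processOnce(Object messageId, Runnable operation) {
        if (!processedIds.add(messageId)) {
            return false; // duplicate delivery, skip the operation
        }
        try {
            operation.run();
            return true;
        } catch (RuntimeException e) {
            // the operation failed, allow a later redelivery to retry it
            processedIds.remove(messageId);
            throw e;
        }
    }
}
```

A message handler could wrap its database operation in `processOnce` and accept the message in both cases, since a duplicate has by definition already been handled.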

Here is an example of a consumer graceful shutdown:

Closing a consumer gracefully
// pause the delivery of messages
consumer.pause();
// ensure the number of unsettled messages reaches 0
long unsettledMessageCount = consumer.unsettledMessageCount();
// close the consumer
consumer.close();

An application can still close a consumer without pausing it, at the risk of processing the same messages several times.
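The snippet above reads the number of unsettled messages once. An application usually needs to wait until this number reaches 0. The sketch below shows one way to do so with a polling loop and a timeout. `PausableConsumer` is a hypothetical interface that only mirrors the three methods used in the snippet above, and the 10-millisecond polling interval is an arbitrary choice.

```java
import java.time.Duration;

// Hypothetical interface mirroring the consumer methods used above.
interface PausableConsumer {
    void pause();
    long unsettledMessageCount();
    void close();
}

final class GracefulShutdown {

    private GracefulShutdown() { }

    // Pauses the consumer, waits for unsettled messages to drain, then closes it.
    // Returns true if no unsettled message was left when the consumer got closed.
    static boolean closeGracefully(PausableConsumer consumer, Duration timeout) {
        consumer.pause();
        long deadline = System.nanoTime() + timeout.toNanos();
        while (consumer.unsettledMessageCount() > 0
                && System.nanoTime() - deadline < 0) {
            try {
                Thread.sleep(10); // arbitrary polling interval
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                break;
            }
        }
        boolean quiesced = consumer.unsettledMessageCount() == 0;
        consumer.close(); // close in any case, possibly with unsettled messages
        return quiesced;
    }
}
```

The timeout avoids blocking forever if a message handler never settles its message. In that case the consumer is closed anyway and the remaining messages are requeued.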

Support for Streams

Libraries have out-of-the-box support for streams in consumer configuration.

It is possible to set where to attach to when consuming from a stream:

Attaching to the beginning of a stream
Consumer consumer = connection.consumerBuilder()
    .queue("some-stream")
    .stream()
        .offset(ConsumerBuilder.StreamOffsetSpecification.FIRST)
    .builder()
    .messageHandler((context, message) -> {
        // message processing
    })
    .build();

There is also support for stream filtering configuration:

Configuring stream filtering
Consumer consumer = connection.consumerBuilder()
    .queue("some-stream")
    .stream()
        .filterValues("invoices", "orders")
        .filterMatchUnfiltered(true)
    .builder()
    .messageHandler((context, message) -> {
        // message processing
    })
    .build();

Consider also using the native stream protocol with the stream client library for your preferred programming language when working with streams.

Topology Management

Applications can manage RabbitMQ's AMQ 0.9.1 model: declaring and deleting exchanges, queues, and bindings.

To do so, they need to get the management API from a connection:

Getting the management object from a connection
Management management = connection.management();
// ...
// close the management instance when it is no longer needed
management.close();

The management API should be closed as soon as it is no longer needed. An application usually creates the topology it needs when it starts, so the management object can be closed after this step.

Exchanges

Here is how to create an exchange of a built-in type:

Creating an exchange of a built-in type
management.exchange()
    .name("my-exchange")
    .type(Management.ExchangeType.FANOUT) // enum for built-in type
    .declare();

It is also possible to specify the exchange type as a string (for non-built-in type exchanges):

Creating an exchange of a non-built-in type
management.exchange()
    .name("my-exchange")
    .type("x-delayed-message") // non-built-in type
    .autoDelete(false)
    .argument("x-delayed-type", "direct")
    .declare();

Here is how to delete an exchange:

Deleting an exchange
management.exchangeDeletion().delete("my-exchange");

Queues

Here is how to create a queue with the default queue type:

Creating a classic queue
management.queue()
    .name("my-queue")
    .exclusive(true)
    .autoDelete(false)
    .declare();

The management API supports queue arguments explicitly:

Creating a queue with arguments
management.queue()
    .name("my-queue")
    .type(Management.QueueType.CLASSIC)
    .messageTtl(Duration.ofMinutes(10))
    .maxLengthBytes(ByteCapacity.MB(100))
    .declare();
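Explicit methods like the ones above spare applications from remembering the names and units of queue arguments. The sketch below shows the kind of argument map such calls could correspond to. `QueueArguments` is a hypothetical class, and the translation is an assumption about what happens behind the API; the keys used (`x-queue-type`, `x-message-ttl` in milliseconds, `x-max-length-bytes`) are standard RabbitMQ queue arguments.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical builder for illustration only, not part of the client library.
final class QueueArguments {
    private final Map<String, Object> arguments = new LinkedHashMap<>();

    QueueArguments type(String queueType) {
        arguments.put("x-queue-type", queueType);
        return this;
    }

    QueueArguments messageTtl(Duration ttl) {
        // RabbitMQ expects the TTL in milliseconds
        arguments.put("x-message-ttl", ttl.toMillis());
        return this;
    }

    QueueArguments maxLengthBytes(long bytes) {
        arguments.put("x-max-length-bytes", bytes);
        return this;
    }

    Map<String, Object> build() {
        return Collections.unmodifiableMap(new LinkedHashMap<>(arguments));
    }
}
```

For example, `new QueueArguments().type("classic").messageTtl(Duration.ofMinutes(10)).build()` contains `x-queue-type` set to `classic` and `x-message-ttl` set to `600000`.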

The management API also distinguishes between arguments shared by all queue types and arguments valid only for a given type. Here is an example with the creation of a quorum queue:

Creating a quorum queue
management
    .queue()
    .name("my-quorum-queue")
    .quorum() // set queue type to 'quorum'
        .quorumInitialGroupSize(3) // specific to quorum queues
        .deliveryLimit(3) // specific to quorum queues
    .queue()
    .declare();

It is possible to query information about a queue:

Getting queue information
Management.QueueInfo info = management.queueInfo("my-queue");
long messageCount = info.messageCount();
int consumerCount = info.consumerCount();
String leaderNode = info.leader();

This API can also be used to check whether a queue exists or not.

And here is how to delete a queue:

Deleting a queue
management.queueDeletion().delete("my-queue");

Bindings

The management API supports binding a queue to an exchange:

Binding a queue to an exchange
management.binding()
    .sourceExchange("my-exchange")
    .destinationQueue("my-queue")
    .key("foo")
    .bind();

There is also support for exchange-to-exchange binding:

Binding an exchange to another exchange
management.binding()
    .sourceExchange("my-exchange")
    .destinationExchange("my-other-exchange")
    .key("foo")
    .bind();

It is also possible to unbind entities:

Deleting the binding between an exchange and a queue
management.unbind()
    .sourceExchange("my-exchange")
    .destinationQueue("my-queue")
    .key("foo")
    .unbind();

Advanced Usage

Lifecycle Listeners

Applications can react to state changes of some API components by adding listeners. For example, an application can add a listener to a connection to stop publishing messages while the connection is recovering after a connection failure. The application can then resume publishing when the connection has recovered and is open again.

Here is how to set a listener on a connection:

Setting a listener on a connection
Connection connection = environment.connectionBuilder()
    .listeners(context -> { // set one or several listeners
        context.previousState(); // the previous state
        context.currentState(); // the current (new) state
        context.failureCause(); // the cause of the failure (in case of failure)
        context.resource(); // the connection
    }).build();
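To illustrate the use case mentioned above (stopping publishing while the connection recovers), here is a minimal sketch of a gate that a listener could update. `ResourceState` and `PublishingGate` are hypothetical: the state names are an assumption standing in for the states the library reports, and a listener would pass the previous and current states from its context to `onStateChange`.

```java
// Hypothetical stand-in for the states reported to lifecycle listeners.
enum ResourceState { OPENING, OPEN, RECOVERING, CLOSING, CLOSED }

// Hypothetical gate the publishing code checks before sending messages.
final class PublishingGate {
    // volatile: written by the listener thread, read by publishing threads
    private volatile boolean open = false;

    // to be called from a connection lifecycle listener
    void onStateChange(ResourceState previous, ResourceState current) {
        // publishing is allowed only while the connection is open
        open = current == ResourceState.OPEN;
    }

    boolean canPublish() {
        return open;
    }
}
```

The publishing code would check `canPublish()` before each publish call and buffer or delay messages while it returns `false`.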

It is also possible to set listeners on publisher instances:

Setting a listener on a publisher
Publisher publisher = connection.publisherBuilder()
    .listeners(context -> {
        // ...
    })
    .exchange("foo").key("bar")
    .build();

And on consumer instances as well:

Setting a listener on a consumer
Consumer consumer = connection.consumerBuilder()
    .listeners(context -> {
        // ...
    })
    .queue("my-queue")
    .build();

Automatic Connection Recovery

Automatic connection recovery is activated by default: client libraries will automatically restore a connection after an unexpected closing (e.g. network glitch, node restart, etc). Automatic topology recovery is also activated as soon as connection recovery is: client libraries will recreate AMQP entities, as well as publishers and consumers for the recovering connection. Developers have less to worry about network stability and node restart, as client libraries will take care of it.

The client tries to reconnect every 5 seconds until it succeeds. It is possible to change this behavior by customizing the back-off delay policy:

Setting a back-off policy for connection recovery
Connection connection = environment.connectionBuilder()
    .recovery()
        .backOffDelayPolicy(BackOffDelayPolicy.fixed(Duration.ofSeconds(2)))
    .connectionBuilder().build();
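A back-off delay policy decides how long to wait before each reconnection attempt. The sketch below shows the idea with a fixed policy, like the one configured above, and a capped exponential policy as a possible alternative. `DelayPolicy` is a hypothetical interface written for this illustration, with attempts counted from 0. It is not the library's `BackOffDelayPolicy`, whose available factory methods should be checked in the library documentation.

```java
import java.time.Duration;

// Hypothetical policy abstraction for illustration only.
interface DelayPolicy {

    // delay to wait before the given attempt (attempts start at 0)
    Duration delay(int attempt);

    // same delay for every attempt
    static DelayPolicy fixed(Duration delay) {
        return attempt -> delay;
    }

    // delay doubles at each attempt, up to a maximum
    static DelayPolicy exponential(Duration initial, Duration max) {
        return attempt -> {
            // clamp the exponent to keep the multiplication factor in range
            int exponent = Math.max(0, Math.min(attempt, 30));
            Duration candidate = initial.multipliedBy(1L << exponent);
            return candidate.compareTo(max) > 0 ? max : candidate;
        };
    }
}
```

With `DelayPolicy.exponential(Duration.ofSeconds(1), Duration.ofSeconds(30))`, the first attempts wait 1, 2, 4, and 8 seconds, and later attempts are capped at 30 seconds. A fixed policy is simpler to reason about, while an exponential one reduces pressure on a node that takes a long time to come back.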

It is also possible to deactivate topology recovery if it is not appropriate for a given application. The application would usually register a connection lifecycle listener to know when the connection is recovered and recover its own state accordingly.

Deactivating topology recovery
Connection connection = environment.connectionBuilder()
    .recovery()
        .topology(false) // deactivate topology recovery
    .connectionBuilder()
    .listeners(context -> {
        // set listener that restores application state when connection is recovered
    })
    .build();

It is also possible to deactivate recovery altogether:

Deactivating recovery
Connection connection = environment.connectionBuilder()
    .recovery()
        .activated(false) // deactivate recovery
    .connectionBuilder().build();