AMQP 1.0 Client Libraries
This page documents the usage of AMQP 1.0 client libraries for RabbitMQ 4.0 or later.
The RabbitMQ team supports the following libraries:
This page shows application developers how to use the libraries for the most common use cases. For other information, such as licensing, downloads, dependency management, and advanced or specialized usage and configuration, see the README page in the repository of the respective library.
Overview
The RabbitMQ team maintains a set of AMQP 1.0 client libraries designed and optimized for RabbitMQ. They offer a simple and safe, yet powerful API on top of AMQP 1.0. Applications can publish and consume messages with these libraries, as well as manage the server topology in a consistent way across programming languages. The libraries also offer advanced features like automatic connection and topology recovery, and connection affinity with queues.
RabbitMQ is compatible with any AMQP-1.0-compliant client library. It is not mandatory to use the RabbitMQ AMQP 1.0 client libraries with RabbitMQ, but applications are strongly encouraged to do so for the best experience.
Safety
RabbitMQ AMQP 1.0 client libraries are safe by default: they always create durable entities and always publish persistent messages.
Guarantees
RabbitMQ AMQP 1.0 client libraries provide at-least-once guarantees.
The broker always confirms the proper handling of published messages.
Publishers achieve this by using the unsettled sender settle mode and the first receiver settle mode when they get created.
Consumers must always signal the result of message processing to the broker.
Consumers use the same settings as publishers when they get created (first receiver settle mode and unsettled sender settle mode).
Client API
This section covers how to use the RabbitMQ AMQP 1.0 client libraries to connect to a cluster, and publish and consume messages.
Connecting
Libraries provide an entry point to a node or a cluster of nodes, called the "environment". The environment creates connections. It can also hold infrastructure-related configuration settings shared between connections (e.g. pools of threads for Java). Here is how to create the environment:
- Java
- C#
import com.rabbitmq.client.amqp.*;
import com.rabbitmq.client.amqp.impl.AmqpEnvironmentBuilder;
// ...
// create the environment instance
Environment environment = new AmqpEnvironmentBuilder()
.build();
// ...
// close the environment when the application stops
environment.close();
using RabbitMQ.AMQP.Client;
using RabbitMQ.AMQP.Client.Impl;
// ...
// create the environment instance
IEnvironment environment = await AmqpEnvironment.CreateAsync(
ConnectionSettingBuilder.Create().Build());
// ...
// close the environment when the application stops
await environment.CloseAsync();
There is usually one environment instance for an application process. The application must close the environment to release its resources when it exits.
Applications open connections from the environment. They must specify appropriate settings to connect to the cluster nodes (URI, credentials).
- Java
- C#
// open a connection from the environment
Connection connection = environment.connectionBuilder()
.uri("amqp://admin:admin@localhost:5672/%2f")
.build();
// ...
// close the connection when it is no longer necessary
connection.close();
// open a connection using the environment settings
IConnection connection = await environment.CreateConnectionAsync();
// or open a connection from the environment with different settings
ConnectionSettingBuilder otherSettingBuilder = ConnectionSettingBuilder.Create()
.ContainerId("my_containerId")
.Host("localhost");
IConnection otherConnection = await environment.CreateConnectionAsync(otherSettingBuilder.Build());
// ...
// close the connection when it is no longer necessary
await connection.CloseAsync();
Libraries use the ANONYMOUS SASL authentication mechanism by default.
Connections are expected to be long-lived objects; applications should avoid connection churn.
They must be closed when they are no longer needed.
Publishing
A publisher must be created to publish messages. The target a publisher will publish messages to is usually set on creation, but it is also possible to set it on a per-message basis.
Here is how to declare a publisher with the target set at creation time:
- Java
- C#
Publisher publisher = connection.publisherBuilder()
.exchange("foo").key("bar")
.build();
// ...
// close the publisher when it is no longer necessary
publisher.close();
// The publisher can use exchange (optionally with a key) or queue to publish messages.
IPublisher publisher = await connection.PublisherBuilder().Exchange("foo").Key("bar")
.BuildAsync();
// ...
// close the publisher when it is no longer necessary
await publisher.CloseAsync();
publisher.Dispose();
In the previous example, every message published with the publisher will go to the foo exchange with the bar routing key.
RabbitMQ uses the AMQ 0.9.1 model comprising exchanges, queues, and bindings.
Messages are created from the publisher instance. They follow the AMQP 1.0 message format. It is possible to define the body (as an array of bytes), standard properties, and application properties.
When a message is published, the broker indicates how it dealt with it in an asynchronous callback.
The client application can take appropriate measures depending on the status ("outcome" in AMQP terms) the broker returned for the message (e.g. store the message in another place if the message has not been accepted).
The following snippet shows how to create a message, publish it, and deal with the response from the broker:
- Java
- C#
// create the message
Message message = publisher
.message("hello".getBytes(StandardCharsets.UTF_8))
.messageId(1L);
// publish the message and deal with broker feedback
publisher.publish(message, context -> {
// asynchronous feedback from the broker
if (context.status() == Publisher.Status.ACCEPTED) {
// the broker accepted (confirmed) the message
} else {
// deal with possible failure
}
});
// create the message
var message = new AmqpMessage("Hello");
// publish the message and deal with broker feedback
// PublishAsync completes when the broker responds; to improve throughput, start several publishes and collect them in a `List<Task<PublishResult>>`
PublishResult pr = await publisher.PublishAsync(message);
switch (pr.Outcome.State)
{
case OutcomeState.Accepted:
// the broker accepted (confirmed) the message
break;
case OutcomeState.Released:
// the broker could not route the message anywhere
break;
case OutcomeState.Rejected:
// at least one queue rejected the message
break;
}
The publisher example above sends messages to a given exchange with a given routing key, but this is not the only supported target for a publisher. Here are the supported non-null targets for a publisher:
- Java
- C#
// publish to an exchange with a routing key
Publisher publisher1 = connection.publisherBuilder()
.exchange("foo").key("bar") // /exchanges/foo/bar
.build();
// publish to an exchange without a routing key
Publisher publisher2 = connection.publisherBuilder()
.exchange("foo") // /exchanges/foo
.build();
// publish to a queue
Publisher publisher3 = connection.publisherBuilder()
.queue("some-queue") // /queues/some-queue
.build();
// publish to an exchange with a routing key
IPublisher publisher1 = await connection.PublisherBuilder()
.Exchange("foo")
.Key("bar") // /exchanges/foo/bar
.BuildAsync();
// publish to an exchange without a routing key
IPublisher publisher2 = await connection.PublisherBuilder()
.Exchange("foo") // /exchanges/foo
.BuildAsync();
// publish to a queue
IPublisher publisher3 = await connection.PublisherBuilder()
.Queue("some-queue") // /queues/some-queue
.BuildAsync();
Libraries translate the API calls into the address format v2.
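As a rough illustration of what this translation produces, the following sketch builds the three address shapes shown in the comments of the snippets above. The class AddressV2Sketch and its methods are hypothetical and not part of any library. Percent-encoding of the names is an assumption about the address format, so check the address format documentation before relying on it.

```java
import java.nio.charset.StandardCharsets;

/** Illustrative builder for address strings; hypothetical, not the libraries' own code. */
final class AddressV2Sketch {

    private AddressV2Sketch() {}

    /** Address of an exchange, with an optional routing key (null means no key). */
    static String exchange(String exchange, String routingKey) {
        String address = "/exchanges/" + percentEncode(exchange);
        return routingKey == null ? address : address + "/" + percentEncode(routingKey);
    }

    /** Address of a queue. */
    static String queue(String queue) {
        return "/queues/" + percentEncode(queue);
    }

    /** Percent-encodes every UTF-8 byte that is not an RFC 3986 unreserved character (assumption). */
    static String percentEncode(String value) {
        StringBuilder out = new StringBuilder();
        for (byte b : value.getBytes(StandardCharsets.UTF_8)) {
            int c = b & 0xFF;
            boolean unreserved = (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z')
                    || (c >= '0' && c <= '9') || c == '-' || c == '.' || c == '_' || c == '~';
            if (unreserved) {
                out.append((char) c);
            } else {
                out.append('%').append(String.format("%02X", c));
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(exchange("foo", "bar"));   // /exchanges/foo/bar
        System.out.println(exchange("foo", null));    // /exchanges/foo
        System.out.println(queue("some-queue"));      // /queues/some-queue
    }
}
```

Applications using the builder methods shown earlier do not need code like this; it only makes the resulting strings visible.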
It is also possible to define the target on a per-message basis.
The publisher must be defined without any target and each message defines its target in the to field of the properties section.
Libraries provide helpers in the message creation API to define the message target, which avoids dealing with the address format.
The following snippet shows how to create a publisher without a target and define messages with different target types:
- Java
- C#
// no target defined on publisher creation
Publisher publisher = connection.publisherBuilder()
.build();
// publish to an exchange with a routing key
Message message1 = publisher.message()
.toAddress().exchange("foo").key("bar")
.message();
// publish to an exchange without a routing key
Message message2 = publisher.message()
.toAddress().exchange("foo")
.message();
// publish to a queue
Message message3 = publisher.message()
.toAddress().queue("my-queue")
.message();
// no target defined on publisher creation
IPublisher publisher = await connection.PublisherBuilder()
.BuildAsync();
// publish to an exchange with a routing key
IMessage message1 = new AmqpMessage("Hello!").ToAddress()
.Exchange("foo")
.Key("bar")
.Build();
await publisher.PublishAsync(message1);
// publish to a queue
IMessage message2 = new AmqpMessage("Hello!").ToAddress()
.Queue("foo")
.Build();
await publisher.PublishAsync(message2);
Consuming
Consumer Creation
Creating a consumer consists of specifying the queue to consume from and the callback to process messages:
- Java
- C#
Consumer consumer = connection.consumerBuilder()
.queue("some-queue")
.messageHandler((context, message) -> {
byte[] body = message.body();
// ...
context.accept(); // settle the message
})
.build(); // do not forget to build the instance!
IConsumer consumer = await connection.ConsumerBuilder()
.Queue("some-queue")
.MessageHandler(async (context, message) =>
{
// deal with the message
await context.AcceptAsync();// settle the message
}
).BuildAndStartAsync();
Once the application is done processing a message, it must settle it. This indicates to the broker the result of the processing and what it should do with the message (e.g. deleting the message). Applications must settle messages or they will run out of credits and the broker will stop dispatching messages to them.
The next section covers the semantics of message settlement.
Message Processing Result (Outcome)
Libraries allow applications to settle messages in different ways. They use terms as explicit as possible in the context of messaging applications. Each term maps to a given outcome in the AMQP specification.
- accept: the application successfully processed the message and it can be deleted from the queue (accepted outcome)
- discard: the application cannot process the message because it is invalid; the broker can drop it or dead-letter it if dead-lettering is configured (rejected outcome)
- requeue: the application did not process the message; the broker can requeue it and deliver it to the same or a different consumer (released outcome)
discard and requeue have an optional message annotation argument to combine with the existing ones held in the message header section.
Such message annotations can be used to provide details on the discard or requeue reason.
Application-specific annotation keys must start with the x-opt- prefix whereas annotation keys the broker understands start only with x-.
Both discard and requeue use the modified outcome with the message annotation parameter.
Only quorum queues support the modification of message annotations with the modified outcome.
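The mapping described above can be summarized in a small sketch. All types here (SettlementSketch, Settlement, Outcome) are hypothetical and exist only to restate the rules in this section: accept maps to accepted, discard to rejected, requeue to released, and discard or requeue with annotations to modified. The key check reflects the x-opt- prefix rule for application-specific annotations.

```java
import java.util.Map;

/** Illustrative mapping from settlement calls to AMQP 1.0 outcomes; hypothetical types. */
final class SettlementSketch {

    enum Settlement { ACCEPT, DISCARD, REQUEUE }

    enum Outcome { ACCEPTED, REJECTED, RELEASED, MODIFIED }

    private SettlementSketch() {}

    /** Returns the outcome this section describes for a settlement and its optional annotations. */
    static Outcome outcomeFor(Settlement settlement, Map<String, Object> annotations) {
        boolean annotated = annotations != null && !annotations.isEmpty();
        switch (settlement) {
            case ACCEPT:
                return Outcome.ACCEPTED; // accept has no annotation argument in the text
            case DISCARD:
                return annotated ? Outcome.MODIFIED : Outcome.REJECTED;
            case REQUEUE:
                return annotated ? Outcome.MODIFIED : Outcome.RELEASED;
            default:
                throw new IllegalArgumentException("unknown settlement: " + settlement);
        }
    }

    /** Checks that every application-specific annotation key starts with the x-opt- prefix. */
    static boolean hasValidApplicationKeys(Map<String, Object> annotations) {
        return annotations == null
                || annotations.keySet().stream().allMatch(key -> key.startsWith("x-opt-"));
    }

    public static void main(String[] args) {
        Map<String, Object> reason = Map.of("x-opt-reason", "downstream service unavailable");
        System.out.println(outcomeFor(Settlement.REQUEUE, null));   // RELEASED
        System.out.println(outcomeFor(Settlement.REQUEUE, reason)); // MODIFIED
        System.out.println(hasValidApplicationKeys(reason));        // true
    }
}
```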
Consumer Graceful Shutdown
A consumer settles a message by accepting it, discarding it, or requeuing it.
Unsettled messages are requeued when a consumer gets closed. This can lead to duplicate processing of messages.
Here is an example:
- A consumer executes a database operation for a given message.
- The consumer gets closed before it accepts (settles) the message.
- The message is requeued.
- Another consumer gets the message and executes the database operation again.
It is difficult to completely avoid duplicate messages, which is why processing should be idempotent. The consumer API provides a way to avoid duplicate messages when a consumer gets closed. It consists of pausing the delivery of messages, waiting for the number of unsettled messages to reach 0, and then closing the consumer. This ensures the consumer has quiesced and all the received messages have been processed.
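One possible way to make processing idempotent is to remember the IDs of messages that were already processed and skip repeats. The sketch below is a minimal in-memory illustration under that assumption. The class IdempotentHandlerSketch is hypothetical, and it assumes publishers set a unique message ID. A production version would typically store the IDs durably, ideally in the same transaction as the side effect.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Wraps a processing function so that a given message ID is processed at most once. */
final class IdempotentHandlerSketch<T> {

    private final Set<Object> processedIds = ConcurrentHashMap.newKeySet();
    private final Consumer<T> delegate;

    IdempotentHandlerSketch(Consumer<T> delegate) {
        this.delegate = delegate;
    }

    /** Runs the delegate unless the ID was seen before; returns true if the delegate ran. */
    boolean handle(Object messageId, T payload) {
        if (!processedIds.add(messageId)) {
            return false; // duplicate delivery: skip the side effect
        }
        try {
            delegate.accept(payload);
            return true;
        } catch (RuntimeException e) {
            processedIds.remove(messageId); // processing failed, allow a later retry
            throw e;
        }
    }

    public static void main(String[] args) {
        IdempotentHandlerSketch<String> handler =
                new IdempotentHandlerSketch<>(body -> System.out.println("processing " + body));
        System.out.println(handler.handle(1L, "order-1")); // true, delegate runs
        System.out.println(handler.handle(1L, "order-1")); // false, duplicate skipped
    }
}
```

In a message handler, the application would still settle a skipped duplicate (for example by accepting it) so that the broker does not redeliver it.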
Here is an example of a consumer graceful shutdown:
- Java
- C#
// pause the delivery of messages
consumer.pause();
// ensure the number of unsettled messages reaches 0
long unsettledMessageCount = consumer.unsettledMessageCount();
// close the consumer
consumer.close();
// pause the delivery of messages
consumer.Pause();
// ensure the number of unsettled messages reaches 0
long unsettledMessageCount = consumer.UnsettledMessageCount();
// close the consumer
await consumer.CloseAsync();
An application can still close a consumer without pausing it, at the risk of processing the same messages several times.
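The snippets above read the unsettled message count once. In practice an application would likely poll it until it reaches 0 or a timeout expires. The following sketch shows one way to do that. The PausableConsumer interface is a hypothetical stand-in for the three consumer operations used above, and FakeConsumer only simulates in-flight messages being settled. The timeout and the 10 ms polling interval are arbitrary choices.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;

final class GracefulShutdownSketch {

    /** Hypothetical stand-in for the consumer operations used in this section. */
    interface PausableConsumer {
        void pause();
        long unsettledMessageCount();
        void close();
    }

    private GracefulShutdownSketch() {}

    /** Pauses, waits for unsettled messages to drain or the timeout to expire, then closes. */
    static boolean shutdown(PausableConsumer consumer, Duration timeout) {
        consumer.pause();
        long deadline = System.nanoTime() + timeout.toNanos();
        while (consumer.unsettledMessageCount() > 0 && deadline - System.nanoTime() > 0) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt flag and stop waiting
                break;
            }
        }
        boolean drained = consumer.unsettledMessageCount() == 0;
        consumer.close(); // anything still unsettled is requeued by the broker
        return drained;
    }

    /** Fake consumer: every poll observes one fewer in-flight message, as if handlers were finishing. */
    static final class FakeConsumer implements PausableConsumer {
        final AtomicLong unsettled;
        boolean paused;
        boolean closed;

        FakeConsumer(long inFlight) {
            this.unsettled = new AtomicLong(inFlight);
        }

        @Override public void pause() { paused = true; }

        @Override public long unsettledMessageCount() {
            return unsettled.getAndUpdate(n -> Math.max(0, n - 1));
        }

        @Override public void close() { closed = true; }
    }

    public static void main(String[] args) {
        FakeConsumer consumer = new FakeConsumer(3);
        boolean drained = shutdown(consumer, Duration.ofSeconds(2));
        System.out.println("drained=" + drained + ", closed=" + consumer.closed);
    }
}
```

The method returns false when the timeout expires first, so the caller can log that some messages may be processed again.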
Support for Streams
Libraries have out-of-the-box support for streams in consumer configuration.
It is possible to set where to attach to when consuming from a stream:
- Java
- C#
Consumer consumer = connection.consumerBuilder()
.queue("some-stream")
.stream()
.offset(ConsumerBuilder.StreamOffsetSpecification.FIRST)
.builder()
.messageHandler((context, message) -> {
// message processing
})
.build();
IConsumer consumer = await connection.ConsumerBuilder()
.Queue("some-stream")
.Stream()
.Offset(StreamOffsetSpecification.First)
.Builder()
.MessageHandler( async (context, message) => {
// message processing
})
.BuildAndStartAsync();
There is also support for stream filtering configuration:
- Java
- C#
Consumer consumer = connection.consumerBuilder()
.queue("some-stream")
.stream()
.filterValues("invoices", "orders")
.filterMatchUnfiltered(true)
.builder()
.messageHandler((context, message) -> {
// message processing
})
.build();
IConsumer consumer = await connection.ConsumerBuilder()
.Queue("some-stream")
.Stream()
.FilterValues(["invoices", "orders"])
.FilterMatchUnfiltered(true)
.Builder()
.MessageHandler(async (context, message) => {
// message processing
}
).BuildAndStartAsync();
Consider also using the native stream protocol with the stream client library for your preferred programming language when working with streams.
Topology Management
Applications can manage RabbitMQ's AMQ 0.9.1 model: declaring and deleting exchanges, queues, and bindings.
To do so, they need to get the management API from a connection:
- Java
- C#
Management management = connection.management();
// ...
// close the management instance when it is no longer needed
management.close();
IManagement management = connection.Management();
// ...
// close the management instance when it is no longer needed
await management.CloseAsync();
The management API should be closed as soon as it is no longer needed. An application usually creates the topology it needs when it starts, so the management object can be closed after this step.
Exchanges
Here is how to create an exchange of a built-in type:
- Java
- C#
management.exchange()
.name("my-exchange")
.type(Management.ExchangeType.FANOUT) // enum for built-in type
.declare();
IExchangeSpecification exchangeSpec = management
.Exchange("my-exchange")
.Type(ExchangeType.TOPIC); // enum for built-in type
await exchangeSpec.DeclareAsync();
It is also possible to specify the exchange type as a string (for non-built-in type exchanges):
- Java
- C#
management.exchange()
.name("my-exchange")
.type("x-delayed-message") // non-built-in type
.autoDelete(false)
.argument("x-delayed-type", "direct")
.declare();
await management.Exchange("my-exchange")
.Type("x-delayed-message")
.Argument("x-delayed-type", "direct")
.DeclareAsync();
Here is how to delete an exchange:
- Java
- C#
management.exchangeDeletion().delete("my-exchange");
await management.Exchange("my-exchange").DeleteAsync();
Queues
Here is how to create a queue with the default queue type:
- Java
- C#
management.queue()
.name("my-queue")
.exclusive(true)
.autoDelete(false)
.declare();
IQueueSpecification queueSpec = management
.Queue("my-queue")
.Exclusive(true)
.AutoDelete(false);
await queueSpec.DeclareAsync();
The management API supports queue arguments explicitly:
- Java
- C#
management.queue()
.name("my-queue")
.type(Management.QueueType.CLASSIC)
.messageTtl(Duration.ofMinutes(10))
.maxLengthBytes(ByteCapacity.MB(100))
.declare();
IQueueSpecification queueSpec = management
.Queue("my-queue")
.Type(QueueType.CLASSIC)
.MessageTtl(TimeSpan.FromMinutes(10))
.MaxLengthBytes(ByteCapacity.Mb(100));
await queueSpec.DeclareAsync();
The management API also distinguishes between arguments shared by all queue types and arguments valid only for a given type. Here is an example with the creation of a quorum queue:
- Java
- C#
management
.queue()
.name("my-quorum-queue")
.quorum() // set queue type to 'quorum'
.quorumInitialGroupSize(3) // specific to quorum queues
.deliveryLimit(3) // specific to quorum queues
.queue()
.declare();
IQueueSpecification queueSpec = management
.Queue("my-quorum-queue")
.Quorum() // set queue type to 'quorum'
.QuorumInitialGroupSize(3) // specific to quorum queues
.DeliveryLimit(3) // specific to quorum queues
.Queue();
await queueSpec.DeclareAsync();
It is possible to query information about a queue:
- Java
- C#
Management.QueueInfo info = management.queueInfo("my-queue");
long messageCount = info.messageCount();
int consumerCount = info.consumerCount();
String leaderNode = info.leader();
IQueueInfo queueInfo = await management.GetQueueInfoAsync("my-queue");
ulong messageCount = queueInfo.MessageCount();
uint consumerCount = queueInfo.ConsumerCount();
string leader = queueInfo.Leader();
This API can also be used to check whether a queue exists or not.
And here is how to delete a queue:
- Java
- C#
management.queueDeletion().delete("my-queue");
await management.Queue("my-queue").DeleteAsync();
Bindings
The management API supports binding a queue to an exchange:
- Java
- C#
management.binding()
.sourceExchange("my-exchange")
.destinationQueue("my-queue")
.key("foo")
.bind();
IBindingSpecification bindingSpec = management.Binding()
.SourceExchange("my-exchange")
.DestinationQueue("my-queue")
.Key("foo");
await bindingSpec.BindAsync();
There is also support for exchange-to-exchange binding:
- Java
- C#
management.binding()
.sourceExchange("my-exchange")
.destinationExchange("my-other-exchange")
.key("foo")
.bind();
IBindingSpecification bindingSpec = management.Binding()
.SourceExchange("my-exchange")
.DestinationExchange("my-other-exchange")
.Key("foo");
await bindingSpec.BindAsync();
It is also possible to unbind entities:
- Java
- C#
management.unbind()
.sourceExchange("my-exchange")
.destinationQueue("my-queue")
.key("foo")
.unbind();
IBindingSpecification bindingSpec = management.Binding()
.SourceExchange("my-exchange")
.DestinationQueue("my-queue")
.Key("foo");
await bindingSpec.UnbindAsync();
Advanced Usage
Lifecycle Listeners
Applications can react to state changes of some API components by adding listeners. For example, an application can add a listener to a connection to stop publishing messages when the connection is recovering after a connection failure. The application can then resume publishing when the connection has recovered and is open again.
Here is how to set a listener on a connection:
- Java
- C#
Connection connection = environment.connectionBuilder()
.listeners(context -> { // set one or several listeners
context.previousState(); // the previous state
context.currentState(); // the current (new) state
context.failureCause(); // the cause of the failure (in case of failure)
context.resource(); // the connection
}).build();
connection.ChangeState += (
sender, // the sender instance ( in this case the connection)
fromState, // the previous state
toState, // the current (new) state
e // the cause of the failure (in case of failure)
) =>
{
};
It is also possible to set listeners on publisher instances:
- Java
- C#
Publisher publisher = connection.publisherBuilder()
.listeners(context -> {
// ...
})
.exchange("foo").key("bar")
.build();
publisher.ChangeState += (sender, fromState, toState, e) =>
{
};
And on consumer instances as well:
- Java
- C#
Consumer consumer = connection.consumerBuilder()
.listeners(context -> {
// ...
})
.queue("my-queue")
.build();
consumer.ChangeState += (sender, fromState, toState, e) =>
{
};
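The use case mentioned at the start of this section, pausing publishing while a connection recovers, could be sketched as follows. The PublishGateSketch class and its State enum are hypothetical. The state names are an assumption and may differ from the ones the libraries expose. A connection listener like the ones above would call onStateChange with the previous and current states.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Gate that lets the application publish only while the connection is open. */
final class PublishGateSketch {

    /** Hypothetical resource states; the libraries' actual state names may differ. */
    enum State { OPENING, OPEN, RECOVERING, CLOSING, CLOSED }

    private final AtomicBoolean canPublish = new AtomicBoolean(false);

    /** Intended to be called from a lifecycle listener on every state change. */
    void onStateChange(State previous, State current) {
        canPublish.set(current == State.OPEN);
    }

    /** Runs the publish action only while the gate is open; returns whether it ran. */
    boolean publishIfOpen(Runnable publishAction) {
        if (!canPublish.get()) {
            return false; // the caller can buffer the message or retry later
        }
        publishAction.run();
        return true;
    }

    public static void main(String[] args) {
        PublishGateSketch gate = new PublishGateSketch();
        gate.onStateChange(State.OPENING, State.OPEN);
        System.out.println(gate.publishIfOpen(() -> System.out.println("published"))); // true
        gate.onStateChange(State.OPEN, State.RECOVERING);
        System.out.println(gate.publishIfOpen(() -> System.out.println("published"))); // false
    }
}
```

The check and the publish are not atomic, so a message can still be attempted just as the connection drops. The publish callback described earlier remains the place to detect such failures.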
Automatic Connection Recovery
Automatic connection recovery is activated by default: client libraries automatically restore a connection after an unexpected closing (e.g. network glitch, node restart). Automatic topology recovery is activated whenever connection recovery is: client libraries recreate AMQP entities, as well as publishers and consumers, for the recovering connection. Developers have less to worry about with network instability and node restarts, as client libraries take care of them.
The client tries to reconnect every 5 seconds until it succeeds. It is possible to change this behavior by customizing the back-off delay policy:
- Java
- C#
Connection connection = environment.connectionBuilder()
.recovery()
.backOffDelayPolicy(BackOffDelayPolicy.fixed(Duration.ofSeconds(2)))
.connectionBuilder().build();
class MyBackOffDelay : IBackOffDelayPolicy {
...
}
await AmqpConnection.CreateAsync(
ConnectionSettingBuilder.Create().RecoveryConfiguration(
RecoveryConfiguration.Create()
.BackOffDelayPolicy(new MyBackOffDelay())).Build());
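To give an idea of what a custom back-off delay policy might compute, here is a standalone sketch of a fixed policy and a capped exponential policy. The DelayPolicy interface is a hypothetical stand-in defined for this example. It is not the libraries' policy type, whose exact method signature should be checked in the library documentation. The attempt number is assumed to start at 0.

```java
import java.time.Duration;

final class BackOffSketch {

    /** Hypothetical policy shape: maps a 0-based recovery attempt to the delay before it. */
    @FunctionalInterface
    interface DelayPolicy {
        Duration delay(int attempt);
    }

    private BackOffSketch() {}

    /** Same delay before every attempt. */
    static DelayPolicy fixed(Duration delay) {
        return attempt -> delay;
    }

    /** Delay doubles with each attempt, starting at initial and never exceeding max. */
    static DelayPolicy exponential(Duration initial, Duration max) {
        long initialMs = initial.toMillis();
        long maxMs = max.toMillis();
        return attempt -> {
            int shift = Math.min(Math.max(attempt, 0), 62);
            // true when initialMs * 2^shift would exceed maxMs; also guards against overflow
            if (initialMs > (maxMs >> shift)) {
                return max;
            }
            return Duration.ofMillis(initialMs << shift);
        };
    }

    public static void main(String[] args) {
        DelayPolicy policy = exponential(Duration.ofSeconds(1), Duration.ofSeconds(30));
        for (int attempt = 0; attempt <= 6; attempt++) {
            System.out.println("attempt " + attempt + " -> " + policy.delay(attempt));
        }
    }
}
```

With an initial delay of 1 second and a cap of 30 seconds, the delays are 1, 2, 4, 8, and 16 seconds, then 30 seconds for every later attempt.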
It is also possible to deactivate topology recovery if it is not appropriate for a given application. The application would usually register a connection lifecycle listener to know when the connection is recovered and recover its own state accordingly.
- Java
- C#
Connection connection = environment.connectionBuilder()
.recovery()
.topology(false) // deactivate topology recovery
.connectionBuilder()
.listeners(context -> {
// set listener that restores application state when connection is recovered
})
.build();
await AmqpConnection.CreateAsync(
ConnectionSettingBuilder.Create().RecoveryConfiguration(
RecoveryConfiguration.Create()
.Topology(false)) // deactivate topology recovery
.Build());
It is also possible to deactivate recovery altogether:
- Java
- C#
Connection connection = environment.connectionBuilder()
.recovery()
.activated(false) // deactivate recovery
.connectionBuilder().build();
await AmqpConnection.CreateAsync(
ConnectionSettingBuilder.Create().RecoveryConfiguration(
RecoveryConfiguration.Create()
.Activated(false)) // deactivate recovery
.Build());