Streams and Super Streams (Partitioned Streams)
What is a Stream
A RabbitMQ stream is a persistent, replicated data structure that serves the same purpose as a queue: it buffers messages from producers so that consumers can read them. However, streams differ from queues in two important ways: how messages are stored and how they are consumed.
Streams model an append-only log of messages that can be repeatedly read until they expire. Streams are always persistent and replicated. A more technical description of this stream behavior is “non-destructive consumer semantics”.
To read messages from a stream in RabbitMQ, one or more consumers subscribe to it and read the same messages as many times as they want.
Data in a stream can be used via a RabbitMQ client library or through a dedicated binary protocol plugin and associated client(s). The latter option is highly recommended as it provides access to all stream-specific features and offers best possible throughput (performance).
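As an illustration of the dedicated binary protocol option, the following sketch creates a stream and publishes a message with the RabbitMQ Stream Java client. It assumes a local broker with the stream plugin enabled and the `rabbitmq-stream-client` library on the classpath; the stream name is an arbitrary example.

```java
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.Producer;

// Sketch only: connects to a local broker with default credentials.
try (Environment environment = Environment.builder().build()) {
    // Create the stream if it does not exist yet (the call is idempotent)
    environment.streamCreator().stream("my-stream").create();

    Producer producer = environment.producerBuilder()
            .stream("my-stream")
            .build();
    producer.send(
            producer.messageBuilder().addData("hello".getBytes()).build(),
            confirmationStatus -> { /* handle the broker confirmation */ });
    producer.close();
}
```

The `Environment` manages connections and locates stream replicas; the `Producer` batches and confirms writes, which is part of why the dedicated protocol achieves higher throughput than AMQP 0.9.1.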
In addition to streams, RabbitMQ supports partitioned streams called super streams. They are covered in more detail further in this guide.
Now, you might be asking the following questions:
- Do streams replace queues then?
- Should I move away from using queues?
The answer is no: streams were not introduced to replace queues but to complement them. Streams open up many opportunities for new RabbitMQ use cases, which are described in Use Cases for Using Streams.
The following information details stream usage, and the administration and maintenance operations for streams.
You should also review the stream plugin information to learn more about the usage of streams with the binary RabbitMQ Stream protocol and the stream core and stream plugin comparison page for the feature matrix.
Use Cases for Using Streams
Streams were initially developed to cover four messaging use cases that existing queue types either cannot address or can address only with downsides:
- Large fan-outs: when the same message must be delivered to multiple subscribers, users currently have to bind a dedicated queue for each consumer. If the number of consumers is large, this becomes potentially inefficient, especially when persistence and/or replication is desired. Streams allow any number of consumers to consume the same messages from the same stream in a non-destructive manner, removing the need to bind multiple queues. Stream consumers can also read from replicas, allowing read load to be spread across the cluster.
- Replay (time-travelling): all other RabbitMQ queue types have destructive consume behaviour, i.e. messages are deleted from the queue once a consumer is finished with them, so it is not possible to re-read messages that have already been consumed. Streams allow consumers to attach at any point in the log and read from there.
- Throughput performance: no persistent queue type can deliver throughput that competes with existing log-based messaging systems. Streams have been designed with performance as a major goal.
- Large backlogs: most RabbitMQ queues are designed to converge towards an empty state, are optimised as such, and can perform worse when there are millions of messages in a given queue. Streams are designed to store larger amounts of data in an efficient manner with minimal in-memory overhead.
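The replay use case maps to the `x-stream-offset` consumer argument of the AMQP 0.9.1 client: a consumer can attach at the start of the log (`"first"`), at the end (`"next"`), at a numeric offset, or at a timestamp. The following is a minimal sketch with the Java client; the stream name and prefetch value are illustrative, and note that stream consumers must use manual acknowledgements and set a prefetch.

```java
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.Collections;

// Sketch only: `channel` is an open channel on a connection to the broker.
channel.basicQos(100); // a prefetch value is required for stream consumers
channel.basicConsume(
        "my-stream",
        false, // manual acknowledgements are required for streams
        Collections.singletonMap("x-stream-offset", "first"), // replay from the start of the log
        new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                // process the message, then acknowledge it
                getChannel().basicAck(envelope.getDeliveryTag(), false);
            }
        });
```

Because consuming is non-destructive, the same consumer (or another one) can later re-attach with a different `x-stream-offset` value and read the same messages again.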
How to Use RabbitMQ Streams
An AMQP 0.9.1 client library that can specify optional queue and consumer arguments can use streams as regular AMQP 0.9.1 queues.
Just like queues, streams have to be declared first.
Declaring a RabbitMQ Stream
To declare a stream, set the x-queue-type queue argument to stream (the default is classic). This argument must be provided by a client at declaration time; it cannot be set or changed using a policy. This is because a policy definition or the applicable policy can be changed dynamically, but the queue type cannot: it must be specified at the time of declaration.
The following snippet shows how to create a stream with the AMQP 0.9.1 Java client:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.Collections;

ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(
    "my-stream",
    true,         // durable
    false, false, // not exclusive, not auto-delete
    Collections.singletonMap("x-queue-type", "stream")
);
Declaring a queue with the x-queue-type argument set to stream creates a stream with a replica on each configured RabbitMQ node. Streams are quorum systems, so an uneven (odd) number of cluster nodes is strongly recommended.
A stream remains an AMQP 0.9.1 queue, so it can be bound to any exchange after its creation, just as any other RabbitMQ queue.
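For example, a stream can be bound to an exchange with the same AMQP 0.9.1 calls used for any queue. In the sketch below, `channel` is an open channel, and the exchange name and routing key are assumptions for illustration.

```java
import com.rabbitmq.client.Channel;

// Sketch only: declare a durable direct exchange and bind the stream to it.
channel.exchangeDeclare("events", "direct", true);
channel.queueBind("my-stream", "events", "event.created");
// Messages published to "events" with routing key "event.created"
// are now appended to the stream.
```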
If declaring using the management UI, the stream type must be specified using the queue type drop-down menu.
Streams support additional queue arguments that can also be configured using a policy:
- x-max-length-bytes: sets the maximum size of the stream in bytes. See retention. Default: not set.
- x-max-age: sets the maximum age of the stream. See retention. Default: not set.
- x-stream-max-segment-size-bytes: a stream is divided into fixed-size segment files on disk; this setting controls the size in bytes of each segment file. Default: 500000000 bytes.
- x-stream-filter-size-bytes: the size in bytes of the Bloom filter used for filtering. The value must be between 16 and 255. Default: 16 bytes.
While the x-stream-max-segment-size-bytes and x-stream-filter-size-bytes arguments can be configured via a policy, they are only applied to a stream if the policy exists at stream declaration time. If these arguments are changed for a matching, pre-existing stream, they do not take effect, even if the effective policy of the queue record indicates otherwise. It is therefore best to configure them via queue arguments only.
The following example in Java demonstrates how these arguments can be set at stream declaration time in application code:
import java.util.HashMap;
import java.util.Map;

Map<String, Object> arguments = new HashMap<>();
arguments.put("x-queue-type", "stream");
// maximum stream size: 20 GB
// note the L suffix: the value exceeds the int range, so a long literal is required
arguments.put("x-max-length-bytes", 20_000_000_000L);
// size of segment files: 100 MB
arguments.put("x-stream-max-segment-size-bytes", 100_000_000);
// size of the stream Bloom filter: 32 bytes
arguments.put("x-stream-filter-size-bytes", 32);
channel.queueDeclare(
    "my-stream",
    true,         // durable
    false, false, // not exclusive, not auto-delete
    arguments
);