The team was recently asked whether, and how, quorum queues can offer the same message ordering guarantees as classic queues, given that they deliver messages from a local queue replica (leader or follower) when possible. Mirrored queues always deliver from the master (the leader), so delivering from any queue replica sounds like it could impact those guarantees.
That is the subject of this post. Be warned, this post is a technical deep dive for the curious and the distributed systems enthusiast. We’ll take a look at how quorum queues can deliver messages from any queue replica, leader or follower, without additional coordination (beyond what Raft already does) while still maintaining message ordering guarantees.
All queues, including quorum queues, provide ordering guarantees per channel for messages that are not redeliveries. The simplest way to look at it is that if a queue has only one consumer, then that consumer will get messages delivered in FIFO order. Once you have two consumers on a single queue, those guarantees change to a monotonic ordering of non-redelivered messages: there may be gaps (because consumers now compete), but a consumer will never be delivered a later message before an earlier message (that is not a redelivery).
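To illustrate monotonic ordering with competing consumers, here is a minimal Python sketch. It is not RabbitMQ code: the `dispatch` function and the simple round-robin hand-out are assumptions made purely for illustration. Each consumer sees gaps, but never sees a later message before an earlier one.

```python
from collections import deque
from itertools import cycle

def dispatch(messages, consumers):
    """Hand out messages in queue order, round-robin across consumers (toy model)."""
    queue = deque(messages)
    received = {consumer: [] for consumer in consumers}
    for consumer in cycle(consumers):
        if not queue:
            break
        received[consumer].append(queue.popleft())
    return received

received = dispatch([1, 2, 3, 4, 5, 6], ["c1", "c2"])

# Every consumer's view is strictly increasing, even though it has gaps.
for seen in received.values():
    assert seen == sorted(seen)
```

With a single consumer the same function returns the full sequence in FIFO order.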
If you need bulletproof FIFO ordering guarantees of ALL messages (including redelivered messages) then you need to use the Single Active Consumer feature with a prefetch of 1. Any redelivered messages get added back to the queue before the next delivery takes place - maintaining FIFO order.
Quorum queues deliver the same ordering guarantees as classic queues. They just happen to also be able to deliver from any local replica, that is, a replica local to the consumer channel. If you want to understand how quorum queues manage that, then read on! If not then stop here but be happy knowing that the usual ordering guarantees are still maintained.
RabbitMQ tries to make things simple by allowing any client to connect to any node in a cluster. If a consumer connects to broker 1 but the queue exists on broker 2, then the traffic will be proxied from broker 2 to broker 1 and back. The consumer has no clue that the queue is hosted on a different node.
This flexibility and ease of use comes at a cost though. On a three node cluster, the worst case scenario is that the publisher connects to broker 1, its messages are routed to a classic unreplicated queue on broker 2 and the consumer of that queue is connected to broker 3. To process these messages, all three brokers have been roped into it which is of course less efficient.
If that queue were replicated then the messages would have to be transmitted between the brokers one time for the proxying and then additionally for the replication. On top of that we covered how inefficient the mirrored queue algorithm is with multiple sends of each message.
It would be nice if consumers could get delivered the messages from where they are connected to, rather than from the leader that exists on a different broker - this would save on network utilisation and take some pressure off the queue leader.
With mirrored queues, all messages are delivered by the queue master (and potentially sent to the consumer via another broker). This is simple and requires no coordination between the master and its mirrors.
In quorum queues we could have added coordination between the leader and the followers to achieve local delivery. Communication between the leader and followers would coordinate who would deliver which message - because what we can’t have is a message being delivered twice or not at all. Unfortunate things can happen: consumers can fail, brokers can fail, network partitions can occur, and the coordination would need to handle all of that.
But coordination is bad for performance. The kind of coordination to make local delivery work could be extremely impactful on performance and also extremely complex. We needed another way and luckily everything we needed was already built into the protocol.
A common method of avoiding coordination in distributed systems is by using determinism. If every node in a cluster gets the same data, in the same order and makes decisions based only on that data then each node will make the same decision at that point in the log.
Deterministic decision making requires that each node is fed the same data in the same order. Quorum queues are built on Raft which is a replicated commit log - an ordered sequence of operations. So as long as all the information required to perform local delivery is written to this ordered log of operations, then each replica (leader or follower) will know who should deliver each message without needing to talk to each other about it.
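The determinism argument can be sketched in a few lines of Python. This is a toy model, not how Ra works internally: the `apply` and `state_at` functions and the tuple-based log entries are hypothetical. The point is only that replicas folding over the same ordered log reach the same state at the same offset, while a lagging replica is merely behind, not divergent.

```python
from functools import reduce

# Hypothetical log entries: (operation, value). This is not Ra's real format.
LOG = [("add", 5), ("mul", 3), ("add", 1), ("mul", 2)]

def apply(state, entry):
    """Pure function: the next state depends only on the current state and the entry."""
    op, value = entry
    if op == "add":
        return state + value
    if op == "mul":
        return state * value
    raise ValueError(f"unknown operation: {op}")

def state_at(log, offset):
    """State of a replica that has applied the first `offset` entries of the log."""
    return reduce(apply, log[:offset], 0)

leader_state = state_at(LOG, 4)
lagging_follower_state = state_at(LOG, 3)   # one entry behind the leader
caught_up_follower_state = state_at(LOG, 4)

assert leader_state == caught_up_follower_state
```

Because addition and multiplication do not commute with each other, applying the same entries in a different order would give a different result, which is why the ordering of the log matters as much as its contents.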
It turns out that even for leader-only deliveries, we still need the coming and going of consumers to be added to the log. If a broker fails and another follower gets promoted to leader, it will need to know about the surviving consumer channels that exist across the cluster so it can deliver messages to them. This information also enables coordination free local delivery.
Quorum queues are built on a Raft implementation called Ra (also developed by the RabbitMQ team). Ra is a programmable state machine that replicates a log of operations. It differentiates between operations that all replicas should perform (commands), for consistency, and external operations that only the leader should perform (effects). These commands, states and effects are programmed by the developer. Quorum queues have their own commands, states and effects.
A good example of commands and effects is a key-value store. Adding, updating and deleting the data should be performed by all replicas. Each replica needs to have the same data, so when a leader fails, a follower can take over, with the same data. So data modifications are commands. But notifying a client application that a key changed should only happen once. If a client app asked to be notified when a key is updated, it doesn’t want to be notified by the leader and all the secondary replicas! So only the leader should execute the effects.
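The key-value example might look roughly like this in Python. It is a sketch under stated assumptions: the `KvReplica` class, the command tuples and the `notifications` list (standing in for messages sent to client apps) are all hypothetical and do not mirror Ra's actual API.

```python
class KvReplica:
    """Toy replica of a key-value store (hypothetical, not Ra's API)."""

    def __init__(self, name, is_leader):
        self.name = name
        self.is_leader = is_leader
        self.data = {}
        self.notifications = []  # stands in for notifications sent to client apps

    def apply(self, command):
        """Apply one committed command. Every replica does this."""
        op, key, value = command
        if op == "put":
            self.data[key] = value
        elif op == "delete":
            self.data.pop(key, None)
        else:
            raise ValueError(f"unknown command: {op}")
        # The effect is derived on every replica but executed only by the leader.
        effect = ("notify", key)
        if self.is_leader:
            self.notifications.append(effect)

log = [("put", "a", 1), ("put", "b", 2), ("delete", "a", None)]
replicas = [KvReplica("r1", True), KvReplica("r2", False), KvReplica("r3", False)]
for replica in replicas:
    for command in log:
        replica.apply(command)
```

All three replicas end up with identical data, but only the leader has notified anyone, so a client sees each change once.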
Ra has support for “local” effects. In the case of quorum queues, only the send_msg effect is local. The way it works is that all replicas know which consumer channels exist and on which nodes. When a consumer registers, that information is added to the log and likewise, when it fails or cancels that is also added to the log.
Each replica “applies” each committed (majority replicated) command in the log in order. Applying an enqueue command adds the message to the queue, applying a consumer down command removes that consumer from the Service Queue (more on that next) and returns all messages it has pending back to the queue for redelivery.
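As a rough illustration of applying commands, the Python sketch below models a single replica's state. The `QueueState` class and the command names are hypothetical, and two simplifications are assumed: every consumer has a prefetch of 1 and acknowledgements are left out, and returned messages go back to the head of the queue in their original order.

```python
from collections import deque

class QueueState:
    """Toy state machine for one replica (hypothetical, not Ra's real state)."""

    def __init__(self):
        self.messages = deque()       # messages waiting for delivery, in order
        self.service_queue = deque()  # consumers able to take a message
        self.pending = {}             # consumer -> delivered, unacknowledged messages

    def apply(self, command):
        kind, arg = command
        if kind == "subscribe":
            self.pending[arg] = []
            self.service_queue.append(arg)
        elif kind == "enqueue":
            self.messages.append(arg)
        elif kind == "down":
            if arg in self.service_queue:
                self.service_queue.remove(arg)
            # Assumption: returned messages go to the head of the queue, in order.
            self.messages.extendleft(reversed(self.pending.pop(arg, [])))
        else:
            raise ValueError(f"unknown command: {kind}")
        self._assign()

    def _assign(self):
        # Prefetch of 1: a consumer leaves the service queue once it holds a message.
        while self.messages and self.service_queue:
            consumer = self.service_queue.popleft()
            self.pending[consumer].append(self.messages.popleft())

    def snapshot(self):
        return list(self.messages), list(self.service_queue), dict(self.pending)

log = [("subscribe", "c1"), ("subscribe", "c2"), ("enqueue", "m1"),
       ("down", "c1"), ("enqueue", "m2")]

replica_a, replica_b = QueueState(), QueueState()
for command in log:
    replica_a.apply(command)
    replica_b.apply(command)
```

In this run m1 is first assigned to c1, returned to the queue when c1 goes down, and then assigned to c2. Both replicas arrive at that outcome independently because they applied the same commands in the same order.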
The consumers are added to a Service Queue (SQ) which is deterministically maintained - meaning that all replicas have the same SQ at any given point in the log. Each replica will assess any given message not yet delivered, with exactly the same SQ as all the other replicas, and will dequeue a consumer from the SQ. If that consumer is local (meaning that its channel process is hosted on the same broker as the replica) then the replica will send the message to that local channel. That channel will then send it to the consumer. If the consumer channel is not local, then the replica will not deliver it, but will track its state (who it was delivered to, whether it has been acknowledged etc). One caveat is that if there isn’t a replica that is local to the consumer channel, then the leader sends it to that channel (the proxying approach).
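The local delivery decision can be sketched as follows. This is an illustrative Python model, not the quorum queue implementation: the node names, the `CHANNEL_NODE` placement table and the `run_replica` function are hypothetical, and consumer credit and acknowledgements are ignored so that the service queue is a plain round-robin.

```python
from collections import deque

# Hypothetical placement: consumer channel -> node hosting that channel.
CHANNEL_NODE = {"c1": "node_a", "c2": "node_c", "c3": "node_d"}
REPLICA_NODES = {"node_a", "node_b", "node_c"}  # node_d hosts no replica
LEADER = "node_b"

def run_replica(node, log):
    """Apply the log on one replica and return the deliveries it performs itself."""
    service_queue = deque()
    sent = []
    for kind, arg in log:
        if kind == "subscribe":
            service_queue.append(arg)
        elif kind == "enqueue":
            # Toy simplification: assumes at least one consumer has subscribed.
            consumer = service_queue.popleft()
            service_queue.append(consumer)  # round-robin; credit is ignored here
            host = CHANNEL_NODE[consumer]
            # A local replica delivers; the leader covers channels with no local replica.
            deliverer = host if host in REPLICA_NODES else LEADER
            if deliverer == node:
                sent.append((arg, consumer))
    return sent

log = [("subscribe", "c1"), ("subscribe", "c2"), ("subscribe", "c3"),
       ("enqueue", "m1"), ("enqueue", "m2"), ("enqueue", "m3"), ("enqueue", "m4")]

sent_by = {node: run_replica(node, log) for node in sorted(REPLICA_NODES)}
all_sent = sorted(message for sends in sent_by.values() for message, _ in sends)
```

Every replica computes the same assignment for every message but only acts on the ones whose channel it hosts, so each message is sent exactly once without the replicas exchanging anything beyond the log itself.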
If you still find this interesting, but find it hard to conceptualise then I don’t blame you. What we need are diagrams and a sequence of events to demonstrate this.
I will group sets of events into each diagram, so as to keep the number of diagrams as low as possible.
Each diagram consists of three queue replicas, one leader and two followers. We see the state of the log, the service queue, the queue representation and the “apply” actions. Each operation has the format “command term:offset data”. So for example E 1:1 m1 is the enqueue command, which is added in the first term, has the first offset and is message m1. Terms and offsets are Raft algorithm terms and not super important in order to understand local delivery (but I recommend reading up on the Raft algorithm if you find this interesting).
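For readers who prefer code to notation, the operation format can be read mechanically. The Python sketch below is only an aid for reading the diagrams: `Operation` and `parse_operation` are hypothetical names, and the parser assumes the three whitespace-separated parts described above.

```python
from typing import NamedTuple

class Operation(NamedTuple):
    command: str  # e.g. "E" for enqueue
    term: int     # Raft term in which the entry was added
    offset: int   # position of the entry in the log
    data: str     # e.g. the message name

def parse_operation(text):
    """Parse the diagram notation 'command term:offset data'."""
    command, position, data = text.split(maxsplit=2)
    term, offset = position.split(":")
    return Operation(command, int(term), int(offset), data)

op = parse_operation("E 1:1 m1")
```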
The consumer does exist of course, but the replicas only learn of consumers when they apply the subscribe command in their logs. They do have those commands in their logs, but they have not yet applied them.
Notice that Follower A and Leader B are at the same point in their logs, and have the same Service Queues.
At this point the SQ of Leader B is different from the followers, but that is only because it is one command ahead in its log.
See that the service queues match each other - the followers are at the same offset and the leader is ahead by one, but acknowledgements don’t affect the service queues.
So we see that without additional coordination between the replicas, we achieve local delivery, while maintaining FIFO order, even across leadership fail-overs.
But what about if a consumer fails after having been delivered a message by a follower? Will that be detected and the message redelivered to another consumer channel on a different broker?
We’ll continue where we left off from Group 6 - m1 was already delivered to C1 but not acknowledged.
The quorum queue handled the consumer 1 failure without any problems, while still delivering from a local replica without additional coordination. The key is deterministic decision making, which requires that each node uses only data in the log to inform its decisions and that there is no divergence of committed entries in their logs (which is all handled by Raft).
Quorum queues have the same ordering guarantees as any queue but are also able to deliver messages from a local replica. How they achieve this is interesting but not relevant to developers or administrators. What IS useful is understanding that this is another reason to choose quorum queues over mirrored queues. We previously described the very network inefficient algorithm behind mirrored queues, and now you’ve seen that with quorum queues we have heavily optimised network utilisation.
Consuming from a follower replica doesn’t just result in better network utilisation though, we also get better isolation between publisher and consumer load. Publishers can impact consumers and the other way around because they put contention on the same resource - a queue. By allowing consumers to consume from a different broker, we get better isolation. Just see the recent sizing case study that showed that quorum queues can sustain a high publish rate even in the face of huge queue backlogs and extra pressure from consumers. Mirrored queues were more susceptible.
So… consider quorum queues!
Written by: Jack Vanlightly
Categories: Technical Deep Dive