Puka - rethinking AMQP clients

July 8, 2011

I fundamentally disagree with the APIs exposed by our current AMQP client libraries.

There is a reason why they’re imperfect: we have intentionally avoided innovation in APIs since the beginning. The purpose of our client libraries is to expose generic AMQP, not any one view of messaging. But, in my opinion, trying to map AMQP directly onto client library APIs is just wrong, and results in over-complication and abstractions that are hard to use.

There is no common ground: client libraries that blindly follow the AMQP model will be complex; easy-to-use client libraries must be opinionated.

1. Channels

The main problem with client libraries following the protocol is caused by the nature of AMQP channels. Channels are often explained as an abstraction matching an operating system thread - you may have many of those, and each one is synchronous.

That’s all good, but an AMQP channel is not limited to being a thread. It’s so much more than that: an error scope, a transaction scope, an ordering guarantee and a scope for acks.

The programmer may decide to use many channels within a single thread, or the opposite: many threads may need to work on a single channel.

An example of the first situation: forwarding messages between two queues (one channel for publishing, one for consuming). An example of the second: splitting work from one channel between multiple worker threads (in order to share the basic.qos quota between workers).

Inevitably, the author of a client library must make a decision on the relationship between a channel and a thread. It may sound boring if you’re from a .NET/Java background - these frameworks are opinionated about threading. But assuming anything about the threading model in a third-party library is a very bad practice in some languages, for example C and Python.

We can repeat almost the same discussion for the problem of handling multiple connections. For example a single thread may need to speak to two connections.
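To make the "single thread, two connections" case concrete, here is a minimal sketch using only the Python standard library. It is not Puka's API: the two socket pairs merely stand in for two broker connections, and the labels `connection-a` and `connection-b` are made up for illustration. The point is that a single blocking call can wait on both connections at once.

```python
import selectors
import socket

# Two socket pairs stand in for two broker connections (an assumption for
# illustration; a real client would hold TCP sockets to two brokers).
local_a, remote_a = socket.socketpair()
local_b, remote_b = socket.socketpair()

selector = selectors.DefaultSelector()
selector.register(local_a, selectors.EVENT_READ, data="connection-a")
selector.register(local_b, selectors.EVENT_READ, data="connection-b")

# Each pretend broker sends one frame.
remote_a.sendall(b"frame from a")
remote_b.sendall(b"frame from b")

received = {}
rounds_left = 10  # guard so the sketch can never loop forever
while len(received) < 2 and rounds_left > 0:
    rounds_left -= 1
    # One blocking call waits on both connections at the same time.
    for key, _events in selector.select(timeout=1.0):
        received[key.data] = key.fileobj.recv(4096)

selector.close()
for sock in (local_a, remote_a, local_b, remote_b):
    sock.close()
```

A library that hides its sockets behind one blocking call per connection makes this kind of loop impossible for the user to write.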

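Every client library author must answer two questions: should it be possible to block on multiple channels, and should it be possible to handle multiple connections from a single thread?

Two questions - four possible choices: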

Blocking on multiple channels  Handling multiple connections from a single thread  Resulting client
no                             no                                                  simple blocking client (pyamqplib)
no                             yes                                                 semi-asynchronous client (pika 0.5.2)
yes                            no                                                  threaded clients (rabbitmq-java, rabbitmq-dotnet)
yes                            yes                                                 fully asynchronous client (puka)

2. Error handling

The next problem is error handling. With some of the client libraries, it’s virtually impossible to catch an AMQP error and recover from it without having to restart the whole program. This is often caused by users not understanding the nature of channels as an error scope. But the libraries don’t make dealing with errors easy: you get a channel error, now what? For example, doing basic.publish may kill your channel, in theory at any time.

3. Synchronous publish

The last broken thing is the lack of support for synchronous publish. Before RabbitMQ extended AMQP to support ‘confirms’, it wasn’t practically possible to make sure a message got delivered to the broker. The only solution was to use transactions, which slowed publishing radically. Now, with ‘confirms’, it’s possible but rather hard: as well as writing a callback, a user needs to maintain a lock between a library thread and the user thread, which requires an understanding of the library’s threading model.
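To show what that coordination looks like, here is a rough sketch of a blocking publish built on top of a callback-based, threaded client. `FakeThreadedClient`, `publish_sync` and the timer-driven confirm are all hypothetical stand-ins, not the API of any real library, and a `threading.Event` is used here as the synchronisation primitive between the two threads.

```python
import threading


class FakeThreadedClient:
    """Hypothetical stand-in for a threaded AMQP client; not a real library."""

    def __init__(self):
        self._next_tag = 0
        self._lock = threading.Lock()

    def publish(self, body, on_confirm):
        # Assign a delivery tag, then fire the confirm from a separate
        # "library" thread, the way a threaded client would call back.
        with self._lock:
            self._next_tag += 1
            tag = self._next_tag
        threading.Timer(0.01, on_confirm, args=(tag,)).start()
        return tag


def publish_sync(client, body, timeout=1.0):
    """Block the calling thread until the confirm callback has fired."""
    confirmed = threading.Event()
    seen = []

    def on_confirm(tag):
        # Runs on the library thread, so hand the result over via the Event.
        seen.append(tag)
        confirmed.set()

    tag = client.publish(body, on_confirm)
    if not confirmed.wait(timeout):
        raise TimeoutError("no confirm received for delivery tag %d" % tag)
    return seen[0]


client = FakeThreadedClient()
first = publish_sync(client, "Hello world!")
second = publish_sync(client, "Hello again!")
```

Even in this toy form the user has to know which thread runs the callback, which is exactly the knowledge of the threading model mentioned above.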

The birth

Out of this frustration a new experimental Python client was born: Puka.

Puka tries to provide simple APIs to the underlying AMQP protocol and reasonable error handling. The major features of Puka:

The anti-features of Puka:

Code snippets

As a teaser, here are a few code snippets.
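The snippets assume an already connected `client` object. A minimal sketch of how one might be obtained is shown here; the helper names `open_client` and `close_client` and the URL `amqp://localhost/` are assumptions for illustration, and running it requires Puka to be installed and a broker to be listening.

```python
def open_client(url="amqp://localhost/"):
    """Connect to a broker and return a ready-to-use Puka client."""
    import puka  # imported lazily so the sketch loads without Puka installed

    client = puka.Client(url)
    promise = client.connect()
    client.wait(promise)  # block until the connection handshake completes
    return client


def close_client(client):
    """Close the connection and wait until the close has completed."""
    promise = client.close()
    client.wait(promise)
```

Connecting follows the same pattern as every other operation: the call returns a promise, and `client.wait(promise)` blocks until it is fulfilled.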

Declare 1000 queues, one by one:

for i in range(1000):
    promise = client.queue_declare(queue='a%04i' % i)
    # Block until this queue is declared before requesting the next one.
    client.wait(promise)

Declare 1000 queues in parallel:

# Send all the requests first, then wait for each reply.
promises = [client.queue_declare(queue='a%04i' % i) for i in range(1000)]
for promise in promises:
    client.wait(promise)

Asynchronous publish:

client.basic_publish(exchange='', routing_key='test',
                     body="Hello world!")

Synchronous publish:

promise = client.basic_publish(exchange='', routing_key='test',
                              body="Hello world!")
# Block until the broker has confirmed the message.
client.wait(promise)

AMQP errors don’t affect other parts of your program (publishes, consumes, etc). For example, if a ‘test’ queue was already declared as ‘durable’ and you try to redeclare it without the proper flag, you’ll get an error:

> promise = client.queue_declare(queue='test')
> client.wait(promise)
Traceback (most recent call last):
puka.spec_exceptions.PreconditionFailed: {'class_id': 50, 'method_id': 10,
    'reply_code': 406, 'reply_text': "PRECONDITION_FAILED - parameters for queue
    'test' in vhost '/' not equivalent"}

In Puka you may simply catch this exception and continue:

try:
    promise = client.queue_declare(queue='test')
    client.wait(promise)
except puka.PreconditionFailed:
    # Oh, sorry. Forgot it was durable.
    promise = client.queue_declare(queue='test', durable=True)
    client.wait(promise)

You may take a look at Puka code for RabbitMQ tutorials and Puka examples and tests.


In summary, Puka provides a simpler API, a flexible programming model and proper error handling, and doesn’t make any decisions on threading. It makes using AMQP fun again.

Written by: Marek Majkowski