RabbitMQ tutorial - Work Queues

Work Queues

(using the AMQP 1.0 .NET client)

Prerequisites

This tutorial assumes RabbitMQ is installed and running on localhost on the standard port (5672). If you use a different host, port, or credentials, the connection settings will need adjusting.

Where to get help

If you're having trouble going through this tutorial, you can contact us through GitHub Discussions or the RabbitMQ community Discord.

In the first tutorial we wrote programs to send and receive messages from a named queue. In this one we'll create a Work Queue that will be used to distribute time-consuming tasks among multiple workers.

The main idea behind Work Queues (also known as Task Queues) is to avoid doing a resource-intensive task immediately and having to wait for it to complete. Instead, we schedule the task to be done later. We encapsulate a task as a message and send it to a queue. A worker process running in the background takes tasks off the queue and executes them. When you run many workers, the tasks are shared between them.

This concept is especially useful in web applications where it's impossible to handle a complex task during a short HTTP request window.

This tutorial uses the RabbitMQ AMQP 1.0 .NET client (RabbitMQ.AMQP.Client). It requires RabbitMQ 4.0 or later.

Preparation

In the previous part of this tutorial we sent a message containing "Hello World!". Now we'll be sending strings that stand for complex tasks. We fake work by sleeping: each dot (.) in the string adds one second of work, so a task such as "Hello..." takes three seconds.
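DoWork itself is not shown in this section. A minimal sketch, assuming it does nothing more than count the dots and sleep one second per dot, might look like the following. CountDots is a hypothetical helper name introduced here for illustration; the real Worker/Program.cs may be structured differently.

```csharp
using System;
using System.Linq;
using System.Threading;

// Hypothetical helper (not from the tutorial sources): number of '.' characters in the task.
static int CountDots(string task) => task.Count(c => c == '.');

// Sketch of DoWork, assuming one second of fake work per dot.
static void DoWork(string task)
{
    Thread.Sleep(CountDots(task) * 1000);
}

string sample = "First message.";
Console.WriteLine($" [x] Received '{sample}' ({CountDots(sample)} s of fake work)");
DoWork(sample); // blocks for one second
Console.WriteLine(" [x] Done");
```

Blocking with Thread.Sleep keeps the sketch short. In a real handler that awaits I/O, an asynchronous delay would be the more usual choice.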

The producer is NewTask/Program.cs; the consumer is Worker/Program.cs.

NewTask declares a quorum queue named task_queue, publishes to it, and checks that the publish outcome is OutcomeState.Accepted:

IQueueSpecification queueSpec = management.Queue(taskQueueName).Type(QueueType.QUORUM);
await queueSpec.DeclareAsync();

IPublisher publisher = await connection.PublisherBuilder().Queue(taskQueueName).BuildAsync();
// ...
PublishResult pr = await publisher.PublishAsync(amqpMessage);
if (pr.Outcome.State != OutcomeState.Accepted)
{
    Console.Error.WriteLine($"Unexpected publish outcome: {pr.Outcome.State}");
    Environment.Exit(1);
}

Worker uses InitialCredits(1) for fair dispatch and calls ctx.Accept() in a finally block after DoWork, so the message is settled even if DoWork throws:

IConsumer consumer = await connection.ConsumerBuilder()
    .Queue(taskQueueName)
    .InitialCredits(1)
    .MessageHandler((ctx, message) =>
    {
        string body = Encoding.UTF8.GetString(message.Body()!);
        Console.WriteLine($" [x] Received '{body}'");
        try
        {
            DoWork(body);
        }
        finally
        {
            Console.WriteLine(" [x] Done");
            ctx.Accept();
        }

        return Task.CompletedTask;
    })
    .BuildAndStartAsync();

Round-robin dispatching

Run two workers in separate terminals and publish tasks from a third (run all commands from the dotnet-amqp directory):

# terminal 1
dotnet run --project Worker/Worker.csproj
# terminal 2
dotnet run --project Worker/Worker.csproj
# terminal 3
dotnet run --project NewTask/NewTask.csproj "First message."
dotnet run --project NewTask/NewTask.csproj "Second message.."

By default, RabbitMQ sends each message to the next consumer in sequence (round-robin).
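To make the dispatch order concrete, here is a small simulation that does not talk to a broker at all. It assumes an idealized round-robin in which message i goes to consumer i modulo the number of consumers. RoundRobin and the sample task strings are hypothetical names used only for this illustration.

```csharp
using System;
using System.Collections.Generic;

// Simulation only: message i is handed to consumer (i % consumerCount).
static List<string>[] RoundRobin(IReadOnlyList<string> messages, int consumerCount)
{
    var perConsumer = new List<string>[consumerCount];
    for (int c = 0; c < consumerCount; c++)
    {
        perConsumer[c] = new List<string>();
    }
    for (int i = 0; i < messages.Count; i++)
    {
        perConsumer[i % consumerCount].Add(messages[i]);
    }
    return perConsumer;
}

var sampleTasks = new[] { "First message.", "Second message..", "Third message...", "Fourth message...." };
List<string>[] assigned = RoundRobin(sampleTasks, 2);
for (int w = 0; w < assigned.Length; w++)
{
    Console.WriteLine($"worker {w + 1}: {string.Join(", ", assigned[w])}");
}
```

With two workers, the first and third tasks land on worker 1 and the second and fourth on worker 2, regardless of how long each task takes.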

Message acknowledgment

With AMQP 1.0, the consumer must settle each message it receives, for example by calling ctx.Accept(). Settle only after the work completes: if the worker crashes mid-task, the message is still unsettled and RabbitMQ can redeliver it.
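The effect of settling late can be shown with a toy model. The sketch below is not the client API: it uses an in-memory queue and assumes that an unsettled message goes back to the queue when its consumer dies. Process and crashOn are hypothetical names.

```csharp
using System;
using System.Collections.Generic;

// Toy model: a message leaves the queue for good only once it has been settled.
// crashOn names a task whose first processing attempt dies before settling.
static List<string> Process(IEnumerable<string> tasks, string crashOn)
{
    var queue = new Queue<string>(tasks);
    var log = new List<string>();
    bool crashed = false;
    while (queue.Count > 0)
    {
        string current = queue.Dequeue();      // delivered, not yet settled
        if (current == crashOn && !crashed)
        {
            crashed = true;
            log.Add($"crash during '{current}'");
            queue.Enqueue(current);            // still unsettled, so it is redelivered later
            continue;
        }
        log.Add($"accepted '{current}'");      // settle only after the work has finished
    }
    return log;
}

foreach (string line in Process(new[] { "A.", "B.." }, "A."))
{
    Console.WriteLine(line);
}
```

In this run, task "A." crashes once, "B.." is accepted, and "A." is then accepted on its second delivery. Had the message been settled before the work started, the crash would have lost it.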

Fair dispatch

Use InitialCredits(1) on the consumer builder so that each consumer has at most one unsettled message in flight at a time (similar to a prefetch count of 1 set with basicQos in AMQP 0-9-1).
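A rough way to see why one credit per consumer balances the load is to simulate it. The sketch below assumes all tasks are already queued and that each message goes to whichever worker settles its current message first, with ties going to the lower index. It is a model of the idea, not of RabbitMQ's actual scheduler, and DispatchWithOneCredit is a hypothetical name.

```csharp
using System;
using System.Linq;

// Toy model: durations are seconds of work per task; each worker holds one message at a time.
static int[] DispatchWithOneCredit(int[] durations, int workerCount)
{
    var freeAt = new int[workerCount];          // time at which each worker settles its current message
    var assignedTo = new int[durations.Length]; // worker index chosen for each task
    for (int i = 0; i < durations.Length; i++)
    {
        int worker = Array.IndexOf(freeAt, freeAt.Min()); // first worker to become free
        assignedTo[i] = worker;
        freeAt[worker] += durations[i];
    }
    return assignedTo;
}

// Alternating heavy (5 s) and light (1 s) tasks, two workers.
int[] owners = DispatchWithOneCredit(new[] { 5, 1, 5, 1 }, 2);
Console.WriteLine(string.Join(", ", owners)); // prints 0, 1, 1, 0
```

Strict round-robin would give both heavy tasks to worker 0 (10 s of work against 2 s). With one credit each, the second heavy task goes to worker 1 because it becomes free first, and both workers finish after 6 s.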

Putting it all together

See NewTask/Program.cs and Worker/Program.cs for the full sources (once merged upstream).

Now we can move on to tutorial 3 and learn how to deliver the same message to many consumers.