Persistent and fair RabbitMQ Work Queue

My first version of a Rabbit work queue has a few issues. Most noticeably, it is unfair and non persistent.

The messages are assigned to the consumers as they are put on it by the producer, using a round robin scheduling. This could lead to an unbalanced charge of work on a specific consumer, if it gets a share of messages implying a longer elaboration time.

We have seen how to let consumers to signal when they are done with specific messages, so that the queue can repost it to another consumer when required. But what if we have a server Rabbit crash?

Fair Work Queue

To see how unfairness could make the system less attractive, I have messed a bit up the generated message lengths:
String message = sb.toString();
if(i % 2 == 0)
    message += message;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
The solution to the unfairness of round robin in this context is telling the server to deliver one message after the other, without using prefetch:
channel.basicQos(1);
In this way we are saying that each consumer should be assigned just one message at a time, using a distribution based on a first come, first served basis. We have also a pleasant side effect, now clients could be started after the server. Each client grabs just one message at a time so, till there are messages pending in the queue, new consumers can enter in the game.

Persistent Work Queue

To emulate a server crash we can stop and restart the rabbit service:
rabbitmq-service stop
rabbitmq-service start
If you do that when a client is running on a rabbit queue, you should get a feedback like this:
4: ....
5: .....
Exception in thread "main" com.rabbitmq.client.AlreadyClosedException: clean con
nection shutdown; reason: Attempt to use closed channel
        at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:190)
...
When the server restarts, we have the sad surprise of having no more queue, and no more messages.

Durable queue

First part of the problem, we want the queue to be durable. In this way it won't disappear on a server crash:
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
Pay attention to the fact that all the user should access a queue declared in the same way, otherwise you should experience an exception like this:
java.io.IOException
        at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
        at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
...
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {
#method(reply-code=406, reply-text=PRECONDITION_FAILED - paramete
rs for queue 'task' in vhost '/' not equivalent, class-id=50, method-id=10), nul
l, "[B@140fee"}
        at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
...

Persistent message

Each message we want to survive a server crash, should be declare as persistent:
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
Combining durability, persistence, and the already seen acknowledgment of message consuming completed by a client, we get a good level of robustness for our message exchange. Still there are some gray areas where a message could get lost. When an higher level of security is required, transactions should be used.

This post, as the previous one, is based on the second official RabbitMQ tutorial lesson. The complete source Java code for the class shown here is on github.

No comments:

Post a Comment