Messages are assigned to the consumers as the producer puts them on the queue, using round robin scheduling. This can leave one consumer with an unbalanced workload, if its share of the messages happens to require a longer processing time.
We have seen how to let consumers signal when they are done with a specific message, so that the queue can redeliver it to another consumer when required. But what if the RabbitMQ server crashes?
Fair Work Queue
To see how unfairness can make the system less attractive, I have tampered a bit with the length of the generated messages, doubling every other one:
String message = sb.toString();
if(i % 2 == 0)
    message += message;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

The solution to the unfairness of round robin in this context is telling the server to deliver one message at a time to each consumer, by limiting the prefetch count to one:
channel.basicQos(1);

In this way we are saying that each consumer should be assigned just one message at a time, so that messages are distributed on a first come, first served basis. There is also a pleasant side effect: clients can now be started after the server. Each client grabs just one message at a time so, as long as there are messages pending in the queue, new consumers can join the game.
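The difference between the two dispatch policies can be sketched with a toy simulation. This is not RabbitMQ client code: the class and method names are made up for illustration, each message is reduced to a processing cost, and all messages are assumed to be already waiting in the queue. As in the snippet above, every other message costs twice as much.

```java
import java.util.Arrays;

// Toy model, not RabbitMQ client code: each message is just a processing
// cost, and we compare how the total work is spread over the consumers.
final class DispatchSimulation {

    // Round robin: message i goes to consumer i % consumers, whatever its cost.
    static int[] roundRobin(int[] costs, int consumers) {
        int[] load = new int[consumers];
        for (int i = 0; i < costs.length; i++) {
            load[i % consumers] += costs[i];
        }
        return load;
    }

    // One message at a time: the next message goes to the consumer that
    // becomes free first (ties go to the lowest index).
    static int[] oneAtATime(int[] costs, int consumers) {
        int[] busyUntil = new int[consumers];
        for (int cost : costs) {
            int next = 0;
            for (int c = 1; c < consumers; c++) {
                if (busyUntil[c] < busyUntil[next]) {
                    next = c;
                }
            }
            busyUntil[next] += cost;
        }
        return busyUntil;
    }

    public static void main(String[] args) {
        // Even-indexed messages are twice as long as odd-indexed ones.
        int[] costs = {2, 1, 2, 1, 2, 1};
        System.out.println(Arrays.toString(roundRobin(costs, 2))); // [6, 3]
        System.out.println(Arrays.toString(oneAtATime(costs, 2))); // [5, 4]
    }
}
```

With two consumers, round robin hands all the long messages to the same one, while the one-at-a-time policy keeps the two workloads close to each other.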
Persistent Work Queue
To emulate a server crash we can stop and restart the rabbit service:
rabbitmq-service stop
rabbitmq-service start

If you do that while a client is running on a rabbit queue, you should get feedback like this:
4: ....
5: .....
Exception in thread "main" com.rabbitmq.client.AlreadyClosedException: clean connection shutdown; reason: Attempt to use closed channel
    at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:190)
    ...

When the server restarts, we get the sad surprise that the queue is gone, and its messages with it.
Durable queue
The first part of the problem: we want the queue to be durable, so that it won't disappear on a server crash:
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

Pay attention to the fact that every client has to declare the queue in the same way, otherwise you should expect an exception like this:
java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    ...
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: { #method(reply-code=406, reply-text=PRECONDITION_FAILED - parameters for queue 'task' in vhost '/' not equivalent, class-id=50, method-id=10), null, "[B@140fee"}
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    ...
Persistent message
Each message that we want to survive a server crash should be declared as persistent:
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

Combining durability, persistence, and the already seen acknowledgment sent by a client when it has finished consuming a message, we get a good level of robustness for our message exchange. Still, there are some gray areas where a message could get lost. When a higher level of safety is required, transactions should be used.
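How durability and persistence combine can be sketched with a toy in-memory model. This is not the RabbitMQ client API: ToyBroker and all its methods are hypothetical names introduced only for illustration, and a "restart" is reduced to dropping whatever is not flagged to survive. The point it shows is that a message outlives a restart only if it is persistent and sits in a durable queue.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of the survival rules, not the RabbitMQ client API.
final class ToyBroker {

    private static final class ToyMessage {
        final String body;
        final boolean persistent;

        ToyMessage(String body, boolean persistent) {
            this.body = body;
            this.persistent = persistent;
        }
    }

    private static final class ToyQueue {
        final boolean durable;
        final List<ToyMessage> messages = new ArrayList<>();

        ToyQueue(boolean durable) {
            this.durable = durable;
        }
    }

    private final Map<String, ToyQueue> queues = new HashMap<>();

    // Redeclaring an existing queue with a different flag is rejected,
    // mimicking the PRECONDITION_FAILED error shown earlier.
    void declare(String name, boolean durable) {
        ToyQueue existing = queues.get(name);
        if (existing == null) {
            queues.put(name, new ToyQueue(durable));
        } else if (existing.durable != durable) {
            throw new IllegalStateException(
                    "PRECONDITION_FAILED: queue '" + name + "' declared differently");
        }
    }

    // Messages sent to an unknown queue are silently dropped in this model.
    void publish(String queue, String body, boolean persistent) {
        ToyQueue target = queues.get(queue);
        if (target != null) {
            target.messages.add(new ToyMessage(body, persistent));
        }
    }

    // A restart keeps only durable queues, and only their persistent messages.
    void restart() {
        queues.values().removeIf(q -> !q.durable);
        for (ToyQueue q : queues.values()) {
            q.messages.removeIf(m -> !m.persistent);
        }
    }

    // Bodies of the messages still waiting in the queue (empty if it is gone).
    List<String> pending(String queue) {
        List<String> bodies = new ArrayList<>();
        ToyQueue source = queues.get(queue);
        if (source != null) {
            for (ToyMessage m : source.messages) {
                bodies.add(m.body);
            }
        }
        return bodies;
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.declare("task", true);     // durable queue
        broker.declare("scratch", false); // non-durable queue
        broker.publish("task", "a", true);
        broker.publish("task", "b", false);
        broker.publish("scratch", "c", true);
        broker.restart();
        System.out.println(broker.pending("task"));    // [a]
        System.out.println(broker.pending("scratch")); // []
    }
}
```

Only message "a" is still there after the restart: "b" was not persistent, and "c" was persistent but lived in a queue that did not survive.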
This post, like the previous one, is based on the second lesson of the official RabbitMQ tutorial. The complete Java source code for the class shown here is on GitHub.