RabbitMQ Work Queue

The Hello Rabbit example is fine as a first look at the RabbitMQ messaging system. But now it is time to see something a bit more useful. Here we'll see an application where the producer generates a few messages and puts them on a queue. The consumer uses those messages to run a task. The twist is that we can have many consumers, and each of them will get its share of the messages.

The first implementation is very close to the Hello RabbitMQ example. We'll have it running, check the results, and then try to change it to get a more interesting behavior.

The main change in the producer is that it now sends more messages to the queue. I didn't care much about the message content, and I opted for a simple growing sequence of X's:
// ...
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

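// Each message is one X longer than the previous one: "X", "XX", ...
StringBuilder sb = new StringBuilder();
for(int i = 0; i < 10; ++i) {
    sb.append("X");

    String message = sb.toString();
    // Default exchange (""): the routing key is the queue name
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());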

    System.out.println(message);
}
// ...
In this scenario it doesn't make much sense for the producer to send a terminator, given that it doesn't know how many consumers are listening on the queue.

The receiver now does some work on each received message. The producer does not send a terminator, so we loop indefinitely, waiting for the next delivery. An interrupt is the expected way of terminating this receiver.
// ...
while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    doWork(delivery.getBody());
}
The job done by the receiver is emulated by a sleep of half a second for each byte in the message body:
// ...
private void doWork(byte[] task) throws InterruptedException {
    System.out.print(task.length + ": ");
    for(int i = 0; i < task.length; ++i) {
        Thread.sleep(500);
        System.out.print('.');
    }
    System.out.println();
}
This application is meant to be run following a defined protocol. First we start the clients, which hang on the queue waiting for messages. Then the server starts and publishes its messages, which are delivered to the clients in round-robin style. The clients consume all the messages, and then we are ready for a new server run, or to kill the clients.
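
To get a feeling for what round-robin delivery means for the ten messages sent by the producer, here is a minimal sketch that does not use RabbitMQ at all. It only simulates the distribution, under the assumption that messages are handed out strictly in turn to the listening clients. The class name RoundRobinSketch and the method assign() are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Toy simulation, not RabbitMQ code: class and method names are hypothetical.
public class RoundRobinSketch {
    // Hands out message lengths 1..messageCount to the consumers, one at a time, in turn.
    static List<List<Integer>> assign(int messageCount, int consumerCount) {
        List<List<Integer>> perConsumer = new ArrayList<>();
        for (int c = 0; c < consumerCount; ++c) {
            perConsumer.add(new ArrayList<>());
        }
        for (int i = 0; i < messageCount; ++i) {
            // The i-th message is i + 1 bytes long ("X", "XX", ...)
            perConsumer.get(i % consumerCount).add(i + 1);
        }
        return perConsumer;
    }

    public static void main(String[] args) {
        List<List<Integer>> result = assign(10, 2);
        for (int c = 0; c < result.size(); ++c) {
            int bytes = 0;
            for (int length : result.get(c)) {
                bytes += length;
            }
            // Half a second of emulated work per byte
            System.out.println("consumer " + c + " gets lengths " + result.get(c)
                    + ", about " + (bytes * 0.5) + " s of work");
        }
    }
}
```

With two clients, the first one gets the messages of odd length (25 bytes in total) and the second one those of even length (30 bytes), so the load is split by message count, not by the amount of work each message requires.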

If a client starts after the server has run, it gets none of those messages, since they have already been handed out to the clients that were listening. It will join the distribution of messages on the next producer run.

Even more disturbingly, if a client is killed in the middle of its job, all the messages assigned to it are lost. That is because we have not asked for explicit message acknowledgment on the consumer, so a message counts as done as soon as it is delivered. We can easily switch to explicit acknowledgment by changing the call to Channel.basicConsume() and adding a call to Channel.basicAck() once the current message can safely be considered consumed:

// ...
QueueingConsumer consumer = new QueueingConsumer(channel);
// Second argument is autoAck: false means we acknowledge explicitly
channel.basicConsume(QUEUE_NAME, false, consumer);

while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    doWork(delivery.getBody());

    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
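
To see why the explicit acknowledgment matters, here is a toy in-memory model of the idea. It is not how the broker is implemented, just a sketch under the assumption that a delivered message is remembered until it is acknowledged, and put back on the queue if the consumer dies first. The class AckSketch and all its methods are hypothetical names used only here.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Toy model, not RabbitMQ code: class and method names are hypothetical.
public class AckSketch {
    private final Deque<String> ready = new ArrayDeque<>();
    private final Map<Long, String> unacked = new HashMap<>();
    private long nextTag = 1;

    void publish(String message) {
        ready.addLast(message);
    }

    // Hands out the next message and remembers it until it is acknowledged.
    // Returns the delivery tag, or -1 if there is nothing to deliver.
    long deliver() {
        String message = ready.pollFirst();
        if (message == null) {
            return -1;
        }
        long tag = nextTag++;
        unacked.put(tag, message);
        return tag;
    }

    // The consumer is done with the message: forget it for good.
    void ack(long tag) {
        unacked.remove(tag);
    }

    // The consumer died: whatever was not acknowledged goes back on the queue.
    void consumerDied() {
        ready.addAll(unacked.values());
        unacked.clear();
    }

    int readyCount() {
        return ready.size();
    }

    public static void main(String[] args) {
        AckSketch queue = new AckSketch();
        queue.publish("X");
        queue.publish("XX");

        long first = queue.deliver();
        queue.ack(first);      // "X" has been worked on and acknowledged

        queue.deliver();       // "XX" is delivered...
        queue.consumerDied();  // ...but the consumer is killed before the ack

        System.out.println(queue.readyCount()); // prints 1: "XX" is available again
    }
}
```

Without the bookkeeping on unacknowledged messages, the second message would simply be gone once delivered, which is what happens in the first version of the receiver.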

This post is based on the second episode of the official Java RabbitMQ tutorial. I changed the code a bit, according to my tastes. The complete Java source code for this class is available on GitHub.
