Filtering RabbitMQ messages by patterns

The RabbitMQ direct exchange type is surely an improvement over the fanout exchange but, when we need a higher degree of flexibility, there is a more powerful exchange type, topic, that can come to our help.

A topic routing key should follow a specific pattern: a list of words separated by dots. The consumer can use a couple of wildcards in its binding key: the star (*), which stands for exactly one word, and the hash (#), which stands for zero or more dot-separated words.

So "one.*" would match "one.two", but not "one.two.three", that would be instead a match for "one.#".

The producer here generates a few messages with routing keys that follow the rule quality.color.animal, and then sends an empty message with the control routing key to terminate the clients' execution:
private static final String EXCHANGE_NAME = "logDirect";

private enum Quality { Quick, Lazy, Quiet }
private enum Color { Blue, Red, Yellow }
private enum Animal { Elephant, Rabbit, Fox }
private static final String RK_CONTROL = "Control";

private void producer() {
    // ...
    try {
        // ...
        channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 1

        for(Quality q: Quality.values())
            for(Color c: Color.values())
                for(Animal a: Animal.values()) {
                    String key = q.name() + '.' + c.name() + '.' + a.name(); // 2
                    channel.basicPublish(EXCHANGE_NAME, key, null, "Hello".getBytes());
                    System.out.println("Sent message using routing key " + key);
                }
        channel.basicPublish(EXCHANGE_NAME, RK_CONTROL, null, null);
        // ...
1. The exchange is declared as a topic.
2. For example, Quick.Red.Fox is one of the keys generated in this triple for loop.

The consumer accepts a topic as input, which is used to bind the temporary queue created locally to the exchange:
private void consumer(String topic) {
    // ...
    try {
        // ...
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = channel.queueDeclare().getQueue();

        channel.queueBind(queueName, EXCHANGE_NAME, RK_CONTROL);
        channel.queueBind(queueName, EXCHANGE_NAME, topic);
        // ...

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            byte[] body = delivery.getBody();
            String key = delivery.getEnvelope().getRoutingKey();
            if(key.compareTo(RK_CONTROL) == 0 && body.length == 0) {
                System.out.println("Control terminator detected.");
                break;
            }
        // ...
You could try to run this producer-consumer couple, passing a different input string to the consumer each time and checking the result. The client (consumer) should be up before the server (producer) starts, otherwise all the published messages will be lost.

Try launching it with a parameter like Quiet.#, and you should see as output all nine messages generated by the server whose key starts with Quiet. An input pattern like *.Red.* will result in the nine messages having Red as the second word in the key, including Quick.Red.Fox. A pattern like Lazy.* won't give back any message, since it would match only two-word keys starting with Lazy, while our server currently generates only three-word keys.
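Just as a hint of how the pieces could be wired together, here is a minimal sketch of a possible entry point; the class name and the argument handling are my own assumptions, not taken from the actual code on github:
public static void main(String[] args) {
    TopicLogs app = new TopicLogs(); // hypothetical class name, the real one could differ
    if (args.length > 0)
        app.consumer(args[0]); // e.g. "Quiet.#" or "*.Red.*"
    else
        app.producer();
}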

The full Java class source code is available on github. This post is based on the fifth installment of the official RabbitMQ tutorial.


RabbitMQ direct exchange

We have seen how to use a fanout exchange, now it is time to see a direct exchange at work. We are going to implement a routing messaging pattern, where a producer emits messages specifying a routing key, and the consumers accept all and only the messages associated with a specific routing key. In this sense, we can say that in this scenario a consumer receives messages selectively.

The producer sends messages with a few routing keys symbolizing a different severity for the associated message. Moreover, a special routing key is defined for control messages that could be used for internal management:
private static final String EXCHANGE_NAME = "logsDirect"; // 1
private enum Severity { Debug, Info, Warning, Error }; // 2
private static final String RK_CONTROL = "Control";

private void producer() {
    // ...
    
    try {
        // ...
        channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 3

        for(Severity s: Severity.values()) {
            channel.basicPublish(EXCHANGE_NAME, s.name(), null, "Hello".getBytes()); // 4
            System.out.println("Sending message with severity " + s.name());
        }
        channel.basicPublish(EXCHANGE_NAME, RK_CONTROL, null, null); // 5
        // ...
1. A new exchange is used.
2. This application uses five streams of messages: four different log severity levels, plus a stream of control messages.
3. The exchange type is "direct". This means that a message goes to the queues whose binding key exactly matches the routing key of the message. A direct exchange becomes equivalent to a fanout one when all the queues use the same binding key.
4. Publish to an exchange specifying the routing key.
5. Sending an empty message for the control routing key.

The consumer subscribes to the routing keys passed by the user; besides, each consumer gets the control messages:
private void consumer(String[] subscriptions) { // 1
    // ...
    try {
        // ...
        channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 2
        String queueName = channel.queueDeclare().getQueue(); // 3

        channel.queueBind(queueName, EXCHANGE_NAME, RK_CONTROL); // 4
        for(String subscription: subscriptions) { // 5
            boolean matching = false;
            for(Severity sev: Severity.values()) {
                if(subscription.equalsIgnoreCase(sev.name())) {
                    channel.queueBind(queueName, EXCHANGE_NAME, sev.name()); // 6
                    System.out.println("Subscribing to " + sev.name() + " messages");
                    matching = true;
                    break;
                }
            }
            if(!matching) // 7
                System.out.println(subscription + " severity not available");
        }

        // ...
        
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String key = delivery.getEnvelope().getRoutingKey(); // 8
            byte[] body = delivery.getBody();
            if(key.compareTo(RK_CONTROL) == 0 && body.length == 0) { // 9
                System.out.println("Control terminator detected.");
                break;
            }
        // ...
1. Each passed string, if it matches a value of the Severity enum, is used to subscribe to the corresponding routing key.
2. The exchange is declared with the same type used by the producer.
3. A temporary queue is created, see the previous post for details.
4. All consumers subscribe to the control message flow.
5. Check all the strings passed by the user, to set the custom subscriptions.
6. Bind the queue used by this consumer to this routing key.
7. The string passed by the user was not recognized, let's issue a warning.
8. Extract the routing key, as set by the producer, from the delivered message.
9. A control empty message is here conventionally considered as a terminator.

The full Java class code is available on github. This post is based on the fourth installment of the official RabbitMQ tutorial.


Pub-Sub with RabbitMQ

The publish-subscribe messaging pattern is used when we want each consumer to get all the messages published by the producer. We have many clients acting as subscribers to a single publisher.

Up to the previous post, we used a simplified version of the Rabbit messaging schema: the producer was connected directly to the queue accessed by the consumers. Usually we add an extra layer, called "exchange", that makes it easier to manage more complex models.

There are four exchange types: direct, topic, headers, fanout. Here we are interested in the last one, the fanout, which broadcasts all the messages received by the exchange to each queue bound to it.

The main change in the producer is that we declare and use an exchange instead of a queue:

private static final String EXCHANGE_NAME = "logs";

public void producer() {
    // ...
    try {
        // ...
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 1

        System.out.println("Sending a message");
        channel.basicPublish(EXCHANGE_NAME, "", null, "a message".getBytes()); // 2
        System.out.println("Sending an empty message");
        channel.basicPublish(EXCHANGE_NAME, "", null, null); // 3
        // ...
}
1. In this way we create a fanout exchange. After this, we can see that exchange on the rabbit server, using the command
rabbitmqctl list_exchanges
If we are not declaring an exchange of our own, we fall back to the default nameless ("") one, as we have already seen in the previous examples.
2. Publishing a message to an exchange is not much different from publishing to a queue. As you should remember, publishing to the default exchange is just a matter of specifying a queue instead:
channel.basicPublish("", "hello", null, message.getBytes());
3. I stick to the convention of considering an empty message as a terminator for the subscribers.

On the client side, it is interesting to see how a temporary queue is created, one for each consumer, and used:
public void consumer() {
    // ...

    try {
        // ...
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        String myQueue = channel.queueDeclare().getQueue(); // 1
        channel.queueBind(myQueue, EXCHANGE_NAME, ""); // 2

        System.out.println("Consumer waiting for messages.");
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(myQueue, true, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            byte[] body = delivery.getBody();
            if(body.length == 0){
                System.out.println("Empty message received: terminating.");
                break;
            }

            String message = new String(body);
            System.out.println("Received: " + message);
        }

        // ...
1. We delegate to the channel the task of creating a non-durable, exclusive, autodelete queue with an internally generated random unique name, which is returned so that we can use it below (see the sketch after this list).
2. This is how we bind a queue to an exchange. To see the currently active bindings, we can use this command:
rabbitmqctl list_bindings
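Just as a reference, the parameterless queueDeclare() used above should be roughly equivalent to this explicit form (server-generated name, non-durable, exclusive, autodelete, no extra arguments):
// queueDeclare(queue, durable, exclusive, autoDelete, arguments)
String myQueue = channel.queueDeclare("", false, true, true, null).getQueue();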
The complete code for this example is available on github. The post is based on the third part of the RabbitMQ official tutorial.


Persistent and fair RabbitMQ Work Queue

My first version of a Rabbit work queue has a few issues. Most noticeably, it is unfair and not persistent.

The messages are assigned to the consumers as the producer puts them on the queue, using round robin scheduling. This could lead to an unbalanced load on a specific consumer, if it gets a share of messages implying a longer processing time.

We have seen how to let consumers signal when they are done with a specific message, so that the queue can redeliver it to another consumer when required. But what if the Rabbit server crashes?

Fair Work Queue

To see how unfairness could make the system less attractive, I have messed up the generated message lengths a bit:
String message = sb.toString();
if(i % 2 == 0)
    message += message;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
The solution to the unfairness of round robin in this context is telling the server not to dispatch more than one message at a time to each consumer, setting the prefetch count to one:
channel.basicQos(1);
In this way we are saying that each consumer should be assigned just one unacknowledged message at a time, distributing the work on a first come, first served basis. We also get a pleasant side effect: now clients can be started after the server. Each client grabs just one message at a time so, as long as there are messages pending in the queue, new consumers can enter the game.
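To make the fair dispatching effective, the prefetch setting goes together with the manual acknowledgment already seen in the previous post; a minimal sketch of how the relevant consumer code could look:
channel.basicQos(1); // don't dispatch a new message until the previous one is acknowledged

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, false, consumer); // autoAck turned off

while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    doWork(delivery.getBody());
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); // done with this message
}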

Persistent Work Queue

To emulate a server crash we can stop and restart the rabbit service:
rabbitmq-service stop
rabbitmq-service start
If you do that while a client is running on a Rabbit queue, you should get feedback like this:
4: ....
5: .....
Exception in thread "main" com.rabbitmq.client.AlreadyClosedException: clean con
nection shutdown; reason: Attempt to use closed channel
        at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:190)
...
When the server restarts, we have the sad surprise of having no more queue, and no more messages.

Durable queue

First part of the problem: we want the queue to be durable, so that it won't disappear on a server crash:
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
Pay attention to the fact that all users should declare the queue in the same way, otherwise you will experience an exception like this:
java.io.IOException
        at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
        at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
...
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; reason: {
#method(reply-code=406, reply-text=PRECONDITION_FAILED - paramete
rs for queue 'task' in vhost '/' not equivalent, class-id=50, method-id=10), nul
l, "[B@140fee"}
        at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
...

Persistent message

Each message we want to survive a server crash should be declared as persistent:
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
Combining durability, persistence, and the already seen acknowledgment that a client sends when it has completed consuming a message, we get a good level of robustness for our message exchange. Still, there are some gray areas where a message could get lost. When a higher level of safety is required, transactions should be used.
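Just to give an idea, a transactional publication could be sketched like this, assuming the same QUEUE_NAME and message used above; whether the extra overhead is worth it depends on the actual requirements:
channel.txSelect(); // switch the channel to transactional mode
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
channel.txCommit(); // the broker takes ownership of the message only when the commit succeeds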

This post, like the previous one, is based on the second official RabbitMQ tutorial lesson. The complete Java source code for the class shown here is on github.


RabbitMQ Work Queue

The Hello Rabbit example is fine as a first look at the RabbitMQ messaging system. But now it is time to see something a bit more useful. Here we'll see an application where the producer generates a few messages and puts them on a queue. The consumer uses those messages to run a task. The twist is that we can have many consumers, and each of them will get its quota of messages.

The first implementation is very close to the Hello RabbitMQ example. We'll have it running, check the results, and then try to change it to get a more interesting behavior.

The main change in the producer is that it now sends more messages to the queue. I didn't care much about the message content, and opted for a simple growing sequence of X's:
// ...
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

StringBuilder sb = new StringBuilder();
for(int i = 0; i < 10; ++i) {
    sb.append("X");

    String message = sb.toString();
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

    System.out.println(message);
}
// ...
In this scenario it doesn't make much sense for the producer to send a terminator, given that it doesn't know how many consumers are listening on the queue.

The receiver now does some work on the received message. The producer does not send a terminator, so we loop indefinitely waiting for the next delivery. An interrupt is the expected way of terminating this receiver.
// ...
while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    doWork(delivery.getBody());
}
The job done by the receiver is emulated by a sleep of half a second for each byte in the message body:
// ...
private void doWork(byte[] task) throws InterruptedException {
    System.out.print(task.length + ": ");
    for(int i = 0; i < task.length; ++i) {
        Thread.sleep(500);
        System.out.print('.');
    }
    System.out.println();
}
This application is meant to be run following a defined protocol. First we start the clients, which hang on the queue waiting for messages; then the server gets up, delivering messages in a round-robin style to the clients. The clients consume all the messages, and then we are ready for a new server run, or to kill the clients.

If a client starts after the server has run, it has no access to the messages in the queue. It will enter the distribution of messages in the next producer run.

Even more disturbingly, if a client is killed in the middle of its job, all the messages assigned to it are lost. That is because we have set no message acknowledgment on the consumer. We can easily change this behavior, turning off auto-acknowledgment in the call to Channel.basicConsume() and adding a call to Channel.basicAck() when the current message can be safely considered consumed:

// ...
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, false, consumer);

while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    doWork(delivery.getBody());

    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}

This post is based on the second episode of the official Java RabbitMQ tutorial. I changed the code a bit, according to my tastes. The complete Java source code for this class is available on github.


Hello RabbitMQ

The current version of RabbitMQ is 2.7.1; you can freely download it from the RabbitMQ site. I have installed it on Windows, following the clear instructions given by the RabbitMQ people, and easily set up a working environment.

The RabbitMQ server is written in Erlang, and that means we need to install Erlang first. So we should go to the Erlang download page and get the latest version, in my case R15B.

After this prelude I had no more issues, and I installed the RabbitMQ server fast.

The next step is installing the Java RabbitMQ client-side support. Very easy, not much to say on this point.

I am ready to create my "Hello" application.

The official RabbitMQ tutorial, episode one, Java flavor, is a very good place to start.

You could just use the provided code as is. Being lazy, I have merged the two-class producer-consumer "hello world" example into a single-class application, and made some minor changes. I named the class SendRec, and put it in the package rmq.

The class has a constant, the queue name, and a main function:
private final static String QUEUE_NAME = "simple";

// ...

public static void main(String[] argv) {
    SendRec sr = new SendRec();

    System.out.println("Sending");
    if(!sr.send())
        return;

    System.out.println("Receiving");
    sr.receive();
}
The idea is pretty straightforward: first we send messages to the queue and, if this works fine, we receive them back.

Let's see how to send messages:
public boolean send() {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost"); // 1

    try {
        Connection connection = factory.newConnection(); // 2

        Channel channel = connection.createChannel(); // 3
        channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 4
        String message = "A message"; // 5
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); // 6
        System.out.println(message);

        channel.basicPublish("", QUEUE_NAME, null, null); // 7
        System.out.println("Empty message");

        channel.close(); // 8
        connection.close();
    } catch (IOException e) {
        System.out.println("Error sending messages: " + e.getMessage());
        return false;
    }
    return true;
}
1. Usually we pass more information to a connection factory, but here we are working on a local vanilla setup, so we can rely on the default RabbitMQ setting.
2. Given a factory we can create a connection.
3. And given a connection we can create a communication channel.
4. The simplest declaration requires the name of the queue we want to use, a bunch of false flags, and a final null (see the sketch after this list). We'll see more on this in the near future.
5. This is the message that we want to deliver.
6. And this is the simplest way to publish a message. Notice the queue name in second position and the data sent as last parameter. We send just a bare byte array.
7. In this way we send an empty message.
8. We won't use this channel anymore in the current session, so we close it. Actually, we could save some typing here, because closing the connection automatically closes the open channels within it. But it is better to be more explicit, especially in example code.
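About point 4, here is the same declaration with the meaning of each parameter spelled out as a comment; the flag values are simply the ones used in this example:
// queueDeclare(queue name, durable, exclusive, autoDelete, extra arguments)
channel.queueDeclare(QUEUE_NAME, false, false, false, null);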

And here is how to receive the messages:
public boolean receive() {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");

    try {
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 1

        QueueingConsumer consumer = new QueueingConsumer(channel); // 2
        channel.basicConsume(QUEUE_NAME, true, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery(); // 3
            byte[] body = delivery.getBody(); // 4
            if(body.length == 0) { // 5
                System.out.println("Empty message received");
                break;
            }

            String message = new String(body); // 6
            System.out.println(message);
        }

        channel.close(); // 7
        connection.close();
    } catch (IOException|InterruptedException e ) { // 8
        System.out.println("Error getting messages: " + e.getMessage());
        return false;
    }
    return true;
}
}
1. Just the same as before, factory, connection, channel.
2. But here we need another object, a queue consumer, associated with the channel in use.
3. Get the next message pending on the queue.
4. In the delivery body we find the original message.
5. Here an empty message is considered a terminator.
6. The message is not empty, let's use it.
7. As above, before exiting we close channel and connection.
8. Java 7 feature, more exception types caught in one shot.

To run the application, I wrote a one-liner script like this:
java -cp .;C:\dev\rmq\commons-io-1.2.jar;C:\dev\rmq\commons-cli-1.1.jar;C:\dev\rmq\rabbitmq-client.jar rmq.SendRec
In your case the path to the RabbitMQ JARs could be different, as could the fully qualified class name. And remember that this is running on Windows, so the separators in the classpath are semicolons.

Even for this simple example, it could be worth knowing how to use a couple of RabbitMQ server scripts. To check the status of the server we can use:
rabbitmqctl status
To see which queues are currently available on this instance of RabbitMQ server:
rabbitmqctl list_queues
To start and stop the RabbitMQ service:
rabbitmq-service stop|start
The complete Java code for the class shown above is on github.


Copying a multidimensional array

The handy Arrays.copyOf() function helps us perform an array copy elegantly, when cloning is not a viable option.
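For reference, the plain one-dimensional case is a one-liner:
int[] source = { 1, 2, 3, 4 };
int[] copy = Arrays.copyOf(source, source.length); // a brand new array with the same content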

The nuisance is that it is limited to one-dimensional arrays. What if we have to copy a two (or more) dimensional array? We can still use Arrays.copyOf() for the actual data copy, but we have to integrate it with some "by hand" coding, to allocate memory for the outer n-1 dimensions.

I guess that showing a twelve-dimensional example would be funnier, but let's see instead a mere, and more common, two-dimensional array copy.
int[][] source = {
   {1,2,3,4},
   {3,4,5,6},
   {5,6,7,8},
};

int[][] destination = new int[source.length][]; // 1
for(int i = 0; i < destination.length; ++i)
   destination[i] = Arrays.copyOf(source[i], source[i].length); // 2
1. A Java two-dimensional array is nothing more than an array of arrays. What we are doing here is the first step of the copy, allocating enough room in destination to store the arrays that are going to be created in the subsequent for loop.
2. Last step: create a copy of each one-dimensional array and store it in the just allocated slot in destination.
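The same idea could be wrapped in a small utility method; a quick sketch, where the method name deepCopy is my own choice and not a standard library function:
// Arrays is java.util.Arrays, as in the snippet above
static int[][] deepCopy(int[][] source) {
    int[][] destination = new int[source.length][]; // allocate room for the rows
    for (int i = 0; i < source.length; ++i)
        destination[i] = Arrays.copyOf(source[i], source[i].length); // copy each row
    return destination;
}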


Limited String.split()

The Java String split() method is fun and useful, but I never thought of it as interesting. And I was wrong. For instance, until the other day I never paid much attention to its two-parameter overload, which limits the number of tokens generated.

Basic splitting

Usually we use the one-argument overload of String.split(), which converts the string into an array of Strings, using the passed parameter as a delimiter.

Let's have a look at this code, where s is a String:
System.out.print("Split [");
for(String c : s.split("/")) {
    System.out.print('_' + c + '_' );
}
System.out.println("]");
If in s we put "alpha/beta/gamma/delta/tango/foxtrot//" we expect
Split [_alpha__beta__gamma__delta__tango__foxtrot_]
The slashes delimit six non-empty tokens, and the trailing empty strings are discarded, so we get an array of six Strings in output.

Passing "/alpha/beta/gamma/delta/tango/foxtrot" to the same code we get
Split [___alpha__beta__gamma__delta__tango__foxtrot_]
Seven tokens! The first slash is considered a delimiter between an empty element and "alpha".

Limited splitting

A bit of refactoring on the previous code, which now uses the other split() overload with an integer limit, i, and adds a counter to the output to make it clearer:
String[] c = s.split("/", i);
for(int j = 0; j < c.length; ++j) {
    System.out.print(j + ":_" + c[j] + '_');
}
System.out.println();
Negative limit

If i is -1 (or any negative number), passing "alpha/beta/gamma/delta/tango/foxtrot//" we get
0:_alpha_1:_beta_2:_gamma_3:_delta_4:_tango_5:_foxtrot_6:__7:__
Even the two empty elements at the end are generated!

If the input is set to "/alpha/beta/gamma/delta/tango/foxtrot" the result is not so interesting: no changes compared to the basic version.

Zero limit

In this case the extended version of split() behaves like the basic one: we get exactly the same results.

Positive limit

Here we are asking split() to generate at most a well-defined number of tokens.

If the input string s is "alpha/beta/gamma/delta/tango/foxtrot//" we get these results, varying i from 1 to 4:
0:_alpha/beta/gamma/delta/tango/foxtrot//_
0:_alpha_1:_beta/gamma/delta/tango/foxtrot//_
0:_alpha_1:_beta_2:_gamma/delta/tango/foxtrot//_
0:_alpha_1:_beta_2:_gamma_3:_delta/tango/foxtrot//_
Same behavior for "/alpha/beta/gamma/delta/tango/foxtrot":
0:_/alpha/beta/gamma/delta/tango/foxtrot_
0:__1:_alpha/beta/gamma/delta/tango/foxtrot_
0:__1:_alpha_2:_beta/gamma/delta/tango/foxtrot_
0:__1:_alpha_2:_beta_3:_gamma/delta/tango/foxtrot_
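A minimal self-contained class to play with the limit values could look like this; the string and the limits are just the ones used in the examples above:
public class SplitDemo {
    public static void main(String[] args) {
        String s = "alpha/beta/gamma/delta/tango/foxtrot//";
        for (int limit : new int[] { -1, 0, 3 }) { // negative, zero, and positive limits
            String[] c = s.split("/", limit);
            System.out.print("limit " + limit + " -> ");
            for (int j = 0; j < c.length; ++j)
                System.out.print(j + ":_" + c[j] + '_');
            System.out.println();
        }
    }
}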
