Filtering RabbitMQ messages by patterns

The RabbitMQ direct exchange type is surely an improvement over the fanout exchange, but when we need a higher degree of flexibility, there is a more powerful exchange type, topic, that could come in our help.

The topic routing key should follow a specific pattern, a list of words separated by dots. The consumer could use a couple of wildcards: the star (*), that is considered a synonim for any single word, and the hash (#) for any number of dot-separated words.

So "one.*" would match "one.two", but not "one.two.three", that would be instead a match for "one.#".

The producer here generates a few messages for routing keys that follows the rule quality.color.animal, and then sends an empty message with the control routing key to terminate the clients execution:
private static final String EXCHANGE_NAME = "logDirect";

private enum Quality { Quick, Lazy, Quiet }
private enum Color { Blue, Red, Yellow }
private enum Animal { Elephant, Rabbit, Fox }
private static final String RK_CONTROL = "Control";

private void producer() {
    // ...
    try {
        // ...
        channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 1

        for(Quality q: Quality.values())
            for(Color c: Color.values())
                for(Animal a: Animal.values()) {
                    String key = q.name() + '.' + c.name() + '.' + a.name(); // 2
                    channel.basicPublish(EXCHANGE_NAME, key, null, "Hello".getBytes());
                    System.out.println("Sent message using routing key " + key);
                }
        channel.basicPublish(EXCHANGE_NAME, RK_CONTROL, null, null);
        // ...
1. The exchange is declared as a topic.
2. For example, Quick.Red.Fox is one of the keys generated in this triple for loop.

The consumer accepts in input a topic, that will be used in the binding between the client and the temporary queue created locally:
private void consumer(String topic) {
    // ...
    try {
        // ...
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = channel.queueDeclare().getQueue();

        channel.queueBind(queueName, EXCHANGE_NAME, RK_CONTROL);
        channel.queueBind(queueName, EXCHANGE_NAME, topic);
        // ...

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            byte[] body = delivery.getBody();
            String key = delivery.getEnvelope().getRoutingKey();
            if(key.compareTo(RK_CONTROL) == 0 && body.length == 0) {
                System.out.println("Control terminator detected.");
                break;
            }
        // ...
You could try to run this producer-consumer couple passing each time a different input string to the consumer, checking for the result. The client should be up before the server starts, otherwise all its messages will be lost.

Try launching it with a parameter like Quiet.#, and you should see as output all nine messages generated by the server that have a key starting with Quiet; an input pattern like *.Red.* will result in the nine messages having Red as second word in the key, including the Quick.Red.Fox; a wrong input like Lazy.* won't give back any message, since it would match only with two-word keys having Lazy at the beginning, but currently our server generates only three word keys.

The full Java class source code is available on github. This post is based on fifth installment of the official RabbitMQ tutorial.

No comments:

Post a Comment