RabbitMQ direct exchange

We have seen how to use a fanout exchange, now it is time to see a direct exchange at work. We are going to implement a router messaging pattern, where a producer emits messages specifying a routing key, and the consumers accepts all and only the messages that are associated to a specific routing key. In this sense we can say that in this scenario a consumer receives messages selectively.

The producer sends messages with a few routing keys symbolizing a different severity for the associated message. Moreover a special routing key is defined for control messages that could be used to internal management:
private static final String EXCHANGE_NAME = "logsDirect"; // 1
private enum Severity { Debug, Info, Warning, Error }; // 2
private static final String RK_CONTROL = "Control";

private void producer() {
    // ...
    
    try {
        // ...
        channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 3

        for(Severity s: Severity.values()) {
            channel.basicPublish(EXCHANGE_NAME, s.name(), null, "Hello".getBytes()); // 4
            System.out.println("Sending message with severity " + s.name());
        }
        channel.basicPublish(EXCHANGE_NAME, RK_CONTROL, null, null); // 5
        // ...
1. A new exchanges is used.
2. This application uses five streams of messages, four different severity log message levels, and a stream of control messages.
3. The exchange type is "direct". This means that a message goes to the queues whose binding key exactly matches the routing key of the message. Direct exchange becomes equivalent to fanout when all the queues uses the same routing key.
4. Publish to an exchange specifying the routing key.
5. Sending an empty message for the control routing key.

The consumer subscribes to the routing key passed by the user, besides, each consumer gets the control messages:
private void consumer(String[] subscriptions) { // 1
    // ...
    try {
        // ...
        channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 2
        String queueName = channel.queueDeclare().getQueue(); // 3

        channel.queueBind(queueName, EXCHANGE_NAME, RK_CONTROL); // 4
        for(String subscription: subscriptions) { // 5
            boolean matching = false;
            for(Severity sev: Severity.values()) {
                if(subscription.equalsIgnoreCase(sev.name())) {
                    channel.queueBind(queueName, EXCHANGE_NAME, sev.name()); // 6
                    System.out.println("Subscribing to " + sev.name() + " messages");
                    matching = true;
                    break;
                }
            }
            if(!matching) // 7
                System.out.println(subscription + " severity not available");
        }

        // ...
        
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String key = delivery.getEnvelope().getRoutingKey(); // 8
            byte[] body = delivery.getBody();
            if(key.compareTo(RK_CONTROL) == 0 && body.length == 0) { // 9
                System.out.println("Control terminator detected.");
                break;
            }
        // ...
1. Each passed string, if matching with the specified Severity enum, is used to subscribe to a specified routing key.
2. The exchange is declared of the same producer type.
3. A temporary queue is created, see previous post for details.
4. All consumers subscribe to the control message flow.
5. Check all the strings passed by the user, to set the custom subscriptions.
6. Bind the queue used by this consumer to this routing key.
7. The string passed by the user was not recognized, let's issue a warning.
8. Extract the routing key, as set by the producer, from the delivered message.
9. A control empty message is here conventionally considered as a terminator.

The full Java class code is available on github. This post is based on fourth installment of the official RabbitMQ tutorial.

1 comment:

  1. Hi Dude,

    RabbitMQ is open source solution providing robust messaging for applications. It is easy to use, fit for purpose at cloud scale and supported on all major operating systems and developer platforms. Thanks a lot!

    ReplyDelete