Pub-Sub with RabbitMQ

The publish-subscribe messaging pattern is used when we want each consumer to get every message published by the producer. Many clients act as subscribers to a single publisher.

Up to the previous post, we used a simplified version of the RabbitMQ messaging model. There, the producer was connected directly to the queue accessed by the consumers. Usually we add an extra layer, called an "exchange", that makes more complex models easier to manage.

There are four exchange types: direct, topic, headers, and fanout. Here we are interested in the last one, fanout, which broadcasts every message received by the exchange to each queue bound to it.
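
To make the broadcast behavior concrete, here is a minimal in-memory sketch of what a fanout exchange does. It is not RabbitMQ code: the class FanoutModel and its methods are hypothetical names of mine, and plain Java collections stand in for the broker's queues.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical in-memory model of a fanout exchange (not the RabbitMQ API).
class FanoutModel {
    // Queues currently bound to this exchange.
    private final List<Queue<String>> boundQueues = new ArrayList<>();

    // Create a fresh queue and bind it, as each consumer does with its own queue.
    Queue<String> bindNewQueue() {
        Queue<String> queue = new ArrayDeque<>();
        boundQueues.add(queue);
        return queue;
    }

    // A fanout exchange ignores the routing key and copies the message
    // to every queue bound at the time of publishing.
    void publish(String routingKey, String message) {
        for (Queue<String> queue : boundQueues) {
            queue.add(message);
        }
    }

    public static void main(String[] args) {
        FanoutModel logs = new FanoutModel();
        Queue<String> first = logs.bindNewQueue();
        Queue<String> second = logs.bindNewQueue();
        logs.publish("", "a message");
        System.out.println(first.peek() + " / " + second.peek());
    }
}
```

Both queues end up holding their own copy of the message. In this model a queue bound after the publish call stays empty, since the exchange itself stores nothing.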

The main change in the producer is that we declare and use an exchange instead of a queue:

private static final String EXCHANGE_NAME = "logs";

public void producer() {
    // ...
    try {
        // ...
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 1

        System.out.println("Sending a message");
        channel.basicPublish(EXCHANGE_NAME, "", null, "a message".getBytes()); // 2
        System.out.println("Sending an empty message");
        channel.basicPublish(EXCHANGE_NAME, "", null, null); // 3
        // ...
    } // ... catch/finally elided
}
1. This creates a fanout exchange. Afterwards, we can see the exchange on the RabbitMQ server using the command
rabbitmqctl list_exchanges
When we do not declare an exchange of our own, we rely on the default nameless ("") one, as we did in the previous examples.
2. Publishing a message to an exchange is not very different from publishing to a queue. As you may remember, publishing through the default exchange just means passing an empty exchange name and using the queue name as the routing key:
channel.basicPublish("", "hello", null, message.getBytes());
3. I stick to the convention of considering an empty message as a terminator for the subscribers.
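
For contrast with the fanout case, this small sketch models how the default exchange picks its target: the routing key is matched against the queue name, so only one queue gets the message. DefaultExchangeModel and its methods are hypothetical names used for illustration, not part of the RabbitMQ client.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical in-memory model of the default ("") exchange (not the RabbitMQ API).
class DefaultExchangeModel {
    // Every declared queue is reachable under its own name.
    private final Map<String, Queue<String>> queuesByName = new HashMap<>();

    Queue<String> declareQueue(String name) {
        return queuesByName.computeIfAbsent(name, n -> new ArrayDeque<>());
    }

    // Deliver only to the queue named by the routing key; report whether one existed.
    boolean publish(String routingKey, String message) {
        Queue<String> target = queuesByName.get(routingKey);
        if (target == null) {
            return false; // no queue with that name: nothing is delivered in this model
        }
        target.add(message);
        return true;
    }

    public static void main(String[] args) {
        DefaultExchangeModel exchange = new DefaultExchangeModel();
        Queue<String> hello = exchange.declareQueue("hello");
        Queue<String> other = exchange.declareQueue("other");
        exchange.publish("hello", "a message");
        System.out.println(hello.size() + " " + other.size());
    }
}
```

Here only the "hello" queue receives the message, which is why the fanout producer can leave the routing key empty while the earlier examples had to name the queue.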

On the consumer side, the interesting part is how each consumer creates and uses its own temporary queue:
public void consumer() {
    // ...

    try {
        // ...
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        String myQueue = channel.queueDeclare().getQueue(); // 1
        channel.queueBind(myQueue, EXCHANGE_NAME, ""); // 2

        System.out.println("Consumer waiting for messages.");
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(myQueue, true, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            byte[] body = delivery.getBody();
            if (body.length == 0) {
                System.out.println("Empty message received: terminating.");
                break;
            }

            String message = new String(body);
            System.out.println("Received: " + message);
        }

        // ...
    } // ... catch/finally elided
}
1. We delegate to the channel the task of creating a non-durable, exclusive, auto-delete queue. A random unique name is generated for it and returned, so that we can use it below.
2. This is how we bind a queue to an exchange. To see the currently active bindings, we can use this command:
rabbitmqctl list_bindings
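
The receive loop in the consumer can be tried out without a broker. In this sketch a java.util.concurrent.BlockingQueue stands in for the consumer's stream of deliveries, and the hypothetical helper drainUntilEmpty applies the same empty-body terminator convention. As an assumption of mine, it decodes the body explicitly as UTF-8 instead of relying on the platform default charset, which is what new String(body) does.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the consumer loop; no RabbitMQ classes involved.
class TerminatorLoop {
    // Take message bodies until an empty one arrives, returning the decoded texts.
    static List<String> drainUntilEmpty(BlockingQueue<byte[]> deliveries) {
        List<String> received = new ArrayList<>();
        while (true) {
            byte[] body;
            try {
                body = deliveries.take(); // blocks until a body is available
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
                break;
            }
            if (body.length == 0) {
                break; // empty message: the publisher asked us to stop
            }
            received.add(new String(body, StandardCharsets.UTF_8));
        }
        return received;
    }

    public static void main(String[] args) {
        BlockingQueue<byte[]> deliveries = new LinkedBlockingQueue<>();
        deliveries.add("a message".getBytes(StandardCharsets.UTF_8));
        deliveries.add(new byte[0]); // terminator
        System.out.println("Received: " + drainUntilEmpty(deliveries));
    }
}
```

Anything queued after the terminator is left untouched, just as the real consumer stops reading once it breaks out of its loop.
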
The complete code for this example is available on GitHub. The post is based on the third part of the official RabbitMQ tutorial.
