JMS Request-Reply

Implementing the Request-Reply messaging pattern with ActiveMQ is not complicated. The client sends a request to the server on a dedicated queue, attaching to it, as properties, the destination queue where it expects to find the reply and a correlation id that uniquely identifies the request-reply couple. The server uses this information to generate a consistent reply.

The complete Java source code is on github; here I comment on just what I think are the most interesting parts of the code.

This is the client, after establishing a connection to the broker:
Destination destination = session.createQueue(REQUEST_QUEUE_NAME); // 1
MessageProducer producer = session.createProducer(destination);
Destination dRep = session.createTemporaryQueue(); // 2
TextMessage message = session.createTextMessage(arg);
message.setJMSCorrelationID(UUID.randomUUID().toString()); // 3
message.setJMSReplyTo(dRep); // 4
producer.send(message);
// ...
MessageConsumer consumer = session.createConsumer(dRep); // 5
Message reply = consumer.receive(); // 6
// ...
1. The queue where the requests are sent.
2. A second queue, for the replies. We could have used a "normal" queue, but it is more common to create a cheaper temporary queue for this task.
3. We could keep track by hand of the unique message ids generated by the application, or we could delegate the generation of a unique id to Java, as I did here.
4. The queue to be used as destination for the reply is stored in the JMSReplyTo message property.
5. A consumer is created on the reply queue.
6. And the client patiently waits for an answer from the server.

The interesting part of the server is where it replies to the client:
if(message instanceof TextMessage) {
    TextMessage answer = session.createTextMessage(); // 1
    answer.setText("Reply to " + ((TextMessage) message).getText());
    answer.setJMSCorrelationID(message.getJMSCorrelationID()); // 2
    MessageProducer producer = session.createProducer(null); // 3
    producer.send(message.getJMSReplyTo(), answer);
}
1. The message to be sent as answer.
2. The correlation id is sent back to the client.
3. A producer with no associated queue is created; we are going to explicitly set the destination in its send() method, using the JMSReplyTo message property.


Embedding a broker by BrokerService

Instead of having the ActiveMQ broker as a standalone process, we could decide to embed it in one of our Java processes. The other processes would access it in the same way as before, but threads within the hosting process could send and receive messages through the faster internal vm protocol.

A possible advantage of this solution is that the broker can be programmatically configured before it starts, and we can add logic to determine when to stop it:
BrokerService broker = new BrokerService();
broker.setBrokerName("myBroker");
broker.setDataDirectory("data/"); // 1
// ...
broker.addConnector("tcp://localhost:61616"); // 2
broker.start(); // 3
// ...
broker.stop(); // 4
// ...
1. With the default setup, a folder for the broker is created in the data directory (using its name, in this case "myBroker"), containing a folder where the KahaDB files for message persistence are stored.
2. Lets our broker mimic a standard one, accepting TCP connections on the default port.
3. When the broker setup is completed, we start it.
4. Shutting down the broker.
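With the broker embedded, clients living in the same JVM can reach it through the vm transport instead of TCP. A sketch of building the connection URI; the create=false option is my assumption here, telling ActiveMQ not to implicitly spawn a second broker:

```java
public class VmTransportUri {
    public static void main(String[] args) {
        String brokerName = "myBroker"; // must match broker.setBrokerName()
        // In-JVM transport; create=false avoids implicitly starting another broker
        String vmUri = "vm://" + brokerName + "?create=false";
        System.out.println(vmUri);
        // This URI would then be passed to new ActiveMQConnectionFactory(vmUri)
    }
}
```

The ActiveMQConnectionFactory line is left as a comment so the sketch has no dependency on the ActiveMQ jars.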

The complete Java source code for this example is on github.


Selecting messages by custom properties

A JMS message comes with a number of standard properties providing information on the associated message. For example, each message has a message id that can be fetched by calling getJMSMessageID(). Besides, we can use custom properties to enrich a message in any way that makes sense for our specific case. An interesting aspect of custom properties is that we can use them to discriminate which messages a consumer should receive.

Think of an application where the producer generates messages for the best offers in our hardware store. The consumer could be interested in all of them, or only in a subset. For this reason, it could be useful to put just a generic description in the message payload, and use custom properties to store structured information, such as the product name and its price.

In this way the consumer can easily specify whether it wants to get all the messages, or create a selector on the custom properties. The selector syntax is a simplified subset of SQL conditional expressions.
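Since the grammar is borrowed from SQL, fairly rich conditions are possible. A few selector strings for illustration; the first two are used later in this post, the compound one is my own example, not taken from the source code:

```java
public class SelectorExamples {
    // Exact match on a string property (note the single quotes around the value)
    static final String BY_NAME = "Description = 'Nails'";
    // Numeric comparison on a double property
    static final String CHEAP = "Price < 15";
    // Conditions can be combined with AND/OR, ranges, and LIKE patterns
    static final String COMPOUND = "Price BETWEEN 5 AND 20 AND Description LIKE 'N%'";

    public static void main(String[] args) {
        // Any of these strings would be passed to
        // session.createConsumer(destination, selector)
        System.out.println(COMPOUND);
    }
}
```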

The complete Java source code of this example is on github. It is written for ActiveMQ, so you should have this MOM installed and working in your development environment.

The producer delegates the most interesting part of its job to the send() method, which creates a message, fills it with specific data, and then sends it to its associated queue:
TextMessage message = session.createTextMessage();
message.setText(txt);
message.setStringProperty(PROP_DES, des); // 1
message.setDoubleProperty(PROP_PRICE, price); // 2
producer.send(message);
1. Create a String property named as defined in the first parameter ("Description"), containing the value stored in the second one ("Pins" and "Nails" in my test example).
2. The PROP_PRICE (actually, "Price") property is of type double.
In this test it is handy to use a MessageListener, as discussed in a previous post, so that all the relevant messages are consumed by its onMessage() method:
logger.info("{}: {}", PROP_DES, message.getStringProperty(PROP_DES)); // 1
logger.info("{}: {}", PROP_PRICE, message.getDoubleProperty(PROP_PRICE)); // 2
logger.info("Message id: {}", message.getJMSMessageID()); // 3
logger.info("Message: {} ", ((TextMessage) message).getText());
1. The PROP_DES property is retrieved from the message, as the String that it is.
2. PROP_PRICE is a double, but we could have managed it as a String, calling getStringProperty(). ActiveMQ knows how to convert the internal double value to a String, so this works fine. If we tried to do it the other way round, extracting a double from (1), we would get a NumberFormatException.
3. There is not much use for this line here, but it is just to show a standard JMS property at work.
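The asymmetry described in (2) is easy to demonstrate without any broker: JMS property conversion behaves much like the plain Java conversions below (a sketch of the rule, not the actual ActiveMQ code):

```java
public class PropertyConversion {
    public static void main(String[] args) {
        // A double property can always be rendered as a String...
        double price = 12.5;
        String asText = Double.toString(price); // roughly what getStringProperty() yields
        System.out.println("Price as text: " + asText);

        // ...but reading a non-numeric String property as a double fails
        try {
            double bad = Double.parseDouble("Nails"); // what getDoubleProperty() would attempt
            System.out.println(bad); // never reached
        } catch (NumberFormatException e) {
            System.out.println("As expected, conversion failed");
        }
    }
}
```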

We need just one fundamental step: telling the JMS Session that the consumer has to get only some of the messages:
switch (arg) {
    case "Nails":
    case "Pins":
        filter = PROP_DES + " = '" + arg + "'"; // 1
        consumer = session.createConsumer(destination, filter);
        break;
    case "Cheap":
        filter = PROP_PRICE + " < 15"; // 2
        consumer = session.createConsumer(destination, filter);
        break;
    default:
        consumer = session.createConsumer(destination); // 3
        break;
}
1. The string "Description = 'Nails'" (or "Description = 'Pins'") is passed in the next line to the session as a message selector for the consumer being created. Only the messages for which that condition is true are delivered to this consumer.
2. Same as above, but the condition is now "Price < 15".
3. A "normal" consumer is created, with no associated selector.


Asynchronous Hello ActiveMQ consumer

It is simple to modify the hello ActiveMQ example to change the consumer from synchronous to asynchronous.

If we want to asynchronously consume a message, we have to create a class that implements the JMS MessageListener interface, something like:
private class MyMessageListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        if(message instanceof TextMessage) {
            try {
                logger.info("Received {} ", ((TextMessage) message).getText());
            } catch (JMSException e) {
                logger.error("Can't extract text from received message", e);
            }
        }
        else
            logger.info("Unexpected non-text message received.");
    }
}
Then the consumer code changes only slightly:
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MyMessageListener()); // 1
try{ Thread.sleep(1000); } catch(InterruptedException e) {} // 2
1. Associate an instance of our listener to the consumer.
2. Usually we should implement a more sophisticated way of keeping the consumer alive until all the relevant messages are consumed, but for this simple example sleeping for a second should be enough.
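One such more sophisticated approach is a CountDownLatch that the listener decrements when the expected messages have arrived; the main thread blocks on it instead of sleeping blindly. Here is a broker-free sketch of the idea, with a plain thread standing in for the JMS listener:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchKeepAlive {
    public static void main(String[] args) throws InterruptedException {
        final CountDownLatch done = new CountDownLatch(1);

        // Stand-in for the listener thread: onMessage() would call done.countDown()
        new Thread(new Runnable() {
            public void run() {
                // ... consume the message ...
                done.countDown();
            }
        }).start();

        // Instead of Thread.sleep(1000): wait until the work is done, or a timeout expires
        boolean completed = done.await(5, TimeUnit.SECONDS);
        System.out.println(completed ? "All messages consumed" : "Timed out");
    }
}
```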

The Java source code for the hello producer-consumer example, both in the synchronous and asynchronous flavor, is on github.


Hello ActiveMQ

Writing a hello application with ZeroMQ is very easy, RabbitMQ requires a bigger effort, and ActiveMQ, while not complicated, is probably the least immediate of the three. This escalation in complexity is reflected also in the system's responsiveness (ZeroMQ is blazing fast and, compared to it, ActiveMQ could seem slow), but it is well repaid by a higher level of offered services.

It is impossible to say in the abstract which Message Queue is "better". It is more a matter of selecting the adequate one for specific environment requirements. A reason to choose ActiveMQ is often that it implements the JMS specification.

In any case, say that we have already decided, and we have installed ActiveMQ on our machine. Now it is time to write a hello application.

ActiveMQ is bundled with SLF4J as its logger. I have talked about the trickiness in their relation in a previous post, so let's assume this is not an issue anymore. If you don't even know what SLF4J is, you could have a look at another post where I have written some notes on its installation.

Producer

The following code is all you need to establish a connection to the ActiveMQ broker and put a text message on a queue:
ConnectionFactory factory = new ActiveMQConnectionFactory(); // 1
Connection connection = null;
try {
    connection = factory.createConnection(); // 2
    connection.start();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 3
    Destination destination = session.createQueue(QUEUE_NAME); // 4
    MessageProducer producer = session.createProducer(destination); // 5
    TextMessage message = session.createTextMessage(arg); // 6
    producer.send(message); // 7
    logger.info("Sending: {}", message.getText()); // 8
} catch(JMSException ex) {
    ex.printStackTrace();
} finally { try { if(connection != null) connection.close(); } catch (JMSException e) {} } // 9
1. Since no address is specified, ActiveMQ assumes we want to connect to the default broker, on localhost:61616.
2. A new connection to the broker is created and, in the next line, started.
3. On a connection we create a session, which could be transactional. Here we don't need that, so we pass "false" as the first parameter. The second parameter specifies whether we want to acknowledge the message consumption explicitly or not. Here we let ActiveMQ take care of it.
4. Where the message is going to be sent. If the queue specified does not exist, ActiveMQ creates it for us.
5. The producer is an object created by the session for a specified destination; it takes care of moving the message from this client to the broker.
6. A text message is created from a String (in this case "arg" is the variable containing it).
7. The producer sends the message.
8. Remember that we are using SLF4J as a logger; if you wonder about the strange syntax in the string, you may be interested in the post I have written on the efficiency reasons for it.
9. Whatever happens in the try block, we want the connection to be closed.

Consumer

Consuming a message on ActiveMQ is not much different than producing it:
MessageConsumer consumer = session.createConsumer(destination); // 1
Message message = consumer.receive(1000); // 2
if(message == null)
    logger.info("No pending message on {} queue.", QUEUE_NAME);
else if(message instanceof TextMessage) // 3
    logger.info("Received: {}", ((TextMessage)message).getText());
else // 4
    logger.info("Unexpected non-text message consumed.");
1. First difference, we create a consumer on the session.
2. A message is consumed. The passed parameter is a timeout in milliseconds. If no message is waiting for us on the queue, and nothing arrives within a second, the consumer returns null.
3. In the producer we created a text message, so we expect it to be of the same type here. If this is what we have, we work with it.
4. If we get here we are perplexed: who put a non-textual message on our queue?

If you want to play with the complete Java source code, you can find it on github.


Installing ActiveMQ

What you need to run ActiveMQ on your machine is a recent Java development kit, and ActiveMQ itself. You can find the latter in the official Apache ActiveMQ download page, and you can get more information on the process in the related Getting Started page.

The ActiveMQ broker is ready to run out of the box, on its default port 61616, simply by running a batch file in its binary folder. In my case, I have installed ActiveMQ on Windows XP (I know, we are in 2012, but this ancient operating system is still alive and kicking) in a folder named C:\dev\apache-activemq-5.5.1, and I run its broker by executing from a shell in that folder the command
bin\activemq
As feedback I get some log information, including a notification of the JVM used and of the started ActiveMQ components and subsystems. For the sake of testing the installation, the two most interesting lines are:
INFO | Listening for connections at: (your machine):61616
...
INFO | ActiveMQ Console at http://0.0.0.0:8161/admin
Whenever you want to check if the connection is still alive, you can run netstat, as I did here:
netstat -na|find "61616"
 TCP    0.0.0.0:61616          0.0.0.0:0              LISTENING
Meaning: port 61616 on localhost is listening for a TCP connection. All is running as expected.

Besides, we can access the ActiveMQ console from our favorite browser at http://localhost:8161/admin, and see there a number of administrative details on the broker.


ActiveMQ 5.5 and SLF4J 1.6

Working on a simple Hello World example for ActiveMQ, I bumped into a conflict between this Apache Message-Oriented Middleware (MOM) and the SLF4J logging system. It is nothing serious, but I guess it is worth going into some detail on this issue.

SLF4J is used as the logger by ActiveMQ. Unfortunately, still in version 5.5, the bundled SLF4J version is 1.5. That's a pity, because starting from SLF4J version 1.6, if you place no binder in your application classpath, the NOP logger is assumed. The effect is that all your log output goes to a logical equivalent of /dev/null, as to say, it disappears into thin air. That's no fun, but better than the behavior of 1.5: crashing miserably with an error like this:
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for
 further details.
Exception in thread "main" java.lang.NoClassDefFoundError: 
    org/slf4j/impl/StaticLoggerBinder
        at org.slf4j.LoggerFactory.getSingleton(LoggerFactory.java:230)
...
So, you really have to plug in a binder. I already had a 1.6 binding at hand, and I tried to use it, but I got another error:
SLF4J: The requested version 1.6 by your slf4j binding is not compatible
 with [1.5.5, 1.5.6, 1.5.7, 1.5.8, 1.5.9, 1.5.10, 1.5.11]
No compatibility.

There are a couple of solutions to this issue. Either you use a binding for that specific version, or you put the SLF4J API jar of your choice in your classpath before the ActiveMQ full jar.

I opted for the second choice, and my classpath now refers to these jars - in this order:
  • slf4j-api-1.6.4.jar
  • activemq-all-5.5.1.jar
  • slf4j-simple-1.6.4.jar
Actually, the SLF4J binding could be in any place, but it is crucial that the SLF4J API jar comes before the ActiveMQ one.


Improved RPC by JSON

If the data exchange is meant to use only strings, the basic RPC support provided by RabbitMQ is more than adequate, and the StringRpcServer class makes it even easier. But what if we want to use other data types? The rabbit answer to this question is JSON-RPC. This RPC protocol based on JSON is implemented in a couple of classes, JsonRpcServer and JsonRpcClient, that take care of the low-level details and leave us free to use RPC in a more natural way.

To get acquainted with it, I went through the JSON-RPC example provided in the test folder of the RabbitMQ source distribution, package com.rabbitmq.examples, files HelloJsonService.java, HelloJsonClient.java, and HelloJsonServer.java. I have reworked it; you can see my commented version in this post, and the full Java source code is available on github.

The service

The JSON-RPC interface between client and server is represented by a service, which exposes to the client the methods available on the server:
public interface JsonSvc {
    String greeting(String name);
    int sum(List<Integer> values);
}
This interface is going to be implemented by a class, which I called MyJsonSvc, used internally by the RPC server.

The client

In the client, the service is used to provide access to the server functionality, as shown here:
// ...
JsonRpcClient client = new JsonRpcClient(channel, "", QUEUE_NAME, RPC_TIMEOUT_ONE_SECOND); // 1
JsonSvc service = (JsonSvc)client.createProxy(JsonSvc.class); // 2

System.out.println(service.greeting("Rabbit"));

List<Integer> numbers = new ArrayList<>();
numbers.add(1);
numbers.add(2);
numbers.add(3);
System.out.println("1 + 2 + 3 = " + service.sum(numbers));
// ...
client.publish(null, null); // 3
1. A JSON-RPC client is created, specifying a channel, a queue, and an optional timeout in milliseconds.
2. This is the core of the example: the rabbit JSON-RPC client creates a proxy for the passed interface, which we can then use in our client as shown in the lines above.
3. We could still use the client as a plain RpcClient, bypassing the JSON layer. Here we are sending an empty message with no associated properties.

The server

Implementing the JSON-RPC capabilities in the server is straightforward:
// ...
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
JsonRpcServer server = new MyRpcServer(channel, QUEUE_NAME, JsonSvc.class, new MyJsonSvc()); // 1

System.out.println("JSON-RPC server is up.");
server.mainloop(); // 2
// ...
1. We create a JsonRpcServer object, specifying the service on which it operates and the interface it should use to access it.
2. Then we start looping on it.

You have surely noticed in the code above that I didn't create a plain JsonRpcServer, but a mysterious MyRpcServer. I did that because I wrote the client to be a bit smarter than usual, calling not only the JsonSvc methods but also sending a plain (empty) message. To give the server a way to manage it as expected (here an empty message is seen as a signal to terminate the server run), I had to add functionality to JsonRpcServer, as we have already seen for StringRpcServer. Actually, all we need is to reimplement handleCast(), so that we can react correctly to an empty message:
// ...
private class MyRpcServer extends JsonRpcServer {
    // ...
    @Override
    public void handleCast(byte[] requestBody)
    {
        if(requestBody.length == 0) {
            System.out.println("Empty message, terminating.");
            terminateMainloop();
        }
    }
}


RPC for strings

Writing a RabbitMQ RPC (Remote Procedure Call) application is not difficult, but we still have to deal with a few internal details that could easily be hidden in a utility class. Actually, Rabbit gives us two hierarchies designed for that, based on com.rabbitmq.client.RpcServer and com.rabbitmq.client.RpcClient.

RpcClient is flexible enough to be used directly in most cases, while RpcServer has to be adapted to the actual application requirements. The example of this post is based on the HelloClient - HelloServer classes included in the RabbitMQ source distribution (you should find them in the test directory, in the com.rabbitmq.examples package). The Java source code for my variation is available on github.

The client sends a string message to the server, and receives back a response. It could also send an empty message, which is interpreted by the server as a command to shut down; in this case no answer is expected.

We delegate all the low-level details to RpcClient, and this is the slim resulting code:
// ...
RpcClient service = new RpcClient(channel, "", QUEUE_NAME); // 1

if(arg.equalsIgnoreCase("x")) { // 2
    System.out.println("Terminating server");
    service.publish(null, null);
}
else {
    String result = service.stringCall(arg); // 3
    System.out.println(result);
}
// ...
1. Instantiate an RPC client that is going to work on the specified channel and queue. The second parameter, not used here, is reserved for the exchange; in that case the third parameter would be used for the routing key.
2. If the user passes an x (upper or lowercase) we interpret it as a request to shut the server down. In this case we use the raw RpcClient.publish() method, which expects two parameters: the properties associated to the message, and the message itself as an array of bytes. Here both of them are null.
3. Usually we rely on RpcClient.stringCall(), which wraps a complete exchange with the server. The passed message is sent and, when the reply arrives, it is passed back to the caller.

The server is a bit more complicated. Firstly we need to specialize StringRpcServer; we could create an anonymous inner class on the fly, but here I wrote it as a plain inner class, aiming at readability:
private class MyRpcServer extends StringRpcServer {
    public MyRpcServer(Channel channel, String queueName) throws IOException {
        super(channel, queueName);
    }

    @Override
    public String handleStringCall(String request) { // 1
        System.out.println("Input: " + request);
        return "Hello, " + request + "!";
    }

    @Override
    public void handleCast(byte[] requestBody) // 2
    {
        if(requestBody.length == 0) {
            System.out.println("Empty message, terminating.");
            terminateMainloop(); // 3
        }
    }
}
1. The standard case: a string is received from the client; here we use it to generate the result, which is sent back to the caller.
2. Less commonly, we want to consume the message received from the client without sending anything back.
3. This is the method to call to terminate the looping on the RPC server, exactly what we need here.

The server itself instantiates its RPC server object and lets it loop on the messages arriving from the clients:
// ...
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

StringRpcServer server = new MyRpcServer(channel, QUEUE_NAME);
System.out.println("RPC server is up");
server.mainloop();
// ...
RpcServer.mainloop() loops indefinitely on RpcServer.processRequest(), which checks whether both correlationId and replyTo are set among the request properties. If so, RpcServer.handleCall() is called, and its returned value is published back to the specified queue. Otherwise, RpcServer.handleCast() is called.
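That dispatch rule fits in a few lines; this is only a sketch of the logic, not the actual RabbitMQ source:

```java
public class RpcDispatch {
    // Mirrors RpcServer.processRequest(): a message carrying both a correlationId
    // and a replyTo property is a call (a reply is expected), anything else is a cast.
    public static String dispatch(String correlationId, String replyTo) {
        if (correlationId != null && replyTo != null) {
            return "handleCall"; // the result will be published back to replyTo
        }
        return "handleCast"; // fire-and-forget, no reply
    }

    public static void main(String[] args) {
        System.out.println(dispatch("42-abc", "reply-queue"));
        System.out.println(dispatch(null, null));
    }
}
```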


Fibonacci RPC client

In this RPC (remote procedure call) RabbitMQ Fibonacci application, once the server component is written, we are almost done.

The client simply has to send a message to the server, and then sit there waiting for the answer. The variation is that I wrote such a powerful client that it could even kill the server: to do that it just has to send an empty message. This is not a very safe behavior, but it is handy to show how to manage uncommon messages, at least in the RPC by messaging pattern, where the client doesn't wait for an answer.

Here is the most interesting part of the client code; you could also have a look at the source code for the complete Java class, which contains both server and client functionality:
// ...
if(arg.equalsIgnoreCase("x")) {
    System.out.println("Terminating Fibonacci server");
    channel.basicPublish("", RPC_QUEUE_NAME, null, null); // 1
}
else {
    String queueName = channel.queueDeclare().getQueue();
    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(queueName, true, consumer); // 2

    String id = UUID.randomUUID().toString(); // 3
    BasicProperties props = new BasicProperties.Builder().correlationId(id).replyTo(queueName).build(); // 4
    channel.basicPublish("", RPC_QUEUE_NAME, props, arg.getBytes()); // 5
    System.out.println("Reply to " + queueName + ", " + id);

    System.out.print("fibonacci(" + arg + ") = ");
    while (true) { // 6
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        if (delivery.getProperties().getCorrelationId().equals(id)) { // 7
            System.out.println(new String(delivery.getBody())); // 8
            break;
        }
    }
}
// ...
1. This is the killer empty message for the RPC Fibonacci server.
2. An implicit acknowledgment is enough in this case.
3. As explained when talking about the server, a unique correlation id is used to ensure the matching between request and reply. Here we generate it using the UUID facility.
4. The correlation id and the name of the queue created exclusively for this client are passed to the server in the properties associated to the message.
5. Message and properties are sent.
6. We loop indefinitely, discarding all the messages that are not relevant.
7. That's it! The correlation id matches.
8. In the message body we find the reply to our question, and we can terminate.


Fibonacci RPC server

As a first example of a RabbitMQ RPC (remote procedure call) application, I have studied the Fibonacci generator provided in the official RabbitMQ tutorial. Here you can find my version of the rabbit Fibonacci server; the rabbit Fibonacci client is in the next post.

I put both client and server in the same class, and you can get the complete Java source code from github.

The server waits indefinitely on a queue, named "rpc_queue", for inputs to be used to calculate a Fibonacci number. It consumes just one message at a time, and sends an explicit acknowledgment to the broker when it completes the elaboration. All of this is not very useful here, but it could be seen as a hint for a future redesign where more Fibonacci calculators are available to the huge population of clients requesting this awesome service.

The server accepts only input in the range MIN_INPUT .. MAX_INPUT, and interprets an empty message as a terminator. All the other possible messages are rejected.

Sending the result to the client is a bit tricky. We need the client to tell the server on which queue it is waiting for the answer, but this is not enough. Since a client could send more than one request, at least theoretically, we need to get and send back an identifier that correlates the request to the response. This is done by putting these values in a properties object associated to the message.
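Stripped of all the messaging machinery, the correlation mechanism boils down to this broker-free sketch (in the real code the id travels in the message properties, here a plain map stands in for the pending requests):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class CorrelationSketch {
    public static void main(String[] args) {
        // Client side: one unique id per pending request
        Map<String, String> pending = new HashMap<String, String>();
        String corrId = UUID.randomUUID().toString();
        pending.put(corrId, "fibonacci(7)");

        // Server side: the reply carries the same id back unchanged
        String replyCorrId = corrId;

        // Client side again: the reply is matched to its request by id
        String request = pending.remove(replyCorrId);
        System.out.println("Reply for request: " + request);
    }
}
```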

This is the most interesting part of the resulting code:
// ...
try {
    // ...

    channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); // 1
    channel.basicQos(1); // 2

    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(RPC_QUEUE_NAME, false, consumer); // 3

    System.out.println("RPC Fibonacci calculator is ready");

    boolean terminator = false;
    while (true) {
        String response = ""; // 4

        QueueingConsumer.Delivery delivery = consumer.nextDelivery(); // 5

        try {
            String message = new String(delivery.getBody());
            if(message.isEmpty()) { // 6
                System.out.println("Empty message, terminating");
                terminator = true;
                break;
            }

            int n = Integer.parseInt(message);
            System.out.println("Calculating fibonacci(" + message + ")");
            if(n < MIN_INPUT || n > MAX_INPUT)
                response = "N/A";
            else
                response += fibonacci(n); // 7
        }
        catch (Exception e){
            e.printStackTrace();
        }
        finally {
            if(!terminator) { // 8
                BasicProperties bp = delivery.getProperties(); // 9
                String corrId = bp.getCorrelationId(); // 10
                String replyQueue = bp.getReplyTo(); // 11
                System.out.println("Replying to " + replyQueue + ", " + corrId);
                
                BasicProperties replyProps = new BasicProperties.Builder().correlationId(corrId).build();
                channel.basicPublish("", replyQueue, replyProps, response.getBytes()); // 12
            }
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); // 13
        }
    }
}
catch  (Exception e) {
    e.printStackTrace();
}
finally {
    try{ if(connection != null) connection.close(); } catch(IOException e) {}
}
1. The queue used by this application only.
2. No prefetch is done; just one message is picked up at a time.
3. Explicit acknowledgment when the output is generated.
4. It is handy to initialize the response to an empty string, so that we can concatenate the resulting Fibonacci number to it.
5. Wait for a client's request.
6. Special case, the client asks the server to shutdown.
7. Call the actual Fibonacci generator.
8. Usually we send back a message to the caller; only in case of termination there is no need to send anything back.
9. In the delivery we have an object where the properties of the message are stored.
10. We use the correlation id to uniquely identify a request coming from a client.
11. In the reply to field we expect to see the queue where the client wants us to put the result.
12. This is the main line in the post. We publish the response to the queue passed by the client, specifying in the properties the correlation id of its request.
13. Only now can we tell the broker it can get rid of the message we read in (5).
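The fibonacci() function called in (7) is not shown above; my sketch of a minimal iterative version follows, the actual implementation is in the github source:

```java
public class FibonacciHelper {
    // Iterative Fibonacci: fibonacci(1) = 1, fibonacci(2) = 1, fibonacci(3) = 2, ...
    public static long fibonacci(int n) {
        long previous = 0;
        long current = 1;
        for (int i = 1; i < n; ++i) {
            long next = previous + current;
            previous = current;
            current = next;
        }
        return current;
    }

    public static void main(String[] args) {
        System.out.println("fibonacci(10) = " + fibonacci(10));
    }
}
```

Keeping it iterative, with long as result type, avoids both the exponential cost of the naive recursion and early overflows; the MIN_INPUT .. MAX_INPUT check in the server would still bound the accepted values.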


RabbitMQ message acknowledgment

To understand better how it works, it could be useful to compare the behavior of a very simple rabbit consumer when the acknowledgment flag is set to true or false in the Channel.basicConsume() call.

I have written a stripped-down example where the producer puts a message in a queue and then quits. The consumer works with automatic or explicit acknowledgment according to an input value provided to it. Looking at the code, we can see that the changes are very localized. We specify whether the QueueingConsumer for the channel/queue is in one mode or the other, and then, if an explicit ack is required, we give it when the job is done:
// ...
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, autoAck, consumer); // 1

QueueingConsumer.Delivery delivery = consumer.nextDelivery();
// ...

if(!autoAck) { // 2
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    // ...
}
1. Here we specify whether a handshake is required (autoAck set to false) or whether we rely on the standard auto-acknowledging RabbitMQ policy.
2. When we are done with the message, and if we are not in auto-acknowledgment mode, we give our explicit acknowledgment to the rabbit broker.

The complete Java code is on github.


Always close a RabbitMQ connection

I see now that in the previous rabbit examples I have failed to ensure that the connection would always be closed at the end of the program. The result is that a rabbit process, under some specific circumstances, could unhappily hang until an interrupt terminates its sad life. Remember not to make the same mistake in your production code.

The typical (bad) code is something like:
// ...
try {
    Connection connection = factory.newConnection(); // 1
    Channel channel = connection.createChannel();

    // ...

    connection.close(); // 2 !! BAD !!
}
catch(IOException e) {
    // ...
}
1. A connection is created and open
2. The connection is closed

The issue is that in case of an exception after (1) but before (2), the cleanup connection code is not called, since the control jumps directly to the catch section.

The solution is pretty easy, we should add a finally clause, and clean the connection there:
// ...
Connection connection = null; // declared outside the try, so it is visible in the finally
try {
    connection = factory.newConnection();
    // ...
    // 1
}
catch(IOException e) {
    // ...
}
finally { // 2
    try{
        if(connection != null) connection.close();
    } catch(IOException e) {} // 3
}
1. The connection cleanup is not done here anymore.
2. The finally section is always executed, whatever happens above. Note that the connection has to be declared before the try block, otherwise it would not be visible here.
3. Given (2), we should ensure the connection has actually been instantiated before calling its close() method. Besides, we have to try-catch the call, since close() itself could throw an IOException.

For tiny test applications this is not such an important remark; it is probably better to stress other points and leave out this detail. But in real code, forgetting to adequately protect the connection cleanup procedure could lead to serious problems.
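Worth noting: in more recent versions of the RabbitMQ Java client, Connection and Channel implement AutoCloseable. Assuming such a version, the same guarantee can be obtained more concisely with a try-with-resources block; this is just a sketch of the idea:
```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class AutoClose {
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        // Both resources are closed automatically, in reverse order,
        // whether the body completes normally or throws an exception.
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // ... work with the channel ...
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```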

Go to the full post

Passive or active queues?

In the previous RabbitMQ examples I have written, I have always declared queues calling Channel.queueDeclare(). But it is also possible to do it calling Channel.queueDeclarePassive(). As you could easily guess, the first method is for an active queue declaration, and the second for a passive one. The point of this post: what a passive queue is, and how to choose between a passive and an active declaration.

If you play with queue settings, one day or another you will end up getting an exception, due to the fact that once a queue is created, you can't access it specifying different settings:
java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
...    
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
 reason: {#method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - 
 parameters for queue 'ack' in vhost '/' not equivalent, class-id=50, method-id=10),
 null, "[B@19d3b3a"}
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
...
In this case I tried to access a queue called "ack", declared in the default virtual host, passing the parameter "autodelete" set to true, where it had been created with false.

When this happens, you could have a look at the RabbitMQ administrator, specifically at the Queues tab, to get all the available information on your queue.

But back to the main theme of the post.

If a queue is declared passively, RabbitMQ assumes it already exists, and we just want to establish a connection to it. What if you call Channel.queueDeclarePassive() and the queue is not there? I guess you already know the answer: you get a fat exception:
java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
...
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error;
 reason: {#method<channel.close>(reply-code=404, reply-text=NOT_FOUND -
 no queue 'ack' in vhost '/', class-id=50, method-id=10), null, "[B@140fee"}
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
...
Close to the exception shown above, but here the reply text is a NOT_FOUND that shows clearly what the problem is: our client was running happily assuming the queue was there, but actually it wasn't.

So, we use a passive queue declaration when we can safely assume the queue is already there, and its non-existence would be considered an exceptional, almost catastrophic, event.

The safer active queue declaration has the obvious downside of being more expensive and requiring the user to provide the settings each time a queue declaration is issued. A hybrid approach (active on the server, passive on the client) could make sense if the application has a rigid protocol ensuring that one component always starts before the other(s).
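The two styles of declaration can be sketched side by side like this; it assumes a default broker on localhost, and the queue name is just a placeholder:
```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class DeclareModes {
    private static final String QUEUE_NAME = "ack"; // hypothetical queue name

    public static void main(String[] args) throws Exception {
        Connection connection = new ConnectionFactory().newConnection();
        Channel channel = connection.createChannel();

        // Active declaration: creates the queue if missing, with these settings
        // (durable=false, exclusive=false, autoDelete=false, no extra arguments).
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // Passive declaration: only checks that the queue already exists;
        // throws an IOException (reply-code 404, NOT_FOUND) if it does not.
        channel.queueDeclarePassive(QUEUE_NAME);

        connection.close();
    }
}
```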

Go to the full post

ConnectionFactory setting by URI

In the previous post we set properties on a RabbitMQ ConnectionFactory by constructing an object and then calling all the relevant setters on it. Sometimes it is handier to configure the ConnectionFactory directly from a URI.

We can use either a standard Java URI object, as defined in the java.net package, or a String in the expected AMQP format. Let's say that we already have the URI as a string and can pass it to our rabbit application; then we can use this information directly:
private ConnectionFactory getFactory(String uri) throws IOException { // 1
    ConnectionFactory factory = new ConnectionFactory();
    try {
        factory.setUri(uri); // 2
    } catch (URISyntaxException|NoSuchAlgorithmException|KeyManagementException e) {
        throw new IOException("Can't set connection factory URI", e); // 3
    }
    return factory;
}
1. We expect uri to be something like "amqp://user:password@127.0.0.1:6391/test", where "test" is the name of our virtual host. We are not forced to specify all the parts of the AMQP URI, but the host name must be given if at least one property among username, password, and port is specified.
2. The passed URI is carefully checked, and three different exceptions could be thrown, each acknowledging a specific error.
3. The error handling here is very loose: any possible exception is converted to an IOException and forwarded to the caller. The reason is just to keep the code simple, since here I don't care much about detailed information on the cause of the failure. This wouldn't be an optimal choice for production code.

The producer and consumer should be slightly changed to try-catch this method too:
public void consumer(String uri) {
    try {
        ConnectionFactory factory = getFactory(uri);
        Connection connection = factory.newConnection();
        
        // ...

        connection.close();
    }
    catch (IOException|InterruptedException e) {
        e.printStackTrace();
    }
}

Go to the full post

Direct exchange on a custom RabbitMQ broker

The stress in this post is not on how to do a direct exchange with RabbitMQ, but on how to create a RabbitMQ producer-consumer application when the broker is not an off-the-shelf setup. In the previous post I twisted the broker configuration a bit, setting a non-standard port and defining a virtual host and a user on it. Now I am going to write a trivial client-server RabbitMQ application for this setup.

This minimal application is contained in a single class and has a couple of public methods, producer() and consumer(), implementing a very raw direct exchange where a single message is produced and consumed. They both call the private method getFactory() instead of creating a standard ConnectionFactory object, and this is the interesting part of the application:
static private final String HOST = "127.0.0.1"; // 1
static private final int PORT = 6391; // 2
static private final String V_HOST = "test"; // 3
static private final String USER = "user"; // 4
static private final String PASSWORD = "password";

//...

private ConnectionFactory getFactory() {
    ConnectionFactory factory = new ConnectionFactory(); // 5
    factory.setHost(HOST); // 6
    factory.setPort(PORT);
    factory.setVirtualHost(V_HOST);
    factory.setUsername(USER);
    factory.setPassword(PASSWORD);

    System.out.println("Starting on " + factory.getHost() + ':' + factory.getPort() + " ["
            + factory.getVirtualHost() + "] for " + factory.getUsername());
    return factory;
}
1. This is a puny testing application running on the same machine as the broker, so localhost is used.
2. The default port for RabbitMQ is 5672, but here we are using a different one.
3. We don't use the default virtual host ("/") but our specific "test" one.
4. The default user is "guest" (with password "guest"); here we instead use the user expressly created for our v-host.
5. A default connection factory works fine for a default setup of the RabbitMQ broker. To make it work with our customized broker, we have to explicitly set all the changed properties.
6. Actually, the host is localhost, as set by default in the ConnectionFactory ctor, but let's assume we'll change it soon, as usually happens in real life.

Obviously all those properties should be easily configurable, stored on disk and fetched by the application at startup, or passed as command line arguments, but this is not the point of this post.
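Building on that factory, the producer side could be sketched like this; the queue name and the message text are placeholders, and the connection settings match the custom broker described above:
```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class CustomBrokerProducer {
    private static final String QUEUE_NAME = "hello"; // hypothetical queue name

    public static void main(String[] args) throws Exception {
        // Same settings as in getFactory() above.
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(6391);
        factory.setVirtualHost("test");
        factory.setUsername("user");
        factory.setPassword("password");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // Declare the queue on our virtual host and publish a single message
        // through the default (nameless) direct exchange.
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.basicPublish("", QUEUE_NAME, null, "Hello!".getBytes());

        connection.close();
    }
}
```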

The rest of the class code is not very interesting, but if you want to see the full Java class, you can get it from github.

Go to the full post

RabbitMQ broker setup

As we have seen, it is very easy to start working with RabbitMQ. If we are happy with the default settings, we can be up and running in a matter of minutes. Changing the broker setup to use a different port and restrict access to authorized users is still easy, but requires a bit of work.

From the point of view of a broker client, there are a few things that could go wrong when trying to connect. As usual, these problems are converted into exceptions that your client should make available to the user in some form. Here are the three of them I bumped into while doing some testing.

The broker is down:
java.net.ConnectException: Connection refused: connect
    at java.net.TwoStacksPlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(Unknown Source)
    at java.net.AbstractPlainSocketImpl.connectToAddress(Unknown Source)
    at java.net.AbstractPlainSocketImpl.connect(Unknown Source)
    at java.net.PlainSocketImpl.connect(Unknown Source)
    at java.net.SocksSocketImpl.connect(Unknown Source)
    at java.net.Socket.connect(Unknown Source)
...
The user password is wrong:
com.rabbitmq.client.PossibleAuthenticationFailureException: Possibly caused by authentication failure
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:342)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:516)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:533)
...
The user is unknown:
java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:375)
...
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error;
reason: java.io.EOFException
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
...
There is a rabbit plugin designed to provide administrator capabilities, but before using it, we have to enable it through this command:
rabbitmq-plugins enable rabbitmq_management
On Windows you should get this feedback:
The following plugins have been enabled:
  mochiweb
  webmachine
  rabbitmq_mochiweb
  amqp_client
  rabbitmq_management_agent
  rabbitmq_management

Plugin configuration has changed. Restart RabbitMQ for changes to take effect.
If you have RabbitMQ running as a service then you must reinstall by running
  rabbitmq-service.bat stop
  rabbitmq-service.bat install
  rabbitmq-service.bat start
As you could expect, if you don't need it anymore, you can get rid of it:
rabbitmq-plugins disable rabbitmq_management
Once you have installed the plugin and restarted the broker, you can access the rabbit administrator via your preferred browser, logging in as a user with administration rights, by default guest with password guest:
http://localhost:15672/
[edit]
When I wrote this post, the default port was 55672; with version 3.0 it has been changed to 15672
[/edit]


We can create a virtual host, so as to keep all the message streams relative to a specific environment insulated. Just go to the "Virtual Hosts" tab, click on "Add a new virtual host", specify a meaningful name (shame on me, I called it "test") and click on the "Add virtual host" button. It is just that easy, and we have a new virtual host.

Still, a virtual host with no user associated is not very useful and, as we see in the "All virtual hosts" section, a newly created virtual host has "No users" associated.

So we access the "Add / update a user" section in the "Users" tab, and create a new user. Insert a reasonable user-password couple (again, I didn't follow my own good suggestion, and entered a lousy "user"-"password") and choose the privileges for the new user among Management, Monitoring, and Administrator. You can also enter no tag at all, meaning it is just a simple user with no access to the management plugin. This makes perfect sense in my case, so I went for it.

We still have to associate the new user to the new virtual host, so we get back to the "Virtual Hosts" tab, select our v-host from the "All virtual hosts" table, click on the "Set permission" section, select the newly created user from the drop-down list, and then click on "Set permission".

That's it. Now we have a brand new v-host that works only for the newly specified user.

But before using them in a client, let's mess things up a little bit more. Say that we don't want to use the standard rabbit port but a custom 6391. We can tell rabbit which port to use by setting the environment variable RABBITMQ_NODE_PORT to the required value. So, on Windows we could write a tiny command script like this one:
@echo off
setlocal
set RABBITMQ_NODE_PORT=6391
rabbitmq-server.bat
endlocal
Running the standard rabbitmq-server.bat, the rabbit broker listens on its default port 5672; running our script, it uses our custom 6391 port.

Having changed the configuration as described, we now want to write a very simple direct exchange rabbit application that uses it. It is quite easy, but I ran out of time, so I am forced to show you that in the next post.

Go to the full post

Simple Coherence observer

Once you have put some elements in an Oracle Coherence named cache, you are often interested in checking whether someone else is doing anything in there: inserting, updating, or even deleting from it.

What you want is to observe what happens to your cache, probably using something like the observer pattern.

Doing that with Coherence is pretty easy: we define a class extending AbstractMapListener to specify what we actually want to do in case of insertion, update, or deletion on the cache; then we call addMapListener() on our cache passing an instance of such a class, and basically we are done.

A very simple example of a map listener could be:
public class MyCacheListener extends AbstractMapListener {
    @Override
    public void entryInserted(MapEvent event) {
        System.out.println("*** Inserted: " + event);
    }

    @Override
    public void entryUpdated(MapEvent event) {
        System.out.println("*** Updated: " + event);
    }

    @Override
    public void entryDeleted(MapEvent event) {
        System.out.println("*** Deleted: " + event);
    }
}
It is not a fancy implementation: it just dumps to standard output the event signaling the affected item in the cache, but starting from this we can build more useful functionality.

The class that works with the cache would probably implement a method like this:
public void observe() {
    cache.addMapListener(new MyCacheListener());
}
And this is a piece of code that would start observing on a Coherence cache:
CacheObserver cohCli = new CacheObserver(); // 1
cohCli.observe();

cohCli.checkPut("key", "value"); // 2
cohCli.getCheck("key", "value");
cohCli.checkPut("key", "change");
cohCli.checkRemove("key");

// ...
1. In my test code, CacheObserver extends the SimpleCache class seen in the previous post adding the observe() method seen here above.
2. Any time a change is made in the cache, the observer dumps the generated event to the screen.
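The MapEvent passed to the listener carries more than its string representation: it exposes the affected key and both the old and new values. As a sketch of a slightly more informative listener, assuming the same setup as above:
```java
import com.tangosol.util.AbstractMapListener;
import com.tangosol.util.MapEvent;

public class DetailedCacheListener extends AbstractMapListener {
    @Override
    public void entryUpdated(MapEvent event) {
        // The event exposes the key and both values involved in the change,
        // so we can log exactly what was modified and how.
        System.out.println("*** Updated key " + event.getKey()
                + ": " + event.getOldValue()
                + " -> " + event.getNewValue());
    }
}
```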

Go to the full post

Basic Coherence functionality

Once you have setup Oracle Coherence in your development environment, and you have tested a simple Hello World application, you are ready to write something moderately more interesting.

In the following example I am using just a couple of Coherence classes. They still have a "Tangosol" reference in their package name, from the company that, like many others, is now part of Oracle:
import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;

public class SimpleCache {
    protected NamedCache cache;
    protected static final String CACHE_NAME = "MyCache";

    public SimpleCache() { // 1
        CacheFactory.ensureCluster();
        cache = CacheFactory.getCache(CACHE_NAME);
    }

    public void dtor() { // 2
        CacheFactory.shutdown();
    }

    public void checkRemove(String key) {  // 3
        System.out.println("+++ removing " + key);
        if(cache.containsKey(key)) {
            cache.remove(key);
        }
        else {
            System.out.println("+++ " + key + " is not in cache");
        }
    }

    public void checkPut(String key, String value) { // 4
        System.out.println("+++ putting " + key);
        if(cache.containsKey(key)) {
            System.out.println("+++ " + key + " already in cache");
        }
        cache.put(key, value);
    }

    public void getCheck(String key, String expValue) { // 5
        System.out.println("+++ getting " + key);
        String output = (String)cache.get(key);
        if(output == null)
            System.out.println("+++ " + key + " is not in the cache");
        else if(output.compareTo(expValue) != 0)
            System.out.println("+++ Unexpected: " + expValue + " != " + output);
    }

    public void full(String key, String value) { // 6
        checkPut(key, value);
        getCheck(key, value);
        checkRemove(key);
    }

    public static void main(String[] args) {
        SimpleCache cohCli = new SimpleCache();

        if(args.length == 0)
            cohCli.full("key", "value"); // 7
        else {
            if(args[0].matches(".*[Pp].*")) // 8
                cohCli.checkPut("key", "value");
            if(args[0].matches(".*[Gg].*"))
                cohCli.getCheck("key", "value");
            if(args[0].matches(".*[Rr].*"))
                cohCli.checkRemove("key");
        }
        cohCli.dtor();
    }
}
1. In the class constructor we get the cache from the factory.
2. From the name of this method you could correctly guess that I am more of a C++ than a Java guy. If you wonder, dtor is a common short name for destructor; there is no such beast in Java, but the name should be taken as a hint to the user programmer: call it when you are done with an instance of this class.
3. Before removing an element in the cache, ensure it actually is in it.
4. If an element with the specified key is already in the cache, issue a warning before storing the new value.
5. Get an element, and check that its value matches the expectation.
6. Utility method, combines (3), (4), and (5) in an unique call.
7. If no parameter is passed to the Java application, a full test is performed, putting, getting, and finally removing a key-value in the cache.
8. The passed parameter is expected to be a string stating what we want to do. A "pgr" is a synonym for a full test; if I want just a put, I should specify just "p" (or "P"), and so on.

Go to the full post