Fibonacci RPC server

As a first example of a RabbitMQ RPC (remote procedure call) application, I have studied the Fibonacci generator provided in the official RabbitMQ tutorial. Here you can find my version of the Fibonacci server; the Fibonacci client is in the next post.

I put both client and server in the same class; you can get the complete Java source code from GitHub.

The server waits indefinitely on a queue named "rpc_queue" for the inputs used to calculate a Fibonacci number. It consumes just one message at a time, and sends an explicit acknowledgment to the broker when it completes the processing. None of this is very useful here, but it could be seen as a hint for a future redesign where several Fibonacci calculators are available to the huge population of clients requesting this awesome service.
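To make the "one message at a time, explicit acknowledgment" idea concrete, here is a minimal in-memory sketch. It is not the RabbitMQ API: the class PrefetchSketch and all its methods are hypothetical names invented for illustration, and the model assumes a single consumer with a prefetch limit of one.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical in-memory model of a queue with a prefetch limit of one.
final class PrefetchSketch {
    private final Deque<String> ready = new ArrayDeque<>(); // messages waiting in the queue
    private String unacked;                                  // at most one message in flight

    void publish(String message) {
        ready.addLast(message);
    }

    /** Returns the next message, or null while the previous delivery is still unacknowledged. */
    String deliver() {
        if (unacked != null || ready.isEmpty()) {
            return null;
        }
        unacked = ready.removeFirst();
        return unacked;
    }

    /** The consumer has finished its work: the message can be forgotten. */
    void ack() {
        unacked = null;
    }

    /** The consumer went away before acknowledging: the message returns to the queue. */
    void consumerLost() {
        if (unacked != null) {
            ready.addFirst(unacked);
            unacked = null;
        }
    }
}
```

With several calculators attached to the same queue, a limit like this is what would keep a slow calculator from piling up requests that an idle one could serve.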

The server accepts only input in the range MIN_INPUT .. MAX_INPUT, and interprets an empty message as a terminator. Any other message is rejected: a number out of range is answered with "N/A", and a message that is not an integer gets an empty reply.
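The input rules can be sketched as a pure function, separated from any messaging code. This is only an illustration under assumptions: the class name FibonacciRequests, the method replyFor, the bounds 0 and 90, and the iterative fibonacci implementation are all mine, since the post shows neither the actual bounds nor the generator.

```java
// Hypothetical helper mirroring the server's input rules.
final class FibonacciRequests {
    // Assumed bounds; the post does not state the actual values.
    static final int MIN_INPUT = 0;
    static final int MAX_INPUT = 90; // small enough for the result to fit in a long

    /** Iterative Fibonacci with fibonacci(0) = 0 and fibonacci(1) = 1. */
    static long fibonacci(int n) {
        long previous = 0;
        long current = 1;
        for (int i = 0; i < n; i++) {
            long next = previous + current;
            previous = current;
            current = next;
        }
        return previous;
    }

    /** Returns the reply body for a message, or null for the empty terminator message. */
    static String replyFor(String message) {
        if (message.isEmpty()) {
            return null; // terminator: nothing is sent back
        }
        try {
            int n = Integer.parseInt(message);
            if (n < MIN_INPUT || n > MAX_INPUT) {
                return "N/A";
            }
            return String.valueOf(fibonacci(n));
        } catch (NumberFormatException e) {
            return ""; // not an integer: the reply stays empty
        }
    }
}
```

Under these assumptions, replyFor("10") gives "55", replyFor("-1") gives "N/A", and replyFor("abc") gives an empty string.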

Sending the result to the client is a bit tricky. We need the client to tell the server on which queue it is waiting for the answer, but this is not enough. Since a client could send more than one request, at least theoretically, we also need to receive and send back an identifier that correlates the request to the response. This is done by putting these values in a properties object associated with the message.
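The role of the reply queue and of the correlation identifier can be shown without a broker. The sketch below is an in-memory stand-in, not RabbitMQ code: CorrelationSketch, Message, publish, serveOne and demo are hypothetical names, and the "result for ..." body is just a placeholder for the real computation.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;

// Hypothetical in-memory illustration of reply-to and correlation id.
final class CorrelationSketch {
    /** A message body plus the two properties the server relies on. */
    static final class Message {
        final String body;
        final String replyTo;
        final String correlationId;

        Message(String body, String replyTo, String correlationId) {
            this.body = body;
            this.replyTo = replyTo;
            this.correlationId = correlationId;
        }
    }

    // Stand-in for the broker: queue name -> pending messages.
    static final Map<String, Queue<Message>> queues = new HashMap<>();

    static void publish(String queue, Message message) {
        queues.computeIfAbsent(queue, k -> new ArrayDeque<>()).add(message);
    }

    /** Server side: take one request, reply on the queue the client named, echo its id. */
    static void serveOne(String requestQueue) {
        Message request = queues.get(requestQueue).remove();
        String result = "result for " + request.body; // placeholder computation
        publish(request.replyTo, new Message(result, null, request.correlationId));
    }

    /** Client side: send two requests, then match each reply to its request by id. */
    static Map<String, String> demo() {
        queues.clear();
        String replyQueue = "reply-" + UUID.randomUUID();
        Map<String, String> pending = new HashMap<>(); // correlation id -> input
        for (String input : new String[] {"5", "7"}) {
            String corrId = UUID.randomUUID().toString();
            pending.put(corrId, input);
            publish("rpc_queue", new Message(input, replyQueue, corrId));
        }
        serveOne("rpc_queue");
        serveOne("rpc_queue");

        Map<String, String> answers = new HashMap<>(); // input -> reply body
        for (Message reply : queues.get(replyQueue)) {
            answers.put(pending.get(reply.correlationId), reply.body);
        }
        return answers;
    }
}
```

The client never relies on the order in which replies arrive: the map from correlation id to pending request is what tells it which answer belongs to which question.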

This is the most interesting part of the resulting code:
// ...
try {
    // ...

    channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); // 1
    channel.basicQos(1); // 2

    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(RPC_QUEUE_NAME, false, consumer); // 3

    System.out.println("RPC Fibonacci calculator is ready");

    boolean terminator = false;
    while (true) {
        String response = ""; // 4

        QueueingConsumer.Delivery delivery = consumer.nextDelivery(); // 5

        try {
            String message = new String(delivery.getBody());
            if(message.isEmpty()) { // 6
                System.out.println("Empty message, terminating");
                terminator = true;
                break;
            }

            int n = Integer.parseInt(message);
            System.out.println("Calculating fibonacci(" + message + ")");
            if(n < MIN_INPUT || n > MAX_INPUT)
                response = "N/A";
            else
                response += fibonacci(n); // 7
        }
        catch (Exception e){
            e.printStackTrace();
        }
        finally {
            if(!terminator) { // 8
                BasicProperties bp = delivery.getProperties(); // 9
                String corrId = bp.getCorrelationId(); // 10
                String replyQueue = bp.getReplyTo(); // 11
                System.out.println("Replying to " + replyQueue + ", " + corrId);
                
                BasicProperties replyProps = new BasicProperties.Builder().correlationId(corrId).build();
                channel.basicPublish("", replyQueue, replyProps, response.getBytes()); // 12
            }
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); // 13
        }
    }
}
catch  (Exception e) {
    e.printStackTrace();
}
finally {
    try{ if(connection != null) connection.close(); } catch(IOException e) {}
}
1. Declare the queue used by this application. The three false flags make it non-durable, non-exclusive, and not auto-deleted.
2. Set the prefetch count to one, so the broker hands this consumer just one unacknowledged message at a time.
3. The false parameter disables automatic acknowledgment: we acknowledge explicitly once the output has been generated.
4. It is handy to initialize the response to an empty string, so that we can concatenate the resulting Fibonacci number to it.
5. Wait for a client's request.
6. Special case: the client asks the server to shut down.
7. Call the actual Fibonacci generator.
8. Usually we send a message back to the caller; only in case of termination is there nothing to send back.
9. The delivery carries an object where the properties of the message are stored.
10. We use the correlation id to uniquely identify a request coming from a client.
11. In the reply-to field we expect to find the name of the queue where the client wants us to put the result.
12. This is the main line in the post. We publish the response to the queue passed by the client, specifying in the properties the correlation id of its request.
13. Only now can we tell the broker that it may get rid of the message we read in (5).
