RpcClient is flexible enough to be used directly in most cases, while RpcServer has to be adapted to the actual application requirements. The example in this post is based on the HelloClient and HelloServer classes included in the RabbitMQ source distribution (you should find them in the test directory, in the com.rabbitmq.examples package). The Java source code for my variation is available on GitHub.
The client sends a string message to the server and receives a response back. It can also send an empty message, which the server interprets as a command to shut down; in this case no answer is expected.
We delegate all the low-level details to RpcClient, and this is the resulting slim code:
    // ...
    RpcClient service = new RpcClient(channel, "", QUEUE_NAME); // 1
    if(arg.equalsIgnoreCase("x")) { // 2
        System.out.println("Terminating server");
        service.publish(null, null);
    }
    else {
        String result = service.stringCall(arg); // 3
        System.out.println(result);
    }
    // ...

1. Instantiate an RPC client that works on the specified channel and queue. The second parameter, not used here, is reserved for the exchange; when an exchange is specified, the third parameter is used as the routing key.
2. If the user passes an x (upper or lowercase), we interpret it as a request to shut the server down. In this case we call the raw RpcClient.publish() method, which expects two parameters: the properties associated with the message, and the message itself as an array of bytes. Here both of them are null.
3. Usually we rely on RpcClient.stringCall(), which wraps a complete exchange with the server: the passed message is sent and, when the reply arrives, it is passed back to the caller.
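To make note 3 more concrete, here is a minimal, broker-free sketch of what a blocking call of this kind boils down to: tag the request with a correlation id and a reply destination, then wait for the reply carrying the same id. It is only an illustration under my own assumptions. The class InMemoryRpcSketch, its Message type and the in-memory queues are hypothetical stand-ins, not the RabbitMQ client implementation.

```java
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Broker-free sketch of a blocking RPC call. Every name here is hypothetical;
// this is not the RabbitMQ client implementation.
final class InMemoryRpcSketch {

    /** A message plus the two properties that tie a reply to its request. */
    static final class Message {
        final String correlationId;
        final BlockingQueue<Message> replyTo;
        final String body;

        Message(String correlationId, BlockingQueue<Message> replyTo, String body) {
            this.correlationId = correlationId;
            this.replyTo = replyTo;
            this.body = body;
        }
    }

    /** Stands in for the server's request queue. */
    private final BlockingQueue<Message> requests = new LinkedBlockingQueue<>();

    /** Sends the text, then blocks until the reply with the same id arrives. */
    String stringCall(String text) {
        String id = UUID.randomUUID().toString();
        BlockingQueue<Message> replies = new LinkedBlockingQueue<>();
        try {
            requests.put(new Message(id, replies, text));
            Message reply = replies.poll(2, TimeUnit.SECONDS);
            if (reply == null || !id.equals(reply.correlationId)) {
                throw new IllegalStateException("no matching reply for " + id);
            }
            return reply.body;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a reply", e);
        }
    }

    /** Starts a daemon thread that answers every request with a greeting. */
    void startEchoServer() {
        Thread server = new Thread(() -> {
            try {
                while (true) {
                    Message m = requests.take();
                    // Echo the correlation id so the caller can match the reply.
                    m.replyTo.put(new Message(m.correlationId, null, "Hello, " + m.body + "!"));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        server.setDaemon(true);
        server.start();
    }

    public static void main(String[] args) {
        InMemoryRpcSketch rpc = new InMemoryRpcSketch();
        rpc.startEchoServer();
        System.out.println(rpc.stringCall("Bob")); // prints "Hello, Bob!"
    }
}
```

The two-second timeout is an arbitrary choice for the sketch; it just keeps the caller from hanging forever if no server is running.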
The server is a bit more complicated. First we need to specialize StringRpcServer. We could create an anonymous inner class on the fly, but here I wrote it as a plain inner class for the sake of readability:
    private class MyRpcServer extends StringRpcServer {
        public MyRpcServer(Channel channel, String queueName) throws IOException {
            super(channel, queueName);
        }

        @Override
        public String handleStringCall(String request) { // 1
            System.out.println("Input: " + request);
            return "Hello, " + request + "!";
        }

        @Override
        public void handleCast(byte[] requestBody) // 2
        {
            if(requestBody.length == 0) {
                System.out.println("Empty message, terminating.");
                terminateMainloop(); // 3
            }
        }
    }

1. The standard case: a string is received from the client, we use it to generate the result, and the result is sent back to the caller.
2. Less commonly, we want to consume the message received from the client without sending anything back.
3. This is the method to call to terminate the loop on the RPC server, which is exactly what we need here.
The server itself instantiates its RPC server object and lets it loop on the messages arriving from the clients:
    // ...
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    StringRpcServer server = new MyRpcServer(channel, QUEUE_NAME);
    System.out.println("RPC server is up");
    server.mainloop();
    // ...

RpcServer.mainloop() keeps looping on RpcServer.processRequest() until terminateMainloop() is called. For each request, processRequest() checks whether both correlationId and replyTo are set among the request properties. If so, RpcServer.handleCall() is called, and its return value is published back to the specified queue. Otherwise, RpcServer.handleCast() is called.
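The dispatch rule is easier to see in isolation, so here is a small model of it that runs without a broker. Take it as a sketch under my own assumptions: DispatchSketch, its Request type and the published list are hypothetical names invented for the illustration, and the real RpcServer works on deliveries coming from a channel, not on an in-memory list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Broker-free model of the call/cast dispatch rule. All names are
// hypothetical; this is not the RabbitMQ client source.
final class DispatchSketch {

    /** A request carrying only the fields the rule looks at. */
    static final class Request {
        final String correlationId;
        final String replyTo;
        final String body;

        Request(String correlationId, String replyTo, String body) {
            this.correlationId = correlationId;
            this.replyTo = replyTo;
            this.body = body;
        }
    }

    private boolean running = true;

    /** Records what would have been published back, as "queue <- reply". */
    final List<String> published = new ArrayList<>();

    /** Plays the role of handleStringCall(): builds the reply. */
    String handleCall(String request) {
        return "Hello, " + request + "!";
    }

    /** Plays the role of handleCast(): an empty body stops the loop. */
    void handleCast(String body) {
        if (body.isEmpty()) {
            running = false;
        }
    }

    /** A request is a call only if both correlationId and replyTo are set. */
    void processRequest(Request r) {
        if (r.correlationId != null && r.replyTo != null) {
            published.add(r.replyTo + " <- " + handleCall(r.body));
        } else {
            handleCast(r.body);
        }
    }

    /** Consumes requests in order until a cast asks the loop to stop. */
    void mainloop(List<Request> incoming) {
        for (Request r : incoming) {
            if (!running) {
                break;
            }
            processRequest(r);
        }
    }

    public static void main(String[] args) {
        DispatchSketch server = new DispatchSketch();
        server.mainloop(Arrays.asList(
                new Request("1", "replies", "Bob"),     // call: gets a reply
                new Request(null, null, ""),            // cast: stops the loop
                new Request("2", "replies", "Alice"))); // never processed
        System.out.println(server.published); // prints [replies <- Hello, Bob!]
    }
}
```

The third request is never answered because the empty cast before it has already stopped the loop, which mirrors what happens when the client is run with x.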