RabbitMQ入门:远程过程调用(RPC)

假如我们想要调用远程的一个方法或函数并等待执行结果,也就是我们通常说的远程过程调用(Remote Procedure Call)。怎么办?

今天我们就用RabbitMQ来实现一个简单的RPC系统:客户端发送一个请求消息,服务端以一个响应消息回应。为了能够接收到响应,客户端在发送消息的同时发送一个回调队列用来告诉服务端响应消息发送到哪个队列里面。也就是说每个消息一个回调队列,在此基础上我们变下,将回调队列定义成类的属性,这个每个客户端一个队列,同一个客户端的请求共用一个队列。那么接下来有个问题,怎么知道这个队列里面的响应消息是属于哪个队列的呢?

我们会用到关联标识(correlationId),每个请求我们都会生成一个唯一的值作为correlationId,这样每次有响应消息来的时候,我们就去看correlationId来确定到底是哪个请求的响应消息,将请求和响应关联起来。如果收到一个不知道的correlationId,就可以确定不是这个客户端的请求的响应,可以直接丢弃掉。

一、工作模型

RabbitMQ入门:远程过程调用(RPC)

客户端发送启动后,会创建独特的回调队列。对于一个请求发送配置了两个属性的消息:一个是回调队列(图中的replay_to),一个是correlation。 这个请求会发送到rpc_queue队列,然后到达服务端处理。

服务端等待rpc_queue队列的请求。当有请求到来时,它就会开始干活并将结果通过发送消息来返回,该返回消息发送到replyTo指定的队列。

客户端将等待回调队列返回数据。当返回的消息到达时,它将检查correlation id属性。如果该属性值和请求匹配,就将响应返回给程序。

二、代码实现

接下来看代码实现:

 客户端

public class RpcClient { Connection connection = null; Channel channel = null; //回调队列:用来接收服务端的响应消息 String queueName = ""; // 定义RpcClient public RpcClient() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); queueName = channel.queueDeclare().getQueue(); } // 真正的处理逻辑 public String call(String msg) throws IOException, InterruptedException { final String uuid = UUID.randomUUID().toString(); //后续,服务端根据"replyTo"来指定将返回信息写入到哪个队列 //后续,服务端根据关联标识"correlationId"来指定返回的响应是哪个请求的 AMQP.BasicProperties prop = new AMQP.BasicProperties().builder().replyTo(queueName).correlationId(uuid).build(); channel.basicPublish("", RpcServer.QUEUE_NAME, prop, msg.getBytes()); final BlockingQueue<String> blockQueue = new ArrayBlockingQueue<String>(1); channel.basicConsume(queueName, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException { if (properties.getCorrelationId().equals(uuid)) { String msg = new String(body, "UTF-8"); blockQueue.offer(msg); System.out.println("**** rpc client reciver response :[" + msg + "]"); } } }); return blockQueue.take(); } //关闭连接 public void close() throws IOException { connection.close(); } public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { RpcClient client = new RpcClient(); client.call("4"); client.close(); } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zywxsj.html