Java连接RabbitMQ实例(3)

@Override
 public void send(String exchange, String routingKey, Object message, Map<String, Object> properties) {
 MessageHeaders messageHeaders = new MessageHeaders(properties);
 Message<Object> msg = MessageBuilder.createMessage(message, messageHeaders);
 rabbitTemplate.setConfirmCallback(CONFIRM_CALLBACK);
 rabbitTemplate.setReturnCallback(RETURN_CALLBACK);
 CorrelationData correlationData = new CorrelationData();
 correlationData.setId(UUID.randomUUID().toString() + "-" + System.currentTimeMillis());
 rabbitTemplate.convertAndSend(exchange, routingKey, msg, correlationData);
 }

@Override
 public void sendOrder(String exchange, String routingKey, Order order) {
 rabbitTemplate.setConfirmCallback(CONFIRM_CALLBACK);
 rabbitTemplate.setReturnCallback(RETURN_CALLBACK);
 CorrelationData correlationData = new CorrelationData();
 correlationData.setId(UUID.randomUUID().toString() + "-" + System.currentTimeMillis());
 rabbitTemplate.convertAndSend(exchange, routingKey, order, correlationData);
 }
}
第一个方法发送到的是message,而第二个方法发送的则是自定义的Java Bean:Order。

测试的发送代码如下所示:

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import com.hys.springboot.config.RabbitMQConfig;
import com.hys.springboot.entity.Order;
import com.hys.springboot.service.IRabbitSenderService;

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {

@Autowired
 private IRabbitSenderService rabbitSenderService;

@Test
 public void testSender1() {
 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
 Map<String, Object> properties = new HashMap<>();
 properties.put("number", "12345");
 properties.put("send_time", sdf.format(new Date()));
 rabbitSenderService.send(RabbitMQConfig.TOPIC_EXCHANGE, "topic.user", "Hello World", properties);
 }

@Test
 public void testSender2() {
 Order order = new Order();
 order.setId("001");
 order.setName("订单一");
rabbitSenderService.sendOrder(RabbitMQConfig.DIRECT_EXCHANGE, RabbitMQConfig.DIRECT_ROUTINGKEY, order);
 }
}
2.5 消费者代码

import java.io.IOException;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

import com.hys.springboot.config.RabbitMQConfig;
import com.hys.springboot.entity.Order;
import com.hys.springboot.service.IRabbitReceiverService;
import com.rabbitmq.client.Channel;

@Service
public class RabbitReceiverServiceImpl implements IRabbitReceiverService {

private static final Log logger = LogFactory.getLog(RabbitReceiverServiceImpl.class);

@RabbitListener(bindings = @QueueBinding(value = @Queue(value = RabbitMQConfig.TOPIC_QUEUE, durable = "true"), exchange = @Exchange(value = RabbitMQConfig.TOPIC_EXCHANGE, durable = "true", type = ExchangeTypes.TOPIC), key = RabbitMQConfig.TOPIC_ROUTINGKEY))
 @RabbitHandler
 @Override
 public void receiveTopicMessage(Message<Object> message, Channel channel) throws IOException {
 if (logger.isDebugEnabled()) {
 logger.debug("消费端Payload:" + message.getPayload());
 }
 Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
 //手工ack
 channel.basicAck(deliveryTag, false);
 }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/1859b61d160e88471f5f347723086680.html