@Override
public void send(String exchange, String routingKey, Object message, Map<String, Object> properties) {
MessageHeaders messageHeaders = new MessageHeaders(properties);
Message<Object> msg = MessageBuilder.createMessage(message, messageHeaders);
rabbitTemplate.setConfirmCallback(CONFIRM_CALLBACK);
rabbitTemplate.setReturnCallback(RETURN_CALLBACK);
CorrelationData correlationData = new CorrelationData();
correlationData.setId(UUID.randomUUID().toString() + "-" + System.currentTimeMillis());
rabbitTemplate.convertAndSend(exchange, routingKey, msg, correlationData);
}
@Override
public void sendOrder(String exchange, String routingKey, Order order) {
rabbitTemplate.setConfirmCallback(CONFIRM_CALLBACK);
rabbitTemplate.setReturnCallback(RETURN_CALLBACK);
CorrelationData correlationData = new CorrelationData();
correlationData.setId(UUID.randomUUID().toString() + "-" + System.currentTimeMillis());
rabbitTemplate.convertAndSend(exchange, routingKey, order, correlationData);
}
}
第一个方法发送到的是message,而第二个方法发送的则是自定义的Java Bean:Order。
测试的发送代码如下所示:
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import com.hys.springboot.config.RabbitMQConfig;
import com.hys.springboot.entity.Order;
import com.hys.springboot.service.IRabbitSenderService;
@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {
@Autowired
private IRabbitSenderService rabbitSenderService;
@Test
public void testSender1() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
Map<String, Object> properties = new HashMap<>();
properties.put("number", "12345");
properties.put("send_time", sdf.format(new Date()));
rabbitSenderService.send(RabbitMQConfig.TOPIC_EXCHANGE, "topic.user", "Hello World", properties);
}
@Test
public void testSender2() {
Order order = new Order();
order.setId("001");
order.setName("订单一");
rabbitSenderService.sendOrder(RabbitMQConfig.DIRECT_EXCHANGE, RabbitMQConfig.DIRECT_ROUTINGKEY, order);
}
}
2.5 消费者代码
import java.io.IOException;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import com.hys.springboot.config.RabbitMQConfig;
import com.hys.springboot.entity.Order;
import com.hys.springboot.service.IRabbitReceiverService;
import com.rabbitmq.client.Channel;
@Service
public class RabbitReceiverServiceImpl implements IRabbitReceiverService {
private static final Log logger = LogFactory.getLog(RabbitReceiverServiceImpl.class);
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = RabbitMQConfig.TOPIC_QUEUE, durable = "true"), exchange = @Exchange(value = RabbitMQConfig.TOPIC_EXCHANGE, durable = "true", type = ExchangeTypes.TOPIC), key = RabbitMQConfig.TOPIC_ROUTINGKEY))
@RabbitHandler
@Override
public void receiveTopicMessage(Message<Object> message, Channel channel) throws IOException {
if (logger.isDebugEnabled()) {
logger.debug("消费端Payload:" + message.getPayload());
}
Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
//手工ack
channel.basicAck(deliveryTag, false);
}