Java连接RabbitMQ实例(2)

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;

/**
 * RabbitMQ配置类
* @author Robert Hou
 * @date 2019年7月1日
*/
@Configuration
public class RabbitMQConfig {

/**
 * EXCHANGE名称
*/
public static final String FANOUT_EXCHANGE = "test.fanout";
 public static final String DIRECT_EXCHANGE = "test.direct";
 public static final String TOPIC_EXCHANGE = "test.topic";
 /**
 * QUEUE名称
*/
public static final String FANOUT_QUEUE = "test.fanout.queue";
 public static final String DIRECT_QUEUE = "test.direct.queue";
 public static final String TOPIC_QUEUE = "test.topic.queue";
 /**
 * ROUTINGKEY名称
*/
public static final String DIRECT_ROUTINGKEY = "direct";
 public static final String TOPIC_ROUTINGKEY = "topic.#";

@Bean
 public ConnectionFactory connectionFactory(Environment environment) {
 String addresses = environment.getProperty("spring.rabbitmq.addresses");
 int port = environment.getProperty("spring.rabbitmq.port", Integer.class);
 String username = environment.getProperty("spring.rabbitmq.username");
 String password = environment.getProperty("spring.rabbitmq.password");
 String virtualHost = environment.getProperty("spring.rabbitmq.virtual-host");
 CachingConnectionFactory connectionFactory = new CachingConnectionFactory(addresses, port);
 connectionFactory.setUsername(username);
 connectionFactory.setPassword(password);
 connectionFactory.setVirtualHost(virtualHost);
 connectionFactory.setPublisherConfirms(true);
 return connectionFactory;
 }

@Bean
 public FanoutExchange fanoutExchange() {
 return new FanoutExchange(FANOUT_EXCHANGE, true, false);
 }

@Bean
 public DirectExchange directExchange() {
 return new DirectExchange(DIRECT_EXCHANGE, true, false);
 }

@Bean
 public TopicExchange topicExchange() {
 return new TopicExchange(TOPIC_EXCHANGE, true, false);
 }

@Bean
 public Queue fanoutQueue() {
 return new Queue(FANOUT_QUEUE, true);
 }

@Bean
 public Queue directQueue() {
 return new Queue(DIRECT_QUEUE, true);
 }

@Bean
 public Queue topicQueue() {
 return new Queue(TOPIC_QUEUE, true);
 }

@Bean
 public Binding fanoutBinding() {
 return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());
 }

@Bean
 public Binding directBinding() {
 return BindingBuilder.bind(directQueue()).to(directExchange()).with(DIRECT_ROUTINGKEY);
 }

@Bean
 public Binding topicBinding() {
 return BindingBuilder.bind(topicQueue()).to(topicExchange()).with(TOPIC_ROUTINGKEY);
 }
}
2.4 生产者代码

import java.util.Map;
import java.util.UUID;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

import com.hys.springboot.entity.Order;
import com.hys.springboot.service.IRabbitSenderService;

@Service
public class RabbitSenderServiceImpl implements IRabbitSenderService {

private static final Log logger = LogFactory.getLog(RabbitSenderServiceImpl.class);
 @Autowired
 private RabbitTemplate rabbitTemplate;
 final ConfirmCallback CONFIRM_CALLBACK = (correlationData, ack, cause) -> {
 if (logger.isDebugEnabled()) {
 logger.debug("correlationData:" + correlationData + " ack:" + ack);
 }
 if (!ack) {
 if (logger.isErrorEnabled()) {
 logger.error("异常处理");
 }
 }
 };
final ReturnCallback RETURN_CALLBACK = (message, replyCode, replyText, exchange, routingKey) -> {
if (logger.isErrorEnabled()) {
 logger.error("replyCode:" + replyCode + " replyText:" + replyText + " exchange:" + exchange + " routingKey:" + routingKey);
 }
 };

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/1859b61d160e88471f5f347723086680.html