import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
/**
* RabbitMQ配置类
* @author Robert Hou
* @date 2019年7月1日
*/
@Configuration
public class RabbitMQConfig {
/**
* EXCHANGE名称
*/
public static final String FANOUT_EXCHANGE = "test.fanout";
public static final String DIRECT_EXCHANGE = "test.direct";
public static final String TOPIC_EXCHANGE = "test.topic";
/**
* QUEUE名称
*/
public static final String FANOUT_QUEUE = "test.fanout.queue";
public static final String DIRECT_QUEUE = "test.direct.queue";
public static final String TOPIC_QUEUE = "test.topic.queue";
/**
* ROUTINGKEY名称
*/
public static final String DIRECT_ROUTINGKEY = "direct";
public static final String TOPIC_ROUTINGKEY = "topic.#";
@Bean
public ConnectionFactory connectionFactory(Environment environment) {
String addresses = environment.getProperty("spring.rabbitmq.addresses");
int port = environment.getProperty("spring.rabbitmq.port", Integer.class);
String username = environment.getProperty("spring.rabbitmq.username");
String password = environment.getProperty("spring.rabbitmq.password");
String virtualHost = environment.getProperty("spring.rabbitmq.virtual-host");
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(addresses, port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
connectionFactory.setPublisherConfirms(true);
return connectionFactory;
}
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(FANOUT_EXCHANGE, true, false);
}
@Bean
public DirectExchange directExchange() {
return new DirectExchange(DIRECT_EXCHANGE, true, false);
}
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(TOPIC_EXCHANGE, true, false);
}
@Bean
public Queue fanoutQueue() {
return new Queue(FANOUT_QUEUE, true);
}
@Bean
public Queue directQueue() {
return new Queue(DIRECT_QUEUE, true);
}
@Bean
public Queue topicQueue() {
return new Queue(TOPIC_QUEUE, true);
}
@Bean
public Binding fanoutBinding() {
return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());
}
@Bean
public Binding directBinding() {
return BindingBuilder.bind(directQueue()).to(directExchange()).with(DIRECT_ROUTINGKEY);
}
@Bean
public Binding topicBinding() {
return BindingBuilder.bind(topicQueue()).to(topicExchange()).with(TOPIC_ROUTINGKEY);
}
}
2.4 生产者代码
import java.util.Map;
import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import com.hys.springboot.entity.Order;
import com.hys.springboot.service.IRabbitSenderService;
@Service
public class RabbitSenderServiceImpl implements IRabbitSenderService {
private static final Log logger = LogFactory.getLog(RabbitSenderServiceImpl.class);
@Autowired
private RabbitTemplate rabbitTemplate;
final ConfirmCallback CONFIRM_CALLBACK = (correlationData, ack, cause) -> {
if (logger.isDebugEnabled()) {
logger.debug("correlationData:" + correlationData + " ack:" + ack);
}
if (!ack) {
if (logger.isErrorEnabled()) {
logger.error("异常处理");
}
}
};
final ReturnCallback RETURN_CALLBACK = (message, replyCode, replyText, exchange, routingKey) -> {
if (logger.isErrorEnabled()) {
logger.error("replyCode:" + replyCode + " replyText:" + replyText + " exchange:" + exchange + " routingKey:" + routingKey);
}
};