关于MQ那些事 (3)

  其实这个问题是第一个问题的扩展,换而言之,我们要保证可靠性传输,其实就是保证防止生产者弄丢数据、消息队列弄丢数据、消费者弄丢数据而已

  其实这些问题早在中间件开发者已经考虑到了,也提供了一些可配置的文件给我们自行设定相关参数,消息队列一般都会持久化到磁盘这个不用担心,然后生产者数据丢失的话可以尝试重新发送,消费者丢的的话一般都是采用了自动确认消息模式导致消费信息被删,只要修改为手动确认就行了

  3.如何保证从消息队列里拿到的数据按顺序执行?

  通过算法,将需要保持先后顺序的消息放到同一个消息队列中,然后只用一个消费者去消费该队列。

  4.如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?有几百万消息持续积压几小时,怎么解决?

    1.定期删除过期的数据

    2.RejectNewEvents 拒绝新来的消息,向Producer返回RejectNewEvents错误码。

    3.按照特定策略丢弃已有消息

  5.数据是通过push还是pull方式给到消费端,各自有什么弊端?

Push模型实时性好,但是因为状态维护等问题,难以应用到消息中间件的实践中。

Pull模式实现起来会相对简单一些,但是实时性取决于轮训的频率,在对实时性要求高的场景不适合使用。

 

3.如何使用MQ(以ActiveQM为例)

  附上官网:

  附上启动服务访问地址::8161/admin/   用户名/密码 admin/admin

  附上代码,jar包自己下 https://pan.baidu.com/s/1SUBoypW-w_--KeFj_HsOtg

  发布订阅模式

  生产者-发布

public class JMSProducer { private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名 private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码 private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址 private static final int SENDNUM=10; // 发送的消息数量 public static void main(String[] args) { ConnectionFactory connectionFactory; // 连接工厂 Connection connection = null; // 连接 Session session; // 会话 接受或者发送消息的线程 Destination destination; // 消息的目的地 MessageProducer messageProducer; // 消息生产者 // 实例化连接工厂 connectionFactory=new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL); try { connection=connectionFactory.createConnection(); // 通过连接工厂获取连接 connection.start(); // 启动连接 session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 创建Session // destination=session.createQueue("FirstQueue1"); // 创建消息队列 destination=session.createTopic("FirstTopic1"); messageProducer=session.createProducer(destination); // 创建消息生产者 sendMessage(session, messageProducer); // 发送消息 session.commit(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } finally{ if(connection!=null){ try { connection.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } /** * 发送消息 * @param session * @param messageProducer * @throws Exception */ public static void sendMessage(Session session,MessageProducer messageProducer)throws Exception{ for(int i=0;i<JMSProducer.SENDNUM;i++){ TextMessage message=session.createTextMessage("ActiveMQ 发送的消息"+i); System.out.println("发送消息:"+"ActiveMQ 发布的消息"+i); messageProducer.send(message); } } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wppxxy.html