RocketMQ生产消费模型选择

生产者,根据某个标识将消息放到同一个队列中

public class Producer { public static void main(String[] args) throws MQClientException { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.setNamesrvAddr("10.130.41.36:9876"); producer.setInstanceName("Producer"); producer.setVipChannelEnabled(false); producer.start(); String[] tags = {"tagA","tagB"}; for (int i = 1; i <= 10; i++) { try { Message msg = new Message("TopicTest",tags[i%tags.length],"key1"+i,("订单一号" + i).getBytes()); SendResult sendResult = producer.send(msg, new SelectMessageQueueByHash(),1); System.out.println(sendResult); } catch (Exception e) { e.printStackTrace(); } } for (int i = 1; i <= 10; i++) { try { Message msg = new Message("TopicTest",tags[i%tags.length],"key2"+i,("订单二号" + i).getBytes()); SendResult sendResult = producer.send(msg, new SelectMessageQueueByHash(),2); System.out.println(sendResult); } catch (Exception e) { e.printStackTrace(); } } for (int i = 1; i <= 10; i++) { try { Message msg = new Message("TopicTest",tags[i%tags.length],"key3"+i,("订单三号" + i).getBytes()); SendResult sendResult = producer.send(msg, new SelectMessageQueueByHash(),3); System.out.println(sendResult); } catch (Exception e) { e.printStackTrace(); } } producer.shutdown(); } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpfxfg.html