RabbitMQ消息发布和消费的确认机制

新公司项目使用的消息队列是RabbitMQ,之前其实没有在实际项目上用过RabbitMQ,所以对它的了解都谈不上入门。趁着周末休息的时间也猛补习了一波,写了两个窗体应用,一个消息发布端和消息消费端。园子里解释RabbitMQ基础的很多了,这里就不对RabbitMQ的基础再做叙述了,来点实际工作中一定会碰到的问题和解决的方案。

RabbitMQ 消息发布确认机制

默认情况下消息发布端执行BasicPublish方法后,消息是否到达指定的队列的结果发布端是未知的。BasicPublish方法的返回值是void。假设我们想对消息进行监控,针对消息发送失败后进行补发则需要一个消息确认机制来帮我们实现。

事务机制

Confirm确认机制

上面是已知可通过RabbitMQ自带的特性实现消息确认机制的两种方式。

事务机制

事务机制依赖三个RabbitMQ提供的方法

txSelect()

txCommit()

txRollback()
看名字大概知道意思了,特别是Commit和Rollback,使用方式和数据库的事务使用几乎一样,txSelect()声明事务的开始,txCommit()提交事务,txRollBack()执行提交失败后的回滚。
使用代码如下:

// 采取RabbitMQ事务方式传输消息 private void SendMessageByTransaction(RabbitMQConnectionDTO mqConnection, string exchangeName, string queueName, string routingKey, EnumRabbitExchangeType exchangeType, string message) { try { ConnectionFactory rabbitMqFactory = new ConnectionFactory() { HostName = mqConnection.HostName, UserName = mqConnection.UserName, Password = mqConnection.Password, Port = mqConnection.Port }; using (IConnection conn = rabbitMqFactory.CreateConnection()) { using (IModel channel = conn.CreateModel()) { channel.ExchangeDeclare(exchangeName, exchangeType.ToString(), durable: true, autoDelete: false, arguments: null); channel.QueueDeclare(queueName, durable: false, autoDelete: false, exclusive: false, arguments: null); // 必须执行QueueBind 需要将routingKey与队列和交换机进行绑定 否则就算事务提交了队列也不会有数据~ channel.QueueBind(queueName, exchangeName, routingKey); byte[] messagebuffer = Encoding.UTF8.GetBytes(message); try { channel.TxSelect(); channel.BasicPublish(exchangeName, routingKey, null, messagebuffer); //if (1 == 1) throw new Exception("没错!我是故意抛出异常的!看看最终队列是否写入了消息~"); channel.TxCommit(); } catch (Exception ex) { Rtx_Receive.Text = Rtx_Receive.Text + $"\r 异常产生时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")},异常信息:{ex.Message}"; channel.TxRollback(); // TODO 进行补发OR其他逻辑处理 } Rtx_Receive.Text = Rtx_Receive.Text + $"\r 发送时间:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}"; } } } catch (Exception ex) { MessageBox.Show($"发送消息失败!{ex.Message}"); } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zwdysd.html