public class ReceiveMQ
{
/// <summary>
/// 接收MQ消息
/// </summary>
/// <typeparam></typeparam>
/// <param></param>
/// <param></param>
public static void GetMQ<T>(Func<T,bool> func,string queueName)
{
//创建连接
var connection = ConnectionMQ.Connection();
//创建通道
var channel = connection.CreateModel();
//事件基本消费者
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
//接收到消息事件
consumer.Received += (ch, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body);
try
{
var item = JsonConvert.DeserializeObject<T>(message);
func(item);
}
catch (Exception ex)
{
LogHelp.Error(ex);
}
//确认该消息已被消费
channel.BasicAck(ea.DeliveryTag, false);
};
//启动消费者 设置为手动应答消息
channel.BasicConsum、e(queueName, false, consumer);
}
}
4、查看消息
在这里我们发送消息,然后我们去ip+15672看看是否有未消费的消息。
发现有一条未消费的信息,队列名称是Test,正好是我们刚刚发送的消息。
然后我们去消费这一条信息,再次进入ip+15672看看