最近一个外部的项目,利用到了动静行列,原来是用rabbitmq实现的,可是由于是陈设到别人家的处事器上,想只管简化一些,项目华夏来也要接入了redis缓存,就实验利用redis来实现简朴的动静行列。
利用redis做动静行列有两种要领,一种是利用pub/sub,另一种是利用list布局,共同brpop来消费。这两种方法各有特点,这里简述一下:
pub/sub模式,支持多客户端消费,可是不支持耐久化,这就意味着客户端断开的时间内宣布的动静将会全部舍弃掉。
list共同brpop,默认不支持多客户端消费,支持耐久化。这种模式的多客户端消费可以变相实现,好比下面的伪代码:
#第一步push动静到行列 lpush listA msg #第二步,一个专门的分发客户端取出动静,push到各个子行列 var msg=brpop listA lpush listA1 msg lpush listA2 msg ...... #第三步,多个客户端从对应的行列消费动静 var client1_msg= brpop listA1 var client2_msg= brpop listA2 ......
动静丢失不太可取,所以我选择了list ,下一步需要选择一个符合的客户端。
Stackexchange.redis 算是一个老牌的客户端了,可是由于其回收多路复用的模式,没法支持Redis的blocking pops特性。所以我回收了国人写的CSRedisCore。
首先需要在appsettings.json中添加redis的毗连字符串:
{ "ConnectionStrings": { "redis": "{ip}:{port},password=123456,prefix=my_" } }
详细设置请参考github上的文档:https://github.com/2881099/csredis
然后在startup.cs的ConfigureServices中设置redis:
public void ConfigureServices(IServiceCollection services) { //redis设置 RedisHelper.Initialization(new CSRedis.CSRedisClient(Configuration.GetConnectionString("redis"))); }
虽然也可以回收依赖注入的方法添加CSRedisClient实例,这个不纠结。
在项目中有好几处利用到了行列,所以先封装一个消费处事:
public abstract class RedisMQConsumer : BackgroundService { protected abstract string CacheKey { get; } protected ILogger<RedisMQConsumer> logger; public RedisMQConsumer(ILogger<RedisMQConsumer> logger) { this.logger = logger; } protected override Task ExecuteAsync(CancellationToken stoppingToken) { return Task.Run( async() => { while (!stoppingToken.IsCancellationRequested) { try { var msg = RedisHelper.BRPop(5, CacheKey); try { if (string.IsNullOrEmpty(msg)) continue; if (!Process(msg)) { //插手错误处理惩罚行列,可以在靠山写成果手动处理惩罚 RedisHelper.LPush(CacheKey + "_err", msg); } } catch (Exception exp) { //插手错误处理惩罚行列,可以在靠山写成果手动处理惩罚 RedisHelper.LPush(CacheKey + "_err", msg); logger.LogError(exp, "RedisMQConsumer Execute error"); } } catch { //网络大概间断 await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken); } } }, stoppingToken); } protected abstract bool Process(string message); }
然后就可以担任RedisMQConsumer,编写实际逻辑:
public class AddOrderMQConsumer : RedisMQConsumer { public AddOrderMQConsumer(ILogger<RedisMQConsumer> logger) : base(logger) { } protected override string CacheKey => "addOrder"; protected override bool Process(string message) { var order = JsonSerializer.Deserialize<Order>(message); //处理惩罚逻辑 return true; } }
宣布动静只是往行列中添加项:
RedisHelper.LPush("addOrder", order);
最后把消费处事添加到startup.cs中:
public void ConfigureServices(IServiceCollection services) { //redis设置 RedisHelper.Initialization(new CSRedis.CSRedisClient(Configuration.GetConnectionString("redis"))); //redis动静行列消费处事,放在redis设置下方 services.AddHostedService<AddOrderMQConsumer>(); }
经测试,还算不变,小并发项目可以利用。
到此这篇关于Asp.net core中RedisMQ的简朴应用实现的文章就先容到这了,更多相关Asp.net core RedisMQ内容请搜索剧本之家以前的文章或继承欣赏下面的相关文章但愿各人今后多多支持剧本之家!
您大概感乐趣的文章: