public class ComsumeRabbitMQHostedService : BackgroundService { private readonly ILogger _logger; private readonly AppSettings _settings; private IConnection _connection; private IModel _channel; public ComsumeRabbitMQHostedService(ILoggerFactory loggerFactory, IOptionsSnapshot<AppSettings> options) { this._logger = loggerFactory.CreateLogger<ComsumeRabbitMQHostedService>(); this._settings = options.Value; InitRabbitMQ(this._settings); } private void InitRabbitMQ(AppSettings settings) { var factory = new ConnectionFactory { HostName = settings.HostName, }; _connection = factory.CreateConnection(); _channel = _connection.CreateModel(); _channel.ExchangeDeclare(_settings.ExchangeName, ExchangeType.Topic); _channel.QueueDeclare(_settings.QueueName, false, false, false, null); _channel.QueueBind(_settings.QueueName, _settings.ExchangeName, _settings.RoutingKey, null); _channel.BasicQos(0, 1, false); _connection.ConnectionShutdown += RabbitMQ_ConnectionShutdown; } protected override Task ExecuteAsync(CancellationToken stoppingToken) { stoppingToken.ThrowIfCancellationRequested(); var consumer = new EventingBasicConsumer(_channel); consumer.Received += (ch, ea) => { var content = System.Text.Encoding.UTF8.GetString(ea.Body); HandleMessage(content); _channel.BasicAck(ea.DeliveryTag, false); }; consumer.Shutdown += OnConsumerShutdown; consumer.Registered += OnConsumerRegistered; consumer.Unregistered += OnConsumerUnregistered; consumer.ConsumerCancelled += OnConsumerConsumerCancelled; _channel.BasicConsume(_settings.QueueName, false, consumer); return Task.CompletedTask; } private void HandleMessage(string content) { _logger.LogInformation($"consumer received {content}"); } private void OnConsumerConsumerCancelled(object sender, ConsumerEventArgs e) { ... } private void OnConsumerUnregistered(object sender, ConsumerEventArgs e) { ... } private void OnConsumerRegistered(object sender, ConsumerEventArgs e) { ... } private void OnConsumerShutdown(object sender, ShutdownEventArgs e) { ... } private void RabbitMQ_ConnectionShutdown(object sender, ShutdownEventArgs e) { ... } public override void Dispose() { _channel.Close(); _connection.Close(); base.Dispose(); } }
代码细节就不需要多说了,下面就启动MQ发送程序来模拟消息的发送
同时看我们任务的日志输出
由启动到停止,效果都是符合我们预期的。
下面再来看看Web形式的后台任务是怎么处理的。
Web形式这种模式下的后台任务,其实就是十分简单的了。
我们只要在Startup的ConfigureServices方法里面注册我们的几个后台任务就可以了。
public void ConfigureServices(IServiceCollection services) { services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_1); services.AddHostedService<PrinterHostedService2>(); services.AddHostedService<TimerHostedService>(); services.AddHostedService<ComsumeRabbitMQHostedService>(); }
启动Web站点后,我们发了20条MQ消息,再访问了一下Web站点的首页,最后是停止站点。
下面是日志结果,都是符合我们的预期。
可能大家会比较好奇,这三个后台任务是怎么混合在Web项目里面启动的。
答案就在下面的两个链接里。
https://github.com/aspnet/Hosting/blob/2.1.1/src/Microsoft.AspNetCore.Hosting/Internal/WebHost.cs
https://github.com/aspnet/Hosting/blob/2.1.1/src/Microsoft.AspNetCore.Hosting/Internal/HostedServiceExecutor.cs
上面说了那么多,都是在本地直接运行的,可能大家会比较关注这个要怎样部署,下面我们就不看看怎么部署。
部署部署的话,针对不同的情形(web和非web)都有不同的选择。
正常来说,如果本身就是web程序,那么平时我们怎么部署的,就和平时那样部署即可。
花点时间讲讲部署非web的情形。
其实这里的部署等价于让程序在后台运行。
在Linux下面让程序在后台运行方式有好多好多,Supervisor、Screen、pm2、systemctl等。
这里主要介绍一下systemctl,同时用上面的例子来进行部署,由于个人服务器没有MQ环境,所以没有启用消费MQ的后台任务。
先创建一个 service 文件
vim /etc/systemd/system/ghostdemo.service
内容如下: