一 简介
上一篇介绍了zookeeper如何进行分布式协调,这次主要讲解quartz使用zookeeper进行分布式计算,因为上一篇只是讲解原理,而这次实际使用,zookeeperService更改了一部分代码,算是集成优化吧。
系统结构图如下:
上图展示了,两个部分,一个是故障灾难转移集群,一个实现了分片的功能。故障灾难转移集群是quartz自带的功能,就不多说,分片功能是自己实现的。在这里要说下quartz使用故障灾难转移集群的一些配置注意事项:
再者就是netcore不支持remote,这个很重要,所以需要自己做一个web服务接口,但是本实例没有实现,而是仅仅使用数据库去配置和跟踪quartz服务,但是这是主要的。而使用api的一些功能就是实时开启,关闭,监控quartz主机状态,监控分片主机状态。所以大家留意这些功能暂时没有,不过大家在本文学会后很容易就可以自己扩展。
在这里要感谢 github账号为 weizhong1988/ 的一个quartz管理的项目。
当然我这次的案例都是基于linux和mysql,而这个项目是sql server,所以我把sql全部替换了,更改了一些东西。后面会把代码全部放出来。界面如下图
好,下面看代码实现。
二 quartz故障灾难转移和分片功能
首先看结构:
然后看Program入口方法:
var host = new HostBuilder()
.UseEnvironment(EnvironmentName.Development)
.ConfigureAppConfiguration((hostContext, configApp) =>
{
configApp.SetBasePath(Directory.GetCurrentDirectory());
configApp.AddJsonFile(
$"appsettings.{hostContext.HostingEnvironment.EnvironmentName}.json",
optional: true);
configApp.AddEnvironmentVariables("PREFIX_");
configApp.AddCommandLine(args);
var rootConfiguration = configApp.Build();
QuartzOpt = new QuartzOption();
rootConfiguration.GetSection("Quartz").Bind(QuartzOpt); //绑定quartz的配置类的数据
}).ConfigureLogging((hostContext, configBuild) =>
{
configBuild.AddConfiguration(hostContext.Configuration.GetSection("Logging"));
configBuild.AddConsole();
configBuild.AddCustomizationLogger();
})
.ConfigureServices((hostContext, service) =>
{
service.AddKafka(KafkaBuilder =>
{
KafkaBuilder.AddConfiguration(hostContext.Configuration.GetSection("KafkaService"));
});
service.AddZookeeper(zookeeperBuilder =>
{
zookeeperBuilder.AddConfiguration(hostContext.Configuration.GetSection("zookeeperService"));
});
service.AddDbContext<QuartzDbContext>(option =>
option.UseMySQL(hostContext.Configuration.GetConnectionString("QuartzDatabase")), ServiceLifetime.Transient, ServiceLifetime.Transient);
//这个是操作数据库的数据库服务,这个和 quartz的cluster数据提供程序是分开的。
})
.Build();
Host = host;
ILoggerFactory loggerFact = host.Services.GetService<ILoggerFactory>();
LogProvider.SetCurrentLogProvider(new ConsoleLogProvider(loggerFact)); //将框架的日志提供程序,传递给quart的日志接口。
var ischema = RunProgramRunExample(loggerFact); //从数据库构造job的方法
host.WaitForShutdown(); //netcore的通用主机。
ischema.Shutdown(true);//quartz自己的主机。