仓储地址, 如果有用,欢迎点赞,欢迎讨论,欢迎找茬。
需求在开发中,经常遇到一些需要定时任务的场景。各个语言都有定时语言的库,Golang Cron 提供了Crontab Golang语言版本。这个库非常不错,提供最基本的定时任务编排的功能。但是一些复杂需求无法满足,比如
任何定时任务都有可能失败,失败了就panic了,这样非常不友好。最起码能够让我控制,失败是重试还是停止
某些任务执行周期要10s, 而用户设置的5s一执行,我能不能保证任何时间这个任务只执行一次
我想实时的看到任务的状态,比如是不是在运行?下次运行时间?上次运行时间?
我想看到任务执行了多少次,成功了多少次
我想要限制最大任务数量,比如超过10个任务在执行,不运行新的任务执行
任务执行完了可以告诉我逻辑上有错误,还是有结果。我还可以加上一些钩子函数来处理任务执行的结果
以上的需求都非常常见,可惜这个库都不支持^_^.
完全没用的例子复杂定义任务的场景模型抽象出来大概也就是下面几个功能点,这个没用的例子可以很好的体现出来
用户通过接口,告诉后台我要做一个什么定时工作,schedule是什么
查看所有定时任务的状态
查看所有定时任务的工作结果
本地运行 通过以下命令本地运行 go get -u "github.com/OhBonsai/croner" go get -u "github.com/gin-gonic/gin" go get -u "github.com/gorilla/websocket" cd $GOPATH/src/github.com/OhBonsai/croner/example go run server.go # 打开localhost:8000 前端解释原谅我的狗屎前端。怕大家看不懂,我还是解释一下前端各个部分什么意思。
图中①的区域,是计划定义区,可以设置一些参数,表示谁多久往聊天室说一句什么话。第二个表单可以输入1-10的数字,表示每隔几秒说话。当然cron支持六位的crontab周期定义。
图中②的区域,是执行任务状态区,每秒刷新一次
图中3的区域,就是我们的聊天室啦。后台定时任务钩子函数会定时把消息推到channel中,如果websocket服务端收到消息就发送到浏览器
后端逻辑实现定时计划接口func Run() croner.JobRunReturn
type JobS struct { Duration int `json:"duration"` Who string `json:"who"` What string `json:"what"` } func (j JobS) Run() croner.JobRunReturn { return croner.JobRunReturn{ Value: fmt.Sprintf("[%s] %s: %s", time.Now().Format(time.RFC850), j.Who, j.What), } }初始化设置
var manager = croner.NewCronManager(croner.CronManagerConfig{ true, false, 0, 0, })加上钩子函数,如果接收到任务执行结果,将结果传到ch channel
croner.OnJobReturn(func(runReturn *croner.JobRunReturnWithEid) { say := runReturn.Value.(string) ch <- say })每当接受到post请求,就创建一个任务
_, err = manager.Add(fmt.Sprintf("@every %ds", curJob.Duration), curJob, nil)轮询获区ch传过来的值,通过websocket传到前端
for { select { case msg := <-ch: conn.WriteMessage(websocket.TextMessage, []byte(msg)) default: continue } } 实现详细的使用可以查看测试文件,
任务接口任务只要实现run()函数就行啦。这样我就可以包装你这个函数
type JobRunReturn struct { Value interface{} Error error } type JobInf interface { Run() JobRunReturn } 任务失败控制Cron没有失败控制,通过包装run()函数来实现cron的job接口来增加一些逻辑。加上一个defer来恢复panic, 通过设置配置ignorePanic来控制是否忽略错误继续执行,还是发生错误就是STOP
defer func() { j.TotalCount += 1 if err := recover(); err != nil { errString := fmt.Sprintf("WrappedJob-%d %s execute fail. error is %s", j.Id, j.Name, err) println(errString) atomic.StoreUint32(&j.status, FAIL) if !j.father.ignorePanic { j.father.DisActive(j.Id) } j.father.jobReturnsWithEid <- JobRunReturnWithEid{ JobRunReturn{nil, JobRunError{errString}}, j.Id, } } return }() 单任务周期时间只执行一次这个主要靠锁来实现,任务运行时就锁住,直到完成之后才释放
j.running.Lock() defer j.running.Unlock() 任务状态变更通过原子操作来变更任务状态
atomic.StoreUint32(&(j.status), RUNNING) defer atomic.StoreUint32(&(j.status), IDLE) 最大任务数量通过buffered channel来实现最大任务数量
permit = make(chan struct{}, c.PoolSize) permit <- struct{}{} defer func() { <-permit }() 钩子不断获取任务回传结果,然后遍历执行钩子函数
go func(){ for { select { case value := <-r.jobReturnsWithEid: jobReturnHooks.Run(&value) case <-r.stop: return } } }() 缺陷