接着开始执行
ctx, cancel := context.WithTimeout(context.Background(), c.Duration("timeout")) defer cancel() ctx = interrupt.WithContext(ctx) return pipeline.New(config, pipeline.WithContext(ctx), pipeline.WithLogger(defaultLogger), pipeline.WithTracer(defaultTracer), pipeline.WithEngine(engine), ).Run()其中pipeline.NEW创建了Runtime对象;
type Runtime struct { err error // 错误信息 spec *backend.Config // 配置信息 engine backend.Engine // docker engine started int64 // 开始时间 ctx context.Context tracer Tracer logger Logger }其中Engine,操作容器的interface,目前仅docker可用。
// Engine defines a container orchestration backend and is used // to create and manage container resources. type Engine interface { // Setup the pipeline environment. Setup(context.Context, *Config) error // Start the pipeline step. Exec(context.Context, *Step) error // Kill the pipeline step. Kill(context.Context, *Step) error // Wait for the pipeline step to complete and returns // the completion results. Wait(context.Context, *Step) (*State, error) // Tail the pipeline step logs. Tail(context.Context, *Step) (io.ReadCloser, error) // Destroy the pipeline environment. Destroy(context.Context, *Config) error }关注Run:
// Run starts the runtime and waits for it to complete. func (r *Runtime) Run() error { // 延迟函数,用于销毁docker env defer func() { r.engine.Destroy(r.ctx, r.spec) }() // 初始化docker engine r.started = time.Now().Unix() if err := r.engine.Setup(r.ctx, r.spec); err != nil { return err } // 依次运行stage for _, stage := range r.spec.Stages { select { case <-r.ctx.Done(): return ErrCancel // 执行 case err := <-r.execAll(stage.Steps): if err != nil { r.err = err } } } return r.err }重点在于使用errgroup.Group通过协程方式运行step:
// 执行所有steps func (r *Runtime) execAll(procs []*backend.Step) <-chan error { var g errgroup.Group done := make(chan error) // 遍历执行step for _, proc := range procs { // 协程 exec proc := proc g.Go(func() error { return r.exec(proc) }) } go func() { done <- g.Wait() close(done) }() return done } // 执行单个step func (r *Runtime) exec(proc *backend.Step) error { switch { case r.err != nil && proc.OnFailure == false: return nil case r.err == nil && proc.OnSuccess == false: return nil } // trace日志 if r.tracer != nil { state := new(State) state.Pipeline.Time = r.started state.Pipeline.Error = r.err state.Pipeline.Step = proc state.Process = new(backend.State) // empty if err := r.tracer.Trace(state); err == ErrSkip { return nil } else if err != nil { return err } } // docker engine执行 if err := r.engine.Exec(r.ctx, proc); err != nil { return err } // 记录日志信息 if r.logger != nil { rc, err := r.engine.Tail(r.ctx, proc) if err != nil { return err } go func() { r.logger.Log(proc, multipart.New(rc)) rc.Close() }() } if proc.Detached { return nil } // 等待docker engine执行完成 wait, err := r.engine.Wait(r.ctx, proc) if err != nil { return err } if r.tracer != nil { state := new(State) state.Pipeline.Time = r.started state.Pipeline.Error = r.err state.Pipeline.Step = proc state.Process = wait if err := r.tracer.Trace(state); err != nil { return err } } if wait.OOMKilled { return &OomError{ Name: proc.Name, Code: wait.ExitCode, } } else if wait.ExitCode != 0 { return &ExitError{ Name: proc.Name, Code: wait.ExitCode, } } return nil }作者:Jadepeng
出处:jqpeng的技术记事本--
您的支持是对博主最大的鼓励,感谢您的认真阅读。
本文版权归作者所有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。