代码位于 /vendor/k8s.io/apiserver/pkg/storage/etcd3/store.go
func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error { if version, err := s.versioner.ObjectResourceVersion(obj); err == nil && version != 0 { return errors.New("resourceVersion should not be set on objects to be created") } if err := s.versioner.PrepareObjectForStorage(obj); err != nil { return fmt.Errorf("PrepareObjectForStorage failed: %v", err) } //将资源转换成无版本类型 data, err := runtime.Encode(s.codec, obj) if err != nil { return err } key = path.Join(s.pathPrefix, key) opts, err := s.ttlOpts(ctx, int64(ttl)) if err != nil { return err } //再将资源转换成适合存储的格式 newData, err := s.transformer.TransformToStorage(data, authenticatedDataString(key)) if err != nil { return storage.NewInternalError(err.Error()) } startTime := time.Now() //检查资源是否已经存在了 txnResp, err := s.client.KV.Txn(ctx).If( notFound(key), ).Then( //不存在才调用Put把资源存进去 clientv3.OpPut(key, string(newData), opts...), ).Commit() metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime) if err != nil { return err } if !txnResp.Succeeded { return storage.NewKeyExistsError(key, 0) } //转换响应结果 if out != nil { putResp := txnResp.Responses[0].GetResponsePut() return decode(s.codec, s.versioner, data, out, putResp.Header.Revision) } return nil }至此,资源已落库,创建请求已完毕,apiserver也将结果响应给客户端。
小结本篇衔接前一篇apiserver的启动流程,讲述了认证器,授权器,准入控制器如何被配置的,如果根据APIGroupInfo映射好的storage创建处理请求的handler。当一个请求来的时候如何执行认证操作,授权操作,接着经过Mutate准入控制器和Validate准入控制器等一系列校验,最终转换资源的版本,调用Etcd客户端将资源持久化,也将结果响应回给客户端。
如有兴趣,可阅读鄙人“k8s源码之旅”系列的其他文章
kubelet源码分析——kubelet简介与启动
kubelet源码分析——启动Pod
kubelet源码分析——关闭Pod
scheduler源码分析——调度流程
apiserver源码分析——启动流程
apiserver源码分析——处理请求