进而调用ServeExec方法:
// ServeExec handles requests to execute a command in a container. After // creating/receiving the required streams, it delegates the actual execution // to the executor. func ServeExec(w http.ResponseWriter, req *http.Request, executor Executor, podName string, uid types.UID, container string, cmd []string, streamOpts *Options, idleTimeout, streamCreationTimeout time.Duration, supportedProtocols []string) { ctx, ok := createStreams(req, w, streamOpts, supportedProtocols, idleTimeout, streamCreationTimeout) if !ok { // error is handled by createStreams return } defer ctx.conn.Close() err := executor.ExecInContainer(podName, uid, container, cmd, ctx.stdinStream, ctx.stdoutStream, ctx.stderrStream, ctx.tty, ctx.resizeChan, 0) if err != nil { if exitErr, ok := err.(utilexec.ExitError); ok && exitErr.Exited() { rc := exitErr.ExitStatus() ctx.writeStatus(&apierrors.StatusError{ErrStatus: metav1.Status{ Status: metav1.StatusFailure, Reason: remotecommandconsts.NonZeroExitCodeReason, Details: &metav1.StatusDetails{ Causes: []metav1.StatusCause{ { Type: remotecommandconsts.ExitCodeCauseType, Message: fmt.Sprintf("%d", rc), }, }, }, Message: fmt.Sprintf("command terminated with non-zero exit code: %v", exitErr), }}) } else { err = fmt.Errorf("error executing command in container: %v", err) runtime.HandleError(err) ctx.writeStatus(apierrors.NewInternalError(err)) } } else { ctx.writeStatus(&apierrors.StatusError{ErrStatus: metav1.Status{ Status: metav1.StatusSuccess, }}) } }在remotecommandserver.ServeExec中调用了executer.ExecInContainer 方法, 该executer接口的实现是criAdapter, criAdapter只是Runtime的一个wrapper,真正调用的是Runtime.Exec, Runtime是个interface,我们来看下具体在dockershim中的实现:
func (r *streamingRuntime) Exec(containerID string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error { return r.exec(containerID, cmd, in, out, err, tty, resize, 0) } // Internal version of Exec adds a timeout. func (r *streamingRuntime) exec(containerID string, cmd []string, in io.Reader, out, errw io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error { container, err := checkContainerStatus(r.client, containerID) if err != nil { return err } return r.execHandler.ExecInContainer(r.client, container, cmd, in, out, errw, tty, resize, timeout) }其中execHandler是在初始化streamRuntime的时候定义的NativeExecHandler, 可以看到是直接调用libdocker api与docker进行交互
// NativeExecHandler executes commands in Docker containers using Docker's exec API. type NativeExecHandler struct{} func (*NativeExecHandler) ExecInContainer(client libdocker.Interface, container *dockertypes.ContainerJSON, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize, timeout time.Duration) error { done := make(chan struct{}) defer close(done) createOpts := dockertypes.ExecConfig{ Cmd: cmd, AttachStdin: stdin != nil, AttachStdout: stdout != nil, AttachStderr: stderr != nil, Tty: tty, } execObj, err := client.CreateExec(container.ID, createOpts) if err != nil { return fmt.Errorf("failed to exec in container - Exec setup failed - %v", err) } // Have to start this before the call to client.StartExec because client.StartExec is a blocking // call :-( Otherwise, resize events don't get processed and the terminal never resizes. // // We also have to delay attempting to send a terminal resize request to docker until after the // exec has started; otherwise, the initial resize request will fail. execStarted := make(chan struct{}) go func() { select { case <-execStarted: // client.StartExec has started the exec, so we can start resizing case <-done: // ExecInContainer has returned, so short-circuit return } kubecontainer.HandleResizing(resize, func(size remotecommand.TerminalSize) { client.ResizeExecTTY(execObj.ID, uint(size.Height), uint(size.Width)) }) }() startOpts := dockertypes.ExecStartCheck{Detach: false, Tty: tty} streamOpts := libdocker.StreamOptions{ InputStream: stdin, OutputStream: stdout, ErrorStream: stderr, RawTerminal: tty, ExecStarted: execStarted, } err = client.StartExec(execObj.ID, startOpts, streamOpts) if err != nil { return err } ticker := time.NewTicker(2 * time.Second) defer ticker.Stop() count := 0 for { inspect, err2 := client.InspectExec(execObj.ID) if err2 != nil { return err2 } if !inspect.Running { if inspect.ExitCode != 0 { err = &dockerExitError{inspect} } break } count++ if count == 5 { klog.Errorf("Exec session %s in container %s terminated but process still running!", execObj.ID, container.ID) break } <-ticker.C } return err }至此整个处理流程就结束了。