优秀的系统都是根据反馈逐渐完善出来的
上篇文章介绍了我们为了应对安全和多分支频繁测试的问题而开发了一套Alodi系统,Alodi可以通过一个按钮快速构建一套测试环境,生成一个临时访问地址,详细信息可以看这一篇文章:Alodi:为了保密我开发了一个系统
系统上线后,SSH登陆控制台成了一个迫切的需求,Kubernetes的Dashboard控制台虽然有WebSSH的功能,但却没办法跟Alodi系统相结合,决定在Alodi中集成WebSSH的功能,先来看看最后实现的效果吧
涉及技术Kubernetes Stream:接收数据执行,提供实时返回数据流
Django Channels:维持长连接,接收前端数据转给Kubernetes,同时将Kubernetes返回的数据发送给前端
xterm.js:一个前端终端组件,用于模拟Terminal的界面显示
基本的数据流向是:用户 --> xterm.js --> django channels --> kubernetes stream,接下来看看具体的代码实现
Kubernetes StreamKubernetes本身提供了stream方法来实现exec的功能,返回的就是一个WebSocket可以使用的数据流,使用起来也非常方便,代码如下:
from kubernetes import client, config from kubernetes.stream import stream class KubeApi: def __init__(self, namespace='alodi'): config.load_kube_config("/ops/coffee/kubeconfig.yaml") self.namespace = namespace def pod_exec(self, pod, container=""): api_instance = client.CoreV1Api() exec_command = [ "/bin/sh", "-c", 'TERM=xterm-256color; export TERM; [ -x /bin/bash ] ' '&& ([ -x /usr/bin/script ] ' '&& /usr/bin/script -q -c "/bin/bash" /dev/null || exec /bin/bash) ' '|| exec /bin/sh'] cont_stream = stream(api_instance.connect_get_namespaced_pod_exec, name=pod, namespace=self.namespace, container=container, command=exec_command, stderr=True, stdin=True, stdout=True, tty=True, _preload_content=False ) return cont_stream这里的pod name可以通过list_namespaced_pod方法获取,代码如下:
def get_deployment_pod(self, RAND): api_instance = client.CoreV1Api() try: r = api_instance.list_namespaced_pod( namespace=self.namespace, label_selector="app=%s" % RAND ) return True, r except Exception as e: return False, 'Get Deployment: ' + str(e) state, data = self.get_deployment_pod(RAND) pod_name = data.items[0].metadata.namelist_namespaced_pod会列出namespace下所有pod的详细信息,这里传了两个参数,第一个namespace是必须的,表示我们要列出pod的namespace,第二个label_selector非必须,表示可以通过设置的标签过滤namespace下的pod,由于我们在创建的时候给每个deployment都添加了唯一的app=RAND的标签,所以这里可以过滤出来我们项目所对应的pod
一个deployment可能对应多个pod,获取到的data.items包含了所有的pod信息,为一个list列表,可根据需要取到对应pod的name
Django Channels之前有两篇文章详细介绍过Django Channels,不了解的可以先查看:Django使用Channels实现WebSocket--上篇和Django使用Channels实现WebSocket--下篇,最重要的两部分代码如下
routing代码:
from channels.auth import AuthMiddlewareStack from channels.routing import ProtocolTypeRouter, URLRouter from django.urls import path, re_path from medivh.consumers import SSHConsumer application = ProtocolTypeRouter({ 'websocket': AuthMiddlewareStack( URLRouter([ re_path(r'^pod/(?P<name>\w+)', SSHConsumer), ]) ), })正则匹配所有以pod开头的websocket连接,都交由名为SSHConsumer的Consumer处理,Consumer代码如下:
from channels.generic.websocket import WebsocketConsumer from medivh.backends.kube import KubeApi from threading import Thread class K8SStreamThread(Thread): def __init__(self, websocket, container_stream): Thread.__init__(self) self.websocket = websocket self.stream = container_stream def run(self): while self.stream.is_open(): if self.stream.peek_stdout(): stdout = self.stream.read_stdout() self.websocket.send(stdout) if self.stream.peek_stderr(): stderr = self.stream.read_stderr() self.websocket.send(stderr) else: self.websocket.close() class SSHConsumer(WebsocketConsumer): def connect(self): self.name = self.scope["url_route"]["kwargs"]["name"] # kube exec self.stream = KubeApi().pod_exec(self.name) kub_stream = K8SStreamThread(self, self.stream) kub_stream.start() self.accept() def disconnect(self, close_code): self.stream.write_stdin('exit\r') def receive(self, text_data): self.stream.write_stdin(text_data)WebSSH可以看作是一个最简单的websocket长连接,每个连接建立后都是独立的,不会跟其他连接共享数据,所以这里不需要用到Group
当连接建立时通过self.scope获取到url中的name,传给Kubernetes API,同时会新起一个线程不断循环是否有新数据产生,如果有则发送给websocket
当websocket接收到数据就直接写入Kubernetes API,当websocket关闭则会发送个exit命令给Kubernetes
前端页面