如何使用client-go构建pod web shell

代码示例及原理

  • 原理是利用websocket协议实现对pod的exec登录,利用client-go构造与远程apiserver的长连接,将对pod容器的输入和pod容器的输出重定向到我们的io方法中,从而实现浏览器端的虚拟终端的效果
  • 消息体结构如下
type Connection struct {WsSocket    *websocket.Conn // 主websocket连接OutWsSocket *websocket.ConnInChan      chan *WsMessage // 输入消息管道OutChan     chan *WsMessage // 输出消息管道Mutex     sync.Mutex // 并发控制IsClosed  bool // 是否关闭CloseChan chan byte // 关闭连接管道
}
// 消息体
type WsMessage struct {MessageType int    `json:"messageType"`Data        []byte `json:"data"`
}
// terminal的行宽和列宽
type XtermMessage struct {Rows uint16 `json:"rows"`Cols uint16 `json:"cols"`
}
  • 下面需要一个handler来控制终端和接收消息,ResizeEvent用来控制终端变更的事件
type ContainerStreamHandler struct {WsConn      *ConnectionResizeEvent chan remotecommand.TerminalSize
}
  • 为了控制终端,我们需要重写TerminalSize的Next方法,这个方法是client-go定义的接口,如下所示
/*
Copyright 2017 The Kubernetes Authors.Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License athttp://www.apache.org/licenses/LICENSE-2.0Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/package remotecommand// TerminalSize and TerminalSizeQueue was a part of k8s.io/kubernetes/pkg/util/term
// and were moved in order to decouple client from other term dependencies// TerminalSize represents the width and height of a terminal.
type TerminalSize struct {Width  uint16Height uint16
}// TerminalSizeQueue is capable of returning terminal resize events as they occur.
type TerminalSizeQueue interface {// Next returns the new terminal size after the terminal has been resized. It returns nil when// monitoring has been stopped.Next() *TerminalSize
}
  • 我们重写的Next方法如下
func (handler *ContainerStreamHandler) Next() (size *remotecommand.TerminalSize) {select {case ret := <-handler.ResizeEvent:size = &retcase <-handler.WsConn.CloseChan:return nil // 这里很重要, 具体见最后的解释}return
}
  • 当我们从ResizeEvent管道中接收到调整终端大小的事件之后,这个事件会被client-go接收到,源码如下
func (p *streamProtocolV3) handleResizes() {if p.resizeStream == nil || p.TerminalSizeQueue == nil {return}go func() {defer runtime.HandleCrash()encoder := json.NewEncoder(p.resizeStream)for {size := p.TerminalSizeQueue.Next() // 接收到我们的调整终端大小的事件if size == nil {return}if err := encoder.Encode(&size); err != nil {runtime.HandleError(err)}}}()
}
  • 最后我们给出核心的实现(只是一个大概的框架,具体细节有问题可以留言)
func ExecCommandInContainer(ctx context.Context, conn *tty.Connection, podName, namespace, containerName string) (err error) {kubeClient, err := k8s.CreateClientFromConfig([]byte(env.Kubeconfig))if err != nil {return}restConfig, err := clientcmd.RESTConfigFromKubeConfig([]byte(env.Kubeconfig))if err != nil {return}// 构造请求req := kubeClient.CoreV1().RESTClient().Post().Resource("pods").Name(podName).Namespace(namespace).SubResource("exec").VersionedParams(&corev1.PodExecOptions{Command:   []string{"/bin/sh", "-c", "export LANG=\"en_US.UTF-8\"; [ -x /bin/bash ] && exec /bin/bash || exec /bin/sh"},Container: containerName,Stdin:     true,Stdout:    true,Stderr:    true,TTY:       true,}, scheme.ParameterCodec)// 使用spdy协议对http协议进行增量升级 exec, err := remotecommand.NewSPDYExecutor(restConfig, "POST", req.URL())if err != nil {return err}handler := &tty.ContainerStreamHandler{WsConn:      conn,ResizeEvent: make(chan remotecommand.TerminalSize),}// 核心函数,重定向标准输入和输出err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{Stdin:             handler,Stdout:            handler,Stderr:            handler,TerminalSizeQueue: handler,Tty:               true,})return
}
  • 整个函数的核心是StreamWithContext函数,这个函数是client-go的一个方法,接下来我们详细分析一下
// StreamWithContext opens a protocol streamer to the server and streams until a client closes
// the connection or the server disconnects or the context is done.
func (e *spdyStreamExecutor) StreamWithContext(ctx context.Context, options StreamOptions) error {conn, streamer, err := e.newConnectionAndStream(ctx, options)if err != nil {return err}defer conn.Close()panicChan := make(chan any, 1) // panic管道errorChan := make(chan error, 1) // error管道go func() {defer func() {if p := recover(); p != nil {panicChan <- p}}()errorChan <- streamer.stream(conn)}()select {case p := <-panicChan:panic(p)case err := <-errorChan:return errcase <-ctx.Done():return ctx.Err()}
}
  • 这个方法首先初始化了一个连接,然后定义了两个管道,分别接收panic事件和error事件,核心是streamer.stream()方法,接下来我们分析一下这个方法
func (p *streamProtocolV4) stream(conn streamCreator) error {// 创建一个与apiserver的连接传输流if err := p.createStreams(conn); err != nil {return err}// now that all the streams have been created, proceed with reading & copying// 观察流中的错误errorChan := watchErrorStream(p.errorStream, &errorDecoderV4{})// 监听终端调整的事件p.handleResizes()// 将我们的标准输入拷贝到remoteStdin也就是远端的标准输入当中p.copyStdin()var wg sync.WaitGroup// 将远端的标准输出拷贝到我们的标准输出当中p.copyStdout(&wg)p.copyStderr(&wg)// we're waiting for stdout/stderr to finish copyingwg.Wait()// waits for errorStream to finish reading with an error or nilreturn <-errorChan
}
  • 整体逻辑还是很清晰的,具体实现细节看源码吧

OOM 问题

  • 这个功能上线之后,发现内存不断攀升,如下图(出现陡降是因为我重启了服务)
    在这里插入图片描述
  • 使用pprof进行问题排查,在你的main.go文件中加入下面的内容
import _ "net/http/pprof"func main(){go func() {http.ListenAndServe("localhost:6060", nil)}()
}
  • 然后你可以在http://localhost:6060/debug/pprof/中看到下面的页面
    在这里插入图片描述
  • 点击full goroutine stack dump,你会看到goroutine的堆栈存储情况,如下图
    在这里插入图片描述
  • 查看之后发现出现了很多的残留goroutine,出现在Next方法中,如下图
    在这里插入图片描述
  • 这个问题出现的原因是我们重写的Next方法,当断开连接的时候必须要主动返回一个nil,否则会残留一个go func,具体看上面的handleResizes方法中有一个for循环,必须收到一个sizenil,才能跳出此func,一开始我写的方法如下,这样写的话当浏览器退出的时候,是不会给我一个nil的终端调整的事件的
// 错误写法
func (handler *ContainerStreamHandler) Next() (size *remotecommand.TerminalSize) {ret := <-handler.ResizeEvent:size = &retreturn
}
// 正确写法
func (handler *ContainerStreamHandler) Next() (size *remotecommand.TerminalSize) {select {case ret := <-handler.ResizeEvent:size = &retcase <-handler.WsConn.CloseChan:return nil // 当发现管道关闭的时候,主动返回一个nil}return
}

有问题欢迎交流

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/322451.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

路由策略与路由控制

1.路由控制工具 匹配工具1&#xff1a;访问控制列表 &#xff08;1&#xff09;通配符 当进行IP地址匹配的时候&#xff0c;后面会跟着32位掩码位&#xff0c;这32位称为通配符。 通配符&#xff0c;也是点分十进制格式&#xff0c;换算成二进制后&#xff0c;“0”表示“匹配…

element-ui table sortable排序 掉后端接口方式

实例: 官方解释:如果需要后端排序&#xff0c;需将sortable设置为custom&#xff0c;同时在 Table 上监听sort-change事件&#xff0c;在事件回调中可以获取当前排序的字段名和排序顺序&#xff0c;从而向接口请求排序后的表格数据。 1.table上要加 sort-change"sortCha…

15_Scala面向对象编程_访问权限

文章目录 Scala访问权限1.同类中访问2.同包不同类访问3.不同包访问4.子类权限小结 Scala访问权限 知识点概念 private --同类访问private[包名] --包私有&#xff1b; 同类同包下访问protected --同类&#xff0c;或子类 //同包不能访问(default)(public)默认public --公…

Python | Leetcode Python题解之第78题子集

题目&#xff1a; 题解&#xff1a; class Solution:def subsets(self, nums: List[int]) -> List[List[int]]:self.res []self.backtrack([], 0, nums)return self.resdef backtrack(self, sol, index, nums):self.res.append(sol)for i in range(index, len(nums)):self…

物联网实战--平台篇之(四)账户后台交互

目录 一、交互逻辑 二、请求验证码 三、帐号注册 四、帐号/验证码登录 五、重置密码 本项目的交流QQ群:701889554 物联网实战--入门篇https://blog.csdn.net/ypp240124016/category_12609773.html 物联网实战--驱动篇https://blog.csdn.net/ypp240124016/category_12631…

P9422 [蓝桥杯 2023 国 B] 合并数列

P9422 [蓝桥杯 2023 国 B] 合并数列 - 洛谷 | 计算机科学教育新生态 (luogu.com.cn) 用队列即可 当两个队列队首&#xff1a;a b &#xff0c;弹出 当a < b&#xff0c;把a加给其后一个元素&#xff0c;弹出a 当b < a&#xff0c;把b加给其后一个元素&#xff0c;弹出…

git 配置相关

问题一&#xff1a;ssh-keygen -t ed25519 -C "Gitee SSH Key" 这个命令中的 ed25519 字符是什么意思&#xff1f; ssh-keygen 是一个用于生成SSH密钥的工具&#xff0c;SSH&#xff08;Secure Shell&#xff09;是一种网络协议&#xff0c;用于加密方式远程登录和其…

Docker使用进阶篇

文章目录 1 前言2 使用Docker安装常用镜像示例2.1 Docker安装RabbitMQ2.2 Docker安装Nacos2.3 Docker安装xxl-job&#xff08;推荐该方式构建&#xff09;2.4 Docker安装redis2.5 Docker安装mysql 3 Docker自定义镜像3.1 Dockerfile的基本结构3.2 Dockerfile指令3.3 自定义JDK镜…

免费思维13招之三:赠品型思维

免费思维13招之三:赠品型思维 这节来学习一下免费模式中的三个子思维——赠品型思维、主副型思维和分级型思维。这三个思维有一个共同的名字又叫——产品型思维。 什么是产品型思维?顾名思义,就是在产品上的商业思维。也就是说,通过某一产品的免费来吸引客户,而后进行其…

redis--安装

简介 官网&#xff1a;RedisInsight - The Best Redis GUI 各个版本官网下载地址&#xff1a;http://download.redis.io/releases/ Redis和Memcached是非关系型数据库也称为NoSQL数据库&#xff0c;MySQL、Mariadb、SQL Server、PostgreSQL Oracle 数据库属于关系型数据 应用…

You don’t have permission.

The document “XXX” could not be saved. You don’t have permission. 1.查看修改了iOS系统库导致的, 根据提示, 进入到"XXX"文件中, 然后commandz回退/取消 2. Xcode 调试遇到的报错&#xff08;持续更新&#xff09;

C语言 函数的嵌套与递归 调用

本文 我们来说函数的嵌套调用和递归调用 在很多大型项目中 我们肯定不可能将所有逻辑都写在一个函数中 肯定要按功能拆解成多个特定的功能函数 函数并不允许嵌套调用&#xff0c;但是 允许在逻辑代码中嵌套调用 所谓函数嵌套调用 就是在一个函数中调用另一个函数&#xff0c;而…

计算机毕业设计Python+Spark知识图谱医生推荐系统 医生门诊预测系统 医生数据分析 医生可视化 医疗数据分析 医生爬虫 大数据毕业设计 机器学习

摘 要 随着我国社会经济发展水平的不断提高&#xff0c;人们的物质生活水平也有了很大的改善&#xff0c;越来越多的人不满足于当前的医疗服务质量&#xff0c;由于地域和空间的限制&#xff0c;医疗资源不平衡&#xff0c;无法实现全民共享。针对当今社会中存在的求医难的问题…

2024-05-08 问AI: 在深度学习中,介绍一下RMSProp 优化器

文心一言 在深度学习中&#xff0c;RMSProp&#xff08;Root Mean Square Propagation&#xff09;优化器是一种常用的优化算法&#xff0c;主要用于神经网络训练的梯度下降算法的变体。它是对Adagrad优化器的一种改进&#xff0c;旨在解决Adagrad中学习率过快下降的问题。 R…

Angular中的路由

Angular中的路由 文章目录 Angular中的路由前言一、创建路由二、创建多个组件路由三、创建子路由四、创建多个组件子路由 前言 在Angular中&#xff0c;路由是用于在不同的视图和组件之间导航的机制。Angular提供了一种强大的路由机制来管理单页应用&#xff08;SPA&#xff0…

SQL 基础 | JOIN 操作介绍

在SQL中&#xff0c;JOIN是一种强大的功能&#xff0c;用于将两个或多个表中的行结合起来&#xff0c;基于相关的列之间的关系。 JOIN操作通常用在SELECT语句中&#xff0c;以便从多个表中检索数据。 以下是几种基本的JOIN类型以及它们的用法&#xff1a; INNER JOIN&#xff1…

Jmeter 中 CSV 如何参数化测试数据并实现自动断言

当我们使用Jmeter工具进行接口测试&#xff0c;可利用CSV Data Set Config配置元件&#xff0c;对测试数据进行参数化&#xff0c;循环读取csv文档中每一行测试用例数据&#xff0c;来实现接口自动化。此种情况下&#xff0c;很多测试工程师只会人工地查看响应结果来判断用例是…

算法-排序

算法稳定性 什么是算法稳定性&#xff1b;假设KiKj&#xff08;1<i<n,1<j<n,i!j&#xff09;,且在排序前的序列中Ri领先Rj&#xff08;i<j&#xff09;。 如果排序后Ri依然领先Rj&#xff0c;则称所用的排序方法是稳定的&#xff1b;反之不稳定&#xff1b; …

Django Admin后台管理:高效开发与实践

title: Django Admin后台管理&#xff1a;高效开发与实践 date: 2024/5/8 14:24:15 updated: 2024/5/8 14:24:15 categories: 后端开发 tags: DjangoAdmin模型管理用户认证数据优化自定义扩展实战案例性能安全 第1章&#xff1a;Django Admin基础 1.1 Django Admin简介 Dj…

【线性代数】俗说矩阵听课笔记

基础解系的概念 31线性相关&#xff0c;线性无关&#xff0c;拓展与证明 n个m维向量在n<m时可能线性相关也可能线性无关&#xff0c;线性无关时可以构成某个m维空间的一组基。m不小于n时&#xff0c;秩小于n则线性相关。 n个m维向量在n>m时可一定线性相关。低维向量一定…