之前写过一篇jenkins+mqtt实现本地构建和远程自动发版_jenkins远程调用和本地调用-CSDN博客
由于本地搭建jenkins实在太费机器了,这次改用云效搭建。不过云效并没有直接发送mqtt的方法,需要编写中转接口。
中转接口采用go-gin框架实现,代码如下
main.go
package mainimport ("encoding/json""fmt"mqtt "github.com/eclipse/paho.mqtt.golang""github.com/gin-gonic/gin""os""time"
)func main() {router := gin.Default()router.POST("/publish/notify", func(c *gin.Context) {obj := struct {App string `json:"app"`Link string `json:"link"`}{}err := c.BindJSON(&obj)if err != nil {fmt.Println(err)} else {fmt.Println(obj)}//把读取到的json透传发送到mqtt服务器Publish(obj.App, obj.Link, "/publish/notify")c.JSON(200, gin.H{"msg": "this is a post msg",})})router.POST("/publish/notifyProd", func(c *gin.Context) {obj := struct {App string `json:"app"`Link string `json:"link"`}{}err := c.BindJSON(&obj)if err != nil {fmt.Println(err)} else {fmt.Println(obj)}//把读取到的json透传发送到mqtt服务器Publish(obj.App, obj.Link, "/publish/notifyProd")c.JSON(200, gin.H{"msg": "this is a post msg",})})// 默认端口是8080,也可以指定端口 r.Run(":80")router.Run(":80")
}func Publish(app string, link string, topic string) {broker := "tcp://broker.emqx.io:1883"clientId := "go-mqtt-client"opts := mqtt.NewClientOptions().AddBroker(broker).SetClientID(clientId)opts.SetUsername("") // 设置用户名opts.SetPassword("") // 设置密码opts.SetCleanSession(true)opts.SetKeepAlive(2 * time.Second)opts.SetDefaultPublishHandler(f)opts.OnConnect = func(c mqtt.Client) {fmt.Println("Connected")}opts.OnConnectionLost = func(c mqtt.Client, e error) {fmt.Println("Disconnected")}c := mqtt.NewClient(opts)if token := c.Connect(); token.Wait() && token.Error() != nil {panic(token.Error())}// 构建JSON消息message := map[string]string{"app": app,"link": link,}jsonMessage, err := json.Marshal(message)if err != nil {fmt.Println("Error marshaling JSON:", err)os.Exit(1)}// 发布消息if token := c.Publish(topic, 0, false, jsonMessage); token.Wait() && token.Error() != nil {fmt.Println(token.Error())os.Exit(1)}// 等待一段时间以确保消息被发送time.Sleep(2 * time.Second)c.Disconnect(250)
}
func f(client mqtt.Client, msg mqtt.Message) {fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
}
云效调用
在构建制品后选择执行命令
命令内容填写
# input your command here
# 构造 JSON 参数
json_data=$(jq -n --arg app "myapp" --arg link "${artifacts}" '{app: $app, link: $link}')# 打印 JSON 参数(可选)
echo "$json_data"# 发送 HTTP POST 请求
curl -X POST "http://transfer.example.com/publish/notify" \-H "Content-Type: application/json" \-d "$json_data"
将myapp替换成你想改的app名,transfer.example.com改成你部署在公网的中转接口域名或ip
上面用到了${artifacts}参数,需要在云效中添加artifacts=制品名称xxx
之后在部署的服务器上部署发布的客户端
客户端的逻辑跟上一篇文章类似,代码如下
main.go
package mainimport ("bytes""fmt""io/ioutil""log""os""os/exec""os/signal""strings""time""github.com/bitly/go-simplejson"mqtt "github.com/eclipse/paho.mqtt.golang"
)// 全局变量,存储程序启动时的当前工作目录
var baseDir stringvar messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {go handleMessage(client, msg)
}var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {fmt.Println("Connected")
}var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {fmt.Printf("Connect lost: %v", err)
}func main() {// 获取程序启动时的当前工作目录var err errorbaseDir, err = os.Getwd()if err != nil {log.Fatalf("获取当前工作目录失败: %v", err)}fmt.Println("程序启动时的当前工作目录:", baseDir)//合建chanc := make(chan os.Signal)//监听指定信号 ctrl+c killsignal.Notify(c, os.Interrupt, os.Kill)//阻塞直到有信号传入fmt.Println("启动")//执行具体方法initmqtt()//阻塞直至有信号传入s := <-cfmt.Println("退出信号", s)
}func initmqtt() {var broker = "broker.emqx.io"var port = 1883opts := mqtt.NewClientOptions()opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))opts.SetDefaultPublishHandler(messagePubHandler)opts.OnConnect = connectHandleropts.OnConnectionLost = connectLostHandlerclient := mqtt.NewClient(opts)if token := client.Connect(); token.Wait() && token.Error() != nil {panic(token.Error())}sub(client)
}func sub(client mqtt.Client) {topic := "/publish/notify"token := client.Subscribe(topic, 1, nil)token.Wait()fmt.Printf("Subscribed to topic: %s\n", topic)
}// readFile 使用ioutil.ReadFile 直接从文件读取到 []byte中
func readFile(fileName string) string {f, err := ioutil.ReadFile(fileName)if err != nil {log.Printf("读取文件失败:%#v", err)return ""}return string(f)
}// 读取消息的发布模块名和链接
func readIssueModule(issuejson string) (string, string) {buf := bytes.NewBuffer([]byte(issuejson))js, _ := simplejson.NewFromReader(buf)var each_map = make(map[string]interface{})each_map, _ = js.Map()app := each_map["app"].(string)link := each_map["link"].(string)return app, link
}// 根据模块名和模块匹配本地json模块发布
func issueModuleLocalJson(app string, link string, localjson string) {//读本地配置 war包路径 存储命令等fmt.Println(localjson)buf := bytes.NewBuffer([]byte(localjson))js, _ := simplejson.NewFromReader(buf)fmt.Println(js)//获取json字符串中的 数组rows, _ := js.Array()fmt.Println(rows)//遍历rows数组for _, row := range rows {each_map := row.(map[string]interface{})jsonapp := each_map["app"].(string)warpath := each_map["warpath"].(string)warname := each_map["warname"].(string)backpath := each_map["backpath"].(string)stop := each_map["stop"].(string)start := each_map["start"].(string)transfer := each_map["transfer"].(string)unzip := each_map["unzip"].(string)if jsonapp == app {fmt.Println("找到对应模块" + app)// 创建备份目录fmt.Println("创建备份目录" + backpath)exec_shell("mkdir -p " + backpath)// 获取当前时间并格式化timestamp := time.Now().Format("20060102150405")backupDir := fmt.Sprintf("%s/%s/%s", backpath, app, timestamp)// 创建备份子目录fmt.Println("创建备份子目录" + backupDir)err := os.MkdirAll(backupDir, 0755)if err != nil {fmt.Println("创建备份子目录失败:", err)continue}// 改变当前工作目录err = os.Chdir(backupDir)if err != nil {fmt.Println("改变目录失败:", err)continue}pwd, _ := os.Getwd()fmt.Println("当前目录" + pwd)if strings.Contains(link, "&") {link = "'" + link + "'"}fmt.Println("下载文件" + link)exec_shell(transfer + " " + link)fmt.Println("解压文件")exec_shell(unzip)fmt.Println("停止服务")exec_shell(stop)fmt.Println("拷贝文件到对应目录")exec_shell("cp -rf" + " " + warname + " " + warpath)fmt.Println("启动服务")exec_shell(start)fmt.Println("完成本次发布")break}}
}// 阻塞式的执行外部shell命令的函数,等待执行完毕并返回标准输出
func exec_shell(s string) (string, error) {cmd := exec.Command("/bin/bash", "-c", s)var out bytes.Buffercmd.Stdout = &outerr := cmd.Run()checkErr(err)return out.String(), err
}// 错误处理函数
func checkErr(err error) {if err != nil {fmt.Println(err)panic(err)}
}// 处理MQTT消息的函数
func handleMessage(client mqtt.Client, msg mqtt.Message) {fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())//读取发布配置,备份war包,替换war包,重启tomcat或者dockerissuejson := string(msg.Payload())fmt.Println(issuejson)//读本地配置 war包路径 存储命令等localjson := readFile(baseDir + "/" + "jenkinsmqtt.json")//关于json的配置说明/*app: 应用的名称。warpath: 应用的部署路径。warname: 应用的 WAR 文件名。backpath: 备份路径。stop: 停止应用的命令。start: 启动应用的命令。restart: 重启应用的命令。transfer: 下载应用包的命令。unzip: 解压应用包的命令。*/fmt.Println(localjson)//发布模块解析app, link := readIssueModule(issuejson)fmt.Println(app)fmt.Println(link)//模块发布issueModuleLocalJson(app, link, localjson)
}
jenkinsmqtt.json示例
[{"app": "myapp","warpath": "/data/app/myapp/webapps","warname": "myapp.war","backpath": "/data/app/cibak","stop": "docker stop myapp","start": "docker start myapp","restart": "docker restart myapp","transfer": "wget -O myapp.tgz ","unzip": "tar -zxvf myapp.tgz"}
]