MySQL Binlog 同步工具go-mysql-transfer Lua模块使用说明

一、go-mysql-transfer

go-mysql-transfer是一款MySQL实时、增量数据同步工具。能够实时解析MySQL二进制日志binlog,并生成指定格式的消息,同步到接收端。

go-mysql-transfer具有如下特点:

1、不依赖其它组件,一键部署

2、集成多种接收端,如:Redis、MongoDB、Elasticsearch、RabbitMQ、Kafka、RocketMQ,不需要再编写客户端,开箱即用

3、内置丰富的数据解析、消息生成规则;支持Lua脚本,以处理更复杂的数据逻辑

4、支持监控告警,集成Prometheus客户端

5、高可用集群部署

6、数据同步失败重试

7、全量数据初始化

详情及安装说明 请参见: MySQL Binlog 增量同步工具go-mysql-transfer实现详解

项目开源地址:

gitee (速度更快) :go-mysql-transfer
github:go-mysql-transfer

如果此工具对你有帮助,请Star支持下

二、Lua脚本引擎

go-mysql-transfer中使用gopher-lua作为Lua虚拟机,支持Lua5.1规范。Lua作为专业的内置脚本语言,其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。开发者只需要花费少量时间就能大致掌握其用法。

基于Lua的高扩展性,可以实现更为复杂的数据解析、消息生成、数据处理逻辑。

三、json模块

提供json数据格式的序列化和反序列化功能,提供encode和decode两个方法。
使用示例如下:

local json = require("json")   -- 加载json模块
local ops = require("mqOps") --加载mq操作模块local row = ops.rawRow()  --当前数据库的一行数据,table类型,key为列名称
local action = ops.rawAction()  --当前数据库事件,包括:insert、updare、deletelocal id = row["ID"] --获取ID列的值
local userName = row["USER_NAME"] --获取USER_NAME列的值
local password = row["PASSWORD"] --获取USER_NAME列的值
local createTime = row["CREATE_TIME"] --获取CREATE_TIME列的值local result = {}  -- 定义一个table,作为结果
result["id"] = id
result["action"] = actionif action == "delete" -- 删除事件
thenlocal val = json.encode(result) -- 将result转为jsonops.SEND("transfer_test_topic",val) -- 发送消息,第一个参数为topic(string类型),第二个参数为消息内容
else result["userName"] = userNameresult["password"] = passwordresult["createTime"] = createTimeresult["source"] = "binlog" -- 数据来源local val = json.encode(result) -- 将result转为jsonops.SEND("transfer_test_topic",val) -- 发送消息,第一个参数为topic(string类型),第二个参数为消息内容-- local obj = json.decode(val ) -- json反序列化-- print(obj ["createTime"])

四、db(数据库操作)模块

比如我们有角色表(t_role):

IDCODENAMEREMARK
1r1管理员具有所有操作权限
2r2测试员具有测试功能的操作权限

用户表(t_user):

IDUSER_NAMEPASSWORDROLE_CODECREATE_TIME
1admin123456r12020-10-20 22:00:10

我们需要监听t_user表,并向接收端发送如下格式的消息:

{"id": "1","userName": "admin""password": "123456","createTime": 100001,"roleName": "系统管理员","roleRemark": "管理后台相关信息","source": "binlog",}

基于Binlog的数据同步工具,只能监听到一行数据的变更,进行响应。无法像基于SQL的ETL工具那样具有多表连接的能力。如果要得到向上面那样的聚合数据,需要使用dbOps模块,用法如下:

local json = require("json")   -- 加载json模块
local ops = require("mqOps") --加载mq操作模块
local db = require("dbOps") --加载数据库(db)操作模块local row = ops.rawRow()  --当前数据库的一行数据,table类型,key为列名称
-- print(json.encode(row))
local action = ops.rawAction()  --当前数据库事件,包括:insert、updare、deletelocal id = row["ID"] --获取ID列的值
local userName = row["USER_NAME"] --获取USER_NAME列的值
local password = row["PASSWORD"] --获取USER_NAME列的值
local roleCode = row["ROLE_CODE"] --角色编码
local createTime = row["CREATE_TIME"] --获取CREATE_TIME列的值local result = {}  -- 定义一个table,作为结果
result["id"] = id
result["action"] = actionif action == "delete" -- 删除事件
thenlocal val = json.encode(result) -- 将result转为jsonops.SEND("user_topic",val) -- 发送消息,第一个参数为topic(string类型),第二个参数为消息内容
else local sql = string.format("SELECT * FROM ESEAP.T_ROLE WHERE CODE = '%s'",roleCode) -- SQL语句,不能直接使用表名,要使用(数据库名称.表名称),如:ESEAP.T_ROLElocal roleRS = db.selectOne(sql) -- 执行SQL查询,返回一条查询结果,table类型,结构如:{"CODE":"a1","ID":"1","NAME":"系统管理员","REMARK":"管理后台相关信息"}-- print(json.encode(roleRS))local roleName = roleRS["NAME"] --角色名称local roleRemark = roleRS["REMARK"] --角色描述-- local roleListRS = db.select(sql) -- 执行SQL查询,返回多条条查询结果,数组类型,元素为table,结构如:[{"CODE":"a1","ID":"1","NAME":"系统管理员","REMARK":"管理后台相关信息"}]-- print(json.encode(roleListRS))result["userName"] = userNameresult["password"] = passwordresult["createTime"] = createTimeresult["source"] = "binlog" -- 数据来源result["roleName"] = roleNameresult["roleRemark"] = roleRemarklocal val = json.encode(result) -- 将result转为jsonops.SEND("user_topic",val) -- 发送消息,第一个参数为topic(string类型),第二个参数为消息内容
end 

dbOps模块的方法说明:
1、selectOne(sql) 查询一条数据,返回table类型的结果;如果查询不到数据,返回空table;如果查询到多个结果,会出错
2、select(sql) 查询多条数据,返回数组类型的结果,数组元素为tablem(格式如:[table1,table2]);查询不到结果,返回空table;

四、http客户端模块

让go-mysql-transfer具体发送任意http请求的能力,httpOps提供的方法说明:

1、get(url,headers) 发送get请求;url为请求地址;headers为请求头参数,table类型
2、delete(url,headers) 发送delete请求;url为请求地址;headers为请求头参数,table类型
3、post(url,headers,formItems) 发送post请求;url为请求地址;headers为请求头参数,table类型;formItems为表单数据,table类型
4、put(url,headers,formItems) 发送put请求;url为请求地址;headers为请求头参数,table类型;formItems为表单数据,table类型

上面4个方法的返回值为一个table类型的结果,元素"status_code"为http响应状态,Number类型(如:200、401、403、500等);元素body为http响应内容,string类型

httpOps模块具体用法如下:

local json = require("json")   -- 加载json模块
local ops = require("redisOps") --加载redis操作模块
local httpcli = require("httpOps") --加载http操作模块local row = ops.rawRow()  --数据库当前变更的一行数据,table类型,key为列名称
local action = ops.rawAction()  --当前数据库事件,包括:insert、updare、deletelocal _id = row["ID"] --获取ID列的值
local _userName = row["USER_NAME"] --获取USER_NAME列的值
local _password = row["PASSWORD"] --获取USER_NAME列的值
local _createTime = row["CREATE_TIME"] --获取CREATE_TIME列的值
local key = "user_".._id -- 定义keyif action == "insert" -- 插入事件
then-- getlocal url = string.format("http://localhost:9999/http_tests?user_name=%s", userName) local res = httpcli.get(url,{Authorization="Basic OSdjJGRpbjpvcGVuIANlc2SdDE=="}) -- http get请求,第一个参数为URL,类型为string;第二个参数为header参数,类型为tablelocal status = res.status_code--print(res.status_code)  -- http响应代码,如:200、401、403、500等--print(res.body)-- http响应内容,string类型--local resObj = json.decode(res.body) -- json反序列化响应内容--print(resObj["msg"])-- delete--local url = string.format("http://localhost:9999/http_tests?user_name=%s", userName) --local res = httpcli.delete(url,{--  Authorization="Basic OSdjJGRpbjpvcGVuIANlc2SdDE=="--}) -- http delete请求,第一个参数为URL,类型为string;第二个参数为header参数,类型为table-- post--local url = "http://localhost:9999/http_tests"--local res = httpcli.post(url,{--  Authorization="Basic OSdjJGRpbjpvcGVuIANlc2SdDE=="--},{--  id=_id,--  userName=_userName,--  password=_password,--  createTime=_createTime--}) -- http post请求,第一个参数为URL,类型为string;第二个参数为header参数,类型为table;第三个参数为post内容,类型为table--put--local url = "http://localhost:9999/http_tests"--local res = httpcli.put(url,{--  Authorization="Basic OSdjJGRpbjpvcGVuIANlc2SdDE=="--},{--  id=_id,--  userName=_userName,--  password=_password,--  createTime=_createTime--}) -- http put请求,第一个参数为URL,类型为string;第二个参数为header参数,类型为table;第三个参数为post内容,类型为tableif status == 200then ops.SADD("user_set",userName.."|succeed") -- 对应Redis的SADD命令,第一个参数为key(支持string类型),第二个参数为valueelseops.SADD("user_set",userName.."|failed") -- 对应Redis的SADD命令,第一个参数为key(支持string类型),第二个参数为valueendend 
最后编辑于:2024-12-25 21:32:32


喜欢的朋友记得点赞、收藏、关注哦!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/1254.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

灌区闸门自动化控制系统-精准渠道量测水-灌区现代化建设

项目背景 本项目聚焦于黑龙江某一灌区的现代化改造工程,该灌区覆盖广阔,灌溉面积高达7.5万亩,地域上跨越6个乡镇及涵盖17个村庄。项目核心在于通过全面的信息化建设,强力推动节水灌溉措施的实施,旨在显著提升农业用水的…

vue2修改表单只提交被修改的数据的字段传给后端接口

效果: 步骤一、 vue2修改表单提交的时候,只将修改的数据的字段传给后端接口,没有修改得数据不传参给接口。 在 data 对象中添加一个新的属性,用于存储初始表单数据的副本,与当前表单数据进行比较,找出哪些…

LiveNVR监控流媒体Onvif/RTSP常见问题-二次开发接口jquery调用示例如何解决JS|axios调用接口时遇到的跨域问题

LiveNVR二次开发接口jquery调用示例如何解决JS|axios调用接口时遇到的跨域问题 1、接口调用示例2、JS调用遇到跨域解决示例3、axios请求接口遇到跨域问题3.1、post请求3.2、get请求 4、RTSP/HLS/FLV/RTMP拉流Onvif流媒体服务 1、接口调用示例 下面是完整的 jquery 调用示例 $.a…

RTDETR融合[WACV 2024]的MetaSeg中的gmb模块

RT-DETR使用教程: RT-DETR使用教程 RT-DETR改进汇总贴:RT-DETR更新汇总贴 《MetaSeg: MetaFormer-based Global Contexts-aware Network for Efficient Semantic Segmentation》 一、 模块介绍 论文链接:https://arxiv.org/abs/2408.07576 代…

TensorFlow Quantum快速编程(基本篇)

一、TensorFlow Quantum 概述 1.1 简介 TensorFlow Quantum(TFQ)是由 Google 开发的一款具有开创性意义的开源库,它宛如一座桥梁,巧妙地将量子计算与 TensorFlow 强大的机器学习功能紧密融合。在当今科技飞速发展的时代,传统机器学习虽已取得诸多瞩目成就,然而面对日益…

Spring Boot 2 学习全攻略

Spring Boot 2 学习资料 Spring Boot 2 学习资料 Spring Boot 2 学习资料 在当今快速发展的 Java 后端开发领域,Spring Boot 2 已然成为一股不可忽视的强大力量。它简化了 Spring 应用的初始搭建以及开发过程,让开发者能够更加专注于业务逻辑的实现&am…

深度学习笔记11-优化器对比实验(Tensorflow)

🍨 本文为🔗365天深度学习训练营中的学习记录博客🍖 原作者:K同学啊 目录 一、导入数据并检查 二、配置数据集 三、数据可视化 四、构建模型 五、训练模型 六、模型对比评估 七、总结 一、导入数据并检查 import pathlib,…

HBuilderX打包ios保姆式教程

1、登录苹果开发者后台并登录已认证开发者账号ID Sign In - Apple 2、创建标识符(App ID)、证书,描述文件 3、首先创建标识符,用于新建App应用 3-1、App的话直接选择第一个App IDs,点击右上角继续 3-2、选择App&#x…

计算机网络 (39)TCP的运输连接管理

前言 TCP(传输控制协议)是一种面向连接的、可靠的传输协议,它在计算机网络中扮演着至关重要的角色。TCP的运输连接管理涉及连接建立、数据传送和连接释放三个阶段。 一、TCP的连接建立 TCP的连接建立采用三次握手机制,其过程如下&…

2 XDMA IP中断

三种中断 1. Legacy 定义:Legacy 中断是传统的中断处理方式,使用物理中断线(例如 IRQ)来传递中断信号。缺点: 中断线数量有限,通常为 16 条,限制了可连接设备的数量。中断处理可能会导致中断风…

22、PyTorch nn.Conv2d卷积网络使用教程

文章目录 1. 卷积2. python 代码3. notes 1. 卷积 输入A张量为: A [ 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 ] \begin{equation} A\begin{bmatrix} 0&1&2&3\\\\ 4&5&6&7\\\\ 8&9&10&11\\\\ 12&13&14&15 \end{b…

基于微信小程序的汽车销售系统的设计与实现springboot+论文源码调试讲解

第4章 系统设计 一个成功设计的系统在内容上必定是丰富的,在系统外观或系统功能上必定是对用户友好的。所以为了提升系统的价值,吸引更多的访问者访问系统,以及让来访用户可以花费更多时间停留在系统上,则表明该系统设计得比较专…

【ROS2】☆ launch之Python

☆重点 ROS1和ROS2其中一个很大区别之一就是launch的编写方式。在ROS1中采用xml格式编写launch,而ROS2保留了XML 格式launch,还另外引入了Python和YAML 编写方式。选择哪种编写取决于每位开发人员的爱好,但是ROS2官方推荐使用Python方式编写…

Facebook 隐私变革之路:回顾与展望

在数字时代,个人隐私的保护一直是社交平台面临的重大挑战之一。作为全球最大的社交网络平台,Facebook(现为Meta)在处理用户隐私方面的变革,历经了多次调整与完善。本文将回顾Facebook在隐私保护方面的历程,…

P10打卡——pytorch实现车牌识别

🍨 本文为🔗365天深度学习训练营中的学习记录博客🍖 原作者:K同学啊 1.检查GPU from torchvision.transforms import transforms from torch.utils.data import DataLoader from torchvision import datasets import torchvisio…

Pycharm连接远程解释器

这里写目录标题 0 前言1 给项目添加解释器2 通过SSH连接3 找到远程服务器的torch环境所对应的python路径,并设置同步映射(1)配置服务器的系统环境(2)配置服务器的conda环境 4 进入到程序入口(main.py&#…

VS2015 + OpenCV + OnnxRuntime-Cpp + YOLOv8 部署

近期有个工作需求是进行 YOLOv8 模型的 C 部署,部署环境如下 系统:WindowsIDE:VS2015语言:COpenCV 4.5.0OnnxRuntime 1.15.1 0. 预训练模型保存为 .onnx 格式 假设已经有使用 ultralytics 库训练并保存为 .pt 格式的 YOLOv8 模型…

uniApp通过xgplayer(西瓜播放器)接入视频实时监控

🚀 个人简介:某大型国企资深软件开发工程师,信息系统项目管理师、CSDN优质创作者、阿里云专家博主,华为云云享专家,分享前端后端相关技术与工作常见问题~ 💟 作 者:码喽的自我修养&#x1f9…

fast-crud select下拉框 实现多选功能及下拉框数据动态获取(通过接口获取)

教程 fast-crud select示例配置需求:需求比较复杂 1. 下拉框选项需要通过后端接口获取 2. 实现多选功能 由于这个前端框架使用逻辑比较复杂我也是第一次使用,所以只记录核心问题 环境:vue3,typescript,fast-crud ,elementPlus 效果 代码 // crud.tsx文件(/.ts也行 js应…

【Go】:图片上添加水印的全面指南——从基础到高级特性

前言 在数字内容日益重要的今天,保护版权和标识来源变得关键。为图片添加水印有助于声明所有权、提升品牌认知度,并防止未经授权的使用。本文将介绍如何用Go语言实现图片水印,包括静态图片和带旋转、倾斜效果的文字水印,帮助您有…