Lostash同步Mysql数据到ElasticSearch(二)logstash脚本配置和常见坑点

1. logstash脚本编写(采用单文件对应单表实例)

新建脚本文件夹

cd /usr/local/logstash mkdir sql & cd sql
vim 表名称.conf
#如: znyw_data_gkb_logstash.conf

建立文件夹,保存资源文件更新Id

mkdir -p /data/logstash/data/last_run_metadata

脚本JDBC插件参数说明:

Lostash同步Mysql数据到ElasticSearch(二)logstash脚本配置和常见坑点

增加新脚本步骤:修改statement 查询执行SQL脚本(对应数据库表)可以通过代码工具生成。地址:
增量同步字段,建议使用更新时间字段tracking_column => "updateDate"        tracking_column_type => timestamp(类型成对出现)
修改数据存储文件路径,红色为索引名称,每个脚本对应一个。last_run_metadata_path   => "xxx_last_update_time.txt"      
同步频率(可以通过在线表达式进行生成)在线Cron表达式生成器      

第一步要做的事情,配置logstasht同步脚本文件,内容如下:

input {jdbc {type => "jdbc"# 数据库连接地址jdbc_connection_string => "jdbc:mysql://172.168.9.131:3306/gffp_om?characterEncoding=UTF-8&autoReconnect=true&serverTimezone=CTT"# 数据库连接账号密码jdbc_user => "znyg"jdbc_password => "hTORRp86!7"# MySQL依赖包路径jdbc_driver_library => "/usr/local/logstash/mysql-connector-java-8.0.22.jar"# the name of the driver class for mysqljdbc_driver_class => "com.mysql.cj.jdbc.Driver"# 数据库重连尝试次数connection_retry_attempts => "5"# 判断数据库连接是否可用,默认false不开启jdbc_validate_connection => "true"# 数据库连接可用校验超时时间,默认3600Sjdbc_validation_timeout => "3600"# 开启分页查询(默认false不开启);jdbc_paging_enabled => "true"# 单次分页查询条数(默认100000,若字段较多且更新频率较高,建议调低此值);jdbc_page_size => "500"# statement为查询数据sql,如果sql较复杂,建议配通过statement_filepath配置sql文件的存放路径;# sql_last_value为内置的变量,存放上次查询结果中最后一条数据tracking_column的值,此处即为ModifyTime;# statement_filepath => "mysql/jdbc.sql"statement => "select * from xxx WHERE id>= :sql_last_value"# 是否将字段名转换为小写,默认true(如果有数据序列化、反序列化需求,建议改为false);lowercase_column_names => false# Value can be any of: fatal,error,warn,info,debug,默认info;sql_log_level => warn## 是否记录上次执行结果,true表示会将上次执行结果的tracking_column字段的值保存到last_run_metadata_path指定的文件中;record_last_run => true# 需要记录查询结果某字段的值时,此字段为true,否则默认tracking_column为timestamp的值;use_column_value => true# 需要记录的字段,用于增量同步,需是数据库字段tracking_column => "id"# Value can be any of: numeric,timestamp,Default value is "numeric"tracking_column_type => numeric# record_last_run上次数据存放位置;last_run_metadata_path => "/data/logstash/data/last_run_metadata/_last_id.txt"# 是否清除last_run_metadata_path的记录,需要增量同步时此字段必须为false;clean_run => false## 同步频率(分 时 天 月 年),默认每分钟同步一次;schedule => "* * * * *"}
}filter {}
output {elasticsearch {# 配置ES集群地址hosts => ["172.168.9.135:9200","172.168.9.136:9200","172.168.9.137:9200"]# 索引名字,必须小写index => ""A# 数据唯一索引(建议使用数据库KeyID)document_id => "%{id}"}
}

第二部要做的事情,根据_mapping.json初始化模版在kibana中执行

PUT /_template/索引名称 (与模板名称保持一致。)

模板内容如下:

PUT /_template/A_temp A

{A "template": "A_temp.json" ,"order": 1,
# 索引配置设置,这里需要注意 分片数,在索引创建的时候使用,这个索引一旦创建了后就不能再修改,想改只能删索引重建了,副本数可以热更新,其他参数可以关闭索引更新内容。"settings": {"number_of_shards": 5,"number_of_replicas": 1,"max_result_window": 65536,"max_inner_result_window": 10000,"translog.durability": "request","translog.sync_interval": "3s","auto_expand_replicas": false,"analysis.analyzer.default.type": "ik_max_word","analysis.search_analyzer.default.type": "ik_smart","shard.check_on_startup": false,"codec": "default","store.type": "niofs"},"mappings": {#这个需要注意一下,主要是做模板验证用的,详细参考ES template模板相关内容。"dynamic": "true","dynamic_date_formats": ["yyyy-MM-dd'T'HH:mm:ss.SSS'Z'","yyyy-MM-dd'T'HH:mm:ss.SSS+0800","yyyy-MM-dd'T'HH:mm:ss'Z'","yyyy-MM-dd'T'HH:mm:ss+0800","yyyy-MM-dd'T'HH:mm:ss","yyyy-MM-dd HH:mm:ss","yyyy-MM-dd"],"properties": {"@timestamp": {"type": "date"},"@version": {"type": "integer"},"_class": {"type": "text","fields": {"keyword": {"type": "keyword","ignore_above": 256}}},#这里的是对应数据库字段,一一对于,格式可以指定,特殊的一些使用,我提供了一个工具,大家在文章中找一下。"id": {"type": "long"}}}
}

这里想重点说几个需要注意的地方:

1.logstash同步脚本中index名称与索引名称、模板名称保持一致 看蓝色的A,三个在上面的模板文件和脚本文件中,注意这三个A是给大家标记用的,文件中不需要。

2.看索引模板的代码中,红色代码块,是需要注意的地方,我在文件中写了注释,注意这是json参数,使用时把注释删掉。

2.数据同步

2.1单表同步

#编写一个单表实例测试,如下启动日志会实时打印,便于调试错误 bin/logstash -f test-new.conf &

注意: ES模板名称与logstash脚本index名称保存一致

Lostash同步Mysql数据到ElasticSearch(二)logstash脚本配置和常见坑点

Lostash同步Mysql数据到ElasticSearch(二)logstash脚本配置和常见坑点

检查_mapping是否与template模板一致,字段类型,长度是否与模板一致(同步完成后在kibana中执行)

如:GET idx_znyw_data_gkb_logstash/_mapping

此时会多出几个模板中没有定义,select查询不包含的字段:

有坑的地方重点标准一下

@timestamp,@version, _class, type(这里有个大坑,type属于logstash,mysql实体不能存在此字段,官方也没有解决方案)

检查数据

检查时间类型等是否与mysql中数据同步一致

脚本同步时配置的时间字段包含具体时间信息的以UTC时间,

与数据库保持一致,一种操作是同步时+8处理,一种是不处理,因为ES这块是UTC时间,ES提供的JAVA版本API也是UTC处理,这块时间会进行转换。

那么需要注意的地方是什么:

+8操作后,所有界面上显示的时间均为转换后时区时间,具体减价看服务所在的时区,界面上显示的时间指的是Kibana或者ES HEAD查询出的时间,那么需要注意的是什么呢,在java中查询时你需要注意这个+8操作。主要是查询参数,但是得到的结果你会发现,还是不准确。

我推荐的操作是怎么操作呢,我建议不要在数据同步时做什么特殊处理,这块特别绕,一会+8,一会不+8,那么怎么操作呢,我建议在VO对象中做格式转换使用注解进行view层的数据转换,给用户显示正确的时间,查询时别用vo对象做反射哦,否则参数也会根据注解进行修改,查询参数就不对了。

Lostash同步Mysql数据到ElasticSearch(二)logstash脚本配置和常见坑点

2.2多文件配置

    修改配置文件,这里还可以修改path.config: “/map/es/soft/logstash-7.6.2/config/myconfig/*.conf”  不推荐这种方式,但是👀注意一下,这是一个大坑,与机制有关,我没有扒代码,只是把自己的推断写下来,logstash 本次任务未执行完成,有异常,会保存当前任务执行状态,包括配置文件内容,类似存储同步ID序列,也就是说,如果批量执行过程中,有任务未完成,此时logstash会把包括配置文件的状态一起保存。此时,想修改批处理为单个执行,修改配置文件后不起作用,即便指定单个文件运行,也会把所有的一起执行,只能把同步脚本做迁移,保留一个来运行,或者删除其他脚本,保留一个,当记录的任务执行成功后,此时会释放,重新加载配置文件,在生成环境中遇到了,修改配置文件不起作用,任务重新执行完,logstash才重新加载配置文件

vim /usr/local/logstash/config/pipelines.yml
追加配置所有脚本文件
– pipeline.id: znyw_data_gkb_logstash  
path.config: “/usr/local/logstash/sql/znyw_data_gkb_logstash.conf”
– pipeline.id: znyw_data_gkb_logstash1  
path.config: “/usr/local/logstash/sql/znyw_data_gkb_logstash1.conf”

启动检查数据同步:

nohup ./bin/logstash &

tail -f /usr/local/logstash/nohup.out  ps#查看执行实时情况(更详细排错)

tail -f /data/logstash/logs/logstash-plain.log  #查看任务执行情况(查看执行结果)

3. 问题及解决方案

3.1″path.data” setting.

异常问题:

Logstash could not be started because there is already another instance using the configured data directory.  If you wish to run multiple instances, you must change the “path.data” setting.

  1. 查看是否存在正在运行的进程

ps aux|grep logstash

kill -9 73803

3.2、数据同步后,ES没有数据

  output.elasticsearch模块的index必须是全小写;

3.3、增量同步后last_run_metadata_path文件内容不改变

  如果lowercase_column_names配置的不是false,那么tracking_column字段配置的必须是全小写。

3.4、提示找不到jdbc_driver_library

2032 com.mysql.jdbc.Driver not loaded.
Are you sure you've included the correct jdbc driver in :jdbc_driver_library?

  检测配置的地址是否正确,如果是linux环境,注意路径分隔符是“/”,而不是“\”。

3.5、数据丢失

  statement配置的sql中,如果比较字段使用的是大于“>”,可能存在数据丢失。   

假设当同步完成后last_run_metadata_path存放的时间为2019-01-30 20:45:30,而这时候新入库一条数据的更新时间也为2019-01-30 20:45:30,那么这条数据将无法同步。   

解决方案:将比较字段使用 大于等于“>=”。

3.6、数据重复更新

  上一个问题“数据丢失”提供的解决方案是比较字段使用“大于等于”,但这时又会产生新的问题。   

假设当同步完成后last_run_metadata_path存放的时间为2019-01-30 20:45:30,而数据库中更新时间最大值也为2019-01-30 20:45:30,那么这些数据将重复更新,直到有更新时间更大的数据出现。  

当上述特殊数据很多,且长期没有新的数据更新时,会导致大量的数据重复同步到ES。   

何时会出现以上情况呢:①比较字段非“自增”;②比较字段是程序生成插入。

解决方案:

①比较字段自增保证不重复或重复概率极小(比如使用自增ID或者数据库的timestamp),这样就能避免大部分异常情况了;

②如果确实存在大量程序插入的数据,其更新时间相同,且可能长期无数据更新,可考虑定期更新数据库中的一条测试数据,避免最大值有大量数据。

3.7 logstash突然报Unable to connect to database. Tried 1 times

检查一下上服务部署的机器用命令:

>telnet 172.168.9.131 3306

若报错显示:MySql Host is blocked because of many connection errors; unblock with ‘mysqladmin flush-hosts’

2、使用mysqladmin flush-hosts 命令清理一下hosts文件(不知道mysqladmin在哪个目录下可以使用命令查找:whereis mysqladmin);

① 在查找到的目录下使用命令修改:/usr/local/bin/mysqladmin flush-hosts -uroot -p

  备注:

    其中端口号,用户名,密码都可以根据需要来添加和修改;

    配置有master/slave主从数据库的要把主库和从库都修改一遍的(我就吃了这个亏明明很容易的几条命令结果折腾了大半天);

    第二步也可以在数据库中进行,命令如下:flush hosts;

4. 最后一定要做的事情

4.1 索引副本数配置

因环境节点数量不同,所以特殊索引副本数量需要设置,如下索引进行动态修改

idx_znyg_devicemodel idx_znyg_devicetype idx_znyg_sysdevtype idx_znyg_sysinfo

在kibana中执行如下代码修改副本数量,参数为副本数量为节点数-1,如下操作还可以修改索引settings相关属性,除了分片数量不能动态修改外,只有副本数量可以热更新,其他属性需要关闭索引后,进行修改。

PUT /idx_znyg_sysinfo/_settings
{
   “index”:{
      “number_of_replicas”: 2
     
   }
}

4.2服务健康状态检查任务配置

crontab -e
0 * * * /bin/sh /opt/logstash/logstash-7.1.1/logstash_health.sh

提供一个索引配置工具,自己写的,包括生产SQL语句,显示表数据量,生成ruby关于时间字段的脚步,批量创建更新ES模板,但是模板需要事先写好。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/141843.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深入理解 Swift 新并发模型中 Actor 的重入(Reentrancy)问题

问题现象 我们知道,Swift 5.5 引入的新并发模型极大简化了并行逻辑代码的开发,更重要的是:使用新并发模型中的 Actor 原语可以大大降低并发数据竞争的可能性。 不过,即便 Actor 有如此神奇之功效,它也不是“万能药”,仍不能防止误用带来的问题。比如:Actor 重入(Reen…

Centos7安装go解释器

Centos7安装go解释器 下载解压go压缩包编辑go变量结果验证 下载解压go压缩包 # 下载 wget -c https://go.dev/dl/go1.20.2.linux-amd64.tar.gz# 解压到指定目录 tar xvf go1.20.2.linux-amd64.tar.gz -C /usr/local/编辑go变量 /etc/profile.d/go.sh # 指定go执行程序位置 e…

【软件测试】资深测试聊,自动化测试分层实践,彻底打通高阶...

目录:导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结(尾部小惊喜) 前言 自动化测试的分层…

spring6概述

spring6 1、概述1.1、Spring是什么?1.2、Spring 的狭义和广义1.3、Spring Framework特点1.4、Spring模块组成1.5、Spring6特点1.5.1、版本要求 2.2、构建模块2.3、程序开发2.3.1、引入依赖2.3.3、创建配置文件2.3.4、创建测试类测试2.3.5、运行测试程序 2.4、程序分…

LeetCode算法二叉树—144. 二叉树的前序遍历

目录 144. 二叉树的前序遍历 - 力扣(LeetCode) 代码: 运行结果: 示例 1: 输入:root [1,null,2,3] 输出:[1,2,3]示例 2: 输入:root [] 输出:[]示例 3&am…

Leetcode 386. 字典序排数

文章目录 题目代码&#xff08;9.22 首刷看解析&#xff09; 题目 Leetcode 386. 字典序排数 代码&#xff08;9.22 首刷看解析&#xff09; 迭代DFS class Solution { public:vector<int> lexicalOrder(int n) {vector<int> ret(n);int number 1;for(int i 0…

可视化工具Datart踩(避)坑指南(4)——丢失的精度

作为目前国内开源版本最好用的可视化工具&#xff0c;Datart无疑是低成本高效率可供二开的可视化神兵利器。当然&#xff0c;免费的必然要付出一些踩坑的代价。本篇我们来讲一讲可视化工具Datart踩&#xff08;避&#xff09;坑指南&#xff08;4&#xff09;之丢失的精度。 版…

麦肯锡:中国生成式AI市场现状和未来发展趋势

本文来自《麦肯锡中国金融业CEO季刊》&#xff0c;版权归麦肯锡所有。该季刊主要围绕生成式AI&#xff08;以下简称“GenAI”&#xff09;主题&#xff0c;通过4大章节共8篇文章&#xff0c;全面深入分析了GenAI对各主要行业的影响、价值链投资机会、中国GenAI市场现状和未来趋…

【QandA C++】内存泄漏、进程地址空间、堆和栈、内存对齐、大小端和判断、虚拟内存等重点知识汇总

目录 内存泄漏 内存模型 、进程地址空间 堆和栈的区别 内存对齐 大端小端及判断 虚拟内存有什么作用 内存泄漏 概念: 是指因为疏忽或错误造成程序未能释放已经不再使用的内存的情况, 内存泄漏并不是指内存在物理上的消失, 而是应用程序分配了某段内存后, 因为设计错误…

Leetcode---363周赛

题目列表 2859. 计算 K 置位下标对应元素的和 2860. 让所有学生保持开心的分组方法数 2861. 最大合金数 2862. 完全子集的最大元素和 一、计算k置为下标对应元素的和 简单题&#xff0c;直接暴力模拟&#xff0c;代码如下 class Solution { public:int sumIndicesWithKS…

什么是关系模型? 关系模型的基本概念

关系模型由IBM公司研究员Edgar Frank Codd于1970年发表的论文中提出&#xff0c;经过多年的发展&#xff0c;已经成为目前最常用、最重要的模型之一。 在关系模型中有一些基本的概念&#xff0c;具体如下。 (1)关系(Relation)。关系一词与数学领域有关&#xff0c;它是集合基…

第二届全国高校计算机技能竞赛——Java赛道

第二届全国高校计算机技能竞赛——Java赛道 小赛跳高 签到题 import java.util.*; public class Main{public static void main(String []args) {Scanner sc new Scanner(System.in);double n sc.nextDouble();for(int i 0; i < 4; i) {n n * 0.9;}System.out.printf(&…

Jenkins+Allure+Pytest的持续集成

一、配置 allure 环境变量 1、下载 allure是一个命令行工具&#xff0c;可以去 github 下载最新版&#xff1a;https://github.com/allure-framework/allure2/releases 2、解压到本地 3、配置环境变量 复制路径如&#xff1a;F:\allure-2.13.7\bin 环境变量、Path、添加 F:\a…

Linux系统离线安装Python

目录 一、简介 二、前提准备 三、下载Python源码 四、将离线python包传输到Linux主机 五、编译以及创建软链接 一、简介 由于工作原因&#xff0c;我们经常会在内网环境下使用Linux&#xff0c;不过这样会让我们安装一些软件变得困难&#xff0c;例如需要安装Python。虽然…

解决Pycharm使用Conda激活环境失败的问题

Q:公司电脑终端使用powershell来激活conda环境时报错? 同时手动打开powershell报"profile.ps1” 无法被加载的错误 A: 1,手动打开powershell&#xff0c;设置管理员打开 2,打开powershell 打开 PowerShell 终端&#xff0c;并输入以下命令&#xff1a;Get-ExecutionPo…

气导耳机是什么样的?盘点五款好用的气传导耳机分享

​气传导耳机在运动、户外、办公等场景中具有独特的优势。然而&#xff0c;面对市场上琳琅满目的气传导耳机产品&#xff0c;很多用户不知如何下手。接下来&#xff0c;我将推荐市面上热销火爆&#xff0c;并性能出色、性价比高的气传导耳机给大家&#xff0c;希望大家都能选到…

项目开发过程中,成员提离职,怎么办?

之前写过一篇《如何应对核心员工提离职》反响特别好&#xff0c;今天做个延展篇&#xff0c;在项目过程中&#xff0c;员工突然提离职&#xff0c;我们有什么办法让项目按时按质的上线。 项目做多了&#xff0c;总会碰到这种情况。这里给大家介绍一个解决项目问题的分析方法&a…

Hbuilder本地调试微信H5项目(二)--添加UView框架插件

摘要 在一个已创建的Hbuilder项目中&#xff0c;添加uView框架插件 前置准备 已安装Hbuilder 已创建uni-app的H5默认模板项目 实现逻辑 在Hbuilder官网找到组件说明页面 下载插件并导入HbuilderX 具体实现 访问网站 访问网址Hbuilder的uView1.8.6版本说明页 或者访问…

@Cacheable 注解(指定缓存位置)

一、Cacheable的作用 1、缓存使用步骤&#xff1a;Cacheable这个注解&#xff0c;用它就是为了使用缓存的。所以我们可以先说一下缓存的使用步骤&#xff1a; 1、开启基于注解的缓存&#xff0c;使用 EnableCaching 标识在 SpringBoot 的主启动类上。 2、标注缓存注解即可 使用…

JVM

JVM JVM设计的初心&#xff0c;是为了让java程序员感知不到系统层面的一些内容&#xff0c;让程序员只关注业务逻辑&#xff0c;不关注底层的实现细节。后来有一本书叫《深入了解java虚拟机》&#xff0c;这本书里面讨论了一些jvm话题&#xff0c;于是就掀起了jvm潮流。 1.JV…