SpringBoot集成Flink-CDC,实现对数据库数据的监听

一、什么是 CDC ?

CDC 是Change Data Capture(变更数据获取)的简称。 核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、 更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。

二、Flink-CDC 是什么?

CDC Connectors for Apache Flink是一组用于Apache Flink 的源连接器,使用变更数据捕获 (CDC) 从不同数据库获取变更。用于 Apache Flink 的 CDC 连接器将 Debezium 集成为捕获数据更改的引擎。所以它可以充分发挥 Debezium 的能力。

大概意思就是,Flink 社区开发了 flink-cdc-connectors 组件,这是一个可以直接从 MySQL、 PostgreSQL等数据库直接读取全量数据和增量变更数据的 source 组件。

Flink-CDC 开源地址: Apache/Flink-CDC

Flink-CDC 中文文档:Apache Flink CDC | Apache Flink CDC

三、SpringBoot 整合 Flink-CDC

3.1、如何集成到SpringBoot中?

Flink 作业通常独立于一般的服务之外,专门编写代码,用 Flink 命令行工具来运行和停止。将Flink 作业集成到 Spring Boot 应用中并不常见,而且一般也不建议这样做,因为Flink作业一般运行在大数据环境中。

然而,在特殊需求下,我们可以做一些改变使 Flink 应用适应 Spring Boot 环境,比如在你的场景中使用 Flink CDC 进行 数据变更捕获。将 Flink 作业以本地项目的方式启动,集成在 Spring Boot应用中,可以使用到 Spring 的便利性。

  • CommandLineRunner
  • ApplicationRunner

3.2、集成举例

1、CommandLineRunner

@SpringBootApplication
public class MyApp {public static void main(String[] args) {SpringApplication.run(MyApp.class, args);}@Beanpublic CommandLineRunner commandLineRunner(ApplicationContext ctx) {return args -> {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder().hostname("localhost").port(3306).username("flinkuser").password("flinkpw").databaseList("mydb") // monitor all tables under "mydb" database.tableList("mydb.table1", "mydb.table2") // monitor only "table1" and "table2" under "mydb" database.deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String.build();DataStreamSource<String> mysqlSource = env.addSource(sourceFunction);// formulate processing logic here, e.g., printing to standard outputmysqlSource.print();// execute the Flink job within the Spring Boot applicationenv.execute("Flink CDC");};}
}

2、ApplicationRunner

@SpringBootApplication
public class FlinkCDCApplication implements ApplicationRunner {public static void main(String[] args) {SpringApplication.run(FlinkCDCApplication.class, args);}@Overridepublic void run(ApplicationArguments args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// Configure your Flink job hereDebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder().hostname("localhost").port(3306).username("flinkuser").password("flinkpw").databaseList("mydb")// set other source options ....deserializer(new StringDebeziumDeserializationSchema()) // Converts SourceRecord to String.build();DataStream<String> cdcStream = env.addSource(sourceFunction);// Implement your processing logic here// For example:cdcStream.print();// Start the Flink job within the Spring Boot applicationenv.execute("Flink CDC with Spring Boot");}
}

这次用例采用 ApplicationRunner,不过要改变一下,让 Flink CDC 作为 Bean 来实现。

四、功能实现

4.1、功能逻辑

总体来讲,不太想把 Flink CDC单独拉出来,更想让它依托于一个服务上,彻底当成一个组件。

其中在生产者中,我们将要进行实现:

4.2、所需环境

  • MySQL 5.7 +:确保源数据库已经开启 Binlog 日志功能,并且设置 Row 格式
  • Spring Boot2.7.6:还是不要轻易使用 3.0 以上为好,有好多jar没有适配
  • RabbitMQ:适配即可
  • Flink CDC:特别注意版本

4.3、Flink CDC POM依赖

<flink.version>1.13.6</flink.version><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>${flink.version}</version>
</dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>${flink.version}</version>
</dependency>
<!--mysql -cdc-->
<dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.0.0</version>
</dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.10</version>
</dependency>
<dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.5</version>
</dependency>
<dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.10</version>
</dependency>
<dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.42</version>
</dependency>

上面是一些Flink CDC必须的依赖,当然如果需要实现其他数据库,可以替换其他数据库的CDC jar。怎么安排jar包的位置和其余需要的jar,这个可自行调整。

4.4、代码展示

核心类

  • MysqlEventListener:配置类
  • MysqlDeserialization:MySQL消息读取自定义序列化
  • DataChangeInfo:封装的变更对象
  • DataChangeSink:继承一个Flink提供的抽象类,用于定义数据的输出或“下沉”逻辑,sink 是Flink处理流的最后阶段,通常用于将数据写入外部系统,如数据库、文件系统、消息队列等
(1)通过 ApplicationRunner 接入 SpringBoot
@Component
public class MysqlEventListener implements ApplicationRunner {private final DataChangeSink dataChangeSink;public MysqlEventListener(DataChangeSink dataChangeSink) {this.dataChangeSink = dataChangeSink;}@Overridepublic void run(ApplicationArguments args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DebeziumSourceFunction<DataChangeInfo> dataChangeInfoMySqlSource = buildDataChangeSourceRemote();DataStream<DataChangeInfo> streamSource = env.addSource(dataChangeInfoMySqlSource, "mysql-source").setParallelism(1);streamSource.addSink(dataChangeSink);env.execute("mysql-stream-cdc");}private DebeziumSourceFunction<DataChangeInfo> buildDataChangeSourceLocal() {return MySqlSource.<DataChangeInfo>builder().hostname("127.0.0.1").port(3306).username("root").password("0507").databaseList("flink-cdc-producer").tableList("flink-cdc-producer.producer_content", "flink-cdc-producer.name_content")/** initial初始化快照,即全量导入后增量导入(检测更新数据写入)* latest:只进行增量导入(不读取历史变化)* timestamp:指定时间戳进行数据导入(大于等于指定时间错读取数据)*/.startupOptions(StartupOptions.latest()).deserializer(new MysqlDeserialization()).serverTimeZone("GMT+8").build();}
}
(2)自定义 MySQL 消息读取序列化
public class MysqlDeserialization implements DebeziumDeserializationSchema<DataChangeInfo> {public static final String TS_MS = "ts_ms";public static final String BIN_FILE = "file";public static final String POS = "pos";public static final String CREATE = "CREATE";public static final String BEFORE = "before";public static final String AFTER = "after";public static final String SOURCE = "source";public static final String UPDATE = "UPDATE";/*** 反序列化数据,转为变更JSON对象*/@Overridepublic void deserialize(SourceRecord sourceRecord, Collector<DataChangeInfo> collector) {String topic = sourceRecord.topic();String[] fields = topic.split("\.");String database = fields[1];String tableName = fields[2];Struct struct = (Struct) sourceRecord.value();final Struct source = struct.getStruct(SOURCE);DataChangeInfo dataChangeInfo = new DataChangeInfo();dataChangeInfo.setBeforeData(getJsonObject(struct, BEFORE).toJSONString());dataChangeInfo.setAfterData(getJsonObject(struct, AFTER).toJSONString());//5.获取操作类型  CREATE UPDATE DELETEEnvelope.Operation operation = Envelope.operationFor(sourceRecord);
//        String type = operation.toString().toUpperCase();
//        int eventType = type.equals(CREATE) ? 1 : UPDATE.equals(type) ? 2 : 3;dataChangeInfo.setEventType(operation.name());dataChangeInfo.setFileName(Optional.ofNullable(source.get(BIN_FILE)).map(Object::toString).orElse(""));dataChangeInfo.setFilePos(Optional.ofNullable(source.get(POS)).map(x -> Integer.parseInt(x.toString())).orElse(0));dataChangeInfo.setDatabase(database);dataChangeInfo.setTableName(tableName);dataChangeInfo.setChangeTime(Optional.ofNullable(struct.get(TS_MS)).map(x -> Long.parseLong(x.toString())).orElseGet(System::currentTimeMillis));//7.输出数据collector.collect(dataChangeInfo);}private Struct getStruct(Struct value, String fieldElement) {return value.getStruct(fieldElement);}/*** 从元数据获取出变更之前或之后的数据*/private JSONObject getJsonObject(Struct value, String fieldElement) {Struct element = value.getStruct(fieldElement);JSONObject jsonObject = new JSONObject();if (element != null) {Schema afterSchema = element.schema();List<Field> fieldList = afterSchema.fields();for (Field field : fieldList) {Object afterValue = element.get(field);jsonObject.put(field.name(), afterValue);}}return jsonObject;}@Overridepublic TypeInformation<DataChangeInfo> getProducedType() {return TypeInformation.of(DataChangeInfo.class);}
}
(3)封装的变更对象
@Data
public class DataChangeInfo implements Serializable {/*** 变更前数据*/private String beforeData;/*** 变更后数据*/private String afterData;/*** 变更类型 1新增 2修改 3删除*/private String eventType;/*** binlog文件名*/private String fileName;/*** binlog当前读取点位*/private Integer filePos;/*** 数据库名*/private String database;/*** 表名*/private String tableName;/*** 变更时间*/private Long changeTime;}

这里的 beforeData、afterData直接存储Struct 不好吗,还得费劲去来回转?

我曾尝试过使用Struct 存放在对象中,但是无法进行序列化。具体原因可以网上搜索,或者自己尝试一下。

(4)定义 Flink 的 Sink
@Component
@Slf4j
public class DataChangeSink extends RichSinkFunction<DataChangeInfo> {transient RabbitTemplate rabbitTemplate;transient ConfirmService confirmService;transient TableDataConvertService tableDataConvertService;@Overridepublic void invoke(DataChangeInfo value, Context context) {log.info("收到变更原始数据:{}", value);//转换后发送到对应的MQif (MIGRATION_TABLE_CACHE.containsKey(value.getTableName())) {String routingKey = MIGRATION_TABLE_CACHE.get(value.getTableName());//可根据需要自行进行confirmService的设计rabbitTemplate.setReturnsCallback(confirmService);rabbitTemplate.setConfirmCallback(confirmService);rabbitTemplate.convertAndSend(EXCHANGE_NAME, routingKey, tableDataConvertService.convertSqlByDataChangeInfo(value));}}/*** 在启动SpringBoot项目是加载了Spring容器,其他地方可以使用@Autowired获取Spring容器中的类;但是Flink启动的项目中,* 默认启动了多线程执行相关代码,导致在其他线程无法获取Spring容器,只有在Spring所在的线程才能使用@Autowired,* 故在Flink自定义的Sink的open()方法中初始化Spring容器*/@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);this.rabbitTemplate = ApplicationContextUtil.getBean(RabbitTemplate.class);this.confirmService = ApplicationContextUtil.getBean(ConfirmService.class);this.tableDataConvertService = ApplicationContextUtil.getBean(TableDataConvertService.class);}
}
(5)数据转换类接口和实现类
public interface TableDataConvertService {String convertSqlByDataChangeInfo(DataChangeInfo dataChangeInfo);
}@Service
public class TableDataConvertServiceImpl implements TableDataConvertService {@AutowiredMap<String, SqlGeneratorService> sqlGeneratorServiceMap;@Overridepublic String convertSqlByDataChangeInfo(DataChangeInfo dataChangeInfo) {SqlGeneratorService sqlGeneratorService = sqlGeneratorServiceMap.get(dataChangeInfo.getEventType());return sqlGeneratorService.generatorSql(dataChangeInfo);}
}

因为在dataChangeInfo 中我们有封装对象的类型(CREATEDELETEUPDATE),所以我希望通过不同类来进行不同的工作。于是就有了下面的类结构:

根据 dataChangeInfo 的类型去生成对应的SqlGeneratorServiceImpl

这是策略模式还是模板方法?

策略模式(Strategy Pattern)允许在运行时选择算法的行为。在策略模式中,定义了一系列的算法(策略),并将每一个算法封装起来,使它们可以相互替换。策略模式允许算法独立于使用它的客户端进行变化。

InsertSqlGeneratorServiceImpl、UpdateSqlGeneratorServiceImpl 和 DeleteSqlGeneratorServiceImpl 各自实现了 SqlGeneratorService 接口,这确实表明了一种策略。每一个实现类表示一个特定的SQL生成策略,并且可以相互替换,只要它们遵守同一个接口。

模板方法模式(Template Method Pattern),则侧重于在抽象类中定义算法的框架,让子类实现算法的某些步骤而不改变算法的结构。AbstractSqlGenerator 作为抽象类的存在是为了被继承,但如果它不含有模板方法(即没有定义算法骨架的方法),那它就不符合模板方法模式。

在实际应用中,一个设计可能同时结合了多个设计模式,或者在某些情况下,一种设计模式的实现可能看起来与另一种模式类似。在这种情况下,若 AbstractSqlGenerator 提供了更多的共享代码或默认实现表现出框架角色,那么它可能更接近模板方法。而如果 AbstractSqlGenerator 仅仅作为一种接口实现方式,且策略之间可以相互替换,那么这确实更符合策略模式。

值得注意的是,在 TableDataConvertServiceImpl 中,我们注入了一个Map<String, SqlGeneratorService> sqlGeneratorServiceMap,通过它来进行具体实现类的获取。那么他是个什么东西呢?作用是什么呢?为什么可以通过它来获取呢?

@Resource、@Autowired 标注作用于 Map 类型时,如果 Map 的 key 为 String 类型,则 Spring 会将容器中所有类型符合 Map 的 value 对应的类型的 Bean 增加进来,用 Bean 的 id 或 name 作为 Map 的 key。

那么可以看到下面第六步,在进行DeleteSqlGeneratorServiceImpl装配的时候进行指定了名字**@Service(“DELETE”)**,方便通过dataChangeInfo获取。

(6)转换类部分代码
public interface SqlGeneratorService {String generatorSql(DataChangeInfo dataChangeInfo);
}public abstract class AbstractSqlGenerator implements SqlGeneratorService {@Overridepublic String generatorSql(DataChangeInfo dataChangeInfo) {return null;}public String quoteIdentifier(String identifier) {// 对字段名进行转义处理,这里简化为对其加反引号// 实际应该处理数据库标识符的特殊字符return "`" + identifier + "`";}
}@Service("DELETE")
@Slf4j
public class DeleteSqlGeneratorServiceImpl extends AbstractSqlGenerator {@Overridepublic String generatorSql(DataChangeInfo dataChangeInfo) {String beforeData = dataChangeInfo.getBeforeData();Map<String, Object> beforeDataMap = JSONObjectUtils.JsonToMap(beforeData);StringBuilder wherePart = new StringBuilder();for (String key : beforeDataMap.keySet()) {Object beforeValue = beforeDataMap.get(key);if ("create_time".equals(key)){SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");beforeValue = dateFormat.format(beforeValue);}if (wherePart.length() > 0) {// 不是第一个更改的字段,增加逗号分隔wherePart.append(", ");}wherePart.append(quoteIdentifier(key)).append(" = ").append(formatValue(beforeValue));}log.info("wherePart : {}", wherePart);return "DELETE FROM " + dataChangeInfo.getTableName() + " WHERE " + wherePart;}
}

核心代码如上所示,具体实现可自行设计。

五、源码获取

Github:incremental-sync-flink-cdc

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/6058.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2025.1.20——一、[RCTF2015]EasySQL1 二次注入|报错注入|代码审计

题目来源&#xff1a;buuctf [RCTF2015]EasySQL1 目录 一、打开靶机&#xff0c;整理信息 二、解题思路 step 1&#xff1a;初步思路为二次注入&#xff0c;在页面进行操作 step 2&#xff1a;尝试二次注入 step 3&#xff1a;已知双引号类型的字符型注入&#xff0c;构造…

C语言初阶牛客网刷题——HJ73 计算日期到天数转换【难度:简单】

1. 题目描述——HJ73 计算日期到天数转换 牛客网OJ题链接 描述 每一年中都有 12 个月份。其中&#xff0c;1,3,5,7,8,10,12 月每个月有 31 天&#xff1b; 4,6,9,11 月每个月有 30 天&#xff1b;而对于 2 月&#xff0c;闰年时有29 天&#xff0c;平年时有 28 天。 现在&am…

Flutter项目和鸿蒙平台的通信

Flutter项目和鸿蒙平台的通信 前言Flutter和Harmonyos通信MethodChannelBasicMessageChannelEventChannel 前言 大家在使用Flutter开发项目的时候&#xff0c; Flutter提供了Platfrom Channel API来和个个平台进行交互。 Flutter官方目前提供了一下三种方式来和个个平台交互&…

Unity自学之旅04

Unity自学之旅04 Unity自学之旅④&#x1f4dd; 跳跃&#x1f42f; 攻击&#x1f984; GUIGUI前置&#xff0c;显示收集物品数量和角色HpUGUI游戏暂停和重新开始 &#x1f917; 总结归纳 Unity自学之旅④ &#x1f4dd; 跳跃 public class PlayerBehaviorRigid : MonoBehavio…

Node.js——express中间件(全局中间件、路由中间件、静态资源中间件)

个人简介 &#x1f440;个人主页&#xff1a; 前端杂货铺 &#x1f64b;‍♂️学习方向&#xff1a; 主攻前端方向&#xff0c;正逐渐往全干发展 &#x1f4c3;个人状态&#xff1a; 研发工程师&#xff0c;现效力于中国工业软件事业 &#x1f680;人生格言&#xff1a; 积跬步…

10倍数据交付提升 | 通过逻辑数据仓库和数据编织高效管理和利用大数据

数据已经成为企业核心竞争力的关键要素。随着大数据技术的发展&#xff0c;如何高效管理和利用海量的数据&#xff0c;已成为企业在数字化转型过程中面临的重要课题。传统的数据仓库已经不能满足当今企业对数据处理的高效性、灵活性和实时性的需求。在这种背景下&#xff0c;逻…

PHP礼品兑换系统小程序

&#x1f381; 礼品兑换系统&#xff1a;革新企业礼品管理&#xff0c;专属神器来袭&#xff01; &#x1f4bb; 一款专为追求高效与个性化的现代企业量身打造的礼品兑换系统&#xff0c;它基于强大的ThinkPHP框架与前沿的Uniapp技术栈深度融合&#xff0c;不仅完美适配礼品卡…

【玩转全栈】----Django基本配置和介绍

目录 Django基本介绍&#xff1a; Django基本配置&#xff1a; 安装Django 创建项目 创建app 注册app Django配置路由URL Django创建视图 启动项目 Django基本介绍&#xff1a; Django是一个开源的、基于Python的高级Web框架&#xff0c;旨在以快速、简洁的方式构建高质量的Web…

RabbitMQ 高级特性

目录 1.消息确认 1.1 消息确认机制 1.2 手动确认方法 1. 2.1肯定确认 1.2.2 否定确认 1.3 SpringBoot 代码示例 1.3.1 配置确认机制 1.3.2 配置队列,交换机,绑定关系 1.3.3 生产者(向 rabbitmq 发送消息) 1.3.4 消费者(消费队列中的信息) 2.持久性 2.1 交换机…

QT:控件属性及常用控件(3)-----输入类控件(正则表达式)

输入类控件既可以进行显示&#xff0c;也能让用户输入一些内容&#xff01; 文章目录 1.Line Edit1.1 用户输入个人信息1.2 基于正则表达式的文本限制1.3 验证两次输入的密码是否一致1.4 让输入的密码可以被查看 2.Text Edit2.1 输入和显示同步2.1 其他信号出发情况 3.ComboBox…

迅为RK3568开发板篇OpenHarmony实操HDF驱动控制LED-添加内核编译

编译内核时将该 HDF 驱动编译到镜像中&#xff0c;接下来编写驱动编译脚本 Makefile&#xff0c;代码如下所示&#xff1a; 加入编译体系&#xff0c;填加模块目录到 drivers/hdf_core/adapter/khdf/linux/Makefile 文件 更多内容可以关注&#xff1a;迅为RK3568开发板篇OpenHa…

【面试总结】FFN(前馈神经网络)在Transformer模型中先升维再降维的原因

FFN&#xff08;前馈神经网络&#xff09;在Transformer模型中先升维再降维的设计具有多方面的重要原因&#xff0c;以下是对这些原因的总结&#xff1a; 1.目标与动机 高维映射空间&#xff1a;FFN的设计目的是通过一系列线性变换来拟合一个高维的映射空间&#xff0c;而不仅…

从零安装 LLaMA-Factory 微调 Qwen 大模型成功及所有的坑

文章目录 从零安装 LLaMA-Factory 微调 Qwen 大模型成功及所有的坑一 参考二 安装三 启动准备大模型文件 四 数据集&#xff08;关键&#xff09;&#xff01;4.1 Alapaca格式4.2 sharegpt4.3 在 dataset_info.json 中注册4.4 官方 alpaca_zh_demo 例子 999条数据, 本机微调 5分…

【Rabbitmq】Rabbitmq高级特性-发送者可靠性

Rabbitmq发送者可靠性 发送者重连发送者确认1.开启确认机制2.ReturnCallback3.ConfirmCallback MQ的可靠性数据持久化交换机持久化队列持久化消息持久化 Lazy Queue 总结其他文章 Rabbitmq提供了两种发送来保证发送者的可靠性&#xff0c;第一种叫发送者重连&#xff0c;第二种…

计算机网络 (55)流失存储音频/视频

一、定义与特点 定义&#xff1a;流式存储音频/视频是指经过压缩并存储在服务器上的多媒体文件&#xff0c;客户端可以通过互联网边下载边播放这些文件&#xff0c;也称为音频/视频点播。 特点&#xff1a; 边下载边播放&#xff1a;用户无需等待整个文件下载完成即可开始播放…

抖音小程序一键获取手机号

前端代码组件 <button v-if"!isFromOrderList"class"get-phone-btn" open-type"getPhoneNumber"getphonenumber"onGetPhoneNumber">一键获取</button>// 获取手机号回调onGetPhoneNumber(e) {var that this tt.login({f…

论文速读|NoteLLM: A Retrievable Large Language Model for Note Recommendation.WWW24

论文地址&#xff1a;https://arxiv.org/abs/2403.01744 bib引用&#xff1a; misc{zhang2024notellmretrievablelargelanguage,title{NoteLLM: A Retrievable Large Language Model for Note Recommendation}, author{Chao Zhang and Shiwei Wu and Haoxin Zhang and Tong Xu…

Day 15 卡玛笔记

这是基于代码随想录的每日打卡 222. 完全二叉树的节点个数 给你一棵 完全二叉树 的根节点 root &#xff0c;求出该树的节点个数。 完全二叉树 的定义如下&#xff1a;在完全二叉树中&#xff0c;除了最底层节点可能没填满外&#xff0c;其余每层节点数都达到最大值&#x…

【阿里云】使用docker安装nginx后可以直接访问

一、创建目录 mkdir -p config/{cert,conf.d} html logs二、上传nginx.conf的配置文件 user nginx; worker_processes auto;error_log /var/log/nginx/error.log notice; pid /var/run/nginx.pid;events {worker_connections 1024; }http {include /etc/ngin…

在elasticsearch中,document数据的写入流程如何?

本文将为您介绍文档内容是如何写入ES集群中。 数据写入ES集群的流程图如下 流程介绍 用户携带数据发起POST请求指向集群9200端口。9200端口将数据写入请求发给主分片。主分片会对数据进行分片计算分发给具体分片。&#xff08;计算方式&#xff1a;hash % primary_number_sha…