【微服务】mysql + elasticsearch数据双写设计与实现

目录

一、前言

二、为什么使用mysql+es双写

2.1 单用mysql的问题

2.2 为什么不直接使用es

2.2.1 非关系型表达

2.2.2 不支持事务

2.2.3 多字段将造成性能低下

三、mysql+es双写方案设计要点

3.1 全新设计 VS 中途调整架构

3.2 全表映射 VS 关键字段存储

3.2.1 最大程度发挥es性能

3.2.2 选择mysql还是es作为数据托底

3.3 数据一致性保障

3.3.1 同步双写

3.3.2 异步双写

3.3.3 定期同步

3.3.4 数据订阅

四、mysql+es双写方案数据迁移

4.1 数据迁移整体方案

4.1.1 创建索引

4.1.2 双写改造

4.1.3 数据迁移

4.1.4 搜索服务上线

4.2 数据迁移补充说明

五、方案实施

5.1 前置准备

5.1.1 搭建环境

5.1.2 创建数据表

5.1.3 插入初始化数据

5.1.4 创建一个索引

5.2 搭建springboot工程

5.2.1 引入基础依赖

5.2.2 核心配置文件

5.2.3 es客户端连接配置

5.2.3 mybatis文件

5.2.4 业务实现类

5.2.4 相关测试

5.3 双写业务实现

5.4 数据搜索

5.5 数据迁移

六、写在文末


一、前言

在很多电商网站中,对商品的搜索要求很高,主要体现在页面快速响应搜索结果。这就对服务端接口响应速度提出了很高的要求。而商品数据存储离不开mysql,在高并发场景下,尤其是数据规模达到一定量级,mysql的性能瓶颈一定会出现,为了满足极致的搜索速度,往往需要借助第三方存储,比如nosql数据库,当然主流的搭配还是使用搜索引擎来完成,于是在很多场景下,会选择mysql+elasticsearch来满足这个场景下对搜索的要求。

如下是一个典型的使用mysql+es实现数据双写的应用场景。

二、为什么使用mysql+es双写

2.1 单用mysql的问题

在很多互联网项目中,mysql数据库仍然是主流,毕竟关系型数据库可以处理现实场景中很多复杂的业务模型,但是mysql随着数据规模的增长,一旦单表数据量达到了千万级,性能将下降的很快,于是不得不进行数据库的扩展,这样也带来了架构上的复杂性,综合来说,在类似某宝,某东等这样的电商场景下,单表存储数据带来的问题主要如下:

  • 单表数据承载有限,当数据规模超过千万就要考虑分库或分表,从而给数据库架构设计提出新的挑战;

  • mysql不适合全文检索,经管mysql从某个版本支持了全文检索,但是在实际使用中性能很弱;

  • mysql的模糊匹配无法满足多场景下的复杂的搜索要求,比如电商场景下,多维度任意组合搜索是很常用得,而复杂的搜索将会使得mysql性能急剧下降;

2.2 为什么不直接使用es

到这里也会有人提出疑问,既然es搜索速度如此高效,并且也可以存储数据,直接使用es存储mysql表中的数据不就行了。对于这个问题,主要从下面几点进行考虑,

2.2.1 非关系型表达

使用mysql进行数据库设计的一个好处就是,不同的表之间可以通过某个字段进行关联,关联关系的存在,让现实中复杂的业务模型通过表关联进行实现,而es则不支持不同索引之间的关联搜索。

2.2.2 不支持事务

mysql事务的存在,让数的写入完整性得到保障,而es是不支持事务的,这就导致在往es写数据时,数据的一致性需要通过其他的手段来保障。

2.2.3 多字段将造成性能低下

上面谈到,由于es不支持关联查询,实际业务中,一个页面展现的数据往往来自多张表的关联聚合查询结果,es为了达到与mysql同样的效果,只好尽可能在一个索引中冗余更多的字段,从es存储的角度来说,es是基于字段的,大行超多字段将会大大降低性能,同时也会导致后续数据的维护困难和复杂性。

三、mysql+es双写方案设计要点

在很多开发者看来,使用mysql+es双写的方案,就是把某个高频搜索的表的数据存储一份到es就可以了,这么理解倒也不错,不过还缺少很多深入的考虑。在正式开始设计方案之前,需要重点考虑下面几点,这也将是本文后续探讨的重点,以及在实际开发中需要关注的。

3.1 全新设计 VS 中途调整架构

这是一个很现实的摆在很多架构设计者面前的问题,为什么这么说呢,在很多企业的项目中,经历了从单体架构到微服务的改造,从简单的http调用,webservice调用到使用dubbo等服务治理的技术改造。

如果现在的你正在经历一个全新的项目,那么恭喜你,你可以拥有更多的技术选型空间,但是如果你正则经历项目的服务化改造,这个过程可能比较痛苦,不仅要考虑引入新技术的成本,更要考虑新技术的实现会给未来技术的演进带来何种影响,包括团队学习、维护成本,上线后的运维成本,与其他技术的融合成本等。

回到上面的问题,在使用mysql+es双写方案来说,同样会面临相同的难题,如果是全新的设计,主要考虑的是如何实现mysql与es双写数据的一致性,及如何基于团队成员现有的技术、业务上对双写数据实时性等方面,评估出以最低成本的实现方案即可。

而如果是中途更换设计方案,比如线上的数据规模已经达到千万量级,顶不住客户的压力到了不得不调整架构的阶段来考虑这个问题,这个过程将会拉得很长。此时你考虑的点会更多了,包括:

  • 如何设计es索引?

  • 如何基于现有的代码实现数据双写并且尽可能降低对现有逻辑的侵入性?

  • 如何保障双写数据的一致性?

  • 针对历史数据如何迁移?

  • 如何减少生产上线后的实施成本和运维成本?

  • ...

3.2 全表映射 VS 关键字段存储

使用过mysql的同学应该不陌生,mysql是行式存储数据,而es中,数据则以准json的结构存储,两者之间经管能够通过字段进行对应,但在检索的时候原理是不一样的,如下图所示。

在实际使用mysql+es进行双写方案设计时,很多人直接就认为,将mysql的表字段进行一份全量的拷贝到es的索引中即可,这样从实现上固然没有差别,最终也能达到效果,但这样做真的合理吗?在进行方案设计的时候,从实际经验来说,功能的实现固然重要,但如何做到既能满足功能,又能让设计显得合理才是更需要深入思考的。就这个问题来说,如何才算合理呢?可以从下面几点展开思考。

3.2.1 最大程度发挥es性能

不管是mysql,还是es,不管是hbase还是clickhouse...所有的数据存储介质,都有自己的优势和不足,因此在选择某种存储引擎时一定是利用其优势,同时规避其不足。就es来说,选择它的原因就是因为在海量的数据且复杂的检索场景下,仍然能够保持高性能。

在上文也谈到单纯使用es带来的不足,其中值得注意的一点就是,es是基于字段存储的,对一行数据来说,字段数量越多,当一个待检索的请求发来时,其计算耗费的成本必然越高,这不仅是针对es,甚至mysql等很多关系型数据库,对于单表过多字段的冗余设计也不推荐,所以对es来说,也不建议存储mysql表的所有字段,而是关键的具有重要业务意义的字段数据。

3.2.2 选择mysql还是es作为数据托底

这是一个架构设计中容易被忽略的问题。文章开始谈到,一个基本的业务场景是,主业务数据写入到mysql,同时将数据同步写入es,检索从es获取数据。那么问题来了,实际业务中,究竟以哪个数据为准呢?我们以下面一个简单的同步写入场景的业务逻辑为例来说明相信就能理解了。

@Transactional
public boolean save(){//数据组装try{//写入mysql//写入es}catch(Exception e){//es数据回滚}
}

这是一段同步双写的伪代码,从这段代码不难看出,mysql的写入由事务机制保障,但是es的数据写入与回滚就比较麻烦了,而且这样的实现对业务逻辑的侵入性强,维护性差,但可以发现,我们首要保障的是mysql数据的完整性,因为只有数据成功写入,界面上展示的数据才是正确的。

从这个分析结合实际的业务实现,以一个电商或类似的场景,从产品列表到具体的详情页面为例进行说明,参考下面的流程;

  • 用户浏览列表页;

  • 用户从列表页通过关键字搜索目标数据;

  • 从搜到的结果中选择某个具体的产品;

  • 进入具体产品的页,展示与当前产品完整的数据;

从上面的业务流程分析不难看出,实际要展示某个产品字段数据是非常多的,以某大型电商网站上面展现的某个产品为例,展现在用户面前的商品包括了非常多的数据,这些数据是多个源表经过服务端聚合以后再经过复杂的处理得到的,所以如果将这么多的字段放在es的某个索引中,这明显是不合适的,总结来说,两者搭配使用时可以遵循下面的思路:

  • es存放核心业务表的核心字段,比如产品ID,产品的详情描述,SKU等信息;

  • 列表搜索走es索引,通过es的检索,返回业务主键等关键信息;

  • 将第二步es得到的数据给到mysql的业务表,返回最终的数据给到页面;

从上面的分析来看,在实际业务中,应该酌情考虑是否应该将核心业务表的全量数据存于es,一般建议业务表的核心字段,比如业务主键 + 高频搜索的字段存放es中;

3.3 数据一致性保障

使用双写方案在实际操作中,基于双写方案,如何保障mysql与es的数据一致性是设计与开发过程中需要重点关注的。

我们知道,mysql有事务机制保障数据的一致性,而es没有事务,在上文的伪代码中仅仅是使用了一种非常简单的逻辑来保障,这样是远远不够的。一旦发生了mysql与es数据的不一致,带来的问题是很严重的。关于如何保障数据的一致性,结合实际操作经验,给出下面的几点建议:

3.3.1 同步双写

同步双写是保障数据一致性最简单的方式,也是实际操作中比较简单的操作方式,只需要将数据写到 MySQL 时,同时将数据写到 ES即可,通过mysql自身的事务机制间接保障两者数据一致性,其优缺点如下。

优点:

  • 这种方式简单粗暴,实时写入能做到秒级。

缺点:

  • 业务耦合,代码侵入性强,即在代码中需要写入mysql表的位置都需要加写入es的代码;

  • 性能影响,同步写入两个存储,响应时间变长;

  • 可能存在丢数据的风险;

3.3.2 异步双写

异步双写,即在数据写入mysql的同时,异步写入到es中,具体在实践过程中也有多种方式可以选择,下面提供几种方案。

异步线程

利用异步线程的方式,写入mysql的时候,开启多线程写入es;

内存队列

可以利用Java中提供的内存队列,写入mysql的同时向内存队列,比如BlockingQueue,另有一个线程消费内存队列中的数据写入es;

事件监听

主业务流程写入mysql的同时发布事件,另有一个事件订阅者订阅mysql写入事件,从而做到与主业务逻辑的解耦。

引入消息中间件

也可以考虑引入消息中间件,做到与主业务逻辑的彻底解耦,写入mysql的同时,向消息队列发送消息,另有服务消费者订阅消息消费,异步写入es ;

上述各种方式均可以在实践中使用,需要结合团队的技术储备,以及服务器资源,后续的运维成本等综合考虑。  

3.3.3 定期同步

定期同步适合对搜索场景不那么敏感的业务,在这种场景下,可以考虑每隔一段时间,或每天的某些时间点进行同步,将数据批量从mysql写入到es中。定期同步的优缺点如下。

优点:

  • 实现简单,系统资源占用少;

缺点:

  • 实时性难以保证;

  • 瞬时存储压力较大;

3.3.4 数据订阅

既要提高实时性,又要低入侵, 可以考虑利用 MySQL 的 Binlog 来进行同步。在很多数据同步工具中,都采用了类似的思想,简单来说,订阅mysql的binglog日志,然后通过回放binlog日志变化解析出变化的数据,从而进行数据同步。比如大家熟悉的canal就是很好的利用了这一点。

这种方式可以很好的与核心业务解耦,从而实现异步,总结来说,优点如下:

  • 降低对主业务逻辑的代码侵入性;

  • 数据的实时性好;

缺点:

  • 对第三方组件存在一定的依赖性;

  • 同步很难做到灵活性,很难对同步的数据做进一步的处理,比如同步时那些明显有问题的数据;

四、mysql+es双写方案数据迁移

对于一个全新的系统,结合上面考虑的要点,设计出一个相对完善的方案并落地实施不算难事,但是据个人经验,比较难的是中途引入es来补充和完善mysql的搜索能力上的短板。为什么这么说呢?

试想你的生产系统已经运行了很久了,mysql核心业务表也产生了相当量级的数据了。引入es之后,即便是双写,es中的存储的数据也是从某个时间点开始,搜索出来的数据也只有那个时间点之后的。那么之前的数据怎么办呢?肯定不能扔掉的。这时候就需考虑如何将之前mysql中老数据无损的迁入到es索引中。

这时候可能有人说这也不是什么难事吧,找个业务不繁忙的时间段将mysql中的老数据一次性迁移到es不就解决问题了吗?如果真是这么简单,就不会有那么多的麻烦事了,下面结合实践经验,从迁移的方案和迁移注意事项两方面进行说明。

4.1 数据迁移整体方案

以一个对数据搜索场景不是那么敏感的场景为例进行说明。整体业务流程如下:

 结合上面的流程,完整的数据迁移思路如下:

  • 创建索引;

  • 双写方案V1版生产上线(不包括es搜索),业务数据实现mysql+es双写,考虑使用消息中间件,记录时间点为T1;

  • 在完成数据迁移之前,搜索业务逻辑仍然走mysql,此时es索引中存储的是T1时间点开始之后的mysql数据;

  • 业务低峰期,利用数据同步工具或FlinkCDC等方案第一次完成全量迁移,针对T1之前的;

  • 双写方案V2版生产上线,数据搜索走es;

4.1.1 创建索引

建议自定义创建索引,控制索引中的字段信息,结合上面谈到的要点,es索引存储的字段信息为mysql核心业务表中的核心业务字段,比如业务主键,用于搜索的高频字段信息。

4.1.2 双写改造

稳妥起见,在第一个改造发布的版本中,代码逻辑层面先支持双写,比如通过异步线程将数据写入es,此时es索引中就存储了某个时间点T1之后的数据。

4.1.3 数据迁移

使用数据迁移工具或自己开发一个微服务,在业务低峰期(凌晨2点)完成一次全量数据的迁移,迁移完成后,ES中的数据基本与mysql表数据同步了。

4.1.4 搜索服务上线

上线搜索服务,此时数据的搜索将走es,具体的实现逻辑结合自身的业务场景酌情改造。比如上文谈到的,如果产品的详情页面是多个表的聚合结果,首先需要通过搜索得到核心的业务字段信息,然后代入到后面的逻辑中进行数据的组装。

4.2 数据迁移补充说明

以上结合实际场景给出了一个相对通用的数据迁移方案,在实际操作中,遇到的情况可能比这个更复杂,比如你可能遇到下面的这些情况:

  • 你要迁移的数据表经过了分库分表,即业务表的数据存储在多个库或多张表中,这种情况下如何迁移?;

  • 你要迁移的数据表数据量非常大,而且可以预计每月的增长量为几百万,如何保障保证es的存储容量?如何规划es的后续扩容?;

  • 迁移的数据量巨大,需要很久怎么办?

  • 迁移数据量巨大,迁移过程中发生异常怎么办?

  • ...

五、方案实施

下面通过实际代码演示一下完整的业务流程。

5.1 前置准备

5.1.1 搭建环境

这里假设你已经提前搭建好es、mysql的环境。

es的搭建可以参考文章:es脚本编程使用,mysql可以使用下面的docker命令快速开启mysql服务

docker run -p 3307:3306 --name mysql57 \
-v /usr/local/docker/mysql/data:/var/lib/mysql \
-v /usr/local/docker/mysql/conf:/etc/mysql/conf.d \
-v /usr/local/docker/mysql/log:/var/log/mysql \
-e MYSQL_ROOT_PASSWORD=你的root密码\
-d mysql:5.7

5.1.2 创建数据表

使用下面的sql语句创建一张数据表,其中desc字段会被作为高频字段搜索使用

CREATE TABLE `product` (`id` int(12) NOT NULL,`pro_name` varchar(64) DEFAULT NULL,`pro_no` varchar(32) DEFAULT NULL,`price` int(10) DEFAULT NULL,`category` varchar(32) DEFAULT NULL,`stock` int(32) DEFAULT NULL,`desc` varchar(255) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

5.1.3 插入初始化数据

为上述的表插入一些数据

INSERT INTO `pt_res`.`product`(`id`, `pro_name`, `pro_no`, `price`, `category`, `stock`, `desc`) VALUES (1, '小米14', 'A100', 3999, 'phone', 32, 'xiao mi phone');
INSERT INTO `pt_res`.`product`(`id`, `pro_name`, `pro_no`, `price`, `category`, `stock`, `desc`) VALUES (2, 'Java入门到精通', 'B100', 56, 'book', 12, 'Java technology');
INSERT INTO `pt_res`.`product`(`id`, `pro_name`, `pro_no`, `price`, `category`, `stock`, `desc`) VALUES (3, '精品男鞋', 'X100', 325, 'shoe', 82, 'Man shoe');

5.1.4 创建一个索引

创建一个名为product的索引,并指定desc字段分词,里面的字段与mysql表对应,但不是所有字段;

PUT product
{"mappings": {"properties": {"id":{"type": "long"},"pro_name": {"type": "keyword"},"desc": {"type": "text"}}}
}

测试创建一条数据

PUT /product/_doc/11
{"pro_name":"汪汪队纪念品","desc":"for children play"
}

查询这条数据

GET /product/_doc/11 

到这里,我们的准备工作就完成了,接下来将在代码中完成剩下的操作。

5.2 搭建springboot工程

本工程要做的事情如下:

  • 整合mybatis,与es;
  • 利用mybatis实现增删改查功能;
  • 利用异步线程写入es;
  • 实现mysql历史数据的迁移;

5.2.1 引入基础依赖

    <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><scope>runtime</scope></dependency><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>2.1.4</version></dependency><dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch</artifactId><version>7.6.2</version></dependency><dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>7.6.2</version></dependency><!--<dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-client</artifactId><version>7.6.2</version></dependency>--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>${boot-web.version}</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.15</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>${lomok.version}</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version></dependency></dependencies>

5.2.2 核心配置文件

主要配置mysql,mybatis以及es相关的连接信息

server:port: 8082spring:datasource:username: rootpassword: rooturl: jdbc:mysql://IP:3307/pt_res?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=falsedriver-class-name : com.mysql.jdbc.Driverelasticsearch:rest:uris: [IP:9200]host: IPport: 9200mybatis:mapper-locations: classpath:mybatis/*.xmltype-aliases-package: com.congge.entity

5.2.3 es客户端连接配置

自定义一个类,自定义一个RestHighLevelClient 的bean,配置es连接信息

import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
@Slf4j
public class EsConfig {@Value("${spring.elasticsearch.host}")private String host;@Value("${spring.elasticsearch.port}")private int port;@Bean(name = "restHighLevelClient")public RestHighLevelClient restHighLevelClient() {return new RestHighLevelClient(RestClient.builder(new HttpHost(host, port, "http")));}}

5.2.3 mybatis文件

在resources目录下创建mybatis目录,在里面编写与mysql操作的文件,这里创建一个操作product表的xml文件

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapperPUBLIC "-//mybatis.org//DTD Mapper 3.0//EN""http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.congge.dao.ProductDao"><resultMap id="BaseResultMap" type="com.congge.entity.Product"><id column="id" property="id" jdbcType="VARCHAR" /><result column="pro_name" property="proName" jdbcType="VARCHAR" /><result column="pro_no" property="proNo" jdbcType="VARCHAR" /><result column="price" property="price" jdbcType="INTEGER" /><result column="category" property="category" jdbcType="VARCHAR" /><result column="stock" property="stock" jdbcType="INTEGER" /><result column="desc" property="desc" jdbcType="VARCHAR" /></resultMap><select id="getAll" resultMap="BaseResultMap">select * from product</select>
</mapper>

注意启动类上面添加dao包的扫描

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
@MapperScan("com.congge.dao")
public class SyncApp {public static void main(String[] args) {SpringApplication.run(SyncApp.class,args);}}

5.2.4 业务实现类

@Service
public class ProductServiceImpl implements ProductService {@Autowiredprivate ProductDao productDao;@Overridepublic List<Product> getAll() {return productDao.getAll();}
}

5.2.4 相关测试

框架整合完毕之后,及时通过单元测试验证是否整合成功,下面给出了一些关于mysql操作以及索引操作的单元测试用例

import com.alibaba.fastjson.JSONObject;
import com.congge.SyncApp;
import com.congge.entity.Product;
import com.congge.entity.es.ProductInfo;
import com.congge.service.ProductService;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.IndicesClient;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.client.indices.GetIndexResponse;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;import java.util.List;
import java.util.Map;//@RunWith(SpringRunner.class)
@SpringBootTest(classes = {SyncApp.class})
public class EsTest {@Autowiredprivate RestHighLevelClient restHighLevelClient;@Autowiredprivate ProductService productService;@Testpublic void testFindAll(){List<Product> all = productService.getAll();System.out.println(all);}@org.junit.jupiter.api.Testvoid contextLoads() {System.out.println(restHighLevelClient);}/*** 判断索引是否存在*/@Testpublic void getIndex() throws Exception {IndicesClient indices = restHighLevelClient.indices();GetIndexRequest student0517 = new GetIndexRequest("product");boolean exists = indices.exists(student0517, RequestOptions.DEFAULT);if(exists){GetIndexResponse indexResponse = indices.get(student0517, RequestOptions.DEFAULT);Map<String, MappingMetaData> mappings = indexResponse.getMappings();System.out.println(mappings);}else{System.out.println("索引不存在");}}@Testpublic void getDocById() throws Exception {GetRequest getRequest = new GetRequest("product").id("11");GetResponse documentFields = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);//集合方式Map<String, Object> source = documentFields.getSource();for (String key : source.keySet()) {System.out.println(source.get(key));}//字符串  -----JSONString sourceAsString = documentFields.getSourceAsString();System.out.println(sourceAsString);//把JSON转换为 stuent//JSON字符串-->JSON对象JSONObject jsonObject = JSONObject.parseObject(sourceAsString);System.out.println(jsonObject);}@Testpublic void getDocByIdV2() throws Exception {SearchRequest searchRequest = new SearchRequest();searchRequest.indices("product");SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();searchRequest.source(searchSourceBuilder.query(QueryBuilders.termQuery("_id", 11)));searchSourceBuilder.size(1);SearchResponse search = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);SearchHits searchHits = search.getHits();for (SearchHit searchHit : searchHits) {Map<String, Object> sourceMap = searchHit.getSourceAsMap();System.out.println(sourceMap);}}@Testpublic void insertDoc() throws Exception {com.fasterxml.jackson.databind.ObjectMapper objectMapper = new com.fasterxml.jackson.databind.ObjectMapper();IndexRequest indexRequest = new IndexRequest("product");ProductInfo pro = new ProductInfo();pro.setId(13);pro.setPro_name("MP3");pro.setDesc("music player");String proData = objectMapper.writeValueAsString(user);indexRequest.source(proData,XContentType.JSON);//插入数据IndexResponse response = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);System.out.println(response.status());System.out.println(response.getResult());}}

5.3 双写业务实现

按照上文的业务实现流程,向mysql表插入一条数据,同时写入一条数据到es

    @Override@Transactionalpublic Object save(Product product) {productDao.save(product);saveEs(product)//CompletableFuture.runAsync(() -> saveEs(product), newCachedThreadPool());return product.getId();}public void saveEs(Product product){com.fasterxml.jackson.databind.ObjectMapper objectMapper = new com.fasterxml.jackson.databind.ObjectMapper();IndexRequest indexRequest = new IndexRequest("product");ProductInfo pro = new ProductInfo();pro.setId(product.getId());pro.setPro_name(product.getProName());pro.setDesc(product.getDesc());String productData = null;try {productData = objectMapper.writeValueAsString(pro);} catch (JsonProcessingException e) {e.printStackTrace();}indexRequest.source(productData,XContentType.JSON);//插入数据IndexResponse response = null;try {response = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);} catch (IOException e) {log.error("save to es error,error : 【{}】",e.getMessage());e.printStackTrace();}System.out.println(response.status());System.out.println(response.getResult());}/*** 带有缓存功能线程池** @return*/public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}

使用单元测试测试一下方法

    @Testpublic void testSave(){Product product = new Product();product.setId(6);product.setProName("可比克薯片");product.setProNo("F003");product.setPrice(7);product.setCategory("food");product.setStock(33);product.setDesc("classics food");Object save = productService.save(product);System.out.println(save);}

跑通之后,检查mysql与es的数据是否正常写入

5.4 数据搜索

我们假设用户输入关键字进行搜索,首先通过es的检索,得到表的基本关键字段,比如id,然后去mysql中查询完整的信息,核心业务实现逻辑如下。

@Overridepublic  List<Product> query(String key) {List<Integer> result = queryFromEs(key);List<Product> queryRes = null;if(!CollectionUtils.isEmpty(result)){queryRes =  productDao.getProductIn(result);}return queryRes;}private List<Integer> queryFromEs(String key) {SearchRequest request = new SearchRequest();request.indices("product");SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();FuzzyQueryBuilder fuzzyQueryBuilder =QueryBuilders.fuzzyQuery("desc", key).fuzziness(Fuzziness.ONE);sourceBuilder.query(fuzzyQueryBuilder);request.source(sourceBuilder);SearchResponse response = null;try {response = restHighLevelClient.search(request, RequestOptions.DEFAULT);} catch (IOException e) {e.printStackTrace();}System.out.println(response.getHits().getHits());System.out.println(response.getHits().getTotalHits());SearchHits hits = response.getHits();List<Integer> ids = new ArrayList<>();for (SearchHit searchHit : hits){Map<String, Object> sourceAsMap = searchHit.getSourceAsMap();System.out.println(sourceAsMap);ids.add(Integer.valueOf(sourceAsMap.get("id").toString()));}return ids;}

编写单元测试用例

    @Testpublic void query(){List<Product> res = productService.query("food");System.out.println(res);}

事实上,实际业务中,从es中查出了id等信息之后,需要通过id字段去mysql中进行多表关联的查询才能聚合结果,但是走es的搜索之后,可以大大提升获取id的性能

5.5 数据迁移

简单起见,这里直接使用定时任务做数据同步,可以考虑凌晨的时候来做这件事,核心迁移方法

public void doSync() {//设置一个时间点的条件作为同步数据的边界List<Product> syncDatas = productDao.getSyncDatas();for(Product product :syncDatas ){ObjectMapper objectMapper = new ObjectMapper();IndexRequest indexRequest = new IndexRequest("product");ProductInfo productInfo = new ProductInfo();productInfo.setId(product.getId());productInfo.setPro_name(product.getProName());productInfo.setDesc(product.getDesc());String proData = null;try {proData = objectMapper.writeValueAsString(productInfo);} catch (JsonProcessingException e) {e.printStackTrace();}indexRequest.source(proData,XContentType.JSON);//插入数据try {IndexResponse response = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);} catch (IOException e) {e.printStackTrace();}}log.info("同步完成");}

最后增加一个定时任务的类,将上述的方法添加进去

import com.congge.service.ProductService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;@Configuration
@EnableScheduling
public class SyncTask {@Autowiredprivate ProductService productService;@Scheduled(cron = "0/2 * * * * ?")private void configureTasks(){System.out.println("开始执行数据同步");productService.doSync();System.out.println("数据同步完成");}}

六、写在文末

本文通过较大的篇幅详细讨论了mysql与es实现双写的设计以及实现过程,当然在实际操作过程中还有很多值得探讨和细节,希望为看到的小伙伴提供一个思路,本篇到此结束,感谢观看。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/184248.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

《强化学习与机器人控制》:探索深度学习的应用宝典

《强化学习与机器人控制》是一本涵盖了广泛主题的深度著作&#xff0c;它不仅介绍了人机交互控制和强化学习的基本原理&#xff0c;还深入探讨了无模型强化学习控制器以及其在机器人控制中的应用。这本书对于研究生和执业工程师来说是一本极具价值的参考书&#xff0c;它为读者…

Markov Chain Fingerprinting to Classify Encrypted Traffic 论文笔记

0.Abstract 在本文中&#xff0c;提出了用于SSL/TLS会话中传输的应用程序流量的随机指纹。这个指纹基于一阶齐次马尔可夫链&#xff0c;模型识别应用程序的准确率&#xff0c;并提供了检测异常对话的可能性。 1.Introduction 通过SSL/TLS会话时的头部信息创建统计指纹&#xff…

【Linux】:初识git || centos下安装git || 创建本地仓库 || 配置本地仓库 || 认识工作区/暂存区(索引)以及版本库

&#x1f4ee;1.初识git Git 原理与使用 课程⽬标 • 技术⽬标:掌握Git企业级应⽤&#xff0c;深刻理解Git操作过程与操作原理&#xff0c;理解⼯作区&#xff0c;暂存区&#xff0c;版本库的含义 • 技术⽬标:掌握Git版本管理&#xff0c;⾃由进⾏版本回退、撤销、修改等Git操…

Python和BeautifulSoup库的魔力:解析TikTok视频页面

概述 短视频平台如TikTok已成为信息传播和电商推广的重要渠道。用户通过短视频分享生活、创作内容&#xff0c;吸引了数以亿计的观众&#xff0c;为企业和创作者提供了广阔的市场和宣传机会。然而&#xff0c;要深入了解TikTok上的视频内容以及用户互动情况&#xff0c;需要借…

YOLOv8-seg 分割代码详解(一)Predict

前言 本文从 U-Net 入手熟悉分割的简单方法&#xff0c;再看 YOLOv8 的方法。主要梳理 YOLOv8 的网络结构&#xff0c;以及 Predict 过程的后处理方法。 U-Net 代码地址&#xff1a;https://github.com/milesial/Pytorch-UNet YOLOv8 代码地址&#xff1a;https://github.com/…

B站双11,联手天猫暴涨2亿消费新势力

一直以来&#xff0c;手持高活跃、高粘性用户群体的B站是行业用来观察年轻人消费习惯的重要平台。以至于用户群体的不断壮大带动了B站的商业价值。如今B站的商业舞台越来越大&#xff0c;不断地向外界招手&#xff0c;欢迎更多品牌积极加入到这个千万年轻人聚集的内容社区。 2…

大数据疫情分析及可视化系统 计算机竞赛

文章目录 0 前言2 开发简介3 数据集4 实现技术4.1 系统架构4.2 开发环境4.3 疫情地图4.3.1 填充图(Choropleth maps)4.3.2 气泡图 4.4 全国疫情实时追踪4.6 其他页面 5 关键代码最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 大数据疫…

Web Worker:JS多线程的伪解药?

前言 在前端开发领域&#xff0c;JavaScript 的单线程限制一直是一个难以忽视的挑战。当谈到解决JavaScript的单线程限制时&#xff0c;HTML5引入的Web Worker被普遍认为是一剂解药&#x1f48a;。同时&#xff0c;业界中大量的文章也是聚焦于讨论web worker的神奇力量。然而&…

Android内存回收机制、GC算法及内存问题分析解决

Android内存回收机制、GC算法及内存问题分析解决 在Android开发中&#xff0c;Java内存回收和垃圾收集&#xff08;GC&#xff09;机制是确保应用程序高效运行的关键部分。针对不同对象存活率&#xff0c;Android平台采用了引用计数算法和可达性分析法来判定对象的可回收性&am…

RTC实时时钟——DS1302

DS1302目录 一、DS1302简介引脚定义与推荐电路 二、芯片手册1.操作寄存器的定义2.时序定义dc1302.cds1302.h 三、蓝桥杯实践 一、DS1302简介 RTC(Real Time Clock):实时时钟&#xff0c;是一种集成电路&#xff0c;通常称为时钟芯片。现在流行的串行时钟电路很多&#xff0c;如…

华为李鹏:到 2025 年智能算力需求将达到目前水平的 100 倍

在第十四届全球移动宽带论坛上&#xff0c;华为高级副总裁、运营商 BG 总裁李鹏表示&#xff0c;大模型为代表的 AI 应用发展带来对智能算力的爆发式需求。 李鹏在题为《加速 5G 商业正循环&#xff0c;拥抱更繁荣的 5.5G》的讲话中表示&#xff0c;「5G 已经走在商业成功的正确…

C# OpenCvSharp 去除字母后面的杂线

效果 项目 代码 using OpenCvSharp; using System; using System.Drawing; using System.Windows.Forms;namespace OpenCvSharp_Demo {public partial class frmMain : Form{public frmMain(){InitializeComponent();}string image_path "";private void Form1_Loa…

三国志14信息查询小程序(历史武将信息一览)制作更新过程05-后台接口的编写及调用

1&#xff0c;创建ASP.NET Web API项目 生成完毕&#xff0c;项目结构如下&#xff1a; 运行看一下&#xff1a; 2&#xff0c;后台接口编写 &#xff08;1&#xff09;在Models文件夹中新建一个sandata.cs文件&#xff08;就是上篇中武将信息表的model文件&#xff09; u…

伦敦金开户需要多少资金,有开户条件吗?

伦敦金&#xff08;London Gold&#xff09;是黄金市场中备受瞩目的投资种类之一&#xff0c;无论是专业投资者还是新手&#xff0c;都对伦敦金感兴趣。但关于开户需要多少资金&#xff0c;以及是否有特定的开户条件&#xff0c;这些问题可能会让一些新手投资者感到困惑。 首先…

notepad++搜索结果窗口不见了

1、使用notepad打开一个文件文件 2、ctrlf&#xff0c;打开搜索窗口&#xff0c;随便搜索一个内容 3、按F7,然后AltF7 切换焦点到Find result. 会有一个小窗口出现&#xff0c;内容是&#xff1a;还原&#xff0c;移动&#xff0c;大小等 4&#xff0c;点移动&#xff0c;使…

[答疑]校长出轨主任流程的业务建模

DDD领域驱动设计批评文集 做强化自测题获得“软件方法建模师”称号 《软件方法》各章合集 艳阳天 2023-10-27 19:45 我有点迷糊。校长出轨主任在酒店被拍到&#xff0c;不属于学校的业务流程&#xff0c;但闹出这种事对学校有很大影响。如果学校想用一个系统抓风纪&#xff…

论文阅读—— BiFormer(cvpr2023)

论文&#xff1a;https://arxiv.org/abs/2303.08810 github&#xff1a;GitHub - rayleizhu/BiFormer: [CVPR 2023] Official code release of our paper "BiFormer: Vision Transformer with Bi-Level Routing Attention" 一、介绍 1、要解决的问题&#xff1a;t…

OpenLayers入门,OpenLayers加载离线xyz瓦片地图并显示离线鹰眼控件

专栏目录: OpenLayers入门教程汇总目录 前言 本章介绍如何使用OpenLayers加载离线xyz瓦片地图图层,并显示离线xyz瓦片的鹰眼控件。 本章是综合案例,涉及到两块内容,一个是离线瓦片地图加载,二个是鹰眼控件,拆分的参考文章如下: OpenLayers入门,OpenLayers地图鹰眼控…

Java面试题(每天10题)-------连载(26)

目录 多线程篇 1、什么是FutureTask&#xff1f; 2、什么是同步容器和并发容器的实现&#xff1f; 3、什么是多线程的上下文切换&#xff1f; 4、ThreadLocal的设计理念与作用&#xff1f; 5、ThreadPool&#xff08;线程池&#xff09;用法与优势&#xff1f; 6、Concur…

智能文件改名:高效复制并删除冗余,简化文件管理“

在繁杂的电脑文件世界中&#xff0c;如何高效地管理文件成为了许多人的难题。为了解决这一难题&#xff0c;我们推出了一款智能文件改名工具&#xff0c;它能够轻松复制文件并删除目标文件夹中的冗余文件&#xff0c;让您的文件管理更加高效便捷。 第一步&#xff0c;我们要打…