elasticsearch实战三 elasticsearch与mysql数据实时同步

一 介绍

elasticsearch数据不是一直不变的,需要与mysql、oracle等数据库的数据做同步。
本博客里涉及到的项目地址:https://www.aliyundrive.com/s/7bRWpTYsxWV

方案一: 同步调用,即操作mysql数据后,接着操作elasticsearch的数据

  • 优点:实现简单,粗暴
  • 缺点:业务耦合度高
    在这里插入图片描述

方案二: 引入mq中间件,操作完mysql后,发消息给mq,然后更新elasticsearch。

  • 优点:低耦合,实现难度一般
  • 缺点:依赖mq的可靠性

在这里插入图片描述

方案三: 监听mysql的binlog日志,操作mysql时,监听到binlog后,接着操作elasticsearch数据

  • 优点:完全解除服务间耦合
  • 缺点:开启binlog增加数据库负担、实现复杂度高(且目前只有mysql支持binlog)

在这里插入图片描述
本文介绍比较通用的方案,即方案二,使用的mq消息队列是rabbitmq

二 消费端搭建服务

资料里的hotel-admin项目,是用来操作mysql、产生mq消息的。hotel-demo项目,是用来操作es、消费mq消息的。
在这里插入图片描述

2.1 hotel-demo项目搭建rabbitmq

声明消息队列里的exchage、queue、RoutingKey
声明交换机、队列等,一般都是在消费者里操作。由于对于ES来说,新增与修改是一样的(修改时,找不到id,就会新增),所以队列只声明新增、删除两种队列即可。

hotel-demo:引入依赖

<!--amqp-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

添加rabbitmq配置信息
在这里插入图片描述
声明exchange、queue、RoutingKey

//常量类定义交换机、队列、路由key等,消息的消费者和发送者都要定义这个类
public class HotelMqConstants {/*** 交换机名称*/public static final String EXCHANGE_NAME = "hotel.topic";/*** 新增、修改队列*/public static final String INSERT_QUEUE_NAME = "hotel.insert.queue";/*** 删除队列*/public static final String DELETE_QUEUE_NAME = "hotel.delete.queue";/*** 新增或修改的RoutingKey*/public static final String INSERT_KEY = "hotel.insert";/*** 删除的RoutingKey*/public static final String DELETE_KEY = "hotel.delete";
}

定义队列、主题等的绑定关系时,有两种方式

  1. 基于注解(较简单)
  2. 基于bean

这里使用基于bean的方式,MqConfig.java配置类

public class HotelMqConstants {//交换机名称public static final String EXCHANGE_NAME = "hotel.topic";//插入、更新数据时的队列名public static final String INSERT_QUEUE_NAME = "hotel.insert.queue";//删除数据时的队列名public static final String DELETE_QUEUE_NAME = "hotel.delete.queue";//插入、更新数据时的RoutingKeypublic static final String INSERT_KEY = "hotel.insert";//删除数据时的RoutingKeypublic static final String DELETE_KEY = "hotel.delete";
}
import cn.itcast.hotel.constants.HotelMqConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class MqConfig {/*** 定义交换机* @return*/@Beanpublic TopicExchange topicExchange(){//参数一:交换机名字//参数二:持久化return new TopicExchange(HotelMqConstants.EXCHANGE_NAME,true,false);}/*** 插入、更新数据的队列*/@Beanpublic Queue insertQueue(){return new Queue(HotelMqConstants.INSERT_QUEUE_NAME,true);}/*** 删除数据的队列*/@Beanpublic Queue deleteQueue(){return new Queue(HotelMqConstants.DELETE_QUEUE_NAME,true);}/*** 定义插入、更新数据时,队列、交换机、路由key的绑定关系*/public Binding insertQueueBinding(){//队列绑定交换机、绑定RoutingKeyreturn BindingBuilder.bind(insertQueue()).to(topicExchange()).with(HotelMqConstants.INSERT_KEY);}/*** 定义删除数据时,队列、交换机、路由key的绑定关系*/public Binding deleteQueueBinding(){//队列绑定交换机、绑定RoutingKeyreturn BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(HotelMqConstants.DELETE_KEY);}
}

2.2 hotel-admin项目搭建rabbitmq

启动访问8099端口
在这里插入图片描述
引入amqp依赖

<!--amqp-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

定义常量类

public class HotelMqConstants {//交换机名称public static final String EXCHANGE_NAME = "hotel.topic";//插入、更新数据时的队列名public static final String INSERT_QUEUE_NAME = "hotel.insert.queue";//删除数据时的队列名public static final String DELETE_QUEUE_NAME = "hotel.delete.queue";//插入、更新数据时的RoutingKeypublic static final String INSERT_KEY = "hotel.insert";//删除数据时的RoutingKeypublic static final String DELETE_KEY = "hotel.delete";
}

配置rabbitmq地址
在这里插入图片描述
发送mq消息代码

public class HotelController {@Autowiredprivate IHotelService hotelService;@Autowiredprivate RabbitTemplate rabbitTemplate;@PostMappingpublic void saveHotel(@RequestBody Hotel hotel){// 新增酒店hotelService.save(hotel);// 发送MQ消息(第三个参数是消息内容:hotel.getId())rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.INSERT_KEY, hotel.getId());}@PutMapping()public void updateById(@RequestBody Hotel hotel){if (hotel.getId() == null) {throw new InvalidParameterException("id不能为空");}hotelService.updateById(hotel);// 发送MQ消息第三个参数是消息内容:hotel.getId())rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.INSERT_KEY, hotel.getId());}@DeleteMapping("/{id}")public void deleteById(@PathVariable("id") Long id) {hotelService.removeById(id);// 发送MQ消息(第三个参数是消息内容:hotel.getId())rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.DELETE_KEY, id);}}

2.3 hotel-demo监听消息

hotel-demo是消费者,负责监听消息

添加es依赖

<!--elasticsearch-->
<dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>7.12.1</version>
</dependency>

import cn.itcast.hotel.constants.HotelMqConstants;
import cn.itcast.hotel.service.IHotelService;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class HotelListener {@Autowiredprivate IHotelService hotelService;/*** 监听新增、修改的消息* @param hotelId*/@RabbitListener(bindings = @QueueBinding(value = @Queue(name = HotelMqConstants.INSERT_QUEUE_NAME),exchange = @Exchange(name = HotelMqConstants.EXCHANGE_NAME, type = ExchangeTypes.TOPIC),key = HotelMqConstants.INSERT_KEY))public void listenHotelInsert(Long hotelId){// 新增hotelService.saveById(hotelId);}/*** 监听删除的消息* @param hotelId*/@RabbitListener(bindings = @QueueBinding(value = @Queue(name = HotelMqConstants.DELETE_QUEUE_NAME),exchange = @Exchange(name = HotelMqConstants.EXCHANGE_NAME, type = ExchangeTypes.TOPIC),key = HotelMqConstants.DELETE_KEY))public void listenHotelDelete(Long hotelId){// 删除hotelService.deleteById(hotelId);}
}
import cn.itcast.hotel.mapper.HotelMapper;
import cn.itcast.hotel.pojo.Hotel;
import cn.itcast.hotel.pojo.HotelDoc;
import cn.itcast.hotel.pojo.PageResult;
import cn.itcast.hotel.pojo.RequestParams;
import cn.itcast.hotel.service.IHotelService;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.unit.DistanceUnit;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.functionscore.FunctionScoreQueryBuilder;
import org.elasticsearch.index.query.functionscore.ScoreFunctionBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.search.suggest.SuggestBuilders;
import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;@Slf4j
@Service
public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {@Autowiredprivate RestHighLevelClient restHighLevelClient;@Overridepublic void saveById(Long hotelId) {try {// 查询酒店数据,应该基于Feign远程调用hotel-admin,根据id查询酒店数据(现在直接去数据库查)Hotel hotel = getById(hotelId);// 把hotel对象转换为hotel的DocHotelDoc hotelDoc = new HotelDoc(hotel);// 1.创建RequestIndexRequest request = new IndexRequest("hotel").id(hotelId.toString());// 2.准备参数request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);// 3.发送请求restHighLevelClient.index(request, RequestOptions.DEFAULT);} catch (IOException e) {throw new RuntimeException("新增酒店数据失败", e);}}
@Overridepublic void deleteById(Long hotelId) {try {// 1.创建requestDeleteRequest request = new DeleteRequest("hotel", hotelId.toString());// 2.发送请求restHighLevelClient.delete(request, RequestOptions.DEFAULT);} catch (IOException e) {throw new RuntimeException("删除酒店数据失败", e);}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/15111.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

智能化食品安全管理:AI视频监控在大型商场的技术方案

前言 在卖场中&#xff0c;尤其是熟食区&#xff0c;AI视频监控的应用对于食品安全至关重要。通过AI视频监控系统&#xff0c;卖场可以实时监测食品处理环节中的每一个细节&#xff0c;从员工的个人防护到清洁操作&#xff0c;再到区域管理&#xff0c;全面提升食品安全管理的…

分析模式应用――帐务模式02

Party 模式中的层次结构模型支持多种灵活的层次结构&#xff0c;但这里我们只要关心上下级的包含关系就可以了&#xff0c;参加结算的称为结算实体BalanceEntity&#xff0c; 不可再拆分的称为LeafEntity&#xff0c; 可以包含下级结算实体的称为CompositeEntity&#xff0c;因…

什么是网络安全

1) 什么是网络安全 作为程序员&#xff0c;主要是面向产品的安全的问题。比如sql注入&#xff0c;xss&#xff0c;csrf&#xff0c;cookie窃取等等&#xff0c;都值得我们去思考。保证网站运行正常&#xff0c;客户数据安全。 2) sql注入 简单的说&#xff0c;就是利用表单提…

2025年软件测试五大趋势:AI、API安全、云测试等前沿实践

随着软件开发的不断进步&#xff0c;测试方法也在演变。企业需要紧跟新兴趋势&#xff0c;以提升软件质量、提高测试效率&#xff0c;并确保安全性&#xff0c;在竞争激烈的技术环境中保持领先地位。本文将深入探讨2025年最值得关注的五大软件测试趋势。 Parasoft下载https://…

等级保护2.0|网络安全服务

等级保护2.0|网络安全服务 定义 对于国家秘密信息、法人和其他组织及公民专有信息以及公开信息的存储、传输、处理这些信息系统分等级实行安全保护&#xff0c;对信息系统中发生的信息安全时间分等级响应、处置。 思想 对信息安全实行等级化保护和等级化管理 目标 突出重…

Spatial Branching for Conic Non-Convexities in Optimal Electricity-Gas Flow

摘要—本文提出了一种基于几何的空间分支策略&#xff08; spatial branching strategy&#xff09;&#xff0c;用于解决集成电力-燃气系统中的圆锥非凸方程&#xff08; conic non-convex equations&#xff09;。所提出的策略嵌入在空间分支定界算法中&#xff0c;以求解最优…

ChunkKV:优化 KV 缓存压缩,让 LLM 长文本推理更高效

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

IDEA编写SpringBoot项目时使用Lombok报错“找不到符号”的原因和解决

目录 概述|背景 报错解析 解决方法 IDEA配置解决 Pom配置插件解决 概述|背景 报错发生背景&#xff1a;在SpringBoot项目中引入Lombok依赖并使用后出现"找不到符号"的问题。 本文讨论在上述背景下发生的报错原因和解决办法&#xff0c;如果仅为了解决BUG不论原…

【Redis】redis 存储的列表如何分页和检索

博主介绍&#xff1a;✌全网粉丝22W&#xff0c;CSDN博客专家、Java领域优质创作者&#xff0c;掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域✌ 技术范围&#xff1a;SpringBoot、SpringCloud、Vue、SSM、HTML、Nodejs、Python、MySQL、PostgreSQL、大数据、物…

基于SpringBoot的线上历史馆藏管理系统

作者&#xff1a;计算机学姐 开发技术&#xff1a;SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等&#xff0c;“文末源码”。 专栏推荐&#xff1a;前后端分离项目源码、SpringBoot项目源码、Vue项目源码、SSM项目源码、微信小程序源码 精品专栏&#xff1a;…

Redis持久化机制详解

为什么需要持久化 Redis通常被作为缓存使用&#xff0c;但是Redis一旦宕机&#xff0c;内存中的数据全部丢失&#xff0c;可能会导致数据库崩溃。如果是从数据库中恢复这些数据就会存在频繁访问数据库和读取速度慢的问题。所以redis实现数据的持久化&#xff0c;是至关重要的。…

代码随想录算法训练营day38

代码随想录算法训练营 —day38 文章目录 代码随想录算法训练营前言一、322. 零钱兑换二维dp数组 二、279.完全平方数二维dp数组 三、139. 单词拆分多重背包背包问题总结问题类型递推公式遍历顺序 前言 今天是算法营的第38天&#xff0c;希望自己能够坚持下来&#xff01; 今日…

数据库高安全—审计追踪:传统审计统一审计

书接上文数据库高安全—角色权限&#xff1a;权限管理&权限检查&#xff0c;从权限管理和权限检查方面解读了高斯数据库的角色权限&#xff0c;本篇将从传统审计和统一审计两方面对高斯数据库的审计追踪技术进行解读。 4 审计追踪 4.1 传统审计 审计内容的记录方式通…

【C++篇】 异常处理

目录 1&#xff0c;异常的概念及使用 1.1&#xff0c;异常的概念 1.2&#xff0c;异常的抛出和捕获 1.3&#xff0c;栈展开 1.4&#xff0c;异常的重新抛出 1.5&#xff0c;异常安全问题 1.6&#xff0c;异常规范 2&#xff0c;标准库中的异常 小结&#xff1a; 1&…

QT修仙之路1-1--遇见QT

文章目录 遇见QT二、QT概述2.1 定义与功能2.2 跨平台特性2.3 优点汇总 三、软件安装四、QT工具介绍(重要)4.1 Assistant4.2 Designer4.3 uic.exe4.4 moc.exe4.5 rcc.exe4.6 qmake4.7 QTcreater 五、QT工程项目解析(作业)5.1 配置文件&#xff08;.pro&#xff09;5.2 头文件&am…

python实现情绪识别模块,并将模块封装成可执行文件

目录&#xff1a; 1.源码&#xff1a;2.情绪识别模型运行流程&#xff1a;3.模型封装需要注意的地方&#xff1a;4.未解决问题&#xff1a; 1.源码&#xff1a; https://gitcode.com/xyint/deep_learning.git 2.情绪识别模型运行流程&#xff1a; 需要获取用户摄像头权限&…

网络防御高级02-综合实验

web页面&#xff1a; [FW]interface GigabitEthernet 0/0/0 [FW-GigabitEthernet0/0/0]service-manage all permit 需求一&#xff0c;接口配置&#xff1a; SW2: [Huawei]sysname SW2 1.创建vlan [sw2]vlan 10 [sw2]vlan 20 2.接口配置 [sw2]interface GigabitEther…

Arbess基础教程-创建流水线

Arbess(谐音阿尔卑斯) 是一款开源免费的 CI/CD 工具&#xff0c;本文将介绍如何使用 Arbess 配置你的第一条流水线&#xff0c;以快速入门上手。 1. 创建流水线 根据不同需求来创建不同的流水线。 1.1 配置基本信息 配置流水线的基本信息&#xff0c;如分组&#xff0c;环境&…

MySQL下载过程

MySQL Enterprise Edition Downloads | Oracle mysql官方下载网址&#xff08;9.2版本&#xff09; 下面的示例是5.7的包&#xff0c;过程是一样的 port&#xff1a;3308&#xff08;默认的是3306&#xff0c;笔者下了一个占用了该端口&#xff09; root&#xff1a;123456 问题…

StochSync:可在任意空间中生成360°全景图和3D网格纹理

StochSync方法可以用于在任意空间中生成图像&#xff0c;尤其是360全景图和3D网格纹理。该方法利用了预训练的图像扩散模型&#xff0c;以实现零-shot生成&#xff0c;消除了对新数据收集和单独训练生成模型的需求。StochSync 结合了 Diffusion Synchronization&#xff08;DS&…