一 介绍
elasticsearch数据不是一直不变的,需要与mysql、oracle等数据库的数据做同步。
本博客里涉及到的项目地址:https://www.aliyundrive.com/s/7bRWpTYsxWV
方案一: 同步调用,即操作mysql数据后,接着操作elasticsearch的数据
- 优点:实现简单,粗暴
- 缺点:业务耦合度高
方案二: 引入mq中间件,操作完mysql后,发消息给mq,然后更新elasticsearch。
- 优点:低耦合,实现难度一般
- 缺点:依赖mq的可靠性
方案三: 监听mysql的binlog日志,操作mysql时,监听到binlog后,接着操作elasticsearch数据
- 优点:完全解除服务间耦合
- 缺点:开启binlog增加数据库负担、实现复杂度高(且目前只有mysql支持binlog)
本文介绍比较通用的方案,即方案二,使用的mq消息队列是rabbitmq
二 消费端搭建服务
资料里的hotel-admin项目,是用来操作mysql、产生mq消息的。hotel-demo项目,是用来操作es、消费mq消息的。
2.1 hotel-demo项目搭建rabbitmq
声明消息队列里的exchage、queue、RoutingKey
声明交换机、队列等,一般都是在消费者里操作。由于对于ES来说,新增与修改是一样的(修改时,找不到id,就会新增),所以队列只声明新增、删除两种队列即可。
hotel-demo:引入依赖
<!--amqp-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
添加rabbitmq配置信息
声明exchange、queue、RoutingKey
//常量类定义交换机、队列、路由key等,消息的消费者和发送者都要定义这个类
public class HotelMqConstants {/*** 交换机名称*/public static final String EXCHANGE_NAME = "hotel.topic";/*** 新增、修改队列*/public static final String INSERT_QUEUE_NAME = "hotel.insert.queue";/*** 删除队列*/public static final String DELETE_QUEUE_NAME = "hotel.delete.queue";/*** 新增或修改的RoutingKey*/public static final String INSERT_KEY = "hotel.insert";/*** 删除的RoutingKey*/public static final String DELETE_KEY = "hotel.delete";
}
定义队列、主题等的绑定关系时,有两种方式
- 基于注解(较简单)
- 基于bean
这里使用基于bean的方式,MqConfig.java配置类
public class HotelMqConstants {//交换机名称public static final String EXCHANGE_NAME = "hotel.topic";//插入、更新数据时的队列名public static final String INSERT_QUEUE_NAME = "hotel.insert.queue";//删除数据时的队列名public static final String DELETE_QUEUE_NAME = "hotel.delete.queue";//插入、更新数据时的RoutingKeypublic static final String INSERT_KEY = "hotel.insert";//删除数据时的RoutingKeypublic static final String DELETE_KEY = "hotel.delete";
}
import cn.itcast.hotel.constants.HotelMqConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class MqConfig {/*** 定义交换机* @return*/@Beanpublic TopicExchange topicExchange(){//参数一:交换机名字//参数二:持久化return new TopicExchange(HotelMqConstants.EXCHANGE_NAME,true,false);}/*** 插入、更新数据的队列*/@Beanpublic Queue insertQueue(){return new Queue(HotelMqConstants.INSERT_QUEUE_NAME,true);}/*** 删除数据的队列*/@Beanpublic Queue deleteQueue(){return new Queue(HotelMqConstants.DELETE_QUEUE_NAME,true);}/*** 定义插入、更新数据时,队列、交换机、路由key的绑定关系*/public Binding insertQueueBinding(){//队列绑定交换机、绑定RoutingKeyreturn BindingBuilder.bind(insertQueue()).to(topicExchange()).with(HotelMqConstants.INSERT_KEY);}/*** 定义删除数据时,队列、交换机、路由key的绑定关系*/public Binding deleteQueueBinding(){//队列绑定交换机、绑定RoutingKeyreturn BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(HotelMqConstants.DELETE_KEY);}
}
2.2 hotel-admin项目搭建rabbitmq
启动访问8099端口
引入amqp依赖
<!--amqp-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
定义常量类
public class HotelMqConstants {//交换机名称public static final String EXCHANGE_NAME = "hotel.topic";//插入、更新数据时的队列名public static final String INSERT_QUEUE_NAME = "hotel.insert.queue";//删除数据时的队列名public static final String DELETE_QUEUE_NAME = "hotel.delete.queue";//插入、更新数据时的RoutingKeypublic static final String INSERT_KEY = "hotel.insert";//删除数据时的RoutingKeypublic static final String DELETE_KEY = "hotel.delete";
}
配置rabbitmq地址
发送mq消息代码
public class HotelController {@Autowiredprivate IHotelService hotelService;@Autowiredprivate RabbitTemplate rabbitTemplate;@PostMappingpublic void saveHotel(@RequestBody Hotel hotel){// 新增酒店hotelService.save(hotel);// 发送MQ消息(第三个参数是消息内容:hotel.getId())rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.INSERT_KEY, hotel.getId());}@PutMapping()public void updateById(@RequestBody Hotel hotel){if (hotel.getId() == null) {throw new InvalidParameterException("id不能为空");}hotelService.updateById(hotel);// 发送MQ消息第三个参数是消息内容:hotel.getId())rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.INSERT_KEY, hotel.getId());}@DeleteMapping("/{id}")public void deleteById(@PathVariable("id") Long id) {hotelService.removeById(id);// 发送MQ消息(第三个参数是消息内容:hotel.getId())rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.DELETE_KEY, id);}}
2.3 hotel-demo监听消息
hotel-demo是消费者,负责监听消息
添加es依赖
<!--elasticsearch-->
<dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>7.12.1</version>
</dependency>
import cn.itcast.hotel.constants.HotelMqConstants;
import cn.itcast.hotel.service.IHotelService;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class HotelListener {@Autowiredprivate IHotelService hotelService;/*** 监听新增、修改的消息* @param hotelId*/@RabbitListener(bindings = @QueueBinding(value = @Queue(name = HotelMqConstants.INSERT_QUEUE_NAME),exchange = @Exchange(name = HotelMqConstants.EXCHANGE_NAME, type = ExchangeTypes.TOPIC),key = HotelMqConstants.INSERT_KEY))public void listenHotelInsert(Long hotelId){// 新增hotelService.saveById(hotelId);}/*** 监听删除的消息* @param hotelId*/@RabbitListener(bindings = @QueueBinding(value = @Queue(name = HotelMqConstants.DELETE_QUEUE_NAME),exchange = @Exchange(name = HotelMqConstants.EXCHANGE_NAME, type = ExchangeTypes.TOPIC),key = HotelMqConstants.DELETE_KEY))public void listenHotelDelete(Long hotelId){// 删除hotelService.deleteById(hotelId);}
}
import cn.itcast.hotel.mapper.HotelMapper;
import cn.itcast.hotel.pojo.Hotel;
import cn.itcast.hotel.pojo.HotelDoc;
import cn.itcast.hotel.pojo.PageResult;
import cn.itcast.hotel.pojo.RequestParams;
import cn.itcast.hotel.service.IHotelService;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.unit.DistanceUnit;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.functionscore.FunctionScoreQueryBuilder;
import org.elasticsearch.index.query.functionscore.ScoreFunctionBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.search.suggest.SuggestBuilders;
import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;@Slf4j
@Service
public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {@Autowiredprivate RestHighLevelClient restHighLevelClient;@Overridepublic void saveById(Long hotelId) {try {// 查询酒店数据,应该基于Feign远程调用hotel-admin,根据id查询酒店数据(现在直接去数据库查)Hotel hotel = getById(hotelId);// 把hotel对象转换为hotel的DocHotelDoc hotelDoc = new HotelDoc(hotel);// 1.创建RequestIndexRequest request = new IndexRequest("hotel").id(hotelId.toString());// 2.准备参数request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);// 3.发送请求restHighLevelClient.index(request, RequestOptions.DEFAULT);} catch (IOException e) {throw new RuntimeException("新增酒店数据失败", e);}}
@Overridepublic void deleteById(Long hotelId) {try {// 1.创建requestDeleteRequest request = new DeleteRequest("hotel", hotelId.toString());// 2.发送请求restHighLevelClient.delete(request, RequestOptions.DEFAULT);} catch (IOException e) {throw new RuntimeException("删除酒店数据失败", e);}}
}