目录
- 1 es与数据库同步的方法
- 2 实践
- 2.1 任务介绍
- 2.2 MQ方面操作
- 2.2.1 声明交换机队列并且绑定
- 2.2.2 hotel_admin端web层设置mq发送消息
- 2.3 hotel_demo端监听接受消息并执行es操作
1 es与数据库同步的方法
方式一:同步调用
- 优点:实现简单,粗暴
- 缺点:业务耦合度高
方式二:异步通知(选择这个折中下)
- 优点:低耦合,实现难度一般
- 缺点:依赖mq的可靠性
方式三:监听binlog
- 优点:完全解除服务间耦合
- 缺点:开启binlog增加数据库负担、实现复杂度高
2 实践
2.1 任务介绍
当酒店数据发生增、删、改时,要求对elasticsearch中数据也要完成相同操作。
同时开启了hotel_admin和hotel_demo两个微服务,利用MQ声明exchange、queue、RoutingKey,在hotel-admin中的增、删、改业务中完成消息发送,在hotel-demo中完成消息监听,并更新elasticsearch中数据,进而完成es和mysql的消息同步
2.2 MQ方面操作
2.2.1 声明交换机队列并且绑定
我打算使用的mq结构如下:
代码:
@Configuration
public class Myconfig {/*** 声明交换机* @return*/@Beanpublic TopicExchange topicExchange(){return new TopicExchange(MqConstants.HOTEL_EXCHANGE,true,false);}/*** 插入/更新队列* @return*/@Beanpublic Queue insertQueue(){return new Queue(MqConstants.HOTEL_INSERT_QUEUE,true);}/*** 删除队列* @return*/@Beanpublic Queue deleteQueue(){return new Queue(MqConstants.HOTEL_DELETE_QUEUE);}/*** 绑定增/改* @return*/@Beanpublic Binding bindingInsert(){return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_QUEUE);}/*** 绑定删除* @return*/@Beanpublic Binding bindingDelete(){return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY);}
}
2.2.2 hotel_admin端web层设置mq发送消息
@RestController
@RequestMapping("hotel")
public class HotelController {@Autowiredprivate IHotelService hotelService;@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/{id}")public Hotel queryById(@PathVariable("id") Long id){return hotelService.getById(id);}@GetMapping("/list")public PageResult hotelList(@RequestParam(value = "page", defaultValue = "1") Integer page,@RequestParam(value = "size", defaultValue = "1") Integer size){Page<Hotel> result = hotelService.page(new Page<>(page, size));return new PageResult(result.getTotal(), result.getRecords());}@PostMappingpublic void saveHotel(@RequestBody Hotel hotel){hotelService.save(hotel);rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId());}@PutMapping()public void updateById(@RequestBody Hotel hotel){if (hotel.getId() == null) {throw new InvalidParameterException("id不能为空");}hotelService.updateById(hotel);rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId());}@DeleteMapping("/{id}")public void deleteById(@PathVariable("id") Long id) {hotelService.removeById(id);rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_DELETE_KEY,id);}
}
2.3 hotel_demo端监听接受消息并执行es操作
@Component
public class MsgListener {@Autowiredprivate IHotelService hotelService;/*** 监听插入或者更新doc的信息* @param id*/@RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)public void receiveInsertMsg(Long id){hotelService.InsertOrUpdate(id);}/*** 监听删除doc的信息* @param id*/@RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)public void receiveDeleteMsg(Long id){hotelService.deleteEsById(id);}
}
之后去service层实现监听类调用的增删方法
@Overridepublic void InsertOrUpdate(Long id) {//1 根据id去数据库查信息Hotel db_hotel = this.getById(id);if(db_hotel == null){log.warn("id为:"+id+"的酒店不存在");return;}//2 构建添加对象HotelDoc hotelDoc = new HotelDoc(db_hotel);String jsonString = JSON.toJSONString(hotelDoc);IndexRequest request = new IndexRequest("hotel").id(db_hotel.getId().toString());request.source(jsonString, XContentType.JSON);//3 发送添加请求try {IndexResponse result = restHighLevelClient.index(request, RequestOptions.DEFAULT);} catch (IOException e) {log.warn("同步id为:"+id+"的信息超时");}}@Overridepublic void deleteEsById(Long id) {DeleteRequest request = new DeleteRequest("hotel",id.toString());try {restHighLevelClient.delete(request,RequestOptions.DEFAULT);} catch (IOException e) {log.warn("删除id为:"+id+"的信息超时");}}