数据聚合、自动补全、数据同步、es集群

目录

数据聚合

聚合的分类

DSL实现bucket聚合

DSL实现Metrics聚合

RestAPI实现聚合

多条件聚合

带过滤条件的聚合

自动补全

安装拼音分词器

自定义分词器

completion suggester查询

修改索引库数据结构

RestAPI实现自动补全查询

实现搜索框自动补全

数据同步

数据同步问题分析

导入Hotel-admin

声明队列和交换机 

发送mq消息

监听MQ消息

测试同步功能

es集群

集群结果介绍

搭建集群

集群职责及脑裂

ES集群的分布式存储

ES集群的分布式查询 

ES集群的故障转移


什么是聚合?
1、聚合是对文档数据的统计、分析、计算

聚合的常见种类有哪些?
1、Bucket:对文档数据分组,并统计每组数量

2、Metric:对文档数据做计算,例如avg

3、Pipeline:基于其它聚合结果再做聚合

参与聚合的字段类型必须是:
1、keyword
2、数值

3、日期

4、布尔


aggs代表聚合,与query同级,此时query的作用是?

1、限定聚合的的文档范围
聚合必须的三要素:

1、聚合名称
2、聚合类型

3、聚合字段
聚合可配置属性有:

1、size:指定聚合结果数量

2、order:指定聚合结果排序方式

3、field:指定聚合字段


如何使用拼音分词器?

1、下载pinyin分词器
2、解压并放到elasticsearch的plugin目录

3、重启即可
如何自定义分词器?
1、创建索引库时,在settings中配置,可以包含三部分

2、character filter
3、tokenizer

4、filter
拼音分词器注意事项?
1、为了避免搜索到同音字,搜索时不要使用拼音分词器


自动补全对字段的要求:

1、类型是completion类型

2、字段值是多词条的数组


方式一:同步调用

1、优点:实现简单,粗暴

2、缺点:业务耦合度高

方式二:异步通知
1、优点:低耦合,实现难度一般

2、缺点:依赖mq的可靠性

方式三:监听binlog
1、优点:完全解除服务间耦合
2、缺点:开启biniog增加数据库负担、实现复杂度高


master eligible节点的作用是什么?

1、参与集群选主
2、主节点可以管理集群状态、管理分片信息、处理创建和删除索引库的请求
data节点的作用是什么?

1、数据的CRUD
coordinator节点的作用是什么?

1、路由请求到其它节点
2、合并查询到的结果,返回给用户


分布式新增如何确定分片?
1、coordinating node根据id做hash运算,得到结果对shard数量取余,余数就是对应的分片
分布式查询:
1、分散阶段: coordinating node将查询请求分发给不同分片

2、收集阶段:将查询结果汇总到coordinating node ,整理并返回给用户


故障转移:
1、master宕机后,EligibleMaster选举为新的主节点。
2、master节点监控分片、节点状态,将故障节点上的分片转移到正常节点,确保数据安全。

数据聚合

聚合的分类

聚合(aggregations)可以实现对文档数据的统计、分析、运算

聚合常见的有三类:

桶(Bucket)聚合:用来对文档做分组
1、TermAggregation:按照文档字段值分组
2、Date Histogram:按照日期阶梯分组,例如一周为一组,或者一月为一组

度量(Metric)聚合:用以计算一些值,比如:最大值、最小值、平均值等
1、Avg:求平均值
2、Max:求最大值

3、Min:求最小值
4、Stats:同时求max、min、avg、sum等
管道(pipeline)聚合:其它聚合的结果为基础做聚合

DSL实现bucket聚合

统计所有数据中的酒店品牌有几种,此时可以根据酒店品牌的名称做聚合

GET /hotel/_search
{"size": 0,              #size:设置size为0,结果中不包含文档,只包含聚合结果"aggs": {               #aggs:定义聚合"brandAgg": {         #brandAgg:给聚合起个名字"terms": {          #terms:聚合的类型,按照品牌值聚合,所以选term"field": "brand", #field:参与聚合的字段"size": 20        #size:希望获取的聚合结果数量} }}
}

buckets里的doc_count是指该key有多少个

默认情况下,Bucket聚合会统计Bucket内的文档数量,记为_count,并且按照_count降序排序 

 GET /hotel/_search
{"size": 0,"aggs": {"brandAgg": {"terms": {"field": "brand","size": 20,"order": {"_count": "asc"}}}}
}

此时设置为升序,则右边的数据是升序排序 

默认情况下,Bucket聚合是对索引库的所有文档做聚合,我们可以限定要聚合的文档范围,只要添加query条件即可 

GET /hotel/_search
{"query": {"range": {"price": {"lte": 200}}}, "size": 0,"aggs": {"brandAgg": {"terms": {"field": "brand","size": 20}}}
}

此时只会查price小于等于200的数据 

DSL实现Metrics聚合

要求获取每个品牌的用户评分的min、max、avg等值

可以利用stats聚合:

GET /hotel/_search
{"size": 0,"aggs": {"brandAgg": {"terms": {"field": "brand","size": 20,"order": {"scoreAgg.avg": "desc"}},"aggs": {               #是brands聚合的子集合,也就是分组后对每组分别计算"scoreAgg": {         #聚合名称"stats": {          #聚合类型,这里的stats可以计算min,max,avg等"field": "score"  #聚合字段,这里是score}}}}}
}

统计出了每个buckets的min,max,avg,sum了 

RestAPI实现聚合

请求组装

HotelSearchTest.java

@Test
void testAggregation() throws IOException {//1、准备RequestSearchRequest request = new SearchRequest("hotel");//2、准备DSL//2.1、设置sizerequest.source().size(0);//2.2、聚合request.source().aggregation(AggregationBuilders.terms("brandAgg").field("brand").size(10));//3、发送请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);//4、解析响应Aggregations aggregations = response.getAggregations();//4.1、根据聚合名称获聚合结果Terms brandTerms = aggregations.get("brandAgg");//4.2、获取bucketsList<? extends Terms.Bucket> buckets = brandTerms.getBuckets();//4.3、遍历for(Terms.Bucket bucket : buckets){//4.4、获取keyString key = bucket.getKeyAsString();System.out.println(key);}
}
package cn.itcast.hotel;import cn.itcast.hotel.pojo.HotelDoc;
import com.alibaba.fastjson.JSON;
import org.apache.http.HttpHost;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.search.suggest.SuggestBuilders;
import org.elasticsearch.search.suggest.completion.CompletionSuggester;
import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.util.CollectionUtils;import javax.naming.directory.SearchResult;
import java.io.IOException;
import java.util.List;
import java.util.Map;import static cn.itcast.hotel.constants.HotelConstants.MAPPING_TEMPLATE;public class HotelSearchTest {private RestHighLevelClient client;@Testvoid testMatchAll() throws IOException {//1、准备RequestSearchRequest request = new SearchRequest("hotel");//2、准备DSLrequest.source().query(QueryBuilders.matchAllQuery());//3、发送请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);//4、解析响应handleResponse(response);}@Testvoid testMatch() throws IOException {//1、准备RequestSearchRequest request = new SearchRequest("hotel");//2、准备DSLrequest.source().query(QueryBuilders.matchQuery("all","如家"));//3、发送请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);//4、解析响应handleResponse(response);}@Testvoid testBool() throws IOException {//1、准备RequestSearchRequest request = new SearchRequest("hotel");//2、准备DSL//2.1、准备booleanQueryBoolQueryBuilder boolQuery = QueryBuilders.boolQuery();//2.2、添加termboolQuery.must(QueryBuilders.termQuery("city","上海"));//2.3、添加rangeboolQuery.filter(QueryBuilders.rangeQuery("price").lte(250));request.source().query(boolQuery);//3、发送请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);//4、解析响应handleResponse(response);}@Testvoid testRageAndSort() throws IOException {//页码,每页大小int page = 2, size = 5;//1、准备RequestSearchRequest request = new SearchRequest("hotel");//2、准备DSL//2.1、准备queryrequest.source().query(QueryBuilders.matchAllQuery());//2.2、排序sortrequest.source().sort("price", SortOrder.ASC);//2.3、分页from,sizerequest.source().from((page - 1) * size).size(5);//3、发送请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);//4、解析响应handleResponse(response);}@Testvoid testHighlight() throws IOException {//1、准备RequestSearchRequest request = new SearchRequest("hotel");//2、准备DSL//2.1、queryrequest.source().query(QueryBuilders.matchQuery("all","如家"));//2.2、高亮request.source().highlighter(new HighlightBuilder().field("name").requireFieldMatch(false));//3、发送请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);//4、解析响应handleResponse(response);}@Testvoid testAggregation() throws IOException {//1、准备RequestSearchRequest request = new SearchRequest("hotel");//2、准备DSL//2.1、设置sizerequest.source().size(0);//2.2、聚合request.source().aggregation(AggregationBuilders.terms("brandAgg").field("brand").size(10));//3、发送请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);//4、解析响应Aggregations aggregations = response.getAggregations();//4.1、根据聚合名称获聚合结果Terms brandTerms = aggregations.get("brandAgg");//4.2、获取bucketsList<? extends Terms.Bucket> buckets = brandTerms.getBuckets();//4.3、遍历for(Terms.Bucket bucket : buckets){//4.4、获取keyString key = bucket.getKeyAsString();System.out.println(key);}}private void handleResponse(SearchResponse response) {//4、解析响应SearchHits searchHits = response.getHits();//4.1获取总条数long total = searchHits.getTotalHits().value;System.out.println("共搜索到" + total + "条数据");//4.2文档数组SearchHit[] hits = searchHits.getHits();//4.3遍历for (SearchHit hit : hits) {//获取文档sourceString json = hit.getSourceAsString();//反序列化HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);//获取高亮结果Map<String, HighlightField> highlightFields = hit.getHighlightFields();//判断是否有高亮结果if(!CollectionUtils.isEmpty(highlightFields)){//根据字段名获取高亮结果HighlightField highlightField = highlightFields.get("name");//判断是否有字段名高亮if(highlightField != null){//获取高亮值String name = highlightField.getFragments()[0].string();//覆盖非高亮结果hotelDoc.setName(name);}}System.out.println("hotelDoc = " + hotelDoc);}System.out.println(response);}@BeforeEachvoid setUp() {this.client = new RestHighLevelClient(RestClient.builder(HttpHost.create("http://xxx.xxx.xxx.xxx:9200")));}@AfterEachvoid tearDown() throws IOException {this.client.close();}
}

成功获取到聚合结果 

多条件聚合

IHotelService.java

Map<String, List<String>> filters(RequestParams params);
package cn.itcast.hotel.service;import cn.itcast.hotel.pojo.Hotel;
import cn.itcast.hotel.pojo.PageResult;
import cn.itcast.hotel.pojo.RequestParams;
import com.baomidou.mybatisplus.extension.service.IService;import java.util.List;
import java.util.Map;public interface IHotelService extends IService<Hotel> {PageResult search(RequestParams params);Map<String, List<String>> filters(RequestParams params);}

Hotelservice.java

 @Overridepublic Map<String, List<String>> filters() {try {//1、准备RequestSearchRequest request = new SearchRequest("hotel");//2、准备DSL//2.2、设置sizerequest.source().size(0);//2.3、聚合buildAggregation(request);//3、发送请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);//4、解析响应Map<String, List<String>> result = new HashMap<>();Aggregations aggregations = response.getAggregations();//4.1、根据品牌名称,获取品牌结果List<String> brandList = getAggByName(aggregations,"brandAgg");result.put("brand",brandList);//4.1、根据城市名称,获取品牌结果List<String> cityList = getAggByName(aggregations,"cityAgg");result.put("city",cityList);//4.1、根据星级名称,获取品牌结果List<String> starList = getAggByName(aggregations,"starAgg");result.put("starName",starList);return result;} catch (IOException e) {throw new RuntimeException(e);}}private void buildAggregation(SearchRequest request) {request.source().aggregation(AggregationBuilders.terms("brandAgg").field("brand").size(100));request.source().aggregation(AggregationBuilders.terms("cityAgg").field("city").size(100));request.source().aggregation(AggregationBuilders.terms("starAgg").field("starName").size(100));}private void buildBasicQuery(RequestParams params, SearchRequest request) {//构建BooleanQueryBoolQueryBuilder boolQuery = QueryBuilders.boolQuery();//关键字搜索String key = params.getKey();if(key == null || "".equals(key)){boolQuery.must(QueryBuilders.matchAllQuery());}else{boolQuery.must(QueryBuilders.matchQuery("all",key));}//城市条件if(params.getCity() != null && !"".equals(params.getCity())){boolQuery.filter(QueryBuilders.termQuery("city", params.getCity()));}//品牌条件if(params.getBrand() != null && !"".equals(params.getBrand())){boolQuery.filter(QueryBuilders.termQuery("brand", params.getBrand()));}//星级条件if(params.getStarName() != null && !"".equals(params.getStarName())){boolQuery.filter(QueryBuilders.termQuery("starName", params.getStarName()));}//价格if(params.getMinPrice() != null && params.getMaxPrice() != null){boolQuery.filter(QueryBuilders.rangeQuery("price").gte(params.getMinPrice()).lte(params.getMaxPrice()));}request.source().query(functionScoreQueryBuilder);}private List<String> getAggByName(Aggregations aggregations, String aggName) {//4.1、根据聚合名称获聚合结果Terms brandTerms = aggregations.get(aggName);//4.2、获取bucketsList<? extends Terms.Bucket> buckets = brandTerms.getBuckets();//4.3、遍历List<String> brandList = new ArrayList<>();for(Terms.Bucket bucket : buckets){//4.4、获取keyString key = bucket.getKeyAsString();brandList.add(key);}return brandList;}private PageResult handleResponse(SearchResponse response) {//4、解析响应SearchHits searchHits = response.getHits();//4.1获取总条数long total = searchHits.getTotalHits().value;//4.2文档数组SearchHit[] hits = searchHits.getHits();//4.3遍历List<HotelDoc> hotels = new ArrayList<>();for (SearchHit hit : hits) {//获取文档sourceString json = hit.getSourceAsString();//反序列化HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);//获取距离Object[] sortValues = hit.getSortValues();if(sortValues.length > 0){Object sortValue = sortValues[0];hotelDoc.setDistance(sortValue);}hotels.add(hotelDoc);}//4、封装返回return new PageResult(total,hotels);}
package cn.itcast.hotel.service.impl;import cn.itcast.hotel.mapper.HotelMapper;
import cn.itcast.hotel.pojo.Hotel;
import cn.itcast.hotel.pojo.HotelDoc;
import cn.itcast.hotel.pojo.PageResult;
import cn.itcast.hotel.pojo.RequestParams;
import cn.itcast.hotel.service.IHotelService;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.apache.lucene.queries.function.FunctionScoreQuery;
import org.apache.lucene.search.vectorhighlight.ScoreOrderFragmentsBuilder;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.unit.DistanceUnit;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.functionscore.FunctionScoreQueryBuilder;
import org.elasticsearch.index.query.functionscore.ScoreFunctionBuilder;
import org.elasticsearch.index.query.functionscore.ScoreFunctionBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.search.suggest.SuggestBuilders;
import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;@Service
public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {@Autowiredprivate RestHighLevelClient client;@Overridepublic PageResult search(RequestParams params) {try {//1、准备RequestSearchRequest request = new SearchRequest("hotel");//2、准备DSL//2.1、querybuildBasicQuery(params,request);//2.2、分页int page = params.getPage();int size = params.getSize();request.source().from((page - 1) * size).size(size);//2.3、排序String location = params.getLocation();if(location != null && !"".equals(location)){request.source().sort(SortBuilders.geoDistanceSort("location",new GeoPoint(location)).order(SortOrder.ASC).unit(DistanceUnit.KILOMETERS));}//3、发送请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);//4、解析响应return handleResponse(response);} catch (IOException e) {throw new RuntimeException(e);}}@Overridepublic Map<String, List<String>> filters() {try {//1、准备RequestSearchRequest request = new SearchRequest("hotel");//2、准备DSL//2.2、设置sizerequest.source().size(0);//2.3、聚合buildAggregation(request);//3、发送请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);//4、解析响应Map<String, List<String>> result = new HashMap<>();Aggregations aggregations = response.getAggregations();//4.1、根据品牌名称,获取品牌结果List<String> brandList = getAggByName(aggregations,"brandAgg");result.put("brand",brandList);//4.1、根据城市名称,获取品牌结果List<String> cityList = getAggByName(aggregations,"cityAgg");result.put("city",cityList);//4.1、根据星级名称,获取品牌结果List<String> starList = getAggByName(aggregations,"starAgg");result.put("starName",starList);return result;} catch (IOException e) {throw new RuntimeException(e);}}private void buildAggregation(SearchRequest request) {request.source().aggregation(AggregationBuilders.terms("brandAgg").field("brand").size(100));request.source().aggregation(AggregationBuilders.terms("cityAgg").field("city").size(100));request.source().aggregation(AggregationBuilders.terms("starAgg").field("starName").size(100));}private void buildBasicQuery(RequestParams params, SearchRequest request) {//构建BooleanQueryBoolQueryBuilder boolQuery = QueryBuilders.boolQuery();//关键字搜索String key = params.getKey();if(key == null || "".equals(key)){boolQuery.must(QueryBuilders.matchAllQuery());}else{boolQuery.must(QueryBuilders.matchQuery("all",key));}//城市条件if(params.getCity() != null && !"".equals(params.getCity())){boolQuery.filter(QueryBuilders.termQuery("city", params.getCity()));}//品牌条件if(params.getBrand() != null && !"".equals(params.getBrand())){boolQuery.filter(QueryBuilders.termQuery("brand", params.getBrand()));}//星级条件if(params.getStarName() != null && !"".equals(params.getStarName())){boolQuery.filter(QueryBuilders.termQuery("starName", params.getStarName()));}//价格if(params.getMinPrice() != null && params.getMaxPrice() != null){boolQuery.filter(QueryBuilders.rangeQuery("price").gte(params.getMinPrice()).lte(params.getMaxPrice()));}request.source().query(functionScoreQueryBuilder);}private List<String> getAggByName(Aggregations aggregations, String aggName) {//4.1、根据聚合名称获聚合结果Terms brandTerms = aggregations.get(aggName);//4.2、获取bucketsList<? extends Terms.Bucket> buckets = brandTerms.getBuckets();//4.3、遍历List<String> brandList = new ArrayList<>();for(Terms.Bucket bucket : buckets){//4.4、获取keyString key = bucket.getKeyAsString();brandList.add(key);}return brandList;}private PageResult handleResponse(SearchResponse response) {//4、解析响应SearchHits searchHits = response.getHits();//4.1获取总条数long total = searchHits.getTotalHits().value;//4.2文档数组SearchHit[] hits = searchHits.getHits();//4.3遍历List<HotelDoc> hotels = new ArrayList<>();for (SearchHit hit : hits) {//获取文档sourceString json = hit.getSourceAsString();//反序列化HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);//获取距离Object[] sortValues = hit.getSortValues();if(sortValues.length > 0){Object sortValue = sortValues[0];hotelDoc.setDistance(sortValue);}hotels.add(hotelDoc);}//4、封装返回return new PageResult(total,hotels);}}

HotelDemoApplicationTest.java

@Test
void contextLoads() {Map<String, List<String>> filters = hotelService.filters();System.out.println(filters);
}
package cn.itcast.hotel;import cn.itcast.hotel.service.IHotelService;
import com.fasterxml.jackson.databind.deser.impl.CreatorCandidate;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;import java.util.List;
import java.util.Map;@SpringBootTest
class HotelDemoApplicationTests {@Autowiredprivate IHotelService hotelService;@Testvoid contextLoads() {Map<String, List<String>> filters = hotelService.filters();System.out.println(filters);}}

多条件聚合成功 

带过滤条件的聚合

HotelController.java

@PostMapping("filters")
public Map<String, List<String>> getFilters(@RequestBody RequestParams params){return hotelService.filters(params);
} 
package cn.itcast.hotel.web;import cn.itcast.hotel.pojo.PageResult;
import cn.itcast.hotel.pojo.RequestParams;
import cn.itcast.hotel.service.IHotelService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;import java.util.List;
import java.util.Map;@RestController
@RequestMapping("/hotel")
public class HotelController {@Autowiredprivate IHotelService hotelService;@PostMapping("/list")public PageResult search(@RequestBody RequestParams params){return hotelService.search(params);}@PostMapping("filters")public Map<String, List<String>> getFilters(@RequestBody RequestParams params){return hotelService.filters(params);}}

IHotelService.java

Map<String, List<String>> filters(RequestParams params);
package cn.itcast.hotel.service;import cn.itcast.hotel.pojo.Hotel;
import cn.itcast.hotel.pojo.PageResult;
import cn.itcast.hotel.pojo.RequestParams;
import com.baomidou.mybatisplus.extension.service.IService;import java.util.List;
import java.util.Map;public interface IHotelService extends IService<Hotel> {PageResult search(RequestParams params);Map<String, List<String>> filters(RequestParams params);
}

HotelService.java

  @Override
    public Map<String, List<String>> filters(RequestParams params) {
        try {
            //1、准备Request
            SearchRequest request = new SearchRequest("hotel");
            //2、准备DSL
            //2.1、query
            buildBasicQuery(params, request);
            //2.2、设置size
            request.source().size(0);
            //2.3、聚合
            buildAggregation(request);
            //3、发送请求
            SearchResponse response = client.search(request, RequestOptions.DEFAULT);
            //4、解析响应
            Map<String, List<String>> result = new HashMap<>();
            Aggregations aggregations = response.getAggregations();
            //4.1、根据品牌名称,获取品牌结果
            List<String> brandList = getAggByName(aggregations,"brandAgg");
            result.put("brand",brandList);
            //4.1、根据城市名称,获取品牌结果
            List<String> cityList = getAggByName(aggregations,"cityAgg");
            result.put("city",cityList);
            //4.1、根据星级名称,获取品牌结果
            List<String> starList = getAggByName(aggregations,"starAgg");
            result.put("starName",starList);
            return result;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

    }

    private void buildAggregation(SearchRequest request) {
        request.source().aggregation(AggregationBuilders
                .terms("brandAgg")
                .field("brand")
                .size(100)
        );
        request.source().aggregation(AggregationBuilders
                .terms("cityAgg")
                .field("city")
                .size(100)
        );
        request.source().aggregation(AggregationBuilders
                .terms("starAgg")
                .field("starName")
                .size(100)
        );
    }
 

    private void buildBasicQuery(RequestParams params, SearchRequest request) {
        //构建BooleanQuery
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
        //关键字搜索
        String key = params.getKey();
        if(key == null || "".equals(key)){
            boolQuery.must(QueryBuilders.matchAllQuery());
        }else{
            boolQuery.must(QueryBuilders.matchQuery("all",key));
        }
        //城市条件
        if(params.getCity() != null && !"".equals(params.getCity())){
            boolQuery.filter(QueryBuilders.termQuery("city", params.getCity()));
        }
        //品牌条件
        if(params.getBrand() != null && !"".equals(params.getBrand())){
            boolQuery.filter(QueryBuilders.termQuery("brand", params.getBrand()));
        }
        //星级条件
        if(params.getStarName() != null && !"".equals(params.getStarName())){
            boolQuery.filter(QueryBuilders.termQuery("starName", params.getStarName()));
        }
        //价格
        if(params.getMinPrice() != null && params.getMaxPrice() != null){
            boolQuery.filter(QueryBuilders.rangeQuery("price").gte(params.getMinPrice()).lte(params.getMaxPrice()));
        }

        request.source().query(functionScoreQueryBuilder);
    }

 private List<String> getAggByName(Aggregations aggregations, String aggName) {
        //4.1、根据聚合名称获聚合结果
        Terms brandTerms = aggregations.get(aggName);
        //4.2、获取buckets
        List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
        //4.3、遍历
        List<String> brandList = new ArrayList<>();
        for(Terms.Bucket bucket : buckets){
            //4.4、获取key
            String key = bucket.getKeyAsString();
            brandList.add(key);
        }
        return brandList;
    }

    private PageResult handleResponse(SearchResponse response) {
        //4、解析响应
        SearchHits searchHits = response.getHits();
        //4.1获取总条数
        long total = searchHits.getTotalHits().value;
        //4.2文档数组
        SearchHit[] hits = searchHits.getHits();
        //4.3遍历
        List<HotelDoc> hotels = new ArrayList<>();
        for (SearchHit hit : hits) {
            //获取文档source
            String json = hit.getSourceAsString();
            //反序列化
            HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);
            //获取距离
            Object[] sortValues = hit.getSortValues();
            if(sortValues.length > 0){
                Object sortValue = sortValues[0];
                hotelDoc.setDistance(sortValue);
            }
            hotels.add(hotelDoc);
        }
        //4、封装返回
        return new PageResult(total,hotels);
    }

package cn.itcast.hotel.service.impl;import cn.itcast.hotel.mapper.HotelMapper;
import cn.itcast.hotel.pojo.Hotel;
import cn.itcast.hotel.pojo.HotelDoc;
import cn.itcast.hotel.pojo.PageResult;
import cn.itcast.hotel.pojo.RequestParams;
import cn.itcast.hotel.service.IHotelService;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.apache.lucene.queries.function.FunctionScoreQuery;
import org.apache.lucene.search.vectorhighlight.ScoreOrderFragmentsBuilder;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.unit.DistanceUnit;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.functionscore.FunctionScoreQueryBuilder;
import org.elasticsearch.index.query.functionscore.ScoreFunctionBuilder;
import org.elasticsearch.index.query.functionscore.ScoreFunctionBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.search.suggest.SuggestBuilders;
import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;@Service
public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {@Autowiredprivate RestHighLevelClient client;@Overridepublic PageResult search(RequestParams params) {try {//1、准备RequestSearchRequest request = new SearchRequest("hotel");//2、准备DSL//2.1、querybuildBasicQuery(params,request);//2.2、分页int page = params.getPage();int size = params.getSize();request.source().from((page - 1) * size).size(size);//2.3、排序String location = params.getLocation();if(location != null && !"".equals(location)){request.source().sort(SortBuilders.geoDistanceSort("location",new GeoPoint(location)).order(SortOrder.ASC).unit(DistanceUnit.KILOMETERS));}//3、发送请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);//4、解析响应return handleResponse(response);} catch (IOException e) {throw new RuntimeException(e);}}@Overridepublic Map<String, List<String>> filters(RequestParams params) {try {//1、准备RequestSearchRequest request = new SearchRequest("hotel");//2、准备DSL//2.1、querybuildBasicQuery(params, request);//2.2、设置sizerequest.source().size(0);//2.3、聚合buildAggregation(request);//3、发送请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);//4、解析响应Map<String, List<String>> result = new HashMap<>();Aggregations aggregations = response.getAggregations();//4.1、根据品牌名称,获取品牌结果List<String> brandList = getAggByName(aggregations,"brandAgg");result.put("brand",brandList);//4.1、根据城市名称,获取品牌结果List<String> cityList = getAggByName(aggregations,"cityAgg");result.put("city",cityList);//4.1、根据星级名称,获取品牌结果List<String> starList = getAggByName(aggregations,"starAgg");result.put("starName",starList);return result;} catch (IOException e) {throw new RuntimeException(e);}}private void buildAggregation(SearchRequest request) {request.source().aggregation(AggregationBuilders.terms("brandAgg").field("brand").size(100));request.source().aggregation(AggregationBuilders.terms("cityAgg").field("city").size(100));request.source().aggregation(AggregationBuilders.terms("starAgg").field("starName").size(100));}private void buildBasicQuery(RequestParams params, SearchRequest request) {//构建BooleanQueryBoolQueryBuilder boolQuery = QueryBuilders.boolQuery();//关键字搜索String key = params.getKey();if(key == null || "".equals(key)){boolQuery.must(QueryBuilders.matchAllQuery());}else{boolQuery.must(QueryBuilders.matchQuery("all",key));}//城市条件if(params.getCity() != null && !"".equals(params.getCity())){boolQuery.filter(QueryBuilders.termQuery("city", params.getCity()));}//品牌条件if(params.getBrand() != null && !"".equals(params.getBrand())){boolQuery.filter(QueryBuilders.termQuery("brand", params.getBrand()));}//星级条件if(params.getStarName() != null && !"".equals(params.getStarName())){boolQuery.filter(QueryBuilders.termQuery("starName", params.getStarName()));}//价格if(params.getMinPrice() != null && params.getMaxPrice() != null){boolQuery.filter(QueryBuilders.rangeQuery("price").gte(params.getMinPrice()).lte(params.getMaxPrice()));}request.source().query(functionScoreQueryBuilder);}private List<String> getAggByName(Aggregations aggregations, String aggName) {//4.1、根据聚合名称获聚合结果Terms brandTerms = aggregations.get(aggName);//4.2、获取bucketsList<? extends Terms.Bucket> buckets = brandTerms.getBuckets();//4.3、遍历List<String> brandList = new ArrayList<>();for(Terms.Bucket bucket : buckets){//4.4、获取keyString key = bucket.getKeyAsString();brandList.add(key);}return brandList;}private PageResult handleResponse(SearchResponse response) {//4、解析响应SearchHits searchHits = response.getHits();//4.1获取总条数long total = searchHits.getTotalHits().value;//4.2文档数组SearchHit[] hits = searchHits.getHits();//4.3遍历List<HotelDoc> hotels = new ArrayList<>();for (SearchHit hit : hits) {//获取文档sourceString json = hit.getSourceAsString();//反序列化HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);//获取距离Object[] sortValues = hit.getSortValues();if(sortValues.length > 0){Object sortValue = sortValues[0];hotelDoc.setDistance(sortValue);}hotels.add(hotelDoc);}//4、封装返回return new PageResult(total,hotels);}}

此时,只要使用条件,就会影响到其他的条件。比如选择某个地点,或输入某个品牌,就会减少一些条件。例如:选择100-300元,就不会出现五星级

自动补全

安装拼音分词器

pyhttps://pan.baidu.com/s/1gPkxVU6dJL7mRVaPYwNmYQ?pwd=hano

把该文件夹放到/var/lib/docker/volumes/es-plugins/_data目录下 

自定义分词器

elasticsearch中分词器(analyzer)的组成包含三部分

1、character filters:在tokenizer之前对文本进行处理。例如删除字符、替换字符
2、tokenizer:将文本按照一定的规则切割成词条(term)。例如keyword,就是不分词;还有ik_smart
3、tokenizer filter:将tokenizer输出的词条做进一步处理。例如大小写转换、同义词处理、拼音处理等 

我们可以在创建索引库时,通过settings来配置自定义的analyzer(分词器) 

PUT /test
{"settings": {"analysis": {"analyzer": {"my_analyzer": {"tokenizer": "ik_max_word","filter": "py"}},"filter": {"py": {"type": "pinyin","keep_full_pinyin": false,"keep_joined_full_pinyin": true,"keep_original": true,"limit_first_letter_length": 16,"remove_duplicated_term": true,"none_chinese_pinyin_tokenize": false}}}},"mappings": {"properties": {"name": {"type": "text","analyzer": "my_analyzer",}}}
}

创建成功 

可以看到,汉字、拼音、拼音首字母的类型都有 

拼音分词器适合在创建倒排索引的时候使用,但不能在搜索的时候使用。

这里创建两条倒排索引,并搜索第三条命令

POST /test/_doc/1
{"id": 1,"name": "狮子"
}
POST /test/_doc/2
{"id": 2,"name": "虱子"
}GET /test/_search
{"query": {"match": {"name": "掉入狮子笼咋办"}}
}

此时会发现两个shizi都搜索到了 

因此字段在创建倒排索引时应该用my_analyzer分词器;字段在搜索时应该使用ik_smart分词器 

PUT /test
{"settings": {"analysis": {"analyzer": {"my_analyzer": {"tokenizer": "ik_max_word","filter": "py"}},"filter": {"py": {"type": "pinyin","keep_full_pinyin": false,"keep_joined_full_pinyin": true,"keep_original": true,"limit_first_letter_length": 16,"remove_duplicated_term": true,"none_chinese_pinyin_tokenize": false}}}},"mappings": {"properties": {"name": {"type": "text","analyzer": "my_analyzer","search_analyzer": "ik_smart"}}}
}

再次创建这两个倒排索引,查询第三条命令

POST /test/_doc/1
{"id": 1,"name": "狮子"
}
POST /test/_doc/2
{"id": 2,"name": "虱子"
}GET /test/_search
{"query": {"match": {"name": "掉入狮子笼咋办"}}
}

此时发现,同音字问题解决了 

completion suggester查询

elasticsearch提供了Completion Suggester查询来实现自动补全功能。这个查询会匹配以用户输入内容开头的词条并返回。为了提高补全查询的效率,对于文档中字段的类型有一些约束:

1、参与补全查询的字段必须是completion类型。
2、字段的内容一般是用来补全的多个词条形成的数组。

创建索引库

PUT test2
{"mappings": {"properties": {"title": {"type": "completion"}}}
}

插入三条数据

POST test2/_doc
{"title": ["Sony","WH-1000XM3"]
}
POST test2/_doc
{"title": ["SK-II","PITERA"]
}
POST test2/_doc
{"title": ["Nintendo","switch"]
}

自动补全查询

GET /test2/_search
{"suggest": {"titleSuggest": {"text": "s","completion": {"field": "title","skip_duplicates": true,"size": 10}}}
}

可以看到,所以我只搜了s,但是S开头的数据都查询到了 

修改索引库数据结构

创建索引库

PUT /hotel
{"settings": {"analysis": {"analyzer": {"text_anlyzer": {"tokenizer": "ik_max_word","filter": "py"},"completion_analyzer": {"tokenizer": "keyword","filter": "py"}},"filter": {"py": {"type": "pinyin","keep_full_pinyin": false,"keep_joined_full_pinyin": true,"keep_original": true,"limit_first_letter_length": 16,"remove_duplicated_term": true,"none_chinese_pinyin_tokenize": false}}}},"mappings": {"properties": {"id":{"type": "keyword"},"name":{"type": "text","analyzer": "text_anlyzer","search_analyzer": "ik_smart","copy_to": "all"},"address":{"type": "keyword","index": false},"price":{"type": "integer"},"score":{"type": "integer"},"brand":{"type": "keyword","copy_to": "all"},"city":{"type": "keyword"},"starName":{"type": "keyword"},"business":{"type": "keyword","copy_to": "all"},"location":{"type": "geo_point"},"pic":{"type": "keyword","index": false},"all":{"type": "text","analyzer": "text_anlyzer","search_analyzer": "ik_smart"},"suggestion":{"type": "completion","analyzer": "completion_analyzer"}}}
}

HotelDoc.java

package cn.itcast.hotel.pojo;import lombok.Data;
import lombok.NoArgsConstructor;import java.util.*;@Data
@NoArgsConstructor
public class HotelDoc {private Long id;private String name;private String address;private Integer price;private Integer score;private String brand;private String city;private String starName;private String business;private String location;private String pic;private Object distance;private Boolean isAD;private List<String> suggestion;public HotelDoc(Hotel hotel) {this.id = hotel.getId();this.name = hotel.getName();this.address = hotel.getAddress();this.price = hotel.getPrice();this.score = hotel.getScore();this.brand = hotel.getBrand();this.city = hotel.getCity();this.starName = hotel.getStarName();this.business = hotel.getBusiness();this.location = hotel.getLatitude() + ", " + hotel.getLongitude();this.pic = hotel.getPic();if(this.business.contains("、")){// business有多个值,需要切割String[] arr = this.business.split("、");//添加元素this.suggestion = new ArrayList<>();this.suggestion.add(this.brand);Collections.addAll(this.suggestion,arr);}else {this.suggestion = Arrays.asList(this.brand, this.business);}}
}

运行HotelDocmentTest.java的testBulkRequest()

@Test
void testBulkRequest() throws IOException {//批量查询酒店数据List<Hotel> hotels = hotelService.list();//1、创建RequestBulkRequest request = new BulkRequest();//2、准备参数,添加多个新增的Requestfor(Hotel hotel:hotels){//转换为文档类型HotelDocHotelDoc hotelDoc = new HotelDoc(hotel);//创建新增文档的Request对象request.add(new IndexRequest("hotel").id(hotelDoc.getId().toString()).source(JSON.toJSONString(hotelDoc),XContentType.JSON));}//3、发送请求client.bulk(request,RequestOptions.DEFAULT);
}

测试自动补全

GET /hotel/_search
{"suggest": {"suggestions": {"text": "sd","completion": {"field": "suggestion","skip_duplicates": true,"size": 10}}}
}

可以看到自动补全成功。只输入sd能把所有sd开头的text和s开头加d开头的text的数据查询出来

RestAPI实现自动补全查询

@Test
void testSuggest() throws IOException {//1、准备RequestSearchRequest request = new SearchRequest("hotel");//2、准备DSLrequest.source().suggest(new SuggestBuilder().addSuggestion("suggestions",SuggestBuilders.completionSuggestion("suggestion").prefix("h").skipDuplicates(true).size(10)));//3、发送请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);//4、解析结果Suggest suggest = response.getSuggest();//4.1、根据补全查询名称,获取补全结果CompletionSuggestion suggestions = suggest.getSuggestion("suggestions");//4.2、获取optionsList<CompletionSuggestion.Entry.Option> options = suggestions.getOptions();//4.3、遍历for(CompletionSuggestion.Entry.Option option : options) {String text = option.getText().toString();System.out.println(text);}
}
package cn.itcast.hotel;import cn.itcast.hotel.pojo.HotelDoc;
import com.alibaba.fastjson.JSON;
import org.apache.http.HttpHost;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.search.suggest.SuggestBuilders;
import org.elasticsearch.search.suggest.completion.CompletionSuggester;
import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.util.CollectionUtils;import javax.naming.directory.SearchResult;
import java.io.IOException;
import java.util.List;
import java.util.Map;import static cn.itcast.hotel.constants.HotelConstants.MAPPING_TEMPLATE;public class HotelSearchTest {private RestHighLevelClient client;@Testvoid testMatchAll() throws IOException {//1、准备RequestSearchRequest request = new SearchRequest("hotel");//2、准备DSLrequest.source().query(QueryBuilders.matchAllQuery());//3、发送请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);//4、解析响应handleResponse(response);}@Testvoid testMatch() throws IOException {//1、准备RequestSearchRequest request = new SearchRequest("hotel");//2、准备DSLrequest.source().query(QueryBuilders.matchQuery("all","如家"));//3、发送请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);//4、解析响应handleResponse(response);}@Testvoid testBool() throws IOException {//1、准备RequestSearchRequest request = new SearchRequest("hotel");//2、准备DSL//2.1、准备booleanQueryBoolQueryBuilder boolQuery = QueryBuilders.boolQuery();//2.2、添加termboolQuery.must(QueryBuilders.termQuery("city","上海"));//2.3、添加rangeboolQuery.filter(QueryBuilders.rangeQuery("price").lte(250));request.source().query(boolQuery);//3、发送请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);//4、解析响应handleResponse(response);}@Testvoid testRageAndSort() throws IOException {//页码,每页大小int page = 2, size = 5;//1、准备RequestSearchRequest request = new SearchRequest("hotel");//2、准备DSL//2.1、准备queryrequest.source().query(QueryBuilders.matchAllQuery());//2.2、排序sortrequest.source().sort("price", SortOrder.ASC);//2.3、分页from,sizerequest.source().from((page - 1) * size).size(5);//3、发送请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);//4、解析响应handleResponse(response);}@Testvoid testHighlight() throws IOException {//1、准备RequestSearchRequest request = new SearchRequest("hotel");//2、准备DSL//2.1、queryrequest.source().query(QueryBuilders.matchQuery("all","如家"));//2.2、高亮request.source().highlighter(new HighlightBuilder().field("name").requireFieldMatch(false));//3、发送请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);//4、解析响应handleResponse(response);}@Testvoid testAggregation() throws IOException {//1、准备RequestSearchRequest request = new SearchRequest("hotel");//2、准备DSL//2.1、设置sizerequest.source().size(0);//2.2、聚合request.source().aggregation(AggregationBuilders.terms("brandAgg").field("brand").size(10));//3、发送请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);//4、解析响应Aggregations aggregations = response.getAggregations();//4.1、根据聚合名称获聚合结果Terms brandTerms = aggregations.get("brandAgg");//4.2、获取bucketsList<? extends Terms.Bucket> buckets = brandTerms.getBuckets();//4.3、遍历for(Terms.Bucket bucket : buckets){//4.4、获取keyString key = bucket.getKeyAsString();System.out.println(key);}}@Testvoid testSuggest() throws IOException {//1、准备RequestSearchRequest request = new SearchRequest("hotel");//2、准备DSLrequest.source().suggest(new SuggestBuilder().addSuggestion("suggestions",SuggestBuilders.completionSuggestion("suggestion").prefix("h").skipDuplicates(true).size(10)));//3、发送请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);//4、解析结果Suggest suggest = response.getSuggest();//4.1、根据补全查询名称,获取补全结果CompletionSuggestion suggestions = suggest.getSuggestion("suggestions");//4.2、获取optionsList<CompletionSuggestion.Entry.Option> options = suggestions.getOptions();//4.3、遍历for(CompletionSuggestion.Entry.Option option : options) {String text = option.getText().toString();System.out.println(text);}}private void handleResponse(SearchResponse response) {//4、解析响应SearchHits searchHits = response.getHits();//4.1获取总条数long total = searchHits.getTotalHits().value;System.out.println("共搜索到" + total + "条数据");//4.2文档数组SearchHit[] hits = searchHits.getHits();//4.3遍历for (SearchHit hit : hits) {//获取文档sourceString json = hit.getSourceAsString();//反序列化HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);//获取高亮结果Map<String, HighlightField> highlightFields = hit.getHighlightFields();//判断是否有高亮结果if(!CollectionUtils.isEmpty(highlightFields)){//根据字段名获取高亮结果HighlightField highlightField = highlightFields.get("name");//判断是否有字段名高亮if(highlightField != null){//获取高亮值String name = highlightField.getFragments()[0].string();//覆盖非高亮结果hotelDoc.setName(name);}}System.out.println("hotelDoc = " + hotelDoc);}System.out.println(response);}@BeforeEachvoid setUp() {this.client = new RestHighLevelClient(RestClient.builder(HttpHost.create("http://192.168.80.130:9200")));}@AfterEachvoid tearDown() throws IOException {this.client.close();}
}

可以看到,只查询h,就能查到h开头的数据 

实现搜索框自动补全

HotelController.java

    @GetMapping("suggestion")
    public List<String> getSuggestions(@RequestParam("key")String prefix){
        return hotelService.getSuggestions(prefix);
    }

package cn.itcast.hotel.web;import cn.itcast.hotel.pojo.PageResult;
import cn.itcast.hotel.pojo.RequestParams;
import cn.itcast.hotel.service.IHotelService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;import java.util.List;
import java.util.Map;@RestController
@RequestMapping("/hotel")
public class HotelController {@Autowiredprivate IHotelService hotelService;@PostMapping("/list")public PageResult search(@RequestBody RequestParams params){return hotelService.search(params);}@PostMapping("filters")public Map<String, List<String>> getFilters(@RequestBody RequestParams params){return hotelService.filters(params);}@GetMapping("suggestion")public List<String> getSuggestions(@RequestParam("key")String prefix){return hotelService.getSuggestions(prefix);}
}

IHotelService.java

 List<String> getSuggestions(String prefix);

package cn.itcast.hotel.service;import cn.itcast.hotel.pojo.Hotel;
import cn.itcast.hotel.pojo.PageResult;
import cn.itcast.hotel.pojo.RequestParams;
import com.baomidou.mybatisplus.extension.service.IService;import java.util.List;
import java.util.Map;public interface IHotelService extends IService<Hotel> {PageResult search(RequestParams params);Map<String, List<String>> filters(RequestParams params);List<String> getSuggestions(String prefix);}

HotelService.java

@Override
public List<String> getSuggestions(String prefix) {try {//1、准备RequestSearchRequest request = new SearchRequest("hotel");//2、准备DSLrequest.source().suggest(new SuggestBuilder().addSuggestion("suggestions",SuggestBuilders.completionSuggestion("suggestion").prefix(prefix).skipDuplicates(true).size(10)));//3、发送请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);//4、解析结果Suggest suggest = response.getSuggest();//4.1、根据补全查询名称,获取补全结果CompletionSuggestion suggestions = suggest.getSuggestion("suggestions");//4.2、获取optionsList<CompletionSuggestion.Entry.Option> options = suggestions.getOptions();//4.3、遍历List<String> list = new ArrayList<>(options.size());for(CompletionSuggestion.Entry.Option option : options) {String text = option.getText().toString();list.add(text);}return list;} catch (IOException e) {throw new RuntimeException(e);}
}
package cn.itcast.hotel.service.impl;import cn.itcast.hotel.mapper.HotelMapper;
import cn.itcast.hotel.pojo.Hotel;
import cn.itcast.hotel.pojo.HotelDoc;
import cn.itcast.hotel.pojo.PageResult;
import cn.itcast.hotel.pojo.RequestParams;
import cn.itcast.hotel.service.IHotelService;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.apache.lucene.queries.function.FunctionScoreQuery;
import org.apache.lucene.search.vectorhighlight.ScoreOrderFragmentsBuilder;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.unit.DistanceUnit;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.functionscore.FunctionScoreQueryBuilder;
import org.elasticsearch.index.query.functionscore.ScoreFunctionBuilder;
import org.elasticsearch.index.query.functionscore.ScoreFunctionBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.search.suggest.SuggestBuilders;
import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;@Service
public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {@Autowiredprivate RestHighLevelClient client;@Overridepublic PageResult search(RequestParams params) {try {//1、准备RequestSearchRequest request = new SearchRequest("hotel");//2、准备DSL//2.1、querybuildBasicQuery(params,request);//2.2、分页int page = params.getPage();int size = params.getSize();request.source().from((page - 1) * size).size(size);//2.3、排序String location = params.getLocation();if(location != null && !"".equals(location)){request.source().sort(SortBuilders.geoDistanceSort("location",new GeoPoint(location)).order(SortOrder.ASC).unit(DistanceUnit.KILOMETERS));}//3、发送请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);//4、解析响应return handleResponse(response);} catch (IOException e) {throw new RuntimeException(e);}}@Overridepublic Map<String, List<String>> filters(RequestParams params) {try {//1、准备RequestSearchRequest request = new SearchRequest("hotel");//2、准备DSL//2.1、querybuildBasicQuery(params, request);//2.2、设置sizerequest.source().size(0);//2.3、聚合buildAggregation(request);//3、发送请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);//4、解析响应Map<String, List<String>> result = new HashMap<>();Aggregations aggregations = response.getAggregations();//4.1、根据品牌名称,获取品牌结果List<String> brandList = getAggByName(aggregations,"brandAgg");result.put("brand",brandList);//4.1、根据城市名称,获取品牌结果List<String> cityList = getAggByName(aggregations,"cityAgg");result.put("city",cityList);//4.1、根据星级名称,获取品牌结果List<String> starList = getAggByName(aggregations,"starAgg");result.put("starName",starList);return result;} catch (IOException e) {throw new RuntimeException(e);}}@Overridepublic List<String> getSuggestions(String prefix) {try {//1、准备RequestSearchRequest request = new SearchRequest("hotel");//2、准备DSLrequest.source().suggest(new SuggestBuilder().addSuggestion("suggestions",SuggestBuilders.completionSuggestion("suggestion").prefix(prefix).skipDuplicates(true).size(10)));//3、发送请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);//4、解析结果Suggest suggest = response.getSuggest();//4.1、根据补全查询名称,获取补全结果CompletionSuggestion suggestions = suggest.getSuggestion("suggestions");//4.2、获取optionsList<CompletionSuggestion.Entry.Option> options = suggestions.getOptions();//4.3、遍历List<String> list = new ArrayList<>(options.size());for(CompletionSuggestion.Entry.Option option : options) {String text = option.getText().toString();list.add(text);}return list;} catch (IOException e) {throw new RuntimeException(e);}}private List<String> getAggByName(Aggregations aggregations, String aggName) {//4.1、根据聚合名称获聚合结果Terms brandTerms = aggregations.get(aggName);//4.2、获取bucketsList<? extends Terms.Bucket> buckets = brandTerms.getBuckets();//4.3、遍历List<String> brandList = new ArrayList<>();for(Terms.Bucket bucket : buckets){//4.4、获取keyString key = bucket.getKeyAsString();brandList.add(key);}return brandList;}private void buildAggregation(SearchRequest request) {request.source().aggregation(AggregationBuilders.terms("brandAgg").field("brand").size(100));request.source().aggregation(AggregationBuilders.terms("cityAgg").field("city").size(100));request.source().aggregation(AggregationBuilders.terms("starAgg").field("starName").size(100));}private void buildBasicQuery(RequestParams params, SearchRequest request) {//构建BooleanQueryBoolQueryBuilder boolQuery = QueryBuilders.boolQuery();//关键字搜索String key = params.getKey();if(key == null || "".equals(key)){boolQuery.must(QueryBuilders.matchAllQuery());}else{boolQuery.must(QueryBuilders.matchQuery("all",key));}//城市条件if(params.getCity() != null && !"".equals(params.getCity())){boolQuery.filter(QueryBuilders.termQuery("city", params.getCity()));}//品牌条件if(params.getBrand() != null && !"".equals(params.getBrand())){boolQuery.filter(QueryBuilders.termQuery("brand", params.getBrand()));}//星级条件if(params.getStarName() != null && !"".equals(params.getStarName())){boolQuery.filter(QueryBuilders.termQuery("starName", params.getStarName()));}//价格if(params.getMinPrice() != null && params.getMaxPrice() != null){boolQuery.filter(QueryBuilders.rangeQuery("price").gte(params.getMinPrice()).lte(params.getMaxPrice()));}request.source().query(functionScoreQueryBuilder);}private PageResult handleResponse(SearchResponse response) {//4、解析响应SearchHits searchHits = response.getHits();//4.1获取总条数long total = searchHits.getTotalHits().value;//4.2文档数组SearchHit[] hits = searchHits.getHits();//4.3遍历List<HotelDoc> hotels = new ArrayList<>();for (SearchHit hit : hits) {//获取文档sourceString json = hit.getSourceAsString();//反序列化HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);//获取距离Object[] sortValues = hit.getSortValues();if(sortValues.length > 0){Object sortValue = sortValues[0];hotelDoc.setDistance(sortValue);}hotels.add(hotelDoc);}//4、封装返回return new PageResult(total,hotels);}}

在搜索框输入x,可以看到出现自动补全

数据同步

数据同步问题分析

elasticsearch中的酒店数据来自于mysql数据库,因此mysql数据发生改变时,elasticsearch也必须跟着改变,这个就是elasticsearch与mysql之间的数据同步。

方案一:同步调用

方案二:异步调用

方案三:监听binlog

导入Hotel-admin

Hotel-adminhttps://pan.baidu.com/s/1lS-PUGaqUWdHEeGHkNrR0w?pwd=5vxx

声明队列和交换机 

在hotel-demo的pom文件

<!--amqp-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.10.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>cn.itcast.hotel</groupId><artifactId>hotel-admin</artifactId><version>0.0.1-SNAPSHOT</version><name>hotel-admin</name><description>Demo project for Spring Boot</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.4.2</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><scope>runtime</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope><exclusions><exclusion><groupId>org.junit.vintage</groupId><artifactId>junit-vintage-engine</artifactId></exclusion></exclusions></dependency><!--amqp--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build></project>

在hotel-demo的application.yaml文件

rabbitmq:host: xxx.xxx.xxx.xxxport: 5672username: itcastpassword: 123321virtual-host: /
server:port: 8099
spring:datasource:url: jdbc:mysql://localhost:3306/heima?useSSL=falseusername: rootpassword: 1234driver-class-name: com.mysql.jdbc.Driverrabbitmq:host: xxx.xxx.xxx.xxxport: 5672username: itcastpassword: 123321virtual-host: /
logging:level:cn.itcast: debugpattern:dateformat: MM-dd HH:mm:ss:SSS
mybatis-plus:configuration:map-underscore-to-camel-case: truetype-aliases-package: cn.itcast.hotel.pojo

在hotel-demo的constants包创建MqConstants.java

package cn.itcast.hotel.constants;public class MqConstants {//交换机public final static String HOTEL_EXCHANGE = "hotel.topic";//监听新增和修改的队列public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";//监听删除的队列public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";//新增或修改的RoutingKeypublic final static String HOTEL_INSERT_KEY = "hotel.insert";//删除的RoutingKeypublic final static String HOTEL_DELETE_KEY = "hotel.delete";}

在hotel-demo创建config包创建MqConfig.java

package cn.itcast.hotel.config;import cn.itcast.hotel.constants.MqConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class MqConfig {@Beanpublic TopicExchange topicExchange(){return new TopicExchange(MqConstants.HOTEL_EXCHANGE,true,true);}@Beanpublic Queue insertQueue(){return new Queue(MqConstants.HOTEL_INSERT_QUEUE,true);}@Beanpublic Queue deleteQueue(){return new Queue(MqConstants.HOTEL_DELETE_QUEUE,true);}@Beanpublic Binding insertQueueBinding(){return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_KEY);}@Beanpublic Binding deleteQueueBinding(){return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY);}
}

发送mq消息

在hotel-admin创建constants包,并把hotel-demo的MqConstants.java复制过来

package cn.itcast.hotel.constants;public class MqConstants {//交换机public final static String HOTEL_EXCHANGE = "hotel.topic";//监听新增和修改的队列public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";//监听删除的队列public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";//新增或修改的RoutingKeypublic final static String HOTEL_INSERT_KEY = "hotel.insert";//删除的RoutingKeypublic final static String HOTEL_DELETE_KEY = "hotel.delete";}

在hotel-admin的pom文件

<!--amqp-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.10.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>cn.itcast.hotel</groupId><artifactId>hotel-admin</artifactId><version>0.0.1-SNAPSHOT</version><name>hotel-admin</name><description>Demo project for Spring Boot</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.4.2</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><scope>runtime</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope><exclusions><exclusion><groupId>org.junit.vintage</groupId><artifactId>junit-vintage-engine</artifactId></exclusion></exclusions></dependency><!--amqp--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build></project>

在hotel-admin的application.yaml文件

rabbitmq:host: xxx.xxx.xxx.xxxport: 5672username: itcastpassword: 123321virtual-host: /
server:port: 8099
spring:datasource:url: jdbc:mysql://localhost:3306/heima?useSSL=falseusername: rootpassword: 1234driver-class-name: com.mysql.jdbc.Driverrabbitmq:host: xxx.xxx.xxx.xxxport: 5672username: itcastpassword: 123321virtual-host: /
logging:level:cn.itcast: debugpattern:dateformat: MM-dd HH:mm:ss:SSS
mybatis-plus:configuration:map-underscore-to-camel-case: truetype-aliases-package: cn.itcast.hotel.pojo

在hotel-admin的HotelController.java

@PostMapping
public void saveHotel(@RequestBody Hotel hotel){hotelService.save(hotel);rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId());
}@PutMapping()
public void updateById(@RequestBody Hotel hotel){if (hotel.getId() == null) {throw new InvalidParameterException("id不能为空");}hotelService.updateById(hotel);rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId());
}@DeleteMapping("/{id}")
public void deleteById(@PathVariable("id") Long id) {hotelService.removeById(id);rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_DELETE_KEY,id);
}
package cn.itcast.hotel.web;import cn.itcast.hotel.constants.MqConstants;
import cn.itcast.hotel.pojo.Hotel;
import cn.itcast.hotel.pojo.PageResult;
import cn.itcast.hotel.service.IHotelService;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;import java.security.InvalidParameterException;@RestController
@RequestMapping("hotel")
public class HotelController {@Autowiredprivate IHotelService hotelService;@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/{id}")public Hotel queryById(@PathVariable("id") Long id){return hotelService.getById(id);}@GetMapping("/list")public PageResult hotelList(@RequestParam(value = "page", defaultValue = "1") Integer page,@RequestParam(value = "size", defaultValue = "1") Integer size){Page<Hotel> result = hotelService.page(new Page<>(page, size));return new PageResult(result.getTotal(), result.getRecords());}@PostMappingpublic void saveHotel(@RequestBody Hotel hotel){hotelService.save(hotel);rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId());}@PutMapping()public void updateById(@RequestBody Hotel hotel){if (hotel.getId() == null) {throw new InvalidParameterException("id不能为空");}hotelService.updateById(hotel);rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId());}@DeleteMapping("/{id}")public void deleteById(@PathVariable("id") Long id) {hotelService.removeById(id);rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_DELETE_KEY,id);}
}

监听MQ消息

在hotel-demo创建mq包创建HotelListener.java

//监听酒店新增和修改的业务
@RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
public void listenHotelInsertOrUpdate(Long id){hotelService.insertById(id);
}//监听酒店删除的业务
@RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
public void listenHotelDelete(Long id){hotelService.deleteById(id);
}
package cn.itcast.hotel.mq;import cn.itcast.hotel.constants.MqConstants;
import cn.itcast.hotel.service.IHotelService;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class HotelListener {@Autowiredprivate IHotelService hotelService;//监听酒店新增和修改的业务@RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)public void listenHotelInsertOrUpdate(Long id){hotelService.insertById(id);}//监听酒店删除的业务@RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)public void listenHotelDelete(Long id){hotelService.deleteById(id);}
}

hotel-demo的IHotelService.java

void deleteById(Long id);
void insertById(Long id);
package cn.itcast.hotel.service;import cn.itcast.hotel.pojo.Hotel;
import cn.itcast.hotel.pojo.PageResult;
import cn.itcast.hotel.pojo.RequestParams;
import com.baomidou.mybatisplus.extension.service.IService;import java.util.List;
import java.util.Map;public interface IHotelService extends IService<Hotel> {PageResult search(RequestParams params);Map<String, List<String>> filters(RequestParams params);List<String> getSuggestions(String prefix);void deleteById(Long id);void insertById(Long id);
}

hotel-demo的HotelService.java

@Override
public void deleteById(Long id) {try {//1、准备RequestDeleteRequest request = new DeleteRequest("hotel", id.toString());//2、发送请求client.delete(request,RequestOptions.DEFAULT);} catch (IOException e) {throw new RuntimeException(e);}
}@Override
public void insertById(Long id) {try {//0、根据id查询酒店数据Hotel hotel = getById(id);//转换成文档类型HotelDoc hotelDoc = new HotelDoc(hotel);//1、准备Request对象IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());//2、准备json文档request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);//3、发送请求client.index(request,RequestOptions.DEFAULT);} catch (IOException e) {throw new RuntimeException(e);}
}
package cn.itcast.hotel.service.impl;import cn.itcast.hotel.mapper.HotelMapper;
import cn.itcast.hotel.pojo.Hotel;
import cn.itcast.hotel.pojo.HotelDoc;
import cn.itcast.hotel.pojo.PageResult;
import cn.itcast.hotel.pojo.RequestParams;
import cn.itcast.hotel.service.IHotelService;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.apache.lucene.queries.function.FunctionScoreQuery;
import org.apache.lucene.search.vectorhighlight.ScoreOrderFragmentsBuilder;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.unit.DistanceUnit;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.functionscore.FunctionScoreQueryBuilder;
import org.elasticsearch.index.query.functionscore.ScoreFunctionBuilder;
import org.elasticsearch.index.query.functionscore.ScoreFunctionBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.search.suggest.SuggestBuilders;
import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;@Service
public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {@Autowiredprivate RestHighLevelClient client;@Overridepublic PageResult search(RequestParams params) {try {//1、准备RequestSearchRequest request = new SearchRequest("hotel");//2、准备DSL//2.1、querybuildBasicQuery(params,request);//2.2、分页int page = params.getPage();int size = params.getSize();request.source().from((page - 1) * size).size(size);//2.3、排序String location = params.getLocation();if(location != null && !"".equals(location)){request.source().sort(SortBuilders.geoDistanceSort("location",new GeoPoint(location)).order(SortOrder.ASC).unit(DistanceUnit.KILOMETERS));}//3、发送请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);//4、解析响应return handleResponse(response);} catch (IOException e) {throw new RuntimeException(e);}}@Overridepublic Map<String, List<String>> filters(RequestParams params) {try {//1、准备RequestSearchRequest request = new SearchRequest("hotel");//2、准备DSL//2.1、querybuildBasicQuery(params, request);//2.2、设置sizerequest.source().size(0);//2.3、聚合buildAggregation(request);//3、发送请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);//4、解析响应Map<String, List<String>> result = new HashMap<>();Aggregations aggregations = response.getAggregations();//4.1、根据品牌名称,获取品牌结果List<String> brandList = getAggByName(aggregations,"brandAgg");result.put("brand",brandList);//4.1、根据城市名称,获取品牌结果List<String> cityList = getAggByName(aggregations,"cityAgg");result.put("city",cityList);//4.1、根据星级名称,获取品牌结果List<String> starList = getAggByName(aggregations,"starAgg");result.put("starName",starList);return result;} catch (IOException e) {throw new RuntimeException(e);}}@Overridepublic List<String> getSuggestions(String prefix) {try {//1、准备RequestSearchRequest request = new SearchRequest("hotel");//2、准备DSLrequest.source().suggest(new SuggestBuilder().addSuggestion("suggestions",SuggestBuilders.completionSuggestion("suggestion").prefix(prefix).skipDuplicates(true).size(10)));//3、发送请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);//4、解析结果Suggest suggest = response.getSuggest();//4.1、根据补全查询名称,获取补全结果CompletionSuggestion suggestions = suggest.getSuggestion("suggestions");//4.2、获取optionsList<CompletionSuggestion.Entry.Option> options = suggestions.getOptions();//4.3、遍历List<String> list = new ArrayList<>(options.size());for(CompletionSuggestion.Entry.Option option : options) {String text = option.getText().toString();list.add(text);}return list;} catch (IOException e) {throw new RuntimeException(e);}}@Overridepublic void deleteById(Long id) {try {//1、准备RequestDeleteRequest request = new DeleteRequest("hotel", id.toString());//2、发送请求client.delete(request,RequestOptions.DEFAULT);} catch (IOException e) {throw new RuntimeException(e);}}@Overridepublic void insertById(Long id) {try {//0、根据id查询酒店数据Hotel hotel = getById(id);//转换成文档类型HotelDoc hotelDoc = new HotelDoc(hotel);//1、准备Request对象IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());//2、准备json文档request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);//3、发送请求client.index(request,RequestOptions.DEFAULT);} catch (IOException e) {throw new RuntimeException(e);}}private List<String> getAggByName(Aggregations aggregations, String aggName) {//4.1、根据聚合名称获聚合结果Terms brandTerms = aggregations.get(aggName);//4.2、获取bucketsList<? extends Terms.Bucket> buckets = brandTerms.getBuckets();//4.3、遍历List<String> brandList = new ArrayList<>();for(Terms.Bucket bucket : buckets){//4.4、获取keyString key = bucket.getKeyAsString();brandList.add(key);}return brandList;}private void buildAggregation(SearchRequest request) {request.source().aggregation(AggregationBuilders.terms("brandAgg").field("brand").size(100));request.source().aggregation(AggregationBuilders.terms("cityAgg").field("city").size(100));request.source().aggregation(AggregationBuilders.terms("starAgg").field("starName").size(100));}private void buildBasicQuery(RequestParams params, SearchRequest request) {//构建BooleanQueryBoolQueryBuilder boolQuery = QueryBuilders.boolQuery();//关键字搜索String key = params.getKey();if(key == null || "".equals(key)){boolQuery.must(QueryBuilders.matchAllQuery());}else{boolQuery.must(QueryBuilders.matchQuery("all",key));}//城市条件if(params.getCity() != null && !"".equals(params.getCity())){boolQuery.filter(QueryBuilders.termQuery("city", params.getCity()));}//品牌条件if(params.getBrand() != null && !"".equals(params.getBrand())){boolQuery.filter(QueryBuilders.termQuery("brand", params.getBrand()));}//星级条件if(params.getStarName() != null && !"".equals(params.getStarName())){boolQuery.filter(QueryBuilders.termQuery("starName", params.getStarName()));}//价格if(params.getMinPrice() != null && params.getMaxPrice() != null){boolQuery.filter(QueryBuilders.rangeQuery("price").gte(params.getMinPrice()).lte(params.getMaxPrice()));}//算分控制FunctionScoreQueryBuilder functionScoreQueryBuilder =//构建ScoreQueryBuilders.functionScoreQuery(//原始查询,相关性算分boolQuery,//function score的数组new FunctionScoreQueryBuilder.FilterFunctionBuilder[]{//其中一个function score 元素new FunctionScoreQueryBuilder.FilterFunctionBuilder(//过滤条件QueryBuilders.termQuery("isAD",true),//算分函数ScoreFunctionBuilders.weightFactorFunction(10))});request.source().query(functionScoreQueryBuilder);}private PageResult handleResponse(SearchResponse response) {//4、解析响应SearchHits searchHits = response.getHits();//4.1获取总条数long total = searchHits.getTotalHits().value;//4.2文档数组SearchHit[] hits = searchHits.getHits();//4.3遍历List<HotelDoc> hotels = new ArrayList<>();for (SearchHit hit : hits) {//获取文档sourceString json = hit.getSourceAsString();//反序列化HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);//获取距离Object[] sortValues = hit.getSortValues();if(sortValues.length > 0){Object sortValue = sortValues[0];hotelDoc.setDistance(sortValue);}hotels.add(hotelDoc);}//4、封装返回return new PageResult(total,hotels);}}

测试同步功能

运行hotel-demo和hotel-admin

在RabbitMQ中可以看到两个新的队列

交换机也有一个新的

在交换机里可以看到两个队列

在8099端口中,删除一个数据

在RabbitMQ可以看到一条消息 

并且在8089端口该数据确实找不到了

es集群

集群结果介绍

单机的elasticsearch做数据存储,必然面临两个问题:海量数据存储问题、单点故障问题。

1、海量数据存储问题:将索引库从逻辑上拆分为N个分片(shard),存储到多个节点

2、单点故障问题:将分片数据在不同节点备份(replica)

搭建集群

把该文件放到/root里

decker-composehttps://pan.baidu.com/s/1TJZz4GSoy6CA5DbC3IeL-w?pwd=a87u

运行指令

vi /etc/sysctl.conf 

在最后一行后面加上一条

vm.max_map_count=262144 

运行指令,若能看到刚刚输入的指令,则说明成功

sysctl -p 

运行指令,可以运行三个新的容器。注意一定要先关闭以前运行的es,否则会报错

docker-compose up -d 

解压该文件 

cerebrohttps://pan.baidu.com/s/1GlX4vsf1RQj1ocYufPDx5A?pwd=p0g9

运行lesson的cerebro-0.9.4的bin的cerebro.bat文件。若闪退,则编辑该文件。加上这两行。

注意D:\jdk\jdk8是你的jdk文件位置。闪退的原因是你的jdk等级过高,请下载jdk8并把这个位置写上你的jdk8的文件位置

访问localhost:9000。这里输入虚拟机ip加端口9200

可以看到集群中的节点信息。实心五角星是主节点,其他是候选节点

点击此处,创建索引库

name是索引库名字,number of shards是几个片,number of replicas是几个备份。最后点create创建

回到主页,可以看到每个片都有两个,因为每个片都有一个,而且我设置了一个备份,所以每个片有两个。实线框是主分片,虚线框是副本分片。

可以注意到,每个号码的主分片和副本分片一定不在同一个机器上。确保有机器宕机,不会造成数据故障。

集群职责及脑裂

slasticsearch中集群节点有不同的职责划分,因此建议集群部署时,每个节点都有独立的角色

节点类型配置参数默认值节点职责
master eligiblenode.mastertrue备选主节点:主节点可以管理和记录集群状态、决定分片在哪个节点、处理创建和删除索引库的请求
 
datanode.datatrue数据节点:存储数据、搜索、聚合、CRUD
ingestnode.ingesttrue数据存储之前的预处理
coordinating上面3个参数都为false
则为coordinating节点
路由请求到其它节点
合并其它节点处理的结果,返回给用户

默认情况下,每个节点都是master eligible节点,因此一旦master节点宕机,其它候选节点会选举一个成为主节点。当主节点与其他节点网络故障时,可能发生脑裂问题。

为了避免脑裂,需要要求选票超过( eligible节点数量+1 )/2才能当选为主,因此eligible节点数量最好是奇数。对应配置项是discovery.zen.minimum_master_nodes,在es7.0以后,已经成为默认配置,因此一般不会发生脑裂问题

ES集群的分布式存储

elasticsearch会通过hash算法来计算文档应该存储到哪个分片:

公式:

shard = hash(_routing)% number_of_shards

说明:
1、_routing默认是文档的id
2、算法与分片数量有关,因此索引库一旦创建,分片数量不能修改!

ES集群的分布式查询 

elasticsearch的查询分成两个阶段:
1、scatter phase:分散阶段,coordinating node会把请求分发到每一个分片
2、gather phase:聚集阶段,coordinating node汇总data node的搜索结果,并处理为最终结果集返回给用户

ES集群的故障转移

集群的master节点会监控集群中的节点状态,如果发现有节点宕机,会立即将宕机节点的分片数据迁移到其它节点,确保数据安全,这个叫做故障转移。 

当我停掉es01后,es01的分片分配给了其他的机器

当我重启es后,es01的分片又回来了

数据并未发生故障 

上一篇:DSL查询语法和RestClient查询文档​​​​​​​ 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/236034.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

华为HarmonyOS 创建第一个鸿蒙应用 运行Hello World

使用DevEco Studio创建第一个项目 Hello World 1.创建项目2.预览 Hello World3.安装模拟器4.运行 Hello World 1.创建项目 创建第一个项目&#xff0c;命名为HelloWorld&#xff0c;点击Finish 选择Empty Ability模板&#xff0c;点击Next Hello World 项目已经成功创建…

【VRTK】【VR开发】【Unity】19-VRTK实现旋转运动

课程配套学习项目源码资源下载 https://download.csdn.net/download/weixin_41697242/88485426?spm=1001.2014.3001.5503 【背景】 在实际开发中,旋转运动也是时常需要模拟的重要运动类型。常见的场景有开关门,方向盘轮胎以及拉动拉杆等等。 旋转运动的实现可以基于物理系…

使用 matlab 求解最小二乘问题

有约束线性最小二乘 其标准形式为&#xff1a; min ⁡ x 1 2 ∥ C x − d ∥ 2 2 \mathop {\min }\limits_x \quad \frac{1}{2}\left\| Cx-d \right\|_2^2 xmin​21​∥Cx−d∥22​ 约束条件为&#xff1a; A ⋅ x ≤ b A e q ⋅ x b e q l b ≤ x ≤ u b \begin{aligned} …

基于多反应堆的高并发服务器【C/C++/Reactor】(中)HttpResponse的定义和初始化 以及组织 HttpResponse 响应消息

一、HttpResponse的定义 1.定义状态码枚举 // 定义状态码枚举 enum HttpStatusCode {Unknown 0,OK 200,MovedPermanently 301,MovedTemporarily 302,BadRequest 400,NotFound 404 }; 2.HTTP 响应报文格式 这个数据块主要是分为四部分 第一部分是状态行第二部分是响应…

游戏、设计选什么内存条?光威龙武系列DDR5量大管饱

如果你是一位PC玩家或者创作者&#xff0c;日常工作娱乐中&#xff0c;确实少不了大容量高频内存的支持&#xff0c;这样可以获得更高的工作效率&#xff0c;光威龙武系列DDR5内存条无疑是理想之选。它可以为计算机提供强劲的性能表现和稳定的运行体验&#xff0c;让我们畅玩游…

无监督学习Principal Component Analysis(PCA)精简高维数据

目录 介绍 一、PCA之前 二、PCA之后 介绍 Principal Component Analysis (PCA) 是一种常用的数据降维和特征提取技术。PCA通过线性变换将高维数据映射到低维空间&#xff0c;从而得到数据的主要特征。PCA的目标是找到一个正交基的集合&#xff0c;使得将数据投影到这些基…

【C语言小游戏】贪吃蛇

文章目录 1.引言2.运行图2.涉及知识3 Windows API3.1 控制台3.2 控制台屏幕坐标3.3 操作句柄3.4 控制台屏幕光标3.5 监视按键 4. 设计说明5. 完整代码 1.引言 使⽤C语⾔在Windows环境的控制台中模拟实现经典⼩游戏贪吃蛇 实现基本的功能&#xff1a; 贪吃蛇地图绘制蛇吃⻝物的…

vue简体繁体互转无需做字库

第一种方法 vue-i18n 需要自己写字库库很麻烦,而且不支持后端传值 第二种 opencc 这个库前端去使用的时候 数据较多的情况非常慢.影响使用 第三种 language-hk-loader npm i language-hk-loader 从其他博客中看到的一种,很方便不需要写字库,但是在打包的时候去整体的去翻译…

网络层详解

目录 前言 一、IP协议 1、IP协议报头 2、协议字段理解 &#xff08;1&#xff09;4位版本 &#xff08;2&#xff09;4位首部长度 &#xff08;3&#xff09;8位服务类型 &#xff08;4&#xff09;16位总长度 &#xff08;5&#xff09;标识、标志与片偏移 &#xf…

2024-01-11 部署Stable Diffusion遇挫记

点击 <C 语言编程核心突破> 快速C语言入门 部署Stable Diffusion遇挫记 前言一、一如既往的GitHub部署二、使用的感受总结 create by Stable Diffusion; prompt: fire water llama 前言 要解决问题: 由于近期的努力, 已经实现语音转文字模型, 通用chat迷你大模型的本地…

后端程序员开发win小工具(未完待续)

github&#xff1a;https://gitee.com/forgot940629/win-tool-demo 本地启动&#xff0c;查看http://127.0.0.1:8080/form 场景 在日常工作中可能需要后端开发者开发一些辅助工具。这些辅助工具通常希望能想其他软件一样在桌面系统运行&#xff0c;并且有一些桌面应用的基本…

dbeaver连接人大金仓报错 can‘t load driver class ‘com.kingbase8.Driver;‘

dbeaver可以连接很多数据库&#xff0c;设置dbeaver连接人大金仓&#xff0c;下载安装完成后&#xff0c;需要自行配置人大金仓的驱动&#xff0c;否则无法连接数据库。 一、dbeaver 下载 dbeaver 下载地址&#xff1a;https://dbeaver.io/download/ 二、查找人大金仓驱动 首…

【开源项目】轻量元数据管理解决方案——Marquez

大家好&#xff0c;我是独孤风。 又到了本周的开源项目推荐。最近推荐的元数据管理项目很多&#xff0c;但是很多元数据管理平台的功能复杂难用。 那么有没有轻量一点的元数据管理项目呢&#xff1f; 今天为大家推荐的开源项目&#xff0c;就是一个轻量级的元数据管理工具。虽然…

Linux动态分配IP与正向解析DNS

目录 一、DHCP分配 1. 动态分配 1.1 服务端服务安装 1.2 修改服务端dhcp配置 1.3 修改客户端dhcp&#xff0c;重启查询网卡信息 2. 根据mac固定分配 2.1 修改服务器端dhcp服务配置 2.2 客户端自动获取&#xff0c;查看网卡信息 二、时间同步 1. 手动同步 2. 自动同…

使用numpy处理图片——图片拼接

大纲 左右拼接上下拼接 在《使用numpy处理图片——图片切割》一文中&#xff0c;我们介绍了如何使用numpy将一张图片切割成4部分。本文我们将反其道而行之&#xff0c;将4张图片拼接成1张图片。 基本的思路就是先用两张图以左右结构拼接成上部&#xff0c;另外两张图也以左右拼…

Win2008R2上RedisDesktopManager 黑屏

问题&#xff1a; 运行发现右侧显示缓存信息的部分是黑屏。 解决方式&#xff1a; 管理工具->远程桌面服务->远程桌面会话主机配置->RDP-TCP->属性->客户端设置->颜色深度->限制最大颜色深度,将16位改为32位

新能源时代-电动汽车充电桩设备建设及运维平台搭建

安科瑞 崔丽洁 摘要&#xff1a;在社会经济发展的新时期&#xff0c;我国城市化的水平也在随之不断的提高&#xff0c;使我国制造业迅速崛起&#xff0c;并加剧了该行业的竞争力&#xff0c;要想使企业在竞争中占据有力的位置&#xff0c;企业就要顺应时代发展的潮流&#xff0…

Thumbnail AI:让图片处理更智能

一、产品介绍 Thumbnail AI是一款基于人工智能技术的图片处理软件&#xff0c;能够快速、准确地生成各种尺寸的缩略图。这款软件非常适合用于网站建设、广告设计、电商等领域&#xff0c;能够大大提高图片处理效率。 二、应用场景 网站建设&#xff1a;在网站建设中&#xff…

深度学习”和“多层神经网络”的区别

在讨论深度学习与多层神经网络之间的差异时&#xff0c;我们必须首先理解它们各自是什么以及它们在计算机科学和人工智能领域的角色。 深度学习是一种机器学习的子集&#xff0c;它使用了人工神经网络的架构。深度学习的核心思想是模拟人脑神经元的工作方式&#xff0c;以建立…

SpringBoot 把PageHelper分页信息返回给前端

第1步&#xff1a;定义线程容器收纳HttpHeaders和HttpStatus import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus;public class ResponseUtils {private static ThreadLocal<HttpHeaders> ThreadLocalHeaders new InheritableT…