文章目录
- 数据聚合
- 一、聚合的种类
- 二、DSL实现聚合
- 1、Bucket(桶)聚合
- 2、Metrics(度量)聚合
- 三、RestAPI实现聚合
- 自动补全
- 一、拼音分词器
- 二、自定义分词器
- 三、自动补全查询
- 四、实现搜索款自动补全(例酒店信息)
- 数据同步
- 双写一致性
数据聚合
一、聚合的种类
官方文档 => 聚合 https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations.html
聚合:对文档信息的统计、分类、运算。类似mysql sum、avg、count
- 桶(Bucket)聚合:用来对文档做分组
- TermAggregation:按照文档字段值分组(相当于mysql group by)
- Date Histogram:按照日期阶梯分组,例如一周一组,一月一组
- 度量(metric)聚合:用来计算一些值,最大值、平均值、最小值等。
- Avg:平均值
- Max:最大值
- Min:最小值
- Stats:同时求max、min、avg、sum等
- 管道(pipeline)聚合:以其他聚合结果为基础继续做集合
二、DSL实现聚合
1、Bucket(桶)聚合
_count:默认是按照文档数量的降序排序
GET /hotel/_search
{"size": 0,"aggs": {"brandAgg": {"terms": {"field": "brand","size": 20,"order": {"_count": "asc"}}}}
}
上面使用的bucket聚合,会扫描索引库所有的文档进行聚合。可以限制扫描的范围:利用query条件即可。
GET /hotel/_search
{"query": {"range": {"price": {"lt": 200 # 只对价位低于200的聚合}}}, "size": 0,"aggs": {"brandAgg": {"terms": {"field": "brand","size": 20,"order": {"_count": "asc"}}}}
}
2、Metrics(度量)聚合
聚合的嵌套,先对外层进行聚合,在对内存进行聚合
注意嵌套查询:写在外层查询括号内,而非并立。
GET /hotel/_search
{"size": 0,"aggs": {"brandAgg": {"terms": {"field": "brand","size": 10,"order": {"scoreAgg.avg": "asc"}},"aggs": {"scoreAgg": {"stats": {"field": "score"}}}}}
}
三、RestAPI实现聚合
bucket trem聚合(group by),实现品牌、星级、城市聚合的方法
public Map<String, List<String>> filters(RequestParam requestParam) {String[] aggNames = new String[]{"brand","city","starName"};Map<String, List<String>> resultMap = new HashMap<>();SearchRequest searchRequest = new SearchRequest("hotel");// 限定聚合范围BoolQueryBuilder boolQueryBuilder = getBoolQueryBuilder(requestParam);searchRequest.source().query(boolQueryBuilder);// 聚合字段searchRequest.source().size(0);searchRequest.source().aggregation(AggregationBuilders.terms(aggNames[0]).field("brand").size(100));searchRequest.source().aggregation(AggregationBuilders.terms(aggNames[1]).field("city").size(100));searchRequest.source().aggregation(AggregationBuilders.terms(aggNames[2]).field("starName").size(100));try {SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);Aggregations aggregations = searchResponse.getAggregations();for (String aggName : aggNames) {Terms terms = aggregations.get(aggName);List<String> list = new ArrayList<>();for (Terms.Bucket bucket : terms.getBuckets()) {list.add(bucket.getKeyAsString());}resultMap.put(aggName,list);}return resultMap;} catch (IOException e) {e.printStackTrace();return null;}}
自动补全
一、拼音分词器
下载拼音分词器:https://github.com/medcl/elasticsearch-analysis-pinyin/releases/tag/v8.6.0
解压放在plugins目录下(docker挂载的目录),然后重启es
二、自定义分词器
拼音分词器的过滤规则,参照上面下载的链接。
创建一个自定义分词器(text index库),分词器名:my_analyzer
// 自定义拼音分词器 + mapping约束
PUT /test
{"settings": {"analysis": {"analyzer": {"my_analyzer": {"tokenizer": "ik_max_word","filter": "py"}},"filter": {"py": {"type": "pinyin","keep_full_pinyin": false,"keep_joined_full_pinyin": true,"keep_original": true,"limit_first_letter_length": 16,"remove_duplicated_term": true,"none_chinese_pinyin_tokenize": false}}}},"mappings": {"properties": {"name": {"type": "text","analyzer": "my_analyzer","search_analyzer": "ik_smart"}}}
}
三、自动补全查询
completion suggester查询:
- 字段类型必须是completion
- 字段值是多词条的数组才有意义
// 自动补全的索引库
PUT test2
{"mappings": {"properties": {"title":{"type": "completion"}}}
}
// 示例数据
POST test2/_doc
{"title": ["Sony", "WH-1000XM3"]
}
POST test2/_doc
{"title": ["SK-II", "PITERA"]
}
POST test2/_doc
{"title": ["Nintendo", "switch"]
}// 自动补全查询
POST /test2/_search
{"suggest": {"title_suggest": {"text": "s", // 关键字"completion": {"field": "title", // 补全字段"skip_duplicates": true, // 跳过重复的"size": 10 // 获取前10条结果}}}
}
四、实现搜索款自动补全(例酒店信息)
在这里插入代码片
构建索引库
// 酒店数据索引库
PUT /hotel
{"settings": {"analysis": {"analyzer": {"text_anlyzer": {"tokenizer": "ik_max_word","filter": "py"},"completion_analyzer": {"tokenizer": "keyword","filter": "py"}},"filter": {"py": {"type": "pinyin","keep_full_pinyin": false,"keep_joined_full_pinyin": true,"keep_original": true,"limit_first_letter_length": 16,"remove_duplicated_term": true,"none_chinese_pinyin_tokenize": false}}}},"mappings": {"properties": {"id":{"type": "keyword"},"name":{"type": "text","analyzer": "text_anlyzer","search_analyzer": "ik_smart","copy_to": "all"},"address":{"type": "keyword","index": false},"price":{"type": "integer"},"score":{"type": "integer"},"brand":{"type": "keyword","copy_to": "all"},"city":{"type": "keyword"},"starName":{"type": "keyword"},"business":{"type": "keyword","copy_to": "all"},"location":{"type": "geo_point"},"pic":{"type": "keyword","index": false},"all":{"type": "text","analyzer": "text_anlyzer","search_analyzer": "ik_smart"},"suggestion":{"type": "completion","analyzer": "completion_analyzer"}}}
}
查询测试
GET /hotel/_search
{"query": {"match_all": {}}
}GET /hotel/_search
{"suggest": {"YOUR_SUGGESTION": {"text": "s","completion": {"field": "suggestion","skip_duplicates": true // 跳过重复的}}}
}
public List<String> getSuggestion(String prefix) {SearchRequest request = new SearchRequest("hotel");ArrayList<String> list = new ArrayList<>();try {request.source().suggest(new SuggestBuilder().addSuggestion("OneSuggestion",SuggestBuilders.completionSuggestion("suggestion").prefix(prefix).skipDuplicates(true).size(10)));SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);Suggest suggest = response.getSuggest();CompletionSuggestion oneSuggestion = suggest.getSuggestion("OneSuggestion");List<CompletionSuggestion.Entry.Option> options = oneSuggestion.getOptions();for (CompletionSuggestion.Entry.Option option : options) {String text = option.getText().toString();list.add(text);}} catch (IOException e) {e.printStackTrace();}return list;}
数据同步
双写一致性
同步调用数据耦合,业务耦合
异步通知:增加实现难度
监听binlog(记录增删改操作):增加mysql压力,中间价搭建