从零开始 Spring Cloud 11:Elasticsearch II

从零开始 Spring Cloud 11:Elasticsearch II

image-20230714102655393

图源:laiketui.com

在上篇文章中我们学习了 es 的基本功能,在本篇文章中会学习 es 的一些高级功能,比如:

  • 聚合查询
  • 自动补全
  • 集群部署

数据聚合

类型

**聚合(aggregations)**可以让我们极其方便的实现对数据的统计、分析、运算。

聚合常见的有三类:

  • **桶(Bucket)**聚合:用来对文档做分组

    • TermAggregation:按照文档字段值分组,例如按照品牌值分组、按照国家分组
    • Date Histogram:按照日期阶梯分组,例如一周为一组,或者一月为一组
  • **度量(Metric)**聚合:用以计算一些值,比如:最大值、最小值、平均值等

    • Avg:求平均值
    • Max:求最大值
    • Min:求最小值
    • Stats:同时求max、min、avg、sum等
  • **管道(pipeline)**聚合:其它聚合的结果为基础做聚合

**注意:**参加聚合的字段必须是keyword、日期、数值、布尔类型,也就是说不能对 text 进行聚合,因为 text 可以分词。

更多关于聚合的介绍可以观看这个视频。

DSL

桶聚合

下面是一个使用酒店品牌进行桶聚合的示例:

GET /hotel/_search
{"size": 0, // 返回文档条数,进行聚合的时候可以指定为0,不返回文档"aggs":{ // 聚合定义"brandAgg":{ // 聚合名称"terms": { // 聚合类型"field": "brand", // 参与聚合的字段名"size": 10 // 返回的聚合结果数量}}}
}

返回的内容:

{// ..."aggregations" : {"brandAgg" : { // 聚合名称"doc_count_error_upper_bound" : 0,"sum_other_doc_count" : 39,"buckets" : [ // 聚合分组{"key" : "7天酒店", "doc_count" : 30 // 聚合后的文档数目},{"key" : "如家","doc_count" : 30},{"key" : "皇冠假日","doc_count" : 17},// ...]}}
}

聚合的结果默认按照doc_count倒序排列,可以通过指定排序规则改变这一点:

GET /hotel/_search
{"size": 0,"aggs":{"brandAgg":{"terms": {"field": "brand","size": 10,"order": {"_count": "asc"}}}}
}

返回内容:

{// ..."aggregations" : {"brandAgg" : {"doc_count_error_upper_bound" : 0,"sum_other_doc_count" : 130,"buckets" : [{"key" : "万丽","doc_count" : 2},{"key" : "丽笙","doc_count" : 2},{"key" : "君悦","doc_count" : 4},// ...]}}
}

默认情况下,es 需要对指定索引库的所有文档进行聚合,这就意味着较多的内存开销(需要将所有文档读入内存)。所以在使用聚合时添加一个查询条件以缩小聚合范围是个不错的优化手段

比如对所有价格在 200 以下的酒店进行聚合:

GET /hotel/_search
{"query": {"range": {"price": {"lte": 200}}},"size": 0,"aggs": {"brandAgg": {"terms": {"field": "brand","size": 10}}}
}

度量聚合

使用度量聚合可以对文档内容进行数据分析,比如分析所有酒店评分情况:

GET /hotel/_search
{"size": 0,"aggs": {"scoreAgg": {"stats": {"field": "score"}}}
}

返回结果:

{// ..."aggregations" : {"scoreAgg" : {"count" : 201,"min" : 35.0,"max" : 49.0,"avg" : 43.55223880597015,"sum" : 8754.0}}
}

可以将桶聚合和度量聚合结合(嵌套)起来使用,比如统计每种品牌酒店的评分情况:

GET /hotel/_search
{"size": 0,"aggs": { // 外层聚合"brandAgg": { // 聚合名称"terms": { // 桶聚合"field": "brand", // 按照品牌进行桶聚合"size": 10, "order": { // 对桶聚合结果排序"scoreAgg.avg": "desc" // 对桶聚合按照度量聚合的平均分进行排序}},"aggs": { // 内层聚合"scoreAgg": { // 聚合名称"stats": { // 度量聚合"field": "score" // 对用户评分进行聚合}}}}}
}

返回内容:

{// ..."aggregations" : {"brandAgg" : {"doc_count_error_upper_bound" : 0,"sum_other_doc_count" : 111,"buckets" : [{"key" : "万丽","doc_count" : 2,"scoreAgg" : {"count" : 2,"min" : 46.0,"max" : 47.0,"avg" : 46.5,"sum" : 93.0}},// ...{"key" : "君悦","doc_count" : 4,"scoreAgg" : {"count" : 4,"min" : 44.0,"max" : 47.0,"avg" : 45.5,"sum" : 182.0}},{"key" : "希尔顿","doc_count" : 10,"scoreAgg" : {"count" : 10,"min" : 37.0,"max" : 48.0,"avg" : 45.4,"sum" : 454.0}}]}}
}

RestAPI

下面展示如何用 RestAPI 实现聚合。

桶聚合

假设 DSL 语句如下:

GET /hotel/_search
{"query": {"range": {"price": {"lte": 200}}},"size": 0,"aggs": {"brandAgg": {"terms": {"field": "brand","size": 10}}}
}

RestAPI 实现调用:

@Test
@SneakyThrows
void testBrandAgg() {// 准备查询语句SearchRequest request = new SearchRequest("hotel");request.source().query(QueryBuilders.rangeQuery("price").lte(200)).size(0).aggregation(AggregationBuilders.terms("brandAgg").field("brand").size(20));SearchResponse searchResponse = restHighLevelClient.search(request, RequestOptions.DEFAULT);// 处理返回值// 获取 aggregations 中的内容Aggregations aggregations = searchResponse.getAggregations();if (aggregations != null) {// 获取查询时定义的聚合结果// 返回的类型与查询时的聚合类型相关Terms brandAgg = aggregations.get("brandAgg");if (brandAgg != null) {// 遍历聚合查询结果并打印List<? extends Terms.Bucket> buckets = brandAgg.getBuckets();buckets.forEach(b -> {System.out.println(String.format("key:%s, count: %d", b.getKeyAsString(), b.getDocCount()));});}}
}

度量聚合

假设 DSL 语句:

GET /hotel/_search
{"size": 0,"aggs": {"brandAgg": {"terms": {"field": "brand","size": 10,"order": {"scoreAgg.avg": "desc"}},"aggs": {"scoreAgg": {"stats": {"field": "score"}}}}}
}

对应的 RestAPI 实现:

@Test
@SneakyThrows
void testScoreAgg() {SearchRequest request = new SearchRequest("hotel");TermsAggregationBuilder brandAgg = AggregationBuilders.terms("brandAgg");request.source().size(0).aggregation(brandAgg);StatsAggregationBuilder scoreAgg = AggregationBuilders.stats("scoreAgg");brandAgg.field("brand").size(10).order(BucketOrder.aggregation("scoreAgg.avg", false)).subAggregation(scoreAgg);scoreAgg.field("score");SearchResponse searchResponse = restHighLevelClient.search(request, RequestOptions.DEFAULT);Aggregations aggregations = searchResponse.getAggregations();if (aggregations != null) {Terms brandTerms = aggregations.get("brandAgg");List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();for (Terms.Bucket bucket : buckets) {System.out.println(String.format("brand: %s, count: %d", bucket.getKeyAsString(), bucket.getDocCount()));Aggregations subAggregations = bucket.getAggregations();if (subAggregations != null) {Stats scoreStats = subAggregations.get("scoreAgg");System.out.println(String.format("min: %.2f, max: %.2f, avg: %.2f, sum: %.2f",scoreStats.getMin(),scoreStats.getMax(),scoreStats.getAvg(),scoreStats.getSum()));}}}
}

案例:动态筛选项

可以利用前边所学的知识,为酒店项目中的检索页面提供一个接口,动态查询并返回相关的聚合结果(城市、星级、品牌等)。

相关内容可以查看两个视频:

  • 06-数据聚合-多条件聚合.mp4
  • 07-数据聚合-带过滤条件的聚合.mp4

也可以从这里下载实现后的源码。

自动补全

下载 && 安装

要能够支持拼音的自动补全,需要安装一个 es 的拼音自动补全插件。

  • 注意,插件版本要与 es 版本一致。
  • 这里提供一个该插件 7.12.1 版本的百度云下载。

安装插件需要将插件解压到 es 挂载的插件目录下然后重启 es:

[icexmoon@192 ~]$ docker volume inspect es-plugins
[{"CreatedAt": "2023-08-03T21:12:21+08:00","Driver": "local","Labels": null,"Mountpoint": "/var/lib/docker/volumes/es-plugins/_data","Name": "es-plugins","Options": null,"Scope": "local"}
]
[icexmoon@192 ~]$ cd 下载
[icexmoon@192 下载]$ sudo mv ./py /var/lib/docker/volumes/es-plugins/_data
[icexmoon@192 下载]$ docker restart es

可以通过 devtool 执行以下 DSL 测试拼音分词器:

POST /_analyze
{"text": ["如家酒店还不错"],"analyzer": "pinyin"
}

返回内容:

{"tokens" : [{"token" : "ru",// ...},{"token" : "rjjdhbc",// ...},{"token" : "jia",// ...},{"token" : "jiu",// ...},{"token" : "dian",// ...},{"token" : "hai",// ...},{"token" : "bu",// ...},{"token" : "cuo",// ...}]
}

语句的分词结果是每个汉字对应的拼音以及每个汉字拼音首字母的结合。

自定义分词器

直接使用拼音分词器进行分词会存在一些问题:

  • 分词后的结果没有中文
  • 分词结果中不需要单个字对应的拼音

可以使用自定义分词器解决以上问题。

elasticsearch 中分词器(analyzer)的组成包含三部分:

  • character filters:在tokenizer之前对文本进行处理。例如删除字符、替换字符
  • tokenizer:将文本按照一定的规则切割成词条(term)。例如keyword,就是不分词;还有ik_smart
  • tokenizer filter:将tokenizer输出的词条做进一步处理。例如大小写转换、同义词处理、拼音处理等

这里我们可以让 ik 分词器作为 tokenizer 切割词条,然后再用拼音分词器作为 tokenizer filter 添加词条对应的拼音。

具体实现:

PUT /test
{"settings": {"analysis": {"analyzer": { // 自定义分词器"my_analyzer": { // 分词器名称"tokenizer": "ik_max_word", // 作为 tokenizer 使用的分词器"filter": "py" // 使用自定义过滤器 py 作为分词器的 tokenizer filter}},"filter": { // 自定义 tokenizer filter"py": { // 过滤器名称"type": "pinyin", // 过滤器类型,这里使用 pinyin 分词器"keep_full_pinyin": false, // 关闭单个汉字对应的拼音分词"keep_joined_full_pinyin": true, // 开启完整词条对应的拼音分词"keep_original": true, // 保留原始输入(汉字)"limit_first_letter_length": 16,"remove_duplicated_term": true, // 删除重复词条的索引结果"none_chinese_pinyin_tokenize": false // 关闭对拼音进行分词}}}},"mappings": {"properties": {"name": {"type": "text","analyzer": "my_analyzer", // 文档创建时使用的分词器"search_analyzer": "ik_smart" // 检索文档时使用的分词器}}}
}

这里使用 pinyin 分词器作为 tokenizer filter。为了满足要求,这里还使用了 pinyin 分词器的一系列配置选项,作用见注释,完整的 pinyin 分词器配置项可以查看项目页面。

最后,在映射定义中,指定用自定义分词器 my_analyzer 作为文档索引时的分词器,而使用 ik_smart 作为检索文档时使用的分词器。这是因为如果检索的时候依然使用 my_analyzer 分词器对检索关键字进行分词,就会查询到一些汉字不匹配但拼音匹配的结果,这是不正确的。

可以用 DSL 进行测试:

POST /test/_analyze
{"text": ["如家酒店还不错"],"analyzer": "my_analyzer"
}

返回结果:

{"tokens" : [{"token" : "如家",// ...},{"token" : "rujia",// ...},{"token" : "rj",// ...},{"token" : "酒店",// ...},{"token" : "jiudian",// ...},{"token" : "jd",// ...},{"token" : "还不",// ...},{"token" : "haibu",// ...},{"token" : "hb",// ...},{"token" : "不错",// ...},{"token" : "bucuo",// ...},{"token" : "bc",// ...}]
}

自动补全查询

要使用 DSL 进行自动补全查询,所查询的文档属性必须是 completion 类型:

PUT /test
{"mappings": {"properties": {"title": {"type": "completion"}}}
}

添加一些示例数据:

POST test/_doc
{"title": ["Sony", "WH-1000XM3"]
}
POST test/_doc
{"title": ["SK-II", "PITERA"]
}
POST test/_doc
{"title": ["Nintendo", "switch"]
}

自动补全字段对应的数据是一个包含多个词条的数组,这是为了能更好的自动补全。比如第一个文档的title字段有SonyWH-1000XM3两个词条,查询的时候关键字是s就可以用Sony自动补全,如果关键字是w就可以用WH-1000XM3自动补全。

自动补全查询的 DSL:

POST /test/_search
{"suggest": {"title_suggest": {"text": "s", // 检索关键字"completion": {"field": "title", // 用于自动补全的字段名"skip_duplicates": true, // 是否跳过重复词条"size": 10 // 返回结果条数}}}
}

案例:酒店检索自动补全

重建索引库

先删除之前的索引库:

DELETE /hotel

添加新的索引库定义:

PUT /hotel
{"settings": {"analysis": {"analyzer": { // 自定义分词器"text_anlyzer": { // 用于切分并添加拼音的分词器"tokenizer": "ik_max_word", // 用于切分的分词器"filter": "py" // 过滤器},"completion_analyzer": { // 不切分只添加拼音的分词器"tokenizer": "keyword", // 用于切分的分词器, keyword 为不切分"filter": "py" // 过滤器}},"filter": { // 自定义过滤器"py": { // 过滤器名称"type": "pinyin","keep_full_pinyin": false,"keep_joined_full_pinyin": true,"keep_original": true,"limit_first_letter_length": 16,"remove_duplicated_term": true,"none_chinese_pinyin_tokenize": false}}}},"mappings": {"properties": {"id":{"type": "keyword"},"name":{"type": "text","analyzer": "text_anlyzer", // 对酒店名称分词后添加对应的拼音"search_analyzer": "ik_smart", // 检索酒店名称时对关键字进行中文分词"copy_to": "all"},"address":{"type": "keyword","index": false},"price":{"type": "integer"},"score":{"type": "integer"},"brand":{"type": "keyword","copy_to": "all"},"city":{"type": "keyword"},"starName":{"type": "keyword"},"business":{"type": "keyword","copy_to": "all"},"location":{"type": "geo_point"},"pic":{"type": "keyword","index": false},"all":{"type": "text","analyzer": "text_anlyzer", // 对 all 中文分词后添加对应拼音作为分词"search_analyzer": "ik_smart" // 检索 all 时对关键字进行中文分词},"suggestion":{ "type": "completion", // 用于自动补全的字段"analyzer": "completion_analyzer" // 自动补全字段已经是分词后的短语,只需要附加拼音}}}
}

重新导入数据

之前示例中已经看到,对于completion类型的字段,添加文档时的 DSL 中是以一个 json 数组的形式传入的,所以 Java 代码中对应的属性应该是 List<String> 类型:

@Data
@NoArgsConstructor
public class HotelDoc {// ...private List<String> suggestion = new ArrayList<>();// ...
}

可以将需要自动补全的内容直接添加到这个属性,比如商圈和品牌:

public class HotelDoc {// ...public HotelDoc(Hotel hotel) {// ...this.suggestion.addAll(Arrays.asList(this.brand, this.business));}
}

但商圈中是存在类似江湾、五角场商业区这样的数据,这样的数据只会被jw之类的自动补全,不会被wj之类的自动补全,所以类似的存在我们可能需要切分后作为自动补全候选:

// ...
@Data
@NoArgsConstructor
public class HotelDoc {// ...private List<String> suggestion = new ArrayList<>();public HotelDoc(Hotel hotel) {// ...if (business.contains("、")) {String[] strings = business.split("、");this.suggestion.addAll(Arrays.asList(strings));this.suggestion.add(this.brand);} else {this.suggestion.addAll(Arrays.asList(this.brand, this.business));}}
}

现在执行批量添加酒店的测试用例导入酒店数据。

DSL 测试

用 DSL 查看导入的自动补全字段:

GET /hotel/_search
{"size": 20, "query": {"match_all": {}}
}

返回结果:

{// ..."hits" : {//..."hits" : [{// ..."_source" : {"address" : "静安交通路40号","brand" : "7天酒店","business" : "四川北路商业区",// ..."suggestion" : ["7天酒店","四川北路商业区"]}},{// ..."_source" : {"address" : "广灵二路126号","brand" : "速8","business" : "四川北路商业区",// ..."suggestion" : ["速8","四川北路商业区"]}},{// ..."_source" : {"address" : "兰田路38号","brand" : "速8","business" : "长风公园地区",// ..."suggestion" : ["速8","长风公园地区"]}},{// ..."_source" : {"address" : "徐汇龙华西路315弄58号","brand" : "7天酒店","business" : "八万人体育场地区",// ..."suggestion" : ["7天酒店","八万人体育场地区"]}},// ...{// ..."_source" : {"address" : "松江荣乐东路677号","brand" : "速8","business" : "佘山、松江大学城",// ..."suggestion" : ["佘山","松江大学城","速8"]}},// ...]}
}

用 DSL 进行自动补全查询:

POST /hotel/_search
{"suggest": {"hote_suggestion": {"text": "wj","completion": {"field": "suggestion","skip_duplicates": true,"size": 10}}}
}

返回结果:

{// ..."suggest" : {"hote_suggestion" : [{"text" : "wj","offset" : 0,"length" : 2,"options" : [{"text" : "五角场商业区",// ...},{"text" : "望京",// ...}]}]}
}

使用 RestAPI 实现自动补全

上面的 DSL 语句用 RestAPI 实现就是:

@Test
@SneakyThrows
void testHotelSuggestion() {SearchRequest request = new SearchRequest("hotel");request.source().suggest(new SuggestBuilder().addSuggestion("hotel_suggestion",SuggestBuilders.completionSuggestion("suggestion").prefix("wj").skipDuplicates(true).size(10)));SearchResponse searchResponse = restHighLevelClient.search(request, RequestOptions.DEFAULT);Suggest suggest = searchResponse.getSuggest();if (suggest != null) {CompletionSuggestion hotelSuggestion = suggest.getSuggestion("hotel_suggestion");if (hotelSuggestion != null) {List<CompletionSuggestion.Entry.Option> options = hotelSuggestion.getOptions();List<String> matchedWords = options.stream().map(o -> o.getText().string()).collect(Collectors.toList());System.out.println(matchedWords);}}
}

查询关键字由CompletionSuggestionBuilder.prefix指定,这里是wj

解析返回值是需要注意的是:Suggest.getSuggestion方法是一个泛型方法,返回的是Suggestion的一个子类。具体来说就是CompletionSuggestionPhraseSuggestionTermSuggestion三者之一。实际上使用哪个取决于查询时使用的自动补全类型,比如我们这里查询时使用的是CompletionSuggestion,所以解析结果是也应该使用CompletionSuggestion

之后就是按照上边的方式创建一个接口提供给前端页面用于自动补全,比较简单,这里不再说明。具体可以查看这个视频。

数据同步

除了初始阶段需要用数据库中的数据对 es 进行初始化以外,项目运行过程中数据库中的数据必然会发生改变,此时就需要通过某种方式将数据的改变同步到 es 中,也就是要解决数据库和 es 的数据同步问题。

如果是单体项目,只需要调用 RestAPI 同步数据即可。但如果是微服务架构,且一个微服务负责管理数据库,另一个微服务负责查询 es,这个问题就会变得复杂。

这里讨论三种解决方案。

方案分析

同步调用

image-20210723214931869

这种方式比较简单,就是在管理数据库的微服务(这里是 hotel-admin)中,修改数据后调用使用 es 的微服务(这里是 hotel-demo)提供的索引库更新接口,然后由后者调用 RestAPI 完成 es 的数据更新。

异步调用

image-20210723215140735

这个方案使用了 MQ 作为中间件,hotel-admin 写入数据后,会向 MQ 发送一个数据变更消息,hotel-demo 监听到该消息后,会自行更新 es。

监听 binlog

image-20210723215518541

MySQL 可以产生一个 binlog 文件,可以用中间件 canal 监听 binlog 文件,如果该文件改变,就说明 MySQL 的数据发生变化,canal 就会自行通知 hotel-demo 有数据变更,hotel-demo 就会更新 es 中的数据。

对比

上面提到的三种方案各有优缺点:

方式一:同步调用

  • 优点:实现简单,粗暴
  • 缺点:业务耦合度高

方式二:异步通知

  • 优点:低耦合,实现难度一般
  • 缺点:依赖mq的可靠性

方式三:监听binlog

  • 优点:完全解除服务间耦合
  • 缺点:开启binlog增加数据库负担、实现复杂度高

案例

准备框架代码

这里引入另一个示例项目 hotel-admin 来实现上边所说的数据同步。

先解压项目代码,并加载到 IDE 中。

修改数据库连接相关参数,并启动项目。

访问酒店管理页面(我这里是 http://localhost:8099/),可以进行数据库的增删改查。

创建 MQ 组件

数据同步需要使用一个交换机和两个队列:

image-20210723215850307

对于 es 来说,新增和修改的 DSL 是同样的,RestAPI 也是同样的,所以这里不需要对更新和新增做区分,都使用同一个队列即可。

在两个微服务中都添加 AMQP 依赖:

<!--AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置文件中添加连接信息:

spring:rabbitmq:host: 192.168.0.88 # RabbitMQ 服务端 ipport: 5672 # 端口virtual-host: / # 虚拟主机username: itcast # 用户名password: 123321 # 密码

在微服务 hotel-demo 中添加交换机和队列定义:

// ...
@Configuration
public class WebConfig {// ...@Beanpublic TopicExchange topicExchange() {return new TopicExchange(MQConstants.HOTEL_EXCHANGE_NAME, true, false);}@Beanpublic Queue insertQueue() {return new Queue(MQConstants.HOTEL_INSERT_QUEUE_NAME, true);}@Beanpublic Queue deleteQueue() {return new Queue(MQConstants.HOTEL_DELETE_QUEUE_NAME, true);}@Beanpublic Binding insertQueueBinding() {return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MQConstants.HOTEL_INSERT_ROUTINE_KEY);}@Beanpublic Binding deleteQueueBinding() {return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MQConstants.HOTEL_DELETE_ROUTINE_KEY);}
}

实现消息发送

在微服务 hotel-adminController 中添加消息发送:

// ...
public class HotelController {@Autowiredprivate IHotelService hotelService;@Autowiredprivate RabbitTemplate rabbitTemplate;// ...@PostMappingpublic void saveHotel(@RequestBody Hotel hotel){hotelService.save(hotel);rabbitTemplate.convertAndSend(MQConstants.HOTEL_EXCHANGE_NAME,MQConstants.HOTEL_INSERT_ROUTINE_KEY,hotel.getId());}@PutMapping()public void updateById(@RequestBody Hotel hotel){if (hotel.getId() == null) {throw new InvalidParameterException("id不能为空");}hotelService.updateById(hotel);rabbitTemplate.convertAndSend(MQConstants.HOTEL_EXCHANGE_NAME,MQConstants.HOTEL_INSERT_ROUTINE_KEY,hotel.getId());}@DeleteMapping("/{id}")public void deleteById(@PathVariable("id") Long id) {hotelService.removeById(id);rabbitTemplate.convertAndSend(MQConstants.HOTEL_EXCHANGE_NAME,MQConstants.HOTEL_DELETE_ROUTINE_KEY,id);}
}

实现消息接收

在微服务 hotel-demo 中添加一个负责监听消息的类:

@Component
public class HotelListener {@Autowiredprivate IHotelService hotelService;@RabbitListener(queues = MQConstants.HOTEL_INSERT_QUEUE_NAME)public void listenHotelInsert(Long id){hotelService.addHotel2ES(id);}@RabbitListener(queues = MQConstants.HOTEL_DELETE_QUEUE_NAME)public void listenHotelDelete(Long id){hotelService.deleteHotelFromES(id);}
}

Service 层对应方法中实现具体的 RestAPI 调用:

@Service
public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {// ...@Override@SneakyThrowspublic void addHotel2ES(Long id) {//从数据库读取酒店信息Hotel hotel = this.getById(id);if (hotel != null) {HotelDoc hotelDoc = new HotelDoc(hotel);IndexRequest request = new IndexRequest("hotel").id(hotelDoc.getId().toString());request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);restHighLevelClient.index(request, RequestOptions.DEFAULT);}}@Override@SneakyThrowspublic void deleteHotelFromES(Long id) {if (id != null) {DeleteRequest request = new DeleteRequest("hotel").id(id.toString());restHighLevelClient.delete(request, RequestOptions.DEFAULT);}}
}

这样就实现了对 es 的数据同步,可以通过相关的酒店管理页面进行验证。

ES 集群

单台 ES 部署的方式会存在一些问题:

  • 单台 ES 能存储的数据存在上限(受内存大小限制)
  • 出现故障时整个系统将不可用

可以通过 ES 集群部署的方式解决上边的问题。

关于 ES 的集群部署,有以下概念:

  • 集群(cluster):一组拥有共同的 cluster name 的 节点。
  • 节点(node) :集群中的一个 Elasticearch 实例
  • 分片(shard):索引可以被拆分为不同的部分进行存储,称为分片。在集群环境下,一个索引的不同分片可以拆分到不同的节点中

通过将一个索引库拆分成多个分片并存储到不同的节点,可以解决单个 ES 所能存储的数据有限的问题。

image-20200104124440086-5602723

对于单台 ES 出现问题会影响整个系统的问题,可以通过让不同的节点保留其它节点的分片副本的方式解决。

比如下面是其中一个可能的解决方案:

image-20200104124551912

这有点像 Linux 的磁盘阵列中的 RAID 5,其本质的理念上是一致的:在保证一定读写性能的前提下提升数据恢复的冗余。

搭建 ES 集群

部署

这里使用 docker-compose 部署 es 集群的方式,docker-compose.yml文件定义如下:

version: '2.2'
services:es01: # 服务名称image: elasticsearch:7.12.1 # 使用的镜像container_name: es01 # 容器名称environment: # 环境变量- node.name=es01 # es 节点名称,用于集群部署配置中的互相引用- cluster.name=es-docker-cluster # 集群名称,同一个名称下的节点会被部署为一个集群- discovery.seed_hosts=es02,es03 # 集群中的其它节点的主机名。一般使用ip,这里在同一个 docker 网络,所以可以使用容器名- cluster.initial_master_nodes=es01,es02,es03 # 初始主节点候选- "ES_JAVA_OPTS=-Xms512m -Xmx512m" # java 相关设置volumes:- data01:/usr/share/elasticsearch/data # 数据卷挂载ports:- 9200:9200 # 端口映射networks:- elastic # 所属 docker 网络es02:image: elasticsearch:7.12.1container_name: es02environment:- node.name=es02- cluster.name=es-docker-cluster- discovery.seed_hosts=es01,es03- cluster.initial_master_nodes=es01,es02,es03- "ES_JAVA_OPTS=-Xms512m -Xmx512m"volumes:- data02:/usr/share/elasticsearch/dataports:- 9201:9200networks:- elastices03:image: elasticsearch:7.12.1container_name: es03environment:- node.name=es03- cluster.name=es-docker-cluster- discovery.seed_hosts=es01,es02- cluster.initial_master_nodes=es01,es02,es03- "ES_JAVA_OPTS=-Xms512m -Xmx512m"volumes:- data03:/usr/share/elasticsearch/datanetworks:- elasticports:- 9202:9200
volumes:data01:driver: localdata02:driver: localdata03:driver: localnetworks:elastic:driver: bridge

启动 es 集群服务:

[icexmoon@192 es-cluster]$ docker-compose up -d

需要确保宿主机有 4G 以上内存,如果内存不够,可以停止其它不需要的 docker 容器:

[icexmoon@192 es-cluster]$ docker stop kibana es mq

如果启动后 es 服务会自动退出,观察日志会出现类似下面的错误信息:

max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]

可以通过修改 Linux 系统权限的方式解决:

[icexmoon@192 es-cluster]$ sudo vim /etc/sysctl.conf

添加以下内容:

vm.max_map_count=262144

执行以下命令让其生效:

sysctl -p

状态监控

之前介绍的 kibana 默认情况下只能查看一台 es 的状态信息,如果需要用 kibana 监控 es 集群,需要依赖 es 的 x-pack 功能,配置比较复杂。

这里使用 cerebro 监控集群状态。

这里提供一个百度云的安装包

解压后执行 bin 目录下的 cerebro.bat 批处理文件即可。

会自动打开 cmd 并运行程序:

[info] play.api.Play - Application started (Prod) (no global state)
[info] p.c.s.AkkaHttpServer - Listening for HTTP on /0:0:0:0:0:0:0:0:9000

上边的输出说明程序已经监听本地的 9000 端口,因此可以访问 localhost:9000。

打开的页面需要输入一个 es 节点地址,输入任意节点即可,比如http://192.168.0.88:9200

创建索引库

就像之前说的,在 es 集群部署的情况下,创建索引库可以使用多个分片和备份分片。

可以通过 DSL 创建这样的索引库:

PUT /itcast
{"settings": {"number_of_shards": 3, // 分片数量"number_of_replicas": 1 // 每个分片的副本数量},"mappings": {"properties": {// mapping映射定义 ...}}
}

也可以通过 cerebro 工具创建:

image-20230811183957826

现在这个新建的索引库总共有3个正式分片以及3个备份分片:

image-20230811184134782

图中的虚线就是备份分片。

脑裂问题

集群职责划分

elasticsearch中集群节点有不同的职责划分:

image-20210723223008967

默认情况下,集群中的任何一个节点都同时具备上述四种角色。

但是真实的集群一定要将集群职责分离:

  • master节点:对CPU要求高,但是内存要求低
  • data节点:对CPU和内存要求都高
  • coordinating节点:对网络带宽、CPU要求高

职责分离可以让我们根据不同节点的需求分配不同的硬件去部署。而且避免业务之间的互相干扰。

一个典型的es集群职责划分如图:

image-20210723223629142

脑裂问题

脑裂是因为集群中的节点失联导致的。

例如一个集群中,主节点与其它节点失联:

image-20210723223804995

此时,node2和node3认为node1宕机,就会重新选主:

image-20210723223845754

当node3当选后,集群继续对外提供服务,node2和node3自成集群,node1自成集群,两个集群数据不同步,出现数据差异。

当网络恢复后,因为集群中有两个master节点,集群状态的不一致,出现脑裂的情况:

image-20210723224000555

解决脑裂的方案是,要求选票超过 ( eligible节点数量 + 1 )/ 2 才能当选为主,因此eligible节点数量最好是奇数。对应配置项是discovery.zen.minimum_master_nodes,在es7.0以后,已经成为默认配置,因此一般不会发生脑裂问题。

例如:3个节点形成的集群,选票必须超过 (3 + 1) / 2 ,也就是2票。node3得到node2和node3的选票,当选为主。node1只有自己1票,没有当选。集群中依然只有1个主节点,没有出现脑裂。

分布式存储

相比 es 单体部署,集群部署的 es 如何实现文档的新增和查询?

新增

前边已经说了,拆分成多个分片的索引库,保存数据时会保存存在不同的分片。

比如向之前创建的索引库 itcast(有3个分片) 存储文档:

image-20230814160423483

  • 因为没有安装 kibana,所以这里用 API 调试工具 APIPost 执行 DSL,效果是相同的。
  • 可以向任意一个节点发送请求,因为组成集群的3个节点都可以承担协调节点(coordinating node)职责。

用类似的方式存储 id 分别为 1、2、3 的三条数据。

可以查询这些文档所在的实际分片:

GET /itcast/_search
{"explain": true, // 返回信息中包含所在分片信息"query": {"match_all": {}}
}

返回结果:

{// ..."hits": {// ..."hits": [{"_shard": "[itcast][1]", // 文档位于 itcast 索引库的 1 号分片"_node": "5wH00JaVQMmIBXHdtlFDBw","_index": "itcast","_type": "_doc","_id": "2","_score": 1,"_source": {"msg": "文档2"},// ...},{"_shard": "[itcast][1]","_node": "5wH00JaVQMmIBXHdtlFDBw","_index": "itcast","_type": "_doc","_id": "3","_score": 1,"_source": {"msg": "文档3"},// ...},{"_shard": "[itcast][2]","_node": "EyvF5IibQRGvEFfTMJEQ7g","_index": "itcast","_type": "_doc","_id": "1","_score": 1,"_source": {"msg": "文档1"},// ...}]}
}

返回信息中的_shard中包含所在分片信息。可以看到,文档1在2号分片,文档2和3在1号分片。

实际上,es 会根据以下算法决定文档应当会保存在哪个分片:

shard = hash(_routing) % number_of_shards

公式中的_routing默认为文档 id,number_of_shards则是索引库的分片数目。所以上边的算法和索引库的分片数目是相关的,这也就意味着索引库创建后,索引库使用的分片数目不能改变,否则文档就无法正常保存和查询(按文档 id 查询)。

具体来说,新增文档的流程如下:

image-20210723225436084

图中 P 是主分片(Primary Shard),R 是副本分片(Replica Shard),P-0 是0号主分片,R-0 是0号副本分片。

  • 1)新增一个id=1的文档
  • 2)对id做hash运算,假如得到的是2,则应该存储到shard-2
  • 3)shard-2的主分片在node3节点,将数据路由到node3
  • 4)保存文档
  • 5)同步给shard-2的副本replica-2,在node2节点
  • 6)返回结果给coordinating-node节点

查询

如果是按照文档 ID 查询,很简单,按照新增文档时使用的公式计算其所在分片,然后查找文档并返回即可。

但如果是按照条件查询,就很麻烦,因为无法在一开始就知道要查询的文档所在的分片。

所以此时 es 需要按照两个阶段完成查询:

  • scatter phase:分散阶段,coordinating node会把请求分发到每一个分片
  • gather phase:聚集阶段,coordinating node汇总data node的搜索结果,并处理为最终结果集返回给用户

image-20210723225809848

故障转移

当一个 es 集群中的主节点发生故障无法访问,集群中的候选节点将“选举”产生一个新的主节点。

当 es 集群中的数据节点发生故障,主节点将监控到这种故障,并检查分片是否完整(包含副本分片),如果缺少主分片或者副本分片,将进行数据迁移,以保证分片完整。

演示

我们可以实际演示这种故障转移。

目前我们的 es 集群包含3个节点:

image-20230814163703444

其中 es03 是主节点。

关闭该节点的 docker 容器以模拟主节点故障:

[icexmoon@192 ~]$ docker stop es03

可以通过监控工具观察到故障发生:

image-20230814163840635

一段事件后 es 集群经过故障转移自动恢复正常:

image-20230814164010769

现在 es01 是主节点,且分片恢复成了3个主分片以及3个副本分片的规模。

如果通过 DSL 查询索引库的数据也会发现数据并没有丢失。

如果 es 节点恢复,集群依然可以自动恢复到之前的状态。比如:

[icexmoon@192 ~]$ docker start es03

image-20230814164329379

除了主节点变成 es01 之外,分片依然延续之前的一个节点 2 个分片的结构。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/90700.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【网络基础实战之路】基于BGP协议中的联邦号连接三个AS区域的实战详解

系列文章传送门&#xff1a; 【网络基础实战之路】设计网络划分的实战详解 【网络基础实战之路】一文弄懂TCP的三次握手与四次断开 【网络基础实战之路】基于MGRE多点协议的实战详解 【网络基础实战之路】基于OSPF协议建立两个MGRE网络的实验详解 【网络基础实战之路】基于…

超级品牌,都在打造数据飞轮

更多技术交流、求职机会&#xff0c;欢迎关注字节跳动数据平台微信公众号&#xff0c;回复【1】进入官方交流群 引入 「收钱吧到账15元。」 从北京大栅栏的糖葫芦铺子&#xff0c;到南京夫子庙的鸭血粉丝汤馆&#xff0c;再到广州珠江畔的早茶店&#xff0c;不知不觉间&#xf…

IntelliJ IDEA(简称Idea) 基本常用设置及Maven部署---详细介绍

一&#xff0c;Idea是什么&#xff1f; 前言&#xff1a; 众所周知&#xff0c;现在有许多编译工具&#xff0c;如eclipse&#xff0c;pathon, 今天所要学的Idea编译工具 Idea是JetBrains公司开发的一款强大的集成开发环境&#xff08;IDE&#xff09;&#xff0c;主要用于Java…

基于深度信念神经网络的矿石产量预测,基于DBN的矿石产量预测,DBN的详细原理

目录 背影 DBN神经网络的原理 DBN神经网络的定义 受限玻尔兹曼机(RBM) DBN的矿石产量预测 基本结构 主要参数 数据 MATALB代码 结果图 展望 背影 DBN是一种深度学习神经网络,拥有提取特征,非监督学习的能力,是一种非常好的分类算法,本文将DBN算法进行矿石产量预测 DB…

Markdown编译器的使用

这里写自定义目录标题 欢迎使用Markdown编辑器新的改变功能快捷键合理的创建标题&#xff0c;有助于目录的生成如何改变文本的样式插入链接与图片如何插入一段漂亮的代码片生成一个适合你的列表创建一个表格设定内容居中、居左、居右SmartyPants 创建一个自定义列表如何创建一个…

什么是BFC?它有什么作用?如何创建BFC?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ 什么是BFC⭐ BFC的作用⭐ 创建BFC的方法⭐ 写在最后 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 记得点击上方或者右侧链接订阅本专栏哦 几何带你启航前端之旅 欢迎来到前端入门之旅&#xff01;这个专栏是为那些对Web…

如何在 3Ds Max 中准确地将参考图像调整为正确的尺寸?

您是否想知道如何在 3Ds Max 中轻松直观地调整参考图像的大小&#xff0c;而无需借助第三方解决方案、插件或脚本&#xff1f; 我问自己这个问题&#xff0c;并高兴地发现了FFD Box 2x2x2&#xff0c;我无法停止钦佩这个修改器的多功能性。 在本文中&#xff0c;我想与您分享一…

SQL server中substring 的用法

一&#xff1a;substring函数是SQL中截取字段数据中的其中一部分 --列&#xff1a;提取abdcsef中的abc数据&#xff0c;使用substring实现select substring(abdcsef,1,3) --‘1’表示截取的起始位置是从第一个字符开始,‘3’表示截取后得到的字符串长度为3个字符 二&#xff1…

item_get_sales-获取TB商品销量详情

一、接口参数说明&#xff1a; item_get_sales-获取商品销量详情&#xff0c;点击更多API调试&#xff0c;请移步注册API账号点击获取测试key和secret 公共参数 请求地址: https://api-gw.onebound.cn/taobao/item_get_sales 名称类型必须描述keyString是调用key&#xff08…

【从零学习python 】29. 「函数参数详解」——了解Python函数参数的不同用法

文章目录 函数参数详解一、缺省参数二、不定长参数三、缺省参数在*args后面可变、不可变类型总结 进阶案例 函数参数详解 一、缺省参数 调用函数时&#xff0c;缺省参数的值如果没有传入&#xff0c;则取默认值。 下例会打印默认的age&#xff0c;如果age没有被传入&#xf…

PLUS操作流程、应用与实践,多源不同分辨率数据的处理、ArcGIS的应用、PLUS模型的应用、InVEST模型的应用

PLUS模型是由中国地质大学&#xff08;武汉&#xff09;地理与信息工程学院高性能空间计算智能实验室开发&#xff0c;是一个基于栅格数据的可用于斑块尺度土地利用/土地覆盖(LULC)变化模拟的元胞自动机(CA)模型。PLUS模型集成了基于土地扩张分析的规则挖掘方法和基于多类型随机…

别人直播的时候怎么录屏?分享一些录屏方法

​随着互联网的快速发展&#xff0c;直播已经成为人们日常生活中不可或缺的一部分。但是&#xff0c;有时候我们可能会错过某些重要的直播内容&#xff0c;这时候就需要录屏来保存和观看。那么&#xff0c;如何录屏别人的直播呢&#xff1f;本文将分享一些录屏方法和技巧&#…

web-vue

<html><head><title>永远朋友</title><script src"../js/vue.js"></script></head><body><div id "app"><input type"text" v-model"message">{{ message }}</div&g…

冠达管理:险资最新重仓股曝光!加仓这些股票

随着上市公司半年报陆续发表&#xff0c;险资最新重仓持股状况也逐渐清晰。 到8月14日&#xff0c;在已发表2023年半年报的上市公司中&#xff0c;超越60家上市公司的前十大流通股东中呈现险资身影。 从职业来看&#xff0c;制造业成为险资的“心头好”。在险资重仓个股中&…

公司电脑三维图纸加密、机械图挡加密软件

机械图纸加密软件的问世&#xff0c;让很多的网络公司都大受其带来的工作中的便利。在安装了机械图纸加密软件后&#xff0c;不仅可以很好的管理员工在工作时的上网娱乐&#xff0c;在对整个公司员工的工作效率上也有着明显的提高&#xff0c;那么对于机械图纸加密软件的具体特…

探索数字孪生的数据之美:实时、多源、多维的未来

在数字孪生的世界里&#xff0c;数据不再是孤立的数字&#xff0c;而是构成了一个真实、动态的虚拟映像&#xff0c;其独特的特点为现代社会带来了前所未有的机遇。 首先&#xff0c;数字孪生的数据特点之一是实时性。在制造业中&#xff0c;数字孪生可以通过实时传感器数据&am…

【网络层+数据链路层】深入理解IP协议和MAC帧协议的基本原理

文章目录 前言一、IP协议二、MAC帧协议 1.以太网2.以太网帧&#xff08;MAC帧&#xff09;格式报头3.基于协议讲解局域网转发的原理总结 前言 为什么经常将TCP/IP放在一起呢&#xff1f;这是因为IP层的核心工作就是通过IP地址来定位主机的&#xff0c;具有将一个数据报从A主机…

【 BERTopic应用 01/3】 分析卡塔尔世界杯推特数据

摄影&#xff1a; Rhett Lewis在 Unsplash上 一、说明 卡塔尔世界杯充满了惊喜&#xff01;从沙特阿拉伯通过击败阿根廷震惊世界到摩洛哥历史性地进入半决赛&#xff0c;你必须听到或见证那些足球热潮中的时刻。在这篇文章中&#xff0c;我将使用 BERTopic 来分析 2022 年世界杯…

对方发送的文件已过期如何恢复,这样做很简单

我们常常使用微信来发送文件、传输文件&#xff0c;但很多人也会遇到文件过期的情况。每当发现文件已过期&#xff0c;都会懊恼自己当初为什么没有早点下载保存。 大家要知道&#xff0c;微信文件如果7天内没有及时下载是会被清理的。不过&#xff0c;大家不要着急&#xff0c…

Kafka第三课

Flume 由三部分 Source Channel Sink 可以通过配置拦截器和Channel选择器,来实现对数据的分流, 可以通过对channel的2个存储容量的的设置,来实现对流速的控制 Kafka 同样由三大部分组成 生产者 服务器 消费者 生产者负责发送数据给服务器 服务器存储数据 消费者通过从服务器取…