Java elasticsearch scroll模板实现

一、scroll说明和使用场景

scroll的使用场景:大数据量的检索和操作

scroll顾名思义,就是游标的意思,核心的应用场景就是遍历 elasticsearch中的数据;

通常我们遍历数据采用的是分页,elastcisearch还支持from size的方式进行分页查询,使用 from and size 的深度分页,比如说 ?size=10&from=10000,因为 100,000 排序的结果必须从每个分片上取出并重新排序最后返回 10 条。这个过程需要对每个请求页重新进行提取+排序,效率很低,消耗很大,所以默认的最大可分页的数据是10000,超过10000是不建议的;

使用

通过在url末尾带上scroll=1m表示开启一个游标,1m表示游标的有效期为1分钟

POST /record/_search?scroll=1m
{
  "from"0,
  "size"20
}

返回结果中会把scroll的id带上,再次查询的时候,直接用scroll id查询即可

alt
POST /_search/scroll
{
    "scroll" : "1m"
    "scroll_id" : "FGluY2x1ZGVfY29udGV4dF91dWlkDnF1ZXJ5VGhlbkZldGNoAhZuYmpMbVpwWFRUMnNFMUFFSHlSMHB3AAAAAALBy_0WUWxrNTRTaWNUcy1sOHQ0VUo5dzF6dxZoemFkZTlMeFQ4MmoyOW5SUG8ybE53AAAAAAN6ip8WMmk5TWZlQ21RQnFsNURwaXRzSGhCdw==" 
}

二、基于ElasticsearchRestTemplate的实现

这里我们定义了一个template如下,主要作用就是实现一个基于scroll的数据遍历模板,屏蔽开启scroll 以及 scroll遍历所有数据,通过Consumer<T>钩子函数进行数据处理

import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.SearchHit;
import org.springframework.data.elasticsearch.core.SearchScrollHits;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;

import java.util.List;
import java.util.concurrent.*;

/**
 * scrollTemplate 模板,用于遍历整个Index的数据
 * @author xiuzhu
 * @Date 2023/7/28 13:12
 */

@Slf4j
public class ElasticSearchScrollTemplate<T{

    ExecutorService executorService = new ThreadPoolExecutor(14,
                                      30,TimeUnit.SECONDS,
                                      new LinkedBlockingQueue<Runnable>(5),
                                        Executors.defaultThreadFactory(),
                                        new ThreadPoolExecutor.CallerRunsPolicy()
                                    );

    ElasticsearchRestTemplate elasticSearchRestTemplate;

    Class<T> cls;

    String indexName;

    public ElasticSearchScrollTemplate(
            ElasticsearchRestTemplate template,
            Class<T> cls,
            String indexName
    )
 
{
        this.elasticSearchRestTemplate = template;
        this.cls = cls;
        this.indexName = indexName;
    }

    @FunctionalInterface
    public interface Consumer<T{
        public void accept(List<T> objects);
    }

    public void execute(Consumer<T> consumer) {
        //构建查询条件
        NativeSearchQueryBuilder query = new NativeSearchQueryBuilder();
        BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();

        query.withPageable(PageRequest.of(0300));
        query.withQuery(queryBuilder);

        //保留0.5分钟
        long scrollTimeInMillis = 30*1000;

        IndexCoordinates recordIndex = IndexCoordinates.of(indexName);
        SearchScrollHits<T> hits = elasticSearchRestTemplate.searchScrollStart(scrollTimeInMillis, query.build(), cls, recordIndex);

        // scrollId
        String scrollId = hits.getScrollId();
        List<T> recordEntityList = hits.stream().map(SearchHit::getContent).toList();
        long total = 0L;

        log.info("================ began scroll index={} ====================", indexName);

        executorService.submit(()->{
            consumer.accept(recordEntityList);
        });

        total = total + recordEntityList.size();

        log.info("================  has scroll index={} total={} ====================", indexName, total);
        while (!hits.isEmpty()) {
            hits = elasticSearchRestTemplate.searchScrollContinue(scrollId, scrollTimeInMillis, cls, recordIndex);
            List<T> entities = hits.stream().map(SearchHit::getContent).toList();

            executorService.submit(()->{
                consumer.accept(entities);
            });

            total = total + entities.size();
            try {
                //给系统留GC时间,不然容易内存溢出
                Thread.sleep(300);
            } catch (InterruptedException e) {
                log.error("sleep error", e);
            }
            log.info("================  has scroll index={} total={} ====================", indexName, total);
        }
        log.info("================ end scroll index={} ====================", indexName);
    }
}

使用参考:

@Resource(name = "elasticSearchRestTemplate")
    ElasticsearchRestTemplate elasticsearchRestTemplate;

new ElasticSearchScrollTemplate<>(
                        elasticsearchRestTemplate,
                        RecordEntity.class,
                        "record")
                ).execute((entities)->
{
                    entities.forEach(item->{
                        //这里进行数据的处理,比如修改数据
                        recordEntityService.save(item);
                        log.info("tag update success record={} api={}", item.getId());

                    });
                });

本文由 mdnice 多平台发布

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/120168.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

mybatis-generator-maven-plugin使用

前提说明 数据库&#xff1a;MYSQL57Mybatis : http://mybatis.org/generator/index.html 操作说明 引入插件 <plugins><!-- MyBatis 逆向工程 插件 --><plugin><groupId>org.mybatis.generator</groupId><artifactId>mybatis-generat…

汽车电子系统网络安全解决方案

声明 本文是学习GB-T 38628-2020 信息安全技术 汽车电子系统网络安全指南. 而整理的学习笔记,分享出来希望更多人受益,如果存在侵权请及时联系我们 汽车电子系统网络安全范围 本标准给出了汽车电子系统网络安全活动框架&#xff0c;以及在此框架下的汽车电子系统网络安全活动…

E5061B/是德科技keysight E5061B网络分析仪

181/2461/8938产品概述 是德科技E5061B(安捷伦)网络分析仪在从5 Hz到3 GHz的宽频率范围内提供通用的高性能网络分析。E5061B提供ENA系列常见的出色RF性能&#xff0c;还提供全面的LF(低频)网络测量能力&#xff1b;包括内置1 Mohm输入的增益相位测试端口。E5061B从低频到高频的…

Xcode打包ipa文件,查看app包内文件

1、Xcode发布ipa文件前&#xff0c;在info中打开如下两个选项&#xff0c;即可在手机上查看app包名文件夹下的文件及数据。

Tiny Player Mac:小而美,音乐播放的极致体验

对于追求音质和操作简便的Mac用户来说&#xff0c;Tiny Player Mac是一款不可多得的音乐播放器。它以简洁的界面、强大的功能和优异的性能&#xff0c;吸引了无数用户的目光。接下来&#xff0c;让我们一起了解这款小而美的音乐播放器。 Tiny Player Mac支持多种音频格式&#…

无需租云服务器,Linux本地搭建web服务,并内网穿透发布公网访问

文章目录 前言1. 本地搭建web站点2. 测试局域网访问3. 公开本地web网站3.1 安装cpolar内网穿透3.2 创建http隧道&#xff0c;指向本地80端口3.3 配置后台服务 4. 配置固定二级子域名5. 测试使用固定二级子域名访问本地web站点 前言 在web项目中,部署的web站点需要被外部访问,则…

Python Opencv实践 - 矩形轮廓绘制(直边矩形,最小外接矩形)

import cv2 as cv import numpy as np import matplotlib.pyplot as pltimg cv.imread("../SampleImages/stars.png") plt.imshow(img[:,:,::-1])img_gray cv.cvtColor(img, cv.COLOR_BGR2GRAY) #通过cv.threshold转换为二值图 ret,thresh cv.threshold(img_gray,…

ToBeWritten之基于ATTCK的模拟攻击:闭环的防御与安全运营

也许每个人出生的时候都以为这世界都是为他一个人而存在的&#xff0c;当他发现自己错的时候&#xff0c;他便开始长大 少走了弯路&#xff0c;也就错过了风景&#xff0c;无论如何&#xff0c;感谢经历 转移发布平台通知&#xff1a;将不再在CSDN博客发布新文章&#xff0c;敬…

从零开始探索C语言(三)----运算符和判断语句

文章目录 1. C 运算符1.1 算术运算符1.2 关系运算符1.3 逻辑运算符1.4 位运算符1.5 赋值运算符1.6 杂项运算符 ↦ sizeof & 三元1.7 C 中的运算符优先级 2. C 判断2.1 if 语句2.2 if...else 语句2.3 if...else if...else 语句2.4 ? : 运算符(三元运算符) 1. C 运算符 运算…

ChatGPT数据分析及作图插件推荐-Code Interpreter

今天打开chatGPT时发现一个重磅更新&#xff01;code interpreter插件可以使用了。 去查看openai官网&#xff0c;发现从2023.7.6号&#xff08;前天&#xff09;开始&#xff0c;code interpreter插件已经面向所有chatGPT plus用户开放了。 为什么说code interpreter插件是一…

422规范详解

概述&#xff1a; 全称为EIA-TIA-422-B&#xff0c;于1994年发布。 典型电路由一个发送器和N个接收器以及一个中断匹配电阻组成。 发送器&#xff1a; 差分输出电压值在2V~10V之间。 4.1.1 发送器输出阻抗 要求A/B之间的差分阻抗≤100Ω。 4.1.2 开路特性 要求差分电压≤…

在Cisco设备上配置接口速度和双工

默认情况下&#xff0c;思科交换机将自动协商速度和双工设置。将设备&#xff08;交换机、路由器或工作站&#xff09;连接到 Cisco 交换机上的端口时&#xff0c;将发生协商过程&#xff0c;设备将就传输参数达成一致&#xff0c;当今的大多数网络适配器都支持此功能。 在本文…

C++中的语法知识虚继承和虚基类

多继承(Multiple Inheritance)是指从多个直接基类中产生派生类的能力,多继承的派生类继承了所有父类的成员。尽管概念上非常简单,但是多个基类的相互交织可能会带来错综复杂的设计问题,命名冲突就是不可回避的一个。 多继承时很容易产生命名冲突,即使我们很小心地将所有类…

mac下配置JDK环境

一、下载安装 下载地址&#xff1a;Java Downloads | Oracle&#xff0c;选择适用于Mac OS的JDK版本&#xff0c;点击下载即可。 下载完之后&#xff0c;直接安装&#xff1a; 安装过程非常简单&#xff0c;按“继续”按钮一直下一步即可。 二、配置环境变量 上一步骤&#x…

深度刨析数据在内存中的存储

✨博客主页&#xff1a;小钱编程成长记 &#x1f388;博客专栏&#xff1a;进阶C语言 深度刨析数据在内存中的存储 1.数据类型介绍1.1 类型的基本归类 2.整形在内存中的存储2.1 原码、反码、补码2.2 大小端介绍 3.浮点型在内存中的存储3.1 一个例子3.2 浮点数的存储规则3.3指数…

matlab函数 状态空间系统ss、能控性矩阵ctrb、矩阵的秩rank、能控标准型canon、零极点配置place、系统极点pole等函数(线性定常系统)

matlab函数 能控性矩阵ctrb、能控标准型canon、零极点配置place 第一章&#xff0c;线性定常系统 ss 如果已知线性定常系统的ABCD四个矩阵&#xff0c;可以得到状态空间系统 其他更具体的用法请直接看帮助文档。 用法&#xff1a;ss(A,B,C,D) 假如 可以输入 A [-1.5,-2…

官方发布:Mac 版 Visual Studio IDE将于明年 8 月 31 日停止支持

近日&#xff0c;微软官方宣布&#xff1a;适用于 Mac 平台的 Visual Studio 集成开发环境&#xff08;IDE&#xff09;已经启动 "退休" 进程。Visual Studio for Mac 17.6 将继续支持 12 个月&#xff0c;持续到 2024 年 8 月 31 日。 微软表示在未来的 1 年内将重…

【AWS】如何用SSH连接aws上的EC2实例(虚拟机)?

目录 0.环境 1.连接结果示例 2.SSH连接思路 3.具体步骤 1&#xff09;安装并运行ssh服务 2&#xff09;启动ssh服务 3&#xff09;在AWS上找到正在运行的EC2实例&#xff0c;并且根据提供的ssh连接语句进行连接 0.环境 windows 11 64位 前提&#xff1a; 有aws账户&…

ios 运行ipa包 日志查看方式

方法一&#xff1a; 使用ideviceinstaller工具 # 安装ipa命令 brew install ideviceinstaller ideviceinstaller -i xxx.ipa# 查看运行日志 idevicesyslog# idevicesyslog 查找命令 idevicesyslog | grep test -A 3 -B 2 # 输出关键字所在行后3行&#xff0c;前2行) idevic…

理解FPGA中的亚稳态

一、前言 大家应该经常能听说到亚稳态这个词&#xff0c;亚稳态主要是指触发器的输出在一段时间内不能达到一个确定的状态&#xff0c;过了这段时间触发器的输出随机选择输出0/1&#xff0c;这是我们在设计时需要避免的。本文主要讲述了FPGA中的亚稳态问题&#xff0c;可以帮助…