Rust整合Elasticsearch

Elasticsearch是什么

Lucene:Java实现的搜索引擎类库

  • 易扩展
  • 高性能
  • 仅限Java开发
  • 不支持水平扩展

Elasticsearch:基于Lucene开发的分布式搜索和分析引擎

  • 支持分布式、水平扩展
  • 提高RestfulAPI,可被任何语言调用

Elastic Stack是什么

ELK(Elastic Stack):Elasticsearch结合Kibana、Logstash、Beats实现日志数据分析、实时监控

  • Elasticsearch:负责存储、搜索、分析数据
  • Kibana:数据可视化
  • LogstashBeats:数据抓取(一般用Debezium、Flink、RisingWave…)

Elasticsearch能做什么

  • 实时数据分析:支持对实时数据进行索引和分析,可快速处理大量的日志、指标和事件数据
  • 实时监控:对系统指标、业务数据和用户行为进行实时监控
  • 电商搜索:为电商平台提供商品搜索功能,帮助用户快速找到所需的商品
  • 知识库搜索:为企业内部的文档、知识库和业务数据提供搜索功能,提高员工的工作效率

Elasticsearch 索引

传统数据库使用正向索引,依据id构建B+树,根据索引id查快,对于非索引文档如商品描述查需要全表扫描

倒排索引:将文档分为词条和id进行存储,先查文档获取id,再根据id查数据库

  • 文档(Document):每条数据就是一个Json文档
  • 词条(Term):文档按语义分成的词语

索引(Index):相同类型文档的集合
映射(Mapping):索引中的文档约束信息
字段(Fielf):Json文档中的字段
DSL:Json风格的请求语句,用来实现CRUD

Docker安装Elasticsearch、Kibana、IK

1、先创建自定义网络

使用默认bridge只能通过ip通信,这里加入了自定义网络,自定义网络可以自动解析容器名

  • docker network ls查看已有网络
  • 创建自定义网络docker network create pub-network
  • 手动连接网络docker network connect pub-network container_name_or_id
  • 删除网络docker network rm network_name_or_idid

2、创建文件夹

mkdir -p /opt/es/datamkdir -p /opt/es/pluginsmkdir -p /opt/es/logs

3、授权

chmod -R 777 /opt/es/datachmod -R 777 /opt/es/logs

安装IK分词器

由于ES对中文分词无法理解语义,需要IK插件
https://release.infinilabs.com/analysis-ik/stable/

Elasticsearch、Kibana、IK所有版本保持一致,解压后使用shell工具将整个文件夹上传到/opt/es/plugins

离线部署Elasticsearch、Kibana

在能访问的地方拉取镜像

docker pull elasticsearch:8.15.2docker pull kibana:8.15.2

这里使用wsl,wsl进入wsl,然后进入win的D盘

cd /mnt/d

打包镜像,这个文件可以在win D盘找到

docker save elasticsearch:8.15.2 > elasticsearch.tardocker save kibana:8.15.2 > kibana.tar

使用shell工具如Windterm上传文件

加载镜像

docker load -i elasticsearch.tardocker load -i kibana.tar

查看镜像

docker images

然后命令部署或者docker-compose部署即可

命令部署Elasticsearch、Kibana

部署Elasticsearch

docker run -d \
--name es \
--network pub-network \
--restart always \
-p 9200:9200 \
-p 9300:9300 \
-e "xpack.security.enabled=false" \
-e "discovery.type=single-node" \
-e "http.cors.enabled=true" \
-e "http.cors.allow-origin:*" \
-e "ES_JAVA_OPTS=-Xms512m -Xmx512m" \
-v /opt/es/data:/usr/share/elasticsearch/data \
-v /opt/es/plugins:/usr/share/elasticsearch/plugins \
-v /opt/es/logs:/usr/share/elasticsearch/logs \
--privileged=true \
elasticsearch:8.15.2

xpack.security.enabled=false禁用密码登录

如果要使用token: -e "xpack.security.enrollment.enabled=true" \

docker部署一般用于开发,不要为难自己,使用token会有很多问题,生产环境再开,使用SSl需要证书

部署Kibana

docker run -d \
--name kibana \
--network pub-network \
--restart always \
-p 5601:5601 \
-e CSP_STRICT=false \
-e I18N_LOCALE=zh-CN \
kibana:8.15.2

报错kibana 服务器尚未准备就绪,是因为配置了ELASTICSEARCH_HOSTS

docker-compose部署Elasticsearch、Kibana

  es:image: elasticsearch:8.15.2container_name: esnetwork_mode: pub-networkrestart: alwaysports:# 9200:对外暴露的端口- 9200:9200# 9300:节点间通信端口- 9300:9300environment:# 禁用密码登录xpack.security.enabled: 'false'# 单节点运行discovery.type: single-node# 允许跨域http.cors.enabled: 'true'# 允许所有访问http.cors.allow-origin: '*'# 堆内存大小ES_JAVA_OPTS: '-Xms512m -Xmx512m'volumes:# 数据挂载- /opt/es/data:/usr/share/elasticsearch/data# 插件挂载- /opt/es/plugins:/usr/share/elasticsearch/plugins# 日志挂载- /opt/es/logs:/usr/share/elasticsearch/logs# 允许root用户运行privileged: truekibana:image: kibana:8.15.2container_name: kibananetwork_mode: pub-networkrestart: alwaysports:- 5601:5601environment:# 禁用安全检查CSP_STRICT: 'false'# 设置中文I18N_LOCALE: zh-CN
networks:pub-network:name: pub-network

部署

docker-compose up -d

删除Elasticsearch、Kibana

docker rm -f esdocker rm -f kibana

开启安全配置(可选,如果要用密码和token)

es8开始需要密码访问,kibana通过token访问

# 生成密码
docker exec -it es /usr/share/elasticsearch/bin/elasticsearch-reset-password -u elastic
# 生成kibana访问token
docker exec -it es /usr/share/elasticsearch/bin/elasticsearch-create-enrollment-token -s kibana

访问Elasticsearch、Kibana

Elasticsearch:127.0.0.1:9200,看到以下界面就部署成功了

在这里插入图片描述

Kibana:127.0.0.1:5601看到以下界面就部署成功了

访问:http://127.0.0.1:9200/.kibana跨域查看有没有发现可视化工具kibana

在这里插入图片描述

我们选择手动配置,使用http://es:9200,我们没有配置ssl只能用http,容器名为es

在这里插入图片描述

在终端运行命令查看日志中的验证码

docker logs kibana

在这里插入图片描述

使用

在这里插入图片描述

GET /_analyze
{"analyzer": "ik_max_word","text": "好好学习天天向上"
}

如果一个字为一个词条,就说明分词插件IK没装好,重新安装后重启容器docker restart es

在这里插入图片描述

分词原理

依据字典进行分词

对于一些新词语,如铝合金键盘被称为“铝坨坨”,词典中没有这个词语,会将其逐字分词
在这里插入图片描述

分词流程

  • 1、character filters:字符过滤器,进行原始处理,如转换编码、去停用词、转小写
  • 2、tokenizer:分词器,将文本流进行分词为词条
  • 3、tokenizer filter:将词条进行进一步处理,如同义词处理、拼音处理

扩展词库

在IK插件config/IKAnalyzer.cfg.xml中添加

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd">
<properties><comment>IK Analyzer 扩展配置</comment><!--用户可以在这里配置自己的扩展字典 --><entry key="ext_dict">ext.dic</entry><!--用户可以在这里配置自己的扩展停止词字典--><entry key="ext_stopwords">stopword.dic</entry><!--用户可以在这里配置远程扩展字典 --><!-- <entry key="remote_ext_dict">words_location</entry> --><!--用户可以在这里配置远程扩展停止词字典--><!-- <entry key="remote_ext_stopwords">words_location</entry> -->
</properties>

停用词库

例如敏感词

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd">
<properties><comment>IK Analyzer 扩展配置</comment><!--用户可以在这里配置自己的扩展字典 --><entry key="ext_stopwords">stopword.dic</entry>
</properties>

使用

生产使用可以用AI、ELP进行分词

修改配置,添加扩展词库和停用词库

vim /opt/es/plugins/elasticsearch-analysis-ik-8.15.2/config/IKAnalyzer.cfg.xml

这里新建一个词库

touch /opt/es/plugins/elasticsearch-analysis-ik-8.15.2/config/ext.dic

编辑扩展词库

vim /opt/es/plugins/elasticsearch-analysis-ik-8.15.2/config/ext.dic

添加分词

铝坨坨

编辑停用词库

vim /opt/es/plugins/elasticsearch-analysis-ik-8.15.2/config/stopword.dic

添加

重启ES

docker restart es

测试分词

GET /_analyze
{"analyzer": "ik_max_word","text": "重重的铝坨坨"
}

可以看到扩展词库的“铝坨坨”被分词识别出来了,“的”没有被分词
在这里插入图片描述

分词作用

  • 创建倒排索引时对文档分词
  • 用户搜索时对输入的内容分词

IK分词模式

  • ik_smart:智能切分,粗粒度
  • ik_max_word:最细切分,细粒度

DSL 索引操作

  • 仅允许GET, PUT, DELETE, HEAD
  • mapping:对索引库中文档的约束,常见的属性有
    • type:字段数据类型
      • 字符串:text(可分词的文本)、keyword(不分词的精确值,合在一起有意义的词,如国家、品牌)
      • 数值:longintegershortbytedoublefloat
      • 布尔:boolean
      • 日期:date
      • 对象:object
    • index:是否创建倒排索引,默认true
    • analyzer:使用哪种分词器
    • properties:字段的子字段

添加索引库,每次写入操作版本都会+1,如添加(POST)、更新(PUT)

索引库mgr

PUT /mgr
{"mappings": {"properties": {"info": {"type": "text","analyzer": "ik_smart"},"email": {"type": "keyword","index": false},"name": {"type": "object","properties": {"firstName": {"type": "keyword"},"lastName": {"type": "keyword"}}}}}
}

查询索引库

GET /mgr

更新索引库(索引库禁止修改,因为索引库建立倒排索引后无法修改,只能添加新字段)

PUT /mgr/_mapping
{"properties":{"age":{"type":"integer"}}
}

删除索引库

DELETE /mgr

DSL文档操作

添加文档

索引库mgr/文档/文档id

POST /mgr/_doc/1
{"info": "铝坨坨键盘","email": "11111@gmail.com","name": {"firstName": "C","lastName": "I"}
}

查询文档

GET /mgr/_doc/1

更新文档

全量更新,删除旧文档,添加新文档

如果文档id不存在则与添加文档功能相同

PUT /mgr/_doc/1
{"info": "铝坨坨键盘","email": "222@gmail.com","name": {"firstName": "C","lastName": "I"}
}

增量更新(局部更新)

指定_update,指定文档doc

POST /mgr/_update/1
{"doc": {"email": "333@gmail.com"}
}

删除文档

DELETE /mgr/_doc/1

Rust客户端操作Elasticsearch

添加Cargo.toml

elasticsearch = "8.15.0-alpha.1"
# 序列化和反序列化数据
serde = { version = "1.0.127", features = ["derive"] }
# 序列化JSON
serde_json = "1.0.128"
tokio = { version = "1", features = ["full"] }
# 异步锁
once_cell = "1.20.2"

添加环境变量.env

# 指定当前配置文件
RUN_MODE=development

添加配置settings\development.toml

debug = true
# 指定开发环境配置
profile = "development"
[es]
host = "127.0.0.1"

获取配置config\es.rs

use serde::Deserialize;
#[derive(Debug, Deserialize, Clone)]
pub struct EsConfig {host: String,port: u16,
}
impl EsConfig {// 获取redis连接地址pub fn get_url(&self) -> String {format!("http://{host}:{port}", host = self.host, port = self.port)}
}

将配置存放到AppConfig

#[derive(Debug, Deserialize, Clone)]
pub struct AppConfig {pub es:EsConfig,
}
impl AppConfig {pub fn read(env_src: Environment) -> Result<Self, config::ConfigError> {// 获取配置文件目录let config_dir = get_settings_dir()?;info!("config_dir: {:#?}", config_dir);// 获取配置文件环境let run_mode = std::env::var("RUN_MODE").map(|env| Profile::from_str(&env).map_err(|e| ConfigError::Message(e.to_string()))).unwrap_or_else(|_e| Ok(Profile::Dev))?;// 当前配置文件名let profile_filename = format!("{run_mode}.toml");// 获取配置let config = config::Config::builder()// 添加默认配置.add_source(config::File::from(config_dir.join("default.toml")))// 添加自定义前缀配置.add_source(config::File::from(config_dir.join(profile_filename)))// 添加环境变量.add_source(env_src).build()?;info!("Successfully read config profile: {run_mode}.");// 反序列化config.try_deserialize()}
}
// 获取配置文件目录
pub fn get_settings_dir() -> Result<std::path::PathBuf, ConfigError> {Ok(get_project_root().map_err(|e| ConfigError::Message(e.to_string()))?.join("settings"))
}
#[cfg(test)]
mod tests {use crate::config::profile::Profile;use self::env::get_env_source;pub use super::*;#[test]pub fn test_profile_to_string() {// 设置dev模式let profile: Profile = Profile::try_from("development").unwrap();println!("profile: {:#?}", profile);assert_eq!(profile, Profile::Dev)}#[test]pub fn test_read_app_config_prefix() {// 读取配置let config = AppConfig::read(get_env_source("APP")).unwrap();println!("config: {:#?}", config);}
}

将配置存放到全局constant\mod.rs

// 环境变量前缀
pub const ENV_PREFIX: &str = "APP";
// 配置
pub static CONFIG: Lazy<crate::config::AppConfig> = Lazy::new(||crate::config::AppConfig::read(get_env_source(ENV_PREFIX)).unwrap()
);

加载配置文件client\builder.rs

use crate::config::AppConfig;
// 传输配置文件到客户端
pub trait ClientBuilder: Sized {fn build_from_config(config: &AppConfig) -> Result<Self,InfraError>;
}

Es客户端client\es.rs

InfraError为自定义错误,请修改为你想要的错误,如标准库错误

// 类型别名
pub type EsClient = Arc<Elasticsearch>;
// 加载配置文件
pub trait EsClientExt: Sized {fn build_from_config(config: &AppConfig) -> impl Future<Output = Result<Self, InfraError>>;
}impl EsClientExt for EsClient {async fn build_from_config(config: &AppConfig) -> Result<Self, InfraError> {// 1、使用single_node方式创建client// let transport = Transport::single_node(&config.es.get_url()).unwrap();// let client = Elasticsearch::new(transport);// Ok(Arc::new(client))// 2、使用builder方式创建client,可以添加多个urllet url = config.es.get_url();let url_parsed = url.parse::<elasticsearch::http::Url>().map_err(|_| InfraError::OtherError("url err".to_string()))?;let conn_pool = SingleNodeConnectionPool::new(url_parsed);let transport = TransportBuilder::new(conn_pool).disable_proxy().build().map_err(|_| InfraError::OtherError("transport err".to_string()))?;let client = Elasticsearch::new(transport);Ok(Arc::new(client))}
}

测试client\es.rs,所有请求在body()中定义DSL语句,通过send()发送

#[cfg(test)]
mod tests {use elasticsearch::{ cat::CatIndicesParts, DeleteParts, IndexParts, UpdateParts };use serde_json::json;use super::*;use crate::constant::CONFIG;#[tokio::test]async fn test_add_document() {let client_result = EsClient::build_from_config(&CONFIG).await;assert!(client_result.is_ok());let client = client_result.unwrap();let response = client.index(IndexParts::IndexId("mgr", "1")).body(json!({"id": 1,"user": "cci","post_date": "2024-01-15T00:00:00Z","message": "Trying out Elasticsearch, so far so good?"})).send().await;assert!(response.is_ok());let response = response.unwrap();assert!(response.status_code().is_success());}#[tokio::test]async fn test_get_indices() {let client_result = EsClient::build_from_config(&CONFIG).await;assert!(client_result.is_ok());let client = client_result.unwrap();let get_index_response = client.cat().indices(CatIndicesParts::Index(&["*"])).send().await;assert!(get_index_response.is_ok());}#[tokio::test]async fn test_update_document() {let client_result = EsClient::build_from_config(&CONFIG).await;assert!(client_result.is_ok());let client = client_result.unwrap();let update_response = client.update(UpdateParts::IndexId("mgr", "1")).body(json!({"doc": {"message": "Updated message"}})).send().await;assert!(update_response.is_ok());let update_response = update_response.unwrap();assert!(update_response.status_code().is_success());}#[tokio::test]async fn test_delete_document() {let client_result = EsClient::build_from_config(&CONFIG).await;assert!(client_result.is_ok());let client = client_result.unwrap();let delete_response = client.delete(DeleteParts::IndexId("mgr", "1")).send().await;assert!(delete_response.is_ok());let delete_response = delete_response.unwrap();assert!(delete_response.status_code().is_success());}
}

使用流程

        // 1、创建clientlet client_result = EsClient::build_from_config(&CONFIG).await;assert!(client_result.is_ok());let client = client_result.unwrap();// 2、定义DSL语句let mut body: Vec<JsonBody<_>> = Vec::with_capacity(4);// 添加文档body.push(json!({"index": {"_id": "1"}}).into());body.push(json!({"id": 1,"user": "kimchy","post_date": "2009-11-15T00:00:00Z","message": "Trying out Elasticsearch, so far so good?"
}).into());// 添加文档body.push(json!({"index": {"_id": "2"}}).into());body.push(json!({"id": 2,"user": "forloop","post_date": "2020-01-08T00:00:00Z","message": "Bulk indexing with the rust client, yeah!"
}).into());// 3、发送请求let response = client.bulk(BulkParts::Index("mgr")).body(body).send().await.unwrap();

项目地址:https://github.com/VCCICCV/MGR

分析数据结构

mapping要考虑的问题:字段名、数据类型、是否参与搜索(建立倒排索引"index":false,默认true)、是否分词(参与搜索的字段,text分词,keyword、数据类型不分词)、分词器

  • 地理坐标:
    • geo_point:由经度(longitude)和纬度(latitude)确定的一个点,如[ 13.400544, 52.530286 ]
    • geo_shape:由多个geo_point组成的几何图形,如一条线[[13.0, 53.0], [14.0, 52.0]]
  • copy_to:将多个字段组合为一个字段进行索引

Rust客户端操作索引库

生产环境不要使用unwrap()

这里演示在请求正文中操作,使用send()

Transport支持的方法Method

  • Get:获取资源
  • Put:创建或更新资源(全量更新)
  • Post:创建或更新资源(部分更新)
  • Delete:删除资源
  • Head:获取头信息

send()请求正文需要包含的参数:

  • method:必须
  • path:必须
  • headers:必须
  • query_string:可选
  • body:可选
  • timeout:可选

添加索引库

    #[tokio::test]async fn test_create_index() {// 1、创建clientlet client_result = EsClient::build_from_config(&CONFIG).await;assert!(client_result.is_ok());let client = client_result.unwrap();// 2、定义DSL语句let index_name = "mgr";let index_definition =json!({"mappings":{"properties":{"age":{"type":"integer"}}}});let body = Some(serde_json::to_vec(&index_definition).unwrap());let path = format!("/{}", index_name);let headers = HeaderMap::new();let query_string = None;let timeout = None;let method = Method::Put;// 3、发送请求let response = client.send::<Vec<u8>, ()>(method,&path,headers,query_string,body,timeout).await;assert!(response.is_ok());let response = response.unwrap();assert_eq!(response.status_code().is_success(), true);}

你也可以将其简化

    #[tokio::test]async fn test_create_index() {// 1、创建clientlet client = EsClient::build_from_config(&CONFIG).await.unwrap();// 2、定义DSLlet index_definition =json!({"mappings":{"properties":{"age":{"type":"integer"}}}});// 3、发送请求let response = client.send::<Vec<u8>, ()>(Method::Put,format!("/mgr").as_str(),HeaderMap::new(),None,Some(index_definition.to_string().as_bytes().to_vec()),None).await;assert!(response.is_ok());let response = response.unwrap();assert_eq!(response.status_code().is_success(), true);}

查询索引库是否存在

    #[tokio::test]async fn test_query_index() {// 1、创建 clientlet client = EsClient::build_from_config(&CONFIG).await.unwrap();// 2、定义查询 DSL 语句let query = json!({"query": {"match_all": {}}});// 3、发送请求let response = client.send::<Vec<u8>, ()>(Method::Get,format!("/mgr/_search").as_str(),HeaderMap::new(),None,Some(query.to_string().as_bytes().to_vec()),None).await;assert!(response.is_ok());let response = response.unwrap();println!("{:?}", response);assert_eq!(response.status_code().is_success(), true);}

也可以不定义DSL查询

    #[tokio::test]async fn test_query_index2() {// 1、创建 clientlet client = EsClient::build_from_config(&CONFIG).await.unwrap();// 2、发送请求let response = client.send::<Vec<u8>, ()>(Method::Get,format!("/mgr").as_str(),HeaderMap::new(),None,None,None).await;assert!(response.is_ok());let response = response.unwrap();println!("{:?}", response);assert_eq!(response.status_code().is_success(), true);}

更新索引库

    #[tokio::test]async fn test_update_index() {// 1、创建 clientlet client = EsClient::build_from_config(&CONFIG).await.unwrap();// 2、定义查询 DSL 语句let update_content = json!({"properties":{"age":{"type":"integer"}}});// 3、发送请求let response = client.send::<Vec<u8>, ()>(Method::Put,format!("/mgr/_mapping").as_str(),HeaderMap::new(),None,Some(update_content.to_string().as_bytes().to_vec()),None).await;assert!(response.is_ok());let response = response.unwrap();println!("{:?}", response);assert_eq!(response.status_code().is_success(), true);}

删除索引库

    #[tokio::test]async fn test_delete_index() {// 1、创建 clientlet client = EsClient::build_from_config(&CONFIG).await.unwrap();// 2、发送请求let response = client.send::<(), ()>(Method::Delete,format!("/mgr").as_str(),HeaderMap::new(),None,None,None).await;assert!(response.is_ok());let response = response.unwrap();assert_eq!(response.status_code().is_success(), true);}

Rust客户端操作文档

添加文档

    #[tokio::test]async fn test_create_doc() {// 1、创建 clientlet client = EsClient::build_from_config(&CONFIG).await.unwrap();// 2、定义查询 DSL 语句let doc_content =json!({"id": "1","user": "kimchy","post_date": "2009-11-15T00:00:00Z","message": "Trying out Elasticsearch, so far so good?"});// 3、发送请求let response = client.send::<Vec<u8>, ()>(Method::Post,format!("/mgr/_doc/1").as_str(),HeaderMap::new(),None,Some(doc_content.to_string().as_bytes().to_vec()),None).await;assert!(response.is_ok());let response = response.unwrap();println!("{:?}", response);assert_eq!(response.status_code().is_success(), true);}

查询文档是否存在

    #[tokio::test]async fn test_get_doc() {// 1、创建 clientlet client = EsClient::build_from_config(&CONFIG).await.unwrap();// 2、发送请求let response = client.send::<Vec<u8>, ()>(Method::Get,format!("/mgr/_doc/1").as_str(),HeaderMap::new(),None,None,None).await;assert!(response.is_ok());let response = response.unwrap();println!("{:?}", response);assert_eq!(response.status_code().is_success(), true);}

更新文档

    #[tokio::test]async fn test_update_doc() {// 1、创建 clientlet client = EsClient::build_from_config(&CONFIG).await.unwrap();// 2、定义查询 DSL 语句let doc_content =json!({"doc": {"message": "Updated message"}});// 3、发送请求let response = client.send::<Vec<u8>, ()>(Method::Post,format!("/mgr/_update/1").as_str(),HeaderMap::new(),None,Some(doc_content.to_string().as_bytes().to_vec()),None).await;assert!(response.is_ok());let response = response.unwrap();println!("{:?}", response);assert_eq!(response.status_code().is_success(), true);}

删除文档

    #[tokio::test]async fn test_delete_doc() {// 1、创建 clientlet client = EsClient::build_from_config(&CONFIG).await.unwrap();// 2、发送请求let response = client.send::<Vec<u8>, ()>(Method::Delete,format!("/mgr/_doc/1").as_str(),HeaderMap::new(),None,None,None).await;assert!(response.is_ok());let response = response.unwrap();println!("{:?}", response);assert_eq!(response.status_code().is_success(), true);}

批量添加文档

    #[tokio::test]async fn test_bulk_add_to_mgr() {// 1、创建clientlet client_result = EsClient::build_from_config(&CONFIG).await;assert!(client_result.is_ok());let client = client_result.unwrap();// 2、定义DSL语句let mut body: Vec<JsonBody<_>> = Vec::with_capacity(4);// 添加第一个操作和文档body.push(json!({"index": {"_id": "1"}}).into());body.push(json!({"id": 1,"user": "kimchy","post_date": "2009-11-15T00:00:00Z","message": "Trying out Elasticsearch, so far so good?"
}).into());// 添加第二个操作和文档body.push(json!({"index": {"_id": "2"}}).into());body.push(json!({"id": 2,"user": "forloop","post_date": "2020-01-08T00:00:00Z","message": "Bulk indexing with the rust client, yeah!"
}).into());// 3、发送请求let response = client.bulk(BulkParts::Index("mgr")).body(body).send().await.unwrap();assert!(response.status_code().is_success());}

Rust客户端操作搜索

这里演示在请求体body中进行API调用

  • 查询所有:查出所有数据
  • 全文检索查询(full text):利用分词器对内容分词,从倒排索引库中查询
    • match_query
    • multi_match_query
  • 精确查询:根据精确值查询,如integer、keyword、日期
    • id
    • range:根据值的范围查询
    • term:根据词条精确值查询
  • 地理坐标查询(geo):根据经纬度查询
    • geo_distance:查询geo_point指定距离范围内的所有文档
    • geo_bounding_box:查询geo_point值落在某个矩形范围内的所有文档
  • 复合查询(compound):将上述条件组合起来

查询所有

默认10条

    #[tokio::test]async fn test_search_match_all() {// 1、创建 clientlet client = EsClient::build_from_config(&CONFIG).await.unwrap();// 2. 执行搜索let response = client.search(SearchParts::Index(&["mgr"])).from(0).size(5).body(json!({"query": {"match_all": {}}})).send().await.unwrap();// 3. 解析响应let response_body = response.json::<Value>().await.unwrap();// 搜索耗时let took = response_body["took"].as_i64().unwrap();println!("took: {}ms", took);// 搜索结果for hit in response_body["hits"]["hits"].as_array().unwrap() {println!("{:?}", hit["_source"]);}}

等价于

GET /mgr/_search
{"query": {"match_all": {}}
}

全文搜索

message为文档中的字段

    #[tokio::test]async fn test_search_match() {// 1、创建 clientlet client = EsClient::build_from_config(&CONFIG).await.unwrap();// 2. 执行搜索let response = client.search(SearchParts::Index(&["mgr"])).from(0).size(5).body(json!({"query": {"match": {"message": "good"}}})).send().await.unwrap();// 3. 解析响应let response_body = response.json::<Value>().await.unwrap();// 搜索耗时let took = response_body["took"].as_i64().unwrap();println!("took: {}ms", took);// 搜索结果for hit in response_body["hits"]["hits"].as_array().unwrap() {println!("{:?}", hit["_source"]);}}

相当于

GET /mgr/_search
{"query": {"match": {"message": "good"}}
}

多字段查询

多字段查询效率低,一般在创建时使用copy_to到一个字段中

    #[tokio::test]async fn test_search_multi_match() {// 1、创建 clientlet client = EsClient::build_from_config(&CONFIG).await.unwrap();// 2. 执行搜索let response = client.search(SearchParts::Index(&["mgr"])).from(0).size(5).body(json!({"query": {"multi_match": {"query": "good","fields": ["message","user"]}}})).send().await.unwrap();// 3. 解析响应let response_body = response.json::<Value>().await.unwrap();// 搜索耗时let took = response_body["took"].as_i64().unwrap();println!("took: {}ms", took);// 搜索结果for hit in response_body["hits"]["hits"].as_array().unwrap() {println!("{:?}", hit["_source"]);}}

相当于

GET /mgr/_search
{"query": {"multi_match": {"query": "good","fields": ["message","user"]}}
}

根据范围查询(range)

gte大于等于,lte小于等于;gt大于lt小于

    #[tokio::test]async fn test_search_range() {// 1、创建 clientlet client = EsClient::build_from_config(&CONFIG).await.unwrap();// 2. 执行搜索let response = client.search(SearchParts::Index(&["mgr"])).from(0).size(5).body(json!({"query": {"range": {"id": {"gte": 1,"lte": 1}}}})).send().await.unwrap();// 3. 解析响应let response_body = response.json::<Value>().await.unwrap();// 搜索耗时let took = response_body["took"].as_i64().unwrap();println!("took: {}ms", took);// 搜索结果for hit in response_body["hits"]["hits"].as_array().unwrap() {println!("{:?}", hit["_source"]);}}

相当于

GET /mgr/_search
{"query": {"range": {"id": {"gte": 1,"lte": 1}}}
}

根据词条精确查询(term)

    #[tokio::test]async fn test_search_term() {// 1、创建 clientlet client = EsClient::build_from_config(&CONFIG).await.unwrap();// 2. 执行搜索let response = client.search(SearchParts::Index(&["mgr"])).from(0).size(5).body(json!({"query": {"term": {"user": "kimchy"}}})).send().await.unwrap();// 3. 解析响应let response_body = response.json::<Value>().await.unwrap();// 搜索耗时let took = response_body["took"].as_i64().unwrap();println!("took: {}ms", took);// 搜索结果for hit in response_body["hits"]["hits"].as_array().unwrap() {println!("{:?}", hit["_source"]);}}

相当于

GET /mgr/_search
{"query": {"term": {"user": "kimchy"}}
}

根据地理距离查询

GET /mgr/_search
{"query": {"geo_distance": {"distance": "100km","location": "31.04, 45.12"}}
}

根据指定矩形范围查询

左上经纬度与右下经纬度
geo为文档中的字段

GET /mgr/_search
{"query": {"geo_bounding_box": {"geo": {"top_left": {"lon": 124.45,"lat": 32.11},"bottom_right": {"lon": 125.12,"lat": 30.21}}}}
}

复合查询

查询时文档会对搜索词条的关联度打分_score,返回结果时按照降序排列

关联度计算方法

  • TF-IDF算法(ES5.0之前)

TF(词条频率)= 词条出现次数/文档中词条总数

IDF(逆文档频率)=log(文档总数/包含词条的文档总数)

score = ∑(𝑖=1,𝑛)(TF*IDF):将词条频率与逆文档频率相乘再求和

  • BM25算法(ES5.0之后)

默认采用BM25算法:考虑了TF、IDF、文档长度等因素,能够平衡长短文的关联度

在这里插入图片描述

function_score修改关联度

指定文档和算分函数

GET /mgr/_search
{"query": {"function_score": {"query": {"match": {// 查询方法"message": "good"}},"functions": [ // 算分函数{"filter": {// 只有符合过滤条件的才被计算"term": {// 根据词条精确查询"id": 1}},"weight": 3 // 指定加权函数}],// 加权模式:相乘"boost_mode": "multiply"}}
}
  • weight:给定常量值,还可以指定以下值

  • field_value_factor:用文档中的指定字段值作为函数结果

  • random_score:随机生成一个值

  • script_score:自定义计算公式

  • boost_mode:加权模式,multiply与原来的_score相乘,还可以配置:

  • replace:替换原来的_score

  • sum:求和

  • avg:取平均值

  • min:取最小值

  • max:取最大值
    相当于

    #[tokio::test]async fn test_function_score_query() {// 1、创建 clientlet client = EsClient::build_from_config(&CONFIG).await.unwrap();// 2. 执行搜索let response = client.search(SearchParts::Index(&["mgr"])).from(0).size(5).body(json!({"query": {"function_score": {"query": {"match": {// 查询方法"message": "good"}},"functions": [ // 算分函数{"filter": {// 只有符合过滤条件的才被计算"term": {// 根据词条精确查询"id": 1}},"weight": 3 // 指定加权函数}],// 加权模式:相乘"boost_mode": "multiply"}}})).send().await.unwrap();// 3. 解析响应let response_body = response.json::<Value>().await.unwrap();// 搜索耗时let took = response_body["took"].as_i64().unwrap();println!("took: {}ms", took);// 搜索结果for hit in response_body["hits"]["hits"].as_array().unwrap() {println!("{:?}", hit["_source"]);}}

boolean query 布尔查询

布尔查询是一个或多个子句查询的组合,组合方式有

  • must:必须匹配每个子查询,类似于“与”
  • should:选择性匹配子查询,类似于“或”
  • must_not:必须不匹配,不参与算分,类似于“非”
  • filter:必须匹配,

查询message中包含rust,post_date不小于2020年1月1日的文档

GET /mgr/_search
{"query": {"bool": {"must": [{"match_phrase": {"message": "rust"}}],"must_not": [{"range": {"post_date": {"lt": "2020-01-01T00:00:00Z"}}}]}}
}

搜索结果处理

排序

GET /mgr/_search
{"query": {"match_all": {}},"sort": [{"id": "desc"// ASC升序,DESC降序}]
}

地理位置排序

GET /mgr/_search
{"query": {"match_all": {}},"sort": [{"_geo_distance":{"FIELD": {"lat": 40,// 纬度"lon": -70// 经度},"order":"asc",// 排序方式"unit":"km" // 单位}}]
}

分页

1、from+size分页查询(默认10条数据)

GET /mgr/_search
{"query": {"match_all": {}},"from":1,// 分页开始位置"size":10,// 期望获取的文档总数"sort": [{"id": "desc"// ASC升序,DESC降序}]
}

深度分页问题:一般将ES作为分布式部署,当需要"from"=990,"size"=10查数据时:
1、先从每个数据分片上查询前1000条数据
2、将所有节点的结果聚合,在内存中重新排序选出前1000条文档
3、在这1000条文档中选取"from"=990,"size"=10的数据

如果搜索页数过深,或者结果集(from+size)越大,对内存和CPU的消耗越高,因此ES设定的查询上限是10000

深度分页解决方案:

2、search after分页查询:分页时排序,从上一次的排序值开始查询下一页文档(只能向后查询)

3、scroll分页查询:将排序数据形成快照,保存在内存中(内存消耗大,官方不推荐)

高亮处理

搜索键盘时关键字高亮
在这里插入图片描述

highlight指定高亮字段

默认搜索字段和高亮字段匹配才高亮

GET /mgr/_search
{"query": {"match": {"message":"rust"// 搜索message中包含rust的文档}},"highlight":{"fields":{"message":{// 指定高亮字段"require_field_match":"false"// 搜索字段和高亮字段可以不匹配}}}
}

数据聚合

聚合(aggregations)可以实现对文档数据的统计、分析、运算,聚合分类:

  • 桶(Buket):用来对数据分组
    • https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket.html
    • TermAggregation:按文档字段或词条值分组
    • Date Histogram:按日期阶梯分组,如一周为一组
  • 度量(Metric):用于计算一些值,如最大值、最小值、平均值
    • https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics.html
    • Avg:求平均值
    • Max:求最大值
    • Min:求最小值
    • Sum:求和
    • Stats:同时求Max、Min、Avg、Sum等
  • 管道(pipeline):以其他聚合的结果作为聚合的基础
    • https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-pipeline.html

桶(Buket)

Buket默认统计其中的文档数量_count,并且按照降序排序

GET /mgr/_search
{"size":0,// 文档大小,结果不包含文档,只包含聚合结果"aggs": {//指定聚合"idAgg": {// 聚合名"terms": {// 精确查询"field":"id",// 指定字段"order":{"_count":"asc"// 按升序排序}}}}
}

度量(Metric)

GET /mgr/_search
{"size":0,// 文档大小,结果不包含文档,只包含聚合结果"aggs": {//指定聚合"idAgg": {// 聚合名"terms": {// 精确查询"field":"id",// 指定字段"size":20},"aggs":{// 子聚合"score_stats":{// 聚合名"max":{//聚合类型,min、max、avg等"field":"score"// 聚合字段}}}}}
}

自动补全

拼音补全

如果你想要通过拼音补全,请下载解压拼音分词器上传到/opt/es/plugins目录然后重启es
https://github.com/infinilabs/analysis-pinyin/releases

  • 补全字段必须是completion类型
  • 拼音分词需要自定义分词器

进行拼音分词:创建索引并设置字段类型为completion,同时指定先分词再根据词条过滤(如果不自定义分词器,默认将每个汉字单独分为拼音,所以先分词词条再进行拼音处理),其他设置见github仓库

PUT /test
{"settings": {// 设置"analysis": {"analyzer": {// 设置分词器"my_analyzer": {// 分词器名"filters": ["lowercase",// 转小写"stop"// 去停用词],"tokenizer": "ik_max_word", // 分词器"filter": "py" // 过滤时进行拼音}}},"filter": { // 自定义tokenizer filter"py": { // 过滤器名称"type": "pinyin", // 过滤器类型,这里是pinyin"keep_full_pinyin": false,// 是否保留完整的拼音形式"keep_joined_full_pinyin": true,// 是否保留连接起来的完整拼音形式"keep_original": true,// 是否保留原始的文本内容"limit_first_letter_length": 16,// 限制拼音首字母的长度为 16"remove_duplicated_term": true,// 是否移除重复的词条"none_chinese_pinyin_tokenize": false// 不对非中文字符进行拼音分词}}},"mappings": {"properties": {"user": {"type": "completion"}}}
}

不进行拼音分词:创建索引并设置字段类型为completion

PUT /test
{"mappings": {"properties": {"user": {"type": "completion"}}}
}

添加文档

POST /test/_doc/1
{"id": 1,"message": "Trying out Elasticsearch, so far so good?","post_date": "2009-11-15T00:00:00Z","user": "kimchy"
}

根据关键字查询补全

GET /test/_search
{"suggest": {"YOUR_SUGGESTION": {// 指定自动补全查询名字"text": "k",// 关键字前缀"completion": {// 自动补全类型"field": "user",// 补全字段"skip_duplicates": true,// 是否跳过重复的建议"size": 10 // 获取前10条结果}}}
}

所有代码地址:https://github.com/VCCICCV/MGR/blob/main/auth/infrastructure/src/client/es.rs

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/462355.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

opencv-day2-图像预处理1

图像预处理 在计算机视觉和图像处理领域&#xff0c;图像预处理能够提高后续处理&#xff08;如特征提取、目标检测等&#xff09;的准确性和效率。 常见的图像预处理操作&#xff1a; 图像色彩空间转换 图像大小调整 图像仿射变换 图像翻转 图像裁剪 图像二值化处理 图…

scrapy爬取名人名言

爬取名人名言&#xff1a;http://quotes.toscrape.com/ 1 创建爬虫项目&#xff0c;在终端中输入&#xff1a; scrapy startproject quotes2 创建之后&#xff0c;在spiders文件夹下面创建爬虫文件quotes.py&#xff0c;内容如下&#xff1a; import scrapy from scrapy.spi…

一文了解Linux内核I2C子系统,驱动苹果MFI加密芯片

版本 日期 作者 变更表述 1.0 2024/10/27 于忠军 文档创建 背景&#xff1a;由于苹果有一套MFI IAP2的蓝牙私有协议&#xff0c;这个协议是基于BR/EDR的RFCOMM自定义UUID来实现IAP2协议的通信&#xff0c;中间会牵扯到苹果加密芯片的I2C读取&#xff0c;所以我们借此机…

Spring之依赖注入(DI)和控制反转(IoC)——配置文件、纯注解

依赖注入 依赖注入(Dependency Injection&#xff0c;简称 DI)与控制反转(loC)的含义相同&#xff0c;只不过这两 个称呼是从两个角度描述的同一个概念。对于一个 Spring 初学者来说&#xff0c;这两种称呼很难理解, 下面我们将通过简单的语言来描述这两个概念。 当Java对象&…

Ubuntu 22.04安装部署

一、部署环境 表 1‑1 环境服务版本号系统Ubuntu22.04 server lts运行环境1JDK1.8前端WEBNginx1.8数据库postgresqlpostgresql13postgis3.1pgrouting3.1消息队列rabbitmq3.X(3.0以上)运行环境2erlang23.3.3.1 二、安装系统 2.1安装 1.安装方式&#xff0c;选第一条。 2.选择…

基于ResNet50模型的船型识别与分类系统研究

项目源码获取方式见文章末尾&#xff01; 600多个深度学习项目资料&#xff0c;快来加入社群一起学习吧。 《------往期经典推荐------》 项目名称 1.【LSTM模型实现光伏发电功率的预测】 2.【卫星图像道路检测DeepLabV3Plus模型】 3.【GAN模型实现二次元头像生成】 4.【CNN模…

信息学科平台系统开发:基于Spring Boot的最佳实践

3系统分析 3.1可行性分析 通过对本基于保密信息学科平台系统实行的目的初步调查和分析&#xff0c;提出可行性方案并对其一一进行论证。我们在这里主要从技术可行性、经济可行性、操作可行性等方面进行分析。 3.1.1技术可行性 本基于保密信息学科平台系统采用Spring Boot框架&a…

Cityscapes数据集:如何将像素级的多边形标注的分割数据标注转为目标检测的bbox标注

Cityscapes数据集官网下载地址&#xff1a; https://www.cityscapes-dataset.com/ 相关介绍&#xff1a;从官网下载这三个压缩包文件leftImg8bit_trainvaltest.zip、gtCoarse.zip、gtFine_trainvaltest.zip 1&#xff09;leftImg8bit_trainvaltest.zip分为train、val以及tes…

【周末推荐】Windows无缝连接iPhone

关注“ONE生产力”&#xff0c;获取更多精彩推荐&#xff01; 又到了周末推荐时间了&#xff0c;今天我们介绍一个Windows内置的功能&#xff0c;能够帮助大家将自己的电脑和iPhone连接在一起。 很多用Windows的小伙伴羡慕macOS可以和iPhone无缝连接&#xff0c;轻松阅读和回…

JDBC/ODBC—数据库连接API概述

JDBC/ODBC概述 在数据库连接领域&#xff0c;有两种广泛使用的技术&#xff1a;ODBC&#xff08;Open Database Connectivity - 开放数据库连接&#xff09;和 JDBC&#xff08;Java Database Connectivity - Java 数据库连接&#xff09;。 一、什么是 ODBC&#xff1f; Ope…

Vagrant使用教程:创建CentOS 8虚拟机

目录 简介准备工作下载配置Vagrant修改环境变量创建VAGRANT_HOME环境变量修改virturalBox新建虚拟机文件的默认生成路径修改Vagrant配置支持VirtualBox7.1.x版本创建Vagrant文件添加镜像 初始化并开机初始化开发环境开机 其他配置项宿主机的交换目录修改虚拟机内存修改 访问方式…

使用Django Channels实现WebSocket实时通信

&#x1f493; 博客主页&#xff1a;瑕疵的CSDN主页 &#x1f4dd; Gitee主页&#xff1a;瑕疵的gitee主页 ⏩ 文章专栏&#xff1a;《热点资讯》 使用Django Channels实现WebSocket实时通信 Django Channels 简介 环境搭建 安装 Django 和 Channels 创建 Django 项目 配置 A…

【JAVA 笔记】11 ch08_opp_intermediate 第8章 面向对象编程(中级部分)

第8章 面向对象编程(中级部分) IDEA 常用快捷键 包 包的三大作用 包基本语法 包的本质分析 包的命名 常用的包 如何引入包 注意事项和使用细节 访问修饰符 基本介绍 访问修饰符的访问范围! 使用的注意事项 面向对象编程三大特征 基本介绍 封装介绍 封装的理解和好处 封装的实现…

面试题:JVM(四)

new对象流程&#xff1f;&#xff08;龙湖地产&#xff09; 对象创建方法&#xff0c;对象的内存分配。&#xff08;360安全&#xff09; 1. 对象实例化 创建对象的方式有几种&#xff1f; 创建对象的步骤 指针碰撞&#xff1a;以指针为分界线&#xff0c;一边是已连续使用的…

无人机螺旋桨动平衡分析测试台

产品简介 Flight Stand系列动力测试台全部支持螺旋桨动平衡分析测试功能&#xff0c;用户仅需几个简单的操作步骤&#xff0c;轻松实现电机和螺旋桨ISO 21940-11:2016标准级的动平衡精度。 功能说明 测试台一体化集成有三坐标振动传感器和转速传感器&#xff0c;通过测量动力…

qt QTextEdit详解

QTextEdit是Qt框架中的一个文本编辑控件类&#xff0c;它提供了丰富的功能用于编辑和显示纯文本以及富文本。 重要方法 setPlainText(const QString &text)&#xff1a;设置纯文本内容。toPlainText()&#xff1a;获取纯文本内容。setHtml(const QString &text)&#…

杂项——USB键盘与鼠标流量分析——BUUCTF——流量分析

第一次做USB键盘与鼠标流量分析的题目&#xff0c;现在来好好做一个总结 1. 基础知识 USB流量指的是USB设备接口的流量&#xff0c;攻击者能够通过监听usb接口流量获取键盘敲击键、鼠标移动与点击、存储设备的铭文传输通信、USB无线网卡网络传输内容等等。 在正式介绍 USB H…

Windows部署rabbitmq

本次安装环境&#xff1a; 系统&#xff1a;Windows 11 软件建议版本&#xff1a; erlang OPT 26.0.2rabbitmq 3.12.4 一、下载 1.1 下载erlang 官网下载地址&#xff1a; 1.2 下载rabbitmq 官网下载地址&#xff1a; 建议使用解压版&#xff0c;安装版可能会在安装软件…

HTML静态网页成品作业(HTML+CSS)——自行车介绍网页设计制作(1个页面)

&#x1f389;不定期分享源码&#xff0c;关注不丢失哦 文章目录 一、作品介绍二、作品演示三、代码目录四、网站代码HTML部分代码CSS部分代码 五、源码获取 一、作品介绍 &#x1f3f7;️本套采用HTMLCSS&#xff0c;未使用Javacsript代码&#xff0c;共有1个页面。 二、作品…

工厂电气及PLC【1章各种元件符号】

交流接触器的线圈通电后&#xff0c;线圈电流会产生磁场&#xff0c;衔铁在磁吸引力作用下带动触点动作&#xff1a;常开的主触点闭合&#xff0c;接通主电路&#xff1b;同时&#xff0c;常开的辅助触点闭合&#xff0c;常闭的辅助触点断开。当线圈失电或电压显著降低时&#…