文章目录
- 现有问题
- Elastic Stack介绍(一套技术栈)
- 安装ES
- 安装Kibana
- Elasticsearch概念
- 倒排索引
- Mapping
- 分词器
- IK分词器(ES插件)
- 打分机制
- ES的几种调用方式
- restful api调用(http 请求)
- kibana devtools
- 客户端调用
- ES的语法
- DSL
- EQL
- SQL
- Java 操作 ES
- 1)ES 官方的Java API
- 2)ES 以前的官方Java API ,HighLevelRestClient(已废弃,不建议用)
- 3)Spring Data Elasticsearch(推荐)
- 建表(建立索引)
- 增删改查(java使用es步骤,重要!)
- 数据同步
- 方式
- (1)Logstash
- (2)MySQL 同步给 Elasticsearch
- (3) 订阅数据库流水的同步方式 Canal(未实现)
现有问题
搜索不够灵活,比如搜“无敌鸭小黑子”无法搜到“无敌鸭是小黑子”,因为MySQL的like是包含查询,需要分词搜索
Elastic Stack介绍(一套技术栈)
官网:https://www.elastic.co/cn/
各组件介绍:
- beats套件:从各种不同类型的文件、应用中采集数据
- logstash:从多个采集器或数据源来抽取,转换数据,向es输送
- elasticsearch:存储,查询数据
- kibana:可视化es的数据
安装ES
elastic search
介绍:https://www.elastic.co/guide/en/elasticsearch/reference/7.17/setup.html
安装:https://www.elastic.co/guide/en/elasticsearch/reference/7.17/zip-windows.html
注意,一套技术的版本都需要一致,这里都用7.17版本
启动:打开bin/elasticsearch.bat
安装Kibana
介绍:https://www.elastic.co/guide/en/kibana/7.17/introduction.html
安装:https://www.elastic.co/guide/en/kibana/7.17/install.html
启动:bin\kibana.bat,启动后cmd窗口会显示访问地址的
Elasticsearch概念
当成MySQL一样的数据库去学习和理解
倒排索引
正向索引:书籍的目录
倒排索引:根据内容找到文章
文章 A:你好,我是 rapper
文章 B:鱼皮你好,我是 coder
切词:
你好,我是,rapper
鱼皮,你好,我是,coder
构建倒排索引表:
词 | 内容 id |
---|---|
你好 | 文章 A,B |
我是 | 文章 A,B |
rapper | 文章 A |
鱼皮 | 文章 B |
coder | 文章 B |
用户搜:“鱼皮 rapper”
ES 先切词:鱼皮,rapper
然后去倒排索引表找对应的文章
Mapping
文档:https://www.elastic.co/guide/en/elasticsearch/reference/7.17/explicit-mapping.html
可以理解为数据库的表结构,有哪些字段,字段类型
ES支持动态mapping,表结构可以动态改变,而不像MySQL一样没有的字段就不能插入
创建mapping:
GET user/_mappingPUT user
{"mappings": {"properties": {"age": { "type": "integer" }, "email": { "type": "keyword" }, "name": { "type": "text" } }}
}
分词器
指定了分词的规则
内置分词器:https://www.elastic.co/guide/en/elasticsearch/reference/7.17/analysis-analyzers.html
分类:
1)whitespace空格分词器,返回结果 The、quick、brown、fox.
POST _analyze
{"analyzer": "whitespace","text": "The quick brown fox."
}
2)标准分词规则standard,返回结果:is、this、deja、vu
POST _analyze
{"tokenizer": "standard","filter": [ "lowercase", "asciifolding" ],"text": "Is this déja vu?"
}
3)关键词分词器:就是不分词,整句话当关键词,返回结果The quick brown fox.
POST _analyze
{"analyzer": "keyword","text": "The quick brown fox."
}
IK分词器(ES插件)
官网:https://github.com/medcl/elasticsearch-analysis-ik
下载地址:https://github.com/medcl/elasticsearch-analysis-ik/releases/tag/v7.17.7(注意版本一致)
优点:中文友好
ik_smart和ik_max_word的区别?举例:“小黑子”
ik_smart是智能分词,尽量选择最像一个词的拆分方式,比如"小",“黑子”
ik_max_word尽可能地分词,可以包括组合词,比如:“小黑”,“黑子”
使用步骤
- 在 elasticsearch-7.17.9 目录下新建 plugins 目录
- 在 plugins 目录下新建 ik 目录
- 将 elasticsearch-analysis-ik-7.17.7 目录中的所有内容移到 ik 目录下
- 重启es
报错
解决方案:https://github.com/medcl/elasticsearch-analysis-ik/issues/996
下载相近的版本,解压后修改plugin-descriptor.properties文件里面的elasticsearch.version就可以。
下面这两部分改为自己es的版本
安装成功
打分机制
比如有 3 条内容:
1)鱼皮是狗
2)鱼皮是小黑子
3)我是小黑子
用户搜索:
1)鱼皮,第一条分数最高,因为第一条匹配了关键词,而且更短(匹配比例更大)
2)鱼皮小黑子 => 鱼皮、小、黑子,排序结果:2 > 3 > 1
参考文章:https://liyupi.blog.csdn.net/article/details/119176943
官方参考文章:https://www.elastic.co/guide/en/elasticsearch/guide/master/controlling-relevance.html
ES的几种调用方式
restful api调用(http 请求)
GET请求:http://localhost:9200/
curl可以模拟发送请求:curl -X GET “localhost:9200/?pretty”
ES的启动端口
- 9200:给外部用户(给客户端调用)的端口
- 9300:给ES集群内部通信的(外部调用不了的)端口
kibana devtools
自由地对ES进行操作(本质也是 restful api)
devtools不建议生产环境使用
客户端调用
java客户端等
参考文档:https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/7.17/_getting_started.html
ES的语法
DSL
DSL语法:https://www.elastic.co/guide/en/elasticsearch/reference/7.17/query-dsl.html
json格式,好理解,和http请求最兼容,应用最广,推荐!
建表,插入数据
POST post/_doc
{"title": "无敌鸭","desc": "无敌鸭描述"
}
查询全部
GET post/_search
{"query": {"match_all": { }}
}
根据id查询
GET post/_doc/GPaBZ48BMmF6cg2s9YSL
修改
POST post/_doc/GPaBZ48BMmF6cg2s9YSL
{"title": "无敌鸭修改","desc": "无敌鸭修改描述"
}
删除
删除普通索引
(注:es中的索引相当于MySQL中的表)
DELETE index_name
删除数据流式索引
DELETE _data_stream/logs-my_app-default
EQL
专门查询 ECS 文档(标准指标文档)的数据的语法,更加规范,但只适用于特定场景(比如事件流)
文档:https://www.elastic.co/guide/en/elasticsearch/reference/7.17/eql.html
示例:
POST my_event/_doc
{"title": "鱼333333皮","@timestamp": "2099-05-06T16:21:15.000Z","event": {"original": "192.0.2.42 - - [06/May/2099:16:21:15 +0000] \"GET /images/bg.jpg HTTP/1.0\" 200 24736"}
}GET my_event/_eql/search
{"query": """any where 1 == 1"""
}
SQL
文档:https://www.elastic.co/guide/en/elasticsearch/reference/7.17/sql-getting-started.html
学习成本低,但是可能需要插件支持、性能较差
示例:
POST /_sql?format=txt
{"query": "SELECT * FROM post where title like '%鱼皮%'"
}
Painless Scripting language
编程取值,更灵活,但是学习成本高
Java 操作 ES
三种方式
1)ES 官方的Java API
https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/7.17/introduction.html
快速开始:https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/7.17/connecting.html
2)ES 以前的官方Java API ,HighLevelRestClient(已废弃,不建议用)
3)Spring Data Elasticsearch(推荐)
spring-data系列:spring 提供的操作数据的框架
spring-data-redis:操作 redis 的一套方法
spring-data-mongodb:操作 mongodb 的一套方法
spring-data-elasticsearch:操作 elasticsearch 的一套方法
官方文档:https://docs.spring.io/spring-data/elasticsearch/docs/4.4.10/reference/html/
用ES实现搜索接口
Spring Data Elasticsearch方式
建表(建立索引)
数据库表结构
content text null comment '内容',tags varchar(1024) null comment '标签列表(json 数组)',thumbNum int default 0 not null comment '点赞数',favourNum int default 0 not null comment '收藏数',userId bigint not null comment '创建用户 id',createTime datetime default CURRENT_TIMESTAMP not null comment '创建时间',updateTime datetime default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment '更新时间',isDelete tinyint default 0 not null comment '是否删除',index idx_userId (userId)
) comment '帖子' collate = utf8mb4_unicode_ci;
注意:
- aliases: 别名(为了后续方便数据迁移)
- 字段类型是text,这个字段是可被分词的,可模糊查询的;而如果是keyword,只能完全匹配,精确查询
- analyzer(存储时生效的分词器):用ik_max_word,拆地更碎,索引更多,更有可能被搜出来
- search_analyzer(查询时生效的分词器):用ik_smart,更偏向于用户想搜的分词
- 如果想要让text类型的分词字段也支持精确查询,可以创建keyword类型的子字段
"fields": {"keyword": {"type": "keyword","ignore_above": 256 // 超过字符数则忽略查询}}
建表结构
PUT post_v1
{"aliases": {"post": {}},"mappings": {"properties": {"title": {"type": "text","analyzer": "ik_max_word","search_analyzer": "ik_smart","fields": {"keyword": {"type": "keyword","ignore_above": 256}}},"content": {"type": "text","analyzer": "ik_max_word","search_analyzer": "ik_smart","fields": {"keyword": {"type": "keyword","ignore_above": 256}}},"tags": {"type": "keyword"},"userId": {"type": "keyword"},"createTime": {"type": "date"},"updateTime": {"type": "date"},"isDelete": {"type": "keyword"}}}
}
增删改查(java使用es步骤,重要!)
步骤
- 引入jar包
<!-- elasticsearch-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
- 编写实体类
坐标:model/dto/post/PostEsDTO
package com.yupi.springbootinit.model.dto.post;import cn.hutool.core.collection.CollUtil;
import cn.hutool.json.JSONUtil;
import com.yupi.springbootinit.model.entity.Post;
import lombok.Data;import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;import java.io.Serializable;
import java.util.Date;
import java.util.List;/*** 帖子 ES 包装类** * **/
// todo 取消注释开启 ES(须先配置 ES)
//@Document(indexName = "post")
@Data
public class PostEsDTO implements Serializable {private static final String DATE_TIME_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";/*** id*/@Idprivate Long id;/*** 标题*/private String title;/*** 内容*/private String content;/*** 标签列表*/private List<String> tags;/*** 点赞数*/private Integer thumbNum;/*** 收藏数*/private Integer favourNum;/*** 创建用户 id*/private Long userId;/*** 创建时间*/@Field(index = false, store = true, type = FieldType.Date, format = {}, pattern = DATE_TIME_PATTERN)private Date createTime;/*** 更新时间*/@Field(index = false, store = true, type = FieldType.Date, format = {}, pattern = DATE_TIME_PATTERN)private Date updateTime;/*** 是否删除*/private Integer isDelete;private static final long serialVersionUID = 1L;/*** 对象转包装类** @param post* @return*/public static PostEsDTO objToDto(Post post) {if (post == null) {return null;}PostEsDTO postEsDTO = new PostEsDTO();BeanUtils.copyProperties(post, postEsDTO);String tagsStr = post.getTags();if (StringUtils.isNotBlank(tagsStr)) {postEsDTO.setTags(JSONUtil.toList(tagsStr, String.class));}return postEsDTO;}/*** 包装类转对象** @param postEsDTO* @return*/public static Post dtoToObj(PostEsDTO postEsDTO) {if (postEsDTO == null) {return null;}Post post = new Post();BeanUtils.copyProperties(postEsDTO, post);List<String> tagList = postEsDTO.getTags();if (CollUtil.isNotEmpty(tagList)) {post.setTags(JSONUtil.toJsonStr(tagList));}return post;}
}
- 在application.xml中配置Elasticsearch
spring:elasticsearch:uris: http://localhost:9200username: rootpassword: 123456
- 启动项目(别忘了打开es)
- 使用es
1)方式一 : 继承ElasticsearchRepository<PostEsDTO, Long>这个类,默认提供了简单的增删改查,多用于可预期的,相对没那么复杂的查询,自定义查询,返回结果相对简单直接直接
接口代码:
public interface CrudRepository<T, ID> extends Repository<T, ID> {<S extends T> S save(S entity);<S extends T> Iterable<S> saveAll(Iterable<S> entities);Optional<T> findById(ID id);boolean existsById(ID id);Iterable<T> findAll();Iterable<T> findAllById(Iterable<ID> ids);long count();void deleteById(ID id);void delete(T entity);void deleteAllById(Iterable<? extends ID> ids);void deleteAll(Iterable<? extends T> entities);void deleteAll();
}
测试:
**添加数据 **
@Test
void testAdd() {PostEsDTO postEsDTO = new PostEsDTO();postEsDTO.setId(1L);postEsDTO.setTitle("测试添加数据标题");postEsDTO.setContent("测试添加数据内容");postEsDTO.setTags(Arrays.asList("java", "python"));postEsDTO.setThumbNum(1);postEsDTO.setFavourNum(1);postEsDTO.setUserId(1L);postEsDTO.setCreateTime(new Date());postEsDTO.setUpdateTime(new Date());postEsDTO.setIsDelete(0);postEsDao.save(postEsDTO);System.out.println(postEsDTO.getId());
}
查询数据
方式一:在kibana devtools里查
方式二:在java代码里查
@Testvoid testSelect() {System.out.println(postEsDao.count());Page<PostEsDTO> PostPage = postEsDao.findAll(PageRequest.of(0, 5, Sort.by("createTime")));List<PostEsDTO> postList = PostPage.getContent(); //分页查询System.out.println(postList);Optional<PostEsDTO> byId = postEsDao.findById(1L); //根据id查询System.out.println(byId);}
- 添加自定义方法
坐标:esdao/PostEsDao
支持根据方法名自动生成方法,比如:
List<PostEsDTO> findByTitle(String title);
测试此方法:
@Test
void testFindByTitle() {List<PostEsDTO> postEsDTOS = postEsDao.findByTitle("测试添加数据标题");System.out.println(postEsDTOS);
}
根据他的命名规则写方法名,不用实现,就可以用了~(好神奇唉)
注意:ES中,_开头的字段表示系统默认字段,比如_id,如果系统不指定,会自动生成。但是不会在_source字段中补充id的值,所以建议大家手动指定。
2)方式二:Spring默认给我们提供操作es的客户端对象
ElasticsearchRestTemplate,也提供了增删改查,它的增删改查更灵活,适用于复杂度的操作,返回结果更完整,但需要自己解析,对于复杂的查询,建议使用该方式
三个步骤:
- 取参数
- 把参数组合为ES支持的搜索条件
- 从返回值中取结果
参考文档:
- https://www.elastic.co/guide/en/elasticsearch/reference/7.17/query-filter-context.html
- https://www.elastic.co/guide/en/elasticsearch/reference/7.17/query-dsl-bool-query.html
示例代码:
GET post/_search
{"query": { "bool": { // 组合条件"must": [ // 必须都满足{ "match": { "title": "鱼皮" }}, // match 模糊查询{ "match": { "content": "知识星球" }}],"filter": [ { "term": { "status": "published" }}, // term 精确查询{ "range": { "publish_date": { "gte": "2015-01-01" }}} // range 范围查询]}}
}
wildcard 模糊查询
regexp 正则匹配查询
查询结果中,score 代表匹配分数
建议先测试 DSL、再翻译成 Java
DSL:
{"query": {"bool": {"must_not": [{"match": {"title": ""}},]"should": [{"match": {"title": ""}},{"match": {"desc": ""}}],"filter": [{"term": {"isDelete": 0}},{"term": {"id": 1}},{"term": {"tags": "java"}},{"term": {"tags": "框架"}}],"minimum_should_match": 0}},"from": 0, // 分页"size": 5, // 分页"_source": ["name", "_createTime", "desc", "reviewStatus", "priority", "tags"], // 要查的字段"sort": [ // 排序{"priority": {"order": "desc"}},{"_score": {"order": "desc"}},{"publishTime": {"order": "desc"}}]
}
翻译为Java:
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
// 过滤
boolQueryBuilder.filter(QueryBuilders.termQuery("isDelete", 0));
if (id != null) {boolQueryBuilder.filter(QueryBuilders.termQuery("id", id));
}
if (notId != null) {boolQueryBuilder.mustNot(QueryBuilders.termQuery("id", notId));
}
if (userId != null) {boolQueryBuilder.filter(QueryBuilders.termQuery("userId", userId));
}
// 必须包含所有标签
if (CollectionUtils.isNotEmpty(tagList)) {for (String tag : tagList) {boolQueryBuilder.filter(QueryBuilders.termQuery("tags", tag));}
}
// 包含任何一个标签即可
if (CollectionUtils.isNotEmpty(orTagList)) {BoolQueryBuilder orTagBoolQueryBuilder = QueryBuilders.boolQuery();for (String tag : orTagList) {orTagBoolQueryBuilder.should(QueryBuilders.termQuery("tags", tag));}orTagBoolQueryBuilder.minimumShouldMatch(1);boolQueryBuilder.filter(orTagBoolQueryBuilder);
}
// 按关键词检索
if (StringUtils.isNotBlank(searchText)) {boolQueryBuilder.should(QueryBuilders.matchQuery("title", searchText));boolQueryBuilder.should(QueryBuilders.matchQuery("content", searchText));boolQueryBuilder.minimumShouldMatch(1);
}
// 按标题检索
if (StringUtils.isNotBlank(title)) {boolQueryBuilder.should(QueryBuilders.matchQuery("title", title));boolQueryBuilder.minimumShouldMatch(1);
}
// 按内容检索
if (StringUtils.isNotBlank(content)) {boolQueryBuilder.should(QueryBuilders.matchQuery("content", content));boolQueryBuilder.minimumShouldMatch(1);
}
// 排序
SortBuilder<?> sortBuilder = SortBuilders.scoreSort();
if (StringUtils.isNotBlank(sortField)) {sortBuilder = SortBuilders.fieldSort(sortField);sortBuilder.order(CommonConstant.SORT_ORDER_ASC.equals(sortOrder) ? SortOrder.ASC : SortOrder.DESC);
}
// 分页
PageRequest pageRequest = PageRequest.of((int) current, (int) pageSize);
// 构造查询
NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(boolQueryBuilder)
.withPageable(pageRequest).withSorts(sortBuilder).build();
SearchHits<PostEsDTO> searchHits = elasticsearchRestTemplate.search(searchQuery, PostEsDTO.class);
动静分离设计:先模糊筛选静态数据,查出数据后,再根据查到的内容 id 去数据库查找到 动态数据。
修改doSearch查询
坐标:com/yupi/springbootinit/datasource/PostDataSource
@Overridepublic Page<PostVO> doSearch(String searchText, long pageNum, long pageSize) {PostQueryRequest postQueryRequest = new PostQueryRequest();postQueryRequest.setSearchText(searchText);postQueryRequest.setCurrent(pageNum);postQueryRequest.setPageSize(pageSize);HttpServletRequest request = ((ServletRequestAttributes)RequestContextHolder.getRequestAttributes()).getRequest();Page<Post> postPage = postService.searchFromEs(postQueryRequest);return postService.getPostVOPage(postPage,request);}
//searchFromEs方法就是上面翻译为java的代码方法
数据同步
需要将MySQL中的数据和ES中的数据进行同步
方式
全量同步一种方式,增量同步三种方式
- 全量同步
首次将MySQL的数据一次性导入到ES中
- 增量同步-定时任务
比如一分钟一次,找到MySQL过去几分钟有更新的数据,写入到ES
- 增量同步-双写
写数据库的时候,必须也写ES,更新删除同理(需要保证原子性)
- 增量同步–Logstash数据同步管道
( 一般要配合 kafka 消息队列 + beats 采集器 )
(1)Logstash
- Logstash介绍
“传输和处理信息的管道”
- 下载安装包
官方文档:https://www.elastic.co/guide/en/logstash/7.17/installing-logstash.html
下载地址:https://artifacts.elastic.co/downloads/logstash/logstash-7.17.9-windows-x86_64.zip
- 启动
cd logstash-7.17.9
.\bin\logstash.bat -e "input { stdin { } } output { stdout {} }"
遇到bug:系统找不到指定的路径。 could not find java; set JAVA_HOME or ensure java is in PATH
解决:将本地的JAVA_HOME环境变量名字改为它所推荐的LS_JAVA_HOME
- 快速开始:https://www.elastic.co/guide/en/logstash/7.17/running-logstash-windows.html
监听 udp 并输出:
input {udp {port => 514type => "syslog"}
}output {elasticsearch { hosts => ["localhost:9200"] }stdout { codec => rubydebug }
}
在mytask.conf文件中写入
input {udp {port => 514type => "syslog"}
}output {stdout { codec => rubydebug }
}
以该配置文件启动 Logstash,注意要进到bin目录下
logstash.bat -f ..\config\mytask.conf
证明已经监听udp了
(2)MySQL 同步给 Elasticsearch
将mysql的jar包放到配置文件中(jar包可以从idea的maven本地仓库中找)
修改mytask.conf文件:
input {jdbc {jdbc_driver_library => "D:\software\ElasticStack\logstash-7.17.9\config\mysql-connector-java-8.0.29.jar"jdbc_driver_class => "com.mysql.jdbc.Driver"jdbc_connection_string => "jdbc:mysql://localhost:3306/my_db"jdbc_user => "root"jdbc_password => "dyz200472"statement => "SELECT * from post where updateTime > :sql_last_value and updateTime < now() order by updateTime desc"tracking_column => "updatetime"tracking_column_type => "timestamp"use_column_value => trueparameters => { "favorite_artist" => "Beethoven" }schedule => "*/5 * * * * *"jdbc_default_timezone => "Asia/Shanghai"}
}filter {mutate {rename => {"updatetime" => "updateTime""userid" => "userId""createtime" => "createTime""isdelete" => "isDelete"}remove_field => ["thumbnum", "favournum"]}
}output {stdout { codec => rubydebug }elasticsearch {hosts => "http://localhost:9200"index => "post_v1"document_id => "%{id}"}
}
注意:sql_last_value 是取上次查到的数据的最后一行的指定的字段,如果要全量更新,只要删除掉 E:\software\ElasticStack\logstash-7.17.9\data\plugins\inputs\jdbc\logstash_jdbc_last_run 文件即可(这个文件存储了上次同步到的数据),这里我删除了但是es上仍然没有更新数据,重启logstash好了
修改为es查询
(3) 订阅数据库流水的同步方式 Canal(未实现)
https://github.com/alibaba/canal/
数据库每次修改时,会修改binlog文件,只要监听该文件的修改,就能第一时间得到数据并修改
canal:帮你监听binlog,并且可以解析binlog为你理解的内容,它伪装成了mysql的从节点,获取主节点给的binlog
快速开始:https://github.com/alibaba/canal/wiki/QuickStart
windows 系统,找到你本地的 mysql 安装目录,在根目录下新建 my.ini 文件:
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义, 不要和 canal 的 slaveId 重复
如果 java 找不到,修改 startup.bat 脚本为你自己的 java home:
set JAVA_HOME=C:\Users\59278\.jdks\corretto-1.8.0_302
echo %JAVA_HOME%
set PATH=%JAVA_HOME%\bin;%PATH%
echo %PATH%
问题:mysql 无法链接,Caused by: java.io.IOException: caching_sha2_password Auth failed
解决方案: https://github.com/alibaba/canal/issues/3902
ALTER USER ‘canal’@‘%’ IDENTIFIED WITH mysql_native_password BY ‘canal’; ALTER USER ‘canal’@‘%’ IDENTIFIED BY ‘canal’ PASSWORD EXPIRE NEVER; FLUSH PRIVILEGES;