使用 go-elasticsearch 请求示例
你可以通过参考Go 官方文档找到简单的示例,所以我认为先看看这个是个好主意。
连接客户端有两种方式,如下图。
至于两者的特点,TypedClient有类型,更容易编写,但文档较少。另外,批量索引不支持TypedClient。由于普通客户端都是基于json的,所以我觉得从文档转换到代码还是比较容易的。
我的建议基本上是使用 TypedClient!
虽然文档很稀疏,但它是基于规律性的类型化的,所以你越习惯它(特别是当涉及到有很多变化的东西时,比如查询),从 json 转换为类型化代码就越容易。
func main() {// clientes, err := elasticsearch.NewClient(elasticsearch.Config{Addresses: []string{"http://localhost:9200"},})if err != nil {log.Fatalf("Error creating the client: %s", err)}// typed clientes, err := elasticsearch.NewTypedClient(elasticsearch.Config{Addresses: []string{"http://localhost:9200"},})if err != nil {log.Fatalf("Error creating the client: %s", err)}
}
Index Create/Delete
关于索引创建,Go官方文档中有一个示例,所以我简单介绍一下。
func main() {es, err := elasticsearch.NewTypedClient(elasticsearch.Config{Addresses: []string{"http://localhost:9200"},})if err != nil {log.Fatalf("Error creating the client: %s", err)}ignoreAbove := 256keywordProperty := types.NewKeywordProperty()keywordProperty.IgnoreAbove = &ignoreAbovedateProperty := types.NewDateProperty()format := "yyyy/MM/dd||yyyy/MM||MM/dd||yyyy||MM||dd"dateProperty.Format = &format// index作成_, err = es.Indices.Create("sample_index").Request(&create.Request{Settings: &types.IndexSettings{IndexSettings: map[string]json.RawMessage{// 設定項目// bulk index里面的数据更新感觉。如果不需要频繁更新,设置得更长会提高性能。"refresh_interval": json.RawMessage(`"30s"`),// 可取得的最大件数的上限"max_result_window": json.RawMessage(`"1000000"`),},},Mappings: &types.TypeMapping{Properties: map[string]types.Property{// 映射的定义"name": keywordProperty,"age": types.NewIntegerNumberProperty(),"is_checked": types.NewBooleanProperty(),"created_at": dateProperty,},},}).Do(context.TODO())if err != nil {log.Fatalf("Error creating the client: %s", err)}// index削除_, err = es.Indices.Delete("sample_index").Do(context.TODO())if err != nil {log.Fatalf("Error deleting the client: %s", err)}
}
Bulk Index
批量索引代码基于go-elasticsearch 示例。
var jsonitier = jsoniter.ConfigCompatibleWithStandardLibrarytype SampleIndexData struct {ID int64 `json:"id"`Name string `json:"name"`Age int `json:"age"`IsChecked bool `json:"is_checked"`CreatedAt string `json:"created_at"`
}func main() {es, err := elasticsearch.NewClient(elasticsearch.Config{Addresses: []string{"http://localhost:9200"},})if err != nil {log.Fatalf("Error creating the client: %s", err)}esRef, err := elasticsearch.NewTypedClient(elasticsearch.Config{Addresses: []string{"http://localhost:9200"},})if err != nil {log.Fatalf("Error creating the client: %s", err)}datas := []*SampleIndexData{}for i := 1; i <= 100; i++ {datas = append(datas, &SampleIndexData{ID: int64(i),Name: fmt.Sprintf("name_%d", i),Age: 20,IsChecked: true,CreatedAt: time.Date(2021, 1, 15, 17, 28, 55, 0, jst).Format("2006/01/02"),})}bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{Index: "sample_index", // The default index nameClient: es, // The Elasticsearch clientNumWorkers: 1, // The number of worker goroutines})if err != nil {log.Fatalf("Error creating the indexer: %s", err)}for _, a := range datas {data, err := jsonitier.Marshal(a)if err != nil {log.Fatalf("Cannot encode article %d: %s", a.ID, err)}err = bi.Add(context.Background(),esutil.BulkIndexerItem{// Delete时,action为“delete”,body为nil。Action: "index",DocumentID: strconv.Itoa(int(a.ID)),Body: bytes.NewReader(data),OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) {fmt.Println("success")},OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {fmt.Println("failure")},},)if err != nil {log.Fatalf("Unexpected error: %s", err)}}if err := bi.Close(context.Background()); err != nil {log.Fatalf("Unexpected error: %s", err)}// 取决于refresh_interval的值,但是如果感觉很长,在所有的index结束后刷新,数据会立即反映出来,所以很好_, err = esRef.Indices.Refresh().Index("sample_index").Do(context.Background())if err != nil {log.Fatalf("Error getting response: %s", err)}
}
Query
基本查询如下所示:
Go 的官方文档仅包含搜索 API 的简单示例。您基本上必须自己组装上述详细信息。就我而言,我正在检查QueryDSL页面上的查询并在包中复制我需要的内容。
var jst = time.FixedZone("Asia/Tokyo", 9*60*60)
var formatTime = "2006-01-02T15:04:05.999999-07:00"func main() {es, err := elasticsearch.NewTypedClient(elasticsearch.Config{Addresses: []string{"http://localhost:9200"},})if err != nil {log.Fatalf("Error creating the client: %s", err)}ageLte := 40.0ageLteC := (*types.Float64)(&ageLte)ageGte := 13.0ageGteC := (*types.Float64)(&ageGte)pageStart := 0size := 50req := &search.Request{// 查询Query: &types.Query{Bool: &types.BoolQuery{// 过滤器(过滤器)Filter: []types.Query{// 范围过滤器(过滤器){Range: map[string]types.RangeQuery{"age": types.NumberRangeQuery{Gte: ageGteC,Lte: ageLteC,},},},// 术语过滤器(过滤器){Term: map[string]types.TermQuery{"is_checked": {Value: true},},},},},},// 页面的起点From: &pageStart,// 返回的数量Size: &size,// 排序指定Sort: []types.SortCombinations{types.SortOptions{SortOptions: map[string]types.FieldSort{"created_at": {Order: &sortorder.Desc},}},},}res, err := es.Search().Index("sample_index").Request(req).Do(context.TODO())if err != nil {log.Fatalf("Error query: %s", err)}// totalfmt.Println(res.Hits.Total)ds := []*SampleIndexData{}for _, hit := range res.Hits.Hits {var d *SampleIndexDataif err := json.Unmarshal(hit.Source_, &d); err != nil {log.Fatalf("Error decoding: %s", err)}ds = append(ds, d)}// 拿出数据.fmt.Println(ds)
}
此外,您还可以使用 kibana 中的 devtools 轻松检查详细错误,以查看查询是否正确。为了调整查询以使其正确,最好使用一个围绕此问题的工具(在代码中,它也包含在 err 中,所以也在那里检查它。我可以)。
综上所述
我已经简要描述了基本的索引创建/删除、使用 Bulk API 的批量处理以及如何在 go-elasticsearch 中使用 SearchAPI 编写搜索查询。
就我个人而言,我发现很容易迷失在 Elasticsearch 的文档中,当我尝试做一些详细的事情时,我最终会输入大量代码并进行反复试验,因此需要花费大量时间来理解整体概念并编写代码。