Elasticsearch 集成--Flink 框架集成

一、Flink 框架介绍

       Apache Spark 是一种基于内存的快速、通用、可扩展的大数据分析计算引擎。
Apache Spark 掀开了内存计算的先河,以内存作为赌注,赢得了内存计算的飞速发展。
但是在其火热的同时,开发人员发现,在 Spark 中,计算框架普遍存在的缺点和不足依然没
有完全解决,而这些问题随着 5G 时代的来临以及决策者对实时数据分析结果的迫切需要而
凸显的更加明显:
  •  数据精准一次性处理(Exactly-Once
  • 乱序数据,迟到数据
  •  低延迟,高吞吐,准确性
  •  容错性
        Apache Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。在
Spark 火热的同时,也默默地发展自己,并尝试着解决其他计算框架的问题。
慢慢地,随着这些问题的解决, Flink 慢慢被绝大数程序员所熟知并进行大力推广,阿里公
司在 2015 年改进 Flink ,并创建了内部分支 Blink ,目前服务于阿里集团内部搜索、推荐、
广告和蚂蚁等大量核心实时业务。

二、框架集成

2.1创建 Maven 项目

依赖

<?xml version="1.0" encoding="UTF-8"?>
<projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.lun.es</groupId><artifactId>flink-elasticsearch</artifactId><version>1.0</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.12</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.12</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch7_2.11</artifactId><version>1.12.0</version></dependency><!-- jackson --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-core</artifactId><version>2.11.1</version></dependency></dependencies>
</project>

功能实现

package com.xmx.es;import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;public class FlinkElasticsearchSinkTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.socketTextStream("localhost", 9999);List<HttpHost> httpHosts = new ArrayList<>();httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));//httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"));// use a ElasticsearchSink.Builder to create an ElasticsearchSinkElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts,new ElasticsearchSinkFunction<String>() {public IndexRequest createIndexRequest(String element) {Map<String, String> json = new HashMap<>();json.put("data", element);return Requests.indexRequest().index("my-index")//.type("my-type").source(json);}@Overridepublic void process(String element, RuntimeContext ctx, RequestIndexer indexer) {indexer.add(createIndexRequest(element));}});// configuration for the bulk requests; this instructs the sink to emit after every element, otherwise they would be bufferedesSinkBuilder.setBulkFlushMaxActions(1);// provide a RestClientFactory for custom configuration on the internally createdREST client// esSinkBuilder.setRestClientFactory(// restClientBuilder -> {// restClientBuilder.setDefaultHeaders(...)// restClientBuilder.setMaxRetryTimeoutMillis(...)// restClientBuilder.setPathPrefix(...)// restClientBuilder.setHttpClientConfigCallback(...)// }// );source.addSink(esSinkBuilder.build());env.execute("flink-es");}
}

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/116943.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Linux】DNS系统,ICMP协议,NAPT技术

遏制自己内心的知识优越感&#xff0c;才能让你发自内心的去尊重他人&#xff0c;避免狂妄自大&#xff0c;才能让你不断的丰富自己的内心。 文章目录 一、DNS系统1.DNS服务器返回域名对应的ip2.使用dig工具分析DNS过程3.浏览器中输入url后发生的事情&#xff1f; 二、ICMP协议…

重磅OpenAI发布ChatGPT企业版本

8月29日凌晨&#xff0c;Open AI官网发布ChatGPT企业版本&#xff01; 企业版简介&#xff1a; ChatGPT企业版提供企业级安全和隐私、无限的高速 GPT-4 访问、用于处理更长输入的更长上下文窗口、高级数据分析功能、自定义选项等等。人工智能可以协助和提升我们工作生活的各个…

postgresql-条件表达式

postgresql-条件表达式 简单Case表达式搜索Case表达式缩写函数总结 简单Case表达式 select e.first_name , e.last_name , e.department_id , case e.department_id when 90 then 管理when 60 then 开发else 其他end as "部门" from cps.public.employees e ;-- 统…

第一课:使用C++实现图片去水印

1.功能概述 实现图片去水印的方法有很多,下面提供一种基于OpenCV库的C++实现方法。主要思路是利用图像中不同水印区域之间的差异,进行区域提取、重构和合成,从而实现去除水印的效果。 2.具体实现 2.1.导入OpenCV库和头文件 #include <iostream> #include <o…

如何使用C++11原子操作实现自旋锁

什么是自旋锁&#xff1f; C自旋锁是一种低层次的同步原语&#xff0c;用于保护共享资源的访问。自旋锁是一种轻量级的锁&#xff0c;适用于短时间的资源锁定。 自旋锁的特点&#xff1a;当一个线程尝试获取已经被另一个线程占有的自旋锁时&#xff0c;这个线程会进入一个循环…

ElasticSearch基础知识汇总

文章目录 前言一、认识ElasticSearch1.正向索引和倒排索引2. MySql与ElasticSearc3.IK分词器 二、ES索引库操作1.mapping映射属性2.索引库的CRUD 三、ES文档库操作 前言 Elasticsearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎&#xff0c;基…

2023年单企业云盘最新产品榜单发布

企业云盘产品为企业提供了高效、安全、可靠的文件存储和共享服务&#xff0c;因此受到了众多企业用户的喜爱。本文参考各个产品测评网站&#xff0c;总结了几款备受好评的企业云盘产品&#xff0c;供您参考。 1, Zoho Workdrive 2, Google Drive 3, OneDrive 4, Dropbox 5…

SpringBoot 整合 RabbitMQ

1. 创建 SpringBoot 工程 把版本改为 2.7.14 引入这两个依赖: <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springfr…

【目标检测】理论篇(3)YOLOv5实现

Yolov5网络构架实现 import torch import torch.nn as nnclass SiLU(nn.Module):staticmethoddef forward(x):return x * torch.sigmoid(x)def autopad(k, pNone):if p is None:p k // 2 if isinstance(k, int) else [x // 2 for x in k] return pclass Focus(nn.Module):def …

Qt无边框青绿色主题

收费产品&#xff0c;学生党、闹眼子党勿扰 收费金额&#xff1a;500元 1 概述 最近因项目需要&#xff0c;写了一个炫酷的青绿色、无边框界面&#xff0c;和3DSMax的界面有点类似。 2 截图 首先看看3DSMax的界面 不知道大家看出来没&#xff0c;这个ui其实很简单&#xff…

Python 阿里云盾滑块验证

&#xfeff;<table><tr><td bgcolororange>本文仅供学习交流使用&#xff0c;如侵立删&#xff01;</td></tr></table> 记一次阿里云盾滑块验证分析并通过 操作环境 win10 、 macPython3.9selenium、pyautogui 分析 最近在做中国庭审…

React 生命周期新旧对比

前言 React16.4版本之后使用了新的生命周期&#xff0c;它使用了一些新的生命周期钩子&#xff08;getDerivedStateFromProps、getSnapshotBeforeUpdate&#xff09;&#xff0c;并且即将废弃老版的3个生命周期钩子&#xff08;componentWillMount、componentWillReceiveProps…

前端进阶之——模块化

在做项目的时候越来越发现模块化的重要性&#xff0c;做好模块化开发不仅给后期的维护带来不少好处而且大大提升项目开发效率&#xff0c;接下来整理一下模块化相关知识吧。 模块化开发的优点 封装方法、提高代码的复用性、可维护性和可读性隔离作用域&#xff0c;避免污染全…

【LeetCode】《LeetCode 101》第十二章:字符串

文章目录 12.1 字符串比较242 . 有效的字母异位词&#xff08;简单&#xff09;205. 同构字符串&#xff08;简单&#xff09;647. 回文子串&#xff08;中等&#xff09;696 . 计数二进制子串&#xff08;简单&#xff09; 12.2 字符串理解224. 基本计算器&#xff08;困难&am…

常用Web漏洞扫描工具汇总(持续更新中)

常用Web漏洞扫描工具汇总 常用Web漏洞扫描工具汇总1、AWVS&#xff0c;2、OWASP Zed&#xff08;ZAP&#xff09;&#xff0c;3、Nikto&#xff0c;4、BurpSuite&#xff0c;5、Nessus&#xff0c;6、nmap7、X-ray还有很多不是非常知名&#xff0c;但可能也很大牌、也较常见的。…

前端基础4——jQuery

文章目录 一、基本了解1.1 导入jQuery库1.2 基本语法1.3 选择器 二、操作HTML2.1 隐藏和显示元素2.2 获取与设置内容2.3 获取、设置和删除属性2.4 添加元素2.5 删除元素2.6 设置CSS样式 三、jQuery Ajax3.1 基本语法3.2 回调函数3.3 常用HTTP方法3.4 案例一3.4.1 准备工作3.4.2…

react学习之路:InputNumber的parser在ts里面报类型错误

错误提示&#xff1a; Type ‘(value: string | undefined) > string’ is not assignable to type ‘(displayValue: string | undefined) > 0 | 2 | 20’. Type ‘string’ is not assignable to type ‘0 | 2 | 20’. 代码示例&#xff1a; <InputNumbermin{0}m…

2511. 最多可以摧毁的敌人城堡数目

文章目录 Tag题目来源题目解读解题思路复杂度分析写在最后 Tag 【数组】 题目来源 2511. 最多可以摧毁的敌人城堡数目 题目解读 在数组 forts 中&#xff0c;forts[i] 有三种数值&#xff1a; -1&#xff1a; 表示第 i 个位置没有城堡&#xff0c;是空地&#xff1b;0&…

从零开始,探索C语言中的字符串

字符串 1. 前言2. 预备知识2.1 字符2.2 字符数组 3. 什么是字符串4. \04.1 \0是什么4.2 \0的作用4.2.1 打印字符串4.2.2 求字符串长度 1. 前言 大家好&#xff0c;我是努力学习游泳的鱼。你已经学会了如何使用变量和常量&#xff0c;也知道了字符的概念。但是你可能还不了解由…

layui框架学习(40:数据表格_主要事件)

Layui数据表格模块主要通过各类事件响应工具栏操作、单元格编辑或点击等交互操作&#xff0c;本文学习table数据表格模块中的主要事件及处理方式。   头部工具栏事件。通过代码“table.on(‘toolbar(test)’, function(obj))”获取lay-filter属性为test的数据表格的头部工具栏…