flink自定义process,使用状态求历史总和(scala)

es idea maven 依赖

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_2.11</artifactId>
<version>1.11.1</version>
</dependency>


import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch7.{ElasticsearchSink, RestClientFactory}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.flink.util.Collector
import org.apache.http.HttpHost
import org.apache.http.client.config.RequestConfig
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.elasticsearch.action.DocWriteRequest
import org.elasticsearch.client.{Requests, RestClientBuilder}import java.time.Duration
import java.util.Propertiesobject Test {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment//需要状态开启下面的配置//env.setStateBackend(new RocksDBStateBackend(s"hdfs://${namenodeID}", true))//hdfs 作为状态后端//env.enableCheckpointing(10 * 60 * 1000L)//env.getCheckpointConfig.setCheckpointTimeout(10 * 60 * 1000L)env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //事件时间val props = new Propertiesprops.setProperty("bootstrap.servers", "host:6667") //有些是9092端口props.setProperty("group.id", "groupId")props.setProperty("retries", "10")props.setProperty("retries.backoff.ms", "100")props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000")//是否配置了权限,有的话加上下面的配置// props.setProperty("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username='' password='';")//props.setProperty("security.protocol", "SASL_PLAINTEXT");// props.setProperty("sasl.mechanism", "PLAIN")val httpHosts = new java.util.ArrayList[HttpHost]httpHosts.add(new HttpHost("esIPOne", 9200, "http"))httpHosts.add(new HttpHost("esIPTwo", 9200, "http"))httpHosts.add(new HttpHost("esIPThree", 9200, "http"))val esSinkBuilder = new ElasticsearchSink.Builder[ResultBean](httpHosts, new ElasticsearchSinkFunction[ResultBean] {def process(element: ResultBean, ctx: RuntimeContext, indexer: RequestIndexer) {val json = new java.util.HashMap[String, Any]json.put("@timestamp", element.ts)json.put("data", element.data)json.put("sum", element.sum)val rqst = Requests.indexRequest().index("indexName").id(element.id).source(json).opType(DocWriteRequest.OpType.INDEX)indexer.add(rqst)}})setESConf(esSinkBuilder, 5000)val myConsumer = new FlinkKafkaConsumer[DemoBean]("topicName", new DemoKafka(), props).setStartFromEarliest() //从什么时间开始读val source = env.addSource(myConsumer).uid("source-data").name("数据源").assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[DemoBean](Duration.ofSeconds(1)).withTimestampAssigner(new SerializableTimestampAssigner[DemoBean] {override def extractTimestamp(element: DemoBean, recordTimestamp: Long): Long = element.ts}).withIdleness(Duration.ofSeconds(5))).uid("water-marks").name("注册水位线")source.keyBy(k => k.id).process(new DemoProcess()).uid("demo-process").name("process 示例").addSink(esSinkBuilder.build()).uid("es-sink").name("数据写入es")env.execute("任务名")}private class DemoKafka() extends KafkaDeserializationSchema[DemoBean] {override def isEndOfStream(t: DemoBean): Boolean = falseoverride def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): DemoBean = {val value = new String(consumerRecord.value())val list = value.split("\t")DemoBean(list(0), list(1), list(2).toInt, list(3).toLong)}override def getProducedType: TypeInformation[DemoBean] = TypeExtractor.getForClass(classOf[DemoBean])}private class DemoProcess extends KeyedProcessFunction[String, DemoBean, ResultBean] {private var hisSumState: ValueState[Int] = _override def open(parameters: Configuration): Unit = {hisSumState = getRuntimeContext.getState(new ValueStateDescriptor("his-sum", classOf[Int]))}override def processElement(data: DemoBean, ctx: KeyedProcessFunction[String, DemoBean, ResultBean]#Context, out: Collector[ResultBean]): Unit = {val his = if (hisSumState.value() == null) 0 else hisSumState.value()val now = data.valuehisSumState.update(now)out.collect(ResultBean(data.id, data.data, his + now, data.value))}}def setESConf[T](esSinkBuilder: ElasticsearchSink.Builder[T], numMaxActions: Int) {esSinkBuilder.setBulkFlushMaxActions(numMaxActions)esSinkBuilder.setBulkFlushMaxSizeMb(10)esSinkBuilder.setBulkFlushInterval(10000)esSinkBuilder.setBulkFlushBackoff(true)esSinkBuilder.setBulkFlushBackoffDelay(2)esSinkBuilder.setBulkFlushBackoffRetries(3)esSinkBuilder.setRestClientFactory(new RestClientFactory {override def configureRestClientBuilder(restClientBuilder: RestClientBuilder): Unit = {restClientBuilder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {override def customizeRequestConfig(requestConfigBuilder: RequestConfig.Builder): RequestConfig.Builder = {requestConfigBuilder.setConnectTimeout(12000)requestConfigBuilder.setSocketTimeout(90000)}})}})}private case class DemoBean(id: String, data: String, value: Int, ts: Long)private case class ResultBean(id: String, data: String, sum: Int, ts: Long)
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/427970.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C++初阶学习——探索STL奥秘——反向迭代器

适配器模式是 STL 中的重要组成部分&#xff0c;除了容器适配器外&#xff0c;还有 选代器适配器&#xff0c;借助 选代器适配器 &#xff0c;可以轻松将各种容器中的普通迭代器转变为反向迭代器&#xff0c;这正是适配器的核心思想 注:库中的反向迭代器在设计时&#xff0c;为…

HashMap线程不安全|Hashtable|ConcurrentHashMap

文章目录 常见集合线程安全性HashMap为什么线程不安全&#xff1f;怎么保证HashMap线程安全 HashtableConcurrentHashMap 引入细粒度锁代码中分析总结 小结 常见集合线程安全性 ArrayList、LinkedList、TreeSet、HashSet、HashMap、TreeMap等都是线程不安全的。 HashTable是线…

【Python报错已解决】To update, run: python.exe -m pip install --upgrade pip

&#x1f3ac; 鸽芷咕&#xff1a;个人主页 &#x1f525; 个人专栏: 《C干货基地》《粉丝福利》 ⛺️生活的理想&#xff0c;就是为了理想的生活! 专栏介绍 在软件开发和日常使用中&#xff0c;BUG是不可避免的。本专栏致力于为广大开发者和技术爱好者提供一个关于BUG解决的经…

【C++篇】~类和对象(中)

类和对象&#xff08;中&#xff09; 1.类的默认成员函数​ 默认成员函数就是用户没有显式实现&#xff0c;编译器会自动生成的成员函数称为默认成员函数。一个类&#xff0c;我们不写的情况下编译器会默认生成以下6个默认成员函数&#xff0c;需要注意的是这6个中最重要的是前…

【LeetCode每日一题】——401.二进制手表

文章目录 一【题目类别】二【题目难度】三【题目编号】四【题目描述】五【题目示例】六【题目提示】七【解题思路】八【时间频度】九【代码实现】十【提交结果】 一【题目类别】 回溯 二【题目难度】 简单 三【题目编号】 401.二进制手表 四【题目描述】 二进制手表顶部…

arcgisPro地理配准

1、添加图像 2、在【影像】选项卡中&#xff0c;点击【地理配准】 3、 点击添加控制点 4、选择影像左上角格点&#xff0c;然后右击填入目标点的投影坐标 5、依次输入四个格角点的坐标 6、点击【变换】按钮&#xff0c;选择【一阶多项式&#xff08;仿射&#xff09;】变换 7…

1.Seata 1.5.2 seata-server搭建

一&#xff1a;Seata基本介绍 Seata是一款开源的分布式事务解决方案&#xff0c;致力于在微服务架构下提供高性能和简单易用的分布式事务服务。 详见官网链接&#xff1a;https://seata.apache.org/zh-cn/ 1.历史项目里的使用经验&#xff1a; 之前公司里的oem用户对应的App…

多重指针变量(n重指针变量)实例分析

0 前言 指针之于C语言&#xff0c;就像子弹于枪械。没了子弹的枪械虽然可以用来肉搏&#xff0c;却失去了迅速解决、优雅解决战斗的能力。但上了膛的枪械也非常危险&#xff0c;时刻要注意是否上了保险&#xff0c;使用C语言的指针也是如此&#xff0c;要万分小心&#xff0c;…

【VUE3.0】动手做一套像素风的前端UI组件库---先导篇

系列文章目录 【VUE3.0】动手做一套像素风的前端UI组件库—Button 目录 系列文章目录引言准备素材字体鼠标手势图 创建vue3项目构建项目1. 根据命令行提示选择如下&#xff1a;2. 进入项目根目录下载依赖并启动。3. 设置项目src路径别名&#xff0c;方便后期应用路径。4. 将素…

solana项目counter,测试过程中执行报错记录分享

跟随HackQuest部署counter项目&#xff0c;使用 Solana 官方提供的 playgroud 。这个平台让我们的部署和测试过程变得更加简便高效。 合约代码 lib.rs中复制以下代码 use anchor_lang::prelude::*; use std::ops::DerefMut;declare_id!("CVQCRMyzWNr8MbNhzjbfPu9YVvr97…

Amoco:一款针对二进制源码的安全分析工具

关于Amoco Amoco是一款功能强大的二进制源码静态分析工具&#xff0c;该工具基于Python 3.8开发&#xff0c;可以帮助广大研究人员轻松对二进制程序执行静态符号分析。 工具特性 1、一个通用的指令解码框架&#xff0c;旨在减少实现对新架构的支持所需的时间。例如&#xff0c…

通过springcloud gateway优雅的进行springcloud oauth2认证和权限控制

代码地址 如果对你有帮助请给个start&#xff0c;本项目会持续更新&#xff0c;目标是做一个可用的快速微服务开发平台&#xff0c;成为接私活&#xff0c;毕设的开发神器&#xff0c; 欢迎大神们多提意见和建议 使用的都是spring官方最新的版本&#xff0c;版本如下&#xff1…

F12抓包11:UI自动化 - Recoder(记录器)

课程大纲 使用场景&#xff08;导入和导出&#xff09;: ① 测试的重复性工作&#xff0c;本浏览器录制并进行replay&#xff1b; ② 导入/导出录制脚本&#xff0c;移植后replay&#xff1b; ③ 导出给开发进行replay复现bug&#xff1b; ④ 进行前端性能分析。 1、录制脚…

Virtuoso服务在centos中自动停止的原因分析及解决方案

目录 前言1. 问题背景2. 原因分析2.1 终端关闭导致信号12.2 nohup命令的局限性 3. 解决方案3.1 使用 screen 命令保持会话3.2 使用 tmux 作为替代方案3.3 使用系统服务&#xff08;systemd&#xff09; 4. 其他注意事项4.1 网络配置4.2 日志监控 结语 前言 在使用Virtuoso作为…

mybatisplus映射与数据库表格不一致问题

1.字段映射与属性名不一致 TableField(value"数据库字段名") 2.entity添加了数据库表格不存在的属性 TableField(existfalse) 3.entity对象查询时&#xff0c;有些字段不想要显示在查询结果上 TableField(selectfalse) 4.表名不一致 TableName("数据库表名&…

爬虫--翻页tips

免责声明&#xff1a;本文仅做分享&#xff01; 伪线程 from DrissionPage import ChromiumPage import timepage ChromiumPage() page.get("https://you.ctrip.com/sight/taian746.html") # 初始化 第0页 index_page 0# 翻页点击函数 sleep def page_turn():page…

使用API有效率地管理Dynadot域名,为域名进行隐私保护设置

前言 Dynadot是通过ICANN认证的域名注册商&#xff0c;自2002年成立以来&#xff0c;服务于全球108个国家和地区的客户&#xff0c;为数以万计的客户提供简洁&#xff0c;优惠&#xff0c;安全的域名注册以及管理服务。 Dynadot平台操作教程索引&#xff08;包括域名邮箱&…

八股文-多线程、并发

八股文-多线程、并发 最近学到了一种方法&#xff0c;可以用于简历项目经验编写以及面试题目的回答 STAR法则&#xff1a;在什么背景下&#xff0c;你需要解决什么问题&#xff0c;你做了啥&#xff0c;得到了什么结果 情境&#xff08;Situation&#xff09;&#xff1a; 描…

电子元件制造5G智能工厂物联数字孪生平台,推进制造业数字化转型

5G智能工厂与物联数字孪生平台的融合应用&#xff0c;不仅为电容器制造业注入了新的活力&#xff0c;更为整个制造业的数字化转型树立了新的标杆。电子元件制造过程中&#xff0c;数字孪生平台通过实时监测生产线的各个环节&#xff0c;实现了生产流程的可视化监控。管理人员可…

苹果M4 MacBook Air被曝2025Q1发布 屏幕面板10月出货

9 月 20 日最新消息屏幕供应链咨询公司 DSCC 首席执行官罗斯・杨&#xff08;Ross Young&#xff09;昨日&#xff08;9 月 19 日&#xff09;在 X 平台面向其订阅用户发布推文&#xff0c;透露苹果 M4 MacBook Air 与低成本 iPad 的屏幕预估将于今年 10 月开始出货。 苹果正在…