Flink中自定义Source和Sink的使用

只要自定一个Source类实现SourceFunction接口,一个Sink类实现SinkFunction接口,就能正常使用自定义的Source和Sink,或者直接extends继承RichSourceFunction和RichSinkFunction,RichSinkFunction:多个open和close方法

1、自定义Source

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;public class Demo3SourceFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//使用自定义sourceDataStream<Integer> myDS = env.addSource(new MySource());myDS.print();env.execute();}
}//自定义source
//实现SourceFunction接口
class MySource implements SourceFunction<Integer> {//在run方法中读取外部的数据,使用原生java代码@Overridepublic void run(SourceContext<Integer> ctx) throws Exception {while (true) {ctx.collect(1);Thread.sleep(1000);}}//cancel方法是任务被取消是执行的,用于回收资源@Overridepublic void cancel() {}
}

2、自定义Sink

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;public class Demo2MySink {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> linesDS = env.socketTextStream("master", 8888);linesDS.addSink(new MySink());env.execute();}
}//自定义Sink
class MySink implements SinkFunction<String> {//每一条数据执行一次@Overridepublic void invoke(String value, Context context) throws Exception {System.out.println("mySink:" + value);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/471639.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于微信小程序的平安驾校预约平台的设计与实现(源码+LW++远程调试+代码讲解等)

摘 要 互联网发展至今&#xff0c;广泛参与在社会中的方方面面。它让信息都可以通过网络传播&#xff0c;搭配信息管理工具可以很好地为人们提供服务。针对高校教师成果信息管理混乱&#xff0c;出错率高&#xff0c;信息安全性差&#xff0c;劳动强度大&#xff0c;费时费力…

FFmpeg 4.3 音视频-多路H265监控录放C++开发十三.2:avpacket中包含多个 NALU如何解析头部分析

前提&#xff1a; 注意的是&#xff1a;我们这里是从avframe转换成avpacket 后&#xff0c;从avpacket中查看NALU。 在实际开发中&#xff0c;我们有可能是从摄像头中拿到 RGB 或者 PCM&#xff0c;然后将pcm打包成avframe&#xff0c;然后将avframe转换成avpacket&#xff0…

从0学习React(11)

1. 引言 上个星期的工作内容是写IT资产管理的前端页面。其实&#xff0c;尽管我之前有一些前端开发的经验&#xff0c;但并不是很多。这次让我独立完成一个页面的开发&#xff0c;刚开始时我感到无从下手。 2. 初期的困惑和焦虑 我记得在星期一和星期二的时候&#xff0c;那…

BILSTM法律网站用户提问自动分类

项目源码获取方式见文章末尾&#xff01; 600多个深度学习项目资料&#xff0c;快来加入社群一起学习吧。 《------往期经典推荐------》 项目名称 1.【基于CNN-RNN的影像报告生成】 2.【卫星图像道路检测DeepLabV3Plus模型】 3.【GAN模型实现二次元头像生成】 4.【CNN模型实现…

unity 一个物体随键盘上下左右旋转和前进的脚本

注意&#xff1a;脚本挂在gamaobject 上面 &#xff0c;操作对象的目标 this.gameObject 为操作对象 using System.Collections; using System.Collections.Generic; using UnityEngine;public class changePosition : MonoBehaviour {//操作对象的目标 this.gameObject 为操…

【论文阅读】Virtual Compiler Is All You Need For Assembly Code Search

阅读笔记:Virtual Compiler Is All You Need For Assembly Code Search 1. 研究背景 逆向工程:逆向工程需要在庞大的二进制文件中快速定位特定功能(例如恶意行为)。传统方法依赖于经验和启发式算法,效率低下。汇编代码搜索:通过自然语言搜索汇编代码功能,能够更高效地处…

Wireshark中的length栏位

注&#xff1a;Ethernet II的最小data length为46&#xff0c;如果小于&#xff0c;会补全到46. 1.指定网卡抓取的&#xff0c;链路为ethernet。 IPv4 Ethernet II 长度为 14 bytes - L1ipv4 header中的length包括header和payload的总长度 - L2wireshark中length表示抓取的pac…

CentOS网络配置

上一篇文章&#xff1a;VMware Workstation安装Centos系统 在CentOS系统中进行网络配置是确保系统能够顺畅接入网络的重要步骤。本文将详细介绍如何配置静态IP地址、网关、DNS等关键网络参数&#xff0c;以帮助需要的人快速掌握CentOS网络配置的基本方法和技巧。通过遵循本文的…

前端搭建低代码平台,微前端如何选型?

目录 背景 一、微前端是什么&#xff1f; 二、三大特性 三、现有微前端解决方案 1、iframe 2、Web Components 3、ESM 4、EMP 5、Fronts 6、无界&#xff08;文档&#xff09; 7、qiankun 四、我们选择的方案 引入qiankun并使用&#xff08;src外层作为主应用&#xff09; 主应…

CSS:怎么把网站都变成灰色

当大家看到全站的内容都变成了灰色&#xff0c;包括按钮、图片等等。这时候我们可能会好奇这是怎么做到的呢&#xff1f; 有人会以为所有的内容都统一换了一个 CSS 样式&#xff0c;图片也全换成灰色的了&#xff0c;按钮等样式也统一换成了灰色样式。但你想想这个成本也太高了…

基于Spring Boot的计算机课程管理:工程认证的实践

2相关技术 2.1 MYSQL数据库 MySQL是一个真正的多用户、多线程SQL数据库服务器。 是基于SQL的客户/服务器模式的关系数据库管理系统&#xff0c;它的有点有有功能强大、使用简单、管理方便、安全可靠性高、运行速度快、多线程、跨平台性、完全网络化、稳定性等&#xff0c;非常…

flinkOnYarn并配置prometheus+grafana监控告警

flinkOnYarn并配置prometheusgrafana监控告警 一、相关服务版本&#xff1a; flink版本&#xff1a;1.17.2 pushgateway版本&#xff1a;1.10.0 prometheus版本&#xff1a;3.0.0 grafana-v11.3.0参考了网上的多个文档以及学习某硅谷的视频&#xff0c;总结了一下文档&#x…

在esxi8.0中安装黑群晖的过程记录及小问题处理

问题记录 1.某种原因在网页中安装系统后&#xff0c;发现synology搜出来的设备还是169的地址&#xff0c;但是点击设置需要输入管理员账号密码才能设置ip&#xff0c;试了一下&#xff0c;账号输入admin&#xff0c;密码留空正常设置。 2.晚上试了一下&#xff0c;在全新的esxi…

基于微信小程序的公务员考试学习平台的设计与实现,LW+源码+讲解

摘 要 小程序公考学习平台使用Java语言进行编码&#xff0c;使用Mysql创建数据表保存本系统产生的数据。系统可以提供信息显示和相应服务&#xff0c;其管理小程序公考学习平台信息&#xff0c;查看小程序公考学习平台信息&#xff0c;管理小程序公考学习平台。 总之&#x…

深度学习之pytorch常见的学习率绘制

文章目录 0. Scope1. StepLR2. MultiStepLR3. ExponentialLR4. CosineAnnealingLR5. ReduceLROnPlateau6. CyclicLR7. OneCycleLR小结参考文献 https://blog.csdn.net/coldasice342/article/details/143435848 0. Scope 在深度学习中&#xff0c;学习率&#xff08;Learning R…

2024 年(第 7 届)“泰迪杯”数据分析技能赛B 题 特殊医学用途配方食品数据分析 完整代码 结果 可视化分享

一、背景特殊医学用途配方食品简称特医食品&#xff0c;是指为满足进食受限、消化吸收障碍、代谢素乱或者特定疾病状态人群对营养素或者膳食的特殊需要&#xff0c;专门加工配置而成的配方食品&#xff0c;包括0月龄至12月龄的特殊医学用途婴儿配方食品和适用于1岁以上的特殊医…

数据产品:深度探索与案例剖析

​在当今数字化时代&#xff0c;数据产品正逐渐成为各行业发展的关键驱动力。让我们深入了解数据产品的分类与特点&#xff0c;以及通过典型案例分析&#xff0c;感受数据产品的强大魅力。 首先&#xff0c;数据产品主要分为报表型、分析型、平台型等不同类别。 报表型数据产品…

opc da 服务器数据 转 IEC61850项目案例

目录 1 案例说明 2 VFBOX网关工作原理 3 应用条件 4 查看OPC DA服务器的相关参数 5 配置网关采集opc da数据 6 用IEC61850协议转发数据 7 网关使用多个逻辑设备和逻辑节点的方法 8 在服务器上运行仰科OPC DA采集软件 9 案例总结 1 案例说明 在OPC DA服务器上运行OPC …

Vue3 -- 环境变量的配置【项目集成3】

环境&#xff1a; 在项目开发过程中&#xff0c;至少会经历开发环境、测试环境和生产环境(即正式环境)三个阶段。 开发环境 .env.development测试环境 .env.test生产环境 .env.production 不同阶段请求的状态(如接口地址等)不一样&#xff0c;开发项目的时候要经常配置代理跨…

Go八股(Ⅴ)map

1.哈希表 哈希表用来存储键值对&#xff0c;通过hash函数把键值对散列到一个个桶中。 Go使用与运算&#xff0c;桶个数m&#xff0c;则编号[0,m-1]&#xff0c;把键的hash值与m-1与运算。**为了保证所有桶都会被选中&#xff0c;m一定为2的整数次幂。**这样m的二进制数表示一…