Flink - souce算子

水善利万物而不争,处众人之所恶,故几于道💦

目录

  1. 从Java的集合中读取数据
  2. 从本地文件中读取数据
  3. 从HDFS中读取数据
  4. 从Socket中读取数据
  5. 从Kafka中读取数据
  6. 自定义Source

官方文档 - Flink1.13

在这里插入图片描述


1. 从Java的集合中读取数据

fromCollection(waterSensors)

public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);List<WaterSensor> waterSensors = Arrays.asList(new WaterSensor("ws_001", 1577844001L, 45),new WaterSensor("ws_002", 1577844015L, 43),new WaterSensor("ws_003", 1577844020L, 42));env.fromCollection(waterSensors).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}
}

运行结果:
在这里插入图片描述

2. 从本地文件中读取数据

readTextFile(“input/words.txt”),支持相对路径和绝对路径

public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);env.readTextFile("input/words.txt").print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}

运行结果:
在这里插入图片描述

3. 从HDFS中读取数据

readTextFile(“hdfs://hadoop101:8020/flink/data/words.txt”)

要先在pom文件中添加hadoop-client依赖:

<dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version>
</dependency>
public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);env.readTextFile("hdfs://hadoop101:8020/flink/data/words.txt").print();try {env.execute();} catch (Exception e) {e.printStackTrace();}
}

运行结果:
在这里插入图片描述

4. 从Socket中读取数据

socketTextStream(“hadoop101”,9999),这个输入源不支持多个并行度。

public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);//从端口中读数据,  windows中 nc -lp 9999     Linux nc -lk 9999env.socketTextStream("hadoop101",9999).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}
}

运行结果:
在这里插入图片描述

5. 从Kafka中读取数据

addSource(new FlinkKafkaConsumer<>(“flink_source_kafka”,new SimpleStringSchema(),properties))

第一个参数是topic,

第二个参数是序列化器,序列化器就是在Kafka和flink之间转换数据 - 官方注释:The de-/serializer used to convert between Kafka’s byte messages and Flink’s objects.(反-序列化程序用于在Kafka的字节消息和Flink的对象之间进行转换。)

第三个参数是Kafka的配置。

public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);Properties properties = new Properties();// 设置集群地址properties.setProperty("bootstrap.servers", "hadoop101:9092,hadoop102:9092,hadoop103:9092");// 设置所属消费者组properties.setProperty("group.id", "flink_consumer_group");env.addSource(new FlinkKafkaConsumer<>("flink_source_kafka",new SimpleStringSchema(),properties)).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}
}

运行结果:
在这里插入图片描述

6. 自定义Source

addSource(new XXXX())

  大多数情况下,前面的数据源已经能够满足需要,但是难免会存在特殊情况的场合,所以flink也提供了能自定义数据源的方式.

public class Flink06_myDefDataSource {public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port",1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);env.addSource(new RandomWatersensor()).print();try {env.execute();} catch (Exception e) {e.printStackTrace();}}
}

  自定义数据源需要定义一个类,然后实现SourceFunction接口,然后实现其中的两个方法,runcancel,run方法包含具体读数据的逻辑,当调用cancel方法的时候应该可以让run方法中的读数据逻辑停止

public class RandomWatersensor implements SourceFunction<WaterSensor> {private Boolean running = true;@Overridepublic void run(SourceContext<WaterSensor> sourceContext) throws Exception {Random random = new Random();while (running){sourceContext.collect(new WaterSensor("sensor" + random.nextInt(50),Calendar.getInstance().getTimeInMillis(),random.nextInt(100)));Thread.sleep(1000);}}/*** 大多数的source在run方法内部都会有一个while循环,* 当调用这个方法的时候, 应该可以让run方法中的while循环结束*/@Overridepublic void cancel() {running = false;}}

运行结果:
在这里插入图片描述


demo2 - 自定义从socket中读取数据
public class Flink04_Source_Custom {public static void main(String[] args) throws Exception {// 1. 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.addSource(new MySource("hadoop102", 9999)).print();env.execute();}public static class MySource implements SourceFunction<WaterSensor> {private String host;private int port;private volatile boolean isRunning = true;private Socket socket;public MySource(String host, int port) {this.host = host;this.port = port;}@Overridepublic void run(SourceContext<WaterSensor> ctx) throws Exception {// 实现一个从socket读取数据的sourcesocket = new Socket(host, port);BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));String line = null;while (isRunning && (line = reader.readLine()) != null) {String[] split = line.split(",");ctx.collect(new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2])));}}/*** 大多数的source在run方法内部都会有一个while循环,* 当调用这个方法的时候, 应该可以让run方法中的while循环结束*/@Overridepublic void cancel() {isRunning = false;try {socket.close();} catch (IOException e) {e.printStackTrace();}}}
}
/*
sensor_1,1607527992000,20
sensor_1,1607527993000,40
sensor_1,1607527994000,50*/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/73774.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【话题】感觉和身边其他人有差距怎么办?也许自我调整很重要

每个人能力有限&#xff0c;水平高低不同&#xff0c;我们身在大环境里&#xff0c;虽然在同一个起跑线上&#xff0c;但是时间久了&#xff0c;你会发现&#xff0c;并越来越感觉到和身边其他人有了差距&#xff0c;慢慢的会有一定的落差感&#xff0c;怎么办呢&#xff01;通…

接口相似数据结构复用率高?Apipost这招搞定!

在API设计和开发过程中&#xff0c;存在许多瓶颈&#xff0c;其中一个主要问题是在遇到相似数据结构的API时会产生重复性较多的工作&#xff1a;在每个API中都编写相同的数据&#xff0c;这不仅浪费时间和精力&#xff0c;还容易出错并降低API的可维护性。 为了解决这个问题&a…

SAS-数据集SQL水平合并

一、SQL水平合并基本语法 sql的合并有两步&#xff0c;step1&#xff1a;进行笛卡尔乘积运算&#xff0c;第一个表的每一行合并第二个表的每一行&#xff0c;即表a有3行&#xff0c;表b有3行&#xff0c;则合并后3*39行。笛卡尔过程包含源数据的所有列&#xff0c;相同列名会合…

【LeetCode每日一题】——566.重塑矩阵

文章目录 一【题目类别】二【题目难度】三【题目编号】四【题目描述】五【题目示例】六【题目提示】七【解题思路】八【时间频度】九【代码实现】十【提交结果】 一【题目类别】 矩阵 二【题目难度】 简单 三【题目编号】 566.重塑矩阵 四【题目描述】 在 MATLAB 中&…

【QT学习】01:helloqt

helloqt OVERVIEW helloqt一、helloqt1.使用向导创建2.手动创建3.pro文件4.Qt应用程序框架 二、按钮创建main.cppmywidget.cpp 三、对象模型1.对象树引入2.存在的问题 一、helloqt 创建一个qt项目&#xff0c;可以使用creator的向导创建&#xff0c;也可自己手动创建&#xff…

时间复杂度为O(n2)的三种简单排序算法

1.冒泡排序 冒泡排序只会操作相邻的两个数据。每次冒泡操作都会对相邻的两个元素进行比较&#xff0c;看是否满足大小关系要求。如果不满足就让它俩互换。一次冒泡会让至少少一个元素移动到它应该在的位置&#xff0c;重复n次&#xff0c;就完成了n个数据的排序工作。 /*** …

JavaEE初阶之网络初识

一、网络发展史 1.1独立模式 独立模式:计算机之间相互独立; 1.2网络互连 随着时代的发展,越来越需要计算机之间互相通信,共享软件和数据,即以多个计算机协同工作来完成业务,就有了网络互连。网络互连:将多台计算机连接在一起,完成数据共享。 数据共享本质是网络数据…

Pytorch深度学习-----神经网络之池化层用法详解及其最大池化的使用

系列文章目录 PyTorch深度学习——Anaconda和PyTorch安装 Pytorch深度学习-----数据模块Dataset类 Pytorch深度学习------TensorBoard的使用 Pytorch深度学习------Torchvision中Transforms的使用&#xff08;ToTensor&#xff0c;Normalize&#xff0c;Resize &#xff0c;Co…

39.手机导航

手机导航 html部分 <div class"phone"><div class"content"><img class"active" src"./static/20180529205331_yhGyf.jpeg" alt"" srcset""><img src"./static/20190214214253_hsjqw…

14-1_Qt 5.9 C++开发指南_网络编程及主机信息查询_HostInfo

Qt 网络模块提供了用于编写 TCP/IP 客户端和服务器端程序的各种类&#xff0c;如用于 TCP 通信的QTcpSocket 和 QTcpServer&#xff0c;用于 UDP 通信的 QUdpSocket&#xff0c;还有用于实现 HTTP、FTP 等普通网络协议的高级类如 QNetworkRequest&#xff0c;QNetworkReply 和Q…

SpringBoot 入门

0目录 1.SpringBoot简介&#xff1b;优点和目录结构 2.实战 3.YML基本语法 4.集成Mybatis 1.SpringBoot简介&#xff1b;优点和目录结构 2.实战 创建工程 去s 降低错误率&#xff0c;更改地址 选择Maven 组和名称 修改版本&#xff0c;加入依赖 新建controller …

f12 CSS网页调试_css样式被划了黑线怎么办

我的问题是这样的 class加上去了,但是样式不生效,此时可能是样式被其他样式覆盖了, 解决方案就是 给颜色后边添加一个!important

Python 进阶(三):正则表达式(re 模块)

❤️ 博客主页:水滴技术 🌸 订阅专栏:Python 入门核心技术 🚀 支持水滴:点赞👍 + 收藏⭐ + 留言💬 文章目录 1. 导入re模块2. re模块中的常用函数2.1 re.search()2.2 re.findall()2.3 re.sub()2.4 re.compile()2.5 re.split()3. 正则表达式的语法4. 匹配对象的属性和

淘宝资源采集(从零开始学习淘宝数据爬取)

1. 为什么要进行淘宝数据爬取&#xff1f; 淘宝数据爬取是指通过自动化程序从淘宝网站上获取数据的过程。这些数据可以包括商品信息、销售数据、评论等等。淘宝数据爬取可以帮助您了解市场趋势、优化您的产品选择以及提高销售额。 淘宝作为全球的电商平台&#xff0c;每天都有…

jmeter中json提取器,获取多个值,并通过beanshell组成数组

jmeter中json提取器介绍 特别说明&#xff1a;**Compute concatenation var(suffix_ALL)&#x1f617;*如果找到许多结果&#xff0c;则插件将使用’ &#xff0c; 分隔符将它们连接起来&#xff0c;并将其存储在名为 _ALL的var中 json提取器调试 在查看结果树中选择JSON Pat…

FSM:Full Surround Monodepth from Multiple Cameras

参考代码&#xff1a;None 介绍 深度估计任务作为基础环境感知任务&#xff0c;在基础上构建的3D感知才能更加准确&#xff0c;并且泛化能力更强。单目的自监督深度估计已经有MonoDepth、ManyDepth这些经典深度估计模型了&#xff0c;而这篇文章是对多目自监督深度估计进行探…

JavaEE 面试常见问题

一、常见的 ORM 框架有哪些&#xff1f; 1.Mybatis Mybatis 是一种典型的半自动的 ORM 框架&#xff0c;所谓的半自动&#xff0c;是因为还需要手动的写 SQL 语句&#xff0c;再由框架根据 SQL 及 传入数据来组装为要执行的 SQL 。其优点为&#xff1a; 1. 因为由程序员…

使用vs 2017 C#项目发布

C#项目发布 vs 2017 打包项目源代码 (发布)iis 配置添加ssl 配置 vs 2017 打包项目源代码 (发布) iis 配置 添加ssl 配置 https://help.aliyun.com/zh/ssl-certificate/user-guide/install-ssl-certificates-on-iis-servers

Python+PIL计算两个图像的相似度并返回第一个不匹配的像素的x坐标(附完整版代码)

前言 前几天看到一篇文章写Pythonselenium超级鹰对滑块验证码的操作&#xff0c;大致的思想如下&#xff1a; 1、就是将滑块验证码进行截图 2、利用超级鹰的API进行对图片的处理&#xff0c; 3、返回滑块的距离 我在很久之前也遇到过类似的需求&#xff0c; 当时我的好友帮我写…

React之组件的生命周期

React之组件的生命周期 一、概述二、整体说明三、挂载阶段四、更新阶段五、卸载阶段 一、概述 生命周期:一个事务从创建到最后消亡经历的整个过程组件的生命周期&#xff1a;组件从被创建到挂载到页面中运行&#xff0c;再到组件不用时卸载的过程意义&#xff1a;理解组件的生…