Flink学习之旅:(三)Flink源算子(数据源)

1.Flink数据源

        Flink可以从各种数据源获取数据,然后构建DataStream 进行处理转换。source就是整个数据处理程序的输入端。

数据集合
数据文件
Socket数据
kafka数据
自定义Source

2.案例

2.1.从集合中获取数据

        创建 FlinkSource_List 类,再创建个 Student 类(姓名、年龄、性别三个属性就行,反正测试用)

package com.qiyu;import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.ArrayList;/*** @author MR.Liu* @version 1.0* @data 2023-10-18 16:13*/
public class FlinkSource_List {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);ArrayList<Student> clicks = new ArrayList<>();clicks.add(new Student("Mary",25,1));clicks.add(new Student("Bob",26,2));DataStream<Student> stream = env.fromCollection(clicks);stream.print();env.execute();}
}

运行结果:

Student{name='Mary', age=25, sex=1}
Student{name='Bob', age=26, sex=2}

2.2.从文件中读取数据

文件数据:

spark
hello world kafka spark
hadoop spark

package com.qiyu;import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** @author MR.Liu* @version 1.0* @data 2023-10-18 16:31*/
public class FlinkSource_File {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream<String> stream = env.readTextFile("input/words.txt");stream.print();env.execute();}
}

运行结果:(没做任何处理)

spark
hello world kafka spark
hadoop spark

2.3.从Socket中读取数据

package com.qiyu;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** @author MR.Liu* @version 1.0* @data 2023-10-18 17:41*/
public class FlinkSource_Socket {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取文本流DataStreamSource<String> lineDSS = env.socketTextStream("192.168.220.130",7777);lineDSS.print();env.execute();}
}

运行结果:

服务器上执行:

 nc -lk 7777

疯狂输出

控制台打印结果 

6> hello
7> world

2.4.从Kafka中读取数据

pom.xml 添加Kafka连接依赖

      <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
package com.qiyu;import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import java.util.Properties;/*** @author MR.Liu* @version 1.0* @data 2023-10-19 10:01*/
public class FlinkSource_Kafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);Properties properties = new Properties();properties.setProperty("bootstrap.servers", "hadoop102:9092");properties.setProperty("group.id", "consumer-group");properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");properties.setProperty("auto.offset.reset", "latest");DataStreamSource<String> stream = env.addSource(new FlinkKafkaConsumer<String>("clicks", new SimpleStringSchema(), properties));stream.print("Kafka");env.execute();}
}

启动 zk 和kafka

创建topic

bin/kafka-topics.sh --create --bootstrap-server hadoop102:9092 --replication-factor 1 --partitions 1 --topic clicks

生产者、消费者命令

bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092  --topic clicks
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092  --topic clicks --from-beginning

启动生产者命令后疯狂输入 

运行java类,运行结果:和生产者输入的是一样的

Kafka> flinks
Kafka> hadoop
Kafka> hello
Kafka> nihaop

2.5.从自定义Source中读取数据

        大多数情况下,前面几个数据源已经满足需求了。但是遇到特殊情况我们需要自定义的数据源。实现方式如下:

        1.编辑自定义源Source

package com.qiyu;import org.apache.flink.streaming.api.functions.source.SourceFunction;import java.util.Calendar;
import java.util.Random;/*** @author MR.Liu* @version 1.0* @data 2023-10-19 10:37*//**** 主要实现2个方法 run() 和 cancel()*/
public class FlinkSource_Custom implements SourceFunction<Student> {// 声明一个布尔变量,作为控制数据生成的标识位private Boolean running = true;@Overridepublic void run(SourceContext<Student> sourceContext) throws Exception {Random random = new Random(); // 在指定的数据集中随机选取数据String[] name = {"Mary", "Alice", "Bob", "Cary"};int[] sex = {1,2};int age = 0;while (running) {sourceContext.collect(new Student(name[random.nextInt(name.length)],sex[random.nextInt(sex.length)],random.nextInt(100)));// 隔 1 秒生成一个点击事件,方便观测Thread.sleep(1000);}}@Overridepublic void cancel() {running = false;}
}

        2.编写主程序

package com.qiyu;import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** @author MR.Liu* @version 1.0* @data 2023-10-19 10:46*/
public class FlinkSource_Custom2 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);
//有了自定义的 source function,调用 addSource 方法DataStreamSource<Student> stream = env.addSource(new FlinkSource_Custom());stream.print("SourceCustom");env.execute();}
}

 运行主程序,运行结果:

SourceCustom> Student{name='Mary', age=1, sex=46}
SourceCustom> Student{name='Cary', age=2, sex=52}
SourceCustom> Student{name='Bob', age=1, sex=14}
SourceCustom> Student{name='Alice', age=1, sex=84}
SourceCustom> Student{name='Alice', age=2, sex=82}
SourceCustom> Student{name='Cary', age=1, sex=28}

.............

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/164209.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ps或游戏提示d3dcompiler_47.dll缺失怎么修复?常见的修复方法总结

在当今这个信息化的时代&#xff0c;计算机已经成为我们生活和工作中不可或缺的一部分。然而&#xff0c;随着软件的不断更新和升级&#xff0c;一些技术问题也时常困扰着我们。其中&#xff0c;d3dcompiler_47.dll缺失就是一个常见的问题。本文将详细介绍五种修复方案&#xf…

性能超越 Clickhouse | 物联网场景中的毫秒级查询案例

1 物联网应用场景简介 物联网&#xff08;Internet of Things&#xff0c;简称 IoT&#xff09;是指通过各种信息传感、通信和 IT 技术来实时连接、采集、监管海量的传感设备&#xff0c;从而实现对现实世界的精确感知和快速响应&#xff0c;继而实现自动化、智能化管理。在查…

Visual Studio2019 与 MySQL连接 版本关系

Refer: VS 连接MySQL | mysql-for-visualstudio 的安装-CSDN博客 【精选】用VS2019&#xff08;C#&#xff09;连接MYSQL(从0入门&#xff0c;手把手教学&#xff09;_mysql-for-visualstudio-1.2.9.msi_Flying___rabbit的博客-CSDN博客 一、工具&#xff1a;VS2019需要连接M…

gin框架39--重构 BasicAuth 中间件

gin框架39--重构 BasicAuth 中间件 介绍gin BasicAuth 解析自定义newAuth实现基础认证注意事项说明 介绍 每当我们打开一个网址的时候&#xff0c;会自动弹出一个认证界面&#xff0c;要求我们输入用户名和密码&#xff0c;这种BasicAuth是最基础、最常见的认证方式&#xff0…

RabbitMQ整理

MQ(Message Queue)&#xff1a;是队列&#xff0c;也是跨进程的通信机制&#xff0c;用于上下游传递信息 FIFO(First In First Out)&#xff1a;先进先出 RabbitMQ访问&#xff1a;http://127.0.0.1:15672/ 默认账号密码&#xff1a;guest 优势&#xff1a;流量削峰&#x…

CRC16计算FC(博途SCL语言)

CRC8的计算FC,相关链接请查看下面文章链接: 博途SCL CRC8 计算FC(计算法)_博途怎么计算crc_RXXW_Dor的博客-CSDN博客关于CRC8的计算网上有很多资料和C代码,这里不在叙述,这里主要记录西门子的博途SCL完成CRC8的计算过程, CRC校验算法,说白了,就是把需要校验的数据与多项式…

CCF CSP认证 历年题目自练Day34

题目一 试题编号&#xff1a; 202303-1 试题名称&#xff1a; 田地丈量 时间限制&#xff1a; 1.0s 内存限制&#xff1a; 512.0MB 问题描述&#xff1a; 问题描述 西西艾弗岛上散落着 n 块田地。每块田地可视为平面直角坐标系下的一块矩形区域&#xff0c;由左下角坐标 (x1,…

xml schema中的all元素

说明 xml schema中的all元素表示其中的子元素可以按照任何顺序出现&#xff0c;每个元素可以出现0次或者1次。 https://www.w3.org/TR/xmlschema-1/#element-all maxOccurs的默认值是1&#xff0c;minOccurs 的默认值是1。 举例 <element name"TradePriceRequest&…

丈母娘说:有编制的不如搞编程的

10月17日百度世界大会召开&#xff0c;据说文心大模型又牛X了&#xff0c;综合水平相比GPT4毫不逊色&#xff0c;真是个让人激动的消息&#xff0c;国产大模型的进展可以说是日新月异&#xff0c;我这耳朵边一直响彻四个字&#xff1a;遥遥领先。 不过今天我关注的重点不是什么…

ArcGIS在VUE框架中的构建思想

项目快要上线了&#xff0c;出乎意料的有些空闲时间。想着就把其他公司开发的一期代码里面&#xff0c;把关于地图方面的代码给优化一下。试运行的时候&#xff0c;客户说控制台有很多飘红的报错&#xff0c;他们很在意&#xff0c;虽然很不情愿&#xff0c;但能改的就给改了吧…

云函数cron-parser解析时区问题

1、问题 云函数部署后cron-parser解析0点会变成8点 考虑可能是时区的问题 然后看文档发现果然有问题&#xff0c;云函数环境是utc0 2、解决 看了半天cron-parser文档发现 Using Date as an input can be problematic specially when using the tz option. The issue bein…

【Linux】线程池 | 自旋锁 | 读写锁

文章目录 一.Linux线程池1.线程池的概念2.线程池的优点3.线程池的应用场景4.线程池的实现 二.其他常见的锁1.STL、智能指针和线程安全2.其他常见的锁 三.读者写者问题1.读者写者模型2.读写锁 一.Linux线程池 1.线程池的概念 线程池是一种线程使用模式。 线程过多会带来调度开销…

网站二级域名怎么部署SSL证书?

二级域名是在主域名下创建的子域名&#xff0c;常用于区分不同功能或部门的网站。随着互联网的发展&#xff0c;越来越多的网站开始采用二级域名来构建更灵活和个性化的网站结构&#xff0c;保护二级域名的数据安全也变得至关重要。为了确保二级域名的安全性&#xff0c;申请SS…

【七:(测试用例)spring boot+testng+xml+mock实现用例管理+数据校验】

目录 1、目录结构的相关类cases类1、添加用户 AddUserTest2、获取用户列表信息 GetUserInfoListTest3、获取用户信息 GetUserInfoTest4、登录测试5、更新用户信息 config类1、报告配置2、用户路径配置 model类utils类 配置配置类SQLMapper.xmlspring boot全局配置databaseConfi…

了解 Elasticsearch 自动生成的文档 _id:重复是一个问题吗?

Elasticsearch 中自动生成的文档 ID 当你在未指定 ID 的情况下对文档建立索引时&#xff0c;Elasticsearch 会自动为该文档生成唯一的 ID。 该 ID 是 Base64 编码的 UUID&#xff0c;由多个部分组成&#xff0c;每个部分都有特定的用途。 ID 生成过程针对索引速度和存储效率进…

2023年中国人力资源咨询发展历程及市场规模前景分析[图]

人力资源咨询是企业借助外部智力资源提高自身管理水平和效率的重要路径&#xff0c;属于管理咨询业的一个重要分支, 一方面&#xff0c;人力资源咨询要为企业提供基础的人力资源外包服务&#xff1b;另一方面&#xff0c;人力资源咨询要为企业提供专业化、职业化现代人力资源管…

用CSS+SVG做一个优雅的环形进度条

开门见山 先上最终效果图&#xff1a;&#xff08;Demo 传送门&#xff09; 其中进度、尺寸、环宽和颜色都可以非常方便地进行控制。 核心原理&#xff1a; 利用两个重叠的圆环形&#xff0c;通过对上层圆环弧长的控制来表示进度&#xff0c;下层圆环则作为辅助&#xff0c;…

开源云财务软件,财务软件源码,永久免费财务软件

纷析云SAAS云财务软件开源版 包含账套、凭证字、科目、期初、币别、账簿、报表、凭证、结账 技术交流群 扫码添加客服进群 商业版地址 纷析云商业版https://f3.fenxi365.com/ 正式环境&#xff0c;可注册账号直接使用或测试 功能对比 功能模块开源版商业版[技术重构]凭证✔…

蜻蜓c影视追剧系统-多个小程序添加说明

多小程序添加设置 蜻蜓c影视追剧 支持多小程序添加&#xff0c;也就是可以管理不同前端的小程序。 此处id 对应前端小程序的mp值 关于添加小程序&#xff1a; 此处有所有填写内容的参考方式&#xff0c;要注意是必须开通了微信支付才可以添加&#xff0c;这里需要添加证书信息…