快速灵敏的 Flink1

一、flink单机安装

1、解压
tar -zxvf ./flink-1.13.2-bin-scala_2.12.tgz -C /opt/soft/
2、改名字
mv ./flink-1.13.2/ ./flink1132
3、profile配置
#FLINK
export FLINK_HOME=/opt/soft/flink1132
export PATH=$FLINK_HOME/bin:$PATH
4、查看版本
flink --version
5、启动关闭flink
start-cluster.sh
stop-cluster.sh
6、登录网页   http://192.168.91.11:8081

二、flink开发

1、步骤

创建运行环境--> 加载数据源--> 转换--> 下沉

2、案例

(1)学习数据源加载
package nj.zb.kb23.sourceimport org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentobject AA {def main(args: Array[String]): Unit = {//1、创建环境变量val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//设置并行步 1env.setParallelism(1)//2、加载数据源val stream: DataStream[Any] = env.fromElements(1,2,3,3,4,"hello",3.1415)//3、下沉stream.print()env.execute("sourcetest")}
}
(2)样例类加载数据源
package nj.zb.kb23.sourceimport org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport scala.util.Random
//定义样例类
case class SensorReading(id:String,timestamp:Long,temperature:Double)object AA {def main(args: Array[String]): Unit = {//1、创建环境变量val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//设置并行步 1env.setParallelism(1)//2、加载数据源val stream: DataStream[SensorReading] = env.fromCollection(List(SensorReading("sensor_1", 1698731530, 26.3),SensorReading("sensor_2", 1698731530, 26.5),SensorReading("sensor_3", 1698731531, 26.7),SensorReading("sensor_4", 1698731530, 26.9),))//3、输出,又叫下沉stream.print()env.execute("sourcetest")}
}

(3)指定文件加载数据
package nj.zb.kb23.sourceimport org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentobject AA {def main(args: Array[String]): Unit = {//1、创建环境变量val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//设置并行步 1env.setParallelism(1)//2、加载数据源val stream: DataStream[String] = env.readTextFile("D:\\caozuo\\ideal\\flinkstu\\resources\\sensor")//3、输出,又叫下沉stream.print()env.execute("sourcetest")}
}

(4)指定端口,实时处理数据源
package nj.zb.kb23.sourceimport org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment//定义样例类
case class SensorReading(id:String,timestamp:Long,temperature:Double)object AA {def main(args: Array[String]): Unit = {//1、创建环境变量val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//设置并行步 1env.setParallelism(1)//2、加载数据源//(1)真实时处理 nc -lk 7777val stream: DataStream[String] = env.socketTextStream("192.168.91.11",7777)stream.print()//3、转换拼接val stream1: DataStream[(String, Int)] = stream.map(x=>x.split(",")).flatMap(x=>x).map(x=>(x,1))stream1.print()//①sumval value: DataStream[(String, Int)] = stream.map(x=>x.split(",")).flatMap(x=>x).map(x=>(x,1)).keyBy(x=>x._1).sum(1)value.print()//   ⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇⬇相等//②reduceval value: DataStream[(String, Int)] = stream.map(x => x.split(",")).flatMap(x => x).map(x => (x, 1)).keyBy(x => x._1).reduce((x, y) => (x._1 + "#" + y._1, x._2 + y._2))value.print()//4、输出,又叫下沉env.execute("sourcetest")}
}
(5)kafka加载数据
package nj.zb.kb23.sourceimport java.util.Propertiesimport org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.clients.consumer.ConsumerConfig//定义样例类
case class SensorReading(id:String,timestamp:Long,temperature:Double)object AA {def main(args: Array[String]): Unit = {//1、创建环境变量val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//设置并行步 1env.setParallelism(1)//2、加载数据源val prop = new Properties()prop.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.91.11:9092")prop.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"sensorgroup1")prop.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer")prop.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer")prop.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest")val stream: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String]("sensor", new SimpleStringSchema(), prop))val value: DataStream[(String, Int)] = stream.flatMap(x => x.split(" ")).map(x => (x, 1)).keyBy(x => x._1).reduce((x: (String, Int), y: (String, Int)) => (x._1, x._2 + y._2))//4、输出,又叫下沉stream.print()env.execute("sourcetest")}
}
(6)自定义数据源加载数据
package nj.zb.kb23.sourceimport org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport scala.util.Random//定义样例类
case class SensorReading(id:String,timestamp:Long,temperature:Double)object AA {def main(args: Array[String]): Unit = {//1、创建环境变量val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//设置并行步 1env.setParallelism(1)//2、加载数据源val stream: DataStream[SensorReading] = env.addSource(new MySensorSource)//4、输出,又叫下沉stream.print()env.execute("sourcetest")}
}
//模拟自定义数据源
class MySensorSource extends SourceFunction[SensorReading]{override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {//(1)随机数,true一直生成随机数val random = new Random()while (true){val d: Double = Math.random()ctx.collect(SensorReading("随机数:"+random.nextInt(),System.currentTimeMillis(),d))Thread.sleep(1000)}}override def cancel(): Unit = {}
}

三、flink运行四大组件

1、作业管理器jobmanager

应用程序执行的主过程中,执行应用程序会被jobmanager最先接收,这个应用程序会包括:作业图(jobGraph),逻辑数据流图(logical dataflow graph)和打包了所有的类, 库和其他资源的jar包。jobmanager会向资源管理器请求执行任务必要的资源,也就是任务管理器上的插槽(slot)。一旦它获取了足够的资源,就会将执行图分发到真正运行它们的taskmanager上。在实际运行中,由jobmanager负责协调各项中央操作。

2、任务管理器taskmanager

taskmanager是指工作进程。Flink中包含了多个taskmanager,每个taskmanager中又存在着一定数量的插槽(slots),插槽的数量限制了TaskManager能够执行的任务数量。开始运行后,taskmanager中的插槽会被注册给资源管理器,在收到指令后,taskmanager会提供多个插槽任jobmanager调用。jobmanager通过给插槽分配tasks来执行。运行同一应用程序的taskmanager可以子啊执行过程中互相交换数据。

3、资源管理器resourcemanager

资源管理器在作业管理器申请插槽资源时,会将空闲插槽的任务管理器分配给作业管理器。如果没有足够的插槽来满足作业管理器的请求时,它会向资源提供平台发起会话,以提供启动taskmanager进程的容器。

4、分发器 dispatcher
  1. 提供了REST接口,在应用提交时可以跨作业运行。
  2. 在应用被提交执行的情况下,分发器启动将应用提交给jobmanager。
  3. Webui会由dispatcher启动,以便展示和监控作业的执行信息。
  4. 这取决于应用提交运行的方式取决于是否需要dispatche

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/180520.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

轻量封装WebGPU渲染系统示例<14>- 多线程模型载入(源码)

当前示例源码github地址: https://github.com/vilyLei/voxwebgpu/blob/main/src/voxgpu/sample/ModelLoadTest.ts 此示例渲染系统实现的特性: 1. 用户态与系统态隔离。 细节请见:引擎系统设计思路 - 用户态与系统态隔离-CSDN博客 2. 高频调用与低频调用隔离。 …

C语言--判断一个年份是否是闰年(详解)

一.闰年的定义 闰年是指在公历(格里高利历)中,年份可以被4整除但不能被100整除的年份,或者可以被400整除的年份。简单来说,闰年是一个比平年多出一天的年份,即2月有29天。闰年的目的是校准公历与地球公转周…

CH10_简化条件逻辑

分解条件表达式(Decompose Conditional) if (!aDate.isBefore(plan.summerStart) && !aDate.isAfter(plan.summerEnd))charge quantity * plan.summerRate; elsecharge quantity * plan.regularRate plan.regularServiceCharge;if (summer())…

【蓝桥杯省赛真题42】Scratch舞台特效 蓝桥杯少儿编程scratch图形化编程 蓝桥杯省赛真题讲解

目录 scratch舞台特效 一、题目要求 编程实现 二、案例分析 1、角色分析

【移远QuecPython】EC800M物联网开发板的内置GNSS定位的恶性BUG(目前没有完全的解决方案)

【移远QuecPython】EC800M物联网开发板的内置GNSS定位的恶性BUG(目前没有完全的解决方案) GNSS配置如下: 【移远QuecPython】EC800M物联网开发板的内置GNSS定位获取(北斗、GPS和GNSS) 测试视频(包括BUG复…

Iceberg教程

目录 教程来源于尚硅谷1. 简介1.1 概述1.2 特性 2. 存储结构2.1 数据文件(data files)2.2 表快照(Snapshot)2.3 清单列表(Manifest list)2.4 清单文件(Manifest file)2.5 查询流程分析 3. 与Flink集成3.1 环境准备3.1.1 安装Flink3.1.2 启动Sql-Client 3.2 语法 教程来源于尚硅…

【RabbitMQ】RabbitMQ 消息的可靠性 —— 生产者和消费者消息的确认,消息的持久化以及消费失败的重试机制

文章目录 前言:消息的可靠性问题一、生产者消息的确认1.1 生产者确认机制1.2 实现生产者消息的确认1.3 验证生产者消息的确认 二、消息的持久化2.1 演示消息的丢失2.2 声明持久化的交换机和队列2.3 发送持久化的消息 三、消费者消息的确认3.1 配置消费者消息确认3.2…

Git从基础到实践

1.Git是用来做什么的? git就是一款版本控制软件,主要面向代码的管理。你可以理解为Git是一个代码的备份器,给你的每一次修改后的代码做个备份,防止丢失,这个是git最基本的功能。 其次,git不止备份,当你需要比对多…

NEFU数字图像处理(5)图像压缩编码

一、概述 1.1简介 图像压缩编码的过程是在图像存储或传输之前进行,然后再由压缩后的图像数据(编码数据)恢复出原始图像或者是原始图像的近似图像 无损压缩:在压缩过程中没有信息损失,可由编码数据完全恢复出原始图像有…

iOS App Store上传项目报错 缺少隐私政策网址(URL)解决方法

​ 一、问题如下图所示: ​ 二、解决办法:使用Google浏览器(翻译成中文)直接打开该网址 https://www.freeprivacypolicy.com/free-privacy-policy-generator.php 按照要求填写APP信息,最后将生成的网址复制粘贴到隐…

【SOC基础】单片机学习案例汇总 Part2:蜂鸣器、数码管显示

📢:如果你也对机器人、人工智能感兴趣,看来我们志同道合✨ 📢:不妨浏览一下我的博客主页【https://blog.csdn.net/weixin_51244852】 📢:文章若有幸对你有帮助,可点赞 👍…

xilinx fpga ddr mig axi

硬件 参考: https://zhuanlan.zhihu.com/p/97491454 https://blog.csdn.net/qq_22222449/article/details/106492469 https://zhuanlan.zhihu.com/p/26327347 https://zhuanlan.zhihu.com/p/582524766 包括野火、正点原子的资料 一片内存是 1Gbit 128MByte 16bit …

【wp】2023鹏城杯初赛 Web web1(反序列化漏洞)

考点&#xff1a; 常规的PHP反序列化漏洞双写绕过waf 签到题 源码&#xff1a; <?php show_source(__FILE__); error_reporting(0); class Hacker{private $exp;private $cmd;public function __toString(){call_user_func(system, "cat /flag");} }class A {p…

Spring基础

文章目录 Spring基础IoC容器基础IoC理论第一个Spring程序Bean注册与配置依赖注入自动装配生命周期与继承工厂模式和工厂Bean注解开发 AOP面向切片配置实现AOP接口实现AOP注解实现AOP Spring基础 Spring是为了简化开发而生&#xff0c;它是轻量级的IoC和AOP的容器框架&#xff…

I/O多路转接之select

承接上文&#xff1a;I/O模型之非阻塞IO-CSDN博客 简介 select函数原型介绍使用 一个select简单的服务器的代码书写 select的缺点 初识select 系统提供select函数来实现多路复用输入/输出模型 select系统调用是用来让我们的程序监视多个文件描述符的状态变化的; 程序会停在s…

Vue3 实现 clipboard 复制功能

一个很小的交互功能&#xff0c;网上搜了一下有一个 vue3-clipboard 直接支持vue3&#xff0c;到github仓库看了下&#xff0c;原作者已经不维护这个项目了&#xff1a; 推荐使用 vueuse 自带的 useclipboard 功能&#xff0c;由 vue 团队维护&#xff0c;稳定性基本没问题 官…

数据结构构之顺序表

1.线性表 线性表&#xff08;linear list&#xff09;是n个具有相同特性的数据元素的有限序列。 线性表是一种在实际中广泛使用的数据结构&#xff0c;常见的线性表&#xff1a;顺序表、链表、栈、队列、字符串... 线性表在逻辑上是线性结构&#xff0c;也就说是连续的一条直线…

MySQL连接时出现Host ‘::1‘ is not allowed to connect to this MySQL server

报错原因 之前想着要提高一下连接速度&#xff0c;所以在my.ini中加入了&#xff1a;skip-name-resolve&#xff0c;当时的数据库root账号设置的登录权限是%&#xff0c;因此没有出现连接错误&#xff0c;这次因为是新建数据库&#xff0c;root账号的登录权限默认是localhost&…

园区网真实详细配置大全案例

实现要求&#xff1a; 1、只允许行政部电脑对全网telnet管理 2、所有dhcp都在核心 3、wifi用户只能上外网&#xff0c;不能访问局域网其它电脑 4、所有接入交换机上bpdu保护 5、只允许vlan 10-40上网 5、所有接入交换机开dhcp snoop 6、所有的交换机指定核心交换机为ntp时间服务…

解决Visual Studio Code 控制台中文乱码问题

C和CPP运行编码指定 "code-runner.executorMap": {"c": "cd $dir && gcc -fexec-charsetGBK $fileName -o $fileNameWithoutExt && $dir$fileNameWithoutExt","cpp": "cd $dir && g -fexec-charsetGBK $…