206.Flink(一):flink概述,flink集群搭建,flink中执行任务,单节点、yarn运行模式,三种部署模式的具体实现

一、Flink概述

1.基本描述

Flink官网地址:Apache Flink® — Stateful Computations over Data Streams | Apache Flink

Flink是一个框架分布式处理引擎,用于对无界有界数据流进行有状态计算

 2.有界流和无界流

  • 无界流(流):
    • 有定义流的开始,没有定义结束。会无休止产生数据
    • 无界流数据必须持续处理
  • 有界流(批):
    • 有定义流的开始,也有定义流的结束
    • 可以拿到所有数据后再进行处理,并且做排序
    • 有界流通常被称为批处理

3.有状态

flink中除了流之外还会有额外的数据,用来对这些流做一些状态统计。

比如流是路上的汽车,我们是路边的人,数过去了多少车。过去一辆我们可以记一个,再过去就2个。也可以通过画正字的方式记录,最后通过统计正字来得到过去多少车。这里的数字以及正字,就是车以外的额外数据,用作统计。我们每来一个车统计一下,统计完之后可以对外输出。同时,每过一段时间会持久化一下,以防丢失。 

4.flink的特点

低延迟、高吞吐、结果准确、良好的容错

  • 高吞吐、低延迟:每秒可以处理数百万个事件,毫秒级延迟
  • 结果准确:flink提供事件事件(event_time)和处理时间(processing_time)语义。对于乱序事件流,事件事件语序仍然能提供一致且精确的结果
  • 精确一次(exactly-once)的状态一致性保证
  • 可以连接到常见的存储系统:kafka,hive,jdbc,hdfs,redis等
  • 高可用:本身就是高可用,配合k8s,yarn和mesos的紧密集成,再加上从故障中快速恢复和动态扩展的能力,可以以极少的停机时间实现7*24小时运行

5.flink和spark的区别

  • spark以批处理为根本
    • spark采用rdd模型,所谓rdd就是每3秒看做的一个批次,spark引擎处理这三秒的数据。spark streaming的Dstream实际上就是一组组rdd的集合
    • spark是批计算,将DAG划分为不同的stage,一个完成才计算下一个
  • Flink以流处理为根本
    • flink基本模型是数据流,以及事件序列
    • flink是标准的流执行模式,一个事件在一个节点处理完之后可以直接下发下一个节点处理

spark:

flink:

flinkspark
计算模型流计算微批计算
时间语序事件事件、处理时间处理时间
窗口多、灵活少、不灵活
窗口必须是批次的整数倍
状态       没有
流式sql没有

6.flink应用场景

电商、市场营销

物联网(IOT)

物流配送,服务业

银行,金融

7.flink分层api

  • 有状态流处理:通过底层api (处理函数),对最原始的数据加工处理。与DataStream api集成,可以处理复杂计算
  • DataStream(流处理)/DataSet(批处理) api:封装了底层api,提供转换、连接、聚合、窗口等通用模块。在flink1.12之后,DataSet被合到DataStream里面去了,即DataStream是批流都可以处理的api
  • Table api:以表为中心的声明式编程。可以与DataStream无缝切换
  • sql:以sql查询表达式的形式表现程序,可以在table api的表上执行

简单来说,就是flink的一层层封装。

二、Flink快速上手

1.创建项目

新建一个maven项目:

2.导入依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.atguigu</groupId><artifactId>FlinkTutorial-1.17</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><flink.version>1.17.0</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency></dependencies>
</project>

3.创建文件夹

新建一个input文件夹,里面一个txt,随便输入一些单词

4.批处理形式的word count编写(已过时)

注:此种方式使用的是DataSet API。我们新的版本已经将批和流都统一到DataStream API中了,因此这种方式的代码编写看一看就好,已过时。

package com.atguigu.wc;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;/*** TODO DataSet API 实现 wordcount(不推荐)*/
public class BatchWordCount {public static void main(String[] args) throws Exception {// TODO 1. 创建执行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// TODO 2.读取数据:从文件中读取DataSource<String> lineDS = env.readTextFile("input/word.txt");// TODO 3.切分、转换 (word,1)FlatMapOperator<String, Tuple2<String, Integer>> wordAndOne = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {// TODO 3.1 按照 空格 切分单词String[] wo

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/104144.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【视频】Python用LSTM长短期记忆神经网络对不稳定降雨量时间序列进行预测分析|数据分享...

全文下载链接&#xff1a;http://tecdat.cn/?p23544 在本文中&#xff0c;长短期记忆网络——通常称为“LSTM”——是一种特殊的RNN递归神经网络&#xff0c;能够学习长期依赖关系&#xff08;点击文末“阅读原文”获取完整代码数据&#xff09;。 本文使用降雨量数据&#xf…

Docker file解析

文章目录 简介构建的三步骤Docker执行Dockerfile的大致流程DockerFile常用保留字指令创建第一个Dockerfile镜像的缓存特性 Docker file 解析 简介 Dockerfile是用来构建Docker镜像的文本文件&#xff0c;是由一条条构建镜像所需的指令和参数构成的脚本&#xff0c;记录了镜像构…

ffmpeg,nginx,vlc把rtsp流转hls

ffmpeg:rtsp>hls流; nginx 托管hls流服务; vlc测试hls流服务; 参考了很多相关文档和资料,由于比较乱就不在一一引用介绍了&#xff0c;下面的是实操OK的例子&#xff1b; 1&#xff09;ffmpeg (ffmpeg-4.4.1-full_build)&#xff0c;要用full版本&#xff0c;否则会缺某些…

华为数通方向HCIP-DataCom H12-821题库(单选题:61-80)

第61题 关于 BGP 的Keepalive报文消息的描述,错误的是 A、Keepalive周期性的在两个BGP邻居之间发送 B、Keepalive报文主要用于对等路由器间的运行状态和链路的可用性确认 C、Keepalive 报文只包含一个BGP数据报头 D、缺省情况下,Keepalive 的时间间隔是180s 答案&#xff…

videojs 实现自定义组件(视频画质/清晰度切换) React

前言 最近使用videojs作为视频处理第三方库&#xff0c;用来对接m3u8视频类型。这里总结一下自定义组件遇到的问题及实现&#xff0c;目前看了许多文章也不全&#xff0c;官方文档写的也不是很详细&#xff0c;自己摸索了一段时间陆陆续续完成了&#xff0c;这是实现后的效果.…

MyBatis分页与特殊字符处理

文章目录 一、分页1.1 分页插件PageHelper1.2 使用1.2.1 导入pom依赖1.2.2 Mybatis.cfg.xml配置拦截器1.2.3. 配置 Mapper.xml1.2.4 测试 二、特殊字符处理2.1 使用CDATA区段2.2 使用实体引用 一、分页 1.1 分页插件PageHelper PageHelper 是 Mybatis 的一个插件。官网 Page…

ios小组件报错:Please adopt containerBackground API

iOS 17 小组件报错:Please adopt containerBackground API 使用下面的方法解决了: 代码: extension View {func widgetBackground(_ backgroundView: some View) -> some View {if #available(iOSApplicationExtension 17.0, *) {return containerBackground(for: .wi…

【云原生】Docker私有仓库 RegistryHabor

目录 1.Docker私有仓库&#xff08;Registry&#xff09; 1.1 Registry的介绍 1.2 Registry的部署 步骤一&#xff1a;拉取相关的镜像 步骤二&#xff1a;进行 Registry的相关yml文件配置&#xff08;docker-compose&#xff09; 步骤三&#xff1a;镜像的推送 2. Regist…

k8s 安装 istio(二)

3.3 部署服务网格调用链检测工具 Jaeger 部署 Jaeger 服务 kubectl apply -f https://raw.githubusercontent.com/istio/istio/release-1.16/samples/addons/jaeger.yaml 创建 jaeger-vs.yaml 文件 apiVersion: networking.istio.io/v1alpha3 kind: VirtualService metadata…

java 高级面试题整理(薄弱技术)

session 和cookie的区别和联系 session1.什么是session Session是另一种记录客户状态的机制&#xff0c;不同的是Cookie保存在客户端浏览器中&#xff0c;而Session保存在服务器上。客户端浏览器访问服务器的时候&#xff0c;服务器把客户端信息以某种形式记录在服务器上。这就…

常用的数据可视化工具有哪些?要操作简单的

随着数据量的剧增&#xff0c;对分析效率和数据信息传递都带来了不小的挑战&#xff0c;于是数据可视化工具应运而生&#xff0c;通过直观形象的图表来展现、传递数据信息&#xff0c;提高数据分析报表的易读性。那么&#xff0c;常用的操作简单数据可视化工具有哪些&#xff1…

Dockerfile推送私有仓库的两个案例

一&#xff0c;编写Dockerfile制作Web应用系统nginx镜像&#xff0c;生成镜像nginx:v1.1&#xff0c;并推送其到私有仓库。 具体要求如下&#xff1a; &#xff08;1&#xff09;基于centos基础镜像&#xff1b; &#xff08;2&#xff09;指定作者信息&#xff1b; &#xff…

数据结构:二叉树及相关操作

文章目录 前言一、树的概念及结构1.什么是树2. 树的相关概念3.树的表示 二、二叉树概念及结构1.二叉树概念2.特殊的二叉树3.二叉树的性质4.二叉树的存储结构 三、平衡二叉树实现1.创建树和树的前中后遍历1.前中后遍历2.创建树且打印前中后遍历 2.转换为平衡二叉树和相关操作1.转…

Anaconda安装教程以及深度学习环境搭建

目录 前言 下载Anaconda 虚拟环境的搭建 在pycharm中配置现有的conda环境 CUDA简介 下载安装pytorch包 前言 最近换新笔记本了&#xff0c;要重新安装软件&#xff0c;以前本来是想要写这个教程的&#xff0c;但当时由于截图不全还要懒得再下载重装&#xff0c;就放弃了&…

高效多用的群集-Haproxy搭建Web集群

Haproxy搭建 Web 群集 一、Haproxy前言 HAProxy是一个使用c语言编写的自由及开放源代码软件&#xff0c;其提供高可用性、负载均衡&#xff0c;以及基于TcP和HrrP的应用程序代理。HAProxy特别适用于那些负载特大的web站点&#xff0c;这些站点通常又需要会话保持或七层处理。…

jmeter模拟多用户并发

一、100个真实的用户 1、一个账号模拟100虚拟用户同时登录和100账号同时登录 区别 &#xff08;1&#xff09;1个账号100个人用&#xff0c;同时登录&#xff1b; &#xff08;2&#xff09;100个人100个账号&#xff0c;同时登录。 相同 &#xff08;1&#xff09;两个都…

【ES6】—数组的扩展

一、类数组/ 伪数组 1. 类/伪数组: 并不是真正意义的数组&#xff0c;有长度的属性&#xff0c;但无法使用Array原型上的方法 let divs document.getElementsByTagName(div) console.log(divs) // HTMLCollection []let divs2 document.getElementsByClassName("xxx&q…

【业务功能篇73】分布式ID解决方案

业界实现方案 1. 基于UUID2. 基于DB数据库多种模式(自增主键、segment)3. 基于Redis4. 基于ZK、ETCD5. 基于SnowFlake6. 美团Leaf(DB-Segment、zkSnowFlake)7. 百度uid-generator() 1.基于UUID生成唯一ID UUID:UUID长度128bit&#xff0c;32个16进制字符&#xff0c;占用存储空…

Kdab QML (part9)自由缩放时钟

文章目录 Kdab QML (part9)自由缩放时钟代码详细解释运行截图 Kdab QML (part9)自由缩放时钟 代码 import QtQuick 2.15 import QtQuick.Window 2.15Window {id: rootwidth: 500height: 500visible: truecolor: "lightgrey"title: qsTr("Hello World")It…

springboot2+redis 订阅发布,解决接收消息累计线程到内存溢出,使用自定义线程池接收消息

pom 添加redis <!-- redis 缓存操作 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency> 发布消息 import lombok.extern.slf4j.Slf4j; import o…