Flink流批一体计算(16):PyFlink DataStream API

目录

概述

Pipeline Dataflow

代码示例WorldCount.py

执行脚本WorldCount.py


概述

Apache Flink 提供了 DataStream API,用于构建健壮的、有状态的流式应用程序。它提供了对状态和时间细粒度控制,从而允许实现高级事件驱动系统。

用户实现的Flink程序是由Stream和Transformation这两个基本构建块组成。

Stream是一个中间结果数据,而Transformation是一个操作,它对一个或多个输入Stream进行计算处理,输出一个或多个结果Stream。

当一个Flink程序被执行的时候,它会被映射为Streaming Dataflow。一个Streaming Dataflow是由一组Stream和Transformation Operator组成,它类似于一个DAG图,在启动的时候从一个或多个Source Operator开始,结束于一个或多个Sink Operator。

 FlinkKafkaConsumer是一个Source OperatorMapKeyByTimeWindowApplyTransformation OperatorRollingSink是一个Sink Operator

Pipeline Dataflow

Flink中,程序是并行和分布式的方式运行。一个Stream可以被分成多个Stream分区(Stream Partitions),一个Operator可以被分成多个Operator Subtask

Flink内部有一个优化的功能,根据上下游算子的紧密程度来进行优化。

紧密度低的算子则不能进行优化,而是将每一个Operator Subtask放在不同的线程中独立执行。一个Operator的并行度,等于Operator Subtask的个数,一个Stream的并行度(分区总数)等于生成它的Operator的并行度。

 紧密度高的算子可以进行优化,优化后可以将多个Operator Subtask串起来组成一个Operator Chain,实际上就是一个执行链,每个执行链会在TaskManager上一个独立的线程中执行。

图中上半部分表示的是将Source和map两个紧密度高的算子优化后串成一个Operator Chain,实际上一个Operator Chain就是一个大的Operator的概念。图中的Operator Chain表示一个Operator,keyBy表示一个Operator,Sink表示一个Operator,它们通过Stream连接,而每个Operator在运行时对应一个Task,也就是说图中的上半部分有3个Operator对应的是3个Task。

图中下半部分是上半部分的一个并行版本,对每一个Task都并行化为多个Subtask,这里只是演示了2个并行度,Sink算子是1个并行度。

代码示例WorldCount.py

在本章中,你将学习如何使用 PyFlink 和 DataStream API 构建一个简单的流式应用程序。

编写一个简单的 Python DataStream 作业。

该程序读取一个 csv 文件,计算词频,并将结果写到一个结果文件中。

import argparse
import logging
import sys
from pyflink.common import WatermarkStrategy, Encoder, Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors import (FileSource, StreamFormat, FileSink, OutputFileConfig,RollingPolicy)word_count_data = ["To be, or not to be,--that is the question:--","Whether 'tis nobler in the mind to suffer","The slings and arrows of outrageous fortune","Or to take arms against a sea of troubles,","And by opposing end them?--To die,--to sleep,--","No more; and by a sleep to say we end","The heartache, and the thousand natural shocks","That flesh is heir to,--'tis a consummation","Devoutly to be wish'd. To die,--to sleep;--","To sleep! perchance to dream:--ay, there's the rub;","For in that sleep of death what dreams may come,","When we have shuffled off this mortal coil,","Must give us pause: there's the respect","That makes calamity of so long life;","For who would bear the whips and scorns of time,","The oppressor's wrong, the proud man's contumely,","The pangs of despis'd love, the law's delay,","The insolence of office, and the spurns","That patient merit of the unworthy takes,","When he himself might his quietus make","With a bare bodkin? who would these fardels bear,","To grunt and sweat under a weary life,","But that the dread of something after death,--","The undiscover'd country, from whose bourn","No traveller returns,--puzzles the will,","And makes us rather bear those ills we have","Than fly to others that we know not of?","Thus conscience does make cowards of us all;","And thus the native hue of resolution","Is sicklied o'er with the pale cast of thought;","And enterprises of great pith and moment,","With this regard, their currents turn awry,","And lose the name of action.--Soft you now!","The fair Ophelia!--Nymph, in thy orisons","Be all my sins remember'd."]def word_count(input_path, output_path):env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.BATCH)# write all the data to one fileenv.set_parallelism(1)# define the sourceif input_path is not None:ds = env.from_source(source=FileSource.for_record_stream_format(StreamFormat.text_line_format(),input_path).process_static_file_set().build(),watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),source_name="file_source")else:print("Executing word_count example with default input data set.")print("Use --input to specify file input.")ds = env.from_collection(word_count_data)def split(line):yield from line.split()# compute word countds = ds.flat_map(split) \.map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \.key_by(lambda i: i[0]) \.reduce(lambda i, j: (i[0], i[1] + j[1]))# define the sinkif output_path is not None:ds.sink_to(sink=FileSink.for_row_format(base_path=output_path,encoder=Encoder.simple_string_encoder()).with_output_file_config(OutputFileConfig.builder().with_part_prefix("prefix").with_part_suffix(".ext").build()).with_rolling_policy(RollingPolicy.default_rolling_policy()).build())else:print("Printing result to stdout. Use --output to specify output path.")ds.print()# submit for executionenv.execute()if __name__ == '__main__':logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")parser = argparse.ArgumentParser()parser.add_argument('--input',dest='input',required=False,help='Input file to process.')parser.add_argument('--output',dest='output',required=False,help='Output file to write results to.')argv = sys.argv[1:]known_args, _ = parser.parse_known_args(argv)word_count(known_args.input, known_args.output)

执行脚本WorldCount.py

python word_count.py

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/109248.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Qt 获取文件图标、类型 QFileIconProvider

Qt中获取系统图标、类型是通过QFileIconProvider来实现的,具体如下: 一、Qt获取系统文件图标1、获取文件夹图标QFileIconProvider icon_provider;QIcon icon icon_provider.icon(QFileIconProvider::Folder);2、获取指定文件图标QFileInfo file_info(n…

Django基础6——数据模型关系

文章目录 一、基本了解二、一对一关系三、一对多关系3.1 增删改查3.2 案例:应用详情页3.2 案例:新建应用页 四、多对多关系4.1 增删改查4.2 案例:应用详情页4.3 案例:部署应用页 一、基本了解 常见数据模型关系: 一对一…

vue使用vant中的popup层,在popup层中加搜索功能后,input框获取焦点 ios机型的软键盘不会将popup顶起来的问题

1.使用vant的popup弹出层做了一个piker的选择器,用户需要在此基础上增加筛选功能。也就是输入框 2.可是在ios机型中,input框在获取焦点以后,ios的软键盘弹起会遮盖住我们的popup层,导致体验不是很好 3.在大佬的解答及帮助下,采用窗口滚动的方式解决此方法 <Popupv-model&q…

AxureRP制作静态站点发布互联网,内网穿透实现公网访问

AxureRP制作静态站点发布互联网&#xff0c;内网穿透实现公网访问 文章目录 AxureRP制作静态站点发布互联网&#xff0c;内网穿透实现公网访问前言1.在AxureRP中生成HTML文件2.配置IIS服务3.添加防火墙安全策略4.使用cpolar内网穿透实现公网访问4.1 登录cpolar web ui管理界面4…

Java线程池UncaughtExceptionHandler无效?可能是使用方式不对

背景 在业务处理中&#xff0c;使用了线程池来提交任务执行&#xff0c;但是今天修改了一小段代码&#xff0c;发现任务未正确执行。而且看了相关日志&#xff0c;也并未打印结果。 源码简化版如下&#xff1a; 首先&#xff0c;自定义了一个线程池 public class NamedThrea…

Linux环境下SVN服务器的搭建与公网访问:使用cpolar端口映射的实现方法

文章目录 前言1. Ubuntu安装SVN服务2. 修改配置文件2.1 修改svnserve.conf文件2.2 修改passwd文件2.3 修改authz文件 3. 启动svn服务4. 内网穿透4.1 安装cpolar内网穿透4.2 创建隧道映射本地端口 5. 测试公网访问6. 配置固定公网TCP端口地址6.1 保留一个固定的公网TCP端口地址6…

vue3:使用:图片生成二维码并复制

实现在 vue3 中根据 url 生成一个二维码码&#xff0c;且可以复制。 注&#xff09;复制功能 navigator.clipboard.write 只能在安全的localhost 这种安全网络下使用。https中需要添加安全证书&#xff0c;且在域名&#xff08;例&#xff1a;https://www.baidu.com&#xff0…

家政服务小程序制作教程:从设计到开发的详细步骤

在当今的数字化时代&#xff0c;小程序已经成为了一种趋势&#xff0c;不仅提供了方便快捷的应用体验&#xff0c;也成为了各种行业进行营销和客户管理的有力工具。特别是对于家政行业&#xff0c;通过小程序的应用&#xff0c;可以更好地进行业务管理&#xff0c;提升服务质量…

k8s的学习篇1

一 k8s的概念 1.1 k8s k8s是一个轻量级的&#xff0c;用于管理容器化应用和服务的平台。通过k8s能够进行应用的自动化部署和扩容缩容。 1.2 k8s核心部分 1.prod: 最小的部署单元&#xff1b;一组容器的集合&#xff1b;共享网络&#xff1b;生命周期是短暂的&#xff1b; …

nginx配置keepalive长连接

nginx之keepalive详解与其配置_keepalive_timeout_恒者走天下的博客-CSDN博客 为什么要有keepalive? 因为每次建立tcp都要建立三次握手&#xff0c;消耗时间较长&#xff0c;所以为了减少tcp建立连接需要的时间&#xff0c;就可以设置keep_alive长连接。 nginx中keep_alive对…

Docker网络-探索容器网络如何相互通信

当今世界&#xff0c;企业热衷于容器化&#xff0c;这需要强大的网络技能来正确配置容器架构&#xff0c;因此引入了 Docker Networking 的概念。Docker 是一种容器化平台&#xff0c;允许您在独立、轻量级的容器中运行应用程序和服务。Docker 提供了一套强大的网络功能&#x…

时序预测 | MATLAB实现基于TSO-XGBoost金枪鱼算法优化XGBoost的时间序列预测(多指标评价)

时序预测 | MATLAB实现基于TSO-XGBoost金枪鱼算法优化XGBoost的时间序列预测(多指标评价) 目录 时序预测 | MATLAB实现基于TSO-XGBoost金枪鱼算法优化XGBoost的时间序列预测(多指标评价)预测效果基本介绍程序设计参考资料 预测效果 基本介绍 Matlab实现基于TSO-XGBoost金枪鱼算…

大数据项目实战(安装Hive)

一&#xff0c;搭建大数据集群环境 1.3 安装Hive 1.3.1 Hive的安装 1.安装MySQL服务 1&#xff09;检查是否安装MySQL&#xff0c;如安装将其卸载。卸载命令 rpm -qa | grep mysql 2&#xff09;搜索MySQL文件夹&#xff0c;如存在则删除 find / -name mysql rm -rf /etc/s…

Ceph入门到精通-如何编译安装Quagga?

Quagga 1. 理论部分 1.1 软件简介 Quagga中文翻译斑驴&#xff0c;是一种先进的路由软件包&#xff0c;提供一套基于TCP/IP的路由协议。 1.2 斑驴的应用场景 – 使得操作系统变成专业的路由 – 使得操作系统具有与传统路由通过路由协议直接对接 1.3 斑驴支持的路由协议 …

2分钟搭建自己的GPT网站

如果觉得官方免费的gpt&#xff08;3.5&#xff09;体验比较差&#xff0c;总是断开&#xff0c;或者不会fanqiang&#xff0c;那你可以自己搭建一个。但前提是你得有gpt apikey。年初注册的还有18美金的额度&#xff0c;4.1号后注册的就没有额度了。不过也可以自己充值。 有了…

五度易链最新“产业大数据服务解决方案”亮相,打造数据引擎,构建智慧产业

快来五度易链官网 点击网址【http://www.wdsk.net/】 看看我们都发布了哪些新功能!!! 自2015年布局产业大数据服务行业以来&#xff0c;“五度易链”作为全国产业大数据服务行业先锋企业&#xff0c;以“让数据引领决策&#xff0c;以智慧驾驭未来”为愿景&#xff0c;肩负“打…

Nginx全局配置

目录 一、修改启动进程数 二、日制分割 三、nginx进程的优先级&#xff08;work进程的优先级&#xff09; 四、http设置 4.1http 协议配置说明 4.2mime 4.3 server块构建虚拟主机 4.4 location 一、修改启动进程数 worker_processes 1; #允许的启动工作进程数数量…

webrtc的Sdp中的Plan-b和UnifiedPlan

在一些类似于视频会议场景下&#xff0c;媒体会话参与者需要接收或者发送多个流&#xff0c;例如一个源端&#xff0c;同时发送多个左右音轨的音频&#xff0c;或者多个摄像头的视频流&#xff1b;在2013年&#xff0c;提出了2个不同的SDP IETF草案Plan B和Unified Plan&#x…

【C++初阶】list的常见使用操作

&#x1f466;个人主页&#xff1a;Weraphael ✍&#x1f3fb;作者简介&#xff1a;目前学习C和算法 ✈️专栏&#xff1a;C航路 &#x1f40b; 希望大家多多支持&#xff0c;咱一起进步&#xff01;&#x1f601; 如果文章对你有帮助的话 欢迎 评论&#x1f4ac; 点赞&#x1…

Jumpserver堡垒机管理(安装和相关操作)-------从小白到大神之路之学习运维第89天

第四阶段 时 间&#xff1a;2023年8月28日 参加人&#xff1a;全班人员 内 容&#xff1a; Jumpserver堡垒机管理 目录 一、堡垒机简介 &#xff08;一&#xff09;运维常见背黑锅场景 &#xff08;二&#xff09;背黑锅的主要原因 &#xff08;三&#xff09;解决背黑…