Flink Python作业快速入门

Flink Python快速入门_实时计算 Flink版(Flink)-阿里云帮助中心


import argparse
# 用于处理命令行参数和选项,使程序能够接收用户通过命令行传递的参数
import logging
import sysfrom pyflink.common import WatermarkStrategy, Encoder, Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors import (FileSource, StreamFormat, FileSink, OutputFileConfig,RollingPolicy)# WatermarkStrategy: 用于生成水印(watermarks),水印是用于处理事件时间(event time)的数据流中的延迟数据的一种机制。
# Encoder: 用于定义如何将数据编码为字节序列,通常用于数据的序列化和反序列化。
# Types: 包含了 Flink 中各种数据类型的定义,用于指定数据流中数据的类型。
# StreamExecutionEnvironment: 是所有 Flink 流处理程序的入口点,用于配置和启动流处理任务。
# RuntimeExecutionMode: 定义了流处理任务的执行模式,例如批处理模式或流处理模式。
# FileSource: 用于从文件系统中读取数据源。
# StreamFormat: 定义了数据的格式,例如 CSV、JSON 等。
# FileSink: 用于将数据写入文件系统。
# OutputFileConfig: 配置输出文件的相关设置,如前缀和后缀。
# RollingPolicy: 定义了文件滚动策略,即何时创建新的输出文件。word_count_data = ["To be, or not to be,--that is the question:--","Whether 'tis nobler in the mind to suffer","The slings and arrows of outrageous fortune","Or to take arms against a sea of troubles,","And by opposing end them?--To die,--to sleep,--","No more; and by a sleep to say we end","The heartache, and the thousand natural shocks","That flesh is heir to,--'tis a consummation","Devoutly to be wish'd. To die,--to sleep;--","To sleep! perchance to dream:--ay, there's the rub;","For in that sleep of death what dreams may come,","When we have shuffled off this mortal coil,","Must give us pause: there's the respect","That makes calamity of so long life;","For who would bear the whips and scorns of time,","The oppressor's wrong, the proud man's contumely,","The pangs of despis'd love, the law's delay,","The insolence of office, and the spurns","That patient merit of the unworthy takes,","When he himself might his quietus make","With a bare bodkin? who would these fardels bear,","To grunt and sweat under a weary life,","But that the dread of something after death,--","The undiscover'd country, from whose bourn","No traveller returns,--puzzles the will,","And makes us rather bear those ills we have","Than fly to others that we know not of?","Thus conscience does make cowards of us all;","And thus the native hue of resolution","Is sicklied o'er with the pale cast of thought;","And enterprises of great pith and moment,","With this regard, their currents turn awry,","And lose the name of action.--Soft you now!","The fair Ophelia!--Nymph, in thy orisons","Be all my sins remember'd."]def word_count(input_path, output_path):"""计算文本文件中单词的频率,并将结果输出到指定路径。该函数从指定的输入路径读取文本数据,进行单词频率统计,并将结果写入指定的输出路径。如果没有提供输入路径或输出路径,则使用默认数据或直接打印结果。参数:- input_path: 输入文本文件的路径。如果为None,则使用默认数据。- output_path: 输出结果的路径。如果为None,则直接打印结果。"""# 获取流处理环境并设置为流处理模式,设置并行度为1env = StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)env.set_parallelism(1)# 定义数据源if input_path is not None:# 从文件系统中读取数据ds = env.from_source(source=FileSource.for_record_stream_format(StreamFormat.text_line_format(),input_path).process_static_file_set().build(),watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),source_name="file_source")else:# 使用默认数据ds = env.from_collection(word_count_data)# 定义分割函数,将每行文本分割成单词def split(line):yield from line.split()# 计算单词频率ds = ds.flat_map(split) \.map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \.key_by(lambda i: i[0]) \.reduce(lambda i, j: (i[0], i[1] + j[1]))# 定义数据汇if output_path is not None:# 将结果写入文件系统ds.sink_to(sink=FileSink.for_row_format(base_path=output_path,encoder=Encoder.simple_string_encoder()).with_output_file_config(OutputFileConfig.builder().with_part_prefix("prefix").with_part_suffix(".ext").build()).with_rolling_policy(RollingPolicy.default_rolling_policy()).build())else:# 直接打印结果ds.print()# 提交作业以执行env.execute()if __name__ == '__main__':# 配置日志输出到标准输出,设置日志级别为INFO,并格式化日志消息以仅显示消息内容logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")# 创建一个ArgumentParser对象以处理命令行参数parser = argparse.ArgumentParser()# 添加可选的命令行参数,用于指定输入和输出文件parser.add_argument('--input',dest='input',required=False,help='要处理的输入文件。')parser.add_argument('--output',dest='output',required=False,help='要写入结果的输出文件。')# 获取命令行参数,排除脚本名称argv = sys.argv[1:]print("Command line arguments: ", argv)# 解析已知的命令行参数,并忽略未知参数known_args, _ = parser.parse_known_args(argv)print("known_args: ", known_args)# 调用word_count函数,传入从解析参数中获取的输入和输出文件路径word_count(known_args.input, known_args.output)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/489020.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

前端(五)css属性

css属性 文章目录 css属性一、字体属性二、文本属性三、背景属性四、盒子模型 一、字体属性 font-weight:文字粗细,在100到900之间,normal(400),bord(700),inherit(继承父类) font-style:文字风格,normal表示正常(默认…

nginx反向代理(负载均衡)和tomcat介绍

nginx的代理 负载均衡 负载均衡的算法 负载均衡的架构 基于ip的七层代理 upstream模块要写在http模块中 七层代理的调用要写在location模块中 轮询 加权轮询 最小连接数 ip_Hash URL_HASH 基于域名的七层代理 配置主机 给其余客户机配置域名 给所有机器做域名映射 四层代理…

ansible自动化运维(二)playbook模式详解

相关文章ansible自动化运维(一)简介及清单,模块-CSDN博客ansible自动化运维(三)jinja2模板&&roles角色管理-CSDN博客ansible自动化运维(四)运维实战-CSDN博客 一.Ansible中的playbook模式 Playbo…

验证码功能实现

预览: 前端代码 让图片src 产生验证码图片的servlet <img src""></img> servlet代码 public void checkCode(HttpServletRequest request, HttpServletResponse response) throws IOException {ServletOutputStream os response.getOutputStream()…

Redis的哨兵机制

目录 1. 文章前言2. 基本概念2.1 主从复制的问题2.2 人工恢复主节点故障2.3 哨兵机制自动恢复主节点故障 3. 安装部署哨兵&#xff08;基于docker&#xff09;3.1 安装docker3.2 编排redis主从节点3.3 编排redis-sentinel节点 4. 重新选举5. 选举原理6. 总结 1. 文章前言 &…

Java:集合(List、Map、Set)

文章目录 1. Collection集合1-1. 迭代器遍历方式1-2. 通过for循环进行遍历1-3. forEach遍历 2. List集合2-1. ArrayList底层实现原理2-2. LinkedList底层实现原理 3. Set集合3-1. HashSet 底层实现3-2. LinkedHashSet 底层实现3-3. TreeSet 4. Collection集合->总结5. Map集…

什么是Apache日志?为什么Apache日志分析很重要?

Apache是全球最受欢迎的Web服务器软件&#xff0c;支持约30.2%的所有活跃网站。凭借其可靠性、灵活性和强大的功能&#xff0c;Apache数十年来一直是互联网的中坚力量。 一、Apache Web服务器的工作原理 Apache Web服务器的工作原理如下&#xff1a; 接收HTTP请求&#xff1…

【再谈设计模式】组合模式~层次构建的多面手

一、引言 在软件开发的世界里&#xff0c;我们经常面临着处理对象之间复杂关系的挑战。如何有效地表示对象的部分 - 整体层次结构&#xff0c;并且能够以一种统一的方式操作这些对象&#xff0c;是一个值得探讨的问题。组合模式&#xff08;Composite Pattern&#xff09;为我们…

论文翻译 | ChunkRAG: Novel LLM-Chunk Filtering Method for RAG Systems

摘要 使用大型语言模型&#xff08;LLM&#xff09;的检索-增强生成&#xff08;RAG&#xff09;系统经常由于检索不相关或松散相关的信息而生成不准确的响应。现有的在文档级别操作的方法无法有效地过滤掉此类内容。我们提出了LLM驱动的块过滤&#xff0c;ChunkRAG&#xff0…

Redis配置文件中 supervised指令

什么是Supervised&#xff1f; supervised模式允许Redis被外部进程管理器监控。通过这个选项&#xff0c;Redis能够在崩溃后自动重启&#xff0c;确保服务的高可用性。常见的进程管理器包括systemd和upstart。 开启方法 vim修改&#xff1a; sudo vi /etc/redis/redis.conf…

Android四大组件——Activity(二)

一、Activity之间传递消息 在&#xff08;一&#xff09;中&#xff0c;我们把数据作为独立的键值对进行传递&#xff0c;那么现在把多条数据打包成一个对象进行传递&#xff1a; 1.假设有一个User类的对象&#xff0c;我们先使用putExtra进行传递 activity_demo06.xml <…

SpringBoot基于Redis+WebSocket 实现账号单设备登录.

引言 在现代应用中&#xff0c;一个账号在多个设备上的同时登录可能带来安全隐患。为了解决这个问题&#xff0c;许多应用实现了单设备登录&#xff0c;确保同一个用户只能在一个设备上登录。当用户在新的设备上登录时&#xff0c;旧设备会被强制下线。 本文将介绍如何使用 Spr…

【架构】从 Socket 的角度认识非阻塞模型

文章目录 前言1. 阻塞模型2. 非阻塞模型2.1 Reactor 模型优势2.2 Reactor 模型劣势 后记 前言 近期看了很多中间件的文章&#xff0c;RocketMQ&#xff0c;Dubbo 这些中间件内部的rpc通信都用的是非阻塞的模型。(Netty)&#xff0c;这里从 Socket 的角度总结一下。 1. 阻塞模…

location和重定向、代理

location匹配的规则和优先级 在nginx当中&#xff0c;匹配的对象一般是URI来匹配 http://192.168.233.62/usr/local/nginx/html/index.html 182.168.233.61/ location匹配的分类&#xff1a; 多个location一旦匹配其中之一&#xff0c;不在匹配其他location 1、精确匹配 …

ragflow连ollama时出现的Bug

ragflow和ollama连接后&#xff0c;已经添加了两个模型但是ragflow仍然一直warn&#xff1a;Please add both embedding model and LLM in Settings &#xff1e; Model providers firstly.这里可能是我一开始拉取的镜像容器太小&#xff0c;容不下当前添加的模型&#xff0c;导…

软件测试面试问答

文章目录 什么是软件&#xff1f;软件测试工程师的工作内容什么是软件测试&#xff1f;软件开发生命周期软件开发的几个阶段软件bug的五个要素Bug的十大要素:软件测试的分类软件测试方法分类单元测试设计测试用例的主要方法什么是测试用例测试用例几大要素你的测试职业发展是什…

python学习笔记—7—变量拼接

1. 字符串的拼接 print(var_1 var_2) print("supercarry" "doinb") name "doinb" sex "man" score "100" print("sex:" sex " name:" name " score:" score) 注意&#xff1a; …

Win10环境vscode+latex+中文快速配置

安装vscodelatex workshop 配置&#xff1a; {"liveServer.settings.donotVerifyTags": true,"liveServer.settings.donotShowInfoMsg": true,"explorer.confirmDelete": false,"files.autoSave": "afterDelay","exp…

AI生成不了复杂前端页面?也许有解决方案了

在2024年&#xff0c;编程成为了人工智能领域最热门的赛道。AI编程技术正以惊人的速度进步&#xff0c;但在生成前端页面方面&#xff0c;AI的能力还是饱受质疑。自从ScriptEcho平台上线以来&#xff0c;我们收到了不少用户的反馈&#xff0c;他们表示&#xff1a;“生成的页面…

【热力学与工程流体力学】流体静力学实验,雷诺实验,沿程阻力实验,丘里流量计流量系数测定,局部阻力系数的测定,稳态平板法测定材料的导热系数λ

关注作者了解更多 我的其他CSDN专栏 过程控制系统 工程测试技术 虚拟仪器技术 可编程控制器 工业现场总线 数字图像处理 智能控制 传感器技术 嵌入式系统 复变函数与积分变换 单片机原理 线性代数 大学物理 热工与工程流体力学 数字信号处理 光电融合集成电路…