Flink的环境搭建及使用

在idea中创建一个Maven项目,导入Flink的依赖,在代码中创建Flink环境,编写代码.

如果不想去找flink依赖,就去flink官网,提供了一个mvn的命令,快速下载在本地构建一个flink的项目,可以直接从这个项目的pom.xml文件中拿到依赖配置

一、环境搭建

pom.xml文件的依赖导入

<properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.15.4</flink.version><target.java.version>1.8</target.java.version><scala.binary.version>2.12</scala.binary.version><maven.compiler.source>${target.java.version}</maven.compiler.source><maven.compiler.target>${target.java.version}</maven.compiler.target><log4j.version>2.17.1</log4j.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency></dependencies>


二、使用Flink

以WordCount为例:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class Demo1WordCount {public static void main(String[] args) throws Exception {//1、创建flink的执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行度,一个并行度对应一个taskenv.setParallelism(2);//修改数据从上游发送到下游的缓存时间env.setBufferTimeout(2000);/** 无界流*///2、读取数据//nc -lk 8888DataStream<String> linesDS = env.socketTextStream("master", 8888);//一行转换成多行DataStream<String> wordsDS = linesDS.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String line, Collector<String> out) throws Exception {for (String word : line.split(",")) {//将数据发送到下游out.collect(word);}}});//转换成kv格式DataStream<Tuple2<String, Integer>> kvDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String word) throws Exception {//返回一个二元组return Tuple2.of(word, 1);}});//按照单词进行分组//底层是hash分区KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> kv) throws Exception {return kv.f0;}});//统计数量DataStream<Tuple2<String, Integer>> countDS = keyByDS.reduce(new ReduceFunction<Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> reduce(Tuple2<String, Integer> kv1,Tuple2<String, Integer> kv2) throws Exception {int count = kv1.f1 + kv2.f1;return Tuple2.of(kv1.f0, count);}});//打印结果countDS.print();//3、启动flinkenv.execute("wc");}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/464982.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[mysql]修改表和课后练习

目录 DDL数据定义语言 添加一个字段 添加一个字段到最后一个 添加到表中的第一个一个字段 选择其中一个位置: 修改一个字段:数据类型,长度,默认值(略) 重命名一个字段 删除一个字段 重命名表 删除表 清空表 DCL中事务相关内容 DCL中COMMIT和ROLLBACK的讲解 对比TR…

SpringBoot+ClickHouse集成

前面已经完成ClickHouse的搭建&#xff0c;创建账号&#xff0c;创建数据库&#xff0c;保存数据库等&#xff0c;接下来就是在SpringBoot项目中集成ClickHouse。 一&#xff0c;引入依赖 <!-- SpringBoot集成ClickHouse --> <dependency><groupId>com.baom…

搜维尔科技:【煤矿虚拟仿真】煤矿企业、高校、科研单位-多语言支持、数字孪生、交互式学习体验

品牌&#xff1a;SouVR 发票&#xff1a;支持专票、普票 单位&#xff1a;套 版本号&#xff1a;1.0 包装清单&#xff1a;软件1套 软件形式&#xff1a;U盘、光盘 运行环境&#xff1a;windows 应用对象&#xff1a;煤矿企业、高校、科研单位 系统配置&#xff1a;…

(五)Spark大数据开发实战:灵活运用PySpark常用DataFrame API

目录 一、PySpark 二、数据介绍 三、PySpark大数据开发实战 1、数据文件上传HDFS 2、导入模块及数据 3、数据统计与分析 ①、计算演员参演电影数 ②、依次罗列电影番位前十的演员 ③、按照番位计算演员参演电影数 ④、求每位演员所有参演电影中的最早、最晚上映时间及…

单链表的实现(数据结构)

一. 单链表的实现 我们在上一篇中简单的认识了链表的组成和结构&#xff0c;并打印出链表&#xff0c;那么今天就来具体实现一下单链表对于数据增加、删减、插入等。 接下来就是我们在链表中对于数据的增、删、插的实现&#xff0c;对于我们的链表来说在任何地方增加数据都需…

Golang | Leetcode Golang题解之第540题有序数组中的单一元素

题目&#xff1a; 题解&#xff1a; func singleNonDuplicate(nums []int) int {low, high : 0, len(nums)-1for low < high {mid : low (high-low)/2mid - mid & 1if nums[mid] nums[mid1] {low mid 2} else {high mid}}return nums[low] }

算法: 链表题目练习

文章目录 链表题目练习两数相加两两交换链表中的节点重排链表合并 K 个升序链表K 个一组翻转链表 总结 链表题目练习 两数相加 坑: 两个链表都遍历完后,可能需要进位. class Solution {public ListNode addTwoNumbers(ListNode l1, ListNode l2) {ListNode cur1 l1;ListNode…

深入Pillow:处理图像下载中的意外挑战

在当今数字化时代&#xff0c;获取和处理图像数据已经成为了许多应用程序的核心功能。从社交媒体到电子商务&#xff0c;图像的获取和处理对于用户体验至关重要。下载图片不仅能够丰富我们的内容&#xff0c;还能够通过分析图像数据为我们的应用提供更多价值。然而&#xff0c;…

零基础Java第十三期:继承与多态(一)

目录 一、继承 1.1. 继承的目的 1.2. 继承的概念 1.3. 继承的语法 1.4. 父类的访问 1.5. 继承中的重载与重写 1.6. 子类的构造方法 1.7. 再谈初始化 一、继承 1.1. 继承的目的 我们来定义一个Dog和Cat的类&#xff1a; public class Dog {public int age;public Strin…

【ONLYOFFICE文档】8.2版本测评

文章目录 引言ONLYOFFICE 产品简介PDF编辑器新功能1.协作编辑 PDF&#xff0c;让团队合作更高效2.为 PDF 表单添加签名3.改进的简洁界面4.性能优化&#xff1a;更快、更高效的体验 文档编辑器中的新功能域代码功能&#xff1a;自动更新文档中的动态数据协作功能&#xff1a;轻松…

【JAVA】java 企业微信信息推送

前言 JAVA中 将信息 推送到企业微信 // 企微消息推送messageprivate String getMessage(String name, String problemType, String pushResults, Long orderId,java.util.Date submitTime, java.util.Date payTime) {String message "对接方&#xff1a;<font color\…

【RK3588 Linux 5.x 内核编程】-GPIO设备驱动与点亮LED

GPIO设备驱动与点亮LED 文章目录 GPIO设备驱动与点亮LED1、Linux中的GPIO介绍2、GPIO库和工具3、Linux内核GPIO操作步骤3.1 验证GPIO3.2 请求GPIO3.3 导出GPIO到sysfs(可选)3.4 设置GPIO为输入/输出3.5 更改GPIO的电平(值)3.6 读取GPIO的值(电平)3.7 释放GPIO4、GPIO驱动…

金华迪加 现场大屏互动系统 mobile.do.php 任意文件上传漏洞复现

0x01 产品简介 金华迪加现场大屏互动系统是一种集成了先进技术和创意设计的互动展示解决方案,旨在通过大屏幕和多种交互方式,为观众提供沉浸式的互动体验。该系统广泛应用于各类活动、展览、会议等场合,能够显著提升现场氛围和参与者的体验感。 0x02 漏洞概述 金华迪加 现…

[VUE]框架网页开发1 本地开发环境安装

前言 其实你不要看我的文章比较长&#xff0c;但是他就是很长&#xff01;步骤其实很简单&#xff0c;主要是为新手加了很多解释&#xff01; 步骤一&#xff1a;下载并安装 Node.js 访问 Node.js 官网&#xff1a; Node.js — Download Node.js 下载 Windows 64 位版本&…

[signal] void QComboBox::currentTextChanged(const QString text)

[signal] void QComboBox::currentTextChanged(const QString &text) This signal is sent whenever currentText changes. The new value is passed as text. This function was introduced in Qt 5.0. Note: Notifier signal for property currentText. 属性currentText的…

Unity中实现伤害飘字或者提示飘字效果(DoTween实现版本)

&#xff01;&#xff01;&#xff01;在实现以下效果之前&#xff0c;一定要往项目中导入DoTween插件。 一、搭建测试场景 1、在场景中新建一个带有Text组件的游戏物体A&#xff0c;并把这个游戏物体A中Text组件的Color属性中alpha值为0&#xff0c;让文字在场景中隐藏。 …

掌握PyQt5图形界面化工具及绑定爬虫程序

PyQT5——图形化界面 文章目录 PyQT5——图形化界面集成化图形界面工具为什么使用 \$ProjectFileDir$?示例场景其他 Varaiablespyuic参数解释整体含义示例使用PyQt5和pyuic 创建pyqt5的程序创建一个窗口app.exec\_()和sys.exit(app.exec_())的区别1. app.exec_()2. sys.exit(a…

从零开始在本地服务器上安装OnlyOffice并进行跨地域协同编辑文件

文章目录 前言1. 安装Docker2. 本地安装部署ONLYOFFICE3. 安装cpolar内网穿透4. 固定OnlyOffice公网地址 前言 本篇文章讲解如何使用Docker在本地Linux服务器上安装ONLYOFFICE&#xff0c;并结合cpolar内网穿透实现公网访问本地部署的文档编辑器与远程协作。 Community Editi…

20241102在荣品PRO-RK3566开发板的预置Android13下适配宸芯的数传模块CX6603N

20241102在荣品PRO-RK3566开发板的预置Android13下适配宸芯的数传模块CX6603N 2024/11/2 18:04 在WIN10使用程序&#xff1a;ViewLink-4.0.7_0708-windows-x64.exe 在荣品PRO-RK3566开发板的预置Android13下使用&#xff1a;ViewLink-2023_12_21-release-0.2.6.apk adb install…

智能AI合同审查系统如何优化合同风险管理的案例解读

在合同管理和合规性要求日趋严格的法律行业&#xff0c;智能合同审查系统能够大幅提升合同数据管理的效率和准确性。法律行业中&#xff0c;合同涉及金额、产品参数和条款细节较多&#xff0c;同时对合规性有极高的要求。特别是在高度受监管的行业&#xff08;如金融、医疗、制…