Flink定制化功能开发,demo代码

前言:

       这是一个Flink自定义开发的基础教学。本文将通过flink的DataStream模块API,以kafka为数据源,构建一个基础测试环境;包含一个kafka生产者线程工具,一个自定义FilterFunction算子,一个自定义MapFunction算子,用一个flink任务的代码逻辑,将实时读kafka并多层处理串起来;让读者体会通过Flink构建自定义函数的技巧。

一、Flink的开发模块分析

Flink提供四个基础模块:核心SDK开发API分别是处理实时计算的DataStream和处理离线计算的DataSet;基于这两个SDK,在其上包装了TableAPI开发模块的SDK;在Table API之上,定义了高度抽象可用SQL开发任务的FlinkSQL。在核心开发API之下,还有基础API的接口,可用于对时间,状态,算子等最细粒度的特性对象做操作,如包装自定义算子的ProcessWindowFunction和ProcessFunction等基础函数以及内置的对象状态StateTtlConfig;

FLINK开发API关系结构如下:

二、定制化开发Demo演示

2.1 场景介绍

Flink实时任务的的通用技术架构是消息队列中间件+Flink任务:

将数据采集到Kafka或pulser这类队列中间件的Topic,然后使用Flink内置的kafkaSource,监控Topic的数据情况,做实时处理。

  1. 这里提供一个kafka的生产者线程,可以自定义构建需要的数据和上传时间,用于控制写入kafka的数据源;
  2. 重写两个DataStream的基础算子:FilterFunction和MapFunction,用于让读者体会,如何对FLINK函数的重新包装,后续更基础的函数原理一样;我这里用String数据对象做处理,减少对象转换的SDK引入,通常要基于业务做数据polo的加工,这个自己处理,将对象换成业务对象;
  3. 然后使用Flink将整个业务串起来,从kafka读数据,经过两层处理,最终输出需要的结果;

2.2 本地demo演示

2.2.1 pom文件

这里以flink1.14.6+scala1.12版本为例:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.example</groupId><artifactId>flinkCDC</artifactId><version>1.0-SNAPSHOT</version></parent><artifactId>flinkStream</artifactId><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><flink-version>1.14.6</flink-version><scala-version>2.12</scala-version><hadop-common-version>2.9.1</hadop-common-version><elasticsearch.version>7.6.2</elasticsearch.version><target.java.version>1.8</target.java.version><scala.binary.version>2.12</scala.binary.version><maven.compiler.source>${target.java.version}</maven.compiler.source><maven.compiler.target>${target.java.version}</maven.compiler.target><log4j.version>2.17.1</log4j.version></properties><repositories><repository><id>apache.snapshots</id><name>Apache Development Snapshot Repository</name><url>https://repository.apache.org/content/repositories/snapshots/</url><releases><enabled>false</enabled></releases><snapshots></snapshots></repository></repositories><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink-version}</version><!--            <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>${flink-version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala-version}</artifactId><version>${flink-version}</version><!--            <scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala-version}</artifactId><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions><version>${flink-version}</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.1</version></dependency></dependencies><build><plugins><!-- Java Compiler --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>${target.java.version}</source><target>${target.java.version}</target></configuration></plugin><!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. --><!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><!-- Run shade goal on package phase --><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><createDependencyReducedPom>false</createDependencyReducedPom><artifactSet><excludes><exclude>org.apache.flink:flink-shaded-force-shading</exclude><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>org.apache.logging.log4j:*</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>myflinkml.DataStreamJob</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins><pluginManagement><plugins><!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. --><plugin><groupId>org.eclipse.m2e</groupId><artifactId>lifecycle-mapping</artifactId><version>1.0.0</version><configuration><lifecycleMappingMetadata><pluginExecutions><pluginExecution><pluginExecutionFilter><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><versionRange>[3.1.1,)</versionRange><goals><goal>shade</goal></goals></pluginExecutionFilter><action><ignore/></action></pluginExecution><pluginExecution><pluginExecutionFilter><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><versionRange>[3.1,)</versionRange><goals><goal>testCompile</goal><goal>compile</goal></goals></pluginExecutionFilter><action><ignore/></action></pluginExecution></pluginExecutions></lifecycleMappingMetadata></configuration></plugin></plugins></pluginManagement></build>
</project>
2.2.2 kafka生产者线程方法

package org.example.util;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.*;/*** 向kafka生产数据** @author i7杨* @date 2024/01/12 13:02:29*/public class KafkaProducerUtil extends Thread {private String topic;public KafkaProducerUtil(String topic) {super();this.topic = topic;}private static Producer<String, String> createProducer() {// 通过Properties类设置Producer的属性Properties properties = new Properties();
//        测试环境 kafka 配置properties.put("bootstrap.servers", "ip2:9092,ip:9092,ip3:9092");properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");return new KafkaProducer<String, String>(properties);}@Overridepublic void run() {Producer<String, String> producer = createProducer();Random random = new Random();Random random2 = new Random();while (true) {int nums = random.nextInt(10);int nums2 = random.nextInt(50);
//            double nums2 = random2.nextDouble();String time = new Date().getTime() / 1000 + 5 + "";String type = "pv";try {if (nums2 % 2 == 0) {type = "pv";} else {type = "uv";}
//                String info = "{\"user\":" + nums + ",\"item\":" + nums * 10 + ",\"category\":" + nums2 + ",\"pv\":" + nums2 * 5 + ",\"ts\":\"" + time + "\"}";String info = nums + "=" + nums2;System.out.println("message : " + info);producer.send(new ProducerRecord<String, String>(this.topic, info));} catch (Exception e) {e.printStackTrace();}System.out.println("=========数据已经写入==========");try {sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}}public static void main(String[] args) {new KafkaProducerUtil("test01").run();}public static void sendMessage(String topic, String message) {Producer<String, String> producer = createProducer();producer.send(new ProducerRecord<String, String>(topic, message));}}
2.2.3 自定义基础函数

这里自定义了filter和map两个算子函数,测试逻辑按照数据结构变化:

自定义FilterFunction函数算子:阈值小于40的过滤掉

package org.example.funtion;import org.apache.flink.api.common.functions.FilterFunction;/*** FilterFunction重构** @author i7杨* @date 2024/01/12 13:02:29*/public class InfoFilterFunction implements FilterFunction<String> {private double threshold;public InfoFilterFunction(double threshold) {this.threshold = threshold;}@Overridepublic boolean filter(String value) throws Exception {if (value.split("=").length == 2)// 阈值过滤return Double.valueOf(value.split("=")[1]) > threshold;else return false;}
}

自定义MapFunction函数:后缀为2的,添加上特殊信息

package org.example.funtion;import org.apache.flink.api.common.functions.MapFunction;public class ActionMapFunction implements MapFunction<String, String> {@Overridepublic String map(String value) throws Exception {System.out.println("value:" + value);if (value.endsWith("2"))return value.concat(":Special processing information");else return value;}
}
2.2.4 flink任务代码

任务逻辑:使用kafka工具产生数据,然后监控kafka的topic,讲几个函数串起来,输出结果;

package org.example.service;import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.example.funtion.ActionMapFunction;
import org.example.funtion.InfoFilterFunction;import java.util.*;public class FlinkTestDemo {public static void main(String[] args) throws Exception {// 设置执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// Kafka 配置Properties kafkaProps = new Properties();kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip1:9092,ip2:9092,ip3:9092");kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-consumer-group");kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// 创建 Kafka 消费者FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("test01",// Kafka 主题名称new SimpleStringSchema(),kafkaProps);// 从 Kafka 中读取数据流DataStream<String> kafkaStream = env.addSource(kafkaConsumer);env.disableOperatorChaining();kafkaStream.filter(new InfoFilterFunction(40)).map(new ActionMapFunction()).print("阈值大于40以上的message=");// 执行任务env.execute("This is a testing task");}}

运行结果:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/239204.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Tiktok/抖音旋转验证码识别

一、引言 在数字世界的飞速发展中&#xff0c;安全防护成为了一个不容忽视的课题。Tiktok/抖音&#xff0c;作为全球最大的短视频平台之一&#xff0c;每天都有数以亿计的用户活跃在其平台上。为了保护用户的账号安全&#xff0c;Tiktok/抖音引入了一种名为“旋转验证码”的安…

win10系统postgresql重装软件后原数据如何迁移

1、备份postgresql安装目录下的data文件夹 2、重新安装postgresql同一版本的软件 3、停止postgresql-x64-12服务 4、替换data文件夹 删除postgresql安装后新的的data文件夹 删除后将第一步备份的data文件夹粘贴过来&#xff0c;还是同一位置 5、启动postgresql-x64-12服务 …

[笔记]深度学习入门 基于Python的理论与实现(一)

代码仓库 gitee 1. python 入门 1.5之前是python安装和基础语法, 我直接跳过了 1.5 Numpy 深度学习中经常出现数组和矩阵运算&#xff0c;Numpy 的数组类 numpy.array 提供了很多便捷的方法 1.5.1 导入 Numpy import numpy as np1.5.2 生成 Numpy 数组 np.array()&#xf…

C语言从入门到实战——联合体和枚举

联合体和枚举 前言一、 联合体1.1 联合体类型的声明1.2 联合体的特点1.3 相同成员的结构体和联合体对比1.4 联合体大小的计算1.5 联合的一个练习 二、枚举类型2.1 枚举类型的声明2.2 枚举类型的优点2.3 枚举类型的使用 前言 C语言中&#xff0c;联合体&#xff08;union&#…

Shape-IoU——综合考量边框形状与尺度的度量

今天看到一篇文章主要是提出了一种更有效的IOU度量方法&#xff0c;论文地址在这里&#xff0c;如下所示&#xff1a; 摘要 边界盒回归损失作为检测器定位分支的重要组成部分&#xff0c;在目标检测任务中起着重要作用。现有的边界框回归方法通常考虑GT框和预测框之间的几何关…

【深度学习每日小知识】Logistic Loss 逻辑回归

逻辑回归的损失函数 线性回归的损失函数是平方损失。逻辑回归的损失函数是对数损失&#xff0c;定义如下&#xff1a; L o g L o s s ∑ ( x , y ) ∈ D − y log ⁡ ( y ′ ) − ( 1 − y ) log ⁡ ( 1 − y ′ ) LogLoss\sum_{(x,y)\in D}-y\log(y)-(1-y)\log(1-y) LogLoss…

Spring Task 任务调度工具

大家好我是苏麟 , 今天聊聊Spring Task 任务调度工具 Spring Task Spring Task 是Spring框架提供的任务调度工具&#xff0c;可以按照约定的时间自动执行某个代码逻辑。 定位&#xff1a;定时任务框架 作用&#xff1a;定时自动执行某段Java代码 什么是定时任务 ? 通过时…

IDEA连接Github⭐️使用Git工具上传本地文件到远程仓库

环境准备 已安装IDEA开发工具&#xff0c;Git版本管理工具&#xff0c;已注册GitHub账号 需要先准备好这些环境&#xff0c;可以自行搜索教程&#xff0c;下面的安装是基于这里的环境上操作的 目录 一、需要提供SSH公钥 ​二、Github配置SSH公钥 ​三、IDEA配置连接 四、连…

微信小程序分销商城在线搭建源码系统:轻松无忧 带完整的安装部署教程

随着移动互联网的快速发展&#xff0c;微信小程序作为一种新的应用形态&#xff0c;已经深入到人们生活的方方面面。尤其在电商领域&#xff0c;微信小程序分销商城为商家提供了一种全新的销售渠道。然而&#xff0c;对于许多不具备技术背景的商家来说&#xff0c;如何快速搭建…

Flutter-Web从0到部署上线(实践+埋坑)

本文字数&#xff1a;7743字 预计阅读时间&#xff1a;60分钟 01 前言 首先说明一下&#xff0c;这篇文章是给具备Flutter开发经验的客户端同学看的。Flutter 的诞生虽然来自 Google 的 Chrome 团队&#xff0c;但大家都知道 Flutter 最先支持的平台是 Android 和 iOS&#xff…

使用WAF防御网络上的隐蔽威胁之CSRF攻击

在网络安全领域&#xff0c;除了常见的XSS&#xff08;跨站脚本&#xff09;攻击外&#xff0c;CSRF&#xff08;跨站请求伪造&#xff09;攻击也是一种常见且危险的威胁。这种攻击利用用户已经验证的身份在没有用户知情的情况下&#xff0c;执行非授权的操作。了解CSRF攻击的机…

ASP.NET Core列表增删改查

前置要求&#xff1a; 1. vueelement-plus实现前端静态页面 HelloWorld.vue <template><h2>hello界面</h2><div class"tableList"><!-- 搜索框 --><el-row :gutter"20"><el-col :span"8"><!-- 搜…

科研绘图(八)线性热图

线性热图&#xff08;Linear Heat Map&#xff09;是一种数据可视化技术&#xff0c;用于展示数值在一维线性空间上的分布情况。它通常用于展示沿着一条线&#xff08;例如时间线或任何一维序列&#xff09;的数据密度或强度变化。线性热图与传统的二维热图不同&#xff0c;后者…

day2·算法-快乐数-有效三角形个数

今天又来更新啦&#xff0c;准备蓝桥杯的小伙伴可以和我一起来刷题&#xff0c;建议大家先看题&#xff0c;整理出思路&#xff0c;再看如何用简单的写法将思路构建出来&#xff0c;然后优化细节&#xff0c;找到解决某些例外出现的方法&#xff0c;从而成功解答这道题。 快乐…

adb wifi 远程调试 安卓手机 命令

使用adb wifi 模式调试需要满足以下前提条件&#xff1a; 手机 和 PC 需要在同一局域网下。手机需要开启开发者模式&#xff0c;然后打开 USB 调试模式。 具体操作步骤如下&#xff1a; 将安卓手机通过 USB 线连接到 PC。&#xff08;连接的时候&#xff0c;会弹出请求&#x…

深入探索CSS动画的魅力-附带动画实例

一、网页动画发展简史 GIF动画 GIF全称为“Graphics Interchange Format”&#xff0c;是一种基于LZW算法的连续色调无损压缩格式。 由于其文件小、无损压缩、易于播放等优点&#xff0c;GIF成为了网页动画的最初选择。然而&#xff0c;GIF动画的色彩数量和帧数有限&#xff…

Python数据分析案例32——财经新闻爬虫和可视化分析

案例背景 很多同学的课程作业都是需要自己爬虫数据然后进行分析&#xff0c;这里提供一个财经新闻的爬虫案例供学习。本案例的全部数据和代码获取可以参考&#xff1a;财经新闻数据 数据来源 新浪财经的新闻网&#xff0c;说实话&#xff0c;他这个网站做成这样就是用来爬虫的…

k8s集群配置NodeLocal DNSCache

一、简介 当集群规模较大时&#xff0c;运行的服务非常多&#xff0c;服务之间的频繁进行大量域名解析&#xff0c;CoreDNS将会承受更大的压力&#xff0c;可能会导致如下影响&#xff1a; 延迟增加&#xff1a;有限的coredns服务在解析大量的域名时&#xff0c;会导致解析结果…

海外云手机助力企业拓展海外市场

在当前全球化的商业环境中&#xff0c;由于政策限制&#xff0c;许多企业面临着无法顺利将产品推广到国外的困境&#xff0c;使得海外市场的机遇白白流失。而随着科技的不断创新&#xff0c;一种解决企业海外拓展困境的工具应运而生&#xff0c;那就是海外云手机。本文将深入探…

【python】搭配Miniconda使用VSCode

现在的spyder总是运行出错&#xff0c;启动不了&#xff0c;尝试使用VSCode。 一、在VSCode中使用Miniconda管理的Python环境&#xff0c;可以按照以下步骤进行&#xff1a; a. 确保Miniconda环境已经安装并且正确配置。 b. 打开VSCode&#xff0c;安装Python扩展。 打开VS…