flink实现复杂kafka数据读取

接上文:一文说清flink从编码到部署上线
环境说明:MySQL:5.7;flink:1.14.0;hadoop:3.0.0;操作系统:CentOS 7.6;JDK:1.8.0_401。

常见的文章中,kafka数据结构相对简单,本文根据实际项目数据,说明怎样读取解析复杂kafka数据。并将解析的数据输出到控制台。

1.模拟数据

1.1 模拟数据

{"reportFormat": "2","reportVersion": 1,"reports": [{"filename": "1733277155032RvReport","c": {"objStationInfo": {"sStationName": "LLP入口","ucStationDir": 1,"sStationID": 500001},"objVehicle": {"sUUID": "fdabd178-a169-11eb-9483-b95959072a9d","w64Timestamp": "1733881971628","objRfidInfo": {"sReaderID": "10","objTagData": {"sTID": "1234567891","sEPC": "1234567890"}},"ucReportType": "8","ucVehicleType": "1"}}}]
}

1.2 添加到kafka

使用kafka工具,kafkatool2,具体操作如下:
连接到kafka:
在这里插入图片描述
连接成功:
在这里插入图片描述
添加数据:
在这里插入图片描述
在这里插入图片描述
添加成功:
在这里插入图片描述

2.代码实现

2.1 EnvUtil实现

EnvUtil用于创建flink的运行环境。

package com.zl.utils;import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import java.time.Duration;
import java.time.ZoneOffset;
import java.util.concurrent.TimeUnit;/*** EnvUtil* @description:*/
public class EnvUtil {/*** 设置flink执行环境* @param parallelism 并行度*/public static StreamExecutionEnvironment setFlinkEnv(int parallelism) {// System.setProperty("HADOOP_USER_NAME", "用户名") 对应的是 hdfs文件系统目录下的路径:/user/用户名的文件夹名,本文为rootSystem.setProperty("HADOOP_USER_NAME", "root");Configuration conf = new Configuration();conf.setInteger("rest.port", 1000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);if (parallelism >0 ){//设置并行度env.setParallelism(parallelism);} else {env.setParallelism(1);// 默认1}// 添加重启机制env.setRestartStrategy(RestartStrategies.fixedDelayRestart(50, Time.minutes(6)));// 启动checkpoint,设置模式为精确一次 (这是默认值),10*60*1000=60000env.enableCheckpointing(600000, CheckpointingMode.EXACTLY_ONCE);//rocksdb状态后端,启用增量checkpointenv.setStateBackend(new EmbeddedRocksDBStateBackend(true));//设置checkpoint路径CheckpointConfig checkpointConfig = env.getCheckpointConfig();// 同一时间只允许一个 checkpoint 进行(默认)checkpointConfig.setMaxConcurrentCheckpoints(1);//最小间隔,10*60*1000=60000checkpointConfig.setMinPauseBetweenCheckpoints(60000);// 取消任务后,checkpoint仍然保存checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//checkpoint容忍失败的次数checkpointConfig.setTolerableCheckpointFailureNumber(5);//checkpoint超时时间 默认10分钟checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(10));//禁用operator chain(方便排查反压)env.disableOperatorChaining();return env;}public static StreamTableEnvironment getFlinkTenv(StreamExecutionEnvironment env) {StreamTableEnvironment tenv = StreamTableEnvironment.create(env);//设置时区 东八tenv.getConfig().setLocalTimeZone(ZoneOffset.ofHours(8));Configuration configuration = tenv.getConfig().getConfiguration();// 开启miniBatchconfiguration.setString("table.exec.mini-batch.enabled", "true");// 批量输出的间隔时间configuration.setString("table.exec.mini-batch.allow-latency", "5 s");// 防止OOM设置每个批次最多缓存数据的条数,可以设为2万条configuration.setString("table.exec.mini-batch.size", "20000");// 开启LocalGlobalconfiguration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");//设置TTL API指定tenv.getConfig().setIdleStateRetention(Duration.ofHours(25));return tenv;}}

2.2 FlinkSourceUtil实现

FlinkSourceUtil用于连接kafka。

package com.zl.kafka.domain;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.apache.commons.lang3.StringUtils;/*** @desc:*/
@ToString
@Data
@NoArgsConstructor
@AllArgsConstructor
public class RvTable {private String uniqueId;//flink生成的唯一键private long reportTime;// 过车时间private String dt;                           // 分区字段private String dh;                           // 小时private String reportFormat;private int reportVersion;private String filename;public String sStationName;    // 采集点名称public String ucStationDir;     // 采集点方向编号public String sStationID;      // 采集点编号private String sUUID;private long w64Timestamp;     //事件时间(毫秒级别)private String sReaderID;//射频设备(模块)代码private String sTIDR;private String sEPCR;private int ucReportType;//8->视频 2->射频 138,202->视频+射频private int ucVehicleType;public void parseTableColunm() {this.reportTime = this.w64Timestamp;this.uniqueId = this.sUUID;}
}

2.3 RvTable实现

RvTable解析数据最后存储的model。

package com.zl.kafka.domain;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.apache.commons.lang3.StringUtils;/*** @desc:*/
@ToString
@Data
@NoArgsConstructor
@AllArgsConstructor
public class RvTable {private String uniqueId;//flink生成的唯一键private long reportTime;// 过车时间private String dt;                           // 分区字段private String dh;                           // 小时private String reportFormat;private int reportVersion;private String filename;public String sStationName;    // 采集点名称public String ucStationDir;     // 采集点方向编号public String sStationID;      // 采集点编号private String sUUID;private long w64Timestamp;     //事件时间(毫秒级别)private String sReaderID;//射频设备(模块)代码private String sTIDR;private String sEPCR;private int ucReportType;//8->视频 2->射频 138,202->视频+射频private int ucVehicleType;public void parseTableColunm() {this.reportTime = this.w64Timestamp;this.uniqueId = this.sUUID;}}

2.4 核心逻辑实现

package com.zl.kafka;import com.alibaba.fastjson.JSON;
import com.zl.kafka.domain.RvTable;
import com.zl.utils.EnvUtil;
import com.zl.utils.FlinkSourceUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;import java.text.SimpleDateFormat;
import java.util.Date;public class KafkaExample {public static void main(String[] args) throws Exception {// 配置运行环境,并行度1StreamExecutionEnvironment env = EnvUtil.setFlinkEnv(1);// 程序间隔离,每个程序单独设置env.getCheckpointConfig().setCheckpointStorage("hdfs://10.86.97.191:9000/flinktest/KafkaExample");/// 读取kafka数据SingleOutputStreamOperator<String> rvSourceStream = env.addSource(FlinkSourceUtil.getKafkaSource("rvGroup","rv-test","10.86.97.21:9092","earliest"))// earliest/latest.setParallelism(1).uid("getRV").name("getRV");// 解析转换数据格式SingleOutputStreamOperator<String> rvParseStream = null;try {rvParseStream = rvSourceStream.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) {if (StringUtils.isEmpty(value)) {return;}parseRVData(value, out);}}).setParallelism(1).uid("rvParse").name("rvParse");} catch (Exception e) {e.printStackTrace();}rvParseStream.print();env.execute("rvParseJob");}// mainpublic static void parseRVData(String jsonStr, Collector<String> out) {try {if (StringUtils.isEmpty(jsonStr) || !isJSON(jsonStr)) {return;}JSONObject in = JSONObject.parseObject(jsonStr);// =====报告头信息 =====String reportFormat = stringDefaultIfEmpty(in.getString("reportFormat"));int reportVersion = intDefaultIfEmpty(in.getInteger("reportVersion"));JSONArray reports = in.getJSONArray("reports");if (reports != null) {for (int i = 0; i < reports.size(); i++) {RvTable rvTable = new RvTable();JSONObject record = reports.getJSONObject(i);if (record != null) {String filename = stringDefaultIfEmpty(record.getString("filename"));JSONObject c = record.getJSONObject("c");if (c != null) {// ===== 采集点信息 =====JSONObject objStationInfo = c.getJSONObject("objStationInfo");if(objStationInfo != null) {rvTable.setSStationID(stringDefaultIfEmpty(objStationInfo.getString("sStationID")));rvTable.setSStationName(stringDefaultIfEmpty(objStationInfo.getString("sStationName")));rvTable.setUcStationDir(stringDefaultIfEmpty(objStationInfo.getString("ucStationDir")));}JSONObject objVehicle = c.getJSONObject("objVehicle");if (objVehicle != null) {// ===== 车辆报告信息 =====rvTable.setSUUID(stringDefaultIfEmpty(objVehicle.getString("sUUID")));rvTable.setW64Timestamp(objVehicle.getLong("w64Timestamp"));rvTable.setUcReportType(intDefaultIfEmpty(objVehicle.getInteger("ucReportType")));rvTable.setUcVehicleType(intDefaultIfEmpty(objVehicle.getInteger("ucVehicleType")));// ===== 车辆报告信息/射频车辆信息 =====JSONObject objRfidInfo = objVehicle.getJSONObject("objRfidInfo");if (objRfidInfo != null) {rvTable.setSReaderID(stringDefaultIfEmpty(objRfidInfo.getString("sReaderID")));JSONObject objTagData = objRfidInfo.getJSONObject("objTagData");if (objTagData != null) {rvTable.setSTIDR(stringDefaultIfEmpty(objTagData.getString("sTID")));rvTable.setSEPCR(stringDefaultIfEmpty(objTagData.getString("sEPC")));}}// ===== 自加特殊处理字段 =====long timestamp = rvTable.getW64Timestamp();SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH");Date date = new Date(timestamp);String[] s = simpleDateFormat.format(date).split(" ");rvTable.setDt(s[0]);rvTable.setDh(s[1]);out.collect(JSONObject.toJSONString(rvTable));}// if (objVehicle != null)}// if (c != null)}// if (record != null)}// for 循环}} catch (Exception e) {e.printStackTrace();// 此处把解析后的数据存储到数据库……}}// parseRVDatapublic static boolean isJSON(String str) {boolean result;try {JSON.parse(str);result = true;} catch (Exception e) {result = false;}return result;}public static int intDefaultIfEmpty(Integer num) {if (num == null) {num = 0;return num;}return num;}public static String stringDefaultIfEmpty(String str) {return StringUtils.defaultIfEmpty(str, "ENULL");}public static Long longDefaultIfEmpty(Long num) {if (num == null) {num = 0l;return num;}return num;}public static Double doubleDefaultIfEmpty(Double num) {if (num == null) {num = 0.0;return num;}return num;}
}

2.5 pom.xml

注意修改此处:
在这里插入图片描述

3.运行效果

3.1 运行日志

在这里插入图片描述

3.2 web UI

访问:http://IP:1000/
在这里插入图片描述
在这里插入图片描述

4.部署

相关构建、部署,参考:一文说清flink从编码到部署上线
部署脚本:

flink run-application -t yarn-application -Dparallelism.default=1 -Denv.java.opts=" -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8" -Dtaskmanager.memory.process.size=1g -Dyarn.application.name="FlinkCdcKafka"  -Dtaskmanager.numberOfTaskSlots=1 -c com.zl.kafka.KafkaExample /home/FlickCDC-1.0-SNAPSHOT-jar-with-dependencies.jar

在这里插入图片描述

5. 完整代码

完整代码见:https://gitee.com/core815/flink-cdc-mysql

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/493865.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

第十五届蓝桥杯Scratch01月stema选拔赛—排序

排序 具体要求&#xff1a; 1). 点击绿旗&#xff0c;在舞台上出现4张点数不同的扑克牌&#xff0c;牌上的点数是随机的&#xff08;4-9点&#xff09;&#xff0c;如图所示&#xff1b; 完整题目可点击下方链接查看&#xff1a; 排序_scratch_嗨信奥-玩嗨信息奥林匹克竞赛-…

图形学笔记 - 5. 光线追踪2 - 加速结构

目录 使用AABB加速光线追踪 Uniform Spatial Partitions (Grids) 均匀空间划分 空间划分 KD树预处理 KD-Tree数据结构 遍历kd树 对象划分 & Bounding Volume Hierarchy 层次包围盒 BVH BVH遍历 空间划分与物体划分呢 GTC news: DLSS、RTXGI 实时光线追踪 使用AAB…

计算机毕业设计原创定制(免费送源码):NodeJS+MVVM+MySQL 樱花在线视频网站

目 录 摘要 1 1 绪论 1 1.1研究背景 1 1.2系统设计思想 1 1.3B/S体系工作原理 1 1.4node.js主要功能 2 1.5论文结构与章节安排 3 2 樱花在线视频网站分析 4 2.1 可行性分析 4 2.2 系统流程分析 4 2.2.1数据增加流程 5 2.3.2数据修改流程 5 2.3.3数据删除流程 5 …

SpringBoot 启动类 SpringApplication 二 run方法

配置 在Program arguments配置2个参数&#xff1a;--server.port8081 --spring.profiles.activedev。 run方法 run方法执行结束代表SpringBoot启动完成&#xff0c;即完成加载bean。 // ConfigurableApplicationContext 是IOC容器 public ConfigurableApplicationContext ru…

如何调大unity软件的字体

一、解决的问题&#xff1a; unity软件的字体太小&#xff0c;怎么调大点&#xff1f;二、解决方法&#xff1a; 1.操作步骤&#xff1a; 打开Unity编辑器> Edit>preferences> UI Scaling>Use custom scaling value&#xff08;取消勾选“使用默认桌面设置”&…

SYD881X RTC定时器事件在调用timeAppClockSet后会出现比较大的延迟

RTC定时器事件在调用timeAppClockSet后会出现比较大的延迟 这里RTC做了两个定时器一个是12秒,一个是185秒: #define RTCEVT_NUM ((uint8_t) 0x02)//当前定时器事件数#define RTCEVT_12S ((uint32_t) 0x0000002)//定时器1s事件 /*整分钟定时器事件&#xff0c;因为其余的…

内置函数.

日期函数 current_date/time() 日期/时间 获得年月日&#xff1a; 获得时分秒&#xff1a; 获得时间戳&#xff1a;日期时间 now()函数 体会date(datetime)的用法&#xff1a;只显示日期 在日期的基础上加日期&#xff1a;按照日历自动计算 关键字为 intervalinterval 后的数值…

PHP 微信棋牌开发全解析:高级教程

PHP - 多维数组详解 多维数组是 PHP 中一种强大的数据结构&#xff0c;指的是一个数组的元素中可以包含一个或多个数组。它常用于存储复杂的嵌套数据&#xff0c;如表格数据或多层次关系的数据结构。 注释&#xff1a; 数组的维度表示您需要指定索引的层级数&#xff0c;以访问…

【Java】递归算法

递归的本质&#xff1a; 方法调用自身。 案例1. 斐波那契数列 1 1 2 3 5 8 13 21 .. f(n)f(n-1)f(n-2) 方法的返回值&#xff1a; 只要涉及到加减乘除&#xff0c;就是int,其他的就是void。 案例2. 青蛙跳台 青蛙一次可以跳一级台阶&#xff0c;也可以跳两级台阶&#xff…

JVM简介—1.Java内存区域

大纲 1.运行时数据区的介绍 2.运行时数据区各区域的作用 3.各个版本内存区域的变化 4.直接内存的使用和作用 5.站在线程的角度看Java内存区域 6.深入分析堆和栈的区别 7.方法的出入栈和栈上分配、逃逸分析及TLAB 8.虚拟机中的对象创建步骤 9.对象的内存布局 10.对象的…

大腾智能CAD:国产云原生三维设计新选择

在快速发展的工业设计领域&#xff0c;CAD软件已成为不可或缺的核心工具。它通过强大的建模、分析、优化等功能&#xff0c;不仅显著提升了设计效率与精度&#xff0c;还促进了设计思维的创新与拓展&#xff0c;为产品从概念构想到实体制造的全过程提供了强有力的技术支持。然而…

设计模式の享元模板代理模式

文章目录 前言一、享元模式二、模板方法模式三、代理模式3.1、静态代理3.2、JDK动态代理3.3、Cglib动态代理3.4、小结 前言 本篇是关于设计模式中享元模式、模板模式、以及代理模式的学习笔记。 一、享元模式 享元模式是一种结构型设计模式&#xff0c;目的是为了相似对象的复用…

Linux网络功能 - 服务和客户端程序CS架构和简单web服务示例

By: fulinux E-mail: fulinux@sina.com Blog: https://blog.csdn.net/fulinus 喜欢的盆友欢迎点赞和订阅! 你的喜欢就是我写作的动力! 目录 概述准备工作扫描服务端有那些开放端口创建客户端-服务器设置启动服务器和客户端进程双向发送数据保持服务器进程处于活动状态设置最小…

用人话讲计算机:Python篇!(十五)迭代器、生成器、装饰器

一、迭代器 &#xff08;1&#xff09;定义 标准解释&#xff1a;迭代器是 Python 中实现了迭代协议的对象&#xff0c;即提供__iter__()和 __next__()方法&#xff0c;任何实现了这两个方法的对象都可以被称为迭代器。 所谓__iter__()&#xff0c;即返回迭代器自身 所谓__…

小程序快速实现大模型聊天机器人

需求分析&#xff1a; 基于大模型&#xff0c;打造一个聊天机器人&#xff1b;使用开放API快速搭建&#xff0c;例如&#xff1a;讯飞星火&#xff1b;先实现UI展示&#xff0c;在接入API。 最终实现效果如下&#xff1a; 一.聊天机器人UI部分 1. 创建微信小程序&#xff0c…

【Android】unzip aar删除冲突classes再zip

# 解压JAR文件 jar xf your-library.jar # 解压AAR文件&#xff08;AAR实际上是ZIP格式&#xff09; unzip your-library.aar # 删除不需要的类 rm -rf path/to/com/example/unwanted/ # 对于JAR打包 jar cf your-library-modified.jar -C path/to/unzipped/ . # 对于AAR打包…

使用C语言编写UDP循环接收并打印消息的程序

使用C语言编写UDP循环接收并打印消息的程序 前提条件程序概述伪代码C语言实现编译和运行C改进之自由设定端口注意事项在本文中,我们将展示如何使用C语言编写一个简单的UDP服务器程序,该程序将循环接收来自指定端口的UDP消息,并将接收到的消息打印到控制台。我们将使用POSIX套…

你的第一个博客-第一弹

使用 Flask 开发博客 Flask 是一个轻量级的 Web 框架&#xff0c;适合小型应用和学习项目。我们将通过 Flask 开发一个简单的博客系统&#xff0c;支持用户注册、登录、发布文章等功能。 步骤&#xff1a; 安装 Flask 和其他必要库&#xff1a; 在开发博客之前&#xff0c;首…

Vue(二)

1.Vue生命周期 Vue生命周期就是一个Vue实例从 创建 到 销毁 的整个过程。生命周期四个阶段&#xff1a; 创建阶段&#xff1a;创建响应式数据。 挂载阶段&#xff1a;渲染模板。 更新阶段&#xff1a;修改数据&#xff0c;更新视图。 销毁阶段&#xff1a;销毁Vue实例。 …

macOS 配置 vscode 命令行启动

打开 vscode 使用 cmd shift p 组合快捷键&#xff0c;输入 install 点击 Install ‘code’ command in PATH Ref https://code.visualstudio.com/docs/setup/mac