一文说清flink从编码到部署上线

引言:目前flink的文章比较多,但一般都关注某一特定方面,很少有一个文章,从一个简单的例子入手,说清楚从编码、构建、部署全流程是怎么样的。所以编写本文,自己做个记录备查同时跟大家分享一下。本文以简单的mysql cdc为例展开说明。
环境说明:MySQL:5.7;flink:1.14.0;hadoop:3.0.0;操作系统:CentOS 7.6;JDK:1.8.0_401。

1.MySQL

1.1 创建数据库和测试数据

数据库脚本:

CREATE DATABASE `flinktest`;
USE `flinktest`;
CREATE TABLE `products` (`id` int(11) NOT NULL AUTO_INCREMENT,`name` varchar(255) NOT NULL,`description` varchar(512) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8mb4;insert  into `products`(`id`,`name`,`description`) values 
(1,'aaa','aaaa'),
(2,'ccc','ccc'),
(3,'dd','ddd'),
(4,'eeee','eee'),
(5,'ffff','ffff'),
(6,'hhhh','hhhh'),
(7,'iiii','iiii'),
(8,'jjjj','jjjj');

账号使用root就行。

1.2 开启binlog

参考:https://core815.blog.csdn.net/article/details/144233298
踩坑:测试过程中发现mysql 9.0一直无法获取更新的数据,最终使用的5.7。

2.编码

2.1 主要实现

package com.zl;import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;import java.util.Arrays;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;import static com.mysql.cj.conf.PropertyKey.useSSL;public class MysqlExample {public static void main(String[] args) throws Exception {List<String> SYNC_TABLES = Arrays.asList("flinktest.products");MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("10.86.37.169").port(3306).databaseList("flinktest").tableList(String.join(",", SYNC_TABLES)).username("root").password("pwd").startupOptions(StartupOptions.initial()).deserializer(new JsonDebeziumDeserializationSchema()).build();/// 配置flink访问页面-开始/* Configuration config = new Configuration();// 启用 Web UI,访问地址【http://ip:port】config.setBoolean("web.ui.enabled", true); config.setString(RestOptions.BIND_PORT,"8081");
//        这个使用jar直接运行可以,如果提交给yarn会报错,需要改为getExecutionEnvironment()StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);*////配置flink访问页面-结束StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);/// 设置CK存储-开始(不需要可注释掉)// hadoop部署见:https://core815.blog.csdn.net/article/details/144022938// hdfs访问地址见:/home/hadoop-3.3.3/etc/hadoop/core-site.xmlenv.getCheckpointConfig().setCheckpointStorage("hdfs://10.86.97.191:9000"+"/flinktest/");env.getCheckpointConfig().setCheckpointInterval(3000);/// 设置CK存储-结束// 如果不能正常读取mysql的binlog://①可能是mysql没有打开binlog或者mysql版本不支持(当前在mysql5.7.20环境下,功能正常);// ②可能是数据库ip、port、账号、密码错误。env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source").setParallelism(1).print();env.execute("Print MySQL Snapshot + Binlog");}}

2.2 依赖

<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0http://maven.apache.org/maven-v4_0_0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.zl.flinkcdc</groupId><artifactId>FlickCDC</artifactId><packaging>jar</packaging><version>1.0-SNAPSHOT</version><name>FlickCDC</name><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><flink-version>1.14.0</flink-version><flink-cdc-version>2.4.0</flink-cdc-version><hadoop.version>3.0.0</hadoop.version><slf4j.version>1.7.25</slf4j.version><log4j.version>2.16.0</log4j.version></properties><dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>3.8.1</version><scope>test</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink-version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>${flink-version}</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>${flink-cdc-version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-guava</artifactId><version>30.1.1-jre-15.0</version></dependency><!--<dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-guava</artifactId><version>18.0-13.0</version></dependency>--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>${flink-version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.11</artifactId><version>${flink-version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink-version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_2.11</artifactId><version>${flink-version}</version></dependency><!--        hadoop相关依赖--><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop.version}</version><scope>provided</scope><exclusions><exclusion><artifactId>commons-cli</artifactId><groupId>commons-cli</groupId></exclusion><exclusion><artifactId>commons-compress</artifactId><groupId>org.apache.commons</groupId></exclusion><exclusion><artifactId>guava</artifactId><groupId>com.google.guava</groupId></exclusion><exclusion><artifactId>jackson-annotations</artifactId><groupId>com.fasterxml.jackson.core</groupId></exclusion><exclusion><artifactId>jackson-core</artifactId><groupId>com.fasterxml.jackson.core</groupId></exclusion><exclusion><artifactId>jackson-databind</artifactId><groupId>com.fasterxml.jackson.core</groupId></exclusion><exclusion><artifactId>slf4j-api</artifactId><groupId>org.slf4j</groupId></exclusion></exclusions></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>${hadoop.version}</version><scope>provided</scope><exclusions><exclusion><artifactId>asm</artifactId><groupId>org.ow2.asm</groupId></exclusion><exclusion><artifactId>avro</artifactId><groupId>org.apache.avro</groupId></exclusion><exclusion><artifactId>commons-cli</artifactId><groupId>commons-cli</groupId></exclusion><exclusion><artifactId>commons-codec</artifactId><groupId>commons-codec</groupId></exclusion><exclusion><artifactId>commons-compress</artifactId><groupId>org.apache.commons</groupId></exclusion><exclusion><artifactId>commons-io</artifactId><groupId>commons-io</groupId></exclusion><exclusion><artifactId>commons-lang3</artifactId><groupId>org.apache.commons</groupId></exclusion><exclusion><artifactId>commons-logging</artifactId><groupId>commons-logging</groupId></exclusion><exclusion><artifactId>commons-math3</artifactId><groupId>org.apache.commons</groupId></exclusion><exclusion><artifactId>guava</artifactId><groupId>com.google.guava</groupId></exclusion><exclusion><artifactId>jackson-databind</artifactId><groupId>com.fasterxml.jackson.core</groupId></exclusion><exclusion><artifactId>jaxb-api</artifactId><groupId>javax.xml.bind</groupId></exclusion><exclusion><artifactId>log4j</artifactId><groupId>log4j</groupId></exclusion><exclusion><artifactId>nimbus-jose-jwt</artifactId><groupId>com.nimbusds</groupId></exclusion><exclusion><artifactId>slf4j-api</artifactId><groupId>org.slf4j</groupId></exclusion><exclusion><artifactId>slf4j-log4j12</artifactId><groupId>org.slf4j</groupId></exclusion><exclusion><artifactId>zookeeper</artifactId><groupId>org.apache.zookeeper</groupId></exclusion><exclusion><artifactId>jsr305</artifactId><groupId>com.google.code.findbugs</groupId></exclusion><exclusion><artifactId>gson</artifactId><groupId>com.google.code.gson</groupId></exclusion></exclusions></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>${hadoop.version}</version><scope>provided</scope><exclusions><exclusion><artifactId>commons-cli</artifactId><groupId>commons-cli</groupId></exclusion><exclusion><artifactId>guava</artifactId><groupId>com.google.guava</groupId></exclusion><exclusion><artifactId>jackson-databind</artifactId><groupId>com.fasterxml.jackson.core</groupId></exclusion></exclusions></dependency><dependency><groupId>commons-cli</groupId><artifactId>commons-cli</artifactId><version>1.5.0</version></dependency><!--mvn install:install-file -Dfile=D:/maven/flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar-DgroupId=org.apache.flink -DartifactId=flink-shaded-hadoop-3 -Dversion=3.1.1.7.2.9.0-173-9.0 -Dpackaging=jar--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-hadoop-3</artifactId><version>3.1.1.7.2.9.0-173-9.0</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.0.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs><archive><manifest><addClasspath>true</addClasspath><mainClass>com.zl.MysqlExample</mainClass></manifest></archive></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>
</project>

完整代码见:https://gitee.com/core815/flink-cdc-mysql

3.打包

mvn版本:3.5.4。
到pom.xml所在路径,执行“mvn package”
在这里插入图片描述
打包效果:
在这里插入图片描述

4.jar直接运行

java -jar FlickCDC-1.0-SNAPSHOT-jar-with-dependencies.jar

在这里插入图片描述

5.flink yarn运行

hadoop、flink、yarn环境见:https://core815.blog.csdn.net/article/details/144022938

把FlickCDC-1.0-SNAPSHOT-jar-with-dependencies.jar放到“/home”路径下。

执行下面命令:

flink run-application -t yarn-application -Dparallelism.default=1 -Denv.java.opts=" -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8" -Dtaskmanager.memory.process.size=1g -Dyarn.application.name="FlinkCdcMysql"  -Dtaskmanager.numberOfTaskSlots=1 -c com.zl.MysqlExample /home/FlickCDC-1.0-SNAPSHOT-jar-with-dependencies.jar

控制台看到如下打印:
在这里插入图片描述
yarn管理页面:
在这里插入图片描述
运行日志查看步骤:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
下面即可看到完整日志:
在这里插入图片描述

6.常见问题

6.1 问题1

日志错误:
The MySQL server has a timezone offset (0 seconds ahead of UTC) which does not match the configured timezone Asia/Shanghai. Specify the right server-time-zone to avoid inconsistencies for time-related fields.

解决:
修改my.cnf文件。
[mysqld]
default-time-zone=‘Asia/Shanghai’
重启MySQL服务。

6.2 问题2:hdfs

日志错误:
Permission denied: user=PC2023, access=WRITE, inode=“/”:root:supergroup:drwxr-xr-x

解决:

临时解决
hadoop fs -chmod -R 777 /

6.3 问题3:guava30 guava18冲突

分析:
flink 1.13 cdc2.3的组合容易出这个问题。

解决:
参考:https://developer.aliyun.com/ask/574901
flink 使用1.14.0版本;cdc使用2.4.0版本。

6.4 问题4

日志错误:
/user/root/.flink/application_1733492706887_0002/log4j.properties could only be written to 0 of the 1 minReplication nodes

解决:
https://www.pianshen.com/article/1554707968/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/487669.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

过滤器Filter,ajax异步请求,服务器响应的数据类型,json

1.过滤器Filter 按照过滤规则筛选出想要的资源 很多地方都需要判断是否登录&#xff0c;对每个资源进行判断&#xff0c;非常麻烦&#xff0c;可以使用过滤器在访问这些资源前进行判断。 案例&#xff1a; package com.ghx.filter;import javax.servlet.*; import javax.ser…

【网络协议栈】TCP/IP协议栈中重要协议和技术(DNS、ICMP、NAT、代理服务器、以及内网穿透)

每日激励&#xff1a;“请给自己一个鼓励说&#xff1a;Jack我很棒&#xff01;—Jack” 绪论​&#xff1a; 本章是TCP/IP网络协议层的完结篇&#xff0c;本章将主要去补充一些重要的协议和了解一些网络中常见的名词&#xff0c;具体如&#xff1a;DNS、ICMP、NAT、代理服务器…

服务器数据恢复—LINUX下各文件系统删除/格式化的数据恢复可行性分析

Linux操作系统是世界上流行的操作系统之一&#xff0c;被广泛用于服务器、个人电脑、移动设备和嵌入式系统。Linux系统下数据被误删除或者误格式化的问题非常普遍。下面北亚企安数据恢复工程师简单聊一下基于linux的文件系统&#xff08;EXT2/EXT3/EXT4/Reiserfs/Xfs&#xff0…

因果推荐CIKM24 | 通过偏好感知因果干预和反事实数据增强来提升序列推荐

论文来源&#xff1a;CIKM 24 论文链接&#xff1a;PACIFIC: Enhancing Sequential Recommendation via Preference-aware Causal Intervention and Counterfactual Data Augmentation | Proceedings of the 33rd ACM International Conference on Information and Knowledge …

如何在 Odoo18 视图中添加关联数据看板按钮 | 免费开源ERP实施诀窍

文 / 开源智造 Odoo亚太金牌服务 引言 关联数据看板按钮乃是 Odoo 当中的一项强效功能&#xff0c;它容许用户顺遂地访问相关记录&#xff0c;或者直接从模型的表单视图施行特定操作。它们为用户给予了对重要信息的疾速访问途径&#xff0c;并简化了工作流程&#xff0c;由此…

提升网站流量的关键:AI在SEO关键词优化中的应用

内容概要 在当今数字时代&#xff0c;提升网站流量已成为每个网站管理员的首要任务。而人工智能的技术进步&#xff0c;为搜索引擎优化&#xff08;SEO&#xff09;提供了强有力的支持&#xff0c;尤其是在关键词优化方面。关键词是连接用户需求与网站内容的桥梁&#xff0c;其…

腾讯图标/百并发

腾讯新图标&#xff0c;识别速度7毫秒&#xff0c; 百并发无压力

python和C++中的逻辑与/或、位与/或

在 Python 和 C 中&#xff0c;“与”和“或”的实现逻辑相似&#xff0c;但符号和使用方式有区别。 1.Python 中的与、或 与&#xff08;AND&#xff09;&#xff1a;and或&#xff08;OR&#xff09;&#xff1a;or 1.1 逻辑与、或&#xff1a; 用于布尔值&#xff08;Tr…

PR基本操作

将剪辑添加到序列 1.在项目面板中选择素材&#xff0c;右击插入或覆盖选项&#xff0c;添加的素材依指针所在位置为起点。 上图画框位置会影响素材插入的轨道。 2.直接拖动素材到对应的时间轴轨道即可 3.拖动素材到节目监视器 在此项前插入&#xff1a;在V1轨道当前指针所…

如何配置Github并在本地提交代码

前提: 可以流畅访问github, 需要一些上网技巧, 这就自行处理了 申请一个github账号 Github官网地址 首先就是邮箱注册啦, github没有对邮箱的限制, 只要是能收邮件的就ok, qq邮箱, 163等都可以使用. 然后和普通注册账号一样, 一路填写需要的信息, 验证邮箱即可. 如何新增代…

nacos服务注册流程

一、客户端自动注册实例流程 1.首先客户端需要引入服务发现包 <groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId><version>2.2.6.RELEASE</version>2. NacosServiceRegistryA…

qemu安装arm64架构银河麒麟

qemu虚拟化软件&#xff0c;可以在一个平台上模拟另一个硬件平台&#xff0c;可以支持多种处理器架构。 一、安装 安装教程&#xff1a;https://blog.csdn.net/qq_36035382/article/details/125308044 下载链接&#xff1a;https://qemu.weilnetz.de/w64/2024/ 我下载的是 …

小程序维护外包流程和费用

由于某些原因很多老板想要跟换掉小程序原来合作的开发公司&#xff0c;重新把小程序系统维护外包新的公司。小程序系统外包维护是一个涉及多个方面的过程&#xff0c;需要从需求明确、选择团队到持续优化等多个环节进行细致管理。以下就是小程序系统外包维护主要包括几个关键步…

C—指针初阶(2)

如果看完阁下满意的话&#xff0c;能否一键三连呢&#xff0c;我的动力就是大家的支持与肯定&#xff0c;冲&#xff01; 二级指针 我们先看概念以及作用&#xff1a;用来存放一级指针的地址的指针 先看例子&#xff0c;我们逐一分析 我们先分析上面那个“1” 标注那里&#x…

x64dbg 安装使用教程

x64dbg的安装与配置 x64dbg官网地址&#xff1a;https://x64dbg.com/#start x64dbg界面介绍 1.反汇编窗口 这个位置显示的是需要分析的程序的反汇编代码。在第一个区域的最左侧例如“7712EAA3”这一列就是内存地址区域&#xff0c;接着“E8 07”就是汇编指令的opcode&#xff…

视频推拉流EasyDSS无人机直播技术巡查焚烧、烟火情况

焚烧作为一种常见的废弃物处理方式&#xff0c;往往会对环境造成严重污染。因此&#xff0c;减少焚烧、推广绿色能源和循环经济成为重要措施。通过加强森林防灭火队伍能力建设与长效机制建立&#xff0c;各地努力减少因焚烧引发的森林火灾&#xff0c;保护生态环境。 巡察烟火…

禾赛激光雷达AT_128_P_A02激光时间同步配置

sudo apt update sudo apt-get install linuxptp ethtool sudo ethtool -T 网卡名字(ifconfig可以查看)网卡名即连接激光雷达之后配置的ip所对应的网卡名称 一般来说看到如下提示信息才正常&#xff1a; lzslzs-Precision-3591:~$ ethtool -T enp0s31f6 Time stamping paramet…

java项目部署相关记录

部署 简单部署(Windows) redis我使用小皮面板开的 mysql也是 maven打包 先打包,打包成功,找到对应jar包 在文件夹打开,然后再次目录上开cmd java -jar jplan-0.0.1-SNAPSHOT.jar需要注意的是,windows的java环境要和idea项目里边的java是一致的 运行成功! 属性配置方式…

v-for遍历多个el-popover;el-popover通过visible控制显隐;点击其他隐藏el-popover

场景:el-popover通过visible控制显隐;同时el-popover是遍历生成的多个。 原文档的使用visible后就不能点击其他地方使其隐藏;同时解决实现点击其他区域隐藏 <template><div><template v-for="(item,index) in arr" :key="index"><…

【Ubuntu】清理、压缩VirtualBox磁盘空间大小

1、说明 本人为虚拟机创建了两个硬盘:root.vdi 和 hoom.vdi,在创建虚拟机时,分别挂载在/root目录和/home目录下。 下面演示分别清理、压缩两个磁盘的空间。 2、清理空间 1)清理 root.vid sudo dd if=/dev/zero of=/EMPTY bs=1M;sudo rm -f /EMPTY输出信息中会提示,如…