Flink Flink数据写入Kafka

一、环境准备

flink 1.14写入Kafka,首先在pom.xml文件中导入相关依赖

    <properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.14.6</flink.version><spark.version>2.4.3</spark.version><hadoop.version>2.8.5</hadoop.version><hbase.version>1.4.9</hbase.version><hive.version>2.3.5</hive.version><java.version>1.8</java.version><scala.version>2.11.8</scala.version><mysql.version>8.0.22</mysql.version><scala.binary.version>2.11</scala.binary.version><maven.compiler.source>${java.version}</maven.compiler.source><maven.compiler.target>${java.version}</maven.compiler.target></properties>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.12</artifactId><version>${flink.version}</version></dependency>

二、Flink将Socket数据写入Kafka(精准一次)

注意:如果要使用 精准一次 写入 Kafka,需要满足以下条件,缺一不可
1、开启 checkpoint
2、设置事务前缀
3、设置事务超时时间: checkpoint 间隔 < 事务超时时间 < max 的 15 分钟

package com.flink.DataStream.Sink;import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.Properties;public class flinkSinkKafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();streamExecutionEnvironment.setParallelism(1);// 如果是精准一次,必须开启 checkpointstreamExecutionEnvironment.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);DataStreamSource<String> streamSource = streamExecutionEnvironment.socketTextStream("localhost", 8888);/*** TODO Kafka Sink* TODO 注意:如果要使用 精准一次 写入 Kafka,需要满足以下条件,缺一不可* 1、开启 checkpoint* 2、设置事务前缀* 3、设置事务超时时间: checkpoint 间隔 < 事务超时时间 < max 的 15 分钟*/Properties properties=new Properties();properties.put("transaction.timeout.ms",10 * 60 * 1000 + "");KafkaSink<String> kafkaSink = KafkaSink.<String>builder()// 指定 kafka 的地址和端口.setBootstrapServers("localhost:9092")// 指定序列化器:指定Topic名称、具体的序列化(产生方需要序列化,接收方需要反序列化).setRecordSerializer(KafkaRecordSerializationSchema.<String>builder().setTopic("testtopic01")// 指定value的序列化器.setValueSerializationSchema(new SimpleStringSchema()).build())// 写到 kafka 的一致性级别: 精准一次、至少一次.setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)// 如果是精准一次,必须设置 事务的前缀.setTransactionalIdPrefix("flinkkafkasink-")// 如果是精准一次,必须设置 事务超时时间: 大于 checkpoint间隔,小于 max 15 分钟.setKafkaProducerConfig(properties)//.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "").build();streamSource.sinkTo(kafkaSink);streamExecutionEnvironment.execute();}
}

查看ProduceerConfig配置
在这里插入图片描述

三、启动Zookeeper、Kafka

#启动zookeeper
${ZK_HOME}/bin/zkServer.sh start
#查看zookeeper状态
${ZK_HOME}/bin/zkServer.sh status
#启动kafka
${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties
#查看topic
${KAFKA_HOME}/bin/kafka-topics.sh --list --zookeeper localhost:2181
#创建topic
${KAFKA_HOME}/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic testtopic02 --partitions 2 --replication-factor 1
#删除topic
${KAFKA_HOME}/bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic testtopic02
#生产消息
${KAFKA_HOME}/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testtopic01
#消费消息
${KAFKA_HOME}/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testtopic01 --from-beginning

通过socket模拟数据写入Flink之后,Flink将数据写入Kafka
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/211640.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

案例052:用于日语词汇学习的微信小程序

文末获取源码 开发语言&#xff1a;Java 框架&#xff1a;SSM JDK版本&#xff1a;JDK1.8 数据库&#xff1a;mysql 5.7 开发软件&#xff1a;eclipse/myeclipse/idea Maven包&#xff1a;Maven3.5.4 小程序框架&#xff1a;uniapp 小程序开发软件&#xff1a;HBuilder X 小程序…

AWS基于x86 vs Graviton(ARM)的RDS MySQL性能对比

概述 这是一个系列。在前面&#xff0c;我们测试了阿里云经济版&#xff08;“ARM”&#xff09;与标准版的性能/价格对比&#xff1b;华为云x86规格与ARM&#xff08;鲲鹏增强&#xff09;版的性能/价格对比。现在&#xff0c;再来看看AWS的ARM版本的RDS情况 在2018年&#…

第3章:知识表示:概述、符号知识表示、向量知识表示

&#x1f497;&#x1f497;&#x1f497;欢迎来到我的博客&#xff0c;你将找到有关如何使用技术解决问题的文章&#xff0c;也会找到某个技术的学习路线。无论你是何种职业&#xff0c;我都希望我的博客对你有所帮助。最后不要忘记订阅我的博客以获取最新文章&#xff0c;也欢…

有什么可视化数据管理工具?

有什么可视化数据管理工具&#xff1f; 相信大家对数据分析并不陌生。数据可视化不仅可以使得数据更加直观、易于理解&#xff0c;而且可以帮助用户发现数据中的潜在规律和趋势。 但是&#xff0c;对于不熟悉编程的用户来说&#xff0c;如何将枯燥的大数据转化为可视化的图和…

springboot——自动装配

自动装配 Condition: Condition内置方法&#xff1a;boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata)&#xff0c;返回值为布尔型 重写matches方法的类&#xff1a;SpringBootCondition等 SpringBootCondition&#xff1a;springboot自带的实现类…

windows如何解决端口冲突(实用篇)

在项目设计中&#xff0c;环境配置成功点击运行瞬间&#xff0c;一大堆红爆出&#xff0c;8080端口占用&#xff0c;这个是很烦人的。。。 解决方式&#xff1a; 笨方法&#xff1a;一、查看所有端口实用情况&#xff08;挨个扫&#xff09; 按住【WINR】快捷键打开运行输入…

同旺科技 USB TO RS-485 定制款适配器--- 拆解(三)

内附链接 1、USB TO RS-485 定制款适配器 ● 支持USB 2.0/3.0接口&#xff0c;并兼容USB 1.1接口&#xff1b; ● 支持USB总线供电&#xff1b; ● 支持Windows系统驱动&#xff0c;包含WIN10 / WIN11系统32 / 64位&#xff1b; ● 支持Windows RT、Linux、Mac OS X、Windo…

同旺科技 USB TO RS-485 定制款适配器--- 拆解(二)

内附链接 1、USB TO RS-485 定制款适配器 ● 支持USB 2.0/3.0接口&#xff0c;并兼容USB 1.1接口&#xff1b; ● 支持USB总线供电&#xff1b; ● 支持Windows系统驱动&#xff0c;包含WIN10 / WIN11系统32 / 64位&#xff1b; ● 支持Windows RT、Linux、Mac OS X、Windo…

TA-Lib学习研究笔记(九)——Pattern Recognition (3)

TA-Lib学习研究笔记&#xff08;九&#xff09;——Pattern Recognition &#xff08;3&#xff09; 最全面的形态识别的函数的应用&#xff0c;通过使用A股实际的数据&#xff0c;验证形态识别函数&#xff0c;用K线显示出现标志的形态走势&#xff0c;由于入口参数基本上是o…

ROS-ROS通信机制-参数服务器

文章目录 一、基础理论知识二、C实现三、Python实现 一、基础理论知识 参数服务器在ROS中主要用于实现不同节点之间的数据共享。参数服务器相当于是独立于所有节点的一个公共容器&#xff0c;可以将数据存储在该容器中&#xff0c;被不同的节点调用&#xff0c;当然不同的节点…

安卓开发APP应用程序和苹果iOS开发APP应用程序有什么区别?

随着智能手机和平板电脑在全球的普及&#xff0c;APP移动应用已成为日常生活中不可或缺的组成部分。从社交网络到电子商务平台&#xff0c;从个人理财到游戏娱乐&#xff0c;APP几乎渗透了人们所有的活动领域。在开发APP时&#xff0c;开发者通常要面对两大主流平台&#xff1a…

Docker+jenkins+gitlab实现持续集成

1.安装环境 服务器ip虚拟机版本192.168.5.132centos7.6192.168.5.152centos7.6 2. 安装docker 安装必要的一些系统工具 yum install -y yum-utils device-mapper-persistent-data lvm2添加软件源信息&#xff0c;要确保centos7能上外网 yum-config-manager --add-repo http:…

西工大计算机学院计算机系统基础实验一(函数编写1~10)

还是那句话&#xff0c;千万不要慌&#xff0c;千万不要着急&#xff0c;耐下性子慢慢来&#xff0c;一步一个脚印&#xff0c;把基础打的牢牢的&#xff0c;一样不比那些人差。回到实验本身&#xff0c;自从​​​​​​按照西工大计算机学院计算机系统基础实验一&#xff08;…

决策树 (人工智能期末复习)

几个重要概念 信息熵&#xff1a;随机事件未按照某个属性的不同取值划分时的熵减去按照某个属性的不同取值划分时的平均 熵。即前后两次熵的差值。 表示事物的混乱程度&#xff0c;熵越大表示混乱程度越大&#xff0c;越小表示混乱程度越小。 对于随机事件&#xff0c;如果它的…

Python实现广义线性回归模型(statsmodels GLM算法)项目实战

说明&#xff1a;这是一个机器学习实战项目&#xff08;附带数据代码文档视频讲解&#xff09;&#xff0c;如需数据代码文档视频讲解可以直接到文章最后获取。 1.项目背景 广义线性模型(Generalized Linear Model&#xff0c;简称GLM)是一种广泛应用于回归分析和分类问题的统…

python炒股自动化(1),量化交易接口区别

要实现股票量化程序化自动化&#xff0c;就需要券商提供的API接口&#xff0c;重点是个人账户小散户可以申请开通&#xff0c;上手要简单&#xff0c;接口要足够全面&#xff0c;功能完善&#xff0c;首先&#xff0c;第一步就是要找对渠道和方法&#xff0c;这里我们不讨论量化…

IDEA快速生成lambda表达式的方法

IDEA快速生成lambda表达式的方法-CSDN博客 建议修改成 shift/

吴恩达《机器学习》11-1-11-2:首先要做什么、误差分析

一、首先要做什么 选择特征向量的关键决策 以垃圾邮件分类器算法为例&#xff0c;首先需要决定如何选择和表达特征向量 &#x1d465;。视频提到的一个示例是构建一个由 100 个最常出现在垃圾邮件中的词构成的列表&#xff0c;根据这些词是否在邮件中出现来创建特征向量&…

鸿蒙4.0开发笔记之ArkTS语法基础之条件渲染和循环渲染的使用(十五)

文章目录 一、条件渲染&#xff08;if&#xff09;二、循环渲染&#xff08;ForEach&#xff09; 一、条件渲染&#xff08;if&#xff09; 1、定义 正如其他语言中的if…else…语句&#xff0c;ArkTS提供了渲染控制的能力&#xff0c;条件渲染可根据应用的不同状态&#xff0…

Linux安装MySQL

更新密钥 rpm --import https://repo.mysql.com/RPM-GPG-KEY-mysql-2022安装Mysql yum库 rpm -Uvh http://repo.mysql.com//mysql57-community-release-el7-7.noarch.rpmyum安装Mysql yum -y install mysql-community-server设置开机自启 systemctl start mysqld systemctl…