目录
一、前言
二、流式数据处理场景介绍
2.1 流式数据处理概述
2.1.1 流式数据处理场景介绍
2.2 流式数据处理技术栈
2.2.1 数据采集
2.2.2 数据处理
2.2.3 数据存储
2.2.4 数据展示
2.3 流式数据处理场景面临的问题和挑战
三、通用的流式数据处理场景解决方案
3.1 基本流式处理架构
3.2 流处理与批处理结合
3.3 实时数据仓库
3.4 流式 ETL 架构
四、实验环境准备
4.1 kafka搭建过程
4.1.1 创建相关的目录
4.1.2 编写yaml文件
4.1.3 启动kafka和zk容器
4.1.4 检查容器是否启动成功
4.2 测试验证
4.2.1 进入kafka容器
4.2.2 创建一个topic
4.2.3 测试发送消息
4.3 创建两个备用topic
4.3.1 提前创建两个备用topic
4.3.2 开启生产窗口和消费窗口
五、flink 接收kafka数据并写入kafka
5.1 前置准备
5.1.1 组件版本说明
5.1.2 maven核心依赖
5.2 需求说明
5.3 核心代码实现过程
5.3.1 核心代码实现方式一
5.3.2 效果测试
5.3.3 核心代码实现方式二
六、flink 接收kafka数据写入mysql
6.1 前置准备
6.2 代码实现过程
6.2.1 自定义实体类
6.2.2 添加自定义Sink函数
6.2.3 核心任务逻辑实现
6.2.4 效果测试
七、写在文末
一、前言
在大数据场景中,Flink作为重要的流式处理框架,在架构运行中承载着重要的作用,与之配合使用的就是大家熟悉的高性能消息中间件kafka,两者的结合,就可以解决很多场景下的流式数据问题,本文将通过几个案例详细介绍一下。
二、流式数据处理场景介绍
2.1 流式数据处理概述
流式数据处理(Streaming Data Processing)指的是对连续不断的数据流进行实时处理的技术。这种处理方式适用于需要对大量数据进行快速反应和决策的场景。尤其在需要实时分析、决策支持和即时响应的应用场景中尤为重要。
2.1.1 流式数据处理场景介绍
下面详细介绍几种常见的流式数据处理场景:
-
实时数据分析
-
应用场景:实时股票价格分析、社交媒体趋势分析、在线广告效果监测等。
-
需求:需要快速响应数据变化,提供即时的分析结果。
-
处理流程:收集实时数据流,使用流处理框架(如 Apache Flink、Apache Kafka Streams)进行实时计算,并将结果发送给前端应用或数据可视化工具。
-
-
实时交易监控
-
应用场景:银行交易系统、信用卡欺诈检测、电子商务网站购物车行为监控等。
-
需求:实时监控交易活动,及时发现异常行为。
-
处理流程:实时收集交易数据,通过流处理框架进行模式匹配和规则引擎分析,一旦发现异常则立即产生警报。
-
-
日志处理与监控
-
应用场景:网站访问日志分析、服务器性能监控、应用错误跟踪等。
-
需求:实时监控系统状态,及时发现并解决问题。
-
处理流程:使用日志收集工具(如 Fluentd、Logstash)将日志数据实时传送到中央存储系统,并通过流处理框架进行实时分析,发现异常则触发警报。
-
-
物联网(IoT)数据处理
-
应用场景:智能家居设备监控、工业自动化生产线监控、智能交通系统等。
-
需求:从大量传感器数据中提取有价值的信息,实现设备的远程控制或预防性维护。
-
处理流程:传感器数据通过边缘计算设备进行初步处理后,上传至云端进行进一步分析。
-
-
社交媒体分析
-
应用场景:舆情分析、品牌声誉管理、社交网络影响力评估等。
-
需求:实时分析社交媒体上的用户行为,了解公众情绪。
-
处理流程:通过社交媒体 API 获取实时数据,使用自然语言处理技术进行文本分析,并将结果可视化展示。
-
-
金融交易撮合
-
应用场景:股票交易所、外汇交易平台等。
-
需求:高速处理大量的交易订单,实现订单的快速匹配。
-
处理流程:接收交易订单流,使用高性能撮合引擎(如开源的 ZeroMQ 或专有的交易引擎)进行订单匹配,并将结果返回给交易双方。
-
2.2 流式数据处理技术栈
流式数据处理场景中常用的技术栈涵盖了数据采集、处理、存储和展示等多个环节。下面是一些典型的技术和工具,它们在不同的流式数据处理场景中扮演着重要角色。
2.2.1 数据采集
顾名思义,即数据的来源,对于一个系统,或一个数据处理平台,一般来说,采集的数据来源可以有多种渠道,比如文件、日志、数据库、消息队列等等。
-
消息队列 / 数据总线
-
Apache Kafka:广泛使用的发布订阅消息系统,支持高吞吐量的数据流处理。
-
RabbitMQ:AMQP 标准的消息代理,适用于较小规模的数据传输。
-
Amazon Simple Notification Service (SNS) / Simple Queue Service (SQS):AWS 提供的消息传递服务。
-
-
日志收集工具
-
Fluentd:轻量级的统一日志收集工具,支持多种数据源和输出插件。
-
Logstash:Elasticsearch 生态系统的一部分,用于数据的收集、转换和传输。
-
Filebeat:轻量级的日志转发器,同样属于 Elasticsearch 生态系统。
-
2.2.2 数据处理
采集了数据之后,一般需要对数据进行二次加工处理才能进一步使用,因此这一步在架构中具有非常重要的作用,具体来说,常用的数据处理框架包括:
-
流处理框架
-
Apache Flink:支持事件驱动处理模型,提供低延迟和高吞吐量的流处理能力。
-
Apache Kafka Streams:基于 Kafka 的流处理库,适合集成到现有应用中。
-
Apache Spark Streaming:基于 Spark 的流处理模块,可以与 Spark 的批处理能力无缝集成。
-
Amazon Kinesis Analytics:AWS 提供的用于分析流数据的服务。
-
-
规则引擎
-
Apache Nifi:用于数据流动管理和流处理的任务自动化工具。
-
Drools:基于规则的引擎,可用于实现业务逻辑规则。
-
2.2.3 数据存储
数据处理完成之后,一般有两种处理方式,一是数据持久化存储,比如存储到mysql,pg,或es,hbase等,也可以存储到文件系统,具体来说:
-
分布式存储系统
-
Apache Cassandra:分布式 NoSQL 数据库,支持高可用性和分区容忍性。
-
Apache HBase:基于 Hadoop 的分布式列存储系统。
-
Amazon DynamoDB:AWS 提供的键值和文档存储服务。
-
-
关系型数据库
-
MySQL / PostgreSQL:传统的关系型数据库管理系统,可用于存储流处理后的结果数据。
-
-
非关系数据库
-
es,便于后续数据检索和分布式存储;
-
mongodb,文档型数据库;
-
2.2.4 数据展示
对于很多大数据项目来说,经过处理之后的数据需要通过业务系统进行前端展示,或者数据的多次消费,即借助可视化工具进行展示,具体来说:
-
可视化工具
-
Grafana:流行的开源监控和可视化工具,支持多种数据源。
-
Kibana:Elasticsearch 生态系统中的可视化工具,主要用于日志分析。
-
Tableau:商业化的数据可视化软件,支持丰富的数据源和复杂的报表制作。
-
-
数据仓库
-
Apache Hive:基于 Hadoop 的数据仓库工具,提供 SQL 查询功能。
-
Amazon Redshift:AWS 的完全托管的 PB 级数据仓库服务。
-
这些技术和工具可以根据具体的业务需求和技术背景进行组合使用,构建出适合特定场景的流式数据处理系统。在实际应用中,还需要考虑系统的可扩展性、容错性、安全性和性能等因素。
2.3 流式数据处理场景面临的问题和挑战
流式数据处理场景给架构设计带来了充分的灵活性,能够解决很多实际问题,同时也面临着一系列挑战,这些问题涉及到技术实现、数据管理、性能优化等多个方面。具体来说:
-
数据量大
-
问题描述:流式数据处理通常涉及大量的数据,这些数据可能是持续不断的,且每秒产生的数据量可能是巨大的。
-
解决方案:采用分布式的处理框架(如 Apache Flink、Apache Kafka Streams、Spark Streaming),利用集群的计算能力和内存资源来处理大规模数据。
-
-
实时性要求高
-
问题描述:很多流式数据处理场景要求系统能够以毫秒级甚至更低的延迟来处理数据。
-
解决方案:优化数据处理逻辑,减少不必要的数据移动;使用低延迟的消息队列(如 Kafka);采用高效的序列化协议(如 Protocol Buffers)。
-
-
数据一致性问题
-
问题描述:在处理数据流时,需要保证数据处理的一致性,尤其是在分布式环境中。
-
解决方案:使用支持事务的流处理框架,如 Flink 的 Exactly Once 语义;确保数据存储层支持 ACID 属性。
-
-
容错性问题
-
问题描述:系统需要能够在出现故障时继续正常运行,并且能够恢复到正确状态。
-
解决方案:利用流处理框架的容错机制,如 Flink 的 Checkpointing 和 Savepoint;设计健壮的备份和恢复策略。
-
-
数据质量问题
-
问题描述:流式数据可能存在不完整、错误或格式不一致等问题。
-
解决方案:在数据进入处理流水线之前进行数据清洗;使用数据质量工具(如 Apache Nifi)来检查和纠正数据错误。
-
-
系统扩展性问题
-
问题描述:随着数据量的增长,系统需要能够水平扩展以应对更多的负载。
-
解决方案:设计可扩展的架构,使用微服务架构来分离关注点;利用云计算平台的弹性伸缩能力。
-
-
数据安全问题
-
问题描述:流式数据处理涉及到敏感数据,必须确保数据的安全性和隐私保护。
-
解决方案:加密传输通道;对敏感数据进行脱敏处理;遵循数据保护法规(如 GDPR)。
-
-
运维复杂度
-
问题描述:流式数据处理系统的运维相对传统批处理系统更加复杂。
-
解决方案:采用容器化技术(如 Docker、Kubernetes)简化部署和管理;使用 DevOps 工具(如 Jenkins、GitLab CI/CD)自动化运维过程。
-
-
数据集成
-
问题描述:流式数据处理需要与多种数据源集成,包括数据库、文件系统、外部API等。
-
解决方案:使用统一的数据接入层(如 Apache NiFi、Apache Camel);设计良好的API接口以支持异构系统的交互。
-
面对这些问题,开发人员和运维人员需要综合运用多种技术和策略来构建稳定、高效、安全的流式数据处理系统。
三、通用的流式数据处理场景解决方案
flink + kafka可以说能够解决大部分场景下关于流式数据处理的问题,下面结合实践经验,列举几种常见的处理方案。
3.1 基本流式处理架构
适用于需要实时处理大量数据流的应用场景,如实时监控、日志分析、实时告警等。如下图所示:
方案说明:
-
Kafka Brokers:接收来自不同数据源的数据,如日志、传感器数据等。
-
Flink Cluster:消费 Kafka 中的数据,进行实时处理(如聚合、过滤、窗口操作等)。
-
Various Sinks:将处理后的数据写入到不同的存储系统,如 Elasticsearch、HDFS、数据库等。
3.2 流处理与批处理结合
适用于需要同时处理实时数据流和历史数据批处理的应用场景,如实时报表生成与历史数据分析相结合。如下图所示:
方案说明:
-
Kafka Brokers:接收实时数据流;
-
Flink Cluster:消费实时数据流,进行实时处理;
-
Batch Processing:可以使用 Flink 的批处理功能或者结合 Spark 进行批处理任务;
-
Storage Systems:存储处理后的数据,支持实时数据和历史数据的查询;
3.3 实时数据仓库
适用于需要实时数据仓库的应用场景,如实时报表生成、实时数据分析等。如下图所示:
方案说明:
-
Kafka Brokers:接收实时数据流;
-
Flink Cluster:消费实时数据流,进行实时处理,清洗和聚合数据;
-
Real-time Data Warehouse:存储处理后的实时数据,支持实时查询和分析;
-
Analytics Tools:使用 BI 工具或其他分析工具进行实时数据分析,生成报表或仪表盘;
3.4 流式 ETL 架构
适用于需要实时进行数据抽取、转换和加载(ETL)的应用场景,如实时数据迁移、实时数据同步等。如下图所示:
方案说明:
-
Kafka Brokers:接收来自不同数据源的数据。
-
Flink Cluster:消费 Kafka 中的数据,进行实时的 ETL 处理,如数据清洗、转换、加载等。
-
Target System:将处理后的数据加载到目标系统,如数据仓库、HDFS、数据库、S3 等。
四、实验环境准备
4.1 kafka搭建过程
4.1.1 创建相关的目录
mkdir -p /docker/kafka/
cd /docker/kafka/
4.1.2 编写yaml文件
vim docker-compose.yaml ,添加如下内容
vim docker-compose.yaml
version: '2.2.2'
services:zookeeper:image: zookeepercontainer_name: zookeeperports:- "2181:2181"networks:- mynetkafka:image: bitnami/kafkacontainer_name: kafkaports:- "9092:9092"environment:KAFKA_BROKER_ID: 0KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://公网IP:9092KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092networks:- mynetnetworks:mynet:name: mynetdriver: bridge
4.1.3 启动kafka和zk容器
执行下面的命令
docker-compose up -d
4.1.4 检查容器是否启动成功
使用 docker ps命令检查
至此,Kafka已安装完成
4.2 测试验证
4.2.1 进入kafka容器
docker exec -ti kafka /bin/bash
4.2.2 创建一个topic
执行下面的命令
kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test
4.2.3 测试发送消息
执行下面的命令打开发送消息的窗口,并发送几条消息
kafka-console-producer.sh --broker-list IP:9092 --topic test
执行下面的命令打开消费消息的窗口
kafka-console-consumer.sh --bootstrap-server IP:9092 --topic test
4.3 创建两个备用topic
4.3.1 提前创建两个备用topic
后文中测试将会用到
kafka-topics.sh --create --bootstrap-server IP:9092 --topic source
kafka-topics.sh --create --bootstrap-server IP:9092 --topic target
4.3.2 开启生产窗口和消费窗口
开启source的生产窗口
开启target 消费窗口
五、flink 接收kafka数据并写入kafka
5.1 前置准备
5.1.1 组件版本说明
本次代码涉及到的各技术组件版本如下:
-
JDK,17;
-
springboot,2.5.5;
-
flink版本,1.19.0;
-
mysql,8.2.23;
5.1.2 maven核心依赖
在pom中添加如下案例中使用的核心依赖组件
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.19.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>1.19.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>1.19.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>1.19.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>3.2.0-1.19</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.12</artifactId><version>1.19.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>1.19.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>3.1.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>1.19.0</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.30</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-to-slf4j</artifactId><version>2.14.0</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency>
5.2 需求说明
如下图所示,现在需要一个需求,多端应用将源数据推送到kafka第一个topic,flink接收topic的数据,经过处理之后,再将数据发送至kafka的另一个topic,后续其他的应用从第二个topic中接收并处理消息。
5.3 核心代码实现过程
按照Flink CDC的编码风格,主要分为下面几步:
-
获取执行环境;
-
设置source配置信息
-
source即后面flink核心逻辑要处理的数据来源,比如这里是来源于kafka的topic;
-
-
添加source,一般调用 addSource 方法;
-
在这里通常要对接收到的数据进行处理的过程;
-
-
添加自定义 Sink
-
Sink即将进行数据处理要做的事情,简单理解就是数据处理并写出的过程;
-
5.3.1 核心代码实现方式一
结合上述代码实现思路,参考下面的代码
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;import java.util.Properties;/*** 第一种写法*/
public class FlinkKafkaTest1 {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行数env.setParallelism(4);// 每10000毫秒进行一次checkpointenv.enableCheckpointing(10000);//env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);// 设置 Kafka 消费者属性Properties properties = new Properties();properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092");properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-consumer-group");properties.setProperty("key.deserializer", StringDeserializer.class.getName());properties.setProperty("value.deserializer", StringDeserializer.class.getName());// 创建Kafka消费者,将消费者添加到流FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("source", new SimpleStringSchema(), properties);//设置只读取最新数据consumer.setStartFromLatest();//添加数据源DataStreamSource<String> source = env.addSource(consumer);source.print();DataStream<String> mappedStream = source.map(new MapFunction<String, String>() {@Overridepublic String map(String value) throws Exception {//进行数据处理,比如这里将值转换为大写return value.toUpperCase();}});//创建一个Flink生产者,将处理过的数据发回去mappedStream.addSink(new FlinkKafkaProducer<String>("target", new SimpleStringSchema(), properties));env.execute("Flink Kafka Integration");}
}
5.3.2 效果测试
这段代码要从source这个topic中接收消息,经过处理之后,再将消息写到target的topic里面,运行这段代码,通过控制台可以看到出于就绪状态了
然后基于上述打开的两个终端窗口,在producer的窗口中发送一条消息,可以看到,经过程序的转换后,将大写的字符串输出到target中了。
5.3.3 核心代码实现方式二
使用当前这个版本的flink,也可以简化为下面这种写法
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;import java.util.Properties;/*** 另一种写法*/
public class FlinkKafkaTest2 {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行数env.setParallelism(4);// 每10000毫秒进行一次checkpointenv.enableCheckpointing(10000);//env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);KafkaSource<String> source = KafkaSource.<String>builder().setTopics("source").setGroupId("flink-consumer-group").setBootstrapServers("IP:9092").setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> dataStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka source");DataStream<String> mappedStream = dataStream.map(new MapFunction<String, String>() {@Overridepublic String map(String value) throws Exception {//进行数据治理 例如,将值转换为大写return value.toUpperCase();}});// 设置 Kafka 消费者属性Properties properties = new Properties();properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092");properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-consumer-group");properties.setProperty("key.deserializer", StringDeserializer.class.getName());properties.setProperty("value.deserializer", StringDeserializer.class.getName());//创建一个Flink生产者,将处理过的数据发回去mappedStream.addSink(new FlinkKafkaProducer<String>("target", new SimpleStringSchema(), properties));env.execute("Flink Kafka Integration");}
}
运行一下,效果是类似的,也能达到同样的目的
六、flink 接收kafka数据写入mysql
基于上面的需求,flink 从前一个topic中获取到的数据经过处理之后,可能像上面那样直接发送到另一个topic中,供其他应用使用,也可以在经过处理之后,进行数据入库或其他持久化的操作,比如存储到mysql,es等,于是就变成下面这样。
6.1 前置准备
提前创建一张数据表,用于后续Flink插入数据使用,建表sql如下:
CREATE TABLE `db_user` (`id` varchar(32) NOT NULL,`name` varchar(32) DEFAULT NULL,`age` int DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
6.2 代码实现过程
6.2.1 自定义实体类
与数据表对应,注意实现序列化接口
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;import java.io.Serializable;@Data
@AllArgsConstructor
@NoArgsConstructor
public class DbUser implements Serializable {private String id;private String name;private Integer age;}
6.2.2 添加自定义Sink函数
该函数的作用就是将flink中处理后的数据写入到mysql中
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;public class MySqlToDbSink extends RichSinkFunction<DbUser> {private Connection connection = null;private PreparedStatement ps = null;private String tableName = "db_user";@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);// 通过Druid获取数据库连接,准备写入数据库connection = DriverManager.getConnection("jdbc:mysql://IP:3306/db", "root", "123");// 插入数据库的语句 因为我们封装的pojo的类型为PojoType<com.lzl.flink.Web, fields = [area: String, createDate: Date, ip: String, operate: String, uuid: String, web: String]>String insertQuery = "INSERT INTO " + tableName + "(id,`name`,age) VALUES (?,?,?)" ;// 执行插入语句ps = connection.prepareStatement(insertQuery);}@Overridepublic void close() throws Exception {super.close();if(connection != null) {connection.close();}if (ps != null ) {ps.close();}}@Overridepublic void invoke(DbUser value, Context context) throws Exception {//组装数据,执行插入操作ps.setString(1, value.getId());ps.setString(2,value.getName());ps.setInt(3, value.getAge());ps.addBatch();int[] count = ps.executeBatch();System.out.println("成功写入MySQL数量:" + count.length);}
}
6.2.3 核心任务逻辑实现
参考下面的实现代码
package com.congge.flink.kafka;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;import java.util.Properties;/*** 另一种写法,结果数据写到mysql*/
public class FlinkKafkaTest3 {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行数env.setParallelism(4);// 每10000毫秒进行一次checkpointenv.enableCheckpointing(10000);//env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);KafkaSource<String> source = KafkaSource.<String>builder().setTopics("source").setGroupId("flink-consumer-group").setBootstrapServers("IP:9092").setStartingOffsets(OffsetsInitializer.latest()) //消费最新数据.setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> dataStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka source");DataStream<DbUser> mappedStream = dataStream.map(new MapFunction<String, DbUser>() {@Overridepublic DbUser map(String value) throws Exception {System.out.println("接收到kafka的消息:"+value);String[] messages = value.split(",");//包装为java bean输出DbUser dbUser = new DbUser(messages[0], messages[1], Integer.parseInt(messages[2]));return dbUser;}});// 设置 Kafka 消费者属性Properties properties = new Properties();properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "124.221.34.148:9092");properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-consumer-group");properties.setProperty("key.deserializer", StringDeserializer.class.getName());properties.setProperty("value.deserializer", StringDeserializer.class.getName());//创建一个Flink生产者,将处理过的数据发回去//mappedStream.addSink(new FlinkKafkaProducer<String>("target", new SimpleStringSchema(), properties));mappedStream.addSink(new MySqlToDbSink());env.execute("Flink Kafka Integration");}
}
6.2.4 效果测试
通过kafka生产者窗口写入一条数据,可以看到,经过程序的转换后成功将数据写入到上述的mysql表里面了
七、写在文末
本文通过理论结合实际案例,详细分享了Flink结合kafka在流失应用场景下数据处理的过程,在实际业务处理时,还需要结合实际的需求,合理的进行上下游的设计,以确保数据流通的最终结果是正确的,希望对看到的同学有用,本篇到此结束,感谢观看。