【大数据】Flink + Kafka 实现通用流式数据处理详解

目录

一、前言

二、流式数据处理场景介绍

2.1 流式数据处理概述

2.1.1 流式数据处理场景介绍

2.2 流式数据处理技术栈

2.2.1 数据采集

2.2.2 数据处理

2.2.3 数据存储

2.2.4 数据展示

2.3 流式数据处理场景面临的问题和挑战

三、通用的流式数据处理场景解决方案

3.1 基本流式处理架构

3.2 流处理与批处理结合

3.3 实时数据仓库

3.4 流式 ETL 架构

四、实验环境准备

4.1 kafka搭建过程

4.1.1 创建相关的目录

4.1.2 编写yaml文件

4.1.3 启动kafka和zk容器

4.1.4 检查容器是否启动成功

4.2 测试验证

4.2.1 进入kafka容器

4.2.2 创建一个topic

4.2.3 测试发送消息

4.3 创建两个备用topic

4.3.1 提前创建两个备用topic

4.3.2 开启生产窗口和消费窗口

五、flink 接收kafka数据并写入kafka

5.1 前置准备

5.1.1 组件版本说明

5.1.2 maven核心依赖

5.2 需求说明

5.3 核心代码实现过程

5.3.1 核心代码实现方式一

5.3.2 效果测试

5.3.3 核心代码实现方式二

六、flink 接收kafka数据写入mysql

6.1 前置准备

6.2 代码实现过程

6.2.1 自定义实体类

6.2.2 添加自定义Sink函数

6.2.3 核心任务逻辑实现

6.2.4 效果测试

七、写在文末


一、前言

在大数据场景中,Flink作为重要的流式处理框架,在架构运行中承载着重要的作用,与之配合使用的就是大家熟悉的高性能消息中间件kafka,两者的结合,就可以解决很多场景下的流式数据问题,本文将通过几个案例详细介绍一下。

二、流式数据处理场景介绍

2.1 流式数据处理概述

流式数据处理(Streaming Data Processing)指的是对连续不断的数据流进行实时处理的技术。这种处理方式适用于需要对大量数据进行快速反应和决策的场景。尤其在需要实时分析、决策支持和即时响应的应用场景中尤为重要。

2.1.1 流式数据处理场景介绍

下面详细介绍几种常见的流式数据处理场景:

  • 实时数据分析

    • 应用场景:实时股票价格分析、社交媒体趋势分析、在线广告效果监测等。

    • 需求:需要快速响应数据变化,提供即时的分析结果。

    • 处理流程:收集实时数据流,使用流处理框架(如 Apache Flink、Apache Kafka Streams)进行实时计算,并将结果发送给前端应用或数据可视化工具。

  • 实时交易监控

    • 应用场景:银行交易系统、信用卡欺诈检测、电子商务网站购物车行为监控等。

    • 需求:实时监控交易活动,及时发现异常行为。

    • 处理流程:实时收集交易数据,通过流处理框架进行模式匹配和规则引擎分析,一旦发现异常则立即产生警报。

  • 日志处理与监控

    • 应用场景:网站访问日志分析、服务器性能监控、应用错误跟踪等。

    • 需求:实时监控系统状态,及时发现并解决问题。

    • 处理流程:使用日志收集工具(如 Fluentd、Logstash)将日志数据实时传送到中央存储系统,并通过流处理框架进行实时分析,发现异常则触发警报。

  • 物联网(IoT)数据处理

    • 应用场景:智能家居设备监控、工业自动化生产线监控、智能交通系统等。

    • 需求:从大量传感器数据中提取有价值的信息,实现设备的远程控制或预防性维护。

    • 处理流程:传感器数据通过边缘计算设备进行初步处理后,上传至云端进行进一步分析。

  • 社交媒体分析

    • 应用场景:舆情分析、品牌声誉管理、社交网络影响力评估等。

    • 需求:实时分析社交媒体上的用户行为,了解公众情绪。

    • 处理流程:通过社交媒体 API 获取实时数据,使用自然语言处理技术进行文本分析,并将结果可视化展示。

  • 金融交易撮合

    • 应用场景:股票交易所、外汇交易平台等。

    • 需求:高速处理大量的交易订单,实现订单的快速匹配。

    • 处理流程:接收交易订单流,使用高性能撮合引擎(如开源的 ZeroMQ 或专有的交易引擎)进行订单匹配,并将结果返回给交易双方。

2.2 流式数据处理技术栈

流式数据处理场景中常用的技术栈涵盖了数据采集、处理、存储和展示等多个环节。下面是一些典型的技术和工具,它们在不同的流式数据处理场景中扮演着重要角色。

2.2.1 数据采集

顾名思义,即数据的来源,对于一个系统,或一个数据处理平台,一般来说,采集的数据来源可以有多种渠道,比如文件、日志、数据库、消息队列等等。

  • 消息队列 / 数据总线

    • Apache Kafka:广泛使用的发布订阅消息系统,支持高吞吐量的数据流处理。

    • RabbitMQ:AMQP 标准的消息代理,适用于较小规模的数据传输。

    • Amazon Simple Notification Service (SNS) / Simple Queue Service (SQS):AWS 提供的消息传递服务。

  • 日志收集工具

    • Fluentd:轻量级的统一日志收集工具,支持多种数据源和输出插件。

    • Logstash:Elasticsearch 生态系统的一部分,用于数据的收集、转换和传输。

    • Filebeat:轻量级的日志转发器,同样属于 Elasticsearch 生态系统。

2.2.2 数据处理

采集了数据之后,一般需要对数据进行二次加工处理才能进一步使用,因此这一步在架构中具有非常重要的作用,具体来说,常用的数据处理框架包括:

  • 流处理框架

    • Apache Flink:支持事件驱动处理模型,提供低延迟和高吞吐量的流处理能力。

    • Apache Kafka Streams:基于 Kafka 的流处理库,适合集成到现有应用中。

    • Apache Spark Streaming:基于 Spark 的流处理模块,可以与 Spark 的批处理能力无缝集成。

    • Amazon Kinesis Analytics:AWS 提供的用于分析流数据的服务。

  • 规则引擎

    • Apache Nifi:用于数据流动管理和流处理的任务自动化工具。

    • Drools:基于规则的引擎,可用于实现业务逻辑规则。

2.2.3 数据存储

数据处理完成之后,一般有两种处理方式,一是数据持久化存储,比如存储到mysql,pg,或es,hbase等,也可以存储到文件系统,具体来说:

  • 分布式存储系统

    • Apache Cassandra:分布式 NoSQL 数据库,支持高可用性和分区容忍性。

    • Apache HBase:基于 Hadoop 的分布式列存储系统。

    • Amazon DynamoDB:AWS 提供的键值和文档存储服务。

  • 关系型数据库

    • MySQL / PostgreSQL:传统的关系型数据库管理系统,可用于存储流处理后的结果数据。

  • 非关系数据库

    • es,便于后续数据检索和分布式存储;

    • mongodb,文档型数据库;

2.2.4 数据展示

对于很多大数据项目来说,经过处理之后的数据需要通过业务系统进行前端展示,或者数据的多次消费,即借助可视化工具进行展示,具体来说:

  • 可视化工具

    • Grafana:流行的开源监控和可视化工具,支持多种数据源。

    • Kibana:Elasticsearch 生态系统中的可视化工具,主要用于日志分析。

    • Tableau:商业化的数据可视化软件,支持丰富的数据源和复杂的报表制作。

  • 数据仓库

    • Apache Hive:基于 Hadoop 的数据仓库工具,提供 SQL 查询功能。

    • Amazon Redshift:AWS 的完全托管的 PB 级数据仓库服务。

这些技术和工具可以根据具体的业务需求和技术背景进行组合使用,构建出适合特定场景的流式数据处理系统。在实际应用中,还需要考虑系统的可扩展性、容错性、安全性和性能等因素。

2.3 流式数据处理场景面临的问题和挑战

流式数据处理场景给架构设计带来了充分的灵活性,能够解决很多实际问题,同时也面临着一系列挑战,这些问题涉及到技术实现、数据管理、性能优化等多个方面。具体来说:

  • 数据量大

    • 问题描述:流式数据处理通常涉及大量的数据,这些数据可能是持续不断的,且每秒产生的数据量可能是巨大的。

    • 解决方案:采用分布式的处理框架(如 Apache Flink、Apache Kafka Streams、Spark Streaming),利用集群的计算能力和内存资源来处理大规模数据。

  • 实时性要求高

    • 问题描述:很多流式数据处理场景要求系统能够以毫秒级甚至更低的延迟来处理数据。

    • 解决方案:优化数据处理逻辑,减少不必要的数据移动;使用低延迟的消息队列(如 Kafka);采用高效的序列化协议(如 Protocol Buffers)。

  • 数据一致性问题

    • 问题描述:在处理数据流时,需要保证数据处理的一致性,尤其是在分布式环境中。

    • 解决方案:使用支持事务的流处理框架,如 Flink 的 Exactly Once 语义;确保数据存储层支持 ACID 属性。

  • 容错性问题

    • 问题描述:系统需要能够在出现故障时继续正常运行,并且能够恢复到正确状态。

    • 解决方案:利用流处理框架的容错机制,如 Flink 的 Checkpointing 和 Savepoint;设计健壮的备份和恢复策略。

  • 数据质量问题

    • 问题描述:流式数据可能存在不完整、错误或格式不一致等问题。

    • 解决方案:在数据进入处理流水线之前进行数据清洗;使用数据质量工具(如 Apache Nifi)来检查和纠正数据错误。

  • 系统扩展性问题

    • 问题描述:随着数据量的增长,系统需要能够水平扩展以应对更多的负载。

    • 解决方案:设计可扩展的架构,使用微服务架构来分离关注点;利用云计算平台的弹性伸缩能力。

  • 数据安全问题

    • 问题描述:流式数据处理涉及到敏感数据,必须确保数据的安全性和隐私保护。

    • 解决方案:加密传输通道;对敏感数据进行脱敏处理;遵循数据保护法规(如 GDPR)。

  • 运维复杂度

    • 问题描述:流式数据处理系统的运维相对传统批处理系统更加复杂。

    • 解决方案:采用容器化技术(如 Docker、Kubernetes)简化部署和管理;使用 DevOps 工具(如 Jenkins、GitLab CI/CD)自动化运维过程。

  • 数据集成

    • 问题描述:流式数据处理需要与多种数据源集成,包括数据库、文件系统、外部API等。

    • 解决方案:使用统一的数据接入层(如 Apache NiFi、Apache Camel);设计良好的API接口以支持异构系统的交互。

面对这些问题,开发人员和运维人员需要综合运用多种技术和策略来构建稳定、高效、安全的流式数据处理系统。

三、通用的流式数据处理场景解决方案

flink + kafka可以说能够解决大部分场景下关于流式数据处理的问题,下面结合实践经验,列举几种常见的处理方案。

3.1 基本流式处理架构

适用于需要实时处理大量数据流的应用场景,如实时监控、日志分析、实时告警等。如下图所示:

方案说明:

  • Kafka Brokers:接收来自不同数据源的数据,如日志、传感器数据等。

  • Flink Cluster:消费 Kafka 中的数据,进行实时处理(如聚合、过滤、窗口操作等)。

  • Various Sinks:将处理后的数据写入到不同的存储系统,如 Elasticsearch、HDFS、数据库等。

3.2 流处理与批处理结合

适用于需要同时处理实时数据流和历史数据批处理的应用场景,如实时报表生成与历史数据分析相结合。如下图所示:

方案说明:

  • Kafka Brokers:接收实时数据流;

  • Flink Cluster:消费实时数据流,进行实时处理;

  • Batch Processing:可以使用 Flink 的批处理功能或者结合 Spark 进行批处理任务;

  • Storage Systems:存储处理后的数据,支持实时数据和历史数据的查询;

3.3 实时数据仓库

适用于需要实时数据仓库的应用场景,如实时报表生成、实时数据分析等。如下图所示:

方案说明:

  • Kafka Brokers:接收实时数据流;

  • Flink Cluster:消费实时数据流,进行实时处理,清洗和聚合数据;

  • Real-time Data Warehouse:存储处理后的实时数据,支持实时查询和分析;

  • Analytics Tools:使用 BI 工具或其他分析工具进行实时数据分析,生成报表或仪表盘;

3.4 流式 ETL 架构

适用于需要实时进行数据抽取、转换和加载(ETL)的应用场景,如实时数据迁移、实时数据同步等。如下图所示:

方案说明:

  • Kafka Brokers:接收来自不同数据源的数据。

  • Flink Cluster:消费 Kafka 中的数据,进行实时的 ETL 处理,如数据清洗、转换、加载等。

  • Target System:将处理后的数据加载到目标系统,如数据仓库、HDFS、数据库、S3 等。

四、实验环境准备

4.1 kafka搭建过程

4.1.1 创建相关的目录

mkdir -p /docker/kafka/
cd /docker/kafka/

4.1.2 编写yaml文件

vim docker-compose.yaml ,添加如下内容

vim docker-compose.yaml
version: '2.2.2'
services:zookeeper:image: zookeepercontainer_name: zookeeperports:- "2181:2181"networks:- mynetkafka:image: bitnami/kafkacontainer_name: kafkaports:- "9092:9092"environment:KAFKA_BROKER_ID: 0KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://公网IP:9092KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092networks:- mynetnetworks:mynet:name: mynetdriver: bridge

4.1.3 启动kafka和zk容器

执行下面的命令

docker-compose up -d

4.1.4 检查容器是否启动成功

使用 docker ps命令检查

至此,Kafka已安装完成

4.2 测试验证

4.2.1 进入kafka容器

docker exec -ti kafka /bin/bash

4.2.2 创建一个topic

执行下面的命令

kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test

4.2.3 测试发送消息

执行下面的命令打开发送消息的窗口,并发送几条消息

kafka-console-producer.sh --broker-list IP:9092 --topic test

执行下面的命令打开消费消息的窗口

kafka-console-consumer.sh --bootstrap-server IP:9092 --topic test

4.3 创建两个备用topic

4.3.1 提前创建两个备用topic

后文中测试将会用到

kafka-topics.sh --create --bootstrap-server IP:9092 --topic source
kafka-topics.sh --create --bootstrap-server IP:9092 --topic target

4.3.2 开启生产窗口和消费窗口

开启source的生产窗口

开启target 消费窗口

五、flink 接收kafka数据并写入kafka

5.1 前置准备

5.1.1 组件版本说明

本次代码涉及到的各技术组件版本如下:

  • JDK,17;

  • springboot,2.5.5;

  • flink版本,1.19.0;

  • mysql,8.2.23;

5.1.2 maven核心依赖

在pom中添加如下案例中使用的核心依赖组件

		<dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.19.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>1.19.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>1.19.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>1.19.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>3.2.0-1.19</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.12</artifactId><version>1.19.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>1.19.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>3.1.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>1.19.0</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.30</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-to-slf4j</artifactId><version>2.14.0</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency>

5.2 需求说明

如下图所示,现在需要一个需求,多端应用将源数据推送到kafka第一个topic,flink接收topic的数据,经过处理之后,再将数据发送至kafka的另一个topic,后续其他的应用从第二个topic中接收并处理消息。

5.3 核心代码实现过程

按照Flink CDC的编码风格,主要分为下面几步:

  • 获取执行环境;

  • 设置source配置信息

    • source即后面flink核心逻辑要处理的数据来源,比如这里是来源于kafka的topic;

  • 添加source,一般调用 addSource 方法;

    • 在这里通常要对接收到的数据进行处理的过程;

  • 添加自定义 Sink

    • Sink即将进行数据处理要做的事情,简单理解就是数据处理并写出的过程;

5.3.1 核心代码实现方式一

结合上述代码实现思路,参考下面的代码

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;import java.util.Properties;/*** 第一种写法*/
public class FlinkKafkaTest1 {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行数env.setParallelism(4);// 每10000毫秒进行一次checkpointenv.enableCheckpointing(10000);//env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);// 设置 Kafka 消费者属性Properties properties = new Properties();properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092");properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-consumer-group");properties.setProperty("key.deserializer", StringDeserializer.class.getName());properties.setProperty("value.deserializer", StringDeserializer.class.getName());// 创建Kafka消费者,将消费者添加到流FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("source", new SimpleStringSchema(), properties);//设置只读取最新数据consumer.setStartFromLatest();//添加数据源DataStreamSource<String> source = env.addSource(consumer);source.print();DataStream<String> mappedStream = source.map(new MapFunction<String, String>() {@Overridepublic String map(String value) throws Exception {//进行数据处理,比如这里将值转换为大写return value.toUpperCase();}});//创建一个Flink生产者,将处理过的数据发回去mappedStream.addSink(new FlinkKafkaProducer<String>("target", new SimpleStringSchema(), properties));env.execute("Flink Kafka Integration");}
}

5.3.2 效果测试

这段代码要从source这个topic中接收消息,经过处理之后,再将消息写到target的topic里面,运行这段代码,通过控制台可以看到出于就绪状态了

然后基于上述打开的两个终端窗口,在producer的窗口中发送一条消息,可以看到,经过程序的转换后,将大写的字符串输出到target中了。

5.3.3 核心代码实现方式二

使用当前这个版本的flink,也可以简化为下面这种写法

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;import java.util.Properties;/*** 另一种写法*/
public class FlinkKafkaTest2 {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行数env.setParallelism(4);// 每10000毫秒进行一次checkpointenv.enableCheckpointing(10000);//env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);KafkaSource<String> source = KafkaSource.<String>builder().setTopics("source").setGroupId("flink-consumer-group").setBootstrapServers("IP:9092").setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> dataStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka source");DataStream<String> mappedStream = dataStream.map(new MapFunction<String, String>() {@Overridepublic String map(String value) throws Exception {//进行数据治理 例如,将值转换为大写return value.toUpperCase();}});// 设置 Kafka 消费者属性Properties properties = new Properties();properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092");properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-consumer-group");properties.setProperty("key.deserializer", StringDeserializer.class.getName());properties.setProperty("value.deserializer", StringDeserializer.class.getName());//创建一个Flink生产者,将处理过的数据发回去mappedStream.addSink(new FlinkKafkaProducer<String>("target", new SimpleStringSchema(), properties));env.execute("Flink Kafka Integration");}
}

运行一下,效果是类似的,也能达到同样的目的

六、flink 接收kafka数据写入mysql

基于上面的需求,flink 从前一个topic中获取到的数据经过处理之后,可能像上面那样直接发送到另一个topic中,供其他应用使用,也可以在经过处理之后,进行数据入库或其他持久化的操作,比如存储到mysql,es等,于是就变成下面这样。

6.1 前置准备

提前创建一张数据表,用于后续Flink插入数据使用,建表sql如下:

CREATE TABLE `db_user` (`id` varchar(32) NOT NULL,`name` varchar(32) DEFAULT NULL,`age` int DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

6.2 代码实现过程

6.2.1 自定义实体类

与数据表对应,注意实现序列化接口

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;import java.io.Serializable;@Data
@AllArgsConstructor
@NoArgsConstructor
public class DbUser implements Serializable {private String id;private String name;private Integer age;}

6.2.2 添加自定义Sink函数

该函数的作用就是将flink中处理后的数据写入到mysql中

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;public class MySqlToDbSink extends RichSinkFunction<DbUser> {private Connection connection = null;private PreparedStatement ps = null;private String tableName = "db_user";@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);// 通过Druid获取数据库连接,准备写入数据库connection = DriverManager.getConnection("jdbc:mysql://IP:3306/db", "root", "123");// 插入数据库的语句   因为我们封装的pojo的类型为PojoType<com.lzl.flink.Web, fields = [area: String, createDate: Date, ip: String, operate: String, uuid: String, web: String]>String insertQuery = "INSERT INTO " +   tableName + "(id,`name`,age) VALUES (?,?,?)" ;// 执行插入语句ps = connection.prepareStatement(insertQuery);}@Overridepublic void close() throws Exception {super.close();if(connection != null) {connection.close();}if (ps != null ) {ps.close();}}@Overridepublic void invoke(DbUser value, Context context) throws Exception {//组装数据,执行插入操作ps.setString(1, value.getId());ps.setString(2,value.getName());ps.setInt(3, value.getAge());ps.addBatch();int[] count = ps.executeBatch();System.out.println("成功写入MySQL数量:" + count.length);}
}

6.2.3 核心任务逻辑实现

参考下面的实现代码

package com.congge.flink.kafka;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;import java.util.Properties;/*** 另一种写法,结果数据写到mysql*/
public class FlinkKafkaTest3 {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行数env.setParallelism(4);// 每10000毫秒进行一次checkpointenv.enableCheckpointing(10000);//env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);KafkaSource<String> source = KafkaSource.<String>builder().setTopics("source").setGroupId("flink-consumer-group").setBootstrapServers("IP:9092").setStartingOffsets(OffsetsInitializer.latest())   //消费最新数据.setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> dataStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka source");DataStream<DbUser> mappedStream = dataStream.map(new MapFunction<String, DbUser>() {@Overridepublic DbUser map(String value) throws Exception {System.out.println("接收到kafka的消息:"+value);String[] messages = value.split(",");//包装为java bean输出DbUser dbUser = new DbUser(messages[0], messages[1], Integer.parseInt(messages[2]));return dbUser;}});// 设置 Kafka 消费者属性Properties properties = new Properties();properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "124.221.34.148:9092");properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-consumer-group");properties.setProperty("key.deserializer", StringDeserializer.class.getName());properties.setProperty("value.deserializer", StringDeserializer.class.getName());//创建一个Flink生产者,将处理过的数据发回去//mappedStream.addSink(new FlinkKafkaProducer<String>("target", new SimpleStringSchema(), properties));mappedStream.addSink(new MySqlToDbSink());env.execute("Flink Kafka Integration");}
}

6.2.4 效果测试

通过kafka生产者窗口写入一条数据,可以看到,经过程序的转换后成功将数据写入到上述的mysql表里面了

七、写在文末

本文通过理论结合实际案例,详细分享了Flink结合kafka在流失应用场景下数据处理的过程,在实际业务处理时,还需要结合实际的需求,合理的进行上下游的设计,以确保数据流通的最终结果是正确的,希望对看到的同学有用,本篇到此结束,感谢观看。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/460200.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

精准测试在基金团队应用实践

以下为作者观点&#xff1a; 一、引言 精准测试是一套计算机测试辅助分析系统&#xff0c;精准测试的核心组件包含&#xff0c;软件覆盖率分析、用例和代码的双向追踪、智能回归测试用例选取、缺陷定位、测试用例聚类分析、测试用例自动生成系统&#xff0c;这些功能完整的构…

参与国家标准制定对企业发展有哪些好处?

1. 提升企业竞争力&#xff1a; • 技术优势凸显&#xff1a;参与标准制定的过程中&#xff0c;企业能将自身先进的技术和管理理念融入标准&#xff0c;这不仅是对企业技术实力的认可&#xff0c;也能使企业在行业中占据技术制高点。 • 质量优势强化&#xff1a;国家标准对产品…

滚柱导轨出现异常损坏的原因

滚柱导轨是一种精密的直线滚动导轨&#xff0c;具有较高的承载能力和较高的刚性&#xff0c;对反复动作、起动、停止往复运动频率较高情况下可减少整机重量和传动机构及动力成本。滚柱导轨可获得较高的灵敏度和高性能的平面直线运动&#xff0c;在重载或变载的情况下&#xff0…

开发了一个成人学位英语助考微信小程序

微信小程序名称&#xff1a;石榴英语 全称&#xff1a;石榴英语真题助手 功能定位 北京成人学士学位英语辅助学习工具&#xff0c;包含记高频单词&#xff0c;高频词组&#xff0c;专项练习&#xff0c;模拟考试等功能。 开发背景 个人工作需要提高学习英文水平&#xff…

基于Matlab 火焰识别技术

Matlab 火焰识别技术 课题介绍 森林承担着为人类提供氧气以及回收二氧化碳等废弃气体的作用&#xff0c;森林保护显得尤其重要。但是每年由于火灾引起的事故不计其数&#xff0c;造成重大的损失。如果有一款监测软件&#xff0c;从硬件处获得的图像中监测是否有火焰&#xff…

同声传译器什么好用?哪款是你的会议利器推荐榜?

眨眼之间&#xff0c;冬日的脚步悄然而至&#xff0c;又可以踏上前往东北的旅程&#xff0c;去欣赏那银装素裹的绝美雪景。 在这样一个充满异域风情和语言挑战的旅途中&#xff0c;一款顶尖的同声传译器软件无疑是旅行者的最佳伴侣。 它能帮助我们跨越语言的鸿沟&#xff0c;…

jenkins自动化构建vue(web)项目并部署(项目实战)

安装nodejs插件 系统管理>插件管理 安装完成 配置node 新建任务 根据自己情况来设置是否需要丢弃旧的构建&#xff0c;我保存了5天和5次 cd /var/jenkins_home/workspace/hainan_road_web/SDGS-YHJC/sdgs-ui npm config set registry https://registry.npmmirror.com n…

(二 上)VB 2010 设计初步

目录 一、常用类应用 1.Console类控制台 2.窗体基本控件 二、面向对象程序设计 1.类和对象 2.对象的属性、方法、事件属 1.属性 2.方法 3.事件、事件过程 1.事件 2.事件过程 3.对象浏览器 三、.NET类库与命名空间 1.命名空间 常用命名空间 1.System命名空间 2.…

scala 权限

一.访问权限 idea实例 关于protected:

短视频矩阵系统源码开发分享/源代码部署/oem贴牌搭建分享

短视频矩阵软件开发综述 抖音短视频SEO矩阵系统源码是一款在高速数据处理和分析方面表现卓越的系统。它结合了深度学习、大数据分析和可视化等多种先进技术&#xff0c;极大地提升了信息处理的效率与准确性。 短视频矩阵软件系统的开发需要多方面的技术支持&#xff0c;涵盖了…

Linux——— 信号

文章目录 前言&#xff1a;引入信号生活中的例子信号概念见一见Linux中的信号 浅度理解信号信号处理&#xff08;浅谈&#xff09;:如何自定义捕捉 信号保存&#xff08;浅谈&#xff09; 信号产生系统调用产生异常产生&#xff1a;浅谈除0异常浅谈解引用野指针异常Core &&…

resources下lib文件中的jar包怎么添加到git

这里讲怎么处理这部分的问题&#xff1a; 1&#xff1a;java maven resource 目录下的jar无法被添加到git 2&#xff1a;使用git命令添加jar包时报错&#xff1a;The following paths are ignored by one of your .gitignore files: ***&#xff0c;use -if **** 上面都是相同…

快速入门kotlin编程(精简但全面版)

注&#xff1a;本文章为个人学习记录&#xff0c;如有错误&#xff0c;欢迎留言指正。 目录 1. 变量 1.1 变量声明 1.2 数据类型 2. 函数 3. 判断语句 3.1 if 3.2 when语句 4. 循环语句 4.1 while 4.2 for-in 5. 类和对象 5.1 类的创建和对象的初始化 5.2 继承 5…

部署Leanote 蚂蚁笔记

目录 选择leanote的原因环境参考下载部署安装mongodb恢复mongodb数据mongodb创建用户编辑app.conf启动编写快捷启动脚本&#xff0c;start.sh stop.shmongodb的备份与恢复编写脚本(备份leanote)leanote自带的备份与恢复 配置pdf导出 选择leanote的原因 Leanote 虽然最后一次更…

MATLAB——入门知识

内容源于b站清风数学建模 数学建模清风老师《MATLAB教程新手入门篇》https://www.bilibili.com/video/BV1dN4y1Q7Kt/ 目录 1.帮助文档 2.注释 3.特殊字符 4.设置MATLAB数值显示格式 4.1.临时更改 4.2.永久改 5.常用函数 6.易错点 1.帮助文档 doc sum help sum e…

Qt Modbus初识

项目场景&#xff1a; 项目中&#xff0c;需要用modbus与温控器通信&#xff0c;控制面板的加热温度&#xff0c;Qt框架下已经提供了modbus模块 初识Modbus Modbus 协议是一种通信协议&#xff0c;而且是一种开放协议&#xff0c;因此广泛地用于在工业自动化系统中实现设备之…

jenkins搭建及流水线配置

1.安装docker curl https://mirrors.aliyun.com/repo/Centos-7.repo >> CentOS-Base-Aliyun.repomv CentOS-Base-Aliyun.repo /etc/yum.repos.d/yum -y install yum-utils device-mapper-persistent-data lvm2yum-config-manager --add-repo http://mirrors.aliyun.com/…

CSP/信奥赛C++刷题训练:经典前缀和例题(4):洛谷P3662:Why Did the Cow Cross the Road II S

CSP/信奥赛C刷题训练&#xff1a;经典前缀和例题&#xff08;4&#xff09; [USACO17FEB] Why Did the Cow Cross the Road II S 题目描述 The long road through Farmer John’s farm has N N N crosswalks across it, conveniently numbered 1 … N 1 \ldots N 1…N ( 1 …

spring容器的启动流程

spring容器的启动流程是一个面试中比较难答的题目。这块内容比较复杂&#xff0c;回答的时候如果想到什么回答什么&#xff0c;很容易把面试官绕晕。因此比较好的回答方式就是&#xff0c;先理清一个大致的启动流程&#xff0c;再根据面试官的问题细说小点。 这里我们从Annota…

RHCE——DNS域名解析服务器、selinux、防火墙

1、DNS简介 DNS &#xff08; Domain Name System &#xff09;是互联网上的一项服务&#xff0c;它作为将域名和 IP 地址相互映射的一个分布式 数据库&#xff0c;能够使人更方便的访问互联网。 DNS 系统使用的是网络的查询&#xff0c;那么自然需要有监听的 port 。 DNS 使…