Kafka的重要组件,谈谈流处理引擎Kafka Stream

系列文章目录

上手第一关,手把手教你安装kafka与可视化工具kafka-eagle
Kafka是什么,以及如何使用SpringBoot对接Kafka
架构必备能力——kafka的选型对比及应用场景
Kafka存取原理与实现分析,打破面试难关
防止消息丢失与消息重复——Kafka可靠性分析及优化实践


在这里插入图片描述
我们前面介绍了很多kafka本身的特性与设计,也说了不少原理性的内容,本次我们稍微放松一下,来介绍一下 Kafka的一个重要组件—— Kafka Stream

📕作者简介:战斧,从事金融IT行业,有着多年一线开发、架构经验;爱好广泛,乐于分享,致力于创作更多高质量内容
📗本文收录于 kafka 专栏,有需要者,可直接订阅专栏实时获取更新
📘高质量专栏 云原生、RabbitMQ、Spring全家桶 等仍在更新,欢迎指导
📙Zookeeper Redis dubbo docker netty等诸多框架,以及架构与分布式专题即将上线,敬请期待


一、Kafka Stream是什么

1. 简介

Kafka Stream是 Apache Kafka 的一个子项目,它提供了一种简单而快速的方法来对数据流进行处理,是一种无状态的流处理引擎,可以消费Kafka中的数据并将其转换为输出流。Kafka Stream不像其他流处理工具,它是一个Java库,能够快速构建、部署和管理数据流处理任务。

在这里插入图片描述

我们在前面的文章中《Kafka是什么,以及如何使用SpringBoot对接Kafka》 初步接触了kafka的客户端kafka client,当时如果有眼尖的同学应该也注意到了,在使用Spring Initializr创建项目时,就看见了Kafka Stream的身影

在这里插入图片描述

那么Kafka Stream 与 我们当时接触的 Kafka client 有什么联系吗?其实它们的共同点在于他们都是与Kafka集成的API,从逻辑层次来说,Kafka Stream 是建立在 Kafka client 上的,我们在引用 Kafka Stream 时, 其会自带着 Kafka client 的包,如下:

在这里插入图片描述

那它们的作用到底哪不一样呢?具体来说,它们的不同之处可从这几个方面看:

  • 功能不同
    Kafka Stream是用于流处理任务的API,它提供了一种简单而快速的方法来对数据流进行处理。相反,Kafka Client主要用于生产和消费Kafka消息

  • 处理方式不同
    Kafka Client主要依赖于订阅和轮询来消费Kafka消息。而Kafka Stream依赖于数据流的处理,它会自动将Kafka消息转化为数据流,并实时处理这些数据

  • API调用方式不同
    在Kafka Stream中,您需要定义一个拓扑结构,描述如何将输入流转换成输出流,并执行转换。而在Kafka Client中,您需要调用API来发送和接收Kafka消息

  • 应用场景不同
    Kafka Stream适用于实时数据分析、实时预测等需要流处理的场景。而Kafka Client更适用于异步数据传输的场景,例如日志收集、事件处理等。

2. 特点

我们前面说过,流处理引擎做的人也是很多的,常见的比如说Apache FlinkApache Spark StreamingApache Storm 以及阿里参考 Apache Storm 开发的Jstorm。既然有如此多的可选项,为什么还有Kafka Stream这个东西呢?其实说来也简单,就是应用简单+功能丰富

在这里插入图片描述
总计来说,其具备以下特点:

  1. 无需额外征用集群资源
    在传统的流处理中,需要单独的集群进行数据处理,这就意味着需要额外的开销。而Kafka Stream是直接在Kafka集群上执行的,不会征用额外的资源。

  2. 易于使用
    Kafka Stream提供了简单易用的API,使得开发人员可以快速地进行流处理任务的开发。它还支持Java 8中的Lambda表达式,使得代码更加简洁。

  3. 支持丰富的转换操作
    Kafka Stream支持丰富的转换操作,包括过滤、映射、聚合等。这些操作可以被组合使用,以满足不同的处理需求。

二、流程与核心类

1. KStream 和 KTable 概念

我们上面简要介绍了下Kafka Stream的特点。但是,要想明白其流程并正确使用,我们还需要讲两个核心概念,也就是KStreamKTable

  • KStream
    KStream是一个持续不断的流数据记录,每个记录都是一个key-value对,可以被读取、写入和转换。通常,KStream用于处理实时数据流,我们可以直接从kafka集群中指定主题来获取源源不断的数据

  • KTable
    KTable顾名思义,可以看作是一张持久化的、可查询的、支持状态更新的表格。它通常是利用KStream的数据经过一系列转换和聚合操作生成的,KTable可以被读取和更新,但不能被删除。

KStream和KTable是互补的。KStream可以转换成KTable,也可以从KTable中获取值;KTable也可以转换成KStream,我们可以使用下图,看一下是如何针对数据流中,出现的单词进行计数并”落表“的:
在这里插入图片描述

当然,我们还有必要提及一下GlobalKTable,它是一种特殊的KTable,GlobalKTable通常用于处理比较静态的全局数据,例如维护一个全局的用户信息表,而且只在应用程序启动时从Kafka主题中加载所有数据,这意味着需要消耗较大的内存空间。

2. 常用逻辑与转换

我们上面说了KStreamKTable ,在代码里其实也对应了两个类,那这两个类都有些什么方法呢?最重要的,我们想知道,它们是如何互相转换的。

其实关于 KStream ,可能有些同学会想到JDK 里的 Stream ,因为确实很多方法是一致的,所以不用慌张。我们先来介绍下 KStream 的常用方法:

  • filter:过滤数据流中不符合条件的记录。
  • map:将每个记录转换为一个新的记录,可以改变记录的key和value。
  • flatMap:与map类似,可以将一个记录转换为多个新的记录。
  • mapValues:与map类似,但记录的键保留不变,只改变值
  • groupByKey:将记录按key进行分组,生成一个KGroupedStream对象,可以用于聚合操作。
  • reduce:对KGroupedStream对象进行聚合操作。
  • join:将两个KStream对象进行join操作,生成一个新的KStream对象。
  • windowed:对KStream对象进行窗口操作,可以使用时间窗口或大小窗口。
  • aggregate:将当前流中的记录聚合,并生成一个新的KTable。与reduce方法不同,aggregate方法不仅考虑当前记录,还考虑之前记录的聚合结果
  • to:将结果输出到输出主题中

我们举一个小代码段来看下这些方法的使用

KStream<String, String> input = ...;
// 使用filter方法过滤出包含"important"的值
KStream<String, String> filtered = input.filter((key, value) -> value.contains("important"))
// 使用mapValues方法将每个值的长度作为新值。
KStream<String, Integer> mapped = filtered.mapValues(value -> value.length());
// 使用groupBy方法将键值对按键分组,并使用count方法计算每个键出现的次数,将结果存储在KTable中
KTable<String, Integer> counted = mapped.groupBy((key, value) -> key).count(Materialized.as("counts"));
// 使用selectKey方法选取值中"-"前的部分作为新键
KStream<String, String> rekeyed = input.selectKey((key, value) -> value.split("-")[0]);
// 使用leftJoin方法将两个KStream进行左连接,即mapped流和rekeyed流进行连接,
// 连接的条件是两个流中的键相等。连接函数的定义是将两个整型值相加,并将结果作为连接后的流的值
KStream<String, Integer> joined = mapped.leftJoin(rekeyed, (value1, value2) -> value1 + value2);
// 使用groupByKey方法对键值对按键分组,并使用windowedBy方法将窗口大小设置为5分钟,
// 然后使用count方法计算每个键在此时间窗口中出现的次数,最后使用toStream方法将结果
// 转换为KStream类型并将时间窗口的起止时间设置为键,值设置为次数
KStream<String, Long> windowed = input .groupByKey().windowedBy(TimeWindows.of(Duration.ofMinutes(5))).count().toStream().map((key, value) -> new KeyValue<>(key.key(), value));
// 将结果输出到输出主题中
windowed.to("output-topic");

而关于KTable,也有一些常用方法,如下:

  • filter:根据指定的谓词过滤记录,并返回一个新的KTable。谓词是一个接受key和value作为参数的函数,如果返回true,则保留该记录,否则过滤掉。
  • mapValues:对KTable中的每个value执行指定的转换函数,并返回一个新的KTable。
  • groupBy:根据指定的key进行分组,并返回一个KGroupedTable对象,该对象用于执行各种聚合操作。
  • join:将当前KTable与另一个KTable或KStream进行连接,并返回一个新的KTable。
  • toStream:将KTable转换为KStream,并返回一个新的KStream对象。

我们也写一小段代码用于演示:

// 从输入流中建立一个KTable
StreamsBuilder builder = new StreamsBuilder();
KTable<String, String> myKTable = builder.table("input-topic", Materialized.as("ktable-store"));// 1. 执行一些过滤操作,保留包含特定前缀的键
KTable<String, String> filteredKTable = myKTable.filter((key, value) -> key.startsWith("prefix"));// 2. 执行mapValues操作,将每个键值对中的value进行大写转换
KTable<String, String> uppercasedKTable = myKTable.mapValues(e -> e.toUpperCase());// 3. 执行groupBy操作,将键值对按照key的前缀分组
KTable<String, String> groupedKTable = myKTable.groupBy((key, value) -> KeyValue.pair(key.split("_")[0], value)).reduce((aggValue, newValue) -> aggValue + "_" + newValue);// 4. 执行leftJoin操作,将两个KTable进行连接,如果某一个KTable中没有该key,则用null进行填充
KTable<String, String> leftJoinedKTable = myKTable.leftJoin(filteredKTable,(value1, value2) -> value1 + "-" + value2);// 5. 执行toStream操作,将KTable转换为KStream类型
myKTable.toStream().map((key, value) -> KeyValue.pair(key, value.toUpperCase()));

当然,关于上述哪些方法,我们也可以用一张图来概括它们之间的转换关系,如下图,其中的 KGroupedStream 和 KGroupedTable 其实就是KStream 和 KTable 进行聚合操作后的产物
在这里插入图片描述

三、使用场景与Demo

1. 实时数据分析

Kafka Stream可以将实时到达的数据进行处理,以便进行实时数据分析。在这种情况下,Kafka Stream通常会将数据转换为一些有用的信息,以便于更好的理解数据,我们可以举一个简单的示例demo

假设我们有一个数据流,其中包含电影评分信息和电影相关信息。我们的任务是计算出每个电影的平均评分。

首先,我们需要定义输入数据流所需的数据结构。假设我们的数据结构如下:

@Data
public class MovieRating {private String movieId;private float rating;
}@Data
public class Movie {private String movieId;private String title;
}

接下来,我们需要编写Kafka流应用程序。我们可以将其分为三个步骤:

1.从Kafka主题读取电影评分和电影相关信息。
2.以电影ID为键,将电影评分聚合到一个窗口中,并计算平均值。
3.将结果写入新的Kafka主题。

public static void main(String[] args) throws Exception {Properties props = new Properties();props.put(StreamsConfig.APPLICATION_ID_CONFIG, "movie-ratings-app");props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);final StreamsBuilder builder = new StreamsBuilder();// 步骤1:从kafka主题中读取电影信息及评分// 我们假设主题包含Avro编码的数据KStream<String, MovieRating> ratings = builder.stream("movie-ratings");KStream<String, Movie> movies = builder.stream("movies");// 步骤2: 按电影ID聚合评分并计算平均评分.KTable<Windowed<String>, Double> averageRatings = ratings.groupBy((key, value) -> value.getMovieId()).windowedBy(TimeWindows.of(Duration.ofMinutes(10))).aggregate(() -> new RatingAggregate(0.0, 0L),(key, value, aggregate) -> new RatingAggregate(aggregate.getSum() + value.getRating(), aggregate.getCount() + 1),Materialized.with(Serdes.String(), new RatingAggregateSerde())).mapValues((aggregate) -> aggregate.getSum() / aggregate.getCount()).toStream().groupByKey().windowedBy(TimeWindows.of(Duration.ofMinutes(10))).reduce((value1, value2) -> Math.max(value1, value2),Materialized.with(Serdes.String(), Serdes.Double())).toStream().map((key, value) -> new KeyValue<>(key.key(), value));// 步骤3: 将结果写入一个新的kafka主题.averageRatings.to("average-ratings");final KafkaStreams streams = new KafkaStreams(builder.build(), props);streams.start();
}// 用于聚合评分的辅助类
public static class RatingAggregate {private double sum;private long count;public RatingAggregate(double sum, long count) {this.sum = sum;this.count = count;}public double getSum() {return sum;}public long getCount() {return count;}
}// 序列化与反序列化.
public static class RatingAggregateSerde extends Serdes.WrapperSerde<RatingAggregate> {public RatingAggregateSerde() {super(new JsonSerializer<>(), new JsonDeserializer<>(RatingAggregate.class));}
}

在上面的代码中,我们使用Serdes.String()和SpecificAvroSerde来序列化和反序列化字符串和Avro-encoded对象。我们使用TimeWindows.of(Duration.ofMinutes(10))定义大小为10分钟的窗口。我们使用RatingAggregate类来辅助计算每个电影的平均评分,RatingAggregateSerde类来序列化和反序列化RatingAggregate对象

2. 实时预测

Kafka Stream可以用于实时预测任务,例如在一些应用中,需要根据实时到达的数据来进行预测。Kafka Stream可以使用已有的模型,对实时数据进行预测,从而实现实时的推荐或预测等功能。

还是拿电影举例,我们经常可以看到电影票房的预测,我们可以以此写一个Demo

public class MovieProcessor {private static final String INPUT_TOPIC = "box-office-input";private static final String OUTPUT_TOPIC = "box-office-output";public static void main(String[] args) {// 创建 Kafka Streams 配置Properties props = new Properties();props.put(StreamsConfig.APPLICATION_ID_CONFIG, "box-office-predictor");props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());// 创建 Kafka StreamsStreamsBuilder builder = new StreamsBuilder();KStream<String, String> inputStream = builder.stream(INPUT_TOPIC);// 将上映日期转换为毫秒数,并计算出预测票房KTable<Long, Double> boxOfficePrediction = inputStream.mapValues(new ValueMapper<String, Double>() {@Overridepublic Double apply(String value) {String[] fields = value.split(",");long releaseDateMillis = LocalDate.parse(fields[1]).toEpochDay() * 24 * 60 * 60 * 1000;int runtime = Integer.parseInt(fields[2]);double boxOffice = Double.parseDouble(fields[3]);double prediction = boxOffice / (runtime * 60 * 1000.0) * (releaseDateMillis - System.currentTimeMillis());return prediction > 0 ? prediction : 0;}}).groupBy(new KeyValueMapper<String, Double, Long>() {@Overridepublic Long apply(String key, Double value) {return 1L;}}).reduce(new Reducer<Double>() {@Overridepublic Double apply(Double value1, Double value2) {return value1 + value2;}}).mapValues(new ValueMapper<Double, Double>() {@Overridepublic Double apply(Double value) {return value / (24 * 60 * 60 * 1000.0);}});// 将预测结果发送到 Kafka Topic 中boxOfficePrediction.toStream().to("prediction");// 启动 Kafka StreamsKafkaStreams streams = new KafkaStreams(builder.build(), props);streams.start();}
}

四、总结

今天我们学了一些关于Kafka Stream的内容太,知道了它是一种流处理引擎,可以消费Kafka中的数据,进行处理后,还能其转换为输出流。它特点在于不需要额外征用集群资源、易于使用、支持丰富的转换操作。使用场景包括实时数据分析、实时预测等。但其实Kafka Stream的内容还是很多的,我们将在后面的学习中继续讲解

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/197071.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CentOS挂载:解锁文件系统的力量

目录 引言1 挂载简介2 挂载本地分区3 挂载网络共享文件系统4 使用CIFS挂载结论 引言 在CentOS&#xff08;一种基于Linux的操作系统&#xff09;上挂载文件系统是一项常见而重要的任务&#xff0c;无论是将新的磁盘驱动器添加到系统&#xff0c;还是挂载网络共享资源&#xff…

【flutter】使用getx下的GetMaterialApp创建路由和使用时间选择器国际化问题

GetMaterialApp是啥 网上解释说是 MaterialApp Getx properties GetMaterialApp 问题 在使用showDateRangePicker组件的时候&#xff0c; 一直报错 No MaterialLocalizations found 我就愁思是不是GetMaterialApp跟MaterialApp方法不一样的问题&#xff0c;结果不是&#…

城市网吧视频智能监控方案,实现视频远程集中监控

网吧环境较为复杂&#xff0c;电脑设备众多且人员流动性大&#xff0c;极易发生人员或消防事故&#xff0c;亟需改变&#xff0c;TSINGSEE青犀AI智能网吧视频监管方案可以帮助实现对网吧环境和用户活动的实时监控和管理。 1、视频监控系统 在网吧内部布置高清摄像头&#xff0…

【C++入门到精通】右值引用 | 完美转发 C++11 [ C++入门 ]

阅读导航 引言一、左值引用和右值引用1. 什么是左值&#xff1f;什么是左值引用&#xff1f;2. 什么是右值&#xff1f;什么是右值引用&#xff1f;3. move( )函数 二、左值引用与右值引用比较三、右值引用使用场景和意义四、完美转发std::forward 函数完美转发实际中的使用场景…

SSM项目初始化流程与操作概念解释-SpringBoot简化版

文章目录 1.引入概念2.导入依赖3.项目配置4.依照SpringMVC框架构建项目 1.引入概念 例如某一个XX系统&#xff0c;该系统存在前台页面&#xff08;给用户直观看或使用&#xff09;&#xff0c;和后台页面&#xff08;给管理人员调整数据和权限&#xff09;。 这二个页面都通过…

机器学习笔记 - Ocr识别中的文本检测EAST网络概述

一、文本检测 文本检测简单来说就是找到图像中可以出现文本的区域。例如,请参见下图,其中在检测到的文本周围绘制了绿色边框。 在进行文本检测时,你可能会遇到两种情况 具有结构化文本的图像:这是指具有干净/均匀背景和常规字体的图像。文本大多密集,行结构正确,…

uniapp中使用render.js进行openers、arcgis等地图操作

uniapp中使用render.js进行openers、arcgis等地图操作 一、为啥需要render.js render.js主要作用于APP上&#xff0c;因为Uniapp本质为vuejshtml进行开发&#xff0c;整个技术栈还是H5&#xff0c;对DOM元素进行操作。而APP中没用Dom元素这个概念。因此利用render.js这个视图层…

GEM5 Garnet DVFS / NoC DVFS教程:ruby.clk_domain ruby.voltage_domain

简介 gem5中的 NoC部分是Garnet实现的&#xff0c;但是Garnet并没有单独的时钟域&#xff0c;而是保持ruby一致&#xff0c;要做noc的DVFS&#xff0c;便是要改ruby的 改电压 #这里只是生成一个随便变量名&#xff0c;存一下值。改是和频率一起的 userssaved_voltage_domain…

C++二分查找算法:查找和最小的 K 对数字

相关专题 二分查找相关题目 题目 给定两个以 非递减顺序排列 的整数数组 nums1 和 nums2 , 以及一个整数 k 。 定义一对值 (u,v)&#xff0c;其中第一个元素来自 nums1&#xff0c;第二个元素来自 nums2 。 请找到和最小的 k 个数对 (u1,v1), (u2,v2) … (uk,vk) 。 示例 1:…

采用Nexus搭建Maven私服

采用Nexus搭建Maven私服 1.采用docker安装 1.创建数据目录挂载的目录&#xff1a; /usr/local/springcloud_1113/nexus3/nexus-data2.查询并拉取镜像docker search nexus3docker pull sonatype/nexus33.查看拉取的镜像docker images4.创建docker容器&#xff1a;可能出现启动…

【数据结构】树与二叉树(十八):树的存储结构——Father链接结构、儿子链表链接结构

文章目录 5.1 树的基本概念5.1.1 树的定义5.1.2 森林的定义5.1.3 树的术语 5.2 二叉树5.3 树5.3.1 树的存储结构1. 理论基础2. 典型实例 5.3.2 Father链接结构a. 定义树节点结构b. 创建新节点c. 主函数d. 代码整合 5.3.3 儿子链表链接结构a. 定义树节点结构b. 创建新节点c. 添加…

【IDEA 使用easyAPI、easyYapi、Apifox helper等插件时,导出接口文档缺少代码字段注释的相关内容、校验规则的解决方法】

问题 IDEA 使用easyAPI、easyYapi、Apifox helper等插件时&#xff0c;导出的接口文档上面&#xff0c;缺少我们代码里的注解字段&#xff0c;如我们规定了NOTNULL、字段描述等。 问题链接&#xff0c;几个月之前碰到过&#xff0c;并提问了&#xff0c;到现在解决&#xff0c…

Docker之DockerFile解析

DockerFile解析 是什么 Dockerfile是用来构建Docker镜像的文本文件&#xff0c;是由一条条构建镜像所需的指令和参数构成的脚本。 概述 官网 https://docs.docker.com/engine/reference/builder/ 构建三步骤 编写Dockerfile文件 docker build命令构建镜像 docker run依镜像运…

元宇宙3D云展厅应用到汽车销售的方案及特点

为了紧紧抓住年轻消费者的需求&#xff0c;汽车销售行业也正在经历一场深刻的变革。在这个变革的前沿&#xff0c;元宇宙3D汽车展厅作为一项全新技术闪亮登场&#xff0c;打破了传统汽车销售模式的限制&#xff0c;为消费者带来了前所未有的购车体验。 元宇宙3D汽车展厅采用了尖…

进程概述

文章目录 计算机算机组成因特尔CPU型号摩尔定律衡量CPU的指标指令&#xff08;Instruction)操作系统&#xff08;Operating System&#xff09;虚拟地址空间&#xff08;Virtual Address Space&#xff09;进程(Process/task)进程管理(PCB - 进程控制块)进程控制块&#xff08;…

基于SSM的古董拍卖系统

基于SSM的古董拍卖系统的设计与实现~ 开发语言&#xff1a;Java数据库&#xff1a;MySQL技术&#xff1a;SpringMyBatisSpringMVC工具&#xff1a;IDEA/Ecilpse、Navicat、Maven 系统展示 主页 拍卖界面 管理员界面 摘要 古董拍卖系统是一个基于SSM框架&#xff08;Spring …

JRC Monthly Water History, v1.4数据集

简介&#xff1a; JRC Monthly Water History产品&#xff0c;是利用1984至2020年获取的landsat5、landsat7和landsat8的卫星影像&#xff0c;生成的一套30米分辨率的全球地表水覆盖的月度地表水监测地图集。该数据集共有442景数据&#xff0c;包含1984年3月至2020年12月间的月…

Kubernetes Dashboard部署ImagePullBackOff问题处理

通常&#xff0c;出现ImagePullBackOff问题是由于Kubernetes集群无法拉取所需的镜像导致的。解决这个问题的方法通常包括以下步骤&#xff1a; 1. 检查Pod的描述信息&#xff1a; kubectl describe pod/[pod名称] --namespacekubernetes-dashboard 查看Events部分是否有关于…

项目踩坑之面试遇到的问题及解决

第一点&#xff1a; 问题 遇到的问题之&#xff1a;在实现后台管理端-账户操作的时候&#xff0c;添加员工的时候如果重复添加同一个员工&#xff0c;会触发一个数据库唯一约束异常&#xff0c;但客户端无法清晰的理解这个错误&#xff0c;所以我们就对新增员工的代码进行try…

4.1 Windows驱动开发:内核中进程与句柄互转

在内核开发中&#xff0c;经常需要进行进程和句柄之间的互相转换。进程通常由一个唯一的进程标识符&#xff08;PID&#xff09;来标识&#xff0c;而句柄是指对内核对象的引用。在Windows内核中&#xff0c;EProcess结构表示一个进程&#xff0c;而HANDLE是一个句柄。 为了实…