【Kafka】Kafka Stream简单使用

一、实时流式计算

1. 概念

一般流式计算会与批量计算相比较。在流式计算模型中,输入是持续的,可以认为在时间上是无界的,也就意味着,永远拿不到全量数据去做计算。同时,计算结果是持续输出的,也即计算结果在时间上也是无界的。流式计算一般对实时性要求较高,同时一般是先定义目标计算,然后数据到来之后将计算逻辑应用于数据。同时为了提高计算效率,往往尽可能采用增量计算代替全量计算
在这里插入图片描述
流式计算就相当于上图的右侧扶梯,是可以源源不断的产生数据,源源不断的接收数据,没有边界。

2. 应用场景

  • 日志分析: 网站的用户访问日志进行实时的分析,计算访问量,用户画像,留存率等等,实时的进行数据分析,帮助企业进行决策
  • 大屏看板统计: 可以实时的查看网站注册数量,订单数量,购买数量,金额等。
  • 公交实时数据: 可以随时更新公交车方位,计算多久到达站牌等
  • 实时文章分值计算

比如应用较广的 头条类文章的分值计算,通过用户的行为实时文章的分值,分值越高就越被推荐

3. Kafka Stream

近些年来,开源流处理领域涌现出了很多优秀框架。光是在 Apache 基金会孵化的项目,关于流处理的大数据框架就有十几个之多,比如早期的 Apache SamzaApache Storm,以及这些年火爆的 Spark 以及 Flink 等。

3.1 Kafka Streams的特点

  • Kafka Stream提供了一个非常简单而轻量的Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署
  • 除了Kafka外,无任何外部依赖
  • 充分利用Kafka分区机制实现水平扩展顺序性保证
  • 通过可容错的state store实现高效的状态操作(如windowed joinaggregation
  • 支持正好一次处理语义
  • 提供记录级的处理能力,从而实现毫秒级的低延迟
  • 支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records)
  • 同时提供底层的处理原语Processor(类似于Storm的spout和bolt),以及高层抽象的DSL(类似于Spark的map/group/reduce)

在这里插入图片描述

3.2 关键概念

一个最简单的Streaming的结构如下图所示:
在这里插入图片描述

从一个Topic中读取到数据,经过一些处理操作之后,写入到另一个Topic中,这就是一个最简单的Streaming流式计算。其中,Source Topic中的数据会源源不断的产生新数据。
那么,我们再在上面的结构之上扩展一下,假设定义了多个Source TopicDestination Topic,那就构成如下图所示的较为复杂的拓扑结构:
在这里插入图片描述

  • 源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器
  • Sink处理器:sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题
    在这里插入图片描述
    Kafka Streams被认为是开发实时应用程序的最简单方法。它是一个Kafka的客户端API库,编写简单的java就可以实现流式处理。

3.3 KStream

KStream:数据结构类似于map,如下图,key-value键值对

在这里插入图片描述

KStream数据流(data stream),是一段顺序的,可以无限长,不断更新的数据集。
数据流中比较常记录的是事件,这些事件可以是一次鼠标点击(click),一次交易,或是传感器记录的位置数据。

KStream负责抽象的,就是数据流。与Kafka自身topic中的数据一样,类似日志,每一次操作都是向其中插入(insert)新数据。

二、测试kafkaStream

先看下简单的kafkaStreamKStream测试

需求分析:求单词个数(word count)
在这里插入图片描述

1. pom.xml引入依赖:

       <!-- kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><exclusions><exclusion><artifactId>connect-json</artifactId><groupId>org.apache.kafka</groupId></exclusion><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency>

2. 配置文件

server:port: 9991
spring:application:name: kafka-demokafka:bootstrap-servers: 192.168.200.130:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializercompression-type: lz4consumer:group-id: ${spring.application.name}-testkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

3. 编写生产者

ProducerQuickStart.java

package com.kafka.sample;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.*;import java.util.Properties;@Slf4j
public class ProducerQuickStart {public static void main(String[] args) {//1. kafka的配置信息Properties prop = new Properties();//kafka的链接信息prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");//配置重试次数prop.put(ProducerConfig.RETRIES_CONFIG, 5);//数据压缩prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"lz4");//ack配置  消息确认机制   默认ack=1,即只要集群首领节点收到消息,生产者就会收到一个来自服务器的成功响应
//        prop.put(ProducerConfig.ACKS_CONFIG,"all");消息key的序列化器prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");//消息value的序列化器prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");//2. 生产者对象KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);//封装发送的消息ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("itcast-topic-input", "key_001", "hello kafka");//3. 发送消息for (int i = 0; i < 5; i++) {producer.send(producerRecord);}//4. 关闭消息通道  必须关闭,否则消息发不出去producer.close();}
}

4 编写kafkaStream流式处理

KafkaStreamQuickStart.java

package com.kafka.sample;import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;/*** 流式处理*/
public class KafkaStreamQuickStart {public static void main(String[] args) {//kafka的配置信心Properties prop = new Properties();prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-quickstart");//stream 构建器StreamsBuilder streamsBuilder = new StreamsBuilder();//流式计算streamProcessor(streamsBuilder);//创建kafkaStream对象KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(),prop);//开启流式计算kafkaStreams.start();}/*** 流式计算* 消息的内容:hello kafka  hello itcast* @param streamsBuilder*/private static void streamProcessor(StreamsBuilder streamsBuilder) {//创建kstream对象,同时指定从那个topic中接收消息KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");/*** 处理消息的value*/stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {@Overridepublic Iterable<String> apply(String value) {return Arrays.asList(value.split(" "));}})//按照value进行聚合处理.groupBy((key,value)->value)//时间窗口.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))//统计单词的个数.count()//转换为kStream.toStream().map((key,value)->{System.out.println("key:"+key+",vlaue:"+value);return new KeyValue<>(key.key().toString(),value.toString());})//发送消息.to("itcast-topic-out");}
}

5. 编写消费者

ConsumerQuickStart.java

package com.kafka.sample;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class ConsumerQuickStart {public static void main(String[] args) {//1. 添加kafka的配置信息Properties properties = new Properties();// 配置链接信息properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");//配置消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group-2");//配置消息的反序列化器properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//2. 消费者对象KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);//3. 订阅主题consumer.subscribe(Collections.singletonList("itcast-topic-out"));//当前线程一直监听消息while(true){//4. 消费者拉取消息: 每秒拉取一次ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println(record.key());System.out.println(record.value());}}}
}

启动项目:

  1. 在远端(192.168.200.130:9092)启动docker中的kafka容器
  2. 启动消费者ConsumerQuickStartmain函数
  3. 启动kafkastreammian函数
  4. 启动生产者ProducerQuickStartmain函数

5. 控制台打印结果:

在这里插入图片描述
在这里插入图片描述

整个过程:
生产者向kafka中发送了5条“hello kafka”消息,topic均为itcast-topic-input。kafkastream监听这个topic,每10秒进行一次流式处理,将“hello kakfa”字符串分割,并统计每个单词出现的次数。然后转为kstream,发送消息到kafka中的topic=itcast-topic-out”。消费者监听“itcast-topic-out”的topic,消费消息。

三、Springboot整合kafkaStream

1. 配置文件新增

application.yml

server:port: 9991
spring:application:name: kafka-demokafka:bootstrap-servers: 192.168.200.130:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializercompression-type: lz4consumer:group-id: ${spring.application.name}-testkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# kafkaStream新增以下配置
kafka:hosts: 192.168.200.130:9092group: ${spring.application.name}

2. 在微服务中新增配置类

KafkaStreamConfig.java

package com.kafka.config;import lombok.Getter;
import lombok.Setter;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;import java.util.HashMap;
import java.util.Map;/*** 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数*/@Setter
@Getter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;private String hosts;private String group;@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {Map<String, Object> props = new HashMap<>();props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");props.put(StreamsConfig.RETRIES_CONFIG, 10);props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());return new KafkaStreamsConfiguration(props);}
}

3. 使用kafkaStream监听消息

KafkaStreamHelloListener.java

package com.kafka.stream;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.time.Duration;
import java.util.Arrays;@Configuration
@Slf4j
public class KafkaStreamHelloListener {@Beanpublic KStream<String,String> kStream(StreamsBuilder streamsBuilder){//创建kstream对象,同时指定从那个topic中接收消息KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {@Overridepublic Iterable<String> apply(String value) {return Arrays.asList(value.split(" "));}})//根据value进行聚合分组.groupBy((key,value)->value)//聚合计算时间间隔.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))//求单词的个数.count().toStream()//处理后的结果转换为string字符串.map((key,value)->{System.out.println("key:"+key+",value:"+value);return new KeyValue<>(key.key().toString(),value.toString());})//发送消息.to("itcast-topic-out");return stream;}
}

测试:

启动springboot应用程序,运行之前的ProducerQuickStart来生产消息,约10秒后,看到kafkaStream消息的处理结果
在这里插入图片描述

说明kafkaStream接收到消息并将多条消息进行了统一处理。

参考(推荐阅读):

  1. https://cloud.tencent.com/developer/article/2100664
  2. https://www.cnblogs.com/tree1123/p/11457851.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/116600.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Qt应用开发(基础篇)——消息对话框 QMessageBox

一、前言 QMessageBox类继承于QDialog&#xff0c;是一个模式对话框&#xff0c;常用于通知用户或向用户提出问题并接收答案。 对话框QDialog QMessageBox消息框主要由四部分组成&#xff0c;一个主要文本text&#xff0c;用于提醒用户注意某种情况;一个信息文本informativeTex…

深度解析 PostgreSQL Protocol v3.0(一)

引言 PostgreSQL 使用基于消息的协议在前端&#xff08;也可以称为客户端&#xff09;和后端&#xff08;也可以称为服务器&#xff09;之间进行通信。该协议通过 TCP/IP 和 Unix 域套接字支持。 《深度解析 PostgreSQL Protocol v3.0》系列技术贴&#xff0c;将带大家深度了…

视频剪辑音效处理软件有哪些?视频剪辑软件那个好用

音效是视频剪辑的重要部分&#xff0c;能起到画龙点睛的作用。在短视频平台中&#xff0c;一段出彩的音效能将原本平平无奇的视频变得生动有趣。那么&#xff0c;视频剪辑音效处理软件有哪些&#xff1f;本文会给大家介绍好用的音效处理软件&#xff0c;同时也会介绍视频剪辑音…

【Pandas 入门-5】Pandas 画图

Pandas 画图 除了结合 matplotlib 与 seaborn 画图外&#xff0c;Pandas 也有自己的画图函数plot&#xff0c;它的语法一般为&#xff1a; DataFrame.plot(xNone,yNone, kindline,subplotsFalse, titleNone)x横坐标数据y纵坐标数据kind默认是线图&#xff0c;还可以是‘bar’…

深入理解 JVM 之——动手编译 JDK

更好的阅读体验 \huge{\color{red}{更好的阅读体验}} 更好的阅读体验 本篇为深入理解 Java 虚拟机第一章的实战内容&#xff0c;推荐在学习前先掌握基础的 Linux 操作、编译原理基础以及扎实的 C/C 功底。 该系列的 GitHub 仓库&#xff1a;https://github.com/Doge2077/lear…

C语言 - 结构体、结构体数组、结构体指针和结构体嵌套

结构体的意义 问题&#xff1a;学籍管理需要每个学生的下列数据&#xff1a;学号、姓名、性别、年龄、分数&#xff0c;请用 C 语言程序存储并处理一组学生的学籍。 单个学生学籍的数据结构&#xff1a; 学号&#xff08;num&#xff09;&#xff1a; int 型姓名&#xff08;…

常见网络通信协议(http、https、ws)及安全协议(SSL、TLS、XTLS)

文章内容删除了一大半不合适的内容&#xff0c;发不出来&#xff0c;你懂得。&#x1f970; 一、常见网络通信协议1.1、HTTP 协议1.11 HTTP 协议简介1.12 HTTP 协议的工作流程1.13 HTTP 协议的常用方法1.14 HTTP 协议的常见状态码1.15 HTTP 的缺点 1.2 HTTPS 协议1.21 HTTPS 协…

结合OB Cloud区别于MySQL的4大特性,规划降本方案

任何一家企业想要获得持续性的发展与盈利&#xff0c;“降本增效”都是难以绕开的命题。但是“一刀切”的降本影响往往不太可控&#xff0c;成本的快速收缩往往会给业务带来低效运营和增长缓慢的风险。所以我们所说的降本&#xff0c;是指在成本降低的同时&#xff0c;效率不降…

js 正则表达式 验证 :页面中一个输入框,可输入1个或多个vid/pid,使用英文逗号隔开...

就是意思一个输入框里面&#xff0c;按VID/PID格式输入,VID和PID最大长度是4,最多50组 1、页面代码 <el-form ref"ruleForm" :model"tempSet" :rules"rules" label-position"right"> <!-- 最多 50组&#xff0c;每组9个字符…

【C语言】字符函数,字符串函数,内存函数

大家好&#xff01;今天我们来学习C语言中的字符函数&#xff0c;字符串函数和内存函数。 目录 1. 字符函数 1.1 字符分类函数 1.2 字符转换函数 1.2.1 tolower&#xff08;将大写字母转化为小写字母&#xff09; 1.2.2 toupper&#xff08;将小写字母转化为大写字母&…

ZigBee案例笔记 -- RFID卡片读写(模拟饭卡)

RFID模拟饭卡应用 RFID&#xff08;射频识别技术&#xff09;RFID通讯协议RFID发展历史RFID操作流程说明RFID卡片读写流程RFID寻卡RFID防碰撞RFID选卡RFID卡密验证RFID读卡RFID写卡读写数据流程 RFID饭卡模拟案例驱动代码串口协议饭卡操作案例结果优化建议 RFID&#xff08;射频…

Cordova Android 生成的 APK 中添加代码混淆

要在 Cordova Android 生成的 APK 中添加代码混淆&#xff0c;你可以按照以下步骤进行操作&#xff1a; 1. 在项目根目录下&#xff0c;找到 platforms/android/ 目录&#xff0c;进入该目录。 2. 打开 build.gradle 文件&#xff0c;并在 android { ... } 部分添加以下代码&…

关于两个不同数据库的两张表建立数据库链接,关联查询数据

一、数据库链接 数据库链接&#xff08;database link&#xff09;是用于跨不同数据库之间进行连接和数据传输的工具或方法。它允许在一个数据库中访问另一个数据库中的对象和数据。 二、具体操作 以Oracle数据库为例 --1.建立链接tjpt CREATE DATABASE LINK tjpt CONNECT…

go语言--锁

锁的基础&#xff0c;go的锁是构建在原子操作和信号锁之上的 原子锁 原子包实现协程的对同一个数据的操作&#xff0c;可以实现原子操作&#xff0c;只能用于简单变量的简单操作&#xff0c;可以把多个操作变成一个操作 sema锁 也叫信号量锁/信号锁 核心是一个uint32值&#…

基于单片机的串行通信发射机设计

一、项目介绍 串行通信是一种常见的数据传输方式&#xff0c;允许将数据以比特流的形式在发送端和接收端之间传输。当前实现基于STC89C52单片机的串行通信发射机&#xff0c;通过红外发射管和接收头实现自定义协议的数据无线传输。 二、系统设计 2.1 单片机选择 在本设计中&…

前端基础2——CSS样式

文章目录 一、使用方式1.1 内联方式1.2 内部方式1.3 外部导入方式&#xff08;推荐&#xff09; 二、选择器类型2.1 元素选择器2.2 ID选择器2.3 类选择器2.4 派生选择器 三、常用属性3.1 内边距和外边距3.2 文本3.3 边框3.4 背景3.5 定位3.6 浮动3.7 字体3.8 其他属性 四、案例…

MySQL 数据库常用命令大全(完整版)

文章目录 1. MySQL命令2. MySQL基础命令3. MySQL命令简介4. MySQL常用命令4.1 MySQL准备篇4.1.1 启动和停止MySQL服务4.1.2 修改MySQL账户密码4.1.3 MySQL的登陆和退出4.1.4 查看MySQL版本 4.2 DDL篇&#xff08;数据定义&#xff09;4.2.1 查询数据库4.2.2 创建数据库4.2.3 使…

DataTable扩展 列转行方法(2*2矩阵转换)

源数据 如图所示 // <summary>/// DataTable扩展 列转行方法&#xff08;2*2矩阵转换&#xff09;/// </summary>/// <param name"dtSource">数据源</param>/// <param name"columnFilter">逗号分隔 如SDateTime,PM25,PM10…

docker desktop安装es 并连接elasticsearch-head:5

首先要保证docker安装成功&#xff0c;打开cmd&#xff0c;输入docker -v&#xff0c;出现如下界面说明安装成功了 下面开始安装es 第一步&#xff1a;拉取es镜像 docker pull elasticsearch:7.6.2第二步&#xff1a;运行容器 docker run -d --namees7 --restartalways -p 9…

ShardingSphere——压测实战

摘要 Apache ShardingSphere 关注于全链路压测场景下&#xff0c;数据库层面的解决方案。 将压测数据自动路由至用户指定的数据库&#xff0c;是 Apache ShardingSphere 影子库模块的主要设计目标。 一、压测背景 在基于微服务的分布式应用架构下&#xff0c;业务需要多个服…