KafkaStream:基本使用

简介:

        kafkaStream:提供了对存储在kafka中的数据进行流式处理和分析的功能

特点:

        KafkasSream提供了一个非常简单轻量的Library,它可以非常方便的嵌入到java程序中,也可以任何方式打包部署

入门案例:

  1、新建工程kafka-demo

           引入kafkaStream依赖

    <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- kafkfa --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId></dependency><!--kafkaStream--><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><exclusions><exclusion><artifactId>connect-json</artifactId><groupId>org.apache.kafka</groupId></exclusion><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency></dependencies>

   2、新建流式处理类

          代码如下

package com.heima.kafkademo.sample;import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;/*
* 流式处理
* */
public class KafkaStreamQuickStart {public static void main(String[] args) {/*创建kafka配置中心并配置参数*/Properties prop = new Properties();//连接地址prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");//key序列化prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());//value序列化prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());//创建id名称prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-quickstart");//stream构造器StreamsBuilder streamsBuilder = new StreamsBuilder();//流式计算streamProcessor(streamsBuilder);//创建KafkaStream对象KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(),prop);//开启流式计算kafkaStreams.start();}//流式计算方法private static void streamProcessor(StreamsBuilder streamsBuilder) {//创建kafka对象,同时指定从哪个topic获取消息KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");//处理消息的valuestream.flatMapValues(new ValueMapper<String, Iterable<?>>() {@Overridepublic Iterable<String> apply(String value) {return Arrays.asList(value.split(" "));}})      //按照value进行聚合.groupBy((key,value)->value)//时间窗口,每隔10秒更新一次.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))//统计单词个数.count()//转换为kStream.toStream().map((key,value)->{System.out.println("key:"+key+",vlaue:"+value);return new KeyValue<>(key.key().toString(),value.toString());})//发送消息.to("itcast-topic-out");}
}

3、启动消费者类和流式处理类监听消息

        使用生产者类发送消息

       消费者和生产者类代码参考Kafka:安装和配置_Success___的博客-CSDN博客

4、测试

        成功接收到消息

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/92590.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Kotlin优点及为什么使用Kotlin

文章目录 一 Hello Kotlin二 Kotlin优点三 团队为什么采用 Kotlin 一 Hello Kotlin Kotlin和Andriod 二 Kotlin优点 三 团队为什么采用 Kotlin

安卓:网络框架okhttp

目录 一、okhttp介绍 1. OkHttpClient类&#xff1a; 常用方法&#xff1a; 2. Request类&#xff1a; 常用方法&#xff1a; 3. Response类&#xff1a; 常用方法&#xff1a; 4. Call类&#xff1a; 常用方法&#xff1a; 5. Interceptor接口&#xff1a; 常用方法&…

使用蓝牙外设却不小心把台式机电脑蓝牙关了

起因 今天犯了一个贼SB的错误&#xff0c;起因是蓝牙键盘突然就不能输入了&#xff08;虽然是连接状态&#xff0c;但是按什么键都没有反应&#xff09; 原来我的解决方法就是重启一下电脑&#xff0c;但是那会电脑开了贼多的软件。我就想重启也太麻烦了&#xff0c;既然重启…

Docker 基本管理(一)

目录 一、虚拟化简介 1.1.虚拟化概述 1.2.cpu的时间分片&#xff08;cpu虚拟化&#xff09; 1.3.cpu虚拟化性性能瓶颈 1.4.虚拟化工作原理 1.5 虚拟化类型 1.6 虚拟化功能 ​二、Docker容器概述 2.1 docker是什么&#xff1f; 2.2 使用docker有什么意义&#xff…

支持https访问

文章目录 1. 打开自己的云服务器的 80 和 443 端口2. 安装 nginx3. 安装 snapd4. 安装 certbot5. 生成证书6. 拷贝生成的证书到项目工作目录7. 修改 main.go 程序如下8. 编译程序9. 启动程序10. 使用 https 和端口 8081 访问页面成功11. 下面修改程序&#xff0c;支持 https 和…

读书笔记 |【项目思维与管理】➾ 顺势而动

读书笔记 |【项目思维与管理】➾ 顺势而动 一、企业步入“终结者时代”二、过去成功的经验也许是最可怕的三、做好非重复性的事四、适应客户是出发点五、向知识型企业转变六、速度是决胜条件 &#x1f496;The Begin&#x1f496;点点关注&#xff0c;收藏不迷路&#x1f496; …

MySQL 函数

mysql 函数语法 create function 函数名&#xff08;参数名 参数类型&#xff0c;。。。&#xff09; returns type —返回值类型 ----returns 有个 s [characteristics…] begin 函数体 ### 函数体中肯定有 return 语句 end 参数列表 指定参数为 IN | out | INOUT 只对存储过程…

【Linux操作系统】举例解释Linux系统编程中文件io常用的函数

在Linux系统编程中&#xff0c;文件IO操作是非常常见和重要的操作之一。通过文件IO操作&#xff0c;我们可以打开、读取、写入和关闭文件&#xff0c;对文件进行定位、复制、删除和重命名等操作。本篇博客将介绍一些常用的文件IO操作函数。 文章目录 1. open()1.1 原型、参数及…

Android:自定义沿着曲线轨迹移动

前言 前几天&#xff0c;后台有老铁留言&#xff0c;说有个需求&#xff0c;画两条曲线&#xff0c;中间是一个小球&#xff0c;沿着两条线中间的轨迹从左往右移动&#xff0c;让提供个思路&#xff0c;做为一个极度宠粉的博主&#xff0c;思路不仅要提供&#xff0c;实现方案也…

八大排序超详解(动图+源码)

&#x1f493;博主个人主页:不是笨小孩&#x1f440; ⏩专栏分类:数据结构与算法&#x1f440; 刷题专栏&#x1f440; C语言&#x1f440; &#x1f69a;代码仓库:笨小孩的代码库&#x1f440; ⏩社区&#xff1a;不是笨小孩&#x1f440; &#x1f339;欢迎大家三连关注&…

【Linux】IO多路转接——select接口

目录 I/O多路转接之select select初识 select函数 socket就绪条件 select基本工作流程 select服务器 select的优点 select的缺点 select的适用场景 I/O多路转接之select select初识 select是系统提供的一个多路转接接口。 select系统调用可以让我们的程序同时监视多…

拦截器和过滤器的区别

&#x1f600;前言 本篇博文是关于拦截器VS 过滤器的分享&#xff0c;希望你能够喜欢&#x1f60a; &#x1f3e0;个人主页&#xff1a;晨犀主页 &#x1f9d1;个人简介&#xff1a;大家好&#xff0c;我是晨犀&#xff0c;希望我的文章可以帮助到大家&#xff0c;您的满意是我…

【机器学习4】构建良好的训练数据集——数据预处理(一)处理缺失值及异常值

数据预处理 &#x1f4ab;数据预处理的重要性&#x1f4ab;处理缺失值⭐️识别表格中的数据⭐️计算每列缺失值的数量⭐️删除含有缺失值的样本或特征⭐️填充缺失值 &#x1f4ab;处理异常值⭐️异常值的鉴别⭐️异常值的处理 &#x1f4ab;将数据集划分为训练数据集和测试数据…

Jmeter-压力测试工具

文章目录 Jmeter快速入门1.1.下载1.2.解压1.3.运行 2.快速入门2.1.设置中文语言2.2.基本用法 Jmeter快速入门 1s内发送大量请求&#xff0c;模拟高QPS&#xff0c;用以测试网站能承受的压力有多大 Jmeter依赖于JDK&#xff0c;所以必须确保当前计算机上已经安装了JDK&#xff0…

使用ip2region获取客户端地区

目录 从gitee拉取ip2region.xdb资源文件 写测试类 注意要写对资源路径 本地测试结果 ​编辑 远端测试结果 从gitee拉取ip2region.xdb资源文件 git clone https://gitee.com/lionsoul/ip2region.git 将xdb放入resources资源文件夹 引入依赖 <dependency><groupId&…

Vue响应式数据的原理

在 vue2 的响应式中&#xff0c;存在着添加属性、删除属性、以及通过下标修改数组&#xff0c;但页面不会自动更新的问题。而这些问题在 vue3 中都得以解决。 vue3 采用了 proxy 代理&#xff0c;用于拦截对象中任意属性的变化&#xff0c;包括&#xff1a;属性的读写、属性的…

Pycharm社区版连接WSL2中的Mysql8.*

当前时间2023.08.13&#xff0c;Windows11中默认的WSL版本已经是2了&#xff0c;在WSL2中默认的Ubuntu版本已经是22.04&#xff0c;而Ubuntu22.04中默认的Mysql版本已经是8.*。 Wsl 2 中安装mysql WSL2中安装Mysql的方法参考自微软官方文档【开始使用适用于 Linux 的 Windows …

《论文阅读12》RandLA-Net: Efficient Semantic Segmentation of Large-Scale Point Clouds

一、论文 研究领域&#xff1a;全监督3D语义分割&#xff08;室内&#xff0c;室外RGB&#xff0c;kitti&#xff09;论文&#xff1a;RandLA-Net: Efficient Semantic Segmentation of Large-Scale Point Clouds CVPR 2020 牛津大学、中山大学、国防科技大学 论文链接论文gi…

C++储备

一、类的 三大特性 封装&#xff0c;继承&#xff0c;多态 二、虚函数 为啥要用到虚函数 C虚函数详解_Whitesad_的博客-CSDN博客 三、函数重载 四、封装的保护权限 1.public 成员类内&#xff0c;内外都可以访问 2.protected 成员&#xff0c;类内可以访问&#xff0c…

两只小企鹅(Python实现)

目录 1 和她浪漫的昨天 2 未来的旖旎风景 3 Python完整代码 1 和她浪漫的昨天 是的,春天需要你。经常会有一颗星等着你抬头去看&#xff1b; 和她一起吹晚风吗﹖在春天的柏油路夏日的桥头秋季的公园寒冬的阳台&#xff1b; 这世界不停开花&#xff0c;我想放进你心里一朵&am…