Kafka系列(四)

本文接kafka三,代码实践kafkaStream的应用,用来完成流式计算。

kafkastream

        关于流式计算也就是实时处理,无时间概念边界的处理一些数据。想要更有性价比地和java程序进行结合,因此了解了kafka。但是本人阅读了kafka地官网,觉得可阅读性并不是很高,当然是个人认为,就是界面做的就不是很舒服。

简介

简介一下kafkaStream

Kafka Stream的特点
  • Kafka Stream提供了一个非常简单而轻量的Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署

  • 除了Kafka外,无任何外部依赖

  • 充分利用Kafka分区机制实现水平扩展和顺序性保证(想要保证消息有序性就要设置一个分区)

  • 通过可容错的state store实现高效的状态操作(如windowed join和aggregation)

  • 支持正好一次处理语义

  • 提供记录级的处理能力,从而实现毫秒级的低延迟

  • 支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records)

  • 同时提供底层的处理原语Processor(类似于Storm的spout和bolt),以及高层抽象的DSL(类似于Spark的map/group/reduce)

关键概念
  • 源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器。

  • Sink处理器:sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题

Kstream

(1)数据结构类似于map,key-value键值对

KStream数据流(data stream),即是一段顺序的,可以无限长,不断更新的数据集。 数据流中比较常记录的是事件,这些事件可以是一次鼠标点击(click),一次交易,或是传感器记录的位置数据。

KStream负责抽象的,就是数据流。与Kafka自身topic中的数据一样,类似日志,每一次操作都是向其中插入(insert)新数据。

为了说明这一点,让我们想象一下以下两个数据记录正在发送到流中:

(“ alice”,1)->(“” alice“,3)

如果流处理应用是要总结每个用户的价值,它将返回alice,4。因为第二条数据记录不会覆盖第一条,而是做了一个insert,累加。

代码实现

依赖
       <!-- kafkfa --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><exclusions><exclusion><artifactId>connect-json</artifactId><groupId>org.apache.kafka</groupId></exclusion><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency>
kafkaStream配置类

需要在nacos的配置里面配置hosts属性和group,本地等怎么配置都可以,只要能读取到就行。


/*** 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数*/@Setter
@Getter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;private String hosts;private String group;@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {Map<String, Object> props = new HashMap<>();props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");props.put(StreamsConfig.RETRIES_CONFIG, 10);props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());return new KafkaStreamsConfiguration(props);}
}

这里生产者和消费者我就不再举例子了,直接举中间这个stream怎么写。

stream需要知道是谁发的,所以生产者和stream需要绑定一个相同的主题,而stream需要知道要给谁发送过去,消费者知道是谁发的,所以stream和消费者又有一个相同的主题。

streamhandler代码

具体的每一行代码的含义结合个人理解都在注释里面。

package com.neu.article.stream;import com.alibaba.fastjson.JSON;import com.neu.base.constants.HotArticleConstants;
import com.neu.base.model.mess.ArticleVisitStreamMess;
import com.neu.base.model.mess.UpdateArticleMess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.time.Duration;@Configuration
@Slf4j
public class HotArticleStreamHandler {@Beanpublic KStream<String,String> kStream(StreamsBuilder streamsBuilder){//接收消息KStream<String,String> stream = streamsBuilder.stream(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC);//聚合流式处理stream.map((key,value)->{UpdateArticleMess mess = JSON.parseObject(value, UpdateArticleMess.class);//重置消息的key:1234343434(文章id)   和  value: likes:1 当前文章点赞一次//mess.getType().name():用于区分是点赞还是阅读 mess.getAdd():用于区分是加1还是减1return new KeyValue<>(mess.getArticleId().toString(),mess.getType().name()+":"+mess.getAdd());})//按照文章id进行聚合.groupBy((key,value)->key)//时间窗口.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))/*** 自行的完成聚合的计算*/.aggregate(new Initializer<String>() {/*** 初始方法,返回值是消息的value->aggValue,聚合之后的value* @return*/@Overridepublic String apply() {return "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0";}/*** 真正的聚合操作,返回值是消息的value*/}, new Aggregator<String, String, String>() {/**** @param key 消息的key :mess.getArticleId().toString()* @param value  消息的value likes:1* @param aggValue   初始化消息聚合后的一个值 COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0* @return*/@Overridepublic String apply(String key, String value, String aggValue) {System.out.println(value);if(StringUtils.isBlank(value)){return aggValue;}String[] aggAry = aggValue.split(",");int col = 0,com=0,lik=0,vie=0;for (String agg : aggAry) {//agg遍历第一次的时候最开始为 COLLECTION:0String[] split = agg.split(":");//split[0] = COLLECTION  split[1] = 0/*** 获得初始值,也是时间窗口内计算之后的值*/switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){case COLLECTION:col = Integer.parseInt(split[1]);break;case COMMENT:com = Integer.parseInt(split[1]);break;case LIKES:lik = Integer.parseInt(split[1]);break;case VIEWS:vie = Integer.parseInt(split[1]);break;}}/*** 累加操作   likes:1*/String[] valAry = value.split(":");//valAry[0] = likes  valAry[1] = 1switch (UpdateArticleMess.UpdateArticleType.valueOf(valAry[0])){case COLLECTION:col += Integer.parseInt(valAry[1]);break;case COMMENT:com += Integer.parseInt(valAry[1]);break;case LIKES:lik += Integer.parseInt(valAry[1]);break;case VIEWS:vie += Integer.parseInt(valAry[1]);break;}//返回值是有要求的,必须与初始化apply方法的返回值形式一致String formatStr = String.format("COLLECTION:%d,COMMENT:%d,LIKES:%d,VIEWS:%d", col, com, lik, vie);System.out.println("文章的id:"+key);System.out.println("当前时间窗口内的消息处理结果:"+formatStr);return formatStr;}//Materialized.as("hot-atricle-stream-count-001"):用于指定六十处理的状态,字符串可以随便给,多个流处理的话不重复就行}, Materialized.as("hot-atricle-stream-count-001")).toStream().map((key,value)->{//key.key().toString():文章id,value:COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0return new KeyValue<>(key.key().toString(),formatObj(key.key().toString(),value));})//发送消息.to(HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC);return stream;}/*** 格式化消息的value数据* @param articleId* @param value* @return*/public String formatObj(String articleId,String value){ArticleVisitStreamMess mess = new ArticleVisitStreamMess();mess.setArticleId(Long.valueOf(articleId));//COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0String[] valAry = value.split(",");for (String val : valAry) {String[] split = val.split(":");switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){case COLLECTION:mess.setCollect(Integer.parseInt(split[1]));break;case COMMENT:mess.setComment(Integer.parseInt(split[1]));break;case LIKES:mess.setLike(Integer.parseInt(split[1]));break;case VIEWS:mess.setView(Integer.parseInt(split[1]));break;}}log.info("聚合消息处理之后的结果为:{}",JSON.toJSONString(mess));return JSON.toJSONString(mess);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/239853.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【每日一题】2744. 最大字符串配对数目-2024.1.17

题目&#xff1a; 2744. 最大字符串配对数目 给你一个下标从 0 开始的数组 words &#xff0c;数组中包含 互不相同 的字符串。 如果字符串 words[i] 与字符串 words[j] 满足以下条件&#xff0c;我们称它们可以匹配&#xff1a; 字符串 words[i] 等于 words[j] 的反转字符…

Pytorch各种Dropout层应用于详解

目录 torch框架Dropout functions详解 dropout 用途 用法 使用技巧 参数 数学理论公式 代码示例 alpha_dropout 用途 用法 使用技巧 参数 数学理论公式 代码示例 feature_alpha_dropout 用途 用法 使用技巧 参数 数学理论 代码示例 dropout1d 用途 用…

Windows无法登录管理路由器故障排查

问题描述 家里的路由器使用拨号上网&#xff0c;路由器DHCP分发IP的范围是192.168.1.0/24。默认使用192.168.1.1管理路由器。然后拨号上网成功后&#xff0c;修改了私网IP的分发范围&#xff1a;192.168.5.1-192.168.5.10。为了防止有人蹭网&#xff0c;只分配的10个IP地址。修…

rust跟我学三:文件时间属性获得方法

图为RUST吉祥物 大家好,我是get_local_info作者带剑书生,这里用一篇文章讲解get_local_info是怎样获得杀毒软件的病毒库时间的。 首先,先要了解get_local_info是什么? get_local_info是一个获取linux系统信息的rust三方库,并提供一些常用功能,目前版本0.2.4。详细介绍地址…

推荐几个Github高星GoLang管理系统

在Web开发领域&#xff0c;Go语言&#xff08;Golang&#xff09;以其高效、简洁、高并发等特性逐渐成为许多开发者的首选语言。有许多优秀的Go语言Web后台管理系统&#xff0c;这些项目星星众多&#xff0c;提供了丰富的功能和良好的代码质量。本文将介绍一些GitHub高星的GoLa…

新品新品新品来袭PFA大口试剂瓶100ml

大口瓶&#xff0c;方便清洗&#xff0c;满足颗粒数要求。

excel(vab)删除空行

删除第一、二、三列位空的所有行&#xff08;8000)行范围以内 代码如下&#xff1a; Sub Macro1()Dim hang As Integer For hang 8000 To 1 Step -1If Sheet1.Cells(hang, 1) "" And Sheet1.Cells(hang, 2) "" And Sheet1.Cells(hang, 3) "&quo…

SDRAM小项目——命令解析模块

简单介绍&#xff1a; 在FPGA中实现命令解析模块&#xff0c;命令解析模块的用来把pc端传入FPGA中的数据分解为所需要的数据和触发命令&#xff0c;虽然代码不多&#xff0c;但是却十分重要。 SDRAM的整体结构如下&#xff0c;可以看出&#xff0c;命令解析模块cmd_decode负责…

民营经济迎来新发展,创维汽车创始人黄宏生谈创业之道

2024年1月15日&#xff0c;上海高金金融研究院民营经济研究中心高净值研究院年度大咖论坛正式召开&#xff0c;多位来自不同行业的优秀民营企业家在本次论坛上分享企业的创新与发展之道。创维集团、创维汽车创始人黄宏生先生作为本次论坛的首位分享嘉宾&#xff0c;为其他奋斗创…

【技术分享】远程透传网关-单网口快速实现三菱 FX3U 网口PLC程序远程上下载

准备工作 一台可联网操作的电脑一台单网口的远程透传网关及博达远程透传配置工具网线一条&#xff0c;用于实现网络连接和连接PLC一台三菱 FX3U PLC及其编程软件一张4G卡或WIFI天线实现通讯(使用4G联网则插入4G SIM卡&#xff0c;WIFI联网则将WIFI天线插入USB口&#xff09; …

Docker部署Traefik结合内网穿透远程访问Dashboard界面

文章目录 前言1. Docker 部署 Trfɪk2. 本地访问traefik测试3. Linux 安装cpolar4. 配置Traefik公网访问地址5. 公网远程访问Traefik6. 固定Traefik公网地址 前言 Trfɪk 是一个云原生的新型的 HTTP 反向代理、负载均衡软件&#xff0c;能轻易的部署微服务。它支持多种后端 (D…

P9852 [ICPC2021 Nanjing R] Windblume Festival 题解(SPJ)

[ICPC2021 Nanjing R] Windblume Festival 单击此处下载原神 题面翻译 给一个长度为 n n n 环形整数序列 a a a, 每次操作可以任意选择一个下标 x x x&#xff0c;令 $ a_x a_x - a_{(x\bmod n)1}$&#xff0c;之后移除 a ( x m o d n ) 1 a_{(x\bmod n)1} a(xmodn)1​…

MDT的驱动管理和自动匹配

在企业的生产环境中&#xff0c;我们经常会遇到在一个公司中&#xff0c;有很多电脑不是同一个品牌的。有DELL&#xff0c; Lenovo等等诸多品牌。 如果是在小的企业或者组织里&#xff0c;计算机型号单一&#xff0c;我们就可以直接导入驱动&#xff0c;然后直接部署到系统里。…

探索Vue3:深入理解响应式语法糖

🚀 欢迎来到我的专栏!专注于Vue3的实战总结和开发实践分享,让你轻松驾驭Vue3的奇妙世界! 🌈✨在这里,我将为你呈现最新的Vue3技术趋势,分享独家实用教程,并为你解析开发中的难题。让我们一起深入Vue3的魅力,助力你成为Vue大师! 👨‍💻💡不再徘徊,快来关注…

代码随想录算法训练营第三十六天|435. 无重叠区间、763.划分字母区间、56. 合并区间

题目&#xff1a;435. 无重叠区间 文章链接&#xff1a;代码随想录 视频链接&#xff1a;LeetCode:435.无重叠区间 题目链接&#xff1a;力扣题目链接 图释&#xff1a; class Solution { public:static bool cmp(const vector<int>&a, const vector<int>…

Oracle篇—实例中和name相关参数的区别和作用

☘️博主介绍☘️&#xff1a; ✨又是一天没白过&#xff0c;我是奈斯&#xff0c;DBA一名✨ ✌✌️擅长Oracle、MySQL、SQLserver、Linux&#xff0c;也在积极的扩展IT方向的其他知识面✌✌️ ❣️❣️❣️大佬们都喜欢静静的看文章&#xff0c;并且也会默默的点赞收藏加关注❣…

计算机导论07-算法和数据结构

文章目录 算法基础算法及其特性算法的概念算法与程序算法表示 算法的描述自然语言流程图盒图&#xff08;N-S图&#xff09;伪代码程序设计语言 算法评价算法的衡量标准算法的规模时间复杂度空间复杂度 数据结构数据结构的概念数据的逻辑结构数据的存储结构数据的基本操作 常用…

❤ React报错问题分析

❤ React报错问题分析 ❤️ You passed a second argument to root.render(…) but it only accepts one argument. You passed a second argument to root.render(…) but it only accepts one argument. react-dom.development.js:86 Warning: You passed a second argumen…

drools开源规则引擎介绍以及在Centos上的具体部署方案,让你的业务规则能够独立于应用程序本身

Drools是一个基于Java的开源规则引擎&#xff0c;用于处理业务规则和复杂事件处理。它提供了一个声明性的规则语言&#xff0c;允许开发人员定义业务规则&#xff0c;并通过引擎执行这些规则。以下是Drools规则引擎的简介和一些应用场景描述。 Drools规则引擎简介 规则引擎概述…

Apache StringUtils:Java字符串处理工具类

简介 在我们的代码中经常需要对字符串判空&#xff0c;截取字符串、转换大小写、分隔字符串、比较字符串、去掉多余空格、拼接字符串、使用正则表达式等等。如果只用 String 类提供的那些方法&#xff0c;我们需要手写大量的额外代码&#xff0c;不然容易出现各种异常。现在有…