JavaWeb_LeadNews_Day11-KafkaStream实现实时计算文章分数

JavaWeb_LeadNews_Day11-KafkaStream实现实时计算文章分数

  • KafkaStream
    • 概述
    • 案例-统计单词个数
    • SpringBoot集成
  • 实时计算文章分值
  • 来源
  • Gitee

KafkaStream

概述

  • Kafka Stream: 提供了对存储与Kafka内的数据进行流式处理和分析的功能
  • 特点:
    • Kafka Stream提供了一个非常简单而轻量的Library, 它可以非常方便地嵌入任意Java应用中, 也可以任意方式打包和部署
    • 除了Kafka外, 无任何外部依赖
    • 通过可容错地state, store实现高效地状态操作(如windowed join和aggregation)
    • 支持基于事件时间地窗口操作, 并且可处理晚到的数据(late arrival of records)
  • 关键概念:
    • 源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器.
    • Sink处理器: sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题
  • KStream:
    • 数据结构类似于map, key-value键值对.
    • 一段顺序的, 无限长, 不断更新的数据集.

案例-统计单词个数

  • 依赖
    依赖中有排除部分依赖, 还是一整个放上来好了
    <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- kafkfa --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><exclusions><exclusion><artifactId>connect-json</artifactId><groupId>org.apache.kafka</groupId></exclusion><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency>
    </dependencies>
    
  • 流式处理
    public class KafkaStreamQuickStart {public static void main(String[] args) {// Kafka的配置信息Properties prop = new Properties();prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.174.133:9092");prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-quickstart");// Stream 构建器StreamsBuilder streamsBuilder = new StreamsBuilder();// 流式计算streamProcessor(streamsBuilder);// 创建KafkaStream对象KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), prop);// 开启流式计算kafkaStreams.start();}/*** 流式计算* 消息的内容: hello kafka* @param streamsBuilder*/private static void streamProcessor(StreamsBuilder streamsBuilder) {// 创建Kstream对象, 同时指定从哪个topic中接收消息KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");/*** 处理消息的value*/stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {@Overridepublic Iterable<String> apply(String value) {String[] valAry = value.split(" ");return Arrays.asList(valAry);}})// 按照value聚合处理.groupBy((key, value)->value)// 时间窗口.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))// 统计单词的个数.count()// 转换为KStream.toStream().map((key, value)->{System.out.println("key:"+key+",value:"+value);return new KeyValue<>(key.key().toString(), value.toString());})// 发送消息.to("itcast-topic-out");}
    }
    
  • 发送消息
    for (int i = 0; i < 5; i++) {ProducerRecord<String, String> kvProducerRecord = new ProducerRecord<>("icast-topic-input", "hello kafka");producer.send(kvProducerRecord);
    }
    
  • 接收消息
    // 订阅主题
    consumer.subscribe(Collections.singleton("itcast-topic-out"));
    

SpringBoot集成

  • 配置
    config.java
    /*** 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数*/
    @Setter
    @Getter
    @Configuration
    @EnableKafkaStreams
    @ConfigurationProperties(prefix="kafka")
    public class KafkaStreamConfig {private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;private String hosts;private String group;@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {Map<String, Object> props = new HashMap<>();props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");props.put(StreamsConfig.RETRIES_CONFIG, 10);props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());return new KafkaStreamsConfiguration(props);}
    }
    
    application.yml
    kafka:hosts: 192.168.174.133:9092group: ${spring.application.name}
    
    BeanConfig.java
    @Slf4j
    @Configuration
    public class KafkaStreamHelloListener {@Beanpublic KStream<String, String> KStream(StreamsBuilder streamsBuilder){// 创建Kstream对象, 同时指定从哪个topic中接收消息KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");/*** 处理消息的value*/stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {@Overridepublic Iterable<String> apply(String value) {String[] valAry = value.split(" ");return Arrays.asList(valAry);}})// 按照value聚合处理.groupBy((key, value)->value)// 时间窗口.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))// 统计单词的个数.count()// 转换为KStream.toStream().map((key, value)->{System.out.println("key:"+key+",value:"+value);return new KeyValue<>(key.key().toString(), value.toString());})// 发送消息.to("itcast-topic-out");return stream;}
    }
    

实时计算文章分值

  1. nacos: leadnews-behavior配置kafka生产者
    kafka:bootstrap-servers: 192.168.174.133:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer
    
  2. 修改leadnews-behavior
    like
    public ResponseResult likesBehavior(LikesBehaviorDto dto) {...UpdateArticleMess mess = new UpdateArticleMess();mess.setArticleId(dto.getArticleId());mess.setType(UpdateArticleMess.UpdateArticleType.LIKES);if(dto.getOperation() == 0){...mess.setAdd(1);}else{...mess.setAdd(-1);}// kafka: 发送消息, 数据聚合kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC, JSON.toJSONString(mess));...
    }
    
    view
    public ResponseResult readBehavior(ReadBehaviorDto dto) {...// kafka: 发送消息, 数据聚合UpdateArticleMess mess = new UpdateArticleMess();mess.setArticleId(dto.getArticleId());mess.setType(UpdateArticleMess.UpdateArticleType.VIEWS);mess.setAdd(1);kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC, JSON.toJSONString(mess));...
    }
    
  3. leadnews-article中添加流式聚合处理
    @Slf4j
    @Configuration
    public class HotArticleStreamHandler {@Beanpublic KStream<String, String> KStream(StreamsBuilder streamsBuilder){// 接收消息KStream<String, String> stream = streamsBuilder.stream(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC);// 聚合流式处理stream.map((key, value)->{UpdateArticleMess mess = JSON.parseObject(value, UpdateArticleMess.class);// 重置消息的key和valuereturn new KeyValue<>(mess.getArticleId().toString(), mess.getType().name()+":"+mess.getAdd());})// 根据文章id进行聚合.groupBy((key, value)->key)// 时间窗口.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))// 自行实现聚合计算.aggregate(// 初始方法, 返回值是消息的valuenew Initializer<String>() {@Overridepublic String apply() {return "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0";}},// 真正的聚合操作, 返回值是消息的valuenew Aggregator<String, String, String>() {@Overridepublic String apply(String key, String value, String aggValue) {if(StringUtils.isBlank(value)){return aggValue;}String[] aggAry = aggValue.split(",");int col=0, com=0, lik=0, vie=0;for (String agg : aggAry) {String[] split = agg.split(":");/*** 获得初始值, 也是时间窗口内计算之后的值*/switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){case COLLECTION:col = Integer.parseInt(split[1]);break;case COMMENT:com = Integer.parseInt(split[1]);break;case LIKES:lik = Integer.parseInt(split[1]);break;case VIEWS:vie = Integer.parseInt(split[1]);break;}}// 累加操作String[] valAry = value.split(":");switch (UpdateArticleMess.UpdateArticleType.valueOf(valAry[0])){case COLLECTION:col += Integer.parseInt(valAry[1]);break;case COMMENT:com += Integer.parseInt(valAry[1]);break;case LIKES:lik += Integer.parseInt(valAry[1]);break;case VIEWS:vie += Integer.parseInt(valAry[1]);break;}String formatStr = String.format("COLLECTION:%d,COMMENT:%d,LIKES:%d,VIEWS:%d", col, com, lik, vie);System.out.println("文章的id "+key);System.out.println("当前时间窗口内的消息处理结果: "+formatStr);return formatStr;}}, Materialized.as("hot-article-stream-count-001")).toStream().map((key, value)->{return new KeyValue<>(key.key().toString(), formatObj(key.key().toString(), value));})// 发送消息.to(HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC);return stream;}/*** 格式化消息的value数据* @param articleId* @param value* @return*/private String formatObj(String articleId, String value) {ArticleVisitStreamMess mess = new ArticleVisitStreamMess();mess.setArticleId(Long.valueOf(articleId));// COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0String[] valAry = value.split(",");for (String val : valAry) {String[] split = val.split(":");switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){case COLLECTION:mess.setCollect(Integer.parseInt(split[1]));break;case COMMENT:mess.setComment(Integer.parseInt(split[1]));break;case LIKES:mess.setLike(Integer.parseInt(split[1]));break;case VIEWS:mess.setView(Integer.parseInt(split[1]));break;}}log.info("聚合消息处理之后的结果为: {}", JSON.toJSONString(mess));return JSON.toJSONString(mess);}}
  4. leadnews-article添加聚合数据监听器
    @Slf4j
    @Component
    public class ArticleIncrHandlerListener {@Autowiredprivate ApArticleService apArticleService;@KafkaListener(topics = HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC)public void onMessage(String mess){if(StringUtils.isNotBlank(mess)){ArticleVisitStreamMess articleVisitStreamMess = JSON.parseObject(mess, ArticleVisitStreamMess.class);apArticleService.updateScore(articleVisitStreamMess);System.out.println(mess);}}}
    
  5. 替换redis中的热点数据
    /*** 更新 文章分值, 缓存中的热点文章数据* @param mess*/
    @Override
    public void updateScore(ArticleVisitStreamMess mess) {// 1. 更新文章的阅读, 点赞, 收藏, 评论的数量ApArticle apArticle = updateArticleBehavior(mess);// 2. 计算文章的分值Integer score = hotArticleService.computeArticleScore(apArticle);score *= 3;// 3. 替换当前文章对应频道的热点数据replaceDataToRedis(ArticleConstants.HOT_ARTICLE_FIRST_PAGE + apArticle.getChannelId(), apArticle, score);// 4. 替换推荐对应的热点数据replaceDataToRedis(ArticleConstants.HOT_ARTICLE_FIRST_PAGE + ArticleConstants.DEFAULT_TAG, apArticle, score);
    }/*** 替换数据并且存入到redis中* @param HOT_ARTICLE_FIRST_PAGE* @param apArticle* @param score*/
    private void replaceDataToRedis(String HOT_ARTICLE_FIRST_PAGE, ApArticle apArticle, Integer score) {String articleList = cacheService.get(HOT_ARTICLE_FIRST_PAGE);if(StringUtils.isNotBlank(articleList)){List<HotArticleVo> hotArticleVoList = JSON.parseArray(articleList, HotArticleVo.class);boolean flag = true;// 3.1 文章已是热点文章, 则更新文章分数for (HotArticleVo hotArticleVo : hotArticleVoList) {if(hotArticleVo.getId().equals(apArticle.getId())){hotArticleVo.setScore(score);flag = false;break;}}// 3.2 文章还不是热点文章, 则替换分数最小的文章if(flag){if(hotArticleVoList.size() >= 30){// 热点文章超过30条hotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());HotArticleVo lastHot = hotArticleVoList.get(hotArticleVoList.size() - 1);if(lastHot.getScore() < score){hotArticleVoList.remove(lastHot);HotArticleVo hotArticleVo = new HotArticleVo();BeanUtils.copyProperties(apArticle, hotArticleVo);hotArticleVo.setScore(score);hotArticleVoList.add(hotArticleVo);}}else{// 热点文章没超过30条HotArticleVo hotArticleVo = new HotArticleVo();BeanUtils.copyProperties(apArticle, hotArticleVo);hotArticleVo.setScore(score);hotArticleVoList.add(hotArticleVo);}}// 3.3 缓存到redishotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());cacheService.set(HOT_ARTICLE_FIRST_PAGE, JSON.toJSONString(hotArticleVoList));}
    }/*** 更新文章行为数据* @param mess*/
    private ApArticle updateArticleBehavior(ArticleVisitStreamMess mess) {ApArticle apArticle = getById(mess.getArticleId());apArticle.setComment(apArticle.getComment()==null?0:apArticle.getComment()+mess.getComment());apArticle.setCollection(apArticle.getCollection()==null?0:apArticle.getCollection()+mess.getCollect());apArticle.setLikes(apArticle.getLikes()==null?0:apArticle.getLikes()+mess.getLike());apArticle.setViews(apArticle.getViews()==null?0:apArticle.getViews()+mess.getView());updateById(apArticle);return apArticle;
    }
    

来源

黑马程序员. 黑马头条

Gitee

https://gitee.com/yu-ba-ba-ba/leadnews

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/129596.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

星际争霸之小霸王之小蜜蜂(十二)--猫有九条命

系列文章目录 星际争霸之小霸王之小蜜蜂&#xff08;十一&#xff09;--杀杀杀 星际争霸之小霸王之小蜜蜂&#xff08;十&#xff09;--鼠道 星际争霸之小霸王之小蜜蜂&#xff08;九&#xff09;--狂鼠之灾 星际争霸之小霸王之小蜜蜂&#xff08;八&#xff09;--蓝皮鼠和大…

使用navicat for mongodb连接mongodb

使用navicat for mongodb连接mongodb 安装navicat for mongodb连接mongodb 安装navicat for mongodb 上文mongodb7.0安装全过程详解我们说过&#xff0c;在安装的时候并没有勾选install mongodb compass 我们使用navicat去进行可视化的数据库管理 navicat for mongodb下载地址…

系统架构设计师(第二版)学习笔记----系统架构概述

【原文链接】系统架构设计师&#xff08;第二版&#xff09;学习笔记----系统架构概述 文章目录 一、系统架构的定义与发展历程1.1 架构的定义1.2 架构设计的作用1.3 架构设计产生的背景1.4 软件架构的发展历程1.5 模块化开发方法1.6 模块法方法分解模块遵循的原则1.7 软件工程…

【算法】一文带你从浅至深入门dp动态规划

文章目录 一、前言二、动态规划理论基础1、基本概念2、动态规划五部曲【✔】3、出错了如何排查&#xff1f; 三、实战演练&#x1f5e1;0x00 斐波那契数0x01 第N个泰波那契数0x02 爬楼梯0x03 三步问题0x04 使用最小花费爬楼梯⭐解法一解法二 0x05 解码方法* 四、总结与提炼 一、…

英飞凌TC3xx--深度手撕HSM安全启动(三)--TC3xx HSM系统架构

今天聊TC3xx HSM系统,包括所用内核、UCB相关信息、Host和HSM交互方式。 1、HSM系统架构 下图来源于英飞凌官网培训材料。 TC3xx的HSM内核是一颗32位的ARM Cortex M3,主频可达100MHz,支持对称算法AES128、非对称算法PKC(Public Key Crypto) ECC256、Hash SHA2,以及T…

Python的pandas库来实现将Excel文件转换为JSON格式的操作

✅作者简介&#xff1a;2022年博客新星 第八。热爱国学的Java后端开发者&#xff0c;修心和技术同步精进。 &#x1f34e;个人主页&#xff1a;Java Fans的博客 &#x1f34a;个人信条&#xff1a;不迁怒&#xff0c;不贰过。小知识&#xff0c;大智慧。 &#x1f49e;当前专栏…

2023 CCF国际AIOps挑战赛,报名倒计时!|截止时间9月15日

智能运维领域最具影响力的专业赛事——2023 CCF国际AIOps挑战赛&#xff0c;自报名启动以来已收到230余支队伍报名&#xff0c;约600余位选手参与本次挑战赛。本次大赛的报名截止时间为9月15日&#xff0c;目前报名已经进入倒计时&#xff0c;请选手们抓紧最后时间报名参赛&…

2023高教社杯数学建模A题思路分析 - 定日镜场的优化设计

# 1 赛题 A 题 定日镜场的优化设计 构建以新能源为主体的新型电力系统&#xff0c; 是我国实现“碳达峰”“碳中和”目标的一项重要 措施。塔式太阳能光热发电是一种低碳环保的新型清洁能源技术[1]。 定日镜是塔式太阳能光热发电站(以下简称塔式电站)收集太阳能的基本组件&…

搭建vue3项目并git管理

搭建vue3项目 采用vue3的create-vue脚手架搭建项目&#xff0c;底层是vite&#xff0c;要求环境 node 16.0及以上&#xff08;node -v检查node版本&#xff09; 在文件夹右键->终端-> npm init vuelatest&#xff0c;输入项目名称&#xff0c;根据需要选择是否装包 src…

COMO-ViT论文阅读笔记

Low-Light Image Enhancement with Illumination-Aware Gamma Correction and Complete Image Modelling Network 这是一篇美团、旷视、深先院、华为诺亚方舟实验室、中国电子科技大学 五个单位合作的ICCV2023的暗图增强论文&#xff0c;不过没有开源代码。 文章的贡献点一个是…

LabVIEW利用纳米结构干电极控制神经肌肉活动

LabVIEW利用纳米结构干电极控制神经肌肉活动 随着人口老龄化&#xff0c;长期护理的必要性变得更加重要&#xff0c;医疗中心的压力开始达到惊人的水平。全球对所有社会和经济部门的认识对于更好地协调卫生和社会服务之间的护理以及为更多的院外治疗提供条件至关重要。 关于医…

[管理与领导-85]:IT基层管理者 - 核心技能 - 高效执行力 - 10 - 高效执行力的9个段位

目录 前言&#xff1a; 一段&#xff1a;准确执行&#xff0c;快速反应&#xff0c;坚决执行 &#xff08;态度很重要&#xff09; 二段&#xff1a;结果导向 苦劳过后&#xff0c;有功劳&#xff08;有结果很重要&#xff09; 三段&#xff1a;有始有终 主动反馈、有始有终…

【C++】day4学习成果:仿写string类等等

1.仿照string类&#xff0c;完成myString 类 代码&#xff1a; #include <iostream> #include <cstring>using namespace std;class myString {private:char *str; //记录c风格的字符串int size; //记录字符串的实际长度public://无参构造myS…

SpringBoot原理-自动配置-原理分析-源码跟踪

自动配置原理 SpringBootApplication 该注解标识在SpringBoot项目的启动类上&#xff0c;是SpringBoot中最为重要的注解&#xff0c;该注解由三个部分组成。 SpringBootConfiguration&#xff1a;该注解与Configuration注解作用一样&#xff0c;用来声明当前类为一个配置类Comp…

VM安装RedHat7虚机ens33网络不显示IP问题解决

1、今天在VMware中安装RedHat7.4虚拟机&#xff0c;网络连接使用的是 NAT 连接方式&#xff0c;刚开始安装成功之后输入ifconfig 还能看到ens33自动分配的IP地址&#xff0c;但是当虚机关机重启后&#xff0c;再查看IP发现原来的ens33网络已经没有了&#xff0c;只变成了这两个…

【大数据】Kafka 入门指南

Kafka 入门指南 1.Kafka 简介2.Kafka 架构3.分区与副本4.偏移量5.消费者组6.总结 1.Kafka 简介 Apache Kafka 是一种高吞吐、分布式的流处理平台&#xff0c;由 LinkedIn 开发并于 2011 年开源。它具有 高伸缩性、高可靠性 和 低延迟 等特点&#xff0c;因此在大型数据处理场景…

Python类的概念

类 类的技术名词解释 ● 类(Class): 用来描述具有相同的属性和方法的对象的集合。它定义了该集合中每个对象所共有的属性和方法。对象是类的实例。 ● 类变量&#xff1a;类变量在整个实例化的对象中是公用的。类变量定义在类中且在函数体之外。类变量通常不作为实例变量使用…

CSP 201312-1 出现次数最多的数

答题 用两个map&#xff0c;一个map记录每个数出现的次数并降序排序&#xff0c;另一个map将次数作为键&#xff0c;数本身作为值&#xff0c;降序排序&#xff0c;搞定 #include<iostream> #include<map> using namespace std; int main(){map<int,int,great…

git 合并分支某次(commit)提交

需求&#xff1a;将develop分支某次提交合并到master上面&#xff0c;其他修改不同步&#xff1b; //切换到master分支 git checkout master //查看develop分支提交记录&#xff0c;获取对应记录哈希值&#xff1b; git log develop // 按上下按钮可以上下查询对应记录&#xf…

获取该虚拟机的所有权失败,主机上的某个应用程序正在使用该虚拟机

点击“openstack-controller”虚机 打开出现如下错误&#xff0c;点击“获取所有权” 点击“取消” 这时候不要删除虚拟机&#xff0c;这种错误一般是由于虚拟机没有正常关闭引起的。 找到openstack-controller的虚拟磁盘文件及配置文件存放的位置&#xff0c;删除openstack-…