11-新热文章-实时计算

热点文章-实时计算

1 今日内容

1.1 定时计算与实时计算

在这里插入图片描述

1.2 今日内容

kafkaStream

  • 什么是流式计算

  • kafkaStream概述

  • kafkaStream入门案例

  • Springboot集成kafkaStream

实时计算

  • 用户行为发送消息

  • kafkaStream聚合处理消息

  • 更新文章行为数量

  • 替换热点文章数据

2 实时流式计算

2.1 概念

一般流式计算会与批量计算相比较。在流式计算模型中,输入是持续的,可以认为在时间上是无界的,也就意味着,永远拿不到全量数据去做计算。同时,计算结果是持续输出的,也即计算结果在时间上也是无界的。流式计算一般对实时性要求较高,同时一般是先定义目标计算,然后数据到来之后将计算逻辑应用于数据。同时为了提高计算效率,往往尽可能采用增量计算代替全量计算。

在这里插入图片描述

流式计算就相当于上图的右侧扶梯,是可以源源不断的产生数据,源源不断的接收数据,没有边界。

2.2 应用场景

  • 日志分析

    网站的用户访问日志进行实时的分析,计算访问量,用户画像,留存率等等,实时的进行数据分析,帮助企业进行决策

  • 大屏看板统计

    可以实时的查看网站注册数量,订单数量,购买数量,金额等。

  • 公交实时数据

    可以随时更新公交车方位,计算多久到达站牌等

  • 实时文章分值计算

    头条类文章的分值计算,通过用户的行为实时文章的分值,分值越高就越被推荐。

2.3 技术方案选型

  • Hadoop

    在这里插入图片描述

  • Apche Storm

    Storm 是一个分布式实时大数据处理系统,可以帮助我们方便地处理海量数据,具有高可靠、高容错、高扩展的特点。是流式框架,有很高的数据吞吐能力。

  • Kafka Stream

    可以轻松地将其嵌入任何Java应用程序中,并与用户为其流应用程序所拥有的任何现有打包,部署和操作工具集成。

3 Kafka Stream

3.1 概述

Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature。它是提供了对存储于Kafka内的数据进行流式处理和分析的功能。

Kafka Stream的特点如下:

  • Kafka Stream提供了一个非常简单而轻量的Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署
  • 除了Kafka外,无任何外部依赖
  • 充分利用Kafka分区机制实现水平扩展和顺序性保证
  • 通过可容错的state store实现高效的状态操作(如windowed join和aggregation)
  • 支持正好一次处理语义
  • 提供记录级的处理能力,从而实现毫秒级的低延迟
  • 支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records)
  • 同时提供底层的处理原语Processor(类似于Storm的spout和bolt),以及高层抽象的DSL(类似于Spark的map/group/reduce)

在这里插入图片描述

3.2 Kafka Streams的关键概念

  • 源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器。
  • Sink处理器:sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题

在这里插入图片描述

3.3 KStream

(1)数据结构类似于map,如下图,key-value键值对

在这里插入图片描述

(2)KStream

在这里插入图片描述

KStream数据流(data stream),即是一段顺序的,可以无限长,不断更新的数据集。
数据流中比较常记录的是事件,这些事件可以是一次鼠标点击(click),一次交易,或是传感器记录的位置数据。

KStream负责抽象的,就是数据流。与Kafka自身topic中的数据一样,类似日志,每一次操作都是向其中插入(insert)新数据。

为了说明这一点,让我们想象一下以下两个数据记录正在发送到流中:

(“ alice”,1)->(“” alice“,3)

如果您的流处理应用是要总结每个用户的价值,它将返回4alice。为什么?因为第二条数据记录将不被视为先前记录的更新。(insert)新数据

3.4 Kafka Stream入门案例编写

(1)需求分析,求单词个数(word count)

在这里插入图片描述

(2)引入依赖

在之前的kafka-demo工程的pom文件中引入

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><exclusions><exclusion><artifactId>connect-json</artifactId><groupId>org.apache.kafka</groupId></exclusion><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions>
</dependency>

(3)创建原生的kafka staream入门案例

package com.heima.kafka.sample;import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;/*** 流式处理*/
public class KafkaStreamQuickStart {public static void main(String[] args) {//kafka的配置信心Properties prop = new Properties();prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-quickstart");//stream 构建器StreamsBuilder streamsBuilder = new StreamsBuilder();//流式计算streamProcessor(streamsBuilder);//创建kafkaStream对象KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(),prop);//开启流式计算kafkaStreams.start();}/*** 流式计算* 消息的内容:hello kafka  hello itcast* @param streamsBuilder*/private static void streamProcessor(StreamsBuilder streamsBuilder) {//创建kstream对象,同时指定从那个topic中接收消息KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");/*** 处理消息的value*/stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {@Overridepublic Iterable<String> apply(String value) {return Arrays.asList(value.split(" "));}})//按照value进行聚合处理.groupBy((key,value)->value)//时间窗口.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))//统计单词的个数.count()//转换为kStream.toStream().map((key,value)->{System.out.println("key:"+key+",vlaue:"+value);return new KeyValue<>(key.key().toString(),value.toString());})//发送消息.to("itcast-topic-out");}
}

(4)测试准备

  • 使用生产者在topic为:itcast_topic_input中发送多条消息

  • 使用消费者接收topic为:itcast_topic_out

结果:

  • 通过流式计算,会把生产者的多条消息汇总成一条发送到消费者中输出

3.5 SpringBoot集成Kafka Stream

(1)自定配置参数

package com.heima.kafka.config;import lombok.Getter;
import lombok.Setter;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;import java.util.HashMap;
import java.util.Map;/*** 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数*/@Setter
@Getter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;private String hosts;private String group;@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {Map<String, Object> props = new HashMap<>();props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");props.put(StreamsConfig.RETRIES_CONFIG, 10);props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());return new KafkaStreamsConfiguration(props);}
}

修改application.yml文件,在最下方添加自定义配置

kafka:hosts: 192.168.200.130:9092group: ${spring.application.name}

(2)新增配置类,创建KStream对象,进行聚合

package com.heima.kafka.stream;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.time.Duration;
import java.util.Arrays;@Configuration
@Slf4j
public class KafkaStreamHelloListener {@Beanpublic KStream<String,String> kStream(StreamsBuilder streamsBuilder){//创建kstream对象,同时指定从那个topic中接收消息KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {@Overridepublic Iterable<String> apply(String value) {return Arrays.asList(value.split(" "));}})//根据value进行聚合分组.groupBy((key,value)->value)//聚合计算时间间隔.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))//求单词的个数.count().toStream()//处理后的结果转换为string字符串.map((key,value)->{System.out.println("key:"+key+",value:"+value);return new KeyValue<>(key.key().toString(),value.toString());})//发送消息.to("itcast-topic-out");return stream;}
}

测试:

​ 启动微服务,正常发送消息,可以正常接收到消息

3 app端热点文章计算

3.1 思路说明

在这里插入图片描述

3.2 功能实现

3.2.1 用户行为(阅读量,评论,点赞,收藏)发送消息,以阅读和点赞为例

①在heima-leadnews-behavior微服务中集成kafka生产者配置

修改nacos,新增内容

spring:application:name: leadnews-behaviorkafka:bootstrap-servers: 192.168.200.130:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer

②修改ApLikesBehaviorServiceImpl新增发送消息

定义消息发送封装类:UpdateArticleMess

package com.heima.model.mess;import lombok.Data;@Data
public class UpdateArticleMess {/*** 修改文章的字段类型*/private UpdateArticleType type;/*** 文章ID*/private Long articleId;/*** 修改数据的增量,可为正负*/private Integer add;public enum UpdateArticleType{COLLECTION,COMMENT,LIKES,VIEWS;}
}

topic常量类:

package com.heima.common.constants;public class HotArticleConstants {public static final String HOT_ARTICLE_SCORE_TOPIC="hot.article.score.topic";}

完整代码如下:

package com.heima.behavior.service.impl;import com.alibaba.fastjson.JSON;
import com.heima.behavior.service.ApLikesBehaviorService;
import com.heima.common.constants.BehaviorConstants;
import com.heima.common.constants.HotArticleConstants;
import com.heima.common.redis.CacheService;
import com.heima.model.behavior.dtos.LikesBehaviorDto;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.common.enums.AppHttpCodeEnum;
import com.heima.model.mess.UpdateArticleMess;
import com.heima.model.user.pojos.ApUser;
import com.heima.utils.thread.AppThreadLocalUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;@Service
@Transactional
@Slf4j
public class ApLikesBehaviorServiceImpl implements ApLikesBehaviorService {@Autowiredprivate CacheService cacheService;@Autowiredprivate KafkaTemplate<String,String> kafkaTemplate;@Overridepublic ResponseResult like(LikesBehaviorDto dto) {//1.检查参数if (dto == null || dto.getArticleId() == null || checkParam(dto)) {return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);}//2.是否登录ApUser user = AppThreadLocalUtil.getUser();if (user == null) {return ResponseResult.errorResult(AppHttpCodeEnum.NEED_LOGIN);}UpdateArticleMess mess = new UpdateArticleMess();mess.setArticleId(dto.getArticleId());mess.setType(UpdateArticleMess.UpdateArticleType.LIKES);//3.点赞  保存数据if (dto.getOperation() == 0) {Object obj = cacheService.hGet(BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString());if (obj != null) {return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID, "已点赞");}// 保存当前keylog.info("保存当前key:{} ,{}, {}", dto.getArticleId(), user.getId(), dto);cacheService.hPut(BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString(), JSON.toJSONString(dto));mess.setAdd(1);} else {// 删除当前keylog.info("删除当前key:{}, {}", dto.getArticleId(), user.getId());cacheService.hDelete(BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString());mess.setAdd(-1);}//发送消息,数据聚合kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC,JSON.toJSONString(mess));return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);}/*** 检查参数** @return*/private boolean checkParam(LikesBehaviorDto dto) {if (dto.getType() > 2 || dto.getType() < 0 || dto.getOperation() > 1 || dto.getOperation() < 0) {return true;}return false;}
}

③修改阅读行为的类ApReadBehaviorServiceImpl发送消息

完整代码:

package com.heima.behavior.service.impl;import com.alibaba.fastjson.JSON;
import com.heima.behavior.service.ApReadBehaviorService;
import com.heima.common.constants.BehaviorConstants;
import com.heima.common.constants.HotArticleConstants;
import com.heima.common.redis.CacheService;
import com.heima.model.behavior.dtos.ReadBehaviorDto;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.common.enums.AppHttpCodeEnum;
import com.heima.model.mess.UpdateArticleMess;
import com.heima.model.user.pojos.ApUser;
import com.heima.utils.thread.AppThreadLocalUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;@Service
@Transactional
@Slf4j
public class ApReadBehaviorServiceImpl implements ApReadBehaviorService {@Autowiredprivate CacheService cacheService;@Autowiredprivate KafkaTemplate<String,String> kafkaTemplate;@Overridepublic ResponseResult readBehavior(ReadBehaviorDto dto) {//1.检查参数if (dto == null || dto.getArticleId() == null) {return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);}//2.是否登录ApUser user = AppThreadLocalUtil.getUser();if (user == null) {return ResponseResult.errorResult(AppHttpCodeEnum.NEED_LOGIN);}//更新阅读次数String readBehaviorJson = (String) cacheService.hGet(BehaviorConstants.READ_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString());if (StringUtils.isNotBlank(readBehaviorJson)) {ReadBehaviorDto readBehaviorDto = JSON.parseObject(readBehaviorJson, ReadBehaviorDto.class);dto.setCount((short) (readBehaviorDto.getCount() + dto.getCount()));}// 保存当前keylog.info("保存当前key:{} {} {}", dto.getArticleId(), user.getId(), dto);cacheService.hPut(BehaviorConstants.READ_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString(), JSON.toJSONString(dto));//发送消息,数据聚合UpdateArticleMess mess = new UpdateArticleMess();mess.setArticleId(dto.getArticleId());mess.setType(UpdateArticleMess.UpdateArticleType.VIEWS);mess.setAdd(1);kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC,JSON.toJSONString(mess));return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);}
}
3.2.2 使用kafkaStream实时接收消息,聚合内容

①在leadnews-article微服务中集成kafkaStream (参考kafka-demo)

②定义实体类,用于聚合之后的分值封装

package com.heima.model.article.mess;import lombok.Data;@Data
public class ArticleVisitStreamMess {/*** 文章id*/private Long articleId;/*** 阅读*/private int view;/*** 收藏*/private int collect;/*** 评论*/private int comment;/*** 点赞*/private int like;
}

修改常量类:增加常量

package com.heima.common.constans;public class HotArticleConstants {public static final String HOT_ARTICLE_SCORE_TOPIC="hot.article.score.topic";public static final String HOT_ARTICLE_INCR_HANDLE_TOPIC="hot.article.incr.handle.topic";
}

③ 定义stream,接收消息并聚合

package com.heima.article.stream;import com.alibaba.fastjson.JSON;
import com.heima.common.constants.HotArticleConstants;
import com.heima.model.mess.ArticleVisitStreamMess;
import com.heima.model.mess.UpdateArticleMess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.time.Duration;@Configuration
@Slf4j
public class HotArticleStreamHandler {@Beanpublic KStream<String,String> kStream(StreamsBuilder streamsBuilder){//接收消息KStream<String,String> stream = streamsBuilder.stream(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC);//聚合流式处理stream.map((key,value)->{UpdateArticleMess mess = JSON.parseObject(value, UpdateArticleMess.class);//重置消息的key:1234343434   和  value: likes:1return new KeyValue<>(mess.getArticleId().toString(),mess.getType().name()+":"+mess.getAdd());})//按照文章id进行聚合.groupBy((key,value)->key)//时间窗口.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))/*** 自行的完成聚合的计算*/.aggregate(new Initializer<String>() {/*** 初始方法,返回值是消息的value* @return*/@Overridepublic String apply() {return "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0";}/*** 真正的聚合操作,返回值是消息的value*/}, new Aggregator<String, String, String>() {@Overridepublic String apply(String key, String value, String aggValue) {if(StringUtils.isBlank(value)){return aggValue;}String[] aggAry = aggValue.split(",");int col = 0,com=0,lik=0,vie=0;for (String agg : aggAry) {String[] split = agg.split(":");/*** 获得初始值,也是时间窗口内计算之后的值*/switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){case COLLECTION:col = Integer.parseInt(split[1]);break;case COMMENT:com = Integer.parseInt(split[1]);break;case LIKES:lik = Integer.parseInt(split[1]);break;case VIEWS:vie = Integer.parseInt(split[1]);break;}}/*** 累加操作*/String[] valAry = value.split(":");switch (UpdateArticleMess.UpdateArticleType.valueOf(valAry[0])){case COLLECTION:col += Integer.parseInt(valAry[1]);break;case COMMENT:com += Integer.parseInt(valAry[1]);break;case LIKES:lik += Integer.parseInt(valAry[1]);break;case VIEWS:vie += Integer.parseInt(valAry[1]);break;}String formatStr = String.format("COLLECTION:%d,COMMENT:%d,LIKES:%d,VIEWS:%d", col, com, lik, vie);System.out.println("文章的id:"+key);System.out.println("当前时间窗口内的消息处理结果:"+formatStr);return formatStr;}}, Materialized.as("hot-atricle-stream-count-001")).toStream().map((key,value)->{return new KeyValue<>(key.key().toString(),formatObj(key.key().toString(),value));})//发送消息.to(HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC);return stream;}/*** 格式化消息的value数据* @param articleId* @param value* @return*/public String formatObj(String articleId,String value){ArticleVisitStreamMess mess = new ArticleVisitStreamMess();mess.setArticleId(Long.valueOf(articleId));//COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0String[] valAry = value.split(",");for (String val : valAry) {String[] split = val.split(":");switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){case COLLECTION:mess.setCollect(Integer.parseInt(split[1]));break;case COMMENT:mess.setComment(Integer.parseInt(split[1]));break;case LIKES:mess.setLike(Integer.parseInt(split[1]));break;case VIEWS:mess.setView(Integer.parseInt(split[1]));break;}}log.info("聚合消息处理之后的结果为:{}",JSON.toJSONString(mess));return JSON.toJSONString(mess);}
}
3.2.3 重新计算文章的分值,更新到数据库和缓存中

①在ApArticleService添加方法,用于更新数据库中的文章分值

/*** 更新文章的分值  同时更新缓存中的热点文章数据* @param mess*/
public void updateScore(ArticleVisitStreamMess mess);

实现类方法

/*** 更新文章的分值  同时更新缓存中的热点文章数据* @param mess*/
@Override
public void updateScore(ArticleVisitStreamMess mess) {//1.更新文章的阅读、点赞、收藏、评论的数量ApArticle apArticle = updateArticle(mess);//2.计算文章的分值Integer score = computeScore(apArticle);score = score * 3;//3.替换当前文章对应频道的热点数据replaceDataToRedis(apArticle, score, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + apArticle.getChannelId());//4.替换推荐对应的热点数据replaceDataToRedis(apArticle, score, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + ArticleConstants.DEFAULT_TAG);}/*** 替换数据并且存入到redis* @param apArticle* @param score* @param s*/
private void replaceDataToRedis(ApArticle apArticle, Integer score, String s) {String articleListStr = cacheService.get(s);if (StringUtils.isNotBlank(articleListStr)) {List<HotArticleVo> hotArticleVoList = JSON.parseArray(articleListStr, HotArticleVo.class);boolean flag = true;//如果缓存中存在该文章,只更新分值for (HotArticleVo hotArticleVo : hotArticleVoList) {if (hotArticleVo.getId().equals(apArticle.getId())) {hotArticleVo.setScore(score);flag = false;break;}}//如果缓存中不存在,查询缓存中分值最小的一条数据,进行分值的比较,如果当前文章的分值大于缓存中的数据,就替换if (flag) {if (hotArticleVoList.size() >= 30) {hotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());HotArticleVo lastHot = hotArticleVoList.get(hotArticleVoList.size() - 1);if (lastHot.getScore() < score) {hotArticleVoList.remove(lastHot);HotArticleVo hot = new HotArticleVo();BeanUtils.copyProperties(apArticle, hot);hot.setScore(score);hotArticleVoList.add(hot);}} else {HotArticleVo hot = new HotArticleVo();BeanUtils.copyProperties(apArticle, hot);hot.setScore(score);hotArticleVoList.add(hot);}}//缓存到redishotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());cacheService.set(s, JSON.toJSONString(hotArticleVoList));}
}/*** 更新文章行为数量* @param mess*/
private ApArticle updateArticle(ArticleVisitStreamMess mess) {ApArticle apArticle = getById(mess.getArticleId());apArticle.setCollection(apArticle.getCollection()==null?0:apArticle.getCollection()+mess.getCollect());apArticle.setComment(apArticle.getComment()==null?0:apArticle.getComment()+mess.getComment());apArticle.setLikes(apArticle.getLikes()==null?0:apArticle.getLikes()+mess.getLike());apArticle.setViews(apArticle.getViews()==null?0:apArticle.getViews()+mess.getView());updateById(apArticle);return apArticle;}/*** 计算文章的具体分值* @param apArticle* @return*/
private Integer computeScore(ApArticle apArticle) {Integer score = 0;if(apArticle.getLikes() != null){score += apArticle.getLikes() * ArticleConstants.HOT_ARTICLE_LIKE_WEIGHT;}if(apArticle.getViews() != null){score += apArticle.getViews();}if(apArticle.getComment() != null){score += apArticle.getComment() * ArticleConstants.HOT_ARTICLE_COMMENT_WEIGHT;}if(apArticle.getCollection() != null){score += apArticle.getCollection() * ArticleConstants.HOT_ARTICLE_COLLECTION_WEIGHT;}return score;
}

②定义监听,接收聚合之后的数据,文章的分值重新进行计算

package com.heima.article.listener;import com.alibaba.fastjson.JSON;
import com.heima.article.service.ApArticleService;
import com.heima.common.constants.HotArticleConstants;
import com.heima.model.mess.ArticleVisitStreamMess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class ArticleIncrHandleListener {@Autowiredprivate ApArticleService apArticleService;@KafkaListener(topics = HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC)public void onMessage(String mess){if(StringUtils.isNotBlank(mess)){ArticleVisitStreamMess articleVisitStreamMess = JSON.parseObject(mess, ArticleVisitStreamMess.class);apArticleService.updateScore(articleVisitStreamMess);}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/302015.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

多线程原理详解01(程序、进程、线程介绍,线程创建的三种方式(Thread、Runnable、Callable)、三种方式各自实现多线程的具体操作、代码解析)

目录 多线程原理详解01_线程简介多任务多线程程序、进程、线程Process&#xff08;进程&#xff09;与 Thread &#xff08;线程&#xff09;总结&#xff1a; 02_线程创建三种方式&#xff1a;1、继承 Thread 类1-1&#xff1a;通过继承Thread类实现多线程演示代码 1-2&#x…

如果用大模型考公,kimi、通义千问谁能考高分?

都说大模型要超越人类了&#xff0c;今天就试试让kimi和通义千问做公务员考试题目&#xff0c;谁能考高分&#xff1f; 测评结果再次让人震惊&#xff01; 问题提干&#xff1a;大小两种规格的盒装鸡蛋&#xff0c;大盒装23个&#xff0c;小盒装16个&#xff0c;采购员小王买了…

LangChain入门:14.LLMChain:最简单的链的使用

摘要 本文将介绍LangChain库中LLMChain工具的使用方法。LLMChain将提示模板、语言模型&#xff08;LLM&#xff09;和输出解析器整合在一起&#xff0c;形成一个连贯的处理链&#xff0c;简化了与语言模型的交互过程。我们将探讨LLMChain的技术特点、应用场景以及它解决的问题…

Day84:服务攻防-端口协议桌面应用QQWPS等RCEhydra口令猜解未授权检测

目录 端口协议-口令爆破&未授权 弱口令爆破 FTP&#xff1a;文件传输协议 RDP&#xff1a;Windows远程桌面协议 SSH&#xff1a;Linux安全外壳协议 未授权案例(rsync) 桌面应用-QQ&WPS&Clash QQ RCE 漏洞复现 WPS RCE 漏洞复现 Clas* RCE 漏洞复现 知识点…

Android图形显示架构概览

图形显示系统作为Android系统核心的子系统&#xff0c;掌握它对于理解Android系统很有帮助&#xff0c;下面从整体上简单介绍图形显示系统的架构&#xff0c;如下图所示。 这个框架只包含了用户空间的图形组件&#xff0c;不涉及底层的显示驱动。框架主要包括以下4个图形组件。…

线程间的通信

文章目录 线程间的通讯技术就是通过等待和唤醒机制&#xff0c;来实现多个线程协同操作完成某一项任务&#xff0c;例如经典的生产者和消费者案例。等待唤醒机制其实就是让线程进入等待状态或者让线程从等待状态中唤醒&#xff0c;需要用到两种方法&#xff0c;如下&#xff1a…

如何使用群晖Synology Drive结合cpolar内网穿透实现同步Obsidian笔记文件

文章目录 一、简介软件特色演示&#xff1a; 二、使用免费群晖虚拟机搭建群晖Synology Drive服务&#xff0c;实现局域网同步1 安装并设置Synology Drive套件2 局域网内同步文件测试 三、内网穿透群晖Synology Drive&#xff0c;实现异地多端同步Windows 安装 Cpolar步骤&#…

【webrtc】源码下载与编译

目录 下载 下依赖 参考文章 &#xff1a; 下载 (1) windows ,centos上都会报错 &#xff08;2&#xff09; ubuntu A : 在git上设置代理 B fetch通过 ubuntu的界面 proxy设置了代理 这将会拉取webRTC源码&#xff0c;且额外加了android相关的依赖&#xff0c;例如And…

go语言学习--3.常用语句

目录 1.条件语句 1.1 if语句 1.2 if-else语句 1.3 switch语句 1.4 select语句 2.循环语句 2.1循环处理语句 2.2循环控制语句 3.go语言关键字 1.条件语句 和c语言类似&#xff0c;相关的条件语句如下表所示&#xff1a; 1.1 if语句 if 布尔表达式 {/* 在布尔表达式为 t…

关于JVM-三色标记算法剖析

相关系列 深入理解JVM垃圾收集器-CSDN博客 深入理解JVM垃圾收集算法-CSDN博客 深入理解jvm执行引擎-CSDN博客 jvm优化原则-CSDN博客 jvm流程图-CSDN博客 三色标记产生的原因&#xff1f; 在并发标记的过程中&#xff0c;因为标记期间应用线程还在继续跑&#xff0c;对象间的引…

postgresql发布和订阅

一、发布订阅介绍 发布和订阅使用了pg的逻辑复制的功能&#xff0c;通过发布端创建publication与表绑定&#xff0c;订阅端创建subscription同时会在发布端创建逻辑复制槽实现逻辑复制功能 逻辑复制基于 发布&#xff08;Publication&#xff09; 与 订阅&#xff08;Subscri…

使用aspose相关包将excel转成pdf 并导出

SpringBoot 项目 基于aspose相关jar包 将excel 转换成pdf 导出 1、依赖的jar包 &#xff0c; jar获取链接 aspose相关三方jar &#xff0c;下载解压后,在项目路径下建一个libs包&#xff0c;然后将下图两个jar 拷贝至刚新建的libs目录中 2、pom.xml中加入maven引入 <depend…

服务器 安装1Panel服务器运维管理面板

服务器 安装1Panel服务器运维管理面板 SSH链接服务器安装1Panel 出现此提示时输入目标路径&#xff0c;须以“/”开头&#xff0c;默认&#xff1a;/opt&#xff0c;本例&#xff1a;/www。 出现此提示时输入目标端口&#xff0c;须未被使用的端口&#xff0c;默认&#xff1…

电力变压器数据集介绍和预处理

1 电力变压器数据集介绍 1.1 数据背景 在这个Github仓库中&#xff0c;作者提供了几个可以用于长序列时间序列问题的数据集。所有数据集都经过了预处理&#xff0c;并存储为.csv文件。数据集的范围从2016/07到2018/07。 ETT-small: 含有2个电力变压器&#xff08;来自2个站点…

pringboot2集成swagger2出现guava的FluentIterable方法不存在

错误信息 Description: An attempt was made to call a method that does not exist. The attempt was made from the following location: springfox.documentation.spring.web.scanners.ApiListingScanner.scan(ApiListingScanner.java:117) The following method did not ex…

Qt 中的项目文件解析和命名规范

&#x1f40c;博主主页&#xff1a;&#x1f40c;​倔强的大蜗牛&#x1f40c;​ &#x1f4da;专栏分类&#xff1a;QT❤️感谢大家点赞&#x1f44d;收藏⭐评论✍️ 目录 一、Qt项目文件解析 1、.pro 文件解析 2、widget.h 文件解析 3、main.cpp 文件解析 4、widget.cpp…

uniapp引入微信小程序版本VantUI,使用VantUI的自定义tabbar,并解决自定义tabbar出现闪烁的情况

视频教程地址&#xff1a;https://www.bilibili.com/video/BV13m41167iG 1.uniapp引入微信小程序版本VantUI 去vant官网下载源码&#xff0c;源码放在github&#xff0c;自行去下载下来 https://vant-contrib.gitee.io/vant-weapp/#/home 在pages.json的globalStyle里面注册组…

k8s的ca以及相关证书签发流程

k8s的ca以及相关证书签发流程 1. kube-apiserver相关证书说明2. 生成CA凭证1.1. 生成CA私钥1.2. 生成CA证书 2. 生成kube-apiserver凭证2.1. 生成kube-apiserver私钥2.2. 生成kube-apiserver证书请求2.3. 生成kube-apiserver证书 3. 疑问和思考4. 参考文档 对于网站类的应用&am…

中颖51芯片学习3. 定时器

中颖51芯片学习3. 定时器 一、SH79F9476定时器简介1. 简介2. 定时器运行模式 二、定时器21. 说明&#xff08;1&#xff09;时钟&#xff08;2&#xff09;工作模式 2. 寄存器&#xff08;1&#xff09;控制寄存器 T2CON&#xff08;2&#xff09;定时器2模式控制寄存器 T2MOD …

thinkphp6使用阿里云SDK发送短信

使用composer安装sdk "alibabacloud/dysmsapi-20170525": "2.0.24"封装发送短信类 发送到的短信参数写在env文件里面的 #发送短信配置 [AliyunSms] AccessKeyId "" AccessKeySecret "" signName"" templateCode"&…