JavaWeb_LeadNews_Day11-KafkaStream实现实时计算文章分数
- KafkaStream
- 概述
- 案例-统计单词个数
- SpringBoot集成
- 实时计算文章分值
- 来源
- Gitee
KafkaStream
概述
- Kafka Stream: 提供了对存储与Kafka内的数据进行流式处理和分析的功能
- 特点:
- Kafka Stream提供了一个非常简单而轻量的Library, 它可以非常方便地嵌入任意Java应用中, 也可以任意方式打包和部署
- 除了Kafka外, 无任何外部依赖
- 通过可容错地state, store实现高效地状态操作(如windowed join和aggregation)
- 支持基于事件时间地窗口操作, 并且可处理晚到的数据(late arrival of records)
- 关键概念:
- 源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器.
- Sink处理器: sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题
- KStream:
- 数据结构类似于map, key-value键值对.
- 一段顺序的, 无限长, 不断更新的数据集.
案例-统计单词个数
- 依赖
依赖中有排除部分依赖, 还是一整个放上来好了<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- kafkfa --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><exclusions><exclusion><artifactId>connect-json</artifactId><groupId>org.apache.kafka</groupId></exclusion><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions></dependency> </dependencies>
- 流式处理
public class KafkaStreamQuickStart {public static void main(String[] args) {// Kafka的配置信息Properties prop = new Properties();prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.174.133:9092");prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-quickstart");// Stream 构建器StreamsBuilder streamsBuilder = new StreamsBuilder();// 流式计算streamProcessor(streamsBuilder);// 创建KafkaStream对象KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), prop);// 开启流式计算kafkaStreams.start();}/*** 流式计算* 消息的内容: hello kafka* @param streamsBuilder*/private static void streamProcessor(StreamsBuilder streamsBuilder) {// 创建Kstream对象, 同时指定从哪个topic中接收消息KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");/*** 处理消息的value*/stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {@Overridepublic Iterable<String> apply(String value) {String[] valAry = value.split(" ");return Arrays.asList(valAry);}})// 按照value聚合处理.groupBy((key, value)->value)// 时间窗口.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))// 统计单词的个数.count()// 转换为KStream.toStream().map((key, value)->{System.out.println("key:"+key+",value:"+value);return new KeyValue<>(key.key().toString(), value.toString());})// 发送消息.to("itcast-topic-out");} }
- 发送消息
for (int i = 0; i < 5; i++) {ProducerRecord<String, String> kvProducerRecord = new ProducerRecord<>("icast-topic-input", "hello kafka");producer.send(kvProducerRecord); }
- 接收消息
// 订阅主题 consumer.subscribe(Collections.singleton("itcast-topic-out"));
SpringBoot集成
- 配置
config.java
application.yml/*** 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数*/ @Setter @Getter @Configuration @EnableKafkaStreams @ConfigurationProperties(prefix="kafka") public class KafkaStreamConfig {private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;private String hosts;private String group;@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {Map<String, Object> props = new HashMap<>();props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");props.put(StreamsConfig.RETRIES_CONFIG, 10);props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());return new KafkaStreamsConfiguration(props);} }
BeanConfig.javakafka:hosts: 192.168.174.133:9092group: ${spring.application.name}
@Slf4j @Configuration public class KafkaStreamHelloListener {@Beanpublic KStream<String, String> KStream(StreamsBuilder streamsBuilder){// 创建Kstream对象, 同时指定从哪个topic中接收消息KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");/*** 处理消息的value*/stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {@Overridepublic Iterable<String> apply(String value) {String[] valAry = value.split(" ");return Arrays.asList(valAry);}})// 按照value聚合处理.groupBy((key, value)->value)// 时间窗口.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))// 统计单词的个数.count()// 转换为KStream.toStream().map((key, value)->{System.out.println("key:"+key+",value:"+value);return new KeyValue<>(key.key().toString(), value.toString());})// 发送消息.to("itcast-topic-out");return stream;} }
实时计算文章分值
- nacos: leadnews-behavior配置kafka生产者
kafka:bootstrap-servers: 192.168.174.133:9092producer:retries: 10key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer
- 修改leadnews-behavior
like
viewpublic ResponseResult likesBehavior(LikesBehaviorDto dto) {...UpdateArticleMess mess = new UpdateArticleMess();mess.setArticleId(dto.getArticleId());mess.setType(UpdateArticleMess.UpdateArticleType.LIKES);if(dto.getOperation() == 0){...mess.setAdd(1);}else{...mess.setAdd(-1);}// kafka: 发送消息, 数据聚合kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC, JSON.toJSONString(mess));... }
public ResponseResult readBehavior(ReadBehaviorDto dto) {...// kafka: 发送消息, 数据聚合UpdateArticleMess mess = new UpdateArticleMess();mess.setArticleId(dto.getArticleId());mess.setType(UpdateArticleMess.UpdateArticleType.VIEWS);mess.setAdd(1);kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC, JSON.toJSONString(mess));... }
- leadnews-article中添加流式聚合处理
@Slf4j @Configuration public class HotArticleStreamHandler {@Beanpublic KStream<String, String> KStream(StreamsBuilder streamsBuilder){// 接收消息KStream<String, String> stream = streamsBuilder.stream(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC);// 聚合流式处理stream.map((key, value)->{UpdateArticleMess mess = JSON.parseObject(value, UpdateArticleMess.class);// 重置消息的key和valuereturn new KeyValue<>(mess.getArticleId().toString(), mess.getType().name()+":"+mess.getAdd());})// 根据文章id进行聚合.groupBy((key, value)->key)// 时间窗口.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))// 自行实现聚合计算.aggregate(// 初始方法, 返回值是消息的valuenew Initializer<String>() {@Overridepublic String apply() {return "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0";}},// 真正的聚合操作, 返回值是消息的valuenew Aggregator<String, String, String>() {@Overridepublic String apply(String key, String value, String aggValue) {if(StringUtils.isBlank(value)){return aggValue;}String[] aggAry = aggValue.split(",");int col=0, com=0, lik=0, vie=0;for (String agg : aggAry) {String[] split = agg.split(":");/*** 获得初始值, 也是时间窗口内计算之后的值*/switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){case COLLECTION:col = Integer.parseInt(split[1]);break;case COMMENT:com = Integer.parseInt(split[1]);break;case LIKES:lik = Integer.parseInt(split[1]);break;case VIEWS:vie = Integer.parseInt(split[1]);break;}}// 累加操作String[] valAry = value.split(":");switch (UpdateArticleMess.UpdateArticleType.valueOf(valAry[0])){case COLLECTION:col += Integer.parseInt(valAry[1]);break;case COMMENT:com += Integer.parseInt(valAry[1]);break;case LIKES:lik += Integer.parseInt(valAry[1]);break;case VIEWS:vie += Integer.parseInt(valAry[1]);break;}String formatStr = String.format("COLLECTION:%d,COMMENT:%d,LIKES:%d,VIEWS:%d", col, com, lik, vie);System.out.println("文章的id "+key);System.out.println("当前时间窗口内的消息处理结果: "+formatStr);return formatStr;}}, Materialized.as("hot-article-stream-count-001")).toStream().map((key, value)->{return new KeyValue<>(key.key().toString(), formatObj(key.key().toString(), value));})// 发送消息.to(HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC);return stream;}/*** 格式化消息的value数据* @param articleId* @param value* @return*/private String formatObj(String articleId, String value) {ArticleVisitStreamMess mess = new ArticleVisitStreamMess();mess.setArticleId(Long.valueOf(articleId));// COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0String[] valAry = value.split(",");for (String val : valAry) {String[] split = val.split(":");switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])){case COLLECTION:mess.setCollect(Integer.parseInt(split[1]));break;case COMMENT:mess.setComment(Integer.parseInt(split[1]));break;case LIKES:mess.setLike(Integer.parseInt(split[1]));break;case VIEWS:mess.setView(Integer.parseInt(split[1]));break;}}log.info("聚合消息处理之后的结果为: {}", JSON.toJSONString(mess));return JSON.toJSONString(mess);}}
- leadnews-article添加聚合数据监听器
@Slf4j @Component public class ArticleIncrHandlerListener {@Autowiredprivate ApArticleService apArticleService;@KafkaListener(topics = HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC)public void onMessage(String mess){if(StringUtils.isNotBlank(mess)){ArticleVisitStreamMess articleVisitStreamMess = JSON.parseObject(mess, ArticleVisitStreamMess.class);apArticleService.updateScore(articleVisitStreamMess);System.out.println(mess);}}}
- 替换redis中的热点数据
/*** 更新 文章分值, 缓存中的热点文章数据* @param mess*/ @Override public void updateScore(ArticleVisitStreamMess mess) {// 1. 更新文章的阅读, 点赞, 收藏, 评论的数量ApArticle apArticle = updateArticleBehavior(mess);// 2. 计算文章的分值Integer score = hotArticleService.computeArticleScore(apArticle);score *= 3;// 3. 替换当前文章对应频道的热点数据replaceDataToRedis(ArticleConstants.HOT_ARTICLE_FIRST_PAGE + apArticle.getChannelId(), apArticle, score);// 4. 替换推荐对应的热点数据replaceDataToRedis(ArticleConstants.HOT_ARTICLE_FIRST_PAGE + ArticleConstants.DEFAULT_TAG, apArticle, score); }/*** 替换数据并且存入到redis中* @param HOT_ARTICLE_FIRST_PAGE* @param apArticle* @param score*/ private void replaceDataToRedis(String HOT_ARTICLE_FIRST_PAGE, ApArticle apArticle, Integer score) {String articleList = cacheService.get(HOT_ARTICLE_FIRST_PAGE);if(StringUtils.isNotBlank(articleList)){List<HotArticleVo> hotArticleVoList = JSON.parseArray(articleList, HotArticleVo.class);boolean flag = true;// 3.1 文章已是热点文章, 则更新文章分数for (HotArticleVo hotArticleVo : hotArticleVoList) {if(hotArticleVo.getId().equals(apArticle.getId())){hotArticleVo.setScore(score);flag = false;break;}}// 3.2 文章还不是热点文章, 则替换分数最小的文章if(flag){if(hotArticleVoList.size() >= 30){// 热点文章超过30条hotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());HotArticleVo lastHot = hotArticleVoList.get(hotArticleVoList.size() - 1);if(lastHot.getScore() < score){hotArticleVoList.remove(lastHot);HotArticleVo hotArticleVo = new HotArticleVo();BeanUtils.copyProperties(apArticle, hotArticleVo);hotArticleVo.setScore(score);hotArticleVoList.add(hotArticleVo);}}else{// 热点文章没超过30条HotArticleVo hotArticleVo = new HotArticleVo();BeanUtils.copyProperties(apArticle, hotArticleVo);hotArticleVo.setScore(score);hotArticleVoList.add(hotArticleVo);}}// 3.3 缓存到redishotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());cacheService.set(HOT_ARTICLE_FIRST_PAGE, JSON.toJSONString(hotArticleVoList));} }/*** 更新文章行为数据* @param mess*/ private ApArticle updateArticleBehavior(ArticleVisitStreamMess mess) {ApArticle apArticle = getById(mess.getArticleId());apArticle.setComment(apArticle.getComment()==null?0:apArticle.getComment()+mess.getComment());apArticle.setCollection(apArticle.getCollection()==null?0:apArticle.getCollection()+mess.getCollect());apArticle.setLikes(apArticle.getLikes()==null?0:apArticle.getLikes()+mess.getLike());apArticle.setViews(apArticle.getViews()==null?0:apArticle.getViews()+mess.getView());updateById(apArticle);return apArticle; }
来源
黑马程序员. 黑马头条
Gitee
https://gitee.com/yu-ba-ba-ba/leadnews