20221124 kafka实时数据写入Redis

一、上线结论

  • 实现了将用户线上实时浏览的沉浸式视频信息,保存在Redis中这样一个功能。
  • 为实现沉浸式视频离线推荐到实时推荐提供了强有力的支持。目前只是应用在沉浸式场景,后续也能扩展到其他所有场景。
  • 用于两个场景:(1)根据用户近期观看物料匹配相似物料(2)过滤用户近期观看物料

二、实现效果展示

用户在线上刷一个视频,redis就会将用户的视频信息保存在用户历史浏览的队列中。

队列大小为100。具体保存的信息如下所示:

一、Redis存储KEY:kafka:user_short_video_streaming:_5c91e0cf0cf2f3d119f92774
二、Redis存储value:[{"duration":4,"resourceId":"28808","appType":"DOCTOR","actionCode":"1006","resourceType":"VIDEO"}, {"duration":9,"resourceId":"24262","appType":"DOCTOR","actionCode":"1006","resourceType":"VIDEO"}, {"duration":5,"resourceId":"25330","appType":"DOCTOR","actionCode":"1006","resourceType":"VIDEO"}]

三、实现策略

  • 采用Java语言实现,
  • 先监听kafka,然后解析kafka消息,进行解码,再解析,从解析后的结果中提取user_id, resource_id和resource_type字段。
  • 连接Redis,构造用户队列,队列长度设置为100(用户刷的视频个数),将数据写入Redis
  • 队列大小为100,超过100顺序pop

代码Git:http://gitlab.dzj.com/applied_algorithm/data_analysis/kafka_streaming_immersive.git

四、项目后续规划

  • 扩展到Feed流,搜索召回等全部的场景
  • jar包后台运行方式改为CICD部署

五、附录

5.1 BUG分享

在实现的过程中,遇到一个序列化问题,就是写入的key和value乱码,导致用Python查询的定义好的KEY的时候查询不到,解决方案如下:

自定义RedisTemplete进行重写,  用jackson进行序列化,将这个类注册到Spring Boot中

 折叠源码

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

package com.dzj.kafka_streaming.Config;

import com.fasterxml.jackson.annotation.JsonAutoDetect;

import com.fasterxml.jackson.annotation.PropertyAccessor;

import com.fasterxml.jackson.databind.ObjectMapper;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.data.redis.connection.RedisConnectionFactory;

import org.springframework.data.redis.core.RedisTemplate;

import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;

import org.springframework.data.redis.serializer.StringRedisSerializer;

/**

 * @Author : wangyongpeng

 * @Date : 2022/12/16 14:34

 * @Description : 重写RedisTemplate, 进行序列化

 */

@Configuration

public class RedisConfig {

    @Bean

    @SuppressWarnings("all")

    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {

        RedisTemplate<String, Object> template = new RedisTemplate();

        template.setConnectionFactory(redisConnectionFactory);

        // JSON序列化配置

        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);

        ObjectMapper om = new ObjectMapper();

        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);

        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);

        jackson2JsonRedisSerializer.setObjectMapper(om);

        // String 的序列化

        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();

        // key 采用String的序列化方式

        template.setKeySerializer(stringRedisSerializer);

        // hash的key也采用String的序列化方式

        template.setHashKeySerializer(stringRedisSerializer);

        // valuex序列化方式采用jackson

        template.setValueSerializer(jackson2JsonRedisSerializer);

        // hash的序列化也用jackson

        template.setHashValueSerializer(jackson2JsonRedisSerializer);

        template.afterPropertiesSet();

        return template;

    }

}

5.2 项目核心代码

 折叠源码

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

package com.dzj.kafka_streaming.listener;

import com.dzj.kafka_streaming.dto.TagNameTypeInfo;

import com.dzj.kafka_streaming.service.ContentTagRelationService;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.data.redis.core.RedisTemplate;

import org.springframework.kafka.annotation.KafkaListener;

import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSONObject;

import javax.annotation.Resource;

import java.util.ArrayList;

import java.util.Base64;

import java.util.List;

/**

 * "immersive_streaming_" + userId; 这是旧的key,需要清除

 */

@Component

public class MessageListener {

    @Autowired

    private ContentTagRelationService relationService;

    @Resource

    private RedisTemplate<String, Object> redisTemplate;

    private final String TOPIC_NAME = "event-trace-log";

    // @KafkaListener(topics = {TOPIC_NAME},groupId = "itmentuGroup")

    @KafkaListener(topics = {TOPIC_NAME})

    public void listener(ConsumerRecord<String,String> record)  {

        //获取消息

        String message = record.value();

        //消息偏移量

        long offset = record.offset();

        String redisKeyPrefix = "kafka:user_short_video_streaming:_";

        JSONObject dataJson = parseJson(message);

        String eventCode = dataJson.getString("eventCode");

        if ("145001".equals(eventCode)){

            // 测试环境------------------------------------------------------------------------------------------

            // 目前只关注沉浸式中得数据

            String resourceId = dataJson.getJSONObject("eventBody").getString("resourceId");

            String resourceType = dataJson.getJSONObject("eventBody").getString("resourceType");

            Integer duration = dataJson.getJSONObject("eventBody").getInteger("duration");

            String actionCode = dataJson.getJSONObject("eventBody").getString("actionCode");

            String userId = dataJson.getJSONObject("eventBody").getString("userId");

            String appType = dataJson.getJSONObject("eventBody").getString("appType");

            // System.out.println("________kafka msg: eventCode = " + eventCode + "eventBody = " + dataJson.getJSONObject("eventBody"));

            /**

             * 写入Redis

             * redis存储结构: key = List(5),是一个定长为5,右进左出的队列

             * 首先查询该key的list长度,如果长度超过5,就先左边出队列一个,再右边进一个,否则右边进一个

             */

            String key = redisKeyPrefix + userId;

    //        String key = "immersive_streaming_wyp0001";

            // 定义Redis队列写入的结构

            JSONObject redisListItem = new JSONObject();

            redisListItem.put("resourceId",resourceId);

            redisListItem.put("resourceType",resourceType);

            redisListItem.put("duration",duration);

            redisListItem.put("actionCode",actionCode);

            redisListItem.put("appType",appType);

            String redisListItemString = redisListItem.toJSONString();

            if (redisTemplate.opsForList().size(key) >= 100){

                Object leftPop = redisTemplate.opsForList().leftPop(key);

                redisTemplate.opsForList().rightPush(key, redisListItemString);

                System.out.println("[pop]redis key : "+ redisKeyPrefix + userId + " now contains:  "+ redisTemplate.opsForList().range(key,0, -1));

            }else {

                if (!resourceId.isEmpty() && !resourceType.isEmpty()){

                    redisTemplate.opsForList().rightPush(key, redisListItemString);

                    Long size = redisTemplate.opsForList().size(key);

                    System.out.println("redis key : "+ redisKeyPrefix + userId + " pushed one:  "+ size + redisListItemString);

                    System.out.println("redis key : "+ redisKeyPrefix + userId + " now contains:  "+ redisTemplate.opsForList().range(key,0, -1));

                }

            }

        }

    }

     

    /**

     * 解析json,解码功能

     */

    public JSONObject parseJson(String message) {

        JSONObject messageJson = JSONObject.parseObject(message);

        String dataString = messageJson.getString("data");

        // --------------------base64解码字符串--------------------

        String data_string = "";

        final Base64.Decoder decoder = Base64.getDecoder();

        try{

            data_string = new String(decoder.decode(dataString), "UTF-8");

        }catch (Exception e){

            System.out.println("【kafka parseJson ERROR】com.dzj.kafka_streaming.listener.MessageListener.parseJson" + e);

        }

        // string转换为json,只取eventCode = '145001'沉浸式的

        JSONObject dataJson = JSONObject.parseObject(data_string);

        return dataJson;

    }

    /**

     * 从数据库查询

     * @param resourceId

     * @param resourceType

     * @return

     */

    public List<TagNameTypeInfo>  queryByIdAndType(String resourceId, String resourceType ){

        List<TagNameTypeInfo> tagNameTypeInfos = new ArrayList<>();

        try {

            tagNameTypeInfos = relationService.queryTagNameTypeInfo(Long.valueOf(resourceId), resourceType);

        catch (Exception e){

            System.out.println("【ERROR】" + resourceId + "&" + resourceType + "在数据库中查询不到.......");

        }

        return tagNameTypeInfos;

    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/289816.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Apache Hive的基本使用语法(二)

Hive SQL操作 7、修改表 表重命名 alter table score4 rename to score5;修改表属性值 # 修改内外表属性 ALTER TABLE table_name SET TBLPROPERTIES("EXTERNAL""TRUE"); # 修改表注释 ALTER TABLE table_name SET TBLPROPERTIES (comment new_commen…

掌握Flutter底部导航栏:畅游导航之旅

1. 引言 在移动应用开发中&#xff0c;底部导航栏是一种常见且非常实用的用户界面元素。它提供了快速导航至不同功能模块或页面的便捷方式&#xff0c;使用户可以轻松访问应用程序的各个部分。在Flutter中&#xff0c;底部导航栏也是一项强大的功能&#xff0c;开发者可以利用…

寄主机显示器被快递搞坏了怎么办?怎么破?

大家好&#xff0c;我是平泽裕也。 最近&#xff0c;我在社区里看到很多关于开学后弟弟寄来的电脑显示器被快递损坏的帖子。 看到它真的让我感到难过。 如果有人的数码产品被快递损坏了&#xff0c;我会伤心很久。 那么今天就跟大家聊聊寄快递的一些小技巧。 作为一名曾经的…

在scroll-view中使用input,input键盘弹出时,滚动页面,输入框内容会出现错位问题?

解决办法 <view classpages><view><scroll-view scroll-y"{{sysScroll}}" scroll-top"{{scrollTop}}" class"scroll-hei-2 bg-def">...<input bindfocus"onfocus" bindblur"onblur" placeholder&quo…

如何在Apache Arrow中定位与解决问题

如何在apache Arrow定位与解决问题 最近在执行sql时做了一些batch变更&#xff0c;出现了一个 crash问题&#xff0c;底层使用了apache arrow来实现。本节将会从0开始讲解如何调试STL源码crash问题&#xff0c;在这篇文章中以实际工作中resize导致crash为例&#xff0c;引出如何…

车载以太网AVB交换机 gptp透明时钟 8口 千兆/百兆可切换 SW1100TR

SW1100车载以太网交换机 一、产品简要分析 8端口千兆和百兆混合车载以太网交换机&#xff0c;其中包含2个通道的1000BASE-T1采用罗森博格H-MTD接口&#xff0c;5通道100BASE-T1泰科MATEnet接口和1个通道1000BASE-T标准以太网(RJ45接口)&#xff0c;可以实现车载以太网多通道交…

利用R语言和curl库实现网页爬虫的技术要点解析

R语言简介 R语言是一种自由、跨平台的编程语言和软件环境&#xff0c;专门用于统计计算和数据可视化。它具有丰富的数据处理、统计分析和图形展示功能&#xff0c;被广泛应用于数据科学、机器学习、统计建模等领域。 R语言技术优势 丰富的数据处理功能&#xff1a; R语言拥有…

入门指南|营销中人工智能生成内容的主要类型 [新数据、示例和技巧]

由于人工智能技术的进步&#xff0c;内容生成不再是一项令人头疼的任务。随着人工智能越来越多地接管手动内容制作任务&#xff0c;营销人员明智的做法是了解现有的不同类型的人工智能生成内容&#xff0c;以及哪些内容从中受益最多。这些工具可以帮助我们制作对您的受众和品牌…

RabbitMQ3.x之四_RabbitMQ角色说明及创建用户与授权

RabbitMQ3.x之四_角色说明及创建用户与授权 文章目录 RabbitMQ3.x之四_角色说明及创建用户与授权1. 访问和授权1. Tags说明2. 命令行示例 2. 管理界面新建用户及访问授权1. 管理界面新建用户2. 管理界面中的授权说明3. guest用户不能远程登录提示 3. 创建用户1. 基本命令2. 实际…

P8623 [蓝桥杯 2015 省 B] 移动距离 Python

[蓝桥杯 2015 省 B] 移动距离 题目描述 X 星球居民小区的楼房全是一样的&#xff0c;并且按矩阵样式排列。其楼房的编号为 $1,2,3, \cdots $ 。 当排满一行时&#xff0c;从下一行相邻的楼往反方向排号。 比如&#xff1a;当小区排号宽度为 6 6 6 时&#xff0c;开始情形如…

数据结构——链表(练习题)

大家好&#xff0c;我是小锋我们继续来学习链表。 我们在上一节已经把链表中的单链表讲解完了&#xff0c;大家感觉怎么样我们今天来带大家做一些练习帮助大家巩固所学内容。 1. 删除链表中等于给定值 val 的所有结点 . - 力扣&#xff08;LeetCode&#xff09; 我们大家来分…

网站为什么要选择使用安全加速SCDN?

安全加速SCDN&#xff08;安全内容交付网络&#xff09;是一种网络加速服务&#xff0c;旨在提高网站和应用程序的性能和安全性。它使用专门的技术和基础设施来加速内容传输并保护网站免受网络攻击。 安全加速SCDN可以通过内容缓存、快速传输和动态路由技术来加速网站和应用程…

SpringMVC第一个helloword项目

文章目录 前言一、SpringMVC是什么&#xff1f;二、使用步骤1.引入库2.创建控制层3.创建springmvc.xml4.配置web.xml文件5.编写视图页面 总结 前言 提示&#xff1a;这里可以添加本文要记录的大概内容&#xff1a; SpringMVC 提示&#xff1a;以下是本篇文章正文内容&#xf…

【ML】类神经网络训练不起来怎么办 5

【ML】类神经网络训练不起来怎么办 5 1. Saddle Point V.S. Local Minima(局部最小值 与 鞍点)2. Tips for training: Batch and Momentum(批次与 动量)2.1 Tips for training: Batch and Momentum2.2 参考文献:2.3 Gradient Descent2.4 Concluding Remarks(前面三讲)3.…

C# OpenCvSharp 轮廓检测

目录 效果 代码 下载 效果 代码 using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Linq; using System.Text; using System.Windows.Forms; using OpenCvSharp; using OpenCvSharp.…

Git 命令总览

Git Git 是一个版本控制系统&#xff0c;用于管理项目代码。通过 Git 可以轻松地进行代码的提交、更新和合并&#xff0c;确保项目代码的安全性和稳定性。同时&#xff0c;Git 还提供了丰富的工具和功能&#xff0c;如分支管理、代码审查、版本回退等&#xff0c;帮助开发更好…

大模型 智能体 智能玩具 智能音箱 构建教程 wukong-robot

视频演示 10:27 一、背景 继上文《ChatGPT+小爱音响能擦出什么火花?》可以看出大伙对AI+硬件的结合十分感兴趣,但上文是针对市场智能音响的AI植入,底层是通过轮询拦截,算是hack兼容,虽然官方有提供开发者接口,也免不了有许多局限性(比如得通过特定指令唤醒),不利于我…

【Web应用技术基础】CSS(6)——使用 HTML/CSS 实现 Educoder 顶部导航栏

第一题&#xff1a;使用flex布局实现Educoder顶部导航栏容器布局 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>Educoder</title><script src"https://cdn.staticfile.org/jquery/1.1…

C/C++语言学习路线: 嵌入式开发、底层软件、操作系统方向(持续更新)

初级&#xff1a;用好手上的锤子 1 【感性】认识 C 系编程语言开发调试过程 1.1 视频教程点到为止 1.2 炫技视频看看就行 1.3 编程游戏不玩也罢 有些游戏的主题任务就是编程&#xff0c;游戏和实际应用环境有一定差异&#xff08;工具、操作流程&#xff09;&#xff0c;在…

Unity AI Navigation自动寻路

目录 前言一、Unity中AI Navigation是什么&#xff1f;二、使用步骤1.安装AI Navigation2.创建模型和材质3.编写向目标移动的脚本4.NavMeshLink桥接组件5.NavMeshObstacle组件6.NavMeshModifler组件 三、效果总结 前言 Unity是一款强大的游戏开发引擎&#xff0c;而人工智能&a…