package com.dzj.kafka_streaming.listener; import com.dzj.kafka_streaming.dto.TagNameTypeInfo; import com.dzj.kafka_streaming.service.ContentTagRelationService; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import com.alibaba.fastjson.JSONObject; import javax.annotation.Resource; import java.util.ArrayList; import java.util.Base64; import java.util.List; /** * "immersive_streaming_" + userId; 这是旧的key,需要清除 */ @Component public class MessageListener { @Autowired private ContentTagRelationService relationService; @Resource private RedisTemplate<String, Object> redisTemplate; private final String TOPIC_NAME = "event-trace-log" ; // @KafkaListener(topics = {TOPIC_NAME},groupId = "itmentuGroup") @KafkaListener (topics = {TOPIC_NAME}) public void listener(ConsumerRecord<String,String> record) { //获取消息 String message = record.value(); //消息偏移量 long offset = record.offset(); String redisKeyPrefix = "kafka:user_short_video_streaming:_" ; JSONObject dataJson = parseJson(message); String eventCode = dataJson.getString( "eventCode" ); if ( "145001" .equals(eventCode)){ // 测试环境------------------------------------------------------------------------------------------ // 目前只关注沉浸式中得数据 String resourceId = dataJson.getJSONObject( "eventBody" ).getString( "resourceId" ); String resourceType = dataJson.getJSONObject( "eventBody" ).getString( "resourceType" ); Integer duration = dataJson.getJSONObject( "eventBody" ).getInteger( "duration" ); String actionCode = dataJson.getJSONObject( "eventBody" ).getString( "actionCode" ); String userId = dataJson.getJSONObject( "eventBody" ).getString( "userId" ); String appType = dataJson.getJSONObject( "eventBody" ).getString( "appType" ); // System.out.println("________kafka msg: eventCode = " + eventCode + "eventBody = " + dataJson.getJSONObject("eventBody")); /** * 写入Redis * redis存储结构: key = List(5),是一个定长为5,右进左出的队列 * 首先查询该key的list长度,如果长度超过5,就先左边出队列一个,再右边进一个,否则右边进一个 */ String key = redisKeyPrefix + userId; // String key = "immersive_streaming_wyp0001"; // 定义Redis队列写入的结构 JSONObject redisListItem = new JSONObject(); redisListItem.put( "resourceId" ,resourceId); redisListItem.put( "resourceType" ,resourceType); redisListItem.put( "duration" ,duration); redisListItem.put( "actionCode" ,actionCode); redisListItem.put( "appType" ,appType); String redisListItemString = redisListItem.toJSONString(); if (redisTemplate.opsForList().size(key) >= 100 ){ Object leftPop = redisTemplate.opsForList().leftPop(key); redisTemplate.opsForList().rightPush(key, redisListItemString); System.out.println( "[pop]redis key : " + redisKeyPrefix + userId + " now contains: " + redisTemplate.opsForList().range(key, 0 , - 1 )); } else { if (!resourceId.isEmpty() && !resourceType.isEmpty()){ redisTemplate.opsForList().rightPush(key, redisListItemString); Long size = redisTemplate.opsForList().size(key); System.out.println( "redis key : " + redisKeyPrefix + userId + " pushed one: " + size + redisListItemString); System.out.println( "redis key : " + redisKeyPrefix + userId + " now contains: " + redisTemplate.opsForList().range(key, 0 , - 1 )); } } } } /** * 解析json,解码功能 */ public JSONObject parseJson(String message) { JSONObject messageJson = JSONObject.parseObject(message); String dataString = messageJson.getString( "data" ); // --------------------base64解码字符串-------------------- String data_string = "" ; final Base64.Decoder decoder = Base64.getDecoder(); try { data_string = new String(decoder.decode(dataString), "UTF-8" ); } catch (Exception e){ System.out.println( "【kafka parseJson ERROR】com.dzj.kafka_streaming.listener.MessageListener.parseJson" + e); } // string转换为json,只取eventCode = '145001'沉浸式的 JSONObject dataJson = JSONObject.parseObject(data_string); return dataJson; } /** * 从数据库查询 * @param resourceId * @param resourceType * @return */ public List<TagNameTypeInfo> queryByIdAndType(String resourceId, String resourceType ){ List<TagNameTypeInfo> tagNameTypeInfos = new ArrayList<>(); try { tagNameTypeInfos = relationService.queryTagNameTypeInfo(Long.valueOf(resourceId), resourceType); } catch (Exception e){ System.out.println( "【ERROR】" + resourceId + "&" + resourceType + "在数据库中查询不到......." ); } return tagNameTypeInfos; } } |