使用kafka完成数据的实时同步,同步到es中。(使用kafka实现自动上下架 upper、lower)

文章目录

  • 1、发送消息 KafkaService
  • 2、生产者 service-album -> AlbumInfoServiceImpl
    • 2.1、新增 saveAlbumInfo()
    • 2.2、更新 updateAlbumInfo()
    • 2.3、删除 removeAlbumInfo()
  • 3、消费者 service-search - > AlbumListener.java

  • 上架:新增专辑到 es
  • 下架:删除专辑
  1. 新增:如果是公开的专辑则发送消息给kafka,search通过监听器获取消息同步新增数据
  2. 更新:如果是公开的专辑则发送消息给kafka,search通过监听器获取消息同步更新数据
            如果是私有的专辑则发送消息给kafka,search通过监听器获取消息es删除数据
  3. 删除:发送消息给kafka,search通过监听器获取消息es删除数据

1、发送消息 KafkaService

package com.atguigu.tingshu.common.service;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;import java.util.concurrent.CompletableFuture;@Service
public class KafkaService {private static final Logger logger = LoggerFactory.getLogger(KafkaService.class);@Autowiredprivate KafkaTemplate kafkaTemplate;/*** 向指定主题发送消息* 此方法通过调用重载的sendMsg方法,向指定主题发送消息,使用默认的消息标签和消息键** @param topic 发送消息的主题* @param msg   需要发送的消息内容*/public void sendMsg(String topic, String msg){// 调用重载的sendMsg方法,传入默认值以简化调用this.sendMsg(topic, null, null, msg);}/*** 发送消息到指定的Kafka主题** @param topic 消息主题* @param partition 分区编号* @param key 消息键值* @param msg 消息内容*/public void sendMsg(String topic, Integer partition, String key, String msg){// 发生消息并返回异步结果CompletableFuture<SendResult> future = this.kafkaTemplate.send(topic, partition, key, msg);// 异步处理发送结果future.whenCompleteAsync((result, ex) -> {if (ex != null){// 如果发送过程中出现异常logger.error("生产者发送消息失败!原因:{}", ex.getMessage());}});}}
  • whenCompleteAsync:异步完成时的处理、当异步操作完成时

在这里插入图片描述

2、生产者 service-album -> AlbumInfoServiceImpl

在这里插入图片描述

2.1、新增 saveAlbumInfo()

  • 新增:如果是公开的专辑则发送消息给kafka,search通过监听器获取消息同步新增数据
    在这里插入图片描述
    在这里插入图片描述

在这里插入图片描述

@Slf4j
@Service
@SuppressWarnings({"unchecked", "rawtypes"})
public class AlbumInfoServiceImpl extends ServiceImpl<AlbumInfoMapper, AlbumInfo> implements AlbumInfoService {@Autowiredprivate AlbumAttributeValueMapper attributeValueMapper;@Autowiredprivate AlbumStatService albumStatService;@Autowiredprivate KafkaService kafkaService;@Transactional(rollbackFor = Exception.class)@Overridepublic void saveAlbumInfo(AlbumInfoVo albumInfoVo) throws FileNotFoundException {// 1.保存专辑信息表AlbumInfo albumInfo = new AlbumInfo();BeanUtils.copyProperties(albumInfoVo, albumInfo);// 设置当前用户的idLong userId = AuthContextHolder.getUserId();albumInfo.setUserId(userId == null ? 1 : userId);albumInfo.setTracksForFree(5);albumInfo.setSecondsForFree(30);albumInfo.setStatus(SystemConstant.ALBUM_STATUS_PASS);this.save(albumInfo);// 主键回写获取专辑idLong albumInfoId = albumInfo.getId();// 2.保存专辑标签值表List<AlbumAttributeValueVo> albumAttributeValueVoList = albumInfoVo.getAlbumAttributeValueVoList();if (!CollectionUtils.isEmpty(albumAttributeValueVoList)) {albumAttributeValueVoList.forEach(albumAttributeValueVo -> {AlbumAttributeValue albumAttributeValue = new AlbumAttributeValue();BeanUtils.copyProperties(albumAttributeValueVo, albumAttributeValue);albumAttributeValue.setAlbumId(albumInfoId);this.attributeValueMapper.insert(albumAttributeValue);});}//		new FileInputStream("xxx");//		try {
//			TimeUnit.SECONDS.sleep(3);
//		} catch (InterruptedException e) {
//			throw new RuntimeException(e);
//		}// 3.保存统计信息:专辑状态表// this.saveAlbumStat(albumInfoId);this.albumStatService.saveAlbumStat(albumInfoId);if (StringUtils.equals(albumInfo.getIsOpen(), "1")) {this.kafkaService.sendMsg(KafkaConstant.QUEUE_ALBUM_UPPER, albumInfoId.toString());}//		int i = 1/0;}
}

在这里插入图片描述

2.2、更新 updateAlbumInfo()

  • 更新:如果是公开的专辑则发送消息给kafka,search通过监听器获取消息同步更新数据
            如果是私有的专辑则发送消息给kafka,search通过监听器获取消息es删除数据
@Slf4j
@Service
@SuppressWarnings({"unchecked", "rawtypes"})
public class AlbumInfoServiceImpl extends ServiceImpl<AlbumInfoMapper, AlbumInfo> implements AlbumInfoService {@Autowiredprivate AlbumAttributeValueMapper attributeValueMapper;@Autowiredprivate KafkaService kafkaService;@Transactional@Overridepublic void updateAlbumInfo(Long albumId, AlbumInfoVo albumInfoVo) {AlbumInfo albumInfo = new AlbumInfo();BeanUtils.copyProperties(albumInfoVo, albumInfo);albumInfo.setId(albumId);this.updateById(albumInfo);// 更新专辑标签值表:先删除该专辑所有的标签及值 再去新增this.attributeValueMapper.delete(new LambdaUpdateWrapper<AlbumAttributeValue>().eq(AlbumAttributeValue::getAlbumId, albumId));List<AlbumAttributeValueVo> albumAttributeValueVoList = albumInfoVo.getAlbumAttributeValueVoList();if (!CollectionUtils.isEmpty(albumAttributeValueVoList)) {albumAttributeValueVoList.forEach(albumAttributeValueVo -> {AlbumAttributeValue albumAttributeValue = new AlbumAttributeValue();BeanUtils.copyProperties(albumAttributeValueVo, albumAttributeValue);albumAttributeValue.setAlbumId(albumId);this.attributeValueMapper.insert(albumAttributeValue);});}if (StringUtils.equals(albumInfoVo.getIsOpen(), "1")) {this.kafkaService.sendMsg(KafkaConstant.QUEUE_ALBUM_UPPER, albumId.toString());} else {this.kafkaService.sendMsg(KafkaConstant.QUEUE_ALBUM_LOWER, albumId.toString());}}
}

在这里插入图片描述

2.3、删除 removeAlbumInfo()

  • 删除:发送消息给kafka,search通过监听器获取消息es删除数据
@Slf4j
@Service
@SuppressWarnings({"unchecked", "rawtypes"})
public class AlbumInfoServiceImpl extends ServiceImpl<AlbumInfoMapper, AlbumInfo> implements AlbumInfoService {@Autowiredprivate AlbumAttributeValueMapper attributeValueMapper;@Autowiredprivate AlbumStatMapper albumStatMapper;@Autowiredprivate KafkaService kafkaService;@Transactional@Overridepublic void removeAlbumInfo(Long albumId) {this.removeById(albumId);this.albumStatMapper.delete(new LambdaUpdateWrapper<AlbumStat>().eq(AlbumStat::getAlbumId, albumId));this.attributeValueMapper.delete(new LambdaUpdateWrapper<AlbumAttributeValue>().eq(AlbumAttributeValue::getAlbumId, albumId));this.kafkaService.sendMsg(KafkaConstant.QUEUE_ALBUM_LOWER, albumId.toString());}
}

在这里插入图片描述

3、消费者 service-search - > AlbumListener.java

在这里插入图片描述

package com.atguigu.tingshu.search.listener;@Component
public class AlbumListener {@Autowiredprivate AlbumInfoFeignClient albumInfoFeignClient;@Autowiredprivate UserInfoFeignClient userInfoFeignClient;@Autowiredprivate CategoryFeignClient categoryFeignClient;@Autowiredprivate ElasticsearchTemplate elasticsearchTemplate;@KafkaListener(topics = KafkaConstant.QUEUE_ALBUM_UPPER)public void upper(String albumId){if (StringUtils.isBlank(albumId)){return;}// 根据专辑id查询专辑Result<AlbumInfo> albumInfoResult = this.albumInfoFeignClient.getAlbumInfo(Long.valueOf(albumId));Assert.notNull(albumInfoResult, "同步数据时,获取专辑信息失败!");AlbumInfo albumInfo = albumInfoResult.getData();Assert.notNull(albumInfo, "同步数据时,没有对应的专辑!");AlbumInfoIndex albumInfoIndex = new AlbumInfoIndex();// 把专辑信息中的数据复制到index对象BeanUtils.copyProperties(albumInfo, albumInfoIndex);// 查询主播获取主播信息Result<UserInfoVo> userInfoVoResult = this.userInfoFeignClient.getUserById(albumInfo.getUserId());Assert.notNull(userInfoVoResult, "数据导入时,获取主播信息失败!");UserInfoVo userInfoVo = userInfoVoResult.getData();if (userInfoVo != null){albumInfoIndex.setAnnouncerId(userInfoVo.getId());albumInfoIndex.setAnnouncerName(userInfoVo.getNickname());}// 根据三级分类id查询一二三级分类Result<BaseCategoryView> categoryResult = this.categoryFeignClient.getAllLevelCategories(albumInfo.getCategory3Id());Assert.notNull(categoryResult, "数据导入时,获取分类信息失败!");BaseCategoryView baseCategoryView = categoryResult.getData();if (baseCategoryView != null) {albumInfoIndex.setCategory1Id(baseCategoryView.getCategory1Id());albumInfoIndex.setCategory2Id(baseCategoryView.getCategory2Id());}// 查询专辑统计信息
//                Result<AlbumStatVo> albumStatesResult = this.albumInfoFeignClient.getAlbumStates(albumInfo.getId());
//                Assert.notNull(albumStatesResult, "数据导入时,获取专辑统计信息失败!");
//                AlbumStatVo albumStatVo = albumStatesResult.getData();
//                if (albumStatVo != null) {
//                    BeanUtils.copyProperties(albumStatVo, albumInfoIndex);
//                }// 假数据:int playNum = (new Random().nextInt(100) + 1) * 10000;albumInfoIndex.setPlayStatNum(playNum);int subscribeNum = (new Random().nextInt(100) + 1) * 10000;albumInfoIndex.setSubscribeStatNum(subscribeNum);int buyNum = (new Random().nextInt(100) + 1) * 10000;albumInfoIndex.setBuyStatNum(buyNum);int commentNum = (new Random().nextInt(100) + 1) * 10000;albumInfoIndex.setCommentStatNum(commentNum);// 热度albumInfoIndex.setHotScore(playNum * 0.1 + commentNum * 0.2 + subscribeNum * 0.3 + buyNum * 0.4);// 标签Result<List<AlbumAttributeValue>> albumAttributeValueResult = this.albumInfoFeignClient.getAlbumAttributeValue(albumInfo.getId());Assert.notNull(albumAttributeValueResult, "数据导入时,获取标签及值失败!");List<AlbumAttributeValue> albumAttributeValues = albumAttributeValueResult.getData();if (!CollectionUtils.isEmpty(albumAttributeValues)){// 把List<AlbumAttributeValue> 转化成  List<AttributeValueIndex>albumInfoIndex.setAttributeValueIndexList(albumAttributeValues.stream().map(albumAttributeValue -> {AttributeValueIndex attributeValueIndex = new AttributeValueIndex();BeanUtils.copyProperties(albumAttributeValue, attributeValueIndex);return attributeValueIndex;}).collect(Collectors.toList()));}this.elasticsearchTemplate.save(albumInfoIndex);}@KafkaListener(topics = KafkaConstant.QUEUE_ALBUM_LOWER)public void lower(String albumId){if (StringUtils.isBlank(albumId)){return;}this.elasticsearchTemplate.delete(albumId, AlbumInfoIndex.class);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/411268.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【归纳总结】常见排序算法及其实现:直接插入排序、希尔排序、选择排序、堆排序、冒泡排序、快排、归并排序

思维导图&#xff1a; 目录 思维导图&#xff1a; 一、插入排序 1.直接插入排序&#xff1a; a:基本思想&#xff1a; b:基本步骤&#xff1a; c:复杂度分析 d:Java代码实现&#xff1a; 2.希尔排序&#xff08;缩小增量排序&#xff09; a:基本思想&#xff1a; c…

用AI来学习英语口语(白嫖,所以稍微麻烦些)

写在前面 本文看下如何使用AI来免费学习英语口语。 1&#xff1a;正文 首先&#xff0c;我们点击这里来到一个对话窗口&#xff0c;你可以在这个窗口中使用英语来询问你的问题&#xff0c;比如what can i do when i am not happy&#xff1a; 接着复制机器人回答内容&#…

离散数学中的逻辑基础(1)

目录 引言 1. 命题及其逻辑运算 2. 逻辑等价与范式 3. 逻辑推理规则 4. 逻辑问题练习 5. 总结 引言 逻辑是离散数学的核心概念之一&#xff0c;它用于精确描述数学命题并分析其关系。逻辑不仅是数学证明的基础&#xff0c;也是计算机科学中算法设计和编程的基石。本篇文…

minio文件存储+ckplayer视频播放(minio分片上传合并视频播放)

文章目录 参考简述效果启动minio代码配置类RedisConfigWebConfigMinioClientAutoConfigurationOSSPropertiesapplication.yml 实体类MinioObjectResultStatusCodeOssFileOssPolicy 工具类FileTypeUtilMd5UtilMediaTypeMinioTemplate 文件分片上传与合并MinioFileControllerMini…

Avalonia与WPF开发时的差异总结

1.一个控件绑定到另外一个控件的属性 WPF: <TextBox Height"30" Width"100" x:Name"tb"></TextBox><TextBlock Text"{Binding ElementNametb,PathText}" ></TextBlock>Avalonia: <TextBox Height"3…

YarnClient发送和接收请求源码解析

YarnClient发送和接收请求流程 Yarn是通过RPC协议通信的&#xff0c;协议类型可以通过查看RpcKind类得知&#xff0c;总共有三种类型&#xff1a; RPC_BUILTIN ((short) 1), // Used for built in calls by tests RPC_WRITABLE ((short) 2), // Use WritableRp…

比特币的签名和验证(基于ECDSA)

比特币&#xff08;Bitcoin&#xff09;和以太坊&#xff08;Ethereum&#xff09;等区块链技术使用了加密算法来确保交易的安全性。私钥签名和公钥验证是这些算法的核心部分&#xff0c;主要用于证明交易的发起者拥有交易中使用的资金的控制权&#xff0c;而不需要暴露私钥本身…

浪潮服务器NVME 硬盘通过 Intel VROC 做RAID

INTEL VROC Configuration solution 1.VMD configuration in BIOS Processor > IIO Configuration> Intel(R) VDM Technology> Intel(R) VMD for volume Management Device on Socket 0 “CPU 0”, Intel VMD for volume management device for “PStack0” or “PSta…

【香橙派系列教程】(十七) 视觉垃圾桶-功能完善优化

【十七】视觉垃圾桶-功能完善优化 文章目录 【十七】视觉垃圾桶-功能完善优化一、增加垃圾桶开关盖1.引脚2.PWM 频率的公式3.PWM_APIsoftPwmCreatesoftPwmWrite附加说明softPwmStop 4.代码pwm.cpwm.hmain.c 二、项目代码优化编译运行 三、增加OLED 屏幕显示功能myoled.hmyoled.…

小白之 FastGPT Windows 本地化部署

目录 引言环境步骤1. 安装 docker2. 启动 docker3. 浏览器访问4. One API 配置语言模型、向量模型渠道和令牌5. 创建 FastGPT 知识库6. 创建 FastGPT 应用 官方文档 引言 部署之前可以先看一下 RAG 技术原理&#xff0c;也可以后面回过头来看&#xff0c;对一些概念有些了解&a…

Qt+FFmpeg开发视频播放器笔记(一):环境搭建

一、FFmpeg介绍 FFmpeg是一个开源的跨平台多媒体处理工具集&#xff0c;它可以用于处理音频、视频和其他多媒体数据。FFmpeg提供了一组功能强大的命令行工具&#xff0c;用于音频和视频的编解码、转换、处理、流媒体传输等任务。 FFmpeg支持多种音频和视频格式&#xff0c;包…

【自动化】考试答题自动化完成答案,如何实现100%正确呢

一、科目仿真考试不能自动答题 我的答案是可以的&#xff0c;电脑程序可以模拟人的操作完成所有的答题并提交结束考试 二、分析页面内容 完成一个题目&#xff0c;包括判断题&#xff0c;对与错2选1答案&#xff0c;单选题ABCD4选1答案&#xff0c;多选题大家想一想 F12查看按…

C语言 ——— 将动态版本的通讯录实现为文件存储联系人模式

目录 前言 在退出通讯录之前 在运行通讯录之前 前言 在这篇博客中&#xff0c;实现了动态版本的通讯录&#xff0c;接下来会增加函数&#xff0c;能用文件存储通讯录中的联系人 C语言 ——— 在控制台实现通讯录&#xff08;增删查改、动态开辟内存空间&#xff09;-CSDN…

#网络高级 笔记

modbus_tcp协议 modbus_rtu协议和modbus库 http协议和web服务器搭建 服务器原码分析和基于WebServer的工业数据采集项目 第H5&#xff0c;即网页制作&#xff0c;项目完善 一、modbus起源 1.起源 Modbus由Modicon公司于1979年开发&#xff0c;是一种工业现场总线协议标准 Mo…

python将字典数据保存为json文件

目录 一、json库介绍 二、字典生成json文件 1、导入 json 模块 2、将字典数据保存为 json 文件 (1) 创建一个python字典 (2) 指定要保存的 json 文件路径 (3) 将字典数据存为 json 文件 3、读取 json文件&#xff0c;并打印 一、json库介绍 方法作用json.dumps()将py…

对数据处理过程中,缺失值和异常值应该怎么处理?

创作不易&#xff0c;您的关注、点赞、收藏和转发是我坚持下去的动力&#xff01; 大家有技术交流指导、论文及技术文档写作指导、项目开发合作的需求可以私信联系我。 在数据处理过程中&#xff0c;缺失值和异常值的处理是非常重要的步骤&#xff0c;它们可能会对模型的性能…

Datawhale AI夏令营第五期学习!

学习日志 日期&#xff1a; 2024年8月27日 今日学习内容&#xff1a; 今天&#xff0c;我学习了如何在深度学习任务中使用卷积神经网络&#xff08;CNN&#xff09;进行图像分类的基本流程&#xff0c;并成功地在JupyterLab中运行了一个完整的项目。以下是我今天的学习和操作…

【扩散模型(六)】IP-Adapter 是如何训练的?2 源码篇(IP-Adapter Plus)

系列文章目录 【扩散模型&#xff08;二&#xff09;】IP-Adapter 从条件分支的视角&#xff0c;快速理解相关的可控生成研究【扩散模型&#xff08;三&#xff09;】IP-Adapter 源码详解1-训练输入 介绍了训练代码中的 image prompt 的输入部分&#xff0c;即 img projection…

【Verilog 数字系统设计教程】Verilog 基础:硬件描述语言入门指南

目录 摘要 1. 引言 2. Verilog 历史与发展 3. Verilog 基本语法 4. Verilog 模块与端口 5. 组合逻辑与时序逻辑 6. 时钟域与同步设计 7. 测试与仿真 8. Verilog 高级特性 任务&#xff08;Tasks&#xff09; 函数&#xff08;Functions&#xff09; 多维数组 结构体…

【二叉树】OJ题目

&#x1f31f;个人主页&#xff1a;落叶 目录 单值⼆叉树 【单值二叉树】代码 相同的树 【相同二叉树】代码 对称⼆叉树 【对称二叉树】代码 另一颗树的子树 【另一颗树的子树】代码 二叉树的前序遍历 【二叉树前序遍历】代码 二叉树的中序遍历 【二叉树中序遍历】…