使用kafka改造分布式事务

文章目录

  • 1、kafka确保消息不丢失?
    • 1.1、生产者端确保消息不丢失
    • 1.2、kafka服务端确保消息不丢失
    • 1.3、消费者确保正确无误的消费
  • 2、生产者发送消息 KafkaService
  • 3、UserInfoServiceImpl -> login()
  • 4、service-account - > AccountListener.java

1、kafka确保消息不丢失?

1.1、生产者端确保消息不丢失

  1. 发送模式:发后即忘、同步阻塞确认、异步非阻塞确认
  2. 生产者acks模式:props.put(“acks”, “all”)、acks: all(-1)
  3. 配置重试:props.put(“retries”, 3)、retries: 3

1.2、kafka服务端确保消息不丢失

  1. kafka是文件型的消息中间件,不会单纯的因为服务器宕机导致消息丢失
  2. 消息的log日志文件损坏:搭建kafka集群(副本)

1.3、消费者确保正确无误的消费

  1. 偏移量提交
     自动提交:enable-auto-commit: true
     手动提交:ack-mode: manual_immediate:同步提交 异步提交(推荐)
  2. 偏移量重置:
     auto-offset-reset: earliest -> 如果有偏移量则继续消费,如果偏移量没了,从头重新进行消费,可能会存在幂等性问题
     auto-offset-reset: latest -> 如果有偏移量则继续消费,如果偏移量不存在,只消费新消息,旧消息没消费完就丢掉了
     auto-offset-reset: none -> 如果有偏移量则继续消费,如果偏移量不存在,抛出异常
  3. 消费者重试:重试主题和死信主题, @RetryableTopic()

2、生产者发送消息 KafkaService

package com.atguigu.tingshu.common.service;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;import java.util.concurrent.CompletableFuture;@Service
public class KafkaService {private static final Logger logger = LoggerFactory.getLogger(KafkaService.class);@Autowiredprivate KafkaTemplate kafkaTemplate;/*** 向指定主题发送消息* 此方法通过调用重载的sendMsg方法,向指定主题发送消息,使用默认的消息标签和消息键** @param topic 发送消息的主题* @param msg   需要发送的消息内容*/public void sendMsg(String topic, String msg){// 调用重载的sendMsg方法,传入默认值以简化调用this.sendMsg(topic, null, null, msg);}/*** 发送消息到指定的Kafka主题** @param topic 消息主题* @param partition 分区编号* @param key 消息键值* @param msg 消息内容*/public void sendMsg(String topic, Integer partition, String key, String msg){// 发生消息并返回异步结果CompletableFuture<SendResult> future = this.kafkaTemplate.send(topic, partition, key, msg);// 异步处理发送结果future.whenCompleteAsync((result, ex) -> {if (ex != null){// 如果发送过程中出现异常logger.error("生产者发送消息失败!原因:{}", ex.getMessage());}});}}
  • whenCompleteAsync:异步完成时的处理、当异步操作完成时
    在这里插入图片描述

3、UserInfoServiceImpl -> login()

  • 此时 service-user 是生产者 发送消息

在这里插入图片描述

@Slf4j
@Service
@SuppressWarnings({"unchecked", "rawtypes"})
public class UserInfoServiceImpl extends ServiceImpl<UserInfoMapper, UserInfo> implements UserInfoService {@Autowiredprivate WxMaService wxMaService;@Autowiredprivate RedisTemplate redisTemplate;@Autowiredprivate UserAccountFeignClient userAccountFeignClient;@Autowiredprivate KafkaService kafkaService;/*** 根据微信返回的code进行用户登录* @param code 微信登录凭证* @return 返回包含登录令牌的Map对象*///@GlobalTransactional//@Transactional@Overridepublic Map<String, Object> login(String code) {// 创建一个HashMap对象用于存放返回的数据HashMap<String, Object> map = new HashMap<>();try {// 通过微信服务获取用户的会话信息WxMaJscode2SessionResult sessionInfo = this.wxMaService.getUserService().getSessionInfo(code);// 获取用户的openidString openid = sessionInfo.getOpenid();// 查询数据库中是否存在该openid对应的用户信息UserInfo userInfo = this.getOne(new LambdaQueryWrapper<UserInfo>().eq(UserInfo::getWxOpenId, openid));if (userInfo == null) {// 如果用户不存在,则创建一个新的UserInfo对象userInfo = new UserInfo();// 设置用户的openiduserInfo.setWxOpenId(openid);// 设置用户的昵称,其中包含一个随机生成的IDuserInfo.setNickname("这家伙太懒"+ IdWorker.getIdStr());// 设置用户的头像URLuserInfo.setAvatarUrl("https://img0.baidu.com/it/u=1633409170,3159960019&fm=253&fmt=auto&app=138&f=JPEG?w=500&h=500");// 保存用户信息到数据库this.save(userInfo);// 初始化用户账号信息//userAccountFeignClient.initAccount(userInfo.getId());this.kafkaService.sendMsg(KafkaConstant.QUEUE_USER_REGISTER,userInfo.getId().toString());//int i = 1 / 0;}// 生成一个随机的登录令牌String token = UUID.randomUUID().toString();// 创建一个UserInfoVo对象,用于存放用户信息UserInfoVo userInfoVo = new UserInfoVo();// 将UserInfo对象的属性复制到UserInfoVo对象中BeanUtils.copyProperties(userInfo, userInfoVo);// 将用户信息存储到Redis中,设置过期时间为30分钟this.redisTemplate.opsForValue().set(RedisConstant.USER_LOGIN_KEY_PREFIX + token, userInfoVo,RedisConstant.USER_LOGIN_KEY_TIMEOUT, TimeUnit.SECONDS);// 将生成的登录令牌放入Map对象中map.put("token", token);// 返回包含登录令牌的Map对象return map;} catch (WxErrorException e) {// 如果发生微信错误异常,抛出自定义的异常throw new GuiguException(ResultCodeEnum.LOGIN_AUTH);}}}

在这里插入图片描述

4、service-account - > AccountListener.java

  • 此时 service-account 是消费者 接收消息

在这里插入图片描述

@Slf4j
@Component
public class AccountListener {@Autowiredprivate UserAccountService userAccountService;@RetryableTopic(backoff = @Backoff(2000))@KafkaListener(topics = KafkaConstant.QUEUE_USER_REGISTER)public void listen(String userId, Acknowledgment ack){// 如果是空消息直接确认掉,后续不用再执行if (StringUtils.isBlank(userId)) {ack.acknowledge();return;}// 初始化账户this.userAccountService.saveAccount(Long.valueOf(userId));ack.acknowledge();// 手动确认}
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/409177.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Renesa Version Board开发RT-Thread 之UART驱动应用

目录 概述 1 硬件介绍 2 软件配置 2.1 RT-Thread Studio配置参数 2.2 FSP配置MCU 3 RT-Thread中UART的接口介绍 3.1 RT-Thread UART简介 3.2 RT-Thread 下的UART接口 4 UART的应用 4.1 应用功能实现 4.2 源代码文件 5 测试 程序下载地址&#xff1a; RenesaVersio…

应用层协议(上)Http(URL、Cookie、Session)内含逻辑图解通俗易懂!

绪论​ “少年没有乌托邦 心向远方自明朗”&#xff0c;本章是应用层常用且重要的协议htttp&#xff0c;没看过应用层建议一定先看那一篇后再看本章才能更好的去从上到下的理解应用层。 话不多说安全带系好&#xff0c;发车啦&#xff08;建议电脑观看&#xff09;。 1.Http协…

Linux rocky 9.2 安装mysql-8.0.39-linux-glibc2.28-x86_64.tar.xz

数据库官方下载&#xff1a;MySQL :: Download MySQL Community Server 本文也绑定该资源包&#xff0c;免费提供下载学习。 1.系统版本 2.新建目录&#xff0c;存放数据库安装包&#xff0c;并且上传 需要用到的工具&#xff1a;yum -y install vim lrzsz tar 上传解压&…

探索Python交互式编程的新境界:Python-prompt-toolkit的魔法

文章目录 探索Python交互式编程的新境界&#xff1a;Python-prompt-toolkit的魔法背景&#xff1a;为何选择Python-prompt-toolkit&#xff1f;Python-prompt-toolkit是什么&#xff1f;如何安装Python-prompt-toolkit&#xff1f;简单使用&#xff1a;Python-prompt-toolkit的…

C++,std::queue 详解

文章目录 1. 概述2. 包含头文件3. 基本操作3.1 构造函数3.2 赋值操作3.3 成员函数 4. 迭代器5. 示例6. 注意事项参考 1. 概述 std::queue 是 C 标准模板库&#xff08;STL&#xff09;中的一个容器适配器&#xff0c;它提供了一种先进先出&#xff08;FIFO&#xff09;的数据结…

【研发日记】嵌入式处理器技能解锁(五)——TI C2000 DSP的中断系统

文章目录 前言 背景介绍 中断框架 外设中断 ePIE模块 CPU中断 中断嵌套 应用实例 总结 参考资料 前言 见《【研发日记】嵌入式处理器技能解锁(一)——多任务异步执行调度的三种方法》 见《【研发日记】嵌入式处理器技能解锁(二)——TI C2000 DSP的SCI(串口)通信》 见…

基于Java的小区物业管理系统APP的设计与实现(论文+源码)_kaic

摘 要 小区物业管理系统是现代社会中非常热门的软件&#xff0c;伴随着社区规模的不断扩大和住户的不断增多&#xff0c;本系统的主要目的是辞别帐本以及传统的单一数据管理系统&#xff0c;快捷的保存用户各种数据信息。本系统针对Java系统展开&#xff0c;使用Java、SpringB…

无人机RTK定位定向技术详解

无人机RTK&#xff08;Real-Time Kinematic&#xff0c;实时动态差分技术&#xff09;定位定向技术&#xff0c;是无人机领域的一项高精度导航与定位技术。它结合了全球导航卫星系统&#xff08;如GPS、GLONASS、Galileo、BDS等&#xff09;与实时差分技术&#xff0c;通过地面…

超越GPT4V,最强多模态MiniCPM-V2.6模型分享

MiniCPM-V2.6是由OpenBMB开发的一款多模态大型语言模型&#xff08;MLLM&#xff09;&#xff0c;专为视觉-语言理解设计。 MiniCPM-V2.6模型能够处理图像、视频和文本输入&#xff0c;并提供高质量的文本输出。 MiniCPM-V 2.6模型在单图像理解方面超越了广泛使用的专有模型&…

机器学习课程学习周报九

机器学习课程学习周报九 文章目录 机器学习课程学习周报九摘要Abstract一、机器学习部分1.1 Word Embedding1.1.1 词嵌入的基本概念1.1.2 word2vec连续词袋模型CBOW1.1.3 word2vec跳字模型Skip-gram 1.2 Transformer代码实践DatasetDataloaderModelLearning rate scheduleModel…

windows javascript 打开、关闭摄像头

1. 效果 打开摄像头 关闭摄像头&#xff08;包括指示灯也关了的&#xff09; 2. 代码 open_close_camera.html // open_close_camera.html <!DOCTYPE html> <html><head><meta charset"UTF-8"><title>use camera</title>…

使用Dotween制作按钮弹性动画效果

效果&#xff1a; 方式&#xff1a; 优点&#xff0c;不需要写任何代码、稳定、可自定义效果

Agent实际落地的应用 未来生活的无形助手

在这个信息爆炸的时代&#xff0c;我们每个人都在追求更高效的生活方式。想象一下&#xff0c;如果有一个无形的助手&#xff0c;能够理解我们的需求&#xff0c;自动处理繁琐的任务&#xff0c;甚至为我们提供个性化的建议&#xff0c;那将是多么美好的体验&#xff01;这正是…

数字模拟IC设计前端、后端、前仿、后仿新版虚拟机

虚拟化平台&#xff1a;VMware Workstation 15 Pro以上版本 操作系统&#xff1a;CentOS Linux release 7.9.2009 (Core) 一、射频模拟IC设计必备软件 Cadence IC06.18.350/IC23.10.080&#xff08;virtuoso&#xff09; Cadence SPECTRE23.10.538-isr10 Cadence ASSURA04.…

Spring Boot OAuth2.0应用

本文展示Spring Boot中&#xff0c;新版本OAuth2.0的简单实现&#xff0c;版本信息&#xff1a; spring-boot 2.7.10 spring-security-oauth2-authorization-server 0.4.0 spring-security-oauth2-client 5.7.7 spring-boot-starter-oauth2-resource-server 2.7.10展示三个服务…

远程供水无障碍,管线车助力全面消防防护_鼎跃安全

夏季是各类自然灾害的高发季节&#xff0c;其中森林火灾尤为频繁。这一时期的气候特征是干旱少雨&#xff0c;伴随着高温和强风&#xff0c;使得森林火灾的发生频率大幅增加。由于夏季空气湿度低&#xff0c;植被含水量减少&#xff0c;一旦出现火源&#xff0c;火势极易蔓延。…

数据结构-链表-第二天

结合leetcode学习c 链表比数组更易增加和删除数据&#xff0c;但访问速度更慢 定义 链表&#xff08;linked list&#xff09;是一种线性数据结构&#xff0c;其中的每个元素都是一个节点对象&#xff0c;各个节点通过“引用”相连接。 引用记录了下一个节点的内存地址&#…

windows本地搭建zookeeper和kafka环境

zookeeper 1.1 下载zookeeper 下载地址 随便进一个站点&#xff0c;默认是新版本&#xff0c;旧版本点击archives进入&#xff0c;选择合适的版本下载&#xff0c;本文使用的是3.7.2 下载时候选择apache-zookeeper-3.7.2-bin.tar.gz 格式的&#xff0c;编译后的&#xff0c;解…

centos 虚拟机器刚刚安装没有ip地址的问题

刚刚安装好的虚拟机器&#xff0c;我们通过 ip addr 查看ip发现是这样的 该虚拟机器没有ip地址&#xff0c;那么怎么办 原来是在/etc/sysconfig/network-scripts/ifcfg-ens33中关于网络的配置有问题 ONBOOTno 表示不开启网卡&#xff0c;我们需要将这个值进行修改为yes 当前…

prolog 基础 - 关系和属性

首先进入环境&#xff1b; 看一下一开始的提示符是 ?- &#xff0c;现在可以用write语句输出一些东西&#xff1b; 根据资料&#xff0c;在prolog中&#xff0c; 两个对象之间的关系&#xff0c;使用括号表示。比如&#xff0c;jack的朋友是peter&#xff0c;写成friend(ja…