分布式并发场景的核心问题与解决方案

文章目录

  • 分布式并发场景的核心问题与解决方案
    • 一、核心问题分析
      • 1. 分布式事务问题
      • 2. 数据一致性问题
      • 3. 并发控制问题
      • 4. 分布式锁失效问题
    • 二、解决方案
      • 1. 分布式事务解决方案
        • 1.1 可靠消息最终一致性方案
        • 1.2 TCC方案实现
      • 2. 缓存一致性解决方案
        • 2.1 延迟双删策略
        • 2.2 Canal方案
      • 3. 并发控制解决方案
        • 3.1 基于Redis的原子操作
        • 3.2 防重复提交
    • 三、系统监控与告警
      • 1. 分布式链路追踪
      • 2. 监控指标收集
    • 四、最佳实践建议
    • 五、注意事项

分布式并发场景的核心问题与解决方案

一、核心问题分析

1. 分布式事务问题

在分布式环境下,一个业务操作可能横跨多个服务,比如创建订单时涉及:

  • 订单服务:创建订单
  • 库存服务:扣减库存
  • 支付服务:冻结余额
  • 积分服务:赠送积分

可能出现的问题:

  • 部分服务成功,部分服务失败
  • 网络超时导致事务状态不确定
  • 服务宕机导致事务中断

2. 数据一致性问题

在分布式系统中,由于CAP理论的限制,我们通常需要在一致性和可用性之间做出选择。

典型场景:

  • 主从数据库的数据同步延迟
  • 分布式缓存的数据一致性
  • 跨服务的数据依赖

3. 并发控制问题

多个节点同时处理请求时的并发控制:

  • 超卖问题
  • 重复下单
  • 数据竞争

4. 分布式锁失效问题

  • Redis主从切换导致锁失效
  • 时钟不同步导致的锁判断错误
  • 网络分区导致的锁状态不一致

二、解决方案

1. 分布式事务解决方案

1.1 可靠消息最终一致性方案
@Service
@Slf4j
public class OrderServiceImpl implements OrderService {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Autowiredprivate OrderMapper orderMapper;@Autowiredprivate MessageMapper messageMapper;@Transactional(rollbackFor = Exception.class)public void createOrder(OrderDTO orderDTO) {// 1. 事务消息表记录TransactionMessage message = new TransactionMessage();message.setMessageId(UUID.randomUUID().toString());message.setMessage(JSON.toJSONString(orderDTO));message.setStatus(MessageStatus.PREPARING);messageMapper.insert(message);// 2. 创建订单Order order = convertToOrder(orderDTO);orderMapper.insert(order);// 3. 发送事务消息sendTransactionMessage(message);}private void sendTransactionMessage(TransactionMessage message) {Message msg = MessageBuilder.withPayload(message).setHeader("messageId", message.getMessageId()).build();TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("ORDER_TOPIC",msg,null);if (result.getLocalTransactionState() != LocalTransactionState.COMMIT_MESSAGE) {throw new BusinessException("发送事务消息失败");}}
}

消息消费者实现:

@Component
@RocketMQMessageListener(topic = "ORDER_TOPIC",consumerGroup = "order-consumer-group"
)
public class OrderMessageListener implements RocketMQListener<Message> {@Autowiredprivate StockService stockService;@Autowiredprivate MessageMapper messageMapper;@Overridepublic void onMessage(Message message) {String messageId = message.getHeaders().get("messageId", String.class);// 1. 检查消息是否已处理if (messageMapper.checkProcessed(messageId)) {return;}try {// 2. 处理业务逻辑TransactionMessage txMessage = JSON.parseObject(new String((byte[]) message.getPayload()),TransactionMessage.class);OrderDTO orderDTO = JSON.parseObject(txMessage.getMessage(),OrderDTO.class);// 3. 扣减库存stockService.decreaseStock(orderDTO.getProductId(), orderDTO.getQuantity());// 4. 更新消息状态messageMapper.markAsProcessed(messageId);} catch (Exception e) {// 5. 失败处理messageMapper.markAsFailed(messageId, e.getMessage());// 根据业务需求决定是否抛出异常重试throw e;}}
}
1.2 TCC方案实现
@Service
public class OrderTccServiceImpl implements OrderTccService {@Autowiredprivate OrderMapper orderMapper;@Autowiredprivate StockTccService stockTccService;@Autowiredprivate PaymentTccService paymentTccService;@GlobalTransactionalpublic void createOrder(OrderDTO orderDTO) {// 1. Try阶段// 1.1 订单服务TryOrder order = prepareTryOrder(orderDTO);// 1.2 库存服务TrystockTccService.tryDecrease(orderDTO.getProductId(),orderDTO.getQuantity());// 1.3 支付服务TrypaymentTccService.tryFreeze(orderDTO.getUserId(),orderDTO.getAmount());}// Try阶段的订单处理private Order prepareTryOrder(OrderDTO orderDTO) {Order order = convertToOrder(orderDTO);order.setStatus(OrderStatus.TRY);orderMapper.insert(order);return order;}// Confirm阶段的订单处理public void confirmOrder(BusinessActionContext context) {String orderId = context.getActionContext("orderId").toString();Order order = orderMapper.selectById(orderId);order.setStatus(OrderStatus.CONFIRMED);orderMapper.updateById(order);}// Cancel阶段的订单处理public void cancelOrder(BusinessActionContext context) {String orderId = context.getActionContext("orderId").toString();Order order = orderMapper.selectById(orderId);order.setStatus(OrderStatus.CANCELED);orderMapper.updateById(order);}
}

2. 缓存一致性解决方案

2.1 延迟双删策略
@Service
public class ProductServiceImpl implements ProductService {@Autowiredprivate ProductMapper productMapper;@Autowiredprivate RedisTemplate<String, Product> redisTemplate;@Autowiredprivate ThreadPoolExecutor threadPoolExecutor;private static final String PRODUCT_CACHE_KEY = "product:";private static final long DELAY_DELETE_TIME = 1000; // 1秒@Transactional(rollbackFor = Exception.class)public void updateProduct(Product product) {// 1. 删除缓存String cacheKey = PRODUCT_CACHE_KEY + product.getId();redisTemplate.delete(cacheKey);// 2. 更新数据库productMapper.updateById(product);// 3. 延迟双删threadPoolExecutor.execute(() -> {try {Thread.sleep(DELAY_DELETE_TIME);redisTemplate.delete(cacheKey);} catch (InterruptedException e) {Thread.currentThread().interrupt();log.error("延迟双删失败", e);}});}
}
2.2 Canal方案
@Component
public class ProductCanalClient {@Autowiredprivate RedisTemplate<String, Product> redisTemplate;@Listen(table = "product")public void handleProductChange(CanalEntry.Entry entry) {if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {CanalEntry.RowChange rowChange = entry.getRowChange();for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {if (rowChange.getEventType() == CanalEntry.EventType.UPDATE) {// 处理更新事件handleProductUpdate(rowData);} else if (rowChange.getEventType() == CanalEntry.EventType.DELETE) {// 处理删除事件handleProductDelete(rowData);}}}}private void handleProductUpdate(CanalEntry.RowData rowData) {// 解析变更数据Map<String, String> data = parseRowData(rowData.getAfterColumnsList());String productId = data.get("id");// 更新缓存String cacheKey = "product:" + productId;Product product = convertToProduct(data);redisTemplate.opsForValue().set(cacheKey, product);}
}

3. 并发控制解决方案

3.1 基于Redis的原子操作
@Service
public class StockService {@Autowiredprivate StringRedisTemplate redisTemplate;private static final String STOCK_KEY = "product:stock:";public boolean decreaseStock(Long productId, Integer quantity) {String key = STOCK_KEY + productId;// Lua脚本保证原子性String script = "local stock = redis.call('get', KEYS[1]) " +"if stock and tonumber(stock) >= tonumber(ARGV[1]) then " +"    return redis.call('decrby', KEYS[1], ARGV[1]) " +"end " +"return -1";DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();redisScript.setScriptText(script);redisScript.setResultType(Long.class);Long result = redisTemplate.execute(redisScript,Collections.singletonList(key),quantity.toString());return result != null && result >= 0;}
}
3.2 防重复提交
@Aspect
@Component
public class RepeatSubmitAspect {@Autowiredprivate StringRedisTemplate redisTemplate;@Around("@annotation(repeatSubmit)")public Object around(ProceedingJoinPoint joinPoint, RepeatSubmit repeatSubmit) throws Throwable {HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();String token = request.getHeader("token");String key = getRepeatSubmitKey(joinPoint, token);// 使用Redis的setIfAbsent实现防重boolean isNotRepeat = redisTemplate.opsForValue().setIfAbsent(key,"1",repeatSubmit.interval(),TimeUnit.MILLISECONDS);if (!isNotRepeat) {throw new BusinessException("请勿重复提交");}return joinPoint.proceed();}
}

三、系统监控与告警

1. 分布式链路追踪

@Configuration
public class SleuthConfig {@Beanpublic Sampler defaultSampler() {return Sampler.ALWAYS_SAMPLE;}
}

2. 监控指标收集

@Component
public class DistributedMetrics {@Autowiredprivate MeterRegistry registry;// 记录分布式锁获取情况private Counter lockCounter;private Timer lockTimer;@PostConstructpublic void init() {lockCounter = registry.counter("distributed.lock.acquire");lockTimer = registry.timer("distributed.lock.time");}public void recordLockAcquire(String lockKey, boolean success) {lockCounter.increment();Tags tags = Tags.of("lock_key", lockKey,"success", String.valueOf(success));registry.counter("distributed.lock.acquire", tags).increment();}public void recordLockTime(String lockKey, long timeMillis) {lockTimer.record(timeMillis, TimeUnit.MILLISECONDS);}
}

四、最佳实践建议

  1. 业务设计层面

    • 尽量避免复杂分布式事务
    • 考虑业务可补偿性
    • 合理设计重试机制
  2. 技术选型层面

    • 优先考虑消息队列解耦
    • 合理使用缓存
    • 选择合适的分布式事务方案
  3. 监控运维层面

    • 完善的监控系统
    • 合理的告警阈值
    • 灾难恢复预案
  4. 性能优化层面

    • 合理的数据分片策略
    • 避免长事务
    • 批量处理优化

五、注意事项

  1. 数据库层面

    • 避免大事务
    • 合理设计索引
    • 注意死锁问题
  2. 缓存层面

    • 防止缓存雪崩
    • 注意缓存穿透
    • 合理设置过期时间
  3. 消息队列层面

    • 保证消息可靠性
    • 处理重复消息
    • 注意消息顺序性
  4. 分布式锁层面

    • 防止锁失效
    • 避免死锁
    • 合理设置超时时间

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/455970.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

进程间通信(二)消息队列、共享内存、信号量

文章目录 进程间通信System V IPC概述System V IPC 对象的访问消息队列示例--使用消息队列实现进程间的通信 共享内存示例--使用共享内存实现父子进程间的通信&#xff08;进程同步&#xff09;示例--使用进程实现之前的ATM案例&#xff08;进程互斥&#xff09; 信号量示例--利…

Linux笔记---vim的使用

1. vim的基本概念 Vim是一款功能强大的文本编辑器&#xff0c;它起源于Unix系统的vi编辑器&#xff0c;并在其基础上进行了许多改进和增强。 Vim以其高效的键盘操作、高度的可定制性和强大的文本处理能力而闻名&#xff0c;尤其受程序员和系统管理员的欢迎。 Vim支持多种模式…

STM32之基本定时器TIM6和TIM7

1.定时器概念和作用 在编程任务中&#xff0c;定时器是非常常用的一个问题。当需要定时发送数据&#xff0c;定时起某个任务&#xff0c;定时做某个操作等等&#xff0c;这些都离不开定时器。本文基于以STM32F4xx系列开发板&#xff0c;介绍一下基本定时器。 2.基本定时器TIM…

基于Ubuntu24.04,下载并编译Android12系统源码 (二)

1. 前言 上篇文章&#xff0c;我们基于Ubuntu24.04&#xff0c;已经成功下载下来了Android12的源码&#xff0c;这篇文章我们会接着上文&#xff0c;基于Ubuntu24.04来编译Android源码。 2. 编译源码 2.1 了解源码编译的名词 Makefile &#xff1a; Android平台的一个编译系…

鸿蒙网络编程系列28-服务端证书锁定防范中间人攻击示例

1. TLS通讯中间人攻击及防范简介 TLS安全通讯的基础是基于对操作系统或者浏览器根证书的信任&#xff0c;如果CA证书签发机构被入侵&#xff0c;或者设备内置证书被篡改&#xff0c;都会导致TLS握手环节面临中间人攻击的风险。其实&#xff0c;这种风险被善意利用的情况还是很…

PHP企业门店订货通进销存系统小程序源码

订货通进销存系统&#xff0c;企业运营好帮手&#xff01; &#x1f4e6; 开篇&#xff1a;告别繁琐&#xff0c;企业运营新选择 嘿&#xff0c;各位企业主和创业者们&#xff01;今天我要给大家介绍一款超实用的企业运营神器——“订货通进销存系统”。在这个数字化时代&…

Docker入门之构建

Docker构建概述 Docker Build 实现了客户端-服务器架构&#xff0c;其中&#xff1a; 客户端&#xff1a;Buildx 是用于运行和管理构建的客户端和用户界面。服务器&#xff1a;BuildKit 是处理构建执行的服务器或构建器。 当您调用构建时&#xff0c;Buildx 客户端会向 Bui…

Element UI

Element ui 就是基于vue的一个ui框架,该框架基于vue开发了很多相关组件,方便我们快速开发页面。 官网: https://element.eleme.io/#/zh-CN 安装Element UI vue init webpack element(项目名)确认项目是否构建成功&#xff1a;进入到项目的根路径 执行 npm start 访问 h…

NSSCTF

[NSSRound#1 Basic]basic_check nikto扫描 nikto -h url PUT请求&#xff0c;如果不存在这个路径下的文件&#xff0c;将会创建&#xff0c;如果存在&#xff0c;会执行覆盖操作。 [NSSRound#8 Basic]MyDoor if (isset($_GET[N_S.S])) {eval($_GET[N_S.S]); } php特性&#…

形式架构定义语言(ADL)

简介 形式规范 多年来&#xff0c;学术界一直在试图通过使用与测试截然不同且更加主动的方法来确保程序语义的正确执行&#xff1a;形式化方法。研究者们认为这种方法通过更加精确、无二义性的描述来达到让程序绝对地按照设计者的思想执行的目的。这种思想早期体现在Floyd在1…

STM32之OLED驱动函数

类似51单片机中的LCD1602驱动差不多&#xff0c; 1.oled驱动代码 oled.c #include "stm32f10x.h" #include "OLED_Font.h"/*引脚配置*/ #define OLED_W_SCL(x) GPIO_WriteBit(GPIOB, GPIO_Pin_8, (BitAction)(x)) #define OLED_W_SDA(x) GPIO_WriteBi…

Python入门(二)编程中的“真”与“假”,单双向选择的判断

编程中的“真”与“假” 在编程中&#xff0c;这种“真”、“假”状态我们用布尔数来表示&#xff0c;“真”是True&#xff0c;“假”是False。 另一种方式&#xff0c;是通过比较运算得到。 如图&#xff0c;3赋值给a&#xff0c;1赋值给b&#xff0c;进行大小的比较。 a &g…

U9的插件开发之BE插件(1)

U9插件可分为&#xff1a;BE插件、BP插件、UI插件&#xff1b; BE(Business Entity) 简单就是指实体&#xff0c;U9的元数据。 我的案例是设置BE默认值&#xff0c;即在单据新增时&#xff0c;设置单据某一个字段的默认值&#xff0c;具体如下&#xff1a; 1.插件开发工具&a…

Linux的目录结构 常用基础命令(2)

Linux的目录结构 根目录&#xff1a; 所有分区、目录、文件等的位置起点 整个树形目录结构中&#xff0c;使用独立的一个“/”表示 常见的子目录 /root /bin /boot /dev /etc /home /var /usr /sbin 基础知识 以 . 开头的文件均为隐藏文件 路径用/分开 / 不在第一位就…

plsql 高版本用不了 expaste 插件 问题

plsql 高版本用不了 expaste 插件 问题 其实不是版本问题&#xff0c;而是高版本的咩有在用这个插件&#xff0c;在另外一个功能里面&#xff0c; 查询你要的数据&#xff0c; 选择数据&#xff0c;右键&#xff0c;点 右键 复制为表达式列表&#xff0c;即可 在空白处粘贴…

【C++】C++11基础入门

目录 一、C11发展史&#xff1a; 二、列表初始化&#xff1a; 1、初始化&#xff1a; 2、initializer_list函数&#xff1a; 三、声明&#xff1a; 1、auto自动识别类型&#xff1a; 2、decltype&#xff1a; 3、nullptr&#xff1a; 四、范围for&#xff1a; 五、STL…

vue3+vue-baidu-map-3x 实现地图定位

文档地址&#xff1a;一个是2一个是3 https://dafrok.github.io/vue-baidu-map/#/zh/index vue-baidu-map-3x 1.首先要到百度地图开放平台上建一个账号&#xff0c;如果有百度账号可以直接登录百度地图-百万开发者首选的地图服务商,提供专属的行业解决方案 2.点击控制台&am…

V2X介绍

文章目录 什么是V2XV2X的发展史早期的DSRC后起之秀C-V2XC-V2X 和DSRC 两者的对比 什么是V2X 所谓V2X&#xff0c;与流行的B2B、B2C如出一辙&#xff0c;意为vehicle to everything&#xff0c;即车对外界的信息交换。车联网通过整合全球定位系统&#xff08;GPS&#xff09;导…

C#使用log4net结合sqlite数据库记录日志

0 前言 为什么要把日志存到数据库里? 因为结构化的数据库存储的日志信息,可以写专门的软件读取历史日志信息,通过各种条件筛选,可操作性极大增强,有这方面需求的开发人员可以考虑。 为什么选择SQLite? 轻量级数据库,免安装,数据库的常用的基本功能都有,可以随程序…

如何打开/解包星露谷物语XNB文件(附软件资源)

一、什么是 XNB 文件&#xff1f; 游戏将数据、地图和纹理存储在 .xnb 这种压缩数据文件中&#xff0c;它们在游戏的 Content 文件夹中。例如&#xff0c;对话期间显示的阿比盖尔的头像来自这个文件&#xff1a; Content\Portraits\Abigail.xnb。解包这个文件&#xff0c;你会…