目录
前言
pom
yml
Controller
演示
注意
前言
工作中开发过一个服务,这里记作A服务,主要功能是配置,部署以及调用云函数。其中配置云函数的功能里,有一个配置项是并发数,意思是同一时间最多能有多少个请求调用已部署的云函数。
客户端通过调用A服务,再由A服务去调用云函数,从而实现客户端请求云函数的功能。在调用云函数时,是要根据配置的并发数控制请求的数量。如果时单机架构的话,直接使用JDK中的 Semaphore 就能够实现并发线程控制了。但是项目的架构是微服务架构,所以需要使用分布式信号量才行。
项目中有使用 Redis,所以可以基于 Redis 来实现分布式信号量,使用 Spring Boot 集成 Redisson 使用 RSemaphore 就能够轻松实现分布式信号量了。
pom
<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.11.4</version></dependency>
yml
application.yml
spring:redis:redisson:config: "classpath:redisson.yml"server:port: 8888
redisson.yml
{"singleServerConfig":{"idleConnectionTimeout":10000,"pingTimeout":1000,"connectTimeout":10000,"timeout":3000,"retryAttempts":3,"retryInterval":1500,"subscriptionsPerConnection":5,"clientName":null,"address": "redis://localhost:6379","subscriptionConnectionMinimumIdleSize":1,"subscriptionConnectionPoolSize":50,"connectionMinimumIdleSize":32,"connectionPoolSize":64,"database":0},"threads":0,"nettyThreads":0,"codec":{"class":"org.redisson.codec.JsonJacksonCodec"},"transportMode":"NIO" }
Controller
以下示例代码演示如何使用 Redisson 中的 RSemaphore
初始化了一个 semaphore 的信号量,并设置其 permits 为 1,即最多一个线程能获取许可
接口 /redisson/semaphore 逻辑中,首先会尝试获取许可,拿到许可后才能往下执行业务逻辑,若是超过1秒还没获取到许可,则说明当前存在线程正在处理,就直接返回提示信息”服务器繁忙,请稍后再试“
@RestController
@RequestMapping("/redisson")
public class RedissonController implements InitializingBean {@Resourceprivate RedissonClient client;private RSemaphore semaphore;@Overridepublic void afterPropertiesSet() {semaphore = client.getSemaphore("semaphore");semaphore.trySetPermits(1);}/*** semaphore*/@GetMapping("/semaphore")public String semaphore() throws InterruptedException {if (!semaphore.tryAcquire(1, TimeUnit.SECONDS)) {return "服务器繁忙,请稍后再试";}try {//处理业务逻辑System.out.println("processing");Thread.sleep(1000 * 4);} catch (InterruptedException e) {throw new RuntimeException(e);} finally {semaphore.release();}return "成功";}
}
演示
启动后,使用 Navicat 连接上 redis 查看,可以看到上面已经设置了一个名为 semaphore, 类型为 string 的 key,其值为 1
Redisson客户端获取许可的底层是通过lua来实现的,如果能够成功获取返回true,否则返回false。调用 acquire 方法时,首先获取到剩余的许可数量,当只有剩余的许可数量大于想要获取的许可数量时返回1否则返回0。
上面的演示代码将分布式信号量的值设置为 1,所以同一时间最多只能有一个线程执行业务逻辑。
同时发送两个请求,就会返回“服务器繁忙,请稍后再试”
结果如下
注意
通过 Redisson 的 Rsemaphore,就能够轻松地实现分布式信号量,从而在分布式系统中控制并发请求数量了。不过使用 Redisson 地 Rsemaphore 时,也需要注意以下的问题
acquire 和 release 方法必须成对使用
也就是说成功调用了acquire获取到了许可,就必须保证执行完业务逻辑后,调用release释放许可。这样才能保证分布式信号不会越用越少,以至于到最后分布式信号量变成0,所有的请求都执行不了。
同理,也要保证程序中多次release,调用一次acquire获取许可,之后却多次调用release释放许可,会导致分布式信号量的值越来越大,从而无法实现并发控制访问请求的功能。