ChatGPT教你如何解决复杂高并发系统缓存设计(上)
❗缓存,消息队列,分库分表是高并发解决方案三剑客。
为什么需要设计系统缓存
设计系统缓存的主要目的是提高系统的性能和可伸缩性,同时减轻底层资源(如数据库、网络)的负载。以下是设计系统缓存的几个主要原因:
- 提升系统性能:缓存可以将频繁访问的数据存储在快速访问的位置,减少对底层数据源(如数据库)的访问次数和响应时间。通过缓存,系统可以更快地获取数据并提供更快的响应速度,从而提升整体系统性能。
- 减轻底层资源负载:数据库是典型的底层资源,访问数据库通常是一项相对较慢的操作。通过缓存,可以减轻对数据库的频繁访问,从而减少数据库负载,提高数据库的可扩展性和稳定性。
- 改善用户体验:缓存可以将数据就近存储,使用户能够更快地获取所需的数据,提供更快的响应时间。这可以改善用户体验,增加用户满意度,并促使用户更频繁地使用系统。
- 应对高并发访问:在高并发的情况下,如果每个请求都直接访问底层资源,容易导致资源瓶颈和性能下降。通过缓存,可以减少对底层资源的并发访问,提供更好的并发处理能力,确保系统的稳定性和可用性。
- 降低成本:缓存可以减少对昂贵资源(如数据库许可证、硬件资源)的需求。通过减少对底层资源的直接访问,可以降低系统的运行成本,并提高系统的可伸缩性。
总的来说,设计系统缓存是为了提高系统的性能、可扩展性和稳定性,提升用户体验,并降低系统成本。通过合理的缓存设计和策略,可以充分利用缓存技术的优势,优化系统架构和资源利用,使系统更具竞争力和可持续性。
如何解决复杂高并发系统缓存设计
解决复杂高并发系统的缓存设计是一个关键的问题,下面给出一些常见的缓存设计策略和注意事项:
- 数据唯一性和缓存一致性:在设计缓存时,需要确保缓存中的数据与源数据的唯一性和一致性。可以使用缓存更新策略,如缓存失效策略、更新通知策略等,来保证缓存中的数据与源数据保持一致。
- 缓存粒度:需要根据业务需求来确定缓存的粒度。过于细粒度的缓存可能导致缓存的管理和更新成本较高,而过于粗粒度的缓存可能无法充分利用缓存的优势。需要综合考虑数据的访问频率、更新频率以及缓存的大小等因素来确定合适的缓存粒度。
- 缓存过期策略:根据数据的访问模式和业务需求,选择合适的缓存过期策略。常见的缓存过期策略包括基于时间的过期、LRU(最近最少使用)策略、LFU(最不经常使用)策略等。可以根据业务特点来选择合适的缓存过期策略,以平衡缓存的更新成本和数据的实时性要求。
- 缓存穿透和缓存击穿:为了避免缓存穿透(即请求的数据在缓存和数据库中都不存在)和缓存击穿(即某一热点数据过期或被删除,导致大量请求直接访问数据库),可以采用技术手段来解决,如使用布隆过滤器预先过滤不存在的数据、设置热点数据永不过期、使用分布式锁等。
- 缓存并发访问:高并发环境下,缓存的并发访问可能导致缓存雪崩(即缓存集体失效)。为了解决这个问题,可以采用多级缓存架构、缓存预热、使用分布式缓存等技术手段来提高缓存的并发处理能力。
- 缓存监控和容量规划:需要对缓存进行监控和容量规划,及时发现和解决缓存的问题。可以使用监控工具来监控缓存的命中率、缓存大小、缓存失效率等指标,并根据实际情况进行容量规划和性能优化。
- 缓存更新策略:在数据更新时,需要更新缓存以保证数据的
总而言之,核心需解决两大问题:
- 缓存三大问题:缓存雪崩、缓存击穿、缓存穿透
- 数据唯一性和缓存一致性问题
缓存的三大问题
如何解决缓存雪崩、缓存击穿、缓存穿透
缓存雪崩、缓存击穿和缓存穿透是常见的缓存相关问题,下面分别介绍如何解决这些问题:
1、缓存雪崩(Cache Avalanche):
使用缓存过期时间随机化
通过为缓存设置随机的过期时间,避免多个缓存同时过期导致的数据库压力集中。
引入热点数据永不过期:
对于热点数据,可以设置其缓存永不过期,确保其始终可用。
缓存预热
在系统启动时或低峰期,提前加载缓存数据,避免在高峰期同时访问数据库。
使用多级缓存架构
采用多级缓存,将缓存分布在不同层级,避免单一缓存层级出现故障导致缓存雪崩。
2、缓存击穿(Cache Miss):
使用互斥锁(Mutex Lock)或分布式锁
在缓存失效时,通过加锁的方式保证只有一个线程可以访问数据库,其他线程等待该线程完成后从缓存中获取数据。
引入空值缓存(Null Cache)
当查询数据库结果为空时,将空值也缓存起来,设置较短的过期时间,避免多次查询数据库。
3、缓存穿透(Cache Penetration):
使用布隆过滤器(Bloom Filter)
布隆过滤器可以判断一个元素是否存在于集合中,用于预先过滤不存在的数据,减少对数据库的查询压力。
数据校验和合法性检查
在业务层面对请求参数进行校验和合法性检查,过滤掉非法请求,避免对不存在的数据进行缓存查询。
以上是一些常见的解决缓存雪崩、缓存击穿和缓存穿透问题的方法,将针对以上方法详细讲解。
使用多级缓存架构解决缓存雪崩
使用多级缓存架构可以有效地提高缓存的稳定性和性能。下面是一个基本的多级缓存架构的实现示例:
- 一级缓存(本地内存缓存):
- 使用一个本地内存缓存,如 ConcurrentHashMap,作为第一级缓存。
- 当数据被请求时,首先检查本地缓存中是否存在,如果存在则直接返回数据;如果不存在,则进入下一级缓存。
- 二级缓存(分布式缓存):
- 使用一个分布式缓存系统,如 Redis,作为第二级缓存。
- 当数据在一级缓存未命中时,通过访问分布式缓存来尝试获取数据。
- 如果分布式缓存中存在数据,则将数据存储到一级缓存,并返回数据;如果不存在,则进入下一级缓存。
- 三级缓存(持久化存储):
- 使用持久化存储,如数据库,作为第三级缓存。
- 当数据在一级缓存和二级缓存中未命中时,从持久化存储中读取数据,并将数据存储到一级缓存和二级缓存,以供后续访问。
❗多级缓存联想到[CPU多级缓存](https://zhuanlan.zhihu.com/p/370057417)。现代计算机系统中的CPU通常包含多级缓存,例如一级缓存(L1 Cache)、二级缓存(L2 Cache)和三级缓存(L3 Cache)。这些缓存层级用于存储CPU频繁访问的数据,以提高计算机系统的性能。
以下是一个简单的示例,展示如何在Java中使用多级缓存来处理电子商务网站的商品详情页:
public class ProductDetailCache {private static final int LOCAL_CACHE_EXPIRATION = 10; // 本地缓存过期时间,单位:秒private static final int DISTRIBUTED_CACHE_EXPIRATION = 60; // 分布式缓存过期时间,单位:秒private Cache<String, ProductDetail> localCache; // 本地缓存private Cache<String, ProductDetail> distributedCache; // 分布式缓存public ProductDetailCache() {// 初始化本地缓存,使用适合的本地缓存实现,如 Caffeine、Guava Cache 等localCache = Caffeine.newBuilder().expireAfterWrite(LOCAL_CACHE_EXPIRATION, TimeUnit.SECONDS).build();// 初始化分布式缓存,使用适合的分布式缓存实现,如 Redis、Memcached 等distributedCache = RedisCacheBuilder.newBuilder().expireAfterWrite(DISTRIBUTED_CACHE_EXPIRATION, TimeUnit.SECONDS).build();}public ProductDetail getProductDetail(String productId) {// 尝试从本地缓存获取商品详情ProductDetail productDetail = localCache.getIfPresent(productId);if (productDetail != null) {return productDetail;}// 尝试从分布式缓存获取商品详情productDetail = distributedCache.get(productId);if (productDetail != null) {// 将数据存储到本地缓存localCache.put(productId, productDetail);return productDetail;}// 从数据库中查询商品详情productDetail = databaseLookup(productId);if (productDetail != null) {// 将数据存储到本地缓存和分布式缓存localCache.put(productId, productDetail);distributedCache.put(productId, productDetail);}return productDetail;}private ProductDetail databaseLookup(String productId) {// 查询数据库,获取商品详情数据// ...// 返回商品详情对象}
}
在上述示例中,我们使用了两个缓存对象,一个是本地缓存(localCache
),另一个是分布式缓存(distributedCache
)。在获取商品详情时,首先尝试从本地缓存中获取数据,如果缓存命中则直接返回;如果本地缓存未命中,则尝试从分布式缓存中获取数据;如果分布式缓存也未命中,则从数据库中查询商品详情,并将结果存储到本地缓存和分布式缓存中。
本地缓存
本地缓存是指将数据存储在应用程序所在的进程或服务器的内存中,以便快速访问和提高系统性能。本地缓存通常是一个轻量级的数据存储,用于临时存储频繁访问或计算昂贵的数据,避免重复的计算或查询操作。
在Java中,常见的本地缓存实现包括:
- ConcurrentHashMap:Java标准库提供的线程安全的哈希表实现,适用于多线程环境下的本地缓存。
- Caffeine:一个高性能的Java本地缓存库,提供了丰富的特性和配置选项。
- Guava Cache:Google Guava库提供的本地缓存实现,简单易用,支持过期策略、最大容量控制等功能。
使用本地缓存时,需要考虑缓存的大小、过期策略、并发访问控制等因素,以确保缓存能够正确地满足系统需求,并避免潜在的问题(如缓存过期、内存溢出等)。
常见的本地缓存实现有 ConcurrentHashMap、Caffeine 和 Guava Cache。以下是它们的优缺点:
1、ConcurrentHashMap
- 优点:
- 线程安全:ConcurrentHashMap 是 Java 标准库提供的线程安全的哈希表实现,可以在多线程环境下使用。
- 高并发性能:ConcurrentHashMap 使用了分段锁机制,不同的线程可以同时访问不同的段,从而提高并发性能。
- 缺点:
- 缺少过期策略:ConcurrentHashMap 不支持自动过期策略,需要手动管理缓存的过期和清理。
- 缺少额外的特性:相对于其他专门的缓存库,ConcurrentHashMap 提供的功能相对简单,缺少一些高级特性。
ConcurrentHashMap 使用举例:
javaCopy code
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;public class Example {public static void main(String[] args) {// 创建 ConcurrentHashMap 实例Map<String, String> cache = new ConcurrentHashMap<>();// 向缓存中放入数据cache.put("key1", "value1");cache.put("key2", "value2");// 从缓存中获取数据String value1 = cache.get("key1");String value2 = cache.get("key2");System.out.println(value1); // 输出: value1System.out.println(value2); // 输出: value2}
}
2、Caffeine
- 优点:
- 高性能:Caffeine 是一个专注于高性能的本地缓存库,通过使用各种优化技术(如读写锁、无锁数据结构等),提供出色的读写性能。
- 丰富的特性:Caffeine 提供了许多有用的特性,如自动过期、异步加载、缓存统计等,可以灵活地配置和管理缓存行为。
- 缺点:
- 学习成本较高:Caffeine 的配置和使用相对复杂,需要一定的学习成本。
- 额外的依赖:使用 Caffeine 需要添加额外的依赖,可能会增加项目的复杂性。
- Caffeine 使用举例:
javaCopy code
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;public class Example {public static void main(String[] args) {// 创建 Caffeine 缓存实例Cache<String, String> cache = Caffeine.newBuilder().maximumSize(100).build();// 向缓存中放入数据cache.put("key1", "value1");cache.put("key2", "value2");// 从缓存中获取数据String value1 = cache.getIfPresent("key1");String value2 = cache.getIfPresent("key2");System.out.println(value1); // 输出: value1System.out.println(value2); // 输出: value2}
}
3、Guava Cache:
- 优点:
- 简单易用:Guava Cache 是 Google Guava 库提供的一个简单易用的本地缓存实现,具有清晰的 API 接口。
- 支持过期策略:Guava Cache 支持基于时间和大小的过期策略,可以根据需求设置缓存的过期时间和最大容量。
- 可配置性:Guava Cache 提供了丰富的配置选项,可以根据具体场景进行灵活配置。
- 缺点:
- 缺少异步加载:相比于其他一些缓存库,Guava Cache 不支持异步加载数据的能力。
需要根据具体的需求和场景选择合适的本地缓存实现。如果需要简单的线程安全缓存,可以使用 ConcurrentHashMap;如果对高性能和丰富特性有更高的要求,可以选择 Caffeine;如果希望简单易用且具备一定可配置性,可以考虑使用 Guava Cache。
Guava Cache 使用举例:
javaCopy code
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;import java.util.concurrent.TimeUnit;public class Example {public static void main(String[] args) {// 创建 Guava Cache 实例Cache<String, String> cache = CacheBuilder.newBuilder().maximumSize(100).expireAfterWrite(10, TimeUnit.MINUTES).build();// 向缓存中放入数据cache.put("key1", "value1");cache.put("key2", "value2");// 从缓存中获取数据String value1 = cache.getIfPresent("key1");String value2 = cache.getIfPresent("key2");System.out.println(value1); // 输出: value1System.out.println(value2); // 输出: value2}
}
分布式缓存
RedisTemplate实战
springboot整合redis,springboot2.0以后是使用lettuce(低版本)存在问题,堆外内存溢出
解决方法:
切换到jedis<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><exclusions><exclusion><groupId>io.lettuce</groupId><artifactId>lettuce-core</artifactId></exclusion></exclusions></dependency><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId></dependency>
Redis 的 SETNX
在 Redis 命令行中是这样执行的:
set <key> <value> NX
springboot整合redis
导入pom
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
yml中加入
spring:redis:database: 0host: 127.0.0.1port: 6379jedis:pool:max-active: 100max-idle: 10max-wait: 100000timeout: 5000
不管是RedisTemplate还是StringRedisTemplate,序列化都有问题,会出现乱码的情况:
在config包下面来设置RedisTemplate的序列化
/*** redis配置缓存*/
@Configuration
@EnableAutoConfiguration
@EnableCaching
public class RedisConfig {@Beanpublic RedisTemplate<String,Object> redisTemplate(RedisConnectionFactory factory){RedisTemplate<String,Object> template = new RedisTemplate<>();//关联template.setConnectionFactory(factory);//设置key的序列化器template.setKeySerializer(new StringRedisSerializer());//设置value的序列化器template.setValueSerializer(new StringRedisSerializer());return template;}/*** 基于SpringBoot2 对 RedisCacheManager 的自定义配置* @param redisConnectionFactory* @return*/@Beanpublic RedisCacheManager cacheManager(RedisConnectionFactory redisConnectionFactory) {//初始化一个RedisCacheWriterRedisCacheWriter redisCacheWriter = RedisCacheWriter.nonLockingRedisCacheWriter(redisConnectionFactory);//设置CacheManager的值序列化方式为json序列化RedisSerializer<Object> jsonSerializer = new GenericJackson2JsonRedisSerializer();RedisSerializationContext.SerializationPair<Object> pair = RedisSerializationContext.SerializationPair.fromSerializer(jsonSerializer);RedisCacheConfiguration defaultCacheConfig = RedisCacheConfiguration.defaultCacheConfig().serializeValuesWith(pair);//设置默认超过时期是1天defaultCacheConfig.entryTtl(Duration.ofDays(1));Map<String, RedisCacheConfiguration> redisCacheConfigurationMap = new HashMap<>();// 需要作缓存在这里加上就加一个put即可redisCacheConfigurationMap.put("graph", this.getRedisCacheConfigurationWithTtl(1)); // 120秒后失效//初始化RedisCacheManagerreturn new RedisCacheManager(redisCacheWriter, defaultCacheConfig,redisCacheConfigurationMap);}private RedisCacheConfiguration getRedisCacheConfigurationWithTtl(Integer day) {RedisCacheConfiguration redisCacheConfiguration = RedisCacheConfiguration.defaultCacheConfig();RedisSerializer<Object> jsonSerializer = new GenericJackson2JsonRedisSerializer();RedisSerializationContext.SerializationPair<Object> pair = RedisSerializationContext.SerializationPair.fromSerializer(jsonSerializer);redisCacheConfiguration = redisCacheConfiguration.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jsonSerializer)).entryTtl(Duration.ofDays(day));return redisCacheConfiguration;}
}
Spring 提供了四个注解来声明缓存规则。@Cacheable,@CachePut,@CacheEvict,@Caching。
注解 | 描述 |
---|---|
@Cacheable | 在方法调用前,首先先去缓存中找方法的返回值,如果能找到,则返回缓存值,否则就执行这个方法,并将返回值放到缓存 |
@CachePut | 方法调用前不会去找缓存,无论如何都会执行方法,在执行后将缓存放入缓存中 |
@CacheEvict | 清除缓存中的一个或者多个记录,默认方法执行后删除缓存 |
@Caching | 能够同时应用多个缓存注解 |
使用缓存
1、pom
//springcahe<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-cache</artifactId></dependency>
//redis<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>
2、配置文件
spring.chache.type=redis
3、启动类上添加 @EnableCaching
注解。
自定义配置类
# 使用 Redis 作为缓存组件
spring.cache.type=redis
# 缓存过期时间为 3600s
spring.cache.redis.time-to-live=3600000
# 缓存的键的名字前缀
spring.cache.redis.key-prefix=passjava_
# 是否使用缓存前缀
spring.cache.redis.use-key-prefix=true
# 是否缓存控制,防止缓存穿透
spring.cache.redis.cache-null-values=true
互斥锁(Mutex Lock)或分布式锁解决缓存击穿
❗分布式锁推荐!!!这是讲解最好分布锁的文章!!!以下内容也是引用该篇文章https://my.oschina.net/u/4499317/blog/5039486
分布式锁
青铜方案(使用setnx容易造成死锁)
我们先用 Redis 的 SETNX 命令来实现最简单的分布式锁
- 多个并发线程都去 Redis 中申请锁,也就是执行 setnx 命令,假设线程 A 执行成功,说明当前线程 A 获得了。
- 其他线程执行 setnx 命令都会是失败的,所以需要等待线程 A 释放锁。
- 线程 A 执行完自己的业务后,删除锁。
- 其他线程继续抢占锁,也就是执行 setnx 命令。因为线程 A 已经删除了锁,所以又有其他线程可以抢占到锁了。
// 1.先抢占锁
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "123");
if(lock) {// 2.抢占成功,执行业务List<TypeEntity> typeEntityListFromDb = getDataFromDB();// 3.解锁redisTemplate.delete("lock");return typeEntityListFromDb;
} else {// 4.休眠一段时间sleep(100);// 5.抢占失败,等待锁释放return getTypeEntityListByRedisDistributedLock();
}
青铜缺陷:
setnx
占锁成功,业务代码出现异常或者服务器宕机,没有执行删除锁的逻辑,就造成了死锁。
解决方法:
设置锁的设置过期时间,过一段时间后,自动删除锁,这样其他线程就能获取到锁了。
白银方案(设置过期时间并非原子性)
// 1.先抢占锁
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "123");
if(lock) {// 2.在 10s 以后,自动清理 lockredisTemplate.expire("lock", 10, TimeUnit.SECONDS);// 3.抢占成功,执行业务List<TypeEntity> typeEntityListFromDb = getDataFromDB();// 4.解锁redisTemplate.delete("lock");return typeEntityListFromDb;
}
白银方案缺陷
白银方案看似解决了线程异常或服务器宕机造成的锁未释放的问题,但还是存在其他问题:
因为占锁和设置过期时间是分两步执行的,所以如果在这两步之间发生了异常,则锁的过期时间根本就没有设置成功。
所以和青铜方案有一样的问题:锁永远不能过期。
白银方案解决措施:
事务的原子性(Atom)
# 设置某个 key 的值并设置多少毫秒或秒 过期。
set <key> <value> PX <多少毫秒> NX
或
set <key> <value> EX <多少秒> NX
黄金方案(set EX NX命名原子性,业务还未执行完,但是过期时间已到期自动释放锁,其他用户获得锁并把他的锁释放)
//setIfAbsent("lock", "123", 10, TimeUnit.SECONDS);
// 1.先抢占锁与过期时间
Boolean lock = setIfAbsent("lock", "123", 10, TimeUnit.SECONDS);
if(lock) {// 3.抢占成功,执行业务List<TypeEntity> typeEntityListFromDb = getDataFromDB();// 4.解锁redisTemplate.delete("lock");return typeEntityListFromDb;
}
黄金方案的缺陷
因为锁的编号都叫做 “123”
,用户 A 只认锁编号,看见编号为 “123”
的锁就开,结果把用户 B 的锁打开了,此时用户 B 还未执行完任务,当然生气了。
铂金方案(设置唯一key)
- 设置锁的过期时间时,还需要设置唯一编号。
- 主动删除锁的时候,需要判断锁的编号是否和设置的一致,如果一致,则认为是自己设置的锁,可以进行主动删除。
// 1.生成唯一 id
String uuid = UUID.randomUUID().toString();
// 2. 抢占锁
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 10, TimeUnit.SECONDS);
if(lock) {System.out.println("抢占成功:" + uuid);// 3.抢占成功,执行业务List<TypeEntity> typeEntityListFromDb = getDataFromDB();// 4.获取当前锁的值String lockValue = redisTemplate.opsForValue().get("lock");// 5.如果锁的值和设置的值相等,则清理自己的锁if(uuid.equals(lockValue)) {System.out.println("清理锁:" + lockValue);redisTemplate.delete("lock");}return typeEntityListFromDb;
} else {System.out.println("抢占失败,等待锁释放");// 4.休眠一段时间sleep(100);// 5.抢占失败,等待锁释放return getTypeEntityListByRedisDistributedLock();
}
- 1.生成随机唯一 id,给锁加上唯一值。
- 2.抢占锁,并设置过期时间为 10 s,且锁具有随机唯一 id。
- 3.抢占成功,执行业务。
- 4.执行完业务后,获取当前锁的值。
- 5.如果锁的值和设置的值相等,则清理自己的锁。
钻石方案(删除锁不上原子性,查询锁的时候是相等但是,耗时太长,在B线程获得锁 了,并把它锁释放了)
上面的线程 A 查询锁和删除锁的逻辑不是原子性
的,所以将查询锁和删除锁这两步作为原子指令操作就可以了。
//这段 Redis 专属脚本
if redis.call("get",KEYS[1]) == ARGV[1]
thenreturn redis.call("del",KEYS[1])
elsereturn 0
end//这段脚本和铂金方案的获取key,删除key的方式很像。先获取 KEYS[1] 的 value,判断 KEYS[1] 的 value 是否和 ARGV[1] 的值相等,如果相等,则删除 KEYS[1]。
分两步:先定义脚本;用 redisTemplate.execute 方法执行脚本。
// 脚本解锁
String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList("lock"), uuid);
//而这段 Redis 脚本是由 Redis 内嵌的 Lua 环境执行的,所以又称作 Lua 脚本。
分布式锁中的王者方案-Redisson
布隆过滤器和缓存空对象解决缓存缓存穿透
布隆过滤器(Bloom Filter)是一种空间效率高、快速判断元素是否存在的概率型数据结构。它基于哈希函数和位数组实现,并可以用于检索一个元素是否存在于一个集合中。
布隆过滤器的主要特点包括:
- 空间效率高:布隆过滤器使用位数组来表示元素的存在情况,相比于存储实际数据,它所需的存储空间非常小。
- 快速查询:布隆过滤器通过多个哈希函数将元素映射到位数组的多个位置,检查这些位置是否都被标记为存在。如果有任何一个位置不存在,那么可以确定元素一定不存在于集合中。这种方式可以实现快速的查询操作。
- 概率性判断:布隆过滤器的查询结果有一定的概率性。当判断元素不存在时,一定不存在;但当判断元素存在时,有一定的可能是误判,即元素实际上可能不存在于集合中。
使用布隆过滤器时需要注意以下几点:
- 哈希函数的选择:布隆过滤器需要使用多个哈希函数来映射元素到位数组的不同位置。选择合适的哈希函数可以减少冲突,提高过滤器的性能。
- 位数组的大小:位数组的大小需要根据预期的元素数量和可接受的误判率来确定。较小的位数组可能会增加误判率,而较大的位数组则会增加存储空间。
- 误判率的权衡:布隆过滤器的设计需要在空间和查询性能与误判率之间进行权衡。较小的误判率会增加位数组的大小和哈希函数的数量,从而增加存储空间和计算复杂度。
- 动态扩展与数据删除:布隆过滤器通常不支持动态扩展和删除操作,因为删除元素会影响到其他元素的判断结果。如果需要动态操作,可能需要考虑其他数据结构。
布隆过滤器适用于需要判断元素是否可能存在于集合中,但对于具体存在性要求不严格的场景,例如缓存穿透的过滤、网页黑名单过滤等。
在Java中,可以使用第三方库实现布隆过滤器,例如 Google Guava 库中提供了 com.google.common.hash.BloomFilter
类。另外,Apache Commons Collections 也提供了布隆过滤器的实现。这些库提供了简单易用的 API,可以方便地创建和使用布隆过滤器。
缓存空对象
public class UserServiceImpl {@AutowiredUserDAO userDAO;@AutowiredRedisCache redisCache;public User findUser(Integer id) {Object object = redisCache.get(Integer.toString(id));// 缓存中存在,直接返回if(object != null) {// 检验该对象是否为缓存空对象,是则直接返回nullif(object instanceof NullValueResultDO) {return null;}return (User)object;} else { // 缓存中不存在,查询数据库User user = userDAO.getUser(id);// 存入缓存if(user != null) {redisCache.put(Integer.toString(id),user);} else {// 将空对象存进缓存redisCache.put(Integer.toString(id), new NullValueResultDO());}return user;}}
}
布隆过滤器
//哈希函数
public static class MyHash {private int cap;private int seed;// 初始化数据public MyHash(int cap, int seed) {this.cap = cap;this.seed = seed;}// 哈希函数public int hash(String value) {int result = 0;int len = value.length();for (int i = 0; i < len; i++) {result = seed * result + value.charAt(i);}return (cap - 1) & result;}}//布隆过滤器public class MyBloomFilter {// 布隆过滤器长度private static final int SIZE = 2 << 10;// 模拟实现不同的哈希函数private static final int[] num= new int[] {5, 19, 23, 31,47, 71}; // 初始化位数组private BitSet bits = new BitSet(SIZE);// 用于存储哈希函数private MyHash[] function = new MyHash[num.length];// 初始化哈希函数public MyBloomFilter() {for (int i = 0; i < num.length; i++) {function [i] = new MyHash(SIZE, num[i]);}}// 存值Api public void add(String value) {// 对存入得值进行哈希计算for (MyHash f: function) {// 将为数组对应的哈希下标得位置得值改为1bits.set(f.hash(value), true);}}// 判断是否存在该值得Apipublic boolean contains(String value) {if (value == null) {return false;}boolean result= true;for (MyHash f : func) {result= result&& bits.get(f.hash(value));}return result;}
}
谷歌布隆过滤器
<dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>27.0.1-jre</version>
</dependency>// BloomFilter.create(Funnels.stringFunnel(Charset.forName("utf-8")),size, 误判率);public static void MyBloomFilterSysConfig {@AutowiredOrderMapper orderMapper// 1.创建布隆过滤器 第二个参数为预期数据量10000000,第三个参数为错误率0.00001BloomFilter<CharSequence> bloomFilter = BloomFilter.create(Funnels.stringFunnel(Charset.forName("utf-8")),10000000, 0.00001);// 2.获取所有的订单,并将订单的id放进布隆过滤器里面List<Order> orderList = orderMapper.findAll()for (Order order;orderList ) {Long id = order.getId();bloomFilter.put("" + id);}
}
redis实现布隆过滤器,redisson
BloomFilter<String> bloomFilter = redisson.getBloomFilter("phoneList");//初始化布隆过滤器:预计元素为100000000L,误差率为3%bloomFilter.tryInit(100000000L,0.03);//将号码10086插入到布隆过滤器中bloomFilter.add("10086");
总结
本章主要是针对与解决复杂高并发系统缓存的缓存三大问题进行讲解,后续将针对如何解决缓存的一致性进行论述。