《百万人高并发场景下,我如何用无锁实现高性能LRU缓存?》
LRU算法核心原理
LRU(Least Recently Used)算法是缓存系统的核心淘汰策略,其核心逻辑可以用一张流程图描述:
(图:访问数据时触发链表重组,新增数据时触发淘汰检测)
一、分段锁设计思路
-
分段缓存(Segment):
将整个缓存按 key 的 hash 值划分为多个 Segment,每个 Segment 内部维护一个小型 LRU 缓存(HashMap + 双向链表)。 -
独立锁:
每个 Segment 都持有自己的 ReentrantLock,操作各自区域的数据时只锁定当前 Segment,而不同 Segment 的操作可以并发执行。 -
整体映射:
外层类根据 key 的 hash 值决定使用哪个 Segment,从而在降低锁竞争的同时仍能保证整体缓存的线程安全。
二、分段锁 LRU 缓存代码实现
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;/*** SegmentedLRUCache 是一个基于分段锁的线程安全 LRU 缓存实现。* 通过将缓存划分为多个 Segment,每个 Segment 内部维护自己的 HashMap 和双向链表,* 有效降低了全局锁的竞争,提高了并发性能。** @param <K> 键的类型* @param <V> 值的类型*/
public class SegmentedLRUCache<K, V> {/*** 每个 Segment 内部的缓存实现,包含一个 HashMap 和双向链表,并用 ReentrantLock 保护操作。*/private static class Segment<K, V> {private final int capacity; // 当前 Segment 的容量private final Map<K, Node<K, V>> map; // 快速查找 key 对应节点private Node<K, V> head; // 双向链表头,最近使用的数据private Node<K, V> tail; // 双向链表尾,最久未使用的数据private final ReentrantLock lock = new ReentrantLock();public Segment(int capacity) {this.capacity = capacity;this.map = new HashMap<>();}public V get(K key) {lock.lock();try {Node<K, V> node = map.get(key);if (node == null) {return null;}// 访问后将节点移动到链表头部moveToHead(node);return node.value;} finally {lock.unlock();}}public void put(K key, V value) {lock.lock();try {Node<K, V> node = map.get(key);if (node != null) {// 更新值并移动到头部node.value = value;moveToHead(node);} else {// 如果超出容量,则移除链表尾部节点(最久未使用)if (map.size() >= capacity) {if (tail != null) {map.remove(tail.key);removeNode(tail);}}Node<K, V> newNode = new Node<>(key, value);addNodeAtHead(newNode);map.put(key, newNode);}} finally {lock.unlock();}}/*** 将节点移动到链表头部。*/private void moveToHead(Node<K, V> node) {if (node == head) {return;}removeNode(node);addNodeAtHead(node);}/*** 从链表中移除节点。*/private void removeNode(Node<K, V> node) {if (node.prev != null) {node.prev.next = node.next;} else {head = node.next;}if (node.next != null) {node.next.prev = node.prev;} else {tail = node.prev;}node.prev = null;node.next = null;}/*** 将节点添加到链表头部。*/private void addNodeAtHead(Node<K, V> node) {node.next = head;node.prev = null;if (head != null) {head.prev = node;}head = node;if (tail == null) {tail = node;}}/*** 调试使用:打印当前 Segment 内缓存的状态(从头到尾)。*/public void printSegment() {lock.lock();try {Node<K, V> current = head;while (current != null) {System.out.print(current.key + ":" + current.value + " ");current = current.next;}System.out.println();} finally {lock.unlock();}}}/*** 双向链表节点,保存缓存中的键值对及前后指针。*/private static class Node<K, V> {K key;V value;Node<K, V> prev;Node<K, V> next;public Node(K key, V value) {this.key = key;this.value = value;}}private final int segmentCount; // Segment 数量private final Segment<K, V>[] segments; // 分段数组/*** 构造方法** @param capacity 整个缓存的总容量* @param segmentCount 分段数量,建议取 16 或 32*/@SuppressWarnings("unchecked")public SegmentedLRUCache(int capacity, int segmentCount) {if (capacity < segmentCount) {throw new IllegalArgumentException("容量必须大于或等于分段数量");}this.segmentCount = segmentCount;segments = new Segment[segmentCount];// 将总容量均摊到各个 Segment 上int capacityPerSegment = capacity / segmentCount;for (int i = 0; i < segmentCount; i++) {segments[i] = new Segment<>(capacityPerSegment);}}/*** 根据 key 的 hash 值定位到对应的 Segment。*/private Segment<K, V> segmentFor(K key) {int h = key.hashCode();// 使用 & 运算保证非负,再取模int index = (h & 0x7fffffff) % segmentCount;return segments[index];}/*** 获取指定 key 对应的值。*/public V get(K key) {if (key == null) throw new NullPointerException();return segmentFor(key).get(key);}/*** 插入或更新一个键值对。*/public void put(K key, V value) {if (key == null) throw new NullPointerException();segmentFor(key).put(key, value);}/*** 调试方法:打印所有 Segment 中缓存的状态。*/public void printCache() {for (int i = 0; i < segmentCount; i++) {System.out.print("Segment " + i + ": ");segments[i].printSegment();}}/*** 测试入口:演示 SegmentedLRUCache 在多线程环境下的表现。*/public static void main(String[] args) {// 总容量为 16,分成 4 个 Segmentfinal SegmentedLRUCache<Integer, String> cache = new SegmentedLRUCache<>(16, 4);// 写线程:不断插入数据Runnable writer = () -> {for (int i = 0; i < 50; i++) {cache.put(i, "Value" + i);System.out.println(Thread.currentThread().getName() + " put: " + i + " => Value" + i);try {Thread.sleep(20);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}};// 读线程:随机读取数据Runnable reader = () -> {for (int i = 0; i < 50; i++) {int key = (int) (Math.random() * 50);String value = cache.get(key);System.out.println(Thread.currentThread().getName() + " get: " + key + " => " + value);try {Thread.sleep(30);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}};Thread writerThread1 = new Thread(writer, "Writer-1");Thread writerThread2 = new Thread(writer, "Writer-2");Thread readerThread1 = new Thread(reader, "Reader-1");Thread readerThread2 = new Thread(reader, "Reader-2");writerThread1.start();writerThread2.start();readerThread1.start();readerThread2.start();try {writerThread1.join();writerThread2.join();readerThread1.join();readerThread2.join();} catch (InterruptedException e) {Thread.currentThread().interrupt();}System.out.println("Final cache state:");cache.printCache();}
}
三、代码详解
-
分段划分:
- 构造方法中,我们将总容量均摊到各个 Segment,每个 Segment 内部维护一个 HashMap 和双向链表。
- 方法
segmentFor(key)
根据 key 的 hash 值计算出所属的 Segment,从而确保不同 key 分布到不同的 Segment。
-
细粒度锁定:
- 每个 Segment 内部用
ReentrantLock
对操作进行保护,保证在同一 Segment 内的并发操作安全。 - 不同 Segment 之间互不干扰,极大减少了锁竞争。
- 每个 Segment 内部用
-
LRU 实现细节:
- 每个 Segment 内的双向链表用于记录访问顺序。
- 访问(get)或更新(put)时,都会将对应节点移到链表头部;当容量超限时,移除尾部节点(最久未使用的数据)。
-
多线程测试:
- 在
main
方法中,我们启动多个读写线程,模拟高并发访问场景,并最终打印每个 Segment 内的缓存状态以验证正确性。
- 在
四、总结
通过分段锁设计,我们将单一全局锁的竞争问题转化为多个独立锁的局部竞争,从而在高并发场景下显著提升 LRU 缓存的性能。
兼顾了线程安全和高效性,非常适用于对缓存响应时间要求较高的业务场景。源码经过测试,拿走即用,你可以根据实际情况调整分段数及容量,进一步优化性能。
最后
欢迎关注gzh:加瓦点灯,每天推送干货知识!