1. 如何选择节点
1.1. 确定slot
1.1.1. 通过cmdSlot
方法确定在哪个槽上, 这一步只是本地计算
首先入口方法_process,先通过cmdSlot
方法用key计算此次应该落在哪个槽上
通过crc16sum
算法计算key应该属于哪个槽,slotNumber为16384
func Slot(key string) int {if key == "" {return RandomSlot()}key = Key(key)return int(crc16sum(key)) % slotNumber
}
1.2. 选取节点的核心方法
1.2.1. cmdNode内部实现
1.2.2. slotReadOnlyNode方法实现
func (c *ClusterClient) slotReadOnlyNode(state *clusterState, slot int) (*clusterNode, error) {if c.opt.RouteByLatency {return state.slotClosestNode(slot)}if c.opt.RouteRandomly {return state.slotRandomNode(slot)}return state.slotSlaveNode(slot)
}
2. 如何获取所有节点
2.1. c.state.Get(ctx)方法,获取所有节点
/*
如果之前没有获取过状态信息,调用Reload方法来重新加载状态信息
如果之前已经获取过状态信息,并且距离上次获取状态信息的时间超过10秒,那么会进行异步重新加载
*/
func (c *clusterStateHolder) Get(ctx context.Context) (*clusterState, error) {v := c.state.Load()if v == nil {return c.Reload(ctx)}state := v.(*clusterState)if time.Since(state.createdAt) > 10*time.Second {c.LazyReload()}return state, nil
}
2.1.1. 第一次时,调用reload
2.1.1.1. reload方法
reload方法内,核心方法为c.load(ctx)
, 而load方法,为初始化时通过newClusterStateHolder
设置进来的fn
2.1.1.2. fn如下,为核心获取所有节点的c.loadState
方法
2.1.2. 后续调用LazyReload
func (c *clusterStateHolder) LazyReload() {if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) {return}go func() {defer atomic.StoreUint32(&c.reloading, 0)_, err := c.Reload(context.Background())if err != nil {return}time.Sleep(200 * time.Millisecond)}()
}
LazyReload
内部还是调用c.Reload
方法
2.2. 核心方法:获取所有节点loadState
方法实现
2.2.1. 核心为通过node.Client.ClusterSlots(ctx)
命令查询所有节点信息
// zhmark 2024/7/3 核心获取所有节点信息的方法
func (c *ClusterClient) loadState(ctx context.Context) (*clusterState, error) {if c.opt.ClusterSlots != nil {slots, err := c.opt.ClusterSlots(ctx)if err != nil {return nil, err}return newClusterState(c.nodes, slots, "")}addrs, err := c.nodes.Addrs()if err != nil {return nil, err}var firstErr error// 从配置的addrs里,随机选择一个节点,执行查询所有节点for _, idx := range rand.Perm(len(addrs)) {addr := addrs[idx]node, err := c.nodes.GetOrCreate(addr)if err != nil {if firstErr == nil {firstErr = err}continue}slots, err := node.Client.ClusterSlots(ctx).Result()if err != nil {if firstErr == nil {firstErr = err}continue}return newClusterState(c.nodes, slots, node.Client.opt.Addr)}/** No node is connectable. It's possible that all nodes' IP has changed.* Clear activeAddrs to let client be able to re-connect using the initial* setting of the addresses (e.g. [redis-cluster-0:6379, redis-cluster-1:6379]),* which might have chance to resolve domain name and get updated IP address.*/c.nodes.mu.Lock()c.nodes.activeAddrs = nilc.nodes.mu.Unlock()return nil, firstErr
}
通过调用cluster slots
获取所有的节点信息
func (c cmdable) ClusterSlots(ctx context.Context) *ClusterSlotsCmd {cmd := NewClusterSlotsCmd(ctx, "cluster", "slots")_ = c(ctx, cmd)return cmd
}