版本 awsVersion = ‘1.11.277’
LeaseManager 接口管理实例的租约信息,提供以下功能:
- 注册实例
- 取消注册实例
- 实例续约
- 剔除过期实例
public interface LeaseManager<T> {/** 注册实例并续约*/void register(T r, int leaseDuration, boolean isReplication);/*** 取消注册实例*/boolean cancel(String appName, String id, boolean isReplication);/*** 续约*/boolean renew(String appName, String id, boolean isReplication);/*** 剔除过期实例*/void evict();
}
InstanceRegistry 接口即注册表服务,继承 LeaseManager 接口,提供以下功能:
- 启动和关闭注册表服务
- 更新注册表中实例的状态
- 从注册表中获取应用信息和实例信息
- 初始化和获取注册表缓存
- 租约过期机制和自我保护机制(和 LeaseManager 的 evict() 方法相关)
public interface InstanceRegistry extends LeaseManager<InstanceInfo>, LookupService<String> {// ========================// 启动和关闭注册表服务// ========================/*** 在PeerAwareInstanceRegistry接口的init()和syncUp()方法调用后被调用* 1.更新expectedNumberOfClientsSendingRenews* 更新numberOfRenewsPerMinThreshold* 2.如果从其他Eureka节点拉取注册表成功并且实例数量大于0* 设置peerInstancesTransferEmptyOnStartup为false* 和PeerAwareInstanceRegistry接口的shouldAllowAccess()方法相关* 3.设置startupTime为当前时间* 4.设置自身实例状态为InstanceStatus.UP* 5.调用postInit()方法* 创建EvictionTask并通过Timer调度定时剔除过期实例* 配置evictionIntervalTimerInMs指定剔除过期实例的时间间隔,默认60s*/void openForTraffic(ApplicationInfoManager applicationInfoManager, int count);void shutdown();// ========================// 更新注册表中实例的状态// ========================@Deprecatedvoid storeOverriddenStatusIfRequired(String id, InstanceStatus overriddenStatus);/*** 更新注册表中实例的overriddenStatus*/void storeOverriddenStatusIfRequired(String appName, String id, InstanceStatus overriddenStatus);/*** 更新注册表中实例的overriddenStatus和status*/boolean statusUpdate(String appName,String id,InstanceStatus newStatus,String lastDirtyTimestamp,boolean isReplication);/*** 删除注册表中实例的overriddenStatus并设置status*/boolean deleteStatusOverride(String appName,String id,InstanceStatus newStatus,String lastDirtyTimestamp,boolean isReplication);/*** 获取注册表中overriddenStatus集合的快照*/Map<String, InstanceStatus> overriddenInstanceStatusesSnapshot();// ========================// 注册表 CRUD// ========================/*** 获取本地注册表*/Applications getApplicationsFromLocalRegionOnly();/*** 根据应用名称从本地注册表或其他region的注册表中获取应用信息*/Application getApplication(String appName, boolean includeRemoteRegion);/*** 根据应用名称和实例id从本地注册表或其他region的注册表中获取实例信息*/InstanceInfo getInstanceByAppAndId(String appName, String id);/*** 根据应用名称和实例id从本地注册表或其他region的注册表中获取实例信息*/InstanceInfo getInstanceByAppAndId(String appName, String id, boolean includeRemoteRegions);/*** 清空注册表*/void clearRegistry();// ========================// 注册表缓存// ========================/*** 初始化注册表缓存ResponseCacheImpl*/void initializedResponseCache();/*** 获取注册表缓存ResponseCacheImpl*/ResponseCache getResponseCache();// ========================// 租约过期机制&自我保护机制// ========================/*** 获取上一分钟收到的续约(renew)请求数*/long getNumOfRenewsInLastMin();/*** 获取每一分钟续约(renew)请求数的阈值* 如果上一分钟收到的续约请求数小于阈值,开启自我保护机制* 计算方式:实例数量 * (60 / 续约间隔时间)* 续约百分比阈值0.85* this.expectedNumberOfClientsSendingRenews * * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds()) ** serverConfig.getRenewalPercentThreshold())*/int getNumOfRenewsPerMinThreshold();/*** 是否启用租约过期机制*/boolean isLeaseExpirationEnabled();/*** 是否启用自我保护机制*/boolean isSelfPreservationModeEnabled();
}
Map<String, RemoteRegionRegistry> regionNameVSRemoteRegistry 是 AbstractInstanceRegistry 抽象类的成员变量,key 是 remoteRegionUrlsWithName 配置中的 regionName,value 则是 initRemoteRegionRegistry() 方法中创建的RemoteRegionRegistry 对象。
// 配置remoteRegionUrlsWithName
regionName1;regionUrl1,regionName2;regionUrl2...
RemoteRegionRegistry 类表示其他区域的注册表信息,配置 remoteRegion.registryFetchIntervalInSeconds
指定从其他区域拉取注册表信息的间隔时间,默认 30s
。
拉取成功后,将 readyForServingData 设置为 true,表示该区域的注册表已经可以提供服务。
Runnable remoteRegionFetchTask = new Runnable() {@Overridepublic void run() {try {if (fetchRegistry()) {readyForServingData = true;} else {logger.warn("Failed to fetch remote registry. " +"This means this eureka server " + "is not ready for serving traffic.");}}}
};scheduler.schedule(new TimedSupervisorTask("RemoteRegionFetch_" + regionName,scheduler,remoteRegionFetchExecutor,// 配置remoteRegion.registryFetchIntervalInSecondsserverConfig.getRemoteRegionRegistryFetchInterval(),TimeUnit.SECONDS,5, // exponential backoff boundremoteRegionFetchTask),serverConfig.getRemoteRegionRegistryFetchInterval(),TimeUnit.SECONDS);
remoteRegion.global.appWhiteList
和 remoteRegion.{regionName}.appWhiteList
配置全局和 regionName 指定区域的拉取白名单
,appName 不在白名单中的应用信息是无法拉取的。
PeerAwareInstanceRegistry 接口继承 InstanceRegistry 接口,提供以下功能:
public interface PeerAwareInstanceRegistry extends InstanceRegistry {/*** 初始化PeerAwareInstanceRegistryImpl,包括:* 1.实例化注册表缓存ResponseCacheImpl* 2.创建定时任务,定时更新numberOfRenewsPerMinThreshold* 配置renewalThresholdUpdateIntervalMs* 指定更新numberOfRenewsPerMinThreshold的时间间隔,默认15min* 3.初始化其他区域注册表regionNameVSRemoteRegistry*/void init(PeerEurekaNodes peerEurekaNodes) throws Exception;/*** 是否可以对外提供注册表服务* 1.如果在调用openForTraffic方法时* 从其他Eureka节点拉取注册表失败则返回false* 2.如果remoteRegionRequired为true* 还需要等待其他区域注册表全部拉取成功后才返回true*/boolean shouldAllowAccess(boolean remoteRegionRequired);/*** 从其他Eureka节点拉取注册表信息*/int syncUp();/*** 注册实例信息*/void register(InstanceInfo info, boolean isReplication);void statusUpdate(final String asgName,final ASGResource.ASGStatus newStatus,final boolean isReplication);
}
注:在 LeaseManager 接口中已经声明了 register(T r, int leaseDuration, boolean isReplication) 方法的前提下,为什么在 PeerAwareInstanceRegistry 接口中再次声明 register(InstanceInfo info, boolean isReplication) 方法呢?原来 register(InstanceInfo info, boolean isReplication) 方法是在 syncUp() 方法中被调用,是将从其他 Eureka 节点拉取过来的注册表中的实例信息注册到本地注册表中
。
// com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl
public int syncUp() {// 统计从其他Eureka节点同步过来的实例信息数量int count = 0;// ...// 从其他Eureka节点拉取注册表信息Applications apps = eurekaClient.getApplications();for (Application app : apps.getRegisteredApplications()) {for (InstanceInfo instance : app.getInstances()) {// ...// 判断该实例的availabilityZone是否和当前Eureka节点属于同一个region// 如果是,则将该实例注册到本地注册表if (isRegisterable(instance)) {register(instance,instance.getLeaseInfo().getDurationInSecs(),true);count++;}} }// ...return count;
}
PeerAwareInstanceRegistryImpl 类构造方法和 init 方法代码如下:
@Singleton
public class PeerAwareInstanceRegistryImplextends AbstractInstanceRegistryimplements PeerAwareInstanceRegistry {// startupTime、peerInstancesTransferEmptyOnStartup// 在openForTraffic方法被调用时赋值private long startupTime = 0;private boolean peerInstancesTransferEmptyOnStartup = true;// peerEurekaNodes在init方法被调用时赋值protected volatile PeerEurekaNodes peerEurekaNodes;// eurekaClient在构造方法被调用时赋值protected final EurekaClient eurekaClient;// instanceStatusOverrideRule在构造方法被调用时赋值private final InstanceStatusOverrideRule instanceStatusOverrideRule;// 定时调用updateRenewalThreshold方法private Timer timer = new Timer("ReplicaAwareInstanceRegistry - RenewalThresholdUpdater", true);@Injectpublic PeerAwareInstanceRegistryImpl(EurekaServerConfig serverConfig,EurekaClientConfig clientConfig,ServerCodecs serverCodecs,EurekaClient eurekaClient) {super(serverConfig, clientConfig, serverCodecs);this.eurekaClient = eurekaClient;this.numberOfReplicationsLastMin = new MeasuredRate(1000 * 60 * 1);// We first check if the instance is STARTING or DOWN,// then we check explicit overrides,// then we check the status of a potentially existing lease.this.instanceStatusOverrideRule =new FirstMatchWinsCompositeRule(new DownOrStartingRule(),new OverrideExistsRule(overriddenInstanceStatusMap),new LeaseExistsRule());}@Overridepublic void init(PeerEurekaNodes peerEurekaNodes) throws Exception {// 1.统计每分钟和其他Eureka节点的同步频率this.numberOfReplicationsLastMin.start();// 2.赋值peerEurekaNodes属性,保存Eureka集群节点信息this.peerEurekaNodes = peerEurekaNodes;// 3.创建本地注册表缓存ResponseCacheImplinitializedResponseCache();// 4.创建TimerTask,通过Timer调度updateRenewalThreshold方法// 定时更新numberOfRenewsPerMinThresholdscheduleRenewalThresholdUpdateTask();// 5.创建其他区域的注册表RemoteRegionRegistryinitRemoteRegionRegistry();// ...}
}
-
实现了
PeerAwareInstanceRegistry
接口,通过eurekaClient 属性
获得了从其他 Eureka 节点拉取注册表
(PeerAwareInstanceRegistry#syncUp()
方法)的能力, -
通过
peerEurekaNodes 属性
获得了将本地注册表的更新同步
给其他 Eureka 节点(PeerAwareInstanceRegistryImpl#replicateToPeers()
方法)的能力
注:为什么不将 Eureka 节点之间同步更新数据的操作和拉取注册表的操作一起声明在 PeerAwareInstanceRegistry 接口中,而是另外通过 PeerEurekaNode 类去实现呢?
既然 numberOfRenewsPerMinThreshold 是通过实例数量实时计算为什么不将 numberOfRenewsPerMinThreshold 属性声明在 PeerAwareInstanceRegistryImpl 类中,而是声明在父类 AbstractInstanceRegistry 中?
instanceStatusOverrideRule 根据规则计算实例的