前言
昨天在看tcp拥塞控制中的BBR(Bottleneck Bandwidth and Round-trip propagation time)算法时,发现了这一特点:
在BBR以前的拥塞控制算法中(如Reno、Cubic、Vegas),都依赖于丢包事件的发生,在高并发时则会看到网络波动的现象。
而在BBR诞生后,由于其基于数学模型的思想,BBR通过对包延迟的实时计算,持续对瓶颈带宽进行估算,使得网络又快又稳。
又看了些BBR的相关资料后,笔者内心有被其中的数学机制深深惊艳。
BBR所涉及到的内容相对复杂,本文不进一步展开,本文记录一下与BBR核心思想类似的:Phi Accrual failure detector
这也是在我看了BBR后,所联想到的在平时项目中经常遇到一个技术。
存活探测机制
在了解Phi Accrual failure detector之前,一起回顾下在计算机中经常用到的一种技术:节点的存活探测。
用《西游记》的片段来讲,节点的存活探测可以描述为:“我叫你一声,你敢答应吗?”
其对应的结果,只有以下两种情况:
- 答应了,则对方是存活的。
- 未应答,则对方可能挂了。
传统存活探测机制
存活探测机制在分布式集群应用中应用非常广泛,如spring cloud中A服务调用B服务时,被调用的B服务一定是从注册中心中返回的存活(可用)实例。
亦或者,在开源SDN系统ONOS中,判断集群中的各兄弟节点是否存活,以判断是否需要触发设备的Master角色切换。
对于时效性比较高的场景,我们都期望存活探测的判断能又快又准。毕竟慢了会导致资源不可用,误判则又会耗费额外的资源,这些问题,我们都希望能避免掉。
为此,在时效性和准确性的综合考量下,多采用定时+多次确认的方式来实现对节点的存活监测。
在计算机网络领域中,BFD(Bidirectional Forwarding Detection)则是存活探测机制中比较有代表性的技术,它的实现过程也成为了许多系统自实现存活检测的参考对象。
一起回顾下BFD的故障监测机制,过程主要如下:
- 监听UDP:3784和3785端口
- 默认每1000毫秒发送一个ECHO包,用以探测对端(被监测方)是否存活
- 如果连续3个周期(1000ms*3,3s)未收到对端发来的ECHO包,则认为对端(被监测方)挂了
翻译成《西游记》中的场景,则为:
银角大王对孙悟空(被检测者)叫了一次”孙行者“后,孙悟空没有理会他(紫金葫芦没有反应),银角大王会再次重叫:“者行孙”、“行者孙”
然而,网络通信是一个非常复杂的过程,一个看似简单的点对点通信中间可能会经过大量的设备,任何一个链路的短板都可能造成各种问题,通常呈现出诸如:丢包、时延过长、包损坏等现象。
转为《西游记》的场景:孙悟空(被检测者)向银角大王回应说”爷爷在此”
- 丢包:这时天空刮起了大风把声音吹跑了,银角大王处无法听到
- 时延过大:刮起的大风是一个会弧形移动的风(龙卷风)转了一圈后才挂到银角大王的位置
- 包损伤:孙悟空(被检测者)和银角大王中间有很多小妖叽叽喳喳的喊叫,导致孙悟空说的”爷爷在此”在银角大王处只能依稀听到一个“爷”
举个链路延迟导致误判的例子:A节点检测B节点是否可达,在这两个节点并发不高的情况下,发送到收到一个检测包的时延为5ms,当遇到这两个节点在传输大量数据(高并发)时,则可能会导致3000ms后才能收到这个检测包,则A节点会认为B节点已不可达,但实际情况是B节点是正常的,只是中间链路稍微慢了一点而已。
如果使用传统的链路检测机制,则无法识别出此种情况。
缺点
总结一下传统节点故障的缺点
- 在系统高并发或链路不佳时,容易误判
- 系统需要提前设置故障超时时间,周期过长会造成故障识别慢,周期过短容易误判
Phi Accrual failure detector
为了解决上面的情况,实现以下几点:
- 存活探测的判断能又快又准
- 降低节点故障误判的概率
- 自动调整阈值周期
一种基于数学模型的故障检测机制便诞生了——Phi Accrual failure detector。
在论文《Phi Accrual failure detector》中,描述了一种如何使用概率来表征被检测节点是否故障的方法。
其中,Phi也就是符号:φ,公式如下:
用它代表:当前时刻(t now)被检测节点挂掉的概率。
- phi的值越大,则当前时刻被检测节点挂掉的概率越大
- -log10为对数计算,以将概率P later的概率值转为较大的正数,以方便计算
- P later中的t now- T last的含义是:最后一次收到心跳到当前时间的时间间隔,一般使用的单位为毫秒
它的核心思想为:假设网络延迟符合某种概率分布(如正态分布或指数分布),当一个节点预期的心跳包没有按时到达时,随着时间的推移,phi 值会逐渐增加。
在具体的Phi Accrual failure detector方法实现中,一般采用指数分布来进行实现。
指数分布(扩展阅读)
指数分布(Exponential Distribution)是一种连续概率分布,通常用于描述事件发生的时间间隔或等待时间。
假设某件事以一定的平均速率发生,指数分布就可以用来描述该事件发生前的时间长度。
以一个经典的包子铺场景为例:
假设你是一位卖包子的商家,包子店的客流量是随机的,即顾客的到来是无法提前预测的。如果我们要估计顾客之间的到达时间(比如两位顾客之间需要等待的时间),这就可以使用指数分布来建模。
故障检测中的具体实现
在分布式系统的故障检测中,每个节点会定期发送心跳包到监控系统(通常是一个中央协调器或其他节点),系统通过记录每次心跳包的到达时间来计算心跳间隔时间(即两个心跳之间的时间差),而这个周期性的心跳间隔时间一般来讲都是服从指数分布的。
以节点A监测节点B是否存活为例,节点B每间隔500ms向节点A发送一次心跳包,如节点A已经2000ms没有收到节点B的心跳包了,则会认为节点B大概率是挂了。
那么,节点A平均每500ms会收到节点B的心跳包,要预测节点A下一次收到节点B的心跳包的间隔,则可以用指数分布来预测。
指数分布的概率密度函数是:
在节点存活监测场景中:
- λ 是心跳到达的频率(即单位时间内预期有多少次心跳信号到达),如果心跳信号平均每 𝑇 秒发生一次,那么 𝜆=1/𝑇,即λ 是时间的倒数。如节点每500ms(即 0.5秒)发送一次心跳,则𝜆=1/0.5=2
- t 为从上一次心跳到当前的时间间隔
从上面的例图可以看出,随着时间 𝑇 的增加,节点A还未节点B的心跳包的概率密度逐渐减小
也就是:未收到被监测点的心跳包时间越久,被监测节点已经挂了的概率就越大
将其换成函数则是指数分布中的累积分布函数(CDF),即计算时间 x 后还没有收到被监测节点的心跳包,被监测节点已经故障的概率:
由于概率值它是一个百分数,在计算机中为了方便计算和处理,往往使用对数计算它这个概率值转为一个大一点的正数,再给这个数取个代号——Phi。
最后得出Phi 值的计算公式为:
- P(T>t) 是在时间 t 时刻仍然没有收到心跳的概率
- λ 是心跳到达的速率(即平均心跳频率的倒数)
因为log10(𝑒)≈0.4343,最后得 φ ≈ 0.4343⋅λ⋅T
Phi的参考值及含义:
phi = 0:刚收到心跳信号,节点正常
phi = 2-3:系统开始有轻微的怀疑,但仍然认为节点大概率正常
phi ≥ 8:系统高度怀疑节点已经失效
在实际应用中,λ 通常使用一个固定大小的滑动窗口中的平均值计算而来,滑动窗口中记录的是被监测节点的心跳包间隔时长。
使用滑动窗口来保存历史心跳间隔数据,这也是实现λ能动态自适应调整的重要原因。
atomix中的具体实现
使用Phi Accrual failure detector机制实现的节点存活监测开源框架非常多,比较知名的有Apache Cassandra、Akka、Hazelcast等
这里以SDN开源框架onos中的atomix组件为例,看一下它在ONOS集群中判断ONOS兄弟节点是否存活的相关源码,本文只关注Phi Accrual failure detector的实现部分。
atomix中实现了Phi Accrual failure detector的类为PhiAccrualFailureDetector,代码如下:
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;/*** Phi Accrual failure detector.* <p>* A modified version based on a paper titled:* "The φ Accrual Failure Detector" by Hayashibara, et al.*/
public class PhiAccrualFailureDetector {// Default value,窗口大小:250private static final int DEFAULT_WINDOW_SIZE = 250;// 最小窗口大小:25private static final int DEFAULT_MIN_SAMPLES = 25;// phi的计算因子:前文中提到的0.4343private static final double DEFAULT_PHI_FACTOR = 1.0 / Math.log(10.0);private final int minSamples;private final double phiFactor;// 历史心跳数据private final History history;/*** Creates a new failure detector with the default configuration.*/public PhiAccrualFailureDetector() {this(DEFAULT_MIN_SAMPLES, DEFAULT_PHI_FACTOR, DEFAULT_WINDOW_SIZE);}/*** Creates a new failure detector.** @param minSamples the minimum number of samples required to compute phi* @param phiFactor the phi factor*/public PhiAccrualFailureDetector(int minSamples, double phiFactor) {this(minSamples, phiFactor, DEFAULT_WINDOW_SIZE);}/*** Creates a new failure detector.** @param minSamples the minimum number of samples required to compute phi* @param phiFactor the phi factor* @param windowSize the phi accrual window size*/public PhiAccrualFailureDetector(int minSamples, double phiFactor, int windowSize) {this.minSamples = minSamples;this.phiFactor = phiFactor;this.history = new History(windowSize);}/*** 最后一个心跳到达的时间** @return the last time a heartbeat was reported*/public long lastUpdated() {return history.latestHeartbeatTime();}/*** Report a new heart beat for the specified node id.*/public void report() {report(System.currentTimeMillis());}/*** 上报一个最新的心跳到达时间* Report a new heart beat for the specified node id.** @param arrivalTime arrival time*/public void report(long arrivalTime) {checkArgument(arrivalTime >= 0, "arrivalTime must not be negative");long latestHeartbeat = history.latestHeartbeatTime();// 使用当前时间-上次接收时间,得到时间间隔history.samples().addValue(arrivalTime - latestHeartbeat);// 更新最后一次时间间隔history.setLatestHeartbeatTime(arrivalTime);}/*** phi计算,重点* Compute phi for the specified node id.** @return phi value*/public double phi() {long latestHeartbeat = history.latestHeartbeatTime();DescriptiveStatistics samples = history.samples();if (samples.getN() < minSamples) {//当样本数不足时,返回0.0,代表被监测节点是一定存活的return 0.0;}// 样本数充足,使用当前时间计算phi值return computePhi(samples, latestHeartbeat, System.currentTimeMillis());}/*** phi具体计算* Computes the phi value from the given samples.* <p>* The original phi value in Hayashibara's paper is calculated based on a normal distribution.* Here, we calculate it based on an exponential distribution.** @param samples the samples from which to compute phi* @param lastHeartbeat the last heartbeat* @param currentTime the current time* @return phi*/private double computePhi(DescriptiveStatistics samples, long lastHeartbeat, long currentTime) {// 获取出样本总数long size = samples.getN();// 计算出时间间隔long t = currentTime - lastHeartbeat;// samples.getMean()为样本的均值(平均间隔时间)// phi= 计算因子*时间间隔/心跳到达的速率return (size > 0)? phiFactor * t / samples.getMean(): 100;}/*** Stores the history of heartbeats for a node.*/private static class History {//存储样本的集合private final DescriptiveStatistics samples;long lastHeartbeatTime = System.currentTimeMillis();private History(int windowSize) {this.samples = new DescriptiveStatistics(windowSize);}DescriptiveStatistics samples() {return samples;}long latestHeartbeatTime() {return lastHeartbeatTime;}void setLatestHeartbeatTime(long value) {lastHeartbeatTime = value;}}
}
PhiAccrualFailureDetector 部分的源码和上文中的理论知识完全符合,总结起来就是:
phi = 计算因子 * 时间间隔 / 心跳到达的速率
再看一下atomix中调用phi计算的具体代码:
@Overridepublic CompletableFuture<Void> join(BootstrapService bootstrap, NodeDiscoveryService discovery, Member member) {if (started.compareAndSet(false, true)) {// ……// 注册心跳处理器,handleHeartbeat函数bootstrapService.getMessagingService().registerHandler(HEARTBEAT_MESSAGE, this::handleHeartbeat, heartbeatScheduler);// 定时发送心跳,默认1000ms发送一次heartbeatFuture = heartbeatScheduler.scheduleAtFixedRate(this::sendHeartbeats, 0, config.getHeartbeatInterval().toMillis(), TimeUnit.MILLISECONDS);LOGGER.info("Started");}return CompletableFuture.completedFuture(null);}/*** 收到心跳包的处理逻辑* Handles a heartbeat message.*/private byte[] handleHeartbeat(Address address, byte[] message) {GossipMember remoteMember = SERIALIZER.decode(message);LOGGER.trace("{} - Received heartbeat: {}", localMember.id(), remoteMember);// 更新被监测节点的最后一次包到达时间failureDetectors.computeIfAbsent(remoteMember.id(), n -> new PhiAccrualFailureDetector()).report();updateMember(remoteMember, true);// Return only reachable members to avoid populating removed members on remote nodes from unreachable members.return SERIALIZER.encode(Lists.newArrayList(members.values().stream().filter(member -> member.isReachable()).collect(Collectors.toList())));}/*** Sends a heartbeat to the given peer.*/private CompletableFuture<Void> sendHeartbeat(GossipMember member) {return bootstrapService.getMessagingService().sendAndReceive(member.address(), HEARTBEAT_MESSAGE, SERIALIZER.encode(localMember)).whenCompleteAsync((response, error) -> {if (error == null) {Collection<GossipMember> remoteMembers = SERIALIZER.decode(response);for (GossipMember remoteMember : remoteMembers) {if (!remoteMember.id().equals(localMember.id())) {updateMember(remoteMember, remoteMember.id().equals(member.id()));}}} else {LOGGER.debug("{} - Sending heartbeat to {} failed", localMember.id(), member, error);// ……PhiAccrualFailureDetector failureDetector = failureDetectors.computeIfAbsent(member.id(), n -> new PhiAccrualFailureDetector());// 计算出当前phi值double phi = failureDetector.phi();// phi值大于阈值(8),或处于初始化时上次收到心跳包距离当前时间超过了最长间隔阈值(10s)if (phi >= config.getPhiFailureThreshold()|| (phi == 0.0 && System.currentTimeMillis() - failureDetector.lastUpdated() > config.getFailureTimeout().toMillis())) {// 认为被监测的节点已经挂了,触发节点已下线的事件if (members.remove(member.id()) != null) {failureDetectors.remove(member.id());post(new GroupMembershipEvent(GroupMembershipEvent.Type.MEMBER_REMOVED, member));}}}}, heartbeatScheduler).exceptionally(e -> null).thenApply(v -> null);}
atomix中监测兄弟节点是否正常的代码位于HeartbeatMembershipProtocol类中,其实现的行为与《Phi Accrual failure detector》中所描述的过程基本一致:
- 当新的onos节点加入到集群中时,定时(默认1s)向其发送心跳包
- 当发送心跳包失败时(atomix中用的tcp方式发送),计算phi值
- 所计算的phi值大于阈值(默认8),则判定对端不可达
- phi值小于0但距离最后心跳包时间间隔过长(默认10秒),则判定对端不可达(用于处理采用数据不足的情况,类似于tcp拥塞控制中的慢启动)
- 当收到兄弟节点的心跳包后,会持续更新采样数据
- 更新间隔窗口数据
- 更新最后心跳包最新接收时间