负载均衡-P2C算法

P2C算法全称Pick of 2 choices,相比WRR,P2C有着更科学的LB策略,它通过随机选择两个节点后在这俩节点里选择优胜者来避免羊群效应,并通过指数加权移动平均算法统计服务端的实时状态,从而做出最优选择。

一、工作流程

P2C算法下的每个节点(下称Node)必须含有下方图中几个指标,它们的计算方法已经标出:

图1

因此最终loadbalancer里保存的节点就会变成下图的结构,pick节点时需要做如下比较:

图2

通过上面的流程可以看到,大体流程还是随机,相比普通的随机LB,它是随机选择两个node,然后比较它们的负载率,然后选出当前负载率最小的node。

二、数据统计

通过上面的简单介绍,可以知道P2C算法的大体流程,那么现在问题就变得简单多了,只需要知道负载率,就可以完成这个“简单”的负载均衡器,但是负载率是由上图五个指标共同参与计算完成的,那现在问题的关键就是如何完成这五个指标的统计,下面来介绍下这五个指标如何计算。

2.1:weight

这个很简单,主要是人为配置的定制,用于给不同的机器按照机器配置分配上不同的权重,权重越高,越容易被pick到。这个值可以做在服务注册与发现里,进行为每个节点分配一个权重值。

2.2:server_cpu

这个值可以通过服务端回写元数据来搞定,比如一次请求:

图3

所以这个值可以通过服务端埋点的方式解决掉。

2.3:inflight

这个代表节点请求拥塞度,代表着当前节点有多少个请求未完成或者正开始请求,它的统计也很简单:

图4

按照这种,每次pick到发送请求前先原子+1,response后说明一次请求完成,这时再原子-1,这样一增一减,在超多线程pick下(即高qps),当前线程获取到的inflight瞬间值,就是在这个时段的拥塞度,比如一个节点如果很闲,响应速度也快,那么它的拥塞度肯定极低(因为一增一减的操作很快就完成,不会淤积过多)。

2.4:latency & client_success

前面的都很好了理解,这俩属性比较麻烦,它们分别代表请求延迟率成功率

这俩值该怎么计算?一般来说是通过平均数的方式来计算,但计算平均数的方式有很多,可以先用我们最熟悉的算术平均数来计算,它的计算公式如下:

img

利用此算法套入我们的场景后,请求总耗时比上请求总次数,就是latency:

img

请求总成功数比上请求总次数,就是client_success:

img

整个过程可以抽象成下面这样:

图5

根据图5的流程,可以通过每次请求,累加总请求数、总耗时、总成功数,然后利用算术平均法更新latencyclient_success的值,这一切可以运作的很好,可是算术平均有个很大的缺陷,不够敏感,出现的网络波动一下就被平均了,是不是想到了你和马云放在一起统计资产时被平均的悲哀?

2.4.1:算术平均数模拟

首先模拟1000次请求,让每次请求在0~25之间产生随机数用来作为本次请求的耗时(为了模拟真实情况,让一些请求耗时过高),每次请求计算当前的算术平均数,然后可以得到下面的统计图:

图6

可以看到,正常情况下,算术平均数表现出很好的稳定性,红线一致维持在黑线的中间,且浮动不大。

现在让我们搞点事情,假设在第100次请求到第200次请求间,模拟下网络抖动,让这期间的响应时间变成125ms(相比正常情况翻五倍),得到统计图如下:

图7

可以看到平均数受到这次抖动的影响,需要非常长的时间才能感知到,恢复时又需要很长时间才能恢复到抖动前的较正常的水平。

也就是说,第100~200间的100次慢请求,不光要到140次请求后才能较明显的感知平均值的上涨,还导致网络恢复后,到第1000次请求时,平均值还没有恢复到正常水平(但其实从第200次请求后,响应时间就正常了),算术平均是所有数据的总平均,受过往值的影响非常深,以至于不能很好的反应某个时段的平均趋势,那么有没有更好的统计办法来避免这类问题呢?

2.4.2:指数加权移动平均算法(EWMA)模拟

参考资料:深入解析TensorFlow中滑动平均模型与代码实现

公式如下:

img

解释:vt代表第t次请求的指数加权平均耗时,vt-1代表上次请求的指数加权平均耗时,θt代表第t次请求的实际耗时。

β值的定义如下:

1
vt ≈ 1/(1 - β) 次的平均耗时

例:假设β等于0.9,1/(1 - β) 就等于10,vt约等于前10次请求的平均耗时;假设把β值调大至0.98,1/(1-β)=50,就是当前请求的前50次请求的平均耗时。

由此可以推导出:

β值越大,移动平均区间越大,当前平均值的计算受到之前平均值的影响也就越大

β值越小,移动平均区间越小,当前平均值的计算受到之前平均值的影响也就越小

eg:假设现在有两次请求,第一次耗时25ms,第二次耗时50ms,代入公式,计算出第一次和第二次的指数加权平均值为:

1
2
double v1 = β * 0 + (1 - β) * 25
double v2 = β * v1 + (1 - β) * 50 // β值越小,意味着本次请求的实际耗时占比越大,β值越大,之前计算得到的平均值占比越大

这个结论在下面的实验中会有所体现。

相比普通平均值的计算,EWMA算法更在乎历史平均值对统计结果的影响,通过控制β的值就可以调整历史均值对统计趋势的影响程度。

现在用EWMA模拟下1000次请求,为了模拟真实情况,我会让一些请求耗时过高,β取值0.9,代表最新请求时的平均值计算会受到最近10次耗时的影响进行平滑过渡,运行结果绘制如下图(渲染采用Echarts):

图8

最终得到的数据曲线没有算术平均那么稳定,但可以看出,每次网络波动会提升其加权均值,不会像算术平均那样不受网络波动影响。

接下来假设第100~200次请求,发生了网络延迟,延时5倍,再次利用EWMA做下模拟:

图9

请将这张图跟图7进行对比,你会发现,利用指数加权平均算法计算出的平均值在网络恢复时,以极快的速度恢复到了正常水平。

相比算术平均,EWMA更重要的是它平滑的模拟了平均值的趋势。

现在把图9里的β值调整为0.98,此时在计算当前平均值时则受到前面(1/0.02) = 50个平均值的影响,便得到下图:

图10

因为β值变大,后续每个平均值都会受到更多历史均值的影响,而当前耗时仅占很少影响,跟之前的结论一样,β越大,移动窗口越大,当前平均数受到历史平均值的影响就越大,反之越小,所以相比图9,在出现网络抖动后,更缓慢的恢复为正常均值(波形跟算术平均接近)。

还可以将β调小试试,比如将其设为0.32,那么计算均值时仅受到前面(1/0.68) = 1.47个均值影响,此时平均值轨迹几乎和正常响应时间重叠:

图11

可以看到,当β很小时,受影响因子无限趋近于1,越趋近于1则越贴近原本值。

2.4.3:结论

实验组 对照组 目的 结论
图6 图8(β=0.9) 模拟网络正常情况下,两种算法对均值的统计区别 算术平均值非常稳定,对单次网络抖动完全无感知,EWMA均值则会随着响应时间动态变化,因此单次网络抖动后会稍微提升均值,之后便很快恢复
图7 图9(β=0.9) 模拟一段网络延迟,看两种算法的均值变化 算术平均值会缓慢提升,之后再次以极慢的速度下降,对网络延迟反映迟钝,网络延迟结束后仍然要花很长时间才能恢复到正常均值水平,EWMA均值则迅速提升,恢复后迅速下降至正常水平
图9(β=0.9) 图10(β=0.98) 调大EWMA的β值 β值越大,每次计算均值时受到之前均值影响越大,则平均曲线更加平滑,因此图10的曲线要比图9表现更加平滑,但付出的代价是对网络延迟反应也变的迟钝
图9(β=0.9) 图11(β=0.32) 调小EWMA的β值 β值越小,每次计算均值时受到之前均值影响越小,则平均曲线更加趋近于每次的实际耗时,因此图11的曲线要比图9表现的更加趋近于每次的实际耗时,顺理成章的,它对网络延迟的反映极迅速
表1

通过实验,可以看出EWMA的优势巨大,但β的取值需要仔细斟酌,若β太小,则无法很好的体现出平均值,若β太大,很好的体现了平均值,但对网络波动的反应相对迟钝,这里就考虑到一个折中的方案:

实时调整β值,比如EWMA可以在网络波动时适当降低β的值,使其快速感知到波动的存在,当网络波动结束后,适当提升β的值,这样就可以在网络稳定的情况下较好的反映一个区段内的均值情况,这样等于结合了图10图11各自的优点,实现后将达到一种效果:快速感知网络延迟并迅速提高其均值,当网络恢复后,慢慢降回正常水平(均值恢复需要慢慢进行,因为刚恢复的节点稳定性不可信,慢慢恢复到正常水平,以信任其稳定性)

2.4.4:利用衰减函数动态调整β值

通过上面的要求,我们需要完善这个变化的β,那么它该如何变化呢?如何能达到碰到网络波动时迅速感知,当波动过后慢慢恢复的效果呢?慢慢恢复需要多慢?可不可以通过调整某个阈值来控制恢复的速率?

带着上面的问题,需要了解一下:衰减函数(牛顿冷却定律

公式为:

img

e是常量,Δt表示第t次请求的耗时,k表示衰减系数,它的函数图如下:

图12

我们把k*Δt看做x的取值,那么kΔt成正比,即:kΔt取值越大,β就越小

现在来看看这个结论支不支持我们要实现的功能:

  1. 网络抖动时,假设Δt非常大,即便不乘k值,β值也会变得很小,这是符合我们预期的,我们需要的就是在网络抖动时,迅速感知
  2. 网络恢复时,Δt迅速降低,假设此时Δt非常小,则k值越大,图12里对应的x越大,β的值就越小,事实上通过实验可以得出,如果k值很大,得出的曲线近乎等于图11

经过上面的梳理,发现k值似乎没有起到衰减作用,反而因为它的存在导致β值降低,它的取值在网络抖动恢复后依旧在削弱β的值,导致网络恢复后迅速降低到正常水平,这是我们不愿意看到的,那么上面的函数需要做下变体,即让Δt和k值成反比即可:

img

此时结论如下:

  1. 网络抖动时,假设Δt非常大,即便k值起到中和作用,β值较之前也会明显变小,这是符合我们预期的,我们需要的就是在网络抖动时,迅速感知
  2. 网络恢复时,即使Δt迅速降低,那么由于k值的中和(Δt/k的值大小和k值成反比),k越大,β越大,则均值计算受之前波动期的均值影响越大,曲线恢复越缓慢。

这点可以通过下方的验证得到证实,调整衰减系数k,的确可以控制在遇到波动时恢复到正常水平时的速度,衰减系数设置越大,波幅越大(恢复越慢),反之越小(恢复越快)。

2.4.5:衰减系数验证

第一组:随机次数的网络抖动,衰减系数分别为60050

衰减系数为600时的走势图,可以看出,网络恢复后均值变化衰减速度很慢

衰减系数为50时的走势图,可以看出,网络恢复后均值变化衰减速度很快

第二组:第100~200次请求响应时间扩大5倍,衰减系数仍然是60050

衰减系数为600时的走势图,可以看出在抖动发生时,仍然可以迅速感知,后续恢复时的衰减速度跟上面结果一样慢

衰减系数为50时的走势图,可以看出在抖动发生时,可以非常迅速的感知,后续恢复时的衰减速度跟上面结果一样快

三、利用JAVA实现P2C算法

首先定义Node类:

代码块1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
public class Node {

//惩罚值
private static final long penalty = 250_000_000_000L; //单位:纳秒(250s)
//衰减系数
private static final long tau = 600_000_000L; //单位:纳秒(600ms)

protected final String host;
protected final int weight; //权重

// client统计数据
protected final AtomicLong lag = new AtomicLong(); //加权移动平均算法计算出的请求延迟度
protected final AtomicLong success = new AtomicLong(1000); // 加权移动平均算法计算出的请求成功率(只记录grpc内部错误,比如context deadline)
protected final AtomicLong inflight = new AtomicLong(1); // 当前客户端正在发送并等待response的请求数(pending request)
protected final AtomicLong svrCPU = new AtomicLong(500); //对应服务端的CPU使用率

// 最近一次resp时间戳
protected final AtomicLong stamp = new AtomicLong();
// 最近被pick的时间戳,利用该值可以统计被选中后,一次请求的耗时
protected final AtomicLong pick = new AtomicLong();

public Node(String host, int weight) {
this.host = host;
this.weight = weight;
}

public boolean valid() {
return health() > 500 && svrCPU.get() < 900;
}

public long health() {
return success.get(); //成功率
}

public long load() {
long lag = (long) (Math.sqrt((double) this.lag.get()) + 1);
long load = this.svrCPU.get() * lag * this.inflight.get(); //根据cpu使用率、延迟率、拥塞度计算出负载率
if (load == 0) {
// penalty是初始化没有数据时的惩罚值,默认为1e9 * 250
load = penalty;
}
return load;
}

//被pick后,完成请求后触发逻辑
public void responseTrigger(long pickTime, long cpu, boolean error) {
this.inflight.decrementAndGet();
long now = System.nanoTime();
long stamp = this.stamp.getAndSet(now);
long td = now - stamp; //计算距离上次response的时间差,节点本身闲置越久,这个值越大
if (td < 0) {
td = 0;
}
//实时计算β值,利用衰减函数计算,公式为:β = e^(-t/k),相比前文给出的衰减公式这里是按照k值的反比计算的,即k值和β值成正比
double w = Math.exp((double) -td / (double) tau);

long lag = now - pickTime; //实际耗时
if (lag < 0) {
lag = 0;
}
long oldLag = this.lag.get();
if (oldLag == 0) {
w = 0;
}

//计算指数加权移动平均响应时间
lag = (int) ((double) oldLag * w + (double) lag * (1.0 - w));
this.lag.set(lag); //更新

int success = error ? 0 : 1000;
//计算指数加权移动平均成功率
success = (int) ((double) this.success.get() * w + (double) success * (1.0 - w));
this.success.set(success); //更新

//更新本次请求服务端返回的cpu使用率
if (cpu > 0) {
this.svrCPU.set(cpu);
}
}
}

再来定义LoadBalancer:

代码块2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
public class P2CLoadBalancer {

//闲置时间的最大容忍值
private static final long forceGap = 3000_000_000L; //单位:纳秒(3s)

private static final Random r = new Random();

private final List<Node> nodes; //保存了参与lb的节点集合

public P2CLoadBalancer(List<Node> nodes) {
this.nodes = nodes;
}

public Node pick(long start) { //外界给入start,值为当前时间,resp后应给recycle传同样的值
Node pc, upc;
if (nodes == null || nodes.size() <= 0) {
throw new IllegalArgumentException("no node!");
}
if (nodes.size() == 1) {
return nodes.get(0);
}

Node[] randomPair = prePick();

/**
* 这里根据各自当前指标,计算出谁更合适被pick
* 计算方式:
* nodeA.load nodeB.load
* ---------------------------- : ----------------------------
* nodeA.health * nodeA.weight nodeB.health * nodeB.weight
*
* health和weight都是提权用的,而load是降权用的,所以用load除以heal和weight的乘积,计算出的值越大,越不容易被pick
*/
if (randomPair[0].load() * randomPair[1].health() * randomPair[1].weight >
randomPair[1].load() * randomPair[0].health() * randomPair[0].weight) {
pc = randomPair[1];
upc = randomPair[0];
} else {
pc = randomPair[0];
upc = randomPair[1];
}

// 如果落选的节点,在forceGap期间内没有被选中一次,那么强制选中一次,利用强制的机会,来触发成功率、延迟的衰减
long pick = upc.pick.get();
if ((start - pick) > forceGap && upc.pick.compareAndSet(pick, start)) {
pc = upc; //强制选中
}

// 节点未发生切换才更新pick时间
if (pc != upc) {
pc.pick.set(start);
}
pc.inflight.incrementAndGet();

return pc;
}

//pick出去后,等来了response后,应触发该方法
public void recycle(Node node, long pickTime, long cpu, boolean error) {
node.responseTrigger(pickTime, cpu, error);
}

// 随机选择俩节点
public Node[] prePick() {
Node[] randomPair = new Node[2];
for (int i = 0; i < 3; i++) {
int a = r.nextInt(nodes.size());
int b = r.nextInt(nodes.size() - 1);
if (b >= a) {
b += 1; //防止随机出的节点相同
}
randomPair[0] = nodes.get(a);
randomPair[1] = nodes.get(b);
if (randomPair[0].valid() || randomPair[1].valid()) {
break;
}
}

return randomPair;
}
}

四、算法验证

算法的验证会以实际压测的方式来进行,请前往:P2C算法验证实验