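Customizing a gRPC Load Balancer

I. How gRPC loads a load balancer

1.1: Provide a provider class

Every LoadBalancer object is instantiated by a LoadBalancerProvider object, and different LoadBalancerProvider objects instantiate different LoadBalancer objects. The providers themselves are loaded into the gRPC client through the SPI mechanism, for example from the service file under META-INF in grpc-core:

Figure 1

As the figure shows, gRPC natively provides two LoadBalancerProvider implementations. Let's look at how each one implements the core methods.

First, PickFirstLoadBalancerProvider:

Code block 1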
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancerProvider;
import io.grpc.NameResolver.ConfigOrError;
import java.util.Map;

public final class PickFirstLoadBalancerProvider extends LoadBalancerProvider {
    private static final String NO_CONFIG = "no service config";

    // Whether the provider is usable. If this returns false, the policy does not
    // take effect even when the gRPC client names it explicitly.
    @Override
    public boolean isAvailable() {
        return true;
    }

    // Priority of this provider
    @Override
    public int getPriority() {
        return 5;
    }

    // Name of the LB policy
    @Override
    public String getPolicyName() {
        return "pick_first";
    }

    // Supplies the LoadBalancer object; here the implementation is PickFirstLoadBalancer
    @Override
    public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) {
        return new PickFirstLoadBalancer(helper);
    }

    @Override
    public ConfigOrError parseLoadBalancingPolicyConfig(Map<String, ?> rawLoadBalancingPolicyConfig) {
        return ConfigOrError.fromConfig(NO_CONFIG);
    }
}

Next, SecretRoundRobinLoadBalancerProvider.Provider:

Code block 2
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancerProvider;
import io.grpc.NameResolver.ConfigOrError;
import java.util.Map;

final class SecretRoundRobinLoadBalancerProvider {
    private SecretRoundRobinLoadBalancerProvider() {
    }

    public static final class Provider extends LoadBalancerProvider {

        private static final String NO_CONFIG = "no service config";

        @Override
        public boolean isAvailable() {
            return true;
        }

        @Override
        public int getPriority() {
            return 5;
        }

        // Returns the name of the round-robin LB policy
        @Override
        public String getPolicyName() {
            return "round_robin";
        }

        // Supplies the LoadBalancer object; here the implementation is RoundRobinLoadBalancer
        @Override
        public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) {
            return new RoundRobinLoadBalancer(helper);
        }

        @Override
        public ConfigOrError parseLoadBalancingPolicyConfig(
                Map<String, ?> rawLoadBalancingPolicyConfig) {
            return ConfigOrError.fromConfig(NO_CONFIG);
        }
    }
}

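Because each LB algorithm is instantiated by its own LoadBalancerProvider class, the set of algorithms can be extended and a specific one can be selected by name. We can follow the same pattern to build our own LB algorithm as a replacement.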
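To make the roles of isAvailable, getPriority and getPolicyName more concrete, here is a minimal, self-contained sketch of a provider lookup. It is a simplified model, not gRPC's registry code: PolicyProvider and PolicyRegistry are hypothetical names, and the rule "skip unavailable providers, and for each policy name keep the one with the highest priority" is an assumption about how such a registry would typically behave.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for LoadBalancerProvider: only the three lookup-related methods. */
interface PolicyProvider {
    boolean isAvailable();
    int getPriority();
    String getPolicyName();
}

/** Simplified model of a provider registry (assumed behavior, not gRPC's actual code). */
class PolicyRegistry {
    // policy name -> the provider currently in effect for that name
    private final Map<String, PolicyProvider> effective = new HashMap<>();

    /** Registers a provider; unavailable ones are skipped, higher priority wins per name. */
    void register(PolicyProvider provider) {
        if (!provider.isAvailable()) {
            return;
        }
        PolicyProvider existing = effective.get(provider.getPolicyName());
        if (existing == null || existing.getPriority() < provider.getPriority()) {
            effective.put(provider.getPolicyName(), provider);
        }
    }

    /** Returns the provider in effect for the policy name, or null if there is none. */
    PolicyProvider lookup(String policyName) {
        return effective.get(policyName);
    }

    /** Convenience factory so the demo does not need one class per provider. */
    static PolicyProvider provider(String name, int priority, boolean available) {
        return new PolicyProvider() {
            @Override public boolean isAvailable() { return available; }
            @Override public int getPriority() { return priority; }
            @Override public String getPolicyName() { return name; }
        };
    }
}
```

In this model, registering two "round_robin" providers with priorities 5 and 10 leaves the priority-10 one in effect, which is the property a custom provider could rely on to replace a built-in policy under the same name.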

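1.2: Provide a LoadBalancer implementation

The providers above only supply instances of an LB algorithm, so there must also be classes that implement it. Each of the two providers supplies its own implementation class: RoundRobinLoadBalancer and PickFirstLoadBalancer.

Since we use round robin, only RoundRobinLoadBalancer is covered here.

The source is fairly involved, so only the key steps are shown (if you are interested, read io.grpc.util.RoundRobinLoadBalancer directly; if reading code is a chore, skip to Figure 2 below for a quick overview of the flow):

Code block 3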
// Excerpt: helpers that are not shown (setsDifference, getSubchannels, getSubchannelStateInfoRef,
// filterNonFailingSubchannels, shutdownSubchannel, Ref, EmptyPicker) are omitted.
public class RoundRobinLoadBalancer extends LoadBalancer {

    @VisibleForTesting
    static final Attributes.Key<Ref<ConnectivityStateInfo>> STATE_INFO = Attributes.Key.create("state-info");

    private static final Status EMPTY_OK = Status.OK.withDescription("no subchannels ready");

    private final Helper helper; // creates subchannels and publishes the picker
    private final Map<EquivalentAddressGroup, Subchannel> subchannels = new HashMap<>(); // holds the subchannels
    private final Random random; // supplies the start index for each new ReadyPicker

    private ConnectivityState currentState; // state published after the most recent connection event
    // Picker published after the most recent connection event; normally replaced by a ReadyPicker
    private RoundRobinPicker currentPicker = new EmptyPicker(EMPTY_OK);

    public RoundRobinLoadBalancer(Helper helper) {
        this.helper = checkNotNull(helper, "helper");
        this.random = new Random();
    }

    /**
     * Refreshes the nodes. Whenever service discovery updates the node list, this method is
     * notified so that the set of nodes taking part in load balancing can be updated.
     *
     * @param resolvedAddresses the new set of nodes
     */
    @Override
    public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
        List<EquivalentAddressGroup> servers = resolvedAddresses.getAddresses();
        Set<EquivalentAddressGroup> currentAddrs = subchannels.keySet(); // current nodes
        Map<EquivalentAddressGroup, EquivalentAddressGroup> latestAddrs = stripAttrs(servers); // latest nodes
        // This is an address update, so work out which existing nodes have to be removed
        Set<EquivalentAddressGroup> removedAddrs = setsDifference(currentAddrs, latestAddrs.keySet());

        // Walk the new nodes
        for (Map.Entry<EquivalentAddressGroup, EquivalentAddressGroup> latestEntry : latestAddrs.entrySet()) {
            EquivalentAddressGroup strippedAddressGroup = latestEntry.getKey(); // address-only form (see stripAttrs)
            EquivalentAddressGroup originalAddressGroup = latestEntry.getValue(); // full form, attributes included
            Subchannel existingSubchannel = subchannels.get(strippedAddressGroup); // look the new node up among the old ones
            if (existingSubchannel != null) { // the node already existed
                // The Attributes of a freshly resolved node may have changed (attached data such as
                // the node's weight in service discovery), so refresh them
                existingSubchannel.updateAddresses(Collections.singletonList(originalAddressGroup));
                continue;
            }
            // The node is new, so a new subchannel has to be created.
            // Set the initial state; the Ref wrapper lets the state be replaced later.
            Attributes.Builder subchannelAttrs = Attributes.newBuilder().set(STATE_INFO,
                    new Ref<>(ConnectivityStateInfo.forNonError(IDLE)));

            final Subchannel subchannel = checkNotNull(
                    helper.createSubchannel(CreateSubchannelArgs.newBuilder()
                            .setAddresses(originalAddressGroup)
                            .setAttributes(subchannelAttrs.build())
                            .build()),
                    "subchannel");
            // Connecting is asynchronous; onSubchannelState is invoked when the subchannel's state changes
            subchannel.start(new SubchannelStateListener() {
                @Override
                public void onSubchannelState(ConnectivityStateInfo state) {
                    // Record the new state (READY if the connection succeeded)
                    processSubchannelState(subchannel, state);
                }
            });
            subchannels.put(strippedAddressGroup, subchannel); // store the subchannel
            subchannel.requestConnection(); // start connecting; the listener above is notified of the result
        }

        ArrayList<Subchannel> removedSubchannels = new ArrayList<>();
        for (EquivalentAddressGroup addressGroup : removedAddrs) {
            // Drop the nodes that have to be taken out
            removedSubchannels.add(subchannels.remove(addressGroup));
        }

        // Update the load-balancing state
        updateBalancingState();

        for (Subchannel removedSubchannel : removedSubchannels) {
            // Shut down the removed nodes one by one
            shutdownSubchannel(removedSubchannel);
        }
    }

    @Override
    public void handleNameResolutionError(Status error) {
        updateBalancingState(TRANSIENT_FAILURE,
                currentPicker instanceof ReadyPicker ? currentPicker : new EmptyPicker(error));
    }

    @Override
    public void shutdown() {
        for (Subchannel subchannel : getSubchannels()) {
            shutdownSubchannel(subchannel); // close every held connection
        }
    }

    // The argument is the node list pushed by service discovery
    private static Map<EquivalentAddressGroup, EquivalentAddressGroup> stripAttrs(List<EquivalentAddressGroup> groupList) {
        // Sized at 2x the entry count, presumably to stay below HashMap's default 0.75 load
        // factor and avoid rehashing
        Map<EquivalentAddressGroup, EquivalentAddressGroup> addrs = new HashMap<>(groupList.size() * 2);
        for (EquivalentAddressGroup group : groupList) {
            // Key: a new EquivalentAddressGroup holding the same addresses only;
            // value: the original EquivalentAddressGroup
            addrs.put(stripAttrs(group), group);
        }
        return addrs;
    }

    private static EquivalentAddressGroup stripAttrs(EquivalentAddressGroup eag) {
        return new EquivalentAddressGroup(eag.getAddresses());
    }

    // Updates the connection state of one subchannel
    private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) {
        if (subchannels.get(stripAttrs(subchannel.getAddresses())) != subchannel) {
            return;
        }
        if (stateInfo.getState() == IDLE) {
            subchannel.requestConnection(); // an idle subchannel is asked to connect again
        }
        // Fetch the state currently recorded for this subchannel
        Ref<ConnectivityStateInfo> subchannelStateRef = getSubchannelStateInfoRef(subchannel);
        if (subchannelStateRef.value.getState().equals(TRANSIENT_FAILURE)) {
            if (stateInfo.getState().equals(CONNECTING) || stateInfo.getState().equals(IDLE)) {
                return; // a failed subchannel must not be moved back to CONNECTING or IDLE
            }
        }
        subchannelStateRef.value = stateInfo; // record the new state
        updateBalancingState(); // update the load-balancing state
    }

    // Called after every asynchronous connection event to refresh the picker
    private void updateBalancingState() {
        List<Subchannel> activeList = filterNonFailingSubchannels(getSubchannels()); // keep only READY subchannels
        if (activeList.isEmpty()) { // no node is READY, e.g. none has connected yet or all of them are unavailable
            boolean isConnecting = false;
            Status aggStatus = EMPTY_OK;
            for (Subchannel subchannel : getSubchannels()) {
                ConnectivityStateInfo stateInfo = getSubchannelStateInfoRef(subchannel).value;
                if (stateInfo.getState() == CONNECTING || stateInfo.getState() == IDLE) {
                    isConnecting = true;
                }
                if (aggStatus == EMPTY_OK || !aggStatus.isOk()) {
                    aggStatus = stateInfo.getStatus();
                }
            }
            updateBalancingState(isConnecting ? CONNECTING : TRANSIENT_FAILURE, new EmptyPicker(aggStatus));
        } else {
            // The connected nodes are handed to a ReadyPicker, which does the actual balancing.
            // A random start index keeps frequent picker replacement from skewing the selection.
            int startIndex = random.nextInt(activeList.size());
            updateBalancingState(READY, new ReadyPicker(activeList, startIndex));
        }
    }

    // Publishes a new picker
    private void updateBalancingState(ConnectivityState state, RoundRobinPicker picker) {
        if (state != currentState || !picker.isEquivalentTo(currentPicker)) {
            // Key step: hand the current picker to ManagedChannelImpl; every client request
            // then calls the picker's pickSubchannel method
            helper.updateBalancingState(state, picker);
            currentState = state; // remember the state
            currentPicker = picker; // remember the picker
        }
    }
}
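The address bookkeeping in handleResolvedAddresses boils down to a set difference plus "create only what is new". The sketch below models just that part with plain strings in place of EquivalentAddressGroup and Subchannel. AddressBook and the "conn-" labels are hypothetical names used for illustration, and no real connections are made.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Simplified model of the node bookkeeping in handleResolvedAddresses. */
class AddressBook {
    // address -> connection label; stands in for the subchannels map
    private final Map<String, String> connections = new LinkedHashMap<>();

    /** Applies the latest address list and returns the addresses that were removed. */
    Set<String> update(List<String> latest) {
        Set<String> latestSet = new LinkedHashSet<>(latest);

        // removed = current - latest (copy first so the live key set is not modified)
        Set<String> removed = new LinkedHashSet<>(connections.keySet());
        removed.removeAll(latestSet);

        for (String addr : latestSet) {
            // Existing entries are kept as they are; unseen addresses get a new "connection"
            connections.putIfAbsent(addr, "conn-" + addr);
        }
        for (String addr : removed) {
            connections.remove(addr);
        }
        return removed;
    }

    /** Snapshot of the addresses currently tracked. */
    Set<String> addresses() {
        return new LinkedHashSet<>(connections.keySet());
    }
}
```

Updating from {10.0.0.1:80, 10.0.0.2:80} to {10.0.0.2:80, 10.0.0.3:80} keeps the entry for 10.0.0.2:80, creates one for 10.0.0.3:80, and reports 10.0.0.1:80 as removed, which mirrors the three cases in the loop above.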

Simplified flow diagram (each method is marked in a different color):

Figure 2
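The decision made in updateBalancingState can also be read as a small pure function over the per-connection states. The following sketch restates that rule on its own. StateAggregator and its State enum are hypothetical stand-ins for the gRPC types, and the status aggregation (aggStatus) is left out.

```java
import java.util.List;

/** Simplified model of how per-connection states are folded into one balancer state. */
class StateAggregator {
    enum State { IDLE, CONNECTING, READY, TRANSIENT_FAILURE }

    /**
     * READY if any connection is ready; otherwise CONNECTING if any connection is
     * connecting or idle; otherwise TRANSIENT_FAILURE (this includes the empty list).
     */
    static State aggregate(List<State> states) {
        boolean connecting = false;
        for (State s : states) {
            if (s == State.READY) {
                return State.READY;
            }
            if (s == State.CONNECTING || s == State.IDLE) {
                connecting = true;
            }
        }
        return connecting ? State.CONNECTING : State.TRANSIENT_FAILURE;
    }
}
```

A single READY connection is enough for the balancer to publish a ReadyPicker; the other two outcomes correspond to the EmptyPicker branch.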

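1.3: Provide a LoadBalancer.SubchannelPicker implementation

Everything so far is only preparation for the actual load balancing. The real LB algorithm lives in the Picker class. Here are the main methods of the ReadyPicker that appeared above:

Code block 4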
// RoundRobinPicker extends LoadBalancer.SubchannelPicker
static final class ReadyPicker extends RoundRobinPicker {
    private static final AtomicIntegerFieldUpdater<ReadyPicker> indexUpdater =
            AtomicIntegerFieldUpdater.newUpdater(ReadyPicker.class, "index");

    // The subchannels that are currently READY (see how ReadyPicker is created in Figure 2)
    private final List<Subchannel> list;
    @SuppressWarnings("unused")
    private volatile int index;

    ReadyPicker(List<Subchannel> list, int startIndex) {
        Preconditions.checkArgument(!list.isEmpty(), "empty list");
        this.list = list;
        this.index = startIndex - 1; // position where the round robin starts
    }

    // pickSubchannel is the core method through which a node is picked for a request
    @Override
    public PickResult pickSubchannel(PickSubchannelArgs args) {
        return PickResult.withSubchannel(nextSubchannel()); // nextSubchannel holds the round-robin logic
    }

    // The round-robin algorithm
    private Subchannel nextSubchannel() {
        int size = list.size();
        int i = indexUpdater.incrementAndGet(this);
        if (i >= size) {
            int oldi = i;
            i %= size;
            indexUpdater.compareAndSet(this, oldi, i);
        }
        return list.get(i);
    }

    @Override
    boolean isEquivalentTo(RoundRobinPicker picker) {
        if (!(picker instanceof ReadyPicker)) {
            return false;
        }
        ReadyPicker other = (ReadyPicker) picker;
        return other == this
                || (list.size() == other.list.size() && new HashSet<>(list).containsAll(other.list));
    }
}

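That is the core of the LB algorithm. The key piece is the pickSubchannel method: it is the entry point through which the LB algorithm is triggered for each request.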
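To try the round-robin idea outside gRPC, here is a minimal stand-alone sketch. It is a model of what ReadyPicker does, not its code: RoundRobinSelector is a hypothetical name, and instead of resetting the index with compareAndSet it keeps a monotonically increasing counter and maps it into range with Math.floorMod, which stays non-negative even after the counter overflows.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Minimal thread-safe round-robin selector over a fixed, non-empty list. */
class RoundRobinSelector<T> {
    private final List<T> items;
    private final AtomicInteger counter;

    RoundRobinSelector(List<T> items, int startIndex) {
        if (items.isEmpty()) {
            throw new IllegalArgumentException("empty list");
        }
        this.items = items;
        this.counter = new AtomicInteger(startIndex);
    }

    /** Returns the next item, wrapping around at the end of the list. */
    T next() {
        // floorMod keeps the index in [0, size) even when the counter has gone negative
        int i = Math.floorMod(counter.getAndIncrement(), items.size());
        return items.get(i);
    }
}
```

With the list [a, b, c] and a start index of 1, successive calls to next() return b, c, a, b, and so on.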

1.4: Specify the LB algorithm in the gRPC client

A client channel is normally created like this:

Code block 5
public static void main(String[] args) {
    ChannelBuilder builder = ChannelBuilder.forTarget("put the service's discovery_id here")
            // Sets the service discovery; it accepts a customizable class that extends io.grpc.NameResolverProvider
            .nameResolverFactory(new RPCNamingClientNameResolverFactory(zone, resolver, cluster))
            .disableRetry() // disable retries
            // Enables the LB module. Recall getPolicyName in the xxxProvider classes: the name given
            // here is matched against the name returned there, and the matching LB is enabled
            .defaultLoadBalancingPolicy("round_robin");

    ManagedChannel channel = builder.build(); // the gRPC client channel is now created
}

As shown, the gRPC client calls defaultLoadBalancingPolicy with the name of an LB policy to select its default load balancer.

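II. Customizing your own LB algorithm in gRPC

With Part I understood, customizing your own LB algorithm in gRPC becomes much simpler. It takes only the following steps:

  1. Define an XXXPicker that extends LoadBalancer.SubchannelPicker, and implement your own LB logic in its pickSubchannel method.
  2. Define an XXXLoadBalancer that extends LoadBalancer; it manages the connections and supplies the corresponding Picker object.
  3. Define a Provider that extends LoadBalancerProvider and returns the corresponding LoadBalancer object from newLoadBalancer, then register it under META-INF in your own project following the SPI convention (a META-INF/services/io.grpc.LoadBalancerProvider file that lists the provider's fully qualified class name).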
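As an illustration of step 1, the selection logic of a custom picker can be developed and tested on its own before it is wired into pickSubchannel. The sketch below shows one possible choice, smooth weighted round robin, over plain node names. Everything in it is hypothetical: WeightedSelector, the node names and the weights are made up for the example, and where the weights come from in a real picker (for instance the Attributes delivered by service discovery) is an assumption left open here.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical smooth weighted round-robin selector; names and weights are illustrative. */
class WeightedSelector {
    private static final class Node {
        final String name;
        final int weight;
        int current; // running score, adjusted on every pick

        Node(String name, int weight) {
            this.name = name;
            this.weight = weight;
        }
    }

    private final List<Node> nodes = new ArrayList<>();
    private int totalWeight;

    synchronized void add(String name, int weight) {
        if (weight <= 0) {
            throw new IllegalArgumentException("weight must be positive");
        }
        nodes.add(new Node(name, weight));
        totalWeight += weight;
    }

    /** Picks the next node; synchronized because a picker is called from many threads. */
    synchronized String pick() {
        if (nodes.isEmpty()) {
            throw new IllegalStateException("no nodes");
        }
        Node best = null;
        for (Node n : nodes) {
            n.current += n.weight; // every node gains its own weight
            if (best == null || n.current > best.current) {
                best = n;
            }
        }
        best.current -= totalWeight; // the chosen node pays back the total
        return best.name;
    }
}
```

With weights a=2 and b=1 the picks cycle as a, b, a; over any full cycle each node is chosen in proportion to its weight. A real XXXPicker would hold Subchannel objects instead of names and return PickResult.withSubchannel(...) for the chosen one.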