I. How gRPC loads a load balancer
1.1: Providing the Provider class
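Every LoadBalancer object is instantiated by a LoadBalancerProvider object, and different LoadBalancerProvider objects instantiate different LoadBalancer objects. The LoadBalancerProvider objects themselves are loaded into the gRPC client through the SPI mechanism, for example through the provider configuration file under META-INF in grpc-core (by Java SPI convention, META-INF/services/io.grpc.LoadBalancerProvider).
From that file we can see that gRPC natively ships two LoadBalancerProvider implementations. Let's look at how each one implements the core methods: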
First, PickFirstLoadBalancerProvider:
    import io.grpc.LoadBalancer;
    import io.grpc.LoadBalancerProvider;
    import io.grpc.NameResolver.ConfigOrError;
    import java.util.Map;

    public final class PickFirstLoadBalancerProvider extends LoadBalancerProvider {
        private static final String NO_CONFIG = "no service config";

        @Override
        public boolean isAvailable() {
            return true;
        }

        @Override
        public int getPriority() {
            return 5;
        }

        // The name a client uses to select this policy.
        @Override
        public String getPolicyName() {
            return "pick_first";
        }

        // Factory method: creates the LoadBalancer this provider stands for.
        @Override
        public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) {
            return new PickFirstLoadBalancer(helper);
        }

        @Override
        public ConfigOrError parseLoadBalancingPolicyConfig(Map<String, ?> rawLoadBalancingPolicyConfig) {
            return ConfigOrError.fromConfig(NO_CONFIG);
        }
    }
Next, SecretRoundRobinLoadBalancerProvider.Provider:
    import io.grpc.LoadBalancer;
    import io.grpc.LoadBalancerProvider;
    import io.grpc.NameResolver.ConfigOrError;
    import java.util.Map;

    final class SecretRoundRobinLoadBalancerProvider {

        private SecretRoundRobinLoadBalancerProvider() {
        }

        public static final class Provider extends LoadBalancerProvider {
            private static final String NO_CONFIG = "no service config";

            @Override
            public boolean isAvailable() {
                return true;
            }

            @Override
            public int getPriority() {
                return 5;
            }

            // The name a client uses to select this policy.
            @Override
            public String getPolicyName() {
                return "round_robin";
            }

            // Factory method: creates the LoadBalancer this provider stands for.
            @Override
            public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) {
                return new RoundRobinLoadBalancer(helper);
            }

            @Override
            public ConfigOrError parseLoadBalancingPolicyConfig(
                    Map<String, ?> rawLoadBalancingPolicyConfig) {
                return ConfigOrError.fromConfig(NO_CONFIG);
            }
        }
    }
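Because each LoadBalancerProvider class instantiates a different LB algorithm, LB algorithms can be extended and selected by name. We can follow the same pattern to build our own LB algorithm and swap it in.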
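To make the name-based selection concrete, here is a minimal sketch of how a registry could pick a provider. It does not use gRPC's real classes: `SketchProvider` and `SketchRegistry` are hypothetical stand-ins that only mirror the three methods shown above (`isAvailable`, `getPriority`, `getPolicyName`). The rule "among available providers with the same policy name, the highest priority wins" is an assumption made for this illustration, and registration is done by hand here rather than through SPI discovery.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for io.grpc.LoadBalancerProvider (not the real class).
interface SketchProvider {
    boolean isAvailable();
    int getPriority();
    String getPolicyName();
}

// Hypothetical registry: per policy name, keeps the available provider
// with the highest priority.
final class SketchRegistry {
    private final Map<String, SketchProvider> effective = new HashMap<>();

    void register(SketchProvider provider) {
        if (!provider.isAvailable()) {
            return; // unavailable providers are ignored
        }
        SketchProvider existing = effective.get(provider.getPolicyName());
        if (existing == null || existing.getPriority() < provider.getPriority()) {
            effective.put(provider.getPolicyName(), provider);
        }
    }

    // Returns null when nothing is registered under that policy name.
    SketchProvider lookup(String policyName) {
        return effective.get(policyName);
    }

    // Convenience factory for the illustration only.
    static SketchProvider provider(String name, int priority, boolean available) {
        return new SketchProvider() {
            @Override public boolean isAvailable() { return available; }
            @Override public int getPriority() { return priority; }
            @Override public String getPolicyName() { return name; }
        };
    }
}
```

Under this assumption, a custom provider that reuses the name "round_robin" with a priority above 5 would shadow the built-in one, while a provider with a new name is only used when a client asks for that name.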
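1.2: Providing the LoadBalancer implementation
The providers above only supply LB instances, so there must also be classes that implement the LB algorithms. The two providers each supply one implementation class: RoundRobinLoadBalancer and PickFirstLoadBalancer.
Since we use round robin, only RoundRobinLoadBalancer is explained here.
The source code is fairly complex, so only the key steps are shown (if you are interested, read io.grpc.util.RoundRobinLoadBalancer directly; if reading code is tedious, skip to Figure 2 below for a quick overview of the process):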
    import static com.google.common.base.Preconditions.checkNotNull;
    import static io.grpc.ConnectivityState.CONNECTING;
    import static io.grpc.ConnectivityState.IDLE;
    import static io.grpc.ConnectivityState.READY;
    import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;

    import com.google.common.annotations.VisibleForTesting;
    import io.grpc.Attributes;
    import io.grpc.ConnectivityState;
    import io.grpc.ConnectivityStateInfo;
    import io.grpc.EquivalentAddressGroup;
    import io.grpc.LoadBalancer;
    import io.grpc.Status;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Random;
    import java.util.Set;

    // Excerpt: Ref, RoundRobinPicker, EmptyPicker, setsDifference, getSubchannels,
    // filterNonFailingSubchannels, getSubchannelStateInfoRef and shutdownSubchannel
    // are omitted here.
    public class RoundRobinLoadBalancer extends LoadBalancer {
        @VisibleForTesting
        static final Attributes.Key<Ref<ConnectivityStateInfo>> STATE_INFO =
                Attributes.Key.create("state-info");

        private static final Status EMPTY_OK = Status.OK.withDescription("no subchannels ready");

        private final Helper helper;
        private final Map<EquivalentAddressGroup, Subchannel> subchannels = new HashMap<>();
        private final Random random = new Random();

        private ConnectivityState currentState;
        private RoundRobinPicker currentPicker = new EmptyPicker(EMPTY_OK);

        public RoundRobinLoadBalancer(Helper helper) {
            this.helper = checkNotNull(helper, "helper");
        }

        // Called whenever the name resolver delivers a new address list.
        @Override
        public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
            List<EquivalentAddressGroup> servers = resolvedAddresses.getAddresses();
            Set<EquivalentAddressGroup> currentAddrs = subchannels.keySet();
            Map<EquivalentAddressGroup, EquivalentAddressGroup> latestAddrs = stripAttrs(servers);
            Set<EquivalentAddressGroup> removedAddrs =
                    setsDifference(currentAddrs, latestAddrs.keySet());

            for (Map.Entry<EquivalentAddressGroup, EquivalentAddressGroup> latestEntry
                    : latestAddrs.entrySet()) {
                EquivalentAddressGroup strippedAddressGroup = latestEntry.getKey();
                EquivalentAddressGroup originalAddressGroup = latestEntry.getValue();
                Subchannel existingSubchannel = subchannels.get(strippedAddressGroup);
                if (existingSubchannel != null) {
                    // Known address: keep the subchannel, refresh its attributes.
                    existingSubchannel.updateAddresses(
                            Collections.singletonList(originalAddressGroup));
                    continue;
                }

                // New address: create a subchannel that starts out IDLE.
                Attributes.Builder subchannelAttrs = Attributes.newBuilder()
                        .set(STATE_INFO, new Ref<>(ConnectivityStateInfo.forNonError(IDLE)));

                final Subchannel subchannel = checkNotNull(
                        helper.createSubchannel(CreateSubchannelArgs.newBuilder()
                                .setAddresses(originalAddressGroup)
                                .setAttributes(subchannelAttrs.build())
                                .build()),
                        "subchannel");
                subchannel.start(new SubchannelStateListener() {
                    @Override
                    public void onSubchannelState(ConnectivityStateInfo state) {
                        processSubchannelState(subchannel, state);
                    }
                });
                subchannels.put(strippedAddressGroup, subchannel);
                subchannel.requestConnection();
            }

            ArrayList<Subchannel> removedSubchannels = new ArrayList<>();
            for (EquivalentAddressGroup addressGroup : removedAddrs) {
                removedSubchannels.add(subchannels.remove(addressGroup));
            }

            // Publish the new picker first, then shut down the removed subchannels.
            updateBalancingState();

            for (Subchannel removedSubchannel : removedSubchannels) {
                shutdownSubchannel(removedSubchannel);
            }
        }

        @Override
        public void handleNameResolutionError(Status error) {
            // Keep serving with the current ReadyPicker if there is one.
            updateBalancingState(TRANSIENT_FAILURE,
                    currentPicker instanceof ReadyPicker ? currentPicker : new EmptyPicker(error));
        }

        @Override
        public void shutdown() {
            for (Subchannel subchannel : getSubchannels()) {
                shutdownSubchannel(subchannel);
            }
        }

        // Maps each attribute-stripped address group to its original address group.
        private static Map<EquivalentAddressGroup, EquivalentAddressGroup> stripAttrs(
                List<EquivalentAddressGroup> groupList) {
            Map<EquivalentAddressGroup, EquivalentAddressGroup> addrs =
                    new HashMap<>(groupList.size() * 2);
            for (EquivalentAddressGroup group : groupList) {
                addrs.put(stripAttrs(group), group);
            }
            return addrs;
        }

        private static EquivalentAddressGroup stripAttrs(EquivalentAddressGroup eag) {
            return new EquivalentAddressGroup(eag.getAddresses());
        }

        // Callback for connectivity-state changes of a single subchannel.
        private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) {
            if (subchannels.get(stripAttrs(subchannel.getAddresses())) != subchannel) {
                return;
            }
            if (stateInfo.getState() == IDLE) {
                subchannel.requestConnection();
            }
            Ref<ConnectivityStateInfo> subchannelStateRef = getSubchannelStateInfoRef(subchannel);
            if (subchannelStateRef.value.getState().equals(TRANSIENT_FAILURE)) {
                // Stay in TRANSIENT_FAILURE until the subchannel leaves CONNECTING/IDLE.
                if (stateInfo.getState().equals(CONNECTING) || stateInfo.getState().equals(IDLE)) {
                    return;
                }
            }
            subchannelStateRef.value = stateInfo;
            updateBalancingState();
        }

        // Recomputes the aggregate state and the picker from all subchannels.
        private void updateBalancingState() {
            List<Subchannel> activeList = filterNonFailingSubchannels(getSubchannels());
            if (activeList.isEmpty()) {
                boolean isConnecting = false;
                Status aggStatus = EMPTY_OK;
                for (Subchannel subchannel : getSubchannels()) {
                    ConnectivityStateInfo stateInfo = getSubchannelStateInfoRef(subchannel).value;
                    if (stateInfo.getState() == CONNECTING || stateInfo.getState() == IDLE) {
                        isConnecting = true;
                    }
                    if (aggStatus == EMPTY_OK || !aggStatus.isOk()) {
                        aggStatus = stateInfo.getStatus();
                    }
                }
                updateBalancingState(isConnecting ? CONNECTING : TRANSIENT_FAILURE,
                        new EmptyPicker(aggStatus));
            } else {
                // ReadyPicker takes a start index (see its constructor in 1.3);
                // a random one avoids always starting at the first subchannel.
                int startIndex = random.nextInt(activeList.size());
                updateBalancingState(READY, new ReadyPicker(activeList, startIndex));
            }
        }

        // Pushes the picker to the channel only when something actually changed.
        private void updateBalancingState(ConnectivityState state, RoundRobinPicker picker) {
            if (state != currentState || !picker.isEquivalentTo(currentPicker)) {
                helper.updateBalancingState(state, picker);
                currentState = state;
                currentPicker = picker;
            }
        }
    }
Figure 2. Simplified flow diagram (different methods are marked in different colors).
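The two decisions at the heart of that flow, diffing the address list and aggregating the subchannel states, can be sketched with plain collections. This is a simplified model rather than gRPC code: `BalancerSketch`, its `State` enum and the use of strings as addresses are hypothetical, and it assumes that `filterNonFailingSubchannels` (not shown in the excerpt) keeps only READY subchannels.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of the bookkeeping in RoundRobinLoadBalancer.
final class BalancerSketch {
    enum State { IDLE, CONNECTING, READY, TRANSIENT_FAILURE }

    // Addresses that have a subchannel today but are absent from the latest
    // resolution; their subchannels would be shut down.
    static Set<String> removed(Set<String> current, Set<String> latest) {
        Set<String> result = new HashSet<>(current);
        result.removeAll(latest);
        return result;
    }

    // Addresses in the latest resolution that need a new subchannel.
    static Set<String> added(Set<String> current, Set<String> latest) {
        Set<String> result = new HashSet<>(latest);
        result.removeAll(current);
        return result;
    }

    // READY if any subchannel is READY; otherwise CONNECTING if any is
    // CONNECTING or IDLE; otherwise TRANSIENT_FAILURE.
    static State aggregate(List<State> subchannelStates) {
        boolean connecting = false;
        for (State s : subchannelStates) {
            if (s == State.READY) {
                return State.READY;
            }
            if (s == State.CONNECTING || s == State.IDLE) {
                connecting = true;
            }
        }
        return connecting ? State.CONNECTING : State.TRANSIENT_FAILURE;
    }
}
```

For example, with current addresses {a, b} and a new resolution {b, c}, `removed` yields {a} and `added` yields {c}, while b keeps its existing subchannel.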
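1.3: Providing the LoadBalancer.SubchannelPicker implementation
Everything above only prepares for the actual load balancing. The LB algorithm itself lives in the Picker class, so let's look at the main methods of the ReadyPicker that appeared above: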
    // Nested in RoundRobinLoadBalancer; holds the READY subchannels and rotates through them.
    static final class ReadyPicker extends RoundRobinPicker {
        private static final AtomicIntegerFieldUpdater<ReadyPicker> indexUpdater =
                AtomicIntegerFieldUpdater.newUpdater(ReadyPicker.class, "index");

        private final List<Subchannel> list;
        @SuppressWarnings("unused")
        private volatile int index;

        ReadyPicker(List<Subchannel> list, int startIndex) {
            Preconditions.checkArgument(!list.isEmpty(), "empty list");
            this.list = list;
            // The first incrementAndGet() then yields startIndex.
            this.index = startIndex - 1;
        }

        @Override
        public PickResult pickSubchannel(PickSubchannelArgs args) {
            return PickResult.withSubchannel(nextSubchannel());
        }

        private Subchannel nextSubchannel() {
            int size = list.size();
            int i = indexUpdater.incrementAndGet(this);
            if (i >= size) {
                // Wrap around; the CAS resets the shared counter if no other thread moved it.
                int oldi = i;
                i %= size;
                indexUpdater.compareAndSet(this, oldi, i);
            }
            return list.get(i);
        }

        // Two ReadyPickers are equivalent when they hold the same set of subchannels.
        @Override
        boolean isEquivalentTo(RoundRobinPicker picker) {
            if (!(picker instanceof ReadyPicker)) {
                return false;
            }
            ReadyPicker other = (ReadyPicker) picker;
            return other == this
                    || (list.size() == other.list.size()
                            && new HashSet<>(list).containsAll(other.list));
        }
    }
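The above is the core LB algorithm. The key part is the pickSubchannel method: it is the entry point that triggers the LB algorithm, invoked to choose a subchannel for an RPC.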
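The index arithmetic in `nextSubchannel` is easier to follow in isolation. The sketch below reproduces the same increment, wrap and compare-and-set steps with a plain `AtomicInteger` over a list of arbitrary items; `RoundRobinIndexSketch` is a hypothetical name and the class has no dependency on gRPC.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-alone version of the ReadyPicker index rotation.
final class RoundRobinIndexSketch<T> {
    private final List<T> list;
    private final AtomicInteger index;

    RoundRobinIndexSketch(List<T> list, int startIndex) {
        if (list.isEmpty()) {
            throw new IllegalArgumentException("empty list");
        }
        this.list = list;
        // The first incrementAndGet() then yields startIndex.
        this.index = new AtomicInteger(startIndex - 1);
    }

    T next() {
        int size = list.size();
        int i = index.incrementAndGet();
        if (i >= size) {
            // Wrap around; the CAS resets the shared counter only if no
            // other thread has advanced it in the meantime.
            int old = i;
            i %= size;
            index.compareAndSet(old, i);
        }
        return list.get(i);
    }
}
```

With the list [a, b, c] and a start index of 0, seven consecutive calls to `next()` on a single thread return a, b, c, a, b, c, a.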
1.4: Specifying the LB algorithm on the gRPC client
A client channel is normally created as follows:
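    import io.grpc.ManagedChannel;
    import io.grpc.ManagedChannelBuilder;

    public static void main(String[] args) {
        // RPCNamingClientNameResolverFactory, zone, resolver and cluster belong to the
        // project's own service-discovery setup and are not defined in this article.
        ManagedChannelBuilder<?> builder = ManagedChannelBuilder
                .forTarget("fill in the service's discovery_id here")
                .nameResolverFactory(new RPCNamingClientNameResolverFactory(zone, resolver, cluster))
                .disableRetry()
                // Select the load balancer by its policy name.
                .defaultLoadBalancingPolicy("round_robin");
        ManagedChannel channel = builder.build();
    }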
As shown, the gRPC client calls the defaultLoadBalancingPolicy method with the LB policy name to select a default load balancer.
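II. Customizing your own LB algorithm in gRPC
With Part I understood, customizing your own LB algorithm in gRPC becomes much simpler. It takes only the following steps: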
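- Define an XXXPicker that extends LoadBalancer.SubchannelPicker, and implement your own LB logic in its pickSubchannel method.
- Define an XXXLoadBalancer that extends LoadBalancer; it manages the connections and supplies the corresponding Picker object.
- Define a Provider that extends LoadBalancerProvider, register it under your own project's META-INF following the SPI convention, and have its newLoadBalancer method return the corresponding LoadBalancer object.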
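As an illustration of the first step, the sketch below shows selection logic that a custom picker could run inside pickSubchannel. The article does not show its own algorithm, so smooth weighted round robin is swapped in here purely as an example. `WeightedPickerSketch` is a hypothetical class, backends are represented by plain strings instead of Subchannel objects, and the weights are assumed to come from somewhere outside this snippet (for instance, address attributes).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical selection logic for a custom picker (smooth weighted round robin).
final class WeightedPickerSketch {
    private static final class Node {
        final String name;
        final int weight;
        int current; // running score, starts at 0

        Node(String name, int weight) {
            this.name = name;
            this.weight = weight;
        }
    }

    private final List<Node> nodes = new ArrayList<>();
    private int totalWeight;

    // Ties are broken by the map's iteration order, so pass an ordered map
    // (for example LinkedHashMap) when a deterministic sequence matters.
    WeightedPickerSketch(Map<String, Integer> weights) {
        if (weights.isEmpty()) {
            throw new IllegalArgumentException("empty backend list");
        }
        for (Map.Entry<String, Integer> e : weights.entrySet()) {
            if (e.getValue() <= 0) {
                throw new IllegalArgumentException("weight must be positive: " + e.getKey());
            }
            nodes.add(new Node(e.getKey(), e.getValue()));
            totalWeight += e.getValue();
        }
    }

    // Synchronized because a picker may be called from many threads at once.
    synchronized String pick() {
        Node best = null;
        for (Node n : nodes) {
            n.current += n.weight;
            if (best == null || n.current > best.current) {
                best = n;
            }
        }
        best.current -= totalWeight;
        return best.name;
    }
}
```

With weights a=5, b=1, c=1 inserted in that order, seven consecutive picks give a, a, b, a, c, a, a: backend a receives five of every seven picks without all of them arriving back to back. In a real picker the chosen backend would be wrapped with PickResult.withSubchannel, as ReadyPicker does.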