基于依赖程序的版本信息:mysql-connector-java v8.0.17
一、认识loadbalance模式 首先回忆下jdbc协议头都有哪些,下面总结下:
通过表1
,可以知道在loadblance模式下允许配置多个mysql节点信息,而我们每次建连时,驱动程序就会按照配置的节点,选中一个,然后完成连接的创建,下面我们来探索下它的实现。
二、基本使用 它的基本用法跟其他模式没有区别:
代码块1 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public static void main (String[] args) throws Exception { String url = "jdbc:mysql:loadbalance://127.0.0.1:3306,127.0.0.2:3306,127.0.0.3:3306/mydb" ; LoadBalancedConnection connection = (LoadBalancedConnection) DriverManager.getConnection(url, "root" , "123456" ); Statement statement = connection.createStatement(); ResultSet rs = null ; try { if (statement.execute("select * from t_season" )) { rs = statement.getResultSet(); } } finally { if (rs != null ) { rs.close(); } statement.close(); connection.close(); } }
三、驱动加载流程 jdbc是如何知道我们启用了loadbalance模式的?先来了解下DriverManager
的getConnection
方法,注意这里的DriverManager在java.sql包内,它属于jdk自带的类,目的是扫描所有实现了java.sql.Driver的类,而我们所使用的mysql-connector-java
程序就实现了Driver接口,所以很容易被DriverManager载入,下面来看它是如何完成驱动程序扫描与加载的:
代码块2 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 @CallerSensitive public static Connection getConnection (String url, String user, String password) throws SQLException { java.util.Properties info = new java .util.Properties(); if (user != null ) { info.put("user" , user); } if (password != null ) { info.put("password" , password); } return (getConnection(url, info, Reflection.getCallerClass())); } private static Connection getConnection ( String url, java.util.Properties info, Class<?> caller) throws SQLException { ClassLoader callerCL = caller != null ? caller.getClassLoader() : null ; if (callerCL == null ) { callerCL = Thread.currentThread().getContextClassLoader(); } if (url == null ) { throw new SQLException ("The url cannot be null" , "08001" ); } println("DriverManager.getConnection(\"" + url + "\")" ); ensureDriversInitialized(); SQLException reason = null ; for (DriverInfo aDriver : registeredDrivers) { if (isDriverAllowed(aDriver.driver, callerCL)) { try { println(" trying " + aDriver.driver.getClass().getName()); Connection con = aDriver.driver.connect(url, info); if (con != null ) { println("getConnection returning " + aDriver.driver.getClass().getName()); return (con); } } catch (SQLException ex) { if (reason == null ) { reason = ex; } } } else { println(" skipping: " + aDriver.getClass().getName()); } } if (reason != null ) { println("getConnection failed: " + reason); throw reason; } println("getConnection: no suitable driver found for " + url); throw new SQLException ("No suitable driver found for " + url, "08001" ); } private static boolean isDriverAllowed (Driver driver, Class<?> caller) { ClassLoader callerCL = caller != null ? caller.getClassLoader() : null ; return isDriverAllowed(driver, callerCL); } private static boolean isDriverAllowed (Driver driver, ClassLoader classLoader) { boolean result = false ; if (driver != null ) { Class<?> aClass = null ; try { aClass = Class.forName(driver.getClass().getName(), true , classLoader); } catch (Exception ex) { result = false ; } result = ( aClass == driver.getClass() ) ? true : false ; } return result; }
驱动程序的SPI
支持:
通过上述代码,可以确认最终是通过驱动程序Driver实现类
的connect方法
产生的Connection对象
,下面来看下驱动程序Driver里的实现:
代码块3 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 @Override public java.sql.Connection connect (String url, Properties info) throws SQLException { try { if (!ConnectionUrl.acceptsUrl(url)) { return null ; } ConnectionUrl conStr = ConnectionUrl.getConnectionUrlInstance(url, info); switch (conStr.getType()) { case SINGLE_CONNECTION: return com.mysql.cj.jdbc.ConnectionImpl.getInstance(conStr.getMainHost()); case LOADBALANCE_CONNECTION: return LoadBalancedConnectionProxy.createProxyInstance((LoadbalanceConnectionUrl) conStr); case FAILOVER_CONNECTION: return FailoverConnectionProxy.createProxyInstance(conStr); case REPLICATION_CONNECTION: return ReplicationConnectionProxy.createProxyInstance((ReplicationConnectionUrl) conStr); default : return null ; } } catch (UnsupportedConnectionStringException e) { return null ; } catch (CJException ex) { throw ExceptionFactory.createException(UnableToConnectException.class, Messages.getString("NonRegisteringDriver.17" , new Object [] { ex.toString() }), ex); } }
到这里,可以看到驱动程序之所以会知道我们启用了loadbalance模式,是因为我们所配置的jdbc连接协议头
,根据协议头的不同,会被路由进不同的Connection实现,然后最终将Connection对象返回给用户。
四、驱动程序对LoadBalance的支持 下面重点看下命中LOADBALANCE_CONNECTION
条件的LoadBalancedConnectionProxy.createProxyInstance
的内部逻辑:
代码块4 1 2 3 4 5 6 public static LoadBalancedConnection createProxyInstance (LoadbalanceConnectionUrl connectionUrl) throws SQLException { LoadBalancedConnectionProxy connProxy = new LoadBalancedConnectionProxy (connectionUrl); return (LoadBalancedConnection) java.lang.reflect.Proxy.newProxyInstance(LoadBalancedConnection.class.getClassLoader(), INTERFACES_TO_PROXY, connProxy); }
4.1:LoadBalance模式相关的类关系图 到这里为止,我们已经进入了loadblance模式内部,这里返回的是一个被代理了的LoadBalancedConnection
对象,下面来梳理下它们的继承和代理关系(之后重点分析的字段和方法字体均已标红
):
理清关系后,来看下最主要的几个属性和方法的实现。
代码块4
里直接new出了LoadBalancedConnectionProxy类,并且代理的目标类为LoadBalancedConnection,通过上图,可以知道就是最终返回给用户的那个Connection对象,意味着用户拿着这个Connection做任何操作都会触发LoadBalancedConnectionProxy的invokeMore方法(通过图中展示,其父类实现了InvocationHandler接口
,其invoke
会触发invokeMore
方法,而invokeMore方法的实现在LoadBalancedConnectionProxy里
)
4.2:LoadBalancedConnectionProxy.balancer属性 这是个属性,它包装了一个Balancer
对象,内部有自己的LB算法
,它的初始化如下:
代码块4 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 String strategy = props.getProperty(PropertyKey.ha_loadBalanceStrategy.getKeyName(), "random" ); try { switch (strategy) { case "random" : this .balancer = new RandomBalanceStrategy (); break ; case "bestResponseTime" : this .balancer = new BestResponseTimeBalanceStrategy (); break ; case "serverAffinity" : this .balancer = new ServerAffinityStrategy (props.getProperty(PropertyKey.serverAffinityOrder.getKeyName(), null )); break ; default : this .balancer = (BalanceStrategy) Class.forName(strategy).newInstance(); } } catch (Throwable t) { throw SQLError.createSQLException(Messages.getString("InvalidLoadBalanceStrategy" , new Object [] { strategy }), MysqlErrorNumbers.SQL_STATE_ILLEGAL_ARGUMENT, t, null ); }
我们只关注Random
即可,它的内部实现就是简单的从LoadBalancedConnectionProxy.liveConnections
里随机选一个节点,然后返回出去,为了更加清晰,不再贴代码,大致流程如下(绿色框逻辑都属于RandomBalancer
本身的逻辑,除此之外,图中标注了hostList
属性的数据来源):
它的触发点就是在LoadBalancedConnectionProxy.pickNewConnection
方法(参考下方4.3
),即发生在选取节点时。
4.3:LoadBalancedConnectionProxy.pickNewConnection方法 这个方法是非常核心的功能,每次变换节点时都会触发的一个方法,下面来看下其内部逻辑:
代码块5 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 public synchronized void pickNewConnection () throws SQLException { if (this .isClosed && this .closedExplicitly) { return ; } List<String> hostPortList = Collections.unmodifiableList(this .hostsList.stream().map(hi -> hi.getHostPortPair()).collect(Collectors.toList())); if (this .currentConnection == null ) { this .currentConnection = this .balancer.pickConnection(this , hostPortList, Collections.unmodifiableMap(this .liveConnections), this .responseTimes.clone(), this .retriesAllDown); return ; } if (this .currentConnection.isClosed()) { invalidateCurrentConnection(); } int pingTimeout = this .currentConnection.getPropertySet().getIntegerProperty(PropertyKey.loadBalancePingTimeout).getValue(); boolean pingBeforeReturn = this .currentConnection.getPropertySet().getBooleanProperty(PropertyKey.loadBalanceValidateConnectionOnSwapServer).getValue(); for (int hostsTried = 0 , hostsToTry = this .hostsList.size(); hostsTried < hostsToTry; hostsTried++) { ConnectionImpl newConn = null ; try { newConn = (ConnectionImpl) this .balancer.pickConnection(this , hostPortList, Collections.unmodifiableMap(this .liveConnections), this .responseTimes.clone(), this .retriesAllDown); if (this .currentConnection != null ) { if (pingBeforeReturn) { newConn.pingInternal(true , pingTimeout); } syncSessionState(this .currentConnection, newConn); } this .currentConnection = newConn; return ; } catch (SQLException e) { if (shouldExceptionTriggerConnectionSwitch(e) && newConn != null ) { invalidateConnection(newConn); } } } this .isClosed = true ; this .closedReason = "Connection closed after inability to pick valid new connection during load-balance." ; }
4.4:MultiHostConnectionProxy.invoke方法 这是MultiHostConnectionProxy
对InvocationHandler
接口的实现,通过图2
可以知道,它的子类LoadBalancedConnectionProxy
是返回给用户的LoadBalancedConnection
的代理类,意味着用户利用Connection做的每一步操作,都会命中这个invoke
方法的调用,下面来看下这个方法的实现:
代码块6 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 public synchronized Object invoke (Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName(); if (METHOD_GET_MULTI_HOST_SAFE_PROXY.equals(methodName)) { return this .thisAsConnection; } if (METHOD_EQUALS.equals(methodName)) { return args[0 ].equals(this ); } if (METHOD_HASH_CODE.equals(methodName)) { return this .hashCode(); } if (METHOD_CLOSE.equals(methodName)) { doClose(); this .isClosed = true ; this .closedReason = "Connection explicitly closed." ; this .closedExplicitly = true ; return null ; } if (METHOD_ABORT_INTERNAL.equals(methodName)) { doAbortInternal(); this .currentConnection.abortInternal(); this .isClosed = true ; this .closedReason = "Connection explicitly closed." ; return null ; } if (METHOD_ABORT.equals(methodName) && args.length == 1 ) { doAbort((Executor) args[0 ]); this .isClosed = true ; this .closedReason = "Connection explicitly closed." ; return null ; } if (METHOD_IS_CLOSED.equals(methodName)) { return this .isClosed; } try { return invokeMore(proxy, method, args); } catch (InvocationTargetException e) { throw e.getCause() != null ? e.getCause() : e; } catch (Exception e) { Class<?>[] declaredException = method.getExceptionTypes(); for (Class<?> declEx : declaredException) { if (declEx.isAssignableFrom(e.getClass())) { throw e; } } throw new IllegalStateException (e.getMessage(), e); } }
4.5:LoadBalancedConnectionProxy.invokeMore方法 紧接着上面的代码来看,了解下invokMore
方法(根据图2
可知,此方法为抽象方法,由子类实现,所以它的实现逻辑
在LoadBalancedConnectionProxy
里)
代码块7 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 @Override public synchronized Object invokeMore (Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName(); if (this .isClosed && !allowedOnClosedConnection(method) && method.getExceptionTypes().length > 0 ) { if (this .autoReconnect && !this .closedExplicitly) { this .currentConnection = null ; pickNewConnection(); this .isClosed = false ; this .closedReason = null ; } else { String reason = "No operations allowed after connection closed." ; if (this .closedReason != null ) { reason += " " + this .closedReason; } for (Class<?> excls : method.getExceptionTypes()) { if (SQLException.class.isAssignableFrom(excls)) { throw SQLError.createSQLException(reason, MysqlErrorNumbers.SQL_STATE_CONNECTION_NOT_OPEN, null ); } } throw ExceptionFactory.createException(CJCommunicationsException.class, reason); } } if (!this .inTransaction) { this .inTransaction = true ; this .transactionStartTime = System.nanoTime(); this .transactionCount++; } Object result = null ; try { result = method.invoke(this .thisAsConnection, args); if (result != null ) { if (result instanceof com.mysql.cj.jdbc.JdbcStatement) { ((com.mysql.cj.jdbc.JdbcStatement) result).setPingTarget(this ); } result = proxyIfReturnTypeIsJdbcInterface(method.getReturnType(), result); } } catch (InvocationTargetException e) { dealWithInvocationException(e); } finally { if ("commit" .equals(methodName) || "rollback" .equals(methodName)) { this .inTransaction = false ; String host = this .connectionsToHostsMap.get(this .currentConnection); if (host != null ) { synchronized (this .responseTimes) { Integer hostIndex = (this .hostsToListIndexMap.get(host)); if (hostIndex != null && hostIndex < this .responseTimes.length) { this .responseTimes[hostIndex] = System.nanoTime() - this .transactionStartTime; } } } pickNewConnection(); } } return result; }
通过这块代码可以知道,一个LoadBalance模式下的连接被创建出来后,除非是commit
或rollback
事务,否则该Connection对象里的currentConnection
永远都不会变
,当然,通过上述代码看,还有一种情况是会变的,那就是当前连接坏掉,然后ping
检查失败isClose
被标记为true
,你的配置里恰好又开启
了autoReconnect
,这时才会重新pick
新的节点。
五、猜想 通过对其代码实现的分析,可以得出如下猜想:
不配autoReconnect
的情况下,只有在利用该连接对象提交
、回滚
事务时才会pick
新的节点。
配置autoReconnect
的情况下,在节点坏掉
后,会pick
一次节点,事务提交
、回滚
一样会pick
节点。
综上,如果我要实现一个select查询
也需要pick
节点实现负载均衡的情况下,不可以用单例Connection
,因为普通select并不会
触发pick
操作。
综合3,想要实现全部意义的LB,必须要使用多实例模式
,这样虽然实现了我想要的LB效果,但代价是巨大的,因为每次都会建连
。
利用连接池
可以一定程度上解决这种问题,连接池可以预先建连一堆Connection对象,这些对象如果在创建时启用jdbc LoadBalance模式,那么意味着每个连接都是随机节点
。
六、验证 为了验证第五节
的结论,我们简单做个试验验证下。
6.1:单例模式下的Query操作 代码块8 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public class Test { public static void main (String[] args) throws Exception { String url = "jdbc:mysql:loadbalance://172.22.119.38:4000,172.22.119.8:4000,172.22.119.30:4000/tidb_test" ; LoadBalancedConnection connection = (LoadBalancedConnection) DriverManager.getConnection(url, "tidb_test" , "lPoK3QMSWY1BhSa3WCT1IWOXYkMc3Aqd" ); Test test = new Test (); for (int i = 0 ; i < 100 ; i++) { try { test.triggerQuery(connection); } finally { } } } private void triggerQuery (LoadBalancedConnection connection) throws Exception { Statement statement = connection.createStatement(); ResultSet rs = null ; try { System.out.println("current conn host: " + connection.getHost()); if (statement.execute("select * from t_student where id = 1" )) { rs = statement.getResultSet(); } } finally { if (rs != null ) { rs.close(); } statement.close(); } } }
这段代码运行结果打印如下:
代码块9 1 2 3 4 5 6 7 8 current conn host: 172.22.119.30 current conn host: 172.22.119.30 current conn host: 172.22.119.30 current conn host: 172.22.119.30 current conn host: 172.22.119.30 current conn host: 172.22.119.30 current conn host: 172.22.119.30 ...省略,共100条...
可以看到,如果一直用同一个Connection对象去createStatement,然后执行Query,那么节点始终是一开始pick好的那个,且永远不会变
。
6.2:多实例下的Query操作 这个其实根本没有测试的必要,多实例意味着每次Query前都会新建一个连接
对象,新建一个意味着会pick
一次,那肯定是random
的,我们改下main方法的代码:
代码块10 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public static void main (String[] args) throws Exception { String url = "jdbc:mysql:loadbalance://172.22.119.38:4000,172.22.119.8:4000,172.22.119.30:4000/tidb_test" ; Test test = new Test (); for (int i = 0 ; i < 100 ; i++) { LoadBalancedConnection connection = (LoadBalancedConnection) DriverManager.getConnection(url, "tidb_test" , "lPoK3QMSWY1BhSa3WCT1IWOXYkMc3Aqd" ); try { test.triggerQuery(connection); } finally { connection.close(); } } }
这次运行结果如下:
代码块11 1 2 3 4 5 6 7 8 9 10 current conn host: 172.22.119.8 current conn host: 172.22.119.8 current conn host: 172.22.119.8 current conn host: 172.22.119.38 current conn host: 172.22.119.30 current conn host: 172.22.119.38 current conn host: 172.22.119.8 current conn host: 172.22.119.8 current conn host: 172.22.119.38 ...省略,共100条...
可以看到,已经触发了LB算法了(这是预料之中的)。
你可能会问,如果调用了close
呢?会不会close不是真的close,而是触发pick
呢?正常情况下肯定会这样实现的吧?其实并不会,close的逻辑是把里面所有的liveConnections清空
,然后close一遍
,所以下面的代码会报错:
代码块12 1 2 3 4 5 6 7 8 9 10 11 12 13 14 public static void main (String[] args) throws Exception { String url = "jdbc:mysql:loadbalance://172.22.119.38:4000,172.22.119.8:4000,172.22.119.30:4000/tidb_test" ; LoadBalancedConnection connection = (LoadBalancedConnection) DriverManager.getConnection(url, "tidb_test" , "lPoK3QMSWY1BhSa3WCT1IWOXYkMc3Aqd" ); Test test = new Test (); for (int i = 0 ; i < 100 ; i++) { try { test.triggerQuery(connection); } finally { connection.close(); } } }
执行结果如下:
是的。。它直接报错了,也就是说,在select这种语句的执行下,要么你用多实例
,每次都建连
,实现你心目中的LB,要么你就用单例
,然后一个连接用到底
。。
但是它并非一无是处,比如,你可以结合连接池
来用它,这样既可以保证连接复用
,也可以保证池内每个连接对象的host在最大限度上不是同一个。
6.3:单例模式下的事务操作 按照源码上的理解,理论上即便是单例
,开启事务
并提交的时候也会切换
一次host,现在将前面的测试代码改成下面这样:
代码块13 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public static void main (String[] args) throws Exception { String url = "jdbc:mysql:loadbalance://172.22.119.38:4000,172.22.119.8:4000,172.22.119.30:4000/tidb_test" ; LoadBalancedConnection connection = (LoadBalancedConnection) DriverManager.getConnection(url, "tidb_test" , "lPoK3QMSWY1BhSa3WCT1IWOXYkMc3Aqd" ); Test test = new Test (); for (int i = 0 ; i < 100 ; i++) { try { test.triggerTransaction(connection); } finally { } } } private void triggerTransaction (LoadBalancedConnection connection) throws Exception { connection.setAutoCommit(false ); Statement statement = connection.createStatement(); ResultSet rs = null ; try { System.out.println("current conn host: " + connection.getHost()); if (statement.execute("select * from t_student where id = 1" )) { rs = statement.getResultSet(); } connection.commit(); } finally { if (rs != null ) { rs.close(); } statement.close(); } }
打印结果如下:
代码块14 1 2 3 4 5 6 7 8 9 10 11 current conn host: 172.22.119.8 current conn host: 172.22.119.30 current conn host: 172.22.119.8 current conn host: 172.22.119.30 current conn host: 172.22.119.38 current conn host: 172.22.119.30 current conn host: 172.22.119.38 current conn host: 172.22.119.30 current conn host: 172.22.119.8 current conn host: 172.22.119.30 ...省略,共100条...
这是符合预期的,因为即便是单实例,每次处理事务的节点也发生了变换。
多实例就不再试了,没有必要。
6.4:开启autoReconnect时,单例执行Query操作 按照我们的结论,这个只有在节点坏掉时才会重新pick节点,以保证可用性,那么我们现在来开启它,然后依然用单例模式操作Query,代码改写如下:
代码块15 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public static void main (String[] args) throws Exception { String url = "jdbc:mysql:loadbalance://172.22.119.38:4000,172.22.119.8:4000,172.22.119.30:4000/tidb_test?autoReconnect=true" ; LoadBalancedConnection connection = (LoadBalancedConnection) DriverManager.getConnection(url, "tidb_test" , "lPoK3QMSWY1BhSa3WCT1IWOXYkMc3Aqd" ); Test test = new Test (); for (int i = 0 ; i < 100 ; i++) { try { test.triggerQuery(connection); } finally { } } }
运行结果打印如下:
代码块16 1 2 3 4 5 6 7 8 current conn host: 172.22.119.8 current conn host: 172.22.119.8 current conn host: 172.22.119.8 current conn host: 172.22.119.8 current conn host: 172.22.119.8 current conn host: 172.22.119.8 current conn host: 172.22.119.8 ...省略,共100条...
符合我们的预期,因为它的作用不是干这个的。
七、结论 JDBC驱动程序的LoadBalance模式,是针对每一个
被新建出来的Connection对象的LB,它并非
很多人第一眼看到它协议头时所理解的那种将jdbc连接里配置的所有节点视作一个整体,每次利用Connection对象做一些操作时都会pick出来一个节点使用,以达到某种意义上的负载均衡,而是
每次新建Connection对象时,从那堆host里pick出来其中一个,创建对应的Connection对象,这跟我们第一眼看到它的感觉不太一样,但是实现上确实没什么太大的问题,因为单纯使用JDBC时本就不提倡Connection单例复用,若想要复用,需要结合各类连接池一起使用,通过对JDBC的LB模式的了解可以知道,结合某种连接池技术来支撑,就可以达到我们想象中的LB效果,因为池内每一个Connection对象在创建时,总会触发JDBC的LB策略。