基于依赖程序的版本信息:
姊妹篇:HikariCP是如何管理数据库连接的
零、类图&流程预览 下方流程中涉及到的类、属性、方法名均列在这里:Druid-类图-属性表 ←该表格用来辅助理解下面的流程图和代码,不用细看,混乱时可用来理清关系。
本文会通过getConnection作为入口,探索在druid里,一个连接的生命周期。大体流程被划分成了以下几个主流程:
一、主流程1:获取连接流程 首先从入口来看看它在获取连接时做了哪些操作:
上述流程对应源代码如下:
代码段1-1 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 public DruidPooledConnection getConnection (long maxWaitMillis) throws SQLException { init(); if (filters.size() > 0 ) { FilterChainImpl filterChain = new FilterChainImpl (this ); return filterChain.dataSource_connect(this , maxWaitMillis); } else { return getConnectionDirect(maxWaitMillis); } } public DruidPooledConnection getConnectionDirect (long maxWaitMillis) throws SQLException { int notFullTimeoutRetryCnt = 0 ; for (;;) { DruidPooledConnection poolableConnection; try { poolableConnection = getConnectionInternal(maxWaitMillis); } catch (GetConnectionTimeoutException ex) { if (notFullTimeoutRetryCnt <= this .notFullTimeoutRetryCount && !isFull()) { notFullTimeoutRetryCnt++; if (LOG.isWarnEnabled()) { LOG.warn("get connection timeout retry : " + notFullTimeoutRetryCnt); } continue ; } throw ex; } if (testOnBorrow) { boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn); if (!validate) { if (LOG.isDebugEnabled()) { LOG.debug("skip not validate connection." ); } Connection realConnection = poolableConnection.conn; discardConnection(realConnection); continue ; } } else { Connection realConnection = poolableConnection.conn; if (poolableConnection.conn.isClosed()) { discardConnection(null ); continue ; } if (testWhileIdle) { final DruidConnectionHolder holder = poolableConnection.holder; long currentTimeMillis = System.currentTimeMillis(); long lastActiveTimeMillis = holder.lastActiveTimeMillis; long lastKeepTimeMillis = holder.lastKeepTimeMillis; if (lastKeepTimeMillis > lastActiveTimeMillis) { lastActiveTimeMillis = lastKeepTimeMillis; } long idleMillis = currentTimeMillis - lastActiveTimeMillis; long timeBetweenEvictionRunsMillis = this .timeBetweenEvictionRunsMillis; if (timeBetweenEvictionRunsMillis <= 0 ) { timeBetweenEvictionRunsMillis = DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS; } if (idleMillis >= timeBetweenEvictionRunsMillis || idleMillis < 0 ) { boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn); if (!validate) { if (LOG.isDebugEnabled()) { LOG.debug("skip not validate connection." ); } discardConnection(realConnection); continue ; } } } } if (removeAbandoned) { StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); poolableConnection.connectStackTrace = stackTrace; poolableConnection.setConnectedTimeNano(); poolableConnection.traceEnable = true ; activeConnectionLock.lock(); try { activeConnections.put(poolableConnection, PRESENT); } finally { activeConnectionLock.unlock(); } } if (!this .defaultAutoCommit) { poolableConnection.setAutoCommit(false ); } return poolableConnection; } }
上述为获取连接时的流程图,首先会调用init
进行连接池的初始化,然后运行责任链
上的每一个filter
,最终执行getConnectionDirect
获取真正的连接对象,如果开启了testOnBorrow
,则每次都会去测试连接是否可用
这也是官方不建议
设置testOnBorrow
为true
的原因,影响性能,这里的测试是指测试mysql服务端的长连接是否断开,一般mysql服务端长连保活时间是8h
,被使用一次则刷新一次使用时间,若一个连接距离上次被使用超过了保活时间,那么再次使用时将无法与mysql服务端通信
如果testOnBorrow
没有被置为true
,则会进行testWhileIdle
的检查(这一项官方建议设置为true
,缺省值也是true
),检查时会判断当前连接对象距离上次被使用的时间是否超过规定检查的时间,若超过,则进行检查一次,这个检查时间通过timeBetweenEvictionRunsMillis
来控制,默认60s
,每个连接对象会记录下上次被使用的时间,用当前时间减去上一次的使用时间得出闲置时间,闲置时间再跟timeBetweenEvictionRunsMillis
比较,超过这个时间就做一次连接可用性检查,这个相比testOnBorrow
每次都检查来说,性能会提升很多,用的时候无需关注该值,因为缺省值是true
,经测试如果将该值设置为false
,testOnBorrow
也设置为false
,数据库服务端长连保活时间改为60s,60s内不使用连接,超过60s后使用将会报连接错误。若使用testConnectionInternal
方法测试长连接结果为false
,则证明该连接已被服务端断开或者有其他的网络原因导致该连接不可用,则会触发discardConnection
进行连接回收(对应流程1.4
,因为丢弃了一个连接,因此该方法会唤醒主流程3
进行检查是否需要新建连接)。整个流程运行在一个死循环内,直到取到可用连接或者超过重试上限报错退出(在连接没有超过连接池上限的话,最多重试一次(重试次数默认重试1次,可以通过notFullTimeoutRetryCount
属性来控制),所以取连接这里一旦发生等待,在连接池没有满的情况下,最大等待2 × maxWait
的时间 ←这个有待验证)。
特别说明项
为了保证性能,不建议将testOnBorrow
设置为true
,或者说牵扯到长连接可用检测的那几项配置使用druid默认的配置就可以保证性能是最好的,如上所说,默认长连接检查是60s一次,所以不启用testOnBorrow
的情况下要想保证万无一失,自己要确认下所连的那个mysql服务端的长连接保活时间(虽然默认是8h,但是dba可能给测试环境设置的时间远小于这个时间,所以如果这个时间小于60s,就需要手动设置timeBetweenEvictionRunsMillis
了,如果mysql服务端长连接时间是8h或者更长,则用默认值即可。
为了防止不必要的扩容,在mysql服务端长连接够用的情况下,对于一些qps较高的服务、网关业务,建议把池子的最小闲置连接数minIdle
和最大连接数maxActive
设置成一样的,且按照需要调大,且开启keepAlive
进行连接活性检查(参考流程4.1
),这样就不会后期发生动态新建连接的情况(建连还是个比较重的操作,所以不如一开始就申请好所有需要的连接,个人意见,仅供参考),但是像管理后台这种,长期qps非常低,但是有的时候需要用管理后台做一些巨大的操作(比如导数据什么的)导致需要的连接暴增,且管理后台不会特别要求性能,就适合将minIdle
的值设置的比maxActive
小,这样不会造成不必要的连接浪费,也不会在需要暴增连接的时候无法动态扩增连接。
二、主流程2:初始化连接池 通过上面的流程图可以看到,在获取一个连接的时候首先会检查连接池是否已经初始化完毕(通过inited
来控制,bool类型
,未初始化为flase
,初始化完毕为true
,这个判断过程在init方法
内完成),若没有初始化,则调用init进行初始化(图主流程1
中的紫色部分),下面来看看init方法
里又做了哪些操作:
上述流程对应源代码如下:
代码段2-1 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 public void init () throws SQLException { if (inited) { return ; } DruidDriver.getInstance(); final ReentrantLock lock = this .lock; try { lock.lockInterruptibly(); } catch (InterruptedException e) { throw new SQLException ("interrupt" , e); } boolean init = false ; try { if (inited) { return ; } initStackTrace = Utils.toString(Thread.currentThread().getStackTrace()); this .id = DruidDriver.createDataSourceId(); if (this .id > 1 ) { long delta = (this .id - 1 ) * 100000 ; this .connectionIdSeedUpdater.addAndGet(this , delta); this .statementIdSeedUpdater.addAndGet(this , delta); this .resultSetIdSeedUpdater.addAndGet(this , delta); this .transactionIdSeedUpdater.addAndGet(this , delta); } if (this .jdbcUrl != null ) { this .jdbcUrl = this .jdbcUrl.trim(); initFromWrapDriverUrl(); } for (Filter filter : filters) { filter.init(this ); } if (this .dbType == null || this .dbType.length() == 0 ) { this .dbType = JdbcUtils.getDbType(jdbcUrl, null ); } if (JdbcConstants.MYSQL.equals(this .dbType) || JdbcConstants.MARIADB.equals(this .dbType) || JdbcConstants.ALIYUN_ADS.equals(this .dbType)) { boolean cacheServerConfigurationSet = false ; if (this .connectProperties.containsKey("cacheServerConfiguration" )) { cacheServerConfigurationSet = true ; } else if (this .jdbcUrl.indexOf("cacheServerConfiguration" ) != -1 ) { cacheServerConfigurationSet = true ; } if (cacheServerConfigurationSet) { this .connectProperties.put("cacheServerConfiguration" , "true" ); } } if (maxActive <= 0 ) { throw new IllegalArgumentException ("illegal maxActive " + maxActive); } if (maxActive < minIdle) { throw new IllegalArgumentException ("illegal maxActive " + maxActive); } if (getInitialSize() > maxActive) { throw new IllegalArgumentException ("illegal initialSize " + this .initialSize + ", maxActive " + maxActive); } if (timeBetweenLogStatsMillis > 0 && useGlobalDataSourceStat) { throw new IllegalArgumentException ("timeBetweenLogStatsMillis not support useGlobalDataSourceStat=true" ); } if (maxEvictableIdleTimeMillis < minEvictableIdleTimeMillis) { throw new SQLException ("maxEvictableIdleTimeMillis must be grater than minEvictableIdleTimeMillis" ); } if (this .driverClass != null ) { this .driverClass = driverClass.trim(); } initFromSPIServiceLoader(); if (this .driver == null ) { if (this .driverClass == null || this .driverClass.isEmpty()) { this .driverClass = JdbcUtils.getDriverClassName(this .jdbcUrl); } if (MockDriver.class.getName().equals(driverClass)) { driver = MockDriver.instance; } else { if (jdbcUrl == null && (driverClass == null || driverClass.length() == 0 )) { throw new SQLException ("url not set" ); } driver = JdbcUtils.createDriver(driverClassLoader, driverClass); } } else { if (this .driverClass == null ) { this .driverClass = driver.getClass().getName(); } } initCheck(); initExceptionSorter(); initValidConnectionChecker(); validationQueryCheck(); if (isUseGlobalDataSourceStat()) { dataSourceStat = JdbcDataSourceStat.getGlobal(); if (dataSourceStat == null ) { dataSourceStat = new JdbcDataSourceStat ("Global" , "Global" , this .dbType); JdbcDataSourceStat.setGlobal(dataSourceStat); } if (dataSourceStat.getDbType() == null ) { dataSourceStat.setDbType(this .dbType); } } else { dataSourceStat = new JdbcDataSourceStat (this .name, this .jdbcUrl, this .dbType, this .connectProperties); } dataSourceStat.setResetStatEnable(this .resetStatEnable); connections = new DruidConnectionHolder [maxActive]; evictConnections = new DruidConnectionHolder [maxActive]; keepAliveConnections = new DruidConnectionHolder [maxActive]; SQLException connectError = null ; if (createScheduler != null && asyncInit) { for (int i = 0 ; i < initialSize; ++i) { createTaskCount++; CreateConnectionTask task = new CreateConnectionTask (true ); this .createSchedulerFuture = createScheduler.submit(task); } } else if (!asyncInit) { while (poolingCount < initialSize) { try { PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection(); DruidConnectionHolder holder = new DruidConnectionHolder (this , pyConnectInfo); connections[poolingCount++] = holder; } catch (SQLException ex) { LOG.error("init datasource error, url: " + this .getUrl(), ex); if (initExceptionThrow) { connectError = ex; break ; } else { Thread.sleep(3000 ); } } } if (poolingCount > 0 ) { poolingPeak = poolingCount; poolingPeakTime = System.currentTimeMillis(); } } createAndLogThread(); createAndStartCreatorThread(); createAndStartDestroyThread(); initedLatch.await(); init = true ; initedTime = new Date (); registerMbean(); if (connectError != null && poolingCount == 0 ) { throw connectError; } if (keepAlive) { if (createScheduler != null ) { for (int i = 0 ; i < minIdle; ++i) { createTaskCount++; CreateConnectionTask task = new CreateConnectionTask (true ); this .createSchedulerFuture = createScheduler.submit(task); } } else { this .emptySignal(); } } } catch (SQLException e) { LOG.error("{dataSource-" + this .getID() + "} init error" , e); throw e; } catch (InterruptedException e) { throw new SQLException (e.getMessage(), e); } catch (RuntimeException e){ LOG.error("{dataSource-" + this .getID() + "} init error" , e); throw e; } catch (Error e){ LOG.error("{dataSource-" + this .getID() + "} init error" , e); throw e; } finally { inited = true ; lock.unlock(); if (init && LOG.isInfoEnabled()) { String msg = "{dataSource-" + this .getID(); if (this .name != null && !this .name.isEmpty()) { msg += "," ; msg += this .name; } msg += "} inited" ; LOG.info(msg); } } } private void initFromSPIServiceLoader () { if (loadSpifilterSkip) { return ; } if (autoFilters == null ) { List filters = new ArrayList (); ServiceLoader autoFilterLoader = ServiceLoader.load(Filter.class); for (Filter filter : autoFilterLoader) { AutoLoad autoLoad = filter.getClass().getAnnotation(AutoLoad.class); if (autoLoad != null && autoLoad.value()) { filters.add(filter); } } autoFilters = filters; } for (Filter filter : autoFilters) { if (LOG.isInfoEnabled()) { LOG.info("load filter from spi :" + filter.getClass().getName()); } addFilter(filter); } } private void initValidConnectionChecker () { if (this .validConnectionChecker != null ) { return ; } String realDriverClassName = driver.getClass().getName(); if (JdbcUtils.isMySqlDriver(realDriverClassName)) { this .validConnectionChecker = new MySqlValidConnectionChecker (); } else if (realDriverClassName.equals(JdbcConstants.ORACLE_DRIVER) || realDriverClassName.equals(JdbcConstants.ORACLE_DRIVER2)) { this .validConnectionChecker = new OracleValidConnectionChecker (); } else if (realDriverClassName.equals(JdbcConstants.SQL_SERVER_DRIVER) || realDriverClassName.equals(JdbcConstants.SQL_SERVER_DRIVER_SQLJDBC4) || realDriverClassName.equals(JdbcConstants.SQL_SERVER_DRIVER_JTDS)) { this .validConnectionChecker = new MSSQLValidConnectionChecker (); } else if (realDriverClassName.equals(JdbcConstants.POSTGRESQL_DRIVER) || realDriverClassName.equals(JdbcConstants.ENTERPRISEDB_DRIVER)) { this .validConnectionChecker = new PGValidConnectionChecker (); } } public MySqlValidConnectionChecker () { try { clazz = Utils.loadClass("com.mysql.jdbc.MySQLConnection" ); if (clazz == null ) { clazz = Utils.loadClass("com.mysql.cj.jdbc.ConnectionImpl" ); } if (clazz != null ) { ping = clazz.getMethod("pingInternal" , boolean .class, int .class); } if (ping != null ) { usePingMethod = true ; } } catch (Exception e) { LOG.warn("Cannot resolve com.mysql.jdbc.Connection.ping method. Will use 'SELECT 1' instead." , e); } configFromProperties(System.getProperties()); }
可以看到,实例化的时候会初始化全局的重入锁lock
,在初始化过程中包括后续的连接池操作都会利用该锁保证线程安全,初始化连接池的时候首先会进行双重检查是否已经初始化过,若没有,则进行连接池的初始化,这时候还会通过SPI机制
额外加载责任链上的filter
,但是这类filter需要在类上加上@AutoLoad
注解。然后初始化了三个数组,容积都为maxActive
,首先connections
就是用来存放池子里连接对象的,evictConnections
用来存放每次检查需要抛弃的连接(结合流程4.1
理解),keepAliveConnections
用于存放需要连接检查的存活连接(同样结合流程4.1
理解),然后生成初始化数(initialSize
)个连接,放进connections
,然后生成两个必须的守护线程,用来添加连接进池以及从池子里摘除不需要的连接,这俩过程较复杂,因此拆出来单说(主流程3
和主流程4
)。
特别说明项
从流程上看如果一开始实例化的时候不对连接池进行初始化(这个初始化是指对池子本身的初始化,并非单纯的指druid对象属性的初始化),那么在第一次调用getConnection
时就会走上图那么多逻辑,尤其是耗时较久的建立连接操作,被重复执行了很多次,导致第一次getConnection
时耗时过久,如果你的程序并发量很大,那么第一次获取连接时就会因为初始化流程而发生排队,所以建议在实例化连接池后对其进行预热
,通过调用init方法
或者getConnection
方法都可以。
在构建全局重入锁的时候,利用lock
对象生成了俩Condition
,对这俩Condition
解释如下:
当连接池连接够用时,利用empty阻塞
添加连接的守护线程(主流程3
),当连接池连接不够用时,获取连接的那个线程(这里记为业务线程A
)就会阻塞
在notEmpty
上,且唤起
阻塞在empty
上的添加连接的守护线程
,走完添加连接的流程,走完后会重新唤起
阻塞在notEmpty
上的业务线程A
,业务线程A
就会继续尝试获取连接。
三、流程1.1:责任链
⚠️ 这块东西结合源码看更容易理解
上述流程对应源代码如下:
代码段-1-2 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 public DruidPooledConnection getConnection (long maxWaitMillis) throws SQLException { init(); if (filters.size() > 0 ) { FilterChainImpl filterChain = new FilterChainImpl (this ); return filterChain.dataSource_connect(this , maxWaitMillis); } else { return getConnectionDirect(maxWaitMillis); } } @Override public DruidPooledConnection dataSource_connect (DruidDataSource dataSource, long maxWaitMillis) throws SQLException { if (this .pos < filterSize) { DruidPooledConnection conn = nextFilter().dataSource_getConnection(this , dataSource, maxWaitMillis); return conn; } return dataSource.getConnectionDirect(maxWaitMillis); } private Filter nextFilter () { return getFilters() .get(pos++); } @Override public DruidPooledConnection dataSource_getConnection (FilterChain chain, DruidDataSource dataSource, long maxWaitMillis) throws SQLException { DruidPooledConnection conn = chain.dataSource_connect(dataSource, maxWaitMillis); ConnectionProxy connection = (ConnectionProxy) conn.getConnectionHolder().getConnection(); if (connectionConnectAfterLogEnable && isConnectionLogEnabled()) { connectionLog("{conn-" + connection.getId() + "} pool-connect" ); } return conn; }
这里对应流程1
里获取连接时需要执行的责任链,每个DruidAbstractDataSource
里都包含filters
属性,filters
是对Druid里Filters
接口的实现,里面有很多对应着连接池里的映射方法,比如例子中dataSource
的getConnection
方法在触发的时候就会利用FilterChain
把每个filter
里的dataSource_getConnection
给执行一遍,这里也要说明下FilterChain
,通过流程1.1
可以看出来,datasource是利用FilterChain
来触发各个filter的执行的,FilterChain
里也有一堆datasource
里的映射方法,比如上图里的dataSource_connect
,这个方法会把datasource里的filters
全部执行一遍直到nextFilter
取不到值,才会触发dataSource.getConnectionDirect
,这个结合代码会比较容易理解。
四、流程1.2:从池中获取连接的流程
上述流程对应源代码如下:
代码段1-3 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 private DruidPooledConnection getConnectionInternal (long maxWait) throws SQLException { if (closed) { connectErrorCountUpdater.incrementAndGet(this ); throw new DataSourceClosedException ("dataSource already closed at " + new Date (closeTimeMillis)); } if (!enable) { connectErrorCountUpdater.incrementAndGet(this ); throw new DataSourceDisableException (); } final long nanos = TimeUnit.MILLISECONDS.toNanos(maxWait); final int maxWaitThreadCount = this .maxWaitThreadCount; DruidConnectionHolder holder; for (boolean createDirect = false ;;) { if (createDirect) { } try { lock.lockInterruptibly(); } catch (InterruptedException e) { connectErrorCountUpdater.incrementAndGet(this ); throw new SQLException ("interrupt" , e); } try { if (maxWaitThreadCount > 0 && notEmptyWaitThreadCount >= maxWaitThreadCount) { connectErrorCountUpdater.incrementAndGet(this ); throw new SQLException ("maxWaitThreadCount " + maxWaitThreadCount + ", current wait Thread count " + lock.getQueueLength()); } if (onFatalError && onFatalErrorMaxActive > 0 && activeCount >= onFatalErrorMaxActive) { connectErrorCountUpdater.incrementAndGet(this ); StringBuilder errorMsg = new StringBuilder (); errorMsg.append("onFatalError, activeCount " ) .append(activeCount) .append(", onFatalErrorMaxActive " ) .append(onFatalErrorMaxActive); if (lastFatalErrorTimeMillis > 0 ) { errorMsg.append(", time '" ) .append(StringUtils.formatDateTime19( lastFatalErrorTimeMillis, TimeZone.getDefault())) .append("'" ); } if (lastFatalErrorSql != null ) { errorMsg.append(", sql \n" ) .append(lastFatalErrorSql); } throw new SQLException ( errorMsg.toString(), lastFatalError); } connectCount++; if (createScheduler != null && poolingCount == 0 && activeCount < maxActive && creatingCountUpdater.get(this ) == 0 && createScheduler instanceof ScheduledThreadPoolExecutor) { ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) createScheduler; if (executor.getQueue().size() > 0 ) { createDirect = true ; continue ; } } if (maxWait > 0 ) { holder = pollLast(nanos); } else { holder = takeLast(); } if (holder != null ) { activeCount++; if (activeCount > activePeak) { activePeak = activeCount; activePeakTime = System.currentTimeMillis(); } } } catch (InterruptedException e) { connectErrorCountUpdater.incrementAndGet(this ); throw new SQLException (e.getMessage(), e); } catch (SQLException e) { connectErrorCountUpdater.incrementAndGet(this ); throw e; } finally { lock.unlock(); } break ; } if (holder == null ) { long waitNanos = waitNanosLocal.get(); StringBuilder buf = new StringBuilder (128 ); buf.append("wait millis " ) .append(waitNanos / (1000 * 1000 )) .append(", active " ).append(activeCount) .append(", maxActive " ).append(maxActive) .append(", creating " ).append(creatingCount) ; if (creatingCount > 0 && createStartNanos > 0 ) { long createElapseMillis = (System.nanoTime() - createStartNanos) / (1000 * 1000 ); if (createElapseMillis > 0 ) { buf.append(", createElapseMillis " ).append(createElapseMillis); } } if (createErrorCount > 0 ) { buf.append(", createErrorCount " ).append(createErrorCount); } List sqlList = this .getDataSourceStat().getRuningSqlList(); for (int i = 0 ; i < sqlList.size(); ++i) { if (i != 0 ) { buf.append('\n' ); } else { buf.append(", " ); } JdbcSqlStatValue sql = sqlList.get(i); buf.append("runningSqlCount " ).append(sql.getRunningCount()); buf.append(" : " ); buf.append(sql.getSql()); } String errorMessage = buf.toString(); if (this .createError != null ) { throw new GetConnectionTimeoutException (errorMessage, createError); } else { throw new GetConnectionTimeoutException (errorMessage); } } holder.incrementUseCount(); DruidPooledConnection poolalbeConnection = new DruidPooledConnection (holder); return poolalbeConnection; } private DruidConnectionHolder pollLast (long nanos) throws InterruptedException, SQLException { long estimate = nanos; for (;;) { if (poolingCount == 0 ) { emptySignal(); if (failFast && isFailContinuous()) { throw new DataSourceNotAvailableException (createError); } if (estimate <= 0 ) { waitNanosLocal.set(nanos - estimate); return null ; } notEmptyWaitThreadCount++; if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) { notEmptyWaitThreadPeak = notEmptyWaitThreadCount; } try { long startEstimate = estimate; estimate = notEmpty.awaitNanos(estimate); notEmptyWaitCount++; notEmptyWaitNanos += (startEstimate - estimate); if (!enable) { connectErrorCountUpdater.incrementAndGet(this ); throw new DataSourceDisableException (); } } catch (InterruptedException ie) { notEmpty.signal(); notEmptySignalCount++; throw ie; } finally { notEmptyWaitThreadCount--; } if (poolingCount == 0 ) { if (estimate > 0 ) { continue ; } waitNanosLocal.set(nanos - estimate); return null ; } } decrementPoolingCount(); DruidConnectionHolder last = connections[poolingCount]; connections[poolingCount] = null ; long waitNanos = nanos - estimate; last.setLastNotEmptyWaitNanos(waitNanos); return last; } }
通过getConnectionInternal
方法从池子里获取真正的连接对象,druid支持两种方式新增连接,一种是通过开启不同的守护线程通过await
、signal
通信实现(本文启用的方式,也是默认的方式),另一种是直接通过线程池异步新增,这个方式通过在初始化druid时传入asyncInit=true
,再把一个线程池对象赋值给createScheduler
,就成功启用了这种模式,没仔细研究这种方式,所以本文的流程图和代码块都会规避这个模式。
上面的流程很简单,连接足够时就直接poolingCount-1
,数组取值,返回,activeCount+1
,整体复杂度为O(1)
,关键还是看取不到连接时的做法,取不到连接时,druid会先唤起新增连接的守护线程新增连接,然后陷入等待状态,然后唤醒该等待的点有两处,一个是用完了连接recycle(主流程5)
进池子后触发,另外一个就是新增连接的守护线程成功新增了一个连接后触发,await
被唤起后继续加入锁竞争,然后往下走如果发现池子里的连接数仍然是0(说明在唤醒后参与锁竞争里刚被放进来的连接又被别的线程拿去了),则继续下一次的await
,这里采用的是awaitNanos
方法,初始值是maxWait
,然后下次被刷新后就是maxWait
减去上次阻塞花费的实际时间,每次await
的时间会逐步减少,直到归零,整体时间是约等于maxWait
的,但实际比maxActive
要大,因为程序本身存在耗时以及被唤醒后又要参与锁竞争导致也存在一定的耗时。
如果最终都没办法拿到连接则返回null出去,紧接着触发主流程1
中的重试逻辑。
druid如何防止在获取不到连接时阻塞过多的业务线程?
通过上面的流程图和流程描述,如果非常极端的情况,池子里的连接完全不够用时,会阻塞过多的业务线程,甚至会阻塞超过maxWait
这么久,有没有一种措施是可以在连接不够用的时候控制阻塞线程的个数,超过这个限制后直接报错,而不是陷入等待呢?
druid其实支持这种策略的,在maxWaitThreadCount
属性为默认值(-1
)的情况下不启用,如果maxWaitThreadCount
配置大于0,表示启用,这是druid做的一种丢弃措施,如果你不希望在池子里的连接完全不够用导阻塞的业务线程过多,就可以考虑配置该项,这个属性的意思是说在连接不够用时最多让多少个业务线程发生阻塞,流程1.2
的图里没有体现这个开关的用途,可以在代码里查看,每次在pollLast
方法里陷入等待前会把属性notEmptyWaitThreadCount
进行累加,阻塞结束后会递减,由此可见notEmptyWaitThreadCount
就是表示当前等待可用连接时阻塞的业务线程的总个数,而getConnectionInternal
在每次调用pollLast
前都会判断这样一段代码:
代码块1 1 2 3 4 5 if (maxWaitThreadCount > 0 && notEmptyWaitThreadCount >= maxWaitThreadCount) { connectErrorCountUpdater.incrementAndGet(this ); throw new SQLException ("maxWaitThreadCount " + maxWaitThreadCount + ", current wait Thread count " + lock.getQueueLength()); }
可以看到,如果配置了maxWaitThreadCount
所限制的等待线程个数,那么会直接判断当前陷入等待的业务线程是否超过了maxWaitThreadCount
,一旦超过甚至不触发pollLast
的调用(防止新增等待线程),直接抛错。
一般情况下不需要启用该项,一定要启用建议考虑好maxWaitThreadCount
的取值,一般来说发生大量等待说明代码里存在不合理的地方:比如典型的连接池基本配置不合理,高qps的系统里maxActive
配置过小;比如借出去的连接没有及时close归还;比如存在慢查询或者慢事务导致连接借出时间过久。这些要比配置maxWaitThreadCount
更值得优先考虑,当然配置这个做一个极限保护也是没问题的,只是要结合实际情况考虑好取值。
五、流程1.3:连接可用性测试 ①init-checker 讲这块的东西之前,先来了解下如何初始化检测连接用的checker,整个流程参考下图:
ps:上述流程对应源代码位置:代码段2-1
中的initValidConnectionChecker
方法与MySqlValidConnectionChecker
构造器
初始化checker
发生在init阶段
(限于篇幅,没有在主流程2(init阶段)
里体现出来,只需要记住初始化checker
也是发生在init阶段
就好),druid支持多种数据库的连接源,所以checker针对不同的驱动程序都做了适配,所以才看到图中checker有不同的实现,我们根据加载到的驱动类名匹配不同的数据库checker,上图匹配至mysql的checker,checker的初始化里做了一件事情,就是判断驱动内是否有ping方法
(jdbc4
开始支持,mysql-connector-java
早在3.x
的版本就有ping方法
的实现了),如果有,则把usePingMethod
置为true
,用于后续启用checker时做判断用(下面会讲,这里置为true
,则通过反射
的方式调用驱动程序的ping方法
,如果为false
,则触发普通的SELECT 1
查询检测,SELECT 1
就是我们非常熟悉的那个东西啦,新建statement
,然后执行SELECT 1
,然后再判断连接是否可用)。
②testConnectionInternal 然后回到本节探讨的方法:流程1.3
对应的testConnectionInternal
上述流程对应源代码如下:
代码段1-4 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 protected boolean testConnectionInternal (DruidConnectionHolder holder, Connection conn) { String sqlFile = JdbcSqlStat.getContextSqlFile(); String sqlName = JdbcSqlStat.getContextSqlName(); if (sqlFile != null ) { JdbcSqlStat.setContextSqlFile(null ); } if (sqlName != null ) { JdbcSqlStat.setContextSqlName(null ); } try { if (validConnectionChecker != null ) { boolean valid = validConnectionChecker.isValidConnection(conn, validationQuery, validationQueryTimeout); long currentTimeMillis = System.currentTimeMillis(); if (holder != null ) { holder.lastValidTimeMillis = currentTimeMillis; } if (valid && isMySql) { long lastPacketReceivedTimeMs = MySqlUtils.getLastPacketReceivedTimeMs(conn); if (lastPacketReceivedTimeMs > 0 ) { long mysqlIdleMillis = currentTimeMillis - lastPacketReceivedTimeMs; if (lastPacketReceivedTimeMs > 0 && mysqlIdleMillis >= timeBetweenEvictionRunsMillis) { discardConnection(conn); String errorMsg = "discard long time none received connection. " + ", jdbcUrl : " + jdbcUrl + ", jdbcUrl : " + jdbcUrl + ", lastPacketReceivedIdleMillis : " + mysqlIdleMillis; LOG.error(errorMsg); return false ; } } } return valid; } if (conn.isClosed()) { return false ; } if (null == validationQuery) { return true ; } Statement stmt = null ; ResultSet rset = null ; try { stmt = conn.createStatement(); if (getValidationQueryTimeout() > 0 ) { stmt.setQueryTimeout(validationQueryTimeout); } rset = stmt.executeQuery(validationQuery); if (!rset.next()) { return false ; } } finally { JdbcUtils.close(rset); JdbcUtils.close(stmt); } return true ; } catch (Throwable ex) { return false ; } finally { if (sqlFile != null ) { JdbcSqlStat.setContextSqlFile(sqlFile); } if (sqlName != null ) { JdbcSqlStat.setContextSqlName(sqlName); } } } public boolean isValidConnection (Connection conn, String validateQuery, int validationQueryTimeout) throws Exception { if (conn.isClosed()) { return false ; } if (usePingMethod) { if (conn instanceof DruidPooledConnection) { conn = ((DruidPooledConnection) conn).getConnection(); } if (conn instanceof ConnectionProxy) { conn = ((ConnectionProxy) conn).getRawObject(); } if (clazz.isAssignableFrom(conn.getClass())) { if (validationQueryTimeout < 0 ) { validationQueryTimeout = DEFAULT_VALIDATION_QUERY_TIMEOUT; } try { ping.invoke(conn, true , validationQueryTimeout * 1000 ); } catch (InvocationTargetException e) { Throwable cause = e.getCause(); if (cause instanceof SQLException) { throw (SQLException) cause; } throw e; } return true ; } } String query = validateQuery; if (validateQuery == null || validateQuery.isEmpty()) { query = DEFAULT_VALIDATION_QUERY; } Statement stmt = null ; ResultSet rs = null ; try { stmt = conn.createStatement(); if (validationQueryTimeout > 0 ) { stmt.setQueryTimeout(validationQueryTimeout); } rs = stmt.executeQuery(query); return true ; } finally { JdbcUtils.close(rs); JdbcUtils.close(stmt); } }
这个方法会利用主流程2
(init阶段
)里初始化好的checker对象
(流程参考init-checker
)里的isValidConnection方法
,如果启用ping
,则该方法会利用invoke
触发驱动程序里的ping方法
,如果不启用ping,就采用SELECT 1
方式(从init-checker
里可以看出启不启用取决于加载到的驱动程序里是否存在相应的方法)。
六、流程1.4:抛弃连接
上述流程对应源代码如下:
代码段1-5 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public void discardConnection (Connection realConnection) { JdbcUtils.close(realConnection); lock.lock(); try { activeCount--; discardCount++; if (activeCount <= minIdle) { emptySignal(); } } finally { lock.unlock(); } }
经过流程1.3
返回的测试结果,如果发现连接不可用,则直接触发抛弃连接逻辑,这个过程非常简单,如上图所示,由流程1.2
获取到该连接时累加上去的activeCount
,在本流程里会再次减一,表示被取出来的连接不可用,并不能active状态。其次这里的close是拿着驱动那个连接对象进行close,正常情况下一个连接对象会被druid封装成DruidPooledConnection
对象,内部持有的conn
就是真正的驱动Connection
对象,上图中的关闭连接就是获取的该对象进行close
,如果使用包装类DruidPooledConnection
进行close,则代表回收连接对象(recycle
,参考主流程5
)。
七、主流程3:添加连接的守护线程
上述流程对应源代码如下:
代码段3-1 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 public class CreateConnectionThread extends Thread { public CreateConnectionThread (String name) { super (name); this .setDaemon(true ); } public void run () { initedLatch.countDown(); long lastDiscardCount = 0 ; int errorCount = 0 ; for (;;) { try { lock.lockInterruptibly(); } catch (InterruptedException e2) { break ; } long discardCount = DruidDataSource.this .discardCount; boolean discardChanged = discardCount - lastDiscardCount > 0 ; lastDiscardCount = discardCount; try { boolean emptyWait = true ; if (createError != null && poolingCount == 0 && !discardChanged) { emptyWait = false ; } if (emptyWait && asyncInit && createCount < initialSize) { emptyWait = false ; } if (emptyWait) { if (poolingCount >= notEmptyWaitThreadCount && (!(keepAlive && activeCount + poolingCount < minIdle)) && !isFailContinuous() ) { empty.await(); } if (activeCount + poolingCount >= maxActive) { empty.await(); continue ; } } } catch (InterruptedException e) { lastCreateError = e; lastErrorTimeMillis = System.currentTimeMillis(); if (!closing) { LOG.error("create connection Thread Interrupted, url: " + jdbcUrl, e); } break ; } finally { lock.unlock(); } PhysicalConnectionInfo connection = null ; try { connection = createPhysicalConnection(); } catch (SQLException e) { LOG.error("create connection SQLException, url: " + jdbcUrl + ", errorCode " + e.getErrorCode() + ", state " + e.getSQLState(), e); errorCount++; if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0 ) { setFailContinuous(true ); if (failFast) { lock.lock(); try { notEmpty.signalAll(); } finally { lock.unlock(); } } if (breakAfterAcquireFailure) { break ; } try { Thread.sleep(timeBetweenConnectErrorMillis); } catch (InterruptedException interruptEx) { break ; } } } catch (RuntimeException e) { LOG.error("create connection RuntimeException" , e); setFailContinuous(true ); continue ; } catch (Error e) { LOG.error("create connection Error" , e); setFailContinuous(true ); break ; } if (connection == null ) { continue ; } boolean result = put(connection); if (!result) { JdbcUtils.close(connection.getPhysicalConnection()); LOG.info("put physical connection to pool failed." ); } errorCount = 0 ; } } } protected boolean put (PhysicalConnectionInfo physicalConnectionInfo) { DruidConnectionHolder holder = null ; try { holder = new DruidConnectionHolder (DruidDataSource.this , physicalConnectionInfo); } catch (SQLException ex) { lock.lock(); try { if (createScheduler != null ) { createTaskCount--; } } finally { lock.unlock(); } LOG.error("create connection holder error" , ex); return false ; } return put(holder); } private boolean put (DruidConnectionHolder holder) { lock.lock(); try { if (poolingCount >= maxActive) { return false ; } connections[poolingCount] = holder; incrementPoolingCount(); if (poolingCount > poolingPeak) { poolingPeak = poolingCount; poolingPeakTime = System.currentTimeMillis(); } notEmpty.signal(); notEmptySignalCount++; if (createScheduler != null ) { createTaskCount--; if (poolingCount + createTaskCount < notEmptyWaitThreadCount && activeCount + poolingCount + createTaskCount < maxActive) { emptySignal(); } } } finally { lock.unlock(); } return true ; }
在主流程2
(init初始化阶段
)时就开启了该流程,该流程独立运行,大部分时间处于等待状态,不会抢占cpu,但是当连接不够用时,就会被唤起追加连接,成功创建连接后将会唤醒其他正在等待获取可用连接的线程,比如:
结合流程1.2
来看,当连接不够用时,会通过empty.signal
唤醒该线程进行补充连接(阻塞在empty
上的线程只有主流程3
的单线程),然后通过notEmpty
阻塞自己,当该线程补充连接成功后,又会对阻塞在notEmpty
上的线程进行唤醒,让其进入锁竞争状态,简单理解就是一个生产-消费模型。这里有一些细节,比如池子里的连接使用中(activeCount
)加上池子里剩余连接数(poolingCount
)就是指当前一共生成了多少个连接,这个数不能比maxActive
还大,如果比maxActive
还大,则再次陷入等待。而在往池子里put
连接时,则判断poolingCount
是否大于maxActive
来决定最终是否入池。
八、主流程4:抛弃连接的守护线程
上述流程对应源代码如下:
代码段4-1 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 public class DestroyConnectionThread extends Thread { public DestroyConnectionThread (String name) { super (name); this .setDaemon(true ); } public void run () { initedLatch.countDown(); for (;;) { try { if (closed) { break ; } if (timeBetweenEvictionRunsMillis > 0 ) { Thread.sleep(timeBetweenEvictionRunsMillis); } else { Thread.sleep(1000 ); } if (Thread.interrupted()) { break ; } destroyTask.run(); } catch (InterruptedException e) { break ; } } } } public class DestroyTask implements Runnable { public DestroyTask () { } @Override public void run () { shrink(true , keepAlive); if (isRemoveAbandoned()) { removeAbandoned(); } } }
流程4.1:连接池瘦身,检查连接是否可用以及丢弃多余连接 整个过程如下:
上述流程对应源代码如下:
代码段-4-2 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 public void shrink (boolean checkTime, boolean keepAlive) { try { lock.lockInterruptibly(); } catch (InterruptedException e) { return ; } int evictCount = 0 ; int keepAliveCount = 0 ; try { if (!inited) { return ; } final int checkCount = poolingCount - minIdle; final long currentTimeMillis = System.currentTimeMillis(); for (int i = 0 ; i < poolingCount; ++i) { DruidConnectionHolder connection = connections[i]; if (checkTime) { if (phyTimeoutMillis > 0 ) { long phyConnectTimeMillis = currentTimeMillis - connection.connectTimeMillis; if (phyConnectTimeMillis > phyTimeoutMillis) { evictConnections[evictCount++] = connection; continue ; } } long idleMillis = currentTimeMillis - connection.lastActiveTimeMillis; if (idleMillis < minEvictableIdleTimeMillis && idleMillis < keepAliveBetweenTimeMillis ) { break ; } if (idleMillis >= minEvictableIdleTimeMillis) { if (checkTime && i < checkCount) { evictConnections[evictCount++] = connection; continue ; } else if (idleMillis > maxEvictableIdleTimeMillis) { evictConnections[evictCount++] = connection; continue ; } } if (keepAlive && idleMillis >= keepAliveBetweenTimeMillis) { keepAliveConnections[keepAliveCount++] = connection; } } else { if (i < checkCount) { evictConnections[evictCount++] = connection; } else { break ; } } } int removeCount = evictCount + keepAliveCount; if (removeCount > 0 ) { System.arraycopy(connections, removeCount, connections, 0 , poolingCount - removeCount); Arrays.fill(connections, poolingCount - removeCount, poolingCount, null ); poolingCount -= removeCount; } keepAliveCheckCount += keepAliveCount; } finally { lock.unlock(); } if (evictCount > 0 ) { for (int i = 0 ; i < evictCount; ++i) { DruidConnectionHolder item = evictConnections[i]; Connection connection = item.getConnection(); JdbcUtils.close(connection); destroyCountUpdater.incrementAndGet(this ); } Arrays.fill(evictConnections, null ); } if (keepAliveCount > 0 ) { for (int i = keepAliveCount - 1 ; i >= 0 ; --i) { DruidConnectionHolder holer = keepAliveConnections[i]; Connection connection = holer.getConnection(); holer.incrementKeepAliveCheckCount(); boolean validate = false ; try { this .validateConnection(connection); validate = true ; } catch (Throwable error) { if (LOG.isDebugEnabled()) { LOG.debug("keepAliveErr" , error); } } boolean discard = !validate; if (validate) { holer.lastKeepTimeMillis = System.currentTimeMillis(); boolean putOk = put(holer); if (!putOk) { discard = true ; } } if (discard) { try { connection.close(); } catch (Exception e) { } lock.lock(); try { discardCount++; if (activeCount <= minIdle) { emptySignal(); } } finally { lock.unlock(); } } } this .getDataSourceStat().addKeepAliveCheckCount(keepAliveCount); Arrays.fill(keepAliveConnections, null ); } } private boolean put (DruidConnectionHolder holder) { lock.lock(); try { if (poolingCount >= maxActive) { return false ; } connections[poolingCount] = holder; incrementPoolingCount(); if (poolingCount > poolingPeak) { poolingPeak = poolingCount; poolingPeakTime = System.currentTimeMillis(); } notEmpty.signal(); notEmptySignalCount++; if (createScheduler != null ) { createTaskCount--; if (poolingCount + createTaskCount < notEmptyWaitThreadCount && activeCount + poolingCount + createTaskCount < maxActive) { emptySignal(); } } } finally { lock.unlock(); } return true ; }
整个流程分成图中主要的几步,首先利用poolingCount
减去minIdle
计算出需要做丢弃检查的连接对象区间,意味着这个区间的对象有被丢弃的可能,具体要不要放进丢弃队列evictConnections
,要判断两个属性:
minEvictableIdleTimeMillis
:最小检查间隙,缺省值30min
,官方解释:一个连接在池中最小生存的时间(结合检查区间来看,闲置时间超过这个时间,才会被丢弃)。
maxEvictableIdleTimeMillis
:最大检查间隙,缺省值7h
,官方解释:一个连接在池中最大生存的时间(无视检查区间,只要闲置时间超过这个时间,就一定会被丢弃)。
如果当前连接对象闲置时间超过minEvictableIdleTimeMillis
且下标在evictCheck
区间内,则加入丢弃队列evictConnections
,如果闲置时间超过maxEvictableIdleTimeMillis
,则直接放入evictConnections
(一般情况下会命中第一个判断条件,除非一个连接不在检查区间,且闲置时间超过maxEvictableIdleTimeMillis
)。
如果连接对象不在evictCheck
区间内,且keepAlive
属性为true
,则判断该对象闲置时间是否超出keepAliveBetweenTimeMillis
(缺省值60s
),若超出,则意味着该连接需要进行连接可用性检查,则将该对象放入keepAliveConnections
队列。
两个队列赋值完成后,则池子会进行一次压缩,没有涉及到的连接对象会被压缩到队首。
然后就是处理evictConnections
和keepAliveConnections
两个队列了,evictConnections
里的对象会被close最后释放掉,keepAliveConnections
里面的对象将会其进行检测(流程参考流程1.3
的isValidConnection
),碰到不可用的连接会调用discard
(流程1.4
)抛弃掉,可用的连接会再次被放进连接池。
整个流程可以看出,连接闲置后,也并非一下子就减少到minIdle
的,如果之前产生一堆的连接(不超过maxActive
),突然闲置了下来,则至少需要花minEvictableIdleTimeMillis
的时间才可以被移出连接池,如果一个连接闲置时间超过maxEvictableIdleTimeMillis
则必定被回收,所以极端情况下(比如一个连接池从初始化后就没有再被使用过),连接池里并不会一直保持minIdle
个连接,而是一个都没有,生产环境下这是非常不常见的,默认的maxEvictableIdleTimeMillis
都有7h
除非是极度冷门的系统才会出现这种情况,而开启keepAlive
也不会推翻这个规则,keepAlive
的优先级是低于maxEvictableIdleTimeMillis
的,keepAlive
只是保证了那些检查中不需要被移出连接池的连接在指定检测时间内去检测其连接活性,从而决定是否放入池子或者直接discard
。
流程4.2:主动回收连接,防止内存泄漏 过程如下:
上述流程对应源代码如下:
代码段4-3 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 public int removeAbandoned () { int removeCount = 0 ; long currrentNanos = System.nanoTime(); List abandonedList = new ArrayList (); activeConnectionLock.lock(); try { Iterator iter = activeConnections.keySet().iterator(); for (; iter.hasNext();) { DruidPooledConnection pooledConnection = iter.next(); if (pooledConnection.isRunning()) { continue ; } long timeMillis = (currrentNanos - pooledConnection.getConnectedTimeNano()) / (1000 * 1000 ); if (timeMillis >= removeAbandonedTimeoutMillis) { iter.remove(); pooledConnection.setTraceEnable(false ); abandonedList.add(pooledConnection); } } } finally { activeConnectionLock.unlock(); } if (abandonedList.size() > 0 ) { for (DruidPooledConnection pooledConnection : abandonedList) { final ReentrantLock lock = pooledConnection.lock; lock.lock(); try { if (pooledConnection.isDisable()) { continue ; } } finally { lock.unlock(); } JdbcUtils.close(pooledConnection); pooledConnection.abandond(); removeAbandonedCount++; removeCount++; if (isLogAbandoned()) { StringBuilder buf = new StringBuilder (); buf.append("abandon connection, owner thread: " ); buf.append(pooledConnection.getOwnerThread().getName()); buf.append(", connected at : " ); buf.append(pooledConnection.getConnectedTimeMillis()); buf.append(", open stackTrace\n" ); StackTraceElement[] trace = pooledConnection.getConnectStackTrace(); for (int i = 0 ; i < trace.length; i++) { buf.append("\tat " ); buf.append(trace[i].toString()); buf.append("\n" ); } buf.append("ownerThread current state is " + pooledConnection.getOwnerThread().getState() + ", current stackTrace\n" ); trace = pooledConnection.getOwnerThread().getStackTrace(); for (int i = 0 ; i < trace.length; i++) { buf.append("\tat " ); buf.append(trace[i].toString()); buf.append("\n" ); } LOG.error(buf.toString()); } } } return removeCount; }
这个流程在removeAbandoned
设置为true
的情况下才会触发,用于回收那些拿出去的使用长期未归还(归还:调用close
方法触发主流程5
)的连接。
先来看看activeConnections
是什么,activeConnections
用来保存当前从池子里被借出去的连接,这个可以通过主流程1
看出来,每次调用getConnection
时,如果开启removeAbandoned
,则会把连接对象放到activeConnections
,然后如果长期不调用close
,那么这个被借出去的连接将永远无法被重新放回池子,这是一件很麻烦的事情,这将存在内存泄漏的风险,因为不close,意味着池子会不断产生新的连接放进connections
,不符合连接池预期(连接池出发点是尽可能少的创建连接),然后之前被借出去的连接对象还有一直无法被回收的风险,存在内存泄漏的风险,因此为了解决这个问题,就有了这个流程,流程整体很简单,就是将现在借出去还没有归还的连接,做一次判断,符合条件的将会被放进abandonedList
进行连接回收(这个list里的连接对象里的abandoned
将会被置为true
,标记已被该流程处理过,防止主流程5
再次处理,具体可以参考代码段5-1
)。
这个如果在实践中能保证每次都可以正常close
,完全不用设置removeAbandoned=true
,目前如果使用了类似mybatis
、spring
等开源框架,框架内部是一定会close
的,所以此项是不建议设置的,视情况而定。
九、主流程5:回收连接 这个流程通常是靠连接包装类DruidPooledConnection
的close
方法触发的,目标方法为recycle
,流程图如下:
上述流程对应源代码如下:
代码段5-1 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 @Override public void close () throws SQLException { if (this .disable) { return ; } DruidConnectionHolder holder = this .holder; if (holder == null ) { if (dupCloseLogEnable) { LOG.error("dup close" ); } return ; } DruidAbstractDataSource dataSource = holder.getDataSource(); boolean isSameThread = this .getOwnerThread() == Thread.currentThread(); if (!isSameThread) { dataSource.setAsyncCloseConnectionEnable(true ); } if (dataSource.isAsyncCloseConnectionEnable()) { syncClose(); return ; } for (ConnectionEventListener listener : holder.getConnectionEventListeners()) { listener.connectionClosed(new ConnectionEvent (this )); } List filters = dataSource.getProxyFilters(); if (filters.size() > 0 ) { FilterChainImpl filterChain = new FilterChainImpl (dataSource); filterChain.dataSource_recycle(this ); } else { recycle(); } this .disable = true ; } public void syncClose () throws SQLException { lock.lock(); try { if (this .disable) { return ; } DruidConnectionHolder holder = this .holder; if (holder == null ) { if (dupCloseLogEnable) { LOG.error("dup close" ); } return ; } for (ConnectionEventListener listener : holder.getConnectionEventListeners()) { listener.connectionClosed(new ConnectionEvent (this )); } DruidAbstractDataSource dataSource = holder.getDataSource(); List filters = dataSource.getProxyFilters(); if (filters.size() > 0 ) { FilterChainImpl filterChain = new FilterChainImpl (dataSource); filterChain.dataSource_recycle(this ); } else { recycle(); } this .disable = true ; } finally { lock.unlock(); } } public void recycle () throws SQLException { if (this .disable) { return ; } DruidConnectionHolder holder = this .holder; if (holder == null ) { if (dupCloseLogEnable) { LOG.error("dup close" ); } return ; } if (!this .abandoned) { DruidAbstractDataSource dataSource = holder.getDataSource(); dataSource.recycle(this ); } this .holder = null ; conn = null ; transactionInfo = null ; closed = true ; } protected void recycle (DruidPooledConnection pooledConnection) throws SQLException { final DruidConnectionHolder holder = pooledConnection.holder; if (holder == null ) { LOG.warn("connectionHolder is null" ); return ; } if (logDifferentThread && (!isAsyncCloseConnectionEnable()) && pooledConnection.ownerThread != Thread.currentThread() ) { LOG.warn("get/close not same thread" ); } final Connection physicalConnection = holder.conn; if (pooledConnection.traceEnable) { Object oldInfo = null ; activeConnectionLock.lock(); try { if (pooledConnection.traceEnable) { oldInfo = activeConnections.remove(pooledConnection); pooledConnection.traceEnable = false ; } } finally { activeConnectionLock.unlock(); } if (oldInfo == null ) { if (LOG.isWarnEnabled()) { LOG.warn("remove abandonded failed. activeConnections.size " + activeConnections.size()); } } } final boolean isAutoCommit = holder.underlyingAutoCommit; final boolean isReadOnly = holder.underlyingReadOnly; final boolean testOnReturn = this .testOnReturn; try { if ((!isAutoCommit) && (!isReadOnly)) { pooledConnection.rollback(); } boolean isSameThread = pooledConnection.ownerThread == Thread.currentThread(); if (!isSameThread) { final ReentrantLock lock = pooledConnection.lock; lock.lock(); try { holder.reset(); } finally { lock.unlock(); } } else { holder.reset(); } if (holder.discard) { return ; } if (phyMaxUseCount > 0 && holder.useCount >= phyMaxUseCount) { discardConnection(holder.conn); return ; } if (physicalConnection.isClosed()) { lock.lock(); try { activeCount--; closeCount++; } finally { lock.unlock(); } return ; } if (testOnReturn) { boolean validate = testConnectionInternal(holder, physicalConnection); if (!validate) { JdbcUtils.close(physicalConnection); destroyCountUpdater.incrementAndGet(this ); lock.lock(); try { activeCount--; closeCount++; } finally { lock.unlock(); } return ; } } if (!enable) { discardConnection(holder.conn); return ; } boolean result; final long currentTimeMillis = System.currentTimeMillis(); if (phyTimeoutMillis > 0 ) { long phyConnectTimeMillis = currentTimeMillis - holder.connectTimeMillis; if (phyConnectTimeMillis > phyTimeoutMillis) { discardConnection(holder.conn); return ; } } lock.lock(); try { activeCount--; closeCount++; result = putLast(holder, currentTimeMillis); recycleCount++; } finally { lock.unlock(); } if (!result) { JdbcUtils.close(holder.conn); LOG.info("connection recyle failed." ); } } catch (Throwable e) { holder.clearStatementCache(); if (!holder.discard) { this .discardConnection(physicalConnection); holder.discard = true ; } LOG.error("recyle error" , e); recycleErrorCountUpdater.incrementAndGet(this ); } } boolean putLast (DruidConnectionHolder e, long lastActiveTimeMillis) { if (poolingCount >= maxActive) { return false ; } e.lastActiveTimeMillis = lastActiveTimeMillis; connections[poolingCount] = e; incrementPoolingCount(); if (poolingCount > poolingPeak) { poolingPeak = poolingCount; poolingPeakTime = lastActiveTimeMillis; } notEmpty.signal(); notEmptySignalCount++; return true ; }
这也是非常重要的一个流程,连接用完要归还,就是利用该流程完成归还的动作,利用druid对外包装的Connecion包装类DruidPooledConnection
的close
方法触发,该方法会通过自己内部的close
或者syncClose
方法来间接触发dataSource对象的recycle
方法,从而达到回收的目的。
最终的recycle
方法:
如果removeAbandoned
被设置为true
,则通过traceEnable
判断是否需要从activeConnections
移除该连接对象,防止流程4.2
再次检测到该连接对象,当然如果是流程4.2
主动触发的该流程,那么意味着流程4.2
里已经remove过该对象了,traceEnable
会被置为false
,本流程就不再触发remove了(这个流程都是在removeAbandoned=true
的情况下进行的,在主流程1
里连接被放进activeConnections
时traceEnable
被置为true
,而在removeAbandoned=false
的情况下traceEnable
恒等于false
)。
如果回收过程中发现存在有未处理完的事务,则触发回滚(比较有可能触发这一条的是流程4.2
里强制归还连接,也有可能是单纯使用连接,开启事务却没有提交事务就直接close
的情况),然后利用holder.reset
进行恢复连接对象里一些属性的默认值,除此之外,holder对象还会把由它产生的statement对象放到自己的一个arraylist里面,reset方法会循环着关闭内部未关闭的statement对象,最后清空list,当然,statement对象自己也会记录下其产生的所有的resultSet对象,然后关闭statement时同样也会循环关闭内部未关闭的resultSet对象,这是连接池做的一种保护措施,防止用户拿着连接对象做完一些操作没有对打开的资源关闭。
判断是否开启testOnReturn
,这个跟testOnBorrow
一样,官方默认不开启,也不建议开启,影响性能,理由参考主流程1
里针对testOnBorrow
的解释。
直接放回池子(当前connections
的尾部),然后需要注意的是putLast
方法和put
方法的不同之处,putLast
会把lastActiveTimeMillis
置为当前时间,也就是说不管一个连接被借出去过久,只要归还了,最后活跃时间就是当前时间,这就会有造成某种特殊异常情况的发生(非常极端,几乎不会触发,可以选择不看):
如果不开启testOnBorrow和testOnReturn,并且keepAlive设置为false,那么长连接可用测试的间隔依据就是利用当前时间减去上次活跃时间(lastActiveTimeMillis)得出闲置时间,然后再利用闲置时间跟timeBetweenEvictionRunsMillis(默认60s)进行对比,超过才进行长连接可用测试。
那么如果一个mysql服务端的长连接保活时间被人为调整为60s,然后timeBetweenEvictionRunsMillis被设置为59s,这个设置是非常合理的,保证了测试间隔小于长连接实际保活时间,然后如果这时一个连接被拿出去后一直过了61s才被close回收,该连接对象的lastActiveTimeMillis被刷为当前时间,如果在59s内再次拿到该连接对象,就会绕过连接检查直接报连接不可用的错误。
十、尾声 到这里针对druid连接池
的初始化以及其内部一个连接从生产
到消亡
的整个流程就已经整理完了,主要是列出其运行流程以及一些主要的监控数据都是如何产生的,没有涉及到的是一个sql的执行,因为这个基本上就跟使用原生驱动程序差不多,只是druid又包装了一层Statement等,用于完成一些自己的操作。
对于druid,处理连接只是很小的一块内容,却是很核心的一块内容。
Druid地址:https://github.com/alibaba/druid