池化技术(一)Druid是如何管理数据库连接的?

基于依赖程序的版本信息:

姊妹篇:HikariCP是如何管理数据库连接的

零、类图&流程预览

下方流程中涉及到的类、属性、方法名均列在这里:Druid-类图-属性表 ←该表格用来辅助理解下面的流程图和代码,不用细看,混乱时可用来理清关系。

本文会通过getConnection作为入口,探索在druid里,一个连接的生命周期。大体流程被划分成了以下几个主流程:

表1

一、主流程1:获取连接流程

首先从入口来看看它在获取连接时做了哪些操作:

主流程1

上述流程对应源代码如下:

代码段1-1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {
init(); //初始化,即主流程2

if (filters.size() > 0) {
FilterChainImpl filterChain = new FilterChainImpl(this); //责任链,内部也是触发下面的getConnectionDirect方法,只是要走一遍责任链上每个filter的逻辑,这里不做描述,后续放到流程1.1里体现
return filterChain.dataSource_connect(this, maxWaitMillis);
} else {
return getConnectionDirect(maxWaitMillis); //触发getConnectionDirect
}
}

public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException {
int notFullTimeoutRetryCnt = 0;
for (;;) { //死循环
/**
* 真正返回出去的连接对象,注意这里是被druid包装成了DruidPooledConnection类型,
* 实际上池子里存放的连接类型是DruidConnectionHolder,DruidPooledConnection类本身持有一个holder属性,
* 用于保存真正的连接对象,而DruidConnectionHolder才是真正保存驱动连接对象的类。
*/
DruidPooledConnection poolableConnection;
try {
poolableConnection = getConnectionInternal(maxWaitMillis); //从池子里获取连接,这一个后续放到流程1.2体现
} catch (GetConnectionTimeoutException ex) {
if (notFullTimeoutRetryCnt <= this.notFullTimeoutRetryCount && !isFull()) { //出现了超时异常,在连接池没满且重试次数未超过上限的情况下,重试一次(notFullTimeoutRetryCount默认是0,所以至少可以重试一次)。
notFullTimeoutRetryCnt++; //重试次数+1
if (LOG.isWarnEnabled()) {
LOG.warn("get connection timeout retry : " + notFullTimeoutRetryCnt);
}
continue;
}
throw ex; //超过重试次数或者池子已满仍然获取失败,则直接抛出异常
}

if (testOnBorrow) { //testOnBorrow开启时,每次都进行检测连接可用性
boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn);
if (!validate) {
if (LOG.isDebugEnabled()) {
LOG.debug("skip not validate connection.");
}

Connection realConnection = poolableConnection.conn; //获取真正驱动的连接对象
discardConnection(realConnection); //若连接不可用,则触发discard,这个方法具体放到流程1.4体现
continue;
}
} else {
Connection realConnection = poolableConnection.conn;
if (poolableConnection.conn.isClosed()) {
discardConnection(null); // 传入null,避免重复关闭
continue;
}

if (testWhileIdle) { //不启用testOnBorrow的情况下,才会判断是否启用testWhileIdle
final DruidConnectionHolder holder = poolableConnection.holder;
long currentTimeMillis = System.currentTimeMillis();
long lastActiveTimeMillis = holder.lastActiveTimeMillis; //上次被使用的时间
long lastKeepTimeMillis = holder.lastKeepTimeMillis;

if (lastKeepTimeMillis > lastActiveTimeMillis) {
lastActiveTimeMillis = lastKeepTimeMillis;
}

long idleMillis = currentTimeMillis - lastActiveTimeMillis; //计算出闲置时间

long timeBetweenEvictionRunsMillis = this.timeBetweenEvictionRunsMillis;

if (timeBetweenEvictionRunsMillis <= 0) {
timeBetweenEvictionRunsMillis = DEFAULT_TIME_BETWEEN_EVICTION_RUNS_MILLIS;
}

if (idleMillis >= timeBetweenEvictionRunsMillis || idleMillis < 0) { //当闲置时间超出timeBetweenEvictionRunsMillis(默认60s)时,则触发检查逻辑
boolean validate = testConnectionInternal(poolableConnection.holder, poolableConnection.conn);
if (!validate) {
if (LOG.isDebugEnabled()) {
LOG.debug("skip not validate connection.");
}

discardConnection(realConnection); //连接不可用,同样触发discard
continue;
}
}
}
}

if (removeAbandoned) { //若开启removeAbandoned,则把当前拿到的连接放到activeConnections里,方便后续检查(后面流程4.2体现)
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
poolableConnection.connectStackTrace = stackTrace;
poolableConnection.setConnectedTimeNano(); //设置连接获取时间为当前时间
poolableConnection.traceEnable = true; //这个设置为true,则在归还该连接时会在activeConnections里清除掉该连接对象

activeConnectionLock.lock();
try {
activeConnections.put(poolableConnection, PRESENT);
} finally {
activeConnectionLock.unlock();
}
}

if (!this.defaultAutoCommit) { //默认是不开事务的,所以这里是true,不会触发下面的逻辑;这个不建议手动设置默认值,一般开启事务的工作自己做或者交给第三方框架(如spring)做比较好
poolableConnection.setAutoCommit(false);
}

return poolableConnection; //最终返回可用连接
}
}

上述为获取连接时的流程图,首先会调用init进行连接池的初始化,然后运行责任链上的每一个filter,最终执行getConnectionDirect获取真正的连接对象,如果开启了testOnBorrow,则每次都会去测试连接是否可用

这也是官方不建议设置testOnBorrowtrue的原因,影响性能,这里的测试是指测试mysql服务端的长连接是否断开,一般mysql服务端长连保活时间是8h,被使用一次则刷新一次使用时间,若一个连接距离上次被使用超过了保活时间,那么再次使用时将无法与mysql服务端通信

如果testOnBorrow没有被置为true,则会进行testWhileIdle的检查(这一项官方建议设置为true,缺省值也是true),检查时会判断当前连接对象距离上次被使用的时间是否超过规定检查的时间,若超过,则进行检查一次,这个检查时间通过timeBetweenEvictionRunsMillis来控制,默认60s,每个连接对象会记录下上次被使用的时间,用当前时间减去上一次的使用时间得出闲置时间,闲置时间再跟timeBetweenEvictionRunsMillis比较,超过这个时间就做一次连接可用性检查,这个相比testOnBorrow每次都检查来说,性能会提升很多,用的时候无需关注该值,因为缺省值是true,经测试如果将该值设置为falsetestOnBorrow也设置为false,数据库服务端长连保活时间改为60s,60s内不使用连接,超过60s后使用将会报连接错误。若使用testConnectionInternal方法测试长连接结果为false,则证明该连接已被服务端断开或者有其他的网络原因导致该连接不可用,则会触发discardConnection进行连接回收(对应流程1.4,因为丢弃了一个连接,因此该方法会唤醒主流程3进行检查是否需要新建连接)。整个流程运行在一个死循环内,直到取到可用连接或者超过重试上限报错退出(在连接没有超过连接池上限的话,最多重试一次(重试次数默认重试1次,可以通过notFullTimeoutRetryCount属性来控制),所以取连接这里一旦发生等待,在连接池没有满的情况下,最大等待2 × maxWait的时间 ←这个有待验证)。

特别说明项

  1. 为了保证性能,不建议将testOnBorrow设置为true,或者说牵扯到长连接可用检测的那几项配置使用druid默认的配置就可以保证性能是最好的,如上所说,默认长连接检查是60s一次,所以不启用testOnBorrow的情况下要想保证万无一失,自己要确认下所连的那个mysql服务端的长连接保活时间(虽然默认是8h,但是dba可能给测试环境设置的时间远小于这个时间,所以如果这个时间小于60s,就需要手动设置timeBetweenEvictionRunsMillis了,如果mysql服务端长连接时间是8h或者更长,则用默认值即可。
  2. 为了防止不必要的扩容,在mysql服务端长连接够用的情况下,对于一些qps较高的服务、网关业务,建议把池子的最小闲置连接数minIdle和最大连接数maxActive设置成一样的,且按照需要调大,且开启keepAlive进行连接活性检查(参考流程4.1),这样就不会后期发生动态新建连接的情况(建连还是个比较重的操作,所以不如一开始就申请好所有需要的连接,个人意见,仅供参考),但是像管理后台这种,长期qps非常低,但是有的时候需要用管理后台做一些巨大的操作(比如导数据什么的)导致需要的连接暴增,且管理后台不会特别要求性能,就适合将minIdle的值设置的比maxActive小,这样不会造成不必要的连接浪费,也不会在需要暴增连接的时候无法动态扩增连接。

二、主流程2:初始化连接池

通过上面的流程图可以看到,在获取一个连接的时候首先会检查连接池是否已经初始化完毕(通过inited来控制,bool类型,未初始化为flase,初始化完毕为true,这个判断过程在init方法内完成),若没有初始化,则调用init进行初始化(图主流程1中的紫色部分),下面来看看init方法里又做了哪些操作:

主流程2

上述流程对应源代码如下:

代码段2-1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
public void init() throws SQLException {
if (inited) {
return; //如果已经被初始化过,则终止该方法
}

// bug fixed for dead lock, for issue #2980
DruidDriver.getInstance();

final ReentrantLock lock = this.lock; //获取重入锁
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
throw new SQLException("interrupt", e);
}

boolean init = false;
try {
if (inited) { //双重检查
return;
}

initStackTrace = Utils.toString(Thread.currentThread().getStackTrace());

this.id = DruidDriver.createDataSourceId(); //生成连接池id
if (this.id > 1) {
//生成其他对象的id,比如连接对象的id、statement对象的id
long delta = (this.id - 1) * 100000;
this.connectionIdSeedUpdater.addAndGet(this, delta);
this.statementIdSeedUpdater.addAndGet(this, delta);
this.resultSetIdSeedUpdater.addAndGet(this, delta);
this.transactionIdSeedUpdater.addAndGet(this, delta);
}

if (this.jdbcUrl != null) {
this.jdbcUrl = this.jdbcUrl.trim();
initFromWrapDriverUrl(); //jdbc url的头必须是jdbc:wrap-jdbc才会触发该方法里的逻辑(这个头貌似是oracle的?本篇文章仅针对mysql)
}

for (Filter filter : filters) {
filter.init(this); //通过池对象初始化filters(因为filter里面可能会用到一些池属性)
}

if (this.dbType == null || this.dbType.length() == 0) {
this.dbType = JdbcUtils.getDbType(jdbcUrl, null); //根据jdbc协议头分析出当前数据库的类型(本文默认mysql)
}

if (JdbcConstants.MYSQL.equals(this.dbType)
|| JdbcConstants.MARIADB.equals(this.dbType)
|| JdbcConstants.ALIYUN_ADS.equals(this.dbType)) {
boolean cacheServerConfigurationSet = false;
if (this.connectProperties.containsKey("cacheServerConfiguration")) {
cacheServerConfigurationSet = true;
} else if (this.jdbcUrl.indexOf("cacheServerConfiguration") != -1) {
cacheServerConfigurationSet = true;
}
if (cacheServerConfigurationSet) {
this.connectProperties.put("cacheServerConfiguration", "true");
}
}

//下面就是对设置的这些属性合理性的判断,不符合要求的将直接抛异常
if (maxActive <= 0) {
throw new IllegalArgumentException("illegal maxActive " + maxActive);
}

if (maxActive < minIdle) {
throw new IllegalArgumentException("illegal maxActive " + maxActive);
}

if (getInitialSize() > maxActive) {
throw new IllegalArgumentException("illegal initialSize " + this.initialSize + ", maxActive " + maxActive);
}

if (timeBetweenLogStatsMillis > 0 && useGlobalDataSourceStat) {
throw new IllegalArgumentException("timeBetweenLogStatsMillis not support useGlobalDataSourceStat=true");
}

if (maxEvictableIdleTimeMillis < minEvictableIdleTimeMillis) {
throw new SQLException("maxEvictableIdleTimeMillis must be grater than minEvictableIdleTimeMillis");
}

if (this.driverClass != null) {
this.driverClass = driverClass.trim();
}

//通过SPI机制加载责任链上需要执行的filter,方法详情在下面
initFromSPIServiceLoader();

//如果driver为空,加载驱动,最终将加载到的驱动注册到DriverManager上去
if (this.driver == null) {
if (this.driverClass == null || this.driverClass.isEmpty()) {
this.driverClass = JdbcUtils.getDriverClassName(this.jdbcUrl); //在driverClass不配置的情况下,druid会通过url来判定属于哪个driverClass
}

if (MockDriver.class.getName().equals(driverClass)) { //忽略
driver = MockDriver.instance;
} else {
if (jdbcUrl == null && (driverClass == null || driverClass.length() == 0)) {
throw new SQLException("url not set");
}
driver = JdbcUtils.createDriver(driverClassLoader, driverClass); //driverClass不为空的情况下直接触发驱动加载
}
} else { //除非手动设置驱动,否则不会走这里的逻辑
if (this.driverClass == null) {
this.driverClass = driver.getClass().getName();
}
}

initCheck(); //根据dbType的不同,来初始化一些标记字段(比如isMySql)

initExceptionSorter(); //异常处理器初始化
initValidConnectionChecker(); //初始化长连接检测时所需要用到的checker的适配类型,具体实现在下面
validationQueryCheck(); //简单的检测validationQuery参数是否填写了,若没填写会打印一个错误日志,不影响主流程

if (isUseGlobalDataSourceStat()) { //默认不开启,忽略
dataSourceStat = JdbcDataSourceStat.getGlobal();
if (dataSourceStat == null) {
dataSourceStat = new JdbcDataSourceStat("Global", "Global", this.dbType);
JdbcDataSourceStat.setGlobal(dataSourceStat);
}
if (dataSourceStat.getDbType() == null) {
dataSourceStat.setDbType(this.dbType);
}
} else {
dataSourceStat = new JdbcDataSourceStat(this.name, this.jdbcUrl, this.dbType, this.connectProperties);
}
dataSourceStat.setResetStatEnable(this.resetStatEnable);

//下面三个数组都跟池子本身有关系,所以容量为maxActive
connections = new DruidConnectionHolder[maxActive]; //初始化连接池本体
evictConnections = new DruidConnectionHolder[maxActive]; //初始化丢弃连接数组(流程4.1需要用到)
keepAliveConnections = new DruidConnectionHolder[maxActive]; //初始化需要检测可用性连接数组(流程4.1要用)

SQLException connectError = null;

if (createScheduler != null && asyncInit) { //另外一种通过线程池管理连接池的方式,默认不启用,忽略
for (int i = 0; i < initialSize; ++i) {
createTaskCount++;
CreateConnectionTask task = new CreateConnectionTask(true);
this.createSchedulerFuture = createScheduler.submit(task);
}
} else if (!asyncInit) {
// init connections
while (poolingCount < initialSize) { //当池子里的连接数少于需要初始化的个数时,则需要不断新增连接填充连接池,直到等于初始化连接数
try {
PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection(); //直接通过驱动程序创建连接对象,参考流程2.1
DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo); //拿着驱动连接包装成holder对象
connections[poolingCount++] = holder; //生成好的连接直接往后排
} catch (SQLException ex) {
LOG.error("init datasource error, url: " + this.getUrl(), ex);
if (initExceptionThrow) {
connectError = ex;
break;
} else {
Thread.sleep(3000); //异常报错后会休眠3s来进行下次的添加
}
}
}

if (poolingCount > 0) {
poolingPeak = poolingCount;
poolingPeakTime = System.currentTimeMillis();
}
}

createAndLogThread(); //开启打印log日志的守护线程
createAndStartCreatorThread(); //开启负责新增连接的守护线程(主流程3)
createAndStartDestroyThread(); //开启负责丢弃连接的守护线程(主流程4)

initedLatch.await(); //倒计数器,用来保证上面的主流程3和4两个守护线程全部开启完毕后才进行接下来的操作
init = true;

initedTime = new Date();
registerMbean();

if (connectError != null && poolingCount == 0) {
throw connectError;
}

if (keepAlive) {
// async fill to minIdle
if (createScheduler != null) { //默认不启用该模式,忽略
for (int i = 0; i < minIdle; ++i) {
createTaskCount++;
CreateConnectionTask task = new CreateConnectionTask(true);
this.createSchedulerFuture = createScheduler.submit(task);
}
} else {
this.emptySignal(); //keepAlive=true,主动唤起主流程3一次
}
}

} catch (SQLException e) {
LOG.error("{dataSource-" + this.getID() + "} init error", e);
throw e;
} catch (InterruptedException e) {
throw new SQLException(e.getMessage(), e);
} catch (RuntimeException e){
LOG.error("{dataSource-" + this.getID() + "} init error", e);
throw e;
} catch (Error e){
LOG.error("{dataSource-" + this.getID() + "} init error", e);
throw e;

} finally {
inited = true; //初始化完成后置为true
lock.unlock(); //释放锁

if (init && LOG.isInfoEnabled()) {
String msg = "{dataSource-" + this.getID();

if (this.name != null && !this.name.isEmpty()) {
msg += ",";
msg += this.name;
}

msg += "} inited";

LOG.info(msg);
}
}
}

private void initFromSPIServiceLoader() {
if (loadSpifilterSkip) { //默认不跳过SPI加载
return;
}

if (autoFilters == null) {
List filters = new ArrayList();
ServiceLoader autoFilterLoader = ServiceLoader.load(Filter.class); //加载Filter的实现

for (Filter filter : autoFilterLoader) {
AutoLoad autoLoad = filter.getClass().getAnnotation(AutoLoad.class);
if (autoLoad != null && autoLoad.value()) {
filters.add(filter);
}
}
autoFilters = filters;
}

for (Filter filter : autoFilters) {
if (LOG.isInfoEnabled()) {
LOG.info("load filter from spi :" + filter.getClass().getName());
}
addFilter(filter); //把通过SPI机制加载到的filter放到池子的filters里,用于后续责任链触发
}
}

private void initValidConnectionChecker() { //初始化checker
if (this.validConnectionChecker != null) {
return;
}

String realDriverClassName = driver.getClass().getName(); //根据驱动的class名称,来适配具体的checker实现
if (JdbcUtils.isMySqlDriver(realDriverClassName)) {
this.validConnectionChecker = new MySqlValidConnectionChecker(); //假设是mysql类型的驱动,那么适配到mysql的checker,MySqlValidConnectionChecker的构造器参考下面的方法

} else if (realDriverClassName.equals(JdbcConstants.ORACLE_DRIVER)
|| realDriverClassName.equals(JdbcConstants.ORACLE_DRIVER2)) {
this.validConnectionChecker = new OracleValidConnectionChecker();

} else if (realDriverClassName.equals(JdbcConstants.SQL_SERVER_DRIVER)
|| realDriverClassName.equals(JdbcConstants.SQL_SERVER_DRIVER_SQLJDBC4)
|| realDriverClassName.equals(JdbcConstants.SQL_SERVER_DRIVER_JTDS)) {
this.validConnectionChecker = new MSSQLValidConnectionChecker();

} else if (realDriverClassName.equals(JdbcConstants.POSTGRESQL_DRIVER)
|| realDriverClassName.equals(JdbcConstants.ENTERPRISEDB_DRIVER)) {
this.validConnectionChecker = new PGValidConnectionChecker();
}
}

//Mysql对应的checker构造器
public MySqlValidConnectionChecker(){
try {
clazz = Utils.loadClass("com.mysql.jdbc.MySQLConnection");
if (clazz == null) {
clazz = Utils.loadClass("com.mysql.cj.jdbc.ConnectionImpl");
}

if (clazz != null) {
//如果驱动程序本身有ping方法,则下面的usePingMethod设置为true,后续连接保活测试就会采用ping.invoke的方式触发。
ping = clazz.getMethod("pingInternal", boolean.class, int.class);
}

if (ping != null) {
usePingMethod = true;
}
} catch (Exception e) {
LOG.warn("Cannot resolve com.mysql.jdbc.Connection.ping method. Will use 'SELECT 1' instead.", e);
}

configFromProperties(System.getProperties());
}

可以看到,实例化的时候会初始化全局的重入锁lock,在初始化过程中包括后续的连接池操作都会利用该锁保证线程安全,初始化连接池的时候首先会进行双重检查是否已经初始化过,若没有,则进行连接池的初始化,这时候还会通过SPI机制额外加载责任链上的filter,但是这类filter需要在类上加上@AutoLoad注解。然后初始化了三个数组,容积都为maxActive,首先connections就是用来存放池子里连接对象的,evictConnections用来存放每次检查需要抛弃的连接(结合流程4.1理解),keepAliveConnections用于存放需要连接检查的存活连接(同样结合流程4.1理解),然后生成初始化数(initialSize)个连接,放进connections,然后生成两个必须的守护线程,用来添加连接进池以及从池子里摘除不需要的连接,这俩过程较复杂,因此拆出来单说(主流程3主流程4)。

特别说明项

  1. 从流程上看如果一开始实例化的时候不对连接池进行初始化(这个初始化是指对池子本身的初始化,并非单纯的指druid对象属性的初始化),那么在第一次调用getConnection时就会走上图那么多逻辑,尤其是耗时较久的建立连接操作,被重复执行了很多次,导致第一次getConnection时耗时过久,如果你的程序并发量很大,那么第一次获取连接时就会因为初始化流程而发生排队,所以建议在实例化连接池后对其进行预热,通过调用init方法或者getConnection方法都可以。

  2. 在构建全局重入锁的时候,利用lock对象生成了俩Condition,对这俩Condition解释如下:

    当连接池连接够用时,利用empty阻塞添加连接的守护线程(主流程3),当连接池连接不够用时,获取连接的那个线程(这里记为业务线程A)就会阻塞notEmpty上,且唤起阻塞在empty上的添加连接的守护线程,走完添加连接的流程,走完后会重新唤起阻塞在notEmpty上的业务线程A业务线程A就会继续尝试获取连接。

三、流程1.1:责任链

⚠️ 这块东西结合源码看更容易理解

流程1.1

上述流程对应源代码如下:

代码段-1-2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
//DruidDataSource类里的方法:获取连接
public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {
init();

if (filters.size() > 0) { //责任链上的filter存在
FilterChainImpl filterChain = new FilterChainImpl(this); //该类是执行整个责任链的执行者
return filterChain.dataSource_connect(this, maxWaitMillis); //每个需要执行责任链的方法,在filterChain里都可以找到映射方法,比如本方法getConnection,就对应filterChain.dataSource_connect(参考流程1.1)
} else {
return getConnectionDirect(maxWaitMillis);
}
}

//FilterChainImpl类里的方法:获取连接映射方法
@Override
public DruidPooledConnection dataSource_connect(DruidDataSource dataSource, long maxWaitMillis) throws SQLException {
if (this.pos < filterSize) {
//除了FilterChainImpl里面包含一些datasource的映射方法,需要执行的filter里面也包括,比如下面的dataSource_getConnection方法
DruidPooledConnection conn = nextFilter().dataSource_getConnection(this, dataSource, maxWaitMillis); //根据下标,获取下一个filter,触发目标方法
return conn;
}

return dataSource.getConnectionDirect(maxWaitMillis); //执行到最后一个filter时,触发datasource,返回真正的连接
}

//FilterChainImpl类里的方法:获取下一个需要执行的filter
private Filter nextFilter() {
return getFilters()
.get(pos++); //根据游标计算
}

//随便找了一个filter里的目标方法
//LogFilter类里的方法:dataSource_getConnection
@Override
public DruidPooledConnection dataSource_getConnection(FilterChain chain, DruidDataSource dataSource,
long maxWaitMillis) throws SQLException {
DruidPooledConnection conn = chain.dataSource_connect(dataSource, maxWaitMillis); //这里又会利用FilterChainImpl触发映射方法

//下面就是自己内部的一些特有逻辑,忽略
ConnectionProxy connection = (ConnectionProxy) conn.getConnectionHolder().getConnection();

if (connectionConnectAfterLogEnable && isConnectionLogEnabled()) {
connectionLog("{conn-" + connection.getId() + "} pool-connect");
}

return conn; //返回
}

这里对应流程1里获取连接时需要执行的责任链,每个DruidAbstractDataSource里都包含filters属性,filters是对Druid里Filters接口的实现,里面有很多对应着连接池里的映射方法,比如例子中dataSourcegetConnection方法在触发的时候就会利用FilterChain把每个filter里的dataSource_getConnection给执行一遍,这里也要说明下FilterChain,通过流程1.1可以看出来,datasource是利用FilterChain来触发各个filter的执行的,FilterChain里也有一堆datasource里的映射方法,比如上图里的dataSource_connect,这个方法会把datasource里的filters全部执行一遍直到nextFilter取不到值,才会触发dataSource.getConnectionDirect,这个结合代码会比较容易理解。

四、流程1.2:从池中获取连接的流程

流程1.2

上述流程对应源代码如下:

代码段1-3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
private DruidPooledConnection getConnectionInternal(long maxWait) throws SQLException {
//可用性判断
if (closed) {
connectErrorCountUpdater.incrementAndGet(this);
throw new DataSourceClosedException("dataSource already closed at " + new Date(closeTimeMillis));
}

if (!enable) {
connectErrorCountUpdater.incrementAndGet(this);
throw new DataSourceDisableException();
}

final long nanos = TimeUnit.MILLISECONDS.toNanos(maxWait); //纳秒
final int maxWaitThreadCount = this.maxWaitThreadCount; //目前因为拿不到连接而发生阻塞的业务线程数

DruidConnectionHolder holder;

for (boolean createDirect = false;;) {
if (createDirect) { //模式未启用,恒等false,下面的逻辑不会触发,所以为了方便阅读,隐藏这部分代码
//代码隐藏
}

try {
lock.lockInterruptibly(); //锁获取
} catch (InterruptedException e) {
connectErrorCountUpdater.incrementAndGet(this);
throw new SQLException("interrupt", e);
}

try {
if (maxWaitThreadCount > 0
&& notEmptyWaitThreadCount >= maxWaitThreadCount) { //如果因为拿不到连接而阻塞的业务线程数达到阈值,则直接抛异常
connectErrorCountUpdater.incrementAndGet(this);
throw new SQLException("maxWaitThreadCount " + maxWaitThreadCount + ", current wait Thread count "
+ lock.getQueueLength());
}

if (onFatalError
&& onFatalErrorMaxActive > 0
&& activeCount >= onFatalErrorMaxActive) {
connectErrorCountUpdater.incrementAndGet(this);

StringBuilder errorMsg = new StringBuilder();
errorMsg.append("onFatalError, activeCount ")
.append(activeCount)
.append(", onFatalErrorMaxActive ")
.append(onFatalErrorMaxActive);

if (lastFatalErrorTimeMillis > 0) {
errorMsg.append(", time '")
.append(StringUtils.formatDateTime19(
lastFatalErrorTimeMillis, TimeZone.getDefault()))
.append("'");
}

if (lastFatalErrorSql != null) {
errorMsg.append(", sql \n")
.append(lastFatalErrorSql);
}

throw new SQLException(
errorMsg.toString(), lastFatalError);
}

connectCount++; //连接数累加

if (createScheduler != null
&& poolingCount == 0
&& activeCount < maxActive
&& creatingCountUpdater.get(this) == 0
&& createScheduler instanceof ScheduledThreadPoolExecutor) {
ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) createScheduler;
if (executor.getQueue().size() > 0) {
createDirect = true; //createScheduler这种异步添加模式不开启(默认不开启,本文也不是基于该模式的),createDirect永远不等于true,所以上面createDirect==true的代码不会被触发
continue;
}
}

if (maxWait > 0) {
holder = pollLast(nanos); //尝试从池子里获取连接
} else {
holder = takeLast();
}

if (holder != null) {
activeCount++; //拿到连接,activeCount累加
if (activeCount > activePeak) {
activePeak = activeCount;
activePeakTime = System.currentTimeMillis();
}
}
} catch (InterruptedException e) {
connectErrorCountUpdater.incrementAndGet(this);
throw new SQLException(e.getMessage(), e);
} catch (SQLException e) {
connectErrorCountUpdater.incrementAndGet(this);
throw e;
} finally {
lock.unlock();
}

break;
}

if (holder == null) { //没有获取到连接,整理错误信息,抛出错误
long waitNanos = waitNanosLocal.get();

StringBuilder buf = new StringBuilder(128);
buf.append("wait millis ")//
.append(waitNanos / (1000 * 1000))//
.append(", active ").append(activeCount)//
.append(", maxActive ").append(maxActive)//
.append(", creating ").append(creatingCount)//
;
if (creatingCount > 0 && createStartNanos > 0) {
long createElapseMillis = (System.nanoTime() - createStartNanos) / (1000 * 1000);
if (createElapseMillis > 0) {
buf.append(", createElapseMillis ").append(createElapseMillis);
}
}

if (createErrorCount > 0) {
buf.append(", createErrorCount ").append(createErrorCount);
}

List sqlList = this.getDataSourceStat().getRuningSqlList();
for (int i = 0; i < sqlList.size(); ++i) {
if (i != 0) {
buf.append('\n');
} else {
buf.append(", ");
}
JdbcSqlStatValue sql = sqlList.get(i);
buf.append("runningSqlCount ").append(sql.getRunningCount());
buf.append(" : ");
buf.append(sql.getSql());
}

String errorMessage = buf.toString();

if (this.createError != null) {
throw new GetConnectionTimeoutException(errorMessage, createError);
} else {
throw new GetConnectionTimeoutException(errorMessage);
}
}

holder.incrementUseCount();

DruidPooledConnection poolalbeConnection = new DruidPooledConnection(holder); //包装成目标对象
return poolalbeConnection; //返回
}

//尝试从池子里获取连接
private DruidConnectionHolder pollLast(long nanos) throws InterruptedException, SQLException {
long estimate = nanos;

for (;;) {
if (poolingCount == 0) { //池子里的空闲连接为0,说明需要通知主流程3新增连接了
emptySignal(); // empty.signal,唤起主流程3新增连接

if (failFast && isFailContinuous()) { //如果置为快速结束,则不阻塞业务线程,直接抛出异常
throw new DataSourceNotAvailableException(createError);
}

if (estimate <= 0) {
waitNanosLocal.set(nanos - estimate);
return null;
}

notEmptyWaitThreadCount++; //因为获取不到连接而陷入阻塞状态的业务线程数+1
if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) {
notEmptyWaitThreadPeak = notEmptyWaitThreadCount;
}

try {
long startEstimate = estimate;
estimate = notEmpty.awaitNanos(estimate); // 阻塞(挂起)estimate这么长的世界,期间如果被唤醒,则estimate就会被刷新成剩余等待时间
// recycle or
// creator
notEmptyWaitCount++;
notEmptyWaitNanos += (startEstimate - estimate);

if (!enable) {
connectErrorCountUpdater.incrementAndGet(this);
throw new DataSourceDisableException();
}
} catch (InterruptedException ie) {
notEmpty.signal(); // 期间线程被中断,则唤起一次其他处于阻塞状态的业务线程
notEmptySignalCount++;
throw ie;
} finally {
notEmptyWaitThreadCount--;
}

if (poolingCount == 0) { //依然没有竞争到
if (estimate > 0) { //如果目标阻塞时间(maxWait)还没有用完,则继续尝试获取
continue;
}

waitNanosLocal.set(nanos - estimate);
return null;
}
}

decrementPoolingCount(); //poolingCount--
DruidConnectionHolder last = connections[poolingCount]; //直接获取
connections[poolingCount] = null; //获取后意味着连接已被借出,原有位置置空

long waitNanos = nanos - estimate; //标记这次获取连接花了多长时间,连接够用时便为0
last.setLastNotEmptyWaitNanos(waitNanos);

return last; //返回
}
}

通过getConnectionInternal方法从池子里获取真正的连接对象,druid支持两种方式新增连接,一种是通过开启不同的守护线程通过awaitsignal通信实现(本文启用的方式,也是默认的方式),另一种是直接通过线程池异步新增,这个方式通过在初始化druid时传入asyncInit=true,再把一个线程池对象赋值给createScheduler,就成功启用了这种模式,没仔细研究这种方式,所以本文的流程图和代码块都会规避这个模式。

上面的流程很简单,连接足够时就直接poolingCount-1,数组取值,返回,activeCount+1,整体复杂度为O(1),关键还是看取不到连接时的做法,取不到连接时,druid会先唤起新增连接的守护线程新增连接,然后陷入等待状态,然后唤醒该等待的点有两处,一个是用完了连接recycle(主流程5)进池子后触发,另外一个就是新增连接的守护线程成功新增了一个连接后触发,await被唤起后继续加入锁竞争,然后往下走如果发现池子里的连接数仍然是0(说明在唤醒后参与锁竞争里刚被放进来的连接又被别的线程拿去了),则继续下一次的await,这里采用的是awaitNanos方法,初始值是maxWait,然后下次被刷新后就是maxWait减去上次阻塞花费的实际时间,每次await的时间会逐步减少,直到归零,整体时间是约等于maxWait的,但实际比maxActive要大,因为程序本身存在耗时以及被唤醒后又要参与锁竞争导致也存在一定的耗时。

如果最终都没办法拿到连接则返回null出去,紧接着触发主流程1中的重试逻辑。

druid如何防止在获取不到连接时阻塞过多的业务线程?

通过上面的流程图和流程描述,如果非常极端的情况,池子里的连接完全不够用时,会阻塞过多的业务线程,甚至会阻塞超过maxWait这么久,有没有一种措施是可以在连接不够用的时候控制阻塞线程的个数,超过这个限制后直接报错,而不是陷入等待呢?

druid其实支持这种策略的,在maxWaitThreadCount属性为默认值(-1)的情况下不启用,如果maxWaitThreadCount配置大于0,表示启用,这是druid做的一种丢弃措施,如果你不希望在池子里的连接完全不够用导阻塞的业务线程过多,就可以考虑配置该项,这个属性的意思是说在连接不够用时最多让多少个业务线程发生阻塞,流程1.2的图里没有体现这个开关的用途,可以在代码里查看,每次在pollLast方法里陷入等待前会把属性notEmptyWaitThreadCount进行累加,阻塞结束后会递减,由此可见notEmptyWaitThreadCount就是表示当前等待可用连接时阻塞的业务线程的总个数,而getConnectionInternal在每次调用pollLast前都会判断这样一段代码:

代码块1
1
2
3
4
5
if (maxWaitThreadCount > 0 && notEmptyWaitThreadCount >= maxWaitThreadCount) {
connectErrorCountUpdater.incrementAndGet(this);
throw new SQLException("maxWaitThreadCount " + maxWaitThreadCount + ", current wait Thread count "
+ lock.getQueueLength()); //直接抛异常,而不是陷入等待状态阻塞业务线程
}

可以看到,如果配置了maxWaitThreadCount所限制的等待线程个数,那么会直接判断当前陷入等待的业务线程是否超过了maxWaitThreadCount,一旦超过甚至不触发pollLast的调用(防止新增等待线程),直接抛错。

一般情况下不需要启用该项,一定要启用建议考虑好maxWaitThreadCount的取值,一般来说发生大量等待说明代码里存在不合理的地方:比如典型的连接池基本配置不合理,高qps的系统里maxActive配置过小;比如借出去的连接没有及时close归还;比如存在慢查询或者慢事务导致连接借出时间过久。这些要比配置maxWaitThreadCount更值得优先考虑,当然配置这个做一个极限保护也是没问题的,只是要结合实际情况考虑好取值。

五、流程1.3:连接可用性测试

①init-checker

讲这块的东西之前,先来了解下如何初始化检测连接用的checker,整个流程参考下图:

init-checker流程图

ps:上述流程对应源代码位置:代码段2-1中的initValidConnectionChecker方法与MySqlValidConnectionChecker构造器

初始化checker发生在init阶段(限于篇幅,没有在主流程2(init阶段)里体现出来,只需要记住初始化checker也是发生在init阶段就好),druid支持多种数据库的连接源,所以checker针对不同的驱动程序都做了适配,所以才看到图中checker有不同的实现,我们根据加载到的驱动类名匹配不同的数据库checker,上图匹配至mysql的checker,checker的初始化里做了一件事情,就是判断驱动内是否有ping方法jdbc4开始支持,mysql-connector-java早在3.x的版本就有ping方法的实现了),如果有,则把usePingMethod置为true,用于后续启用checker时做判断用(下面会讲,这里置为true,则通过反射的方式调用驱动程序的ping方法,如果为false,则触发普通的SELECT 1查询检测,SELECT 1就是我们非常熟悉的那个东西啦,新建statement,然后执行SELECT 1,然后再判断连接是否可用)。

②testConnectionInternal

然后回到本节探讨的方法:流程1.3对应的testConnectionInternal

流程1.3

上述流程对应源代码如下:

代码段1-4
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
//数据库连接可用性测试
protected boolean testConnectionInternal(DruidConnectionHolder holder, Connection conn) {
String sqlFile = JdbcSqlStat.getContextSqlFile();
String sqlName = JdbcSqlStat.getContextSqlName();

if (sqlFile != null) {
JdbcSqlStat.setContextSqlFile(null);
}
if (sqlName != null) {
JdbcSqlStat.setContextSqlName(null);
}
try {
if (validConnectionChecker != null) { //checker不为空
//checker是init(主流程2)里通过驱动进行适配的检测者,因为本篇文章基于mysql,所以假设这里适配到的checker是MySqlValidConnectionChecker类型的
boolean valid = validConnectionChecker.isValidConnection(conn, validationQuery, validationQueryTimeout);
long currentTimeMillis = System.currentTimeMillis();
if (holder != null) {
holder.lastValidTimeMillis = currentTimeMillis;
}

if (valid && isMySql) {
//这里在现有驱动版本的情况下拿到的lastPacketReceivedTimeMs始终小于0,因为找不到com.mysql.jdbc.MySQLConnection
long lastPacketReceivedTimeMs = MySqlUtils.getLastPacketReceivedTimeMs(conn);
if (lastPacketReceivedTimeMs > 0) {
long mysqlIdleMillis = currentTimeMillis - lastPacketReceivedTimeMs;
if (lastPacketReceivedTimeMs > 0 //
&& mysqlIdleMillis >= timeBetweenEvictionRunsMillis) {
discardConnection(conn);
String errorMsg = "discard long time none received connection. "
+ ", jdbcUrl : " + jdbcUrl
+ ", jdbcUrl : " + jdbcUrl
+ ", lastPacketReceivedIdleMillis : " + mysqlIdleMillis;
LOG.error(errorMsg);
return false;
}
}
}

return valid; //返回验证结果
}

if (conn.isClosed()) {
return false;
}

//checker为空时,就直接利用validationQuery进行常规测试
if (null == validationQuery) {
return true; //validationQuery为空就单纯返回true
}

Statement stmt = null;
ResultSet rset = null;
try {
stmt = conn.createStatement();
if (getValidationQueryTimeout() > 0) {
stmt.setQueryTimeout(validationQueryTimeout);
}
rset = stmt.executeQuery(validationQuery);
if (!rset.next()) {
return false; //执行检测语句失败,返回false
}
} finally {
//关闭资源
JdbcUtils.close(rset);
JdbcUtils.close(stmt);
}

return true; //验证通过返回true
} catch (Throwable ex) {
// skip
return false;
} finally {
if (sqlFile != null) {
JdbcSqlStat.setContextSqlFile(sqlFile);
}
if (sqlName != null) {
JdbcSqlStat.setContextSqlName(sqlName);
}
}
}


//MySqlValidConnectionChecker类里的验证方法
public boolean isValidConnection(Connection conn, String validateQuery, int validationQueryTimeout) throws Exception {
if (conn.isClosed()) {
return false;
}

if (usePingMethod) { //是否启用ping方法(如果驱动程序有该方法,则这里为true,一般情况下都是true)
if (conn instanceof DruidPooledConnection) {
conn = ((DruidPooledConnection) conn).getConnection();
}

if (conn instanceof ConnectionProxy) {
conn = ((ConnectionProxy) conn).getRawObject();
}

if (clazz.isAssignableFrom(conn.getClass())) {
if (validationQueryTimeout < 0) {
validationQueryTimeout = DEFAULT_VALIDATION_QUERY_TIMEOUT;
}

try {
//ping对象是初始化时拿到驱动程序的一个Method对象,这里通过invoke触发调用
ping.invoke(conn, true, validationQueryTimeout * 1000);
} catch (InvocationTargetException e) {
Throwable cause = e.getCause();
if (cause instanceof SQLException) {
throw (SQLException) cause;
}
throw e; //ping出错抛异常
}
return true; //通过则返回true
}
}

//如果不支持ping方式检测,则触发SELECT 1的方式进行检测(一般情况下不会触发,都是上面ping方式)
String query = validateQuery;
if (validateQuery == null || validateQuery.isEmpty()) {
query = DEFAULT_VALIDATION_QUERY;
}

Statement stmt = null;
ResultSet rs = null;
try {
stmt = conn.createStatement();
if (validationQueryTimeout > 0) {
stmt.setQueryTimeout(validationQueryTimeout);
}
rs = stmt.executeQuery(query);
return true;
} finally {
JdbcUtils.close(rs);
JdbcUtils.close(stmt);
}

}

这个方法会利用主流程2init阶段)里初始化好的checker对象(流程参考init-checker)里的isValidConnection方法,如果启用ping,则该方法会利用invoke触发驱动程序里的ping方法,如果不启用ping,就采用SELECT 1方式(从init-checker里可以看出启不启用取决于加载到的驱动程序里是否存在相应的方法)。

六、流程1.4:抛弃连接

流程1.4

上述流程对应源代码如下:

代码段1-5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//丢弃连接
public void discardConnection(Connection realConnection) {
JdbcUtils.close(realConnection); //close掉真正的连接对象,一般调用该方法传入的connection对象都是最原始的驱动连接对象,所以这里并不会触发recycle

lock.lock();
try {
activeCount--; //活跃连接数-1
discardCount++; //丢弃连接数+1

if (activeCount <= minIdle) {
emptySignal(); //唤起一次主流程3新增连接
}
} finally {
lock.unlock();
}
}

经过流程1.3返回的测试结果,如果发现连接不可用,则直接触发抛弃连接逻辑,这个过程非常简单,如上图所示,由流程1.2获取到该连接时累加上去的activeCount,在本流程里会再次减一,表示被取出来的连接不可用,并不能active状态。其次这里的close是拿着驱动那个连接对象进行close,正常情况下一个连接对象会被druid封装成DruidPooledConnection对象,内部持有的conn就是真正的驱动Connection对象,上图中的关闭连接就是获取的该对象进行close,如果使用包装类DruidPooledConnection进行close,则代表回收连接对象(recycle,参考主流程5)。

七、主流程3:添加连接的守护线程

主流程3

上述流程对应源代码如下:

代码段3-1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
//DruidDataSource的内部类,对应主流程3,用来补充连接
public class CreateConnectionThread extends Thread {

public CreateConnectionThread(String name){
super(name); //重置线程名称
this.setDaemon(true); //标记为守护线程
}

//run方法
public void run() {
initedLatch.countDown(); //通知init(主流程2)自己已经启动成功

long lastDiscardCount = 0;
int errorCount = 0;
for (;;) { //死循环
// addLast
try {
lock.lockInterruptibly(); //锁获取
} catch (InterruptedException e2) {
break;
}

long discardCount = DruidDataSource.this.discardCount;
//当前丢弃连接数与最后一次丢弃连接数的差值大于0,说明又发生了丢弃连接的现象,该条件会促进连接的创建
boolean discardChanged = discardCount - lastDiscardCount > 0;
lastDiscardCount = discardCount;

try {
boolean emptyWait = true;

if (createError != null
&& poolingCount == 0
&& !discardChanged) {
emptyWait = false;
}

if (emptyWait
&& asyncInit && createCount < initialSize) {
emptyWait = false;
}

if (emptyWait) {
// 必须存在线程等待,才创建连接,否则不创建
if (poolingCount >= notEmptyWaitThreadCount
&& (!(keepAlive && activeCount + poolingCount < minIdle))
&& !isFailContinuous()
) {
empty.await(); //不需要创建连接时,阻塞(挂起)
}

// 防止创建超过maxActive数量的连接
if (activeCount + poolingCount >= maxActive) {
empty.await(); //超出限制依然挂起,不再新增连接
continue;
}
}

} catch (InterruptedException e) {
lastCreateError = e;
lastErrorTimeMillis = System.currentTimeMillis();

if (!closing) {
LOG.error("create connection Thread Interrupted, url: " + jdbcUrl, e);
}
break;
} finally {
lock.unlock(); //锁释放
}

//从上面的程序走到这里,说明该线程被成功唤起,则进行新建连接
PhysicalConnectionInfo connection = null;

try {
connection = createPhysicalConnection(); //利用驱动程序新建物理连接
} catch (SQLException e) {
LOG.error("create connection SQLException, url: " + jdbcUrl + ", errorCode " + e.getErrorCode()
+ ", state " + e.getSQLState(), e);

errorCount++;
if (errorCount > connectionErrorRetryAttempts && timeBetweenConnectErrorMillis > 0) {
// fail over retry attempts
setFailContinuous(true);
if (failFast) {
lock.lock();
try {
notEmpty.signalAll();
} finally {
lock.unlock();
}
}

if (breakAfterAcquireFailure) {
break;
}

try {
Thread.sleep(timeBetweenConnectErrorMillis);
} catch (InterruptedException interruptEx) {
break;
}
}
} catch (RuntimeException e) {
LOG.error("create connection RuntimeException", e);
setFailContinuous(true);
continue;
} catch (Error e) {
LOG.error("create connection Error", e);
setFailContinuous(true);
break;
}

if (connection == null) {
continue; //新建失败后再次尝试
}

boolean result = put(connection); //尝试放入池子
if (!result) {
JdbcUtils.close(connection.getPhysicalConnection());
LOG.info("put physical connection to pool failed.");
}

errorCount = 0; // reset errorCount
}
}
}



//这一个put方法是上面触发接收PhysicalConnectionInfo类型连接用的,之前说过,最终保存在池子里的连接对象都是DruidConnectionHolder类型,所以这里时进行一次包装,然后真正put进去的是更下面的put方法
protected boolean put(PhysicalConnectionInfo physicalConnectionInfo) {
DruidConnectionHolder holder = null;
try {
//包装成holder类型
holder = new DruidConnectionHolder(DruidDataSource.this, physicalConnectionInfo);
} catch (SQLException ex) {
lock.lock();
try {
if (createScheduler != null) {
createTaskCount--;
}
} finally {
lock.unlock();
}
LOG.error("create connection holder error", ex);
return false;
}

return put(holder); //真正放入池子
}


//真正将连接对象放入池子
private boolean put(DruidConnectionHolder holder) {
lock.lock();
try {
if (poolingCount >= maxActive) {
return false; //如果此时发现当前池子里的闲置连接数已经超过了maxActive,那么就不再往里面加了
}
connections[poolingCount] = holder; //加在数组尾部
incrementPoolingCount(); //poolingCount++

if (poolingCount > poolingPeak) {
poolingPeak = poolingCount;
poolingPeakTime = System.currentTimeMillis();
}

notEmpty.signal(); //唤起一个因为拿不到连接对象而发生阻塞的业务线程,让其再次进入运行状态,进行获取连接竞争
notEmptySignalCount++;

if (createScheduler != null) { //模式未启用
createTaskCount--;

if (poolingCount + createTaskCount < notEmptyWaitThreadCount //
&& activeCount + poolingCount + createTaskCount < maxActive) {
emptySignal();
}
}
} finally {
lock.unlock();
}
return true;
}

主流程2init初始化阶段)时就开启了该流程,该流程独立运行,大部分时间处于等待状态,不会抢占cpu,但是当连接不够用时,就会被唤起追加连接,成功创建连接后将会唤醒其他正在等待获取可用连接的线程,比如:

结合流程1.2来看,当连接不够用时,会通过empty.signal唤醒该线程进行补充连接(阻塞在empty上的线程只有主流程3的单线程),然后通过notEmpty阻塞自己,当该线程补充连接成功后,又会对阻塞在notEmpty上的线程进行唤醒,让其进入锁竞争状态,简单理解就是一个生产-消费模型。这里有一些细节,比如池子里的连接使用中(activeCount)加上池子里剩余连接数(poolingCount)就是指当前一共生成了多少个连接,这个数不能比maxActive还大,如果比maxActive还大,则再次陷入等待。而在往池子里put连接时,则判断poolingCount是否大于maxActive来决定最终是否入池。

八、主流程4:抛弃连接的守护线程

主流程4

上述流程对应源代码如下:

代码段4-1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
//连接池瘦身,参考主流程4
public class DestroyConnectionThread extends Thread {

public DestroyConnectionThread(String name){
super(name); //给线程重命名
this.setDaemon(true); //标记为守护线程
}

//run方法
public void run() {
initedLatch.countDown(); //通知init(主流程2)自己已经启动成功

for (;;) { //死循环
// 从前面开始删除
try {
if (closed) {
break;
}

if (timeBetweenEvictionRunsMillis > 0) { //检查时间间隔,不启用(小于等于0时)则默认1s,事实上,druid对于该参数的缺省值是60s
Thread.sleep(timeBetweenEvictionRunsMillis);
} else {
Thread.sleep(1000); //默认1s
}

if (Thread.interrupted()) {
break;
}

destroyTask.run(); //启动destroy的run方法(在下方)
} catch (InterruptedException e) {
break;
}
}
}

}

//DruidDataSource内部类
public class DestroyTask implements Runnable {
public DestroyTask() {

}

@Override
public void run() {
shrink(true, keepAlive); //连接池的检查&瘦身

if (isRemoveAbandoned()) { //如果开启该属性,则进行强制回收检查
removeAbandoned();
}
}

}

流程4.1:连接池瘦身,检查连接是否可用以及丢弃多余连接

整个过程如下:

流程4.1

上述流程对应源代码如下:

代码段-4-2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
//连接池瘦身
public void shrink(boolean checkTime, boolean keepAlive) {
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
return;
}

int evictCount = 0;
int keepAliveCount = 0;
try {
if (!inited) {
return;
}

final int checkCount = poolingCount - minIdle; //根据poolingCount和minIdle计算出evictCheck的范围
final long currentTimeMillis = System.currentTimeMillis();
for (int i = 0; i < poolingCount; ++i) { //开始遍历连接池里闲置的连接
DruidConnectionHolder connection = connections[i];

if (checkTime) { //除非手动调用,不然经过主流程4触发,一般为true
if (phyTimeoutMillis > 0) { //默认不启用,忽略
long phyConnectTimeMillis = currentTimeMillis - connection.connectTimeMillis;
if (phyConnectTimeMillis > phyTimeoutMillis) {
evictConnections[evictCount++] = connection;
continue;
}
}

//计算闲置时间
long idleMillis = currentTimeMillis - connection.lastActiveTimeMillis;

if (idleMillis < minEvictableIdleTimeMillis
&& idleMillis < keepAliveBetweenTimeMillis
) { //如果闲置时间达不到检测&瘦身的阈值,则不处理
break;
}

if (idleMillis >= minEvictableIdleTimeMillis) {
if (checkTime && i < checkCount) { //达到需要丢弃的阈值时,则判断连接下标是否在evictCheck范围,若在,则视为“可以丢弃的对象”放入evictConnections数组
evictConnections[evictCount++] = connection;
continue;
} else if (idleMillis > maxEvictableIdleTimeMillis) { //达到必须要丢弃的阈值时,则不管是不是在evictCheck范围内,都直接放入“可以丢弃的对象”的evictConnections数组
evictConnections[evictCount++] = connection;
continue;
}
}

//如果上面的条件均没有命中,如果keepAlive为true,则判断是不是超过了闲置连接检查其活性的频次阈值(即由keepAliveBetweenTimeMillis控制)
if (keepAlive && idleMillis >= keepAliveBetweenTimeMillis) {
keepAliveConnections[keepAliveCount++] = connection; //满足条件则视为“需要检测活性的对象”,放入keepAliveConnections数组
}
} else {
if (i < checkCount) {
evictConnections[evictCount++] = connection;
} else {
break;
}
}
}

int removeCount = evictCount + keepAliveCount; //这一批需要移除特殊处理的连接总数
if (removeCount > 0) {
System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount); //根据当前移除的元素,把剩余的元素移动至数组首部(参考流程4.1)
Arrays.fill(connections, poolingCount - removeCount, poolingCount, null); //剩余位置清空
poolingCount -= removeCount;
}
keepAliveCheckCount += keepAliveCount;
} finally {
lock.unlock();
}

if (evictCount > 0) { //如果需要丢弃的连接数量大于0
for (int i = 0; i < evictCount; ++i) {
DruidConnectionHolder item = evictConnections[i];
Connection connection = item.getConnection();
JdbcUtils.close(connection); //直接关闭连接(这里是直接关闭驱动连接,不再放回池子)
destroyCountUpdater.incrementAndGet(this);
}
Arrays.fill(evictConnections, null); //将evictConnections数组重新置空(方便下次使用)
}

if (keepAliveCount > 0) { //检测那些需要判活的连接数
// keep order
for (int i = keepAliveCount - 1; i >= 0; --i) {
DruidConnectionHolder holer = keepAliveConnections[i];
Connection connection = holer.getConnection();
holer.incrementKeepAliveCheckCount();

boolean validate = false;
try {
this.validateConnection(connection); //检测其活性
validate = true;
} catch (Throwable error) {
if (LOG.isDebugEnabled()) {
LOG.debug("keepAliveErr", error);
}
// skip
}

boolean discard = !validate;
if (validate) { //检测通过
holer.lastKeepTimeMillis = System.currentTimeMillis();
boolean putOk = put(holer); //检测通过后,再次放入池子
if (!putOk) { //放不进去池子(说明已经达到连接池最大连接数阈值maxActive),则视为可以“直接抛弃”的连接
discard = true;
}
}

if (discard) {
try {
connection.close(); //如果可以抛弃,则直接关闭连接(直接调用驱动的close)
} catch (Exception e) {
// skip
}

lock.lock();
try {
discardCount++; //抛弃连接数累加

if (activeCount <= minIdle) {
emptySignal(); //唤起主流程3追加连接对象
}
} finally {
lock.unlock();
}
}
}
this.getDataSourceStat().addKeepAliveCheckCount(keepAliveCount);
Arrays.fill(keepAliveConnections, null); //将keepAliveConnections数组重新置空(方便下次使用)
}
}

//上面检测通过,再次通过该方法重新把连接放入池子
private boolean put(DruidConnectionHolder holder) {
lock.lock();
try {
if (poolingCount >= maxActive) {
return false; //若池子内闲置连接数超过maxActive,则无法继续添加新的连接进来,返回false
}
connections[poolingCount] = holder; //否则直接把此连接对象放入连接池队尾
incrementPoolingCount(); //poolingCount++

if (poolingCount > poolingPeak) {
poolingPeak = poolingCount;
poolingPeakTime = System.currentTimeMillis();
}

notEmpty.signal(); //唤起那些因获取不到可用连接而陷入阻塞状态的业务线程一次
notEmptySignalCount++;

if (createScheduler != null) { //不启用该模式,忽略
createTaskCount--;

if (poolingCount + createTaskCount < notEmptyWaitThreadCount //
&& activeCount + poolingCount + createTaskCount < maxActive) {
emptySignal();
}
}
} finally {
lock.unlock();
}
return true;
}

整个流程分成图中主要的几步,首先利用poolingCount减去minIdle计算出需要做丢弃检查的连接对象区间,意味着这个区间的对象有被丢弃的可能,具体要不要放进丢弃队列evictConnections,要判断两个属性:

minEvictableIdleTimeMillis:最小检查间隙,缺省值30min,官方解释:一个连接在池中最小生存的时间(结合检查区间来看,闲置时间超过这个时间,才会被丢弃)。

maxEvictableIdleTimeMillis:最大检查间隙,缺省值7h,官方解释:一个连接在池中最大生存的时间(无视检查区间,只要闲置时间超过这个时间,就一定会被丢弃)。

如果当前连接对象闲置时间超过minEvictableIdleTimeMillis且下标在evictCheck区间内,则加入丢弃队列evictConnections,如果闲置时间超过maxEvictableIdleTimeMillis,则直接放入evictConnections(一般情况下会命中第一个判断条件,除非一个连接不在检查区间,且闲置时间超过maxEvictableIdleTimeMillis)。

如果连接对象不在evictCheck区间内,且keepAlive属性为true,则判断该对象闲置时间是否超出keepAliveBetweenTimeMillis(缺省值60s),若超出,则意味着该连接需要进行连接可用性检查,则将该对象放入keepAliveConnections队列。

两个队列赋值完成后,则池子会进行一次压缩,没有涉及到的连接对象会被压缩到队首。

然后就是处理evictConnectionskeepAliveConnections两个队列了,evictConnections里的对象会被close最后释放掉,keepAliveConnections里面的对象将会其进行检测(流程参考流程1.3isValidConnection),碰到不可用的连接会调用discard流程1.4)抛弃掉,可用的连接会再次被放进连接池。

整个流程可以看出,连接闲置后,也并非一下子就减少到minIdle的,如果之前产生一堆的连接(不超过maxActive),突然闲置了下来,则至少需要花minEvictableIdleTimeMillis的时间才可以被移出连接池,如果一个连接闲置时间超过maxEvictableIdleTimeMillis则必定被回收,所以极端情况下(比如一个连接池从初始化后就没有再被使用过),连接池里并不会一直保持minIdle个连接,而是一个都没有,生产环境下这是非常不常见的,默认的maxEvictableIdleTimeMillis都有7h除非是极度冷门的系统才会出现这种情况,而开启keepAlive也不会推翻这个规则,keepAlive的优先级是低于maxEvictableIdleTimeMillis的,keepAlive只是保证了那些检查中不需要被移出连接池的连接在指定检测时间内去检测其连接活性,从而决定是否放入池子或者直接discard

流程4.2:主动回收连接,防止内存泄漏

过程如下:

流程4.2

上述流程对应源代码如下:

代码段4-3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
//回收长期未归还的连接(再次说明:该方法仅在removeAbandoned设置为true的情况下触发)
public int removeAbandoned() {
int removeCount = 0;

long currrentNanos = System.nanoTime();

//这个列表用于存放满足条件的真正需要强制回收的连接
List abandonedList = new ArrayList();

activeConnectionLock.lock();
try {
//在removeAbandoned设置为true的情况下,所有被借出去的连接,都会被保存进activeConnections(参考主流程1),所以要进行“长期未归还”的检查,就是从activeConnections开始的
Iterator iter = activeConnections.keySet().iterator();

for (; iter.hasNext();) {
DruidPooledConnection pooledConnection = iter.next();

if (pooledConnection.isRunning()) {
continue; //如果当前连接正在使用中(指的是正在execute),则不处理
}

//利用当前时间和连接被借出去时的时间,计算出连接被借出去的时间有多久
long timeMillis = (currrentNanos - pooledConnection.getConnectedTimeNano()) / (1000 * 1000);

if (timeMillis >= removeAbandonedTimeoutMillis) { //如果连接被借出去的时间超过removeAbandonedTimeoutMillis这个阈值,将会命中“主动归还”的逻辑检查
iter.remove(); //先从activeConnections移除
pooledConnection.setTraceEnable(false); //标记为false,防止回收时重复removeactiveConnections,可以参考主流程5
abandonedList.add(pooledConnection); //放入“强制回收”队列
}
}
} finally {
activeConnectionLock.unlock();
}

if (abandonedList.size() > 0) { //如果“强制回收”队列大于0,说明有需要回收的连接
for (DruidPooledConnection pooledConnection : abandonedList) { //循环这些连接
final ReentrantLock lock = pooledConnection.lock;
lock.lock(); //拿到连接的锁
try {
if (pooledConnection.isDisable()) {
continue; //已经被回收的,则不管
}
} finally {
lock.unlock();
}

//触发回收连接对象(pooledConnection)里的holcder(注意这里其实是把pooledConnection对象里的holder给回收至连接池了,pooledConnection对象本身会被销毁)
JdbcUtils.close(pooledConnection); //这里触发的close,是DruidPooledConnection的close,也就是会触发recycle方法的close
pooledConnection.abandond(); //标记为
removeAbandonedCount++;
removeCount++;

if (isLogAbandoned()) { //日志打印,忽略
StringBuilder buf = new StringBuilder();
buf.append("abandon connection, owner thread: ");
buf.append(pooledConnection.getOwnerThread().getName());
buf.append(", connected at : ");
buf.append(pooledConnection.getConnectedTimeMillis());
buf.append(", open stackTrace\n");

StackTraceElement[] trace = pooledConnection.getConnectStackTrace();
for (int i = 0; i < trace.length; i++) {
buf.append("\tat ");
buf.append(trace[i].toString());
buf.append("\n");
}

buf.append("ownerThread current state is " + pooledConnection.getOwnerThread().getState()
+ ", current stackTrace\n");
trace = pooledConnection.getOwnerThread().getStackTrace();
for (int i = 0; i < trace.length; i++) {
buf.append("\tat ");
buf.append(trace[i].toString());
buf.append("\n");
}

LOG.error(buf.toString());
}
}
}

return removeCount; //返回本次被强制回收的连接个数
}

这个流程在removeAbandoned设置为true的情况下才会触发,用于回收那些拿出去的使用长期未归还(归还:调用close方法触发主流程5)的连接。

先来看看activeConnections是什么,activeConnections用来保存当前从池子里被借出去的连接,这个可以通过主流程1看出来,每次调用getConnection时,如果开启removeAbandoned,则会把连接对象放到activeConnections,然后如果长期不调用close,那么这个被借出去的连接将永远无法被重新放回池子,这是一件很麻烦的事情,这将存在内存泄漏的风险,因为不close,意味着池子会不断产生新的连接放进connections,不符合连接池预期(连接池出发点是尽可能少的创建连接),然后之前被借出去的连接对象还有一直无法被回收的风险,存在内存泄漏的风险,因此为了解决这个问题,就有了这个流程,流程整体很简单,就是将现在借出去还没有归还的连接,做一次判断,符合条件的将会被放进abandonedList进行连接回收(这个list里的连接对象里的abandoned将会被置为true,标记已被该流程处理过,防止主流程5再次处理,具体可以参考代码段5-1)。

这个如果在实践中能保证每次都可以正常close,完全不用设置removeAbandoned=true,目前如果使用了类似mybatisspring等开源框架,框架内部是一定会close的,所以此项是不建议设置的,视情况而定。

九、主流程5:回收连接

这个流程通常是靠连接包装类DruidPooledConnectionclose方法触发的,目标方法为recycle,流程图如下:

主流程5

上述流程对应源代码如下:

代码段5-1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
//DruidPooledConnection类的close方法
@Override
public void close() throws SQLException {
if (this.disable) { //检查,因为该连接对象是抛出去给别的业务线程使用,也就是说并不受连接池本身管控,所以很可能存在多线程同时close的操作,因此这里需要做一层检查,包括下方的syncClose里的检查也是一个意思
return;
}

DruidConnectionHolder holder = this.holder; //拿到对应的holder对象(之前说过,这个对象才是最后放进连接池的对象)
if (holder == null) {
if (dupCloseLogEnable) {
LOG.error("dup close");
}
return;
}

DruidAbstractDataSource dataSource = holder.getDataSource(); //拿到对应的连接池对象
boolean isSameThread = this.getOwnerThread() == Thread.currentThread();

if (!isSameThread) { //关闭该连接与获取该连接的线程并非同一个的时候,则触发下面的syncClose
dataSource.setAsyncCloseConnectionEnable(true);
}

if (dataSource.isAsyncCloseConnectionEnable()) {
syncClose(); //参考上面的解释,该方法详情在下方
return;
}

//一些事件监听器的触发,忽略
for (ConnectionEventListener listener : holder.getConnectionEventListeners()) {
listener.connectionClosed(new ConnectionEvent(this));
}


//责任链的执行,参考流程1.1与代码段1-2,运行方式是一样的,找到映射方法,整个触发一遍责任链上的filters
List filters = dataSource.getProxyFilters();
if (filters.size() > 0) {
FilterChainImpl filterChain = new FilterChainImpl(dataSource);
filterChain.dataSource_recycle(this);
} else {
recycle(); //触发目标方法recycle
}

this.disable = true; //标记该连接已失效,无法再次提供服务
}



//上面逻辑走syncClose的情况,该方法与上面大体相同,但由于不是同一个线程做的操作,所以这里需要锁控制
public void syncClose() throws SQLException {
lock.lock(); //获取锁,这个锁是当前连接对象上的锁,为了解决同一个连接对象在不同的线程里被同时close多次而造成的线程安全问题
try {
if (this.disable) {
return;
}

DruidConnectionHolder holder = this.holder; //同样的,拿到需要归还的holder对象
if (holder == null) {
if (dupCloseLogEnable) {
LOG.error("dup close");
}
return;
}

//同样是一些事件监听器的触发,忽略
for (ConnectionEventListener listener : holder.getConnectionEventListeners()) {
listener.connectionClosed(new ConnectionEvent(this));
}

//同样的责任链的执行,参考上面的解释
DruidAbstractDataSource dataSource = holder.getDataSource();
List filters = dataSource.getProxyFilters();
if (filters.size() > 0) {
FilterChainImpl filterChain = new FilterChainImpl(dataSource);
filterChain.dataSource_recycle(this);
} else {
recycle(); //触发目标方法recycle,方法详情在下方
}

this.disable = true; //标记该连接已失效,无法再次提供服务
} finally {
lock.unlock(); //解锁
}
}


//DruidPooledConnection类的recycle方法,由上面俩方法直接触发
public void recycle() throws SQLException {
if (this.disable) {
return;
}

DruidConnectionHolder holder = this.holder; //拿到真正需要归还的连接对象
if (holder == null) {
if (dupCloseLogEnable) {
LOG.error("dup close");
}
return;
}

if (!this.abandoned) { //如果期间已经被流程4.2处理过了(abandoned==true),则不触发下方逻辑
DruidAbstractDataSource dataSource = holder.getDataSource();
dataSource.recycle(this); //真正触发连接池的回收方法,方法详情在下方
}

//连接对象一旦被回收处理,则会把所有与连接相关的属性置空(不持有),closed标记为true
this.holder = null;
conn = null;
transactionInfo = null;
closed = true;
}

//DruidDataSource类里的recycle方法,真正回收连接的方法,由上面DruidPooledConnection类的recycle触发
protected void recycle(DruidPooledConnection pooledConnection) throws SQLException {
final DruidConnectionHolder holder = pooledConnection.holder;

if (holder == null) {
LOG.warn("connectionHolder is null");
return;
}

if (logDifferentThread //
&& (!isAsyncCloseConnectionEnable()) //
&& pooledConnection.ownerThread != Thread.currentThread()//
) {
LOG.warn("get/close not same thread");
}

final Connection physicalConnection = holder.conn; //拿到真正的驱动连接对象

if (pooledConnection.traceEnable) { //如果traceEnable为true(满足该属性为true,必须要removeAbandoned设置为true,这样在主流程1那里才会被放进activeConnections,才会置为true),流程4.2处理过后,会把该属性重新置为false,其他情况均为true
Object oldInfo = null;
activeConnectionLock.lock();
try {
if (pooledConnection.traceEnable) { //双重检查
oldInfo = activeConnections.remove(pooledConnection); //从activeConnections移除,防止流程4.2的重复检查
pooledConnection.traceEnable = false; //置为false
}
} finally {
activeConnectionLock.unlock();
}
if (oldInfo == null) {
if (LOG.isWarnEnabled()) {
LOG.warn("remove abandonded failed. activeConnections.size " + activeConnections.size());
}
}
}

final boolean isAutoCommit = holder.underlyingAutoCommit;
final boolean isReadOnly = holder.underlyingReadOnly;
final boolean testOnReturn = this.testOnReturn;

try {
// 如果在归还至连接池时发现此连接对象还有未处理完的事务,则直接回滚
if ((!isAutoCommit) && (!isReadOnly)) {
pooledConnection.rollback();
}

// reset holder, restore default settings, clear warnings
boolean isSameThread = pooledConnection.ownerThread == Thread.currentThread();
if (!isSameThread) { //同样判断线程,为了保证安全性
final ReentrantLock lock = pooledConnection.lock;
lock.lock();
try {
holder.reset(); //连接被借出去后,可能被业务方改动了一些属性(典型的比如autoCommit),现在利用reset方法还原为默认值
} finally {
lock.unlock();
}
} else {
holder.reset(); //同上,这里认为获取和关闭连接的是同一个线程,不存在线程安全问题,因此不用去竞争锁
}

//连接已被抛弃,则不作任何处理(不再归还)
if (holder.discard) {
return;
}

//忽略
if (phyMaxUseCount > 0 && holder.useCount >= phyMaxUseCount) {
discardConnection(holder.conn);
return;
}

//如果驱动连接本身被人为关闭了,除一些监控值之外,也不做处理
if (physicalConnection.isClosed()) {
lock.lock();
try {
activeCount--;
closeCount++;
} finally {
lock.unlock();
}
return;
}

//参考testOnBorrow,这里testOnReturn就是指每次回收连接都要做连接可用性检查,同样官方不建议开启,影响性能,缺省值也是不开启的
if (testOnReturn) {
//流程忽略
boolean validate = testConnectionInternal(holder, physicalConnection);
if (!validate) {
JdbcUtils.close(physicalConnection);

destroyCountUpdater.incrementAndGet(this);

lock.lock();
try {
activeCount--;
closeCount++;
} finally {
lock.unlock();
}
return;
}
}

if (!enable) {
//中途发现连接又被置为不可用,则直接触发抛弃方法,参考流程1.4和代码段1-5
discardConnection(holder.conn);
return;
}

boolean result;
final long currentTimeMillis = System.currentTimeMillis();

if (phyTimeoutMillis > 0) {
long phyConnectTimeMillis = currentTimeMillis - holder.connectTimeMillis;
if (phyConnectTimeMillis > phyTimeoutMillis) {
discardConnection(holder.conn);
return;
}
}

lock.lock();
try {
activeCount--;
closeCount++;

//最终放入池子,方法详情在下方
result = putLast(holder, currentTimeMillis);
recycleCount++;
} finally {
lock.unlock();
}

if (!result) { //如果加不进去,则直接关闭驱动连接,然后不处理(此时holder已经失去强引用,不久便会被回收)
JdbcUtils.close(holder.conn);
LOG.info("connection recyle failed.");
}
} catch (Throwable e) {
holder.clearStatementCache();

if (!holder.discard) {
this.discardConnection(physicalConnection);
holder.discard = true;
}

LOG.error("recyle error", e);
recycleErrorCountUpdater.incrementAndGet(this);
}
}

//DruidDataSource类里的putLast方法,由上方的recycle方法触发
boolean putLast(DruidConnectionHolder e, long lastActiveTimeMillis) {
if (poolingCount >= maxActive) { //池子已满,不加
return false;
}

e.lastActiveTimeMillis = lastActiveTimeMillis; //刷新上次活跃时间,该时间很重要,直接影响连接检查的触发
connections[poolingCount] = e; //放进连接池数组尾部
incrementPoolingCount(); //poolingCount++

if (poolingCount > poolingPeak) {
poolingPeak = poolingCount;
poolingPeakTime = lastActiveTimeMillis;
}

notEmpty.signal(); //因为成功回收了一个连接,那就唤起一次所有因为获取不到连接而被阻塞的业务线程吧~(参考流程1.2)
notEmptySignalCount++;

return true;
}

这也是非常重要的一个流程,连接用完要归还,就是利用该流程完成归还的动作,利用druid对外包装的Connecion包装类DruidPooledConnectionclose方法触发,该方法会通过自己内部的close或者syncClose方法来间接触发dataSource对象的recycle方法,从而达到回收的目的。

最终的recycle方法:

  1. 如果removeAbandoned被设置为true,则通过traceEnable判断是否需要从activeConnections移除该连接对象,防止流程4.2再次检测到该连接对象,当然如果是流程4.2主动触发的该流程,那么意味着流程4.2里已经remove过该对象了,traceEnable会被置为false,本流程就不再触发remove了(这个流程都是在removeAbandoned=true的情况下进行的,在主流程1里连接被放进activeConnectionstraceEnable被置为true,而在removeAbandoned=false的情况下traceEnable恒等于false)。

  2. 如果回收过程中发现存在有未处理完的事务,则触发回滚(比较有可能触发这一条的是流程4.2里强制归还连接,也有可能是单纯使用连接,开启事务却没有提交事务就直接close的情况),然后利用holder.reset进行恢复连接对象里一些属性的默认值,除此之外,holder对象还会把由它产生的statement对象放到自己的一个arraylist里面,reset方法会循环着关闭内部未关闭的statement对象,最后清空list,当然,statement对象自己也会记录下其产生的所有的resultSet对象,然后关闭statement时同样也会循环关闭内部未关闭的resultSet对象,这是连接池做的一种保护措施,防止用户拿着连接对象做完一些操作没有对打开的资源关闭。

  3. 判断是否开启testOnReturn,这个跟testOnBorrow一样,官方默认不开启,也不建议开启,影响性能,理由参考主流程1里针对testOnBorrow的解释。

  4. 直接放回池子(当前connections的尾部),然后需要注意的是putLast方法和put方法的不同之处,putLast会把lastActiveTimeMillis置为当前时间,也就是说不管一个连接被借出去过久,只要归还了,最后活跃时间就是当前时间,这就会有造成某种特殊异常情况的发生(非常极端,几乎不会触发,可以选择不看):

    如果不开启testOnBorrow和testOnReturn,并且keepAlive设置为false,那么长连接可用测试的间隔依据就是利用当前时间减去上次活跃时间(lastActiveTimeMillis)得出闲置时间,然后再利用闲置时间跟timeBetweenEvictionRunsMillis(默认60s)进行对比,超过才进行长连接可用测试。

    那么如果一个mysql服务端的长连接保活时间被人为调整为60s,然后timeBetweenEvictionRunsMillis被设置为59s,这个设置是非常合理的,保证了测试间隔小于长连接实际保活时间,然后如果这时一个连接被拿出去后一直过了61s才被close回收,该连接对象的lastActiveTimeMillis被刷为当前时间,如果在59s内再次拿到该连接对象,就会绕过连接检查直接报连接不可用的错误。

十、尾声

到这里针对druid连接池的初始化以及其内部一个连接从生产消亡的整个流程就已经整理完了,主要是列出其运行流程以及一些主要的监控数据都是如何产生的,没有涉及到的是一个sql的执行,因为这个基本上就跟使用原生驱动程序差不多,只是druid又包装了一层Statement等,用于完成一些自己的操作。

对于druid,处理连接只是很小的一块内容,却是很核心的一块内容。

Druid地址:https://github.com/alibaba/druid