Druid核心源码解析DruidDataSource
配置读取
druid连接池支持的所有连接参数可在类com.alibaba.druid.pool.DruidDataSourceFactory
中查看。
配置读取代码:
- public void configFromPropety(Properties properties) {
- //这方法太长,自己看源码去吧,就是读读属性。。。。
- }
整体代码比较简单,就是把配置内容,读取到dataSource。
连接池初始化
首先是简单的判断,加锁:
- if (inited) {
- //已经被初始化好了,直接return
- return;
- }
- // bug fixed for dead lock, for issue #2980
- DruidDriver.getInstance();
- /**控制创建移除连接的锁,并且通过条件去控制一个连接的生成消费**/
- // public DruidAbstractDataSource(boolean lockFair){
- // lock = new ReentrantLock(lockFair);
- //
- // notEmpty = lock.newCondition();
- // empty = lock.newCondition();
- // }
- final ReentrantLock lock = this.lock;
- try {
- lock.lockInterruptibly();
- } catch (InterruptedException e) {
- throw new SQLException(“interrupt”, e);
- }
之后会更新一些JMX的监控指标:
- //一些jmx监控指标
- this.connectionIdSeedUpdater.addAndGet(this, delta);
- this.statementIdSeedUpdater.addAndGet(this, delta);
- this.resultSetIdSeedUpdater.addAndGet(this, delta);
- this.transactionIdSeedUpdater.addAndGet(this, delta);
druid的监控指标都是通过jmx实现的。
解析连接串:
- if (this.jdbcUrl != null) {
- //解析连接串
- this.jdbcUrl = this.jdbcUrl.trim();
- initFromWrapDriverUrl();
- }
initFromWrapDriverUrl
方法,除了从jdbc url中解析出连接和驱动信息,后面还把filters的名字,解析成了对应的filter类。
- private void initFromWrapDriverUrl() throws SQLException {
- if (!jdbcUrl.startsWith(DruidDriver.DEFAULT_PREFIX)) {
- return;
- }
- DataSourceProxyConfig config = DruidDriver.parseConfig(jdbcUrl, null);
- this.driverClass = config.getRawDriverClassName();
- LOG.error(“error url : ‘” + jdbcUrl + “‘, it should be : ‘” + config.getRawUrl() + “‘”);
- this.jdbcUrl = config.getRawUrl();
- if (this.name == null) {
- this.name = config.getName();
- }
- for (Filter filter : config.getFilters()) {
- addFilter(filter);
- }
- }
之后在init方法里面,会进行filters的初始化:
- //初始化filter 属性
- for (Filter filter : filters) {
- filter.init(this);
- }
之后解析数据库类型:
- if (this.dbTypeName == null || this.dbTypeName.length() == 0) {
- this.dbTypeName = JdbcUtils.getDbType(jdbcUrl, null);
- }
注意枚举值: com.alibaba.druid.DbType
,这个里面包含了目前durid连接池支持的所有数据源 类型,另外,druid还额外提供了一些驱动类,例如:
- elastic_search (1 << 25), // com.alibaba.xdriver.elastic.jdbc.ElasticDriver
clickhouse还提供了负载均衡的驱动类:
com.alibaba.druid.support.clickhouse.BalancedClickhouseDriver
。
在回到init方法,之后是一堆参数解析,不再说,跳过了。 之后是通过SPI加载自定义的filter:
- private void initFromSPIServiceLoader() {
- if (loadSpifilterSkip) {
- return;
- }
- if (autoFilters == null) {
- List<Filter> filters = new ArrayList<Filter>();
- ServiceLoader<Filter> autoFilterLoader = ServiceLoader.load(Filter.class);
- for (Filter filter : autoFilterLoader) {
- AutoLoad autoLoad = filter.getClass().getAnnotation(AutoLoad.class);
- if (autoLoad != null && autoLoad.value()) {
- filters.add(filter);
- }
- }
- autoFilters = filters;
- }
- for (Filter filter : autoFilters) {
- if (LOG.isInfoEnabled()) {
- LOG.info(“load filter from spi :” + filter.getClass().getName());
- }
- addFilter(filter);
- }
- }
注意自定义的filter,要使用com.alibaba.druid.filter.AutoLoad
。
解析驱动:
- protected void resolveDriver() throws SQLException {
- if (this.driver == null) {
- if (this.driverClass == null || this.driverClass.isEmpty()) {
- this.driverClass = JdbcUtils.getDriverClassName(this.jdbcUrl);
- }
- if (MockDriver.class.getName().equals(driverClass)) {
- driver = MockDriver.instance;
- } else if (“com.alibaba.druid.support.clickhouse.BalancedClickhouseDriver”.equals(driverClass)) {
- Properties info = new Properties();
- info.put(“user”, username);
- info.put(“password”, password);
- info.putAll(connectProperties);
- driver = new BalancedClickhouseDriver(jdbcUrl, info);
- } else {
- if (jdbcUrl == null && (driverClass == null || driverClass.length() == 0)) {
- throw new SQLException(“url not set”);
- }
- driver = JdbcUtils.createDriver(driverClassLoader, driverClass);
- }
- } else {
- if (this.driverClass == null) {
- this.driverClass = driver.getClass().getName();
- }
- }
- }
其中durid自己的mock驱动和clickhouse的负载均衡的驱动,特殊判断了下,其他走的都是class forname.
之后是exception sorter和checker的一些东西,跟主线剧情关系不大,skip.
之后是一些初始化JdbcDataSourceStat
,没啥东西。
之后是核心:
- connections = new DruidConnectionHolder[maxActive]; //连接数组
- evictConnections = new DruidConnectionHolder[maxActive]; //销毁的连接数组
- keepAliveConnections = new DruidConnectionHolder[maxActive]; //保持活跃可用的数组
dataSource的连接,都被包装在类DruidConnectionHolder
中,之后是一个同步去初始化连接还是异步去初始化的连接,总之,是去初始化 连接的过程:
- if (createScheduler != null && asyncInit) {
- for (int i = 0; i < initialSize; ++i) {
- submitCreateTask(true);
- }
- } else if (!asyncInit) {
- // init connections
- while (poolingCount < initialSize) {
- try {
- PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection();
- DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo);
- connections[poolingCount++] = holder;
- } catch (SQLException ex) {
- LOG.error(“init datasource error, url: “ + this.getUrl(), ex);
- if (initExceptionThrow) {
- connectError = ex;
- break;
- } else {
- Thread.sleep(3000);
- }
- }
- }
- if (poolingCount > 0) {
- poolingPeak = poolingCount;
- poolingPeakTime = System.currentTimeMillis();
- }
- }
初始化的连接个数为连接串里面配置的initialSize
.
核心初始化方法com.alibaba.druid.pool.DruidAbstractDataSource#createPhysicalConnection()
,在这方法里面,会拿用户名密码,之后执行真正的获取connection:
- public Connection createPhysicalConnection(String url, Properties info) throws SQLException {
- Connection conn;
- if (getProxyFilters().size() == 0) {
- conn = getDriver().connect(url, info);
- } else {
- conn = new FilterChainImpl(this).connection_connect(info);
- }
- createCountUpdater.incrementAndGet(this);
- return conn;
- }
注意,如果配置了filters,则所有操作,都会在操作前执行filter处理链。
- public ConnectionProxy connection_connect(Properties info) throws SQLException {
- if (this.pos < filterSize) {
- return nextFilter()
- .connection_connect(this, info);
- }
- Driver driver = dataSource.getRawDriver();
- String url = dataSource.getRawJdbcUrl();
- Connection nativeConnection = driver.connect(url, info);
- if (nativeConnection == null) {
- return null;
- }
- return new ConnectionProxyImpl(dataSource, nativeConnection, info, dataSource.createConnectionId());
- }
再回到主流程init方法,connections
数组初始化完成之后, 开启额外线程:
- createAndLogThread(); //打印连接信息
- createAndStartCreatorThread(); //创建连接线程
- createAndStartDestroyThread(); //销毁连接线程
先看注释,具体里面的内容后面单独拉出来讲。
之后:
- initedLatch.await(); //初始化 latch -1
- init = true; //标记已经初始化完成
- initedTime = new Date(); //时间
- registerMbean(); //为datasource 注册jmx监控指标
最后的最后,如果配置了keepAlive:
- if (keepAlive) {
- // async fill to minIdle
- if (createScheduler != null) {
- for (int i = 0; i < minIdle; ++i) {
- submitCreateTask(true);
- }
- } else {
- this.emptySignal();
- }
- }
这时候,会根据配置的活跃连接数minIdle
,去给datasource的连接,做个保持活跃连接个数,具体后面再说。
连接池使用的核心逻辑
首先,使用数组作为连接的容器,对于真实连接的加入和移除,使用lock就行同步,另外,在加入和移除连接时候,对比生产消费模型,通过lock上的条件,来通知是否可以获取或者加入连接。
- public DruidAbstractDataSource(boolean lockFair){
- lock = new ReentrantLock(lockFair);
- notEmpty = lock.newCondition(); //非空,有连接
- empty = lock.newCondition(); //空的
- }
另外,默认的fairlock为false
- public DruidDataSource(){
- this(false);
- }
- public DruidDataSource(boolean fairLock){
- super(fairLock);
- configFromPropety(System.getProperties());
- }
创建连接
在线程com.alibaba.druid.pool.DruidDataSource.CreateConnectionThread
中:
- if (emptyWait) {
- // 必须存在线程等待,才创建连接
- if (poolingCount >= notEmptyWaitThreadCount //
- && (!(keepAlive && activeCount + poolingCount < minIdle))
- && !isFailContinuous()
- ) {
- empty.await();
- }
- // 防止创建超过maxActive数量的连接
- if (activeCount + poolingCount >= maxActive) {
- empty.await();
- continue;
- }
- }
必须存在线程等待获取连接时候,才能创建连接,并且要保持总的连接数,不能超过配置的最大连接。
创建完连接之后,执行 notEmpty.signalAll();
通知消费者。
获取连接
外层代码:
- @Override
- public DruidPooledConnection getConnection() throws SQLException {
- return getConnection(maxWait);
- }
- public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {
- init();
- if (filters.size() > 0) {
- FilterChainImpl filterChain = new FilterChainImpl(this);
- return filterChain.dataSource_connect(this, maxWaitMillis);
- } else {
- return getConnectionDirect(maxWaitMillis);
- }
- }
忽略掉filter chain,其实最后执行的还是com.alibaba.druid.pool.DruidDataSource#getConnectionDirect
:
方法内部:
- poolableConnection = getConnectionInternal(maxWaitMillis);
- 1 , 连接不足,需要直接去创建新的,跟我们初始化一样
- 2,从connections里面拿
- if (maxWait > 0) {
- holder = pollLast(nanos);
- } else {
- holder = takeLast();
- }
其中,maxWait默认为-1,配置在init里面:
- String property = properties.getProperty(“druid.maxWait”);
- if (property != null && property.length() > 0) {
- try {
- int value = Integer.parseInt(property);
- this.setMaxWait(value);
- } catch (NumberFormatException e) {
- LOG.error(“illegal property ‘druid.maxWait'”, e);
- }
- }
这个用于配置拿连接时候,是否在这个时间上进行等待,默认是否,即一直等到拿到连接为止。
直接看下阻塞拿的过程:
- DruidConnectionHolder takeLast() throws InterruptedException, SQLException {
- try {
- //没连接了
- while (poolingCount == 0) {
- //暗示下创建线程没连接了
- emptySignal(); // send signal to CreateThread create connection
- if (failFast && isFailContinuous()) {
- throw new DataSourceNotAvailableException(createError);
- }
- notEmptyWaitThreadCount++;
- if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) {
- notEmptyWaitThreadPeak = notEmptyWaitThreadCount;
- }
- try {
- //傻等着创建或者回收,能给整出来点儿连接
- notEmpty.await(); // signal by recycle or creator
- } finally {
- notEmptyWaitThreadCount–;
- }
- notEmptyWaitCount++;
- if (!enable) {
- connectErrorCountUpdater.incrementAndGet(this);
- if (disableException != null) {
- throw disableException;
- }
- throw new DataSourceDisableException();
- }
- }
- } catch (InterruptedException ie) {
- notEmpty.signal(); // propagate to non-interrupted thread
- notEmptySignalCount++;
- throw ie;
- }
- //拿数组的最后一个连接
- decrementPoolingCount();
- DruidConnectionHolder last = connections[poolingCount];
- connections[poolingCount] = null;
- return last;
- }
连接回收
- protected void createAndStartDestroyThread() {
- destroyTask = new DestroyTask();
- //自定义配置销毁 ,适用于连接数非常多的 情况
- if (destroyScheduler != null) {
- long period = timeBetweenEvictionRunsMillis;
- if (period <= 0) {
- period = 1000;
- }
- destroySchedulerFuture = destroyScheduler.scheduleAtFixedRate(destroyTask, period, period,
- TimeUnit.MILLISECONDS);
- initedLatch.countDown();
- return;
- }
- String threadName = “Druid-ConnectionPool-Destroy-“ + System.identityHashCode(this);
- //单线程销毁
- destroyConnectionThread = new DestroyConnectionThread(threadName);
- destroyConnectionThread.start();
- }
实际的销毁:
- public class DestroyTask implements Runnable {
- public DestroyTask() {
- }
- @Override
- public void run() {
- shrink(true, keepAlive);
- if (isRemoveAbandoned()) {
- removeAbandoned();
- }
- }
- }
最终 执行的还是 shrink
方法。
- public void shrink(boolean checkTime, boolean keepAlive) {
- try {
- lock.lockInterruptibly();
- } catch (InterruptedException e) {
- return;
- }
- boolean needFill = false;
- int evictCount = 0;
- int keepAliveCount = 0;
- int fatalErrorIncrement = fatalErrorCount – fatalErrorCountLastShrink;
- fatalErrorCountLastShrink = fatalErrorCount;
- try {
- if (!inited) {
- return;
- }
- final int checkCount = poolingCount – minIdle; //需要检测连接的数量
- final long currentTimeMillis = System.currentTimeMillis();
- for (int i = 0; i < poolingCount; ++i) { //检测目前connections数组中的连接
- DruidConnectionHolder connection = connections[i];
- if ((onFatalError || fatalErrorIncrement > 0) && (lastFatalErrorTimeMillis > connection.connectTimeMillis)) {
- keepAliveConnections[keepAliveCount++] = connection;
- continue;
- }
- if (checkTime) {
- //是否设置了物理连接的超时时间phyTimoutMills。假如设置了该时间,
- // 判断连接时间存活时间是否已经超过phyTimeoutMills,是则放入evictConnections中
- if (phyTimeoutMillis > 0) {
- long phyConnectTimeMillis = currentTimeMillis – connection.connectTimeMillis;
- if (phyConnectTimeMillis > phyTimeoutMillis) {
- evictConnections[evictCount++] = connection;
- continue;
- }
- }
- long idleMillis = currentTimeMillis – connection.lastActiveTimeMillis;//获取连接空闲时间
- //如果某条连接空闲时间小于minEvictableIdleTimeMillis,则不用继续检查剩下的连接了
- if (idleMillis < minEvictableIdleTimeMillis
- && idleMillis < keepAliveBetweenTimeMillis
- ) {
- break;
- }
- if (idleMillis >= minEvictableIdleTimeMillis) {
- // check checkTime is silly code
- //检测检查了几个连接了
- if (checkTime && i < checkCount) {
- //超时了
- evictConnections[evictCount++] = connection;
- continue;
- } else if (idleMillis > maxEvictableIdleTimeMillis) {
- //超时了
- evictConnections[evictCount++] = connection;
- continue;
- }
- }
- if (keepAlive && idleMillis >= keepAliveBetweenTimeMillis) {
- //配置了keepAlive,并且在存活时间内,放到keepAlive数组
- keepAliveConnections[keepAliveCount++] = connection;
- }
- } else {
- //不需要检查时间的,直接移除
- if (i < checkCount) {
- evictConnections[evictCount++] = connection;
- } else {
- break;
- }
- }
- }
- int removeCount = evictCount + keepAliveCount; //移除了几个
- //由于使用connections连接时候,都是取后面的,后面 的是最新的连接,只考虑前面过期就行,所以只需要挪动前面的连接
- if (removeCount > 0) {
- System.arraycopy(connections, removeCount, connections, 0, poolingCount – removeCount);
- Arrays.fill(connections, poolingCount – removeCount, poolingCount, null);
- poolingCount -= removeCount;
- }
- keepAliveCheckCount += keepAliveCount;
- if (keepAlive && poolingCount + activeCount < minIdle) {
- //不够核心的活跃连接时候,需要去创建啦
- needFill = true;
- }
- } finally {
- lock.unlock();
- }
- if (evictCount > 0) {
- for (int i = 0; i < evictCount; ++i) {
- //销毁连接
- DruidConnectionHolder item = evictConnections[i];
- Connection connection = item.getConnection();
- JdbcUtils.close(connection);
- destroyCountUpdater.incrementAndGet(this);
- }
- Arrays.fill(evictConnections, null);
- }
- if (keepAliveCount > 0) {
- // keep order
- for (int i = keepAliveCount – 1; i >= 0; —i) {
- DruidConnectionHolder holer = keepAliveConnections[i];
- Connection connection = holer.getConnection();
- holer.incrementKeepAliveCheckCount();
- boolean validate = false;
- try {
- this.validateConnection(connection);
- validate = true;
- } catch (Throwable error) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(“keepAliveErr”, error);
- }
- // skip
- }
- boolean discard = !validate; //没通过validate
- if (validate) {
- //通过keep alive检查,更新时间
- holer.lastKeepTimeMillis = System.currentTimeMillis();
- //这里还会尝试放回connections数组
- boolean putOk = put(holer, 0L, true);
- if (!putOk) {
- //没放入,标记要丢弃了
- discard = true;
- }
- }
- if (discard) {
- try {
- connection.close();
- } catch (Exception e) {
- // skip
- }
- lock.lock();
- try {
- discardCount++;
- if (activeCount + poolingCount <= minIdle) {
- //发信号让创建线程去创建
- emptySignal();
- }
- } finally {
- lock.unlock();
- }
- }
- }
- this.getDataSourceStat().addKeepAliveCheckCount(keepAliveCount);
- Arrays.fill(keepAliveConnections, null);
- }
- if (needFill) {
- //又要去创建了
- lock.lock();
- try {
- int fillCount = minIdle – (activeCount + poolingCount + createTaskCount);
- for (int i = 0; i < fillCount; ++i) {
- emptySignal();
- }
- } finally {
- lock.unlock();
- }
- } else if (onFatalError || fatalErrorIncrement > 0) {
- lock.lock();
- try {
- emptySignal();
- } finally {
- lock.unlock();
- }
- }
- }
工具数组evictConnections
,keepAliveConnections
用完即被置空,老工具人了。
一波操作下来,完成了对connections数组的大清洗。
小结
- 只写了核心逻辑,很多validate,checker,filter省略了。
- druid连接池源码里面还有很多好用的工具,比如数据库驱动工具,jdbc工具,解析SQL的语法树,iBATis的支持,wall过滤,多数据源…
- 最新的代码我看还有支持配套ZK的高可用方案,用到的话后期我会继续更新源码解析。
以上就是Druid核心源码解析DruidDataSource的详细内容,更多关于Druid核心DruidDataSource的资料请关注我们其它相关文章!
发表评论