Druid Core Source Code Analysis: DruidDataSource

Reading the Configuration

All connection parameters supported by the Druid pool can be found in the class com.alibaba.druid.pool.DruidDataSourceFactory.

The config-reading code:

```java
public void configFromPropety(Properties properties) {
    // This method is very long -- go read the source yourself; it just reads properties...
}
```

Overall the code is simple: it reads the configuration values into the dataSource.
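As a flavor of what that long method does, here is a minimal sketch of the read-parse-assign pattern it repeats for every key (the class name is hypothetical, and only one of the many properties the real method handles is shown):

```java
import java.util.Properties;

// Sketch of the property-reading pattern in configFromPropety.
// Hypothetical class; the real method reads dozens of keys this way.
public class ConfigSketch {
    private int maxWait = -1; // Druid's default: wait indefinitely

    public void configFromProperties(Properties properties) {
        String property = properties.getProperty("druid.maxWait");
        if (property != null && property.length() > 0) {
            try {
                this.maxWait = Integer.parseInt(property);
            } catch (NumberFormatException e) {
                // the real code logs the error and keeps the old value
            }
        }
    }

    public int getMaxWait() {
        return maxWait;
    }
}
```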

Pool Initialization

It starts with a simple check, then takes the lock:

```java
if (inited) {
    // already initialized, return directly
    return;
}
// bug fixed for dead lock, for issue #2980
DruidDriver.getInstance();
/** The lock guards connection creation/removal, and its conditions drive the
 *  produce/consume cycle of a connection: **/
// public DruidAbstractDataSource(boolean lockFair){
//     lock = new ReentrantLock(lockFair);
//     notEmpty = lock.newCondition();
//     empty = lock.newCondition();
// }
final ReentrantLock lock = this.lock;
try {
    lock.lockInterruptibly();
} catch (InterruptedException e) {
    throw new SQLException("interrupt", e);
}
```

Next it updates some JMX monitoring counters:

```java
// some JMX monitoring counters
this.connectionIdSeedUpdater.addAndGet(this, delta);
this.statementIdSeedUpdater.addAndGet(this, delta);
this.resultSetIdSeedUpdater.addAndGet(this, delta);
this.transactionIdSeedUpdater.addAndGet(this, delta);
```

All of Druid's monitoring metrics are exposed through JMX.
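Those id-seed counters are AtomicLongFieldUpdaters over volatile long fields, which avoids allocating a separate AtomicLong per dataSource instance. A self-contained sketch of the pattern (the class here is illustrative; the 10000 starting seed mirrors what the Druid source uses):

```java
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

// Sketch of how Druid's id-seed counters work: an AtomicLongFieldUpdater
// mutates a volatile long field atomically, sharing one static updater
// across all instances instead of one AtomicLong object per instance.
public class SeedDemo {
    private volatile long connectionIdSeed = 10000L;

    private static final AtomicLongFieldUpdater<SeedDemo> connectionIdSeedUpdater =
            AtomicLongFieldUpdater.newUpdater(SeedDemo.class, "connectionIdSeed");

    public long createConnectionId() {
        return connectionIdSeedUpdater.incrementAndGet(this);
    }
}
```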

Parsing the connection URL:

```java
if (this.jdbcUrl != null) {
    // parse the connection URL
    this.jdbcUrl = this.jdbcUrl.trim();
    initFromWrapDriverUrl();
}
```

Besides extracting the connection URL and driver information from the JDBC URL, initFromWrapDriverUrl also resolves the configured filter names into their corresponding Filter classes.

```java
private void initFromWrapDriverUrl() throws SQLException {
    if (!jdbcUrl.startsWith(DruidDriver.DEFAULT_PREFIX)) {
        return;
    }
    DataSourceProxyConfig config = DruidDriver.parseConfig(jdbcUrl, null);
    this.driverClass = config.getRawDriverClassName();
    LOG.error("error url : '" + jdbcUrl + "', it should be : '" + config.getRawUrl() + "'");
    this.jdbcUrl = config.getRawUrl();
    if (this.name == null) {
        this.name = config.getName();
    }
    for (Filter filter : config.getFilters()) {
        addFilter(filter);
    }
}
```

Later in the init method, the filters themselves are initialized:

```java
// initialize the filter properties
for (Filter filter : filters) {
    filter.init(this);
}
```

Then the database type is resolved:

```java
if (this.dbTypeName == null || this.dbTypeName.length() == 0) {
    this.dbTypeName = JdbcUtils.getDbType(jdbcUrl, null);
}
```

Note the enum com.alibaba.druid.DbType, which lists every data source type the Druid pool currently supports. Druid also ships some extra driver classes of its own, for example:

```java
elastic_search (1 << 25), // com.alibaba.xdriver.elastic.jdbc.ElasticDriver
```

For ClickHouse it additionally provides a load-balancing driver class: com.alibaba.druid.support.clickhouse.BalancedClickhouseDriver.

Back in the init method, a batch of parameter parsing follows; nothing notable, so we skip it. Then custom filters are loaded via SPI:

```java
private void initFromSPIServiceLoader() {
    if (loadSpifilterSkip) {
        return;
    }
    if (autoFilters == null) {
        List<Filter> filters = new ArrayList<Filter>();
        ServiceLoader<Filter> autoFilterLoader = ServiceLoader.load(Filter.class);
        for (Filter filter : autoFilterLoader) {
            AutoLoad autoLoad = filter.getClass().getAnnotation(AutoLoad.class);
            if (autoLoad != null && autoLoad.value()) {
                filters.add(filter);
            }
        }
        autoFilters = filters;
    }
    for (Filter filter : autoFilters) {
        if (LOG.isInfoEnabled()) {
            LOG.info("load filter from spi :" + filter.getClass().getName());
        }
        addFilter(filter);
    }
}
```

Note that a custom filter must be annotated with com.alibaba.druid.filter.AutoLoad.
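The annotation check is easy to reproduce in isolation. Below is a sketch with a stand-in @AutoLoad annotation and a hand-built candidate list (the real annotation is com.alibaba.druid.filter.AutoLoad, and the real candidates come from ServiceLoader, which needs a META-INF/services provider file):

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.List;

// Sketch of the @AutoLoad filtering done in initFromSPIServiceLoader:
// keep only candidates whose class carries @AutoLoad with value() == true.
public class AutoLoadDemo {
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    public @interface AutoLoad {
        boolean value() default true;
    }

    interface Filter {}

    @AutoLoad
    static class StatFilter implements Filter {}

    static class PlainFilter implements Filter {} // no annotation: skipped

    static List<Filter> keepAutoLoaded(List<Filter> candidates) {
        List<Filter> result = new ArrayList<>();
        for (Filter f : candidates) {
            AutoLoad autoLoad = f.getClass().getAnnotation(AutoLoad.class);
            if (autoLoad != null && autoLoad.value()) {
                result.add(f);
            }
        }
        return result;
    }
}
```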

Resolving the driver:

```java
protected void resolveDriver() throws SQLException {
    if (this.driver == null) {
        if (this.driverClass == null || this.driverClass.isEmpty()) {
            this.driverClass = JdbcUtils.getDriverClassName(this.jdbcUrl);
        }
        if (MockDriver.class.getName().equals(driverClass)) {
            driver = MockDriver.instance;
        } else if ("com.alibaba.druid.support.clickhouse.BalancedClickhouseDriver".equals(driverClass)) {
            Properties info = new Properties();
            info.put("user", username);
            info.put("password", password);
            info.putAll(connectProperties);
            driver = new BalancedClickhouseDriver(jdbcUrl, info);
        } else {
            if (jdbcUrl == null && (driverClass == null || driverClass.length() == 0)) {
                throw new SQLException("url not set");
            }
            driver = JdbcUtils.createDriver(driverClassLoader, driverClass);
        }
    } else {
        if (this.driverClass == null) {
            this.driverClass = driver.getClass().getName();
        }
    }
}
```

Druid's own mock driver and the ClickHouse load-balancing driver are special-cased; everything else goes through Class.forName.

Next come the exception sorter and validity checker, which are off the main storyline; skip.

Then JdbcDataSourceStat is initialized; nothing notable there.

Then comes the core:

```java
connections = new DruidConnectionHolder[maxActive];          // the pooled connections
evictConnections = new DruidConnectionHolder[maxActive];     // connections to destroy
keepAliveConnections = new DruidConnectionHolder[maxActive]; // connections to keep alive
```

Every connection in the dataSource is wrapped in a DruidConnectionHolder. What follows decides whether the connections are initialized synchronously or asynchronously; either way, this is the connection-initialization step:

```java
if (createScheduler != null && asyncInit) {
    for (int i = 0; i < initialSize; ++i) {
        submitCreateTask(true);
    }
} else if (!asyncInit) {
    // init connections
    while (poolingCount < initialSize) {
        try {
            PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection();
            DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo);
            connections[poolingCount++] = holder;
        } catch (SQLException ex) {
            LOG.error("init datasource error, url: " + this.getUrl(), ex);
            if (initExceptionThrow) {
                connectError = ex;
                break;
            } else {
                Thread.sleep(3000);
            }
        }
    }
    if (poolingCount > 0) {
        poolingPeak = poolingCount;
        poolingPeakTime = System.currentTimeMillis();
    }
}
```

The number of connections created up front is the initialSize configured in the connection string.

The core initialization method is com.alibaba.druid.pool.DruidAbstractDataSource#createPhysicalConnection(). It gathers the username and password, then performs the actual connection acquisition:

```java
public Connection createPhysicalConnection(String url, Properties info) throws SQLException {
    Connection conn;
    if (getProxyFilters().size() == 0) {
        conn = getDriver().connect(url, info);
    } else {
        conn = new FilterChainImpl(this).connection_connect(info);
    }
    createCountUpdater.incrementAndGet(this);
    return conn;
}
```

Note: if filters are configured, every operation first runs through the filter processing chain.

```java
public ConnectionProxy connection_connect(Properties info) throws SQLException {
    if (this.pos < filterSize) {
        return nextFilter()
                .connection_connect(this, info);
    }
    Driver driver = dataSource.getRawDriver();
    String url = dataSource.getRawJdbcUrl();
    Connection nativeConnection = driver.connect(url, info);
    if (nativeConnection == null) {
        return null;
    }
    return new ConnectionProxyImpl(dataSource, nativeConnection, info, dataSource.createConnectionId());
}
```
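The pos-based dispatch above is a classic chain of responsibility: each call either forwards to the next filter or, once the filters are exhausted, performs the raw operation. A generic, self-contained sketch (strings stand in for real connections, and the final branch stands in for driver.connect):

```java
import java.util.Arrays;
import java.util.List;

// Generic sketch of FilterChainImpl's dispatch: advance pos through the
// filter list; when filters run out, do the underlying operation.
public class ChainDemo {
    interface Filter {
        String connect(Chain chain, String url);
    }

    static class Chain {
        private final List<Filter> filters;
        private int pos = 0;

        Chain(List<Filter> filters) {
            this.filters = filters;
        }

        String connect(String url) {
            if (pos < filters.size()) {
                return filters.get(pos++).connect(this, url); // hand off to next filter
            }
            return "raw-connect:" + url; // stand-in for driver.connect(url, info)
        }
    }
}
```

Each filter wraps the rest of the chain, so it can run logic before and after the real call, which is how Druid's stat/log/wall filters observe every operation.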

Back in the main init flow: once the connections array is initialized, some extra threads are started:

```java
createAndLogThread();          // logs connection info
createAndStartCreatorThread(); // connection-creating thread
createAndStartDestroyThread(); // connection-destroying thread
```

The comments say what they do; their internals are covered separately later.

Then:

```java
initedLatch.await();     // wait on the init latch
init = true;             // mark initialization as complete
initedTime = new Date(); // record the time
registerMbean();         // register JMX metrics for the dataSource
```

Last of all, if keepAlive is configured:

```java
if (keepAlive) {
    // async fill to minIdle
    if (createScheduler != null) {
        for (int i = 0; i < minIdle; ++i) {
            submitCreateTask(true);
        }
    } else {
        this.emptySignal();
    }
}
```

At this point, based on the configured minimum idle count minIdle, the dataSource tops up its connections to maintain that many live ones; details later.

Core Logic of Pool Usage

First, an array serves as the container for connections. Adding and removing real connections is synchronized with a lock, and, following the producer/consumer model, conditions on that lock signal whether a connection can be taken from or added to the pool.

```java
public DruidAbstractDataSource(boolean lockFair){
    lock = new ReentrantLock(lockFair);
    notEmpty = lock.newCondition(); // not empty: connections available
    empty = lock.newCondition();    // empty: no connections
}
```

Also, by default the lock is non-fair:

```java
public DruidDataSource(){
    this(false);
}

public DruidDataSource(boolean fairLock){
    super(fairLock);
    configFromPropety(System.getProperties());
}
```
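The whole lock-plus-two-conditions protocol can be shown in miniature. A self-contained sketch (strings stand in for DruidConnectionHolder; awaitUninterruptibly keeps the sketch free of checked exceptions, where the real pool uses await and handles interruption):

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Minimal sketch of the producer/consumer protocol Druid builds on one lock
// and two conditions: takers wait on notEmpty, the creator waits on empty.
public class MiniPool {
    private final ReentrantLock lock = new ReentrantLock(false); // non-fair by default
    private final Condition notEmpty = lock.newCondition();
    private final Condition empty = lock.newCondition();
    private final String[] connections = new String[8];
    private int poolingCount = 0;

    public void put(String conn) {
        lock.lock();
        try {
            while (poolingCount == connections.length) {
                empty.awaitUninterruptibly(); // pool full: wait for a taker
            }
            connections[poolingCount++] = conn;
            notEmpty.signal(); // wake one waiting taker
        } finally {
            lock.unlock();
        }
    }

    public String takeLast() {
        lock.lock();
        try {
            while (poolingCount == 0) {
                empty.signal();                  // hint the creator: we need connections
                notEmpty.awaitUninterruptibly(); // wait for recycle/creator to add one
            }
            String last = connections[--poolingCount]; // always take from the tail
            connections[poolingCount] = null;
            return last;
        } finally {
            lock.unlock();
        }
    }

    public int poolingCount() {
        return poolingCount;
    }
}
```

Note that taking from the tail makes the pool LIFO: the most recently returned connection is handed out first, which keeps the front of the array holding the oldest (and most evictable) connections.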

Creating Connections

In the thread com.alibaba.druid.pool.DruidDataSource.CreateConnectionThread:

```java
if (emptyWait) {
    // only create a connection when some thread is waiting for one
    if (poolingCount >= notEmptyWaitThreadCount //
            && (!(keepAlive && activeCount + poolingCount < minIdle))
            && !isFailContinuous()
    ) {
        empty.await();
    }
    // prevent creating more than maxActive connections
    if (activeCount + poolingCount >= maxActive) {
        empty.await();
        continue;
    }
}
```

A connection is only created when some thread is waiting to acquire one, and the total connection count must never exceed the configured maximum.
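The two guards can be isolated as pure predicates to see when the creator parks versus proceeds (hypothetical helper; the parameters mirror the fields in the snippet above):

```java
// Sketch of the guards in CreateConnectionThread: the creator waits on empty
// unless a taker is waiting, keepAlive needs filling to minIdle, or errors
// are continuous; and it never pushes the total past maxActive.
public class CreateGuard {
    static boolean shouldWait(int poolingCount, int notEmptyWaitThreadCount,
                              boolean keepAlive, int activeCount, int minIdle,
                              boolean failContinuous) {
        return poolingCount >= notEmptyWaitThreadCount
                && !(keepAlive && activeCount + poolingCount < minIdle)
                && !failContinuous;
    }

    static boolean atCapacity(int activeCount, int poolingCount, int maxActive) {
        return activeCount + poolingCount >= maxActive;
    }
}
```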

After creating a connection, it calls notEmpty.signalAll(); to notify the consumers.

Acquiring a Connection

The outer code:

```java
@Override
public DruidPooledConnection getConnection() throws SQLException {
    return getConnection(maxWait);
}

public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {
    init();
    if (filters.size() > 0) {
        FilterChainImpl filterChain = new FilterChainImpl(this);
        return filterChain.dataSource_connect(this, maxWaitMillis);
    } else {
        return getConnectionDirect(maxWaitMillis);
    }
}
```

Ignoring the filter chain, what ultimately runs is com.alibaba.druid.pool.DruidDataSource#getConnectionDirect.

Inside the method:

```java
poolableConnection = getConnectionInternal(maxWaitMillis);
```

getConnectionInternal does one of two things:

  • when connections run short, it creates a new one directly, just like during initialization;
  • otherwise it takes one from the connections array:

```java
if (maxWait > 0) {
    holder = pollLast(nanos);
} else {
    holder = takeLast();
}
```

maxWait defaults to -1 and is configured during init:

```java
String property = properties.getProperty("druid.maxWait");
if (property != null && property.length() > 0) {
    try {
        int value = Integer.parseInt(property);
        this.setMaxWait(value);
    } catch (NumberFormatException e) {
        LOG.error("illegal property 'druid.maxWait'", e);
    }
}
```

This controls whether acquiring a connection waits at most that long. By default it does not, i.e. the caller waits until a connection is obtained.
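The timed variant, pollLast, is built on Condition.awaitNanos, which returns the remaining wait budget, so the loop keeps a shrinking deadline and gives up once the budget is spent. A self-contained sketch of that deadline loop (simplified: no creator-thread signalling, strings in place of holders):

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Sketch of the pollLast pattern: unlike takeLast, which waits forever,
// the timed poll returns null once the wait budget is exhausted.
public class TimedPollDemo {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final String[] connections = new String[8];
    private int poolingCount = 0;

    public String pollLast(long maxWaitMillis) throws InterruptedException {
        long estimate = TimeUnit.MILLISECONDS.toNanos(maxWaitMillis);
        lock.lock();
        try {
            while (poolingCount == 0) {
                if (estimate <= 0) {
                    return null; // budget spent: timed out without a connection
                }
                estimate = notEmpty.awaitNanos(estimate); // returns time left
            }
            String last = connections[--poolingCount];
            connections[poolingCount] = null;
            return last;
        } finally {
            lock.unlock();
        }
    }
}
```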

Let's look directly at the blocking take:

```java
DruidConnectionHolder takeLast() throws InterruptedException, SQLException {
    try {
        // no connections left
        while (poolingCount == 0) {
            // hint the creator thread that we are out of connections
            emptySignal(); // send signal to CreateThread create connection
            if (failFast && isFailContinuous()) {
                throw new DataSourceNotAvailableException(createError);
            }
            notEmptyWaitThreadCount++;
            if (notEmptyWaitThreadCount > notEmptyWaitThreadPeak) {
                notEmptyWaitThreadPeak = notEmptyWaitThreadCount;
            }
            try {
                // wait until creation or recycling produces some connections
                notEmpty.await(); // signal by recycle or creator
            } finally {
                notEmptyWaitThreadCount--;
            }
            notEmptyWaitCount++;
            if (!enable) {
                connectErrorCountUpdater.incrementAndGet(this);
                if (disableException != null) {
                    throw disableException;
                }
                throw new DataSourceDisableException();
            }
        }
    } catch (InterruptedException ie) {
        notEmpty.signal(); // propagate to non-interrupted thread
        notEmptySignalCount++;
        throw ie;
    }
    // take the last connection in the array
    decrementPoolingCount();
    DruidConnectionHolder last = connections[poolingCount];
    connections[poolingCount] = null;
    return last;
}
```

Reclaiming Connections

```java
protected void createAndStartDestroyThread() {
    destroyTask = new DestroyTask();
    // a custom destroy scheduler, useful when there are very many connections
    if (destroyScheduler != null) {
        long period = timeBetweenEvictionRunsMillis;
        if (period <= 0) {
            period = 1000;
        }
        destroySchedulerFuture = destroyScheduler.scheduleAtFixedRate(destroyTask, period, period,
                TimeUnit.MILLISECONDS);
        initedLatch.countDown();
        return;
    }
    String threadName = "Druid-ConnectionPool-Destroy-" + System.identityHashCode(this);
    // single-threaded destruction
    destroyConnectionThread = new DestroyConnectionThread(threadName);
    destroyConnectionThread.start();
}
```

The actual destruction:

```java
public class DestroyTask implements Runnable {
    public DestroyTask() {
    }

    @Override
    public void run() {
        shrink(true, keepAlive);
        if (isRemoveAbandoned()) {
            removeAbandoned();
        }
    }
}
```

It ultimately executes the shrink method.

```java
public void shrink(boolean checkTime, boolean keepAlive) {
    try {
        lock.lockInterruptibly();
    } catch (InterruptedException e) {
        return;
    }
    boolean needFill = false;
    int evictCount = 0;
    int keepAliveCount = 0;
    int fatalErrorIncrement = fatalErrorCount - fatalErrorCountLastShrink;
    fatalErrorCountLastShrink = fatalErrorCount;
    try {
        if (!inited) {
            return;
        }
        final int checkCount = poolingCount - minIdle; // number of connections to check
        final long currentTimeMillis = System.currentTimeMillis();
        for (int i = 0; i < poolingCount; ++i) { // inspect the current connections array
            DruidConnectionHolder connection = connections[i];
            if ((onFatalError || fatalErrorIncrement > 0) && (lastFatalErrorTimeMillis > connection.connectTimeMillis)) {
                keepAliveConnections[keepAliveCount++] = connection;
                continue;
            }
            if (checkTime) {
                // if a physical-connection timeout phyTimeoutMillis is set and the
                // connection has lived longer than that, move it to evictConnections
                if (phyTimeoutMillis > 0) {
                    long phyConnectTimeMillis = currentTimeMillis - connection.connectTimeMillis;
                    if (phyConnectTimeMillis > phyTimeoutMillis) {
                        evictConnections[evictCount++] = connection;
                        continue;
                    }
                }
                long idleMillis = currentTimeMillis - connection.lastActiveTimeMillis; // idle time
                // if a connection has been idle less than minEvictableIdleTimeMillis,
                // the remaining (newer) connections need no further checking
                if (idleMillis < minEvictableIdleTimeMillis
                        && idleMillis < keepAliveBetweenTimeMillis
                ) {
                    break;
                }
                if (idleMillis >= minEvictableIdleTimeMillis) {
                    // check checkTime is silly code
                    // track how many connections have been checked
                    if (checkTime && i < checkCount) {
                        // timed out
                        evictConnections[evictCount++] = connection;
                        continue;
                    } else if (idleMillis > maxEvictableIdleTimeMillis) {
                        // timed out
                        evictConnections[evictCount++] = connection;
                        continue;
                    }
                }
                if (keepAlive && idleMillis >= keepAliveBetweenTimeMillis) {
                    // keepAlive configured and due for a check: put it in the keepAlive array
                    keepAliveConnections[keepAliveCount++] = connection;
                }
            } else {
                // no time check needed: remove directly
                if (i < checkCount) {
                    evictConnections[evictCount++] = connection;
                } else {
                    break;
                }
            }
        }
        int removeCount = evictCount + keepAliveCount; // how many were removed
        // connections are always taken from the tail, so the tail holds the newest
        // ones; only the front can expire, hence only the front needs shifting
        if (removeCount > 0) {
            System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount);
            Arrays.fill(connections, poolingCount - removeCount, poolingCount, null);
            poolingCount -= removeCount;
        }
        keepAliveCheckCount += keepAliveCount;
        if (keepAlive && poolingCount + activeCount < minIdle) {
            // below the minimum live-connection count: need to create more
            needFill = true;
        }
    } finally {
        lock.unlock();
    }
    if (evictCount > 0) {
        for (int i = 0; i < evictCount; ++i) {
            // destroy the connection
            DruidConnectionHolder item = evictConnections[i];
            Connection connection = item.getConnection();
            JdbcUtils.close(connection);
            destroyCountUpdater.incrementAndGet(this);
        }
        Arrays.fill(evictConnections, null);
    }
    if (keepAliveCount > 0) {
        // keep order
        for (int i = keepAliveCount - 1; i >= 0; --i) {
            DruidConnectionHolder holer = keepAliveConnections[i];
            Connection connection = holer.getConnection();
            holer.incrementKeepAliveCheckCount();
            boolean validate = false;
            try {
                this.validateConnection(connection);
                validate = true;
            } catch (Throwable error) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("keepAliveErr", error);
                }
                // skip
            }
            boolean discard = !validate; // failed validation
            if (validate) {
                // passed the keep-alive check: update the timestamp
                holer.lastKeepTimeMillis = System.currentTimeMillis();
                // and try to put it back into the connections array
                boolean putOk = put(holer, 0L, true);
                if (!putOk) {
                    // could not be put back: mark it for discard
                    discard = true;
                }
            }
            if (discard) {
                try {
                    connection.close();
                } catch (Exception e) {
                    // skip
                }
                lock.lock();
                try {
                    discardCount++;
                    if (activeCount + poolingCount <= minIdle) {
                        // signal the creator thread to create connections
                        emptySignal();
                    }
                } finally {
                    lock.unlock();
                }
            }
        }
        this.getDataSourceStat().addKeepAliveCheckCount(keepAliveCount);
        Arrays.fill(keepAliveConnections, null);
    }
    if (needFill) {
        // time to create again
        lock.lock();
        try {
            int fillCount = minIdle - (activeCount + poolingCount + createTaskCount);
            for (int i = 0; i < fillCount; ++i) {
                emptySignal();
            }
        } finally {
            lock.unlock();
        }
    } else if (onFatalError || fatalErrorIncrement > 0) {
        lock.lock();
        try {
            emptySignal();
        } finally {
            lock.unlock();
        }
    }
}
```

The scratch arrays evictConnections and keepAliveConnections are nulled out as soon as they have served their purpose: seasoned utility workers.

With that round of work, the connections array gets a thorough cleanup.
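The compaction step at the heart of shrink (evicted holders sit at the front of the array, so the surviving tail shifts down and the freed slots are nulled) can be isolated as a tiny helper (hypothetical, with strings standing in for holders):

```java
import java.util.Arrays;

// Sketch of shrink's compaction: drop the first removeCount slots (the
// oldest connections), shift the survivors to the front, null the rest.
public class ShrinkCompact {
    static int compact(String[] connections, int poolingCount, int removeCount) {
        if (removeCount > 0) {
            System.arraycopy(connections, removeCount, connections, 0, poolingCount - removeCount);
            Arrays.fill(connections, poolingCount - removeCount, poolingCount, null);
            poolingCount -= removeCount;
        }
        return poolingCount; // new pooled-connection count
    }
}
```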

Summary

  • Only the core logic is covered here; much of the validate, checker, and filter code is omitted.
  • The Druid pool source contains many other useful tools: database driver utilities, JDBC utilities, a SQL parser/AST, iBATIS support, wall filtering, multiple data sources...
  • The latest code also appears to support a ZooKeeper-based high-availability scheme; if I end up using it, I will continue this source-analysis series.

That concludes this analysis of DruidDataSource, the core of Druid. For more material on Druid's DruidDataSource, please check out our other related articles!
