一分钟掌握Java Quartz定时任务

前言

前几篇介绍了单体架构的定时任务解决方式,但是现代软件架构由于业务复杂度高,业务的耦合性太强,已经由单体架构拆分成了分布式架构。因此,定时任务的架构也随之修改。而Quartz是分布式定时任务解决方案中使用简单,结构清晰,且不依赖第三方分布式调度中间件的。上车,mars酱带你车里细说~

角色介绍

Quartz入门使用的角色不多,三个角色足够,分别是:

Scheduler:调度器。用来负责任务的调度;

Job:任务。这是一个接口,业务代码继承Job接口并实现它的execute方法,是业务执行的主体部分;

Trigger: 触发器。也是个接口,有两个触发器比较关键,一个是SimpleTrigger,另一个是CronTrigger。前者支持简单的定时,比如:按时、按秒等;后者直接支持cron表达式。下面我们从官方的源代码入手,看看Quartz如何做到分布式的。

官方例子

官方源代码down下来之后,有个examples文件夹:

-1

example1是入门级中最简单的。就两个Java文件,一个HelloJob:

  1. package org.quartz.examples.example1;
  2. import java.util.Date;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import org.quartz.Job;
  6. import org.quartz.JobExecutionContext;
  7. import org.quartz.JobExecutionException;
  8. /**
  9.      * <p>
  10.      * This is just a simple job that says “Hello” to the world.
  11.      * </p>
  12.      *
  13.      * @author Bill Kratzer
  14.      */
  15. public class HelloJob implements Job {
  16.      private static Logger _log = LoggerFactory.getLogger(HelloJob.class);
  17.      /**
  18.          * <p>
  19.          * Empty constructor for job initilization
  20.          * </p>
  21.          * <p>
  22.          * Quartz requires a public empty constructor so that the
  23.          * scheduler can instantiate the class whenever it needs.
  24.          * </p>
  25.          */
  26.      public HelloJob() {
  27.      }
  28.      /**
  29.          * <p>
  30.          * Called by the <code>{@link org.quartz.Scheduler}</code> when a
  31.          * <code>{@link org.quartz.Trigger}</code> fires that is associated with
  32.          * the <code>Job</code>.
  33.          * </p>
  34.          *
  35.          * @throws JobExecutionException
  36.          * if there is an exception while executing the job.
  37.          */
  38.      public void execute(JobExecutionContext context)
  39.          throws JobExecutionException {
  40.          // Say Hello to the World and display the date/time
  41.          _log.info(“Hello World! – “ + new Date());
  42.      }
  43. }

另一个SimpleExample:

  1.      package org.quartz.examples.example1;
  2.      import org.quartz.JobDetail;
  3.      import org.quartz.Scheduler;
  4.      import org.quartz.SchedulerFactory;
  5.      import org.quartz.Trigger;
  6.      import org.quartz.impl.StdSchedulerFactory;
  7.      import org.slf4j.Logger;
  8.      import org.slf4j.LoggerFactory;
  9.      import java.util.Date;
  10.      import static org.quartz.DateBuilder.evenMinuteDate;
  11.      import static org.quartz.JobBuilder.newJob;
  12.      import static org.quartz.TriggerBuilder.newTrigger;
  13.      /**
  14.      * This Example will demonstrate how to start and shutdown the Quartz scheduler and how to schedule a job to run in
  15.      * Quartz.
  16.      *
  17.      * @author Bill Kratzer
  18.      */
  19.      public class SimpleExample {
  20.          public void run() throws Exception {
  21.              Logger log = LoggerFactory.getLogger(SimpleExample.class);
  22.              log.info(“——- Initializing ———————-“);
  23.              // 1. 创建一个scheduler
  24.              SchedulerFactory sf = new StdSchedulerFactory();
  25.              Scheduler sched = sf.getScheduler();
  26.              log.info(“——- Initialization Complete ———–“);
  27.              // computer a time that is on the next round minute
  28.              Date runTime = evenMinuteDate(new Date());
  29.              log.info(“——- Scheduling Job ——————-“);
  30.              // 2. 指定一个job
  31.              JobDetail job = newJob(HelloJob.class).withIdentity(“job1”, “group1”).build();
  32.              // 3. 指定一个trigger
  33.              Trigger trigger = newTrigger().withIdentity(“trigger1”, “group1”).startAt(runTime).build();
  34.              // 4. 绑定job和trigger
  35.              sched.scheduleJob(job, trigger);
  36.              log.info(job.getKey() + ” will run at: “ + runTime);
  37.              // 5. 执行
  38.              sched.start();
  39.              log.info(“——- Started Scheduler —————–“);
  40.              // wait long enough so that the scheduler as an opportunity to
  41.              // run the job!
  42.              log.info(“——- Waiting 65 seconds… ————-“);
  43.              try {
  44.                  // wait 65 seconds to show job
  45.                  Thread.sleep(65L * 1000L);
  46.                  // executing…
  47.              } catch (Exception e) {
  48.                  //
  49.              }
  50.              // shut down the scheduler
  51.              log.info(“——- Shutting Down ———————“);
  52.              sched.shutdown(true);
  53.              log.info(“——- Shutdown Complete —————–“);
  54.          }
  55.          public static void main(String[] args) throws Exception {
  56.              SimpleExample example = new SimpleExample();
  57.              example.run();
  58.          }
  59.      }

整个SimpleExample只有五个步骤:

  • 创建Scheduler,这是一个调度器,例子中使用调度器工厂来创建一个调度器;
  • 创建一个Job。实际上Job就是那个HelloJob,但是这里把HelloJob丢给了JobDetail对象,Job接口本身只有一个execute函数,没有其他的属性了,如果需要附加其他属性,JobDetail就支持,比如我们需要往Job中传递参数,JobDetail中提供了一个JobDataMap。当Job在运行的时候,execute函数里面的就能获取到JobDetail对象,并将设置的数据传递给Job接口的实现;
  • 创建一个Trigger。Trigger对象主责是任务的执行时间,比如官方例子中的startAt函数,就指定了具体的运行时间,还有startNow(立即执行);
  • 用scheduler绑定Job和Trigger;
  • 执行scheduler。

Quartz的使用是不是简单又清晰?Job是任务,单一职责,不做任何其他事情。Trigger负责执行的频率等等属性。Scheduler负责按照Trigger的规则去执行Job的内容。各自部分的功能符合单一原则。

但是,到这里都不是分布式的方式,依然是单体架构的。那么,Quartz如何做到分布式的呢?

Quartz如何分布式?

Quartz的分布式实现方式并不依赖其他分布式协调管理中间件完成,而是使用数据锁来实现。使用数据做协调管理中间件的唯一的前提是:需要把集群的每台机器时间校对一致。

Quartz数据库核心表如下:

表名 功能描述
QRTZ_CALENDARS 存储Quartz的Calendar信息
QRTZ_CRON_TRIGGERS 存储CronTrigger,包括Cron表达式和时区信息
QRTZ_FIRED_TRIGGERS 存储与已触发的Trigger相关的状态信息,以及相联Job的执行信息
QRTZ_PAUSED_TRIGGER_GRPS 存储已暂停的Trigger组的信息
QRTZ_SCHEDULER_STATE 存储少量的有关Scheduler的状态信息,和别的Scheduler实例
QRTZ_LOCKS 存储程序的悲观锁的信息
QRTZ_JOB_DETAILS 存储每一个已配置的Job的详细信息
QRTZ_JOB_LISTENERS 存储有关已配置的JobListener的信息
QRTZ_SIMPLE_TRIGGERS 存储简单的Trigger,包括重复次数、间隔、以及已触的次数
QRTZ_BLOG_TRIGGERS Trigger作为Blob类型存储
QRTZ_TRIGGER_LISTENERS 存储已配置的TriggerListener的信息
QRTZ_TRIGGERS 存储已配置的Trigger的信息

字体加粗的QRTZ_LOCKS表是一个悲观锁的存储实现,Quartz认为每条记录都可能会产生并发冲突。以上表的SQL可以在quartz目录中找到:

-2

找到自己喜欢的数据库品牌,并创建好表即可。

跟着官方例子看源码

我们从Hello的execute方法开始,反着找,继续看看分布式的方式如何实现。为什么反着找呢?因为这里是我们业务实现的主体内容,Quartz框架最终必须要调用到这个execute的具体实现的。我们找到调用execute的地方有很多处:

-3

从类名来分析,DirectoryScanJob看着是目录扫描任务,FileScanJob直译是文件扫描任务,SendMailJob是发送邮件任务,最后只剩那个JobRunShell,毕竟翻译过来叫“任务运行の核心”啊。进入JobRunShell,找到调用execute函数的部分,execute函数被包裹在一个一百三十多行长又长的run函数中:

  1. public void run() {
  2.      qs.addInternalSchedulerListener(this);
  3.      try {
  4.          // …省略很多源代码
  5.          do {
  6.              // …省略很多源代码
  7.              try {
  8.                  begin();
  9.              } catch (SchedulerException se) {
  10.                  // … 省略源代码
  11.              }
  12.              // … 省略源代码
  13.              try {
  14.                  log.debug(“Calling execute on job “ + jobDetail.getKey());
  15.                  // 这里负责执行job的execute函数
  16.                  job.execute(jec);
  17.                  endTime = System.currentTimeMillis();
  18.              } catch (JobExecutionException jee) {
  19.                  // … 省略源代码
  20.              } catch (Throwable e) {
  21.                  // … 省略源代码
  22.              }
  23.              // …省略很多源代码
  24.              try {
  25.                  complete(true);
  26.              } catch (SchedulerException se) {
  27.                  // … 省略源代码
  28.              }
  29.              // …省略很多源代码
  30.          } while (true);
  31.      } finally {
  32.          qs.removeInternalSchedulerListener(this);
  33.      }
  34. }

可以看到run中间的execute被夹在一个begin函数和comlete函数中,而begin和complete的实现是一个基于JTA事务的JTAJobRunShell的实现来完成的。JobRunShell是一个Runnable接口的实现,那么刚刚的run方法,必定在某处启用了线程(池)的start方法。

mars酱继续跟踪查找源代码,在QuartzSchedulerThread中的run函数中,找到JobRunShell的调用部分:

  1. @Override
  2. public void run() {
  3.      int acquiresFailed = 0;
  4.      while (!halted.get()) {
  5.          // …省略很多源代码
  6.          // 源代码279行
  7.          int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
  8.          // …省略很多源代码
  9.          if(availThreadCount > 0) {
  10.              // …省略很多源代码
  11.              // 取下一个trigger,周期是30秒取一次
  12.              triggers = qsRsrcs.getJobStore().acquireNextTriggers(
  13.                  now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBATchSize()), qsRsrcs.getBatchTimeWindow());
  14.              // …省略很多源代码
  15.              // 触发trigger
  16.              List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
  17.              // …省略很多源代码
  18.              // 释放trigger,当bndle的结果是null就释放trigger
  19.              if (bndle == null) {
  20.                  qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
  21.                  continue;
  22.              }
  23.              // …省略很多源代码
  24.              JobRunShell shell = null;
  25.              try {
  26.                  shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
  27.                  shell.initialize(qs);
  28.              } catch (SchedulerException se) {
  29.                  qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
  30.                  continue;
  31.              }
  32.              // 这里调用JobRunShell
  33.              if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
  34.                  // …省略很多源代码
  35.              }
  36.          }
  37.      }
  38. }

QuartzSchedulerThread的run函数就是核心处理流程了,qsRsrcs.getThreadPool().runInThread(shell)内部就根据具体的SimpleThreadPool或者ZeroSizeThreadPool来执行run函数,while循环基本就是不停的在轮询不断的去拿trigger,然后判断trigger的时间是不是到了,再按照时间trigger的时间规则执行Job,最后再标记为完成、释放trigger。

Trigger的处理

上面的逻辑都是通过qsRsrcs.getJobStore()得到的对象去处理Trigger的,返回对象是JobStore。任意查看qsRsrcs.getJobStore()调用的函数,比如:releaseAcquiredTriggerJobStore,它的实现有两个是比较重要的:一个是RAMJobStore,一个是JobStoreSupport。前者是RAM作为存储介质,作者还特意写上了这样一段注释:

This class implements a JobStore that utilizes RAM as its storage device.

As you should know, the ramification of this is that Access is extrememly fast, but the data is completely volatile – therefore this JobStore should not be used if true persistence between program shutdowns is required.

这段英文的央视翻译:

这个类实现了一个使用RAM作为存储设备的JobStore。

您应该知道,这样做的后果是访问速度非常快,但是数据是完全不稳定的——因此,如果需要在程序关闭之间实现真正的持久性,则不应该使用这个JobStore。

而且内存存储也无法分布式处理吧?所以,mars酱选择了观看JobStoreSupport:

-4

从import可以知道,这个玩意是连接了数据库的,所以呢,acquireNextTriggers、triggersFired、releaseAcquiredTrigger这些方法负责具体trigger的相关操作,都最终会执行到JobStoreSupport的第3844行的executeInNonManagedTXLock函数:

  1.      /**
  2.          * Execute the given callback having optionally acquired the given lock.
  3.          * This uses the non-managed transaction connection.
  4.          *
  5.          * @param lockName The name of the lock to acquire, for example
  6.          * “TRIGGER_ACCESS”. If null, then no lock is acquired, but the
  7.          * lockCallback is still executed in a non-managed transaction.
  8.          */
  9.      protected <T> T executeInNonManagedTXLock(
  10.              String lockName,
  11.              TransactionCallback<T> txCallback, final TransactionValidator<T> txValidator) throws JobPersistenceException {
  12.          boolean transOwner = false;
  13.          Connection conn = null;
  14.          try {
  15.              if (lockName != null) {
  16.                  // If we aren’t using db locks, then delay getting DB connection
  17.                  // until after acquiring the lock since it isn’t needed.
  18.                  if (getLockHandler().requiresConnection()) {
  19.                      conn = getNonManagedTXConnection();
  20.                  }
  21.                  transOwner = getLockHandler().obtainLock(conn, lockName);
  22.              }
  23.              if (conn == null) {
  24.                  conn = getNonManagedTXConnection();
  25.              }
  26.              final T result = txCallback.execute(conn);
  27.              try {
  28.                  commitConnection(conn);
  29.              } catch (JobPersistenceException e) {
  30.                  rollbackConnection(conn);
  31.                  if (txValidator == null || !retryExecuteInNonManagedTXLock(lockName, new TransactionCallback<Boolean>() {
  32.                      @Override
  33.                      public Boolean execute(Connection conn) throws JobPersistenceException {
  34.                      return txValidator.validate(conn, result);
  35.                      }
  36.                  })) {
  37.                      throw e;
  38.                  }
  39.              }
  40.              Long sigTime = clearAndGetSignalSchedulingChangeOnTxCompletion();
  41.              if(sigTime != null && sigTime >= 0) {
  42.                  signalSchedulingChangeImmediately(sigTime);
  43.              }
  44.              return result;
  45.          } catch (JobPersistenceException e) {
  46.              rollbackConnection(conn);
  47.              throw e;
  48.          } catch (RuntimeException e) {
  49.              rollbackConnection(conn);
  50.              throw new JobPersistenceException(“Unexpected runtime exception: “
  51.                      + e.getMessage(), e);
  52.          } finally {
  53.              try {
  54.                  releaseLock(lockName, transOwner);
  55.              } finally {
  56.                  cleanupConnection(conn);
  57.              }
  58.          }
  59.      }

整体的过程简要说明就是:获取数据库连接,给需要执行的trigger加锁,处理完之后再释放锁。

结合起来

结合前面的流程来看,一个调度器在执行前如果涉及到分布式的情况,流程如下:

  • 首先要获取QUARTZ_LOCKS表中对应的锁(在executeInNonManagedTXLock函数的getLockHandler().obtainLock(conn, lockName)中);
  • 获取锁后执行QuartzSchedulerThread中的JobRunShell,完成任务的执行;
  • 最后QuartzSchedulerThread中调用triggeredJobComplete函数,锁被释放,在executeInNonManagedTXLock函数的releaseLock(lockName, transOwner)中执行;

集群中的每一个调度器实例都遵循这样的操作流程。

总结

Quartz 是一款用于分布式系统的高性能调度框架,它采用了数据库作为分布式锁机制来保证同一时刻只有一个 Scheduler 实例访问数据库中的 Trigger。

在 Quartz 中,调度器线程会争抢访问数据库中的 Trigger,以确保在同一时刻只有一个调度器线程执行某个 Trigger 的操作。如果有多个调度器线程同时尝试访问同一个 Trigger,它们会相互等待对方释放锁。当一个调度器线程获得了锁,它就可以访问数据库并执行相应的操作。

另外,Quartz 还采用了悲观锁的策略来避免死锁的发生。当一个调度器线程尝试获取锁时,如果锁已经被其他线程占用,那么这个线程会等待,直到有线程释放了锁。如果在等待过程中没有其他线程释放锁,那么这个线程就会一直等待下去,直到调度器重新分配了锁。

总之,Quartz 的分布式调度原理是通过数据库锁和悲观锁来实现的,以保证同一时刻只有一个调度器线程访问数据库中的 Trigger,从而提高系统的性能和可靠性。

以上就是一分钟掌握Java Quartz定时任务的详细内容,更多关于Java Quartz定时任务的资料请关注我们其它相关文章!

标签

发表评论