新聞中心
本文轉(zhuǎn)載自微信公眾號(hào)「Java極客技術(shù)」,作者鴨血粉絲。轉(zhuǎn)載本文請(qǐng)聯(lián)系Java極客技術(shù)公眾號(hào)。

在潼南等地區(qū),都構(gòu)建了全面的區(qū)域性戰(zhàn)略布局,加強(qiáng)發(fā)展的系統(tǒng)性、市場前瞻性、產(chǎn)品創(chuàng)新能力,以專注、極致的服務(wù)理念,為客戶提供做網(wǎng)站、網(wǎng)站設(shè)計(jì) 網(wǎng)站設(shè)計(jì)制作定制制作,公司網(wǎng)站建設(shè),企業(yè)網(wǎng)站建設(shè),成都品牌網(wǎng)站建設(shè),成都營銷網(wǎng)站建設(shè),外貿(mào)網(wǎng)站建設(shè),潼南網(wǎng)站建設(shè)費(fèi)用合理。
一、摘要
在很多業(yè)務(wù)的系統(tǒng)中,我們常常需要定時(shí)的執(zhí)行一些任務(wù),例如定時(shí)發(fā)短信、定時(shí)變更數(shù)據(jù)、定時(shí)發(fā)起促銷活動(dòng)等等。
在上篇文章中,我們簡單的介紹了定時(shí)任務(wù)的使用方式,不同的架構(gòu)對(duì)應(yīng)的解決方案也有所不同,總結(jié)起來主要分單機(jī)和分布式兩大類,本文會(huì)重點(diǎn)分析下單機(jī)的定時(shí)任務(wù)實(shí)現(xiàn)原理以及優(yōu)缺點(diǎn),分布式框架的實(shí)現(xiàn)原理會(huì)在后續(xù)文章中進(jìn)行分析。
從單機(jī)角度,定時(shí)任務(wù)實(shí)現(xiàn)主要有以下 3 種方案:
- while + sleep 組合
- 最小堆實(shí)現(xiàn)
- 時(shí)間輪實(shí)現(xiàn)
二、while+sleep組合
while+sleep 方案,簡單的說,就是定義一個(gè)線程,然后 while 循環(huán),通過 sleep 延遲時(shí)間來達(dá)到周期性調(diào)度任務(wù)。
簡單示例如下:
- public static void main(String[] args) {
- final long timeInterval = 5000;
- new Thread(new Runnable() {
- @Override
- public void run() {
- while (true) {
- System.out.println(Thread.currentThread().getName() + "每隔5秒執(zhí)行一次");
- try {
- Thread.sleep(timeInterval);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }).start();
- }
實(shí)現(xiàn)上非常簡單,如果我們想在創(chuàng)建一個(gè)每隔3秒鐘執(zhí)行一次任務(wù),怎么辦呢?
同樣的,也可以在創(chuàng)建一個(gè)線程,然后間隔性的調(diào)度方法;但是如果創(chuàng)建了大量這種類型的線程,這個(gè)時(shí)候會(huì)發(fā)現(xiàn)大量的定時(shí)任務(wù)線程在調(diào)度切換時(shí)性能消耗會(huì)非常大,而且整體效率低!
面對(duì)這種在情況,大佬們也想到了,于是想出了用一個(gè)線程將所有的定時(shí)任務(wù)存起來,事先排好序,按照一定的規(guī)則來調(diào)度,這樣不就可以極大的減少每個(gè)線程的切換消耗嗎?
正因此,JDK 中的 Timer 定時(shí)器由此誕生了!
三、最小堆實(shí)現(xiàn)
所謂最小堆方案,正如我們上面所說的,每當(dāng)有新任務(wù)加入的時(shí)候,會(huì)把需要即將要執(zhí)行的任務(wù)排到前面,同時(shí)會(huì)有一個(gè)線程不斷的輪詢判斷,如果當(dāng)前某個(gè)任務(wù)已經(jīng)到達(dá)執(zhí)行時(shí)間點(diǎn),就會(huì)立即執(zhí)行,具體實(shí)現(xiàn)代表就是 JDK 中的 Timer 定時(shí)器!
3.1、Timer
首先我們來一個(gè)簡單的 Timer 定時(shí)器例子
- public static void main(String[] args) {
- Timer timer = new Timer();
- //每隔1秒調(diào)用一次
- timer.schedule(new TimerTask() {
- @Override
- public void run() {
- System.out.println("test1");
- }
- }, 1000, 1000);
- //每隔3秒調(diào)用一次
- timer.schedule(new TimerTask() {
- @Override
- public void run() {
- System.out.println("test2");
- }
- }, 3000, 3000);
- }
實(shí)現(xiàn)上,好像跟我們上面介紹的 while+sleep 方案差不多,同樣也是起一個(gè)TimerTask線程任務(wù),只不過共用一個(gè)Timer調(diào)度器。
下面我們一起來打開源碼看看里面到底有些啥!
- 進(jìn)入Timer.schedule()方法
從方法上可以看出,這里主要做參數(shù)驗(yàn)證,其中TimerTask是一個(gè)線程任務(wù),delay表示延遲多久執(zhí)行(單位毫秒),period表示多久執(zhí)行一次(單位毫秒)
- public void schedule(TimerTask task, long delay, long period) {
- if (delay < 0)
- throw new IllegalArgumentException("Negative delay.");
- if (period <= 0)
- throw new IllegalArgumentException("Non-positive period.");
- sched(task, System.currentTimeMillis()+delay, -period);
- }
- 接著看sched()方法
這步操作中,可以很清晰的看到,在同步代碼塊里,會(huì)將task對(duì)象加入到queue
- private void sched(TimerTask task, long time, long period) {
- if (time < 0)
- throw new IllegalArgumentException("Illegal execution time.");
- // Constrain value of period sufficiently to prevent numeric
- // overflow while still being effectively infinitely large.
- if (Math.abs(period) > (Long.MAX_VALUE >> 1))
- period >>= 1;
- synchronized(queue) {
- if (!thread.newTasksMayBeScheduled)
- throw new IllegalStateException("Timer already cancelled.");
- synchronized(task.lock) {
- if (task.state != TimerTask.VIRGIN)
- throw new IllegalStateException(
- "Task already scheduled or cancelled");
- task.nextExecutionTime = time;
- task.period = period;
- task.state = TimerTask.SCHEDULED;
- }
- queue.add(task);
- if (queue.getMin() == task)
- queue.notify();
- }
- }
- 我們繼續(xù)來看queue對(duì)象
任務(wù)會(huì)將入到TaskQueue隊(duì)列中,同時(shí)在Timer初始化階段會(huì)將TaskQueue作為參數(shù)傳入到TimerThread線程中,并且起到線程
- public class Timer {
- private final TaskQueue queue = new TaskQueue();
- private final TimerThread thread = new TimerThread(queue);
- public Timer() {
- this("Timer-" + serialNumber());
- }
- public Timer(String name) {
- thread.setName(name);
- thread.start();
- }
- //...
- }
- 而TaskQueue其實(shí)是一個(gè)最小堆的數(shù)據(jù)實(shí)體類,源碼如下
每當(dāng)有新元素加入的時(shí)候,會(huì)對(duì)原來的數(shù)組進(jìn)行重排,會(huì)將即將要執(zhí)行的任務(wù)排在數(shù)組的前面
- class TaskQueue {
- private TimerTask[] queue = new TimerTask[128];
- private int size = 0;
- void add(TimerTask task) {
- // Grow backing store if necessary
- if (size + 1 == queue.length)
- queue = Arrays.copyOf(queue, 2*queue.length);
- queue[++size] = task;
- fixUp(size);
- }
- private void fixUp(int k) {
- while (k > 1) {
- int j = k >> 1;
- if (queue[j].nextExecutionTime <= queue[k].nextExecutionTime)
- break;
- TimerTask tmp = queue[j];
- queue[j] = queue[k];
- queue[k] = tmp;
- k = j;
- }
- }
- //....
- }
- 最后我們來看看TimerThread
TimerThread其實(shí)就是一個(gè)任務(wù)調(diào)度線程,首先從TaskQueue里面獲取排在最前面的任務(wù),然后判斷它是否到達(dá)任務(wù)執(zhí)行時(shí)間點(diǎn),如果已到達(dá),就會(huì)立刻執(zhí)行任務(wù)
- class TimerThread extends Thread {
- boolean newTasksMayBeScheduled = true;
- private TaskQueue queue;
- TimerThread(TaskQueue queue) {
- this.queue = queue;
- }
- public void run() {
- try {
- mainLoop();
- } finally {
- // Someone killed this Thread, behave as if Timer cancelled
- synchronized(queue) {
- newTasksMayBeScheduled = false;
- queue.clear(); // Eliminate obsolete references
- }
- }
- }
- /**
- * The main timer loop. (See class comment.)
- */
- private void mainLoop() {
- while (true) {
- try {
- TimerTask task;
- boolean taskFired;
- synchronized(queue) {
- // Wait for queue to become non-empty
- while (queue.isEmpty() && newTasksMayBeScheduled)
- queue.wait();
- if (queue.isEmpty())
- break; // Queue is empty and will forever remain; die
- // Queue nonempty; look at first evt and do the right thing
- long currentTime, executionTime;
- task = queue.getMin();
- synchronized(task.lock) {
- if (task.state == TimerTask.CANCELLED) {
- queue.removeMin();
- continue; // No action required, poll queue again
- }
- currentTime = System.currentTimeMillis();
- executionTime = task.nextExecutionTime;
- if (taskFired = (executionTime<=currentTime)) {
- if (task.period == 0) { // Non-repeating, remove
- queue.removeMin();
- task.state = TimerTask.EXECUTED;
- } else { // Repeating task, reschedule
- queue.rescheduleMin(
- task.period<0 ? currentTime - task.period
- : executionTime + task.period);
- }
- }
- }
- if (!taskFired) // Task hasn't yet fired; wait
- queue.wait(executionTime - currentTime);
- }
- if (taskFired) // Task fired; run it, holding no locks
- task.run();
- } catch(InterruptedException e) {
- }
- }
- }
- }
總結(jié)這個(gè)利用最小堆實(shí)現(xiàn)的方案,相比 while + sleep 方案,多了一個(gè)線程來管理所有的任務(wù),優(yōu)點(diǎn)就是減少了線程之間的性能開銷,提升了執(zhí)行效率;但是同樣也帶來的了一些缺點(diǎn),整體的新加任務(wù)寫入效率變成了 O(log(n))。
同時(shí),細(xì)心的發(fā)現(xiàn),這個(gè)方案還有以下幾個(gè)缺點(diǎn):
- 串行阻塞:調(diào)度線程只有一個(gè),長任務(wù)會(huì)阻塞短任務(wù)的執(zhí)行,例如,A任務(wù)跑了一分鐘,B任務(wù)至少需要等1分鐘才能跑
- 容錯(cuò)能力差:沒有異常處理能力,一旦一個(gè)任務(wù)執(zhí)行故障,后續(xù)任務(wù)都無法執(zhí)行
3.2、ScheduledThreadPoolExecutor
鑒于 Timer 的上述缺陷,從 Java 5 開始,推出了基于線程池設(shè)計(jì)的 ScheduledThreadPoolExecutor 。
其設(shè)計(jì)思想是,每一個(gè)被調(diào)度的任務(wù)都會(huì)由線程池來管理執(zhí)行,因此任務(wù)是并發(fā)執(zhí)行的,相互之間不會(huì)受到干擾。需要注意的是,只有當(dāng)任務(wù)的執(zhí)行時(shí)間到來時(shí),ScheduledThreadPoolExecutor 才會(huì)真正啟動(dòng)一個(gè)線程,其余時(shí)間 ScheduledThreadPoolExecutor 都是在輪詢?nèi)蝿?wù)的狀態(tài)。
簡單的使用示例:
- public static void main(String[] args) {
- ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(3);
- //啟動(dòng)1秒之后,每隔1秒執(zhí)行一次
- executor.scheduleAtFixedRate((new Runnable() {
- @Override
- public void run() {
- System.out.println("test3");
- }
- }),1,1, TimeUnit.SECONDS);
- //啟動(dòng)1秒之后,每隔3秒執(zhí)行一次
- executor.scheduleAtFixedRate((new Runnable() {
- @Override
- public void run() {
- System.out.println("test4");
- }
- }),1,3, TimeUnit.SECONDS);
- }
同樣的,我們首先打開源碼,看看里面到底做了啥
- 進(jìn)入scheduleAtFixedRate()方法
首先是校驗(yàn)基本參數(shù),然后將任務(wù)作為封裝到ScheduledFutureTask線程中,ScheduledFutureTask繼承自RunnableScheduledFuture,并作為參數(shù)調(diào)用delayedExecute()方法進(jìn)行預(yù)處理
- public ScheduledFuture> scheduleAtFixedRate(Runnable command,
- long initialDelay,
- long period,
- TimeUnit unit) {
- if (command == null || unit == null)
- throw new NullPointerException();
- if (period <= 0)
- throw new IllegalArgumentException();
- ScheduledFutureTask
sft = - new ScheduledFutureTask
(command, - null,
- triggerTime(initialDelay, unit),
- unit.toNanos(period));
- RunnableScheduledFuture
t = decorateTask(command, sft); - sft.outerTask = t;
- delayedExecute(t);
- return t;
- }
- 繼續(xù)看delayedExecute()方法
可以很清晰的看到,當(dāng)線程池沒有關(guān)閉的時(shí)候,會(huì)通過super.getQueue().add(task)操作,將任務(wù)加入到隊(duì)列,同時(shí)調(diào)用ensurePrestart()方法做預(yù)處理
- private void delayedExecute(RunnableScheduledFuture> task) {
- if (isShutdown())
- reject(task);
- else {
- super.getQueue().add(task);
- if (isShutdown() &&
- !canRunInCurrentRunState(task.isPeriodic()) &&
- remove(task))
- task.cancel(false);
- else
- //預(yù)處理
- ensurePrestart();
- }
- }
其中super.getQueue()得到的是一個(gè)自定義的new DelayedWorkQueue()阻塞隊(duì)列,數(shù)據(jù)存儲(chǔ)方面也是一個(gè)最小堆結(jié)構(gòu)的隊(duì)列,這一點(diǎn)在初始化new ScheduledThreadPoolExecutor()的時(shí)候,可以看出!
- public ScheduledThreadPoolExecutor(int corePoolSize) {
- super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
- new DelayedWorkQueue());
- }
打開源碼可以看到,DelayedWorkQueue其實(shí)是ScheduledThreadPoolExecutor中的一個(gè)靜態(tài)內(nèi)部類,在添加的時(shí)候,會(huì)將任務(wù)加入到RunnableScheduledFuture數(shù)組中,同時(shí)線程池中的Woker線程會(huì)通過調(diào)用任務(wù)隊(duì)列中的take()方法獲取對(duì)應(yīng)的ScheduledFutureTask線程任務(wù),接著執(zhí)行對(duì)應(yīng)的任務(wù)線程
- static class DelayedWorkQueue extends AbstractQueue
- implements BlockingQueue
{ - private static final int INITIAL_CAPACITY = 16;
- private RunnableScheduledFuture>[] queue =
- new RunnableScheduledFuture>[INITIAL_CAPACITY];
- private final ReentrantLock lock = new ReentrantLock();
- private int size = 0;
- //....
- public boolean add(Runnable e) {
- return offer(e);
- }
- public boolean offer(Runnable x) {
- if (x == null)
- throw new NullPointerException();
- RunnableScheduledFuture> e = (RunnableScheduledFuture>)x;
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- int i = size;
- if (i >= queue.length)
- grow();
- size = i + 1;
- if (i == 0) {
- queue[0] = e;
- setIndex(e, 0);
- } else {
- siftUp(i, e);
- }
- if (queue[0] == e) {
- leader = null;
- available.signal();
- }
- } finally {
- lock.unlock();
- }
- return true;
- }
- public RunnableScheduledFuture> take() throws InterruptedException {
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
- try {
- for (;;) {
- RunnableScheduledFuture> first = queue[0];
- if (first == null)
- available.await();
- else {
- long delay = first.getDelay(NANOSECONDS);
- if (delay <= 0)
- return finishPoll(first);
- first = null; // don't retain ref while waiting
- if (leader != null)
- available.await();
- else {
- Thread thisThread = Thread.currentThread();
- leader = thisThread;
- try {
- available.awaitNanos(delay);
- } finally {
- if (leader == thisThread)
- leader = null;
- }
- }
- }
- }
- } finally {
- if (leader == null && queue[0] != null)
- available.signal();
- lock.unlock();
- }
- }
- }
- 回到我們最開始說到的ScheduledFutureTask任務(wù)線程類,最終執(zhí)行任務(wù)的其實(shí)就是它
ScheduledFutureTask任務(wù)線程,才是真正執(zhí)行任務(wù)的線程類,只是繞了一圈,做了很多包裝,run()方法就是真正執(zhí)行定時(shí)任務(wù)的方法。
- private class ScheduledFutureTask
- extends FutureTask
implements RunnableScheduledFuture { - /** Sequence number to break ties FIFO */
- private final long sequenceNumber;
- /** The time the task is enabled to execute in nanoTime units */
- private long time;
- /**
- * Period in nanoseconds for repeating tasks. A positive
- * value indicates fixed-rate execution. A negative value
- * indicates fixed-delay execution. A value of 0 indicates a
- * non-repeating task.
- */
- private final long period;
- /** The actual task to be re-enqueued by reExecutePeriodic */
- RunnableScheduledFuture
outerTask = this; - /**
- * Overrides FutureTask version so as to reset/requeue if periodic.
- */
- public void run() {
- boolean periodic = isPeriodic();
- if (!canRunInCurrentRunState(periodic))
- cancel(false);
- else if (!periodic)
- ScheduledFutureTask.super.run();
- else if (ScheduledFutureTask.super.runAndReset()) {
- setNextRunTime();
- reExecutePeriodic(outerTask);
- }
- }
- //...
- }
3.3、小結(jié)
ScheduledExecutorService 相比 Timer 定時(shí)器,完美的解決上面說到的 Timer 存在的兩個(gè)缺點(diǎn)!
在單體應(yīng)用里面,使用 ScheduledExecutorService 可以解決大部分需要使用定時(shí)任務(wù)的業(yè)務(wù)需求!
但是這是否意味著它是最佳的解決方案呢?
我們發(fā)現(xiàn)線程池中 ScheduledExecutorService 的排序容器跟 Timer 一樣,都是采用最小堆的存儲(chǔ)結(jié)構(gòu),新任務(wù)加入排序效率是O(log(n)),執(zhí)行取任務(wù)是O(1)。
這里的寫入排序效率其實(shí)是有空間可提升的,有可能優(yōu)化到O(1)的時(shí)間復(fù)雜度,也就是我們下面要介紹的時(shí)間輪實(shí)現(xiàn)!
四、時(shí)間輪實(shí)現(xiàn)
所謂時(shí)間輪(RingBuffer)實(shí)現(xiàn),從數(shù)據(jù)結(jié)構(gòu)上看,簡單的說就是循環(huán)隊(duì)列,從名稱上看可能感覺很抽象。
它其實(shí)就是一個(gè)環(huán)形的數(shù)組,如圖所示,假設(shè)我們創(chuàng)建了一個(gè)長度為 8 的時(shí)間輪。
插入、取值流程:
- 1.當(dāng)我們需要新建一個(gè) 1s 延時(shí)任務(wù)的時(shí)候,則只需要將它放到下標(biāo)為 1 的那個(gè)槽中,2、3、...、7也同樣如此。
- 2.而如果是新建一個(gè) 10s 的延時(shí)任務(wù),則需要將它放到下標(biāo)為 2 的槽中,但同時(shí)需要記錄它所對(duì)應(yīng)的圈數(shù),也就是 1 圈,不然就和 2 秒的延時(shí)消息重復(fù)了
- 3.當(dāng)創(chuàng)建一個(gè) 21s 的延時(shí)任務(wù)時(shí),它所在的位置就在下標(biāo)為 5 的槽中,同樣的需要為他加上圈數(shù)為 2,依次類推...
因此,總結(jié)起來有兩個(gè)核心的變量:
- 數(shù)組下標(biāo):表示某個(gè)任務(wù)延遲時(shí)間,從數(shù)據(jù)操作上對(duì)執(zhí)行時(shí)間點(diǎn)進(jìn)行取余
- 圈數(shù):表示需要循環(huán)圈數(shù)
通過這張圖可以更直觀的理解!
當(dāng)我們需要取出延時(shí)任務(wù)時(shí),只需要每秒往下移動(dòng)這個(gè)指針,然后取出該位置的所有任務(wù)即可,取任務(wù)的時(shí)間消耗為O(1)。
當(dāng)我們需要插入任務(wù)式,也只需要計(jì)算出對(duì)應(yīng)的下表和圈數(shù),即可將任務(wù)插入到對(duì)應(yīng)的數(shù)組位置中,插入任務(wù)的時(shí)間消耗為O(1)。
如果時(shí)間輪的槽比較少,會(huì)導(dǎo)致某一個(gè)槽上的任務(wù)非常多,那么效率也比較低,這就和 HashMap 的 hash 沖突是一樣的,因此在設(shè)計(jì)槽的時(shí)候不能太大也不能太小。
4.1、代碼實(shí)現(xiàn)
- 首先創(chuàng)建一個(gè)RingBufferWheel時(shí)間輪定時(shí)任務(wù)管理器
- public class RingBufferWheel {
- private Logger logger = LoggerFactory.getLogger(RingBufferWheel.class);
- /**
- * default ring buffer size
- */
- private static final int STATIC_RING_SIZE = 64;
- private Object[] ringBuffer;
- private int bufferSize;
- /**
- * business thread pool
- */
- private ExecutorService executorService;
- private volatile int size = 0;
- /***
- * task stop sign
- */
- private volatile boolean stop = false;
- /**
- * task start sign
- */
- private volatile AtomicBoolean start = new AtomicBoolean(false);
- /**
- * total tick times
- */
- private AtomicInteger tick = new AtomicInteger();
- private Lock lock = new ReentrantLock();
- private Condition condition = lock.newCondition();
- private AtomicInteger taskId = new AtomicInteger();
- private Map
taskMap = new ConcurrentHashMap<>(16); - /**
- * Create a new delay task ring buffer by default size
- *
- * @param executorService the business thread pool
- */
- public RingBufferWheel(ExecutorService executorService) {
- this.executorService = executorService;
- this.bufferSize = STATIC_RING_SIZE;
- this.ringBuffer = new Object[bufferSize];
- }
- /**
- * Create a new delay task ring buffer by custom buffer size
- *
- * @param executorService the business thread pool
- * @param bufferSize custom buffer size
- */
- public RingBufferWheel(ExecutorService executorService, int bufferSize) {
- this(executorService);
- if (!powerOf2(bufferSize)) {
- throw new RuntimeException("bufferSize=[" + bufferSize + "] must be a power of 2");
- }
- this.bufferSize = bufferSize;
- this.ringBuffer = new Object[bufferSize];
- }
- /**
- * Add a task into the ring buffer(thread safe)
- *
- * @param task business task extends {@link Task}
- &nb
分享標(biāo)題:定時(shí)任務(wù)實(shí)現(xiàn)原理詳解
文章URL:http://m.5511xx.com/article/cdochdg.html


咨詢
建站咨詢
