日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
定時(shí)任務(wù)實(shí)現(xiàn)原理詳解

 本文轉(zhuǎn)載自微信公眾號(hào)「Java極客技術(shù)」,作者鴨血粉絲。轉(zhuǎn)載本文請(qǐng)聯(lián)系Java極客技術(shù)公眾號(hào)。  

在潼南等地區(qū),都構(gòu)建了全面的區(qū)域性戰(zhàn)略布局,加強(qiáng)發(fā)展的系統(tǒng)性、市場前瞻性、產(chǎn)品創(chuàng)新能力,以專注、極致的服務(wù)理念,為客戶提供做網(wǎng)站、網(wǎng)站設(shè)計(jì) 網(wǎng)站設(shè)計(jì)制作定制制作,公司網(wǎng)站建設(shè),企業(yè)網(wǎng)站建設(shè),成都品牌網(wǎng)站建設(shè),成都營銷網(wǎng)站建設(shè),外貿(mào)網(wǎng)站建設(shè),潼南網(wǎng)站建設(shè)費(fèi)用合理。

一、摘要

在很多業(yè)務(wù)的系統(tǒng)中,我們常常需要定時(shí)的執(zhí)行一些任務(wù),例如定時(shí)發(fā)短信、定時(shí)變更數(shù)據(jù)、定時(shí)發(fā)起促銷活動(dòng)等等。

在上篇文章中,我們簡單的介紹了定時(shí)任務(wù)的使用方式,不同的架構(gòu)對(duì)應(yīng)的解決方案也有所不同,總結(jié)起來主要分單機(jī)和分布式兩大類,本文會(huì)重點(diǎn)分析下單機(jī)的定時(shí)任務(wù)實(shí)現(xiàn)原理以及優(yōu)缺點(diǎn),分布式框架的實(shí)現(xiàn)原理會(huì)在后續(xù)文章中進(jìn)行分析。

從單機(jī)角度,定時(shí)任務(wù)實(shí)現(xiàn)主要有以下 3 種方案:

  • while + sleep 組合
  • 最小堆實(shí)現(xiàn)
  • 時(shí)間輪實(shí)現(xiàn)

二、while+sleep組合

while+sleep 方案,簡單的說,就是定義一個(gè)線程,然后 while 循環(huán),通過 sleep 延遲時(shí)間來達(dá)到周期性調(diào)度任務(wù)。

簡單示例如下:

 
 
 
 
  1. public static void main(String[] args) { 
  2.     final long timeInterval = 5000; 
  3.     new Thread(new Runnable() { 
  4.         @Override 
  5.         public void run() { 
  6.             while (true) { 
  7.                 System.out.println(Thread.currentThread().getName() + "每隔5秒執(zhí)行一次"); 
  8.                 try { 
  9.                     Thread.sleep(timeInterval); 
  10.                 } catch (InterruptedException e) { 
  11.                     e.printStackTrace(); 
  12.                 } 
  13.             } 
  14.         } 
  15.     }).start(); 

實(shí)現(xiàn)上非常簡單,如果我們想在創(chuàng)建一個(gè)每隔3秒鐘執(zhí)行一次任務(wù),怎么辦呢?

同樣的,也可以在創(chuàng)建一個(gè)線程,然后間隔性的調(diào)度方法;但是如果創(chuàng)建了大量這種類型的線程,這個(gè)時(shí)候會(huì)發(fā)現(xiàn)大量的定時(shí)任務(wù)線程在調(diào)度切換時(shí)性能消耗會(huì)非常大,而且整體效率低!

面對(duì)這種在情況,大佬們也想到了,于是想出了用一個(gè)線程將所有的定時(shí)任務(wù)存起來,事先排好序,按照一定的規(guī)則來調(diào)度,這樣不就可以極大的減少每個(gè)線程的切換消耗嗎?

正因此,JDK 中的 Timer 定時(shí)器由此誕生了!

三、最小堆實(shí)現(xiàn)

所謂最小堆方案,正如我們上面所說的,每當(dāng)有新任務(wù)加入的時(shí)候,會(huì)把需要即將要執(zhí)行的任務(wù)排到前面,同時(shí)會(huì)有一個(gè)線程不斷的輪詢判斷,如果當(dāng)前某個(gè)任務(wù)已經(jīng)到達(dá)執(zhí)行時(shí)間點(diǎn),就會(huì)立即執(zhí)行,具體實(shí)現(xiàn)代表就是 JDK 中的 Timer 定時(shí)器!

3.1、Timer

首先我們來一個(gè)簡單的 Timer 定時(shí)器例子

 
 
 
 
  1. public static void main(String[] args) { 
  2.     Timer timer = new Timer(); 
  3.     //每隔1秒調(diào)用一次 
  4.     timer.schedule(new TimerTask() { 
  5.         @Override 
  6.         public void run() { 
  7.             System.out.println("test1"); 
  8.         } 
  9.     }, 1000, 1000); 
  10.     //每隔3秒調(diào)用一次 
  11.     timer.schedule(new TimerTask() { 
  12.         @Override 
  13.         public void run() { 
  14.             System.out.println("test2"); 
  15.         } 
  16.     }, 3000, 3000); 
  17.  

實(shí)現(xiàn)上,好像跟我們上面介紹的 while+sleep 方案差不多,同樣也是起一個(gè)TimerTask線程任務(wù),只不過共用一個(gè)Timer調(diào)度器。

下面我們一起來打開源碼看看里面到底有些啥!

  • 進(jìn)入Timer.schedule()方法

從方法上可以看出,這里主要做參數(shù)驗(yàn)證,其中TimerTask是一個(gè)線程任務(wù),delay表示延遲多久執(zhí)行(單位毫秒),period表示多久執(zhí)行一次(單位毫秒)

 
 
 
 
  1. public void schedule(TimerTask task, long delay, long period) { 
  2.     if (delay < 0) 
  3.         throw new IllegalArgumentException("Negative delay."); 
  4.     if (period <= 0) 
  5.         throw new IllegalArgumentException("Non-positive period."); 
  6.     sched(task, System.currentTimeMillis()+delay, -period); 
  • 接著看sched()方法

這步操作中,可以很清晰的看到,在同步代碼塊里,會(huì)將task對(duì)象加入到queue

 
 
 
 
  1. private void sched(TimerTask task, long time, long period) { 
  2.     if (time < 0) 
  3.         throw new IllegalArgumentException("Illegal execution time."); 
  4.  
  5.     // Constrain value of period sufficiently to prevent numeric 
  6.     // overflow while still being effectively infinitely large. 
  7.     if (Math.abs(period) > (Long.MAX_VALUE >> 1)) 
  8.         period >>= 1; 
  9.  
  10.     synchronized(queue) { 
  11.         if (!thread.newTasksMayBeScheduled) 
  12.             throw new IllegalStateException("Timer already cancelled."); 
  13.  
  14.         synchronized(task.lock) { 
  15.             if (task.state != TimerTask.VIRGIN) 
  16.                 throw new IllegalStateException( 
  17.                     "Task already scheduled or cancelled"); 
  18.             task.nextExecutionTime = time; 
  19.             task.period = period; 
  20.             task.state = TimerTask.SCHEDULED; 
  21.         } 
  22.  
  23.         queue.add(task); 
  24.         if (queue.getMin() == task) 
  25.             queue.notify(); 
  26.     } 
  • 我們繼續(xù)來看queue對(duì)象

任務(wù)會(huì)將入到TaskQueue隊(duì)列中,同時(shí)在Timer初始化階段會(huì)將TaskQueue作為參數(shù)傳入到TimerThread線程中,并且起到線程

 
 
 
 
  1. public class Timer { 
  2.      
  3.     private final TaskQueue queue = new TaskQueue(); 
  4.  
  5.     private final TimerThread thread = new TimerThread(queue); 
  6.  
  7.     public Timer() { 
  8.         this("Timer-" + serialNumber()); 
  9.     } 
  10.  
  11.     public Timer(String name) { 
  12.         thread.setName(name); 
  13.         thread.start(); 
  14.     } 
  15.  
  16.     //... 
  • 而TaskQueue其實(shí)是一個(gè)最小堆的數(shù)據(jù)實(shí)體類,源碼如下

每當(dāng)有新元素加入的時(shí)候,會(huì)對(duì)原來的數(shù)組進(jìn)行重排,會(huì)將即將要執(zhí)行的任務(wù)排在數(shù)組的前面

 
 
 
 
  1. class TaskQueue { 
  2.      
  3.     private TimerTask[] queue = new TimerTask[128]; 
  4.  
  5.  
  6.     private int size = 0; 
  7.  
  8.     void add(TimerTask task) { 
  9.         // Grow backing store if necessary 
  10.         if (size + 1 == queue.length) 
  11.             queue = Arrays.copyOf(queue, 2*queue.length); 
  12.  
  13.         queue[++size] = task; 
  14.         fixUp(size); 
  15.     } 
  16.  
  17.     private void fixUp(int k) { 
  18.         while (k > 1) { 
  19.             int j = k >> 1; 
  20.             if (queue[j].nextExecutionTime <= queue[k].nextExecutionTime) 
  21.                 break; 
  22.             TimerTask tmp = queue[j]; 
  23.    queue[j] = queue[k]; 
  24.    queue[k] = tmp; 
  25.             k = j; 
  26.         } 
  27.     } 
  28.   
  29.  //.... 
  • 最后我們來看看TimerThread

TimerThread其實(shí)就是一個(gè)任務(wù)調(diào)度線程,首先從TaskQueue里面獲取排在最前面的任務(wù),然后判斷它是否到達(dá)任務(wù)執(zhí)行時(shí)間點(diǎn),如果已到達(dá),就會(huì)立刻執(zhí)行任務(wù)

 
 
 
 
  1. class TimerThread extends Thread { 
  2.  
  3.     boolean newTasksMayBeScheduled = true; 
  4.  
  5.     private TaskQueue queue; 
  6.  
  7.     TimerThread(TaskQueue queue) { 
  8.         this.queue = queue; 
  9.     } 
  10.  
  11.     public void run() { 
  12.         try { 
  13.             mainLoop(); 
  14.         } finally { 
  15.             // Someone killed this Thread, behave as if Timer cancelled 
  16.             synchronized(queue) { 
  17.                 newTasksMayBeScheduled = false; 
  18.                 queue.clear();  // Eliminate obsolete references 
  19.             } 
  20.         } 
  21.     } 
  22.  
  23.     /** 
  24.      * The main timer loop.  (See class comment.) 
  25.      */ 
  26.     private void mainLoop() { 
  27.         while (true) { 
  28.             try { 
  29.                 TimerTask task; 
  30.                 boolean taskFired; 
  31.                 synchronized(queue) { 
  32.                     // Wait for queue to become non-empty 
  33.                     while (queue.isEmpty() && newTasksMayBeScheduled) 
  34.                         queue.wait(); 
  35.                     if (queue.isEmpty()) 
  36.                         break; // Queue is empty and will forever remain; die 
  37.  
  38.                     // Queue nonempty; look at first evt and do the right thing 
  39.                     long currentTime, executionTime; 
  40.                     task = queue.getMin(); 
  41.                     synchronized(task.lock) { 
  42.                         if (task.state == TimerTask.CANCELLED) { 
  43.                             queue.removeMin(); 
  44.                             continue;  // No action required, poll queue again 
  45.                         } 
  46.                         currentTime = System.currentTimeMillis(); 
  47.                         executionTime = task.nextExecutionTime; 
  48.                         if (taskFired = (executionTime<=currentTime)) { 
  49.                             if (task.period == 0) { // Non-repeating, remove 
  50.                                 queue.removeMin(); 
  51.                                 task.state = TimerTask.EXECUTED; 
  52.                             } else { // Repeating task, reschedule 
  53.                                 queue.rescheduleMin( 
  54.                                   task.period<0 ? currentTime   - task.period 
  55.                                                 : executionTime + task.period); 
  56.                             } 
  57.                         } 
  58.                     } 
  59.                     if (!taskFired) // Task hasn't yet fired; wait 
  60.                         queue.wait(executionTime - currentTime); 
  61.                 } 
  62.                 if (taskFired)  // Task fired; run it, holding no locks 
  63.                     task.run(); 
  64.             } catch(InterruptedException e) { 
  65.             } 
  66.         } 
  67.     } 

總結(jié)這個(gè)利用最小堆實(shí)現(xiàn)的方案,相比 while + sleep 方案,多了一個(gè)線程來管理所有的任務(wù),優(yōu)點(diǎn)就是減少了線程之間的性能開銷,提升了執(zhí)行效率;但是同樣也帶來的了一些缺點(diǎn),整體的新加任務(wù)寫入效率變成了 O(log(n))。

同時(shí),細(xì)心的發(fā)現(xiàn),這個(gè)方案還有以下幾個(gè)缺點(diǎn):

  • 串行阻塞:調(diào)度線程只有一個(gè),長任務(wù)會(huì)阻塞短任務(wù)的執(zhí)行,例如,A任務(wù)跑了一分鐘,B任務(wù)至少需要等1分鐘才能跑
  • 容錯(cuò)能力差:沒有異常處理能力,一旦一個(gè)任務(wù)執(zhí)行故障,后續(xù)任務(wù)都無法執(zhí)行

3.2、ScheduledThreadPoolExecutor

鑒于 Timer 的上述缺陷,從 Java 5 開始,推出了基于線程池設(shè)計(jì)的 ScheduledThreadPoolExecutor 。

其設(shè)計(jì)思想是,每一個(gè)被調(diào)度的任務(wù)都會(huì)由線程池來管理執(zhí)行,因此任務(wù)是并發(fā)執(zhí)行的,相互之間不會(huì)受到干擾。需要注意的是,只有當(dāng)任務(wù)的執(zhí)行時(shí)間到來時(shí),ScheduledThreadPoolExecutor 才會(huì)真正啟動(dòng)一個(gè)線程,其余時(shí)間 ScheduledThreadPoolExecutor 都是在輪詢?nèi)蝿?wù)的狀態(tài)。

簡單的使用示例:

 
 
 
 
  1. public static void main(String[] args) { 
  2.     ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(3); 
  3.     //啟動(dòng)1秒之后,每隔1秒執(zhí)行一次 
  4.     executor.scheduleAtFixedRate((new Runnable() { 
  5.         @Override 
  6.         public void run() { 
  7.             System.out.println("test3"); 
  8.         } 
  9.     }),1,1, TimeUnit.SECONDS); 
  10.     //啟動(dòng)1秒之后,每隔3秒執(zhí)行一次 
  11.     executor.scheduleAtFixedRate((new Runnable() { 
  12.         @Override 
  13.         public void run() { 
  14.             System.out.println("test4"); 
  15.         } 
  16.     }),1,3, TimeUnit.SECONDS); 

同樣的,我們首先打開源碼,看看里面到底做了啥

  • 進(jìn)入scheduleAtFixedRate()方法

首先是校驗(yàn)基本參數(shù),然后將任務(wù)作為封裝到ScheduledFutureTask線程中,ScheduledFutureTask繼承自RunnableScheduledFuture,并作為參數(shù)調(diào)用delayedExecute()方法進(jìn)行預(yù)處理

 
 
 
 
  1. public ScheduledFuture scheduleAtFixedRate(Runnable command, 
  2.                                               long initialDelay, 
  3.                                               long period, 
  4.                                               TimeUnit unit) { 
  5.     if (command == null || unit == null) 
  6.         throw new NullPointerException(); 
  7.     if (period <= 0) 
  8.         throw new IllegalArgumentException(); 
  9.     ScheduledFutureTask sft = 
  10.         new ScheduledFutureTask(command, 
  11.                                       null, 
  12.                                       triggerTime(initialDelay, unit), 
  13.                                       unit.toNanos(period)); 
  14.     RunnableScheduledFuture t = decorateTask(command, sft); 
  15.     sft.outerTask = t; 
  16.     delayedExecute(t); 
  17.     return t; 
  • 繼續(xù)看delayedExecute()方法

可以很清晰的看到,當(dāng)線程池沒有關(guān)閉的時(shí)候,會(huì)通過super.getQueue().add(task)操作,將任務(wù)加入到隊(duì)列,同時(shí)調(diào)用ensurePrestart()方法做預(yù)處理

 
 
 
 
  1. private void delayedExecute(RunnableScheduledFuture task) { 
  2.     if (isShutdown()) 
  3.         reject(task); 
  4.     else { 
  5.         super.getQueue().add(task); 
  6.         if (isShutdown() && 
  7.             !canRunInCurrentRunState(task.isPeriodic()) && 
  8.             remove(task)) 
  9.             task.cancel(false); 
  10.         else 
  11.    //預(yù)處理 
  12.             ensurePrestart(); 
  13.     } 

其中super.getQueue()得到的是一個(gè)自定義的new DelayedWorkQueue()阻塞隊(duì)列,數(shù)據(jù)存儲(chǔ)方面也是一個(gè)最小堆結(jié)構(gòu)的隊(duì)列,這一點(diǎn)在初始化new ScheduledThreadPoolExecutor()的時(shí)候,可以看出!

 
 
 
 
  1. public ScheduledThreadPoolExecutor(int corePoolSize) { 
  2.     super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, 
  3.           new DelayedWorkQueue()); 

打開源碼可以看到,DelayedWorkQueue其實(shí)是ScheduledThreadPoolExecutor中的一個(gè)靜態(tài)內(nèi)部類,在添加的時(shí)候,會(huì)將任務(wù)加入到RunnableScheduledFuture數(shù)組中,同時(shí)線程池中的Woker線程會(huì)通過調(diào)用任務(wù)隊(duì)列中的take()方法獲取對(duì)應(yīng)的ScheduledFutureTask線程任務(wù),接著執(zhí)行對(duì)應(yīng)的任務(wù)線程

 
 
 
 
  1. static class DelayedWorkQueue extends AbstractQueue 
  2.         implements BlockingQueue { 
  3.  
  4.     private static final int INITIAL_CAPACITY = 16; 
  5.     private RunnableScheduledFuture[] queue = 
  6.         new RunnableScheduledFuture[INITIAL_CAPACITY]; 
  7.     private final ReentrantLock lock = new ReentrantLock(); 
  8.     private int size = 0;    
  9.  
  10.     //.... 
  11.  
  12.     public boolean add(Runnable e) { 
  13.         return offer(e); 
  14.     } 
  15.  
  16.     public boolean offer(Runnable x) { 
  17.         if (x == null) 
  18.             throw new NullPointerException(); 
  19.         RunnableScheduledFuture e = (RunnableScheduledFuture)x; 
  20.         final ReentrantLock lock = this.lock; 
  21.         lock.lock(); 
  22.         try { 
  23.             int i = size; 
  24.             if (i >= queue.length) 
  25.                 grow(); 
  26.             size = i + 1; 
  27.             if (i == 0) { 
  28.                 queue[0] = e; 
  29.                 setIndex(e, 0); 
  30.             } else { 
  31.                 siftUp(i, e); 
  32.             } 
  33.             if (queue[0] == e) { 
  34.                 leader = null; 
  35.                 available.signal(); 
  36.             } 
  37.         } finally { 
  38.             lock.unlock(); 
  39.         } 
  40.         return true; 
  41.     } 
  42.  
  43.     public RunnableScheduledFuture take() throws InterruptedException { 
  44.         final ReentrantLock lock = this.lock; 
  45.         lock.lockInterruptibly(); 
  46.         try { 
  47.             for (;;) { 
  48.                 RunnableScheduledFuture first = queue[0]; 
  49.                 if (first == null) 
  50.                     available.await(); 
  51.                 else { 
  52.                     long delay = first.getDelay(NANOSECONDS); 
  53.                     if (delay <= 0) 
  54.                         return finishPoll(first); 
  55.                     first = null; // don't retain ref while waiting 
  56.                     if (leader != null) 
  57.                         available.await(); 
  58.                     else { 
  59.                         Thread thisThread = Thread.currentThread(); 
  60.                         leader = thisThread; 
  61.                         try { 
  62.                             available.awaitNanos(delay); 
  63.                         } finally { 
  64.                             if (leader == thisThread) 
  65.                                 leader = null; 
  66.                         } 
  67.                     } 
  68.                 } 
  69.             } 
  70.         } finally { 
  71.             if (leader == null && queue[0] != null) 
  72.                 available.signal(); 
  73.             lock.unlock(); 
  74.         } 
  75.     } 
  • 回到我們最開始說到的ScheduledFutureTask任務(wù)線程類,最終執(zhí)行任務(wù)的其實(shí)就是它

ScheduledFutureTask任務(wù)線程,才是真正執(zhí)行任務(wù)的線程類,只是繞了一圈,做了很多包裝,run()方法就是真正執(zhí)行定時(shí)任務(wù)的方法。

 
 
 
 
  1. private class ScheduledFutureTask 
  2.             extends FutureTask implements RunnableScheduledFuture { 
  3.  
  4.     /** Sequence number to break ties FIFO */ 
  5.     private final long sequenceNumber; 
  6.  
  7.     /** The time the task is enabled to execute in nanoTime units */ 
  8.     private long time; 
  9.  
  10.     /** 
  11.      * Period in nanoseconds for repeating tasks.  A positive 
  12.      * value indicates fixed-rate execution.  A negative value 
  13.      * indicates fixed-delay execution.  A value of 0 indicates a 
  14.      * non-repeating task. 
  15.      */ 
  16.     private final long period; 
  17.  
  18.     /** The actual task to be re-enqueued by reExecutePeriodic */ 
  19.     RunnableScheduledFuture outerTask = this; 
  20.  
  21.     /** 
  22.      * Overrides FutureTask version so as to reset/requeue if periodic. 
  23.      */ 
  24.     public void run() { 
  25.         boolean periodic = isPeriodic(); 
  26.         if (!canRunInCurrentRunState(periodic)) 
  27.             cancel(false); 
  28.         else if (!periodic) 
  29.             ScheduledFutureTask.super.run(); 
  30.         else if (ScheduledFutureTask.super.runAndReset()) { 
  31.             setNextRunTime(); 
  32.             reExecutePeriodic(outerTask); 
  33.         } 
  34.     } 
  35.   
  36.  //... 

3.3、小結(jié)

ScheduledExecutorService 相比 Timer 定時(shí)器,完美的解決上面說到的 Timer 存在的兩個(gè)缺點(diǎn)!

在單體應(yīng)用里面,使用 ScheduledExecutorService 可以解決大部分需要使用定時(shí)任務(wù)的業(yè)務(wù)需求!

但是這是否意味著它是最佳的解決方案呢?

我們發(fā)現(xiàn)線程池中 ScheduledExecutorService 的排序容器跟 Timer 一樣,都是采用最小堆的存儲(chǔ)結(jié)構(gòu),新任務(wù)加入排序效率是O(log(n)),執(zhí)行取任務(wù)是O(1)。

這里的寫入排序效率其實(shí)是有空間可提升的,有可能優(yōu)化到O(1)的時(shí)間復(fù)雜度,也就是我們下面要介紹的時(shí)間輪實(shí)現(xiàn)!

四、時(shí)間輪實(shí)現(xiàn)

所謂時(shí)間輪(RingBuffer)實(shí)現(xiàn),從數(shù)據(jù)結(jié)構(gòu)上看,簡單的說就是循環(huán)隊(duì)列,從名稱上看可能感覺很抽象。

它其實(shí)就是一個(gè)環(huán)形的數(shù)組,如圖所示,假設(shè)我們創(chuàng)建了一個(gè)長度為 8 的時(shí)間輪。

插入、取值流程:

  • 1.當(dāng)我們需要新建一個(gè) 1s 延時(shí)任務(wù)的時(shí)候,則只需要將它放到下標(biāo)為 1 的那個(gè)槽中,2、3、...、7也同樣如此。
  • 2.而如果是新建一個(gè) 10s 的延時(shí)任務(wù),則需要將它放到下標(biāo)為 2 的槽中,但同時(shí)需要記錄它所對(duì)應(yīng)的圈數(shù),也就是 1 圈,不然就和 2 秒的延時(shí)消息重復(fù)了
  • 3.當(dāng)創(chuàng)建一個(gè) 21s 的延時(shí)任務(wù)時(shí),它所在的位置就在下標(biāo)為 5 的槽中,同樣的需要為他加上圈數(shù)為 2,依次類推...

因此,總結(jié)起來有兩個(gè)核心的變量:

  • 數(shù)組下標(biāo):表示某個(gè)任務(wù)延遲時(shí)間,從數(shù)據(jù)操作上對(duì)執(zhí)行時(shí)間點(diǎn)進(jìn)行取余
  • 圈數(shù):表示需要循環(huán)圈數(shù)

通過這張圖可以更直觀的理解!

當(dāng)我們需要取出延時(shí)任務(wù)時(shí),只需要每秒往下移動(dòng)這個(gè)指針,然后取出該位置的所有任務(wù)即可,取任務(wù)的時(shí)間消耗為O(1)。

當(dāng)我們需要插入任務(wù)式,也只需要計(jì)算出對(duì)應(yīng)的下表和圈數(shù),即可將任務(wù)插入到對(duì)應(yīng)的數(shù)組位置中,插入任務(wù)的時(shí)間消耗為O(1)。

如果時(shí)間輪的槽比較少,會(huì)導(dǎo)致某一個(gè)槽上的任務(wù)非常多,那么效率也比較低,這就和 HashMap 的 hash 沖突是一樣的,因此在設(shè)計(jì)槽的時(shí)候不能太大也不能太小。

4.1、代碼實(shí)現(xiàn)

  • 首先創(chuàng)建一個(gè)RingBufferWheel時(shí)間輪定時(shí)任務(wù)管理器
 
 
 
 
  1. public class RingBufferWheel { 
  2.  
  3.     private Logger logger = LoggerFactory.getLogger(RingBufferWheel.class); 
  4.  
  5.  
  6.     /** 
  7.      * default ring buffer size 
  8.      */ 
  9.     private static final int STATIC_RING_SIZE = 64; 
  10.  
  11.     private Object[] ringBuffer; 
  12.  
  13.     private int bufferSize; 
  14.  
  15.     /** 
  16.      * business thread pool 
  17.      */ 
  18.     private ExecutorService executorService; 
  19.  
  20.     private volatile int size = 0; 
  21.  
  22.     /*** 
  23.      * task stop sign 
  24.      */ 
  25.     private volatile boolean stop = false; 
  26.  
  27.     /** 
  28.      * task start sign 
  29.      */ 
  30.     private volatile AtomicBoolean start = new AtomicBoolean(false); 
  31.  
  32.     /** 
  33.      * total tick times 
  34.      */ 
  35.     private AtomicInteger tick = new AtomicInteger(); 
  36.  
  37.     private Lock lock = new ReentrantLock(); 
  38.     private Condition condition = lock.newCondition(); 
  39.  
  40.     private AtomicInteger taskId = new AtomicInteger(); 
  41.     private Map taskMap = new ConcurrentHashMap<>(16); 
  42.  
  43.     /** 
  44.      * Create a new delay task ring buffer by default size 
  45.      * 
  46.      * @param executorService the business thread pool 
  47.      */ 
  48.     public RingBufferWheel(ExecutorService executorService) { 
  49.         this.executorService = executorService; 
  50.         this.bufferSize = STATIC_RING_SIZE; 
  51.         this.ringBuffer = new Object[bufferSize]; 
  52.     } 
  53.  
  54.  
  55.     /** 
  56.      * Create a new delay task ring buffer by custom buffer size 
  57.      * 
  58.      * @param executorService the business thread pool 
  59.      * @param bufferSize      custom buffer size 
  60.      */ 
  61.     public RingBufferWheel(ExecutorService executorService, int bufferSize) { 
  62.         this(executorService); 
  63.  
  64.         if (!powerOf2(bufferSize)) { 
  65.             throw new RuntimeException("bufferSize=[" + bufferSize + "] must be a power of 2"); 
  66.         } 
  67.         this.bufferSize = bufferSize; 
  68.         this.ringBuffer = new Object[bufferSize]; 
  69.     } 
  70.  
  71.     /** 
  72.      * Add a task into the ring buffer(thread safe) 
  73.      * 
  74.      * @param task business task extends {@link Task} 
  75.   &nb
    分享標(biāo)題:定時(shí)任務(wù)實(shí)現(xiàn)原理詳解
    文章URL:http://m.5511xx.com/article/cdochdg.html