日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
Java線程池架構(gòu)原理和源碼解析(ThreadPoolExecutor)

在前面介紹JUC的文章中,提到了關(guān)于線程池Execotors的創(chuàng)建介紹,在文章:《java之JUC系列-外部Tools》中第一部分有詳細(xì)的說明,請參閱;

創(chuàng)新互聯(lián)公司是一家專注于網(wǎng)站制作、成都做網(wǎng)站與策劃設(shè)計,漣水網(wǎng)站建設(shè)哪家好?創(chuàng)新互聯(lián)公司做網(wǎng)站,專注于網(wǎng)站建設(shè)十余年,網(wǎng)設(shè)計領(lǐng)域的專業(yè)建站公司;建站業(yè)務(wù)涵蓋:漣水等地區(qū)。漣水做網(wǎng)站價格咨詢:028-86922220

文章中其實說明了外部的使用方式,但是沒有說內(nèi)部是如何實現(xiàn)的,為了加深對實現(xiàn)的理解,在使用中可以放心,我們這里將做源碼解析以及反饋到原理 上,Executors工具可以創(chuàng)建普通的線程池以及schedule調(diào)度任務(wù)的調(diào)度池,其實兩者實現(xiàn)上還是有一些區(qū)別,但是理解了 ThreadPoolExecutor,在看ScheduledThreadPoolExecutor就非常輕松了,后面的文章中也會專門介紹這塊,但是 需要先看這篇文章。

使用Executors最常用的莫過于是使用:Executors.newFixedThreadPool(int)這個方法,因為它既可以限制數(shù)量,而且線程用完后不會一直被cache??;那么就通過它來看看源碼,回過頭來再看其他構(gòu)造方法的區(qū)別:

在《java之JUC系列-外部Tools》文章中提到了構(gòu)造方法,為了和本文對接,再貼下代碼:

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue());
}

其實你可以自己new一個ThreadPoolExecutor,來達(dá)到自己的參數(shù)可控的程度,例如,可以將LinkedBlockingQueue換成其它的(如:SynchronousQueue),只是可讀性會降低,這里只是使用了一種設(shè)計模式。

我們現(xiàn)在來看看ThreadPoolExecutor的源碼是怎么樣的,也許你剛開始看他的源碼會很痛苦,因為你不知道作者為什么是這樣設(shè)計的,所以本文就我看到的思想會給你做一個介紹,此時也許你通過知道了一些作者的思想,你也許就知道應(yīng)該該如何去操作了。

這里來看下構(gòu)造方法中對那些屬性做了賦值:

源碼段1:

   public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

這里你可以看到最終賦值的過程,可以先大概知道下參數(shù)的意思:

corePoolSize:核心運行的poolSize,也就是當(dāng)超過這個范圍的時候,就需要將新的Thread放入到等待隊列中了;

maximumPoolSize:一般你用不到,當(dāng)大于了這個值就會將Thread由一個丟棄處理機制來處理, 但是當(dāng)你發(fā)生:newFixedThreadPool的時候,corePoolSize和maximumPoolSize是一樣的,而 corePoolSize是先執(zhí)行的,所以他會先被放入等待隊列,而不會執(zhí)行到下面的丟棄處理中,看了后面的代碼你就知道了。

workQueue:等待隊列,當(dāng)達(dá)到corePoolSize的時候,就向該等待隊列放入線程信息(默認(rèn)為一個LinkedBlockingQueue),運行中的隊列屬性為:workers,為一個HashSet;內(nèi)部被包裝了一層,后面會看到這部分代碼。

keepAliveTime:默認(rèn)都是0,當(dāng)線程沒有任務(wù)處理后,保持多長時間,cachedPoolSize是默認(rèn)60s,不推薦使用。

threadFactory:是構(gòu)造Thread的方法,你可以自己去包裝和傳遞,主要實現(xiàn)newThread方法即可;

handler:也就是參數(shù)maximumPoolSize達(dá)到后丟棄處理的方法,java提供了5種丟棄處理的方法,當(dāng)然你也可以自己弄,主要是要實現(xiàn)接口:RejectedExecutionHandler中的方法:

public void rejectedExecution(Runnabler, ThreadPoolExecutor e)

java默認(rèn)的是使用:AbortPolicy,他的作用是當(dāng)出現(xiàn)這中情況的時候會拋出一個異常;其余的還包含:

1、CallerRunsPolicy:如果發(fā)現(xiàn)線程池還在運行,就直接運行這個線程

2、DiscardOldestPolicy:在線程池的等待隊列中,將頭取出一個拋棄,然后將當(dāng)前線程放進去。

3、DiscardPolicy:什么也不做

4、AbortPolicy:java默認(rèn),拋出一個異常:RejectedExecutionException。

通常你得到線程池后,會調(diào)用其中的:submit方法或execute方法 去操作;其實你會發(fā)現(xiàn),submit方法最終會調(diào)用execute方法來進行操作,只是他提供了一個Future來托管返回值的處理而已,當(dāng)你調(diào)用需要有 返回值的信息時,你用它來處理是比較好的;這個Future會包裝對Callable信息,并定義一個Sync對象(),當(dāng)你發(fā)生讀取返回值的操作的時 候,會通過Sync對象進入鎖,直到有返回值的數(shù)據(jù)通知,具體細(xì)節(jié)先不要看太多,繼續(xù)向下:

來看看execute最為核心的方法吧:

源碼段2:

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
            if (runState == RUNNING && workQueue.offer(command)) {
                if (runState != RUNNING || poolSize == 0)
                    ensureQueuedTaskHandled(command);
            }
            else if (!addIfUnderMaximumPoolSize(command))
                reject(command); // is shutdown or saturated
        }
    }

這段代碼看似簡單,其實有點難懂,很多人也是這里沒看懂,沒事,我一個if一個if說:

首先第一個判定空操作就不用說了,下面判定的poolSize >= corePoolSize成立時候會進入if的區(qū)域,當(dāng)然它不成立也有可能會進入,他會判定addIfUnderCorePoolSize是否返回false,如果返回false就會進去;

我們先來看下addIfUnderCorePoolSize方法的源碼是什么:

源碼段3:

    private boolean addIfUnderCorePoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < corePoolSize && runState == RUNNING)
                t = addThread(firstTask);
        } finally {
            mainLock.unlock();
        }
        if (t == null)
            return false;
        t.start();
        return true;
    }

可以發(fā)現(xiàn),這段源碼是如果發(fā)現(xiàn)小雨corePoolSize就會創(chuàng)建一個新的線程,并且調(diào)用線程的start()方法將線程運行起來:這個addThread()方法,我們先不考慮細(xì)節(jié),因為我們還要先看到前面是怎么進去的,這里可以發(fā)信啊,只有沒有創(chuàng)建成功Thread才會返回false,也就是當(dāng)當(dāng)前的poolSize > corePoolSize的時候,或線程池已經(jīng)不是在running狀態(tài)的時候才會出現(xiàn);

注意:這里在外部判定一次poolSize和corePoolSize只是初步判定,內(nèi)部是加鎖后判定的,以得到更為準(zhǔn)確的結(jié)果,而外部初步判定如果是大于了,就沒有必要進入這段有鎖的代碼了。

此時我們知道了,當(dāng)前線程數(shù)量大于corePoolSize的時候,就會進入【代碼段2】的第一個if語句中,回到【源碼段2】,繼續(xù)看if語句中的內(nèi)容:

這里標(biāo)記為

源碼段4

if (runState == RUNNING && workQueue.offer(command)) {
   if (runState != RUNNING || poolSize == 0)
       ensureQueuedTaskHandled(command);
   }
   else if (!addIfUnderMaximumPoolSize(command))
       reject(command); // is shutdown or saturated

第一個if,也就是當(dāng)當(dāng)前狀態(tài)為running的時候,就會去執(zhí)行workQueue.offer(command),這個workQueue其實 就是一個BlockingQueue,offer()操作就是在隊列的尾部寫入一個對象,此時寫入的對象為線程的對象而已;所以你可以認(rèn)為只有線程池在 RUNNING狀態(tài),才會在隊列尾部插入數(shù)據(jù),否則就執(zhí)行else if,其實else if可以看出是要做一個是否大于MaximumPoolSize的判定,如果大于這個值,就會做reject的操作,關(guān)于reject的說明,我們在【源碼段1】的解釋中已經(jīng)非常明確的說明,這里可以簡單看下源碼,以應(yīng)征結(jié)果:

源碼段5:

    private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < maximumPoolSize && runState == RUNNING)
                //在corePoolSize = maximumPoolSize下,該代碼幾乎不可能運行
                t = addThread(firstTask);
        } finally {
            mainLock.unlock();
        }
        if (t == null)
            return false;
        t.start();
        return true;
}
void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }

也就是如果線程池滿了,而且線程池調(diào)用了shutdown后,還在調(diào)用execute方法時,就會拋出上面說明的異常:RejectedExecutionException

再回頭來看下【代碼段4】中進入到等待隊列后的操作:

if (runState != RUNNING || poolSize == 0)

                   ensureQueuedTaskHandled(command);

這段代碼是要在線程池運行狀態(tài)不是RUNNING或poolSize == 0才會調(diào)用,他是干啥呢?

他為什么會不等于RUNNING呢?外面那一層不是判定了他== RUNNING了么,其實有時間差就是了,如果是poolSize == 0也會執(zhí)行這段代碼,但是里面的判定條件是如果不是RUNNING,就做reject操作,在第一個線程進去的時候,會將第一個線程直接啟動起來;很多人 也是看這段代碼很繞,因為不斷的循環(huán)判定類似的判定條件,你主要記住他們之間有時間差,要取最新的就好了。

此時貌似代碼看完了?咦,此時有問題了:

1、  等待中的線程在后來是如何跑起來的呢?線程池是不是有類似Timer一樣的守護進程不斷掃描線程隊列和等待隊列?還是利用某種鎖機制,實現(xiàn)類似wait和notify實現(xiàn)的?

2、  線程池的運行隊列和等待隊列是如何管理的呢?這里還沒看出影子呢!

NO,NO,NO!

Java在實現(xiàn)這部分的時候,使用了怪異的手段,神馬手段呢,還要再看一部分代碼才曉得。

在前面【源碼段3】中,我們看到了一個方法叫:addThread(),也許很少有人會想到關(guān)鍵在這里,其實關(guān)鍵就是在這里:

我們看看addThread()方法到底做了什么。

源碼段6:

    private Thread addThread(Runnable firstTask) {
        Worker w = new Worker(firstTask);
        Thread t = threadFactory.newThread(w);
        if (t != null) {
            w.thread = t;
            workers.add(w);
            int nt = ++poolSize;
            if (nt > largestPoolSize)
                largestPoolSize = nt;
        }
        return t;
    }

這里創(chuàng)建了一個Work,其余的操作,就是講poolSize疊加,然后將將其放入workers的運行隊列等操作;

我們主要關(guān)心Worker是干什么的,因為這個threadFactory對我們用途不大,只是做了Thread的命名處理;而Worker你會發(fā) 現(xiàn)它的定義也是一個Runnable,外部開始在代碼段中發(fā)現(xiàn)了調(diào)用哪個這個Worker的start()方法,也就是線程的啟動方法,其實也就是調(diào)用了 Worker的run()方法,那么我們重點要關(guān)心run方法是如何處理的

源碼段7:

       public void run() {
            try {
                Runnable task = firstTask;
                firstTask = null;
                while (task != null || (task = getTask()) != null) {
                    runTask(task);
                    task = null;
                }
            } finally {
                workerDone(this);
            }
        }

FirstTask其實就是開始在創(chuàng)建work的時候,由外部傳入的Runnable對象,也就是你自己的Thread,你會發(fā)現(xiàn)它如果發(fā)現(xiàn)task為空,就會調(diào)用getTask()方法再判定,直到兩者為空,并且是一個while循環(huán)體。

那么看看getTask()方法的實現(xiàn)為:

源碼段8:

     Runnable getTask() {
        for (;;) {
            try {
                int state = runState;
                if (state > SHUTDOWN)
                    return null;
                Runnable r;
                if (state == SHUTDOWN)  // Help drain queue
                    r = workQueue.poll();
                else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                else
                    r = workQueue.take();
                if (r != null)
                    return r;
                if (workerCanExit()) {
                    if (runState >= SHUTDOWN) // Wake up others
                        interruptIdleWorkers();
                    return null;
                }
                // Else retry
            } catch (InterruptedException ie) {
                // On interruption, re-check runState
            }
        }
    }

你會發(fā)現(xiàn)它是從workQueue隊列中,也就是等待隊列中獲取一個元素出來并返回!

回過頭來根據(jù)代碼段6理解下:

當(dāng)前線程運行完后,在到workQueue中去獲取一個task出來,繼續(xù)運行,這樣就保證了線程池中有一定的線程一直在運行;此時若跳出了 while循環(huán),只有workQueue隊列為空才會出現(xiàn)或出現(xiàn)了類似于shutdown的操作,自然運行隊列會減少1,當(dāng)再有新的線程進來的時候,就又 開始向worker里面放數(shù)據(jù)了,這樣以此類推,實現(xiàn)了線程池的功能。

這里可以看下run方法的finally中調(diào)用的workerDone方法為:

源碼段9:

    void workerDone(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
            if (--poolSize == 0)
                tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }

注意這里將workers.remove(w)掉,并且調(diào)用了—poolSize來做操作。

至于tryTerminate是做了更多關(guān)于回收方面的操作。

最后我們還要看一段代碼就是在【源碼段6】中出現(xiàn)的代碼調(diào)用為:runTask(task);這個方法也是運行的關(guān)鍵。

源碼段10:

     private void runTask(Runnable task) {
            final ReentrantLock runLock = this.runLock;
            runLock.lock();
            try {
                if (runState < STOP &&
                    Thread.interrupted() &&
                    runState >= STOP)
                    thread.interrupt();

                boolean ran = false;
                beforeExecute(thread, task);
                try {
                    task.run();
                    ran = true;
                    afterExecute(task, null);
                    ++completedTasks;
                } catch (RuntimeException ex) {
                    if (!ran)
                        afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                runLock.unlock();
            }
        }

你可以看到,這里面的task為傳入的task信息,調(diào)用的不是start方法,而是run方法,因為run方法直接調(diào)用不會啟動新的線程,也是因為這樣,導(dǎo)致了你無法獲取到你自己的線程的狀態(tài),因為線程池是直接調(diào)用的run方法,而不是start方法來運行。

這里有個beforeExecuteafterExecute方法,分別代表在執(zhí)行前和執(zhí)行后,你可以做一段操作,在這個類中,這兩個方法都是【空body】的,因為普通線程池?zé)o需做更多的操作。

如果你要實現(xiàn)類似暫停等待通知的或其他的操作,可以自己extends后進行重寫構(gòu)造;

本文沒有介紹關(guān)于ScheduledThreadPoolExecutor調(diào)用的細(xì)節(jié),下一篇文章會詳細(xì)說明,因為大部分代碼和本文一致,區(qū)別在于一些細(xì)節(jié),在介紹:ScheduledThreadPoolExecutor的時候,會明確的介紹它與TimerTimerTask的巨大區(qū)別,區(qū)別不在于使用,而是在于本身內(nèi)在的處理細(xì)節(jié)。


名稱欄目:Java線程池架構(gòu)原理和源碼解析(ThreadPoolExecutor)
分享路徑:http://m.5511xx.com/article/cdesdjs.html