日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
Nodejs多線程的探索和實踐

本文轉(zhuǎn)載自微信公眾號「編程雜技」,作者theanarkh  。轉(zhuǎn)載本文請聯(lián)系編程雜技公眾號。

創(chuàng)新互聯(lián)成立于2013年,公司以網(wǎng)站建設(shè)、做網(wǎng)站、系統(tǒng)開發(fā)、網(wǎng)絡(luò)推廣、文化傳媒、企業(yè)宣傳、平面廣告設(shè)計等為主要業(yè)務(wù),適用行業(yè)近百種。服務(wù)企業(yè)客戶上千,涉及國內(nèi)多個省份客戶。擁有多年網(wǎng)站建設(shè)開發(fā)經(jīng)驗。為企業(yè)提供專業(yè)的網(wǎng)站建設(shè)、創(chuàng)意設(shè)計、宣傳推廣等服務(wù)。 通過專業(yè)的設(shè)計、獨特的風(fēng)格,為不同客戶提供各種風(fēng)格的特色服務(wù)。

1 背景

需求中有以下場景

1 對稱解密、非對稱解密

2 壓縮、解壓

3 大量文件的增刪改查

4 處理大量的字符串,解析協(xié)議

上面的場景都是非常耗時間的,解密、壓縮、文件操作,nodejs使用了內(nèi)置的線程池支持了異步。但是處理字符串和解析協(xié)議是單純消耗cpu的操作。而且nodejs對解密的支持似乎不是很好。我使用了純js的解密庫,所以無法在nodejs主線程里處理。尤其rsa解密,非常耗時間。

所以這時候就要探索解決方案,nodejs提供了多線程的能力。所以自然就選擇了這種方案。但是這只是初步的想法和方案。因為nodejs雖然提供了多線程能力,但是沒有提供一個應(yīng)用層的線程池。所以如果我們單純地使用多線程,一個請求一個線程,這顯然不現(xiàn)實。我們不得不實現(xiàn)自己的線程池。本文分享的內(nèi)容是這個線程池的實現(xiàn)。

線程池的設(shè)計涉及到很多方面,對于純cpu型的任務(wù),線程數(shù)和cpu核數(shù)要相等才能達(dá)到最優(yōu)的性能,否則過多的線程引起的上下文切換反而會導(dǎo)致性能下降。而對于io型的任務(wù),更多的線程理論上是會更好,因為可以更早地給硬盤發(fā)出命令,磁盤會優(yōu)化并持續(xù)地處理請求,想象一下,如果發(fā)出一個命令,硬盤處理一個,然后再發(fā)下一個命令,再處理一個,這樣顯然效率很低。當(dāng)然,線程數(shù)也不是越多越好。線程過多會引起系統(tǒng)負(fù)載過高,過多上下文切換也會帶來性能的下降。下面看一下線程池的實現(xiàn)方案。

2 設(shè)計思路

首先根據(jù)配置創(chuàng)建多個線程(分為預(yù)創(chuàng)建和懶創(chuàng)建),然后對用戶暴露提交任務(wù)的接口,由調(diào)度中心負(fù)責(zé)接收任務(wù),然后根據(jù)策略選擇處理該任務(wù)的線程。子線程一直在輪詢是否有任務(wù)需要處理。處理完通知調(diào)度中心。

下面看一下具體的實現(xiàn)

2.1 和用戶通信的數(shù)據(jù)結(jié)構(gòu)

 
 
 
 
  1. class UserWork extends EventEmitter {
  2.     constructor({ workId, threadId }) {
  3.         super();
  4.         this.workId = workId;
  5.         this.threadId = threadId;
  6.         workPool[workId] = this;
  7.     }
  8. }

用戶提交任務(wù)的時候,調(diào)度中心返回一個UserWork對象。用戶可以使用該對象和調(diào)度中心通信。

2.2 調(diào)度中心的實現(xiàn)

調(diào)度中心的實現(xiàn)大致分為以下幾個邏輯。

2.2.1 初始化

 
 
 
 
  1. constructor(options = {}) {
  2.        this.options = options;
  3.        // 線程池總?cè)蝿?wù)數(shù)
  4.        this.totalWork = 0;
  5.        // 子線程隊列
  6.        this.workerQueue = [];
  7.        // 核心線程數(shù)
  8.        this.coreThreads = ~~options.coreThreads || config.CORE_THREADS;
  9.        // 線程池最大線程數(shù),如果不支持動態(tài)擴(kuò)容則最大線程數(shù)等于核心線程數(shù)
  10.        this.maxThreads = options.expansion !== false ? Math.max(this.coreThreads, config.MAX_THREADS) : this.coreThreads;
  11.        // 工作線程處理任務(wù)的模式
  12.        this.sync = options.sync !== false;
  13.        // 超過任務(wù)隊列長度時的處理策略
  14.        this.discardPolicy = options.discardPolicy ? options.discardPolicy : DISCARD_POLICY.NOT_DISCARD;
  15.        // 是否預(yù)創(chuàng)建子線程
  16.        this.preCreate = options.preCreate === true;
  17.        this.maxIdleTime = ~~options.maxIdleTime || config.MAX_IDLE_TIME;
  18.        this.pollIntervalTime = ~~options.pollIntervalTime || config.POLL_INTERVAL_TIME;
  19.        this.maxWork = ~~options.maxWork || config.MAX_WORK;
  20.        // 是否預(yù)創(chuàng)建線程池
  21.        this.preCreate && this.preCreateThreads();
  22.    }

從初始化代碼中我們看到線程池大致支持的能力。

  1. 核心線程數(shù)
  2. 最大線程數(shù)
  3. 過載時的處理策略,和過載的閾值
  4. 子線程空閑退出的時間和輪詢?nèi)蝿?wù)的時間
  5. 是否預(yù)創(chuàng)建線程池
  6. 是否支持動態(tài)擴(kuò)容

核心線程數(shù)是任務(wù)數(shù)沒有達(dá)到閾值時的工作線程集合。是處理任務(wù)的主力軍。任務(wù)數(shù)達(dá)到閾值后,如果支持動態(tài)擴(kuò)容(可配置)則會創(chuàng)建新的線程去處理更多的任務(wù)。一旦負(fù)載變低,線程空閑時間達(dá)到閾值則會自動退出。如果擴(kuò)容的線程數(shù)達(dá)到閾值,還有新的任務(wù)到來,則根據(jù)丟棄策略進(jìn)行相關(guān)的處理。

2.2.2 創(chuàng)建線程

 
 
 
 
  1. newThread() {
  2.         let { sync } = this;
  3.         const worker = new Worker(workerPath, {workerData: { sync, maxIdleTime: this.maxIdleTime, pollIntervalTime: this.pollIntervalTime, }});
  4.         const node = {
  5.             worker,
  6.             // 該線程處理的任務(wù)數(shù)量
  7.             queueLength: 0,
  8.         };
  9.         this.workerQueue.push(node);
  10.         const threadId = worker.threadId;
  11.         worker.on('exit', (status) => {
  12.             // 異常退出則補(bǔ)充線程,正常退出則不補(bǔ)充
  13.             if (status) {
  14.                 this.newThread();
  15.             }
  16.             this.totalWork -= node.queueLength;
  17.             this.workerQueue = this.workerQueue.filter((worker) => {
  18.                 return worker.threadId !== threadId;
  19.             });
  20.         });
  21.         // 和子線程通信
  22.         worker.on('message', (result) => {
  23.             const {
  24.                 work,
  25.                 event,
  26.             } = result;
  27.             const { data, error, workId } = work;
  28.             // 通過workId拿到對應(yīng)的userWorker
  29.             const userWorker = workPool[workId];
  30.             delete workPool[workId];
  31.             // 任務(wù)數(shù)減一
  32.             node.queueLength--;
  33.             this.totalWork--;
  34.             switch(event) {
  35.                 case 'done':
  36.                     // 通知用戶,任務(wù)完成
  37.                     userWorker.emit('done', data);
  38.                     break;
  39.                 case 'error':
  40.                     // 通知用戶,任務(wù)出錯
  41.                     if (EventEmitter.listenerCount(userWorker, 'error')) {
  42.                         userWorker.emit('error', error);
  43.                     }
  44.                     break;
  45.                 default: break;
  46.             }
  47.         });
  48.         worker.on('error', (...rest) => {
  49.             console.log(...rest)
  50.         });
  51.         return node;
  52.     }

創(chuàng)建線程主要是調(diào)用nodejs提供的模塊進(jìn)行創(chuàng)建。然后監(jiān)聽子線程的退出和message、error事件。如果是異常退出則補(bǔ)充線程。調(diào)度中心維護(hù)了一個子線程的隊列。記錄了每個子線程(worker)的實例和任務(wù)數(shù)。

2.2.3 選擇執(zhí)行任務(wù)的線程

 
 
 
 
  1. selectThead() {
  2.         let min = Number.MAX_SAFE_INTEGER;
  3.         let i = 0;
  4.         let index = 0;
  5.         // 找出任務(wù)數(shù)最少的線程,把任務(wù)交給他
  6.         for (; i < this.workerQueue.length; i++) {
  7.             const { queueLength } = this.workerQueue[i];
  8.             if (queueLength < min) {
  9.                 index = i;
  10.                 min = queueLength;
  11.             }
  12.         }
  13.         return this.workerQueue[index];
  14.     }

選擇策略目前是選擇任務(wù)數(shù)最少的,本來還支持隨機(jī)和輪詢方式,但是貌似沒有什么場景和必要,就去掉了。

2.2.4 暴露提交任務(wù)的接口

 
 
 
 
  1. submit(filename, options = {}) {
  2.         return new Promise(async (resolve, reject) => {
  3.             let thread;
  4.             // 沒有線程則創(chuàng)建一個
  5.             if (this.workerQueue.length) {
  6.                 thread = this.selectThead();
  7.                 // 任務(wù)隊列非空
  8.                 if (thread.queueLength !== 0) {
  9.                     // 子線程個數(shù)還沒有達(dá)到核心線程數(shù),則新建線程處理
  10.                     if (this.workerQueue.length < this.coreThreads) {
  11.                         thread = this.newThread();
  12.                     } else if (this.totalWork + 1 > this.maxWork){
  13.                         // 總?cè)蝿?wù)數(shù)已達(dá)到閾值,還沒有達(dá)到線程數(shù)閾值,則創(chuàng)建
  14.                         if(this.workerQueue.length < this.maxThreads) {
  15.                             thread = this.newThread();
  16.                         } else {
  17.                             // 處理溢出的任務(wù)
  18.                             switch(this.discardPolicy) {
  19.                                 case DISCARD_POLICY.ABORT: 
  20.                                     return reject(new Error('queue overflow'));
  21.                                 case DISCARD_POLICY.CALLER_RUNS: 
  22.                                     const userWork =  new UserWork({workId: this.generateWorkId(), threadId}); 
  23.                                     try {
  24.                                         const asyncFunction = require(filename);
  25.                                         if (!isAsyncFunction(asyncFunction)) {
  26.                                             return reject(new Error('need export a async function'));
  27.                                         }
  28.                                         const result = await asyncFunction(options);
  29.                                         resolve(userWork);
  30.                                         setImmediate(() => {
  31.                                             userWork.emit('done', result);
  32.                                         });
  33.                                     } catch (error) {
  34.                                         resolve(userWork);
  35.                                         setImmediate(() => {
  36.                                             userWork.emit('error', error);
  37.                                         });
  38.                                     }
  39.                                     return;
  40.                                 case DISCARD_POLICY.DISCARD_OLDEST: 
  41.                                     thread.worker.postMessage({cmd: 'delete'});
  42.                                     break;
  43.                                 case DISCARD_POLICY.DISCARD:
  44.                                     return reject(new Error('discard'));
  45.                                 case DISCARD_POLICY.NOT_DISCARD:
  46.                                     break;
  47.                                 default: 
  48.                                     break;
  49.                             }
  50.                         }
  51.                     }
  52.                 }
  53.             } else {
  54.                 thread = this.newThread();
  55.             }
  56.             // 生成一個任務(wù)id
  57.             const workId = this.generateWorkId();
  58.             // 新建一個work,交給對應(yīng)的子線程
  59.             const work = new Work({ workId, filename, options });
  60.             const userWork = new UserWork({workId, threadId: thread.worker.threadId});
  61.             thread.queueLength++;
  62.             this.totalWork++;
  63.             thread.worker.postMessage({cmd: 'add', work});
  64.             resolve(userWork);
  65.         })
  66.     }

提交任務(wù)的函數(shù)比較復(fù)雜,提交一個任務(wù)的時候,調(diào)度中心會根據(jù)當(dāng)前的負(fù)載情況和線程數(shù),決定對一個任務(wù)做如何處理。如果可以處理,則把任務(wù)交給選中的子線程。最后給用戶返回一個UserWorker對象。

2.3調(diào)度中心和子線程的通信數(shù)據(jù)結(jié)構(gòu)

 
 
 
 
  1. class Work {
  2.     constructor({workId, filename, options}) {
  3.         // 任務(wù)id
  4.         this.workId = workId;
  5.         // 文件名
  6.         this.filename = filename;
  7.         // 處理結(jié)果,由用戶代碼返回
  8.         this.data = null;
  9.         // 執(zhí)行出錯
  10.         this.error = null;
  11.         // 執(zhí)行時入?yún)?/li>
  12.         this.options = options;
  13.     }
  14. }

一個任務(wù)對應(yīng)一個id,目前只支持文件的執(zhí)行模式,后續(xù)會支持字符串。

2.4 子線程的實現(xiàn)

子線程的實現(xiàn)主要分為幾個部分

2.4.1 監(jiān)聽調(diào)度中心分發(fā)的命令

 
 
 
 
  1. parentPort.on('message', ({cmd, work}) => {
  2.     switch(cmd) {
  3.         case 'delete':
  4.             return queue.shift();
  5.         case 'add':
  6.             return queue.push(work);
  7.     }
  8. });

2.4.2 輪詢是否有任務(wù)需要處理

 
 
 
 
  1. function poll() {
  2.     const now = Date.now();
  3.     if (now - lastWorkTime > maxIdleTime && !queue.length) {
  4.         process.exit(0);
  5.     }
  6.     setTimeout(async () => {
  7.         // 處理任務(wù)
  8.         poll();
  9.     }
  10.     }, pollIntervalTime);
  11. }
  12. // 輪詢判斷是否有任務(wù)
  13. poll();

不斷輪詢是否有任務(wù)需要處理,如果沒有并且空閑時間達(dá)到閾值則退出。

2.4.3 處理任務(wù)

處理任務(wù)模式分為同步和異步

 
 
 
 
  1. while(queue.length) {
  2.           const work = queue.shift();
  3.           try {
  4.               const { filename, options } = work;
  5.               const asyncFunction = require(filename);
  6.               if (!isAsyncFunction(asyncFunction)) {
  7.                   return;
  8.               }
  9.               lastWorkTime = now;
  10.               const result = await asyncFunction(options);
  11.               work.data = result;
  12.               parentPort.postMessage({event: 'done', work});
  13.           } catch (error) {
  14.               work.error = error.toString();
  15.               parentPort.postMessage({event: 'error', work});
  16.           }
  17.       }

用戶需要導(dǎo)出一個async函數(shù),使用這種方案主要是為了執(zhí)行時可以給用戶傳入?yún)?shù)。并且實現(xiàn)同步。處理完后通知調(diào)度中心。下面是異步處理方式,子線程不需要同步等待用戶的代碼結(jié)果。

 
 
 
 
  1. const arr = [];
  2.        while(queue.length) {
  3.            const work = queue.shift();
  4.            try {
  5.                const { filename } = work;
  6.                const asyncFunction = require(filename);
  7.                if (!isAsyncFunction(asyncFunction)) {
  8.                    return;
  9.                }
  10.                arr.push({asyncFunction, work});
  11.            } catch (error) {
  12.                work.error = error.toString();
  13.                parentPort.postMessage({event: 'error', work});
  14.            }
  15.        }
  16.        arr.map(async ({asyncFunction, work}) => {
  17.            try {
  18.                const { options } = work;
  19.                lastWorkTime = now;
  20.                const result = await asyncFunction(options);
  21.                work.data = result;
  22.                parentPort.postMessage({event: 'done', work});
  23.            } catch (e) {
  24.                work.error = error.toString();
  25.                parentPort.postMessage({event: 'done', work});
  26.            }
  27.        })

最后還有一些配置和定制化的功能。

 
 
 
 
  1. module.exports = {
  2.     // 最大的線程數(shù)
  3.     MAX_THREADS: 50,
  4.     // 線程池最大任務(wù)數(shù)
  5.     MAX_WORK: Infinity,
  6.     // 默認(rèn)核心線程數(shù)
  7.     CORE_THREADS: 10,
  8.     // 最大空閑時間
  9.     MAX_IDLE_TIME: 10 * 60 * 1000,
  10.     // 子線程輪詢時間
  11.     POLL_INTERVAL_TIME: 10,
  12. };
  13. // 丟棄策略
  14. const DISCARD_POLICY = {
  15.     // 報錯
  16.     ABORT: 1,
  17.     // 在主線程里執(zhí)行
  18.     CALLER_RUNS: 2,
  19.     // 丟棄最老的的任務(wù)
  20.     DISCARD_OLDEST: 3,
  21.     // 丟棄
  22.     DISCARD: 4,
  23.     // 不丟棄
  24.     NOT_DISCARD: 5,
  25. };

支持多個類型的線程池

 
 
 
 
  1. class AsyncThreadPool extends ThreadPool {
  2.     constructor(options) {
  3.         super({...options, sync: false});
  4.     }
  5. }
  6. class SyncThreadPool extends ThreadPool {
  7.     constructor(options) {
  8.         super({...options, sync: true});
  9.     }
  10. }
  11. // cpu型任務(wù)的線程池,線程數(shù)和cpu核數(shù)一樣,不支持動態(tài)擴(kuò)容
  12. class CPUThreadPool extends ThreadPool {
  13.     constructor(options) {
  14.         super({...options, coreThreads: cores, expansion: false});
  15.     }
  16. }
  17. // 線程池只有一個線程,類似消息隊列
  18. class SingleThreadPool extends ThreadPool {
  19.     constructor(options) {
  20.         super({...options, coreThreads: 1, expansion: false });
  21.     }
  22. }
  23. // 線程數(shù)固定的線程池,不支持動態(tài)擴(kuò)容線程
  24. class FixedThreadPool extends ThreadPool {
  25.     constructor(options) {
  26.         super({ ...options, expansion: false });
  27.     }
  28. }

這就是線程池的實現(xiàn),有很多細(xì)節(jié)還需要思考。下面是一個性能測試的例子。

3 測試

 
 
 
 
  1. const { MAX } = require('./constants');
  2. module.exports = async function() {
  3.     let ret = 0;
  4.     let i = 0;
  5.     while(i++ < MAX) {
  6.         ret++;
  7.         Buffer.from(String(Math.random())).toString('base64');
  8.     }
  9.     return ret;
  10. }

在服務(wù)器以單線程和多線程的方式執(zhí)行以上代碼,下面是MAX為10000和100000時,使用CPUThreadPool類型線程池的性能對比(具體代碼參考https://github.com/theanarkh/nodejs-threadpool)。

10000

單線程 [ 358.35, 490.93, 705.23, 982.6, 1155.72 ]

多線程 [ 379.3, 230.35, 315.52, 429.4, 496.04 ]

100000

單線程 [ 2485.5, 4454.63, 6894.5, 9173.16, 11011.16 ]

多線程 [ 1791.75, 2787.15, 3275.08, 4093.39, 3674.91 ]

我們發(fā)現(xiàn)這個數(shù)據(jù)差別非常明顯。并且隨著處理時間的增長,性能差距越明顯。


當(dāng)前名稱:Nodejs多線程的探索和實踐
標(biāo)題來源:http://m.5511xx.com/article/cdsgeog.html