日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問(wèn)題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷解決方案
手寫(xiě)p-limit,40行代碼實(shí)現(xiàn)并發(fā)控制

前端代碼經(jīng)常要處理各種異步邏輯。

10余年的坡頭網(wǎng)站建設(shè)經(jīng)驗(yàn),針對(duì)設(shè)計(jì)、前端、開(kāi)發(fā)、售后、文案、推廣等六對(duì)一服務(wù),響應(yīng)快,48小時(shí)及時(shí)工作處理。網(wǎng)絡(luò)營(yíng)銷推廣的優(yōu)勢(shì)是能夠根據(jù)用戶設(shè)備顯示端的尺寸不同,自動(dòng)調(diào)整坡頭建站的顯示方式,使網(wǎng)站能夠適用不同顯示終端,在瀏覽器中調(diào)整網(wǎng)站的寬度,無(wú)論在任何一種瀏覽器上瀏覽網(wǎng)站,都能展現(xiàn)優(yōu)雅布局與設(shè)計(jì),從而大程度地提升瀏覽體驗(yàn)。成都創(chuàng)新互聯(lián)從事“坡頭網(wǎng)站設(shè)計(jì)”,“坡頭網(wǎng)站推廣”以來(lái),每個(gè)客戶項(xiàng)目都認(rèn)真落實(shí)執(zhí)行。

有的是串行的:

const promise1 = new Promise(function(resolve) {
// 異步邏輯 1...
resolve();
});
const promise2 = new Promise(function(resolve) {
// 異步邏輯 2...
resolve();
});

promise1.then(() => promise2);
await promise1;
await promise2;

有的是并行的:

await Promise.all([promise1, promise2]);
await Promise.race([promise1, promise2]);

并行的異步邏輯有時(shí)還要做并發(fā)控制。

并發(fā)控制是常見(jiàn)的需求,也是面試??嫉拿嬖囶}。

一般我們會(huì)用 p-limit 來(lái)做:

import pLimit from 'p-limit';

const limit = pLimit(2);

const input = [
limit(() => fetchSomething('foo')),
limit(() => fetchSomething('bar')),
limit(() => doSomething())
];

const result = await Promise.all(input);
console.log(result);

比如上面這段邏輯,就是幾個(gè)異步邏輯并行執(zhí)行,并且最大并發(fā)是 2。

那如何實(shí)現(xiàn)這樣的并發(fā)控制呢?

我們自己來(lái)寫(xiě)一個(gè):

首先,要傳入并發(fā)數(shù)量,返回一個(gè)添加并發(fā)任務(wù)的函數(shù),我們把它叫做 generator:

const pLimit = (concurrency) => {
const generator = (fn, ...args) =>
new Promise((resolve) => {
//...
});

return generator;
}

這里添加的并發(fā)任務(wù)要進(jìn)行排隊(duì),所以我們準(zhǔn)備一個(gè) queue,并記錄當(dāng)前在進(jìn)行中的異步任務(wù)。

const queue = [];
let activeCount = 0;

const generator = (fn, ...args) =>
new Promise((resolve) => {
enqueue(fn, resolve, ...args);
});

添加的異步任務(wù)就入隊(duì),也就是 enqueue。

enqueue 做的事情就是把一個(gè)異步任務(wù)添加到 queue 中,并且只要沒(méi)達(dá)到并發(fā)上限就再執(zhí)行一批任務(wù):

const enqueue = (fn, resolve, ...args) => {
queue.push(run.bind(null, fn, resolve, ...args));

if (activeCount < concurrency && queue.length > 0) {
queue.shift()();
}
};

具體運(yùn)行的邏輯是這樣的:

const run = async (fn, resolve, ...args) => {
activeCount++;

const result = (async () => fn(...args))();

resolve(result);

try {
await result;
} catch {}

next();
};

計(jì)數(shù),運(yùn)行這個(gè)函數(shù),改變最后返回的那個(gè) promise 的狀態(tài),然后執(zhí)行完之后進(jìn)行下一步處理:

下一步處理自然就是把活躍任務(wù)數(shù)量減一,然后再跑一個(gè)任務(wù):

const next = () => {
activeCount--;

if (queue.length > 0) {
queue.shift()();
}
};

這樣就保證了并發(fā)的數(shù)量限制。

現(xiàn)在的全部代碼如下,只有 40 行代碼:

const pLimit = (concurrency) => {  
const queue = [];
let activeCount = 0;

const next = () => {
activeCount--;

if (queue.length > 0) {
queue.shift()();
}
};

const run = async (fn, resolve, ...args) => {
activeCount++;

const result = (async () => fn(...args))();

resolve(result);

try {
await result;
} catch {}

next();
};

const enqueue = (fn, resolve, ...args) => {
queue.push(run.bind(null, fn, resolve, ...args));

if (activeCount < concurrency && queue.length > 0) {
queue.shift()();
}
};

const generator = (fn, ...args) =>
new Promise((resolve) => {
enqueue(fn, resolve, ...args);
});

return generator;
};

這就已經(jīng)實(shí)現(xiàn)了并發(fā)控制。

不信我們跑跑看:

準(zhǔn)備這樣一段測(cè)試代碼:

const limit = pLimit(2);

function asyncFun(value, delay) {
return new Promise((resolve) => {
console.log('start ' + value);
setTimeout(() => resolve(value), delay);
});
}

(async function () {
const arr = [
limit(() => asyncFun('aaa', 2000)),
limit(() => asyncFun('bbb', 3000)),
limit(() => asyncFun('ccc', 1000)),
limit(() => asyncFun('ccc', 1000)),
limit(() => asyncFun('ccc', 1000))
];

const result = await Promise.all(arr);
console.log(result);
})();

沒(méi)啥好說(shuō)的,就是 setTimeout + promise,設(shè)置不同的 delay 時(shí)間。

并發(fā)數(shù)量為 2。

我們?cè)囅拢?/p>

先并發(fā)執(zhí)行前兩個(gè)任務(wù),2s 的時(shí)候一個(gè)任務(wù)執(zhí)行完,又執(zhí)行了一個(gè)任務(wù),然后再過(guò)一秒,都執(zhí)行完了,有同時(shí)執(zhí)行了兩個(gè)任務(wù)。

經(jīng)過(guò)測(cè)試,我們已經(jīng)實(shí)現(xiàn)了并發(fā)控制!

回顧一下我們實(shí)現(xiàn)的過(guò)程,其實(shí)就是一個(gè)隊(duì)列來(lái)保存任務(wù),開(kāi)始的時(shí)候一次性執(zhí)行最大并發(fā)數(shù)的任務(wù),然后每執(zhí)行完一個(gè)啟動(dòng)一個(gè)新的。

還是比較簡(jiǎn)單的。

上面的 40 行代碼是最簡(jiǎn)化的版本,其實(shí)還有一些可以完善的地方,我們繼續(xù)完善一下。

首先,我們要把并發(fā)數(shù)暴露出去,還要讓開(kāi)發(fā)者可以手動(dòng)清理任務(wù)隊(duì)列。

我們這樣寫(xiě):

Object.defineProperties(generator, {
activeCount: {
get: () => activeCount
},
pendingCount: {
get: () => queue.length
},
clearQueue: {
value: () => {
queue.length = 0;
}
}
});

用 Object.defineProperties 只定義 get 函數(shù),這樣 activeCount、pendingCount 就是只能讀不能改的。

同時(shí)還提供了一個(gè)清空任務(wù)隊(duì)列的函數(shù)。

然后傳入的參數(shù)也加個(gè)校驗(yàn)邏輯:

if (!((Number.isInteger(concurrency) || concurrency === Infinity) && concurrency > 0)) {
throw new TypeError('Expected `concurrency` to be a number from 1 and up');
}

不是整數(shù)或者小于 0 就報(bào)錯(cuò),當(dāng)然,Infinity 也是可以的。

最后,其實(shí)還有一個(gè)特別需要完善的點(diǎn),就是這里:

const enqueue = (fn, resolve, ...args) => {
queue.push(run.bind(null, fn, resolve, ...args));

if (activeCount < concurrency && queue.length > 0) {
queue.shift()();
}
};

應(yīng)該改成這樣:

const enqueue = (fn, resolve, ...args) => {
queue.push(run.bind(null, fn, resolve, ...args));

(async () => {
await Promise.resolve();

if (activeCount < concurrency && queue.length > 0) {
queue.shift()();
}
})();
};

因?yàn)?activeCount-- 的邏輯是在執(zhí)行完任務(wù)之后才執(zhí)行的,萬(wàn)一任務(wù)還沒(méi)執(zhí)行完,這時(shí)候 activeCount 就是不準(zhǔn)的。

所以為了保證并發(fā)數(shù)量能控制準(zhǔn)確,要等全部的微任務(wù)執(zhí)行完再拿 activeCount。

怎么在全部的微任務(wù)執(zhí)行完再執(zhí)行邏輯呢?

加一個(gè)新的微任務(wù)不就行了?

所以有這樣的 await Promise.resolve(); 的邏輯。

這樣,就是一個(gè)完善的并發(fā)控制邏輯了,p-limit 也是這么實(shí)現(xiàn)的。

感興趣的同學(xué)可以自己試一下:

const pLimit = (concurrency) => {
if (!((Number.isInteger(concurrency) || concurrency === Infinity) && concurrency > 0)) {
throw new TypeError('Expected `concurrency` to be a number from 1 and up');
}

const queue = [];
let activeCount = 0;

const next = () => {
activeCount--;

if (queue.length > 0) {
queue.shift()();
}
};

const run = async (fn, resolve, ...args) => {
activeCount++;

const result = (async () => fn(...args))();

resolve(result);

try {
await result;
} catch {}

next();
};

const enqueue = (fn, resolve, ...args) => {
queue.push(run.bind(null, fn, resolve, ...args));

(async () => {
await Promise.resolve();

if (activeCount < concurrency && queue.length > 0) {
queue.shift()();
}
})();
};

const generator = (fn, ...args) =>
new Promise((resolve) => {
enqueue(fn, resolve, ...args);
});

Object.defineProperties(generator, {
activeCount: {
get: () => activeCount
},
pendingCount: {
get: () => queue.length
},
clearQueue: {
value: () => {
queue.length = 0;
}
}
});

return generator;
};

const limit = pLimit(2);

function asyncFun(value, delay) {
return new Promise((resolve) => {
console.log('start ' + value);
setTimeout(() => resolve(value), delay);
});
}

(async function () {
const arr = [
limit(() => asyncFun('aaa', 2000)),
limit(() => asyncFun('bbb', 3000)),
limit(() => asyncFun('ccc', 1000)),
limit(() => asyncFun('ccc', 1000)),
limit(() => asyncFun('ccc', 1000))
];

const result = await Promise.all(arr);
console.log(result);
})();

總結(jié)

js 代碼經(jīng)常要處理異步邏輯的串行、并行,還可能要做并發(fā)控制,這也是面試常考的點(diǎn)。

實(shí)現(xiàn)并發(fā)控制的核心就是通過(guò)一個(gè)隊(duì)列保存所有的任務(wù),然后最開(kāi)始批量執(zhí)行一批任務(wù)到最大并發(fā)數(shù),然后每執(zhí)行完一個(gè)任務(wù)就再執(zhí)行一個(gè)新的。

其中要注意的是為了保證獲取的任務(wù)數(shù)量是準(zhǔn)確的,要在所有微任務(wù)執(zhí)行完之后再獲取數(shù)量。

實(shí)現(xiàn)并發(fā)控制只要 40 多行代碼,其實(shí)這就是 p-limit 的源碼了,大家感興趣也可以自己實(shí)現(xiàn)一下。


新聞標(biāo)題:手寫(xiě)p-limit,40行代碼實(shí)現(xiàn)并發(fā)控制
URL鏈接:http://m.5511xx.com/article/cdseipp.html