import pLimit from 'p-limit'; const limit = pLimit(1); const input = [ limit(() => fetchSomething('foo')), limit(() => fetchSomething('bar')), limit(() => doSomething()) ]; // Only one promise is run at once const result = await Promise.all(input); console.log(result);
在代码的第一行,使用了 pLimit(1)
来创建一个 p-limit 实例,并将并发限制设为 1。这意味着,在任意时刻,只能有一个 Promise 在运行。
在第四行,使用了 limit(() => fetchSomething('foo'))
来包装一个异步函数,并返回一个 Promise。同样的方式,在第五、六行也包装了其他两个异步函数。
最后,使用 Promise.all
方法来等待所有 Promise 的完成,并在完成后将结果输出到控制台。由于 p-limit 的限制,这些 Promise 只会按顺序一个一个地运行,保证了并发的数量不会超过 1。
import Queue from 'yocto-queue'; export default function pLimit(concurrency) { if (!((Number.isInteger(concurrency) || concurrency === Number.POSITIVE_INFINITY) && concurrency > 0)) { throw new TypeError('Expected `concurrency` to be a number from 1 and up'); } const queue = new Queue(); let activeCount = 0; }
yocto-queue 是一种允许高效存储和检索数据的数据结构。前边的章节分析过它的源码,详情参见: 源码共读|yocto-queue 队列 链表
pLimit 函数接受一个参数,并发数,首先函数判断参数是否是数组类型,或者是否能够转换成数字类型,如果不能,抛出一个错误。
const next = () => { activeCount--; if (queue.size > 0) { queue.dequeue()(); } }; const run = async (fn, resolve, args) => { activeCount++; const result = (async () => fn(...args))(); resolve(result); try { await result; } catch {} next(); };
在代码的 next
函数中,如果队列不为空,则从队列中取出一个函数并执行。这个函数的执行会导致计数器的值减 1。
在代码的 run
函数中,使用了 async/await
语法来执行传入的函数 fn
。它还使用了 resolve
函数将函数的返回值包装成一个 Promise,并将这个 Promise 返回给调用者。在函数执行完成后,调用 next
const enqueue = (fn, resolve, args) => { queue.enqueue(run.bind(undefined, fn, resolve, args)); (async () => { // This function needs to wait until the next microtask before comparing // `activeCount` to `concurrency`, because `activeCount` is updated asynchronously // when the run function is dequeued and called. The comparison in the if-statement // needs to happen asynchronously as well to get an up-to-date value for `activeCount`. await Promise.resolve(); if (activeCount < concurrency && queue.size > 0) { queue.dequeue()(); } })(); };
在代码的 enqueue
函数中,使用了 queue.enqueue
方法将传入的函数 fn
加入队列。然后,它使用了 async/await
const generator = (fn, ...args) => new Promise(resolve => { enqueue(fn, resolve, args); }); Object.defineProperties(generator, { activeCount: { get: () => activeCount, }, pendingCount: { get: () => queue.size, }, clearQueue: { value: () => { queue.clear(); }, }, }); return generator;
在代码的 generator
函数中,使用了 new Promise
语法来生成一个新的 Promise,并在其中调用了 enqueue
函数。这样,每次调用生成的函数时,都会生成一个新的 Promise,并将函数加入队列。
最后,使用了 Object.defineProperties
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>