NodeJs Promise 并发控制

2021/11/18 javascriptpromise

# 场景

当异步处理过多的时候,比如 Promise.all() 并发发起多个 Promise,假设 Promise 是 tcp 连接,并且数量达到几万个,会带来性能问题或触发系统限制。

# 解决思路

对 Promise 做并发限制。也就是准备一个 Pool(池),用来限制并发上限数。

例如 Pool 中上限是 4,而需要并发的 Promise 数量是 8。那么会先取前 4 个 Promise 执行,剩余的 Promise 「排队」等候。

# 实现思路

简述一个简易实现思路:

  1. 封装一个 ConcurrencyPromisePool
  2. 方法有 all(),和 Promise.prototype.all 类似。
  3. 属性有 limitqueue。前者是并发上限,后者存放排队的 promise。

注意:第 2 点中,all 函数传入的是生成 Promise 的方法,而不是 Promise 实例。因为 Promise 一旦生成实例,会直接执行。所以要把这个执行交给 ConcurrencyPromisePool 来控制。

代码实现:

class ConcurrencyPromisePool {
    constructor(limit) {
        this.limit = limit;
        this.runningNum = 0;
        this.queue = [];
        this.results = [];
    }

    all(promises = []) {
        return new Promise((resolve, reject) => {
            for (const promise of promises) {
                this._run(promise, resolve, reject);
            }
        });
    }

    _run(promise, resolve, reject) {
        if (this.runningNum >= this.limit) {
            console.log('>>> 达到上限,入队:', promise);
            this.queue.push(promise);
            return;
        }

        ++this.runningNum;
        promise()
            .then(res => {
                this.results.push(res);
                --this.runningNum;

                if (this.queue.length === 0 && this.runningNum === 0) {
                    return resolve(this.results);
                }
                if (this.queue.length) {
                    this._run(this.queue.shift(), resolve, reject);
                }
            })
            .catch(reject);
    }
}

代码使用:

const promises = [];
for (let i = 0; i < 5; ++i) {
    promises.push(
        () =>
            new Promise(resolve => {
                console.log(`${i} start`);
                setTimeout(() => {
                console.log(`${i} end`);
                resolve(i);
                }, 1000);
            })
    );
}

const pool = new ConcurrencyPromisePool(4);
pool.all(promises);

输出结果:

0 start
1 start
2 start
3 start
>>> 达到上限,入队: [Function]
0 end
4 start
1 end
2 end
3 end
4 end

# 社区方案

推荐 p-limit.js (opens new window)

源码设计很有意思,不侵入 all 方法,改动成本小:

const pLimit = require('p-limit');

const limit = pLimit(1);

const input = [
    limit(() => fetchSomething('foo')),
    limit(() => fetchSomething('bar')),
    limit(() => doSomething())
];

(async () => {
    // Only one promise is run at once
    const result = await Promise.all(input);
    console.log(result);
})();
上次更新: 2024/4/15 02:28:03