/**
 * @class Queue
 * @description 队列
 */
class Queue {
    /**
     * 构造函数
     * @param {Number} concurrence 同步数
     */
    constructor(concurrence = 1) {
        /**
         * 同步执行数
         */
        this.concurrence = concurrence;
        /**
         * 当前正在运行的任务数
         */
        this.running = new Set();
        /**
         * 任务队列
         */
        this.queue = [];
        /**
         * 标记队列是否暂停
         */
        this.paused = false;

    }

    /**
     * 加入队列
     * @param {Function} callback callback(symbol,done) 执行回调任务
     * @param {Boolean} unshift 是否优先
     */
    push(callback = (done = () => { }) => { }, unshift = false) {
        // 为任务生成唯一的标识符
        let symbol = Symbol();
        // 将任务添加到队列中
        if (!!unshift) this.queue.unshift({ symbol, callback });
        else this.queue.push({ symbol, callback });
        // 开始执行队列中的任务
        if (this.paused === false) this.next();
        // 返回标识
        return symbol;
    }

    /**
     * 队列优先
     * @param {Symbol} symbol 
     */
    unshift(symbol = Symbol()) {
        this.pause();
        let element = null;
        let index = this.queue.findIndex(task => task.symbol === symbol);
        if (index !== -1) element = this.queue.splice(index, 1)[0];
        if (element !== null) this.queue.unshift(element);
        this.resume();
        return element;
    }

    /**
     * 移除任务
     * @param {Symbol} symbol 
     */
    remove(symbol = Symbol()) {
        this.pause();
        let index = this.queue.findIndex(task => task.symbol === symbol);
        if (index !== -1) this.queue.splice(index, 1);
        this.resume();
    }

    /**
     * 下一个任务
     */
    next() {
        while (this.paused === false && this.running.size < this.concurrence && this.queue.length) {
            // 从队列中取出任务
            let { symbol, callback } = this.queue.shift();
            // 增加当前正在运行的任务数
            this.running.add(symbol);
            callback(symbol, () => {
                // 减少当前正在运行的任务数
                this.running.delete(symbol)
                // 执行下一个任务
                this.next();
            });
        }
    }

    /**
     * 暂停队列
     */
    pause() { this.paused = true; }

    /**
     * 恢复队列
     */
    resume() { this.paused = false; this.next(); }

    /**
     * 清空队列
     */
    clear() { this.queue = []; }

    /**
     * 停止队列
     */
    stop() {
        this.clear();
        this.running.clear();
    }
};

文章作者: CaptainTwo
版权声明: 本站所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 CaptainTwo
NodeJs JavaScript NodeJs JavaScript
喜欢就支持一下吧