Write Your Own Asynchronous Task Queue with Concurrency Control in JavaScript (Part 3)

In the previous part, we made the task queue usable. For better performance, however, we usually want multiple tasks in the queue to run simultaneously.

What comes to your mind first?

Start Multiple Tasks Simultaneously

We are dealing with an asynchronous problem, but we can still start from something that runs synchronously: the for loop.

Think about this:

var tasks = [];
var concurrency = 10;

for (var i = 0; i < concurrency; i++) {
    next();
}

function next() {
    var task = tasks.shift();

    // Nothing left to start.
    if (!task) {
        return;
    }

    task(function () {
        next();
    });
}

The idea behind it couldn't be simpler: start some tasks, and when one of them completes, start another one. Thus the number of running tasks stays fixed as long as there are enough tasks in the queue.

And if there are no more tasks to start:

  1. Do nothing if it's not the last running task completing.
  2. Callback if it is the last running task.
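
The two rules above can be sketched with a counter of running tasks. This is only an illustration, not the version we build below: the names runAll and running are made up for this sketch, error handling is left out, and it assumes a non-empty queue whose tasks complete asynchronously.

```javascript
// Hypothetical helper for illustration only.
// Assumes a non-empty queue and tasks that complete asynchronously.
function runAll(tasks, concurrency, done) {
    tasks = tasks.concat();

    var running = 0;
    var initial = Math.min(concurrency, tasks.length);

    function next() {
        if (tasks.length === 0) {
            // Rule 2: call back only when the last running task completes.
            if (running === 0) {
                done();
            }

            // Rule 1: otherwise do nothing.
            return;
        }

        var task = tasks.shift();
        running++;

        task(function () {
            running--;
            next();
        });
    }

    for (var i = 0; i < initial; i++) {
        next();
    }
}

// Usage: each task keeps its completion callback in `pending`,
// so we can decide when it finishes.
var pending = [];
var finished = 0;

function makeTask() {
    return function (done) {
        pending.push(done);
    };
}

runAll([makeTask(), makeTask(), makeTask()], 2, function () {
    finished++;
});

console.log(pending.length); // 2 (two tasks started, one still queued)
```

Calling the stored callbacks one by one then starts the third task and, after the last one completes, invokes the final callback once.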

Callback at Proper Time

We've added the variable settled to help determine the callback status. Now we also need to determine whether we really should start another queued task after completing the current one.

The rule we'll set here is: if any task fails, invoke the callback with the error and start no further tasks.

To achieve this, we can add another variable queueSettled.

Let's again put the lines together:

function runTasks(tasks, concurrency, callback, onprogress) {
    tasks = tasks.concat();
    
    var completed = 0;
    var total = tasks.length;
    
    // To determine whether we need to start the next task when one completes.
    var queueSettled = false;
    
    var sync = true;
    
    // Concurrency can't be greater than total number of tasks.
    concurrency = Math.min(concurrency, total);
    
    // Start multiple tasks.
    for (var i = 0; i < concurrency; i++) {
        next();
    }
    
    sync = false;

    function invokeCallback(error) {
        if (sync) {
            setTimeout(function () {
                callback(error);
            }, 0);
        } else {
            callback(error);
        }
    }

    function next() {
        onprogress(completed, total);

        if (tasks.length === 0) {
            // Now `tasks.length === 0` doesn't mean all tasks have completed any more.
            // So we need to compare `completed` with `total`.
            if (completed === total) {
                invokeCallback();
            }
            
            return;
        }

        var task = tasks.shift();
        var settled = false;

        try {
            task(function (error) {
                if (settled) {
                    return;
                }

                settled = true;

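                if (error) {
                    // Only the first error is reported.
                    if (!queueSettled) {
                        queueSettled = true;
                        invokeCallback(error);
                    }
                } else {
                    completed++;
                    if (!queueSettled) {
                        next();
                    }
                }
            });
        } catch (error) {
            if (settled) {
                return;
            }

            settled = true;

            // Only the first error is reported.
            if (!queueSettled) {
                queueSettled = true;
                invokeCallback(error);
            }
        }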
    }
}

Seems to be good, right? But there are some small problems.

If the tasks complete synchronously, the first next call in the for loop might actually use up all the tasks, so later calls in the same loop invoke next unnecessarily because there are no tasks left in the queue.

As a result, onprogress might be called multiple times with exactly the same completed argument. That could be okay, but we want it to be more "predictable".
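
To see the effect, here is a stripped-down reproduction. The helper demo is hypothetical and not part of runTasks: it keeps only the for loop and next, uses tasks that complete synchronously, and records the completed value that each next call would report to onprogress.

```javascript
// Hypothetical reproduction, not part of `runTasks`.
function demo(taskCount, concurrency) {
    var tasks = [];

    for (var i = 0; i < taskCount; i++) {
        // A task that completes synchronously.
        tasks.push(function (done) {
            done();
        });
    }

    var completed = 0;
    var progress = [];

    function next() {
        // Stands in for `onprogress(completed, total)`.
        progress.push(completed);

        if (tasks.length === 0) {
            return;
        }

        var task = tasks.shift();

        task(function () {
            completed++;
            next();
        });
    }

    for (var j = 0; j < concurrency; j++) {
        next();
    }

    return progress;
}

console.log(demo(3, 3)); // [ 0, 1, 2, 3, 3, 3 ]
```

The first next call runs through all three tasks, and the two remaining loop iterations report the same value 3 again.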

The solution is simple too.

We added the flag sync to help determine whether we need an extra setTimeout to make sure the callback of runTasks is called asynchronously.

But there is an easier way: use setTimeout to start the first task. And now we'll use setTimeout to start the first several tasks.

Another problem is that we forgot to deal with empty task queues.

So here's what we get:

function runTasks(tasks, concurrency, callback, onprogress) {
    tasks = tasks.concat();

    var completed = 0;
    var total = tasks.length;

    // Set once `callback` has been invoked, on success or on error.
    var queueSettled = false;

    // At least trigger `next` one time, so an empty queue still calls back.
    concurrency = Math.max(Math.min(concurrency, total), 1);

    for (var i = 0; i < concurrency; i++) {
        // Use `setTimeout` so that `callback` is always invoked asynchronously.
        // This makes the `sync` flag unnecessary.
        setTimeout(next, 0);
    }

    function invokeCallback(error) {
        // Several tasks may fail, or several `next` calls may find the queue
        // finished. Only the first one gets to invoke `callback`.
        if (queueSettled) {
            return;
        }

        queueSettled = true;
        callback(error);
    }

    function next() {
        // Skip redundant calls made after the queue has already settled.
        if (queueSettled) {
            return;
        }

        onprogress(completed, total);

        if (tasks.length === 0) {
            if (completed === total) {
                invokeCallback();
            }

            return;
        }

        var task = tasks.shift();
        var settled = false;

        try {
            task(function (error) {
                if (settled) {
                    return;
                }

                settled = true;

                if (error) {
                    invokeCallback(error);
                } else {
                    completed++;
                    next();
                }
            });
        } catch (error) {
            if (settled) {
                return;
            }

            settled = true;
            invokeCallback(error);
        }
    }
}

I bet it's much easier than you thought!

Pause and Resume

The code has been growing, but from this section on I'll stop updating it, since it has never been tested (though I think I will test it some day after I complete this part). :P

Pausing running tasks could be non-trivial. Before starting the implementation, consider what you want to achieve:

  • Will pausing the task queue wait for current running tasks to complete?
  • Or will it abort current running tasks?

The first one is relatively easy and does not require the given tasks to provide handlers for aborting. And the second one can always fall back to the first one.

And of course you need to think about how to achieve it. E.g. how should the API be exposed?
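
As one possible answer, here is a minimal sketch of the first option, where pausing waits for the running tasks to complete and only stops new ones from starting. Everything here is an assumption made for illustration: the createQueue name, the start/pause/resume methods, and the omission of error handling, onprogress and the asynchronous-callback guarantee.

```javascript
// Hypothetical API sketch: pausing lets running tasks finish
// but stops new ones from starting. Error handling is omitted.
function createQueue(tasks, concurrency, callback) {
    tasks = tasks.concat();

    var running = 0;
    var paused = false;
    var finished = false;

    function fill() {
        // Start tasks until we hit the limit, run out, or get paused.
        while (!paused && running < concurrency && tasks.length > 0) {
            startTask(tasks.shift());
        }

        if (!finished && running === 0 && tasks.length === 0) {
            finished = true;
            callback();
        }
    }

    function startTask(task) {
        var settled = false;
        running++;

        task(function () {
            if (settled) {
                return;
            }

            settled = true;
            running--;
            fill();
        });
    }

    return {
        start: fill,
        pause: function () {
            paused = true;
        },
        resume: function () {
            paused = false;
            fill();
        }
    };
}

// Usage: tasks hand their completion callbacks to `pending`,
// so the example can finish them one at a time.
var pending = [];
var doneCount = 0;

function task(done) {
    pending.push(done);
}

var queue = createQueue([task, task, task, task], 2, function () {
    doneCount++;
});

queue.start();               // two tasks are running
queue.pause();
pending.shift()();           // one finishes; nothing new starts while paused
console.log(pending.length); // 1
queue.resume();              // a queued task is started
console.log(pending.length); // 2
```

Aborting running tasks would need cooperation from the tasks themselves, which is why this sketch sticks to the first option.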

Retry on Failures

Right now we shift the task queue to get the next task to run. So two options come to my mind:

  1. Queue the failed task again.
  2. Keep the retrying process in the same scope in which the task was fetched in the first place.

I tend to use the second approach, as I see less indeterminacy in it. To implement it, we can either create a sub function inside next, splitting the logic into getting the task from the task queue (next) and trying it (the sub function), or just use a single next function that takes the task either from a failed execution or from the task queue and tries it.

Also, we usually don't want a task to be retried an unlimited number of times, so a limit could be provided as an option.
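
A rough sketch of the second approach with a sub function might look like this. The names runWithRetry, attempt and maxRetries are assumptions made for this example. To keep the focus on retrying, it runs one task at a time and does not catch thrown exceptions.

```javascript
// Hypothetical sketch: `attempt` is the sub function that retries a task
// in place; `maxRetries` is an assumed option name.
// Runs one task at a time and does not catch thrown exceptions.
function runWithRetry(tasks, maxRetries, callback) {
    tasks = tasks.concat();

    function next() {
        if (tasks.length === 0) {
            callback();
            return;
        }

        var task = tasks.shift();
        var retries = 0;

        function attempt() {
            // Each attempt gets its own flag, so a late duplicate call
            // from an earlier attempt is ignored.
            var settled = false;

            task(function (error) {
                if (settled) {
                    return;
                }

                settled = true;

                if (!error) {
                    next();
                } else if (retries < maxRetries) {
                    retries++;
                    attempt();
                } else {
                    callback(error);
                }
            });
        }

        attempt();
    }

    next();
}

// Usage: a task that fails twice before succeeding.
var calls = 0;
var outcome;

function flaky(done) {
    calls++;
    done(calls < 3 ? new Error('try again') : undefined);
}

runWithRetry([flaky], 2, function (error) {
    outcome = error ? 'failed' : 'ok';
    console.log(outcome, calls); // ok 3
});
```

Because the retry counter lives next to the task it belongs to, the queue itself never needs to know that a retry happened.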

Summary of Part 3

In this part, we added concurrency control to the task queue and discussed some extra features. The closer we got to our goals, the worse the API we provided turned out to be. The simple runTasks function has carried too much, and it could certainly be something better, e.g. a method on a task queue instance.

But we've achieved our goals anyway. Finding out problems sometimes is better than just feeling good, and do not be too lazy to face them. ;)