forEachAsync<T> function

Future<Null> forEachAsync <T>(
  1. Iterable<T> iterable,
  2. AsyncAction<Null, T> action,
  3. {int maxTasks: 1}
)

Schedules calls to action for each element in iterable. No more than maxTasks calls to action will be pending at once.

Implementation

Future<Null> forEachAsync<T>(Iterable<T> iterable, AsyncAction<Null, T> action,
    {int maxTasks = 1}) {
  if (maxTasks == null || maxTasks < 1) {
    throw ArgumentError('maxTasks must be greater than 0, was: $maxTasks');
  }

  if (iterable == null) {
    throw ArgumentError('iterable must not be null');
  }

  if (iterable.isEmpty) return Future.value();

  var completer = Completer<Null>();
  var iterator = iterable.iterator;
  int pending = 0;
  bool failed = false;

  bool scheduleTask() {
    if (pending < maxTasks && iterator.moveNext()) {
      pending++;
      var item = iterator.current;
      scheduleMicrotask(() {
        var task = action(item);
        task.then((_) {
          pending--;
          if (failed) return;
          if (!scheduleTask() && pending == 0) {
            completer.complete();
          }
        }).catchError((e, stack) {
          if (failed) return;
          failed = true;
          completer.completeError(e, stack);
        });
      });
      return true;
    }
    return false;
  }

  while (scheduleTask()) {}
  return completer.future;
}