diff --git a/pkgs/async/CHANGELOG.md b/pkgs/async/CHANGELOG.md index 5e5d952a..44ecb34b 100644 --- a/pkgs/async/CHANGELOG.md +++ b/pkgs/async/CHANGELOG.md @@ -1,8 +1,10 @@ -## 2.13.1-wip +## 2.14.0-wip - Fix `StreamGroup.broadcast().close()` to properly complete when all streams in the group close without being explicitly removed. - Run `dart format` with the new style. +* Add `CancelableOperationGroup`. + ## 2.13.0 - Fix type check and cast in SubscriptionStream's cancelOnError wrapper diff --git a/pkgs/async/lib/src/future_group.dart b/pkgs/async/lib/src/future_group.dart index daf985d3..c223ca1f 100644 --- a/pkgs/async/lib/src/future_group.dart +++ b/pkgs/async/lib/src/future_group.dart @@ -4,6 +4,12 @@ import 'dart:async'; +import 'cancelable_operation.dart'; + +/// A sentinel object indicating that a member of a [FutureGroup] was canceled +/// rather than completing normally. +final _canceledResult = Object(); + /// A collection of futures waits until all added [Future]s complete. /// /// Futures are added to the group with [add]. Once you're finished adding @@ -61,12 +67,26 @@ class FutureGroup implements Sink> { /// The values emitted by the futures that have been added to the group, in /// the order they were added. /// - /// The slots for futures that haven't completed yet are `null`. - final _values = []; + /// This is type `Object?` rather than `T?` so it can contain + /// [_canceledResult]. The slots for futures that haven't completed yet are + /// `null`. + final _values = []; /// Wait for [task] to complete. @override - void add(Future task) { + void add(Future task) => _add(task); + + /// Wait for [task] to complete. + /// + /// If [task] is canceled, it's removed from the group without adding a value + /// to [future]. + void addCancelable(CancelableOperation task) { + _add(task + .then((value) => value, onCancel: () => _canceledResult) + .valueOrCancellation()); + } + + void _add(Future task) { if (_closed) throw StateError('The FutureGroup is closed.'); // Ensure that future values are put into [values] in the same order they're @@ -88,7 +108,10 @@ class FutureGroup implements Sink> { if (!_closed) return null; if (onIdleController != null) onIdleController.close(); - _completer.complete(_values.whereType().toList()); + _completer.complete([ + for (var value in _values) + if (value != _canceledResult) value as T + ]); }).catchError((Object error, StackTrace stackTrace) { if (_completer.isCompleted) return null; _completer.completeError(error, stackTrace); @@ -102,6 +125,9 @@ class FutureGroup implements Sink> { _closed = true; if (_pending != 0) return; if (_completer.isCompleted) return; - _completer.complete(_values.whereType().toList()); + _completer.complete([ + for (var value in _values) + if (value != _canceledResult) value as T + ]); } } diff --git a/pkgs/async/pubspec.yaml b/pkgs/async/pubspec.yaml index 688c14b6..2b05f41a 100644 --- a/pkgs/async/pubspec.yaml +++ b/pkgs/async/pubspec.yaml @@ -1,5 +1,5 @@ name: async -version: 2.13.1-wip +version: 2.14.0-wip description: Utility functions and classes related to the 'dart:async' library. repository: https://github.com/dart-lang/core/tree/main/pkgs/async issue_tracker: https://github.com/dart-lang/core/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Aasync diff --git a/pkgs/async/test/future_group_test.dart b/pkgs/async/test/future_group_test.dart index effca5be..8dee9525 100644 --- a/pkgs/async/test/future_group_test.dart +++ b/pkgs/async/test/future_group_test.dart @@ -4,6 +4,7 @@ import 'dart:async'; +import 'package:async/src/cancelable_operation.dart'; import 'package:async/src/future_group.dart'; import 'package:test/test.dart'; @@ -92,6 +93,22 @@ void main() { expect(completed, isTrue); }); + test('a canceled operation doesn\'t block completion', () { + var completer1 = Completer(); + var completer2 = CancelableCompleter(); + var completer3 = Completer(); + + futureGroup.add(completer1.future); + futureGroup.addCancelable(completer2.operation); + futureGroup.add(completer3.future); + futureGroup.close(); + + completer3.complete(3); + completer2.operation.cancel(); + completer1.complete(1); + expect(futureGroup.future, completion(equals([1, 3]))); + }); + test('completes to the values of the futures in order of addition', () { var completer1 = Completer(); var completer2 = Completer();