From 32cd9b6f5ed70f0b6279e3c7b0120a3561400397 Mon Sep 17 00:00:00 2001 From: Natalie Weizenbaum Date: Wed, 26 Nov 2025 15:14:33 -0800 Subject: [PATCH 1/3] Add `FutureGroup.addCancelable()` --- pkgs/async/CHANGELOG.md | 4 +++- pkgs/async/lib/src/future_group.dart | 30 ++++++++++++++++++++------ pkgs/async/pubspec.yaml | 2 +- pkgs/async/test/future_group_test.dart | 17 +++++++++++++++ 4 files changed, 45 insertions(+), 8 deletions(-) diff --git a/pkgs/async/CHANGELOG.md b/pkgs/async/CHANGELOG.md index 5e5d952ac..44ecb34b6 100644 --- a/pkgs/async/CHANGELOG.md +++ b/pkgs/async/CHANGELOG.md @@ -1,8 +1,10 @@ -## 2.13.1-wip +## 2.14.0-wip - Fix `StreamGroup.broadcast().close()` to properly complete when all streams in the group close without being explicitly removed. - Run `dart format` with the new style. +* Add `CancelableOperationGroup`. + ## 2.13.0 - Fix type check and cast in SubscriptionStream's cancelOnError wrapper diff --git a/pkgs/async/lib/src/future_group.dart b/pkgs/async/lib/src/future_group.dart index daf985d3f..bd0783b5b 100644 --- a/pkgs/async/lib/src/future_group.dart +++ b/pkgs/async/lib/src/future_group.dart @@ -4,6 +4,12 @@ import 'dart:async'; +import 'cancelable_operation.dart'; + +/// A sentinel object indicating that a member of a [FutureGroup] was canceled +/// rather than completing normally. +const _canceledResult = Object(); + /// A collection of futures waits until all added [Future]s complete. /// /// Futures are added to the group with [add]. Once you're finished adding @@ -61,12 +67,21 @@ class FutureGroup implements Sink> { /// The values emitted by the futures that have been added to the group, in /// the order they were added. /// - /// The slots for futures that haven't completed yet are `null`. - final _values = []; + /// This is type `Object?` rather than `T?` so it can contain + /// [_canceledResult]. The slots for futures that haven't completed yet are + /// `null`. + final _values = []; /// Wait for [task] to complete. @override - void add(Future task) { + void add(Future task) => + addCancelable(CancelableOperation.fromFuture(task)); + + /// Wait for [task] to complete. + /// + /// If [task] is canceled, it's removed from the group without adding a value + /// to [future]. + void addCancelable(CancelableOperation task) { if (_closed) throw StateError('The FutureGroup is closed.'); // Ensure that future values are put into [values] in the same order they're @@ -76,11 +91,11 @@ class FutureGroup implements Sink> { _values.add(null); _pending++; - task.then((value) { + task.valueOrCancellation().then((value) { if (_completer.isCompleted) return null; _pending--; - _values[index] = value; + _values[index] = task.isCanceled ? _canceledResult : value; if (_pending != 0) return null; var onIdleController = _onIdleController; @@ -88,7 +103,10 @@ class FutureGroup implements Sink> { if (!_closed) return null; if (onIdleController != null) onIdleController.close(); - _completer.complete(_values.whereType().toList()); + _completer.complete([ + for (var value in _values) + if (value != _canceledResult && value is T) value + ]); }).catchError((Object error, StackTrace stackTrace) { if (_completer.isCompleted) return null; _completer.completeError(error, stackTrace); diff --git a/pkgs/async/pubspec.yaml b/pkgs/async/pubspec.yaml index 688c14b6e..2b05f41a3 100644 --- a/pkgs/async/pubspec.yaml +++ b/pkgs/async/pubspec.yaml @@ -1,5 +1,5 @@ name: async -version: 2.13.1-wip +version: 2.14.0-wip description: Utility functions and classes related to the 'dart:async' library. repository: https://github.com/dart-lang/core/tree/main/pkgs/async issue_tracker: https://github.com/dart-lang/core/issues?q=is%3Aissue+is%3Aopen+label%3Apackage%3Aasync diff --git a/pkgs/async/test/future_group_test.dart b/pkgs/async/test/future_group_test.dart index effca5be4..8dee9525d 100644 --- a/pkgs/async/test/future_group_test.dart +++ b/pkgs/async/test/future_group_test.dart @@ -4,6 +4,7 @@ import 'dart:async'; +import 'package:async/src/cancelable_operation.dart'; import 'package:async/src/future_group.dart'; import 'package:test/test.dart'; @@ -92,6 +93,22 @@ void main() { expect(completed, isTrue); }); + test('a canceled operation doesn\'t block completion', () { + var completer1 = Completer(); + var completer2 = CancelableCompleter(); + var completer3 = Completer(); + + futureGroup.add(completer1.future); + futureGroup.addCancelable(completer2.operation); + futureGroup.add(completer3.future); + futureGroup.close(); + + completer3.complete(3); + completer2.operation.cancel(); + completer1.complete(1); + expect(futureGroup.future, completion(equals([1, 3]))); + }); + test('completes to the values of the futures in order of addition', () { var completer1 = Completer(); var completer2 = Completer(); From 797bd33e6ba5957d9afd6d8a208b52fbd4d979e6 Mon Sep 17 00:00:00 2001 From: Natalie Weizenbaum Date: Wed, 26 Nov 2025 15:36:08 -0800 Subject: [PATCH 2/3] Code review --- pkgs/async/lib/src/future_group.dart | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/pkgs/async/lib/src/future_group.dart b/pkgs/async/lib/src/future_group.dart index bd0783b5b..1089fa2c6 100644 --- a/pkgs/async/lib/src/future_group.dart +++ b/pkgs/async/lib/src/future_group.dart @@ -8,7 +8,7 @@ import 'cancelable_operation.dart'; /// A sentinel object indicating that a member of a [FutureGroup] was canceled /// rather than completing normally. -const _canceledResult = Object(); +final _canceledResult = Object(); /// A collection of futures waits until all added [Future]s complete. /// @@ -74,14 +74,19 @@ class FutureGroup implements Sink> { /// Wait for [task] to complete. @override - void add(Future task) => - addCancelable(CancelableOperation.fromFuture(task)); + void add(Future task) => _add(task); /// Wait for [task] to complete. /// /// If [task] is canceled, it's removed from the group without adding a value /// to [future]. void addCancelable(CancelableOperation task) { + _add(task + .then((value) => value, onCancel: () => _canceledResult) + .valueOrCancellation()); + } + + void _add(Future task) { if (_closed) throw StateError('The FutureGroup is closed.'); // Ensure that future values are put into [values] in the same order they're @@ -91,11 +96,11 @@ class FutureGroup implements Sink> { _values.add(null); _pending++; - task.valueOrCancellation().then((value) { + task.then((value) { if (_completer.isCompleted) return null; _pending--; - _values[index] = task.isCanceled ? _canceledResult : value; + _values[index] = value; if (_pending != 0) return null; var onIdleController = _onIdleController; @@ -104,9 +109,9 @@ class FutureGroup implements Sink> { if (!_closed) return null; if (onIdleController != null) onIdleController.close(); _completer.complete([ - for (var value in _values) - if (value != _canceledResult && value is T) value - ]); + for (var value in _values) + if (value != _canceledResult) value as T + ]); }).catchError((Object error, StackTrace stackTrace) { if (_completer.isCompleted) return null; _completer.completeError(error, stackTrace); @@ -120,6 +125,9 @@ class FutureGroup implements Sink> { _closed = true; if (_pending != 0) return; if (_completer.isCompleted) return; - _completer.complete(_values.whereType().toList()); + _completer.complete([ + for (var value in _values) + if (value != _canceledResult) value as T + ]); } } From 63d96b00cdec1cc1eb4048604d2ef339edc166d7 Mon Sep 17 00:00:00 2001 From: Natalie Weizenbaum Date: Wed, 26 Nov 2025 17:32:26 -0800 Subject: [PATCH 3/3] Reformat --- pkgs/async/lib/src/future_group.dart | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkgs/async/lib/src/future_group.dart b/pkgs/async/lib/src/future_group.dart index 1089fa2c6..c223ca1f7 100644 --- a/pkgs/async/lib/src/future_group.dart +++ b/pkgs/async/lib/src/future_group.dart @@ -109,9 +109,9 @@ class FutureGroup implements Sink> { if (!_closed) return null; if (onIdleController != null) onIdleController.close(); _completer.complete([ - for (var value in _values) - if (value != _canceledResult) value as T - ]); + for (var value in _values) + if (value != _canceledResult) value as T + ]); }).catchError((Object error, StackTrace stackTrace) { if (_completer.isCompleted) return null; _completer.completeError(error, stackTrace);