|
2 | 2 |
|
3 | 3 | namespace React\Async;
|
4 | 4 |
|
| 5 | +use Fiber; |
5 | 6 | use React\EventLoop\Loop;
|
6 | 7 | use React\Promise\CancellablePromiseInterface;
|
7 | 8 | use React\Promise\Deferred;
|
|
148 | 149 | * });
|
149 | 150 | * ```
|
150 | 151 | *
|
| 152 | + * Promises returned by `async()` can be cancelled, and when done any currently |
| 153 | + * and future awaited promise inside that and any nested fibers with their |
| 154 | + * awaited promises will also be cancelled. As such the following example will |
| 155 | + * only output `ab` as the [`sleep()`](https://reactphp.org/promise-timer/#sleep) |
| 156 | + * between `a` and `b` is cancelled throwing a timeout exception that bubbles up |
| 157 | + * through the fibers ultimately to the end user through the [`await()`](#await) |
| 158 | + * on the last line of the example. |
| 159 | + * |
| 160 | + * ```php |
| 161 | + * $promise = async(static function (): int { |
| 162 | + * echo 'a'; |
| 163 | + * await(async(static function(): void { |
| 164 | + * echo 'b'; |
| 165 | + * await(sleep(2)); |
| 166 | + * echo 'c'; |
| 167 | + * })()); |
| 168 | + * echo 'd'; |
| 169 | + * |
| 170 | + * return time(); |
| 171 | + * })(); |
| 172 | + * |
| 173 | + * $promise->cancel(); |
| 174 | + * await($promise); |
| 175 | + * ``` |
| 176 | + * |
151 | 177 | * @param callable(mixed ...$args):mixed $function
|
152 | 178 | * @return callable(): PromiseInterface<mixed>
|
153 | 179 | * @since 4.0.0
|
154 | 180 | * @see coroutine()
|
155 | 181 | */
|
156 | 182 | function async(callable $function): callable
|
157 | 183 | {
|
158 |
| - return static fn (mixed ...$args): PromiseInterface => new Promise(function (callable $resolve, callable $reject) use ($function, $args): void { |
159 |
| - $fiber = new \Fiber(function () use ($resolve, $reject, $function, $args): void { |
160 |
| - try { |
161 |
| - $resolve($function(...$args)); |
162 |
| - } catch (\Throwable $exception) { |
163 |
| - $reject($exception); |
| 184 | + return static function (mixed ...$args) use ($function): PromiseInterface { |
| 185 | + $fiber = null; |
| 186 | + $promise = new Promise(function (callable $resolve, callable $reject) use ($function, $args, &$fiber): void { |
| 187 | + $fiber = new \Fiber(function () use ($resolve, $reject, $function, $args, &$fiber): void { |
| 188 | + try { |
| 189 | + $resolve($function(...$args)); |
| 190 | + } catch (\Throwable $exception) { |
| 191 | + $reject($exception); |
| 192 | + } finally { |
| 193 | + fiberMap()->unregister($fiber); |
| 194 | + } |
| 195 | + }); |
| 196 | + |
| 197 | + fiberMap()->register($fiber); |
| 198 | + |
| 199 | + $fiber->start(); |
| 200 | + }, function () use (&$fiber): void { |
| 201 | + if ($fiber instanceof Fiber) { |
| 202 | + fiberMap()->cancel($fiber); |
| 203 | + foreach (fiberMap()->getPromises($fiber) as $promise) { |
| 204 | + if (method_exists($promise, 'cancel')) { |
| 205 | + $promise->cancel(); |
| 206 | + } |
| 207 | + } |
164 | 208 | }
|
165 | 209 | });
|
166 | 210 |
|
167 |
| - $fiber->start(); |
168 |
| - }); |
| 211 | + $lowLevelFiber = \Fiber::getCurrent(); |
| 212 | + if ($lowLevelFiber !== null) { |
| 213 | + fiberMap()->attachPromise($lowLevelFiber, $promise); |
| 214 | + } |
| 215 | + |
| 216 | + return $promise; |
| 217 | + }; |
169 | 218 | }
|
170 | 219 |
|
171 | 220 |
|
@@ -230,6 +279,13 @@ function await(PromiseInterface $promise): mixed
|
230 | 279 | $rejected = false;
|
231 | 280 | $resolvedValue = null;
|
232 | 281 | $rejectedThrowable = null;
|
| 282 | + $lowLevelFiber = \Fiber::getCurrent(); |
| 283 | + |
| 284 | + if ($lowLevelFiber !== null) { |
| 285 | + if (fiberMap()->isCanceled($lowLevelFiber) && $promise instanceof CancellablePromiseInterface) { |
| 286 | + $promise->cancel(); |
| 287 | + } |
| 288 | + } |
233 | 289 |
|
234 | 290 | $promise->then(
|
235 | 291 | function (mixed $value) use (&$resolved, &$resolvedValue, &$fiber): void {
|
@@ -285,6 +341,10 @@ function (mixed $throwable) use (&$rejected, &$rejectedThrowable, &$fiber): void
|
285 | 341 | throw $rejectedThrowable;
|
286 | 342 | }
|
287 | 343 |
|
| 344 | + if ($lowLevelFiber !== null) { |
| 345 | + fiberMap()->attachPromise($lowLevelFiber, $promise); |
| 346 | + } |
| 347 | + |
288 | 348 | $fiber = FiberFactory::create();
|
289 | 349 |
|
290 | 350 | return $fiber->suspend();
|
@@ -601,3 +661,17 @@ function waterfall(array $tasks): PromiseInterface
|
601 | 661 |
|
602 | 662 | return $deferred->promise();
|
603 | 663 | }
|
| 664 | + |
| 665 | +/** |
| 666 | + * @internal |
| 667 | + */ |
| 668 | +function fiberMap(): FiberMap |
| 669 | +{ |
| 670 | + static $wm = null; |
| 671 | + |
| 672 | + if ($wm === null) { |
| 673 | + $wm = new FiberMap(); |
| 674 | + } |
| 675 | + |
| 676 | + return $wm; |
| 677 | +} |
0 commit comments