diff --git a/app/lib/task/loops/delete_instances.dart b/app/lib/task/loops/delete_instances.dart new file mode 100644 index 000000000..f023cde8f --- /dev/null +++ b/app/lib/task/loops/delete_instances.dart @@ -0,0 +1,103 @@ +// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:basics/basics.dart'; +import 'package:clock/clock.dart'; +import 'package:logging/logging.dart'; +import 'package:meta/meta.dart'; +import 'package:pub_dev/task/cloudcompute/cloudcompute.dart'; + +final _log = Logger('pub.task.scan_instances'); + +/// The internal state for scanning and deleting instances. +final class DeleteInstancesState { + // Maps the `CloudInstance.instanceName` to the deletion + // start timestamp. + final Map deletions; + + DeleteInstancesState({required this.deletions}); + + factory DeleteInstancesState.init() => DeleteInstancesState(deletions: {}); +} + +/// The result of the scan and delete instances operation. +final class DeleteInstancesNextState { + /// The next state of the data. + final DeleteInstancesState state; + + /// Completes when the microtask-scheduled deletion operations are completed. + /// + /// It is not feasible to wait for this in production, but can be used in tests. + @visibleForTesting + final Future deletionsDone; + + DeleteInstancesNextState({required this.state, required this.deletionsDone}); +} + +/// Calculates the next state of delete instances loop by processing +/// the input [instances]. +Future scanAndDeleteInstances( + DeleteInstancesState state, + List instances, + Future Function(String zone, String instanceName) deleteInstanceFn, + bool Function() isAbortedFn, { + required int maxTaskRunHours, +}) async { + final keepTreshold = clock.ago(minutes: 5); + final deletionInProgress = { + ...state.deletions.whereValue((v) => v.isAfter(keepTreshold)), + }; + + final futures = []; + for (final instance in instances) { + if (isAbortedFn()) { + break; + } + + // Prevent multiple calls to delete the same instance. + if (deletionInProgress.containsKey(instance.instanceName)) { + continue; + } + + // If terminated or older than maxInstanceAge, delete the instance... + final isTerminated = instance.state == InstanceState.terminated; + final isTooOld = instance.created + .add(Duration(hours: maxTaskRunHours)) + .isBefore(clock.now()); + + if (isTooOld) { + // This indicates that something is wrong the with the instance, + // ideally it should have detected its own deadline being violated + // and terminated on its own. Of course, this can fail for arbitrary + // reasons in a distributed system. + _log.warning('terminating $instance for being too old!'); + } else if (isTerminated) { + _log.info('deleting $instance as it has terminated.'); + } else { + // Do not delete this instance + continue; + } + + deletionInProgress[instance.instanceName] = clock.now(); + + final completer = Completer(); + scheduleMicrotask(() async { + try { + await deleteInstanceFn(instance.zone, instance.instanceName); + } catch (e, st) { + _log.severe('Failed to delete $instance', e, st); + } finally { + completer.complete(); + } + }); + futures.add(completer.future); + } + + return DeleteInstancesNextState( + state: DeleteInstancesState(deletions: deletionInProgress), + deletionsDone: futures.isEmpty ? Future.value() : Future.wait(futures), + ); +} diff --git a/app/lib/task/scheduler.dart b/app/lib/task/scheduler.dart index e69e87ad7..296b7eb98 100644 --- a/app/lib/task/scheduler.dart +++ b/app/lib/task/scheduler.dart @@ -14,6 +14,7 @@ import 'package:pub_dev/task/backend.dart'; import 'package:pub_dev/task/clock_control.dart'; import 'package:pub_dev/task/cloudcompute/cloudcompute.dart'; import 'package:pub_dev/task/global_lock.dart'; +import 'package:pub_dev/task/loops/delete_instances.dart'; import 'package:pub_dev/task/models.dart'; final _log = Logger('pub.task.schedule'); @@ -60,64 +61,35 @@ Future schedule( } } - // Set of `CloudInstance.instanceName`s currently being deleted. - // This to avoid deleting instances where the deletion process is still - // running. - final deletionInProgress = {}; + var deleteInstancesState = DeleteInstancesState.init(); // Create a fast RNG with random seed for picking zones. final rng = Random(Random.secure().nextInt(2 << 31)); + bool isAbortedFn() => !claim.valid || abort.isCompleted; + // Run scheduling iterations, so long as we have a valid claim - while (claim.valid && !abort.isCompleted) { + while (!isAbortedFn()) { final iterationStart = clock.now(); _log.info('Starting scheduling cycle'); - // Count number of instances, and delete old instances - var instances = 0; - await for (final instance in compute.listInstances()) { - instances += 1; // count the instance - - // If terminated or older than maxInstanceAge, delete the instance... - final isTerminated = instance.state == InstanceState.terminated; - final isTooOld = instance.created - .add(Duration(hours: activeConfiguration.maxTaskRunHours)) - .isBefore(clock.now()); - // Also check deletionInProgress to prevent multiple calls to delete the - // same instance - final isBeingDeleted = deletionInProgress.contains(instance.instanceName); - if ((isTerminated || isTooOld) && !isBeingDeleted) { - if (isTooOld) { - // This indicates that something is wrong the with the instance, - // ideally it should have detected its own deadline being violated - // and terminated on its own. Of course, this can fail for arbitrary - // reasons in a distributed system. - _log.warning('terminating $instance for being too old!'); - } else if (isTerminated) { - _log.info('deleting $instance as it has terminated.'); - } - - deletionInProgress.add(instance.instanceName); - scheduleMicrotask(() async { - final deletionStart = clock.now(); - try { - await compute.delete(instance.zone, instance.instanceName); - } catch (e, st) { - _log.severe('Failed to delete $instance', e, st); - } finally { - // Wait at-least 5 minutes from start of deletion until we remove - // it from [deletionInProgress] that way we give the API some time - // reconcile state. - await sleepOrAborted(Duration(minutes: 5), since: deletionStart); - deletionInProgress.remove(instance.instanceName); - } - }); - } + final instances = await compute.listInstances().toList(); + final nextDeleteInstancesState = await scanAndDeleteInstances( + deleteInstancesState, + instances, + compute.delete, + isAbortedFn, + maxTaskRunHours: activeConfiguration.maxTaskRunHours, + ); + deleteInstancesState = nextDeleteInstancesState.state; + if (isAbortedFn()) { + break; } + _log.info('Found $instances instances'); // If we are not allowed to create new instances within the allowed quota, - if (activeConfiguration.maxTaskInstances <= instances) { + if (activeConfiguration.maxTaskInstances <= instances.length) { _log.info('Reached instance limit, trying again in 30s'); // Wait 30 seconds then list instances again, so that we can count them await sleepOrAborted(Duration(seconds: 30), since: iterationStart); @@ -145,7 +117,7 @@ Future schedule( var pendingPackagesReviewed = 0; final selectLimit = min( _maxInstancesPerIteration, - max(0, activeConfiguration.maxTaskInstances - instances), + max(0, activeConfiguration.maxTaskInstances - instances.length), ); Future scheduleInstance(({String package}) selected) async { @@ -252,14 +224,14 @@ Future schedule( // If there was no pending packages reviewed, and no instances currently // running, then we can easily sleep 5 minutes before we poll again. - if (instances == 0 && pendingPackagesReviewed == 0) { + if (instances.isEmpty && pendingPackagesReviewed == 0) { await sleepOrAborted(Duration(minutes: 5)); continue; } // If more tasks is available and quota wasn't used up, we only sleep 10s if (pendingPackagesReviewed >= _maxInstancesPerIteration && - activeConfiguration.maxTaskInstances > instances) { + activeConfiguration.maxTaskInstances > instances.length) { await sleepOrAborted(Duration(seconds: 10), since: iterationStart); continue; } diff --git a/app/test/task/loops/delete_instances_test.dart b/app/test/task/loops/delete_instances_test.dart new file mode 100644 index 000000000..0f334c894 --- /dev/null +++ b/app/test/task/loops/delete_instances_test.dart @@ -0,0 +1,167 @@ +// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'package:clock/clock.dart'; +import 'package:pub_dev/task/cloudcompute/cloudcompute.dart'; +import 'package:pub_dev/task/loops/delete_instances.dart'; +import 'package:test/test.dart'; + +void main() { + group('task scan: delete cloud instances', () { + final referenceNow = clock.now(); + + test('fresh instance is not deleted', () async { + await withClock(Clock.fixed(referenceNow), () async { + final deletions = {}; + final next = await scanAndDeleteInstances( + DeleteInstancesState.init(), + [ + _CloudInstance( + instanceName: 'a', + created: referenceNow.subtract(Duration(minutes: 18)), + ), + ], + (zone, name) async { + deletions[name] = zone; + }, + () => false, + maxTaskRunHours: 1, + ); + expect(next.state.deletions, isEmpty); + expect(deletions, {}); + }); + }); + + test('old instance is deleted', () async { + await withClock(Clock.fixed(referenceNow), () async { + final deletions = {}; + final next = await scanAndDeleteInstances( + DeleteInstancesState.init(), + [ + _CloudInstance( + instanceName: 'a', + created: referenceNow.subtract(Duration(minutes: 78)), + ), + ], + (zone, name) async { + deletions[name] = zone; + }, + () => false, + maxTaskRunHours: 1, + ); + expect(next.state.deletions, hasLength(1)); + expect(next.state.deletions.containsKey('a'), isTrue); + + // Wait for the async deletion to complete + await next.deletionsDone; + expect(deletions, {'a': 'z1'}); + }); + }); + + test('terminated instance is deleted', () async { + await withClock(Clock.fixed(referenceNow), () async { + final deletions = {}; + final next = await scanAndDeleteInstances( + DeleteInstancesState.init(), + [ + _CloudInstance( + instanceName: 'a', + created: referenceNow.subtract(Duration(minutes: 18)), + state: InstanceState.terminated, + ), + ], + (zone, name) async { + deletions[name] = zone; + }, + () => false, + maxTaskRunHours: 1, + ); + expect(next.state.deletions, hasLength(1)); + expect(next.state.deletions.containsKey('a'), isTrue); + + // Wait for the async deletion to complete + await next.deletionsDone; + expect(deletions, {'a': 'z1'}); + }); + }); + + test('pending delete is kept within 5 minutes', () async { + await withClock(Clock.fixed(referenceNow), () async { + final deletions = {}; + final next = await scanAndDeleteInstances( + DeleteInstancesState(deletions: {'a': clock.ago(minutes: 3)}), + [ + _CloudInstance( + instanceName: 'a', + created: referenceNow.subtract(Duration(minutes: 78)), + ), + ], + (zone, name) async { + deletions[name] = zone; + }, + () => false, + maxTaskRunHours: 1, + ); + expect(next.state.deletions, hasLength(1)); + // Wait for the async deletion to complete + await next.deletionsDone; + expect(deletions, {}); + }); + }); + + test('pending delete is removed after 5 minutes', () async { + await withClock(Clock.fixed(referenceNow), () async { + final deletions = {}; + final next = await scanAndDeleteInstances( + DeleteInstancesState(deletions: {'a': clock.ago(minutes: 8)}), + [_CloudInstance(created: clock.now(), instanceName: 'b')], + (zone, name) async { + deletions[name] = zone; + }, + () => false, + maxTaskRunHours: 1, + ); + expect(next.state.deletions, isEmpty); + await next.deletionsDone; + expect(deletions, {}); + }); + }); + + test('pending delete is refreshed after 5 minutes', () async { + await withClock(Clock.fixed(referenceNow), () async { + final deletions = {}; + final next = await scanAndDeleteInstances( + DeleteInstancesState(deletions: {'a': clock.ago(minutes: 8)}), + [_CloudInstance(created: clock.ago(minutes: 78), instanceName: 'a')], + (zone, name) async { + deletions[name] = zone; + }, + () => false, + maxTaskRunHours: 1, + ); + expect(next.state.deletions, hasLength(1)); + next.state.deletions['a']!.isAfter(clock.ago(minutes: 2)); + await next.deletionsDone; + expect(deletions, {'a': 'z1'}); + }); + }); + }); +} + +class _CloudInstance implements CloudInstance { + @override + final DateTime created; + @override + final String instanceName; + @override + final InstanceState state; + @override + final String zone = 'z1'; + + _CloudInstance({ + required this.created, + required this.instanceName, + this.state = InstanceState.running, + }); +}