Skip to content

Commit 135b26b

Browse files
committed
Refactor task loop: scanning for package updated timestamps and updating tracking state. (#9082)
1 parent 8e31fcc commit 135b26b

File tree

3 files changed

+291
-49
lines changed

3 files changed

+291
-49
lines changed
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file
2+
// for details. All rights reserved. Use of this source code is governed by a
3+
// BSD-style license that can be found in the LICENSE file.
4+
5+
import 'dart:async';
6+
7+
import 'package:basics/basics.dart';
8+
import 'package:clock/clock.dart';
9+
import 'package:logging/logging.dart';
10+
import 'package:meta/meta.dart';
11+
import 'package:pub_dev/task/cloudcompute/cloudcompute.dart';
12+
13+
final _log = Logger('pub.task.scan_instances');
14+
15+
/// The internal state for scanning and deleting instances.
16+
final class DeleteInstancesState {
17+
// Maps the `CloudInstance.instanceName` to the deletion
18+
// start timestamp.
19+
final Map<String, DateTime> deletions;
20+
21+
DeleteInstancesState({required this.deletions});
22+
23+
factory DeleteInstancesState.init() => DeleteInstancesState(deletions: {});
24+
}
25+
26+
/// The result of the scan and delete instances operation.
27+
final class DeleteInstancesNextState {
28+
/// The next state of the data.
29+
final DeleteInstancesState state;
30+
31+
/// Completes when the microtask-scheduled deletion operations are completed.
32+
///
33+
/// It is not feasible to wait for this in production, but can be used in tests.
34+
@visibleForTesting
35+
final Future<void> deletionsDone;
36+
37+
DeleteInstancesNextState({required this.state, required this.deletionsDone});
38+
}
39+
40+
/// Calculates the next state of delete instances loop by processing
41+
/// the input [instances].
42+
Future<DeleteInstancesNextState> scanAndDeleteInstances(
43+
DeleteInstancesState state,
44+
List<CloudInstance> instances,
45+
Future<void> Function(String zone, String instanceName) deleteInstanceFn,
46+
bool Function() isAbortedFn, {
47+
required int maxTaskRunHours,
48+
}) async {
49+
final keepTreshold = clock.ago(minutes: 5);
50+
final deletionInProgress = {
51+
...state.deletions.whereValue((v) => v.isAfter(keepTreshold)),
52+
};
53+
54+
final futures = <Future>[];
55+
for (final instance in instances) {
56+
if (isAbortedFn()) {
57+
break;
58+
}
59+
60+
// Prevent multiple calls to delete the same instance.
61+
if (deletionInProgress.containsKey(instance.instanceName)) {
62+
continue;
63+
}
64+
65+
// If terminated or older than maxInstanceAge, delete the instance...
66+
final isTerminated = instance.state == InstanceState.terminated;
67+
final isTooOld = instance.created
68+
.add(Duration(hours: maxTaskRunHours))
69+
.isBefore(clock.now());
70+
71+
if (isTooOld) {
72+
// This indicates that something is wrong the with the instance,
73+
// ideally it should have detected its own deadline being violated
74+
// and terminated on its own. Of course, this can fail for arbitrary
75+
// reasons in a distributed system.
76+
_log.warning('terminating $instance for being too old!');
77+
} else if (isTerminated) {
78+
_log.info('deleting $instance as it has terminated.');
79+
} else {
80+
// Do not delete this instance
81+
continue;
82+
}
83+
84+
deletionInProgress[instance.instanceName] = clock.now();
85+
86+
final completer = Completer();
87+
scheduleMicrotask(() async {
88+
try {
89+
await deleteInstanceFn(instance.zone, instance.instanceName);
90+
} catch (e, st) {
91+
_log.severe('Failed to delete $instance', e, st);
92+
} finally {
93+
completer.complete();
94+
}
95+
});
96+
futures.add(completer.future);
97+
}
98+
99+
return DeleteInstancesNextState(
100+
state: DeleteInstancesState(deletions: deletionInProgress),
101+
deletionsDone: futures.isEmpty ? Future.value() : Future.wait(futures),
102+
);
103+
}

app/lib/task/scheduler.dart

Lines changed: 21 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import 'package:pub_dev/task/backend.dart';
1414
import 'package:pub_dev/task/clock_control.dart';
1515
import 'package:pub_dev/task/cloudcompute/cloudcompute.dart';
1616
import 'package:pub_dev/task/global_lock.dart';
17+
import 'package:pub_dev/task/loops/delete_instances.dart';
1718
import 'package:pub_dev/task/models.dart';
1819

1920
final _log = Logger('pub.task.schedule');
@@ -60,64 +61,35 @@ Future<void> schedule(
6061
}
6162
}
6263

63-
// Set of `CloudInstance.instanceName`s currently being deleted.
64-
// This to avoid deleting instances where the deletion process is still
65-
// running.
66-
final deletionInProgress = <String>{};
64+
var deleteInstancesState = DeleteInstancesState.init();
6765

6866
// Create a fast RNG with random seed for picking zones.
6967
final rng = Random(Random.secure().nextInt(2 << 31));
7068

69+
bool isAbortedFn() => !claim.valid || abort.isCompleted;
70+
7171
// Run scheduling iterations, so long as we have a valid claim
72-
while (claim.valid && !abort.isCompleted) {
72+
while (!isAbortedFn()) {
7373
final iterationStart = clock.now();
7474
_log.info('Starting scheduling cycle');
7575

76-
// Count number of instances, and delete old instances
77-
var instances = 0;
78-
await for (final instance in compute.listInstances()) {
79-
instances += 1; // count the instance
80-
81-
// If terminated or older than maxInstanceAge, delete the instance...
82-
final isTerminated = instance.state == InstanceState.terminated;
83-
final isTooOld = instance.created
84-
.add(Duration(hours: activeConfiguration.maxTaskRunHours))
85-
.isBefore(clock.now());
86-
// Also check deletionInProgress to prevent multiple calls to delete the
87-
// same instance
88-
final isBeingDeleted = deletionInProgress.contains(instance.instanceName);
89-
if ((isTerminated || isTooOld) && !isBeingDeleted) {
90-
if (isTooOld) {
91-
// This indicates that something is wrong the with the instance,
92-
// ideally it should have detected its own deadline being violated
93-
// and terminated on its own. Of course, this can fail for arbitrary
94-
// reasons in a distributed system.
95-
_log.warning('terminating $instance for being too old!');
96-
} else if (isTerminated) {
97-
_log.info('deleting $instance as it has terminated.');
98-
}
99-
100-
deletionInProgress.add(instance.instanceName);
101-
scheduleMicrotask(() async {
102-
final deletionStart = clock.now();
103-
try {
104-
await compute.delete(instance.zone, instance.instanceName);
105-
} catch (e, st) {
106-
_log.severe('Failed to delete $instance', e, st);
107-
} finally {
108-
// Wait at-least 5 minutes from start of deletion until we remove
109-
// it from [deletionInProgress] that way we give the API some time
110-
// reconcile state.
111-
await sleepOrAborted(Duration(minutes: 5), since: deletionStart);
112-
deletionInProgress.remove(instance.instanceName);
113-
}
114-
});
115-
}
76+
final instances = await compute.listInstances().toList();
77+
final nextDeleteInstancesState = await scanAndDeleteInstances(
78+
deleteInstancesState,
79+
instances,
80+
compute.delete,
81+
isAbortedFn,
82+
maxTaskRunHours: activeConfiguration.maxTaskRunHours,
83+
);
84+
deleteInstancesState = nextDeleteInstancesState.state;
85+
if (isAbortedFn()) {
86+
break;
11687
}
88+
11789
_log.info('Found $instances instances');
11890

11991
// If we are not allowed to create new instances within the allowed quota,
120-
if (activeConfiguration.maxTaskInstances <= instances) {
92+
if (activeConfiguration.maxTaskInstances <= instances.length) {
12193
_log.info('Reached instance limit, trying again in 30s');
12294
// Wait 30 seconds then list instances again, so that we can count them
12395
await sleepOrAborted(Duration(seconds: 30), since: iterationStart);
@@ -145,7 +117,7 @@ Future<void> schedule(
145117
var pendingPackagesReviewed = 0;
146118
final selectLimit = min(
147119
_maxInstancesPerIteration,
148-
max(0, activeConfiguration.maxTaskInstances - instances),
120+
max(0, activeConfiguration.maxTaskInstances - instances.length),
149121
);
150122

151123
Future<void> scheduleInstance(({String package}) selected) async {
@@ -252,14 +224,14 @@ Future<void> schedule(
252224

253225
// If there was no pending packages reviewed, and no instances currently
254226
// running, then we can easily sleep 5 minutes before we poll again.
255-
if (instances == 0 && pendingPackagesReviewed == 0) {
227+
if (instances.isEmpty && pendingPackagesReviewed == 0) {
256228
await sleepOrAborted(Duration(minutes: 5));
257229
continue;
258230
}
259231

260232
// If more tasks is available and quota wasn't used up, we only sleep 10s
261233
if (pendingPackagesReviewed >= _maxInstancesPerIteration &&
262-
activeConfiguration.maxTaskInstances > instances) {
234+
activeConfiguration.maxTaskInstances > instances.length) {
263235
await sleepOrAborted(Duration(seconds: 10), since: iterationStart);
264236
continue;
265237
}
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file
2+
// for details. All rights reserved. Use of this source code is governed by a
3+
// BSD-style license that can be found in the LICENSE file.
4+
5+
import 'package:clock/clock.dart';
6+
import 'package:pub_dev/task/cloudcompute/cloudcompute.dart';
7+
import 'package:pub_dev/task/loops/delete_instances.dart';
8+
import 'package:test/test.dart';
9+
10+
void main() {
11+
group('task scan: delete cloud instances', () {
12+
final referenceNow = clock.now();
13+
14+
test('fresh instance is not deleted', () async {
15+
await withClock(Clock.fixed(referenceNow), () async {
16+
final deletions = <String, String>{};
17+
final next = await scanAndDeleteInstances(
18+
DeleteInstancesState.init(),
19+
[
20+
_CloudInstance(
21+
instanceName: 'a',
22+
created: referenceNow.subtract(Duration(minutes: 18)),
23+
),
24+
],
25+
(zone, name) async {
26+
deletions[name] = zone;
27+
},
28+
() => false,
29+
maxTaskRunHours: 1,
30+
);
31+
expect(next.state.deletions, isEmpty);
32+
expect(deletions, {});
33+
});
34+
});
35+
36+
test('old instance is deleted', () async {
37+
await withClock(Clock.fixed(referenceNow), () async {
38+
final deletions = <String, String>{};
39+
final next = await scanAndDeleteInstances(
40+
DeleteInstancesState.init(),
41+
[
42+
_CloudInstance(
43+
instanceName: 'a',
44+
created: referenceNow.subtract(Duration(minutes: 78)),
45+
),
46+
],
47+
(zone, name) async {
48+
deletions[name] = zone;
49+
},
50+
() => false,
51+
maxTaskRunHours: 1,
52+
);
53+
expect(next.state.deletions, hasLength(1));
54+
expect(next.state.deletions.containsKey('a'), isTrue);
55+
56+
// Wait for the async deletion to complete
57+
await next.deletionsDone;
58+
expect(deletions, {'a': 'z1'});
59+
});
60+
});
61+
62+
test('terminated instance is deleted', () async {
63+
await withClock(Clock.fixed(referenceNow), () async {
64+
final deletions = <String, String>{};
65+
final next = await scanAndDeleteInstances(
66+
DeleteInstancesState.init(),
67+
[
68+
_CloudInstance(
69+
instanceName: 'a',
70+
created: referenceNow.subtract(Duration(minutes: 18)),
71+
state: InstanceState.terminated,
72+
),
73+
],
74+
(zone, name) async {
75+
deletions[name] = zone;
76+
},
77+
() => false,
78+
maxTaskRunHours: 1,
79+
);
80+
expect(next.state.deletions, hasLength(1));
81+
expect(next.state.deletions.containsKey('a'), isTrue);
82+
83+
// Wait for the async deletion to complete
84+
await next.deletionsDone;
85+
expect(deletions, {'a': 'z1'});
86+
});
87+
});
88+
89+
test('pending delete is kept within 5 minutes', () async {
90+
await withClock(Clock.fixed(referenceNow), () async {
91+
final deletions = <String, String>{};
92+
final next = await scanAndDeleteInstances(
93+
DeleteInstancesState(deletions: {'a': clock.ago(minutes: 3)}),
94+
[
95+
_CloudInstance(
96+
instanceName: 'a',
97+
created: referenceNow.subtract(Duration(minutes: 78)),
98+
),
99+
],
100+
(zone, name) async {
101+
deletions[name] = zone;
102+
},
103+
() => false,
104+
maxTaskRunHours: 1,
105+
);
106+
expect(next.state.deletions, hasLength(1));
107+
// Wait for the async deletion to complete
108+
await next.deletionsDone;
109+
expect(deletions, {});
110+
});
111+
});
112+
113+
test('pending delete is removed after 5 minutes', () async {
114+
await withClock(Clock.fixed(referenceNow), () async {
115+
final deletions = <String, String>{};
116+
final next = await scanAndDeleteInstances(
117+
DeleteInstancesState(deletions: {'a': clock.ago(minutes: 8)}),
118+
[_CloudInstance(created: clock.now(), instanceName: 'b')],
119+
(zone, name) async {
120+
deletions[name] = zone;
121+
},
122+
() => false,
123+
maxTaskRunHours: 1,
124+
);
125+
expect(next.state.deletions, isEmpty);
126+
await next.deletionsDone;
127+
expect(deletions, {});
128+
});
129+
});
130+
131+
test('pending delete is refreshed after 5 minutes', () async {
132+
await withClock(Clock.fixed(referenceNow), () async {
133+
final deletions = <String, String>{};
134+
final next = await scanAndDeleteInstances(
135+
DeleteInstancesState(deletions: {'a': clock.ago(minutes: 8)}),
136+
[_CloudInstance(created: clock.ago(minutes: 78), instanceName: 'a')],
137+
(zone, name) async {
138+
deletions[name] = zone;
139+
},
140+
() => false,
141+
maxTaskRunHours: 1,
142+
);
143+
expect(next.state.deletions, hasLength(1));
144+
next.state.deletions['a']!.isAfter(clock.ago(minutes: 2));
145+
await next.deletionsDone;
146+
expect(deletions, {'a': 'z1'});
147+
});
148+
});
149+
});
150+
}
151+
152+
class _CloudInstance implements CloudInstance {
153+
@override
154+
final DateTime created;
155+
@override
156+
final String instanceName;
157+
@override
158+
final InstanceState state;
159+
@override
160+
final String zone = 'z1';
161+
162+
_CloudInstance({
163+
required this.created,
164+
required this.instanceName,
165+
this.state = InstanceState.running,
166+
});
167+
}

0 commit comments

Comments
 (0)