Skip to content

Commit e0e8250

Browse files
committed
Refactor task loop: scanning for package updated timestamps and updating tracking state. (#9082)
1 parent 16ff011 commit e0e8250

File tree

6 files changed

+545
-89
lines changed

6 files changed

+545
-89
lines changed

app/lib/task/backend.dart

Lines changed: 19 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import 'package:pub_dev/task/clock_control.dart';
4343
import 'package:pub_dev/task/cloudcompute/cloudcompute.dart';
4444
import 'package:pub_dev/task/global_lock.dart';
4545
import 'package:pub_dev/task/handlers.dart';
46+
import 'package:pub_dev/task/loops/scan_packages_updated.dart';
4647
import 'package:pub_dev/task/models.dart'
4748
show
4849
AbortedTokenInfo,
@@ -308,52 +309,30 @@ class TaskBackend {
308309
}) async {
309310
abort ??= Completer<void>();
310311

311-
// Map from package to updated that has been seen.
312-
final seen = <String, DateTime>{};
313-
314-
// We will schedule longer overlaps every 6 hours.
315-
var nextLongScan = clock.fromNow(hours: 6);
316-
317-
// In theory 30 minutes overlap should be enough. In practice we should
318-
// allow an ample room for missed windows, and 3 days seems to be large enough.
319-
var since = clock.ago(days: 3);
320-
while (claim.valid && !abort.isCompleted) {
321-
final sinceParamNow = since;
322-
323-
if (clock.now().isAfter(nextLongScan)) {
324-
// Next time we'll do a longer scan
325-
since = clock.ago(days: 1);
326-
nextLongScan = clock.fromNow(hours: 6);
327-
} else {
328-
// Next time we'll only consider changes since now - 30 minutes
329-
since = clock.ago(minutes: 30);
330-
}
312+
var state = ScanPackagesUpdatedState.init();
313+
bool isAbortedFn() => !claim.valid || abort!.isCompleted;
314+
while (!isAbortedFn()) {
315+
final sinceParamNow = state.since;
331316

332-
// Look at all packages that has changed
333-
await for (final p in _db.packages.listUpdatedSince(sinceParamNow)) {
334-
// Abort, if claim is invalid or abort has been resolved!
335-
if (!claim.valid || abort.isCompleted) {
336-
return;
337-
}
317+
final next = await calculateScanPackagesUpdatedLoop(
318+
state,
319+
_db.packages.listUpdatedSince(sinceParamNow),
320+
isAbortedFn,
321+
);
338322

339-
// Check if the [updated] timestamp has been seen before.
340-
// If so, we skip checking it!
341-
final lastSeen = seen[p.name];
342-
if (lastSeen != null && lastSeen.toUtc() == p.updated.toUtc()) {
343-
continue;
344-
}
345-
// Remember the updated time for this package, so we don't check it
346-
// again...
347-
seen[p.name] = p.updated;
323+
state = next.state;
348324

325+
for (final p in next.packages) {
326+
if (isAbortedFn()) {
327+
return;
328+
}
349329
// Check the package
350-
await trackPackage(p.name, updateDependents: true);
330+
await trackPackage(p, updateDependents: true);
351331
}
352332

353-
// Cleanup the [seen] map for anything older than [since], as this won't
354-
// be relevant to the next iteration.
355-
seen.removeWhere((_, updated) => updated.isBefore(since));
356-
333+
if (isAbortedFn()) {
334+
return;
335+
}
357336
// Wait until aborted or 10 minutes before scanning again!
358337
await abort.future.timeoutWithClock(
359338
Duration(minutes: 10),
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file
2+
// for details. All rights reserved. Use of this source code is governed by a
3+
// BSD-style license that can be found in the LICENSE file.
4+
5+
import 'dart:async';
6+
7+
import 'package:basics/basics.dart';
8+
import 'package:clock/clock.dart';
9+
import 'package:logging/logging.dart';
10+
import 'package:meta/meta.dart';
11+
import 'package:pub_dev/task/cloudcompute/cloudcompute.dart';
12+
13+
final _log = Logger('pub.task.scan_instances');
14+
15+
/// The internal state for scanning and deleting instances.
16+
final class DeleteInstancesState {
17+
// Maps the `CloudInstance.instanceName` to the deletion
18+
// start timestamp.
19+
final Map<String, DateTime> deletions;
20+
21+
DeleteInstancesState({required this.deletions});
22+
23+
factory DeleteInstancesState.init() => DeleteInstancesState(deletions: {});
24+
}
25+
26+
/// The result of the scan and delete instances operation.
27+
final class DeleteInstancesNextState {
28+
/// The next state of the data.
29+
final DeleteInstancesState state;
30+
31+
/// Completes when the microtask-scheduled deletion operations are completed.
32+
///
33+
/// It is not feasible to wait for this in production, but can be used in tests.
34+
@visibleForTesting
35+
final Future<void> deletionsDone;
36+
37+
DeleteInstancesNextState({required this.state, required this.deletionsDone});
38+
}
39+
40+
/// Calculates the next state of delete instances loop by processing
41+
/// the input [instances].
42+
Future<DeleteInstancesNextState> scanAndDeleteInstances(
43+
DeleteInstancesState state,
44+
List<CloudInstance> instances,
45+
Future<void> Function(String zone, String instanceName) deleteInstanceFn,
46+
bool Function() isAbortedFn, {
47+
required int maxTaskRunHours,
48+
}) async {
49+
final keepTreshold = clock.ago(minutes: 5);
50+
final deletionInProgress = {
51+
...state.deletions.whereValue((v) => v.isAfter(keepTreshold)),
52+
};
53+
54+
final futures = <Future>[];
55+
for (final instance in instances) {
56+
if (isAbortedFn()) {
57+
break;
58+
}
59+
60+
// Prevent multiple calls to delete the same instance.
61+
if (deletionInProgress.containsKey(instance.instanceName)) {
62+
continue;
63+
}
64+
65+
// If terminated or older than maxInstanceAge, delete the instance...
66+
final isTerminated = instance.state == InstanceState.terminated;
67+
final isTooOld = instance.created
68+
.add(Duration(hours: maxTaskRunHours))
69+
.isBefore(clock.now());
70+
71+
if (isTooOld) {
72+
// This indicates that something is wrong the with the instance,
73+
// ideally it should have detected its own deadline being violated
74+
// and terminated on its own. Of course, this can fail for arbitrary
75+
// reasons in a distributed system.
76+
_log.warning('terminating $instance for being too old!');
77+
} else if (isTerminated) {
78+
_log.info('deleting $instance as it has terminated.');
79+
} else {
80+
// Do not delete this instance
81+
continue;
82+
}
83+
84+
deletionInProgress[instance.instanceName] = clock.now();
85+
86+
final completer = Completer();
87+
scheduleMicrotask(() async {
88+
try {
89+
await deleteInstanceFn(instance.zone, instance.instanceName);
90+
} catch (e, st) {
91+
_log.severe('Failed to delete $instance', e, st);
92+
} finally {
93+
completer.complete();
94+
}
95+
});
96+
futures.add(completer.future);
97+
}
98+
99+
return DeleteInstancesNextState(
100+
state: DeleteInstancesState(deletions: deletionInProgress),
101+
deletionsDone: futures.isEmpty ? Future.value() : Future.wait(futures),
102+
);
103+
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file
2+
// for details. All rights reserved. Use of this source code is governed by a
3+
// BSD-style license that can be found in the LICENSE file.
4+
5+
import 'package:clock/clock.dart';
6+
7+
/// The internal state for deciding which package needs to be updated.
8+
class ScanPackagesUpdatedState {
9+
/// The last time the algorithm checked on a package.
10+
final Map<String, DateTime> seen;
11+
12+
/// The cycle's reference timestamp.
13+
final DateTime since;
14+
15+
/// Most scan cycle will process changes only from a short time period,
16+
/// however, periodically we want to process a longer overlap window.
17+
/// This timestamp indicates the future time when such longer scan should happen.
18+
final DateTime nextLongerOverlap;
19+
20+
ScanPackagesUpdatedState({
21+
required this.seen,
22+
required this.since,
23+
required this.nextLongerOverlap,
24+
});
25+
26+
factory ScanPackagesUpdatedState.init({
27+
Map<String, DateTime>? seen,
28+
}) => ScanPackagesUpdatedState(
29+
seen: seen ?? {},
30+
// In theory 30 minutes overlap should be enough. In practice we should
31+
// allow an ample room for missed windows, and 3 days seems to be large enough.
32+
since: clock.ago(days: 3),
33+
// We will schedule longer overlaps every 6 hours.
34+
nextLongerOverlap: clock.fromNow(hours: 6),
35+
);
36+
}
37+
38+
/// The result of the scan package operation.
39+
class ScanPackagesUpdatedNextState {
40+
/// The next state of the data.
41+
final ScanPackagesUpdatedState state;
42+
43+
/// The package to update.
44+
final List<String> packages;
45+
46+
ScanPackagesUpdatedNextState({required this.state, required this.packages});
47+
}
48+
49+
/// Calculates the next state of scan packages updated loop by
50+
/// processing the input [stream].
51+
Future<ScanPackagesUpdatedNextState> calculateScanPackagesUpdatedLoop(
52+
ScanPackagesUpdatedState state,
53+
Stream<({String name, DateTime updated})> stream,
54+
bool Function() isAbortedFn,
55+
) async {
56+
var since = state.since;
57+
var nextLongScan = state.nextLongerOverlap;
58+
if (clock.now().isAfter(state.nextLongerOverlap)) {
59+
// Next time we'll do a longer scan
60+
since = clock.ago(days: 1);
61+
nextLongScan = clock.fromNow(hours: 6);
62+
} else {
63+
// Next time we'll only consider changes since now - 30 minutes
64+
since = clock.ago(minutes: 30);
65+
}
66+
67+
final seen = {...state.seen};
68+
final packages = <String>[];
69+
70+
await for (final p in stream) {
71+
if (isAbortedFn()) {
72+
break;
73+
}
74+
// Check if the [updated] timestamp has been seen before.
75+
// If so, we skip checking it!
76+
final lastSeen = seen[p.name];
77+
if (lastSeen != null && lastSeen.toUtc() == p.updated.toUtc()) {
78+
continue;
79+
}
80+
// Remember the updated time for this package, so we don't check it
81+
// again...
82+
seen[p.name] = p.updated;
83+
// Needs to be updated.
84+
packages.add(p.name);
85+
}
86+
87+
// Cleanup the [seen] map for anything older than [since], as this won't
88+
// be relevant to the next iteration.
89+
seen.removeWhere((_, updated) => updated.isBefore(since));
90+
91+
return ScanPackagesUpdatedNextState(
92+
state: ScanPackagesUpdatedState(
93+
seen: seen,
94+
since: since,
95+
nextLongerOverlap: nextLongScan,
96+
),
97+
packages: packages,
98+
);
99+
}

app/lib/task/scheduler.dart

Lines changed: 21 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import 'package:pub_dev/task/backend.dart';
1414
import 'package:pub_dev/task/clock_control.dart';
1515
import 'package:pub_dev/task/cloudcompute/cloudcompute.dart';
1616
import 'package:pub_dev/task/global_lock.dart';
17+
import 'package:pub_dev/task/loops/delete_instances.dart';
1718
import 'package:pub_dev/task/models.dart';
1819

1920
final _log = Logger('pub.task.schedule');
@@ -60,64 +61,35 @@ Future<void> schedule(
6061
}
6162
}
6263

63-
// Set of `CloudInstance.instanceName`s currently being deleted.
64-
// This to avoid deleting instances where the deletion process is still
65-
// running.
66-
final deletionInProgress = <String>{};
64+
var deleteInstancesState = DeleteInstancesState.init();
6765

6866
// Create a fast RNG with random seed for picking zones.
6967
final rng = Random(Random.secure().nextInt(2 << 31));
7068

69+
bool isAbortedFn() => !claim.valid || abort.isCompleted;
70+
7171
// Run scheduling iterations, so long as we have a valid claim
72-
while (claim.valid && !abort.isCompleted) {
72+
while (!isAbortedFn()) {
7373
final iterationStart = clock.now();
7474
_log.info('Starting scheduling cycle');
7575

76-
// Count number of instances, and delete old instances
77-
var instances = 0;
78-
await for (final instance in compute.listInstances()) {
79-
instances += 1; // count the instance
80-
81-
// If terminated or older than maxInstanceAge, delete the instance...
82-
final isTerminated = instance.state == InstanceState.terminated;
83-
final isTooOld = instance.created
84-
.add(Duration(hours: activeConfiguration.maxTaskRunHours))
85-
.isBefore(clock.now());
86-
// Also check deletionInProgress to prevent multiple calls to delete the
87-
// same instance
88-
final isBeingDeleted = deletionInProgress.contains(instance.instanceName);
89-
if ((isTerminated || isTooOld) && !isBeingDeleted) {
90-
if (isTooOld) {
91-
// This indicates that something is wrong the with the instance,
92-
// ideally it should have detected its own deadline being violated
93-
// and terminated on its own. Of course, this can fail for arbitrary
94-
// reasons in a distributed system.
95-
_log.warning('terminating $instance for being too old!');
96-
} else if (isTerminated) {
97-
_log.info('deleting $instance as it has terminated.');
98-
}
99-
100-
deletionInProgress.add(instance.instanceName);
101-
scheduleMicrotask(() async {
102-
final deletionStart = clock.now();
103-
try {
104-
await compute.delete(instance.zone, instance.instanceName);
105-
} catch (e, st) {
106-
_log.severe('Failed to delete $instance', e, st);
107-
} finally {
108-
// Wait at-least 5 minutes from start of deletion until we remove
109-
// it from [deletionInProgress] that way we give the API some time
110-
// reconcile state.
111-
await sleepOrAborted(Duration(minutes: 5), since: deletionStart);
112-
deletionInProgress.remove(instance.instanceName);
113-
}
114-
});
115-
}
76+
final instances = await compute.listInstances().toList();
77+
final nextDeleteInstancesState = await scanAndDeleteInstances(
78+
deleteInstancesState,
79+
instances,
80+
compute.delete,
81+
isAbortedFn,
82+
maxTaskRunHours: activeConfiguration.maxTaskRunHours,
83+
);
84+
deleteInstancesState = nextDeleteInstancesState.state;
85+
if (isAbortedFn()) {
86+
break;
11687
}
88+
11789
_log.info('Found $instances instances');
11890

11991
// If we are not allowed to create new instances within the allowed quota,
120-
if (activeConfiguration.maxTaskInstances <= instances) {
92+
if (activeConfiguration.maxTaskInstances <= instances.length) {
12193
_log.info('Reached instance limit, trying again in 30s');
12294
// Wait 30 seconds then list instances again, so that we can count them
12395
await sleepOrAborted(Duration(seconds: 30), since: iterationStart);
@@ -145,7 +117,7 @@ Future<void> schedule(
145117
var pendingPackagesReviewed = 0;
146118
final selectLimit = min(
147119
_maxInstancesPerIteration,
148-
max(0, activeConfiguration.maxTaskInstances - instances),
120+
max(0, activeConfiguration.maxTaskInstances - instances.length),
149121
);
150122

151123
Future<void> scheduleInstance(({String package}) selected) async {
@@ -252,14 +224,14 @@ Future<void> schedule(
252224

253225
// If there was no pending packages reviewed, and no instances currently
254226
// running, then we can easily sleep 5 minutes before we poll again.
255-
if (instances == 0 && pendingPackagesReviewed == 0) {
227+
if (instances.isEmpty && pendingPackagesReviewed == 0) {
256228
await sleepOrAborted(Duration(minutes: 5));
257229
continue;
258230
}
259231

260232
// If more tasks is available and quota wasn't used up, we only sleep 10s
261233
if (pendingPackagesReviewed >= _maxInstancesPerIteration &&
262-
activeConfiguration.maxTaskInstances > instances) {
234+
activeConfiguration.maxTaskInstances > instances.length) {
263235
await sleepOrAborted(Duration(seconds: 10), since: iterationStart);
264236
continue;
265237
}

0 commit comments

Comments
 (0)