From 15797709850cd52ff6c3e9ad7fcb164da236b434 Mon Sep 17 00:00:00 2001
From: Istvan Soos
Date: Mon, 1 Dec 2025 11:09:39 +0100
Subject: [PATCH] Task backend events exposed for testing.

---
 app/lib/task/backend.dart    | 23 ++++++++++
 app/lib/task/scheduler.dart  | 10 +++++
 app/test/task/task_test.dart | 82 ++++++++++++++++++++++--------------
 3 files changed, 83 insertions(+), 32 deletions(-)

diff --git a/app/lib/task/backend.dart b/app/lib/task/backend.dart
index 49cad66f5b..a2627e4a74 100644
--- a/app/lib/task/backend.dart
+++ b/app/lib/task/backend.dart
@@ -17,6 +17,7 @@ import 'package:gcloud/storage.dart' show Bucket;
 import 'package:googleapis/storage/v1.dart' show DetailedApiRequestError;
 import 'package:indexed_blob/indexed_blob.dart' show BlobIndex, FileRange;
 import 'package:logging/logging.dart' show Logger;
+import 'package:meta/meta.dart';
 import 'package:pana/models.dart' show Summary;
 import 'package:pool/pool.dart' show Pool;
 import 'package:pub_dev/package/api_export/api_exporter.dart';
@@ -84,6 +85,20 @@ void registerTaskBackend(TaskBackend backend) =>
 /// The active task backend service.
 TaskBackend get taskBackend => ss.lookup(#_taskBackend) as TaskBackend;
 
+/// Describes an event that happened inside the task backend (e.g. scheduling).
+class TaskEvent {
+  final DateTime timestamp;
+  final String kind;
+  final Map parameters;
+
+  TaskEvent(this.kind, this.parameters) : timestamp = clock.now();
+
+  @override
+  String toString() {
+    return '$timestamp $kind $parameters';
+  }
+}
+
 class TaskBackend {
   final DatastoreDB _db;
   final Bucket _bucket;
@@ -99,8 +114,14 @@ class TaskBackend {
   /// `null` when not started yet.
   Completer? _stopped;
 
+  /// Event log for test verification.
+  final _events = StreamController.broadcast();
+
   TaskBackend(this._db, this._bucket);
 
+  @visibleForTesting
+  Stream get events => _events.stream;
+
   /// Start continuous background processes for scheduling of tasks.
   ///
   /// Calling [start] without first calling [stop] is an error.
@@ -175,6 +196,7 @@ class TaskBackend {
             taskWorkerCloudCompute,
             _db,
             abort: aborted,
+            eventSink: _events.sink,
           );
         }, abort: aborted);
       } catch (e, st) {
@@ -218,6 +240,7 @@ class TaskBackend {
       aborted.complete();
     }
     await _stopped!.future;
+    await _events.close();
     _aborted = null;
     _stopped = null;
   }
diff --git a/app/lib/task/scheduler.dart b/app/lib/task/scheduler.dart
index e69e87ad7e..9359563b84 100644
--- a/app/lib/task/scheduler.dart
+++ b/app/lib/task/scheduler.dart
@@ -27,6 +27,7 @@ Future schedule(
   CloudCompute compute,
   DatastoreDB db, {
   required Completer abort,
+  required Sink eventSink,
 }) async {
   /// Sleep [delay] time [since] timestamp, or now if not given.
   Future sleepOrAborted(Duration delay, {DateTime? since}) async {
@@ -102,6 +103,12 @@ Future schedule(
         final deletionStart = clock.now();
         try {
           await compute.delete(instance.zone, instance.instanceName);
+          eventSink.add(
+            TaskEvent('delete-instance', {
+              'name': instance.instanceName,
+              'zone': instance.zone,
+            }),
+          );
         } catch (e, st) {
           _log.severe('Failed to delete $instance', e, st);
         } finally {
@@ -188,6 +195,9 @@ Future schedule(
             arguments: [json.encode(payload)],
             description: description,
           );
+          eventSink.add(
+            TaskEvent('create-instance', {'name': instanceName, 'zone': zone}),
+          );
           rollbackPackageState = false;
         } on ZoneExhaustedException catch (e, st) {
           // A zone being exhausted is normal operations, we just use another
diff --git a/app/test/task/task_test.dart b/app/test/task/task_test.dart
index 90e3469ee6..56b232e370 100644
--- a/app/test/task/task_test.dart
+++ b/app/test/task/task_test.dart
@@ -186,45 +186,44 @@ void main() {
       await taskBackend.backfillTrackingState();
       await clockControl.elapse(minutes: 1);
 
+      final collector = _TaskEventCollector();
       await taskBackend.start();
 
       // We are going to let the task timeout, if this happens we should only
       // try to scheduled it until we hit the [taskRetryLimit].
-      for (var i = 0; i < taskRetryLimit; i++) {
-        // Within 24 hours an instance should be created
-        await clockControl.elapseUntil(
-          () => cloud.listInstances().isNotEmpty,
-          timeout: Duration(days: 1),
-        );
-
-        // If nothing happens, then it should be killed within 24 hours.
-        // Actually, it'll happen much sooner, like ~2 hours, but we'll leave the
-        // test some wiggle room.
-        await clockControl.elapseUntil(
-          () => cloud.listInstances().isEmpty,
-          timeout: Duration(days: 1),
-        );
-      }
+      await clockControl.elapse(hours: 36);
+      expect(collector.createdCount, taskRetryLimit);
+      expect(collector.deletedCount, taskRetryLimit);
+      expect(
+        collector.events.map(
+          (e) => (e.timestamp.difference(collector.startTime).inHours, e.kind),
+        ),
+        [
+          (0, 'create-instance'),
+          (2, 'delete-instance'),
+          (3, 'create-instance'),
+          (5, 'delete-instance'),
+          (15, 'create-instance'),
+          (17, 'delete-instance'),
+        ],
+      );
 
       // Once we've exceeded the [taskRetryLimit], we shouldn't see any instances
       // created for the next day...
       assert(taskRetriggerInterval > Duration(days: 1));
-      await expectLater(
-        clockControl.elapseUntil(
-          () => cloud.listInstances().isNotEmpty,
-          timeout: Duration(days: 1),
-        ),
-        throwsA(isA()),
-      );
+      await clockControl.elapse(days: 1);
+      expect(collector.createdCount, taskRetryLimit);
+      expect(collector.deletedCount, taskRetryLimit);
 
       // But the task should be retried after [taskRetriggerInterval], this is a
       // long time, but for sanity we do re-analyze everything occasionally.
-      await clockControl.elapseUntil(
-        () => cloud.listInstances().isNotEmpty,
-        timeout: taskRetriggerInterval + Duration(days: 1),
-      );
+      await clockControl.elapseTime(taskRetriggerInterval);
+      await clockControl.elapse(days: 1);
+      expect(collector.createdCount, taskRetryLimit + 1);
+      expect(collector.deletedCount, taskRetryLimit + 1);
 
       await taskBackend.stop();
+      await collector.close();
 
       await clockControl.elapse(minutes: 10);
     },
@@ -799,12 +798,6 @@ void main() {
   );
 }
 
-extension on Stream {
-  Future get isNotEmpty async {
-    return !await this.isEmpty;
-  }
-}
-
 Future upload(
   http.Client client,
   UploadInfo destination,
@@ -842,3 +835,28 @@ Future upload(
   // Unhandled response code -> retry
   fail('Unhandled HTTP status = ${res.statusCode}, body: ${res.body}');
 }
+
+class _TaskEventCollector {
+  final DateTime startTime;
+  final List events;
+  late final StreamSubscription _subscription;
+
+  _TaskEventCollector._(this.startTime, this.events, this._subscription);
+
+  factory _TaskEventCollector() {
+    final events = [];
+    return _TaskEventCollector._(
+      clock.now(),
+      events,
+      taskBackend.events.listen(events.add),
+    );
+  }
+
+  int get createdCount =>
+      events.where((e) => e.kind == 'create-instance').length;
+
+  int get deletedCount =>
+      events.where((e) => e.kind == 'delete-instance').length;
+
+  Future close() => _subscription.cancel();
+}