Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions app/lib/task/backend.dart
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import 'package:gcloud/storage.dart' show Bucket;
import 'package:googleapis/storage/v1.dart' show DetailedApiRequestError;
import 'package:indexed_blob/indexed_blob.dart' show BlobIndex, FileRange;
import 'package:logging/logging.dart' show Logger;
import 'package:meta/meta.dart';
import 'package:pana/models.dart' show Summary;
import 'package:pool/pool.dart' show Pool;
import 'package:pub_dev/package/api_export/api_exporter.dart';
Expand Down Expand Up @@ -84,6 +85,20 @@ void registerTaskBackend(TaskBackend backend) =>
/// The active task backend service.
TaskBackend get taskBackend => ss.lookup(#_taskBackend) as TaskBackend;

/// Describes an event that happened inside the task backend (e.g. scheduling).
class TaskEvent {
final DateTime timestamp;
final String kind;
final Map<String, dynamic> parameters;

TaskEvent(this.kind, this.parameters) : timestamp = clock.now();

@override
String toString() {
return '$timestamp $kind $parameters';
}
}

class TaskBackend {
final DatastoreDB _db;
final Bucket _bucket;
Expand All @@ -99,8 +114,14 @@ class TaskBackend {
/// `null` when not started yet.
Completer<void>? _stopped;

/// Event log for test verification.
final _events = StreamController<TaskEvent>.broadcast();

TaskBackend(this._db, this._bucket);

@visibleForTesting
Stream<TaskEvent> get events => _events.stream;

/// Start continuous background processes for scheduling of tasks.
///
/// Calling [start] without first calling [stop] is an error.
Expand Down Expand Up @@ -175,6 +196,7 @@ class TaskBackend {
taskWorkerCloudCompute,
_db,
abort: aborted,
eventSink: _events.sink,
);
}, abort: aborted);
} catch (e, st) {
Expand Down Expand Up @@ -218,6 +240,7 @@ class TaskBackend {
aborted.complete();
}
await _stopped!.future;
await _events.close();
_aborted = null;
_stopped = null;
}
Expand Down
10 changes: 10 additions & 0 deletions app/lib/task/scheduler.dart
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ Future<void> schedule(
CloudCompute compute,
DatastoreDB db, {
required Completer<void> abort,
required Sink<TaskEvent> eventSink,
}) async {
/// Sleep [delay] time [since] timestamp, or now if not given.
Future<void> sleepOrAborted(Duration delay, {DateTime? since}) async {
Expand Down Expand Up @@ -102,6 +103,12 @@ Future<void> schedule(
final deletionStart = clock.now();
try {
await compute.delete(instance.zone, instance.instanceName);
eventSink.add(
TaskEvent('delete-instance', {
'name': instance.instanceName,
'zone': instance.zone,
}),
);
} catch (e, st) {
_log.severe('Failed to delete $instance', e, st);
} finally {
Expand Down Expand Up @@ -188,6 +195,9 @@ Future<void> schedule(
arguments: [json.encode(payload)],
description: description,
);
eventSink.add(
TaskEvent('create-instance', {'name': instanceName, 'zone': zone}),
);
rollbackPackageState = false;
} on ZoneExhaustedException catch (e, st) {
// A zone being exhausted is normal operations, we just use another
Expand Down
82 changes: 50 additions & 32 deletions app/test/task/task_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -186,45 +186,44 @@ void main() {
await taskBackend.backfillTrackingState();
await clockControl.elapse(minutes: 1);

final collector = _TaskEventCollector();
await taskBackend.start();

// We are going to let the task timeout, if this happens we should only
// try to scheduled it until we hit the [taskRetryLimit].
for (var i = 0; i < taskRetryLimit; i++) {
// Within 24 hours an instance should be created
await clockControl.elapseUntil(
() => cloud.listInstances().isNotEmpty,
timeout: Duration(days: 1),
);

// If nothing happens, then it should be killed within 24 hours.
// Actually, it'll happen much sooner, like ~2 hours, but we'll leave the
// test some wiggle room.
await clockControl.elapseUntil(
() => cloud.listInstances().isEmpty,
timeout: Duration(days: 1),
);
}
await clockControl.elapse(hours: 36);
expect(collector.createdCount, taskRetryLimit);
expect(collector.deletedCount, taskRetryLimit);
expect(
collector.events.map(
(e) => (e.timestamp.difference(collector.startTime).inHours, e.kind),
),
[
(0, 'create-instance'),
(2, 'delete-instance'),
(3, 'create-instance'),
(5, 'delete-instance'),
(15, 'create-instance'),
(17, 'delete-instance'),
],
);

// Once we've exceeded the [taskRetryLimit], we shouldn't see any instances
// created for the next day...
assert(taskRetriggerInterval > Duration(days: 1));
await expectLater(
clockControl.elapseUntil(
() => cloud.listInstances().isNotEmpty,
timeout: Duration(days: 1),
),
throwsA(isA<TimeoutException>()),
);
await clockControl.elapse(days: 1);
expect(collector.createdCount, taskRetryLimit);
expect(collector.deletedCount, taskRetryLimit);

// But the task should be retried after [taskRetriggerInterval], this is a
// long time, but for sanity we do re-analyze everything occasionally.
await clockControl.elapseUntil(
() => cloud.listInstances().isNotEmpty,
timeout: taskRetriggerInterval + Duration(days: 1),
);
await clockControl.elapseTime(taskRetriggerInterval);
await clockControl.elapse(days: 1);
expect(collector.createdCount, taskRetryLimit + 1);
expect(collector.deletedCount, taskRetryLimit + 1);

await taskBackend.stop();
await collector.close();

await clockControl.elapse(minutes: 10);
},
Expand Down Expand Up @@ -799,12 +798,6 @@ void main() {
);
}

extension<T> on Stream<T> {
Future<bool> get isNotEmpty async {
return !await this.isEmpty;
}
}

Future<void> upload(
http.Client client,
UploadInfo destination,
Expand Down Expand Up @@ -842,3 +835,28 @@ Future<void> upload(
// Unhandled response code -> retry
fail('Unhandled HTTP status = ${res.statusCode}, body: ${res.body}');
}

class _TaskEventCollector {
final DateTime startTime;
final List<TaskEvent> events;
late final StreamSubscription _subscription;

_TaskEventCollector._(this.startTime, this.events, this._subscription);

factory _TaskEventCollector() {
final events = <TaskEvent>[];
return _TaskEventCollector._(
clock.now(),
events,
taskBackend.events.listen(events.add),
);
}

int get createdCount =>
events.where((e) => e.kind == 'create-instance').length;

int get deletedCount =>
events.where((e) => e.kind == 'delete-instance').length;

Future<void> close() => _subscription.cancel();
}