Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions example/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,9 @@ build/
# Directory created by dartdoc
doc/api/

# Directory created by Flutter Version Manager
.fvm/

.vscode/
.idea/

9 changes: 8 additions & 1 deletion lib/src/compute_api/task.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import 'dart:isolate';
class Task {
final Function task;
final dynamic param;

final Capability capability;

Task({
Expand All @@ -13,6 +12,10 @@ class Task {
});
}

class WorkerCleanUpTask extends Task {
WorkerCleanUpTask() : super(task: (){}, capability: Capability());
}

class TaskResult {
final dynamic result;
final Capability capability;
Expand All @@ -22,3 +25,7 @@ class TaskResult {
required this.capability,
});
}

class WorkerCleanUpTaskResult extends TaskResult {
WorkerCleanUpTaskResult() : super(result: null, capability: Capability());
}
37 changes: 23 additions & 14 deletions lib/src/compute_api/worker.dart
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,16 @@ class Worker {

_sendPort = await _broadcastReceivePort.first as SendPort;

_broadcastPortSubscription = _broadcastReceivePort.listen((dynamic res) {
_broadcastPortSubscription = _broadcastReceivePort.listen((dynamic res) async {
status = WorkerStatus.idle;
if (res is RemoteExecutionError) {
onError(res, this);
return;
} else if(res is WorkerCleanUpTaskResult) {
await _broadcastPortSubscription.cancel();
_isolate.kill();
_receivePort.close();
return;
}
onResult(res as TaskResult, this);
});
Expand All @@ -71,9 +76,7 @@ class Worker {
}

Future<void> dispose() async {
await _broadcastPortSubscription.cancel();
_isolate.kill();
_receivePort.close();
_sendPort.send(WorkerCleanUpTask());
}
}

Expand All @@ -85,16 +88,22 @@ Future<void> isolateEntryPoint(IsolateInitParams params) async {

await for (final Task task in receivePort.cast<Task>()) {
try {
final shouldPassParam = task.param != null;

final dynamic computationResult =
shouldPassParam ? await task.task(task.param) : await task.task();

final result = TaskResult(
result: computationResult,
capability: task.capability,
);
sendPort.send(result);
if(task is WorkerCleanUpTask) {
receivePort.close();
final result = WorkerCleanUpTaskResult();
sendPort.send(result);
} else {
final shouldPassParam = task.param != null;

final dynamic computationResult =
shouldPassParam ? await task.task(task.param) : await task.task();

final result = TaskResult(
result: computationResult,
capability: task.capability,
);
sendPort.send(result);
}
} catch (error) {
sendPort.send(RemoteExecutionError(error.toString(), task.capability));
}
Expand Down