Skip to content

Commit 50f8fb2

Browse files
committed
Review changes
1 parent 86c95a7 commit 50f8fb2

File tree

13 files changed

+81
-59
lines changed

13 files changed

+81
-59
lines changed

packages/a2a_dart/DESIGN.md

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -138,38 +138,26 @@ Example `A2AClient` usage:
138138
```dart
139139
final client = A2AClient(url: 'https://example.com/a2a', transport: SseTransport(url: 'https://example.com/a2a'));
140140
141-
// For a simple request-response
141+
// Create a task
142142
final task = await client.createTask(Message(
143143
messageId: '1',
144144
role: Role.user,
145145
parts: [const TextPart(text: 'Hello, agent!')],
146146
));
147147
148-
// For a streaming response
149-
final stream = client.messageStream(Message(
150-
role: 'user',
151-
parts: [TextPart(text: 'Tell me a story.')],
152-
));
153-
148+
// Execute the task and get a stream of events
149+
final stream = client.executeTask(task.id);
154150
await for (final event in stream) {
155151
// process events
156152
}
157-
158-
print(task.status.state);
159-
160-
// To execute the task and get a stream of messages:
161-
final stream = client.executeTask(task.id);
162-
await for (final message in stream) {
163-
// process messages
164-
}
165153
```
166154

167155
## 6. Server Framework
168156

169157
The server framework will provide the building blocks for creating A2A-compliant agents in Dart.
170158

171159
- **`A2AServer`**: A top-level class that listens for incoming HTTP requests and routes them to the appropriate `RequestHandler`.
172-
- **`RequestHandler`**: An interface for handling specific A2A methods. Developers will implement this interface to define their agent's behavior.
160+
- **`RequestHandler`**: An interface for handling specific A2A methods. Developers will implement this interface to define their agent's behavior. The `handle` method returns a `HandlerResult` which can be a `SingleResult` for a single response or a `StreamResult` for a streaming response.
173161
- **`TaskManager`**: A class responsible for managing the lifecycle of tasks.
174162

175163
## 7. Error Handling

packages/a2a_dart/GEMINI.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ The `A2AClient` provides a simple and convenient way to interact with an A2A ser
1010

1111
## Server
1212

13-
The `A2AServer` provides a flexible and extensible framework for building A2A agents. It is built on top of the `shelf` package and uses a request handler pipeline to process incoming requests. Each `RequestHandler` is responsible for a single RPC method, making it easy to add new functionality to the server.
13+
The `A2AServer` provides a flexible and extensible framework for building A2A agents. It is built on top of the `shelf` package and uses a request handler pipeline to process incoming requests. Each `RequestHandler` is responsible for a single RPC method, making it easy to add new functionality to the server. The `handle` method of a `RequestHandler` returns a `HandlerResult`, which can be a `SingleResult` for a single response or a `StreamResult` for a streaming response.
1414

1515
## Data Models
1616

packages/a2a_dart/README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ Add the following to your `pubspec.yaml` file:
1515

1616
```yaml
1717
dependencies:
18-
a2a_dart: ^1.0.0
18+
a2a_dart: ^0.1.0
1919
```
2020
2121
## Usage
@@ -45,8 +45,8 @@ void main() async {
4545

4646
// Execute the task and stream the results.
4747
final stream = client.executeTask(task.id);
48-
await for (final message in stream) {
49-
print('Received message: ${message.messageId}');
48+
await for (final event in stream) {
49+
print('Received event: ${event.type}');
5050
}
5151
}
5252
```

packages/a2a_dart/lib/src/client/sse_transport.dart

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ class SseTransport extends HttpTransport {
6363
} catch (e) {
6464
throw A2AException.parsing(message: e.toString());
6565
}
66+
}
6667
} else if (line.startsWith('data:')) {
6768
data.add(line.substring(5).trim());
6869
} else if (line.startsWith(':')) {

packages/a2a_dart/lib/src/server/a2a_server.dart

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import 'package:shelf/shelf.dart';
1111
import 'package:shelf/shelf_io.dart' as io;
1212
import 'package:shelf_router/shelf_router.dart';
1313

14+
import 'handler_result.dart';
1415
import 'request_handler.dart';
1516

1617
/// A server for handling A2A RPC calls.
@@ -120,30 +121,29 @@ class A2AServer {
120121
if (handler != null) {
121122
final result = await handler.handle(params);
122123
_log.info('Returning successful response for method $method');
123-
if (result.containsKey('stream')) {
124-
final stream = result['stream'] as Stream<Map<String, dynamic>>;
125-
final responseStream = stream.map((event) {
126-
return utf8.encode(
127-
'data: ${jsonEncode({
128-
'jsonrpc': '2.0',
129-
'result': event,
130-
'id': id
131-
})}\n\n',
132-
);
133-
});
134-
return Response.ok(
135-
responseStream,
136-
headers: {
137-
'Content-Type': 'text/event-stream',
138-
'Cache-Control': 'no-cache',
139-
'Connection': 'keep-alive',
140-
},
141-
);
142-
}
143-
return Response.ok(
144-
jsonEncode({'jsonrpc': '2.0', 'result': result, 'id': id}),
145-
headers: {'Content-Type': 'application/json'},
146-
);
124+
125+
return switch (result) {
126+
SingleResult(data: final data) => Response.ok(
127+
jsonEncode({'jsonrpc': '2.0', 'result': data, 'id': id}),
128+
headers: {'Content-Type': 'application/json'},
129+
),
130+
StreamResult(stream: final stream) => Response.ok(
131+
stream.map((event) {
132+
return utf8.encode(
133+
'data: ${jsonEncode({
134+
'jsonrpc': '2.0',
135+
'result': event,
136+
'id': id
137+
})}\n\n',
138+
);
139+
}),
140+
headers: {
141+
'Content-Type': 'text/event-stream',
142+
'Cache-Control': 'no-cache',
143+
'Connection': 'keep-alive',
144+
},
145+
),
146+
};
147147
} else {
148148
return Response.notFound(
149149
jsonEncode({

packages/a2a_dart/lib/src/server/create_task_handler.dart

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import 'dart:async';
66

77
import '../core/message.dart';
8+
import 'handler_result.dart';
89
import 'request_handler.dart';
910
import 'task_manager.dart';
1011

@@ -25,14 +26,11 @@ class CreateTaskHandler implements RequestHandler {
2526
String get method => 'create_task';
2627

2728
@override
28-
FutureOr<Map<String, dynamic>> handle(Map<String, dynamic> params) {
29-
// The 'message' parameter is part of the A2A specification for 'create_task',
30-
// but it is not yet used in this implementation.
31-
// ignore: unused_local_variable
29+
FutureOr<HandlerResult> handle(Map<String, dynamic> params) {
3230
final message = Message.fromJson(params['message'] as Map<String, dynamic>);
3331

34-
final task = _taskManager.createTask();
32+
final task = _taskManager.createTask(message);
3533

36-
return task.toJson();
34+
return SingleResult(task.toJson());
3735
}
3836
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// Copyright 2025 The Flutter Authors.
2+
// Use of this source code is governed by a BSD-style license that can be
3+
// found in the LICENSE file.
4+
5+
import 'dart:async';
6+
7+
/// A sealed class representing the result of a [RequestHandler].
8+
sealed class HandlerResult {}
9+
10+
/// A [HandlerResult] that represents a single, non-streaming response.
11+
class SingleResult extends HandlerResult {
12+
/// The data to be returned in the response.
13+
final Map<String, dynamic> data;
14+
15+
/// Creates a [SingleResult] with the given [data].
16+
SingleResult(this.data);
17+
}
18+
19+
/// A [HandlerResult] that represents a streaming response.
20+
class StreamResult extends HandlerResult {
21+
/// The stream of data to be returned in the response.
22+
final Stream<Map<String, dynamic>> stream;
23+
24+
/// Creates a [StreamResult] with the given [stream].
25+
StreamResult(this.stream);
26+
}

packages/a2a_dart/lib/src/server/request_handler.dart

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44

55
import 'dart:async';
66

7-
/// A handler for a specific A2A RPC method.
7+
import 'handler_result.dart';
8+
9+
/// Defines the interface for a handler of a specific A2A RPC method.
810
///
911
/// Implement this class to create a handler for a specific RPC method. The
1012
/// [A2AServer] will delegate requests to the appropriate handler based on the
@@ -18,5 +20,5 @@ abstract class RequestHandler {
1820
/// The [params] are the parameters of the RPC call. This method should return
1921
/// a [FutureOr] of a [Map] that will be sent as the `result` of the JSON-RPC
2022
/// 2.0 response.
21-
FutureOr<Map<String, dynamic>> handle(Map<String, dynamic> params);
23+
FutureOr<HandlerResult> handle(Map<String, dynamic> params);
2224
}

packages/a2a_dart/lib/src/server/task_manager.dart

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Use of this source code is governed by a BSD-style license that can be
33
// found in the LICENSE file.
44

5+
import 'package:a2a_dart/src/core/message.dart';
56
import 'package:a2a_dart/src/core/task.dart';
67
import 'package:uuid/uuid.dart';
78

@@ -16,12 +17,13 @@ class TaskManager {
1617
/// Creates a new [Task] with a unique ID and `submitted` status.
1718
///
1819
/// The new task is stored in the task manager and then returned.
19-
Task createTask() {
20+
Task createTask([Message? message]) {
2021
final taskId = _uuid.v4();
2122
final contextId = _uuid.v4();
2223
final task = Task(
2324
id: taskId,
2425
contextId: contextId,
26+
history: [if (message != null) message],
2527
status: const TaskStatus(state: TaskState.submitted),
2628
);
2729
_tasks[taskId] = task;

packages/a2a_dart/test/integration/client_server_test.dart

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import 'package:a2a_dart/src/core/part.dart';
1212
import 'package:a2a_dart/src/core/task.dart';
1313
import 'package:a2a_dart/src/server/a2a_server.dart';
1414
import 'package:a2a_dart/src/server/create_task_handler.dart';
15+
import 'package:a2a_dart/src/server/handler_result.dart';
1516
import 'package:a2a_dart/src/server/request_handler.dart';
1617
import 'package:a2a_dart/src/server/task_manager.dart';
1718
import 'package:logging/logging.dart';
@@ -26,7 +27,7 @@ class MockExecuteTaskHandler implements RequestHandler {
2627
String get method => 'execute_task';
2728

2829
@override
29-
FutureOr<Map<String, dynamic>> handle(Map<String, dynamic> params) {
30+
FutureOr<HandlerResult> handle(Map<String, dynamic> params) {
3031
final streamController = StreamController<Map<String, dynamic>>();
3132
final taskId = params['task_id'] as String;
3233
final task = taskManager.getTask(taskId)!;
@@ -74,7 +75,7 @@ class MockExecuteTaskHandler implements RequestHandler {
7475
);
7576
streamController.close();
7677

77-
return {'stream': streamController.stream};
78+
return StreamResult(streamController.stream);
7879
}
7980
}
8081

0 commit comments

Comments
 (0)