Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 81 additions & 2 deletions src/Oagents.Core/Abstractions/Event.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,91 @@
using System.Runtime.Serialization;
using CloudNative.CloudEvents;

namespace Microsoft.AI.Agents.Abstractions
{
[DataContract]
public class Event
{
private CloudEvent _cloudEvent;

public Event()
{
_cloudEvent = new CloudEvent();
Data = new Dictionary<string, string>();
}

public Event(CloudEvent cloudEvent)
{
_cloudEvent = cloudEvent ?? throw new ArgumentNullException(nameof(cloudEvent));
Data = ExtractDataFromCloudEvent(cloudEvent);
}

[DataMember]
public Dictionary<string, string> Data { get; set; }
public string Type { get; set; }
public string Subject { get; set; }

[DataMember]
public string Type
{
get => _cloudEvent.Type ?? string.Empty;
set => _cloudEvent.Type = value;
}

[DataMember]
public string Subject
{
get => _cloudEvent.Subject ?? string.Empty;
set => _cloudEvent.Subject = value;
}

public CloudEvent CloudEvent
{
get
{
// Sync the Data dictionary back to CloudEvent
if (Data.Count > 0)
{
_cloudEvent.Data = Data;
}
return _cloudEvent;
}
}

public static implicit operator CloudEvent(Event evt)
{
return evt.CloudEvent;
}

public static implicit operator Event(CloudEvent cloudEvent)
{
return new Event(cloudEvent);
}

private static Dictionary<string, string> ExtractDataFromCloudEvent(CloudEvent cloudEvent)
{
var data = new Dictionary<string, string>();

if (cloudEvent.Data != null)
{
if (cloudEvent.Data is Dictionary<string, string> dictData)
{
return dictData;
}
else if (cloudEvent.Data is Dictionary<string, object> objDict)
{
foreach (var kvp in objDict)
{
data[kvp.Key] = kvp.Value?.ToString() ?? string.Empty;
}
}
else
{
// For other data types, we'll need to handle them differently
// For now, just convert to string representation
data["data"] = cloudEvent.Data.ToString() ?? string.Empty;
}
}

return data;
}
}
}
6 changes: 6 additions & 0 deletions src/Oagents.Core/Abstractions/IAgent.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
using CloudNative.CloudEvents;

namespace Microsoft.AI.Agents.Abstractions;

public interface IAgent
{
Task HandleEvent(Event item);
Task PublishEvent(string ns, string id, Event item);

// CloudEvent overloads for direct CloudEvent support
Task HandleEvent(CloudEvent item);
Task PublishEvent(string ns, string id, CloudEvent item);
}
1 change: 1 addition & 0 deletions src/Oagents.Core/Oagents.Core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="CloudNative.CloudEvents" Version="2.8.0" />
<PackageReference Include="Microsoft.SemanticKernel" Version="1.10.0" />
</ItemGroup>

Expand Down
73 changes: 47 additions & 26 deletions src/Oagents.Dapr/Agent.cs
Original file line number Diff line number Diff line change
@@ -1,27 +1,48 @@
using Dapr.Actors.Runtime;
using Dapr.Client;
using Microsoft.AI.Agents.Abstractions;

namespace Microsoft.AI.Agents.Dapr;

public abstract class Agent : Actor, IAgent
{
private readonly DaprClient daprClient;

protected Agent(ActorHost host, DaprClient daprClient) : base(host)
{
this.daprClient = daprClient;
}
public abstract Task HandleEvent(Event item);

public async Task PublishEvent(string ns, string id, Event item)
{
var metadata = new Dictionary<string, string>() {
{ "cloudevent.Type", item.Type },
{ "cloudevent.Subject", item.Subject },
{ "cloudevent.id", Guid.NewGuid().ToString()}
};

await daprClient.PublishEventAsync(ns, id, item, metadata);
}
using Dapr.Actors.Runtime;
using Dapr.Client;
using Microsoft.AI.Agents.Abstractions;
using CloudNative.CloudEvents;

namespace Microsoft.AI.Agents.Dapr;

public abstract class Agent : Actor, IAgent
{
private readonly DaprClient daprClient;

protected Agent(ActorHost host, DaprClient daprClient) : base(host)
{
this.daprClient = daprClient;
}

public abstract Task HandleEvent(Event item);

// CloudEvent overload - default implementation converts to Event
public virtual Task HandleEvent(CloudEvent item)
{
return HandleEvent(new Event(item));
}

public async Task PublishEvent(string ns, string id, Event item)
{
var metadata = new Dictionary<string, string>() {
{ "cloudevent.Type", item.Type },
{ "cloudevent.Subject", item.Subject },
{ "cloudevent.id", Guid.NewGuid().ToString()}
};

await daprClient.PublishEventAsync(ns, id, item, metadata);
}

// CloudEvent overload - publishes CloudEvent directly with proper metadata
public async Task PublishEvent(string ns, string id, CloudEvent item)
{
var metadata = new Dictionary<string, string>() {
{ "cloudevent.Type", item.Type ?? string.Empty },
{ "cloudevent.Subject", item.Subject ?? string.Empty },
{ "cloudevent.id", item.Id ?? Guid.NewGuid().ToString()},
{ "cloudevent.Source", item.Source?.ToString() ?? string.Empty}
};

await daprClient.PublishEventAsync(ns, id, item, metadata);
}
}
1 change: 1 addition & 0 deletions src/Oagents.Dapr/Oagents.Dapr.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="CloudNative.CloudEvents" Version="2.8.0" />
<PackageReference Include="Dapr.Client" Version="1.13.0" />
<PackageReference Include="Dapr.Actors" Version="1.13.0" />
</ItemGroup>
Expand Down
75 changes: 44 additions & 31 deletions src/Oagents.Orleans/Agent.cs
Original file line number Diff line number Diff line change
@@ -1,32 +1,45 @@
using Microsoft.AI.Agents.Abstractions;
using Orleans.Runtime;
using Orleans.Streams;

namespace Microsoft.AI.Agents.Orleans;

public abstract class Agent : Grain, IGrainWithStringKey, IAgent
{
protected virtual string Namespace { get; set; }
public abstract Task HandleEvent(Event item);

private async Task HandleEvent(Event item, StreamSequenceToken? token)
{
await HandleEvent(item);
}

public async Task PublishEvent(string ns, string id, Event item)
{
var streamProvider = this.GetStreamProvider("StreamProvider");
var streamId = StreamId.Create(ns, id);
var stream = streamProvider.GetStream<Event>(streamId);
await stream.OnNextAsync(item);
}

public async override Task OnActivateAsync(CancellationToken cancellationToken)
{
var streamProvider = this.GetStreamProvider("StreamProvider");
var streamId = StreamId.Create(Namespace, this.GetPrimaryKeyString());
var stream = streamProvider.GetStream<Event>(streamId);
await stream.SubscribeAsync(HandleEvent);
}
using Microsoft.AI.Agents.Abstractions;
using Orleans.Runtime;
using Orleans.Streams;
using CloudNative.CloudEvents;

namespace Microsoft.AI.Agents.Orleans;

public abstract class Agent : Grain, IGrainWithStringKey, IAgent
{
protected virtual string Namespace { get; set; }
public abstract Task HandleEvent(Event item);

// CloudEvent overload - default implementation converts to Event
public virtual Task HandleEvent(CloudEvent item)
{
return HandleEvent(new Event(item));
}

private async Task HandleEvent(Event item, StreamSequenceToken? token)
{
await HandleEvent(item);
}

public async Task PublishEvent(string ns, string id, Event item)
{
var streamProvider = this.GetStreamProvider("StreamProvider");
var streamId = StreamId.Create(ns, id);
var stream = streamProvider.GetStream<Event>(streamId);
await stream.OnNextAsync(item);
}

// CloudEvent overload - converts to Event and publishes
public Task PublishEvent(string ns, string id, CloudEvent item)
{
return PublishEvent(ns, id, new Event(item));
}

public async override Task OnActivateAsync(CancellationToken cancellationToken)
{
var streamProvider = this.GetStreamProvider("StreamProvider");
var streamId = StreamId.Create(Namespace, this.GetPrimaryKeyString());
var stream = streamProvider.GetStream<Event>(streamId);
await stream.SubscribeAsync(HandleEvent);
}
}
1 change: 1 addition & 0 deletions src/Oagents.Orleans/Oagents.Orleans.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="CloudNative.CloudEvents" Version="2.8.0" />
<PackageReference Include="Microsoft.Orleans.Sdk" Version="8.1.0" />
<PackageReference Include="Microsoft.Orleans.Streaming" Version="8.1.0" />
</ItemGroup>
Expand Down