From 000a9894c699fe30f007d60d46b0fd3b34b7a784 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 10 Jun 2025 09:53:15 +0000 Subject: [PATCH 1/2] Initial plan for issue From 699105831f9806fab000fd550536a0c637bd36b2 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 10 Jun 2025 10:02:51 +0000 Subject: [PATCH 2/2] Implement CloudEvents integration in abstraction library Co-authored-by: kostapetan <10728102+kostapetan@users.noreply.github.com> --- src/Oagents.Core/Abstractions/Event.cs | 83 +++++++++++++++++++++- src/Oagents.Core/Abstractions/IAgent.cs | 6 ++ src/Oagents.Core/Oagents.Core.csproj | 1 + src/Oagents.Dapr/Agent.cs | 73 ++++++++++++------- src/Oagents.Dapr/Oagents.Dapr.csproj | 1 + src/Oagents.Orleans/Agent.cs | 75 +++++++++++-------- src/Oagents.Orleans/Oagents.Orleans.csproj | 1 + 7 files changed, 181 insertions(+), 59 deletions(-) diff --git a/src/Oagents.Core/Abstractions/Event.cs b/src/Oagents.Core/Abstractions/Event.cs index 95b64ba2..84b8de17 100644 --- a/src/Oagents.Core/Abstractions/Event.cs +++ b/src/Oagents.Core/Abstractions/Event.cs @@ -1,12 +1,91 @@ using System.Runtime.Serialization; +using CloudNative.CloudEvents; namespace Microsoft.AI.Agents.Abstractions { [DataContract] public class Event { + private CloudEvent _cloudEvent; + + public Event() + { + _cloudEvent = new CloudEvent(); + Data = new Dictionary(); + } + + public Event(CloudEvent cloudEvent) + { + _cloudEvent = cloudEvent ?? throw new ArgumentNullException(nameof(cloudEvent)); + Data = ExtractDataFromCloudEvent(cloudEvent); + } + + [DataMember] public Dictionary Data { get; set; } - public string Type { get; set; } - public string Subject { get; set; } + + [DataMember] + public string Type + { + get => _cloudEvent.Type ?? string.Empty; + set => _cloudEvent.Type = value; + } + + [DataMember] + public string Subject + { + get => _cloudEvent.Subject ?? string.Empty; + set => _cloudEvent.Subject = value; + } + + public CloudEvent CloudEvent + { + get + { + // Sync the Data dictionary back to CloudEvent + if (Data.Count > 0) + { + _cloudEvent.Data = Data; + } + return _cloudEvent; + } + } + + public static implicit operator CloudEvent(Event evt) + { + return evt.CloudEvent; + } + + public static implicit operator Event(CloudEvent cloudEvent) + { + return new Event(cloudEvent); + } + + private static Dictionary ExtractDataFromCloudEvent(CloudEvent cloudEvent) + { + var data = new Dictionary(); + + if (cloudEvent.Data != null) + { + if (cloudEvent.Data is Dictionary dictData) + { + return dictData; + } + else if (cloudEvent.Data is Dictionary objDict) + { + foreach (var kvp in objDict) + { + data[kvp.Key] = kvp.Value?.ToString() ?? string.Empty; + } + } + else + { + // For other data types, we'll need to handle them differently + // For now, just convert to string representation + data["data"] = cloudEvent.Data.ToString() ?? string.Empty; + } + } + + return data; + } } } \ No newline at end of file diff --git a/src/Oagents.Core/Abstractions/IAgent.cs b/src/Oagents.Core/Abstractions/IAgent.cs index d05c4c37..8e340c4e 100644 --- a/src/Oagents.Core/Abstractions/IAgent.cs +++ b/src/Oagents.Core/Abstractions/IAgent.cs @@ -1,7 +1,13 @@ +using CloudNative.CloudEvents; + namespace Microsoft.AI.Agents.Abstractions; public interface IAgent { Task HandleEvent(Event item); Task PublishEvent(string ns, string id, Event item); + + // CloudEvent overloads for direct CloudEvent support + Task HandleEvent(CloudEvent item); + Task PublishEvent(string ns, string id, CloudEvent item); } \ No newline at end of file diff --git a/src/Oagents.Core/Oagents.Core.csproj b/src/Oagents.Core/Oagents.Core.csproj index 1ce891cf..cdbe585b 100644 --- a/src/Oagents.Core/Oagents.Core.csproj +++ b/src/Oagents.Core/Oagents.Core.csproj @@ -16,6 +16,7 @@ + diff --git a/src/Oagents.Dapr/Agent.cs b/src/Oagents.Dapr/Agent.cs index 104bd0ed..8a5d3340 100644 --- a/src/Oagents.Dapr/Agent.cs +++ b/src/Oagents.Dapr/Agent.cs @@ -1,27 +1,48 @@ -using Dapr.Actors.Runtime; -using Dapr.Client; -using Microsoft.AI.Agents.Abstractions; - -namespace Microsoft.AI.Agents.Dapr; - -public abstract class Agent : Actor, IAgent -{ - private readonly DaprClient daprClient; - - protected Agent(ActorHost host, DaprClient daprClient) : base(host) - { - this.daprClient = daprClient; - } - public abstract Task HandleEvent(Event item); - - public async Task PublishEvent(string ns, string id, Event item) - { - var metadata = new Dictionary() { - { "cloudevent.Type", item.Type }, - { "cloudevent.Subject", item.Subject }, - { "cloudevent.id", Guid.NewGuid().ToString()} - }; - - await daprClient.PublishEventAsync(ns, id, item, metadata); - } +using Dapr.Actors.Runtime; +using Dapr.Client; +using Microsoft.AI.Agents.Abstractions; +using CloudNative.CloudEvents; + +namespace Microsoft.AI.Agents.Dapr; + +public abstract class Agent : Actor, IAgent +{ + private readonly DaprClient daprClient; + + protected Agent(ActorHost host, DaprClient daprClient) : base(host) + { + this.daprClient = daprClient; + } + + public abstract Task HandleEvent(Event item); + + // CloudEvent overload - default implementation converts to Event + public virtual Task HandleEvent(CloudEvent item) + { + return HandleEvent(new Event(item)); + } + + public async Task PublishEvent(string ns, string id, Event item) + { + var metadata = new Dictionary() { + { "cloudevent.Type", item.Type }, + { "cloudevent.Subject", item.Subject }, + { "cloudevent.id", Guid.NewGuid().ToString()} + }; + + await daprClient.PublishEventAsync(ns, id, item, metadata); + } + + // CloudEvent overload - publishes CloudEvent directly with proper metadata + public async Task PublishEvent(string ns, string id, CloudEvent item) + { + var metadata = new Dictionary() { + { "cloudevent.Type", item.Type ?? string.Empty }, + { "cloudevent.Subject", item.Subject ?? string.Empty }, + { "cloudevent.id", item.Id ?? Guid.NewGuid().ToString()}, + { "cloudevent.Source", item.Source?.ToString() ?? string.Empty} + }; + + await daprClient.PublishEventAsync(ns, id, item, metadata); + } } diff --git a/src/Oagents.Dapr/Oagents.Dapr.csproj b/src/Oagents.Dapr/Oagents.Dapr.csproj index 9a7d0965..9e69e40e 100644 --- a/src/Oagents.Dapr/Oagents.Dapr.csproj +++ b/src/Oagents.Dapr/Oagents.Dapr.csproj @@ -12,6 +12,7 @@ + diff --git a/src/Oagents.Orleans/Agent.cs b/src/Oagents.Orleans/Agent.cs index d65ae7c5..8f23afbb 100644 --- a/src/Oagents.Orleans/Agent.cs +++ b/src/Oagents.Orleans/Agent.cs @@ -1,32 +1,45 @@ -using Microsoft.AI.Agents.Abstractions; -using Orleans.Runtime; -using Orleans.Streams; - -namespace Microsoft.AI.Agents.Orleans; - -public abstract class Agent : Grain, IGrainWithStringKey, IAgent -{ - protected virtual string Namespace { get; set; } - public abstract Task HandleEvent(Event item); - - private async Task HandleEvent(Event item, StreamSequenceToken? token) - { - await HandleEvent(item); - } - - public async Task PublishEvent(string ns, string id, Event item) - { - var streamProvider = this.GetStreamProvider("StreamProvider"); - var streamId = StreamId.Create(ns, id); - var stream = streamProvider.GetStream(streamId); - await stream.OnNextAsync(item); - } - - public async override Task OnActivateAsync(CancellationToken cancellationToken) - { - var streamProvider = this.GetStreamProvider("StreamProvider"); - var streamId = StreamId.Create(Namespace, this.GetPrimaryKeyString()); - var stream = streamProvider.GetStream(streamId); - await stream.SubscribeAsync(HandleEvent); - } +using Microsoft.AI.Agents.Abstractions; +using Orleans.Runtime; +using Orleans.Streams; +using CloudNative.CloudEvents; + +namespace Microsoft.AI.Agents.Orleans; + +public abstract class Agent : Grain, IGrainWithStringKey, IAgent +{ + protected virtual string Namespace { get; set; } + public abstract Task HandleEvent(Event item); + + // CloudEvent overload - default implementation converts to Event + public virtual Task HandleEvent(CloudEvent item) + { + return HandleEvent(new Event(item)); + } + + private async Task HandleEvent(Event item, StreamSequenceToken? token) + { + await HandleEvent(item); + } + + public async Task PublishEvent(string ns, string id, Event item) + { + var streamProvider = this.GetStreamProvider("StreamProvider"); + var streamId = StreamId.Create(ns, id); + var stream = streamProvider.GetStream(streamId); + await stream.OnNextAsync(item); + } + + // CloudEvent overload - converts to Event and publishes + public Task PublishEvent(string ns, string id, CloudEvent item) + { + return PublishEvent(ns, id, new Event(item)); + } + + public async override Task OnActivateAsync(CancellationToken cancellationToken) + { + var streamProvider = this.GetStreamProvider("StreamProvider"); + var streamId = StreamId.Create(Namespace, this.GetPrimaryKeyString()); + var stream = streamProvider.GetStream(streamId); + await stream.SubscribeAsync(HandleEvent); + } } diff --git a/src/Oagents.Orleans/Oagents.Orleans.csproj b/src/Oagents.Orleans/Oagents.Orleans.csproj index a7fde5e0..235d7d7c 100644 --- a/src/Oagents.Orleans/Oagents.Orleans.csproj +++ b/src/Oagents.Orleans/Oagents.Orleans.csproj @@ -12,6 +12,7 @@ +