Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Papst.EventStore.Contracts.slnx
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@
<Folder Name="/src/">
<Project Path="src/Papst.EventStore.Aggregation.EventRegistration/Papst.EventStore.Aggregation.EventRegistration.csproj" />
<Project Path="src/Papst.EventStore.CodeGeneration/Papst.EventStore.CodeGeneration.csproj" />
<Project Path="src/Papst.EventStore.OpenTelemetry/Papst.EventStore.OpenTelemetry.csproj" />
<Project Path="src/Papst.EventStore/Papst.EventStore.csproj" />
</Folder>
<Folder Name="/Tests/">
<Project Path="tests/Papst.EventStore.CodeGeneration.Tests/Papst.EventStore.CodeGeneration.Tests.csproj" />
<Project Path="tests/Papst.EventStore.OpenTelemetry.Tests/Papst.EventStore.OpenTelemetry.Tests.csproj" />
<Project Path="tests/Papst.EventStore.Tests/Papst.EventStore.Tests.csproj" />
</Folder>
</Solution>
2 changes: 2 additions & 0 deletions Papst.EventStore.slnx
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@
<Project Path="src/Papst.EventStore.FileSystem/Papst.EventStore.FileSystem.csproj" />
<Project Path="src/Papst.EventStore.InMemory/Papst.EventStore.InMemory.csproj" />
<Project Path="src/Papst.EventStore.MongoDB/Papst.EventStore.MongoDB.csproj" />
<Project Path="src/Papst.EventStore.OpenTelemetry/Papst.EventStore.OpenTelemetry.csproj" />
<Project Path="src/Papst.EventStore/Papst.EventStore.csproj" />
</Folder>
<Folder Name="/Tests/">
<Project Path="tests/Papst.EventsStore.InMemory.Tests/Papst.EventsStore.InMemory.Tests.csproj" />
<Project Path="tests/Papst.EventStore.AzureCosmos.Tests/Papst.EventStore.AzureCosmos.Tests.csproj" />
<Project Path="tests/Papst.EventStore.CodeGeneration.Tests/Papst.EventStore.CodeGeneration.Tests.csproj" />
<Project Path="tests/Papst.EventStore.MongoDB.Tests/Papst.EventStore.MongoDB.Tests.csproj" />
<Project Path="tests/Papst.EventStore.OpenTelemetry.Tests/Papst.EventStore.OpenTelemetry.Tests.csproj" />
<Project Path="tests/Papst.EventStore.Tests/Papst.EventStore.Tests.csproj" />
</Folder>
</Solution>
20 changes: 20 additions & 0 deletions src/Papst.EventStore.OpenTelemetry/EventStoreActivitySource.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System.Diagnostics;
using System.Reflection;

namespace Papst.EventStore.OpenTelemetry;

/// <summary>
/// Provides the <see cref="ActivitySource"/> for Papst.EventStore OpenTelemetry instrumentation.
/// </summary>
public static class EventStoreActivitySource
{
/// <summary>
/// The name of the ActivitySource used for all EventStore activities.
/// Use this name when configuring OpenTelemetry to listen to EventStore traces.
/// </summary>
public const string SourceName = "Papst.EventStore";

internal static readonly ActivitySource Source = new(
SourceName,
Assembly.GetExecutingAssembly().GetName().Version?.ToString() ?? "0.0.0");
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
using System;
using System.Linq;
using Microsoft.Extensions.DependencyInjection;

namespace Papst.EventStore.OpenTelemetry;

/// <summary>
/// Extension methods for adding OpenTelemetry instrumentation to Papst.EventStore.
/// </summary>
public static class EventStoreOpenTelemetryExtensions
{
/// <summary>
/// Wraps the registered <see cref="IEventStore"/> with an instrumented decorator that produces
/// OpenTelemetry-compatible <see cref="System.Diagnostics.Activity"/> instances for all EventStore operations.
/// <para>
/// Call this method after registering the EventStore implementation (e.g. after <c>AddInMemoryEventStore()</c>).
/// To collect these traces configure your OpenTelemetry SDK to listen to the
/// <see cref="EventStoreActivitySource.SourceName"/> activity source.
/// </para>
/// </summary>
/// <param name="services">The service collection.</param>
/// <returns>The same <see cref="IServiceCollection"/> for chaining.</returns>
/// <exception cref="InvalidOperationException">
/// Thrown when no <see cref="IEventStore"/> has been registered before calling this method.
/// </exception>
public static IServiceCollection AddEventStoreInstrumentation(this IServiceCollection services)
{
ServiceDescriptor? descriptor = services.LastOrDefault(d => d.ServiceType == typeof(IEventStore));
if (descriptor == null)
{
throw new InvalidOperationException(
$"No {nameof(IEventStore)} registration found. Register an EventStore implementation before calling {nameof(AddEventStoreInstrumentation)}.");
}

services.Remove(descriptor);

ServiceDescriptor decorated = ServiceDescriptor.Describe(
typeof(IEventStore),
sp =>
{
IEventStore inner = (IEventStore)(descriptor.ImplementationInstance
?? descriptor.ImplementationFactory?.Invoke(sp)
?? ActivatorUtilities.CreateInstance(sp, descriptor.ImplementationType!));
return new InstrumentedEventStore(inner);
},
descriptor.Lifetime);

services.Add(decorated);
return services;
}
}
86 changes: 86 additions & 0 deletions src/Papst.EventStore.OpenTelemetry/InstrumentedEventStore.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace Papst.EventStore.OpenTelemetry;

/// <summary>
/// A decorator for <see cref="IEventStore"/> that instruments all operations with OpenTelemetry activities.
/// </summary>
internal sealed class InstrumentedEventStore : IEventStore
{
private readonly IEventStore _inner;

public InstrumentedEventStore(IEventStore inner)
{
_inner = inner;
}

/// <inheritdoc />
public async Task<IEventStream> GetAsync(Guid streamId, CancellationToken cancellationToken = default)
{
using Activity? activity = EventStoreActivitySource.Source.StartActivity("EventStore.Get");
activity?.SetTag("event_store.stream_id", streamId.ToString());

try
{
IEventStream stream = await _inner.GetAsync(streamId, cancellationToken).ConfigureAwait(false);
activity?.SetTag("event_store.stream_version", stream.Version.ToString());
return new InstrumentedEventStream(stream);
}
catch (Exception ex)
{
activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
throw;
}
}

/// <inheritdoc />
public async Task<IEventStream> CreateAsync(Guid streamId, string targetTypeName,
CancellationToken cancellationToken = default)
{
using Activity? activity = EventStoreActivitySource.Source.StartActivity("EventStore.Create");
activity?.SetTag("event_store.stream_id", streamId.ToString());
activity?.SetTag("event_store.target_type", targetTypeName);

try
{
IEventStream stream = await _inner.CreateAsync(streamId, targetTypeName, cancellationToken)
.ConfigureAwait(false);
return new InstrumentedEventStream(stream);
}
catch (Exception ex)
{
activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
throw;
}
}

/// <inheritdoc />
public async Task<IEventStream> CreateAsync(Guid streamId, string targetTypeName, string? tenantId,
string? userId, string? username, string? comment,
Dictionary<string, string>? additionalMetaData, CancellationToken cancellationToken = default)
{
using Activity? activity = EventStoreActivitySource.Source.StartActivity("EventStore.Create");
activity?.SetTag("event_store.stream_id", streamId.ToString());
activity?.SetTag("event_store.target_type", targetTypeName);
if (tenantId != null)
{
activity?.SetTag("event_store.tenant_id", tenantId);
}

try
{
IEventStream stream = await _inner.CreateAsync(streamId, targetTypeName, tenantId, userId, username,
comment, additionalMetaData, cancellationToken).ConfigureAwait(false);
return new InstrumentedEventStream(stream);
}
catch (Exception ex)
{
activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
throw;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Papst.EventStore.Documents;

namespace Papst.EventStore.OpenTelemetry;

/// <summary>
/// A decorator for <see cref="IEventStoreTransactionAppender"/> that instruments all operations with OpenTelemetry activities.
/// </summary>
internal sealed class InstrumentedEventStoreTransactionAppender : IEventStoreTransactionAppender
{
private readonly IEventStoreTransactionAppender _inner;
private readonly Guid _streamId;

public InstrumentedEventStoreTransactionAppender(IEventStoreTransactionAppender inner, Guid streamId)
{
_inner = inner;
_streamId = streamId;
}

/// <inheritdoc />
public IEventStoreTransactionAppender Add<TEvent>(Guid id, TEvent evt, EventStreamMetaData? metaData = null,
CancellationToken cancellationToken = default) where TEvent : notnull
{
using Activity? activity = EventStoreActivitySource.Source.StartActivity("EventStore.TransactionAppender.Add");
activity?.SetTag("event_store.stream_id", _streamId.ToString());
activity?.SetTag("event_store.event_id", id.ToString());
activity?.SetTag("event_store.event_type", typeof(TEvent).FullName ?? typeof(TEvent).Name);

_inner.Add(id, evt, metaData, cancellationToken);
return this;
}

/// <inheritdoc />
public async Task CommitAsync(CancellationToken cancellationToken = default)
{
using Activity? activity = EventStoreActivitySource.Source.StartActivity("EventStore.TransactionAppender.Commit");
activity?.SetTag("event_store.stream_id", _streamId.ToString());

try
{
await _inner.CommitAsync(cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
throw;
}
}
}
149 changes: 149 additions & 0 deletions src/Papst.EventStore.OpenTelemetry/InstrumentedEventStream.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Papst.EventStore.Documents;

namespace Papst.EventStore.OpenTelemetry;

/// <summary>
/// A decorator for <see cref="IEventStream"/> that instruments all operations with OpenTelemetry activities.
/// </summary>
internal sealed class InstrumentedEventStream : IEventStream
{
private readonly IEventStream _inner;

public InstrumentedEventStream(IEventStream inner)
{
_inner = inner;
}

/// <inheritdoc />
public Guid StreamId => _inner.StreamId;

/// <inheritdoc />
public ulong Version => _inner.Version;

/// <inheritdoc />
public DateTimeOffset Created => _inner.Created;

/// <inheritdoc />
public ulong? LatestSnapshotVersion => _inner.LatestSnapshotVersion;

/// <inheritdoc />
public EventStreamMetaData MetaData => _inner.MetaData;

/// <inheritdoc />
public Task<EventStreamDocument?> GetLatestSnapshot(CancellationToken cancellationToken = default)
{
using Activity? activity = EventStoreActivitySource.Source.StartActivity("EventStream.GetLatestSnapshot");
activity?.SetTag("event_store.stream_id", StreamId.ToString());
return _inner.GetLatestSnapshot(cancellationToken);
}

/// <inheritdoc />
public async Task AppendAsync<TEvent>(Guid id, TEvent evt, EventStreamMetaData? metaData = null,
CancellationToken cancellationToken = default) where TEvent : notnull
{
using Activity? activity = EventStoreActivitySource.Source.StartActivity("EventStream.Append");
activity?.SetTag("event_store.stream_id", StreamId.ToString());
activity?.SetTag("event_store.event_id", id.ToString());
activity?.SetTag("event_store.event_type", typeof(TEvent).FullName ?? typeof(TEvent).Name);

try
{
await _inner.AppendAsync(id, evt, metaData, cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
throw;
}
}

/// <inheritdoc />
public async Task AppendSnapshotAsync<TEntity>(Guid id, TEntity entity, EventStreamMetaData? metaData = null,
CancellationToken cancellationToken = default) where TEntity : notnull
{
using Activity? activity = EventStoreActivitySource.Source.StartActivity("EventStream.AppendSnapshot");
activity?.SetTag("event_store.stream_id", StreamId.ToString());
activity?.SetTag("event_store.event_id", id.ToString());
activity?.SetTag("event_store.entity_type", typeof(TEntity).FullName ?? typeof(TEntity).Name);

try
{
await _inner.AppendSnapshotAsync(id, entity, metaData, cancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
throw;
}
}

/// <inheritdoc />
public async Task<IEventStoreTransactionAppender> CreateTransactionalBatchAsync()
{
using Activity? activity = EventStoreActivitySource.Source.StartActivity("EventStream.CreateTransactionalBatch");
activity?.SetTag("event_store.stream_id", StreamId.ToString());

try
{
IEventStoreTransactionAppender appender = await _inner.CreateTransactionalBatchAsync().ConfigureAwait(false);
return new InstrumentedEventStoreTransactionAppender(appender, StreamId);
}
catch (Exception ex)
{
activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
throw;
}
}

/// <inheritdoc />
public IAsyncEnumerable<EventStreamDocument> ListAsync(ulong startVersion = 0,
CancellationToken cancellationToken = default)
=> ListAsyncCore(_inner.ListAsync(startVersion, cancellationToken), "EventStream.List", cancellationToken);

/// <inheritdoc />
public IAsyncEnumerable<EventStreamDocument> ListAsync(ulong startVersion, ulong endVersion,
CancellationToken cancellationToken = default)
=> ListAsyncCore(_inner.ListAsync(startVersion, endVersion, cancellationToken), "EventStream.List", cancellationToken);

/// <inheritdoc />
public IAsyncEnumerable<EventStreamDocument> ListDescendingAsync(ulong endVersion, ulong startVersion,
CancellationToken cancellationToken = default)
=> ListAsyncCore(_inner.ListDescendingAsync(endVersion, startVersion, cancellationToken), "EventStream.ListDescending", cancellationToken);

/// <inheritdoc />
public IAsyncEnumerable<EventStreamDocument> ListDescendingAsync(ulong endVersion,
CancellationToken cancellationToken = default)
=> ListAsyncCore(_inner.ListDescendingAsync(endVersion, cancellationToken), "EventStream.ListDescending", cancellationToken);

/// <inheritdoc />
public Task UpdateStreamMetaData(EventStreamMetaData metaData, CancellationToken cancellationToken = default)
{
using Activity? activity = EventStoreActivitySource.Source.StartActivity("EventStream.UpdateMetaData");
activity?.SetTag("event_store.stream_id", StreamId.ToString());
return _inner.UpdateStreamMetaData(metaData, cancellationToken);
}

private async IAsyncEnumerable<EventStreamDocument> ListAsyncCore(
IAsyncEnumerable<EventStreamDocument> source,
string activityName,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
using Activity? activity = EventStoreActivitySource.Source.StartActivity(activityName);
activity?.SetTag("event_store.stream_id", StreamId.ToString());

ulong count = 0;
await foreach (EventStreamDocument doc in source.WithCancellation(cancellationToken).ConfigureAwait(false))
{
count++;
yield return doc;
}

activity?.SetTag("event_store.document_count", count.ToString());
}
}
Loading