Phase 4+: Enhance Idempotency Handling with Distributed Locking #119

@artcava

📋 Task Description

Enhance the existing idempotency mechanism with distributed locking using Redis to prevent race conditions in concurrent process creation scenarios. Implement idempotency key TTL, cleanup mechanisms, and proper conflict resolution.

🎯 Objectives

  • Implement distributed locking with the RedLock algorithm
  • Add Redis-based idempotency key storage
  • Implement idempotency key TTL (Time To Live)
  • Add concurrent request detection
  • Implement proper conflict resolution (409 Conflict)
  • Add idempotency key cleanup mechanism
  • Enhance existing idempotency checks
  • Add idempotency metrics
  • Create idempotency management endpoints
  • Write comprehensive tests for race conditions
  • Document idempotency guarantees

📦 Deliverables

1. Install RedLock Package

Update src/StarGate.Infrastructure/StarGate.Infrastructure.csproj:

<ItemGroup>
  <PackageReference Include="RedLock.net" Version="2.3.2" />
  <PackageReference Include="StackExchange.Redis" Version="2.7.10" />
</ItemGroup>

2. Create Idempotency Service

Create src/StarGate.Infrastructure/Services/IdempotencyService.cs:

namespace StarGate.Infrastructure.Services;

using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RedLockNet.SERedis;
using RedLockNet.SERedis.Configuration;
using StackExchange.Redis;
using StarGate.Core.Abstractions;

public interface IIdempotencyService
{
    Task<IdempotencyResult> AcquireAsync(
        string idempotencyKey,
        CancellationToken cancellationToken = default);
    
    Task ReleaseAsync(
        string idempotencyKey,
        CancellationToken cancellationToken = default);
    
    Task<bool> ExistsAsync(
        string idempotencyKey,
        CancellationToken cancellationToken = default);
    
    Task<string?> GetResultAsync(
        string idempotencyKey,
        CancellationToken cancellationToken = default);
    
    Task SetResultAsync(
        string idempotencyKey,
        string result,
        CancellationToken cancellationToken = default);
    
    Task CleanupExpiredAsync(CancellationToken cancellationToken = default);
}

public class IdempotencyService : IIdempotencyService, IDisposable
{
    private readonly IConnectionMultiplexer _redis;
    private readonly RedLockFactory _redLockFactory;
    private readonly IdempotencyOptions _options;
    private readonly ILogger<IdempotencyService> _logger;
    private readonly IDatabase _db;

    public IdempotencyService(
        IConnectionMultiplexer redis,
        IOptions<IdempotencyOptions> options,
        ILogger<IdempotencyService> logger)
    {
        _redis = redis ?? throw new ArgumentNullException(nameof(redis));
        _options = options?.Value ?? throw new ArgumentNullException(nameof(options));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _db = _redis.GetDatabase();

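        // Reuse the application's shared multiplexer rather than building an
        // endpoint from a string (DnsEndPoint has no single-string constructor).
        // With a single Redis endpoint this is effectively a single-instance lock.
        _redLockFactory = RedLockFactory.Create(new List<RedLockMultiplexer>
        {
            new RedLockMultiplexer(_redis)
        });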
    }

    public async Task<IdempotencyResult> AcquireAsync(
        string idempotencyKey,
        CancellationToken cancellationToken = default)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(idempotencyKey);

        var lockKey = GetLockKey(idempotencyKey);
        var processingKey = GetProcessingKey(idempotencyKey);
        var resultKey = GetResultKey(idempotencyKey);

        // Check if result already exists
        var existingResult = await _db.StringGetAsync(resultKey);
        if (existingResult.HasValue)
        {
            _logger.LogDebug(
                "Idempotency key {IdempotencyKey} already has result",
                idempotencyKey);

            return new IdempotencyResult
            {
                Success = false,
                Status = IdempotencyStatus.AlreadyProcessed,
                ExistingResult = existingResult.ToString()
            };
        }

        // Try to acquire distributed lock
        var redLock = await _redLockFactory.CreateLockAsync(
            lockKey,
            TimeSpan.FromSeconds(_options.LockTimeoutSeconds),
            TimeSpan.FromSeconds(_options.LockWaitSeconds),
            TimeSpan.FromSeconds(_options.LockRetrySeconds),
            cancellationToken);

        if (!redLock.IsAcquired)
        {
            _logger.LogWarning(
                "Failed to acquire lock for idempotency key {IdempotencyKey}",
                idempotencyKey);

            // Check if currently being processed by another request
            var isProcessing = await _db.StringGetAsync(processingKey);
            if (isProcessing.HasValue)
            {
                return new IdempotencyResult
                {
                    Success = false,
                    Status = IdempotencyStatus.InProgress
                };
            }

            return new IdempotencyResult
            {
                Success = false,
                Status = IdempotencyStatus.LockFailed
            };
        }

        try
        {
            // Double-check result doesn't exist (race condition protection)
            existingResult = await _db.StringGetAsync(resultKey);
            if (existingResult.HasValue)
            {
                // The lock is not handed to the caller on this path, so release it here
                await redLock.DisposeAsync();

                return new IdempotencyResult
                {
                    Success = false,
                    Status = IdempotencyStatus.AlreadyProcessed,
                    ExistingResult = existingResult.ToString()
                };
            }

            // Mark as processing
            await _db.StringSetAsync(
                processingKey,
                DateTime.UtcNow.ToString("O"),
                TimeSpan.FromSeconds(_options.ProcessingTimeoutSeconds));

            _logger.LogInformation(
                "Acquired lock for idempotency key {IdempotencyKey}",
                idempotencyKey);

            return new IdempotencyResult
            {
                Success = true,
                Status = IdempotencyStatus.Acquired,
                Lock = redLock
            };
        }
        catch
        {
            // Release lock on error
            await redLock.DisposeAsync();
            throw;
        }
    }

    public async Task ReleaseAsync(
        string idempotencyKey,
        CancellationToken cancellationToken = default)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(idempotencyKey);

        var processingKey = GetProcessingKey(idempotencyKey);
        await _db.KeyDeleteAsync(processingKey);

        _logger.LogDebug(
            "Released processing marker for idempotency key {IdempotencyKey}",
            idempotencyKey);
    }

    public async Task<bool> ExistsAsync(
        string idempotencyKey,
        CancellationToken cancellationToken = default)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(idempotencyKey);

        var resultKey = GetResultKey(idempotencyKey);
        return await _db.KeyExistsAsync(resultKey);
    }

    public async Task<string?> GetResultAsync(
        string idempotencyKey,
        CancellationToken cancellationToken = default)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(idempotencyKey);

        var resultKey = GetResultKey(idempotencyKey);
        var result = await _db.StringGetAsync(resultKey);
        
        return result.HasValue ? result.ToString() : null;
    }

    public async Task SetResultAsync(
        string idempotencyKey,
        string result,
        CancellationToken cancellationToken = default)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(idempotencyKey);
        ArgumentException.ThrowIfNullOrWhiteSpace(result);

        var resultKey = GetResultKey(idempotencyKey);
        
        await _db.StringSetAsync(
            resultKey,
            result,
            TimeSpan.FromHours(_options.ResultTtlHours));

        _logger.LogInformation(
            "Stored result for idempotency key {IdempotencyKey} with TTL {TtlHours}h",
            idempotencyKey,
            _options.ResultTtlHours);
    }

    public Task CleanupExpiredAsync(CancellationToken cancellationToken = default)
    {
        // Redis TTLs expire result and processing keys automatically;
        // this hook exists for manual cleanup if it is ever needed.
        _logger.LogInformation("Idempotency cleanup completed (TTL-based)");
        return Task.CompletedTask;
    }

    private static string GetLockKey(string idempotencyKey) =>
        $"stargate:idempotency:lock:{idempotencyKey}";

    private static string GetProcessingKey(string idempotencyKey) =>
        $"stargate:idempotency:processing:{idempotencyKey}";

    private static string GetResultKey(string idempotencyKey) =>
        $"stargate:idempotency:result:{idempotencyKey}";

    public void Dispose()
    {
        _redLockFactory?.Dispose();
    }
}

public class IdempotencyResult
{
    public required bool Success { get; init; }
    public required IdempotencyStatus Status { get; init; }
    public string? ExistingResult { get; init; }
    public IDisposable? Lock { get; init; }
}

public enum IdempotencyStatus
{
    Acquired,
    AlreadyProcessed,
    InProgress,
    LockFailed
}

3. Create Idempotency Options

Create src/StarGate.Infrastructure/Services/IdempotencyOptions.cs:

namespace StarGate.Infrastructure.Services;

public class IdempotencyOptions
{
    public const string SectionName = "Idempotency";

    /// <summary>
    /// Expiry (TTL) of the distributed lock (seconds). RedLock.net extends the
    /// lock while it is held; the TTL bounds how long a crashed holder blocks the key.
    /// </summary>
    public int LockTimeoutSeconds { get; set; } = 30;

    /// <summary>
    /// Maximum time to wait when trying to acquire the lock (seconds)
    /// </summary>
    public int LockWaitSeconds { get; set; } = 10;

    /// <summary>
    /// Interval between lock acquisition attempts while waiting (seconds)
    /// </summary>
    public double LockRetrySeconds { get; set; } = 1;

    /// <summary>
    /// TTL of the marker that flags a key as "in progress" (seconds)
    /// </summary>
    public int ProcessingTimeoutSeconds { get; set; } = 300;

    /// <summary>
    /// TTL of cached idempotency results (hours)
    /// </summary>
    public int ResultTtlHours { get; set; } = 24;
}

4. Update ProcessService with Enhanced Idempotency

Update src/StarGate.Application/Services/ProcessService.cs:

using System.Text.Json;
using Microsoft.Extensions.Logging;
using StarGate.Infrastructure.Services; // IIdempotencyService, IdempotencyStatus (step 2)

public class ProcessService
{
    // Web defaults (camelCase) keep the cached JSON consistent with API responses
    private static readonly JsonSerializerOptions JsonOptions = new(JsonSerializerDefaults.Web);

    private readonly IProcessRepository _processRepository;
    private readonly IIdempotencyService _idempotencyService;
    private readonly IMessageBroker _messageBroker;
    private readonly ILogger<ProcessService> _logger;

    public ProcessService(
        IProcessRepository processRepository,
        IIdempotencyService idempotencyService,
        IMessageBroker messageBroker,
        ILogger<ProcessService> logger)
    {
        _processRepository = processRepository ?? throw new ArgumentNullException(nameof(processRepository));
        _idempotencyService = idempotencyService ?? throw new ArgumentNullException(nameof(idempotencyService));
        _messageBroker = messageBroker ?? throw new ArgumentNullException(nameof(messageBroker));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    public async Task<ProcessCreationResult> CreateProcessAsync(
        CreateProcessRequest request,
        CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(request);

        // Try to acquire idempotency lock
        var idempotencyResult = await _idempotencyService.AcquireAsync(
            request.IdempotencyKey,
            cancellationToken);

        if (!idempotencyResult.Success)
        {
            return idempotencyResult.Status switch
            {
                IdempotencyStatus.AlreadyProcessed => HandleAlreadyProcessed(
                    idempotencyResult.ExistingResult!),

                IdempotencyStatus.InProgress => throw new ConflictException(
                    "Request is currently being processed"),

                _ => throw new ServiceUnavailableException(
                    "Unable to acquire idempotency lock")
            };
        }

        using (idempotencyResult.Lock)
        {
            try
            {
                // Create process
                var process = await CreateProcessInternalAsync(request, cancellationToken);

                // Store result for future duplicate requests
                var result = new ProcessCreationResult
                {
                    ProcessId = process.ProcessId,
                    Status = process.Status.ToString(),
                    CreatedAt = process.CreatedAt
                };

                await _idempotencyService.SetResultAsync(
                    request.IdempotencyKey,
                    JsonSerializer.Serialize(result, JsonOptions),
                    cancellationToken);

                return result;
            }
            finally
            {
                // Clear the processing marker while the lock is still held, so this
                // request cannot delete a marker set by the next lock holder.
                // CancellationToken.None: cleanup must run even if the request was cancelled.
                await _idempotencyService.ReleaseAsync(
                    request.IdempotencyKey,
                    CancellationToken.None);
            }
        }
    }

    private ProcessCreationResult HandleAlreadyProcessed(string existingResultJson)
    {
        var result = JsonSerializer.Deserialize<ProcessCreationResult>(existingResultJson, JsonOptions)
            ?? throw new InvalidOperationException("Cached idempotency result is empty");

        _logger.LogInformation(
            "Returning cached result for duplicate request: {ProcessId}",
            result.ProcessId);

        return result;
    }

    private async Task<Process> CreateProcessInternalAsync(
        CreateProcessRequest request,
        CancellationToken cancellationToken)
    {
        // Existing creation logic...
        var now = DateTime.UtcNow;
        var process = new Process
        {
            ProcessId = Guid.NewGuid(),
            ClientId = request.ClientId,
            ProcessType = request.ProcessType,
            ClientProcessId = request.ClientProcessId,
            IdempotencyKey = request.IdempotencyKey,
            Status = ProcessStatus.Pending,
            CreatedAt = now,
            UpdatedAt = now,
            Metadata = request.Metadata
        };

        await _processRepository.CreateAsync(process, cancellationToken);
        await _messageBroker.PublishAsync("stargate.processes", process, cancellationToken);

        return process;
    }
}

public class ProcessCreationResult
{
    public Guid ProcessId { get; init; }
    public string Status { get; init; } = string.Empty;
    public DateTime CreatedAt { get; init; }
}

public class ConflictException : Exception
{
    public ConflictException(string message) : base(message) { }
}

public class ServiceUnavailableException : Exception
{
    public ServiceUnavailableException(string message) : base(message) { }
}
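The `/result` endpoint exposes the cached JSON string directly, so the serializer options used when caching decide its property casing, while ASP.NET Core serializes normal responses in camelCase by default. A small `System.Text.Json` check, using an anonymous object as a stand-in for `ProcessCreationResult` (assumption):

```csharp
using System;
using System.Text.Json;

// Anonymous stand-in for ProcessCreationResult (assumption).
var result = new
{
    ProcessId = Guid.Parse("550e8400-e29b-41d4-a716-446655440000"),
    Status = "Pending"
};

// Default options keep the C# property names (PascalCase)...
string defaultJson = JsonSerializer.Serialize(result);

// ...while the Web defaults produce camelCase, matching ASP.NET Core responses.
string webJson = JsonSerializer.Serialize(result, new JsonSerializerOptions(JsonSerializerDefaults.Web));

Console.WriteLine(defaultJson); // {"ProcessId":"550e8400-e29b-41d4-a716-446655440000","Status":"Pending"}
Console.WriteLine(webJson);     // {"processId":"550e8400-e29b-41d4-a716-446655440000","status":"Pending"}
```

Serializing and deserializing the cached value with the same options avoids a response whose shape depends on whether the request was a duplicate.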

5. Add Idempotency Endpoints

Create src/StarGate.Server/Endpoints/IdempotencyEndpoints.cs:

namespace StarGate.Server.Endpoints;

using Microsoft.AspNetCore.Mvc;
using StarGate.Infrastructure.Services;

public static class IdempotencyEndpoints
{
    public static void MapIdempotencyEndpoints(this IEndpointRouteBuilder app)
    {
        var group = app.MapGroup("/api/idempotency")
            .WithTags("Idempotency")
            .RequireAuthorization();

        // Check if idempotency key exists
        group.MapGet("/{idempotencyKey}/exists", async (
            string idempotencyKey,
            [FromServices] IIdempotencyService service) =>
        {
            var exists = await service.ExistsAsync(idempotencyKey);
            return Results.Ok(new { exists });
        })
        .WithName("CheckIdempotencyKey")
        .WithOpenApi();

        // Get result for idempotency key
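        group.MapGet("/{idempotencyKey}/result", async (
            string idempotencyKey,
            [FromServices] IIdempotencyService service) =>
        {
            var result = await service.GetResultAsync(idempotencyKey);

            // The cached value is already a JSON document: return it as-is,
            // since Results.Ok(result) would serialize it again as a JSON string.
            return result is null
                ? Results.NotFound()
                : Results.Content(result, "application/json");
        })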
        .WithName("GetIdempotencyResult")
        .WithOpenApi();

        // Trigger cleanup of expired keys
        group.MapPost("/cleanup", async (
            [FromServices] IIdempotencyService service) =>
        {
            await service.CleanupExpiredAsync();
            return Results.Ok(new { message = "Cleanup completed" });
        })
        .WithName("CleanupIdempotency")
        .WithOpenApi();
    }
}

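Register in Program.cs:

// Assumes IConnectionMultiplexer is already registered as a singleton
builder.Services.Configure<IdempotencyOptions>(
    builder.Configuration.GetSection(IdempotencyOptions.SectionName));
builder.Services.AddSingleton<IIdempotencyService, IdempotencyService>();

app.MapIdempotencyEndpoints();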

6. Add Configuration

Update src/StarGate.Server/appsettings.json:

{
  "Idempotency": {
    "LockTimeoutSeconds": 30,
    "LockWaitSeconds": 10,
    "LockRetrySeconds": 1,
    "ProcessingTimeoutSeconds": 300,
    "ResultTtlHours": 24
  }
}

7. Add Idempotency Metrics

Update src/StarGate.Core/Metrics/ApplicationMetrics.cs:

public static readonly Counter IdempotencyHits = Metrics.CreateCounter(
    "stargate_idempotency_hits_total",
    "Total number of duplicate requests detected",
    new CounterConfiguration
    {
        LabelNames = new[] { "process_type" }
    });

public static readonly Counter IdempotencyConflicts = Metrics.CreateCounter(
    "stargate_idempotency_conflicts_total",
    "Total number of idempotency conflicts (409)",
    new CounterConfiguration
    {
        LabelNames = new[] { "process_type" }
    });

public static readonly Histogram IdempotencyLockDuration = Metrics.CreateHistogram(
    "stargate_idempotency_lock_duration_seconds",
    "Duration of idempotency lock acquisition in seconds",
    new HistogramConfiguration
    {
        // 1 ms .. ~16 s, so waits up to LockWaitSeconds (default 10 s) land in finite buckets
        Buckets = Histogram.ExponentialBuckets(0.001, 2, 15)
    });
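The bucket layout decides whether slow lock waits remain distinguishable. A sketch that re-implements the layouts produced by prometheus-net's `Histogram.LinearBuckets(start, width, count)` and `Histogram.ExponentialBuckets(start, factor, count)` (so no metrics backend is needed) and shows where a 2 s wait lands in each:

```csharp
using System;
using System.Linq;

// Mirrors the documented layouts: start + i*width and start * factor^i.
static double[] LinearBuckets(double start, double width, int count) =>
    Enumerable.Range(0, count).Select(i => start + i * width).ToArray();

static double[] ExponentialBuckets(double start, double factor, int count) =>
    Enumerable.Range(0, count).Select(i => start * Math.Pow(factor, i)).ToArray();

// Upper bound ("le") of the first bucket containing the observation,
// or +Inf when it exceeds every configured bound.
static double BucketFor(double[] bounds, double seconds) =>
    bounds.Where(b => seconds <= b).DefaultIfEmpty(double.PositiveInfinity).First();

var linear = LinearBuckets(0.001, 0.01, 10);        // 1 ms .. 91 ms
var exponential = ExponentialBuckets(0.001, 2, 15); // 1 ms .. ~16.4 s

// A request that waited 2 s for the lock (LockWaitSeconds defaults to 10).
double waited = 2.0;
Console.WriteLine($"linear: {BucketFor(linear, waited)}");
Console.WriteLine($"exponential: {BucketFor(exponential, waited)}");
```

With the linear layout every wait above 91 ms collapses into `+Inf`, which hides exactly the contention this histogram is meant to reveal.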

8. Update Error Handling

Update src/StarGate.Server/Middleware/ErrorHandlingMiddleware.cs:

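if (exception is ConflictException)
{
    context.Response.StatusCode = StatusCodes.Status409Conflict;
    await context.Response.WriteAsJsonAsync(new
    {
        error = "Conflict",
        message = exception.Message,
        type = "IdempotencyConflict"
    });
    return;
}

if (exception is ServiceUnavailableException)
{
    // Lock not acquired in time: transient, the client should retry with the same key
    context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
    await context.Response.WriteAsJsonAsync(new
    {
        error = "ServiceUnavailable",
        message = exception.Message,
        type = "IdempotencyLockUnavailable"
    });
    return;
}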

9. Create Documentation

Create docs/IDEMPOTENCY.md:

# Idempotency - StarGate

## Overview

Idempotency ensures that duplicate requests have the same effect as a single request, preventing accidental duplicate process creation.

## How It Works

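### Request Flow
1. Client sends a request with an idempotency key
2. Server checks whether a result is already cached for the key; if so, it returns it (200 OK)
3. Otherwise the server acquires a distributed lock on the key and checks the cache again
4. If a result appeared in the meantime: return it (200 OK)
5. If not: mark the key as in progress, create the process, and cache the result (201 Created)
6. Clear the in-progress marker and release the lock

### Concurrent Requests
- The first request acquires the lock
- Subsequent requests wait up to `LockWaitSeconds` for the lock
- If the first request finishes in time, they return its cached result (200 OK)
- Otherwise they return 409 Conflict (key still in progress) or 503 Service Unavailable (lock not acquired)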

## Idempotency Key

### Format
- Unique string per logical operation
- Typically UUID v4
- Client-generated

### Example
```json
{
  "clientId": "client-123",
  "processType": "order",
  "clientProcessId": "order-789",
  "idempotencyKey": "550e8400-e29b-41d4-a716-446655440000",
  "metadata": {...}
}
```

## Response Codes

### 200 OK (Duplicate)
```json
{
  "processId": "...",
  "status": "Pending",
  "createdAt": "...",
  "_idempotent": true
}
```

### 201 Created (New)
```json
{
  "processId": "...",
  "status": "Pending",
  "createdAt": "..."
}
```

### 409 Conflict (In Progress)
```json
{
  "error": "Conflict",
  "message": "Request is currently being processed",
  "type": "IdempotencyConflict"
}
```

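### 503 Service Unavailable (Lock Not Acquired)
Returned when the lock cannot be acquired within `LockWaitSeconds` and no other request has the key marked as in progress. Retry with the same key.

## TTL and Cleanup

### Result TTL
- Default: 24 hours
- Configurable via `ResultTtlHours`
- Expired results are removed automatically by Redis

### Processing Timeout
- Default: 5 minutes (300 s), configurable via `ProcessingTimeoutSeconds`
- Prevents a crashed request from leaving its key marked as in progress indefinitely
- The marker expires automatically after the timeout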
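How expiry drives both behaviours can be modelled with a fake clock. A sketch in which a dictionary with expiry timestamps stands in for Redis keys (assumption: expiry is checked on read; the key names are illustrative, not the real `stargate:idempotency:*` keys):

```csharp
using System;
using System.Collections.Generic;

// Fake clock and in-memory stand-in for Redis keys with a TTL (assumptions).
var now = new DateTime(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc);
var store = new Dictionary<string, (string Value, DateTime ExpiresAt)>();

void Set(string key, string value, TimeSpan ttl) => store[key] = (value, now + ttl);

// A key past its expiry behaves as if it had been deleted.
string? Get(string key) =>
    store.TryGetValue(key, out var entry) && entry.ExpiresAt > now ? entry.Value : null;

// Request A marks key k1 as in progress, then crashes without clearing the marker.
Set("processing:k1", "started", TimeSpan.FromSeconds(300)); // ProcessingTimeoutSeconds
// Request B completed key k2 and cached its result.
Set("result:k2", "{\"status\":\"Pending\"}", TimeSpan.FromHours(24)); // ResultTtlHours

now = now.AddMinutes(6);
Console.WriteLine(Get("processing:k1") ?? "expired"); // expired: k1 is no longer "in progress"
Console.WriteLine(Get("result:k2") ?? "expired");     // {"status":"Pending"}

now = now.AddHours(24);
Console.WriteLine(Get("result:k2") ?? "expired");     // expired: k2 would be processed again
```

The last line is the flip side of the result TTL: a retry arriving after `ResultTtlHours` is treated as a new request and creates a new process.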

## Best Practices

### Client-Side
1. Generate a unique key per logical operation
2. Retry with the same key on network errors
3. Never reuse a key for a different operation
4. Store keys for later verification

### Server-Side
1. Always validate idempotency keys
2. Log duplicate detections for monitoring
3. Set TTLs appropriate to the use case
4. Monitor 409 conflicts (a high rate may indicate client or latency problems)
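Keys end up inside Redis key names such as `stargate:idempotency:result:{key}`, so a conservative format check is cheap insurance. A sketch of a hypothetical validator; the 128-character limit and the allowed character set are assumptions, not existing StarGate rules:

```csharp
using System;
using System.Text.RegularExpressions;

// Hypothetical policy (assumptions): at most 128 characters, letters, digits, '-' and '_'.
const int MaxKeyLength = 128;
var allowed = new Regex("^[A-Za-z0-9_-]+$", RegexOptions.Compiled);

bool IsValidIdempotencyKey(string? key) =>
    !string.IsNullOrWhiteSpace(key)
    && key.Length <= MaxKeyLength
    && allowed.IsMatch(key);

// Clients typically generate a UUID per logical operation.
string newKey = Guid.NewGuid().ToString();

Console.WriteLine(IsValidIdempotencyKey(newKey));               // True
Console.WriteLine(IsValidIdempotencyKey(""));                   // False
Console.WriteLine(IsValidIdempotencyKey("order 789"));          // False (space)
Console.WriteLine(IsValidIdempotencyKey(new string('a', 129))); // False (too long)
```

Rejecting malformed keys with 400 Bad Request before touching Redis keeps them out of the lock and result key space.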

## Monitoring

### Metrics
```
stargate_idempotency_hits_total
stargate_idempotency_conflicts_total
stargate_idempotency_lock_duration_seconds
```

### High Conflict Rate
If `stargate_idempotency_conflicts_total` is high:
- Increase `LockWaitSeconds` so waiting requests receive the cached result instead of a 409
- Check for aggressive client-side retries
- Verify network stability

## Testing

### Race Condition Test
```bash
# Send 10 concurrent requests with the same key
for i in {1..10}; do
  curl -X POST http://localhost:5000/api/processes \
    -H "Content-Type: application/json" \
    -d '{
      "clientId": "test",
      "processType": "order",
      "clientProcessId": "test-race",
      "idempotencyKey": "same-key-for-all"
    }' &
done
wait

# Expected: exactly 1 process created (201); the other 9 requests return the
# cached result (200), or 409 if the first request is still running after LockWaitSeconds
```

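## Troubleshooting

### 409 Conflicts
- Concurrent requests arriving with the same key
- Network delays triggering client retries while the first request is still running
- Clients retrying without waiting for the original response

### Lost Results
- Result TTL expired
- Redis restarted without persistence
- Result never stored (e.g. the request failed between creating the process and calling `SetResultAsync`)

## Performance Impact
Rough estimates, to be confirmed under load:
- Lock acquisition: ~10-50 ms
- Redis lookup: ~1-5 ms
- Total overhead: ~15-60 ms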

✅ Acceptance Criteria

- [ ] RedLock package installed
- [ ] Distributed locking implemented
- [ ] Idempotency service created
- [ ] Redis-based key storage implemented
- [ ] TTL configured for results
- [ ] Concurrent request detection working
- [ ] 409 Conflict responses implemented
- [ ] Idempotency key validation added
- [ ] Cleanup mechanism implemented
- [ ] Metrics emitted
- [ ] Management endpoints created
- [ ] Race condition tests passing
- [ ] Documentation complete
- [ ] Code follows CODING-CONVENTIONS.md

📝 Testing Instructions

```bash
# Start infrastructure
docker-compose up -d

# Run application
dotnet run --project src/StarGate.Server

# Test normal idempotency
KEY=$(uuidgen)
curl -X POST http://localhost:5000/api/processes \
  -H "Content-Type: application/json" \
  -d '{
    "clientId": "test",
    "processType": "order",
    "clientProcessId": "test-1",
    "idempotencyKey": "'$KEY'"
  }'

# Duplicate request (should return same result)
curl -X POST http://localhost:5000/api/processes \
  -H "Content-Type: application/json" \
  -d '{
    "clientId": "test",
    "processType": "order",
    "clientProcessId": "test-1",
    "idempotencyKey": "'$KEY'"
  }'

# Test concurrent requests
KEY2=$(uuidgen)
for i in {1..10}; do
  curl -X POST http://localhost:5000/api/processes \
    -H "Content-Type: application/json" \
    -d '{
      "clientId": "test",
      "processType": "order",
      "clientProcessId": "test-race",
      "idempotencyKey": "'$KEY2'"
    }' &
done
wait

# Check metrics
curl http://localhost:5000/metrics | grep idempotency

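# Check Redis keys (result and processing markers; RedLock.net stores its own
# lock keys under a "redlock:" prefix by default)
docker exec stargate-redis redis-cli KEYS "stargate:idempotency:*"

# Get idempotency result (the endpoint requires an authenticated request)
curl http://localhost:5000/api/idempotency/$KEY/result
```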

🏷️ Labels

phase-4+ production-readiness idempotency distributed-locking redis

⏱️ Estimated Effort

6-8 hours

🔗 Dependencies

  • Redis infrastructure
  • Existing idempotency mechanism (basic)
  • ProcessService implementation

🔗 Related Issues

Part of "Production-Ready API" initiative - prevents duplicate process creation

📌 Important Notes

RedLock Algorithm

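Why RedLock:

  • A lock is granted only when a majority of independent Redis masters agree
  • Tolerates the failure of a minority of instances (this needs several masters, typically 5; with a single Redis endpoint it behaves like a single-instance lock)
  • Automatic lock expiry

How it works:

  1. Record the current time
  2. Try to acquire the lock on every instance (SET with NX and an expiry), using a short per-instance timeout
  3. Succeed only if a majority (N/2 + 1) of instances granted the lock and the elapsed time is below the lock TTL
  4. Lock validity = TTL - elapsed time (minus a small clock-drift allowance)
  5. If acquisition failed, release the lock on all instances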
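The acquisition decision in steps 3 and 4 is plain arithmetic. A sketch of it; `EvaluateRedlock` is a hypothetical helper (not part of RedLock.net), and the drift allowance of 1% of the TTL plus 2 ms is taken from the Redis reference implementation as an assumption:

```csharp
using System;

// Hypothetical helper: decides whether a Redlock attempt succeeded and how long
// the lock may be used. Drift allowance (1% of TTL + 2 ms) is an assumption.
static (bool Acquired, TimeSpan Validity) EvaluateRedlock(
    int instances, int acquiredOn, TimeSpan ttl, TimeSpan elapsed)
{
    int quorum = instances / 2 + 1;                                   // strict majority
    var drift = TimeSpan.FromMilliseconds(ttl.TotalMilliseconds * 0.01 + 2);
    var validity = ttl - elapsed - drift;                             // usable lock time
    bool acquired = acquiredOn >= quorum && validity > TimeSpan.Zero;
    // On failure the caller must release the lock on every instance (step 5).
    return (acquired, acquired ? validity : TimeSpan.Zero);
}

var ttl = TimeSpan.FromSeconds(30); // LockTimeoutSeconds default
var ok = EvaluateRedlock(5, 3, ttl, TimeSpan.FromMilliseconds(50));
var noQuorum = EvaluateRedlock(5, 2, ttl, TimeSpan.FromMilliseconds(50));
var single = EvaluateRedlock(1, 1, ttl, TimeSpan.FromMilliseconds(5));

Console.WriteLine($"{ok.Acquired} {ok.Validity.TotalMilliseconds} ms"); // True 29648 ms
Console.WriteLine(noQuorum.Acquired);                                   // False
Console.WriteLine(single.Acquired);                                     // True
```

With one instance the quorum is 1, so that single server alone grants the lock; the majority rule only adds fault tolerance when several independent masters are configured.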

Race Condition Protection

Scenario:

Request A ----acquire-lock----create-process----release-lock----
Request B --------wait----------------(409 or wait)-------------

Without locking:

Request A ----check-db----create-process----save----
Request B ----check-db----create-process----save----
Result: 2 processes created ❌

With locking:

Request A ----lock----check----create----save----unlock----
Request B ----wait-----------------------------lock----return-cached----
Result: 1 process created ✓
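The "with locking" timeline relies on checking, locking, and re-checking before creating anything. A self-contained sketch of that double-checked pattern, where a per-key `SemaphoreSlim` and an in-memory dictionary stand in for RedLock and the Redis result keys (assumptions; the wait timeout and the 409/503 paths are not modelled):

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// In-memory stand-ins (assumptions): a SemaphoreSlim per key plays the role of
// the distributed lock, a dictionary plays the role of the Redis result keys.
var locks = new ConcurrentDictionary<string, SemaphoreSlim>();
var results = new ConcurrentDictionary<string, Guid>();
int processesCreated = 0;

async Task<(Guid ProcessId, bool Duplicate)> CreateOnceAsync(string key)
{
    // Fast path: a result is already cached.
    if (results.TryGetValue(key, out var cached)) return (cached, true);

    var gate = locks.GetOrAdd(key, _ => new SemaphoreSlim(1, 1));
    await gate.WaitAsync(); // waits indefinitely; the real service gives up after LockWaitSeconds
    try
    {
        // Double-check: another caller may have finished while we waited.
        if (results.TryGetValue(key, out cached)) return (cached, true);

        await Task.Delay(50); // simulated process creation
        Interlocked.Increment(ref processesCreated);
        var processId = Guid.NewGuid();
        results[key] = processId; // cache the result before releasing the lock
        return (processId, false);
    }
    finally
    {
        gate.Release();
    }
}

// Ten concurrent requests with the same idempotency key.
var outcomes = await Task.WhenAll(Enumerable.Range(0, 10)
    .Select(_ => Task.Run(() => CreateOnceAsync("same-key-for-all"))));

Console.WriteLine($"created={processesCreated}, duplicates={outcomes.Count(o => o.Duplicate)}");
// created=1, duplicates=9
```

Dropping the second `TryGetValue` inside the lock brings back the "without locking" outcome: callers that queued behind the first one would each create their own process.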

TTL Strategy

Result TTL (24h):

  • Long enough for client retries
  • Short enough to avoid memory issues
  • Balances durability and cleanup

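Processing TTL (5m):

  • Longer than the maximum request timeout
  • Keeps a crashed request from leaving its key "in progress" indefinitely
  • Allows recovery from crashes

Lock TTL (30s):

  • Shorter than the processing TTL
  • RedLock.net extends the lock automatically while it is held, so the TTL mainly bounds how long a crashed holder blocks the key
  • Prevents permanent locks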
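These relationships can be checked once at start-up. A sketch of a hypothetical validation helper whose parameters mirror `IdempotencyOptions`; the rules are assumptions derived from the TTL strategy above, not an existing StarGate API:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical start-up check; parameter names mirror IdempotencyOptions.
static List<string> ValidateIdempotencyOptions(
    int lockTimeoutSeconds, int lockWaitSeconds, double lockRetrySeconds,
    int processingTimeoutSeconds, int resultTtlHours)
{
    var errors = new List<string>();

    // Retrying must happen at least once within the wait window.
    if (lockRetrySeconds <= 0 || lockRetrySeconds >= lockWaitSeconds)
        errors.Add("LockRetrySeconds must be positive and shorter than LockWaitSeconds");

    // Lock TTL shorter than the processing TTL.
    if (lockTimeoutSeconds >= processingTimeoutSeconds)
        errors.Add("LockTimeoutSeconds must be shorter than ProcessingTimeoutSeconds");

    // Results must outlive the processing marker, or retries could re-create the process.
    if (resultTtlHours * 3600 <= processingTimeoutSeconds)
        errors.Add("ResultTtlHours must outlive ProcessingTimeoutSeconds");

    return errors;
}

// Defaults from appsettings.json.
var defaults = ValidateIdempotencyOptions(30, 10, 1, 300, 24);
// A lock that outlives the processing marker violates the strategy.
var misconfigured = ValidateIdempotencyOptions(600, 10, 1, 300, 24);

Console.WriteLine(defaults.Count);      // 0
Console.WriteLine(misconfigured.Count); // 1
```

Failing fast at start-up surfaces a bad configuration before it shows up as sporadic duplicates or 409s.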

Performance Considerations

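Overhead (rough estimates, to be confirmed under load):

  • Lock acquisition: 10-50ms
  • Redis operations: 1-5ms
  • Total: ~15-60ms per request

Optimization:

  • Batch independent Redis commands (StackExchange.Redis pipelining / IBatch)
  • Share one ConnectionMultiplexer across the application (it multiplexes commands, so no connection pool is needed)
  • Local caching (with invalidation)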

Error Handling

Lock acquisition failed:

  • Return 503 (temporary)
  • Client should retry
  • Log for monitoring

Already processed:

  • Return cached result (200)
  • No error logged
  • Increment metrics

In progress:

  • Return 409 Conflict
  • Client should wait and retry
  • Or poll for result
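The retry guidance for 409 and 503 can be sketched on the client side. `SendWithRetryAsync` is hypothetical; `send` stands for whatever posts the request with the same idempotency key and returns the HTTP status, and the attempt limit and backoff values are assumptions:

```csharp
using System;
using System.Net;
using System.Threading.Tasks;

// Hypothetical client helper: retries 409/503 with exponential backoff,
// always re-sending the SAME idempotency key (that is up to `send`).
static async Task<(HttpStatusCode Status, int Attempts)> SendWithRetryAsync(
    Func<Task<HttpStatusCode>> send, int maxAttempts = 5, int baseDelayMs = 100)
{
    for (int attempt = 1; ; attempt++)
    {
        var status = await send();
        bool retryable = status is HttpStatusCode.Conflict              // 409: still in progress
                                or HttpStatusCode.ServiceUnavailable;   // 503: lock not acquired
        if (!retryable || attempt == maxAttempts)
            return (status, attempt);

        // Backoff: baseDelayMs, 2x, 4x, ...
        await Task.Delay(baseDelayMs * (1 << (attempt - 1)));
    }
}

// Simulated server: busy twice, then returns the cached result.
var responses = new[] { HttpStatusCode.Conflict, HttpStatusCode.ServiceUnavailable, HttpStatusCode.OK };
int call = 0;
var outcome = await SendWithRetryAsync(() => Task.FromResult(responses[call++]), baseDelayMs: 1);

Console.WriteLine($"{outcome.Status} after {outcome.Attempts} attempts"); // OK after 3 attempts
```

Because every attempt reuses the key, a retry that lands after the first request completes receives the cached result instead of creating a second process.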
