diff --git a/README.md b/README.md index f9237fb..963f7d8 100644 --- a/README.md +++ b/README.md @@ -80,13 +80,15 @@ go-project-template/ │ ├── httputil/ # HTTP utilities (JSON responses, error mapping) │ ├── validation/ # Custom validation rules │ ├── testutil/ # Test utilities -│ ├── worker/ # Background workers │ ├── user/ # User domain module │ │ ├── domain/ # User entities and domain errors │ │ ├── usecase/ # User business logic │ │ ├── repository/ # User data access │ │ └── http/ # User HTTP handlers and DTOs │ └── outbox/ # Outbox domain module +│ ├── domain/ # Outbox entities and domain errors +│ ├── usecase/ # Outbox event processing logic +│ └── repository/ # Outbox data access ├── migrations/ │ ├── postgresql/ # PostgreSQL migrations │ └── mysql/ # MySQL migrations @@ -126,7 +128,7 @@ The project uses real PostgreSQL and MySQL databases for testing instead of mock ```bash make build # Build the application make run-server # Run HTTP server -make run-worker # Run background worker +make run-worker # Run outbox event processor make run-migrate # Run database migrations make lint # Run linter with auto-fix make clean # Clean build artifacts @@ -192,10 +194,11 @@ All entities use UUIDv7 for primary keys: ### Transactional Outbox Pattern -Ensures reliable event delivery: +Ensures reliable event delivery using a use case-based approach: 1. Business operation and event stored in same transaction -2. Background worker processes pending events +2. Outbox use case processes pending events with configurable retry logic 3. Guarantees at-least-once delivery +4. Extensible event processing via the `EventProcessor` interface ## 🐳 Docker @@ -213,6 +216,8 @@ make docker-run-worker make docker-run-migrate ``` +The worker command runs the outbox event processor, which handles asynchronous event processing using the transactional outbox pattern. + ## 🔧 Configuration All configuration is done via environment variables. Create a `.env` file in your project root: diff --git a/cmd/app/main.go b/cmd/app/main.go index 7fa996d..aadd01a 100644 --- a/cmd/app/main.go +++ b/cmd/app/main.go @@ -156,7 +156,7 @@ func runMigrations() error { return nil } -// runWorker starts the event worker with graceful shutdown support. +// runWorker starts the outbox event processor with graceful shutdown support. func runWorker(ctx context.Context) error { // Load configuration cfg := config.Load() @@ -166,21 +166,21 @@ func runWorker(ctx context.Context) error { // Get logger from container logger := container.Logger() - logger.Info("starting worker", slog.String("version", "1.0.0")) + logger.Info("starting outbox event processor", slog.String("version", "1.0.0")) // Ensure cleanup on exit defer closeContainer(container, logger) - // Get event worker from container (this initializes all dependencies) - eventWorker, err := container.EventWorker() + // Get outbox use case from container (this initializes all dependencies) + outboxUseCase, err := container.OutboxUseCase() if err != nil { - return fmt.Errorf("failed to initialize event worker: %w", err) + return fmt.Errorf("failed to initialize outbox use case: %w", err) } // Setup graceful shutdown ctx, cancel := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM) defer cancel() - // Start worker - return eventWorker.Start(ctx) + // Start outbox event processor + return outboxUseCase.Start(ctx) } diff --git a/docs/architecture.md b/docs/architecture.md index 499c623..23ddd7f 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -250,8 +250,9 @@ internal/ │ │ └── mapper.go │ └── user_handler.go ├── outbox/ # Outbox domain module -│ ├── domain/ -│ └── repository/ +│ ├── domain/ # Outbox entities and domain errors +│ ├── usecase/ # Outbox event processing logic +│ └── repository/ # Outbox data access └── {new-domain}/ # Easy to add new domains ``` @@ -276,8 +277,8 @@ The DI container (`internal/app/`) manages all application components with: Container ├── Infrastructure (Database, Logger) ├── Repositories (User, Outbox) -├── Use Cases (User) -└── Presentation (HTTP Server, Worker) +├── Use Cases (User, Outbox) +└── Presentation (HTTP Server) ``` **Example**: @@ -372,17 +373,19 @@ The transaction is automatically injected into the context and used by repositor ## 📤 Transactional Outbox Pattern -The project demonstrates the transactional outbox pattern for reliable event delivery: +The project demonstrates the transactional outbox pattern for reliable event delivery using a use case-based architecture: 1. 📝 Business operation (e.g., user creation) is executed 2. 📬 Event is stored in outbox table in **same transaction** -3. 🚀 Background worker picks up pending events +3. 🚀 Outbox use case processes pending events with configurable retry logic 4. ✅ Events are marked as processed or failed +5. 🔌 Extensible via the `EventProcessor` interface for custom event handling **Benefits**: - 🔒 **Guaranteed delivery** - Events never lost due to transaction rollback - 🔁 **At-least-once delivery** - Events processed at least once - 🎯 **Consistency** - Business operations and events always in sync +- 🔧 **Extensibility** - Custom event processors for different event types **Example (User Registration)**: ```go @@ -403,4 +406,41 @@ err = uc.txManager.WithTx(ctx, func(ctx context.Context) error { }) ``` -The worker (`internal/worker/event_worker.go`) processes these events asynchronously. +**Processing Events**: + +The outbox use case (`internal/outbox/usecase/outbox_usecase.go`) processes these events asynchronously: + +```go +// Start the outbox event processor +outboxUseCase, err := container.OutboxUseCase() +if err != nil { + return fmt.Errorf("failed to initialize outbox use case: %w", err) +} + +// Processes events in background +err = outboxUseCase.Start(ctx) +``` + +**Custom Event Processing**: + +You can create custom event processors by implementing the `EventProcessor` interface: + +```go +type CustomEventProcessor struct { + logger *slog.Logger + // Add your dependencies here (e.g., message queue client) +} + +func (p *CustomEventProcessor) Process(ctx context.Context, event *domain.OutboxEvent) error { + // Your custom event processing logic + switch event.EventType { + case "user.created": + // Send to message queue, send notification, etc. + return p.publishToQueue(ctx, event) + default: + return fmt.Errorf("unknown event type: %s", event.EventType) + } +} +``` + +Then register it in the DI container when initializing the outbox use case. diff --git a/docs/development.md b/docs/development.md index f05082a..b32215f 100644 --- a/docs/development.md +++ b/docs/development.md @@ -16,7 +16,7 @@ The binary will be created at `./bin/app`. ```bash make run-server # Build and run HTTP server -make run-worker # Build and run background worker +make run-worker # Build and run outbox event processor make run-migrate # Build and run database migrations ``` diff --git a/docs/getting-started.md b/docs/getting-started.md index 920c8b9..9e25a27 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -203,13 +203,13 @@ curl http://localhost:8080/ready ### Start the Background Worker -In another terminal, start the worker process: +In another terminal, start the outbox event processor: ```bash make run-worker ``` -The worker processes outbox events from the database. +The outbox event processor handles asynchronous event processing from the outbox table using the transactional outbox pattern. ## 🧪 Testing the API @@ -296,6 +296,8 @@ The binary supports three commands via `urfave/cli`: ./bin/app worker ``` +This starts the outbox event processor which handles asynchronous event processing using the transactional outbox pattern. + ## 📚 Next Steps Now that you have the application running, you can: diff --git a/internal/app/di.go b/internal/app/di.go index e44ed29..f0910fb 100644 --- a/internal/app/di.go +++ b/internal/app/di.go @@ -13,9 +13,9 @@ import ( "github.com/allisson/go-project-template/internal/database" "github.com/allisson/go-project-template/internal/http" outboxRepository "github.com/allisson/go-project-template/internal/outbox/repository" + outboxUsecase "github.com/allisson/go-project-template/internal/outbox/usecase" userRepository "github.com/allisson/go-project-template/internal/user/repository" userUsecase "github.com/allisson/go-project-template/internal/user/usecase" - "github.com/allisson/go-project-template/internal/worker" ) // Container holds all application dependencies and provides methods to access them. @@ -36,23 +36,23 @@ type Container struct { outboxRepo userUsecase.OutboxEventRepository // Use Cases - userUseCase userUsecase.UseCase + userUseCase userUsecase.UseCase + outboxUseCase outboxUsecase.UseCase // Servers and Workers - httpServer *http.Server - eventWorker *worker.EventWorker + httpServer *http.Server // Initialization flags and mutex for thread-safety - mu sync.Mutex - loggerInit sync.Once - dbInit sync.Once - txManagerInit sync.Once - userRepoInit sync.Once - outboxRepoInit sync.Once - userUseCaseInit sync.Once - httpServerInit sync.Once - eventWorkerInit sync.Once - initErrors map[string]error + mu sync.Mutex + loggerInit sync.Once + dbInit sync.Once + txManagerInit sync.Once + userRepoInit sync.Once + outboxRepoInit sync.Once + userUseCaseInit sync.Once + outboxUseCaseInit sync.Once + httpServerInit sync.Once + initErrors map[string]error } // NewContainer creates a new dependency injection container with the provided configuration. @@ -187,22 +187,22 @@ func (c *Container) HTTPServer() (*http.Server, error) { return c.httpServer, nil } -// EventWorker returns the event worker instance. -func (c *Container) EventWorker() (*worker.EventWorker, error) { +// OutboxUseCase returns the outbox use case instance. +func (c *Container) OutboxUseCase() (outboxUsecase.UseCase, error) { var err error - c.eventWorkerInit.Do(func() { - c.eventWorker, err = c.initEventWorker() + c.outboxUseCaseInit.Do(func() { + c.outboxUseCase, err = c.initOutboxUseCase() if err != nil { - c.initErrors["eventWorker"] = err + c.initErrors["outboxUseCase"] = err } }) if err != nil { return nil, err } - if storedErr, exists := c.initErrors["eventWorker"]; exists { + if storedErr, exists := c.initErrors["outboxUseCase"]; exists { return nil, storedErr } - return c.eventWorker, nil + return c.outboxUseCase, nil } // Shutdown performs cleanup of all initialized resources. @@ -362,28 +362,29 @@ func (c *Container) initHTTPServer() (*http.Server, error) { return server, nil } -// initEventWorker creates the event worker with all its dependencies. -func (c *Container) initEventWorker() (*worker.EventWorker, error) { +// initOutboxUseCase creates the outbox use case with all its dependencies. +func (c *Container) initOutboxUseCase() (outboxUsecase.UseCase, error) { logger := c.Logger() txManager, err := c.TxManager() if err != nil { - return nil, fmt.Errorf("failed to get tx manager for event worker: %w", err) + return nil, fmt.Errorf("failed to get tx manager for outbox use case: %w", err) } outboxRepo, err := c.OutboxRepository() if err != nil { - return nil, fmt.Errorf("failed to get outbox repository for event worker: %w", err) + return nil, fmt.Errorf("failed to get outbox repository for outbox use case: %w", err) } - workerConfig := worker.Config{ + useCaseConfig := outboxUsecase.Config{ Interval: c.config.WorkerInterval, BatchSize: c.config.WorkerBatchSize, MaxRetries: c.config.WorkerMaxRetries, RetryInterval: c.config.WorkerRetryInterval, } - eventWorker := worker.NewEventWorker(workerConfig, txManager, outboxRepo, logger) + eventProcessor := outboxUsecase.NewDefaultEventProcessor(logger) + useCase := outboxUsecase.NewOutboxUseCase(useCaseConfig, txManager, outboxRepo, eventProcessor, logger) - return eventWorker, nil + return useCase, nil } diff --git a/internal/outbox/usecase/outbox_usecase.go b/internal/outbox/usecase/outbox_usecase.go new file mode 100644 index 0000000..e309b9f --- /dev/null +++ b/internal/outbox/usecase/outbox_usecase.go @@ -0,0 +1,200 @@ +// Package usecase implements the outbox business logic and orchestrates outbox domain operations. +package usecase + +import ( + "context" + "encoding/json" + "log/slog" + "time" + + "github.com/allisson/go-project-template/internal/database" + "github.com/allisson/go-project-template/internal/outbox/domain" +) + +// Config holds outbox use case configuration +type Config struct { + Interval time.Duration + BatchSize int + MaxRetries int + RetryInterval time.Duration +} + +// OutboxEventRepository defines outbox event repository operations +type OutboxEventRepository interface { + Create(ctx context.Context, event *domain.OutboxEvent) error + GetPendingEvents(ctx context.Context, limit int) ([]*domain.OutboxEvent, error) + Update(ctx context.Context, event *domain.OutboxEvent) error +} + +// EventProcessor defines the interface for processing different event types +type EventProcessor interface { + Process(ctx context.Context, event *domain.OutboxEvent) error +} + +// UseCase defines the interface for outbox use cases +type UseCase interface { + Start(ctx context.Context) error + ProcessEvents(ctx context.Context) error +} + +// OutboxUseCase implements business logic for processing outbox events +type OutboxUseCase struct { + config Config + txManager database.TxManager + outboxRepo OutboxEventRepository + eventProcessor EventProcessor + logger *slog.Logger +} + +// NewOutboxUseCase creates a new OutboxUseCase +func NewOutboxUseCase( + config Config, + txManager database.TxManager, + outboxRepo OutboxEventRepository, + eventProcessor EventProcessor, + logger *slog.Logger, +) *OutboxUseCase { + return &OutboxUseCase{ + config: config, + txManager: txManager, + outboxRepo: outboxRepo, + eventProcessor: eventProcessor, + logger: logger, + } +} + +// Start starts the outbox event processing loop +func (uc *OutboxUseCase) Start(ctx context.Context) error { + if uc.logger != nil { + uc.logger.Info("starting outbox event processor", + slog.Duration("interval", uc.config.Interval), + slog.Int("batch_size", uc.config.BatchSize), + ) + } + + ticker := time.NewTicker(uc.config.Interval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + if uc.logger != nil { + uc.logger.Info("stopping outbox event processor") + } + return ctx.Err() + case <-ticker.C: + if err := uc.ProcessEvents(ctx); err != nil { + if uc.logger != nil { + uc.logger.Error("failed to process events", slog.Any("error", err)) + } + } + } + } +} + +// ProcessEvents retrieves and processes pending events from the outbox in a transaction +func (uc *OutboxUseCase) ProcessEvents(ctx context.Context) error { + return uc.txManager.WithTx(ctx, func(ctx context.Context) error { + // Get pending events + events, err := uc.outboxRepo.GetPendingEvents(ctx, uc.config.BatchSize) + if err != nil { + return err + } + + if len(events) == 0 { + return nil + } + + if uc.logger != nil { + uc.logger.Info("processing events", slog.Int("count", len(events))) + } + + for _, event := range events { + if err := uc.processEvent(ctx, event); err != nil { + if uc.logger != nil { + uc.logger.Error("failed to process event", + slog.String("event_id", event.ID.String()), + slog.String("event_type", event.EventType), + slog.Any("error", err), + ) + } + + // Update event as failed + event.Retries++ + errorMsg := err.Error() + event.LastError = &errorMsg + + if event.Retries >= uc.config.MaxRetries { + event.Status = domain.OutboxEventStatusFailed + } + + if err := uc.outboxRepo.Update(ctx, event); err != nil { + return err + } + continue + } + + // Mark event as processed + now := time.Now() + event.Status = domain.OutboxEventStatusProcessed + event.ProcessedAt = &now + + if err := uc.outboxRepo.Update(ctx, event); err != nil { + return err + } + } + + return nil + }) +} + +// processEvent handles a single outbox event using the configured event processor +func (uc *OutboxUseCase) processEvent(ctx context.Context, event *domain.OutboxEvent) error { + if uc.logger != nil { + uc.logger.Info("processing event", + slog.String("event_id", event.ID.String()), + slog.String("event_type", event.EventType), + ) + } + + return uc.eventProcessor.Process(ctx, event) +} + +// DefaultEventProcessor is a default implementation of EventProcessor +type DefaultEventProcessor struct { + logger *slog.Logger +} + +// NewDefaultEventProcessor creates a new DefaultEventProcessor +func NewDefaultEventProcessor(logger *slog.Logger) *DefaultEventProcessor { + return &DefaultEventProcessor{ + logger: logger, + } +} + +// Process handles event processing with basic logging +func (p *DefaultEventProcessor) Process(ctx context.Context, event *domain.OutboxEvent) error { + // Parse event payload + var payload map[string]interface{} + if err := json.Unmarshal([]byte(event.Payload), &payload); err != nil { + return err + } + + // Handle different event types + switch event.EventType { + case "user.created": + if p.logger != nil { + p.logger.Info("user created event", + slog.Any("payload", payload), + ) + } + // In a real application, you might publish this to a message queue, + // send notifications, update cache, etc. + default: + if p.logger != nil { + p.logger.Warn("unknown event type", slog.String("event_type", event.EventType)) + } + } + + return nil +} diff --git a/internal/worker/event_worker_test.go b/internal/outbox/usecase/outbox_usecase_test.go similarity index 70% rename from internal/worker/event_worker_test.go rename to internal/outbox/usecase/outbox_usecase_test.go index 6b94e8f..f7ba4da 100644 --- a/internal/worker/event_worker_test.go +++ b/internal/outbox/usecase/outbox_usecase_test.go @@ -1,4 +1,4 @@ -package worker +package usecase import ( "context" @@ -27,7 +27,7 @@ func (m *MockTxManager) WithTx(ctx context.Context, fn func(ctx context.Context) return fn(ctx) } -// MockOutboxEventRepository is a mock implementation of repository.OutboxEventRepository +// MockOutboxEventRepository is a mock implementation of OutboxEventRepository type MockOutboxEventRepository struct { mock.Mock } @@ -53,7 +53,17 @@ func (m *MockOutboxEventRepository) Update(ctx context.Context, event *domain.Ou return args.Error(0) } -func TestNewEventWorker(t *testing.T) { +// MockEventProcessor is a mock implementation of EventProcessor +type MockEventProcessor struct { + mock.Mock +} + +func (m *MockEventProcessor) Process(ctx context.Context, event *domain.OutboxEvent) error { + args := m.Called(ctx, event) + return args.Error(0) +} + +func TestNewOutboxUseCase(t *testing.T) { config := Config{ Interval: 5 * time.Second, BatchSize: 10, @@ -62,16 +72,17 @@ func TestNewEventWorker(t *testing.T) { } txManager := &MockTxManager{} outboxRepo := &MockOutboxEventRepository{} + eventProcessor := &MockEventProcessor{} - worker := NewEventWorker(config, txManager, outboxRepo, nil) + uc := NewOutboxUseCase(config, txManager, outboxRepo, eventProcessor, nil) - assert.NotNil(t, worker) - assert.Equal(t, config.Interval, worker.config.Interval) - assert.Equal(t, config.BatchSize, worker.config.BatchSize) - assert.Equal(t, config.MaxRetries, worker.config.MaxRetries) + assert.NotNil(t, uc) + assert.Equal(t, config.Interval, uc.config.Interval) + assert.Equal(t, config.BatchSize, uc.config.BatchSize) + assert.Equal(t, config.MaxRetries, uc.config.MaxRetries) } -func TestEventWorker_Start_ContextCancellation(t *testing.T) { +func TestOutboxUseCase_Start_ContextCancellation(t *testing.T) { config := Config{ Interval: 100 * time.Millisecond, BatchSize: 10, @@ -80,20 +91,21 @@ func TestEventWorker_Start_ContextCancellation(t *testing.T) { } txManager := &MockTxManager{} outboxRepo := &MockOutboxEventRepository{} + eventProcessor := &MockEventProcessor{} - worker := NewEventWorker(config, txManager, outboxRepo, nil) + uc := NewOutboxUseCase(config, txManager, outboxRepo, eventProcessor, nil) ctx, cancel := context.WithCancel(context.Background()) // Cancel context immediately cancel() - err := worker.Start(ctx) + err := uc.Start(ctx) assert.Error(t, err) assert.Equal(t, context.Canceled, err) } -func TestEventWorker_ProcessEvents_Success(t *testing.T) { +func TestOutboxUseCase_ProcessEvents_Success(t *testing.T) { config := Config{ Interval: 5 * time.Second, BatchSize: 10, @@ -102,8 +114,9 @@ func TestEventWorker_ProcessEvents_Success(t *testing.T) { } txManager := &MockTxManager{} outboxRepo := &MockOutboxEventRepository{} + eventProcessor := &MockEventProcessor{} - worker := NewEventWorker(config, txManager, outboxRepo, nil) + uc := NewOutboxUseCase(config, txManager, outboxRepo, eventProcessor, nil) ctx := context.Background() uuid1 := uuid.Must(uuid.NewV7()) @@ -128,18 +141,21 @@ func TestEventWorker_ProcessEvents_Success(t *testing.T) { // Setup expectations txManager.On("WithTx", ctx, mock.AnythingOfType("func(context.Context) error")).Return(nil) outboxRepo.On("GetPendingEvents", ctx, config.BatchSize).Return(events, nil) + eventProcessor.On("Process", ctx, events[0]).Return(nil) + eventProcessor.On("Process", ctx, events[1]).Return(nil) outboxRepo.On("Update", ctx, mock.MatchedBy(func(e *domain.OutboxEvent) bool { return e.Status == domain.OutboxEventStatusProcessed && e.ProcessedAt != nil })).Return(nil).Times(2) - err := worker.processEvents(ctx) + err := uc.ProcessEvents(ctx) assert.NoError(t, err) txManager.AssertExpectations(t) outboxRepo.AssertExpectations(t) + eventProcessor.AssertExpectations(t) } -func TestEventWorker_ProcessEvents_NoEvents(t *testing.T) { +func TestOutboxUseCase_ProcessEvents_NoEvents(t *testing.T) { config := Config{ Interval: 5 * time.Second, BatchSize: 10, @@ -148,8 +164,9 @@ func TestEventWorker_ProcessEvents_NoEvents(t *testing.T) { } txManager := &MockTxManager{} outboxRepo := &MockOutboxEventRepository{} + eventProcessor := &MockEventProcessor{} - worker := NewEventWorker(config, txManager, outboxRepo, nil) + uc := NewOutboxUseCase(config, txManager, outboxRepo, eventProcessor, nil) ctx := context.Background() emptyEvents := []*domain.OutboxEvent{} @@ -158,14 +175,14 @@ func TestEventWorker_ProcessEvents_NoEvents(t *testing.T) { txManager.On("WithTx", ctx, mock.AnythingOfType("func(context.Context) error")).Return(nil) outboxRepo.On("GetPendingEvents", ctx, config.BatchSize).Return(emptyEvents, nil) - err := worker.processEvents(ctx) + err := uc.ProcessEvents(ctx) assert.NoError(t, err) txManager.AssertExpectations(t) outboxRepo.AssertExpectations(t) } -func TestEventWorker_ProcessEvents_GetPendingError(t *testing.T) { +func TestOutboxUseCase_ProcessEvents_GetPendingError(t *testing.T) { config := Config{ Interval: 5 * time.Second, BatchSize: 10, @@ -174,8 +191,9 @@ func TestEventWorker_ProcessEvents_GetPendingError(t *testing.T) { } txManager := &MockTxManager{} outboxRepo := &MockOutboxEventRepository{} + eventProcessor := &MockEventProcessor{} - worker := NewEventWorker(config, txManager, outboxRepo, nil) + uc := NewOutboxUseCase(config, txManager, outboxRepo, eventProcessor, nil) ctx := context.Background() getError := errors.New("database error") @@ -184,7 +202,7 @@ func TestEventWorker_ProcessEvents_GetPendingError(t *testing.T) { txManager.On("WithTx", ctx, mock.AnythingOfType("func(context.Context) error")).Return(nil) outboxRepo.On("GetPendingEvents", ctx, config.BatchSize).Return(nil, getError) - err := worker.processEvents(ctx) + err := uc.ProcessEvents(ctx) assert.Error(t, err) assert.Contains(t, err.Error(), "database error") @@ -192,7 +210,7 @@ func TestEventWorker_ProcessEvents_GetPendingError(t *testing.T) { outboxRepo.AssertExpectations(t) } -func TestEventWorker_ProcessEvents_InvalidJSON(t *testing.T) { +func TestOutboxUseCase_ProcessEvents_ProcessorError(t *testing.T) { config := Config{ Interval: 5 * time.Second, BatchSize: 10, @@ -201,8 +219,9 @@ func TestEventWorker_ProcessEvents_InvalidJSON(t *testing.T) { } txManager := &MockTxManager{} outboxRepo := &MockOutboxEventRepository{} + eventProcessor := &MockEventProcessor{} - worker := NewEventWorker(config, txManager, outboxRepo, nil) + uc := NewOutboxUseCase(config, txManager, outboxRepo, eventProcessor, nil) ctx := context.Background() uuid1 := uuid.Must(uuid.NewV7()) @@ -210,27 +229,31 @@ func TestEventWorker_ProcessEvents_InvalidJSON(t *testing.T) { { ID: uuid1, EventType: "user.created", - Payload: `invalid json`, + Payload: `{"user_id": 1}`, Status: domain.OutboxEventStatusPending, Retries: 0, }, } + processingError := errors.New("processing failed") + // Setup expectations txManager.On("WithTx", ctx, mock.AnythingOfType("func(context.Context) error")).Return(nil) outboxRepo.On("GetPendingEvents", ctx, config.BatchSize).Return(events, nil) + eventProcessor.On("Process", ctx, events[0]).Return(processingError) outboxRepo.On("Update", ctx, mock.MatchedBy(func(e *domain.OutboxEvent) bool { return e.ID == uuid1 && e.Retries == 1 && e.LastError != nil })).Return(nil) - err := worker.processEvents(ctx) + err := uc.ProcessEvents(ctx) - assert.NoError(t, err) // processEvents should not return error, just log and update event + assert.NoError(t, err) // ProcessEvents should not return error, just log and update event txManager.AssertExpectations(t) outboxRepo.AssertExpectations(t) + eventProcessor.AssertExpectations(t) } -func TestEventWorker_ProcessEvents_MaxRetriesReached(t *testing.T) { +func TestOutboxUseCase_ProcessEvents_MaxRetriesReached(t *testing.T) { config := Config{ Interval: 5 * time.Second, BatchSize: 10, @@ -239,8 +262,9 @@ func TestEventWorker_ProcessEvents_MaxRetriesReached(t *testing.T) { } txManager := &MockTxManager{} outboxRepo := &MockOutboxEventRepository{} + eventProcessor := &MockEventProcessor{} - worker := NewEventWorker(config, txManager, outboxRepo, nil) + uc := NewOutboxUseCase(config, txManager, outboxRepo, eventProcessor, nil) ctx := context.Background() uuid1 := uuid.Must(uuid.NewV7()) @@ -248,15 +272,18 @@ func TestEventWorker_ProcessEvents_MaxRetriesReached(t *testing.T) { { ID: uuid1, EventType: "user.created", - Payload: `invalid json`, + Payload: `{"user_id": 1}`, Status: domain.OutboxEventStatusPending, Retries: 2, // Will become 3 after this attempt }, } + processingError := errors.New("processing failed") + // Setup expectations txManager.On("WithTx", ctx, mock.AnythingOfType("func(context.Context) error")).Return(nil) outboxRepo.On("GetPendingEvents", ctx, config.BatchSize).Return(events, nil) + eventProcessor.On("Process", ctx, events[0]).Return(processingError) outboxRepo.On("Update", ctx, mock.MatchedBy(func(e *domain.OutboxEvent) bool { return e.ID == uuid1 && e.Retries == 3 && @@ -264,14 +291,15 @@ func TestEventWorker_ProcessEvents_MaxRetriesReached(t *testing.T) { e.LastError != nil })).Return(nil) - err := worker.processEvents(ctx) + err := uc.ProcessEvents(ctx) assert.NoError(t, err) txManager.AssertExpectations(t) outboxRepo.AssertExpectations(t) + eventProcessor.AssertExpectations(t) } -func TestEventWorker_ProcessEvents_UpdateError(t *testing.T) { +func TestOutboxUseCase_ProcessEvents_UpdateError(t *testing.T) { config := Config{ Interval: 5 * time.Second, BatchSize: 10, @@ -280,8 +308,9 @@ func TestEventWorker_ProcessEvents_UpdateError(t *testing.T) { } txManager := &MockTxManager{} outboxRepo := &MockOutboxEventRepository{} + eventProcessor := &MockEventProcessor{} - worker := NewEventWorker(config, txManager, outboxRepo, nil) + uc := NewOutboxUseCase(config, txManager, outboxRepo, eventProcessor, nil) ctx := context.Background() uuid1 := uuid.Must(uuid.NewV7()) @@ -300,27 +329,20 @@ func TestEventWorker_ProcessEvents_UpdateError(t *testing.T) { // Setup expectations txManager.On("WithTx", ctx, mock.AnythingOfType("func(context.Context) error")).Return(nil) outboxRepo.On("GetPendingEvents", ctx, config.BatchSize).Return(events, nil) + eventProcessor.On("Process", ctx, events[0]).Return(nil) outboxRepo.On("Update", ctx, mock.AnythingOfType("*domain.OutboxEvent")).Return(updateError) - err := worker.processEvents(ctx) + err := uc.ProcessEvents(ctx) assert.Error(t, err) assert.Contains(t, err.Error(), "update failed") txManager.AssertExpectations(t) outboxRepo.AssertExpectations(t) + eventProcessor.AssertExpectations(t) } -func TestEventWorker_ProcessEvent_Success(t *testing.T) { - config := Config{ - Interval: 5 * time.Second, - BatchSize: 10, - MaxRetries: 3, - RetryInterval: 1 * time.Minute, - } - txManager := &MockTxManager{} - outboxRepo := &MockOutboxEventRepository{} - - worker := NewEventWorker(config, txManager, outboxRepo, nil) +func TestDefaultEventProcessor_Process_Success(t *testing.T) { + processor := NewDefaultEventProcessor(nil) ctx := context.Background() uuid1 := uuid.Must(uuid.NewV7()) @@ -332,22 +354,13 @@ func TestEventWorker_ProcessEvent_Success(t *testing.T) { Retries: 0, } - err := worker.processEvent(ctx, event) + err := processor.Process(ctx, event) assert.NoError(t, err) } -func TestEventWorker_ProcessEvent_UnknownEventType(t *testing.T) { - config := Config{ - Interval: 5 * time.Second, - BatchSize: 10, - MaxRetries: 3, - RetryInterval: 1 * time.Minute, - } - txManager := &MockTxManager{} - outboxRepo := &MockOutboxEventRepository{} - - worker := NewEventWorker(config, txManager, outboxRepo, nil) +func TestDefaultEventProcessor_Process_UnknownEventType(t *testing.T) { + processor := NewDefaultEventProcessor(nil) ctx := context.Background() uuid1 := uuid.Must(uuid.NewV7()) @@ -359,22 +372,13 @@ func TestEventWorker_ProcessEvent_UnknownEventType(t *testing.T) { Retries: 0, } - err := worker.processEvent(ctx, event) + err := processor.Process(ctx, event) assert.NoError(t, err) // Unknown events are just logged as warning } -func TestEventWorker_ProcessEvent_InvalidJSON(t *testing.T) { - config := Config{ - Interval: 5 * time.Second, - BatchSize: 10, - MaxRetries: 3, - RetryInterval: 1 * time.Minute, - } - txManager := &MockTxManager{} - outboxRepo := &MockOutboxEventRepository{} - - worker := NewEventWorker(config, txManager, outboxRepo, nil) +func TestDefaultEventProcessor_Process_InvalidJSON(t *testing.T) { + processor := NewDefaultEventProcessor(nil) ctx := context.Background() uuid1 := uuid.Must(uuid.NewV7()) @@ -386,7 +390,7 @@ func TestEventWorker_ProcessEvent_InvalidJSON(t *testing.T) { Retries: 0, } - err := worker.processEvent(ctx, event) + err := processor.Process(ctx, event) assert.Error(t, err) } diff --git a/internal/worker/event_worker.go b/internal/worker/event_worker.go deleted file mode 100644 index 0b7dcef..0000000 --- a/internal/worker/event_worker.go +++ /dev/null @@ -1,169 +0,0 @@ -// Package worker provides background workers for processing asynchronous tasks. -package worker - -import ( - "context" - "encoding/json" - "log/slog" - "time" - - "github.com/allisson/go-project-template/internal/database" - "github.com/allisson/go-project-template/internal/outbox/domain" -) - -// Config holds worker configuration -type Config struct { - Interval time.Duration - BatchSize int - MaxRetries int - RetryInterval time.Duration -} - -// OutboxEventRepository interface defines outbox event repository operations -type OutboxEventRepository interface { - Create(ctx context.Context, event *domain.OutboxEvent) error - GetPendingEvents(ctx context.Context, limit int) ([]*domain.OutboxEvent, error) - Update(ctx context.Context, event *domain.OutboxEvent) error -} - -// EventWorker processes outbox events -type EventWorker struct { - config Config - txManager database.TxManager - outboxRepo OutboxEventRepository - logger *slog.Logger -} - -// NewEventWorker creates a new EventWorker -func NewEventWorker( - config Config, - txManager database.TxManager, - outboxRepo OutboxEventRepository, - logger *slog.Logger, -) *EventWorker { - return &EventWorker{ - config: config, - txManager: txManager, - outboxRepo: outboxRepo, - logger: logger, - } -} - -// Start starts the worker -func (w *EventWorker) Start(ctx context.Context) error { - if w.logger != nil { - w.logger.Info("starting event worker", - slog.Duration("interval", w.config.Interval), - slog.Int("batch_size", w.config.BatchSize), - ) - } - - ticker := time.NewTicker(w.config.Interval) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - if w.logger != nil { - w.logger.Info("stopping event worker") - } - return ctx.Err() - case <-ticker.C: - if err := w.processEvents(ctx); err != nil { - if w.logger != nil { - w.logger.Error("failed to process events", slog.Any("error", err)) - } - } - } - } -} - -// processEvents retrieves and processes pending events from the outbox in a transaction. -func (w *EventWorker) processEvents(ctx context.Context) error { - return w.txManager.WithTx(ctx, func(ctx context.Context) error { - // Get pending events - events, err := w.outboxRepo.GetPendingEvents(ctx, w.config.BatchSize) - if err != nil { - return err - } - - if len(events) == 0 { - return nil - } - - if w.logger != nil { - w.logger.Info("processing events", slog.Int("count", len(events))) - } - - for _, event := range events { - if err := w.processEvent(ctx, event); err != nil { - if w.logger != nil { - w.logger.Error("failed to process event", - slog.String("event_id", event.ID.String()), - slog.String("event_type", event.EventType), - slog.Any("error", err), - ) - } - - // Update event as failed - event.Retries++ - errorMsg := err.Error() - event.LastError = &errorMsg - - if event.Retries >= w.config.MaxRetries { - event.Status = domain.OutboxEventStatusFailed - } - - if err := w.outboxRepo.Update(ctx, event); err != nil { - return err - } - continue - } - - // Mark event as processed - now := time.Now() - event.Status = domain.OutboxEventStatusProcessed - event.ProcessedAt = &now - - if err := w.outboxRepo.Update(ctx, event); err != nil { - return err - } - } - - return nil - }) -} - -// processEvent handles a single outbox event and implements the event processing logic. -func (w *EventWorker) processEvent(ctx context.Context, event *domain.OutboxEvent) error { - if w.logger != nil { - w.logger.Info("processing event", - slog.String("event_id", event.ID.String()), - slog.String("event_type", event.EventType), - ) - } - - // Parse event payload - var payload map[string]interface{} - if err := json.Unmarshal([]byte(event.Payload), &payload); err != nil { - return err - } - - // Handle different event types - switch event.EventType { - case "user.created": - if w.logger != nil { - w.logger.Info("user created event", - slog.Any("payload", payload), - ) - } - // In a real application, you might publish this to a message queue, - // send notifications, update cache, etc. - default: - if w.logger != nil { - w.logger.Warn("unknown event type", slog.String("event_type", event.EventType)) - } - } - - return nil -}