Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,15 @@ go-project-template/
│ ├── httputil/ # HTTP utilities (JSON responses, error mapping)
│ ├── validation/ # Custom validation rules
│ ├── testutil/ # Test utilities
│ ├── worker/ # Background workers
│ ├── user/ # User domain module
│ │ ├── domain/ # User entities and domain errors
│ │ ├── usecase/ # User business logic
│ │ ├── repository/ # User data access
│ │ └── http/ # User HTTP handlers and DTOs
│ └── outbox/ # Outbox domain module
│ ├── domain/ # Outbox entities and domain errors
│ ├── usecase/ # Outbox event processing logic
│ └── repository/ # Outbox data access
├── migrations/
│ ├── postgresql/ # PostgreSQL migrations
│ └── mysql/ # MySQL migrations
Expand Down Expand Up @@ -126,7 +128,7 @@ The project uses real PostgreSQL and MySQL databases for testing instead of mock
```bash
make build # Build the application
make run-server # Run HTTP server
make run-worker # Run background worker
make run-worker # Run outbox event processor
make run-migrate # Run database migrations
make lint # Run linter with auto-fix
make clean # Clean build artifacts
Expand Down Expand Up @@ -192,10 +194,11 @@ All entities use UUIDv7 for primary keys:

### Transactional Outbox Pattern

Ensures reliable event delivery:
Ensures reliable event delivery using a use case-based approach:
1. Business operation and event stored in same transaction
2. Background worker processes pending events
2. Outbox use case processes pending events with configurable retry logic
3. Guarantees at-least-once delivery
4. Extensible event processing via the `EventProcessor` interface

## 🐳 Docker

Expand All @@ -213,6 +216,8 @@ make docker-run-worker
make docker-run-migrate
```

The worker command runs the outbox event processor, which handles asynchronous event processing using the transactional outbox pattern.

## 🔧 Configuration

All configuration is done via environment variables. Create a `.env` file in your project root:
Expand Down
14 changes: 7 additions & 7 deletions cmd/app/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func runMigrations() error {
return nil
}

// runWorker starts the event worker with graceful shutdown support.
// runWorker starts the outbox event processor with graceful shutdown support.
func runWorker(ctx context.Context) error {
// Load configuration
cfg := config.Load()
Expand All @@ -166,21 +166,21 @@ func runWorker(ctx context.Context) error {

// Get logger from container
logger := container.Logger()
logger.Info("starting worker", slog.String("version", "1.0.0"))
logger.Info("starting outbox event processor", slog.String("version", "1.0.0"))

// Ensure cleanup on exit
defer closeContainer(container, logger)

// Get event worker from container (this initializes all dependencies)
eventWorker, err := container.EventWorker()
// Get outbox use case from container (this initializes all dependencies)
outboxUseCase, err := container.OutboxUseCase()
if err != nil {
return fmt.Errorf("failed to initialize event worker: %w", err)
return fmt.Errorf("failed to initialize outbox use case: %w", err)
}

// Setup graceful shutdown
ctx, cancel := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM)
defer cancel()

// Start worker
return eventWorker.Start(ctx)
// Start outbox event processor
return outboxUseCase.Start(ctx)
}
54 changes: 47 additions & 7 deletions docs/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,9 @@ internal/
│ │ └── mapper.go
│ └── user_handler.go
├── outbox/ # Outbox domain module
│ ├── domain/
│ └── repository/
│ ├── domain/ # Outbox entities and domain errors
│ ├── usecase/ # Outbox event processing logic
│ └── repository/ # Outbox data access
└── {new-domain}/ # Easy to add new domains
```

Expand All @@ -276,8 +277,8 @@ The DI container (`internal/app/`) manages all application components with:
Container
├── Infrastructure (Database, Logger)
├── Repositories (User, Outbox)
├── Use Cases (User)
└── Presentation (HTTP Server, Worker)
├── Use Cases (User, Outbox)
└── Presentation (HTTP Server)
```

**Example**:
Expand Down Expand Up @@ -372,17 +373,19 @@ The transaction is automatically injected into the context and used by repositor

## 📤 Transactional Outbox Pattern

The project demonstrates the transactional outbox pattern for reliable event delivery:
The project demonstrates the transactional outbox pattern for reliable event delivery using a use case-based architecture:

1. 📝 Business operation (e.g., user creation) is executed
2. 📬 Event is stored in outbox table in **same transaction**
3. 🚀 Background worker picks up pending events
3. 🚀 Outbox use case processes pending events with configurable retry logic
4. ✅ Events are marked as processed or failed
5. 🔌 Extensible via the `EventProcessor` interface for custom event handling

**Benefits**:
- 🔒 **Guaranteed delivery** - Events never lost due to transaction rollback
- 🔁 **At-least-once delivery** - Events processed at least once
- 🎯 **Consistency** - Business operations and events always in sync
- 🔧 **Extensibility** - Custom event processors for different event types

**Example (User Registration)**:
```go
Expand All @@ -403,4 +406,41 @@ err = uc.txManager.WithTx(ctx, func(ctx context.Context) error {
})
```

The worker (`internal/worker/event_worker.go`) processes these events asynchronously.
**Processing Events**:

The outbox use case (`internal/outbox/usecase/outbox_usecase.go`) processes these events asynchronously:

```go
// Start the outbox event processor
outboxUseCase, err := container.OutboxUseCase()
if err != nil {
return fmt.Errorf("failed to initialize outbox use case: %w", err)
}

// Processes events in background
err = outboxUseCase.Start(ctx)
```

**Custom Event Processing**:

You can create custom event processors by implementing the `EventProcessor` interface:

```go
type CustomEventProcessor struct {
logger *slog.Logger
// Add your dependencies here (e.g., message queue client)
}

func (p *CustomEventProcessor) Process(ctx context.Context, event *domain.OutboxEvent) error {
// Your custom event processing logic
switch event.EventType {
case "user.created":
// Send to message queue, send notification, etc.
return p.publishToQueue(ctx, event)
default:
return fmt.Errorf("unknown event type: %s", event.EventType)
}
}
```

Then register it in the DI container when initializing the outbox use case.
2 changes: 1 addition & 1 deletion docs/development.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ The binary will be created at `./bin/app`.

```bash
make run-server # Build and run HTTP server
make run-worker # Build and run background worker
make run-worker # Build and run outbox event processor
make run-migrate # Build and run database migrations
```

Expand Down
6 changes: 4 additions & 2 deletions docs/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -203,13 +203,13 @@ curl http://localhost:8080/ready

### Start the Background Worker

In another terminal, start the worker process:
In another terminal, start the outbox event processor:

```bash
make run-worker
```

The worker processes outbox events from the database.
The outbox event processor handles asynchronous event processing from the outbox table using the transactional outbox pattern.

## 🧪 Testing the API

Expand Down Expand Up @@ -296,6 +296,8 @@ The binary supports three commands via `urfave/cli`:
./bin/app worker
```

This starts the outbox event processor which handles asynchronous event processing using the transactional outbox pattern.

## 📚 Next Steps

Now that you have the application running, you can:
Expand Down
57 changes: 29 additions & 28 deletions internal/app/di.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (
"github.com/allisson/go-project-template/internal/database"
"github.com/allisson/go-project-template/internal/http"
outboxRepository "github.com/allisson/go-project-template/internal/outbox/repository"
outboxUsecase "github.com/allisson/go-project-template/internal/outbox/usecase"
userRepository "github.com/allisson/go-project-template/internal/user/repository"
userUsecase "github.com/allisson/go-project-template/internal/user/usecase"
"github.com/allisson/go-project-template/internal/worker"
)

// Container holds all application dependencies and provides methods to access them.
Expand All @@ -36,23 +36,23 @@ type Container struct {
outboxRepo userUsecase.OutboxEventRepository

// Use Cases
userUseCase userUsecase.UseCase
userUseCase userUsecase.UseCase
outboxUseCase outboxUsecase.UseCase

// Servers and Workers
httpServer *http.Server
eventWorker *worker.EventWorker
httpServer *http.Server

// Initialization flags and mutex for thread-safety
mu sync.Mutex
loggerInit sync.Once
dbInit sync.Once
txManagerInit sync.Once
userRepoInit sync.Once
outboxRepoInit sync.Once
userUseCaseInit sync.Once
httpServerInit sync.Once
eventWorkerInit sync.Once
initErrors map[string]error
mu sync.Mutex
loggerInit sync.Once
dbInit sync.Once
txManagerInit sync.Once
userRepoInit sync.Once
outboxRepoInit sync.Once
userUseCaseInit sync.Once
outboxUseCaseInit sync.Once
httpServerInit sync.Once
initErrors map[string]error
}

// NewContainer creates a new dependency injection container with the provided configuration.
Expand Down Expand Up @@ -187,22 +187,22 @@ func (c *Container) HTTPServer() (*http.Server, error) {
return c.httpServer, nil
}

// EventWorker returns the event worker instance.
func (c *Container) EventWorker() (*worker.EventWorker, error) {
// OutboxUseCase returns the outbox use case instance.
func (c *Container) OutboxUseCase() (outboxUsecase.UseCase, error) {
var err error
c.eventWorkerInit.Do(func() {
c.eventWorker, err = c.initEventWorker()
c.outboxUseCaseInit.Do(func() {
c.outboxUseCase, err = c.initOutboxUseCase()
if err != nil {
c.initErrors["eventWorker"] = err
c.initErrors["outboxUseCase"] = err
}
})
if err != nil {
return nil, err
}
if storedErr, exists := c.initErrors["eventWorker"]; exists {
if storedErr, exists := c.initErrors["outboxUseCase"]; exists {
return nil, storedErr
}
return c.eventWorker, nil
return c.outboxUseCase, nil
}

// Shutdown performs cleanup of all initialized resources.
Expand Down Expand Up @@ -362,28 +362,29 @@ func (c *Container) initHTTPServer() (*http.Server, error) {
return server, nil
}

// initEventWorker creates the event worker with all its dependencies.
func (c *Container) initEventWorker() (*worker.EventWorker, error) {
// initOutboxUseCase creates the outbox use case with all its dependencies.
func (c *Container) initOutboxUseCase() (outboxUsecase.UseCase, error) {
logger := c.Logger()

txManager, err := c.TxManager()
if err != nil {
return nil, fmt.Errorf("failed to get tx manager for event worker: %w", err)
return nil, fmt.Errorf("failed to get tx manager for outbox use case: %w", err)
}

outboxRepo, err := c.OutboxRepository()
if err != nil {
return nil, fmt.Errorf("failed to get outbox repository for event worker: %w", err)
return nil, fmt.Errorf("failed to get outbox repository for outbox use case: %w", err)
}

workerConfig := worker.Config{
useCaseConfig := outboxUsecase.Config{
Interval: c.config.WorkerInterval,
BatchSize: c.config.WorkerBatchSize,
MaxRetries: c.config.WorkerMaxRetries,
RetryInterval: c.config.WorkerRetryInterval,
}

eventWorker := worker.NewEventWorker(workerConfig, txManager, outboxRepo, logger)
eventProcessor := outboxUsecase.NewDefaultEventProcessor(logger)
useCase := outboxUsecase.NewOutboxUseCase(useCaseConfig, txManager, outboxRepo, eventProcessor, logger)

return eventWorker, nil
return useCase, nil
}
Loading
Loading