2 changes: 1 addition & 1 deletion .github/workflows/main.yml
@@ -13,7 +13,7 @@ jobs:
     strategy:
       matrix:
         os: [Ubuntu]
-        go-version: ["1.25.x"]
+        go-version: ["1.26.x"]
     runs-on: ${{ matrix.os }}-latest
     permissions:
       contents: read # for golangci-lint-action
9 changes: 8 additions & 1 deletion CHANGELOG.md
@@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+## [0.3.2] - 2026-02-26
+
+### Added
+
+- `workflows`: Added OTEL metrics for the task runner, tracking `task.executed.count`, `task.computed.count`, `task.failed.count`, `task.input.size` and `task.execution.duration`.
+
 ## [0.3.1] - 2026-02-25
 
 ### Fixed
@@ -61,7 +67,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Added support for Tilebox Observability, including logging and tracing helpers.
 - Added examples for using the library.
 
-[Unreleased]: https://github.com/tilebox/tilebox-go/compare/v0.3.1...HEAD
+[Unreleased]: https://github.com/tilebox/tilebox-go/compare/v0.3.2...HEAD
+[0.3.2]: https://github.com/tilebox/tilebox-go/compare/v0.3.1...v0.3.2
 [0.3.1]: https://github.com/tilebox/tilebox-go/compare/v0.3.0...v0.3.1
 [0.3.0]: https://github.com/tilebox/tilebox-go/compare/v0.2.0...v0.3.0
 [0.2.1]: https://github.com/tilebox/tilebox-go/compare/v0.2.0...v0.2.1
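To see the metrics added in 0.3.2 locally, an OTel SDK meter provider with a stdout exporter can be wired up before starting a runner. The sketch below is illustrative and not part of this PR: the diff reads the provider from opts.MeterProvider, so whether registering it globally via otel.SetMeterProvider is picked up (rather than passing it through a runner.Option) is an assumption.

package main

import (
	"context"
	"log"
	"time"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/exporters/stdout/stdoutmetric"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
)

func main() {
	ctx := context.Background()

	// Print collected metrics to stdout every 10 seconds.
	exporter, err := stdoutmetric.New()
	if err != nil {
		log.Fatal(err)
	}
	provider := sdkmetric.NewMeterProvider(
		sdkmetric.WithReader(sdkmetric.NewPeriodicReader(exporter, sdkmetric.WithInterval(10*time.Second))),
	)
	defer func() { _ = provider.Shutdown(ctx) }()

	// Assumption: the runner's MeterProvider option defaults to the global
	// provider; otherwise pass it through the appropriate runner.Option.
	otel.SetMeterProvider(provider)

	// ... create the workflows client and start a task runner here; the runner
	// then reports task.executed.count, task.computed.count, task.failed.count,
	// task.input.size and task.execution.duration through this provider.
}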
2 changes: 1 addition & 1 deletion go.mod
@@ -43,7 +43,7 @@ require (
 	go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.40.0 // indirect
 	go.opentelemetry.io/otel/log v0.16.0 // indirect
 	go.opentelemetry.io/proto/otlp v1.9.0 // indirect
-	golang.org/x/net v0.50.0 // indirect
+	golang.org/x/net v0.51.0 // indirect
 	golang.org/x/sys v0.41.0 // indirect
 	golang.org/x/text v0.34.0 // indirect
 	google.golang.org/genproto/googleapis/api v0.0.0-20260223185530-2f722ef697dc // indirect
4 changes: 2 additions & 2 deletions go.sum
@@ -123,8 +123,8 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL
 golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
 golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
-golang.org/x/net v0.50.0 h1:ucWh9eiCGyDR3vtzso0WMQinm2Dnt8cFMuQa9K33J60=
-golang.org/x/net v0.50.0/go.mod h1:UgoSli3F/pBgdJBHCTc+tp3gmrU4XswgGRgtnwWTfyM=
+golang.org/x/net v0.51.0 h1:94R/GTO7mt3/4wIKpcR5gkGmRLOuE/2hNGeWq/GBIFo=
+golang.org/x/net v0.51.0/go.mod h1:aamm+2QF5ogm02fjy5Bb7CQ0WMt1/WVM7FtyaTLlA9Y=
 golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
114 changes: 95 additions & 19 deletions workflows/v1/runner.go
@@ -25,6 +25,7 @@ import (
 	"github.com/tilebox/tilebox-go/workflows/v1/runner"
 	"github.com/tilebox/tilebox-go/workflows/v1/subtask"
 	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/metric"
 	"go.opentelemetry.io/otel/trace"
 	"google.golang.org/grpc/codes"
@@ -48,17 +49,87 @@ const (
 	fallbackJitterInterval = 5 * time.Second
 )
 
+const (
+	UnitSeconds       = "s"
+	UnitDimensionless = "1"
+	UnitBytes         = "By"
+)
+
+type taskRunnerMetrics struct {
+	tasksExecutedMetric metric.Int64Counter
+	tasksComputedMetric metric.Int64Counter
+	tasksFailedMetric   metric.Int64Counter
+
+	taskInputSizeMetric         metric.Int64Histogram
+	taskExecutionDurationMetric metric.Float64Histogram
+}
+
+func newTaskRunnerMetrics(meter metric.Meter) (*taskRunnerMetrics, error) {
+	tasksExecutedMetric, err := meter.Int64Counter(
+		"task.executed.count",
+		metric.WithDescription("Number of tasks executed"),
+		metric.WithUnit(UnitDimensionless),
+	)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create task count metric: %w", err)
+	}
+
+	tasksComputedMetric, err := meter.Int64Counter(
+		"task.computed.count",
+		metric.WithDescription("Number of tasks computed"),
+		metric.WithUnit(UnitDimensionless),
+	)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create task computed metric: %w", err)
+	}
+
+	tasksFailedMetric, err := meter.Int64Counter(
+		"task.failed.count",
+		metric.WithDescription("Number of tasks failed"),
+		metric.WithUnit(UnitDimensionless),
+	)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create task failed metric: %w", err)
+	}
+
+	taskArgsSizeMetric, err := meter.Int64Histogram(
+		"task.input.size",
+		metric.WithDescription("Task arguments size"),
+		metric.WithUnit(UnitBytes),
+	)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create task input size metric: %w", err)
+	}
+
+	taskExecutionDurationMetric, err := meter.Float64Histogram(
+		"task.execution.duration",
+		metric.WithDescription("Task execution duration"),
+		metric.WithUnit(UnitSeconds),
+	)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create task duration metric: %w", err)
+	}
+
+	return &taskRunnerMetrics{
+		tasksExecutedMetric:         tasksExecutedMetric,
+		tasksComputedMetric:         tasksComputedMetric,
+		tasksFailedMetric:           tasksFailedMetric,
+		taskInputSizeMetric:         taskArgsSizeMetric,
+		taskExecutionDurationMetric: taskExecutionDurationMetric,
+	}, nil
+}
+
 // TaskRunner executes tasks.
 //
 // Documentation: https://docs.tilebox.com/workflows/concepts/task-runners
 type TaskRunner struct {
 	service         TaskService
 	taskDefinitions map[taskIdentifier]ExecutableTask
 
-	cluster            string
-	tracer             trace.Tracer
-	logger             *slog.Logger
-	taskDurationMetric metric.Float64Histogram
+	cluster string
+	tracer  trace.Tracer
+	logger  *slog.Logger
+	metrics *taskRunnerMetrics
 }
 
 func newTaskRunner(ctx context.Context, service TaskService, clusterClient ClusterClient, tracer trace.Tracer, options ...runner.Option) (*TaskRunner, error) {
@@ -76,25 +147,19 @@ func newTaskRunner(ctx context.Context, service TaskService, clusterClient Clust
 		return nil, fmt.Errorf("failed to get cluster: %w", err)
 	}
 
-	meter := opts.MeterProvider.Meter(otelMeterName)
-
-	taskDurationMetric, err := meter.Float64Histogram(
-		"task.execution.duration",
-		metric.WithDescription("Task execution duration"),
-		metric.WithUnit("s"),
-	)
+	metrics, err := newTaskRunnerMetrics(opts.MeterProvider.Meter(otelMeterName))
 	if err != nil {
-		return nil, fmt.Errorf("failed to create task execution duration metric: %w", err)
+		return nil, fmt.Errorf("failed to create task runner metrics: %w", err)
 	}
 
 	return &TaskRunner{
 		service:         service,
 		taskDefinitions: make(map[taskIdentifier]ExecutableTask),
 
-		cluster:            cluster.Slug,
-		tracer:             tracer,
-		logger:             opts.Logger,
-		taskDurationMetric: taskDurationMetric,
+		cluster: cluster.Slug,
+		tracer:  tracer,
+		logger:  opts.Logger,
+		metrics: metrics,
 	}, nil
 }
 
@@ -329,33 +394,44 @@ func (t *TaskRunner) executeTask(ctx context.Context, task *workflowsv1.Task) (*
 			slog.Time("start_time", beforeTime),
 		)
 
+		taskMetricAttributes := metric.WithAttributes(attribute.String("task_identifier", identifier.Name()), attribute.String("task_version", identifier.Version()))
+
 		defer func() {
 			if r := recover(); r != nil {
 				// recover from panics during task executions, so we can still report the error to the server and continue
 				// with other tasks
 				log.ErrorContext(ctx, "task execution failed", slog.String("error", "panic"), slog.Int64("retry_attempt", task.GetRetryCount()))
 				taskExecutionContext = nil
 				err = fmt.Errorf("task panicked: %v", r)
+
+				// also instrument the panic as a failed task execution
+				t.metrics.tasksFailedMetric.Add(ctx, 1, taskMetricAttributes)
+				t.metrics.taskExecutionDurationMetric.Record(ctx, time.Since(beforeTime).Seconds(), taskMetricAttributes, metric.WithAttributes(attribute.String("state", "failed")))
 			}
 		}()
 
+		t.metrics.taskInputSizeMetric.Record(ctx, int64(len(task.GetInput())), taskMetricAttributes)
+		t.metrics.tasksExecutedMetric.Add(ctx, 1, taskMetricAttributes)
+
 		executionContext := t.withTaskExecutionContext(ctx, task)
 		err = taskStruct.Execute(executionContext)
 
 		executionTime := time.Since(beforeTime)
 
 		log = log.With(
 			slog.Duration("execution_time", executionTime),
 			slog.String("execution_time_human", roundDuration(executionTime, 2).String()),
 		)
 
 		if err != nil {
+			t.metrics.tasksFailedMetric.Add(ctx, 1, taskMetricAttributes)
+			t.metrics.taskExecutionDurationMetric.Record(ctx, executionTime.Seconds(), taskMetricAttributes, metric.WithAttributes(attribute.String("state", "failed")))
+
 			log.ErrorContext(ctx, "task execution failed", slog.Any("error", err), slog.Int64("retry_attempt", task.GetRetryCount()))
 			return getTaskExecutionContext(executionContext), fmt.Errorf("failed to execute task: %w", err)
 		}
 
-		// record the time it took to run a successful task
-		t.taskDurationMetric.Record(ctx, executionTime.Seconds())
+		t.metrics.tasksComputedMetric.Add(ctx, 1, taskMetricAttributes)
+		t.metrics.taskExecutionDurationMetric.Record(ctx, executionTime.Seconds(), taskMetricAttributes, metric.WithAttributes(attribute.String("state", "computed")))
 
 		return getTaskExecutionContext(executionContext), nil
 	})
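A note on the recording pattern in executeTask: metric.WithAttributes returns a MeasurementOption, and Add/Record take a variadic list of options whose attribute sets are merged, so the per-task attributes are built once while the "state" attribute varies per call. A self-contained sketch of the same pattern (the meter name and attribute values below are made up for illustration):

package main

import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
)

func main() {
	ctx := context.Background()
	meter := otel.Meter("example") // hypothetical meter name

	duration, err := meter.Float64Histogram(
		"task.execution.duration",
		metric.WithDescription("Task execution duration"),
		metric.WithUnit("s"),
	)
	if err != nil {
		panic(err)
	}

	// Shared attributes, built once per task execution.
	taskAttrs := metric.WithAttributes(
		attribute.String("task_identifier", "MyTask"), // illustrative values
		attribute.String("task_version", "v1.0"),
	)

	// Passing several WithAttributes options is valid: the sets are merged,
	// so the task attributes stay fixed while "state" differs per call site.
	duration.Record(ctx, 1.23, taskAttrs, metric.WithAttributes(attribute.String("state", "computed")))
}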