diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index d1d580f..4478877 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -13,7 +13,7 @@ jobs:
     strategy:
       matrix:
         os: [Ubuntu]
-        go-version: ["1.25.x"]
+        go-version: ["1.26.x"]
     runs-on: ${{ matrix.os }}-latest
     permissions:
       contents: read # for golangci-lint-action
diff --git a/CHANGELOG.md b/CHANGELOG.md
index a581745..d45dc7b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+## [0.3.2] - 2026-02-26
+
+### Added
+
+- `workflows`: Added OTEL metrics for the task runner, tracking `task.executed.count`, `task.computed.count`, `task.failed.count`, `task.input.size` and `task.execution.duration`.
+
 ## [0.3.1] - 2026-02-25
 
 ### Fixed
@@ -61,7 +67,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Added support for Tilebox Observability, including logging and tracing helpers.
 - Added examples for using the library.
 
-[Unreleased]: https://github.com/tilebox/tilebox-go/compare/v0.3.1...HEAD
+[Unreleased]: https://github.com/tilebox/tilebox-go/compare/v0.3.2...HEAD
+[0.3.2]: https://github.com/tilebox/tilebox-go/compare/v0.3.1...v0.3.2
 [0.3.1]: https://github.com/tilebox/tilebox-go/compare/v0.3.0...v0.3.1
 [0.3.0]: https://github.com/tilebox/tilebox-go/compare/v0.2.0...v0.3.0
 [0.2.1]: https://github.com/tilebox/tilebox-go/compare/v0.2.0...v0.2.1
diff --git a/go.mod b/go.mod
index 6c5fc82..4345c0f 100644
--- a/go.mod
+++ b/go.mod
@@ -43,7 +43,7 @@ require (
 	go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.40.0 // indirect
 	go.opentelemetry.io/otel/log v0.16.0 // indirect
 	go.opentelemetry.io/proto/otlp v1.9.0 // indirect
-	golang.org/x/net v0.50.0 // indirect
+	golang.org/x/net v0.51.0 // indirect
 	golang.org/x/sys v0.41.0 // indirect
 	golang.org/x/text v0.34.0 // indirect
 	google.golang.org/genproto/googleapis/api v0.0.0-20260223185530-2f722ef697dc // indirect
diff --git a/go.sum b/go.sum
index e4c48b6..07e04c6 100644
--- a/go.sum
+++ b/go.sum
@@ -123,8 +123,8 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL
 golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
 golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
-golang.org/x/net v0.50.0 h1:ucWh9eiCGyDR3vtzso0WMQinm2Dnt8cFMuQa9K33J60=
-golang.org/x/net v0.50.0/go.mod h1:UgoSli3F/pBgdJBHCTc+tp3gmrU4XswgGRgtnwWTfyM=
+golang.org/x/net v0.51.0 h1:94R/GTO7mt3/4wIKpcR5gkGmRLOuE/2hNGeWq/GBIFo=
+golang.org/x/net v0.51.0/go.mod h1:aamm+2QF5ogm02fjy5Bb7CQ0WMt1/WVM7FtyaTLlA9Y=
 golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
diff --git a/workflows/v1/runner.go b/workflows/v1/runner.go
index b87fe5b..f7aff88 100644
--- a/workflows/v1/runner.go
+++ b/workflows/v1/runner.go
@@ -25,6 +25,7 @@ import (
 	"github.com/tilebox/tilebox-go/workflows/v1/runner"
 	"github.com/tilebox/tilebox-go/workflows/v1/subtask"
 	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace" "google.golang.org/grpc/codes" @@ -48,6 +49,76 @@ const ( fallbackJitterInterval = 5 * time.Second ) +const ( + UnitSeconds = "s" + UnitDimensionless = "1" + UnitBytes = "By" +) + +type taskRunnerMetrics struct { + tasksExecutedMetric metric.Int64Counter + tasksComputedMetric metric.Int64Counter + tasksFailedMetric metric.Int64Counter + + taskInputSizeMetric metric.Int64Histogram + taskExecutionDurationMetric metric.Float64Histogram +} + +func newTaskRunnerMetrics(meter metric.Meter) (*taskRunnerMetrics, error) { + tasksExecutedMetric, err := meter.Int64Counter( + "task.executed.count", + metric.WithDescription("Number of tasks executed"), + metric.WithUnit(UnitDimensionless), + ) + if err != nil { + return nil, fmt.Errorf("failed to create task count metric: %w", err) + } + + tasksComputedMetric, err := meter.Int64Counter( + "task.computed.count", + metric.WithDescription("Number of tasks computed"), + metric.WithUnit(UnitDimensionless), + ) + if err != nil { + return nil, fmt.Errorf("failed to create task computed metric: %w", err) + } + + tasksFailedMetric, err := meter.Int64Counter( + "task.failed.count", + metric.WithDescription("Number of tasks failed"), + metric.WithUnit(UnitDimensionless), + ) + if err != nil { + return nil, fmt.Errorf("failed to create task failed metric: %w", err) + } + + taskArgsSizeMetric, err := meter.Int64Histogram( + "task.input.size", + metric.WithDescription("Task arguments size"), + metric.WithUnit(UnitBytes), + ) + if err != nil { + return nil, fmt.Errorf("failed to create task input size metric: %w", err) + } + + taskExecutionDurationMetric, err := meter.Float64Histogram( + "task.execution.duration", + metric.WithDescription("Task execution duration"), + metric.WithUnit(UnitSeconds), + ) + if err != nil { + return nil, fmt.Errorf("failed to create task duration metric: %w", err) + } + + return &taskRunnerMetrics{ + tasksExecutedMetric: tasksExecutedMetric, + tasksComputedMetric: tasksComputedMetric, + tasksFailedMetric: tasksFailedMetric, + taskInputSizeMetric: taskArgsSizeMetric, + taskExecutionDurationMetric: taskExecutionDurationMetric, + }, nil +} + // TaskRunner executes tasks. 
 //
 // Documentation: https://docs.tilebox.com/workflows/concepts/task-runners
@@ -55,10 +126,10 @@ type TaskRunner struct {
 	service         TaskService
 	taskDefinitions map[taskIdentifier]ExecutableTask
 
-	cluster            string
-	tracer             trace.Tracer
-	logger             *slog.Logger
-	taskDurationMetric metric.Float64Histogram
+	cluster string
+	tracer  trace.Tracer
+	logger  *slog.Logger
+	metrics *taskRunnerMetrics
 }
 
 func newTaskRunner(ctx context.Context, service TaskService, clusterClient ClusterClient, tracer trace.Tracer, options ...runner.Option) (*TaskRunner, error) {
@@ -76,25 +147,19 @@ func newTaskRunner(ctx context.Context, service TaskService, clusterClient Clust
 		return nil, fmt.Errorf("failed to get cluster: %w", err)
 	}
 
-	meter := opts.MeterProvider.Meter(otelMeterName)
-
-	taskDurationMetric, err := meter.Float64Histogram(
-		"task.execution.duration",
-		metric.WithDescription("Task execution duration"),
-		metric.WithUnit("s"),
-	)
+	metrics, err := newTaskRunnerMetrics(opts.MeterProvider.Meter(otelMeterName))
 	if err != nil {
-		return nil, fmt.Errorf("failed to create task execution duration metric: %w", err)
+		return nil, fmt.Errorf("failed to create task runner metrics: %w", err)
 	}
 
 	return &TaskRunner{
 		service:         service,
 		taskDefinitions: make(map[taskIdentifier]ExecutableTask),
-		cluster:            cluster.Slug,
-		tracer:             tracer,
-		logger:             opts.Logger,
-		taskDurationMetric: taskDurationMetric,
+		cluster:         cluster.Slug,
+		tracer:          tracer,
+		logger:          opts.Logger,
+		metrics:         metrics,
 	}, nil
 }
@@ -329,6 +394,8 @@ func (t *TaskRunner) executeTask(ctx context.Context, task *workflowsv1.Task) (*
 		slog.Time("start_time", beforeTime),
 	)
 
+	taskMetricAttributes := metric.WithAttributes(attribute.String("task_identifier", identifier.Name()), attribute.String("task_version", identifier.Version()))
+
 	defer func() {
 		if r := recover(); r != nil {
 			// recover from panics during task executions, so we can still report the error to the server and continue
@@ -336,26 +403,35 @@ func (t *TaskRunner) executeTask(ctx context.Context, task *workflowsv1.Task) (*
 			log.ErrorContext(ctx, "task execution failed", slog.String("error", "panic"), slog.Int64("retry_attempt", task.GetRetryCount()))
 			taskExecutionContext = nil
 			err = fmt.Errorf("task panicked: %v", r)
+
+			// also instrument the panic as a failed task execution
+			t.metrics.tasksFailedMetric.Add(ctx, 1, taskMetricAttributes)
+			t.metrics.taskExecutionDurationMetric.Record(ctx, time.Since(beforeTime).Seconds(), taskMetricAttributes, metric.WithAttributes(attribute.String("state", "failed")))
 		}
 	}()
 
+	t.metrics.taskInputSizeMetric.Record(ctx, int64(len(task.GetInput())), taskMetricAttributes)
+	t.metrics.tasksExecutedMetric.Add(ctx, 1, taskMetricAttributes)
+
 	executionContext := t.withTaskExecutionContext(ctx, task)
 	err = taskStruct.Execute(executionContext)
 	executionTime := time.Since(beforeTime)
-
 	log = log.With(
 		slog.Duration("execution_time", executionTime),
 		slog.String("execution_time_human", roundDuration(executionTime, 2).String()),
 	)
 
 	if err != nil {
+		t.metrics.tasksFailedMetric.Add(ctx, 1, taskMetricAttributes)
+		t.metrics.taskExecutionDurationMetric.Record(ctx, executionTime.Seconds(), taskMetricAttributes, metric.WithAttributes(attribute.String("state", "failed")))
+
 		log.ErrorContext(ctx, "task execution failed", slog.Any("error", err), slog.Int64("retry_attempt", task.GetRetryCount()))
 		return getTaskExecutionContext(executionContext), fmt.Errorf("failed to execute task: %w", err)
 	}
 
-	// record the time it took to run a successful task
-	t.taskDurationMetric.Record(ctx, executionTime.Seconds())
+	t.metrics.tasksComputedMetric.Add(ctx, 1, taskMetricAttributes)
+	t.metrics.taskExecutionDurationMetric.Record(ctx, executionTime.Seconds(), taskMetricAttributes, metric.WithAttributes(attribute.String("state", "computed")))
 
 	return getTaskExecutionContext(executionContext), nil
 })
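
The runner reads its `metric.MeterProvider` from the runner options (`opts.MeterProvider` above), so any OTel SDK provider wired in there will collect the new streams. For reviewers who want to see what these instruments emit, here is a minimal, self-contained sketch using the OTel SDK's `ManualReader`; the meter name, task attribute values, and recorded measurements are placeholders for illustration, not part of this change:

```go
package main

import (
	"context"
	"fmt"
	"log"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

func main() {
	ctx := context.Background()

	// A ManualReader collects metrics on demand, no exporter needed.
	reader := sdkmetric.NewManualReader()
	provider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader))
	meter := provider.Meter("metrics-sketch") // placeholder meter name

	// Mirror two of the runner's instruments: the executed counter and the duration histogram.
	executed, err := meter.Int64Counter("task.executed.count", metric.WithUnit("1"))
	if err != nil {
		log.Fatal(err)
	}
	duration, err := meter.Float64Histogram("task.execution.duration", metric.WithUnit("s"))
	if err != nil {
		log.Fatal(err)
	}

	// The runner attaches task_identifier/task_version to every measurement,
	// plus a state attribute ("computed" or "failed") on durations.
	attrs := metric.WithAttributes(
		attribute.String("task_identifier", "MyTask"), // placeholder values
		attribute.String("task_version", "v1.0"),
	)
	executed.Add(ctx, 1, attrs)
	duration.Record(ctx, 0.42, attrs, metric.WithAttributes(attribute.String("state", "computed")))

	// Pull everything recorded so far and print the metric names.
	var rm metricdata.ResourceMetrics
	if err := reader.Collect(ctx, &rm); err != nil {
		log.Fatal(err)
	}
	for _, scope := range rm.ScopeMetrics {
		for _, m := range scope.Metrics {
			fmt.Println(m.Name) // task.executed.count, task.execution.duration
		}
	}
}
```

Recording the duration with a `state` attribute rather than two separate histograms keeps computed and failed runs in one instrument, so a dashboard can sum over all outcomes or split by outcome with a single query.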