From f9ab1f30e2f34db9472c99c287418a88e54a65da Mon Sep 17 00:00:00 2001 From: Vishnu Kamana Date: Thu, 15 Jan 2026 11:07:57 -0800 Subject: [PATCH] Add granular task level maintenance job metrics to track when a maintenance job has been triggered for a particular table/ database --- .../jobs/scheduler/tasks/OperationTask.java | 17 +++++++++++++++ .../tasks/OperationTasksBuilder.java | 21 +++++++++++++++++++ .../openhouse/jobs/util/AppConstants.java | 3 +++ 3 files changed, 41 insertions(+) diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/OperationTask.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/OperationTask.java index f775cfd0d..155e0b18b 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/OperationTask.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/OperationTask.java @@ -24,6 +24,9 @@ /** * A callable class to apply an operation to some entity (table/database) by running a Spark job. * Takes care of the job lifecycle using /jobs API. + * + *

NOTE: Every implementation must declare a static {@code OPERATION_TYPE} field in order for + * the job scheduler to load the OperationTask. */ @Slf4j @Getter @@ -270,6 +273,20 @@ private void reportJobState( AppConstants.JOB_DURATION, System.currentTimeMillis() - startTime, attributes); + + // Granular attributes to publish entity level job metrics + Attributes granularAttributes = + Attributes.of( + AttributeKey.stringKey(AppConstants.ENTITY_NAME), + metadata.getEntityName(), + AttributeKey.stringKey(AppConstants.ENTITY_TYPE), + metadata.getClass().getSimpleName().replace("Metadata", ""), + AttributeKey.stringKey(AppConstants.JOB_TYPE), + getType().getValue(), + AttributeKey.stringKey(AppConstants.JOB_STATE), + state.name()); + + otelEmitter.count(METRICS_SCOPE, "maintenance_job_completed", 1, granularAttributes); } protected abstract boolean launchJob(); diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/OperationTasksBuilder.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/OperationTasksBuilder.java index fe0290b36..37e3df399 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/OperationTasksBuilder.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/scheduler/tasks/OperationTasksBuilder.java @@ -9,12 +9,15 @@ import com.linkedin.openhouse.jobs.client.TablesClient; import com.linkedin.openhouse.jobs.client.model.JobConf; import com.linkedin.openhouse.jobs.scheduler.JobsScheduler; +import com.linkedin.openhouse.jobs.util.AppConstants; import com.linkedin.openhouse.jobs.util.DataLayoutUtil; import com.linkedin.openhouse.jobs.util.DatabaseMetadata; import com.linkedin.openhouse.jobs.util.DirectoryMetadata; import com.linkedin.openhouse.jobs.util.Metadata; import com.linkedin.openhouse.jobs.util.TableDataLayoutMetadata; import com.linkedin.openhouse.jobs.util.TableMetadata; +import io.opentelemetry.api.common.AttributeKey; +import 
io.opentelemetry.api.common.Attributes; import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -168,6 +171,15 @@ private List> processMetadataList( if (optionalOperationTask.isPresent()) { taskList.add(optionalOperationTask.get()); } + + // Publish entity metrics for triggered tasks + Attributes taskAttributes = + Attributes.of( + AttributeKey.stringKey(AppConstants.ENTITY_NAME), metadata.getEntityName(), + AttributeKey.stringKey(AppConstants.ENTITY_TYPE), + metadata.getClass().getSimpleName().replace("Metadata", ""), + AttributeKey.stringKey(AppConstants.JOB_TYPE), jobType.getValue()); + otelEmitter.count(METRICS_SCOPE, "maintenance_job_triggered", 1, taskAttributes); } return taskList; } @@ -183,6 +195,15 @@ public Optional> processMetadata( task.setOtelEmitter(otelEmitter); if (!task.shouldRun()) { log.info("Skipping task {}", task); + + // Publish entity metrics for skipped tasks + Attributes taskAttributes = + Attributes.of( + AttributeKey.stringKey(AppConstants.ENTITY_NAME), metadata.getEntityName(), + AttributeKey.stringKey(AppConstants.ENTITY_TYPE), + metadata.getClass().getSimpleName().replace("Metadata", ""), + AttributeKey.stringKey(AppConstants.JOB_TYPE), task.getType().getValue()); + otelEmitter.count(METRICS_SCOPE, "maintenance_job_skipped", 1, taskAttributes); return Optional.empty(); } else { if (OperationMode.SUBMIT.equals(operationMode)) { diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/AppConstants.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/AppConstants.java index d001d2f23..a756a65f8 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/AppConstants.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/AppConstants.java @@ -9,7 +9,9 @@ public final class AppConstants { // Spark App observability constants public static final String TYPE = "type"; + public static final String ENTITY_TYPE = "entity_type"; public static final String JOB_TYPE = 
"job_type"; + public static final String JOB_STATE = "job_state"; public static final String ORPHAN_FILE_COUNT = "orphan_file_count"; public static final String STAGED_FILE_COUNT = "staged_file_count"; public static final String ORPHAN_DIRECTORY_COUNT = "orphan_directory_count"; @@ -49,6 +51,7 @@ public final class AppConstants { public static final String JOB_ID = "job_id"; public static final String QUEUED_TIME = "queued_time"; public static final String DATABASE_NAME = "database_name"; + public static final String ENTITY_NAME = "entity_name"; // Maintenance jobs table properties keys public static final String BACKUP_ENABLED_KEY = "retention.backup.enabled";