diff --git a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java
index 59e142fa3ff..01e37f38c45 100644
--- a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java
+++ b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java
@@ -795,8 +795,8 @@ private <T> void notifyOl(Consumer<T> ol, T event) {
       return;
     }
 
-    if (isRunningOnDatabricks || isStreamingJob) {
-      log.debug("Not emitting event when running on databricks or on streaming jobs");
+    if (isStreamingJob) {
+      log.debug("Not emitting event when running streaming jobs");
       return;
     }
     if (openLineageSparkListener != null) {
@@ -1298,12 +1298,10 @@ private static String getSparkServiceName(SparkConf conf, boolean isRunningOnDatabricks) {
     return sparkAppName;
   }
 
-  private static String getServiceForOpenLineage(SparkConf conf, boolean isRunningOnDatabricks) {
-    // Service for OpenLineage in Databricks is not supported yet
+  private String getServiceForOpenLineage(SparkConf conf, boolean isRunningOnDatabricks) {
     if (isRunningOnDatabricks) {
-      return null;
+      return databricksServiceName;
     }
-    // Keep service set by user, except if it is only "spark" or "hadoop" that can be set by USM
     String serviceName = Config.get().getServiceName();
 
     if (Config.get().isServiceNameSetByUser()
diff --git a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java
index 0277e9446f0..80c71a4f64b 100644
--- a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java
+++ b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java
@@ -10,7 +10,9 @@
 import datadog.trace.agent.tooling.InstrumenterModule;
 import datadog.trace.api.Config;
 import datadog.trace.bootstrap.InstanceStore;
+import java.lang.reflect.InvocationTargetException;
 import net.bytebuddy.asm.Advice;
+import org.apache.spark.SparkConf;
 import org.apache.spark.deploy.SparkSubmitArguments;
 import org.apache.spark.scheduler.SparkListenerInterface;
 import org.slf4j.Logger;
@@ -128,6 +130,23 @@ public static boolean enter(@Advice.Argument(0) SparkListenerInterface listener)
         && "io.openlineage.spark.agent.OpenLineageSparkListener"
             .equals(listener.getClass().getCanonicalName())) {
       log.debug("Detected OpenLineage listener, skipping adding it to ListenerBus");
+
+      // The Spark config does not get captured on the Databricks env, possibly because a
+      // different listener constructor is used.
+      // Reflection is used here to get the config.
+      try {
+        log.debug("Getting OpenLineage conf from the listener");
+        Object openLineageConf = listener.getClass().getMethod("getConf").invoke(listener);
+        if (openLineageConf != null) {
+          InstanceStore.of(SparkConf.class)
+              .put("openLineageSparkConf", (SparkConf) openLineageConf);
+        }
+      } catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
+        log.warn(
+            "Issue when obtaining OpenLineage conf (possibly unsupported OpenLineage version): {}",
+            e.getMessage());
+      }
+
       InstanceStore.of(SparkListenerInterface.class).put("openLineageListener", listener);
       return true;
     }
diff --git a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy
index a7a87c97712..e5d9aafad53 100644
--- a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy
+++ b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy
@@ -553,6 +553,26 @@
     true         | "hadoop"               | "expected-service-name"
   }
 
+  def "test setupOpenLineage gets service name on databricks environment"() {
+    setup:
+    SparkConf sparkConf = new SparkConf()
+    sparkConf.set("spark.databricks.sparkContextId", "some-context")
+    sparkConf.set("spark.databricks.clusterUsageTags.clusterAllTags", "[{\"key\":\"RunName\",\"value\":\"some-run-name\"}]")
+
+
+    def listener = getTestDatadogSparkListener(sparkConf)
+    listener.openLineageSparkListener = Mock(SparkListenerInterface)
+    listener.openLineageSparkConf = new SparkConf()
+    listener.setupOpenLineage(Mock(DDTraceId))
+
+    expect:
+    assert listener
+      .openLineageSparkConf
+      .get("spark.openlineage.run.tags")
+      .split(";")
+      .contains("_dd.ol_service:databricks.job-cluster.some-run-name")
+  }
+
   def "test setupOpenLineage fills ProcessTags"() {
     setup:
     def listener = getTestDatadogSparkListener()