Skip to content

Commit f8f26bf

Browse files
databricks support changes
1 parent 91f2995 commit f8f26bf

File tree

3 files changed

+42
-5
lines changed

3 files changed

+42
-5
lines changed

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java

Lines changed: 3 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -795,7 +795,7 @@ private <T extends SparkListenerEvent> void notifyOl(Consumer<T> ol, T event) {
795795
return;
796796
}
797797

798-
if (isRunningOnDatabricks || isStreamingJob) {
798+
if (isStreamingJob) {
799799
log.debug("Not emitting event when running on databricks or on streaming jobs");
800800
return;
801801
}
@@ -1298,12 +1298,10 @@ private static String getSparkServiceName(SparkConf conf, boolean isRunningOnDat
12981298
return sparkAppName;
12991299
}
13001300

1301-
private static String getServiceForOpenLineage(SparkConf conf, boolean isRunningOnDatabricks) {
1302-
// Service for OpenLineage in Databricks is not supported yet
1301+
private String getServiceForOpenLineage(SparkConf conf, boolean isRunningOnDatabricks) {
13031302
if (isRunningOnDatabricks) {
1304-
return null;
1303+
return databricksServiceName;
13051304
}
1306-
13071305
// Keep service set by user, except if it is only "spark" or "hadoop" that can be set by USM
13081306
String serviceName = Config.get().getServiceName();
13091307
if (Config.get().isServiceNameSetByUser()

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java

Lines changed: 19 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -10,7 +10,9 @@
1010
import datadog.trace.agent.tooling.InstrumenterModule;
1111
import datadog.trace.api.Config;
1212
import datadog.trace.bootstrap.InstanceStore;
13+
import java.lang.reflect.InvocationTargetException;
1314
import net.bytebuddy.asm.Advice;
15+
import org.apache.spark.SparkConf;
1416
import org.apache.spark.deploy.SparkSubmitArguments;
1517
import org.apache.spark.scheduler.SparkListenerInterface;
1618
import org.slf4j.Logger;
@@ -128,6 +130,23 @@ public static boolean enter(@Advice.Argument(0) SparkListenerInterface listener)
128130
&& "io.openlineage.spark.agent.OpenLineageSparkListener"
129131
.equals(listener.getClass().getCanonicalName())) {
130132
log.debug("Detected OpenLineage listener, skipping adding it to ListenerBus");
133+
134+
// Spark config does not get captured on Databricks env, possibly because of other listener's
135+
// constructor used.
136+
// Reflection is used here to get the config.
137+
try {
138+
log.debug("Getting OpenLineage conf from the listener");
139+
Object openLineageConf = listener.getClass().getMethod("getConf").invoke(listener);
140+
if (openLineageConf != null) {
141+
InstanceStore.of(SparkConf.class)
142+
.put("openLineageSparkConf", (SparkConf) openLineageConf);
143+
}
144+
} catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
145+
log.warn(
146+
"Issue when obtaining OpenLineage conf (possibly unsupported OpenLineage version): {}",
147+
e.getMessage());
148+
}
149+
131150
InstanceStore.of(SparkListenerInterface.class).put("openLineageListener", listener);
132151
return true;
133152
}

dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy

Lines changed: 20 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -553,6 +553,26 @@ abstract class AbstractSparkListenerTest extends InstrumentationSpecification {
553553
true | "hadoop" | "expected-service-name"
554554
}
555555

556+
def "test setupOpenLineage gets service name on databricks environment"() {
557+
setup:
558+
SparkConf sparkConf = new SparkConf()
559+
sparkConf.set("spark.databricks.sparkContextId", "some-context")
560+
sparkConf.set("spark.databricks.clusterUsageTags.clusterAllTags", "[{\"key\":\"RunName\",\"value\":\"some-run-name\"}]")
561+
562+
563+
def listener = getTestDatadogSparkListener(sparkConf)
564+
listener.openLineageSparkListener = Mock(SparkListenerInterface)
565+
listener.openLineageSparkConf = new SparkConf()
566+
listener.setupOpenLineage(Mock(DDTraceId))
567+
568+
expect:
569+
assert listener
570+
.openLineageSparkConf
571+
.get("spark.openlineage.run.tags")
572+
.split(";")
573+
.contains("_dd.ol_service:databricks.job-cluster.some-run-name")
574+
}
575+
556576
def "test setupOpenLineage fills ProcessTags"() {
557577
setup:
558578
def listener = getTestDatadogSparkListener()

0 commit comments

Comments
 (0)