Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -795,8 +795,8 @@ private <T extends SparkListenerEvent> void notifyOl(Consumer<T> ol, T event) {
return;
}

if (isRunningOnDatabricks || isStreamingJob) {
log.debug("Not emitting event when running on databricks or on streaming jobs");
if (isStreamingJob) {
log.debug("Not emitting event when running streaming jobs");
return;
}
if (openLineageSparkListener != null) {
Expand Down Expand Up @@ -1298,12 +1298,10 @@ private static String getSparkServiceName(SparkConf conf, boolean isRunningOnDat
return sparkAppName;
}

private static String getServiceForOpenLineage(SparkConf conf, boolean isRunningOnDatabricks) {
// Service for OpenLineage in Databricks is not supported yet
private String getServiceForOpenLineage(SparkConf conf, boolean isRunningOnDatabricks) {
if (isRunningOnDatabricks) {
return null;
return databricksServiceName;
}

// Keep service set by user, except if it is only "spark" or "hadoop" that can be set by USM
String serviceName = Config.get().getServiceName();
if (Config.get().isServiceNameSetByUser()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.api.Config;
import datadog.trace.bootstrap.InstanceStore;
import java.lang.reflect.InvocationTargetException;
import net.bytebuddy.asm.Advice;
import org.apache.spark.SparkConf;
import org.apache.spark.deploy.SparkSubmitArguments;
import org.apache.spark.scheduler.SparkListenerInterface;
import org.slf4j.Logger;
Expand Down Expand Up @@ -128,6 +130,23 @@ public static boolean enter(@Advice.Argument(0) SparkListenerInterface listener)
&& "io.openlineage.spark.agent.OpenLineageSparkListener"
.equals(listener.getClass().getCanonicalName())) {
log.debug("Detected OpenLineage listener, skipping adding it to ListenerBus");

// The Spark config does not get captured on Databricks environments, possibly because
// a different listener constructor is used there.
// Reflection is used here to obtain the config.
try {
log.debug("Getting OpenLineage conf from the listener");
Object openLineageConf = listener.getClass().getMethod("getConf").invoke(listener);
if (openLineageConf != null) {
InstanceStore.of(SparkConf.class)
.put("openLineageSparkConf", (SparkConf) openLineageConf);
}
} catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
log.warn(
"Issue when obtaining OpenLineage conf (possibly unsupported OpenLineage version): {}",
e.getMessage());
}
Comment on lines +137 to +148
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe instead of reflection, we should find exactly the databricks forked class, and write an instrumentation similar to what we do for SparkConf itself?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can try this @mobuchowski.
Reflection is a workaround for the LiveListenerBusAdvice not working on databricks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've done another round of tests and OpenLineageSparkListenerAdvice is not working on the Databricks environment. It's not working even without considering the sparkConf argument. There may be different reasons behind that: perhaps the OpenLineage listener gets created before Datadog's instrumentation loads, or Databricks initiates listeners in another way than calling regular constructors (they can use reflection as well). To wrap up: there is no easy or known solution to this.

For OpenLineage connector >= 1.39 we could have used a regular listener method call (listener.getConf()) to obtain the conf. However, we need to make sure the code is working for older connector versions, when getConf method didn't exist. That's why I think reflection is necessary.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There may be different reasons behind that: perhaps OpenLineage listener gets created before datadog's instrumentation loads or databricks initiates listeners other way than calling regular constructors (they can use reflection as well). To wrap up: there ain't no easy nor known solution to this.

Maybe we can test that by raising exception on constructor, to see the stack trace?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caused by: java.lang.RuntimeException: some exception
	at io.openlineage.spark.agent.OpenLineageSparkListener.<init>(OpenLineageSparkListener.java:99)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:500)
	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:481)
	at org.apache.spark.util.Utils$.$anonfun$loadExtensions$1(Utils.scala:3610)
	at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293)
	at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290)
	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:108)
	at org.apache.spark.util.Utils$.loadExtensions(Utils.scala:3602)
	at org.apache.spark.SparkContext.$anonfun$setupAndStartListenerBus$1(SparkContext.scala:3683)
	at org.apache.spark.SparkContext.$anonfun$setupAndStartListenerBus$1$adapted(SparkContext.scala:3680)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.SparkContext.setupAndStartListenerBus(SparkContext.scala:3680)
	... 42 more


InstanceStore.of(SparkListenerInterface.class).put("openLineageListener", listener);
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,26 @@ abstract class AbstractSparkListenerTest extends InstrumentationSpecification {
true | "hadoop" | "expected-service-name"
}

def "test setupOpenLineage gets service name on databricks environment"() {
  setup:
  // Simulate a Databricks job-cluster environment: a context id is present, no job id,
  // and the cluster tags carry the RunName used to derive the service name.
  SparkConf sparkConf = new SparkConf()
  sparkConf.set("spark.databricks.sparkContextId", "some-context")
  sparkConf.set("spark.databricks.clusterUsageTags.clusterAllTags", "[{\"key\":\"RunName\",\"value\":\"some-run-name\"}]")

  def listener = getTestDatadogSparkListener(sparkConf)
  listener.openLineageSparkListener = Mock(SparkListenerInterface)
  listener.openLineageSparkConf = new SparkConf()
  listener.setupOpenLineage(Mock(DDTraceId))

  expect:
  // setupOpenLineage should inject the derived Databricks service name into the
  // OpenLineage run tags as a semicolon-separated "_dd.ol_service:<service>" entry.
  // NOTE: statements in a Spock expect: block are implicitly asserted, so no
  // explicit `assert` keyword is needed.
  listener
    .openLineageSparkConf
    .get("spark.openlineage.run.tags")
    .split(";")
    .contains("_dd.ol_service:databricks.job-cluster.some-run-name")
}

def "test setupOpenLineage fills ProcessTags"() {
setup:
def listener = getTestDatadogSparkListener()
Expand Down