From e82dd209fe1f69ddac27adc61699578caf259f1a Mon Sep 17 00:00:00 2001 From: Charles Yu Date: Thu, 30 Oct 2025 13:00:06 -0400 Subject: [PATCH 1/2] Use SparkPlanInfo constructor that is compatible with Databricks' Spark fork --- .../spark/Spark212Instrumentation.java | 31 ++------ .../spark/Spark212PlanUtils.java | 77 +++++++++++++++++++ .../spark/Spark213Instrumentation.java | 31 ++------ .../spark/Spark213PlanUtils.java | 70 +++++++++++++++++ .../spark/AbstractSparkPlanSerializer.java | 19 ++--- 5 files changed, 170 insertions(+), 58 deletions(-) create mode 100644 dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212PlanUtils.java create mode 100644 dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213PlanUtils.java diff --git a/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java index 5b29d586c46..27a319fca27 100644 --- a/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java +++ b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java @@ -7,7 +7,6 @@ import datadog.trace.agent.tooling.InstrumenterModule; import datadog.trace.api.Config; import de.thetaphi.forbiddenapis.SuppressForbidden; -import java.lang.reflect.Constructor; import net.bytebuddy.asm.Advice; import org.apache.spark.SparkContext; import org.apache.spark.sql.execution.SparkPlan; @@ -35,7 +34,8 @@ public String[] helperClassNames() { packageName + ".SparkSQLUtils", packageName + ".SparkSQLUtils$SparkPlanInfoForStage", packageName + ".SparkSQLUtils$AccumulatorWithStage", - packageName + ".Spark212PlanSerializer" + packageName + ".Spark212PlanSerializer", + packageName + ".Spark212PlanUtils" }; } @@ -104,29 +104,14 @@ public static void exit( if (planInfo.metadata().size() == 0 && (Config.get().isDataJobsParseSparkPlanEnabled() || Config.get().isDataJobsExperimentalFeaturesEnabled())) { - Spark212PlanSerializer planUtils = new Spark212PlanSerializer(); + Spark212PlanSerializer planSerializer = new Spark212PlanSerializer(); Map meta = - JavaConverters.mapAsScalaMap(planUtils.extractFormattedProduct(plan)) + JavaConverters.mapAsScalaMap(planSerializer.extractFormattedProduct(plan)) .toMap(Predef.$conforms()); - try { - Constructor targetCtor = null; - for (Constructor c : SparkPlanInfo.class.getConstructors()) { - if (c.getParameterCount() == 5) { - targetCtor = c; - break; - } - } - if (targetCtor != null) { - Object newInst = - targetCtor.newInstance( - planInfo.nodeName(), - planInfo.simpleString(), - planInfo.children(), - meta, - planInfo.metrics()); - planInfo = (SparkPlanInfo) newInst; - } - } catch (Throwable ignored) { + + SparkPlanInfo newPlanInfo = Spark212PlanUtils.upsertSparkPlanInfoMetadata(planInfo, meta); + if (newPlanInfo != null) { + planInfo = newPlanInfo; } } } diff --git a/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212PlanUtils.java b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212PlanUtils.java new file mode 100644 index 00000000000..36869b2feb2 --- /dev/null +++ b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212PlanUtils.java @@ -0,0 +1,77 @@ +package datadog.trace.instrumentation.spark; + +import datadog.trace.util.MethodHandles; +import java.lang.invoke.MethodHandle; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.apache.spark.sql.execution.SparkPlanInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; + +public class Spark212PlanUtils { + private static final Logger log = LoggerFactory.getLogger(Spark212PlanUtils.class); + + private static final MethodHandles methodLoader = + new MethodHandles(ClassLoader.getSystemClassLoader()); + private static final MethodHandle constructor = + methodLoader.constructor( + SparkPlanInfo.class, + String.class, + String.class, + scala.collection.Seq.class, + scala.collection.immutable.Map.class, + scala.collection.Seq.class); + private static final MethodHandle databricksConstructor = + methodLoader.constructor( + SparkPlanInfo.class, + String.class, + String.class, + scala.collection.Seq.class, + scala.collection.immutable.Map.class, + scala.collection.Seq.class, + Option.class, + String.class, + Option.class); + + public static SparkPlanInfo upsertSparkPlanInfoMetadata( + SparkPlanInfo planInfo, scala.collection.immutable.Map meta) { + // Attempt to create a new SparkPlanInfo with additional metadata replaced + // Since the fields are immutable we must instantiate a new SparkPlanInfo to do this + + Object[] standardArgs = + new Object[] { + planInfo.nodeName(), + planInfo.simpleString(), + planInfo.children(), + meta, + planInfo.metrics() + }; + + if (databricksConstructor != null) { + List databricksArgs = new ArrayList<>(Arrays.asList(standardArgs)); + try { + databricksArgs.add(SparkPlanInfo.class.getMethod("estRowCount").invoke(planInfo)); + databricksArgs.add(SparkPlanInfo.class.getMethod("rddScopeId").invoke(planInfo)); + databricksArgs.add(SparkPlanInfo.class.getMethod("explainId").invoke(planInfo)); + } catch (Throwable t) { + log.warn("Error obtaining Databricks-specific SparkPlanInfo args", t); + } + + SparkPlanInfo newPlan = methodLoader.invoke(databricksConstructor, databricksArgs.toArray()); + if (newPlan != null) { + return newPlan; + } + } + + if (constructor != null) { + SparkPlanInfo newPlan = methodLoader.invoke(constructor, standardArgs); + if (newPlan != null) { + return newPlan; + } + } + + return null; + } +} diff --git a/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java index 64439e2c6e2..a2a47bda848 100644 --- a/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java +++ b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java @@ -7,7 +7,6 @@ import datadog.trace.agent.tooling.InstrumenterModule; import datadog.trace.api.Config; import de.thetaphi.forbiddenapis.SuppressForbidden; -import java.lang.reflect.Constructor; import net.bytebuddy.asm.Advice; import org.apache.spark.SparkContext; import org.apache.spark.sql.execution.SparkPlan; @@ -35,7 +34,8 @@ public String[] helperClassNames() { packageName + ".SparkSQLUtils", packageName + ".SparkSQLUtils$SparkPlanInfoForStage", packageName + ".SparkSQLUtils$AccumulatorWithStage", - packageName + ".Spark213PlanSerializer" + packageName + ".Spark213PlanSerializer", + packageName + ".Spark213PlanUtils" }; } @@ -105,28 +105,13 @@ public static void exit( if (planInfo.metadata().size() == 0 && (Config.get().isDataJobsParseSparkPlanEnabled() || Config.get().isDataJobsExperimentalFeaturesEnabled())) { - Spark213PlanSerializer planUtils = new Spark213PlanSerializer(); + Spark213PlanSerializer planSerializer = new Spark213PlanSerializer(); Map meta = - HashMap.from(JavaConverters.asScala(planUtils.extractFormattedProduct(plan))); - try { - Constructor targetCtor = null; - for (Constructor c : SparkPlanInfo.class.getConstructors()) { - if (c.getParameterCount() == 5) { - targetCtor = c; - break; - } - } - if (targetCtor != null) { - Object newInst = - targetCtor.newInstance( - planInfo.nodeName(), - planInfo.simpleString(), - planInfo.children(), - meta, - planInfo.metrics()); - planInfo = (SparkPlanInfo) newInst; - } - } catch (Throwable ignored) { + HashMap.from(JavaConverters.asScala(planSerializer.extractFormattedProduct(plan))); + + SparkPlanInfo newPlanInfo = Spark213PlanUtils.upsertSparkPlanInfoMetadata(planInfo, meta); + if (newPlanInfo != null) { + planInfo = newPlanInfo; } } } diff --git a/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213PlanUtils.java b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213PlanUtils.java new file mode 100644 index 00000000000..ddb3ac92a99 --- /dev/null +++ b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213PlanUtils.java @@ -0,0 +1,70 @@ +package datadog.trace.instrumentation.spark; + +import datadog.trace.util.MethodHandles; +import java.lang.invoke.MethodHandle; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.apache.spark.sql.execution.SparkPlanInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; + +public class Spark213PlanUtils { + private static final Logger log = LoggerFactory.getLogger(Spark213PlanUtils.class); + + private static final MethodHandles methodLoader = + new MethodHandles(ClassLoader.getSystemClassLoader()); + private static final MethodHandle constructor = + methodLoader.constructor( + SparkPlanInfo.class, + String.class, + String.class, + scala.collection.immutable.Seq.class, + scala.collection.immutable.Map.class, + scala.collection.immutable.Seq.class); + private static final MethodHandle databricksConstructor = + methodLoader.constructor( + SparkPlanInfo.class, + String.class, + String.class, + scala.collection.immutable.Seq.class, + scala.collection.immutable.Map.class, + scala.collection.immutable.Seq.class, + Option.class, + String.class, + Option.class); + + public static SparkPlanInfo upsertSparkPlanInfoMetadata( + SparkPlanInfo planInfo, scala.collection.immutable.Map meta) { + // Attempt to create a new SparkPlanInfo with additional metadata replaced + // Since the fields are immutable we must instantiate a new SparkPlanInfo to do this + + Object[] standardArgs = + new Object[] { + planInfo.nodeName(), + planInfo.simpleString(), + planInfo.children(), + meta, + planInfo.metrics() + }; + + if (databricksConstructor != null) { + List databricksArgs = new ArrayList<>(Arrays.asList(standardArgs)); + try { + databricksArgs.add(SparkPlanInfo.class.getMethod("estRowCount").invoke(planInfo)); + databricksArgs.add(SparkPlanInfo.class.getMethod("rddScopeId").invoke(planInfo)); + databricksArgs.add(SparkPlanInfo.class.getMethod("explainId").invoke(planInfo)); + } catch (Throwable t) { + log.warn("Error obtaining Databricks-specific SparkPlanInfo args", t); + } + + SparkPlanInfo newPlan = methodLoader.invoke(databricksConstructor, databricksArgs.toArray()); + if (newPlan != null) { + return newPlan; + } + } + + return null; + } +} diff --git a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkPlanSerializer.java b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkPlanSerializer.java index d0a91e53416..271f273660a 100644 --- a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkPlanSerializer.java +++ b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkPlanSerializer.java @@ -60,7 +60,7 @@ public abstract class AbstractSparkPlanSerializer { private final MethodHandles methodLoader = new MethodHandles(ClassLoader.getSystemClassLoader()); private final MethodHandle getSimpleString = - methodLoader.method(TreeNode.class, "simpleString", new Class[] {int.class}); + methodLoader.method(TreeNode.class, "simpleString", int.class); private final MethodHandle getSimpleStringLegacy = methodLoader.method(TreeNode.class, "simpleString"); @@ -156,25 +156,20 @@ protected Object safeParseObjectToJson(Object value, int depth) { } private String getSimpleString(TreeNode value) { - Object simpleString = null; - if (getSimpleString != null) { - try { - simpleString = getSimpleString.invoke(value, MAX_LENGTH); - } catch (Throwable e) { + String simpleString = methodLoader.invoke(getSimpleString, value, MAX_LENGTH); + if (simpleString != null) { + return simpleString; } } if (getSimpleStringLegacy != null) { - try { - simpleString = getSimpleStringLegacy.invoke(value); - } catch (Throwable e) { + String simpleString = methodLoader.invoke(getSimpleStringLegacy, value); + if (simpleString != null) { + return simpleString; } } - if (simpleString != null && simpleString instanceof String) { - return (String) simpleString; - } return null; } From 0bfaa064cdc78d1d0b372f6c94246d83efd8fc26 Mon Sep 17 00:00:00 2001 From: Charles Yu Date: Thu, 6 Nov 2025 10:16:49 -0500 Subject: [PATCH 2/2] Centralize logic in a common AbstractSparkPlanUtils class --- .../spark/Spark212Instrumentation.java | 4 +- .../spark/Spark212PlanUtils.java | 62 +++++-------------- .../spark/Spark213Instrumentation.java | 4 +- .../spark/Spark213PlanUtils.java | 55 +++++----------- .../spark/AbstractSparkPlanUtils.java | 57 +++++++++++++++++ 5 files changed, 93 insertions(+), 89 deletions(-) create mode 100644 dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkPlanUtils.java diff --git a/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java index 27a319fca27..4cd23089cd1 100644 --- a/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java +++ b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java @@ -24,6 +24,7 @@ public String[] helperClassNames() { return new String[] { packageName + ".AbstractDatadogSparkListener", packageName + ".AbstractSparkPlanSerializer", + packageName + ".AbstractSparkPlanUtils", packageName + ".DatabricksParentContext", packageName + ".OpenlineageParentContext", packageName + ".DatadogSpark212Listener", @@ -109,7 +110,8 @@ public static void exit( JavaConverters.mapAsScalaMap(planSerializer.extractFormattedProduct(plan)) .toMap(Predef.$conforms()); - SparkPlanInfo newPlanInfo = Spark212PlanUtils.upsertSparkPlanInfoMetadata(planInfo, meta); + SparkPlanInfo newPlanInfo = + new Spark212PlanUtils().upsertSparkPlanInfoMetadata(planInfo, meta); if (newPlanInfo != null) { planInfo = newPlanInfo; } diff --git a/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212PlanUtils.java b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212PlanUtils.java index 36869b2feb2..4d9bef21cdd 100644 --- a/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212PlanUtils.java +++ b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212PlanUtils.java @@ -1,20 +1,11 @@ package datadog.trace.instrumentation.spark; -import datadog.trace.util.MethodHandles; import java.lang.invoke.MethodHandle; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; import org.apache.spark.sql.execution.SparkPlanInfo; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import scala.Option; +import scala.collection.immutable.Map; -public class Spark212PlanUtils { - private static final Logger log = LoggerFactory.getLogger(Spark212PlanUtils.class); - - private static final MethodHandles methodLoader = - new MethodHandles(ClassLoader.getSystemClassLoader()); +public class Spark212PlanUtils extends AbstractSparkPlanUtils { private static final MethodHandle constructor = methodLoader.constructor( SparkPlanInfo.class, @@ -35,43 +26,20 @@ public class Spark212PlanUtils { String.class, Option.class); - public static SparkPlanInfo upsertSparkPlanInfoMetadata( - SparkPlanInfo planInfo, scala.collection.immutable.Map meta) { - // Attempt to create a new SparkPlanInfo with additional metadata replaced - // Since the fields are immutable we must instantiate a new SparkPlanInfo to do this - - Object[] standardArgs = - new Object[] { - planInfo.nodeName(), - planInfo.simpleString(), - planInfo.children(), - meta, - planInfo.metrics() - }; - - if (databricksConstructor != null) { - List databricksArgs = new ArrayList<>(Arrays.asList(standardArgs)); - try { - databricksArgs.add(SparkPlanInfo.class.getMethod("estRowCount").invoke(planInfo)); - databricksArgs.add(SparkPlanInfo.class.getMethod("rddScopeId").invoke(planInfo)); - databricksArgs.add(SparkPlanInfo.class.getMethod("explainId").invoke(planInfo)); - } catch (Throwable t) { - log.warn("Error obtaining Databricks-specific SparkPlanInfo args", t); - } - - SparkPlanInfo newPlan = methodLoader.invoke(databricksConstructor, databricksArgs.toArray()); - if (newPlan != null) { - return newPlan; - } - } + @Override + protected MethodHandle getConstructor() { + return constructor; + } - if (constructor != null) { - SparkPlanInfo newPlan = methodLoader.invoke(constructor, standardArgs); - if (newPlan != null) { - return newPlan; - } - } + @Override + protected MethodHandle getDatabricksConstructor() { + return databricksConstructor; + } - return null; + @Override + protected Object[] getStandardArgs(SparkPlanInfo planInfo, Map meta) { + return new Object[] { + planInfo.nodeName(), planInfo.simpleString(), planInfo.children(), meta, planInfo.metrics() + }; } } diff --git a/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java index a2a47bda848..c9e534f6429 100644 --- a/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java +++ b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java @@ -24,6 +24,7 @@ public String[] helperClassNames() { return new String[] { packageName + ".AbstractDatadogSparkListener", packageName + ".AbstractSparkPlanSerializer", + packageName + ".AbstractSparkPlanUtils", packageName + ".DatabricksParentContext", packageName + ".OpenlineageParentContext", packageName + ".DatadogSpark213Listener", @@ -109,7 +110,8 @@ public static void exit( Map meta = HashMap.from(JavaConverters.asScala(planSerializer.extractFormattedProduct(plan))); - SparkPlanInfo newPlanInfo = Spark213PlanUtils.upsertSparkPlanInfoMetadata(planInfo, meta); + SparkPlanInfo newPlanInfo = + new Spark213PlanUtils().upsertSparkPlanInfoMetadata(planInfo, meta); if (newPlanInfo != null) { planInfo = newPlanInfo; } diff --git a/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213PlanUtils.java b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213PlanUtils.java index ddb3ac92a99..c59b4fdf600 100644 --- a/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213PlanUtils.java +++ b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213PlanUtils.java @@ -1,20 +1,11 @@ package datadog.trace.instrumentation.spark; -import datadog.trace.util.MethodHandles; import java.lang.invoke.MethodHandle; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; import org.apache.spark.sql.execution.SparkPlanInfo; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import scala.Option; +import scala.collection.immutable.Map; -public class Spark213PlanUtils { - private static final Logger log = LoggerFactory.getLogger(Spark213PlanUtils.class); - - private static final MethodHandles methodLoader = - new MethodHandles(ClassLoader.getSystemClassLoader()); +public class Spark213PlanUtils extends AbstractSparkPlanUtils { private static final MethodHandle constructor = methodLoader.constructor( SparkPlanInfo.class, @@ -35,36 +26,20 @@ public class Spark213PlanUtils { String.class, Option.class); - public static SparkPlanInfo upsertSparkPlanInfoMetadata( - SparkPlanInfo planInfo, scala.collection.immutable.Map meta) { - // Attempt to create a new SparkPlanInfo with additional metadata replaced - // Since the fields are immutable we must instantiate a new SparkPlanInfo to do this - - Object[] standardArgs = - new Object[] { - planInfo.nodeName(), - planInfo.simpleString(), - planInfo.children(), - meta, - planInfo.metrics() - }; - - if (databricksConstructor != null) { - List databricksArgs = new ArrayList<>(Arrays.asList(standardArgs)); - try { - databricksArgs.add(SparkPlanInfo.class.getMethod("estRowCount").invoke(planInfo)); - databricksArgs.add(SparkPlanInfo.class.getMethod("rddScopeId").invoke(planInfo)); - databricksArgs.add(SparkPlanInfo.class.getMethod("explainId").invoke(planInfo)); - } catch (Throwable t) { - log.warn("Error obtaining Databricks-specific SparkPlanInfo args", t); - } + @Override + protected MethodHandle getConstructor() { + return constructor; + } - SparkPlanInfo newPlan = methodLoader.invoke(databricksConstructor, databricksArgs.toArray()); - if (newPlan != null) { - return newPlan; - } - } + @Override + protected MethodHandle getDatabricksConstructor() { + return databricksConstructor; + } - return null; + @Override + protected Object[] getStandardArgs(SparkPlanInfo planInfo, Map meta) { + return new Object[] { + planInfo.nodeName(), planInfo.simpleString(), planInfo.children(), meta, planInfo.metrics() + }; } } diff --git a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkPlanUtils.java b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkPlanUtils.java new file mode 100644 index 00000000000..68af1f402dc --- /dev/null +++ b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkPlanUtils.java @@ -0,0 +1,57 @@ +package datadog.trace.instrumentation.spark; + +import datadog.trace.util.MethodHandles; +import java.lang.invoke.MethodHandle; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.apache.spark.sql.execution.SparkPlanInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.collection.immutable.Map; + +abstract class AbstractSparkPlanUtils { + private static final Logger log = LoggerFactory.getLogger(AbstractSparkPlanUtils.class); + + protected static final MethodHandles methodLoader = + new MethodHandles(ClassLoader.getSystemClassLoader()); + + protected abstract MethodHandle getConstructor(); + + protected abstract MethodHandle getDatabricksConstructor(); + + // Deals with Seq which changed from Scala 2.12 to 2.13, so delegate to version-specific classes + protected abstract Object[] getStandardArgs(SparkPlanInfo planInfo, Map meta); + + // Attempt to create a new SparkPlanInfo with additional metadata replaced + // Since the fields are immutable we must instantiate a new SparkPlanInfo to do this + public SparkPlanInfo upsertSparkPlanInfoMetadata( + SparkPlanInfo planInfo, scala.collection.immutable.Map meta) { + if (getDatabricksConstructor() != null) { + List databricksArgs = new ArrayList<>(Arrays.asList(getStandardArgs(planInfo, meta))); + try { + databricksArgs.add(SparkPlanInfo.class.getMethod("estRowCount").invoke(planInfo)); + databricksArgs.add(SparkPlanInfo.class.getMethod("rddScopeId").invoke(planInfo)); + databricksArgs.add(SparkPlanInfo.class.getMethod("explainId").invoke(planInfo)); + } catch (Throwable t) { + log.warn("Error obtaining Databricks-specific SparkPlanInfo args", t); + } + + SparkPlanInfo newPlan = + methodLoader.invoke(getDatabricksConstructor(), databricksArgs.toArray()); + if (newPlan != null) { + return newPlan; + } + } + + if (getConstructor() != null) { + SparkPlanInfo newPlan = + methodLoader.invoke(getConstructor(), getStandardArgs(planInfo, meta)); + if (newPlan != null) { + return newPlan; + } + } + + return null; + } +}