diff --git a/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java
index 5b29d586c46..4cd23089cd1 100644
--- a/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java
+++ b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java
@@ -7,7 +7,6 @@
 import datadog.trace.agent.tooling.InstrumenterModule;
 import datadog.trace.api.Config;
 import de.thetaphi.forbiddenapis.SuppressForbidden;
-import java.lang.reflect.Constructor;
 import net.bytebuddy.asm.Advice;
 import org.apache.spark.SparkContext;
 import org.apache.spark.sql.execution.SparkPlan;
@@ -25,6 +24,7 @@ public String[] helperClassNames() {
     return new String[] {
       packageName + ".AbstractDatadogSparkListener",
       packageName + ".AbstractSparkPlanSerializer",
+      packageName + ".AbstractSparkPlanUtils",
       packageName + ".DatabricksParentContext",
       packageName + ".OpenlineageParentContext",
       packageName + ".DatadogSpark212Listener",
@@ -35,7 +35,8 @@ public String[] helperClassNames() {
       packageName + ".SparkSQLUtils",
       packageName + ".SparkSQLUtils$SparkPlanInfoForStage",
       packageName + ".SparkSQLUtils$AccumulatorWithStage",
-      packageName + ".Spark212PlanSerializer"
+      packageName + ".Spark212PlanSerializer",
+      packageName + ".Spark212PlanUtils"
     };
   }
 
@@ -104,29 +105,15 @@ public static void exit(
       if (planInfo.metadata().size() == 0
           && (Config.get().isDataJobsParseSparkPlanEnabled()
               || Config.get().isDataJobsExperimentalFeaturesEnabled())) {
-        Spark212PlanSerializer planUtils = new Spark212PlanSerializer();
+        Spark212PlanSerializer planSerializer = new Spark212PlanSerializer();
         Map meta =
-            JavaConverters.mapAsScalaMap(planUtils.extractFormattedProduct(plan))
+            JavaConverters.mapAsScalaMap(planSerializer.extractFormattedProduct(plan))
                 .toMap(Predef.$conforms());
-        try {
-          Constructor targetCtor = null;
-          for (Constructor c : SparkPlanInfo.class.getConstructors()) {
-            if (c.getParameterCount() == 5) {
-              targetCtor = c;
-              break;
-            }
-          }
-          if (targetCtor != null) {
-            Object newInst =
-                targetCtor.newInstance(
-                    planInfo.nodeName(),
-                    planInfo.simpleString(),
-                    planInfo.children(),
-                    meta,
-                    planInfo.metrics());
-            planInfo = (SparkPlanInfo) newInst;
-          }
-        } catch (Throwable ignored) {
+
+        SparkPlanInfo newPlanInfo =
+            new Spark212PlanUtils().upsertSparkPlanInfoMetadata(planInfo, meta);
+        if (newPlanInfo != null) {
+          planInfo = newPlanInfo;
         }
       }
     }
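
Note: the rewritten advice can drop the try/catch because it leans on the datadog.trace.util.MethodHandles helper used throughout this diff, whose lookups and invocations are expected to return null rather than throw. Below is a minimal sketch of that assumed contract; NullSafeHandles is an illustrative name, not the real helper, and the actual dd-trace-java behavior should be confirmed against its source.

import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles.Lookup;

final class NullSafeHandles {
  private final Lookup lookup = java.lang.invoke.MethodHandles.publicLookup();

  // Resolve a public constructor, or return null when this Spark/Databricks build
  // does not declare one with these parameter types.
  MethodHandle constructor(Class<?> owner, Class<?>... parameterTypes) {
    try {
      return lookup.unreflectConstructor(owner.getConstructor(parameterTypes));
    } catch (Throwable ignored) {
      return null;
    }
  }

  // Invoke the handle, returning null on any failure (null handle, wrong arity,
  // or an exception from the callee), so callers can simply null-check the result.
  @SuppressWarnings("unchecked")
  <T> T invoke(MethodHandle handle, Object... args) {
    try {
      return handle == null ? null : (T) handle.invokeWithArguments(args);
    } catch (Throwable ignored) {
      return null;
    }
  }
}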
diff --git a/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212PlanUtils.java b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212PlanUtils.java
new file mode 100644
index 00000000000..4d9bef21cdd
--- /dev/null
+++ b/dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212PlanUtils.java
@@ -0,0 +1,45 @@
+package datadog.trace.instrumentation.spark;
+
+import java.lang.invoke.MethodHandle;
+import org.apache.spark.sql.execution.SparkPlanInfo;
+import scala.Option;
+import scala.collection.immutable.Map;
+
+public class Spark212PlanUtils extends AbstractSparkPlanUtils {
+  private static final MethodHandle constructor =
+      methodLoader.constructor(
+          SparkPlanInfo.class,
+          String.class,
+          String.class,
+          scala.collection.Seq.class,
+          scala.collection.immutable.Map.class,
+          scala.collection.Seq.class);
+  private static final MethodHandle databricksConstructor =
+      methodLoader.constructor(
+          SparkPlanInfo.class,
+          String.class,
+          String.class,
+          scala.collection.Seq.class,
+          scala.collection.immutable.Map.class,
+          scala.collection.Seq.class,
+          Option.class,
+          String.class,
+          Option.class);
+
+  @Override
+  protected MethodHandle getConstructor() {
+    return constructor;
+  }
+
+  @Override
+  protected MethodHandle getDatabricksConstructor() {
+    return databricksConstructor;
+  }
+
+  @Override
+  protected Object[] getStandardArgs(SparkPlanInfo planInfo, Map meta) {
+    return new Object[] {
+      planInfo.nodeName(), planInfo.simpleString(), planInfo.children(), meta, planInfo.metrics()
+    };
+  }
+}
diff --git a/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java
index 64439e2c6e2..c9e534f6429 100644
--- a/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java
+++ b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java
@@ -7,7 +7,6 @@
 import datadog.trace.agent.tooling.InstrumenterModule;
 import datadog.trace.api.Config;
 import de.thetaphi.forbiddenapis.SuppressForbidden;
-import java.lang.reflect.Constructor;
 import net.bytebuddy.asm.Advice;
 import org.apache.spark.SparkContext;
 import org.apache.spark.sql.execution.SparkPlan;
@@ -25,6 +24,7 @@ public String[] helperClassNames() {
     return new String[] {
       packageName + ".AbstractDatadogSparkListener",
       packageName + ".AbstractSparkPlanSerializer",
+      packageName + ".AbstractSparkPlanUtils",
       packageName + ".DatabricksParentContext",
       packageName + ".OpenlineageParentContext",
       packageName + ".DatadogSpark213Listener",
@@ -35,7 +35,8 @@ public String[] helperClassNames() {
       packageName + ".SparkSQLUtils",
       packageName + ".SparkSQLUtils$SparkPlanInfoForStage",
       packageName + ".SparkSQLUtils$AccumulatorWithStage",
-      packageName + ".Spark213PlanSerializer"
+      packageName + ".Spark213PlanSerializer",
+      packageName + ".Spark213PlanUtils"
     };
   }
 
@@ -105,28 +106,14 @@ public static void exit(
       if (planInfo.metadata().size() == 0
           && (Config.get().isDataJobsParseSparkPlanEnabled()
               || Config.get().isDataJobsExperimentalFeaturesEnabled())) {
-        Spark213PlanSerializer planUtils = new Spark213PlanSerializer();
+        Spark213PlanSerializer planSerializer = new Spark213PlanSerializer();
         Map meta =
-            HashMap.from(JavaConverters.asScala(planUtils.extractFormattedProduct(plan)));
-        try {
-          Constructor targetCtor = null;
-          for (Constructor c : SparkPlanInfo.class.getConstructors()) {
-            if (c.getParameterCount() == 5) {
-              targetCtor = c;
-              break;
-            }
-          }
-          if (targetCtor != null) {
-            Object newInst =
-                targetCtor.newInstance(
-                    planInfo.nodeName(),
-                    planInfo.simpleString(),
-                    planInfo.children(),
-                    meta,
-                    planInfo.metrics());
-            planInfo = (SparkPlanInfo) newInst;
-          }
-        } catch (Throwable ignored) {
+            HashMap.from(JavaConverters.asScala(planSerializer.extractFormattedProduct(plan)));
+
+        SparkPlanInfo newPlanInfo =
+            new Spark213PlanUtils().upsertSparkPlanInfoMetadata(planInfo, meta);
+        if (newPlanInfo != null) {
+          planInfo = newPlanInfo;
         }
       }
     }
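
Note: Spark212PlanUtils and Spark213PlanUtils differ only in the Seq classes they pin, because SparkPlanInfo's children and metrics parameters erase to scala.collection.Seq when Spark is built against Scala 2.12 and to scala.collection.immutable.Seq against 2.13 (where scala.Seq became the immutable alias). A quick reflective probe, in the spirit of the getConstructors() loop this diff removes, can confirm which signatures a given Spark build exposes; the class below is illustrative and not part of the change.

import java.lang.reflect.Constructor;
import java.util.Arrays;
import org.apache.spark.sql.execution.SparkPlanInfo;

public class SparkPlanInfoCtorProbe {
  public static void main(String[] args) {
    // Prints each public constructor's parameter list. Expect the 5-arg form everywhere,
    // Seq types matching the Scala version, and (on Databricks runtimes, per this diff's
    // assumptions) an additional 8-arg form ending in Option, String, Option.
    for (Constructor<?> c : SparkPlanInfo.class.getConstructors()) {
      System.out.println(Arrays.toString(c.getParameterTypes()));
    }
  }
}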
diff --git a/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213PlanUtils.java b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213PlanUtils.java
new file mode 100644
index 00000000000..c59b4fdf600
--- /dev/null
+++ b/dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213PlanUtils.java
@@ -0,0 +1,45 @@
+package datadog.trace.instrumentation.spark;
+
+import java.lang.invoke.MethodHandle;
+import org.apache.spark.sql.execution.SparkPlanInfo;
+import scala.Option;
+import scala.collection.immutable.Map;
+
+public class Spark213PlanUtils extends AbstractSparkPlanUtils {
+  private static final MethodHandle constructor =
+      methodLoader.constructor(
+          SparkPlanInfo.class,
+          String.class,
+          String.class,
+          scala.collection.immutable.Seq.class,
+          scala.collection.immutable.Map.class,
+          scala.collection.immutable.Seq.class);
+  private static final MethodHandle databricksConstructor =
+      methodLoader.constructor(
+          SparkPlanInfo.class,
+          String.class,
+          String.class,
+          scala.collection.immutable.Seq.class,
+          scala.collection.immutable.Map.class,
+          scala.collection.immutable.Seq.class,
+          Option.class,
+          String.class,
+          Option.class);
+
+  @Override
+  protected MethodHandle getConstructor() {
+    return constructor;
+  }
+
+  @Override
+  protected MethodHandle getDatabricksConstructor() {
+    return databricksConstructor;
+  }
+
+  @Override
+  protected Object[] getStandardArgs(SparkPlanInfo planInfo, Map meta) {
+    return new Object[] {
+      planInfo.nodeName(), planInfo.simpleString(), planInfo.children(), meta, planInfo.metrics()
+    };
+  }
+}
diff --git a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkPlanSerializer.java b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkPlanSerializer.java
index d0a91e53416..271f273660a 100644
--- a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkPlanSerializer.java
+++ b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkPlanSerializer.java
@@ -60,7 +60,7 @@ public abstract class AbstractSparkPlanSerializer {
   private final MethodHandles methodLoader = new MethodHandles(ClassLoader.getSystemClassLoader());
 
   private final MethodHandle getSimpleString =
-      methodLoader.method(TreeNode.class, "simpleString", new Class[] {int.class});
+      methodLoader.method(TreeNode.class, "simpleString", int.class);
 
   private final MethodHandle getSimpleStringLegacy =
       methodLoader.method(TreeNode.class, "simpleString");
@@ -156,21 +156,17 @@ protected Object safeParseObjectToJson(Object value, int depth) {
   }
 
   private String getSimpleString(TreeNode value) {
-    Object simpleString = null;
     if (getSimpleString != null) {
-      try {
-        simpleString = getSimpleString.invoke(value, MAX_LENGTH);
-      } catch (Throwable e) {
+      String simpleString = methodLoader.invoke(getSimpleString, value, MAX_LENGTH);
+      if (simpleString != null) {
+        return simpleString;
       }
     }
     if (getSimpleStringLegacy != null) {
-      try {
-        simpleString = getSimpleStringLegacy.invoke(value);
-      } catch (Throwable e) {
+      String simpleString = methodLoader.invoke(getSimpleStringLegacy, value);
+      if (simpleString != null) {
+        return simpleString;
       }
     }
-    if (simpleString != null && simpleString instanceof String) {
-      return (String) simpleString;
-    }
     return null;
   }
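
Note: the getSimpleString rewrite keeps the same two-probe shape as before: try the simpleString(int) overload that newer Spark versions expose (an int that bounds the rendered output), then fall back to the legacy no-arg simpleString(). Distilled into a generic sketch (our construction, not code from the PR), the pattern is:

import java.lang.invoke.MethodHandle;

final class FirstNonNull {
  // Tries each (handle, args) pair in order; a missing overload (null handle) or a
  // throwing call simply moves on to the next candidate, and null means all failed.
  static Object invokeFirst(MethodHandle[] handles, Object[][] argLists) {
    for (int i = 0; i < handles.length; i++) {
      if (handles[i] == null) {
        continue;
      }
      try {
        Object result = handles[i].invokeWithArguments(argLists[i]);
        if (result != null) {
          return result;
        }
      } catch (Throwable ignored) {
        // fall through to the older overload
      }
    }
    return null;
  }
}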
diff --git a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkPlanUtils.java b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkPlanUtils.java
new file mode 100644
index 00000000000..68af1f402dc
--- /dev/null
+++ b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkPlanUtils.java
@@ -0,0 +1,57 @@
+package datadog.trace.instrumentation.spark;
+
+import datadog.trace.util.MethodHandles;
+import java.lang.invoke.MethodHandle;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.spark.sql.execution.SparkPlanInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.immutable.Map;
+
+abstract class AbstractSparkPlanUtils {
+  private static final Logger log = LoggerFactory.getLogger(AbstractSparkPlanUtils.class);
+
+  protected static final MethodHandles methodLoader =
+      new MethodHandles(ClassLoader.getSystemClassLoader());
+
+  protected abstract MethodHandle getConstructor();
+
+  protected abstract MethodHandle getDatabricksConstructor();
+
+  // Deals with Seq which changed from Scala 2.12 to 2.13, so delegate to version-specific classes
+  protected abstract Object[] getStandardArgs(SparkPlanInfo planInfo, Map meta);
+
+  // Attempt to create a new SparkPlanInfo with additional metadata replaced
+  // Since the fields are immutable we must instantiate a new SparkPlanInfo to do this
+  public SparkPlanInfo upsertSparkPlanInfoMetadata(
+      SparkPlanInfo planInfo, scala.collection.immutable.Map meta) {
+    if (getDatabricksConstructor() != null) {
+      List databricksArgs = new ArrayList<>(Arrays.asList(getStandardArgs(planInfo, meta)));
+      try {
+        databricksArgs.add(SparkPlanInfo.class.getMethod("estRowCount").invoke(planInfo));
+        databricksArgs.add(SparkPlanInfo.class.getMethod("rddScopeId").invoke(planInfo));
+        databricksArgs.add(SparkPlanInfo.class.getMethod("explainId").invoke(planInfo));
+      } catch (Throwable t) {
+        log.warn("Error obtaining Databricks-specific SparkPlanInfo args", t);
+      }
+
+      SparkPlanInfo newPlan =
+          methodLoader.invoke(getDatabricksConstructor(), databricksArgs.toArray());
+      if (newPlan != null) {
+        return newPlan;
+      }
+    }
+
+    if (getConstructor() != null) {
+      SparkPlanInfo newPlan =
+          methodLoader.invoke(getConstructor(), getStandardArgs(planInfo, meta));
+      if (newPlan != null) {
+        return newPlan;
+      }
+    }
+
+    return null;
+  }
+}
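
Note: upsertSparkPlanInfoMetadata is deliberately fail-safe: the Databricks-only fields are fetched reflectively, a failed 8-arg invocation falls through to the standard 5-arg constructor, and a null return tells the caller to keep its original SparkPlanInfo. A hypothetical call site (ours, for illustration; the real ones are the advice exits above) would look like:

package datadog.trace.instrumentation.spark;

import org.apache.spark.sql.execution.SparkPlanInfo;
import scala.collection.immutable.Map;

final class UpsertExample {
  static SparkPlanInfo withMetadata(SparkPlanInfo planInfo, Map meta) {
    SparkPlanInfo enriched = new Spark212PlanUtils().upsertSparkPlanInfoMetadata(planInfo, meta);
    // null means neither the Databricks nor the standard constructor could be invoked;
    // keep the original, metadata-less plan info rather than dropping the plan.
    return enriched != null ? enriched : planInfo;
  }
}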