Commit 03b891a

Use SparkPlanInfo constructor that is compatible with Databricks' Spark fork

1 parent 9244e6b
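Why the change: the old advice (removed below) picked a SparkPlanInfo constructor purely by parameter count, requiring exactly 5. Open-source Spark declares SparkPlanInfo(nodeName, simpleString, children, metadata, metrics), but Databricks' fork extends the signature with three extra fields (estRowCount, rddScopeId, explainId; see the new helper below), so apparently no 5-argument constructor exists there and the parsed plan metadata was silently dropped. The new SparkPlanInfoUtils helper probes for exact constructor signatures instead. A minimal sketch of that probing idea using only the JDK; CtorProbe is illustrative, and the repo's own datadog.trace.util.MethodHandles wrapper is assumed to behave similarly:

import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;

// Resolve a constructor by exact signature; return null when this Spark
// distribution does not declare it, so the caller can try the next shape.
final class CtorProbe {
  static MethodHandle probe(Class<?> owner, Class<?>... params) {
    try {
      return MethodHandles.publicLookup()
          .findConstructor(owner, MethodType.methodType(void.class, params));
    } catch (NoSuchMethodException | IllegalAccessException e) {
      return null;
    }
  }
}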

File tree

4 files changed: +115 -58 lines

dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java

Lines changed: 9 additions & 23 deletions
@@ -7,7 +7,6 @@
 import datadog.trace.agent.tooling.InstrumenterModule;
 import datadog.trace.api.Config;
 import de.thetaphi.forbiddenapis.SuppressForbidden;
-import java.lang.reflect.Constructor;
 import net.bytebuddy.asm.Advice;
 import org.apache.spark.SparkContext;
 import org.apache.spark.sql.execution.SparkPlan;
@@ -35,7 +34,8 @@ public String[] helperClassNames() {
       packageName + ".SparkSQLUtils",
       packageName + ".SparkSQLUtils$SparkPlanInfoForStage",
       packageName + ".SparkSQLUtils$AccumulatorWithStage",
-      packageName + ".Spark212PlanSerializer"
+      packageName + ".Spark212PlanSerializer",
+      packageName + ".SparkPlanInfoUtils"
     };
   }
 
@@ -104,29 +104,15 @@ public static void exit(
       if (planInfo.metadata().size() == 0
           && (Config.get().isDataJobsParseSparkPlanEnabled()
               || Config.get().isDataJobsExperimentalFeaturesEnabled())) {
-        Spark212PlanSerializer planUtils = new Spark212PlanSerializer();
+        Spark212PlanSerializer planSerializer = new Spark212PlanSerializer();
         Map<String, String> meta =
-            JavaConverters.mapAsScalaMap(planUtils.extractFormattedProduct(plan))
+            JavaConverters.mapAsScalaMap(planSerializer.extractFormattedProduct(plan))
                 .toMap(Predef.$conforms());
-        try {
-          Constructor<?> targetCtor = null;
-          for (Constructor<?> c : SparkPlanInfo.class.getConstructors()) {
-            if (c.getParameterCount() == 5) {
-              targetCtor = c;
-              break;
-            }
-          }
-          if (targetCtor != null) {
-            Object newInst =
-                targetCtor.newInstance(
-                    planInfo.nodeName(),
-                    planInfo.simpleString(),
-                    planInfo.children(),
-                    meta,
-                    planInfo.metrics());
-            planInfo = (SparkPlanInfo) newInst;
-          }
-        } catch (Throwable ignored) {
+
+        SparkPlanInfoUtils planUtils = new SparkPlanInfoUtils();
+        SparkPlanInfo newPlanInfo = planUtils.upsertSparkPlanInfoMetadata(planInfo, meta);
+        if (newPlanInfo != null) {
+          planInfo = newPlanInfo;
         }
       }
     }

dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java

Lines changed: 9 additions & 23 deletions
@@ -7,7 +7,6 @@
 import datadog.trace.agent.tooling.InstrumenterModule;
 import datadog.trace.api.Config;
 import de.thetaphi.forbiddenapis.SuppressForbidden;
-import java.lang.reflect.Constructor;
 import net.bytebuddy.asm.Advice;
 import org.apache.spark.SparkContext;
 import org.apache.spark.sql.execution.SparkPlan;
@@ -35,7 +34,8 @@ public String[] helperClassNames() {
       packageName + ".SparkSQLUtils",
       packageName + ".SparkSQLUtils$SparkPlanInfoForStage",
       packageName + ".SparkSQLUtils$AccumulatorWithStage",
-      packageName + ".Spark213PlanSerializer"
+      packageName + ".Spark213PlanSerializer",
+      packageName + ".SparkPlanInfoUtils"
     };
   }
 
@@ -105,28 +105,14 @@ public static void exit(
       if (planInfo.metadata().size() == 0
           && (Config.get().isDataJobsParseSparkPlanEnabled()
               || Config.get().isDataJobsExperimentalFeaturesEnabled())) {
-        Spark213PlanSerializer planUtils = new Spark213PlanSerializer();
+        Spark213PlanSerializer planSerializer = new Spark213PlanSerializer();
         Map<String, String> meta =
-            HashMap.from(JavaConverters.asScala(planUtils.extractFormattedProduct(plan)));
-        try {
-          Constructor<?> targetCtor = null;
-          for (Constructor<?> c : SparkPlanInfo.class.getConstructors()) {
-            if (c.getParameterCount() == 5) {
-              targetCtor = c;
-              break;
-            }
-          }
-          if (targetCtor != null) {
-            Object newInst =
-                targetCtor.newInstance(
-                    planInfo.nodeName(),
-                    planInfo.simpleString(),
-                    planInfo.children(),
-                    meta,
-                    planInfo.metrics());
-            planInfo = (SparkPlanInfo) newInst;
-          }
-        } catch (Throwable ignored) {
+            HashMap.from(JavaConverters.asScala(planSerializer.extractFormattedProduct(plan)));
+
+        SparkPlanInfoUtils planUtils = new SparkPlanInfoUtils();
+        SparkPlanInfo newPlanInfo = planUtils.upsertSparkPlanInfoMetadata(planInfo, meta);
+        if (newPlanInfo != null) {
+          planInfo = newPlanInfo;
         }
       }
     }
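Note: the Spark 2.12 and 2.13 advice bodies above are otherwise identical; they differ only in how the serializer's java.util.Map is converted to an immutable Scala Map, because the collections API changed between Scala versions. A side-by-side sketch (javaMeta stands in for planSerializer.extractFormattedProduct(plan); MetaConversion is illustrative, and the 2.13 variant compiles only against the Scala 2.13 library):

import java.util.Map;
import scala.Predef;
import scala.collection.JavaConverters;

final class MetaConversion {
  // Scala 2.12, as in Spark212Instrumentation: wrap the Java map, then
  // materialize an immutable Map.
  static scala.collection.immutable.Map<String, String> toScalaMap(Map<String, String> javaMeta) {
    return JavaConverters.mapAsScalaMap(javaMeta).toMap(Predef.$conforms());
  }

  // Scala 2.13, as in Spark213Instrumentation:
  //   scala.collection.immutable.HashMap.from(JavaConverters.asScala(javaMeta));
}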

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkPlanSerializer.java

Lines changed: 7 additions & 12 deletions
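In the refactor below, the explicit try/catch around each raw MethodHandle.invoke disappears because the calls now go through datadog.trace.util.MethodHandles#invoke, which evidently returns null on any failure. That helper's implementation is not part of this diff; a plausible minimal equivalent (an assumption, not the repo's actual code) is:

import java.lang.invoke.MethodHandle;

final class SafeInvoke {
  // Swallow every failure and return null so call sites can chain
  // fallbacks with plain null checks instead of nested try/catch.
  @SuppressWarnings("unchecked")
  static <T> T invokeOrNull(MethodHandle handle, Object... args) {
    if (handle == null) {
      return null;
    }
    try {
      return (T) handle.invokeWithArguments(args);
    } catch (Throwable t) {
      return null;
    }
  }
}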
@@ -60,7 +60,7 @@ public abstract class AbstractSparkPlanSerializer {
 
   private final MethodHandles methodLoader = new MethodHandles(ClassLoader.getSystemClassLoader());
   private final MethodHandle getSimpleString =
-      methodLoader.method(TreeNode.class, "simpleString", new Class[] {int.class});
+      methodLoader.method(TreeNode.class, "simpleString", int.class);
   private final MethodHandle getSimpleStringLegacy =
       methodLoader.method(TreeNode.class, "simpleString");
 
@@ -156,25 +156,20 @@ protected Object safeParseObjectToJson(Object value, int depth) {
   }
 
   private String getSimpleString(TreeNode value) {
-    Object simpleString = null;
-
     if (getSimpleString != null) {
-      try {
-        simpleString = getSimpleString.invoke(value, MAX_LENGTH);
-      } catch (Throwable e) {
+      String simpleString = methodLoader.invoke(getSimpleString, value, MAX_LENGTH);
+      if (simpleString != null) {
+        return simpleString;
       }
     }
 
     if (getSimpleStringLegacy != null) {
-      try {
-        simpleString = getSimpleStringLegacy.invoke(value);
-      } catch (Throwable e) {
+      String simpleString = methodLoader.invoke(getSimpleStringLegacy, value);
+      if (simpleString != null) {
+        return simpleString;
       }
     }
 
-    if (simpleString != null && simpleString instanceof String) {
-      return (String) simpleString;
-    }
     return null;
   }
 
dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/SparkPlanInfoUtils.java

Lines changed: 90 additions & 0 deletions

@@ -0,0 +1,90 @@
+package datadog.trace.instrumentation.spark;
+
+import datadog.trace.util.MethodHandles;
+import java.lang.invoke.MethodHandle;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.spark.sql.execution.SparkPlanInfo;
+import scala.Option;
+import scala.collection.Seq;
+
+public class SparkPlanInfoUtils {
+  private final MethodHandles methodLoader = new MethodHandles(ClassLoader.getSystemClassLoader());
+
+  private final MethodHandle[] constructors =
+      new MethodHandle[] {
+        methodLoader.constructor(
+            SparkPlanInfo.class,
+            String.class,
+            String.class,
+            scala.collection.immutable.Seq.class,
+            scala.collection.immutable.Map.class,
+            scala.collection.immutable.Seq.class),
+        methodLoader.constructor(
+            SparkPlanInfo.class,
+            String.class,
+            String.class,
+            Seq.class,
+            scala.collection.immutable.Map.class,
+            Seq.class),
+      };
+  private final MethodHandle databricksConstructor =
+      methodLoader.constructor(
+          SparkPlanInfo.class,
+          String.class,
+          String.class,
+          Seq.class,
+          scala.collection.immutable.Map.class,
+          Seq.class,
+          Option.class,
+          String.class,
+          Option.class);
+
+  public SparkPlanInfo upsertSparkPlanInfoMetadata(
+      SparkPlanInfo planInfo, scala.collection.immutable.Map<String, String> meta) {
+    // Attempt to create a new SparkPlanInfo with the additional metadata in place.
+    // Since the fields are immutable, we must instantiate a new SparkPlanInfo to do this.
+
+    Object[] standardArgs;
+    try {
+      standardArgs =
+          new Object[] {
+            SparkPlanInfo.class.getMethod("nodeName").invoke(planInfo),
+            SparkPlanInfo.class.getMethod("simpleString").invoke(planInfo),
+            SparkPlanInfo.class.getMethod("children").invoke(planInfo),
+            meta,
+            SparkPlanInfo.class.getMethod("metrics").invoke(planInfo)
+          };
+    } catch (Throwable t) {
+      return null;
+    }
+
+    if (databricksConstructor != null) {
+      // Copy into a resizable list: Arrays.asList alone is fixed-size and throws on add().
+      List<Object> databricksArgs = new ArrayList<>(Arrays.asList(standardArgs));
+      try {
+        databricksArgs.add(SparkPlanInfo.class.getMethod("estRowCount").invoke(planInfo));
+        databricksArgs.add(SparkPlanInfo.class.getMethod("rddScopeId").invoke(planInfo));
+        databricksArgs.add(SparkPlanInfo.class.getMethod("explainId").invoke(planInfo));
+      } catch (Throwable ignored) {
+      }
+
+      SparkPlanInfo newPlan = methodLoader.invoke(databricksConstructor, databricksArgs.toArray());
+      if (newPlan != null) {
+        return newPlan;
+      }
+    }
+
+    for (MethodHandle constructor : constructors) {
+      if (constructor != null) {
+        SparkPlanInfo newPlan = methodLoader.invoke(constructor, standardArgs);
+        if (newPlan != null) {
+          return newPlan;
+        }
+      }
+    }
+
+    return null;
+  }
+}
