|
6 | 6 | import com.google.auto.service.AutoService; |
7 | 7 | import datadog.trace.agent.tooling.InstrumenterModule; |
8 | 8 | import datadog.trace.api.Config; |
| 9 | +import de.thetaphi.forbiddenapis.SuppressForbidden; |
| 10 | +import java.lang.reflect.Constructor; |
9 | 11 | import net.bytebuddy.asm.Advice; |
10 | 12 | import org.apache.spark.SparkContext; |
11 | 13 | import org.apache.spark.sql.execution.SparkPlan; |
|
14 | 16 | import org.slf4j.LoggerFactory; |
15 | 17 | import scala.Predef; |
16 | 18 | import scala.collection.JavaConverters; |
| 19 | +import scala.collection.immutable.Map; |
17 | 20 |
|
18 | 21 | @AutoService(InstrumenterModule.class) |
19 | 22 | public class Spark212Instrumentation extends AbstractSparkInstrumentation { |
@@ -94,21 +97,37 @@ public static void enter(@Advice.This SparkContext sparkContext) { |
94 | 97 |
|
95 | 98 | public static class SparkPlanInfoAdvice { |
96 | 99 | @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) |
| 100 | + @SuppressForbidden |
97 | 101 | public static void exit( |
98 | 102 | @Advice.Return(readOnly = false) SparkPlanInfo planInfo, |
99 | 103 | @Advice.Argument(0) SparkPlan plan) { |
100 | 104 | if (planInfo.metadata().size() == 0 |
101 | 105 | && (Config.get().isDataJobsParseSparkPlanEnabled() |
102 | 106 | || Config.get().isDataJobsExperimentalFeaturesEnabled())) { |
103 | 107 | Spark212PlanSerializer planUtils = new Spark212PlanSerializer(); |
104 | | - planInfo = |
105 | | - new SparkPlanInfo( |
106 | | - planInfo.nodeName(), |
107 | | - planInfo.simpleString(), |
108 | | - planInfo.children(), |
109 | | - JavaConverters.mapAsScalaMap(planUtils.extractFormattedProduct(plan)) |
110 | | - .toMap(Predef.$conforms()), |
111 | | - planInfo.metrics()); |
| 108 | + Map<String, String> meta = |
| 109 | + JavaConverters.mapAsScalaMap(planUtils.extractFormattedProduct(plan)) |
| 110 | + .toMap(Predef.$conforms()); |
| 111 | + try { |
| 112 | + Constructor<?> targetCtor = null; |
| 113 | + for (Constructor<?> c : SparkPlanInfo.class.getConstructors()) { |
| 114 | + if (c.getParameterCount() == 5) { |
| 115 | + targetCtor = c; |
| 116 | + break; |
| 117 | + } |
| 118 | + } |
| 119 | + if (targetCtor != null) { |
| 120 | + Object newInst = |
| 121 | + targetCtor.newInstance( |
| 122 | + planInfo.nodeName(), |
| 123 | + planInfo.simpleString(), |
| 124 | + planInfo.children(), |
| 125 | + meta, |
| 126 | + planInfo.metrics()); |
| 127 | + planInfo = (SparkPlanInfo) newInst; |
| 128 | + } |
| 129 | + } catch (Throwable ignored) { |
| 130 | + } |
112 | 131 | } |
113 | 132 | } |
114 | 133 | } |
|
0 commit comments