Skip to content

Commit 52438a5

Browse files
committed
set OL if detected
Signed-off-by: Maciej Obuchowski <[email protected]> add OL instrumentation class Signed-off-by: Maciej Obuchowski <[email protected]> take care of sparkconf/ol lifecycle Signed-off-by: Maciej Obuchowski <[email protected]>
1 parent 51813bd commit 52438a5

File tree

3 files changed

+211
-2
lines changed

3 files changed

+211
-2
lines changed

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java

Lines changed: 98 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.Optional;
3737
import java.util.Properties;
3838
import java.util.UUID;
39+
import java.util.function.Consumer;
3940
import org.apache.spark.ExceptionFailure;
4041
import org.apache.spark.SparkConf;
4142
import org.apache.spark.TaskFailedReason;
@@ -51,6 +52,7 @@
5152
import org.apache.spark.sql.streaming.StateOperatorProgress;
5253
import org.apache.spark.sql.streaming.StreamingQueryListener;
5354
import org.apache.spark.sql.streaming.StreamingQueryProgress;
55+
import org.apache.spark.util.Utils;
5456
import org.slf4j.Logger;
5557
import org.slf4j.LoggerFactory;
5658
import scala.Tuple2;
@@ -68,12 +70,16 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
6870
private static final Logger log = LoggerFactory.getLogger(AbstractDatadogSparkListener.class);
6971
private static final ObjectMapper objectMapper = new ObjectMapper();
7072
public static volatile AbstractDatadogSparkListener listener = null;
73+
public static volatile SparkListenerInterface openLineageSparkListener = null;
74+
public static volatile SparkConf openLineageSparkConf = null;
75+
7176
public static volatile boolean finishTraceOnApplicationEnd = true;
7277
public static volatile boolean isPysparkShell = false;
7378

7479
private final int MAX_COLLECTION_SIZE = 5000;
7580
private final int MAX_ACCUMULATOR_SIZE = 50000;
7681
private final String RUNTIME_TAGS_PREFIX = "spark.datadog.tags.";
82+
private static final String AGENT_OL_ENDPOINT = "openlineage/api/v1/lineage";
7783

7884
private final SparkConf sparkConf;
7985
private final String sparkVersion;
@@ -109,6 +115,7 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
109115
private final Map<Long, SparkSQLUtils.AccumulatorWithStage> accumulators =
110116
new RemoveEldestHashMap<>(MAX_ACCUMULATOR_SIZE);
111117

118+
private volatile boolean isStreamingJob = false;
112119
private final boolean isRunningOnDatabricks;
113120
private final String databricksClusterName;
114121
private final String databricksServiceName;
@@ -151,8 +158,62 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp
151158
finishApplication(System.currentTimeMillis(), null, 0, null);
152159
}
153160
}));
161+
}
162+
163+
static void setupSparkConf(SparkConf sparkConf) {
164+
sparkConf.set("spark.openlineage.transport.type", "composite");
165+
sparkConf.set("spark.openlineage.transport.continueOnFailure", "true");
166+
sparkConf.set("spark.openlineage.transport.transports.agent.type", "http");
167+
sparkConf.set("spark.openlineage.transport.transports.agent.url", getAgentHttpUrl());
168+
sparkConf.set("spark.openlineage.transport.transports.agent.endpoint", AGENT_OL_ENDPOINT);
169+
sparkConf.set("spark.openlineage.transport.transports.agent.compression", "gzip");
170+
sparkConf.set(
171+
"spark.openlineage.run.tags",
172+
"_dd.trace_id:"
173+
+ listener.applicationSpan.context().getTraceId().toString()
174+
+ ";_dd.ol_intake.emit_spans:false");
175+
for (Tuple2<String, String> tuple : sparkConf.getAll()) {
176+
log.error("Set Spark conf: Key: " + tuple._1 + ", Value: " + tuple._2);
177+
}
178+
}
179+
180+
public void setupOpenLineage() {
181+
log.error("Setting up OpenLineage-Datadog integration");
182+
if (openLineageSparkListener != null) {
183+
log.error("No init needed");
184+
setupSparkConf(openLineageSparkConf);
185+
return;
186+
}
187+
188+
String className = "io.openlineage.spark.agent.OpenLineageSparkListener";
189+
Class clazz;
190+
try {
191+
try {
192+
clazz = Class.forName(className, true, Thread.currentThread().getContextClassLoader());
193+
} catch (ClassNotFoundException e) {
194+
clazz = Class.forName(className, true, Utils.class.getClassLoader());
195+
}
196+
} catch (ClassNotFoundException e) {
197+
log.info("OpenLineage integration is not present on the classpath");
198+
return;
199+
}
200+
201+
openLineageSparkConf = sparkConf;
202+
if (clazz == null) {
203+
log.info("OpenLineage integration is not present on the classpath: class is null");
204+
return;
205+
}
206+
try {
207+
setupSparkConf(openLineageSparkConf);
208+
openLineageSparkListener =
209+
(SparkListenerInterface)
210+
clazz.getConstructor(SparkConf.class).newInstance(openLineageSparkConf);
154211

155-
log.info("Created datadog spark listener: {}", this.getClass().getSimpleName());
212+
log.info(
213+
"Created OL spark listener: {}", openLineageSparkListener.getClass().getSimpleName());
214+
} catch (Exception e) {
215+
log.warn("Failed to instantiate OL Spark Listener: {}", e.toString());
216+
}
156217
}
157218

158219
/** Resource name of the spark job. Provide an implementation based on a specific scala version */
@@ -183,6 +244,8 @@ private void initApplicationSpanIfNotInitialized() {
183244
return;
184245
}
185246

247+
log.error("Starting tracer application span.");
248+
186249
AgentTracer.SpanBuilder builder = buildSparkSpan("spark.application", null);
187250

188251
if (applicationStart != null) {
@@ -202,6 +265,9 @@ private void initApplicationSpanIfNotInitialized() {
202265
applicationSpan = builder.start();
203266
setDataJobsSamplingPriority(applicationSpan);
204267
applicationSpan.setMeasured(true);
268+
// We need to set it up after we create application span to have correlation.
269+
setupOpenLineage();
270+
notifyOl(x -> openLineageSparkListener.onApplicationStart(x), applicationStart);
205271
}
206272

207273
private void captureOpenlineageContextIfPresent(AgentTracer.SpanBuilder builder) {
@@ -233,6 +299,7 @@ public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
233299
log.info(
234300
"Received spark application end event, finish trace on this event: {}",
235301
finishTraceOnApplicationEnd);
302+
notifyOl(x -> openLineageSparkListener.onApplicationEnd(x), applicationEnd);
236303

237304
if (finishTraceOnApplicationEnd) {
238305
finishApplication(applicationEnd.time(), null, 0, null);
@@ -405,6 +472,7 @@ public synchronized void onJobStart(SparkListenerJobStart jobStart) {
405472
if (sqlSpan != null) {
406473
jobSpanBuilder.asChildOf(sqlSpan.context());
407474
} else if (batchKey != null) {
475+
isStreamingJob = true;
408476
AgentSpan batchSpan =
409477
getOrCreateStreamingBatchSpan(batchKey, jobStart.time(), jobStart.properties());
410478
jobSpanBuilder.asChildOf(batchSpan.context());
@@ -426,6 +494,7 @@ public synchronized void onJobStart(SparkListenerJobStart jobStart) {
426494
stageToJob.put(stageId, jobStart.jobId());
427495
}
428496
jobSpans.put(jobStart.jobId(), jobSpan);
497+
notifyOl(x -> openLineageSparkListener.onJobStart(x), jobStart);
429498
}
430499

431500
@Override
@@ -456,6 +525,7 @@ public synchronized void onJobEnd(SparkListenerJobEnd jobEnd) {
456525
if (metrics != null) {
457526
metrics.setSpanMetrics(jobSpan);
458527
}
528+
notifyOl(x -> openLineageSparkListener.onJobEnd(x), jobEnd);
459529

460530
jobSpan.finish(jobEnd.time() * 1000);
461531
}
@@ -624,6 +694,8 @@ public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
624694

625695
Properties props = stageProperties.get(stageSpanKey);
626696
sendTaskSpan(stageSpan, taskEnd, props);
697+
698+
notifyOl(x -> openLineageSparkListener.onTaskEnd(x), taskEnd);
627699
}
628700

629701
private void sendTaskSpan(
@@ -705,6 +777,20 @@ public void onOtherEvent(SparkListenerEvent event) {
705777
updateAdaptiveSQLPlan(event);
706778
}
707779

780+
private <T extends SparkListenerEvent> void notifyOl(Consumer<T> ol, T event) {
781+
if (isRunningOnDatabricks || isStreamingJob) {
782+
log.error("Not emitting event when running on databricks or on streaming jobs");
783+
return;
784+
}
785+
initApplicationSpanIfNotInitialized();
786+
if (openLineageSparkListener != null) {
787+
log.error("Notifying with event `{}`", event.getClass().getCanonicalName());
788+
ol.accept(event);
789+
} else {
790+
log.error("OpenLineageSparkListener is null");
791+
}
792+
}
793+
708794
private static final Class<?> adaptiveExecutionUpdateClass;
709795
private static final MethodHandle adaptiveExecutionIdMethod;
710796
private static final MethodHandle adaptiveSparkPlanMethod;
@@ -753,6 +839,7 @@ private synchronized void updateAdaptiveSQLPlan(SparkListenerEvent event) {
753839
private synchronized void onSQLExecutionStart(SparkListenerSQLExecutionStart sqlStart) {
754840
sqlPlans.put(sqlStart.executionId(), sqlStart.sparkPlanInfo());
755841
sqlQueries.put(sqlStart.executionId(), sqlStart);
842+
notifyOl(x -> openLineageSparkListener.onOtherEvent(x), sqlStart);
756843
}
757844

758845
private synchronized void onSQLExecutionEnd(SparkListenerSQLExecutionEnd sqlEnd) {
@@ -765,6 +852,7 @@ private synchronized void onSQLExecutionEnd(SparkListenerSQLExecutionEnd sqlEnd)
765852
if (metrics != null) {
766853
metrics.setSpanMetrics(span);
767854
}
855+
notifyOl(x -> openLineageSparkListener.onOtherEvent(x), sqlEnd);
768856

769857
span.finish(sqlEnd.time() * 1000);
770858
}
@@ -1260,6 +1348,15 @@ private static String getDatabricksRunName(SparkConf conf) {
12601348
return null;
12611349
}
12621350

1351+
private static String getAgentHttpUrl() {
1352+
StringBuilder sb =
1353+
new StringBuilder("http://")
1354+
.append(Config.get().getAgentHost())
1355+
.append(":")
1356+
.append(Config.get().getAgentPort());
1357+
return sb.toString();
1358+
}
1359+
12631360
@SuppressForbidden // called at most once per spark application
12641361
private static String removeUuidFromEndOfString(String input) {
12651362
return input.replaceAll(

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,11 @@
88

99
import datadog.trace.agent.tooling.Instrumenter;
1010
import datadog.trace.agent.tooling.InstrumenterModule;
11+
import datadog.trace.api.Config;
1112
import net.bytebuddy.asm.Advice;
1213
import org.apache.spark.deploy.SparkSubmitArguments;
14+
import org.apache.spark.scheduler.SparkListenerInterface;
15+
import org.slf4j.LoggerFactory;
1316

1417
public abstract class AbstractSparkInstrumentation extends InstrumenterModule.Tracing
1518
implements Instrumenter.ForKnownTypes, Instrumenter.HasMethodAdvice {
@@ -28,7 +31,9 @@ public String[] knownMatchingTypes() {
2831
return new String[] {
2932
"org.apache.spark.SparkContext",
3033
"org.apache.spark.deploy.SparkSubmit",
31-
"org.apache.spark.deploy.yarn.ApplicationMaster"
34+
"org.apache.spark.deploy.yarn.ApplicationMaster",
35+
"org.apache.spark.util.SparkClassUtils",
36+
"org.apache.spark.scheduler.LiveListenerBus"
3237
};
3338
}
3439

@@ -55,6 +60,15 @@ public void methodAdvice(MethodTransformer transformer) {
5560
.and(named("finish"))
5661
.and(isDeclaredBy(named("org.apache.spark.deploy.yarn.ApplicationMaster"))),
5762
AbstractSparkInstrumentation.class.getName() + "$YarnFinishAdvice");
63+
64+
// LiveListenerBus class is used when running in a YARN cluster
65+
transformer.applyAdvice(
66+
isMethod()
67+
.and(named("addToSharedQueue"))
68+
// .and(takesArgument(0,
69+
// named("org.apache.spark.scheduler.SparkListenerInterface")))
70+
.and(isDeclaredBy(named("org.apache.spark.scheduler.LiveListenerBus"))),
71+
AbstractSparkInstrumentation.class.getName() + "$LiveListenerBusAdvice");
5872
}
5973

6074
public static class PrepareSubmitEnvAdvice {
@@ -100,4 +114,22 @@ public static void enter(@Advice.Argument(1) int exitCode, @Advice.Argument(2) S
100114
}
101115
}
102116
}
117+
118+
public static class LiveListenerBusAdvice {
119+
@Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Advice.OnNonDefaultValue.class)
120+
public static boolean enter(@Advice.Argument(0) SparkListenerInterface listener) {
121+
if (listener == null || listener.getClass().getCanonicalName() == null) {
122+
return false;
123+
}
124+
if (listener
125+
.getClass()
126+
.getCanonicalName()
127+
.equals("io.openlineage.spark.agent.OpenLineageSparkListener")) {
128+
LoggerFactory.getLogger(Config.class)
129+
.debug("Detected OL listener, skipping initialization");
130+
return true;
131+
}
132+
return false;
133+
}
134+
}
103135
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package datadog.trace.instrumentation.spark;
2+
3+
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
4+
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
5+
import static net.bytebuddy.matcher.ElementMatchers.isDeclaredBy;
6+
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
7+
8+
import com.google.auto.service.AutoService;
9+
import datadog.trace.agent.tooling.Instrumenter;
10+
import datadog.trace.agent.tooling.InstrumenterModule;
11+
import datadog.trace.api.Config;
12+
import java.lang.reflect.Field;
13+
import net.bytebuddy.asm.Advice;
14+
import org.apache.spark.SparkConf;
15+
import org.apache.spark.scheduler.SparkListenerInterface;
16+
import org.slf4j.LoggerFactory;
17+
18+
@AutoService(InstrumenterModule.class)
19+
public class OpenLineageInstrumentation extends InstrumenterModule.Tracing
20+
implements Instrumenter.ForKnownTypes, Instrumenter.HasMethodAdvice {
21+
22+
public OpenLineageInstrumentation() {
23+
super("openlineage-spark");
24+
}
25+
26+
@Override
27+
public String[] helperClassNames() {
28+
return new String[] {
29+
packageName + ".AbstractDatadogSparkListener",
30+
packageName + ".DatabricksParentContext",
31+
packageName + ".OpenlineageParentContext",
32+
packageName + ".RemoveEldestHashMap",
33+
packageName + ".SparkAggregatedTaskMetrics",
34+
packageName + ".SparkConfAllowList",
35+
packageName + ".SparkSQLUtils",
36+
packageName + ".SparkSQLUtils$SparkPlanInfoForStage",
37+
packageName + ".SparkSQLUtils$AccumulatorWithStage",
38+
};
39+
}
40+
41+
@Override
42+
public boolean defaultEnabled() {
43+
return true;
44+
}
45+
46+
@Override
47+
public String[] knownMatchingTypes() {
48+
return new String[] {
49+
"io.openlineage.spark.agent.OpenLineageSparkListener", "org.apache.spark.util.Utils"
50+
};
51+
}
52+
53+
@Override
54+
public void methodAdvice(MethodTransformer transformer) {
55+
// LiveListenerBus class is used when running in a YARN cluster
56+
transformer.applyAdvice(
57+
isConstructor()
58+
.and(isDeclaredBy(named("io.openlineage.spark.agent.OpenLineageSparkListener")))
59+
.and(takesArgument(0, named("org.apache.spark.SparkConf"))),
60+
OpenLineageInstrumentation.class.getName() + "$OpenLineageSparkListenerAdvice");
61+
}
62+
63+
public static class OpenLineageSparkListenerAdvice {
64+
@Advice.OnMethodExit(suppress = Throwable.class)
65+
public static void exit(@Advice.This Object self) throws IllegalAccessException {
66+
LoggerFactory.getLogger(Config.class).debug("Checking for OpenLineageSparkListener");
67+
try {
68+
Field conf = self.getClass().getDeclaredField("conf");
69+
conf.setAccessible(true);
70+
AbstractDatadogSparkListener.openLineageSparkConf = (SparkConf) conf.get(self);
71+
AbstractDatadogSparkListener.openLineageSparkListener = (SparkListenerInterface) self;
72+
LoggerFactory.getLogger(Config.class)
73+
.debug("Detected OpenLineageSparkListener, passed to DatadogSparkListener");
74+
} catch (NoSuchFieldException | IllegalAccessException e) {
75+
LoggerFactory.getLogger(Config.class)
76+
.debug("Failed to pass OpenLineageSparkListener to DatadogSparkListener", e);
77+
}
78+
}
79+
}
80+
}

0 commit comments

Comments
 (0)