@@ -52,7 +52,6 @@
 import org.apache.spark.sql.streaming.StateOperatorProgress;
 import org.apache.spark.sql.streaming.StreamingQueryListener;
 import org.apache.spark.sql.streaming.StreamingQueryProgress;
-import org.apache.spark.util.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Tuple2;
@@ -87,6 +86,10 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
 
   private final AgentTracer.TracerAPI tracer;
 
+  // Created by the constructor; used as the parent context when we are not in
+  // another known parent context such as Databricks or OpenLineage
+  private final PredeterminedTraceIdParentContext predeterminedTraceIdParentContext;
+
   private AgentSpan applicationSpan;
   private SparkListenerApplicationStart applicationStart;
   private final HashMap<String, AgentSpan> streamingBatchSpans = new HashMap<>();
@@ -142,6 +145,9 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp
     databricksClusterName = sparkConf.get("spark.databricks.clusterUsageTags.clusterName", null);
     databricksServiceName = getDatabricksServiceName(sparkConf, databricksClusterName);
     sparkServiceName = getSparkServiceName(sparkConf, isRunningOnDatabricks);
+    predeterminedTraceIdParentContext =
+        new PredeterminedTraceIdParentContext(
+            Config.get().getIdGenerationStrategy().generateTraceId());
 
     // If JVM exiting with System.exit(code), it bypass the code closing the application span
     //
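The PR does not show PredeterminedTraceIdParentContext itself, but the field comment and this constructor call pin down its role: a parent context carrying nothing except a trace id generated once, up front, so that the application span and the OpenLineage run tags can agree on a trace even when no Databricks or OpenLineage parent exists. A minimal stand-in sketch under that assumption (the class name, the plain long ids, and the withRandomTraceId() factory below are illustrative, not the PR's types):

import java.util.concurrent.ThreadLocalRandom;

// Hypothetical stand-in: the real class implements the tracer's parent-context
// interface and holds a DDTraceId; this sketch reduces it to a plain long.
final class FixedTraceIdParentContext {
  private final long traceId; // generated once in the listener's constructor

  FixedTraceIdParentContext(long traceId) {
    this.traceId = traceId;
  }

  long getTraceId() {
    return traceId; // every caller sees the same, predetermined trace id
  }

  // Stand-in for Config.get().getIdGenerationStrategy().generateTraceId()
  static FixedTraceIdParentContext withRandomTraceId() {
    return new FixedTraceIdParentContext(
        ThreadLocalRandom.current().nextLong(1, Long.MAX_VALUE));
  }
}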
@@ -160,60 +166,23 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp
             }));
   }
 
-  static void setupSparkConf(SparkConf sparkConf) {
-    sparkConf.set("spark.openlineage.transport.type", "composite");
-    sparkConf.set("spark.openlineage.transport.continueOnFailure", "true");
-    sparkConf.set("spark.openlineage.transport.transports.agent.type", "http");
-    sparkConf.set("spark.openlineage.transport.transports.agent.url", getAgentHttpUrl());
-    sparkConf.set("spark.openlineage.transport.transports.agent.endpoint", AGENT_OL_ENDPOINT);
-    sparkConf.set("spark.openlineage.transport.transports.agent.compression", "gzip");
-    sparkConf.set(
-        "spark.openlineage.run.tags",
-        "_dd.trace_id:"
-            + listener.applicationSpan.context().getTraceId().toString()
-            + ";_dd.ol_intake.emit_spans:false");
-    for (Tuple2<String, String> tuple : sparkConf.getAll()) {
-      log.error("Set Spark conf: Key: " + tuple._1 + ", Value: " + tuple._2);
-    }
-  }
-
-  public void setupOpenLineage() {
-    log.error("Setting up OpenLineage-Datadog integration");
+  public void setupOpenLineage(DDTraceId traceId) {
+    log.error("Setting up OpenLineage tags");
     if (openLineageSparkListener != null) {
-      log.error("No init needed");
-      setupSparkConf(openLineageSparkConf);
+      openLineageSparkConf.set("spark.openlineage.transport.type", "composite");
+      openLineageSparkConf.set("spark.openlineage.transport.continueOnFailure", "true");
+      openLineageSparkConf.set("spark.openlineage.transport.transports.agent.type", "http");
+      openLineageSparkConf.set(
+          "spark.openlineage.transport.transports.agent.url", getAgentHttpUrl());
+      openLineageSparkConf.set(
+          "spark.openlineage.transport.transports.agent.endpoint", AGENT_OL_ENDPOINT);
+      openLineageSparkConf.set("spark.openlineage.transport.transports.agent.compression", "gzip");
+      openLineageSparkConf.set(
+          "spark.openlineage.run.tags",
+          "_dd.trace_id:" + traceId.toString() + ";_dd.ol_intake.emit_spans:false");
       return;
     }
-
-    String className = "io.openlineage.spark.agent.OpenLineageSparkListener";
-    Class clazz;
-    try {
-      try {
-        clazz = Class.forName(className, true, Thread.currentThread().getContextClassLoader());
-      } catch (ClassNotFoundException e) {
-        clazz = Class.forName(className, true, Utils.class.getClassLoader());
-      }
-    } catch (ClassNotFoundException e) {
-      log.info("OpenLineage integration is not present on the classpath");
-      return;
-    }
-
-    openLineageSparkConf = sparkConf;
-    if (clazz == null) {
-      log.info("OpenLineage integration is not present on the classpath: class is null");
-      return;
-    }
-    try {
-      setupSparkConf(openLineageSparkConf);
-      openLineageSparkListener =
-          (SparkListenerInterface)
-              clazz.getConstructor(SparkConf.class).newInstance(openLineageSparkConf);
-
-      log.info(
-          "Created OL spark listener: {}", openLineageSparkListener.getClass().getSimpleName());
-    } catch (Exception e) {
-      log.warn("Failed to instantiate OL Spark Listener: {}", e.toString());
-    }
+    log.error("No OpenLineageSparkListener!");
   }
 
   /** Resource name of the spark job. Provide an implementation based on a specific scala version */
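For reference, the conf keys above produce an OpenLineage composite transport that posts gzip-compressed events to the agent and tags every run with the Datadog trace id. A runnable sketch applying the same keys to a bare SparkConf (the URL, endpoint path, and trace id are placeholder values; the real code derives them from getAgentHttpUrl(), AGENT_OL_ENDPOINT, and the resolved trace id):

import org.apache.spark.SparkConf;
import scala.Tuple2;

public class OpenLineageConfDemo {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf();
    conf.set("spark.openlineage.transport.type", "composite");
    conf.set("spark.openlineage.transport.continueOnFailure", "true");
    conf.set("spark.openlineage.transport.transports.agent.type", "http");
    conf.set("spark.openlineage.transport.transports.agent.url", "http://localhost:8126");
    conf.set("spark.openlineage.transport.transports.agent.endpoint", "/placeholder/endpoint");
    conf.set("spark.openlineage.transport.transports.agent.compression", "gzip");
    // Correlation tags: events carry the Datadog trace id, and span emission from
    // the OL intake is disabled because this listener emits its own spans.
    conf.set("spark.openlineage.run.tags", "_dd.trace_id:1234567890;_dd.ol_intake.emit_spans:false");

    for (Tuple2<String, String> kv : conf.getAll()) {
      System.out.println(kv._1() + " = " + kv._2());
    }
  }
}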
@@ -237,6 +206,12 @@ public void setupOpenLineage() {
   @Override
   public synchronized void onApplicationStart(SparkListenerApplicationStart applicationStart) {
     this.applicationStart = applicationStart;
+
+    setupOpenLineage(
+        OpenlineageParentContext.from(sparkConf)
+            .map(context -> context.getTraceId())
+            .orElse(predeterminedTraceIdParentContext.getTraceId()));
+    notifyOl(x -> openLineageSparkListener.onApplicationStart(x), applicationStart);
   }
 
   private void initApplicationSpanIfNotInitialized() {
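The argument to setupOpenLineage encodes a precedence rule: use the trace id from an upstream OpenLineage parent run when the SparkConf carries one, otherwise fall back to the id generated in the constructor. The same rule isolated as a sketch (types reduced to String; the real code passes DDTraceId):

import java.util.Optional;

final class TraceIdResolution {
  // Prefer the trace id propagated by an OpenLineage parent run; otherwise use
  // the predetermined id so a stable trace exists even without an upstream parent.
  static String resolve(Optional<String> parentTraceId, String predeterminedTraceId) {
    return parentTraceId.orElse(predeterminedTraceId);
  }

  public static void main(String[] args) {
    System.out.println(resolve(Optional.of("abc123"), "fallback")); // abc123
    System.out.println(resolve(Optional.empty(), "fallback"));      // fallback
  }
}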
@@ -260,38 +235,36 @@ private void initApplicationSpanIfNotInitialized() {
     }
 
     captureApplicationParameters(builder);
-    captureOpenlineageContextIfPresent(builder);
+
+    Optional<OpenlineageParentContext> openlineageParentContext =
+        OpenlineageParentContext.from(sparkConf);
+    // At this point we know we are not in a Databricks context
+    if (openlineageParentContext.isPresent()) {
+      captureOpenlineageContextIfPresent(builder, openlineageParentContext.get());
+    } else {
+      builder.asChildOf(predeterminedTraceIdParentContext);
+    }
 
     applicationSpan = builder.start();
     setDataJobsSamplingPriority(applicationSpan);
     applicationSpan.setMeasured(true);
-    // We need to set it up after we create application span to have correlation.
-    setupOpenLineage();
-    notifyOl(x -> openLineageSparkListener.onApplicationStart(x), applicationStart);
   }
 
-  private void captureOpenlineageContextIfPresent(AgentTracer.SpanBuilder builder) {
-    Optional<OpenlineageParentContext> openlineageParentContext =
-        OpenlineageParentContext.from(sparkConf);
+  private void captureOpenlineageContextIfPresent(
+      AgentTracer.SpanBuilder builder, OpenlineageParentContext context) {
+    builder.asChildOf(context);
 
-    if (openlineageParentContext.isPresent()) {
-      OpenlineageParentContext context = openlineageParentContext.get();
-      builder.asChildOf(context);
+    builder.withSpanId(context.getChildRootSpanId());
 
-      builder.withSpanId(context.getChildRootSpanId());
+    log.debug(
+        "Captured Openlineage context: {}, with child trace_id: {}, child root span id: {}",
+        context,
+        context.getTraceId(),
+        context.getChildRootSpanId());
 
-      log.debug(
-          "Captured Openlineage context: {}, with child trace_id: {}, child root span id: {}",
-          context,
-          context.getTraceId(),
-          context.getChildRootSpanId());
-
-      builder.withTag("openlineage_parent_job_namespace", context.getParentJobNamespace());
-      builder.withTag("openlineage_parent_job_name", context.getParentJobName());
-      builder.withTag("openlineage_parent_run_id", context.getParentRunId());
-    } else {
-      log.debug("Openlineage context not found");
-    }
+    builder.withTag("openlineage_parent_job_namespace", context.getParentJobNamespace());
+    builder.withTag("openlineage_parent_job_name", context.getParentJobName());
+    builder.withTag("openlineage_parent_run_id", context.getParentRunId());
   }
 
   @Override
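Assigning withSpanId(context.getChildRootSpanId()) gives the application span an id that the parent side can compute independently, so OpenLineage events and Datadog spans line up without extra coordination. How getChildRootSpanId() derives that id is not shown in this diff; the sketch below is just one common way to produce a deterministic, non-zero 64-bit id from a parent run id, purely for illustration:

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class DeterministicIds {
  // Illustrative only (not the PR's algorithm): hash the parent run id, take the
  // first 8 bytes, clear the sign bit, and avoid zero (span ids must be non-zero).
  static long childRootSpanId(String parentRunId) {
    try {
      byte[] digest =
          MessageDigest.getInstance("SHA-256")
              .digest(parentRunId.getBytes(StandardCharsets.UTF_8));
      long id = 0;
      for (int i = 0; i < 8; i++) {
        id = (id << 8) | (digest[i] & 0xffL);
      }
      id &= Long.MAX_VALUE;
      return id == 0 ? 1 : id;
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("SHA-256 is mandated by the JVM spec", e);
    }
  }
}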
@@ -779,15 +752,15 @@ public void onOtherEvent(SparkListenerEvent event) {
 
   private <T extends SparkListenerEvent> void notifyOl(Consumer<T> ol, T event) {
     if (isRunningOnDatabricks || isStreamingJob) {
-      log.error("Not emitting event when running on databricks or on streaming jobs");
+      log.debug("Not emitting event when running on databricks or on streaming jobs");
       return;
     }
-    initApplicationSpanIfNotInitialized();
     if (openLineageSparkListener != null) {
-      log.error("Notifying with event `{}`", event.getClass().getCanonicalName());
+      log.debug(
+          "Passing event `{}` to OpenLineageSparkListener", event.getClass().getCanonicalName());
       ol.accept(event);
     } else {
-      log.error("OpenLineageSparkListener is null");
+      log.trace("OpenLineageSparkListener is null");
     }
   }
 
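notifyOl centralizes the null check for the optional OpenLineage listener: each call site passes a Consumer that captures the delegate method, and notifyOl drops the event when no delegate exists. The same shape in a self-contained sketch (LineageListener stands in for SparkListenerInterface):

import java.util.function.Consumer;

public class NotifyPatternDemo {
  interface LineageListener {
    void onEvent(String event);
  }

  private final LineageListener lineageListener; // null when the integration is absent

  NotifyPatternDemo(LineageListener lineageListener) {
    this.lineageListener = lineageListener;
  }

  private <T> void notifyLineage(Consumer<T> target, T event) {
    if (lineageListener != null) {
      target.accept(event); // the call-site lambda captures the delegate method
    } // else: integration absent, silently drop the event
  }

  void onSomething(String event) {
    // The call site picks the delegate method; notifyLineage owns the null check.
    notifyLineage(e -> lineageListener.onEvent(e), event);
  }

  public static void main(String[] args) {
    new NotifyPatternDemo(e -> System.out.println("got " + e)).onSomething("app-start");
    new NotifyPatternDemo(null).onSomething("dropped"); // no delegate: event ignored
  }
}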