19
19
import java .util .Collections ;
20
20
import java .util .List ;
21
21
import java .util .function .Consumer ;
22
+ import java .util .function .Function ;
22
23
23
24
import brave .Span ;
24
25
import brave .Tracer ;
26
+ import brave .Tracing ;
25
27
import brave .http .HttpClientHandler ;
26
28
import brave .http .HttpTracing ;
27
29
import brave .propagation .Propagation ;
28
30
import brave .propagation .TraceContext ;
29
31
import org .apache .commons .logging .Log ;
30
32
import org .apache .commons .logging .LogFactory ;
33
+ import org .reactivestreams .Publisher ;
34
+ import org .reactivestreams .Subscription ;
35
+ import reactor .core .CoreSubscriber ;
31
36
import reactor .core .publisher .Mono ;
37
+ import reactor .util .annotation .Nullable ;
38
+ import reactor .util .context .Context ;
32
39
33
40
import org .springframework .beans .BeansException ;
34
41
import org .springframework .beans .factory .BeanFactory ;
35
42
import org .springframework .beans .factory .config .BeanPostProcessor ;
43
+ import org .springframework .cloud .sleuth .instrument .reactor .ReactorSleuth ;
44
+ import org .springframework .core .io .buffer .DataBuffer ;
36
45
import org .springframework .web .client .RestClientException ;
37
46
import org .springframework .web .reactive .function .client .ClientRequest ;
38
47
import org .springframework .web .reactive .function .client .ClientResponse ;
@@ -90,9 +99,6 @@ private Consumer<List<ExchangeFilterFunction>> addTraceExchangeFilterFunctionIfN
90
99
final class TraceExchangeFilterFunction implements ExchangeFilterFunction {
91
100
92
101
private static final Log log = LogFactory .getLog (TraceExchangeFilterFunction .class );
93
-
94
- private static final String CLIENT_SPAN_KEY = "sleuth.webclient.clientSpan" ;
95
-
96
102
static final Propagation .Setter <ClientRequest .Builder , String > SETTER = new Propagation .Setter <ClientRequest .Builder , String >() {
97
103
@ Override
98
104
public void put (ClientRequest .Builder carrier , String key , String value ) {
@@ -111,12 +117,14 @@ public String toString() {
111
117
}
112
118
};
113
119
114
- public static ExchangeFilterFunction create ( BeanFactory beanFactory ) {
115
- return new TraceExchangeFilterFunction ( beanFactory );
116
- }
120
+ private static final String CLIENT_SPAN_KEY = "sleuth.webclient.clientSpan" ;
121
+
122
+ private static final String CANCELLED_SUBSCRIPTION_ERROR = "CANCELLED" ;
117
123
118
124
final BeanFactory beanFactory ;
119
125
126
+ final Function <? super Publisher <DataBuffer >, ? extends Publisher <DataBuffer >> scopePassingTransformer ;
127
+
120
128
Tracer tracer ;
121
129
122
130
HttpTracing httpTracing ;
@@ -127,86 +135,27 @@ public static ExchangeFilterFunction create(BeanFactory beanFactory) {
127
135
128
136
TraceExchangeFilterFunction (BeanFactory beanFactory ) {
129
137
this .beanFactory = beanFactory ;
138
+ this .scopePassingTransformer = ReactorSleuth
139
+ .scopePassingSpanOperator (beanFactory );
140
+ }
141
+
142
+ public static ExchangeFilterFunction create (BeanFactory beanFactory ) {
143
+ return new TraceExchangeFilterFunction (beanFactory );
130
144
}
131
145
132
146
@ Override
133
147
public Mono <ClientResponse > filter (ClientRequest request , ExchangeFunction next ) {
134
- final ClientRequest .Builder builder = ClientRequest .from (request );
135
- Mono <ClientResponse > exchange = Mono .defer (() -> next .exchange (builder .build ()))
136
- .cast (Object .class ).onErrorResume (Mono ::just )
137
- .zipWith (Mono .subscriberContext ()).flatMap (anyAndContext -> {
138
- if (log .isDebugEnabled ()) {
139
- log .debug ("Wrapping the context [" + anyAndContext + "]" );
140
- }
141
- Object any = anyAndContext .getT1 ();
142
- Span clientSpan = anyAndContext .getT2 ().get (CLIENT_SPAN_KEY );
143
- Mono <ClientResponse > continuation ;
144
- final Tracer .SpanInScope ws = tracer ().withSpanInScope (clientSpan );
145
- if (any instanceof Throwable ) {
146
- continuation = Mono .error ((Throwable ) any );
147
- }
148
- else {
149
- continuation = Mono .just ((ClientResponse ) any );
150
- }
151
- return continuation
152
- .doAfterSuccessOrError ((clientResponse , throwable1 ) -> {
153
- Throwable throwable = throwable1 ;
154
- if (clientResponse == null
155
- || clientResponse .statusCode () == null ) {
156
- if (log .isDebugEnabled ()) {
157
- log .debug (
158
- "No response was returned. Will close the span ["
159
- + clientSpan + "]" );
160
- }
161
- handleReceive (clientSpan , ws , clientResponse ,
162
- throwable );
163
- return ;
164
- }
165
- boolean error = clientResponse .statusCode ()
166
- .is4xxClientError ()
167
- || clientResponse .statusCode ().is5xxServerError ();
168
- if (error ) {
169
- if (log .isDebugEnabled ()) {
170
- log .debug (
171
- "Non positive status code was returned from the call. Will close the span ["
172
- + clientSpan + "]" );
173
- }
174
- throwable = new RestClientException (
175
- "Status code of the response is ["
176
- + clientResponse .statusCode ().value ()
177
- + "] and the reason is ["
178
- + clientResponse .statusCode ()
179
- .getReasonPhrase ()
180
- + "]" );
181
- }
182
- handleReceive (clientSpan , ws , clientResponse , throwable );
183
- });
184
- }).subscriberContext (c -> {
185
- if (log .isDebugEnabled ()) {
186
- log .debug ("Instrumenting WebClient call" );
187
- }
188
- Span parent = c .getOrDefault (Span .class , null );
189
- Span clientSpan = handler ().handleSend (injector (), builder , request ,
190
- tracer ().nextSpan ());
191
- if (log .isDebugEnabled ()) {
192
- log .debug ("Handled send of " + clientSpan );
193
- }
194
- if (parent == null ) {
195
- c = c .put (Span .class , clientSpan );
196
- if (log .isDebugEnabled ()) {
197
- log .debug ("Reactor Context got injected with the client span "
198
- + clientSpan );
199
- }
200
- }
201
- return c .put (CLIENT_SPAN_KEY , clientSpan );
202
- });
203
- return exchange ;
204
- }
148
+ ClientRequest .Builder builder = ClientRequest .from (request );
149
+ if (log .isDebugEnabled ()) {
150
+ log .debug ("Instrumenting WebClient call" );
151
+ }
152
+ Span span = handler ().handleSend (injector (), builder , request ,
153
+ tracer ().nextSpan ());
154
+ if (log .isDebugEnabled ()) {
155
+ log .debug ("Handled send of " + span );
156
+ }
205
157
206
- private void handleReceive (Span clientSpan , Tracer .SpanInScope ws ,
207
- ClientResponse clientResponse , Throwable throwable ) {
208
- handler ().handleReceive (clientResponse , throwable , clientSpan );
209
- ws .close ();
158
+ return new MonoWebClientTrace (next , builder .build (), this , span );
210
159
}
211
160
212
161
@ SuppressWarnings ("unchecked" )
@@ -241,6 +190,190 @@ TraceContext.Injector<ClientRequest.Builder> injector() {
241
190
return this .injector ;
242
191
}
243
192
193
+ private static final class MonoWebClientTrace extends Mono <ClientResponse > {
194
+
195
+ final ExchangeFunction next ;
196
+
197
+ final ClientRequest request ;
198
+
199
+ final Tracer tracer ;
200
+
201
+ final HttpClientHandler <ClientRequest , ClientResponse > handler ;
202
+
203
+ final TraceContext .Injector <ClientRequest .Builder > injector ;
204
+
205
+ final Tracing tracing ;
206
+
207
+ final Function <? super Publisher <DataBuffer >, ? extends Publisher <DataBuffer >> scopePassingTransformer ;
208
+
209
+ private final Span span ;
210
+
211
+ MonoWebClientTrace (ExchangeFunction next , ClientRequest request ,
212
+ TraceExchangeFilterFunction parent , Span span ) {
213
+ this .next = next ;
214
+ this .request = request ;
215
+ this .tracer = parent .tracer ();
216
+ this .handler = parent .handler ();
217
+ this .injector = parent .injector ();
218
+ this .tracing = parent .httpTracing ().tracing ();
219
+ this .scopePassingTransformer = parent .scopePassingTransformer ;
220
+ this .span = span ;
221
+ }
222
+
223
+ @ Override
224
+ public void subscribe (CoreSubscriber <? super ClientResponse > subscriber ) {
225
+
226
+ Context context = subscriber .currentContext ();
227
+
228
+ this .next .exchange (request ).subscribe (
229
+ new WebClientTracerSubscriber (subscriber , context , span , this ));
230
+ }
231
+
232
+ static final class WebClientTracerSubscriber
233
+ implements CoreSubscriber <ClientResponse > {
234
+
235
+ final CoreSubscriber <? super ClientResponse > actual ;
236
+
237
+ final Context context ;
238
+
239
+ final Span span ;
240
+
241
+ final Tracer .SpanInScope ws ;
242
+
243
+ final HttpClientHandler <ClientRequest , ClientResponse > handler ;
244
+
245
+ final Function <? super Publisher <DataBuffer >, ? extends Publisher <DataBuffer >> scopePassingTransformer ;
246
+
247
+ final Tracing tracing ;
248
+
249
+ boolean done ;
250
+
251
+ WebClientTracerSubscriber (CoreSubscriber <? super ClientResponse > actual ,
252
+ Context context , Span span , MonoWebClientTrace parent ) {
253
+ this .actual = actual ;
254
+ this .span = span ;
255
+ this .handler = parent .handler ;
256
+ this .tracing = parent .tracing ;
257
+ this .scopePassingTransformer = parent .scopePassingTransformer ;
258
+
259
+ if (!context .hasKey (Span .class )) {
260
+ context = context .put (Span .class , span );
261
+ if (log .isDebugEnabled ()) {
262
+ log .debug ("Reactor Context got injected with the client span "
263
+ + span );
264
+ }
265
+ }
266
+
267
+ this .context = context .put (CLIENT_SPAN_KEY , span );
268
+ this .ws = parent .tracer .withSpanInScope (span );
269
+
270
+ }
271
+
272
+ @ Override
273
+ public void onSubscribe (Subscription subscription ) {
274
+ this .actual .onSubscribe (new Subscription () {
275
+ @ Override
276
+ public void request (long n ) {
277
+ subscription .request (n );
278
+ }
279
+
280
+ @ Override
281
+ public void cancel () {
282
+ terminateSpanOnCancel ();
283
+ subscription .cancel ();
284
+ }
285
+ });
286
+ }
287
+
288
+ @ Override
289
+ public void onNext (ClientResponse response ) {
290
+ this .done = true ;
291
+ try {
292
+ // decorate response body
293
+ this .actual
294
+ .onNext (ClientResponse .from (response )
295
+ .body (response .bodyToFlux (DataBuffer .class )
296
+ .transform (this .scopePassingTransformer ))
297
+ .build ());
298
+ }
299
+ finally {
300
+ terminateSpan (response , null );
301
+ }
302
+ }
303
+
304
+ @ Override
305
+ public void onError (Throwable t ) {
306
+ try {
307
+ this .actual .onError (t );
308
+ }
309
+ finally {
310
+ terminateSpan (null , t );
311
+ }
312
+ }
313
+
314
+ @ Override
315
+ public void onComplete () {
316
+ try {
317
+ this .actual .onComplete ();
318
+ }
319
+ finally {
320
+ if (!this .done ) {
321
+ terminateSpan (null , null );
322
+ }
323
+ }
324
+ }
325
+
326
+ @ Override
327
+ public Context currentContext () {
328
+ return this .context ;
329
+ }
330
+
331
+ void handleReceive (Span clientSpan , Tracer .SpanInScope ws ,
332
+ ClientResponse clientResponse , Throwable throwable ) {
333
+ this .handler .handleReceive (clientResponse , throwable , clientSpan );
334
+ ws .close ();
335
+ }
336
+
337
+ void terminateSpanOnCancel () {
338
+ if (log .isDebugEnabled ()) {
339
+ log .debug ("Subscription was cancelled. Will close the span ["
340
+ + this .span + "]" );
341
+ }
342
+
343
+ this .span .tag ("error" , CANCELLED_SUBSCRIPTION_ERROR );
344
+ handleReceive (this .span , this .ws , null , null );
345
+ }
346
+
347
+ void terminateSpan (@ Nullable ClientResponse clientResponse ,
348
+ @ Nullable Throwable throwable ) {
349
+ if (clientResponse == null || clientResponse .statusCode () == null ) {
350
+ if (log .isDebugEnabled ()) {
351
+ log .debug ("No response was returned. Will close the span ["
352
+ + this .span + "]" );
353
+ }
354
+ handleReceive (this .span , this .ws , clientResponse , throwable );
355
+ return ;
356
+ }
357
+ boolean error = clientResponse .statusCode ().is4xxClientError ()
358
+ || clientResponse .statusCode ().is5xxServerError ();
359
+ if (error ) {
360
+ if (log .isDebugEnabled ()) {
361
+ log .debug (
362
+ "Non positive status code was returned from the call. Will close the span ["
363
+ + this .span + "]" );
364
+ }
365
+ throwable = new RestClientException ("Status code of the response is ["
366
+ + clientResponse .statusCode ().value ()
367
+ + "] and the reason is ["
368
+ + clientResponse .statusCode ().getReasonPhrase () + "]" );
369
+ }
370
+ handleReceive (this .span , this .ws , clientResponse , throwable );
371
+ }
372
+
373
+ }
374
+
375
+ }
376
+
244
377
static final class HttpAdapter
245
378
extends brave .http .HttpClientAdapter <ClientRequest , ClientResponse > {
246
379
0 commit comments