From 2f952283e8ad641b96f85f8786769fe19b856d9d Mon Sep 17 00:00:00 2001 From: David Dunn Date: Tue, 19 Feb 2019 14:16:20 -0500 Subject: [PATCH 1/4] Add support for tracing-aware Guava FutureCallbacks. --- .../java/com/palantir/tracing/Tracers.java | 82 +++++ .../com/palantir/tracing/TracersTest.java | 293 ++++++++++++++++++ 2 files changed, 375 insertions(+) diff --git a/tracing/src/main/java/com/palantir/tracing/Tracers.java b/tracing/src/main/java/com/palantir/tracing/Tracers.java index 93e344af5..4503b537e 100644 --- a/tracing/src/main/java/com/palantir/tracing/Tracers.java +++ b/tracing/src/main/java/com/palantir/tracing/Tracers.java @@ -16,11 +16,13 @@ package com.palantir.tracing; +import com.google.common.util.concurrent.FutureCallback; import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadLocalRandom; +import org.checkerframework.checker.nullness.compatqual.NullableDecl; /** Utility methods for making {@link ExecutorService} and {@link Runnable} instances tracing-aware. */ public final class Tracers { @@ -104,6 +106,11 @@ public static Runnable wrap(Runnable delegate) { return new TracingAwareRunnable(delegate); } + /** Like {@link #wrap(Callable)}, but for Guava's FutureCallback. */ + public static FutureCallback wrap(FutureCallback delegate) { + return new TracingAwareFutureCallback<>(delegate); + } + /** * Like {@link #wrapWithNewTrace(String, ExecutorService)}, but with a default initial span operation. */ @@ -206,6 +213,50 @@ public static Runnable wrapWithNewTrace(String operation, Runnable delegate) { }; } + /** + * Like {@link #wrapWithNewTrace(Callable)}, but for Guava's FutureCallback. + */ + public static FutureCallback wrapWithNewTrace(FutureCallback delegate) { + return wrapWithNewTrace(ROOT_SPAN_OPERATION, delegate); + } + + /** + * Like {@link #wrapWithNewTrace(String, Callable)}, but for Guava's FutureCallback. + */ + public static FutureCallback wrapWithNewTrace(String operation, FutureCallback delegate) { + return new FutureCallback() { + @Override + public void onSuccess(@NullableDecl V result) { + // clear the existing trace and keep it around for restoration when we're done + Optional originalTrace = Tracer.getAndClearTraceIfPresent(); + + try { + Tracer.initTrace(Optional.empty(), Tracers.randomId()); + Tracer.startSpan(operation); + delegate.onSuccess(result); + } finally { + Tracer.fastCompleteSpan(); + restoreTrace(originalTrace); + } + } + + @Override + public void onFailure(Throwable throwable) { + // clear the existing trace and keep it around for restoration when we're done + Optional originalTrace = Tracer.getAndClearTraceIfPresent(); + + try { + Tracer.initTrace(Optional.empty(), Tracers.randomId()); + Tracer.startSpan(operation); + delegate.onFailure(throwable); + } finally { + Tracer.fastCompleteSpan(); + restoreTrace(originalTrace); + } + } + }; + } + /** * Like {@link #wrapWithAlternateTraceId(String, String, Runnable)}, but with a default initial span operation. */ @@ -297,6 +348,37 @@ public void run() { } } + /** + * Wrap a given guava future callback such that its execution operated with the {@link Trace thread-local Trace} of + * the thread the constructs the {@link TracingAwareFutureCallback} instance rather than the thread that executes + * the callback. + */ + private static class TracingAwareFutureCallback implements FutureCallback { + private final FutureCallback delegate; + private DeferredTracer deferredTracer; + + TracingAwareFutureCallback(FutureCallback delegate) { + this.delegate = delegate; + this.deferredTracer = new DeferredTracer(); + } + + @Override + public void onSuccess(@NullableDecl V result) { + deferredTracer.withTrace(() -> { + delegate.onSuccess(result); + return null; + }); + } + + @Override + public void onFailure(Throwable throwable) { + deferredTracer.withTrace(() -> { + delegate.onFailure(throwable); + return null; + }); + } + } + public interface ThrowingCallable { T call() throws E; } diff --git a/tracing/src/test/java/com/palantir/tracing/TracersTest.java b/tracing/src/test/java/com/palantir/tracing/TracersTest.java index 0b636a40a..d412435a6 100644 --- a/tracing/src/test/java/com/palantir/tracing/TracersTest.java +++ b/tracing/src/test/java/com/palantir/tracing/TracersTest.java @@ -20,6 +20,11 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.palantir.tracing.api.OpenSpan; import java.util.Collections; import java.util.HashSet; @@ -27,11 +32,13 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import org.checkerframework.checker.nullness.compatqual.NullableDecl; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -450,6 +457,292 @@ public void testWrapRunnableWithAlternateTraceId_traceStateRestoredToCleared() { assertThat(Tracer.hasTraceId()).isFalse(); } + @Test + public void testWrappingFutureCallback_futureCallbackTraceIsIsolated() throws Exception { + FutureCallback futureCallback = new FutureCallback() { + @Override + public void onSuccess(@NullableDecl Void result) { + Tracer.startSpan("inside"); // never completed + } + + @Override + public void onFailure(Throwable throwable) { + Tracer.startSpan("inside"); // never completed + } + }; + + ListeningExecutorService listeningExecutorService = + MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); + ListenableFuture success = listeningExecutorService.submit(() -> null); + ListenableFuture failure = listeningExecutorService.submit(() -> { + throw new IllegalStateException(); + }); + + Tracer.startSpan("outside"); + FutureCallback successCallback = Tracers.wrap(futureCallback); + Futures.addCallback(success, successCallback, MoreExecutors.directExecutor()); + success.get(); + assertThat(Tracer.completeSpan().get().getOperation()).isEqualTo("outside"); + + Tracer.getAndClearTrace(); + Tracer.startSpan("outside"); + FutureCallback failureCallback = Tracers.wrap(futureCallback); + Futures.addCallback(failure, failureCallback, MoreExecutors.directExecutor()); + assertThatThrownBy(failure::get).isInstanceOf(ExecutionException.class); + assertThat(Tracer.completeSpan().get().getOperation()).isEqualTo("outside"); + } + + @Test + public void testWrappingFutureCallback_traceStateIsCapturedAtConstructionTime() throws Exception { + FutureCallback futureCallback = new FutureCallback() { + @Override + public void onSuccess(@NullableDecl Void result) { + assertThat(Tracer.completeSpan().get().getOperation()).isEqualTo("before-construction"); + } + + @Override + public void onFailure(Throwable throwable) { + assertThat(Tracer.completeSpan().get().getOperation()).isEqualTo("before-construction"); + } + }; + + ListeningExecutorService listeningExecutorService = + MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); + ListenableFuture success = listeningExecutorService.submit(() -> null); + ListenableFuture failure = listeningExecutorService.submit(() -> { + throw new IllegalStateException(); + }); + + Tracer.startSpan("before-construction"); + FutureCallback successCallback = Tracers.wrap(futureCallback); + Tracer.startSpan("after-construction"); + Futures.addCallback(success, successCallback, MoreExecutors.directExecutor()); + success.get(); + + Tracer.startSpan("before-construction"); + FutureCallback failureCallback = Tracers.wrap(futureCallback); + Tracer.startSpan("after-construction"); + Futures.addCallback(failure, failureCallback, MoreExecutors.directExecutor()); + assertThatThrownBy(failure::get).isInstanceOf(ExecutionException.class); + } + + @Test + public void testWrapFutureCallbackWithNewTrace_traceStateInsideFutureCallbackIsIsolated() throws Exception { + List traceIds = Lists.newArrayList(); + + FutureCallback futureCallback = new FutureCallback() { + @Override + public void onSuccess(@NullableDecl Void result) { + traceIds.add(Tracer.getTraceId()); + } + + @Override + public void onFailure(Throwable throwable) { + traceIds.add(Tracer.getTraceId()); + } + }; + + ListeningExecutorService listeningExecutorService = + MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); + ListenableFuture success = listeningExecutorService.submit(() -> null); + ListenableFuture failure = listeningExecutorService.submit(() -> { + throw new IllegalStateException(); + }); + + String traceIdBeforeConstruction = Tracer.getTraceId(); + Futures.addCallback(success, Tracers.wrapWithNewTrace(futureCallback), MoreExecutors.directExecutor()); + success.get(); + Futures.addCallback(success, Tracers.wrapWithNewTrace(futureCallback), MoreExecutors.directExecutor()); + success.get(); + + String successTraceIdFirstCall = traceIds.get(0); + String successTraceIdSecondCall = traceIds.get(1); + + String successTraceIdAfterCalls = Tracer.getTraceId(); + + assertThat(successTraceIdFirstCall) + .isNotEqualTo(traceIdBeforeConstruction) + .isNotEqualTo(successTraceIdAfterCalls) + .isNotEqualTo(successTraceIdSecondCall); + + assertThat(successTraceIdSecondCall) + .isNotEqualTo(traceIdBeforeConstruction) + .isNotEqualTo(successTraceIdAfterCalls); + + assertThat(traceIdBeforeConstruction) + .isEqualTo(successTraceIdAfterCalls); + + traceIds.clear(); + + Futures.addCallback(failure, Tracers.wrapWithNewTrace(futureCallback), MoreExecutors.directExecutor()); + assertThatThrownBy(failure::get).isInstanceOf(ExecutionException.class); + Futures.addCallback(failure, Tracers.wrapWithNewTrace(futureCallback), MoreExecutors.directExecutor()); + assertThatThrownBy(failure::get).isInstanceOf(ExecutionException.class); + + String failureTraceIdFirstCall = traceIds.get(0); + String failureTraceIdSecondCall = traceIds.get(1); + + String failureTraceIdAfterCalls = Tracer.getTraceId(); + + assertThat(failureTraceIdFirstCall) + .isNotEqualTo(traceIdBeforeConstruction) + .isNotEqualTo(failureTraceIdAfterCalls) + .isNotEqualTo(failureTraceIdSecondCall); + + assertThat(failureTraceIdSecondCall) + .isNotEqualTo(traceIdBeforeConstruction) + .isNotEqualTo(failureTraceIdAfterCalls); + + assertThat(traceIdBeforeConstruction) + .isEqualTo(failureTraceIdAfterCalls); + } + + @Test + public void testWrapFutureCallbackWithNewTrace_traceStateInsideFutureCallbackHasSpan() throws Exception { + List> spans = Lists.newArrayList(); + + FutureCallback futureCallback = new FutureCallback() { + @Override + public void onSuccess(@NullableDecl Void result) { + spans.add(getCurrentFullTrace()); + } + + @Override + public void onFailure(Throwable throwable) { + spans.add(getCurrentFullTrace()); + } + }; + + ListeningExecutorService listeningExecutorService = + MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); + ListenableFuture success = listeningExecutorService.submit(() -> null); + ListenableFuture failure = listeningExecutorService.submit(() -> { + throw new IllegalStateException(); + }); + + Futures.addCallback(success, Tracers.wrapWithNewTrace(futureCallback), MoreExecutors.directExecutor()); + success.get(); + List successSpans = spans.get(0); + assertThat(successSpans).hasSize(1); + OpenSpan successSpan = successSpans.get(0); + assertThat(successSpan.getOperation()).isEqualTo("root"); + assertThat(successSpan.getParentSpanId()).isEmpty(); + + Futures.addCallback(failure, Tracers.wrapWithNewTrace(futureCallback), MoreExecutors.directExecutor()); + assertThatThrownBy(failure::get).isInstanceOf(ExecutionException.class); + List failureSpans = spans.get(1); + assertThat(failureSpans).hasSize(1); + OpenSpan failureSpan = failureSpans.get(0); + assertThat(failureSpan.getOperation()).isEqualTo("root"); + assertThat(failureSpan.getParentSpanId()).isEmpty(); + } + + @Test + public void testWrapFutureCallbackWithNewTrace_traceStateInsideFutureCallbackHasGivenSpan() throws Exception { + List> spans = Lists.newArrayList(); + + FutureCallback futureCallback = new FutureCallback() { + @Override + public void onSuccess(@NullableDecl Void result) { + spans.add(getCurrentFullTrace()); + } + + @Override + public void onFailure(Throwable throwable) { + spans.add(getCurrentFullTrace()); + } + }; + + ListeningExecutorService listeningExecutorService = + MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); + ListenableFuture success = listeningExecutorService.submit(() -> null); + ListenableFuture failure = listeningExecutorService.submit(() -> { + throw new IllegalStateException(); + }); + + Futures.addCallback(success, Tracers.wrapWithNewTrace("someOperation", futureCallback), + MoreExecutors.directExecutor()); + success.get(); + List successSpans = spans.get(0); + assertThat(successSpans).hasSize(1); + OpenSpan successSpan = successSpans.get(0); + assertThat(successSpan.getOperation()).isEqualTo("someOperation"); + assertThat(successSpan.getParentSpanId()).isEmpty(); + + Futures.addCallback(failure, Tracers.wrapWithNewTrace(futureCallback), MoreExecutors.directExecutor()); + assertThatThrownBy(failure::get).isInstanceOf(ExecutionException.class); + List failureSpans = spans.get(0); + assertThat(failureSpans).hasSize(1); + OpenSpan failureSpan = failureSpans.get(0); + assertThat(failureSpan.getOperation()).isEqualTo("someOperation"); + assertThat(failureSpan.getParentSpanId()).isEmpty(); + } + + @Test + public void testWrapFutureCallbackWithNewTrace_traceStateRestoredWhenThrows() throws Exception { + String traceIdBeforeConstruction = Tracer.getTraceId(); + + FutureCallback futureCallback = new FutureCallback() { + @Override + public void onSuccess(@NullableDecl Void result) { + throw new IllegalStateException(); + } + + @Override + public void onFailure(Throwable throwable) { + throw new IllegalStateException(); + } + }; + + ListeningExecutorService listeningExecutorService = + MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); + ListenableFuture success = listeningExecutorService.submit(() -> null); + ListenableFuture failure = listeningExecutorService.submit(() -> { + throw new IllegalStateException(); + }); + + Futures.addCallback(success, Tracers.wrapWithNewTrace(futureCallback), MoreExecutors.directExecutor()); + success.get(); + assertThat(Tracer.getTraceId()).isEqualTo(traceIdBeforeConstruction); + + Futures.addCallback(failure, Tracers.wrapWithNewTrace(futureCallback), MoreExecutors.directExecutor()); + assertThatThrownBy(failure::get).isInstanceOf(ExecutionException.class); + assertThat(Tracer.getTraceId()).isEqualTo(traceIdBeforeConstruction); + } + + @Test + public void testWrapFutureCallbackWithNewTrace_traceStateRestoredToCleared() throws Exception { + // Clear out the default initialized trace + Tracer.getAndClearTraceIfPresent(); + + FutureCallback futureCallback = new FutureCallback() { + @Override + public void onSuccess(@NullableDecl Void result) { + Tracer.startSpan("inside"); + } + + @Override + public void onFailure(Throwable throwable) { + Tracer.startSpan("inside"); + } + }; + + ListeningExecutorService listeningExecutorService = + MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); + ListenableFuture success = listeningExecutorService.submit(() -> null); + ListenableFuture failure = listeningExecutorService.submit(() -> { + throw new IllegalStateException(); + }); + + Futures.addCallback(success, Tracers.wrapWithNewTrace(futureCallback), MoreExecutors.directExecutor()); + success.get(); + assertThat(Tracer.hasTraceId()).isFalse(); + + Futures.addCallback(failure, Tracers.wrapWithNewTrace(futureCallback), MoreExecutors.directExecutor()); + assertThatThrownBy(failure::get).isInstanceOf(ExecutionException.class); + assertThat(Tracer.hasTraceId()).isFalse(); + } + @Test public void testTraceIdGeneration() throws Exception { assertThat(Tracers.randomId()).hasSize(16); // fails with p=1/16 if generated string is not padded From c7e322d0827d9fc80a66b364b3c063cc37ca4d76 Mon Sep 17 00:00:00 2001 From: David Dunn Date: Wed, 20 Feb 2019 15:17:56 -0500 Subject: [PATCH 2/4] Addressing CR comments --- .../java/com/palantir/tracing/Tracers.java | 44 ---- .../com/palantir/tracing/TracersTest.java | 230 ++---------------- 2 files changed, 25 insertions(+), 249 deletions(-) diff --git a/tracing/src/main/java/com/palantir/tracing/Tracers.java b/tracing/src/main/java/com/palantir/tracing/Tracers.java index 4503b537e..9daa1895d 100644 --- a/tracing/src/main/java/com/palantir/tracing/Tracers.java +++ b/tracing/src/main/java/com/palantir/tracing/Tracers.java @@ -213,50 +213,6 @@ public static Runnable wrapWithNewTrace(String operation, Runnable delegate) { }; } - /** - * Like {@link #wrapWithNewTrace(Callable)}, but for Guava's FutureCallback. - */ - public static FutureCallback wrapWithNewTrace(FutureCallback delegate) { - return wrapWithNewTrace(ROOT_SPAN_OPERATION, delegate); - } - - /** - * Like {@link #wrapWithNewTrace(String, Callable)}, but for Guava's FutureCallback. - */ - public static FutureCallback wrapWithNewTrace(String operation, FutureCallback delegate) { - return new FutureCallback() { - @Override - public void onSuccess(@NullableDecl V result) { - // clear the existing trace and keep it around for restoration when we're done - Optional originalTrace = Tracer.getAndClearTraceIfPresent(); - - try { - Tracer.initTrace(Optional.empty(), Tracers.randomId()); - Tracer.startSpan(operation); - delegate.onSuccess(result); - } finally { - Tracer.fastCompleteSpan(); - restoreTrace(originalTrace); - } - } - - @Override - public void onFailure(Throwable throwable) { - // clear the existing trace and keep it around for restoration when we're done - Optional originalTrace = Tracer.getAndClearTraceIfPresent(); - - try { - Tracer.initTrace(Optional.empty(), Tracers.randomId()); - Tracer.startSpan(operation); - delegate.onFailure(throwable); - } finally { - Tracer.fastCompleteSpan(); - restoreTrace(originalTrace); - } - } - }; - } - /** * Like {@link #wrapWithAlternateTraceId(String, String, Runnable)}, but with a default initial span operation. */ diff --git a/tracing/src/test/java/com/palantir/tracing/TracersTest.java b/tracing/src/test/java/com/palantir/tracing/TracersTest.java index d412435a6..c4bef0579 100644 --- a/tracing/src/test/java/com/palantir/tracing/TracersTest.java +++ b/tracing/src/test/java/com/palantir/tracing/TracersTest.java @@ -18,6 +18,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assertions.fail; import com.google.common.collect.Lists; import com.google.common.util.concurrent.FutureCallback; @@ -458,7 +459,7 @@ public void testWrapRunnableWithAlternateTraceId_traceStateRestoredToCleared() { } @Test - public void testWrappingFutureCallback_futureCallbackTraceIsIsolated() throws Exception { + public void testWrappingFutureCallback_success_futureCallbackTraceIsIsolated() throws Exception { FutureCallback futureCallback = new FutureCallback() { @Override public void onSuccess(@NullableDecl Void result) { @@ -467,280 +468,99 @@ public void onSuccess(@NullableDecl Void result) { @Override public void onFailure(Throwable throwable) { - Tracer.startSpan("inside"); // never completed + fail("Future should not fail"); } }; ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); ListenableFuture success = listeningExecutorService.submit(() -> null); - ListenableFuture failure = listeningExecutorService.submit(() -> { - throw new IllegalStateException(); - }); + Tracer.startSpan("outside"); FutureCallback successCallback = Tracers.wrap(futureCallback); - Futures.addCallback(success, successCallback, MoreExecutors.directExecutor()); + Futures.addCallback(success, successCallback, Executors.newSingleThreadExecutor()); success.get(); assertThat(Tracer.completeSpan().get().getOperation()).isEqualTo("outside"); - - Tracer.getAndClearTrace(); - Tracer.startSpan("outside"); - FutureCallback failureCallback = Tracers.wrap(futureCallback); - Futures.addCallback(failure, failureCallback, MoreExecutors.directExecutor()); - assertThatThrownBy(failure::get).isInstanceOf(ExecutionException.class); - assertThat(Tracer.completeSpan().get().getOperation()).isEqualTo("outside"); } @Test - public void testWrappingFutureCallback_traceStateIsCapturedAtConstructionTime() throws Exception { + public void testWrappingFutureCallback_failure_futureCallbackTraceIsIsolated() throws Exception { FutureCallback futureCallback = new FutureCallback() { @Override public void onSuccess(@NullableDecl Void result) { - assertThat(Tracer.completeSpan().get().getOperation()).isEqualTo("before-construction"); + fail("Future should not succeed"); } @Override public void onFailure(Throwable throwable) { - assertThat(Tracer.completeSpan().get().getOperation()).isEqualTo("before-construction"); + Tracer.startSpan("inside"); // never completed } }; ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); - ListenableFuture success = listeningExecutorService.submit(() -> null); ListenableFuture failure = listeningExecutorService.submit(() -> { throw new IllegalStateException(); }); - Tracer.startSpan("before-construction"); - FutureCallback successCallback = Tracers.wrap(futureCallback); - Tracer.startSpan("after-construction"); - Futures.addCallback(success, successCallback, MoreExecutors.directExecutor()); - success.get(); - - Tracer.startSpan("before-construction"); + Tracer.startSpan("outside"); FutureCallback failureCallback = Tracers.wrap(futureCallback); - Tracer.startSpan("after-construction"); - Futures.addCallback(failure, failureCallback, MoreExecutors.directExecutor()); - assertThatThrownBy(failure::get).isInstanceOf(ExecutionException.class); - } - - @Test - public void testWrapFutureCallbackWithNewTrace_traceStateInsideFutureCallbackIsIsolated() throws Exception { - List traceIds = Lists.newArrayList(); - - FutureCallback futureCallback = new FutureCallback() { - @Override - public void onSuccess(@NullableDecl Void result) { - traceIds.add(Tracer.getTraceId()); - } - - @Override - public void onFailure(Throwable throwable) { - traceIds.add(Tracer.getTraceId()); - } - }; - - ListeningExecutorService listeningExecutorService = - MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); - ListenableFuture success = listeningExecutorService.submit(() -> null); - ListenableFuture failure = listeningExecutorService.submit(() -> { - throw new IllegalStateException(); - }); - - String traceIdBeforeConstruction = Tracer.getTraceId(); - Futures.addCallback(success, Tracers.wrapWithNewTrace(futureCallback), MoreExecutors.directExecutor()); - success.get(); - Futures.addCallback(success, Tracers.wrapWithNewTrace(futureCallback), MoreExecutors.directExecutor()); - success.get(); - - String successTraceIdFirstCall = traceIds.get(0); - String successTraceIdSecondCall = traceIds.get(1); - - String successTraceIdAfterCalls = Tracer.getTraceId(); - - assertThat(successTraceIdFirstCall) - .isNotEqualTo(traceIdBeforeConstruction) - .isNotEqualTo(successTraceIdAfterCalls) - .isNotEqualTo(successTraceIdSecondCall); - - assertThat(successTraceIdSecondCall) - .isNotEqualTo(traceIdBeforeConstruction) - .isNotEqualTo(successTraceIdAfterCalls); - - assertThat(traceIdBeforeConstruction) - .isEqualTo(successTraceIdAfterCalls); - - traceIds.clear(); - - Futures.addCallback(failure, Tracers.wrapWithNewTrace(futureCallback), MoreExecutors.directExecutor()); - assertThatThrownBy(failure::get).isInstanceOf(ExecutionException.class); - Futures.addCallback(failure, Tracers.wrapWithNewTrace(futureCallback), MoreExecutors.directExecutor()); + Futures.addCallback(failure, failureCallback, Executors.newSingleThreadExecutor()); assertThatThrownBy(failure::get).isInstanceOf(ExecutionException.class); - - String failureTraceIdFirstCall = traceIds.get(0); - String failureTraceIdSecondCall = traceIds.get(1); - - String failureTraceIdAfterCalls = Tracer.getTraceId(); - - assertThat(failureTraceIdFirstCall) - .isNotEqualTo(traceIdBeforeConstruction) - .isNotEqualTo(failureTraceIdAfterCalls) - .isNotEqualTo(failureTraceIdSecondCall); - - assertThat(failureTraceIdSecondCall) - .isNotEqualTo(traceIdBeforeConstruction) - .isNotEqualTo(failureTraceIdAfterCalls); - - assertThat(traceIdBeforeConstruction) - .isEqualTo(failureTraceIdAfterCalls); - } - - @Test - public void testWrapFutureCallbackWithNewTrace_traceStateInsideFutureCallbackHasSpan() throws Exception { - List> spans = Lists.newArrayList(); - - FutureCallback futureCallback = new FutureCallback() { - @Override - public void onSuccess(@NullableDecl Void result) { - spans.add(getCurrentFullTrace()); - } - - @Override - public void onFailure(Throwable throwable) { - spans.add(getCurrentFullTrace()); - } - }; - - ListeningExecutorService listeningExecutorService = - MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); - ListenableFuture success = listeningExecutorService.submit(() -> null); - ListenableFuture failure = listeningExecutorService.submit(() -> { - throw new IllegalStateException(); - }); - - Futures.addCallback(success, Tracers.wrapWithNewTrace(futureCallback), MoreExecutors.directExecutor()); - success.get(); - List successSpans = spans.get(0); - assertThat(successSpans).hasSize(1); - OpenSpan successSpan = successSpans.get(0); - assertThat(successSpan.getOperation()).isEqualTo("root"); - assertThat(successSpan.getParentSpanId()).isEmpty(); - - Futures.addCallback(failure, Tracers.wrapWithNewTrace(futureCallback), MoreExecutors.directExecutor()); - assertThatThrownBy(failure::get).isInstanceOf(ExecutionException.class); - List failureSpans = spans.get(1); - assertThat(failureSpans).hasSize(1); - OpenSpan failureSpan = failureSpans.get(0); - assertThat(failureSpan.getOperation()).isEqualTo("root"); - assertThat(failureSpan.getParentSpanId()).isEmpty(); - } - - @Test - public void testWrapFutureCallbackWithNewTrace_traceStateInsideFutureCallbackHasGivenSpan() throws Exception { - List> spans = Lists.newArrayList(); - - FutureCallback futureCallback = new FutureCallback() { - @Override - public void onSuccess(@NullableDecl Void result) { - spans.add(getCurrentFullTrace()); - } - - @Override - public void onFailure(Throwable throwable) { - spans.add(getCurrentFullTrace()); - } - }; - - ListeningExecutorService listeningExecutorService = - MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); - ListenableFuture success = listeningExecutorService.submit(() -> null); - ListenableFuture failure = listeningExecutorService.submit(() -> { - throw new IllegalStateException(); - }); - - Futures.addCallback(success, Tracers.wrapWithNewTrace("someOperation", futureCallback), - MoreExecutors.directExecutor()); - success.get(); - List successSpans = spans.get(0); - assertThat(successSpans).hasSize(1); - OpenSpan successSpan = successSpans.get(0); - assertThat(successSpan.getOperation()).isEqualTo("someOperation"); - assertThat(successSpan.getParentSpanId()).isEmpty(); - - Futures.addCallback(failure, Tracers.wrapWithNewTrace(futureCallback), MoreExecutors.directExecutor()); - assertThatThrownBy(failure::get).isInstanceOf(ExecutionException.class); - List failureSpans = spans.get(0); - assertThat(failureSpans).hasSize(1); - OpenSpan failureSpan = failureSpans.get(0); - assertThat(failureSpan.getOperation()).isEqualTo("someOperation"); - assertThat(failureSpan.getParentSpanId()).isEmpty(); + assertThat(Tracer.completeSpan().get().getOperation()).isEqualTo("outside"); } @Test - public void testWrapFutureCallbackWithNewTrace_traceStateRestoredWhenThrows() throws Exception { - String traceIdBeforeConstruction = Tracer.getTraceId(); - + public void testWrappingFutureCallback_success_traceStateIsCapturedAtConstructionTime() throws Exception { FutureCallback futureCallback = new FutureCallback() { @Override public void onSuccess(@NullableDecl Void result) { - throw new IllegalStateException(); + assertThat(Tracer.completeSpan().get().getOperation()).isEqualTo("before-construction"); } @Override public void onFailure(Throwable throwable) { - throw new IllegalStateException(); + fail("Future should not fail"); } }; ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); ListenableFuture success = listeningExecutorService.submit(() -> null); - ListenableFuture failure = listeningExecutorService.submit(() -> { - throw new IllegalStateException(); - }); - Futures.addCallback(success, Tracers.wrapWithNewTrace(futureCallback), MoreExecutors.directExecutor()); + Tracer.startSpan("before-construction"); + FutureCallback successCallback = Tracers.wrap(futureCallback); + Tracer.startSpan("after-construction"); + Futures.addCallback(success, successCallback, Executors.newSingleThreadExecutor()); success.get(); - assertThat(Tracer.getTraceId()).isEqualTo(traceIdBeforeConstruction); - - Futures.addCallback(failure, Tracers.wrapWithNewTrace(futureCallback), MoreExecutors.directExecutor()); - assertThatThrownBy(failure::get).isInstanceOf(ExecutionException.class); - assertThat(Tracer.getTraceId()).isEqualTo(traceIdBeforeConstruction); } @Test - public void testWrapFutureCallbackWithNewTrace_traceStateRestoredToCleared() throws Exception { - // Clear out the default initialized trace - Tracer.getAndClearTraceIfPresent(); - + public void testWrappingFutureCallback_failure_traceStateIsCapturedAtConstructionTime() throws Exception { FutureCallback futureCallback = new FutureCallback() { @Override public void onSuccess(@NullableDecl Void result) { - Tracer.startSpan("inside"); + fail("Future should not succeed"); } @Override public void onFailure(Throwable throwable) { - Tracer.startSpan("inside"); + assertThat(Tracer.completeSpan().get().getOperation()).isEqualTo("before-construction"); } }; ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); - ListenableFuture success = listeningExecutorService.submit(() -> null); ListenableFuture failure = listeningExecutorService.submit(() -> { throw new IllegalStateException(); }); - Futures.addCallback(success, Tracers.wrapWithNewTrace(futureCallback), MoreExecutors.directExecutor()); - success.get(); - assertThat(Tracer.hasTraceId()).isFalse(); - - Futures.addCallback(failure, Tracers.wrapWithNewTrace(futureCallback), MoreExecutors.directExecutor()); + Tracer.startSpan("before-construction"); + FutureCallback failureCallback = Tracers.wrap(futureCallback); + Tracer.startSpan("after-construction"); + Futures.addCallback(failure, failureCallback, Executors.newSingleThreadExecutor()); assertThatThrownBy(failure::get).isInstanceOf(ExecutionException.class); - assertThat(Tracer.hasTraceId()).isFalse(); } @Test From e627b570dd5b8c445860738462b653967ccf5b35 Mon Sep 17 00:00:00 2001 From: David Dunn Date: Thu, 21 Feb 2019 15:10:38 -0500 Subject: [PATCH 3/4] Add operation names --- .../src/main/java/com/palantir/tracing/Tracers.java | 10 +++++----- .../test/java/com/palantir/tracing/TracersTest.java | 8 ++++---- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/tracing/src/main/java/com/palantir/tracing/Tracers.java b/tracing/src/main/java/com/palantir/tracing/Tracers.java index 49b8f170a..dc059050e 100644 --- a/tracing/src/main/java/com/palantir/tracing/Tracers.java +++ b/tracing/src/main/java/com/palantir/tracing/Tracers.java @@ -135,9 +135,9 @@ public static Runnable wrap(Runnable delegate) { return new TracingAwareRunnable(Optional.empty(), delegate); } - /** Like {@link #wrap(Callable)}, but for Guava's FutureCallback. */ - public static FutureCallback wrap(FutureCallback delegate) { - return new TracingAwareFutureCallback<>(delegate); + /** Like {@link #wrap(String, Callable)}, but for Guava's FutureCallback. */ + public static FutureCallback wrap(String operation, FutureCallback delegate) { + return new TracingAwareFutureCallback<>(operation, delegate); } /** @@ -368,9 +368,9 @@ private static class TracingAwareFutureCallback implements FutureCallback private final FutureCallback delegate; private DeferredTracer deferredTracer; - TracingAwareFutureCallback(FutureCallback delegate) { + TracingAwareFutureCallback(String operation, FutureCallback delegate) { this.delegate = delegate; - this.deferredTracer = new DeferredTracer(); + this.deferredTracer = new DeferredTracer(operation); } @Override diff --git a/tracing/src/test/java/com/palantir/tracing/TracersTest.java b/tracing/src/test/java/com/palantir/tracing/TracersTest.java index 1aab8bbea..fd868e584 100644 --- a/tracing/src/test/java/com/palantir/tracing/TracersTest.java +++ b/tracing/src/test/java/com/palantir/tracing/TracersTest.java @@ -528,7 +528,7 @@ public void onFailure(Throwable throwable) { Tracer.startSpan("outside"); - FutureCallback successCallback = Tracers.wrap(futureCallback); + FutureCallback successCallback = Tracers.wrap("successCallback", futureCallback); Futures.addCallback(success, successCallback, Executors.newSingleThreadExecutor()); success.get(); assertThat(Tracer.completeSpan().get().getOperation()).isEqualTo("outside"); @@ -555,7 +555,7 @@ public void onFailure(Throwable throwable) { }); Tracer.startSpan("outside"); - FutureCallback failureCallback = Tracers.wrap(futureCallback); + FutureCallback failureCallback = Tracers.wrap("failureCallback", futureCallback); Futures.addCallback(failure, failureCallback, Executors.newSingleThreadExecutor()); assertThatThrownBy(failure::get).isInstanceOf(ExecutionException.class); assertThat(Tracer.completeSpan().get().getOperation()).isEqualTo("outside"); @@ -580,7 +580,7 @@ public void onFailure(Throwable throwable) { ListenableFuture success = listeningExecutorService.submit(() -> null); Tracer.startSpan("before-construction"); - FutureCallback successCallback = Tracers.wrap(futureCallback); + FutureCallback successCallback = Tracers.wrap("successCallback", futureCallback); Tracer.startSpan("after-construction"); Futures.addCallback(success, successCallback, Executors.newSingleThreadExecutor()); success.get(); @@ -607,7 +607,7 @@ public void onFailure(Throwable throwable) { }); Tracer.startSpan("before-construction"); - FutureCallback failureCallback = Tracers.wrap(futureCallback); + FutureCallback failureCallback = Tracers.wrap("failureCallback", futureCallback); Tracer.startSpan("after-construction"); Futures.addCallback(failure, failureCallback, Executors.newSingleThreadExecutor()); assertThatThrownBy(failure::get).isInstanceOf(ExecutionException.class); From 00a7021f7af7c442e47fb28201b4fdc6bdf177d5 Mon Sep 17 00:00:00 2001 From: David Dunn Date: Fri, 22 Feb 2019 12:53:02 -0500 Subject: [PATCH 4/4] Fix concurrency with separate thread tests --- .../com/palantir/tracing/TracersTest.java | 239 +++++++++++++----- 1 file changed, 173 insertions(+), 66 deletions(-) diff --git a/tracing/src/test/java/com/palantir/tracing/TracersTest.java b/tracing/src/test/java/com/palantir/tracing/TracersTest.java index fd868e584..955b748bd 100644 --- a/tracing/src/test/java/com/palantir/tracing/TracersTest.java +++ b/tracing/src/test/java/com/palantir/tracing/TracersTest.java @@ -18,7 +18,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.assertj.core.api.Assertions.fail; import com.google.common.collect.Lists; import com.google.common.util.concurrent.FutureCallback; @@ -27,12 +26,14 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.palantir.tracing.api.OpenSpan; +import com.palantir.tracing.api.Span; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -509,108 +510,147 @@ public void testWrapRunnableWithAlternateTraceId_traceStateRestoredToCleared() { } @Test - public void testWrappingFutureCallback_success_futureCallbackTraceIsIsolated() throws Exception { - FutureCallback futureCallback = new FutureCallback() { - @Override - public void onSuccess(@NullableDecl Void result) { - Tracer.startSpan("inside"); // never completed - } - - @Override - public void onFailure(Throwable throwable) { - fail("Future should not fail"); - } - }; + public void testWrappingFutureCallback_futureCallbackTraceIsIsolated_success() throws Exception { + CompletionAwareFutureCallback futureCallback = createFutureCallbackWithFunction( + () -> Tracer.startSpan("inside")); ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); - ListenableFuture success = listeningExecutorService.submit(() -> null); - + ListenableFuture future = listeningExecutorService.submit(() -> null); Tracer.startSpan("outside"); - FutureCallback successCallback = Tracers.wrap("successCallback", futureCallback); - Futures.addCallback(success, successCallback, Executors.newSingleThreadExecutor()); - success.get(); + FutureCallback callback = Tracers.wrap("callback", futureCallback); + // Using direct executor to use same thread to verify callback doesn't modify thread state + Futures.addCallback(future, callback, MoreExecutors.directExecutor()); + + assertThat(futureCallback.waitForCompletion(10, TimeUnit.SECONDS)).isTrue(); + assertThat(futureCallback.wasSuccess()).isTrue(); + future.get(); + assertThat(Tracer.completeSpan().get().getOperation()).isEqualTo("outside"); } @Test - public void testWrappingFutureCallback_failure_futureCallbackTraceIsIsolated() throws Exception { - FutureCallback futureCallback = new FutureCallback() { - @Override - public void onSuccess(@NullableDecl Void result) { - fail("Future should not succeed"); - } - - @Override - public void onFailure(Throwable throwable) { - Tracer.startSpan("inside"); // never completed - } - }; + public void testWrappingFutureCallback_futureCallbackTraceIsIsolated_failure() throws Exception { + CompletionAwareFutureCallback futureCallback = createFutureCallbackWithFunction( + () -> Tracer.startSpan("inside")); ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); - ListenableFuture failure = listeningExecutorService.submit(() -> { + ListenableFuture future = listeningExecutorService.submit(() -> { throw new IllegalStateException(); }); Tracer.startSpan("outside"); - FutureCallback failureCallback = Tracers.wrap("failureCallback", futureCallback); - Futures.addCallback(failure, failureCallback, Executors.newSingleThreadExecutor()); - assertThatThrownBy(failure::get).isInstanceOf(ExecutionException.class); + FutureCallback callback = Tracers.wrap("callback", futureCallback); + // Using direct executor to use same thread to verify callback doesn't modify thread state + Futures.addCallback(future, callback, MoreExecutors.directExecutor()); + + assertThat(futureCallback.waitForCompletion(10, TimeUnit.SECONDS)).isTrue(); + assertThat(futureCallback.wasSuccess()).isFalse(); + assertThatThrownBy(future::get).isInstanceOf(ExecutionException.class); + assertThat(Tracer.completeSpan().get().getOperation()).isEqualTo("outside"); } @Test - public void testWrappingFutureCallback_success_traceStateIsCapturedAtConstructionTime() throws Exception { - FutureCallback futureCallback = new FutureCallback() { - @Override - public void onSuccess(@NullableDecl Void result) { - assertThat(Tracer.completeSpan().get().getOperation()).isEqualTo("before-construction"); - } + public void testWrappingFutureCallback_traceStateShowsCorrectlyParentedNewOperation_success_sameThread() + throws Exception { + AtomicReference span = new AtomicReference<>(); + CompletionAwareFutureCallback futureCallback = createFutureCallbackWithFunction( + () -> span.set(Tracer.completeSpan().get())); - @Override - public void onFailure(Throwable throwable) { - fail("Future should not fail"); - } - }; + ListeningExecutorService listeningExecutorService = + MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); + ListenableFuture future = listeningExecutorService.submit(() -> null); + + OpenSpan beforeSpan = Tracer.startSpan("before-construction"); + FutureCallback callback = Tracers.wrap("callback", futureCallback); + Tracer.startSpan("after-construction"); + Futures.addCallback(future, callback, MoreExecutors.directExecutor()); + + assertThat(futureCallback.waitForCompletion(10, TimeUnit.SECONDS)).isTrue(); + assertThat(futureCallback.wasSuccess()).isTrue(); + future.get(); + + assertThat(span.get().getOperation()).isEqualTo("callback"); + assertThat(span.get().getParentSpanId().get()).isEqualTo(beforeSpan.getSpanId()); + } + + @Test + public void testWrappingFutureCallback_traceStateShowsCorrectlyParentedNewOperation_success_differentThread() + throws Exception { + AtomicReference span = new AtomicReference<>(); + CompletionAwareFutureCallback futureCallback = createFutureCallbackWithFunction( + () -> span.set(Tracer.completeSpan().get())); ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); - ListenableFuture success = listeningExecutorService.submit(() -> null); + ListenableFuture future = listeningExecutorService.submit(() -> null); - Tracer.startSpan("before-construction"); - FutureCallback successCallback = Tracers.wrap("successCallback", futureCallback); + OpenSpan beforeSpan = Tracer.startSpan("before-construction"); + FutureCallback callback = Tracers.wrap("callback", futureCallback); Tracer.startSpan("after-construction"); - Futures.addCallback(success, successCallback, Executors.newSingleThreadExecutor()); - success.get(); + Futures.addCallback(future, callback, Executors.newSingleThreadExecutor()); + + assertThat(futureCallback.waitForCompletion(10, TimeUnit.SECONDS)).isTrue(); + assertThat(futureCallback.wasSuccess()).isTrue(); + future.get(); + + assertThat(span.get().getOperation()).isEqualTo("callback"); + assertThat(span.get().getParentSpanId().get()).isEqualTo(beforeSpan.getSpanId()); } @Test - public void testWrappingFutureCallback_failure_traceStateIsCapturedAtConstructionTime() throws Exception { - FutureCallback futureCallback = new FutureCallback() { - @Override - public void onSuccess(@NullableDecl Void result) { - fail("Future should not succeed"); - } + public void testWrappingFutureCallback_traceStateShowsCorrectlyParentedNewOperation_failure_sameThread() + throws Exception { + AtomicReference span = new AtomicReference<>(); + CompletionAwareFutureCallback futureCallback = createFutureCallbackWithFunction( + () -> span.set(Tracer.completeSpan().get())); - @Override - public void onFailure(Throwable throwable) { - assertThat(Tracer.completeSpan().get().getOperation()).isEqualTo("before-construction"); - } - }; + ListeningExecutorService listeningExecutorService = + MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); + ListenableFuture future = listeningExecutorService.submit(() -> { + throw new IllegalStateException(); + }); + + OpenSpan beforeSpan = Tracer.startSpan("before-construction"); + FutureCallback callback = Tracers.wrap("callback", futureCallback); + Tracer.startSpan("after-construction"); + Futures.addCallback(future, callback, MoreExecutors.directExecutor()); + + assertThat(futureCallback.waitForCompletion(10, TimeUnit.SECONDS)).isTrue(); + assertThat(futureCallback.wasSuccess()).isFalse(); + assertThatThrownBy(future::get).isInstanceOf(ExecutionException.class); + + assertThat(span.get().getOperation()).isEqualTo("callback"); + assertThat(span.get().getParentSpanId().get()).isEqualTo(beforeSpan.getSpanId()); + } + + @Test + public void testWrappingFutureCallback_traceStateShowsCorrectlyParentedNewOperation_failure_differentThread() + throws Exception { + AtomicReference span = new AtomicReference<>(); + CompletionAwareFutureCallback futureCallback = createFutureCallbackWithFunction( + () -> span.set(Tracer.completeSpan().get())); ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); - ListenableFuture failure = listeningExecutorService.submit(() -> { + ListenableFuture future = listeningExecutorService.submit(() -> { throw new IllegalStateException(); }); - Tracer.startSpan("before-construction"); - FutureCallback failureCallback = Tracers.wrap("failureCallback", futureCallback); + OpenSpan beforeSpan = Tracer.startSpan("before-construction"); + FutureCallback callback = Tracers.wrap("callback", futureCallback); Tracer.startSpan("after-construction"); - Futures.addCallback(failure, failureCallback, Executors.newSingleThreadExecutor()); - assertThatThrownBy(failure::get).isInstanceOf(ExecutionException.class); + Futures.addCallback(future, callback, Executors.newSingleThreadExecutor()); + + assertThat(futureCallback.waitForCompletion(10, TimeUnit.SECONDS)).isTrue(); + assertThat(futureCallback.wasSuccess()).isFalse(); + assertThatThrownBy(future::get).isInstanceOf(ExecutionException.class); + + assertThat(span.get().getOperation()).isEqualTo("callback"); + assertThat(span.get().getParentSpanId().get()).isEqualTo(beforeSpan.getSpanId()); } @Test @@ -728,4 +768,71 @@ private static List getCurrentTrace() { return Lists.reverse(spans); }).orElse(Collections.emptyList()); } + + private static CompletionAwareFutureCallback createFutureCallbackWithFunction(Runnable runnable) { + return new CompletionAwareFutureCallback<>(new FutureCallback() { + @Override + public void onSuccess(@NullableDecl V result) { + runnable.run(); + } + + @Override + public void onFailure(Throwable throwable) { + runnable.run(); + } + }); + } + + /** + * Defines an interface for providing information on completion of execution and the ability to wait on completion + * for use with tests involving FutureCallback. + */ + private interface CompletionAware { + boolean waitForCompletion(long timeout, TimeUnit unit) throws InterruptedException; + boolean wasSuccess(); + } + + /** + * There is no guarantee when a FutureCallback will actually run. In order to verify state from inside the + * FutureCallback, a CountDownLatch is used to track when the FutureCallback has finished executing and to provide + * a condition to wait on. + */ + private static final class CompletionAwareFutureCallback implements FutureCallback, CompletionAware { + private final CountDownLatch latch = new CountDownLatch(1); + private final FutureCallback delegate; + private AtomicReference success = new AtomicReference<>(Boolean.FALSE); + + CompletionAwareFutureCallback(FutureCallback delegate) { + this.delegate = delegate; + } + + @Override + public void onSuccess(@NullableDecl V result) { + try { + success.set(Boolean.TRUE); + delegate.onSuccess(result); + } finally { + latch.countDown(); + } + } + + @Override + public void onFailure(Throwable throwable) { + try { + delegate.onFailure(throwable); + } finally { + latch.countDown(); + } + } + + @Override + public boolean waitForCompletion(long timeout, TimeUnit unit) throws InterruptedException { + return latch.await(timeout, unit); + } + + @Override + public boolean wasSuccess() { + return success.get(); + } + } }