diff --git a/pom.xml b/pom.xml index e69f7023e..b4fb2ac0f 100644 --- a/pom.xml +++ b/pom.xml @@ -74,8 +74,8 @@ dev.openfeature sdk - - [1.14,1.99999) + + [1.14.1,1.99999) provided diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java index 9758b2091..6327ec404 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java @@ -2,14 +2,12 @@ import dev.openfeature.contrib.providers.flagd.resolver.Resolver; import dev.openfeature.contrib.providers.flagd.resolver.common.FlagdProviderEvent; -import dev.openfeature.contrib.providers.flagd.resolver.common.Util; import dev.openfeature.contrib.providers.flagd.resolver.grpc.GrpcResolver; import dev.openfeature.contrib.providers.flagd.resolver.grpc.cache.Cache; import dev.openfeature.contrib.providers.flagd.resolver.process.InProcessResolver; import dev.openfeature.sdk.EvaluationContext; import dev.openfeature.sdk.EventProvider; import dev.openfeature.sdk.Hook; -import dev.openfeature.sdk.ImmutableContext; import dev.openfeature.sdk.Metadata; import dev.openfeature.sdk.ProviderEvaluation; import dev.openfeature.sdk.ProviderEvent; @@ -36,7 +34,7 @@ public class FlagdProvider extends EventProvider { private static final String FLAGD_PROVIDER = "flagd"; private final Resolver flagResolver; private final List hooks = new ArrayList<>(); - private final EventsLock eventsLock = new EventsLock(); + private final FlagdProviderSyncResources syncResources = new FlagdProviderSyncResources(); /** * An executor service responsible for emitting @@ -108,7 +106,9 @@ public FlagdProvider(final FlagdOptions options) { gracePeriod = Config.DEFAULT_STREAM_RETRY_GRACE_PERIOD; hooks.add(new SyncMetadataHook(this::getEnrichedContext)); errorExecutor = Executors.newSingleThreadScheduledExecutor(); - this.eventsLock.initialized = initialized; + if (initialized) { + this.syncResources.initialize(); + } } @Override @@ -118,28 +118,27 @@ public List getProviderHooks() { @Override public void initialize(EvaluationContext evaluationContext) throws Exception { - synchronized (eventsLock) { - if (eventsLock.initialized) { + synchronized (syncResources) { + if (syncResources.isInitialized()) { return; } flagResolver.init(); + // block till ready - this works with deadline fine for rpc, but with in_process + // we also need to take parsing into the equation + // TODO: evaluate where we are losing time, so we can remove this magic number - + syncResources.waitForInitialization(this.deadline * 2); } - // block till ready - this works with deadline fine for rpc, but with in_process - // we also need to take parsing into the equation - // TODO: evaluate where we are losing time, so we can remove this magic number - - // follow up - // wait outside of the synchonrization or we'll deadlock - Util.busyWaitAndCheck(this.deadline * 2, () -> eventsLock.initialized); } @Override public void shutdown() { - synchronized (eventsLock) { - if (!eventsLock.initialized) { - return; - } + synchronized (syncResources) { try { + if (!syncResources.isInitialized() || syncResources.isShutDown()) { + return; + } + this.flagResolver.shutdown(); if (errorExecutor != null) { errorExecutor.shutdownNow(); @@ -148,7 +147,7 @@ public void shutdown() { } catch (Exception e) { log.error("Error during shutdown {}", FLAGD_PROVIDER, e); } finally { - eventsLock.initialized = false; + syncResources.shutdown(); } } } @@ -189,15 +188,13 @@ public ProviderEvaluation getObjectEvaluation(String key, Value defaultVa * @return context */ EvaluationContext getEnrichedContext() { - return eventsLock.enrichedContext; + return syncResources.getEnrichedContext(); } @SuppressWarnings("checkstyle:fallthrough") private void onProviderEvent(FlagdProviderEvent flagdProviderEvent) { - - synchronized (eventsLock) { - log.info("FlagdProviderEvent: {}", flagdProviderEvent.getEvent()); - + log.info("FlagdProviderEvent event {} ", flagdProviderEvent.getEvent()); + synchronized (syncResources) { /* * We only use Error and Ready as previous states. * As error will first be emitted as Stale, and only turns after a while into an @@ -209,29 +206,30 @@ private void onProviderEvent(FlagdProviderEvent flagdProviderEvent) { */ switch (flagdProviderEvent.getEvent()) { case PROVIDER_CONFIGURATION_CHANGED: - if (eventsLock.previousEvent == ProviderEvent.PROVIDER_READY) { + if (syncResources.getPreviousEvent() == ProviderEvent.PROVIDER_READY) { onConfigurationChanged(flagdProviderEvent); break; } - // intentional fall through, a not-ready change will trigger a ready. + // intentional fall through case PROVIDER_READY: /* * Sync metadata is used to enrich the context, and is immutable in flagd, * so we only need it to be fetched once at READY. */ if (flagdProviderEvent.getSyncMetadata() != null) { - eventsLock.enrichedContext = contextEnricher.apply(flagdProviderEvent.getSyncMetadata()); + syncResources.setEnrichedContext(contextEnricher.apply(flagdProviderEvent.getSyncMetadata())); } onReady(); - eventsLock.previousEvent = ProviderEvent.PROVIDER_READY; + syncResources.setPreviousEvent(ProviderEvent.PROVIDER_READY); break; case PROVIDER_ERROR: - if (eventsLock.previousEvent != ProviderEvent.PROVIDER_ERROR) { + if (syncResources.getPreviousEvent() != ProviderEvent.PROVIDER_ERROR) { onError(); + syncResources.setPreviousEvent(ProviderEvent.PROVIDER_ERROR); } - eventsLock.previousEvent = ProviderEvent.PROVIDER_ERROR; break; + default: log.info("Unknown event {}", flagdProviderEvent.getEvent()); } @@ -246,8 +244,7 @@ private void onConfigurationChanged(FlagdProviderEvent flagdProviderEvent) { } private void onReady() { - if (!eventsLock.initialized) { - eventsLock.initialized = true; + if (syncResources.initialize()) { log.info("initialized FlagdProvider"); } if (errorTask != null && !errorTask.isCancelled()) { @@ -272,7 +269,7 @@ private void onError() { if (!errorExecutor.isShutdown()) { errorTask = errorExecutor.schedule( () -> { - if (eventsLock.previousEvent == ProviderEvent.PROVIDER_ERROR) { + if (syncResources.getPreviousEvent() == ProviderEvent.PROVIDER_ERROR) { log.debug( "Provider did not reconnect successfully within {}s. Emit ERROR event...", gracePeriod); @@ -286,14 +283,4 @@ private void onError() { TimeUnit.SECONDS); } } - - /** - * Contains all fields we need to worry about locking, used as intrinsic lock - * for sync blocks. - */ - static class EventsLock { - volatile ProviderEvent previousEvent = null; - volatile boolean initialized = false; - volatile EvaluationContext enrichedContext = new ImmutableContext(); - } } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProviderSyncResources.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProviderSyncResources.java new file mode 100644 index 000000000..2cbb82976 --- /dev/null +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProviderSyncResources.java @@ -0,0 +1,94 @@ +package dev.openfeature.contrib.providers.flagd; + +import dev.openfeature.sdk.EvaluationContext; +import dev.openfeature.sdk.ImmutableContext; +import dev.openfeature.sdk.ProviderEvent; +import dev.openfeature.sdk.exceptions.GeneralError; +import lombok.Getter; +import lombok.Setter; + +/** + * Contains all fields we need to worry about locking, used as intrinsic lock + * for sync blocks in the {@link FlagdProvider}. + */ +@Getter +class FlagdProviderSyncResources { + @Setter + private volatile ProviderEvent previousEvent = null; + + private volatile EvaluationContext enrichedContext = new ImmutableContext(); + private volatile boolean initialized; + private volatile boolean isShutDown; + + public void setEnrichedContext(EvaluationContext context) { + this.enrichedContext = new ImmutableContext(context.asMap()); + } + + /** + * With this method called, it is suggested that initialization has been completed. It will wake up all threads that + * wait for the initialization. Subsequent calls have no effect. + * + * @return true iff this was the first call to {@code initialize()} + */ + public synchronized boolean initialize() { + if (this.initialized) { + return false; + } + this.initialized = true; + this.notifyAll(); + return true; + } + + /** + * Blocks the calling thread until either {@link FlagdProviderSyncResources#initialize()} or + * {@link FlagdProviderSyncResources#shutdown()} is called or the deadline is exceeded, whatever happens first. If + * {@link FlagdProviderSyncResources#initialize()} has been executed before {@code waitForInitialization(long)} is + * called, it will return instantly. If the deadline is exceeded, a GeneralError will be thrown. + * If {@link FlagdProviderSyncResources#shutdown()} is called in the meantime, an {@link IllegalStateException} will + * be thrown. Otherwise, the method will return cleanly. + * + * @param deadline the maximum time in ms to wait + * @throws GeneralError when the deadline is exceeded before + * {@link FlagdProviderSyncResources#initialize()} is called on this object + * @throws IllegalStateException when {@link FlagdProviderSyncResources#shutdown()} is called or has been called on + * this object + */ + public void waitForInitialization(long deadline) { + long start = System.currentTimeMillis(); + long end = start + deadline; + while (!initialized && !isShutDown) { + long now = System.currentTimeMillis(); + // if wait(0) is called, the thread would wait forever, so we abort when this would happen + if (now >= end) { + throw new GeneralError(String.format( + "Deadline exceeded. Condition did not complete within the %d ms deadline", deadline)); + } + long remaining = end - now; + synchronized (this) { + if (isShutDown) { + break; + } + if (initialized) { // might have changed in the meantime + return; + } + try { + this.wait(remaining); + } catch (InterruptedException e) { + // try again. Leave the continue to make PMD happy + continue; + } + } + } + if (isShutDown) { + throw new IllegalStateException("Already shut down"); + } + } + + /** + * Signals a shutdown. Threads waiting for initialization will wake up and throw an {@link IllegalStateException}. + */ + public synchronized void shutdown() { + isShutDown = true; + this.notifyAll(); + } +} diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/Util.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/Util.java deleted file mode 100644 index 394716415..000000000 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/Util.java +++ /dev/null @@ -1,39 +0,0 @@ -package dev.openfeature.contrib.providers.flagd.resolver.common; - -import dev.openfeature.sdk.exceptions.GeneralError; -import java.util.function.Supplier; -import lombok.extern.slf4j.Slf4j; - -/** - * Utility class for managing gRPC connection states and handling synchronization operations. - */ -@Slf4j -public class Util { - - /** - * Private constructor to prevent instantiation of utility class. - */ - private Util() {} - - /** - * A helper method to block the caller until a condition is met or a timeout occurs. - * - * @param deadline the maximum number of milliseconds to block - * @param connectedSupplier a function that evaluates to {@code true} when the desired condition is met - * @throws InterruptedException if the thread is interrupted during the waiting process - * @throws GeneralError if the deadline is exceeded before the condition is met - */ - public static void busyWaitAndCheck(final Long deadline, final Supplier connectedSupplier) - throws InterruptedException { - long start = System.currentTimeMillis(); - - do { - if (deadline <= System.currentTimeMillis() - start) { - throw new GeneralError(String.format( - "Deadline exceeded. Condition did not complete within the %d " + "deadline", deadline)); - } - - Thread.sleep(50L); - } while (!connectedSupplier.get()); - } -} diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java index e28597d90..46fce500b 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java @@ -38,7 +38,6 @@ public class InProcessResolver implements Resolver { private final Storage flagStore; private final Consumer onConnectionEvent; private final Operator operator; - private final long deadline; private final String scope; /** @@ -52,7 +51,6 @@ public class InProcessResolver implements Resolver { */ public InProcessResolver(FlagdOptions options, Consumer onConnectionEvent) { this.flagStore = new FlagStore(getConnector(options, onConnectionEvent)); - this.deadline = options.getDeadline(); this.onConnectionEvent = onConnectionEvent; this.operator = new Operator(); this.scope = options.getSelector(); @@ -70,10 +68,12 @@ public void init() throws Exception { flagStore.getStateQueue().take(); switch (storageStateChange.getStorageState()) { case OK: + log.info("onConnectionEvent.accept ProviderEvent.PROVIDER_CONFIGURATION_CHANGED"); onConnectionEvent.accept(new FlagdProviderEvent( ProviderEvent.PROVIDER_CONFIGURATION_CHANGED, storageStateChange.getChangedFlagsKeys(), storageStateChange.getSyncMetadata())); + log.info("post onConnectionEvent.accept ProviderEvent.PROVIDER_CONFIGURATION_CHANGED"); break; case ERROR: onConnectionEvent.accept(new FlagdProviderEvent(ProviderEvent.PROVIDER_ERROR)); diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderSyncResourcesTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderSyncResourcesTest.java new file mode 100644 index 000000000..6ff6e1765 --- /dev/null +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderSyncResourcesTest.java @@ -0,0 +1,140 @@ +package dev.openfeature.contrib.providers.flagd; + +import dev.openfeature.sdk.exceptions.GeneralError; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +class FlagdProviderSyncResourcesTest { + private static final long MAX_TIME_TOLERANCE = 20; + + private FlagdProviderSyncResources flagdProviderSyncResources; + + @BeforeEach + void setUp() { + flagdProviderSyncResources = new FlagdProviderSyncResources(); + } + + @Timeout(2) + @Test + void waitForInitialization_failsWhenDeadlineElapses() { + Assertions.assertThrows(GeneralError.class, () -> flagdProviderSyncResources.waitForInitialization(2)); + } + + @Timeout(2) + @Test + void waitForInitialization_waitsApproxForDeadline() { + final AtomicLong start = new AtomicLong(); + final AtomicLong end = new AtomicLong(); + final long deadline = 45; + + start.set(System.currentTimeMillis()); + Assertions.assertThrows(GeneralError.class, () -> flagdProviderSyncResources.waitForInitialization(deadline)); + end.set(System.currentTimeMillis()); + + final long elapsed = end.get() - start.get(); + // should wait at least for the deadline + Assertions.assertTrue(elapsed >= deadline); + // should not wait much longer than the deadline + Assertions.assertTrue(elapsed < deadline + MAX_TIME_TOLERANCE); + } + + @Timeout(2) + @Test + void interruptingWaitingThread_isIgnored() throws InterruptedException { + final AtomicBoolean isWaiting = new AtomicBoolean(); + final long deadline = 500; + Thread waitingThread = new Thread(() -> { + long start = System.currentTimeMillis(); + isWaiting.set(true); + Assertions.assertThrows( + GeneralError.class, () -> flagdProviderSyncResources.waitForInitialization(deadline)); + + long end = System.currentTimeMillis(); + long duration = end - start; + // even though thread was interrupted, it still waited for the deadline + Assertions.assertTrue(duration >= deadline); + Assertions.assertTrue(duration < deadline + MAX_TIME_TOLERANCE); + }); + waitingThread.start(); + + while (!isWaiting.get()) { + Thread.yield(); + } + + Thread.sleep(MAX_TIME_TOLERANCE); // waitingThread should have started waiting in the meantime + + for (int i = 0; i < 50; i++) { + waitingThread.interrupt(); + Thread.sleep(10); + } + + waitingThread.join(); + } + + @Timeout(2) + @Test + void callingInitialize_wakesUpWaitingThread() throws InterruptedException { + final AtomicBoolean isWaiting = new AtomicBoolean(); + Thread waitingThread = new Thread(() -> { + long start = System.currentTimeMillis(); + isWaiting.set(true); + flagdProviderSyncResources.waitForInitialization(10000); + long end = System.currentTimeMillis(); + long duration = end - start; + Assertions.assertTrue(duration < MAX_TIME_TOLERANCE); + }); + waitingThread.start(); + + while (!isWaiting.get()) { + Thread.yield(); + } + + Thread.sleep(MAX_TIME_TOLERANCE); // waitingThread should have started waiting in the meantime + + flagdProviderSyncResources.initialize(); + + waitingThread.join(); + } + + @Timeout(2) + @Test + void callingShutdown_wakesUpWaitingThreadWithException() throws InterruptedException { + final AtomicBoolean isWaiting = new AtomicBoolean(); + Thread waitingThread = new Thread(() -> { + long start = System.currentTimeMillis(); + isWaiting.set(true); + Assertions.assertThrows( + IllegalStateException.class, () -> flagdProviderSyncResources.waitForInitialization(10000)); + + long end = System.currentTimeMillis(); + long duration = end - start; + Assertions.assertTrue(duration < MAX_TIME_TOLERANCE); + }); + waitingThread.start(); + + while (!isWaiting.get()) { + Thread.yield(); + } + + Thread.sleep(MAX_TIME_TOLERANCE); // waitingThread should have started waiting in the meantime + + flagdProviderSyncResources.shutdown(); + + waitingThread.join(); + } + + @Timeout(2) + @Test + void waitForInitializationAfterCallingInitialize_returnsInstantly() { + flagdProviderSyncResources.initialize(); + long start = System.currentTimeMillis(); + flagdProviderSyncResources.waitForInitialization(10000); + long end = System.currentTimeMillis(); + // do not use MAX_TIME_TOLERANCE here, this should happen faster than that + Assertions.assertTrue(start + 1 >= end); + } +} diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java index 0c7444c34..1b39add30 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderTest.java @@ -5,6 +5,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.AdditionalAnswers.delegatesTo; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.argThat; @@ -600,7 +601,8 @@ void contextEnrichment() throws Exception { MutableStructure metadata = new MutableStructure(); metadata.add(key, val); // given - final Function mockEnricher = mock(Function.class); + final Function enricher = structure -> new ImmutableContext(structure.asMap()); + final Function mockEnricher = mock(Function.class, delegatesTo(enricher)); // mock a resolver try (MockedConstruction mockResolver = diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/EventSteps.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/EventSteps.java index bbf9bb3c0..96130c048 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/EventSteps.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/EventSteps.java @@ -48,7 +48,7 @@ public void a_stale_event_handler(String eventType) { } @When("a {} event was fired") - public void eventWasFired(String eventType) throws InterruptedException { + public void eventWasFired(String eventType) { eventHandlerShouldBeExecutedWithin(eventType, EVENT_TIMEOUT_MS); } diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/ProviderSteps.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/ProviderSteps.java index 77f3ddabd..eb837afe1 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/ProviderSteps.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/ProviderSteps.java @@ -53,14 +53,14 @@ public static void afterAll() throws IOException { } @Before - public void before() throws IOException { + public void before() { if (!container.isRunning()) { container.start(); } } @After - public void tearDown() throws InterruptedException { + public void tearDown() { if (state.client != null) { when().post("http://" + container.getLaunchpadUrl() + "/stop") .then() @@ -70,7 +70,7 @@ public void tearDown() throws InterruptedException { } @Given("a {} flagd provider") - public void setupProvider(String providerType) throws IOException, InterruptedException { + public void setupProvider(String providerType) throws InterruptedException { String flagdConfig = "default"; state.builder.deadline(1000).keepAlive(0).retryGracePeriod(2); boolean wait = true; @@ -125,28 +125,26 @@ public void setupProvider(String providerType) throws IOException, InterruptedEx .statusCode(200); // giving flagd a little time to start - Thread.sleep(30); + Thread.sleep(300); FeatureProvider provider = new FlagdProvider(state.builder.resolverType(State.resolverType).build()); - + String providerName = "Provider " + Math.random(); OpenFeatureAPI api = OpenFeatureAPI.getInstance(); - String providerName = providerType + Math.random(); if (wait) { api.setProviderAndWait(providerName, provider); } else { api.setProvider(providerName, provider); } - log.info("provider name: {}", providerName); this.state.client = api.getClient(providerName); } @When("the connection is lost") - public void the_connection_is_lost() throws InterruptedException { + public void the_connection_is_lost() { when().post("http://" + container.getLaunchpadUrl() + "/stop").then().statusCode(200); } @When("the connection is lost for {int}s") - public void the_connection_is_lost_for(int seconds) throws InterruptedException { + public void the_connection_is_lost_for(int seconds) { when().post("http://" + container.getLaunchpadUrl() + "/restart?seconds={seconds}", seconds) .then() .statusCode(200);