diff --git a/core-api/src/main/java/com/optimizely/ab/odp/ODPEvent.java b/core-api/src/main/java/com/optimizely/ab/odp/ODPEvent.java index 34bd340b6..903bcf663 100644 --- a/core-api/src/main/java/com/optimizely/ab/odp/ODPEvent.java +++ b/core-api/src/main/java/com/optimizely/ab/odp/ODPEvent.java @@ -15,6 +15,9 @@ */ package com.optimizely.ab.odp; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.Collections; import java.util.Map; public class ODPEvent { @@ -23,11 +26,11 @@ public class ODPEvent { private Map identifiers; private Map data; - public ODPEvent(String type, String action, Map identifiers, Map data) { + public ODPEvent(@Nonnull String type, @Nonnull String action, @Nullable Map identifiers, @Nullable Map data) { this.type = type; this.action = action; - this.identifiers = identifiers; - this.data = data; + this.identifiers = identifiers != null ? identifiers : Collections.emptyMap(); + this.data = data != null ? data : Collections.emptyMap(); } public String getType() { diff --git a/core-api/src/main/java/com/optimizely/ab/odp/ODPEventManager.java b/core-api/src/main/java/com/optimizely/ab/odp/ODPEventManager.java new file mode 100644 index 000000000..7cc601f29 --- /dev/null +++ b/core-api/src/main/java/com/optimizely/ab/odp/ODPEventManager.java @@ -0,0 +1,199 @@ +/** + * + * Copyright 2022, Optimizely + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.optimizely.ab.odp; + +import com.optimizely.ab.event.internal.BuildVersionInfo; +import com.optimizely.ab.event.internal.ClientEngineInfo; +import com.optimizely.ab.odp.serializer.ODPJsonSerializerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.*; +import java.util.concurrent.*; + +public class ODPEventManager { + private static final Logger logger = LoggerFactory.getLogger(ODPEventManager.class); + private static final int DEFAULT_BATCH_SIZE = 10; + private static final int DEFAULT_QUEUE_SIZE = 10000; + private static final int DEFAULT_FLUSH_INTERVAL = 1000; + private static final int MAX_RETRIES = 3; + private static final String EVENT_URL_PATH = "/v3/events"; + + private final int queueSize; + private final int batchSize; + private final int flushInterval; + + private Boolean isRunning = false; + + // This needs to be volatile because it will be updated in the main thread and the event dispatcher thread + // needs to see the change immediately. + private volatile ODPConfig odpConfig; + private EventDispatcherThread eventDispatcherThread; + + private final ODPApiManager apiManager; + + // The eventQueue needs to be thread safe. We are not doing anything extra for thread safety here + // because `LinkedBlockingQueue` itself is thread safe. + private final BlockingQueue eventQueue = new LinkedBlockingQueue<>(); + + public ODPEventManager(@Nonnull ODPConfig odpConfig, @Nonnull ODPApiManager apiManager) { + this(odpConfig, apiManager, null, null, null); + } + + public ODPEventManager(@Nonnull ODPConfig odpConfig, @Nonnull ODPApiManager apiManager, @Nullable Integer batchSize, @Nullable Integer queueSize, @Nullable Integer flushInterval) { + this.odpConfig = odpConfig; + this.apiManager = apiManager; + this.batchSize = (batchSize != null && batchSize > 1) ? batchSize : DEFAULT_BATCH_SIZE; + this.queueSize = queueSize != null ? queueSize : DEFAULT_QUEUE_SIZE; + this.flushInterval = (flushInterval != null && flushInterval > 0) ? flushInterval : DEFAULT_FLUSH_INTERVAL; + } + + public void start() { + isRunning = true; + eventDispatcherThread = new EventDispatcherThread(); + eventDispatcherThread.start(); + } + + public void updateSettings(ODPConfig odpConfig) { + this.odpConfig = odpConfig; + } + + public void identifyUser(@Nullable String vuid, String userId) { + Map identifiers = new HashMap<>(); + if (vuid != null) { + identifiers.put(ODPUserKey.VUID.getKeyString(), vuid); + } + identifiers.put(ODPUserKey.FS_USER_ID.getKeyString(), userId); + ODPEvent event = new ODPEvent("fullstack", "client_initialized", identifiers, null); + sendEvent(event); + } + + public void sendEvent(ODPEvent event) { + event.setData(augmentCommonData(event.getData())); + processEvent(event); + } + + private Map augmentCommonData(Map sourceData) { + Map data = new HashMap<>(); + data.put("idempotence_id", UUID.randomUUID().toString()); + data.put("data_source_type", "sdk"); + data.put("data_source", ClientEngineInfo.getClientEngine().getClientEngineValue()); + data.put("data_source_version", BuildVersionInfo.getClientVersion()); + data.putAll(sourceData); + return data; + } + + private void processEvent(ODPEvent event) { + if (!isRunning) { + logger.warn("Failed to Process ODP Event. ODPEventManager is not running"); + return; + } + + if (!odpConfig.isReady()) { + logger.debug("Unable to Process ODP Event. ODPConfig is not ready."); + return; + } + + if (eventQueue.size() >= queueSize) { + logger.warn("Failed to Process ODP Event. Event Queue full. queueSize = " + queueSize); + return; + } + + if (!eventQueue.offer(event)) { + logger.error("Failed to Process ODP Event. Event Queue is not accepting any more events"); + } + } + + public void stop() { + logger.debug("Sending stop signal to ODP Event Dispatcher Thread"); + eventDispatcherThread.signalStop(); + } + + private class EventDispatcherThread extends Thread { + + private volatile boolean shouldStop = false; + + private final List currentBatch = new ArrayList<>(); + + private long nextFlushTime = new Date().getTime(); + + @Override + public void run() { + while (true) { + try { + ODPEvent nextEvent; + + // If batch has events, set the timeout to remaining time for flush interval, + // otherwise wait for the new event indefinitely + if (currentBatch.size() > 0) { + nextEvent = eventQueue.poll(nextFlushTime - new Date().getTime(), TimeUnit.MILLISECONDS); + } else { + nextEvent = eventQueue.poll(); + } + + if (nextEvent == null) { + // null means no new events received and flush interval is over, dispatch whatever is in the batch. + if (!currentBatch.isEmpty()) { + flush(); + } + if (shouldStop) { + break; + } + continue; + } + + if (currentBatch.size() == 0) { + // Batch starting, create a new flush time + nextFlushTime = new Date().getTime() + flushInterval; + } + + currentBatch.add(nextEvent); + + if (currentBatch.size() >= batchSize) { + flush(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + logger.debug("Exiting ODP Event Dispatcher Thread."); + } + + private void flush() { + if (odpConfig.isReady()) { + String payload = ODPJsonSerializerFactory.getSerializer().serializeEvents(currentBatch); + String endpoint = odpConfig.getApiHost() + EVENT_URL_PATH; + Integer statusCode; + int numAttempts = 0; + do { + statusCode = apiManager.sendEvents(odpConfig.getApiKey(), endpoint, payload); + numAttempts ++; + } while (numAttempts < MAX_RETRIES && statusCode != null && (statusCode == 0 || statusCode >= 500)); + } else { + logger.debug("ODPConfig not ready, discarding event batch"); + } + currentBatch.clear(); + } + + public void signalStop() { + shouldStop = true; + } + } +} diff --git a/core-api/src/test/java/com/optimizely/ab/odp/ODPEventManagerTest.java b/core-api/src/test/java/com/optimizely/ab/odp/ODPEventManagerTest.java new file mode 100644 index 000000000..7be51e415 --- /dev/null +++ b/core-api/src/test/java/com/optimizely/ab/odp/ODPEventManagerTest.java @@ -0,0 +1,234 @@ +/** + * + * Copyright 2022, Optimizely + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.optimizely.ab.odp; + +import ch.qos.logback.classic.Level; +import com.optimizely.ab.internal.LogbackVerifier; +import org.json.JSONArray; +import org.json.JSONObject; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.runners.MockitoJUnitRunner; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; +import static org.junit.Assert.*; + +@RunWith(MockitoJUnitRunner.class) +public class ODPEventManagerTest { + + @Rule + public LogbackVerifier logbackVerifier = new LogbackVerifier(); + + @Mock + ODPApiManager mockApiManager; + + @Captor + ArgumentCaptor payloadCaptor; + + @Before + public void setup() { + mockApiManager = mock(ODPApiManager.class); + } + + @Test + public void logAndDiscardEventWhenEventManagerIsNotRunning() { + ODPConfig odpConfig = new ODPConfig("key", "host", null); + ODPEventManager eventManager = new ODPEventManager(odpConfig, mockApiManager); + ODPEvent event = new ODPEvent("test-type", "test-action", Collections.emptyMap(), Collections.emptyMap()); + eventManager.sendEvent(event); + logbackVerifier.expectMessage(Level.WARN, "Failed to Process ODP Event. ODPEventManager is not running"); + } + + @Test + public void logAndDiscardEventWhenODPConfigNotReady() { + ODPConfig odpConfig = new ODPConfig(null, null, null); + ODPEventManager eventManager = new ODPEventManager(odpConfig, mockApiManager); + eventManager.start(); + ODPEvent event = new ODPEvent("test-type", "test-action", Collections.emptyMap(), Collections.emptyMap()); + eventManager.sendEvent(event); + logbackVerifier.expectMessage(Level.DEBUG, "Unable to Process ODP Event. ODPConfig is not ready."); + } + + @Test + public void dispatchEventsInCorrectNumberOfBatches() throws InterruptedException { + Mockito.reset(mockApiManager); + Mockito.when(mockApiManager.sendEvents(any(), any(), any())).thenReturn(202); + ODPConfig odpConfig = new ODPConfig("key", "http://www.odp-host.com", null); + ODPEventManager eventManager = new ODPEventManager(odpConfig, mockApiManager); + eventManager.start(); + for (int i = 0; i < 25; i++) { + eventManager.sendEvent(getEvent(i)); + } + Thread.sleep(1500); + Mockito.verify(mockApiManager, times(3)).sendEvents(eq("key"), eq("http://www.odp-host.com/v3/events"), any()); + } + + @Test + public void dispatchEventsWithCorrectPayload() throws InterruptedException { + Mockito.reset(mockApiManager); + Mockito.when(mockApiManager.sendEvents(any(), any(), any())).thenReturn(202); + int batchSize = 2; + ODPConfig odpConfig = new ODPConfig("key", "http://www.odp-host.com", null); + ODPEventManager eventManager = new ODPEventManager(odpConfig, mockApiManager, batchSize, null, null); + eventManager.start(); + for (int i = 0; i < 6; i++) { + eventManager.sendEvent(getEvent(i)); + } + Thread.sleep(500); + Mockito.verify(mockApiManager, times(3)).sendEvents(eq("key"), eq("http://www.odp-host.com/v3/events"), payloadCaptor.capture()); + List payloads = payloadCaptor.getAllValues(); + + for (int i = 0; i < payloads.size(); i++) { + JSONArray events = new JSONArray(payloads.get(i)); + assertEquals(batchSize, events.length()); + for (int j = 0; j < events.length(); j++) { + int id = (batchSize * i) + j; + JSONObject event = events.getJSONObject(j); + assertEquals("test-type-" + id , event.getString("type")); + assertEquals("test-action-" + id , event.getString("action")); + assertEquals("value1-" + id, event.getJSONObject("identifiers").getString("identifier1")); + assertEquals("value2-" + id, event.getJSONObject("identifiers").getString("identifier2")); + assertEquals("data-value1-" + id, event.getJSONObject("data").getString("data1")); + assertEquals(id, event.getJSONObject("data").getInt("data2")); + assertEquals("sdk", event.getJSONObject("data").getString("data_source_type")); + } + } + } + + @Test + public void dispatchEventsWithCorrectFlushInterval() throws InterruptedException { + Mockito.reset(mockApiManager); + Mockito.when(mockApiManager.sendEvents(any(), any(), any())).thenReturn(202); + ODPConfig odpConfig = new ODPConfig("key", "http://www.odp-host.com", null); + ODPEventManager eventManager = new ODPEventManager(odpConfig, mockApiManager); + eventManager.start(); + for (int i = 0; i < 25; i++) { + eventManager.sendEvent(getEvent(i)); + } + Thread.sleep(500); + Mockito.verify(mockApiManager, times(2)).sendEvents(eq("key"), eq("http://www.odp-host.com/v3/events"), any()); + + // Last batch is incomplete so it needs almost a second to flush. + Thread.sleep(1500); + Mockito.verify(mockApiManager, times(3)).sendEvents(eq("key"), eq("http://www.odp-host.com/v3/events"), any()); + } + + @Test + public void retryFailedEvents() throws InterruptedException { + Mockito.reset(mockApiManager); + Mockito.when(mockApiManager.sendEvents(any(), any(), any())).thenReturn(500); + ODPConfig odpConfig = new ODPConfig("key", "http://www.odp-host.com", null); + ODPEventManager eventManager = new ODPEventManager(odpConfig, mockApiManager); + eventManager.start(); + for (int i = 0; i < 25; i++) { + eventManager.sendEvent(getEvent(i)); + } + Thread.sleep(500); + + // Should be called thrice for each batch + Mockito.verify(mockApiManager, times(6)).sendEvents(eq("key"), eq("http://www.odp-host.com/v3/events"), any()); + + // Last batch is incomplete so it needs almost a second to flush. + Thread.sleep(1500); + Mockito.verify(mockApiManager, times(9)).sendEvents(eq("key"), eq("http://www.odp-host.com/v3/events"), any()); + } + + @Test + public void shouldFlushAllScheduledEventsBeforeStopping() throws InterruptedException { + Mockito.reset(mockApiManager); + Mockito.when(mockApiManager.sendEvents(any(), any(), any())).thenReturn(202); + ODPConfig odpConfig = new ODPConfig("key", "http://www.odp-host.com", null); + ODPEventManager eventManager = new ODPEventManager(odpConfig, mockApiManager); + eventManager.start(); + for (int i = 0; i < 25; i++) { + eventManager.sendEvent(getEvent(i)); + } + eventManager.stop(); + Thread.sleep(1500); + Mockito.verify(mockApiManager, times(3)).sendEvents(eq("key"), eq("http://www.odp-host.com/v3/events"), any()); + logbackVerifier.expectMessage(Level.DEBUG, "Exiting ODP Event Dispatcher Thread."); + } + + @Test + public void prepareCorrectPayloadForIdentifyUser() throws InterruptedException { + Mockito.reset(mockApiManager); + Mockito.when(mockApiManager.sendEvents(any(), any(), any())).thenReturn(202); + int batchSize = 2; + ODPConfig odpConfig = new ODPConfig("key", "http://www.odp-host.com", null); + ODPEventManager eventManager = new ODPEventManager(odpConfig, mockApiManager, batchSize, null, null); + eventManager.start(); + for (int i = 0; i < 2; i++) { + eventManager.identifyUser("the-vuid-" + i, "the-fs-user-id-" + i); + } + + Thread.sleep(1500); + Mockito.verify(mockApiManager, times(1)).sendEvents(eq("key"), eq("http://www.odp-host.com/v3/events"), payloadCaptor.capture()); + + String payload = payloadCaptor.getValue(); + JSONArray events = new JSONArray(payload); + assertEquals(batchSize, events.length()); + for (int i = 0; i < events.length(); i++) { + JSONObject event = events.getJSONObject(i); + assertEquals("fullstack", event.getString("type")); + assertEquals("client_initialized", event.getString("action")); + assertEquals("the-vuid-" + i, event.getJSONObject("identifiers").getString("vuid")); + assertEquals("the-fs-user-id-" + i, event.getJSONObject("identifiers").getString("fs_user_id")); + assertEquals("sdk", event.getJSONObject("data").getString("data_source_type")); + } + } + + @Test + public void applyUpdatedODPConfigWhenAvailable() throws InterruptedException { + Mockito.reset(mockApiManager); + Mockito.when(mockApiManager.sendEvents(any(), any(), any())).thenReturn(202); + ODPConfig odpConfig = new ODPConfig("key", "http://www.odp-host.com", null); + ODPEventManager eventManager = new ODPEventManager(odpConfig, mockApiManager); + eventManager.start(); + for (int i = 0; i < 25; i++) { + eventManager.sendEvent(getEvent(i)); + } + Thread.sleep(500); + Mockito.verify(mockApiManager, times(2)).sendEvents(eq("key"), eq("http://www.odp-host.com/v3/events"), any()); + eventManager.updateSettings(new ODPConfig("new-key", "http://www.new-odp-host.com")); + Thread.sleep(1500); + Mockito.verify(mockApiManager, times(1)).sendEvents(eq("new-key"), eq("http://www.new-odp-host.com/v3/events"), any()); + } + + private ODPEvent getEvent(int id) { + Map identifiers = new HashMap<>(); + identifiers.put("identifier1", "value1-" + id); + identifiers.put("identifier2", "value2-" + id); + + Map data = new HashMap<>(); + data.put("data1", "data-value1-" + id); + data.put("data2", id); + + return new ODPEvent("test-type-" + id , "test-action-" + id, identifiers, data); + } +}