diff --git a/util/src/main/java/io/kubernetes/client/informer/SharedInformerFactory.java b/util/src/main/java/io/kubernetes/client/informer/SharedInformerFactory.java index ea6b5dfdf7..79ce9349b7 100644 --- a/util/src/main/java/io/kubernetes/client/informer/SharedInformerFactory.java +++ b/util/src/main/java/io/kubernetes/client/informer/SharedInformerFactory.java @@ -31,9 +31,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import okhttp3.Call; -import okhttp3.OkHttpClient; import org.apache.commons.collections4.MapUtils; /** SharedInformerFactory class constructs and caches informers for api types. */ @@ -49,7 +47,7 @@ public class SharedInformerFactory { /** Constructor w/ default thread pool. */ public SharedInformerFactory() { - this(Configuration.getDefaultApiClient(), Executors.newCachedThreadPool()); + this(Configuration.getDefaultApiClient().setReadTimeout(0), Executors.newCachedThreadPool()); } /** Constructor w/ api client specified and default thread pool. */ @@ -63,7 +61,7 @@ public SharedInformerFactory(ApiClient apiClient) { * @param threadPool specified thread pool */ public SharedInformerFactory(ExecutorService threadPool) { - this(Configuration.getDefaultApiClient(), threadPool); + this(Configuration.getDefaultApiClient().setReadTimeout(0), threadPool); } /** @@ -73,6 +71,10 @@ public SharedInformerFactory(ExecutorService threadPool) { * @param threadPool specified thread pool */ public SharedInformerFactory(ApiClient client, ExecutorService threadPool) { + if (client.getReadTimeout() != 0) { + throw new IllegalArgumentException("read timeout of ApiClient must be zero"); + } + apiClient = client; informerExecutor = threadPool; informers = new HashMap<>(); @@ -169,11 +171,9 @@ ListerWatcher listerWatcherFor( CallGenerator callGenerator, Class apiTypeClass, Class apiListTypeClass) { - if (apiClient.getHttpClient().readTimeoutMillis() > 0) { + if (apiClient.getReadTimeout() > 0) { // set read timeout zero to ensure client doesn't time out - OkHttpClient httpClient = - apiClient.getHttpClient().newBuilder().readTimeout(0, TimeUnit.MILLISECONDS).build(); - apiClient.setHttpClient(httpClient); + apiClient.setReadTimeout(0); } return new ListerWatcher() { @Override @@ -185,6 +185,8 @@ public ApiListType list(CallGeneratorParams params) throws ApiException { @Override public Watch watch(CallGeneratorParams params) throws ApiException { Call call = callGenerator.generate(params); + // bind call with private http client to make sure read timeout is zero. + call = apiClient.getHttpClient().newCall(call.request()); return Watch.createWatch( apiClient, call, @@ -196,12 +198,11 @@ public Watch watch(CallGeneratorParams params) throws ApiException { private ListerWatcher listerWatcherFor( GenericKubernetesApi genericKubernetesApi) { - if (apiClient.getHttpClient().readTimeoutMillis() > 0) { + if (apiClient.getReadTimeout() > 0) { // set read timeout zero to ensure client doesn't time out - OkHttpClient httpClient = - apiClient.getHttpClient().newBuilder().readTimeout(0, TimeUnit.MILLISECONDS).build(); - apiClient.setHttpClient(httpClient); + apiClient.setReadTimeout(0); } + // TODO: it seems read timeout is determined by genericKubernetesApi instead of above apiClient. return new ListerWatcher() { public ApiListType list(CallGeneratorParams params) throws ApiException { return genericKubernetesApi diff --git a/util/src/main/java/io/kubernetes/client/informer/cache/DeltaFIFO.java b/util/src/main/java/io/kubernetes/client/informer/cache/DeltaFIFO.java index c8220cd3dc..af4e1b4c85 100644 --- a/util/src/main/java/io/kubernetes/client/informer/cache/DeltaFIFO.java +++ b/util/src/main/java/io/kubernetes/client/informer/cache/DeltaFIFO.java @@ -29,6 +29,7 @@ import java.util.function.Consumer; import java.util.function.Function; import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.MutablePair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -443,7 +444,22 @@ private Deque> combineDeltas( */ private MutablePair isDuplicate( MutablePair d1, MutablePair d2) { - return isDeletionDup(d1, d2); + MutablePair deletionDelta = isDeletionDup(d1, d2); + + // TODO: remove this after the cause of memory leakage is confirmed + // Squashing deltas w/ the same resource version, note that is a temporary fix that eases memory + // intensity. + if (deletionDelta != null) { + return deletionDelta; + } + if (d1.getLeft() != DeltaType.Deleted + && d2.getLeft() != DeltaType.Deleted + && StringUtils.equals( + d1.getRight().getMetadata().getResourceVersion(), + d2.getRight().getMetadata().getResourceVersion())) { + return d1; + } + return null; } /** diff --git a/util/src/test/java/io/kubernetes/client/informer/cache/DeltaFIFOTest.java b/util/src/test/java/io/kubernetes/client/informer/cache/DeltaFIFOTest.java index 39b7d0fba5..2e8a3d2873 100644 --- a/util/src/test/java/io/kubernetes/client/informer/cache/DeltaFIFOTest.java +++ b/util/src/test/java/io/kubernetes/client/informer/cache/DeltaFIFOTest.java @@ -20,6 +20,7 @@ import io.kubernetes.client.openapi.models.V1ObjectMeta; import io.kubernetes.client.openapi.models.V1Pod; import java.util.Arrays; +import java.util.Collections; import java.util.Deque; import java.util.LinkedList; import org.apache.commons.lang3.tuple.MutablePair; @@ -94,7 +95,9 @@ public void testDeltaFIFOBasic() throws InterruptedException { @Test public void testDeltaFIFODedup() { - V1Pod foo1 = new V1Pod().metadata(new V1ObjectMeta().name("foo1").namespace("default")); + V1Pod foo1 = + new V1Pod() + .metadata(new V1ObjectMeta().name("foo1").namespace("default").resourceVersion("ver")); Cache cache = new Cache(); DeltaFIFO deltaFIFO = new DeltaFIFO(Caches::deletionHandlingMetaNamespaceKeyFunc, cache); Deque> deltas; @@ -121,6 +124,12 @@ public void testDeltaFIFODedup() { assertEquals(foo1, deltas.peekFirst().getRight()); assertEquals(2, deltas.size()); deltaFIFO.getItems().remove(Caches.deletionHandlingMetaNamespaceKeyFunc(foo1)); + + // add-sync dedupe + deltaFIFO.add(foo1); + deltaFIFO.replace(Collections.singletonList(foo1), foo1.getMetadata().getResourceVersion()); + deltas = deltaFIFO.getItems().get(Caches.deletionHandlingMetaNamespaceKeyFunc(foo1)); + assertEquals(1, deltas.size()); } @Test