Skip to content

Commit 02c8b32

Browse files
authored
Merge pull request #1259 from fishautumn/draft
fix issue 912: do dedupe with resourceVersion
2 parents 2c2acf5 + 11a4833 commit 02c8b32

File tree

3 files changed

+40
-14
lines changed

3 files changed

+40
-14
lines changed

util/src/main/java/io/kubernetes/client/informer/SharedInformerFactory.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,7 @@
3131
import java.util.concurrent.ExecutorService;
3232
import java.util.concurrent.Executors;
3333
import java.util.concurrent.Future;
34-
import java.util.concurrent.TimeUnit;
3534
import okhttp3.Call;
36-
import okhttp3.OkHttpClient;
3735
import org.apache.commons.collections4.MapUtils;
3836

3937
/** SharedInformerFactory class constructs and caches informers for api types. */
@@ -49,7 +47,7 @@ public class SharedInformerFactory {
4947

5048
/** Constructor w/ default thread pool. */
5149
public SharedInformerFactory() {
52-
this(Configuration.getDefaultApiClient(), Executors.newCachedThreadPool());
50+
this(Configuration.getDefaultApiClient().setReadTimeout(0), Executors.newCachedThreadPool());
5351
}
5452

5553
/** Constructor w/ api client specified and default thread pool. */
@@ -63,7 +61,7 @@ public SharedInformerFactory(ApiClient apiClient) {
6361
* @param threadPool specified thread pool
6462
*/
6563
public SharedInformerFactory(ExecutorService threadPool) {
66-
this(Configuration.getDefaultApiClient(), threadPool);
64+
this(Configuration.getDefaultApiClient().setReadTimeout(0), threadPool);
6765
}
6866

6967
/**
@@ -73,6 +71,10 @@ public SharedInformerFactory(ExecutorService threadPool) {
7371
* @param threadPool specified thread pool
7472
*/
7573
public SharedInformerFactory(ApiClient client, ExecutorService threadPool) {
74+
if (client.getReadTimeout() != 0) {
75+
throw new IllegalArgumentException("read timeout of ApiClient must be zero");
76+
}
77+
7678
apiClient = client;
7779
informerExecutor = threadPool;
7880
informers = new HashMap<>();
@@ -169,11 +171,9 @@ ListerWatcher<ApiType, ApiListType> listerWatcherFor(
169171
CallGenerator callGenerator,
170172
Class<ApiType> apiTypeClass,
171173
Class<ApiListType> apiListTypeClass) {
172-
if (apiClient.getHttpClient().readTimeoutMillis() > 0) {
174+
if (apiClient.getReadTimeout() > 0) {
173175
// set read timeout zero to ensure client doesn't time out
174-
OkHttpClient httpClient =
175-
apiClient.getHttpClient().newBuilder().readTimeout(0, TimeUnit.MILLISECONDS).build();
176-
apiClient.setHttpClient(httpClient);
176+
apiClient.setReadTimeout(0);
177177
}
178178
return new ListerWatcher<ApiType, ApiListType>() {
179179
@Override
@@ -185,6 +185,8 @@ public ApiListType list(CallGeneratorParams params) throws ApiException {
185185
@Override
186186
public Watch<ApiType> watch(CallGeneratorParams params) throws ApiException {
187187
Call call = callGenerator.generate(params);
188+
// bind call with private http client to make sure read timeout is zero.
189+
call = apiClient.getHttpClient().newCall(call.request());
188190
return Watch.createWatch(
189191
apiClient,
190192
call,
@@ -196,12 +198,11 @@ public Watch<ApiType> watch(CallGeneratorParams params) throws ApiException {
196198
private <ApiType extends KubernetesObject, ApiListType extends KubernetesListObject>
197199
ListerWatcher<ApiType, ApiListType> listerWatcherFor(
198200
GenericKubernetesApi<ApiType, ApiListType> genericKubernetesApi) {
199-
if (apiClient.getHttpClient().readTimeoutMillis() > 0) {
201+
if (apiClient.getReadTimeout() > 0) {
200202
// set read timeout zero to ensure client doesn't time out
201-
OkHttpClient httpClient =
202-
apiClient.getHttpClient().newBuilder().readTimeout(0, TimeUnit.MILLISECONDS).build();
203-
apiClient.setHttpClient(httpClient);
203+
apiClient.setReadTimeout(0);
204204
}
205+
// TODO: it seems read timeout is determined by genericKubernetesApi instead of above apiClient.
205206
return new ListerWatcher<ApiType, ApiListType>() {
206207
public ApiListType list(CallGeneratorParams params) throws ApiException {
207208
return genericKubernetesApi

util/src/main/java/io/kubernetes/client/informer/cache/DeltaFIFO.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.function.Consumer;
3030
import java.util.function.Function;
3131
import org.apache.commons.collections4.CollectionUtils;
32+
import org.apache.commons.lang3.StringUtils;
3233
import org.apache.commons.lang3.tuple.MutablePair;
3334
import org.slf4j.Logger;
3435
import org.slf4j.LoggerFactory;
@@ -443,7 +444,22 @@ private Deque<MutablePair<DeltaType, KubernetesObject>> combineDeltas(
443444
*/
444445
private MutablePair<DeltaType, KubernetesObject> isDuplicate(
445446
MutablePair<DeltaType, KubernetesObject> d1, MutablePair<DeltaType, KubernetesObject> d2) {
446-
return isDeletionDup(d1, d2);
447+
MutablePair<DeltaType, KubernetesObject> deletionDelta = isDeletionDup(d1, d2);
448+
449+
// TODO: remove this after the cause of memory leakage is confirmed
450+
// Squashing deltas w/ the same resource version, note that is a temporary fix that eases memory
451+
// intensity.
452+
if (deletionDelta != null) {
453+
return deletionDelta;
454+
}
455+
if (d1.getLeft() != DeltaType.Deleted
456+
&& d2.getLeft() != DeltaType.Deleted
457+
&& StringUtils.equals(
458+
d1.getRight().getMetadata().getResourceVersion(),
459+
d2.getRight().getMetadata().getResourceVersion())) {
460+
return d1;
461+
}
462+
return null;
447463
}
448464

449465
/**

util/src/test/java/io/kubernetes/client/informer/cache/DeltaFIFOTest.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.kubernetes.client.openapi.models.V1ObjectMeta;
2121
import io.kubernetes.client.openapi.models.V1Pod;
2222
import java.util.Arrays;
23+
import java.util.Collections;
2324
import java.util.Deque;
2425
import java.util.LinkedList;
2526
import org.apache.commons.lang3.tuple.MutablePair;
@@ -94,7 +95,9 @@ public void testDeltaFIFOBasic() throws InterruptedException {
9495

9596
@Test
9697
public void testDeltaFIFODedup() {
97-
V1Pod foo1 = new V1Pod().metadata(new V1ObjectMeta().name("foo1").namespace("default"));
98+
V1Pod foo1 =
99+
new V1Pod()
100+
.metadata(new V1ObjectMeta().name("foo1").namespace("default").resourceVersion("ver"));
98101
Cache cache = new Cache();
99102
DeltaFIFO deltaFIFO = new DeltaFIFO(Caches::deletionHandlingMetaNamespaceKeyFunc, cache);
100103
Deque<MutablePair<DeltaFIFO.DeltaType, KubernetesObject>> deltas;
@@ -121,6 +124,12 @@ public void testDeltaFIFODedup() {
121124
assertEquals(foo1, deltas.peekFirst().getRight());
122125
assertEquals(2, deltas.size());
123126
deltaFIFO.getItems().remove(Caches.deletionHandlingMetaNamespaceKeyFunc(foo1));
127+
128+
// add-sync dedupe
129+
deltaFIFO.add(foo1);
130+
deltaFIFO.replace(Collections.singletonList(foo1), foo1.getMetadata().getResourceVersion());
131+
deltas = deltaFIFO.getItems().get(Caches.deletionHandlingMetaNamespaceKeyFunc(foo1));
132+
assertEquals(1, deltas.size());
124133
}
125134

126135
@Test

0 commit comments

Comments
 (0)