Skip to content

Commit 93e04b7

Browse files
authored
Merge pull request #1368 from yue9944882/reflector-relist-with-rv
Reflector relists with last synced resource version
2 parents db12bdc + c8a47c9 commit 93e04b7

File tree

4 files changed

+105
-25
lines changed

4 files changed

+105
-25
lines changed

pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,12 @@
300300
<version>2.0.0.0</version>
301301
<scope>test</scope>
302302
</dependency>
303+
<dependency>
304+
<groupId>org.awaitility</groupId>
305+
<artifactId>awaitility</artifactId>
306+
<version>4.0.3</version>
307+
<scope>test</scope>
308+
</dependency>
303309

304310
</dependencies>
305311
</dependencyManagement>

util/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,12 @@
105105
<artifactId>wiremock</artifactId>
106106
<scope>test</scope>
107107
</dependency>
108+
<dependency>
109+
<groupId>org.awaitility</groupId>
110+
<artifactId>awaitility</artifactId>
111+
<scope>test</scope>
112+
</dependency>
113+
108114
</dependencies>
109115
<build>
110116
<plugins>

util/src/main/java/io/kubernetes/client/informer/cache/ReflectorRunnable.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
*/
1313
package io.kubernetes.client.informer.cache;
1414

15+
import com.google.common.base.Strings;
1516
import io.kubernetes.client.common.KubernetesListObject;
1617
import io.kubernetes.client.common.KubernetesObject;
1718
import io.kubernetes.client.informer.EventType;
@@ -72,7 +73,9 @@ public void run() {
7273
log.info("{}#Start listing and watching...", apiTypeClass);
7374

7475
try {
75-
ApiListType list = listerWatcher.list(new CallGeneratorParams(Boolean.FALSE, null, null));
76+
ApiListType list =
77+
listerWatcher.list(
78+
new CallGeneratorParams(Boolean.FALSE, getRelistResourceVersion(), null));
7679

7780
V1ListMeta listMeta = list.getMetadata();
7881
String resourceVersion = listMeta.getResourceVersion();
@@ -172,6 +175,13 @@ public String getLastSyncResourceVersion() {
172175
return lastSyncResourceVersion;
173176
}
174177

178+
private String getRelistResourceVersion() {
179+
if (Strings.isNullOrEmpty(lastSyncResourceVersion)) {
180+
return "0";
181+
}
182+
return lastSyncResourceVersion;
183+
}
184+
175185
private void watchHandler(Watchable<ApiType> watch) {
176186
while (watch.hasNext()) {
177187
io.kubernetes.client.util.Watch.Response<ApiType> item = watch.next();

util/src/test/java/io/kubernetes/client/informer/cache/ReflectorRunnableTest.java

Lines changed: 82 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,11 @@
1212
*/
1313
package io.kubernetes.client.informer.cache;
1414

15-
import static org.junit.Assert.*;
16-
import static org.mockito.Mockito.*;
15+
import static org.mockito.Mockito.any;
16+
import static org.mockito.Mockito.never;
17+
import static org.mockito.Mockito.times;
18+
import static org.mockito.Mockito.verify;
19+
import static org.mockito.Mockito.when;
1720

1821
import io.kubernetes.client.informer.EventType;
1922
import io.kubernetes.client.informer.ListerWatcher;
@@ -25,7 +28,10 @@
2528
import io.kubernetes.client.util.CallGeneratorParams;
2629
import io.kubernetes.client.util.Watch;
2730
import io.kubernetes.client.util.Watchable;
31+
import java.time.Duration;
2832
import java.util.concurrent.atomic.AtomicReference;
33+
import org.awaitility.Awaitility;
34+
import org.hamcrest.core.IsEqual;
2935
import org.junit.Rule;
3036
import org.junit.Test;
3137
import org.junit.contrib.java.lang.system.EnvironmentVariables;
@@ -43,15 +49,15 @@ public class ReflectorRunnableTest {
4349
@Mock private ListerWatcher<V1Pod, V1PodList> listerWatcher;
4450

4551
@Test
46-
public void testReflectorRunOnce() throws InterruptedException, ApiException {
52+
public void testReflectorRunOnce() throws ApiException {
4753
String mockResourceVersion = "1000";
4854
when(listerWatcher.list(any()))
4955
.thenReturn(
5056
new V1PodList().metadata(new V1ListMeta().resourceVersion(mockResourceVersion)));
5157
when(listerWatcher.watch(any()))
5258
.then(
5359
(v) -> {
54-
Thread.sleep(999999L); // block forever
60+
Awaitility.await().forever(); // block forever
5561
return null;
5662
});
5763
ReflectorRunnable<V1Pod, V1PodList> reflectorRunnable =
@@ -61,17 +67,17 @@ public void testReflectorRunOnce() throws InterruptedException, ApiException {
6167
Thread thread = new Thread(reflectorRunnable::run);
6268
thread.setDaemon(true);
6369
thread.start();
64-
65-
// sleep 1s for starting list-watch
66-
Thread.sleep(1000);
67-
68-
verify(deltaFIFO, times(1)).replace(any(), any());
69-
verify(deltaFIFO, never()).add(any());
70-
verify(listerWatcher, times(1)).list(any());
71-
verify(listerWatcher, times(1)).watch(any());
70+
Awaitility.await()
71+
.atMost(Duration.ofSeconds(1))
72+
.pollInterval(Duration.ofMillis(100))
73+
.until(() -> mockResourceVersion.equals(reflectorRunnable.getLastSyncResourceVersion()));
7274
} finally {
7375
reflectorRunnable.stop();
7476
}
77+
verify(deltaFIFO, times(1)).replace(any(), any());
78+
verify(deltaFIFO, never()).add(any());
79+
verify(listerWatcher, times(1)).list(any());
80+
verify(listerWatcher, times(1)).watch(any());
7581
}
7682

7783
@Test
@@ -98,18 +104,17 @@ public Watchable<V1Pod> watch(CallGeneratorParams params) throws ApiException {
98104
Thread thread = new Thread(reflectorRunnable::run);
99105
thread.setDaemon(true);
100106
thread.start();
101-
102-
Thread.sleep(1000);
103-
104-
assertTrue(((MockWatch<V1Pod>) watch).isClosed());
107+
Awaitility.await()
108+
.atMost(Duration.ofSeconds(1))
109+
.pollInterval(Duration.ofMillis(100))
110+
.until(() -> ((MockWatch<V1Pod>) watch).isClosed());
105111
} finally {
106112
reflectorRunnable.stop();
107113
}
108114
}
109115

110116
@Test
111-
public void testReflectorRunnableCaptureListException()
112-
throws ApiException, InterruptedException {
117+
public void testReflectorRunnableCaptureListException() throws ApiException {
113118
RuntimeException expectedException = new RuntimeException("noxu");
114119
AtomicReference<Throwable> actualException = new AtomicReference<>();
115120
when(listerWatcher.list(any())).thenThrow(expectedException);
@@ -125,16 +130,17 @@ public void testReflectorRunnableCaptureListException()
125130
Thread thread = new Thread(reflectorRunnable::run);
126131
thread.setDaemon(true);
127132
thread.start();
128-
Thread.sleep(1000);
133+
Awaitility.await()
134+
.atMost(Duration.ofSeconds(1))
135+
.pollInterval(Duration.ofMillis(100))
136+
.untilAtomic(actualException, new IsEqual<>(expectedException));
129137
} finally {
130138
reflectorRunnable.stop();
131139
}
132-
assertEquals(expectedException, actualException.get());
133140
}
134141

135142
@Test
136-
public void testReflectorRunnableCaptureWatchException()
137-
throws ApiException, InterruptedException {
143+
public void testReflectorRunnableCaptureWatchException() throws ApiException {
138144
RuntimeException expectedException = new RuntimeException("noxu");
139145
AtomicReference<Throwable> actualException = new AtomicReference<>();
140146
when(listerWatcher.list(any())).thenReturn(new V1PodList().metadata(new V1ListMeta()));
@@ -151,10 +157,62 @@ public void testReflectorRunnableCaptureWatchException()
151157
Thread thread = new Thread(reflectorRunnable::run);
152158
thread.setDaemon(true);
153159
thread.start();
154-
Thread.sleep(1000);
160+
Awaitility.await()
161+
.atMost(Duration.ofSeconds(1))
162+
.pollInterval(Duration.ofMillis(100))
163+
.untilAtomic(actualException, new IsEqual<>(expectedException));
164+
} finally {
165+
reflectorRunnable.stop();
166+
}
167+
}
168+
169+
@Test
170+
public void testReflectorRelistShouldHonorLastSyncResourceVersion() {
171+
String expectedResourceVersion = "999";
172+
AtomicReference<String> requestedResourceVersion = new AtomicReference<>();
173+
ReflectorRunnable<V1Pod, V1PodList> reflectorRunnable =
174+
new ReflectorRunnable<>(
175+
V1Pod.class,
176+
new ListerWatcher<V1Pod, V1PodList>() {
177+
@Override
178+
public V1PodList list(CallGeneratorParams params) throws ApiException {
179+
requestedResourceVersion.set(params.resourceVersion);
180+
return new V1PodList()
181+
.metadata(new V1ListMeta().resourceVersion(expectedResourceVersion));
182+
}
183+
184+
@Override
185+
public Watchable<V1Pod> watch(CallGeneratorParams params) throws ApiException {
186+
throw new ApiException("HTTP GONE");
187+
}
188+
},
189+
deltaFIFO);
190+
191+
// run the reflector twice, and check the requesting resource version at the second time.
192+
// first run
193+
try {
194+
Thread thread = new Thread(reflectorRunnable::run);
195+
thread.setDaemon(true);
196+
thread.start();
197+
Awaitility.await()
198+
.atMost(Duration.ofSeconds(1))
199+
.pollInterval(Duration.ofMillis(100))
200+
.until(
201+
() -> expectedResourceVersion.equals(reflectorRunnable.getLastSyncResourceVersion()));
202+
} finally {
203+
reflectorRunnable.stop();
204+
}
205+
// second run
206+
try {
207+
Thread thread = new Thread(reflectorRunnable::run);
208+
thread.setDaemon(true);
209+
thread.start();
210+
Awaitility.await()
211+
.atMost(Duration.ofSeconds(1))
212+
.pollInterval(Duration.ofMillis(100))
213+
.untilAtomic(requestedResourceVersion, new IsEqual<>(expectedResourceVersion));
155214
} finally {
156215
reflectorRunnable.stop();
157216
}
158-
assertEquals(expectedException, actualException.get());
159217
}
160218
}

0 commit comments

Comments
 (0)