From 011b45b16baf33d6d406989211b92696e5b777d2 Mon Sep 17 00:00:00 2001 From: kondapally1989 Date: Sat, 17 Jul 2021 14:23:34 +0530 Subject: [PATCH 1/4] remove some of thread.sleeps in test cases --- .../leaderelection/LeaderElectionTest.java | 10 +++++++--- .../java/io/kubernetes/client/ExecTest.java | 17 +++++++++++------ 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/extended/src/test/java/io/kubernetes/client/extended/leaderelection/LeaderElectionTest.java b/extended/src/test/java/io/kubernetes/client/extended/leaderelection/LeaderElectionTest.java index 7c42684163..f188bcb5ac 100644 --- a/extended/src/test/java/io/kubernetes/client/extended/leaderelection/LeaderElectionTest.java +++ b/extended/src/test/java/io/kubernetes/client/extended/leaderelection/LeaderElectionTest.java @@ -285,16 +285,20 @@ public void testLeaderElectionCaptureException() throws ApiException, Interrupte leaderElectionConfig.setLeaseDuration(Duration.ofMillis(1000)); leaderElectionConfig.setRetryPeriod(Duration.ofMillis(200)); leaderElectionConfig.setRenewDeadline(Duration.ofMillis(700)); + final Semaphore sem = new Semaphore(1); LeaderElector leaderElector = - new LeaderElector(leaderElectionConfig, (t) -> actualException.set(t)); + new LeaderElector(leaderElectionConfig, (t) -> { actualException.set(t); sem.release(); }); ExecutorService leaderElectionWorker = Executors.newFixedThreadPool(1); + sem.acquire(); leaderElectionWorker.submit( () -> { leaderElector.run(() -> {}, () -> {}); }); - // TODO: Remove this sleep - Thread.sleep(Duration.ofSeconds(2).toMillis()); + while(!sem.tryAcquire()) { + System.out.println("waiting for leaderElectionWorker to throw exception in LeaderElectionTest::testLeaderElectionCaptureException"); + } + assertEquals(expectedException, actualException.get().getCause()); } diff --git a/util/src/test/java/io/kubernetes/client/ExecTest.java b/util/src/test/java/io/kubernetes/client/ExecTest.java index 1fcbabda72..dacd124d43 100644 --- a/util/src/test/java/io/kubernetes/client/ExecTest.java +++ b/util/src/test/java/io/kubernetes/client/ExecTest.java @@ -34,6 +34,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.nio.charset.StandardCharsets; +import java.util.concurrent.Semaphore; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -82,7 +83,7 @@ public static InputStream makeStream(byte[] prefix, byte[] data) { return new ByteArrayInputStream(out); } - public static Thread asyncCopy(final InputStream is, final OutputStream os) { + public static Thread asyncCopy(final InputStream is, final OutputStream os, Semaphore sem) { Thread t = new Thread( new Runnable() { @@ -91,6 +92,8 @@ public void run() { Streams.copy(is, os); } catch (IOException ex) { ex.printStackTrace(); + }finally { + sem.release(); } } }); @@ -110,14 +113,16 @@ public void testExecProcess() throws IOException, InterruptedException { final ByteArrayOutputStream stdout = new ByteArrayOutputStream(); final ByteArrayOutputStream stderr = new ByteArrayOutputStream(); - - Thread t1 = asyncCopy(process.getInputStream(), stdout); - Thread t2 = asyncCopy(process.getErrorStream(), stderr); + final Semaphore sem = new Semaphore(2); + sem.acquire(2); + Thread t1 = asyncCopy(process.getInputStream(), stdout, sem); + Thread t2 = asyncCopy(process.getErrorStream(), stderr, sem); process.getHandler().bytesMessage(makeStream(3, OUTPUT_EXIT0.getBytes(StandardCharsets.UTF_8))); - // TODO: Fix this asap! - Thread.sleep(1000); + while(!sem.tryAcquire(2)) { + System.out.println("waiting for async Copy task to be completed in ExecTest::testExecProcess"); + } process.destroy(); assertEquals(msgData, stdout.toString()); From 6519c77afc2c7bf0dec6b5e33a61cb3c6bdd3b77 Mon Sep 17 00:00:00 2001 From: kondapally1989 Date: Sat, 17 Jul 2021 14:43:30 +0530 Subject: [PATCH 2/4] fix formatting issue --- .../leaderelection/LeaderElectionTest.java | 14 ++++++++++---- .../test/java/io/kubernetes/client/ExecTest.java | 7 ++++--- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/extended/src/test/java/io/kubernetes/client/extended/leaderelection/LeaderElectionTest.java b/extended/src/test/java/io/kubernetes/client/extended/leaderelection/LeaderElectionTest.java index f188bcb5ac..90a1b80d2d 100644 --- a/extended/src/test/java/io/kubernetes/client/extended/leaderelection/LeaderElectionTest.java +++ b/extended/src/test/java/io/kubernetes/client/extended/leaderelection/LeaderElectionTest.java @@ -287,7 +287,12 @@ public void testLeaderElectionCaptureException() throws ApiException, Interrupte leaderElectionConfig.setRenewDeadline(Duration.ofMillis(700)); final Semaphore sem = new Semaphore(1); LeaderElector leaderElector = - new LeaderElector(leaderElectionConfig, (t) -> { actualException.set(t); sem.release(); }); + new LeaderElector( + leaderElectionConfig, + (t) -> { + actualException.set(t); + sem.release(); + }); ExecutorService leaderElectionWorker = Executors.newFixedThreadPool(1); sem.acquire(); @@ -295,10 +300,11 @@ public void testLeaderElectionCaptureException() throws ApiException, Interrupte () -> { leaderElector.run(() -> {}, () -> {}); }); - while(!sem.tryAcquire()) { - System.out.println("waiting for leaderElectionWorker to throw exception in LeaderElectionTest::testLeaderElectionCaptureException"); + while (!sem.tryAcquire()) { + System.out.println( + "waiting for leaderElectionWorker to throw exception in LeaderElectionTest::testLeaderElectionCaptureException"); } - + assertEquals(expectedException, actualException.get().getCause()); } diff --git a/util/src/test/java/io/kubernetes/client/ExecTest.java b/util/src/test/java/io/kubernetes/client/ExecTest.java index dacd124d43..f136e9ac77 100644 --- a/util/src/test/java/io/kubernetes/client/ExecTest.java +++ b/util/src/test/java/io/kubernetes/client/ExecTest.java @@ -92,7 +92,7 @@ public void run() { Streams.copy(is, os); } catch (IOException ex) { ex.printStackTrace(); - }finally { + } finally { sem.release(); } } @@ -120,8 +120,9 @@ public void testExecProcess() throws IOException, InterruptedException { process.getHandler().bytesMessage(makeStream(3, OUTPUT_EXIT0.getBytes(StandardCharsets.UTF_8))); - while(!sem.tryAcquire(2)) { - System.out.println("waiting for async Copy task to be completed in ExecTest::testExecProcess"); + while (!sem.tryAcquire(2)) { + System.out.println( + "waiting for async Copy task to be completed in ExecTest::testExecProcess"); } process.destroy(); From 6c07bfa070f9f31f86cee7bcf9a20d3fb8060fe7 Mon Sep 17 00:00:00 2001 From: kondapally1989 Date: Mon, 19 Jul 2021 20:13:14 +0530 Subject: [PATCH 3/4] moved semaphores to countdownlatch --- .../leaderelection/LeaderElectionTest.java | 29 +++++++------------ .../workqueue/DefaultDelayingQueueTest.java | 3 +- .../java/io/kubernetes/client/ExecTest.java | 19 +++++------- 3 files changed, 20 insertions(+), 31 deletions(-) diff --git a/extended/src/test/java/io/kubernetes/client/extended/leaderelection/LeaderElectionTest.java b/extended/src/test/java/io/kubernetes/client/extended/leaderelection/LeaderElectionTest.java index 90a1b80d2d..ca5c302eba 100644 --- a/extended/src/test/java/io/kubernetes/client/extended/leaderelection/LeaderElectionTest.java +++ b/extended/src/test/java/io/kubernetes/client/extended/leaderelection/LeaderElectionTest.java @@ -13,8 +13,8 @@ package io.kubernetes.client.extended.leaderelection; import static java.util.concurrent.TimeUnit.SECONDS; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.when; import io.kubernetes.client.openapi.ApiException; import java.net.HttpURLConnection; @@ -25,7 +25,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; @@ -285,25 +284,21 @@ public void testLeaderElectionCaptureException() throws ApiException, Interrupte leaderElectionConfig.setLeaseDuration(Duration.ofMillis(1000)); leaderElectionConfig.setRetryPeriod(Duration.ofMillis(200)); leaderElectionConfig.setRenewDeadline(Duration.ofMillis(700)); - final Semaphore sem = new Semaphore(1); + final CountDownLatch cLatch = new CountDownLatch(1); LeaderElector leaderElector = new LeaderElector( leaderElectionConfig, (t) -> { actualException.set(t); - sem.release(); + cLatch.countDown(); }); ExecutorService leaderElectionWorker = Executors.newFixedThreadPool(1); - sem.acquire(); leaderElectionWorker.submit( () -> { leaderElector.run(() -> {}, () -> {}); }); - while (!sem.tryAcquire()) { - System.out.println( - "waiting for leaderElectionWorker to throw exception in LeaderElectionTest::testLeaderElectionCaptureException"); - } + cLatch.await(); assertEquals(expectedException, actualException.get().getCause()); } @@ -341,8 +336,7 @@ public void testLeaderElectionReportLeaderOnStart() throws ApiException, Interru leaderElectionConfig.setRenewDeadline(Duration.ofMillis(700)); LeaderElector leaderElector = new LeaderElector(leaderElectionConfig); ExecutorService leaderElectionWorker = Executors.newFixedThreadPool(1); - final Semaphore s = new Semaphore(2); - s.acquire(2); + final CountDownLatch cLatch = new CountDownLatch(2); leaderElectionWorker.submit( () -> { leaderElector.run( @@ -350,12 +344,12 @@ public void testLeaderElectionReportLeaderOnStart() throws ApiException, Interru () -> {}, (id) -> { notifications.add(id); - s.release(); + cLatch.countDown(); }); }); // wait for two notifications to occur. - s.acquire(2); + cLatch.await(); assertEquals(2, notifications.size()); assertEquals("foo2", notifications.get(0)); @@ -385,8 +379,7 @@ public void testLeaderElectionShouldReportLeaderItAcquiresOnStart() leaderElectionConfig.setRenewDeadline(Duration.ofMillis(700)); LeaderElector leaderElector = new LeaderElector(leaderElectionConfig); ExecutorService leaderElectionWorker = Executors.newFixedThreadPool(1); - Semaphore s = new Semaphore(1); - s.acquire(); + final CountDownLatch cLatch = new CountDownLatch(1); leaderElectionWorker.submit( () -> { leaderElector.run( @@ -394,11 +387,11 @@ public void testLeaderElectionShouldReportLeaderItAcquiresOnStart() () -> {}, (id) -> { notifications.add(id); - s.release(); + cLatch.countDown(); }); }); - s.acquire(); + cLatch.await(); assertEquals(1, notifications.size()); assertEquals("foo1", notifications.get(0)); } diff --git a/extended/src/test/java/io/kubernetes/client/extended/workqueue/DefaultDelayingQueueTest.java b/extended/src/test/java/io/kubernetes/client/extended/workqueue/DefaultDelayingQueueTest.java index 58e030d64e..48c56b7ce5 100644 --- a/extended/src/test/java/io/kubernetes/client/extended/workqueue/DefaultDelayingQueueTest.java +++ b/extended/src/test/java/io/kubernetes/client/extended/workqueue/DefaultDelayingQueueTest.java @@ -103,7 +103,6 @@ private boolean waitForAdded(DefaultDelayingQueue queue, int size) { } private boolean waitForWaitingQueueToFill(DefaultDelayingQueue queue) { - return Wait.poll( - Duration.ofMillis(10), Duration.ofSeconds(10), () -> queue.waitingForAddQueue.size() == 0); + return waitForAdded(queue, 0); } } diff --git a/util/src/test/java/io/kubernetes/client/ExecTest.java b/util/src/test/java/io/kubernetes/client/ExecTest.java index f136e9ac77..8075e9de51 100644 --- a/util/src/test/java/io/kubernetes/client/ExecTest.java +++ b/util/src/test/java/io/kubernetes/client/ExecTest.java @@ -34,7 +34,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.nio.charset.StandardCharsets; -import java.util.concurrent.Semaphore; +import java.util.concurrent.CountDownLatch; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -83,7 +83,8 @@ public static InputStream makeStream(byte[] prefix, byte[] data) { return new ByteArrayInputStream(out); } - public static Thread asyncCopy(final InputStream is, final OutputStream os, Semaphore sem) { + public static Thread asyncCopy( + final InputStream is, final OutputStream os, CountDownLatch cLatch) { Thread t = new Thread( new Runnable() { @@ -93,7 +94,7 @@ public void run() { } catch (IOException ex) { ex.printStackTrace(); } finally { - sem.release(); + cLatch.countDown(); } } }); @@ -113,17 +114,13 @@ public void testExecProcess() throws IOException, InterruptedException { final ByteArrayOutputStream stdout = new ByteArrayOutputStream(); final ByteArrayOutputStream stderr = new ByteArrayOutputStream(); - final Semaphore sem = new Semaphore(2); - sem.acquire(2); - Thread t1 = asyncCopy(process.getInputStream(), stdout, sem); - Thread t2 = asyncCopy(process.getErrorStream(), stderr, sem); + CountDownLatch cLatch = new CountDownLatch(2); + Thread t1 = asyncCopy(process.getInputStream(), stdout, cLatch); + Thread t2 = asyncCopy(process.getErrorStream(), stderr, cLatch); process.getHandler().bytesMessage(makeStream(3, OUTPUT_EXIT0.getBytes(StandardCharsets.UTF_8))); - while (!sem.tryAcquire(2)) { - System.out.println( - "waiting for async Copy task to be completed in ExecTest::testExecProcess"); - } + cLatch.await(); process.destroy(); assertEquals(msgData, stdout.toString()); From b8d88046a8d9cd1af6de21fd47602aae8dcf9e7f Mon Sep 17 00:00:00 2001 From: kondapally1989 Date: Tue, 20 Jul 2021 21:51:32 +0530 Subject: [PATCH 4/4] undo the wrong change --- .../client/extended/workqueue/DefaultDelayingQueueTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/extended/src/test/java/io/kubernetes/client/extended/workqueue/DefaultDelayingQueueTest.java b/extended/src/test/java/io/kubernetes/client/extended/workqueue/DefaultDelayingQueueTest.java index 48c56b7ce5..58e030d64e 100644 --- a/extended/src/test/java/io/kubernetes/client/extended/workqueue/DefaultDelayingQueueTest.java +++ b/extended/src/test/java/io/kubernetes/client/extended/workqueue/DefaultDelayingQueueTest.java @@ -103,6 +103,7 @@ private boolean waitForAdded(DefaultDelayingQueue queue, int size) { } private boolean waitForWaitingQueueToFill(DefaultDelayingQueue queue) { - return waitForAdded(queue, 0); + return Wait.poll( + Duration.ofMillis(10), Duration.ofSeconds(10), () -> queue.waitingForAddQueue.size() == 0); } }