Skip to content

Move error logging into debug if block #220

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 40 commits into
base: 6.0.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
5fd2075
Bump Confluent to 6.1.0-SNAPSHOT, Kafka to 6.1.0-SNAPSHOT
andrewegel Jun 8, 2020
0d4c52f
Merge branch 6.0.x into master (using strategy ours)
ConfluentJenkins Jun 17, 2020
a5ae5e8
Merge branch '6.0.x'
rayokota Jun 19, 2020
e8548d6
Merge branch '6.0.x'
ConfluentJenkins Jun 25, 2020
23ffc8b
MINOR: Upgrade jetty to 9.4.30.v20200611 and jersey to 2.31 (#184)
kkonstantine Jun 29, 2020
1851cf6
Merge branch '6.0.x'
ConfluentJenkins Jun 30, 2020
74dd8d8
Merge branch '6.0.x'
rnpridgeon Jun 30, 2020
fd25fb8
Merge branch '6.0.x'
rnpridgeon Jul 9, 2020
29e0001
Merge branch '6.0.x'
niteshmor Jul 9, 2020
40c80ba
Merge branch '6.0.x'
rnpridgeon Jul 10, 2020
3fba317
Merge branch '6.0.x'
maverick64 Jul 14, 2020
8bbac43
Fix build failure (#194)
xiaodongdu Jul 23, 2020
5ae5f0a
Merge branch '6.0.x'
maverick64 Jul 29, 2020
a48d388
Merge remote-tracking branch 'origin/6.0.x'
xvrl Aug 12, 2020
2cdba3a
Merge branch '6.0.x'
maverick64 Aug 17, 2020
4e9073f
Merge branch '6.0.x'
niteshmor Aug 17, 2020
2803b6a
CIAM-261: Add Jetty ThreadPool Metrics (#205)
omkreddy Sep 1, 2020
bc40222
Merge branch '6.0.x'
ConfluentJenkins Sep 17, 2020
9c1c02f
Merge branch '6.0.x'
ConfluentJenkins Sep 19, 2020
16921b1
Merge branch '6.0.x'
ConfluentJenkins Sep 24, 2020
81735bb
Merge branch '6.0.x'
andrewegel Sep 24, 2020
0c5b7d3
Merge branch '6.0.x'
ConfluentJenkins Sep 30, 2020
9420856
Merge branch '6.0.x'
andrewegel Sep 30, 2020
bf676fa
Merge branch '6.0.x'
ConfluentJenkins Oct 1, 2020
a4b16b0
Merge branch '6.0.x'
ConfluentJenkins Oct 2, 2020
b275c66
MINOR: Ignore flaky testJettyThreadPoolMetrics test
omkreddy Oct 7, 2020
9cb5cae
Merge branch '6.0.x'
ConfluentJenkins Oct 9, 2020
cd4aec9
MINOR: Fix compilation error from createKeyStore (#207)
mattwong949 Oct 13, 2020
57f6569
ST-3461: Nano versioning (#203)
elismaga Oct 17, 2020
3869dad
Merge branch '6.0.x'
ConfluentJenkins Oct 20, 2020
69dbf3b
Merge branch '6.0.x' into 6.1.x
ConfluentJenkins Dec 1, 2020
bbaee93
Merge branch '6.0.x' into 6.1.x
niteshmor Dec 2, 2020
9cb9613
Merge branch '6.0.x' into 6.1.x
niteshmor Dec 8, 2020
cc9e582
Merge branch '6.0.x' into 6.1.x
niteshmor Dec 11, 2020
826f690
Merge branch '6.0.x' into 6.1.x
ConfluentJenkins Dec 11, 2020
9462347
Merge branch '6.0.x' into 6.1.x
srpanwar-confluent Dec 15, 2020
d52c67b
Merge branch '6.0.x' into 6.1.x
ConfluentJenkins Dec 22, 2020
b32e988
Merge branch '6.0.x' into 6.1.x
ConfluentJenkins Dec 30, 2020
6c7c43b
Merge branch '6.0.x' into 6.1.x
srpanwar-confluent Jan 5, 2021
9411c4c
Moved error logging into the if block
AdiWehrli Feb 4, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion Jenkinsfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,9 @@

common {
slackChannel = '#c3-alerts'
upstreamProjects = 'confluentinc/common'
downStreamRepos = ["schema-registry", "metadata-service", "kafka-rest",
"confluent-security-plugins", "ce-kafka-http-server", "secret-registry",
"confluent-cloud-plugins"]
nanoVersion = true
}
//change
2 changes: 1 addition & 1 deletion core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>io.confluent</groupId>
<artifactId>rest-utils-parent</artifactId>
<version>6.0.2-SNAPSHOT</version>
<version>6.1.0-0</version>
</parent>

<artifactId>rest-utils</artifactId>
Expand Down
40 changes: 38 additions & 2 deletions core/src/main/java/io/confluent/rest/ApplicationServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package io.confluent.rest;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Metrics;

import org.apache.kafka.common.config.ConfigException;
Expand Down Expand Up @@ -46,6 +48,7 @@
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -166,6 +169,33 @@ private void attachMetricsListener(Metrics metrics, Map<String, String> tags) {
}
}

private void addJettyThreadPoolMetrics(Metrics metrics, Map<String, String> tags) {
//add metric for jetty thread pool queue size
String requestQueueSizeName = "request-queue-size";
String metricGroupName = "jetty-metrics";

MetricName requestQueueSizeMetricName = metrics.metricName(requestQueueSizeName,
metricGroupName, "The number of requests in the jetty thread pool queue.", tags);
Gauge<Integer> queueSize = (config, now) -> getQueueSize();
metrics.addMetric(requestQueueSizeMetricName, queueSize);

//add metric for thread pool busy thread count
String busyThreadCountName = "busy-thread-count";
MetricName busyThreadCountMetricName = metrics.metricName(busyThreadCountName,
metricGroupName, "jetty thread pool busy thread count.",
tags);
Gauge<Integer> busyThreadCount = (config, now) -> getBusyThreads();
metrics.addMetric(busyThreadCountMetricName, busyThreadCount);

//add metric for thread pool usage
String threadPoolUsageName = "thread-pool-usage";
final MetricName threadPoolUsageMetricName = metrics.metricName(threadPoolUsageName,
metricGroupName, " jetty thread pool usage.",
Collections.emptyMap());
Gauge<Double> threadPoolUsage = (config, now) -> (getBusyThreads() / (double) getMaxThreads());
metrics.addMetric(threadPoolUsageMetricName, threadPoolUsage);
}

private void finalizeHandlerCollection(HandlerCollection handlers, HandlerCollection wsHandlers) {
/* DefaultHandler must come last eo ensure all contexts
* have a chance to handle a request first */
Expand Down Expand Up @@ -193,6 +223,7 @@ protected final void doStart() throws Exception {
HandlerCollection wsHandlers = new HandlerCollection();
for (Application app : applications.getApplications()) {
attachMetricsListener(app.getMetrics(), app.getMetricsTags());
addJettyThreadPoolMetrics(app.getMetrics(), app.getMetricsTags());
handlers.addHandler(app.configureHandler());
wsHandlers.addHandler(app.configureWebSocketHandler());
}
Expand Down Expand Up @@ -400,6 +431,13 @@ public int getThreads() {
return getThreadPool().getThreads();
}

/**
* @return number of busy threads in the pool.
*/
public int getBusyThreads() {
return ((QueuedThreadPool)getThreadPool()).getBusyThreads();
}

/**
* For unit testing.
*
Expand All @@ -410,8 +448,6 @@ public int getMaxThreads() {
}

/**
* For unit testing.
*
* @return the size of the queue in the pool.
*/
public int getQueueSize() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ public DebuggableExceptionMapper(RestConfig restConfig) {
*/
public Response.ResponseBuilder createResponse(Throwable exc, int errorCode,
Response.Status status, String msg) {
log.error("Request Failed with exception " , exc);
String readableMessage = msg;
if (restConfig != null && restConfig.getBoolean(RestConfig.DEBUG_CONFIG)) {
log.error("Request Failed with exception " , exc);
readableMessage += " " + exc.getClass().getName() + ": " + exc.getMessage();
try {
ByteArrayOutputStream os = new ByteArrayOutputStream();
Expand Down
4 changes: 2 additions & 2 deletions core/src/test/java/io/confluent/rest/ApiHeadersTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,9 @@ private static void createKeystoreWithCert(File file, String alias, Map<String,
final KeyPair keypair = TestSslUtils.generateKeyPair("RSA");

final X509Certificate cert = new CertificateBuilder(30, "SHA1withRSA")
.sanDnsName("localhost").generate("CN=mymachine.local, O=A client", keypair);
.sanDnsNames("localhost").generate("CN=mymachine.local, O=A client", keypair);

TestSslUtils.createKeyStore(file.getPath(), new Password(SSL_PASSWORD), alias,
TestSslUtils.createKeyStore(file.getPath(), new Password(SSL_PASSWORD), new Password(SSL_PASSWORD), alias,
keypair.getPrivate(), cert);
certs.put(alias, cert);
}
Expand Down
8 changes: 4 additions & 4 deletions core/src/test/java/io/confluent/rest/SslTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,9 @@ public void setUp() throws Exception {
private void createKeystoreWithCert(File file, String alias, Map<String, X509Certificate> certs) throws Exception {
KeyPair keypair = TestSslUtils.generateKeyPair("RSA");
CertificateBuilder certificateBuilder = new CertificateBuilder(30, "SHA1withRSA");
X509Certificate cCert = certificateBuilder.sanDnsName("localhost")
X509Certificate cCert = certificateBuilder.sanDnsNames("localhost")
.generate("CN=mymachine.local, O=A client", keypair);
TestSslUtils.createKeyStore(file.getPath(), new Password(SSL_PASSWORD), alias, keypair.getPrivate(), cCert);
TestSslUtils.createKeyStore(file.getPath(), new Password(SSL_PASSWORD), new Password(SSL_PASSWORD),alias, keypair.getPrivate(), cCert);
certs.put(alias, cCert);
}

Expand All @@ -123,9 +123,9 @@ private void enableSslClientAuth(Properties props) {
private void createWrongKeystoreWithCert(File file, String alias, Map<String, X509Certificate> certs) throws Exception {
KeyPair keypair = TestSslUtils.generateKeyPair("RSA");
CertificateBuilder certificateBuilder = new CertificateBuilder(30, "SHA1withRSA");
X509Certificate cCert = certificateBuilder.sanDnsName("fail")
X509Certificate cCert = certificateBuilder.sanDnsNames("fail")
.generate("CN=mymachine.local, O=A client", keypair);
TestSslUtils.createKeyStore(file.getPath(), new Password(SSL_PASSWORD), alias, keypair.getPrivate(), cCert);
TestSslUtils.createKeyStore(file.getPath(), new Password(SSL_PASSWORD), new Password(SSL_PASSWORD), alias, keypair.getPrivate(), cCert);
certs.put(alias, cCert);
}

Expand Down
112 changes: 91 additions & 21 deletions core/src/test/java/io/confluent/rest/TestCustomizeThreadPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,17 @@
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.test.TestUtils;
import org.junit.Ignore;
import org.junit.Test;

import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
Expand Down Expand Up @@ -119,12 +128,79 @@ public void testQueueFull() throws Exception {
}
}

@Test
@Ignore
public void testJettyThreadPoolMetrics() throws Exception {
RestResource.latch = new CountDownLatch(1);
TestCustomizeThreadPoolApplication app = new TestCustomizeThreadPoolApplication();
String uri = app.getUri();
try {
app.start();
assertEquals(0, getIntMetricValue(app.metrics, "request-queue-size"));

//send 18 requests: queueSize (8) + threads (10)
int numThread = 18;
Thread[] threads = sendRequests(uri + "/custom/resource", numThread);
TestUtils.waitForCondition(() -> app.server.getQueueSize() == 8, "Queue is not full");
assertEquals(8, getIntMetricValue(app.metrics, "request-queue-size"));
assertEquals(10, getIntMetricValue(app.metrics, "busy-thread-count"));
assertEquals(1.0, getDoubleMetricValue(app.metrics, "thread-pool-usage"), 0.0);

RestResource.latch.countDown();
for(int i = 0; i < numThread; i++) {
threads[i].join();
}

TestUtils.waitForCondition(() -> app.server.getQueueSize() == 0, "Queue is not empty");
assertEquals(0, getIntMetricValue(app.metrics, "request-queue-size"));
assertTrue(getDoubleMetricValue(app.metrics, "thread-pool-usage") > 0);
assertTrue(getDoubleMetricValue(app.metrics, "thread-pool-usage") < 1);
} finally {
RestResource.latch = null;
app.stop();
}
}

public static int getIntMetricValue(Metrics metrics, String attribute) {
Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
Optional<KafkaMetric> metric = allMetrics.entrySet().stream().filter((m) -> {
return m.getKey().name().equals(attribute);
}).map(Map.Entry::getValue).findFirst();
return metric.isPresent() ? (Integer) metric.get().metricValue() : -1;
}

public static double getDoubleMetricValue(Metrics metrics, String attribute) {
Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
Optional<KafkaMetric> metric = allMetrics.entrySet().stream().filter((m) -> {
return m.getKey().name().equals(attribute);
}).map(Map.Entry::getValue).findFirst();
return metric.isPresent() ? (Double) metric.get().metricValue() : -1;
}

/**
* Simulate multiple HTTP clients sending HTTP requests same time. Each client will send one HTTP request.
* The requests will be put in queue if the number of clients are more than the working threads.
* */
@SuppressWarnings("SameParameterValue")
private void makeConcurrentGetRequests(String uri, int numThread, TestCustomizeThreadPoolApplication app) throws Exception {
Thread[] threads = sendRequests(uri, numThread);

long startingTime = System.currentTimeMillis();
while(System.currentTimeMillis() - startingTime < 360*1000) {
log.info("Queue size {}, queue capacity {} ", app.getServer().getQueueSize(), app.getServer().getQueueCapacity());
assertTrue("Number of jobs in queue is not more than capacity of queue ", app.getServer().getQueueSize() <= app.getServer().getQueueCapacity());
Thread.sleep(2000);
if (app.getServer().getQueueSize() == 0)
break;
}

for(int i = 0; i < numThread; i++) {
threads[i].join();
}
log.info("End queue size {}, queue capacity {} ", app.getServer().getQueueSize(), app.getServer().getQueueCapacity());
}

private Thread[] sendRequests(final String uri, final int numThread) {
Thread[] threads = new Thread[numThread];
for(int i = 0; i < numThread; i++) {
threads[i] = new Thread() {
Expand All @@ -134,7 +210,7 @@ public void run() {
CloseableHttpResponse response = null;
try {
response = httpclient.execute(httpget);
HttpStatus.Code statusCode = HttpStatus.getCode(response.getStatusLine().getStatusCode());
Code statusCode = HttpStatus.getCode(response.getStatusLine().getStatusCode());
log.info("Status code {}, reason {} ", statusCode, response.getStatusLine().getReasonPhrase());
assertThat(statusCode, is(Code.OK));
} catch (Exception e) {
Expand All @@ -152,34 +228,28 @@ public void run() {

threads[i].start();
}

long startingTime = System.currentTimeMillis();
while(System.currentTimeMillis() - startingTime < 360*1000) {
log.info("Queue size {}, queue capacity {} ", app.getServer().getQueueSize(), app.getServer().getQueueCapacity());
assertTrue("Number of jobs in queue is not more than capacity of queue ", app.getServer().getQueueSize() <= app.getServer().getQueueCapacity());
Thread.sleep(2000);
if (app.getServer().getQueueSize() == 0)
break;
}

for(int i = 0; i < numThread; i++) {
threads[i].join();
}
log.info("End queue size {}, queue capacity {} ", app.getServer().getQueueSize(), app.getServer().getQueueCapacity());
return threads;
}

@Path("/custom")
@Produces(MediaType.TEXT_PLAIN)
public static class RestResource {

static CountDownLatch latch = null;

@GET
@Path("/resource")
public String get() {
synchronized(locker) {
try {
locker.wait(10000);
} catch (Exception e) {
log.info(e.getMessage());
public String get() throws InterruptedException {
if (latch == null) {
synchronized(locker) {
try {
locker.wait(10000);
} catch (Exception e) {
log.info(e.getMessage());
}
}
} else {
latch.await();
}
return "ThreadPool";
}
Expand Down
4 changes: 3 additions & 1 deletion examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<artifactId>rest-utils-parent</artifactId>
<groupId>io.confluent</groupId>
<version>6.0.2-SNAPSHOT</version>
<version>6.1.0-0</version>
</parent>

<artifactId>rest-utils-examples</artifactId>
Expand All @@ -19,11 +19,13 @@
<dependency>
<groupId>io.confluent</groupId>
<artifactId>rest-utils</artifactId>
<version>${io.confluent.rest-utils.version}</version>
</dependency>

<dependency>
<groupId>io.confluent</groupId>
<artifactId>rest-utils-test</artifactId>
<version>${io.confluent.rest-utils.version}</version>
<scope>test</scope>
</dependency>

Expand Down
3 changes: 2 additions & 1 deletion package/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>io.confluent</groupId>
<artifactId>rest-utils-parent</artifactId>
<version>6.0.2-SNAPSHOT</version>
<version>6.1.0-0</version>
</parent>

<artifactId>rest-utils-package</artifactId>
Expand All @@ -19,6 +19,7 @@
<dependency>
<groupId>io.confluent</groupId>
<artifactId>rest-utils</artifactId>
<version>${io.confluent.rest-utils.version}</version>
</dependency>
</dependencies>

Expand Down
8 changes: 5 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@
<parent>
<groupId>io.confluent</groupId>
<artifactId>common</artifactId>
<version>6.0.2-SNAPSHOT</version>
<version>[6.1.0-0, 6.1.1-0)</version>
</parent>

<artifactId>rest-utils-parent</artifactId>
<packaging>pom</packaging>
<name>rest-utils-parent</name>
<version>6.1.0-0</version>
<organization>
<name>Confluent, Inc.</name>
<url>http://confluent.io</url>
Expand Down Expand Up @@ -54,6 +55,7 @@
<jetty.version>9.4.33.v20201020</jetty.version>
<asynchttpclient.version>2.2.0</asynchttpclient.version>
<checkstyle.suppressions.location>checkstyle/suppressions.xml</checkstyle.suppressions.location>
<io.confluent.rest-utils.version>6.1.0-0</io.confluent.rest-utils.version>
</properties>

<repositories>
Expand All @@ -70,12 +72,12 @@
<dependency>
<groupId>io.confluent</groupId>
<artifactId>rest-utils</artifactId>
<version>${project.version}</version>
<version>${io.confluent.rest-utils.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>rest-utils-test</artifactId>
<version>${project.version}</version>
<version>${io.confluent.rest-utils.version}</version>
<scope>test</scope>
</dependency>

Expand Down
Loading