diff --git a/src/main/java/org/sourcelab/kafka/connect/apiclient/KafkaConnectClient.java b/src/main/java/org/sourcelab/kafka/connect/apiclient/KafkaConnectClient.java index c207958..3f0d3cb 100644 --- a/src/main/java/org/sourcelab/kafka/connect/apiclient/KafkaConnectClient.java +++ b/src/main/java/org/sourcelab/kafka/connect/apiclient/KafkaConnectClient.java @@ -56,6 +56,7 @@ import org.sourcelab.kafka.connect.apiclient.request.post.PostConnectorTaskRestart; import org.sourcelab.kafka.connect.apiclient.request.put.PutConnectorConfig; import org.sourcelab.kafka.connect.apiclient.request.put.PutConnectorPause; +import org.sourcelab.kafka.connect.apiclient.request.put.PutConnectorStop; import org.sourcelab.kafka.connect.apiclient.request.put.PutConnectorPluginConfigValidate; import org.sourcelab.kafka.connect.apiclient.request.put.PutConnectorResume; import org.sourcelab.kafka.connect.apiclient.request.put.PutConnectorTopicsReset; @@ -284,6 +285,17 @@ public Boolean pauseConnector(final String connectorName) { return submitRequest(new PutConnectorPause(connectorName)); } + /** + * Stop a connector. + * https://docs.confluent.io/current/connect/references/restapi.html#put--connectors-(string-name)-stop + * + * @param connectorName Name of connector to stop. + * @return Boolean true if success. + */ + public Boolean stopConnector(final String connectorName) { + return submitRequest(new PutConnectorStop(connectorName)); + } + /** * Resume a connector. * https://docs.confluent.io/current/connect/references/restapi.html#put--connectors-(string-name)-resume diff --git a/src/main/java/org/sourcelab/kafka/connect/apiclient/request/put/PutConnectorStop.java b/src/main/java/org/sourcelab/kafka/connect/apiclient/request/put/PutConnectorStop.java new file mode 100644 index 0000000..462c7bf --- /dev/null +++ b/src/main/java/org/sourcelab/kafka/connect/apiclient/request/put/PutConnectorStop.java @@ -0,0 +1,54 @@ +/** + * Copyright 2018, 2019, 2020, 2021 SourceLab.org https://github.com/SourceLabOrg/kafka-connect-client + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated + * documentation files (the "Software"), to deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit + * persons to whom the Software is furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial portions of the + * Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE + * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR + * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package org.sourcelab.kafka.connect.apiclient.request.put; + +import org.sourcelab.kafka.connect.apiclient.util.UrlEscapingUtil; + +import java.io.IOException; +import java.util.Objects; + +/** + * Defines request to stop a connector. + */ +public final class PutConnectorStop implements PutRequest { + private final String connectorName; + + /** + * Constructor. + * @param connectorName Name of connector + */ + public PutConnectorStop(final String connectorName) { + Objects.requireNonNull(connectorName); + this.connectorName = connectorName; + } + + @Override + public String getApiEndpoint() { + return "/connectors/" + UrlEscapingUtil.escapePath(connectorName) + "/stop"; + } + + @Override + public Object getRequestBody() { + return null; + } + + @Override + public Boolean parseResponse(final String responseStr) throws IOException { + return true; + } +} diff --git a/src/test/java/org/sourcelab/kafka/connect/apiclient/KafkaConnectClientTest.java b/src/test/java/org/sourcelab/kafka/connect/apiclient/KafkaConnectClientTest.java index a586e1b..6e00f63 100644 --- a/src/test/java/org/sourcelab/kafka/connect/apiclient/KafkaConnectClientTest.java +++ b/src/test/java/org/sourcelab/kafka/connect/apiclient/KafkaConnectClientTest.java @@ -192,6 +192,14 @@ public void testPauseConnector() { logger.info("Result: {}", kafkaConnectClient.pauseConnector(connectorName)); } + /** + * Test stopping a connector. + */ + @Test + public void testStopConnector() { + logger.info("Result: {}", kafkaConnectClient.stopConnector(connectorName)); + } + /** * Test pausing a connector. */