diff --git a/codebuild/samples/connect-linux.sh b/codebuild/samples/connect-linux.sh new file mode 100755 index 000000000..bfb18834c --- /dev/null +++ b/codebuild/samples/connect-linux.sh @@ -0,0 +1,25 @@ +#!/bin/bash + +set -e + +env + +pushd $CODEBUILD_SRC_DIR/samples/BasicConnect + +ENDPOINT=$(aws secretsmanager get-secret-value --secret-id "unit-test/endpoint" --query "SecretString" | cut -f2 -d":" | sed -e 's/[\\\"\}]//g') + +mvn compile + +echo "Basic Mqtt (Direct) Connect test" +mvn exec:java -Dexec.mainClass="basicconnect.BasicConnect" -Daws.crt.ci="True" -Dexec.arguments="--endpoint,$ENDPOINT,--key,/tmp/privatekey.pem,--cert,/tmp/certificate.pem" + +popd + +pushd $CODEBUILD_SRC_DIR/samples/WebsocketConnect + +mvn compile + +echo "Websocket Connect test" +mvn exec:java -Dexec.mainClass="websocketconnect.WebsocketConnect" -Daws.crt.ci="True" -Dexec.arguments="--endpoint,$ENDPOINT,--signing_region,us-east-1,--port,443" + +popd diff --git a/codebuild/samples/linux-smoke-tests.yml b/codebuild/samples/linux-smoke-tests.yml index f4abd6974..6b394950b 100644 --- a/codebuild/samples/linux-smoke-tests.yml +++ b/codebuild/samples/linux-smoke-tests.yml @@ -14,6 +14,7 @@ phases: - echo Build started on `date` - $CODEBUILD_SRC_DIR/codebuild/samples/setup-linux.sh - $CODEBUILD_SRC_DIR/codebuild/samples/pubsub-linux.sh + - $CODEBUILD_SRC_DIR/codebuild/samples/connect-linux.sh post_build: commands: - echo Build completed on `date` diff --git a/codebuild/samples/pubsub-linux.sh b/codebuild/samples/pubsub-linux.sh index d8a08524a..be9b572a9 100755 --- a/codebuild/samples/pubsub-linux.sh +++ b/codebuild/samples/pubsub-linux.sh @@ -10,10 +10,7 @@ ENDPOINT=$(aws secretsmanager get-secret-value --secret-id "unit-test/endpoint" mvn compile -echo "Mqtt Direct test" +echo "Basic PubSub test" mvn exec:java -Dexec.mainClass="pubsub.PubSub" -Daws.crt.ci="True" -Dexec.arguments="--endpoint,$ENDPOINT,--key,/tmp/privatekey.pem,--cert,/tmp/certificate.pem" -echo "Websocket test" -mvn exec:java -Dexec.mainClass="pubsub.PubSub" -Daws.crt.ci="True" -Dexec.arguments="--endpoint,$ENDPOINT,--use_websocket,--region,us-east-1,--port,443" - popd diff --git a/pom.xml b/pom.xml index bef673035..7097286a1 100644 --- a/pom.xml +++ b/pom.xml @@ -9,12 +9,15 @@ sdk samples/BasicPubSub + samples/BasicConnect + samples/WebsocketConnect + samples/X509CredentialsProviderConnect + samples/RawConnect + samples/Pkcs11Connect samples/Greengrass samples/Jobs samples/PubSubStress - samples/RawPubSub - samples/Pkcs11PubSub - samples/WindowsCertPubSub + samples/WindowsCertConnect samples/Shadow samples/Identity diff --git a/samples/BasicConnect/pom.xml b/samples/BasicConnect/pom.xml new file mode 100644 index 000000000..1eb09c277 --- /dev/null +++ b/samples/BasicConnect/pom.xml @@ -0,0 +1,54 @@ + + 4.0.0 + software.amazon.awssdk.iotdevicesdk + BasicConnect + jar + 1.0-SNAPSHOT + ${project.groupId}:${project.artifactId} + Java bindings for the AWS IoT Core Service + https://github.com/awslabs/aws-iot-device-sdk-java-v2 + + 1.8 + 1.8 + UTF-8 + + + + software.amazon.awssdk.iotdevicesdk + aws-iot-device-sdk + 1.0.0-SNAPSHOT + + + + + + org.codehaus.mojo + exec-maven-plugin + 1.4.0 + + main + + + + org.codehaus.mojo + build-helper-maven-plugin + 3.2.0 + + + add-source + generate-sources + + add-source + + + + ../Utils/CommandLineUtils + + + + + + + + diff --git a/samples/BasicConnect/src/main/java/basicconnect/BasicConnect.java b/samples/BasicConnect/src/main/java/basicconnect/BasicConnect.java new file mode 100644 index 000000000..5297ce7db --- /dev/null +++ b/samples/BasicConnect/src/main/java/basicconnect/BasicConnect.java @@ -0,0 +1,95 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +package basicconnect; + +import software.amazon.awssdk.crt.CRT; +import software.amazon.awssdk.crt.CrtResource; +import software.amazon.awssdk.crt.CrtRuntimeException; +import software.amazon.awssdk.crt.io.ClientBootstrap; +import software.amazon.awssdk.crt.mqtt.MqttClientConnection; +import software.amazon.awssdk.crt.mqtt.MqttClientConnectionEvents; +import software.amazon.awssdk.iot.iotjobs.model.RejectedError; + +import java.util.concurrent.ExecutionException; + +import utils.commandlineutils.CommandLineUtils; + +public class BasicConnect { + + // When run normally, we want to exit nicely even if something goes wrong + // When run from CI, we want to let an exception escape which in turn causes the + // exec:java task to return a non-zero exit code + static String ciPropValue = System.getProperty("aws.crt.ci"); + static boolean isCI = ciPropValue != null && Boolean.valueOf(ciPropValue); + + static CommandLineUtils cmdUtils; + + static void onRejectedError(RejectedError error) { + System.out.println("Request rejected: " + error.code.toString() + ": " + error.message); + } + + /* + * When called during a CI run, throw an exception that will escape and fail the exec:java task + * When called otherwise, print what went wrong (if anything) and just continue (return from main) + */ + static void onApplicationFailure(Throwable cause) { + if (isCI) { + throw new RuntimeException("BasicConnect execution failure", cause); + } else if (cause != null) { + System.out.println("Exception encountered: " + cause.toString()); + } + } + + public static void main(String[] args) { + + cmdUtils = new CommandLineUtils(); + cmdUtils.registerProgramName("BasicConnect"); + cmdUtils.addCommonMQTTCommands(); + cmdUtils.addCommonProxyCommands(); + cmdUtils.registerCommand("key", "", "Path to your key in PEM format."); + cmdUtils.registerCommand("cert", "", "Path to your client certificate in PEM format."); + cmdUtils.registerCommand("client_id", "", "Client id to use (optional, default='test-*')."); + cmdUtils.registerCommand("port", "", "Port to connect to on the endpoint (optional, default='8883')."); + cmdUtils.sendArguments(args); + + MqttClientConnectionEvents callbacks = new MqttClientConnectionEvents() { + @Override + public void onConnectionInterrupted(int errorCode) { + if (errorCode != 0) { + System.out.println("Connection interrupted: " + errorCode + ": " + CRT.awsErrorString(errorCode)); + } + } + + @Override + public void onConnectionResumed(boolean sessionPresent) { + System.out.println("Connection resumed: " + (sessionPresent ? "existing session" : "clean session")); + } + }; + + try { + + // Create a connection using a certificate and key + // Note: The data for the connection is gotten from cmdUtils. + // (see buildDirectMQTTConnection for implementation) + MqttClientConnection connection = cmdUtils.buildDirectMQTTConnection(callbacks); + if (connection == null) + { + onApplicationFailure(new RuntimeException("MQTT connection creation failed!")); + } + + // Connect and disconnect using the connection we created + // (see sampleConnectAndDisconnect for implementation) + cmdUtils.sampleConnectAndDisconnect(connection); + + } catch (CrtRuntimeException | InterruptedException | ExecutionException ex) { + onApplicationFailure(ex); + } + + CrtResource.waitForNoResources(); + System.out.println("Complete!"); + } + +} diff --git a/samples/BasicPubSub/src/main/java/pubsub/PubSub.java b/samples/BasicPubSub/src/main/java/pubsub/PubSub.java index b3e7f68a2..189631155 100644 --- a/samples/BasicPubSub/src/main/java/pubsub/PubSub.java +++ b/samples/BasicPubSub/src/main/java/pubsub/PubSub.java @@ -37,27 +37,9 @@ public class PubSub { static String ciPropValue = System.getProperty("aws.crt.ci"); static boolean isCI = ciPropValue != null && Boolean.valueOf(ciPropValue); - static String clientId = "test-" + UUID.randomUUID().toString(); - static String rootCaPath; - static String certPath; - static String keyPath; - static String endpoint; static String topic = "test/topic"; static String message = "Hello World!"; static int messagesToPublish = 10; - static int port = 8883; - - static String proxyHost; - static int proxyPort; - static String region = "us-east-1"; - static boolean useWebsockets = false; - static boolean useX509Credentials = false; - static String x509RoleAlias; - static String x509Endpoint; - static String x509Thing; - static String x509CertPath; - static String x509KeyPath; - static String x509RootCaPath; static CommandLineUtils cmdUtils; @@ -82,70 +64,17 @@ public static void main(String[] args) { cmdUtils = new CommandLineUtils(); cmdUtils.registerProgramName("PubSub"); cmdUtils.addCommonMQTTCommands(); + cmdUtils.addCommonTopicMessageCommands(); + cmdUtils.registerCommand("key", "", "Path to your key in PEM format."); + cmdUtils.registerCommand("cert", "", "Path to your client certificate in PEM format."); cmdUtils.registerCommand("client_id", "", "Client id to use (optional, default='test-*')."); cmdUtils.registerCommand("port", "", "Port to connect to on the endpoint (optional, default='8883')."); - cmdUtils.registerCommand("topic", "", "Topic to subscribe/publish to (optional, default='test/topic')."); - cmdUtils.registerCommand("message", "", "Message to publish (optional, default='Hello World')."); cmdUtils.registerCommand("count", "", "Number of messages to publish (optional, default='10')."); - cmdUtils.registerCommand("use_websocket", "", "Use websockets (optional)."); - cmdUtils.registerCommand("x509", "", "Use the x509 credentials provider while using websockets (optional)."); - cmdUtils.registerCommand("x509_role_alias", "", "Role alias to use with the x509 credentials provider (required for x509)."); - cmdUtils.registerCommand("x509_endpoint", "", "Endpoint to fetch x509 credentials from (required for x509)."); - cmdUtils.registerCommand("x509_thing", "", "Thing name to fetch x509 credentials on behalf of (required for x509)."); - cmdUtils.registerCommand("x509_cert", "", "Path to the IoT thing certificate used in fetching x509 credentials (required for x509)."); - cmdUtils.registerCommand("x509_key", "", "Path to the IoT thing private key used in fetching x509 credentials (required for x509)."); - cmdUtils.registerCommand("x509_ca_file", "", "Path to the root certificate used in fetching x509 credentials (required for x509)."); - cmdUtils.registerCommand("proxy_host", "", "Websocket proxy host to use (optional, required if --proxy_port is set)."); - cmdUtils.registerCommand("proxy_port", "", "Websocket proxy port to use (optional, required if --proxy_host is set)."); - cmdUtils.registerCommand("region", "", "AWS IoT service region (optional, default='us-east-1')."); - - cmdUtils.registerCommand("help", "", "Prints this message"); cmdUtils.sendArguments(args); - if (cmdUtils.hasCommand("help")) { - cmdUtils.printHelp(); - System.exit(1); - } - - endpoint = cmdUtils.getCommandRequired("endpoint", ""); - clientId = cmdUtils.getCommandOrDefault("client_id", clientId); - port = Integer.parseInt(cmdUtils.getCommandOrDefault("port", String.valueOf(port))); - rootCaPath = cmdUtils.getCommandOrDefault("root_ca", rootCaPath); - certPath = cmdUtils.getCommandOrDefault("cert", certPath); - keyPath = cmdUtils.getCommandOrDefault("key", keyPath); topic = cmdUtils.getCommandOrDefault("topic", topic); message = cmdUtils.getCommandOrDefault("message", message); messagesToPublish = Integer.parseInt(cmdUtils.getCommandOrDefault("count", String.valueOf(messagesToPublish))); - useWebsockets = cmdUtils.hasCommand("use_websocket"); - useX509Credentials = cmdUtils.hasCommand("x509"); - if (useX509Credentials) { - useWebsockets = true; - } - x509RoleAlias = cmdUtils.getCommandOrDefault("x509_role_alias", x509RoleAlias); - x509Endpoint = cmdUtils.getCommandOrDefault("x509_endpoint", x509Endpoint); - x509Thing = cmdUtils.getCommandOrDefault("x509_thing", x509Thing); - x509CertPath = cmdUtils.getCommandOrDefault("x509_cert", x509CertPath); - x509KeyPath = cmdUtils.getCommandOrDefault("x509_key", x509KeyPath); - x509RootCaPath = cmdUtils.getCommandOrDefault("x509_ca_file", x509RootCaPath); - proxyHost = cmdUtils.getCommandOrDefault("proxy_host", proxyHost); - proxyPort = Integer.parseInt(cmdUtils.getCommandOrDefault("proxy_port", String.valueOf(proxyPort))); - region = cmdUtils.getCommandOrDefault("region", region); - - if (useWebsockets == false) { - if (certPath == null || keyPath == null) { - cmdUtils.printHelp(); - System.out.println("--cert and --key required if not using --use_websocket."); - onApplicationFailure(null); - return; - } - } else if (useX509Credentials) { - if (x509RoleAlias == null || x509Endpoint == null || x509Thing == null || x509CertPath == null || x509KeyPath == null) { - cmdUtils.printHelp(); - System.out.println("--x509_role_alias, --x509_endpoint, --x509_thing, --x509_cert, and --x509_key required if using x509."); - onApplicationFailure(null); - return; - } - } MqttClientConnectionEvents callbacks = new MqttClientConnectionEvents() { @Override @@ -161,92 +90,48 @@ public void onConnectionResumed(boolean sessionPresent) { } }; - try ( - AwsIotMqttConnectionBuilder builder = AwsIotMqttConnectionBuilder.newMtlsBuilderFromPath(certPath, keyPath)) { + try { - if (rootCaPath != null) { - builder.withCertificateAuthorityFromPath(null, rootCaPath); - } - - builder.withConnectionEventCallbacks(callbacks) - .withClientId(clientId) - .withEndpoint(endpoint) - .withPort((short)port) - .withCleanSession(true) - .withProtocolOperationTimeoutMs(60000); - - HttpProxyOptions proxyOptions = null; - if (proxyHost != null && proxyPort > 0) { - proxyOptions = new HttpProxyOptions(); - proxyOptions.setHost(proxyHost); - proxyOptions.setPort(proxyPort); - - builder.withHttpProxyOptions(proxyOptions); + MqttClientConnection connection = cmdUtils.buildMQTTConnection(callbacks); + if (connection == null) + { + onApplicationFailure(new RuntimeException("MQTT connection creation failed!")); } - if (useWebsockets) { - builder.withWebsockets(true); - builder.withWebsocketSigningRegion(region); - - if (useX509Credentials) { - try (TlsContextOptions x509TlsOptions = TlsContextOptions.createWithMtlsFromPath(x509CertPath, x509KeyPath)) { - if (x509RootCaPath != null) { - x509TlsOptions.withCertificateAuthorityFromPath(null, x509RootCaPath); - } - - try (ClientTlsContext x509TlsContext = new ClientTlsContext(x509TlsOptions)) { - X509CredentialsProvider.X509CredentialsProviderBuilder x509builder = new X509CredentialsProvider.X509CredentialsProviderBuilder() - .withTlsContext(x509TlsContext) - .withEndpoint(x509Endpoint) - .withRoleAlias(x509RoleAlias) - .withThingName(x509Thing) - .withProxyOptions(proxyOptions); - try (X509CredentialsProvider provider = x509builder.build()) { - builder.withWebsocketCredentialsProvider(provider); - } - } - } - } + CompletableFuture connected = connection.connect(); + try { + boolean sessionPresent = connected.get(); + System.out.println("Connected to " + (!sessionPresent ? "new" : "existing") + " session!"); + } catch (Exception ex) { + throw new RuntimeException("Exception occurred during connect", ex); } - try(MqttClientConnection connection = builder.build()) { + CountDownLatch countDownLatch = new CountDownLatch(messagesToPublish); - CompletableFuture connected = connection.connect(); - try { - boolean sessionPresent = connected.get(); - System.out.println("Connected to " + (!sessionPresent ? "new" : "existing") + " session!"); - } catch (Exception ex) { - throw new RuntimeException("Exception occurred during connect", ex); - } + CompletableFuture subscribed = connection.subscribe(topic, QualityOfService.AT_LEAST_ONCE, (message) -> { + String payload = new String(message.getPayload(), StandardCharsets.UTF_8); + System.out.println("MESSAGE: " + payload); + countDownLatch.countDown(); + }); - CountDownLatch countDownLatch = new CountDownLatch(messagesToPublish); + subscribed.get(); - CompletableFuture subscribed = connection.subscribe(topic, QualityOfService.AT_LEAST_ONCE, (message) -> { - String payload = new String(message.getPayload(), StandardCharsets.UTF_8); - System.out.println("MESSAGE: " + payload); - countDownLatch.countDown(); - }); + int count = 0; + while (count++ < messagesToPublish) { + CompletableFuture published = connection.publish(new MqttMessage(topic, message.getBytes(), QualityOfService.AT_LEAST_ONCE, false)); + published.get(); + Thread.sleep(1000); + } - subscribed.get(); + countDownLatch.await(); - int count = 0; - while (count++ < messagesToPublish) { - CompletableFuture published = connection.publish(new MqttMessage(topic, message.getBytes(), QualityOfService.AT_LEAST_ONCE, false)); - published.get(); - Thread.sleep(1000); - } - - countDownLatch.await(); - - CompletableFuture disconnected = connection.disconnect(); - disconnected.get(); - } + CompletableFuture disconnected = connection.disconnect(); + disconnected.get(); } catch (CrtRuntimeException | InterruptedException | ExecutionException ex) { onApplicationFailure(ex); } CrtResource.waitForNoResources(); - System.out.println("Complete!"); } } diff --git a/samples/Identity/src/main/java/identity/FleetProvisioningSample.java b/samples/Identity/src/main/java/identity/FleetProvisioningSample.java index cf7231829..ab690139a 100644 --- a/samples/Identity/src/main/java/identity/FleetProvisioningSample.java +++ b/samples/Identity/src/main/java/identity/FleetProvisioningSample.java @@ -37,15 +37,9 @@ import utils.commandlineutils.CommandLineUtils; public class FleetProvisioningSample { - static String clientId = "test-" + UUID.randomUUID().toString(); - static String rootCaPath; - static String certPath; - static String keyPath; - static String endpoint; static String templateName; static String templateParameters; static String csrPath; - static int port = 8883; static CompletableFuture gotResponse; static IotIdentityClient iotIdentityClient; @@ -124,25 +118,15 @@ public static void main(String[] args) { cmdUtils = new CommandLineUtils(); cmdUtils.registerProgramName("FleetProvisioningSample"); cmdUtils.addCommonMQTTCommands(); + cmdUtils.registerCommand("key", "", "Path to your key in PEM format."); + cmdUtils.registerCommand("cert", "", "Path to your client certificate in PEM format."); cmdUtils.registerCommand("client_id", "", "Client id to use (optional, default='test-*')."); cmdUtils.registerCommand("port", "", "Port to connect to on the endpoint (optional, default='8883')."); cmdUtils.registerCommand("template_name", "", "Provisioning template name."); cmdUtils.registerCommand("template_parameters", "", "Provisioning template parameters."); cmdUtils.registerCommand("csr", "", "Path to the CSR file (optional)."); - cmdUtils.registerCommand("help", "", "Prints this message"); cmdUtils.sendArguments(args); - if (cmdUtils.hasCommand("help")) { - cmdUtils.printHelp(); - System.exit(1); - } - - clientId = cmdUtils.getCommandOrDefault("client_id", clientId); - endpoint = cmdUtils.getCommandRequired("endpoint", ""); - port = Integer.parseInt(cmdUtils.getCommandOrDefault("port", String.valueOf(port))); - rootCaPath = cmdUtils.getCommandRequired("root_ca", ""); - certPath = cmdUtils.getCommandRequired("cert", ""); - keyPath = cmdUtils.getCommandRequired("key", ""); templateName = cmdUtils.getCommandRequired("template_name", ""); templateParameters = cmdUtils.getCommandRequired("template_parameters", ""); csrPath = cmdUtils.getCommandOrDefault("csr", csrPath); @@ -161,39 +145,31 @@ public void onConnectionResumed(boolean sessionPresent) { } }; - try ( - AwsIotMqttConnectionBuilder builder = AwsIotMqttConnectionBuilder.newMtlsBuilderFromPath(certPath, keyPath)) { - - builder.withCertificateAuthorityFromPath(null, rootCaPath) - .withEndpoint(endpoint) - .withClientId(clientId) - .withCleanSession(true) - .withConnectionEventCallbacks(callbacks); + try { - try(MqttClientConnection connection = builder.build()) { - iotIdentityClient = new IotIdentityClient(connection); + MqttClientConnection connection = cmdUtils.buildMQTTConnection(callbacks); + iotIdentityClient = new IotIdentityClient(connection); - CompletableFuture connected = connection.connect(); - try { - boolean sessionPresent = connected.get(); - System.out.println("Connected to " + (!sessionPresent ? "new" : "existing") + " session!"); - } catch (Exception ex) { - throw new RuntimeException("Exception occurred during connect", ex); - } + CompletableFuture connected = connection.connect(); + try { + boolean sessionPresent = connected.get(); + System.out.println("Connected to " + (!sessionPresent ? "new" : "existing") + " session!"); + } catch (Exception ex) { + throw new RuntimeException("Exception occurred during connect", ex); + } - try { - if (csrPath == null) { - createKeysAndCertificateWorkflow(); - } else { - createCertificateFromCsrWorkflow(); - } - } catch (Exception e) { - throw new RuntimeException("Exception occurred during connect", e); + try { + if (csrPath == null) { + createKeysAndCertificateWorkflow(); + } else { + createCertificateFromCsrWorkflow(); } - - CompletableFuture disconnected = connection.disconnect(); - disconnected.get(); + } catch (Exception e) { + throw new RuntimeException("Exception occurred during connect", e); } + + CompletableFuture disconnected = connection.disconnect(); + disconnected.get(); } catch (CrtRuntimeException | InterruptedException | ExecutionException ex) { System.out.println("Exception encountered: " + ex.toString()); } diff --git a/samples/Jobs/src/main/java/jobs/JobsSample.java b/samples/Jobs/src/main/java/jobs/JobsSample.java index a416d1b62..0d65181d9 100644 --- a/samples/Jobs/src/main/java/jobs/JobsSample.java +++ b/samples/Jobs/src/main/java/jobs/JobsSample.java @@ -38,13 +38,7 @@ import utils.commandlineutils.CommandLineUtils; public class JobsSample { - static String clientId = "test-" + UUID.randomUUID().toString(); static String thingName; - static String rootCaPath; - static String certPath; - static String keyPath; - static String endpoint; - static int port = 8883; static CompletableFuture gotResponse; static List availableJobs = new LinkedList<>(); @@ -95,24 +89,14 @@ public static void main(String[] args) { cmdUtils = new CommandLineUtils(); cmdUtils.registerProgramName("JobsSample"); cmdUtils.addCommonMQTTCommands(); + cmdUtils.registerCommand("key", "", "Path to your key in PEM format."); + cmdUtils.registerCommand("cert", "", "Path to your client certificate in PEM format."); cmdUtils.registerCommand("client_id", "", "Client id to use (optional, default='test-*')."); cmdUtils.registerCommand("thing_name", "", "The name of the IoT thing."); cmdUtils.registerCommand("port", "", "Port to connect to on the endpoint (optional, default='8883')."); - cmdUtils.registerCommand("help", "", "Prints this message"); cmdUtils.sendArguments(args); - if (cmdUtils.hasCommand("help")) { - cmdUtils.printHelp(); - System.exit(1); - } - - clientId = cmdUtils.getCommandOrDefault("client_id", clientId); thingName = cmdUtils.getCommandRequired("thing_name", ""); - endpoint = cmdUtils.getCommandRequired("endpoint", ""); - port = Integer.parseInt(cmdUtils.getCommandOrDefault("port", String.valueOf(port))); - rootCaPath = cmdUtils.getCommandRequired("root_ca", ""); - certPath = cmdUtils.getCommandRequired("cert", ""); - keyPath = cmdUtils.getCommandRequired("key", ""); MqttClientConnectionEvents callbacks = new MqttClientConnectionEvents() { @Override @@ -128,180 +112,172 @@ public void onConnectionResumed(boolean sessionPresent) { } }; - try( - AwsIotMqttConnectionBuilder builder = AwsIotMqttConnectionBuilder.newMtlsBuilderFromPath(certPath, keyPath)) { + try { - builder.withCertificateAuthorityFromPath(null, rootCaPath) - .withEndpoint(endpoint) - .withClientId(clientId) - .withCleanSession(true) - .withConnectionEventCallbacks(callbacks); + MqttClientConnection connection = cmdUtils.buildMQTTConnection(callbacks); + IotJobsClient jobs = new IotJobsClient(connection); - try(MqttClientConnection connection = builder.build()) { - IotJobsClient jobs = new IotJobsClient(connection); + CompletableFuture connected = connection.connect(); + try { + boolean sessionPresent = connected.get(); + System.out.println("Connected to " + (!sessionPresent ? "new" : "existing") + " session!"); + } catch (Exception ex) { + throw new RuntimeException("Exception occurred during connect", ex); + } + + { + gotResponse = new CompletableFuture<>(); + GetPendingJobExecutionsSubscriptionRequest subscriptionRequest = new GetPendingJobExecutionsSubscriptionRequest(); + subscriptionRequest.thingName = thingName; + CompletableFuture subscribed = jobs.SubscribeToGetPendingJobExecutionsAccepted( + subscriptionRequest, + QualityOfService.AT_LEAST_ONCE, + JobsSample::onGetPendingJobExecutionsAccepted); + try { + subscribed.get(); + System.out.println("Subscribed to GetPendingJobExecutionsAccepted"); + } catch (Exception ex) { + throw new RuntimeException("Failed to subscribe to GetPendingJobExecutions", ex); + } - CompletableFuture connected = connection.connect(); + subscribed = jobs.SubscribeToGetPendingJobExecutionsRejected( + subscriptionRequest, + QualityOfService.AT_LEAST_ONCE, + JobsSample::onRejectedError); + subscribed.get(); + System.out.println("Subscribed to GetPendingJobExecutionsRejected"); + + GetPendingJobExecutionsRequest publishRequest = new GetPendingJobExecutionsRequest(); + publishRequest.thingName = thingName; + CompletableFuture published = jobs.PublishGetPendingJobExecutions( + publishRequest, + QualityOfService.AT_LEAST_ONCE); try { - boolean sessionPresent = connected.get(); - System.out.println("Connected to " + (!sessionPresent ? "new" : "existing") + " session!"); + published.get(); + gotResponse.get(); } catch (Exception ex) { - throw new RuntimeException("Exception occurred during connect", ex); + throw new RuntimeException("Exception occurred during publish", ex); } + } + if (availableJobs.isEmpty()) { + System.out.println("No jobs queued, no further work to do"); + } + + for (String jobId : availableJobs) { + gotResponse = new CompletableFuture<>(); + DescribeJobExecutionSubscriptionRequest subscriptionRequest = new DescribeJobExecutionSubscriptionRequest(); + subscriptionRequest.thingName = thingName; + subscriptionRequest.jobId = jobId; + jobs.SubscribeToDescribeJobExecutionAccepted( + subscriptionRequest, + QualityOfService.AT_LEAST_ONCE, + JobsSample::onDescribeJobExecutionAccepted); + jobs.SubscribeToDescribeJobExecutionRejected( + subscriptionRequest, + QualityOfService.AT_LEAST_ONCE, + JobsSample::onRejectedError); + + DescribeJobExecutionRequest publishRequest = new DescribeJobExecutionRequest(); + publishRequest.thingName = thingName; + publishRequest.jobId = jobId; + publishRequest.includeJobDocument = true; + publishRequest.executionNumber = 1L; + jobs.PublishDescribeJobExecution(publishRequest, QualityOfService.AT_LEAST_ONCE); + gotResponse.get(); + } + + for (int jobIdx = 0; jobIdx < availableJobs.size(); ++jobIdx) { { gotResponse = new CompletableFuture<>(); - GetPendingJobExecutionsSubscriptionRequest subscriptionRequest = new GetPendingJobExecutionsSubscriptionRequest(); + + // Start the next pending job + StartNextPendingJobExecutionSubscriptionRequest subscriptionRequest = new StartNextPendingJobExecutionSubscriptionRequest(); subscriptionRequest.thingName = thingName; - CompletableFuture subscribed = jobs.SubscribeToGetPendingJobExecutionsAccepted( + + jobs.SubscribeToStartNextPendingJobExecutionAccepted( subscriptionRequest, QualityOfService.AT_LEAST_ONCE, - JobsSample::onGetPendingJobExecutionsAccepted); - try { - subscribed.get(); - System.out.println("Subscribed to GetPendingJobExecutionsAccepted"); - } catch (Exception ex) { - throw new RuntimeException("Failed to subscribe to GetPendingJobExecutions", ex); - } - - subscribed = jobs.SubscribeToGetPendingJobExecutionsRejected( + JobsSample::onStartNextPendingJobExecutionAccepted); + jobs.SubscribeToStartNextPendingJobExecutionRejected( subscriptionRequest, QualityOfService.AT_LEAST_ONCE, JobsSample::onRejectedError); - subscribed.get(); - System.out.println("Subscribed to GetPendingJobExecutionsRejected"); - GetPendingJobExecutionsRequest publishRequest = new GetPendingJobExecutionsRequest(); + StartNextPendingJobExecutionRequest publishRequest = new StartNextPendingJobExecutionRequest(); publishRequest.thingName = thingName; - CompletableFuture published = jobs.PublishGetPendingJobExecutions( - publishRequest, - QualityOfService.AT_LEAST_ONCE); - try { - published.get(); - gotResponse.get(); - } catch (Exception ex) { - throw new RuntimeException("Exception occurred during publish", ex); - } - } + publishRequest.stepTimeoutInMinutes = 15L; + jobs.PublishStartNextPendingJobExecution(publishRequest, QualityOfService.AT_LEAST_ONCE); - if (availableJobs.isEmpty()) { - System.out.println("No jobs queued, no further work to do"); + gotResponse.get(); } - for (String jobId : availableJobs) { + { + // Update the service to let it know we're executing gotResponse = new CompletableFuture<>(); - DescribeJobExecutionSubscriptionRequest subscriptionRequest = new DescribeJobExecutionSubscriptionRequest(); + + UpdateJobExecutionSubscriptionRequest subscriptionRequest = new UpdateJobExecutionSubscriptionRequest(); subscriptionRequest.thingName = thingName; - subscriptionRequest.jobId = jobId; - jobs.SubscribeToDescribeJobExecutionAccepted( + subscriptionRequest.jobId = currentJobId; + jobs.SubscribeToUpdateJobExecutionAccepted( subscriptionRequest, QualityOfService.AT_LEAST_ONCE, - JobsSample::onDescribeJobExecutionAccepted); - jobs.SubscribeToDescribeJobExecutionRejected( + (response) -> { + System.out.println("Marked job " + currentJobId + " IN_PROGRESS"); + gotResponse.complete(null); + }); + jobs.SubscribeToUpdateJobExecutionRejected( subscriptionRequest, QualityOfService.AT_LEAST_ONCE, JobsSample::onRejectedError); - DescribeJobExecutionRequest publishRequest = new DescribeJobExecutionRequest(); + UpdateJobExecutionRequest publishRequest = new UpdateJobExecutionRequest(); publishRequest.thingName = thingName; - publishRequest.jobId = jobId; - publishRequest.includeJobDocument = true; - publishRequest.executionNumber = 1L; - jobs.PublishDescribeJobExecution(publishRequest, QualityOfService.AT_LEAST_ONCE); + publishRequest.jobId = currentJobId; + publishRequest.executionNumber = currentExecutionNumber; + publishRequest.status = JobStatus.IN_PROGRESS; + publishRequest.expectedVersion = currentVersionNumber++; + jobs.PublishUpdateJobExecution(publishRequest, QualityOfService.AT_LEAST_ONCE); + gotResponse.get(); } - for (int jobIdx = 0; jobIdx < availableJobs.size(); ++jobIdx) { - { - gotResponse = new CompletableFuture<>(); - - // Start the next pending job - StartNextPendingJobExecutionSubscriptionRequest subscriptionRequest = new StartNextPendingJobExecutionSubscriptionRequest(); - subscriptionRequest.thingName = thingName; - - jobs.SubscribeToStartNextPendingJobExecutionAccepted( - subscriptionRequest, - QualityOfService.AT_LEAST_ONCE, - JobsSample::onStartNextPendingJobExecutionAccepted); - jobs.SubscribeToStartNextPendingJobExecutionRejected( - subscriptionRequest, - QualityOfService.AT_LEAST_ONCE, - JobsSample::onRejectedError); - - StartNextPendingJobExecutionRequest publishRequest = new StartNextPendingJobExecutionRequest(); - publishRequest.thingName = thingName; - publishRequest.stepTimeoutInMinutes = 15L; - jobs.PublishStartNextPendingJobExecution(publishRequest, QualityOfService.AT_LEAST_ONCE); - - gotResponse.get(); - } - - { - // Update the service to let it know we're executing - gotResponse = new CompletableFuture<>(); - - UpdateJobExecutionSubscriptionRequest subscriptionRequest = new UpdateJobExecutionSubscriptionRequest(); - subscriptionRequest.thingName = thingName; - subscriptionRequest.jobId = currentJobId; - jobs.SubscribeToUpdateJobExecutionAccepted( - subscriptionRequest, - QualityOfService.AT_LEAST_ONCE, - (response) -> { - System.out.println("Marked job " + currentJobId + " IN_PROGRESS"); - gotResponse.complete(null); - }); - jobs.SubscribeToUpdateJobExecutionRejected( - subscriptionRequest, - QualityOfService.AT_LEAST_ONCE, - JobsSample::onRejectedError); - - UpdateJobExecutionRequest publishRequest = new UpdateJobExecutionRequest(); - publishRequest.thingName = thingName; - publishRequest.jobId = currentJobId; - publishRequest.executionNumber = currentExecutionNumber; - publishRequest.status = JobStatus.IN_PROGRESS; - publishRequest.expectedVersion = currentVersionNumber++; - jobs.PublishUpdateJobExecution(publishRequest, QualityOfService.AT_LEAST_ONCE); - - gotResponse.get(); - } - - // Fake doing something - Thread.sleep(1000); - - { - // Update the service to let it know we're done - gotResponse = new CompletableFuture<>(); - - UpdateJobExecutionSubscriptionRequest subscriptionRequest = new UpdateJobExecutionSubscriptionRequest(); - subscriptionRequest.thingName = thingName; - subscriptionRequest.jobId = currentJobId; - jobs.SubscribeToUpdateJobExecutionAccepted( - subscriptionRequest, - QualityOfService.AT_LEAST_ONCE, - (response) -> { - System.out.println("Marked job " + currentJobId + " SUCCEEDED"); - gotResponse.complete(null); - }); - jobs.SubscribeToUpdateJobExecutionRejected( - subscriptionRequest, - QualityOfService.AT_LEAST_ONCE, - JobsSample::onRejectedError); - - UpdateJobExecutionRequest publishRequest = new UpdateJobExecutionRequest(); - publishRequest.thingName = thingName; - publishRequest.jobId = currentJobId; - publishRequest.executionNumber = currentExecutionNumber; - publishRequest.status = JobStatus.SUCCEEDED; - publishRequest.expectedVersion = currentVersionNumber++; - jobs.PublishUpdateJobExecution(publishRequest, QualityOfService.AT_LEAST_ONCE); - - gotResponse.get(); - } - } + // Fake doing something + Thread.sleep(1000); + + { + // Update the service to let it know we're done + gotResponse = new CompletableFuture<>(); - CompletableFuture disconnected = connection.disconnect(); - disconnected.get(); + UpdateJobExecutionSubscriptionRequest subscriptionRequest = new UpdateJobExecutionSubscriptionRequest(); + subscriptionRequest.thingName = thingName; + subscriptionRequest.jobId = currentJobId; + jobs.SubscribeToUpdateJobExecutionAccepted( + subscriptionRequest, + QualityOfService.AT_LEAST_ONCE, + (response) -> { + System.out.println("Marked job " + currentJobId + " SUCCEEDED"); + gotResponse.complete(null); + }); + jobs.SubscribeToUpdateJobExecutionRejected( + subscriptionRequest, + QualityOfService.AT_LEAST_ONCE, + JobsSample::onRejectedError); + + UpdateJobExecutionRequest publishRequest = new UpdateJobExecutionRequest(); + publishRequest.thingName = thingName; + publishRequest.jobId = currentJobId; + publishRequest.executionNumber = currentExecutionNumber; + publishRequest.status = JobStatus.SUCCEEDED; + publishRequest.expectedVersion = currentVersionNumber++; + jobs.PublishUpdateJobExecution(publishRequest, QualityOfService.AT_LEAST_ONCE); + + gotResponse.get(); + } } + + CompletableFuture disconnected = connection.disconnect(); + disconnected.get(); } catch (CrtRuntimeException | InterruptedException | ExecutionException ex) { System.out.println("Exception encountered: " + ex.toString()); } diff --git a/samples/Pkcs11PubSub/pom.xml b/samples/Pkcs11Connect/pom.xml similarity index 98% rename from samples/Pkcs11PubSub/pom.xml rename to samples/Pkcs11Connect/pom.xml index 0945187dc..d87816823 100644 --- a/samples/Pkcs11PubSub/pom.xml +++ b/samples/Pkcs11Connect/pom.xml @@ -2,7 +2,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> 4.0.0 software.amazon.awssdk.iotdevicesdk - Pkcs11PubSub + Pkcs11Connect jar 1.0-SNAPSHOT ${project.groupId}:${project.artifactId} diff --git a/samples/Pkcs11PubSub/src/main/java/pkcs11pubsub/Pkcs11PubSub.java b/samples/Pkcs11Connect/src/main/java/pkcs11connect/Pkcs11Connect.java similarity index 65% rename from samples/Pkcs11PubSub/src/main/java/pkcs11pubsub/Pkcs11PubSub.java rename to samples/Pkcs11Connect/src/main/java/pkcs11connect/Pkcs11Connect.java index 8a9e62515..6781451a8 100644 --- a/samples/Pkcs11PubSub/src/main/java/pkcs11pubsub/Pkcs11PubSub.java +++ b/samples/Pkcs11Connect/src/main/java/pkcs11connect/Pkcs11Connect.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0. */ -package pkcs11pubsub; +package pkcs11connect; import software.amazon.awssdk.crt.*; import software.amazon.awssdk.crt.io.*; @@ -18,28 +18,13 @@ import utils.commandlineutils.CommandLineUtils; -public class Pkcs11PubSub { - +public class Pkcs11Connect { // When run normally, we want to exit nicely even if something goes wrong // When run from CI, we want to let an exception escape which in turn causes the // exec:java task to return a non-zero exit code static String ciPropValue = System.getProperty("aws.crt.ci"); static boolean isCI = ciPropValue != null && Boolean.valueOf(ciPropValue); - static String clientId = "test-" + UUID.randomUUID().toString(); - static String rootCaPath; - static String certPath; - static String endpoint; - static String pkcs11LibPath; - static String pkcs11UserPin; - static String pkcs11TokenLabel; - static Long pkcs11SlotId; - static String pkcs11KeyLabel; - static String topic = "test/topic"; - static String message = "Hello World!"; - static int messagesToPublish = 10; - static int port = 8883; - static CommandLineUtils cmdUtils; /* @@ -49,7 +34,7 @@ public class Pkcs11PubSub { */ static void onApplicationFailure(Throwable cause) { if (isCI) { - throw new RuntimeException("BasicPubSub execution failure", cause); + throw new RuntimeException("Pkcs11PubSub execution failure", cause); } else if (cause != null) { System.out.println("Exception encountered: " + cause.toString()); } @@ -60,7 +45,7 @@ public static void main(String[] args) { cmdUtils = new CommandLineUtils(); cmdUtils.registerProgramName("Pkcs11PubSub"); cmdUtils.addCommonMQTTCommands(); - cmdUtils.removeCommand("key"); + cmdUtils.registerCommand("cert", "", "Path to your client certificate in PEM format."); cmdUtils.registerCommand("client_id", "", "Client id to use (optional, default='test-*')."); cmdUtils.registerCommand("port", "", "Port to connect to on the endpoint (optional, default='8883')."); cmdUtils.registerCommand("pkcs11_lib", "", "Path to PKCS#11 library."); @@ -68,9 +53,6 @@ public static void main(String[] args) { cmdUtils.registerCommand("token_label", "", "Label of PKCS#11 token to use (optional)."); cmdUtils.registerCommand("slot_id", "", "Slot ID containing PKCS#11 token to use (optional)."); cmdUtils.registerCommand("key_label", "", "Label of private key on the PKCS#11 token (optional)."); - cmdUtils.registerCommand("topic", "", "Topic to subscribe/publish to (optional, default='test/topic')."); - cmdUtils.registerCommand("message", "", "Message to publish (optional, default='Hello World')."); - cmdUtils.registerCommand("count", "", "Number of messages to publish (optional, default='10')."); cmdUtils.registerCommand("help", "", "Prints this message"); cmdUtils.sendArguments(args); @@ -79,17 +61,19 @@ public static void main(String[] args) { System.exit(1); } - endpoint = cmdUtils.getCommandRequired("endpoint", ""); - certPath = cmdUtils.getCommandRequired("cert", ""); - rootCaPath = cmdUtils.getCommandOrDefault("root_ca", rootCaPath); - clientId = cmdUtils.getCommandOrDefault("client_id", clientId); - port = Integer.parseInt(cmdUtils.getCommandOrDefault("port", String.valueOf(port))); - pkcs11LibPath = cmdUtils.getCommandRequired("pkcs11_lib", ""); - pkcs11UserPin = cmdUtils.getCommandRequired("pin", ""); - pkcs11TokenLabel = cmdUtils.getCommandOrDefault("key_label", pkcs11TokenLabel); - topic = cmdUtils.getCommandOrDefault("topic", topic); - message = cmdUtils.getCommandOrDefault("message", message); - messagesToPublish = Integer.parseInt(cmdUtils.getCommandOrDefault("count", String.valueOf(messagesToPublish))); + String endpoint = cmdUtils.getCommandRequired("endpoint", ""); + String certPath = cmdUtils.getCommandRequired("cert", ""); + String CaPath = cmdUtils.getCommandOrDefault("ca_file", ""); + String clientId = cmdUtils.getCommandOrDefault("client_id", "test-" + UUID.randomUUID().toString()); + int port = Integer.parseInt(cmdUtils.getCommandOrDefault("port", "8883")); + String pkcs11LibPath = cmdUtils.getCommandRequired("pkcs11_lib", ""); + String pkcs11UserPin = cmdUtils.getCommandRequired("pin", ""); + String pkcs11TokenLabel = cmdUtils.getCommandOrDefault("key_label", ""); + Long pkcs11SlotId = null; + if (cmdUtils.hasCommand("slot_id")) { + Long.parseLong(cmdUtils.getCommandOrDefault("slot_id", "-1")); + } + String pkcs11KeyLabel = cmdUtils.getCommandOrDefault("key_label", ""); MqttClientConnectionEvents callbacks = new MqttClientConnectionEvents() { @Override @@ -119,7 +103,7 @@ public void onConnectionResumed(boolean sessionPresent) { // are multiple tokens, or multiple keys to choose from, you // must narrow down which one should be used. - if (pkcs11TokenLabel != null) { + if (pkcs11TokenLabel != null && pkcs11TokenLabel != "") { pkcs11Options.withTokenLabel(pkcs11TokenLabel); } @@ -127,15 +111,15 @@ public void onConnectionResumed(boolean sessionPresent) { pkcs11Options.withSlotId(pkcs11SlotId); } - if (pkcs11KeyLabel != null) { + if (pkcs11KeyLabel != null && pkcs11KeyLabel != "") { pkcs11Options.withPrivateKeyObjectLabel(pkcs11KeyLabel); } try (AwsIotMqttConnectionBuilder builder = AwsIotMqttConnectionBuilder .newMtlsPkcs11Builder(pkcs11Options)) { - if (rootCaPath != null) { - builder.withCertificateAuthorityFromPath(null, rootCaPath); + if (CaPath != null) { + builder.withCertificateAuthorityFromPath(null, CaPath); } builder.withConnectionEventCallbacks(callbacks) @@ -154,38 +138,16 @@ public void onConnectionResumed(boolean sessionPresent) { } catch (Exception ex) { throw new RuntimeException("Exception occurred during connect", ex); } - - CountDownLatch countDownLatch = new CountDownLatch(messagesToPublish); - - CompletableFuture subscribed = connection.subscribe(topic, QualityOfService.AT_LEAST_ONCE, - (message) -> { - String payload = new String(message.getPayload(), StandardCharsets.UTF_8); - System.out.println("MESSAGE: " + payload); - countDownLatch.countDown(); - }); - - subscribed.get(); - - int count = 0; - while (count++ < messagesToPublish) { - CompletableFuture published = connection.publish( - new MqttMessage(topic, message.getBytes(), QualityOfService.AT_LEAST_ONCE, false)); - published.get(); - Thread.sleep(1000); - } - - countDownLatch.await(); - + System.out.println("Disconnecting..."); CompletableFuture disconnected = connection.disconnect(); disconnected.get(); + System.out.println("Disconnected."); } } catch (CrtRuntimeException | InterruptedException | ExecutionException ex) { onApplicationFailure(ex); } } - CrtResource.waitForNoResources(); - System.out.println("Complete!"); } } diff --git a/samples/PubSubStress/src/main/java/pubsubstress/PubSubStress.java b/samples/PubSubStress/src/main/java/pubsubstress/PubSubStress.java index 7b8288014..f0ef4775e 100644 --- a/samples/PubSubStress/src/main/java/pubsubstress/PubSubStress.java +++ b/samples/PubSubStress/src/main/java/pubsubstress/PubSubStress.java @@ -27,23 +27,13 @@ public class PubSubStress { private static final int PROGRESS_OP_COUNT = 100; static String clientId = "test-" + UUID.randomUUID().toString(); - static String rootCaPath; - static String certPath; - static String keyPath; - static String endpoint; static String topic = "test/topic"; static String message = "Hello World!"; static int messagesToPublish = 5000; - static boolean showHelp = false; static int connectionCount = 1000; static int eventLoopThreadCount = 1; static int testIterations = 1; - static String region = "us-east-1"; - static String proxyHost; - static int proxyPort; - static boolean useWebsockets; - private static Map connections = new HashMap<>(); private static List validClientIds = new ArrayList<>(); private static List validTopics = new ArrayList<>(); @@ -64,7 +54,7 @@ public ConnectionState() {} public CompletableFuture subscribeFuture; } - static void initConnections(AwsIotMqttConnectionBuilder builder) { + static void initConnections() { List connectionsInProgress = new ArrayList<>(); for (int i = 0; i < connectionCount; ++i) { @@ -83,11 +73,7 @@ public void onConnectionResumed(boolean sessionPresent) { }; String newClientId = String.format("%s%d", clientId, i); - - builder.withClientId(newClientId) - .withConnectionEventCallbacks(callbacks); - - MqttClientConnection connection = builder.build(); + MqttClientConnection connection = cmdUtils.buildMQTTConnection(callbacks); try { ConnectionState connectionState = new ConnectionState(); @@ -202,76 +188,33 @@ public static void main(String[] args) { cmdUtils = new CommandLineUtils(); cmdUtils.registerProgramName("PubSubStress"); cmdUtils.addCommonMQTTCommands(); + cmdUtils.addCommonTopicMessageCommands(); + cmdUtils.registerCommand("key", "", "Path to your key in PEM format."); + cmdUtils.registerCommand("cert", "", "Path to your client certificate in PEM format."); cmdUtils.registerCommand("client_id", "", "Client id to use (optional, default='test-*')"); - cmdUtils.registerCommand("topic", "", "Topic to subscribe/publish to (optional, default='test/topic')."); - cmdUtils.registerCommand("message", "", "Message to publish (optional, default='Hello World')."); cmdUtils.registerCommand("count", "", "Number of messages to publish (optional, default='10')."); cmdUtils.registerCommand("connections", "", "Number of connections to make (optional, default='1000')."); cmdUtils.registerCommand("threads", "", "Number of IO threads to use (optional, default='1')."); cmdUtils.registerCommand("iterations", "", "Number of times to repeat the basic stress test logic (optional, default='1')."); - cmdUtils.registerCommand("use_websockets", "", "Use websockets (optional)."); + cmdUtils.registerCommand("signing_region", "", "Use websockets (optional). Sets the Websocket signing region to use (default='us-east-1')."); cmdUtils.registerCommand("proxy_host", "", "Websocket proxy host to use (optional, required for websockets)."); cmdUtils.registerCommand("proxy_port", "", "Websocket proxy port to use (optional, required for websockets)."); - cmdUtils.registerCommand("region", "", "Websocket signing region to use (optional, default='us-east-1')."); - cmdUtils.registerCommand("help", "", "Prints this message"); cmdUtils.sendArguments(args); - if (cmdUtils.hasCommand("help")) { - cmdUtils.printHelp(); - System.exit(1); - } - - endpoint = cmdUtils.getCommandRequired("endpoint", ""); - rootCaPath = cmdUtils.getCommandOrDefault("root_ca", rootCaPath); - certPath = cmdUtils.getCommandOrDefault("cert", certPath); - keyPath = cmdUtils.getCommandOrDefault("key", keyPath); - clientId = cmdUtils.getCommandOrDefault("client_id", clientId); topic = cmdUtils.getCommandOrDefault("topic", topic); message = cmdUtils.getCommandOrDefault("message", message); messagesToPublish = Integer.parseInt(cmdUtils.getCommandOrDefault("count", String.valueOf(messagesToPublish))); connectionCount = Integer.parseInt(cmdUtils.getCommandOrDefault("connections", String.valueOf(connectionCount))); eventLoopThreadCount = Integer.parseInt(cmdUtils.getCommandOrDefault("threads", String.valueOf(eventLoopThreadCount))); testIterations = Integer.parseInt(cmdUtils.getCommandOrDefault("iterations", String.valueOf(testIterations))); - useWebsockets = cmdUtils.hasCommand("use_websockets"); - proxyHost = cmdUtils.getCommandOrDefault("proxy_host", proxyHost); - proxyPort = Integer.parseInt(cmdUtils.getCommandOrDefault("proxy_port", String.valueOf(proxyPort))); - region = cmdUtils.getCommandOrDefault("region", region); - - if (!useWebsockets) { - if (certPath == null || keyPath == null) { - cmdUtils.printHelp(); - System.out.println("--cert and --key required if not using websockets."); - System.exit(-1); - } - } int iteration = 0; while(iteration < testIterations) { System.out.println(String.format("Starting iteration %d", iteration)); - try ( - AwsIotMqttConnectionBuilder builder = AwsIotMqttConnectionBuilder.newMtlsBuilderFromPath(certPath, keyPath)) { - - builder.withCertificateAuthorityFromPath(null, rootCaPath) - .withEndpoint(endpoint) - .withCleanSession(true) - .withProtocolOperationTimeoutMs(10000); - - if (proxyHost != null && proxyPort > 0) { - HttpProxyOptions proxyOptions = new HttpProxyOptions(); - proxyOptions.setHost(proxyHost); - proxyOptions.setPort(proxyPort); - - builder.withHttpProxyOptions(proxyOptions); - } - - if (useWebsockets) { - builder.withWebsockets(true); - builder.withWebsocketSigningRegion(region); - } - + try { try { - initConnections(builder); + initConnections(); Log.log(Log.LogLevel.Info, Log.LogSubject.MqttGeneral, "START OF PUBLISH......"); diff --git a/samples/README.md b/samples/README.md index 144b0d243..067c36dbb 100644 --- a/samples/README.md +++ b/samples/README.md @@ -1,8 +1,11 @@ # Sample apps for the AWS IoT Device SDK for Java v2 * [BasicPubSub](#basicpubsub) -* [Pkcs11PubSub](#pkcs11pubsub) -* [WindowsCertPubSub](#windowscertpubsub) +* [Basic Connect](#basic-connect) +* [Websocket Connect](#websocket-connect) +* [Pkcs11 Connect](#pkcs11-connect) +* [Raw Connect](#raw-connect) +* [WindowsCert Connect](#windowscert-connect) * [Shadow](#shadow) * [Jobs](#jobs) * [fleet provisioning](#fleet-provisioning) @@ -11,7 +14,6 @@ **Additional sample apps not described below:** * [PubSubStress](https://github.com/aws/aws-iot-device-sdk-java-v2/tree/main/samples/PubSubStress) -* [RawPubSub](https://github.com/aws/aws-iot-device-sdk-java-v2/tree/main/samples/RawPubSub) Note that all samples will show their options by passing in `--help`. For example: ```sh @@ -42,34 +44,137 @@ mvn compile exec:java -pl samples/BasicPubSub -Daws.crt.debugnative=true -Daws.c ## BasicPubSub -This sample demonstrates connecting to IoT Core, subscribing to a topic, and publishing to that topic. +This sample uses the +[Message Broker](https://docs.aws.amazon.com/iot/latest/developerguide/iot-message-broker.html) +for AWS IoT to send and receive messages through an MQTT connection. +On startup, the device connects to the server, subscribes to a topic, and begins publishing messages to that topic. The device should receive those same messages back from the message broker, since it is subscribed to that same topic. Status updates are continually printed to the console. source: `samples/BasicPubSub` -To Run: +
+(see sample policy) +
+{
+  "Version": "2012-10-17",
+  "Statement": [
+    {
+      "Effect": "Allow",
+      "Action": [
+        "iot:Publish",
+        "iot:Receive"
+      ],
+      "Resource": [
+        "arn:aws:iot:region:account:topic/test/topic"
+      ]
+    },
+    {
+      "Effect": "Allow",
+      "Action": [
+        "iot:Subscribe"
+      ],
+      "Resource": [
+        "arn:aws:iot:region:account:topicfilter/test/topic"
+      ]
+    },
+    {
+      "Effect": "Allow",
+      "Action": [
+        "iot:Connect"
+      ],
+      "Resource": [
+        "arn:aws:iot:region:account:client/test-*"
+      ]
+    }
+  ]
+}
+
+
+ +To Run this sample, use the following command: ```sh # Windows Platform: Windows command prompt does not support single quote, please use double quote. -mvn compile exec:java -pl samples/BasicPubSub -Dexec.mainClass=pubsub.PubSub -Dexec.args='--endpoint --cert --key --ca_file ' +mvn compile exec:java -pl samples/BasicPubSub -Dexec.mainClass=pubsub.PubSub -Dexec.args='--endpoint --cert --key --ca_file ' ``` -The sample can connect to IoT Core in several ways: +## Basic Connect -1) To connect directly with the MQTT protocol, using a certificate and private key for mutual TLS, - you must pass `--cert` and `--key`. -2) To connect with MQTT over websockets, using your AWS credentials for authentication, - you must pass `--use_websocket` and `--region`. -3) To connect with MQTT over websockets, using the - [X.509 credentials provider](https://docs.aws.amazon.com/iot/latest/developerguide/authorizing-direct-aws.html) - for authentication, you must pass `--use_websocket`, `--region`, `--x509`, `--x509_role_alias`, `--x509_endpoint`, `--x509_thing`, `--x509_cert`, and `--x509_key`. +This sample makes an MQTT connection using a certificate and key file. On startup, the device connects to the server using the certificate and key files, and then disconnects. This sample is for reference on connecting via certificate and key files. -## Pkcs11PubSub +Source: `samples/BasicConnect` -This sample shows connecting to IoT Core using mutual TLS, -with the private key stored on a PKCS#11 compatible smart card or Hardware Security Module (HSM) +Your Thing's [Policy](https://docs.aws.amazon.com/iot/latest/developerguide/iot-policies.html) must provide privileges for this sample to connect. + +
+(see sample policy) +
+{
+  "Version": "2012-10-17",
+  "Statement": [
+    {
+      "Effect": "Allow",
+      "Action": [
+        "iot:Connect"
+      ],
+      "Resource": [
+        "arn:aws:iot:region:account:client/test-*"
+      ]
+    }
+  ]
+}
+
+
+ +To run the basic connect sample use the following command: + +```sh +mvn compile exec:java -pl samples/BasicConnect -Dexec.mainClass=basicconnect.BasicConnect -Dexec.args='--endpoint --cert --key --ca_file ' +``` + +## Websocket Connect + +This sample makes an MQTT connection via websockets and then disconnects. On startup, the device connects to the server via websockets and then disconnects. This sample is for reference on connecting via websockets. + +Source: `samples/WebsocketConnect` + +Your Thing's [Policy](https://docs.aws.amazon.com/iot/latest/developerguide/iot-policies.html) must provide privileges for this sample to connect. -Currently, TLS integration with PKCS#11 is only available on Unix devices. +
+(see sample policy) +
+{
+  "Version": "2012-10-17",
+  "Statement": [
+    {
+      "Effect": "Allow",
+      "Action": [
+        "iot:Connect"
+      ],
+      "Resource": [
+        "arn:aws:iot:region:account:client/test-*"
+      ]
+    }
+  ]
+}
+
+
+ +To run the websocket connect use the following command: + +```sh +mvn compile exec:java -pl samples/WebsocketConnect -Dexec.mainClass=websocketconnect.WebsocketConnect -Dexec.args='--endpoint --cert --key --ca_file ' +``` + +Note that using Websockets will attempt to fetch the AWS credentials from your enviornment variables or local files. +See the [authorizing direct AWS](https://docs.aws.amazon.com/iot/latest/developerguide/authorizing-direct-aws.html) page for documentation on how to get the AWS credentials, which then you can set to the `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS`, and `AWS_SESSION_TOKEN` environment variables. + +## PKCS#11 Connect + +This sample is similar to the [Basic Connect](#basic-connect), +but the private key for mutual TLS is stored on a PKCS#11 compatible smart card or Hardware Security Module (HSM) -source: `samples/Pkcs11PubSub` +WARNING: Unix only. Currently, TLS integration with PKCS#11 is only available on Unix devices. + +source: `samples/Pkcs11Connect` To run this sample using [SoftHSM2](https://www.opendnssec.org/softhsm/) as the PKCS#11 device: @@ -112,10 +217,32 @@ To run this sample using [SoftHSM2](https://www.opendnssec.org/softhsm/) as the 5) Now you can run the sample: ```sh - mvn compile exec:java -pl samples/Pkcs11PubSub -Dexec.mainClass=pkcs11pubsub.Pkcs11PubSub -Dexec.args='--endpoint --cert --ca_file --pkcs11_lib --pin --token_label --key_label ' + mvn compile exec:java -pl samples/Pkcs11Connect -Dexec.mainClass=pkcs11connect.Pkcs11Connect -Dexec.args='--endpoint --cert --ca_file --pkcs11_lib --pin --token_label --key_label ' ``` -## WindowsCertPubSub +Your Thing's [Policy](https://docs.aws.amazon.com/iot/latest/developerguide/iot-policies.html) must provide privileges for this sample to connect. + +
+(see sample policy) +
+{
+  "Version": "2012-10-17",
+  "Statement": [
+    {
+      "Effect": "Allow",
+      "Action": [
+        "iot:Connect"
+      ],
+      "Resource": [
+        "arn:aws:iot:region:account:client/test-*"
+      ]
+    }
+  ]
+}
+
+
+ +## WindowsCert Connect WARNING: Windows only @@ -133,7 +260,7 @@ If your certificate and private key are in a [TPM](https://docs.microsoft.com/en-us/windows/security/information-protection/tpm/trusted-platform-module-overview), you would use them by passing their certificate store path. -source: `samples/WindowsCertPubSub` +source: `samples/WindowsCertConnect` To run this sample with a basic certificate from AWS IoT Core: @@ -175,9 +302,30 @@ To run this sample with a basic certificate from AWS IoT Core: 4) Now you can run the sample: ```sh - mvn compile exec:java -pl samples/WindowsCertPubSub "-Dexec.mainClass=windowscertpubsub.WindowsCertPubSub" "-Dexec.args=--endpoint xxxx-ats.iot.xxxx.amazonaws.com --cert CurrentUser\MY\A11F8A9B5DF5B98BA3508FBCA575D09570E0D2C6 --rootca AmazonRootCA1.pem" + mvn compile exec:java -pl samples/WindowsCertConnect "-Dexec.mainClass=windowscertconnect.WindowsCertConnect" "-Dexec.args=--endpoint --cert --ca_file " ``` +Your Thing's [Policy](https://docs.aws.amazon.com/iot/latest/developerguide/iot-policies.html) must provide privileges for this sample to connect. + +
+(see sample policy) +
+{
+  "Version": "2012-10-17",
+  "Statement": [
+    {
+      "Effect": "Allow",
+      "Action": [
+        "iot:Connect"
+      ],
+      "Resource": [
+        "arn:aws:iot:region:account:client/test-*"
+      ]
+    }
+  ]
+}
+
+
## Shadow @@ -204,7 +352,7 @@ Source: `samples/Shadow` To Run: ``` sh -mvn compile exec:java -pl samples/Shadow -Dexec.mainClass=shadow.ShadowSample -Dexec.args='--endpoint --ca_file /path/to/AmazonRootCA1.pem --cert --key --thing_name ' +mvn compile exec:java -pl samples/Shadow -Dexec.mainClass=shadow.ShadowSample -Dexec.args='--endpoint --ca_file --cert --key --thing_name ' ``` Your Thing's @@ -280,7 +428,7 @@ Source: `samples/Jobs` To Run: ``` sh -mvn compile exec:java -pl samples/Jobs -Dexec.mainClass=jobs.JobsSample -Dexec.args='--endpoint --ca_file /path/to/AmazonRootCA1.pem --cert --key --thing_name ' +mvn compile exec:java -pl samples/Jobs -Dexec.mainClass=jobs.JobsSample -Dexec.args='--endpoint --ca_file --cert --key --thing_name ' ``` Your Thing's @@ -356,15 +504,15 @@ cd ~/samples/Identity Run the sample using CreateKeysAndCertificate: ``` sh -mvn compile exec:java -pl samples/Identity -Dexec.mainClass="identity.FleetProvisioningSample" -Dexec.args="--endpoint --ca_file ---cert --key --template_name --template_parameters " +mvn compile exec:java -pl samples/Identity -Dexec.mainClass="identity.FleetProvisioningSample" -Dexec.args="--endpoint --ca_file +--cert --key --template_name