"
+
+For example, if your project ID is `blue-jet-123`, your device registry id is
+`my-registry`, your device id is `my-device` and you have generated your
+credentials using the [`generate_keys.sh`](../generate_keys.sh) script
+provided in the parent folder, you can run the sample as:
+
+ mvn exec:java \
+ -Dexec.mainClass="com.example.cloud.iot.endtoend.CloudiotPubsubExampleMqttDevice" \
+ -Dexec.args="-project_id=blue-jet-123 \
+ -registry_id=my-registry \
+ -device_id=my-device \
+ -private_key_file=../rsa_private_pkcs8 \
+ -algorithm=RS256"
diff --git a/iot/api-client/end-to-end-example/pom.xml b/iot/api-client/end-to-end-example/pom.xml
new file mode 100644
index 00000000000..29f08246186
--- /dev/null
+++ b/iot/api-client/end-to-end-example/pom.xml
@@ -0,0 +1,121 @@
+
+
+ 4.0.0
+ com.example.cloud
+ cloudiot-manager-demo
+ jar
+ 1.0
+ cloudiot-manager-demo
+ http://maven.apache.org
+
+
+
+ com.google.cloud.samples
+ shared-configuration
+ 1.0.9
+
+
+ 1.8
+ 1.8
+
+
+
+
+ org.eclipse.paho
+ org.eclipse.paho.client.mqttv3
+ 1.2.0
+
+
+ org.json
+ json
+ 20090211
+
+
+ io.jsonwebtoken
+ jjwt
+ 0.7.0
+
+
+ joda-time
+ joda-time
+ 2.1
+
+
+ com.google.apis
+ google-api-services-cloudiot
+ v1-rev20181120-1.27.0
+
+
+ com.google.cloud
+ google-cloud-pubsub
+ 1.53.0
+
+
+ com.google.oauth-client
+ google-oauth-client
+ 1.23.0
+
+
+ com.google.guava
+ guava
+ 23.0
+
+
+ com.google.api-client
+ google-api-client
+ 1.23.0
+
+
+ commons-cli
+ commons-cli
+ 1.3
+
+
+
+
+ junit
+ junit
+ 4.12
+ test
+
+
+ com.google.truth
+ truth
+ 0.34
+ test
+
+
+
+
+
+
+ maven-assembly-plugin
+
+
+
+ com.example.cloudiot.Manage
+
+
+
+ jar-with-dependencies
+
+
+
+
+
+
\ No newline at end of file
diff --git a/iot/api-client/end-to-end-example/src/main/java/com/example/cloud/iot/endtoend/CloudiotPubsubExampleMqttDevice.java b/iot/api-client/end-to-end-example/src/main/java/com/example/cloud/iot/endtoend/CloudiotPubsubExampleMqttDevice.java
new file mode 100644
index 00000000000..f84ca9b71b5
--- /dev/null
+++ b/iot/api-client/end-to-end-example/src/main/java/com/example/cloud/iot/endtoend/CloudiotPubsubExampleMqttDevice.java
@@ -0,0 +1,272 @@
+/*
+ * Copyright 2019 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.example.cloud.iot.endtoend;
+
+import io.jsonwebtoken.JwtBuilder;
+import io.jsonwebtoken.Jwts;
+import io.jsonwebtoken.SignatureAlgorithm;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.KeyFactory;
+import java.security.spec.PKCS8EncodedKeySpec;
+import java.util.Properties;
+
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.joda.time.DateTime;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * Sample device that consumes configuration from Google Cloud IoT. This example represents a simple
+ * device with a temperature sensor and a fan (simulated with software). When the device's fan is
+ * turned on, its temperature decreases by one degree per second, and when the device's fan is
+ * turned off, its temperature increases by one degree per second.
+ *
+ * Every second, the device publishes its temperature reading to Google Cloud IoT Core. The
+ * server meanwhile receives these temperature readings, and decides whether to re-configure the
+ * device to turn its fan on or off. The server will instruct the device to turn the fan on when the
+ * device's temperature exceeds 10 degrees, and to turn it off when the device's temperature is less
+ * than 0 degrees. In a real system, one could use the cloud to compute the optimal thresholds for
+ * turning on and off the fan, but for illustrative purposes we use a simple threshold model.
+ *
+ *
To connect the device you must have downloaded Google's CA root certificates, and a copy of
+ * your private key file. See cloud.google.com/iot for instructions on how to do this. Run this
+ * script with the corresponding algorithm flag.
+ *
+ *
+ * $ mvn clean compile assembly:single
+ *
+ * $ mvn exec:java \
+ * -Dexec.mainClass="com.example.cloud.iot.endtoend.CloudiotPubsubExampleMqttDevice" \
+ * -Dexec.args="-project_id= \
+ * -registry_id= \
+ * -device_id= \
+ * -private_key_file= \
+ * -algorithm="
+ *
+ *
+ *
With a single server, you can run multiple instances of the device with different device ids,
+ * and the server will distinguish them. Try creating a few devices and running them all at the same
+ * time.
+ */
+public class CloudiotPubsubExampleMqttDevice {
+
+ /** Create a RSA-based JWT for the given project id, signed with the given private key. */
+ private static String createJwtRsa(String projectId, String privateKeyFile) throws Exception {
+ DateTime now = new DateTime();
+ // Create a JWT to authenticate this device. The device will be disconnected after the token
+ // expires, and will have to reconnect with a new token. The audience field should always be set
+ // to the GCP project id.
+ JwtBuilder jwtBuilder =
+ Jwts.builder()
+ .setIssuedAt(now.toDate())
+ .setExpiration(now.plusMinutes(20).toDate())
+ .setAudience(projectId);
+
+ byte[] keyBytes = Files.readAllBytes(Paths.get(privateKeyFile));
+ PKCS8EncodedKeySpec spec = new PKCS8EncodedKeySpec(keyBytes);
+ KeyFactory kf = KeyFactory.getInstance("RSA");
+
+ return jwtBuilder.signWith(SignatureAlgorithm.RS256, kf.generatePrivate(spec)).compact();
+ }
+
+ /** Create an ES-based JWT for the given project id, signed with the given private key. */
+ private static String createJwtEs(String projectId, String privateKeyFile) throws Exception {
+ DateTime now = new DateTime();
+ // Create a JWT to authenticate this device. The device will be disconnected after the token
+ // expires, and will have to reconnect with a new token. The audience field should always be set
+ // to the GCP project id.
+ JwtBuilder jwtBuilder =
+ Jwts.builder()
+ .setIssuedAt(now.toDate())
+ .setExpiration(now.plusMinutes(20).toDate())
+ .setAudience(projectId);
+
+ byte[] keyBytes = Files.readAllBytes(Paths.get(privateKeyFile));
+ PKCS8EncodedKeySpec spec = new PKCS8EncodedKeySpec(keyBytes);
+ KeyFactory kf = KeyFactory.getInstance("EC");
+
+ return jwtBuilder.signWith(SignatureAlgorithm.ES256, kf.generatePrivate(spec)).compact();
+ }
+
+ /** Represents the state of a single device. */
+ static class Device implements MqttCallback {
+ private int temperature;
+ private boolean isFanOn;
+ private boolean isConnected;
+
+ public Device(CloudiotPubsubExampleMqttDeviceOptions options) {
+ this.temperature = 0;
+ this.isFanOn = false;
+ this.isConnected = false;
+ }
+
+ /**
+ * Pretend to read the device's sensor data. If the fan is on, assume the temperature decreased
+ * one degree, otherwise assume that it increased one degree.
+ */
+ public void updateSensorData() {
+ if (this.isFanOn) {
+ this.temperature -= 1;
+ } else {
+ this.temperature += 1;
+ }
+ }
+
+ /** Wait for the device to become connected. */
+ public void waitForConnection(int timeOut) throws InterruptedException {
+ // Wait for the device to become connected.
+ int totalTime = 0;
+ while (!this.isConnected && totalTime < timeOut) {
+ Thread.sleep(1000);
+ totalTime += 1;
+ }
+
+ if (!this.isConnected) {
+ throw new RuntimeException("Could not connect to MQTT bridge.");
+ }
+ }
+
+ /** Callback when the device receives a PUBACK from the MQTT bridge. */
+ @Override
+ public void deliveryComplete(IMqttDeliveryToken token) {
+ System.out.println("Published message acked.");
+ }
+
+ /** Callback when the device receives a message on a subscription. */
+ @Override
+ public void messageArrived(String topic, MqttMessage message) {
+ String payload = new String(message.getPayload());
+ System.out.println(
+ String.format(
+ "Received message %s on topic %s with Qos %d", payload, topic, message.getQos()));
+
+ // The device will receive its latest config when it subscribes to the
+ // config topic. If there is no configuration for the device, the device
+ // will receive a config with an empty payload.
+ if (payload == null || payload.length() == 0) {
+ return;
+ }
+
+ // The config is passed in the payload of the message. In this example,
+ // the server sends a serialized JSON string.
+ JSONObject data = null;
+ try {
+ data = new JSONObject(payload);
+ if (data.get("fan_on") instanceof Boolean && (Boolean) data.get("fan_on") != this.isFanOn) {
+ // If changing the state of the fan, print a message and
+ // update the internal state.
+ this.isFanOn = (Boolean) data.get("fan_on");
+ if (this.isFanOn) {
+ System.out.println("Fan turned on");
+ } else {
+ System.out.println("Fan turned off");
+ }
+ }
+ } catch (JSONException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /** Callback for when a device disconnects. */
+ @Override
+ public void connectionLost(Throwable cause) {
+ System.out.println("Disconnected: " + cause.getMessage());
+ this.isConnected = false;
+ }
+ }
+
+ /** Entry point for CLI. */
+ public static void main(String[] args) throws Exception {
+ CloudiotPubsubExampleMqttDeviceOptions options =
+ CloudiotPubsubExampleMqttDeviceOptions.fromFlags(args);
+ if (options == null) {
+ System.exit(1);
+ }
+ final Device device = new Device(options);
+ final String mqttTelemetryTopic = String.format("/devices/%s/events", options.deviceId);
+ // This is the topic that the device will receive configuration updates on.
+ final String mqttConfigTopic = String.format("/devices/%s/config", options.deviceId);
+
+ final String mqttServerAddress =
+ String.format("ssl://%s:%s", options.mqttBridgeHostname, options.mqttBridgePort);
+ final String mqttClientId =
+ String.format(
+ "projects/%s/locations/%s/registries/%s/devices/%s",
+ options.projectId, options.cloudRegion, options.registryId, options.deviceId);
+ MqttConnectOptions connectOptions = new MqttConnectOptions();
+ connectOptions.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
+
+ Properties sslProps = new Properties();
+ sslProps.setProperty("com.ibm.ssl.protocol", "TLSv1.2");
+ connectOptions.setSSLProperties(sslProps);
+
+ connectOptions.setUserName("unused");
+ if (options.algorithm.equals("RS256")) {
+ System.out.println(options.privateKeyFile);
+
+ connectOptions.setPassword(
+ createJwtRsa(options.projectId, options.privateKeyFile).toCharArray());
+ System.out.println(
+ String.format(
+ "Creating JWT using RS256 from private key file %s", options.privateKeyFile));
+ } else if (options.algorithm.equals("ES256")) {
+ connectOptions.setPassword(
+ createJwtEs(options.projectId, options.privateKeyFile).toCharArray());
+ } else {
+ throw new IllegalArgumentException(
+ "Invalid algorithm " + options.algorithm + ". Should be one of 'RS256' or 'ES256'.");
+ }
+
+ device.isConnected = true;
+
+ MqttClient client = new MqttClient(mqttServerAddress, mqttClientId, new MemoryPersistence());
+
+ try {
+ client.setCallback(device);
+ client.connect(connectOptions);
+ } catch (MqttException e) {
+ e.printStackTrace();
+ }
+
+ // wait for it to connect
+ device.waitForConnection(5);
+
+ client.subscribe(mqttConfigTopic, 1);
+
+ for (int i = 0; i < options.numMessages; i++) {
+ device.updateSensorData();
+
+ JSONObject payload = new JSONObject();
+ payload.put("temperature", device.temperature);
+ System.out.println("Publishing payload " + payload.toString());
+ MqttMessage message = new MqttMessage(payload.toString().getBytes());
+ message.setQos(1);
+ client.publish(mqttTelemetryTopic, message);
+ Thread.sleep(1000);
+ }
+ client.disconnect();
+
+ System.out.println("Finished looping successfully : " + options.mqttBridgeHostname);
+ }
+}
diff --git a/iot/api-client/end-to-end-example/src/main/java/com/example/cloud/iot/endtoend/CloudiotPubsubExampleMqttDeviceOptions.java b/iot/api-client/end-to-end-example/src/main/java/com/example/cloud/iot/endtoend/CloudiotPubsubExampleMqttDeviceOptions.java
new file mode 100644
index 00000000000..73bf82f709a
--- /dev/null
+++ b/iot/api-client/end-to-end-example/src/main/java/com/example/cloud/iot/endtoend/CloudiotPubsubExampleMqttDeviceOptions.java
@@ -0,0 +1,163 @@
+/*
+ * Copyright 2019 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.example.cloud.iot.endtoend;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+
+/** Command line options for the Pubsub Mqtt device example. */
+public class CloudiotPubsubExampleMqttDeviceOptions {
+ String projectId;
+ String registryId;
+ String deviceId;
+ String privateKeyFile;
+ String algorithm;
+ String cloudRegion = "us-central1";
+ int numMessages = 100;
+ String mqttBridgeHostname = "mqtt.googleapis.com";
+ short mqttBridgePort = 8883; // if running from a Compute VM, use 443.
+ String messageType = "event";
+
+ static final Options options = new Options();
+
+ public static CloudiotPubsubExampleMqttDeviceOptions fromFlags(String[] args) {
+ // Required arguments
+ options.addOption(
+ Option.builder()
+ .type(String.class)
+ .longOpt("registry_id")
+ .hasArg()
+ .desc("Cloud IoT Core registry id.")
+ .required()
+ .build());
+ options.addOption(
+ Option.builder()
+ .type(String.class)
+ .longOpt("device_id")
+ .hasArg()
+ .desc("Cloud IoT Core device id.")
+ .required()
+ .build());
+ options.addOption(
+ Option.builder()
+ .type(String.class)
+ .longOpt("private_key_file")
+ .hasArg()
+ .desc("Path to private key file.")
+ .required()
+ .build());
+ options.addOption(
+ Option.builder()
+ .type(String.class)
+ .longOpt("algorithm")
+ .hasArg()
+ .desc("Encryption algorithm to use to generate the JWT. Either 'RS256' or 'ES256'.")
+ .required()
+ .build());
+ // Optional arguments
+ options.addOption(
+ Option.builder()
+ .type(String.class)
+ .longOpt("project_id")
+ .hasArg()
+ .desc("GCP cloud project name.")
+ .build());
+ options.addOption(
+ Option.builder()
+ .type(String.class)
+ .longOpt("cloud_region")
+ .hasArg()
+ .desc("GCP cloud region.")
+ .build());
+ options.addOption(
+ Option.builder()
+ .type(Number.class)
+ .longOpt("num_messages")
+ .hasArg()
+ .desc("Number of messages to publish.")
+ .build());
+ options.addOption(
+ Option.builder()
+ .type(String.class)
+ .longOpt("mqtt_bridge_hostname")
+ .hasArg()
+ .desc("MQTT bridge hostname.")
+ .build());
+ options.addOption(
+ Option.builder()
+ .type(Number.class)
+ .longOpt("mqtt_bridge_port") // this supports either 8883 or 443,
+ .hasArg() // if running on Cloud shell, use 443.
+ .desc("MQTT bridge port.")
+ .build());
+ options.addOption(
+ Option.builder()
+ .type(String.class)
+ .longOpt("message_type")
+ .hasArg()
+ .desc("Indicates whether the message is a telemetry event or a device state message")
+ .build());
+
+ CommandLineParser parser = new DefaultParser();
+ CommandLine commandLine;
+
+ try {
+ commandLine = parser.parse(options, args);
+ CloudiotPubsubExampleMqttDeviceOptions res = new CloudiotPubsubExampleMqttDeviceOptions();
+
+ res.registryId = commandLine.getOptionValue("registry_id");
+ res.deviceId = commandLine.getOptionValue("device_id");
+ res.privateKeyFile = commandLine.getOptionValue("private_key_file");
+ res.algorithm = commandLine.getOptionValue("algorithm");
+
+ if (commandLine.hasOption("project_id")) {
+ res.projectId = commandLine.getOptionValue("project_id");
+ } else {
+ try {
+ res.projectId = System.getenv("GOOGLE_CLOUD_PROJECT");
+ } catch (NullPointerException npe) {
+ res.projectId = System.getenv("GCLOUD_PROJECT");
+ }
+ }
+ if (commandLine.hasOption("cloud_region")) {
+ res.cloudRegion = commandLine.getOptionValue("cloud_region");
+ }
+ if (commandLine.hasOption("num_messages")) {
+ res.numMessages = ((Number) commandLine.getParsedOptionValue("num_messages")).intValue();
+ }
+ if (commandLine.hasOption("mqtt_bridge_hostname")) {
+ res.mqttBridgeHostname = commandLine.getOptionValue("mqtt_bridge_hostname");
+ }
+ if (commandLine.hasOption("mqtt_bridge_port")) {
+ res.mqttBridgePort =
+ ((Number) commandLine.getParsedOptionValue("mqtt_bridge_port")).shortValue();
+ }
+ if (commandLine.hasOption("message_type")) {
+ res.messageType = commandLine.getOptionValue("message_type");
+ }
+
+ return res;
+ } catch (ParseException e) {
+ System.err.println(e.getMessage());
+ return null;
+ }
+ }
+}
diff --git a/iot/api-client/end-to-end-example/src/main/java/com/example/cloud/iot/endtoend/CloudiotPubsubExampleServer.java b/iot/api-client/end-to-end-example/src/main/java/com/example/cloud/iot/endtoend/CloudiotPubsubExampleServer.java
new file mode 100644
index 00000000000..791f218b1f8
--- /dev/null
+++ b/iot/api-client/end-to-end-example/src/main/java/com/example/cloud/iot/endtoend/CloudiotPubsubExampleServer.java
@@ -0,0 +1,334 @@
+/*
+ * Copyright 2019 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.example.cloud.iot.endtoend;
+
+import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
+import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
+import com.google.api.client.http.HttpRequestInitializer;
+import com.google.api.client.json.JsonFactory;
+import com.google.api.client.json.jackson2.JacksonFactory;
+import com.google.api.services.cloudiot.v1.CloudIot;
+import com.google.api.services.cloudiot.v1.CloudIotScopes;
+import com.google.api.services.cloudiot.v1.model.Device;
+import com.google.api.services.cloudiot.v1.model.DeviceRegistry;
+import com.google.api.services.cloudiot.v1.model.EventNotificationConfig;
+import com.google.api.services.cloudiot.v1.model.GatewayConfig;
+import com.google.api.services.cloudiot.v1.model.ModifyCloudToDeviceConfigRequest;
+import com.google.cloud.pubsub.v1.AckReplyConsumer;
+import com.google.cloud.pubsub.v1.MessageReceiver;
+import com.google.cloud.pubsub.v1.Subscriber;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.pubsub.v1.ProjectSubscriptionName;
+import com.google.pubsub.v1.PubsubMessage;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * Sample server that pushes configuration to Google Cloud IoT devices.
+ *
+ *
This example represents a server that consumes telemetry data from multiple Cloud IoT devices.
+ * The devices report telemetry data, which the server consumes from a Cloud Pub/Sub topic. The
+ * server then decides whether to turn on or off individual devices fans.
+ *
+ *
If you are running this example from a Compute Engine VM, you will have to enable the Cloud
+ * Pub/Sub API for your project, which you can do from the Cloud Console. Create a pubsub topic, for
+ * example projects/my-project-id/topics/my-topic-name, and a subscription, for example
+ * projects/my-project-id/subscriptions/my-topic-subscription.
+ *
+ *
You can then run the example with
+ * $ mvn clean compile assembly:single
+ *
+ * $ mvn exec:java \
+ * -Dexec.mainClass="com.example.cloud.iot.endtoend.CloudiotPubsubExampleServer" \
+ * -Dexec.args="-project_id= \
+ * -pubsub_subscription=
+ */
+public class CloudiotPubsubExampleServer {
+
+ static final String APP_NAME = "CloudiotPubsubExampleServer";
+
+ CloudIot service;
+
+ /** Represents the state of the server. */
+ public CloudiotPubsubExampleServer() throws GeneralSecurityException, IOException {
+ GoogleCredential credential =
+ GoogleCredential.getApplicationDefault().createScoped(CloudIotScopes.all());
+ JsonFactory jsonFactory = JacksonFactory.getDefaultInstance();
+ HttpRequestInitializer init = new RetryHttpInitializerWrapper(credential);
+ this.service =
+ new CloudIot.Builder(GoogleNetHttpTransport.newTrustedTransport(), jsonFactory, init)
+ .setApplicationName(APP_NAME)
+ .build();
+ }
+
+ /** Create a registry for Cloud IoT. */
+ public static void createRegistry(
+ String cloudRegion, String projectId, String registryName, String pubsubTopicPath)
+ throws GeneralSecurityException, IOException {
+ GoogleCredential credential =
+ GoogleCredential.getApplicationDefault().createScoped(CloudIotScopes.all());
+ JsonFactory jsonFactory = JacksonFactory.getDefaultInstance();
+ HttpRequestInitializer init = new RetryHttpInitializerWrapper(credential);
+ final CloudIot service =
+ new CloudIot.Builder(GoogleNetHttpTransport.newTrustedTransport(), jsonFactory, init)
+ .setApplicationName(APP_NAME)
+ .build();
+
+ final String projectPath = "projects/" + projectId + "/locations/" + cloudRegion;
+ final String fullPubsubPath = "projects/" + projectId + "/topics/" + pubsubTopicPath;
+
+ DeviceRegistry registry = new DeviceRegistry();
+ EventNotificationConfig notificationConfig = new EventNotificationConfig();
+ notificationConfig.setPubsubTopicName(fullPubsubPath);
+ List notificationConfigs = new ArrayList();
+ notificationConfigs.add(notificationConfig);
+ registry.setEventNotificationConfigs(notificationConfigs);
+ registry.setId(registryName);
+
+ DeviceRegistry reg =
+ service.projects().locations().registries().create(projectPath, registry).execute();
+ System.out.println("Created registry: " + reg.getName());
+ }
+
+ /** Delete this registry from Cloud IoT. */
+ public static void deleteRegistry(String cloudRegion, String projectId, String registryName)
+ throws GeneralSecurityException, IOException {
+ GoogleCredential credential =
+ GoogleCredential.getApplicationDefault().createScoped(CloudIotScopes.all());
+ JsonFactory jsonFactory = JacksonFactory.getDefaultInstance();
+ HttpRequestInitializer init = new RetryHttpInitializerWrapper(credential);
+ final CloudIot service =
+ new CloudIot.Builder(GoogleNetHttpTransport.newTrustedTransport(), jsonFactory, init)
+ .setApplicationName(APP_NAME)
+ .build();
+
+ final String registryPath =
+ String.format(
+ "projects/%s/locations/%s/registries/%s", projectId, cloudRegion, registryName);
+
+ System.out.println("Deleting: " + registryPath);
+ service.projects().locations().registries().delete(registryPath).execute();
+ }
+
+ /** Delete this device from Cloud IoT. */
+ public static void deleteDevice(
+ String deviceId, String projectId, String cloudRegion, String registryName)
+ throws GeneralSecurityException, IOException {
+ GoogleCredential credential =
+ GoogleCredential.getApplicationDefault().createScoped(CloudIotScopes.all());
+ JsonFactory jsonFactory = JacksonFactory.getDefaultInstance();
+ HttpRequestInitializer init = new RetryHttpInitializerWrapper(credential);
+ final CloudIot service =
+ new CloudIot.Builder(GoogleNetHttpTransport.newTrustedTransport(), jsonFactory, init)
+ .setApplicationName(APP_NAME)
+ .build();
+
+ final String devicePath =
+ String.format(
+ "projects/%s/locations/%s/registries/%s/devices/%s",
+ projectId, cloudRegion, registryName, deviceId);
+
+ System.out.println("Deleting device " + devicePath);
+ service.projects().locations().registries().devices().delete(devicePath).execute();
+ }
+
+ /** Create a device to bind to a gateway. */
+ public static void createDevice(
+ String projectId, String cloudRegion, String registryName, String deviceId)
+ throws GeneralSecurityException, IOException {
+ // [START create_device]
+ GoogleCredential credential =
+ GoogleCredential.getApplicationDefault().createScoped(CloudIotScopes.all());
+ JsonFactory jsonFactory = JacksonFactory.getDefaultInstance();
+ HttpRequestInitializer init = new RetryHttpInitializerWrapper(credential);
+ final CloudIot service =
+ new CloudIot.Builder(GoogleNetHttpTransport.newTrustedTransport(), jsonFactory, init)
+ .setApplicationName(APP_NAME)
+ .build();
+
+ final String registryPath =
+ String.format(
+ "projects/%s/locations/%s/registries/%s", projectId, cloudRegion, registryName);
+
+ List devices =
+ service
+ .projects()
+ .locations()
+ .registries()
+ .devices()
+ .list(registryPath)
+ .setFieldMask("config,gatewayConfig")
+ .execute()
+ .getDevices();
+
+ if (devices != null) {
+ System.out.println("Found " + devices.size() + " devices");
+ for (Device d : devices) {
+ if ((d.getId() != null && d.getId().equals(deviceId))
+ || (d.getName() != null && d.getName().equals(deviceId))) {
+ System.out.println("Device exists, skipping.");
+ return;
+ }
+ }
+ }
+
+ System.out.println("Creating device with id: " + deviceId);
+ Device device = new Device();
+ device.setId(deviceId);
+
+ GatewayConfig gwConfig = new GatewayConfig();
+ gwConfig.setGatewayType("NON_GATEWAY");
+ gwConfig.setGatewayAuthMethod("ASSOCIATION_ONLY");
+
+ device.setGatewayConfig(gwConfig);
+ Device createdDevice =
+ service
+ .projects()
+ .locations()
+ .registries()
+ .devices()
+ .create(registryPath, device)
+ .execute();
+
+ System.out.println("Created device: " + createdDevice.toPrettyString());
+ // [END create_device]
+ }
+
+ /** Push the data to the given device as configuration. */
+ public void updateDeviceConfig(
+ String projectId, String region, String registryId, String deviceId, JSONObject data)
+ throws JSONException, UnsupportedEncodingException {
+ // Push the data to the given device as configuration.
+ JSONObject configData = new JSONObject();
+ System.out.println(
+ String.format("Device %s has temperature of: %d", deviceId, data.getInt("temperature")));
+ if (data.getInt("temperature") < 0) {
+ // Turn off the fan
+ configData.put("fan_on", false);
+ System.out.println("Setting fan state for device " + deviceId + " to off.");
+ } else if (data.getInt("temperature") > 10) {
+ // Turn on the fan
+ configData.put("fan_on", true);
+ System.out.println("Setting fan state for device " + deviceId + " to on.");
+ } else {
+ // temperature is okay, don't need to push new config
+ return;
+ }
+ // Data sent through the wire has to be base64 encoded.
+ Base64.Encoder encoder = Base64.getEncoder();
+ String encPayload = encoder.encodeToString(configData.toString().getBytes("UTF-8"));
+
+ ModifyCloudToDeviceConfigRequest request = new ModifyCloudToDeviceConfigRequest();
+ request.setBinaryData(encPayload);
+
+ String deviceName =
+ String.format(
+ "projects/%s/locations/%s/" + "registries/%s/devices/%s",
+ projectId, region, registryId, deviceId);
+
+ try {
+ service
+ .projects()
+ .locations()
+ .registries()
+ .devices()
+ .modifyCloudToDeviceConfig(deviceName, request)
+ .execute();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /** The main loop. Consumes messages from the Pub/Sub subscription. */
+ public void run(String projectId, String subscriptionId) {
+ ProjectSubscriptionName subscriptionName =
+ ProjectSubscriptionName.of(projectId, subscriptionId);
+ // Instantiate an asynchronous message receiver
+ MessageReceiver receiver =
+ new MessageReceiver() {
+ @Override
+ public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
+ // handle incoming message, then ack/nack the received message
+ try {
+ JSONObject data = new JSONObject(message.getData().toStringUtf8());
+ String projectId = message.getAttributesOrThrow("projectId");
+ String region = message.getAttributesOrThrow("deviceRegistryLocation");
+ String registryId = message.getAttributesOrThrow("deviceRegistryId");
+ String deviceId = message.getAttributesOrThrow("deviceId");
+
+ CloudiotPubsubExampleServer.this.updateDeviceConfig(
+ projectId, region, registryId, deviceId, data);
+ consumer.ack();
+ } catch (JSONException e) {
+ e.printStackTrace();
+ } catch (UnsupportedEncodingException e) {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ Subscriber subscriber = null;
+ try {
+ subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
+ subscriber.addListener(
+ new Subscriber.Listener() {
+ @Override
+ public void failed(Subscriber.State from, Throwable failure) {
+ // Handle failure. This is called when the Subscriber encountered a fatal error and is
+ // shutting down.
+ System.err.println(failure);
+ }
+ },
+ MoreExecutors.directExecutor());
+ subscriber.startAsync().awaitRunning();
+ System.out.println(
+ String.format("Listening for messages on %s", subscriber.getSubscriptionNameString()));
+ while (true) {
+ Thread.sleep(60000);
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } finally {
+ if (subscriber != null) {
+ subscriber.stopAsync().awaitTerminated();
+ }
+ }
+ }
+
+ /** Entry point for CLI. */
+ public static void main(String[] args) throws Exception {
+ CloudiotPubsubExampleServerOptions options = CloudiotPubsubExampleServerOptions.fromFlags(args);
+ if (options == null) {
+ System.exit(1);
+ }
+
+ CloudiotPubsubExampleServer server = new CloudiotPubsubExampleServer();
+ String projectId = options.projectId;
+ String pubsubscription = options.pubsubSubscription;
+ server.run(projectId, pubsubscription);
+ }
+}
diff --git a/iot/api-client/end-to-end-example/src/main/java/com/example/cloud/iot/endtoend/CloudiotPubsubExampleServerOptions.java b/iot/api-client/end-to-end-example/src/main/java/com/example/cloud/iot/endtoend/CloudiotPubsubExampleServerOptions.java
new file mode 100644
index 00000000000..0038fe0d458
--- /dev/null
+++ b/iot/api-client/end-to-end-example/src/main/java/com/example/cloud/iot/endtoend/CloudiotPubsubExampleServerOptions.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2019 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.example.cloud.iot.endtoend;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+
+public class CloudiotPubsubExampleServerOptions {
+ String projectId;
+ String pubsubSubscription;
+
+ static final Options options = new Options();
+
+ public static CloudiotPubsubExampleServerOptions fromFlags(String[] args) {
+ // Required arguments
+ options.addOption(
+ Option.builder()
+ .type(String.class)
+ .longOpt("pubsub_subscription")
+ .hasArg()
+ .desc("Google Cloud Pub/Sub subscription name.")
+ .required()
+ .build());
+
+ // Optional arguments
+ options.addOption(
+ Option.builder()
+ .type(String.class)
+ .longOpt("project_id")
+ .hasArg()
+ .desc("GCP cloud project name.")
+ .build());
+
+ CommandLineParser parser = new DefaultParser();
+ CommandLine commandLine;
+
+ try {
+ commandLine = parser.parse(options, args);
+ CloudiotPubsubExampleServerOptions res = new CloudiotPubsubExampleServerOptions();
+
+ if (commandLine.hasOption("project_id")) {
+ res.projectId = commandLine.getOptionValue("project_id");
+ } else {
+ try {
+ res.projectId = System.getenv("GOOGLE_CLOUD_PROJECT");
+ } catch (NullPointerException npe) {
+ res.projectId = System.getenv("GCLOUD_PROJECT");
+ }
+ }
+
+ if (commandLine.hasOption("pubsub_subscription")) {
+ res.pubsubSubscription = commandLine.getOptionValue("pubsub_subscription");
+ }
+
+ return res;
+ } catch (ParseException e) {
+ System.err.println(e.getMessage());
+ return null;
+ }
+ }
+}
diff --git a/iot/api-client/end-to-end-example/src/main/java/com/example/cloud/iot/endtoend/RetryHttpInitializerWrapper.java b/iot/api-client/end-to-end-example/src/main/java/com/example/cloud/iot/endtoend/RetryHttpInitializerWrapper.java
new file mode 100644
index 00000000000..91761156835
--- /dev/null
+++ b/iot/api-client/end-to-end-example/src/main/java/com/example/cloud/iot/endtoend/RetryHttpInitializerWrapper.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2019 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.example.cloud.iot.endtoend;
+
+import com.google.api.client.auth.oauth2.Credential;
+import com.google.api.client.http.HttpBackOffIOExceptionHandler;
+import com.google.api.client.http.HttpBackOffUnsuccessfulResponseHandler;
+import com.google.api.client.http.HttpRequest;
+import com.google.api.client.http.HttpRequestInitializer;
+import com.google.api.client.http.HttpResponse;
+import com.google.api.client.http.HttpUnsuccessfulResponseHandler;
+import com.google.api.client.util.ExponentialBackOff;
+import com.google.api.client.util.Sleeper;
+import com.google.common.base.Preconditions;
+
+import java.io.IOException;
+import java.util.logging.Logger;
+
+/**
+ * RetryHttpInitializerWrapper will automatically retry upon RPC failures, preserving the
+ * auto-refresh behavior of the Google Credentials.
+ */
+public class RetryHttpInitializerWrapper implements HttpRequestInitializer {
+
+ /** A private logger. */
+ private static final Logger LOG = Logger.getLogger(RetryHttpInitializerWrapper.class.getName());
+
+ /** One minutes in milliseconds. */
+ private static final int ONE_MINUTE_MILLIS = 60 * 1000;
+
+ /**
+ * Intercepts the request for filling in the "Authorization" header field, as well as recovering
+ * from certain unsuccessful error codes wherein the Credential must refresh its token for a
+ * retry.
+ */
+ private final Credential wrappedCredential;
+
+ /** A sleeper; you can replace it with a mock in your test. */
+ private final Sleeper sleeper;
+
+ /**
+ * A constructor.
+ *
+ * @param wrappedCredential Credential which will be wrapped and used for providing auth header.
+ */
+ public RetryHttpInitializerWrapper(final Credential wrappedCredential) {
+ this(wrappedCredential, Sleeper.DEFAULT);
+ }
+
+ /**
+ * A protected constructor only for testing.
+ *
+ * @param wrappedCredential Credential which will be wrapped and used for providing auth header.
+ * @param sleeper Sleeper for easy testing.
+ */
+ RetryHttpInitializerWrapper(final Credential wrappedCredential, final Sleeper sleeper) {
+ this.wrappedCredential = Preconditions.checkNotNull(wrappedCredential);
+ this.sleeper = sleeper;
+ }
+
+ /** Initializes the given request. */
+ @Override
+ public final void initialize(final HttpRequest request) {
+ request.setReadTimeout(2 * ONE_MINUTE_MILLIS); // 2 minutes read timeout
+ final HttpUnsuccessfulResponseHandler backoffHandler =
+ new HttpBackOffUnsuccessfulResponseHandler(new ExponentialBackOff()).setSleeper(sleeper);
+ request.setInterceptor(wrappedCredential);
+ request.setUnsuccessfulResponseHandler(
+ new HttpUnsuccessfulResponseHandler() {
+ @Override
+ public boolean handleResponse(
+ final HttpRequest request, final HttpResponse response, final boolean supportsRetry)
+ throws IOException {
+ if (wrappedCredential.handleResponse(request, response, supportsRetry)) {
+ // If credential decides it can handle it, the return code or message indicated
+ // something specific to authentication, and no backoff is desired.
+ return true;
+ } else if (backoffHandler.handleResponse(request, response, supportsRetry)) {
+ // Otherwise, we defer to the judgment of our internal backoff handler.
+ LOG.info("Retrying " + request.getUrl().toString());
+ return true;
+ } else {
+ return false;
+ }
+ }
+ });
+ request.setIOExceptionHandler(
+ new HttpBackOffIOExceptionHandler(new ExponentialBackOff()).setSleeper(sleeper));
+ }
+}
diff --git a/iot/api-client/end-to-end-example/src/test/java/com/example/cloud/iot/endtoend/CloudiotPubsubExampleServerTest.java b/iot/api-client/end-to-end-example/src/test/java/com/example/cloud/iot/endtoend/CloudiotPubsubExampleServerTest.java
new file mode 100644
index 00000000000..cea282ddd89
--- /dev/null
+++ b/iot/api-client/end-to-end-example/src/test/java/com/example/cloud/iot/endtoend/CloudiotPubsubExampleServerTest.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2019 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.example.cloud.iot.endtoend;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.security.GeneralSecurityException;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for iot End to End sample. */
+@RunWith(JUnit4.class)
+@SuppressWarnings("checkstyle:abbreviationaswordinname")
+public class CloudiotPubsubExampleServerTest {
+ private ByteArrayOutputStream bout;
+ private PrintStream out;
+
+ private static final String CLOUD_REGION = "us-central1";
+ private static final String DEVICE_ID_TEMPLATE = "test-device-%s";
+ private static final String DEVICE_ID =
+ String.format(DEVICE_ID_TEMPLATE, System.currentTimeMillis() * 1000);
+ private static final String TOPIC_ID =
+ String.format("test-device-events-%d", System.currentTimeMillis() * 1000);
+ private static final String PROJECT_ID = System.getenv("GOOGLE_CLOUD_PROJECT");
+ private static final String REGISTRY_ID =
+ String.format("test-registry-%d", System.currentTimeMillis() * 1000);
+
+ @Before
+ public void setUp() throws Exception {
+ bout = new ByteArrayOutputStream();
+ out = new PrintStream(bout);
+ System.setOut(out);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ System.setOut(null);
+ }
+
+ @Test
+ public void testConfigTurnOn() throws GeneralSecurityException, IOException, JSONException {
+ int maxTemp = 11;
+ JSONObject data = new JSONObject();
+
+ // Set up
+ CloudiotPubsubExampleServer.createRegistry(CLOUD_REGION, PROJECT_ID, REGISTRY_ID, TOPIC_ID);
+ CloudiotPubsubExampleServer.createDevice(PROJECT_ID, CLOUD_REGION, REGISTRY_ID, DEVICE_ID);
+
+ data.put("temperature", maxTemp);
+ CloudiotPubsubExampleServer server = new CloudiotPubsubExampleServer();
+ server.updateDeviceConfig(PROJECT_ID, CLOUD_REGION, REGISTRY_ID, DEVICE_ID, data);
+ String got = bout.toString();
+ Assert.assertTrue(got.contains("on"));
+ Assert.assertTrue(got.contains("11"));
+ Assert.assertTrue(got.contains("test-device-"));
+
+ // Clean up
+ CloudiotPubsubExampleServer.deleteDevice(DEVICE_ID, PROJECT_ID, CLOUD_REGION, REGISTRY_ID);
+ CloudiotPubsubExampleServer.deleteRegistry(CLOUD_REGION, PROJECT_ID, REGISTRY_ID);
+ }
+
+ @Test
+ public void testConfigOff() throws GeneralSecurityException, IOException, JSONException {
+ int minTemp = -1;
+ JSONObject data = new JSONObject();
+
+ // Set up
+ CloudiotPubsubExampleServer.createRegistry(CLOUD_REGION, PROJECT_ID, REGISTRY_ID, TOPIC_ID);
+ CloudiotPubsubExampleServer.createDevice(PROJECT_ID, CLOUD_REGION, REGISTRY_ID, DEVICE_ID);
+
+ data.put("temperature", minTemp);
+
+ CloudiotPubsubExampleServer server = new CloudiotPubsubExampleServer();
+ server.updateDeviceConfig(PROJECT_ID, CLOUD_REGION, REGISTRY_ID, DEVICE_ID, data);
+ String got = bout.toString();
+ Assert.assertTrue(got.contains("off"));
+ Assert.assertTrue(got.contains("-1"));
+ Assert.assertTrue(got.contains("test-device-"));
+
+ // Clean up
+ CloudiotPubsubExampleServer.deleteDevice(DEVICE_ID, PROJECT_ID, CLOUD_REGION, REGISTRY_ID);
+ CloudiotPubsubExampleServer.deleteRegistry(CLOUD_REGION, PROJECT_ID, REGISTRY_ID);
+ }
+}