diff --git a/iot/api-client/end_to_end_example/cloudiot_pubsub_example_mqtt_device.py b/iot/api-client/end_to_end_example/cloudiot_pubsub_example_mqtt_device.py index c61efb39a9b..3e92bbaa898 100644 --- a/iot/api-client/end_to_end_example/cloudiot_pubsub_example_mqtt_device.py +++ b/iot/api-client/end_to_end_example/cloudiot_pubsub_example_mqtt_device.py @@ -11,16 +11,16 @@ # See the License for the specific language governing permissions and # limitations under the License. r"""Sample device that consumes configuration from Google Cloud IoT. -This example represents a simple device with a temperature sensor and a fan -(simulated with software). When the device's fan is turned on, its temperature +This example represents a simple device with a humidity sensor and a fan +(simulated with software). When the device's fan is turned on, its humidity decreases by one degree per second, and when the device's fan is turned off, -its temperature increases by one degree per second. +its humidity increases by one degree per second. -Every second, the device publishes its temperature reading to Google Cloud IoT -Core. The server meanwhile receives these temperature readings, and decides +Every second, the device publishes its humidity reading to Google Cloud IoT +Core. The server meanwhile receives these humidity readings, and decides whether to re-configure the device to turn its fan on or off. The server will -instruct the device to turn the fan on when the device's temperature exceeds 10 -degrees, and to turn it off when the device's temperature is less than 0 +instruct the device to turn the fan on when the device's humidity exceeds 10 +degrees, and to turn it off when the device's humidity is less than 0 degrees. In a real system, one could use the cloud to compute the optimal thresholds for turning on and off the fan, but for illustrative purposes we use a simple threshold model. @@ -75,19 +75,19 @@ class Device(object): """Represents the state of a single device.""" def __init__(self): - self.temperature = 0 + self.humidity = 0 self.fan_on = False self.connected = False def update_sensor_data(self): """Pretend to read the device's sensor data. - If the fan is on, assume the temperature decreased one degree, + If the fan is on, assume the humidity decreased one degree, otherwise assume that it increased one degree. """ if self.fan_on: - self.temperature -= 1 + self.humidity -= 1 else: - self.temperature += 1 + self.humidity += 1 def wait_for_connection(self, timeout): """Wait for the device to become connected.""" @@ -134,7 +134,9 @@ def on_message(self, unused_client, unused_userdata, message): # The config is passed in the payload of the message. In this example, # the server sends a serialized JSON string. - data = json.loads(payload) + json_payload = payload.decode("utf-8") + print('BEN: Data:',json_payload) + data = json.loads(json_payload) if data['fan_on'] != self.fan_on: # If changing the state of the fan, print a message and # update the internal state. @@ -219,12 +221,13 @@ def main(): client.on_subscribe = device.on_subscribe client.on_message = device.on_message - client.connect(args.mqtt_bridge_hostname, args.mqtt_bridge_port) + print('BEN',args.mqtt_bridge_hostname,"p:"+str(args.mqtt_bridge_port)) + client.connect(args.mqtt_bridge_hostname, int(args.mqtt_bridge_port)) client.loop_start() # This is the topic that the device will publish telemetry events - # (temperature data) to. + # (humidity data) to. mqtt_telemetry_topic = '/devices/{}/events'.format(args.device_id) # This is the topic that the device will receive configuration updates on. @@ -236,15 +239,15 @@ def main(): # Subscribe to the config topic. client.subscribe(mqtt_config_topic, qos=1) - # Update and publish temperature readings at a rate of one per second. + # Update and publish humidity readings at a rate of one per second. for _ in range(args.num_messages): # In an actual device, this would read the device's sensors. Here, - # you update the temperature based on whether the fan is on. + # you update the humidity based on whether the fan is on. device.update_sensor_data() - # Report the device's temperature to the server by serializing it + # Report the device's humidity to the server by serializing it # as a JSON string. - payload = json.dumps({'temperature': device.temperature}) + payload = json.dumps({'humidity': device.humidity, 'timestamp': time.time()}) print('Publishing payload', payload) client.publish(mqtt_telemetry_topic, payload, qos=1) # Send events every second. diff --git a/iot/api-client/end_to_end_example/cloudiot_pubsub_example_server.py b/iot/api-client/end_to_end_example/cloudiot_pubsub_example_server.py index 88a95ee765e..7bbe507d505 100644 --- a/iot/api-client/end_to_end_example/cloudiot_pubsub_example_server.py +++ b/iot/api-client/end_to_end_example/cloudiot_pubsub_example_server.py @@ -46,6 +46,7 @@ from googleapiclient import discovery from googleapiclient.errors import HttpError from oauth2client.service_account import ServiceAccountCredentials +from google.cloud import bigquery API_SCOPES = ['https://www.googleapis.com/auth/cloud-platform'] @@ -58,8 +59,11 @@ class Server(object): """Represents the state of the server.""" def __init__(self, service_account_json): - credentials = ServiceAccountCredentials.from_json_keyfile_name( - service_account_json, API_SCOPES) + from google.auth import compute_engine + credentials = compute_engine.Credentials() + #credentials = ServiceAccountCredentials.from_json_keyfile_name( + # service_account_json, API_SCOPES) + if not credentials: sys.exit('Could not load service account credential ' 'from {}'.format(service_account_json)) @@ -73,6 +77,13 @@ def __init__(self, service_account_json): credentials=credentials, cache_discovery=False) + self._bigquery_client = bigquery.Client() + dataset_id = 'IoT' + table_id = 'humidity' + # Set the destination table + self._table = self._bigquery_client.get_table( + self._bigquery_client.dataset(dataset_id).table(table_id)) + # Used to serialize the calls to the # modifyCloudToDeviceConfig REST method. This is needed # because the google-api-python-client library is built on top @@ -81,22 +92,34 @@ def __init__(self, service_account_json): # api-client-library/python/guide/thread_safety self._update_config_mutex = Lock() + def _insertToBigQuery(self, device_id, humidity, timestamp): + row = (humidity, timestamp, device_id) + errors = self._bigquery_client.insert_rows(self._table, [row]) # API request + + if(errors != []): + print('Error write into bigquery: {}\n data: {}'.format(errors,row)) + else: + print('Success writing into bigquery data: {}'.format(errors,row)) + return + def _update_device_config(self, project_id, region, registry_id, device_id, data): """Push the data to the given device as configuration.""" + print('The device ({}) Got data: {}'.format(device_id,str(data))) + self._insertToBigQuery(device_id, data['humidity'], data['timestamp']) config_data = None - print('The device ({}) has a temperature ' - 'of: {}'.format(device_id, data['temperature'])) - if data['temperature'] < 0: + print('The device ({}) has a humidity ' + 'of: {}'.format(device_id, data['humidity'])) + if data['humidity'] > 50: # Turn off the fan. - config_data = {'fan_on': False} - print('Setting fan state for device', device_id, 'to off.') - elif data['temperature'] > 10: + config_data = {'water_on': False} + print('Setting water', device_id, 'to on.') + elif data['humidity'] <= 50: # Turn on the fan - config_data = {'fan_on': True} - print('Setting fan state for device', device_id, 'to on.') + config_data = {'water_on': True} + print('Setting water', device_id, 'to off.') else: - # Temperature is OK, don't need to push a new config. + # humidity is OK, don't need to push a new config. return config_data_json = json.dumps(config_data) @@ -152,7 +175,9 @@ def callback(message): subscribed topic. """ try: - data = json.loads(message.data) + json_data = message.data.decode("utf-8") + print('BEN: Data:',json_data) + data = json.loads(json_data) except ValueError as e: print('Loading Payload ({}) threw an Exception: {}.'.format( message.data, e)) diff --git a/iot/api-client/end_to_end_example/requirements.txt b/iot/api-client/end_to_end_example/requirements.txt index 9f040c42636..d79d0b00a8b 100644 --- a/iot/api-client/end_to_end_example/requirements.txt +++ b/iot/api-client/end_to_end_example/requirements.txt @@ -1,6 +1,7 @@ cryptography==2.2.2 google-api-python-client==1.7.4 google-auth-httplib2==0.0.3 +google-cloud-bigquery==1.5.0 google-auth==1.5.1 google-cloud-pubsub==0.38.0 pyjwt==1.6.1