29
29
import jwt
30
30
import paho .mqtt .client as mqtt
31
31
32
+
32
33
# [START iot_mqtt_jwt]
33
34
def create_jwt (project_id , private_key_file , algorithm ):
34
35
"""Creates a JWT (https://jwt.io) to establish an MQTT connection.
@@ -64,6 +65,8 @@ def create_jwt(project_id, private_key_file, algorithm):
64
65
return jwt .encode (token , private_key , algorithm = algorithm )
65
66
# [END iot_mqtt_jwt]
66
67
68
+
69
+ # [START iot_mqtt_config]
67
70
def error_str (rc ):
68
71
"""Convert a Paho error to a human readable string."""
69
72
return '{}: {}' .format (rc , mqtt .error_string (rc ))
@@ -84,6 +87,46 @@ def on_publish(unused_client, unused_userdata, unused_mid):
84
87
print ('on_publish' )
85
88
86
89
90
+ def get_client (
91
+ project_id , cloud_region , registry_id , device_id , private_key_file ,
92
+ algorithm , ca_certs , mqtt_bridge_hostname , mqtt_bridge_port ):
93
+ """Create our MQTT client. The client_id is a unique string that identifies
94
+ this device. For Google Cloud IoT Core, it must be in the format below."""
95
+ client = mqtt .Client (
96
+ client_id = ('projects/{}/locations/{}/registries/{}/devices/{}'
97
+ .format (
98
+ project_id ,
99
+ cloud_region ,
100
+ registry_id ,
101
+ device_id )))
102
+
103
+ # With Google Cloud IoT Core, the username field is ignored, and the
104
+ # password field is used to transmit a JWT to authorize the device.
105
+ client .username_pw_set (
106
+ username = 'unused' ,
107
+ password = create_jwt (
108
+ project_id , private_key_file , algorithm ))
109
+
110
+ # Enable SSL/TLS support.
111
+ client .tls_set (ca_certs = ca_certs )
112
+
113
+ # Register message callbacks. https://eclipse.org/paho/clients/python/docs/
114
+ # describes additional callbacks that Paho supports. In this example, the
115
+ # callbacks just print to standard out.
116
+ client .on_connect = on_connect
117
+ client .on_publish = on_publish
118
+ client .on_disconnect = on_disconnect
119
+
120
+ # Connect to the Google MQTT bridge.
121
+ client .connect (mqtt_bridge_hostname , mqtt_bridge_port )
122
+
123
+ # Start the network loop.
124
+ client .loop_start ()
125
+
126
+ return client
127
+ # [END iot_mqtt_config]
128
+
129
+
87
130
def parse_command_line_args ():
88
131
"""Parse command line arguments."""
89
132
parser = argparse .ArgumentParser (description = (
@@ -131,57 +174,48 @@ def parse_command_line_args():
131
174
default = 8883 ,
132
175
type = int ,
133
176
help = 'MQTT bridge port.' )
177
+ parser .add_argument (
178
+ '--jwt_expires_minutes' ,
179
+ default = 20 ,
180
+ type = int ,
181
+ help = ('Expiration time, in minutes, for JWT tokens.' ))
134
182
135
183
return parser .parse_args ()
136
184
137
185
186
+ # [START iot_mqtt_run]
138
187
def main ():
139
188
args = parse_command_line_args ()
140
189
141
- # Create our MQTT client. The client_id is a unique string that identifies
142
- # this device. For Google Cloud IoT Core, it must be in the format below.
143
- client = mqtt .Client (
144
- client_id = ('projects/{}/locations/{}/registries/{}/devices/{}'
145
- .format (
146
- args .project_id ,
147
- args .cloud_region ,
148
- args .registry_id ,
149
- args .device_id )))
150
-
151
- # With Google Cloud IoT Core, the username field is ignored, and the
152
- # password field is used to transmit a JWT to authorize the device.
153
- client .username_pw_set (
154
- username = 'unused' ,
155
- password = create_jwt (
156
- args .project_id , args .private_key_file , args .algorithm ))
157
-
158
- # Enable SSL/TLS support.
159
- client .tls_set (ca_certs = args .ca_certs )
160
-
161
- # Register message callbacks. https://eclipse.org/paho/clients/python/docs/
162
- # describes additional callbacks that Paho supports. In this example, the
163
- # callbacks just print to standard out.
164
- client .on_connect = on_connect
165
- client .on_publish = on_publish
166
- client .on_disconnect = on_disconnect
167
-
168
- # Connect to the Google MQTT bridge.
169
- client .connect (args .mqtt_bridge_hostname , args .mqtt_bridge_port )
170
-
171
- # Start the network loop.
172
- client .loop_start ()
173
-
174
190
# Publish to the events or state topic based on the flag.
175
191
sub_topic = 'events' if args .message_type == 'event' else 'state'
176
192
177
193
mqtt_topic = '/devices/{}/{}' .format (args .device_id , sub_topic )
178
194
195
+ jwt_iat = datetime .datetime .utcnow ()
196
+ jwt_exp_mins = args .jwt_expires_minutes
197
+ client = get_client (
198
+ args .project_id , args .cloud_region , args .registry_id , args .device_id ,
199
+ args .private_key_file , args .algorithm , args .ca_certs ,
200
+ args .mqtt_bridge_hostname , args .mqtt_bridge_port )
201
+
179
202
# Publish num_messages mesages to the MQTT bridge once per second.
180
203
for i in range (1 , args .num_messages + 1 ):
181
204
payload = '{}/{}-payload-{}' .format (
182
205
args .registry_id , args .device_id , i )
183
206
print ('Publishing message {}/{}: \' {}\' ' .format (
184
207
i , args .num_messages , payload ))
208
+ seconds_since_issue = (datetime .datetime .utcnow () - jwt_iat ).seconds
209
+ if seconds_since_issue > 60 * jwt_exp_mins :
210
+ print ('Refreshing token after {}s' ).format (seconds_since_issue )
211
+ client .loop_stop ()
212
+ jwt_iat = datetime .datetime .utcnow ()
213
+ client = get_client (
214
+ args .project_id , args .cloud_region ,
215
+ args .registry_id , args .device_id , args .private_key_file ,
216
+ args .algorithm , args .ca_certs , args .mqtt_bridge_hostname ,
217
+ args .mqtt_bridge_port )
218
+
185
219
# Publish "payload" to the MQTT topic. qos=1 means at least once
186
220
# delivery. Cloud IoT Core also supports qos=0 for at most once
187
221
# delivery.
@@ -193,6 +227,7 @@ def main():
193
227
# End the network loop and finish.
194
228
client .loop_stop ()
195
229
print ('Finished.' )
230
+ # [END iot_mqtt_run]
196
231
197
232
198
233
if __name__ == '__main__' :
0 commit comments