2424import argparse
2525import datetime
2626import os
27+ import random
2728import time
2829
2930import jwt
3031import paho .mqtt .client as mqtt
3132
3233
34+ # The initial backoff time after a disconnection occurs, in seconds.
35+ minimum_backoff_time = 1
36+
37+ # The maximum backoff time before giving up, in seconds.
38+ MAXIMUM_BACKOFF_TIME = 32
39+
40+ # Whether to wait with exponential backoff before publishing.
41+ should_backoff = False
42+
43+
3344# [START iot_mqtt_jwt]
3445def create_jwt (project_id , private_key_file , algorithm ):
3546 """Creates a JWT (https://jwt.io) to establish an MQTT connection.
@@ -76,11 +87,22 @@ def on_connect(unused_client, unused_userdata, unused_flags, rc):
7687 """Callback for when a device connects."""
7788 print ('on_connect' , mqtt .connack_string (rc ))
7889
90+ # After a successful connect, reset backoff time and stop backing off.
91+ global should_backoff
92+ global minimum_backoff_time
93+ should_backoff = False
94+ minimum_backoff_time = 1
95+
7996
8097def on_disconnect (unused_client , unused_userdata , rc ):
8198 """Paho callback for when a device disconnects."""
8299 print ('on_disconnect' , error_str (rc ))
83100
101+ # Since a disconnect occurred, the next loop iteration will wait with
102+ # exponential backoff.
103+ global should_backoff
104+ should_backoff = True
105+
84106
85107def on_publish (unused_client , unused_userdata , unused_mid ):
86108 """Paho callback when a message is sent to the broker."""
@@ -134,9 +156,6 @@ def get_client(
134156 # Subscribe to the config topic.
135157 client .subscribe (mqtt_config_topic , qos = 1 )
136158
137- # Start the network loop.
138- client .loop_start ()
139-
140159 return client
141160# [END iot_mqtt_config]
142161
@@ -199,6 +218,8 @@ def parse_command_line_args():
199218
200219# [START iot_mqtt_run]
201220def main ():
221+ global minimum_backoff_time
222+
202223 args = parse_command_line_args ()
203224
204225 # Publish to the events or state topic based on the flag.
@@ -215,6 +236,23 @@ def main():
215236
216237 # Publish num_messages mesages to the MQTT bridge once per second.
217238 for i in range (1 , args .num_messages + 1 ):
239+ # Process network events.
240+ client .loop ()
241+
242+ # Wait if backoff is required.
243+ if should_backoff :
244+ # If backoff time is too large, give up.
245+ if minimum_backoff_time > MAXIMUM_BACKOFF_TIME :
246+ print ('Exceeded maximum backoff time. Giving up.' )
247+ break
248+
249+ # Otherwise, wait and connect again.
250+ delay = minimum_backoff_time + random .randint (0 , 1000 ) / 1000.0
251+ print ('Waiting for {} before reconnecting.' .format (delay ))
252+ time .sleep (delay )
253+ minimum_backoff_time *= 2
254+ client .connect (args .mqtt_bridge_hostname , args .mqtt_bridge_port )
255+
218256 payload = '{}/{}-payload-{}' .format (
219257 args .registry_id , args .device_id , i )
220258 print ('Publishing message {}/{}: \' {}\' ' .format (
@@ -223,7 +261,6 @@ def main():
223261 seconds_since_issue = (datetime .datetime .utcnow () - jwt_iat ).seconds
224262 if seconds_since_issue > 60 * jwt_exp_mins :
225263 print ('Refreshing token after {}s' ).format (seconds_since_issue )
226- client .loop_stop ()
227264 jwt_iat = datetime .datetime .utcnow ()
228265 client = get_client (
229266 args .project_id , args .cloud_region ,
@@ -239,8 +276,6 @@ def main():
239276 # Send events every second. State should not be updated as often
240277 time .sleep (1 if args .message_type == 'event' else 5 )
241278
242- # End the network loop and finish.
243- client .loop_stop ()
244279 print ('Finished.' )
245280# [END iot_mqtt_run]
246281
0 commit comments