@@ -120,7 +120,8 @@ def publish_messages_with_custom_attributes(project_id, topic_name):
120120 data = data .encode ('utf-8' )
121121 # Add two attributes, origin and username, to the message
122122 future = publisher .publish (
123- topic_path , data , origin = 'python-sample' , username = 'gcp' )
123+ topic_path , data , origin = 'python-sample' , username = 'gcp'
124+ )
124125 print (future .result ())
125126
126127 print ('Published messages with custom attributes.' )
@@ -147,7 +148,7 @@ def publish_messages_with_futures(project_id, topic_name):
147148 future = publisher .publish (topic_path , data = data )
148149 print (future .result ())
149150
150- print (" Published messages with futures." )
151+ print (' Published messages with futures.' )
151152 # [END pubsub_publisher_concurrency_control]
152153
153154
@@ -171,17 +172,17 @@ def callback(f):
171172 try :
172173 print (f .result ())
173174 futures .pop (data )
174- except : # noqa
175- print ("Please handle {} for {}." .format (f .exception (), data ))
175+ except : # noqa
176+ print ('Please handle {} for {}.' .format (f .exception (), data ))
177+
176178 return callback
177179
178180 for i in range (10 ):
179181 data = str (i )
180182 futures .update ({data : None })
181183 # When you publish a message, the client returns a future.
182184 future = publisher .publish (
183- topic_path ,
184- data = data .encode ("utf-8" ), # data must be a bytestring.
185+ topic_path , data = data .encode ('utf-8' ) # data must be a bytestring.
185186 )
186187 futures [data ] = future
187188 # Publish failures shall be handled in the callback function.
@@ -191,7 +192,7 @@ def callback(f):
191192 while futures :
192193 time .sleep (5 )
193194
194- print (" Published message with error handler." )
195+ print (' Published message with error handler.' )
195196 # [END pubsub_publish_messages_error_handler]
196197
197198
@@ -207,7 +208,7 @@ def publish_messages_with_batch_settings(project_id, topic_name):
207208 # of data or one second has passed.
208209 batch_settings = pubsub_v1 .types .BatchSettings (
209210 max_bytes = 1024 , # One kilobyte
210- max_latency = 1 , # One second
211+ max_latency = 1 , # One second
211212 )
212213 publisher = pubsub_v1 .PublisherClient (batch_settings )
213214 topic_path = publisher .topic_path (project_id , topic_name )
@@ -223,7 +224,65 @@ def publish_messages_with_batch_settings(project_id, topic_name):
223224 # [END pubsub_publisher_batch_settings]
224225
225226
226- if __name__ == '__main__' :
227+ def publish_messages_with_retry_settings (project_id , topic_name ):
228+ """Publishes messages with custom retry settings."""
229+ # [START pubsub_publisher_retry_settings]
230+ from google .cloud import pubsub_v1
231+
232+ # TODO project_id = "Your Google Cloud Project ID"
233+ # TODO topic_name = "Your Pub/Sub topic name"
234+
235+ # Configure the retry settings. Defaults will be overwritten.
236+ retry_settings = {
237+ 'interfaces' : {
238+ 'google.pubsub.v1.Publisher' : {
239+ 'retry_codes' : {
240+ 'publish' : [
241+ 'ABORTED' ,
242+ 'CANCELLED' ,
243+ 'DEADLINE_EXCEEDED' ,
244+ 'INTERNAL' ,
245+ 'RESOURCE_EXHAUSTED' ,
246+ 'UNAVAILABLE' ,
247+ 'UNKNOWN' ,
248+ ]
249+ },
250+ 'retry_params' : {
251+ 'messaging' : {
252+ 'initial_retry_delay_millis' : 150 , # default: 100
253+ 'retry_delay_multiplier' : 1.5 , # default: 1.3
254+ 'max_retry_delay_millis' : 65000 , # default: 60000
255+ 'initial_rpc_timeout_millis' : 25000 , # default: 25000
256+ 'rpc_timeout_multiplier' : 1.0 , # default: 1.0
257+ 'max_rpc_timeout_millis' : 35000 , # default: 30000
258+ 'total_timeout_millis' : 650000 , # default: 600000
259+ }
260+ },
261+ 'methods' : {
262+ 'Publish' : {
263+ 'retry_codes_name' : 'publish' ,
264+ 'retry_params_name' : 'messaging' ,
265+ }
266+ },
267+ }
268+ }
269+ }
270+
271+ publisher = pubsub_v1 .PublisherClient (client_config = retry_settings )
272+ topic_path = publisher .topic_path (project_id , topic_name )
273+
274+ for n in range (1 , 10 ):
275+ data = u'Message number {}' .format (n )
276+ # Data must be a bytestring
277+ data = data .encode ('utf-8' )
278+ future = publisher .publish (topic_path , data = data )
279+ print (future .result ())
280+
281+ print ('Published messages with retry settings.' )
282+ # [END pubsub_publisher_retry_settings]
283+
284+
285+ if __name__ == "__main__" :
227286 parser = argparse .ArgumentParser (
228287 description = __doc__ ,
229288 formatter_class = argparse .RawDescriptionHelpFormatter
@@ -233,36 +292,47 @@ def publish_messages_with_batch_settings(project_id, topic_name):
233292 subparsers = parser .add_subparsers (dest = 'command' )
234293 subparsers .add_parser ('list' , help = list_topics .__doc__ )
235294
236- create_parser = subparsers .add_parser ('create' , help = create_topic .__doc__ )
295+ create_parser = subparsers .add_parser ('create' ,
296+ help = create_topic .__doc__ )
237297 create_parser .add_argument ('topic_name' )
238298
239- delete_parser = subparsers .add_parser ('delete' , help = delete_topic .__doc__ )
299+ delete_parser = subparsers .add_parser ('delete' ,
300+ help = delete_topic .__doc__ )
240301 delete_parser .add_argument ('topic_name' )
241302
242- publish_parser = subparsers .add_parser (
243- 'publish' , help = publish_messages .__doc__ )
303+ publish_parser = subparsers .add_parser ('publish' ,
304+ help = publish_messages .__doc__ )
244305 publish_parser .add_argument ('topic_name' )
245306
246307 publish_with_custom_attributes_parser = subparsers .add_parser (
247308 'publish-with-custom-attributes' ,
248- help = publish_messages_with_custom_attributes .__doc__ )
309+ help = publish_messages_with_custom_attributes .__doc__ ,
310+ )
249311 publish_with_custom_attributes_parser .add_argument ('topic_name' )
250312
251313 publish_with_futures_parser = subparsers .add_parser (
252- 'publish-with-futures' ,
253- help = publish_messages_with_futures . __doc__ )
314+ 'publish-with-futures' , help = publish_messages_with_futures . __doc__
315+ )
254316 publish_with_futures_parser .add_argument ('topic_name' )
255317
256318 publish_with_error_handler_parser = subparsers .add_parser (
257319 'publish-with-error-handler' ,
258- help = publish_messages_with_error_handler .__doc__ )
320+ help = publish_messages_with_error_handler .__doc__
321+ )
259322 publish_with_error_handler_parser .add_argument ('topic_name' )
260323
261324 publish_with_batch_settings_parser = subparsers .add_parser (
262325 'publish-with-batch-settings' ,
263- help = publish_messages_with_batch_settings .__doc__ )
326+ help = publish_messages_with_batch_settings .__doc__
327+ )
264328 publish_with_batch_settings_parser .add_argument ('topic_name' )
265329
330+ publish_with_retry_settings_parser = subparsers .add_parser (
331+ 'publish-with-retry-settings' ,
332+ help = publish_messages_with_retry_settings .__doc__
333+ )
334+ publish_with_retry_settings_parser .add_argument ('topic_name' )
335+
266336 args = parser .parse_args ()
267337
268338 if args .command == 'list' :
@@ -274,11 +344,13 @@ def publish_messages_with_batch_settings(project_id, topic_name):
274344 elif args .command == 'publish' :
275345 publish_messages (args .project_id , args .topic_name )
276346 elif args .command == 'publish-with-custom-attributes' :
277- publish_messages_with_custom_attributes (
278- args . project_id , args .topic_name )
347+ publish_messages_with_custom_attributes (args . project_id ,
348+ args .topic_name )
279349 elif args .command == 'publish-with-futures' :
280350 publish_messages_with_futures (args .project_id , args .topic_name )
281351 elif args .command == 'publish-with-error-handler' :
282352 publish_messages_with_error_handler (args .project_id , args .topic_name )
283353 elif args .command == 'publish-with-batch-settings' :
284354 publish_messages_with_batch_settings (args .project_id , args .topic_name )
355+ elif args .command == 'publish-with-retry-settings' :
356+ publish_messages_with_retry_settings (args .project_id , args .topic_name )
0 commit comments