5
5
namespace Enqueue \Dbal ;
6
6
7
7
use Doctrine \DBAL \Connection ;
8
- use Doctrine \DBAL \Exception \DeadlockException ;
8
+ use Doctrine \DBAL \Exception \RetryableException ;
9
9
use Doctrine \DBAL \ParameterType ;
10
10
use Doctrine \DBAL \Types \Type ;
11
11
use Ramsey \Uuid \Uuid ;
@@ -54,34 +54,36 @@ protected function fetchMessage(array $queues, int $redeliveryDelay): ?DbalMessa
54
54
;
55
55
56
56
while (microtime (true ) < $ endAt ) {
57
- $ result = $ select ->execute ()->fetch ();
58
- if (empty ($ result )) {
59
- return null ;
60
- }
61
-
62
- $ update
63
- ->setParameter ('messageId ' , $ result ['id ' ], Type::GUID )
64
- ;
65
-
66
- if ($ update ->execute ()) {
67
- $ deliveredMessage = $ this ->getConnection ()->createQueryBuilder ()
68
- ->select ('* ' )
69
- ->from ($ this ->getContext ()->getTableName ())
70
- ->andWhere ('delivery_id = :deliveryId ' )
71
- ->setParameter ('deliveryId ' , $ deliveryId ->getBytes (), Type::GUID )
72
- ->setMaxResults (1 )
73
- ->execute ()
74
- ->fetch ()
75
- ;
76
-
77
- // the message has been removed by a 3rd party, such as truncate operation.
78
- if (false == $ deliveredMessage ) {
79
- continue ;
57
+ try {
58
+ $ result = $ select ->execute ()->fetch ();
59
+ if (empty ($ result )) {
60
+ return null ;
80
61
}
81
62
82
- if ($ deliveredMessage ['redelivered ' ] || empty ($ deliveredMessage ['time_to_live ' ]) || $ deliveredMessage ['time_to_live ' ] > time ()) {
83
- return $ this ->getContext ()->convertMessage ($ deliveredMessage );
63
+ $ update
64
+ ->setParameter ('messageId ' , $ result ['id ' ], Type::GUID );
65
+
66
+ if ($ update ->execute ()) {
67
+ $ deliveredMessage = $ this ->getConnection ()->createQueryBuilder ()
68
+ ->select ('* ' )
69
+ ->from ($ this ->getContext ()->getTableName ())
70
+ ->andWhere ('delivery_id = :deliveryId ' )
71
+ ->setParameter ('deliveryId ' , $ deliveryId ->getBytes (), Type::GUID )
72
+ ->setMaxResults (1 )
73
+ ->execute ()
74
+ ->fetch ();
75
+
76
+ // the message has been removed by a 3rd party, such as truncate operation.
77
+ if (false === $ deliveredMessage ) {
78
+ continue ;
79
+ }
80
+
81
+ if ($ deliveredMessage ['redelivered ' ] || empty ($ deliveredMessage ['time_to_live ' ]) || $ deliveredMessage ['time_to_live ' ] > time ()) {
82
+ return $ this ->getContext ()->convertMessage ($ deliveredMessage );
83
+ }
84
84
}
85
+ } catch (RetryableException $ e ) {
86
+ // maybe next time we'll get more luck
85
87
}
86
88
}
87
89
@@ -111,7 +113,7 @@ protected function redeliverMessages(): void
111
113
$ update ->execute ();
112
114
113
115
$ this ->redeliverMessagesLastExecutedAt = microtime (true );
114
- } catch (DeadlockException $ e ) {
116
+ } catch (RetryableException $ e ) {
115
117
// maybe next time we'll get more luck
116
118
}
117
119
}
@@ -135,7 +137,7 @@ protected function removeExpiredMessages(): void
135
137
136
138
try {
137
139
$ delete ->execute ();
138
- } catch (DeadlockException $ e ) {
140
+ } catch (RetryableException $ e ) {
139
141
// maybe next time we'll get more luck
140
142
}
141
143
0 commit comments