@@ -24,6 +24,7 @@ public AmqpConsumer (
24
24
super (host , port , virtualHost , username , password , callbacks );
25
25
this .exchanges = exchanges ;
26
26
this .queue = queue ;
27
+ this .queue1 = null ;
27
28
this .sink = sink ;
28
29
}
29
30
@@ -34,9 +35,7 @@ protected final void loop ()
34
35
synchronized (this ) {
35
36
if (this .shouldStopLoop ())
36
37
break loop ;
37
- if (!this .shouldReconnect ())
38
- break ;
39
- if (this .connect ())
38
+ if (this .reconnect ())
40
39
break ;
41
40
}
42
41
this .sleep ();
@@ -111,27 +110,38 @@ private final boolean declare ()
111
110
}
112
111
}
113
112
{
114
- this .callbacks .handleLogEvent (Level .INFO , null , "amqp consumer declaring the queue `%s`" , this .queue );
113
+ final String queue ;
114
+ final boolean unique ;
115
+ if ((this .queue == null ) || this .queue .isEmpty ()) {
116
+ if (this .queue1 != null )
117
+ queue = this .queue1 ;
118
+ else
119
+ queue = "" ;
120
+ unique = true ;
121
+ } else {
122
+ queue = this .queue ;
123
+ unique = false ;
124
+ }
125
+ this .callbacks .handleLogEvent (Level .INFO , null , "amqp consumer declaring the queue `%s`" , queue );
115
126
try {
116
- channel .queueDeclare (this . queue , true , false , false , null );
127
+ this . queue1 = channel .queueDeclare (queue , true , unique , unique , null ). getQueue ( );
117
128
} catch (final Throwable exception ) {
118
- this .callbacks
119
- .handleException (
120
- exception , "amqp consumer encountered an error while declaring the queue `%s`; aborting!" ,
121
- this .queue );
129
+ this .queue1 = null ;
130
+ this .callbacks .handleException (
131
+ exception , "amqp consumer encountered an error while declaring the queue `%s`; aborting!" , queue );
122
132
return (false );
123
133
}
124
134
}
125
135
for (final String exchange : this .exchanges ) {
126
136
this .callbacks .handleLogEvent (
127
- Level .INFO , null , "amqp consumer binding the queue `%s` to exchange `%s`" , this .queue , exchange );
137
+ Level .INFO , null , "amqp consumer binding the queue `%s` to exchange `%s`" , this .queue1 , exchange );
128
138
try {
129
- channel .queueBind (this .queue , exchange , "#" , null );
139
+ channel .queueBind (this .queue1 , exchange , "#" , null );
130
140
} catch (final Throwable exception ) {
131
141
this .callbacks .handleException (
132
142
exception ,
133
143
"amqp consumer encountered an error while binding the queue `%s` to exchange `%s`; aborting!" ,
134
- this .queue , exchange );
144
+ this .queue1 , exchange );
135
145
return (false );
136
146
}
137
147
}
@@ -143,7 +153,7 @@ private final boolean register ()
143
153
this .callbacks .handleLogEvent (Level .INFO , null , "amqp consumer registering the consumer" );
144
154
final Channel channel = this .getChannel ();
145
155
try {
146
- channel .basicConsume (this .queue , true , this .queue , true , true , null , new ConsumerCallback ());
156
+ channel .basicConsume (this .queue1 , true , this .queue1 , true , true , null , new ConsumerCallback ());
147
157
return (true );
148
158
} catch (final Throwable exception ) {
149
159
this .callbacks .handleException (
@@ -154,6 +164,7 @@ private final boolean register ()
154
164
155
165
private final String [] exchanges ;
156
166
private final String queue ;
167
+ private String queue1 ;
157
168
private final LinkedBlockingQueue <AmqpMessage > sink ;
158
169
159
170
private final class ConsumerCallback
0 commit comments