Skip to content

Commit fd8014f

Browse files
Added support for custom routing key inside AMQP consumer.
1 parent 4069114 commit fd8014f

File tree

4 files changed

+21
-14
lines changed

4 files changed

+21
-14
lines changed

logback-amqp-appender/src/test/java/ch/qos/logback/amqp/tests/AmqpAppenderTests.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,11 @@ public final void testAppender ()
3737
for (int index = 0; index < AmqpAppenderTests.messageCount; index++) {
3838
MDC.put (DefaultMutator.applicationKey, String.format ("app-%d", index % 3 + 1));
3939
MDC.put (DefaultMutator.componentKey, String.format ("comp-%d", index % 2 + 1));
40-
testLogger.error (UUID.randomUUID ().toString ());
40+
final String message = UUID.randomUUID ().toString ();
41+
if (index % 4 != 0)
42+
testLogger.error (message);
43+
else
44+
testLogger.error (message, new Throwable (message));
4145
}
4246

4347
realLogger.debug ("waiting for message draining (i.e. until their all sent)");

logback-amqp-consumer/src/main/java/ch/qos/logback/amqp/AmqpConsumer.java

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,13 @@ public final class AmqpConsumer
1818
{
1919
public AmqpConsumer (
2020
final String host, final Integer port, final String virtualHost, final String username, final String password,
21-
final String[] exchanges, final String queue, final Callbacks callbacks,
21+
final String exchange, final String queue, final String routingKey, final Callbacks callbacks,
2222
final LinkedBlockingQueue<AmqpMessage> sink)
2323
{
2424
super (host, port, virtualHost, username, password, callbacks);
25-
this.exchanges = exchanges;
25+
this.exchange = exchange;
2626
this.queue = queue;
27+
this.routingKey = routingKey;
2728
this.queue1 = null;
2829
this.sink = sink;
2930
}
@@ -98,14 +99,14 @@ private final void consume (final Envelope envelope, final BasicProperties prope
9899
private final boolean declare ()
99100
{
100101
final Channel channel = this.getChannel ();
101-
for (final String exchange : this.exchanges) {
102-
this.callbacks.handleLogEvent (Level.INFO, null, "amqp consumer declaring the exchange `%s`", exchange);
102+
{
103+
this.callbacks.handleLogEvent (Level.INFO, null, "amqp consumer declaring the exchange `%s`", this.exchange);
103104
try {
104-
channel.exchangeDeclare (exchange, "topic", true, false, null);
105+
channel.exchangeDeclare (this.exchange, "topic", true, false, null);
105106
} catch (final Throwable exception) {
106107
this.callbacks.handleException (
107108
exception, "amqp consumer encountered an error while declaring the exchange `%s`; aborting!",
108-
exchange);
109+
this.exchange);
109110
return (false);
110111
}
111112
}
@@ -129,16 +130,16 @@ private final boolean declare ()
129130
return (false);
130131
}
131132
}
132-
for (final String exchange : this.exchanges) {
133+
{
133134
this.callbacks.handleLogEvent (
134-
Level.INFO, null, "amqp consumer binding the queue `%s` to exchange `%s`", this.queue1, exchange);
135+
Level.INFO, null, "amqp consumer binding the queue `%s` to exchange `%s` with routing key `%s`", this.queue1, this.exchange, this.routingKey);
135136
try {
136-
channel.queueBind (this.queue1, exchange, "#", null);
137+
channel.queueBind (this.queue1, this.exchange, this.routingKey, null);
137138
} catch (final Throwable exception) {
138139
this.callbacks.handleException (
139140
exception,
140-
"amqp consumer encountered an error while binding the queue `%s` to exchange `%s`; aborting!",
141-
this.queue1, exchange);
141+
"amqp consumer encountered an error while binding the queue `%s` to exchange `%s` with routing key `%s`; aborting!",
142+
this.queue1, this.exchange, this.routingKey);
142143
return (false);
143144
}
144145
}
@@ -159,8 +160,9 @@ private final boolean register ()
159160
}
160161
}
161162

162-
private final String[] exchanges;
163+
private final String exchange;
163164
private final String queue;
165+
private final String routingKey;
164166
private String queue1;
165167
private final LinkedBlockingQueue<AmqpMessage> sink;
166168

logback-amqp-consumer/src/main/java/ch/qos/logback/amqp/AmqpConsumerAgent.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ public final void run ()
237237
this.consumer =
238238
new AmqpConsumer (
239239
this.host, this.port, this.virtualHost, this.username, this.password,
240-
new String[] {this.exchange}, this.queue, this.callbacks, this.buffer);
240+
this.exchange, this.queue, this.routingKey, this.callbacks, this.buffer);
241241
this.consumer.start ();
242242
this.isStarted = true;
243243
this.postStart ();

logback-amqp-consumer/src/test/config/logback-consumer.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
<password>guest</password>
1313
<exchange>logback</exchange>
1414
<queue></queue>
15+
<routingKey>#</routingKey>
1516
</amqpConsumerAgent>
1617

1718
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">

0 commit comments

Comments
 (0)