5
5
import java .util .concurrent .LinkedBlockingQueue ;
6
6
import java .util .concurrent .TimeUnit ;
7
7
8
+ import ch .qos .logback .core .spi .FilterReply ;
9
+
8
10
import ch .qos .logback .amqp .tools .Callbacks ;
9
11
import ch .qos .logback .amqp .tools .DefaultBinarySerializer ;
10
12
import ch .qos .logback .amqp .tools .DefaultContextAwareCallbacks ;
13
15
import ch .qos .logback .classic .Logger ;
14
16
import ch .qos .logback .classic .spi .ILoggingEvent ;
15
17
import ch .qos .logback .core .Context ;
18
+ import ch .qos .logback .core .filter .Filter ;
16
19
import ch .qos .logback .core .spi .ContextAwareBase ;
17
20
import ch .qos .logback .core .spi .LifeCycle ;
18
21
import org .slf4j .LoggerFactory ;
@@ -32,6 +35,7 @@ public AmqpConsumerAgent ()
32
35
this .queue = AmqpConsumerAgent .defaultQueue ;
33
36
this .routingKey = AmqpConsumerAgent .defaultRoutingKey ;
34
37
this .serializer = new DefaultBinarySerializer ();
38
+ this .filter = null ;
35
39
this .thread = null ;
36
40
this .shutdownHook = null ;
37
41
this .shouldStop = true ;
@@ -44,6 +48,11 @@ public final String getExchange ()
44
48
return (this .exchange );
45
49
}
46
50
51
+ public final Filter <ILoggingEvent > getFilter ()
52
+ {
53
+ return (this .filter );
54
+ }
55
+
47
56
public final String getHost ()
48
57
{
49
58
return (this .host );
@@ -124,6 +133,15 @@ public final void setExchange (final String exchange)
124
133
}
125
134
}
126
135
136
+ public final void setFilter (final Filter <ILoggingEvent > filter )
137
+ {
138
+ synchronized (this ) {
139
+ if (this .thread != null )
140
+ throw (new IllegalStateException ("amqp appender is already started" ));
141
+ this .filter = filter ;
142
+ }
143
+ }
144
+
127
145
public final void setHost (final String host )
128
146
{
129
147
synchronized (this ) {
@@ -236,8 +254,8 @@ public final void run ()
236
254
this .thread .start ();
237
255
this .consumer =
238
256
new AmqpConsumer (
239
- this .host , this .port , this .virtualHost , this .username , this .password ,
240
- this .exchange , this . queue , this . routingKey , this .callbacks , this .buffer );
257
+ this .host , this .port , this .virtualHost , this .username , this .password , this . exchange , this . queue ,
258
+ this .routingKey , this .callbacks , this .buffer );
241
259
this .consumer .start ();
242
260
this .isStarted = true ;
243
261
this .postStart ();
@@ -299,6 +317,9 @@ private final void loop ()
299
317
continue ;
300
318
}
301
319
try {
320
+ if (this .filter != null )
321
+ if (this .filter .decide (event ) == FilterReply .DENY )
322
+ continue ;
302
323
this .process (event );
303
324
} catch (final Throwable exception ) {
304
325
this .addError (
@@ -311,6 +332,7 @@ private final void loop ()
311
332
private final LinkedBlockingQueue <AmqpMessage > buffer ;
312
333
private AmqpConsumer consumer ;
313
334
private String exchange ;
335
+ private Filter <ILoggingEvent > filter ;
314
336
private String host ;
315
337
private boolean isStarted ;
316
338
private String password ;
0 commit comments