2
2
package ch .qos .logback .amqp ;
3
3
4
4
5
- import ch .qos .logback .amqp .tools .ExceptionHandler ;
5
+ import ch .qos .logback .amqp .tools .Callbacks ;
6
+ import ch .qos .logback .classic .Level ;
6
7
import com .rabbitmq .client .Channel ;
7
8
import com .rabbitmq .client .Connection ;
8
9
import com .rabbitmq .client .ConnectionFactory ;
@@ -14,15 +15,15 @@ public abstract class AmqpAccessor
14
15
{
15
16
protected AmqpAccessor (
16
17
final String host , final Integer port , final String virtualHost , final String username , final String password ,
17
- final ExceptionHandler exceptionHandler )
18
+ final Callbacks callbacks )
18
19
{
19
20
super ();
20
21
this .host = ((host != null ) && !host .isEmpty ()) ? host : "127.0.0.1" ;
21
22
this .port = ((port != null ) && (port != 0 )) ? port : 5672 ;
22
23
this .virtualHost = ((virtualHost != null ) && !virtualHost .isEmpty ()) ? virtualHost : "/" ;
23
24
this .username = ((username != null ) && !username .isEmpty ()) ? username : "guest" ;
24
25
this .password = ((password != null ) && !password .isEmpty ()) ? password : "guest" ;
25
- this .exceptionHandler = exceptionHandler ;
26
+ this .callbacks = callbacks ;
26
27
this .thread = null ;
27
28
this .shutdownHook = null ;
28
29
this .shouldStopLoop = true ;
@@ -38,7 +39,7 @@ public final boolean isConnected ()
38
39
}
39
40
}
40
41
41
- public final boolean isStarted ()
42
+ public final boolean isRunning ()
42
43
{
43
44
synchronized (this ) {
44
45
return ((this .thread != null ) && (this .thread .isAlive ()));
@@ -50,12 +51,15 @@ public final void start ()
50
51
synchronized (this ) {
51
52
if (this .thread != null )
52
53
throw (new IllegalStateException ("amqp accessor is already started" ));
54
+ this .callbacks .handleLogEvent (Level .INFO , null , "amqp accessor starting" );
53
55
this .thread = new Thread (new Runnable () {
54
56
public final void run ()
55
57
{
58
+ AmqpAccessor .this .callbacks .handleLogEvent (Level .INFO , null , "amqp accessor started" );
56
59
AmqpAccessor .this .loop ();
57
60
if (AmqpAccessor .this .shutdownHook != null )
58
61
Runtime .getRuntime ().removeShutdownHook (AmqpAccessor .this .shutdownHook );
62
+ AmqpAccessor .this .callbacks .handleLogEvent (Level .INFO , null , "amqp accessor stopped" );
59
63
}
60
64
});
61
65
this .thread .setName (String .format ("%s@%x" , this .getClass ().getName (), System .identityHashCode (this )));
@@ -64,9 +68,10 @@ public final void run ()
64
68
public final void run ()
65
69
{
66
70
AmqpAccessor .this .shutdownHook = null ;
67
- if (AmqpAccessor .this .isStarted ()) {
68
- AmqpAccessor .this .stop ();
69
- while (AmqpAccessor .this .isStarted ())
71
+ if (AmqpAccessor .this .isRunning ()) {
72
+ if (!AmqpAccessor .this .shouldStopLoop )
73
+ AmqpAccessor .this .stop ();
74
+ while (AmqpAccessor .this .isRunning ())
70
75
try {
71
76
Thread .sleep (AmqpAccessor .waitTimeout );
72
77
} catch (final InterruptedException exception ) {
@@ -86,6 +91,7 @@ public final void stop ()
86
91
synchronized (this ) {
87
92
if (this .thread == null )
88
93
throw (new IllegalStateException ("amqp accessor is not started" ));
94
+ this .callbacks .handleLogEvent (Level .INFO , null , "amqp accessor stopping" );
89
95
this .shouldStopLoop = true ;
90
96
}
91
97
}
@@ -95,6 +101,9 @@ protected final boolean connect ()
95
101
synchronized (this ) {
96
102
if (this .connection != null )
97
103
throw (new IllegalStateException ("amqp accessor is already connected" ));
104
+ this .callbacks .handleLogEvent (
105
+ Level .INFO , null , "amqp accessor connecting to `%s@%s:%s:%s`" , this .username , this .host , this .port ,
106
+ this .virtualHost );
98
107
this .shouldReconnect = true ;
99
108
final ConnectionFactory connectionFactory = new ConnectionFactory ();
100
109
connectionFactory .setHost (this .host );
@@ -106,16 +115,15 @@ protected final boolean connect ()
106
115
this .connection = connectionFactory .newConnection ();
107
116
} catch (final Throwable exception ) {
108
117
this .connection = null ;
109
- this .exceptionHandler .handleException (
110
- "amqp accessor encountered an error while connecting; aborting!" , exception );
118
+ this .callbacks .handleException (exception , "amqp accessor encountered an error while connecting; aborting!" );
111
119
}
112
120
if (this .connection != null )
113
121
try {
114
122
this .channel = this .connection .createChannel ();
115
123
} catch (final Throwable exception ) {
116
124
this .channel = null ;
117
- this .exceptionHandler .handleException (
118
- "amqp accessor encountered an error while opening a channel; aborting!" , exception );
125
+ this .callbacks .handleException (
126
+ exception , "amqp accessor encountered an error while opening the channel; aborting!" );
119
127
}
120
128
if ((this .connection == null ) || (this .channel == null )) {
121
129
if (this .connection != null )
@@ -126,12 +134,15 @@ protected final boolean connect ()
126
134
public void shutdownCompleted (final ShutdownSignalException exception )
127
135
{
128
136
AmqpAccessor .this .shouldReconnect = true ;
129
- if (!exception .isInitiatedByApplication ())
130
- AmqpAccessor .this .exceptionHandler .handleException (
131
- "amqp consumer encountered an shutdown error; ignoring!" , exception );
137
+ if (!exception .isInitiatedByApplication ()) {
138
+ AmqpAccessor .this .callbacks .handleException (
139
+ exception , "amqp consumer encountered an shutdown error; ignoring!" );
140
+ AmqpAccessor .this .disconnect ();
141
+ }
132
142
}
133
143
});
134
144
this .shouldReconnect = false ;
145
+ this .callbacks .handleLogEvent (Level .INFO , null , "amqp accessor connected" );
135
146
}
136
147
return (this .connection != null );
137
148
}
@@ -141,6 +152,7 @@ protected final void disconnect ()
141
152
{
142
153
if (this .connection == null )
143
154
throw (new IllegalStateException ("amqp accessor is not connected" ));
155
+ this .callbacks .handleLogEvent (Level .INFO , null , "amqp accessor disconnecting" );
144
156
this .shouldReconnect = true ;
145
157
try {
146
158
try {
@@ -150,8 +162,7 @@ protected final void disconnect ()
150
162
this .connection .close ();
151
163
}
152
164
} catch (final Throwable exception ) {
153
- this .exceptionHandler .handleException (
154
- "amqp accessor encountered an error while disconnecting; ignoring!" , exception );
165
+ this .callbacks .handleException (exception , "amqp accessor encountered an error while disconnecting; ignoring!" );
155
166
} finally {
156
167
this .connection = null ;
157
168
this .channel = null ;
@@ -191,7 +202,7 @@ protected final void sleep ()
191
202
} catch (final InterruptedException exception ) {}
192
203
}
193
204
194
- protected final ExceptionHandler exceptionHandler ;
205
+ protected final Callbacks callbacks ;
195
206
protected final String host ;
196
207
protected final String password ;
197
208
protected final int port ;
0 commit comments