20
20
package org .elasticsearch .gateway ;
21
21
22
22
import org .elasticsearch .ElasticSearchException ;
23
- import org .elasticsearch .ElasticSearchInterruptedException ;
24
23
import org .elasticsearch .cluster .*;
25
24
import org .elasticsearch .cluster .block .ClusterBlock ;
26
25
import org .elasticsearch .cluster .block .ClusterBlockLevel ;
30
29
import org .elasticsearch .cluster .metadata .MetaData ;
31
30
import org .elasticsearch .cluster .metadata .MetaDataCreateIndexService ;
32
31
import org .elasticsearch .cluster .node .DiscoveryNodes ;
33
- import org .elasticsearch .common .Nullable ;
34
32
import org .elasticsearch .common .component .AbstractLifecycleComponent ;
35
33
import org .elasticsearch .common .inject .Inject ;
36
34
import org .elasticsearch .common .settings .Settings ;
41
39
import java .io .IOException ;
42
40
import java .util .Map ;
43
41
import java .util .concurrent .CountDownLatch ;
44
- import java .util .concurrent .TimeUnit ;
45
42
import java .util .concurrent .atomic .AtomicBoolean ;
46
43
import java .util .concurrent .atomic .AtomicInteger ;
47
44
@@ -66,7 +63,6 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
66
63
67
64
private final MetaDataCreateIndexService createIndexService ;
68
65
69
- private final TimeValue initialStateTimeout ;
70
66
private final TimeValue recoverAfterTime ;
71
67
private final int recoverAfterNodes ;
72
68
private final int expectedNodes ;
@@ -86,7 +82,6 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
86
82
this .discoveryService = discoveryService ;
87
83
this .createIndexService = createIndexService ;
88
84
this .threadPool = threadPool ;
89
- this .initialStateTimeout = componentSettings .getAsTime ("initial_state_timeout" , TimeValue .timeValueSeconds (30 ));
90
85
// allow to control a delay of when indices will get created
91
86
this .recoverAfterTime = componentSettings .getAsTime ("recover_after_time" , null );
92
87
this .recoverAfterNodes = componentSettings .getAsInt ("recover_after_nodes" , -1 );
@@ -115,24 +110,24 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
115
110
} else if (recoverAfterMasterNodes != -1 && nodes .masterNodes ().size () < recoverAfterMasterNodes ) {
116
111
logger .debug ("not recovering from gateway, nodes_size (master) [" + nodes .masterNodes ().size () + "] < recover_after_master_nodes [" + recoverAfterMasterNodes + "]" );
117
112
} else {
118
- boolean ignoreTimeout ;
113
+ boolean ignoreRecoverAfterTime ;
119
114
if (expectedNodes == -1 && expectedMasterNodes == -1 && expectedDataNodes == -1 ) {
120
115
// no expected is set, don't ignore the timeout
121
- ignoreTimeout = false ;
116
+ ignoreRecoverAfterTime = false ;
122
117
} else {
123
118
// one of the expected is set, see if all of them meet the need, and ignore the timeout in this case
124
- ignoreTimeout = true ;
119
+ ignoreRecoverAfterTime = true ;
125
120
if (expectedNodes != -1 && (nodes .masterAndDataNodes ().size () < expectedNodes )) { // does not meet the expected...
126
- ignoreTimeout = false ;
121
+ ignoreRecoverAfterTime = false ;
127
122
}
128
123
if (expectedMasterNodes != -1 && (nodes .masterNodes ().size () < expectedMasterNodes )) { // does not meet the expected...
129
- ignoreTimeout = false ;
124
+ ignoreRecoverAfterTime = false ;
130
125
}
131
126
if (expectedDataNodes != -1 && (nodes .dataNodes ().size () < expectedDataNodes )) { // does not meet the expected...
132
- ignoreTimeout = false ;
127
+ ignoreRecoverAfterTime = false ;
133
128
}
134
129
}
135
- performStateRecovery (initialStateTimeout , ignoreTimeout );
130
+ performStateRecovery (ignoreRecoverAfterTime );
136
131
}
137
132
}
138
133
} else {
@@ -151,7 +146,7 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
151
146
}
152
147
153
148
@ Override public void clusterChanged (final ClusterChangedEvent event ) {
154
- if (! lifecycle .started ()) {
149
+ if (lifecycle .stoppedOrClosed ()) {
155
150
return ;
156
151
}
157
152
if (event .localNodeMaster () && event .state ().blocks ().hasGlobalBlock (STATE_NOT_RECOVERED_BLOCK )) {
@@ -164,42 +159,37 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
164
159
} else if (recoverAfterMasterNodes != -1 && nodes .masterNodes ().size () < recoverAfterMasterNodes ) {
165
160
logger .debug ("not recovering from gateway, nodes_size (master) [" + nodes .masterNodes ().size () + "] < recover_after_master_nodes [" + recoverAfterMasterNodes + "]" );
166
161
} else {
167
- boolean ignoreTimeout ;
162
+ boolean ignoreRecoverAfterTime ;
168
163
if (expectedNodes == -1 && expectedMasterNodes == -1 && expectedDataNodes == -1 ) {
169
164
// no expected is set, don't ignore the timeout
170
- ignoreTimeout = false ;
165
+ ignoreRecoverAfterTime = false ;
171
166
} else {
172
167
// one of the expected is set, see if all of them meet the need, and ignore the timeout in this case
173
- ignoreTimeout = true ;
168
+ ignoreRecoverAfterTime = true ;
174
169
if (expectedNodes != -1 && (nodes .masterAndDataNodes ().size () < expectedNodes )) { // does not meet the expected...
175
- ignoreTimeout = false ;
170
+ ignoreRecoverAfterTime = false ;
176
171
}
177
172
if (expectedMasterNodes != -1 && (nodes .masterNodes ().size () < expectedMasterNodes )) { // does not meet the expected...
178
- ignoreTimeout = false ;
173
+ ignoreRecoverAfterTime = false ;
179
174
}
180
175
if (expectedDataNodes != -1 && (nodes .dataNodes ().size () < expectedDataNodes )) { // does not meet the expected...
181
- ignoreTimeout = false ;
176
+ ignoreRecoverAfterTime = false ;
182
177
}
183
178
}
184
- final boolean fIgnoreTimeout = ignoreTimeout ;
179
+ final boolean fIgnoreRecoverAfterTime = ignoreRecoverAfterTime ;
185
180
threadPool .cached ().execute (new Runnable () {
186
181
@ Override public void run () {
187
- performStateRecovery (null , fIgnoreTimeout );
182
+ performStateRecovery (fIgnoreRecoverAfterTime );
188
183
}
189
184
});
190
185
}
191
186
}
192
187
}
193
188
194
- private void performStateRecovery (@ Nullable TimeValue timeout ) {
195
- performStateRecovery (null , false );
196
- }
197
-
198
- private void performStateRecovery (@ Nullable TimeValue timeout , boolean ignoreTimeout ) {
199
- final CountDownLatch latch = new CountDownLatch (1 );
200
- final Gateway .GatewayStateRecoveredListener recoveryListener = new GatewayRecoveryListener (latch );
189
+ private void performStateRecovery (boolean ignoreRecoverAfterTime ) {
190
+ final Gateway .GatewayStateRecoveredListener recoveryListener = new GatewayRecoveryListener (new CountDownLatch (1 ));
201
191
202
- if (!ignoreTimeout && recoverAfterTime != null ) {
192
+ if (!ignoreRecoverAfterTime && recoverAfterTime != null ) {
203
193
if (scheduledRecovery .compareAndSet (false , true )) {
204
194
logger .debug ("delaying initial state recovery for [{}]" , recoverAfterTime );
205
195
threadPool .schedule (recoverAfterTime , ThreadPool .Names .CACHED , new Runnable () {
@@ -215,14 +205,6 @@ private void performStateRecovery(@Nullable TimeValue timeout, boolean ignoreTim
215
205
gateway .performStateRecovery (recoveryListener );
216
206
}
217
207
}
218
-
219
- if (timeout != null ) {
220
- try {
221
- latch .await (timeout .millis (), TimeUnit .MILLISECONDS );
222
- } catch (InterruptedException e ) {
223
- throw new ElasticSearchInterruptedException (e .getMessage (), e );
224
- }
225
- }
226
208
}
227
209
228
210
class GatewayRecoveryListener implements Gateway .GatewayStateRecoveredListener {
0 commit comments