Skip to content

Commit 11c80b7

Browse files
committed
Non-data master nodes and non-master data nodes fail to store data, closes elastic#579.
1 parent f6a1131 commit 11c80b7

File tree

7 files changed

+89
-12
lines changed

7 files changed

+89
-12
lines changed

modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.common.inject.Inject;
3434
import org.elasticsearch.common.settings.Settings;
3535
import org.elasticsearch.gateway.GatewayService;
36+
import org.elasticsearch.indices.IndexMissingException;
3637
import org.elasticsearch.threadpool.ThreadPool;
3738
import org.elasticsearch.timer.TimerService;
3839
import org.elasticsearch.transport.TransportService;
@@ -65,8 +66,8 @@ public class TransportClusterHealthAction extends TransportMasterNodeOperationAc
6566
return new ClusterHealthResponse();
6667
}
6768

68-
@Override protected ClusterHealthResponse masterOperation(ClusterHealthRequest request, ClusterState state) throws ElasticSearchException {
69-
int waitFor = 4;
69+
@Override protected ClusterHealthResponse masterOperation(ClusterHealthRequest request, ClusterState unusedState) throws ElasticSearchException {
70+
int waitFor = 5;
7071
if (request.waitForStatus() == null) {
7172
waitFor--;
7273
}
@@ -79,14 +80,19 @@ public class TransportClusterHealthAction extends TransportMasterNodeOperationAc
7980
if (request.waitForNodes().isEmpty()) {
8081
waitFor--;
8182
}
83+
if (request.indices().length == 0) { // check that they actually exists in the meta data
84+
waitFor--;
85+
}
8286
if (waitFor == 0) {
8387
// no need to wait for anything
84-
return clusterHealth(request);
88+
ClusterState clusterState = clusterService.state();
89+
return clusterHealth(request, clusterState);
8590
}
8691
long endTime = System.currentTimeMillis() + request.timeout().millis();
8792
while (true) {
8893
int waitForCounter = 0;
89-
ClusterHealthResponse response = clusterHealth(request);
94+
ClusterState clusterState = clusterService.state();
95+
ClusterHealthResponse response = clusterHealth(request, clusterState);
9096
if (request.waitForStatus() != null && response.status().value() <= request.waitForStatus().value()) {
9197
waitForCounter++;
9298
}
@@ -96,6 +102,14 @@ public class TransportClusterHealthAction extends TransportMasterNodeOperationAc
96102
if (request.waitForActiveShards() != -1 && response.activeShards() >= request.waitForActiveShards()) {
97103
waitForCounter++;
98104
}
105+
if (request.indices().length > 0) {
106+
try {
107+
clusterState.metaData().concreteIndices(request.indices());
108+
waitForCounter++;
109+
} catch (IndexMissingException e) {
110+
// missing indices, wait a bit more...
111+
}
112+
}
99113
if (!request.waitForNodes().isEmpty()) {
100114
if (request.waitForNodes().startsWith(">=")) {
101115
int expected = Integer.parseInt(request.waitForNodes().substring(2));
@@ -161,14 +175,13 @@ public class TransportClusterHealthAction extends TransportMasterNodeOperationAc
161175
}
162176
}
163177

164-
private ClusterHealthResponse clusterHealth(ClusterHealthRequest request) {
165-
ClusterState clusterState = clusterService.state();
178+
private ClusterHealthResponse clusterHealth(ClusterHealthRequest request, ClusterState clusterState) {
166179
RoutingTableValidation validation = clusterState.routingTable().validate(clusterState.metaData());
167180
ClusterHealthResponse response = new ClusterHealthResponse(clusterName.value(), validation.failures());
168181
response.numberOfNodes = clusterState.nodes().size();
169182
response.numberOfDataNodes = clusterState.nodes().dataNodes().size();
170183

171-
for (String index : clusterState.metaData().concreteIndices(request.indices())) {
184+
for (String index : clusterState.metaData().concreteIndicesIgnoreMissing(request.indices())) {
172185
IndexRoutingTable indexRoutingTable = clusterState.routingTable().index(index);
173186
IndexMetaData indexMetaData = clusterState.metaData().index(index);
174187
if (indexRoutingTable == null) {

modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ public class TransportClusterStateAction extends TransportMasterNodeOperationAct
8585
}
8686

8787
if (request.filteredIndices().length > 0) {
88-
String[] indices = currentState.metaData().concreteIndices(request.filteredIndices(), true);
88+
String[] indices = currentState.metaData().concreteIndicesIgnoreMissing(request.filteredIndices());
8989
for (String filteredIndex : indices) {
9090
IndexMetaData indexMetaData = currentState.metaData().index(filteredIndex);
9191
if (indexMetaData != null) {

modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,10 +138,20 @@ public String[] getConcreteAllIndices() {
138138
return concreteAllIndices();
139139
}
140140

141+
/**
142+
* Translates the provided indices (possibly aliased) into actual indices. Delegates to {@code concreteIndices(indices, false)} and is declared to throw {@link IndexMissingException}.
143+
*/
141144
public String[] concreteIndices(String[] indices) throws IndexMissingException {
142145
return concreteIndices(indices, false);
143146
}
144147

148+
/**
149+
* Translates the provided indices (possibly aliased) into actual indices by delegating to {@code concreteIndices(indices, true)}. Unlike {@link #concreteIndices(String[])}, this variant declares no {@link IndexMissingException}; presumably unknown names are skipped rather than reported, as the method name suggests (TODO confirm against the two-argument overload).
150+
*/
151+
public String[] concreteIndicesIgnoreMissing(String[] indices) {
152+
return concreteIndices(indices, true);
153+
}
154+
145155
/**
146156
* Translates the provided indices (possibly aliased) into actual indices.
147157
*/

modules/elasticsearch/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@
4242
*/
4343
public class DiscoveryNode implements Streamable, Serializable {
4444

45+
/**
 * Decides, from the node settings alone, whether the node requires local storage.
 *
 * Returns {@code false} when {@code node.client} is true (it defaults to false), or when
 * both {@code node.data} and {@code node.master} are false (each defaults to true).
 * In every other case, including a node that is only master-eligible or only a data node,
 * it returns {@code true}.
 *
 * @param settings the node settings to inspect
 * @return {@code true} if the node requires local storage, {@code false} otherwise
 */
public static boolean nodeRequiresLocalStorage(Settings settings) {
46+
return !(settings.getAsBoolean("node.client", false) || (!settings.getAsBoolean("node.data", true) && !settings.getAsBoolean("node.master", true)));
47+
}
48+
4549
public static Map<String, String> buildCommonNodesAttributes(Settings settings) {
4650
Map<String, String> attributes = Maps.newHashMap(settings.getByPrefix("node.").getAsMap());
4751
if (attributes.containsKey("client")) {

modules/elasticsearch/src/main/java/org/elasticsearch/env/NodeEnvironment.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.lucene.store.Lock;
2323
import org.apache.lucene.store.NativeFSLockFactory;
2424
import org.elasticsearch.ElasticSearchIllegalStateException;
25+
import org.elasticsearch.cluster.node.DiscoveryNode;
2526
import org.elasticsearch.common.component.AbstractComponent;
2627
import org.elasticsearch.common.inject.Inject;
2728
import org.elasticsearch.common.settings.Settings;
@@ -45,8 +46,7 @@ public class NodeEnvironment extends AbstractComponent {
4546
@Inject public NodeEnvironment(Settings settings, Environment environment) throws IOException {
4647
super(settings);
4748

48-
if (!settings.getAsBoolean("node.data", true) || settings.getAsBoolean("node.client", false) ||
49-
!settings.getAsBoolean("node.master", true)) {
49+
if (!DiscoveryNode.nodeRequiresLocalStorage(settings)) {
5050
nodeFile = null;
5151
lock = null;
5252
localNodeId = -1;

modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,7 @@ private synchronized void lazyInitialize() {
354354
initialized = true;
355355

356356
// if this is not a possible master node or data node, bail, we won't save anything here...
357-
if (!clusterService.localNode().masterNode() || !clusterService.localNode().dataNode()) {
357+
if (!clusterService.localNode().masterNode() && !clusterService.localNode().dataNode()) {
358358
location = null;
359359
} else {
360360
// create the location where the state will be stored

modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/local/LocalGatewayIndexStateTests.java

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.common.logging.ESLogger;
3131
import org.elasticsearch.common.logging.Loggers;
3232
import org.elasticsearch.common.xcontent.XContentFactory;
33+
import org.elasticsearch.env.NodeEnvironment;
3334
import org.elasticsearch.gateway.Gateway;
3435
import org.elasticsearch.node.internal.InternalNode;
3536
import org.elasticsearch.test.integration.AbstractNodesTests;
@@ -52,7 +53,9 @@ public class LocalGatewayIndexStateTests extends AbstractNodesTests {
5253
if (node("node" + i) != null) {
5354
node("node" + i).stop();
5455
// since we store (by default) the index snapshot under the gateway, resetting it will reset the index data as well
55-
((InternalNode) node("node" + i)).injector().getInstance(Gateway.class).reset();
56+
if (((InternalNode) node("node" + i)).injector().getInstance(NodeEnvironment.class).hasNodeFile()) {
57+
((InternalNode) node("node" + i)).injector().getInstance(Gateway.class).reset();
58+
}
5659
}
5760
}
5861
closeAllNodes();
@@ -221,4 +224,51 @@ public class LocalGatewayIndexStateTests extends AbstractNodesTests {
221224
logger.info("--> indexing a simple document");
222225
client("node1").prepareIndex("test", "type1", "2").setSource("field1", "value1").execute().actionGet();
223226
}
227+
228+
// Starts a single node with node.data=false and the local gateway, creates the "test" index,
// closes the node and starts it again with the same settings, then asserts that the health
// request for "test" does not time out and that the cluster state meta data still has the index.
@Test public void testJustMasterNode() throws Exception {
229+
logger.info("--> cleaning nodes");
230+
buildNode("node1", settingsBuilder().put("gateway.type", "local").build());
231+
buildNode("node2", settingsBuilder().put("gateway.type", "local").build());
232+
cleanAndCloseNodes();
233+
234+
logger.info("--> starting 1 master node non data");
235+
startNode("node1", settingsBuilder().put("node.data", false).put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 1).build());
236+
237+
logger.info("--> create an index");
238+
client("node1").admin().indices().prepareCreate("test").execute().actionGet();
239+
240+
logger.info("--> closing master node");
241+
closeNode("node1");
242+
243+
logger.info("--> starting 1 master node non data again");
244+
startNode("node1", settingsBuilder().put("node.data", false).put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 1).build());
245+
246+
logger.info("--> waiting for test index to be created");
247+
ClusterHealthResponse health = client("node1").admin().cluster().prepareHealth().setIndices("test").execute().actionGet();
248+
assertThat(health.timedOut(), equalTo(false));
249+
250+
logger.info("--> verify we have an index");
251+
ClusterStateResponse clusterStateResponse = client("node1").admin().cluster().prepareState().setFilterIndices("test").execute().actionGet();
252+
assertThat(clusterStateResponse.state().metaData().hasIndex("test"), equalTo(true));
253+
}
254+
255+
// Starts node1 with node.data=false and node2 with node.master=false (both on the local gateway),
// creates the "test" index through node1, waits for yellow status on that index without timing out,
// and then indexes one document with a 100ms timeout.
// NOTE(review): the "starting 1 master node non data" log message below mentions one node, but two
// nodes are started right after it; looks copied from testJustMasterNode, confirm before rewording.
@Test public void testJustMasterNodeAndJustDataNode() throws Exception {
256+
logger.info("--> cleaning nodes");
257+
buildNode("node1", settingsBuilder().put("gateway.type", "local").build());
258+
buildNode("node2", settingsBuilder().put("gateway.type", "local").build());
259+
cleanAndCloseNodes();
260+
261+
logger.info("--> starting 1 master node non data");
262+
startNode("node1", settingsBuilder().put("node.data", false).put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 1).build());
263+
startNode("node2", settingsBuilder().put("node.master", false).put("gateway.type", "local").put("index.number_of_shards", 2).put("index.number_of_replicas", 1).build());
264+
265+
logger.info("--> create an index");
266+
client("node1").admin().indices().prepareCreate("test").execute().actionGet();
267+
268+
logger.info("--> waiting for test index to be created");
269+
ClusterHealthResponse health = client("node1").admin().cluster().prepareHealth().setIndices("test").setWaitForYellowStatus().execute().actionGet();
270+
assertThat(health.timedOut(), equalTo(false));
271+
272+
client("node1").prepareIndex("test", "type1").setSource("field1", "value1").setTimeout("100ms").execute().actionGet();
273+
}
224274
}

0 commit comments

Comments
 (0)