Skip to content

Commit 02981f6

Browse files
committed
Routing: Allow to specify on the _routing mapping that its required, and fail index operations that do not provide one, closes elastic#520.
1 parent 8a8a6d5 commit 02981f6

19 files changed

+264
-20
lines changed

modules/elasticsearch/src/main/java/org/elasticsearch/action/ActionRequestValidationException.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import java.util.List;
2626

2727
/**
28-
* @author kimchy (Shay Banon)
28+
* @author kimchy (shay.banon)
2929
*/
3030
public class ActionRequestValidationException extends ElasticSearchException {
3131

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Licensed to Elastic Search and Shay Banon under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. Elastic Search licenses this
6+
* file to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.action;
21+
22+
import org.elasticsearch.ElasticSearchException;
23+
24+
/**
25+
* @author kimchy (shay.banon)
26+
*/
27+
public class RoutingMissingException extends ElasticSearchException {
28+
29+
private final String index;
30+
31+
private final String type;
32+
33+
private final String id;
34+
35+
public RoutingMissingException(String index, String type, String id) {
36+
super("routing is required for [" + index + "]/[" + type + "]/[" + id + "]");
37+
this.index = index;
38+
this.type = type;
39+
this.id = id;
40+
}
41+
42+
public String index() {
43+
return index;
44+
}
45+
46+
public String type() {
47+
return type;
48+
}
49+
50+
public String id() {
51+
return id;
52+
}
53+
}

modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/replication/TransportShardReplicationPingAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public class TransportShardReplicationPingAction extends TransportShardReplicati
5757
return "ping/replication/shard";
5858
}
5959

60-
@Override protected ShardReplicationPingResponse shardOperationOnPrimary(ShardOperationRequest shardRequest) {
60+
@Override protected ShardReplicationPingResponse shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest) {
6161
return new ShardReplicationPingResponse();
6262
}
6363

modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.elasticsearch.ExceptionsHelper;
2323
import org.elasticsearch.action.ActionListener;
24+
import org.elasticsearch.action.RoutingMissingException;
2425
import org.elasticsearch.action.delete.DeleteRequest;
2526
import org.elasticsearch.action.delete.DeleteResponse;
2627
import org.elasticsearch.action.index.IndexRequest;
@@ -31,6 +32,7 @@
3132
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
3233
import org.elasticsearch.cluster.action.shard.ShardStateAction;
3334
import org.elasticsearch.cluster.block.ClusterBlockLevel;
35+
import org.elasticsearch.cluster.metadata.MappingMetaData;
3436
import org.elasticsearch.cluster.routing.ShardsIterator;
3537
import org.elasticsearch.common.collect.Sets;
3638
import org.elasticsearch.common.inject.Inject;
@@ -95,7 +97,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
9597
return clusterState.routingTable().index(request.index()).shard(request.shardId()).shardsIt();
9698
}
9799

98-
@Override protected BulkShardResponse shardOperationOnPrimary(ShardOperationRequest shardRequest) {
100+
@Override protected BulkShardResponse shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest) {
99101
IndexShard indexShard = indexShard(shardRequest);
100102
final BulkShardRequest request = shardRequest.request;
101103
BulkItemResponse[] responses = new BulkItemResponse[request.items().length];
@@ -105,6 +107,15 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
105107
if (item.request() instanceof IndexRequest) {
106108
IndexRequest indexRequest = (IndexRequest) item.request();
107109
try {
110+
111+
// validate, if routing is required, that we got routing
112+
MappingMetaData mappingMd = clusterState.metaData().index(request.index()).mapping(indexRequest.type());
113+
if (mappingMd != null && mappingMd.routing().required()) {
114+
if (indexRequest.routing() == null) {
115+
throw new RoutingMissingException(indexRequest.index(), indexRequest.type(), indexRequest.id());
116+
}
117+
}
118+
108119
SourceToParse sourceToParse = SourceToParse.source(indexRequest.source()).type(indexRequest.type()).id(indexRequest.id()).routing(indexRequest.routing());
109120
if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
110121
ops[i] = indexShard.prepareIndex(sourceToParse);

modules/elasticsearch/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
100100
state.blocks().indexBlockedRaiseException(ClusterBlockLevel.WRITE, request.index());
101101
}
102102

103-
@Override protected DeleteResponse shardOperationOnPrimary(ShardOperationRequest shardRequest) {
103+
@Override protected DeleteResponse shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest) {
104104
DeleteRequest request = shardRequest.request;
105105
IndexShard indexShard = indexShard(shardRequest);
106106
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id());

modules/elasticsearch/src/main/java/org/elasticsearch/action/deletebyquery/TransportShardDeleteByQueryAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication
6464
state.blocks().indexBlockedRaiseException(ClusterBlockLevel.WRITE, request.index());
6565
}
6666

67-
@Override protected ShardDeleteByQueryResponse shardOperationOnPrimary(ShardOperationRequest shardRequest) {
67+
@Override protected ShardDeleteByQueryResponse shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest) {
6868
ShardDeleteByQueryRequest request = shardRequest.request;
6969
indexShard(shardRequest).deleteByQuery(request.querySource(), request.queryParserName(), request.types());
7070
return new ShardDeleteByQueryResponse();

modules/elasticsearch/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.elasticsearch.ExceptionsHelper;
2323
import org.elasticsearch.action.ActionListener;
24+
import org.elasticsearch.action.RoutingMissingException;
2425
import org.elasticsearch.action.TransportActions;
2526
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
2627
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
@@ -31,6 +32,7 @@
3132
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
3233
import org.elasticsearch.cluster.action.shard.ShardStateAction;
3334
import org.elasticsearch.cluster.block.ClusterBlockLevel;
35+
import org.elasticsearch.cluster.metadata.MappingMetaData;
3436
import org.elasticsearch.cluster.routing.ShardsIterator;
3537
import org.elasticsearch.common.UUID;
3638
import org.elasticsearch.common.inject.Inject;
@@ -133,9 +135,18 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
133135
.indexShards(clusterService.state(), request.index(), request.type(), request.id(), request.routing());
134136
}
135137

136-
@Override protected IndexResponse shardOperationOnPrimary(ShardOperationRequest shardRequest) {
137-
IndexShard indexShard = indexShard(shardRequest);
138+
@Override protected IndexResponse shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest) {
138139
final IndexRequest request = shardRequest.request;
140+
141+
// validate, if routing is required, that we got routing
142+
MappingMetaData mappingMd = clusterState.metaData().index(request.index()).mapping(request.type());
143+
if (mappingMd != null && mappingMd.routing().required()) {
144+
if (request.routing() == null) {
145+
throw new RoutingMissingException(request.index(), request.type(), request.id());
146+
}
147+
}
148+
149+
IndexShard indexShard = indexShard(shardRequest);
139150
SourceToParse sourceToParse = SourceToParse.source(request.source()).type(request.type()).id(request.id()).routing(request.routing());
140151
ParsedDocument doc;
141152
if (request.opType() == IndexRequest.OpType.INDEX) {

modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ protected TransportShardReplicationOperationAction(Settings settings, TransportS
103103

104104
protected abstract String transportAction();
105105

106-
protected abstract Response shardOperationOnPrimary(ShardOperationRequest shardRequest);
106+
protected abstract Response shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest);
107107

108108
protected abstract void shardOperationOnReplica(ShardOperationRequest shardRequest);
109109

@@ -254,7 +254,7 @@ public void start() {
254254
* Returns <tt>true</tt> if the action starting to be performed on the primary (or is done).
255255
*/
256256
public boolean start(final boolean fromClusterEvent) throws ElasticSearchException {
257-
ClusterState clusterState = clusterService.state();
257+
final ClusterState clusterState = clusterService.state();
258258
nodes = clusterState.nodes();
259259
if (!clusterState.routingTable().hasIndex(request.index())) {
260260
retry(fromClusterEvent, null);
@@ -313,11 +313,11 @@ public boolean start(final boolean fromClusterEvent) throws ElasticSearchExcepti
313313
request.beforeLocalFork();
314314
threadPool.execute(new Runnable() {
315315
@Override public void run() {
316-
performOnPrimary(shard.id(), fromClusterEvent, true, shard);
316+
performOnPrimary(shard.id(), fromClusterEvent, true, shard, clusterState);
317317
}
318318
});
319319
} else {
320-
performOnPrimary(shard.id(), fromClusterEvent, false, shard);
320+
performOnPrimary(shard.id(), fromClusterEvent, false, shard, clusterState);
321321
}
322322
} else {
323323
DiscoveryNode node = nodes.get(shard.currentNodeId());
@@ -413,9 +413,9 @@ private void retry(boolean fromClusterEvent, final ShardId shardId) {
413413
}
414414
}
415415

416-
private void performOnPrimary(int primaryShardId, boolean fromDiscoveryListener, boolean alreadyThreaded, final ShardRouting shard) {
416+
private void performOnPrimary(int primaryShardId, boolean fromDiscoveryListener, boolean alreadyThreaded, final ShardRouting shard, ClusterState clusterState) {
417417
try {
418-
Response response = shardOperationOnPrimary(new ShardOperationRequest(primaryShardId, request));
418+
Response response = shardOperationOnPrimary(clusterState, new ShardOperationRequest(primaryShardId, request));
419419
performReplicas(response, alreadyThreaded);
420420
} catch (Exception e) {
421421
// shard has not been allocated yet, retry it here

modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MappingMetaData.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,18 +31,43 @@
3131
*/
3232
public class MappingMetaData {
3333

34+
public static class Routing {
35+
36+
public static final Routing EMPTY = new Routing(false);
37+
38+
private final boolean required;
39+
40+
public Routing(boolean required) {
41+
this.required = required;
42+
}
43+
44+
public boolean required() {
45+
return required;
46+
}
47+
}
48+
3449
private final String type;
3550

3651
private final CompressedString source;
3752

53+
private final Routing routing;
54+
3855
public MappingMetaData(DocumentMapper docMapper) {
3956
this.type = docMapper.type();
4057
this.source = docMapper.mappingSource();
58+
this.routing = new Routing(docMapper.routingFieldMapper().required());
4159
}
4260

4361
public MappingMetaData(String type, CompressedString source) {
4462
this.type = type;
4563
this.source = source;
64+
this.routing = Routing.EMPTY;
65+
}
66+
67+
MappingMetaData(String type, CompressedString source, Routing routing) {
68+
this.type = type;
69+
this.source = source;
70+
this.routing = routing;
4671
}
4772

4873
public String type() {
@@ -53,12 +78,22 @@ public CompressedString source() {
5378
return this.source;
5479
}
5580

81+
public Routing routing() {
82+
return this.routing;
83+
}
84+
5685
public static void writeTo(MappingMetaData mappingMd, StreamOutput out) throws IOException {
5786
out.writeUTF(mappingMd.type());
5887
mappingMd.source().writeTo(out);
88+
// routing
89+
out.writeBoolean(mappingMd.routing().required());
5990
}
6091

6192
public static MappingMetaData readFrom(StreamInput in) throws IOException {
62-
return new MappingMetaData(in.readUTF(), CompressedString.readCompressedString(in));
93+
String type = in.readUTF();
94+
CompressedString source = CompressedString.readCompressedString(in);
95+
// routing
96+
Routing routing = new Routing(in.readBoolean());
97+
return new MappingMetaData(type, source, routing);
6398
}
6499
}

modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/DocumentMapper.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ public interface DocumentMapper {
6666

6767
AllFieldMapper allFieldMapper();
6868

69+
RoutingFieldMapper routingFieldMapper();
70+
6971
DocumentFieldMappers mappers();
7072

7173
/**

modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/RoutingFieldMapper.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,5 +26,7 @@
2626
*/
2727
public interface RoutingFieldMapper extends FieldMapper<String>, InternalMapper {
2828

29+
boolean required();
30+
2931
String value(Document document);
3032
}

modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/xcontent/RoutingFieldMapper.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,28 +41,43 @@ public static class Defaults extends AbstractFieldMapper.Defaults {
4141
public static final Field.Store STORE = Field.Store.YES;
4242
public static final boolean OMIT_NORMS = true;
4343
public static final boolean OMIT_TERM_FREQ_AND_POSITIONS = true;
44+
public static final boolean REQUIRED = false;
4445
}
4546

4647
public static class Builder extends AbstractFieldMapper.Builder<Builder, RoutingFieldMapper> {
4748

49+
private boolean required = Defaults.REQUIRED;
50+
4851
public Builder() {
4952
super(Defaults.NAME);
5053
store = Defaults.STORE;
5154
index = Defaults.INDEX;
5255
}
5356

57+
public Builder required(boolean required) {
58+
this.required = required;
59+
return builder;
60+
}
61+
5462
@Override public RoutingFieldMapper build(BuilderContext context) {
55-
return new RoutingFieldMapper(store, index);
63+
return new RoutingFieldMapper(store, index, required);
5664
}
5765
}
5866

67+
private final boolean required;
68+
5969
protected RoutingFieldMapper() {
60-
this(Defaults.STORE, Defaults.INDEX);
70+
this(Defaults.STORE, Defaults.INDEX, Defaults.REQUIRED);
6171
}
6272

63-
protected RoutingFieldMapper(Field.Store store, Field.Index index) {
73+
protected RoutingFieldMapper(Field.Store store, Field.Index index, boolean required) {
6474
super(new Names(Defaults.NAME, Defaults.NAME, Defaults.NAME, Defaults.NAME), index, store, Defaults.TERM_VECTOR, 1.0f, Defaults.OMIT_NORMS, Defaults.OMIT_TERM_FREQ_AND_POSITIONS,
6575
Lucene.KEYWORD_ANALYZER, Lucene.KEYWORD_ANALYZER);
76+
this.required = required;
77+
}
78+
79+
@Override public boolean required() {
80+
return this.required;
6681
}
6782

6883
@Override public String value(Document document) {
@@ -107,7 +122,7 @@ protected RoutingFieldMapper(Field.Store store, Field.Index index) {
107122

108123
@Override public void toXContent(XContentBuilder builder, Params params) throws IOException {
109124
// if all are defaults, no sense to write it at all
110-
if (index == Defaults.INDEX && store == Defaults.STORE) {
125+
if (index == Defaults.INDEX && store == Defaults.STORE && required == Defaults.REQUIRED) {
111126
return;
112127
}
113128
builder.startObject(CONTENT_TYPE);
@@ -117,6 +132,9 @@ protected RoutingFieldMapper(Field.Store store, Field.Index index) {
117132
if (store != Defaults.STORE) {
118133
builder.field("store", store.name().toLowerCase());
119134
}
135+
if (required != Defaults.REQUIRED) {
136+
builder.field("required", required);
137+
}
120138
builder.endObject();
121139
}
122140

modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/xcontent/XContentDocumentMapper.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,10 @@ public RootObjectMapper root() {
312312
return this.allFieldMapper;
313313
}
314314

315+
@Override public org.elasticsearch.index.mapper.RoutingFieldMapper routingFieldMapper() {
316+
return this.routingFieldMapper;
317+
}
318+
315319
@Override public Analyzer indexAnalyzer() {
316320
return this.indexAnalyzer;
317321
}

modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/xcontent/XContentDocumentMapperParser.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,13 @@ private IdFieldMapper.Builder parseIdField(Map<String, Object> idNode, XContentM
218218
private RoutingFieldMapper.Builder parseRoutingField(Map<String, Object> routingNode, XContentMapper.TypeParser.ParserContext parserContext) {
219219
RoutingFieldMapper.Builder builder = routing();
220220
parseField(builder, builder.name, routingNode, parserContext);
221+
for (Map.Entry<String, Object> entry : routingNode.entrySet()) {
222+
String fieldName = Strings.toUnderscoreCase(entry.getKey());
223+
Object fieldNode = entry.getValue();
224+
if (fieldName.equals("required")) {
225+
builder.required(nodeBooleanValue(fieldNode));
226+
}
227+
}
221228
return builder;
222229
}
223230

modules/test/integration/src/test/java/org/elasticsearch/test/integration/AbstractNodesTests.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ public Node buildNode(String id) {
5656
return buildNode(id, EMPTY_SETTINGS);
5757
}
5858

59+
public Node buildNode(String id, Settings.Builder settings) {
60+
return buildNode(id, settings.build());
61+
}
62+
5963
public Node buildNode(String id, Settings settings) {
6064
String settingsSource = getClass().getName().replace('.', '/') + ".yml";
6165
Settings finalSettings = settingsBuilder()

0 commit comments

Comments
 (0)