Skip to content

Commit 44775c2

Browse files
committed
Routing: Allow to define path on the _routing mapping, to automatically extract the routing from it, closes elastic#524.
1 parent 4e75f3d commit 44775c2

File tree

10 files changed

+312
-19
lines changed

10 files changed

+312
-19
lines changed

modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.action.bulk;
2121

22+
import org.elasticsearch.ElasticSearchException;
2223
import org.elasticsearch.ExceptionsHelper;
2324
import org.elasticsearch.action.ActionListener;
2425
import org.elasticsearch.action.ActionRequest;
@@ -152,13 +153,27 @@ private void executeBulk(final BulkRequest bulkRequest, final ActionListener<Bul
152153
deleteRequest.index(clusterState.metaData().concreteIndex(deleteRequest.index()));
153154
}
154155
}
156+
final BulkItemResponse[] responses = new BulkItemResponse[bulkRequest.requests.size()];
157+
155158

156159
// first, go over all the requests and create a ShardId -> Operations mapping
157160
Map<ShardId, List<BulkItemRequest>> requestsByShard = Maps.newHashMap();
158161
for (int i = 0; i < bulkRequest.requests.size(); i++) {
159162
ActionRequest request = bulkRequest.requests.get(i);
160163
if (request instanceof IndexRequest) {
161164
IndexRequest indexRequest = (IndexRequest) request;
165+
// handle routing
166+
MappingMetaData mappingMd = clusterState.metaData().index(indexRequest.index()).mapping(indexRequest.type());
167+
if (mappingMd != null) {
168+
try {
169+
indexRequest.processRouting(mappingMd);
170+
} catch (ElasticSearchException e) {
171+
responses[i] = new BulkItemResponse(i, indexRequest.opType().toString().toLowerCase(),
172+
new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), e.getDetailedMessage()));
173+
continue;
174+
}
175+
}
176+
162177
ShardId shardId = clusterService.operationRouting().indexShards(clusterState, indexRequest.index(), indexRequest.type(), indexRequest.id(), indexRequest.routing()).shardId();
163178
List<BulkItemRequest> list = requestsByShard.get(shardId);
164179
if (list == null) {
@@ -193,7 +208,6 @@ private void executeBulk(final BulkRequest bulkRequest, final ActionListener<Bul
193208
}
194209

195210
final AtomicInteger counter = new AtomicInteger(requestsByShard.size());
196-
final BulkItemResponse[] responses = new BulkItemResponse[bulkRequest.requests.size()];
197211
for (Map.Entry<ShardId, List<BulkItemRequest>> entry : requestsByShard.entrySet()) {
198212
final ShardId shardId = entry.getKey();
199213
final List<BulkItemRequest> requests = entry.getValue();

modules/elasticsearch/src/main/java/org/elasticsearch/action/index/IndexRequest.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,25 @@
2020
package org.elasticsearch.action.index;
2121

2222
import org.apache.lucene.util.UnicodeUtil;
23+
import org.elasticsearch.ElasticSearchException;
2324
import org.elasticsearch.ElasticSearchGenerationException;
2425
import org.elasticsearch.ElasticSearchIllegalArgumentException;
26+
import org.elasticsearch.ElasticSearchParseException;
2527
import org.elasticsearch.action.ActionRequestValidationException;
28+
import org.elasticsearch.action.RoutingMissingException;
2629
import org.elasticsearch.action.WriteConsistencyLevel;
2730
import org.elasticsearch.action.support.replication.ReplicationType;
2831
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest;
2932
import org.elasticsearch.client.Requests;
33+
import org.elasticsearch.cluster.metadata.MappingMetaData;
3034
import org.elasticsearch.common.Required;
3135
import org.elasticsearch.common.Unicode;
3236
import org.elasticsearch.common.io.stream.StreamInput;
3337
import org.elasticsearch.common.io.stream.StreamOutput;
3438
import org.elasticsearch.common.unit.TimeValue;
3539
import org.elasticsearch.common.xcontent.XContentBuilder;
3640
import org.elasticsearch.common.xcontent.XContentFactory;
41+
import org.elasticsearch.common.xcontent.XContentParser;
3742
import org.elasticsearch.common.xcontent.XContentType;
3843

3944
import javax.annotation.Nullable;
@@ -251,7 +256,7 @@ public String routing() {
251256
}
252257

253258
/**
254-
* The source of the document to index.
259+
* The source of the document to index, recopied to a new array if it has an offset or unsafe.
255260
*/
256261
public byte[] source() {
257262
if (sourceUnsafe || sourceOffset > 0) {
@@ -262,6 +267,18 @@ public byte[] source() {
262267
return source;
263268
}
264269

270+
public byte[] unsafeSource() {
271+
return this.source;
272+
}
273+
274+
public int unsafeSourceOffset() {
275+
return this.sourceOffset;
276+
}
277+
278+
public int unsafeSourceLength() {
279+
return this.sourceLength;
280+
}
281+
265282
/**
266283
* Index the Map as a {@link org.elasticsearch.client.Requests#INDEX_CONTENT_TYPE}.
267284
*
@@ -485,6 +502,27 @@ public boolean refresh() {
485502
return this.refresh;
486503
}
487504

505+
public void processRouting(MappingMetaData mappingMd) throws ElasticSearchException {
506+
if (routing == null && mappingMd.routing().hasPath()) {
507+
XContentParser parser = null;
508+
try {
509+
parser = XContentFactory.xContent(source, sourceOffset, sourceLength)
510+
.createParser(source, sourceOffset, sourceLength);
511+
routing = mappingMd.parseRouting(parser);
512+
} catch (Exception e) {
513+
throw new ElasticSearchParseException("failed to parse doc to extract routing", e);
514+
} finally {
515+
if (parser != null) {
516+
parser.close();
517+
}
518+
}
519+
}
520+
// might as well check for routing here
521+
if (mappingMd.routing().required() && routing == null) {
522+
throw new RoutingMissingException(index, type, id);
523+
}
524+
}
525+
488526
@Override public void readFrom(StreamInput in) throws IOException {
489527
super.readFrom(in);
490528
type = in.readUTF();

modules/elasticsearch/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.cluster.action.shard.ShardStateAction;
3434
import org.elasticsearch.cluster.block.ClusterBlockLevel;
3535
import org.elasticsearch.cluster.metadata.MappingMetaData;
36+
import org.elasticsearch.cluster.metadata.MetaData;
3637
import org.elasticsearch.cluster.routing.ShardsIterator;
3738
import org.elasticsearch.common.UUID;
3839
import org.elasticsearch.common.inject.Inject;
@@ -82,34 +83,46 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
8283
this.allowIdGeneration = settings.getAsBoolean("action.allow_id_generation", true);
8384
}
8485

85-
@Override protected void doExecute(final IndexRequest indexRequest, final ActionListener<IndexResponse> listener) {
86+
@Override protected void doExecute(final IndexRequest request, final ActionListener<IndexResponse> listener) {
8687
if (allowIdGeneration) {
87-
if (indexRequest.id() == null) {
88-
indexRequest.id(UUID.randomBase64UUID());
88+
if (request.id() == null) {
89+
request.id(UUID.randomBase64UUID());
8990
// since we generate the id, change it to CREATE
90-
indexRequest.opType(IndexRequest.OpType.CREATE);
91+
request.opType(IndexRequest.OpType.CREATE);
9192
}
9293
}
93-
if (autoCreateIndex && !clusterService.state().metaData().hasConcreteIndex(indexRequest.index())) {
94-
createIndexAction.execute(new CreateIndexRequest(indexRequest.index()).cause("auto(index api)"), new ActionListener<CreateIndexResponse>() {
94+
if (autoCreateIndex && !clusterService.state().metaData().hasConcreteIndex(request.index())) {
95+
createIndexAction.execute(new CreateIndexRequest(request.index()).cause("auto(index api)"), new ActionListener<CreateIndexResponse>() {
9596
@Override public void onResponse(CreateIndexResponse result) {
96-
TransportIndexAction.super.doExecute(indexRequest, listener);
97+
innerExecute(request, listener);
9798
}
9899

99100
@Override public void onFailure(Throwable e) {
100101
if (ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException) {
101102
// we have the index, do it
102-
TransportIndexAction.super.doExecute(indexRequest, listener);
103+
innerExecute(request, listener);
103104
} else {
104105
listener.onFailure(e);
105106
}
106107
}
107108
});
108109
} else {
109-
super.doExecute(indexRequest, listener);
110+
innerExecute(request, listener);
110111
}
111112
}
112113

114+
private void innerExecute(final IndexRequest request, final ActionListener<IndexResponse> listener) {
115+
MetaData metaData = clusterService.state().metaData();
116+
request.index(metaData.concreteIndex(request.index()));
117+
if (metaData.hasIndex(request.index())) {
118+
MappingMetaData mappingMd = metaData.index(request.index()).mapping(request.type());
119+
if (mappingMd != null) {
120+
request.processRouting(mappingMd);
121+
}
122+
}
123+
super.doExecute(request, listener);
124+
}
125+
113126
@Override protected boolean checkWriteConsistency() {
114127
return true;
115128
}

modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MappingMetaData.java

Lines changed: 68 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@
1919

2020
package org.elasticsearch.cluster.metadata;
2121

22+
import org.elasticsearch.common.Strings;
2223
import org.elasticsearch.common.compress.CompressedString;
2324
import org.elasticsearch.common.io.stream.StreamInput;
2425
import org.elasticsearch.common.io.stream.StreamOutput;
26+
import org.elasticsearch.common.xcontent.XContentParser;
2527
import org.elasticsearch.index.mapper.DocumentMapper;
2628

2729
import java.io.IOException;
@@ -33,17 +35,39 @@ public class MappingMetaData {
3335

3436
public static class Routing {
3537

36-
public static final Routing EMPTY = new Routing(false);
38+
public static final Routing EMPTY = new Routing(false, null);
3739

3840
private final boolean required;
3941

40-
public Routing(boolean required) {
42+
private final String path;
43+
44+
private final String[] pathElements;
45+
46+
public Routing(boolean required, String path) {
4147
this.required = required;
48+
this.path = path;
49+
if (path == null) {
50+
pathElements = Strings.EMPTY_ARRAY;
51+
} else {
52+
pathElements = Strings.delimitedListToStringArray(path, ".");
53+
}
4254
}
4355

4456
public boolean required() {
4557
return required;
4658
}
59+
60+
public boolean hasPath() {
61+
return path != null;
62+
}
63+
64+
public String path() {
65+
return this.path;
66+
}
67+
68+
public String[] pathElements() {
69+
return this.pathElements;
70+
}
4771
}
4872

4973
private final String type;
@@ -55,7 +79,7 @@ public boolean required() {
5579
public MappingMetaData(DocumentMapper docMapper) {
5680
this.type = docMapper.type();
5781
this.source = docMapper.mappingSource();
58-
this.routing = new Routing(docMapper.routingFieldMapper().required());
82+
this.routing = new Routing(docMapper.routingFieldMapper().required(), docMapper.routingFieldMapper().path());
5983
}
6084

6185
public MappingMetaData(String type, CompressedString source) {
@@ -82,18 +106,58 @@ public Routing routing() {
82106
return this.routing;
83107
}
84108

109+
public String parseRouting(XContentParser parser) throws IOException {
110+
return parseRouting(parser, 0);
111+
}
112+
113+
private String parseRouting(XContentParser parser, int location) throws IOException {
114+
XContentParser.Token t = parser.currentToken();
115+
if (t == null) {
116+
t = parser.nextToken();
117+
}
118+
if (t == XContentParser.Token.START_OBJECT) {
119+
t = parser.nextToken();
120+
}
121+
String routingPart = routing().pathElements()[location];
122+
123+
for (; t == XContentParser.Token.FIELD_NAME; t = parser.nextToken()) {
124+
// Must point to field name
125+
String fieldName = parser.currentName();
126+
// And then the value...
127+
t = parser.nextToken();
128+
if (routingPart.equals(fieldName)) {
129+
location++;
130+
if (location == routing.pathElements().length) {
131+
return parser.textOrNull();
132+
}
133+
if (t == XContentParser.Token.START_OBJECT) {
134+
return parseRouting(parser, location);
135+
}
136+
} else {
137+
parser.skipChildren();
138+
}
139+
}
140+
return null;
141+
}
142+
85143
public static void writeTo(MappingMetaData mappingMd, StreamOutput out) throws IOException {
86144
out.writeUTF(mappingMd.type());
87145
mappingMd.source().writeTo(out);
88146
// routing
89147
out.writeBoolean(mappingMd.routing().required());
148+
if (mappingMd.routing().hasPath()) {
149+
out.writeBoolean(true);
150+
out.writeUTF(mappingMd.routing().path());
151+
} else {
152+
out.writeBoolean(false);
153+
}
90154
}
91155

92156
public static MappingMetaData readFrom(StreamInput in) throws IOException {
93157
String type = in.readUTF();
94158
CompressedString source = CompressedString.readCompressedString(in);
95159
// routing
96-
Routing routing = new Routing(in.readBoolean());
160+
Routing routing = new Routing(in.readBoolean(), in.readBoolean() ? in.readUTF() : null);
97161
return new MappingMetaData(type, source, routing);
98162
}
99163
}

modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/RoutingFieldMapper.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,5 +28,7 @@ public interface RoutingFieldMapper extends FieldMapper<String>, InternalMapper
2828

2929
boolean required();
3030

31+
String path();
32+
3133
String value(Document document);
3234
}

0 commit comments

Comments
 (0)