Skip to content

Commit a04d8ec

Browse files
committed
Routing: When specify in the mapping _routing required, a delete without explicit routing value should automatically be broadcasted to all shards, closes elastic#522.
1 parent 2483a37 commit a04d8ec

File tree

12 files changed

+572
-7
lines changed

12 files changed

+572
-7
lines changed

modules/elasticsearch/src/main/java/org/elasticsearch/action/TransportActionModule.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@
4848
import org.elasticsearch.action.bulk.TransportShardBulkAction;
4949
import org.elasticsearch.action.count.TransportCountAction;
5050
import org.elasticsearch.action.delete.TransportDeleteAction;
51+
import org.elasticsearch.action.delete.index.TransportIndexDeleteAction;
52+
import org.elasticsearch.action.delete.index.TransportShardDeleteAction;
5153
import org.elasticsearch.action.deletebyquery.TransportDeleteByQueryAction;
5254
import org.elasticsearch.action.deletebyquery.TransportIndexDeleteByQueryAction;
5355
import org.elasticsearch.action.deletebyquery.TransportShardDeleteByQueryAction;
@@ -99,6 +101,8 @@ public class TransportActionModule extends AbstractModule {
99101
bind(TransportIndexAction.class).asEagerSingleton();
100102
bind(TransportGetAction.class).asEagerSingleton();
101103
bind(TransportDeleteAction.class).asEagerSingleton();
104+
bind(TransportIndexDeleteAction.class).asEagerSingleton();
105+
bind(TransportShardDeleteAction.class).asEagerSingleton();
102106
bind(TransportCountAction.class).asEagerSingleton();
103107

104108
bind(TransportBulkAction.class).asEagerSingleton();

modules/elasticsearch/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,15 @@
2525
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
2626
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
2727
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
28+
import org.elasticsearch.action.delete.index.IndexDeleteRequest;
29+
import org.elasticsearch.action.delete.index.IndexDeleteResponse;
30+
import org.elasticsearch.action.delete.index.TransportIndexDeleteAction;
2831
import org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction;
2932
import org.elasticsearch.cluster.ClusterService;
3033
import org.elasticsearch.cluster.ClusterState;
3134
import org.elasticsearch.cluster.action.shard.ShardStateAction;
3235
import org.elasticsearch.cluster.block.ClusterBlockLevel;
36+
import org.elasticsearch.cluster.metadata.MappingMetaData;
3337
import org.elasticsearch.cluster.routing.ShardsIterator;
3438
import org.elasticsearch.common.inject.Inject;
3539
import org.elasticsearch.common.settings.Settings;
@@ -51,33 +55,60 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
5155

5256
private final TransportCreateIndexAction createIndexAction;
5357

58+
private final TransportIndexDeleteAction indexDeleteAction;
59+
5460
@Inject public TransportDeleteAction(Settings settings, TransportService transportService, ClusterService clusterService,
5561
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
56-
TransportCreateIndexAction createIndexAction) {
62+
TransportCreateIndexAction createIndexAction, TransportIndexDeleteAction indexDeleteAction) {
5763
super(settings, transportService, clusterService, indicesService, threadPool, shardStateAction);
5864
this.createIndexAction = createIndexAction;
65+
this.indexDeleteAction = indexDeleteAction;
5966
this.autoCreateIndex = settings.getAsBoolean("action.auto_create_index", true);
6067
}
6168

6269
@Override protected void doExecute(final DeleteRequest deleteRequest, final ActionListener<DeleteResponse> listener) {
6370
if (autoCreateIndex && !clusterService.state().metaData().hasConcreteIndex(deleteRequest.index())) {
6471
createIndexAction.execute(new CreateIndexRequest(deleteRequest.index()), new ActionListener<CreateIndexResponse>() {
6572
@Override public void onResponse(CreateIndexResponse result) {
66-
TransportDeleteAction.super.doExecute(deleteRequest, listener);
73+
innerExecute(deleteRequest, listener);
6774
}
6875

6976
@Override public void onFailure(Throwable e) {
7077
if (ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException) {
7178
// we have the index, do it
72-
TransportDeleteAction.super.doExecute(deleteRequest, listener);
79+
innerExecute(deleteRequest, listener);
7380
} else {
7481
listener.onFailure(e);
7582
}
7683
}
7784
});
7885
} else {
79-
super.doExecute(deleteRequest, listener);
86+
innerExecute(deleteRequest, listener);
87+
}
88+
}
89+
90+
private void innerExecute(final DeleteRequest request, final ActionListener<DeleteResponse> listener) {
91+
ClusterState clusterState = clusterService.state();
92+
request.index(clusterState.metaData().concreteIndex(request.index())); // we need to get the concrete index here...
93+
if (clusterState.metaData().hasIndex(request.index())) {
94+
// check if routing is required, if so, do a broadcast delete
95+
MappingMetaData mappingMd = clusterState.metaData().index(request.index()).mapping(request.type());
96+
if (mappingMd != null && mappingMd.routing().required()) {
97+
if (request.routing() == null) {
98+
indexDeleteAction.execute(new IndexDeleteRequest(request), new ActionListener<IndexDeleteResponse>() {
99+
@Override public void onResponse(IndexDeleteResponse indexDeleteResponse) {
100+
listener.onResponse(new DeleteResponse(request.index(), request.type(), request.id()));
101+
}
102+
103+
@Override public void onFailure(Throwable e) {
104+
listener.onFailure(e);
105+
}
106+
});
107+
return;
108+
}
109+
}
80110
}
111+
super.doExecute(request, listener);
81112
}
82113

83114
@Override protected boolean checkWriteConsistency() {
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Licensed to Elastic Search and Shay Banon under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. Elastic Search licenses this
6+
* file to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.action.delete.index;
21+
22+
import org.elasticsearch.action.delete.DeleteRequest;
23+
import org.elasticsearch.action.support.replication.IndexReplicationOperationRequest;
24+
import org.elasticsearch.common.io.stream.StreamInput;
25+
import org.elasticsearch.common.io.stream.StreamOutput;
26+
27+
import java.io.IOException;
28+
29+
/**
30+
* @author kimchy (shay.banon)
31+
*/
32+
public class IndexDeleteRequest extends IndexReplicationOperationRequest {
33+
34+
private String type;
35+
36+
private String id;
37+
38+
private boolean refresh = false;
39+
40+
IndexDeleteRequest() {
41+
}
42+
43+
public IndexDeleteRequest(DeleteRequest request) {
44+
this.timeout = request.timeout();
45+
this.consistencyLevel = request.consistencyLevel();
46+
this.replicationType = request.replicationType();
47+
this.index = request.index();
48+
this.type = request.type();
49+
this.id = request.id();
50+
this.refresh = request.refresh();
51+
}
52+
53+
public String type() {
54+
return this.type;
55+
}
56+
57+
public String id() {
58+
return this.id;
59+
}
60+
61+
public boolean refresh() {
62+
return this.refresh;
63+
}
64+
65+
@Override public void readFrom(StreamInput in) throws IOException {
66+
super.readFrom(in);
67+
type = in.readUTF();
68+
id = in.readUTF();
69+
refresh = in.readBoolean();
70+
}
71+
72+
@Override public void writeTo(StreamOutput out) throws IOException {
73+
super.writeTo(out);
74+
out.writeUTF(type);
75+
out.writeUTF(id);
76+
out.writeBoolean(refresh);
77+
}
78+
}
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
* Licensed to Elastic Search and Shay Banon under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. Elastic Search licenses this
6+
* file to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.action.delete.index;
21+
22+
import org.elasticsearch.action.ActionResponse;
23+
import org.elasticsearch.common.io.stream.StreamInput;
24+
import org.elasticsearch.common.io.stream.StreamOutput;
25+
import org.elasticsearch.common.io.stream.Streamable;
26+
27+
import java.io.IOException;
28+
29+
/**
30+
* Delete by query response executed on a specific index.
31+
*
32+
* @author kimchy (shay.banon)
33+
*/
34+
public class IndexDeleteResponse implements ActionResponse, Streamable {
35+
36+
private String index;
37+
38+
private int successfulShards;
39+
40+
private int failedShards;
41+
42+
IndexDeleteResponse(String index, int successfulShards, int failedShards) {
43+
this.index = index;
44+
this.successfulShards = successfulShards;
45+
this.failedShards = failedShards;
46+
}
47+
48+
IndexDeleteResponse() {
49+
50+
}
51+
52+
/**
53+
* The index the delete by query operation was executed against.
54+
*/
55+
public String index() {
56+
return this.index;
57+
}
58+
59+
/**
60+
* The index the delete by query operation was executed against.
61+
*/
62+
public String getIndex() {
63+
return index;
64+
}
65+
66+
/**
67+
* The total number of shards the delete by query was executed on.
68+
*/
69+
public int totalShards() {
70+
return failedShards + successfulShards;
71+
}
72+
73+
/**
74+
* The total number of shards the delete by query was executed on.
75+
*/
76+
public int getTotalShards() {
77+
return totalShards();
78+
}
79+
80+
/**
81+
* The successful number of shards the delete by query was executed on.
82+
*/
83+
public int successfulShards() {
84+
return successfulShards;
85+
}
86+
87+
/**
88+
* The successful number of shards the delete by query was executed on.
89+
*/
90+
public int getSuccessfulShards() {
91+
return successfulShards;
92+
}
93+
94+
/**
95+
* The failed number of shards the delete by query was executed on.
96+
*/
97+
public int failedShards() {
98+
return failedShards;
99+
}
100+
101+
/**
102+
* The failed number of shards the delete by query was executed on.
103+
*/
104+
public int getFailedShards() {
105+
return failedShards;
106+
}
107+
108+
@Override public void readFrom(StreamInput in) throws IOException {
109+
index = in.readUTF();
110+
successfulShards = in.readVInt();
111+
failedShards = in.readVInt();
112+
}
113+
114+
@Override public void writeTo(StreamOutput out) throws IOException {
115+
out.writeUTF(index);
116+
out.writeVInt(successfulShards);
117+
out.writeVInt(failedShards);
118+
}
119+
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Licensed to Elastic Search and Shay Banon under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. Elastic Search licenses this
6+
* file to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.action.delete.index;
21+
22+
import org.elasticsearch.action.ActionRequestValidationException;
23+
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest;
24+
import org.elasticsearch.common.io.stream.StreamInput;
25+
import org.elasticsearch.common.io.stream.StreamOutput;
26+
27+
import java.io.IOException;
28+
29+
import static org.elasticsearch.action.Actions.*;
30+
31+
/**
32+
* Delete by query request to execute on a specific shard.
33+
*
34+
* @author kimchy (shay.banon)
35+
*/
36+
public class ShardDeleteRequest extends ShardReplicationOperationRequest {
37+
38+
private int shardId;
39+
private String type;
40+
private String id;
41+
private boolean refresh = false;
42+
43+
ShardDeleteRequest(IndexDeleteRequest request, int shardId) {
44+
this.index = request.index();
45+
this.shardId = shardId;
46+
this.type = request.type();
47+
this.id = request.id();
48+
replicationType(request.replicationType());
49+
consistencyLevel(request.consistencyLevel());
50+
timeout = request.timeout();
51+
this.refresh = request.refresh();
52+
}
53+
54+
ShardDeleteRequest() {
55+
}
56+
57+
@Override public ActionRequestValidationException validate() {
58+
ActionRequestValidationException validationException = super.validate();
59+
if (type == null) {
60+
addValidationError("type is missing", validationException);
61+
}
62+
if (id == null) {
63+
addValidationError("id is missing", validationException);
64+
}
65+
return validationException;
66+
}
67+
68+
public int shardId() {
69+
return this.shardId;
70+
}
71+
72+
public String type() {
73+
return this.type;
74+
}
75+
76+
public String id() {
77+
return this.id;
78+
}
79+
80+
public boolean refresh() {
81+
return this.refresh;
82+
}
83+
84+
@Override public void readFrom(StreamInput in) throws IOException {
85+
super.readFrom(in);
86+
shardId = in.readVInt();
87+
type = in.readUTF();
88+
id = in.readUTF();
89+
refresh = in.readBoolean();
90+
}
91+
92+
@Override public void writeTo(StreamOutput out) throws IOException {
93+
super.writeTo(out);
94+
out.writeVInt(shardId);
95+
out.writeUTF(type);
96+
out.writeUTF(id);
97+
out.writeBoolean(refresh);
98+
}
99+
}

0 commit comments

Comments
 (0)