Skip to content

Commit 0172eae

Browse files
committed
add broadcasting when no routing and its marked as required to bulk delete as well
1 parent 619efea commit 0172eae

File tree

2 files changed

+44
-9
lines changed

2 files changed

+44
-9
lines changed

modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@
3131
import org.elasticsearch.action.support.BaseAction;
3232
import org.elasticsearch.cluster.ClusterService;
3333
import org.elasticsearch.cluster.ClusterState;
34+
import org.elasticsearch.cluster.metadata.MappingMetaData;
35+
import org.elasticsearch.cluster.routing.GroupShardsIterator;
36+
import org.elasticsearch.cluster.routing.ShardsIterator;
3437
import org.elasticsearch.common.UUID;
3538
import org.elasticsearch.common.collect.Lists;
3639
import org.elasticsearch.common.collect.Maps;
@@ -154,20 +157,39 @@ private void executeBulk(final BulkRequest bulkRequest, final ActionListener<Bul
154157
Map<ShardId, List<BulkItemRequest>> requestsByShard = Maps.newHashMap();
155158
for (int i = 0; i < bulkRequest.requests.size(); i++) {
156159
ActionRequest request = bulkRequest.requests.get(i);
157-
ShardId shardId = null;
158160
if (request instanceof IndexRequest) {
159161
IndexRequest indexRequest = (IndexRequest) request;
160-
shardId = clusterService.operationRouting().indexShards(clusterState, indexRequest.index(), indexRequest.type(), indexRequest.id(), indexRequest.routing()).shardId();
162+
ShardId shardId = clusterService.operationRouting().indexShards(clusterState, indexRequest.index(), indexRequest.type(), indexRequest.id(), indexRequest.routing()).shardId();
163+
List<BulkItemRequest> list = requestsByShard.get(shardId);
164+
if (list == null) {
165+
list = Lists.newArrayList();
166+
requestsByShard.put(shardId, list);
167+
}
168+
list.add(new BulkItemRequest(i, request));
161169
} else if (request instanceof DeleteRequest) {
162170
DeleteRequest deleteRequest = (DeleteRequest) request;
163-
shardId = clusterService.operationRouting().deleteShards(clusterState, deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), deleteRequest.routing()).shardId();
164-
}
165-
List<BulkItemRequest> list = requestsByShard.get(shardId);
166-
if (list == null) {
167-
list = Lists.newArrayList();
168-
requestsByShard.put(shardId, list);
171+
MappingMetaData mappingMd = clusterState.metaData().index(deleteRequest.index()).mapping(deleteRequest.type());
172+
if (mappingMd != null && mappingMd.routing().required() && deleteRequest.routing() == null) {
173+
// if routing is required, and no routing on the delete request, we need to broadcast it....
174+
GroupShardsIterator groupShards = clusterService.operationRouting().broadcastDeleteShards(clusterState, deleteRequest.index());
175+
for (ShardsIterator shardsId : groupShards) {
176+
List<BulkItemRequest> list = requestsByShard.get(shardsId.shardId());
177+
if (list == null) {
178+
list = Lists.newArrayList();
179+
requestsByShard.put(shardsId.shardId(), list);
180+
}
181+
list.add(new BulkItemRequest(i, request));
182+
}
183+
} else {
184+
ShardId shardId = clusterService.operationRouting().deleteShards(clusterState, deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), deleteRequest.routing()).shardId();
185+
List<BulkItemRequest> list = requestsByShard.get(shardId);
186+
if (list == null) {
187+
list = Lists.newArrayList();
188+
requestsByShard.put(shardId, list);
189+
}
190+
list.add(new BulkItemRequest(i, request));
191+
}
169192
}
170-
list.add(new BulkItemRequest(i, request));
171193
}
172194

173195
final AtomicInteger counter = new AtomicInteger(requestsByShard.size());

modules/test/integration/src/test/java/org/elasticsearch/test/integration/routing/SimpleRoutingTests.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.ElasticSearchException;
2323
import org.elasticsearch.action.RoutingMissingException;
2424
import org.elasticsearch.client.Client;
25+
import org.elasticsearch.client.Requests;
2526
import org.elasticsearch.common.xcontent.XContentFactory;
2627
import org.elasticsearch.index.query.xcontent.QueryBuilders;
2728
import org.elasticsearch.test.integration.AbstractNodesTests;
@@ -222,5 +223,17 @@ protected Client getClient() {
222223
assertThat(client.prepareGet("test", "type1", "1").execute().actionGet().exists(), equalTo(false));
223224
assertThat(client.prepareGet("test", "type1", "1").setRouting("0").execute().actionGet().exists(), equalTo(false));
224225
}
226+
227+
logger.info("--> indexing with id [1], and routing [0]");
228+
client.prepareIndex("test", "type1", "1").setRouting("0").setSource("field", "value1").setRefresh(true).execute().actionGet();
229+
logger.info("--> verifying get with no routing, should not find anything");
230+
231+
logger.info("--> bulk deleting with no routing, should broadcast the delete since _routing is required");
232+
client.prepareBulk().add(Requests.deleteRequest("test").type("type1").id("1")).execute().actionGet();
233+
client.admin().indices().prepareRefresh().execute().actionGet();
234+
for (int i = 0; i < 5; i++) {
235+
assertThat(client.prepareGet("test", "type1", "1").execute().actionGet().exists(), equalTo(false));
236+
assertThat(client.prepareGet("test", "type1", "1").setRouting("0").execute().actionGet().exists(), equalTo(false));
237+
}
225238
}
226239
}

0 commit comments

Comments
 (0)