Skip to content

Commit 6c4124d

Browse files
committed
Search / Broadcast concurrency bug can result in response corruption / errors, closes elastic#1152.
1 parent ceb6973 commit 6c4124d

File tree

5 files changed

+60
-15
lines changed

5 files changed

+60
-15
lines changed

modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/TransportIndicesStatusAction.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,10 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct
103103
return shardIt.nextAssignedOrNull();
104104
}
105105

106+
/**
 * Selects the first shard of a group for the indices status action by delegating to
 * {@link ShardIterator#firstAssignedOrNull()} instead of the active-only variant.
 * This matches the stated intent of this class to go over all assigned nodes
 * (to get recovery status) and not just active ones.
 *
 * @param shardIt the shard group iterator to inspect
 * @return whatever {@code shardIt.firstAssignedOrNull()} returns; may be {@code null}
 */
@Override protected ShardRouting firstShardOrNull(ShardIterator shardIt) {
107+
return shardIt.firstAssignedOrNull();
108+
}
109+
106110
/**
107111
* We want to go over all assigned nodes (to get recovery status) and not just active ones.
108112
*/

modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,12 @@
2020
package org.elasticsearch.action.search.type;
2121

2222
import org.elasticsearch.action.ActionListener;
23-
import org.elasticsearch.action.search.*;
23+
import org.elasticsearch.action.search.ReduceSearchPhaseException;
24+
import org.elasticsearch.action.search.SearchOperationThreading;
25+
import org.elasticsearch.action.search.SearchPhaseExecutionException;
26+
import org.elasticsearch.action.search.SearchRequest;
27+
import org.elasticsearch.action.search.SearchResponse;
28+
import org.elasticsearch.action.search.ShardSearchFailure;
2429
import org.elasticsearch.action.support.BaseAction;
2530
import org.elasticsearch.cluster.ClusterService;
2631
import org.elasticsearch.cluster.ClusterState;
@@ -125,13 +130,13 @@ public void start() {
125130
// count the local operations, and perform the non local ones
126131
int localOperations = 0;
127132
for (final ShardIterator shardIt : shardsIts) {
128-
final ShardRouting shard = shardIt.nextActiveOrNull();
133+
final ShardRouting shard = shardIt.firstActiveOrNull();
129134
if (shard != null) {
130135
if (shard.currentNodeId().equals(nodes.localNodeId())) {
131136
localOperations++;
132137
} else {
133138
// do the remote operation here, the localAsync flag is not relevant
134-
performFirstPhase(shardIt.reset());
139+
performFirstPhase(shardIt);
135140
}
136141
} else {
137142
// really, no shards active in this group
@@ -145,10 +150,10 @@ public void start() {
145150
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
146151
@Override public void run() {
147152
for (final ShardIterator shardIt : shardsIts) {
148-
final ShardRouting shard = shardIt.reset().nextActiveOrNull();
153+
final ShardRouting shard = shardIt.firstActiveOrNull();
149154
if (shard != null) {
150155
if (shard.currentNodeId().equals(nodes.localNodeId())) {
151-
performFirstPhase(shardIt.reset());
156+
performFirstPhase(shardIt);
152157
}
153158
}
154159
}
@@ -160,17 +165,17 @@ public void start() {
160165
request.beforeLocalFork();
161166
}
162167
for (final ShardIterator shardIt : shardsIts) {
163-
final ShardRouting shard = shardIt.reset().nextActiveOrNull();
168+
final ShardRouting shard = shardIt.firstActiveOrNull();
164169
if (shard != null) {
165170
if (shard.currentNodeId().equals(nodes.localNodeId())) {
166171
if (localAsync) {
167172
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
168173
@Override public void run() {
169-
performFirstPhase(shardIt.reset());
174+
performFirstPhase(shardIt);
170175
}
171176
});
172177
} else {
173-
performFirstPhase(shardIt.reset());
178+
performFirstPhase(shardIt);
174179
}
175180
}
176181
}

modules/elasticsearch/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastOperationAction.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,11 @@
3232
import org.elasticsearch.common.Nullable;
3333
import org.elasticsearch.common.settings.Settings;
3434
import org.elasticsearch.threadpool.ThreadPool;
35-
import org.elasticsearch.transport.*;
35+
import org.elasticsearch.transport.BaseTransportRequestHandler;
36+
import org.elasticsearch.transport.BaseTransportResponseHandler;
37+
import org.elasticsearch.transport.TransportChannel;
38+
import org.elasticsearch.transport.TransportException;
39+
import org.elasticsearch.transport.TransportService;
3640

3741
import java.util.concurrent.atomic.AtomicInteger;
3842
import java.util.concurrent.atomic.AtomicReferenceArray;
@@ -101,6 +105,10 @@ protected ShardRouting nextShardOrNull(ShardIterator shardIt) {
101105
return shardIt.nextActiveOrNull();
102106
}
103107

108+
/**
 * Selects the first shard of a group; {@code start()} calls this once per shard
 * iterator to decide whether the first operation is local or remote.
 * The default implementation delegates to {@link ShardIterator#firstActiveOrNull()}.
 * The method is protected, so subclasses may override it to pick a different shard.
 *
 * @param shardIt the shard group iterator to inspect
 * @return whatever {@code shardIt.firstActiveOrNull()} returns; may be {@code null}
 */
protected ShardRouting firstShardOrNull(ShardIterator shardIt) {
109+
return shardIt.firstActiveOrNull();
110+
}
111+
104112
/**
105113
* Allows to override how shard routing is iterated over. Default implementation uses
106114
* {@link org.elasticsearch.cluster.routing.ShardIterator#hasNextActive()}.
@@ -169,13 +177,13 @@ public void start() {
169177
// count the local operations, and perform the non local ones
170178
int localOperations = 0;
171179
for (final ShardIterator shardIt : shardsIts) {
172-
final ShardRouting shard = nextShardOrNull(shardIt);
180+
final ShardRouting shard = firstShardOrNull(shardIt);
173181
if (shard != null) {
174182
if (shard.currentNodeId().equals(nodes.localNodeId())) {
175183
localOperations++;
176184
} else {
177185
// do the remote operation here, the localAsync flag is not relevant
178-
performOperation(shardIt.reset(), true);
186+
performOperation(shardIt, true);
179187
}
180188
} else {
181189
// really, no shards active in this group
@@ -189,10 +197,10 @@ public void start() {
189197
threadPool.executor(executor).execute(new Runnable() {
190198
@Override public void run() {
191199
for (final ShardIterator shardIt : shardsIts) {
192-
final ShardRouting shard = nextShardOrNull(shardIt.reset());
200+
final ShardRouting shard = firstShardOrNull(shardIt);
193201
if (shard != null) {
194202
if (shard.currentNodeId().equals(nodes.localNodeId())) {
195-
performOperation(shardIt.reset(), false);
203+
performOperation(shardIt, false);
196204
}
197205
}
198206
}
@@ -204,10 +212,10 @@ public void start() {
204212
request.beforeLocalFork();
205213
}
206214
for (final ShardIterator shardIt : shardsIts) {
207-
final ShardRouting shard = nextShardOrNull(shardIt.reset());
215+
final ShardRouting shard = firstShardOrNull(shardIt);
208216
if (shard != null) {
209217
if (shard.currentNodeId().equals(nodes.localNodeId())) {
210-
performOperation(shardIt.reset(), localAsync);
218+
performOperation(shardIt, localAsync);
211219
}
212220
}
213221
}

modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/PlainShardsIterator.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,18 @@ public PlainShardsIterator(List<ShardRouting> shards, int index) {
122122
return null;
123123
}
124124

125+
/**
 * Returns the first shard whose {@code active()} is true, scanning from
 * {@code origIndex} and wrapping around via {@code shardModulo}. At most
 * {@code size()} entries are examined.
 *
 * The scan position is kept in local variables only; this method assigns no
 * fields of the iterator.
 * NOTE(review): presumably this is what lets callers peek at the first shard
 * without calling {@code reset()} on a shared iterator -- confirm against the
 * concurrency issue this change addresses.
 *
 * @return the first active shard routing, or {@code null} if none is active
 */
@Override public ShardRouting firstActiveOrNull() {
126+
int counter = 0;
127+
int index = this.origIndex;
128+
while (counter++ < size()) {
129+
ShardRouting shardRouting = shardModulo(index++);
130+
if (shardRouting.active()) {
131+
return shardRouting;
132+
}
133+
}
134+
return null;
135+
}
136+
125137
@Override public int sizeAssigned() {
126138
int shardsAssigned = 0;
127139
for (ShardRouting shardRouting : shards) {
@@ -168,6 +180,18 @@ public PlainShardsIterator(List<ShardRouting> shards, int index) {
168180
return null;
169181
}
170182

183+
/**
 * Returns the first shard whose {@code assignedToNode()} is true, scanning from
 * {@code origIndex} and wrapping around via {@code shardModulo}. At most
 * {@code size()} entries are examined.
 *
 * The scan position is kept in local variables only; this method assigns no
 * fields of the iterator.
 *
 * @return the first assigned shard routing, or {@code null} if none is assigned
 */
@Override public ShardRouting firstAssignedOrNull() {
184+
int counter = 0;
185+
int index = this.origIndex;
186+
while (counter++ < size()) {
187+
ShardRouting shardRouting = shardModulo(index++);
188+
if (shardRouting.assignedToNode()) {
189+
return shardRouting;
190+
}
191+
}
192+
return null;
193+
}
194+
171195
/**
 * Returns the shard at position {@code counter % size()} in {@code shards},
 * so a counter that runs past the end wraps around to the beginning.
 *
 * NOTE(review): Java's {@code %} keeps the sign of the dividend, so a negative
 * {@code counter} would produce a negative index; {@code size() == 0} would
 * raise an ArithmeticException. Confirm callers never pass either.
 *
 * @param counter the running position to map onto the shard list
 * @return the shard routing at the wrapped position
 */
ShardRouting shardModulo(int counter) {
172196
return shards.get((counter % size()));
173197
}

modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/ShardsIterator.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ public interface ShardsIterator extends Iterable<ShardRouting>, Iterator<ShardRo
6767
*/
6868
ShardRouting nextActiveOrNull();
6969

70+
/**
 * Returns the first active shard routing, or <tt>null</tt> if there is none.
 * NOTE(review): the PlainShardsIterator implementation scans from its original
 * index using local state only -- confirm other implementations do the same.
 */
ShardRouting firstActiveOrNull();
71+
7072
/**
7173
* The number of assigned shard routing instances.
7274
*
@@ -95,6 +97,8 @@ public interface ShardsIterator extends Iterable<ShardRouting>, Iterator<ShardRo
9597
*/
9698
ShardRouting nextAssignedOrNull();
9799

100+
/**
 * Returns the first assigned shard routing, or <tt>null</tt> if there is none.
 * NOTE(review): the PlainShardsIterator implementation scans from its original
 * index using local state only -- confirm other implementations do the same.
 */
ShardRouting firstAssignedOrNull();
101+
98102
int hashCode();
99103

100104
boolean equals(Object other);

0 commit comments

Comments
 (0)