Skip to content

Commit 38d10d1

Browse files
committed
Add prefer_local flag to analyze and percolate request, closes elastic#625.
1 parent ff34785 commit 38d10d1

File tree

12 files changed

+143
-37
lines changed

12 files changed

+143
-37
lines changed

modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/analyze/AnalyzeRequest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,15 @@ public String analyzer() {
7979
return this.analyzer;
8080
}
8181

82+
/**
83+
* if this operation hits a node with a local relevant shard, should it be preferred
84+
* to be executed on, or just do plain round robin. Defaults to <tt>true</tt>
85+
*/
86+
@Override public AnalyzeRequest preferLocal(boolean preferLocal) {
87+
super.preferLocal(preferLocal);
88+
return this;
89+
}
90+
8291
@Override public ActionRequestValidationException validate() {
8392
ActionRequestValidationException validationException = super.validate();
8493
if (index == null) {

modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/analyze/TransportAnalyzeAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public class TransportAnalyzeAction extends TransportSingleCustomOperationAction
7575

7676
@Override protected ShardsIterator shards(ClusterState clusterState, AnalyzeRequest request) {
7777
request.index(clusterState.metaData().concreteIndex(request.index()));
78-
return clusterState.routingTable().index(request.index()).allShardsIt();
78+
return clusterState.routingTable().index(request.index()).randomAllShardsIt();
7979
}
8080

8181
@Override protected AnalyzeResponse shardOperation(AnalyzeRequest request, int shardId) throws ElasticSearchException {

modules/elasticsearch/src/main/java/org/elasticsearch/action/percolate/PercolateRequest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,15 @@ public PercolateRequest source(byte[] source) {
150150
return this;
151151
}
152152

153+
/**
154+
* if this operation hits a node with a local relevant shard, should it be preferred
155+
* to be executed on, or just do plain round robin. Defaults to <tt>true</tt>
156+
*/
157+
@Override public PercolateRequest preferLocal(boolean preferLocal) {
158+
super.preferLocal(preferLocal);
159+
return this;
160+
}
161+
153162
@Override public ActionRequestValidationException validate() {
154163
ActionRequestValidationException validationException = super.validate();
155164
if (index == null) {

modules/elasticsearch/src/main/java/org/elasticsearch/action/percolate/TransportPercolateAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public class TransportPercolateAction extends TransportSingleCustomOperationActi
6565

6666
@Override protected ShardsIterator shards(ClusterState clusterState, PercolateRequest request) {
6767
request.index(clusterState.metaData().concreteIndex(request.index()));
68-
return clusterState.routingTable().index(request.index()).allShardsIt();
68+
return clusterState.routingTable().index(request.index()).randomAllShardsIt();
6969
}
7070

7171
@Override protected PercolateResponse shardOperation(PercolateRequest request, int shardId) throws ElasticSearchException {

modules/elasticsearch/src/main/java/org/elasticsearch/action/support/single/custom/SingleCustomOperationRequest.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public abstract class SingleCustomOperationRequest implements ActionRequest {
3333

3434
private boolean threadedListener = false;
3535
private boolean threadedOperation = true;
36+
private boolean preferLocal = true;
3637

3738
protected SingleCustomOperationRequest() {
3839
}
@@ -68,15 +69,34 @@ public SingleCustomOperationRequest operationThreaded(boolean threadedOperation)
6869
return this;
6970
}
7071

72+
/**
73+
* if this operation hits a node with a local relevant shard, should it be preferred
74+
* to be executed on, or just do plain round robin. Defaults to <tt>true</tt>
75+
*/
76+
public SingleCustomOperationRequest preferLocal(boolean preferLocal) {
77+
this.preferLocal = preferLocal;
78+
return this;
79+
}
80+
81+
/**
82+
* if this operation hits a node with a local relevant shard, should it be preferred
83+
* to be executed on, or just do plain round robin. Defaults to <tt>true</tt>
84+
*/
85+
public boolean preferLocalShard() {
86+
return this.preferLocal;
87+
}
88+
7189
public void beforeLocalFork() {
7290

7391
}
7492

7593
@Override public void readFrom(StreamInput in) throws IOException {
7694
// no need to pass threading over the network, they are always false when coming throw a thread pool
95+
preferLocal = in.readBoolean();
7796
}
7897

7998
@Override public void writeTo(StreamOutput out) throws IOException {
99+
out.writeBoolean(preferLocal);
80100
}
81101
}
82102

modules/elasticsearch/src/main/java/org/elasticsearch/action/support/single/custom/TransportSingleCustomOperationAction.java

Lines changed: 67 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -116,40 +116,44 @@ private void onFailure(ShardRouting shardRouting, Exception e) {
116116
* First get should try and use a shard that exists on a local node for better performance
117117
*/
118118
private void performFirst() {
119-
while (shardsIt.hasNextActive()) {
120-
final ShardRouting shard = shardsIt.nextActive();
121-
if (shard.currentNodeId().equals(nodes.localNodeId())) {
122-
if (request.operationThreaded()) {
123-
request.beforeLocalFork();
124-
threadPool.execute(new Runnable() {
125-
@Override public void run() {
126-
try {
127-
Response response = shardOperation(request, shard.id());
128-
listener.onResponse(response);
129-
} catch (Exception e) {
130-
onFailure(shard, e);
131-
}
132-
}
133-
});
134-
return;
135-
} else {
136-
try {
137-
final Response response = shardOperation(request, shard.id());
138-
if (request.listenerThreaded()) {
139-
threadPool.execute(new Runnable() {
140-
@Override public void run() {
119+
if (request.preferLocalShard()) {
120+
while (shardsIt.hasNextActive()) {
121+
final ShardRouting shard = shardsIt.nextActive();
122+
if (shard.currentNodeId().equals(nodes.localNodeId())) {
123+
if (request.operationThreaded()) {
124+
request.beforeLocalFork();
125+
threadPool.execute(new Runnable() {
126+
@Override public void run() {
127+
try {
128+
Response response = shardOperation(request, shard.id());
141129
listener.onResponse(response);
130+
} catch (Exception e) {
131+
onFailure(shard, e);
142132
}
143-
});
144-
} else {
145-
listener.onResponse(response);
146-
}
133+
}
134+
});
147135
return;
148-
} catch (Exception e) {
149-
onFailure(shard, e);
136+
} else {
137+
try {
138+
final Response response = shardOperation(request, shard.id());
139+
if (request.listenerThreaded()) {
140+
threadPool.execute(new Runnable() {
141+
@Override public void run() {
142+
listener.onResponse(response);
143+
}
144+
});
145+
} else {
146+
listener.onResponse(response);
147+
}
148+
return;
149+
} catch (Exception e) {
150+
onFailure(shard, e);
151+
}
150152
}
151153
}
152154
}
155+
} else {
156+
perform(null);
153157
}
154158
if (!shardsIt.hasNextActive()) {
155159
// no local node get, go remote
@@ -162,7 +166,41 @@ private void perform(final Exception lastException) {
162166
while (shardsIt.hasNextActive()) {
163167
final ShardRouting shard = shardsIt.nextActive();
164168
// no need to check for local nodes, we tried them already in performFirstGet
165-
if (!shard.currentNodeId().equals(nodes.localNodeId())) {
169+
if (shard.currentNodeId().equals(nodes.localNodeId())) {
170+
// we don't prefer local shard, so try and do it here
171+
if (!request.preferLocalShard()) {
172+
if (request.operationThreaded()) {
173+
request.beforeLocalFork();
174+
threadPool.execute(new Runnable() {
175+
@Override public void run() {
176+
try {
177+
Response response = shardOperation(request, shard.id());
178+
listener.onResponse(response);
179+
} catch (Exception e) {
180+
onFailure(shard, e);
181+
}
182+
}
183+
});
184+
return;
185+
} else {
186+
try {
187+
final Response response = shardOperation(request, shard.id());
188+
if (request.listenerThreaded()) {
189+
threadPool.execute(new Runnable() {
190+
@Override public void run() {
191+
listener.onResponse(response);
192+
}
193+
});
194+
} else {
195+
listener.onResponse(response);
196+
}
197+
return;
198+
} catch (Exception e) {
199+
onFailure(shard, e);
200+
}
201+
}
202+
}
203+
} else {
166204
DiscoveryNode node = nodes.get(shard.currentNodeId());
167205
transportService.sendRequest(node, transportShardAction(), new ShardSingleOperationRequest(request, shard.id()), new BaseTransportResponseHandler<Response>() {
168206
@Override public Response newInstance() {

modules/elasticsearch/src/main/java/org/elasticsearch/client/action/admin/indices/analyze/AnalyzeRequestBuilder.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,15 @@ public AnalyzeRequestBuilder setAnalyzer(String analyzer) {
4444
return this;
4545
}
4646

47+
/**
48+
* if this operation hits a node with a local relevant shard, should it be preferred
49+
* to be executed on, or just do plain round robin. Defaults to <tt>true</tt>
50+
*/
51+
public AnalyzeRequestBuilder setPreferLocal(boolean preferLocal) {
52+
request.preferLocal(preferLocal);
53+
return this;
54+
}
55+
4756
@Override protected void doExecute(ActionListener<AnalyzeResponse> listener) {
4857
client.analyze(request, listener);
4958
}

modules/elasticsearch/src/main/java/org/elasticsearch/client/action/percolate/PercolateRequestBuilder.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,15 @@ public PercolateRequestBuilder setListenerThreaded(boolean listenerThreaded) {
128128
return this;
129129
}
130130

131+
/**
132+
* if this operation hits a node with a local relevant shard, should it be preferred
133+
* to be executed on, or just do plain round robin. Defaults to <tt>true</tt>
134+
*/
135+
public PercolateRequestBuilder setPreferLocal(boolean preferLocal) {
136+
request.preferLocal(preferLocal);
137+
return this;
138+
}
139+
131140
/**
132141
* Controls if the operation will be executed on a separate thread when executed locally. Defaults
133142
* to <tt>true</tt> when running in embedded mode.

modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ public List<ShardRouting> shardsWithState(ShardRoutingState... states) {
153153
/**
154154
* An iterator over all shards (including replicas).
155155
*/
156-
public ShardsIterator allShardsIt() {
156+
public ShardsIterator randomAllShardsIt() {
157157
return new PlainShardsIterator(allShards, Math.abs(counter.incrementAndGet()));
158158
}
159159

modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/indices/analyze/RestAnalyzeAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ public class RestAnalyzeAction extends BaseRestHandler {
6161
}
6262

6363
AnalyzeRequest analyzeRequest = new AnalyzeRequest(request.param("index"), text);
64+
analyzeRequest.preferLocal(request.paramAsBoolean("prefer_local", analyzeRequest.preferLocalShard()));
6465
analyzeRequest.analyzer(request.param("analyzer"));
6566
client.admin().indices().analyze(analyzeRequest, new ActionListener<AnalyzeResponse>() {
6667
@Override public void onResponse(AnalyzeResponse response) {

modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/percolate/RestPercolateAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ public class RestPercolateAction extends BaseRestHandler {
5555
// we don't spawn, then fork if local
5656
percolateRequest.operationThreaded(true);
5757

58+
percolateRequest.preferLocal(request.paramAsBoolean("prefer_local", percolateRequest.preferLocalShard()));
5859
client.percolate(percolateRequest, new ActionListener<PercolateResponse>() {
5960
@Override public void onResponse(PercolateResponse response) {
6061
try {

modules/test/integration/src/test/java/org/elasticsearch/test/integration/percolator/SimplePercolatorTests.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -101,11 +101,21 @@ protected Client getClient() {
101101
.execute().actionGet();
102102
client.admin().cluster().prepareHealth().setWaitForGreenStatus().setWaitForActiveShards(4).execute().actionGet();
103103

104-
PercolateResponse percolate = client.preparePercolate("test").setSource(jsonBuilder().startObject().startObject("doc").startObject("type1")
105-
.field("field1", "value1")
106-
.endObject().endObject().endObject())
107-
.execute().actionGet();
108-
assertThat(percolate.matches().size(), equalTo(1));
104+
for (int i = 0; i < 10; i++) {
105+
PercolateResponse percolate = client.preparePercolate("test").setSource(jsonBuilder().startObject().startObject("doc").startObject("type1")
106+
.field("field1", "value1")
107+
.endObject().endObject().endObject())
108+
.execute().actionGet();
109+
assertThat(percolate.matches().size(), equalTo(1));
110+
}
111+
112+
for (int i = 0; i < 10; i++) {
113+
PercolateResponse percolate = client.preparePercolate("test").setPreferLocal(false).setSource(jsonBuilder().startObject().startObject("doc").startObject("type1")
114+
.field("field1", "value1")
115+
.endObject().endObject().endObject())
116+
.execute().actionGet();
117+
assertThat(percolate.matches().size(), equalTo(1));
118+
}
109119
}
110120

111121
@Test public void dynamicAddingRemovingQueries() throws Exception {

0 commit comments

Comments
 (0)