Skip to content

Commit 7fc24ff

Browse files
author
Tom May
committed
Move LruClient to org.elasticsearch.client.node.lru.
1 parent d0e5ba1 commit 7fc24ff

File tree

4 files changed

+278
-3
lines changed

4 files changed

+278
-3
lines changed

config/elasticsearch.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# use our LruIndex manager
22
client:
3-
type: org.elasticsearch.plugin.lruclient.LruClient
3+
type: lru
44
lru:
55
size: 10
66

modules/elasticsearch/src/main/java/org/elasticsearch/client/node/NodeClientModule.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,6 @@ public NodeClientModule(Settings settings) {
4040
bind(ClusterAdminClient.class).to(NodeClusterAdminClient.class).asEagerSingleton();
4141
bind(IndicesAdminClient.class).to(NodeIndicesAdminClient.class).asEagerSingleton();
4242
bind(AdminClient.class).to(NodeAdminClient.class).asEagerSingleton();
43-
bind(Client.class).to(settings.getAsClass("client.type", NodeClient.class, "org.elasticsearch.client.node", "Client")).asEagerSingleton();
43+
bind(Client.class).to(settings.getAsClass("client.type", NodeClient.class, "org.elasticsearch.client.node.", "NodeClient")).asEagerSingleton();
4444
}
4545
}

plugins/client/lru/src/main/java/org/elasticsearch/plugin/lruclient/LruClient.java renamed to plugins/client/lru/src/main/java/org/elasticsearch/client/node/lru/LruClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
* under the License.
1818
*/
1919

20-
package org.elasticsearch.plugin.lruclient;
20+
package org.elasticsearch.client.lru;
2121

2222
import java.util.Collections;
2323
import java.util.Date;
Lines changed: 275 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,275 @@
1+
/*
2+
* Licensed to Elastic Search and Shay Banon under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. Elastic Search licenses this
6+
* file to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.client.node.lru;
21+
22+
import java.util.Collections;
23+
import java.util.Date;
24+
import java.util.HashSet;
25+
import java.util.LinkedHashMap;
26+
import java.util.Map;
27+
import java.util.Map.Entry;
28+
import java.util.Set;
29+
import java.util.concurrent.TimeUnit;
30+
import org.elasticsearch.action.ActionFuture;
31+
import org.elasticsearch.action.ActionListener;
32+
import org.elasticsearch.action.ActionRequest;
33+
import org.elasticsearch.action.admin.indices.status.ShardStatus;
34+
import org.elasticsearch.action.bulk.BulkRequest;
35+
import org.elasticsearch.action.bulk.BulkResponse;
36+
import org.elasticsearch.action.bulk.TransportBulkAction;
37+
import org.elasticsearch.action.count.CountRequest;
38+
import org.elasticsearch.action.count.CountResponse;
39+
import org.elasticsearch.action.count.TransportCountAction;
40+
import org.elasticsearch.action.delete.DeleteRequest;
41+
import org.elasticsearch.action.delete.DeleteResponse;
42+
import org.elasticsearch.action.delete.TransportDeleteAction;
43+
import org.elasticsearch.action.deletebyquery.DeleteByQueryRequest;
44+
import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
45+
import org.elasticsearch.action.deletebyquery.TransportDeleteByQueryAction;
46+
import org.elasticsearch.action.get.GetRequest;
47+
import org.elasticsearch.action.get.GetResponse;
48+
import org.elasticsearch.action.get.TransportGetAction;
49+
import org.elasticsearch.action.index.IndexRequest;
50+
import org.elasticsearch.action.index.IndexResponse;
51+
import org.elasticsearch.action.index.TransportIndexAction;
52+
import org.elasticsearch.action.mlt.MoreLikeThisRequest;
53+
import org.elasticsearch.action.mlt.TransportMoreLikeThisAction;
54+
import org.elasticsearch.action.search.*;
55+
import org.elasticsearch.client.node.NodeAdminClient;
56+
import org.elasticsearch.client.node.NodeClient;
57+
import org.elasticsearch.common.inject.Inject;
58+
import org.elasticsearch.common.logging.ESLogger;
59+
import org.elasticsearch.common.logging.Loggers;
60+
import org.elasticsearch.common.settings.Settings;
61+
import org.elasticsearch.index.shard.IndexShardState;
62+
import org.elasticsearch.indices.IndexMissingException;
63+
import org.elasticsearch.threadpool.ThreadPool;
64+
65+
public class LruNodeClient extends NodeClient {
66+
67+
private final ESLogger logger;
68+
private final Map cache;
69+
70+
private class LruCache extends LinkedHashMap<String, Date> {
71+
private int capacity;
72+
public LruCache(int capacity) {
73+
super(capacity,0.75f,true);
74+
this.capacity = capacity;
75+
}
76+
77+
@Override
78+
protected boolean removeEldestEntry(Entry<String, Date> eldest) {
79+
if(size()>capacity) {
80+
logger.debug("LruCache over capacity: {} removing: {}", this.capacity, eldest.getKey());
81+
closeIndex(eldest.getKey());
82+
return true;
83+
}
84+
return false;
85+
}
86+
87+
}
88+
89+
@Inject public LruNodeClient(Settings settings, ThreadPool threadPool, NodeAdminClient admin,
90+
TransportIndexAction indexAction, TransportDeleteAction deleteAction, TransportBulkAction bulkAction,
91+
TransportDeleteByQueryAction deleteByQueryAction, TransportGetAction getAction, TransportCountAction countAction,
92+
TransportSearchAction searchAction, TransportSearchScrollAction searchScrollAction,
93+
TransportMoreLikeThisAction moreLikeThisAction) {
94+
super(settings,threadPool,admin,indexAction,deleteAction,bulkAction,deleteByQueryAction,getAction,countAction,searchAction,searchScrollAction,moreLikeThisAction);
95+
this.logger = Loggers.getLogger(getClass());
96+
this.logger.debug("Lru cache size = " + settings.getAsInt("client.lru.size", 1));
97+
this.cache = Collections.synchronizedMap(new LruCache(settings.getAsInt("client.lru.size", 1)));
98+
}
99+
100+
private void ensureOpen(final String index) {
101+
synchronized(index.intern()) {
102+
if(!this.cache.containsKey(index)) {
103+
logger.debug("Index {} not found! Try to open...", index);
104+
openIndex(index);
105+
}
106+
this.cache.put(index, new Date());
107+
}
108+
}
109+
110+
private void ensureOpen(final String [] indices) {
111+
for(int i = 0; i<indices.length; i++)
112+
ensureOpen(indices[i]);
113+
}
114+
115+
private void openIndex(final String index) {
116+
try {
117+
admin().indices().prepareOpen(index).execute().actionGet();
118+
waitForReadyState(index);
119+
}
120+
catch(IndexMissingException e) {
121+
logger.debug("Index {} does not exist! Can't open.", index);
122+
}
123+
}
124+
125+
private void waitForReadyState(final String index) {
126+
boolean notReady = true;
127+
long stop=System.nanoTime()+TimeUnit.SECONDS.toNanos(10);
128+
while(notReady) {
129+
if(stop<System.nanoTime()) {
130+
logger.debug("Timeout waiting for status of index {}",index);
131+
break;
132+
}
133+
// need to wait till all shards are allocated!
134+
logger.debug("Checking for state in index {}",index);
135+
ShardStatus [] stats = admin().indices().prepareStatus(new String[]{index}).execute().actionGet().getShards();
136+
logger.debug("Checking for state in index {} with # shards: {}",index,stats.length);
137+
if(stats.length<1) {
138+
logger.debug("No shards found for index {}!!",index);
139+
try { Thread.sleep(500); } catch(InterruptedException e) {}
140+
continue;
141+
}
142+
notReady = false;
143+
for(int i=0;i<stats.length;i++) {
144+
logger.debug("Index {} Shard {} State {}",index,stats[i].getShardId(),stats[i].getState());
145+
if(stats[i].getState() != IndexShardState.STARTED) {
146+
try { Thread.sleep(500); } catch(InterruptedException e) {}
147+
notReady = true;
148+
break;
149+
}
150+
}
151+
}
152+
logger.debug("Index {} all shards STARTED!",index);
153+
}
154+
155+
private void closeIndex(final String index) {
156+
// let happen in the background
157+
logger.debug("Closing index: {} ...", index);
158+
admin().indices().prepareClose(index).execute().actionGet();
159+
}
160+
161+
@Override public void index(IndexRequest request, ActionListener<IndexResponse> listener) {
162+
logger.debug("index");
163+
ensureOpen(request.index());
164+
super.index(request,listener);
165+
}
166+
167+
@Override public ActionFuture<DeleteResponse> delete(DeleteRequest request) {
168+
logger.debug("delete");
169+
ensureOpen(request.index());
170+
return super.delete(request);
171+
}
172+
173+
@Override public void delete(DeleteRequest request, ActionListener<DeleteResponse> listener) {
174+
logger.debug("delete");
175+
ensureOpen(request.index());
176+
super.delete(request, listener);
177+
}
178+
179+
@Override public ActionFuture<BulkResponse> bulk(BulkRequest request) {
180+
logger.debug("bulk");
181+
ensureOpen(getIndices(request));
182+
return super.bulk(request);
183+
}
184+
185+
@Override public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
186+
logger.debug("bulk");
187+
ensureOpen(getIndices(request));
188+
super.bulk(request,listener);
189+
}
190+
191+
@Override public ActionFuture<DeleteByQueryResponse> deleteByQuery(DeleteByQueryRequest request) {
192+
logger.debug("deleteByQuery");
193+
ensureOpen(request.indices());
194+
return super.deleteByQuery(request);
195+
}
196+
197+
@Override public void deleteByQuery(DeleteByQueryRequest request, ActionListener<DeleteByQueryResponse> listener) {
198+
logger.debug("deleteByQuery");
199+
ensureOpen(request.indices());
200+
super.deleteByQuery(request,listener);
201+
}
202+
203+
@Override public ActionFuture<GetResponse> get(GetRequest request) {
204+
logger.debug("get");
205+
ensureOpen(request.index());
206+
return super.get(request);
207+
}
208+
209+
@Override public void get(GetRequest request, ActionListener<GetResponse> listener) {
210+
logger.debug("get");
211+
ensureOpen(request.index());
212+
super.get(request, listener);
213+
}
214+
215+
@Override public ActionFuture<CountResponse> count(CountRequest request) {
216+
logger.debug("count");
217+
ensureOpen(request.indices());
218+
return super.count(request);
219+
}
220+
221+
@Override public void count(CountRequest request, ActionListener<CountResponse> listener) {
222+
logger.debug("count");
223+
ensureOpen(request.indices());
224+
super.count(request, listener);
225+
}
226+
227+
@Override public ActionFuture<SearchResponse> search(SearchRequest request) {
228+
logger.debug("search");
229+
ensureOpen(request.indices());
230+
return super.search(request);
231+
}
232+
233+
@Override public void search(SearchRequest request, ActionListener<SearchResponse> listener) {
234+
logger.debug("search");
235+
ensureOpen(request.indices());
236+
super.search(request, listener);
237+
}
238+
239+
@Override public ActionFuture<SearchResponse> searchScroll(SearchScrollRequest request) {
240+
logger.debug("searchScroll");
241+
return super.searchScroll(request);
242+
}
243+
244+
@Override public void searchScroll(SearchScrollRequest request, ActionListener<SearchResponse> listener) {
245+
logger.debug("searchScroll");
246+
super.searchScroll(request, listener);
247+
}
248+
249+
@Override public ActionFuture<SearchResponse> moreLikeThis(MoreLikeThisRequest request) {
250+
logger.debug("moreLikeThis");
251+
ensureOpen(request.index());
252+
return super.moreLikeThis(request);
253+
}
254+
255+
@Override public void moreLikeThis(MoreLikeThisRequest request, ActionListener<SearchResponse> listener) {
256+
logger.debug("moreLikeThis");
257+
ensureOpen(request.index());
258+
super.moreLikeThis(request, listener);
259+
}
260+
261+
private String [] getIndices(BulkRequest request) {
262+
Set<String> indices = new HashSet<String>();
263+
for(ActionRequest r : request.requests) {
264+
if(r instanceof IndexRequest) {
265+
indices.add(((IndexRequest)r).index());
266+
}
267+
else if(r instanceof DeleteRequest) {
268+
indices.add(((DeleteRequest)r).index());
269+
}
270+
}
271+
return indices.toArray(new String[indices.size()]);
272+
}
273+
274+
275+
}

0 commit comments

Comments
 (0)