Skip to content

Commit 4b101ea

Browse files
author
Matt Hartzler
committed
Better handling of method-missing errors, which can occur when the LRU cache is out of date with regard to indexes it thinks are open but which have been closed on another node.
1 parent a7c07ab commit 4b101ea

File tree

2 files changed

+264
-136
lines changed

2 files changed

+264
-136
lines changed
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
package org.elasticsearch.plugin.lruclient;
2+
3+
import java.util.Collections;
4+
import java.util.Date;
5+
import java.util.LinkedHashMap;
6+
import java.util.Map;
7+
import java.util.Map.Entry;
8+
import java.util.concurrent.TimeUnit;
9+
import org.elasticsearch.action.ActionFuture;
10+
import org.elasticsearch.action.ActionResponse;
11+
import org.elasticsearch.action.admin.indices.status.ShardStatus;
12+
import org.elasticsearch.client.node.NodeAdminClient;
13+
import org.elasticsearch.common.logging.Loggers;
14+
import org.elasticsearch.common.logging.ESLogger;
15+
import org.elasticsearch.index.shard.IndexShardState;
16+
import org.elasticsearch.indices.IndexMissingException;
17+
18+
/**
19+
*
20+
* @author matt
21+
*/
22+
public class LruIndexManager {
23+
public static final int MAX_OPEN_RETRIES = 3;
24+
25+
private Map cache;
26+
private NodeAdminClient admin;
27+
private final ESLogger logger = Loggers.getLogger(getClass());
28+
29+
public LruIndexManager(int cache_size, NodeAdminClient admin) {
30+
this.cache = Collections.synchronizedMap(new LruCache(cache_size));
31+
this.admin = admin;
32+
}
33+
34+
public interface IndexOperation {
35+
public void perform();
36+
}
37+
38+
public interface AsyncIndexOperation {
39+
public ActionFuture<? extends ActionResponse> perform();
40+
}
41+
42+
43+
public ActionFuture<? extends ActionResponse> ensureOpen(final String index, AsyncIndexOperation op) {
44+
return ensureOpenAndPerform(new String[]{index},op);
45+
}
46+
47+
public ActionFuture<? extends ActionResponse> ensureOpen(final String [] indices, AsyncIndexOperation op) {
48+
return ensureOpenAndPerform(indices,op);
49+
}
50+
51+
public void ensureOpen(final String index, IndexOperation op) {
52+
ensureOpenAndPerform(new String[]{index},op);
53+
}
54+
55+
public void ensureOpen(final String [] indices, IndexOperation op) {
56+
ensureOpenAndPerform(indices,op);
57+
}
58+
59+
// private
60+
61+
private ActionFuture<? extends ActionResponse> ensureOpenAndPerform(String [] indices, Object op) {
62+
int tries = 0;
63+
boolean force = false;
64+
while(true) {
65+
66+
// make sure in cache
67+
for(int i = 0; i<indices.length; i++)
68+
openIfNotCached(indices[i],force);
69+
70+
try {
71+
if(op instanceof AsyncIndexOperation)
72+
return ((AsyncIndexOperation)op).perform();
73+
else {
74+
((IndexOperation)op).perform();
75+
return null;
76+
}
77+
78+
}
79+
catch(IndexMissingException e) {
80+
if(tries++>MAX_OPEN_RETRIES)
81+
throw(e);
82+
else
83+
force = true; // make sure we eject the index
84+
}
85+
}
86+
}
87+
88+
private void openIfNotCached(final String index, boolean force_eject) {
89+
synchronized(index.intern()) {
90+
if(force_eject)
91+
this.cache.remove(index);
92+
93+
if(!this.cache.containsKey(index)) {
94+
logger.debug("Index {} not found! Try to open...", index);
95+
openIndex(index);
96+
}
97+
this.cache.put(index, new Date());
98+
}
99+
}
100+
101+
private class LruCache extends LinkedHashMap<String, Date> {
102+
private int capacity;
103+
public LruCache(int capacity) {
104+
super(capacity,0.75f,true);
105+
this.capacity = capacity;
106+
}
107+
108+
@Override
109+
protected boolean removeEldestEntry(Entry<String, Date> eldest) {
110+
if(size()>capacity) {
111+
logger.debug("LruCache over capacity: {} removing: {}", this.capacity, eldest.getKey());
112+
closeIndex(eldest.getKey());
113+
return true;
114+
}
115+
return false;
116+
}
117+
118+
}
119+
120+
private void openIndex(final String index) {
121+
try {
122+
admin.indices().prepareOpen(index).execute().actionGet();
123+
waitForReadyState(index);
124+
}
125+
catch(IndexMissingException e) {
126+
logger.debug("Index {} does not exist! Can't open.", index);
127+
}
128+
}
129+
130+
private void waitForReadyState(final String index) {
131+
boolean notReady = true;
132+
long stop=System.nanoTime()+TimeUnit.SECONDS.toNanos(10);
133+
while(notReady) {
134+
if(stop<System.nanoTime()) {
135+
logger.debug("Timeout waiting for status of index {}",index);
136+
break;
137+
}
138+
// need to wait till all shards are allocated!
139+
logger.debug("Checking for state in index {}",index);
140+
ShardStatus [] stats = admin.indices().prepareStatus(new String[]{index}).execute().actionGet().getShards();
141+
logger.debug("Checking for state in index {} with # shards: {}",index,stats.length);
142+
if(stats.length<1) {
143+
logger.debug("No shards found for index {}!!",index);
144+
try { Thread.sleep(500); } catch(InterruptedException e) {}
145+
continue;
146+
}
147+
notReady = false;
148+
for(int i=0;i<stats.length;i++) {
149+
logger.debug("Index {} Shard {} State {}",index,stats[i].getShardId(),stats[i].getState());
150+
if(stats[i].getState() != IndexShardState.STARTED) {
151+
try { Thread.sleep(500); } catch(InterruptedException e) {}
152+
notReady = true;
153+
break;
154+
}
155+
}
156+
}
157+
logger.debug("Index {} all shards STARTED!",index);
158+
}
159+
160+
private void closeIndex(final String index) {
161+
// let happen in the background
162+
logger.debug("Closing index: {} ...", index);
163+
admin.indices().prepareClose(index).execute().actionGet();
164+
}
165+
166+
167+
}

0 commit comments

Comments
 (0)