Skip to content

Commit 3abda3a

Browse files
committed
add a merge thread pool that only does async merges (optimize), so it can be controlled by itself
1 parent 4d703a6 commit 3abda3a

File tree

2 files changed

+13
-11
lines changed

2 files changed

+13
-11
lines changed

modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -91,11 +91,11 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
9191
private volatile IndexShardState state;
9292

9393
private final TimeValue refreshInterval;
94-
private final TimeValue optimizeInterval;
94+
private final TimeValue mergeInterval;
9595

9696
private volatile ScheduledFuture refreshScheduledFuture;
9797

98-
private volatile ScheduledFuture optimizeScheduleFuture;
98+
private volatile ScheduledFuture mergeScheduleFuture;
9999

100100
private volatile ShardRouting shardRouting;
101101

@@ -121,7 +121,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
121121
} else {
122122
refreshInterval = new TimeValue(-2);
123123
}
124-
optimizeInterval = indexSettings.getAsTime("index.merge.async_interval", TimeValue.timeValueSeconds(1));
124+
mergeInterval = indexSettings.getAsTime("index.merge.async_interval", TimeValue.timeValueSeconds(1));
125125

126126
logger.debug("state: [CREATED]");
127127

@@ -436,9 +436,9 @@ public void close(String reason) {
436436
refreshScheduledFuture.cancel(true);
437437
refreshScheduledFuture = null;
438438
}
439-
if (optimizeScheduleFuture != null) {
440-
optimizeScheduleFuture.cancel(true);
441-
optimizeScheduleFuture = null;
439+
if (mergeScheduleFuture != null) {
440+
mergeScheduleFuture.cancel(true);
441+
mergeScheduleFuture = null;
442442
}
443443
}
444444
logger.debug("state: [{}]->[{}], reason [{}]", state, IndexShardState.CLOSED, reason);
@@ -553,9 +553,9 @@ private void startScheduledTasksIfNeeded() {
553553
// since we can do async merging, it will not be called explicitly when indexing (adding / deleting docs), and only when flushing
554554
// so, make sure we periodically call it, this need to be a small enough value so mergine will actually
555555
// happen and reduce the number of segments
556-
if (optimizeInterval.millis() > 0) {
557-
optimizeScheduleFuture = threadPool.schedule(optimizeInterval, ThreadPool.Names.MANAGEMENT, new EngineOptimizer());
558-
logger.debug("scheduling optimizer / merger every {}", optimizeInterval);
556+
if (mergeInterval.millis() > 0) {
557+
mergeScheduleFuture = threadPool.schedule(mergeInterval, ThreadPool.Names.MERGE, new EngineMerger());
558+
logger.debug("scheduling optimizer / merger every {}", mergeInterval);
559559
} else {
560560
logger.debug("scheduled optimizer / merger disabled");
561561
}
@@ -606,7 +606,7 @@ private class EngineRefresher implements Runnable {
606606
}
607607
}
608608

609-
private class EngineOptimizer implements Runnable {
609+
private class EngineMerger implements Runnable {
610610
@Override public void run() {
611611
try {
612612
// -1 means maybe merge
@@ -627,7 +627,7 @@ private class EngineOptimizer implements Runnable {
627627
logger.warn("Failed to perform scheduled engine optimize/merge", e);
628628
}
629629
if (state != IndexShardState.CLOSED) {
630-
optimizeScheduleFuture = threadPool.schedule(optimizeInterval, ThreadPool.Names.MANAGEMENT, this);
630+
mergeScheduleFuture = threadPool.schedule(mergeInterval, ThreadPool.Names.MERGE, this);
631631
}
632632
}
633633
}

modules/elasticsearch/src/main/java/org/elasticsearch/threadpool/ThreadPool.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public static class Names {
5252
public static final String SEARCH = "search";
5353
public static final String PERCOLATE = "percolate";
5454
public static final String MANAGEMENT = "management";
55+
public static final String MERGE = "merge";
5556
public static final String SNAPSHOT = "snapshot";
5657
}
5758

@@ -74,6 +75,7 @@ public ThreadPool() {
7475
executors.put(Names.SEARCH, build(Names.SEARCH, "cached", groupSettings.get(Names.SEARCH), ImmutableSettings.Builder.EMPTY_SETTINGS));
7576
executors.put(Names.PERCOLATE, build(Names.PERCOLATE, "cached", groupSettings.get(Names.PERCOLATE), ImmutableSettings.Builder.EMPTY_SETTINGS));
7677
executors.put(Names.MANAGEMENT, build(Names.MANAGEMENT, "scaling", groupSettings.get(Names.MANAGEMENT), settingsBuilder().put("keep_alive", "30s").put("size", 20).build()));
78+
executors.put(Names.MERGE, build(Names.MERGE, "scaling", groupSettings.get(Names.MERGE), settingsBuilder().put("keep_alive", "30s").put("size", 20).build()));
7779
executors.put(Names.SNAPSHOT, build(Names.SNAPSHOT, "scaling", groupSettings.get(Names.SNAPSHOT), ImmutableSettings.Builder.EMPTY_SETTINGS));
7880
executors.put(Names.SAME, MoreExecutors.sameThreadExecutor());
7981
this.executors = ImmutableMap.copyOf(executors);

0 commit comments

Comments
 (0)