Skip to content

Commit 22f0e79

Browse files
committed
use merge trigger to control when to do merges
now with merge trigger, we can simply decide when to do merges based on it
1 parent d969e61 commit 22f0e79

File tree

7 files changed

+16
-225
lines changed

7 files changed

+16
-225
lines changed

src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@
4949
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
5050
import org.elasticsearch.index.engine.*;
5151
import org.elasticsearch.index.indexing.ShardIndexingService;
52-
import org.elasticsearch.index.merge.policy.EnableMergePolicy;
5352
import org.elasticsearch.index.merge.policy.MergePolicyProvider;
5453
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
5554
import org.elasticsearch.index.search.nested.IncludeNestedDocsQuery;
@@ -990,9 +989,6 @@ public void maybeMerge() throws EngineException {
990989
if (indexWriter == null) {
991990
throw new EngineClosedException(shardId, failedEngine);
992991
}
993-
if (indexWriter.getConfig().getMergePolicy() instanceof EnableMergePolicy) {
994-
((EnableMergePolicy) indexWriter.getConfig().getMergePolicy()).enableMerge();
995-
}
996992
indexWriter.maybeMerge();
997993
} catch (OutOfMemoryError e) {
998994
failEngine(e);
@@ -1006,9 +1002,6 @@ public void maybeMerge() throws EngineException {
10061002
throw new OptimizeFailedEngineException(shardId, e);
10071003
} finally {
10081004
rwl.readLock().unlock();
1009-
if (indexWriter != null && indexWriter.getConfig().getMergePolicy() instanceof EnableMergePolicy) {
1010-
((EnableMergePolicy) indexWriter.getConfig().getMergePolicy()).disableMerge();
1011-
}
10121005
}
10131006
}
10141007

@@ -1023,9 +1016,6 @@ public void optimize(Optimize optimize) throws EngineException {
10231016
if (indexWriter == null) {
10241017
throw new EngineClosedException(shardId, failedEngine);
10251018
}
1026-
if (indexWriter.getConfig().getMergePolicy() instanceof EnableMergePolicy) {
1027-
((EnableMergePolicy) indexWriter.getConfig().getMergePolicy()).enableMerge();
1028-
}
10291019
if (optimize.onlyExpungeDeletes()) {
10301020
indexWriter.forceMergeDeletes(false);
10311021
} else if (optimize.maxNumSegments() <= 0) {
@@ -1046,9 +1036,6 @@ public void optimize(Optimize optimize) throws EngineException {
10461036
throw new OptimizeFailedEngineException(shardId, e);
10471037
} finally {
10481038
rwl.readLock().unlock();
1049-
if (indexWriter != null && indexWriter.getConfig().getMergePolicy() instanceof EnableMergePolicy) {
1050-
((EnableMergePolicy) indexWriter.getConfig().getMergePolicy()).disableMerge();
1051-
}
10521039
optimizeMutex.set(false);
10531040
}
10541041
}

src/main/java/org/elasticsearch/index/merge/policy/EnableMergePolicy.java

Lines changed: 0 additions & 38 deletions
This file was deleted.

src/main/java/org/elasticsearch/index/merge/policy/LogByteSizeMergePolicyProvider.java

Lines changed: 5 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@
1919

2020
package org.elasticsearch.index.merge.policy;
2121

22-
import org.apache.lucene.index.*;
22+
import org.apache.lucene.index.LogByteSizeMergePolicy;
23+
import org.apache.lucene.index.SegmentInfos;
2324
import org.elasticsearch.ElasticSearchException;
2425
import org.elasticsearch.cluster.metadata.IndexMetaData;
2526
import org.elasticsearch.common.Preconditions;
@@ -32,7 +33,6 @@
3233
import org.elasticsearch.index.store.Store;
3334

3435
import java.io.IOException;
35-
import java.util.Map;
3636
import java.util.Set;
3737
import java.util.concurrent.CopyOnWriteArraySet;
3838

@@ -174,62 +174,19 @@ public void close() {
174174
}
175175
}
176176

177-
public static class EnableMergeLogByteSizeMergePolicy extends CustomLogByteSizeMergePolicy implements EnableMergePolicy {
178-
179-
private final ThreadLocal<Boolean> enableMerge = new ThreadLocal<Boolean>() {
180-
@Override
181-
protected Boolean initialValue() {
182-
return Boolean.FALSE;
183-
}
184-
};
177+
public static class EnableMergeLogByteSizeMergePolicy extends CustomLogByteSizeMergePolicy {
185178

186179
public EnableMergeLogByteSizeMergePolicy(LogByteSizeMergePolicyProvider provider) {
187180
super(provider);
188181
}
189182

190-
@Override
191-
public void enableMerge() {
192-
enableMerge.set(Boolean.TRUE);
193-
}
194-
195-
@Override
196-
public void disableMerge() {
197-
enableMerge.set(Boolean.FALSE);
198-
}
199-
200-
@Override
201-
public boolean isMergeEnabled() {
202-
return enableMerge.get() == Boolean.TRUE;
203-
}
204-
205-
@Override
206-
public void close() {
207-
enableMerge.remove();
208-
super.close();
209-
}
210-
211183
@Override
212184
public MergeSpecification findMerges(MergeTrigger trigger, SegmentInfos infos) throws IOException {
213-
if (enableMerge.get() == Boolean.FALSE) {
185+
// we don't enable merges while indexing documents, we do them in the background
186+
if (trigger == MergeTrigger.SEGMENT_FLUSH) {
214187
return null;
215188
}
216189
return super.findMerges(trigger, infos);
217190
}
218-
219-
@Override
220-
public MergeSpecification findForcedMerges(SegmentInfos infos, int maxSegmentCount, Map<SegmentInfoPerCommit, Boolean> segmentsToMerge) throws IOException {
221-
if (enableMerge.get() == Boolean.FALSE) {
222-
return null;
223-
}
224-
return super.findForcedMerges(infos, maxSegmentCount, segmentsToMerge);
225-
}
226-
227-
@Override
228-
public MergeSpecification findForcedDeletesMerges(SegmentInfos infos) throws CorruptIndexException, IOException {
229-
if (enableMerge.get() == Boolean.FALSE) {
230-
return null;
231-
}
232-
return super.findForcedDeletesMerges(infos);
233-
}
234191
}
235192
}

src/main/java/org/elasticsearch/index/merge/policy/LogDocMergePolicyProvider.java

Lines changed: 5 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@
1919

2020
package org.elasticsearch.index.merge.policy;
2121

22-
import org.apache.lucene.index.*;
22+
import org.apache.lucene.index.LogDocMergePolicy;
23+
import org.apache.lucene.index.SegmentInfos;
2324
import org.elasticsearch.ElasticSearchException;
2425
import org.elasticsearch.cluster.metadata.IndexMetaData;
2526
import org.elasticsearch.common.Preconditions;
@@ -30,7 +31,6 @@
3031
import org.elasticsearch.index.store.Store;
3132

3233
import java.io.IOException;
33-
import java.util.Map;
3434
import java.util.Set;
3535
import java.util.concurrent.CopyOnWriteArraySet;
3636

@@ -158,62 +158,19 @@ public void close() {
158158
}
159159
}
160160

161-
public static class EnableMergeLogDocMergePolicy extends CustomLogDocMergePolicy implements EnableMergePolicy {
162-
163-
private final ThreadLocal<Boolean> enableMerge = new ThreadLocal<Boolean>() {
164-
@Override
165-
protected Boolean initialValue() {
166-
return Boolean.FALSE;
167-
}
168-
};
161+
public static class EnableMergeLogDocMergePolicy extends CustomLogDocMergePolicy {
169162

170163
public EnableMergeLogDocMergePolicy(LogDocMergePolicyProvider provider) {
171164
super(provider);
172165
}
173166

174-
@Override
175-
public void enableMerge() {
176-
enableMerge.set(Boolean.TRUE);
177-
}
178-
179-
@Override
180-
public void disableMerge() {
181-
enableMerge.set(Boolean.FALSE);
182-
}
183-
184-
@Override
185-
public boolean isMergeEnabled() {
186-
return enableMerge.get() == Boolean.TRUE;
187-
}
188-
189-
@Override
190-
public void close() {
191-
enableMerge.remove();
192-
super.close();
193-
}
194-
195167
@Override
196168
public MergeSpecification findMerges(MergeTrigger trigger, SegmentInfos infos) throws IOException {
197-
if (enableMerge.get() == Boolean.FALSE) {
169+
// we don't enable merges while indexing documents, we do them in the background
170+
if (trigger == MergeTrigger.SEGMENT_FLUSH) {
198171
return null;
199172
}
200173
return super.findMerges(trigger, infos);
201174
}
202-
203-
@Override
204-
public MergeSpecification findForcedMerges(SegmentInfos infos, int maxSegmentCount, Map<SegmentInfoPerCommit, Boolean> segmentsToMerge) throws IOException {
205-
if (enableMerge.get() == Boolean.FALSE) {
206-
return null;
207-
}
208-
return super.findForcedMerges(infos, maxSegmentCount, segmentsToMerge);
209-
}
210-
211-
@Override
212-
public MergeSpecification findForcedDeletesMerges(SegmentInfos infos) throws CorruptIndexException, IOException {
213-
if (enableMerge.get() == Boolean.FALSE) {
214-
return null;
215-
}
216-
return super.findForcedDeletesMerges(infos);
217-
}
218175
}
219176
}

src/main/java/org/elasticsearch/index/merge/policy/TieredMergePolicyProvider.java

Lines changed: 6 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919

2020
package org.elasticsearch.index.merge.policy;
2121

22-
import org.apache.lucene.index.*;
22+
import org.apache.lucene.index.MergePolicy;
23+
import org.apache.lucene.index.SegmentInfos;
24+
import org.apache.lucene.index.TieredMergePolicy;
2325
import org.elasticsearch.ElasticSearchException;
2426
import org.elasticsearch.cluster.metadata.IndexMetaData;
2527
import org.elasticsearch.common.inject.Inject;
@@ -31,7 +33,6 @@
3133
import org.elasticsearch.index.store.Store;
3234

3335
import java.io.IOException;
34-
import java.util.Map;
3536
import java.util.Set;
3637
import java.util.concurrent.CopyOnWriteArraySet;
3738

@@ -219,62 +220,19 @@ public void close() {
219220
}
220221
}
221222

222-
public static class EnableMergeTieredMergePolicyProvider extends CustomTieredMergePolicyProvider implements EnableMergePolicy {
223-
224-
private final ThreadLocal<Boolean> enableMerge = new ThreadLocal<Boolean>() {
225-
@Override
226-
protected Boolean initialValue() {
227-
return Boolean.FALSE;
228-
}
229-
};
223+
public static class EnableMergeTieredMergePolicyProvider extends CustomTieredMergePolicyProvider {
230224

231225
public EnableMergeTieredMergePolicyProvider(TieredMergePolicyProvider provider) {
232226
super(provider);
233227
}
234228

235-
@Override
236-
public void enableMerge() {
237-
enableMerge.set(Boolean.TRUE);
238-
}
239-
240-
@Override
241-
public void disableMerge() {
242-
enableMerge.set(Boolean.FALSE);
243-
}
244-
245-
@Override
246-
public boolean isMergeEnabled() {
247-
return enableMerge.get() == Boolean.TRUE;
248-
}
249-
250-
@Override
251-
public void close() {
252-
enableMerge.remove();
253-
super.close();
254-
}
255-
256229
@Override
257230
public MergePolicy.MergeSpecification findMerges(MergeTrigger trigger, SegmentInfos infos) throws IOException {
258-
if (enableMerge.get() == Boolean.FALSE) {
231+
// we don't enable merges while indexing documents, we do them in the background
232+
if (trigger == MergeTrigger.SEGMENT_FLUSH) {
259233
return null;
260234
}
261235
return super.findMerges(trigger, infos);
262236
}
263-
264-
@Override
265-
public MergeSpecification findForcedMerges(SegmentInfos infos, int maxSegmentCount, Map<SegmentInfoPerCommit, Boolean> segmentsToMerge) throws IOException {
266-
if (enableMerge.get() == Boolean.FALSE) {
267-
return null;
268-
}
269-
return super.findForcedMerges(infos, maxSegmentCount, segmentsToMerge);
270-
}
271-
272-
@Override
273-
public MergeSpecification findForcedDeletesMerges(SegmentInfos infos) throws CorruptIndexException, IOException {
274-
if (enableMerge.get() == Boolean.FALSE) {
275-
return null;
276-
}
277-
return super.findForcedDeletesMerges(infos);
278-
}
279237
}
280238
}

src/main/java/org/elasticsearch/index/merge/scheduler/ConcurrentMergeSchedulerProvider.java

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,11 @@
2020
package org.elasticsearch.index.merge.scheduler;
2121

2222
import org.apache.lucene.index.*;
23-
import org.apache.lucene.store.AlreadyClosedException;
2423
import org.elasticsearch.common.inject.Inject;
2524
import org.elasticsearch.common.logging.ESLogger;
2625
import org.elasticsearch.common.settings.Settings;
2726
import org.elasticsearch.common.util.concurrent.EsExecutors;
2827
import org.elasticsearch.index.merge.MergeStats;
29-
import org.elasticsearch.index.merge.policy.EnableMergePolicy;
3028
import org.elasticsearch.index.settings.IndexSettings;
3129
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
3230
import org.elasticsearch.index.shard.ShardId;
@@ -88,19 +86,6 @@ private CustomConcurrentMergeScheduler(ESLogger logger, ShardId shardId, Concurr
8886

8987
@Override
9088
public void merge(IndexWriter writer) throws CorruptIndexException, IOException {
91-
try {
92-
// if merge is not enabled, don't do any merging...
93-
if (writer.getConfig().getMergePolicy() instanceof EnableMergePolicy) {
94-
if (!((EnableMergePolicy) writer.getConfig().getMergePolicy()).isMergeEnabled()) {
95-
return;
96-
}
97-
}
98-
} catch (AlreadyClosedException e) {
99-
// called writer#getMergePolicy can cause an AlreadyClosed failure, so ignore it
100-
// since we are doing it on close, return here and don't do the actual merge
101-
// since we do it outside of a lock in the RobinEngine
102-
return;
103-
}
10489
try {
10590
super.merge(writer);
10691
} catch (IOException e) {

0 commit comments

Comments
 (0)