Skip to content

Commit a0a714e

Browse files
committed
Shared Gateway: Allow to set the number of concurrent streams doing snapshot operations, closes elastic#621.
1 parent 93dec72 commit a0a714e

File tree

3 files changed

+43
-7
lines changed

3 files changed

+43
-7
lines changed

modules/elasticsearch/src/main/java/org/elasticsearch/gateway/fs/FsGateway.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,26 +19,33 @@
1919

2020
package org.elasticsearch.gateway.fs;
2121

22+
import org.elasticsearch.ElasticSearchException;
2223
import org.elasticsearch.cluster.ClusterName;
2324
import org.elasticsearch.cluster.ClusterService;
2425
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
2526
import org.elasticsearch.common.blobstore.fs.FsBlobStore;
2627
import org.elasticsearch.common.inject.Inject;
2728
import org.elasticsearch.common.inject.Module;
2829
import org.elasticsearch.common.settings.Settings;
30+
import org.elasticsearch.common.unit.TimeValue;
31+
import org.elasticsearch.common.util.concurrent.DynamicExecutors;
32+
import org.elasticsearch.common.util.concurrent.EsExecutors;
2933
import org.elasticsearch.env.Environment;
3034
import org.elasticsearch.gateway.blobstore.BlobStoreGateway;
3135
import org.elasticsearch.index.gateway.fs.FsIndexGatewayModule;
3236
import org.elasticsearch.threadpool.ThreadPool;
3337

3438
import java.io.File;
3539
import java.io.IOException;
40+
import java.util.concurrent.ExecutorService;
3641

3742
/**
3843
* @author kimchy (shay.banon)
3944
*/
4045
public class FsGateway extends BlobStoreGateway {
4146

47+
private final ExecutorService concurrentStreamPool;
48+
4249
@Inject public FsGateway(Settings settings, ClusterService clusterService, MetaDataCreateIndexService createIndexService,
4350
Environment environment, ClusterName clusterName, ThreadPool threadPool) throws IOException {
4451
super(settings, clusterService, createIndexService);
@@ -51,7 +58,11 @@ public class FsGateway extends BlobStoreGateway {
5158
} else {
5259
gatewayFile = new File(location);
5360
}
54-
initialize(new FsBlobStore(componentSettings, threadPool.cached(), gatewayFile), clusterName, null);
61+
62+
int concurrentStreams = componentSettings.getAsInt("concurrent_streams", 5);
63+
this.concurrentStreamPool = DynamicExecutors.newScalingThreadPool(1, concurrentStreams, TimeValue.timeValueSeconds(5).millis(), EsExecutors.daemonThreadFactory(settings, "[s3_stream]"));
64+
65+
initialize(new FsBlobStore(componentSettings, concurrentStreamPool, gatewayFile), clusterName, null);
5566
}
5667

5768
@Override public String type() {
@@ -61,4 +72,9 @@ public class FsGateway extends BlobStoreGateway {
6172
@Override public Class<? extends Module> suggestIndexGateway() {
6273
return FsIndexGatewayModule.class;
6374
}
75+
76+
@Override protected void doClose() throws ElasticSearchException {
77+
super.doClose();
78+
concurrentStreamPool.shutdown();
79+
}
6480
}

plugins/cloud/aws/src/main/java/org/elasticsearch/gateway/s3/S3Gateway.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,17 +31,23 @@
3131
import org.elasticsearch.common.settings.Settings;
3232
import org.elasticsearch.common.unit.ByteSizeUnit;
3333
import org.elasticsearch.common.unit.ByteSizeValue;
34+
import org.elasticsearch.common.unit.TimeValue;
35+
import org.elasticsearch.common.util.concurrent.DynamicExecutors;
36+
import org.elasticsearch.common.util.concurrent.EsExecutors;
3437
import org.elasticsearch.gateway.blobstore.BlobStoreGateway;
3538
import org.elasticsearch.index.gateway.s3.S3IndexGatewayModule;
3639
import org.elasticsearch.threadpool.ThreadPool;
3740

3841
import java.io.IOException;
42+
import java.util.concurrent.ExecutorService;
3943

4044
/**
4145
* @author kimchy (shay.banon)
4246
*/
4347
public class S3Gateway extends BlobStoreGateway {
4448

49+
private final ExecutorService concurrentStreamPool;
50+
4551
@Inject public S3Gateway(Settings settings, ClusterService clusterService, MetaDataCreateIndexService createIndexService,
4652
ClusterName clusterName, ThreadPool threadPool, AwsS3Service s3Service) throws IOException {
4753
super(settings, clusterService, createIndexService);
@@ -76,13 +82,17 @@ public class S3Gateway extends BlobStoreGateway {
7682
}
7783
ByteSizeValue chunkSize = componentSettings.getAsBytesSize("chunk_size", new ByteSizeValue(100, ByteSizeUnit.MB));
7884

79-
logger.debug("using bucket [{}], region [{}], chunk_size [{}]", bucket, region, chunkSize);
85+
int concurrentStreams = componentSettings.getAsInt("concurrent_streams", 5);
86+
this.concurrentStreamPool = DynamicExecutors.newScalingThreadPool(1, concurrentStreams, TimeValue.timeValueSeconds(5).millis(), EsExecutors.daemonThreadFactory(settings, "[s3_stream]"));
87+
88+
logger.debug("using bucket [{}], region [{}], chunk_size [{}], concurrent_streams [{}]", bucket, region, chunkSize, concurrentStreams);
8089

81-
initialize(new S3BlobStore(settings, s3Service.client(), bucket, region, threadPool.cached()), clusterName, chunkSize);
90+
initialize(new S3BlobStore(settings, s3Service.client(), bucket, region, concurrentStreamPool), clusterName, chunkSize);
8291
}
8392

84-
@Override public void close() throws ElasticSearchException {
85-
super.close();
93+
@Override protected void doClose() throws ElasticSearchException {
94+
super.doClose();
95+
concurrentStreamPool.shutdown();
8696
}
8797

8898
@Override public String type() {

plugins/hadoop/src/main/java/org/elasticsearch/gateway/hdfs/HdfsGateway.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,16 @@
3131
import org.elasticsearch.common.inject.Inject;
3232
import org.elasticsearch.common.inject.Module;
3333
import org.elasticsearch.common.settings.Settings;
34+
import org.elasticsearch.common.unit.TimeValue;
35+
import org.elasticsearch.common.util.concurrent.DynamicExecutors;
36+
import org.elasticsearch.common.util.concurrent.EsExecutors;
3437
import org.elasticsearch.gateway.blobstore.BlobStoreGateway;
3538
import org.elasticsearch.threadpool.ThreadPool;
3639

3740
import java.io.IOException;
3841
import java.net.URI;
3942
import java.util.Map;
43+
import java.util.concurrent.ExecutorService;
4044

4145
/**
4246
* @author kimchy (shay.banon)
@@ -47,6 +51,8 @@ public class HdfsGateway extends BlobStoreGateway {
4751

4852
private final FileSystem fileSystem;
4953

54+
private final ExecutorService concurrentStreamPool;
55+
5056
@Inject public HdfsGateway(Settings settings, ClusterService clusterService, MetaDataCreateIndexService createIndexService,
5157
ClusterName clusterName, ThreadPool threadPool) throws IOException {
5258
super(settings, clusterService, createIndexService);
@@ -62,7 +68,10 @@ public class HdfsGateway extends BlobStoreGateway {
6268
}
6369
Path hPath = new Path(new Path(path), clusterName.value());
6470

65-
logger.debug("Using uri [{}], path [{}]", uri, hPath);
71+
int concurrentStreams = componentSettings.getAsInt("concurrent_streams", 5);
72+
this.concurrentStreamPool = DynamicExecutors.newScalingThreadPool(1, concurrentStreams, TimeValue.timeValueSeconds(5).millis(), EsExecutors.daemonThreadFactory(settings, "[s3_stream]"));
73+
74+
logger.debug("Using uri [{}], path [{}], concurrent_streams [{}]", uri, hPath, concurrentStreams);
6675

6776
Configuration conf = new Configuration();
6877
Settings hdfsSettings = settings.getByPrefix("hdfs.conf.");
@@ -72,7 +81,7 @@ public class HdfsGateway extends BlobStoreGateway {
7281

7382
fileSystem = FileSystem.get(URI.create(uri), conf);
7483

75-
initialize(new HdfsBlobStore(settings, fileSystem, threadPool.cached(), hPath), clusterName, null);
84+
initialize(new HdfsBlobStore(settings, fileSystem, concurrentStreamPool, hPath), clusterName, null);
7685
}
7786

7887
@Override public String type() {
@@ -92,5 +101,6 @@ public class HdfsGateway extends BlobStoreGateway {
92101
// ignore
93102
}
94103
}
104+
concurrentStreamPool.shutdown();
95105
}
96106
}

0 commit comments

Comments
 (0)