Skip to content

Commit 3ff2c74

Browse files
author
Tom May
committed
Continue S3 -> Cassandra shredding, add logging, mark things TODO.
1 parent 3947873 commit 3ff2c74

File tree

4 files changed

+29
-69
lines changed

4 files changed

+29
-69
lines changed

plugins/cassandra/src/main/java/org/elasticsearch/cassandra/blobstore/AbstractCassandraBlobContainer.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,13 @@
1919

2020
package org.elasticsearch.cassandra.blobstore;
2121

22-
//import com.amazonaws.services.s3.model.ObjectListing;
23-
//import com.amazonaws.services.s3.model.S3Object;
24-
//import com.amazonaws.services.s3.model.S3ObjectSummary;
2522
import org.elasticsearch.common.blobstore.BlobMetaData;
2623
import org.elasticsearch.common.blobstore.BlobPath;
2724
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
2825
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
2926
import org.elasticsearch.common.collect.ImmutableMap;
27+
import org.elasticsearch.common.logging.ESLogger;
28+
import org.elasticsearch.common.logging.Loggers;
3029

3130
import javax.annotation.Nullable;
3231
import java.io.IOException;
@@ -37,31 +36,35 @@
3736
*/
3837
public class AbstractCassandraBlobContainer extends AbstractBlobContainer {
3938

39+
protected final ESLogger logger = Loggers.getLogger(getClass());
40+
4041
protected final CassandraBlobStore blobStore;
4142

42-
protected final String keyPath;
43+
protected final String keyPath; // XXX
4344

4445
public AbstractCassandraBlobContainer(BlobPath path, CassandraBlobStore blobStore) {
4546
super(path);
4647
this.blobStore = blobStore;
4748
this.keyPath = path.buildAsString("/") + "/";
49+
logger.debug("AbstractCassandraBlobContainer path={}", path);
4850
}
4951

5052
@Override public boolean blobExists(String blobName) {
5153
try {
52-
//XXX blobStore.client().getObjectMetadata(blobStore.bucket(), buildKey(blobName));
54+
logger.debug("TODO blobExists blobName={}", blobName);
5355
return true;
5456
} catch (Exception e) {
5557
return false;
5658
}
5759
}
5860

5961
@Override public boolean deleteBlob(String blobName) throws IOException {
60-
//XXX blobStore.client().deleteObject(blobStore.bucket(), buildKey(blobName));
62+
logger.debug("TODO deleteBlob blobName={}", blobName);
6163
return true;
6264
}
6365

6466
@Override public void readBlob(final String blobName, final ReadBlobListener listener) {
67+
logger.debug("TODO readBlob blobName={}", blobName);
6568
/* XXX
6669
blobStore.executor().execute(new Runnable() {
6770
@Override public void run() {
@@ -94,6 +97,7 @@ public AbstractCassandraBlobContainer(BlobPath path, CassandraBlobStore blobStor
9497
}
9598

9699
@Override public ImmutableMap<String, BlobMetaData> listBlobsByPrefix(@Nullable String blobNamePrefix) throws IOException {
100+
logger.debug("TODO listBlobsByPrefix blobNamePrefix={}", blobNamePrefix);
97101
ImmutableMap.Builder<String, BlobMetaData> blobsBuilder = ImmutableMap.builder();
98102
/* XXX
99103
ObjectListing prevListing = null;

plugins/cassandra/src/main/java/org/elasticsearch/cassandra/blobstore/CassandraBlobStore.java

Lines changed: 13 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,12 @@
1919

2020
package org.elasticsearch.cassandra.blobstore;
2121

22-
//import com.amazonaws.services.s3.AmazonS3;
23-
//import com.amazonaws.services.s3.model.ObjectListing;
24-
//import com.amazonaws.services.s3.model.S3ObjectSummary;
2522
import org.elasticsearch.common.blobstore.BlobPath;
2623
import org.elasticsearch.common.blobstore.BlobStore;
2724
import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
2825
import org.elasticsearch.common.component.AbstractComponent;
26+
import org.elasticsearch.common.logging.ESLogger;
27+
import org.elasticsearch.common.logging.Loggers;
2928
import org.elasticsearch.common.settings.Settings;
3029
import org.elasticsearch.common.unit.ByteSizeUnit;
3130
import org.elasticsearch.common.unit.ByteSizeValue;
@@ -38,62 +37,32 @@
3837
*/
3938
public class CassandraBlobStore extends AbstractComponent implements BlobStore {
4039

41-
//XXX private final AmazonS3 client;
40+
private final ESLogger logger = Loggers.getLogger(getClass());
4241

43-
//XXX private final String bucket;
42+
private final Executor executor;
4443

45-
//XXX private final String region;
44+
private final int bufferSizeInBytes; // XXX
4645

47-
//XXX private final Executor executor;
48-
49-
//XXX private final int bufferSizeInBytes;
50-
51-
public CassandraBlobStore(Settings settings, /*AmazonS3 client, */ String bucket, @Nullable String region, Executor executor) {
46+
// XXX executor is a java.util.concurrent.ThreadPoolExecutor
47+
public CassandraBlobStore(Settings settings, Executor executor) {
5248
super(settings);
53-
/* XXX
54-
this.client = client;
55-
this.bucket = bucket;
56-
this.region = region;
49+
5750
this.executor = executor;
5851

5952
this.bufferSizeInBytes = (int) settings.getAsBytesSize("buffer_size", new ByteSizeValue(100, ByteSizeUnit.KB)).bytes();
6053

61-
if (!client.doesBucketExist(bucket)) {
62-
if (region != null) {
63-
client.createBucket(bucket, region);
64-
} else {
65-
client.createBucket(bucket);
66-
}
67-
}
68-
*/
54+
logger.debug("CassandraBlobStore executor={}, bufferSizeInBytes={}", executor, bufferSizeInBytes);
6955
}
7056

7157
@Override public String toString() {
72-
/* XXX
73-
return (region == null ? "" : region + "/") + bucket;
74-
*/
75-
return null;
76-
}
77-
78-
/* XXX
79-
public AmazonS3 client() {
80-
return client;
81-
}
82-
*/
83-
84-
/* XXX
85-
public String bucket() {
86-
return bucket;
58+
return "cassandra"; // XXX
8759
}
88-
*/
8960

90-
/* XXX
9161
public Executor executor() {
9262
return executor;
9363
}
94-
*/
9564

96-
/*
65+
/* XXX
9766
public int bufferSizeInBytes() {
9867
return bufferSizeInBytes;
9968
}
@@ -104,7 +73,8 @@ public int bufferSizeInBytes() {
10473
}
10574

10675
@Override public void delete(BlobPath path) {
107-
/* XXX
76+
logger.debug("TODO delete path={}", path);
77+
/* XXX TODO
10878
ObjectListing prevListing = null;
10979
while (true) {
11080
ObjectListing list;

plugins/cassandra/src/main/java/org/elasticsearch/cassandra/blobstore/CassandraImmutableBlobContainer.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919

2020
package org.elasticsearch.cassandra.blobstore;
2121

22-
//import com.amazonaws.services.s3.model.ObjectMetadata;
23-
//import com.amazonaws.services.s3.model.PutObjectResult;
2422
import org.elasticsearch.common.blobstore.BlobPath;
2523
import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
2624
import org.elasticsearch.common.blobstore.support.BlobStores;
@@ -38,20 +36,17 @@ public CassandraImmutableBlobContainer(BlobPath path, CassandraBlobStore blobSto
3836
}
3937

4038
@Override public void writeBlob(final String blobName, final InputStream is, final long sizeInBytes, final WriterListener listener) {
41-
/* XXX
4239
blobStore.executor().execute(new Runnable() {
4340
@Override public void run() {
4441
try {
45-
ObjectMetadata md = new ObjectMetadata();
46-
md.setContentLength(sizeInBytes);
47-
PutObjectResult objectResult = blobStore.client().putObject(blobStore.bucket(), buildKey(blobName), is, md);
42+
// XXX TODO
43+
logger.debug("TODO writeBlob blobName={}, sizeInBytes={}, is={}", blobName, sizeInBytes, is);
4844
listener.onCompleted();
4945
} catch (Exception e) {
5046
listener.onFailure(e);
5147
}
5248
}
5349
});
54-
*/
5550
}
5651

5752
@Override public void writeBlob(String blobName, InputStream is, long sizeInBytes) throws IOException {

plugins/cassandra/src/main/java/org/elasticsearch/gateway/cassandra/CassandraGateway.java

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,7 @@
2121

2222
import org.elasticsearch.ElasticSearchException;
2323
import org.elasticsearch.ElasticSearchIllegalArgumentException;
24-
//import org.elasticsearch.cloud.aws.AwsS3Service;
25-
//import org.elasticsearch.cloud.aws.blobstore.S3BlobStore;
24+
import org.elasticsearch.cassandra.blobstore.CassandraBlobStore;
2625
import org.elasticsearch.cluster.ClusterName;
2726
import org.elasticsearch.cluster.ClusterService;
2827
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
@@ -43,22 +42,14 @@
4342
public class CassandraGateway extends BlobStoreGateway {
4443

4544
@Inject public CassandraGateway(Settings settings, ClusterService clusterService, MetaDataCreateIndexService createIndexService,
46-
ClusterName clusterName, ThreadPool threadPool /* XXX , AwsS3Service s3Service*/) throws IOException {
45+
ClusterName clusterName, ThreadPool threadPool) throws IOException {
4746
super(settings, clusterService, createIndexService);
4847

49-
/* XXX
50-
String bucket = componentSettings.get("bucket");
51-
if (bucket == null) {
52-
throw new ElasticSearchIllegalArgumentException("No bucket defined for s3 gateway");
53-
}
54-
55-
String region = componentSettings.get("region");
5648
ByteSizeValue chunkSize = componentSettings.getAsBytesSize("chunk_size", new ByteSizeValue(100, ByteSizeUnit.MB));
5749

58-
logger.debug("using bucket [{}], region [{}], chunk_size [{}]", bucket, region, chunkSize);
50+
logger.debug("using chunk_size [{}]", chunkSize);
5951

60-
initialize(new S3BlobStore(settings, s3Service.client(), bucket, region, threadPool.cached()), clusterName, chunkSize);
61-
*/
52+
initialize(new CassandraBlobStore(settings, threadPool.cached()), clusterName, chunkSize);
6253
}
6354

6455
@Override public void close() throws ElasticSearchException {

0 commit comments

Comments
 (0)