Skip to content

Commit 8a7f856

Browse files
author
Tom May
committed
Make CassandraClientFactory a configurable instance.
1 parent 609914d commit 8a7f856

File tree

4 files changed

+35
-16
lines changed

4 files changed

+35
-16
lines changed

config/elasticsearch.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
gateway:
22
type: cassandra
3+
cassandra:
4+
host: localhost
5+
port: 9160
36

47
index:
58
numberOfShards: 1

plugins/cassandra/src/main/java/org/elasticsearch/cassandra/blobstore/CassandraBlobStore.java

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,8 @@ public class CassandraBlobStore extends AbstractComponent implements BlobStore {
115115

116116
private final ESLogger logger = Loggers.getLogger(getClass());
117117

118+
private final CassandraClientFactory cassandraClientFactory;
119+
118120
private final Executor executor;
119121

120122
private final int bufferSizeInBytes; // XXX
@@ -123,11 +125,16 @@ public class CassandraBlobStore extends AbstractComponent implements BlobStore {
123125
public CassandraBlobStore(Settings settings, Executor executor) {
124126
super(settings);
125127

128+
String host = settings.get("host", "localhost");
129+
int port = settings.getAsInt("port", 9160);
130+
cassandraClientFactory = new CassandraClientFactory(host, port);
131+
126132
this.executor = executor;
127133

128134
this.bufferSizeInBytes = (int) settings.getAsBytesSize("buffer_size", new ByteSizeValue(100, ByteSizeUnit.KB)).bytes();
129135

130-
logger.debug("CassandraBlobStore executor: {} bufferSizeInBytes: {}", executor, bufferSizeInBytes);
136+
logger.debug("CassandraBlobStore {}:{} executor: {} bufferSizeInBytes: {}",
137+
host, port, executor, bufferSizeInBytes);
131138
}
132139

133140
@Override public String toString() {
@@ -168,7 +175,7 @@ boolean blobExists(String blobPath, String blobName) {
168175
logger.debug("TODO blobExists {}", blobKey);
169176
try {
170177
Cassandra.Client client =
171-
CassandraClientFactory.getCassandraClient();
178+
cassandraClientFactory.getCassandraClient();
172179
try {
173180
return client.get_count(
174181
keySpace,
@@ -177,7 +184,7 @@ boolean blobExists(String blobPath, String blobName) {
177184
ConsistencyLevel.QUORUM) != 0;
178185
}
179186
finally {
180-
CassandraClientFactory.closeCassandraClient(client);
187+
cassandraClientFactory.closeCassandraClient(client);
181188
}
182189
} catch (Exception e) {
183190
return false;
@@ -224,7 +231,7 @@ private boolean deleteBlobs(String blobPath, String... blobNames)
224231

225232
Cassandra.Client client = null;
226233
try {
227-
client = CassandraClientFactory.getCassandraClient();
234+
client = cassandraClientFactory.getCassandraClient();
228235
client.batch_mutate(
229236
keySpace, mutationMap, ConsistencyLevel.QUORUM);
230237
return true;
@@ -236,7 +243,7 @@ private boolean deleteBlobs(String blobPath, String... blobNames)
236243
}
237244
finally {
238245
if (client != null) {
239-
CassandraClientFactory.closeCassandraClient(client);
246+
cassandraClientFactory.closeCassandraClient(client);
240247
}
241248
}
242249
}
@@ -258,15 +265,15 @@ void readBlob(String blobPath, String blobName, final BlobContainer.ReadBlobList
258265
@Override public void run() {
259266
Cassandra.Client client = null;
260267
try {
261-
client = CassandraClientFactory.getCassandraClient();
268+
client = cassandraClientFactory.getCassandraClient();
262269
readBlob(client, blobKey, listener);
263270
}
264271
catch (Exception ex) {
265272
listener.onFailure(ex);
266273
}
267274
finally {
268275
if (client != null) {
269-
CassandraClientFactory.closeCassandraClient(client);
276+
cassandraClientFactory.closeCassandraClient(client);
270277
}
271278
}
272279
}
@@ -291,7 +298,7 @@ private void readBlob(Cassandra.Client client, String blobKey, BlobContainer.Rea
291298
ImmutableMap<String, BlobMetaData> listBlobsByPrefix(String blobPath, @Nullable String blobNamePrefix) throws IOException {
292299
logger.debug("listBlobsByPrefix {}", blobKey(blobPath, blobNamePrefix));
293300
List<ColumnOrSuperColumn> columns;
294-
Cassandra.Client client = CassandraClientFactory.getCassandraClient();
301+
Cassandra.Client client = cassandraClientFactory.getCassandraClient();
295302
try {
296303
columns = client.get_slice(
297304
keySpace,
@@ -317,7 +324,7 @@ ImmutableMap<String, BlobMetaData> listBlobsByPrefix(String blobPath, @Nullable
317324
throw new IOException("Cassandra get_slice on ???:??? failed", ex);
318325
}
319326
finally {
320-
CassandraClientFactory.closeCassandraClient(client);
327+
cassandraClientFactory.closeCassandraClient(client);
321328
}
322329

323330
ImmutableMap.Builder<String, BlobMetaData> blobsBuilder = ImmutableMap.builder();
@@ -341,13 +348,13 @@ void writeBlob(final String blobPath, final String blobName, final InputStream i
341348
@Override public void run() {
342349
try {
343350
Cassandra.Client client =
344-
CassandraClientFactory.getCassandraClient();
351+
cassandraClientFactory.getCassandraClient();
345352
try {
346353
writeBlob(client, blobPath, blobName, is, sizeInBytes);
347354
listener.onCompleted();
348355
}
349356
finally {
350-
CassandraClientFactory.closeCassandraClient(client);
357+
cassandraClientFactory.closeCassandraClient(client);
351358
}
352359
} catch (Exception e) {
353360
listener.onFailure(e);

plugins/cassandra/src/main/java/org/elasticsearch/cassandra/blobstore/CassandraClientFactory.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,24 +31,33 @@
3131
import org.apache.cassandra.thrift.Cassandra;
3232

3333
class CassandraClientFactory {
34-
public static Cassandra.Client getCassandraClient()
34+
private final String host;
35+
private final int port;
36+
37+
public CassandraClientFactory(String host, int port) {
38+
this.host = host;
39+
this.port = port;
40+
}
41+
42+
public Cassandra.Client getCassandraClient()
3543
throws IOException
3644
{
3745
TTransport transport =
38-
new TFramedTransport(new TSocket("localhost", 9160));
46+
new TFramedTransport(new TSocket(host, port));
3947
TProtocol protocol = new TBinaryProtocol(transport);
4048
Cassandra.Client client = new Cassandra.Client(protocol);
4149
try {
4250
transport.open();
4351
}
4452
catch (TTransportException ex) {
4553
throw new IOException(
46-
"Cassandra transport.open to localhost:9160 failed", ex);
54+
"Cassandra transport.open to " + host + ":" + port + " failed",
55+
ex);
4756
}
4857
return client;
4958
}
5059

51-
public static void closeCassandraClient(Cassandra.Client client) {
60+
public void closeCassandraClient(Cassandra.Client client) {
5261
client.getInputProtocol().getTransport().close();
5362
}
5463
}

plugins/cassandra/src/main/java/org/elasticsearch/gateway/cassandra/CassandraGateway.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public class CassandraGateway extends BlobStoreGateway {
4949

5050
logger.debug("using chunk_size [{}]", chunkSize);
5151

52-
initialize(new CassandraBlobStore(settings, threadPool.cached()), clusterName, chunkSize);
52+
initialize(new CassandraBlobStore(componentSettings, threadPool.cached()), clusterName, chunkSize);
5353
}
5454

5555
@Override public void close() throws ElasticSearchException {

0 commit comments

Comments
 (0)