Skip to content

Commit 9a5bffe

Browse files
author
Tom May
committed
More cassandra code jockeying.
1 parent f932625 commit 9a5bffe

File tree

1 file changed

+29
-28
lines changed

1 file changed

+29
-28
lines changed

plugins/cassandra/src/main/java/org/elasticsearch/cassandra/blobstore/CassandraBlobContainer.java

Lines changed: 29 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -126,21 +126,22 @@ public CassandraBlobContainer(BlobPath path, String keyspace, CassandraClientFac
126126
@Override public boolean blobExists(String blobName) {
127127
String blobKey = blobKey(blobName);
128128
logger.debug("TODO blobExists {}", blobKey);
129+
Cassandra.Client client = null;
129130
try {
130-
Cassandra.Client client =
131-
cassandraClientFactory.getCassandraClient();
132-
try {
133-
return client.get_count(
134-
keyspace,
135-
blobKey,
136-
new ColumnParent("Blobs"),
137-
ConsistencyLevel.QUORUM) != 0;
138-
}
139-
finally {
131+
client = cassandraClientFactory.getCassandraClient();
132+
return client.get_count(
133+
keyspace,
134+
blobKey,
135+
new ColumnParent("Blobs"),
136+
ConsistencyLevel.QUORUM) != 0;
137+
}
138+
catch (Exception e) {
139+
return false;
140+
}
141+
finally {
142+
if (client != null) {
140143
cassandraClientFactory.closeCassandraClient(client);
141144
}
142-
} catch (Exception e) {
143-
return false;
144145
}
145146
}
146147

@@ -209,7 +210,10 @@ private boolean deleteBlobs(String... blobNames)
209210
Cassandra.Client client = null;
210211
try {
211212
client = cassandraClientFactory.getCassandraClient();
212-
readBlob(client, blobKey, listener);
213+
byte[] blobData = readBlob(client, blobKey);
214+
logger.debug("Read {} bytes: {}", blobKey, blobData.length);
215+
listener.onPartial(blobData, 0, blobData.length);
216+
listener.onCompleted();
213217
}
214218
catch (Exception ex) {
215219
listener.onFailure(ex);
@@ -223,19 +227,15 @@ private boolean deleteBlobs(String... blobNames)
223227
});
224228
}
225229

226-
private void readBlob(Cassandra.Client client, String blobKey, ReadBlobListener listener)
230+
private byte[] readBlob(Cassandra.Client client, String blobKey)
227231
throws Exception
228232
{
229233
ColumnOrSuperColumn columnOrSuperColumn = client.get(
230234
keyspace,
231235
blobKey,
232236
new ColumnPath("Blobs").setColumn(utf8.encode("data")),
233237
ConsistencyLevel.QUORUM);
234-
Column column = columnOrSuperColumn.getColumn();
235-
byte[] blobData = column.getValue();
236-
logger.debug("Read {} bytes: {}", blobKey, blobData.length);
237-
listener.onPartial(blobData, 0, blobData.length);
238-
listener.onCompleted();
238+
return columnOrSuperColumn.getColumn().getValue();
239239
}
240240

241241
@Override public ImmutableMap<String, BlobMetaData> listBlobsByPrefix(@Nullable String blobNamePrefix) throws IOException {
@@ -317,18 +317,19 @@ private Mutation createDelete(String name, long timestamp) {
317317
logger.debug("writeBlob {} sizeInBytes: {}", blobKey(blobName), sizeInBytes);
318318
executor.execute(new Runnable() {
319319
@Override public void run() {
320+
Cassandra.Client client = null;
320321
try {
321-
Cassandra.Client client =
322-
cassandraClientFactory.getCassandraClient();
323-
try {
324-
writeBlob(client, blobName, is, sizeInBytes);
325-
listener.onCompleted();
326-
}
327-
finally {
322+
client = cassandraClientFactory.getCassandraClient();
323+
writeBlob(client, blobName, is, sizeInBytes);
324+
listener.onCompleted();
325+
}
326+
catch (Exception e) {
327+
listener.onFailure(e);
328+
}
329+
finally {
330+
if (client != null) {
328331
cassandraClientFactory.closeCassandraClient(client);
329332
}
330-
} catch (Exception e) {
331-
listener.onFailure(e);
332333
}
333334
}
334335
});

0 commit comments

Comments
 (0)