Skip to content

Commit b2a907a

Browse files
author
Tom May
committed
Crappy Cassandra readBlob implementation.
1 parent 4c6e402 commit b2a907a

File tree

1 file changed

+27
-22
lines changed

1 file changed

+27
-22
lines changed

plugins/cassandra/src/main/java/org/elasticsearch/cassandra/blobstore/AbstractCassandraBlobContainer.java

Lines changed: 27 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
import org.apache.cassandra.thrift.Column;
3131
import org.apache.cassandra.thrift.ColumnParent;
32+
import org.apache.cassandra.thrift.ColumnPath;
3233
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
3334
import org.apache.cassandra.thrift.ConsistencyLevel;
3435
import org.apache.cassandra.thrift.Cassandra;
@@ -84,36 +85,40 @@ public AbstractCassandraBlobContainer(BlobPath path, CassandraBlobStore blobStor
8485
}
8586

8687
@Override public void readBlob(final String blobName, final ReadBlobListener listener) {
87-
logger.debug("TODO readBlob blobName={}", blobName);
88-
/* XXX
88+
logger.debug("readBlob blobName={}", blobName);
8989
blobStore.executor().execute(new Runnable() {
9090
@Override public void run() {
91-
InputStream is;
91+
Cassandra.Client client = null;
9292
try {
93-
S3Object object = blobStore.client().getObject(blobStore.bucket(), buildKey(blobName));
94-
is = object.getObjectContent();
95-
} catch (Exception e) {
96-
listener.onFailure(e);
97-
return;
93+
client = CassandraClientFactory.getCassandraClient();
94+
readBlob(client, blobName, listener);
9895
}
99-
byte[] buffer = new byte[blobStore.bufferSizeInBytes()];
100-
try {
101-
int bytesRead;
102-
while ((bytesRead = is.read(buffer)) != -1) {
103-
listener.onPartial(buffer, 0, bytesRead);
104-
}
105-
listener.onCompleted();
106-
} catch (Exception e) {
107-
try {
108-
is.close();
109-
} catch (IOException e1) {
110-
// ignore
96+
catch (Exception ex) {
97+
listener.onFailure(ex);
98+
}
99+
finally {
100+
if (client != null) {
101+
CassandraClientFactory.closeCassandraClient(client);
111102
}
112-
listener.onFailure(e);
113103
}
114104
}
115105
});
116-
*/
106+
}
107+
108+
private void readBlob(Cassandra.Client client, String blobName, ReadBlobListener listener)
109+
throws Exception
110+
{
111+
ColumnOrSuperColumn columnOrSuperColumn = client.get(
112+
keySpace,
113+
blobPath + '/' + blobName,
114+
new ColumnPath("Blobs").setColumn(utf8.encode("data")),
115+
ConsistencyLevel.QUORUM);
116+
Column column = columnOrSuperColumn.getColumn();
117+
byte[] blobData = column.getValue();
118+
logger.debug("Read {} ({} bytes): {}",
119+
blobName, blobData.length, new String(blobData));
120+
listener.onPartial(blobData, 0, blobData.length);
121+
listener.onCompleted();
117122
}
118123

119124
@Override public ImmutableMap<String, BlobMetaData> listBlobsByPrefix(@Nullable String blobNamePrefix) throws IOException {

0 commit comments

Comments
 (0)