Skip to content

Commit 4c6e402

Browse files
author
Tom May
committed
Hook up listBlobsByPrefix and writeBlob to cassandra on localhost with
crappy poorly factored thrift client implementation to see what we're up against. This is truly horrible stuff. I hope it gets better.
1 parent 5ccc3c8 commit 4c6e402

File tree

3 files changed

+207
-33
lines changed

3 files changed

+207
-33
lines changed

plugins/cassandra/src/main/java/org/elasticsearch/cassandra/blobstore/AbstractCassandraBlobContainer.java

Lines changed: 64 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -27,25 +27,45 @@
2727
import org.elasticsearch.common.logging.ESLogger;
2828
import org.elasticsearch.common.logging.Loggers;
2929

30+
import org.apache.cassandra.thrift.Column;
31+
import org.apache.cassandra.thrift.ColumnParent;
32+
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
33+
import org.apache.cassandra.thrift.ConsistencyLevel;
34+
import org.apache.cassandra.thrift.Cassandra;
35+
import org.apache.cassandra.thrift.InvalidRequestException;
36+
import org.apache.cassandra.thrift.Mutation;
37+
import org.apache.cassandra.thrift.SlicePredicate;
38+
import org.apache.cassandra.thrift.SliceRange;
39+
import org.apache.cassandra.thrift.TimedOutException;
40+
import org.apache.cassandra.thrift.UnavailableException;
41+
42+
import org.apache.thrift.TException;
43+
3044
import javax.annotation.Nullable;
3145
import java.io.IOException;
3246
import java.io.InputStream;
47+
import java.nio.charset.Charset;
48+
import java.util.List;
3349

3450
/**
3551
* @author Tom May ([email protected])
3652
*/
3753
public class AbstractCassandraBlobContainer extends AbstractBlobContainer {
3854

55+
protected static final Charset utf8 = Charset.forName("UTF-8");
56+
3957
protected final ESLogger logger = Loggers.getLogger(getClass());
4058

4159
protected final CassandraBlobStore blobStore;
4260

43-
protected final String keyPath; // XXX
61+
protected final String blobPath;
62+
63+
protected static final String keySpace = "ElasticSearch";
4464

4565
public AbstractCassandraBlobContainer(BlobPath path, CassandraBlobStore blobStore) {
4666
super(path);
4767
this.blobStore = blobStore;
48-
this.keyPath = path.buildAsString("/") + "/";
68+
this.blobPath = path.buildAsString("/");
4969
logger.debug("AbstractCassandraBlobContainer path={}", path);
5070
}
5171

@@ -97,42 +117,55 @@ public AbstractCassandraBlobContainer(BlobPath path, CassandraBlobStore blobStor
97117
}
98118

99119
@Override public ImmutableMap<String, BlobMetaData> listBlobsByPrefix(@Nullable String blobNamePrefix) throws IOException {
100-
logger.debug("TODO listBlobsByPrefix blobNamePrefix={}", blobNamePrefix);
120+
logger.debug("listBlobsByPrefix blobNamePrefix={}", blobNamePrefix);
121+
122+
List<ColumnOrSuperColumn> columns;
123+
Cassandra.Client client = CassandraClientFactory.getCassandraClient();
124+
try {
125+
columns =
126+
client.get_slice(
127+
keySpace,
128+
blobPath,
129+
new ColumnParent("BlobNames"),
130+
new SlicePredicate().setSlice_range(
131+
new SliceRange()
132+
.setStart(new byte[0])
133+
.setFinish(new byte[0])
134+
.setCount(1000000000)),
135+
ConsistencyLevel.QUORUM);
136+
}
137+
catch (InvalidRequestException ex) {
138+
throw new IOException("Cassandra get_slice on ???:??? failed", ex);
139+
}
140+
catch (UnavailableException ex) {
141+
throw new IOException("Cassandra get_slice on ???:??? failed", ex);
142+
}
143+
catch (TimedOutException ex) {
144+
throw new IOException("Cassandra get_slice on ???:??? failed", ex);
145+
}
146+
catch (TException ex) {
147+
throw new IOException("Cassandra get_slice on ???:??? failed", ex);
148+
}
149+
finally {
150+
CassandraClientFactory.closeCassandraClient(client);
151+
}
152+
101153
ImmutableMap.Builder<String, BlobMetaData> blobsBuilder = ImmutableMap.builder();
102-
/* XXX
103-
ObjectListing prevListing = null;
104-
while (true) {
105-
ObjectListing list;
106-
if (prevListing != null) {
107-
list = blobStore.client().listNextBatchOfObjects(prevListing);
108-
} else {
109-
if (blobNamePrefix != null) {
110-
list = blobStore.client().listObjects(blobStore.bucket(), buildKey(blobNamePrefix));
111-
} else {
112-
list = blobStore.client().listObjects(blobStore.bucket(), keyPath);
113-
}
114-
}
115-
for (S3ObjectSummary summary : list.getObjectSummaries()) {
116-
String name = summary.getKey().substring(keyPath.length());
117-
blobsBuilder.put(name, new PlainBlobMetaData(name, summary.getSize()));
118-
}
119-
if (list.isTruncated()) {
120-
prevListing = list;
121-
} else {
122-
break;
154+
155+
for (ColumnOrSuperColumn columnOrSuperColumn : columns) {
156+
Column column = columnOrSuperColumn.getColumn();
157+
String name = new String(column.getName(), utf8);
158+
long length = Integer.parseInt(new String(column.getValue(), utf8));
159+
logger.debug("name: {}, length: {}", name, length);
160+
if (blobNamePrefix == null || name.startsWith(blobNamePrefix)) {
161+
blobsBuilder.put(name, new PlainBlobMetaData(name, length));
123162
}
124163
}
125-
*/
164+
126165
return blobsBuilder.build();
127166
}
128167

129168
@Override public ImmutableMap<String, BlobMetaData> listBlobs() throws IOException {
130169
return listBlobsByPrefix(null);
131170
}
132-
133-
/* XXX
134-
protected String buildKey(String blobName) {
135-
return keyPath + blobName;
136-
}
137-
*/
138171
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Licensed to Elastic Search and Shay Banon under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. Elastic Search licenses this
6+
* file to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.cassandra.blobstore;
21+
22+
import java.io.IOException;
23+
24+
import org.apache.thrift.TException;
25+
import org.apache.thrift.protocol.TBinaryProtocol;
26+
import org.apache.thrift.protocol.TProtocol;
27+
import org.apache.thrift.transport.TFramedTransport;
28+
import org.apache.thrift.transport.TSocket;
29+
import org.apache.thrift.transport.TTransport;
30+
import org.apache.thrift.transport.TTransportException;
31+
32+
import org.apache.cassandra.thrift.Cassandra;
33+
34+
class CassandraClientFactory {
35+
public static Cassandra.Client getCassandraClient()
36+
throws IOException
37+
{
38+
TTransport transport =
39+
new TFramedTransport(new TSocket("localhost", 9160));
40+
TProtocol protocol = new TBinaryProtocol(transport);
41+
Cassandra.Client client = new Cassandra.Client(protocol);
42+
try {
43+
transport.open();
44+
}
45+
catch (TTransportException ex) {
46+
throw new IOException(
47+
"Cassandra transport.open to localhost:9160 failed", ex);
48+
}
49+
return client;
50+
}
51+
52+
public static void closeCassandraClient(Cassandra.Client client) {
53+
client.getInputProtocol().getTransport().close();
54+
}
55+
}

plugins/cassandra/src/main/java/org/elasticsearch/cassandra/blobstore/CassandraImmutableBlobContainer.java

Lines changed: 88 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,28 @@
2323
import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
2424
import org.elasticsearch.common.blobstore.support.BlobStores;
2525

26+
import org.apache.cassandra.thrift.Column;
27+
import org.apache.cassandra.thrift.ColumnParent;
28+
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
29+
import org.apache.cassandra.thrift.ConsistencyLevel;
30+
import org.apache.cassandra.thrift.Cassandra;
31+
import org.apache.cassandra.thrift.InvalidRequestException;
32+
import org.apache.cassandra.thrift.Mutation;
33+
import org.apache.cassandra.thrift.SlicePredicate;
34+
import org.apache.cassandra.thrift.SliceRange;
35+
import org.apache.cassandra.thrift.TimedOutException;
36+
import org.apache.cassandra.thrift.UnavailableException;
37+
38+
import org.apache.thrift.TException;
39+
40+
import java.io.DataInputStream;
2641
import java.io.IOException;
2742
import java.io.InputStream;
43+
import java.nio.ByteBuffer;
44+
import java.util.ArrayList;
45+
import java.util.HashMap;
46+
import java.util.List;
47+
import java.util.Map;
2848

2949
/**
3050
* @author Tom May ([email protected])
@@ -35,12 +55,24 @@ public CassandraImmutableBlobContainer(BlobPath path, CassandraBlobStore blobSto
3555
super(path, blobStore);
3656
}
3757

58+
// InputStream is a completely shitty abstraction for something to
59+
// write via thrift. And passing a sizeInBytes along with an
60+
// InputStream is a sign that it's a shitty abstraction in
61+
// general. At least we can use the sizeInBytes to allocate a
62+
// ByteBuffer and copy to it then hand it to thrift.
3863
@Override public void writeBlob(final String blobName, final InputStream is, final long sizeInBytes, final WriterListener listener) {
3964
blobStore.executor().execute(new Runnable() {
4065
@Override public void run() {
4166
try {
42-
// XXX TODO
43-
logger.debug("TODO writeBlob blobName={}, sizeInBytes={}, is={}", blobName, sizeInBytes, is);
67+
logger.debug("writeBlob blobName={}, sizeInBytes={}, is={}", blobName, sizeInBytes, is);
68+
Cassandra.Client client =
69+
CassandraClientFactory.getCassandraClient();
70+
try {
71+
writeBlob(client, blobName, is, sizeInBytes);
72+
}
73+
finally {
74+
CassandraClientFactory.closeCassandraClient(client);
75+
}
4476
listener.onCompleted();
4577
} catch (Exception e) {
4678
listener.onFailure(e);
@@ -52,4 +84,58 @@ public CassandraImmutableBlobContainer(BlobPath path, CassandraBlobStore blobSto
5284
@Override public void writeBlob(String blobName, InputStream is, long sizeInBytes) throws IOException {
5385
BlobStores.syncWriteBlob(this, blobName, is, sizeInBytes);
5486
}
87+
88+
private void writeBlob(Cassandra.Client client, String blobName, InputStream is, long sizeInBytes)
89+
throws InvalidRequestException, TimedOutException, UnavailableException, TException, IOException
90+
{
91+
long timestamp = System.currentTimeMillis();
92+
93+
Map<String, Map<String, List<Mutation>>> mutationMap =
94+
new HashMap<String, Map<String, List<Mutation>>>();
95+
96+
// Insert the blob data into Blobs.
97+
98+
int intSizeInBytes = (int) sizeInBytes;
99+
if (intSizeInBytes != sizeInBytes) {
100+
throw new IllegalArgumentException(
101+
"Blob " + blobName + " size " + sizeInBytes +
102+
" is too large.");
103+
}
104+
ByteBuffer blobData = ByteBuffer.allocate(intSizeInBytes);
105+
new DataInputStream(is).readFully(blobData.array());
106+
107+
List<Mutation> blobsMutations = new ArrayList<Mutation>();
108+
blobsMutations.add(createInsert("data", blobData, timestamp));
109+
110+
Map<String, List<Mutation>> blobsMutationMap =
111+
new HashMap<String, List<Mutation>>();
112+
blobsMutationMap.put("Blobs", blobsMutations);
113+
114+
mutationMap.put(blobPath + '/' + blobName, blobsMutationMap);
115+
116+
// Insert the blobName into BlobNames.
117+
118+
ByteBuffer size = utf8.encode(Long.toString(sizeInBytes));
119+
120+
List<Mutation> blobNamesMutations = new ArrayList<Mutation>();
121+
blobNamesMutations.add(createInsert(blobName, size, timestamp));
122+
123+
Map<String, List<Mutation>> blobNamesMutationMap =
124+
new HashMap<String, List<Mutation>>();
125+
blobNamesMutationMap.put("BlobNames", blobNamesMutations);
126+
127+
mutationMap.put(blobPath, blobNamesMutationMap);
128+
129+
client.batch_mutate(
130+
keySpace, mutationMap, ConsistencyLevel.QUORUM);
131+
}
132+
133+
private Mutation createInsert(String name, ByteBuffer value, long timestamp) {
134+
return new Mutation().setColumn_or_supercolumn(
135+
new ColumnOrSuperColumn().setColumn(
136+
new Column(
137+
utf8.encode(name),
138+
value,
139+
timestamp)));
140+
}
55141
}

0 commit comments

Comments
 (0)