22
22
import org .elasticsearch .common .blobstore .BlobMetaData ;
23
23
import org .elasticsearch .common .blobstore .BlobPath ;
24
24
import org .elasticsearch .common .blobstore .support .AbstractBlobContainer ;
25
- import org .elasticsearch .common .blobstore .support .PlainBlobMetaData ;
26
25
import org .elasticsearch .common .collect .ImmutableMap ;
27
26
import org .elasticsearch .common .logging .ESLogger ;
28
27
import org .elasticsearch .common .logging .Loggers ;
29
28
30
- import org .apache .cassandra .thrift .Cassandra ;
31
- import org .apache .cassandra .thrift .Column ;
32
- import org .apache .cassandra .thrift .ColumnParent ;
33
- import org .apache .cassandra .thrift .ColumnPath ;
34
- import org .apache .cassandra .thrift .ColumnOrSuperColumn ;
35
- import org .apache .cassandra .thrift .ConsistencyLevel ;
36
- import org .apache .cassandra .thrift .Deletion ;
37
- import org .apache .cassandra .thrift .InvalidRequestException ;
38
- import org .apache .cassandra .thrift .Mutation ;
39
- import org .apache .cassandra .thrift .SlicePredicate ;
40
- import org .apache .cassandra .thrift .SliceRange ;
41
- import org .apache .cassandra .thrift .TimedOutException ;
42
- import org .apache .cassandra .thrift .UnavailableException ;
43
-
44
- import org .apache .thrift .TException ;
45
-
46
29
import javax .annotation .Nullable ;
47
30
import java .io .IOException ;
48
- import java .nio .charset .Charset ;
49
- import java .nio .ByteBuffer ;
50
- import java .util .ArrayList ;
51
- import java .util .HashMap ;
52
- import java .util .List ;
53
- import java .util .Map ;
54
31
55
32
/**
56
33
* @author Tom May ([email protected] )
57
34
*/
58
35
public class AbstractCassandraBlobContainer extends AbstractBlobContainer {
59
-
60
- protected static final Charset utf8 = Charset .forName ("UTF-8" );
61
-
62
- protected static final String keySpace = "ElasticSearch" ;
63
-
64
36
protected final ESLogger logger = Loggers .getLogger (getClass ());
65
37
66
38
protected final String blobPath ;
@@ -75,172 +47,22 @@ public AbstractCassandraBlobContainer(BlobPath path, CassandraBlobStore blobStor
75
47
}
76
48
77
49
@ Override public boolean blobExists (String blobName ) {
78
- logger .debug ("TODO blobExists {}" , blobKey (blobName ));
79
- try {
80
- Cassandra .Client client =
81
- CassandraClientFactory .getCassandraClient ();
82
- try {
83
- return client .get_count (
84
- keySpace ,
85
- blobKey (blobName ),
86
- new ColumnParent ("Blobs" ),
87
- ConsistencyLevel .QUORUM ) != 0 ;
88
- }
89
- finally {
90
- CassandraClientFactory .closeCassandraClient (client );
91
- }
92
- } catch (Exception e ) {
93
- return false ;
94
- }
50
+ return blobStore .blobExists (blobPath , blobName );
95
51
}
96
52
97
53
@ Override public boolean deleteBlob (String blobName ) throws IOException {
98
- logger .debug ("deleteBlob {}" , blobKey (blobName ));
99
- Cassandra .Client client =
100
- CassandraClientFactory .getCassandraClient ();
101
- try {
102
- long timestamp = System .currentTimeMillis ();
103
-
104
- Map <String , Map <String , List <Mutation >>> mutationMap =
105
- new HashMap <String , Map <String , List <Mutation >>>();
106
-
107
- // Delete the blob data from Blobs.
108
-
109
- List <Mutation > blobsMutations = new ArrayList <Mutation >();
110
- blobsMutations .add (createDelete (null , timestamp ));
111
-
112
- Map <String , List <Mutation >> blobsMutationMap =
113
- new HashMap <String , List <Mutation >>();
114
- blobsMutationMap .put ("Blobs" , blobsMutations );
115
-
116
- mutationMap .put (blobKey (blobName ), blobsMutationMap );
117
-
118
- // Delete the blobName from BlobNames.
119
-
120
- List <Mutation > blobNamesMutations = new ArrayList <Mutation >();
121
- blobNamesMutations .add (createDelete (blobName , timestamp ));
122
-
123
- Map <String , List <Mutation >> blobNamesMutationMap =
124
- new HashMap <String , List <Mutation >>();
125
- blobNamesMutationMap .put ("BlobNames" , blobNamesMutations );
126
-
127
- mutationMap .put (blobPath , blobNamesMutationMap );
128
-
129
- client .batch_mutate (
130
- keySpace , mutationMap , ConsistencyLevel .QUORUM );
131
-
132
- return true ;
133
- }
134
- catch (Exception e ) {
135
- // TODO S3 does this, what's the deal with returning false
136
- // vs. throwing IOException?
137
- return false ;
138
- }
139
- finally {
140
- CassandraClientFactory .closeCassandraClient (client );
141
- }
54
+ return blobStore .deleteBlob (blobPath , blobName );
142
55
}
143
56
144
- @ Override public void readBlob (final String blobName , final ReadBlobListener listener ) {
145
- logger .debug ("readBlob {}" , blobKey (blobName ));
146
- blobStore .executor ().execute (new Runnable () {
147
- @ Override public void run () {
148
- Cassandra .Client client = null ;
149
- try {
150
- client = CassandraClientFactory .getCassandraClient ();
151
- readBlob (client , blobName , listener );
152
- }
153
- catch (Exception ex ) {
154
- listener .onFailure (ex );
155
- }
156
- finally {
157
- if (client != null ) {
158
- CassandraClientFactory .closeCassandraClient (client );
159
- }
160
- }
161
- }
162
- });
163
- }
164
-
165
- private void readBlob (Cassandra .Client client , String blobName , ReadBlobListener listener )
166
- throws Exception
167
- {
168
- ColumnOrSuperColumn columnOrSuperColumn = client .get (
169
- keySpace ,
170
- blobKey (blobName ),
171
- new ColumnPath ("Blobs" ).setColumn (utf8 .encode ("data" )),
172
- ConsistencyLevel .QUORUM );
173
- Column column = columnOrSuperColumn .getColumn ();
174
- byte [] blobData = column .getValue ();
175
- logger .debug ("Read {} bytes: {}" , blobKey (blobName ), blobData .length );
176
- listener .onPartial (blobData , 0 , blobData .length );
177
- listener .onCompleted ();
57
+ @ Override public void readBlob (String blobName , ReadBlobListener listener ) {
58
+ blobStore .readBlob (blobPath , blobName , listener );
178
59
}
179
60
180
61
@ Override public ImmutableMap <String , BlobMetaData > listBlobsByPrefix (@ Nullable String blobNamePrefix ) throws IOException {
181
- logger .debug ("listBlobsByPrefix {}" , blobKey (blobNamePrefix ));
182
-
183
- List <ColumnOrSuperColumn > columns ;
184
- Cassandra .Client client = CassandraClientFactory .getCassandraClient ();
185
- try {
186
- columns = client .get_slice (
187
- keySpace ,
188
- blobPath ,
189
- new ColumnParent ("BlobNames" ),
190
- new SlicePredicate ().setSlice_range (
191
- new SliceRange ()
192
- .setStart (new byte [0 ])
193
- .setFinish (new byte [0 ])
194
- .setCount (1000000000 )),
195
- ConsistencyLevel .QUORUM );
196
- }
197
- catch (InvalidRequestException ex ) {
198
- throw new IOException ("Cassandra get_slice on ???:??? failed" , ex );
199
- }
200
- catch (UnavailableException ex ) {
201
- throw new IOException ("Cassandra get_slice on ???:??? failed" , ex );
202
- }
203
- catch (TimedOutException ex ) {
204
- throw new IOException ("Cassandra get_slice on ???:??? failed" , ex );
205
- }
206
- catch (TException ex ) {
207
- throw new IOException ("Cassandra get_slice on ???:??? failed" , ex );
208
- }
209
- finally {
210
- CassandraClientFactory .closeCassandraClient (client );
211
- }
212
-
213
- ImmutableMap .Builder <String , BlobMetaData > blobsBuilder = ImmutableMap .builder ();
214
-
215
- for (ColumnOrSuperColumn columnOrSuperColumn : columns ) {
216
- Column column = columnOrSuperColumn .getColumn ();
217
- String name = new String (column .getName (), utf8 );
218
- long length = Integer .parseInt (new String (column .getValue (), utf8 ));
219
- logger .debug ("name: {} length: {}" , name , length );
220
- if (blobNamePrefix == null || name .startsWith (blobNamePrefix )) {
221
- blobsBuilder .put (name , new PlainBlobMetaData (name , length ));
222
- }
223
- }
224
-
225
- return blobsBuilder .build ();
62
+ return blobStore .listBlobsByPrefix (blobPath , blobNamePrefix );
226
63
}
227
64
228
65
@ Override public ImmutableMap <String , BlobMetaData > listBlobs () throws IOException {
229
66
return listBlobsByPrefix (null );
230
67
}
231
-
232
- protected String blobKey (String blobName ) {
233
- return blobPath + '/' + blobName ;
234
- }
235
-
236
- private Mutation createDelete (String name , long timestamp ) {
237
- Deletion deletion = new Deletion (timestamp );
238
- if (name != null ) {
239
- List <ByteBuffer > columnNames = new ArrayList <ByteBuffer >(1 );
240
- columnNames .add (utf8 .encode (name ));
241
- deletion .setPredicate (
242
- new SlicePredicate ().setColumn_names (columnNames ));
243
- }
244
- return new Mutation ().setDeletion (deletion );
245
- }
246
68
}
0 commit comments