|
28 | 28 |
|
29 | 29 | package com.elasticinbox.core.blob.store; |
30 | 30 |
|
| 31 | +import static com.elasticinbox.core.blob.store.BlobStoreConstants.*; |
| 32 | + |
31 | 33 | import java.io.IOException; |
32 | 34 | import java.io.InputStream; |
33 | 35 | import java.net.URI; |
34 | 36 | import java.security.GeneralSecurityException; |
35 | 37 | import java.util.UUID; |
36 | 38 |
|
| 39 | +import org.slf4j.Logger; |
| 40 | +import org.slf4j.LoggerFactory; |
| 41 | + |
37 | 42 | import com.elasticinbox.common.utils.Assert; |
38 | 43 | import com.elasticinbox.config.Configurator; |
39 | 44 | import com.elasticinbox.config.DatabaseConstants; |
40 | 45 | import com.elasticinbox.core.blob.BlobDataSource; |
41 | 46 | import com.elasticinbox.core.blob.BlobURI; |
42 | 47 | import com.elasticinbox.core.blob.compression.CompressionHandler; |
| 48 | +import com.elasticinbox.core.blob.compression.DeflateCompressionHandler; |
43 | 49 | import com.elasticinbox.core.blob.encryption.EncryptionHandler; |
44 | 50 | import com.elasticinbox.core.model.Mailbox; |
| 51 | +import com.google.common.io.ByteStreams; |
| 52 | +import com.google.common.io.FileBackedOutputStream; |
45 | 53 |
|
46 | 54 | /** |
47 | | - * Blob storage mediator is an abstraction layer which contains logic which |
| 55 | + * Blob storage mediator is an abstraction layer containing logic which |
48 | 56 | * determines where to store or how to access given blob. |
49 | 57 | * |
50 | 58 | * @author Rustam Aliyev |
51 | 59 | */ |
52 | 60 | public final class BlobStorageMediator implements BlobStorage |
53 | 61 | { |
| 62 | + private static final Logger logger = |
| 63 | + LoggerFactory.getLogger(BlobStorageMediator.class); |
| 64 | + |
| 65 | + protected final CompressionHandler compressionHandler; |
| 66 | + |
54 | 67 | private BlobStorage cloudBlobStorage; |
55 | 68 | private BlobStorage dbBlobStorage; |
56 | 69 |
|
57 | | - public BlobStorageMediator(CompressionHandler ch, EncryptionHandler eh) { |
58 | | - cloudBlobStorage = new CloudBlobStorage(ch, eh); |
59 | | - dbBlobStorage = new CassandraBlobStorage(ch, eh); |
| 70 | + /** |
| 71 | + * Initialise mediator with compression and encryption handlers for writes. |
| 72 | + * To disable compression/encryption set value to null. |
| 73 | + * |
| 74 | + * @param ch |
| 75 | + * Injected compression handler |
| 76 | + * @param eh |
| 77 | + * Injected encryption handler |
| 78 | + */ |
| 79 | + public BlobStorageMediator(final CompressionHandler ch, final EncryptionHandler eh) |
| 80 | + { |
| 81 | + this.compressionHandler = ch; |
| 82 | + cloudBlobStorage = new CloudBlobStorage(eh); |
| 83 | + dbBlobStorage = new CassandraBlobStorage(); |
60 | 84 | } |
61 | | - |
62 | | - public URI write(UUID messageId, Mailbox mailbox, String profileName, |
63 | | - InputStream in, Long size) throws IOException, |
| 85 | + |
| 86 | + public BlobURI write(final UUID messageId, final Mailbox mailbox, final String profileName, |
| 87 | + final InputStream in, final Long size) throws IOException, |
64 | 88 | GeneralSecurityException |
65 | 89 | { |
66 | 90 | Assert.notNull(in, "No data to store"); |
67 | 91 |
|
68 | | - if (size <= Configurator.getDatabaseBlobMaxSize()) { |
69 | | - return dbBlobStorage.write(messageId, mailbox, null, in, size); |
| 92 | + BlobURI blobUri; |
| 93 | + InputStream in1; |
| 94 | + Long updatedSize = size; |
| 95 | + boolean compressed = false; |
| 96 | + |
| 97 | + // compress stream and calculate compressed size |
| 98 | + if ((compressionHandler != null) && (size > MIN_COMPRESS_SIZE)) |
| 99 | + { |
| 100 | + InputStream compressedInputStream = compressionHandler.compress(in); |
| 101 | + FileBackedOutputStream fbout = new FileBackedOutputStream(MAX_MEMORY_FILE_SIZE, true); |
| 102 | + updatedSize = ByteStreams.copy(compressedInputStream, fbout); |
| 103 | + in1 = fbout.getSupplier().getInput(); |
| 104 | + compressed = true; |
| 105 | + } else { |
| 106 | + in1 = in; |
| 107 | + } |
| 108 | + |
| 109 | + if (updatedSize <= Configurator.getDatabaseBlobMaxSize()) |
| 110 | + { |
| 111 | + logger.debug( |
| 112 | + "Storing Blob in the database because size ({}KB) was less than database threshold {}KB", |
| 113 | + updatedSize, Configurator.getDatabaseBlobMaxSize()); |
| 114 | + blobUri = dbBlobStorage.write(messageId, mailbox, null, in1, updatedSize); |
70 | 115 | } else { |
71 | | - return cloudBlobStorage.write(messageId, mailbox, Configurator.getBlobStoreWriteProfileName(), in, size); |
| 116 | + logger.debug( |
| 117 | + "Storing Blob in the cloud because size ({}KB) was greater than database threshold {}KB", |
| 118 | + updatedSize, Configurator.getDatabaseBlobMaxSize()); |
| 119 | + blobUri = cloudBlobStorage.write(messageId, mailbox, Configurator.getBlobStoreWriteProfileName(), in1, updatedSize); |
72 | 120 | } |
| 121 | + |
| 122 | + // add compression information to the blob URI |
| 123 | + if (compressed) { |
| 124 | + blobUri.setCompression(compressionHandler.getType()); |
| 125 | + } |
| 126 | + |
| 127 | + return blobUri; |
73 | 128 | } |
74 | 129 |
|
75 | | - public BlobDataSource read(URI uri) throws IOException |
| 130 | + public BlobDataSource read(final URI uri) throws IOException |
76 | 131 | { |
77 | 132 | // check if blob was stored for the message |
78 | 133 | Assert.notNull(uri, "URI cannot be null"); |
79 | 134 |
|
80 | | - boolean isDbProfile = new BlobURI().fromURI(uri).getProfile() |
81 | | - .equals(DatabaseConstants.DATABASE_PROFILE); |
| 135 | + BlobDataSource blobDS; |
| 136 | + BlobURI blobUri = new BlobURI().fromURI(uri); |
82 | 137 |
|
83 | | - if (isDbProfile) { |
84 | | - return dbBlobStorage.read(uri); |
| 138 | + if (blobUri.getProfile().equals(DatabaseConstants.DATABASE_PROFILE)) { |
| 139 | + blobDS = dbBlobStorage.read(uri); |
| 140 | + } else { |
| 141 | + blobDS = cloudBlobStorage.read(uri); |
| 142 | + } |
| 143 | + |
| 144 | + // if compressed, add compression handler to data source |
| 145 | + if ((blobUri.getCompression() != null && blobUri.getCompression() |
| 146 | + .equals(DeflateCompressionHandler.COMPRESSION_TYPE_DEFLATE)) || |
| 147 | + // TODO: deprecated suffix based compression detection |
| 148 | + // kept for backward compatibility with 0.3 |
| 149 | + blobUri.getName().endsWith(BlobStoreConstants.COMPRESS_SUFFIX)) |
| 150 | + { |
| 151 | + CompressionHandler ch = new DeflateCompressionHandler(); |
| 152 | + return new BlobDataSource(uri, blobDS.getInputStream(), ch); |
85 | 153 | } else { |
86 | | - return cloudBlobStorage.read(uri); |
| 154 | + return blobDS; |
87 | 155 | } |
88 | 156 | } |
89 | 157 |
|
90 | | - public void delete(URI uri) throws IOException |
| 158 | + public void delete(final URI uri) throws IOException |
91 | 159 | { |
92 | 160 | // check if blob was stored for the message, silently skip otherwise |
93 | 161 | if (uri == null) { |
|
0 commit comments