Skip to content

Commit dde898d

Browse files
author
Costin Leau
committed
add compression awareness to HdfsResources
SHDP-86 as long as the configuration contains codecs, the input stream of compressed files is automatically decompressed. Furthermore, this behaviour can be disabled if needed.
1 parent 4a4cbda commit dde898d

File tree

3 files changed

+97
-10
lines changed

3 files changed

+97
-10
lines changed

src/main/java/org/springframework/data/hadoop/fs/HdfsResource.java

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@
2727
import org.apache.hadoop.fs.LocalFileSystem;
2828
import org.apache.hadoop.fs.Path;
2929
import org.apache.hadoop.fs.RawLocalFileSystem;
30+
import org.apache.hadoop.io.compress.CodecPool;
31+
import org.apache.hadoop.io.compress.CompressionCodec;
32+
import org.apache.hadoop.io.compress.CompressionCodecFactory;
33+
import org.apache.hadoop.io.compress.Decompressor;
3034
import org.springframework.core.io.ContextResource;
3135
import org.springframework.core.io.Resource;
3236
import org.springframework.util.Assert;
@@ -45,16 +49,17 @@ class HdfsResource implements ContextResource {
4549
private final FileSystem fs;
4650
private boolean exists;
4751
private final FileStatus status;
52+
private final CompressionCodecFactory codecsFactory;
4853

49-
HdfsResource(String location, FileSystem fs) {
50-
this(location, null, fs);
54+
HdfsResource(String location, FileSystem fs, CompressionCodecFactory codecsFactory) {
55+
this(location, null, fs, codecsFactory);
5156
}
5257

53-
HdfsResource(String parent, String child, FileSystem fs) {
54-
this(StringUtils.hasText(child) ? new Path(new Path(URI.create(parent)), new Path(URI.create(child))) : new Path(URI.create(parent)), fs);
58+
HdfsResource(String parent, String child, FileSystem fs, CompressionCodecFactory codecsFactory) {
59+
this(StringUtils.hasText(child) ? new Path(new Path(URI.create(parent)), new Path(URI.create(child))) : new Path(URI.create(parent)), fs, codecsFactory);
5560
}
5661

57-
HdfsResource(Path path, FileSystem fs) {
62+
HdfsResource(Path path, FileSystem fs, CompressionCodecFactory codecsFactory) {
5863
Assert.notNull(path, "a valid path is required");
5964
Assert.notNull(fs, "non null file system required");
6065

@@ -76,6 +81,7 @@ class HdfsResource implements ContextResource {
7681
} catch (Exception ex) {
7782
}
7883
this.status = status;
84+
this.codecsFactory = codecsFactory;
7985
}
8086

8187

@@ -89,7 +95,7 @@ public long contentLength() throws IOException {
8995
}
9096

9197
public Resource createRelative(String relativePath) throws IOException {
92-
return new HdfsResource(location, relativePath, fs);
98+
return new HdfsResource(location, relativePath, fs, codecsFactory);
9399
}
94100

95101
public boolean exists() {
@@ -142,7 +148,22 @@ public long lastModified() throws IOException {
142148

143149
public InputStream getInputStream() throws IOException {
144150
if (exists) {
145-
return fs.open(path);
151+
InputStream stream = fs.open(path);
152+
153+
if (codecsFactory != null) {
154+
CompressionCodec codec = codecsFactory.getCodec(path);
155+
if (codec != null) {
156+
// the pool is not used since the returned inputstream needs to be decorated to return the decompressor on close
157+
// which can mask the actual stream
158+
// it's also unclear whether the pool is actually useful or not
159+
// Decompressor decompressor = CodecPool.getDecompressor(codec);
160+
// stream = (decompressor != null ? codec.createInputStream(stream, decompressor) : codec.createInputStream(stream));
161+
162+
stream = codec.createInputStream(stream);
163+
}
164+
}
165+
166+
return stream;
146167
}
147168
throw new IOException("Cannot open stream for " + getDescription());
148169
}

src/main/java/org/springframework/data/hadoop/fs/HdfsResourceLoader.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.hadoop.fs.FileStatus;
2727
import org.apache.hadoop.fs.FileSystem;
2828
import org.apache.hadoop.fs.Path;
29+
import org.apache.hadoop.io.compress.CompressionCodecFactory;
2930
import org.springframework.beans.factory.DisposableBean;
3031
import org.springframework.core.PriorityOrdered;
3132
import org.springframework.core.io.Resource;
@@ -48,6 +49,8 @@ public class HdfsResourceLoader implements ResourcePatternResolver, PriorityOrde
4849
private final FileSystem fs;
4950
private final PathMatcher pathMatcher = new AntPathMatcher();
5051
private final boolean internalFS;
52+
private volatile boolean useCodecs = true;
53+
private volatile CompressionCodecFactory codecsFactory;
5154

5255
/**
5356
* Constructs a new <code>HdfsResourceLoader</code> instance.
@@ -70,6 +73,7 @@ public HdfsResourceLoader(Configuration config, URI uri, String user) {
7073

7174
internalFS = true;
7275
FileSystem tempFS = null;
76+
codecsFactory = new CompressionCodecFactory(config);
7377

7478
try {
7579
if (uri == null) {
@@ -103,6 +107,7 @@ public HdfsResourceLoader(FileSystem fs) {
103107
Assert.notNull(fs, "a non-null file-system required");
104108
this.fs = fs;
105109
internalFS = false;
110+
codecsFactory = new CompressionCodecFactory(fs.getConf());
106111
}
107112

108113
/**
@@ -119,7 +124,11 @@ public ClassLoader getClassLoader() {
119124
}
120125

121126
public Resource getResource(String location) {
122-
return new HdfsResource(location, fs);
127+
return new HdfsResource(location, fs, codecs());
128+
}
129+
130+
private CompressionCodecFactory codecs() {
131+
return (useCodecs ? codecsFactory : null);
123132
}
124133

125134
public Resource[] getResources(String locationPattern) throws IOException {
@@ -209,15 +218,15 @@ private void doRetrieveMatchingResources(Path rootDir, String subPattern, Set<Re
209218
}
210219

211220
else if (pathMatcher.match(subPattern, location)) {
212-
results.add(new HdfsResource(p, fs));
221+
results.add(new HdfsResource(p, fs, codecs()));
213222
}
214223
}
215224
}
216225
}
217226

218227
// Remove "if" to allow folders to be added as well
219228
else if (pathMatcher.match(subPattern, stripPrefix(rootDir.toUri().getPath()))) {
220-
results.add(new HdfsResource(rootDir, fs));
229+
results.add(new HdfsResource(rootDir, fs, codecs()));
221230
}
222231
}
223232

@@ -241,4 +250,15 @@ public void close() throws IOException {
241250
fs.close();
242251
}
243252
}
253+
254+
/**
255+
* Indicates whether to use (or not) the codecs found inside the Hadoop configuration.
256+
* This affects the content of the streams backing this resource - whether the raw content is delivered as is
257+
or decompressed on the fly (if the configuration allows it). The latter is the default.
258+
*
259+
* @param useCodecs whether to use any codecs defined in the Hadoop configuration
260+
*/
261+
public void setUseCodecs(boolean useCodecs) {
262+
this.useCodecs = useCodecs;
263+
}
244264
}

src/test/java/org/springframework/data/hadoop/fs/HdfsResouceLoaderTest.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515
*/
1616
package org.springframework.data.hadoop.fs;
1717

18+
import java.io.ByteArrayInputStream;
1819
import java.io.InputStream;
20+
import java.io.OutputStream;
1921
import java.net.URI;
2022
import java.net.URL;
2123
import java.util.Arrays;
@@ -26,6 +28,8 @@
2628
import org.apache.hadoop.fs.FileSystem;
2729
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
2830
import org.apache.hadoop.fs.Path;
31+
import org.apache.hadoop.io.compress.DecompressorStream;
32+
import org.apache.hadoop.io.compress.DefaultCodec;
2933
import org.apache.hadoop.security.UserGroupInformation;
3034
import org.junit.After;
3135
import org.junit.Before;
@@ -230,4 +234,46 @@ public void testFilesMatchWithPrefix() throws Exception {
230234
fs.delete(new Path("local/"), true);
231235
}
232236
}
237+
238+
@Test
239+
public void testDecompressedStream() throws Exception {
240+
DefaultCodec codec = new DefaultCodec();
241+
codec.setConf(fs.getConf());
242+
String name = "local/" + UUID.randomUUID() + codec.getDefaultExtension();
243+
OutputStream outputStream = codec.createOutputStream(fs.create(new Path(name)));
244+
byte[] content = name.getBytes();
245+
outputStream.write(content);
246+
outputStream.close();
247+
248+
Resource resource = loader.getResource(name);
249+
assertNotNull(resource);
250+
InputStream inputStream = resource.getInputStream();
251+
assertEquals(DecompressorStream.class, inputStream.getClass());
252+
assertTrue(TestUtils.compareStreams(new ByteArrayInputStream(content), inputStream));
253+
}
254+
255+
@Test
256+
public void testCompressedStream() throws Exception {
257+
258+
DefaultCodec codec = new DefaultCodec();
259+
codec.setConf(fs.getConf());
260+
String name = "local/" + UUID.randomUUID() + codec.getDefaultExtension();
261+
OutputStream outputStream = codec.createOutputStream(fs.create(new Path(name)));
262+
byte[] content = name.getBytes();
263+
outputStream.write(content);
264+
outputStream.close();
265+
266+
loader.setUseCodecs(false);
267+
268+
try {
269+
Resource resource = loader.getResource(name);
270+
assertNotNull(resource);
271+
InputStream inputStream = resource.getInputStream();
272+
System.out.println(inputStream.getClass());
273+
assertFalse(DecompressorStream.class.equals(inputStream.getClass()));
274+
assertFalse(TestUtils.compareStreams(new ByteArrayInputStream(content), inputStream));
275+
} finally {
276+
loader.setUseCodecs(true);
277+
}
278+
}
233279
}

0 commit comments

Comments
 (0)