Skip to content

Commit 579a67b

Browse files
committed
improve buffered output
- use the same buffer size if wrapping a buffered output - directly call flush on the checksum wrapper
1 parent 574d7cc commit 579a67b

File tree

3 files changed

+34
-22
lines changed

3 files changed

+34
-22
lines changed

src/main/java/org/elasticsearch/common/lucene/store/BufferedChecksumIndexOutput.java renamed to src/main/java/org/apache/lucene/store/BufferedChecksumIndexOutput.java

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,7 @@
1717
* under the License.
1818
*/
1919

20-
package org.elasticsearch.common.lucene.store;
21-
22-
import org.apache.lucene.store.BufferedIndexOutput;
23-
import org.apache.lucene.store.IndexOutput;
20+
package org.apache.lucene.store;
2421

2522
import java.io.IOException;
2623
import java.util.zip.Checksum;
@@ -29,15 +26,19 @@
2926
*/
3027
public class BufferedChecksumIndexOutput extends BufferedIndexOutput {
3128

32-
private final IndexOutput out;
33-
29+
private final IndexOutput delegate;
30+
private final BufferedIndexOutput bufferedDelegate;
3431
private final Checksum digest;
3532

36-
public BufferedChecksumIndexOutput(IndexOutput out, Checksum digest) {
37-
// we add 8 to be bigger than the default BufferIndexOutput buffer size so any flush will go directly
38-
// to the output without being copied over to the delegate buffer
39-
super(BufferedIndexOutput.DEFAULT_BUFFER_SIZE + 64);
40-
this.out = out;
33+
public BufferedChecksumIndexOutput(IndexOutput delegate, Checksum digest) {
34+
super(delegate instanceof BufferedIndexOutput ? ((BufferedIndexOutput) delegate).getBufferSize() : BufferedIndexOutput.DEFAULT_BUFFER_SIZE);
35+
if (delegate instanceof BufferedIndexOutput) {
36+
bufferedDelegate = (BufferedIndexOutput) delegate;
37+
this.delegate = delegate;
38+
} else {
39+
this.delegate = delegate;
40+
bufferedDelegate = null;
41+
}
4142
this.digest = digest;
4243
}
4344

@@ -46,7 +47,7 @@ public Checksum digest() {
4647
}
4748

4849
public IndexOutput underlying() {
49-
return this.out;
50+
return this.delegate;
5051
}
5152

5253
// don't override it, base class method simple reads from input and writes to this output
@@ -59,14 +60,18 @@ public void close() throws IOException {
5960
try {
6061
super.close();
6162
} finally {
62-
out.close();
63+
delegate.close();
6364
}
6465

6566
}
6667

6768
@Override
6869
protected void flushBuffer(byte[] b, int offset, int len) throws IOException {
69-
out.writeBytes(b, offset, len);
70+
if (bufferedDelegate != null) {
71+
bufferedDelegate.flushBuffer(b, offset, len);
72+
} else {
73+
delegate.writeBytes(b, offset, len);
74+
}
7075
digest.update(b, offset, len);
7176
}
7277

@@ -77,8 +82,11 @@ protected void flushBuffer(byte[] b, int offset, int len) throws IOException {
7782

7883
@Override
7984
public void flush() throws IOException {
80-
super.flush();
81-
out.flush();
85+
try {
86+
super.flush();
87+
} finally {
88+
delegate.flush();
89+
}
8290
}
8391

8492
@Override
@@ -87,21 +95,21 @@ public void seek(long pos) throws IOException {
8795
// but a checksum of the bytes written to this stream, which is the same for each
8896
// type of file in lucene
8997
super.seek(pos);
90-
out.seek(pos);
98+
delegate.seek(pos);
9199
}
92100

93101
@Override
94102
public long length() throws IOException {
95-
return out.length();
103+
return delegate.length();
96104
}
97105

98106
@Override
99107
public void setLength(long length) throws IOException {
100-
out.setLength(length);
108+
delegate.setLength(length);
101109
}
102110

103111
@Override
104112
public String toString() {
105-
return out.toString();
113+
return delegate.toString();
106114
}
107115
}

src/main/java/org/apache/lucene/store/RateLimitedFSDirectory.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ static final class RateLimitedIndexOutput extends BufferedIndexOutput {
8383
private final StoreRateLimiting.Listener rateListener;
8484

8585
RateLimitedIndexOutput(final RateLimiter rateLimiter, final StoreRateLimiting.Listener rateListener, final IndexOutput delegate) {
86-
// TODO if Lucene exposed in BufferedIndexOutput#getBufferSize, we could initialize it if the delegate is buffered
86+
super(delegate instanceof BufferedIndexOutput ? ((BufferedIndexOutput) delegate).getBufferSize() : BufferedIndexOutput.DEFAULT_BUFFER_SIZE);
8787
if (delegate instanceof BufferedIndexOutput) {
8888
bufferedDelegate = (BufferedIndexOutput) delegate;
8989
this.delegate = delegate;
@@ -126,6 +126,11 @@ public void flush() throws IOException {
126126
}
127127
}
128128

129+
@Override
130+
public void setLength(long length) throws IOException {
131+
delegate.setLength(length);
132+
}
133+
129134
@Override
130135
public void close() throws IOException {
131136
try {

src/main/java/org/elasticsearch/index/store/Store.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.elasticsearch.common.compress.CompressorFactory;
3232
import org.elasticsearch.common.inject.Inject;
3333
import org.elasticsearch.common.lucene.Directories;
34-
import org.elasticsearch.common.lucene.store.BufferedChecksumIndexOutput;
3534
import org.elasticsearch.common.lucene.store.ChecksumIndexOutput;
3635
import org.elasticsearch.common.settings.Settings;
3736
import org.elasticsearch.common.unit.ByteSizeValue;

0 commit comments

Comments
 (0)