Skip to content

Commit 56bfb85

Browse files
lokeshj1703 and voonhous
authored and committed
[HUDI-9206] Support reading inflight instants with HoodieLogRecordReader (apache#13010)
(cherry picked from commit 1f52b4e)
1 parent be7b731 commit 56bfb85

File tree

12 files changed

+181
-94
lines changed

12 files changed

+181
-94
lines changed

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -473,20 +473,11 @@ public Iterator<InternalRow> call(ClusteringOperation clusteringOperation) throw
473473
Configuration conf = broadcastManager.retrieveStorageConfig().get();
474474

475475
// instantiate FG reader
476-
HoodieFileGroupReader<T> fileGroupReader = new HoodieFileGroupReader<>(
477-
readerContextOpt.get(),
476+
HoodieFileGroupReader<T> fileGroupReader = new HoodieFileGroupReader<>(readerContextOpt.get(),
478477
getHoodieTable().getMetaClient().getStorage().newInstance(new StoragePath(basePath), new HadoopStorageConfiguration(conf)),
479-
basePath,
480-
instantTime,
481-
fileSlice,
482-
readerSchema,
483-
readerSchema,
484-
internalSchemaOption,
485-
getHoodieTable().getMetaClient(),
486-
getHoodieTable().getMetaClient().getTableConfig().getProps(),
487-
0,
488-
Long.MAX_VALUE,
489-
usePosition);
478+
basePath, instantTime, fileSlice, readerSchema, readerSchema, internalSchemaOption,
479+
getHoodieTable().getMetaClient(), getHoodieTable().getMetaClient().getTableConfig().getProps(),
480+
0, Long.MAX_VALUE, usePosition, false);
490481
fileGroupReader.initRecordIterators();
491482
// read records from the FG reader
492483
HoodieFileGroupReader.HoodieFileGroupReaderIterator<InternalRow> recordIterator

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieSparkFileGroupReaderBasedMergeHandle.java

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -185,20 +185,11 @@ public void write() {
185185
hoodieTable.getMetaClient().getTableConfig().getProps().forEach(props::putIfAbsent);
186186
config.getProps().forEach(props::putIfAbsent);
187187
// Initializes file group reader
188-
try (HoodieFileGroupReader<T> fileGroupReader = new HoodieFileGroupReader<>(
189-
readerContext,
188+
try (HoodieFileGroupReader<T> fileGroupReader = new HoodieFileGroupReader<>(readerContext,
190189
storage.newInstance(hoodieTable.getMetaClient().getBasePath(), new HadoopStorageConfiguration(conf)),
191-
hoodieTable.getMetaClient().getBasePath().toString(),
192-
instantTime,
193-
fileSlice,
194-
writeSchemaWithMetaFields,
195-
writeSchemaWithMetaFields,
196-
internalSchemaOption,
197-
hoodieTable.getMetaClient(),
198-
props,
199-
0,
200-
Long.MAX_VALUE,
201-
usePosition)) {
190+
hoodieTable.getMetaClient().getBasePath().toString(), instantTime, fileSlice,
191+
writeSchemaWithMetaFields, writeSchemaWithMetaFields, internalSchemaOption,
192+
hoodieTable.getMetaClient(), props, 0, Long.MAX_VALUE, usePosition, false)) {
202193
fileGroupReader.initRecordIterators();
203194
// Reads the records from the file slice
204195
try (HoodieFileGroupReaderIterator<InternalRow> recordIterator

hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -136,16 +136,14 @@ public abstract class BaseHoodieLogRecordReader<T> {
136136
// Use scanV2 method.
137137
private final boolean enableOptimizedLogBlocksScan;
138138
protected FileGroupRecordBuffer<T> recordBuffer;
139+
// Allows to consider inflight instants while merging log records
140+
protected boolean allowInflightInstants;
139141

140-
protected BaseHoodieLogRecordReader(HoodieReaderContext readerContext,
141-
HoodieStorage storage,
142-
List<String> logFilePaths,
142+
protected BaseHoodieLogRecordReader(HoodieReaderContext readerContext, HoodieStorage storage, List<String> logFilePaths,
143143
boolean reverseReader, int bufferSize, Option<InstantRange> instantRange,
144-
boolean withOperationField, boolean forceFullScan,
145-
Option<String> partitionNameOverride,
146-
Option<String> keyFieldOverride,
147-
boolean enableOptimizedLogBlocksScan,
148-
FileGroupRecordBuffer<T> recordBuffer) {
144+
boolean withOperationField, boolean forceFullScan, Option<String> partitionNameOverride,
145+
Option<String> keyFieldOverride, boolean enableOptimizedLogBlocksScan, FileGroupRecordBuffer<T> recordBuffer,
146+
boolean allowInflightInstants) {
149147
this.readerContext = readerContext;
150148
this.readerSchema = readerContext.getSchemaHandler().getRequiredSchema();
151149
this.latestInstantTime = readerContext.getLatestCommitTime();
@@ -196,6 +194,8 @@ protected BaseHoodieLogRecordReader(HoodieReaderContext readerContext,
196194

197195
this.partitionNameOverrideOpt = partitionNameOverride;
198196
this.recordBuffer = recordBuffer;
197+
// When the allowInflightInstants flag is enabled, records written by inflight instants are also read
198+
this.allowInflightInstants = allowInflightInstants;
199199
}
200200

201201
/**
@@ -256,8 +256,8 @@ private void scanInternalV1(Option<KeySpec> keySpecOpt) {
256256
// Skip processing a data or delete block with the instant time greater than the latest instant time used by this log record reader
257257
continue;
258258
}
259-
if (!completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime)
260-
|| inflightInstantsTimeline.containsInstant(instantTime)) {
259+
if (!allowInflightInstants
260+
&& (inflightInstantsTimeline.containsInstant(instantTime) || !completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime))) {
261261
// hit an uncommitted block possibly from a failed write, move to the next one and skip processing this one
262262
continue;
263263
}
@@ -589,8 +589,8 @@ && compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), GREATER_THA
589589
continue;
590590
}
591591
if (logBlock.getBlockType() != COMMAND_BLOCK) {
592-
if (!completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime)
593-
|| inflightInstantsTimeline.containsInstant(instantTime)) {
592+
if (!allowInflightInstants
593+
&& (inflightInstantsTimeline.containsInstant(instantTime) || !completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime))) {
594594
// hit an uncommitted block possibly from a failed write, move to the next one and skip processing this one
595595
continue;
596596
}

hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -63,16 +63,12 @@ public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T>
6363
private long totalTimeTakenToReadAndMergeBlocks;
6464

6565
@SuppressWarnings("unchecked")
66-
private HoodieMergedLogRecordReader(HoodieReaderContext<T> readerContext,
67-
HoodieStorage storage, List<String> logFilePaths, boolean reverseReader,
68-
int bufferSize, Option<InstantRange> instantRange,
69-
boolean withOperationField, boolean forceFullScan,
70-
Option<String> partitionName,
71-
Option<String> keyFieldOverride,
72-
boolean enableOptimizedLogBlocksScan,
73-
FileGroupRecordBuffer<T> recordBuffer) {
66+
private HoodieMergedLogRecordReader(HoodieReaderContext<T> readerContext, HoodieStorage storage, List<String> logFilePaths, boolean reverseReader,
67+
int bufferSize, Option<InstantRange> instantRange, boolean withOperationField, boolean forceFullScan,
68+
Option<String> partitionName, Option<String> keyFieldOverride, boolean enableOptimizedLogBlocksScan,
69+
FileGroupRecordBuffer<T> recordBuffer, boolean allowInflightInstants) {
7470
super(readerContext, storage, logFilePaths, reverseReader, bufferSize, instantRange, withOperationField,
75-
forceFullScan, partitionName, keyFieldOverride, enableOptimizedLogBlocksScan, recordBuffer);
71+
forceFullScan, partitionName, keyFieldOverride, enableOptimizedLogBlocksScan, recordBuffer, allowInflightInstants);
7672
this.scannedPrefixes = new HashSet<>();
7773

7874
if (forceFullScan) {
@@ -220,6 +216,7 @@ public static class Builder<T> extends BaseHoodieLogRecordReader.Builder<T> {
220216
private boolean enableOptimizedLogBlocksScan = false;
221217

222218
private FileGroupRecordBuffer<T> recordBuffer;
219+
private boolean allowInflightInstants = false;
223220

224221
@Override
225222
public Builder<T> withHoodieReaderContext(HoodieReaderContext<T> readerContext) {
@@ -292,6 +289,11 @@ public Builder<T> withRecordBuffer(FileGroupRecordBuffer<T> recordBuffer) {
292289
return this;
293290
}
294291

292+
public Builder<T> withAllowInflightInstants(boolean allowInflightInstants) {
293+
this.allowInflightInstants = allowInflightInstants;
294+
return this;
295+
}
296+
295297
@Override
296298
public HoodieMergedLogRecordReader<T> build() {
297299
ValidationUtils.checkArgument(recordBuffer != null, "Record Buffer is null in Merged Log Record Reader");
@@ -307,7 +309,8 @@ public HoodieMergedLogRecordReader<T> build() {
307309
withOperationField, forceFullScan,
308310
Option.ofNullable(partitionName),
309311
Option.ofNullable(keyFieldOverride),
310-
enableOptimizedLogBlocksScan, recordBuffer);
312+
enableOptimizedLogBlocksScan, recordBuffer,
313+
allowInflightInstants);
311314
}
312315
}
313316
}

hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -82,20 +82,17 @@ public final class HoodieFileGroupReader<T> implements Closeable {
8282
private ClosableIterator<T> baseFileIterator;
8383
private final Option<UnaryOperator<T>> outputConverter;
8484
private final HoodieReadStats readStats;
85+
// Allows to consider inflight instants while merging log records using HoodieMergedLogRecordReader
86+
// The inflight instants need to be considered while updating RLI records. RLI needs to fetch the revived
87+
// and deleted keys from the log files written as part of active data commit. During the RLI update,
88+
// the allowInflightInstants flag would need to be set to true. This would ensure the HoodieMergedLogRecordReader
89+
// considers the log records which are inflight.
90+
private boolean allowInflightInstants;
8591

86-
public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
87-
HoodieStorage storage,
88-
String tablePath,
89-
String latestCommitTime,
90-
FileSlice fileSlice,
91-
Schema dataSchema,
92-
Schema requestedSchema,
93-
Option<InternalSchema> internalSchemaOpt,
94-
HoodieTableMetaClient hoodieTableMetaClient,
95-
TypedProperties props,
96-
long start,
97-
long length,
98-
boolean shouldUseRecordPosition) {
92+
public HoodieFileGroupReader(HoodieReaderContext<T> readerContext, HoodieStorage storage, String tablePath,
93+
String latestCommitTime, FileSlice fileSlice, Schema dataSchema, Schema requestedSchema,
94+
Option<InternalSchema> internalSchemaOpt, HoodieTableMetaClient hoodieTableMetaClient, TypedProperties props,
95+
long start, long length, boolean shouldUseRecordPosition, boolean allowInflightInstants) {
9996
this.readerContext = readerContext;
10097
this.storage = storage;
10198
this.hoodieBaseFileOption = fileSlice.getBaseFile();
@@ -134,6 +131,7 @@ public HoodieFileGroupReader(HoodieReaderContext<T> readerContext,
134131
this.recordBuffer = getRecordBuffer(readerContext, hoodieTableMetaClient,
135132
recordMergeMode, props, hoodieBaseFileOption, this.logFiles.isEmpty(),
136133
isSkipMerge, shouldUseRecordPosition, readStats);
134+
this.allowInflightInstants = allowInflightInstants;
137135
}
138136

139137
/**
@@ -290,6 +288,7 @@ private void scanLogFiles() {
290288
.withPartition(getRelativePartitionPath(
291289
new StoragePath(path), logFiles.get(0).getPath().getParent()))
292290
.withRecordBuffer(recordBuffer)
291+
.withAllowInflightInstants(allowInflightInstants)
293292
.build()) {
294293
readStats.setTotalLogReadTimeMs(logRecordReader.getTotalTimeTakenToReadAndMergeBlocks());
295294
readStats.setTotalUpdatedRecordsCompacted(logRecordReader.getNumMergedRecordsInLog());

hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -997,6 +997,7 @@ private static Map<String, HoodieRecord> getLogRecords(List<String> logFilePaths
997997
Collections.emptyList(),
998998
datasetMetaClient.getTableConfig().getRecordMergeStrategyId());
999999

1000+
// CRITICAL: Ensure allowInflightInstants is set to true while replacing the scanner with *LogRecordReader or HoodieFileGroupReader
10001001
HoodieMergedLogRecordScanner mergedLogRecordScanner = HoodieMergedLogRecordScanner.newBuilder()
10011002
.withStorage(datasetMetaClient.getStorage())
10021003
.withBasePath(datasetMetaClient.getBasePath())

hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,7 @@ private List<T> readRecordsFromFileGroup(StorageConfiguration<?> storageConf,
329329
props,
330330
1,
331331
fileSlice.getTotalFileSize(),
332+
false,
332333
false));
333334
}
334335
HoodieFileGroupReader<T> fileGroupReader = new HoodieFileGroupReader<>(
@@ -344,6 +345,7 @@ private List<T> readRecordsFromFileGroup(StorageConfiguration<?> storageConf,
344345
props,
345346
0,
346347
fileSlice.getTotalFileSize(),
348+
false,
347349
false);
348350
fileGroupReader.initRecordIterators();
349351
while (fileGroupReader.hasNext()) {

hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieFileGroupReaderTestUtils.java

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ public static HoodieFileGroupReader<IndexedRecord> createFileGroupReader(
4545
TypedProperties properties,
4646
HoodieStorage storage,
4747
HoodieReaderContext<IndexedRecord> readerContext,
48-
HoodieTableMetaClient metaClient
49-
) {
48+
HoodieTableMetaClient metaClient,
49+
boolean allowInflightCommits) {
5050
assert (fileSliceOpt.isPresent());
5151
return new HoodieFileGroupReaderBuilder()
5252
.withReaderContext(readerContext)
@@ -55,6 +55,7 @@ public static HoodieFileGroupReader<IndexedRecord> createFileGroupReader(
5555
.withStart(start)
5656
.withLength(length)
5757
.withProperties(properties)
58+
.withAllowInflightCommits(allowInflightCommits)
5859
.build(basePath, latestCommitTime, schema, shouldUseRecordPosition, metaClient);
5960
}
6061

@@ -65,6 +66,7 @@ public static class HoodieFileGroupReaderBuilder {
6566
private TypedProperties props;
6667
private long start;
6768
private long length;
69+
private boolean allowInflightCommits = false;
6870

6971
public HoodieFileGroupReaderBuilder withReaderContext(
7072
HoodieReaderContext<IndexedRecord> context) {
@@ -97,6 +99,11 @@ public HoodieFileGroupReaderBuilder withLength(long length) {
9799
return this;
98100
}
99101

102+
public HoodieFileGroupReaderBuilder withAllowInflightCommits(boolean allowInflightCommits) {
103+
this.allowInflightCommits = allowInflightCommits;
104+
return this;
105+
}
106+
100107
public HoodieFileGroupReader<IndexedRecord> build(
101108
String basePath,
102109
String latestCommitTime,
@@ -108,20 +115,8 @@ public HoodieFileGroupReader<IndexedRecord> build(
108115
props.setProperty(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key(), basePath + "/" + HoodieTableMetaClient.TEMPFOLDER_NAME);
109116
props.setProperty(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), ExternalSpillableMap.DiskMapType.ROCKS_DB.name());
110117
props.setProperty(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), "false");
111-
return new HoodieFileGroupReader<>(
112-
readerContext,
113-
storage,
114-
basePath,
115-
latestCommitTime,
116-
fileSlice,
117-
schema,
118-
schema,
119-
Option.empty(),
120-
metaClient,
121-
props,
122-
start,
123-
length,
124-
shouldUseRecordPosition);
118+
return new HoodieFileGroupReader<>(readerContext, storage, basePath, latestCommitTime, fileSlice,
119+
schema, schema, Option.empty(), metaClient, props, start, length, shouldUseRecordPosition, allowInflightCommits);
125120
}
126121
}
127122
}

0 commit comments

Comments (0)