[HUDI-9318] Refactor the log records presentation in FileGroupRecordB… #13213
Conversation
  return isDelete;
}

public boolean isDeleteRecordWithNaturalOrder() {
naturalOrdering is a confusing concept to me. Usually natural ordering would be something like 1, 2, 3, but in this case 0 is greater than 1, 2, and 3. I think some other naming would be helpful here, like hardDelete or forcedDeletion.
IIUC, isDeleteRecordWithNaturalOrder is checked for queries like DELETE FROM TABLE; if so, +1 for hardDelete. WDYT, cc @nsivabalan @yihua @danny0405
let's keep it as it is now.
yeah, let's not make too many changes in this patch.
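For illustration, the semantics debated in this thread can be sketched as a toy model. This is a hedged sketch only: the class name and fields below are invented for the example, not Hudi's actual implementation. The point of confusion is that a delete issued without an explicit ordering value (e.g. DELETE FROM table) carries the default ordering value 0, yet wins over records whose ordering values are larger.

```java
// Toy model of the discussed semantics; names are illustrative, not Hudi's API.
class NaturalOrderDeleteSketch {
  private final Comparable<?> orderingValue;
  private final boolean isDelete;

  NaturalOrderDeleteSketch(Comparable<?> orderingValue, boolean isDelete) {
    this.orderingValue = orderingValue;
    this.isDelete = isDelete;
  }

  // A delete with the default ordering value 0 ("natural order") is treated as
  // unconditional: it beats records whose ordering values are 1, 2, 3, ...,
  // which is why reviewers suggested names like hardDelete instead.
  boolean isDeleteRecordWithNaturalOrder() {
    return isDelete && Integer.valueOf(0).equals(orderingValue);
  }
}
```

Under this reading, the method name describes the mechanism (ordering value 0) rather than the behavior (an unconditional delete), which is the naming concern raised above.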
Force-pushed from f9b68bf to 2474394.
@cshuo, how much memory does it save?
 * @param record The engine row
 * @return row with binary format
 */
public abstract T toBinaryRow(T record);
Can you add some context for why this is required?
Instead of enforcing all implementations to have a concept of a binary version, can we move this into the code that reads the data into format T? Or possibly move it into the seal functionality?
We thought this through and think we should add a new interface, for two reasons: seal is only for copy purposes and does not care about the row format itself; toBinaryRow focuses solely on row format transformation. This avoids unnecessary copying or transformation of rows.
We should only enforce that rows put into the spillable map are in binary format. And seal is called in multiple places; e.g., the baseRecord read from the base file is also sealed, where toBinary is unnecessary and would add cost. That's the context in which we introduced the new method toBinary, separate from seal.
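The separation argued for here can be sketched minimally, with invented names and Object/byte[] standing in for engine rows (this is not Hudi's actual API): seal is about ownership (safe to buffer), toBinaryRow is about physical layout.

```java
import java.nio.charset.StandardCharsets;

// Toy separation of concerns; byte[] plays the role of the "binary row".
class RowOpsSketch {
  // seal: make the row safe to hold onto; it does not change the format.
  static Object seal(Object row) {
    return (row instanceof byte[]) ? ((byte[]) row).clone() : row;
  }

  // toBinaryRow: convert to the binary layout, skipping rows that are already
  // binary, so records that never reach the spillable map pay no extra cost.
  static Object toBinaryRow(Object row) {
    if (row instanceof byte[]) {
      return row; // already binary: no copy, no transformation
    }
    return row.toString().getBytes(StandardCharsets.UTF_8);
  }
}
```

Keeping the two operations separate is what lets the reader seal a base-file record without paying for a binary conversion it does not need.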
We can move this conversion directly into the code that converts from Avro to the row. This doesn't need to be part of the abstract class, in my opinion.
I also think that the UnsafeRow conversion is Spark-only and that it can be done in the log record iterator where the Avro record is deserialized to UnsafeRow, without adding the public API method toBinaryRow to HoodieReaderContext. However, I see there is difficulty due to schema evolution and other handling that require more refactoring to achieve the goal. If we want to keep toBinaryRow in this PR, mark it as @Deprecated or @PublicAPIMethod(maturity = ApiMaturityLevel.DEPRECATED) so further usage is disallowed.
Besides the Spark unsafe row conversion, Flink also needs to convert GenericRowData into BinaryRowData. And the binary conversion is needed not just in log reading; it's also necessary after merging in the file group reader, where the merged row may not be in binary format either. So currently it seems necessary to make HoodieReaderContext capable of performing the binary conversion.
We can also make this the responsibility of the merger, though: it should return an optimal representation. If the merger returns an Avro record, we will run it through the same conversion code used when reading the log files.
Can you fix the PR description to call out what changes we are making in this patch.
@@ -89,6 +89,8 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
 */
private final transient StructType schema;

private Comparable<?> orderingValue;
Should we make it transient here? I'm not sure Kryo serialization will ignore this.
To confirm: Kryo would ignore the transient value.
Updated.
if (internalRow instanceof UnsafeRow) {
  return internalRow;
}
final UnsafeProjection unsafeProjection = HoodieInternalRowUtils.getCachedUnsafeProjection(schema);
Let's create a follow-up JIRA to see if we can further simplify this part by considering schema evolution, without having to get the projection instance per row. cc @jonvex
issue created: https://issues.apache.org/jira/browse/HUDI-9337
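The caching idea behind the snippet above (check for UnsafeRow first, otherwise fetch a per-schema cached projection) can be sketched generically. The names below are illustrative stand-ins; the real code uses HoodieInternalRowUtils.getCachedUnsafeProjection and Spark's UnsafeProjection.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Sketch of per-schema projection caching: building a projection is expensive,
// so build it once per schema and reuse it for every row of that schema.
class ProjectionCacheSketch {
  private static final Map<String, Function<String, byte[]>> CACHE = new ConcurrentHashMap<>();

  static Function<String, byte[]> getCachedProjection(String schemaKey) {
    // computeIfAbsent yields exactly one converter instance per schema key,
    // instead of constructing a new one for each row.
    return CACHE.computeIfAbsent(schemaKey,
        k -> row -> row.getBytes(StandardCharsets.UTF_8));
  }
}
```

The follow-up JIRA above is about going further: avoiding even the cache lookup per row once schema evolution is accounted for.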
 * @param recordOption An option of record.
 * @param metadataMap A map containing the record metadata.
 * @param schema The Avro schema of the record.
 * @param record An option of record.
So now this record is non-null, correct? Suggested change:
- * @param record An option of record.
+ * @param record The record.
  }
}
Option<Pair<HoodieRecord, Schema>> mergedRecord = recordMerger.get().merge(
-    readerContext.constructHoodieRecord(older, olderInfoMap), readerContext.getSchemaFromMetadata(olderInfoMap),
-    readerContext.constructHoodieRecord(newer, newerInfoMap), readerContext.getSchemaFromMetadata(newerInfoMap), props);
+    readerContext.constructHoodieRecord(olderRecord), readerContext.getSchemaFromBufferRecord(olderRecord),
One thing to validate later is that a custom merger implementation can return HoodieEmptyRecord to indicate deletes. That should be properly handled in the CUSTOM merge mode.
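The point being validated can be sketched as follows; the types are simplified stand-ins (Optional instead of HoodieEmptyRecord and HoodieRecord), so treat this as a hedged illustration of the contract, not the actual merge code.

```java
import java.util.Optional;

// In CUSTOM merge mode, a merger may signal "this key was deleted" by
// returning an empty/sentinel result; the record buffer must treat that as a
// delete rather than emitting a row.
class CustomMergeSketch {
  static boolean mergedResultIsDelete(Optional<String> mergedRecord) {
    return mergedRecord.isEmpty(); // empty result == delete, skip emitting
  }
}
```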
+1, nice fix~
Change Logs

Refactor the log records presentation in FileGroupRecordBuffer:
- Cache derived values (e.g., the ordering value) on HoodieRecord to avoid duplicate calculation.
- Introduce BufferedRecord to substitute Pair<Option<T>, Map<String, Object>> for the record buffer in the file group reader.

Impact

Reduce the heap size of records to make ExternalSpillableMap less prone to spill.

Risk level (write none, low, medium or high below)

medium

Documentation Update

Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none".

Contributor's checklist
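The BufferedRecord refactor described in this PR can be illustrated with a rough sketch. The field names below are assumptions drawn from this discussion, not the actual Hudi class.

```java
// Before: each buffered entry was a Pair<Option<T>, Map<String, Object>>,
// re-storing metadata such as the record key and ordering value in a
// per-record map. After: a flat holder with plain fields, cutting per-entry
// heap overhead so ExternalSpillableMap spills later. Names are illustrative.
class BufferedRecordSketch<T> {
  final String recordKey;
  final Comparable<?> orderingValue;
  final T record;          // engine-native row, ideally already in binary form
  final boolean isDelete;

  BufferedRecordSketch(String recordKey, Comparable<?> orderingValue, T record, boolean isDelete) {
    this.recordKey = recordKey;
    this.orderingValue = orderingValue;
    this.record = record;
    this.isDelete = isDelete;
  }
}
```

A flat object like this avoids one HashMap plus boxed entries per buffered record, which is where the claimed heap-size reduction comes from.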