
[HUDI-9318] Refactor the log records presentation in FileGroupRecordB… #13213


Merged
danny0405 merged 4 commits into apache:release-1.0.2 from the HUDI-9318 branch on Apr 25, 2025

Conversation

cshuo (Contributor) commented Apr 23, 2025


Change Logs

Refactor the log records presentation in FileGroupRecordBuffer

  • Cache the ordering value for HoodieRecord to avoid duplicate computation.
  • Introduce a POJO, BufferedRecord, to replace Pair<Option<T>, Map<String, Object>> as the record-buffer entry in the file group reader (see the sketch after this list).
  • Convert the buffered record into binary format before putting it into the spillable map, to save space and reduce spilling.
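
A minimal sketch of such a POJO, for orientation only; the field names, generics, and serialization strategy here are assumptions, not the actual BufferedRecord added in this PR:

public class BufferedRecordSketch<T> implements java.io.Serializable {
  private final String recordKey;
  private final Comparable<?> orderingValue; // cached once; avoids recomputing per comparison
  private final T record;                    // engine-specific row, ideally already in binary form
  private final Integer schemaId;            // compact reference to the record schema
  private final boolean isDelete;

  public BufferedRecordSketch(String recordKey, Comparable<?> orderingValue, T record,
                              Integer schemaId, boolean isDelete) {
    this.recordKey = recordKey;
    this.orderingValue = orderingValue;
    this.record = record;
    this.schemaId = schemaId;
    this.isDelete = isDelete;
  }

  public String getRecordKey() { return recordKey; }
  public Comparable<?> getOrderingValue() { return orderingValue; }
  public T getRecord() { return record; }
  public Integer getSchemaId() { return schemaId; }
  public boolean isDelete() { return isDelete; }
}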

Impact

Reduces the heap footprint of buffered records so that ExternalSpillableMap is less prone to spilling.

Risk level (write none, low medium or high below)

medium

Documentation Update

Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none".

  • The config description must be updated if new configs are added or the default value of a config is changed
  • Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the ticket number here, and follow the instructions to make changes to the website.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@cshuo cshuo changed the title [HUDI-9318] Refactor the log records presentation in FileGroupRecordB… [WIP][HUDI-9318] Refactor the log records presentation in FileGroupRecordB… Apr 23, 2025
@github-actions github-actions bot added the size:L PR with lines of changes in (300, 1000] label Apr 23, 2025
return isDelete;
}

public boolean isDeleteRecordWithNaturalOrder() {
Contributor

naturalOrdering is a confusing concept to me. Usually natural ordering would be something like 1, 2, 3, but in this case 0 is greater than 1, 2, and 3. I think some other naming would be helpful here, like hardDelete or forcedDeletion.

Contributor Author

IIUC, isDeleteRecordWithNaturalOrder is checked for queries like DELETE FROM TABLE; if so, +1 for hardDelete. WDYT? cc @nsivabalan @yihua @danny0405
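
For illustration only (hypothetical names and sentinel, not the actual Hudi code), the semantics under discussion are that a delete without a real ordering value always applies, even though its sentinel would lose a plain numeric comparison:

@SuppressWarnings({"unchecked", "rawtypes"})
static boolean deleteWins(Comparable existingOrdering, Comparable deleteOrdering,
                          boolean deleteHasNaturalOrder) {
  if (deleteHasNaturalOrder) {
    // Hard delete (e.g. DELETE FROM table): no meaningful ordering value was attached,
    // so the delete applies regardless of how the sentinel (such as 0) compares.
    return true;
  }
  // Otherwise the usual ordering-value comparison decides which record survives.
  return deleteOrdering.compareTo(existingOrdering) >= 0;
}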

Contributor

Let's keep it as it is for now.

Contributor

Yeah, let's not make too many changes in this patch.

@cshuo cshuo force-pushed the HUDI-9318 branch 2 times, most recently from f9b68bf to 2474394 on April 23, 2025 at 16:54
@cshuo cshuo changed the title [WIP][HUDI-9318] Refactor the log records presentation in FileGroupRecordB… [HUDI-9318] Refactor the log records presentation in FileGroupRecordB… Apr 23, 2025
cshuo (Contributor Author) commented Apr 23, 2025

cc @nsivabalan @linliu-code @yihua

@linliu-code (Contributor)

@cshuo, how much memory does it save?

* @param record The engine row
* @return row with binary format
*/
public abstract T toBinaryRow(T record);
Contributor

Can you add some context for why this is required?

Contributor

Instead of requiring all implementations to have a concept of a binary version, can we move this into the code that reads the data into format T? Or possibly move it to the seal functionality?

Contributor

We thought this through and think we should add a new interface method, for two reasons:

  • seal is only for copy purposes and does not care about the row format itself;
  • toBinaryRow focuses solely on row format transformation.

This avoids unnecessary copying or transformation of rows.

Contributor Author

We only need to enforce that rows going into the spillable map are in binary format. seal is called in multiple places, e.g., the base record read from the base file is also sealed, where a binary conversion is unnecessary and would add cost. That is the context for introducing a new method, toBinaryRow, separate from seal.
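
For context, a Spark-flavored toBinaryRow could look roughly like the sketch below, reusing the cached projection that already appears elsewhere in this PR; the method shape (passing the schema explicitly) is an assumption made to keep the example self-contained:

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.types.StructType;

// Rows already in UnsafeRow (binary) format pass through untouched; anything else is
// projected into an UnsafeRow once, before the record is handed to the spillable map.
// HoodieInternalRowUtils is the Hudi utility referenced later in this PR.
public static InternalRow toBinaryRow(StructType schema, InternalRow internalRow) {
  if (internalRow instanceof UnsafeRow) {
    return internalRow;
  }
  UnsafeProjection projection = HoodieInternalRowUtils.getCachedUnsafeProjection(schema);
  // copy() detaches the result from the projection's reusable row buffer.
  return projection.apply(internalRow).copy();
}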

Contributor

We can move this conversion directly to the code that converts from the Avro record to the row. This doesn't need to be part of the abstract class, in my opinion.

Contributor

I also think that the UnsafeRow conversion is Spark-only and could be done in the log record iterator where the Avro record is deserialized to UnsafeRow, without adding the public API method toBinaryRow to HoodieReaderContext. However, I see there is difficulty due to schema evolution and other handling that would require more refactoring to achieve that. If we want to keep toBinaryRow in this PR, mark it as @Deprecated or @PublicAPIMethod(maturity = ApiMaturityLevel.DEPRECATED) so further usage is disallowed.

Contributor Author

Besides the Spark unsafe-row conversion, Flink also needs to convert GenericRowData into BinaryRowData. And the binary conversion is needed not just in log reading; it is also necessary after merging in the file group reader, where the merged row may not be in binary format either. So currently it seems necessary to make HoodieReaderContext capable of performing the binary conversion.
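
On the Flink side, the analogous conversion could be sketched as below; treat this as an assumption about the approach (using Flink's RowDataSerializer), not the code in this PR:

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.logical.RowType;

// BinaryRowData passes through untouched; other RowData implementations (e.g.
// GenericRowData) are serialized into the compact binary layout. In practice the
// serializer would be created once per schema and reused, not per call.
public static RowData toBinaryRow(RowType rowType, RowData rowData) {
  if (rowData instanceof BinaryRowData) {
    return rowData;
  }
  return new RowDataSerializer(rowType).toBinaryRow(rowData);
}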

Contributor

We can also make this the responsibility of the merger, though, to return an optimal representation. If the merger returns an Avro record, we will run it through the same conversion code used when reading the log files.

@nsivabalan nsivabalan (Contributor) left a comment

Can you fix the PR description to call out what changes we are doing in this patch?

@@ -89,6 +89,8 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
*/
private final transient StructType schema;

private Comparable<?> orderingValue;
Contributor

Should we make it transient here? Not sure whether Kryo serialization will ignore this.

Contributor

To confirm: Kryo would ignore the transient value.
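
For reference, the suggestion amounts to a shape like the following (illustrative only, not the actual HoodieSparkRecord change): keep the cached value in a transient field and recompute it lazily, on the assumption that Kryo's default FieldSerializer, like Java serialization, skips transient fields.

class OrderingValueCacheExample implements java.io.Serializable {
  private final long eventTime;                  // stands in for the real payload fields
  private transient Comparable<?> orderingValue; // not serialized; rebuilt on first access

  OrderingValueCacheExample(long eventTime) {
    this.eventTime = eventTime;
  }

  Comparable<?> getOrderingValue() {
    if (orderingValue == null) {
      orderingValue = Long.valueOf(eventTime);   // recompute from the payload after deserialization
    }
    return orderingValue;
  }
}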

cshuo (Contributor Author) commented Apr 23, 2025

Can you fix the PR description to call out what changes we are doing in this patch?

Updated.

if (internalRow instanceof UnsafeRow) {
return internalRow;
}
final UnsafeProjection unsafeProjection = HoodieInternalRowUtils.getCachedUnsafeProjection(schema);
Contributor

Let's create a follow-up JIRA to see if we can further simplify this part by considering schema evolution, without having to get the projection instance per row. cc @jonvex
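
A sketch of the follow-up idea, under the assumption that the schema is fixed for the lifetime of the reader: resolve the projection once and reuse it for every row, instead of going through the cache lookup per record (the schema-evolution handling mentioned above is deliberately omitted):

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.types.StructType;

final class BinaryRowConverterSketch {
  // Resolved once per reader/schema; HoodieInternalRowUtils.getCachedUnsafeProjection is
  // the Hudi utility shown in the diff above.
  private final UnsafeProjection projection;

  BinaryRowConverterSketch(StructType schema) {
    this.projection = HoodieInternalRowUtils.getCachedUnsafeProjection(schema);
  }

  InternalRow toBinary(InternalRow row) {
    return row instanceof UnsafeRow ? row : projection.apply(row).copy();
  }
}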


* @param recordOption An option of record.
* @param metadataMap A map containing the record metadata.
* @param schema The Avro schema of the record.
* @param record An option of record.
Contributor

So now this record is non-null, correct?

Suggested change
* @param record An option of record.
* @param record The record.

}
}
Option<Pair<HoodieRecord, Schema>> mergedRecord = recordMerger.get().merge(
readerContext.constructHoodieRecord(older, olderInfoMap), readerContext.getSchemaFromMetadata(olderInfoMap),
readerContext.constructHoodieRecord(newer, newerInfoMap), readerContext.getSchemaFromMetadata(newerInfoMap), props);
readerContext.constructHoodieRecord(olderRecord), readerContext.getSchemaFromBufferRecord(olderRecord),
Contributor

One thing to validate later is that a custom merger implementation can return HoodieEmptyRecord to indicate deletes. That should be properly handled in the CUSTOM merge mode.
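
To make the validation concrete, the CUSTOM merge path would need a check along these lines (hypothetical shape, using the Option/Pair types from the diff above; the actual handling in the file group reader may differ): a merger result that is empty, or whose record is a HoodieEmptyRecord, should be treated as a delete rather than as data.

// Hypothetical helper, not the actual reader code.
static boolean mergedToDelete(Option<Pair<HoodieRecord, Schema>> mergedRecord) {
  return !mergedRecord.isPresent()
      || mergedRecord.get().getLeft() instanceof HoodieEmptyRecord;
}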

@hudi-bot

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

@danny0405 (Contributor)

+1, nice fix~

@danny0405 danny0405 merged commit 6566fca into apache:release-1.0.2 Apr 25, 2025
61 checks passed