[HUDI-9316] Add Avro based ReaderContext to assist in migration to FileGroupReader #13171


Merged: 11 commits, Apr 22, 2025

Conversation

the-other-tim-brown (Contributor):

Change Logs

  • Moves the test reader context to a proper Avro-based reader context with its own unit testing
  • Fixes tests that were not closing resources properly

Impact

In order to migrate all reader paths to a consistent logical flow, we need to define readers that work with all the required engines. Currently, all engines can work with Avro IndexedRecords, since that is what the LogScanners output today. This common denominator will allow us to switch over the code and then focus on building optimal reader paths for each engine.
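
The "common denominator" idea can be sketched as an engine-agnostic conversion step. The sketch below is illustrative only: the names (`CommonRecord`, `readAsCommon`, `fromMap`) are hypothetical stand-ins, not the actual Hudi `HoodieReaderContext` API, and a plain `Map`-backed row stands in for an engine's native row type.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Illustrative sketch only: the real Hudi HoodieReaderContext API differs.
// The idea: each engine supplies a converter from its native row type to a
// common record form (in Hudi's case, Avro IndexedRecord), so shared
// FileGroupReader logic can run unchanged across engines.
public class ReaderContextSketch {

  // Hypothetical common record form, standing in for Avro IndexedRecord.
  interface CommonRecord {
    Object get(int pos);
  }

  // Engine-agnostic read path: only the converter is engine-specific.
  static <E> List<CommonRecord> readAsCommon(List<E> engineRows,
                                             Function<E, CommonRecord> toCommon) {
    return engineRows.stream().map(toCommon).collect(Collectors.toList());
  }

  // A toy "engine row" backed by a Map, converted to the common form.
  static CommonRecord fromMap(Map<Integer, Object> row) {
    return row::get;
  }

  public static void main(String[] args) {
    List<Map<Integer, Object>> rows = List.of(Map.of(0, "key1", 1, 42));
    List<CommonRecord> common = readAsCommon(rows, ReaderContextSketch::fromMap);
    System.out.println(common.get(0).get(1)); // prints 42
  }
}
```

Once every engine's rows reach this common form, the merge and read logic only has to be written once; per-engine optimized paths can replace the conversion later, as the Impact section describes.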

Risk level (write none, low medium or high below)

None; this PR by itself only sets up a larger refactoring effort.

Documentation Update

Describe any necessary documentation update if there is any new feature, config, or user-facing change. If not, put "none".

  • The config description must be updated if new configs are added or the default value of the configs are changed
  • Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the ticket number here and follow the instruction to make changes to the website.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@github-actions github-actions bot added the size:XL PR with lines of changes > 1000 label Apr 18, 2025
import java.util.List;
import java.util.Map;

public abstract class HoodieFileGroupReaderOnJavaTestBase<T> extends TestHoodieFileGroupReaderBase<T> {
the-other-tim-brown (Contributor, Author):

The contents of this class are moved from TestHoodieFileGroupReaderOnHive.

@@ -118,7 +116,6 @@ public static void setUp() throws IOException {
INSERT, DELETE, UPDATE, DELETE, UPDATE);
instantTimes = Arrays.asList(
"001", "002", "003", "004", "005");
shouldWritePositions = Arrays.asList(false, false, false, false, false);
the-other-tim-brown (Contributor, Author):

In this test and the others, this was previously a static variable that tests mutated, so execution was nondeterministic because test ordering is random. Moving it to an instance variable makes each test run deterministic.
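
The hazard described above can be shown in miniature. The class and method names below are hypothetical, not from the PR; the point is only that a static field mutated by one test leaks into the next, while an instance field is fresh per test object.

```java
// Minimal illustration of the ordering hazard described above: a static
// flag mutated by one "test" leaks into later ones, so with random test
// ordering the results become order-dependent. An instance field (as in
// the PR's fix) keeps each test independent.
public class StaticStateDemo {
  static boolean sharedFlag = false;  // shared across "tests": order-dependent
  boolean instanceFlag = false;       // fresh per instance: deterministic

  static boolean runWithSharedState() {
    boolean seen = sharedFlag;        // observes leftover state from prior calls
    sharedFlag = true;                // mutation leaks to whatever runs next
    return seen;
  }

  boolean runWithInstanceState() {
    boolean seen = instanceFlag;
    instanceFlag = true;              // mutation stays local to this instance
    return seen;
  }

  public static void main(String[] args) {
    System.out.println(runWithSharedState());                        // false
    System.out.println(runWithSharedState());                        // true: depends on the prior call
    System.out.println(new StaticStateDemo().runWithInstanceState()); // false
    System.out.println(new StaticStateDemo().runWithInstanceState()); // false: independent
  }
}
```

Test frameworks that randomize method order (as JUnit can) turn the static version into a flaky test; the instance version gives the same answer regardless of what ran before it.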

Comment on lines +154 to +157
if (metaFieldsPopulated) {
return getFieldValueFromIndexedRecord(record, schema, RECORD_KEY_METADATA_FIELD).toString();
}
return keyGenerator.getRecordKey((GenericRecord) record);
Contributor:

nit: ideally this can be made general in HoodieReaderContext. OK to keep it as is in this PR.

}

@Override
public UnaryOperator<IndexedRecord> projectRecord(Schema from, Schema to, Map<String, String> renamedColumns) {
  if (!renamedColumns.isEmpty()) {
    throw new UnsupportedOperationException("Schema evolution is not supported for the HoodieAvroReaderContext");
  }
  Map<String, Integer> fromFields = IntStream.range(0, from.getFields().size())
Contributor:

If we still need #projectRecord, it would be good to cache the transformation keyed on <Schema from, Schema to, Map<String, String> renamedColumns> instead of computing the transformation for each record (BaseSparkInternalRowReaderContext does something similar).

Another optimization is to push down the projection to the reader itself, so the reader iterator directly returns the IndexedRecord based on the to schema if possible, avoiding reinstantiating the record here. This may require more investigation; we can keep functional correctness without worrying about performance in this PR for now.

the-other-tim-brown (Contributor, Author):

+1 to building the transform. There are other places we can do this as well that I have found while digging into this code path.
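
The caching the reviewer suggests could look roughly like the sketch below. This is illustrative only: schemas are represented as field-name lists for simplicity, and `ProjectionCache`, `Key`, and `projector` are hypothetical names, not the actual code in `BaseSparkInternalRowReaderContext`.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.UnaryOperator;

// Sketch of memoizing a projection per (from, to, renamedColumns) key so the
// transformation is built once and reused for every record, instead of being
// recomputed per record.
public class ProjectionCache {

  // Composite cache key: record equality covers all three components.
  record Key(List<String> from, List<String> to, Map<String, String> renames) {}

  private final ConcurrentHashMap<Key, UnaryOperator<Map<String, Object>>> cache =
      new ConcurrentHashMap<>();

  // Build the projection once per key; later calls with an equal key reuse it.
  public UnaryOperator<Map<String, Object>> projector(
      List<String> from, List<String> to, Map<String, String> renames) {
    return cache.computeIfAbsent(new Key(from, to, renames), k -> rec -> {
      Map<String, Object> out = new java.util.LinkedHashMap<>();
      for (String field : k.to()) {
        // A renamed target field reads from its old source name.
        String source = k.renames().getOrDefault(field, field);
        out.put(field, rec.get(source));
      }
      return out;
    });
  }

  public int size() {
    return cache.size();
  }
}
```

Keying on the schema pair amortizes the per-record field lookup; the second optimization mentioned above (pushing the projection into the reader) would remove even this per-record map rebuild.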

Comment on lines +115 to +116
// Create dedicated merger to avoid current delete logic holes.
// TODO: Unify delete logic (HUDI-7240).
Contributor:

@linliu-code is this fixed?

yihua (Contributor) left a comment:

LGTM. @the-other-tim-brown let's make sure the follow-ups are tracked or addressed in subsequent PRs.

@hudi-bot:

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

@yihua yihua merged commit fbd1ee8 into apache:master Apr 22, 2025
60 checks passed

3 participants