
Commit fbd1ee8

[HUDI-9316] Add Avro based ReaderContext to assist in migration to FileGroupReader (#13171)
1 parent 760ecb5 commit fbd1ee8

18 files changed (+911, -532 lines)

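At a glance, the new HoodieAvroReaderContext lets engines that work with Avro IndexedRecords plug into the common HoodieReaderContext abstraction used by the file group reader. Below is a minimal sketch of the wiring, assuming only the constructors exercised in the test code in this diff; the base path is a hypothetical placeholder and is not taken from the commit.

// Sketch only: constructors mirror those used in the tests below.
StorageConfiguration<?> storageConf = new HadoopStorageConfiguration(false);
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
    .setConf(storageConf)
    .setBasePath("/tmp/myTable")   // hypothetical table location
    .build();
// The Avro-based context implements HoodieReaderContext<IndexedRecord>,
// so Avro-record code paths can migrate onto the FileGroupReader APIs.
HoodieReaderContext<IndexedRecord> readerContext =
    new HoodieAvroReaderContext(storageConf, metaClient.getTableConfig());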
@@ -0,0 +1,105 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.hudi.common.table.read;

import org.apache.hudi.client.HoodieJavaWriteClient;
import org.apache.hudi.client.common.HoodieJavaEngineContext;
import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
import org.apache.hudi.testutils.HoodieJavaClientTestHarness;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public abstract class HoodieFileGroupReaderOnJavaTestBase<T> extends TestHoodieFileGroupReaderBase<T> {

  @Override
  public String getBasePath() {
    return tempDir.toAbsolutePath() + "/myTable";
  }

  @Override
  public String getCustomPayload() {
    return CustomPayloadForTesting.class.getName();
  }

  @Override
  public void commitToTable(List<HoodieRecord> recordList, String operation, Map<String, String> writeConfigs) {
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withEngineType(EngineType.JAVA)
        .withEmbeddedTimelineServerEnabled(false)
        .withProps(writeConfigs)
        .withPath(getBasePath())
        .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
        .build();

    HoodieJavaClientTestHarness.TestJavaTaskContextSupplier taskContextSupplier = new HoodieJavaClientTestHarness.TestJavaTaskContextSupplier();
    HoodieJavaEngineContext context = new HoodieJavaEngineContext(getStorageConf(), taskContextSupplier);
    // init table if not exists
    StoragePath basePath = new StoragePath(getBasePath());
    try (HoodieStorage storage = new HoodieHadoopStorage(basePath, getStorageConf())) {
      boolean basepathExists = storage.exists(basePath);
      boolean operationIsInsert = operation.equalsIgnoreCase("insert");
      if (!basepathExists || operationIsInsert) {
        if (basepathExists) {
          storage.deleteDirectory(basePath);
        }
        Map<String, Object> initConfigs = new HashMap<>(writeConfigs);
        HoodieTableMetaClient.TableBuilder builder = HoodieTableMetaClient.newTableBuilder()
            .setTableType(writeConfigs.getOrDefault("hoodie.datasource.write.table.type", "MERGE_ON_READ"))
            .setTableName(writeConfigs.get("hoodie.table.name"))
            .setPartitionFields(writeConfigs.getOrDefault("hoodie.datasource.write.partitionpath.field", ""))
            .setRecordMergeMode(RecordMergeMode.getValue(writeConfigs.get("hoodie.record.merge.mode")))
            .set(initConfigs);
        if (writeConfigs.containsKey("hoodie.datasource.write.payload.class")) {
          builder = builder.setPayloadClassName(writeConfigs.get("hoodie.datasource.write.payload.class"));
        }
        builder.initTable(getStorageConf(), getBasePath());
      }
    } catch (IOException e) {
      throw new RuntimeException(e);
    }

    try (HoodieJavaWriteClient writeClient = new HoodieJavaWriteClient(context, writeConfig)) {
      String instantTime = writeClient.createNewInstantTime();
      writeClient.startCommitWithTime(instantTime);
      // Make a copy of the records for writing. The writer will clear out the data field.
      List<HoodieRecord> recordsCopy = new ArrayList<>(recordList.size());
      recordList.forEach(hoodieRecord -> recordsCopy.add(new HoodieAvroRecord<>(hoodieRecord.getKey(), (HoodieRecordPayload) hoodieRecord.getData())));
      if (operation.toLowerCase().equals("insert")) {
        writeClient.insert(recordsCopy, instantTime);
      } else {
        writeClient.upsert(recordsCopy, instantTime);
      }
    }
  }
}
@@ -0,0 +1,50 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.hudi.common.table.read;

import org.apache.hudi.avro.HoodieAvroReaderContext;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;

import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class TestHoodieFileGroupReaderOnJava extends HoodieFileGroupReaderOnJavaTestBase<IndexedRecord> {
  private static final StorageConfiguration<?> STORAGE_CONFIGURATION = new HadoopStorageConfiguration(false);

  @Override
  public StorageConfiguration<?> getStorageConf() {
    return STORAGE_CONFIGURATION;
  }

  @Override
  public HoodieReaderContext<IndexedRecord> getHoodieReaderContext(String tablePath, Schema avroSchema, StorageConfiguration<?> storageConf, HoodieTableMetaClient metaClient) {
    return new HoodieAvroReaderContext(storageConf, metaClient.getTableConfig());
  }

  @Override
  public void assertRecordsEqual(Schema schema, IndexedRecord expected, IndexedRecord actual) {
    assertEquals(expected, actual);
  }
}

hudi-client/hudi-java-client/src/test/java/org/apache/hudi/hadoop/TestHoodieFileGroupReaderOnHive.java

+4, -157 lines changed
@@ -20,71 +20,38 @@
 package org.apache.hudi.hadoop;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.client.HoodieJavaWriteClient;
-import org.apache.hudi.client.common.HoodieJavaEngineContext;
 import org.apache.hudi.common.config.HoodieMemoryConfig;
-import org.apache.hudi.common.config.HoodieReaderConfig;
-import org.apache.hudi.common.config.RecordMergeMode;
-import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.engine.HoodieReaderContext;
-import org.apache.hudi.common.model.HoodieAvroRecord;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.read.CustomPayloadForTesting;
-import org.apache.hudi.common.table.read.TestHoodieFileGroupReaderBase;
+import org.apache.hudi.common.table.read.HoodieFileGroupReaderOnJavaTestBase;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
-import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
-import org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader;
 import org.apache.hudi.hadoop.utils.ObjectInspectorCache;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
 import org.apache.hudi.testutils.ArrayWritableTestUtil;
-import org.apache.hudi.testutils.HoodieJavaClientTestHarness;
 
 import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
 import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
-import org.apache.hadoop.hive.ql.plan.PartitionDesc;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.stream.Collectors;
 
-import static org.apache.hadoop.hive.ql.exec.Utilities.HAS_MAP_WORK;
-import static org.apache.hadoop.hive.ql.exec.Utilities.MAPRED_MAPPER_CLASS;
 import static org.apache.hudi.hadoop.HoodieFileGroupReaderBasedRecordReader.getRecordKeyField;
 import static org.apache.hudi.hadoop.HoodieFileGroupReaderBasedRecordReader.getStoredPartitionFieldNames;
-import static org.junit.jupiter.api.Assertions.assertEquals;
 
-public class TestHoodieFileGroupReaderOnHive extends TestHoodieFileGroupReaderBase<ArrayWritable> {
+public class TestHoodieFileGroupReaderOnHive extends HoodieFileGroupReaderOnJavaTestBase<ArrayWritable> {
 
   private static final String PARTITION_COLUMN = "datestr";
   private static JobConf baseJobConf;
@@ -97,7 +64,7 @@ public class TestHoodieFileGroupReaderOnHive extends TestHoodieFileGroupReaderBa
   private static final boolean USE_FAKE_PARTITION = true;
 
   @BeforeAll
-  public static void setUpClass() throws IOException, InterruptedException {
+  public static void setUpClass() throws IOException {
     // Append is not supported in LocalFileSystem. HDFS needs to be setup.
     hdfsTestService = new HdfsTestService();
     fs = hdfsTestService.start(true).getFileSystem();
@@ -123,140 +90,20 @@ public StorageConfiguration<?> getStorageConf() {
   }
 
   @Override
-  public String getBasePath() {
-    return tempDir.toAbsolutePath() + "/myTable";
-  }
-
-  @Override
-  public HoodieReaderContext<ArrayWritable> getHoodieReaderContext(String tablePath, Schema avroSchema, StorageConfiguration<?> storageConf) {
+  public HoodieReaderContext<ArrayWritable> getHoodieReaderContext(String tablePath, Schema avroSchema, StorageConfiguration<?> storageConf, HoodieTableMetaClient metaClient) {
     HoodieFileGroupReaderBasedRecordReader.HiveReaderCreator readerCreator = (inputSplit, jobConf) -> new MapredParquetInputFormat().getRecordReader(inputSplit, jobConf, null);
-    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(storageConf).setBasePath(tablePath).build();
     JobConf jobConf = new JobConf(storageConf.unwrapAs(Configuration.class));
     setupJobconf(jobConf);
     return new HiveHoodieReaderContext(readerCreator, getRecordKeyField(metaClient),
         getStoredPartitionFieldNames(new JobConf(storageConf.unwrapAs(Configuration.class)), avroSchema),
         new ObjectInspectorCache(avroSchema, jobConf), storageConf);
   }
 
-  @Override
-  public String getCustomPayload() {
-    return CustomPayloadForTesting.class.getName();
-  }
-
-  @Override
-  public void commitToTable(List<HoodieRecord> recordList, String operation, Map<String, String> writeConfigs) {
-    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
-        .withEngineType(EngineType.JAVA)
-        .withEmbeddedTimelineServerEnabled(false)
-        .withProps(writeConfigs)
-        .withPath(getBasePath())
-        .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
-        .build();
-
-    HoodieJavaClientTestHarness.TestJavaTaskContextSupplier taskContextSupplier = new HoodieJavaClientTestHarness.TestJavaTaskContextSupplier();
-    HoodieJavaEngineContext context = new HoodieJavaEngineContext(getStorageConf(), taskContextSupplier);
-    //init table if not exists
-    Path basePath = new Path(getBasePath());
-    try {
-      try (FileSystem lfs = basePath.getFileSystem(baseJobConf)) {
-        boolean basepathExists = lfs.exists(basePath);
-        boolean operationIsInsert = operation.equalsIgnoreCase("insert");
-        if (!basepathExists || operationIsInsert) {
-          if (basepathExists) {
-            lfs.delete(new Path(getBasePath()), true);
-          }
-          Map<String, Object> initConfigs = new HashMap<>(writeConfigs);
-          HoodieTableMetaClient.TableBuilder builder = HoodieTableMetaClient.newTableBuilder()
-              .setTableType(writeConfigs.getOrDefault("hoodie.datasource.write.table.type", "MERGE_ON_READ"))
-              .setTableName(writeConfigs.get("hoodie.table.name"))
-              .setPartitionFields(writeConfigs.getOrDefault("hoodie.datasource.write.partitionpath.field", ""))
-              .setRecordMergeMode(RecordMergeMode.getValue(writeConfigs.get("hoodie.record.merge.mode")))
-              .set(initConfigs);
-          if (writeConfigs.containsKey("hoodie.datasource.write.payload.class")) {
-            builder = builder.setPayloadClassName(writeConfigs.get("hoodie.datasource.write.payload.class"));
-          }
-          builder.initTable(storageConf, getBasePath());
-        }
-      }
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-
-    try (HoodieJavaWriteClient writeClient = new HoodieJavaWriteClient(context, writeConfig)) {
-      String instantTime = writeClient.createNewInstantTime();
-      writeClient.startCommitWithTime(instantTime);
-      // Make a copy of the records for writing. The writer will clear out the data field.
-      List<HoodieRecord> recordsCopy = new ArrayList<>(recordList.size());
-      recordList.forEach(hoodieRecord -> recordsCopy.add(new HoodieAvroRecord<>(hoodieRecord.getKey(), (HoodieRecordPayload) hoodieRecord.getData())));
-      if (operation.toLowerCase().equals("insert")) {
-        writeClient.insert(recordsCopy, instantTime);
-      } else {
-        writeClient.upsert(recordsCopy, instantTime);
-      }
-    }
-  }
-
   @Override
   public void assertRecordsEqual(Schema schema, ArrayWritable expected, ArrayWritable actual) {
     ArrayWritableTestUtil.assertArrayWritableEqual(schema, expected, actual, false);
   }
 
-  private static boolean isLogFileRec(HoodieReaderContext<ArrayWritable> readerContext, Schema schema, ArrayWritable record) {
-    return !readerContext.getValue(record, schema, HoodieRecord.FILENAME_METADATA_FIELD).toString().contains(".parquet");
-  }
-
-  private static String createUniqueKey(HoodieReaderContext<ArrayWritable> readerContext, Schema schema, ArrayWritable record, boolean isSkipMerge) {
-    if (isSkipMerge) {
-      return readerContext.getRecordKey(record, schema) + "_" + readerContext.getValue(record, schema, HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
-    } else {
-      return readerContext.getRecordKey(record, schema);
-    }
-  }
-
-  private RecordReader<NullWritable, ArrayWritable> createRecordReader(String tablePath, boolean isSkipMerge) throws IOException {
-    JobConf jobConf = new JobConf(baseJobConf);
-    jobConf.set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false");
-    jobConf.set(HoodieRealtimeRecordReader.REALTIME_SKIP_MERGE_PROP, String.valueOf(isSkipMerge));
-
-    TableDesc tblDesc = Utilities.defaultTd;
-    // Set the input format
-    tblDesc.setInputFileFormatClass(HoodieParquetRealtimeInputFormat.class);
-    LinkedHashMap<Path, PartitionDesc> pt = new LinkedHashMap<>();
-    LinkedHashMap<Path, ArrayList<String>> talias = new LinkedHashMap<>();
-
-    PartitionDesc partDesc = new PartitionDesc(tblDesc, null);
-
-    pt.put(new Path(tablePath), partDesc);
-
-    ArrayList<String> arrayList = new ArrayList<>();
-    arrayList.add(tablePath);
-    talias.put(new Path(tablePath), arrayList);
-
-    MapredWork mrwork = new MapredWork();
-    mrwork.getMapWork().setPathToPartitionInfo(pt);
-    mrwork.getMapWork().setPathToAliases(talias);
-
-    Path mapWorkPath = new Path(tablePath);
-    Utilities.setMapRedWork(jobConf, mrwork, mapWorkPath);
-
-    // Add three partition path to InputPaths
-    Path[] partitionDirArray = new Path[HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS.length];
-    Arrays.stream(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS).map(s -> new Path(tablePath, s)).collect(Collectors.toList()).toArray(partitionDirArray);
-    FileInputFormat.setInputPaths(jobConf, partitionDirArray);
-    jobConf.set(HAS_MAP_WORK, "true");
-    // The following config tells Hive to choose ExecMapper to read the MAP_WORK
-    jobConf.set(MAPRED_MAPPER_CLASS, ExecMapper.class.getName());
-    // setting the split size to be 3 to create one split for 3 file groups
-    jobConf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.SPLIT_MAXSIZE, "128000000");
-    setupJobconf(jobConf);
-
-    HoodieCombineHiveInputFormat combineHiveInputFormat = new HoodieCombineHiveInputFormat();
-    InputSplit[] splits = combineHiveInputFormat.getSplits(jobConf, 1);
-
-    assertEquals(1, splits.length);
-    return combineHiveInputFormat.getRecordReader(splits[0], jobConf, Reporter.NULL);
-  }
-
   private void setupJobconf(JobConf jobConf) {
     Schema schema = HoodieAvroUtils.addMetadataFields(HoodieTestDataGenerator.AVRO_SCHEMA);
     List<Schema.Field> fields = schema.getFields();
