Skip to content

Commit f9b68bf

Browse files
committed
[HUDI-9318] Refactor the log records presentation in FileGroupRecordBuffer
1 parent a827f2c commit f9b68bf

File tree

16 files changed

+373
-317
lines changed

16 files changed

+373
-317
lines changed

hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java

+16-7
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,15 @@
2828
import org.apache.hudi.common.model.HoodieRecord;
2929
import org.apache.hudi.common.model.HoodieRecordMerger;
3030
import org.apache.hudi.common.model.HoodieSparkRecord;
31+
import org.apache.hudi.common.table.read.BufferedRecord;
3132
import org.apache.hudi.common.util.HoodieRecordUtils;
3233
import org.apache.hudi.common.util.Option;
3334

3435
import org.apache.avro.Schema;
3536
import org.apache.spark.sql.HoodieInternalRowUtils;
3637
import org.apache.spark.sql.HoodieUnsafeRowUtils;
3738
import org.apache.spark.sql.catalyst.InternalRow;
39+
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
3840
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
3941
import org.apache.spark.sql.types.StructType;
4042
import org.apache.spark.unsafe.types.UTF8String;
@@ -88,17 +90,15 @@ public String getRecordKey(InternalRow row, Schema schema) {
8890
}
8991

9092
@Override
91-
public HoodieRecord<InternalRow> constructHoodieRecord(Option<InternalRow> rowOption,
92-
Map<String, Object> metadataMap) {
93-
if (!rowOption.isPresent()) {
93+
public HoodieRecord<InternalRow> constructHoodieRecord(BufferedRecord<InternalRow> bufferedRecord) {
94+
if (bufferedRecord.isDelete()) {
9495
return new HoodieEmptyRecord<>(
95-
new HoodieKey((String) metadataMap.get(INTERNAL_META_RECORD_KEY),
96-
(String) metadataMap.get(INTERNAL_META_PARTITION_PATH)),
96+
new HoodieKey(bufferedRecord.getRecordKey(), bufferedRecord.getPartitionPath()),
9797
HoodieRecord.HoodieRecordType.SPARK);
9898
}
9999

100-
Schema schema = getSchemaFromMetadata(metadataMap);
101-
InternalRow row = rowOption.get();
100+
Schema schema = getSchemaFromBufferRecord(bufferedRecord);
101+
InternalRow row = bufferedRecord.getRecord();
102102
return new HoodieSparkRecord(row, HoodieInternalRowUtils.getCachedSchema(schema));
103103
}
104104

@@ -107,6 +107,15 @@ public InternalRow seal(InternalRow internalRow) {
107107
return internalRow.copy();
108108
}
109109

110+
@Override
111+
public InternalRow toBinaryRow(InternalRow internalRow) {
112+
if (internalRow instanceof UnsafeRow) {
113+
return internalRow.copy();
114+
}
115+
UnsafeProjection unsafeProjection = HoodieInternalRowUtils.getCachedUnsafeProjection(getSchemaHandler().getRequiredSchema());
116+
return unsafeProjection.apply(internalRow).copy();
117+
}
118+
110119
private Object getFieldValueFromInternalRow(InternalRow row, Schema recordSchema, String fieldName) {
111120
StructType structType = getCachedSchema(recordSchema);
112121
scala.Option<HoodieUnsafeRowUtils.NestedFieldPath> cachedNestedFieldPath =

hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala

+14
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,12 @@ object HoodieInternalRowUtils {
6060
new mutable.HashMap[(StructType, StructType), UnsafeProjection]
6161
})
6262

63+
private val identicalUnsafeProjectionThreadLocal: ThreadLocal[mutable.HashMap[Schema, UnsafeProjection]] =
64+
ThreadLocal.withInitial(new Supplier[mutable.HashMap[Schema, UnsafeProjection]] {
65+
override def get(): mutable.HashMap[Schema, UnsafeProjection] =
66+
new mutable.HashMap[Schema, UnsafeProjection]
67+
})
68+
6369
private val schemaMap = new ConcurrentHashMap[Schema, StructType]
6470
private val orderPosListMap = new ConcurrentHashMap[(StructType, String), Option[NestedFieldPath]]
6571

@@ -75,6 +81,14 @@ object HoodieInternalRowUtils {
7581
.getOrElseUpdate((from, to), generateUnsafeProjection(from, to))
7682
}
7783

84+
/**
85+
* Provides cached instance of [[UnsafeProjection]] to project Java object based [[InternalRow]] to [[UnsafeRow]].
86+
*/
87+
def getCachedUnsafeProjection(schema: Schema): UnsafeProjection = {
88+
identicalUnsafeProjectionThreadLocal.get()
89+
.getOrElseUpdate(schema, UnsafeProjection.create(getCachedSchema(schema)))
90+
}
91+
7892
/**
7993
* Provides cached instance of [[UnsafeRowWriter]] transforming provided [[InternalRow]]s from
8094
* one [[StructType]] and into another [[StructType]]

hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java

+20-20
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.hudi.common.model.HoodieRecord;
2424
import org.apache.hudi.common.model.HoodieRecordMerger;
2525
import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
26+
import org.apache.hudi.common.table.read.BufferedRecord;
2627
import org.apache.hudi.common.util.LocalAvroSchemaCache;
2728
import org.apache.hudi.common.util.Option;
2829
import org.apache.hudi.common.util.collection.ClosableIterator;
@@ -243,39 +244,30 @@ public String getRecordKey(T record, Schema schema) {
243244
/**
244245
* Gets the ordering value in particular type.
245246
*
246-
* @param recordOption An option of record.
247-
* @param metadataMap A map containing the record metadata.
248-
* @param schema The Avro schema of the record.
247+
* @param record The record.
248+
* @param schema The Avro schema of the record.
249249
* @param orderingFieldName name of the ordering field
250250
* @return The ordering value.
251251
*/
252-
public Comparable getOrderingValue(Option<T> recordOption,
253-
Map<String, Object> metadataMap,
252+
public Comparable getOrderingValue(T record,
254253
Schema schema,
255254
Option<String> orderingFieldName) {
256-
if (metadataMap.containsKey(INTERNAL_META_ORDERING_FIELD)) {
257-
return (Comparable) metadataMap.get(INTERNAL_META_ORDERING_FIELD);
258-
}
259-
260-
if (!recordOption.isPresent() || orderingFieldName.isEmpty()) {
255+
if (orderingFieldName.isEmpty()) {
261256
return DEFAULT_ORDERING_VALUE;
262257
}
263258

264-
Object value = getValue(recordOption.get(), schema, orderingFieldName.get());
259+
Object value = getValue(record, schema, orderingFieldName.get());
265260
Comparable finalOrderingVal = value != null ? convertValueToEngineType((Comparable) value) : DEFAULT_ORDERING_VALUE;
266-
metadataMap.put(INTERNAL_META_ORDERING_FIELD, finalOrderingVal);
267261
return finalOrderingVal;
268262
}
269263

270264
/**
271265
* Constructs a new {@link HoodieRecord} based on the record of engine-specific type and metadata for merging.
272266
*
273-
* @param recordOption An option of the record in engine-specific type if exists.
274-
* @param metadataMap The record metadata.
267+
* @param bufferedRecord The buffered record carrying the data and metadata for merging.
275268
* @return A new instance of {@link HoodieRecord}.
276269
*/
277-
public abstract HoodieRecord<T> constructHoodieRecord(Option<T> recordOption,
278-
Map<String, Object> metadataMap);
270+
public abstract HoodieRecord<T> constructHoodieRecord(BufferedRecord<T> bufferedRecord);
279271

280272
/**
281273
* Seals the engine-specific record to make sure the data referenced in memory do not change.
@@ -285,6 +277,14 @@ public abstract HoodieRecord<T> constructHoodieRecord(Option<T> recordOption,
285277
*/
286278
public abstract T seal(T record);
287279

280+
/**
281+
* Convert engine specific row into binary format.
282+
*
283+
* @param record The engine row
284+
* @return row with binary format
285+
*/
286+
public abstract T toBinaryRow(T record);
287+
288288
/**
289289
* Generates metadata map based on the information.
290290
*
@@ -319,11 +319,11 @@ public Map<String, Object> generateMetadataForRecord(T record, Schema schema) {
319319
/**
320320
* Gets the schema encoded in the buffered record
321321
*
322-
* @param infoMap The record metadata
322+
* @param record buffered record
323323
* @return the avro schema if it is encoded in the buffered record, else null
324324
*/
325-
public Schema getSchemaFromMetadata(Map<String, Object> infoMap) {
326-
return decodeAvroSchema(infoMap.get(INTERNAL_META_SCHEMA_ID));
325+
public Schema getSchemaFromBufferRecord(BufferedRecord<T> record) {
326+
return decodeAvroSchema(record.getSchemaId());
327327
}
328328

329329
/**
@@ -409,7 +409,7 @@ public boolean supportsParquetRowIndex() {
409409
/**
410410
* Encodes the given avro schema for efficient serialization.
411411
*/
412-
private Integer encodeAvroSchema(Schema schema) {
412+
public Integer encodeAvroSchema(Schema schema) {
413413
return this.localAvroSchemaCache.cacheSchema(schema);
414414
}
415415

hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordReader.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,11 @@
2222
import org.apache.hudi.common.engine.HoodieReaderContext;
2323
import org.apache.hudi.common.model.HoodieLogFile;
2424
import org.apache.hudi.common.table.read.FileGroupRecordBuffer;
25+
import org.apache.hudi.common.table.read.BufferedRecord;
2526
import org.apache.hudi.common.util.CollectionUtils;
2627
import org.apache.hudi.common.util.HoodieTimer;
2728
import org.apache.hudi.common.util.Option;
2829
import org.apache.hudi.common.util.ValidationUtils;
29-
import org.apache.hudi.common.util.collection.Pair;
3030
import org.apache.hudi.storage.HoodieStorage;
3131
import org.apache.hudi.storage.StoragePath;
3232

@@ -51,7 +51,7 @@
5151
* @param <T> type of engine-specific record representation.
5252
*/
5353
public class HoodieMergedLogRecordReader<T> extends BaseHoodieLogRecordReader<T>
54-
implements Iterable<Pair<Option<T>, Map<String, Object>>>, Closeable {
54+
implements Iterable<BufferedRecord<T>>, Closeable {
5555
private static final Logger LOG = LoggerFactory.getLogger(HoodieMergedLogRecordReader.class);
5656
// A timer for calculating elapsed time in millis
5757
public final HoodieTimer timer = HoodieTimer.create();
@@ -166,11 +166,11 @@ private void performScan() {
166166
}
167167

168168
@Override
169-
public Iterator<Pair<Option<T>, Map<String, Object>>> iterator() {
169+
public Iterator<BufferedRecord<T>> iterator() {
170170
return recordBuffer.getLogRecordIterator();
171171
}
172172

173-
public Map<Serializable, Pair<Option<T>, Map<String, Object>>> getRecords() {
173+
public Map<Serializable, BufferedRecord<T>> getRecords() {
174174
return recordBuffer.getLogRecords();
175175
}
176176

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hudi.common.table.read;

import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.model.DeleteRecord;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;

import org.apache.avro.Schema;

import java.io.IOException;
import java.io.Serializable;
import java.util.Properties;

import static org.apache.hudi.common.model.HoodieRecord.DEFAULT_ORDERING_VALUE;

/**
 * Buffered Record used by file group reader.
 *
 * <p>Wraps an engine-specific record payload together with the metadata the file
 * group reader needs while merging log records: record key, partition path,
 * ordering value, an encoded schema id (see
 * {@code HoodieReaderContext#encodeAvroSchema}) and a delete flag. Instances are
 * created only through the static factory methods.
 *
 * @param <T> engine-specific record representation
 */
public class BufferedRecord<T> implements Serializable {
  private final String recordKey;
  private final String partitionPath;
  // Raw Comparable kept deliberately: ordering values of heterogeneous engine
  // types are compared by the merge logic.
  private final Comparable orderingValue;
  // Mutable: may be swapped in place by toBinary(HoodieReaderContext).
  private T record;
  // Schema id from HoodieReaderContext#encodeAvroSchema; null for delete
  // records created via forDeleteRecord.
  private final Integer schemaId;
  private final boolean isDelete;

  private BufferedRecord(String partitionPath, String recordKey, Comparable orderingValue, T record, Integer schemaId, boolean isDelete) {
    this.partitionPath = partitionPath;
    this.recordKey = recordKey;
    this.orderingValue = orderingValue;
    this.record = record;
    this.schemaId = schemaId;
    this.isDelete = isDelete;
  }

  /**
   * Creates a buffered record from a {@link HoodieRecord}.
   *
   * @param record        the Hudi record to buffer
   * @param schema        the Avro schema of the record
   * @param readerContext the reader context used to encode the schema
   * @param props         properties consulted when deriving the delete flag and ordering value
   * @param <T>           engine-specific record representation
   * @return a new {@link BufferedRecord} wrapping the record's data
   * @throws HoodieException if the delete flag cannot be derived from the record
   */
  public static <T> BufferedRecord<T> forRecordWithContext(HoodieRecord<T> record, Schema schema, HoodieReaderContext<T> readerContext, Properties props) {
    String recordKey = record.getRecordKey();
    Integer schemaId = readerContext.encodeAvroSchema(schema);
    boolean isDelete;
    try {
      isDelete = record.isDelete(schema, props);
    } catch (IOException e) {
      throw new HoodieException("Failed to get isDelete from record.", e);
    }
    return new BufferedRecord<>(null, recordKey, record.getOrderingValue(schema, props), record.getData(), schemaId, isDelete);
  }

  /**
   * Creates a buffered record from an engine-specific record.
   *
   * @param record            the engine-specific record to buffer
   * @param schema            the Avro schema of the record
   * @param readerContext     the reader context used to extract key, schema id and ordering value
   * @param orderingFieldName name of the ordering field, if any
   * @param isDelete          whether the record represents a delete
   * @param <T>               engine-specific record representation
   * @return a new {@link BufferedRecord} wrapping the record
   */
  public static <T> BufferedRecord<T> forRecordWithContext(T record, Schema schema, HoodieReaderContext<T> readerContext, Option<String> orderingFieldName, boolean isDelete) {
    String recordKey = readerContext.getRecordKey(record, schema);
    Integer schemaId = readerContext.encodeAvroSchema(schema);
    Comparable orderingValue = readerContext.getOrderingValue(record, schema, orderingFieldName);
    return new BufferedRecord<>(null, recordKey, orderingValue, record, schemaId, isDelete);
  }

  /**
   * Creates a buffered record for a delete record; the payload and schema id are null.
   *
   * @param deleteRecord  the delete record providing key and partition path
   * @param orderingValue the ordering value to attach
   * @param <T>           engine-specific record representation
   * @return a new delete-flagged {@link BufferedRecord} with no payload
   */
  public static <T> BufferedRecord<T> forDeleteRecord(DeleteRecord deleteRecord, Comparable orderingValue) {
    return new BufferedRecord<>(
        deleteRecord.getPartitionPath(), deleteRecord.getRecordKey(), orderingValue, null, null, true);
  }

  public String getPartitionPath() {
    return partitionPath;
  }

  public String getRecordKey() {
    return recordKey;
  }

  public Comparable getOrderingValue() {
    return orderingValue;
  }

  public T getRecord() {
    return record;
  }

  public Integer getSchemaId() {
    return schemaId;
  }

  public boolean isDelete() {
    return isDelete;
  }

  /**
   * Returns true if this is a delete record carrying the natural (default) ordering value.
   */
  public boolean isDeleteRecordWithNaturalOrder() {
    // Compare from the constant side: orderingValue may be null for delete
    // records built via forDeleteRecord, and the previous ordering NPE'd.
    return isDelete && DEFAULT_ORDERING_VALUE.equals(getOrderingValue());
  }

  /**
   * Converts the wrapped record to its binary representation in place, if present.
   *
   * @param readerContext the reader context performing the conversion
   * @return this instance, with the payload replaced by its binary form
   */
  public BufferedRecord<T> toBinary(HoodieReaderContext<T> readerContext) {
    if (record != null) {
      record = readerContext.toBinaryRow(record);
    }
    return this;
  }

  /**
   * Creates a copy of this buffered record with the payload replaced by {@code data}.
   *
   * @param data the replacement payload
   * @return a new {@link BufferedRecord} sharing all metadata with this one
   */
  public BufferedRecord<T> copy(T data) {
    return new BufferedRecord<>(partitionPath, recordKey, orderingValue, data, schemaId, isDelete);
  }
}

0 commit comments

Comments
 (0)