Skip to content

feat(datahub-sync): adds DataPlatformInstance aspect #13133

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import com.linkedin.common.BrowsePathEntry;
import com.linkedin.common.BrowsePathEntryArray;
import com.linkedin.common.BrowsePathsV2;
import com.linkedin.common.DataPlatformInstance;
import com.linkedin.common.urn.DataPlatformUrn;
import com.linkedin.common.Status;
import com.linkedin.common.SubTypes;
import com.linkedin.common.UrnArray;
Expand Down Expand Up @@ -72,6 +74,9 @@ public class DataHubSyncClient extends HoodieSyncClient {
private static final Logger LOG = LoggerFactory.getLogger(DataHubSyncClient.class);

protected final DataHubSyncConfig config;
private final DataPlatformUrn dataPlatformUrn;
private final Option<String> dataPlatformInstance;
private final Option<Urn> dataPlatformInstanceUrn;
private final DatasetUrn datasetUrn;
private final Urn databaseUrn;
private final String tableName;
Expand All @@ -83,6 +88,9 @@ public DataHubSyncClient(DataHubSyncConfig config, HoodieTableMetaClient metaCli
this.config = config;
HoodieDataHubDatasetIdentifier datasetIdentifier =
config.getDatasetIdentifier();
this.dataPlatformUrn = datasetIdentifier.getDataPlatformUrn();
this.dataPlatformInstance = datasetIdentifier.getDataPlatformInstance();
this.dataPlatformInstanceUrn = datasetIdentifier.getDataPlatformInstanceUrn();
this.datasetUrn = datasetIdentifier.getDatasetUrn();
this.databaseUrn = datasetIdentifier.getDatabaseUrn();
this.tableName = datasetIdentifier.getTableName();
Expand Down Expand Up @@ -211,8 +219,8 @@ private MetadataChangeProposalWrapper createContainerAspect(Urn entityUrn, Urn c
return containerProposal;
}

private MetadataChangeProposalWrapper createBrowsePathsAspect(Urn entityUrn, List<BrowsePathEntry> path) {
BrowsePathEntryArray browsePathEntryArray = new BrowsePathEntryArray(path);
private MetadataChangeProposalWrapper createBrowsePathsAspect(Urn entityUrn, List<BrowsePathEntry> paths) {
BrowsePathEntryArray browsePathEntryArray = new BrowsePathEntryArray(paths);
MetadataChangeProposalWrapper browsePathsProposal = MetadataChangeProposalWrapper.builder()
.entityType(entityUrn.getEntityType())
.entityUrn(entityUrn)
Expand All @@ -222,6 +230,21 @@ private MetadataChangeProposalWrapper createBrowsePathsAspect(Urn entityUrn, Lis
return browsePathsProposal;
}

/**
 * Builds an upsert proposal attaching a {@link DataPlatformInstance} aspect to the given entity.
 *
 * <p>The aspect always carries this client's data platform URN; the instance URN is set
 * only when one is present on this client.
 *
 * @param entityUrn URN of the entity the aspect is attached to
 * @return the proposal wrapping the DataPlatformInstance aspect
 */
private MetadataChangeProposalWrapper createDataPlatformInstanceAspect(Urn entityUrn) {
  DataPlatformInstance aspect = new DataPlatformInstance();
  aspect.setPlatform(this.dataPlatformUrn);
  // The instance part is optional: leave it unset when no instance URN is available.
  if (this.dataPlatformInstanceUrn.isPresent()) {
    aspect.setInstance(dataPlatformInstanceUrn.get());
  }

  return MetadataChangeProposalWrapper.builder()
      .entityType(entityUrn.getEntityType())
      .entityUrn(entityUrn)
      .upsert()
      .aspect(aspect)
      .build();
}

private MetadataChangeProposalWrapper createDomainAspect(Urn entityUrn) {
try {
Urn domainUrn = Urn.createFromString(config.getDomainIdentifier());
Expand All @@ -246,10 +269,16 @@ private Stream<MetadataChangeProposalWrapper> createContainerEntity() {
.aspect(new ContainerProperties().setName(databaseName))
.build();

List<BrowsePathEntry> paths = dataPlatformInstanceUrn.map(dpiUrn -> Collections.singletonList(
new BrowsePathEntry().setUrn(dpiUrn).setId(dpiUrn.toString()))
).orElse(Collections.emptyList());

Stream<MetadataChangeProposalWrapper> resultStream = Stream.of(
containerEntityProposal,
createSubTypeAspect(databaseUrn, "Database"),
createBrowsePathsAspect(databaseUrn, Collections.emptyList()), createStatusAspect(databaseUrn),
createDataPlatformInstanceAspect(databaseUrn),
createBrowsePathsAspect(databaseUrn, paths),
createStatusAspect(databaseUrn),
config.attachDomain() ? createDomainAspect(databaseUrn) : null
).filter(Objects::nonNull);
return resultStream;
Expand Down Expand Up @@ -308,10 +337,20 @@ private MetadataChangeProposalWrapper createSchemaMetadataAspect(String tableNam
}

private Stream<MetadataChangeProposalWrapper> createDatasetEntity() {
BrowsePathEntry databasePath = new BrowsePathEntry().setUrn(databaseUrn).setId(databaseName);
List<BrowsePathEntry> paths = dataPlatformInstanceUrn.map(dpiUrn -> {
List<BrowsePathEntry> list = new ArrayList<BrowsePathEntry>();
list.add(new BrowsePathEntry().setUrn(dpiUrn).setId(dpiUrn.toString()));
list.add(databasePath);
return list;
}
).orElse(Collections.singletonList(databasePath));

Stream<MetadataChangeProposalWrapper> result = Stream.of(
createStatusAspect(datasetUrn),
createSubTypeAspect(datasetUrn, "Table"),
createBrowsePathsAspect(datasetUrn, Collections.singletonList(new BrowsePathEntry().setUrn(databaseUrn).setId(databaseName))),
createDataPlatformInstanceAspect(datasetUrn),
createBrowsePathsAspect(datasetUrn, paths),
createContainerAspect(datasetUrn, databaseUrn),
createSchemaMetadataAspect(tableName),
config.attachDomain() ? createDomainAspect(datasetUrn) : null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,13 @@ public class DataHubSyncConfig extends HoodieSyncConfig {
.withDocumentation("String used to represent Hudi when creating its corresponding DataPlatform entity "
+ "within Datahub");

/**
 * Optional name of the DataPlatformInstance to associate with emitted entities.
 * Declared without a default value and marked as an advanced config.
 */
public static final ConfigProperty<String> META_SYNC_DATAHUB_DATAPLATFORM_INSTANCE_NAME = ConfigProperty
.key("hoodie.meta.sync.datahub.dataplatform_instance.name")
.noDefaultValue()
.markAdvanced()
.withDocumentation("String used to represent Hudi instance when emitting Container and Dataset entities "
+ "with the corresponding DataPlatformInstance, only if given.");

public static final ConfigProperty<String> META_SYNC_DATAHUB_DATASET_ENV = ConfigProperty
.key("hoodie.meta.sync.datahub.dataset.env")
.defaultValue(DEFAULT_DATAHUB_ENV.name())
Expand Down Expand Up @@ -174,6 +181,10 @@ public static class DataHubSyncConfigParams {
+ "corresponding DataPlatform entity within Datahub")
public String dataPlatformName;

// Command-line counterpart of the data platform instance name setting; may be left unset.
@Parameter(names = {"--data-platform-instance-name"}, description = "String used to represent Hudi instance when emitting Container and Dataset entities "
+ "with the corresponding DataPlatformInstance, only if given.")
public String dataPlatformInstanceName;

@Parameter(names = {"--dataset-env"}, description = "Which Datahub Environment to use when pushing entities")
public String datasetEnv;

Expand All @@ -196,6 +207,7 @@ public Properties toProps() {
props.setPropertyIfNonNull(META_SYNC_DATAHUB_EMITTER_TOKEN.key(), emitterToken);
props.setPropertyIfNonNull(META_SYNC_DATAHUB_EMITTER_SUPPLIER_CLASS.key(), emitterSupplierClass);
props.setPropertyIfNonNull(META_SYNC_DATAHUB_DATAPLATFORM_NAME.key(), dataPlatformName);
props.setPropertyIfNonNull(META_SYNC_DATAHUB_DATAPLATFORM_INSTANCE_NAME.key(), dataPlatformInstanceName);
props.setPropertyIfNonNull(META_SYNC_DATAHUB_DATASET_ENV.key(), datasetEnv);
props.setPropertyIfNonNull(META_SYNC_DATAHUB_DOMAIN_IDENTIFIER.key(), domainIdentifier);
// We want the default behavior of DataHubSync Tool when run as command line to NOT suppress exceptions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.hudi.sync.datahub.config;

import org.apache.hudi.common.util.Option;

import com.linkedin.common.FabricType;
import com.linkedin.common.urn.DataPlatformUrn;
import com.linkedin.common.urn.DatasetUrn;
Expand All @@ -29,6 +31,7 @@

import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
import static org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_DATAPLATFORM_INSTANCE_NAME;
import static org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_DATAPLATFORM_NAME;
import static org.apache.hudi.sync.datahub.config.DataHubSyncConfig.META_SYNC_DATAHUB_DATASET_ENV;

Expand All @@ -43,6 +46,10 @@ public class HoodieDataHubDatasetIdentifier {
public static final FabricType DEFAULT_DATAHUB_ENV = FabricType.DEV;

protected final Properties props;
private final String dataPlatform;
private final DataPlatformUrn dataPlatformUrn;
private final Option<String> dataPlatformInstance;
private final Option<Urn> dataPlatformInstanceUrn;
private final DatasetUrn datasetUrn;
private final Urn databaseUrn;
private final String tableName;
Expand All @@ -55,18 +62,26 @@ public HoodieDataHubDatasetIdentifier(Properties props) {
}
DataHubSyncConfig config = new DataHubSyncConfig(props);

this.dataPlatform = config.getStringOrDefault(META_SYNC_DATAHUB_DATAPLATFORM_NAME);
this.dataPlatformUrn = createDataPlatformUrn(this.dataPlatform);
this.dataPlatformInstance = Option.ofNullable(config.getString(META_SYNC_DATAHUB_DATAPLATFORM_INSTANCE_NAME));
this.dataPlatformInstanceUrn = createDataPlatformInstanceUrn(
this.dataPlatformUrn,
Option.ofNullable(config.getString(META_SYNC_DATAHUB_DATAPLATFORM_INSTANCE_NAME))
);
this.datasetUrn = new DatasetUrn(
createDataPlatformUrn(config.getStringOrDefault(META_SYNC_DATAHUB_DATAPLATFORM_NAME)),
createDatasetName(config.getString(META_SYNC_DATABASE_NAME), config.getString(META_SYNC_TABLE_NAME)),
this.dataPlatformUrn,
createDatasetName(this.dataPlatformInstance, config.getString(META_SYNC_DATABASE_NAME), config.getString(META_SYNC_TABLE_NAME)),
FabricType.valueOf(config.getStringOrDefault(META_SYNC_DATAHUB_DATASET_ENV))
);

this.tableName = config.getString(META_SYNC_TABLE_NAME);
this.databaseName = config.getString(META_SYNC_DATABASE_NAME);

// https://github.com/datahub-project/datahub/blob/0b105395e913cc47a59bdeed0c56d7c0d4b71b63/metadata-ingestion/src/datahub/emitter/mcp_builder.py#L69-L72
DatabaseKey databaseKey = DatabaseKey.builder()
.platform(config.getStringOrDefault(META_SYNC_DATAHUB_DATAPLATFORM_NAME))
.instance(config.getStringOrDefault(META_SYNC_DATAHUB_DATASET_ENV))
.instance(this.dataPlatformInstance.orElse(config.getStringOrDefault(META_SYNC_DATAHUB_DATASET_ENV)))
.database(this.databaseName)
.build();

Expand All @@ -77,6 +92,22 @@ public DatasetUrn getDatasetUrn() {
return this.datasetUrn;
}

/**
 * @return the data platform name held by this identifier
 */
public String getDataPlatform() {
  return dataPlatform;
}

/**
 * @return the data platform URN held by this identifier
 */
public DataPlatformUrn getDataPlatformUrn() {
  return dataPlatformUrn;
}

/**
 * @return the data platform instance name wrapped in an {@link Option}
 */
public Option<String> getDataPlatformInstance() {
  return dataPlatformInstance;
}

/**
 * @return the DataPlatformInstance URN wrapped in an {@link Option}
 */
public Option<Urn> getDataPlatformInstanceUrn() {
  return dataPlatformInstanceUrn;
}

/**
 * @return the database URN held by this identifier
 */
public Urn getDatabaseUrn() {
  return databaseUrn;
}
Expand All @@ -93,7 +124,22 @@ private static DataPlatformUrn createDataPlatformUrn(String platformUrn) {
return new DataPlatformUrn(platformUrn);
}

private static String createDatasetName(String databaseName, String tableName) {
/**
 * Builds the DataPlatformInstance URN for the given platform and instance name.
 *
 * @param dataPlatformUrn      URN of the data platform the instance belongs to
 * @param dataPlatformInstance instance name; when empty no URN is produced
 * @return the parsed {@code urn:li:dataPlatformInstance:(platform,instance)} URN,
 *         or an empty {@link Option} when no instance name is given
 * @throws IllegalArgumentException if the assembled URN string cannot be parsed
 */
private static Option<Urn> createDataPlatformInstanceUrn(DataPlatformUrn dataPlatformUrn, Option<String> dataPlatformInstance) {
  if (dataPlatformInstance.isPresent()) {
    final String urnText =
        String.format("urn:li:dataPlatformInstance:(%s,%s)", dataPlatformUrn.toString(), dataPlatformInstance.get());
    try {
      return Option.of(Urn.createFromString(urnText));
    } catch (Exception e) {
      // Surface parse failures as an unchecked error that names the offending string.
      throw new IllegalArgumentException(
          String.format("Failed to create DataPlatformInstance URN from string: %s", urnText), e);
    }
  }
  return Option.empty();
}

/**
 * Composes the dataset name used in the dataset URN.
 *
 * @param dataPlatformInstance optional instance name; when present it prefixes the name
 * @param databaseName         database name
 * @param tableName            table name
 * @return {@code instance.database.table} when an instance is present,
 *         otherwise {@code database.table}
 */
private static String createDatasetName(Option<String> dataPlatformInstance, String databaseName, String tableName) {
  return dataPlatformInstance.isPresent()
      ? String.format("%s.%s.%s", dataPlatformInstance.get(), databaseName, tableName)
      : String.format("%s.%s", databaseName, tableName);
}
}
Loading
Loading