Skip to content

Fix Iceberg Integration tests #34686

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdks/java/io/iceberg/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ task integrationTest(type: Test) {
exclude '**/BigQueryMetastoreCatalogIT.class'
}

maxParallelForks 4
maxParallelForks 1
classpath = sourceSets.test.runtimeClasspath
testClassesDirs = sourceSets.test.output.classesDirs
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class BigQueryMetastoreCatalogIT extends IcebergCatalogBaseIT {
private static final BigqueryClient BQ_CLIENT = new BigqueryClient("BigQueryMetastoreCatalogIT");
static final String BQMS_CATALOG = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog";
static final String DATASET = "managed_iceberg_bqms_tests_" + System.nanoTime();;
static final long SALT = System.nanoTime();
private long salt = System.nanoTime();

@BeforeClass
public static void createDataset() throws IOException, InterruptedException {
Expand All @@ -62,11 +62,12 @@ public static void deleteDataset() {

@Override
public String tableId() {
return DATASET + "." + testName.getMethodName() + "_" + SALT;
return DATASET + "." + testName.getMethodName() + "_" + salt;
}

@Override
public Catalog createCatalog() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since createCatalog is called in setup() which is a non-static method, salt should also be non-static member

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

salt += System.nanoTime();
return CatalogUtil.loadCatalog(
BQMS_CATALOG,
"bqms_" + catalogName,
Expand All @@ -82,7 +83,7 @@ public Catalog createCatalog() {
public void catalogCleanup() {
for (TableIdentifier tableIdentifier : catalog.listTables(Namespace.of(DATASET))) {
// only delete tables that were created in this test run
if (tableIdentifier.name().contains(String.valueOf(SALT))) {
if (tableIdentifier.name().contains(String.valueOf(salt))) {
catalog.dropTable(tableIdentifier);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,15 @@
*/
public class HiveCatalogIT extends IcebergCatalogBaseIT {
private static HiveMetastoreExtension hiveMetastoreExtension;
private long salt = System.nanoTime();

private String testDb() {
return "test_db_" + testName.getMethodName();
}

@Override
public String tableId() {
return String.format("%s.%s", testDb(), "test_table");
return String.format("%s.%s", testDb(), "test_table" + "_" + salt);
}

@BeforeClass
Expand All @@ -73,6 +74,7 @@ public void catalogSetup() throws Exception {

@Override
public Catalog createCatalog() {
salt += System.nanoTime();
return CatalogUtil.loadCatalog(
HiveCatalog.class.getName(),
"hive_" + catalogName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@
* #numRecords()}.
*/
public abstract class IcebergCatalogBaseIT implements Serializable {
private static final long SETUP_TEARDOWN_SLEEP_MS = 5000;

public abstract Catalog createCatalog();

public abstract Map<String, Object> managedIcebergConfig(String tableId);
Expand All @@ -142,7 +144,7 @@ public void catalogSetup() throws Exception {}
public void catalogCleanup() throws Exception {}

public Integer numRecords() {
return 1000;
return OPTIONS.getRunner().equals(DirectRunner.class) ? 10 : 1000;
}

public String tableId() {
Expand All @@ -159,7 +161,8 @@ public static String warehouse(Class<? extends IcebergCatalogBaseIT> testClass)

@Before
public void setUp() throws Exception {
OPTIONS.as(DirectOptions.class).setTargetParallelism(3);
catalogName += System.nanoTime();
OPTIONS.as(DirectOptions.class).setTargetParallelism(1);
warehouse =
String.format(
"%s/%s/%s",
Expand All @@ -169,12 +172,14 @@ public void setUp() throws Exception {
warehouse = warehouse(getClass());
catalogSetup();
catalog = createCatalog();
Thread.sleep(SETUP_TEARDOWN_SLEEP_MS);
}

@After
public void cleanUp() throws Exception {
try {
catalogCleanup();
Thread.sleep(SETUP_TEARDOWN_SLEEP_MS);
} catch (Exception e) {
LOG.warn("Catalog cleanup failed.", e);
}
Expand All @@ -201,6 +206,7 @@ public void cleanUp() throws Exception {
.collect(Collectors.toList());
gcsUtil.remove(filesToDelete);
}
Thread.sleep(SETUP_TEARDOWN_SLEEP_MS);
} catch (Exception e) {
LOG.warn("Failed to clean up GCS files.", e);
}
Expand All @@ -216,9 +222,9 @@ public void cleanUp() throws Exception {

@Rule
public transient Timeout globalTimeout =
Timeout.seconds(OPTIONS.getRunner().equals(DirectRunner.class) ? 180 : 20 * 60);
Timeout.seconds(OPTIONS.getRunner().equals(DirectRunner.class) ? 300 : 20 * 60);

private static final int NUM_SHARDS = 10;
private static final int NUM_SHARDS = OPTIONS.getRunner().equals(DirectRunner.class) ? 1 : 10;
private static final Logger LOG = LoggerFactory.getLogger(IcebergCatalogBaseIT.class);
private static final Schema DOUBLY_NESTED_ROW_SCHEMA =
Schema.builder()
Expand Down
Loading