Commit a95fb5d

feat(dataflow): Dataflow to Apache Iceberg with dynamic destinations (GoogleCloudPlatform#9645)
* docs(sample): Dataflow to Apache Iceberg with dynamic destinations
* Fix linter errors
* Bump Apache Beam version
* Update copyright year
* Address PR review feedback
* Add code comments
1 parent 601aa2a commit a95fb5d

File tree

3 files changed, +157 -41 lines changed

dataflow/snippets/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -37,7 +37,7 @@
     <maven.compiler.source>11</maven.compiler.source>
     <maven.compiler.target>11</maven.compiler.target>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    <apache_beam.version>2.62.0</apache_beam.version>
+    <apache_beam.version>2.63.0</apache_beam.version>
     <slf4j.version>2.0.12</slf4j.version>
     <parquet.version>1.14.0</parquet.version>
     <iceberg.version>1.4.2</iceberg.version>
dataflow/snippets/src/main/java/com/example/dataflow/ApacheIcebergDynamicDestinations.java

Lines changed: 100 additions & 0 deletions
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2025 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.example.dataflow;
+
+// [START dataflow_apache_iceberg_dynamic_destinations]
+import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.managed.Managed;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.JsonToRow;
+
+public class ApacheIcebergDynamicDestinations {
+
+  // The schema for the table rows.
+  public static final Schema SCHEMA = new Schema.Builder()
+      .addInt64Field("id")
+      .addStringField("name")
+      .addStringField("airport")
+      .build();
+
+  // The data to write to table, formatted as JSON strings.
+  static final List<String> TABLE_ROWS = List.of(
+      "{\"id\":0, \"name\":\"Alice\", \"airport\": \"ORD\" }",
+      "{\"id\":1, \"name\":\"Bob\", \"airport\": \"SYD\" }",
+      "{\"id\":2, \"name\":\"Charles\", \"airport\": \"ORD\" }"
+  );
+
+  public interface Options extends PipelineOptions {
+    @Description("The URI of the Apache Iceberg warehouse location")
+    String getWarehouseLocation();
+
+    void setWarehouseLocation(String value);
+
+    @Description("The name of the Apache Iceberg catalog")
+    String getCatalogName();
+
+    void setCatalogName(String value);
+  }
+
+  // Write JSON data to Apache Iceberg, using dynamic destinations to determine the Iceberg table
+  // where Dataflow writes each record. The JSON data contains a field named "airport". The
+  // Dataflow pipeline writes to Iceberg tables with the naming pattern "flights-{airport}".
+  public static void main(String[] args) {
+    // Parse the pipeline options passed into the application. Example:
+    //   --runner=DirectRunner --warehouseLocation=$LOCATION --catalogName=$CATALOG
+    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
+    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+    Pipeline pipeline = Pipeline.create(options);
+
+    // Configure the Iceberg sink I/O: a Hadoop catalog rooted at the warehouse location.
+    Map<String, Object> catalogConfig = ImmutableMap.<String, Object>builder()
+        .put("warehouse", options.getWarehouseLocation())
+        .put("type", "hadoop")
+        .build();
+
+    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
+        .put("catalog_name", options.getCatalogName())
+        .put("catalog_properties", catalogConfig)
+        // Route the incoming records based on the value of the "airport" field.
+        .put("table", "flights-{airport}")
+        // Specify which fields to keep from the input data.
+        .put("keep", Arrays.asList("name", "id"))
+        .build();
+
+    // Build the pipeline.
+    pipeline
+        // Read in-memory JSON data.
+        .apply(Create.of(TABLE_ROWS))
+        // Convert the JSON records to Row objects.
+        .apply(JsonToRow.withSchema(SCHEMA))
+        // Write each Row to Apache Iceberg.
+        .apply(Managed.write(Managed.ICEBERG).withConfig(config));
+
+    // Run the pipeline.
+    pipeline.run().waitUntilFinish();
+  }
+}
+// [END dataflow_apache_iceberg_dynamic_destinations]
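
After the pipeline runs, the dynamically created tables can be read back with the plain Apache Iceberg API, which is also how the updated integration test below verifies the output. The following is a minimal sketch and not part of this commit: the class name, warehouse path, and catalog name are hypothetical placeholders, and it assumes the same local Hadoop catalog setup that the test uses.

package com.example.dataflow;

import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.CloseableIterable;

public class InspectDynamicTables {
  public static void main(String[] args) throws IOException {
    // Hypothetical values: use the same warehouse location and catalog name
    // that were passed to ApacheIcebergDynamicDestinations.
    String warehouseLocation = "file:/tmp/iceberg-warehouse";
    String catalogName = "local";

    // Load the same Hadoop catalog that the pipeline wrote to.
    Catalog catalog =
        CatalogUtil.loadCatalog(
            CatalogUtil.ICEBERG_CATALOG_HADOOP,
            catalogName,
            ImmutableMap.of(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation),
            new Configuration());

    // Records with airport=ORD are routed to the "flights-ORD" table.
    Table table = catalog.loadTable(TableIdentifier.of("flights-ORD"));
    try (CloseableIterable<Record> records = IcebergGenerics.read(table).build()) {
      for (Record record : records) {
        System.out.println(record);
      }
    }
  }
}

The integration test's tableContainsRecord helper performs the same scan and checks each record's string form for values such as "0, Alice".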

dataflow/snippets/src/test/java/com/example/dataflow/ApacheIcebergIT.java

Lines changed: 56 additions & 40 deletions
@@ -16,15 +16,15 @@
 
 package com.example.dataflow;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import com.google.common.collect.ImmutableMap;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.PrintStream;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.UUID;
+import org.apache.beam.sdk.PipelineResult;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.CatalogProperties;
@@ -52,25 +52,19 @@
 import org.junit.Test;
 
 public class ApacheIcebergIT {
-  private ByteArrayOutputStream bout;
-  private final PrintStream originalOut = System.out;
-
-  private static final String CATALOG_NAME = "local";
-  private static final String TABLE_NAME = "table1";
-  private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(TABLE_NAME);
-
-  // The output file that the Dataflow pipeline writes.
-  private static final String OUTPUT_FILE_NAME_PREFIX = UUID.randomUUID().toString();
-  private static final String OUTPUT_FILE_NAME = OUTPUT_FILE_NAME_PREFIX + "-00000-of-00001.txt";
 
   private Configuration hadoopConf = new Configuration();
   private java.nio.file.Path warehouseDirectory;
   private String warehouseLocation;
   private Catalog catalog;
-  private Table table;
+  private static final String CATALOG_NAME = "local";
 
+  String outputFileNamePrefix = UUID.randomUUID().toString();
+  String outputFileName = outputFileNamePrefix + "-00000-of-00001.txt";
 
-  private void createIcebergTable(Catalog catalog, TableIdentifier tableId) {
+  private Table createIcebergTable(String name) {
+
+    TableIdentifier tableId = TableIdentifier.of(name);
 
     // This schema represents an Iceberg table schema. It needs to match the
     // org.apache.beam.sdk.schemas.Schema that is defined in ApacheIcebergWrite. However, these
@@ -79,10 +73,10 @@ private void createIcebergTable(Catalog catalog, TableIdentifier tableId) {
         NestedField.required(1, "id", Types.LongType.get()),
         NestedField.optional(2, "name", Types.StringType.get()));
 
-    table = catalog.createTable(tableId, schema);
+    return catalog.createTable(tableId, schema);
   }
 
-  private void writeTableRecord()
+  private void writeTableRecord(Table table)
       throws IOException {
     GenericRecord record = GenericRecord.create(table.schema());
     record.setField("id", 0L);
@@ -109,72 +103,94 @@ private void writeTableRecord()
         .commit();
   }
 
+  private boolean tableContainsRecord(Table table, String data) {
+    CloseableIterable<Record> records = IcebergGenerics.read(table).build();
+    for (Record r : records) {
+      if (r.toString().contains(data)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   @Before
   public void setUp() throws IOException {
-    bout = new ByteArrayOutputStream();
-    System.setOut(new PrintStream(bout));
-
     // Create an Apache Iceberg catalog with a table.
     warehouseDirectory = Files.createTempDirectory("test-warehouse");
     warehouseLocation = "file:" + warehouseDirectory.toString();
-    System.out.println(warehouseLocation);
     catalog =
         CatalogUtil.loadCatalog(
             CatalogUtil.ICEBERG_CATALOG_HADOOP,
             CATALOG_NAME,
             ImmutableMap.of(CatalogProperties.WAREHOUSE_LOCATION, warehouseLocation),
             hadoopConf);
-    createIcebergTable(catalog, TABLE_IDENTIFIER);
+
   }
 
   @After
   public void tearDown() throws IOException {
-    Files.deleteIfExists(Paths.get(OUTPUT_FILE_NAME));
-    System.setOut(originalOut);
+    Files.deleteIfExists(Paths.get(outputFileName));
   }
 
   @Test
   public void testApacheIcebergWrite() {
+    String tableName = "write_table";
+    final Table table = createIcebergTable("write_table");
+
     // Run the Dataflow pipeline.
     ApacheIcebergWrite.main(
         new String[] {
           "--runner=DirectRunner",
           "--warehouseLocation=" + warehouseLocation,
           "--catalogName=" + CATALOG_NAME,
-          "--tableName=" + TABLE_NAME
+          "--tableName=" + tableName
         });
 
     // Verify that the pipeline wrote records to the table.
-    Table table = catalog.loadTable(TABLE_IDENTIFIER);
-    CloseableIterable<Record> records = IcebergGenerics.read(table)
-        .build();
-    for (Record r : records) {
-      System.out.println(r);
-    }
+    assertTrue(tableContainsRecord(table, "0, Alice"));
+    assertTrue(tableContainsRecord(table, "1, Bob"));
+    assertTrue(tableContainsRecord(table, "2, Charles"));
+  }
+
+  @Test
+  public void testApacheIcebergDynamicDestinations() {
+    final Table tableORD = createIcebergTable("flights-ORD");
+    final Table tableSYD = createIcebergTable("flights-SYD");
+
+    // Run the Dataflow pipeline.
+    ApacheIcebergDynamicDestinations.main(
+        new String[] {
+          "--runner=DirectRunner",
+          "--warehouseLocation=" + warehouseLocation,
+          "--catalogName=" + CATALOG_NAME
+        });
 
-    String got = bout.toString();
-    assertTrue(got.contains("0, Alice"));
-    assertTrue(got.contains("1, Bob"));
-    assertTrue(got.contains("2, Charles"));
+    // Verify that the pipeline wrote records to the correct tables.
+    assertTrue(tableContainsRecord(tableORD, "0, Alice"));
+    assertTrue(tableContainsRecord(tableORD, "2, Charles"));
+    assertTrue(tableContainsRecord(tableSYD, "1, Bob"));
   }
 
   @Test
   public void testApacheIcebergRead() throws IOException {
+    String tableName = "read_table";
+    final Table table = createIcebergTable(tableName);
+
     // Seed the Apache Iceberg table with data.
-    writeTableRecord();
+    writeTableRecord(table);
 
     // Run the Dataflow pipeline.
     ApacheIcebergRead.main(
         new String[] {
           "--runner=DirectRunner",
           "--warehouseLocation=" + warehouseLocation,
           "--catalogName=" + CATALOG_NAME,
-          "--tableName=" + TABLE_NAME,
-          "--outputPath=" + OUTPUT_FILE_NAME_PREFIX
+          "--tableName=" + tableName,
+          "--outputPath=" + outputFileNamePrefix
         });
 
-    // Verify the pipeline wrote the table data to a local file.
-    String output = Files.readString(Paths.get(OUTPUT_FILE_NAME));
+    // Verify the pipeline wrote the table data to a text file.
+    String output = Files.readString(Paths.get(outputFileName));
     assertTrue(output.contains("0:Person-0"));
   }
-}
+}
