Skip to content

Commit 6085a7a

Browse files
committed
Properly propagate schemas of Beam YAML Partition transform.
1 parent 781917a commit 6085a7a

File tree

2 files changed

+3
-0
lines changed

2 files changed

+3
-0
lines changed

sdks/python/apache_beam/yaml/yaml_mapping.py

+2
Original file line numberDiff line numberDiff line change
@@ -814,6 +814,8 @@ def split(element):
814814
mapping_transform = mapping_transform.with_outputs(*output_set)
815815
splits = pcoll | mapping_transform.with_input_types(T).with_output_types(T)
816816
result = {out: getattr(splits, out) for out in output_set}
817+
for out in result.values():
818+
out.element_type = pcoll.element_type
817819
if error_output:
818820
result[error_output] = result[error_output] | map_errors_to_standard_format(
819821
pcoll.element_type)

sdks/python/apache_beam/yaml/yaml_mapping_test.py

+1
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,7 @@ def test_partition(self):
212212
language: python
213213
outputs: [even, odd]
214214
''')
215+
self.assertEqual(result['even'].element_type, elements.element_type)
215216
assert_that(
216217
result['even'] | beam.Map(lambda x: x.element),
217218
equal_to(['banana', 'orange']),

0 commit comments

Comments
 (0)