Skip to content

Commit a71dd28

Browse files
authored
Deep copy PipelineOptions and keep the input intact. (#34723)
* Deep copy PipelineOptions and keep the input intact. * Fix failed tests for FlinkRunner.
1 parent f2819ca commit a71dd28

File tree

2 files changed

+18
-3
lines changed

2 files changed

+18
-3
lines changed

sdks/python/apache_beam/pipeline.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949

5050
import abc
5151
import contextlib
52+
import copy
5253
import logging
5354
import os
5455
import re
@@ -171,7 +172,9 @@ def __init__(
171172

172173
if options is not None:
173174
if isinstance(options, PipelineOptions):
174-
self._options = options
175+
# Make a deep copy of options since they could be overwritten in later
176+
# steps.
177+
self._options = copy.deepcopy(options)
175178
else:
176179
raise ValueError(
177180
'Parameter options, if specified, must be of type PipelineOptions. '

sdks/python/apache_beam/runners/interactive/interactive_beam_test.py

+14-2
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,12 @@ def test_cleanup_by_a_pipeline(self):
395395
# Pipeline association is cleaned up.
396396
self.assertNotIn(p, self.clusters.pipelines)
397397
self.assertNotIn(p, dcm.pipelines)
398-
self.assertEqual(options.view_as(FlinkRunnerOptions).flink_master, '[auto]')
398+
# The internal option in the pipeline is overwritten.
399+
self.assertEqual(
400+
p.options.view_as(FlinkRunnerOptions).flink_master, '[auto]')
401+
# The original option is unchanged.
402+
self.assertEqual(
403+
options.view_as(FlinkRunnerOptions).flink_master, meta.master_url)
399404
# The cluster is unknown now.
400405
self.assertNotIn(meta, self.clusters.dataproc_cluster_managers)
401406
self.assertNotIn(meta.master_url, self.clusters.master_urls)
@@ -423,10 +428,17 @@ def test_not_cleanup_if_multiple_pipelines_share_a_manager(self):
423428
# Pipeline association of p is cleaned up.
424429
self.assertNotIn(p, self.clusters.pipelines)
425430
self.assertNotIn(p, dcm.pipelines)
426-
self.assertEqual(options.view_as(FlinkRunnerOptions).flink_master, '[auto]')
431+
# The internal option in the pipeline is overwritten.
432+
self.assertEqual(
433+
p.options.view_as(FlinkRunnerOptions).flink_master, '[auto]')
434+
# The original option is unchanged.
435+
self.assertEqual(
436+
options.view_as(FlinkRunnerOptions).flink_master, meta.master_url)
427437
# Pipeline association of p2 still presents.
428438
self.assertIn(p2, self.clusters.pipelines)
429439
self.assertIn(p2, dcm.pipelines)
440+
self.assertEqual(
441+
p2.options.view_as(FlinkRunnerOptions).flink_master, meta.master_url)
430442
self.assertEqual(
431443
options2.view_as(FlinkRunnerOptions).flink_master, meta.master_url)
432444
# The cluster is still known.

0 commit comments

Comments
 (0)