From 6b95400fd9c5debd79ca5c38697a558b6a279577 Mon Sep 17 00:00:00 2001
From: Diego Ardila
Date: Thu, 28 May 2020 22:14:15 +0000
Subject: [PATCH] repro

---
 .../flex-templates/streaming_beam/Dockerfile  |  5 +-
 .../streaming_beam/metadata.json              | 22 ++----
 .../streaming_beam/requirements.txt           |  1 +
 .../streaming_beam/streaming_beam.py          | 79 +++++--------
 4 files changed, 30 insertions(+), 77 deletions(-)

diff --git a/dataflow/flex-templates/streaming_beam/Dockerfile b/dataflow/flex-templates/streaming_beam/Dockerfile
index d07acfb0d52..d2feb393819 100644
--- a/dataflow/flex-templates/streaming_beam/Dockerfile
+++ b/dataflow/flex-templates/streaming_beam/Dockerfile
@@ -21,7 +21,8 @@ WORKDIR ${WORKDIR}
 COPY requirements.txt .
 COPY streaming_beam.py .
 
-ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt"
 ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/streaming_beam.py"
 
-RUN pip install -U -r ./requirements.txt
+RUN pip install matplotlib
+RUN pip install apache-beam[gcp]==2.21.0
+RUN pip install pip==9.0.3
\ No newline at end of file
diff --git a/dataflow/flex-templates/streaming_beam/metadata.json b/dataflow/flex-templates/streaming_beam/metadata.json
index f670cce28ea..903595f3404 100644
--- a/dataflow/flex-templates/streaming_beam/metadata.json
+++ b/dataflow/flex-templates/streaming_beam/metadata.json
@@ -1,23 +1,15 @@
 {
-  "name": "Streaming beam Python flex template",
-  "description": "Streaming beam example for python flex template.",
+  "name": "Batch beam Python flex template",
+  "description": "Batch beam example for python flex template.",
   "parameters": [
     {
-      "name": "input_subscription",
-      "label": "Input PubSub subscription.",
-      "helpText": "Name of the input PubSub subscription to consume from.",
-      "regexes": [
-        "[/:-_.a-zA-Z0-9]+"
-      ]
-    },
-    {
-      "name": "output_table",
-      "label": "BigQuery output table name.",
-      "helpText": "Name of the BigQuery output table name.",
+      "name": "output_text",
+      "label": "Output text location",
+      "helpText": "Path to output text location",
       "is_optional": true,
       "regexes": [
-        "[/:-_.a-zA-Z0-9]+"
+        ".*"
       ]
     }
   ]
-}
+}
\ No newline at end of file
diff --git a/dataflow/flex-templates/streaming_beam/requirements.txt b/dataflow/flex-templates/streaming_beam/requirements.txt
index 2ec854d1569..418474d37ba 100644
--- a/dataflow/flex-templates/streaming_beam/requirements.txt
+++ b/dataflow/flex-templates/streaming_beam/requirements.txt
@@ -1 +1,2 @@
 apache-beam[gcp]==2.21.0
+matplotlib
\ No newline at end of file
diff --git a/dataflow/flex-templates/streaming_beam/streaming_beam.py b/dataflow/flex-templates/streaming_beam/streaming_beam.py
index af1321e8e18..2d90772c4dd 100644
--- a/dataflow/flex-templates/streaming_beam/streaming_beam.py
+++ b/dataflow/flex-templates/streaming_beam/streaming_beam.py
@@ -25,81 +25,40 @@
 import logging
 import time
 
+# Unused dependency used to replicate bug.
+import matplotlib
+import numpy as np
+
 import apache_beam as beam
 from apache_beam.options.pipeline_options import PipelineOptions
 import apache_beam.transforms.window as window
 
-# Defines the BigQuery schema for the output table.
-SCHEMA = ','.join([
-    'url:STRING',
-    'num_reviews:INTEGER',
-    'score:FLOAT64',
-    'first_date:TIMESTAMP',
-    'last_date:TIMESTAMP',
-])
-
-
-def parse_json_message(message):
-    """Parse the input json message and add 'score' & 'processing_time' keys."""
-    row = json.loads(message)
-    return {
-        'url': row['url'],
-        'score': 1.0 if row['review'] == 'positive' else 0.0,
-        'processing_time': int(time.time()),
-    }
-
-def get_statistics(url_messages):
-    """Get statistics from the input URL messages."""
-    url, messages = url_messages
-    return {
-        'url': url,
-        'num_reviews': len(messages),
-        'score': sum(msg['score'] for msg in messages) / len(messages),
-        'first_date': min(msg['processing_time'] for msg in messages),
-        'last_date': max(msg['processing_time'] for msg in messages),
-    }
+def useless_numpy_function(x):
+    return str(np.array(x))
 
 
-def run(args, input_subscription, output_table, window_interval):
+def run(args, output_text):
     """Build and run the pipeline."""
-    options = PipelineOptions(args, save_main_session=True, streaming=True)
+    options = PipelineOptions(args, save_main_session=True)
 
     with beam.Pipeline(options=options) as pipeline:
 
-        # Read the messages from PubSub and process them.
-        messages = (
+        # Process a small in-memory collection.
+        _ = (
             pipeline
-            | 'Read from Pub/Sub' >> beam.io.ReadFromPubSub(
-                subscription=input_subscription).with_output_types(bytes)
-            | 'UTF-8 bytes to string' >> beam.Map(lambda msg: msg.decode('utf-8'))
-            | 'Parse JSON messages' >> beam.Map(parse_json_message)
-            | 'Fixed-size windows' >> beam.WindowInto(
-                window.FixedWindows(int(window_interval), 0))
-            | 'Add URL keys' >> beam.Map(lambda msg: (msg['url'], msg))
-            | 'Group by URLs' >> beam.GroupByKey()
-            | 'Get statistics' >> beam.Map(get_statistics))
+            | "Create tiny collection" >> beam.Create(["a", "b", "c"])
+            | "Useless Numpy Function" >> beam.Map(useless_numpy_function)
+            | "Write output" >> beam.io.WriteToText(output_text)
+        )
 
-        # Output the results into BigQuery table.
-        _ = messages | 'Write to Big Query' >> beam.io.WriteToBigQuery(
-            output_table, schema=SCHEMA)
-
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     logging.getLogger().setLevel(logging.INFO)
 
     parser = argparse.ArgumentParser()
     parser.add_argument(
-        '--output_table',
-        help='Output BigQuery table for results specified as: '
-             'PROJECT:DATASET.TABLE or DATASET.TABLE.')
-    parser.add_argument(
-        '--input_subscription',
-        help='Input PubSub subscription of the form '
-             '"projects/<PROJECT>/subscriptions/<SUBSCRIPTION>."')
-    parser.add_argument(
-        '--window_interval',
-        default=60,
-        help='Window interval in seconds for grouping incoming messages.')
+        "--output_text", help="Path to output location (should be in a bucket)"
+    )
+
     known_args, pipeline_args = parser.parse_known_args()
-    run(pipeline_args, known_args.input_subscription, known_args.output_table,
-        known_args.window_interval)
+    run(pipeline_args, known_args.output_text)
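
For a quick sanity check outside of Dataflow, the patched job graph can be exercised on the local DirectRunner. The snippet below is a minimal sketch and not part of the patch: it assumes apache-beam[gcp]==2.21.0 and numpy are installed in the local environment, and the /tmp/repro-output path is illustrative.

import apache_beam as beam
import numpy as np


def useless_numpy_function(x):
    # Same transform as the patched sample: round-trip the element through
    # a NumPy array so the job fails loudly if numpy is unavailable.
    return str(np.array(x))


if __name__ == "__main__":
    # With no pipeline options, Beam defaults to the in-process DirectRunner.
    with beam.Pipeline() as pipeline:
        _ = (
            pipeline
            | "Create tiny collection" >> beam.Create(["a", "b", "c"])
            | "Useless Numpy Function" >> beam.Map(useless_numpy_function)
            | "Write output" >> beam.io.WriteToText("/tmp/repro-output")
        )

If this writes shards such as /tmp/repro-output-00000-of-00001, the pipeline code itself is sound, and a failure on Dataflow points at the container's dependency setup (the matplotlib install and the pip pin in the Dockerfile above).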