|
25 | 25 | import logging |
26 | 26 | import time |
27 | 27 |
|
| 28 | +# Unused dependency used to replicate bug. |
| 29 | +import matplotlib |
| 30 | +import numpy as np |
| 31 | + |
28 | 32 | import apache_beam as beam |
29 | 33 | from apache_beam.options.pipeline_options import PipelineOptions |
30 | 34 | import apache_beam.transforms.window as window |
31 | 35 |
|
32 | | -# Defines the BigQuery schema for the output table. |
33 | | -SCHEMA = ','.join([ |
34 | | - 'url:STRING', |
35 | | - 'num_reviews:INTEGER', |
36 | | - 'score:FLOAT64', |
37 | | - 'first_date:TIMESTAMP', |
38 | | - 'last_date:TIMESTAMP', |
39 | | -]) |
40 | | - |
41 | | - |
42 | | -def parse_json_message(message): |
43 | | - """Parse the input json message and add 'score' & 'processing_time' keys.""" |
44 | | - row = json.loads(message) |
45 | | - return { |
46 | | - 'url': row['url'], |
47 | | - 'score': 1.0 if row['review'] == 'positive' else 0.0, |
48 | | - 'processing_time': int(time.time()), |
49 | | - } |
50 | | - |
51 | 36 |
|
52 | | -def get_statistics(url_messages): |
53 | | - """Get statistics from the input URL messages.""" |
54 | | - url, messages = url_messages |
55 | | - return { |
56 | | - 'url': url, |
57 | | - 'num_reviews': len(messages), |
58 | | - 'score': sum(msg['score'] for msg in messages) / len(messages), |
59 | | - 'first_date': min(msg['processing_time'] for msg in messages), |
60 | | - 'last_date': max(msg['processing_time'] for msg in messages), |
61 | | - } |
def useless_numpy_function(x):
    """Round-trip ``x`` through a NumPy array and return its string form.

    Exists only to force a NumPy dependency on the workers (this sample
    reproduces a missing-dependency bug); the computation itself is trivial.
    """
    as_array = np.array(x)
    return str(as_array)
62 | 39 |
|
63 | 40 |
|
def run(args, output_text):
    """Build and run a minimal batch pipeline that writes three strings.

    Args:
        args: Command-line arguments forwarded to PipelineOptions
            (e.g. runner and project flags).
        output_text: Output path prefix for WriteToText — presumably a
            Cloud Storage bucket path per the CLI help; confirm with caller.
    """
    # save_main_session=True pickles module-level state so the top-level
    # imports (numpy, matplotlib) are available on remote workers.
    options = PipelineOptions(args, save_main_session=True)

    with beam.Pipeline(options=options) as pipeline:
        _ = (
            pipeline
            | "Create tiny collection" >> beam.Create(["a", "b", "c"])
            | "Useless Numpy Function" >> beam.Map(useless_numpy_function)
            # WriteToText is already a PTransform; the legacy beam.io.Write
            # wrapper was redundant and has been dropped.
            | "Write output" >> beam.io.WriteToText(output_text)
        )
82 | 54 |
|
83 | | - # Output the results into BigQuery table. |
84 | | - _ = messages | 'Write to Big Query' >> beam.io.WriteToBigQuery( |
85 | | - output_table, schema=SCHEMA) |
86 | 55 |
|
87 | | - |
if __name__ == "__main__":
    # Surface pipeline progress messages on the console.
    logging.getLogger().setLevel(logging.INFO)

    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument(
        "--output_text",
        help="Path to output location (should be in a bucket)",
    )
    # Unrecognized flags are forwarded to the pipeline (runner options).
    known_args, pipeline_args = arg_parser.parse_known_args()
    run(pipeline_args, known_args.output_text)
0 commit comments