@@ -21,7 +21,7 @@
 import uuid
 
 import backoff
-from google.api_core.exceptions import Aborted, NotFound, AlreadyExists
+from google.api_core.exceptions import Aborted, NotFound
 from google.cloud import bigquery
 from google.cloud import dataproc_v1 as dataproc
 from google.cloud import storage
@@ -50,15 +50,12 @@
 PROCESSING_PYTHON_FILE = f"gs://{BUCKET_NAME}/{BUCKET_BLOB}"
 
 
-# Retry if we see a flaky 409 "subnet not ready" exception
-@backoff.on_exception(backoff.expo, Aborted, max_tries=3)
-@pytest.fixture(scope="function")
-def test_dataproc_batch(test_bucket, bq_dataset):
-    # check that the results table isnt there
-    with pytest.raises(NotFound):
-        BQ_CLIENT.get_table(f"{BQ_DATASET}.{BQ_WRITE_TABLE}")
+@pytest.fixture(scope="module")
+def test_dataproc_batch():
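+    # Builds only the batch ID and config; batch creation itself moved into test_process.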
 
-    BATCH_ID = f"summit-dag-test-{TEST_ID}"  # Dataproc serverless only allows lowercase characters
+    BATCH_ID = (
+        f"summit-dag-test-{TEST_ID}"  # Dataproc serverless only allows lowercase characters
+    )
     BATCH_CONFIG = {
         "pyspark_batch": {
             "jar_file_uris": [PYSPARK_JAR],
@@ -71,27 +68,7 @@ def test_dataproc_batch(test_bucket, bq_dataset):
         },
     }
 
-    # create a batch
-    dataproc_client = dataproc.BatchControllerClient(
-        client_options={
-            "api_endpoint": f"{DATAPROC_REGION}-dataproc.googleapis.com:443"
-        }
-    )
-    request = dataproc.CreateBatchRequest(
-        parent=f"projects/{PROJECT_ID}/regions/{DATAPROC_REGION}",
-        batch=BATCH_CONFIG,
-        batch_id=BATCH_ID,
-    )
-
-    # Make the request
-    operation = dataproc_client.create_batch(request=request)
-
-    print("Waiting for operation to complete...")
-
-    response = operation.result()
-
-    yield response
-
+    yield (BATCH_ID, BATCH_CONFIG)
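+    # Everything below the yield is fixture teardown; it runs after the dependent test finishes.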
     dataproc_client = dataproc.BatchControllerClient(
         client_options={
             "api_endpoint": f"{DATAPROC_REGION}-dataproc.googleapis.com:443"
@@ -131,7 +108,7 @@ def test_bucket():
         bucket.delete(force=True)
 
 
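+# autouse=True: this fixture now runs automatically for every test in the module.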
-@pytest.fixture(scope="module")
+@pytest.fixture(autouse=True)
 def bq_dataset(test_bucket):
     # Create dataset and table for test CSV
     BQ_CLIENT.create_dataset(BQ_DATASET)
@@ -168,9 +145,33 @@ def bq_dataset(test_bucket):
         print(f"Ignoring NotFound on cleanup, details: {e}")
 
 
+# Retry if we see a flaky 409 "subnet not ready" exception
+@backoff.on_exception(backoff.expo, Aborted, max_tries=3)
 def test_process(test_dataproc_batch):
+    # check that the results table isn't there
+    with pytest.raises(NotFound):
+        BQ_CLIENT.get_table(f"{BQ_DATASET}.{BQ_WRITE_TABLE}")
+
+    # create a batch
+    dataproc_client = dataproc.BatchControllerClient(
+        client_options={
+            "api_endpoint": f"{DATAPROC_REGION}-dataproc.googleapis.com:443"
+        }
+    )
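+    # The fixture yielded (BATCH_ID, BATCH_CONFIG); index into that tuple here.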
+    request = dataproc.CreateBatchRequest(
+        parent=f"projects/{PROJECT_ID}/regions/{DATAPROC_REGION}",
+        batch=test_dataproc_batch[1],
+        batch_id=test_dataproc_batch[0],
+    )
+    # Make the request
+    operation = dataproc_client.create_batch(request=request)
+
+    print("Waiting for operation to complete...")
+
+    response = operation.result()
 
-    print(test_dataproc_batch)
+    # Handle the response
+    print(response)
 
     # check that the results table is there now
     assert BQ_CLIENT.get_table(f"{BQ_DATASET}.{BQ_WRITE_TABLE}").num_rows > 0