2222import os
2323import pytest
2424import time
25+ import uuid
2526
2627from datetime import datetime
2728from googleapiclient .discovery import build
@@ -41,17 +42,22 @@ def app():
4142 return flask .Flask (__name__ )
4243
4344
45+ def unique_job_name (label ):
46+ return datetime .now ().strftime ('{}-%Y%m%d-%H%M%S-{}' .format (
47+ label , uuid .uuid4 ().hex ))
48+
49+
4450def test_run_template_python_empty_args (app ):
4551 project = PROJECT
46- job = datetime . now (). strftime ( 'test_run_template_python-%Y%m%d-%H%M%S ' )
52+ job = unique_job_name ( 'test_run_template_empty ' )
4753 template = 'gs://dataflow-templates/latest/Word_Count'
4854 with pytest .raises (HttpError ):
4955 main .run (project , job , template )
5056
5157
5258def test_run_template_python (app ):
5359 project = PROJECT
54- job = datetime . now (). strftime ( 'test_run_template_python-%Y%m%d-%H%M%S ' )
60+ job = unique_job_name ( 'test_run_template_python' )
5561 template = 'gs://dataflow-templates/latest/Word_Count'
5662 parameters = {
5763 'inputFile' : 'gs://apache-beam-samples/shakespeare/kinglear.txt' ,
@@ -70,7 +76,7 @@ def test_run_template_http_empty_args(app):
7076def test_run_template_http_url (app ):
7177 args = {
7278 'project' : PROJECT ,
73- 'job' : datetime . now (). strftime ( 'test_run_template_url-%Y%m%d-%H%M%S ' ),
79+ 'job' : unique_job_name ( 'test_run_template_url' ),
7480 'template' : 'gs://dataflow-templates/latest/Word_Count' ,
7581 'inputFile' : 'gs://apache-beam-samples/shakespeare/kinglear.txt' ,
7682 'output' : 'gs://{}/dataflow/wordcount/outputs' .format (BUCKET ),
@@ -84,7 +90,7 @@ def test_run_template_http_url(http://github.com/app):
8490def test_run_template_http_data (app ):
8591 args = {
8692 'project' : PROJECT ,
87- 'job' : datetime . now (). strftime ( 'test_run_template_data-%Y%m%d-%H%M%S ' ),
93+ 'job' : unique_job_name ( 'test_run_template_data' ),
8894 'template' : 'gs://dataflow-templates/latest/Word_Count' ,
8995 'inputFile' : 'gs://apache-beam-samples/shakespeare/kinglear.txt' ,
9096 'output' : 'gs://{}/dataflow/wordcount/outputs' .format (BUCKET ),
@@ -98,7 +104,7 @@ def test_run_template_http_data(app):
98104def test_run_template_http_json (app ):
99105 args = {
100106 'project' : PROJECT ,
101- 'job' : datetime . now (). strftime ( 'test_run_template_json-%Y%m%d-%H%M%S ' ),
107+ 'job' : unique_job_name ( 'test_run_template_json' ),
102108 'template' : 'gs://dataflow-templates/latest/Word_Count' ,
103109 'inputFile' : 'gs://apache-beam-samples/shakespeare/kinglear.txt' ,
104110 'output' : 'gs://{}/dataflow/wordcount/outputs' .format (BUCKET ),
@@ -110,9 +116,17 @@ def test_run_template_http_json(app):
110116
111117
112118def dataflow_jobs_cancel (job_id ):
113- # Wait time until a job can be cancelled, as a best effort.
114- # If it fails to be cancelled, the job will run for ~8 minutes.
115- time .sleep (5 ) # seconds
119+ # Wait time until the job can be cancelled.
120+ state = None
121+ while state != 'JOB_STATE_RUNNING' :
122+ job = dataflow .projects ().jobs ().get (
123+ projectId = PROJECT ,
124+ jobId = job_id
125+ ).execute ()
126+ state = job ['currentState' ]
127+ time .sleep (1 )
128+
129+ # Cancel the Dataflow job.
116130 request = dataflow .projects ().jobs ().update (
117131 projectId = PROJECT ,
118132 jobId = job_id ,
0 commit comments