Skip to content

Commit 3b48eb4

Browse files
leahecolenicain
andauthored
fix: add more dataproc error handling (GoogleCloudPlatform#9826)
* add more error handling * add teardown * remove accidental <<<<HEAD * refactor update * fix: Adds InvalidArgument as caught exception for cluster setup in backoff/retry (GoogleCloudPlatform#9824) * fix lint * remove yield * swap status value for name * fix lint --------- Co-authored-by: nicain <[email protected]>
1 parent c465b5c commit 3b48eb4

File tree

3 files changed

+27
-15
lines changed

3 files changed

+27
-15
lines changed

dataproc/snippets/create_cluster_test.py

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import uuid
1717

1818
import backoff
19-
from google.api_core.exceptions import (InternalServerError, InvalidArgument, NotFound,
19+
from google.api_core.exceptions import (AlreadyExists, InternalServerError, InvalidArgument, NotFound,
2020
ServiceUnavailable)
2121
from google.cloud import dataproc_v1 as dataproc
2222

@@ -27,11 +27,13 @@
2727
CLUSTER_NAME = "py-cc-test-{}".format(str(uuid.uuid4()))
2828

2929

30+
cluster_client = dataproc.ClusterControllerClient(
31+
client_options={"api_endpoint": f"{REGION}-dataproc.googleapis.com:443"}
32+
)
33+
34+
3035
@backoff.on_exception(backoff.expo, (Exception), max_tries=5)
3136
def teardown():
32-
cluster_client = dataproc.ClusterControllerClient(
33-
client_options={"api_endpoint": f"{REGION}-dataproc.googleapis.com:443"}
34-
)
3537
# Client library function
3638
try:
3739
operation = cluster_client.delete_cluster(
@@ -53,8 +55,13 @@ def test_cluster_create(capsys):
5355
# Wrapper function for client library function
5456
try:
5557
create_cluster.create_cluster(PROJECT_ID, REGION, CLUSTER_NAME)
58+
out, _ = capsys.readouterr()
59+
assert CLUSTER_NAME in out
60+
except AlreadyExists:
61+
request = dataproc.GetClusterRequest(project_id=PROJECT_ID, region=REGION, cluster_name=CLUSTER_NAME)
62+
response = cluster_client.get_cluster(request=request)
63+
assert response.status.state == dataproc.ClusterStatus.State.RUNNING # verify the cluster is in the RUNNING state
64+
out, _ = capsys.readouterr()
65+
assert CLUSTER_NAME in out
5666
finally:
5767
teardown()
58-
59-
out, _ = capsys.readouterr()
60-
assert CLUSTER_NAME in out

dataproc/snippets/submit_job_test.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import uuid
1717

1818
import backoff
19-
from google.api_core.exceptions import (AlreadyExists, InternalServerError, NotFound,
19+
from google.api_core.exceptions import (AlreadyExists, InternalServerError, InvalidArgument, NotFound,
2020
ServiceUnavailable)
2121
from google.cloud import dataproc_v1 as dataproc
2222
import pytest
@@ -36,7 +36,7 @@ def cluster_client():
3636
)
3737

3838

39-
@backoff.on_exception(backoff.expo, ServiceUnavailable, max_tries=5)
39+
@backoff.on_exception(backoff.expo, (ServiceUnavailable, InvalidArgument), max_tries=5)
4040
def setup_cluster(cluster_client, curr_cluster_name):
4141

4242
CLUSTER = {
@@ -85,7 +85,8 @@ def cluster_name(cluster_client):
8585
teardown_cluster(cluster_client, curr_cluster_name)
8686

8787

88-
@backoff.on_exception(backoff.expo, (InternalServerError, ServiceUnavailable), max_tries=5)
88+
# InvalidArgument is thrown when the subnetwork is not ready
89+
@backoff.on_exception(backoff.expo, (InvalidArgument, InternalServerError, ServiceUnavailable), max_tries=5)
8990
def test_submit_job(capsys, cluster_name):
9091
submit_job.submit_job(PROJECT_ID, REGION, cluster_name)
9192
out, _ = capsys.readouterr()

dataproc/snippets/update_cluster_test.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import backoff
2222
from google.api_core.exceptions import (
23+
AlreadyExists,
2324
Cancelled,
2425
InternalServerError,
2526
InvalidArgument,
@@ -57,11 +58,14 @@ def cluster_client():
5758

5859
@backoff.on_exception(backoff.expo, (ServiceUnavailable, InvalidArgument), max_tries=5)
5960
def setup_cluster(cluster_client):
60-
# Create the cluster.
61-
operation = cluster_client.create_cluster(
62-
request={"project_id": PROJECT_ID, "region": REGION, "cluster": CLUSTER}
63-
)
64-
operation.result()
61+
try:
62+
# Create the cluster.
63+
operation = cluster_client.create_cluster(
64+
request={"project_id": PROJECT_ID, "region": REGION, "cluster": CLUSTER}
65+
)
66+
operation.result()
67+
except AlreadyExists:
68+
print("Cluster already exists, utilize existing cluster")
6569

6670

6771
@backoff.on_exception(backoff.expo, ServiceUnavailable, max_tries=5)

0 commit comments

Comments
 (0)