Wait for events using callbacks and Eventarc triggers

Your workflow might need to wait for an external process. You can use HTTP callbacks to wait for another service to make a request to a callback endpoint; that request resumes the execution of the workflow. You can also wait using polling.

This tutorial demonstrates how to wait for events or Pub/Sub messages by using HTTP callbacks and Eventarc triggers instead of polling. Although you can trigger a workflow with events or Pub/Sub messages, you might want to pause that execution until another event arrives. For example, an event triggers a workflow to initiate a process, but the workflow must then wait for a second event that signals that the process is complete. You can implement this by having one workflow call back another workflow.

Create a Firestore database

Firestore stores your data in documents that contain fields mapping to values. These documents are stored in collections, which are containers for your documents that you can use to organize your data and build queries. Learn more about Firestore.

Note that each Google Cloud project is limited to one Firestore database. Complete the following steps if you need to create a new database.

Console

  1. In the Google Cloud console, go to the Firestore Get started page.

    Go to Get started

  2. Click Select Native Mode.

    For guidance on selecting a database mode and for a feature-by-feature comparison, see choosing between Native Mode and Datastore Mode.

  3. In the Select a location list, select nam5 (United States).

    The location applies to both the Firestore database and the App Engine application in your Google Cloud project. Once you create your database, you cannot change the location.

  4. Click Create database.

gcloud

To create a Firestore database, you must first create an App Engine application and then run the gcloud firestore databases create command:

gcloud app create --region=us-central
gcloud firestore databases create --region=us-central

You can ignore the "us-central is not a valid Firestore location" warning. App Engine and Firestore support the same locations, but the App Engine us-central (Iowa) region maps to the Firestore nam5 (United States) multi-region.

Create a Pub/Sub topic

This tutorial uses Pub/Sub as an event source. Create a Pub/Sub topic so that you can publish a message to it. Learn more about creating and managing topics.

Console

  1. In the Google Cloud console, go to the Pub/Sub Topics page.

    Go to Topics

  2. Click Create topic.

  3. In the Topic ID field, enter topic-callback.

  4. Accept the other defaults.

  5. Click Create topic.

gcloud

To create a topic, run the gcloud pubsub topics create command:

gcloud pubsub topics create topic-callback

Create a Cloud Storage bucket

This tutorial uses Cloud Storage as an event source. Create a Cloud Storage bucket so that you can upload a file to it. Learn more about creating storage buckets.

Console

  1. In the Google Cloud console, go to the Cloud Storage Buckets page.

    Go to Cloud Storage

  2. Click Create.

  3. For the Name of your bucket, enter PROJECT_ID-bucket-callback.

    The project ID is used in the callback-event-sample workflow to identify the bucket.

  4. Click Continue.

  5. For Location type, select Region, and then select us-central1 (Iowa).

  6. Accept the other defaults.

  7. Click Create.

gcloud

To create a bucket, run the gcloud storage buckets create command:

gcloud storage buckets create gs://PROJECT_ID-bucket-callback \
    --location=us-central1

The project ID is used in the callback-event-sample workflow to identify the bucket.
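
Because the callback-event-sample workflow builds the bucket name from the project ID, the bucket you create has to match that pattern exactly. The following is a minimal sketch of the naming rule; the helper name bucket_name_for_project and the project ID my-project are illustrative assumptions, not part of gcloud or of this tutorial's resources.

```shell
# Hypothetical helper (not a gcloud command): builds the bucket name that the
# callback-event-sample workflow expects, that is PROJECT_ID + "-bucket-callback".
bucket_name_for_project() {
  # $1 = project ID
  printf '%s-bucket-callback\n' "$1"
}

# Example with a made-up project ID.
bucket_name_for_project "my-project"
# prints: my-project-bucket-callback
```

You could pass the result to the create command shown above, for example as gs://$(bucket_name_for_project "$PROJECT_ID"), assuming PROJECT_ID is set in your shell.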

After the event sources are created, you can deploy the event receiver workflow.

Deploy a workflow that listens for events

The callback-event-listener workflow is triggered when a message is published to a Pub/Sub topic or when a file is uploaded to a Cloud Storage bucket. The workflow receives the event, retrieves the appropriate callback details from the Firestore database, and then sends an HTTP request to the callback endpoint.
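
To make the lookup concrete: the workflow's init step splits the event's source attribute on "/" and uses the last token as the Firestore document ID in the callbacks collection. The sketch below reproduces that string handling in shell. The helper name callback_doc_name, the project ID my-project, and the two sample source values are assumptions chosen for illustration.

```shell
# Hypothetical helper mirroring the listener workflow's init step: keep the
# last "/"-separated token of the event source and append it to the path of
# the callbacks collection in the (default) database.
callback_doc_name() {
  # $1 = project ID, $2 = event source
  printf 'projects/%s/databases/(default)/documents/callbacks/%s\n' "$1" "${2##*/}"
}

# Sample source values (assumed for illustration).
callback_doc_name "my-project" "//pubsub.googleapis.com/projects/my-project/topics/topic-callback"
callback_doc_name "my-project" "//storage.googleapis.com/projects/_/buckets/my-project-bucket-callback"
```

The last token is the topic ID or the bucket name, which is why the waiting workflow can store its callback details under those same names.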

Console

  1. In the Google Cloud console, go to the Workflows page:

    Go to Workflows

  2. Click Create.

  3. Enter a name for the new workflow: callback-event-listener.

  4. In the Region list, select us-central1.

  5. Select the Service account that you previously created.

  6. Click Next.

  7. In the workflow editor, enter the following definition for your workflow:

    main:
      params: [event]
      steps:
        - log_event:
            call: sys.log
            args:
              text: ${event}
              severity: INFO
        - init:
            assign:
              - database_root: ${"projects/" + sys.get_env("GOOGLE_CLOUD_PROJECT_ID") + "/databases/(default)/documents/callbacks/"}
              - event_source_tokens: ${text.split(event.source, "/")}
              - event_source_len: ${len(event_source_tokens)}
              - event_source: ${event_source_tokens[event_source_len - 1]}
              - doc_name: ${database_root + event_source}
        - get_document_for_event_source:
            try:
              call: googleapis.firestore.v1.projects.databases.documents.get
              args:
                name: ${doc_name}
              result: document
            except:
                as: e
                steps:
                    - known_errors:
                        switch:
                        - condition: ${e.code == 404}
                          return: ${"No callbacks for event source " + event_source}
                    - unhandled_exception:
                        raise: ${e}
        - process_callback_urls:
            steps:
              - check_fields_exist:
                  switch:
                  - condition: ${not("fields" in document)}
                    return: ${"No callbacks for event source " + event_source}
                  - condition: true
                    next: processFields
              - processFields:
                  for:
                      value: key
                      in: ${keys(document.fields)}
                      steps:
                          - extract_callback_url:
                              assign:
                                  - callback_url: ${document.fields[key]["stringValue"]}
                          - log_callback_url:
                              call: sys.log
                              args:
                                text: ${"Calling back url " + callback_url}
                                severity: INFO
                          - http_post:
                              call: http.post
                              args:
                                  url: ${callback_url}
                                  auth:
                                      type: OAuth2
                                  body:
                                      event: ${event}

  8. Click Deploy.

gcloud

  1. Create a source code file for your workflow:

    touch callback-event-listener.yaml

  2. In a text editor, copy the following workflow to your source code file:

    main:
      params: [event]
      steps:
        - log_event:
            call: sys.log
            args:
              text: ${event}
              severity: INFO
        - init:
            assign:
              - database_root: ${"projects/" + sys.get_env("GOOGLE_CLOUD_PROJECT_ID") + "/databases/(default)/documents/callbacks/"}
              - event_source_tokens: ${text.split(event.source, "/")}
              - event_source_len: ${len(event_source_tokens)}
              - event_source: ${event_source_tokens[event_source_len - 1]}
              - doc_name: ${database_root + event_source}
        - get_document_for_event_source:
            try:
              call: googleapis.firestore.v1.projects.databases.documents.get
              args:
                name: ${doc_name}
              result: document
            except:
                as: e
                steps:
                    - known_errors:
                        switch:
                        - condition: ${e.code == 404}
                          return: ${"No callbacks for event source " + event_source}
                    - unhandled_exception:
                        raise: ${e}
        - process_callback_urls:
            steps:
              - check_fields_exist:
                  switch:
                  - condition: ${not("fields" in document)}
                    return: ${"No callbacks for event source " + event_source}
                  - condition: true
                    next: processFields
              - processFields:
                  for:
                      value: key
                      in: ${keys(document.fields)}
                      steps:
                          - extract_callback_url:
                              assign:
                                  - callback_url: ${document.fields[key]["stringValue"]}
                          - log_callback_url:
                              call: sys.log
                              args:
                                text: ${"Calling back url " + callback_url}
                                severity: INFO
                          - http_post:
                              call: http.post
                              args:
                                  url: ${callback_url}
                                  auth:
                                      type: OAuth2
                                  body:
                                      event: ${event}

  3. Deploy the workflow by entering the following command:

    gcloud workflows deploy callback-event-listener \
        --source=callback-event-listener.yaml \
        --location=us-central1 \
        --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

    Replace SERVICE_ACCOUNT_NAME with the name of the service account that you previously created, and PROJECT_ID with your Google Cloud project ID.

Deploy a workflow that waits for events

The callback-event-sample workflow stores its callback details in a Firestore database, halts its execution, and then waits for specific events to occur.
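
Each execution saves its callback URL under its own field in the Firestore document for the event source, so several executions can wait on the same source at once. The field name is "exec_" followed by the part of the execution ID before the first hyphen. The sketch below reproduces that derivation in shell; the helper name firestore_key_for_execution is an assumption for illustration.

```shell
# Hypothetical helper mirroring the workflow's firestore_key expression:
# "exec_" plus the part of the execution ID before the first "-".
firestore_key_for_execution() {
  # $1 = workflow execution ID
  printf 'exec_%s\n' "${1%%-*}"
}

firestore_key_for_execution "a848a164-268a-449c-b2fe-396f32f2ed66"
# prints: exec_a848a164
```

The listener workflow then loops over every field in the document and posts to each stored URL.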

Console

  1. In the Google Cloud console, go to the Workflows page:

    Go to Workflows

  2. Click Create.

  3. Enter a name for the new workflow: callback-event-sample.

  4. In the Region list, select us-central1.

  5. Select the Service account that you previously created.

  6. Click Next.

  7. In the workflow editor, enter the following definition for your workflow:

    main:
      steps:
        - init:
            assign:
              - pubsub_topic: topic-callback
              - storage_bucket: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID") + "-bucket-callback"}
        - await_pubsub_message:
            call: await_callback_event
            args:
              event_source: ${pubsub_topic}
            result: pubsub_event
        - await_storage_bucket:
            call: await_callback_event
            args:
              event_source: ${storage_bucket}
            result: storage_event
        - return_events:
            return:
                pubsub_event: ${pubsub_event}
                storage_event: ${storage_event}
    
    await_callback_event:
        params: [event_source]
        steps:
            - init:
                assign:
                  - database_root: ${"projects/" + sys.get_env("GOOGLE_CLOUD_PROJECT_ID") + "/databases/(default)/documents/callbacks/"}
                  - doc_name: ${database_root + event_source}
                  - execution_id: ${sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
                  - firestore_key: ${"exec_" + text.split(execution_id, "-")[0]}
            - create_callback:
                call: events.create_callback_endpoint
                args:
                  http_callback_method: POST
                result: callback_details
            - save_callback_url:
                call: googleapis.firestore.v1.projects.databases.documents.patch
                args:
                  name: ${doc_name}
                  updateMask:
                    fieldPaths: ["${firestore_key}"]
                  body:
                    fields:
                      ${firestore_key}:
                        stringValue: ${callback_details.url}
            - log_and_await_callback:
                try:
                  steps:
                    - log_await_start:
                        call: sys.log
                        args:
                          severity: INFO
                          data: ${"Started waiting 1hr for an event from source " + event_source}
                    - await_callback:
                        call: events.await_callback
                        args:
                          callback: ${callback_details}
                          timeout: 3600
                        result: callback_request
                    - log_await_stop:
                        call: sys.log
                        args:
                          severity: INFO
                          data: ${"Stopped waiting for an event from source " + event_source}
                except:
                    as: e
                    steps:
                        - log_error:
                            call: sys.log
                            args:
                                severity: "ERROR"
                                text: ${"Received error " + e.message}
            - delete_callback_url:
                call: googleapis.firestore.v1.projects.databases.documents.patch
                args:
                  name: ${doc_name}
                  updateMask:
                    fieldPaths: ["${firestore_key}"]
            - check_null_event:
                switch:
                  - condition: ${callback_request == null}
                    return: null
            - log_await_result:
                call: sys.log
                args:
                  severity: INFO
                  data: ${callback_request.http_request.body.event}
            - return_event:
                return: ${callback_request.http_request.body.event}

  8. Click Deploy.

gcloud

  1. Create a source code file for your workflow:

    touch callback-event-sample.yaml

  2. In a text editor, copy the following workflow to your source code file:

    main:
      steps:
        - init:
            assign:
              - pubsub_topic: topic-callback
              - storage_bucket: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID") + "-bucket-callback"}
        - await_pubsub_message:
            call: await_callback_event
            args:
              event_source: ${pubsub_topic}
            result: pubsub_event
        - await_storage_bucket:
            call: await_callback_event
            args:
              event_source: ${storage_bucket}
            result: storage_event
        - return_events:
            return:
                pubsub_event: ${pubsub_event}
                storage_event: ${storage_event}
    
    await_callback_event:
        params: [event_source]
        steps:
            - init:
                assign:
                  - database_root: ${"projects/" + sys.get_env("GOOGLE_CLOUD_PROJECT_ID") + "/databases/(default)/documents/callbacks/"}
                  - doc_name: ${database_root + event_source}
                  - execution_id: ${sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
                  - firestore_key: ${"exec_" + text.split(execution_id, "-")[0]}
            - create_callback:
                call: events.create_callback_endpoint
                args:
                  http_callback_method: POST
                result: callback_details
            - save_callback_url:
                call: googleapis.firestore.v1.projects.databases.documents.patch
                args:
                  name: ${doc_name}
                  updateMask:
                    fieldPaths: ["${firestore_key}"]
                  body:
                    fields:
                      ${firestore_key}:
                        stringValue: ${callback_details.url}
            - log_and_await_callback:
                try:
                  steps:
                    - log_await_start:
                        call: sys.log
                        args:
                          severity: INFO
                          data: ${"Started waiting 1hr for an event from source " + event_source}
                    - await_callback:
                        call: events.await_callback
                        args:
                          callback: ${callback_details}
                          timeout: 3600
                        result: callback_request
                    - log_await_stop:
                        call: sys.log
                        args:
                          severity: INFO
                          data: ${"Stopped waiting for an event from source " + event_source}
                except:
                    as: e
                    steps:
                        - log_error:
                            call: sys.log
                            args:
                                severity: "ERROR"
                                text: ${"Received error " + e.message}
            - delete_callback_url:
                call: googleapis.firestore.v1.projects.databases.documents.patch
                args:
                  name: ${doc_name}
                  updateMask:
                    fieldPaths: ["${firestore_key}"]
            - check_null_event:
                switch:
                  - condition: ${callback_request == null}
                    return: null
            - log_await_result:
                call: sys.log
                args:
                  severity: INFO
                  data: ${callback_request.http_request.body.event}
            - return_event:
                return: ${callback_request.http_request.body.event}

  3. Deploy the workflow by entering the following command:

    gcloud workflows deploy callback-event-sample \
        --source=callback-event-sample.yaml \
        --location=us-central1 \
        --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

    Replace SERVICE_ACCOUNT_NAME with the name of the service account that you previously created, and PROJECT_ID with your Google Cloud project ID.

Create an Eventarc trigger to route Pub/Sub events

An Eventarc trigger allows you to route events by specifying filters for the trigger, including the event source, and the target workflow. Create an Eventarc trigger to execute the callback-event-listener workflow as the result of publishing a message to a Pub/Sub topic. Learn more about triggering a workflow.

Console

  1. In the Google Cloud console, go to the Eventarc page.

    Go to Eventarc

  2. Click Create trigger.

  3. Type a Trigger name.

    For example, trigger-pubsub-events-listener.

  4. In the Event provider list, select Cloud Pub/Sub.

  5. In the Event list, under Custom, select google.cloud.pubsub.topic.v1.messagePublished.

  6. In the Select a Cloud Pub/Sub topic list, select the topic that you previously created.

  7. In the Region list, select us-central1 (Iowa).

  8. If prompted, grant the iam.serviceAccountTokenCreator role to the Pub/Sub service account.

  9. Select the Service account that you previously created.

  10. In the Event destination list, select Workflows.

  11. In the Select a workflow list, select the callback-event-listener workflow.

  12. Click Create.

gcloud

To create a trigger, run the gcloud eventarc triggers create command:

gcloud eventarc triggers create trigger-pubsub-events-listener \
    --location=us-central1 \
    --destination-workflow=callback-event-listener \
    --destination-workflow-location=us-central1 \
    --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
    --transport-topic=topic-callback \
    --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

Events are transformed and passed to the workflow execution as runtime arguments. Note that it can take up to 2 minutes for the new trigger to become active.

Create an Eventarc trigger to route Cloud Storage events

An Eventarc trigger allows you to route events by specifying filters for the trigger, including the event source, and the target workflow. Create an Eventarc trigger to execute the callback-event-listener workflow as the result of uploading a file to a Cloud Storage bucket. Learn more about triggering a workflow.

Console

  1. In the Google Cloud console, go to the Eventarc page.

    Go to Eventarc

  2. Click Create trigger.

  3. Type a Trigger name.

    For example, trigger-storage-events-listener.

  4. In the Event provider list, select Cloud Storage.

  5. In the Event list, under Direct, select google.cloud.storage.object.v1.finalized.

  6. In the Bucket list, browse for the bucket that you previously created and select it.

  7. In the Region list, based on your Cloud Storage bucket, accept the default of us-central1 (Iowa).

  8. If prompted, grant the iam.serviceAccountTokenCreator role to the Pub/Sub service account.

  9. Select the Service account that you previously created.

  10. In the Event destination list, select Workflows.

  11. In the Select a workflow list, select the callback-event-listener workflow.

  12. Click Create.

gcloud

To create a trigger, run the gcloud eventarc triggers create command:

gcloud eventarc triggers create trigger-storage-events-listener \
    --location=us-central1 \
    --destination-workflow=callback-event-listener \
    --destination-workflow-location=us-central1 \
    --event-filters="type=google.cloud.storage.object.v1.finalized" \
    --event-filters="bucket=PROJECT_ID-bucket-callback" \
    --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

Events are transformed and passed to the workflow execution as runtime arguments. Note that it can take up to 2 minutes for the new trigger to become active.

Execute the primary workflow

Executing a workflow runs the current workflow definition associated with the workflow. Execute the callback-event-sample workflow. This is the primary workflow: it waits for specific events to occur and resumes its execution only when the secondary workflow makes the appropriate callback requests.

Console

  1. In the Google Cloud console, go to the Workflows page.

    Go to Workflows

  2. On the Workflows page, click the callback-event-sample workflow to go to its details page.

  3. On the Workflow Details page, click Execute.

  4. Click Execute again.

    The workflow execution starts. While the execution runs, you should see an Execution state of Running and a log entry similar to the following: Started waiting 1hr for an event from source topic-callback.

gcloud

To execute a workflow, run the gcloud workflows run command:

gcloud workflows run callback-event-sample \
    --location=us-central1

The workflow execution starts. While the execution runs, you should see an execution state similar to the following:

Waiting for execution [a848a164-268a-449c-b2fe-396f32f2ed66] to complete...working...
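
While the execution waits, it can help to know what resumes it: an authenticated POST to the stored callback URL whose JSON body has an event field, which is the shape the listener workflow sends. The following is a rough sketch for debugging the wait by hand, not a step of this tutorial. The helper names callback_body and send_callback are hypothetical, and the sketch assumes that you have copied a callback URL from the Firestore callbacks document and that your account is allowed to send workflow callbacks.

```shell
# Hypothetical helper: wraps an event (already valid JSON) in the body shape
# that the listener workflow posts, {"event": ...}.
callback_body() {
  # $1 = event as a JSON string
  printf '{"event":%s}\n' "$1"
}

# Hypothetical helper: posts the body to a callback URL with an OAuth2 token.
# Defined here only; it is not invoked by this sketch.
send_callback() {
  # $1 = callback URL, $2 = event as a JSON string
  curl --request POST \
    --header "Authorization: Bearer $(gcloud auth print-access-token)" \
    --header "Content-Type: application/json" \
    --data "$(callback_body "$2")" \
    "$1"
}

callback_body '{"message":"manual test"}'
# prints: {"event":{"message":"manual test"}}
```

The waiting workflow returns callback_request.http_request.body.event, so whatever you place under event is what appears in the execution output.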

Generate events and check the execution status

You can confirm that the results are as expected by generating events, viewing log entries, and checking the workflow execution status.

Publish a message

Publish a message to the Pub/Sub topic that you previously created.

Console

  1. In the Google Cloud console, go to the Pub/Sub Topics page.

    Go to Topics

  2. Click topic-callback.

  3. Click the Messages tab.

  4. Click Publish message.

  5. In the Message body field, enter Hello World.

  6. Click Publish.

gcloud

To publish a message, use the gcloud pubsub topics publish command:

gcloud pubsub topics publish topic-callback \
    --message="Hello World"
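
When Eventarc delivers a Pub/Sub message, the message data arrives base64-encoded inside the event payload, so the text Hello World does not appear verbatim in the event that the workflow returns. The sketch below shows the round trip for the tutorial's message using the standard base64 tool; the variable names are illustrative.

```shell
# Encode the tutorial's message the way it appears in the event payload,
# then decode it again to recover the original text.
encoded=$(printf '%s' 'Hello World' | base64)
decoded=$(printf '%s' "$encoded" | base64 -d)

printf '%s\n' "$encoded"   # SGVsbG8gV29ybGQ=
printf '%s\n' "$decoded"   # Hello World
```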

Upload an object

Upload a file to the Cloud Storage bucket that you previously created.

Console

  1. In the Google Cloud console, go to the Cloud Storage Buckets page.

    Go to Buckets

  2. Click the name of the bucket that you previously created.

  3. In the Objects tab, do either of the following:

    • Drag and drop the desired file from your desktop or file manager to the main pane in the Google Cloud console.

    • Click Upload files, select the file that you want to upload, and then click Open.

gcloud

To upload a file, run the gcloud storage cp command:

gcloud storage cp OBJECT_LOCATION gs://PROJECT_ID-bucket-callback/

Replace OBJECT_LOCATION with the local path to your object. For example, random.txt.

View log entries and execution status

Confirm that the callback-event-sample workflow completed successfully.

Console

  1. In the Google Cloud console, go to the Workflows page.

    Go to Workflows

  2. On the Workflows page, click the callback-event-sample workflow to go to its details page.

  3. On the Workflow Details page, to retrieve the details for a particular execution, click the appropriate execution ID.

    The Execution state should be Succeeded and, in the Output pane, you should see the received Pub/Sub and Cloud Storage events.

gcloud

  1. Filter the log entries and return the output in JSON format:

    gcloud logging read "resource.type=workflows.googleapis.com/Workflow AND (textPayload:calling OR textPayload:waiting)" \
        --format=json

  2. Look for log entries similar to:

    "textPayload": "Stopped waiting for an event from source..."
    "textPayload": "Calling back url https://workflowexecutions.googleapis.com/v1/projects/..."
    "textPayload": "Started waiting 1hr for an event from source..."
    
  3. Check the status of the last execution attempt:

    gcloud workflows executions wait-last

    The result should be similar to the following:

    Using cached execution name: projects/1085953646031/locations/us-central1/workflows/callback-event-sample/executions/79929e4e-82c1-4da1-b068-f828034c01b7
    Waiting for execution [79929e4e-82c1-4da1-b068-f828034c01b7] to complete...done.
    [...]
    state: SUCCEEDED