diff --git a/composer/workflows/airflow_db_cleanup.py b/composer/workflows/airflow_db_cleanup.py index 6bde3f67847..8ea9396d413 100644 --- a/composer/workflows/airflow_db_cleanup.py +++ b/composer/workflows/airflow_db_cleanup.py @@ -89,7 +89,7 @@ # Length to retain the log files if not already provided in the conf. If this # is set to 30, the job will remove those files that arE 30 days old or older. DEFAULT_MAX_DB_ENTRY_AGE_IN_DAYS = int( - Variable.get("airflow_db_cleanup__max_db_entry_age_in_days", 30) + Variable.get("airflow_db_cleanup__max_db_entry_age_in_days", 365) ) # Prints the database entries which will be getting deleted; set to False # to avoid printing large lists and slowdown process diff --git a/composer/workflows/airflow_print_log copy 2.py b/composer/workflows/airflow_print_log copy 2.py new file mode 100644 index 00000000000..7f3a4e84c8d --- /dev/null +++ b/composer/workflows/airflow_print_log copy 2.py @@ -0,0 +1,233 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# [START airflow_log_print] +""" +A maintenance workflow that you can deploy into Airflow to periodically clean +out the DagRun, TaskInstance, Log, XCom, Job DB and SlaMiss entries to avoid +having too much data in your Airflow MetaStore. + +## Authors + +The DAG is a fork of [teamclairvoyant repository.](https://github.com/teamclairvoyant/airflow-maintenance-dags/tree/master/db-cleanup) + +## Usage + +1. Update the global variables (SCHEDULE_INTERVAL, DAG_OWNER_NAME, + ALERT_EMAIL_ADDRESSES and ENABLE_DELETE) in the DAG with the desired values + +2. Modify the DATABASE_OBJECTS list to add/remove objects as needed. Each + dictionary in the list features the following parameters: + - airflow_db_model: Model imported from airflow.models corresponding to + a table in the airflow metadata database + - age_check_column: Column in the model/table to use for calculating max + date of data deletion + - keep_last: Boolean to specify whether to preserve last run instance + - keep_last_filters: List of filters to preserve data from deleting + during clean-up, such as DAG runs where the external trigger is set to 0. + - keep_last_group_by: Option to specify column by which to group the + database entries and perform aggregate functions. + +3. Create and Set the following Variables in the Airflow Web Server + (Admin -> Variables) + - airflow_db_cleanup__max_db_entry_age_in_days - integer - Length to retain + the log files if not already provided in the conf. If this is set to 30, + the job will remove those files that are 30 days old or older. + +4. Put the DAG in your gcs bucket. 
+""" +from datetime import timedelta +import logging +import os + +import airflow +from airflow import settings +from airflow.models import ( + # DAG, + # DagModel, + # DagRun, + Log, + # SlaMiss, + # TaskInstance, + Variable, + XCom, +) +from airflow.operators.python import PythonOperator +from airflow.utils import timezone +from airflow.version import version as airflow_version + +import dateutil.parser +from sqlalchemy import and_, func, text +from sqlalchemy.exc import ProgrammingError +from sqlalchemy.orm import load_only + +now = timezone.utcnow + +# airflow-db-cleanup +DAG_ID = os.path.basename(__file__).replace(".pyc", "").replace(".py", "") +START_DATE = airflow.utils.dates.days_ago(1) +SCHEDULE_INTERVAL = timedelta(minutes=10) +# Who is listed as the owner of this DAG in the Airflow Web Server +DAG_OWNER_NAME = "operations" +# List of email address to send email alerts to if this job fails +ALERT_EMAIL_ADDRESSES = [] + +DEFAULT_MAX_LOG_ENTRY_AGE_IN_SECONDS = int( + Variable.get("airflow_print_log__max_log_entry_age_in_seconds", 600) +) + +# List of all the objects that will be deleted. Comment out the DB objects you +# want to skip. +# DATABASE_OBJECTS = { +# Log: +# { +# "airflow_db_model": Log, +# "age_check_column": Log.dttm, +# "keep_last": False, +# "keep_last_filters": None, +# "keep_last_group_by": None, +# }, +# } + + +default_args = { + "owner": DAG_OWNER_NAME, + "depends_on_past": False, + "email": ALERT_EMAIL_ADDRESSES, + "email_on_failure": True, + "email_on_retry": False, + "start_date": START_DATE, + "retries": 1, + "retry_delay": timedelta(minutes=1), +} + +dag = DAG( + DAG_ID, + default_args=default_args, + schedule_interval=SCHEDULE_INTERVAL, + start_date=START_DATE, +) +if hasattr(dag, "doc_md"): + dag.doc_md = __doc__ +if hasattr(dag, "catchup"): + dag.catchup = False + + +def print_configuration_function(**context): + logging.info("Loading Configurations...") + dag_run_conf = context.get("dag_run").conf + logging.info("dag_run.conf: " + str(dag_run_conf)) + max_log_entry_age_in_seconds = None + if dag_run_conf: + max_log_entry_age_in_seconds = dag_run_conf.get("maxLogEntryAgeInSeconds", None) + logging.info("maxLogEntryAgeInSeconds from dag_run.conf: " + str(dag_run_conf)) + if max_log_entry_age_in_seconds is None or max_log_entry_age_in_seconds < 0: + logging.info( + "maxLogEntryAgeInSeconds conf variable isn't included or Variable " + + "value is less than 0. 
Using Default '" + + str(DEFAULT_MAX_LOG_ENTRY_AGE_IN_SECONDS) + + "'" + ) + max_log_entry_age_in_seconds = DEFAULT_MAX_LOG_ENTRY_AGE_IN_SECONDS + min_date = now() - timedelta(seconds=max_log_entry_age_in_seconds) + logging.info("Finished Loading Configurations") + logging.info("") + + logging.info("Configurations:") + logging.info("max_log_entry_age_in_seconds: " + str(max_log_entry_age_in_seconds)) + logging.info("min_date: " + str(min_date)) + logging.info("") + + logging.info("Setting max_execution_date to XCom for Downstream Processes") + context["ti"].xcom_push(key="min_date", value=min_date.isoformat()) + + + +print_configuration = PythonOperator( + task_id="print_configuration", + python_callable=print_configuration_function, + provide_context=True, + dag=dag, +) + + +def print_log_function(**context): + session = settings.Session() + + logging.info("Retrieving min_date from XCom") + min_date = context["ti"].xcom_pull( + task_ids=print_configuration.task_id, key="min_date" + ) + min_date = dateutil.parser.parse(min_date) # stored as iso8601 str in xcom + + logging.info("Configurations:") + logging.info("min_date: " + str(min_date)) + + try: + query = session.query(Log) + query = query.filter( + Log.dttm >= min_date, + ) + + entries_to_print = query.all() + for entry in entries_to_print: + logging.info(f"\tEntry: {str(entry)}") + logging.info(f"Process printed {str(len(entries_to_print))} entries.") + session.commit() + + except ProgrammingError as e: + logging.error(e) + + finally: + session.close() + + +def cleanup_sessions(): + session = settings.Session() + + try: + logging.info("Deleting sessions...") + count_statement = "SELECT COUNT(*) AS cnt FROM session WHERE expiry < now()::timestamp(0);" + before = session.execute(text(count_statement)).one_or_none()["cnt"] + session.execute(text("DELETE FROM session WHERE expiry < now()::timestamp(0);")) + after = session.execute(text(count_statement)).one_or_none()["cnt"] + logging.info("Deleted %s expired sessions.", (before - after)) + except Exception as err: + logging.exception(err) + + session.commit() + session.close() + + + +# cleanup_session_op = PythonOperator( +# task_id="cleanup_sessions", +# python_callable=cleanup_sessions, +# provide_context=True, +# dag=dag, +# ) + +# cleanup_session_op.set_downstream(analyze_op) + +printlog_op = PythonOperator( + task_id="print_log", + python_callable=print_log_function, + provide_context=True, + dag=dag, +) + +print_configuration.set_downstream(printlog_op) +printlog_op.set_downstream(analyze_op) + +# [END airflow_log_print] diff --git a/composer/workflows/airflow_print_log copy 3.py b/composer/workflows/airflow_print_log copy 3.py new file mode 100644 index 00000000000..7f3a4e84c8d --- /dev/null +++ b/composer/workflows/airflow_print_log copy 3.py @@ -0,0 +1,233 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +# [START airflow_log_print] +""" +A maintenance workflow that you can deploy into Airflow to periodically clean +out the DagRun, TaskInstance, Log, XCom, Job DB and SlaMiss entries to avoid +having too much data in your Airflow MetaStore. + +## Authors + +The DAG is a fork of [teamclairvoyant repository.](https://github.com/teamclairvoyant/airflow-maintenance-dags/tree/master/db-cleanup) + +## Usage + +1. Update the global variables (SCHEDULE_INTERVAL, DAG_OWNER_NAME, + ALERT_EMAIL_ADDRESSES and ENABLE_DELETE) in the DAG with the desired values + +2. Modify the DATABASE_OBJECTS list to add/remove objects as needed. Each + dictionary in the list features the following parameters: + - airflow_db_model: Model imported from airflow.models corresponding to + a table in the airflow metadata database + - age_check_column: Column in the model/table to use for calculating max + date of data deletion + - keep_last: Boolean to specify whether to preserve last run instance + - keep_last_filters: List of filters to preserve data from deleting + during clean-up, such as DAG runs where the external trigger is set to 0. + - keep_last_group_by: Option to specify column by which to group the + database entries and perform aggregate functions. + +3. Create and Set the following Variables in the Airflow Web Server + (Admin -> Variables) + - airflow_db_cleanup__max_db_entry_age_in_days - integer - Length to retain + the log files if not already provided in the conf. If this is set to 30, + the job will remove those files that are 30 days old or older. + +4. Put the DAG in your gcs bucket. +""" +from datetime import timedelta +import logging +import os + +import airflow +from airflow import settings +from airflow.models import ( + # DAG, + # DagModel, + # DagRun, + Log, + # SlaMiss, + # TaskInstance, + Variable, + XCom, +) +from airflow.operators.python import PythonOperator +from airflow.utils import timezone +from airflow.version import version as airflow_version + +import dateutil.parser +from sqlalchemy import and_, func, text +from sqlalchemy.exc import ProgrammingError +from sqlalchemy.orm import load_only + +now = timezone.utcnow + +# airflow-db-cleanup +DAG_ID = os.path.basename(__file__).replace(".pyc", "").replace(".py", "") +START_DATE = airflow.utils.dates.days_ago(1) +SCHEDULE_INTERVAL = timedelta(minutes=10) +# Who is listed as the owner of this DAG in the Airflow Web Server +DAG_OWNER_NAME = "operations" +# List of email address to send email alerts to if this job fails +ALERT_EMAIL_ADDRESSES = [] + +DEFAULT_MAX_LOG_ENTRY_AGE_IN_SECONDS = int( + Variable.get("airflow_print_log__max_log_entry_age_in_seconds", 600) +) + +# List of all the objects that will be deleted. Comment out the DB objects you +# want to skip. 
+# DATABASE_OBJECTS = { +# Log: +# { +# "airflow_db_model": Log, +# "age_check_column": Log.dttm, +# "keep_last": False, +# "keep_last_filters": None, +# "keep_last_group_by": None, +# }, +# } + + +default_args = { + "owner": DAG_OWNER_NAME, + "depends_on_past": False, + "email": ALERT_EMAIL_ADDRESSES, + "email_on_failure": True, + "email_on_retry": False, + "start_date": START_DATE, + "retries": 1, + "retry_delay": timedelta(minutes=1), +} + +dag = DAG( + DAG_ID, + default_args=default_args, + schedule_interval=SCHEDULE_INTERVAL, + start_date=START_DATE, +) +if hasattr(dag, "doc_md"): + dag.doc_md = __doc__ +if hasattr(dag, "catchup"): + dag.catchup = False + + +def print_configuration_function(**context): + logging.info("Loading Configurations...") + dag_run_conf = context.get("dag_run").conf + logging.info("dag_run.conf: " + str(dag_run_conf)) + max_log_entry_age_in_seconds = None + if dag_run_conf: + max_log_entry_age_in_seconds = dag_run_conf.get("maxLogEntryAgeInSeconds", None) + logging.info("maxLogEntryAgeInSeconds from dag_run.conf: " + str(dag_run_conf)) + if max_log_entry_age_in_seconds is None or max_log_entry_age_in_seconds < 0: + logging.info( + "maxLogEntryAgeInSeconds conf variable isn't included or Variable " + + "value is less than 0. Using Default '" + + str(DEFAULT_MAX_LOG_ENTRY_AGE_IN_SECONDS) + + "'" + ) + max_log_entry_age_in_seconds = DEFAULT_MAX_LOG_ENTRY_AGE_IN_SECONDS + min_date = now() - timedelta(seconds=max_log_entry_age_in_seconds) + logging.info("Finished Loading Configurations") + logging.info("") + + logging.info("Configurations:") + logging.info("max_log_entry_age_in_seconds: " + str(max_log_entry_age_in_seconds)) + logging.info("min_date: " + str(min_date)) + logging.info("") + + logging.info("Setting max_execution_date to XCom for Downstream Processes") + context["ti"].xcom_push(key="min_date", value=min_date.isoformat()) + + + +print_configuration = PythonOperator( + task_id="print_configuration", + python_callable=print_configuration_function, + provide_context=True, + dag=dag, +) + + +def print_log_function(**context): + session = settings.Session() + + logging.info("Retrieving min_date from XCom") + min_date = context["ti"].xcom_pull( + task_ids=print_configuration.task_id, key="min_date" + ) + min_date = dateutil.parser.parse(min_date) # stored as iso8601 str in xcom + + logging.info("Configurations:") + logging.info("min_date: " + str(min_date)) + + try: + query = session.query(Log) + query = query.filter( + Log.dttm >= min_date, + ) + + entries_to_print = query.all() + for entry in entries_to_print: + logging.info(f"\tEntry: {str(entry)}") + logging.info(f"Process printed {str(len(entries_to_print))} entries.") + session.commit() + + except ProgrammingError as e: + logging.error(e) + + finally: + session.close() + + +def cleanup_sessions(): + session = settings.Session() + + try: + logging.info("Deleting sessions...") + count_statement = "SELECT COUNT(*) AS cnt FROM session WHERE expiry < now()::timestamp(0);" + before = session.execute(text(count_statement)).one_or_none()["cnt"] + session.execute(text("DELETE FROM session WHERE expiry < now()::timestamp(0);")) + after = session.execute(text(count_statement)).one_or_none()["cnt"] + logging.info("Deleted %s expired sessions.", (before - after)) + except Exception as err: + logging.exception(err) + + session.commit() + session.close() + + + +# cleanup_session_op = PythonOperator( +# task_id="cleanup_sessions", +# python_callable=cleanup_sessions, +# provide_context=True, +# dag=dag, +# ) + +# 
cleanup_session_op.set_downstream(analyze_op) + +printlog_op = PythonOperator( + task_id="print_log", + python_callable=print_log_function, + provide_context=True, + dag=dag, +) + +print_configuration.set_downstream(printlog_op) +printlog_op.set_downstream(analyze_op) + +# [END airflow_log_print] diff --git a/composer/workflows/airflow_print_log copy 4.py b/composer/workflows/airflow_print_log copy 4.py new file mode 100644 index 00000000000..f88afee29ea --- /dev/null +++ b/composer/workflows/airflow_print_log copy 4.py @@ -0,0 +1,161 @@ +# [START airflow_log_print] +from datetime import timedelta +import logging +import os +import json + +import airflow +from airflow import settings +from airflow.models import ( + DAG, + # DagModel, + # DagRun, + Log, + # SlaMiss, + # TaskInstance, + Variable, + XCom, +) +from airflow.operators.python import PythonOperator +from airflow.utils import timezone +from airflow.version import version as airflow_version + +import dateutil.parser +from sqlalchemy import and_, func, text +from sqlalchemy.exc import ProgrammingError +from sqlalchemy.orm import load_only + +now = timezone.utcnow + +# airflow-db-cleanup +DAG_ID = os.path.basename(__file__).replace(".pyc", "").replace(".py", "") +START_DATE = airflow.utils.dates.days_ago(1) +# How often to Run. Every 10 minutes +SCHEDULE_INTERVAL = timedelta(minutes=10) +# Who is listed as the owner of this DAG in the Airflow Web Server +DAG_OWNER_NAME = "operations" +# List of email address to send email alerts to if this job fails +ALERT_EMAIL_ADDRESSES = [] + +DEFAULT_MAX_LOG_ENTRY_AGE_IN_SECONDS = int( + Variable.get("airflow_print_log__max_log_entry_age_in_seconds", 600) +) + +TRACKING_FILE_NAME = "/data/._airflow_print_log_tracking.json" + +# Avoid logging everything is on the log table if the tracking file is not present +MAX_ENTRIES_TO_PRINT = 100 + + +default_args = { + "owner": DAG_OWNER_NAME, + "depends_on_past": False, + "email": ALERT_EMAIL_ADDRESSES, + "email_on_failure": True, + "email_on_retry": False, + "start_date": START_DATE, + "retries": 1, + "retry_delay": timedelta(minutes=1), +} + +dag = DAG( + DAG_ID, + default_args=default_args, + schedule_interval=SCHEDULE_INTERVAL, + start_date=START_DATE, +) +if hasattr(dag, "doc_md"): + dag.doc_md = __doc__ +if hasattr(dag, "catchup"): + dag.catchup = False + + +def read_tracking_info(): + try: + with open(TRACKING_FILE_NAME, "r") as f: + return json.load(f) + except FileNotFoundError: + return {} + + +def write_tracking_info(tracking_info): + with open(TRACKING_FILE_NAME, "w") as f: + json.dump(tracking_info, f) + + +def read_configuration_function(**context): + logging.info("Loading Configurations...") + dag_run_conf = context.get("dag_run").conf + logging.info("dag_run.conf: " + str(dag_run_conf)) + max_log_entry_age_in_seconds = None + if dag_run_conf: + max_log_entry_age_in_seconds = dag_run_conf.get("maxLogEntryAgeInSeconds", None) + logging.info("maxLogEntryAgeInSeconds from dag_run.conf: " + str(dag_run_conf)) + if max_log_entry_age_in_seconds is None or max_log_entry_age_in_seconds < 0: + logging.info( + "maxLogEntryAgeInSeconds conf variable isn't included or Variable " + + "value is less than 0. 
Using Default '" + + str(DEFAULT_MAX_LOG_ENTRY_AGE_IN_SECONDS) + + "'" + ) + max_log_entry_age_in_seconds = DEFAULT_MAX_LOG_ENTRY_AGE_IN_SECONDS + min_date = now() - timedelta(seconds=max_log_entry_age_in_seconds) + + logging.info("Finished Loading Configurations") + logging.info("Configurations:") + logging.info("max_log_entry_age_in_seconds: " + str(max_log_entry_age_in_seconds)) + logging.info("min_date: " + str(min_date)) + logging.info("Setting max_execution_date to XCom for Downstream Processes") + context["ti"].xcom_push(key="min_date", value=min_date.isoformat()) + + + +read_configuration = PythonOperator( + task_id="print_configuration", + python_callable=read_configuration_function, + provide_context=True, + dag=dag, +) + + +def print_log_function(**context): + session = settings.Session() + + logging.info("Retrieving min_date from XCom") + min_date = context["ti"].xcom_pull( + task_ids=read_configuration.task_id, key="min_date" + ) + min_date = dateutil.parser.parse(min_date) # stored as iso8601 str in xcom + + logging.info("Configurations:") + logging.info("min_date: " + str(min_date)) + + try: + query = session.query(Log) + query = query.filter( + Log.dttm >= min_date, + ) + + entries_to_print = query.all() + for entry in entries_to_print: + logging.info(f"\tEntry: {str(entry)}") + logging.info(f"Process printed {str(len(entries_to_print))} entries.") + session.commit() + + except ProgrammingError as e: + logging.error(e) + + finally: + session.close() + + +printlog_op = PythonOperator( + task_id="print_log", + python_callable=print_log_function, + provide_context=True, + dag=dag, +) + +read_configuration.set_downstream(printlog_op) + +# [END airflow_log_print] diff --git a/composer/workflows/airflow_print_log copy.py b/composer/workflows/airflow_print_log copy.py new file mode 100644 index 00000000000..2d5d8222823 --- /dev/null +++ b/composer/workflows/airflow_print_log copy.py @@ -0,0 +1,292 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# [START airflow_log_print] +""" +A maintenance workflow that you can deploy into Airflow to periodically clean +out the DagRun, TaskInstance, Log, XCom, Job DB and SlaMiss entries to avoid +having too much data in your Airflow MetaStore. + +## Authors + +The DAG is a fork of [teamclairvoyant repository.](https://github.com/teamclairvoyant/airflow-maintenance-dags/tree/master/db-cleanup) + +## Usage + +1. Update the global variables (SCHEDULE_INTERVAL, DAG_OWNER_NAME, + ALERT_EMAIL_ADDRESSES and ENABLE_DELETE) in the DAG with the desired values + +2. Modify the DATABASE_OBJECTS list to add/remove objects as needed. 
Each + dictionary in the list features the following parameters: + - airflow_db_model: Model imported from airflow.models corresponding to + a table in the airflow metadata database + - age_check_column: Column in the model/table to use for calculating max + date of data deletion + - keep_last: Boolean to specify whether to preserve last run instance + - keep_last_filters: List of filters to preserve data from deleting + during clean-up, such as DAG runs where the external trigger is set to 0. + - keep_last_group_by: Option to specify column by which to group the + database entries and perform aggregate functions. + +3. Create and Set the following Variables in the Airflow Web Server + (Admin -> Variables) + - airflow_db_cleanup__max_db_entry_age_in_days - integer - Length to retain + the log files if not already provided in the conf. If this is set to 30, + the job will remove those files that are 30 days old or older. + +4. Put the DAG in your gcs bucket. +""" +from datetime import timedelta +import logging +import os + +import airflow +from airflow import settings +from airflow.models import ( + DAG, + DagModel, + DagRun, + Log, + SlaMiss, + TaskInstance, + Variable, + XCom, +) +from airflow.operators.python import PythonOperator +from airflow.utils import timezone +from airflow.version import version as airflow_version + +import dateutil.parser +from sqlalchemy import and_, func, text +from sqlalchemy.exc import ProgrammingError +from sqlalchemy.orm import load_only + +now = timezone.utcnow + +# airflow-db-cleanup +DAG_ID = os.path.basename(__file__).replace(".pyc", "").replace(".py", "") +START_DATE = airflow.utils.dates.days_ago(1) +# How often to Run. @daily - Once a day at Midnight (UTC) +SCHEDULE_INTERVAL = "@daily" +# Who is listed as the owner of this DAG in the Airflow Web Server +DAG_OWNER_NAME = "operations" +# List of email address to send email alerts to if this job fails +ALERT_EMAIL_ADDRESSES = [] + +DEFAULT_MAX_LOG_ENTRY_AGE_IN_SECONDS = int( + Variable.get("airflow_print_log__max_log_entry_age_in_seconds", 3600) +) + +# List of all the objects that will be deleted. Comment out the DB objects you +# want to skip. +DATABASE_OBJECTS = [ + { + "airflow_db_model": Log, + "age_check_column": Log.dttm, + "keep_last": False, + "keep_last_filters": None, + "keep_last_group_by": None, + }, +] + + +default_args = { + "owner": DAG_OWNER_NAME, + "depends_on_past": False, + "email": ALERT_EMAIL_ADDRESSES, + "email_on_failure": True, + "email_on_retry": False, + "start_date": START_DATE, + "retries": 1, + "retry_delay": timedelta(minutes=1), +} + +dag = DAG( + DAG_ID, + default_args=default_args, + schedule_interval=SCHEDULE_INTERVAL, + start_date=START_DATE, +) +if hasattr(dag, "doc_md"): + dag.doc_md = __doc__ +if hasattr(dag, "catchup"): + dag.catchup = False + + +def print_configuration_function(**context): + logging.info("Loading Configurations...") + dag_run_conf = context.get("dag_run").conf + logging.info("dag_run.conf: " + str(dag_run_conf)) + max_log_entry_age_in_seconds = None + if dag_run_conf: + max_log_entry_age_in_seconds = dag_run_conf.get("maxLogEntryAgeInSeconds", None) + logging.info("maxLogEntryAgeInSeconds from dag_run.conf: " + str(dag_run_conf)) + if max_log_entry_age_in_seconds is None or max_log_entry_age_in_seconds < 0: + logging.info( + "maxLogEntryAgeInSeconds conf variable isn't included or Variable " + + "value is less than 0. 
Using Default '" + + str(DEFAULT_MAX_LOG_ENTRY_AGE_IN_SECONDS) + + "'" + ) + max_log_entry_age_in_seconds = DEFAULT_MAX_LOG_ENTRY_AGE_IN_SECONDS + min_date = now() - timedelta(seconds=max_log_entry_age_in_seconds) + logging.info("Finished Loading Configurations") + logging.info("") + + logging.info("Configurations:") + logging.info("max_log_entry_age_in_seconds: " + str(max_log_entry_age_in_seconds)) + logging.info("min_date: " + str(min_date)) + logging.info("") + + logging.info("Setting max_execution_date to XCom for Downstream Processes") + context["ti"].xcom_push(key="min_date", value=min_date.isoformat()) + + + +print_configuration = PythonOperator( + task_id="print_configuration", + python_callable=print_configuration_function, + provide_context=True, + dag=dag, +) + + +def build_query( + session, + airflow_db_model, + age_check_column, + min_date, +): + query = session.query(airflow_db_model) + + logging.info("INITIAL QUERY : " + str(query)) + + query = query.filter( + age_check_column >= min_date, + ) + + return query + + +def print_query(query, airflow_db_model, age_check_column): + entries_to_print = query.all() + + logging.info("Query: " + str(query)) + for entry in entries_to_print: + date = str(entry.__dict__[str(age_check_column).split(".")[1]]) + logging.info("\tEntry: " + str(entry) + ", Date: " + date) + + logging.info( + "Process printed " + + str(len(entries_to_print)) + + " " + + str(airflow_db_model.__name__) + + "(s)" + ) + + +def print_log_function(**context): + session = settings.Session() + + logging.info("Retrieving min_date from XCom") + min_date = context["ti"].xcom_pull( + task_ids=print_configuration.task_id, key="min_date" + ) + min_date = dateutil.parser.parse(min_date) # stored as iso8601 str in xcom + + airflow_db_model = context["params"].get("airflow_db_model") + state = context["params"].get("state") + age_check_column = context["params"].get("age_check_column") + + logging.info("Configurations:") + logging.info("min_date: " + str(min_date)) + logging.info("session: " + str(session)) + logging.info("airflow_db_model: " + str(airflow_db_model)) + logging.info("state: " + str(state)) + logging.info("age_check_column: " + str(age_check_column)) + + logging.info("") + + try: + query = build_query( + session, + airflow_db_model, + age_check_column, + min_date, + ) + print_query(query, airflow_db_model, age_check_column) + session.commit() + + logging.info("Finished Running Cleanup Process") + + except ProgrammingError as e: + logging.error(e) + logging.error( + str(airflow_db_model) + " is not present in the metadata. " "Skipping..." 
+ ) + + finally: + session.close() + + +def cleanup_sessions(): + session = settings.Session() + + try: + logging.info("Deleting sessions...") + count_statement = "SELECT COUNT(*) AS cnt FROM session WHERE expiry < now()::timestamp(0);" + before = session.execute(text(count_statement)).one_or_none()["cnt"] + session.execute(text("DELETE FROM session WHERE expiry < now()::timestamp(0);")) + after = session.execute(text(count_statement)).one_or_none()["cnt"] + logging.info("Deleted %s expired sessions.", (before - after)) + except Exception as err: + logging.exception(err) + + session.commit() + session.close() + + +def analyze_db(): + pass + # session = settings.Session() + # session.execute("ANALYZE") + # session.commit() + # session.close() + + +analyze_op = PythonOperator( + task_id="analyze_query", python_callable=analyze_db, provide_context=True, dag=dag +) + +# cleanup_session_op = PythonOperator( +# task_id="cleanup_sessions", +# python_callable=cleanup_sessions, +# provide_context=True, +# dag=dag, +# ) + +# cleanup_session_op.set_downstream(analyze_op) + +printlog_op = PythonOperator( + task_id="print_log", + python_callable=print_log_function, + params=DATABASE_OBJECTS[0], + provide_context=True, + dag=dag, +) + +print_configuration.set_downstream(printlog_op) +printlog_op.set_downstream(analyze_op) + +# [END airflow_log_print] diff --git a/composer/workflows/gke_operator.py b/composer/workflows/gke_operator.py index 2f1eaa62c8a..b5e81c3725c 100644 --- a/composer/workflows/gke_operator.py +++ b/composer/workflows/gke_operator.py @@ -39,7 +39,7 @@ # [START composer_gkeoperator_affinity] # [START composer_gkeoperator_fullconfig] # TODO(developer): update with your values - PROJECT_ID = "my-project-id" + PROJECT_ID = "quick-test-project-291118" # It is recommended to use regional clusters for increased reliability # though passing a zone in the location parameter is also valid CLUSTER_REGION = "us-west1" @@ -236,8 +236,8 @@ # [END composer_gkeoperator_delete_cluster] create_cluster >> kubernetes_min_pod >> delete_cluster - create_cluster >> kubernetes_full_pod >> delete_cluster - create_cluster >> kubernetes_affinity_ex >> delete_cluster - create_cluster >> kubenetes_template_ex >> delete_cluster + # create_cluster >> kubernetes_full_pod >> delete_cluster + # create_cluster >> kubernetes_affinity_ex >> delete_cluster + # create_cluster >> kubenetes_template_ex >> delete_cluster # [END composer_gkeoperator] diff --git a/composer/workflows/gke_operator_pod_only.py b/composer/workflows/gke_operator_pod_only.py new file mode 100644 index 00000000000..f0c7231eec0 --- /dev/null +++ b/composer/workflows/gke_operator_pod_only.py @@ -0,0 +1,152 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +# [START composer_gkeoperator] + + +from airflow import models +from airflow.providers.google.cloud.operators.kubernetes_engine import ( + GKECreateClusterOperator, + GKEDeleteClusterOperator, + GKEStartPodOperator, +) +from airflow.utils.dates import days_ago + +from kubernetes.client import models as k8s_models + + +with models.DAG( + "example_gcp_gke_pod_only", + schedule_interval=None, # Override to match your needs + start_date=days_ago(1), + tags=["example"], +) as dag: + # [START composer_gkeoperator_minconfig] + # [START composer_gkeoperator_fullconfig] + # TODO(developer): update with your values + PROJECT_ID = "quick-test-project-291118" + # It is recommended to use regional clusters for increased reliability + # though passing a zone in the location parameter is also valid + CLUSTER_REGION = "europe-west8" + CLUSTER_NAME = "europe-west8-tim-test-92601621-gke" + # [END composer_gkeoperator_minconfig] + # [END composer_gkeoperator_fullconfig] + # CLUSTER = { + # "name": CLUSTER_NAME, + # "node_pools": [ + # {"name": "pool-0", "initial_node_count": 1}, + # {"name": "pool-1", "initial_node_count": 1}, + # ], + # } + + # [START composer_gkeoperator_minconfig] + kubernetes_min_pod = GKEStartPodOperator( + # The ID specified for the task. + task_id="pod-ex-minimum", + # Name of task you want to run, used to generate Pod ID. + name="pod-ex-minimum", + project_id=PROJECT_ID, + location=CLUSTER_REGION, + cluster_name=CLUSTER_NAME, + # Entrypoint of the container, if not specified the Docker container's + # entrypoint is used. The cmds parameter is templated. + cmds=["echo"], + # The namespace to run within Kubernetes, default namespace is + # `default`. + namespace="composer-user-workloads", + # Docker image specified. Defaults to hub.docker.com, but any fully + # qualified URLs will point to a custom repository. Supports private + # gcr.io images if the Composer Environment is under the same + # project-id as the gcr.io images and the service account that Composer + # uses has permission to access the Google Container Registry + # (the default service account has permission) + image="gcr.io/gcp-runtimes/ubuntu_18_0_4", + ) + # [END composer_gkeoperator_minconfig] + + # [START composer_gkeoperator_fullconfig] + kubernetes_full_pod = GKEStartPodOperator( + task_id="ex-all-configs", + name="full", + project_id=PROJECT_ID, + location=CLUSTER_REGION, + cluster_name=CLUSTER_NAME, + namespace="composer-user-workloads", + image="perl:5.34.0", + # Entrypoint of the container, if not specified the Docker container's + # entrypoint is used. The cmds parameter is templated. + cmds=["perl"], + # Arguments to the entrypoint. The docker image's CMD is used if this + # is not provided. The arguments parameter is templated. + arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"], + # The secrets to pass to Pod, the Pod will fail to create if the + # secrets you specify in a Secret object do not exist in Kubernetes. + secrets=[], + # Labels to apply to the Pod. + labels={"pod-label": "label-name"}, + # Timeout to start up the Pod, default is 120. + startup_timeout_seconds=120, + # The environment variables to be initialized in the container + # env_vars are templated. + env_vars={"EXAMPLE_VAR": "/example/value"}, + # If true, logs stdout output of container. Defaults to True. + get_logs=True, + # Determines when to pull a fresh image, if 'IfNotPresent' will cause + # the Kubelet to skip pulling an image if it already exists. If you + # want to always pull a new image, set it to 'Always'. 
+ image_pull_policy="Always", + # Annotations are non-identifying metadata you can attach to the Pod. + # Can be a large range of data, and can include characters that are not + # permitted by labels. + annotations={"key1": "value1"}, + # Optional resource specifications for Pod, this will allow you to + # set both cpu and memory limits and requirements. + # Prior to Airflow 2.3 and the cncf providers package 5.0.0 + # resources were passed as a dictionary. This change was made in + # https://github.com/apache/airflow/pull/27197 + # Additionally, "memory" and "cpu" were previously named + # "limit_memory" and "limit_cpu" + # resources={'limit_memory': "250M", 'limit_cpu': "100m"}, + container_resources=k8s_models.V1ResourceRequirements( + limits={"memory": "250M", "cpu": "100m"}, + ), + # If true, the content of /airflow/xcom/return.json from container will + # also be pushed to an XCom when the container ends. + do_xcom_push=True, + # List of Volume objects to pass to the Pod. + volumes=[], + # List of VolumeMount objects to pass to the Pod. + volume_mounts=[], + # Affinity determines which nodes the Pod can run on based on the + # config. For more information see: + # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/ + affinity={}, + ) + # [END composer_gkeoperator_fullconfig] + # # [START composer_gkeoperator_delete_cluster] + # delete_cluster = GKEDeleteClusterOperator( + # task_id="delete_cluster", + # name=CLUSTER_NAME, + # project_id=PROJECT_ID, + # location=CLUSTER_REGION, + # ) + # # [END composer_gkeoperator_delete_cluster] + + # create_cluster >> kubernetes_min_pod >> delete_cluster + # create_cluster >> kubernetes_full_pod >> delete_cluster + # create_cluster >> kubernetes_affinity_ex >> delete_cluster + # create_cluster >> kubenetes_template_ex >> delete_cluster + +# [END composer_gkeoperator] diff --git a/composer/workflows/kubernetes_pod_operator_default.py b/composer/workflows/kubernetes_pod_operator_default.py new file mode 100644 index 00000000000..ded5d5f45fd --- /dev/null +++ b/composer/workflows/kubernetes_pod_operator_default.py @@ -0,0 +1,245 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# [START composer_2_kubernetespodoperator] +"""Example DAG using KubernetesPodOperator.""" +import datetime + +from airflow import models +from airflow.kubernetes.secret import Secret +from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import ( + KubernetesPodOperator, +) +from kubernetes.client import models as k8s_models + +# A Secret is an object that contains a small amount of sensitive data such as +# a password, a token, or a key. Such information might otherwise be put in a +# Pod specification or in an image; putting it in a Secret object allows for +# more control over how it is used, and reduces the risk of accidental +# exposure. +# [START composer_2_kubernetespodoperator_secretobject] +secret_env = Secret( + # Expose the secret as environment variable. 
+ deploy_type="env", + # The name of the environment variable, since deploy_type is `env` rather + # than `volume`. + deploy_target="SQL_CONN", + # Name of the Kubernetes Secret + secret="airflow-secrets", + # Key of a secret stored in this Secret object + key="sql_alchemy_conn", +) +secret_volume = Secret( + deploy_type="volume", + # Path where we mount the secret as volume + deploy_target="/var/secrets/google", + # Name of Kubernetes Secret + secret="service-account", + # Key in the form of service account file name + key="service-account.json", +) +# [END composer_2_kubernetespodoperator_secretobject] +# If you are running Airflow in more than one time zone +# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html +# for best practices +YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1) + +# If a Pod fails to launch, or has an error occur in the container, Airflow +# will show the task as failed, as well as contain all of the task logs +# required to debug. +with models.DAG( + dag_id="composer_sample_kubernetes_pod", + schedule_interval=datetime.timedelta(days=1), + start_date=YESTERDAY, +) as dag: + # Only name, namespace, image, and task_id are required to create a + # KubernetesPodOperator. In Cloud Composer, currently the operator defaults + # to using the config file found at `/home/airflow/composer_kube_config if + # no `config_file` parameter is specified. By default it will contain the + # credentials for Cloud Composer's Google Kubernetes Engine cluster that is + # created upon environment creation. + # [START composer_2_kubernetespodoperator_minconfig] + kubernetes_min_pod = KubernetesPodOperator( + # The ID specified for the task. + task_id="pod-ex-minimum", + # Name of task you want to run, used to generate Pod ID. + name="pod-ex-minimum", + # Entrypoint of the container, if not specified the Docker container's + # entrypoint is used. The cmds parameter is templated. + cmds=["/bin/sh", "-c", "sleep 300; date"], + # The namespace to run within Kubernetes. In Composer 2 environments + # after December 2022, the default namespace is + # `composer-user-workloads`. + namespace="composer-user-workloads", + # Docker image specified. Defaults to hub.docker.com, but any fully + # qualified URLs will point to a custom repository. Supports private + # gcr.io images if the Composer Environment is under the same + # project-id as the gcr.io images and the service account that Composer + # uses has permission to access the Google Container Registry + # (the default service account has permission) + image="gcr.io/gcp-runtimes/ubuntu_20_0_4", + # Specifies path to kubernetes config. If no config is specified will + # default to '~/.kube/config'. The config_file is templated. + config_file="/home/airflow/composer_kube_config", + # Identifier of connection that should be used + kubernetes_conn_id="kubernetes_default", + ) + # [END composer_2_kubernetespodoperator_minconfig] + # # [START composer_2_kubernetespodoperator_templateconfig] + # kubernetes_template_ex = KubernetesPodOperator( + # task_id="ex-kube-templates", + # name="ex-kube-templates", + # namespace="composer-user-workloads", + # image="bash", + # # All parameters below are able to be templated with jinja -- cmds, + # # arguments, env_vars, and config_file. For more information visit: + # # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html + # # Entrypoint of the container, if not specified the Docker container's + # # entrypoint is used. The cmds parameter is templated. 
+ # cmds=["sleep 300"], + # # DS in jinja is the execution date as YYYY-MM-DD, this docker image + # # will echo the execution date. Arguments to the entrypoint. The docker + # # image's CMD is used if this is not provided. The arguments parameter + # # is templated. + # arguments=["{{ ds }}"], + # # The var template variable allows you to access variables defined in + # # Airflow UI. In this case we are getting the value of my_value and + # # setting the environment variable `MY_VALUE`. The pod will fail if + # # `my_value` is not set in the Airflow UI. + # env_vars={"MY_VALUE": "{{ var.value.my_value }}"}, + # # Sets the config file to a kubernetes config file specified in + # # airflow.cfg. If the configuration file does not exist or does + # # not provide validcredentials the pod will fail to launch. If not + # # specified, config_file defaults to ~/.kube/config + # config_file="{{ conf.get('core', 'kube_config') }}", + # # Identifier of connection that should be used + # kubernetes_conn_id="kubernetes_default", + # ) + # # [END composer_2_kubernetespodoperator_templateconfig] + # # [START composer_2_kubernetespodoperator_secretconfig] + # kubernetes_secret_vars_ex = KubernetesPodOperator( + # task_id="ex-kube-secrets", + # name="ex-kube-secrets", + # namespace="composer-user-workloads", + # image="gcr.io/gcp-runtimes/ubuntu_20_0_4", + # startup_timeout_seconds=300, + # # The secrets to pass to Pod, the Pod will fail to create if the + # # secrets you specify in a Secret object do not exist in Kubernetes. + # secrets=[secret_env, secret_volume], + # cmds=["echo"], + # # env_vars allows you to specify environment variables for your + # # container to use. env_vars is templated. + # env_vars={ + # "EXAMPLE_VAR": "/example/value", + # "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json", + # }, + # # Specifies path to kubernetes config. If no config is specified will + # # default to '~/.kube/config'. The config_file is templated. + # config_file="/home/airflow/composer_kube_config", + # # Identifier of connection that should be used + # kubernetes_conn_id="kubernetes_default", + # ) + # # [END composer_2_kubernetespodoperator_secretconfig] + # [START composer_2_kubernetespodoperator_fullconfig] + kubernetes_full_pod = KubernetesPodOperator( + task_id="ex-all-configs", + name="pi", + # namespace="composer-system", + image="perl:5.34.0", + # Entrypoint of the container, if not specified the Docker container's + # entrypoint is used. The cmds parameter is templated. + cmds=["perl"], + # Arguments to the entrypoint. The docker image's CMD is used if this + # is not provided. The arguments parameter is templated. + arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"], + # The secrets to pass to Pod, the Pod will fail to create if the + # secrets you specify in a Secret object do not exist in Kubernetes. + secrets=[], + # Labels to apply to the Pod. + labels={"pod-label": "label-name"}, + # Timeout to start up the Pod, default is 600. + startup_timeout_seconds=600, + # The environment variables to be initialized in the container + # env_vars are templated. + env_vars={"EXAMPLE_VAR": "/example/value"}, + # If true, logs stdout output of container. Defaults to True. + get_logs=True, + # Determines when to pull a fresh image, if 'IfNotPresent' will cause + # the Kubelet to skip pulling an image if it already exists. If you + # want to always pull a new image, set it to 'Always'. 
+ image_pull_policy="Always", + # Annotations are non-identifying metadata you can attach to the Pod. + # Can be a large range of data, and can include characters that are not + # permitted by labels. + annotations={"key1": "value1"}, + # Optional resource specifications for Pod, this will allow you to + # set both cpu and memory limits and requirements. + # Prior to Airflow 2.3 and the cncf providers package 5.0.0 + # resources were passed as a dictionary. This change was made in + # https://github.com/apache/airflow/pull/27197 + # Additionally, "memory" and "cpu" were previously named + # "limit_memory" and "limit_cpu" + # resources={'limit_memory': "250M", 'limit_cpu': "100m"}, + container_resources=k8s_models.V1ResourceRequirements( + requests={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"}, + limits={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"}, + ), + # Specifies path to kubernetes config. If no config is specified will + # default to '~/.kube/config'. The config_file is templated. + config_file="/home/airflow/composer_kube_config", + # If true, the content of /airflow/xcom/return.json from container will + # also be pushed to an XCom when the container ends. + do_xcom_push=False, + # List of Volume objects to pass to the Pod. + volumes=[], + # List of VolumeMount objects to pass to the Pod. + volume_mounts=[], + # Identifier of connection that should be used + kubernetes_conn_id="kubernetes_default", + # Affinity determines which nodes the Pod can run on based on the + # config. For more information see: + # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/ + # Pod affinity with the KubernetesPodOperator + # is not supported with Composer 2 + # instead, create a cluster and use the GKEStartPodOperator + # https://cloud.google.com/composer/docs/using-gke-operator + affinity={}, + ) + kubernetes_pod_default = KubernetesPodOperator( + # The ID specified for the task. + task_id="pod-ex-default", + # Name of task you want to run, used to generate Pod ID. + name="pod-ex-default", + # Entrypoint of the container, if not specified the Docker container's + # entrypoint is used. The cmds parameter is templated. + cmds=["/bin/sh", "-c", "sleep 30; date"], + # The namespace to run within Kubernetes. In Composer 2 environments + # after December 2022, the default namespace is + # `composer-user-workloads`. + namespace="default", + # Docker image specified. Defaults to hub.docker.com, but any fully + # qualified URLs will point to a custom repository. Supports private + # gcr.io images if the Composer Environment is under the same + # project-id as the gcr.io images and the service account that Composer + # uses has permission to access the Google Container Registry + # (the default service account has permission) + image="gcr.io/gcp-runtimes/ubuntu_20_0_4", + # Specifies path to kubernetes config. If no config is specified will + # default to '~/.kube/config'. The config_file is templated. 
+ config_file="/home/airflow/composer_kube_config", + # Identifier of connection that should be used + kubernetes_conn_id="kubernetes_default", + ) + # [END composer_2_kubernetespodoperator_fullconfig] + # [END composer_2_kubernetespodoperator] diff --git a/composer/workflows/kubernetes_pod_operator_system.py b/composer/workflows/kubernetes_pod_operator_system.py new file mode 100644 index 00000000000..179931945a9 --- /dev/null +++ b/composer/workflows/kubernetes_pod_operator_system.py @@ -0,0 +1,220 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# [START composer_2_kubernetespodoperator] +"""Example DAG using KubernetesPodOperator.""" +import datetime + +from airflow import models +from airflow.kubernetes.secret import Secret +from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import ( + KubernetesPodOperator, +) +from kubernetes.client import models as k8s_models + +# A Secret is an object that contains a small amount of sensitive data such as +# a password, a token, or a key. Such information might otherwise be put in a +# Pod specification or in an image; putting it in a Secret object allows for +# more control over how it is used, and reduces the risk of accidental +# exposure. +# [START composer_2_kubernetespodoperator_secretobject] +secret_env = Secret( + # Expose the secret as environment variable. + deploy_type="env", + # The name of the environment variable, since deploy_type is `env` rather + # than `volume`. + deploy_target="SQL_CONN", + # Name of the Kubernetes Secret + secret="airflow-secrets", + # Key of a secret stored in this Secret object + key="sql_alchemy_conn", +) +secret_volume = Secret( + deploy_type="volume", + # Path where we mount the secret as volume + deploy_target="/var/secrets/google", + # Name of Kubernetes Secret + secret="service-account", + # Key in the form of service account file name + key="service-account.json", +) +# [END composer_2_kubernetespodoperator_secretobject] +# If you are running Airflow in more than one time zone +# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html +# for best practices +YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1) + +# If a Pod fails to launch, or has an error occur in the container, Airflow +# will show the task as failed, as well as contain all of the task logs +# required to debug. +with models.DAG( + dag_id="composer_sample_kubernetes_pod", + schedule_interval=datetime.timedelta(days=1), + start_date=YESTERDAY, +) as dag: + # Only name, namespace, image, and task_id are required to create a + # KubernetesPodOperator. In Cloud Composer, currently the operator defaults + # to using the config file found at `/home/airflow/composer_kube_config if + # no `config_file` parameter is specified. By default it will contain the + # credentials for Cloud Composer's Google Kubernetes Engine cluster that is + # created upon environment creation. 
+ # [START composer_2_kubernetespodoperator_minconfig] + kubernetes_min_pod = KubernetesPodOperator( + # The ID specified for the task. + task_id="pod-ex-minimum", + # Name of task you want to run, used to generate Pod ID. + name="pod-ex-minimum", + # Entrypoint of the container, if not specified the Docker container's + # entrypoint is used. The cmds parameter is templated. + cmds=["/bin/sh", "-c", "sleep 300"], + # The namespace to run within Kubernetes. In Composer 2 environments + # after December 2022, the default namespace is + # `composer-user-workloads`. + namespace="composer-user-workloads", + # Docker image specified. Defaults to hub.docker.com, but any fully + # qualified URLs will point to a custom repository. Supports private + # gcr.io images if the Composer Environment is under the same + # project-id as the gcr.io images and the service account that Composer + # uses has permission to access the Google Container Registry + # (the default service account has permission) + image="gcr.io/gcp-runtimes/ubuntu_20_0_4", + # Specifies path to kubernetes config. If no config is specified will + # default to '~/.kube/config'. The config_file is templated. + config_file="/home/airflow/composer_kube_config", + # Identifier of connection that should be used + kubernetes_conn_id="kubernetes_default", + ) + # [END composer_2_kubernetespodoperator_minconfig] + # # [START composer_2_kubernetespodoperator_templateconfig] + # kubernetes_template_ex = KubernetesPodOperator( + # task_id="ex-kube-templates", + # name="ex-kube-templates", + # namespace="composer-user-workloads", + # image="bash", + # # All parameters below are able to be templated with jinja -- cmds, + # # arguments, env_vars, and config_file. For more information visit: + # # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html + # # Entrypoint of the container, if not specified the Docker container's + # # entrypoint is used. The cmds parameter is templated. + # cmds=["sleep 300"], + # # DS in jinja is the execution date as YYYY-MM-DD, this docker image + # # will echo the execution date. Arguments to the entrypoint. The docker + # # image's CMD is used if this is not provided. The arguments parameter + # # is templated. + # arguments=["{{ ds }}"], + # # The var template variable allows you to access variables defined in + # # Airflow UI. In this case we are getting the value of my_value and + # # setting the environment variable `MY_VALUE`. The pod will fail if + # # `my_value` is not set in the Airflow UI. + # env_vars={"MY_VALUE": "{{ var.value.my_value }}"}, + # # Sets the config file to a kubernetes config file specified in + # # airflow.cfg. If the configuration file does not exist or does + # # not provide validcredentials the pod will fail to launch. If not + # # specified, config_file defaults to ~/.kube/config + # config_file="{{ conf.get('core', 'kube_config') }}", + # # Identifier of connection that should be used + # kubernetes_conn_id="kubernetes_default", + # ) + # # [END composer_2_kubernetespodoperator_templateconfig] + # # [START composer_2_kubernetespodoperator_secretconfig] + # kubernetes_secret_vars_ex = KubernetesPodOperator( + # task_id="ex-kube-secrets", + # name="ex-kube-secrets", + # namespace="composer-user-workloads", + # image="gcr.io/gcp-runtimes/ubuntu_20_0_4", + # startup_timeout_seconds=300, + # # The secrets to pass to Pod, the Pod will fail to create if the + # # secrets you specify in a Secret object do not exist in Kubernetes. 
+ # secrets=[secret_env, secret_volume], + # cmds=["echo"], + # # env_vars allows you to specify environment variables for your + # # container to use. env_vars is templated. + # env_vars={ + # "EXAMPLE_VAR": "/example/value", + # "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json", + # }, + # # Specifies path to kubernetes config. If no config is specified will + # # default to '~/.kube/config'. The config_file is templated. + # config_file="/home/airflow/composer_kube_config", + # # Identifier of connection that should be used + # kubernetes_conn_id="kubernetes_default", + # ) + # # [END composer_2_kubernetespodoperator_secretconfig] + # [START composer_2_kubernetespodoperator_fullconfig] + kubernetes_full_pod = KubernetesPodOperator( + task_id="ex-all-configs", + name="pi", + # namespace="composer-system", + image="perl:5.34.0", + # Entrypoint of the container, if not specified the Docker container's + # entrypoint is used. The cmds parameter is templated. + cmds=["perl"], + # Arguments to the entrypoint. The docker image's CMD is used if this + # is not provided. The arguments parameter is templated. + arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"], + # The secrets to pass to Pod, the Pod will fail to create if the + # secrets you specify in a Secret object do not exist in Kubernetes. + secrets=[], + # Labels to apply to the Pod. + labels={"pod-label": "label-name"}, + # Timeout to start up the Pod, default is 600. + startup_timeout_seconds=600, + # The environment variables to be initialized in the container + # env_vars are templated. + env_vars={"EXAMPLE_VAR": "/example/value"}, + # If true, logs stdout output of container. Defaults to True. + get_logs=True, + # Determines when to pull a fresh image, if 'IfNotPresent' will cause + # the Kubelet to skip pulling an image if it already exists. If you + # want to always pull a new image, set it to 'Always'. + image_pull_policy="Always", + # Annotations are non-identifying metadata you can attach to the Pod. + # Can be a large range of data, and can include characters that are not + # permitted by labels. + annotations={"key1": "value1"}, + # Optional resource specifications for Pod, this will allow you to + # set both cpu and memory limits and requirements. + # Prior to Airflow 2.3 and the cncf providers package 5.0.0 + # resources were passed as a dictionary. This change was made in + # https://github.com/apache/airflow/pull/27197 + # Additionally, "memory" and "cpu" were previously named + # "limit_memory" and "limit_cpu" + # resources={'limit_memory': "250M", 'limit_cpu': "100m"}, + container_resources=k8s_models.V1ResourceRequirements( + requests={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"}, + limits={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"}, + ), + # Specifies path to kubernetes config. If no config is specified will + # default to '~/.kube/config'. The config_file is templated. + config_file="/home/airflow/composer_kube_config", + # If true, the content of /airflow/xcom/return.json from container will + # also be pushed to an XCom when the container ends. + do_xcom_push=False, + # List of Volume objects to pass to the Pod. + volumes=[], + # List of VolumeMount objects to pass to the Pod. + volume_mounts=[], + # Identifier of connection that should be used + kubernetes_conn_id="kubernetes_default", + # Affinity determines which nodes the Pod can run on based on the + # config. 
For more information see: + # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/ + # Pod affinity with the KubernetesPodOperator + # is not supported with Composer 2 + # instead, create a cluster and use the GKEStartPodOperator + # https://cloud.google.com/composer/docs/using-gke-operator + affinity={}, + ) + # [END composer_2_kubernetespodoperator_fullconfig] + # [END composer_2_kubernetespodoperator]
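
For reference, here is a minimal sketch of the log-printing pattern that the new `airflow_print_log` DAGs in this diff implement: query the metadata database's `Log` table for entries newer than a cutoff and write them to the task log. It distills `print_log_function` above into a standalone helper; the helper name and the 600-second default are illustrative, not part of the diff, and it assumes it runs inside an Airflow environment where `airflow.settings.Session` is bound to the metadata database.

from datetime import timedelta
import logging

from airflow import settings
from airflow.models import Log
from airflow.utils import timezone


def print_recent_log_entries(max_age_seconds: int = 600) -> int:
    """Print metadata-DB Log rows newer than now() - max_age_seconds."""
    # Cutoff mirrors print_configuration_function: now() minus maxLogEntryAgeInSeconds.
    min_date = timezone.utcnow() - timedelta(seconds=max_age_seconds)
    session = settings.Session()
    try:
        entries = session.query(Log).filter(Log.dttm >= min_date).all()
        for entry in entries:
            logging.info("\tEntry: %s", entry)
        logging.info("Process printed %d entries.", len(entries))
        return len(entries)
    finally:
        # Read-only query, so closing the session is sufficient.
        session.close()

In the DAGs above, the same cutoff can be overridden per run by triggering the DAG with a conf payload such as {"maxLogEntryAgeInSeconds": 3600}, which print_configuration_function reads from dag_run.conf and publishes to downstream tasks via XCom as min_date.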