diff --git a/composer/cicd_sample/dags/example2_dag.py b/composer/cicd_sample/dags/example2_dag.py index c050447e330..81996663d8a 100644 --- a/composer/cicd_sample/dags/example2_dag.py +++ b/composer/cicd_sample/dags/example2_dag.py @@ -44,3 +44,4 @@ print_dag_run_conf = bash.BashOperator( task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}" ) + diff --git a/composer/cicd_sample/utils/add_dags_to_composer.py b/composer/cicd_sample/utils/add_dags_to_composer.py index 82023e4df38..5b3ba92ad9a 100644 --- a/composer/cicd_sample/utils/add_dags_to_composer.py +++ b/composer/cicd_sample/utils/add_dags_to_composer.py @@ -40,7 +40,7 @@ def _create_dags_list(dags_directory: str) -> Tuple[str, List[str]]: def upload_dags_to_composer( - dags_directory: str, bucket_name: str, name_replacement: str = "dags/" + dags_directory: str, bucket_name: str, name_replacement: str = "dags/" ) -> None: """ Given a directory, this function moves all DAG files from that directory @@ -97,4 +97,4 @@ def upload_dags_to_composer( args = parser.parse_args() upload_dags_to_composer(args.dags_directory, args.dags_bucket) -# [END composer_cicd_add_dags_to_composer_utility] +# [END composer_cicd_add_dags_to_composer_utility] \ No newline at end of file diff --git a/composer/cicd_sample/utils/deploy_dags_from_diff.py b/composer/cicd_sample/utils/deploy_dags_from_diff.py new file mode 100644 index 00000000000..e0b47a94a33 --- /dev/null +++ b/composer/cicd_sample/utils/deploy_dags_from_diff.py @@ -0,0 +1,98 @@ +# Copyright 2023 Google LLC + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# https://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# [START composer_cicd_deploy_dags_from_diff_utility] + +import argparse +import glob +import os +import git +from pathlib import Path +from shutil import copytree, ignore_patterns +import tempfile +from typing import List, Tuple + +# Imports the Google Cloud client library +from google.cloud import storage + + +def create_dags_list_from_git_diff(dag_dir: str, repo_root: str, main_branch_name: str) -> List[Path]: + """ + get the list of files within the DAG dir that have changed in the latest git commits against the specified branch + :param dag_dir: + :param repo_root: + :param main_branch_name: + :return: + """ + repo = git.Repo(repo_root) + diff_results = repo.git.diff(main_branch_name) + # print(diff_results) + p = Path(repo_root) + changed_file_list = list() + for diff_result_line in diff_results.split("\n"): + # test if changed + if "+++ b" in diff_result_line and dag_dir in diff_result_line: + changed_path = p / Path(diff_result_line[6:]) + print(changed_path) + changed_file_list.append(changed_path) + # print(changed_path.resolve()) + return changed_file_list + + +def upload_changed_dags_to_composer(dag_list: List[Path], bucket_name: str) -> None: + """ + list of DAGs to upload to Commposer + :param dag_list: + :return: + """ + storage_client = storage.Client() + bucket = storage_client.bucket(bucket_name) + for dag_path in dag_list: + blob = bucket.blob(dag_path.name) + blob.upload_from_filename(str(dag_path)) + print(f"File {dag_path.name} uploaded to {bucket_name}/{dag_path.name}.") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter + ) + parser.add_argument( + "--dags_directory", + help="Relative path to the source directory containing your DAGs", + ) + parser.add_argument( + "--dags_bucket", + help="Name of the DAGs bucket of your Composer environment without the gs:// prefix", + ) + parser.add_argument( + "--dag_repo", + required=False, + help="Relative path to the root of the git repo containing the DAGs", + ) + parser.add_argument( + "--repo_main", + required=False, + help="Main branch of the DAG git repo to compare changes against", + ) + + args = parser.parse_args() + + dag_list = create_dags_list_from_git_diff(args.dags_directory, args.repo_root, args.repo_main) + + if len(dag_list)==0: + print("No DAGs to upload") + else: + upload_changed_dags_to_composer(dag_list=dag_list, bucket_name=args.dags_bucket) +# [END composer_cicd_deploy_dags_from_diff_utility] \ No newline at end of file diff --git a/composer/cicd_sample/utils/deploy_dags_from_diff_test.py b/composer/cicd_sample/utils/deploy_dags_from_diff_test.py new file mode 100644 index 00000000000..1bd6b81006d --- /dev/null +++ b/composer/cicd_sample/utils/deploy_dags_from_diff_test.py @@ -0,0 +1,70 @@ +# Copyright 2023 Google LLC + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# https://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import os +import pathlib +from shutil import copytree +import tempfile +import uuid +import git + +from google.cloud import storage +import pytest + +import deploy_dags_from_diff # noqa: I100 - lint is incorrectly saying this is out of order + +# DAGS_DIR = "cicd_sample/dags" # this needs to be relative to the REPO roo +DAGS_DIR = pathlib.Path(__file__).parent.parent / "dags/" +REPO_ROOT = pathlib.Path(__file__).parent.parent.parent.parent +REPO_MAIN = "main" +TEST_DAG_FILENAME = "example2_dag.py" + + +def test_create_dags_list_with_changes() -> None: + """ + this test checks for + :return: + """ + full_dag_dir_str = "cicd_sample/dags" # str(DAGS_DIR) # .resolve()) + full_repo_root_str = str(REPO_ROOT.resolve()) + print(full_dag_dir_str, full_repo_root_str) + repo = git.Repo(REPO_ROOT) + repo.create_head("test-branch") + repo.git.checkout("test-branch") + with open(f"{DAGS_DIR.resolve()}/{TEST_DAG_FILENAME}", 'a') as f: + # create minor change + f.write("# appended line\n") + + repo.git.commit("-am", "appended line to file") + dag_list = deploy_dags_from_diff.create_dags_list_from_git_diff( + full_dag_dir_str, full_repo_root_str, REPO_MAIN) + print(dag_list) + assert len(dag_list) > 0 + assert "example2_dag.py" in dag_list[0].name + # cleanup - uncomment once test dev is done + # repo.git.checkout(REPO_MAIN) + + +def test_create_dags_list_no_changes() -> None: + """ + this test checks for an empty list based on no changes in git + :return: + """ + full_dag_dir_str = "cicd_sample/dags" # str(DAGS_DIR) # .resolve()) + full_repo_root_str = str(REPO_ROOT.resolve()) + dag_list = deploy_dags_from_diff.create_dags_list_from_git_diff( + full_dag_dir_str, full_repo_root_str, REPO_MAIN) + assert len(dag_list) == 0 +