Commit 48731b9

Add DAGs to inspect moved tables after airflow upgrade (GoogleCloudPlatform#11032)
1 parent 8c445f8 commit 48731b9

3 files changed: +117 -0 lines changed
New DAG file (imported by the test below as airflow_moved_tables)

Lines changed: 88 additions & 0 deletions
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START composer_check_moved_tables]
"""
When upgrading Airflow to a newer version,
it might happen that some data cannot be migrated,
often because of constraint changes in the metadata database.
This file contains 2 DAGs:

1. 'list_moved_tables_after_upgrade_dag'
   Prints the rows which failed to be migrated.
2. 'rename_moved_tables_after_upgrade_dag'
   Renames the table which contains the failed migrations. This will remove the
   warning message from Airflow.
"""

import datetime
import logging

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.settings import AIRFLOW_MOVED_TABLE_PREFIX


def get_moved_tables():
    hook = PostgresHook(postgres_conn_id="airflow_db")
    return hook.get_records(
        "SELECT schemaname, tablename FROM pg_catalog.pg_tables WHERE tablename"
        f" LIKE '{AIRFLOW_MOVED_TABLE_PREFIX}_%'"
    )


def list_moved_records():
    tables = get_moved_tables()
    if not tables:
        logging.info("No moved tables found")
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        df = hook.get_pandas_df(f"SELECT * FROM {schema}.{table}")
        logging.info(df.to_markdown())


def rename_moved_tables():
    tables = get_moved_tables()
    if not tables:
        return

    hook = PostgresHook(postgres_conn_id="airflow_db")
    for schema, table in tables:
        hook.run(f"ALTER TABLE {schema}.{table} RENAME TO _abandoned_{table}")


with DAG(
    dag_id="list_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
):
    t1 = PythonOperator(
        task_id="list_moved_records", python_callable=list_moved_records
    )


with DAG(
    dag_id="rename_moved_tables_after_upgrade_dag",
    start_date=datetime.datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    t1 = PythonOperator(
        task_id="rename_moved_tables", python_callable=rename_moved_tables
    )

# [END composer_check_moved_tables]
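Both DAGs are defined with schedule_interval=None, so they never run on a schedule and must be triggered manually, for example from the Airflow UI or with `airflow dags trigger`. As an illustration only, here is a minimal sketch of calling the same helper functions directly from a Python console on an Airflow worker; the module name airflow_moved_tables is an assumption, and it presumes the "airflow_db" connection points at the environment's Postgres metadata database:

# Minimal sketch (not part of the commit): exercise the helpers above outside
# of a DAG run. Assumes the DAG file is importable as "airflow_moved_tables"
# and that an "airflow_db" Postgres connection is configured.
from airflow_moved_tables import get_moved_tables, list_moved_records

for schema, table in get_moved_tables():
    print(f"Found moved table: {schema}.{table}")

list_moved_records()  # logs each moved table's contents as a markdown table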
New DAG import test

Lines changed: 28 additions & 0 deletions
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import internal_unit_testing


def test_dag_import(airflow_database):
    """Test that the DAG file can be successfully imported.

    This tests that the DAG can be parsed, but does not run it in an Airflow
    environment. This is a recommended confidence check by the official Airflow
    docs: https://airflow.incubator.apache.org/tutorial.html#testing
    """
    from . import airflow_moved_tables as module

    internal_unit_testing.assert_has_valid_dag(module)
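internal_unit_testing appears to be a repo-local test helper; broadly, assert_has_valid_dag imports the module and fails if it does not define a valid DAG. For readers outside this repository, a rough, hedged equivalent can be written with Airflow's stock DagBag (the dag_folder path and test name below are assumptions):

# Rough equivalent of the import test above, using airflow.models.DagBag.
# Assumes the DAG file from this commit sits in the current directory.
from airflow.models import DagBag


def test_dags_import_without_errors():
    dag_bag = DagBag(dag_folder=".", include_examples=False)
    # Any syntax or import failure in a DAG file is collected here.
    assert not dag_bag.import_errors
    # The renaming DAG is assigned at module level, so DagBag should find it.
    assert "rename_moved_tables_after_upgrade_dag" in dag_bag.dags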

composer/workflows/requirements.txt

Lines changed: 1 addition & 0 deletions
@@ -6,6 +6,7 @@ apache-airflow-providers-apache-beam==5.1.1
 apache-airflow-providers-cncf-kubernetes==7.1.0
 apache-airflow-providers-google==10.2.0
 apache-airflow-providers-microsoft-azure==6.1.2
+apache-airflow-providers-postgres==5.5.1
 google-cloud-dataform==0.5.2 # used in Dataform operators
 scipy==1.10.0; python_version > '3.0'
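The new pin backs the PostgresHook import in the DAG above. As a small, hedged sanity check (not part of the commit), one could confirm the provider is installed in the environment that parses the DAG; the distribution name and expected version are taken from the requirements line:

# Minimal sketch: confirm the postgres provider added in requirements.txt is
# importable and matches the pinned version.
from importlib.metadata import version

from airflow.providers.postgres.hooks.postgres import PostgresHook  # noqa: F401

print(version("apache-airflow-providers-postgres"))  # expect "5.5.1"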
