Description
Apache Airflow version
2.10.5
If "Other Airflow 2 version" selected, which one?
No response
What happened?
I created a demo setup in which the upstream DAG updates the dataset continuously, while the downstream DAG just sleeps for 30s to simulate a time-consuming task.
However, the mapping between dataset updates and downstream DAG runs looks weird (shown below), and some dataset updates are left dangling if I stop the upstream DAG.
The dangling dataset updates are actually processed by the downstream DAG, but the UI shows them as if they would never be processed.


What you think should happen instead?

How to reproduce
from datetime import datetime

from airflow import DAG
from airflow.datasets import Dataset
from airflow.operators.bash import BashOperator

dataset1 = Dataset('s3://folder1/dataset_2.txt')

# Upstream DAG: runs back to back and emits a dataset event on every run.
with DAG(
    dag_id="upstream_dag_A",
    start_date=datetime(2023, 1, 1),
    catchup=False,
    schedule="@continuous",
    max_active_runs=1,
) as dag:
    start_task = BashOperator(
        task_id="start_task",
        bash_command="echo 'Start task'",
        outlets=[dataset1],
    )

# Downstream DAG: triggered by dataset1, sleeps 30s to simulate slow work.
with DAG(
    "downstream-dataset-dag",
    start_date=datetime(2023, 1, 1),
    schedule=[dataset1],
    catchup=False,
    max_active_runs=1,
) as dag:
    start_task = BashOperator(
        task_id="start_task",
        # bash_command="echo 'Start task'",
        bash_command='sleep 30 && echo "Upstream message: $message"',
    )
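To check for the symptom outside the UI, a small helper like the one below may help. It is only a sketch and not part of the original reproduction: it assumes the dataset events have already been fetched as a list of dicts, for example from the stable REST API's dataset events listing (`GET /api/v1/datasets/events`), and that each event carries a `created_dagruns` list as I understand the `DatasetEvent` schema to define. The function name `find_dangling_events` and the sample data are hypothetical.

```python
from typing import Any


def find_dangling_events(events: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Return dataset events that are not linked to any downstream DAG run.

    Assumption: each event dict has a "created_dagruns" list, as in the
    REST API's DatasetEvent schema. A missing or empty list counts as dangling.
    """
    return [event for event in events if not event.get("created_dagruns")]


# Hypothetical sample shaped like the event list in the REST response.
sample_events = [
    {
        "id": 1,
        "source_dag_id": "upstream_dag_A",
        "created_dagruns": [{"dag_id": "downstream-dataset-dag"}],
    },
    {
        "id": 2,
        "source_dag_id": "upstream_dag_A",
        "created_dagruns": [],
    },
]

dangling = find_dangling_events(sample_events)
print([event["id"] for event in dangling])
```

If the events reported as dangling here were in fact consumed by a downstream run, that would suggest the problem lies in how events are associated with runs rather than in the triggering itself.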
Operating System
ubuntu
Versions of Apache Airflow Providers
No response
Deployment
Other 3rd-party Helm chart
Deployment details
No response
Anything else?
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct