
Data-aware scheduling produces wrong mappings between dataset updates and downstream DAG runs when multiple updates occur during a downstream DAG run #49805



Open
1 of 2 tasks
zeddit opened this issue Apr 26, 2025 · 1 comment
Labels
area:core kind:bug This is a clearly a bug needs-triage label for new issues that we didn't triage yet priority:medium Bug that should be fixed before next release but would not block a release

Comments

@zeddit

zeddit commented Apr 26, 2025

Apache Airflow version

2.10.5

If "Other Airflow 2 version" selected, which one?

No response

What happened?

I created a demo in which the upstream DAG updates the dataset continuously, while the downstream DAG just sleeps for 30 seconds to simulate a time-consuming task.

However, the mapping between dataset updates and downstream DAG runs looks wrong, as shown below, and some dataset updates are left dangling if I stop the upstream DAG.

These dangling dataset updates are actually processed by the downstream DAG, but the UI shows them as never processed.
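To make "dangling" concrete, here is a minimal, hypothetical checker. It takes the IDs of all emitted dataset events plus, for each downstream run, the event IDs that run is reported to have consumed (for example, as read off the UI), and flags events that no run consumed or that more than one run consumed. The function name, input shape, and run IDs are illustrative assumptions, not Airflow APIs.

```python
from collections import Counter
from typing import Dict, Iterable, List, Set, Tuple


def check_event_mapping(
    all_event_ids: Iterable[int],
    consumed_by_run: Dict[str, List[int]],
) -> Tuple[Set[int], Set[int]]:
    """Return (dangling, duplicated) event IDs.

    dangling:   events that no downstream run consumed.
    duplicated: events that more than one downstream run consumed.
    """
    # Count how many runs report consuming each event.
    counts = Counter(
        event_id for events in consumed_by_run.values() for event_id in events
    )
    # Counter returns 0 for missing keys, so unconsumed events count as 0.
    dangling = {e for e in all_event_ids if counts[e] == 0}
    duplicated = {e for e, n in counts.items() if n > 1}
    return dangling, duplicated


# Hypothetical observation: events 1-6 were emitted, but only 1-4 were
# attributed to a downstream run after the upstream DAG was stopped.
dangling, duplicated = check_event_mapping(
    range(1, 7), {"run_a": [1, 2], "run_b": [3, 4]}
)
print(sorted(dangling), sorted(duplicated))  # [5, 6] []
```

A correct mapping should return two empty sets; the symptom described above corresponds to a non-empty `dangling` set even though the downstream DAG did run after those updates.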

[Screenshots]

What you think should happen instead?

[Screenshot]
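As a sketch of the expected behavior, the toy model below (not Airflow's scheduler code) assumes the upstream DAG emits one dataset event every `emit_every` seconds, the downstream DAG has `max_active_runs=1`, and each new downstream run consumes every event emitted since the previous run was created. Under those assumptions, each run gets a contiguous batch and every update maps to exactly one run, with none left dangling. The function and parameter names are hypothetical.

```python
from typing import List


def simulate_expected_mapping(
    emit_every: int, stop_at: int, run_duration: int
) -> List[List[int]]:
    """Return, per downstream run, the timestamps of the events it consumes.

    Assumed model: events are emitted at t = 0, emit_every, ... while
    t < stop_at; a downstream run starts only when no other run is active
    and at least one unconsumed event exists; it consumes all unconsumed
    events emitted up to its start time and then runs for run_duration s.
    """
    events = list(range(0, stop_at, emit_every))
    runs: List[List[int]] = []
    next_event = 0  # index of the first event not yet consumed
    t = 0           # current time in seconds
    while next_event < len(events):
        # Wait until at least one unconsumed event exists.
        t = max(t, events[next_event])
        # The new run consumes every unconsumed event emitted up to time t.
        batch = [e for e in events[next_event:] if e <= t]
        runs.append(batch)
        next_event += len(batch)
        # max_active_runs=1: the next run cannot start until this one ends.
        t += run_duration
    return runs


# Upstream emits every 10 s and is stopped at 70 s; downstream runs take 30 s.
print(simulate_expected_mapping(10, 70, 30))
# [[0], [10, 20, 30], [40, 50, 60]]
```

The last run picks up the final events emitted before the upstream DAG was stopped, which is why, in this model, stopping the upstream DAG never leaves an update unassigned.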

How to reproduce

from datetime import datetime

from airflow import DAG
from airflow.datasets import Dataset
from airflow.operators.bash import BashOperator

# Dataset updated by the upstream DAG and consumed by the downstream DAG.
dataset1 = Dataset("s3://folder1/dataset_2.txt")

# Upstream DAG: runs back-to-back ("@continuous") and updates dataset1 on every run.
with DAG(
    dag_id="upstream_dag_A",
    start_date=datetime(2023, 1, 1),
    catchup=False,
    schedule="@continuous",
    max_active_runs=1,
) as upstream_dag:
    start_task = BashOperator(
        task_id="start_task",
        bash_command="echo 'Start task'",
        outlets=[dataset1],
    )

# Downstream DAG: triggered by dataset1 updates. Each run sleeps 30 s to
# simulate a time-consuming task, so several updates arrive while it runs.
with DAG(
    dag_id="downstream-dataset-dag",
    start_date=datetime(2023, 1, 1),
    schedule=[dataset1],
    catchup=False,
    max_active_runs=1,
) as downstream_dag:
    start_task = BashOperator(
        task_id="start_task",
        # $message is not set anywhere, so it expands to an empty string.
        bash_command='sleep 30 && echo "Upstream message: $message"',
    )

Operating System

Ubuntu

Versions of Apache Airflow Providers

No response

Deployment

Other 3rd-party Helm chart

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@zeddit zeddit added area:core kind:bug This is a clearly a bug needs-triage label for new issues that we didn't triage yet labels Apr 26, 2025

boring-cyborg bot commented Apr 26, 2025

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

@dosubot dosubot bot added the priority:medium Bug that should be fixed before next release but would not block a release label Apr 26, 2025

1 participant