Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix trigger dagrun deferrable mode to work with TaskSDK #48596

Open
wants to merge 10 commits into
base: main
Choose a base branch
from

Conversation

gopidesupavan
Copy link
Member

@gopidesupavan gopidesupavan commented Mar 31, 2025

closes: #47949

Adding new endpoint to get the counts for dag by run-ids and states.

Why

Airflow 3 doesn't support the direct DB access from tasks, currently TriggerDagRunOperator deferrable mode uses DagStateTrigger which access db to get the count of dag run records based on run_id's, states, logical_dates.

What

Adding /dag_runs/dag_id/count point to fetch the count of dag run records based on run_id's, states, logical_dates while running in triggerer.

image
  • Add Tests

^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.

@gopidesupavan
Copy link
Member Author

Tests needs to be added.

@@ -583,3 +584,26 @@ def context_get_outlet_events(context: Context) -> OutletEventAccessorsProtocol:
except KeyError:
outlet_events = context["outlet_events"] = OutletEventAccessors()
return outlet_events


def _get_dag_run_count_by_run_ids_and_states(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure this is the correct place to keep this method. this method i am using inside trigger dagrun run function to get the counts.

@gopidesupavan gopidesupavan force-pushed the fix-trigger-dagrun-operator branch from 01c2a82 to d3ad3ce Compare April 1, 2025 00:20
@gopidesupavan gopidesupavan marked this pull request as ready for review April 1, 2025 00:24
@gopidesupavan gopidesupavan force-pushed the fix-trigger-dagrun-operator branch from 9b164e9 to b4acf5c Compare April 1, 2025 08:52
@gopidesupavan gopidesupavan force-pushed the fix-trigger-dagrun-operator branch from bad8ae4 to c0260d2 Compare April 1, 2025 18:00
@gopidesupavan
Copy link
Member Author

The test i added for dagrun in trigger its failing sometimes, it looks some flaky, i have updated test with xfile similar to test_trigger_can_access_variables_connections_and_xcoms. have tested it locally working fine.

image

@gopidesupavan gopidesupavan force-pushed the fix-trigger-dagrun-operator branch from f1bf9ae to 556ac60 Compare April 2, 2025 18:28
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support deferral mode for TriggerDagRunOperator with Task SDK
2 participants