Remove airflow.models reference in asset decorator #46927

Open
1 task done
uranusjr opened this issue Feb 20, 2025 · 1 comment
Labels
affected_version:3.0.0alpha For all 3.0.0 alpha releases area:datasets Issues related to the datasets feature kind:meta High-level information important to the community

Comments

@uranusjr
Member

Body

def create_dag(self, *, dag_id: str) -> DAG:
    from airflow.models.dag import DAG  # TODO: Use the SDK DAG when it works.

    return DAG(
        dag_id=dag_id,
        schedule=self.schedule,
        is_paused_upon_creation=self.is_paused_upon_creation,
        catchup=False,
        dag_display_name=self.display_name or dag_id,
        description=self.description,
        params=self.params,
        on_success_callback=self.on_success_callback,
        on_failure_callback=self.on_failure_callback,
        auto_register=True,
    )

Currently I get an error if this is switched to use the DAG from the SDK:

airflow/dag_processing/collection.py:337: in update_dag_parsing_results_in_db
    DAG.bulk_write_to_db(bundle_name, bundle_version, dags, session=session)
airflow/utils/session.py:98: in wrapper
    return func(*args, **kwargs)
airflow/models/dag.py:1879: in bulk_write_to_db
    dag_op.update_dags(orm_dags, session=session)
airflow/dag_processing/collection.py:430: in update_dags
    dm.default_view = dag.default_view or conf.get("webserver", "dag_default_view").lower()
AttributeError: 'DAG' object has no attribute 'default_view'

The default_view concept should go away at some point (since the old UI itself is on its way out), but I’m not sure why using the SDK DAG directly does not result in an error. Need to investigate and find a solution.
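For illustration only, here is a minimal sketch of the failure mode in the traceback and one defensive way to read the attribute. Nothing here is Airflow code: `CoreDagStandIn`, `SdkDagStandIn`, and `resolve_default_view` are hypothetical names, and the `configured_default` argument stands in for the value that `conf.get("webserver", "dag_default_view")` would return. It is not the fix chosen for this issue, just a way to see why a plain attribute access breaks when one DAG class lacks `default_view`.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class CoreDagStandIn:
    """Hypothetical stand-in for a DAG class that still carries default_view."""

    dag_id: str
    default_view: Optional[str] = None


@dataclass
class SdkDagStandIn:
    """Hypothetical stand-in for a DAG class with no default_view attribute."""

    dag_id: str


def resolve_default_view(dag: object, configured_default: str) -> str:
    # `dag.default_view` raises AttributeError when the attribute is missing,
    # which is what the traceback shows. getattr with a default returns None
    # instead, so the configured fallback can be used.
    view = getattr(dag, "default_view", None)
    return view or configured_default.lower()
```

With these stand-ins, `resolve_default_view(SdkDagStandIn("example"), "Grid")` falls back to the configured value instead of raising, while an object that sets `default_view` keeps its own value.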

Committer

  • I acknowledge that I am a maintainer/committer of the Apache Airflow project.
@uranusjr uranusjr added the kind:meta High-level information important to the community label Feb 20, 2025
@uranusjr uranusjr self-assigned this Feb 20, 2025
@dosubot dosubot bot added the area:datasets Issues related to the datasets feature label Feb 20, 2025
@uranusjr
Member Author

Turns out core tests cannot correctly load SDK DAGs yet. Let’s wait a bit…

def _bootstrap_dagbag():
    from airflow.models.dag import DAG
    from airflow.models.dagbag import DagBag

    if AIRFLOW_V_3_0_PLUS:
        from airflow.dag_processing.bundles.manager import DagBundlesManager

    with create_session() as session:
        if AIRFLOW_V_3_0_PLUS:
            DagBundlesManager().sync_bundles_to_db(session=session)
            session.commit()

        dagbag = DagBag()
        # Save DAGs in the ORM
        if AIRFLOW_V_3_0_PLUS:
            dagbag.sync_to_db(bundle_name="dags-folder", bundle_version=None, session=session)
        else:
            dagbag.sync_to_db(session=session)

        # Deactivate the unknown ones
        DAG.deactivate_unknown_dags(dagbag.dags.keys(), session=session)

DagBag needs to be able to correctly handle DAG from SDK.

@vikramkoka vikramkoka added the affected_version:3.0.0alpha For all 3.0.0 alpha releases label Feb 21, 2025