XCom pull is failing when key is None #46417

Closed
vatsrahul1001 opened this issue Feb 4, 2025 · 7 comments · May be fixed by #46929
Labels: affected_version:3.0.0alpha, area:core, kind:bug


@vatsrahul1001
Collaborator

Apache Airflow version

3.0.0a1

What happened?

pulled_value_1 = ti.xcom_pull(key=None, task_ids="push") is failing in AF3, whereas the same code works in 2.10.4. When I provide the key explicitly, it works fine. I am raising this issue because the behaviour deviates from AF2 here. Maybe we can handle this in the task-sdk.

Screenshots (images not preserved): run on 3.0.0a1; run on 2.10.4.

{"timestamp":"2025-02-04T11:01:20.886364","level":"error","event":"Task failed with exception","logger":"task","error_detail":[{"exc_type":"ValidationError","exc_value":"1 validation error for GetXCom\nkey\n Input should be a valid string [type=string_type, input_value=None, input_type=NoneType]\n For further information visit https://errors.pydantic.dev/2.10/v/string_type","syntax_error":null,"is_cause":false,"frames":[{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":545,"name":"run"},{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":645,"name":"_execute_task"},{"filename":"/opt/airflow/airflow/models/baseoperator.py","lineno":173,"name":"wrapper"},{"filename":"/opt/airflow/providers/standard/src/airflow/providers/standard/operators/python.py","lineno":196,"name":"execute"},{"filename":"/opt/airflow/providers/standard/src/airflow/providers/standard/operators/python.py","lineno":222,"name":"execute_callable"},{"filename":"/opt/airflow/airflow/utils/operator_helpers.py","lineno":261,"name":"run"},{"filename":"/files/dags/example_xcom.py","lineno":34,"name":"puller"},{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":266,"name":"xcom_pull"},{"filename":"/usr/local/lib/python3.9/site-packages/pydantic/main.py","lineno":214,"name":"__init__"}]}]}
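The traceback shows that xcom_pull builds a GetXCom request model (a Pydantic model) whose key field must be a string, so key=None is rejected while the request object is being constructed. Below is a minimal stdlib sketch of just that one constraint; GetXComSketch, its task_id field, and raising TypeError in place of Pydantic's ValidationError are assumptions for illustration, not the task-sdk's actual code.

```python
from dataclasses import dataclass


@dataclass
class GetXComSketch:
    """Hypothetical stand-in for the task-sdk's GetXCom request model.

    The real model is a Pydantic model; this sketch mimics only the
    constraint seen in the traceback: `key` must be a string.
    """

    key: str
    task_id: str  # assumed field, for illustration only

    def __post_init__(self) -> None:
        # Pydantic would raise ValidationError (string_type); we raise TypeError.
        if not isinstance(self.key, str):
            raise TypeError(
                f"key: Input should be a valid string, got {type(self.key).__name__}"
            )


# An explicit key builds a valid request, matching "when I provide the key it works".
ok = GetXComSketch(key="value from pusher 1", task_id="push")

# key=None is rejected before any lookup is attempted.
try:
    GetXComSketch(key=None, task_id="push")
except TypeError as exc:
    print(exc)  # key: Input should be a valid string, got NoneType
```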

What you think should happen instead?

The same code that works in AF2 should work in AF3 as well.

How to reproduce

Use the DAG below to reproduce:

"""Example DAG demonstrating the usage of XComs."""
from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator
from datetime import datetime, timedelta

dag = DAG(
    "example_xcom",
    start_date=datetime(2023, 11, 28),
    default_args={"owner": "airflow"},
    schedule="@daily",
    catchup=False,
    tags=["core"],
)

value_1 = [1, 2, 3]
value_2 = {"a": "b"}


def push(**kwargs):
    """Pushes an XCom without a specific target"""
    kwargs["ti"].xcom_push(key="value from pusher 1", value=value_1)


def push_by_returning(**kwargs):
    """Pushes an XCom without a specific target, just by returning it"""
    return value_2


def puller(**kwargs):
    """Pull all previously pushed XComs and check if the pushed values match the pulled values."""
    ti = kwargs["ti"]

    # get value_1
    pulled_value_1 = ti.xcom_pull(key=None, task_ids="push")
    if pulled_value_1 != value_1:
        raise ValueError(f"The two values differ {pulled_value_1} and {value_1}")

    # get value_2
    pulled_value_2 = ti.xcom_pull(task_ids="push_by_returning")
    if pulled_value_2 != value_2:
        raise ValueError(f"The two values differ {pulled_value_2} and {value_2}")

    # get both value_1 and value_2
    pulled_value_1, pulled_value_2 = ti.xcom_pull(
        key=None, task_ids=["push", "push_by_returning"]
    )
    print(f"pulled_value_1 is {pulled_value_1}")
    print(f"pulled_value_2 is {pulled_value_2}")
    if pulled_value_1 != value_1:
        raise ValueError(f"The two values differ {pulled_value_1} and {value_1}")
    if pulled_value_2 != value_2:
        raise ValueError(f"The two values differ {pulled_value_2} and {value_2}")


push1 = PythonOperator(
    task_id="push",
    dag=dag,
    python_callable=push,
    depends_on_past=True,
)

push2 = PythonOperator(
    task_id="push_by_returning",
    dag=dag,
    python_callable=push_by_returning,
)

pull = PythonOperator(
    task_id="puller",
    dag=dag,
    python_callable=puller,
)

pull << [push1, push2]
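Since the report notes that pulls with an explicit key work on 3.0.0a1, one workaround is to have puller name the key it wants (or rely on the default return-value key) instead of passing key=None. The sketch below does that and runs it against FakeTI, a hypothetical in-memory stand-in for ti whose default key of "return_value" is assumed to mirror Airflow's; it is not Airflow's TaskInstance.

```python
from typing import Any, Dict, Tuple

value_1 = [1, 2, 3]
value_2 = {"a": "b"}


class FakeTI:
    """Hypothetical in-memory stand-in for `ti`, keyed by (task_id, key)."""

    def __init__(self, store: Dict[Tuple[str, str], Any]) -> None:
        self.store = store

    def xcom_pull(self, task_ids: str, key: str = "return_value") -> Any:
        # Assumed to mirror Airflow: an omitted key means "return_value".
        return self.store.get((task_ids, key))


def puller(**kwargs):
    """Pull both values with explicit (or default) keys instead of key=None."""
    ti = kwargs["ti"]

    # value_1 was pushed under a custom key, so name that key explicitly.
    pulled_value_1 = ti.xcom_pull(task_ids="push", key="value from pusher 1")
    if pulled_value_1 != value_1:
        raise ValueError(f"The two values differ {pulled_value_1} and {value_1}")

    # value_2 was returned by its task, so the default key applies.
    pulled_value_2 = ti.xcom_pull(task_ids="push_by_returning")
    if pulled_value_2 != value_2:
        raise ValueError(f"The two values differ {pulled_value_2} and {value_2}")

    return pulled_value_1, pulled_value_2


fake_ti = FakeTI({
    ("push", "value from pusher 1"): value_1,
    ("push_by_returning", "return_value"): value_2,
})
print(puller(ti=fake_ti))  # ([1, 2, 3], {'a': 'b'})
```

The original combined pull (task_ids=["push", "push_by_returning"] with key=None) is split into two single pulls here, because the two values live under different keys.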

Operating System

Linux

Deployment

Other

Are you willing to submit PR?

  • Yes I am willing to submit a PR!


vatsrahul1001 added the area:core, kind:bug, and needs-triage labels on Feb 4, 2025
vatsrahul1001 added the affected_version:3.0.0alpha label and removed the needs-triage label on Feb 4, 2025
@amoghrajesh
Contributor

Nice observation. Will pick it up soon

@ashb
Member

ashb commented Feb 7, 2025

I'd almost argue that explicitly passing key=None is a bug in the user code. If you don't want to specify a key, don't pass the argument?

(Because passing None as an argument is different to not passing the argument.)
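To illustrate the point: the key parameter of xcom_pull has a default (assumed here to be "return_value", Airflow's XCom return-value key), so omitting the argument and passing None produce different calls. xcom_pull_sketch is a hypothetical function that only reports which key it would query.

```python
from typing import Optional, Tuple

XCOM_RETURN_KEY = "return_value"  # assumed default key, used for a task's return value


def xcom_pull_sketch(
    task_ids: str, key: Optional[str] = XCOM_RETURN_KEY
) -> Tuple[str, Optional[str]]:
    """Hypothetical pull helper: returns the (task_id, key) pair it would query."""
    return (task_ids, key)


# Omitting `key` falls back to the default, so the pull targets the return value.
print(xcom_pull_sketch(task_ids="push_by_returning"))  # ('push_by_returning', 'return_value')

# Passing key=None replaces the default with None; nothing restores it.
print(xcom_pull_sketch(task_ids="push", key=None))  # ('push', None)
```

When a function's default is itself None and it still needs to tell "omitted" apart from an explicit None, a module-level sentinel object (for example NOTSET = object()) is the usual Python idiom.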

ashb changed the title from "Xcom pull is failing when is key is not provided" to "Xcom pull is failing when is key is None" on Feb 7, 2025
@amoghrajesh
Contributor

Yeah I agree with you too @ashb.

But the question is: to keep the behaviour similar to before, we now have two choices:

  1. Do we allow users to continue providing "None" as the key?
  2. Or do we suggest not passing a key argument at all if they aren't interested in retrieving by key?

(1) would probably lead to a breaking change. We could probably handle that on the server, so that users can pass "None" if they like?

@amoghrajesh
Contributor

@vatsrahul1001 @ashb do you have any inputs on the above comment?

@vatsrahul1001
Collaborator Author

I agree with @ashb on this; passing None as a key should be very rare. However, I raised this bug to highlight AF3's deviation from AF2. I’m also okay with the current approach.

@uranusjr
Member

I think we should disallow both None and an empty string; they are not useful in practice (you just get a nondeterministic XCom associated with the task).
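A simplified model of why a key-less pull is ambiguous: with no key filter, if a task has pushed more than one XCom, the value you get back depends on how the matching rows happen to be ordered. get_one_sketch and the second "other" XCom are hypothetical, and plain list order stands in for whatever ordering the real database query uses.

```python
from typing import Any, List, Optional, Tuple

# Hypothetical in-memory XCom rows for one task instance: (key, value).
Row = Tuple[str, Any]


def get_one_sketch(rows: List[Row], key: Optional[str]) -> Any:
    """Return the first row matching `key`; key=None applies no key filter."""
    for row_key, value in rows:
        if key is None or row_key == key:
            return value
    return None


# The same two XComs, stored in a different order.
rows_a = [("value from pusher 1", [1, 2, 3]), ("other", "x")]
rows_b = [("other", "x"), ("value from pusher 1", [1, 2, 3])]

print(get_one_sketch(rows_a, None))  # [1, 2, 3]
print(get_one_sketch(rows_b, None))  # x  (same data, different answer)
print(get_one_sketch(rows_b, "value from pusher 1"))  # [1, 2, 3]
```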

We should also add a fix to 2.x for this, likely in get_one.
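A minimal sketch of the proposed guard, of the kind that could sit at the start of a 2.x get_one; validate_xcom_key is a hypothetical name, and this is not the change made in the linked PR.

```python
from typing import Optional


def validate_xcom_key(key: Optional[str]) -> str:
    """Hypothetical guard: reject None and "" before any XCom query runs."""
    if key is None or key == "":
        raise ValueError(f"XCom key must be a non-empty string, got {key!r}")
    return key


print(validate_xcom_key("return_value"))  # return_value

for bad_key in (None, ""):
    try:
        validate_xcom_key(bad_key)
    except ValueError as exc:
        # Prints "... got None", then "... got ''"
        print(exc)
```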

@amoghrajesh
Contributor

Closing this one in favour of "Disallow using an empty key in XCom pull".
