Skip to content

Commit

Permalink
improve(xcom): Validate XCom key for set_xcom and get_xcom endpoints …
Browse files Browse the repository at this point in the history
…using pydantic
  • Loading branch information
kiran2706 committed Feb 20, 2025
1 parent 571f268 commit 3ce170d
Showing 1 changed file with 3 additions and 26 deletions.
29 changes: 3 additions & 26 deletions airflow/api_fastapi/execution_api/routes/xcoms.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from typing import Annotated

from fastapi import Body, Depends, HTTPException, Query, Response, status
from pydantic import JsonValue
from pydantic import JsonValue, StringConstraints
from sqlalchemy.sql.selectable import Select

from airflow.api_fastapi.common.db.common import SessionDep
Expand Down Expand Up @@ -119,23 +119,11 @@ def get_xcom(
dag_id: str,
run_id: str,
task_id: str,
key: str,
key: Annotated[str, StringConstraints(min_length=1)],
xcom_query: Annotated[Select, Depends(xcom_query)],
map_index: Annotated[int, Query()] = -1,
) -> XComResponse:
"""Get an Airflow XCom from database - not other XCom Backends."""

# Validate that the provided key is not empty
# An empty key is not a valid XCom identifier and would lead to unintended queries
if not key:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail={
"reason": "invalid_key",
"message": "XCom key must be a non-empty string.",
},
)

# The xcom_query allows no map_index to be passed. This endpoint should always return just a single item,
# so we override that query value
xcom_query = xcom_query.filter(BaseXCom.map_index == map_index)
Expand Down Expand Up @@ -170,7 +158,7 @@ def set_xcom(
dag_id: str,
run_id: str,
task_id: str,
key: str,
key: Annotated[str, StringConstraints(min_length=1)],
value: Annotated[
JsonValue,
Body(
Expand Down Expand Up @@ -201,17 +189,6 @@ def set_xcom(
"""Set an Airflow XCom."""
from airflow.configuration import conf

# Validate that the provided key is not empty
# XCom keys must be non-empty strings to ensure proper data retrieval and avoid ambiguity.
if not key:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail={
"reason": "invalid_key",
"message": "XCom key must be a non-empty string.",
},
)

if not has_xcom_access(dag_id, run_id, task_id, key, token, write=True):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
Expand Down

0 comments on commit 3ce170d

Please sign in to comment.