VYPR
Unrated severityNVD Advisory· Published Jun 1, 2026

CVE-2026-41084

CVE-2026-41084

Description

Apache Airflow bulk Task Instances API authorization bypass allows an authenticated user to mutate task state in unauthorized DAGs.

AI Insight

LLM-synthesized narrative grounded in this CVE's description and references.

Apache Airflow bulk Task Instances API authorization bypass allows an authenticated user to mutate task state in unauthorized DAGs.

Vulnerability

A bug in Apache Airflow's bulk Task Instances API (PATCH/DELETE /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances) evaluates authorization against the dag_id resolved from the URL path while operating on the dag_id / dag_run_id extracted from request-body entity fields. This mismatch allows an authenticated UI/API user with edit permission on one DAG to mutate Task Instance state in any other DAG by keeping the authorized DAG's ID in the URL path and naming the target DAG's IDs in the request body entities. The issue affects deployments that rely on per-DAG edit-scope to keep Task Instance state isolated between teams. Versions prior to apache-airflow 3.2.2 are affected. [1]

Exploitation

An attacker must be an authenticated user with edit permission on at least one DAG. The attacker crafts a PATCH or DELETE request to the bulk Task Instances API, setting the URL path dag_id to a DAG they have edit permission on, while providing the target DAG's dag_id and dag_run_id in the request body entities. No additional network position or user interaction beyond standard API access is required. [1]

Impact

Successful exploitation allows the attacker to modify or delete Task Instance state (e.g., mark tasks as success or failure, clear task instances) in any DAG, regardless of their actual authorization scope. This bypasses per-DAG RBAC isolation, potentially leading to incorrect workflow execution, data integrity issues, or denial of service for other teams' DAGs. The attacker gains the privilege level of an editor on the target DAG without appropriate authorization. [1]

Mitigation

Upgrade to apache-airflow version 3.2.2 or later, which includes the fix merged in pull request #64288 [1]. The fix addresses the RBAC check to use the DAG ID from the request body rather than the URL path. No workaround is available for unpatched versions; users must upgrade to restore proper authorization isolation. [1]

AI Insight generated on Jun 1, 2026. Synthesized from this CVE's description and the cited reference URLs; citations are validated against the source bundle.

Affected products

1

Patches

2
a1c45b950c9a

Fix bulk task instance rbac bypass (#64288)

https://github.com/apache/airflowGPKApr 4, 2026via nvd-ref
3 files changed · +232 9
  • airflow-core/src/airflow/api_fastapi/core_api/services/public/task_instances.py+26 2 modified
    @@ -18,6 +18,7 @@
     from __future__ import annotations
     
     from collections.abc import Sequence
    +from typing import Literal
     
     import structlog
     from fastapi import HTTPException, Query, status
    @@ -27,6 +28,8 @@
     from sqlalchemy.orm import joinedload
     from sqlalchemy.orm.session import Session
     
    +from airflow.api_fastapi.app import get_auth_manager
    +from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity, DagDetails
     from airflow.api_fastapi.common.dagbag import DagBagDep, get_latest_version_of_dag
     from airflow.api_fastapi.common.db.common import SessionDep
     from airflow.api_fastapi.core_api.datamodels.common import (
    @@ -45,6 +48,7 @@
     from airflow.api_fastapi.core_api.security import GetUserDep
     from airflow.api_fastapi.core_api.services.public.common import BulkService
     from airflow.listeners.listener import get_listener_manager
    +from airflow.models.dag import DagModel
     from airflow.models.taskinstance import TaskInstance as TI
     from airflow.serialization.definitions.dag import SerializedDAG
     from airflow.utils.state import TaskInstanceState
    @@ -201,6 +205,8 @@ def _categorize_entities(
             self,
             entities: Sequence[str | BulkTaskInstanceBody],
             results: BulkActionResponse,
    +        method: Literal["PUT", "DELETE"],
    +        action_name: str,
         ) -> tuple[set[tuple[str, str, str, int]], set[tuple[str, str, str]]]:
             """
             Validate entities and categorize them into specific and all map index update sets.
    @@ -211,6 +217,7 @@ def _categorize_entities(
             """
             specific_map_index_task_keys = set()
             all_map_index_task_keys = set()
    +        dag_authorization_cache: dict[str, bool] = {}
     
             for entity in entities:
                 dag_id, dag_run_id, task_id, map_index = self._extract_task_identifiers(entity)
    @@ -229,6 +236,23 @@ def _categorize_entities(
                     )
                     continue
     
    +            if dag_id not in dag_authorization_cache:
    +                team_name = DagModel.get_team_name(dag_id, session=self.session)
    +                dag_authorization_cache[dag_id] = get_auth_manager().is_authorized_dag(
    +                    method=method,
    +                    access_entity=DagAccessEntity.TASK_INSTANCE,
    +                    details=DagDetails(id=dag_id, team_name=team_name),
    +                    user=self.user,
    +                )
    +            if not dag_authorization_cache[dag_id]:
    +                results.errors.append(
    +                    {
    +                        "error": f"User is not authorized to {action_name} task instances for DAG '{dag_id}'",
    +                        "status_code": status.HTTP_403_FORBIDDEN,
    +                    }
    +                )
    +                continue
    +
                 # Separate logic for "update all" vs "update specific"
                 if map_index is not None:
                     specific_map_index_task_keys.add((dag_id, dag_run_id, task_id, map_index))
    @@ -318,7 +342,7 @@ def handle_bulk_update(
             """Bulk Update Task Instances."""
             # Validate and categorize entities into specific and all map index update sets
             update_specific_map_index_task_keys, update_all_map_index_task_keys = self._categorize_entities(
    -            action.entities, results
    +            action.entities, results, method="PUT", action_name=action.action
             )
     
             try:
    @@ -420,7 +444,7 @@ def handle_bulk_delete(
             """Bulk delete task instances."""
             # Validate and categorize entities into specific and all map index delete sets
             delete_specific_map_index_task_keys, delete_all_map_index_task_keys = self._categorize_entities(
    -            action.entities, results
    +            action.entities, results, method="DELETE", action_name=action.action
             )
     
             try:
    
  • airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py+178 3 modified
    @@ -26,18 +26,22 @@
     
     import pendulum
     import pytest
    +from fastapi.testclient import TestClient
     from sqlalchemy import delete, func, select, update
     
     from airflow._shared.timezones.timezone import datetime
    +from airflow.api_fastapi.auth.managers.simple.user import SimpleAuthManagerUser
     from airflow.dag_processing.bundles.manager import DagBundlesManager
     from airflow.dag_processing.dagbag import DagBag, sync_bag_to_db
     from airflow.jobs.job import Job
     from airflow.jobs.triggerer_job_runner import TriggererJobRunner
    -from airflow.models import DagRun, Log, TaskInstance
    +from airflow.models import DagModel, DagRun, Log, TaskInstance
     from airflow.models.dag_version import DagVersion
    +from airflow.models.dagbundle import DagBundleModel
     from airflow.models.renderedtifields import RenderedTaskInstanceFields as RTIF
     from airflow.models.taskinstancehistory import TaskInstanceHistory
     from airflow.models.taskmap import TaskMap
    +from airflow.models.team import Team
     from airflow.models.trigger import Trigger
     from airflow.providers.standard.operators.empty import EmptyOperator
     from airflow.sdk import BaseOperator, TaskGroup
    @@ -50,6 +54,7 @@
     from tests_common.test_utils.config import conf_vars
     from tests_common.test_utils.db import (
         clear_db_runs,
    +    clear_db_teams,
         clear_rendered_ti_fields,
     )
     from tests_common.test_utils.logs import check_last_log
    @@ -5502,6 +5507,14 @@ class TestBulkTaskInstances(TestTaskInstanceEndpoint):
         BASH_TASK_ID = "also_run_this"
         WILDCARD_ENDPOINT = "/dags/~/dagRuns/~/taskInstances"
     
    +    @pytest.fixture(autouse=True)
    +    def clean_db(self, session):
    +        clear_db_runs()
    +        clear_db_teams()
    +        yield
    +        clear_db_teams()
    +        clear_db_runs()
    +
         @pytest.mark.parametrize(
             ("default_ti", "actions", "expected_results", "endpoint_url", "setup_dags"),
             [
    @@ -6069,10 +6082,24 @@ def test_bulk_task_instances(
         ):
             # Setup task instances
             if setup_dags:
    -            for dag_id in setup_dags:
    +            if setup_dags == [self.BASH_DAG_ID, self.DAG_ID]:
    +                self.create_task_instances(
    +                    session,
    +                    task_instances=[{"task_id": self.BASH_TASK_ID, "state": default_ti[0]["state"]}],
    +                    dag_id=self.BASH_DAG_ID,
    +                    update_extras=True,
    +                )
                     self.create_task_instances(
    -                    session, task_instances=default_ti, dag_id=dag_id, update_extras=True
    +                    session,
    +                    task_instances=[{"task_id": self.TASK_ID, "state": default_ti[1]["state"]}],
    +                    dag_id=self.DAG_ID,
    +                    update_extras=True,
                     )
    +            else:
    +                for dag_id in setup_dags:
    +                    self.create_task_instances(
    +                        session, task_instances=default_ti, dag_id=dag_id, update_extras=True
    +                    )
             else:
                 self.create_task_instances(session, task_instances=default_ti)
     
    @@ -6141,6 +6168,154 @@ def test_bulk_update_mapped_task_instance_state_is_persisted(
                         f"Expected map_index={mi} to remain running, got {ti.state!r}"
                     )
     
    +    def test_bulk_task_instances_rejects_unauthorized_dag_ids_from_request_body(self, test_client, session):
    +        restricted_bundle_name = "restricted-bundle-update"
    +        restricted_team_name = "restricted-team-update"
    +        self.create_task_instances(
    +            session,
    +            task_instances=[{"task_id": self.BASH_TASK_ID, "state": State.RUNNING}],
    +            dag_id=self.BASH_DAG_ID,
    +            update_extras=True,
    +        )
    +        self.create_task_instances(
    +            session,
    +            task_instances=[{"task_id": self.TASK_ID, "state": State.RUNNING}],
    +            dag_id=self.DAG_ID,
    +            update_extras=True,
    +        )
    +        restricted_bundle = DagBundleModel(name=restricted_bundle_name)
    +        restricted_team = Team(name=restricted_team_name)
    +        restricted_bundle.teams.append(restricted_team)
    +        session.add_all([restricted_bundle, restricted_team])
    +        session.flush()
    +        session.execute(
    +            update(DagModel)
    +            .where(DagModel.dag_id == self.BASH_DAG_ID)
    +            .values(bundle_name=restricted_bundle_name)
    +        )
    +        session.commit()
    +
    +        auth_manager = test_client.app.state.auth_manager
    +        token = auth_manager._get_token_signer().generate(
    +            auth_manager.serialize_user(
    +                SimpleAuthManagerUser(username="limited-user", role="user", teams=[]),
    +            )
    +        )
    +        with (
    +            mock.patch("airflow.models.revoked_token.RevokedToken.is_revoked", return_value=False),
    +            TestClient(
    +                test_client.app,
    +                headers={"Authorization": f"Bearer {token}"},
    +                base_url=str(test_client.base_url),
    +            ) as limited_test_client,
    +        ):
    +            response = limited_test_client.patch(
    +                self.WILDCARD_ENDPOINT,
    +                json={
    +                    "actions": [
    +                        {
    +                            "action": "update",
    +                            "entities": [
    +                                {
    +                                    "dag_id": self.BASH_DAG_ID,
    +                                    "dag_run_id": self.RUN_ID,
    +                                    "task_id": self.BASH_TASK_ID,
    +                                    "new_state": "success",
    +                                },
    +                                {
    +                                    "dag_id": self.DAG_ID,
    +                                    "dag_run_id": self.RUN_ID,
    +                                    "task_id": self.TASK_ID,
    +                                    "new_state": "success",
    +                                },
    +                            ],
    +                        }
    +                    ]
    +                },
    +            )
    +
    +        assert response.status_code == 200
    +        assert response.json()["update"]["success"] == [f"{self.DAG_ID}.{self.RUN_ID}.{self.TASK_ID}[-1]"]
    +        assert response.json()["update"]["errors"] == [
    +            {
    +                "error": f"User is not authorized to update task instances for DAG '{self.BASH_DAG_ID}'",
    +                "status_code": 403,
    +            }
    +        ]
    +
    +    def test_bulk_delete_rejects_unauthorized_dag_ids_from_request_body(self, test_client, session):
    +        restricted_bundle_name = "restricted-bundle-delete"
    +        restricted_team_name = "restricted-team-delete"
    +        self.create_task_instances(
    +            session,
    +            task_instances=[{"task_id": self.BASH_TASK_ID, "state": State.SUCCESS}],
    +            dag_id=self.BASH_DAG_ID,
    +            update_extras=True,
    +        )
    +        self.create_task_instances(
    +            session,
    +            task_instances=[{"task_id": self.TASK_ID, "state": State.SUCCESS}],
    +            dag_id=self.DAG_ID,
    +            update_extras=True,
    +        )
    +        restricted_bundle = DagBundleModel(name=restricted_bundle_name)
    +        restricted_team = Team(name=restricted_team_name)
    +        restricted_bundle.teams.append(restricted_team)
    +        session.add_all([restricted_bundle, restricted_team])
    +        session.flush()
    +        session.execute(
    +            update(DagModel)
    +            .where(DagModel.dag_id == self.BASH_DAG_ID)
    +            .values(bundle_name=restricted_bundle_name)
    +        )
    +        session.commit()
    +
    +        auth_manager = test_client.app.state.auth_manager
    +        token = auth_manager._get_token_signer().generate(
    +            auth_manager.serialize_user(
    +                SimpleAuthManagerUser(username="limited-user", role="user", teams=[]),
    +            )
    +        )
    +        with (
    +            mock.patch("airflow.models.revoked_token.RevokedToken.is_revoked", return_value=False),
    +            TestClient(
    +                test_client.app,
    +                headers={"Authorization": f"Bearer {token}"},
    +                base_url=str(test_client.base_url),
    +            ) as limited_test_client,
    +        ):
    +            response = limited_test_client.patch(
    +                self.WILDCARD_ENDPOINT,
    +                json={
    +                    "actions": [
    +                        {
    +                            "action": "delete",
    +                            "entities": [
    +                                {
    +                                    "dag_id": self.BASH_DAG_ID,
    +                                    "dag_run_id": self.RUN_ID,
    +                                    "task_id": self.BASH_TASK_ID,
    +                                },
    +                                {
    +                                    "dag_id": self.DAG_ID,
    +                                    "dag_run_id": self.RUN_ID,
    +                                    "task_id": self.TASK_ID,
    +                                },
    +                            ],
    +                        }
    +                    ]
    +                },
    +            )
    +
    +        assert response.status_code == 200
    +        assert response.json()["delete"]["success"] == [f"{self.DAG_ID}.{self.RUN_ID}.{self.TASK_ID}[-1]"]
    +        assert response.json()["delete"]["errors"] == [
    +            {
    +                "error": f"User is not authorized to delete task instances for DAG '{self.BASH_DAG_ID}'",
    +                "status_code": 403,
    +            }
    +        ]
    +
         def test_should_respond_401(self, unauthenticated_test_client):
             response = unauthenticated_test_client.patch(self.ENDPOINT_URL, json={})
             assert response.status_code == 401
    
  • airflow-core/tests/unit/api_fastapi/core_api/services/public/test_task_instances.py+28 4 modified
    @@ -17,11 +17,15 @@
     
     from __future__ import annotations
     
    +from unittest import mock
    +
     import pytest
     
    +from airflow.api_fastapi.auth.managers.base_auth_manager import BaseAuthManager
     from airflow.api_fastapi.core_api.datamodels.common import BulkActionResponse, BulkBody
     from airflow.api_fastapi.core_api.datamodels.task_instances import BulkTaskInstanceBody
     from airflow.api_fastapi.core_api.services.public.task_instances import BulkTaskInstanceService
    +from airflow.models import DagModel
     from airflow.providers.standard.operators.bash import BashOperator
     
     from tests_common.test_utils.db import (
    @@ -53,6 +57,10 @@ def teardown_method(self):
             self.clear_db()
     
         class MockUser:
    +        username = "test_user"
    +        role = "admin"
    +        teams = ["team1"]
    +
             def get_id(self) -> str:
                 return "test_user"
     
    @@ -184,6 +192,10 @@ def teardown_method(self):
             self.clear_db()
     
         class MockUser:
    +        username = "test_user"
    +        role = "admin"
    +        teams = ["team1"]
    +
             def get_id(self) -> str:
                 return "test_user"
     
    @@ -260,6 +272,10 @@ def teardown_method(self):
             self.clear_db()
     
         class MockUser:
    +        username = "test_user"
    +        role = "admin"
    +        teams = ["team1"]
    +
             def get_id(self) -> str:
                 return "test_user"
     
    @@ -380,7 +396,6 @@ def test_categorize_entities(
             expected_error_count,
         ):
             """Test _categorize_entities with different entity configurations and wildcard validation."""
    -
             user = self.MockUser()
             bulk_request = BulkBody(actions=[])
             service = BulkTaskInstanceService(
    @@ -393,9 +408,18 @@ def test_categorize_entities(
             )
     
             results = BulkActionResponse()
    -        specific_map_index_task_keys, all_map_index_task_keys = service._categorize_entities(
    -            entities, results
    -        )
    +        with (
    +            mock.patch.object(DagModel, "get_team_name", return_value="team1"),
    +            mock.patch(
    +                "airflow.api_fastapi.core_api.services.public.task_instances.get_auth_manager"
    +            ) as mock_get_auth_manager,
    +        ):
    +            auth_manager = mock.create_autospec(BaseAuthManager, instance=True, spec_set=True)
    +            auth_manager.is_authorized_dag.return_value = True
    +            mock_get_auth_manager.return_value = auth_manager
    +            specific_map_index_task_keys, all_map_index_task_keys = service._categorize_entities(
    +                entities, results, method="PUT", action_name="update"
    +            )
     
             assert specific_map_index_task_keys == expected_specific_keys
             assert all_map_index_task_keys == expected_all_keys
    
cde4885818be

Updating release notes for 3.2.2rc3

https://github.com/apache/airflowvatsrahul1001May 26, 2026Fixed in 3.2.2via release-tag
2 files changed · +5 4
  • RELEASE_NOTES.rst+3 2 modified
    @@ -24,7 +24,7 @@
     
     .. towncrier release notes start
     
    -Airflow 3.2.2 (2026-05-27)
    +Airflow 3.2.2 (2026-05-29)
     --------------------------
     
     Significant Changes
    @@ -81,7 +81,8 @@ Significant Changes
     
     Bug Fixes
     ^^^^^^^^^
    -
    +- Fix ``Callback.handle_event`` triggerer crash when OpenTelemetry metrics receive dict typed tag values (#67527) (#67529)
    +- UI: Rewrite ``modulepreload hrefs`` to the api-server static path (#67548) (#67556)
     - Correctly pre-allocate ``external_executor_id`` with multiple executors on PostgreSQL (#67388) (#67458)
     - Return raw import-error stacktrace when a Dag file has no registered Dag (#67465) (#67478)
     - UI: Fix Expand/Collapse All on XComs and Audit Log JSON cells (#67316) (#67361)
    
  • reproducible_build.yaml+2 2 modified
    @@ -1,2 +1,2 @@
    -release-notes-hash: 6407b48d1054fe3ce68c09bf4435d91d
    -source-date-epoch: 1779745327
    +release-notes-hash: 504288db9a9dc13a0db859232fab98d0
    +source-date-epoch: 1779811737
    

Vulnerability mechanics

Root cause

"Authorization is evaluated against the dag_id from the URL path while the operation acts on dag_id values from the request body, allowing a user to operate on unauthorized Dags."

Attack vector

An authenticated UI/API user who has edit permission on one Dag can mutate Task Instance state in any other Dag. The attacker sends a request to the bulk Task Instances endpoint using their authorized Dag's ID in the URL path, while placing the target Dag's ID in the request-body entity fields. The server authorizes the request based on the URL-path Dag ID but then applies the operation to the Dag IDs from the body, bypassing per-Dag authorization checks [CWE-639].

Affected code

The vulnerability resides in the `_categorize_entities` method of `airflow/api_fastapi/core_api/services/public/task_instances.py`. The bulk Task Instances API endpoints (`PATCH/DELETE /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances`) evaluated authorization against the `dag_id` from the URL path but operated on `dag_id`/`dag_run_id` values extracted from the request-body entities, creating an authorization bypass [patch_id=4186244].

What the fix does

The patch adds per-entity authorization checks inside `_categorize_entities`. For each entity extracted from the request body, the code now looks up the team name via `DagModel.get_team_name(dag_id)` and calls `get_auth_manager().is_authorized_dag()` with the entity's `dag_id`. If authorization fails, the entity is rejected with a 403 error and processing continues for the remaining entities. A `dag_authorization_cache` dictionary avoids redundant lookups for repeated Dag IDs [patch_id=4186244].

Preconditions

  • authThe attacker must be an authenticated user with edit permission on at least one Dag.
  • configThe deployment must rely on per-Dag edit-scope isolation between teams.
  • inputThe attacker must know the target Dag's ID and Dag Run ID to place in the request body.

Generated on Jun 1, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.

References

2

News mentions

0

No linked articles in our index yet.