CVE-2026-49298
Description
Apache Airflow KubernetesExecutor exposes JWT tokens in pod spec, allowing read-only Kubernetes users to escalate to Execution API admin actions.
AI Insight
LLM-synthesized narrative grounded in this CVE's description and references.
Apache Airflow KubernetesExecutor exposes JWT tokens in pod spec, allowing read-only Kubernetes users to escalate to Execution API admin actions.
Vulnerability
A bug in Apache Airflow's KubernetesExecutor causes JWT tokens used by worker pods to authenticate against the Execution API to be passed as command-line arguments in the pod spec, making them visible in kubectl describe pod output. This affects all deployments using the KubernetesExecutor prior to apache-airflow 3.2.2. [1]
Exploitation
An attacker with authenticated Kubernetes read-only access (e.g., pods/get in the Airflow namespace) can run kubectl describe pod on a worker pod and extract the JWT from the command-line arguments. With this token, they can call state-mutating Execution API endpoints—triggering DAG runs, clearing runs, reading or writing Variables, Connections, and XComs—as if they were a running task. [1]
Impact
Successful exploitation grants the attacker the same privileges as a running task, enabling arbitrary state-mutating actions on the Airflow instance. This includes starting new DAG runs, clearing existing runs, and accessing or modifying sensitive data stored in Variables, Connections, and XComs, potentially leading to full compromise of the Airflow environment. [1]
Mitigation
Upgrade to apache-airflow 3.2.2 or later. This fix is complementary to the provider-side fix in apache-airflow-providers-cncf-kubernetes 10.17.0 (CVE-2026-27173); both upgrades are required to fully close the vulnerability. No workaround is available. [1]
AI Insight generated on Jun 1, 2026. Synthesized from this CVE's description and the cited reference URLs; citations are validated against the source bundle.
Affected products
2Patches
22b6e8181e3aeTwo-token mechanism for task execution to prevent token expiration while tasks wait in executor queues (#60108)
12 files changed · +282 −86
airflow-core/docs/security/jwt_token_authentication.rst+42 −19 modified@@ -201,16 +201,25 @@ Token structure (Execution API) Token scopes (Execution API) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -The Execution API defines two token scopes: +The Execution API defines two token scopes with different lifetimes: **workload** - A restricted scope accepted only on endpoints that explicitly opt in via - ``Security(require_auth, scopes=["token:workload"])``. Used for endpoints that - manage task state transitions. + A token embedded in the workload JSON payload when the Scheduler + dispatches a task. The longer lifetime + allows tasks to remain valid while waiting in executor queues before execution + begins. When a worker calls the ``/run`` endpoint with a ``workload`` token, the + server issues a fresh ``execution``-scoped token in the ``Refreshed-API-Token`` + response header. Lifetime equals ``[scheduler] task_queued_timeout`` (default + 600 seconds) — the same timeout the scheduler uses to reap queue-starved tasks — + so tuning ``task_queued_timeout`` also widens the window a task can wait in a + backed-up queue before its workload token expires. **execution** - Accepted by all Execution API endpoints. This is the standard scope for worker - communication and allows access + A short-lived token (default 10 minutes) accepted by all Execution API endpoints. + This is the standard scope for worker communication during task execution. Issued + by the server when the worker transitions to running via the ``/run`` endpoint. + The ``JWTReissueMiddleware`` refreshes ``execution`` tokens transparently, + so the worker maintains access for the duration of the task. Tokens without a ``scope`` claim default to ``"execution"`` for backwards compatibility. @@ -219,14 +228,19 @@ Token delivery to workers The token flows through the execution stack as follows: -1. **Scheduler** generates the token and embeds it in the workload JSON payload that it passes to - **Executor**. +1. **Scheduler** generates a ``workload``-scoped token (lifetime equals + ``[scheduler] task_queued_timeout``, default 600 seconds) and embeds it in the workload + JSON payload that it passes to **Executor**. 2. The workload JSON is passed to the worker process (via the executor-specific mechanism: Celery message, Kubernetes Pod spec, local subprocess arguments, etc.). 3. The worker's ``execute_workload()`` function reads the workload JSON and extracts the token. 4. The ``supervise()`` function receives the token and creates an ``httpx.Client`` instance with ``BearerAuth(token)`` for all Execution API HTTP requests. -5. The token is included in the ``Authorization: Bearer <token>`` header of every request. +5. The worker calls the ``/run`` endpoint with the ``workload``-scoped token to mark the task + as running. The server responds with a fresh ``execution``-scoped token in the + ``Refreshed-API-Token`` header. +6. The client's ``_update_auth()`` hook detects the header and transparently updates + the ``BearerAuth`` instance to use the new ``execution`` token for all subsequent requests. Token validation (Execution API) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -251,7 +265,8 @@ Route-level enforcement is handled by ``require_auth``: Token refresh (Execution API) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -The ``JWTReissueMiddleware`` automatically refreshes valid tokens that are approaching expiry: +The ``JWTReissueMiddleware`` automatically refreshes valid tokens that are approaching +expiry. The token must be valid at the start of the request for refresh to occur: 1. After each response, the middleware checks the token's remaining validity. 2. If less than **20%** of the total validity remains (minimum 30 seconds), the server @@ -260,16 +275,20 @@ The ``JWTReissueMiddleware`` automatically refreshes valid tokens that are appro 4. The client's ``_update_auth()`` hook detects this header and transparently updates the ``BearerAuth`` instance for subsequent requests. -This mechanism ensures long-running tasks do not lose API access due to token expiry, -without requiring the worker to re-authenticate. +The middleware only refreshes ``execution``-scoped tokens. ``workload``-scoped tokens are +sized to span the queued-timeout window and are explicitly skipped by the middleware — +they are designed to survive executor queue wait times without needing refresh. This +ensures long-running tasks do not lose API access without requiring the worker to +re-authenticate. No token revocation (Execution API) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -Execution API tokens are not subject to revocation. They are short-lived (default 10 minutes) -and automatically refreshed by the ``JWTReissueMiddleware``, so revocation is not part of the -Execution API security model. Once an Execution API token is issued to a worker, it remains -valid until it expires. +Execution API tokens are not subject to revocation. ``execution``-scoped tokens are short-lived +(default 10 minutes) and automatically refreshed by the ``JWTReissueMiddleware``. +``workload``-scoped tokens (tracking ``[scheduler] task_queued_timeout``) are not refreshed — +they expire naturally after their validity period. Revocation is not part of the Execution API +security model. @@ -284,11 +303,12 @@ Default timings (Execution API) - Default * - ``[execution_api] jwt_expiration_time`` - 600 seconds (10 minutes) + * - Workload token lifetime (derived) + - ``[scheduler] task_queued_timeout`` (default 600 seconds) * - ``[execution_api] jwt_audience`` - ``urn:airflow.apache.org:task`` * - Token refresh threshold - - 20% of validity remaining (minimum 30 seconds, i.e., at ~120 seconds before expiry - with the default 600-second token lifetime) + - 20% of validity remaining (minimum 30 seconds) Dag File Processor and Triggerer @@ -386,7 +406,10 @@ All JWT-related configuration parameters: - JWKS endpoint URL or local file path for token validation. Mutually exclusive with ``jwt_secret``. * - ``[execution_api] jwt_expiration_time`` - 600 (10 min) - - Execution API token lifetime in seconds. + - Execution API ``execution``-scoped token lifetime in seconds. + * - ``[scheduler] task_queued_timeout`` + - 600.0 (10 min) + - Queue-starvation timeout. Also sets the ``workload``-scoped token lifetime to the same value. * - ``[execution_api] jwt_audience`` - ``urn:airflow.apache.org:task`` - Audience claim for Execution API tokens.
airflow-core/src/airflow/api_fastapi/auth/tokens.py+8 −2 modified@@ -447,15 +447,21 @@ def signing_arg(self) -> AllowedPrivateKeys | str: assert self._secret_key return self._secret_key - def generate(self, extras: dict[str, Any] | None = None, headers: dict[str, Any] | None = None) -> str: + def generate( + self, + extras: dict[str, Any] | None = None, + headers: dict[str, Any] | None = None, + valid_for: float | None = None, + ) -> str: """Generate a signed JWT for the subject.""" now = int(datetime.now(tz=timezone.utc).timestamp()) + effective_valid_for = valid_for if valid_for is not None else self.valid_for claims = { "jti": uuid.uuid4().hex, "iss": self.issuer, "aud": self.audience, "nbf": now, - "exp": int(now + self.valid_for), + "exp": int(now + effective_valid_for), "iat": now, }
airflow-core/src/airflow/api_fastapi/execution_api/app.py+8 −5 modified@@ -129,8 +129,6 @@ async def dispatch(self, request: Request, call_next): class JWTReissueMiddleware(BaseHTTPMiddleware): async def dispatch(self, request: Request, call_next): - from airflow.configuration import conf - response: Response = await call_next(request) refreshed_token: str | None = None @@ -142,9 +140,15 @@ async def dispatch(self, request: Request, call_next): validator: JWTValidator = await services.aget(JWTValidator) claims = await validator.avalidated_claims(token, {}) + # Workload tokens are long-lived and meant to survive queue + # wait times so avoid refreshing them. If avalidated_claims + # raises for a workload token, the outer except handles it. + if claims.get("scope") == "workload": + return response + now = int(time.time()) - validity = conf.getint("execution_api", "jwt_expiration_time") - refresh_when_less_than = max(int(validity * 0.20), 30) + token_lifetime = int(claims.get("exp", 0)) - int(claims.get("iat", 0)) + refresh_when_less_than = max(int(token_lifetime * 0.20), 30) valid_left = int(claims.get("exp", 0)) - now if valid_left <= refresh_when_less_than: generator: JWTGenerator = await services.aget(JWTGenerator) @@ -312,7 +316,6 @@ class InProcessExecutionAPI: def app(self): if not self._app: from airflow.api_fastapi.common.dagbag import create_dag_bag - from airflow.api_fastapi.execution_api.app import create_task_execution_api_app from airflow.api_fastapi.execution_api.datamodels.token import TIClaims, TIToken from airflow.api_fastapi.execution_api.routes.connections import has_connection_access from airflow.api_fastapi.execution_api.routes.variables import has_variable_access
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py+17 −3 modified@@ -28,7 +28,7 @@ import attrs import structlog from cadwyn import VersionedAPIRouter -from fastapi import Body, HTTPException, Query, Security, status +from fastapi import Body, HTTPException, Query, Response, Security, status from opentelemetry import trace from opentelemetry.trace import StatusCode from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator @@ -42,6 +42,7 @@ from airflow._shared.observability.traces import override_ids from airflow._shared.timezones import timezone +from airflow.api_fastapi.auth.tokens import JWTGenerator from airflow.api_fastapi.common.dagbag import DagBagDep, get_latest_version_of_dag from airflow.api_fastapi.common.db.common import SessionDep from airflow.api_fastapi.common.types import UtcDateTime @@ -63,7 +64,9 @@ TISuccessStatePayload, TITerminalStatePayload, ) -from airflow.api_fastapi.execution_api.security import ExecutionAPIRoute, require_auth +from airflow.api_fastapi.execution_api.datamodels.token import TIToken +from airflow.api_fastapi.execution_api.deps import DepContainer +from airflow.api_fastapi.execution_api.security import CurrentTIToken, ExecutionAPIRoute, require_auth from airflow.exceptions import TaskNotFound from airflow.models.asset import AssetActive from airflow.models.dag import DagModel @@ -98,6 +101,7 @@ @ti_id_router.patch( "/{task_instance_id}/run", status_code=status.HTTP_200_OK, + dependencies=[Security(require_auth, scopes=["token:execution", "token:workload"])], responses={ status.HTTP_404_NOT_FOUND: {"description": "Task Instance not found"}, status.HTTP_409_CONFLICT: {"description": "The TI is already in the requested state"}, @@ -108,8 +112,11 @@ def ti_run( task_instance_id: UUID, ti_run_payload: Annotated[TIEnterRunningPayload, Body()], + response: Response, session: SessionDep, dag_bag: DagBagDep, + services=DepContainer, + token: TIToken = CurrentTIToken, ) -> TIRunContext: """ Run a TaskInstance. @@ -293,13 +300,20 @@ def ti_run( context.next_method = ti.next_method context.next_kwargs = ti.next_kwargs context.start_date = ti.start_date - return context except SQLAlchemyError: log.exception("Error marking Task Instance state as running") raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Database error occurred" ) + # JWTReissueMiddleware also writes Refreshed-API-Token but skips workload tokens, so we set it here for the workload→execution swap. + if token.claims.scope == "workload": + generator: JWTGenerator = services.get(JWTGenerator) + execution_token = generator.generate(extras={"sub": str(task_instance_id), "scope": "execution"}) + response.headers["Refreshed-API-Token"] = execution_token + + return context + @ti_id_router.patch( "/{task_instance_id}/state",
airflow-core/src/airflow/config_templates/config.yml+4 −0 modified@@ -2586,6 +2586,10 @@ scheduler: task_queued_timeout: description: | Amount of time a task can be in the queued state before being retried or set to failed. + + This value also sets the lifetime of the workload JWT token that is sent with the task + to an executor queue, so a task waiting in the queue can still authenticate to the + Execution API until its queue-starvation deadline. version_added: 2.6.0 type: float example: ~
airflow-core/src/airflow/executors/workloads/base.py+9 −1 modified@@ -25,6 +25,8 @@ from pydantic import BaseModel, ConfigDict, Field +from airflow.configuration import conf + if TYPE_CHECKING: from airflow.api_fastapi.auth.tokens import JWTGenerator from airflow.executors.workloads.types import WorkloadState @@ -76,7 +78,13 @@ class BaseWorkloadSchema(BaseModel): @staticmethod def generate_token(sub_id: str, generator: JWTGenerator | None = None) -> str: - return generator.generate({"sub": sub_id}) if generator else "" + if not generator: + return "" + valid_for = conf.getfloat("scheduler", "task_queued_timeout") + return generator.generate( + extras={"sub": sub_id, "scope": "workload"}, + valid_for=valid_for, + ) class BaseDagBundleWorkload(BaseWorkloadSchema, ABC):
airflow-core/tests/unit/api_fastapi/auth/test_tokens.py+28 −0 modified@@ -160,6 +160,34 @@ def test_secret_key_with_configured_kid(): assert header["kid"] == "my-custom-kid" +def test_generate_with_custom_valid_for(): + """generate() accepts a valid_for override.""" + generator = JWTGenerator(secret_key="test-secret", audience="test", valid_for=60) + token = generator.generate(extras={"sub": "user"}, valid_for=3600) + claims = jwt.decode(token, "test-secret", algorithms=["HS512"], audience="test") + assert claims["exp"] - claims["iat"] == 3600 + + +def test_generate_workload_scope_via_extras(): + """generate() with scope='workload' in extras produces a workload-scoped token.""" + generator = JWTGenerator(secret_key="test-secret", audience="test", valid_for=60) + + token = generator.generate(extras={"sub": "ti-123", "scope": "workload"}, valid_for=86400) + claims = jwt.decode(token, "test-secret", algorithms=["HS512"], audience="test") + assert claims["sub"] == "ti-123" + assert claims["scope"] == "workload" + assert claims["exp"] - claims["iat"] == 86400 + + +def test_regular_token_has_no_scope(): + """Regular tokens without scope in extras have no scope claim.""" + generator = JWTGenerator(secret_key="test-secret", audience="test", valid_for=60) + + regular = generator.generate(extras={"sub": "user"}) + regular_claims = jwt.decode(regular, "test-secret", algorithms=["HS512"], audience="test") + assert "scope" not in regular_claims + + @pytest.fixture def jwt_generator(ed25519_private_key: Ed25519PrivateKey): key = ed25519_private_key
airflow-core/tests/unit/api_fastapi/execution_api/conftest.py+13 −6 modified@@ -22,8 +22,16 @@ from starlette.routing import Mount from airflow.api_fastapi.app import cached_app +from airflow.api_fastapi.execution_api.app import lifespan from airflow.api_fastapi.execution_api.datamodels.token import TIClaims, TIToken -from airflow.api_fastapi.execution_api.security import _jwt_bearer +from airflow.api_fastapi.execution_api.security import require_auth + + +@pytest.fixture(autouse=True) +def _restore_lifespan_registry(): + snapshot = dict(lifespan.registry._services) + yield + lifespan.registry._services = snapshot def _get_execution_api_app(root_app: FastAPI) -> FastAPI: @@ -45,16 +53,15 @@ def client(request: pytest.FixtureRequest): app = cached_app(apps="execution") exec_app = _get_execution_api_app(app) - async def mock_jwt_bearer(request: Request): + async def mock_require_auth(request: Request) -> TIToken: from uuid import UUID ti_id = UUID(request.path_params.get("task_instance_id", "00000000-0000-0000-0000-000000000000")) - claims = TIClaims(scope="execution") - return TIToken(id=ti_id, claims=claims) + return TIToken(id=ti_id, claims=TIClaims(scope="execution")) - exec_app.dependency_overrides[_jwt_bearer] = mock_jwt_bearer + exec_app.dependency_overrides[require_auth] = mock_require_auth with TestClient(app, headers={"Authorization": "Bearer fake"}) as client: yield client - exec_app.dependency_overrides.pop(_jwt_bearer, None) + exec_app.dependency_overrides.pop(require_auth, None)
airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_router.py+2 −4 modified@@ -25,8 +25,6 @@ from airflow.api_fastapi.auth.tokens import JWTValidator from airflow.api_fastapi.execution_api.app import lifespan -from tests_common.test_utils.config import conf_vars - @pytest.fixture def exec_app(client): @@ -53,6 +51,7 @@ def test_expiring_token_is_reissued( auth = AsyncMock(spec=JWTValidator) auth.avalidated_claims.return_value = { "sub": "edb09971-4e0e-4221-ad3f-800852d38085", + "iat": moment, "exp": moment + validity, } @@ -62,8 +61,7 @@ def test_expiring_token_is_reissued( lifespan.registry.register_value(JWTValidator, auth) # In order to test this we need any endpoint to hit. The easiest one to use is variable get - with conf_vars({("execution_api", "jwt_expiration_time"): str(validity)}): - response = client.get("/execution/variables/key1", headers={"Authorization": "Bearer dummy"}) + response = client.get("/execution/variables/key1", headers={"Authorization": "Bearer dummy"}) if expect_refreshed_token: assert "Refreshed-API-Token" in response.headers
airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py+126 −43 modified@@ -24,6 +24,7 @@ import pytest import uuid6 +from fastapi import Request from opentelemetry import trace as otel_trace from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import SimpleSpanProcessor @@ -36,9 +37,11 @@ from airflow._shared.observability.traces import OverrideableRandomIdGenerator from airflow._shared.timezones import timezone -from airflow.api_fastapi.auth.tokens import JWTValidator +from airflow.api_fastapi.auth.tokens import JWTGenerator, JWTValidator from airflow.api_fastapi.execution_api.app import lifespan +from airflow.api_fastapi.execution_api.datamodels.token import TIClaims, TIToken from airflow.api_fastapi.execution_api.routes.task_instances import _emit_task_span +from airflow.api_fastapi.execution_api.security import require_auth from airflow.exceptions import AirflowSkipException from airflow.models import RenderedTaskInstanceFields, TaskReschedule, Trigger from airflow.models.asset import AssetActive, AssetAliasModel, AssetEvent, AssetModel @@ -112,10 +115,8 @@ def _create_asset_aliases(session, num: int = 2) -> None: @pytest.fixture def _use_real_jwt_bearer(exec_app): - """Remove the mock jwt_bearer override so the real JWTBearer.__call__ runs.""" - from airflow.api_fastapi.execution_api.security import _jwt_bearer - - exec_app.dependency_overrides.pop(_jwt_bearer, None) + """Remove the mock require_auth override so the real JWT validation runs end-to-end.""" + exec_app.dependency_overrides.pop(require_auth, None) @pytest.mark.usefixtures("_use_real_jwt_bearer") @@ -244,6 +245,8 @@ def test_ti_run_state_to_running( } # upstream_map_indexes is now computed by Task SDK, not returned by the server in HEAD version assert "upstream_map_indexes" not in result + # execution-scoped tokens do not trigger a token swap + assert "Refreshed-API-Token" not in response.headers # Refresh the Task Instance from the database so that we can check the updated values session.refresh(ti) @@ -283,6 +286,54 @@ def test_ti_run_state_to_running( ) assert response.status_code == 409 + def test_ti_run_returns_execution_token( + self, client, exec_app, session, create_task_instance, time_machine + ): + """PATCH /run with a workload token should swap to an execution-scoped token.""" + instant = timezone.parse("2024-10-31T12:00:00Z") + time_machine.move_to(instant, tick=False) + + ti = create_task_instance( + task_id="test_exec_token", + state=State.QUEUED, + dagrun_state=DagRunState.RUNNING, + session=session, + start_date=instant, + dag_id=str(uuid4()), + ) + session.commit() + + mock_gen = mock.MagicMock(spec=JWTGenerator) + mock_gen.generate.return_value = "mock-execution-token" + lifespan.registry.register_value(JWTGenerator, mock_gen) + + async def workload_token(request: Request) -> TIToken: + ti_id = UUID(request.path_params.get("task_instance_id", "00000000-0000-0000-0000-000000000000")) + return TIToken(id=ti_id, claims=TIClaims(scope="workload")) + + exec_app.dependency_overrides[require_auth] = workload_token + + response = client.patch( + f"/execution/task-instances/{ti.id}/run", + json={ + "state": "running", + "hostname": "test-host", + "unixname": "test-user", + "pid": 100, + "start_date": "2024-10-31T12:00:00Z", + }, + ) + + exec_app.dependency_overrides.pop(require_auth, None) + + assert response.status_code == 200 + assert "Refreshed-API-Token" in response.headers + assert response.headers["Refreshed-API-Token"] == "mock-execution-token" + mock_gen.generate.assert_called_once() + extras = mock_gen.generate.call_args.kwargs["extras"] + assert extras["scope"] == "execution" + assert extras["sub"] == str(ti.id) + def test_dynamic_task_mapping_with_parse_time_value(self, client, dag_maker): """Test that dynamic task mapping works correctly with parse-time values.""" with dag_maker("test_dynamic_task_mapping_with_parse_time_value", serialized=True): @@ -3497,40 +3548,56 @@ def test_ti_patch_rendered_map_index_empty_string(self, client, session, create_ class TestTokenTypeValidation: """Test token scope enforcement (workload vs execution).""" - def test_workload_scope_rejected_on_default_endpoints(self, client, session, create_task_instance): - """workload scoped tokens should be rejected on endpoints without token:workload Security scope.""" + def _register_scoped_validator(self, ti_id, scope): + """Register a JWTValidator mock returning claims with the given scope.""" + validator = mock.AsyncMock(spec=JWTValidator) + claims = {"sub": str(ti_id), "exp": 9999999999, "iat": 1000000000, "nbf": 1000000000} + if scope is not None: + claims["scope"] = scope + validator.avalidated_claims.side_effect = lambda cred, validators: claims + lifespan.registry.register_value(JWTValidator, validator) + + def test_workload_scope_rejected_on_heartbeat_endpoint(self, client, session, create_task_instance): + """Workload scoped tokens should be rejected on /heartbeat.""" ti = create_task_instance(task_id="test_ti_run_heartbeat", state=State.RUNNING) session.commit() - validator = mock.AsyncMock(spec=JWTValidator) - validator.avalidated_claims.side_effect = lambda cred, validators: { - "sub": str(ti.id), - "scope": "workload", - "exp": 9999999999, - "iat": 1000000000, - "nbf": 1000000000, - } - lifespan.registry.register_value(JWTValidator, validator) + self._register_scoped_validator(ti.id, "workload") payload = {"hostname": "test-host", "pid": 100} resp = client.put(f"/execution/task-instances/{ti.id}/heartbeat", json=payload) assert resp.status_code == 403 assert "Token type 'workload' not allowed" in resp.json()["detail"] + def test_workload_scope_rejected_on_state_endpoint(self, client, session, create_task_instance): + """Workload scoped tokens should be rejected on PATCH /state.""" + ti = create_task_instance(task_id="test_workload_state", state=State.RUNNING) + session.commit() + + self._register_scoped_validator(ti.id, "workload") + + payload = {"state": "success", "end_date": "2024-10-31T13:00:00Z"} + resp = client.patch(f"/execution/task-instances/{ti.id}/state", json=payload) + assert resp.status_code == 403 + assert "Token type 'workload' not allowed" in resp.json()["detail"] + + def test_workload_scope_rejected_on_connections_endpoint(self, client, session, create_task_instance): + """Workload scoped tokens should be rejected on GET /connections (different router).""" + ti = create_task_instance(task_id="test_workload_conn", state=State.RUNNING) + session.commit() + + self._register_scoped_validator(ti.id, "workload") + + resp = client.get("/execution/connections/test_conn") + assert resp.status_code == 403 + assert "Token type 'workload' not allowed" in resp.json()["detail"] + def test_execution_scope_accepted_on_all_endpoints(self, client, session, create_task_instance): - """execution scoped tokens should be able to call all endpoints.""" + """Execution scoped tokens should be accepted on all endpoints.""" ti = create_task_instance(task_id="test_ti_star", state=State.RUNNING) session.commit() - validator = mock.AsyncMock(spec=JWTValidator) - validator.avalidated_claims.side_effect = lambda cred, validators: { - "sub": str(ti.id), - "scope": "execution", - "exp": 9999999999, - "iat": 1000000000, - "nbf": 1000000000, - } - lifespan.registry.register_value(JWTValidator, validator) + self._register_scoped_validator(ti.id, "execution") payload = {"state": "success", "end_date": "2024-10-31T13:00:00Z"} resp = client.patch(f"/execution/task-instances/{ti.id}/state", json=payload) @@ -3541,15 +3608,7 @@ def test_invalid_scope_value_rejected(self, client, session, create_task_instanc ti = create_task_instance(task_id="test_invalid_scope", state=State.QUEUED) session.commit() - validator = mock.AsyncMock(spec=JWTValidator) - validator.avalidated_claims.side_effect = lambda cred, validators: { - "sub": str(ti.id), - "scope": "bogus:scope", - "exp": 9999999999, - "iat": 1000000000, - "nbf": 1000000000, - } - lifespan.registry.register_value(JWTValidator, validator) + self._register_scoped_validator(ti.id, "bogus:scope") payload = { "state": "running", @@ -3563,19 +3622,43 @@ def test_invalid_scope_value_rejected(self, client, session, create_task_instanc assert resp.status_code == 403 assert "Invalid auth token" in resp.json()["detail"] + def test_workload_scope_accepted_on_run_endpoint( + self, client, session, create_task_instance, time_machine + ): + """Workload scoped tokens should be accepted on the /run endpoint.""" + instant = timezone.parse("2024-10-31T12:00:00Z") + time_machine.move_to(instant, tick=False) + + ti = create_task_instance( + task_id="test_workload_run", + state=State.QUEUED, + dagrun_state=DagRunState.RUNNING, + session=session, + start_date=instant, + dag_id=str(uuid4()), + ) + session.commit() + + self._register_scoped_validator(ti.id, "workload") + + resp = client.patch( + f"/execution/task-instances/{ti.id}/run", + json={ + "state": "running", + "hostname": "test-host", + "unixname": "test-user", + "pid": 100, + "start_date": "2024-10-31T12:00:00Z", + }, + ) + assert resp.status_code == 200 + def test_no_scope_defaults_to_execution(self, client, session, create_task_instance): """Tokens without scope claim should default to 'execution'.""" ti = create_task_instance(task_id="test_no_scope", state=State.RUNNING) session.commit() - validator = mock.AsyncMock(spec=JWTValidator) - validator.avalidated_claims.side_effect = lambda cred, validators: { - "sub": str(ti.id), - "exp": 9999999999, - "iat": 1000000000, - "nbf": 1000000000, - } - lifespan.registry.register_value(JWTValidator, validator) + self._register_scoped_validator(ti.id, None) payload = {"state": "success", "end_date": "2024-10-31T13:00:00Z"} resp = client.patch(f"/execution/task-instances/{ti.id}/state", json=payload)
airflow-core/tests/unit/executors/test_workloads.py+23 −2 modified@@ -20,9 +20,12 @@ from pathlib import PurePosixPath from uuid import uuid4 +import jwt + +from airflow.api_fastapi.auth.tokens import JWTGenerator from airflow.executors import workloads -from airflow.executors.workloads import TaskInstance, TaskInstanceDTO -from airflow.executors.workloads.base import BundleInfo +from airflow.executors.workloads import TaskInstance, TaskInstanceDTO, base as workloads_base +from airflow.executors.workloads.base import BaseWorkloadSchema, BundleInfo from airflow.executors.workloads.task import ExecuteTask @@ -61,3 +64,21 @@ def test_token_excluded_from_workload_repr(): assert fake_token not in workload_repr, f"JWT token leaked into repr! Found token in: {workload_repr}" # But token should still be accessible as an attribute assert workload.token == fake_token + + +def test_generate_token_produces_workload_scope(monkeypatch): + """generate_token should create a JWT with scope 'workload' and [scheduler] task_queued_timeout expiry.""" + monkeypatch.setattr(workloads_base.conf, "getfloat", lambda section, key: 86400.0) + + generator = JWTGenerator(secret_key="test-secret", audience="test", valid_for=60) + token = BaseWorkloadSchema.generate_token("ti-123", generator) + + claims = jwt.decode(token, "test-secret", algorithms=["HS512"], audience="test") + assert claims["sub"] == "ti-123" + assert claims["scope"] == "workload" + assert claims["exp"] - claims["iat"] == 86400 + + +def test_generate_token_without_generator(): + """generate_token should return empty string when no generator is provided.""" + assert BaseWorkloadSchema.generate_token("ti-123", None) == ""
devel-common/src/tests_common/test_utils/mock_executor.py+2 −1 modified@@ -22,6 +22,7 @@ from typing import TYPE_CHECKING from unittest.mock import MagicMock +from airflow.api_fastapi.auth.tokens import JWTGenerator from airflow.executors.base_executor import BaseExecutor from airflow.executors.executor_utils import ExecutorName from airflow.models.taskinstance import TaskInstance @@ -57,7 +58,7 @@ def __init__(self, do_update=True, *args, **kwargs): self.mock_task_results = defaultdict(self.success) # Mock JWT generator for token generation - mock_jwt_generator = MagicMock() + mock_jwt_generator = MagicMock(spec=JWTGenerator) mock_jwt_generator.generate.return_value = "mock-token" self.jwt_generator = mock_jwt_generator
cde4885818beUpdating release notes for 3.2.2rc3
2 files changed · +5 −4
RELEASE_NOTES.rst+3 −2 modified@@ -24,7 +24,7 @@ .. towncrier release notes start -Airflow 3.2.2 (2026-05-27) +Airflow 3.2.2 (2026-05-29) -------------------------- Significant Changes @@ -81,7 +81,8 @@ Significant Changes Bug Fixes ^^^^^^^^^ - +- Fix ``Callback.handle_event`` triggerer crash when OpenTelemetry metrics receive dict typed tag values (#67527) (#67529) +- UI: Rewrite ``modulepreload hrefs`` to the api-server static path (#67548) (#67556) - Correctly pre-allocate ``external_executor_id`` with multiple executors on PostgreSQL (#67388) (#67458) - Return raw import-error stacktrace when a Dag file has no registered Dag (#67465) (#67478) - UI: Fix Expand/Collapse All on XComs and Audit Log JSON cells (#67316) (#67361)
reproducible_build.yaml+2 −2 modified@@ -1,2 +1,2 @@ -release-notes-hash: 6407b48d1054fe3ce68c09bf4435d91d -source-date-epoch: 1779745327 +release-notes-hash: 504288db9a9dc13a0db859232fab98d0 +source-date-epoch: 1779811737
Vulnerability mechanics
Root cause
"The JWT token used by worker pods to authenticate against the Execution API was passed to the worker container as a command-line argument visible in the Kubernetes pod spec, and the token was accepted by all Execution API endpoints without scope restriction."
Attack vector
An authenticated Airflow UI/API user with Kubernetes read-only permissions (e.g., `pods/get` in the Airflow namespace) can run `kubectl describe pod` to extract the JWT token from the pod spec's command-line arguments [CWE-538]. This token, originally scoped as `workload`, was accepted by all Execution API endpoints before the fix. The attacker can then use the harvested token to call state-mutating Execution API endpoints — triggering DAG runs, clearing runs, reading or writing Variables/Connections/XComs — as if they were a running task.
Affected code
The vulnerability exists in the `KubernetesExecutor` path where the scheduler's JWT token is embedded in the workload JSON payload and passed to the worker pod as a command-line argument visible in the pod spec. The patch introduces a two-token mechanism in `airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py` (the `ti_run` endpoint) and the `BaseWorkloadSchema` in `airflow/executors/workloads/base.py`.
What the fix does
The patch introduces a two-token mechanism: a `workload`-scoped token (lifetime tied to `[scheduler] task_queued_timeout`) travels with the task through the executor queue, and a short-lived `execution`-scoped token is issued by the server when the worker successfully calls the `/run` endpoint. The `workload` token is now restricted to only the `/run` endpoint via FastAPI `Security` scopes (`token:workload`). All other Execution API endpoints require the `execution` scope. The client's `_update_auth()` hook transparently swaps to the new token from the `Refreshed-API-Token` response header. This means that even if an attacker harvests the `workload` token from the Kubernetes pod spec, they can only call `/run` and cannot mutate state via other endpoints.
Preconditions
- configDeployment uses KubernetesExecutor
- authAttacker has authenticated UI/API access with Kubernetes read-only permissions (e.g., pods/get) in the Airflow namespace
- inputAttacker can run kubectl describe pod to extract the JWT from command-line arguments
Generated on Jun 1, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.
References
2News mentions
0No linked articles in our index yet.