Apache Airflow: Dag Code and Import Error Permissions Ignored
Description
Apache Airflow, versions before 2.8.2, has a vulnerability that allows authenticated users to view DAG code and import errors of DAGs they do not have permission to view through the API and the UI.
Users of Apache Airflow are advised to upgrade to version 2.8.2 or newer to mitigate the risk associated with this vulnerability.
AI Insight
LLM-synthesized narrative grounded in this CVE's description and references.
Apache Airflow before 2.8.2 allows authenticated users to view DAG code and import errors of DAGs they lack permission to access via API and UI.
Vulnerability Description
Apache Airflow versions prior to 2.8.2 do not properly enforce DAG-level permissions in the API and UI when serving DAG code and import errors. In the import error API endpoints, the only check in place before the fix was the general import-errors access view; nothing verified that the user could read the DAGs defined in the file that failed to import. Authenticated users could therefore see import errors for DAGs they are not authorized to view [1][2]. The fix adds these per-DAG permission checks to the import error handlers [3].
Exploitation
An authenticated attacker can exploit this by directly requesting DAG code or import errors through the API or UI; the import error endpoint touched by the patch is addressed by `import_error_id`. The attacker does not need read permission on the DAGs concerned. The returned code and stack traces may reveal task definitions, business logic, and any connection details or credentials embedded in DAG files [2].
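When checking a deployment with a deliberately low-privilege test account, the responses from `/api/v1/importErrors/{import_error_id}` can be sorted into three outcomes that the patch's own tests exercise: 403, 200 with a redacted stack trace, and 200 with the full stack trace. The following is a minimal sketch of that classification; the function name and the returned labels are hypothetical, while the `stack_trace` field and the `REDACTED` prefix are taken from the tests in the fix commit.

```python
from __future__ import annotations

# Prefix of the placeholder the patched server writes into the stack trace
# (taken from the fix commit; the rest of this sketch is illustrative).
REDACTED_PREFIX = "REDACTED"


def classify_import_error_response(status_code: int, body: dict | None) -> str:
    """Label one response from the import error endpoint (hypothetical helper).

    Returns "denied", "redacted", "full", or "other".
    """
    if status_code == 403:
        # Patched behaviour when the user can read none of the DAGs in the file.
        return "denied"
    if status_code == 200 and body is not None:
        stack_trace = str(body.get("stack_trace", ""))
        if stack_trace.startswith(REDACTED_PREFIX):
            # Patched behaviour when the user can read only some DAGs in the file.
            return "redacted"
        # Full stack trace returned: expected only if the user can read
        # every DAG defined in the file.
        return "full"
    return "other"
```

A "full" result for an account that lacks read access to every DAG in the affected file would suggest the deployment is still running a vulnerable version.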
Impact
This information disclosure vulnerability can lead to exposure of sensitive data and intellectual property. An attacker could gain insights into workflow logic, potentially enabling further attacks such as privilege escalation or lateral movement within the Airflow environment [1].
Mitigation
The vulnerability is fixed in Apache Airflow version 2.8.2. Users should upgrade to this version or later. The fix ensures that permission checks are applied before returning import errors and DAG code, as shown in the relevant commits [3]. There are no known workarounds.
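The per-file decision that the fix applies to a single import error can be sketched without any Airflow imports. This is an illustration of the set logic visible in the patched `get_import_error` handler, not the handler itself; the enum and function names are hypothetical.

```python
from __future__ import annotations

from enum import Enum


class Visibility(Enum):
    """Outcome for one import error (hypothetical names, for illustration)."""

    DENIED = "denied"      # patched handler raises PermissionDenied (HTTP 403)
    REDACTED = "redacted"  # stack trace replaced with a REDACTED placeholder
    FULL = "full"          # stack trace returned unchanged


def import_error_visibility(readable_dag_ids: set[str], file_dag_ids: set[str]) -> Visibility:
    """Decide what a user without blanket DAG read access may see.

    readable_dag_ids: DAG ids the user has read permission on.
    file_dag_ids: DAG ids recorded for the file that failed to import.
    """
    # The user can read none of the DAGs in the file (this also covers a
    # file with no recorded DAGs): deny access entirely.
    if not readable_dag_ids & file_dag_ids:
        return Visibility.DENIED
    # The user can read some, but not all, DAGs in the file: redact the trace.
    if not file_dag_ids <= readable_dag_ids:
        return Visibility.REDACTED
    return Visibility.FULL
```

For example, a user who can read only `test_dag` gets `FULL` for a file defining just `test_dag`, `REDACTED` for a file defining both `test_dag` and `test_dag2`, and `DENIED` for a file with no readable DAGs, which matches the three API test cases added in the commit.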
AI Insight generated on May 20, 2026. Synthesized from this CVE's description and the cited reference URLs; citations are validated against the source bundle.
Affected packages
Versions sourced from the GitHub Security Advisory.
| Package | Affected versions | Patched versions |
|---|---|---|
| apache-airflow (PyPI) | < 2.8.2 | 2.8.2 |
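A quick way to test a version string against the affected range in the table is a plain tuple comparison. The sketch below is an assumption-laden illustration: the helper names are hypothetical, and it only looks at the leading `major.minor.patch` numbers, so a pre-release such as `2.8.2rc1` is treated the same as `2.8.2`.

```python
from __future__ import annotations

import re

# First patched release, from the advisory table above.
PATCHED_RELEASE = (2, 8, 2)


def parse_release(version: str) -> tuple[int, ...]:
    """Extract the leading major.minor.patch numbers from a version string."""
    match = re.match(r"(\d+)\.(\d+)\.(\d+)", version)
    if match is None:
        raise ValueError(f"unrecognised version string: {version!r}")
    return tuple(int(part) for part in match.groups())


def is_affected(version: str) -> bool:
    """Return True if the version falls in the affected range (< 2.8.2)."""
    return parse_release(version) < PATCHED_RELEASE
```

On a live installation, the version string could be obtained with `importlib.metadata.version("apache-airflow")` from the standard library and passed to `is_affected`.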
Affected products
3
- osv-coords: 2 versions (< 2.8.2, + 1 more)
- (no CPE), range: < 2.8.2
- (no CPE), range: < 2.8.2
- Apache Software Foundation/Apache Airflow (v5), range: 0
Patches
990255d9d44a6: Check permissions for ImportError (#37468)
4 files changed · +314 −21
airflow/api_connexion/endpoints/import_error_endpoint.py · +56 −5 · modified
@@ -16,39 +16,59 @@
 # under the License.
 from __future__ import annotations

-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Sequence

 from sqlalchemy import func, select

 from airflow.api_connexion import security
-from airflow.api_connexion.exceptions import NotFound
+from airflow.api_connexion.exceptions import NotFound, PermissionDenied
 from airflow.api_connexion.parameters import apply_sorting, check_limit, format_parameters
 from airflow.api_connexion.schemas.error_schema import (
     ImportErrorCollection,
     import_error_collection_schema,
     import_error_schema,
 )
-from airflow.auth.managers.models.resource_details import AccessView
+from airflow.auth.managers.models.resource_details import AccessView, DagDetails
+from airflow.models.dag import DagModel
 from airflow.models.errors import ImportError as ImportErrorModel
 from airflow.utils.session import NEW_SESSION, provide_session
+from airflow.www.extensions.init_auth_manager import get_auth_manager

 if TYPE_CHECKING:
     from sqlalchemy.orm import Session

     from airflow.api_connexion.types import APIResponse
+    from airflow.auth.managers.models.batch_apis import IsAuthorizedDagRequest


 @security.requires_access_view(AccessView.IMPORT_ERRORS)
 @provide_session
 def get_import_error(*, import_error_id: int, session: Session = NEW_SESSION) -> APIResponse:
     """Get an import error."""
     error = session.get(ImportErrorModel, import_error_id)
-
     if error is None:
         raise NotFound(
             "Import error not found",
             detail=f"The ImportError with import_error_id: `{import_error_id}` was not found",
         )
+    session.expunge(error)
+
+    can_read_all_dags = get_auth_manager().is_authorized_dag(method="GET")
+    if not can_read_all_dags:
+        readable_dag_ids = security.get_readable_dags()
+        file_dag_ids = {
+            dag_id[0]
+            for dag_id in session.query(DagModel.dag_id).filter(DagModel.fileloc == error.filename).all()
+        }
+
+        # Can the user read any DAGs in the file?
+        if not readable_dag_ids.intersection(file_dag_ids):
+            raise PermissionDenied(detail="You do not have read permission on any of the DAGs in the file")
+
+        # Check if user has read access to all the DAGs defined in the file
+        if not file_dag_ids.issubset(readable_dag_ids):
+            error.stacktrace = "REDACTED - you do not have read permission on all DAGs in the file"
+
     return import_error_schema.dump(error)


@@ -65,10 +85,41 @@ def get_import_errors(
     """Get all import errors."""
     to_replace = {"import_error_id": "id"}
     allowed_filter_attrs = ["import_error_id", "timestamp", "filename"]
-    total_entries = session.scalars(func.count(ImportErrorModel.id)).one()
+    count_query = select(func.count(ImportErrorModel.id))
     query = select(ImportErrorModel)
     query = apply_sorting(query, order_by, to_replace, allowed_filter_attrs)
+
+    can_read_all_dags = get_auth_manager().is_authorized_dag(method="GET")
+
+    if not can_read_all_dags:
+        # if the user doesn't have access to all DAGs, only display errors from visible DAGs
+        readable_dag_ids = security.get_readable_dags()
+        dagfiles_subq = (
+            select(DagModel.fileloc).distinct().where(DagModel.dag_id.in_(readable_dag_ids)).subquery()
+        )
+        query = query.where(ImportErrorModel.filename.in_(dagfiles_subq))
+        count_query = count_query.where(ImportErrorModel.filename.in_(dagfiles_subq))
+
+    total_entries = session.scalars(count_query).one()
     import_errors = session.scalars(query.offset(offset).limit(limit)).all()
+
+    if not can_read_all_dags:
+        for import_error in import_errors:
+            # Check if user has read access to all the DAGs defined in the file
+            file_dag_ids = (
+                session.query(DagModel.dag_id).filter(DagModel.fileloc == import_error.filename).all()
+            )
+            requests: Sequence[IsAuthorizedDagRequest] = [
+                {
+                    "method": "GET",
+                    "details": DagDetails(id=dag_id[0]),
+                }
+                for dag_id in file_dag_ids
+            ]
+            if not get_auth_manager().batch_is_authorized_dag(requests):
+                session.expunge(import_error)
+                import_error.stacktrace = "REDACTED - you do not have read permission on all DAGs in the file"
+
     return import_error_collection_schema.dump(
         ImportErrorCollection(import_errors=import_errors, total_entries=total_entries)
     )
airflow/www/views.py · +38 −13 · modified
@@ -147,6 +147,7 @@
 if TYPE_CHECKING:
     from sqlalchemy.orm import Session

+    from airflow.auth.managers.models.batch_apis import IsAuthorizedDagRequest
     from airflow.models.dag import DAG
     from airflow.models.operator import Operator

@@ -935,20 +936,44 @@ def index(self):

             owner_links_dict = DagOwnerAttributes.get_all(session)

-            import_errors = select(errors.ImportError).order_by(errors.ImportError.id)
-
-            if not get_auth_manager().is_authorized_dag(method="GET"):
-                # if the user doesn't have access to all DAGs, only display errors from visible DAGs
-                import_errors = import_errors.join(
-                    DagModel, DagModel.fileloc == errors.ImportError.filename
-                ).where(DagModel.dag_id.in_(filter_dag_ids))
+            if get_auth_manager().is_authorized_view(access_view=AccessView.IMPORT_ERRORS):
+                import_errors = select(errors.ImportError).order_by(errors.ImportError.id)
+
+                can_read_all_dags = get_auth_manager().is_authorized_dag(method="GET")
+                if not can_read_all_dags:
+                    # if the user doesn't have access to all DAGs, only display errors from visible DAGs
+                    import_errors = import_errors.where(
+                        errors.ImportError.filename.in_(
+                            select(DagModel.fileloc)
+                            .distinct()
+                            .where(DagModel.dag_id.in_(filter_dag_ids))
+                            .subquery()
+                        )
+                    )

-            import_errors = session.scalars(import_errors)
-            for import_error in import_errors:
-                flash(
-                    f"Broken DAG: [{import_error.filename}] {import_error.stacktrace}",
-                    "dag_import_error",
-                )
+                import_errors = session.scalars(import_errors)
+                for import_error in import_errors:
+                    stacktrace = import_error.stacktrace
+                    if not can_read_all_dags:
+                        # Check if user has read access to all the DAGs defined in the file
+                        file_dag_ids = (
+                            session.query(DagModel.dag_id)
+                            .filter(DagModel.fileloc == import_error.filename)
+                            .all()
+                        )
+                        requests: Sequence[IsAuthorizedDagRequest] = [
+                            {
+                                "method": "GET",
+                                "details": DagDetails(id=dag_id[0]),
+                            }
+                            for dag_id in file_dag_ids
+                        ]
+                        if not get_auth_manager().batch_is_authorized_dag(requests):
+                            stacktrace = "REDACTED - you do not have read permission on all DAGs in the file"
+                    flash(
+                        f"Broken DAG: [{import_error.filename}]\r{stacktrace}",
+                        "dag_import_error",
+                    )

             from airflow.plugins_manager import import_errors as plugin_import_errors
tests/api_connexion/endpoints/test_import_error_endpoint.py · +159 −3 · modified
@@ -21,16 +21,19 @@
 import pytest

 from airflow.api_connexion.exceptions import EXCEPTIONS_LINK_MAP
+from airflow.models.dag import DagModel
 from airflow.models.errors import ImportError
 from airflow.security import permissions
 from airflow.utils import timezone
 from airflow.utils.session import provide_session
 from tests.test_utils.api_connexion_utils import assert_401, create_user, delete_user
 from tests.test_utils.config import conf_vars
-from tests.test_utils.db import clear_db_import_errors
+from tests.test_utils.db import clear_db_dags, clear_db_import_errors

 pytestmark = pytest.mark.db_test

+TEST_DAG_IDS = ["test_dag", "test_dag2"]
+

 @pytest.fixture(scope="module")
 def configured_app(minimal_app_for_api):
@@ -39,14 +42,34 @@ def configured_app(minimal_app_for_api):
         app,  # type:ignore
         username="test",
         role_name="Test",
-        permissions=[(permissions.ACTION_CAN_READ, permissions.RESOURCE_IMPORT_ERROR)],  # type: ignore
+        permissions=[
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_IMPORT_ERROR),
+        ],  # type: ignore
     )
     create_user(app, username="test_no_permissions", role_name="TestNoPermissions")  # type: ignore
+    create_user(
+        app,  # type:ignore
+        username="test_single_dag",
+        role_name="TestSingleDAG",
+        permissions=[(permissions.ACTION_CAN_READ, permissions.RESOURCE_IMPORT_ERROR)],  # type: ignore
+    )
+    # For some reason, DAG level permissions are not synced when in the above list of perms,
+    # so do it manually here:
+    app.appbuilder.sm.bulk_sync_roles(
+        [
+            {
+                "role": "TestSingleDAG",
+                "perms": [(permissions.ACTION_CAN_READ, permissions.resource_name_for_dag(TEST_DAG_IDS[0]))],
+            }
+        ]
+    )

-    yield minimal_app_for_api
+    yield app

     delete_user(app, username="test")  # type: ignore
     delete_user(app, username="test_no_permissions")  # type: ignore
+    delete_user(app, username="test_single_dag")  # type: ignore


 class TestBaseImportError:
@@ -58,9 +81,11 @@ def setup_attrs(self, configured_app) -> None:
         self.client = self.app.test_client()  # type:ignore

         clear_db_import_errors()
+        clear_db_dags()

     def teardown_method(self) -> None:
         clear_db_import_errors()
+        clear_db_dags()

     @staticmethod
     def _normalize_import_errors(import_errors):
@@ -121,6 +146,72 @@ def test_should_raise_403_forbidden(self):
         )
         assert response.status_code == 403

+    def test_should_raise_403_forbidden_without_dag_read(self, session):
+        import_error = ImportError(
+            filename="Lorem_ipsum.py",
+            stacktrace="Lorem ipsum",
+            timestamp=timezone.parse(self.timestamp, timezone="UTC"),
+        )
+        session.add(import_error)
+        session.commit()
+
+        response = self.client.get(
+            f"/api/v1/importErrors/{import_error.id}", environ_overrides={"REMOTE_USER": "test_single_dag"}
+        )
+
+        assert response.status_code == 403
+
+    def test_should_return_200_with_single_dag_read(self, session):
+        dag_model = DagModel(dag_id=TEST_DAG_IDS[0], fileloc="Lorem_ipsum.py")
+        session.add(dag_model)
+        import_error = ImportError(
+            filename="Lorem_ipsum.py",
+            stacktrace="Lorem ipsum",
+            timestamp=timezone.parse(self.timestamp, timezone="UTC"),
+        )
+        session.add(import_error)
+        session.commit()
+
+        response = self.client.get(
+            f"/api/v1/importErrors/{import_error.id}", environ_overrides={"REMOTE_USER": "test_single_dag"}
+        )
+
+        assert response.status_code == 200
+        response_data = response.json
+        response_data["import_error_id"] = 1
+        assert {
+            "filename": "Lorem_ipsum.py",
+            "import_error_id": 1,
+            "stack_trace": "Lorem ipsum",
+            "timestamp": "2020-06-10T12:00:00+00:00",
+        } == response_data
+
+    def test_should_return_200_redacted_with_single_dag_read_in_dagfile(self, session):
+        for dag_id in TEST_DAG_IDS:
+            dag_model = DagModel(dag_id=dag_id, fileloc="Lorem_ipsum.py")
+            session.add(dag_model)
+        import_error = ImportError(
+            filename="Lorem_ipsum.py",
+            stacktrace="Lorem ipsum",
+            timestamp=timezone.parse(self.timestamp, timezone="UTC"),
+        )
+        session.add(import_error)
+        session.commit()
+
+        response = self.client.get(
+            f"/api/v1/importErrors/{import_error.id}", environ_overrides={"REMOTE_USER": "test_single_dag"}
+        )
+
+        assert response.status_code == 200
+        response_data = response.json
+        response_data["import_error_id"] = 1
+        assert {
+            "filename": "Lorem_ipsum.py",
+            "import_error_id": 1,
+            "stack_trace": "REDACTED - you do not have read permission on all DAGs in the file",
+            "timestamp": "2020-06-10T12:00:00+00:00",
+        } == response_data
+

 class TestGetImportErrorsEndpoint(TestBaseImportError):
     def test_get_import_errors(self, session):
@@ -231,6 +322,71 @@ def test_should_raises_401_unauthenticated(self, session):

         assert_401(response)

+    def test_get_import_errors_single_dag(self, session):
+        for dag_id in TEST_DAG_IDS:
+            fake_filename = f"/tmp/{dag_id}.py"
+            dag_model = DagModel(dag_id=dag_id, fileloc=fake_filename)
+            session.add(dag_model)
+            importerror = ImportError(
+                filename=fake_filename,
+                stacktrace="Lorem ipsum",
+                timestamp=timezone.parse(self.timestamp, timezone="UTC"),
+            )
+            session.add(importerror)
+        session.commit()
+
+        response = self.client.get(
+            "/api/v1/importErrors", environ_overrides={"REMOTE_USER": "test_single_dag"}
+        )
+
+        assert response.status_code == 200
+        response_data = response.json
+        self._normalize_import_errors(response_data["import_errors"])
+        assert {
+            "import_errors": [
+                {
+                    "filename": "/tmp/test_dag.py",
+                    "import_error_id": 1,
+                    "stack_trace": "Lorem ipsum",
+                    "timestamp": "2020-06-10T12:00:00+00:00",
+                },
+            ],
+            "total_entries": 1,
+        } == response_data
+
+    def test_get_import_errors_single_dag_in_dagfile(self, session):
+        for dag_id in TEST_DAG_IDS:
+            fake_filename = "/tmp/all_in_one.py"
+            dag_model = DagModel(dag_id=dag_id, fileloc=fake_filename)
+            session.add(dag_model)
+
+        importerror = ImportError(
+            filename="/tmp/all_in_one.py",
+            stacktrace="Lorem ipsum",
+            timestamp=timezone.parse(self.timestamp, timezone="UTC"),
+        )
+        session.add(importerror)
+        session.commit()
+
+        response = self.client.get(
+            "/api/v1/importErrors", environ_overrides={"REMOTE_USER": "test_single_dag"}
+        )
+
+        assert response.status_code == 200
+        response_data = response.json
+        self._normalize_import_errors(response_data["import_errors"])
+        assert {
+            "import_errors": [
+                {
+                    "filename": "/tmp/all_in_one.py",
+                    "import_error_id": 1,
+                    "stack_trace": "REDACTED - you do not have read permission on all DAGs in the file",
+                    "timestamp": "2020-06-10T12:00:00+00:00",
+                },
+            ],
+            "total_entries": 1,
+        } == response_data
+

 class TestGetImportErrorsEndpointPagination(TestBaseImportError):
     @pytest.mark.parametrize(
tests/www/views/test_views_home.py · +61 −0 · modified
@@ -111,6 +111,30 @@ def test_home_status_filter_cookie(admin_client):
         assert "all" == flask.session[FILTER_STATUS_COOKIE]


+@pytest.fixture(scope="module")
+def user_no_importerror(app):
+    """Create User that cannot access Import Errors"""
+    return create_user(
+        app,
+        username="user_no_importerrors",
+        role_name="role_no_importerrors",
+        permissions=[
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE),
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+        ],
+    )
+
+
+@pytest.fixture()
+def client_no_importerror(app, user_no_importerror):
+    """Client for User that cannot access Import Errors"""
+    return client_with_login(
+        app,
+        username="user_no_importerrors",
+        password="user_no_importerrors",
+    )
+
+
 @pytest.fixture(scope="module")
 def user_single_dag(app):
     """Create User that can only access the first DAG from TEST_FILTER_DAG_IDS"""
@@ -120,6 +144,7 @@ def user_single_dag(app):
         role_name="role_single_dag",
         permissions=[
             (permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE),
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_IMPORT_ERROR),
             (permissions.ACTION_CAN_READ, permissions.resource_name_for_dag(TEST_FILTER_DAG_IDS[0])),
         ],
     )
@@ -232,6 +257,24 @@ def broken_dags_with_read_perm(tmp_path, working_dags_with_read_perm):
             _process_file(path, session)


+@pytest.fixture()
+def broken_dags_after_working(tmp_path):
+    # First create and process a DAG file that works
+    path = tmp_path / "all_in_one.py"
+    with create_session() as session:
+        contents = "from airflow import DAG\n"
+        for i, dag_id in enumerate(TEST_FILTER_DAG_IDS):
+            contents += f"dag{i} = DAG('{dag_id}')\n"
+        path.write_text(contents)
+        _process_file(path, session)
+
+    # Then break it!
+    with create_session() as session:
+        contents += "foobar()"
+        path.write_text(contents)
+        _process_file(path, session)
+
+
 def test_home_filter_tags(working_dags, admin_client):
     with admin_client:
         admin_client.get("home?tags=example&tags=data", follow_redirects=True)
@@ -249,6 +292,12 @@ def test_home_importerrors(broken_dags, user_client):
         check_content_in_response(f"/{dag_id}.py", resp)


+def test_home_no_importerrors_perm(broken_dags, client_no_importerror):
+    # Users without "can read on import errors" don't see any import errors
+    resp = client_no_importerror.get("home", follow_redirects=True)
+    check_content_not_in_response("Import Errors", resp)
+
+
 @pytest.mark.parametrize(
     "page",
     [
@@ -266,11 +315,23 @@ def test_home_importerrors_filtered_singledag_user(broken_dags_with_read_perm, c
     check_content_in_response("Import Errors", resp)
     # They can see the first DAGs import error
     check_content_in_response(f"/{TEST_FILTER_DAG_IDS[0]}.py", resp)
+    check_content_in_response("Traceback", resp)
     # But not the rest
     for dag_id in TEST_FILTER_DAG_IDS[1:]:
         check_content_not_in_response(f"/{dag_id}.py", resp)


+def test_home_importerrors_missing_read_on_all_dags_in_file(broken_dags_after_working, client_single_dag):
+    # If a user doesn't have READ on all DAGs in a file, that files traceback is redacted
+    resp = client_single_dag.get("home", follow_redirects=True)
+    check_content_in_response("Import Errors", resp)
+    # They can see the DAG file has an import error
+    check_content_in_response("all_in_one.py", resp)
+    # And the traceback is redacted
+    check_content_not_in_response("Traceback", resp)
+    check_content_in_response("REDACTED", resp)
+
+
 def test_home_dag_list(working_dags, user_client):
     # Users with "can read on DAGs" gets all DAGs
     resp = user_client.get("home", follow_redirects=True)
d944eb0de216Check permissions for ImportError (#37468)
4 files changed · +314 −21
airflow/api_connexion/endpoints/import_error_endpoint.py+56 −5 modified@@ -16,39 +16,59 @@ # under the License. from __future__ import annotations -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Sequence from sqlalchemy import func, select from airflow.api_connexion import security -from airflow.api_connexion.exceptions import NotFound +from airflow.api_connexion.exceptions import NotFound, PermissionDenied from airflow.api_connexion.parameters import apply_sorting, check_limit, format_parameters from airflow.api_connexion.schemas.error_schema import ( ImportErrorCollection, import_error_collection_schema, import_error_schema, ) -from airflow.auth.managers.models.resource_details import AccessView +from airflow.auth.managers.models.resource_details import AccessView, DagDetails +from airflow.models.dag import DagModel from airflow.models.errors import ImportError as ImportErrorModel from airflow.utils.session import NEW_SESSION, provide_session +from airflow.www.extensions.init_auth_manager import get_auth_manager if TYPE_CHECKING: from sqlalchemy.orm import Session from airflow.api_connexion.types import APIResponse + from airflow.auth.managers.models.batch_apis import IsAuthorizedDagRequest @security.requires_access_view(AccessView.IMPORT_ERRORS) @provide_session def get_import_error(*, import_error_id: int, session: Session = NEW_SESSION) -> APIResponse: """Get an import error.""" error = session.get(ImportErrorModel, import_error_id) - if error is None: raise NotFound( "Import error not found", detail=f"The ImportError with import_error_id: `{import_error_id}` was not found", ) + session.expunge(error) + + can_read_all_dags = get_auth_manager().is_authorized_dag(method="GET") + if not can_read_all_dags: + readable_dag_ids = security.get_readable_dags() + file_dag_ids = { + dag_id[0] + for dag_id in session.query(DagModel.dag_id).filter(DagModel.fileloc == error.filename).all() + } + + # Can the user read any DAGs in the file? 
+ if not readable_dag_ids.intersection(file_dag_ids): + raise PermissionDenied(detail="You do not have read permission on any of the DAGs in the file") + + # Check if user has read access to all the DAGs defined in the file + if not file_dag_ids.issubset(readable_dag_ids): + error.stacktrace = "REDACTED - you do not have read permission on all DAGs in the file" + return import_error_schema.dump(error) @@ -65,10 +85,41 @@ def get_import_errors( """Get all import errors.""" to_replace = {"import_error_id": "id"} allowed_filter_attrs = ["import_error_id", "timestamp", "filename"] - total_entries = session.scalars(func.count(ImportErrorModel.id)).one() + count_query = select(func.count(ImportErrorModel.id)) query = select(ImportErrorModel) query = apply_sorting(query, order_by, to_replace, allowed_filter_attrs) + + can_read_all_dags = get_auth_manager().is_authorized_dag(method="GET") + + if not can_read_all_dags: + # if the user doesn't have access to all DAGs, only display errors from visible DAGs + readable_dag_ids = security.get_readable_dags() + dagfiles_subq = ( + select(DagModel.fileloc).distinct().where(DagModel.dag_id.in_(readable_dag_ids)).subquery() + ) + query = query.where(ImportErrorModel.filename.in_(dagfiles_subq)) + count_query = count_query.where(ImportErrorModel.filename.in_(dagfiles_subq)) + + total_entries = session.scalars(count_query).one() import_errors = session.scalars(query.offset(offset).limit(limit)).all() + + if not can_read_all_dags: + for import_error in import_errors: + # Check if user has read access to all the DAGs defined in the file + file_dag_ids = ( + session.query(DagModel.dag_id).filter(DagModel.fileloc == import_error.filename).all() + ) + requests: Sequence[IsAuthorizedDagRequest] = [ + { + "method": "GET", + "details": DagDetails(id=dag_id[0]), + } + for dag_id in file_dag_ids + ] + if not get_auth_manager().batch_is_authorized_dag(requests): + session.expunge(import_error) + import_error.stacktrace = "REDACTED - you do not 
have read permission on all DAGs in the file" + return import_error_collection_schema.dump( ImportErrorCollection(import_errors=import_errors, total_entries=total_entries) )
airflow/www/views.py+38 −13 modified@@ -147,6 +147,7 @@ if TYPE_CHECKING: from sqlalchemy.orm import Session + from airflow.auth.managers.models.batch_apis import IsAuthorizedDagRequest from airflow.models.dag import DAG from airflow.models.operator import Operator @@ -935,20 +936,44 @@ def index(self): owner_links_dict = DagOwnerAttributes.get_all(session) - import_errors = select(errors.ImportError).order_by(errors.ImportError.id) - - if not get_auth_manager().is_authorized_dag(method="GET"): - # if the user doesn't have access to all DAGs, only display errors from visible DAGs - import_errors = import_errors.join( - DagModel, DagModel.fileloc == errors.ImportError.filename - ).where(DagModel.dag_id.in_(filter_dag_ids)) + if get_auth_manager().is_authorized_view(access_view=AccessView.IMPORT_ERRORS): + import_errors = select(errors.ImportError).order_by(errors.ImportError.id) + + can_read_all_dags = get_auth_manager().is_authorized_dag(method="GET") + if not can_read_all_dags: + # if the user doesn't have access to all DAGs, only display errors from visible DAGs + import_errors = import_errors.where( + errors.ImportError.filename.in_( + select(DagModel.fileloc) + .distinct() + .where(DagModel.dag_id.in_(filter_dag_ids)) + .subquery() + ) + ) - import_errors = session.scalars(import_errors) - for import_error in import_errors: - flash( - f"Broken DAG: [{import_error.filename}] {import_error.stacktrace}", - "dag_import_error", - ) + import_errors = session.scalars(import_errors) + for import_error in import_errors: + stacktrace = import_error.stacktrace + if not can_read_all_dags: + # Check if user has read access to all the DAGs defined in the file + file_dag_ids = ( + session.query(DagModel.dag_id) + .filter(DagModel.fileloc == import_error.filename) + .all() + ) + requests: Sequence[IsAuthorizedDagRequest] = [ + { + "method": "GET", + "details": DagDetails(id=dag_id[0]), + } + for dag_id in file_dag_ids + ] + if not 
get_auth_manager().batch_is_authorized_dag(requests): + stacktrace = "REDACTED - you do not have read permission on all DAGs in the file" + flash( + f"Broken DAG: [{import_error.filename}]\r{stacktrace}", + "dag_import_error", + ) from airflow.plugins_manager import import_errors as plugin_import_errors
tests/api_connexion/endpoints/test_import_error_endpoint.py+159 −3 modified@@ -21,16 +21,19 @@ import pytest from airflow.api_connexion.exceptions import EXCEPTIONS_LINK_MAP +from airflow.models.dag import DagModel from airflow.models.errors import ImportError from airflow.security import permissions from airflow.utils import timezone from airflow.utils.session import provide_session from tests.test_utils.api_connexion_utils import assert_401, create_user, delete_user from tests.test_utils.config import conf_vars -from tests.test_utils.db import clear_db_import_errors +from tests.test_utils.db import clear_db_dags, clear_db_import_errors pytestmark = pytest.mark.db_test +TEST_DAG_IDS = ["test_dag", "test_dag2"] + @pytest.fixture(scope="module") def configured_app(minimal_app_for_api): @@ -39,14 +42,34 @@ def configured_app(minimal_app_for_api): app, # type:ignore username="test", role_name="Test", - permissions=[(permissions.ACTION_CAN_READ, permissions.RESOURCE_IMPORT_ERROR)], # type: ignore + permissions=[ + (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG), + (permissions.ACTION_CAN_READ, permissions.RESOURCE_IMPORT_ERROR), + ], # type: ignore ) create_user(app, username="test_no_permissions", role_name="TestNoPermissions") # type: ignore + create_user( + app, # type:ignore + username="test_single_dag", + role_name="TestSingleDAG", + permissions=[(permissions.ACTION_CAN_READ, permissions.RESOURCE_IMPORT_ERROR)], # type: ignore + ) + # For some reason, DAG level permissions are not synced when in the above list of perms, + # so do it manually here: + app.appbuilder.sm.bulk_sync_roles( + [ + { + "role": "TestSingleDAG", + "perms": [(permissions.ACTION_CAN_READ, permissions.resource_name_for_dag(TEST_DAG_IDS[0]))], + } + ] + ) - yield minimal_app_for_api + yield app delete_user(app, username="test") # type: ignore delete_user(app, username="test_no_permissions") # type: ignore + delete_user(app, username="test_single_dag") # type: ignore class 
TestBaseImportError: @@ -58,9 +81,11 @@ def setup_attrs(self, configured_app) -> None: self.client = self.app.test_client() # type:ignore clear_db_import_errors() + clear_db_dags() def teardown_method(self) -> None: clear_db_import_errors() + clear_db_dags() @staticmethod def _normalize_import_errors(import_errors): @@ -121,6 +146,72 @@ def test_should_raise_403_forbidden(self): ) assert response.status_code == 403 + def test_should_raise_403_forbidden_without_dag_read(self, session): + import_error = ImportError( + filename="Lorem_ipsum.py", + stacktrace="Lorem ipsum", + timestamp=timezone.parse(self.timestamp, timezone="UTC"), + ) + session.add(import_error) + session.commit() + + response = self.client.get( + f"/api/v1/importErrors/{import_error.id}", environ_overrides={"REMOTE_USER": "test_single_dag"} + ) + + assert response.status_code == 403 + + def test_should_return_200_with_single_dag_read(self, session): + dag_model = DagModel(dag_id=TEST_DAG_IDS[0], fileloc="Lorem_ipsum.py") + session.add(dag_model) + import_error = ImportError( + filename="Lorem_ipsum.py", + stacktrace="Lorem ipsum", + timestamp=timezone.parse(self.timestamp, timezone="UTC"), + ) + session.add(import_error) + session.commit() + + response = self.client.get( + f"/api/v1/importErrors/{import_error.id}", environ_overrides={"REMOTE_USER": "test_single_dag"} + ) + + assert response.status_code == 200 + response_data = response.json + response_data["import_error_id"] = 1 + assert { + "filename": "Lorem_ipsum.py", + "import_error_id": 1, + "stack_trace": "Lorem ipsum", + "timestamp": "2020-06-10T12:00:00+00:00", + } == response_data + + def test_should_return_200_redacted_with_single_dag_read_in_dagfile(self, session): + for dag_id in TEST_DAG_IDS: + dag_model = DagModel(dag_id=dag_id, fileloc="Lorem_ipsum.py") + session.add(dag_model) + import_error = ImportError( + filename="Lorem_ipsum.py", + stacktrace="Lorem ipsum", + timestamp=timezone.parse(self.timestamp, timezone="UTC"), + ) + 
session.add(import_error) + session.commit() + + response = self.client.get( + f"/api/v1/importErrors/{import_error.id}", environ_overrides={"REMOTE_USER": "test_single_dag"} + ) + + assert response.status_code == 200 + response_data = response.json + response_data["import_error_id"] = 1 + assert { + "filename": "Lorem_ipsum.py", + "import_error_id": 1, + "stack_trace": "REDACTED - you do not have read permission on all DAGs in the file", + "timestamp": "2020-06-10T12:00:00+00:00", + } == response_data + class TestGetImportErrorsEndpoint(TestBaseImportError): def test_get_import_errors(self, session): @@ -231,6 +322,71 @@ def test_should_raises_401_unauthenticated(self, session): assert_401(response) + def test_get_import_errors_single_dag(self, session): + for dag_id in TEST_DAG_IDS: + fake_filename = f"/tmp/{dag_id}.py" + dag_model = DagModel(dag_id=dag_id, fileloc=fake_filename) + session.add(dag_model) + importerror = ImportError( + filename=fake_filename, + stacktrace="Lorem ipsum", + timestamp=timezone.parse(self.timestamp, timezone="UTC"), + ) + session.add(importerror) + session.commit() + + response = self.client.get( + "/api/v1/importErrors", environ_overrides={"REMOTE_USER": "test_single_dag"} + ) + + assert response.status_code == 200 + response_data = response.json + self._normalize_import_errors(response_data["import_errors"]) + assert { + "import_errors": [ + { + "filename": "/tmp/test_dag.py", + "import_error_id": 1, + "stack_trace": "Lorem ipsum", + "timestamp": "2020-06-10T12:00:00+00:00", + }, + ], + "total_entries": 1, + } == response_data + + def test_get_import_errors_single_dag_in_dagfile(self, session): + for dag_id in TEST_DAG_IDS: + fake_filename = "/tmp/all_in_one.py" + dag_model = DagModel(dag_id=dag_id, fileloc=fake_filename) + session.add(dag_model) + + importerror = ImportError( + filename="/tmp/all_in_one.py", + stacktrace="Lorem ipsum", + timestamp=timezone.parse(self.timestamp, timezone="UTC"), + ) + session.add(importerror) + 
session.commit() + + response = self.client.get( + "/api/v1/importErrors", environ_overrides={"REMOTE_USER": "test_single_dag"} + ) + + assert response.status_code == 200 + response_data = response.json + self._normalize_import_errors(response_data["import_errors"]) + assert { + "import_errors": [ + { + "filename": "/tmp/all_in_one.py", + "import_error_id": 1, + "stack_trace": "REDACTED - you do not have read permission on all DAGs in the file", + "timestamp": "2020-06-10T12:00:00+00:00", + }, + ], + "total_entries": 1, + } == response_data + class TestGetImportErrorsEndpointPagination(TestBaseImportError): @pytest.mark.parametrize(
tests/www/views/test_views_home.py +61 −0 modified
@@ -111,6 +111,30 @@ def test_home_status_filter_cookie(admin_client):
         assert "all" == flask.session[FILTER_STATUS_COOKIE]


+@pytest.fixture(scope="module")
+def user_no_importerror(app):
+    """Create User that cannot access Import Errors"""
+    return create_user(
+        app,
+        username="user_no_importerrors",
+        role_name="role_no_importerrors",
+        permissions=[
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE),
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+        ],
+    )
+
+
+@pytest.fixture()
+def client_no_importerror(app, user_no_importerror):
+    """Client for User that cannot access Import Errors"""
+    return client_with_login(
+        app,
+        username="user_no_importerrors",
+        password="user_no_importerrors",
+    )
+
+
 @pytest.fixture(scope="module")
 def user_single_dag(app):
     """Create User that can only access the first DAG from TEST_FILTER_DAG_IDS"""
@@ -120,6 +144,7 @@ def user_single_dag(app):
         role_name="role_single_dag",
         permissions=[
             (permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE),
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_IMPORT_ERROR),
             (permissions.ACTION_CAN_READ, permissions.resource_name_for_dag(TEST_FILTER_DAG_IDS[0])),
         ],
     )
@@ -232,6 +257,24 @@ def broken_dags_with_read_perm(tmp_path, working_dags_with_read_perm):
             _process_file(path, session)


+@pytest.fixture()
+def broken_dags_after_working(tmp_path):
+    # First create and process a DAG file that works
+    path = tmp_path / "all_in_one.py"
+    with create_session() as session:
+        contents = "from airflow import DAG\n"
+        for i, dag_id in enumerate(TEST_FILTER_DAG_IDS):
+            contents += f"dag{i} = DAG('{dag_id}')\n"
+        path.write_text(contents)
+        _process_file(path, session)
+
+    # Then break it!
+    with create_session() as session:
+        contents += "foobar()"
+        path.write_text(contents)
+        _process_file(path, session)
+
+
 def test_home_filter_tags(working_dags, admin_client):
     with admin_client:
         admin_client.get("home?tags=example&tags=data", follow_redirects=True)
@@ -249,6 +292,12 @@ def test_home_importerrors(broken_dags, user_client):
         check_content_in_response(f"/{dag_id}.py", resp)


+def test_home_no_importerrors_perm(broken_dags, client_no_importerror):
+    # Users without "can read on import errors" don't see any import errors
+    resp = client_no_importerror.get("home", follow_redirects=True)
+    check_content_not_in_response("Import Errors", resp)
+
+
 @pytest.mark.parametrize(
     "page",
     [
@@ -266,11 +315,23 @@ def test_home_importerrors_filtered_singledag_user(broken_dags_with_read_perm, c
     check_content_in_response("Import Errors", resp)
     # They can see the first DAGs import error
     check_content_in_response(f"/{TEST_FILTER_DAG_IDS[0]}.py", resp)
+    check_content_in_response("Traceback", resp)
     # But not the rest
     for dag_id in TEST_FILTER_DAG_IDS[1:]:
         check_content_not_in_response(f"/{dag_id}.py", resp)


+def test_home_importerrors_missing_read_on_all_dags_in_file(broken_dags_after_working, client_single_dag):
+    # If a user doesn't have READ on all DAGs in a file, that files traceback is redacted
+    resp = client_single_dag.get("home", follow_redirects=True)
+    check_content_in_response("Import Errors", resp)
+    # They can see the DAG file has an import error
+    check_content_in_response("all_in_one.py", resp)
+    # And the traceback is redacted
+    check_content_not_in_response("Traceback", resp)
+    check_content_in_response("REDACTED", resp)
+
+
 def test_home_dag_list(working_dags, user_client):
     # Users with "can read on DAGs" gets all DAGs
     resp = user_client.get("home", follow_redirects=True)
2 files changed · +12 −8
airflow/api_connexion/endpoints/import_error_endpoint.py +4 −5 modified
@@ -94,12 +94,11 @@ def get_import_errors(
     if not can_read_all_dags:
         # if the user doesn't have access to all DAGs, only display errors from visible DAGs
         readable_dag_ids = security.get_readable_dags()
-        query = query.join(DagModel, DagModel.fileloc == ImportErrorModel.filename).where(
-            DagModel.dag_id.in_(readable_dag_ids)
-        )
-        count_query = count_query.join(DagModel, DagModel.fileloc == ImportErrorModel.filename).where(
-            DagModel.dag_id.in_(readable_dag_ids)
+        dagfiles_subq = (
+            select(DagModel.fileloc).distinct().where(DagModel.dag_id.in_(readable_dag_ids)).subquery()
         )
+        query = query.where(ImportErrorModel.filename.in_(dagfiles_subq))
+        count_query = count_query.where(ImportErrorModel.filename.in_(dagfiles_subq))

     total_entries = session.scalars(count_query).one()
     import_errors = session.scalars(query.offset(offset).limit(limit)).all()
airflow/www/views.py +8 −3 modified
@@ -942,9 +942,14 @@ def index(self):
             can_read_all_dags = get_auth_manager().is_authorized_dag(method="GET")
             if not can_read_all_dags:
                 # if the user doesn't have access to all DAGs, only display errors from visible DAGs
-                import_errors = import_errors.join(
-                    DagModel, DagModel.fileloc == errors.ImportError.filename
-                ).where(DagModel.dag_id.in_(filter_dag_ids))
+                import_errors = import_errors.where(
+                    errors.ImportError.filename.in_(
+                        select(DagModel.fileloc)
+                        .distinct()
+                        .where(DagModel.dag_id.in_(filter_dag_ids))
+                        .subquery()
+                    )
+                )

             import_errors = session.scalars(import_errors)
             for import_error in import_errors:
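One plausible reading of the switch from a JOIN to an IN-subquery in the two hunks above: when a single DAG file defines several DAGs the user can read, joining import errors to DAGs on the file path yields one row per readable DAG, so the same import error may be listed and counted more than once. The commit text shown here does not state that motivation, so treat it as an assumption. The sketch below reproduces the effect with the standard-library sqlite3 module; the table and column names are simplified stand-ins, not Airflow's real schema.

```python
import sqlite3

# Hypothetical, simplified schema: one DAG file that defines two DAGs
# and has a single recorded import error.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dag (dag_id TEXT PRIMARY KEY, fileloc TEXT);
    CREATE TABLE import_error (id INTEGER PRIMARY KEY, filename TEXT);
    INSERT INTO dag VALUES ('dag_a', '/dags/all_in_one.py'),
                           ('dag_b', '/dags/all_in_one.py');
    INSERT INTO import_error VALUES (1, '/dags/all_in_one.py');
    """
)
readable = ("dag_a", "dag_b")  # the user may read both DAGs in the file

# JOIN form: one output row per (import error, readable DAG) pair.
join_rows = conn.execute(
    "SELECT e.id FROM import_error e "
    "JOIN dag d ON d.fileloc = e.filename "
    "WHERE d.dag_id IN (?, ?)",
    readable,
).fetchall()

# IN-subquery form: each import error appears at most once.
subq_rows = conn.execute(
    "SELECT e.id FROM import_error e "
    "WHERE e.filename IN ("
    "  SELECT DISTINCT fileloc FROM dag WHERE dag_id IN (?, ?)"
    ")",
    readable,
).fetchall()

print(len(join_rows), len(subq_rows))
```

With the JOIN, a count query over the same rows would also be inflated, which would throw off a total_entries value used for pagination.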
0a95299691e2 Check permissions for ImportError
4 files changed · +311 −22
airflow/api_connexion/endpoints/import_error_endpoint.py +57 −5 modified
@@ -16,39 +16,59 @@
 # under the License.
 from __future__ import annotations

-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Sequence

 from sqlalchemy import func, select

 from airflow.api_connexion import security
-from airflow.api_connexion.exceptions import NotFound
+from airflow.api_connexion.exceptions import NotFound, PermissionDenied
 from airflow.api_connexion.parameters import apply_sorting, check_limit, format_parameters
 from airflow.api_connexion.schemas.error_schema import (
     ImportErrorCollection,
     import_error_collection_schema,
     import_error_schema,
 )
-from airflow.auth.managers.models.resource_details import AccessView
+from airflow.auth.managers.models.resource_details import AccessView, DagDetails
+from airflow.models.dag import DagModel
 from airflow.models.errors import ImportError as ImportErrorModel
 from airflow.utils.session import NEW_SESSION, provide_session
+from airflow.www.extensions.init_auth_manager import get_auth_manager

 if TYPE_CHECKING:
     from sqlalchemy.orm import Session

     from airflow.api_connexion.types import APIResponse
+    from airflow.auth.managers.models.batch_apis import IsAuthorizedDagRequest


 @security.requires_access_view(AccessView.IMPORT_ERRORS)
 @provide_session
 def get_import_error(*, import_error_id: int, session: Session = NEW_SESSION) -> APIResponse:
     """Get an import error."""
     error = session.get(ImportErrorModel, import_error_id)
-
     if error is None:
         raise NotFound(
             "Import error not found",
             detail=f"The ImportError with import_error_id: `{import_error_id}` was not found",
         )
+    session.expunge(error)
+
+    can_read_all_dags = get_auth_manager().is_authorized_dag(method="GET")
+    if not can_read_all_dags:
+        readable_dag_ids = security.get_readable_dags()
+        file_dag_ids = {
+            dag_id[0]
+            for dag_id in session.query(DagModel.dag_id).filter(DagModel.fileloc == error.filename).all()
+        }
+
+        # Can the user read any DAGs in the file?
+        if not readable_dag_ids.intersection(file_dag_ids):
+            raise PermissionDenied(detail="You do not have read permission on any of the DAGs in the file")
+
+        # Check if user has read access to all the DAGs defined in the file
+        if not file_dag_ids.issubset(readable_dag_ids):
+            error.stacktrace = "REDACTED - you do not have read permission on all DAGs in the file"
+
     return import_error_schema.dump(error)


@@ -65,10 +85,42 @@ def get_import_errors(
     """Get all import errors."""
     to_replace = {"import_error_id": "id"}
     allowed_filter_attrs = ["import_error_id", "timestamp", "filename"]
-    total_entries = session.scalars(func.count(ImportErrorModel.id)).one()
+    count_query = select(func.count(ImportErrorModel.id))
     query = select(ImportErrorModel)
     query = apply_sorting(query, order_by, to_replace, allowed_filter_attrs)
+
+    can_read_all_dags = get_auth_manager().is_authorized_dag(method="GET")
+
+    if not can_read_all_dags:
+        # if the user doesn't have access to all DAGs, only display errors from visible DAGs
+        readable_dag_ids = security.get_readable_dags()
+        query = query.join(DagModel, DagModel.fileloc == ImportErrorModel.filename).where(
+            DagModel.dag_id.in_(readable_dag_ids)
+        )
+        count_query = count_query.join(DagModel, DagModel.fileloc == ImportErrorModel.filename).where(
+            DagModel.dag_id.in_(readable_dag_ids)
+        )
+
+    total_entries = session.scalars(count_query).one()
     import_errors = session.scalars(query.offset(offset).limit(limit)).all()
+
+    if not can_read_all_dags:
+        for import_error in import_errors:
+            # Check if user has read access to all the DAGs defined in the file
+            file_dag_ids = (
+                session.query(DagModel.dag_id).filter(DagModel.fileloc == import_error.filename).all()
+            )
+            requests: Sequence[IsAuthorizedDagRequest] = [
+                {
+                    "method": "GET",
+                    "details": DagDetails(id=dag_id[0]),
+                }
+                for dag_id in file_dag_ids
+            ]
+            if not get_auth_manager().batch_is_authorized_dag(requests):
+                session.expunge(import_error)
+                import_error.stacktrace = "REDACTED - you do not have read permission on all DAGs in the file"
+
     return import_error_collection_schema.dump(
         ImportErrorCollection(import_errors=import_errors, total_entries=total_entries)
     )
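The per-file decision that get_import_error applies to users without read access to all DAGs can be restated as a small pure function. This is an illustrative sketch only: visible_stacktrace and the local PermissionDenied class are hypothetical stand-ins, and the sets of DAG IDs are passed in directly rather than queried from the database or the auth manager.

```python
from __future__ import annotations

REDACTED = "REDACTED - you do not have read permission on all DAGs in the file"


class PermissionDenied(Exception):
    """Local stand-in for airflow.api_connexion.exceptions.PermissionDenied."""


def visible_stacktrace(stacktrace: str, file_dag_ids: set[str], readable_dag_ids: set[str]) -> str:
    """Return what a user without read-all-DAGs access may see for one import error."""
    # The user can read none of the DAGs defined in the file: deny outright.
    if not readable_dag_ids.intersection(file_dag_ids):
        raise PermissionDenied("You do not have read permission on any of the DAGs in the file")
    # The user can read some, but not all, DAGs in the file: hide the traceback.
    if not file_dag_ids.issubset(readable_dag_ids):
        return REDACTED
    # The user can read every DAG in the file: show the traceback unchanged.
    return stacktrace


full = visible_stacktrace("Lorem ipsum", {"test_dag"}, {"test_dag"})
partial = visible_stacktrace("Lorem ipsum", {"test_dag", "test_dag2"}, {"test_dag"})
try:
    visible_stacktrace("Lorem ipsum", set(), {"test_dag"})
    denied = False
except PermissionDenied:
    denied = True
```

The three calls mirror the three API tests in this commit: full traceback for a file whose only DAG is readable, a redacted traceback when the file also defines an unreadable DAG, and a denial when no DAG in the file is readable (including a file with no DAG rows at all).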
airflow/www/views.py +34 −14 modified
@@ -147,6 +147,7 @@
 if TYPE_CHECKING:
     from sqlalchemy.orm import Session

+    from airflow.auth.managers.models.batch_apis import IsAuthorizedDagRequest
     from airflow.models.dag import DAG
     from airflow.models.operator import Operator

@@ -935,20 +936,39 @@ def index(self):

         owner_links_dict = DagOwnerAttributes.get_all(session)

-        import_errors = select(errors.ImportError).order_by(errors.ImportError.id)
-
-        if not get_auth_manager().is_authorized_dag(method="GET"):
-            # if the user doesn't have access to all DAGs, only display errors from visible DAGs
-            import_errors = import_errors.join(
-                DagModel, DagModel.fileloc == errors.ImportError.filename
-            ).where(DagModel.dag_id.in_(filter_dag_ids))
-
-        import_errors = session.scalars(import_errors)
-        for import_error in import_errors:
-            flash(
-                f"Broken DAG: [{import_error.filename}] {import_error.stacktrace}",
-                "dag_import_error",
-            )
+        if get_auth_manager().is_authorized_view(access_view=AccessView.IMPORT_ERRORS):
+            import_errors = select(errors.ImportError).order_by(errors.ImportError.id)
+
+            can_read_all_dags = get_auth_manager().is_authorized_dag(method="GET")
+            if not can_read_all_dags:
+                # if the user doesn't have access to all DAGs, only display errors from visible DAGs
+                import_errors = import_errors.join(
+                    DagModel, DagModel.fileloc == errors.ImportError.filename
+                ).where(DagModel.dag_id.in_(filter_dag_ids))
+
+            import_errors = session.scalars(import_errors)
+            for import_error in import_errors:
+                stacktrace = import_error.stacktrace
+                if not can_read_all_dags:
+                    # Check if user has read access to all the DAGs defined in the file
+                    file_dag_ids = (
+                        session.query(DagModel.dag_id)
+                        .filter(DagModel.fileloc == import_error.filename)
+                        .all()
+                    )
+                    requests: Sequence[IsAuthorizedDagRequest] = [
+                        {
+                            "method": "GET",
+                            "details": DagDetails(id=dag_id[0]),
+                        }
+                        for dag_id in file_dag_ids
+                    ]
+                    if not get_auth_manager().batch_is_authorized_dag(requests):
+                        stacktrace = "REDACTED - you do not have read permission on all DAGs in the file"
+                flash(
+                    f"Broken DAG: [{import_error.filename}]\r{stacktrace}",
+                    "dag_import_error",
+                )

         from airflow.plugins_manager import import_errors as plugin_import_errors

tests/api_connexion/endpoints/test_import_error_endpoint.py +159 −3 modified
@@ -21,16 +21,19 @@
 import pytest

 from airflow.api_connexion.exceptions import EXCEPTIONS_LINK_MAP
+from airflow.models.dag import DagModel
 from airflow.models.errors import ImportError
 from airflow.security import permissions
 from airflow.utils import timezone
 from airflow.utils.session import provide_session
 from tests.test_utils.api_connexion_utils import assert_401, create_user, delete_user
 from tests.test_utils.config import conf_vars
-from tests.test_utils.db import clear_db_import_errors
+from tests.test_utils.db import clear_db_dags, clear_db_import_errors

 pytestmark = pytest.mark.db_test

+TEST_DAG_IDS = ["test_dag", "test_dag2"]
+

 @pytest.fixture(scope="module")
 def configured_app(minimal_app_for_api):
@@ -39,14 +42,34 @@ def configured_app(minimal_app_for_api):
         app,  # type:ignore
         username="test",
         role_name="Test",
-        permissions=[(permissions.ACTION_CAN_READ, permissions.RESOURCE_IMPORT_ERROR)],  # type: ignore
+        permissions=[
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_IMPORT_ERROR),
+        ],  # type: ignore
     )
     create_user(app, username="test_no_permissions", role_name="TestNoPermissions")  # type: ignore
+    create_user(
+        app,  # type:ignore
+        username="test_single_dag",
+        role_name="TestSingleDAG",
+        permissions=[(permissions.ACTION_CAN_READ, permissions.RESOURCE_IMPORT_ERROR)],  # type: ignore
+    )
+    # For some reason, DAG level permissions are not synced when in the above list of perms,
+    # so do it manually here:
+    app.appbuilder.sm.bulk_sync_roles(
+        [
+            {
+                "role": "TestSingleDAG",
+                "perms": [(permissions.ACTION_CAN_READ, permissions.resource_name_for_dag(TEST_DAG_IDS[0]))],
+            }
+        ]
+    )

-    yield minimal_app_for_api
+    yield app

     delete_user(app, username="test")  # type: ignore
     delete_user(app, username="test_no_permissions")  # type: ignore
+    delete_user(app, username="test_single_dag")  # type: ignore


 class TestBaseImportError:
@@ -58,9 +81,11 @@ def setup_attrs(self, configured_app) -> None:
         self.client = self.app.test_client()  # type:ignore

         clear_db_import_errors()
+        clear_db_dags()

     def teardown_method(self) -> None:
         clear_db_import_errors()
+        clear_db_dags()

     @staticmethod
     def _normalize_import_errors(import_errors):
@@ -121,6 +146,72 @@ def test_should_raise_403_forbidden(self):
         )
         assert response.status_code == 403

+    def test_should_raise_403_forbidden_without_dag_read(self, session):
+        import_error = ImportError(
+            filename="Lorem_ipsum.py",
+            stacktrace="Lorem ipsum",
+            timestamp=timezone.parse(self.timestamp, timezone="UTC"),
+        )
+        session.add(import_error)
+        session.commit()
+
+        response = self.client.get(
+            f"/api/v1/importErrors/{import_error.id}", environ_overrides={"REMOTE_USER": "test_single_dag"}
+        )
+
+        assert response.status_code == 403
+
+    def test_should_return_200_with_single_dag_read(self, session):
+        dag_model = DagModel(dag_id=TEST_DAG_IDS[0], fileloc="Lorem_ipsum.py")
+        session.add(dag_model)
+        import_error = ImportError(
+            filename="Lorem_ipsum.py",
+            stacktrace="Lorem ipsum",
+            timestamp=timezone.parse(self.timestamp, timezone="UTC"),
+        )
+        session.add(import_error)
+        session.commit()
+
+        response = self.client.get(
+            f"/api/v1/importErrors/{import_error.id}", environ_overrides={"REMOTE_USER": "test_single_dag"}
+        )
+
+        assert response.status_code == 200
+        response_data = response.json
+        response_data["import_error_id"] = 1
+        assert {
+            "filename": "Lorem_ipsum.py",
+            "import_error_id": 1,
+            "stack_trace": "Lorem ipsum",
+            "timestamp": "2020-06-10T12:00:00+00:00",
+        } == response_data
+
+    def test_should_return_200_redacted_with_single_dag_read_in_dagfile(self, session):
+        for dag_id in TEST_DAG_IDS:
+            dag_model = DagModel(dag_id=dag_id, fileloc="Lorem_ipsum.py")
+            session.add(dag_model)
+        import_error = ImportError(
+            filename="Lorem_ipsum.py",
+            stacktrace="Lorem ipsum",
+            timestamp=timezone.parse(self.timestamp, timezone="UTC"),
+        )
+        session.add(import_error)
+        session.commit()
+
+        response = self.client.get(
+            f"/api/v1/importErrors/{import_error.id}", environ_overrides={"REMOTE_USER": "test_single_dag"}
+        )
+
+        assert response.status_code == 200
+        response_data = response.json
+        response_data["import_error_id"] = 1
+        assert {
+            "filename": "Lorem_ipsum.py",
+            "import_error_id": 1,
+            "stack_trace": "REDACTED - you do not have read permission on all DAGs in the file",
+            "timestamp": "2020-06-10T12:00:00+00:00",
+        } == response_data
+


 class TestGetImportErrorsEndpoint(TestBaseImportError):
     def test_get_import_errors(self, session):
@@ -231,6 +322,71 @@ def test_should_raises_401_unauthenticated(self, session):

         assert_401(response)

+    def test_get_import_errors_single_dag(self, session):
+        for dag_id in TEST_DAG_IDS:
+            fake_filename = f"/tmp/{dag_id}.py"
+            dag_model = DagModel(dag_id=dag_id, fileloc=fake_filename)
+            session.add(dag_model)
+            importerror = ImportError(
+                filename=fake_filename,
+                stacktrace="Lorem ipsum",
+                timestamp=timezone.parse(self.timestamp, timezone="UTC"),
+            )
+            session.add(importerror)
+        session.commit()
+
+        response = self.client.get(
+            "/api/v1/importErrors", environ_overrides={"REMOTE_USER": "test_single_dag"}
+        )
+
+        assert response.status_code == 200
+        response_data = response.json
+        self._normalize_import_errors(response_data["import_errors"])
+        assert {
+            "import_errors": [
+                {
+                    "filename": "/tmp/test_dag.py",
+                    "import_error_id": 1,
+                    "stack_trace": "Lorem ipsum",
+                    "timestamp": "2020-06-10T12:00:00+00:00",
+                },
+            ],
+            "total_entries": 1,
+        } == response_data
+
+    def test_get_import_errors_single_dag_in_dagfile(self, session):
+        for dag_id in TEST_DAG_IDS:
+            fake_filename = "/tmp/all_in_one.py"
+            dag_model = DagModel(dag_id=dag_id, fileloc=fake_filename)
+            session.add(dag_model)
+
+        importerror = ImportError(
+            filename="/tmp/all_in_one.py",
+            stacktrace="Lorem ipsum",
+            timestamp=timezone.parse(self.timestamp, timezone="UTC"),
+        )
+        session.add(importerror)
+        session.commit()
+
+        response = self.client.get(
+            "/api/v1/importErrors", environ_overrides={"REMOTE_USER": "test_single_dag"}
+        )
+
+        assert response.status_code == 200
+        response_data = response.json
+        self._normalize_import_errors(response_data["import_errors"])
+        assert {
+            "import_errors": [
+                {
+                    "filename": "/tmp/all_in_one.py",
+                    "import_error_id": 1,
+                    "stack_trace": "REDACTED - you do not have read permission on all DAGs in the file",
+                    "timestamp": "2020-06-10T12:00:00+00:00",
+                },
+            ],
+            "total_entries": 1,
+        } == response_data
+

 class TestGetImportErrorsEndpointPagination(TestBaseImportError):
     @pytest.mark.parametrize(
tests/www/views/test_views_home.py +61 −0 modified
@@ -111,6 +111,30 @@ def test_home_status_filter_cookie(admin_client):
         assert "all" == flask.session[FILTER_STATUS_COOKIE]


+@pytest.fixture(scope="module")
+def user_no_importerror(app):
+    """Create User that cannot access Import Errors"""
+    return create_user(
+        app,
+        username="user_no_importerrors",
+        role_name="role_no_importerrors",
+        permissions=[
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE),
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+        ],
+    )
+
+
+@pytest.fixture()
+def client_no_importerror(app, user_no_importerror):
+    """Client for User that cannot access Import Errors"""
+    return client_with_login(
+        app,
+        username="user_no_importerrors",
+        password="user_no_importerrors",
+    )
+
+
 @pytest.fixture(scope="module")
 def user_single_dag(app):
     """Create User that can only access the first DAG from TEST_FILTER_DAG_IDS"""
@@ -120,6 +144,7 @@ def user_single_dag(app):
         role_name="role_single_dag",
         permissions=[
             (permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE),
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_IMPORT_ERROR),
             (permissions.ACTION_CAN_READ, permissions.resource_name_for_dag(TEST_FILTER_DAG_IDS[0])),
         ],
     )
@@ -232,6 +257,24 @@ def broken_dags_with_read_perm(tmp_path, working_dags_with_read_perm):
             _process_file(path, session)


+@pytest.fixture()
+def broken_dags_after_working(tmp_path):
+    # First create and process a DAG file that works
+    path = tmp_path / "all_in_one.py"
+    with create_session() as session:
+        contents = "from airflow import DAG\n"
+        for i, dag_id in enumerate(TEST_FILTER_DAG_IDS):
+            contents += f"dag{i} = DAG('{dag_id}')\n"
+        path.write_text(contents)
+        _process_file(path, session)
+
+    # Then break it!
+    with create_session() as session:
+        contents += "foobar()"
+        path.write_text(contents)
+        _process_file(path, session)
+
+
 def test_home_filter_tags(working_dags, admin_client):
     with admin_client:
         admin_client.get("home?tags=example&tags=data", follow_redirects=True)
@@ -249,6 +292,12 @@ def test_home_importerrors(broken_dags, user_client):
         check_content_in_response(f"/{dag_id}.py", resp)


+def test_home_no_importerrors_perm(broken_dags, client_no_importerror):
+    # Users without "can read on import errors" don't see any import errors
+    resp = client_no_importerror.get("home", follow_redirects=True)
+    check_content_not_in_response("Import Errors", resp)
+
+
 @pytest.mark.parametrize(
     "page",
     [
@@ -266,11 +315,23 @@ def test_home_importerrors_filtered_singledag_user(broken_dags_with_read_perm, c
     check_content_in_response("Import Errors", resp)
     # They can see the first DAGs import error
     check_content_in_response(f"/{TEST_FILTER_DAG_IDS[0]}.py", resp)
+    check_content_in_response("Traceback", resp)
     # But not the rest
     for dag_id in TEST_FILTER_DAG_IDS[1:]:
         check_content_not_in_response(f"/{dag_id}.py", resp)


+def test_home_importerrors_missing_read_on_all_dags_in_file(broken_dags_after_working, client_single_dag):
+    # If a user doesn't have READ on all DAGs in a file, that files traceback is redacted
+    resp = client_single_dag.get("home", follow_redirects=True)
+    check_content_in_response("Import Errors", resp)
+    # They can see the DAG file has an import error
+    check_content_in_response("all_in_one.py", resp)
+    # And the traceback is redacted
+    check_content_not_in_response("Traceback", resp)
+    check_content_in_response("REDACTED", resp)
+
+
 def test_home_dag_list(working_dags, user_client):
     # Users with "can read on DAGs" gets all DAGs
     resp = user_client.get("home", follow_redirects=True)
2adbe882e68d Fix permission check on DAGs when `access_entity` is specified (#37290)
1 file changed · +3 −2
airflow/api_connexion/security.py +3 −2 modified
@@ -145,10 +145,11 @@ def callback():
             # ``access`` means here:
             # - if a DAG id is provided (``dag_id`` not None): is the user authorized to access this DAG
             # - if no DAG id is provided: is the user authorized to access all DAGs
-            if dag_id or access:
+            if dag_id or access or access_entity:
                 return access

-            # No DAG id is provided and the user is not authorized to access all DAGs
+            # No DAG id is provided, the user is not authorized to access all DAGs and authorization is done
+            # on DAG level
             # If method is "GET", return whether the user has read access to any DAGs
             # If method is "PUT", return whether the user has edit access to any DAGs
             return (method == "GET" and any(get_auth_manager().get_permitted_dag_ids(methods=["GET"]))) or (
bc2646be043f Fix permission check on DAGs when `access_entity` is specified (#37290)
1 file changed · +3 −2
airflow/api_connexion/security.py +3 −2 modified
@@ -145,10 +145,11 @@ def callback():
             # ``access`` means here:
             # - if a DAG id is provided (``dag_id`` not None): is the user authorized to access this DAG
             # - if no DAG id is provided: is the user authorized to access all DAGs
-            if dag_id or access:
+            if dag_id or access or access_entity:
                 return access

-            # No DAG id is provided and the user is not authorized to access all DAGs
+            # No DAG id is provided, the user is not authorized to access all DAGs and authorization is done
+            # on DAG level
             # If method is "GET", return whether the user has read access to any DAGs
             # If method is "PUT", return whether the user has edit access to any DAGs
             return (method == "GET" and any(get_auth_manager().get_permitted_dag_ids(methods=["GET"]))) or (
a7fa258ba1c6 Fix permission check on DAGs when `access_entity` is specified (#37290)
1 file changed · +3 −2
airflow/api_connexion/security.py +3 −2 modified
@@ -145,10 +145,11 @@ def callback():
             # ``access`` means here:
             # - if a DAG id is provided (``dag_id`` not None): is the user authorized to access this DAG
             # - if no DAG id is provided: is the user authorized to access all DAGs
-            if dag_id or access:
+            if dag_id or access or access_entity:
                 return access

-            # No DAG id is provided and the user is not authorized to access all DAGs
+            # No DAG id is provided, the user is not authorized to access all DAGs and authorization is done
+            # on DAG level
             # If method is "GET", return whether the user has read access to any DAGs
             # If method is "PUT", return whether the user has edit access to any DAGs
             return (method == "GET" and any(get_auth_manager().get_permitted_dag_ids(methods=["GET"]))) or (
9c4defa08268 Fix permission check on DAGs when `access_entity` is specified
1 file changed · +3 −2
airflow/api_connexion/security.py +3 −2 modified
@@ -145,10 +145,11 @@ def callback():
             # ``access`` means here:
             # - if a DAG id is provided (``dag_id`` not None): is the user authorized to access this DAG
             # - if no DAG id is provided: is the user authorized to access all DAGs
-            if dag_id or access:
+            if dag_id or access or access_entity:
                 return access

-            # No DAG id is provided and the user is not authorized to access all DAGs
+            # No DAG id is provided, the user is not authorized to access all DAGs and authorization is done
+            # on DAG level
             # If method is "GET", return whether the user has read access to any DAGs
             # If method is "PUT", return whether the user has edit access to any DAGs
             return (method == "GET" and any(get_auth_manager().get_permitted_dag_ids(methods=["GET"]))) or (
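The four commits above carry the same change to the callback. As far as the hunk shows, adding `or access_entity` to the condition makes the callback return the entity-level decision directly, instead of falling through to the "can the user read or edit any DAG" fallback. The sketch below models that with plain values. dag_access_decision, its parameters, the `patched` switch and the "code" entity label are hypothetical names for illustration, and the fallback branch is modelled from the code comments because the return statement is cut off in the diff.

```python
from __future__ import annotations


def dag_access_decision(
    *,
    dag_id: str | None,
    access: bool,
    access_entity: str | None,
    method: str,
    permitted_dag_ids: dict[str, set[str]],
    patched: bool = True,
) -> bool:
    """Simplified model of the callback's decision (not Airflow's actual code)."""
    # Before the fix the short-circuit ignored access_entity; after it, a request
    # scoped to an entity returns the entity-level decision as-is.
    if dag_id or access or (patched and access_entity):
        return access
    # Fallback described in the comments: for GET/PUT, is the user permitted
    # on at least one DAG for that method?
    return method in ("GET", "PUT") and bool(permitted_dag_ids.get(method))


# A user who may read one DAG, asking for an entity across all DAGs (no dag_id),
# and who is not authorized for that entity on all DAGs (access=False).
permitted = {"GET": {"dag_a"}, "PUT": set()}
common = dict(dag_id=None, access=False, access_entity="code", method="GET", permitted_dag_ids=permitted)

before = dag_access_decision(**common, patched=False)
after = dag_access_decision(**common, patched=True)
print(before, after)
```

In this model the unpatched condition grants the request because the user can read some DAG, while the patched condition returns the entity-level denial.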
2cb6027280bc
Vulnerability mechanics
Generated on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.
References
- github.com/apache/airflow/pull/37290 (ghsa, patch, WEB)
- github.com/apache/airflow/pull/37468 (ghsa, patch, WEB)
- github.com/advisories/GHSA-6v6w-h8m6-7mv2 (ghsa, ADVISORY)
- lists.apache.org/thread/on4f7t5sqr3vfgp1pvkck79wv7mq9st5 (ghsa, vendor-advisory, WEB)
- nvd.nist.gov/vuln/detail/CVE-2024-27906 (ghsa, ADVISORY)
- www.openwall.com/lists/oss-security/2024/02/29/1 (ghsa, WEB)
- github.com/apache/airflow/commit/08d25607abe8593ecb90a84e338896bb79692d7b (ghsa, WEB)
- github.com/apache/airflow/commit/0a95299691e2d6a9b874adfae94d246a7f681ec9 (ghsa, WEB)
- github.com/apache/airflow/commit/2adbe882e68df0e2b1084bc869616bb01e416aa7 (ghsa, WEB)
- github.com/apache/airflow/commit/2cb6027280bcf5e2b561f3ee7f55980f6ec4cc3a (ghsa, WEB)
- github.com/apache/airflow/commit/90255d9d44a649025f588497f6c82177dad48326 (ghsa, WEB)
- github.com/apache/airflow/commit/9c4defa08268322b9db80123a22d7b56b2063446 (ghsa, WEB)
- github.com/apache/airflow/commit/a7fa258ba1c69a18e0f620499625f6026768dc24 (ghsa, WEB)
- github.com/apache/airflow/commit/bc2646be043f71b4d1ab7eefd2af65a60bf919f2 (ghsa, WEB)
- github.com/apache/airflow/commit/d944eb0de216d9e1d125fae5ce4af7440154deb4 (ghsa, WEB)
- github.com/pypa/advisory-database/tree/main/vulns/apache-airflow/PYSEC-2024-245.yaml (ghsa, WEB)