VYPR
Critical severity · NVD Advisory · Published Jul 16, 2020 · Updated Aug 4, 2024

CVE-2020-11981

Description

An issue was found in Apache Airflow versions 1.10.10 and below. When using CeleryExecutor, if an attacker can connect to the broker (Redis, RabbitMQ) directly, it is possible to inject commands, resulting in the celery worker running arbitrary commands.

AI Insight

LLM-synthesized narrative grounded in this CVE's description and references.

Apache Airflow 1.10.10 and below, when using CeleryExecutor, allows command injection via direct broker access, enabling arbitrary command execution.

Vulnerability

Description

CVE-2020-11981 affects Apache Airflow versions 1.10.10 and earlier when using the CeleryExecutor. The root cause is that the Celery worker does not validate the commands it receives from the broker (Redis or RabbitMQ). If an attacker can directly connect to the broker, they can inject arbitrary commands that the worker executes without proper checks [1][2].

Exploitation

The attack requires network access to the message broker used by Celery (Redis or RabbitMQ). The attacker needs either a broker that is exposed without authentication or valid broker credentials. Once connected, the attacker can publish malicious task messages that the Celery worker picks up. The worker then executes these commands as if they were legitimate Airflow tasks, bypassing any command validation that might exist on the scheduler side [3].
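
To illustrate why a check on the sending side alone is not enough, the following toy model uses an in-memory `queue.Queue` as a stand-in for the broker. It is a sketch only: nothing here talks to Redis, RabbitMQ, or Celery, and no command is ever executed. The names `is_task_command`, `scheduler_publish`, `worker_consume`, and `simulate` are hypothetical; the prefix is borrowed from the check in the fix commit listed under Patches.

```python
import queue

# Prefix taken from the fix commit's check; everything else is illustrative.
ALLOWED_PREFIX = ["airflow", "tasks", "run"]


def is_task_command(command):
    """Return True if the command starts with the expected task-run prefix."""
    return command[0:3] == ALLOWED_PREFIX


def scheduler_publish(broker, command):
    """Sender-side path: validates before enqueueing."""
    if not is_task_command(command):
        raise ValueError("scheduler refused to publish a non-task command")
    broker.put(command)


def worker_consume(broker, validate_on_worker):
    """Receiver-side path: return the commands the worker would accept.

    Commands are only recorded here, never executed.
    """
    accepted = []
    while not broker.empty():
        command = broker.get()
        if validate_on_worker and not is_task_command(command):
            continue  # rejected on the receiving side
        accepted.append(command)
    return accepted


def simulate(validate_on_worker):
    """Enqueue one legitimate and one directly injected message."""
    broker = queue.Queue()
    scheduler_publish(broker, ["airflow", "tasks", "run", "dag_id", "task_id"])
    # A client with direct broker access skips scheduler_publish entirely.
    broker.put(["some", "other", "command"])
    return worker_consume(broker, validate_on_worker)
```

With `validate_on_worker=False` both messages are accepted, which mirrors the vulnerable behaviour described above; with `validate_on_worker=True` only the message carrying the task-run prefix survives.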

Impact

Successful exploitation allows an attacker to execute arbitrary commands on the Celery worker host. This can lead to full compromise of the Airflow worker, access to sensitive data processed by workflows, and potential lateral movement within the environment. The advisory rates the vulnerability as critical severity [2][4].

Mitigation

The fix adds command validation on the Celery worker side, so that only commands beginning with the expected task-run prefix are executed [4]; the same commit adds comparable checks to the other executors. Users should upgrade to Apache Airflow 1.10.11 or later. Additionally, securing the broker with strong authentication and network segmentation can reduce the attack surface. The vulnerability is not known to be exploited in the wild at the time of publication.
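
As a rough aid for the broker-authentication advice, the sketch below inspects a Celery-style broker URL for missing or default credentials using only `urllib.parse`. The function name `broker_url_findings` and the example URLs are hypothetical. The check is deliberately narrow: an empty result does not show that a broker is secure, and network exposure is not examined at all.

```python
from urllib.parse import urlsplit


def broker_url_findings(broker_url):
    """Return a list of credential-related concerns about a broker URL."""
    parts = urlsplit(broker_url)
    findings = []
    if not parts.password:
        # e.g. redis://localhost:6379/0 carries no password at all
        findings.append("no password in broker URL")
    elif parts.username == "guest" and parts.password == "guest":
        # guest/guest is RabbitMQ's well-known default account
        findings.append("default guest/guest credentials")
    return findings
```

For example, `broker_url_findings("redis://localhost:6379/0")` reports a missing password, while a URL of the form `redis://:<password>@host:6379/0` produces no findings.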

AI Insight generated on May 21, 2026. Synthesized from this CVE's description and the cited reference URLs; citations are validated against the source bundle.

Affected packages

Versions sourced from the GitHub Security Advisory.

Package                 Affected versions   Patched versions
apache-airflow (PyPI)   < 1.10.11rc1        1.10.11rc1
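
The affected range above can be checked against an installed version string with a small comparison helper. This is a minimal sketch assuming versions of the form `X.Y.Z` with an optional `a`, `b`, or `rc` pre-release suffix; the names `version_key` and `is_affected` are hypothetical, and a full version parser would handle more cases (post-releases, dev builds, two-component versions).

```python
import re

_VERSION_RE = re.compile(r"^(\d+)\.(\d+)\.(\d+)(?:(a|b|rc)(\d+))?$")
# Pre-releases sort before the final release of the same X.Y.Z.
_PRE_RANK = {"a": 0, "b": 1, "rc": 2, None: 3}


def version_key(version):
    """Turn 'X.Y.Z' or 'X.Y.Z<a|b|rc>N' into a sortable tuple."""
    match = _VERSION_RE.match(version)
    if match is None:
        raise ValueError("unsupported version string: " + version)
    major, minor, patch, pre, pre_num = match.groups()
    return (int(major), int(minor), int(patch), _PRE_RANK[pre], int(pre_num or 0))


# First patched version according to the table above.
FIRST_PATCHED = version_key("1.10.11rc1")


def is_affected(version):
    """Return True if the version falls in the range '< 1.10.11rc1'."""
    return version_key(version) < FIRST_PATCHED
```

Under these assumptions `is_affected("1.10.10")` is true, while `is_affected("1.10.11rc1")` and `is_affected("1.10.11")` are false.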

Affected products

3

Patches

2
1dda6fdde7c6

Validate only task commands are run by executors

https://github.com/apache/airflow · Ash Berlin-Taylor · Jun 8, 2020 · via ghsa
6 files changed · +44 -3
  • airflow/executors/celery_executor.py +3 -0 modified
    @@ -71,6 +71,9 @@
     @app.task
     def execute_command(command_to_exec: CommandType) -> None:
         """Executes command."""
    +    if command_to_exec[0:3] != ["airflow", "tasks", "run"]:
    +        raise ValueError('The command must start with ["airflow", "tasks", "run"].')
    +
         log.info("Executing command in Celery: %s", command_to_exec)
         env = os.environ.copy()
         try:
    
  • airflow/executors/dask_executor.py +3 -0 modified
    @@ -72,6 +72,9 @@ def execute_async(self,
                           queue: Optional[str] = None,
                           executor_config: Optional[Any] = None) -> None:
     
    +        if command[0:3] != ["airflow", "tasks", "run"]:
    +            raise ValueError('The command must start with ["airflow", "tasks", "run"].')
    +
             def airflow_run():
                 return subprocess.check_call(command, close_fds=True)
     
    
  • airflow/executors/kubernetes_executor.py +3 -0 modified
    @@ -459,6 +459,9 @@ def run_next(self, next_job: KubernetesJobType) -> None:
             if isinstance(command, str):
                 command = [command]
     
    +        if command[0] != "airflow":
    +            raise ValueError('The first element of command must be equal to "airflow".')
    +
             pod = PodGenerator.construct_pod(
                 namespace=self.namespace,
                 worker_uuid=self.worker_uuid,
    
  • airflow/executors/local_executor.py +4 -0 modified
    @@ -284,6 +284,10 @@ def execute_async(self, key: TaskInstanceKeyType,
             """Execute asynchronously."""
             if not self.impl:
                 raise AirflowException(NOT_STARTED_MESSAGE)
    +
    +        if command[0:3] != ["airflow", "tasks", "run"]:
    +            raise ValueError('The command must start with ["airflow", "tasks", "run"].')
    +
             self.impl.execute_async(key=key, command=command, queue=queue, executor_config=executor_config)
     
         def sync(self) -> None:
    
  • airflow/executors/sequential_executor.py +4 -0 modified
    @@ -49,6 +49,10 @@ def execute_async(self,
                           command: CommandType,
                           queue: Optional[str] = None,
                           executor_config: Optional[Any] = None) -> None:
    +
    +        if command[0:3] != ["airflow", "tasks", "run"]:
    +            raise ValueError('The command must start with ["airflow", "tasks", "run"].')
    +
             self.commands_to_run.append((key, command))
     
         def sync(self) -> None:
    
  • tests/executors/test_celery_executor.py +27 -3 modified
    @@ -36,6 +36,7 @@
     from parameterized import parameterized
     
     from airflow.configuration import conf
    +from airflow.exceptions import AirflowException
     from airflow.executors import celery_executor
     from airflow.executors.celery_executor import BulkStateFetcher
     from airflow.models import TaskInstance
    @@ -101,13 +102,18 @@ class TestCeleryExecutor(unittest.TestCase):
         @pytest.mark.integration("rabbitmq")
         @pytest.mark.backend("mysql", "postgres")
         def test_celery_integration(self, broker_url):
    -        with _prepare_app(broker_url) as app:
    +        success_command = ['airflow', 'tasks', 'run', 'true', 'some_parameter']
    +        fail_command = ['airflow', 'version']
    +
    +        def fake_execute_command(command):
    +            if command != success_command:
    +                raise AirflowException("fail")
    +
    +        with _prepare_app(broker_url, execute=fake_execute_command) as app:
                 executor = celery_executor.CeleryExecutor()
                 executor.start()
     
                 with start_worker(app=app, logfile=sys.stdout, loglevel='info'):
    -                success_command = ['true', 'some_parameter']
    -                fail_command = ['false', 'some_parameter']
                     execute_date = datetime.datetime.now()
     
                     cached_celery_backend = celery_executor.execute_command.backend
    @@ -202,6 +208,24 @@ def test_gauge_executor_metrics(self, mock_stats_gauge, mock_trigger_tasks, mock
                      mock.call('executor.running_tasks', mock.ANY)]
             mock_stats_gauge.assert_has_calls(calls)
     
    +    @parameterized.expand((
    +        [['true'], ValueError],
    +        [['airflow', 'version'], ValueError],
    +        [['airflow', 'tasks', 'run'], None]
    +    ))
    +    @mock.patch('subprocess.check_call')
    +    def test_command_validation(self, command, expected_exception, mock_check_call):
    +        # Check that we validate _on the receiving_ side, not just sending side
    +        if expected_exception:
    +            with pytest.raises(expected_exception):
    +                celery_executor.execute_command(command)
    +            mock_check_call.assert_not_called()
    +        else:
    +            celery_executor.execute_command(command)
    +            mock_check_call.assert_called_once_with(
    +                command, stderr=mock.ANY, close_fds=mock.ANY, env=mock.ANY,
    +            )
    +
     
     def test_operation_timeout_config():
         assert celery_executor.OPERATION_TIMEOUT == 2
    
afa4b11fddfd

[AIRFLOW-6351] security - ui - Add Cross Site Scripting defence (#6913)

https://github.com/apache/airflow · tooptoop4 · Dec 27, 2019 · via ghsa
1 file changed · +1 -1
  • airflow/www_rbac/views.py +1 -1 modified
    @@ -321,7 +321,7 @@ def get_int_arg(value, default=0):
                 num_dag_to=min(end, num_of_all_dags),
                 num_of_all_dags=num_of_all_dags,
                 paging=wwwutils.generate_pages(current_page, num_of_pages,
    -                                           search=arg_search_query,
    +                                           search=escape(arg_search_query) if arg_search_query else None,
                                                showPaused=not hide_paused),
                 num_runs=num_runs,
                 tags=tags)
    

Vulnerability mechanics

Generated on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.

References

7
