VYPR
High severity8.1NVD Advisory· Published Jun 11, 2026· Updated Jun 11, 2026

CVE-2026-11816

CVE-2026-11816

Description

Keras <3.14.0 archive extraction validates paths against CWD instead of destination, enabling arbitrary file writes via path traversal.

AI Insight

LLM-synthesized narrative grounded in this CVE's description and references.

Keras <3.14.0 archive extraction validates paths against CWD instead of destination, enabling arbitrary file writes via path traversal.

Vulnerability

Keras versions prior to 3.14.0 contain a path traversal vulnerability in keras/src/utils/file_utils.py in functions filter_safe_tarinfos() and filter_safe_zipinfos(). These functions validate archive member paths against the process current working directory (CWD) instead of the actual extraction destination. When the CWD is set to / (common in Docker, CI/CD, Jupyter), the validation boundary becomes the filesystem root, allowing traversal paths to bypass the check. Additionally, the zip filter has a bug causing an AttributeError when a blocked entry is encountered, leading to incomplete extraction. Python 3.11 installations lack the filter="data" safety net, relying solely on the flawed CWD-based filter. [1][2]

Exploitation

An attacker needs to supply a crafted archive (tar or zip) to a Keras application that extracts it. No authentication is required if the application accepts user-uploaded archives. The attacker crafts archive member paths with traversal sequences (e.g., ../../etc/passwd). Because validation uses CWD instead of the extraction directory, paths that traverse outside the intended directory but remain within the filesystem root (when CWD is /) are considered safe. The zip filter's AttributeError bug may further allow some malicious entries to be extracted. [2]

Impact

Successful exploitation allows arbitrary file writes outside the intended extraction directory. An attacker can overwrite configuration files, inject malicious code (e.g., into Python modules or startup scripts), or corrupt machine learning datasets and pipelines. This can lead to code execution, privilege escalation, or data integrity compromise. [2]

Mitigation

The vulnerability is fixed in Keras version 3.14.0. Users should upgrade to 3.14.0 or later. For environments where upgrade is not immediately possible, ensure the process CWD is not set to / and avoid extracting untrusted archives. Python 3.11 users should apply the fix or use a later Python version that supports filter="data". No workaround is provided by the vendor beyond upgrading. [1][2]

AI Insight generated on Jun 11, 2026. Synthesized from this CVE's description and the cited reference URLs; citations are validated against the source bundle.

Affected products

2
  • Keras Team/Kerasreferences2 versions
    (expand)+ 1 more
    • (no CPE)
    • (no CPE)range: <3.14.0

Patches

1
2465b6657b02

Don't let unit test modify the source folder. (#22194)

https://github.com/keras-team/kerashertschuhFeb 17, 2026via nvd-ref
4 files changed · +49 106
  • keras/src/legacy/saving/legacy_h5_format_test.py+2 0 modified
    @@ -388,6 +388,7 @@ def test_saving_include_optimizer_false(self):
             # Compare output
             self.assertAllClose(ref_output, output, atol=1e-5)
     
    +    @pytest.mark.skipif(tf_keras is None, reason="Test requires tf_keras")
         def test_custom_sequential_registered_no_scope(self):
             @tf_keras.saving.register_keras_serializable(package="my_package")
             class MyDense(tf_keras.layers.Dense):
    @@ -411,6 +412,7 @@ def __init__(self, units, **kwargs):
             ref_input = np.array([5])
             self._check_reloading_model(ref_input, model, tf_keras_model)
     
    +    @pytest.mark.skipif(tf_keras is None, reason="Test requires tf_keras")
         def test_custom_functional_registered_no_scope(self):
             @tf_keras.saving.register_keras_serializable(package="my_package")
             class MyDense(tf_keras.layers.Dense):
    
  • keras/src/utils/code_stats_test.py+1 9 modified
    @@ -8,15 +8,7 @@
     
     class TestCountLoc(test_case.TestCase):
         def setUp(self):
    -        self.test_dir = "test_directory"
    -        os.makedirs(self.test_dir, exist_ok=True)
    -
    -    def tearDown(self):
    -        for root, dirs, files in os.walk(self.test_dir, topdown=False):
    -            for name in files:
    -                os.remove(os.path.join(root, name))
    -            for name in dirs:
    -                os.rmdir(os.path.join(root, name))
    +        self.test_dir = self.get_temp_dir()
     
         def create_file(self, filename, content):
             with open(
    
  • keras/src/utils/file_utils.py+6 9 modified
    @@ -53,8 +53,8 @@ def is_link_in_dir(info, base):
         return is_path_in_dir(info.linkname, base_dir=tip)
     
     
    -def filter_safe_zipinfos(members):
    -    base_dir = resolve_path(".")
    +def filter_safe_zipinfos(members, base_dir):
    +    base_dir = resolve_path(base_dir)
         for finfo in members:
             valid_path = False
             if is_path_in_dir(finfo.filename, base_dir):
    @@ -68,8 +68,8 @@ def filter_safe_zipinfos(members):
                 )
     
     
    -def filter_safe_tarinfos(members):
    -    base_dir = resolve_path(".")
    +def filter_safe_tarinfos(members, base_dir):
    +    base_dir = resolve_path(base_dir)
         for finfo in members:
             valid_path = False
             if finfo.issym() or finfo.islnk():
    @@ -99,7 +99,7 @@ def extract_open_archive(archive, path="."):
         if isinstance(archive, zipfile.ZipFile):
             # Zip archive.
             archive.extractall(
    -            path, members=filter_safe_zipinfos(archive.infolist())
    +            path, members=filter_safe_zipinfos(archive.infolist(), path)
             )
         else:
             # Tar archive.
    @@ -111,7 +111,7 @@ def extract_open_archive(archive, path="."):
                 extractall_kwargs = {"filter": "data"}
             archive.extractall(
                 path,
    -            members=filter_safe_tarinfos(archive),
    +            members=filter_safe_tarinfos(archive, path),
                 **extractall_kwargs,
             )
     
    @@ -563,6 +563,3 @@ def makedirs(path):
             else:
                 _raise_if_no_gfile(path)
         return os.makedirs(path)
    -
    -
    -"/fo"
    
  • keras/src/utils/file_utils_test.py+40 88 modified
    @@ -1,8 +1,6 @@
     import hashlib
     import os
    -import shutil
     import tarfile
    -import tempfile
     import urllib
     import urllib.parse
     import urllib.request
    @@ -60,22 +58,11 @@ def test_is_path_in_dir_with_absolute_paths(self):
     
     
     class IsLinkInDirTest(test_case.TestCase):
    -    def setUp(self):
    -        self._cleanup(os.path.join("test_path", "to", "base_dir"))
    -        self._cleanup(os.path.join(".", "base_dir"))
    -
    -    def _cleanup(self, base_dir):
    -        if os.path.exists(base_dir):
    -            shutil.rmtree(base_dir)
    -
         def test_is_link_in_dir_with_absolute_paths(self):
    -        base_dir = os.path.join("test_path", "to", "base_dir")
    +        base_dir = self.get_temp_dir()
             link_path = os.path.join(base_dir, "symlink")
             target_path = os.path.join(base_dir, "file.txt")
     
    -        # Create the base_dir directory if it does not exist.
    -        os.makedirs(base_dir, exist_ok=True)
    -
             # Create the file.txt file.
             with open(target_path, "w") as f:
                 f.write("Hello, world!")
    @@ -96,13 +83,10 @@ def test_is_link_in_dir_with_absolute_paths(self):
             self.assertTrue(file_utils.is_link_in_dir(info, base_dir))
     
         def test_is_link_in_dir_with_relative_paths(self):
    -        base_dir = os.path.join(".", "base_dir")
    +        base_dir = self.get_temp_dir()
             link_path = os.path.join(base_dir, "symlink")
             target_path = os.path.join(base_dir, "file.txt")
     
    -        # Create the base_dir directory if it does not exist.
    -        os.makedirs(base_dir, exist_ok=True)
    -
             # Create the file.txt file.
             with open(target_path, "w") as f:
                 f.write("Hello, world!")
    @@ -122,105 +106,85 @@ def test_is_link_in_dir_with_relative_paths(self):
     
             self.assertTrue(file_utils.is_link_in_dir(info, base_dir))
     
    -    def tearDown(self):
    -        self._cleanup(os.path.join("test_path", "to", "base_dir"))
    -        self._cleanup(os.path.join(".", "base_dir"))
    -
     
     class FilterSafePathsTest(test_case.TestCase):
         def setUp(self):
    -        self.base_dir = os.path.join(os.getcwd(), "temp_dir")
    -        os.makedirs(self.base_dir, exist_ok=True)
    +        self.base_dir = os.path.abspath(self.get_temp_dir())
             self.tar_path = os.path.join(self.base_dir, "test.tar")
    -
    -    def tearDown(self):
    -        os.remove(self.tar_path)
    -        shutil.rmtree(self.base_dir)
    +        self.target_path = os.path.join(self.base_dir, "target.txt")
    +        with open(self.target_path, "w") as f:
    +            f.write("target")
    +        self.symlink_path = os.path.join(self.base_dir, "symlink.txt")
    +        os.symlink(self.target_path, self.symlink_path)
     
         def test_member_within_base_dir(self):
             """Test a member within the base directory."""
             with tarfile.open(self.tar_path, "w") as tar:
    -            tar.add(__file__, arcname="safe_path.txt")
    +            tar.add(self.target_path, arcname="safe_path.txt")
             with tarfile.open(self.tar_path, "r") as tar:
    -            members = list(file_utils.filter_safe_tarinfos(tar.getmembers()))
    +            members = list(
    +                file_utils.filter_safe_tarinfos(tar.getmembers(), self.base_dir)
    +            )
                 self.assertEqual(len(members), 1)
                 self.assertEqual(members[0].name, "safe_path.txt")
     
         def test_symlink_within_base_dir(self):
             """Test a symlink pointing within the base directory."""
    -        symlink_path = os.path.join(self.base_dir, "symlink.txt")
    -        target_path = os.path.join(self.base_dir, "target.txt")
    -        with open(target_path, "w") as f:
    -            f.write("target")
    -        os.symlink(target_path, symlink_path)
             with tarfile.open(self.tar_path, "w") as tar:
    -            tar.add(symlink_path, arcname="symlink.txt")
    +            tar.add(self.symlink_path, arcname="symlink.txt")
             with tarfile.open(self.tar_path, "r") as tar:
    -            members = list(file_utils.filter_safe_tarinfos(tar.getmembers()))
    +            members = list(
    +                file_utils.filter_safe_tarinfos(tar.getmembers(), self.base_dir)
    +            )
                 self.assertEqual(len(members), 1)
                 self.assertEqual(members[0].name, "symlink.txt")
    -        os.remove(symlink_path)
    -        os.remove(target_path)
     
         def test_invalid_path_warning(self):
             """Test warning for an invalid path during archive extraction."""
    -        invalid_path = os.path.join(os.getcwd(), "invalid.txt")
    -        with open(invalid_path, "w") as f:
    -            f.write("invalid")
             with tarfile.open(self.tar_path, "w") as tar:
                 tar.add(
    -                invalid_path, arcname="../../invalid.txt"
    +                self.target_path, arcname="../../invalid.txt"
                 )  # Path intended to be outside of base dir
             with tarfile.open(self.tar_path, "r") as tar:
                 with patch("warnings.warn") as mock_warn:
    -                _ = list(file_utils.filter_safe_tarinfos(tar.getmembers()))
    +                _ = list(
    +                    file_utils.filter_safe_tarinfos(
    +                        tar.getmembers(), self.base_dir
    +                    )
    +                )
                     warning_msg = (
                         "Skipping invalid path during archive extraction: "
                         "'../../invalid.txt'."
                     )
                     mock_warn.assert_called_with(warning_msg, stacklevel=2)
    -        os.remove(invalid_path)
     
         def test_symbolic_link_in_base_dir(self):
             """symbolic link within the base directory is correctly processed."""
    -        symlink_path = os.path.join(self.base_dir, "symlink.txt")
    -        target_path = os.path.join(self.base_dir, "target.txt")
    -
    -        # Create a target file and then a symbolic link pointing to it.
    -        with open(target_path, "w") as f:
    -            f.write("target")
    -        os.symlink(target_path, symlink_path)
    -
             # Add the symbolic link to the tar archive.
             with tarfile.open(self.tar_path, "w") as tar:
    -            tar.add(symlink_path, arcname="symlink.txt")
    +            tar.add(self.symlink_path, arcname="symlink.txt")
     
             with tarfile.open(self.tar_path, "r") as tar:
    -            members = list(file_utils.filter_safe_tarinfos(tar.getmembers()))
    +            members = list(
    +                file_utils.filter_safe_tarinfos(tar.getmembers(), self.base_dir)
    +            )
                 self.assertEqual(len(members), 1)
                 self.assertEqual(members[0].name, "symlink.txt")
                 self.assertTrue(
                     members[0].issym()
                 )  # Explicitly assert it's a symbolic link.
     
    -        os.remove(symlink_path)
    -        os.remove(target_path)
    -
     
     class ExtractArchiveTest(test_case.TestCase):
         def setUp(self):
             """Create temporary directories and files for testing."""
    -        self.temp_dir = tempfile.mkdtemp()
    +        self.temp_dir = self.get_temp_dir()
             self.file_content = "Hello, world!"
     
             # Create sample files to be archived
             with open(os.path.join(self.temp_dir, "sample.txt"), "w") as f:
                 f.write(self.file_content)
     
    -    def tearDown(self):
    -        """Clean up temporary directories."""
    -        shutil.rmtree(self.temp_dir)
    -
         def create_tar(self):
             archive_path = os.path.join(self.temp_dir, "sample.tar")
             with tarfile.open(archive_path, "w") as archive:
    @@ -604,37 +568,32 @@ def test_handle_complex_paths(self):
     class HashFileTest(test_case.TestCase):
         def setUp(self):
             self.test_content = b"Hello, World!"
    -        self.temp_file = tempfile.NamedTemporaryFile(delete=False)
    -        self.temp_file.write(self.test_content)
    -        self.temp_file.close()
    -
    -    def tearDown(self):
    -        os.remove(self.temp_file.name)
    +        self.temp_file = os.path.join(self.get_temp_dir(), "test_file.txt")
    +        with open(self.temp_file, "wb") as f:
    +            f.write(self.test_content)
     
         def test_hash_file_sha256(self):
             """Test SHA256 hashing of a file."""
             expected_sha256 = (
                 "dffd6021bb2bd5b0af676290809ec3a53191dd81c7f70a4b28688a362182986f"
             )
             calculated_sha256 = file_utils.hash_file(
    -            self.temp_file.name, algorithm="sha256"
    +            self.temp_file, algorithm="sha256"
             )
             self.assertEqual(expected_sha256, calculated_sha256)
     
         def test_hash_file_md5(self):
             """Test MD5 hashing of a file."""
             expected_md5 = "65a8e27d8879283831b664bd8b7f0ad4"
    -        calculated_md5 = file_utils.hash_file(
    -            self.temp_file.name, algorithm="md5"
    -        )
    +        calculated_md5 = file_utils.hash_file(self.temp_file, algorithm="md5")
             self.assertEqual(expected_md5, calculated_md5)
     
     
     class TestValidateFile(test_case.TestCase):
         def setUp(self):
    -        self.tmp_file = tempfile.NamedTemporaryFile(delete=False)
    -        self.tmp_file.write(b"Hello, World!")
    -        self.tmp_file.close()
    +        self.temp_file = os.path.join(self.get_temp_dir(), "test_file.txt")
    +        with open(self.temp_file, "wb") as f:
    +            f.write(b"Hello, World!")
     
             self.sha256_hash = (
                 "dffd6021bb2bd5b0af676290809ec3a53191dd81c7f70a4b28688a362182986f"
    @@ -644,41 +603,34 @@ def setUp(self):
         def test_validate_file_sha256(self):
             """Validate SHA256 hash of a file."""
             self.assertTrue(
    -            file_utils.validate_file(
    -                self.tmp_file.name, self.sha256_hash, "sha256"
    -            )
    +            file_utils.validate_file(self.temp_file, self.sha256_hash, "sha256")
             )
     
         def test_validate_file_md5(self):
             """Validate MD5 hash of a file."""
             self.assertTrue(
    -            file_utils.validate_file(self.tmp_file.name, self.md5_hash, "md5")
    +            file_utils.validate_file(self.temp_file, self.md5_hash, "md5")
             )
     
         def test_validate_file_auto_sha256(self):
             """Auto-detect and validate SHA256 hash."""
             self.assertTrue(
    -            file_utils.validate_file(
    -                self.tmp_file.name, self.sha256_hash, "auto"
    -            )
    +            file_utils.validate_file(self.temp_file, self.sha256_hash, "auto")
             )
     
         def test_validate_file_auto_md5(self):
             """Auto-detect and validate MD5 hash."""
             self.assertTrue(
    -            file_utils.validate_file(self.tmp_file.name, self.md5_hash, "auto")
    +            file_utils.validate_file(self.temp_file, self.md5_hash, "auto")
             )
     
         def test_validate_file_wrong_hash(self):
             """Test validation with incorrect hash."""
             wrong_hash = "deadbeef" * 8
             self.assertFalse(
    -            file_utils.validate_file(self.tmp_file.name, wrong_hash, "sha256")
    +            file_utils.validate_file(self.temp_file, wrong_hash, "sha256")
             )
     
    -    def tearDown(self):
    -        os.remove(self.tmp_file.name)
    -
     
     class ResolveHasherTest(test_case.TestCase):
         def test_resolve_hasher_sha256(self):
    

Vulnerability mechanics

Root cause

"The archive extraction filters validate member paths against the process current working directory instead of the actual extraction destination, allowing path traversal to bypass the security check."

Attack vector

An attacker can craft a malicious archive (tar or zip) containing members with path traversal sequences (e.g., `../../malicious.py`). When a victim extracts the archive using Keras's `extract_open_archive()` — which is called during model loading or dataset handling — the flawed CWD-based filter in `filter_safe_tarinfos()` or `filter_safe_zipinfos()` validates paths against the process current working directory rather than the extraction destination. In environments where CWD is `/` (common in Docker, CI/CD, and Jupyter), traversal paths bypass the check entirely, allowing arbitrary file writes outside the intended directory. This can lead to overwriting configuration files, injecting code, or corrupting ML pipelines.

Affected code

The vulnerability resides in `keras/src/utils/file_utils.py` in the functions `filter_safe_tarinfos()` and `filter_safe_zipinfos()`. These functions previously hardcoded `resolve_path(".")` as the base directory for path validation, meaning they checked archive member paths against the process current working directory (CWD) instead of the actual extraction destination passed by `extract_open_archive()`. The patch changes both functions to accept an explicit `base_dir` parameter and passes the extraction `path` from `extract_open_archive()` to them.

What the fix does

The patch modifies `filter_safe_tarinfos()` and `filter_safe_zipinfos()` to accept an explicit `base_dir` parameter instead of hardcoding `resolve_path(".")`. The callers in `extract_open_archive()` now pass the extraction destination `path` as this argument. This ensures that path validation is performed against the actual target directory where files will be written, closing the traversal bypass. The commit also updates the unit tests to use temporary directories and pass `self.base_dir` explicitly, confirming the fix works correctly.

Preconditions

  • inputThe victim must extract a malicious archive using Keras's `extract_open_archive()` function (e.g., during model loading or dataset processing).
  • configThe process current working directory must be `/` or another directory that makes traversal paths appear valid — common in Docker containers, CI/CD runners, and Jupyter environments.
  • configOn Python 3.11, the `filter="data"` safety net is unavailable, so the flawed CWD-based filter is the only protection.

Generated on Jun 11, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.

References

2

News mentions

0

No linked articles in our index yet.