CVE-2024-53848
Description
check-jsonschema is a CLI and set of pre-commit hooks for jsonschema validation. The default cache strategy uses the basename of a remote schema as the name of the file in the cache, e.g. https://example.org/schema.json will be stored as schema.json. This naming allows for conflicts. If an attacker can get a user to run check-jsonschema against a malicious schema URL, e.g., https://example.evil.org/schema.json, they can insert their own schema into the cache and it will be picked up and used instead of the appropriate schema. Such a cache confusion attack could be used to allow data to pass validation which should have been rejected. This issue has been patched in version 0.30.0. All users are advised to upgrade. A few workarounds exist: 1. Users can use --no-cache to disable caching. 2. Users can use --cache-filename to select filenames for use in the cache, or to ensure that other usages do not overwrite the cached schema. (Note: this flag is being deprecated as part of the remediation effort.) 3. Users can explicitly download the schema before use as a local file, as in curl -LOs https://example.org/schema.json; check-jsonschema --schemafile ./schema.json
Affected packages
Versions sourced from the GitHub Security Advisory.
| Package | Affected versions | Patched versions |
|---|---|---|
| check-jsonschema (PyPI) | < 0.30.0 | 0.30.0 |
Patches
29932a7df36a3c52714b85e67 — Merge pull request #503 from python-jsonschema/fix-caching
8 files changed · +101 −104
docs/usage.rst+4 −3 modified@@ -118,6 +118,10 @@ Downloading and Caching By default, when ``--schemafile`` is used to refer to an ``http://`` or ``https://`` location, the schema is downloaded and cached based on the schema's Last-Modified time. + +Additionally, when ``$ref``\s are looked up during schema resolution, they are +similarly cached. + The following options control caching behaviors. .. list-table:: Caching Options @@ -128,9 +132,6 @@ The following options control caching behaviors. - Description * - ``--no-cache`` - Disable caching. - * - ``--cache-filename`` - - The name to use for caching a remote schema. - Defaults to using the last slash-delimited part of the URI. "format" Validation Options ---------------------------
src/check_jsonschema/cachedownloader.py+26 −11 modified@@ -1,6 +1,7 @@ from __future__ import annotations import contextlib +import hashlib import io import os import platform @@ -33,7 +34,7 @@ def _base_cache_dir() -> str | None: return cache_dir -def _resolve_cache_dir(dirname: str = "downloads") -> str | None: +def _resolve_cache_dir(dirname: str) -> str | None: cache_dir = _base_cache_dir() if cache_dir: cache_dir = os.path.join(cache_dir, "check_jsonschema", dirname) @@ -95,18 +96,32 @@ def _cache_hit(cachefile: str, response: requests.Response) -> bool: return local_mtime >= remote_mtime +def url_to_cache_filename(ref_url: str) -> str: + """ + Given a schema URL, convert it to a filename for caching in a cache dir. + + Rules are as follows: + - the base filename is an sha256 hash of the URL + - if the filename ends in an extension (.json, .yaml, etc) that extension + is appended to the hash + + Preserving file extensions preserves the extension-based logic used for parsing, and + it also helps a local editor (browsing the cache) identify filetypes. + """ + filename = hashlib.sha256(ref_url.encode()).hexdigest() + if "." 
in (last_part := ref_url.rpartition("/")[-1]): + _, _, extension = last_part.rpartition(".") + filename = f"{filename}.{extension}" + return filename + + class FailedDownloadError(Exception): pass class CacheDownloader: - def __init__( - self, cache_dir: str | None = None, disable_cache: bool = False - ) -> None: - if cache_dir is None: - self._cache_dir = _resolve_cache_dir() - else: - self._cache_dir = _resolve_cache_dir(cache_dir) + def __init__(self, cache_dir: str, *, disable_cache: bool = False) -> None: + self._cache_dir = _resolve_cache_dir(cache_dir) self._disable_cache = disable_cache def _download( @@ -160,21 +175,21 @@ def bind( validation_callback: t.Callable[[bytes], t.Any] | None = None, ) -> BoundCacheDownloader: return BoundCacheDownloader( - file_url, filename, self, validation_callback=validation_callback + file_url, self, filename=filename, validation_callback=validation_callback ) class BoundCacheDownloader: def __init__( self, file_url: str, - filename: str | None, downloader: CacheDownloader, *, + filename: str | None = None, validation_callback: t.Callable[[bytes], t.Any] | None = None, ) -> None: self._file_url = file_url - self._filename = filename or file_url.split("/")[-1] + self._filename = filename or url_to_cache_filename(file_url) self._downloader = downloader self._validation_callback = validation_callback
src/check_jsonschema/cli/main_command.py+1 −8 modified@@ -130,11 +130,7 @@ def pretty_helptext_list(values: list[str] | tuple[str, ...]) -> str: help="Disable schema caching. Always download remote schemas.", ) @click.option( - "--cache-filename", - help=( - "The name to use for caching a remote schema. " - "Defaults to the last slash-delimited part of the URI." - ), + "--cache-filename", help="Deprecated. This option no longer has any effect." ) @click.option( "--disable-formats", @@ -271,8 +267,6 @@ def main( args.disable_cache = no_cache args.default_filetype = default_filetype args.fill_defaults = fill_defaults - if cache_filename is not None: - args.cache_filename = cache_filename if data_transform is not None: args.data_transform = TRANSFORM_LIBRARY[data_transform] @@ -300,7 +294,6 @@ def build_schema_loader(args: ParseResult) -> SchemaLoaderBase: assert args.schema_path is not None return SchemaLoader( args.schema_path, - cache_filename=args.cache_filename, disable_cache=args.disable_cache, base_uri=args.base_uri, validator_class=args.validator_class,
src/check_jsonschema/schema_loader/main.py+1 −7 modified@@ -64,14 +64,12 @@ def __init__( self, schemafile: str, *, - cache_filename: str | None = None, base_uri: str | None = None, validator_class: type[jsonschema.protocols.Validator] | None = None, disable_cache: bool = True, ) -> None: # record input parameters (these are not to be modified) self.schemafile = schemafile - self.cache_filename = cache_filename self.disable_cache = disable_cache self.base_uri = base_uri self.validator_class = validator_class @@ -105,11 +103,7 @@ def _get_schema_reader( return LocalSchemaReader(self.schemafile) if self.url_info.scheme in ("http", "https"): - return HttpSchemaReader( - self.schemafile, - self.cache_filename, - self.disable_cache, - ) + return HttpSchemaReader(self.schemafile, self.disable_cache) else: raise UnsupportedUrlScheme( "check-jsonschema only supports http, https, and local files. "
src/check_jsonschema/schema_loader/readers.py+3 −4 modified@@ -73,14 +73,13 @@ class HttpSchemaReader: def __init__( self, url: str, - cache_filename: str | None, disable_cache: bool, ) -> None: self.url = url self.parsers = ParserSet() - self.downloader = CacheDownloader( - disable_cache=disable_cache, - ).bind(url, cache_filename, validation_callback=self._parse) + self.downloader = CacheDownloader("schemas", disable_cache=disable_cache).bind( + url, validation_callback=self._parse + ) self._parsed_schema: dict | _UnsetType = _UNSET def _parse(self, schema_bytes: bytes) -> t.Any:
src/check_jsonschema/schema_loader/resolver.py+2 −20 modified@@ -1,6 +1,5 @@ from __future__ import annotations -import hashlib import typing as t import urllib.parse @@ -12,21 +11,6 @@ from ..utils import filename2path -def ref_url_to_cache_filename(ref_url: str) -> str: - """ - Given a $ref URL, convert it to the filename in the refs/ cache dir. - Rules are as follows: - - the base filename is an md5 hash of the URL - - if the filename ends in an extension (.json, .yaml, etc) that extension - is appended to the hash - """ - filename = hashlib.md5(ref_url.encode()).hexdigest() - if "." in (last_part := ref_url.rpartition("/")[-1]): - _, _, extension = last_part.rpartition(".") - filename = f"{filename}.{extension}" - return filename - - def make_reference_registry( parsers: ParserSet, retrieval_uri: str | None, schema: dict, disable_cache: bool ) -> referencing.Registry: @@ -66,7 +50,7 @@ def create_retrieve_callable( base_uri = retrieval_uri cache = ResourceCache() - downloader = CacheDownloader("refs", disable_cache) + downloader = CacheDownloader("refs", disable_cache=disable_cache) def get_local_file(uri: str) -> t.Any: path = filename2path(uri) @@ -89,9 +73,7 @@ def validation_callback(content: bytes) -> None: parser_set.parse_data_with_path(content, full_uri, "json") bound_downloader = downloader.bind( - full_uri, - ref_url_to_cache_filename(full_uri), - validation_callback, + full_uri, validation_callback=validation_callback ) with bound_downloader.open() as fp: data = fp.read()
tests/conftest.py+16 −8 modified@@ -62,15 +62,25 @@ def patch_cache_dir(monkeypatch, cache_dir): yield m +@pytest.fixture +def url2cachepath(): + from check_jsonschema.cachedownloader import url_to_cache_filename + + def _get(cache_dir, url): + return cache_dir / url_to_cache_filename(url) + + return _get + + @pytest.fixture def downloads_cache_dir(tmp_path): return tmp_path / ".cache" / "check_jsonschema" / "downloads" @pytest.fixture -def get_download_cache_loc(downloads_cache_dir): - def _get(uri): - return downloads_cache_dir / uri.split("/")[-1] +def get_download_cache_loc(downloads_cache_dir, url2cachepath): + def _get(url): + return url2cachepath(downloads_cache_dir, url) return _get @@ -94,11 +104,9 @@ def refs_cache_dir(tmp_path): @pytest.fixture -def get_ref_cache_loc(refs_cache_dir): - from check_jsonschema.schema_loader.resolver import ref_url_to_cache_filename - - def _get(uri): - return refs_cache_dir / ref_url_to_cache_filename(uri) +def get_ref_cache_loc(refs_cache_dir, url2cachepath): + def _get(url): + return url2cachepath(refs_cache_dir, url) return _get
tests/unit/test_cachedownloader.py+48 −43 modified@@ -11,13 +11,16 @@ CacheDownloader, FailedDownloadError, _cache_hit, + url_to_cache_filename, ) +DEFAULT_RESPONSE_URL = "https://example.com/schema1.json" + def add_default_response(): responses.add( "GET", - "https://example.com/schema1.json", + DEFAULT_RESPONSE_URL, headers={"Last-Modified": "Sun, 01 Jan 2000 00:00:01 GMT"}, json={}, match_querystring=None, @@ -30,8 +33,8 @@ def default_response(): def test_default_filename_from_uri(default_response): - cd = CacheDownloader().bind("https://example.com/schema1.json") - assert cd._filename == "schema1.json" + cd = CacheDownloader("downloads").bind(DEFAULT_RESPONSE_URL) + assert cd._filename == url_to_cache_filename(DEFAULT_RESPONSE_URL) @pytest.mark.parametrize( @@ -76,7 +79,7 @@ def fake_expanduser(path): monkeypatch.setattr(platform, "system", fakesystem) monkeypatch.setattr(os.path, "expanduser", fake_expanduser) - cd = CacheDownloader() + cd = CacheDownloader("downloads") assert cd._cache_dir == expect_value if sysname == "Darwin": @@ -94,15 +97,15 @@ def test_cache_hit_by_mtime(monkeypatch, default_response): monkeypatch.setattr(os.path, "getmtime", lambda x: time.time()) assert _cache_hit( "/tmp/schema1.json", - requests.get("https://example.com/schema1.json", stream=True), + requests.get(DEFAULT_RESPONSE_URL, stream=True), ) # local mtime = 0, cache miss monkeypatch.setattr(os.path, "getmtime", lambda x: 0) assert ( _cache_hit( "/tmp/schema1.json", - requests.get("https://example.com/schema1.json", stream=True), + requests.get(DEFAULT_RESPONSE_URL, stream=True), ) is False ) @@ -114,7 +117,7 @@ def test_cachedownloader_cached_file(tmp_path, monkeypatch, default_response): f.write_text("{}") # set the cache_dir to the tmp dir (so that cache_dir will always be set) - cd = CacheDownloader(cache_dir=tmp_path).bind(str(f)) + cd = CacheDownloader(tmp_path).bind(str(f), filename="foo.json") # patch the downloader to skip any download "work" monkeypatch.setattr( 
cd._downloader, "_download", lambda file_uri, filename, response_ok: str(f) @@ -125,11 +128,12 @@ def test_cachedownloader_cached_file(tmp_path, monkeypatch, default_response): @pytest.mark.parametrize("disable_cache", (True, False)) -def test_cachedownloader_on_success(get_download_cache_loc, disable_cache): - add_default_response() - f = get_download_cache_loc("schema1.json") - cd = CacheDownloader(disable_cache=disable_cache).bind( - "https://example.com/schema1.json" +def test_cachedownloader_on_success( + get_download_cache_loc, disable_cache, default_response +): + f = get_download_cache_loc(DEFAULT_RESPONSE_URL) + cd = CacheDownloader("downloads", disable_cache=disable_cache).bind( + DEFAULT_RESPONSE_URL ) with cd.open() as fp: @@ -140,10 +144,12 @@ def test_cachedownloader_on_success(get_download_cache_loc, disable_cache): assert f.exists() -def test_cachedownloader_using_alternate_target_dir(cache_dir): - add_default_response() - f = cache_dir / "check_jsonschema" / "otherdir" / "schema1.json" - cd = CacheDownloader("otherdir").bind("https://example.com/schema1.json") +def test_cachedownloader_using_alternate_target_dir( + cache_dir, default_response, url2cachepath +): + cache_dir = cache_dir / "check_jsonschema" / "otherdir" + f = url2cachepath(cache_dir, DEFAULT_RESPONSE_URL) + cd = CacheDownloader("otherdir").bind(DEFAULT_RESPONSE_URL) with cd.open() as fp: assert fp.read() == b"{}" assert f.exists() @@ -158,21 +164,21 @@ def test_cachedownloader_succeeds_after_few_errors( for _i in range(failures): responses.add( "GET", - "https://example.com/schema1.json", + DEFAULT_RESPONSE_URL, status=500, match_querystring=None, ) else: responses.add( "GET", - "https://example.com/schema1.json", + DEFAULT_RESPONSE_URL, body=failures(), match_querystring=None, ) add_default_response() - f = get_download_cache_loc("schema1.json") - cd = CacheDownloader(disable_cache=disable_cache).bind( - "https://example.com/schema1.json" + f = 
get_download_cache_loc(DEFAULT_RESPONSE_URL) + cd = CacheDownloader("downloads", disable_cache=disable_cache).bind( + DEFAULT_RESPONSE_URL ) with cd.open() as fp: @@ -192,21 +198,21 @@ def test_cachedownloader_fails_after_many_errors( if connection_error: responses.add( "GET", - "https://example.com/schema1.json", + DEFAULT_RESPONSE_URL, body=requests.ConnectionError(), match_querystring=None, ) else: responses.add( "GET", - "https://example.com/schema1.json", + DEFAULT_RESPONSE_URL, status=500, match_querystring=None, ) add_default_response() # never reached, the 11th response - f = get_download_cache_loc("schema1.json") - cd = CacheDownloader(disable_cache=disable_cache).bind( - "https://example.com/schema1.json" + f = get_download_cache_loc(DEFAULT_RESPONSE_URL) + cd = CacheDownloader("downloads", disable_cache=disable_cache).bind( + DEFAULT_RESPONSE_URL ) with pytest.raises(FailedDownloadError): with cd.open(): @@ -218,18 +224,18 @@ def test_cachedownloader_fails_after_many_errors( def test_cachedownloader_retries_on_bad_data(get_download_cache_loc, disable_cache): responses.add( "GET", - "https://example.com/schema1.json", + DEFAULT_RESPONSE_URL, status=200, body="{", match_querystring=None, ) add_default_response() - f = get_download_cache_loc("schema1.json") + f = get_download_cache_loc(DEFAULT_RESPONSE_URL) cd = CacheDownloader( + "downloads", disable_cache=disable_cache, ).bind( - "https://example.com/schema1.json", - filename=str(f), + DEFAULT_RESPONSE_URL, validation_callback=json.loads, ) @@ -253,13 +259,14 @@ def test_cachedownloader_handles_bad_lastmod_header( file_exists, failure_mode, ): - uri = "https://example.com/schema1.json" if failure_mode == "header_missing": - responses.add("GET", uri, headers={}, json={}, match_querystring=None) + responses.add( + "GET", DEFAULT_RESPONSE_URL, headers={}, json={}, match_querystring=None + ) elif failure_mode == "header_malformed": responses.add( "GET", - uri, + DEFAULT_RESPONSE_URL, headers={"Last-Modified": 
"Jan 2000 00:00:01"}, json={}, match_querystring=None, @@ -275,13 +282,13 @@ def fake_mktime(*args): raise NotImplementedError original_file_contents = b'{"foo": "bar"}' - file_path = get_download_cache_loc(uri) + file_path = get_download_cache_loc(DEFAULT_RESPONSE_URL) assert not file_path.exists() if file_exists: - inject_cached_download(uri, original_file_contents) + inject_cached_download(DEFAULT_RESPONSE_URL, original_file_contents) - cd = CacheDownloader().bind(uri) + cd = CacheDownloader("downloads").bind(DEFAULT_RESPONSE_URL) # if the file already existed, it will not be overwritten by the cachedownloader # so the returned value for both the downloader and a direct file read should be the @@ -302,20 +309,18 @@ def fake_mktime(*args): def test_cachedownloader_validation_is_not_invoked_on_hit( - monkeypatch, inject_cached_download + monkeypatch, default_response, inject_cached_download ): """ Regression test for https://github.com/python-jsonschema/check-jsonschema/issues/453 This was a bug in which the validation callback was invoked eagerly, even on a cache hit. As a result, cache hits did not demonstrate their expected performance gain. """ - uri = "https://example.com/schema1.json" - # 1: construct some perfectly good data (it doesn't really matter what it is) - add_default_response() + # <<default_response fixture>> # 2: put equivalent data on disk - inject_cached_download(uri, "{}") + inject_cached_download(DEFAULT_RESPONSE_URL, "{}") # 3: construct a validator which marks that it ran in a variable validator_ran = False @@ -326,8 +331,8 @@ def dummy_validate_bytes(data): # construct a downloader pointed at the schema and file, expecting a cache hit # and use the above validation method - cd = CacheDownloader().bind( - "https://example.com/schema1.json", + cd = CacheDownloader("downloads").bind( + DEFAULT_RESPONSE_URL, validation_callback=dummy_validate_bytes, )
Vulnerability mechanics
Generated by null/stub on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.
References
News mentions: 4
Linked articles: 0 — no linked articles in our index yet.