CBORDecoder reuse can leak shareable values across decode calls
Description
cbor2 provides encoding and decoding for the Concise Binary Object Representation (CBOR) serialization format. Starting in version 3.0.0 and prior to version 5.8.0, when a CBORDecoder instance is reused across multiple decode operations, values marked with the shareable tag (28) persist in memory and can be accessed by subsequent CBOR messages using the sharedref tag (29). This allows an attacker-controlled message to read data from previously decoded messages if the decoder is reused across trust boundaries. Version 5.8.0 patches the issue.
Affected packages
Versions sourced from the GitHub Security Advisory.
| Package | Affected versions | Patched versions |
|---|---|---|
| cbor2 (PyPI) | >= 3.0.0, < 5.8.0 | 5.8.0 |
Affected products
1
Patches
1
f1d701cd2c41 — Merge commit from fork
9 files changed · +255 −17
cbor2/_decoder.py+31 −7 modified@@ -5,6 +5,7 @@ import sys from codecs import getincrementaldecoder from collections.abc import Callable, Mapping, Sequence +from contextlib import contextmanager from datetime import date, datetime, timedelta, timezone from io import BytesIO from typing import IO, TYPE_CHECKING, Any, TypeVar, cast, overload @@ -59,6 +60,7 @@ class CBORDecoder: "_immutable", "_str_errors", "_stringref_namespace", + "_decode_depth", ) _fp: IO[bytes] @@ -100,6 +102,7 @@ def __init__( self._shareables: list[object] = [] self._stringref_namespace: list[str | bytes] | None = None self._immutable = False + self._decode_depth = 0 @property def immutable(self) -> bool: @@ -225,13 +228,33 @@ def _decode(self, immutable: bool = False, unshared: bool = False) -> Any: if unshared: self._share_index = old_index + @contextmanager + def _decoding_context(self): + """ + Context manager for tracking decode depth and clearing shared state. + + Shared state is cleared at the end of each top-level decode to prevent + shared references from leaking between independent decode operations. + Nested calls (from hooks) must preserve the state. + """ + self._decode_depth += 1 + try: + yield + finally: + self._decode_depth -= 1 + assert self._decode_depth >= 0 + if self._decode_depth == 0: + self._shareables.clear() + self._share_index = None + def decode(self) -> object: """ Decode the next value from the stream. :raises CBORDecodeError: if there is any problem decoding the stream """ - return self._decode() + with self._decoding_context(): + return self._decode() def decode_from_bytes(self, buf: bytes) -> object: """ @@ -242,12 +265,13 @@ def decode_from_bytes(self, buf: bytes) -> object: object needs to be decoded separately from the rest but while still taking advantage of the shared value registry. 
""" - with BytesIO(buf) as fp: - old_fp = self.fp - self.fp = fp - retval = self._decode() - self.fp = old_fp - return retval + with self._decoding_context(): + with BytesIO(buf) as fp: + old_fp = self.fp + self.fp = fp + retval = self._decode() + self.fp = old_fp + return retval @overload def _decode_length(self, subtype: int) -> int: ...
cbor2/_encoder.py+37 −7 modified@@ -124,6 +124,7 @@ class CBOREncoder: "string_namespacing", "_string_references", "indefinite_containers", + "_encode_depth", ) _fp: IO[bytes] @@ -188,6 +189,7 @@ def __init__( int, tuple[object, int | None] ] = {} # indexes used for value sharing self._string_references: dict[str | bytes, int] = {} # indexes used for string references + self._encode_depth = 0 self._encoders = default_encoders.copy() if canonical: self._encoders.update(canonical_encoders) @@ -303,13 +305,41 @@ def write(self, data: bytes) -> None: """ self._fp_write(data) + @contextmanager + def _encoding_context(self): + """ + Context manager for tracking encode depth and clearing shared state. + + Shared state is cleared at the end of each top-level encode to prevent + shared references from leaking between independent encode operations. + Nested calls (from hooks) must preserve the state. + """ + self._encode_depth += 1 + try: + yield + finally: + self._encode_depth -= 1 + if self._encode_depth == 0: + self._shared_containers.clear() + self._string_references.clear() + def encode(self, obj: Any) -> None: """ Encode the given object using CBOR. :param obj: the object to encode """ + with self._encoding_context(): + self._encode_value(obj) + + def _encode_value(self, obj: Any) -> None: + """ + Internal fast path for encoding - used by built-in encoders. + + External code should use encode() instead, which properly manages + shared state between independent encode operations. 
+ """ obj_type = obj.__class__ encoder = self._encoders.get(obj_type) or self._find_encoder(obj_type) or self._default if not encoder: @@ -459,7 +489,7 @@ def encode_string(self, value: str) -> None: def encode_array(self, value: Sequence[Any]) -> None: self.encode_length(4, len(value) if not self.indefinite_containers else None) for item in value: - self.encode(item) + self._encode_value(item) if self.indefinite_containers: self.encode_break() @@ -468,8 +498,8 @@ def encode_array(self, value: Sequence[Any]) -> None: def encode_map(self, value: Mapping[Any, Any]) -> None: self.encode_length(5, len(value) if not self.indefinite_containers else None) for key, val in value.items(): - self.encode(key) - self.encode(val) + self._encode_value(key) + self._encode_value(val) if self.indefinite_containers: self.encode_break() @@ -494,10 +524,10 @@ def encode_canonical_map(self, value: Mapping[Any, Any]) -> None: # String referencing requires that the order encoded is # the same as the order emitted so string references are # generated after an order is determined - self.encode(realkey) + self._encode_value(realkey) else: self._fp_write(sortkey[1]) - self.encode(value) + self._encode_value(value) if self.indefinite_containers: self.encode_break() @@ -511,7 +541,7 @@ def encode_semantic(self, value: CBORTag) -> None: self._string_references = {} self.encode_length(6, value.tag) - self.encode(value.value) + self._encode_value(value.value) self.string_referencing = old_string_referencing self._string_references = old_string_references @@ -574,7 +604,7 @@ def encode_decimal(self, value: Decimal) -> None: def encode_stringref(self, value: str | bytes) -> None: # Semantic tag 25 if not self._stringref(value): - self.encode(value) + self._encode_value(value) def encode_rational(self, value: Fraction) -> None: # Semantic tag 30
docs/versionhistory.rst+5 −0 modified@@ -5,6 +5,11 @@ Version history This library adheres to `Semantic Versioning <https://semver.org/>`_. +**UNRELEASED** + +- Reset shared reference state at the start of each top-level encode/decode operation + (#266 <https://github.com/agronholm/cbor2/pull/266>_; PR by @andreer) + **5.7.1** (2025-10-24) - Improved performance on decoding large definite bytestrings
source/decoder.c+27 −1 modified@@ -143,6 +143,7 @@ CBORDecoder_new(PyTypeObject *type, PyObject *args, PyObject *kwargs) self->str_errors = PyBytes_FromString("strict"); self->immutable = false; self->shared_index = -1; + self->decode_depth = 0; } return (PyObject *) self; error: @@ -2083,11 +2084,30 @@ decode(CBORDecoderObject *self, DecodeOptions options) } +// Reset shared state at the end of each top-level decode to prevent +// shared references from leaking between independent decode operations. +// Nested calls (from hooks) must preserve the state. +static inline void +clear_shareable_state(CBORDecoderObject *self) +{ + PyList_SetSlice(self->shareables, 0, PY_SSIZE_T_MAX, NULL); + self->shared_index = -1; +} + + // CBORDecoder.decode(self) -> obj PyObject * CBORDecoder_decode(CBORDecoderObject *self) { - return decode(self, DECODE_NORMAL); + PyObject *ret; + self->decode_depth++; + ret = decode(self, DECODE_NORMAL); + self->decode_depth--; + assert(self->decode_depth >= 0); + if (self->decode_depth == 0) { + clear_shareable_state(self); + } + return ret; } @@ -2100,6 +2120,7 @@ CBORDecoder_decode_from_bytes(CBORDecoderObject *self, PyObject *data) if (!_CBOR2_BytesIO && _CBOR2_init_BytesIO() == -1) return NULL; + self->decode_depth++; save_read = self->read; buf = PyObject_CallFunctionObjArgs(_CBOR2_BytesIO, data, NULL); if (buf) { @@ -2111,6 +2132,11 @@ CBORDecoder_decode_from_bytes(CBORDecoderObject *self, PyObject *data) Py_DECREF(buf); } self->read = save_read; + self->decode_depth--; + assert(self->decode_depth >= 0); + if (self->decode_depth == 0) { + clear_shareable_state(self); + } return ret; }
source/decoder.h+1 −0 modified@@ -13,6 +13,7 @@ typedef struct { PyObject *str_errors; bool immutable; Py_ssize_t shared_index; + Py_ssize_t decode_depth; } CBORDecoderObject; extern PyTypeObject CBORDecoderType;
source/encoder.c+21 −2 modified@@ -114,6 +114,7 @@ CBOREncoder_new(PyTypeObject *type, PyObject *args, PyObject *kwargs) self->string_referencing = false; self->string_namespacing = false; self->indefinite_containers = false; + self->encode_depth = 0; } return (PyObject *) self; } @@ -2132,17 +2133,35 @@ encode(CBOREncoderObject *self, PyObject *value) } +// Reset shared state at the end of each top-level encode to prevent +// shared references from leaking between independent encode operations. +// Nested calls (from hooks or recursive encoding) must preserve the state. +static inline void +clear_shared_state(CBOREncoderObject *self) +{ + PyDict_Clear(self->shared); + PyDict_Clear(self->string_references); +} + + // CBOREncoder.encode(self, value) PyObject * CBOREncoder_encode(CBOREncoderObject *self, PyObject *value) { PyObject *ret; - // TODO reset shared dict? - if (Py_EnterRecursiveCall(" in CBOREncoder.encode")) + self->encode_depth++; + if (Py_EnterRecursiveCall(" in CBOREncoder.encode")) { + self->encode_depth--; return NULL; + } ret = encode(self, value); Py_LeaveRecursiveCall(); + self->encode_depth--; + assert(self->encode_depth >= 0); + if (self->encode_depth == 0) { + clear_shared_state(self); + } return ret; }
source/encoder.h+1 −0 modified@@ -25,6 +25,7 @@ typedef struct { bool string_referencing; bool string_namespacing; bool indefinite_containers; + Py_ssize_t encode_depth; } CBOREncoderObject; extern PyTypeObject CBOREncoderType;
tests/test_decoder.py+62 −0 modified@@ -1022,3 +1022,65 @@ def test_oversized_read(impl, payload: bytes, tmp_path: Path) -> None: dummy_path.write_bytes(payload) with dummy_path.open("rb") as f: impl.load(f) + + +class TestDecoderReuse: + """ + Tests for correct behavior when reusing CBORDecoder instances. + """ + + def test_decoder_reuse_resets_shared_refs(self, impl): + """ + Shared references should be scoped to a single decode operation, + not persist across multiple decodes on the same decoder instance. + """ + # Message with shareable tag (28) + msg1 = impl.dumps(impl.CBORTag(28, "first_value")) + + # Message with sharedref tag (29) referencing index 0 + msg2 = impl.dumps(impl.CBORTag(29, 0)) + + # Reuse decoder across messages + decoder = impl.CBORDecoder(BytesIO(msg1)) + result1 = decoder.decode() + assert result1 == "first_value" + + # Second decode should fail - sharedref(0) doesn't exist in this context + decoder.fp = BytesIO(msg2) + with pytest.raises(impl.CBORDecodeValueError, match="shared reference"): + decoder.decode() + + def test_decode_from_bytes_resets_shared_refs(self, impl): + """ + decode_from_bytes should also reset shared references between calls. + """ + msg1 = impl.dumps(impl.CBORTag(28, "value")) + msg2 = impl.dumps(impl.CBORTag(29, 0)) + + decoder = impl.CBORDecoder(BytesIO(b"")) + decoder.decode_from_bytes(msg1) + + with pytest.raises(impl.CBORDecodeValueError, match="shared reference"): + decoder.decode_from_bytes(msg2) + + def test_shared_refs_within_single_decode(self, impl): + """ + Shared references must work correctly within a single decode operation. + + Note: This tests non-cyclic sibling references [shareable(x), sharedref(0)], + which is a different pattern from test_cyclic_array/test_cyclic_map that + test self-referencing structures like shareable([sharedref(0)]). 
+ """ + # [shareable("hello"), sharedref(0)] -> ["hello", "hello"] + data = unhexlify( + "82" # array(2) + "d81c" # tag(28) shareable + "65" # text(5) + "68656c6c6f" # "hello" + "d81d" # tag(29) sharedref + "00" # unsigned(0) + ) + + result = impl.loads(data) + assert result == ["hello", "hello"] + assert result[0] is result[1] # Same object reference
tests/test_encoder.py+70 −0 modified@@ -717,3 +717,73 @@ def test_indefinite_containers(impl): expected = b"\xbf\xff" assert impl.dumps({}, indefinite_containers=True) == expected assert impl.dumps({}, indefinite_containers=True, canonical=True) == expected + + +class TestEncoderReuse: + """ + Tests for correct behavior when reusing CBOREncoder instances. + """ + + def test_encoder_reuse_resets_shared_containers(self, impl): + """ + Shared container tracking should be scoped to a single encode operation, + not persist across multiple encodes on the same encoder instance. + """ + fp = BytesIO() + encoder = impl.CBOREncoder(fp, value_sharing=True) + shared_obj = ["hello"] + + # First encode: object is tracked in shared containers + encoder.encode([shared_obj, shared_obj]) + + # Second encode on new fp: should produce valid standalone CBOR + # (not a sharedref pointing to stale first-encode data) + encoder.fp = BytesIO() + encoder.encode(shared_obj) + second_output = encoder.fp.getvalue() + + # The second output must be decodable on its own + result = impl.loads(second_output) + assert result == ["hello"] + + def test_encode_to_bytes_resets_shared_containers(self, impl): + """ + encode_to_bytes should also reset shared container tracking between calls. + """ + fp = BytesIO() + encoder = impl.CBOREncoder(fp, value_sharing=True) + shared_obj = ["hello"] + + # First encode + encoder.encode_to_bytes([shared_obj, shared_obj]) + + # Second encode should produce valid standalone CBOR + result_bytes = encoder.encode_to_bytes(shared_obj) + result = impl.loads(result_bytes) + assert result == ["hello"] + + def test_encoder_hook_does_not_reset_state(self, impl): + """ + When a custom encoder hook calls encode(), the shared container + tracking should be preserved (not reset mid-operation). 
+ """ + + class Custom: + def __init__(self, value): + self.value = value + + def custom_encoder(encoder, obj): + # Hook encodes the wrapped value + encoder.encode(obj.value) + + # Encode a Custom wrapping a list + data = impl.dumps(Custom(["a", "b"]), default=custom_encoder) + + # Verify the output decodes correctly + result = impl.loads(data) + assert result == ["a", "b"] + + # Test nested Custom objects - hook should work recursively + data2 = impl.dumps(Custom(Custom(["x"])), default=custom_encoder) + result2 = impl.loads(data2) + assert result2 == ["x"]
Vulnerability mechanics
Generated by null/stub on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.
References
5
- github.com/advisories/GHSA-wcj4-jw5j-44wh (ghsa, ADVISORY)
- nvd.nist.gov/vuln/detail/CVE-2025-68131 (ghsa, ADVISORY)
- github.com/agronholm/cbor2/commit/f1d701cd2c411ee40bb1fe383afe7f365f35abf0 (ghsa, WEB)
- github.com/agronholm/cbor2/pull/268 (mitre, x_refsource_MISC)
- github.com/agronholm/cbor2/security/advisories/GHSA-wcj4-jw5j-44wh (ghsa, x_refsource_CONFIRM, WEB)
News mentions
0No linked articles in our index yet.