CVE-2025-64439
Description
LangGraph SQLite Checkpoint is an implementation of LangGraph CheckpointSaver that uses SQLite DB (both sync and async, via aiosqlite). In versions 2.1.2 and below, the JsonPlusSerializer (used as the default serialization protocol for all checkpointing) contains a Remote Code Execution (RCE) vulnerability when deserializing payloads saved in the "json" serialization mode. By default, the serializer attempts to use "msgpack" for serialization. However, prior to version 3.0 of the checkpointer library, if illegal Unicode surrogate values caused serialization to fail, it would fall back to using the "json" mode. This issue is fixed in version 3.0.0.
Affected packages
Versions sourced from the GitHub Security Advisory.
| Package | Affected versions | Patched versions |
|---|---|---|
| langgraph-checkpoint (PyPI) | < 3.0.0 | 3.0.0 |
Affected products
- Range: 0.1.10, 0.1.11, 0.1.12, …
Patches
- c5744f583b11 — chore: Restrict "json" type deserialization (#6269)
8 files changed · +143 −172
libs/checkpoint/langgraph/checkpoint/serde/base.py+4 −9 modified@@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any, Protocol +from typing import Any, Protocol, runtime_checkable class UntypedSerializerProtocol(Protocol): @@ -11,7 +11,8 @@ def dumps(self, obj: Any) -> bytes: ... def loads(self, data: bytes) -> Any: ... -class SerializerProtocol(UntypedSerializerProtocol, Protocol): +@runtime_checkable +class SerializerProtocol(Protocol): """Protocol for serialization and deserialization of objects. - `dumps`: Serialize an object to bytes. @@ -31,12 +32,6 @@ class SerializerCompat(SerializerProtocol): def __init__(self, serde: UntypedSerializerProtocol) -> None: self.serde = serde - def dumps(self, obj: Any) -> bytes: - return self.serde.dumps(obj) - - def loads(self, data: bytes) -> Any: - return self.serde.loads(data) - def dumps_typed(self, obj: Any) -> tuple[str, bytes]: return type(obj).__name__, self.serde.dumps(obj) @@ -49,7 +44,7 @@ def maybe_add_typed_methods( ) -> SerializerProtocol: """Wrap serde old serde implementations in a class with loads_typed and dumps_typed for backwards compatibility.""" - if not hasattr(serde, "loads_typed") or not hasattr(serde, "dumps_typed"): + if not isinstance(serde, SerializerProtocol): return SerializerCompat(serde) return serde
libs/checkpoint/langgraph/checkpoint/serde/encrypted.py+0 −6 modified@@ -14,12 +14,6 @@ def __init__( self.cipher = cipher self.serde = serde - def dumps(self, obj: Any) -> bytes: - return self.serde.dumps(obj) - - def loads(self, data: bytes) -> Any: - return self.serde.loads(data) - def dumps_typed(self, obj: Any) -> tuple[str, bytes]: """Serialize an object to a tuple `(type, bytes)` and encrypt the bytes.""" # serialize data
libs/checkpoint/langgraph/checkpoint/serde/jsonplus.py+100 −129 modified@@ -4,6 +4,7 @@ import decimal import importlib import json +import logging import pathlib import pickle import re @@ -21,32 +22,44 @@ IPv6Interface, IPv6Network, ) -from typing import Any, cast +from typing import Any, Literal from uuid import UUID from zoneinfo import ZoneInfo import ormsgpack from langchain_core.load.load import Reviver -from langchain_core.load.serializable import Serializable from langgraph.checkpoint.serde.base import SerializerProtocol from langgraph.checkpoint.serde.types import SendProtocol from langgraph.store.base import Item LC_REVIVER = Reviver() EMPTY_BYTES = b"" +logger = logging.getLogger(__name__) class JsonPlusSerializer(SerializerProtocol): - """Serializer that uses ormsgpack, with a fallback to extended JSON serializer.""" + """Serializer that uses ormsgpack, with optional fallbacks. + + Security note: this serializer is intended for use within the BaseCheckpointSaver + class and called within the Pregel loop. It should not be used on untrusted + python objects. If an attacker can write directly to your checkpoint database, + they may be able to trigger code execution when data is deserialized. 
+ """ def __init__( self, *, pickle_fallback: bool = False, + allowed_json_modules: Sequence[tuple[str, ...]] | Literal[True] | None = None, __unpack_ext_hook__: Callable[[int, bytes], Any] | None = None, ) -> None: self.pickle_fallback = pickle_fallback + self._allowed_modules = ( + {mod_and_name for mod_and_name in allowed_json_modules} + if allowed_json_modules and allowed_json_modules is not True + else (allowed_json_modules if allowed_json_modules is True else None) + ) self._unpack_ext_hook = ( __unpack_ext_hook__ if __unpack_ext_hook__ is not None @@ -74,134 +87,90 @@ def _encode_constructor_args( out["kwargs"] = kwargs return out - def _default(self, obj: Any) -> str | dict[str, Any]: - if isinstance(obj, Serializable): - return cast(dict[str, Any], obj.to_json()) - elif hasattr(obj, "model_dump") and callable(obj.model_dump): - return self._encode_constructor_args( - obj.__class__, method=(None, "model_construct"), kwargs=obj.model_dump() - ) - elif hasattr(obj, "dict") and callable(obj.dict): - return self._encode_constructor_args( - obj.__class__, method=(None, "construct"), kwargs=obj.dict() - ) - elif hasattr(obj, "_asdict") and callable(obj._asdict): - return self._encode_constructor_args(obj.__class__, kwargs=obj._asdict()) - elif isinstance(obj, pathlib.Path): - return self._encode_constructor_args(pathlib.Path, args=obj.parts) - elif isinstance(obj, re.Pattern): - return self._encode_constructor_args( - re.compile, args=(obj.pattern, obj.flags) - ) - elif isinstance(obj, UUID): - return self._encode_constructor_args(UUID, args=(obj.hex,)) - elif isinstance(obj, decimal.Decimal): - return self._encode_constructor_args(decimal.Decimal, args=(str(obj),)) - elif isinstance(obj, (set, frozenset, deque)): - return self._encode_constructor_args(type(obj), args=(tuple(obj),)) - elif isinstance(obj, (IPv4Address, IPv4Interface, IPv4Network)): - return self._encode_constructor_args(obj.__class__, args=(str(obj),)) - elif isinstance(obj, (IPv6Address, 
IPv6Interface, IPv6Network)): - return self._encode_constructor_args(obj.__class__, args=(str(obj),)) - - elif isinstance(obj, datetime): - return self._encode_constructor_args( - datetime, method="fromisoformat", args=(obj.isoformat(),) - ) - elif isinstance(obj, timezone): - return self._encode_constructor_args( - timezone, - args=obj.__getinitargs__(), # type: ignore[attr-defined] - ) - elif isinstance(obj, ZoneInfo): - return self._encode_constructor_args(ZoneInfo, args=(obj.key,)) - elif isinstance(obj, timedelta): - return self._encode_constructor_args( - timedelta, args=(obj.days, obj.seconds, obj.microseconds) - ) - elif isinstance(obj, date): - return self._encode_constructor_args( - date, args=(obj.year, obj.month, obj.day) - ) - elif isinstance(obj, time): - return self._encode_constructor_args( - time, - args=(obj.hour, obj.minute, obj.second, obj.microsecond, obj.tzinfo), - kwargs={"fold": obj.fold}, - ) - elif dataclasses.is_dataclass(obj): - return self._encode_constructor_args( - obj.__class__, - kwargs={ - field.name: getattr(obj, field.name) - for field in dataclasses.fields(obj) - }, - ) - elif isinstance(obj, Enum): - return self._encode_constructor_args(obj.__class__, args=(obj.value,)) - elif isinstance(obj, SendProtocol): - return self._encode_constructor_args( - obj.__class__, kwargs={"node": obj.node, "arg": obj.arg} - ) - elif isinstance(obj, (bytes, bytearray)): - return self._encode_constructor_args( - obj.__class__, method="fromhex", args=(obj.hex(),) - ) - elif isinstance(obj, BaseException): - return repr(obj) - else: - raise TypeError( - f"Object of type {obj.__class__.__name__} is not JSON serializable" - ) - def _reviver(self, value: dict[str, Any]) -> Any: - if ( + if self._allowed_modules and ( value.get("lc", None) == 2 and value.get("type", None) == "constructor" and value.get("id", None) is not None ): try: - # Get module and class name - [*module, name] = value["id"] - # Import module - mod = 
importlib.import_module(".".join(module)) - # Import class - cls = getattr(mod, name) - # Instantiate class - method = value.get("method") - if isinstance(method, str): - methods = [getattr(cls, method)] - elif isinstance(method, list): - methods = [ - cls if method is None else getattr(cls, method) - for method in method - ] - else: - methods = [cls] - args = value.get("args") - kwargs = value.get("kwargs") - for method in methods: - try: - if isclass(method) and issubclass(method, BaseException): - return None - if args and kwargs: - return method(*args, **kwargs) - elif args: - return method(*args) - elif kwargs: - return method(**kwargs) - else: - return method() - except Exception: - continue - except Exception: - return None + return self._revive_lc2(value) + except InvalidModuleError as e: + logger.warning( + "Object %s is not in the deserialization allowlist.\n%s", + value["id"], + e.message, + ) return LC_REVIVER(value) - def dumps(self, obj: Any) -> bytes: - return json.dumps(obj, default=self._default, ensure_ascii=False).encode( - "utf-8", "ignore" + def _revive_lc2(self, value: dict[str, Any]) -> Any: + self._check_allowed_modules(value) + + [*module, name] = value["id"] + try: + mod = importlib.import_module(".".join(module)) + cls = getattr(mod, name) + method = value.get("method") + if isinstance(method, str): + methods = [getattr(cls, method)] + elif isinstance(method, list): + methods = [cls if m is None else getattr(cls, m) for m in method] + else: + methods = [cls] + args = value.get("args") + kwargs = value.get("kwargs") + for method in methods: + try: + if isclass(method) and issubclass(method, BaseException): + return None + if args and kwargs: + return method(*args, **kwargs) + elif args: + return method(*args) + elif kwargs: + return method(**kwargs) + else: + return method() + except Exception: + continue + except Exception: + return None + + def _check_allowed_modules(self, value: dict[str, Any]) -> None: + needed = tuple(value["id"]) + 
method = value.get("method") + if isinstance(method, list): + method_display = ",".join(m or "<init>" for m in method) + elif isinstance(method, str): + method_display = method + else: + method_display = "<init>" + + dotted = ".".join(needed) + if not self._allowed_modules: + raise InvalidModuleError( + f"Refused to deserialize JSON constructor: {dotted} (method: {method_display}). " + "No allowed_json_modules configured.\n\n" + "Unblock with ONE of:\n" + f" • JsonPlusSerializer(allowed_json_modules=[{needed!r}, ...])\n" + " • (DANGEROUS) JsonPlusSerializer(allowed_json_modules=True)\n\n" + "Note: Prefix allowlists are intentionally unsupported; prefer exact symbols " + "or plain-JSON representations revived without import-time side effects." + ) + + if self._allowed_modules is True: + return + if needed in self._allowed_modules: + return + + raise InvalidModuleError( + f"Refused to deserialize JSON constructor: {dotted} (method: {method_display}). " + "Symbol is not in the deserialization allowlist.\n\n" + "Add exactly this symbol to unblock:\n" + f" JsonPlusSerializer(allowed_json_modules=[{needed!r}, ...])\n" + "Or, as a last resort (DANGEROUS):\n" + " JsonPlusSerializer(allowed_json_modules=True)" ) def dumps_typed(self, obj: Any) -> tuple[str, bytes]: @@ -215,15 +184,10 @@ def dumps_typed(self, obj: Any) -> tuple[str, bytes]: try: return "msgpack", _msgpack_enc(obj) except ormsgpack.MsgpackEncodeError as exc: - if "valid UTF-8" in str(exc): - return "json", self.dumps(obj) - elif self.pickle_fallback: + if self.pickle_fallback: return "pickle", pickle.dumps(obj) raise exc - def loads(self, data: bytes) -> Any: - return json.loads(data, object_hook=self._reviver) - def loads_typed(self, data: tuple[str, bytes]) -> Any: type_, data_ = data if type_ == "null": @@ -233,7 +197,7 @@ def loads_typed(self, data: tuple[str, bytes]) -> Any: elif type_ == "bytearray": return bytearray(data_) elif type_ == "json": - return self.loads(data_) + return json.loads(data_, 
object_hook=self._reviver) elif type_ == "msgpack": return ormsgpack.unpackb( data_, ext_hook=self._unpack_ext_hook, option=ormsgpack.OPT_NON_STR_KEYS @@ -663,6 +627,13 @@ def _msgpack_ext_hook_to_json(code: int, data: bytes) -> Any: return +class InvalidModuleError(Exception): + """Exception raised when a module is not in the allowlist.""" + + def __init__(self, message: str): + self.message = message + + _option = ( ormsgpack.OPT_NON_STR_KEYS | ormsgpack.OPT_PASSTHROUGH_DATACLASS
libs/checkpoint-sqlite/langgraph/checkpoint/sqlite/aio.py+6 −9 modified@@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import json import random from collections.abc import AsyncIterator, Callable, Iterator, Sequence from contextlib import asynccontextmanager @@ -377,9 +378,7 @@ async def aget_tuple(self, config: RunnableConfig) -> CheckpointTuple | None: self.serde.loads_typed((type, checkpoint)), cast( CheckpointMetadata, - self.jsonplus_serde.loads(metadata) - if metadata is not None - else {}, + (json.loads(metadata) if metadata is not None else {}), ), ( { @@ -457,9 +456,7 @@ async def alist( self.serde.loads_typed((type, checkpoint)), cast( CheckpointMetadata, - self.jsonplus_serde.loads(metadata) - if metadata is not None - else {}, + (json.loads(metadata) if metadata is not None else {}), ), ( { @@ -503,9 +500,9 @@ async def aput( thread_id = config["configurable"]["thread_id"] checkpoint_ns = config["configurable"]["checkpoint_ns"] type_, serialized_checkpoint = self.serde.dumps_typed(checkpoint) - serialized_metadata = self.jsonplus_serde.dumps( - get_checkpoint_metadata(config, metadata) - ) + serialized_metadata = json.dumps( + get_checkpoint_metadata(config, metadata), ensure_ascii=False + ).encode("utf-8", "ignore") async with ( self.lock, self.conn.execute(
libs/checkpoint-sqlite/langgraph/checkpoint/sqlite/__init__.py+6 −9 modified@@ -1,5 +1,6 @@ from __future__ import annotations +import json import random import sqlite3 import threading @@ -265,9 +266,7 @@ def get_tuple(self, config: RunnableConfig) -> CheckpointTuple | None: self.serde.loads_typed((type, checkpoint)), cast( CheckpointMetadata, - self.jsonplus_serde.loads(metadata) - if metadata is not None - else {}, + json.loads(metadata) if metadata is not None else {}, ), ( { @@ -358,9 +357,7 @@ def list( self.serde.loads_typed((type, checkpoint)), cast( CheckpointMetadata, - self.jsonplus_serde.loads(metadata) - if metadata is not None - else {}, + json.loads(metadata) if metadata is not None else {}, ), ( { @@ -413,9 +410,9 @@ def put( thread_id = config["configurable"]["thread_id"] checkpoint_ns = config["configurable"]["checkpoint_ns"] type_, serialized_checkpoint = self.serde.dumps_typed(checkpoint) - serialized_metadata = self.jsonplus_serde.dumps( - get_checkpoint_metadata(config, metadata) - ) + serialized_metadata = json.dumps( + get_checkpoint_metadata(config, metadata), ensure_ascii=False + ).encode("utf-8", "ignore") with self.cursor() as cur: cur.execute( "INSERT OR REPLACE INTO checkpoints (thread_id, checkpoint_ns, checkpoint_id, parent_checkpoint_id, type, checkpoint, metadata) VALUES (?, ?, ?, ?, ?, ?, ?)",
libs/checkpoint/tests/test_jsonplus.py+18 −3 modified@@ -1,4 +1,5 @@ import dataclasses +import json import pathlib import re import sys @@ -19,6 +20,7 @@ from pydantic.v1 import SecretStr as SecretStrV1 from langgraph.checkpoint.serde.jsonplus import ( + InvalidModuleError, JsonPlusSerializer, _msgpack_ext_hook_to_json, ) @@ -160,10 +162,9 @@ def test_serde_jsonplus() -> None: "Text\ud83d\udcac", "收花🙄·到", ] + serde = JsonPlusSerializer(pickle_fallback=True) - assert serde.loads_typed(serde.dumps_typed(surrogates)) == [ - v.encode("utf-8", "ignore").decode() for v in surrogates - ] + assert serde.loads_typed(serde.dumps_typed(surrogates)) == surrogates def test_serde_jsonplus_json_mode() -> None: @@ -290,6 +291,20 @@ def test_serde_jsonplus_bytes() -> None: assert serde.loads_typed(dumped) == some_bytes +def test_deserde_invalid_module() -> None: + serde = JsonPlusSerializer() + load = { + "lc": 2, + "type": "constructor", + "id": ["pprint", "pprint"], + "kwargs": {"object": "HELLO"}, + } + with pytest.raises(InvalidModuleError): + serde._revive_lc2(load) + serde = JsonPlusSerializer(allowed_json_modules=[("pprint", "pprint")]) + serde.loads_typed(("json", json.dumps(load).encode("utf-8"))) + + def test_serde_jsonplus_bytearray() -> None: serde = JsonPlusSerializer()
libs/langgraph/.claude/settings.local.json+4 −2 modified@@ -7,8 +7,10 @@ "Bash(sed:*)", "Bash(awk:*)", "Bash(uv run mypy:*)", - "Bash(uv run:*)" + "Bash(uv run:*)", + "Bash(make test:*)", + "Bash(make test_parallel:*)" ], "deny": [] } -} \ No newline at end of file +}
libs/langgraph/tests/test_interrupt_migration.py+5 −5 modified@@ -21,18 +21,18 @@ def test_interrupt_legacy_ns() -> None: assert new_interrupt.id == old_interrupt.id -serializer = JsonPlusSerializer() +serializer = JsonPlusSerializer(allowed_json_modules=True) def test_serialization_roundtrip() -> None: """Test that the legacy interrupt (pre v1) can be reserialized as the modern interrupt without id corruption.""" # generated with: - # JsonPlusSerializer().dumps(Interrupt(value="legacy_test", ns=["legacy_test"], resumable=True, when="during")) + # JsonPlusSerializer().dumps_typed(Interrupt(value="legacy_test", ns=["legacy_test"], resumable=True, when="during")) legacy_interrupt_bytes = b'{"lc": 2, "type": "constructor", "id": ["langgraph", "types", "Interrupt"], "kwargs": {"value": "legacy_test", "resumable": true, "ns": ["legacy_test"], "when": "during"}}' legacy_interrupt_id = "f1fa625689ec006a5b32b76863e22a6c" - interrupt = serializer.loads(legacy_interrupt_bytes) + interrupt = serializer.loads_typed(("json", legacy_interrupt_bytes)) assert interrupt.id == legacy_interrupt_id assert interrupt.value == "legacy_test" @@ -41,10 +41,10 @@ def test_serialization_roundtrip_complex_ns() -> None: """Test that the legacy interrupt (pre v1), with a more complex ns can be reserialized as the modern interrupt without id corruption.""" # generated with: - # JsonPlusSerializer().dumps(Interrupt(value="legacy_test", ns=["legacy:test", "with:complex", "name:space"], resumable=True, when="during")) + # JsonPlusSerializer().dumps_typed(Interrupt(value="legacy_test", ns=["legacy:test", "with:complex", "name:space"], resumable=True, when="during")) legacy_interrupt_bytes = b'{"lc": 2, "type": "constructor", "id": ["langgraph", "types", "Interrupt"], "kwargs": {"value": "legacy_test", "resumable": true, "ns": ["legacy:test", "with:complex", "name:space"], "when": "during"}}' legacy_interrupt_id = "e69356a9ee3630ee7f4f597f2693000c" - interrupt = 
serializer.loads(legacy_interrupt_bytes) + interrupt = serializer.loads_typed(("json", legacy_interrupt_bytes)) assert interrupt.id == legacy_interrupt_id assert interrupt.value == "legacy_test"
Vulnerability mechanics
Generated by null/stub on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.
References
- github.com/advisories/GHSA-wwqv-p2pp-99h5 (GHSA advisory)
- nvd.nist.gov/vuln/detail/CVE-2025-64439 (advisory)
- github.com/langchain-ai/langgraph/blob/c5744f583b11745cd406f3059903e17bbcdcc8ac/libs/checkpoint/langgraph/checkpoint/serde/jsonplus.py (NVD web reference)
- github.com/langchain-ai/langgraph/commit/c5744f583b11745cd406f3059903e17bbcdcc8ac (NVD web reference)
- github.com/langchain-ai/langgraph/releases/tag/checkpoint%3D%3D3.0.0 (NVD web reference)
- github.com/langchain-ai/langgraph/security/advisories/GHSA-wwqv-p2pp-99h5 (NVD web reference)
News mentions
No linked articles in our index yet.