VYPR
High severity · OSV Advisory · Published Nov 7, 2025 · Updated Apr 15, 2026

CVE-2025-64439

CVE-2025-64439

Description

LangGraph SQLite Checkpoint is an implementation of LangGraph CheckpointSaver that uses SQLite DB (both sync and async, via aiosqlite). In versions 2.1.2 and below, the JsonPlusSerializer (used as the default serialization protocol for all checkpointing) contains a Remote Code Execution (RCE) vulnerability when deserializing payloads saved in the "json" serialization mode. By default, the serializer attempts to use "msgpack" for serialization. However, prior to version 3.0 of the checkpointer library, if illegal Unicode surrogate values caused serialization to fail, it would fall back to using the "json" mode. This issue is fixed in version 3.0.0.

Affected packages

Versions sourced from the GitHub Security Advisory.

Package | Affected versions | Patched versions
langgraph-checkpoint (PyPI) | < 3.0.0 | 3.0.0

Affected products

1

Patches

1
c5744f583b11

chore: Restrict "json" type deserialization (#6269)

https://github.com/langchain-ai/langgraph · William FH · Oct 20, 2025 · via ghsa
8 files changed · +143 -172
  • libs/checkpoint/langgraph/checkpoint/serde/base.py · +4 -9 · modified
    @@ -1,6 +1,6 @@
     from __future__ import annotations
     
    -from typing import Any, Protocol
    +from typing import Any, Protocol, runtime_checkable
     
     
     class UntypedSerializerProtocol(Protocol):
    @@ -11,7 +11,8 @@ def dumps(self, obj: Any) -> bytes: ...
         def loads(self, data: bytes) -> Any: ...
     
     
    -class SerializerProtocol(UntypedSerializerProtocol, Protocol):
    +@runtime_checkable
    +class SerializerProtocol(Protocol):
         """Protocol for serialization and deserialization of objects.
     
         - `dumps`: Serialize an object to bytes.
    @@ -31,12 +32,6 @@ class SerializerCompat(SerializerProtocol):
         def __init__(self, serde: UntypedSerializerProtocol) -> None:
             self.serde = serde
     
    -    def dumps(self, obj: Any) -> bytes:
    -        return self.serde.dumps(obj)
    -
    -    def loads(self, data: bytes) -> Any:
    -        return self.serde.loads(data)
    -
         def dumps_typed(self, obj: Any) -> tuple[str, bytes]:
             return type(obj).__name__, self.serde.dumps(obj)
     
    @@ -49,7 +44,7 @@ def maybe_add_typed_methods(
     ) -> SerializerProtocol:
         """Wrap serde old serde implementations in a class with loads_typed and dumps_typed for backwards compatibility."""
     
    -    if not hasattr(serde, "loads_typed") or not hasattr(serde, "dumps_typed"):
    +    if not isinstance(serde, SerializerProtocol):
             return SerializerCompat(serde)
     
         return serde
    
  • libs/checkpoint/langgraph/checkpoint/serde/encrypted.py · +0 -6 · modified
    @@ -14,12 +14,6 @@ def __init__(
             self.cipher = cipher
             self.serde = serde
     
    -    def dumps(self, obj: Any) -> bytes:
    -        return self.serde.dumps(obj)
    -
    -    def loads(self, data: bytes) -> Any:
    -        return self.serde.loads(data)
    -
         def dumps_typed(self, obj: Any) -> tuple[str, bytes]:
             """Serialize an object to a tuple `(type, bytes)` and encrypt the bytes."""
             # serialize data
    
  • libs/checkpoint/langgraph/checkpoint/serde/jsonplus.py · +100 -129 · modified
    @@ -4,6 +4,7 @@
     import decimal
     import importlib
     import json
    +import logging
     import pathlib
     import pickle
     import re
    @@ -21,32 +22,44 @@
         IPv6Interface,
         IPv6Network,
     )
    -from typing import Any, cast
    +from typing import Any, Literal
     from uuid import UUID
     from zoneinfo import ZoneInfo
     
     import ormsgpack
     from langchain_core.load.load import Reviver
    -from langchain_core.load.serializable import Serializable
     
     from langgraph.checkpoint.serde.base import SerializerProtocol
     from langgraph.checkpoint.serde.types import SendProtocol
     from langgraph.store.base import Item
     
     LC_REVIVER = Reviver()
     EMPTY_BYTES = b""
    +logger = logging.getLogger(__name__)
     
     
     class JsonPlusSerializer(SerializerProtocol):
    -    """Serializer that uses ormsgpack, with a fallback to extended JSON serializer."""
    +    """Serializer that uses ormsgpack, with optional fallbacks.
    +
    +    Security note: this serializer is intended for use within the BaseCheckpointSaver
    +    class and called within the Pregel loop. It should not be used on untrusted
    +    python objects. If an attacker can write directly to your checkpoint database,
    +    they may be able to trigger code execution when data is deserialized.
    +    """
     
         def __init__(
             self,
             *,
             pickle_fallback: bool = False,
    +        allowed_json_modules: Sequence[tuple[str, ...]] | Literal[True] | None = None,
             __unpack_ext_hook__: Callable[[int, bytes], Any] | None = None,
         ) -> None:
             self.pickle_fallback = pickle_fallback
    +        self._allowed_modules = (
    +            {mod_and_name for mod_and_name in allowed_json_modules}
    +            if allowed_json_modules and allowed_json_modules is not True
    +            else (allowed_json_modules if allowed_json_modules is True else None)
    +        )
             self._unpack_ext_hook = (
                 __unpack_ext_hook__
                 if __unpack_ext_hook__ is not None
    @@ -74,134 +87,90 @@ def _encode_constructor_args(
                 out["kwargs"] = kwargs
             return out
     
    -    def _default(self, obj: Any) -> str | dict[str, Any]:
    -        if isinstance(obj, Serializable):
    -            return cast(dict[str, Any], obj.to_json())
    -        elif hasattr(obj, "model_dump") and callable(obj.model_dump):
    -            return self._encode_constructor_args(
    -                obj.__class__, method=(None, "model_construct"), kwargs=obj.model_dump()
    -            )
    -        elif hasattr(obj, "dict") and callable(obj.dict):
    -            return self._encode_constructor_args(
    -                obj.__class__, method=(None, "construct"), kwargs=obj.dict()
    -            )
    -        elif hasattr(obj, "_asdict") and callable(obj._asdict):
    -            return self._encode_constructor_args(obj.__class__, kwargs=obj._asdict())
    -        elif isinstance(obj, pathlib.Path):
    -            return self._encode_constructor_args(pathlib.Path, args=obj.parts)
    -        elif isinstance(obj, re.Pattern):
    -            return self._encode_constructor_args(
    -                re.compile, args=(obj.pattern, obj.flags)
    -            )
    -        elif isinstance(obj, UUID):
    -            return self._encode_constructor_args(UUID, args=(obj.hex,))
    -        elif isinstance(obj, decimal.Decimal):
    -            return self._encode_constructor_args(decimal.Decimal, args=(str(obj),))
    -        elif isinstance(obj, (set, frozenset, deque)):
    -            return self._encode_constructor_args(type(obj), args=(tuple(obj),))
    -        elif isinstance(obj, (IPv4Address, IPv4Interface, IPv4Network)):
    -            return self._encode_constructor_args(obj.__class__, args=(str(obj),))
    -        elif isinstance(obj, (IPv6Address, IPv6Interface, IPv6Network)):
    -            return self._encode_constructor_args(obj.__class__, args=(str(obj),))
    -
    -        elif isinstance(obj, datetime):
    -            return self._encode_constructor_args(
    -                datetime, method="fromisoformat", args=(obj.isoformat(),)
    -            )
    -        elif isinstance(obj, timezone):
    -            return self._encode_constructor_args(
    -                timezone,
    -                args=obj.__getinitargs__(),  # type: ignore[attr-defined]
    -            )
    -        elif isinstance(obj, ZoneInfo):
    -            return self._encode_constructor_args(ZoneInfo, args=(obj.key,))
    -        elif isinstance(obj, timedelta):
    -            return self._encode_constructor_args(
    -                timedelta, args=(obj.days, obj.seconds, obj.microseconds)
    -            )
    -        elif isinstance(obj, date):
    -            return self._encode_constructor_args(
    -                date, args=(obj.year, obj.month, obj.day)
    -            )
    -        elif isinstance(obj, time):
    -            return self._encode_constructor_args(
    -                time,
    -                args=(obj.hour, obj.minute, obj.second, obj.microsecond, obj.tzinfo),
    -                kwargs={"fold": obj.fold},
    -            )
    -        elif dataclasses.is_dataclass(obj):
    -            return self._encode_constructor_args(
    -                obj.__class__,
    -                kwargs={
    -                    field.name: getattr(obj, field.name)
    -                    for field in dataclasses.fields(obj)
    -                },
    -            )
    -        elif isinstance(obj, Enum):
    -            return self._encode_constructor_args(obj.__class__, args=(obj.value,))
    -        elif isinstance(obj, SendProtocol):
    -            return self._encode_constructor_args(
    -                obj.__class__, kwargs={"node": obj.node, "arg": obj.arg}
    -            )
    -        elif isinstance(obj, (bytes, bytearray)):
    -            return self._encode_constructor_args(
    -                obj.__class__, method="fromhex", args=(obj.hex(),)
    -            )
    -        elif isinstance(obj, BaseException):
    -            return repr(obj)
    -        else:
    -            raise TypeError(
    -                f"Object of type {obj.__class__.__name__} is not JSON serializable"
    -            )
    -
         def _reviver(self, value: dict[str, Any]) -> Any:
    -        if (
    +        if self._allowed_modules and (
                 value.get("lc", None) == 2
                 and value.get("type", None) == "constructor"
                 and value.get("id", None) is not None
             ):
                 try:
    -                # Get module and class name
    -                [*module, name] = value["id"]
    -                # Import module
    -                mod = importlib.import_module(".".join(module))
    -                # Import class
    -                cls = getattr(mod, name)
    -                # Instantiate class
    -                method = value.get("method")
    -                if isinstance(method, str):
    -                    methods = [getattr(cls, method)]
    -                elif isinstance(method, list):
    -                    methods = [
    -                        cls if method is None else getattr(cls, method)
    -                        for method in method
    -                    ]
    -                else:
    -                    methods = [cls]
    -                args = value.get("args")
    -                kwargs = value.get("kwargs")
    -                for method in methods:
    -                    try:
    -                        if isclass(method) and issubclass(method, BaseException):
    -                            return None
    -                        if args and kwargs:
    -                            return method(*args, **kwargs)
    -                        elif args:
    -                            return method(*args)
    -                        elif kwargs:
    -                            return method(**kwargs)
    -                        else:
    -                            return method()
    -                    except Exception:
    -                        continue
    -            except Exception:
    -                return None
    +                return self._revive_lc2(value)
    +            except InvalidModuleError as e:
    +                logger.warning(
    +                    "Object %s is not in the deserialization allowlist.\n%s",
    +                    value["id"],
    +                    e.message,
    +                )
     
             return LC_REVIVER(value)
     
    -    def dumps(self, obj: Any) -> bytes:
    -        return json.dumps(obj, default=self._default, ensure_ascii=False).encode(
    -            "utf-8", "ignore"
    +    def _revive_lc2(self, value: dict[str, Any]) -> Any:
    +        self._check_allowed_modules(value)
    +
    +        [*module, name] = value["id"]
    +        try:
    +            mod = importlib.import_module(".".join(module))
    +            cls = getattr(mod, name)
    +            method = value.get("method")
    +            if isinstance(method, str):
    +                methods = [getattr(cls, method)]
    +            elif isinstance(method, list):
    +                methods = [cls if m is None else getattr(cls, m) for m in method]
    +            else:
    +                methods = [cls]
    +            args = value.get("args")
    +            kwargs = value.get("kwargs")
    +            for method in methods:
    +                try:
    +                    if isclass(method) and issubclass(method, BaseException):
    +                        return None
    +                    if args and kwargs:
    +                        return method(*args, **kwargs)
    +                    elif args:
    +                        return method(*args)
    +                    elif kwargs:
    +                        return method(**kwargs)
    +                    else:
    +                        return method()
    +                except Exception:
    +                    continue
    +        except Exception:
    +            return None
    +
    +    def _check_allowed_modules(self, value: dict[str, Any]) -> None:
    +        needed = tuple(value["id"])
    +        method = value.get("method")
    +        if isinstance(method, list):
    +            method_display = ",".join(m or "<init>" for m in method)
    +        elif isinstance(method, str):
    +            method_display = method
    +        else:
    +            method_display = "<init>"
    +
    +        dotted = ".".join(needed)
    +        if not self._allowed_modules:
    +            raise InvalidModuleError(
    +                f"Refused to deserialize JSON constructor: {dotted} (method: {method_display}). "
    +                "No allowed_json_modules configured.\n\n"
    +                "Unblock with ONE of:\n"
    +                f"  • JsonPlusSerializer(allowed_json_modules=[{needed!r}, ...])\n"
    +                "  • (DANGEROUS) JsonPlusSerializer(allowed_json_modules=True)\n\n"
    +                "Note: Prefix allowlists are intentionally unsupported; prefer exact symbols "
    +                "or plain-JSON representations revived without import-time side effects."
    +            )
    +
    +        if self._allowed_modules is True:
    +            return
    +        if needed in self._allowed_modules:
    +            return
    +
    +        raise InvalidModuleError(
    +            f"Refused to deserialize JSON constructor: {dotted} (method: {method_display}). "
    +            "Symbol is not in the deserialization allowlist.\n\n"
    +            "Add exactly this symbol to unblock:\n"
    +            f"  JsonPlusSerializer(allowed_json_modules=[{needed!r}, ...])\n"
    +            "Or, as a last resort (DANGEROUS):\n"
    +            "  JsonPlusSerializer(allowed_json_modules=True)"
             )
     
         def dumps_typed(self, obj: Any) -> tuple[str, bytes]:
    @@ -215,15 +184,10 @@ def dumps_typed(self, obj: Any) -> tuple[str, bytes]:
                 try:
                     return "msgpack", _msgpack_enc(obj)
                 except ormsgpack.MsgpackEncodeError as exc:
    -                if "valid UTF-8" in str(exc):
    -                    return "json", self.dumps(obj)
    -                elif self.pickle_fallback:
    +                if self.pickle_fallback:
                         return "pickle", pickle.dumps(obj)
                     raise exc
     
    -    def loads(self, data: bytes) -> Any:
    -        return json.loads(data, object_hook=self._reviver)
    -
         def loads_typed(self, data: tuple[str, bytes]) -> Any:
             type_, data_ = data
             if type_ == "null":
    @@ -233,7 +197,7 @@ def loads_typed(self, data: tuple[str, bytes]) -> Any:
             elif type_ == "bytearray":
                 return bytearray(data_)
             elif type_ == "json":
    -            return self.loads(data_)
    +            return json.loads(data_, object_hook=self._reviver)
             elif type_ == "msgpack":
                 return ormsgpack.unpackb(
                     data_, ext_hook=self._unpack_ext_hook, option=ormsgpack.OPT_NON_STR_KEYS
    @@ -663,6 +627,13 @@ def _msgpack_ext_hook_to_json(code: int, data: bytes) -> Any:
                 return
     
     
    +class InvalidModuleError(Exception):
    +    """Exception raised when a module is not in the allowlist."""
    +
    +    def __init__(self, message: str):
    +        self.message = message
    +
    +
     _option = (
         ormsgpack.OPT_NON_STR_KEYS
         | ormsgpack.OPT_PASSTHROUGH_DATACLASS
    
  • libs/checkpoint-sqlite/langgraph/checkpoint/sqlite/aio.py · +6 -9 · modified
    @@ -1,6 +1,7 @@
     from __future__ import annotations
     
     import asyncio
    +import json
     import random
     from collections.abc import AsyncIterator, Callable, Iterator, Sequence
     from contextlib import asynccontextmanager
    @@ -377,9 +378,7 @@ async def aget_tuple(self, config: RunnableConfig) -> CheckpointTuple | None:
                         self.serde.loads_typed((type, checkpoint)),
                         cast(
                             CheckpointMetadata,
    -                        self.jsonplus_serde.loads(metadata)
    -                        if metadata is not None
    -                        else {},
    +                        (json.loads(metadata) if metadata is not None else {}),
                         ),
                         (
                             {
    @@ -457,9 +456,7 @@ async def alist(
                         self.serde.loads_typed((type, checkpoint)),
                         cast(
                             CheckpointMetadata,
    -                        self.jsonplus_serde.loads(metadata)
    -                        if metadata is not None
    -                        else {},
    +                        (json.loads(metadata) if metadata is not None else {}),
                         ),
                         (
                             {
    @@ -503,9 +500,9 @@ async def aput(
             thread_id = config["configurable"]["thread_id"]
             checkpoint_ns = config["configurable"]["checkpoint_ns"]
             type_, serialized_checkpoint = self.serde.dumps_typed(checkpoint)
    -        serialized_metadata = self.jsonplus_serde.dumps(
    -            get_checkpoint_metadata(config, metadata)
    -        )
    +        serialized_metadata = json.dumps(
    +            get_checkpoint_metadata(config, metadata), ensure_ascii=False
    +        ).encode("utf-8", "ignore")
             async with (
                 self.lock,
                 self.conn.execute(
    
  • libs/checkpoint-sqlite/langgraph/checkpoint/sqlite/__init__.py · +6 -9 · modified
    @@ -1,5 +1,6 @@
     from __future__ import annotations
     
    +import json
     import random
     import sqlite3
     import threading
    @@ -265,9 +266,7 @@ def get_tuple(self, config: RunnableConfig) -> CheckpointTuple | None:
                         self.serde.loads_typed((type, checkpoint)),
                         cast(
                             CheckpointMetadata,
    -                        self.jsonplus_serde.loads(metadata)
    -                        if metadata is not None
    -                        else {},
    +                        json.loads(metadata) if metadata is not None else {},
                         ),
                         (
                             {
    @@ -358,9 +357,7 @@ def list(
                         self.serde.loads_typed((type, checkpoint)),
                         cast(
                             CheckpointMetadata,
    -                        self.jsonplus_serde.loads(metadata)
    -                        if metadata is not None
    -                        else {},
    +                        json.loads(metadata) if metadata is not None else {},
                         ),
                         (
                             {
    @@ -413,9 +410,9 @@ def put(
             thread_id = config["configurable"]["thread_id"]
             checkpoint_ns = config["configurable"]["checkpoint_ns"]
             type_, serialized_checkpoint = self.serde.dumps_typed(checkpoint)
    -        serialized_metadata = self.jsonplus_serde.dumps(
    -            get_checkpoint_metadata(config, metadata)
    -        )
    +        serialized_metadata = json.dumps(
    +            get_checkpoint_metadata(config, metadata), ensure_ascii=False
    +        ).encode("utf-8", "ignore")
             with self.cursor() as cur:
                 cur.execute(
                     "INSERT OR REPLACE INTO checkpoints (thread_id, checkpoint_ns, checkpoint_id, parent_checkpoint_id, type, checkpoint, metadata) VALUES (?, ?, ?, ?, ?, ?, ?)",
    
  • libs/checkpoint/tests/test_jsonplus.py · +18 -3 · modified
    @@ -1,4 +1,5 @@
     import dataclasses
    +import json
     import pathlib
     import re
     import sys
    @@ -19,6 +20,7 @@
     from pydantic.v1 import SecretStr as SecretStrV1
     
     from langgraph.checkpoint.serde.jsonplus import (
    +    InvalidModuleError,
         JsonPlusSerializer,
         _msgpack_ext_hook_to_json,
     )
    @@ -160,10 +162,9 @@ def test_serde_jsonplus() -> None:
             "Text\ud83d\udcac",
             "收花🙄·到",
         ]
    +    serde = JsonPlusSerializer(pickle_fallback=True)
     
    -    assert serde.loads_typed(serde.dumps_typed(surrogates)) == [
    -        v.encode("utf-8", "ignore").decode() for v in surrogates
    -    ]
    +    assert serde.loads_typed(serde.dumps_typed(surrogates)) == surrogates
     
     
     def test_serde_jsonplus_json_mode() -> None:
    @@ -290,6 +291,20 @@ def test_serde_jsonplus_bytes() -> None:
         assert serde.loads_typed(dumped) == some_bytes
     
     
    +def test_deserde_invalid_module() -> None:
    +    serde = JsonPlusSerializer()
    +    load = {
    +        "lc": 2,
    +        "type": "constructor",
    +        "id": ["pprint", "pprint"],
    +        "kwargs": {"object": "HELLO"},
    +    }
    +    with pytest.raises(InvalidModuleError):
    +        serde._revive_lc2(load)
    +    serde = JsonPlusSerializer(allowed_json_modules=[("pprint", "pprint")])
    +    serde.loads_typed(("json", json.dumps(load).encode("utf-8")))
    +
    +
     def test_serde_jsonplus_bytearray() -> None:
         serde = JsonPlusSerializer()
     
    
  • libs/langgraph/.claude/settings.local.json · +4 -2 · modified
    @@ -7,8 +7,10 @@
           "Bash(sed:*)",
           "Bash(awk:*)",
           "Bash(uv run mypy:*)",
    -      "Bash(uv run:*)"
    +      "Bash(uv run:*)",
    +      "Bash(make test:*)",
    +      "Bash(make test_parallel:*)"
         ],
         "deny": []
       }
    -}
    \ No newline at end of file
    +}
    
  • libs/langgraph/tests/test_interrupt_migration.py · +5 -5 · modified
    @@ -21,18 +21,18 @@ def test_interrupt_legacy_ns() -> None:
             assert new_interrupt.id == old_interrupt.id
     
     
    -serializer = JsonPlusSerializer()
    +serializer = JsonPlusSerializer(allowed_json_modules=True)
     
     
     def test_serialization_roundtrip() -> None:
         """Test that the legacy interrupt (pre v1) can be reserialized as the modern interrupt without id corruption."""
     
         # generated with:
    -    # JsonPlusSerializer().dumps(Interrupt(value="legacy_test", ns=["legacy_test"], resumable=True, when="during"))
    +    # JsonPlusSerializer().dumps_typed(Interrupt(value="legacy_test", ns=["legacy_test"], resumable=True, when="during"))
         legacy_interrupt_bytes = b'{"lc": 2, "type": "constructor", "id": ["langgraph", "types", "Interrupt"], "kwargs": {"value": "legacy_test", "resumable": true, "ns": ["legacy_test"], "when": "during"}}'
         legacy_interrupt_id = "f1fa625689ec006a5b32b76863e22a6c"
     
    -    interrupt = serializer.loads(legacy_interrupt_bytes)
    +    interrupt = serializer.loads_typed(("json", legacy_interrupt_bytes))
         assert interrupt.id == legacy_interrupt_id
         assert interrupt.value == "legacy_test"
     
    @@ -41,10 +41,10 @@ def test_serialization_roundtrip_complex_ns() -> None:
         """Test that the legacy interrupt (pre v1), with a more complex ns can be reserialized as the modern interrupt without id corruption."""
     
         # generated with:
    -    # JsonPlusSerializer().dumps(Interrupt(value="legacy_test", ns=["legacy:test", "with:complex", "name:space"], resumable=True, when="during"))
    +    # JsonPlusSerializer().dumps_typed(Interrupt(value="legacy_test", ns=["legacy:test", "with:complex", "name:space"], resumable=True, when="during"))
         legacy_interrupt_bytes = b'{"lc": 2, "type": "constructor", "id": ["langgraph", "types", "Interrupt"], "kwargs": {"value": "legacy_test", "resumable": true, "ns": ["legacy:test", "with:complex", "name:space"], "when": "during"}}'
         legacy_interrupt_id = "e69356a9ee3630ee7f4f597f2693000c"
     
    -    interrupt = serializer.loads(legacy_interrupt_bytes)
    +    interrupt = serializer.loads_typed(("json", legacy_interrupt_bytes))
         assert interrupt.id == legacy_interrupt_id
         assert interrupt.value == "legacy_test"
    

Vulnerability mechanics

Generated by null/stub on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.

References

6

News mentions

0

No linked articles in our index yet.