VYPR
Low severity (2.5) · OSV Advisory · Published Oct 3, 2025 · Updated Apr 15, 2026

CVE-2025-61677

CVE-2025-61677

Description

DataChain is a Python-based AI-data warehouse for transforming and analyzing unstructured data. Versions 0.34.1 and below allow for deserialization of untrusted data because of the way the DataChain library reads serialized objects from environment variables (such as DATACHAIN__METASTORE and DATACHAIN__WAREHOUSE) in the loader.py module. An attacker with the ability to set these environment variables can trigger code execution when the application loads. This issue is fixed in version 0.34.2.

Affected packages

Versions sourced from the GitHub Security Advisory.

Package | Affected versions | Patched versions
datachain (PyPI) | < 0.34.2 | 0.34.2

Affected products

1

Patches

1
914b95610620

remove pickle from serializer to be safe (#1358)

https://github.com/iterative/datachain · Ivan Shcheklein · Oct 1, 2025 · via ghsa
9 files changed · +341 −73
  • src/datachain/catalog/loader.py · +5 −0 · modified
    @@ -3,6 +3,7 @@
     from importlib import import_module
     from typing import TYPE_CHECKING, Any, Optional
     
    +from datachain.plugins import ensure_plugins_loaded
     from datachain.utils import get_envs_by_prefix
     
     if TYPE_CHECKING:
    @@ -24,6 +25,8 @@
     
     
     def get_metastore(in_memory: bool = False) -> "AbstractMetastore":
    +    ensure_plugins_loaded()
    +
         from datachain.data_storage import AbstractMetastore
         from datachain.data_storage.serializer import deserialize
     
    @@ -64,6 +67,8 @@ def get_metastore(in_memory: bool = False) -> "AbstractMetastore":
     
     
     def get_warehouse(in_memory: bool = False) -> "AbstractWarehouse":
    +    ensure_plugins_loaded()
    +
         from datachain.data_storage import AbstractWarehouse
         from datachain.data_storage.serializer import deserialize
     
    
  • src/datachain/data_storage/serializer.py · +105 −15 · modified
    @@ -1,29 +1,119 @@
     import base64
    -import pickle
    +import json
     from abc import abstractmethod
     from collections.abc import Callable
    -from typing import Any
    +from typing import Any, ClassVar
    +
    +from datachain.plugins import ensure_plugins_loaded
    +
    +
    +class CallableRegistry:
    +    _registry: ClassVar[dict[str, Callable]] = {}
    +
    +    @classmethod
    +    def register(cls, callable_obj: Callable, name: str) -> str:
    +        cls._registry[name] = callable_obj
    +        return name
    +
    +    @classmethod
    +    def get(cls, name: str) -> Callable:
    +        return cls._registry[name]
     
     
     class Serializable:
    +    @classmethod
    +    @abstractmethod
    +    def serialize_callable_name(cls) -> str:
    +        """Return the registered name used for this class' factory callable."""
    +
         @abstractmethod
         def clone_params(self) -> tuple[Callable[..., Any], list[Any], dict[str, Any]]:
    -        """
    -        Returns the class, args, and kwargs needed to instantiate a cloned copy
    -        of this instance for use in separate processes or machines.
    -        """
    +        """Return (callable, args, kwargs) necessary to recreate this object."""
    +
    +    def _prepare(self, params: tuple) -> dict:
    +        callable, args, kwargs = params
    +        callable_name = callable.__self__.serialize_callable_name()
    +        return {
    +            "callable": callable_name,
    +            "args": args,
    +            "kwargs": {
    +                k: self._prepare(v) if isinstance(v, tuple) else v
    +                for k, v in kwargs.items()
    +            },
    +        }
     
         def serialize(self) -> str:
    -        """
    -        Returns a string representation of clone params.
    -        This is useful for storing the state of an object in environment variable.
    -        """
    -        return base64.b64encode(pickle.dumps(self.clone_params())).decode()
    +        """Return a base64-encoded JSON string with registered callable + params."""
    +        _ensure_default_callables_registered()
    +        data = self.clone_params()
    +        return base64.b64encode(json.dumps(self._prepare(data)).encode()).decode()
     
     
     def deserialize(s: str) -> Serializable:
    +    """Deserialize from base64-encoded JSON using only registered callables.
    +
    +    Nested serialized objects are instantiated automatically except for those
    +    passed via clone parameter tuples (keys ending with ``_clone_params``),
    +    which must remain as (callable, args, kwargs) for later factory usage.
         """
    -    Returns a new instance of the class represented by the string.
    -    """
    -    (f, args, kwargs) = pickle.loads(base64.b64decode(s.encode()))  # noqa: S301
    -    return f(*args, **kwargs)
    +    ensure_plugins_loaded()
    +    _ensure_default_callables_registered()
    +    decoded = base64.b64decode(s.encode())
    +    data = json.loads(decoded.decode())
    +
    +    def _is_serialized(obj: Any) -> bool:
    +        return isinstance(obj, dict) and {"callable", "args", "kwargs"}.issubset(
    +            obj.keys()
    +        )
    +
    +    def _reconstruct(obj: Any, nested: bool = False) -> Any:
    +        if not _is_serialized(obj):
    +            return obj
    +        callable_name: str = obj["callable"]
    +        args: list[Any] = obj["args"]
    +        kwargs: dict[str, Any] = obj["kwargs"]
    +        # Recurse only inside kwargs because serialize() only nests through kwargs
    +        for k, v in list(kwargs.items()):
    +            if _is_serialized(v):
    +                kwargs[k] = _reconstruct(v, True)
    +        callable_obj = CallableRegistry.get(callable_name)
    +        if nested:
    +            return (callable_obj, args, kwargs)
    +        # Otherwise instantiate
    +        return callable_obj(*args, **kwargs)
    +
    +    if not _is_serialized(data):
    +        raise ValueError("Invalid serialized data format")
    +    return _reconstruct(data, False)
    +
    +
    +class _DefaultsState:
    +    registered = False
    +
    +
    +def _ensure_default_callables_registered() -> None:
    +    if _DefaultsState.registered:
    +        return
    +
    +    from datachain.data_storage.sqlite import (
    +        SQLiteDatabaseEngine,
    +        SQLiteMetastore,
    +        SQLiteWarehouse,
    +    )
    +
    +    # Register (idempotent by name overwrite is fine) using class-level
    +    # serialization names to avoid hard-coded literals here.
    +    CallableRegistry.register(
    +        SQLiteDatabaseEngine.from_db_file,
    +        SQLiteDatabaseEngine.serialize_callable_name(),
    +    )
    +    CallableRegistry.register(
    +        SQLiteMetastore.init_after_clone,
    +        SQLiteMetastore.serialize_callable_name(),
    +    )
    +    CallableRegistry.register(
    +        SQLiteWarehouse.init_after_clone,
    +        SQLiteWarehouse.serialize_callable_name(),
    +    )
    +
    +    _DefaultsState.registered = True
    
  • src/datachain/data_storage/sqlite.py · +13 −1 · modified
    @@ -201,10 +201,14 @@ def clone_params(self) -> tuple[Callable[..., Any], list[Any], dict[str, Any]]:
             """
             return (
                 SQLiteDatabaseEngine.from_db_file,
    -            [self.db_file],
    +            [str(self.db_file)],
                 {},
             )
     
    +    @classmethod
    +    def serialize_callable_name(cls) -> str:
    +        return "sqlite.from_db_file"
    +
         def _reconnect(self) -> None:
             if not self.is_closed:
                 raise RuntimeError("Cannot reconnect on still-open DB!")
    @@ -403,6 +407,10 @@ def clone_params(self) -> tuple[Callable[..., Any], list[Any], dict[str, Any]]:
                 },
             )
     
    +    @classmethod
    +    def serialize_callable_name(cls) -> str:
    +        return "sqlite.metastore.init_after_clone"
    +
         @classmethod
         def init_after_clone(
             cls,
    @@ -610,6 +618,10 @@ def clone_params(self) -> tuple[Callable[..., Any], list[Any], dict[str, Any]]:
                 {"db_clone_params": self.db.clone_params()},
             )
     
    +    @classmethod
    +    def serialize_callable_name(cls) -> str:
    +        return "sqlite.warehouse.init_after_clone"
    +
         @classmethod
         def init_after_clone(
             cls,
    
  • src/datachain/plugins.py · +30 −0 · added
    @@ -0,0 +1,30 @@
    +"""Plugin loader for DataChain callables.
    +
    +Discovers and invokes entry points in the group "datachain.callables" once
    +per process. This enables external packages (e.g., Studio) to register
    +their callables with the serializer registry without explicit imports.
    +"""
    +
    +from importlib import metadata as importlib_metadata
    +
    +_plugins_loaded = False
    +
    +
    +def ensure_plugins_loaded() -> None:
    +    global _plugins_loaded  # noqa: PLW0603
    +    if _plugins_loaded:
    +        return
    +
    +    # Compatible across importlib.metadata versions
    +    eps_obj = importlib_metadata.entry_points()
    +    if hasattr(eps_obj, "select"):
    +        eps_list = eps_obj.select(group="datachain.callables")
    +    else:
    +        # Compatibility for older versions of importlib_metadata, Python 3.9
    +        eps_list = eps_obj.get("datachain.callables", [])  # type: ignore[attr-defined]
    +
    +    for ep in eps_list:
    +        func = ep.load()
    +        func()
    +
    +    _plugins_loaded = True
    
  • tests/conftest.py · +1 −0 · modified
    @@ -126,6 +126,7 @@ def clean_environment(
         working_dir = str(tmp_path_factory.mktemp("default_working_dir"))
         monkeypatch_session.chdir(working_dir)
         monkeypatch_session.delenv(DataChainDir.ENV_VAR, raising=False)
    +    monkeypatch_session.delenv(DataChainDir.ENV_VAR_DATACHAIN_ROOT, raising=False)
     
     
     @pytest.fixture
    
  • tests/unit/test_database_engine.py · +13 −11 · modified
    @@ -1,12 +1,15 @@
     import base64
    +import json
     import os
    -import pickle
     
     import pytest
     from sqlalchemy import Column, Integer, Table
     
     from datachain.data_storage.serializer import deserialize
    -from datachain.data_storage.sqlite import SQLiteDatabaseEngine, get_db_file_in_memory
    +from datachain.data_storage.sqlite import (
    +    SQLiteDatabaseEngine,
    +    get_db_file_in_memory,
    +)
     from tests.utils import skip_if_not_sqlite
     
     
    @@ -24,6 +27,7 @@ def test_init_clone(tmp_dir, db_file, expected_db_file):
             expected_db_file = os.fspath(tmp_dir / expected_db_file)
     
         with SQLiteDatabaseEngine.from_db_file(db_file) as db:
    +        assert isinstance(db, SQLiteDatabaseEngine)
             assert db.db_file == expected_db_file
     
             # Test clone
    @@ -53,17 +57,15 @@ def test_get_db_file_in_memory(db_file, in_memory, expected):
     
     
     def test_serialize(sqlite_db):
    -    # Test serialization
    +    # JSON serialization format
         serialized = sqlite_db.serialize()
         assert serialized
    -    serialized_pickled = base64.b64decode(serialized.encode())
    -    assert serialized_pickled
    -    (f, args, kwargs) = pickle.loads(serialized_pickled)  # noqa: S301
    -    assert str(f) == str(SQLiteDatabaseEngine.from_db_file)
    -    assert args == [":memory:"]
    -    assert kwargs == {}
    -
    -    # Test deserialization
    +    raw = base64.b64decode(serialized.encode())
    +    data = json.loads(raw.decode())
    +    assert data["callable"] == "sqlite.from_db_file"
    +    assert data["args"] == [":memory:"]
    +    assert data["kwargs"] == {}
    +
         obj3 = deserialize(serialized)
         assert isinstance(obj3, SQLiteDatabaseEngine)
         assert obj3.db_file == ":memory:"
    
  • tests/unit/test_metastore.py · +12 −11 · modified
    @@ -1,5 +1,5 @@
     import base64
    -import pickle
    +import json
     
     import pytest
     
    @@ -24,18 +24,19 @@ def test_sqlite_metastore(sqlite_db):
         assert obj2.db.db_file == sqlite_db.db_file
         assert obj2.clone_params() == obj.clone_params()
     
    -    # Test serialization
    +    # Test serialization JSON format
         serialized = obj.serialize()
         assert serialized
    -    serialized_pickled = base64.b64decode(serialized.encode())
    -    assert serialized_pickled
    -    (f, args, kwargs) = pickle.loads(serialized_pickled)  # noqa: S301
    -    assert str(f) == str(SQLiteMetastore.init_after_clone)
    -    assert args == []
    -    assert kwargs["uri"] == uri
    -    assert str(kwargs["db_clone_params"]) == str(sqlite_db.clone_params())
    -
    -    # Test deserialization
    +    raw = base64.b64decode(serialized.encode())
    +    data = json.loads(raw.decode())
    +    assert data["callable"] == "sqlite.metastore.init_after_clone"
    +    assert data["args"] == []
    +    assert data["kwargs"]["uri"] == uri
    +    nested = data["kwargs"]["db_clone_params"]
    +    assert nested["callable"] == "sqlite.from_db_file"
    +    assert nested["args"] == [":memory:"]
    +    assert nested["kwargs"] == {}
    +
         obj3 = deserialize(serialized)
         assert isinstance(obj3, SQLiteMetastore)
         assert obj3.uri == uri
    
  • tests/unit/test_serializer.py · +151 −25 · modified
    @@ -1,18 +1,32 @@
     import base64
    -import pickle
    +import json
    +from collections.abc import Callable
    +from typing import Any, Optional
     
     import pytest
     
    -from datachain.data_storage.serializer import Serializable, deserialize
    +from datachain.data_storage.serializer import (
    +    CallableRegistry,
    +    Serializable,
    +    deserialize,
    +)
     
     
     class MySerializableInit(Serializable):
         def __init__(self, name, optional=None):
             self.name = name
             self.optional = optional
     
    +    @classmethod
    +    def serialize_callable_name(cls):
    +        return "MySerializableInit"
    +
    +    @classmethod
    +    def build(cls, name, optional=None):
    +        return cls(name, optional=optional)
    +
         def clone_params(self):
    -        return MySerializableInit, [self.name], {"optional": self.optional}
    +        return self.__class__.build, [self.name], {"optional": self.optional}
     
         def get_params(self):
             return self.name, self.optional
    @@ -27,6 +41,10 @@ def __init__(self, name, optional=None):
         def from_params(cls, name, optional=None):
             return cls(name, optional=optional)
     
    +    @classmethod
    +    def serialize_callable_name(cls):
    +        return "MySerializableFunc.from_params"
    +
         def clone_params(self):
             return self.from_params, [self.name], {"optional": self.optional}
     
    @@ -35,15 +53,35 @@ def get_params(self):
     
     
     class MySerializableNoParams(Serializable):
    +    @classmethod
    +    def serialize_callable_name(cls):
    +        return "MySerializableNoParams"
    +
         def clone_params(self):
    -        return MySerializableNoParams, [], {}
    +        return self.__class__.build, [], {}
    +
    +    @classmethod
    +    def build(cls):
    +        return cls()
    +
    +
    +# Register test classes/functions for the serializer with explicit names
    +CallableRegistry.register(MySerializableInit.build, "MySerializableInit")
    +CallableRegistry.register(
    +    MySerializableFunc.from_params, "MySerializableFunc.from_params"
    +)
    +CallableRegistry.register(MySerializableNoParams.build, "MySerializableNoParams")
     
     
     @pytest.mark.parametrize(
    -    "cls,call",
    +    "cls,call,call_name",
         [
    -        (MySerializableInit, MySerializableInit),
    -        (MySerializableFunc, MySerializableFunc.from_params),
    +        (MySerializableInit, MySerializableInit.build, "MySerializableInit"),
    +        (
    +            MySerializableFunc,
    +            MySerializableFunc.from_params,
    +            "MySerializableFunc.from_params",
    +        ),
         ],
     )
     @pytest.mark.parametrize(
    @@ -55,38 +93,126 @@ def clone_params(self):
             ("bar", 24),
         ],
     )
    -def test_serializable_init(cls, call, name, optional):
    +def test_serializable_json_format(cls, call, call_name, name, optional):
    +    """Test the new JSON-based serialization format."""
         obj = cls(name, optional=optional)
         assert obj.clone_params() == (call, [name], {"optional": optional})
     
    +    # Test new JSON serialization
         serialized = obj.serialize()
         assert serialized
    -    serialized_pickled = base64.b64decode(serialized.encode())
    -    assert serialized_pickled
    -    (f, args, kwargs) = pickle.loads(serialized_pickled)  # noqa: S301
    -    assert str(f) == str(call)
    -    assert args == [name]
    -    assert kwargs == {"optional": optional}
    +
    +    # Verify it's JSON format by decoding
    +    serialized_decoded = base64.b64decode(serialized.encode())
    +    data = json.loads(serialized_decoded.decode())
    +    assert data["callable"] == call_name
    +    assert data["args"] == [name]
    +    assert data["kwargs"] == {"optional": optional}
     
         obj2 = deserialize(serialized)
         assert isinstance(obj2, cls)
    -    assert obj2.name == name
    -    assert obj2.optional == optional
    -    assert obj2.get_params() == (name, optional)
    +    assert obj2.name == name  # type: ignore[attr-defined]
    +    assert obj2.optional == optional  # type: ignore[attr-defined]
    +    assert obj2.get_params() == (name, optional)  # type: ignore[attr-defined]
     
     
    -def test_serializable_init_no_params():
    +def test_serializable_no_params():
    +    """Test serialization with no parameters."""
         obj = MySerializableNoParams()
    -    assert obj.clone_params() == (MySerializableNoParams, [], {})
    +    assert obj.clone_params() == (MySerializableNoParams.build, [], {})
     
    +    # Test new JSON serialization
         serialized = obj.serialize()
         assert serialized
    -    serialized_pickled = base64.b64decode(serialized.encode())
    -    assert serialized_pickled
    -    (f, args, kwargs) = pickle.loads(serialized_pickled)  # noqa: S301
    -    assert f == MySerializableNoParams
    -    assert args == []
    -    assert kwargs == {}
    +
    +    # Verify it's JSON format
    +    serialized_decoded = base64.b64decode(serialized.encode())
    +    data = json.loads(serialized_decoded.decode())
    +    assert data["callable"] == "MySerializableNoParams"
    +    assert data["args"] == []
    +    assert data["kwargs"] == {}
     
         obj2 = deserialize(serialized)
         assert isinstance(obj2, MySerializableNoParams)
    +
    +
    +def test_callable_registry():
    +    """Test the CallableRegistry functionality."""
    +
    +    # Test registration
    +    def dummy_func():
    +        pass
    +
    +    CallableRegistry.register(dummy_func, "dummy_func")
    +    assert CallableRegistry.get("dummy_func") is dummy_func
    +
    +    # Test error cases
    +    with pytest.raises(KeyError):
    +        CallableRegistry.get("nonexistent")
    +
    +    def unregistered_func():
    +        pass
    +
    +    with pytest.raises(KeyError):
    +        CallableRegistry.get("unregistered_func")
    +
    +
    +def test_reject_unregistered_callable():
    +    """Ensure unregistered callable names cannot be deserialized."""
    +    data = {"callable": "nonexistent_callable", "args": [], "kwargs": {}}
    +    malicious_serialized = base64.b64encode(json.dumps(data).encode()).decode()
    +    with pytest.raises(KeyError):
    +        deserialize(malicious_serialized)
    +
    +
    +class NestedSerializable(Serializable):
    +    def __init__(self, value: int, child: Optional["NestedSerializable"] = None):
    +        self.value = value
    +        self.child = child
    +
    +    @classmethod
    +    def factory(
    +        cls,
    +        value: int,
    +        child: Optional[tuple[Callable, list, dict[str, Any]]] = None,
    +    ) -> "NestedSerializable":
    +        if child is not None:
    +            f, a, kw = child
    +            child_obj = f(*a, **kw)
    +        else:
    +            child_obj = None
    +        return cls(value, child_obj)
    +
    +    @classmethod
    +    def serialize_callable_name(cls):
    +        return "NestedSerializable.factory"
    +
    +    def clone_params(self):
    +        return (
    +            self.factory,
    +            [self.value],
    +            {"child": (self.child.clone_params() if self.child else None)},
    +        )
    +
    +
    +CallableRegistry.register(NestedSerializable.factory, "NestedSerializable.factory")
    +
    +
    +def test_nested_recursive_serialization():
    +    leaf = NestedSerializable(2)
    +    root = NestedSerializable(1, child=leaf)
    +    serialized = root.serialize()
    +    restored = deserialize(serialized)
    +    assert isinstance(restored, NestedSerializable)
    +    assert restored.value == 1
    +    assert isinstance(restored.child, NestedSerializable)
    +    assert restored.child.value == 2
    +    assert restored.child.child is None
    +
    +
    +def test_deserialize_invalid_top_level():
    +    bad = base64.b64encode(json.dumps({"foo": 1}).encode()).decode()
    +    with pytest.raises(ValueError):
    +        deserialize(bad)
    +    with pytest.raises(ValueError):
    +        deserialize("Zm9vYmFy")  # base64 for 'foobar'
    
  • tests/unit/test_warehouse.py · +11 −10 · modified
    @@ -1,5 +1,5 @@
     import base64
    -import pickle
    +import json
     
     from datachain.data_storage.serializer import deserialize
     from datachain.data_storage.sqlite import (
    @@ -17,17 +17,18 @@ def test_serialize(sqlite_db):
         assert obj2.db.db_file == sqlite_db.db_file
         assert obj2.clone_params() == obj.clone_params()
     
    -    # Test serialization
    +    # Test serialization JSON format
         serialized = obj.serialize()
         assert serialized
    -    serialized_pickled = base64.b64decode(serialized.encode())
    -    assert serialized_pickled
    -    (f, args, kwargs) = pickle.loads(serialized_pickled)  # noqa: S301
    -    assert str(f) == str(SQLiteWarehouse.init_after_clone)
    -    assert args == []
    -    assert str(kwargs["db_clone_params"]) == str(sqlite_db.clone_params())
    -
    -    # Test deserialization
    +    raw = base64.b64decode(serialized.encode())
    +    data = json.loads(raw.decode())
    +    assert data["callable"] == "sqlite.warehouse.init_after_clone"
    +    assert data["args"] == []
    +    nested = data["kwargs"]["db_clone_params"]
    +    assert nested["callable"] == "sqlite.from_db_file"
    +    assert nested["args"] == [":memory:"]
    +    assert nested["kwargs"] == {}
    +
         obj3 = deserialize(serialized)
         assert isinstance(obj3, SQLiteWarehouse)
         assert obj3.db.db_file == sqlite_db.db_file
    

Vulnerability mechanics

Synthesis attempt was rejected by the grounding validator. Re-run pending.

References

5

News mentions

0

No linked articles in our index yet.