Low severity2.5OSV Advisory· Published Oct 3, 2025· Updated Apr 15, 2026
CVE-2025-61677
CVE-2025-61677
Description
DataChain is a Python-based AI-data warehouse for transforming and analyzing unstructured data. Versions 0.34.1 and below allow for deseriaization of untrusted data because of the way the DataChain library reads serialized objects from environment variables (such as DATACHAIN__METASTORE and DATACHAIN__WAREHOUSE) in the loader.py module. An attacker with the ability to set these environment variables can trigger code execution when the application loads. This issue is fixed in version 0.34.2.
Affected packages
Versions sourced from the GitHub Security Advisory.
| Package | Affected versions | Patched versions |
|---|---|---|
datachainPyPI | < 0.34.2 | 0.34.2 |
Affected products
1Patches
1914b95610620remove pickle from serializer to be safe (#1358)
9 files changed · +341 −73
src/datachain/catalog/loader.py+5 −0 modified@@ -3,6 +3,7 @@ from importlib import import_module from typing import TYPE_CHECKING, Any, Optional +from datachain.plugins import ensure_plugins_loaded from datachain.utils import get_envs_by_prefix if TYPE_CHECKING: @@ -24,6 +25,8 @@ def get_metastore(in_memory: bool = False) -> "AbstractMetastore": + ensure_plugins_loaded() + from datachain.data_storage import AbstractMetastore from datachain.data_storage.serializer import deserialize @@ -64,6 +67,8 @@ def get_metastore(in_memory: bool = False) -> "AbstractMetastore": def get_warehouse(in_memory: bool = False) -> "AbstractWarehouse": + ensure_plugins_loaded() + from datachain.data_storage import AbstractWarehouse from datachain.data_storage.serializer import deserialize
src/datachain/data_storage/serializer.py+105 −15 modified@@ -1,29 +1,119 @@ import base64 -import pickle +import json from abc import abstractmethod from collections.abc import Callable -from typing import Any +from typing import Any, ClassVar + +from datachain.plugins import ensure_plugins_loaded + + +class CallableRegistry: + _registry: ClassVar[dict[str, Callable]] = {} + + @classmethod + def register(cls, callable_obj: Callable, name: str) -> str: + cls._registry[name] = callable_obj + return name + + @classmethod + def get(cls, name: str) -> Callable: + return cls._registry[name] class Serializable: + @classmethod + @abstractmethod + def serialize_callable_name(cls) -> str: + """Return the registered name used for this class' factory callable.""" + @abstractmethod def clone_params(self) -> tuple[Callable[..., Any], list[Any], dict[str, Any]]: - """ - Returns the class, args, and kwargs needed to instantiate a cloned copy - of this instance for use in separate processes or machines. - """ + """Return (callable, args, kwargs) necessary to recreate this object.""" + + def _prepare(self, params: tuple) -> dict: + callable, args, kwargs = params + callable_name = callable.__self__.serialize_callable_name() + return { + "callable": callable_name, + "args": args, + "kwargs": { + k: self._prepare(v) if isinstance(v, tuple) else v + for k, v in kwargs.items() + }, + } def serialize(self) -> str: - """ - Returns a string representation of clone params. - This is useful for storing the state of an object in environment variable. - """ - return base64.b64encode(pickle.dumps(self.clone_params())).decode() + """Return a base64-encoded JSON string with registered callable + params.""" + _ensure_default_callables_registered() + data = self.clone_params() + return base64.b64encode(json.dumps(self._prepare(data)).encode()).decode() def deserialize(s: str) -> Serializable: + """Deserialize from base64-encoded JSON using only registered callables. + + Nested serialized objects are instantiated automatically except for those + passed via clone parameter tuples (keys ending with ``_clone_params``), + which must remain as (callable, args, kwargs) for later factory usage. """ - Returns a new instance of the class represented by the string. - """ - (f, args, kwargs) = pickle.loads(base64.b64decode(s.encode())) # noqa: S301 - return f(*args, **kwargs) + ensure_plugins_loaded() + _ensure_default_callables_registered() + decoded = base64.b64decode(s.encode()) + data = json.loads(decoded.decode()) + + def _is_serialized(obj: Any) -> bool: + return isinstance(obj, dict) and {"callable", "args", "kwargs"}.issubset( + obj.keys() + ) + + def _reconstruct(obj: Any, nested: bool = False) -> Any: + if not _is_serialized(obj): + return obj + callable_name: str = obj["callable"] + args: list[Any] = obj["args"] + kwargs: dict[str, Any] = obj["kwargs"] + # Recurse only inside kwargs because serialize() only nests through kwargs + for k, v in list(kwargs.items()): + if _is_serialized(v): + kwargs[k] = _reconstruct(v, True) + callable_obj = CallableRegistry.get(callable_name) + if nested: + return (callable_obj, args, kwargs) + # Otherwise instantiate + return callable_obj(*args, **kwargs) + + if not _is_serialized(data): + raise ValueError("Invalid serialized data format") + return _reconstruct(data, False) + + +class _DefaultsState: + registered = False + + +def _ensure_default_callables_registered() -> None: + if _DefaultsState.registered: + return + + from datachain.data_storage.sqlite import ( + SQLiteDatabaseEngine, + SQLiteMetastore, + SQLiteWarehouse, + ) + + # Register (idempotent by name overwrite is fine) using class-level + # serialization names to avoid hard-coded literals here. + CallableRegistry.register( + SQLiteDatabaseEngine.from_db_file, + SQLiteDatabaseEngine.serialize_callable_name(), + ) + CallableRegistry.register( + SQLiteMetastore.init_after_clone, + SQLiteMetastore.serialize_callable_name(), + ) + CallableRegistry.register( + SQLiteWarehouse.init_after_clone, + SQLiteWarehouse.serialize_callable_name(), + ) + + _DefaultsState.registered = True
src/datachain/data_storage/sqlite.py+13 −1 modified@@ -201,10 +201,14 @@ def clone_params(self) -> tuple[Callable[..., Any], list[Any], dict[str, Any]]: """ return ( SQLiteDatabaseEngine.from_db_file, - [self.db_file], + [str(self.db_file)], {}, ) + @classmethod + def serialize_callable_name(cls) -> str: + return "sqlite.from_db_file" + def _reconnect(self) -> None: if not self.is_closed: raise RuntimeError("Cannot reconnect on still-open DB!") @@ -403,6 +407,10 @@ def clone_params(self) -> tuple[Callable[..., Any], list[Any], dict[str, Any]]: }, ) + @classmethod + def serialize_callable_name(cls) -> str: + return "sqlite.metastore.init_after_clone" + @classmethod def init_after_clone( cls, @@ -610,6 +618,10 @@ def clone_params(self) -> tuple[Callable[..., Any], list[Any], dict[str, Any]]: {"db_clone_params": self.db.clone_params()}, ) + @classmethod + def serialize_callable_name(cls) -> str: + return "sqlite.warehouse.init_after_clone" + @classmethod def init_after_clone( cls,
src/datachain/plugins.py+30 −0 added@@ -0,0 +1,30 @@ +"""Plugin loader for DataChain callables. + +Discovers and invokes entry points in the group "datachain.callables" once +per process. This enables external packages (e.g., Studio) to register +their callables with the serializer registry without explicit imports. +""" + +from importlib import metadata as importlib_metadata + +_plugins_loaded = False + + +def ensure_plugins_loaded() -> None: + global _plugins_loaded # noqa: PLW0603 + if _plugins_loaded: + return + + # Compatible across importlib.metadata versions + eps_obj = importlib_metadata.entry_points() + if hasattr(eps_obj, "select"): + eps_list = eps_obj.select(group="datachain.callables") + else: + # Compatibility for older versions of importlib_metadata, Python 3.9 + eps_list = eps_obj.get("datachain.callables", []) # type: ignore[attr-defined] + + for ep in eps_list: + func = ep.load() + func() + + _plugins_loaded = True
tests/conftest.py+1 −0 modified@@ -126,6 +126,7 @@ def clean_environment( working_dir = str(tmp_path_factory.mktemp("default_working_dir")) monkeypatch_session.chdir(working_dir) monkeypatch_session.delenv(DataChainDir.ENV_VAR, raising=False) + monkeypatch_session.delenv(DataChainDir.ENV_VAR_DATACHAIN_ROOT, raising=False) @pytest.fixture
tests/unit/test_database_engine.py+13 −11 modified@@ -1,12 +1,15 @@ import base64 +import json import os -import pickle import pytest from sqlalchemy import Column, Integer, Table from datachain.data_storage.serializer import deserialize -from datachain.data_storage.sqlite import SQLiteDatabaseEngine, get_db_file_in_memory +from datachain.data_storage.sqlite import ( + SQLiteDatabaseEngine, + get_db_file_in_memory, +) from tests.utils import skip_if_not_sqlite @@ -24,6 +27,7 @@ def test_init_clone(tmp_dir, db_file, expected_db_file): expected_db_file = os.fspath(tmp_dir / expected_db_file) with SQLiteDatabaseEngine.from_db_file(db_file) as db: + assert isinstance(db, SQLiteDatabaseEngine) assert db.db_file == expected_db_file # Test clone @@ -53,17 +57,15 @@ def test_get_db_file_in_memory(db_file, in_memory, expected): def test_serialize(sqlite_db): - # Test serialization + # JSON serialization format serialized = sqlite_db.serialize() assert serialized - serialized_pickled = base64.b64decode(serialized.encode()) - assert serialized_pickled - (f, args, kwargs) = pickle.loads(serialized_pickled) # noqa: S301 - assert str(f) == str(SQLiteDatabaseEngine.from_db_file) - assert args == [":memory:"] - assert kwargs == {} - - # Test deserialization + raw = base64.b64decode(serialized.encode()) + data = json.loads(raw.decode()) + assert data["callable"] == "sqlite.from_db_file" + assert data["args"] == [":memory:"] + assert data["kwargs"] == {} + obj3 = deserialize(serialized) assert isinstance(obj3, SQLiteDatabaseEngine) assert obj3.db_file == ":memory:"
tests/unit/test_metastore.py+12 −11 modified@@ -1,5 +1,5 @@ import base64 -import pickle +import json import pytest @@ -24,18 +24,19 @@ def test_sqlite_metastore(sqlite_db): assert obj2.db.db_file == sqlite_db.db_file assert obj2.clone_params() == obj.clone_params() - # Test serialization + # Test serialization JSON format serialized = obj.serialize() assert serialized - serialized_pickled = base64.b64decode(serialized.encode()) - assert serialized_pickled - (f, args, kwargs) = pickle.loads(serialized_pickled) # noqa: S301 - assert str(f) == str(SQLiteMetastore.init_after_clone) - assert args == [] - assert kwargs["uri"] == uri - assert str(kwargs["db_clone_params"]) == str(sqlite_db.clone_params()) - - # Test deserialization + raw = base64.b64decode(serialized.encode()) + data = json.loads(raw.decode()) + assert data["callable"] == "sqlite.metastore.init_after_clone" + assert data["args"] == [] + assert data["kwargs"]["uri"] == uri + nested = data["kwargs"]["db_clone_params"] + assert nested["callable"] == "sqlite.from_db_file" + assert nested["args"] == [":memory:"] + assert nested["kwargs"] == {} + obj3 = deserialize(serialized) assert isinstance(obj3, SQLiteMetastore) assert obj3.uri == uri
tests/unit/test_serializer.py+151 −25 modified@@ -1,18 +1,32 @@ import base64 -import pickle +import json +from collections.abc import Callable +from typing import Any, Optional import pytest -from datachain.data_storage.serializer import Serializable, deserialize +from datachain.data_storage.serializer import ( + CallableRegistry, + Serializable, + deserialize, +) class MySerializableInit(Serializable): def __init__(self, name, optional=None): self.name = name self.optional = optional + @classmethod + def serialize_callable_name(cls): + return "MySerializableInit" + + @classmethod + def build(cls, name, optional=None): + return cls(name, optional=optional) + def clone_params(self): - return MySerializableInit, [self.name], {"optional": self.optional} + return self.__class__.build, [self.name], {"optional": self.optional} def get_params(self): return self.name, self.optional @@ -27,6 +41,10 @@ def __init__(self, name, optional=None): def from_params(cls, name, optional=None): return cls(name, optional=optional) + @classmethod + def serialize_callable_name(cls): + return "MySerializableFunc.from_params" + def clone_params(self): return self.from_params, [self.name], {"optional": self.optional} @@ -35,15 +53,35 @@ def get_params(self): class MySerializableNoParams(Serializable): + @classmethod + def serialize_callable_name(cls): + return "MySerializableNoParams" + def clone_params(self): - return MySerializableNoParams, [], {} + return self.__class__.build, [], {} + + @classmethod + def build(cls): + return cls() + + +# Register test classes/functions for the serializer with explicit names +CallableRegistry.register(MySerializableInit.build, "MySerializableInit") +CallableRegistry.register( + MySerializableFunc.from_params, "MySerializableFunc.from_params" +) +CallableRegistry.register(MySerializableNoParams.build, "MySerializableNoParams") @pytest.mark.parametrize( - "cls,call", + "cls,call,call_name", [ - (MySerializableInit, MySerializableInit), - (MySerializableFunc, MySerializableFunc.from_params), + (MySerializableInit, MySerializableInit.build, "MySerializableInit"), + ( + MySerializableFunc, + MySerializableFunc.from_params, + "MySerializableFunc.from_params", + ), ], ) @pytest.mark.parametrize( @@ -55,38 +93,126 @@ def clone_params(self): ("bar", 24), ], ) -def test_serializable_init(cls, call, name, optional): +def test_serializable_json_format(cls, call, call_name, name, optional): + """Test the new JSON-based serialization format.""" obj = cls(name, optional=optional) assert obj.clone_params() == (call, [name], {"optional": optional}) + # Test new JSON serialization serialized = obj.serialize() assert serialized - serialized_pickled = base64.b64decode(serialized.encode()) - assert serialized_pickled - (f, args, kwargs) = pickle.loads(serialized_pickled) # noqa: S301 - assert str(f) == str(call) - assert args == [name] - assert kwargs == {"optional": optional} + + # Verify it's JSON format by decoding + serialized_decoded = base64.b64decode(serialized.encode()) + data = json.loads(serialized_decoded.decode()) + assert data["callable"] == call_name + assert data["args"] == [name] + assert data["kwargs"] == {"optional": optional} obj2 = deserialize(serialized) assert isinstance(obj2, cls) - assert obj2.name == name - assert obj2.optional == optional - assert obj2.get_params() == (name, optional) + assert obj2.name == name # type: ignore[attr-defined] + assert obj2.optional == optional # type: ignore[attr-defined] + assert obj2.get_params() == (name, optional) # type: ignore[attr-defined] -def test_serializable_init_no_params(): +def test_serializable_no_params(): + """Test serialization with no parameters.""" obj = MySerializableNoParams() - assert obj.clone_params() == (MySerializableNoParams, [], {}) + assert obj.clone_params() == (MySerializableNoParams.build, [], {}) + # Test new JSON serialization serialized = obj.serialize() assert serialized - serialized_pickled = base64.b64decode(serialized.encode()) - assert serialized_pickled - (f, args, kwargs) = pickle.loads(serialized_pickled) # noqa: S301 - assert f == MySerializableNoParams - assert args == [] - assert kwargs == {} + + # Verify it's JSON format + serialized_decoded = base64.b64decode(serialized.encode()) + data = json.loads(serialized_decoded.decode()) + assert data["callable"] == "MySerializableNoParams" + assert data["args"] == [] + assert data["kwargs"] == {} obj2 = deserialize(serialized) assert isinstance(obj2, MySerializableNoParams) + + +def test_callable_registry(): + """Test the CallableRegistry functionality.""" + + # Test registration + def dummy_func(): + pass + + CallableRegistry.register(dummy_func, "dummy_func") + assert CallableRegistry.get("dummy_func") is dummy_func + + # Test error cases + with pytest.raises(KeyError): + CallableRegistry.get("nonexistent") + + def unregistered_func(): + pass + + with pytest.raises(KeyError): + CallableRegistry.get("unregistered_func") + + +def test_reject_unregistered_callable(): + """Ensure unregistered callable names cannot be deserialized.""" + data = {"callable": "nonexistent_callable", "args": [], "kwargs": {}} + malicious_serialized = base64.b64encode(json.dumps(data).encode()).decode() + with pytest.raises(KeyError): + deserialize(malicious_serialized) + + +class NestedSerializable(Serializable): + def __init__(self, value: int, child: Optional["NestedSerializable"] = None): + self.value = value + self.child = child + + @classmethod + def factory( + cls, + value: int, + child: Optional[tuple[Callable, list, dict[str, Any]]] = None, + ) -> "NestedSerializable": + if child is not None: + f, a, kw = child + child_obj = f(*a, **kw) + else: + child_obj = None + return cls(value, child_obj) + + @classmethod + def serialize_callable_name(cls): + return "NestedSerializable.factory" + + def clone_params(self): + return ( + self.factory, + [self.value], + {"child": (self.child.clone_params() if self.child else None)}, + ) + + +CallableRegistry.register(NestedSerializable.factory, "NestedSerializable.factory") + + +def test_nested_recursive_serialization(): + leaf = NestedSerializable(2) + root = NestedSerializable(1, child=leaf) + serialized = root.serialize() + restored = deserialize(serialized) + assert isinstance(restored, NestedSerializable) + assert restored.value == 1 + assert isinstance(restored.child, NestedSerializable) + assert restored.child.value == 2 + assert restored.child.child is None + + +def test_deserialize_invalid_top_level(): + bad = base64.b64encode(json.dumps({"foo": 1}).encode()).decode() + with pytest.raises(ValueError): + deserialize(bad) + with pytest.raises(ValueError): + deserialize("Zm9vYmFy") # base64 for 'foobar'
tests/unit/test_warehouse.py+11 −10 modified@@ -1,5 +1,5 @@ import base64 -import pickle +import json from datachain.data_storage.serializer import deserialize from datachain.data_storage.sqlite import ( @@ -17,17 +17,18 @@ def test_serialize(sqlite_db): assert obj2.db.db_file == sqlite_db.db_file assert obj2.clone_params() == obj.clone_params() - # Test serialization + # Test serialization JSON format serialized = obj.serialize() assert serialized - serialized_pickled = base64.b64decode(serialized.encode()) - assert serialized_pickled - (f, args, kwargs) = pickle.loads(serialized_pickled) # noqa: S301 - assert str(f) == str(SQLiteWarehouse.init_after_clone) - assert args == [] - assert str(kwargs["db_clone_params"]) == str(sqlite_db.clone_params()) - - # Test deserialization + raw = base64.b64decode(serialized.encode()) + data = json.loads(raw.decode()) + assert data["callable"] == "sqlite.warehouse.init_after_clone" + assert data["args"] == [] + nested = data["kwargs"]["db_clone_params"] + assert nested["callable"] == "sqlite.from_db_file" + assert nested["args"] == [":memory:"] + assert nested["kwargs"] == {} + obj3 = deserialize(serialized) assert isinstance(obj3, SQLiteWarehouse) assert obj3.db.db_file == sqlite_db.db_file
Vulnerability mechanics
Synthesis attempt was rejected by the grounding validator. Re-run pending.
References
5- github.com/advisories/GHSA-6px8-mr29-cj4rghsaADVISORY
- nvd.nist.gov/vuln/detail/CVE-2025-61677ghsaADVISORY
- github.com/iterative/datachain/commit/914b95610620d50c8d9bee506ccbfa7d4d57fdc0nvdWEB
- github.com/iterative/datachain/pull/1358nvdWEB
- github.com/iterative/datachain/security/advisories/GHSA-6px8-mr29-cj4rnvdWEB
News mentions
0No linked articles in our index yet.