LangGraph SQLite Checkpoint is vulnerable to SQL Injection via metadata filter key in checkpointer list method
Description
LangGraph SQLite Checkpoint is an implementation of the LangGraph CheckpointSaver that uses a SQLite database (both sync and async, via aiosqlite). Versions 3.0.0 and below are vulnerable to SQL injection in the checkpoint implementation: an attacker can manipulate SQL queries through metadata filter keys, affecting applications that accept untrusted filter keys (not just filter values) in checkpoint search operations. The `_metadata_predicate()` function constructs SQL queries by interpolating filter keys directly into f-strings without validation. The issue is fixed in version 3.0.1.
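To illustrate the flaw, here is a minimal, hypothetical reduction of the vulnerable pattern (not the library's actual code): the filter key is interpolated into the SQL text while only the value is parameterized, so a crafted key can terminate the quoted JSON path literal and splice arbitrary SQL into the WHERE clause.

```python
# Hypothetical reduction of the vulnerable pattern: the untrusted filter
# key lands inside the SQL string; only the value is bound as a parameter.
def build_predicate_unsafe(metadata_filter: dict) -> str:
    predicates = [
        f"json_extract(CAST(metadata AS TEXT), '$.{key}') = ?"
        for key in metadata_filter
    ]
    return " AND ".join(predicates)

# A key containing a single quote closes the '$....' path literal and
# injects an always-true condition into the predicate:
malicious_key = "x') = 'y' OR '1'='1' OR json_extract(metadata, '$.x"
sql = build_predicate_unsafe({malicious_key: "dummy"})
print(sql)  # the generated predicate now contains OR '1'='1'
```

Because the injected text becomes part of the query itself, parameter binding for the value offers no protection against a hostile key.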
Affected packages
Versions sourced from the GitHub Security Advisory.
| Package | Affected versions | Patched versions |
|---|---|---|
| langgraph-checkpoint-sqlite (PyPI) | < 3.0.1 | 3.0.1 |
Affected products
- Range: < 3.0.1
Patches
- 297242913f8a fix(checkpoint-sqlite): harden (#6565)
  7 files changed · +422 −100
libs/checkpoint-sqlite/langgraph/checkpoint/sqlite/aio.py (modified, +3 −2)

```diff
@@ -425,8 +425,9 @@ async def alist(
             FROM checkpoints
             {where}
             ORDER BY checkpoint_id DESC"""
-        if limit:
-            query += f" LIMIT {limit}"
+        if limit is not None:
+            query += " LIMIT ?"
+            params = (*params, limit)
         async with (
             self.lock,
             self.conn.execute(query, params) as cur,
```
libs/checkpoint-sqlite/langgraph/checkpoint/sqlite/__init__.py (modified, +3 −2)

```diff
@@ -329,8 +329,9 @@ def list(
             FROM checkpoints
             {where}
             ORDER BY checkpoint_id DESC"""
-        if limit:
-            query += f" LIMIT {limit}"
+        if limit is not None:
+            query += " LIMIT ?"
+            param_values = (*param_values, limit)
         with self.cursor(transaction=False) as cur, closing(self.conn.cursor()) as wcur:
             cur.execute(query, param_values)
             for (
```
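Both the sync and async fixes above replace string interpolation of `limit` with a bound parameter. A standalone `sqlite3` sketch of the same idea (the table and column names here are illustrative, not the library's schema):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE checkpoints (checkpoint_id INTEGER)")
conn.executemany("INSERT INTO checkpoints VALUES (?)", [(i,) for i in range(5)])

query = "SELECT checkpoint_id FROM checkpoints ORDER BY checkpoint_id DESC"
params: tuple = ()
limit = 2
if limit is not None:
    # Bind LIMIT as a parameter instead of formatting it into the SQL text
    query += " LIMIT ?"
    params = (*params, limit)

rows = conn.execute(query, params).fetchall()
print(rows)  # [(4,), (3,)]
```

Note the switch from `if limit:` to `if limit is not None:` is also a behavior fix: `limit=0` should produce `LIMIT 0` (no rows) rather than being treated as "no limit".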
libs/checkpoint-sqlite/langgraph/checkpoint/sqlite/utils.py (modified, +21 −0)

```diff
@@ -1,12 +1,32 @@
 from __future__ import annotations

 import json
+import re
 from collections.abc import Sequence
 from typing import Any

 from langchain_core.runnables import RunnableConfig

 from langgraph.checkpoint.base import get_checkpoint_id

+_FILTER_PATTERN = re.compile(r"^[a-zA-Z0-9_.-]+$")
+
+
+def _validate_filter_key(key: str) -> None:
+    """Validate that a filter key is safe for use in SQL queries.
+
+    Args:
+        key: The filter key to validate
+
+    Raises:
+        ValueError: If the key contains invalid characters that could enable SQL injection
+    """
+    # Allow alphanumeric characters, underscores, dots, and hyphens
+    # This covers typical JSON property names while preventing SQL injection
+    if not _FILTER_PATTERN.match(key):
+        raise ValueError(
+            f"Invalid filter key: '{key}'. Filter keys must contain only alphanumeric characters, underscores, dots, and hyphens."
+        )
+

 def _metadata_predicate(
     metadata_filter: dict[str, Any],
@@ -43,6 +63,7 @@ def _where_value(query_value: Any) -> tuple[str, Any]:
     # process metadata query
     for query_key, query_value in metadata_filter.items():
+        _validate_filter_key(query_key)
         operator, param_value = _where_value(query_value)
         predicates.append(
             f"json_extract(CAST(metadata AS TEXT), '$.{query_key}') {operator}"
```
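The patch gates every filter key on an allow-list regex before it reaches the f-string. A self-contained sketch of that check, mirroring the patched pattern (function name and messages here are illustrative):

```python
import re

# Same character class as the patch: letters, digits, underscore, dot, hyphen
_FILTER_PATTERN = re.compile(r"^[a-zA-Z0-9_.-]+$")

def validate_filter_key(key: str) -> None:
    """Reject keys that could break out of a quoted SQL JSON path."""
    if not _FILTER_PATTERN.match(key):
        raise ValueError(f"Invalid filter key: {key!r}")

validate_filter_key("user.access-level")   # accepted: dotted, hyphenated path
try:
    validate_filter_key("x') OR '1'='1")   # rejected: quote and spaces
except ValueError as exc:
    print(exc)
```

An allow-list is the right shape here: keys cannot be bound as SQL parameters (they are part of the query text), so restricting them to characters that can never terminate the `'$....'` literal is the only safe option.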
libs/checkpoint-sqlite/langgraph/store/sqlite/base.py (modified, +60 −95)

```diff
@@ -107,6 +107,9 @@ def _decode_ns_text(namespace: str) -> tuple[str, ...]:
     return tuple(namespace.split("."))


+_FILTER_PATTERN = re.compile(r"^[a-zA-Z0-9_.-]+$")
+
+
 def _validate_filter_key(key: str) -> None:
     """Validate that a filter key is safe for use in SQL queries.

@@ -118,7 +121,7 @@ def _validate_filter_key(key: str) -> None:
     """
     # Allow alphanumeric characters, underscores, dots, and hyphens
     # This covers typical JSON property names while preventing SQL injection
-    if not re.match(r"^[a-zA-Z0-9_.-]+$", key):
+    if not _FILTER_PATTERN.match(key):
         raise ValueError(
             f"Invalid filter key: '{key}'. Filter keys must contain only alphanumeric characters, underscores, dots, and hyphens."
         )
@@ -404,12 +407,9 @@ def _prepare_batch_search_queries(
                     # SQLite json_extract returns unquoted string values
                     if isinstance(value, str):
                         filter_conditions.append(
-                            "json_extract(value, '$."
-                            + key
-                            + "') = '"
-                            + value.replace("'", "''")
-                            + "'"
+                            "json_extract(value, '$." + key + "') = ?"
                         )
+                        filter_params.append(value)
                     elif value is None:
                         filter_conditions.append(
                             "json_extract(value, '$." + key + "') IS NULL"
                         )
                     elif isinstance(value, bool):
                         filter_conditions.append(
                             "json_extract(value, '$." + key + "') = "
                             + ("1" if value else "0")
                         )
                     elif isinstance(value, (int, float)):
+                        # Use parameterized query to handle special floats and large integers
                         filter_conditions.append(
-                            "json_extract(value, '$." + key + "') = " + str(value)
+                            "json_extract(value, '$." + key + "') = ?"
                         )
+                        filter_params.append(float(value))
                     else:
                         # Complex objects (list, dict, …) – compare JSON text
                         filter_conditions.append(
@@ -636,85 +638,66 @@ def _get_filter_condition(self, key: str, op: str, value: Any) -> tuple[str, lis
         # We need to properly format values for SQLite JSON extraction comparison
         if op == "$eq":
             if isinstance(value, str):
-                # Direct string comparison with proper quoting for unquoted json_extract result
-                return (
-                    f"json_extract(value, '$.{key}') = '"
-                    + value.replace("'", "''")
-                    + "'",
-                    [],
-                )
+                return f"json_extract(value, '$.{key}') = ?", [value]
             elif value is None:
                 return f"json_extract(value, '$.{key}') IS NULL", []
             elif isinstance(value, bool):
                 # SQLite JSON stores booleans as integers
                 return f"json_extract(value, '$.{key}') = {1 if value else 0}", []
             elif isinstance(value, (int, float)):
-                return f"json_extract(value, '$.{key}') = {value}", []
+                # Convert to float to handle inf, -inf, nan, and very large integers
+                # SQLite REAL can handle these cases better than INTEGER
+                return f"json_extract(value, '$.{key}') = ?", [float(value)]
             else:
                 return f"json_extract(value, '$.{key}') = ?", [orjson.dumps(value)]
         elif op == "$gt":
             # For numeric values, SQLite needs to compare as numbers, not strings
             if isinstance(value, (int, float)):
-                return f"CAST(json_extract(value, '$.{key}') AS REAL) > {value}", []
+                # Convert to float to handle special values and very large integers
+                return f"CAST(json_extract(value, '$.{key}') AS REAL) > ?", [
+                    float(value)
+                ]
             elif isinstance(value, str):
-                return (
-                    f"json_extract(value, '$.{key}') > '"
-                    + value.replace("'", "''")
-                    + "'",
-                    [],
-                )
+                return f"json_extract(value, '$.{key}') > ?", [value]
             else:
                 return f"json_extract(value, '$.{key}') > ?", [orjson.dumps(value)]
         elif op == "$gte":
             if isinstance(value, (int, float)):
-                return f"CAST(json_extract(value, '$.{key}') AS REAL) >= {value}", []
+                return f"CAST(json_extract(value, '$.{key}') AS REAL) >= ?", [
+                    float(value)
+                ]
             elif isinstance(value, str):
-                return (
-                    f"json_extract(value, '$.{key}') >= '"
-                    + value.replace("'", "''")
-                    + "'",
-                    [],
-                )
+                return f"json_extract(value, '$.{key}') >= ?", [value]
             else:
                 return f"json_extract(value, '$.{key}') >= ?", [orjson.dumps(value)]
         elif op == "$lt":
             if isinstance(value, (int, float)):
-                return f"CAST(json_extract(value, '$.{key}') AS REAL) < {value}", []
+                return f"CAST(json_extract(value, '$.{key}') AS REAL) < ?", [
+                    float(value)
+                ]
             elif isinstance(value, str):
-                return (
-                    f"json_extract(value, '$.{key}') < '"
-                    + value.replace("'", "''")
-                    + "'",
-                    [],
-                )
+                return f"json_extract(value, '$.{key}') < ?", [value]
             else:
                 return f"json_extract(value, '$.{key}') < ?", [orjson.dumps(value)]
         elif op == "$lte":
             if isinstance(value, (int, float)):
-                return f"CAST(json_extract(value, '$.{key}') AS REAL) <= {value}", []
+                return f"CAST(json_extract(value, '$.{key}') AS REAL) <= ?", [
+                    float(value)
+                ]
             elif isinstance(value, str):
-                return (
-                    f"json_extract(value, '$.{key}') <= '"
-                    + value.replace("'", "''")
-                    + "'",
-                    [],
-                )
+                return f"json_extract(value, '$.{key}') <= ?", [value]
             else:
                 return f"json_extract(value, '$.{key}') <= ?", [orjson.dumps(value)]
         elif op == "$ne":
             if isinstance(value, str):
-                return (
-                    f"json_extract(value, '$.{key}') != '"
-                    + value.replace("'", "''")
-                    + "'",
-                    [],
-                )
+                return f"json_extract(value, '$.{key}') != ?", [value]
             elif value is None:
                 return f"json_extract(value, '$.{key}') IS NOT NULL", []
             elif isinstance(value, bool):
                 return f"json_extract(value, '$.{key}') != {1 if value else 0}", []
             elif isinstance(value, (int, float)):
-                return f"json_extract(value, '$.{key}') != {value}", []
+                # Convert to float for consistency
+                return f"json_extract(value, '$.{key}') != ?", [float(value)]
             else:
                 return f"json_extract(value, '$.{key}') != ?", [orjson.dumps(value)]
         else:
@@ -792,8 +775,9 @@ def __init__(
         self,
         conn: sqlite3.Connection,
         *,
-        deserializer: Callable[[bytes | str | orjson.Fragment], dict[str, Any]]
-        | None = None,
+        deserializer: (
+            Callable[[bytes | str | orjson.Fragment], dict[str, Any]] | None
+        ) = None,
         index: SqliteIndexConfig | None = None,
         ttl: TTLConfig | None = None,
     ):
```

A second hunk (`@@ -874,85 +858,66 @@`) applies the identical parameterization changes to the duplicate `_get_filter_condition` implementation later in the same file.
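The store fix swaps inline value formatting for bound parameters and binds numerics as `float(value)`, so `inf`, `-inf`, and very large integers go through SQLite's REAL affinity. A standalone sketch of both points (the table layout is illustrative; this assumes an SQLite build with the JSON1 functions, which is standard in modern Python):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE store (value TEXT)")
conn.execute(
    "INSERT INTO store VALUES (?)", ('{"access": "public", "price": 10}',)
)

# String values are bound, never spliced: an injection payload is compared
# as a literal string and matches nothing.
payload = "public' OR '1'='1"
assert conn.execute(
    "SELECT value FROM store WHERE json_extract(value, '$.access') = ?",
    (payload,),
).fetchall() == []

# Numeric comparisons bind float(value), which also covers infinities.
rows = conn.execute(
    "SELECT value FROM store WHERE CAST(json_extract(value, '$.price') AS REAL) < ?",
    (float("inf"),),
).fetchall()
print(len(rows))  # 1
```

Binding `float(value)` trades exactness on integers above 2**53 for robustness: the comparison is performed in REAL, matching the `CAST(... AS REAL)` on the column side.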
libs/checkpoint-sqlite/tests/test_aiosqlite.py (modified, +75 −1)

```diff
@@ -113,4 +113,78 @@ async def test_asearch(self) -> None:
             search_results_5[1].config["configurable"]["checkpoint_ns"],
         } == {"", "inner"}

-        # TODO: test before and limit params
+        # Test limit param
+        search_results_6 = [
+            c
+            async for c in saver.alist(
+                {"configurable": {"thread_id": "thread-2"}}, limit=1
+            )
+        ]
+        assert len(search_results_6) == 1
+        assert search_results_6[0].config["configurable"]["thread_id"] == "thread-2"
+
+        # Test before param
+        search_results_7 = [
+            c async for c in saver.alist(None, before=search_results_5[1].config)
+        ]
+        assert len(search_results_7) == 1
+        assert search_results_7[0].config["configurable"]["thread_id"] == "thread-1"
+
+    async def test_limit_parameter_sql_injection_prevention(self) -> None:
+        """Test that the limit parameter properly uses parameterized queries to prevent SQL injection."""
+        async with AsyncSqliteSaver.from_conn_string(":memory:") as saver:
+            # Setup: Create multiple checkpoints
+            for i in range(5):
+                config: RunnableConfig = {
+                    "configurable": {
+                        "thread_id": f"thread-{i}",
+                        "checkpoint_ns": "",
+                    }
+                }
+                checkpoint = empty_checkpoint()
+                metadata: CheckpointMetadata = {"index": i}
+                await saver.aput(config, checkpoint, metadata, {})
+
+            # Test that limit works correctly with valid integer
+            results = [c async for c in saver.alist(None, limit=2)]
+            assert len(results) == 2
+
+            # Test that limit=0 returns no results
+            results = [c async for c in saver.alist(None, limit=0)]
+            assert len(results) == 0
+
+            # Test that limit=None returns all results
+            results = [c async for c in saver.alist(None, limit=None)]
+            assert len(results) == 5
+
+            # Test explicit SQL injection attempt via limit parameter
+            # Even if type checking is bypassed and a malicious string is passed,
+            # the parameterized query will treat it as a value, not SQL code
+            # This would cause an error (can't convert string to int for LIMIT),
+            # which is the correct secure behavior
+            malicious_limits = [
+                "1; DROP TABLE checkpoints; --",
+                "1 OR 1=1",
+                "999999 UNION SELECT * FROM checkpoints",
+            ]
+
+            for malicious_limit in malicious_limits:
+                # The parameterized query should safely reject non-integer limits
+                # or convert them in a way that prevents SQL injection
+                try:
+                    # Bypass type checking by casting
+                    results = [
+                        c
+                        async for c in saver.alist(None, limit=malicious_limit)  # type: ignore
+                    ]
+                    # If it doesn't raise an error, it should at least not execute the injection
+                    # SQLite's parameter binding will try to convert the string to an integer
+                    # which will either fail or treat it as 0
+                except Exception:
+                    # Expected: SQLite should reject invalid limit values
+                    pass
+
+            # Verify the checkpoints table still exists and has all data
+            # (would have been dropped if injection succeeded)
+            results = [c async for c in saver.alist(None, limit=None)]
+            assert len(results) == 5
```
libs/checkpoint-sqlite/tests/test_sqlite.py (modified, +125 −0)

```diff
@@ -182,3 +182,128 @@ async def test_informative_async_errors(self) -> None:
             with pytest.raises(NotImplementedError, match="AsyncSqliteSaver"):
                 async for _ in saver.alist(self.config_1):
                     pass
+
+    def test_metadata_predicate_sql_injection_prevention(self) -> None:
+        """Test that _metadata_predicate rejects malicious filter keys."""
+        # Test various SQL injection payloads
+        malicious_keys = [
+            "x') OR '1'='1",  # Boolean-based injection
+            "x') OR 1=1 --",  # Comment-based injection
+            "x') UNION SELECT 1,2,3,4,5,6,7 --",  # UNION-based injection
+            "access') = 'public' OR '1'='1' OR json_extract(value, '$.",  # Complex injection
+            "'; DROP TABLE checkpoints; --",  # Destructive injection
+        ]
+
+        for malicious_key in malicious_keys:
+            with pytest.raises(ValueError, match="Invalid filter key"):
+                _metadata_predicate({malicious_key: "dummy"})
+
+    def test_checkpoint_search_sql_injection_prevention(self) -> None:
+        """Test that SQL injection via malicious filter keys is prevented in checkpoint search."""
+        with SqliteSaver.from_conn_string(":memory:") as saver:
+            # Setup: Create checkpoints with different metadata
+            config_public: RunnableConfig = {
+                "configurable": {
+                    "thread_id": "thread-public",
+                    "checkpoint_ns": "",
+                }
+            }
+            config_private: RunnableConfig = {
+                "configurable": {
+                    "thread_id": "thread-private",
+                    "checkpoint_ns": "",
+                }
+            }
+
+            checkpoint_public = empty_checkpoint()
+            checkpoint_private = empty_checkpoint()
+
+            metadata_public: CheckpointMetadata = {
+                "access": "public",
+                "data": "public information",
+            }
+            metadata_private: CheckpointMetadata = {
+                "access": "private",
+                "data": "secret information",
+                "password": "secret123",
+            }
+
+            saver.put(config_public, checkpoint_public, metadata_public, {})
+            saver.put(config_private, checkpoint_private, metadata_private, {})
+
+            # Normal query - should return only public checkpoint
+            normal_results = list(saver.list(None, filter={"access": "public"}))
+            assert len(normal_results) == 1
+            assert normal_results[0].metadata["access"] == "public"
+
+            # SQL injection attempt should raise ValueError
+            malicious_key = (
+                "access') = 'public' OR '1'='1' OR json_extract(metadata, '$."
+            )
+
+            with pytest.raises(ValueError, match="Invalid filter key"):
+                list(saver.list(None, filter={malicious_key: "dummy"}))
+
+    def test_limit_parameter_sql_injection_prevention(self) -> None:
+        """Test that the limit parameter properly uses parameterized queries to prevent SQL injection."""
+        with SqliteSaver.from_conn_string(":memory:") as saver:
+            # Setup: Create multiple checkpoints
+            for i in range(5):
+                config: RunnableConfig = {
+                    "configurable": {
+                        "thread_id": f"thread-{i}",
+                        "checkpoint_ns": "",
+                    }
+                }
+                checkpoint = empty_checkpoint()
+                metadata: CheckpointMetadata = {"index": i}
+                saver.put(config, checkpoint, metadata, {})
+
+            # Test that limit works correctly with valid integer
+            results = list(saver.list(None, limit=2))
+            assert len(results) == 2
+
+            # Test that limit=0 returns no results
+            results = list(saver.list(None, limit=0))
+            assert len(results) == 0
+
+            # Test that limit=None returns all results
+            results = list(saver.list(None, limit=None))
+            assert len(results) == 5
+
+    def test_metadata_filter_keys_with_hyphens_and_digits(self) -> None:
+        """Metadata keys with hyphens and digit-start should be filterable.
+
+        This exposes incorrect JSON path handling (unquoted segments) by asserting
+        that such filters successfully match saved checkpoints.
+        """
+        with SqliteSaver.from_conn_string(":memory:") as saver:
+            config: RunnableConfig = {
+                "configurable": {
+                    "thread_id": "thread-hyphen-digit",
+                    "checkpoint_ns": "",
+                }
+            }
+            checkpoint = empty_checkpoint()
+            metadata: CheckpointMetadata = {
+                "access-level": "public",
+                "user": {"access-level": "nested", "123abc": "ok2"},
+                "123abc": "ok",
+            }
+            saver.put(config, checkpoint, metadata, {})
+
+            # Top-level hyphenated key
+            results = list(saver.list(None, filter={"access-level": "public"}))
+            assert len(results) == 1
+
+            # Nested hyphenated key via dotted path
+            results = list(saver.list(None, filter={"user.access-level": "nested"}))
+            assert len(results) == 1
+
+            # Top-level digit-starting key
+            results = list(saver.list(None, filter={"123abc": "ok"}))
+            assert len(results) == 1
+
+            # Nested digit-starting key via dotted path
+            results = list(saver.list(None, filter={"user.123abc": "ok2"}))
+            assert len(results) == 1
```
libs/checkpoint-sqlite/tests/test_store.py (modified, +135 −0)

```diff
@@ -1069,6 +1069,141 @@ def test_sql_injection_vulnerability(store: SqliteStore) -> None:
         store.search(("docs",), filter={malicious_key: "dummy"})


+def test_sql_injection_filter_values(store: SqliteStore) -> None:
+    """Test that SQL injection via malicious filter values is properly escaped."""
+    # Setup: Create documents with different access levels
+    store.put(("docs",), "doc1", {"access": "public", "title": "Public Document"})
+    store.put(("docs",), "doc2", {"access": "private", "title": "Private Document"})
+    store.put(("docs",), "doc3", {"access": "secret", "title": "Secret Document"})
+
+    # Test 1: Basic SQL injection attempt with single quote
+    malicious_value = "public' OR '1'='1"
+    results = store.search(("docs",), filter={"access": malicious_value})
+    # Should return 0 results because the malicious value is escaped and won't match anything
+    assert len(results) == 0, "SQL injection via string value should be blocked"
+
+    # Test 2: SQL injection with comment
+    malicious_value = "public'; --"
+    results = store.search(("docs",), filter={"access": malicious_value})
+    assert len(results) == 0, "SQL comment injection should be blocked"
+
+    # Test 3: UNION injection attempt
+    malicious_value = "public' UNION SELECT * FROM store --"
+    results = store.search(("docs",), filter={"access": malicious_value})
+    assert len(results) == 0, "UNION injection should be blocked"
+
+    # Test 4: Parameterized queries handle strings with null bytes and SQL injection attempts safely
+    malicious_value = "public\x00' OR '1'='1"
+    results = store.search(("docs",), filter={"access": malicious_value})
+    assert len(results) == 0, (
+        "Parameterized queries treat injection attempts as literal strings"
+    )
+
+    # Test 5: Multiple single quotes
+    malicious_value = "''''"
+    results = store.search(("docs",), filter={"access": malicious_value})
+    assert len(results) == 0, "Multiple quotes should be handled safely"
+
+    # Test 6: Legitimate value with single quote should work
+    store.put(("docs",), "doc4", {"title": "O'Brien's Document", "access": "public"})
+    results = store.search(("docs",), filter={"title": "O'Brien's Document"})
+    assert len(results) == 1, "Legitimate single quotes should work"
+    assert results[0].value["title"] == "O'Brien's Document"
+
+    # Test 7: Unicode characters with injection attempt
+    malicious_value = "public' OR 'א'='א"
+    results = store.search(("docs",), filter={"access": malicious_value})
+    assert len(results) == 0, "Unicode-based injection should be blocked"
+
+
+def test_numeric_filter_safety(store: SqliteStore) -> None:
+    """Test that numeric filter values are handled safely."""
+    # Setup: Create documents with numeric fields
+    store.put(("items",), "item1", {"price": 10, "quantity": 5})
+    store.put(("items",), "item2", {"price": 20, "quantity": 3})
+    store.put(("items",), "item3", {"price": 30, "quantity": 1})
+
+    # Test 1: Normal numeric comparison
+    results = store.search(("items",), filter={"price": {"$gt": 15}})
+    assert len(results) == 2
+    assert all(r.value["price"] > 15 for r in results)
+
+    # Test 2: Special float values (infinity)
+    results = store.search(("items",), filter={"price": {"$lt": float("inf")}})
+    assert len(results) == 3, "All finite values should be less than infinity"
+
+    # Test 3: Special float values (negative infinity)
+    results = store.search(("items",), filter={"price": {"$gt": float("-inf")}})
+    assert len(results) == 3, (
+        "All finite values should be greater than negative infinity"
+    )
+
+    # Test 4: NaN handling - NaN comparisons should not cause errors
+    try:
+        results = store.search(("items",), filter={"price": {"$eq": float("nan")}})
+        # NaN never equals anything, including itself, so should return 0 results
+        assert len(results) == 0
+    except Exception as e:
+        pytest.fail(f"NaN handling should not raise exception: {e}")
+
+    # Test 5: Very large numbers
+    results = store.search(("items",), filter={"price": {"$lt": 10**100}})
+    assert len(results) == 3, "Very large numbers should be handled safely"
+
+    # Test 6: Negative numbers
+    store.put(("items",), "item4", {"price": -10, "quantity": 0})
+    results = store.search(("items",), filter={"price": {"$lt": 0}})
+    assert len(results) == 1
+    assert results[0].key == "item4"
+
+
+def test_boolean_filter_safety(store: SqliteStore) -> None:
+    """Test that boolean filter values are handled safely."""
+    store.put(("flags",), "flag1", {"active": True, "name": "Feature A"})
+    store.put(("flags",), "flag2", {"active": False, "name": "Feature B"})
+    store.put(("flags",), "flag3", {"active": True, "name": "Feature C"})
+
+    # Test boolean filters
+    results = store.search(("flags",), filter={"active": True})
+    assert len(results) == 2
+    assert all(r.value["active"] is True for r in results)
+
+    results = store.search(("flags",), filter={"active": False})
+    assert len(results) == 1
+    assert results[0].value["active"] is False
+
+
+def test_filter_keys_with_hyphens_and_digits(store: SqliteStore) -> None:
+    """Keys with hyphens or leading digits should be queryable via filters.
+
+    Current unquoted JSON path construction (e.g., '$.access-level' or '$.123abc')
+    is not valid JSON1 syntax, so this test will catch regressions in path handling.
+    """
+    # Documents with top-level and nested keys requiring bracket-quoted JSON paths
+    store.put(
+        ("docs",),
+        "hyphen",
+        {"access-level": "public", "user": {"access-level": "nested"}},
+    )
+    store.put(("docs",), "digit", {"123abc": "ok", "user": {"123abc": "ok2"}})
+
+    # Top-level hyphenated key
+    results = store.search(("docs",), filter={"access-level": "public"})
+    assert [r.key for r in results] == ["hyphen"]
+
+    # Nested hyphenated key via dotted path
+    results = store.search(("docs",), filter={"user.access-level": "nested"})
+    assert [r.key for r in results] == ["hyphen"]
+
+    # Top-level digit-starting key
+    results = store.search(("docs",), filter={"123abc": "ok"})
+    assert [r.key for r in results] == ["digit"]
+
+    # Nested digit-starting key via dotted path
+    results = store.search(("docs",), filter={"user.123abc": "ok2"})
+    assert [r.key for r in results] == ["digit"]
+
+
 @pytest.mark.parametrize("distance_type", VECTOR_TYPES)
 def test_non_ascii(
     fake_embeddings: CharacterEmbeddings,
```
Vulnerability mechanics
The `_metadata_predicate()` helper in `langgraph/checkpoint/sqlite/utils.py` built each predicate by interpolating the user-supplied filter key into an f-string of the form `json_extract(CAST(metadata AS TEXT), '$.{query_key}') {operator}`. Because the key was never validated, a key containing a single quote could terminate the `'$.…'` path literal and append arbitrary SQL to the WHERE clause of the `list()`/`alist()` query. The fix validates every filter key against the allow-list pattern `^[a-zA-Z0-9_.-]+$` before it reaches the query text, and additionally hardens the surrounding code by binding the `limit` value and the store's filter values as SQL parameters instead of formatting them into the query string.
References
- github.com/advisories/GHSA-9rwj-6rc7-p77c (GitHub advisory)
- nvd.nist.gov/vuln/detail/CVE-2025-67644 (NVD entry)
- github.com/langchain-ai/langgraph/commit/297242913f8ad2143ee3e2f72e67db0911d48e2a (fix commit)
- github.com/langchain-ai/langgraph/security/advisories/GHSA-9rwj-6rc7-p77c (repository advisory)