VYPR
High severity · 8.6 · GHSA Advisory · Published May 14, 2026 · Updated May 14, 2026

CVE-2026-44504

CVE-2026-44504

Description

Aegra is a drop-in replacement for LangSmith Deployments. Prior to 0.9.7, deployments with multiple authenticated users on a shared instance are vulnerable to a cross-tenant IDOR (insecure direct object reference). Any authenticated attacker who knows another user's thread_id can execute graph runs against that user's thread, read that user's full checkpoint state, and inject arbitrary messages into that user's conversation history. This vulnerability is fixed in 0.9.7.

Affected products

1

Patches

1
e1b2042254fd

fix(runs): enforce thread ownership before creating runs (#337)

https://github.com/aegra/aegra · Victor J. Marin · Apr 30, 2026 · via ghsa
6 files changed · +320 −18
  • libs/aegra-api/src/aegra_api/api/runs.py · +19 −0 · modified
    @@ -15,6 +15,7 @@
     from aegra_api.core.auth_deps import auth_dependency, get_current_user
     from aegra_api.core.auth_handlers import build_auth_context, handle_event
     from aegra_api.core.orm import Run as RunORM
    +from aegra_api.core.orm import Thread as ThreadORM
     from aegra_api.core.orm import _get_session_maker, get_session
     from aegra_api.core.sse import create_end_event, get_sse_headers
     from aegra_api.models import Run, RunCreate, RunStatus, User
    @@ -50,6 +51,12 @@ async def create_run(
         endpoint to follow progress. Provide either `input` or `command` (for
         human-in-the-loop resumption) but not both.
         """
    +    existing_thread = await session.scalar(
    +        select(ThreadORM).where(ThreadORM.thread_id == thread_id)
    +    )
    +    if existing_thread and existing_thread.user_id != user.identity:
    +        raise HTTPException(404, f"Thread '{thread_id}' not found")
    +
         # Authorization check (create_run action on threads resource)
         ctx = build_auth_context(user, "threads", "create_run")
         value = {**request.model_dump(), "thread_id": thread_id}
    @@ -92,6 +99,12 @@ async def create_and_stream_run(
         after the client disconnects (default is `"cancel"`). Use `stream_mode`
         to control which event types are emitted.
         """
    +    existing_thread = await session.scalar(
    +        select(ThreadORM).where(ThreadORM.thread_id == thread_id)
    +    )
    +    if existing_thread and existing_thread.user_id != user.identity:
    +        raise HTTPException(404, f"Thread '{thread_id}' not found")
    +
         run_id, run, _job = await _prepare_run(session, thread_id, request, user, initial_status="pending")
     
         # Default to cancel on disconnect - this matches user expectation that clicking
    @@ -311,6 +324,12 @@ async def wait_for_run(
     
         # Session block: all pre-execution DB work (validate, create run, submit)
         async with maker() as session:
    +        existing_thread = await session.scalar(
    +            select(ThreadORM).where(ThreadORM.thread_id == thread_id)
    +        )
    +        if existing_thread and existing_thread.user_id != user.identity:
    +            raise HTTPException(404, f"Thread '{thread_id}' not found")
    +
             run_id, _run, _job = await _prepare_run(session, thread_id, request, user, initial_status="pending")
     
         # No pool connection held from here — safe for long waits
    
  • libs/aegra-api/tests/e2e/manual_auth_tests/test_thread_user_isolation_e2e.py · +261 −0 · added
    @@ -0,0 +1,261 @@
    +"""E2E tests verifying thread user isolation when authentication is enabled.
    +
    +Claim under test:
    +  When authentication is enabled, threads are automatically scoped to the
    +  authenticated user. Users can only see and interact with their own threads.
    +
    +⚠️ MANUAL TESTS - These are skipped by default. Run with: pytest -m manual_auth
    +
    +Requires a running Aegra server with auth enabled. See README.md for setup.
    +
    +Run:
    +    pytest tests/e2e/manual_auth_tests/test_thread_user_isolation_e2e.py -v -m manual_auth
    +"""
    +
    +import uuid
    +
    +import httpx
    +import pytest
    +
    +from aegra_api.settings import settings
    +from tests.e2e._utils import elog
    +
    +
    +def get_server_url() -> str:
    +    return settings.app.SERVER_URL
    +
    +
    +def get_auth_headers(user_id: str, role: str = "user", team_id: str = "team1") -> dict[str, str]:
    +    token = f"mock-jwt-{user_id}-{role}-{team_id}"
    +    return {"Authorization": f"Bearer {token}"}
    +
    +
    +def get_client_with_auth(user_id: str, role: str = "user", team_id: str = "team1"):
    +    from langgraph_sdk import get_client
    +
    +    token = f"mock-jwt-{user_id}-{role}-{team_id}"
    +    return get_client(url=get_server_url(), headers={"Authorization": f"Bearer {token}"})
    +
    +
    +@pytest.mark.e2e
    +@pytest.mark.manual_auth
    +class TestThreadOwnership:
    +    """Threads are created and stored under the authenticated user's identity."""
    +
    +    @pytest.mark.asyncio
    +    async def test_created_thread_is_owned_by_creator(self) -> None:
    +        """GET /threads/<id> returns the thread to its creator."""
    +        client = get_client_with_auth("alice")
    +        thread = await client.threads.create(metadata={"isolation_test": "ownership"})
    +        thread_id = thread["thread_id"]
    +        elog("Created thread", thread)
    +
    +        fetched = await client.threads.get(thread_id)
    +        assert fetched["thread_id"] == thread_id, "Creator should be able to fetch their own thread"
    +
    +    @pytest.mark.asyncio
    +    async def test_other_user_cannot_get_thread(self) -> None:
    +        """GET /threads/<id> returns 404 when a different user requests it."""
    +        alice_client = get_client_with_auth("alice")
    +        thread = await alice_client.threads.create(metadata={"isolation_test": "cross-user-get"})
    +        thread_id = thread["thread_id"]
    +        elog("Alice created thread", {"thread_id": thread_id})
    +
    +        bob_headers = get_auth_headers("bob")
    +        async with httpx.AsyncClient(
    +            base_url=get_server_url(), headers=bob_headers, timeout=30.0
    +        ) as http:
    +            resp = await http.get(f"/threads/{thread_id}")
    +
    +        elog("Bob GET Alice's thread", {"status": resp.status_code})
    +        assert resp.status_code == 404, (
    +            f"Expected 404 when bob requests alice's thread, got {resp.status_code}: {resp.text}"
    +        )
    +
    +
    +@pytest.mark.e2e
    +@pytest.mark.manual_auth
    +class TestThreadSearch:
    +    """Thread search/list only returns threads belonging to the requesting user."""
    +
    +    @pytest.mark.asyncio
    +    async def test_search_returns_only_own_threads(self) -> None:
    +        """POST /threads/search returns threads for the requesting user only."""
    +        tag = f"isolation-search-{uuid.uuid4().hex[:8]}"
    +
    +        alice_client = get_client_with_auth("alice")
    +        bob_client = get_client_with_auth("bob")
    +
    +        alice_thread = await alice_client.threads.create(metadata={"isolation_tag": tag})
    +        bob_thread = await bob_client.threads.create(metadata={"isolation_tag": tag})
    +        elog("Seeded threads", {"alice": alice_thread["thread_id"], "bob": bob_thread["thread_id"]})
    +
    +        alice_headers = get_auth_headers("alice")
    +        async with httpx.AsyncClient(
    +            base_url=get_server_url(), headers=alice_headers, timeout=30.0
    +        ) as http:
    +            resp = await http.post(
    +                "/threads/search",
    +                json={"metadata": {"isolation_tag": tag}, "limit": 100},
    +            )
    +        assert resp.status_code == 200, resp.text
    +        thread_ids = {t["thread_id"] for t in resp.json()}
    +        elog("Alice search results", sorted(thread_ids))
    +
    +        assert alice_thread["thread_id"] in thread_ids, "Alice should see her own thread"
    +        assert bob_thread["thread_id"] not in thread_ids, "Alice must not see Bob's thread"
    +
    +    @pytest.mark.asyncio
    +    async def test_list_endpoint_returns_only_own_threads(self) -> None:
    +        """GET /threads returns only threads owned by the requesting user."""
    +        tag = f"isolation-list-{uuid.uuid4().hex[:8]}"
    +
    +        alice_client = get_client_with_auth("alice")
    +        bob_client = get_client_with_auth("bob")
    +
    +        alice_thread = await alice_client.threads.create(metadata={"isolation_tag": tag})
    +        bob_thread = await bob_client.threads.create(metadata={"isolation_tag": tag})
    +
    +        alice_headers = get_auth_headers("alice")
    +        async with httpx.AsyncClient(
    +            base_url=get_server_url(), headers=alice_headers, timeout=30.0
    +        ) as http:
    +            resp = await http.get("/threads", params={"limit": 1000})
    +        assert resp.status_code == 200, resp.text
    +
    +        data = resp.json()
    +        thread_ids = {t["thread_id"] for t in (data if isinstance(data, list) else data.get("threads", []))}
    +        elog("Alice list results", {"count": len(thread_ids)})
    +
    +        assert alice_thread["thread_id"] in thread_ids, "Alice should see her own thread"
    +        assert bob_thread["thread_id"] not in thread_ids, "Alice must not see Bob's thread"
    +
    +
    +@pytest.mark.e2e
    +@pytest.mark.manual_auth
    +class TestThreadMutationIsolation:
    +    """Users cannot mutate threads that belong to another user."""
    +
    +    @pytest.mark.asyncio
    +    async def test_other_user_cannot_update_thread(self) -> None:
    +        """PATCH /threads/<id> returns 404 when a different user attempts to update."""
    +        alice_client = get_client_with_auth("alice")
    +        thread = await alice_client.threads.create(metadata={"isolation_test": "update"})
    +        thread_id = thread["thread_id"]
    +
    +        bob_headers = get_auth_headers("bob")
    +        async with httpx.AsyncClient(
    +            base_url=get_server_url(), headers=bob_headers, timeout=30.0
    +        ) as http:
    +            resp = await http.patch(
    +                f"/threads/{thread_id}",
    +                json={"metadata": {"hijacked": True}},
    +            )
    +        elog("Bob PATCH Alice's thread", {"status": resp.status_code})
    +        assert resp.status_code == 404, (
    +            f"Expected 404 when bob patches alice's thread, got {resp.status_code}: {resp.text}"
    +        )
    +
    +    @pytest.mark.asyncio
    +    async def test_other_user_cannot_delete_thread(self) -> None:
    +        """DELETE /threads/<id> returns 404 when a different user attempts to delete."""
    +        alice_client = get_client_with_auth("alice")
    +        thread = await alice_client.threads.create(metadata={"isolation_test": "delete"})
    +        thread_id = thread["thread_id"]
    +
    +        bob_headers = get_auth_headers("bob")
    +        async with httpx.AsyncClient(
    +            base_url=get_server_url(), headers=bob_headers, timeout=30.0
    +        ) as http:
    +            resp = await http.delete(f"/threads/{thread_id}")
    +        elog("Bob DELETE Alice's thread", {"status": resp.status_code})
    +        assert resp.status_code == 404, (
    +            f"Expected 404 when bob deletes alice's thread, got {resp.status_code}: {resp.text}"
    +        )
    +
    +        # Thread still accessible by Alice after Bob's failed delete attempt
    +        fetched = await alice_client.threads.get(thread_id)
    +        assert fetched["thread_id"] == thread_id, "Thread must still exist after unauthorized delete attempt"
    +
    +    @pytest.mark.asyncio
    +    async def test_other_user_cannot_add_run_to_thread(self) -> None:
    +        """POST /threads/<id>/runs returns 404 when a different user attempts to create a run."""
    +        alice_client = get_client_with_auth("alice")
    +        thread = await alice_client.threads.create(metadata={"isolation_test": "run"})
    +        thread_id = thread["thread_id"]
    +
    +        bob_headers = get_auth_headers("bob")
    +        async with httpx.AsyncClient(
    +            base_url=get_server_url(), headers=bob_headers, timeout=30.0
    +        ) as http:
    +            resp = await http.post(
    +                f"/threads/{thread_id}/runs",
    +                json={"assistant_id": "agent", "input": {"messages": [{"role": "human", "content": "hi"}]}},
    +            )
    +        elog("Bob POST run to Alice's thread", {"status": resp.status_code})
    +        assert resp.status_code == 404, (
    +            f"Expected 404 when bob tries to run against alice's thread, got {resp.status_code}: {resp.text}"
    +        )
    +
    +    @pytest.mark.asyncio
    +    async def test_other_user_cannot_stream_run_on_thread(self) -> None:
    +        """POST /threads/<id>/runs/stream returns 404 when a different user attempts to stream a run."""
    +        alice_client = get_client_with_auth("alice")
    +        thread = await alice_client.threads.create(metadata={"isolation_test": "stream-run"})
    +        thread_id = thread["thread_id"]
    +
    +        bob_headers = get_auth_headers("bob")
    +        async with httpx.AsyncClient(
    +            base_url=get_server_url(), headers=bob_headers, timeout=30.0
    +        ) as http:
    +            resp = await http.post(
    +                f"/threads/{thread_id}/runs/stream",
    +                json={"assistant_id": "agent", "input": {"messages": [{"role": "human", "content": "hi"}]}},
    +            )
    +        elog("Bob POST stream run to Alice's thread", {"status": resp.status_code})
    +        assert resp.status_code == 404, (
    +            f"Expected 404 when bob streams against alice's thread, got {resp.status_code}: {resp.text}"
    +        )
    +
    +    @pytest.mark.asyncio
    +    async def test_other_user_cannot_wait_run_on_thread(self) -> None:
    +        """POST /threads/<id>/runs/wait returns 404 when a different user attempts to create a run."""
    +        alice_client = get_client_with_auth("alice")
    +        thread = await alice_client.threads.create(metadata={"isolation_test": "wait-run"})
    +        thread_id = thread["thread_id"]
    +
    +        bob_headers = get_auth_headers("bob")
    +        async with httpx.AsyncClient(
    +            base_url=get_server_url(), headers=bob_headers, timeout=30.0
    +        ) as http:
    +            resp = await http.post(
    +                f"/threads/{thread_id}/runs/wait",
    +                json={"assistant_id": "agent", "input": {"messages": [{"role": "human", "content": "hi"}]}},
    +            )
    +        elog("Bob POST wait run to Alice's thread", {"status": resp.status_code})
    +        assert resp.status_code == 404, (
    +            f"Expected 404 when bob waits against alice's thread, got {resp.status_code}: {resp.text}"
    +        )
    +
    +
    +@pytest.mark.e2e
    +@pytest.mark.manual_auth
    +class TestUnauthenticatedAccess:
    +    """Requests without a valid token are rejected entirely."""
    +
    +    def test_get_thread_without_auth_returns_401(self) -> None:
    +        """GET /threads/<id> without Authorization header returns 401."""
    +        fake_thread_id = str(uuid.uuid4())
    +        resp = httpx.get(f"{get_server_url()}/threads/{fake_thread_id}", timeout=10.0)
    +        elog("Unauthenticated GET thread", {"status": resp.status_code})
    +        assert resp.status_code == 401, f"Expected 401, got {resp.status_code}"
    +
    +    def test_search_without_auth_returns_401(self) -> None:
    +        """POST /threads/search without Authorization header returns 401."""
    +        resp = httpx.post(
    +            f"{get_server_url()}/threads/search",
    +            json={"limit": 10},
    +            timeout=10.0,
    +        )
    +        elog("Unauthenticated search", {"status": resp.status_code})
    +        assert resp.status_code == 401, f"Expected 401, got {resp.status_code}"
    
  • libs/aegra-api/tests/integration/test_api/test_runs_crud.py · +4 −0 · modified
    @@ -776,9 +776,13 @@ def test_wait_for_run_timeout(self):
             run = _run_row(status="running")
             run.output = {"partial": "data"}
     
    +        thread = _thread_row()
    +
             class Session(DummySessionBase):
                 async def scalar(self, stmt):
                     stmt_str = str(stmt).lower()
    +                if "from thread" in stmt_str:
    +                    return thread
                     if "from assistant" in stmt_str:
                         return assistant
                     if "from run" in stmt_str:
    
  • libs/aegra-api/tests/unit/test_api/test_runs.py · +25 −8 · modified
    @@ -10,6 +10,7 @@
     from aegra_api.api.runs import create_run, get_run, join_run, list_runs, update_run
     from aegra_api.core.orm import Assistant as AssistantORM
     from aegra_api.core.orm import Run as RunORM
    +from aegra_api.core.orm import Thread as ThreadORM
     from aegra_api.models import Run, RunCreate, RunStatus, User
     
     
    @@ -27,6 +28,17 @@ def mock_session(self) -> AsyncMock:
             session.add = MagicMock()  # session.add is synchronous
             return session
     
    +    @pytest.fixture
    +    def sample_thread(self) -> ThreadORM:
    +        return ThreadORM(
    +            thread_id="test-thread-123",
    +            user_id="test-user",
    +            status="idle",
    +            metadata_json={},
    +            created_at=datetime.now(UTC),
    +            updated_at=datetime.now(UTC),
    +        )
    +
         @pytest.fixture
         def sample_assistant(self) -> AssistantORM:
             return AssistantORM(
    @@ -68,8 +80,8 @@ async def test_create_run_success(
             ):
                 mock_lg_service.return_value.list_graphs.return_value = ["test-graph"]
     
    -            # DB setup
    -            mock_session.scalar.return_value = sample_assistant
    +            # DB setup: first scalar = thread ownership check (None = new thread), second = assistant
    +            mock_session.scalar.side_effect = [None, sample_assistant]
     
                 result = await create_run(thread_id, request, mock_user, mock_session)
     
    @@ -88,7 +100,9 @@ async def test_create_run_success(
                 mock_create_task.assert_called_once()
     
         @pytest.mark.asyncio
    -    async def test_create_run_assistant_not_found(self, mock_user: User, mock_session: AsyncMock) -> None:
    +    async def test_create_run_assistant_not_found(
    +        self, mock_user: User, mock_session: AsyncMock, sample_thread: ThreadORM
    +    ) -> None:
             """Test creation with non-existent assistant."""
             thread_id = "test-thread-123"
             request = RunCreate(assistant_id="nonexistent", input={})
    @@ -100,8 +114,8 @@ async def test_create_run_assistant_not_found(self, mock_user: User, mock_sessio
             ):
                 mock_lg_service.return_value.list_graphs.return_value = ["test-graph"]
     
    -            # Return None for assistant lookup
    -            mock_session.scalar.return_value = None
    +            # First scalar call: thread ownership check (pass). Second: assistant lookup (None).
    +            mock_session.scalar.side_effect = [sample_thread, None]
     
                 with pytest.raises(HTTPException) as exc:
                     await create_run(thread_id, request, mock_user, mock_session)
    @@ -128,7 +142,7 @@ async def test_create_run_graph_not_found(
                 # Graph not in available graphs
                 mock_lg_service.return_value.list_graphs.return_value = ["other-graph"]
     
    -            mock_session.scalar.return_value = sample_assistant
    +            mock_session.scalar.side_effect = [None, sample_assistant]
     
                 with pytest.raises(HTTPException) as exc:
                     await create_run(thread_id, request, mock_user, mock_session)
    @@ -137,7 +151,9 @@ async def test_create_run_graph_not_found(
                 assert "Graph" in str(exc.value.detail)
     
         @pytest.mark.asyncio
    -    async def test_create_run_config_context_allowed(self, mock_user: User, mock_session: AsyncMock) -> None:
    +    async def test_create_run_config_context_allowed(
    +        self, mock_user: User, mock_session: AsyncMock, sample_thread: ThreadORM
    +    ) -> None:
             """Test both configurable and context are accepted."""
             thread_id = "test-thread-123"
             request = RunCreate(
    @@ -156,7 +172,8 @@ async def test_create_run_config_context_allowed(self, mock_user: User, mock_ses
                 ),
             ):
                 mock_lg_service.return_value.list_graphs.return_value = ["test-graph"]
    -            mock_session.scalar.return_value = None
    +            # First scalar call: thread ownership check (pass). Second: assistant lookup (None).
    +            mock_session.scalar.side_effect = [sample_thread, None]
     
                 with pytest.raises(HTTPException) as exc:
                     await create_run(thread_id, request, mock_user, mock_session)
    
  • libs/aegra-api/tests/unit/test_api/test_runs_streaming.py · +2 −2 · modified
    @@ -71,8 +71,8 @@ async def test_create_and_stream_run_success(
             ):
                 mock_lg_service.return_value.list_graphs.return_value = ["test-graph"]
     
    -            # DB setup
    -            mock_session.scalar.return_value = sample_assistant
    +            # DB setup: first scalar = thread ownership check (None = new thread), second = assistant
    +            mock_session.scalar.side_effect = [None, sample_assistant]
     
                 # Mock generator for streaming response
                 async def mock_generator() -> AsyncGenerator:
    
  • libs/aegra-api/tests/unit/test_api/test_runs_wait.py · +9 −8 · modified
    @@ -119,10 +119,10 @@ async def test_wait_for_run_timeout(self) -> None:
             user = User(identity="test-user", scopes=[])
             request = _make_request()
     
    -        # Pre-execution session (for _prepare_run)
    +        # Pre-execution session (for thread ownership check + _prepare_run)
             session_1 = AsyncMock()
             session_1.add = MagicMock()
    -        session_1.scalar.return_value = _make_assistant()
    +        session_1.scalar.side_effect = [None, _make_assistant()]
     
             # Post-wait session (for _fetch_run_output)
             session_2 = AsyncMock()
    @@ -167,7 +167,7 @@ async def test_wait_for_run_success(self) -> None:
     
             session_1 = AsyncMock()
             session_1.add = MagicMock()
    -        session_1.scalar.return_value = _make_assistant()
    +        session_1.scalar.side_effect = [None, _make_assistant()]
     
             session_2 = AsyncMock()
             session_2.scalar.return_value = _make_run_orm(run_id, thread_id, output={"result": "success"})
    @@ -208,7 +208,7 @@ async def test_wait_for_run_failed_status(self) -> None:
     
             session_1 = AsyncMock()
             session_1.add = MagicMock()
    -        session_1.scalar.return_value = _make_assistant()
    +        session_1.scalar.side_effect = [None, _make_assistant()]
     
             session_2 = AsyncMock()
             session_2.scalar.return_value = _make_run_orm(
    @@ -256,7 +256,7 @@ async def test_wait_for_run_interrupted_status(self) -> None:
     
             session_1 = AsyncMock()
             session_1.add = MagicMock()
    -        session_1.scalar.return_value = _make_assistant()
    +        session_1.scalar.side_effect = [None, _make_assistant()]
     
             session_2 = AsyncMock()
             session_2.scalar.return_value = _make_run_orm(
    @@ -299,10 +299,11 @@ async def test_wait_for_run_graph_not_found(self) -> None:
             user = User(identity="test-user", scopes=[])
             request = _make_request()
     
    +        assistant = _make_assistant()
    +        assistant.graph_id = "nonexistent-graph"
             session = AsyncMock()
             session.add = MagicMock()
    -        session.scalar.return_value = _make_assistant()
    -        session.scalar.return_value.graph_id = "nonexistent-graph"
    +        session.scalar.side_effect = [None, assistant]
     
             mock_maker = _make_session_maker(session)
     
    @@ -334,7 +335,7 @@ async def test_wait_for_run_returns_streaming_response(self) -> None:
     
             session_1 = AsyncMock()
             session_1.add = MagicMock()
    -        session_1.scalar.return_value = _make_assistant()
    +        session_1.scalar.side_effect = [None, _make_assistant()]
     
             session_2 = AsyncMock()
             session_2.scalar.return_value = _make_run_orm(run_id, thread_id, output={"ok": True})
    

Vulnerability mechanics

Generated by null/stub on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.

References

7

News mentions

0

No linked articles in our index yet.