VYPR
High severityNVD Advisory· Published Mar 20, 2026· Updated Mar 20, 2026

Langflow has Missing Ownership Verification in API Key Deletion (IDOR)

CVE-2026-33053

Description

Langflow is a tool for building and deploying AI-powered agents and workflows. In versions prior to 1.9.0, the delete_api_key_route() endpoint accepts an api_key_id path parameter and deletes it with only a generic authentication check (get_current_active_user dependency). However, the delete_api_key() CRUD function does NOT verify that the API key belongs to the current user before deletion.

Affected packages

Versions sourced from the GitHub Security Advisory.

PackageAffected versionsPatched versions
langflowPyPI
< 1.7.21.7.2

Affected products

1

Patches

1
fdc1b3b1448f

fix: Check the user id of caller in delete_var (#11196)

https://github.com/langflow-ai/langflowEric HareJan 5, 2026via ghsa
4 files changed · +72 5
  • src/backend/base/langflow/api/v1/api_key.py+4 3 modified
    @@ -1,6 +1,6 @@
     from uuid import UUID
     
    -from fastapi import APIRouter, Depends, HTTPException, Response
    +from fastapi import APIRouter, HTTPException, Response
     
     from langflow.api.utils import CurrentActiveUser, DbSession
     from langflow.api.v1.schemas import ApiKeyCreateRequest, ApiKeysResponse
    @@ -41,13 +41,14 @@ async def create_api_key_route(
             raise HTTPException(status_code=400, detail=str(e)) from e
     
     
    -@router.delete("/{api_key_id}", dependencies=[Depends(auth_utils.get_current_active_user)])
    +@router.delete("/{api_key_id}")
     async def delete_api_key_route(
         api_key_id: UUID,
         db: DbSession,
    +    current_user: CurrentActiveUser,
     ):
         try:
    -        await delete_api_key(db, api_key_id)
    +        await delete_api_key(db, api_key_id, current_user.id)
         except Exception as e:
             raise HTTPException(status_code=400, detail=str(e)) from e
         return {"detail": "API Key deleted"}
    
  • src/backend/base/langflow/__main__.py+1 1 modified
    @@ -887,7 +887,7 @@ async def aapi_key():
                 stmt = select(ApiKey).where(ApiKey.user_id == superuser.id)
                 api_key = (await session.exec(stmt)).first()
                 if api_key:
    -                await delete_api_key(session, api_key.id)
    +                await delete_api_key(session, api_key.id, superuser.id)
     
                 api_key_create = ApiKeyCreate(name="CLI")
                 return await create_api_key(session, api_key_create, user_id=superuser.id)
    
  • src/backend/base/langflow/services/database/models/api_key/crud.py+4 1 modified
    @@ -41,11 +41,14 @@ async def create_api_key(session: AsyncSession, api_key_create: ApiKeyCreate, us
         return unmasked
     
     
    -async def delete_api_key(session: AsyncSession, api_key_id: UUID) -> None:
    +async def delete_api_key(session: AsyncSession, api_key_id: UUID, user_id: UUID) -> None:
         api_key = await session.get(ApiKey, api_key_id)
         if api_key is None:
             msg = "API Key not found"
             raise ValueError(msg)
    +    if api_key.user_id != user_id:
    +        msg = "API Key not found"
    +        raise ValueError(msg)
         await session.delete(api_key)
     
     
    
  • src/backend/tests/unit/api/v1/test_api_key.py+63 0 modified
    @@ -62,3 +62,66 @@ async def test_save_store_api_key(client: AsyncClient, logged_in_headers):
         assert response.status_code == status.HTTP_200_OK
         assert isinstance(result, dict), "The result must be a dictionary"
         assert "detail" in result, "The dictionary must contain a key called 'detail'"
    +
    +
    +async def test_delete_api_key_route_unauthorized(client: AsyncClient, logged_in_headers, active_user):
    +    """Test that users cannot delete API keys belonging to other users."""
    +    # Import required modules
    +    from langflow.services.auth.utils import get_password_hash
    +    from langflow.services.database.models.user.model import User
    +    from langflow.services.deps import session_scope
    +    from sqlmodel import select
    +
    +    # Create first user's API key
    +    basic_case = {
    +        "name": "test_key_user1",
    +        "total_uses": 0,
    +        "is_active": True,
    +        "api_key": "string",
    +        "user_id": str(active_user.id),
    +    }
    +    response = await client.post("api/v1/api_key/", json=basic_case, headers=logged_in_headers)
    +    assert response.status_code == status.HTTP_200_OK
    +    user1_api_key_id = response.json()["id"]
    +
    +    # Create a second user and get their auth headers
    +    async with session_scope() as session:
    +        user2 = User(
    +            username="testuser2",
    +            password=get_password_hash("testpassword2"),
    +            is_active=True,
    +            is_superuser=False,
    +        )
    +        stmt = select(User).where(User.username == user2.username)
    +        existing_user = (await session.exec(stmt)).first()
    +        if not existing_user:
    +            session.add(user2)
    +            await session.flush()
    +            await session.refresh(user2)
    +        else:
    +            user2 = existing_user
    +
    +    # Login as second user
    +    login_data = {"username": "testuser2", "password": "testpassword2"}
    +    response = await client.post("api/v1/login", data=login_data)
    +    assert response.status_code == status.HTTP_200_OK
    +    user2_token = response.json()["access_token"]
    +    user2_headers = {"Authorization": f"Bearer {user2_token}"}
    +
    +    # Try to delete first user's API key using second user's credentials
    +    response = await client.delete(f"api/v1/api_key/{user1_api_key_id}", headers=user2_headers)
    +
    +    # Should fail with 400 error (API Key not found - we don't reveal it exists)
    +    assert response.status_code == status.HTTP_400_BAD_REQUEST
    +    assert "API Key not found" in response.json()["detail"]
    +
    +    # Verify the first user's API key still exists by trying to delete it with correct credentials
    +    response = await client.delete(f"api/v1/api_key/{user1_api_key_id}", headers=logged_in_headers)
    +    assert response.status_code == status.HTTP_200_OK
    +    assert response.json()["detail"] == "API Key deleted"
    +
    +    # Clean up second user
    +    async with session_scope() as session:
    +        user = await session.get(User, user2.id)
    +        if user:
    +            await session.delete(user)
    

Vulnerability mechanics

Generated by null/stub on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.

References

4

News mentions

0

No linked articles in our index yet.