VYPR
Moderate severity · NVD Advisory · Published Jun 6, 2024 · Updated Oct 21, 2025

Improper Authorization in zenml-io/zenml

CVE-2024-2035

Description

An improper authorization vulnerability exists in the zenml-io/zenml repository, in the PUT /api/v1/users/id API endpoint. Any authenticated user can modify the information of other users, including setting the active status of an account to false, which deactivates it. The issue affects version 0.55.3 and was fixed in version 0.56.2. Because admin accounts can be deactivated this way, the vulnerability can disrupt both the functionality and the security of the application.
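The kind of check that was missing can be sketched in a few lines. The following is a minimal illustration only, loosely modeled on the patch listed under Patches and assuming a server without RBAC; `User`, `UserUpdate`, `IllegalOperationError` and `authorize_update` are simplified stand-ins defined here for the example, not ZenML's actual classes or functions.

```python
from dataclasses import dataclass
from typing import Optional


class IllegalOperationError(Exception):
    """Hypothetical stand-in: raised when the caller may not make the change."""


@dataclass
class User:
    """Hypothetical, stripped-down view of a stored user account."""

    id: str
    is_admin: bool
    active: bool = True


@dataclass
class UserUpdate:
    """Hypothetical update payload; None means 'leave unchanged'."""

    full_name: Optional[str] = None
    is_admin: Optional[bool] = None
    active: Optional[bool] = None


def authorize_update(caller: User, target: User, update: UserUpdate) -> UserUpdate:
    """Reject or sanitize an update before it reaches the data store."""
    # Non-admins may only edit their own account.
    if target.id != caller.id and not caller.is_admin:
        raise IllegalOperationError("Only admin users can update other users.")

    # Only admins may change an account's admin flag.
    if (
        update.is_admin is not None
        and update.is_admin != target.is_admin
        and not caller.is_admin
    ):
        raise IllegalOperationError("Only admins can change the admin status.")

    # `active` is honored only when an admin edits someone else's account;
    # in every other case the stored value is carried over unchanged.
    if not caller.is_admin or target.id == caller.id:
        update.active = target.active
    return update
```

With this sketch, a call such as `authorize_update(regular_user, admin_user, UserUpdate(active=False))` raises `IllegalOperationError` instead of deactivating the admin account, which is the behavior the description says was absent in the affected version.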

Affected packages

Versions sourced from the GitHub Security Advisory.

Package | Affected versions | Patched versions
zenml (PyPI) | < 0.56.2 | 0.56.2
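To check whether a given environment falls in the affected range, something like the following could be used. It is a rough sketch: the helper names are made up for this example, and the parser assumes plain `X.Y.Z` release strings (pre-release suffixes such as `rc1` are not handled).

```python
from importlib import metadata
from typing import Optional, Tuple

# First patched release according to the advisory.
PATCHED = (0, 56, 2)


def parse_release(version: str) -> Tuple[int, ...]:
    """Parse a plain 'X.Y.Z' release string into a tuple of ints."""
    return tuple(int(part) for part in version.split("."))


def is_affected(version: str) -> bool:
    """Return True if the version is older than the first patched release."""
    return parse_release(version) < PATCHED


def installed_zenml_version() -> Optional[str]:
    """Return the installed zenml version, or None if it is not installed."""
    try:
        return metadata.version("zenml")
    except metadata.PackageNotFoundError:
        return None
```

For example, `is_affected("0.55.3")` is true while `is_affected("0.56.2")` is false; passing the result of `installed_zenml_version()` (when it is not `None`) applies the same check to the local installation.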

Affected products

1

Patches

1
b95f083efffa

Add admin users notion (#2494)

https://github.com/zenml-io/zenml · Andrei Vishniakov · Mar 12, 2024 · via ghsa
18 files changed · +603 -67
  • examples/quickstart/README.md · +2 -2 · modified
    @@ -24,7 +24,7 @@ Along the way we will also show you how to:
     
     You can use Google Colab to see ZenML in action, no signup / installation required!
     
    -<a href="https://colab.research.google.com/github/zenml-io/zenml/blob/main/examples/quickstart/run.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>
    +<a href="https://colab.research.google.com/github/zenml-io/zenml/blob/main/examples/quickstart/quickstart.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>
     
     ## :computer: Run Locally
     
    @@ -208,4 +208,4 @@ The best way to get a production ZenML instance up and running with all batterie
     Also, make sure to join our <a href="https://zenml.io/slack" target="_blank">
         <img width="15" src="https://cdn3.iconfinder.com/data/icons/logos-and-brands-adobe/512/306_Slack-512.png" alt="Slack"/>
         <b>Slack Community</b> 
    -</a> to become part of the ZenML family!
    \ No newline at end of file
    +</a> to become part of the ZenML family!
    
  • src/zenml/client.py · +13 -1 · modified
    @@ -691,18 +691,22 @@ def create_user(
             self,
             name: str,
             password: Optional[str] = None,
    +        is_admin: bool = False,
         ) -> UserResponse:
             """Create a new user.
     
             Args:
                 name: The name of the user.
                 password: The password of the user. If not provided, the user will
                     be created with empty password.
    +            is_admin: Whether the user should be an admin.
     
             Returns:
                 The model of the created user.
             """
    -        user = UserRequest(name=name, password=password or None)
    +        user = UserRequest(
    +            name=name, password=password or None, is_admin=is_admin
    +        )
             user.active = (
                 password != "" if self.zen_store.type != StoreType.REST else True
             )
    @@ -801,6 +805,8 @@ def update_user(
             updated_email: Optional[str] = None,
             updated_email_opt_in: Optional[bool] = None,
             updated_hub_token: Optional[str] = None,
    +        updated_password: Optional[str] = None,
    +        updated_is_admin: Optional[bool] = None,
         ) -> UserResponse:
             """Update a user.
     
    @@ -811,6 +817,8 @@ def update_user(
                 updated_email: The new email of the user.
                 updated_email_opt_in: The new email opt-in status of the user.
                 updated_hub_token: Update the hub token
    +            updated_password: The new password of the user.
    +            updated_is_admin: Whether the user should be an admin.
     
             Returns:
                 The updated user.
    @@ -830,6 +838,10 @@ def update_user(
                 user_update.email_opted_in = updated_email_opt_in
             if updated_hub_token is not None:
                 user_update.hub_token = updated_hub_token
    +        if updated_password is not None:
    +            user_update.password = updated_password
    +        if updated_is_admin is not None:
    +            user_update.is_admin = updated_is_admin
     
             return self.zen_store.update_user(
                 user_id=user.id, user_update=user_update
    
  • src/zenml/cli/user_management.py · +71 -4 · modified
    @@ -120,15 +120,27 @@ def list_users(ctx: click.Context, **kwargs: Any) -> None:
         required=False,
         type=str,
     )
    +@click.option(
    +    "--is_admin",
    +    is_flag=True,
    +    help=(
    +        "Whether the user should be an admin. If not specified, the user will "
    +        "be a regular user."
    +    ),
    +    required=False,
    +    default=False,
    +)
     def create_user(
         user_name: str,
         password: Optional[str] = None,
    +    is_admin: bool = False,
     ) -> None:
         """Create a new user.
     
         Args:
             user_name: The name of the user to create.
             password: The password of the user to create.
    +        is_admin: Whether the user should be an admin.
         """
         client = Client()
         if not password:
    @@ -146,7 +158,9 @@ def create_user(
                 )
     
         try:
    -        new_user = client.create_user(name=user_name, password=password)
    +        new_user = client.create_user(
    +            name=user_name, password=password, is_admin=is_admin
    +        )
     
             cli_utils.declare(f"Created user '{new_user.name}'.")
         except EntityExistsError as err:
    @@ -162,8 +176,7 @@ def create_user(
     
     @user.command(
         "update",
    -    help="Update user information through the cli. All attributes "
    -    "except for password can be updated through the cli.",
    +    help="Update user information through the cli.",
     )
     @click.argument("user_name_or_id", type=str, required=True)
     @click.option(
    @@ -191,26 +204,80 @@ def create_user(
         required=False,
         help="New user email.",
     )
    +@click.option(
    +    "--password",
    +    "-p",
    +    "updated_password",
    +    type=str,
    +    required=False,
    +    help="New user password.",
    +)
    +@click.option(
    +    "--admin",
    +    "-a",
    +    "make_admin",
    +    is_flag=True,
    +    required=False,
    +    default=None,
    +    help="Whether the user should be an admin.",
    +)
    +@click.option(
    +    "--user",
    +    "-u",
    +    "make_user",
    +    is_flag=True,
    +    required=False,
    +    default=None,
    +    help="Whether the user should be a regular user.",
    +)
     def update_user(
         user_name_or_id: str,
         updated_name: Optional[str] = None,
         updated_full_name: Optional[str] = None,
         updated_email: Optional[str] = None,
    +    updated_password: Optional[str] = None,
    +    make_admin: Optional[bool] = None,
    +    make_user: Optional[bool] = None,
     ) -> None:
    -    """Create a new user.
    +    """Update an existing user.
     
         Args:
             user_name_or_id: The name of the user to create.
             updated_name: The name of the user to create.
             updated_full_name: The name of the user to create.
             updated_email: The name of the user to create.
    +        updated_password: The name of the user to create.
    +        make_admin: Whether the user should be an admin.
    +        make_user: Whether the user should be a regular user.
         """
    +    if make_admin is not None and make_user is not None:
    +        cli_utils.error(
    +            "Cannot set both --admin and --user flags as these are mutually exclusive."
    +        )
         try:
    +        current_user = Client().get_user(
    +            user_name_or_id, allow_name_prefix_match=False
    +        )
    +        if current_user.is_admin and make_user:
    +            confirmation = cli_utils.confirmation(
    +                f"Currently user `{current_user.name}` is an admin. Are you sure you want to make them a regular user?"
    +            )
    +            if not confirmation:
    +                cli_utils.declare("User update canceled.")
    +                return
    +
    +        updated_is_admin = None
    +        if make_admin is True:
    +            updated_is_admin = True
    +        elif make_user is True:
    +            updated_is_admin = False
             Client().update_user(
                 name_id_or_prefix=user_name_or_id,
                 updated_name=updated_name,
                 updated_full_name=updated_full_name,
                 updated_email=updated_email,
    +            updated_password=updated_password,
    +            updated_is_admin=updated_is_admin,
             )
         except (KeyError, IllegalOperationError) as err:
             cli_utils.error(str(err))
    
  • src/zenml/models/v2/core/service_account.py · +1 -0 · modified
    @@ -147,6 +147,7 @@ def to_user_model(self) -> "UserResponse":
                     email_opted_in=False,
                     created=self.created,
                     updated=self.updated,
    +                is_admin=False,
                 ),
                 metadata=UserResponseMetadata(
                     description=self.description,
    
  • src/zenml/models/v2/core/user.py · +78 -36 · modified
    @@ -26,7 +26,7 @@
     )
     from uuid import UUID
     
    -from pydantic import Field, root_validator
    +from pydantic import BaseModel, Field, root_validator
     
     from zenml.constants import STR_FIELD_MAX_LENGTH
     from zenml.models.v2.base.base import (
    @@ -35,40 +35,23 @@
         BaseRequest,
         BaseResponseMetadata,
         BaseResponseResources,
    +    BaseZenModel,
     )
     from zenml.models.v2.base.filter import AnyQuery, BaseFilter
    -from zenml.models.v2.base.update import update_model
     
     if TYPE_CHECKING:
         from passlib.context import CryptContext
     
         from zenml.models.v2.base.filter import AnySchema
     
    -# ------------------ Request Model ------------------
    -
    +# ------------------ Base Model ------------------
     
    -class UserRequest(BaseRequest):
    -    """Request model for users."""
     
    -    # Analytics fields for user request models
    -    ANALYTICS_FIELDS: ClassVar[List[str]] = [
    -        "name",
    -        "full_name",
    -        "active",
    -        "email_opted_in",
    -    ]
    +class UserBase(BaseModel):
    +    """Base model for users."""
     
         # Fields
    -    name: str = Field(
    -        title="The unique username for the account.",
    -        max_length=STR_FIELD_MAX_LENGTH,
    -    )
    -    full_name: str = Field(
    -        default="",
    -        title="The full name for the account owner. Only relevant for user "
    -        "accounts.",
    -        max_length=STR_FIELD_MAX_LENGTH,
    -    )
    +
         email: Optional[str] = Field(
             default=None,
             title="The email address associated with the account.",
    @@ -99,17 +82,6 @@ class UserRequest(BaseRequest):
             default=None,
             title="The external user ID associated with the account.",
         )
    -    active: bool = Field(default=False, title="Whether the account is active.")
    -
    -    class Config:
    -        """Pydantic configuration class."""
    -
    -        # Validate attributes when assigning them
    -        validate_assignment = True
    -
    -        # Forbid extra attributes to prevent unexpected behavior
    -        extra = "forbid"
    -        underscore_attrs_are_private = True
     
         @classmethod
         def _get_crypt_context(cls) -> "CryptContext":
    @@ -165,13 +137,71 @@ def generate_activation_token(self) -> str:
             return self.activation_token
     
     
    +# ------------------ Request Model ------------------
    +
    +
    +class UserRequest(UserBase, BaseRequest):
    +    """Request model for users."""
    +
    +    # Analytics fields for user request models
    +    ANALYTICS_FIELDS: ClassVar[List[str]] = [
    +        "name",
    +        "full_name",
    +        "active",
    +        "email_opted_in",
    +    ]
    +
    +    name: str = Field(
    +        title="The unique username for the account.",
    +        max_length=STR_FIELD_MAX_LENGTH,
    +    )
    +    full_name: str = Field(
    +        default="",
    +        title="The full name for the account owner. Only relevant for user "
    +        "accounts.",
    +        max_length=STR_FIELD_MAX_LENGTH,
    +    )
    +    is_admin: bool = Field(
    +        title="Whether the account is an administrator.",
    +    )
    +    active: bool = Field(default=False, title="Whether the account is active.")
    +
    +    class Config:
    +        """Pydantic configuration class."""
    +
    +        # Validate attributes when assigning them
    +        validate_assignment = True
    +
    +        # Forbid extra attributes to prevent unexpected behavior
    +        extra = "forbid"
    +        underscore_attrs_are_private = True
    +
    +
     # ------------------ Update Model ------------------
     
     
    -@update_model
    -class UserUpdate(UserRequest):
    +class UserUpdate(UserBase, BaseZenModel):
         """Update model for users."""
     
    +    name: Optional[str] = Field(
    +        title="The unique username for the account.",
    +        max_length=STR_FIELD_MAX_LENGTH,
    +        default=None,
    +    )
    +    full_name: Optional[str] = Field(
    +        default=None,
    +        title="The full name for the account owner. Only relevant for user "
    +        "accounts.",
    +        max_length=STR_FIELD_MAX_LENGTH,
    +    )
    +    is_admin: Optional[bool] = Field(
    +        default=None,
    +        title="Whether the account is an administrator.",
    +    )
    +    active: Optional[bool] = Field(
    +        default=None, title="Whether the account is active."
    +    )
    +
         @root_validator
         def user_email_updates(cls, values: Dict[str, Any]) -> Dict[str, Any]:
             """Validate that the UserUpdateModel conforms to the email-opt-in-flow.
    @@ -231,6 +261,9 @@ class UserResponseBody(BaseDatedResponseBody):
         is_service_account: bool = Field(
             title="Indicates whether this is a service account or a user account."
         )
    +    is_admin: bool = Field(
    +        title="Whether the account is an administrator.",
    +    )
     
     
     class UserResponseMetadata(BaseResponseMetadata):
    @@ -340,6 +373,15 @@ def is_service_account(self) -> bool:
             """
             return self.get_body().is_service_account
     
    +    @property
    +    def is_admin(self) -> bool:
    +        """The `is_admin` property.
    +
    +        Returns:
    +            Whether the user is an admin.
    +        """
    +        return self.get_body().is_admin
    +
         @property
         def email(self) -> Optional[str]:
             """The `email` property.
    
  • src/zenml/models/v2/misc/external_user.py · +1 -0 · modified
    @@ -25,6 +25,7 @@ class ExternalUserModel(BaseModel):
         id: UUID
         email: str
         name: Optional[str] = None
    +    is_admin: bool = False
     
         class Config:
             """Pydantic configuration."""
    
  • src/zenml/zen_server/auth.py · +2 -0 · modified
    @@ -588,6 +588,7 @@ def authenticate_external_user(external_access_token: str) -> AuthContext:
                     email_opted_in=True,
                     active=True,
                     email=external_user.email,
    +                is_admin=external_user.is_admin,
                 ),
             )
         except KeyError:
    @@ -603,6 +604,7 @@ def authenticate_external_user(external_access_token: str) -> AuthContext:
                     email_opted_in=True,
                     active=True,
                     email=external_user.email,
    +                is_admin=external_user.is_admin,
                 )
             )
     
    
  • src/zenml/zen_server/rbac/endpoint_utils.py · +4 -1 · modified
    @@ -59,7 +59,10 @@ def verify_permissions_and_create_entity(
                     "different user."
                 )
     
    -    verify_permission(resource_type=resource_type, action=Action.CREATE)
    +    verify_permission(
    +        resource_type=resource_type,
    +        action=Action.CREATE,
    +    )
         return create_method(request_model)
     
     
    
  • src/zenml/zen_server/rbac/utils.py · +4 -1 · modified
    @@ -233,7 +233,10 @@ def verify_permission_for_model(model: AnyResponse, action: Action) -> None:
         batch_verify_permissions_for_models(models=[model], action=action)
     
     
    -def batch_verify_permissions(resources: Set[Resource], action: Action) -> None:
    +def batch_verify_permissions(
    +    resources: Set[Resource],
    +    action: Action,
    +) -> None:
         """Batch permission verification.
     
         Args:
    
  • src/zenml/zen_server/routers/users_endpoints.py · +65 -7 · modified
    @@ -59,6 +59,7 @@
         handle_exceptions,
         make_dependable,
         server_config,
    +    verify_admin_status_if_no_rbac,
         zen_store,
     )
     
    @@ -112,6 +113,9 @@ def list_users(
         if allowed_ids is not None:
             # Make sure users can see themselves
             allowed_ids.add(auth_context.user.id)
    +    else:
    +        if not auth_context.user.is_admin and not server_config().rbac_enabled:
    +            allowed_ids = {auth_context.user.id}
     
         user_filter_model.configure_rbac(
             authenticated_user_id=auth_context.user.id, id=allowed_ids
    @@ -139,14 +143,15 @@ def list_users(
         @handle_exceptions
         def create_user(
             user: UserRequest,
    -        _: AuthContext = Security(authorize),
    +        auth_context: AuthContext = Security(authorize),
         ) -> UserResponse:
             """Creates a user.
     
             # noqa: DAR401
     
             Args:
                 user: User to create.
    +            auth_context: Authentication context.
     
             Returns:
                 The created user.
    @@ -163,6 +168,10 @@ def create_user(
             else:
                 user.active = True
     
    +        verify_admin_status_if_no_rbac(
    +            auth_context.user.is_admin, "create user"
    +        )
    +
             new_user = verify_permissions_and_create_entity(
                 request_model=user,
                 resource_type=ResourceType.USER,
    @@ -202,7 +211,13 @@ def get_user(
             user_name_or_id=user_name_or_id, hydrate=hydrate
         )
         if user.id != auth_context.user.id:
    -        verify_permission_for_model(user, action=Action.READ)
    +        verify_admin_status_if_no_rbac(
    +            auth_context.user.is_admin, "get other user"
    +        )
    +        verify_permission_for_model(
    +            user,
    +            action=Action.READ,
    +        )
     
         return dehydrate_response_model(user)
     
    @@ -235,11 +250,32 @@ def update_user(
     
             Returns:
                 The updated user.
    +
    +        Raises:
    +            IllegalOperationError: if the user tries change admin status,
    +                while not an admin
             """
             user = zen_store().get_user(user_name_or_id)
             if user.id != auth_context.user.id:
    -            verify_permission_for_model(user, action=Action.UPDATE)
    +            verify_admin_status_if_no_rbac(
    +                auth_context.user.is_admin, "update other user"
    +            )
    +            verify_permission_for_model(
    +                user,
    +                action=Action.UPDATE,
    +            )
    +        if (
    +            user_update.is_admin is not None
    +            and user.is_admin != user_update.is_admin
    +            and not auth_context.user.is_admin
    +        ):
    +            raise IllegalOperationError(
    +                "Only admins can change the admin status of other users."
    +            )
     
    +        user_update.activation_token = user.activation_token
    +        if not auth_context.user.is_admin or user.id == auth_context.user.id:
    +            user_update.active = user.active
             updated_user = zen_store().update_user(
                 user_id=user.id,
                 user_update=user_update,
    @@ -279,6 +315,7 @@ def activate_user(
             )
             user_update.active = True
             user_update.activation_token = None
    +        user_update.is_admin = user.is_admin
             return zen_store().update_user(
                 user_id=user.id, user_update=user_update
             )
    @@ -305,10 +342,21 @@ def deactivate_user(
     
             Returns:
                 The generated activation token.
    +
    +        Raises:
    +            IllegalOperationError: if the user is trying to deactivate
    +                themselves.
             """
             user = zen_store().get_user(user_name_or_id)
    -        if user.id != auth_context.user.id:
    -            verify_permission_for_model(user, action=Action.UPDATE)
    +        if user.id == auth_context.user.id:
    +            raise IllegalOperationError("Cannot deactivate yourself.")
    +        verify_admin_status_if_no_rbac(
    +            auth_context.user.is_admin, "deactivate user"
    +        )
    +        verify_permission_for_model(
    +            user,
    +            action=Action.UPDATE,
    +        )
     
             user_update = UserUpdate(
                 name=user.name,
    @@ -354,7 +402,13 @@ def delete_user(
                     "administrator."
                 )
             else:
    -            verify_permission_for_model(user, action=Action.DELETE)
    +            verify_admin_status_if_no_rbac(
    +                auth_context.user.is_admin, "delete user"
    +            )
    +            verify_permission_for_model(
    +                user,
    +                action=Action.DELETE,
    +            )
     
             zen_store().delete_user(user_name_or_id=user_name_or_id)
     
    @@ -402,7 +456,6 @@ def email_opt_in_response(
                         email=user_response.email,
                         source="zenml server",
                     )
    -
                 updated_user = zen_store().update_user(
                     user_id=user.id, user_update=user_update
                 )
    @@ -460,6 +513,11 @@ def update_myself(
             Returns:
                 The updated user.
             """
    +        current_user = zen_store().get_user(auth_context.user.id)
    +        user.activation_token = current_user.activation_token
    +        user.active = current_user.active
    +        user.is_admin = current_user.is_admin
    +
             updated_user = zen_store().update_user(
                 user_id=auth_context.user.id, user_update=user
             )
    
  • src/zenml/zen_server/utils.py · +32 -1 · modified
    @@ -27,7 +27,7 @@
         ENV_ZENML_SERVER,
     )
     from zenml.enums import ServerProviderType
    -from zenml.exceptions import OAuthError
    +from zenml.exceptions import IllegalOperationError, OAuthError
     from zenml.logger import get_logger
     from zenml.plugins.plugin_flavor_registry import PluginFlavorRegistry
     from zenml.zen_server.deploy.deployment import ServerDeployment
    @@ -396,3 +396,34 @@ def get_ip_location(ip_address: str) -> Tuple[str, str, str]:
         except Exception:
             logger.exception(f"Could not get IP location for {ip_address}.")
             return "", "", ""
    +
    +
    +def verify_admin_status_if_no_rbac(
    +    admin_status: Optional[bool],
    +    action: Optional[str] = None,
    +) -> None:
    +    """Validate the admin status for sensitive requests.
    +
    +    Only add this check in endpoints meant for admin use only.
    +
    +    Args:
    +        admin_status: Whether the user is an admin or not. This is only used
    +            if explicitly specified in the call and even if passed will be
    +            ignored, if RBAC is enabled.
    +        action: The action that is being performed, used for output only.
    +
    +    Raises:
    +        IllegalOperationError: If the admin status is not valid.
    +    """
    +    if not server_config().rbac_enabled:
    +        if not action:
    +            action = "this action"
    +        else:
    +            action = f"`{action.strip('`')}`"
    +
    +        if admin_status is False:
    +            raise IllegalOperationError(
    +                message=f"Only admin users can perform {action} "
    +                "without RBAC enabled.",
    +            )
    +    return
    
  • src/zenml/zen_stores/migrations/versions/1a9a9d2a836d_admin_users.py · +56 -0 · added
    @@ -0,0 +1,56 @@
    +"""admin users [1a9a9d2a836d].
    +
    +Revision ID: 1a9a9d2a836d
    +Revises: 0.55.5
    +Create Date: 2024-03-04 15:48:16.580871
    +
    +"""
    +
    +import sqlalchemy as sa
    +import sqlmodel
    +from alembic import op
    +
    +# revision identifiers, used by Alembic.
    +revision = "1a9a9d2a836d"
    +down_revision = "0.55.5"
    +branch_labels = None
    +depends_on = None
    +
    +
    +def upgrade() -> None:
    +    """Upgrade database schema and/or data, creating a new revision."""
    +    # ### commands auto generated by Alembic - please adjust! ###
    +    bind = op.get_bind()
    +    session = sqlmodel.Session(bind=bind)
    +
    +    with op.batch_alter_table("user", schema=None) as batch_op:
    +        batch_op.add_column(
    +            sa.Column(
    +                "is_admin",
    +                sa.Boolean(),
    +                nullable=False,
    +                server_default=sa.sql.expression.false(),
    +            )
    +        )
    +
    +    # during migration we treat all users as admin for backward compatibility
    +    # this should be adjusted by server admins after upgrade
    +    session.execute(
    +        sa.text(
    +            """
    +            UPDATE user
    +            SET is_admin = true
    +            WHERE NOT is_service_account AND external_user_id IS NULL
    +            """
    +        )
    +    )
    +    # ### end Alembic commands ###
    +
    +
    +def downgrade() -> None:
    +    """Downgrade database schema and/or data back to the previous revision."""
    +    # ### commands auto generated by Alembic - please adjust! ###
    +    with op.batch_alter_table("user", schema=None) as batch_op:
    +        batch_op.drop_column("is_admin")
    +
    +    # ### end Alembic commands ###
    
  • src/zenml/zen_stores/schemas/user_schemas.py · +4 -0 · modified
    @@ -77,6 +77,7 @@ class UserSchema(NamedSchema, table=True):
         hub_token: Optional[str] = Field(nullable=True)
         email_opted_in: Optional[bool] = Field(nullable=True)
         external_user_id: Optional[UUID] = Field(nullable=True)
    +    is_admin: bool = Field(default=False)
     
         stacks: List["StackSchema"] = Relationship(back_populates="user")
         components: List["StackComponentSchema"] = Relationship(
    @@ -167,6 +168,7 @@ def from_user_request(cls, model: UserRequest) -> "UserSchema":
                 email_opted_in=model.email_opted_in,
                 email=model.email,
                 is_service_account=False,
    +            is_admin=model.is_admin,
             )
     
         @classmethod
    @@ -189,6 +191,7 @@ def from_service_account_request(
                 is_service_account=True,
                 email_opted_in=False,
                 full_name="",
    +            is_admin=False,
             )
     
         def update_user(self, user_update: UserUpdate) -> "UserSchema":
    @@ -271,6 +274,7 @@ def to_model(
                     is_service_account=self.is_service_account,
                     created=self.created,
                     updated=self.updated,
    +                is_admin=self.is_admin,
                 ),
                 metadata=metadata,
             )
    
  • src/zenml/zen_stores/sql_zen_store.py · +9 -0 · modified
    @@ -7514,6 +7514,14 @@ def update_user(
                     user_id, session=session, service_account=False
                 )
     
    +            if (
    +                existing_user.name == self._default_user_name
    +                and user_update.is_admin is False
    +            ):
    +                raise IllegalOperationError(
    +                    "The default user's admin status cannot be removed."
    +                )
    +
                 if (
                     user_update.name is not None
                     and user_update.name != existing_user.name
    @@ -7604,6 +7612,7 @@ def _get_or_create_default_user(self) -> UserResponse:
                         name=default_user_name,
                         active=True,
                         password=password,
    +                    is_admin=True,
                     )
                 )
     
    
  • tests/integration/functional/zen_stores/test_zen_store.py · +251 -6 · modified
    @@ -12,10 +12,12 @@
     #  or implied. See the License for the specific language governing
     #  permissions and limitations under the License.
     import os
    +import random
     import time
     import uuid
     from contextlib import ExitStack as does_not_raise
     from datetime import datetime
    +from string import ascii_lowercase
     from threading import Thread
     from typing import Dict, List, Optional, Tuple
     from uuid import UUID, uuid4
    @@ -127,6 +129,7 @@
     from zenml.models.v2.core.user import UserFilter
     from zenml.utils import code_repository_utils, source_utils
     from zenml.utils.enum_utils import StrEnum
    +from zenml.zen_stores.rest_zen_store import RestZenStore
     from zenml.zen_stores.sql_zen_store import SqlZenStore
     
     DEFAULT_NAME = "default"
    @@ -377,6 +380,235 @@ def test_deleting_default_workspace_fails():
     # '-------'
     
     
    +class TestAdminUser:
    +    default_pwd = "".join(random.choices(ascii_lowercase, k=10))
    +
    +    def test_creation_as_admin_and_non_admin(self):
    +        """Tests creating a user as an admin and as a non-admin."""
    +        if Client().zen_store.type == StoreType.SQL:
    +            pytest.skip("SQL ZenStore does not support admin users.")
    +
    +        with UserContext(login=True, is_admin=False):
    +            zen_store: RestZenStore = Client().zen_store
    +            # this is not allowed for non-admin users
    +            with pytest.raises(IllegalOperationError):
    +                zen_store.create_user(
    +                    UserRequest(
    +                        name=sample_name("test_user"),
    +                        password=self.default_pwd,
    +                        is_admin=False,
    +                    )
    +                )
    +
    +    def test_listing_users(self):
    +        """Tests listing users as an admin and as a non-admin."""
    +        if Client().zen_store.type == StoreType.SQL:
    +            pytest.skip("SQL ZenStore does not support admin users.")
    +
    +        with UserContext(
    +            password=self.default_pwd, is_admin=False
    +        ) as test_user:
    +            zen_store: RestZenStore = Client().zen_store
    +            users = zen_store.list_users(UserFilter())
    +            assert users.total >= 2
    +
    +            # this is limited to self only for non-admin users
    +            with LoginContext(
    +                user_name=test_user.name, password=self.default_pwd
    +            ):
    +                zen_store = Client().zen_store
    +                users = zen_store.list_users(UserFilter())
    +                assert users.total == 1
    +
    +    def test_get_users(self):
    +        """Tests getting users as an admin and as a non-admin."""
    +        if Client().zen_store.type == StoreType.SQL:
    +            pytest.skip("SQL ZenStore does not support admin users.")
    +
    +        with UserContext(
    +            password=self.default_pwd, is_admin=False
    +        ) as test_user:
    +            zen_store: RestZenStore = Client().zen_store
    +
    +            user = zen_store.get_user(test_user.name)
    +            assert user.id == test_user.id
    +
    +            # this is not allowed for non-admin users
    +            with LoginContext(
    +                user_name=test_user.name, password=self.default_pwd
    +            ):
    +                zen_store = Client().zen_store
    +                with pytest.raises(IllegalOperationError):
    +                    zen_store.get_user(DEFAULT_USERNAME)
    +
    +    def test_update_users(self):
    +        """Tests updating users as an admin and as a non-admin."""
    +        if Client().zen_store.type == StoreType.SQL:
    +            pytest.skip("SQL ZenStore does not support admin users.")
    +
    +        with UserContext(
    +            password=self.default_pwd, is_admin=False
    +        ) as test_user:
    +            zen_store: RestZenStore = Client().zen_store
    +
    +            user = zen_store.update_user(
    +                test_user.id,
    +                UserUpdate(full_name="foo@bar.ai"),
    +            )
    +            assert user.full_name == "foo@bar.ai"
    +
    +            with UserContext(login=True, is_admin=False):
    +                zen_store = Client().zen_store
    +
    +                # this is not allowed for non-admin users
    +                with pytest.raises(IllegalOperationError):
    +                    zen_store.update_user(
    +                        test_user.id,
    +                        UserUpdate(full_name="bar@foo.io"),
    +                    )
    +
    +            # user is allowed to update itself
    +            with LoginContext(
    +                user_name=test_user.name, password=self.default_pwd
    +            ):
    +                zen_store = Client().zen_store
    +                user = zen_store.update_user(
    +                    test_user.id,
    +                    UserUpdate(full_name="bar@foo.io"),
    +                )
    +
    +                assert user.full_name == "bar@foo.io"
    +
    +    def test_deactivate_users(self):
    +        """Tests deactivating users as an admin and as a non-admin."""
    +        if Client().zen_store.type == StoreType.SQL:
    +            pytest.skip("SQL ZenStore does not support admin users.")
    +
    +        zen_store: RestZenStore = Client().zen_store
    +        with UserContext(
    +            password=self.default_pwd, is_admin=False
    +        ) as test_user:
    +            with UserContext(is_admin=False) as test_user2:
    +                # this is not allowed for non-admin users
    +                with LoginContext(
    +                    user_name=test_user.name, password=self.default_pwd
    +                ):
    +                    new_zen_store: RestZenStore = Client().zen_store
    +                    with pytest.raises(IllegalOperationError):
    +                        new_zen_store.put(
    +                            f"{USERS}/{str(test_user2.id)}{DEACTIVATE}"
    +                        )
    +
    +                response_body = zen_store.put(
    +                    f"{USERS}/{str(test_user2.id)}{DEACTIVATE}",
    +                )
    +                deactivated_user = UserResponse.parse_obj(response_body)
    +                assert deactivated_user.name == test_user2.name
    +
    +    def test_delete_users(self):
    +        """Tests deleting users as an admin and as a non-admin."""
    +        if Client().zen_store.type == StoreType.SQL:
    +            pytest.skip("SQL ZenStore does not support admin users.")
    +
    +        zen_store: RestZenStore = Client().zen_store
    +        with UserContext(
    +            password=self.default_pwd, is_admin=False
    +        ) as test_user:
    +            with UserContext(is_admin=False) as test_user2:
    +                # this is not allowed for non-admin users
    +                with LoginContext(
    +                    user_name=test_user.name, password=self.default_pwd
    +                ):
    +                    new_zen_store: RestZenStore = Client().zen_store
    +                    with pytest.raises(IllegalOperationError):
    +                        new_zen_store.delete_user(test_user2.id)
    +
    +            zen_store.delete_user(test_user.id)
    +
    +    def test_update_self_via_normal_endpoint(self):
    +        """Tests updating self in admin and non-admin setting."""
    +        if Client().zen_store.type == StoreType.SQL:
    +            pytest.skip("SQL ZenStore does not support admin users.")
    +
    +        zen_store: RestZenStore = Client().zen_store
    +        default_user = zen_store.get_user(DEFAULT_USERNAME)
    +        with pytest.raises(IllegalOperationError):
    +            # cannot update admin status for default user
    +            zen_store.update_user(
    +                default_user.id,
    +                UserUpdate(name=default_user.name, is_admin=False),
    +            )
    +        with UserContext(
    +            password=self.default_pwd, is_admin=False
    +        ) as test_user:
    +            # this is not allowed for non-admin users
    +            with LoginContext(
    +                user_name=test_user.name, password=self.default_pwd
    +            ):
    +                new_zen_store: RestZenStore = Client().zen_store
    +                with pytest.raises(IllegalOperationError):
    +                    new_zen_store.update_user(
    +                        test_user.id,
    +                        UserUpdate(
    +                            name=test_user.name,
    +                            is_admin=True,
    +                        ),
    +                    )
    +
    +        with UserContext(
    +            password=self.default_pwd, is_admin=False
    +        ) as test_user:
    +            zen_store.update_user(
    +                test_user.id,
    +                UserUpdate(name=test_user.name, is_admin=True),
    +            )
    +            with LoginContext(
    +                user_name=test_user.name, password=self.default_pwd
    +            ):
    +                new_zen_store: RestZenStore = Client().zen_store
    +                new_zen_store.update_user(
    +                    test_user.id,
    +                    UserUpdate(
    +                        name=test_user.name,
    +                        is_admin=False,
    +                    ),
    +                )
    +
    +    def test_update_self_via_current_user_endpoint(self):
    +        """Tests updating self in admin and non-admin setting."""
    +        if Client().zen_store.type == StoreType.SQL:
    +            pytest.skip("SQL ZenStore does not support admin users.")
    +
    +        zen_store: RestZenStore = Client().zen_store
    +        default_user = zen_store.get_user(DEFAULT_USERNAME)
    +        assert default_user.is_admin
    +        # self update cannot change admin status
    +        zen_store.put(
    +            "/current-user",
    +            body=UserUpdate(name=default_user.name, is_admin=False),
    +        )
    +        default_user = zen_store.get_user(DEFAULT_USERNAME)
    +        assert default_user.is_admin
    +        with UserContext(
    +            password=self.default_pwd, is_admin=False
    +        ) as test_user:
    +            with LoginContext(
    +                user_name=test_user.name, password=self.default_pwd
    +            ):
    +                new_zen_store: RestZenStore = Client().zen_store
    +                assert not test_user.is_admin
    +                # self update cannot change admin status
    +                new_zen_store.put(
    +                    "/current-user",
    +                    body=UserUpdate(
    +                        name=test_user.name,
    +                        is_admin=True,
    +                    ),
    +                )
    +                user = zen_store.get_user(test_user.id)
    +                assert not user.is_admin
    +
    +
     def test_active_user():
         """Tests the active user can be queried with .get_user()."""
         zen_store = Client().zen_store
    @@ -397,14 +629,20 @@ def test_creating_user_with_existing_name_fails():
         with UserContext() as existing_user:
             with pytest.raises(EntityExistsError):
                 zen_store.create_user(
    -                UserRequest(name=existing_user.name, password="password")
    +                UserRequest(
    +                    name=existing_user.name,
    +                    password="password",
    +                    is_admin=False,
    +                )
                 )
     
         with ServiceAccountContext() as existing_service_account:
             with does_not_raise():
                 user = zen_store.create_user(
                     UserRequest(
    -                    name=existing_service_account.name, password="password"
    +                    name=existing_service_account.name,
    +                    password="password",
    +                    is_admin=False,
                     )
                 )
                 # clean up
    @@ -434,7 +672,9 @@ def silent_create_user(user_request: UserRequest):
         for _ in range(count):
             t = Thread(
                 target=silent_create_user,
    -            args=(UserRequest(name=user_name, password=password),),
    +            args=(
    +                UserRequest(name=user_name, password=password, is_admin=False),
    +            ),
             )
             threads.append(t)
         for t in threads:
    @@ -740,7 +980,7 @@ def test_create_user_no_password():
             with pytest.raises(AuthorizationException):
                 response_body = store.put(
                     f"{USERS}/{str(user.id)}{ACTIVATE}",
    -                body=UserUpdate(password="password"),
    +                body=UserUpdate(password="password", is_admin=user.is_admin),
                 )
     
             with pytest.raises(AuthorizationException):
    @@ -750,7 +990,9 @@ def test_create_user_no_password():
             response_body = store.put(
                 f"{USERS}/{str(user.id)}{ACTIVATE}",
                 body=UserUpdate(
    -                password="password", activation_token=user.activation_token
    +                password="password",
    +                activation_token=user.activation_token,
    +                is_admin=user.is_admin,
                 ),
             )
             activated_user = UserResponse.parse_obj(response_body)
    @@ -793,7 +1035,9 @@ def test_reactivate_user():
             with pytest.raises(AuthorizationException):
                 response_body = store.put(
                     f"{USERS}/{str(user.id)}{ACTIVATE}",
    -                body=UserUpdate(password="newpassword"),
    +                body=UserUpdate(
    +                    password="newpassword", is_admin=user.is_admin
    +                ),
                 )
     
             with pytest.raises(AuthorizationException):
    @@ -809,6 +1053,7 @@ def test_reactivate_user():
                 body=UserUpdate(
                     password="newpassword",
                     activation_token=deactivated_user.activation_token,
    +                is_admin=user.is_admin,
                 ),
             )
             activated_user = UserResponse.parse_obj(response_body)
    
  • tests/integration/functional/zen_stores/utils.py+7 2 modified
    @@ -191,6 +191,7 @@ def __init__(
             login: bool = False,
             existing_user: bool = False,
             delete: bool = True,
    +        is_admin: bool = True,
         ):
             if existing_user:
                 self.user_name = user_name
    @@ -207,11 +208,15 @@ def __init__(
                 self.password = password or random_str(32)
             self.existing_user = existing_user
             self.delete = delete
    +        self.is_admin = is_admin
     
         def __enter__(self):
             if not self.existing_user:
                 new_user = UserRequest(
    -                name=self.user_name, password=self.password, active=True
    +                name=self.user_name,
    +                password=self.password,
    +                active=True,
    +                is_admin=self.is_admin,
                 )
                 self.created_user = self.store.create_user(new_user)
             else:
    @@ -1011,7 +1016,7 @@ def cleanup(self) -> None:
         entity_name="workspace",
     )
     user_crud_test_config = CrudTestConfig(
    -    create_model=UserRequest(name=sample_name("sample_user")),
    +    create_model=UserRequest(name=sample_name("sample_user"), is_admin=True),
         update_model=UserUpdate(name=sample_name("updated_sample_user")),
         filter_model=UserFilter,
         entity_name="user",
    
  • tests/unit/conftest.py+1 0 modified
    @@ -372,6 +372,7 @@ def sample_user_model() -> UserResponse:
                 created=datetime.now(),
                 updated=datetime.now(),
                 is_service_account=False,
    +            is_admin=True,
             ),
             metadata=UserResponseMetadata(),
         )
    
  • tests/unit/models/test_user_models.py+2 6 modified
    @@ -26,15 +26,11 @@ def test_user_request_model_fails_with_long_password():
         """Test that the user request model fails with long passwords."""
         long_password = "a" * (STR_FIELD_MAX_LENGTH + 1)
         with pytest.raises(ValidationError):
    -        UserRequest(
    -            password=long_password,
    -        )
    +        UserRequest(password=long_password, is_admin=False)
     
     
     def test_user_request_model_fails_with_long_activation_token():
         """Test that the user request model fails with long activation tokens."""
         long_token = "a" * (STR_FIELD_MAX_LENGTH + 1)
         with pytest.raises(ValidationError):
    -        UserRequest(
    -            activation_token=long_token,
    -        )
    +        UserRequest(activation_token=long_token, is_admin=False)
    

Vulnerability mechanics
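
The tests added by the patch describe the intended authorization rules: a non-admin user may update only their own account, may not change their own admin status, and may not deactivate other users, while an admin may do all of these. The block below is a minimal, self-contained sketch of those rules, not ZenML's server code. The `User` dataclass, the three functions, and this `IllegalOperationError` class are hypothetical stand-ins. Two details are assumptions: that the deactivation check depends only on the caller's admin flag, and that a non-admin may resend their current `is_admin` value unchanged. The default user's protected admin status, which the tests also cover, is left out.

```python
from dataclasses import dataclass, replace
from typing import Optional
from uuid import UUID, uuid4


class IllegalOperationError(Exception):
    """Stand-in for the exception the tests expect (not ZenML's class)."""


@dataclass(frozen=True)
class User:
    """Hypothetical, simplified user record."""

    id: UUID
    name: str
    is_admin: bool
    active: bool = True


def update_user(
    actor: User,
    target: User,
    *,
    name: Optional[str] = None,
    is_admin: Optional[bool] = None,
) -> User:
    """Apply an update to `target` on behalf of `actor`."""
    if not actor.is_admin:
        # Non-admins may only touch their own account ...
        if actor.id != target.id:
            raise IllegalOperationError("Only admins can update other users.")
        # ... and may not grant themselves admin rights.
        if is_admin is not None and is_admin != target.is_admin:
            raise IllegalOperationError("Only admins can change admin status.")
    return replace(
        target,
        name=target.name if name is None else name,
        is_admin=target.is_admin if is_admin is None else is_admin,
    )


def deactivate_user(actor: User, target: User) -> User:
    """Deactivate `target`; assumed here to be an admin-only operation."""
    if not actor.is_admin:
        raise IllegalOperationError("Only admins can deactivate users.")
    return replace(target, active=False)


def update_current_user(
    actor: User,
    *,
    name: Optional[str] = None,
    is_admin: Optional[bool] = None,
) -> User:
    """Self-service update: `is_admin` is accepted but silently ignored."""
    return replace(actor, name=actor.name if name is None else name)


admin = User(uuid4(), "default", is_admin=True)
alice = User(uuid4(), "alice", is_admin=False)
bob = User(uuid4(), "bob", is_admin=False)

try:
    # The request at the heart of the CVE: a non-admin deactivating another user.
    deactivate_user(alice, bob)
except IllegalOperationError as err:
    print(f"rejected: {err}")
```

In the affected versions a check of this kind was missing, so the call from `alice` would have succeeded. The sketch keeps the self-service path (`update_current_user`) separate from the general update path, matching the split the tests draw between `/current-user` and `update_user`: on the self-service path an `is_admin` change is dropped without an error, while on the general path it is rejected.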

