High severity · NVD Advisory · Published Feb 15, 2023 · Updated Mar 10, 2025

Starlite DoS vulnerability when parsing multipart request body

CVE-2023-25578

Description

Starlite is an Asynchronous Server Gateway Interface (ASGI) framework. Prior to version 1.51.2, request body parsing in Starlite allows a potentially unauthenticated attacker to consume a large amount of CPU time and RAM. The multipart body parser processes an unlimited number of file parts and an unlimited number of field parts, which makes this a remote, potentially unauthenticated Denial of Service vulnerability. It affects applications with a request handler that accepts a Body(media_type=RequestEncodingType.MULTI_PART). The CPU time required to process such requests can block all available worker processes and significantly delay the processing of legitimate user requests. The RAM accumulated while processing them can lead to Out-Of-Memory kills. Complete DoS is achievable by sending many concurrent multipart requests in a loop. Version 1.51.2 contains a patch for this issue.

Affected packages

Versions sourced from the GitHub Security Advisory.

Package | Affected versions | Patched versions
starlite (PyPI) | < 1.51.2 | 1.51.2
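
The affected range above can be checked mechanically. The following is a minimal sketch, assuming plain dotted release strings with no pre-release or local suffixes; the names `PATCHED`, `parse_version`, and `is_affected` are hypothetical and not part of Starlite.

```python
from typing import Tuple

# First patched release according to the advisory table (assumption: plain X.Y.Z strings only).
PATCHED: Tuple[int, ...] = (1, 51, 2)


def parse_version(version: str) -> Tuple[int, ...]:
    """Turn a dotted release string such as '1.51.2' into a tuple of ints."""
    return tuple(int(piece) for piece in version.split("."))


def is_affected(version: str) -> bool:
    """Return True when the given starlite release is older than the patched one."""
    return parse_version(version) < PATCHED


if __name__ == "__main__":
    for candidate in ("1.5.2", "1.51.1", "1.51.2"):
        print(candidate, "affected" if is_affected(candidate) else "patched")
```

Comparing integer tuples rather than strings matters here: 1.5.2 is older than 1.51.2 and is therefore still affected. In a real deployment the installed version could be read with `importlib.metadata.version("starlite")`, and a dedicated version parser would be a safer choice if pre-release tags are possible.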

Patches

9674fe803628

Merge pull request from GHSA-p24m-863f-fm6q

https://github.com/starlite-api/starlite · Peter Schutt · Feb 15, 2023 · via ghsa
9 files changed · +149 -56
  • starlite/app.py +6 -0 modified
    @@ -135,6 +135,7 @@ class Starlite(Router):
             "get_logger",
             "logger",
             "logging_config",
    +        "multipart_form_part_limit",
             "on_shutdown",
             "on_startup",
             "openapi_config",
    @@ -175,6 +176,7 @@ def __init__(
             initial_state: Optional["InitialStateType"] = None,
             logging_config: Union["BaseLoggingConfig", "EmptyType", None] = Empty,
             middleware: Optional[List["Middleware"]] = None,
    +        multipart_form_part_limit: int = 1000,
             on_app_init: Optional[List["OnAppInitHandler"]] = None,
             on_shutdown: Optional[List["LifeSpanHandler"]] = None,
             on_startup: Optional[List["LifeSpanHandler"]] = None,
    @@ -238,6 +240,8 @@ def __init__(
                 initial_state: An object from which to initialize the app state.
                 logging_config: A subclass of :class:`BaseLoggingConfig <starlite.config.logging.BaseLoggingConfig>`.
                 middleware: A list of :class:`Middleware <starlite.types.Middleware>`.
    +            multipart_form_part_limit: The maximal number of allowed parts in a multipart/formdata request.
    +                This limit is intended to protect from DoS attacks.
                 on_app_init: A sequence of :class:`OnAppInitHandler <starlite.types.OnAppInitHandler>` instances. Handlers receive
                     an instance of :class:`AppConfig <starlite.config.app.AppConfig>` that will have been initially populated with
                     the parameters passed to :class:`Starlite <starlite.app.Starlite>`, and must return an instance of same. If more
    @@ -300,6 +304,7 @@ def __init__(
                 initial_state=initial_state or {},
                 logging_config=logging_config if logging_config is not Empty else LoggingConfig() if debug else None,  # type: ignore[arg-type]
                 middleware=middleware or [],
    +            multipart_form_part_limit=multipart_form_part_limit,
                 on_shutdown=on_shutdown or [],
                 on_startup=on_startup or [],
                 openapi_config=openapi_config,
    @@ -343,6 +348,7 @@ def __init__(
             self.static_files_config = config.static_files_config
             self.template_engine = config.template_config.engine_instance if config.template_config else None
             self.websocket_class = config.websocket_class or WebSocket
    +        self.multipart_form_part_limit = config.multipart_form_part_limit
     
             super().__init__(
                 after_request=config.after_request,
    
  • starlite/config/app.py +2 -0 modified
    @@ -178,6 +178,8 @@ class Config(BaseConfig):
         """A mapping of types to callables that transform them into types supported for serialization."""
         websocket_class: Optional[Type[WebSocket]]
         """An optional subclass of :class:`WebSocket <starlite.connection.websocket.WebSocket>` to use for websocket connections."""
    +    multipart_form_part_limit: int
    +    """The maximal number of allowed parts in a multipart/formdata request. This limit is intended to protect from DoS attacks."""
     
         @validator("allowed_hosts", always=True)
         def validate_allowed_hosts(  # pylint: disable=no-self-argument
    
  • starlite/connection/request.py +3 -1 modified
    @@ -150,7 +150,9 @@ async def form(self) -> FormMultiDict:
                 content_type, options = self.content_type
                 if content_type == RequestEncodingType.MULTI_PART:
                     self._form = self.scope["_form"] = form_values = parse_multipart_form(  # type: ignore[typeddict-item]
    -                    body=await self.body(), boundary=options.get("boundary", "").encode()
    +                    body=await self.body(),
    +                    boundary=options.get("boundary", "").encode(),
    +                    multipart_form_part_limit=self.app.multipart_form_part_limit,
                     )
                     return FormMultiDict(form_values)
                 if content_type == RequestEncodingType.URL_ENCODED:
    
  • starlite/kwargs/extractors.py +12 -1 modified
    @@ -19,6 +19,7 @@
     from starlite.enums import ParamType, RequestEncodingType
     from starlite.exceptions import ValidationException
     from starlite.multipart import parse_multipart_form
    +from starlite.params import BodyKwarg
     from starlite.parsers import (
         parse_headers,
         parse_query_string,
    @@ -289,15 +290,25 @@ def create_multipart_extractor(
         Returns:
             An extractor function.
         """
    +    body_kwarg_multipart_form_part_limit: Optional[int] = None
    +    if signature_field.kwarg_model and isinstance(signature_field.kwarg_model, BodyKwarg):
    +        body_kwarg_multipart_form_part_limit = signature_field.kwarg_model.multipart_form_part_limit
     
         async def extract_multipart(
             connection: "Request[Any, Any]",
         ) -> Any:
    +        multipart_form_part_limit = (
    +            body_kwarg_multipart_form_part_limit
    +            if body_kwarg_multipart_form_part_limit is not None
    +            else connection.app.multipart_form_part_limit
    +        )
             connection.scope["_form"] = form_values = (  # type: ignore[typeddict-item]
                 connection.scope["_form"]  # type: ignore[typeddict-item]
                 if "_form" in connection.scope
                 else parse_multipart_form(
    -                body=await connection.body(), boundary=connection.content_type[-1].get("boundary", "").encode()
    +                body=await connection.body(),
    +                boundary=connection.content_type[-1].get("boundary", "").encode(),
    +                multipart_form_part_limit=multipart_form_part_limit,
                 )
             )
     
    
  • starlite/multipart.py +76 -52 modified
    @@ -30,10 +30,11 @@
     from urllib.parse import unquote
     
     from starlite.datastructures.upload_file import UploadFile
    -from starlite.exceptions import SerializationException
    +from starlite.exceptions import SerializationException, ValidationException
     from starlite.utils.serialization import decode_json
     
    -_token, _quoted = r"([\w!#$%&'*+\-.^_`|~]+)", r'"([^"]*)"'
    +_token = r"([\w!#$%&'*+\-.^_`|~]+)"
    +_quoted = r'"([^"]*)"'
     _param = re.compile(rf";\s*{_token}=(?:{_token}|{_quoted})", re.ASCII)
     _firefox_quote_escape = re.compile(r'\\"(?!; |\s*$)')
     
    @@ -59,67 +60,90 @@ def parse_content_header(value: str) -> Tuple[str, Dict[str, str]]:
         return value.strip().lower(), options
     
     
    -def parse_multipart_form(body: bytes, boundary: bytes) -> Dict[str, Any]:
    +def parse_body(body: bytes, boundary: bytes, multipart_form_part_limit: int) -> List[bytes]:
    +    """Split the body using the boundary
    +        and validate the number of form parts is within the allowed limit.
    +
    +    :param body: The form body.
    +    :param boundary: The boundary used to separate form components.
    +    :param multipart_form_part_limit: The limit of allowed form components
    +    :return:
    +        A list of form components.
    +    """
    +    if not (body and boundary):
    +        return []
    +
    +    form_parts = body.split(boundary, multipart_form_part_limit + 3)[1:-1]
    +
    +    if len(form_parts) > multipart_form_part_limit:
    +        raise ValidationException(
    +            f"number of multipart components exceeds the allowed limit of {multipart_form_part_limit}, "
    +            f"this potentially indicates a DoS attack"
    +        )
    +
    +    return form_parts
    +
    +
    +def parse_multipart_form(body: bytes, boundary: bytes, multipart_form_part_limit: int = 1000) -> Dict[str, Any]:
         """Parse multipart form data.
     
         Args:
             body: Body of the request.
             boundary: Boundary of the multipart message.
    +        multipart_form_part_limit: Limit of the number of parts allowed.
     
         Returns:
             A dictionary of parsed results.
         """
     
         fields: DefaultDict[str, List[Any]] = defaultdict(list)
     
    -    if body and boundary:
    -        form_parts = body.split(boundary)
    -        for form_part in form_parts[1:-1]:
    -            file_name = None
    -            content_type = "text/plain"
    -            content_charset = "utf-8"
    -            field_name = None
    -            line_index = 2
    -            line_end_index = 0
    -            headers: List[Tuple[str, str]] = []
    -
    -            while line_end_index != -1:
    -                line_end_index = form_part.find(b"\r\n", line_index)
    -                form_line = form_part[line_index:line_end_index].decode("utf-8")
    -
    -                if not form_line:
    -                    break
    -
    -                line_index = line_end_index + 2
    -                colon_index = form_line.index(":")
    -                current_idx = colon_index + 2
    -                form_header_field = form_line[0:colon_index].lower()
    -                form_header_value, form_parameters = parse_content_header(form_line[current_idx:])
    -
    -                if form_header_field == "content-disposition":
    -                    field_name = form_parameters.get("name")
    -                    file_name = form_parameters.get("filename")
    -
    -                    if file_name is None and (filename_with_asterisk := form_parameters.get("filename*")):
    -                        encoding, _, value = decode_rfc2231(filename_with_asterisk)
    -                        file_name = unquote(value, encoding=encoding or content_charset)
    -
    -                elif form_header_field == "content-type":
    -                    content_type = form_header_value
    -                    content_charset = form_parameters.get("charset", "utf-8")
    -                headers.append((form_header_field, form_header_value))
    -
    -            if field_name:
    -                post_data = form_part[line_index:-4].lstrip(b"\r\n")
    -                if file_name:
    -                    form_file = UploadFile(
    -                        content_type=content_type, filename=file_name, file_data=post_data, headers=dict(headers)
    -                    )
    -                    fields[field_name].append(form_file)
    -                else:
    -                    try:
    -                        fields[field_name].append(decode_json(post_data))
    -                    except SerializationException:
    -                        fields[field_name].append(post_data.decode(content_charset))
    +    for form_part in parse_body(body=body, boundary=boundary, multipart_form_part_limit=multipart_form_part_limit):
    +        file_name = None
    +        content_type = "text/plain"
    +        content_charset = "utf-8"
    +        field_name = None
    +        line_index = 2
    +        line_end_index = 0
    +        headers: List[Tuple[str, str]] = []
    +
    +        while line_end_index != -1:
    +            line_end_index = form_part.find(b"\r\n", line_index)
    +            form_line = form_part[line_index:line_end_index].decode("utf-8")
    +
    +            if not form_line:
    +                break
    +
    +            line_index = line_end_index + 2
    +            colon_index = form_line.index(":")
    +            current_idx = colon_index + 2
    +            form_header_field = form_line[0:colon_index].lower()
    +            form_header_value, form_parameters = parse_content_header(form_line[current_idx:])
    +
    +            if form_header_field == "content-disposition":
    +                field_name = form_parameters.get("name")
    +                file_name = form_parameters.get("filename")
    +
    +                if file_name is None and (filename_with_asterisk := form_parameters.get("filename*")):
    +                    encoding, _, value = decode_rfc2231(filename_with_asterisk)
    +                    file_name = unquote(value, encoding=encoding or content_charset)
    +
    +            elif form_header_field == "content-type":
    +                content_type = form_header_value
    +                content_charset = form_parameters.get("charset", "utf-8")
    +            headers.append((form_header_field, form_header_value))
    +
    +        if field_name:
    +            post_data = form_part[line_index:-4].lstrip(b"\r\n")
    +            if file_name:
    +                form_file = UploadFile(
    +                    content_type=content_type, filename=file_name, file_data=post_data, headers=dict(headers)
    +                )
    +                fields[field_name].append(form_file)
    +            else:
    +                try:
    +                    fields[field_name].append(decode_json(post_data))
    +                except SerializationException:
    +                    fields[field_name].append(post_data.decode(content_charset))
     
         return {k: v if len(v) > 1 else v[0] for k, v in fields.items()}
    
  • starlite/params.py +7 -1 modified
    @@ -286,6 +286,8 @@ class BodyKwarg:
     
         Equivalent to pattern in the OpenAPI specification.
         """
    +    multipart_form_part_limit: Optional[int] = field(default=None)
    +    """The maximal number of allowed parts in a multipart/formdata request. This limit is intended to protect from DoS attacks."""
     
         def __hash__(self) -> int:  # pragma: no cover
             """Hash the dataclass in a safe way.
    @@ -315,7 +317,8 @@ def Body(
         max_items: Optional[int] = None,
         min_length: Optional[int] = None,
         max_length: Optional[int] = None,
    -    regex: Optional[str] = None
    +    regex: Optional[str] = None,
    +    multipart_form_part_limit: Optional[int] = None
     ) -> Any:
         """Create an extended request body kwarg definition.
     
    @@ -354,6 +357,8 @@ def Body(
                 maxLength in the OpenAPI specification.
             regex: A string representing a regex against which the given string will be matched.
                 Equivalent to pattern in the OpenAPI specification.
    +        multipart_form_part_limit: The maximal number of allowed parts in a multipart/formdata request.
    +            This limit is intended to protect from DoS attacks.
         """
         return BodyKwarg(
             media_type=media_type,
    @@ -374,6 +379,7 @@ def Body(
             min_length=min_length,
             max_length=max_length,
             regex=regex,
    +        multipart_form_part_limit=multipart_form_part_limit,
         )
     
     
    
  • starlite/testing/create_test_client.py +4 -0 modified
    @@ -78,6 +78,7 @@ def create_test_client(
         initial_state: Optional[Union["ImmutableState", Dict[str, Any], Iterable[Tuple[str, Any]]]] = None,
         logging_config: Optional["BaseLoggingConfig"] = None,
         middleware: Optional[List["Middleware"]] = None,
    +    multipart_form_part_limit: int = 1000,
         on_app_init: Optional[List["OnAppInitHandler"]] = None,
         on_shutdown: Optional[List["LifeSpanHandler"]] = None,
         on_startup: Optional[List["LifeSpanHandler"]] = None,
    @@ -160,6 +161,8 @@ def test_my_handler() -> None:
             initial_state: An object from which to initialize the app state.
             logging_config: A subclass of :class:`BaseLoggingConfig <starlite.config.logging.BaseLoggingConfig>`.
             middleware: A list of :class:`Middleware <starlite.types.Middleware>`.
    +        multipart_form_part_limit: The maximal number of allowed parts in a multipart/formdata request.
    +            This limit is intended to protect from DoS attacks.
             on_app_init:  A sequence of :class:`OnAppInitHandler <starlite.types.OnAppInitHandler>` instances. Handlers receive
                     an instance of :class:`AppConfig <starlite.config.app.AppConfig>` that will have been initially populated with
                     the parameters passed to :class:`Starlite <starlite.app.Starlite>`, and must return an instance of same. If more
    @@ -210,6 +213,7 @@ def test_my_handler() -> None:
                 initial_state=initial_state,
                 logging_config=logging_config,
                 middleware=middleware,
    +            multipart_form_part_limit=multipart_form_part_limit,
                 on_app_init=on_app_init,
                 on_shutdown=on_shutdown,
                 on_startup=on_startup,
    
  • tests/app/test_app_config.py +1 -0 modified
    @@ -35,6 +35,7 @@ def app_config_object() -> AppConfig:
             initial_state={},
             logging_config=None,
             middleware=[],
    +        multipart_form_part_limit=1000,
             on_shutdown=[],
             on_startup=[],
             openapi_config=None,
    
  • tests/kwargs/test_multipart_data.py +38 -1 modified
    @@ -10,7 +10,7 @@
     
     from starlite import Body, Request, RequestEncodingType, post
     from starlite.datastructures import UploadFile
    -from starlite.status_codes import HTTP_201_CREATED
    +from starlite.status_codes import HTTP_201_CREATED, HTTP_400_BAD_REQUEST
     from starlite.testing import create_test_client
     from tests import Person, PersonFactory
     from tests.kwargs import Form
    @@ -405,3 +405,40 @@ async def hello_world(data: Optional[UploadFile] = Body(media_type=RequestEncodi
         with create_test_client(route_handlers=[hello_world]) as client:
             response = client.post("/")
             assert response.status_code == HTTP_201_CREATED
    +
    +
    +@pytest.mark.parametrize("limit", (1000, 100, 10))
    +def test_multipart_form_part_limit(limit: int) -> None:
    +    @post("/")
    +    async def hello_world(data: List[UploadFile] = Body(media_type=RequestEncodingType.MULTI_PART)) -> None:
    +        assert len(data) == limit
    +
    +    with create_test_client(route_handlers=[hello_world], multipart_form_part_limit=limit) as client:
    +        data = {str(i): "a" for i in range(limit)}
    +        response = client.post("/", files=data)
    +        assert response.status_code == HTTP_201_CREATED
    +
    +        data = {str(i): "a" for i in range(limit)}
    +        data[str(limit + 1)] = "b"
    +        response = client.post("/", files=data)
    +        assert response.status_code == HTTP_400_BAD_REQUEST
    +
    +
    +def test_multipart_form_part_limit_body_param_precedence() -> None:
    +    app_limit = 100
    +    route_limit = 10
    +
    +    @post("/")
    +    async def hello_world(
    +        data: List[UploadFile] = Body(media_type=RequestEncodingType.MULTI_PART, multipart_form_part_limit=route_limit)
    +    ) -> None:
    +        assert len(data) == route_limit
    +
    +    with create_test_client(route_handlers=[hello_world], multipart_form_part_limit=app_limit) as client:
    +        data = {str(i): "a" for i in range(route_limit)}
    +        response = client.post("/", files=data)
    +        assert response.status_code == HTTP_201_CREATED
    +
    +        data = {str(i): "a" for i in range(route_limit + 1)}
    +        response = client.post("/", files=data)
    +        assert response.status_code == HTTP_400_BAD_REQUEST
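
The two ideas in the patch, a bounded split of the body and a handler-level limit that takes precedence over the app-level one, can be sketched with the standard library alone. This is an illustrative approximation rather than the Starlite implementation: `resolve_part_limit`, `split_parts`, and `build_body` are hypothetical names, and `ValueError` stands in for the `ValidationException` raised by the real code.

```python
from typing import List, Optional

DEFAULT_PART_LIMIT = 1000  # same default value the patch uses


def resolve_part_limit(handler_limit: Optional[int], app_limit: int = DEFAULT_PART_LIMIT) -> int:
    """Prefer a handler-level limit; fall back to the app-level one only when it is None."""
    return handler_limit if handler_limit is not None else app_limit


def split_parts(body: bytes, boundary: bytes, part_limit: int) -> List[bytes]:
    """Split a multipart body on its boundary, performing at most part_limit + 3 splits."""
    if not (body and boundary):
        return []
    # maxsplit caps the work regardless of how many parts were sent; [1:-1] drops the
    # piece before the first boundary and the piece after the last split.
    parts = body.split(boundary, part_limit + 3)[1:-1]
    if len(parts) > part_limit:
        raise ValueError(f"number of multipart components exceeds the allowed limit of {part_limit}")
    return parts


def build_body(boundary: bytes, count: int) -> bytes:
    """Hypothetical test helper: build a body holding `count` tiny form fields."""
    chunks = []
    for i in range(count):
        chunks.append(b"--" + boundary + b"\r\n")
        chunks.append(b'Content-Disposition: form-data; name="f%d"\r\n\r\n' % i)
        chunks.append(b"a\r\n")
    chunks.append(b"--" + boundary + b"--\r\n")
    return b"".join(chunks)


if __name__ == "__main__":
    limit = resolve_part_limit(handler_limit=10, app_limit=100)
    print(len(split_parts(build_body(b"boundary123", 10), b"boundary123", limit)))
    try:
        split_parts(build_body(b"boundary123", 5000), b"boundary123", limit)
    except ValueError as exc:
        print("rejected:", exc)
```

Because `maxsplit` is tied to the limit, a request carrying thousands of parts is cut off after a fixed number of splits, and the header-parsing loop never runs for it. The `is not None` check in `resolve_part_limit` mirrors the extractor in the diff, so an explicit handler limit is honored even when it is smaller than the app default.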
    
