VYPR
High severity · NVD Advisory · Published Aug 27, 2024 · Updated Aug 27, 2024

Apollo Router Coprocessors may cause Denial-of-Service when handling request bodies

CVE-2024-43783

Description

The Apollo Router Core is a configurable, high-performance graph router written in Rust to run a federated supergraph that uses Apollo Federation 2.

Instances of the Apollo Router running versions >= 1.21.0 and < 1.52.1 are impacted by a denial-of-service vulnerability if all of the following are true:

1. The Apollo Router has been configured to support External Coprocessing.
2. The Apollo Router has been configured to send request bodies to coprocessors. This is a non-default configuration and must be configured intentionally by administrators.

Instances of the Apollo Router running versions >= 1.7.0 and < 1.52.1 are impacted by a denial-of-service vulnerability if all of the following are true:

1. Router has been configured to use a custom-developed Native Rust Plugin.
2. The plugin accesses Request.router_request in the RouterService layer.
3. You are accumulating the body from Request.router_request into memory.

If using an impacted configuration, the Router will load entire HTTP request bodies into memory without respect to other HTTP request size-limiting configurations like limits.http_max_request_bytes. This can cause the Router to be out-of-memory (OOM) terminated if a sufficiently large request is sent to the Router. By default, the Router sets limits.http_max_request_bytes to 2 MB.

If you have an impacted configuration as defined above, please upgrade to at least Apollo Router 1.52.1.

If you cannot upgrade, you can mitigate the denial-of-service opportunity impacting External Coprocessors by setting the coprocessor.router.request.body configuration option to false. Please note that changing this configuration option will change the information sent to any coprocessors you have configured and may impact functionality implemented by those coprocessors.

If you have developed a Native Rust Plugin and cannot upgrade, you can update your plugin to either not accumulate the request body or enforce a maximum body size limit.

You can also mitigate this issue by limiting HTTP body payload sizes prior to the Router (e.g., in a proxy or web application firewall appliance).

AI Insight

LLM-synthesized narrative grounded in this CVE's description and references.

Apollo Router versions from 1.21.0 (1.7.0 for native Rust plugins) up to but not including 1.52.1 can be forced out of memory by large requests when external coprocessors or native Rust plugins accumulate request bodies.

Vulnerability

Summary

The Apollo Router Core, a high-performance federated graph router, is susceptible to a denial-of-service (DoS) vulnerability in versions >= 1.21.0 and < 1.52.1 when external coprocessing is used, and in versions >= 1.7.0 and < 1.52.1 when native Rust plugins are used. Version 1.52.1 itself contains the fix. The root cause is that the router loads entire HTTP request bodies into memory without respecting the configured limits.http_max_request_bytes limit when certain optional features are enabled. This bypass can lead to out-of-memory (OOM) termination of the router process [1][2].

Exploitation

Conditions

The vulnerability is only exploitable under specific non-default configurations. For external coprocessing, the router must be configured to send request bodies to coprocessors, which is a deliberate opt-in setting. For native Rust plugins, the plugin must access Request.router_request in the RouterService layer and accumulate the body into memory. An attacker who can reach the router over the network needs only to send a sufficiently large HTTP request; the description names no authentication or other precondition [1][2].
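The conditions above, combined with the version ranges from the advisory description, can be written down as a small predicate. The following is a minimal sketch for triaging a deployment, not part of the Router: the `Version` and `Deployment` types, their field names, and `is_exposed` are hypothetical and exist only for this illustration. Pre-release tags are ignored.

```rust
/// A version reduced to (major, minor, patch). The derived `Ord` compares
/// the fields in order, which matches semantic-version precedence for
/// releases without pre-release tags.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct Version(pub u64, pub u64, pub u64);

/// Hypothetical summary of one Router deployment (not a Router type).
pub struct Deployment {
    pub version: Version,
    /// True if coprocessors are configured to receive router request bodies.
    pub coprocessor_receives_request_body: bool,
    /// True if a native Rust plugin accumulates `router_request` bodies in memory.
    pub native_plugin_accumulates_body: bool,
}

// Version bounds taken from the advisory description.
const FIXED: Version = Version(1, 52, 1);
const COPROCESSOR_FIRST_AFFECTED: Version = Version(1, 21, 0);
const NATIVE_PLUGIN_FIRST_AFFECTED: Version = Version(1, 7, 0);

/// Returns true if the deployment matches either impacted configuration.
pub fn is_exposed(d: &Deployment) -> bool {
    if d.version >= FIXED {
        return false; // 1.52.1 and later contain the fix
    }
    let via_coprocessor =
        d.coprocessor_receives_request_body && d.version >= COPROCESSOR_FIRST_AFFECTED;
    let via_plugin =
        d.native_plugin_accumulates_body && d.version >= NATIVE_PLUGIN_FIRST_AFFECTED;
    via_coprocessor || via_plugin
}
```

Under these assumptions, a 1.52.0 router that sends request bodies to a coprocessor is reported as exposed, while the same configuration on 1.52.1 or on 1.20.0 is not.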

Impact

If the conditions are met, a crafted oversize request can exhaust available memory, causing the router to be OOM-killed. This results in a complete denial of service for the federated supergraph, disrupting all GraphQL operations until the router is restarted. The default limits.http_max_request_bytes of 2 MB is not applied on the affected code paths, so a request body larger than that limit is still read into memory in full [1][2].

Mitigation

Apollo has released version 1.52.1, which enforces body size limits early in request processing, preventing the OOM condition [4]. For users who cannot upgrade, two workarounds exist: for external coprocessors, set coprocessor.router.request.body to false (though this may affect coprocessor functionality); for native Rust plugins, modify the plugin to not accumulate the request body or enforce its own size limit [1][2]. The vulnerability is not exploitable in default configurations.
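For the native Rust plugin workaround, "enforce its own size limit" means checking the running total while the body is read, not after it has been buffered. The sketch below illustrates that idea with the standard library only: a plain iterator of byte chunks stands in for a streaming HTTP body, and `accumulate_with_limit` and `BodyTooLarge` are hypothetical names, not Router or hyper APIs. A real plugin would apply the same check to each chunk it receives from the request body stream.

```rust
/// Error returned when the accumulated body would exceed the limit.
#[derive(Debug, PartialEq, Eq)]
pub struct BodyTooLarge {
    pub limit: usize,
}

/// Collects chunks into one buffer, failing as soon as the total size
/// would exceed `limit` bytes.
pub fn accumulate_with_limit<I, C>(chunks: I, limit: usize) -> Result<Vec<u8>, BodyTooLarge>
where
    I: IntoIterator<Item = C>,
    C: AsRef<[u8]>,
{
    let mut body = Vec::new();
    for chunk in chunks {
        let chunk = chunk.as_ref();
        // `body.len() <= limit` always holds here, so the subtraction cannot
        // underflow. Checking before copying keeps the buffer within `limit`.
        if chunk.len() > limit - body.len() {
            return Err(BodyTooLarge { limit });
        }
        body.extend_from_slice(chunk);
    }
    Ok(body)
}
```

Because the check runs per chunk, an oversize request is rejected after at most `limit` bytes have been buffered, regardless of how large the sender claims or intends the body to be.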

AI Insight generated on May 20, 2026. Synthesized from this CVE's description and the cited reference URLs; citations are validated against the source bundle.

Affected packages

Versions sourced from the GitHub Security Advisory.

Package | Affected versions | Patched versions
apollo-router (crates.io) | >= 1.7.0, < 1.52.1 | 1.52.1

Affected products

2

Patches

1
7a9c020608a6

fix(security): CVE-2024-43783: Enforce body limits early in request pipeline

25 files changed · +1591 -437
  • apollo-router/src/axum_factory/tests.rs · +35 -79 · modified
    @@ -44,8 +44,6 @@ use test_log::test;
     use tokio::io::AsyncRead;
     use tokio::io::AsyncReadExt;
     use tokio::io::AsyncWriteExt;
    -#[cfg(unix)]
    -use tokio::io::BufReader;
     use tokio::sync::mpsc;
     use tokio_util::io::StreamReader;
     use tower::service_fn;
    @@ -1270,14 +1268,11 @@ async fn it_answers_to_custom_endpoint() -> Result<(), ApolloRouterError> {
             Ok::<_, BoxError>(
                 http::Response::builder()
                     .status(StatusCode::OK)
    -                .body(
    -                    format!(
    -                        "{} + {}",
    -                        req.router_request.method(),
    -                        req.router_request.uri().path()
    -                    )
    -                    .into(),
    -                )
    +                .body(format!(
    +                    "{} + {}",
    +                    req.router_request.method(),
    +                    req.router_request.uri().path()
    +                ))
                     .unwrap()
                     .into(),
             )
    @@ -1380,14 +1375,11 @@ async fn it_refuses_to_bind_two_extra_endpoints_on_the_same_path() {
             Ok::<_, BoxError>(
                 http::Response::builder()
                     .status(StatusCode::OK)
    -                .body(
    -                    format!(
    -                        "{} + {}",
    -                        req.router_request.method(),
    -                        req.router_request.uri().path()
    -                    )
    -                    .into(),
    -                )
    +                .body(format!(
    +                    "{} + {}",
    +                    req.router_request.method(),
    +                    req.router_request.uri().path()
    +                ))
                     .unwrap()
                     .into(),
             )
    @@ -2076,88 +2068,52 @@ async fn listening_to_unix_socket() {
         .await;
     
         assert_eq!(
    -        serde_json::from_slice::<graphql::Response>(&output).unwrap(),
    +        serde_json::from_str::<graphql::Response>(&output).unwrap(),
             expected_response,
         );
     
         // Get query
         let output = send_to_unix_socket(
             server.graphql_listen_address().as_ref().unwrap(),
             Method::GET,
    -        r#"query=query%7Bme%7Bname%7D%7D"#,
    +        r#"/?query=query%7Bme%7Bname%7D%7D"#,
         )
         .await;
     
         assert_eq!(
    -        serde_json::from_slice::<graphql::Response>(&output).unwrap(),
    +        serde_json::from_str::<graphql::Response>(&output).unwrap(),
             expected_response,
         );
     
         server.shutdown().await.unwrap();
     }
     
     #[cfg(unix)]
    -async fn send_to_unix_socket(addr: &ListenAddr, method: Method, body: &str) -> Vec<u8> {
    -    use tokio::io::AsyncBufReadExt;
    -    use tokio::io::Interest;
    +async fn send_to_unix_socket(addr: &ListenAddr, method: Method, body: &str) -> String {
         use tokio::net::UnixStream;
    -
    -    let content = match method {
    -        Method::GET => {
    -            format!(
    -                "{} /?{} HTTP/1.1\r
    -Host: localhost:4100\r
    -Content-Length: {}\r
    -Content-Type: application/json\r
    -Accept: application/json\r
    -
    -\n",
    -                method.as_str(),
    -                body,
    -                body.len(),
    -            )
    -        }
    -        Method::POST => {
    -            format!(
    -                "{} / HTTP/1.1\r
    -Host: localhost:4100\r
    -Content-Length: {}\r
    -Content-Type: application/json\r
    -Accept: application/json\r
    -
    -{}\n",
    -                method.as_str(),
    -                body.len(),
    -                body
    -            )
    -        }
    -        _ => {
    -            unimplemented!()
    -        }
    -    };
    -    let mut stream = UnixStream::connect(addr.to_string()).await.unwrap();
    -    stream.ready(Interest::WRITABLE).await.unwrap();
    -    stream.write_all(content.as_bytes()).await.unwrap();
    -    stream.flush().await.unwrap();
    -    let stream = BufReader::new(stream);
    -    let mut lines = stream.lines();
    -    let header_first_line = lines
    -        .next_line()
    -        .await
    -        .unwrap()
    -        .expect("no header received");
    -    // skip the rest of the headers
    -    let mut headers = String::new();
    -    let mut stream = lines.into_inner();
    -    loop {
    -        if stream.read_line(&mut headers).await.unwrap() == 2 {
    -            break;
    +    let stream = UnixStream::connect(addr.to_string()).await.unwrap();
    +    let (mut sender, conn) = hyper::client::conn::handshake(stream).await.unwrap();
    +    tokio::task::spawn(async move {
    +        if let Err(err) = conn.await {
    +            println!("Connection failed: {:?}", err);
             }
    +    });
    +
    +    let http_body = hyper::Body::from(body.to_string());
    +    let mut request = http::Request::builder()
    +        .method(method.clone())
    +        .header("Host", "localhost:4100")
    +        .header("Content-Type", "application/json")
    +        .header("Accept", "application/json")
    +        .body(http_body)
    +        .unwrap();
    +    if method == Method::GET {
    +        *request.uri_mut() = body.parse().unwrap();
         }
    -    // get rest of the buffer as body
    -    let body = stream.buffer().to_vec();
    -    assert!(header_first_line.contains(" 200 "), "");
    -    body
    +
    +    let response = sender.send_request(request).await.unwrap();
    +    let body = response.collect().await.unwrap().to_bytes();
    +    String::from_utf8(body.to_vec()).unwrap()
     }
     
     #[tokio::test]
    
  • apollo-router/src/configuration/mod.rs · +13 -105 · modified
    @@ -47,6 +47,7 @@ use crate::configuration::schema::Mode;
     use crate::graphql;
     use crate::notification::Notify;
     use crate::plugin::plugins;
    +use crate::plugins::limits;
     use crate::plugins::subscription::SubscriptionConfig;
     use crate::plugins::subscription::APOLLO_SUBSCRIPTION_PLUGIN;
     use crate::plugins::subscription::APOLLO_SUBSCRIPTION_PLUGIN_NAME;
    @@ -154,7 +155,7 @@ pub struct Configuration {
     
         /// Configuration for operation limits, parser limits, HTTP limits, etc.
         #[serde(default)]
    -    pub(crate) limits: Limits,
    +    pub(crate) limits: limits::Config,
     
         /// Configuration for chaos testing, trying to reproduce bugs that require uncommon conditions.
         /// You probably don’t want this in production!
    @@ -251,18 +252,25 @@ impl<'de> serde::Deserialize<'de> for Configuration {
                 tls: Tls,
                 apq: Apq,
                 persisted_queries: PersistedQueries,
    -            limits: Limits,
    +            limits: limits::Config,
                 experimental_chaos: Chaos,
                 batching: Batching,
                 experimental_type_conditioned_fetching: bool,
                 experimental_apollo_metrics_generation_mode: ApolloMetricsGenerationMode,
                 experimental_query_planner_mode: QueryPlannerMode,
             }
    -        let ad_hoc: AdHocConfiguration = serde::Deserialize::deserialize(deserializer)?;
    +        let mut ad_hoc: AdHocConfiguration = serde::Deserialize::deserialize(deserializer)?;
     
             let notify = Configuration::notify(&ad_hoc.apollo_plugins.plugins)
                 .map_err(|e| serde::de::Error::custom(e.to_string()))?;
     
    +        // Allow the limits plugin to use the configuration from the configuration struct.
    +        // This means that the limits plugin will get the regular configuration via plugin init.
    +        ad_hoc.apollo_plugins.plugins.insert(
    +            "limits".to_string(),
    +            serde_json::to_value(&ad_hoc.limits).unwrap(),
    +        );
    +
             // Use a struct literal instead of a builder to ensure this is exhaustive
             Configuration {
                 health_check: ad_hoc.health_check,
    @@ -319,7 +327,7 @@ impl Configuration {
             tls: Option<Tls>,
             apq: Option<Apq>,
             persisted_query: Option<PersistedQueries>,
    -        operation_limits: Option<Limits>,
    +        operation_limits: Option<limits::Config>,
             chaos: Option<Chaos>,
             uplink: Option<UplinkConfig>,
             experimental_type_conditioned_fetching: Option<bool>,
    @@ -439,7 +447,7 @@ impl Configuration {
             notify: Option<Notify<String, graphql::Response>>,
             apq: Option<Apq>,
             persisted_query: Option<PersistedQueries>,
    -        operation_limits: Option<Limits>,
    +        operation_limits: Option<limits::Config>,
             chaos: Option<Chaos>,
             uplink: Option<UplinkConfig>,
             batching: Option<Batching>,
    @@ -856,106 +864,6 @@ impl Supergraph {
         }
     }
     
    -/// Configuration for operation limits, parser limits, HTTP limits, etc.
    -#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
    -#[serde(deny_unknown_fields, default)]
    -pub(crate) struct Limits {
    -    /// If set, requests with operations deeper than this maximum
    -    /// are rejected with a HTTP 400 Bad Request response and GraphQL error with
    -    /// `"extensions": {"code": "MAX_DEPTH_LIMIT"}`
    -    ///
    -    /// Counts depth of an operation, looking at its selection sets,
    -    /// including fields in fragments and inline fragments. The following
    -    /// example has a depth of 3.
    -    ///
    -    /// ```graphql
    -    /// query getProduct {
    -    ///   book { # 1
    -    ///     ...bookDetails
    -    ///   }
    -    /// }
    -    ///
    -    /// fragment bookDetails on Book {
    -    ///   details { # 2
    -    ///     ... on ProductDetailsBook {
    -    ///       country # 3
    -    ///     }
    -    ///   }
    -    /// }
    -    /// ```
    -    pub(crate) max_depth: Option<u32>,
    -
    -    /// If set, requests with operations higher than this maximum
    -    /// are rejected with a HTTP 400 Bad Request response and GraphQL error with
    -    /// `"extensions": {"code": "MAX_DEPTH_LIMIT"}`
    -    ///
    -    /// Height is based on simple merging of fields using the same name or alias,
    -    /// but only within the same selection set.
    -    /// For example `name` here is only counted once and the query has height 3, not 4:
    -    ///
    -    /// ```graphql
    -    /// query {
    -    ///     name { first }
    -    ///     name { last }
    -    /// }
    -    /// ```
    -    ///
    -    /// This may change in a future version of Apollo Router to do
    -    /// [full field merging across fragments][merging] instead.
    -    ///
    -    /// [merging]: https://spec.graphql.org/October2021/#sec-Field-Selection-Merging]
    -    pub(crate) max_height: Option<u32>,
    -
    -    /// If set, requests with operations with more root fields than this maximum
    -    /// are rejected with a HTTP 400 Bad Request response and GraphQL error with
    -    /// `"extensions": {"code": "MAX_ROOT_FIELDS_LIMIT"}`
    -    ///
    -    /// This limit counts only the top level fields in a selection set,
    -    /// including fragments and inline fragments.
    -    pub(crate) max_root_fields: Option<u32>,
    -
    -    /// If set, requests with operations with more aliases than this maximum
    -    /// are rejected with a HTTP 400 Bad Request response and GraphQL error with
    -    /// `"extensions": {"code": "MAX_ALIASES_LIMIT"}`
    -    pub(crate) max_aliases: Option<u32>,
    -
    -    /// If set to true (which is the default is dev mode),
    -    /// requests that exceed a `max_*` limit are *not* rejected.
    -    /// Instead they are executed normally, and a warning is logged.
    -    pub(crate) warn_only: bool,
    -
    -    /// Limit recursion in the GraphQL parser to protect against stack overflow.
    -    /// default: 500
    -    pub(crate) parser_max_recursion: usize,
    -
    -    /// Limit the number of tokens the GraphQL parser processes before aborting.
    -    pub(crate) parser_max_tokens: usize,
    -
    -    /// Limit the size of incoming HTTP requests read from the network,
    -    /// to protect against running out of memory. Default: 2000000 (2 MB)
    -    pub(crate) http_max_request_bytes: usize,
    -}
    -
    -impl Default for Limits {
    -    fn default() -> Self {
    -        Self {
    -            // These limits are opt-in
    -            max_depth: None,
    -            max_height: None,
    -            max_root_fields: None,
    -            max_aliases: None,
    -            warn_only: false,
    -            http_max_request_bytes: 2_000_000,
    -            parser_max_tokens: 15_000,
    -
    -            // This is `apollo-parser`’s default, which protects against stack overflow
    -            // but is still very high for "reasonable" queries.
    -            // https://github.com/apollographql/apollo-rs/blob/apollo-parser%400.7.3/crates/apollo-parser/src/parser/mod.rs#L93-L104
    -            parser_max_recursion: 500,
    -        }
    -    }
    -}
    -
     /// Router level (APQ) configuration
     #[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Default)]
     #[serde(deny_unknown_fields)]
    
  • apollo-router/src/configuration/snapshots/apollo_router__configuration__tests__schema_generation.snap · +147 -147 · modified
    @@ -1222,8 +1222,8 @@ expression: "&schema"
               "nullable": true
             },
             "subgraph": {
    -          "$ref": "#/definitions/Config3",
    -          "description": "#/definitions/Config3",
    +          "$ref": "#/definitions/Config4",
    +          "description": "#/definitions/Config4",
               "nullable": true
             }
           },
    @@ -1321,8 +1321,8 @@ expression: "&schema"
           "description": "Telemetry configuration",
           "properties": {
             "apollo": {
    -          "$ref": "#/definitions/Config8",
    -          "description": "#/definitions/Config8"
    +          "$ref": "#/definitions/Config9",
    +          "description": "#/definitions/Config9"
             },
             "exporters": {
               "$ref": "#/definitions/Exporters",
    @@ -1336,19 +1336,108 @@ expression: "&schema"
           "type": "object"
         },
         "Config": {
    -      "description": "This is a broken plugin for testing purposes only.",
    +      "additionalProperties": false,
    +      "description": "Configuration for operation limits, parser limits, HTTP limits, etc.",
    +      "properties": {
    +        "http_max_request_bytes": {
    +          "default": 2000000,
    +          "description": "Limit the size of incoming HTTP requests read from the network, to protect against running out of memory. Default: 2000000 (2 MB)",
    +          "format": "uint",
    +          "minimum": 0.0,
    +          "type": "integer"
    +        },
    +        "max_aliases": {
    +          "default": null,
    +          "description": "If set, requests with operations with more aliases than this maximum are rejected with a HTTP 400 Bad Request response and GraphQL error with `\"extensions\": {\"code\": \"MAX_ALIASES_LIMIT\"}`",
    +          "format": "uint32",
    +          "minimum": 0.0,
    +          "nullable": true,
    +          "type": "integer"
    +        },
    +        "max_depth": {
    +          "default": null,
    +          "description": "If set, requests with operations deeper than this maximum are rejected with a HTTP 400 Bad Request response and GraphQL error with `\"extensions\": {\"code\": \"MAX_DEPTH_LIMIT\"}`\n\nCounts depth of an operation, looking at its selection sets,˛ including fields in fragments and inline fragments. The following example has a depth of 3.\n\n```graphql query getProduct { book { # 1 ...bookDetails } }\n\nfragment bookDetails on Book { details { # 2 ... on ProductDetailsBook { country # 3 } } } ```",
    +          "format": "uint32",
    +          "minimum": 0.0,
    +          "nullable": true,
    +          "type": "integer"
    +        },
    +        "max_height": {
    +          "default": null,
    +          "description": "If set, requests with operations higher than this maximum are rejected with a HTTP 400 Bad Request response and GraphQL error with `\"extensions\": {\"code\": \"MAX_DEPTH_LIMIT\"}`\n\nHeight is based on simple merging of fields using the same name or alias, but only within the same selection set. For example `name` here is only counted once and the query has height 3, not 4:\n\n```graphql query { name { first } name { last } } ```\n\nThis may change in a future version of Apollo Router to do [full field merging across fragments][merging] instead.\n\n[merging]: https://spec.graphql.org/October2021/#sec-Field-Selection-Merging]",
    +          "format": "uint32",
    +          "minimum": 0.0,
    +          "nullable": true,
    +          "type": "integer"
    +        },
    +        "max_root_fields": {
    +          "default": null,
    +          "description": "If set, requests with operations with more root fields than this maximum are rejected with a HTTP 400 Bad Request response and GraphQL error with `\"extensions\": {\"code\": \"MAX_ROOT_FIELDS_LIMIT\"}`\n\nThis limit counts only the top level fields in a selection set, including fragments and inline fragments.",
    +          "format": "uint32",
    +          "minimum": 0.0,
    +          "nullable": true,
    +          "type": "integer"
    +        },
    +        "parser_max_recursion": {
    +          "default": 500,
    +          "description": "Limit recursion in the GraphQL parser to protect against stack overflow. default: 500",
    +          "format": "uint",
    +          "minimum": 0.0,
    +          "type": "integer"
    +        },
    +        "parser_max_tokens": {
    +          "default": 15000,
    +          "description": "Limit the number of tokens the GraphQL parser processes before aborting.",
    +          "format": "uint",
    +          "minimum": 0.0,
    +          "type": "integer"
    +        },
    +        "warn_only": {
    +          "default": false,
    +          "description": "If set to true (which is the default is dev mode), requests that exceed a `max_*` limit are *not* rejected. Instead they are executed normally, and a warning is logged.",
    +          "type": "boolean"
    +        }
    +      },
    +      "type": "object"
    +    },
    +    "Config10": {
    +      "additionalProperties": false,
           "properties": {
    +        "batch_processor": {
    +          "$ref": "#/definitions/BatchProcessorConfig",
    +          "description": "#/definitions/BatchProcessorConfig"
    +        },
             "enabled": {
    -          "description": "Enable the broken plugin.",
    +          "description": "Enable otlp",
               "type": "boolean"
    +        },
    +        "endpoint": {
    +          "$ref": "#/definitions/UriEndpoint",
    +          "description": "#/definitions/UriEndpoint"
    +        },
    +        "grpc": {
    +          "$ref": "#/definitions/GrpcExporter",
    +          "description": "#/definitions/GrpcExporter"
    +        },
    +        "http": {
    +          "$ref": "#/definitions/HttpExporter",
    +          "description": "#/definitions/HttpExporter"
    +        },
    +        "protocol": {
    +          "$ref": "#/definitions/Protocol",
    +          "description": "#/definitions/Protocol"
    +        },
    +        "temporality": {
    +          "$ref": "#/definitions/Temporality",
    +          "description": "#/definitions/Temporality"
             }
           },
           "required": [
             "enabled"
           ],
           "type": "object"
         },
    -    "Config10": {
    +    "Config11": {
           "additionalProperties": false,
           "description": "Prometheus configuration",
           "properties": {
    @@ -1369,7 +1458,7 @@ expression: "&schema"
           },
           "type": "object"
         },
    -    "Config11": {
    +    "Config12": {
           "anyOf": [
             {
               "additionalProperties": false,
    @@ -1415,7 +1504,7 @@ expression: "&schema"
             }
           ]
         },
    -    "Config12": {
    +    "Config13": {
           "additionalProperties": false,
           "properties": {
             "batch_processor": {
    @@ -1436,7 +1525,7 @@ expression: "&schema"
           ],
           "type": "object"
         },
    -    "Config13": {
    +    "Config14": {
           "additionalProperties": false,
           "properties": {
             "batch_processor": {
    @@ -1493,7 +1582,7 @@ expression: "&schema"
           ],
           "type": "object"
         },
    -    "Config14": {
    +    "Config15": {
           "additionalProperties": false,
           "description": "Configuration for the experimental traffic shaping plugin",
           "properties": {
    @@ -1525,6 +1614,19 @@ expression: "&schema"
           "type": "object"
         },
         "Config2": {
    +      "description": "This is a broken plugin for testing purposes only.",
    +      "properties": {
    +        "enabled": {
    +          "description": "Enable the broken plugin.",
    +          "type": "boolean"
    +        }
    +      },
    +      "required": [
    +        "enabled"
    +      ],
    +      "type": "object"
    +    },
    +    "Config3": {
           "description": "Restricted plugin (for testing purposes only)",
           "properties": {
             "enabled": {
    @@ -1537,7 +1639,7 @@ expression: "&schema"
           ],
           "type": "object"
         },
    -    "Config3": {
    +    "Config4": {
           "additionalProperties": false,
           "description": "Configure subgraph authentication",
           "properties": {
    @@ -1557,7 +1659,7 @@ expression: "&schema"
           },
           "type": "object"
         },
    -    "Config4": {
    +    "Config5": {
           "additionalProperties": false,
           "description": "Configuration for header propagation",
           "properties": {
    @@ -1577,7 +1679,7 @@ expression: "&schema"
           },
           "type": "object"
         },
    -    "Config5": {
    +    "Config6": {
           "additionalProperties": false,
           "description": "Configuration for exposing errors that originate from subgraphs",
           "properties": {
    @@ -1597,7 +1699,7 @@ expression: "&schema"
           },
           "type": "object"
         },
    -    "Config6": {
    +    "Config7": {
           "additionalProperties": false,
           "description": "Configuration for entity caching",
           "properties": {
    @@ -1625,11 +1727,11 @@ expression: "&schema"
           ],
           "type": "object"
         },
    -    "Config7": {
    +    "Config8": {
           "description": "Configuration for the progressive override plugin",
           "type": "object"
         },
    -    "Config8": {
    +    "Config9": {
           "additionalProperties": false,
           "properties": {
             "batch_processor": {
    @@ -1705,43 +1807,6 @@ expression: "&schema"
           },
           "type": "object"
         },
    -    "Config9": {
    -      "additionalProperties": false,
    -      "properties": {
    -        "batch_processor": {
    -          "$ref": "#/definitions/BatchProcessorConfig",
    -          "description": "#/definitions/BatchProcessorConfig"
    -        },
    -        "enabled": {
    -          "description": "Enable otlp",
    -          "type": "boolean"
    -        },
    -        "endpoint": {
    -          "$ref": "#/definitions/UriEndpoint",
    -          "description": "#/definitions/UriEndpoint"
    -        },
    -        "grpc": {
    -          "$ref": "#/definitions/GrpcExporter",
    -          "description": "#/definitions/GrpcExporter"
    -        },
    -        "http": {
    -          "$ref": "#/definitions/HttpExporter",
    -          "description": "#/definitions/HttpExporter"
    -        },
    -        "protocol": {
    -          "$ref": "#/definitions/Protocol",
    -          "description": "#/definitions/Protocol"
    -        },
    -        "temporality": {
    -          "$ref": "#/definitions/Temporality",
    -          "description": "#/definitions/Temporality"
    -        }
    -      },
    -      "required": [
    -        "enabled"
    -      ],
    -      "type": "object"
    -    },
         "ContextForward": {
           "additionalProperties": false,
           "description": "Configuration to forward context values in metric attributes/labels",
    @@ -3601,71 +3666,6 @@ expression: "&schema"
           ],
           "type": "object"
         },
    -    "Limits": {
    -      "additionalProperties": false,
    -      "description": "Configuration for operation limits, parser limits, HTTP limits, etc.",
    -      "properties": {
    -        "http_max_request_bytes": {
    -          "default": 2000000,
    -          "description": "Limit the size of incoming HTTP requests read from the network, to protect against running out of memory. Default: 2000000 (2 MB)",
    -          "format": "uint",
    -          "minimum": 0.0,
    -          "type": "integer"
    -        },
    -        "max_aliases": {
    -          "default": null,
    -          "description": "If set, requests with operations with more aliases than this maximum are rejected with a HTTP 400 Bad Request response and GraphQL error with `\"extensions\": {\"code\": \"MAX_ALIASES_LIMIT\"}`",
    -          "format": "uint32",
    -          "minimum": 0.0,
    -          "nullable": true,
    -          "type": "integer"
    -        },
    -        "max_depth": {
    -          "default": null,
    -          "description": "If set, requests with operations deeper than this maximum are rejected with a HTTP 400 Bad Request response and GraphQL error with `\"extensions\": {\"code\": \"MAX_DEPTH_LIMIT\"}`\n\nCounts depth of an operation, looking at its selection sets, including fields in fragments and inline fragments. The following example has a depth of 3.\n\n```graphql query getProduct { book { # 1 ...bookDetails } }\n\nfragment bookDetails on Book { details { # 2 ... on ProductDetailsBook { country # 3 } } } ```",
    -          "format": "uint32",
    -          "minimum": 0.0,
    -          "nullable": true,
    -          "type": "integer"
    -        },
    -        "max_height": {
    -          "default": null,
    -          "description": "If set, requests with operations higher than this maximum are rejected with a HTTP 400 Bad Request response and GraphQL error with `\"extensions\": {\"code\": \"MAX_DEPTH_LIMIT\"}`\n\nHeight is based on simple merging of fields using the same name or alias, but only within the same selection set. For example `name` here is only counted once and the query has height 3, not 4:\n\n```graphql query { name { first } name { last } } ```\n\nThis may change in a future version of Apollo Router to do [full field merging across fragments][merging] instead.\n\n[merging]: https://spec.graphql.org/October2021/#sec-Field-Selection-Merging]",
    -          "format": "uint32",
    -          "minimum": 0.0,
    -          "nullable": true,
    -          "type": "integer"
    -        },
    -        "max_root_fields": {
    -          "default": null,
    -          "description": "If set, requests with operations with more root fields than this maximum are rejected with a HTTP 400 Bad Request response and GraphQL error with `\"extensions\": {\"code\": \"MAX_ROOT_FIELDS_LIMIT\"}`\n\nThis limit counts only the top level fields in a selection set, including fragments and inline fragments.",
    -          "format": "uint32",
    -          "minimum": 0.0,
    -          "nullable": true,
    -          "type": "integer"
    -        },
    -        "parser_max_recursion": {
    -          "default": 500,
    -          "description": "Limit recursion in the GraphQL parser to protect against stack overflow. default: 500",
    -          "format": "uint",
    -          "minimum": 0.0,
    -          "type": "integer"
    -        },
    -        "parser_max_tokens": {
    -          "default": 15000,
    -          "description": "Limit the number of tokens the GraphQL parser processes before aborting.",
    -          "format": "uint",
    -          "minimum": 0.0,
    -          "type": "integer"
    -        },
    -        "warn_only": {
    -          "default": false,
    -          "description": "If set to true (which is the default is dev mode), requests that exceed a `max_*` limit are *not* rejected. Instead they are executed normally, and a warning is logged.",
    -          "type": "boolean"
    -        }
    -      },
    -      "type": "object"
    -    },
         "ListLength": {
           "oneOf": [
             {
    @@ -3845,12 +3845,12 @@ expression: "&schema"
               "description": "#/definitions/MetricsCommon"
             },
             "otlp": {
    -          "$ref": "#/definitions/Config9",
    -          "description": "#/definitions/Config9"
    -        },
    -        "prometheus": {
               "$ref": "#/definitions/Config10",
               "description": "#/definitions/Config10"
    +        },
    +        "prometheus": {
    +          "$ref": "#/definitions/Config11",
    +          "description": "#/definitions/Config11"
             }
           },
           "type": "object"
    @@ -4119,8 +4119,8 @@ expression: "&schema"
           "additionalProperties": false,
           "properties": {
             "experimental.broken": {
    -          "$ref": "#/definitions/Config",
    -          "description": "#/definitions/Config"
    +          "$ref": "#/definitions/Config2",
    +          "description": "#/definitions/Config2"
             },
             "experimental.expose_query_plan": {
               "$ref": "#/definitions/ExposeQueryPlanConfig",
    @@ -4131,8 +4131,8 @@ expression: "&schema"
               "description": "#/definitions/RecordConfig"
             },
             "experimental.restricted": {
    -          "$ref": "#/definitions/Config2",
    -          "description": "#/definitions/Config2"
    +          "$ref": "#/definitions/Config3",
    +          "description": "#/definitions/Config3"
             },
             "test.always_fails_to_start": {
               "$ref": "#/definitions/Conf",
    @@ -7150,28 +7150,28 @@ expression: "&schema"
               "description": "#/definitions/TracingCommon"
             },
             "datadog": {
    -          "$ref": "#/definitions/Config13",
    -          "description": "#/definitions/Config13"
    +          "$ref": "#/definitions/Config14",
    +          "description": "#/definitions/Config14"
             },
             "experimental_response_trace_id": {
               "$ref": "#/definitions/ExposeTraceId",
               "description": "#/definitions/ExposeTraceId"
             },
             "jaeger": {
    -          "$ref": "#/definitions/Config11",
    -          "description": "#/definitions/Config11"
    +          "$ref": "#/definitions/Config12",
    +          "description": "#/definitions/Config12"
             },
             "otlp": {
    -          "$ref": "#/definitions/Config9",
    -          "description": "#/definitions/Config9"
    +          "$ref": "#/definitions/Config10",
    +          "description": "#/definitions/Config10"
             },
             "propagation": {
               "$ref": "#/definitions/Propagation",
               "description": "#/definitions/Propagation"
             },
             "zipkin": {
    -          "$ref": "#/definitions/Config12",
    -          "description": "#/definitions/Config12"
    +          "$ref": "#/definitions/Config13",
    +          "description": "#/definitions/Config13"
             }
           },
           "type": "object"
    @@ -8259,8 +8259,8 @@ expression: "&schema"
           "description": "#/definitions/ForbidMutationsConfig"
         },
         "headers": {
    -      "$ref": "#/definitions/Config4",
    -      "description": "#/definitions/Config4"
    +      "$ref": "#/definitions/Config5",
    +      "description": "#/definitions/Config5"
         },
         "health_check": {
           "$ref": "#/definitions/HealthCheck",
    @@ -8271,12 +8271,12 @@ expression: "&schema"
           "description": "#/definitions/Homepage"
         },
         "include_subgraph_errors": {
    -      "$ref": "#/definitions/Config5",
    -      "description": "#/definitions/Config5"
    +      "$ref": "#/definitions/Config6",
    +      "description": "#/definitions/Config6"
         },
         "limits": {
    -      "$ref": "#/definitions/Limits",
    -      "description": "#/definitions/Limits"
    +      "$ref": "#/definitions/Config",
    +      "description": "#/definitions/Config"
         },
         "override_subgraph_url": {
           "$ref": "#/definitions/Conf5",
    @@ -8295,16 +8295,16 @@ expression: "&schema"
           "description": "#/definitions/DemandControlConfig"
         },
         "preview_entity_cache": {
    -      "$ref": "#/definitions/Config6",
    -      "description": "#/definitions/Config6"
    +      "$ref": "#/definitions/Config7",
    +      "description": "#/definitions/Config7"
         },
         "preview_file_uploads": {
           "$ref": "#/definitions/FileUploadsConfig",
           "description": "#/definitions/FileUploadsConfig"
         },
         "progressive_override": {
    -      "$ref": "#/definitions/Config7",
    -      "description": "#/definitions/Config7"
    +      "$ref": "#/definitions/Config8",
    +      "description": "#/definitions/Config8"
         },
         "rhai": {
           "$ref": "#/definitions/Conf6",
    @@ -8331,8 +8331,8 @@ expression: "&schema"
           "description": "#/definitions/Tls"
         },
         "traffic_shaping": {
    -      "$ref": "#/definitions/Config14",
    -      "description": "#/definitions/Config14"
    +      "$ref": "#/definitions/Config15",
    +      "description": "#/definitions/Config15"
         }
       },
       "title": "Configuration",
    
  • apollo-router/src/plugins/limits/fixtures/content_length_limit.router.yaml+2 0 added
    @@ -0,0 +1,2 @@
    +limits:
    +  http_max_request_bytes: 10
    \ No newline at end of file
    
  • apollo-router/src/plugins/limits/layer.rs+376 0 added
    @@ -0,0 +1,376 @@
    +use std::future::Future;
    +use std::pin::Pin;
    +use std::sync::atomic::AtomicUsize;
    +use std::sync::Arc;
    +use std::task::Poll;
    +
    +use displaydoc::Display;
    +use futures::FutureExt;
    +use pin_project_lite::pin_project;
    +use tokio::sync::AcquireError;
    +use tokio::sync::OwnedSemaphorePermit;
    +use tower::Layer;
    +use tower_service::Service;
    +
    +#[derive(thiserror::Error, Debug, Display)]
    +pub(super) enum BodyLimitError {
    +    /// Request body payload too large
    +    PayloadTooLarge,
    +}
    +
    +struct BodyLimitControlInner {
    +    limit: AtomicUsize,
    +    current: AtomicUsize,
    +}
    +
    +/// This structure allows the body limit to be updated dynamically.
    +/// It also allows the error message to be updated
    +#[derive(Clone)]
    +pub(crate) struct BodyLimitControl {
    +    inner: Arc<BodyLimitControlInner>,
    +}
    +
    +impl BodyLimitControl {
    +    pub(crate) fn new(limit: usize) -> Self {
    +        Self {
    +            inner: Arc::new(BodyLimitControlInner {
    +                limit: AtomicUsize::new(limit),
    +                current: AtomicUsize::new(0),
    +            }),
    +        }
    +    }
    +
    +    /// To disable the limit check just set this to usize::MAX
    +    pub(crate) fn update_limit(&self, limit: usize) {
    +        self.inner
    +            .limit
    +            .store(limit, std::sync::atomic::Ordering::SeqCst);
    +    }
    +
    +    /// Returns the current limit, this may have been updated dynamically.
    +    /// Usually it is the minimum of the content-length header and the configured limit.
    +    pub(crate) fn limit(&self) -> usize {
    +        self.inner.limit.load(std::sync::atomic::Ordering::SeqCst)
    +    }
    +
    +    /// Returns how much is remaining before the limit is hit
    +    pub(crate) fn remaining(&self) -> usize {
    +        self.inner.limit.load(std::sync::atomic::Ordering::SeqCst)
    +            - self.inner.current.load(std::sync::atomic::Ordering::SeqCst)
    +    }
    +
    +    /// Increment the current counted bytes by an amount
    +    pub(crate) fn increment(&self, amount: usize) -> usize {
    +        self.inner
    +            .current
    +            .fetch_add(amount, std::sync::atomic::Ordering::SeqCst)
    +    }
    +}
    +
    +/// This layer differs from the tower version in that it will always generate an error eagerly rather than
    +/// allowing the downstream service to catch and handle the error.
    +/// This way we can guarantee that the correct error will be returned to the client.
    +///
    +/// The layer that precedes this one is responsible for handling the error and returning the correct response.
    +/// It will ALWAYS be able to downcast the error to the correct type.
    +///
    +pub(crate) struct RequestBodyLimitLayer<Body> {
    +    _phantom: std::marker::PhantomData<Body>,
    +    control: BodyLimitControl,
    +}
    +impl<Body> RequestBodyLimitLayer<Body> {
    +    pub(crate) fn new(control: BodyLimitControl) -> Self {
    +        Self {
    +            _phantom: Default::default(),
    +            control,
    +        }
    +    }
    +}
    +
    +impl<Body, S> Layer<S> for RequestBodyLimitLayer<Body>
    +where
    +    S: Service<http::request::Request<super::limited::Limited<Body>>>,
    +    Body: http_body::Body,
    +{
    +    type Service = RequestBodyLimit<Body, S>;
    +
    +    fn layer(&self, inner: S) -> Self::Service {
    +        RequestBodyLimit::new(inner, self.control.clone())
    +    }
    +}
    +
    +pub(crate) struct RequestBodyLimit<Body, S> {
    +    _phantom: std::marker::PhantomData<Body>,
    +    inner: S,
    +    control: BodyLimitControl,
    +}
    +
    +impl<Body, S> RequestBodyLimit<Body, S>
    +where
    +    S: Service<http::request::Request<super::limited::Limited<Body>>>,
    +    Body: http_body::Body,
    +{
    +    fn new(inner: S, control: BodyLimitControl) -> Self {
    +        Self {
    +            _phantom: Default::default(),
    +            inner,
    +            control,
    +        }
    +    }
    +}
    +
    +impl<ReqBody, RespBody, S> Service<http::Request<ReqBody>> for RequestBodyLimit<ReqBody, S>
    +where
    +    S: Service<
    +        http::Request<super::limited::Limited<ReqBody>>,
    +        Response = http::Response<RespBody>,
    +    >,
    +    ReqBody: http_body::Body,
    +    RespBody: http_body::Body,
    +    S::Error: From<BodyLimitError>,
    +{
    +    type Response = S::Response;
    +    type Error = S::Error;
    +    type Future = ResponseFuture<S::Future>;
    +
    +    fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
    +        self.inner.poll_ready(cx)
    +    }
    +
    +    fn call(&mut self, req: http::Request<ReqBody>) -> Self::Future {
    +        let content_length = req
    +            .headers()
    +            .get(http::header::CONTENT_LENGTH)
    +            .and_then(|value| value.to_str().ok()?.parse::<usize>().ok());
    +
    +        let _body_limit = match content_length {
    +            Some(len) if len > self.control.limit() => return ResponseFuture::Reject,
    +            Some(len) => self.control.limit().min(len),
    +            None => self.control.limit(),
    +        };
    +
    +        // TODO: We can only do this once this layer is moved to the beginning of the router pipeline.
    +        // Otherwise the content length will be checked against the decompressed size of the body.
    +        // self.control.update_limit(_body_limit);
    +
    +        // This semaphore allows us to signal the body stream to stop processing if the limit is hit.
    +        let abort = Arc::new(tokio::sync::Semaphore::new(1));
    +
    +        // This will be dropped if the body stream hits the limit signalling an immediate response.
    +        let owned_permit = abort
    +            .clone()
    +            .try_acquire_owned()
    +            .expect("abort lock is new, qed");
    +
    +        let f =
    +            self.inner.call(req.map(|body| {
    +                super::limited::Limited::new(body, self.control.clone(), owned_permit)
    +            }));
    +
    +        ResponseFuture::Continue {
    +            inner: f,
    +            abort: abort.acquire_owned().boxed(),
    +        }
    +    }
    +}
    +
    +pin_project! {
    +    #[project = ResponseFutureProj]
    +    pub (crate) enum ResponseFuture<F> {
    +        Reject,
    +        Continue {
    +            #[pin]
    +            inner: F,
    +
    +            #[pin]
    +            abort: futures::future::BoxFuture<'static, Result<OwnedSemaphorePermit, AcquireError>>,
    +        }
    +    }
    +}
    +
    +impl<Inner, Body, Error> Future for ResponseFuture<Inner>
    +where
    +    Inner: Future<Output = Result<http::response::Response<Body>, Error>>,
    +    Body: http_body::Body,
    +    Error: From<BodyLimitError>,
    +{
    +    type Output = Result<http::response::Response<Body>, Error>;
    +
    +    fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
    +        let project = self.project();
    +        match project {
    +            // Content-length header exceeded, eager reject
    +            ResponseFutureProj::Reject => Poll::Ready(Err(BodyLimitError::PayloadTooLarge.into())),
    +            // Continue processing the request
    +            ResponseFutureProj::Continue { inner, abort, .. } => {
    +                match inner.poll(cx) {
    +                    Poll::Ready(r) => Poll::Ready(r),
    +                    Poll::Pending => {
    +                        // Check to see if the stream limit has been hit
    +                        match abort.poll(cx) {
    +                            Poll::Ready(_) => {
    +                                Poll::Ready(Err(BodyLimitError::PayloadTooLarge.into()))
    +                            }
    +                            Poll::Pending => Poll::Pending,
    +                        }
    +                    }
    +                }
    +            }
    +        }
    +    }
    +}
    +
    +#[cfg(test)]
    +mod test {
    +    use futures::stream::StreamExt;
    +    use http::StatusCode;
    +    use tower::BoxError;
    +    use tower::ServiceBuilder;
    +    use tower_service::Service;
    +
    +    use crate::plugins::limits::layer::BodyLimitControl;
    +    use crate::plugins::limits::layer::RequestBodyLimitLayer;
    +    use crate::services;
    +
    +    #[tokio::test]
    +    async fn test_body_content_length_limit_exceeded() {
    +        let control = BodyLimitControl::new(10);
    +        let mut service = ServiceBuilder::new()
    +            .layer(RequestBodyLimitLayer::new(control.clone()))
    +            .service_fn(|r: http::Request<_>| async move {
    +                services::http::body_stream::BodyStream::new(r.into_body())
    +                    .collect::<Vec<_>>()
    +                    .await;
    +                panic!("should have rejected request");
    +            });
    +        let resp: Result<http::Response<String>, BoxError> = service
    +            .call(http::Request::new("This is a test".to_string()))
    +            .await;
    +        assert!(resp.is_err());
    +    }
    +
    +    #[tokio::test]
    +    async fn test_body_content_length_limit_ok() {
    +        let control = BodyLimitControl::new(10);
    +        let mut service = ServiceBuilder::new()
    +            .layer(RequestBodyLimitLayer::new(control.clone()))
    +            .service_fn(|r: http::Request<_>| async move {
    +                services::http::body_stream::BodyStream::new(r.into_body())
    +                    .collect::<Vec<_>>()
    +                    .await;
    +                Ok(http::Response::builder()
    +                    .status(StatusCode::OK)
    +                    .body("This is a test".to_string())
    +                    .unwrap())
    +            });
    +        let resp: Result<_, BoxError> = service.call(http::Request::new("OK".to_string())).await;
    +
    +        assert!(resp.is_ok());
    +        let resp = resp.unwrap();
    +        assert_eq!(resp.status(), StatusCode::OK);
    +        assert_eq!(resp.into_body(), "This is a test");
    +    }
    +
    +    #[tokio::test]
    +    async fn test_header_content_length_limit_exceeded() {
    +        let control = BodyLimitControl::new(10);
    +        let mut service = ServiceBuilder::new()
    +            .layer(RequestBodyLimitLayer::new(control.clone()))
    +            .service_fn(|r: http::Request<_>| async move {
    +                services::http::body_stream::BodyStream::new(r.into_body())
    +                    .collect::<Vec<_>>()
    +                    .await;
    +                panic!("should have rejected request");
    +            });
    +        let resp: Result<http::Response<String>, BoxError> = service
    +            .call(
    +                http::Request::builder()
    +                    .header("Content-Length", "100")
    +                    .body("This is a test".to_string())
    +                    .unwrap(),
    +            )
    +            .await;
    +        assert!(resp.is_err());
    +    }
    +
    +    #[tokio::test]
    +    async fn test_header_content_length_limit_ok() {
    +        let control = BodyLimitControl::new(10);
    +        let mut service = ServiceBuilder::new()
    +            .layer(RequestBodyLimitLayer::new(control.clone()))
    +            .service_fn(|r: http::Request<_>| async move {
    +                services::http::body_stream::BodyStream::new(r.into_body())
    +                    .collect::<Vec<_>>()
    +                    .await;
    +                Ok(http::Response::builder()
    +                    .status(StatusCode::OK)
    +                    .body("This is a test".to_string())
    +                    .unwrap())
    +            });
    +        let resp: Result<_, BoxError> = service
    +            .call(
    +                http::Request::builder()
    +                    .header("Content-Length", "5")
    +                    .body("OK".to_string())
    +                    .unwrap(),
    +            )
    +            .await;
    +        assert!(resp.is_ok());
    +        let resp = resp.unwrap();
    +        assert_eq!(resp.status(), StatusCode::OK);
    +        assert_eq!(resp.into_body(), "This is a test");
    +    }
    +
    +    #[tokio::test]
    +    async fn test_limits_dynamic_update() {
    +        let control = BodyLimitControl::new(10);
    +        let mut service = ServiceBuilder::new()
    +            .layer(RequestBodyLimitLayer::new(control.clone()))
    +            .service_fn(move |r: http::Request<_>| {
    +                let control = control.clone();
    +                async move {
    +                    services::http::body_stream::BodyStream::new(r.into_body())
    +                        .collect::<Vec<_>>()
    +                        .await;
    +                    control.update_limit(100);
    +                    Ok(http::Response::builder()
    +                        .status(StatusCode::OK)
    +                        .body("This is a test".to_string())
    +                        .unwrap())
    +                }
    +            });
    +        let resp: Result<_, BoxError> = service
    +            .call(http::Request::new("This is a test".to_string()))
    +            .await;
    +        assert!(resp.is_err());
    +    }
    +
    +    #[tokio::test]
    +    async fn test_body_length_exceeds_content_length() {
    +        let control = BodyLimitControl::new(10);
    +        let mut service = ServiceBuilder::new()
    +            .layer(RequestBodyLimitLayer::new(control.clone()))
    +            .service_fn(|r: http::Request<_>| async move {
    +                services::http::body_stream::BodyStream::new(r.into_body())
    +                    .collect::<Vec<_>>()
    +                    .await;
    +                Ok(http::Response::builder()
    +                    .status(StatusCode::OK)
    +                    .body("This is a test".to_string())
    +                    .unwrap())
    +            });
    +        let resp: Result<_, BoxError> = service
    +            .call(
    +                http::Request::builder()
    +                    .header("Content-Length", "5")
    +                    .body("Exceeded".to_string())
    +                    .unwrap(),
    +            )
    +            .await;
    +        assert!(resp.is_ok());
    +        // TODO: this needs to fail once the limit layer is moved before decompression.
    +        let resp = resp.unwrap();
    +        assert_eq!(resp.status(), StatusCode::OK);
    +        assert_eq!(resp.into_body(), "This is a test");
    +    }
    +}
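
The up-front `Content-Length` check in `RequestBodyLimit::call` above can be sketched in isolation as follows. This is a minimal, std-only illustration, not the Router's code: `PreCheck` and `pre_check` are hypothetical names introduced here, and the header is passed in as a plain string rather than read from an `http::Request`. Note that in the diff the computed value is bound to `_body_limit` and not yet applied (see the TODO), so only the eager rejection takes effect there.

```rust
/// Outcome of the check performed before any body bytes are read.
/// (Hypothetical type, for illustration only.)
#[derive(Debug, PartialEq)]
enum PreCheck {
    /// The declared length already exceeds the limit: reject eagerly.
    Reject,
    /// Proceed; the value is the tightest bound known for the body size.
    Continue(usize),
}

/// Hypothetical helper mirroring the `match content_length` in `call`.
/// A missing or unparsable header falls back to the configured limit.
fn pre_check(content_length: Option<&str>, limit: usize) -> PreCheck {
    let declared = content_length.and_then(|value| value.parse::<usize>().ok());
    match declared {
        Some(len) if len > limit => PreCheck::Reject,
        Some(len) => PreCheck::Continue(limit.min(len)),
        None => PreCheck::Continue(limit),
    }
}
```

With a limit of 10, a declared length of `"100"` is rejected before the body is touched, while `"5"` or a missing header lets the request continue. A client can still send more bytes than it declared, which is why the streaming check in `limited.rs` is needed as well.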
    
  • apollo-router/src/plugins/limits/limited.rs+212 0 added
    @@ -0,0 +1,212 @@
    +use std::pin::Pin;
    +use std::task::Context;
    +use std::task::Poll;
    +
    +use bytes::Buf;
    +use http::HeaderMap;
    +use http_body::SizeHint;
    +use pin_project_lite::pin_project;
    +use tokio::sync::OwnedSemaphorePermit;
    +
    +use crate::plugins::limits::layer::BodyLimitControl;
    +
    +pin_project! {
    +    /// An implementation of http_body::Body that limits the number of bytes read from the inner body.
    +    /// Unlike the `RequestBodyLimit` middleware, this will always return Pending if the inner body has exceeded the limit.
    +    /// Upon reaching the limit the guard will be dropped allowing the RequestBodyLimitLayer to return.
    +    pub(crate) struct Limited<Body> {
    +        #[pin]
    +        inner: Body,
    +        #[pin]
    +        permit: ForgetfulPermit,
    +        control: BodyLimitControl,
    +    }
    +}
    +
    +impl<Body> Limited<Body>
    +where
    +    Body: http_body::Body,
    +{
    +    pub(super) fn new(
    +        inner: Body,
    +        control: BodyLimitControl,
    +        permit: OwnedSemaphorePermit,
    +    ) -> Self {
    +        Self {
    +            inner,
    +            control,
    +            permit: permit.into(),
    +        }
    +    }
    +}
    +
    +struct ForgetfulPermit(Option<OwnedSemaphorePermit>);
    +
    +impl ForgetfulPermit {
    +    fn release(&mut self) {
    +        self.0.take();
    +    }
    +}
    +
    +impl Drop for ForgetfulPermit {
    +    fn drop(&mut self) {
    +        // If the limit was not hit we must not release the guard otherwise a response of 413 will be returned.
    +        // This may be because the inner body was not fully read.
    +        // Instead we must forget the permit.
    +        if let Some(permit) = self.0.take() {
    +            permit.forget();
    +        }
    +    }
    +}
    +
    +impl From<OwnedSemaphorePermit> for ForgetfulPermit {
    +    fn from(permit: OwnedSemaphorePermit) -> Self {
    +        Self(Some(permit))
    +    }
    +}
    +
    +impl<Body> http_body::Body for Limited<Body>
    +where
    +    Body: http_body::Body,
    +{
    +    type Data = Body::Data;
    +    type Error = Body::Error;
    +
    +    fn poll_data(
    +        self: Pin<&mut Self>,
    +        cx: &mut Context<'_>,
    +    ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
    +        let mut this = self.project();
    +        let res = match this.inner.poll_data(cx) {
    +            Poll::Pending => return Poll::Pending,
    +            Poll::Ready(None) => None,
    +            Poll::Ready(Some(Ok(data))) => {
    +                if data.remaining() > this.control.remaining() {
    +                    // This is the difference between http_body::Limited and our implementation.
    +                    // Releasing this permit allows the containing layer to immediately return an error response.
    +                    // This prevents the need to deal with wrapped errors.
    +                    this.control.update_limit(0);
    +                    this.permit.release();
    +                    return Poll::Pending;
    +                } else {
    +                    this.control.increment(data.remaining());
    +                    Some(Ok(data))
    +                }
    +            }
    +            Poll::Ready(Some(Err(err))) => Some(Err(err)),
    +        };
    +
    +        Poll::Ready(res)
    +    }
    +
    +    fn poll_trailers(
    +        self: Pin<&mut Self>,
    +        cx: &mut Context<'_>,
    +    ) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
    +        let this = self.project();
    +        let res = match this.inner.poll_trailers(cx) {
    +            Poll::Pending => return Poll::Pending,
    +            Poll::Ready(Ok(data)) => Ok(data),
    +            Poll::Ready(Err(err)) => Err(err),
    +        };
    +
    +        Poll::Ready(res)
    +    }
    +    fn is_end_stream(&self) -> bool {
    +        self.inner.is_end_stream()
    +    }
    +    fn size_hint(&self) -> SizeHint {
    +        match u64::try_from(self.control.remaining()) {
    +            Ok(n) => {
    +                let mut hint = self.inner.size_hint();
    +                if hint.lower() >= n {
    +                    hint.set_exact(n)
    +                } else if let Some(max) = hint.upper() {
    +                    hint.set_upper(n.min(max))
    +                } else {
    +                    hint.set_upper(n)
    +                }
    +                hint
    +            }
    +            Err(_) => self.inner.size_hint(),
    +        }
    +    }
    +}
    +
    +#[cfg(test)]
    +mod test {
    +    use std::pin::Pin;
    +    use std::sync::Arc;
    +
    +    use http_body::Body;
    +    use tower::BoxError;
    +
    +    use crate::plugins::limits::layer::BodyLimitControl;
    +
    +    #[test]
    +    fn test_completes() {
    +        let control = BodyLimitControl::new(100);
    +        let semaphore = Arc::new(tokio::sync::Semaphore::new(1));
    +        let lock = semaphore.clone().try_acquire_owned().unwrap();
    +        let mut limited = super::Limited::new("test".to_string(), control, lock);
    +
    +        assert_eq!(
    +            Pin::new(&mut limited).poll_data(&mut std::task::Context::from_waker(
    +                &futures::task::noop_waker()
    +            )),
    +            std::task::Poll::Ready(Some(Ok("test".to_string().into_bytes().into())))
    +        );
    +        assert!(semaphore.try_acquire().is_err());
    +
    +        // We need to assert that if the stream is dropped the semaphore isn't released.
    +        // It's only explicitly hitting the limit that releases the semaphore.
    +        drop(limited);
    +        assert!(semaphore.try_acquire().is_err());
    +    }
    +
    +    #[test]
    +    fn test_limit_hit() {
    +        let control = BodyLimitControl::new(1);
    +        let semaphore = Arc::new(tokio::sync::Semaphore::new(1));
    +        let lock = semaphore.clone().try_acquire_owned().unwrap();
    +        let mut limited = super::Limited::new("test".to_string(), control, lock);
    +
    +        assert_eq!(
    +            Pin::new(&mut limited).poll_data(&mut std::task::Context::from_waker(
    +                &futures::task::noop_waker()
    +            )),
    +            std::task::Poll::Pending
    +        );
    +        assert!(semaphore.try_acquire().is_ok())
    +    }
    +
    +    #[test]
    +    fn test_limit_hit_after_multiple() {
    +        let control = BodyLimitControl::new(5);
    +        let semaphore = Arc::new(tokio::sync::Semaphore::new(1));
    +        let lock = semaphore.clone().try_acquire_owned().unwrap();
    +
    +        let mut limited = super::Limited::new(
    +            hyper::Body::wrap_stream(futures::stream::iter(vec![
    +                Ok::<&str, BoxError>("hello"),
    +                Ok("world"),
    +            ])),
    +            control,
    +            lock,
    +        );
    +        assert!(matches!(
    +            Pin::new(&mut limited).poll_data(&mut std::task::Context::from_waker(
    +                &futures::task::noop_waker()
    +            )),
    +            std::task::Poll::Ready(Some(Ok(_)))
    +        ));
    +        assert!(semaphore.try_acquire().is_err());
    +        assert!(matches!(
    +            Pin::new(&mut limited).poll_data(&mut std::task::Context::from_waker(
    +                &futures::task::noop_waker()
    +            )),
    +            std::task::Poll::Pending
    +        ));
    +        assert!(semaphore.try_acquire().is_ok());
    +    }
    +}
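
The per-chunk accounting that `Limited::poll_data` performs through `BodyLimitControl` can be sketched without any async machinery. This is a simplified, std-only illustration: `ByteBudget` and `try_consume` are hypothetical names, and the semaphore signalling is omitted. One deliberate difference from the diff is that `remaining` here uses `saturating_sub`, an assumption made so the sketch cannot underflow once the limit has been zeroed; the diff uses plain subtraction.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical, simplified stand-in for `BodyLimitControl`.
struct ByteBudget {
    limit: AtomicUsize,
    current: AtomicUsize,
}

impl ByteBudget {
    fn new(limit: usize) -> Self {
        Self {
            limit: AtomicUsize::new(limit),
            current: AtomicUsize::new(0),
        }
    }

    /// Bytes left before the limit is hit; saturates at zero.
    fn remaining(&self) -> usize {
        self.limit
            .load(Ordering::SeqCst)
            .saturating_sub(self.current.load(Ordering::SeqCst))
    }

    /// Accepts a chunk if it fits in the remaining budget.
    /// Otherwise zeroes the limit, so later chunks fail too, and returns false.
    fn try_consume(&self, chunk_len: usize) -> bool {
        if chunk_len > self.remaining() {
            self.limit.store(0, Ordering::SeqCst);
            false
        } else {
            self.current.fetch_add(chunk_len, Ordering::SeqCst);
            true
        }
    }
}
```

This follows the same scenario as `test_limit_hit_after_multiple`: with a budget of 5, a first 5-byte chunk is accepted and a second one is refused. In the real body wrapper the refusal is expressed by releasing the permit and returning `Poll::Pending` rather than by a boolean.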
    
  • apollo-router/src/plugins/limits/mod.rs+434 0 added
    @@ -0,0 +1,434 @@
    +mod layer;
    +mod limited;
    +
    +use std::error::Error;
    +
    +use async_trait::async_trait;
    +use http::StatusCode;
    +use schemars::JsonSchema;
    +use serde::Deserialize;
    +use serde::Serialize;
    +use tower::BoxError;
    +use tower::ServiceBuilder;
    +use tower::ServiceExt;
    +
    +use crate::graphql;
    +use crate::layers::ServiceBuilderExt;
    +use crate::plugin::Plugin;
    +use crate::plugin::PluginInit;
    +use crate::plugins::limits::layer::BodyLimitControl;
    +use crate::plugins::limits::layer::BodyLimitError;
    +use crate::plugins::limits::layer::RequestBodyLimitLayer;
    +use crate::services::router;
    +use crate::services::router::BoxService;
    +use crate::Context;
    +
    +/// Configuration for operation limits, parser limits, HTTP limits, etc.
    +#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
    +#[serde(deny_unknown_fields, default)]
    +pub(crate) struct Config {
    +    /// If set, requests with operations deeper than this maximum
    +    /// are rejected with a HTTP 400 Bad Request response and GraphQL error with
    +    /// `"extensions": {"code": "MAX_DEPTH_LIMIT"}`
    +    ///
    +    /// Counts depth of an operation, looking at its selection sets,
    +    /// including fields in fragments and inline fragments. The following
    +    /// example has a depth of 3.
    +    ///
    +    /// ```graphql
    +    /// query getProduct {
    +    ///   book { # 1
    +    ///     ...bookDetails
    +    ///   }
    +    /// }
    +    ///
    +    /// fragment bookDetails on Book {
    +    ///   details { # 2
    +    ///     ... on ProductDetailsBook {
    +    ///       country # 3
    +    ///     }
    +    ///   }
    +    /// }
    +    /// ```
    +    pub(crate) max_depth: Option<u32>,
    +
    +    /// If set, requests with operations higher than this maximum
    +    /// are rejected with a HTTP 400 Bad Request response and GraphQL error with
    +    /// `"extensions": {"code": "MAX_HEIGHT_LIMIT"}`
    +    ///
    +    /// Height is based on simple merging of fields using the same name or alias,
    +    /// but only within the same selection set.
    +    /// For example `name` here is only counted once and the query has height 3, not 4:
    +    ///
    +    /// ```graphql
    +    /// query {
    +    ///     name { first }
    +    ///     name { last }
    +    /// }
    +    /// ```
    +    ///
    +    /// This may change in a future version of Apollo Router to do
    +    /// [full field merging across fragments][merging] instead.
    +    ///
    +    /// [merging]: https://spec.graphql.org/October2021/#sec-Field-Selection-Merging
    +    pub(crate) max_height: Option<u32>,
    +
    +    /// If set, requests with operations with more root fields than this maximum
    +    /// are rejected with a HTTP 400 Bad Request response and GraphQL error with
    +    /// `"extensions": {"code": "MAX_ROOT_FIELDS_LIMIT"}`
    +    ///
    +    /// This limit counts only the top level fields in a selection set,
    +    /// including fragments and inline fragments.
    +    pub(crate) max_root_fields: Option<u32>,
    +
    +    /// If set, requests with operations with more aliases than this maximum
    +    /// are rejected with a HTTP 400 Bad Request response and GraphQL error with
    +    /// `"extensions": {"code": "MAX_ALIASES_LIMIT"}`
    +    pub(crate) max_aliases: Option<u32>,
    +
    +    /// If set to true (which is the default in dev mode),
    +    /// requests that exceed a `max_*` limit are *not* rejected.
    +    /// Instead they are executed normally, and a warning is logged.
    +    pub(crate) warn_only: bool,
    +
    +    /// Limit recursion in the GraphQL parser to protect against stack overflow.
    +    /// default: 500
    +    pub(crate) parser_max_recursion: usize,
    +
    +    /// Limit the number of tokens the GraphQL parser processes before aborting.
    +    pub(crate) parser_max_tokens: usize,
    +
    +    /// Limit the size of incoming HTTP requests read from the network,
    +    /// to protect against running out of memory. Default: 2000000 (2 MB)
    +    pub(crate) http_max_request_bytes: usize,
    +}
    +
    +impl Default for Config {
    +    fn default() -> Self {
    +        Self {
    +            // These limits are opt-in
    +            max_depth: None,
    +            max_height: None,
    +            max_root_fields: None,
    +            max_aliases: None,
    +            warn_only: false,
    +            http_max_request_bytes: 2_000_000,
    +            parser_max_tokens: 15_000,
    +
    +            // This is `apollo-parser`’s default, which protects against stack overflow
    +            // but is still very high for "reasonable" queries.
    +            // https://github.com/apollographql/apollo-rs/blob/apollo-parser%400.7.3/crates/apollo-parser/src/parser/mod.rs#L93-L104
    +            parser_max_recursion: 500,
    +        }
    +    }
    +}
    +
    +struct LimitsPlugin {
    +    config: Config,
    +}
    +
    +#[async_trait]
    +impl Plugin for LimitsPlugin {
    +    type Config = Config;
    +
    +    async fn new(init: PluginInit<Self::Config>) -> Result<Self, BoxError>
    +    where
    +        Self: Sized,
    +    {
    +        Ok(LimitsPlugin {
    +            config: init.config,
    +        })
    +    }
    +
    +    fn router_service(&self, service: BoxService) -> BoxService {
    +        let control = BodyLimitControl::new(self.config.http_max_request_bytes);
    +        let control_for_context = control.clone();
    +        ServiceBuilder::new()
    +            .map_request(move |r: router::Request| {
    +                let control_for_context = control_for_context.clone();
    +                r.context
    +                    .extensions()
    +                    .with_lock(|mut lock| lock.insert(control_for_context));
    +                r
    +            })
    +            .map_future_with_request_data(
    +                |r: &router::Request| r.context.clone(),
    +                |ctx, f| async { Self::map_error_to_graphql(f.await, ctx) },
    +            )
    +            // Here we need to convert to and from the underlying http request types so that we can use existing middleware.
    +            .map_request(Into::into)
    +            .map_response(Into::into)
    +            .layer(RequestBodyLimitLayer::new(control))
    +            .map_request(Into::into)
    +            .map_response(Into::into)
    +            .service(service)
    +            .boxed()
    +    }
    +}
    +
    +impl LimitsPlugin {
    +    fn map_error_to_graphql(
    +        resp: Result<router::Response, BoxError>,
    +        ctx: Context,
    +    ) -> Result<router::Response, BoxError> {
    +        // There are two ways we can get a payload too large error:
    +        // 1. The request body is too large and detected via content length header
    +        // 2. The request body is too large and it failed at some other point in the pipeline.
    +        // We expect that other pipeline errors will have wrapped the source error rather than throwing it away.
    +        match resp {
    +            Ok(r) => {
    +                if r.response.status() == StatusCode::PAYLOAD_TOO_LARGE {
    +                    Self::increment_legacy_metric();
    +                    Ok(BodyLimitError::PayloadTooLarge.into_response(ctx))
    +                } else {
    +                    Ok(r)
    +                }
    +            }
    +            Err(e) => {
    +                // Getting the root cause is a bit fiddly
    +                let mut root_cause: &dyn Error = e.as_ref();
    +                while let Some(cause) = root_cause.source() {
    +                    root_cause = cause;
    +                }
    +
    +                match root_cause.downcast_ref::<BodyLimitError>() {
    +                    None => Err(e),
    +                    Some(_) => {
    +                        Self::increment_legacy_metric();
    +                        Ok(BodyLimitError::PayloadTooLarge.into_response(ctx))
    +                    }
    +                }
    +            }
    +        }
    +    }
    +
    +    fn increment_legacy_metric() {
    +        // Remove this eventually
    +        // This is already handled by the telemetry plugin via the http.server.request metric.
    +        u64_counter!(
    +            "apollo_router_http_requests_total",
    +            "Total number of HTTP requests made.",
    +            1,
    +            status = StatusCode::PAYLOAD_TOO_LARGE.as_u16() as i64,
    +            error = BodyLimitError::PayloadTooLarge.to_string()
    +        );
    +    }
    +}
    +
    +impl BodyLimitError {
    +    fn into_response(self, ctx: Context) -> router::Response {
    +        match self {
    +            BodyLimitError::PayloadTooLarge => router::Response::error_builder()
    +                .error(
    +                    graphql::Error::builder()
    +                        .message(self.to_string())
    +                        .extension_code("INVALID_GRAPHQL_REQUEST")
    +                        .extension("details", self.to_string())
    +                        .build(),
    +                )
    +                .status_code(StatusCode::PAYLOAD_TOO_LARGE)
    +                .context(ctx)
    +                .build()
    +                .unwrap(),
    +        }
    +    }
    +}
    +
    +register_plugin!("apollo", "limits", LimitsPlugin);
    +
    +#[cfg(test)]
    +mod test {
    +    use http::StatusCode;
    +    use tower::BoxError;
    +
    +    use crate::plugins::limits::layer::BodyLimitControl;
    +    use crate::plugins::limits::LimitsPlugin;
    +    use crate::plugins::test::PluginTestHarness;
    +    use crate::services::router;
    +    use crate::services::router::body::get_body_bytes;
    +
    +    #[tokio::test]
    +    async fn test_body_content_length_limit_exceeded() {
    +        let plugin = plugin().await;
    +        let resp = plugin
    +            .call_router(
    +                router::Request::fake_builder()
    +                    .body("This is a test")
    +                    .build()
    +                    .unwrap(),
    +                |r| async {
    +                    let body = r.router_request.into_body();
    +                    let _ = get_body_bytes(body).await?;
    +                    panic!("should have failed to read stream")
    +                },
    +            )
    +            .await;
    +        assert!(resp.is_ok());
    +        let resp = resp.unwrap();
    +        assert_eq!(resp.response.status(), StatusCode::PAYLOAD_TOO_LARGE);
    +        assert_eq!(
    +            String::from_utf8(
    +                get_body_bytes(resp.response.into_body())
    +                    .await
    +                    .unwrap()
    +                    .to_vec()
    +            )
    +            .unwrap(),
    +            "{\"errors\":[{\"message\":\"Request body payload too large\",\"extensions\":{\"details\":\"Request body payload too large\",\"code\":\"INVALID_GRAPHQL_REQUEST\"}}]}"
    +        );
    +    }
    +
    +    #[tokio::test]
    +    async fn test_body_content_length_limit_ok() {
    +        let plugin = plugin().await;
    +        let resp = plugin
    +            .call_router(
    +                router::Request::fake_builder().body("").build().unwrap(),
    +                |r| async {
    +                    let body = r.router_request.into_body();
    +                    let body = get_body_bytes(body).await;
    +                    assert!(body.is_ok());
    +                    Ok(router::Response::fake_builder().build().unwrap())
    +                },
    +            )
    +            .await;
    +
    +        assert!(resp.is_ok());
    +        let resp = resp.unwrap();
    +        assert_eq!(resp.response.status(), StatusCode::OK);
    +        assert_eq!(
    +            String::from_utf8(
    +                get_body_bytes(resp.response.into_body())
    +                    .await
    +                    .unwrap()
    +                    .to_vec()
    +            )
    +            .unwrap(),
    +            "{}"
    +        );
    +    }
    +
    +    #[tokio::test]
    +    async fn test_header_content_length_limit_exceeded() {
    +        let plugin = plugin().await;
    +        let resp = plugin
    +            .call_router(
    +                router::Request::fake_builder()
    +                    .header("Content-Length", "100")
    +                    .body("")
    +                    .build()
    +                    .unwrap(),
    +                |_| async { panic!("should have rejected request") },
    +            )
    +            .await;
    +        assert!(resp.is_ok());
    +        let resp = resp.unwrap();
    +        assert_eq!(resp.response.status(), StatusCode::PAYLOAD_TOO_LARGE);
    +        assert_eq!(
    +            String::from_utf8(
    +                get_body_bytes(resp.response.into_body())
    +                    .await
    +                    .unwrap()
    +                    .to_vec()
    +            )
    +            .unwrap(),
    +            "{\"errors\":[{\"message\":\"Request body payload too large\",\"extensions\":{\"details\":\"Request body payload too large\",\"code\":\"INVALID_GRAPHQL_REQUEST\"}}]}"
    +        );
    +    }
    +
    +    #[tokio::test]
    +    async fn test_header_content_length_limit_ok() {
    +        let plugin = plugin().await;
    +        let resp = plugin
    +            .call_router(
    +                router::Request::fake_builder()
    +                    .header("Content-Length", "5")
    +                    .body("")
    +                    .build()
    +                    .unwrap(),
    +                |_| async { Ok(router::Response::fake_builder().build().unwrap()) },
    +            )
    +            .await;
    +        assert!(resp.is_ok());
    +        let resp = resp.unwrap();
    +        assert_eq!(resp.response.status(), StatusCode::OK);
    +        assert_eq!(
    +            String::from_utf8(
    +                get_body_bytes(resp.response.into_body())
    +                    .await
    +                    .unwrap()
    +                    .to_vec()
    +            )
    +            .unwrap(),
    +            "{}"
    +        );
    +    }
    +
    +    #[tokio::test]
    +    async fn test_non_limit_error_passthrough() {
    +        // We should not be translating errors that are not limit errors into graphql errors
    +        let plugin = plugin().await;
    +        let resp = plugin
    +            .call_router(
    +                router::Request::fake_builder().body("").build().unwrap(),
    +                |_| async { Err(BoxError::from("error")) },
    +            )
    +            .await;
    +        assert!(resp.is_err());
    +    }
    +
    +    #[tokio::test]
    +    async fn test_limits_dynamic_update() {
    +        let plugin = plugin().await;
    +        let resp = plugin
    +            .call_router(
    +                router::Request::fake_builder()
    +                    .body("This is a test")
    +                    .build()
    +                    .unwrap(),
    +                |r| async move {
    +                    // Before we go for the body, we'll update the limit
    +                    r.context.extensions().with_lock(|lock| {
    +                        let control: &BodyLimitControl =
    +                            lock.get().expect("must have body limit control");
    +                        assert_eq!(control.remaining(), 10);
    +                        assert_eq!(control.limit(), 10);
    +                        control.update_limit(100);
    +                    });
    +                    let body = r.router_request.into_body();
    +                    let _ = get_body_bytes(body).await?;
    +
    +                    // Now let's check progress
    +                    r.context.extensions().with_lock(|lock| {
    +                        let control: &BodyLimitControl =
    +                            lock.get().expect("must have body limit control");
    +                        assert_eq!(control.remaining(), 86);
    +                    });
    +                    Ok(router::Response::fake_builder().build().unwrap())
    +                },
    +            )
    +            .await;
    +        assert!(resp.is_ok());
    +        let resp = resp.unwrap();
    +        assert_eq!(resp.response.status(), StatusCode::OK);
    +        assert_eq!(
    +            String::from_utf8(
    +                get_body_bytes(resp.response.into_body())
    +                    .await
    +                    .unwrap()
    +                    .to_vec()
    +            )
    +            .unwrap(),
    +            "{}"
    +        );
    +    }
    +
    +    async fn plugin() -> PluginTestHarness<LimitsPlugin> {
    +        let plugin: PluginTestHarness<LimitsPlugin> = PluginTestHarness::new(
    +            Some(include_str!("fixtures/content_length_limit.router.yaml")),
    +            None,
    +        )
    +        .await;
    +        plugin
    +    }
    +}
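
The root-cause lookup in `map_error_to_graphql` above can be tried in isolation. The following is a minimal sketch using only the standard library; `PayloadTooLarge` and `PipelineError` are hypothetical stand-ins for `BodyLimitError` and for whatever wrapper a later pipeline stage might produce, and they are not types from the Router.

```rust
use std::error::Error;
use std::fmt;

/// Hypothetical stand-in for the limit error raised while reading a body.
#[derive(Debug)]
struct PayloadTooLarge;

impl fmt::Display for PayloadTooLarge {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Request body payload too large")
    }
}

impl Error for PayloadTooLarge {}

/// Hypothetical pipeline error that wraps its source instead of discarding it.
#[derive(Debug)]
struct PipelineError {
    source: Box<dyn Error + Send + Sync + 'static>,
}

impl fmt::Display for PipelineError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "pipeline stage failed")
    }
}

impl Error for PipelineError {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        Some(&*self.source)
    }
}

/// Follow `source()` to the innermost error, then check its concrete type.
fn is_payload_too_large(err: &(dyn Error + 'static)) -> bool {
    let mut root_cause = err;
    while let Some(cause) = root_cause.source() {
        root_cause = cause;
    }
    root_cause.downcast_ref::<PayloadTooLarge>().is_some()
}
```

The check only works when intermediate errors expose their cause through `source()`, which is the expectation stated in the comment inside `map_error_to_graphql`. An error that stringifies or drops its cause would be passed through unchanged.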
    
  • apollo-router/src/plugins/mod.rs+1 0 modified
    @@ -31,6 +31,7 @@ pub(crate) mod file_uploads;
     mod forbid_mutations;
     mod headers;
     mod include_subgraph_errors;
    +pub(crate) mod limits;
     pub(crate) mod override_url;
     pub(crate) mod progressive_override;
     mod record_replay;
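
The `test_limits_dynamic_update` test in the new `limits` module relies on a limit that can be raised while a body is being read, with the bytes already consumed still counted. The sketch below shows one way such accounting could work with shared atomic counters. `ByteBudget`, `LimitExceeded`, and `read_limited` are hypothetical names for illustration, not the Router's `BodyLimitControl`, and the same idea is one possible shape for the "enforce a maximum body size limit" mitigation in a custom plugin.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical error returned when a body exceeds its budget.
#[derive(Debug, PartialEq)]
struct LimitExceeded;

/// Hypothetical shared byte budget. Clones share the same counters, so a
/// handler holding one clone can raise the limit seen by the body reader.
#[derive(Clone, Debug)]
struct ByteBudget {
    limit: Arc<AtomicUsize>,
    consumed: Arc<AtomicUsize>,
}

impl ByteBudget {
    fn new(limit: usize) -> Self {
        Self {
            limit: Arc::new(AtomicUsize::new(limit)),
            consumed: Arc::new(AtomicUsize::new(0)),
        }
    }

    fn limit(&self) -> usize {
        self.limit.load(Ordering::SeqCst)
    }

    /// Bytes that may still be read before the limit is hit.
    fn remaining(&self) -> usize {
        self.limit()
            .saturating_sub(self.consumed.load(Ordering::SeqCst))
    }

    fn update_limit(&self, limit: usize) {
        self.limit.store(limit, Ordering::SeqCst);
    }

    /// Record `len` more bytes and fail if the running total exceeds the limit.
    fn consume(&self, len: usize) -> Result<(), LimitExceeded> {
        let before = self.consumed.fetch_add(len, Ordering::SeqCst);
        if before.saturating_add(len) > self.limit() {
            Err(LimitExceeded)
        } else {
            Ok(())
        }
    }
}

/// Accumulate chunks into memory, refusing to buffer past the budget.
fn read_limited<'a, I>(chunks: I, budget: &ByteBudget) -> Result<Vec<u8>, LimitExceeded>
where
    I: IntoIterator<Item = &'a [u8]>,
{
    let mut buf = Vec::new();
    for chunk in chunks {
        // Check before copying so an oversized chunk is never buffered.
        budget.consume(chunk.len())?;
        buf.extend_from_slice(chunk);
    }
    Ok(buf)
}
```

With a budget of 10, reading the 14-byte body `This is a test` fails. After `update_limit(100)` the same read succeeds and `remaining()` reports 86, which matches the numbers asserted in the test.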
    
  • apollo-router/src/plugins/telemetry/config_new/events.rs+9 9 modified
    @@ -756,14 +756,14 @@ mod tests {
                             .header("x-log-request", HeaderValue::from_static("log"))
                             .build()
                             .unwrap(),
    -                    |_r| {
    -                        router::Response::fake_builder()
    +                    |_r| async {
    +                        Ok(router::Response::fake_builder()
                                 .header("custom-header", "val1")
                                 .header(CONTENT_LENGTH, "25")
                                 .header("x-log-request", HeaderValue::from_static("log"))
                                 .data(serde_json_bytes::json!({"data": "res"}))
                                 .build()
    -                            .expect("expecting valid response")
    +                            .expect("expecting valid response"))
                         },
                     )
                     .await
    @@ -790,17 +790,17 @@ mod tests {
                             .header("custom-header", "val1")
                             .build()
                             .unwrap(),
    -                    |_r| {
    +                    |_r| async {
                             let context_with_error = Context::new();
                             let _ = context_with_error
                                 .insert(CONTAINS_GRAPHQL_ERROR, true)
                                 .unwrap();
    -                        router::Response::fake_builder()
    +                        Ok(router::Response::fake_builder()
                                 .header("custom-header", "val1")
                                 .context(context_with_error)
                                 .data(serde_json_bytes::json!({"errors": [{"message": "res"}]}))
                                 .build()
    -                            .expect("expecting valid response")
    +                            .expect("expecting valid response"))
                         },
                     )
                     .await
    @@ -827,14 +827,14 @@ mod tests {
                             .header("custom-header", "val1")
                             .build()
                             .unwrap(),
    -                    |_r| {
    -                        router::Response::fake_builder()
    +                    |_r| async {
    +                        Ok(router::Response::fake_builder()
                                 .header("custom-header", "val1")
                                 .header(CONTENT_LENGTH, "25")
                                 .header("x-log-response", HeaderValue::from_static("log"))
                                 .data(serde_json_bytes::json!({"data": "res"}))
                                 .build()
    -                            .expect("expecting valid response")
    +                            .expect("expecting valid response"))
                         },
                     )
                     .await
    
  • apollo-router/src/plugins/telemetry/logging/mod.rs+3 3 modified
    @@ -22,13 +22,13 @@ mod test {
                             .body("query { foo }")
                             .build()
                             .expect("expecting valid request"),
    -                    |_r| {
    +                    |_r| async {
                             tracing::info!("response");
    -                        router::Response::fake_builder()
    +                        Ok(router::Response::fake_builder()
                                 .header("custom-header", "val1")
                                 .data(serde_json::json!({"data": "res"}))
                                 .build()
    -                            .expect("expecting valid response")
    +                            .expect("expecting valid response"))
                         },
                     )
                     .await
    
  • apollo-router/src/plugins/test.rs+8 4 modified
    @@ -1,4 +1,5 @@
     use std::any::TypeId;
    +use std::future::Future;
     use std::ops::Deref;
     use std::str::FromStr;
     use std::sync::Arc;
    @@ -124,14 +125,17 @@ impl<T: Plugin> PluginTestHarness<T> {
         }
     
         #[allow(dead_code)]
    -    pub(crate) async fn call_router(
    +    pub(crate) async fn call_router<F>(
             &self,
             request: router::Request,
    -        response_fn: fn(router::Request) -> router::Response,
    -    ) -> Result<router::Response, BoxError> {
    +        response_fn: fn(router::Request) -> F,
    +    ) -> Result<router::Response, BoxError>
    +    where
    +        F: Future<Output = Result<router::Response, BoxError>> + Send + 'static,
    +    {
             let service: router::BoxService = router::BoxService::new(
                 ServiceBuilder::new()
    -                .service_fn(move |req: router::Request| async move { Ok((response_fn)(req)) }),
    +                .service_fn(move |req: router::Request| async move { (response_fn)(req).await }),
             );
     
             self.plugin.router_service(service).call(request).await
    
  • apollo-router/src/router_factory.rs+1 0 modified
    @@ -682,6 +682,7 @@ pub(crate) async fn create_plugins(
                 }
             }
         }
    +    add_mandatory_apollo_plugin!("limits");
         add_mandatory_apollo_plugin!("traffic_shaping");
         add_optional_apollo_plugin!("forbid_mutations");
         add_optional_apollo_plugin!("subscription");
    
  • apollo-router/src/services/http/body_stream.rs+34 0 added
    @@ -0,0 +1,34 @@
    +use std::task::Poll;
    +
    +use futures::Stream;
    +use pin_project_lite::pin_project;
    +
    +pin_project! {
    +    /// Allows conversion between an http_body::Body and a futures stream.
    +    pub(crate) struct BodyStream<B: http_body::Body> {
    +        #[pin]
    +        inner: B
    +    }
    +}
    +
    +impl<B: hyper::body::HttpBody> BodyStream<B> {
    +    /// Create a new `BodyStream`.
    +    pub(crate) fn new(body: B) -> Self {
    +        Self { inner: body }
    +    }
    +}
    +
    +impl<B, D, E> Stream for BodyStream<B>
    +where
    +    B: http_body::Body<Data = D, Error = E>,
    +    B::Error: Into<E>,
    +{
    +    type Item = Result<D, E>;
    +
    +    fn poll_next(
    +        self: std::pin::Pin<&mut Self>,
    +        cx: &mut std::task::Context<'_>,
    +    ) -> Poll<Option<Self::Item>> {
    +        self.project().inner.poll_data(cx)
    +    }
    +}
    
  • apollo-router/src/services/http.rs+1 0 modified
    @@ -9,6 +9,7 @@ use super::router::body::RouterBody;
     use super::Plugins;
     use crate::Context;
     
    +pub(crate) mod body_stream;
     pub(crate) mod service;
     #[cfg(test)]
     mod tests;
    
  • apollo-router/src/services/router.rs+129 18 modified
    @@ -1,5 +1,8 @@
     #![allow(missing_docs)] // FIXME
     
    +use std::any::Any;
    +use std::mem;
    +
     use bytes::Bytes;
     use futures::future::Either;
     use futures::Stream;
    @@ -24,6 +27,7 @@ use super::supergraph;
     use crate::graphql;
     use crate::http_ext::header_map;
     use crate::json_ext::Path;
    +use crate::services;
     use crate::services::TryIntoHeaderName;
     use crate::services::TryIntoHeaderValue;
     use crate::Context;
    @@ -53,15 +57,6 @@ pub struct Request {
         pub context: Context,
     }
     
    -impl From<http::Request<Body>> for Request {
    -    fn from(router_request: http::Request<Body>) -> Self {
    -        Self {
    -            router_request,
    -            context: Context::new(),
    -        }
    -    }
    -}
    -
     impl From<(http::Request<Body>, Context)> for Request {
         fn from((router_request, context): (http::Request<Body>, Context)) -> Self {
             Self {
    @@ -187,15 +182,6 @@ pub struct Response {
         pub context: Context,
     }
     
    -impl From<http::Response<Body>> for Response {
    -    fn from(response: http::Response<Body>) -> Self {
    -        Self {
    -            response,
    -            context: Context::new(),
    -        }
    -    }
    -}
    -
     #[buildstructor::buildstructor]
     impl Response {
         pub async fn next_response(&mut self) -> Option<Result<Bytes, Error>> {
    @@ -398,3 +384,128 @@ pub(crate) struct ClientRequestAccepts {
         pub(crate) json: bool,
         pub(crate) wildcard: bool,
     }
    +
    +impl<T> From<http::Response<T>> for Response
    +where
    +    T: http_body::Body<Data = Bytes> + Send + 'static,
    +    <T as http_body::Body>::Error: Into<BoxError>,
    +{
    +    fn from(response: http::Response<T>) -> Self {
    +        let context: Context = response.extensions().get().cloned().unwrap_or_default();
    +
    +        Self {
    +            response: response.map(convert_to_body),
    +            context,
    +        }
    +    }
    +}
    +
    +impl From<Response> for http::Response<Body> {
    +    fn from(mut response: Response) -> Self {
    +        response.response.extensions_mut().insert(response.context);
    +        response.response
    +    }
    +}
    +
    +impl<T> From<http::Request<T>> for Request
    +where
    +    T: http_body::Body<Data = Bytes> + Send + 'static,
    +
    +    <T as http_body::Body>::Error: Into<BoxError>,
    +{
    +    fn from(request: http::Request<T>) -> Self {
    +        let context: Context = request.extensions().get().cloned().unwrap_or_default();
    +
    +        Self {
    +            router_request: request.map(convert_to_body),
    +            context,
    +        }
    +    }
    +}
    +
    +impl From<Request> for http::Request<Body> {
    +    fn from(mut request: Request) -> Self {
    +        request
    +            .router_request
    +            .extensions_mut()
    +            .insert(request.context);
    +        request.router_request
    +    }
    +}
    +
    +/// This function is used to convert a `http_body::Body` into a `Body`.
    +/// It does a downcast check to see if the body is already a `Body` and if it is then it just returns it.
    +/// There is zero overhead if the body is already a `Body`.
    +/// Note that ALL graphql responses are already a stream as they may be part of a deferred or stream response,
    +/// therefore if a body has to be wrapped the cost is minimal.
    +fn convert_to_body<T>(mut b: T) -> Body
    +where
    +    T: http_body::Body<Data = Bytes> + Send + 'static,
    +
    +    <T as http_body::Body>::Error: Into<BoxError>,
    +{
    +    let val_any = &mut b as &mut dyn Any;
    +    match val_any.downcast_mut::<Body>() {
    +        Some(body) => mem::take(body),
    +        None => Body::wrap_stream(services::http::body_stream::BodyStream::new(
    +            b.map_err(Into::into),
    +        )),
    +    }
    +}
    +
    +#[cfg(test)]
    +mod test {
    +    use std::pin::Pin;
    +    use std::task::Context;
    +    use std::task::Poll;
    +
    +    use http::HeaderMap;
    +    use tower::BoxError;
    +
    +    use crate::services::router::body::get_body_bytes;
    +    use crate::services::router::convert_to_body;
    +
    +    struct MockBody {
    +        data: Option<&'static str>,
    +    }
    +    impl http_body::Body for MockBody {
    +        type Data = bytes::Bytes;
    +        type Error = BoxError;
    +
    +        fn poll_data(
    +            mut self: Pin<&mut Self>,
    +            _cx: &mut Context<'_>,
    +        ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
    +            if let Some(data) = self.data.take() {
    +                Poll::Ready(Some(Ok(bytes::Bytes::from(data))))
    +            } else {
    +                Poll::Ready(None)
    +            }
    +        }
    +
    +        fn poll_trailers(
    +            self: Pin<&mut Self>,
    +            _cx: &mut Context<'_>,
    +        ) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
    +            Poll::Ready(Ok(None))
    +        }
    +    }
    +
    +    #[tokio::test]
    +    async fn test_convert_from_http_body() {
    +        let body = convert_to_body(MockBody { data: Some("test") });
    +        assert_eq!(
    +            &String::from_utf8(get_body_bytes(body).await.unwrap().to_vec()).unwrap(),
    +            "test"
    +        );
    +    }
    +
    +    #[tokio::test]
    +    async fn test_convert_from_hyper_body() {
    +        let body = convert_to_body(hyper::Body::from("test"));
    +        assert_eq!(
    +            &String::from_utf8(get_body_bytes(body).await.unwrap().to_vec()).unwrap(),
    +            "test"
    +        );
    +    }
    +}
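
The downcast in `convert_to_body` above avoids re-wrapping a value that already has the target type. Here is a minimal standard-library sketch of the same pattern, assuming a hypothetical `Buffer` type in place of the Router's `Body` and a hypothetical `IntoBytes` trait in place of `http_body::Body`.

```rust
use std::any::Any;
use std::mem;

/// Hypothetical target type, standing in for the router's `Body`.
#[derive(Debug, Default, PartialEq)]
struct Buffer(Vec<u8>);

/// Hypothetical conversion trait, standing in for `http_body::Body`.
trait IntoBytes {
    fn into_bytes(self) -> Vec<u8>;
}

impl IntoBytes for Buffer {
    fn into_bytes(self) -> Vec<u8> {
        self.0
    }
}

impl IntoBytes for &'static str {
    fn into_bytes(self) -> Vec<u8> {
        self.as_bytes().to_vec()
    }
}

/// Return the value unchanged if it is already a `Buffer`, otherwise convert it.
fn convert_to_buffer<T>(mut value: T) -> Buffer
where
    T: IntoBytes + 'static,
{
    // `Any` needs `T: 'static`; the cast lets us ask for the concrete type at runtime.
    let as_any = &mut value as &mut dyn Any;
    match as_any.downcast_mut::<Buffer>() {
        // Already the target type: move it out, leaving a cheap default behind.
        Some(buffer) => mem::take(buffer),
        // Any other type goes through the generic conversion.
        None => Buffer(value.into_bytes()),
    }
}
```

`mem::take` is needed because the downcast only yields a `&mut Buffer`. Swapping in `Default::default()` moves the real value out without cloning it, which is why the target type must implement `Default`.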
    
  • apollo-router/src/services/router/service.rs+32 70 modified
    @@ -15,6 +15,7 @@ use futures::future::BoxFuture;
     use futures::stream;
     use futures::stream::once;
     use futures::stream::StreamExt;
    +use futures::TryFutureExt;
     use http::header::CONTENT_TYPE;
     use http::header::VARY;
     use http::request::Parts;
    @@ -33,6 +34,7 @@ use tower::ServiceExt;
     use tower_service::Service;
     use tracing::Instrument;
     
    +use super::Body;
     use super::ClientRequestAccepts;
     use crate::axum_factory::CanceledRequest;
     use crate::batching::Batch;
    @@ -94,7 +96,6 @@ pub(crate) struct RouterService {
         apq_layer: APQLayer,
         persisted_query_layer: Arc<PersistedQueryLayer>,
         query_analysis_layer: QueryAnalysisLayer,
    -    http_max_request_bytes: usize,
         batching: Batching,
     }
     
    @@ -104,15 +105,13 @@ impl RouterService {
             apq_layer: APQLayer,
             persisted_query_layer: Arc<PersistedQueryLayer>,
             query_analysis_layer: QueryAnalysisLayer,
    -        http_max_request_bytes: usize,
             batching: Batching,
         ) -> Self {
             RouterService {
                 supergraph_creator,
                 apq_layer,
                 persisted_query_layer,
                 query_analysis_layer,
    -            http_max_request_bytes,
                 batching,
             }
         }
    @@ -408,9 +407,14 @@ impl RouterService {
         }
     
         async fn call_inner(&self, req: RouterRequest) -> Result<RouterResponse, BoxError> {
    -        let context = req.context.clone();
    +        let context = req.context;
    +        let (parts, body) = req.router_request.into_parts();
    +        let requests = self.get_graphql_requests(&parts, body).await?;
     
    -        let (supergraph_requests, is_batch) = match self.translate_request(req).await {
    +        let (supergraph_requests, is_batch) = match futures::future::ready(requests)
    +            .and_then(|r| self.translate_request(&context, parts, r))
    +            .await
    +        {
                 Ok(requests) => requests,
                 Err(err) => {
                     u64_counter!(
    @@ -647,67 +651,11 @@ impl RouterService {
     
         async fn translate_request(
             &self,
    -        req: RouterRequest,
    +        context: &Context,
    +        parts: Parts,
    +        graphql_requests: (Vec<graphql::Request>, bool),
         ) -> Result<(Vec<SupergraphRequest>, bool), TranslateError> {
    -        let RouterRequest {
    -            router_request,
    -            context,
    -        } = req;
    -
    -        let (parts, body) = router_request.into_parts();
    -
    -        let graphql_requests: Result<(Vec<graphql::Request>, bool), TranslateError> = if parts
    -            .method
    -            == Method::GET
    -        {
    -            self.translate_query_request(&parts).await
    -        } else {
    -            // FIXME: use a try block when available: https://github.com/rust-lang/rust/issues/31436
    -            let content_length = (|| {
    -                parts
    -                    .headers
    -                    .get(http::header::CONTENT_LENGTH)?
    -                    .to_str()
    -                    .ok()?
    -                    .parse()
    -                    .ok()
    -            })();
    -            if content_length.unwrap_or(0) > self.http_max_request_bytes {
    -                Err(TranslateError {
    -                    status: StatusCode::PAYLOAD_TOO_LARGE,
    -                    error: "payload too large for the `http_max_request_bytes` configuration",
    -                    extension_code: "INVALID_GRAPHQL_REQUEST",
    -                    extension_details: "payload too large".to_string(),
    -                })
    -            } else {
    -                let body = http_body::Limited::new(body, self.http_max_request_bytes);
    -                get_body_bytes(body)
    -                    .instrument(tracing::debug_span!("receive_body"))
    -                    .await
    -                    .map_err(|e| {
    -                        if e.is::<http_body::LengthLimitError>() {
    -                            TranslateError {
    -                                status: StatusCode::PAYLOAD_TOO_LARGE,
    -                                error: "payload too large for the `http_max_request_bytes` configuration",
    -                                extension_code: "INVALID_GRAPHQL_REQUEST",
    -                                extension_details: "payload too large".to_string(),
    -                            }
    -                        } else {
    -                            TranslateError {
    -                                status: StatusCode::BAD_REQUEST,
    -                                error: "failed to get the request body",
    -                                extension_code: "INVALID_GRAPHQL_REQUEST",
    -                                extension_details: format!("failed to get the request body: {e}"),
    -                            }
    -                        }
    -                    })
    -                    .and_then(|bytes| {
    -                        self.translate_bytes_request(&bytes)
    -                    })
    -            }
    -        };
    -
    -        let (ok_results, is_batch) = graphql_requests?;
    +        let (ok_results, is_batch) = graphql_requests;
             let mut results = Vec::with_capacity(ok_results.len());
             let batch_size = ok_results.len();
     
    @@ -759,7 +707,7 @@ impl RouterService {
                 *new.body_mut() = graphql_request;
                 // XXX Lose some private entries, is that ok?
                 let new_context = Context::new();
    -            new_context.extend(&context);
    +            new_context.extend(context);
                 let client_request_accepts_opt = context
                     .extensions()
                     .with_lock(|lock| lock.get::<ClientRequestAccepts>().cloned());
    @@ -811,13 +759,30 @@ impl RouterService {
                 0,
                 SupergraphRequest {
                     supergraph_request: sg,
    -                context,
    +                context: context.clone(),
                 },
             );
     
             Ok((results, is_batch))
         }
     
    +    async fn get_graphql_requests(
    +        &self,
    +        parts: &Parts,
    +        body: Body,
    +    ) -> Result<Result<(Vec<graphql::Request>, bool), TranslateError>, BoxError> {
    +        let graphql_requests: Result<(Vec<graphql::Request>, bool), TranslateError> =
    +            if parts.method == Method::GET {
    +                self.translate_query_request(parts).await
    +            } else {
    +                let bytes = get_body_bytes(body)
    +                    .instrument(tracing::debug_span!("receive_body"))
    +                    .await?;
    +                self.translate_bytes_request(&bytes)
    +            };
    +        Ok(graphql_requests)
    +    }
    +
         fn count_errors(errors: &[graphql::Error]) {
             let mut map = HashMap::new();
             for error in errors {
    @@ -865,7 +830,6 @@ pub(crate) struct RouterCreator {
         apq_layer: APQLayer,
         pub(crate) persisted_query_layer: Arc<PersistedQueryLayer>,
         query_analysis_layer: QueryAnalysisLayer,
    -    http_max_request_bytes: usize,
         batching: Batching,
     }
     
    @@ -915,7 +879,6 @@ impl RouterCreator {
                 static_page,
                 apq_layer,
                 query_analysis_layer,
    -            http_max_request_bytes: configuration.limits.http_max_request_bytes,
                 persisted_query_layer,
                 batching: configuration.batching.clone(),
             })
    @@ -934,7 +897,6 @@ impl RouterCreator {
                 self.apq_layer.clone(),
                 self.persisted_query_layer.clone(),
                 self.query_analysis_layer.clone(),
    -            self.http_max_request_bytes,
                 self.batching.clone(),
             ));
     
    
  • apollo-router/src/services/router/tests.rs+1 1 modified
    @@ -186,7 +186,7 @@ async fn it_fails_on_no_query() {
     
     #[tokio::test]
     async fn test_http_max_request_bytes() {
    -    /// Size of the JSON serialization of the request created by `fn canned_new`
    +    /// Size of the JSON serialization of the request created by `fn canned_new`
         /// in `apollo-router/src/services/supergraph.rs`
         const CANNED_REQUEST_LEN: usize = 391;
     
    
  • apollo-router/tests/integration/coprocessor.rs+58 0 modified
    @@ -1,5 +1,11 @@
     use insta::assert_yaml_snapshot;
    +use serde_json::json;
     use tower::BoxError;
    +use wiremock::matchers::body_partial_json;
    +use wiremock::matchers::method;
    +use wiremock::matchers::path;
    +use wiremock::Mock;
    +use wiremock::ResponseTemplate;
     
     use crate::integration::common::graph_os_enabled;
     use crate::integration::IntegrationTest;
    @@ -24,3 +30,55 @@ async fn test_error_not_propagated_to_client() -> Result<(), BoxError> {
         router.graceful_shutdown().await;
         Ok(())
     }
    +
    +#[tokio::test(flavor = "multi_thread")]
    +async fn test_coprocessor_limit_payload() -> Result<(), BoxError> {
    +    if !graph_os_enabled() {
    +        return Ok(());
    +    }
    +
    +    let mock_server = wiremock::MockServer::start().await;
    +    let coprocessor_address = mock_server.uri();
    +
    +    // Expect a small query
    +    Mock::given(method("POST"))
    +        .and(path("/"))
    +        .and(body_partial_json(json!({"version":1,"stage":"RouterRequest","control":"continue","body":"{\"query\":\"query {topProducts{name}}\",\"variables\":{}}","method":"POST"})))
    +        .respond_with(
    +            ResponseTemplate::new(200).set_body_json(json!({"version":1,"stage":"RouterRequest","control":"continue","body":"{\"query\":\"query {topProducts{name}}\",\"variables\":{}}","method":"POST"})),
    +        )
    +        .expect(1)
    +        .mount(&mock_server)
    +        .await;
    +
    +    // Do not expect a large query
    +    Mock::given(method("POST"))
    +        .and(path("/"))
    +        .respond_with(ResponseTemplate::new(200).set_body_json(json!({"version":1,"stage":"RouterRequest","control":"continue","body":"{\"query\":\"query {topProducts{name}}\",\"variables\":{}}","method":"POST"})))
    +        .expect(0)
    +        .mount(&mock_server)
    +        .await;
    +
    +    let mut router = IntegrationTest::builder()
    +        .config(
    +            include_str!("fixtures/coprocessor_body_limit.router.yaml")
    +                .replace("<replace>", &coprocessor_address),
    +        )
    +        .build()
    +        .await;
    +
    +    router.start().await;
    +    router.assert_started().await;
    +
    +    // This query is small and should make it to the coprocessor
    +    let (_trace_id, response) = router.execute_default_query().await;
    +    assert_eq!(response.status(), 200);
    +
    +    // This query is huge and will be rejected because it is too large before hitting the coprocessor
    +    let (_trace_id, response) = router.execute_huge_query().await;
    +    assert_eq!(response.status(), 413);
    +    assert_yaml_snapshot!(response.text().await?);
    +
    +    router.graceful_shutdown().await;
    +    Ok(())
    +}
    
  • apollo-router/tests/integration/fixtures/coprocessor_body_limit.router.yaml+8 0 added
    @@ -0,0 +1,8 @@
    +# This coprocessor doesn't point to anything
    +coprocessor:
    +  url: "<replace>"
    +  router:
    +    request:
    +      body: true
    +limits:
    +  http_max_request_bytes: 100
    \ No newline at end of file
    
  • apollo-router/tests/integration/fixtures/request_bytes_limit.router.yaml+7 0 added
    @@ -0,0 +1,7 @@
    +limits:
    +  http_max_request_bytes: 60
    +coprocessor:
    +  url: http://localhost:4005
    +  router:
    +    request:
    +      body: true
    \ No newline at end of file
    
  • apollo-router/tests/integration/fixtures/request_bytes_limit_with_coprocessor.router.yaml+7 0 added
    @@ -0,0 +1,7 @@
    +limits:
    +  http_max_request_bytes: 60
    +coprocessor:
    +  url: http://localhost:4005
    +  router:
    +    request:
    +      body: true
    \ No newline at end of file
    
  • apollo-router/tests/integration/operation_limits.rs+33 0 modified
    @@ -7,8 +7,11 @@ use apollo_router::services::execution;
     use apollo_router::services::supergraph;
     use apollo_router::TestHarness;
     use serde_json::json;
    +use tower::BoxError;
     use tower::ServiceExt;
     
    +use crate::integration::IntegrationTest;
    +
     #[tokio::test(flavor = "multi_thread")]
     async fn test_response_errors() {
         let (mut service, execution_count) = build_test_harness(json!({
    @@ -296,3 +299,33 @@ fn expect_errors(response: graphql::Response, expected_error_codes: &[&str]) {
             assert!(response.data.is_none())
         }
     }
    +
    +#[tokio::test(flavor = "multi_thread")]
    +async fn test_request_bytes_limit_with_coprocessor() -> Result<(), BoxError> {
    +    let mut router = IntegrationTest::builder()
    +        .config(include_str!(
    +            "fixtures/request_bytes_limit_with_coprocessor.router.yaml"
    +        ))
    +        .build()
    +        .await;
    +    router.start().await;
    +    router.assert_started().await;
    +    let (_, resp) = router.execute_huge_query().await;
    +    assert_eq!(resp.status(), 413);
    +    router.graceful_shutdown().await;
    +    Ok(())
    +}
    +
    +#[tokio::test(flavor = "multi_thread")]
    +async fn test_request_bytes_limit() -> Result<(), BoxError> {
    +    let mut router = IntegrationTest::builder()
    +        .config(include_str!("fixtures/request_bytes_limit.router.yaml"))
    +        .build()
    +        .await;
    +    router.start().await;
    +    router.assert_started().await;
    +    let (_, resp) = router.execute_huge_query().await;
    +    assert_eq!(resp.status(), 413);
    +    router.graceful_shutdown().await;
    +    Ok(())
    +}
    
  • apollo-router/tests/integration/snapshots/integration_tests__integration__coprocessor__coprocessor_limit_payload.snap+5 0 added
    @@ -0,0 +1,5 @@
    +---
    +source: apollo-router/tests/integration/coprocessor.rs
    +expression: response.text().await?
    +---
    +"{\"errors\":[{\"message\":\"Request body payload too large\",\"extensions\":{\"details\":\"Request body payload too large\",\"code\":\"INVALID_GRAPHQL_REQUEST\"}}]}"
    
  • apollo-router/tests/integration/telemetry/metrics.rs+1 1 modified
    @@ -177,7 +177,7 @@ async fn test_bad_queries() {
         router.execute_huge_query().await;
         router
             .assert_metrics_contains(
    -            r#"apollo_router_http_requests_total{error="payload too large for the `http_max_request_bytes` configuration",status="413",otel_scope_name="apollo/router"} 1"#,
    +            r#"apollo_router_http_requests_total{error="Request body payload too large",status="413",otel_scope_name="apollo/router"} 1"#,
                 None,
             )
             .await;
    
  • .changesets/fix_bryn_limits.md+34 0 added
    @@ -0,0 +1,34 @@
    +### Payload limits may exceed configured maximum ([Issue #ISSUE_NUMBER](https://github.com/apollographql/router/issues/ISSUE_NUMBER))
    +
    +When processing requests the configured limits as defined in the `limits` section may be ignored:
    +```yaml
    +limits:
    +  http_max_request_bytes: 2000000
    +```
    +
    +Plugins that execute services during the `router` lifecycle will not respect the configured limits. Potentially leading to a denial of service attack vector.
    +
    +#### Built features affected:
    +* Coprocessors configured to send the entire body of a request are vulnerable to this issue:
    +```yaml
    +coprocessor: 
    +  url: http://localhost:8080
    +  router: 
    +    request:
    +      body: true
    +```
    +
    +#### Fix details
    +Body size limits are now moved to earlier in the pipeline to ensure that coprocessors and user plugins respect
    +the configured limits.
    +Reading a request body past the configured limit will now abort the request and return a 413 response 
    +to the client instead of delegating to the code reading the body to handle the error.
    +
    +#### User impact
    +Body size limits are now enforced for all requests in the main graphql router pipeline. Custom plugins are covered by 
    +this and any attempt to read the body past the configured limit will abort the request and return a 413 response to the client.
    +
    +Coprocessors, rhai and native plugins do not have an opportunity to intercept aborted requests. It is advised to use 
    +the telemetry features within the router if you need to track these events.
    +
    +By [@bryncooke](https://github.com/AUTHOR) in https://github.com/apollographql/router/pull/PULL_NUMBER
    

Vulnerability mechanics

Generated on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.
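
The changeset above states that body size limits now apply earlier in the pipeline, and that reading a body past the configured limit aborts the request with a 413 response. For a Native Rust Plugin that cannot be upgraded, the advisory suggests enforcing a maximum body size while accumulating the request body. The following is a minimal sketch of that idea using only the standard library. It is not the Router's implementation; `collect_limited` and `PayloadTooLarge` are hypothetical names introduced for illustration, and the body is modeled as an iterator of byte chunks rather than a real HTTP body stream.

```rust
use std::fmt;

/// Hypothetical error type for this sketch; not part of the Apollo Router API.
#[derive(Debug, PartialEq)]
pub struct PayloadTooLarge {
    /// The limit (in bytes) that the body exceeded.
    pub limit: usize,
}

impl fmt::Display for PayloadTooLarge {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "request body exceeds the {} byte limit", self.limit)
    }
}

impl std::error::Error for PayloadTooLarge {}

/// Accumulates body chunks into memory, stopping as soon as the total
/// would exceed `limit`. At most `limit` bytes are ever buffered.
///
/// Assumption: the caller maps `PayloadTooLarge` to an HTTP 413 response.
pub fn collect_limited<I, C>(chunks: I, limit: usize) -> Result<Vec<u8>, PayloadTooLarge>
where
    I: IntoIterator<Item = C>,
    C: AsRef<[u8]>,
{
    let mut body = Vec::new();
    for chunk in chunks {
        let chunk = chunk.as_ref();
        // `body.len() <= limit` always holds here, so the subtraction cannot underflow.
        // Checking before copying means an oversized chunk is never buffered.
        if chunk.len() > limit - body.len() {
            return Err(PayloadTooLarge { limit });
        }
        body.extend_from_slice(chunk);
    }
    Ok(body)
}
```

The point of the design is that the check happens per chunk, before the copy, so memory use stays bounded by `limit` regardless of how much data the client sends or what the `Content-Length` header claims. In a real plugin the chunks would come from the asynchronous request body, and the limit would typically mirror `limits.http_max_request_bytes`.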
