Nautobot: Webhook definitions could be used for server-side request forgery (SSRF)
Description
Impact
Nautobot's Webhook data model and associated feature set could be configured by users with sufficient access to perform requests to various hosts and IP addresses that should not be permitted, allowing for various behaviors similar to server-side request forgery (SSRF).
Patches
Fixes are available in Nautobot v2.4.33 and v3.1.2.
In support of this fix, three new settings variables have been added to Nautobot:
WEBHOOK_ALLOWED_SCHEMES- By default new or updatedWebhookrecords will be restricted to HTTP or HTTPS only, disallowing other schemes that may have been previously allowed. Administrators should audit existingWebhookrecords to identify any that are invalid, and either update/delete said records or customizeWEBHOOK_ALLOWED_SCHEMESas appropriate.WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS- This can be used to specify additional IP networks that should be denied toWebhooksending, for example some deployments may wish to disallow RFC1918 addresses or even disallow all networks and carve out specific exemptions using the following setting.WEBHOOK_ALLOWED_HOSTS- This can be used to provide an allow-list of specific hosts that would otherwise be blocked by anyWEBHOOK_ADDITIONAL_BLOCKED_NETWORKSconfiguration.
Workarounds
Administrators should review which users have been granted add or change permissions for the Webhook data model, and should review currently defined Webhook records for safety and validity. Other than that, no specific workaround has been identified.
References
- 2.4.33 (<a href="https://github.com/nautobot/nautobot/commit/16aa4aa9796ab7a31c4d615ec945e1f16d8c77c4">patch</a>)
- 3.1.2 (<a href="https://github.com/nautobot/nautobot/commit/7324c8f0d8c7245fbc691e15d729adc2d2707d08">patch</a>)
Affected packages
Versions sourced from the GitHub Security Advisory.
| Package | Affected versions | Patched versions |
|---|---|---|
nautobotPyPI | >= 3.0.0a2, < 3.1.2 | 3.1.2 |
nautobotPyPI | < 2.4.33 | 2.4.33 |
Affected products
1Patches
216aa4aa9796aMerge commit from fork
13 files changed · +708 −144
changes/GHSA-c35q-vxrp-ph26.removed+1 −0 added@@ -0,0 +1 @@ +Removed support for `nautobot-server webhook_receiver` command.
changes/GHSA-c35q-vxrp-ph26.security+5 −0 added@@ -0,0 +1,5 @@ +Added support for `WEBHOOK_ALLOWED_SCHEMES` settings variable. By default new or updated `Webhook` records will be restricted to HTTP or HTTPS only, disallowing other schemes that may have been previously allowed. Administrators should audit existing `Webhook` records to identify any that are invalid, and either update/delete said records or customize `WEBHOOK_ALLOWED_SCHEMES` as appropriate. +Added support for `WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS` settings variable. This can be used to specify additional IP networks that should be denied to `Webhook` sending, for example some deployments may wish to disallow RFC1918 addresses. +Added support for `WEBHOOK_ALLOWED_HOSTS` settings variable. This can be used to provide an allow-list of specific hosts that would otherwise be blocked by any `WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS` configuration. +Added logic to deny loopback, link-local, multicast, unspecified, or reserved IP addresses when defining or executing a `Webhook`. Administrators should audit existing `Webhook` records to identify any that are invalid and delete said records. (GHSA-c35q-vxrp-ph26) +Added various logic to protect `Webhook` definitions against being used as a vector for server-side request forgery (SSRF). (GHSA-c35q-vxrp-ph26)
nautobot/core/settings.py+25 −0 modified@@ -302,6 +302,31 @@ # Pseudo-random number generator seed, for reproducibility of test results. TEST_FACTORY_SEED = os.getenv("NAUTOBOT_TEST_FACTORY_SEED", None) +# URL schemes that Webhooks are permitted to target. Defaults to HTTP and HTTPS only. +WEBHOOK_ALLOWED_SCHEMES = ["http", "https"] +if "NAUTOBOT_WEBHOOK_ALLOWED_SCHEMES" in os.environ and os.environ["NAUTOBOT_WEBHOOK_ALLOWED_SCHEMES"] != "": + WEBHOOK_ALLOWED_SCHEMES = os.environ["NAUTOBOT_WEBHOOK_ALLOWED_SCHEMES"].split(_CONFIG_SETTING_SEPARATOR) + +# Hostnames that Webhooks may target even if they resolve into WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS. +# Django ALLOWED_HOSTS-style: literal hostnames or `.example.com` to match a domain and all of its subdomains. +# Use `*` to disable network filtering entirely (not recommended). +WEBHOOK_ALLOWED_HOSTS = [] +if "NAUTOBOT_WEBHOOK_ALLOWED_HOSTS" in os.environ and os.environ["NAUTOBOT_WEBHOOK_ALLOWED_HOSTS"] != "": + WEBHOOK_ALLOWED_HOSTS = os.environ["NAUTOBOT_WEBHOOK_ALLOWED_HOSTS"].split(_CONFIG_SETTING_SEPARATOR) + +# Network ranges (CIDR strings) added to the built-in Webhook block-list. The built-in block-list (loopback, link-local +# including cloud metadata endpoints such as 169.254.169.254, unspecified, multicast, reserved) is enforced +# unconditionally and cannot be disabled. Use this setting to extend it -- e.g. ["10.0.0.0/8", "192.168.0.0/16"] to +# block RFC1918 ranges as well, for deployments that don't legitimately need to webhook into private networks. +WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS = [] +if ( + "NAUTOBOT_WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS" in os.environ + and os.environ["NAUTOBOT_WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS"] != "" +): + WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS = os.environ["NAUTOBOT_WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS"].split( + _CONFIG_SETTING_SEPARATOR + ) + # # Django Prometheus #
nautobot/core/settings.yaml+65 −0 modified@@ -2124,4 +2124,69 @@ properties: The function must return only one argument: a string of the truncated device display name. version_added: "1.4.0" + WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS: + default: [] + description: >- + A list of network ranges (CIDR strings) added to the built-in Webhook block-list. The built-in block-list + (loopback, link-local including cloud metadata endpoints such as `169.254.169.254`, unspecified, multicast, + and reserved) is enforced unconditionally and cannot be disabled. Use this setting to extend it. + details: |- + For deployments that don't legitimately need to webhook into RFC1918 private networks, set this to + `["10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16"]` (and the equivalent IPv6 ULA `fc00::/7`) for stricter + posture. Hosts listed in [`WEBHOOK_ALLOWED_HOSTS`](#webhook_allowed_hosts) bypass *this* admin-extended + list (but not the built-in list). + + The block-list is enforced both when a Webhook is saved (for IP-literal hosts) and when the Celery worker + sends the request (after DNS resolution). + environment_variable: "NAUTOBOT_WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS" + items: + type: "string" + see_also: + "`WEBHOOK_ALLOWED_HOSTS`": "#webhook_allowed_hosts" + "`WEBHOOK_ALLOWED_SCHEMES`": "#webhook_allowed_schemes" + type: "array" + version_added: "2.4.33" + WEBHOOK_ALLOWED_HOSTS: + default: [] + description: >- + Hostnames that Webhooks may target even if they would otherwise be blocked by the + [`WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS`](#webhook_additional_blocked_networks) admin-extended block-list. + Django `ALLOWED_HOSTS`-style: literal hostnames or `.example.com` to match a domain and all of its + subdomains. The wildcard `*` matches every hostname. + details: |- + The built-in block-list (loopback, link-local including cloud metadata endpoints such as + `169.254.169.254`, unspecified, multicast, reserved) is enforced unconditionally and is **not** bypassed + by this setting -- including the wildcard `*`. An allow-listed hostname whose DNS resolves into the + built-in block-list is still rejected at send time. + + Typical use is to allow specific internal targets while leaving the rest of the network filtered. For + example, to permit a single internal monitoring host: + + ```python + WEBHOOK_ALLOWED_HOSTS = ["monitoring.internal.example.com"] + ``` + environment_variable: "NAUTOBOT_WEBHOOK_ALLOWED_HOSTS" + items: + type: "string" + see_also: + "`WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS`": "#webhook_additional_blocked_networks" + "`WEBHOOK_ALLOWED_SCHEMES`": "#webhook_allowed_schemes" + "Django documentation for `ALLOWED_HOSTS` (matching syntax)": "https://docs.djangoproject.com/en/stable/ref/settings/#allowed-hosts" + type: "array" + version_added: "2.4.33" + WEBHOOK_ALLOWED_SCHEMES: + default: + - "http" + - "https" + description: >- + URL schemes that Webhooks are permitted to target. Defaults to HTTP and HTTPS only; other schemes + (e.g. `file:`, `ftp:`, `gopher:`) are rejected at Webhook save time as a baseline SSRF protection. + environment_variable: "NAUTOBOT_WEBHOOK_ALLOWED_SCHEMES" + items: + type: "string" + see_also: + "`WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS`": "#webhook_additional_blocked_networks" + "`WEBHOOK_ALLOWED_HOSTS`": "#webhook_allowed_hosts" + type: "array" + version_added: "2.4.33" ...
nautobot/core/templates/nautobot_config.py.j2+17 −0 modified@@ -613,6 +613,23 @@ INSTALLATION_METRICS_ENABLED = is_truthy(os.getenv("NAUTOBOT_INSTALLATION_METRIC # """ # return str(device_display_name).split(".")[0] +# Webhook URL schemes permitted for the `payload_url` field. Defaults to HTTP and HTTPS. +# +# WEBHOOK_ALLOWED_SCHEMES = ["http", "https"] + +# Hostnames that Webhooks may target even if they would otherwise resolve into a blocked network range. Django +# `ALLOWED_HOSTS`-style: literal hostnames or `.example.com` to match a domain and all of its subdomains. Use `*` +# to disable network filtering entirely (not recommended). +# +# WEBHOOK_ALLOWED_HOSTS = [] + +# Network ranges (CIDR strings) added to the built-in Webhook block-list. The built-in block-list (loopback, +# link-local including cloud metadata endpoints such as 169.254.169.254, unspecified, multicast, and reserved) +# is enforced unconditionally and cannot be disabled. Use this setting to extend it -- e.g. add the RFC1918 +# private ranges below for stricter posture in deployments that don't legitimately webhook into private networks. +# +# WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS = ["10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "fc00::/7"] + # A list of strings designating all applications that are enabled in this Django installation. # Each string should be a dotted Python path to an application configuration class (preferred), # or a package containing an application.
nautobot/docs/user-guide/administration/tools/nautobot-server.md+0 −24 modified@@ -877,27 +877,3 @@ Nautobot version: 2.2.0a1 Django version: 3.2.24 Configuration file: /opt/nautobot/nautobot_config.py ``` - -### `webhook_receiver` - -`nautobot-server webhook_receiver` - -Start a simple listener to display received HTTP requests. - -`--port PORT` -Optional port number (default: `9000`) - -`--no-headers` -Hide HTTP request headers. - -```no-highlight -nautobot-server webhook_receiver --port 9001 --no-headers -``` - -Example output: - -```no-highlight -Listening on port http://localhost:9000. Stop with CONTROL-C. -``` - -Please see the guide on [Troubleshooting Webhooks](../../platform-functionality/webhook.md#troubleshooting-webhooks) for more information.
nautobot/docs/user-guide/platform-functionality/webhook.md+11 −29 modified@@ -110,39 +110,21 @@ A webhook request is considered successful if the receiver responds with a `2XX` ## Troubleshooting Webhooks -To inspect outgoing webhooks, you can use a local HTTP listener. Nautobot provides a built-in webhook receiver that logs incoming requests: +You can test webhooks with external services like [Beeceptor](https://beeceptor.com/) or [Pipedream RequestBin](https://pipedream.com/requestbin). These tools let you inspect webhook payloads and troubleshoot integration issues. -1. Set the webhook URL to `http://localhost:9000/`. -2. Start the webhook receiver: - - ```sh - nautobot-server webhook_receiver - ``` - - Example output: - - ```sh - Listening on port http://localhost:9000. Stop with CONTROL-C. - ``` - -3. Send a test request: +If a webhook does not trigger as expected, ensure that the **Celery worker** process is running and check the Nautobot logs for errors. - ```sh - curl -X POST http://localhost:9000 --data '{"foo": "bar"}' - ``` +## Webhook Administration and Security - The listener will output: +Because configured webhooks are sent automatically to another system, and because the data sent by a webhook contains information about the data stored in Nautobot, it's often necessary to restrict what users may configure webhooks, and what systems the webhooks may be sent to. The former is achieved by only granting webhook `add` and `change` [permissions](users/objectpermission.md) to the users that have a legitimate need to manage webhooks, while the later is a bit more involved. - ```sh - [1] Tue, 07 Apr 2020 17:44:02 GMT 127.0.0.1 "POST / HTTP/1.1" 200 - - Host: localhost:9000 - Content-Type: application/json - Content-Length: 14 +To provide a baseline of security, Nautobot automatically disallows the configuration of webhooks that point to certain classes of IP addresses (link-local, loopback, multicast, and reserved addresses), and at webhook transmission time, webhooks configured to send to a hostname that resolves to such an IP is also automatically blocked. This restriction is built-in to Nautobot and cannot be disabled. - {"foo": "bar"} - ``` +An administrator can further restrict the range of IPs and hosts that webhooks can be sent to by configuring [`WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS`](../administration/configuration/settings.md#webhook_additional_blocked_networks) and/or [`WEBHOOK_ALLOWED_HOSTS`](../administration/configuration/settings.md#webhook_allowed_hosts) in `nautobot_config.py`. The former defines additional IP networks that should be blocked, and the latter defines an allow-list of hosts or domains that are explicitly permitted **despite falling within the `ADDITIONAL_BLOCKED_NETWORKS` networks**. For example, to disallow webhooks to **all** hosts except for those within `example.com`, you could configure: -> **Alternative Testing Tools:** -> Instead of using the built-in webhook receiver, you can test webhooks with external services like [Beeceptor](https://beeceptor.com/) or [Pipedream RequestBin](https://pipedream.com/requestbin). These tools let you inspect webhook payloads and troubleshoot integration issues. +```python +WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS = ["0.0.0.0/0", "::/0"] +WEBHOOK_ALLOWED_HOSTS = [".example.com"] +``` -If a webhook does not trigger as expected, ensure that the **Celery worker** process is running and check the Nautobot logs for errors. +A third administratively-definable setting is [`WEBHOOK_ALLOWED_SCHEMES`](../administration/configuration/settings.md#webhook_allowed_schemes). This defaults to `["http", "https"]` but can be configured if desired, for example to disallow HTTP-only webhooks or to allow specific additional protocols.
nautobot/extras/management/commands/webhook_receiver.py+0 −82 removed@@ -1,82 +0,0 @@ -from http.server import BaseHTTPRequestHandler, HTTPServer -import sys - -from django.core.management.base import BaseCommand - -request_counter = 1 - - -class WebhookHandler(BaseHTTPRequestHandler): - show_headers = True - - def __getattr__(self, item): - # Return the same method for any type of HTTP request (GET, POST, etc.) - if item.startswith("do_"): - return self.do_ANY - - raise AttributeError - - def log_message(self, format_str, *args): # pylint: disable=arguments-differ - global request_counter - - print(f"[{request_counter}] {self.date_time_string()} {self.address_string()} {format_str % args}") - - def do_ANY(self): - global request_counter - - # Send a 200 response regardless of the request content - self.send_response(200) - self.end_headers() - self.wfile.write(b"Webhook received!\n") - - request_counter += 1 - - # Print the request headers to stdout - if self.show_headers: - for k, v in self.headers.items(): - print(f"{k}: {v}") - print() - - # Print the request body (if any) - content_length = self.headers.get("Content-Length") - if content_length is not None: - body = self.rfile.read(int(content_length)) - print(body.decode("utf-8")) - else: - print("(No body)") - - print("------------") - - -class Command(BaseCommand): - help = "Start a simple listener to display received HTTP requests" - - default_port = 9000 - - def add_arguments(self, parser): - parser.add_argument( - "--port", - type=int, - default=self.default_port, - help=f"Optional port number (default: {self.default_port})", - ) - parser.add_argument( - "--no-headers", - action="store_true", - dest="no_headers", - help="Hide HTTP request headers", - ) - - def handle(self, *args, **options): - port = options["port"] - quit_command = "CTRL-BREAK" if sys.platform == "win32" else "CONTROL-C" - - WebhookHandler.show_headers = not options["no_headers"] - - self.stdout.write(f"Listening on port http://localhost:{port}. Stop with {quit_command}.") - httpd = HTTPServer(("localhost", port), WebhookHandler) - - try: - httpd.serve_forever() - except KeyboardInterrupt: - self.stdout.write("\nExiting...")
nautobot/extras/models/models.py+11 −0 modified@@ -1051,6 +1051,17 @@ def clean(self): {"ca_file_path": "Do not specify a CA certificate file if SSL verification is disabled."} ) + # Validate payload_url against SSRF policy (scheme allow-list, host allow-list, IP-literal block-list). + # DNS-based block-list enforcement happens at request-send time in process_webhook, since the web server + # and worker may have different DNS views. + # Imported here to avoid a circular import: nautobot.extras.webhooks imports the Webhook model. + from nautobot.extras.webhooks import validate_webhook_url_format + + try: + validate_webhook_url_format(self.payload_url) + except ValidationError as exc: + raise ValidationError({"payload_url": exc.messages}) + def render_headers(self, context): """ Render additional_headers and return a dict of Header: Value pairs.
nautobot/extras/tasks.py+79 −6 modified@@ -1,9 +1,12 @@ from logging import getLogger +from urllib.parse import urlsplit from django.conf import settings from django.contrib.contenttypes.models import ContentType +from django.core.exceptions import ValidationError from jinja2.exceptions import TemplateError import requests +import urllib3 from nautobot.core.celery import nautobot_task from nautobot.core.models.query_functions import JSONRemove, JSONSet @@ -174,15 +177,70 @@ def provision_field(field_id, content_type_pk_set, change_context=None): return True +def _send_webhook_request_pinned(prepared_request, validated_ip): + """ + Send `prepared_request` by connecting to `validated_ip` directly, while preserving the URL's hostname for + the `Host` header (and TLS SNI for HTTPS). Used only when DNS rebinding is a real risk -- i.e. plaintext + HTTP, or HTTPS with verification disabled. HTTPS-with-verification is naturally protected by certificate + validation and uses the regular `requests` path. + """ + parts = urlsplit(prepared_request.url) + hostname = parts.hostname + port = parts.port or (443 if parts.scheme == "https" else 80) + + pool_kwargs = {"timeout": urllib3.Timeout(connect=10, read=30)} + if parts.scheme == "https": + # Reached only when ssl_verification is False (HTTPS+verify is handled by the requests path). Skip cert + # validation to match the existing semantic of webhook.ssl_verification=False, but keep server_hostname + # set to the original hostname so SNI-routed receivers still work. + pool_kwargs.update(cert_reqs="CERT_NONE", server_hostname=hostname) + pool = urllib3.HTTPSConnectionPool(host=validated_ip, port=port, **pool_kwargs) + else: + pool = urllib3.HTTPConnectionPool(host=validated_ip, port=port, **pool_kwargs) + + target = parts.path or "/" + if parts.query: + target = f"{target}?{parts.query}" + + headers = dict(prepared_request.headers) + headers["Host"] = hostname if port in (80, 443) else f"{hostname}:{port}" + + raw = pool.urlopen( + method=prepared_request.method, + url=target, + body=prepared_request.body, + headers=headers, + redirect=False, + retries=False, + ) + response = requests.Response() + response.status_code = raw.status + response.headers.update(raw.headers) + response._content = raw.data + response.url = prepared_request.url + response.request = prepared_request + return response + + @nautobot_task def process_webhook(webhook_pk, data, model_name, event, timestamp, username, request_id, snapshots): """ Make a POST request to the defined Webhook """ from nautobot.extras.models import Webhook # avoiding circular import + from nautobot.extras.webhooks import validate_webhook_url # avoiding circular import webhook = Webhook.objects.get(pk=webhook_pk) + # SSRF defense-in-depth: re-validate the URL here (with DNS resolution) since the worker is the entity opening + # the outbound connection, and its DNS view may differ from the web server's. Catches any rows that predate + # the SSRF policy. Returns the resolved IP for use below in DNS-rebinding mitigation. + try: + validated_ip = validate_webhook_url(webhook.payload_url) + except ValidationError as e: + logger.error("Webhook %s payload URL %r blocked by SSRF policy: %s", webhook, webhook.payload_url, e) + raise + context = { "event": dict(ObjectChangeActionChoices)[event].lower(), "timestamp": timestamp, @@ -229,12 +287,27 @@ def process_webhook(webhook_pk, data, model_name, event, timestamp, username, re if webhook.secret != "": prepared_request.headers["X-Hook-Signature"] = generate_signature(prepared_request.body, webhook.secret) - # Send the request - with requests.Session() as session: - session.verify = webhook.ssl_verification - if webhook.ca_file_path: - session.verify = webhook.ca_file_path - response = session.send(prepared_request, proxies=settings.HTTP_PROXIES) + # Send the request. Redirects are never followed: the SSRF block-list only validates the initial URL, so + # allowing redirects would let an attacker-controlled public host 30x the request to a loopback or + # cloud-metadata target. + # + # DNS-rebinding mitigation: pin the connection to the IP we validated, instead of letting urllib3 re-resolve + # the hostname. We only do this for paths where TLS doesn't already provide the protection (plain HTTP, and + # HTTPS with verification disabled). HTTPS-with-verification is naturally rebinding-safe because the rebound + # target's certificate won't have a SAN matching the original hostname. Pinning is also skipped when an HTTP + # proxy is configured: the proxy resolves DNS itself and is an admin-controlled trust boundary. + parts_scheme = urlsplit(webhook.payload_url).scheme.lower() + can_pin = not settings.HTTP_PROXIES and ( + parts_scheme == "http" or (parts_scheme == "https" and not webhook.ssl_verification) + ) + if can_pin: + response = _send_webhook_request_pinned(prepared_request, validated_ip) + else: + with requests.Session() as session: + session.verify = webhook.ssl_verification + if webhook.ca_file_path: + session.verify = webhook.ca_file_path + response = session.send(prepared_request, proxies=settings.HTTP_PROXIES, allow_redirects=False) if response.ok: logger.info("Request succeeded; response status %s", response.status_code)
nautobot/extras/tests/test_models.py+18 −0 modified@@ -3435,3 +3435,21 @@ def test_type_error_not_raised_when_calling_check_for_conflicts(self): conflicts["type_create"], [f"A webhook already exists for create on dcim | device to URL {self.url}"], ) + + def test_clean_payload_url_validation(self): + """`Webhook.clean()` surfaces SSRF policy violations against the `payload_url` field.""" + cases = [ + ("bad scheme is rejected", "ftp://example.com/", False), + ("loopback ip literal is rejected", "http://127.0.0.1/", False), + ("link-local ip literal is rejected", "http://169.254.169.254/", False), + ("public url is accepted", "https://example.com/hooks/abc", True), + ] + for desc, payload_url, should_pass in cases: + with self.subTest(desc): + webhook = Webhook(name="test-webhook", type_create=True, payload_url=payload_url) + if should_pass: + webhook.clean() + else: + with self.assertRaises(ValidationError) as ctx: + webhook.clean() + self.assertIn("payload_url", ctx.exception.message_dict)
nautobot/extras/tests/test_webhooks.py+306 −3 modified@@ -6,12 +6,15 @@ from django.apps import apps from django.contrib.auth import get_user_model from django.contrib.contenttypes.models import ContentType +from django.core.exceptions import ValidationError +from django.test import override_settings from django.utils import timezone +import requests from requests import Session from nautobot.core.api.exceptions import SerializerNotFound from nautobot.core.api.utils import get_serializer_for_model -from nautobot.core.testing import APITestCase +from nautobot.core.testing import APITestCase, TestCase from nautobot.core.utils.lookup import get_changes_for_model from nautobot.dcim.api.serializers import LocationSerializer from nautobot.dcim.models import Location, LocationType @@ -20,8 +23,9 @@ from nautobot.extras.models import Tag, Webhook from nautobot.extras.models.statuses import Status from nautobot.extras.registry import registry -from nautobot.extras.tasks import process_webhook +from nautobot.extras.tasks import _send_webhook_request_pinned, process_webhook from nautobot.extras.utils import generate_signature +from nautobot.extras.webhooks import validate_webhook_url, validate_webhook_url_format User = get_user_model() @@ -30,7 +34,10 @@ class WebhookTest(APITestCase): @classmethod def setUpTestData(cls): location_ct = ContentType.objects.get_for_model(Location) - MOCK_URL = "http://localhost/" + # https://8.8.8.8/ skips DNS resolution (IP literal), isn't in any block-list, and uses HTTPS with + # verification (the default), so the worker takes the requests.Session.send path that these tests mock. + # No actual network traffic occurs because Session.send is patched. + MOCK_URL = "https://8.8.8.8/" MOCK_SECRET = "LOOKATMEIMASECRETSTRING" # noqa: S105 # hardcoded-password-string -- OK as this is test code webhooks = ( @@ -275,6 +282,100 @@ class FakeResponse: def test_webhook_render_body_with_utf8(self): self.assertEqual(Webhook().render_body({"utf8": "I am UTF-8! 😀"}), '{"utf8": "I am UTF-8! 😀"}') + def test_process_webhook_does_not_follow_redirects(self): + """SSRF defense: redirects must not be followed, since the SSRF block-list only validates the initial URL.""" + request_id = uuid.uuid4() + webhook = Webhook.objects.get(type_create=True) + timestamp = str(timezone.now()) + + captured_kwargs = {} + + def mock_send(_, request, **kwargs): + captured_kwargs.update(kwargs) + + class FakeResponse: + ok = True + status_code = 200 + + return FakeResponse() + + with patch.object(Session, "send", mock_send): + with web_request_context(self.user, change_id=request_id): + location_type = LocationType.objects.get(name="Campus") + location = Location(name="Location 1", location_type=location_type, status=self.statuses[0]) + location.save() + + serializer = LocationSerializer(location, context={"request": None}) + oc = get_changes_for_model(location).first() + snapshots = oc.get_snapshots() + + process_webhook( + webhook.pk, + serializer.data, + Location._meta.model_name, + ObjectChangeActionChoices.ACTION_CREATE, + timestamp, + self.user.username, + request_id, + snapshots, + ) + + self.assertEqual(captured_kwargs.get("allow_redirects"), False) + + def _process_webhook(self, webhook): + """Drive `process_webhook` end-to-end with a freshly-saved Location object change.""" + request_id = uuid.uuid4() + timestamp = str(timezone.now()) + with web_request_context(self.user, change_id=request_id): + location_type = LocationType.objects.get(name="Campus") + location = Location(name="Location SSRF", location_type=location_type, status=self.statuses[0]) + location.save() + serializer = LocationSerializer(location, context={"request": None}) + oc = get_changes_for_model(location).first() + snapshots = oc.get_snapshots() + process_webhook( + webhook.pk, + serializer.data, + Location._meta.model_name, + ObjectChangeActionChoices.ACTION_CREATE, + timestamp, + self.user.username, + request_id, + snapshots, + ) + + def test_process_webhook_uses_pinning_for_http(self): + """HTTP webhooks go through the pinned-IP helper rather than `requests.Session.send`.""" + webhook = Webhook.objects.get(type_create=True) + webhook.payload_url = "http://target.example/" + webhook.save() + + with ( + patch("nautobot.extras.tasks._send_webhook_request_pinned") as mock_pinned, + patch("nautobot.extras.webhooks.socket.getaddrinfo") as mock_resolve, + ): + mock_resolve.return_value = [(2, 1, 6, "", ("8.8.8.8", 0))] + mock_pinned.return_value.ok = True + mock_pinned.return_value.status_code = 200 + self._process_webhook(webhook) + + # Second positional arg is the validated IP that came back from validate_webhook_url. The webhook fires + # both via Location.save() (auto-enqueue) and via the explicit process_webhook call -- we only care that + # the pinned helper handled it (not the requests.Session.send path). + mock_pinned.assert_called() + self.assertEqual(mock_pinned.call_args.args[1], "8.8.8.8") + + def test_process_webhook_raises_on_ssrf_blocked_url(self): + """Worker re-validates: a webhook whose hostname resolves to a blocked IP at send time is rejected.""" + webhook = Webhook.objects.get(type_create=True) + webhook.payload_url = "http://rebound.example/" + webhook.save() + + with patch("nautobot.extras.webhooks.socket.getaddrinfo") as mock_resolve: + mock_resolve.return_value = [(2, 1, 6, "", ("127.0.0.1", 0))] + with self.assertRaises(ValidationError): + self._process_webhook(webhook) + @patch("nautobot.extras.tasks.process_webhook.apply_async") def test_enqueue_webhooks(self, mock_async): request_id = uuid.uuid4() @@ -359,3 +460,205 @@ def test_all_webhook_supported_models(self): model_class = apps.get_model(app_label, model_name) get_serializer_for_model(model_class) self.assertTrue(hasattr(model_class, "to_objectchange")) + + +class WebhookURLFormatValidatorTest(TestCase): + """Tests for `validate_webhook_url_format` -- save-time validation, no DNS resolution.""" + + def test_validate_format_default_policy(self): + # (description, url, should_pass) + cases = [ + # Empty / malformed + ("empty url is rejected", "", False), + ("invalid syntax is rejected", "not a url", False), + # Scheme allow-list (default = http, https) + ("file scheme is rejected", "file:///etc/passwd", False), + ("ftp scheme is rejected", "ftp://example.com/", False), + ("gopher scheme is rejected", "gopher://example.com/", False), + ("javascript scheme is rejected", "javascript:alert(1)", False), + ("http is accepted", "http://example.com/", True), + ("https is accepted", "https://example.com/", True), + # Built-in block-list (IP literals) + ("loopback v4 is rejected", "http://127.0.0.1/", False), + ("loopback v4 wider range is rejected", "http://127.5.6.7/", False), + ("loopback v6 is rejected", "http://[::1]/", False), + ("link-local v4 (cloud metadata) is rejected", "http://169.254.169.254/latest/meta-data/", False), + ("link-local v6 is rejected", "http://[fe80::1]/", False), + ("multicast v4 is rejected", "http://224.0.0.1/", False), + ("broadcast is rejected", "http://255.255.255.255/", False), + ("unspecified v4 is rejected", "http://0.0.0.0/", False), + ("class-E reserved is rejected", "http://240.0.0.1/", False), + # Public IP literals + ("public v4 is accepted", "http://8.8.8.8/", True), + ("public v6 is accepted", "http://[2001:4860:4860::8888]/", True), + # RFC1918 private ranges -- intentionally NOT blocked by default + ("RFC1918 10/8 is accepted by default", "http://10.0.0.1/", True), + ("RFC1918 192.168/16 is accepted by default", "http://192.168.1.1/", True), + ("RFC1918 172.16/12 is accepted by default", "http://172.16.0.1/", True), + # Hostnames -- DNS deferred to worker, so any syntactically-valid hostname passes save-time + ("hostname passes without DNS lookup", "http://internal-nonresolvable-host.example/", True), + ] + for desc, url, should_pass in cases: + with self.subTest(desc): + if should_pass: + validate_webhook_url_format(url) + else: + with self.assertRaises(ValidationError): + validate_webhook_url_format(url) + + @override_settings(WEBHOOK_ALLOWED_SCHEMES=["ftp"]) + def test_allowed_schemes_setting_overrides_default(self): + validate_webhook_url_format("ftp://example.com/") + with self.assertRaises(ValidationError): + validate_webhook_url_format("http://example.com/") + + @override_settings( + WEBHOOK_ALLOWED_HOSTS=["10.20.30.40"], + WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS=["10.0.0.0/8"], + ) + def test_allow_list_literal_bypasses_admin_extended_block_list(self): + # Allow-list bypasses WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS for the matching host. + validate_webhook_url_format("http://10.20.30.40/") + # Other hosts in the same admin-extended range are still blocked. + with self.assertRaises(ValidationError): + validate_webhook_url_format("http://10.99.99.99/") + + @override_settings(WEBHOOK_ALLOWED_HOSTS=[".internal.example.com"]) + def test_allow_list_subdomain_wildcard(self): + validate_webhook_url_format("http://api.internal.example.com/") + validate_webhook_url_format("http://internal.example.com/") + + @override_settings(WEBHOOK_ALLOWED_HOSTS=["127.0.0.1", "169.254.169.254", "*"]) + def test_allow_list_does_not_bypass_built_in_block_list(self): + # Built-in block-list is enforced unconditionally; allow-listing (including wildcard) cannot expose + # loopback / cloud-metadata IP literals at save time. + with self.assertRaises(ValidationError): + validate_webhook_url_format("http://127.0.0.1/") + with self.assertRaises(ValidationError): + validate_webhook_url_format("http://169.254.169.254/") + + @override_settings(WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS=["10.0.0.0/8", "fc00::/7"]) + def test_additional_blocked_networks_extends_built_in(self): + with self.assertRaises(ValidationError): + validate_webhook_url_format("http://10.20.30.40/") + # IPv6 ULA (fc00::/7) -- exercises the cross-family skip path: an IPv4 addr is matched only against + # IPv4 networks in WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS, and vice versa. + with self.assertRaises(ValidationError): + validate_webhook_url_format("http://[fd00::1]/") + validate_webhook_url_format("http://192.168.1.1/") + validate_webhook_url_format("http://[2001:4860:4860::8888]/") + + +class WebhookURLSendTimeValidatorTest(TestCase): + """Tests for `validate_webhook_url` -- send-time validation, includes DNS resolution.""" + + def test_resolves_and_rejects_blocked_hostname(self): + # Mock getaddrinfo to return a loopback address for an arbitrary hostname. + with patch("nautobot.extras.webhooks.socket.getaddrinfo") as mock_resolve: + mock_resolve.return_value = [(2, 1, 6, "", ("127.0.0.1", 0))] + with self.assertRaises(ValidationError): + validate_webhook_url("http://attacker-controlled.example/") + + def test_resolves_and_accepts_public_hostname_returns_chosen_ip(self): + with patch("nautobot.extras.webhooks.socket.getaddrinfo") as mock_resolve: + mock_resolve.return_value = [(2, 1, 6, "", ("8.8.8.8", 0))] + self.assertEqual(validate_webhook_url("http://example.com/"), "8.8.8.8") + + def test_ip_literal_returns_literal(self): + self.assertEqual(validate_webhook_url("http://8.8.8.8/"), "8.8.8.8") + + @override_settings( + WEBHOOK_ALLOWED_HOSTS=["allowed.example"], + WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS=["8.0.0.0/8"], + ) + def test_allow_list_bypasses_admin_extended_at_send_time(self): + # Allow-listed host whose DNS lands inside the admin-extended block-list passes (the allow-list overrides + # the admin extension), and the validator returns the resolved IP for connection pinning. + with patch("nautobot.extras.webhooks.socket.getaddrinfo") as mock_resolve: + mock_resolve.return_value = [(2, 1, 6, "", ("8.8.8.8", 0))] + self.assertEqual(validate_webhook_url("http://allowed.example/"), "8.8.8.8") + + @override_settings(WEBHOOK_ALLOWED_HOSTS=["allowed.example", "*"]) + def test_allow_list_does_not_bypass_built_in_at_send_time(self): + # Allow-list (and wildcard) cannot expose the built-in block-list at send time -- a hostname whose DNS + # resolves to (e.g.) loopback is still rejected. This is the rebinding-into-metadata scenario. + with patch("nautobot.extras.webhooks.socket.getaddrinfo") as mock_resolve: + mock_resolve.return_value = [(2, 1, 6, "", ("169.254.169.254", 0))] + with self.assertRaises(ValidationError): + validate_webhook_url("http://allowed.example/") + + def test_unresolvable_host_raises(self): + import socket + + with patch("nautobot.extras.webhooks.socket.getaddrinfo") as mock_resolve: + mock_resolve.side_effect = socket.gaierror("nodename nor servname provided") + with self.assertRaises(ValidationError): + validate_webhook_url("http://nonexistent.example/") + + def test_rejects_when_any_resolved_address_is_blocked(self): + # If a hostname resolves to *any* blocked address (DNS round-robin, dual-stack with a blocked v6, etc.), + # we reject. This protects against partial-match bypasses. + with patch("nautobot.extras.webhooks.socket.getaddrinfo") as mock_resolve: + mock_resolve.return_value = [ + (2, 1, 6, "", ("8.8.8.8", 0)), # public + (2, 1, 6, "", ("127.0.0.1", 0)), # loopback -- should cause rejection + ] + with self.assertRaises(ValidationError): + validate_webhook_url("http://mixed.example/") + + +class WebhookSendPinningTest(TestCase): + """Tests for `_send_webhook_request_pinned` -- DNS-rebinding mitigation for HTTP and unverified HTTPS.""" + + def _make_prepared(self, url, method="POST", body=b'{"ok": true}'): + return requests.Request(method=method, url=url, data=body).prepare() + + def test_pinned_http_connects_to_validated_ip_with_original_host_header(self): + prepared = self._make_prepared("http://attacker.example/path?x=1") + + with patch("nautobot.extras.tasks.urllib3.HTTPConnectionPool") as mock_pool_cls: + mock_pool = mock_pool_cls.return_value + mock_pool.urlopen.return_value.status = 200 + mock_pool.urlopen.return_value.headers = {} + mock_pool.urlopen.return_value.data = b"" + _send_webhook_request_pinned(prepared, validated_ip="203.0.113.5") + + # Pool was constructed against the validated IP, not the hostname. + self.assertEqual(mock_pool_cls.call_args.kwargs["host"], "203.0.113.5") + self.assertEqual(mock_pool_cls.call_args.kwargs["port"], 80) + # Host header preserved as the original hostname so vhost-routed receivers still match. + urlopen_kwargs = mock_pool.urlopen.call_args.kwargs + self.assertEqual(urlopen_kwargs["headers"]["Host"], "attacker.example") + self.assertEqual(urlopen_kwargs["url"], "/path?x=1") + self.assertFalse(urlopen_kwargs["redirect"]) + + def test_pinned_https_unverified_sets_sni_to_hostname_and_disables_cert_check(self): + prepared = self._make_prepared("https://internal.example/") + + with patch("nautobot.extras.tasks.urllib3.HTTPSConnectionPool") as mock_pool_cls: + mock_pool = mock_pool_cls.return_value + mock_pool.urlopen.return_value.status = 200 + mock_pool.urlopen.return_value.headers = {} + mock_pool.urlopen.return_value.data = b"" + _send_webhook_request_pinned(prepared, validated_ip="10.20.30.40") + + kwargs = mock_pool_cls.call_args.kwargs + self.assertEqual(kwargs["host"], "10.20.30.40") + self.assertEqual(kwargs["port"], 443) + # SNI/TLS hostname is the original hostname so SNI-routed receivers continue to work. + self.assertEqual(kwargs["server_hostname"], "internal.example") + self.assertEqual(kwargs["cert_reqs"], "CERT_NONE") + + def test_pinned_uses_explicit_port_when_present(self): + prepared = self._make_prepared("http://attacker.example:8080/") + + with patch("nautobot.extras.tasks.urllib3.HTTPConnectionPool") as mock_pool_cls: + mock_pool = mock_pool_cls.return_value + mock_pool.urlopen.return_value.status = 200 + mock_pool.urlopen.return_value.headers = {} + mock_pool.urlopen.return_value.data = b"" + _send_webhook_request_pinned(prepared, validated_ip="203.0.113.5") + + self.assertEqual(mock_pool_cls.call_args.kwargs["port"], 8080) + # Non-default port is included in the Host header. + self.assertEqual(mock_pool.urlopen.call_args.kwargs["headers"]["Host"], "attacker.example:8080")
nautobot/extras/webhooks.py+170 −0 modified@@ -1,10 +1,180 @@ +import logging +import socket +from urllib.parse import urlsplit + +from django.conf import settings +from django.core.exceptions import ValidationError +from django.core.validators import URLValidator from django.utils import timezone +import netaddr from nautobot.extras.choices import ObjectChangeActionChoices from nautobot.extras.models import Webhook from nautobot.extras.registry import registry from nautobot.extras.tasks import process_webhook +logger = logging.getLogger(__name__) + + +def _webhook_addr_is_builtin_blocked(addr): + """Return True if ``addr`` is in a never-legitimate range. Admins cannot disable these via configuration.""" + # is_reserved covers IPv4 0.0.0.0, 240.0.0.0/4 (incl. 255.255.255.255 broadcast), IPv6 :: and the IETF-reserved + # blocks (which include the IPv4-mapped IPv6 ::ffff:0:0/96 range, so an attacker can't bypass via that form). + return addr.is_loopback() or addr.is_link_local() or addr.is_multicast() or addr.is_reserved() + + +def _webhook_additional_blocked_networks(): + return [netaddr.IPNetwork(cidr) for cidr in settings.WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS] + + +def _webhook_host_matches_allow_list(host, allow_list): + """Django ``ALLOWED_HOSTS``-style matching: literal hostname, ``.example.com`` subdomain wildcard, or ``*``.""" + if not host: + return False + host = host.lower().rstrip(".") + for pattern in allow_list or []: + pattern = pattern.lower().rstrip(".") + if pattern == "*": + return True + if pattern.startswith("."): + if host == pattern[1:] or host.endswith(pattern): + return True + elif host == pattern: + return True + return False + + +def _webhook_address_from_host(host): + """ + Return a `netaddr.IPAddress` if `host` is an IP literal, else `None`. + + Does not perform DNS resolution -- DNS lookups are deferred to the Celery worker, since the web server and + the worker may resolve names differently (split-horizon DNS, container DNS, K8s namespaces, etc.). + """ + if host.startswith("[") and host.endswith("]"): + host = host[1:-1] + try: + return netaddr.IPAddress(host) + except (netaddr.AddrFormatError, ValueError): + return None + + +def _webhook_check_address_against_block_lists(host, addr, *, check_additional=True): + """ + Raise `ValidationError` if `addr` falls in the built-in block-list, or (when `check_additional` is True) + in the admin-extended block-list. The built-in block-list is enforced unconditionally and is NOT bypassed + by `WEBHOOK_ALLOWED_HOSTS` -- callers pass `check_additional=False` for allow-listed hosts to skip only the + admin-extended list. + """ + if _webhook_addr_is_builtin_blocked(addr): + logger.warning( + "Webhook URL validation: host %r resolved to %s, which is in a built-in blocked range.", host, addr + ) + raise ValidationError( + f"Webhook URL host {host!r} is not permitted (resolves to a reserved/loopback/link-local address)." + ) + if not check_additional: + return + for network in _webhook_additional_blocked_networks(): + if addr.version != network.version: + continue + if addr in network: + logger.warning( + "Webhook URL validation: host %r resolved to %s, which is in additional blocked network %s.", + host, + addr, + network, + ) + raise ValidationError( + f"Webhook URL host {host!r} is not permitted. " + "Add the host to WEBHOOK_ALLOWED_HOSTS if this target is intentional." + ) + + +def _webhook_validate_scheme_and_extract_host(url): + """Shared scheme + URL syntax check. Returns the URL host.""" + if not url: + raise ValidationError("Webhook URL is required.") + + allowed_schemes = list(settings.WEBHOOK_ALLOWED_SCHEMES) + try: + URLValidator(schemes=allowed_schemes)(url) + except ValidationError as exc: + scheme = urlsplit(url).scheme.lower() + if scheme and scheme not in (s.lower() for s in allowed_schemes): + raise ValidationError( + f"Webhook URL scheme {scheme!r} is not permitted; allowed schemes are: {', '.join(allowed_schemes)}." + ) + raise exc + + host = urlsplit(url).hostname + if not host: + raise ValidationError("Webhook URL must include a host.") + return host + + +def validate_webhook_url_format(url): + """ + Save-time validation: scheme, URL syntax, and built-in block-list check for IP-literal hosts. + + DNS resolution is intentionally NOT performed here because the web server and the Celery worker may have + different DNS views; reliable name-based block-list enforcement happens at request-send time via + `validate_webhook_url`. + + `WEBHOOK_ALLOWED_HOSTS` bypasses only the admin-extended block-list (`WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS`). + The built-in block-list (loopback, link-local, multicast, unspecified, reserved) is enforced unconditionally. + + Raises `django.core.exceptions.ValidationError` on any policy violation. + """ + host = _webhook_validate_scheme_and_extract_host(url) + allow_listed = _webhook_host_matches_allow_list(host, settings.WEBHOOK_ALLOWED_HOSTS) + + addr = _webhook_address_from_host(host) + if addr is not None: + _webhook_check_address_against_block_lists(host, addr, check_additional=not allow_listed) + + +def validate_webhook_url(url): + """ + Send-time validation: everything `validate_webhook_url_format` does, plus DNS resolution and block-list + check on every resolved address. + + Returns the validated IP that the caller should connect to (as a string). For IP-literal URLs the returned + IP is the literal. The caller can use this to pin the outbound connection to the validated IP for + DNS-rebinding mitigation. + + `WEBHOOK_ALLOWED_HOSTS` bypasses only the admin-extended block-list (`WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS`). + The built-in block-list (loopback, link-local, multicast, unspecified, reserved) is enforced unconditionally, + so an allow-listed hostname whose DNS resolves to (e.g.) `169.254.169.254` is still rejected. + + Intended to be called from the Celery worker just before issuing the request, so the DNS view used here + matches the one the request will actually use. + + Raises `django.core.exceptions.ValidationError` on any policy violation. + """ + host = _webhook_validate_scheme_and_extract_host(url) + allow_listed = _webhook_host_matches_allow_list(host, settings.WEBHOOK_ALLOWED_HOSTS) + + addr = _webhook_address_from_host(host) + if addr is not None: + _webhook_check_address_against_block_lists(host, addr, check_additional=not allow_listed) + return str(addr) + + bare_host = host[1:-1] if host.startswith("[") and host.endswith("]") else host + try: + infos = socket.getaddrinfo(bare_host, None) + except socket.gaierror as exc: + logger.warning("Webhook URL validation: DNS resolution failed for host %r: %s", host, exc) + raise ValidationError(f"Unable to resolve webhook host {host!r}.") + + chosen = None + for info in infos: + addr = netaddr.IPAddress(info[4][0]) + _webhook_check_address_against_block_lists(host, addr, check_additional=not allow_listed) + if chosen is None: + chosen = str(addr) + return chosen + def enqueue_webhooks(object_change, snapshots=None, webhook_queryset=None): """
7324c8f0d8c7Merge commit from fork
13 files changed · +708 −144
changes/GHSA-c35q-vxrp-ph26.removed+1 −0 added@@ -0,0 +1 @@ +Removed support for `nautobot-server webhook_receiver` command.
changes/GHSA-c35q-vxrp-ph26.security+5 −0 added@@ -0,0 +1,5 @@ +Added support for `WEBHOOK_ALLOWED_SCHEMES` settings variable. By default new or updated `Webhook` records will be restricted to HTTP or HTTPS only, disallowing other schemes that may have been previously allowed. Administrators should audit existing `Webhook` records to identify any that are invalid, and either update/delete said records or customize `WEBHOOK_ALLOWED_SCHEMES` as appropriate. +Added support for `WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS` settings variable. This can be used to specify additional IP networks that should be denied to `Webhook` sending, for example some deployments may wish to disallow RFC1918 addresses. +Added support for `WEBHOOK_ALLOWED_HOSTS` settings variable. This can be used to provide an allow-list of specific hosts that would otherwise be blocked by any `WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS` configuration. +Added logic to deny loopback, link-local, multicast, unspecified, or reserved IP addresses when defining or executing a `Webhook`. Administrators should audit existing `Webhook` records to identify any that are invalid and delete said records. (GHSA-c35q-vxrp-ph26) +Added various logic to protect `Webhook` definitions against being used as a vector for server-side request forgery (SSRF). (GHSA-c35q-vxrp-ph26)
nautobot/core/settings.py+25 −0 modified@@ -317,6 +317,31 @@ # Pseudo-random number generator seed, for reproducibility of test results. TEST_FACTORY_SEED = os.getenv("NAUTOBOT_TEST_FACTORY_SEED", None) +# URL schemes that Webhooks are permitted to target. Defaults to HTTP and HTTPS only. +WEBHOOK_ALLOWED_SCHEMES = ["http", "https"] +if "NAUTOBOT_WEBHOOK_ALLOWED_SCHEMES" in os.environ and os.environ["NAUTOBOT_WEBHOOK_ALLOWED_SCHEMES"] != "": + WEBHOOK_ALLOWED_SCHEMES = os.environ["NAUTOBOT_WEBHOOK_ALLOWED_SCHEMES"].split(_CONFIG_SETTING_SEPARATOR) + +# Hostnames that Webhooks may target even if they resolve into WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS. +# Django ALLOWED_HOSTS-style: literal hostnames or `.example.com` to match a domain and all of its subdomains. +# Use `*` to disable network filtering entirely (not recommended). +WEBHOOK_ALLOWED_HOSTS = [] +if "NAUTOBOT_WEBHOOK_ALLOWED_HOSTS" in os.environ and os.environ["NAUTOBOT_WEBHOOK_ALLOWED_HOSTS"] != "": + WEBHOOK_ALLOWED_HOSTS = os.environ["NAUTOBOT_WEBHOOK_ALLOWED_HOSTS"].split(_CONFIG_SETTING_SEPARATOR) + +# Network ranges (CIDR strings) added to the built-in Webhook block-list. The built-in block-list (loopback, link-local +# including cloud metadata endpoints such as 169.254.169.254, unspecified, multicast, reserved) is enforced +# unconditionally and cannot be disabled. Use this setting to extend it -- e.g. ["10.0.0.0/8", "192.168.0.0/16"] to +# block RFC1918 ranges as well, for deployments that don't legitimately need to webhook into private networks. +WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS = [] +if ( + "NAUTOBOT_WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS" in os.environ + and os.environ["NAUTOBOT_WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS"] != "" +): + WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS = os.environ["NAUTOBOT_WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS"].split( + _CONFIG_SETTING_SEPARATOR + ) + # # Django Prometheus #
nautobot/core/settings.yaml+65 −0 modified@@ -2055,4 +2055,69 @@ properties: The function must return only one argument: a string of the truncated device display name. version_added: "1.4.0" + WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS: + default: [] + description: >- + A list of network ranges (CIDR strings) added to the built-in Webhook block-list. The built-in block-list + (loopback, link-local including cloud metadata endpoints such as `169.254.169.254`, unspecified, multicast, + and reserved) is enforced unconditionally and cannot be disabled. Use this setting to extend it. + details: |- + For deployments that don't legitimately need to webhook into RFC1918 private networks, set this to + `["10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16"]` (and the equivalent IPv6 ULA `fc00::/7`) for stricter + posture. Hosts listed in [`WEBHOOK_ALLOWED_HOSTS`](#webhook_allowed_hosts) bypass *this* admin-extended + list (but not the built-in list). + + The block-list is enforced both when a Webhook is saved (for IP-literal hosts) and when the Celery worker + sends the request (after DNS resolution). + environment_variable: "NAUTOBOT_WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS" + items: + type: "string" + see_also: + "`WEBHOOK_ALLOWED_HOSTS`": "#webhook_allowed_hosts" + "`WEBHOOK_ALLOWED_SCHEMES`": "#webhook_allowed_schemes" + type: "array" + version_added: "3.1.2" + WEBHOOK_ALLOWED_HOSTS: + default: [] + description: >- + Hostnames that Webhooks may target even if they would otherwise be blocked by the + [`WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS`](#webhook_additional_blocked_networks) admin-extended block-list. + Django `ALLOWED_HOSTS`-style: literal hostnames or `.example.com` to match a domain and all of its + subdomains. The wildcard `*` matches every hostname. + details: |- + The built-in block-list (loopback, link-local including cloud metadata endpoints such as + `169.254.169.254`, unspecified, multicast, reserved) is enforced unconditionally and is **not** bypassed + by this setting -- including the wildcard `*`. An allow-listed hostname whose DNS resolves into the + built-in block-list is still rejected at send time. + + Typical use is to allow specific internal targets while leaving the rest of the network filtered. For + example, to permit a single internal monitoring host: + + ```python + WEBHOOK_ALLOWED_HOSTS = ["monitoring.internal.example.com"] + ``` + environment_variable: "NAUTOBOT_WEBHOOK_ALLOWED_HOSTS" + items: + type: "string" + see_also: + "`WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS`": "#webhook_additional_blocked_networks" + "`WEBHOOK_ALLOWED_SCHEMES`": "#webhook_allowed_schemes" + "Django documentation for `ALLOWED_HOSTS` (matching syntax)": "https://docs.djangoproject.com/en/stable/ref/settings/#allowed-hosts" + type: "array" + version_added: "3.1.2" + WEBHOOK_ALLOWED_SCHEMES: + default: + - "http" + - "https" + description: >- + URL schemes that Webhooks are permitted to target. Defaults to HTTP and HTTPS only; other schemes + (e.g. `file:`, `ftp:`, `gopher:`) are rejected at Webhook save time as a baseline SSRF protection. + environment_variable: "NAUTOBOT_WEBHOOK_ALLOWED_SCHEMES" + items: + type: "string" + see_also: + "`WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS`": "#webhook_additional_blocked_networks" + "`WEBHOOK_ALLOWED_HOSTS`": "#webhook_allowed_hosts" + type: "array" + version_added: "3.1.2" ...
nautobot/core/templates/nautobot_config.py.j2+17 −0 modified@@ -624,6 +624,23 @@ INSTALLATION_METRICS_ENABLED = is_truthy(os.getenv("NAUTOBOT_INSTALLATION_METRIC # """ # return str(device_display_name).split(".")[0] +# Webhook URL schemes permitted for the `payload_url` field. Defaults to HTTP and HTTPS. +# +# WEBHOOK_ALLOWED_SCHEMES = ["http", "https"] + +# Hostnames that Webhooks may target even if they would otherwise resolve into a blocked network range. Django +# `ALLOWED_HOSTS`-style: literal hostnames or `.example.com` to match a domain and all of its subdomains. Use `*` +# to disable network filtering entirely (not recommended). +# +# WEBHOOK_ALLOWED_HOSTS = [] + +# Network ranges (CIDR strings) added to the built-in Webhook block-list. The built-in block-list (loopback, +# link-local including cloud metadata endpoints such as 169.254.169.254, unspecified, multicast, and reserved) +# is enforced unconditionally and cannot be disabled. Use this setting to extend it -- e.g. add the RFC1918 +# private ranges below for stricter posture in deployments that don't legitimately webhook into private networks. +# +# WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS = ["10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "fc00::/7"] + # A list of strings designating all applications that are enabled in this Django installation. # Each string should be a dotted Python path to an application configuration class (preferred), # or a package containing an application.
nautobot/docs/user-guide/administration/tools/nautobot-server.md+0 −24 modified@@ -876,27 +876,3 @@ Nautobot version: 2.2.0a1 Django version: 3.2.24 Configuration file: /opt/nautobot/nautobot_config.py ``` - -### `webhook_receiver` - -`nautobot-server webhook_receiver` - -Start a simple listener to display received HTTP requests. - -`--port PORT` -Optional port number (default: `9000`) - -`--no-headers` -Hide HTTP request headers. - -```no-highlight -nautobot-server webhook_receiver --port 9001 --no-headers -``` - -Example output: - -```no-highlight -Listening on port http://localhost:9000. Stop with CONTROL-C. -``` - -Please see the guide on [Troubleshooting Webhooks](../../platform-functionality/webhook.md#troubleshooting-webhooks) for more information.
nautobot/docs/user-guide/platform-functionality/webhook.md+11 −29 modified@@ -110,39 +110,21 @@ A webhook request is considered successful if the receiver responds with a `2XX` ## Troubleshooting Webhooks -To inspect outgoing webhooks, you can use a local HTTP listener. Nautobot provides a built-in webhook receiver that logs incoming requests: +You can test webhooks with external services like [Beeceptor](https://beeceptor.com/) or [Pipedream RequestBin](https://pipedream.com/requestbin). These tools let you inspect webhook payloads and troubleshoot integration issues. -1. Set the webhook URL to `http://localhost:9000/`. -2. Start the webhook receiver: - - ```sh - nautobot-server webhook_receiver - ``` - - Example output: - - ```sh - Listening on port http://localhost:9000. Stop with CONTROL-C. - ``` - -3. Send a test request: +If a webhook does not trigger as expected, ensure that the **Celery worker** process is running and check the Nautobot logs for errors. - ```sh - curl -X POST http://localhost:9000 --data '{"foo": "bar"}' - ``` +## Webhook Administration and Security - The listener will output: +Because configured webhooks are sent automatically to another system, and because the data sent by a webhook contains information about the data stored in Nautobot, it's often necessary to restrict what users may configure webhooks, and what systems the webhooks may be sent to. The former is achieved by only granting webhook `add` and `change` [permissions](users/objectpermission.md) to the users that have a legitimate need to manage webhooks, while the later is a bit more involved. - ```sh - [1] Tue, 07 Apr 2020 17:44:02 GMT 127.0.0.1 "POST / HTTP/1.1" 200 - - Host: localhost:9000 - Content-Type: application/json - Content-Length: 14 +To provide a baseline of security, Nautobot automatically disallows the configuration of webhooks that point to certain classes of IP addresses (link-local, loopback, multicast, and reserved addresses), and at webhook transmission time, webhooks configured to send to a hostname that resolves to such an IP is also automatically blocked. This restriction is built-in to Nautobot and cannot be disabled. - {"foo": "bar"} - ``` +An administrator can further restrict the range of IPs and hosts that webhooks can be sent to by configuring [`WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS`](../administration/configuration/settings.md#webhook_additional_blocked_networks) and/or [`WEBHOOK_ALLOWED_HOSTS`](../administration/configuration/settings.md#webhook_allowed_hosts) in `nautobot_config.py`. The former defines additional IP networks that should be blocked, and the latter defines an allow-list of hosts or domains that are explicitly permitted **despite falling within the `ADDITIONAL_BLOCKED_NETWORKS` networks**. For example, to disallow webhooks to **all** hosts except for those within `example.com`, you could configure: -> **Alternative Testing Tools:** -> Instead of using the built-in webhook receiver, you can test webhooks with external services like [Beeceptor](https://beeceptor.com/) or [Pipedream RequestBin](https://pipedream.com/requestbin). These tools let you inspect webhook payloads and troubleshoot integration issues. +```python +WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS = ["0.0.0.0/0", "::/0"] +WEBHOOK_ALLOWED_HOSTS = [".example.com"] +``` -If a webhook does not trigger as expected, ensure that the **Celery worker** process is running and check the Nautobot logs for errors. +A third administratively-definable setting is [`WEBHOOK_ALLOWED_SCHEMES`](../administration/configuration/settings.md#webhook_allowed_schemes). This defaults to `["http", "https"]` but can be configured if desired, for example to disallow HTTP-only webhooks or to allow specific additional protocols.
nautobot/extras/management/commands/webhook_receiver.py+0 −82 removed@@ -1,82 +0,0 @@ -from http.server import BaseHTTPRequestHandler, HTTPServer -import sys - -from django.core.management.base import BaseCommand - -request_counter = 1 - - -class WebhookHandler(BaseHTTPRequestHandler): - show_headers = True - - def __getattr__(self, item): - # Return the same method for any type of HTTP request (GET, POST, etc.) - if item.startswith("do_"): - return self.do_ANY - - raise AttributeError - - def log_message(self, format_str, *args): # pylint: disable=arguments-differ - global request_counter - - print(f"[{request_counter}] {self.date_time_string()} {self.address_string()} {format_str % args}") - - def do_ANY(self): - global request_counter - - # Send a 200 response regardless of the request content - self.send_response(200) - self.end_headers() - self.wfile.write(b"Webhook received!\n") - - request_counter += 1 - - # Print the request headers to stdout - if self.show_headers: - for k, v in self.headers.items(): - print(f"{k}: {v}") - print() - - # Print the request body (if any) - content_length = self.headers.get("Content-Length") - if content_length is not None: - body = self.rfile.read(int(content_length)) - print(body.decode("utf-8")) - else: - print("(No body)") - - print("------------") - - -class Command(BaseCommand): - help = "Start a simple listener to display received HTTP requests" - - default_port = 9000 - - def add_arguments(self, parser): - parser.add_argument( - "--port", - type=int, - default=self.default_port, - help=f"Optional port number (default: {self.default_port})", - ) - parser.add_argument( - "--no-headers", - action="store_true", - dest="no_headers", - help="Hide HTTP request headers", - ) - - def handle(self, *args, **options): - port = options["port"] - quit_command = "CTRL-BREAK" if sys.platform == "win32" else "CONTROL-C" - - WebhookHandler.show_headers = not options["no_headers"] - - self.stdout.write(f"Listening on port http://localhost:{port}. Stop with {quit_command}.") - httpd = HTTPServer(("localhost", port), WebhookHandler) - - try: - httpd.serve_forever() - except KeyboardInterrupt: - self.stdout.write("\nExiting...")
nautobot/extras/models/models.py+11 −0 modified@@ -1067,6 +1067,17 @@ def clean(self): {"ca_file_path": "Do not specify a CA certificate file if SSL verification is disabled."} ) + # Validate payload_url against SSRF policy (scheme allow-list, host allow-list, IP-literal block-list). + # DNS-based block-list enforcement happens at request-send time in process_webhook, since the web server + # and worker may have different DNS views. + # Imported here to avoid a circular import: nautobot.extras.webhooks imports the Webhook model. + from nautobot.extras.webhooks import validate_webhook_url_format + + try: + validate_webhook_url_format(self.payload_url) + except ValidationError as exc: + raise ValidationError({"payload_url": exc.messages}) + def render_headers(self, context): """ Render additional_headers and return a dict of Header: Value pairs.
nautobot/extras/tasks.py+79 −6 modified@@ -1,8 +1,11 @@ from logging import getLogger +from urllib.parse import urlsplit from django.conf import settings +from django.core.exceptions import ValidationError from jinja2.exceptions import TemplateError import requests +import urllib3 from nautobot.core.celery import nautobot_task from nautobot.extras.choices import ObjectChangeActionChoices @@ -13,15 +16,70 @@ logger = getLogger("nautobot.extras.tasks") +def _send_webhook_request_pinned(prepared_request, validated_ip): + """ + Send `prepared_request` by connecting to `validated_ip` directly, while preserving the URL's hostname for + the `Host` header (and TLS SNI for HTTPS). Used only when DNS rebinding is a real risk -- i.e. plaintext + HTTP, or HTTPS with verification disabled. HTTPS-with-verification is naturally protected by certificate + validation and uses the regular `requests` path. + """ + parts = urlsplit(prepared_request.url) + hostname = parts.hostname + port = parts.port or (443 if parts.scheme == "https" else 80) + + pool_kwargs = {"timeout": urllib3.Timeout(connect=10, read=30)} + if parts.scheme == "https": + # Reached only when ssl_verification is False (HTTPS+verify is handled by the requests path). Skip cert + # validation to match the existing semantic of webhook.ssl_verification=False, but keep server_hostname + # set to the original hostname so SNI-routed receivers still work. + pool_kwargs.update(cert_reqs="CERT_NONE", server_hostname=hostname) + pool = urllib3.HTTPSConnectionPool(host=validated_ip, port=port, **pool_kwargs) + else: + pool = urllib3.HTTPConnectionPool(host=validated_ip, port=port, **pool_kwargs) + + target = parts.path or "/" + if parts.query: + target = f"{target}?{parts.query}" + + headers = dict(prepared_request.headers) + headers["Host"] = hostname if port in (80, 443) else f"{hostname}:{port}" + + raw = pool.urlopen( + method=prepared_request.method, + url=target, + body=prepared_request.body, + headers=headers, + redirect=False, + retries=False, + ) + response = requests.Response() + response.status_code = raw.status + response.headers.update(raw.headers) + response._content = raw.data + response.url = prepared_request.url + response.request = prepared_request + return response + + @nautobot_task def process_webhook(webhook_pk, data, model_name, event, timestamp, username, request_id, snapshots): """ Make a POST request to the defined Webhook """ from nautobot.extras.models import Webhook # avoiding circular import + from nautobot.extras.webhooks import validate_webhook_url # avoiding circular import webhook = Webhook.objects.get(pk=webhook_pk) + # SSRF defense-in-depth: re-validate the URL here (with DNS resolution) since the worker is the entity opening + # the outbound connection, and its DNS view may differ from the web server's. Catches any rows that predate + # the SSRF policy. Returns the resolved IP for use below in DNS-rebinding mitigation. + try: + validated_ip = validate_webhook_url(webhook.payload_url) + except ValidationError as e: + logger.error("Webhook %s payload URL %r blocked by SSRF policy: %s", webhook, webhook.payload_url, e) + raise + context = { "event": dict(ObjectChangeActionChoices)[event].lower(), "timestamp": timestamp, @@ -68,12 +126,27 @@ def process_webhook(webhook_pk, data, model_name, event, timestamp, username, re if webhook.secret != "": prepared_request.headers["X-Hook-Signature"] = generate_signature(prepared_request.body, webhook.secret) - # Send the request - with requests.Session() as session: - session.verify = webhook.ssl_verification - if webhook.ca_file_path: - session.verify = webhook.ca_file_path - response = session.send(prepared_request, proxies=settings.HTTP_PROXIES) + # Send the request. Redirects are never followed: the SSRF block-list only validates the initial URL, so + # allowing redirects would let an attacker-controlled public host 30x the request to a loopback or + # cloud-metadata target. + # + # DNS-rebinding mitigation: pin the connection to the IP we validated, instead of letting urllib3 re-resolve + # the hostname. We only do this for paths where TLS doesn't already provide the protection (plain HTTP, and + # HTTPS with verification disabled). HTTPS-with-verification is naturally rebinding-safe because the rebound + # target's certificate won't have a SAN matching the original hostname. Pinning is also skipped when an HTTP + # proxy is configured: the proxy resolves DNS itself and is an admin-controlled trust boundary. + parts_scheme = urlsplit(webhook.payload_url).scheme.lower() + can_pin = not settings.HTTP_PROXIES and ( + parts_scheme == "http" or (parts_scheme == "https" and not webhook.ssl_verification) + ) + if can_pin: + response = _send_webhook_request_pinned(prepared_request, validated_ip) + else: + with requests.Session() as session: + session.verify = webhook.ssl_verification + if webhook.ca_file_path: + session.verify = webhook.ca_file_path + response = session.send(prepared_request, proxies=settings.HTTP_PROXIES, allow_redirects=False) if response.ok: logger.info("Request succeeded; response status %s", response.status_code)
nautobot/extras/tests/test_models.py+18 −0 modified@@ -4284,3 +4284,21 @@ def test_type_error_not_raised_when_calling_check_for_conflicts(self): conflicts["type_create"], [f"A webhook already exists for create on DCIM | device to URL {self.url}"], ) + + def test_clean_payload_url_validation(self): + """`Webhook.clean()` surfaces SSRF policy violations against the `payload_url` field.""" + cases = [ + ("bad scheme is rejected", "ftp://example.com/", False), + ("loopback ip literal is rejected", "http://127.0.0.1/", False), + ("link-local ip literal is rejected", "http://169.254.169.254/", False), + ("public url is accepted", "https://example.com/hooks/abc", True), + ] + for desc, payload_url, should_pass in cases: + with self.subTest(desc): + webhook = Webhook(name="test-webhook", type_create=True, payload_url=payload_url) + if should_pass: + webhook.clean() + else: + with self.assertRaises(ValidationError) as ctx: + webhook.clean() + self.assertIn("payload_url", ctx.exception.message_dict)
nautobot/extras/tests/test_webhooks.py+306 −3 modified@@ -6,12 +6,15 @@ from django.apps import apps from django.contrib.auth import get_user_model from django.contrib.contenttypes.models import ContentType +from django.core.exceptions import ValidationError +from django.test import override_settings from django.utils import timezone +import requests from requests import Session from nautobot.core.api.exceptions import SerializerNotFound from nautobot.core.api.utils import get_serializer_for_model -from nautobot.core.testing import APITestCase +from nautobot.core.testing import APITestCase, TestCase from nautobot.core.utils.lookup import get_changes_for_model from nautobot.dcim.api.serializers import LocationSerializer from nautobot.dcim.models import Location, LocationType @@ -20,8 +23,9 @@ from nautobot.extras.models import Tag, Webhook from nautobot.extras.models.statuses import Status from nautobot.extras.registry import registry -from nautobot.extras.tasks import process_webhook +from nautobot.extras.tasks import _send_webhook_request_pinned, process_webhook from nautobot.extras.utils import generate_signature +from nautobot.extras.webhooks import validate_webhook_url, validate_webhook_url_format User = get_user_model() @@ -30,7 +34,10 @@ class WebhookTest(APITestCase): @classmethod def setUpTestData(cls): location_ct = ContentType.objects.get_for_model(Location) - MOCK_URL = "http://localhost/" + # https://8.8.8.8/ skips DNS resolution (IP literal), isn't in any block-list, and uses HTTPS with + # verification (the default), so the worker takes the requests.Session.send path that these tests mock. + # No actual network traffic occurs because Session.send is patched. + MOCK_URL = "https://8.8.8.8/" MOCK_SECRET = "LOOKATMEIMASECRETSTRING" # noqa: S105 # hardcoded-password-string -- OK as this is test code webhooks = ( @@ -275,6 +282,100 @@ class FakeResponse: def test_webhook_render_body_with_utf8(self): self.assertEqual(Webhook().render_body({"utf8": "I am UTF-8! 😀"}), '{"utf8": "I am UTF-8! 😀"}') + def test_process_webhook_does_not_follow_redirects(self): + """SSRF defense: redirects must not be followed, since the SSRF block-list only validates the initial URL.""" + request_id = uuid.uuid4() + webhook = Webhook.objects.get(type_create=True) + timestamp = str(timezone.now()) + + captured_kwargs = {} + + def mock_send(_, request, **kwargs): + captured_kwargs.update(kwargs) + + class FakeResponse: + ok = True + status_code = 200 + + return FakeResponse() + + with patch.object(Session, "send", mock_send): + with web_request_context(self.user, change_id=request_id): + location_type = LocationType.objects.get(name="Campus") + location = Location(name="Location 1", location_type=location_type, status=self.statuses[0]) + location.save() + + serializer = LocationSerializer(location, context={"request": None}) + oc = get_changes_for_model(location).first() + snapshots = oc.get_snapshots() + + process_webhook( + webhook.pk, + serializer.data, + Location._meta.model_name, + ObjectChangeActionChoices.ACTION_CREATE, + timestamp, + self.user.username, + request_id, + snapshots, + ) + + self.assertEqual(captured_kwargs.get("allow_redirects"), False) + + def _process_webhook(self, webhook): + """Drive `process_webhook` end-to-end with a freshly-saved Location object change.""" + request_id = uuid.uuid4() + timestamp = str(timezone.now()) + with web_request_context(self.user, change_id=request_id): + location_type = LocationType.objects.get(name="Campus") + location = Location(name="Location SSRF", location_type=location_type, status=self.statuses[0]) + location.save() + serializer = LocationSerializer(location, context={"request": None}) + oc = get_changes_for_model(location).first() + snapshots = oc.get_snapshots() + process_webhook( + webhook.pk, + serializer.data, + Location._meta.model_name, + ObjectChangeActionChoices.ACTION_CREATE, + timestamp, + self.user.username, + request_id, + snapshots, + ) + + def test_process_webhook_uses_pinning_for_http(self): + """HTTP webhooks go through the pinned-IP helper rather than `requests.Session.send`.""" + webhook = Webhook.objects.get(type_create=True) + webhook.payload_url = "http://target.example/" + webhook.save() + + with ( + patch("nautobot.extras.tasks._send_webhook_request_pinned") as mock_pinned, + patch("nautobot.extras.webhooks.socket.getaddrinfo") as mock_resolve, + ): + mock_resolve.return_value = [(2, 1, 6, "", ("8.8.8.8", 0))] + mock_pinned.return_value.ok = True + mock_pinned.return_value.status_code = 200 + self._process_webhook(webhook) + + # Second positional arg is the validated IP that came back from validate_webhook_url. The webhook fires + # both via Location.save() (auto-enqueue) and via the explicit process_webhook call -- we only care that + # the pinned helper handled it (not the requests.Session.send path). + mock_pinned.assert_called() + self.assertEqual(mock_pinned.call_args.args[1], "8.8.8.8") + + def test_process_webhook_raises_on_ssrf_blocked_url(self): + """Worker re-validates: a webhook whose hostname resolves to a blocked IP at send time is rejected.""" + webhook = Webhook.objects.get(type_create=True) + webhook.payload_url = "http://rebound.example/" + webhook.save() + + with patch("nautobot.extras.webhooks.socket.getaddrinfo") as mock_resolve: + mock_resolve.return_value = [(2, 1, 6, "", ("127.0.0.1", 0))] + with self.assertRaises(ValidationError): + self._process_webhook(webhook) + @patch("nautobot.extras.tasks.process_webhook.apply_async") def test_enqueue_webhooks(self, mock_async): request_id = uuid.uuid4() @@ -359,3 +460,205 @@ def test_all_webhook_supported_models(self): model_class = apps.get_model(app_label, model_name) get_serializer_for_model(model_class) self.assertTrue(hasattr(model_class, "to_objectchange")) + + +class WebhookURLFormatValidatorTest(TestCase): + """Tests for `validate_webhook_url_format` -- save-time validation, no DNS resolution.""" + + def test_validate_format_default_policy(self): + # (description, url, should_pass) + cases = [ + # Empty / malformed + ("empty url is rejected", "", False), + ("invalid syntax is rejected", "not a url", False), + # Scheme allow-list (default = http, https) + ("file scheme is rejected", "file:///etc/passwd", False), + ("ftp scheme is rejected", "ftp://example.com/", False), + ("gopher scheme is rejected", "gopher://example.com/", False), + ("javascript scheme is rejected", "javascript:alert(1)", False), + ("http is accepted", "http://example.com/", True), + ("https is accepted", "https://example.com/", True), + # Built-in block-list (IP literals) + ("loopback v4 is rejected", "http://127.0.0.1/", False), + ("loopback v4 wider range is rejected", "http://127.5.6.7/", False), + ("loopback v6 is rejected", "http://[::1]/", False), + ("link-local v4 (cloud metadata) is rejected", "http://169.254.169.254/latest/meta-data/", False), + ("link-local v6 is rejected", "http://[fe80::1]/", False), + ("multicast v4 is rejected", "http://224.0.0.1/", False), + ("broadcast is rejected", "http://255.255.255.255/", False), + ("unspecified v4 is rejected", "http://0.0.0.0/", False), + ("class-E reserved is rejected", "http://240.0.0.1/", False), + # Public IP literals + ("public v4 is accepted", "http://8.8.8.8/", True), + ("public v6 is accepted", "http://[2001:4860:4860::8888]/", True), + # RFC1918 private ranges -- intentionally NOT blocked by default + ("RFC1918 10/8 is accepted by default", "http://10.0.0.1/", True), + ("RFC1918 192.168/16 is accepted by default", "http://192.168.1.1/", True), + ("RFC1918 172.16/12 is accepted by default", "http://172.16.0.1/", True), + # Hostnames -- DNS deferred to worker, so any syntactically-valid hostname passes save-time + ("hostname passes without DNS lookup", "http://internal-nonresolvable-host.example/", True), + ] + for desc, url, should_pass in cases: + with self.subTest(desc): + if should_pass: + validate_webhook_url_format(url) + else: + with self.assertRaises(ValidationError): + validate_webhook_url_format(url) + + @override_settings(WEBHOOK_ALLOWED_SCHEMES=["ftp"]) + def test_allowed_schemes_setting_overrides_default(self): + validate_webhook_url_format("ftp://example.com/") + with self.assertRaises(ValidationError): + validate_webhook_url_format("http://example.com/") + + @override_settings( + WEBHOOK_ALLOWED_HOSTS=["10.20.30.40"], + WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS=["10.0.0.0/8"], + ) + def test_allow_list_literal_bypasses_admin_extended_block_list(self): + # Allow-list bypasses WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS for the matching host. + validate_webhook_url_format("http://10.20.30.40/") + # Other hosts in the same admin-extended range are still blocked. + with self.assertRaises(ValidationError): + validate_webhook_url_format("http://10.99.99.99/") + + @override_settings(WEBHOOK_ALLOWED_HOSTS=[".internal.example.com"]) + def test_allow_list_subdomain_wildcard(self): + validate_webhook_url_format("http://api.internal.example.com/") + validate_webhook_url_format("http://internal.example.com/") + + @override_settings(WEBHOOK_ALLOWED_HOSTS=["127.0.0.1", "169.254.169.254", "*"]) + def test_allow_list_does_not_bypass_built_in_block_list(self): + # Built-in block-list is enforced unconditionally; allow-listing (including wildcard) cannot expose + # loopback / cloud-metadata IP literals at save time. + with self.assertRaises(ValidationError): + validate_webhook_url_format("http://127.0.0.1/") + with self.assertRaises(ValidationError): + validate_webhook_url_format("http://169.254.169.254/") + + @override_settings(WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS=["10.0.0.0/8", "fc00::/7"]) + def test_additional_blocked_networks_extends_built_in(self): + with self.assertRaises(ValidationError): + validate_webhook_url_format("http://10.20.30.40/") + # IPv6 ULA (fc00::/7) -- exercises the cross-family skip path: an IPv4 addr is matched only against + # IPv4 networks in WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS, and vice versa. + with self.assertRaises(ValidationError): + validate_webhook_url_format("http://[fd00::1]/") + validate_webhook_url_format("http://192.168.1.1/") + validate_webhook_url_format("http://[2001:4860:4860::8888]/") + + +class WebhookURLSendTimeValidatorTest(TestCase): + """Tests for `validate_webhook_url` -- send-time validation, includes DNS resolution.""" + + def test_resolves_and_rejects_blocked_hostname(self): + # Mock getaddrinfo to return a loopback address for an arbitrary hostname. + with patch("nautobot.extras.webhooks.socket.getaddrinfo") as mock_resolve: + mock_resolve.return_value = [(2, 1, 6, "", ("127.0.0.1", 0))] + with self.assertRaises(ValidationError): + validate_webhook_url("http://attacker-controlled.example/") + + def test_resolves_and_accepts_public_hostname_returns_chosen_ip(self): + with patch("nautobot.extras.webhooks.socket.getaddrinfo") as mock_resolve: + mock_resolve.return_value = [(2, 1, 6, "", ("8.8.8.8", 0))] + self.assertEqual(validate_webhook_url("http://example.com/"), "8.8.8.8") + + def test_ip_literal_returns_literal(self): + self.assertEqual(validate_webhook_url("http://8.8.8.8/"), "8.8.8.8") + + @override_settings( + WEBHOOK_ALLOWED_HOSTS=["allowed.example"], + WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS=["8.0.0.0/8"], + ) + def test_allow_list_bypasses_admin_extended_at_send_time(self): + # Allow-listed host whose DNS lands inside the admin-extended block-list passes (the allow-list overrides + # the admin extension), and the validator returns the resolved IP for connection pinning. + with patch("nautobot.extras.webhooks.socket.getaddrinfo") as mock_resolve: + mock_resolve.return_value = [(2, 1, 6, "", ("8.8.8.8", 0))] + self.assertEqual(validate_webhook_url("http://allowed.example/"), "8.8.8.8") + + @override_settings(WEBHOOK_ALLOWED_HOSTS=["allowed.example", "*"]) + def test_allow_list_does_not_bypass_built_in_at_send_time(self): + # Allow-list (and wildcard) cannot expose the built-in block-list at send time -- a hostname whose DNS + # resolves to (e.g.) loopback is still rejected. This is the rebinding-into-metadata scenario. + with patch("nautobot.extras.webhooks.socket.getaddrinfo") as mock_resolve: + mock_resolve.return_value = [(2, 1, 6, "", ("169.254.169.254", 0))] + with self.assertRaises(ValidationError): + validate_webhook_url("http://allowed.example/") + + def test_unresolvable_host_raises(self): + import socket + + with patch("nautobot.extras.webhooks.socket.getaddrinfo") as mock_resolve: + mock_resolve.side_effect = socket.gaierror("nodename nor servname provided") + with self.assertRaises(ValidationError): + validate_webhook_url("http://nonexistent.example/") + + def test_rejects_when_any_resolved_address_is_blocked(self): + # If a hostname resolves to *any* blocked address (DNS round-robin, dual-stack with a blocked v6, etc.), + # we reject. This protects against partial-match bypasses. + with patch("nautobot.extras.webhooks.socket.getaddrinfo") as mock_resolve: + mock_resolve.return_value = [ + (2, 1, 6, "", ("8.8.8.8", 0)), # public + (2, 1, 6, "", ("127.0.0.1", 0)), # loopback -- should cause rejection + ] + with self.assertRaises(ValidationError): + validate_webhook_url("http://mixed.example/") + + +class WebhookSendPinningTest(TestCase): + """Tests for `_send_webhook_request_pinned` -- DNS-rebinding mitigation for HTTP and unverified HTTPS.""" + + def _make_prepared(self, url, method="POST", body=b'{"ok": true}'): + return requests.Request(method=method, url=url, data=body).prepare() + + def test_pinned_http_connects_to_validated_ip_with_original_host_header(self): + prepared = self._make_prepared("http://attacker.example/path?x=1") + + with patch("nautobot.extras.tasks.urllib3.HTTPConnectionPool") as mock_pool_cls: + mock_pool = mock_pool_cls.return_value + mock_pool.urlopen.return_value.status = 200 + mock_pool.urlopen.return_value.headers = {} + mock_pool.urlopen.return_value.data = b"" + _send_webhook_request_pinned(prepared, validated_ip="203.0.113.5") + + # Pool was constructed against the validated IP, not the hostname. + self.assertEqual(mock_pool_cls.call_args.kwargs["host"], "203.0.113.5") + self.assertEqual(mock_pool_cls.call_args.kwargs["port"], 80) + # Host header preserved as the original hostname so vhost-routed receivers still match. + urlopen_kwargs = mock_pool.urlopen.call_args.kwargs + self.assertEqual(urlopen_kwargs["headers"]["Host"], "attacker.example") + self.assertEqual(urlopen_kwargs["url"], "/path?x=1") + self.assertFalse(urlopen_kwargs["redirect"]) + + def test_pinned_https_unverified_sets_sni_to_hostname_and_disables_cert_check(self): + prepared = self._make_prepared("https://internal.example/") + + with patch("nautobot.extras.tasks.urllib3.HTTPSConnectionPool") as mock_pool_cls: + mock_pool = mock_pool_cls.return_value + mock_pool.urlopen.return_value.status = 200 + mock_pool.urlopen.return_value.headers = {} + mock_pool.urlopen.return_value.data = b"" + _send_webhook_request_pinned(prepared, validated_ip="10.20.30.40") + + kwargs = mock_pool_cls.call_args.kwargs + self.assertEqual(kwargs["host"], "10.20.30.40") + self.assertEqual(kwargs["port"], 443) + # SNI/TLS hostname is the original hostname so SNI-routed receivers continue to work. + self.assertEqual(kwargs["server_hostname"], "internal.example") + self.assertEqual(kwargs["cert_reqs"], "CERT_NONE") + + def test_pinned_uses_explicit_port_when_present(self): + prepared = self._make_prepared("http://attacker.example:8080/") + + with patch("nautobot.extras.tasks.urllib3.HTTPConnectionPool") as mock_pool_cls: + mock_pool = mock_pool_cls.return_value + mock_pool.urlopen.return_value.status = 200 + mock_pool.urlopen.return_value.headers = {} + mock_pool.urlopen.return_value.data = b"" + _send_webhook_request_pinned(prepared, validated_ip="203.0.113.5") + + self.assertEqual(mock_pool_cls.call_args.kwargs["port"], 8080) + # Non-default port is included in the Host header. + self.assertEqual(mock_pool.urlopen.call_args.kwargs["headers"]["Host"], "attacker.example:8080")
nautobot/extras/webhooks.py+170 −0 modified@@ -1,10 +1,180 @@ +import logging +import socket +from urllib.parse import urlsplit + +from django.conf import settings +from django.core.exceptions import ValidationError +from django.core.validators import URLValidator from django.utils import timezone +import netaddr from nautobot.extras.choices import ObjectChangeActionChoices from nautobot.extras.models import Webhook from nautobot.extras.registry import registry from nautobot.extras.tasks import process_webhook +logger = logging.getLogger(__name__) + + +def _webhook_addr_is_builtin_blocked(addr): + """Return True if ``addr`` is in a never-legitimate range. Admins cannot disable these via configuration.""" + # is_reserved covers IPv4 0.0.0.0, 240.0.0.0/4 (incl. 255.255.255.255 broadcast), IPv6 :: and the IETF-reserved + # blocks (which include the IPv4-mapped IPv6 ::ffff:0:0/96 range, so an attacker can't bypass via that form). + return addr.is_loopback() or addr.is_link_local() or addr.is_multicast() or addr.is_reserved() + + +def _webhook_additional_blocked_networks(): + return [netaddr.IPNetwork(cidr) for cidr in settings.WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS] + + +def _webhook_host_matches_allow_list(host, allow_list): + """Django ``ALLOWED_HOSTS``-style matching: literal hostname, ``.example.com`` subdomain wildcard, or ``*``.""" + if not host: + return False + host = host.lower().rstrip(".") + for pattern in allow_list or []: + pattern = pattern.lower().rstrip(".") + if pattern == "*": + return True + if pattern.startswith("."): + if host == pattern[1:] or host.endswith(pattern): + return True + elif host == pattern: + return True + return False + + +def _webhook_address_from_host(host): + """ + Return a `netaddr.IPAddress` if `host` is an IP literal, else `None`. + + Does not perform DNS resolution -- DNS lookups are deferred to the Celery worker, since the web server and + the worker may resolve names differently (split-horizon DNS, container DNS, K8s namespaces, etc.). + """ + if host.startswith("[") and host.endswith("]"): + host = host[1:-1] + try: + return netaddr.IPAddress(host) + except (netaddr.AddrFormatError, ValueError): + return None + + +def _webhook_check_address_against_block_lists(host, addr, *, check_additional=True): + """ + Raise `ValidationError` if `addr` falls in the built-in block-list, or (when `check_additional` is True) + in the admin-extended block-list. The built-in block-list is enforced unconditionally and is NOT bypassed + by `WEBHOOK_ALLOWED_HOSTS` -- callers pass `check_additional=False` for allow-listed hosts to skip only the + admin-extended list. + """ + if _webhook_addr_is_builtin_blocked(addr): + logger.warning( + "Webhook URL validation: host %r resolved to %s, which is in a built-in blocked range.", host, addr + ) + raise ValidationError( + f"Webhook URL host {host!r} is not permitted (resolves to a reserved/loopback/link-local address)." + ) + if not check_additional: + return + for network in _webhook_additional_blocked_networks(): + if addr.version != network.version: + continue + if addr in network: + logger.warning( + "Webhook URL validation: host %r resolved to %s, which is in additional blocked network %s.", + host, + addr, + network, + ) + raise ValidationError( + f"Webhook URL host {host!r} is not permitted. " + "Add the host to WEBHOOK_ALLOWED_HOSTS if this target is intentional." + ) + + +def _webhook_validate_scheme_and_extract_host(url): + """Shared scheme + URL syntax check. Returns the URL host.""" + if not url: + raise ValidationError("Webhook URL is required.") + + allowed_schemes = list(settings.WEBHOOK_ALLOWED_SCHEMES) + try: + URLValidator(schemes=allowed_schemes)(url) + except ValidationError as exc: + scheme = urlsplit(url).scheme.lower() + if scheme and scheme not in (s.lower() for s in allowed_schemes): + raise ValidationError( + f"Webhook URL scheme {scheme!r} is not permitted; allowed schemes are: {', '.join(allowed_schemes)}." + ) + raise exc + + host = urlsplit(url).hostname + if not host: + raise ValidationError("Webhook URL must include a host.") + return host + + +def validate_webhook_url_format(url): + """ + Save-time validation: scheme, URL syntax, and built-in block-list check for IP-literal hosts. + + DNS resolution is intentionally NOT performed here because the web server and the Celery worker may have + different DNS views; reliable name-based block-list enforcement happens at request-send time via + `validate_webhook_url`. + + `WEBHOOK_ALLOWED_HOSTS` bypasses only the admin-extended block-list (`WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS`). + The built-in block-list (loopback, link-local, multicast, unspecified, reserved) is enforced unconditionally. + + Raises `django.core.exceptions.ValidationError` on any policy violation. + """ + host = _webhook_validate_scheme_and_extract_host(url) + allow_listed = _webhook_host_matches_allow_list(host, settings.WEBHOOK_ALLOWED_HOSTS) + + addr = _webhook_address_from_host(host) + if addr is not None: + _webhook_check_address_against_block_lists(host, addr, check_additional=not allow_listed) + + +def validate_webhook_url(url): + """ + Send-time validation: everything `validate_webhook_url_format` does, plus DNS resolution and block-list + check on every resolved address. + + Returns the validated IP that the caller should connect to (as a string). For IP-literal URLs the returned + IP is the literal. The caller can use this to pin the outbound connection to the validated IP for + DNS-rebinding mitigation. + + `WEBHOOK_ALLOWED_HOSTS` bypasses only the admin-extended block-list (`WEBHOOK_ADDITIONAL_BLOCKED_NETWORKS`). + The built-in block-list (loopback, link-local, multicast, unspecified, reserved) is enforced unconditionally, + so an allow-listed hostname whose DNS resolves to (e.g.) `169.254.169.254` is still rejected. + + Intended to be called from the Celery worker just before issuing the request, so the DNS view used here + matches the one the request will actually use. + + Raises `django.core.exceptions.ValidationError` on any policy violation. + """ + host = _webhook_validate_scheme_and_extract_host(url) + allow_listed = _webhook_host_matches_allow_list(host, settings.WEBHOOK_ALLOWED_HOSTS) + + addr = _webhook_address_from_host(host) + if addr is not None: + _webhook_check_address_against_block_lists(host, addr, check_additional=not allow_listed) + return str(addr) + + bare_host = host[1:-1] if host.startswith("[") and host.endswith("]") else host + try: + infos = socket.getaddrinfo(bare_host, None) + except socket.gaierror as exc: + logger.warning("Webhook URL validation: DNS resolution failed for host %r: %s", host, exc) + raise ValidationError(f"Unable to resolve webhook host {host!r}.") + + chosen = None + for info in infos: + addr = netaddr.IPAddress(info[4][0]) + _webhook_check_address_against_block_lists(host, addr, check_additional=not allow_listed) + if chosen is None: + chosen = str(addr) + return chosen + def enqueue_webhooks(object_change, snapshots=None, webhook_queryset=None): """
Vulnerability mechanics
AI mechanics synthesis has not run for this CVE yet.
References
6- github.com/advisories/GHSA-c35q-vxrp-ph26ghsaADVISORY
- github.com/nautobot/nautobot/commit/16aa4aa9796ab7a31c4d615ec945e1f16d8c77c4ghsaWEB
- github.com/nautobot/nautobot/commit/7324c8f0d8c7245fbc691e15d729adc2d2707d08ghsaWEB
- github.com/nautobot/nautobot/releases/tag/v2.4.33ghsaWEB
- github.com/nautobot/nautobot/releases/tag/v3.1.2ghsaWEB
- github.com/nautobot/nautobot/security/advisories/GHSA-c35q-vxrp-ph26ghsaWEB
News mentions
0No linked articles in our index yet.