Nautobot: REST API permits creation of GenericForeignKey references to objects that the user should not be able to reference
Description
Impact
In the case of inter-object references via GenericForeignKey (a pattern allowing an object to reference another object that may belong to one of several different "content types" or database tables), when creating or updating an object containing a GenericForeignKey, Nautobot's REST API failed to enforce user "view" permissions when determining whether a given reference to another object would be valid.
As a concrete example, a user:
- who has permission to create or update ImageAttachment records
- but who lacks permission to view (some or all) Device records
- _but who knows (via some other mechanism) the UUID of a specific Device that they do not otherwise have access to_
could create via the REST API an ImageAttachment linked to that specific Device.
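The gap can be illustrated with a small, self-contained Python model of the ImageAttachment case. This is only a sketch under stated assumptions: `User`, `DEVICES`, `reference_allowed_before_fix`, and `reference_allowed_after_fix` are hypothetical names invented for illustration and are not part of Nautobot, which evaluates object permissions through its own queryset machinery.

```python
from dataclasses import dataclass, field
from uuid import UUID, uuid4


@dataclass
class User:
    """Hypothetical stand-in for a user with object-level "view" permissions."""

    name: str
    viewable_device_ids: set[UUID] = field(default_factory=set)


# Hypothetical in-memory "Device table": primary key -> device name.
visible_id, hidden_id = uuid4(), uuid4()
DEVICES = {visible_id: "edge-router-1", hidden_id: "core-router-1"}


def reference_allowed_before_fix(user: User, device_id: UUID) -> bool:
    """Existence-only check: any Device UUID that exists is accepted."""
    return device_id in DEVICES


def reference_allowed_after_fix(user: User, device_id: UUID) -> bool:
    """Existence check restricted to the Devices this user may view."""
    return device_id in DEVICES and device_id in user.viewable_device_ids


alice = User("alice", viewable_device_ids={visible_id})
print(reference_allowed_before_fix(alice, hidden_id))  # True
print(reference_allowed_after_fix(alice, hidden_id))   # False
```

In this model the hidden Device is reported the same way as a nonexistent one, so a rejected request does not confirm that the UUID exists.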
Other models that use GenericForeignKey and may be writable via the REST API, and hence have a similar vulnerability to ImageAttachment, may include:
- ApprovalWorkflow
- Cable
- ConfigContext
- ContactAssociation
- DataCompliance
- Device
- ExportTemplate
- GraphQLQuery
- Note
- ObjectMetadata
- RelationshipAssociation
- StaticGroupAssociation
- VirtualMachine
Additionally, any Nautobot Apps that provide models with a REST API and use GenericForeignKey may have a similar vulnerability for their models.
Patches
A general-purpose fix has been implemented in Nautobot 2.4.33 and 3.1.2, which ensures correct application of "view" permissions when creating or modifying object references via GenericForeignKey throughout the REST API. Individual models/views/serializers generally will not require any specific code changes to benefit from this fix.
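As a rough sketch of the decision such a fix has to make for each GenericForeignKey, the following self-contained Python works out which target a request would actually assign, including the partial-update case where only one half of the reference is sent. The names `resolve_gfk_target`, `reference_is_permitted`, and `can_view` are hypothetical and chosen for illustration; the real logic lives in Nautobot's serializer validation and uses its permission-restricted querysets.

```python
from types import SimpleNamespace


def resolve_gfk_target(attrs, instance, ct_field, fk_field):
    """Return the (content type, object id) pair a request would assign, or None to skip the check."""
    # A request that touches neither half of the reference leaves it unchanged.
    if ct_field not in attrs and fk_field not in attrs:
        return None
    ct = attrs.get(ct_field)
    fk = attrs.get(fk_field)
    # On a partial update, take the unchanged half from the saved object.
    if instance is not None:
        if ct is None:
            ct = getattr(instance, ct_field, None)
        if fk is None:
            fk = getattr(instance, fk_field, None)
    # Still incomplete: leave it to ordinary required-field validation.
    if ct is None or fk is None:
        return None
    return ct, fk


def reference_is_permitted(target, can_view):
    """Apply the caller-supplied "view" check to a resolved target."""
    return target is None or can_view(*target)


# Hypothetical saved object and set of targets the user may view.
existing = SimpleNamespace(content_type="dcim.location", object_id="loc-1")
viewable = {("dcim.location", "loc-1"), ("dcim.location", "loc-2")}


def can_view(ct, fk):
    return (ct, fk) in viewable


rename_only = resolve_gfk_target({"name": "new"}, existing, "content_type", "object_id")
retarget = resolve_gfk_target({"object_id": "loc-3"}, existing, "content_type", "object_id")
print(rename_only)                                 # None
print(retarget)                                    # ('dcim.location', 'loc-3')
print(reference_is_permitted(retarget, can_view))  # False
```

Skipping the check when neither half is supplied keeps edits to unrelated fields working for users who cannot view the existing target.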
Workarounds
No known workarounds at this time.
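With no workaround available, the practical question is whether a given installation already includes the fix. The following is a minimal sketch, assuming plain `MAJOR.MINOR.PATCH` version strings and assuming that only the 2.x and 3.x release lines are of interest. `PATCHED_RELEASES`, `parse_version`, and `includes_fix` are hypothetical names, not a Nautobot API, and pre-release suffixes such as `b1` are not handled.

```python
# First fixed release per major version, as stated in the Patches section.
PATCHED_RELEASES = {2: (2, 4, 33), 3: (3, 1, 2)}


def parse_version(text):
    """Turn "2.4.33" into (2, 4, 33); raises ValueError on non-numeric parts."""
    return tuple(int(part) for part in text.strip().split(".")[:3])


def includes_fix(version_text):
    """Return True/False for known release lines, or None when the line is unknown."""
    version = parse_version(version_text)
    minimum = PATCHED_RELEASES.get(version[0])
    if minimum is None:
        return None  # Release line not covered by this sketch.
    return version >= minimum


for candidate in ("2.4.32", "2.4.33", "3.0.9", "3.1.2", "1.6.0"):
    print(candidate, includes_fix(candidate))
```

Returning `None` for an unknown release line avoids claiming anything about versions that are not mentioned here.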
References
- 2.4.33 (<a href="https://github.com/nautobot/nautobot/commit/9918bdb9bcf1eb42cda72c344f420a64ef7665f1">patch</a>)
- 3.1.2 (<a href="https://github.com/nautobot/nautobot/commit/36cde7148a207234de6212ec074f321dbc9d1b5b">patch</a>)
Patches
9918bdb9bcf1 Merge commit from fork
9 files changed · +359 −27
changes/8854.housekeeping · +1 −0 · added
@@ -0,0 +1 @@
+Fixed an intermittent test failure in `nautobot.core.tests.test_jobs.LogsCleanupTestCase`.
changes/GHSA-wpxj-44w3-2j6x.fixed · +3 −0 · added
@@ -0,0 +1,3 @@
+Fixed `ImageAttachment` REST API incorrectly marking the `image_height` and `image_width` as required fields.
+Fixed `ImageAttachment` REST API incorrectly allowing creation of attachments to an unsupported `content_type`.
+Fixed `ContactAssociation` REST API incorrectly allowing creation of associations to an invalid `associated_object_type`.
changes/GHSA-wpxj-44w3-2j6x.security · +1 −0 · added
@@ -0,0 +1 @@
+Added logic in the REST API to enforce user "view" permissions when assigning related objects via a GenericForeignKey (GHSA-wpxj-44w3-2j6x).
nautobot/core/api/serializers.py · +40 −2 · modified
@@ -2,7 +2,7 @@
 import logging
 import uuid
 
-from django.contrib.contenttypes.fields import GenericRel
+from django.contrib.contenttypes.fields import GenericForeignKey, GenericRel
 from django.contrib.contenttypes.models import ContentType
 from django.core.exceptions import (
     FieldDoesNotExist,
@@ -538,10 +538,48 @@ def validate(self, attrs):
         local_attrs.pop("relationships", None)
         local_attrs.pop("tags", None)
 
-        # Skip ManyToManyFields
         for field in self.Meta.model._meta.get_fields():
             if isinstance(field, models.ManyToManyField):
+                # Skip ManyToManyFields
                 local_attrs.pop(field.name, None)
+            elif isinstance(field, GenericForeignKey):
+                # Only validate the GFK target if this request is actually assigning it. A partial
+                # update that touches neither ct_field nor fk_field leaves the existing reference
+                # untouched, and re-checking parent view permission would block legitimate edits to
+                # unrelated fields.
+                ct_provided = field.ct_field in local_attrs
+                fk_provided = field.fk_field in local_attrs
+                if not ct_provided and not fk_provided:
+                    continue
+                # For a partial update that changes only one side, fall back to the saved instance
+                # for the unchanged side so we validate the resulting target.
+                ct = local_attrs.get(field.ct_field)
+                fk = local_attrs.get(field.fk_field)
+                if self.instance is not None:
+                    if ct is None:
+                        ct = getattr(self.instance, field.ct_field, None)
+                    if fk is None:
+                        fk = getattr(self.instance, field.fk_field, None)
+                if ct is None or fk is None:
+                    # Creation with one side missing: required-field validation will flag it downstream.
+                    continue
+                ct_model = ct.model_class()
+                if ct_model is None:
+                    raise ValidationError({field.ct_field: "Invalid content-type"})
+                qs = ct_model.objects
+                # Layer view permission on top of the existence check when we have an authenticated
+                # request; programmatic use without a request still gets the existence check.
+                if (
+                    "request" in self.context
+                    and self.context["request"]
+                    and self.context["request"].user
+                    and hasattr(qs, "restrict")
+                ):
+                    qs = qs.restrict(self.context["request"].user, "view")
+                try:
+                    qs.get(pk=fk)
+                except ObjectDoesNotExist as e:
+                    raise ValidationError({field.fk_field: "Object not found"}) from e
 
         # Run clean() on an instance of the model
         if self.instance is None:
nautobot/core/tests/test_jobs.py · +19 −1 · modified
@@ -1,6 +1,6 @@
 import codecs
 import csv
-from datetime import timedelta
+from datetime import datetime, timedelta, timezone as dt_timezone
 from io import StringIO
 import json
 from pathlib import Path
@@ -567,6 +567,24 @@ def setUp(self):
             ObjectChangeFactory.create_batch(40)
         if JobResult.objects.count() < 2:
             JobResultFactory.create_batch(20)
+        # To avoid spurious test failures due to factory randomness,
+        # ensure that we have some records well outside the test data cutoff windows.
+        ObjectChangeFactory.create(time=datetime(2023, 1, 1, tzinfo=dt_timezone.utc))
+        ObjectChangeFactory.create(time=datetime(2026, 1, 1, tzinfo=dt_timezone.utc))
+        latest_jr = JobResult.objects.filter(
+            date_created__isnull=False, date_started__isnull=False, date_done__isnull=False
+        ).latest()
+        latest_jr.date_created += timedelta(days=365)
+        latest_jr.date_started += timedelta(days=365)
+        latest_jr.date_done += timedelta(days=365)
+        latest_jr.save()
+        earliest_jr = JobResult.objects.filter(
+            date_created__isnull=False, date_started__isnull=False, date_done__isnull=False
+        ).earliest()
+        earliest_jr.date_created -= timedelta(days=365)
+        earliest_jr.date_started -= timedelta(days=365)
+        earliest_jr.date_done -= timedelta(days=365)
+        earliest_jr.save()
 
     @load_event_broker_override_settings(
         EVENT_BROKERS={
nautobot/dcim/api/serializers.py · +0 −1 · modified
@@ -815,7 +815,6 @@ def validate(self, attrs):
 
 
 class CableSerializer(TaggedModelSerializerMixin, NautobotModelSerializer):
-    # TODO: termination_a_type/termination_b_type are a bit redundant with the full termination_a/termination_b dicts
     termination_a_type = ContentTypeField(queryset=ContentType.objects.filter(CABLE_TERMINATION_MODELS))
     termination_b_type = ContentTypeField(queryset=ContentType.objects.filter(CABLE_TERMINATION_MODELS))
     termination_a = serializers.SerializerMethodField(read_only=True)
nautobot/extras/api/serializers.py · +7 −15 · modified
@@ -2,7 +2,6 @@
 
 from django.conf import settings
 from django.contrib.contenttypes.models import ContentType
-from django.core.exceptions import ObjectDoesNotExist
 from drf_spectacular.utils import extend_schema_field
 from rest_framework import serializers
 from rest_framework.validators import UniqueTogetherValidator
@@ -227,7 +226,7 @@ def validate(self, attrs):
 
 
 class ContactAssociationSerializer(NautobotModelSerializer):
-    associated_object_type = ContentTypeField(queryset=ContentType.objects.all(), many=False)
+    associated_object_type = ContentTypeField(queryset=ContentType.objects.filter(FeatureQuery("contacts").get_query()))
 
     class Meta:
         model = ContactAssociation
@@ -521,23 +520,16 @@ class GraphQLQueryOutputSerializer(serializers.Serializer):
 
 
 class ImageAttachmentSerializer(ValidatedModelSerializer):
-    content_type = ContentTypeField(queryset=ContentType.objects.all())
+    content_type = ContentTypeField(
+        queryset=ContentType.objects.filter(app_label="dcim", model__in=["device", "location", "rack"])
+    )
 
     class Meta:
         model = ImageAttachment
         fields = "__all__"
-
-    def validate(self, attrs):
-        # Validate that the parent object exists
-        try:
-            attrs["content_type"].get_object_for_this_type(id=attrs["object_id"])
-        except ObjectDoesNotExist:
-            raise serializers.ValidationError(f"Invalid parent object: {attrs['content_type']} ID {attrs['object_id']}")
-
-        # Enforce model validation
-        super().validate(attrs)
-
-        return attrs
+        # image_height and image_width are auto-populated from the uploaded image via the ImageField's
+        # height_field/width_field declaration on the model, so clients must not supply them.
+        read_only_fields = ["image_height", "image_width"]
 
 @extend_schema_field(
     PolymorphicProxySerializer(
nautobot/extras/models/models.py · +7 −0 · modified
@@ -783,6 +783,13 @@ def __str__(self):
         filename = self.image.name.rsplit("/", 1)[-1]
         return filename.split("_", 2)[2]
 
+    def clean(self):
+        super().clean()
+        # Currently only Device, Rack, and Location support ImageAttachment records.
+        # If this changes, various other code paths will need similar updates, e.g. the REST API.
+        if self.content_type.app_label != "dcim" or self.content_type.model not in ["device", "location", "rack"]:
+            raise ValidationError({"content_type": "Not a permitted content-type for ImageAttachments."})
+
     def delete(self, *args, **kwargs):
         _name = self.image.name
 
nautobot/extras/tests/test_api.py+281 −8 modified@@ -12,6 +12,8 @@ from django.urls import reverse from django.utils.timezone import make_aware, now from rest_framework import status +from rest_framework.request import Request as DRFRequest +from rest_framework.test import APIRequestFactory, force_authenticate from nautobot.core.choices import ColorChoices from nautobot.core.models.fields import slugify_dashes_to_underscores @@ -31,7 +33,11 @@ RackGroup, ) from nautobot.dcim.tests import test_views -from nautobot.extras.api.serializers import ConfigContextSerializer, JobResultSerializer +from nautobot.extras.api.serializers import ( + ConfigContextSerializer, + JobResultSerializer, + RelationshipAssociationSerializer, +) from nautobot.extras.choices import ( DynamicGroupOperatorChoices, DynamicGroupTypeChoices, @@ -1413,12 +1419,27 @@ class ImageAttachmentTest( model = ImageAttachment choices_fields = ["content_type"] + @classmethod + def _png_bytes(cls): + """Return the bytes of a minimal valid 1x1 PNG, generated via Pillow so it always validates.""" + from io import BytesIO + + from PIL import Image + + buf = BytesIO() + Image.new("RGBA", (1, 1)).save(buf, format="PNG") + return buf.getvalue() + @classmethod def setUpTestData(cls): ct = ContentType.objects.get_for_model(Location) location = Location.objects.filter(location_type=LocationType.objects.get(name="Campus")).first() + # These fixtures exist for GET/LIST/DELETE coverage and use placeholder image paths so they + # don't write to media storage. Tests that PATCH or otherwise round-trip through the API + # should create their own ImageAttachment via SimpleUploadedFile so the underlying file + # actually exists on disk. 
ImageAttachment.objects.create( content_type=ct, object_id=location.pk, @@ -1444,6 +1465,15 @@ def setUpTestData(cls): image_width=100, ) + def _create_real_image_attachment(self, *, name, location): + """Create an ImageAttachment whose underlying file is actually written to media storage.""" + return ImageAttachment.objects.create( + content_type=ContentType.objects.get_for_model(Location), + object_id=location.pk, + name=name, + image=SimpleUploadedFile(name=f"{name}.png", content=self._png_bytes(), content_type="image/png"), + ) + # TODO: Unskip after resolving #2908, #2909 @skip("DRF's built-in OrderingFilter triggering natural key attribute error in our base") def test_list_objects_ascending_ordered(self): @@ -1453,6 +1483,120 @@ def test_list_objects_ascending_ordered(self): def test_list_objects_descending_ordered(self): pass + def test_create_enforces_view_permission_on_parent(self): + """The GFK validation block must enforce view permission on the parent object on create.""" + location_ct = ContentType.objects.get_for_model(Location) + campus_lt = LocationType.objects.get(name="Campus") + permitted, forbidden = Location.objects.filter(location_type=campus_lt)[:2] + self.add_permissions("extras.add_imageattachment") + obj_perm = ObjectPermission( + name="View permitted location only", + constraints={"pk": str(permitted.pk)}, + actions=["view"], + ) + obj_perm.save() + obj_perm.users.add(self.user) + obj_perm.object_types.add(location_ct) + + with self.subTest("Viewable parent succeeds"): + response = self.client.post( + self._get_list_url(), + data={ + "content_type": "dcim.location", + "object_id": str(permitted.pk), + "name": "Permitted attachment", + "image": SimpleUploadedFile(name="img.png", content=self._png_bytes(), content_type="image/png"), + }, + format="multipart", + **self.header, + ) + self.assertHttpStatus(response, status.HTTP_201_CREATED) + + with self.subTest("Non-viewable parent is rejected"): + response = self.client.post( + 
self._get_list_url(), + data={ + "content_type": "dcim.location", + "object_id": str(forbidden.pk), + "name": "Forbidden attachment", + "image": SimpleUploadedFile(name="img.png", content=self._png_bytes(), content_type="image/png"), + }, + format="multipart", + **self.header, + ) + self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST) + self.assertIn("object_id", response.data) + self.assertFalse(ImageAttachment.objects.filter(name="Forbidden attachment").exists()) + + def test_partial_update_skips_gfk_validation_when_unchanged(self): + """A PATCH that doesn't touch content_type or object_id must not require parent view permission.""" + campus_lt = LocationType.objects.get(name="Campus") + location = Location.objects.filter(location_type=campus_lt).first() + instance = self._create_real_image_attachment(name="To be renamed", location=location) + # Note: deliberately NOT granting any view permission on the parent Location. + self.add_permissions("extras.view_imageattachment", "extras.change_imageattachment") + + response = self.client.patch( + self._get_detail_url(instance), + {"name": "Renamed via PATCH"}, + format="json", + **self.header, + ) + self.assertHttpStatus(response, status.HTTP_200_OK) + instance.refresh_from_db() + self.assertEqual(instance.name, "Renamed via PATCH") + + def test_partial_update_changing_object_id_enforces_view_permission(self): + """A PATCH that changes only object_id must validate the new target against parent view permission.""" + location_ct = ContentType.objects.get_for_model(Location) + campus_lt = LocationType.objects.get(name="Campus") + original_location = Location.objects.filter(location_type=campus_lt).first() + instance = self._create_real_image_attachment(name="Re-targetable", location=original_location) + # Pick another campus location not already used by the attachment. 
+ other = Location.objects.filter(location_type=campus_lt).exclude(pk=instance.object_id).first() + self.add_permissions("extras.view_imageattachment", "extras.change_imageattachment") + + with self.subTest("Re-target to a viewable Location succeeds"): + obj_perm = ObjectPermission( + name="View both locations", + constraints={"pk__in": [str(instance.object_id), str(other.pk)]}, + actions=["view"], + ) + obj_perm.save() + obj_perm.users.add(self.user) + obj_perm.object_types.add(location_ct) + + response = self.client.patch( + self._get_detail_url(instance), + {"object_id": str(other.pk)}, + format="json", + **self.header, + ) + self.assertHttpStatus(response, status.HTTP_200_OK) + instance.refresh_from_db() + self.assertEqual(instance.object_id, other.pk) + + with self.subTest("Re-target to a non-viewable Location is rejected"): + # Tighten the permission so `other` is no longer viewable; instance is still viewable. + obj_perm.constraints = {"pk": str(instance.object_id)} + obj_perm.save() + # Find a third Location the user can't see. + third = ( + Location.objects.filter(location_type=campus_lt).exclude(pk__in=[instance.object_id, other.pk]).first() + ) + self.assertIsNotNone(third, "Test fixture needs a third Campus Location") + + response = self.client.patch( + self._get_detail_url(instance), + {"object_id": str(third.pk)}, + format="json", + **self.header, + ) + self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST) + self.assertIn("object_id", response.data) + instance.refresh_from_db() + self.assertNotEqual(instance.object_id, third.pk) + class JobTest( # note no CreateObjectViewTestCase - we do not support user creation of Job records @@ -3706,10 +3850,13 @@ def test_create_invalid_relationship_association(self): def test_model_clean_method_is_called(self): """Validate RelationshipAssociation clean method is called""" + # source_type is Device, but the relationship requires source_type=Location, which is what + # we expect the model's clean() to flag. 
source_id must reference a real Device so the + # serializer-level GFK existence check passes and model.clean() actually runs. data = { "relationship": self.relationship.pk, "source_type": "dcim.device", - "source_id": self.locations[2].pk, + "source_id": self.devices[1].pk, "destination_type": "dcim.device", "destination_id": self.devices[2].pk, } @@ -3891,6 +4038,133 @@ def test_update_association_data_on_location(self): self.assertTrue(RelationshipAssociation.objects.filter(destination_id=self.devices[3].pk).exists()) +class GenericForeignKeyValidationTest(APITestCase): + """ + Unit-level coverage of the GenericForeignKey enforcement in + `ValidatedModelSerializer.validate()`. + + Uses `RelationshipAssociationSerializer` as the test target because it has multiple GFKs + (source and destination) and no custom `validate()` override that would interfere with the + code path under test. + """ + + @classmethod + def setUpTestData(cls): + cls.location_type = ContentType.objects.get_for_model(Location) + cls.device_type_ct = ContentType.objects.get_for_model(Device) + cls.location_status = Status.objects.get_for_model(Location).first() + + cls.relationship = Relationship.objects.create( + label="GFK Validation Many-to-Many", + key="gfk_validation_m2m", + type=RelationshipTypeChoices.TYPE_MANY_TO_MANY, + source_type=cls.location_type, + destination_type=cls.device_type_ct, + ) + + lt = LocationType.objects.get(name="Campus") + cls.locations = [ + Location.objects.create(name=f"GFK Loc {n}", status=cls.location_status, location_type=lt) for n in range(3) + ] + manufacturer = Manufacturer.objects.first() + device_type = DeviceType.objects.create(manufacturer=manufacturer, model="GFK DT") + device_role = Role.objects.get_for_model(Device).first() + device_status = Status.objects.get_for_model(Device).first() + cls.devices = [ + Device.objects.create( + name=f"GFK Dev {n}", + device_type=device_type, + role=device_role, + location=cls.locations[0], + status=device_status, + 
) + for n in range(3) + ] + + # An existing association used as the `instance` for partial-update tests + cls.existing = RelationshipAssociation.objects.create( + relationship=cls.relationship, + source_type=cls.location_type, + source_id=cls.locations[0].pk, + destination_type=cls.device_type_ct, + destination_id=cls.devices[0].pk, + ) + + def _request_context(self): + factory = APIRequestFactory() + raw = factory.post("/") + force_authenticate(raw, user=self.user) + drf_request = DRFRequest(raw) + drf_request.user = self.user + return {"request": drf_request} + + def test_create_with_viewable_targets_passes_validation(self): + """A creation whose source and destination GFK targets are both viewable is accepted.""" + self.add_permissions("extras.view_relationship", "dcim.view_location", "dcim.view_device") + serializer = RelationshipAssociationSerializer( + data={ + "relationship": str(self.relationship.pk), + "source_type": "dcim.location", + "source_id": str(self.locations[1].pk), + "destination_type": "dcim.device", + "destination_id": str(self.devices[1].pk), + }, + context=self._request_context(), + ) + self.assertTrue(serializer.is_valid(), serializer.errors) + + def test_create_rejects_non_viewable_source(self): + """A creation referencing a non-viewable source GFK target is rejected with an error on the fk_field.""" + self.add_permissions("extras.view_relationship", "dcim.view_device") + # User can view all Devices but only one specific Location (which we won't reference) + self.add_permissions("dcim.view_location", constraints={"pk__in": [str(self.locations[0].pk)]}) + serializer = RelationshipAssociationSerializer( + data={ + "relationship": str(self.relationship.pk), + "source_type": "dcim.location", + "source_id": str(self.locations[2].pk), + "destination_type": "dcim.device", + "destination_id": str(self.devices[1].pk), + }, + context=self._request_context(), + ) + self.assertFalse(serializer.is_valid()) + self.assertIn("source_id", serializer.errors) 
+ + def test_partial_update_falls_back_to_instance_for_unchanged_ct_field(self): + """ + On a partial update that changes only the `fk_field` of a GFK pair, the validator falls back + to `self.instance` for the unchanged `ct_field` and validates the new target accordingly. + """ + # Existing association points at devices[0]; partial update redirects to devices[1]. + self.add_permissions("dcim.view_location", constraints={"pk__in": [str(self.locations[0].pk)]}) + self.add_permissions( + "dcim.view_device", + constraints={"pk__in": [str(self.devices[0].pk), str(self.devices[1].pk)]}, + ) + serializer = RelationshipAssociationSerializer( + instance=self.existing, + data={"destination_id": str(self.devices[1].pk)}, + partial=True, + context=self._request_context(), + ) + self.assertTrue(serializer.is_valid(), serializer.errors) + + def test_partial_update_rejects_unviewable_new_fk_target(self): + """A partial update changing only `fk_field` to a non-viewable target is rejected.""" + # User can view the existing target only; the new candidate (devices[2]) is hidden. + self.add_permissions("dcim.view_location", constraints={"pk__in": [str(self.locations[0].pk)]}) + self.add_permissions("dcim.view_device", constraints={"pk__in": [str(self.devices[0].pk)]}) + serializer = RelationshipAssociationSerializer( + instance=self.existing, + data={"destination_id": str(self.devices[2].pk)}, + partial=True, + context=self._request_context(), + ) + self.assertFalse(serializer.is_valid()) + self.assertIn("destination_id", serializer.errors) + + class SecretTest(APIViewTestCases.APIViewTestCase): model = Secret bulk_update_data = {} @@ -4166,12 +4440,11 @@ def setUpTestData(cls): "associated_object_id": device_pks[3], }, ] - # TODO: this isn't really valid since we're changing the associated_object_type but not the associated_object_id - # Should we disallow bulk-updates of StaticGroupAssociation? Or maybe skip the bulk-update tests at least? 
- cls.bulk_update_data = { - "dynamic_group": cls.dg3.pk, - "associated_object_type": "ipam.vlan", - } + # Although StaticGroupAssociation REST API supports bulk-updates, the `test_bulk_update_objects` generic test + # wants to update 3 distinct records with a *shared* set of `bulk_update_data` and doesn't provide a pattern for + # providing different data for each updated record. This doesn't really work for StaticGroupAssociation + # since a "realistic" bulk-update would have a different `associated_object` for each row. + cls.bulk_update_data = {} def test_content_type_mismatch(self): self.add_permissions("extras.add_staticgroupassociation")
36cde7148a20 Merge commit from fork
7 files changed · +339 −26
changes/GHSA-wpxj-44w3-2j6x.fixed · +3 −0 · added
@@ -0,0 +1,3 @@
+Fixed `ImageAttachment` REST API incorrectly marking the `image_height` and `image_width` as required fields.
+Fixed `ImageAttachment` REST API incorrectly allowing creation of attachments to an unsupported `content_type`.
+Fixed `ContactAssociation` REST API incorrectly allowing creation of associations to an invalid `associated_object_type`.
changes/GHSA-wpxj-44w3-2j6x.security · +1 −0 · added
@@ -0,0 +1 @@
+Added logic in the REST API to enforce user "view" permissions when assigning related objects via a GenericForeignKey (GHSA-wpxj-44w3-2j6x).
nautobot/core/api/serializers.py · +40 −2 · modified
@@ -2,7 +2,7 @@
 import logging
 import uuid
 
-from django.contrib.contenttypes.fields import GenericRel
+from django.contrib.contenttypes.fields import GenericForeignKey, GenericRel
 from django.contrib.contenttypes.models import ContentType
 from django.core.exceptions import (
     FieldDoesNotExist,
@@ -545,10 +545,48 @@ def validate(self, attrs):
         local_attrs.pop("relationships", None)
         local_attrs.pop("tags", None)
 
-        # Skip ManyToManyFields
         for field in self.Meta.model._meta.get_fields():
             if isinstance(field, models.ManyToManyField):
+                # Skip ManyToManyFields
                 local_attrs.pop(field.name, None)
+            elif isinstance(field, GenericForeignKey):
+                # Only validate the GFK target if this request is actually assigning it. A partial
+                # update that touches neither ct_field nor fk_field leaves the existing reference
+                # untouched, and re-checking parent view permission would block legitimate edits to
+                # unrelated fields.
+                ct_provided = field.ct_field in local_attrs
+                fk_provided = field.fk_field in local_attrs
+                if not ct_provided and not fk_provided:
+                    continue
+                # For a partial update that changes only one side, fall back to the saved instance
+                # for the unchanged side so we validate the resulting target.
+                ct = local_attrs.get(field.ct_field)
+                fk = local_attrs.get(field.fk_field)
+                if self.instance is not None:
+                    if ct is None:
+                        ct = getattr(self.instance, field.ct_field, None)
+                    if fk is None:
+                        fk = getattr(self.instance, field.fk_field, None)
+                if ct is None or fk is None:
+                    # Creation with one side missing: required-field validation will flag it downstream.
+                    continue
+                ct_model = ct.model_class()
+                if ct_model is None:
+                    raise ValidationError({field.ct_field: "Invalid content-type"})
+                qs = ct_model.objects
+                # Layer view permission on top of the existence check when we have an authenticated
+                # request; programmatic use without a request still gets the existence check.
+                if (
+                    "request" in self.context
+                    and self.context["request"]
+                    and self.context["request"].user
+                    and hasattr(qs, "restrict")
+                ):
+                    qs = qs.restrict(self.context["request"].user, "view")
+                try:
+                    qs.get(pk=fk)
+                except ObjectDoesNotExist as e:
+                    raise ValidationError({field.fk_field: "Object not found"}) from e
 
         # Run clean() on an instance of the model
         if self.instance is None:
nautobot/dcim/api/serializers.py · +0 −1 · modified
@@ -809,7 +809,6 @@ def validate(self, attrs):
 
 
 class CableSerializer(TaggedModelSerializerMixin, NautobotModelSerializer):
-    # TODO: termination_a_type/termination_b_type are a bit redundant with the full termination_a/termination_b dicts
     termination_a_type = ContentTypeField(queryset=ContentType.objects.filter(CABLE_TERMINATION_MODELS))
     termination_b_type = ContentTypeField(queryset=ContentType.objects.filter(CABLE_TERMINATION_MODELS))
     termination_a = serializers.SerializerMethodField(read_only=True)
nautobot/extras/api/serializers.py · +7 −15 · modified
@@ -3,7 +3,6 @@
 from django.conf import settings
 from django.contrib.auth.models import Group
 from django.contrib.contenttypes.models import ContentType
-from django.core.exceptions import ObjectDoesNotExist
 from drf_spectacular.utils import extend_schema_field
 from rest_framework import serializers
 from rest_framework.validators import UniqueTogetherValidator
@@ -309,7 +308,7 @@ def validate(self, attrs):
 
 
 class ContactAssociationSerializer(NautobotModelSerializer):
-    associated_object_type = ContentTypeField(queryset=ContentType.objects.all(), many=False)
+    associated_object_type = ContentTypeField(queryset=ContentType.objects.filter(FeatureQuery("contacts").get_query()))
 
     class Meta:
         model = ContactAssociation
@@ -607,23 +606,16 @@ class GraphQLQueryOutputSerializer(serializers.Serializer):
 
 
 class ImageAttachmentSerializer(ValidatedModelSerializer):
-    content_type = ContentTypeField(queryset=ContentType.objects.all())
+    content_type = ContentTypeField(
+        queryset=ContentType.objects.filter(app_label="dcim", model__in=["device", "location", "rack"])
+    )
 
     class Meta:
         model = ImageAttachment
         fields = "__all__"
-
-    def validate(self, attrs):
-        # Validate that the parent object exists
-        try:
-            attrs["content_type"].get_object_for_this_type(id=attrs["object_id"])
-        except ObjectDoesNotExist:
-            raise serializers.ValidationError(f"Invalid parent object: {attrs['content_type']} ID {attrs['object_id']}")
-
-        # Enforce model validation
-        super().validate(attrs)
-
-        return attrs
+        # image_height and image_width are auto-populated from the uploaded image via the ImageField's
+        # height_field/width_field declaration on the model, so clients must not supply them.
+        read_only_fields = ["image_height", "image_width"]
 
 @extend_schema_field(
     PolymorphicProxySerializer(
nautobot/extras/models/models.py · +7 −0 · modified
@@ -797,6 +797,13 @@ def __str__(self):
         filename = self.image.name.rsplit("/", 1)[-1]
         return filename.split("_", 2)[2]
 
+    def clean(self):
+        super().clean()
+        # Currently only Device, Rack, and Location support ImageAttachment records.
+        # If this changes, various other code paths will need similar updates, e.g. the REST API.
+        if self.content_type.app_label != "dcim" or self.content_type.model not in ["device", "location", "rack"]:
+            raise ValidationError({"content_type": "Not a permitted content-type for ImageAttachments."})
+
     def delete(self, *args, **kwargs):
         _name = self.image.name
 
nautobot/extras/tests/test_api.py (modified, +281 −8)
@@ -14,6 +14,8 @@
 from django.urls import reverse
 from django.utils.timezone import make_aware, now
 from rest_framework import status
+from rest_framework.request import Request as DRFRequest
+from rest_framework.test import APIRequestFactory, force_authenticate
 
 from nautobot.core.choices import ColorChoices
 from nautobot.core.models.fields import slugify_dashes_to_underscores
@@ -33,7 +35,11 @@
     RackGroup,
 )
 from nautobot.dcim.tests import test_views
-from nautobot.extras.api.serializers import ConfigContextSerializer, JobResultSerializer
+from nautobot.extras.api.serializers import (
+    ConfigContextSerializer,
+    JobResultSerializer,
+    RelationshipAssociationSerializer,
+)
 from nautobot.extras.choices import (
     ApprovalWorkflowStateChoices,
     DynamicGroupOperatorChoices,
@@ -2255,12 +2261,27 @@ class ImageAttachmentTest(
     model = ImageAttachment
     choices_fields = ["content_type"]
 
+    @classmethod
+    def _png_bytes(cls):
+        """Return the bytes of a minimal valid 1x1 PNG, generated via Pillow so it always validates."""
+        from io import BytesIO
+
+        from PIL import Image
+
+        buf = BytesIO()
+        Image.new("RGBA", (1, 1)).save(buf, format="PNG")
+        return buf.getvalue()
+
     @classmethod
     def setUpTestData(cls):
         ct = ContentType.objects.get_for_model(Location)
 
         location = Location.objects.filter(location_type=LocationType.objects.get(name="Campus")).first()
 
+        # These fixtures exist for GET/LIST/DELETE coverage and use placeholder image paths so they
+        # don't write to media storage. Tests that PATCH or otherwise round-trip through the API
+        # should create their own ImageAttachment via SimpleUploadedFile so the underlying file
+        # actually exists on disk.
         ImageAttachment.objects.create(
             content_type=ct,
             object_id=location.pk,
@@ -2286,6 +2307,15 @@ def setUpTestData(cls):
             image_width=100,
         )
 
+    def _create_real_image_attachment(self, *, name, location):
+        """Create an ImageAttachment whose underlying file is actually written to media storage."""
+        return ImageAttachment.objects.create(
+            content_type=ContentType.objects.get_for_model(Location),
+            object_id=location.pk,
+            name=name,
+            image=SimpleUploadedFile(name=f"{name}.png", content=self._png_bytes(), content_type="image/png"),
+        )
+
     # TODO: Unskip after resolving #2908, #2909
     @skip("DRF's built-in OrderingFilter triggering natural key attribute error in our base")
     def test_list_objects_ascending_ordered(self):
@@ -2295,6 +2325,120 @@ def test_list_objects_ascending_ordered(self):
     def test_list_objects_descending_ordered(self):
         pass
 
+    def test_create_enforces_view_permission_on_parent(self):
+        """The GFK validation block must enforce view permission on the parent object on create."""
+        location_ct = ContentType.objects.get_for_model(Location)
+        campus_lt = LocationType.objects.get(name="Campus")
+        permitted, forbidden = Location.objects.filter(location_type=campus_lt)[:2]
+        self.add_permissions("extras.add_imageattachment")
+        obj_perm = ObjectPermission(
+            name="View permitted location only",
+            constraints={"pk": str(permitted.pk)},
+            actions=["view"],
+        )
+        obj_perm.save()
+        obj_perm.users.add(self.user)
+        obj_perm.object_types.add(location_ct)
+
+        with self.subTest("Viewable parent succeeds"):
+            response = self.client.post(
+                self._get_list_url(),
+                data={
+                    "content_type": "dcim.location",
+                    "object_id": str(permitted.pk),
+                    "name": "Permitted attachment",
+                    "image": SimpleUploadedFile(name="img.png", content=self._png_bytes(), content_type="image/png"),
+                },
+                format="multipart",
+                **self.header,
+            )
+            self.assertHttpStatus(response, status.HTTP_201_CREATED)
+
+        with self.subTest("Non-viewable parent is rejected"):
+            response = self.client.post(
+                self._get_list_url(),
+                data={
+                    "content_type": "dcim.location",
+                    "object_id": str(forbidden.pk),
+                    "name": "Forbidden attachment",
+                    "image": SimpleUploadedFile(name="img.png", content=self._png_bytes(), content_type="image/png"),
+                },
+                format="multipart",
+                **self.header,
+            )
+            self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
+            self.assertIn("object_id", response.data)
+            self.assertFalse(ImageAttachment.objects.filter(name="Forbidden attachment").exists())
+
+    def test_partial_update_skips_gfk_validation_when_unchanged(self):
+        """A PATCH that doesn't touch content_type or object_id must not require parent view permission."""
+        campus_lt = LocationType.objects.get(name="Campus")
+        location = Location.objects.filter(location_type=campus_lt).first()
+        instance = self._create_real_image_attachment(name="To be renamed", location=location)
+        # Note: deliberately NOT granting any view permission on the parent Location.
+        self.add_permissions("extras.view_imageattachment", "extras.change_imageattachment")
+
+        response = self.client.patch(
+            self._get_detail_url(instance),
+            {"name": "Renamed via PATCH"},
+            format="json",
+            **self.header,
+        )
+        self.assertHttpStatus(response, status.HTTP_200_OK)
+        instance.refresh_from_db()
+        self.assertEqual(instance.name, "Renamed via PATCH")
+
+    def test_partial_update_changing_object_id_enforces_view_permission(self):
+        """A PATCH that changes only object_id must validate the new target against parent view permission."""
+        location_ct = ContentType.objects.get_for_model(Location)
+        campus_lt = LocationType.objects.get(name="Campus")
+        original_location = Location.objects.filter(location_type=campus_lt).first()
+        instance = self._create_real_image_attachment(name="Re-targetable", location=original_location)
+        # Pick another campus location not already used by the attachment.
+        other = Location.objects.filter(location_type=campus_lt).exclude(pk=instance.object_id).first()
+        self.add_permissions("extras.view_imageattachment", "extras.change_imageattachment")
+
+        with self.subTest("Re-target to a viewable Location succeeds"):
+            obj_perm = ObjectPermission(
+                name="View both locations",
+                constraints={"pk__in": [str(instance.object_id), str(other.pk)]},
+                actions=["view"],
+            )
+            obj_perm.save()
+            obj_perm.users.add(self.user)
+            obj_perm.object_types.add(location_ct)
+
+            response = self.client.patch(
+                self._get_detail_url(instance),
+                {"object_id": str(other.pk)},
+                format="json",
+                **self.header,
+            )
+            self.assertHttpStatus(response, status.HTTP_200_OK)
+            instance.refresh_from_db()
+            self.assertEqual(instance.object_id, other.pk)
+
+        with self.subTest("Re-target to a non-viewable Location is rejected"):
+            # Tighten the permission so `other` is no longer viewable; instance is still viewable.
+            obj_perm.constraints = {"pk": str(instance.object_id)}
+            obj_perm.save()
+            # Find a third Location the user can't see.
+            third = (
+                Location.objects.filter(location_type=campus_lt).exclude(pk__in=[instance.object_id, other.pk]).first()
+            )
+            self.assertIsNotNone(third, "Test fixture needs a third Campus Location")
+
+            response = self.client.patch(
+                self._get_detail_url(instance),
+                {"object_id": str(third.pk)},
+                format="json",
+                **self.header,
+            )
+            self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
+            self.assertIn("object_id", response.data)
+            instance.refresh_from_db()
+            self.assertNotEqual(instance.object_id, third.pk)
+
 
 class JobTest(
     # note no CreateObjectViewTestCase - we do not support user creation of Job records
@@ -4567,10 +4711,13 @@ def test_create_invalid_relationship_association(self):
 
     def test_model_clean_method_is_called(self):
         """Validate RelationshipAssociation clean method is called"""
+        # source_type is Device, but the relationship requires source_type=Location, which is what
+        # we expect the model's clean() to flag. source_id must reference a real Device so the
+        # serializer-level GFK existence check passes and model.clean() actually runs.
         data = {
             "relationship": self.relationship.pk,
             "source_type": "dcim.device",
-            "source_id": self.locations[2].pk,
+            "source_id": self.devices[1].pk,
             "destination_type": "dcim.device",
             "destination_id": self.devices[2].pk,
         }
@@ -4752,6 +4899,133 @@ def test_update_association_data_on_location(self):
         self.assertTrue(RelationshipAssociation.objects.filter(destination_id=self.devices[3].pk).exists())
 
 
+class GenericForeignKeyValidationTest(APITestCase):
+    """
+    Unit-level coverage of the GenericForeignKey enforcement in
+    `ValidatedModelSerializer.validate()`.
+
+    Uses `RelationshipAssociationSerializer` as the test target because it has multiple GFKs
+    (source and destination) and no custom `validate()` override that would interfere with the
+    code path under test.
+    """
+
+    @classmethod
+    def setUpTestData(cls):
+        cls.location_type = ContentType.objects.get_for_model(Location)
+        cls.device_type_ct = ContentType.objects.get_for_model(Device)
+        cls.location_status = Status.objects.get_for_model(Location).first()
+
+        cls.relationship = Relationship.objects.create(
+            label="GFK Validation Many-to-Many",
+            key="gfk_validation_m2m",
+            type=RelationshipTypeChoices.TYPE_MANY_TO_MANY,
+            source_type=cls.location_type,
+            destination_type=cls.device_type_ct,
+        )
+
+        lt = LocationType.objects.get(name="Campus")
+        cls.locations = [
+            Location.objects.create(name=f"GFK Loc {n}", status=cls.location_status, location_type=lt) for n in range(3)
+        ]
+        manufacturer = Manufacturer.objects.first()
+        device_type = DeviceType.objects.create(manufacturer=manufacturer, model="GFK DT")
+        device_role = Role.objects.get_for_model(Device).first()
+        device_status = Status.objects.get_for_model(Device).first()
+        cls.devices = [
+            Device.objects.create(
+                name=f"GFK Dev {n}",
+                device_type=device_type,
+                role=device_role,
+                location=cls.locations[0],
+                status=device_status,
+            )
+            for n in range(3)
+        ]
+
+        # An existing association used as the `instance` for partial-update tests
+        cls.existing = RelationshipAssociation.objects.create(
+            relationship=cls.relationship,
+            source_type=cls.location_type,
+            source_id=cls.locations[0].pk,
+            destination_type=cls.device_type_ct,
+            destination_id=cls.devices[0].pk,
+        )
+
+    def _request_context(self):
+        factory = APIRequestFactory()
+        raw = factory.post("/")
+        force_authenticate(raw, user=self.user)
+        drf_request = DRFRequest(raw)
+        drf_request.user = self.user
+        return {"request": drf_request}
+
+    def test_create_with_viewable_targets_passes_validation(self):
+        """A creation whose source and destination GFK targets are both viewable is accepted."""
+        self.add_permissions("extras.view_relationship", "dcim.view_location", "dcim.view_device")
+        serializer = RelationshipAssociationSerializer(
+            data={
+                "relationship": str(self.relationship.pk),
+                "source_type": "dcim.location",
+                "source_id": str(self.locations[1].pk),
+                "destination_type": "dcim.device",
+                "destination_id": str(self.devices[1].pk),
+            },
+            context=self._request_context(),
+        )
+        self.assertTrue(serializer.is_valid(), serializer.errors)
+
+    def test_create_rejects_non_viewable_source(self):
+        """A creation referencing a non-viewable source GFK target is rejected with an error on the fk_field."""
+        self.add_permissions("extras.view_relationship", "dcim.view_device")
+        # User can view all Devices but only one specific Location (which we won't reference)
+        self.add_permissions("dcim.view_location", constraints={"pk__in": [str(self.locations[0].pk)]})
+        serializer = RelationshipAssociationSerializer(
+            data={
+                "relationship": str(self.relationship.pk),
+                "source_type": "dcim.location",
+                "source_id": str(self.locations[2].pk),
+                "destination_type": "dcim.device",
+                "destination_id": str(self.devices[1].pk),
+            },
+            context=self._request_context(),
+        )
+        self.assertFalse(serializer.is_valid())
+        self.assertIn("source_id", serializer.errors)
+
+    def test_partial_update_falls_back_to_instance_for_unchanged_ct_field(self):
+        """
+        On a partial update that changes only the `fk_field` of a GFK pair, the validator falls back
+        to `self.instance` for the unchanged `ct_field` and validates the new target accordingly.
+        """
+        # Existing association points at devices[0]; partial update redirects to devices[1].
+        self.add_permissions("dcim.view_location", constraints={"pk__in": [str(self.locations[0].pk)]})
+        self.add_permissions(
+            "dcim.view_device",
+            constraints={"pk__in": [str(self.devices[0].pk), str(self.devices[1].pk)]},
+        )
+        serializer = RelationshipAssociationSerializer(
+            instance=self.existing,
+            data={"destination_id": str(self.devices[1].pk)},
+            partial=True,
+            context=self._request_context(),
+        )
+        self.assertTrue(serializer.is_valid(), serializer.errors)
+
+    def test_partial_update_rejects_unviewable_new_fk_target(self):
+        """A partial update changing only `fk_field` to a non-viewable target is rejected."""
+        # User can view the existing target only; the new candidate (devices[2]) is hidden.
+        self.add_permissions("dcim.view_location", constraints={"pk__in": [str(self.locations[0].pk)]})
+        self.add_permissions("dcim.view_device", constraints={"pk__in": [str(self.devices[0].pk)]})
+        serializer = RelationshipAssociationSerializer(
+            instance=self.existing,
+            data={"destination_id": str(self.devices[2].pk)},
+            partial=True,
+            context=self._request_context(),
+        )
+        self.assertFalse(serializer.is_valid())
+        self.assertIn("destination_id", serializer.errors)
+
+
 class SecretTest(APIViewTestCases.APIViewTestCase):
     model = Secret
     bulk_update_data = {}
@@ -5027,12 +5301,11 @@ def setUpTestData(cls):
                 "associated_object_id": device_pks[3],
             },
         ]
-        # TODO: this isn't really valid since we're changing the associated_object_type but not the associated_object_id
-        # Should we disallow bulk-updates of StaticGroupAssociation? Or maybe skip the bulk-update tests at least?
-        cls.bulk_update_data = {
-            "dynamic_group": cls.dg3.pk,
-            "associated_object_type": "ipam.vlan",
-        }
+        # Although StaticGroupAssociation REST API supports bulk-updates, the `test_bulk_update_objects` generic test
+        # wants to update 3 distinct records with a *shared* set of `bulk_update_data` and doesn't provide a pattern for
+        # providing different data for each updated record. This doesn't really work for StaticGroupAssociation
+        # since a "realistic" bulk-update would have a different `associated_object` for each row.
+        cls.bulk_update_data = {}
 
     def test_content_type_mismatch(self):
         self.add_permissions("extras.add_staticgroupassociation")
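The tests in this diff describe three behaviours of the serializer-level check: a reference to a non-viewable target is rejected with an error on the object-id field, a partial update that touches neither half of the GenericForeignKey pair skips the check, and a partial update that changes only the object id falls back to the existing instance for the content type. The real logic lives in `ValidatedModelSerializer.validate()` and is not part of this test file, so the following is only a framework-free sketch of that decision flow. Every name in it (`ValidationError`, `Attachment`, `validate_gfk`, the `viewable` mapping, the error message) is hypothetical and stands in for Django/DRF machinery.

```python
from dataclasses import dataclass
from typing import Optional


class ValidationError(Exception):
    """Hypothetical stand-in for a serializer validation error keyed by field name."""

    def __init__(self, field: str, message: str):
        super().__init__(f"{field}: {message}")
        self.field = field


@dataclass
class Attachment:
    """Hypothetical object holding one GenericForeignKey pair."""

    content_type: str
    object_id: str


def validate_gfk(data: dict, viewable: dict, instance: Optional[Attachment] = None) -> None:
    """Reject a GFK reference unless the requesting user can view the target.

    `viewable` maps a content-type label to the set of object ids the user may view;
    it is an assumption standing in for a permission-restricted queryset.
    """
    # A partial update that touches neither half of the pair skips the check.
    if "content_type" not in data and "object_id" not in data:
        return
    # Fall back to the existing instance for whichever half was not submitted.
    content_type = data.get("content_type", instance.content_type if instance else None)
    object_id = data.get("object_id", instance.object_id if instance else None)
    # Design choice of this sketch: a hidden id and a nonexistent id raise the same error.
    if object_id not in viewable.get(content_type, set()):
        raise ValidationError("object_id", "Related object not found")


# Usage: the user may view only one Location.
viewable = {"dcim.location": {"loc-1"}}
validate_gfk({"content_type": "dcim.location", "object_id": "loc-1"}, viewable)
```

Keying the error on the object-id field mirrors the assertions in the tests above (`assertIn("object_id", response.data)`); how the real serializer words or structures the error is not shown in this diff.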
References
- github.com/advisories/GHSA-wpxj-44w3-2j6x (advisory)
- github.com/nautobot/nautobot/commit/36cde7148a207234de6212ec074f321dbc9d1b5b
- github.com/nautobot/nautobot/commit/9918bdb9bcf1eb42cda72c344f420a64ef7665f1
- github.com/nautobot/nautobot/releases/tag/v2.4.33
- github.com/nautobot/nautobot/releases/tag/v3.1.2
- github.com/nautobot/nautobot/security/advisories/GHSA-wpxj-44w3-2j6x