VYPR
Medium severity · 5.0 · NVD Advisory · Published Apr 15, 2026 · Updated Apr 21, 2026

CVE-2026-34244

Description

Weblate is a web-based localization tool. In versions prior to 5.17, a user with the project.edit permission (granted by the per-project "Administration" role) can configure machine translation service URLs that point to arbitrary internal network addresses. During configuration validation, Weblate makes an HTTP request to the attacker-controlled URL and reflects up to 200 characters of the response body back to the user in an error message. This constitutes a Server-Side Request Forgery (SSRF) with partial response read. The issue is fixed in version 5.17. Deployments that cannot upgrade immediately can limit the available machinery services via the WEBLATE_MACHINERY setting.
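
The class of check that blocks this kind of request can be sketched as follows. This is a minimal illustration, not Weblate's implementation: `is_public_target` and `resolve_host` are hypothetical names, and the policy (reject any URL whose host resolves to a non-global address) is an assumption based on the description above.

```python
from __future__ import annotations

import ipaddress
import socket
from urllib.parse import urlparse


def resolve_host(host: str) -> list[str]:
    """Return every IP address the host name resolves to (hypothetical helper)."""
    infos = socket.getaddrinfo(host, None, proto=socket.IPPROTO_TCP)
    return [info[4][0] for info in infos]


def is_public_target(url: str, resolver=resolve_host) -> bool:
    """Return True only if the URL is HTTP(S) and all resolved addresses are global."""
    parsed = urlparse(url)
    if parsed.scheme not in {"http", "https"} or not parsed.hostname:
        return False
    try:
        addresses = resolver(parsed.hostname)
    except OSError:
        # Unresolvable hosts are treated as not allowed.
        return False
    if not addresses:
        return False
    # Strip any IPv6 zone identifier ("%eth0") before parsing the address.
    return all(
        ipaddress.ip_address(address.split("%")[0]).is_global
        for address in addresses
    )


# Loopback target similar to the one used in the patch's tests.
print(is_public_target("http://127.0.0.1:11434/", resolver=lambda host: [host]))
```

A check like this only covers the moment of validation. The host can resolve differently when the request is actually sent, which is presumably why the patched documentation also mentions runtime and peer-IP checks.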

Affected packages

Versions sourced from the GitHub Security Advisory.

Package | Affected versions | Patched versions
weblate (PyPI) | < 5.17 | 5.17

Affected products: 1

Patches: 1

e619e9090202

fix(machinery): limit allowed URLs

https://github.com/WeblateOrg/weblate · Michal Čihař · Mar 27, 2026 · via ghsa
20 files changed · +1421 -85
  • docs/admin/config.rst (+24 -0, modified)
    @@ -81,6 +81,30 @@ This is currently used in the following places:
     
        * :setting:`ALLOWED_ASSET_SIZE`
     
    +.. setting:: ALLOWED_MACHINERY_DOMAINS
    +
    +ALLOWED_MACHINERY_DOMAINS
    +-------------------------
    +
    +Configures which custom machinery domains are explicitly allowed in project-level
    +machine translation configuration.
    +
    +This setting applies only to machinery services and does not affect
    +:setting:`ALLOWED_ASSET_DOMAINS`.
    +
    +It expects a list of host/domain names. You can use fully qualified names or
    +prepend with a period as a wildcard to match all subdomains.
    +
    +Defaults to ``[]``.
    +
    +The allowlist affects project-managed machinery in two ways: it permits the
    +configured endpoint during outbound validation, and it marks matching hosts as
    +trusted when deciding whether remote provider error details or response bodies
    +can be shown to the user. For direct connections, runtime checks still reject
    +destinations that resolve to private or otherwise non-public addresses. When an
    +HTTP(S) proxy is used, runtime validation falls back to hostname validation and
    +does not perform the same local DNS or peer-IP checks.
    +
     .. setting:: ALLOWED_ASSET_SIZE
     
     ALLOWED_ASSET_SIZE
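
The documentation hunk above describes allowlist entries as fully qualified names or names with a leading period that acts as a subdomain wildcard. A minimal sketch of that matching rule follows. It assumes the leading-period form also matches the bare domain, as Django's ALLOWED_HOSTS does. `matches_allowlist` is a hypothetical helper and is not Weblate's `is_allowlisted_hostname`, whose exact behavior is not shown in this excerpt.

```python
from __future__ import annotations


def matches_allowlist(hostname: str, allowed: list[str]) -> bool:
    """Check a host name against exact entries and leading-period wildcards."""
    host = hostname.lower().rstrip(".")
    for pattern in allowed:
        pattern = pattern.lower()
        if pattern.startswith("."):
            # ".example.com" matches example.com and any subdomain of it
            # (assumption: same semantics as Django's ALLOWED_HOSTS).
            if host == pattern[1:] or host.endswith(pattern):
                return True
        elif host == pattern:
            return True
    return False


print(matches_allowlist("mt.example.com", [".example.com"]))
print(matches_allowlist("evilexample.com", [".example.com"]))
```

Matching on the suffix including the period matters: a plain `endswith("example.com")` would also accept look-alike hosts such as `evilexample.com`.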
    
  • docs/changes.rst (+1 -0, modified)
    @@ -21,6 +21,7 @@ Weblate 5.17
     .. rubric:: Bug fixes
     
     * Component file handling now validates repository symlinks.
    +* Hardened project-level machine translation against SSRF by blocking private-network targets for untrusted endpoints and hiding untrusted remote error details.
     * Prevented removing the last team from a project token.
     * Batch automatic translation now uses project-level machinery configuration instead of only site-wide settings.
     * Fixed sorting by the **Unreviewed** column in listings.
    
  • weblate/api/tests.py (+18 -0, modified)
    @@ -3501,6 +3501,24 @@ def test_install_machinery(self) -> None:
     
             self.assertEqual(new_config, response.data)
     
    +    def test_install_machinery_blocks_private_project_target(self) -> None:
    +        self.component.project.add_user(self.user, "Administration")
    +
    +        response = self.do_request(
    +            "api:project-machinery-settings",
    +            self.project_kwargs,
    +            method="post",
    +            code=400,
    +            superuser=False,
    +            request={
    +                "service": "deepl",
    +                "configuration": {"key": "x", "url": "http://127.0.0.1:11434/"},
    +            },
    +            format="json",
    +        )
    +
    +        self.assertIn("URL domain is not allowed.", str(response.data))
    +
     
     class ComponentAPITest(APIBaseTest):
         def setUp(self) -> None:
    
  • weblate/api/views.py (+6 -2, modified)
    @@ -1515,7 +1515,9 @@ def machinery_settings(self, request: Request, **kwargs):
                     raise ValidationError({"service": "Missing service name"}) from error
     
                 service, configuration, errors = validate_service_configuration(
    -                service_name, request.data.get("configuration", "{}")
    +                service_name,
    +                request.data.get("configuration", "{}"),
    +                allow_private_targets=False,
                 )
     
                 if service is None or errors:
    @@ -1554,7 +1556,9 @@ def machinery_settings(self, request: Request, **kwargs):
                 valid_configurations: dict[str, dict] = {}
                 for service_name, configuration in request.data.items():
                     service, configuration, errors = validate_service_configuration(
    -                    service_name, configuration
    +                    service_name,
    +                    configuration,
    +                    allow_private_targets=False,
                     )
     
                     if service is None or errors:
    
  • weblate/machinery/anthropic.py (+2 -0, modified)
    @@ -4,6 +4,7 @@
     
     from __future__ import annotations
     
    +from typing import ClassVar
     from urllib.parse import urljoin
     
     from .base import MachineryRateLimitError
    @@ -20,6 +21,7 @@ class AnthropicTranslation(BaseLLMTranslation):
         """
     
         name = "Anthropic"
    +    trusted_error_hosts: ClassVar[set[str]] = {"api.anthropic.com"}
         end_point = "/v1/messages"
         settings_form = AnthropicMachineryForm
         version_added = "5.16"
    
  • weblate/machinery/base.py (+124 -26, modified)
    @@ -15,8 +15,9 @@
     from html import escape, unescape
     from itertools import chain
     from typing import TYPE_CHECKING, ClassVar
    -from urllib.parse import quote
    +from urllib.parse import quote, urlparse
     
    +from django.conf import settings
     from django.core.cache import cache
     from django.core.exceptions import ValidationError
     from django.utils.functional import cached_property
    @@ -28,8 +29,10 @@
     from weblate.machinery.forms import BaseMachineryForm
     from weblate.utils.docs import DocVersionsMixin
     from weblate.utils.errors import report_error
    +from weblate.utils.forms import WeblateServiceURLField
     from weblate.utils.hash import calculate_dict_hash, calculate_hash, hash_to_checksum
    -from weblate.utils.requests import http_request
    +from weblate.utils.outbound import is_allowlisted_hostname
    +from weblate.utils.requests import http_request, validate_request_url
     from weblate.utils.similarity import Comparer
     from weblate.utils.site import get_site_url
     
    @@ -108,20 +111,21 @@ class BatchMachineTranslation(DocVersionsMixin):
     
         validate_source_language = "en"
         validate_target_language = "de"
    +    trusted_error_hosts: ClassVar[set[str]] = set()
     
         @classmethod
         def get_rank(cls):
             return cls.max_score + cls.rank_boost
     
    -    def __init__(self, settings: SettingsDict) -> None:
    +    def __init__(self, configuration: SettingsDict) -> None:
             """Create new machine translation object."""
             self.mtid = self.get_identifier()
             self.rate_limit_cache = f"{self.mtid}-rate-limit"
             self.languages_cache = f"{self.mtid}-languages"
             self.comparer = Comparer()
             self.supported_languages_error: Exception | None = None
             self.supported_languages_error_age: float = 0
    -        self.settings = settings
    +        self.settings = configuration
     
         def delete_cache(self) -> None:
             cache.delete_many([self.rate_limit_cache, self.languages_cache])
    @@ -187,29 +191,117 @@ def check_failure(self, response: Response) -> None:
             try:
                 response.raise_for_status()
             except HTTPError as error:
    -            detail = response.text
    -            try:
    -                payload = response.json()
    -            except JSONDecodeError:
    -                pass
    -            else:
    -                if isinstance(payload, dict) and payload:
    -                    if detail_error := payload.get("error"):
    -                        if isinstance(detail_error, str):
    -                            detail = detail_error
    -                        elif isinstance(detail_error, dict):
    -                            if "message" in detail_error:
    -                                detail = detail_error["message"]
    -                            else:
    -                                detail = str(detail_error)
    -                    else:
    -                        detail = str(payload)
    -
    -            if detail:
    +            if detail := self.get_error_detail(response):
                     message = f"{error.args[0]}: {detail[:200]}"
                     raise HTTPError(message, response=response) from error
                 raise
     
    +    @property
    +    def allow_private_targets(self) -> bool:
    +        return "_project" not in self.settings
    +
    +    def validate_runtime_url(self, url: str) -> None:
    +        validate_request_url(
    +            url,
    +            allow_private_targets=self.allow_private_targets,
    +            allowed_domains=settings.ALLOWED_MACHINERY_DOMAINS,
    +        )
    +
    +    @staticmethod
    +    def get_host_from_setting(value: object) -> str | None:
    +        if not isinstance(value, str):
    +            return None
    +        if "://" in value:
    +            return urlparse(value).hostname
    +        return value or None
    +
    +    def get_trusted_error_hosts(self) -> set[str]:
    +        hosts = set(settings.ALLOWED_MACHINERY_DOMAINS)
    +        hosts.update(self.trusted_error_hosts)
    +        if self.allow_private_targets or self.settings_form is None:
    +            return hosts
    +
    +        form = self.settings_form(self.__class__)
    +        for field_name, field in form.fields.items():
    +            is_endpoint_field = isinstance(field, WeblateServiceURLField) or (
    +                field_name in form.network_host_fields
    +            )
    +            if not is_endpoint_field:
    +                continue
    +
    +            values: set[str] = set()
    +            if initial := getattr(field, "initial", None):
    +                values.add(initial)
    +            values.update(value for value, _label in getattr(field, "choices", ()))
    +
    +            current_value = self.settings.get(field_name)
    +            if current_value in values and (
    +                host := self.get_host_from_setting(current_value)
    +            ):
    +                hosts.add(host)
    +
    +            for value in values:
    +                if host := self.get_host_from_setting(value):
    +                    hosts.add(host)
    +        return hosts
    +
    +    @classmethod
    +    def has_configurable_outbound_target(cls) -> bool:
    +        if cls.settings_form is None:
    +            return False
    +
    +        form = cls.settings_form(cls)
    +        for field_name, field in form.fields.items():
    +            if isinstance(field, WeblateServiceURLField):
    +                return True
    +            if field_name in form.network_host_fields:
    +                return True
    +        return False
    +
    +    def can_display_error_detail(self, response: Response) -> bool:
    +        if self.allow_private_targets:
    +            return True
    +        return self.is_trusted_error_host(response)
    +
    +    def is_trusted_error_host(self, response: Response) -> bool:
    +        if (
    +            self.settings_form is not None
    +            and not self.has_configurable_outbound_target()
    +        ):
    +            return True
    +        hostname = urlparse(response.url).hostname or ""
    +        return is_allowlisted_hostname(hostname, list(self.get_trusted_error_hosts()))
    +
    +    def get_error_detail(self, response: Response) -> str | None:
    +        if not self.can_display_error_detail(response):
    +            return None
    +        trusted_host = self.is_trusted_error_host(response)
    +
    +        try:
    +            payload = response.json()
    +        except JSONDecodeError:
    +            if trusted_host:
    +                return response.text or None
    +            return None
    +
    +        if isinstance(payload, dict):
    +            if (message := payload.get("message")) and isinstance(message, str):
    +                return message
    +            if (detail := payload.get("detail")) and isinstance(detail, str):
    +                return detail
    +            if error := payload.get("error"):
    +                if isinstance(error, str):
    +                    return error
    +                if isinstance(error, dict):
    +                    detail_message = error.get("message")
    +                    if isinstance(detail_message, str):
    +                        return detail_message
    +        elif isinstance(payload, str) and trusted_host:
    +            return payload
    +        if trusted_host:
    +            return response.text or None
    +        return None
    +
         def request(self, method, url, skip_auth=False, **kwargs):
             """Perform JSON request."""
             # Create custom headers
    @@ -231,6 +323,9 @@ def request(self, method, url, skip_auth=False, **kwargs):
                 timeout=self.request_timeout,
                 auth=self.get_auth(),
                 raise_for_status=False,
    +            validate_url=not self.allow_private_targets,
    +            allow_private_targets=self.allow_private_targets,
    +            allowed_domains=settings.ALLOWED_MACHINERY_DOMAINS,
                 **kwargs,
             )
     
    @@ -638,9 +733,12 @@ def _translate(
             return output
     
         def get_error_message(self, exc: Exception) -> str:
    -        if isinstance(exc, RequestException) and exc.response and exc.response.text:
    -            return f"{exc.__class__.__name__}: {exc}: {exc.response.text}"
    -        return f"{exc.__class__.__name__}: {exc}"
    +        message = f"{exc.__class__.__name__}: {exc}"
    +        if isinstance(exc, RequestException) and exc.response:
    +            detail = self.get_error_detail(exc.response)
    +            if detail and detail not in str(exc):
    +                return f"{message}: {detail}"
    +        return message
     
         def signed_salt(self, appid, secret, text):
             """Generate salt and sign as used by Chinese services."""
    
  • weblate/machinery/deepl.py (+4 -0, modified)
    @@ -46,6 +46,10 @@ class DeepLTranslation(
         target_language_map: ClassVar[dict[str, str]] = {
             "PT": "PT-PT",
         }
    +    trusted_error_hosts: ClassVar[set[str]] = {
    +        "api.deepl.com",
    +        "api-free.deepl.com",
    +    }
         highlight_syntax = True
         settings_form = DeepLMachineryForm
         glossary_count_limit = 1000
    
  • weblate/machinery/forms.py (+23 -1, modified)
    @@ -12,11 +12,14 @@
     from django.utils.translation import gettext, gettext_lazy, pgettext_lazy
     
     from weblate.utils.forms import WeblateServiceURLField
    +from weblate.utils.validators import validate_machinery_hostname, validate_machinery_url
     
     from .types import SourceLanguageChoices
     
     
     class BaseMachineryForm(forms.Form):
    +    network_host_fields = frozenset({"base_url", "endpoint_url"})
    +
         source_language = forms.ChoiceField(
             label=pgettext_lazy(
                 "Automatic suggestion service configuration", "Source language selection"
    @@ -26,8 +29,11 @@ class BaseMachineryForm(forms.Form):
             required=False,
         )
     
    -    def __init__(self, machinery, *args, **kwargs) -> None:
    +    def __init__(
    +        self, machinery, *args, allow_private_targets: bool = True, **kwargs
    +    ) -> None:
             self.machinery = machinery
    +        self.allow_private_targets = allow_private_targets
             super().__init__(*args, **kwargs)
     
         def serialize_form(self):
    @@ -40,9 +46,25 @@ def clean(self) -> None:
                     continue
                 if field not in settings:
                     return
    +        self.validate_endpoint_fields(settings)
    +        if not self.allow_private_targets:
    +            settings = {**settings, "_project": True}
             machinery = self.machinery(settings)
             machinery.validate_settings()
     
    +    def validate_endpoint_fields(self, settings) -> None:
    +        for field_name, field in self.fields.items():
    +            if (value := settings.get(field_name)) in {"", None}:
    +                continue
    +            if isinstance(field, WeblateServiceURLField):
    +                validate_machinery_url(
    +                    value, allow_private_targets=self.allow_private_targets
    +                )
    +            elif field_name in self.network_host_fields and isinstance(value, str):
    +                validate_machinery_hostname(
    +                    value, allow_private_targets=self.allow_private_targets
    +                )
    +
     
     class KeyMachineryForm(BaseMachineryForm):
         key = forms.CharField(
    
  • weblate/machinery/libretranslate.py (+2 -1, modified)
    @@ -6,7 +6,7 @@
     
     from __future__ import annotations
     
    -from typing import TYPE_CHECKING
    +from typing import TYPE_CHECKING, ClassVar
     
     from .base import BatchMachineTranslation
     from .forms import LibreTranslateMachineryForm
    @@ -22,6 +22,7 @@ class LibreTranslateTranslation(BatchMachineTranslation):
     
         name = "LibreTranslate"
         max_score = 89
    +    trusted_error_hosts: ClassVar[set[str]] = {"libretranslate.com"}
         version_added = "4.7.1"
         settings_form = LibreTranslateMachineryForm
         request_timeout = 20
    
  • weblate/machinery/microsoft.py (+9 -4, modified)
    @@ -6,6 +6,7 @@
     
     from datetime import timedelta
     from typing import TYPE_CHECKING, ClassVar
    +from urllib.parse import urlparse
     
     from django.utils import timezone
     
    @@ -57,6 +58,12 @@ class MicrosoftCognitiveTranslation(XMLMachineTranslationMixin, MachineTranslati
         def get_identifier(cls) -> str:
             return "microsoft-translator"
     
    +    @classmethod
    +    def get_application_hosts(cls) -> set[str]:
    +        return {
    +            value for value, _label in cls.settings_form.base_fields["base_url"].choices
    +        }
    +
         def __init__(self, settings: SettingsDict) -> None:
             """Check configuration."""
             super().__init__(settings)
    @@ -108,10 +115,8 @@ def check_failure(self, response) -> None:
             # Microsoft tends to use utf-8-sig instead of plain utf-8
             response.encoding = response.apparent_encoding
             super().check_failure(response)
    -        if (
    -            response.url.startswith("https://api.cognitive.microsofttranslator.com/")
    -            and response.status_code == 200
    -        ):
    +        hostname = urlparse(response.url).hostname
    +        if response.status_code == 200 and hostname in self.get_application_hosts():
                 payload = response.json()
     
                 # We should get an object, string usually means an error
    
  • weblate/machinery/models.py (+7 -1, modified)
    @@ -64,6 +64,8 @@ class Meta:
     def validate_service_configuration(
         service_name: str,
         configuration: str | SettingsDict,
    +    *,
    +    allow_private_targets: bool = True,
     ) -> tuple[BatchMachineTranslation | None, SettingsDict, list[str]]:
         """
         Validate given service configuration.
    @@ -93,7 +95,11 @@ def validate_service_configuration(
     
         errors = []
         if service.settings_form is not None:
    -        form = service.settings_form(service, data=service_configuration)
    +        form = service.settings_form(
    +            service,
    +            data=service_configuration,
    +            allow_private_targets=allow_private_targets,
    +        )
             # validate form
             if not form.is_valid():
                 errors.extend([str(error) for error in form.non_field_errors()])
    
  • weblate/machinery/openai.py (+13 -1, modified)
    @@ -4,7 +4,7 @@
     
     from __future__ import annotations
     
    -from typing import TYPE_CHECKING
    +from typing import TYPE_CHECKING, ClassVar
     
     from django.core.cache import cache
     
    @@ -24,6 +24,9 @@
     class BaseOpenAITranslation(BaseLLMTranslation):
         client: OpenAI
     
    +    def get_runtime_base_url(self) -> str:
    +        raise NotImplementedError
    +
         def fetch_llm_translations(
             self, prompt: str, content: str, previous_content: str, previous_response: str
         ) -> str | None:
    @@ -54,6 +57,7 @@ def fetch_llm_translations(
                 ),
             ]
             try:
    +            self.validate_runtime_url(self.get_runtime_base_url())
                 response = self.client.chat.completions.create(
                     model=self.get_model(),
                     messages=messages,
    @@ -70,6 +74,7 @@ def fetch_llm_translations(
     
     class OpenAITranslation(BaseOpenAITranslation):
         name = "OpenAI"
    +    trusted_error_hosts: ClassVar[set[str]] = {"api.openai.com"}
     
         version_added = "5.3"
     
    @@ -86,6 +91,9 @@ def __init__(self, settings=None) -> None:
             )
             self._models: set[str] | None = None
     
    +    def get_runtime_base_url(self) -> str:
    +        return self.settings.get("base_url") or "https://api.openai.com/v1"
    +
         def get_model(self) -> str:
             if self._models is None:
                 cache_key = self.get_cache_key("models")
    @@ -94,6 +102,7 @@ def get_model(self) -> str:
                     # hiredis-py 3 makes list from set
                     self._models = set(models_cache)
                 else:
    +                self.validate_runtime_url(self.get_runtime_base_url())
                     self._models = {model.id for model in self.client.models.list()}
                     cache.set(cache_key, self._models, 3600)
     
    @@ -129,5 +138,8 @@ def __init__(self, settings=None) -> None:
                 azure_deployment=self.settings["deployment"],
             )
     
    +    def get_runtime_base_url(self) -> str:
    +        return self.settings.get("azure_endpoint") or ""
    +
         def get_model(self) -> str:
             return self.settings["deployment"]
    
  • weblate/machinery/tests.py (+347 -0, modified)
    @@ -5,6 +5,7 @@
     from __future__ import annotations
     
     import json
    +import os
     import re
     from copy import copy
     from datetime import UTC, datetime
    @@ -19,16 +20,19 @@
     import respx
     from aliyunsdkcore.client import AcsClient
     from botocore.stub import ANY, Stubber
    +from django.core.exceptions import ValidationError
     from django.core.management import call_command
     from django.core.management.base import CommandError
     from django.test import TestCase
    +from django.test.utils import override_settings
     from django.urls import reverse
     from google.api_core import exceptions as google_api_exceptions
     from google.cloud.translate import (
         SupportedLanguages,
         TranslateTextResponse,
         TranslationServiceClient,
     )
    +from requests.exceptions import HTTPError, JSONDecodeError
     
     import weblate.machinery.models
     from weblate.checks.tests.test_checks import MockUnit
    @@ -856,6 +860,35 @@ def mock_response(self) -> None:
                 json=MICROSOFT_RESPONSE,
             )
     
    +    @responses.activate
    +    def test_regional_host_string_payload_raises_error(self) -> None:
    +        machine = self.MACHINE_CLS(
    +            {
    +                **self.CONFIGURATION,
    +                "base_url": "api-eur.cognitive.microsofttranslator.com",
    +            }
    +        )
    +        responses.add(
    +            responses.POST,
    +            "https://westeurope.api.cognitive.microsoft.com/sts/v1.0/issueToken"
    +            "?Subscription-Key=KEY",
    +            body="TOKEN",
    +        )
    +        responses.add(
    +            responses.GET,
    +            "https://api-eur.cognitive.microsofttranslator.com/languages?api-version=3.0",
    +            json=MS_SUPPORTED_LANG_RESP,
    +        )
    +        responses.add(
    +            responses.POST,
    +            "https://api-eur.cognitive.microsofttranslator.com/"
    +            "translate?api-version=3.0&from=en&to=cs&category=general&textType=html",
    +            json="Regional host error",
    +        )
    +
    +        with self.assertRaisesRegex(MachineTranslationError, "Regional host error"):
    +            self.assert_translate(self.SUPPORTED, self.SOURCE_BLANK, 0, machine=machine)
    +
     
     class GoogleTranslationTest(BaseMachineTranslationTest):
         MACHINE_CLS = GoogleTranslation
    @@ -2650,6 +2683,56 @@ def test_clean_custom(self) -> None:
             form = machine.settings_form(machine, settings)
             self.assertFalse(form.is_valid())
     
    +    @patch(
    +        "weblate.utils.outbound.socket.getaddrinfo",
    +        return_value=[(0, 0, 0, "", ("127.0.0.1", 443))],
    +    )
    +    def test_runtime_url_validation(self, mocked_getaddrinfo) -> None:
    +        machine = self.MACHINE_CLS(self.CONFIGURATION.copy())
    +        machine.delete_cache()
    +        machine.settings["_project"] = Mock()
    +
    +        with (
    +            patch.object(machine.client.models, "list") as mocked_list,
    +            self.assertRaises(ValidationError),
    +        ):
    +            machine.get_model()
    +
    +        mocked_getaddrinfo.assert_called_once()
    +        mocked_list.assert_not_called()
    +
    +    @patch(
    +        "weblate.utils.outbound.socket.getaddrinfo",
    +        side_effect=OSError("Name or service not known"),
    +    )
    +    def test_runtime_url_validation_uses_proxy_settings(
    +        self, mocked_getaddrinfo
    +    ) -> None:
    +        machine = self.MACHINE_CLS(self.CONFIGURATION.copy())
    +        machine.delete_cache()
    +        machine.settings["_project"] = Mock()
    +
    +        with (
    +            patch.dict(
    +                os.environ,
    +                {
    +                    "HTTPS_PROXY": "http://127.0.0.1:8080",
    +                    "HTTP_PROXY": "",
    +                    "ALL_PROXY": "",
    +                    "NO_PROXY": "",
    +                },
    +            ),
    +            patch.object(
    +                machine.client.models,
    +                "list",
    +                return_value=[Mock(id="gpt-5-nano")],
    +            ) as mocked_list,
    +        ):
    +            self.assertEqual(machine.get_model(), "gpt-5-nano")
    +
    +        mocked_getaddrinfo.assert_not_called()
    +        mocked_list.assert_called_once()
    +
     
     class AzureOpenAITranslationTest(OpenAITranslationTest):
         MACHINE_CLS: type[BatchMachineTranslation] = AzureOpenAITranslation
    @@ -2692,6 +2775,42 @@ def mock_response(self) -> None:
                 )
             )
     
    +    @patch(
    +        "weblate.utils.outbound.socket.getaddrinfo",
    +        side_effect=OSError("Name or service not known"),
    +    )
    +    def test_runtime_url_validation_uses_proxy_settings(
    +        self, mocked_getaddrinfo
    +    ) -> None:
    +        machine = self.MACHINE_CLS(self.CONFIGURATION.copy())
    +        machine.settings["_project"] = Mock()
    +        completion = Mock()
    +        completion.choices = [Mock(message=Mock(content='["Ahoj světe"]'))]
    +
    +        with (
    +            patch.dict(
    +                os.environ,
    +                {
    +                    "HTTPS_PROXY": "http://127.0.0.1:8080",
    +                    "HTTP_PROXY": "",
    +                    "ALL_PROXY": "",
    +                    "NO_PROXY": "",
    +                },
    +            ),
    +            patch.object(
    +                machine.client.chat.completions,
    +                "create",
    +                return_value=completion,
    +            ) as mocked_create,
    +        ):
    +            self.assertEqual(
    +                machine.fetch_llm_translations("prompt", "content", "prev", "resp"),
    +                '["Ahoj světe"]',
    +            )
    +
    +        mocked_getaddrinfo.assert_not_called()
    +        mocked_create.assert_called_once()
    +
     
     class OllamaTranslationTest(BaseMachineTranslationTest):
         MACHINE_CLS: type[BatchMachineTranslation] = OllamaTranslation
    @@ -3318,6 +3437,234 @@ def test_configure_invalid(self) -> None:
             )
     
     
    +class MachineryValidationTest(TestCase):
    +    def test_project_machinery_rejects_private_url(self) -> None:
    +        form = DeepLTranslation.settings_form(
    +            DeepLTranslation,
    +            data={"key": "x", "url": "http://127.0.0.1:11434/"},
    +            allow_private_targets=False,
    +        )
    +
    +        self.assertFalse(form.is_valid())
    +        self.assertIn("URL domain is not allowed.", form.errors["__all__"])
    +
    +    def test_check_failure_hides_response_body(self) -> None:
    +        response = Mock()
    +        response.raise_for_status.side_effect = HTTPError(
    +            "500 Server Error: Internal Server Error for url: http://127.0.0.1/api"
    +        )
    +        response.url = "http://127.0.0.1/api"
    +        response.text = "aws_secret_key=AKIAIOSFODNN7EXAMPLE"
    +        machine = DummyTranslation({})
    +
    +        with self.assertRaises(HTTPError) as raised:
    +            machine.check_failure(response)
    +
    +        self.assertNotIn("aws_secret_key", str(raised.exception))
    +
    +    def test_check_failure_shows_trusted_provider_message(self) -> None:
    +        response = Mock()
    +        response.raise_for_status.side_effect = HTTPError(
    +            "400 Client Error: Bad Request for url: https://api.deepl.com/v2/translate"
    +        )
    +        response.url = "https://api.deepl.com/v2/translate"
    +        response.json.return_value = {"message": "Auth key is invalid."}
    +        machine = DeepLTranslation({"key": "x", "url": "https://api.deepl.com/v2/"})
    +
    +        with self.assertRaises(HTTPError) as raised:
    +            machine.check_failure(response)
    +
    +        self.assertIn("Auth key is invalid.", str(raised.exception))
    +
    +    def test_check_failure_shows_trusted_provider_plain_text_message(self) -> None:
    +        response = Mock()
    +        response.raise_for_status.side_effect = HTTPError(
    +            "429 Client Error: Too Many Requests for url: https://api.deepl.com/v2/translate"
    +        )
    +        response.url = "https://api.deepl.com/v2/translate"
    +        response.text = "Rate limit exceeded."
    +        response.json.side_effect = JSONDecodeError("Expecting value", "", 0)
    +        machine = DeepLTranslation(
    +            {"key": "x", "url": "https://api.deepl.com/v2/", "_project": Mock()}
    +        )
    +
    +        with self.assertRaises(HTTPError) as raised:
    +            machine.check_failure(response)
    +
    +        self.assertIn("Rate limit exceeded.", str(raised.exception))
    +
    +    def test_check_failure_shows_fixed_provider_plain_text_message(self) -> None:
    +        response = Mock()
    +        response.raise_for_status.side_effect = HTTPError(
    +            "503 Server Error: Service Unavailable for url: https://translation.googleapis.com/language/translate/v2"
    +        )
    +        response.url = "https://translation.googleapis.com/language/translate/v2"
    +        response.text = "Service temporarily unavailable."
    +        response.json.side_effect = JSONDecodeError("Expecting value", "", 0)
    +        machine = GoogleTranslation({"key": "x", "_project": Mock()})
    +
    +        with self.assertRaises(HTTPError) as raised:
    +            machine.check_failure(response)
    +
    +        self.assertIn("Service temporarily unavailable.", str(raised.exception))
    +
    +    def test_check_failure_hides_untrusted_provider_message(self) -> None:
    +        response = Mock()
    +        response.raise_for_status.side_effect = HTTPError(
    +            "400 Client Error: Bad Request for url: https://custom.example.com/v1"
    +        )
    +        response.url = "https://custom.example.com/v1"
    +        response.json.return_value = {"message": "Top secret."}
    +        machine = OpenAITranslation(
    +            {
    +                "key": "x",
    +                "model": "auto",
    +                "persona": "",
    +                "style": "",
    +                "base_url": "https://custom.example.com/",
    +                "_project": Mock(),
    +            }
    +        )
    +
    +        with self.assertRaises(HTTPError) as raised:
    +            machine.check_failure(response)
    +
    +        self.assertNotIn("Top secret.", str(raised.exception))
    +
    +    def test_get_error_message_hides_untrusted_response_body(self) -> None:
    +        response = Mock()
    +        response.url = "https://custom.example.com/v1"
    +        response.text = "Top secret."
    +        response.json.return_value = {"message": "Top secret."}
    +        error = HTTPError(
    +            "400 Client Error: Bad Request for url: https://custom.example.com/v1",
    +            response=response,
    +        )
    +        machine = OpenAITranslation(
    +            {
    +                "key": "x",
    +                "model": "auto",
    +                "persona": "",
    +                "style": "",
    +                "base_url": "https://custom.example.com/",
    +                "_project": Mock(),
    +            }
    +        )
    +
    +        message = machine.get_error_message(error)
    +
    +        self.assertNotIn("Top secret.", message)
    +
    +    def test_check_failure_does_not_trust_non_endpoint_choice_values(self) -> None:
    +        response = Mock()
    +        response.raise_for_status.side_effect = HTTPError(
    +            "400 Client Error: Bad Request for url: https://auto/v1"
    +        )
    +        response.url = "https://auto/v1"
    +        response.json.return_value = {"message": "Top secret."}
    +        machine = OpenAITranslation(
    +            {
    +                "key": "x",
    +                "model": "auto",
    +                "persona": "",
    +                "style": "",
    +                "base_url": "https://custom.example.com/",
    +                "_project": Mock(),
    +            }
    +        )
    +
    +        with self.assertRaises(HTTPError) as raised:
    +            machine.check_failure(response)
    +
    +        self.assertNotIn("Top secret.", str(raised.exception))
    +
    +    @override_settings(ALLOWED_MACHINERY_DOMAINS=["api.sap.com"])
    +    def test_check_failure_handles_non_string_project_settings(self) -> None:
    +        response = Mock()
    +        response.raise_for_status.side_effect = HTTPError(
    +            "400 Client Error: Bad Request for url: https://api.sap.com/v1/translate"
    +        )
    +        response.url = "https://api.sap.com/v1/translate"
    +        response.json.return_value = {"message": "Invalid credentials."}
    +        machine = SAPTranslationHub(
    +            {
    +                "key": "x",
    +                "username": "",
    +                "password": "",
    +                "enable_mt": True,
    +                "domain": "",
    +                "url": "https://api.sap.com",
    +                "_project": Mock(),
    +            }
    +        )
    +
    +        with self.assertRaises(HTTPError) as raised:
    +            machine.check_failure(response)
    +
    +        self.assertIn("Invalid credentials.", str(raised.exception))
    +
    +    def test_check_failure_shows_libretranslate_plain_text_message(self) -> None:
    +        response = Mock()
    +        response.raise_for_status.side_effect = HTTPError(
    +            "429 Client Error: Too Many Requests for url: https://libretranslate.com/translate"
    +        )
    +        response.url = "https://libretranslate.com/translate"
    +        response.text = "Too many requests."
    +        response.json.side_effect = JSONDecodeError("Expecting value", "", 0)
    +        machine = LibreTranslateTranslation(
    +            {"key": "", "url": "https://libretranslate.com/", "_project": Mock()}
    +        )
    +
    +        with self.assertRaises(HTTPError) as raised:
    +            machine.check_failure(response)
    +
    +        self.assertIn("Too many requests.", str(raised.exception))
    +
    +    @patch(
    +        "weblate.utils.outbound.socket.getaddrinfo",
    +        return_value=[(0, 0, 0, "", ("127.0.0.1", 443))],
    +    )
    +    def test_project_validation_uses_runtime_url_guard(
    +        self, mocked_getaddrinfo
    +    ) -> None:
    +        form = DeepLTranslation.settings_form(
    +            DeepLTranslation,
    +            data={"key": "x", "url": "https://api.deepl.com/v2/"},
    +            allow_private_targets=False,
    +        )
    +
    +        with patch("requests.sessions.Session.request") as mocked_request:
    +            self.assertFalse(form.is_valid())
    +
    +        mocked_getaddrinfo.assert_called()
    +        mocked_request.assert_not_called()
    +        self.assertIn("URL domain is not allowed.", str(form.non_field_errors()))
    +
    +    @override_settings(ALLOWED_MACHINERY_DOMAINS=[".example.com"])
    +    def test_check_failure_shows_wildcard_allowlisted_provider_message(self) -> None:
    +        response = Mock()
    +        response.raise_for_status.side_effect = HTTPError(
    +            "400 Client Error: Bad Request for url: https://api.example.com/v1"
    +        )
    +        response.url = "https://api.example.com/v1"
    +        response.json.return_value = {"message": "Allowlisted provider error."}
    +        machine = OpenAITranslation(
    +            {
    +                "key": "x",
    +                "model": "auto",
    +                "persona": "",
    +                "style": "",
    +                "base_url": "https://api.example.com/",
    +                "_project": Mock(),
    +            }
    +        )
    +
    +        with self.assertRaises(HTTPError) as raised:
    +            machine.check_failure(response)
    +
    +        self.assertIn("Allowlisted provider error.", str(raised.exception))
    +
    +
     class CommandTest(FixtureTestCase):
         """Test for management commands."""
     
    
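The tests above pin down one rule: the response body is echoed into the error only when the responding host is trusted. A minimal sketch of that rule, assuming a hypothetical `describe_failure` helper and a plain set of trusted host names (neither is part of Weblate's API), could look like this. The 200-character cap mirrors the limit mentioned in the advisory description.

```python
from urllib.parse import urlparse


def describe_failure(
    status_line: str,
    response_url: str,
    response_text: str,
    trusted_hosts: set[str],
) -> str:
    """Build an error message, echoing the body only for trusted hosts."""
    host = (urlparse(response_url).hostname or "").lower()
    if host in trusted_hosts:
        # Trusted provider: a short excerpt helps the user fix credentials.
        return f"{status_line}: {response_text[:200]}"
    # Untrusted endpoint: never reflect what it returned.
    return status_line


TRUSTED = {"api.deepl.com"}  # illustrative value only

print(describe_failure("400 Bad Request", "https://api.deepl.com/v2/translate",
                       "Auth key is invalid.", TRUSTED))
print(describe_failure("500 Server Error", "http://127.0.0.1/api",
                       "aws_secret_key=AKIAIOSFODNN7EXAMPLE", TRUSTED))
```

Keeping the decision in one place means any new code path that formats errors inherits the same behaviour.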
  • weblate/machinery/views.py +9 −0 modified
    @@ -231,8 +231,13 @@ def get_form_class(self):
         def get_form_kwargs(self):
             result = super().get_form_kwargs()
             result["machinery"] = self.machinery
    +        result["allow_private_targets"] = self.allow_private_targets
             return result
     
    +    @property
    +    def allow_private_targets(self) -> bool:
    +        return True
    +
         @cached_property
         def settings_dict(self) -> dict[str, SettingsDict]:
             raise NotImplementedError
    @@ -326,6 +331,10 @@ def dispatch(self, request: AuthenticatedHttpRequest, *args, **kwargs):  # type:
     
     
     class EditMachineryProjectView(MachineryProjectMixin, EditMachineryView):
    +    @property
    +    def allow_private_targets(self) -> bool:
    +        return False
    +
         def save_settings(self, data: SettingsDict | None) -> None:
             self.project.machinery_settings[self.machinery_id] = data
             self.project.save(update_fields=["machinery_settings"])
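The view change boils down to a property that the project-scoped view overrides and that is forwarded to the form. A stripped-down sketch of that pattern, without Django, might look like the following. `BaseEditView` and `ProjectEditView` are hypothetical stand-ins, not Weblate classes, and the comments about trust are an interpretation of the patch rather than a statement from it.

```python
class BaseEditView:
    """Hypothetical stand-in for a site-wide settings view."""

    @property
    def allow_private_targets(self) -> bool:
        # Default policy: no restriction (assumed to cover site-wide setup).
        return True

    def get_form_kwargs(self) -> dict[str, object]:
        # Forward the policy so the form can choose its URL validation.
        return {"allow_private_targets": self.allow_private_targets}


class ProjectEditView(BaseEditView):
    """Hypothetical stand-in for a per-project settings view."""

    @property
    def allow_private_targets(self) -> bool:
        # Project-level configuration must not reach private addresses.
        return False


print(BaseEditView().get_form_kwargs())
print(ProjectEditView().get_form_kwargs())
```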
    
  • weblate/utils/models.py +1 −0 modified
    @@ -89,6 +89,7 @@ class WeblateConf(AppConf):
         LOCALE_FILTER_FILES = True
     
         ALLOWED_ASSET_DOMAINS: ClassVar[list[str]] = ["*"]
    +    ALLOWED_MACHINERY_DOMAINS: ClassVar[list[str]] = []
         ALLOWED_ASSET_SIZE: ClassVar[int] = 4194304
     
         class Meta:
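For illustration, a deployment that needs project-level machinery to reach a self-hosted service might extend the new setting in its Django settings module. The host names below are placeholders. Only the setting name, its empty default, and the leading-period wildcard form come from the patch and its documentation.

```python
# settings.py (sketch; host names are placeholders, not real services)
ALLOWED_MACHINERY_DOMAINS = [
    "mt.internal.example.com",  # exact host name
    ".translate.example.org",   # leading period: wildcard for subdomains
]
```

Leaving the list empty, which is the default, keeps every project-configured endpoint subject to the public-address checks.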
    
  • weblate/utils/outbound.py +166 −0 added
    @@ -0,0 +1,166 @@
    +# Copyright © Michal Čihař <michal@weblate.org>
    +#
    +# SPDX-License-Identifier: GPL-3.0-or-later
    +
    +from __future__ import annotations
    +
    +import ipaddress
    +import socket
    +from urllib.parse import urlparse
    +
    +from django.core.exceptions import ValidationError
    +from django.http.request import validate_host
    +from django.utils.translation import gettext
    +
    +LOCAL_HOST_SUFFIXES = (
    +    ".local",
    +    ".localhost",
    +)
    +
    +
    +def _parse_ip(value: str) -> ipaddress.IPv4Address | ipaddress.IPv6Address | None:
    +    try:
    +        return ipaddress.ip_address(value)
    +    except ValueError:
    +        return None
    +
    +
    +def _normalize_hostname(value: str) -> str:
    +    normalized = value.rstrip(".")
    +    if not normalized:
    +        return ""
    +
    +    if "://" not in normalized:
    +        normalized = f"//{normalized}"
    +
    +    hostname = urlparse(normalized).hostname
    +    if hostname is None:
    +        return value.rstrip(".")
    +    return hostname.rstrip(".")
    +
    +
    +def _parse_hostname_ip(
    +    value: str,
    +) -> ipaddress.IPv4Address | ipaddress.IPv6Address | None:
    +    normalized = _normalize_hostname(value)
    +
    +    if ip_address := _parse_ip(normalized):
    +        return ip_address
    +
    +    try:
    +        packed = socket.inet_aton(normalized)
    +    except OSError:
    +        return None
    +
    +    return ipaddress.IPv4Address(packed)
    +
    +
    +def _is_public_ip(value: str) -> bool:
    +    address = _parse_ip(value)
    +    return address is not None and address.is_global
    +
    +
    +def validate_runtime_ip(value: str, *, allow_private_targets: bool = True) -> None:
    +    if allow_private_targets:
    +        return
    +
    +    if not _is_public_ip(value):
    +        raise ValidationError(gettext("URL domain is not allowed."))
    +
    +
    +def is_allowlisted_hostname(
    +    hostname: str, allowed_domains: list[str] | tuple[str, ...]
    +) -> bool:
    +    return bool(allowed_domains) and validate_host(
    +        _normalize_hostname(hostname), allowed_domains
    +    )
    +
    +
    +def validate_untrusted_hostname(
    +    hostname: str,
    +    *,
    +    allowed_domains: list[str] | tuple[str, ...] = (),
    +) -> None:
    +    normalized = _normalize_hostname(hostname)
    +    if not normalized:
    +        raise ValidationError(gettext("URL domain is not allowed."))
    +
    +    if is_allowlisted_hostname(normalized, allowed_domains):
    +        return
    +
    +    if ip_address := _parse_hostname_ip(normalized):
    +        if not ip_address.is_global:
    +            raise ValidationError(gettext("URL domain is not allowed."))
    +        return
    +
    +    lowered = normalized.lower()
    +    if lowered == "localhost" or lowered.endswith(LOCAL_HOST_SUFFIXES):
    +        raise ValidationError(gettext("URL domain is not allowed."))
    +    if "." not in normalized:
    +        raise ValidationError(gettext("URL domain is not allowed."))
    +
    +
    +def validate_outbound_url(
    +    value: str,
    +    *,
    +    allow_private_targets: bool = True,
    +    allowed_domains: list[str] | tuple[str, ...] = (),
    +) -> None:
    +    if allow_private_targets:
    +        return
    +
    +    hostname = urlparse(value).hostname
    +    if not hostname:
    +        raise ValidationError(gettext("Could not parse URL."))
    +
    +    validate_untrusted_hostname(hostname, allowed_domains=allowed_domains)
    +
    +
    +def validate_outbound_hostname(
    +    value: str,
    +    *,
    +    allow_private_targets: bool = True,
    +    allowed_domains: list[str] | tuple[str, ...] = (),
    +) -> None:
    +    if allow_private_targets:
    +        return
    +
    +    validate_untrusted_hostname(value, allowed_domains=allowed_domains)
    +
    +
    +def validate_runtime_hostname(
    +    value: str, *, allow_private_targets: bool = True
    +) -> None:
    +    if allow_private_targets:
    +        return
    +
    +    normalized = _normalize_hostname(value)
    +
    +    if ip_address := _parse_hostname_ip(normalized):
    +        validate_runtime_ip(
    +            str(ip_address), allow_private_targets=allow_private_targets
    +        )
    +        return
    +
    +    try:
    +        addresses = socket.getaddrinfo(normalized, None, type=socket.SOCK_STREAM)
    +    except OSError as error:
    +        raise ValidationError(
    +            gettext("Could not resolve the URL domain: {}").format(error)
    +        ) from error
    +
    +    for _family, _type, _proto, _canonname, sockaddr in addresses:
    +        address = sockaddr[0]
    +        if isinstance(address, str):
    +            validate_runtime_ip(address, allow_private_targets=allow_private_targets)
    +
    +
    +def validate_runtime_url(value: str, *, allow_private_targets: bool = True) -> None:
    +    if allow_private_targets:
    +        return
    +
    +    hostname = urlparse(value).hostname
    +    if not hostname:
    +        raise ValidationError(gettext("Could not parse URL."))
    +
    +    validate_runtime_hostname(hostname, allow_private_targets=allow_private_targets)
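The core of the new module is the decision whether a host is an IP literal that is not globally routable. The standalone sketch below reproduces that idea with only the standard library. The function names are illustrative and differ from those in the patch, and the `inet_aton` fallback relies on the platform C library accepting legacy IPv4 spellings such as `127.1`.

```python
import ipaddress
import socket


def parse_host_ip(host: str):
    """Return an IP address object for IP-like hosts, or None for names."""
    try:
        return ipaddress.ip_address(host)
    except ValueError:
        pass
    try:
        # Legacy IPv4 spellings (for example "127.1") parse via inet_aton.
        return ipaddress.IPv4Address(socket.inet_aton(host))
    except OSError:
        return None


def is_blocked_literal(host: str) -> bool:
    """True when the host is an IP literal that is not globally routable."""
    address = parse_host_ip(host)
    return address is not None and not address.is_global


for candidate in ("127.0.0.1", "10.0.0.5", "169.254.169.254", "8.8.8.8", "127.1"):
    print(candidate, is_blocked_literal(candidate))
```

A literal check like this cannot catch a public-looking name that resolves to a private address, which is why the patch also resolves names through `socket.getaddrinfo` and validates each returned address.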
    
  • weblate/utils/requests.py +240 −48 modified
    @@ -4,114 +4,306 @@
     
     from __future__ import annotations
     
    +from collections.abc import Callable
     from contextlib import contextmanager
     from typing import TYPE_CHECKING
     from urllib.parse import urljoin
     
     import requests
     from django.core.cache import cache
    +from requests import Response
    +from requests.utils import select_proxy
     
     from weblate.logger import LOGGER
    +from weblate.utils.outbound import (
    +    validate_outbound_url,
    +    validate_runtime_ip,
    +    validate_runtime_url,
    +)
     from weblate.utils.validators import validate_asset_url
     from weblate.utils.version import USER_AGENT
     
     if TYPE_CHECKING:
         from collections.abc import Generator
     
    -    from requests import Response
     
    +RequestValidator = Callable[[str], None]
    +ResponseValidator = Callable[[Response, bool], None]
     
    -def http_request(
    -    method: str,
    +
    +def _prepare_headers(headers: dict[str, str] | None) -> dict[str, str]:
    +    agent = {"User-Agent": USER_AGENT}
    +    return {**headers, **agent} if headers is not None else agent
    +
    +
    +def validate_request_url(
         url: str,
         *,
    -    headers: dict[str, str] | None = None,
    -    timeout: float = 5,
    -    raise_for_status: bool = True,
    -    allow_redirects: bool = True,
    -    stream: bool = False,
    -    **kwargs,
    -) -> Response:
    -    agent = {"User-Agent": USER_AGENT}
    -    headers = {**headers, **agent} if headers is not None else agent
    -    response = requests.request(
    -        method,
    -        url,
    -        headers=headers,
    -        timeout=timeout,
    -        allow_redirects=allow_redirects,
    -        stream=stream,
    -        **kwargs,
    -    )
    -    if raise_for_status:
    -        response.raise_for_status()
    -    return response
    +    allow_private_targets: bool = True,
    +    allowed_domains: list[str] | tuple[str, ...] = (),
    +) -> None:
    +    with requests.Session() as session:
    +        request_settings = session.merge_environment_settings(
    +            url,
    +            {},
    +            False,
    +            None,
    +            None,
    +        )
    +    used_proxy = select_proxy(url, request_settings["proxies"]) is not None
    +    if used_proxy:
    +        validate_outbound_url(
    +            url,
    +            allow_private_targets=allow_private_targets,
    +            allowed_domains=allowed_domains,
    +        )
    +        return
     
    +    validate_runtime_url(url, allow_private_targets=allow_private_targets)
     
    -@contextmanager
    -def asset_request(
    +
    +def _strip_redirect_auth(
    +    session: requests.Session,
    +    request_headers: dict[str, str],
    +    request_kwargs: dict,
    +    current_url: str,
    +    next_url: str,
    +) -> None:
    +    if not session.should_strip_auth(current_url, next_url):
    +        return
    +
    +    request_headers.pop("Authorization", None)
    +    request_headers.pop("Proxy-Authorization", None)
    +    request_kwargs.pop("auth", None)
    +
    +
    +def _should_redirect_to_get(status_code: int, method: str) -> bool:
    +    normalized_method = method.upper()
    +    if normalized_method == "HEAD":
    +        return False
    +    if status_code == 303:
    +        return True
    +    if status_code == 302:
    +        return True
    +    return status_code == 301 and normalized_method == "POST"
    +
    +
    +def _request_with_redirects(
         method: str,
         url: str,
         *,
         headers: dict[str, str] | None = None,
         timeout: float = 5,
    -    raise_for_status: bool = True,
    +    allow_redirects: bool = True,
    +    stream: bool = False,
         max_redirects: int = 5,
    +    validate_url: RequestValidator | None = None,
    +    validate_proxied_url: RequestValidator | None = None,
    +    validate_response: ResponseValidator | None = None,
         **kwargs,
    -) -> Generator[Response, None, None]:
    -    """Fetch an asset while validating each redirect target before following it."""
    +) -> Response:
    +    request_kwargs = kwargs.copy()
    +    request_kwargs["allow_redirects"] = False
         history: list[Response] = []
         current_url = url
         current_method = method
    -    request_kwargs = kwargs.copy()
    -    request_kwargs["allow_redirects"] = False
    -
    -    agent = {"User-Agent": USER_AGENT}
    -    request_headers = {**headers, **agent} if headers is not None else agent
    +    request_headers = _prepare_headers(headers)
     
         with requests.Session() as session:
             for _ in range(max_redirects + 1):
    -            validate_asset_url(current_url)
    +            request_settings = session.merge_environment_settings(
    +                current_url,
    +                request_kwargs.get("proxies") or {},
    +                stream,
    +                request_kwargs.get("verify"),
    +                request_kwargs.get("cert"),
    +            )
    +            used_proxy = (
    +                select_proxy(current_url, request_settings["proxies"]) is not None
    +            )
    +            if validate_url is not None:
    +                validator = (
    +                    validate_proxied_url
    +                    if used_proxy and validate_proxied_url is not None
    +                    else validate_url
    +                )
    +                validator(current_url)
                 response = session.request(
                     current_method,
                     current_url,
                     headers=request_headers,
                     timeout=timeout,
    +                stream=stream,
                     **request_kwargs,
                 )
                 response.history = history.copy()
    -            if not response.is_redirect:
    +            if validate_response is not None:
                     try:
    -                    if raise_for_status:
    -                        response.raise_for_status()
    -                    with response:
    -                        yield response
    -                finally:
    +                    validate_response(response, used_proxy)
    +                except Exception:
                         response.close()
    -                return
    +                    raise
    +
    +            if not allow_redirects or not response.is_redirect:
    +                return response
     
                 try:
                     next_url = urljoin(response.url, response.headers["location"])
    -                validate_asset_url(next_url)
    +                _strip_redirect_auth(
    +                    session, request_headers, request_kwargs, current_url, next_url
    +                )
                     session.cookies.update(response.cookies)
                     history.append(response)
    -            finally:
    +            except Exception:
                     response.close()
    +                raise
    +
                 current_url = next_url
     
    -            if (
    -                response.status_code in {301, 302, 303}
    -                and current_method.upper() != "HEAD"
    -            ):
    +            if _should_redirect_to_get(response.status_code, current_method):
                     current_method = "GET"
                     request_kwargs.pop("data", None)
                     request_kwargs.pop("json", None)
                     request_kwargs.pop("files", None)
     
    +            response.close()
    +
         msg = f"Exceeded {max_redirects} redirects."
         raise requests.TooManyRedirects(msg)
     
     
    +def _get_response_peer_ip(response: Response) -> str | None:
    +    try:
    +        connection = getattr(response.raw, "connection", None)
    +        if connection is None or connection.sock is None:
    +            return None
    +        peer = connection.sock.getpeername()
    +    except (AttributeError, OSError):
    +        return None
    +
    +    if isinstance(peer, tuple) and peer:
    +        return str(peer[0])
    +    return None
    +
    +
    +def _validate_response_peer(
    +    response: Response, *, allow_private_targets: bool, used_proxy: bool = False
    +) -> None:
    +    if allow_private_targets:
    +        return
    +    if used_proxy:
    +        return
    +
    +    if (peer_ip := _get_response_peer_ip(response)) is None:
    +        LOGGER.warning(
    +            "Skipping peer IP validation for direct request to %s because the "
    +            "connected peer address could not be determined.",
    +            response.url,
    +        )
    +        return
    +
    +    validate_runtime_ip(peer_ip, allow_private_targets=allow_private_targets)
    +
    +
    +def http_request(
    +    method: str,
    +    url: str,
    +    *,
    +    headers: dict[str, str] | None = None,
    +    timeout: float = 5,
    +    raise_for_status: bool = True,
    +    allow_redirects: bool = True,
    +    stream: bool = False,
    +    validate_url: bool = False,
    +    allow_private_targets: bool = True,
    +    allowed_domains: list[str] | tuple[str, ...] = (),
    +    max_redirects: int = 5,
    +    **kwargs,
    +) -> Response:
    +    if validate_url and stream:
    +        msg = "Streaming requests are not supported with URL validation enabled."
    +        raise ValueError(msg)
    +
    +    if not validate_url:
    +        response = requests.request(
    +            method,
    +            url,
    +            headers=_prepare_headers(headers),
    +            timeout=timeout,
    +            allow_redirects=allow_redirects,
    +            stream=stream,
    +            **kwargs,
    +        )
    +        if raise_for_status:
    +            response.raise_for_status()
    +        return response
    +
    +    response = _request_with_redirects(
    +        method,
    +        url,
    +        headers=headers,
    +        timeout=timeout,
    +        allow_redirects=allow_redirects,
    +        # Keep response socket open so peer IP can be validated per request.
    +        stream=True,
    +        max_redirects=max_redirects,
    +        validate_url=lambda request_url: validate_runtime_url(
    +            request_url, allow_private_targets=allow_private_targets
    +        ),
    +        validate_proxied_url=lambda request_url: validate_outbound_url(
    +            request_url,
    +            allow_private_targets=allow_private_targets,
    +            allowed_domains=allowed_domains,
    +        ),
    +        validate_response=lambda response, used_proxy: _validate_response_peer(
    +            response,
    +            allow_private_targets=allow_private_targets,
    +            used_proxy=used_proxy,
    +        ),
    +        **kwargs,
    +    )
    +    try:
    +        if raise_for_status:
    +            response.raise_for_status()
    +        # Preserve non-streaming behavior for callers by eagerly consuming body.
    +        _ = response.content
    +    finally:
    +        response.close()
    +    return response
    +
    +
    +@contextmanager
    +def asset_request(
    +    method: str,
    +    url: str,
    +    *,
    +    headers: dict[str, str] | None = None,
    +    timeout: float = 5,
    +    raise_for_status: bool = True,
    +    max_redirects: int = 5,
    +    **kwargs,
    +) -> Generator[Response, None, None]:
    +    """Fetch an asset while validating each redirect target before following it."""
    +    stream = kwargs.pop("stream", False)
    +    response = _request_with_redirects(
    +        method,
    +        url,
    +        headers=headers,
    +        timeout=timeout,
    +        stream=stream,
    +        max_redirects=max_redirects,
    +        validate_url=validate_asset_url,
    +        **kwargs,
    +    )
    +    try:
    +        if raise_for_status:
    +            response.raise_for_status()
    +        with response:
    +            yield response
    +    finally:
    +        response.close()
    +
    +
     def get_uri_error(uri: str) -> str | None:
         """Return error for fetching the URL or None if it works."""
         if uri.startswith("https://nonexisting.weblate.org/"):
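The redirect handling above follows one principle: automatic redirects are disabled and every hop is validated before it is requested. The sketch below shows that loop in isolation, with a fake transport in place of a real HTTP client. `follow_validated`, `reject_internal`, `fake_fetch`, and the URLs are hypothetical and exist only for this example.

```python
from collections.abc import Callable
from typing import Optional
from urllib.parse import urljoin, urlparse

# A fetcher returns (status_code, Location header or None) for a URL.
Fetch = Callable[[str], tuple[int, Optional[str]]]
REDIRECT_CODES = {301, 302, 303, 307, 308}


def follow_validated(
    url: str,
    fetch: Fetch,
    validate: Callable[[str], None],
    max_redirects: int = 5,
) -> str:
    """Follow redirects manually, validating each hop before requesting it."""
    current = url
    for _ in range(max_redirects + 1):
        validate(current)  # raises before anything is sent to a bad hop
        status, location = fetch(current)
        if status not in REDIRECT_CODES or location is None:
            return current
        current = urljoin(current, location)
    raise RuntimeError(f"Exceeded {max_redirects} redirects.")


def reject_internal(url: str) -> None:
    """Toy policy: refuse one hypothetical internal host name."""
    if urlparse(url).hostname == "internal.example.com":
        raise ValueError(f"blocked: {url}")


ROUTES = {"https://public.example.com/a": (302, "https://internal.example.com/b")}
requested: list[str] = []


def fake_fetch(url: str) -> tuple[int, Optional[str]]:
    """Record the request and answer from the static route table."""
    requested.append(url)
    return ROUTES.get(url, (200, None))
```

Calling `follow_validated("https://public.example.com/a", fake_fetch, reject_internal)` raises `ValueError` on the second hop, and `requested` contains only the first URL, so the internal target is never contacted.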
    
  • weblate/utils/tests/test_requests.py +387 −1 modified
    @@ -2,12 +2,16 @@
     #
     # SPDX-License-Identifier: GPL-3.0-or-later
     
    +import os
    +from unittest.mock import Mock, patch
    +
     import responses
     from django.core.exceptions import ValidationError
     from django.test import SimpleTestCase
     from django.test.utils import override_settings
    +from requests.cookies import RequestsCookieJar
     
    -from weblate.utils.requests import asset_request, get_uri_error
    +from weblate.utils.requests import asset_request, get_uri_error, http_request
     
     
     class AssetRequestTest(SimpleTestCase):
    @@ -121,3 +125,385 @@ def test_get_uri_error_does_not_follow_redirects(self) -> None:
                 responses.calls[0].request.url,
                 "https://example.com/source",
             )
    +
    +
    +class HTTPRequestValidationTest(SimpleTestCase):
    +    def test_http_request_rejects_streaming_validation(self) -> None:
    +        with self.assertRaisesMessage(
    +            ValueError,
    +            "Streaming requests are not supported with URL validation enabled.",
    +        ):
    +            http_request(
    +                "get",
    +                "https://public.example.com/source",
    +                validate_url=True,
    +                stream=True,
    +            )
    +
    +    def test_http_request_strips_auth_on_cross_origin_redirect(self) -> None:
    +        recorded_calls: list[tuple[dict[str, str], bool]] = []
    +        redirect_response = Mock()
    +        redirect_response.is_redirect = True
    +        redirect_response.url = "https://public.example.com/source"
    +        redirect_response.headers = {"location": "https://other.example.com/final"}
    +        redirect_response.cookies = RequestsCookieJar()
    +        redirect_response.history = []
    +        redirect_response.close = Mock()
    +
    +        final_response = Mock()
    +        final_response.is_redirect = False
    +        final_response.url = "https://other.example.com/final"
    +        final_response.headers = {}
    +        final_response.history = []
    +        final_response.raise_for_status = Mock()
    +        final_response.content = b"ok"
    +
    +        with patch("requests.sessions.Session.request") as mocked_request:
    +
    +            def record_request(*args, **kwargs):
    +                recorded_calls.append((dict(kwargs["headers"]), "auth" in kwargs))
    +                if len(recorded_calls) == 1:
    +                    return redirect_response
    +                return final_response
    +
    +            mocked_request.side_effect = record_request
    +
    +            http_request(
    +                "get",
    +                "https://public.example.com/source",
    +                validate_url=True,
    +                headers={"Authorization": "Bearer secret"},
    +                auth=("user", "pass"),
    +                allow_redirects=True,
    +            )
    +
    +        self.assertEqual(mocked_request.call_count, 2)
    +        self.assertEqual(recorded_calls[0][0]["Authorization"], "Bearer secret")
    +        self.assertNotIn("Authorization", recorded_calls[1][0])
    +        self.assertFalse(recorded_calls[1][1])
    +
    +    def test_http_request_preserves_delete_method_on_301_redirect(self) -> None:
    +        recorded_calls: list[tuple[str, dict[str, object]]] = []
    +        redirect_response = Mock()
    +        redirect_response.is_redirect = True
    +        redirect_response.status_code = 301
    +        redirect_response.url = "https://public.example.com/source"
    +        redirect_response.headers = {"location": "https://public.example.com/final"}
    +        redirect_response.cookies = RequestsCookieJar()
    +        redirect_response.history = []
    +        redirect_response.close = Mock()
    +
    +        final_response = Mock()
    +        final_response.is_redirect = False
    +        final_response.url = "https://public.example.com/final"
    +        final_response.headers = {}
    +        final_response.history = []
    +        final_response.raise_for_status = Mock()
    +        final_response.content = b"ok"
    +
    +        with patch("requests.sessions.Session.request") as mocked_request:
    +
    +            def record_request(*args, **kwargs):
    +                recorded_calls.append((args[0], dict(kwargs)))
    +                if len(recorded_calls) == 1:
    +                    return redirect_response
    +                return final_response
    +
    +            mocked_request.side_effect = record_request
    +
    +            http_request(
    +                "delete",
    +                "https://public.example.com/source",
    +                validate_url=True,
    +                allow_redirects=True,
    +                data=b"payload",
    +            )
    +
    +        self.assertEqual(mocked_request.call_count, 2)
    +        self.assertEqual(recorded_calls[0][0], "delete")
    +        self.assertEqual(recorded_calls[1][0], "delete")
    +        self.assertEqual(recorded_calls[1][1]["data"], b"payload")
    +
    +    @responses.activate
    +    @patch(
    +        "weblate.utils.outbound.socket.getaddrinfo",
    +        return_value=[(0, 0, 0, "", ("127.0.0.1", 443))],
    +    )
    +    def test_http_request_blocks_private_target(self, mocked_getaddrinfo) -> None:
    +        responses.add(
    +            responses.GET,
    +            "https://public.example.com/source",
    +            status=200,
    +            body=b"should-not-be-fetched",
    +        )
    +
    +        with self.assertRaises(ValidationError):
    +            http_request(
    +                "get",
    +                "https://public.example.com/source",
    +                validate_url=True,
    +                allow_private_targets=False,
    +            )
    +
    +        mocked_getaddrinfo.assert_called_once_with("public.example.com", None, type=1)
    +        self.assertEqual(len(responses.calls), 0)
    +
    +    @responses.activate
    +    @patch("weblate.utils.requests._get_response_peer_ip", return_value="93.184.216.34")
    +    @patch(
    +        "weblate.utils.outbound.socket.getaddrinfo",
    +        side_effect=[
    +            [(0, 0, 0, "", ("93.184.216.34", 443))],
    +            [(0, 0, 0, "", ("127.0.0.1", 443))],
    +        ],
    +    )
    +    def test_http_request_blocks_private_redirect(
    +        self, mocked_getaddrinfo, mocked_get_peer
    +    ) -> None:
    +        responses.add(
    +            responses.GET,
    +            "https://public.example.com/source",
    +            status=302,
    +            headers={"Location": "https://private.example.com/final"},
    +        )
    +        responses.add(
    +            responses.GET,
    +            "https://private.example.com/final",
    +            status=200,
    +            body=b"should-not-be-fetched",
    +        )
    +
    +        with self.assertRaises(ValidationError):
    +            http_request(
    +                "get",
    +                "https://public.example.com/source",
    +                validate_url=True,
    +                allow_private_targets=False,
    +            )
    +
    +        self.assertEqual(mocked_getaddrinfo.call_count, 2)
    +        mocked_get_peer.assert_called_once()
    +        self.assertEqual(len(responses.calls), 1)
    +
    +    @responses.activate
    +    @patch("weblate.utils.requests._get_response_peer_ip", return_value="127.0.0.1")
    +    @patch(
    +        "weblate.utils.outbound.socket.getaddrinfo",
    +        return_value=[(0, 0, 0, "", ("93.184.216.34", 443))],
    +    )
    +    def test_http_request_blocks_private_peer_after_public_dns(
    +        self, mocked_getaddrinfo, mocked_get_peer
    +    ) -> None:
    +        responses.add(
    +            responses.GET,
    +            "https://public.example.com/source",
    +            status=200,
    +            body=b"should-not-be-fetched",
    +        )
    +
    +        with self.assertRaises(ValidationError):
    +            http_request(
    +                "get",
    +                "https://public.example.com/source",
    +                validate_url=True,
    +                allow_private_targets=False,
    +            )
    +
    +        mocked_getaddrinfo.assert_called_once_with("public.example.com", None, type=1)
    +        mocked_get_peer.assert_called_once()
    +        self.assertEqual(len(responses.calls), 1)
    +
    +    @responses.activate
    +    @patch("weblate.utils.requests._get_response_peer_ip")
    +    @patch(
    +        "weblate.utils.outbound.socket.getaddrinfo",
    +        return_value=[(0, 0, 0, "", ("93.184.216.34", 443))],
    +    )
    +    def test_http_request_skips_peer_validation_through_proxy(
    +        self, mocked_getaddrinfo, mocked_get_peer
    +    ) -> None:
    +        responses.add(
    +            responses.GET,
    +            "https://public.example.com/source",
    +            status=200,
    +            body=b"fetched-via-proxy",
    +        )
    +
    +        with patch.dict(
    +            os.environ,
    +            {
    +                "HTTPS_PROXY": "http://127.0.0.1:8080",
    +                "HTTP_PROXY": "",
    +                "ALL_PROXY": "",
    +                "NO_PROXY": "",
    +            },
    +        ):
    +            response = http_request(
    +                "get",
    +                "https://public.example.com/source",
    +                validate_url=True,
    +                allow_private_targets=False,
    +            )
    +
    +        self.assertEqual(response.content, b"fetched-via-proxy")
    +        mocked_getaddrinfo.assert_not_called()
    +        mocked_get_peer.assert_not_called()
    +        self.assertEqual(len(responses.calls), 1)
    +
    +    @responses.activate
    +    @patch("weblate.utils.requests._get_response_peer_ip")
    +    @patch(
    +        "weblate.utils.outbound.socket.getaddrinfo",
    +        side_effect=OSError("Name or service not known"),
    +    )
    +    def test_http_request_allows_proxy_resolved_hostname(
    +        self, mocked_getaddrinfo, mocked_get_peer
    +    ) -> None:
    +        responses.add(
    +            responses.GET,
    +            "https://public.example.com/source",
    +            status=200,
    +            body=b"resolved-by-proxy",
    +        )
    +
    +        with patch.dict(
    +            os.environ,
    +            {
    +                "HTTPS_PROXY": "http://127.0.0.1:8080",
    +                "HTTP_PROXY": "",
    +                "ALL_PROXY": "",
    +                "NO_PROXY": "",
    +            },
    +        ):
    +            response = http_request(
    +                "get",
    +                "https://public.example.com/source",
    +                validate_url=True,
    +                allow_private_targets=False,
    +            )
    +
    +        self.assertEqual(response.content, b"resolved-by-proxy")
    +        mocked_getaddrinfo.assert_not_called()
    +        mocked_get_peer.assert_not_called()
    +        self.assertEqual(len(responses.calls), 1)
    +
    +    @responses.activate
    +    @patch("weblate.utils.requests._get_response_peer_ip")
    +    @patch(
    +        "weblate.utils.outbound.socket.getaddrinfo",
    +        side_effect=OSError("Name or service not known"),
    +    )
    +    def test_http_request_allows_allowlisted_hostname_through_proxy(
    +        self, mocked_getaddrinfo, mocked_get_peer
    +    ) -> None:
    +        responses.add(
    +            responses.GET,
    +            "http://ollama/api/tags",
    +            status=200,
    +            body=b'{"models":[]}',
    +        )
    +
    +        with patch.dict(
    +            os.environ,
    +            {
    +                "HTTP_PROXY": "http://127.0.0.1:8080",
    +                "HTTPS_PROXY": "",
    +                "ALL_PROXY": "",
    +                "NO_PROXY": "",
    +            },
    +        ):
    +            response = http_request(
    +                "get",
    +                "http://ollama/api/tags",
    +                validate_url=True,
    +                allow_private_targets=False,
    +                allowed_domains=["ollama"],
    +            )
    +
    +        self.assertEqual(response.content, b'{"models":[]}')
    +        mocked_getaddrinfo.assert_not_called()
    +        mocked_get_peer.assert_not_called()
    +        self.assertEqual(len(responses.calls), 1)
    +
    +    @responses.activate
    +    def test_http_request_blocks_localhost_alias_through_proxy(self) -> None:
    +        responses.add(
    +            responses.GET,
    +            "http://localhost./source",
    +            status=200,
    +            body=b"should-not-be-fetched",
    +        )
    +
    +        with (
    +            patch.dict(
    +                os.environ,
    +                {
    +                    "HTTP_PROXY": "http://127.0.0.1:8080",
    +                    "HTTPS_PROXY": "",
    +                    "ALL_PROXY": "",
    +                    "NO_PROXY": "",
    +                },
    +            ),
    +            self.assertRaises(ValidationError),
    +        ):
    +            http_request(
    +                "get",
    +                "http://localhost./source",
    +                validate_url=True,
    +                allow_private_targets=False,
    +            )
    +
    +        self.assertEqual(len(responses.calls), 0)
    +
    +    @responses.activate
    +    def test_http_request_blocks_shorthand_loopback_through_proxy(self) -> None:
    +        responses.add(
    +            responses.GET,
    +            "http://127.1/source",
    +            status=200,
    +            body=b"should-not-be-fetched",
    +        )
    +
    +        with (
    +            patch.dict(
    +                os.environ,
    +                {
    +                    "HTTP_PROXY": "http://127.0.0.1:8080",
    +                    "HTTPS_PROXY": "",
    +                    "ALL_PROXY": "",
    +                    "NO_PROXY": "",
    +                },
    +            ),
    +            self.assertRaises(ValidationError),
    +        ):
    +            http_request(
    +                "get",
    +                "http://127.1/source",
    +                validate_url=True,
    +                allow_private_targets=False,
    +            )
    +
    +        self.assertEqual(len(responses.calls), 0)
    +
    +    @patch("weblate.utils.requests.LOGGER.warning")
    +    @patch("weblate.utils.requests._get_response_peer_ip", return_value=None)
    +    def test_http_request_logs_when_peer_ip_is_unavailable(
    +        self, mocked_get_peer, mocked_warning
    +    ) -> None:
    +        response = Mock()
    +        response.url = "https://public.example.com/source"
    +
    +        from weblate.utils.requests import _validate_response_peer
    +
    +        _validate_response_peer(
    +            response,
    +            allow_private_targets=False,
    +            used_proxy=False,
    +        )
    +
    +        mocked_get_peer.assert_called_once_with(response)
    +        mocked_warning.assert_called_once_with(
    +            "Skipping peer IP validation for direct request to %s because the "
    +            "connected peer address could not be determined.",
    +            "https://public.example.com/source",
    +        )
    
  • weblate/utils/tests/test_validators.py+18 0 modified
    @@ -21,6 +21,8 @@
         validate_backup_path,
         validate_filename,
         validate_fullname,
    +    validate_machinery_hostname,
    +    validate_machinery_url,
         validate_project_web,
         validate_re,
         validate_repo_url,
    @@ -277,6 +279,22 @@ def test_asset_url_validator(self) -> None:
             with self.assertRaises(ValidationError):
                 validate_asset_url("https://blocked.example.com/image.png")
     
    +    def test_machinery_url_validator(self) -> None:
    +        validate_machinery_url("http://127.0.0.1:11434", allow_private_targets=True)
    +        validate_machinery_url("https://api.deepl.com/v2/", allow_private_targets=False)
    +        with self.assertRaises(ValidationError):
    +            validate_machinery_url(
    +                "http://127.0.0.1:11434", allow_private_targets=False
    +            )
    +
    +    @override_settings(ALLOWED_MACHINERY_DOMAINS=["ollama"])
    +    def test_machinery_hostname_allowlist(self) -> None:
    +        validate_machinery_hostname("ollama", allow_private_targets=False)
    +
    +    def test_machinery_hostname_rejects_loopback_with_port(self) -> None:
    +        with self.assertRaises(ValidationError):
    +            validate_machinery_hostname("127.0.0.1:11434", allow_private_targets=False)
    +
     
     class BackupTest(SimpleTestCase):
         def test_ssh(self) -> None:
    
  • weblate/utils/validators.py+20 0 modified
    @@ -38,6 +38,7 @@
     from weblate.utils.data import data_dir
     from weblate.utils.errors import report_error
     from weblate.utils.files import is_excluded
    +from weblate.utils.outbound import validate_outbound_hostname, validate_outbound_url
     from weblate.utils.regex import REGEX_TIMEOUT, compile_regex
     
     USERNAME_MATCHER = re.compile(r"^[\w@+-][\w.@+-]*$")
    @@ -404,6 +405,25 @@ def validate_asset_url(value: str) -> None:
             raise ValidationError(gettext("URL domain is not allowed."))
     
     
    +def validate_machinery_url(value: str, *, allow_private_targets: bool = True) -> None:
    +    WeblateServiceURLValidator()(value)
    +    validate_outbound_url(
    +        value,
    +        allow_private_targets=allow_private_targets,
    +        allowed_domains=settings.ALLOWED_MACHINERY_DOMAINS,
    +    )
    +
    +
    +def validate_machinery_hostname(
    +    value: str, *, allow_private_targets: bool = True
    +) -> None:
    +    validate_outbound_hostname(
    +        value,
    +        allow_private_targets=allow_private_targets,
    +        allowed_domains=settings.ALLOWED_MACHINERY_DOMAINS,
    +    )
    +
    +
     class WeblateEditorURLValidator(WeblateURLValidator):
         schemes: list[str] = [  # noqa: RUF012
             "editor",
    

Vulnerability mechanics

Generated by null/stub on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.
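
The validator tests in the patch above assert three behaviours: a loopback target is rejected when `allow_private_targets=False`, a public host is accepted, and a hostname listed in `ALLOWED_MACHINERY_DOMAINS` is accepted without resolution. The following is a minimal sketch of that kind of check. It is not Weblate's `validate_outbound_hostname` implementation. The names `host_is_allowlisted`, `check_outbound_host` and the injectable `resolver` parameter are hypothetical, and treating a leading-period entry as also matching the bare domain is an assumption.

```python
import ipaddress
import socket
from collections.abc import Callable, Iterable


def host_is_allowlisted(host: str, allowed_domains: Iterable[str]) -> bool:
    """Exact match, or subdomain match when the entry starts with a period."""
    host = host.rstrip(".").lower()
    for entry in allowed_domains:
        entry = entry.lower()
        if entry.startswith("."):
            # Assumption: ".example.com" matches example.com and its subdomains.
            if host.endswith(entry) or host == entry[1:]:
                return True
        elif host == entry:
            return True
    return False


def resolve_addresses(host: str) -> list[str]:
    """Default resolver: all addresses the system resolver returns."""
    infos = socket.getaddrinfo(host, None, type=socket.SOCK_STREAM)
    return [info[4][0] for info in infos]


def check_outbound_host(
    host: str,
    *,
    allow_private_targets: bool = False,
    allowed_domains: Iterable[str] = (),
    resolver: Callable[[str], list[str]] = resolve_addresses,
) -> None:
    """Raise ValueError if the host may reach a non-public address."""
    if allow_private_targets or host_is_allowlisted(host, allowed_domains):
        return
    try:
        addresses = resolver(host)
    except OSError as error:
        raise ValueError(f"cannot resolve {host!r}") from error
    if not addresses:
        raise ValueError(f"{host!r} did not resolve to any address")
    for address in addresses:
        # Reject if any resolved address is loopback, private, link-local, etc.
        if not ipaddress.ip_address(address).is_global:
            raise ValueError(f"{host!r} resolves to non-public {address}")
```

Passing a fake `resolver` keeps the check testable without network access, much as the patch's tests mock `socket.getaddrinfo`. A resolution-time check alone does not cover DNS rebinding or redirects to internal hosts. The patch's tests also cover validation of the connected peer address and of each redirect hop, which this sketch leaves out.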
