VYPR
High severity8.1NVD Advisory· Published May 7, 2026· Updated May 11, 2026

CVE-2026-41654

CVE-2026-41654

Description

Weblate is a web based localization tool. Prior to version 5.17.1, an authenticated user with project.add permission (default on hosted Weblate SaaS and for any user holding an active billing/trial plan) can import a crafted project backup ZIP whose components/<name>.json contains an attacker-chosen repo URL pointing at a private address (e.g. http://127.0.0.1:9999/) or using a non-allow-listed scheme (e.g. file://, git://). Weblate persists the component via Component.objects.bulk_create([component])[0], which bypasses Django's full_clean() and therefore never runs the validate_repo_url validator. The URL is subsequently written verbatim into .git/config by configure_repo(pull=False). This issue has been patched in version 5.17.1.

Affected packages

Versions sourced from the GitHub Security Advisory.

PackageAffected versionsPatched versions
weblatePyPI
< 5.17.15.17.1

Patches

2
e1eff1f517c1

fix(vcs): annotate remote operation for validation

https://github.com/WeblateOrg/weblateMichal ČihařApr 17, 2026via ghsa
13 files changed · +426 140
  • weblate/addons/git.py+19 14 modified
    @@ -44,7 +44,7 @@ def squash_repo(
             author: str | None = None,
         ) -> None:
             message = self.get_squash_commit_message(repository, "%B", remote)
    -        repository.execute(["reset", "--mixed", remote])
    +        repository.execute(["reset", "--mixed", remote], remote_op="none")
             # Can happen for added and removed translation
             component.commit_files(
                 author=author, message=message, signals=False, skip_push=True
    @@ -78,7 +78,7 @@ def get_git_commit_messages(
             if filenames:
                 command += ["--", *filenames]
     
    -        return repository.execute(command)
    +        return repository.execute(command, remote_op="none")
     
         def get_squash_commit_message(
             self,
    @@ -100,7 +100,7 @@ def get_squash_commit_message(
     
                 trailer_lines = set()
                 change_id_line = None
    -            for trailer in repository.execute(command).split("\n"):
    +            for trailer in repository.execute(command, remote_op="none").split("\n"):
                     # Skip blank lines
                     if not trailer.strip():
                         continue
    @@ -158,7 +158,7 @@ def squash_language(self, component: Component, repository: GitRepository) -> No
                     repository, "%B", remote, filenames
                 )
     
    -        repository.execute(["reset", "--mixed", remote])
    +        repository.execute(["reset", "--mixed", remote], remote_op="none")
     
             for code, message in messages.items():
                 if not message:
    @@ -178,7 +178,7 @@ def squash_file(self, component: Component, repository: GitRepository) -> None:
                         repository, "%B", remote, [filename]
                     )
     
    -        repository.execute(["reset", "--mixed", remote])
    +        repository.execute(["reset", "--mixed", remote], remote_op="none")
     
             for filename, message in messages.items():
                 if not message:
    @@ -194,7 +194,8 @@ def squash_author(self, component: Component, repository: GitRepository) -> None
                 x.split(None, 1)
                 for x in reversed(
                     repository.execute(
    -                    ["log", "--no-merges", "--format=%H %aE", f"{remote}..HEAD"]
    +                    ["log", "--no-merges", "--format=%H %aE", f"{remote}..HEAD"],
    +                    remote_op="none",
                     ).splitlines()
                 )
             ]
    @@ -204,9 +205,9 @@ def squash_author(self, component: Component, repository: GitRepository) -> None
             repository.delete_branch(tmp)
             try:
                 # Create local branch for upstream
    -            repository.execute(["branch", tmp, remote])
    +            repository.execute(["branch", tmp, remote], remote_op="none")
                 # Checkout upstream branch
    -            repository.execute(["checkout", tmp])
    +            repository.execute(["checkout", tmp], remote_op="none")
                 while commits:
                     commit, author = commits.pop(0)
                     # Remember current revision for final squash
    @@ -216,11 +217,12 @@ def squash_author(self, component: Component, repository: GitRepository) -> None
                     try:
                         repository.execute(
                             ["cherry-pick", "--empty=drop", commit, *gpg_sign],
    +                        remote_op="none",
                             environment={"WEBLATE_MERGE_SKIP": "1"},
                         )
                     except RepositoryError:
                         if repository.has_git_file("CHERRY_PICK_HEAD"):
    -                        repository.execute(["cherry-pick", "--abort"])
    +                        repository.execute(["cherry-pick", "--abort"], remote_op="none")
                         raise
                     handled = []
                     # Pick other commits by same author
    @@ -230,14 +232,17 @@ def squash_author(self, component: Component, repository: GitRepository) -> None
                         try:
                             repository.execute(
                                 ["cherry-pick", "--empty=drop", other[0], *gpg_sign],
    +                            remote_op="none",
                                 environment={"WEBLATE_MERGE_SKIP": "1"},
                             )
                             handled.append(i)
                         except RepositoryError:
                             # If fails, continue to another author, we will
                             # pick this commit later (it depends on some other)
                             if repository.has_git_file("CHERRY_PICK_HEAD"):
    -                            repository.execute(["cherry-pick", "--abort"])
    +                            repository.execute(
    +                                ["cherry-pick", "--abort"], remote_op="none"
    +                            )
                             break
                     # Remove processed commits from list
                     for i in reversed(handled):
    @@ -246,15 +251,15 @@ def squash_author(self, component: Component, repository: GitRepository) -> None
                     self.squash_repo(component, repository, base, author)
     
                 # Update working copy with squashed commits
    -            repository.execute(["checkout", repository.branch])
    -            repository.execute(["reset", "--hard", tmp])
    +            repository.execute(["checkout", repository.branch], remote_op="none")
    +            repository.execute(["reset", "--hard", tmp], remote_op="none")
                 repository.delete_branch(tmp)
     
             except Exception:
                 report_error("Failed squash", project=component.project)
                 # Revert to original branch without any changes
    -            repository.execute(["reset", "--hard"])
    -            repository.execute(["checkout", repository.branch])
    +            repository.execute(["reset", "--hard"], remote_op="none")
    +            repository.execute(["checkout", repository.branch], remote_op="none")
                 repository.delete_branch(tmp)
     
         def post_commit(
    
  • weblate/addons/tests.py+8 2 modified
    @@ -3373,7 +3373,10 @@ def test_cleanup(self) -> None:
             addon = CleanupAddon.create(component=self.component)
             # Unshallow the local repo
             with self.component.repository.lock:
    -            self.component.repository.execute(["fetch", "--unshallow", "origin"])
    +            self.component.repository.execute(
    +                ["fetch", "--unshallow", "origin"],
    +                remote_op="pull",
    +            )
             addon.post_update(
                 self.component, "da07dc0dc7052dc44eadfa8f3a2f2609ec634303", False
             )
    @@ -3387,7 +3390,10 @@ def test_update(self) -> None:
             rev = self.component.repository.last_revision
             # Unshallow the local repo
             with self.component.repository.lock:
    -            self.component.repository.execute(["fetch", "--unshallow", "origin"])
    +            self.component.repository.execute(
    +                ["fetch", "--unshallow", "origin"],
    +                remote_op="pull",
    +            )
             addon.post_update(
                 self.component, "da07dc0dc7052dc44eadfa8f3a2f2609ec634303", False
             )
    
  • weblate/api/tests.py+3 1 modified
    @@ -1953,7 +1953,9 @@ def __init__(self) -> None:
                 def has_branch(self, _branch: str) -> bool:
                     return False
     
    -            def execute(self, args: list[str]) -> None:
    +            def execute(self, args: list[str], *, remote_op: str) -> None:
    +                if remote_op != "none":
    +                    raise AssertionError(remote_op)
                     calls.append(args)
     
                 def clean_revision_cache(self) -> None:
    
  • weblate/gitexport/tests.py+5 0 modified
    @@ -306,6 +306,7 @@ def test_ignore_missing_requested_have(self) -> None:
             self.mark_component_shallow()
             present_revision = self.component.repository.execute(
                 ["rev-parse", "HEAD"],
    +            remote_op="none",
                 needs_lock=False,
             ).strip()
             body = (
    @@ -320,6 +321,7 @@ def test_ignore_missing_requested_have_with_present_want(self) -> None:
             self.mark_component_shallow()
             present_revision = self.component.repository.execute(
                 ["rev-parse", "HEAD"],
    +            remote_op="none",
                 needs_lock=False,
             ).strip()
             body = (
    @@ -333,6 +335,7 @@ def test_ignore_missing_requested_have_with_present_want(self) -> None:
         def test_stop_parsing_after_initial_want_block(self) -> None:
             present_revision = self.component.repository.execute(
                 ["rev-parse", "HEAD"],
    +            remote_op="none",
                 needs_lock=False,
             ).strip()
             body = (
    @@ -410,6 +413,7 @@ def test_missing_have_does_not_short_circuit_upload_pack(self) -> None:
             self.mark_component_shallow()
             present_revision = self.component.repository.execute(
                 ["rev-parse", "HEAD"],
    +            remote_op="none",
                 needs_lock=False,
             ).strip()
             body = (
    @@ -533,6 +537,7 @@ def create_export_commit(self) -> str:
                 self.component.repository.commit("Test export change", files=[filename])
             return self.component.repository.execute(
                 ["rev-parse", "HEAD"],
    +            remote_op="none",
                 needs_lock=False,
             ).strip()
     
    
  • weblate/gitexport/views.py+1 0 modified
    @@ -229,6 +229,7 @@ def get_precheck_failure_reason(component: Component, body: bytes) -> str | None
         try:
             output = execute(
                 ["cat-file", "--batch-check"],
    +            remote_op="none",
                 needs_lock=False,
                 stdin="".join(f"{revision}\n" for revision in wanted_revisions),
             )
    
  • weblate/trans/component_copy.py+2 2 modified
    @@ -122,9 +122,9 @@ def normalize_local_copy_branch(component: Component) -> None:
     
         with repository.lock:
             if repository.has_branch(component.branch):
    -            repository.execute(["checkout", component.branch])
    +            repository.execute(["checkout", component.branch], remote_op="none")
             else:
    -            repository.execute(["checkout", "-B", component.branch])
    +            repository.execute(["checkout", "-B", component.branch], remote_op="none")
             repository.branch = component.branch
             repository.clean_revision_cache()
     
    
  • weblate/trans/tests/test_component.py+2 1 modified
    @@ -1232,7 +1232,8 @@ def restore_local_missing_translation_files(
                         self.local_missing_translation_contents[translation.pk]
                     )
                     translation.component.repository.execute(
    -                    ["add", "--force", "--", translation.filename]
    +                    ["add", "--force", "--", translation.filename],
    +                    remote_op="none",
                     )
     
         def test_reset_keep_recreates_missing_translation_file(self) -> None:
    
  • weblate/trans/tests/test_git_views.py+4 1 modified
    @@ -175,7 +175,10 @@ def setUp(self) -> None:
             super().setUp()
             repo = self.component.repository
             with repo.lock:
    -            repo.execute(["branch", "--delete", "--remotes", "origin/main"])
    +            repo.execute(
    +                ["branch", "--delete", "--remotes", "origin/main"],
    +                remote_op="none",
    +            )
     
     
     class GitBrokenComponentTest(GitBrokenProjectTest):
    
  • weblate/trans/tests/test_remote.py+3 2 modified
    @@ -595,9 +595,10 @@ def test_pending_changes_preserved_on_non_translation_update(self) -> None:
                 "# Test Project\n\nThis is a test README.\n", encoding="utf-8"
             )
             with self.component.repository.lock:
    -            self.component.repository.execute(["add", "README.md"])
    +            self.component.repository.execute(["add", "README.md"], remote_op="none")
                 self.component.repository.execute(
    -                ["commit", "-m", "Add README.md (non-translation file)"]
    +                ["commit", "-m", "Add README.md (non-translation file)"],
    +                remote_op="none",
                 )
                 self.component.repository.push(self.component.push_branch)
     
    
  • weblate/vcs/base.py+39 12 modified
    @@ -13,7 +13,7 @@
     import subprocess  # noqa: S404
     from contextlib import suppress
     from pathlib import Path
    -from typing import TYPE_CHECKING, ClassVar, Self, TypedDict
    +from typing import TYPE_CHECKING, ClassVar, Literal, Self, TypedDict
     
     from dateutil import parser
     from django.core.cache import cache
    @@ -61,6 +61,9 @@ class SubprocessArgs(TypedDict, total=False):
         input: str
     
     
    +type RemoteOperation = Literal["none", "pull", "push"]
    +
    +
     class RepositoryLock:
         def __init__(self, repository: Repository, lock: WeblateLock) -> None:
             self.repository = repository
    @@ -129,6 +132,14 @@ def __str__(self) -> str:
             return self.get_message()
     
     
    +class RepositoryCommandError(RepositoryError):
    +    """Error raised by the underlying VCS command."""
    +
    +
    +class RepositoryValidationError(RepositoryError):
    +    """Error raised when repository configuration violates runtime policy."""
    +
    +
     class RepositorySymlinkError(ValueError):
         """Raised when symlink resolution fails due to links outside the repository tree or excessive symlink depth."""
     
    @@ -395,7 +406,7 @@ def _popen(
                     if isinstance(error.stderr, bytes)
                     else error.stderr
                 )
    -            raise RepositoryError(
    +            raise RepositoryCommandError(
                     0,
                     f"Subprocess didn't complete before {error.timeout} seconds\n{stdout}{stderr or ''}",
                 ) from error
    @@ -423,7 +434,7 @@ def _popen(
                         retry=False,
                     )
     
    -            raise RepositoryError(process.returncode, errormessage)
    +            raise RepositoryCommandError(process.returncode, errormessage)
             return process.stdout
     
         @staticmethod
    @@ -451,6 +462,7 @@ def execute(
             self,
             args: list[str],
             *,
    +        remote_op: RemoteOperation,
             needs_lock: bool = True,
             fullcmd: bool = False,
             merge_err: bool = True,
    @@ -465,6 +477,10 @@ def execute(
                     raise RuntimeError(msg)
                 if self.component:
                     self.ensure_config_updated()
    +        if remote_op == "pull":
    +            self.validate_pull_url()
    +        elif remote_op == "push":
    +            self.validate_push_url()
             is_status = args[0] == self._cmd_status[0]
             try:
                 self.last_output = self._popen(
    @@ -476,14 +492,14 @@ def execute(
                     stdin=stdin,
                     environment=environment,
                 )
    -        except RepositoryError as error:
    +        except RepositoryCommandError as error:
                 if not is_status and not self.local:
                     self.log_status(error)
                 raise
             return self.last_output
     
         def log_status(self, error: str | RepositoryError) -> None:
    -        with suppress(RepositoryError):
    +        with suppress(RepositoryCommandError):
                 self.log(f"failure {error}")
                 self.log(self.status())
     
    @@ -499,13 +515,21 @@ def last_revision(self):
             return self.get_last_revision()
     
         def get_last_revision(self):
    -        return self.execute(self._cmd_last_revision, needs_lock=False, merge_err=False)
    +        return self.execute(
    +            self._cmd_last_revision,
    +            remote_op="none",
    +            needs_lock=False,
    +            merge_err=False,
    +        )
     
         @cached_property
         def last_remote_revision(self):
             """Return last remote revision."""
             return self.execute(
    -            self._cmd_last_remote_revision, needs_lock=False, merge_err=False
    +            self._cmd_last_remote_revision,
    +            remote_op="none",
    +            needs_lock=False,
    +            merge_err=False,
             )
     
         @classmethod
    @@ -523,7 +547,7 @@ def validate_remote_url(url: str) -> None:
             try:
                 validate_repo_url(url)
             except ValidationError as error:
    -            raise RepositoryError(0, "; ".join(error.messages)) from error
    +            raise RepositoryValidationError(0, "; ".join(error.messages)) from error
     
         def validate_pull_url(self, url: str | None = None) -> None:
             """Validate the pull URL in the current runtime context."""
    @@ -560,7 +584,7 @@ def update_remote(self) -> None:
     
         def status(self) -> str:
             """Return status of the repository."""
    -        return self.execute(self._cmd_status, needs_lock=False)
    +        return self.execute(self._cmd_status, remote_op="none", needs_lock=False)
     
         def push(self, branch: str) -> None:
             """Push given branch to remote repository."""
    @@ -820,9 +844,9 @@ def cleanup_files(self) -> None:
         def cleanup(self) -> None:
             """Cleanup repository status."""
             # Recover from failed merge/rebase
    -        with suppress(RepositoryError):
    +        with suppress(RepositoryCommandError):
                 self.merge(abort=True)
    -        with suppress(RepositoryError):
    +        with suppress(RepositoryCommandError):
                 self.rebase(abort=True)
             # Remove stale branches
             self.remove_stale_branches()
    @@ -844,7 +868,10 @@ def list_changed_files(self, refspec: str) -> list:
             This is not universal as refspec is different per vcs.
             """
             lines = self.execute(
    -            [*self._cmd_list_changed_files, refspec], needs_lock=False, merge_err=False
    +            [*self._cmd_list_changed_files, refspec],
    +            remote_op="none",
    +            needs_lock=False,
    +            merge_err=False,
             ).splitlines()
             return list(self.parse_changed_files(lines))
     
    
  • weblate/vcs/git.py+130 67 modified
    @@ -46,7 +46,7 @@
     from weblate.utils.lock import WeblateLock, WeblateLockTimeoutError
     from weblate.utils.render import render_template
     from weblate.utils.xml import parse_xml
    -from weblate.vcs.base import Repository, RepositoryError
    +from weblate.vcs.base import Repository, RepositoryCommandError, RepositoryError
     from weblate.vcs.gpg import get_gpg_sign_key
     
     if TYPE_CHECKING:
    @@ -183,6 +183,7 @@ def validate_branch_name(cls, branch: str) -> str:
         def get_remote_branch(cls, repo: str):
             if not repo:
                 return super().get_remote_branch(repo)
    +        cls.validate_remote_url(repo)
             try:
                 result = cls._popen(["ls-remote", "--symref", "--", repo, "HEAD"])
             except RepositoryError:
    @@ -251,23 +252,26 @@ def recover_lock_session(self) -> None:
             current_branch = self.get_current_branch()
             if current_branch not in TEMPORARY_BRANCHES:
                 return
    -        self.execute(["reset", "--hard"])
    +        self.execute(["reset", "--hard"], remote_op="none")
             self.checkout_with_temp_cleanup(self.branch)
    -        with suppress(RepositoryError):
    -            self.execute(["branch", "-D", current_branch])
    +        with suppress(RepositoryCommandError):
    +            self.execute(["branch", "-D", current_branch], remote_op="none")
             self.clean_revision_cache()
     
         def get_current_branch(self) -> str:
             return self.execute(
    -            ["branch", "--show-current"], needs_lock=False, merge_err=False
    +            ["branch", "--show-current"],
    +            remote_op="none",
    +            needs_lock=False,
    +            merge_err=False,
             ).strip()
     
         def checkout_with_temp_cleanup(self, branch: str) -> None:
             current_branch = self.get_current_branch()
    -        self.execute(["checkout", branch])
    +        self.execute(["checkout", branch], remote_op="none")
             if current_branch in TEMPORARY_BRANCHES and current_branch != branch:
    -            with suppress(RepositoryError):
    -                self.execute(["branch", "-D", current_branch])
    +            with suppress(RepositoryCommandError):
    +                self.execute(["branch", "-D", current_branch], remote_op="none")
                 self.clean_revision_cache()
     
         @staticmethod
    @@ -309,29 +313,37 @@ def _clone(cls, source: str, target: str, branch: str) -> None:
     
         def get_config(self, path):
             """Read entry from configuration."""
    -        return self.execute(["config", path], needs_lock=False, merge_err=False).strip()
    +        return self.execute(
    +            ["config", path],
    +            remote_op="none",
    +            needs_lock=False,
    +            merge_err=False,
    +        ).strip()
     
         def set_committer(self, name, mail) -> None:
             """Configure committer name."""
             self.config_update(("user", "name", name), ("user", "email", mail))
     
         def reset(self) -> None:
             """Reset working copy to match remote branch."""
    -        self.execute(["reset", "--hard", self.get_remote_branch_name()])
    +        self.execute(
    +            ["reset", "--hard", self.get_remote_branch_name()],
    +            remote_op="none",
    +        )
             self.clean_revision_cache()
     
         def rebase(self, abort=False) -> None:
             """Rebase working copy on top of remote branch."""
             if abort:
                 if self.has_git_file("rebase-apply") or self.has_git_file("rebase-merge"):
    -                self.execute(["rebase", "--abort"])
    +                self.execute(["rebase", "--abort"], remote_op="none")
                 if self.needs_commit():
    -                self.execute(["reset", "--hard"])
    +                self.execute(["reset", "--hard"], remote_op="none")
             else:
                 cmd = ["rebase"]
                 cmd.extend(self.get_gpg_sign_args())
                 cmd.append(self.get_remote_branch_name())
    -            self.execute(cmd, environment={"WEBLATE_MERGE_SKIP": "1"})
    +            self.execute(cmd, remote_op="none", environment={"WEBLATE_MERGE_SKIP": "1"})
             self.clean_revision_cache()
     
         def has_git_file(self, name):
    @@ -341,6 +353,7 @@ def has_rev(self, rev) -> bool:
             try:
                 self.execute(
                     ["rev-parse", "--verify", "--end-of-options", rev],
    +                remote_op="none",
                     needs_lock=False,
                 )
             except RepositoryError:
    @@ -356,11 +369,11 @@ def merge(
             if abort:
                 # Abort merge if there is one to abort
                 if self.has_rev("MERGE_HEAD"):
    -                self.execute(["merge", "--abort"])
    +                self.execute(["merge", "--abort"], remote_op="none")
                 if self.needs_commit():
    -                self.execute(["reset", "--hard"])
    +                self.execute(["reset", "--hard"], remote_op="none")
                 # Checkout original branch (we might be on tmp)
    -            self.execute(["checkout", current_branch])
    +            self.execute(["checkout", current_branch], remote_op="none")
             else:
                 self.delete_branch(tmp)
                 # We don't do simple git merge origin/branch as that leads
    @@ -369,9 +382,9 @@ def merge(
                 # changes merged into Weblate)
                 remote = self.get_remote_branch_name()
                 # Create local branch for upstream
    -            self.execute(["branch", tmp, remote])
    +            self.execute(["branch", tmp, remote], remote_op="none")
                 # Checkout upstream branch
    -            self.execute(["checkout", tmp])
    +            self.execute(["checkout", tmp], remote_op="none")
                 # Merge current Weblate changes, this can lead to conflict
                 cmd = [
                     "merge",
    @@ -382,20 +395,20 @@ def merge(
                     cmd.append("--no-ff")
                 cmd.extend(self.get_gpg_sign_args())
                 cmd.append(current_branch)
    -            self.execute(cmd)
    +            self.execute(cmd, remote_op="none")
                 # Checkout branch with Weblate changes
    -            self.execute(["checkout", current_branch])
    +            self.execute(["checkout", current_branch], remote_op="none")
                 # Merge temporary branch (this is fast forward so does not create
                 # merge commit)
    -            self.execute(["merge", tmp])
    +            self.execute(["merge", tmp], remote_op="none")
     
             # Delete temporary branch
             self.delete_branch(tmp)
             self.clean_revision_cache()
     
         def delete_branch(self, name) -> None:
             if self.has_branch(name):
    -            self.execute(["branch", "-D", name])
    +            self.execute(["branch", "-D", name], remote_op="none")
     
         def needs_commit(self, filenames: list[str] | None = None) -> bool:
             """Check whether repository needs commit."""
    @@ -404,7 +417,7 @@ def needs_commit(self, filenames: list[str] | None = None) -> bool:
                 cmd.extend(["--untracked-files=all", "--ignored=traditional", "--"])
                 cmd.extend(filenames)
             with self.lock:
    -            status = self.execute(cmd, merge_err=False)
    +            status = self.execute(cmd, remote_op="none", merge_err=False)
             return bool(status)
     
         def show(self, revision: str) -> str:
    @@ -413,7 +426,12 @@ def show(self, revision: str) -> str:
     
             Used in tests.
             """
    -        return self.execute(["show", revision], needs_lock=False, merge_err=False)
    +        return self.execute(
    +            ["show", revision],
    +            remote_op="none",
    +            needs_lock=False,
    +            merge_err=False,
    +        )
     
         @staticmethod
         def get_gpg_sign_args():
    @@ -426,6 +444,7 @@ def _get_revision_info(self, revision):
             """Return dictionary with detailed revision info."""
             text = self.execute(
                 ["log", "-1", "--format=fuller", "--date=rfc", "--abbrev-commit", revision],
    +            remote_op="none",
                 needs_lock=False,
                 merge_err=False,
             )
    @@ -463,6 +482,7 @@ def log_revisions(self, refspec):
             """Return revision log for given refspec."""
             return self.execute(
                 ["log", "--format=format:%H", refspec, "--"],
    +            remote_op="none",
                 needs_lock=False,
                 merge_err=False,
             ).splitlines()
    @@ -486,11 +506,14 @@ def commit(
                 for name in files:
                     try:
                         # Resolving symlinks is needed for symlinks in directory structure
    -                    self.execute(["add", "--force", "--", self.resolve_symlinks(name)])
    +                    self.execute(
    +                        ["add", "--force", "--", self.resolve_symlinks(name)],
    +                        remote_op="none",
    +                    )
                     except RepositoryError:
                         continue
             else:
    -            self.execute(["add", self.path])
    +            self.execute(["add", self.path], remote_op="none")
     
             # Bail out if there is nothing to commit.
             # This can easily happen with squashing and reverting changes.
    @@ -506,7 +529,7 @@ def commit(
             cmd.extend(self.get_gpg_sign_args())
     
             # Execute it
    -        self.execute(cmd, stdin=message)
    +        self.execute(cmd, remote_op="none", stdin=message)
             # Clean cache
             self.clean_revision_cache()
     
    @@ -520,7 +543,7 @@ def remove(
             extra_commit_files: list[str] | None = None,
         ) -> None:
             """Remove files and creates new revision."""
    -        self.execute(["rm", "--force", "--", *files])
    +        self.execute(["rm", "--force", "--", *files], remote_op="none")
             self.commit(message, author, files=files + (extra_commit_files or []))
     
         def get_remote_configure(
    @@ -570,7 +593,12 @@ def list_branches(self, *args: str) -> list[str]:
             # (we get additional * there indicating current branch)
             return [
                 x.lstrip("*").strip()
    -            for x in self.execute(cmd, needs_lock=False, merge_err=False).splitlines()
    +            for x in self.execute(
    +                cmd,
    +                remote_op="none",
    +                needs_lock=False,
    +                merge_err=False,
    +            ).splitlines()
             ]
     
         def has_branch(self, branch):
    @@ -582,7 +610,10 @@ def configure_branch(self, branch) -> None:
             branch = self.validate_branch_name(branch)
             # Add branch
             if not self.has_branch(branch):
    -            self.execute(["checkout", "-b", branch, f"origin/{branch}"])
    +            self.execute(
    +                ["checkout", "-b", branch, f"origin/{branch}"],
    +                remote_op="none",
    +            )
             else:
                 # Ensure it tracks correct upstream
                 self.config_update((f'branch "{branch}"', "remote", "origin"))
    @@ -594,7 +625,10 @@ def configure_branch(self, branch) -> None:
         def describe(self) -> str:
             """Verbosely describes current revision."""
             return self.execute(
    -            ["describe", "--always"], needs_lock=False, merge_err=False
    +            ["describe", "--always"],
    +            remote_op="none",
    +            needs_lock=False,
    +            merge_err=False,
             ).strip()
     
         @classmethod
    @@ -655,32 +689,42 @@ def get_file(self, path, revision) -> str:
             """Return content of file at given revision."""
             return self.execute(
                 ["show", f"{revision}:{path}"],
    +            remote_op="none",
                 needs_lock=False,
                 merge_err=False,
             )
     
         def remove_stale_branches(self) -> None:
             """Remove stale branches and tags from the repository."""
             # Prune remote branches, this can fail if repository is unreachable
    -        with suppress(RepositoryError):
    -            self.execute([*self.get_auth_args(), "remote", "prune", "origin"])
    +        with suppress(RepositoryCommandError):
    +            self.execute(
    +                [*self.get_auth_args(), "remote", "prune", "origin"],
    +                remote_op="pull",
    +            )
             # Remove possible stale branches
             for branch in self.list_branches():
                 if branch != self.branch:
    -                self.execute(["branch", "--delete", "--force", branch])
    +                self.execute(
    +                    ["branch", "--delete", "--force", branch], remote_op="none"
    +                )
             # Remove any tags
    -        for tag in self.execute(["tag", "--list"], merge_err=False).splitlines():
    -            self.execute(["tag", "--delete", tag])
    +        for tag in self.execute(
    +            ["tag", "--list"],
    +            remote_op="none",
    +            merge_err=False,
    +        ).splitlines():
    +            self.execute(["tag", "--delete", tag], remote_op="none")
     
         def cleanup_files(self) -> None:
             """Remove not tracked files from the repository."""
    -        self.execute(["clean", "-f", "-d"])
    +        self.execute(["clean", "-f", "-d"], remote_op="none")
     
         def list_remote_branches(self) -> list[str]:
             """Return a list of remote branch names by querying the remote repository using 'git ls-remote --heads origin'."""
    -        self.validate_pull_url()
             branches = self.execute(
                 [*self.get_auth_args(), "ls-remote", "--heads", "origin"],
    +            remote_op="pull",
                 needs_lock=False,
                 merge_err=False,
             )
    @@ -693,8 +737,10 @@ def update_remote(self) -> None:
             """Update remote repository."""
             branch = self.validate_branch_name(self.branch)
             # Update existing branch only, not changing depth
    -        self.validate_pull_url()
    -        self.execute([*self.get_auth_args(), "fetch", "origin", branch])
    +        self.execute(
    +            [*self.get_auth_args(), "fetch", "origin", branch],
    +            remote_op="pull",
    +        )
             self.clean_revision_cache()
     
         def push(self, branch: str) -> None:
    @@ -705,12 +751,10 @@ def push(self, branch: str) -> None:
                 if branch
                 else current_branch
             )
    -        self.validate_push_url()
    -        self.execute([*self._cmd_push, "origin", refspec])
    +        self.execute([*self._cmd_push, "origin", refspec], remote_op="push")
     
         def unshallow(self) -> None:
    -        self.validate_pull_url()
    -        self.execute([*self.get_auth_args(), "fetch", "--unshallow"])
    +        self.execute([*self.get_auth_args(), "fetch", "--unshallow"], remote_op="pull")
     
         def parse_changed_files(self, lines: list[str]) -> Iterator[str]:
             """Parse output with changed files."""
    @@ -720,18 +764,22 @@ def parse_changed_files(self, lines: list[str]) -> Iterator[str]:
     
         def status(self) -> str:
             result = [super().status()]
    -        cleanups = self.execute(["clean", "-f", "-d", "-n"], needs_lock=False)
    +        cleanups = self.execute(
    +            ["clean", "-f", "-d", "-n"],
    +            remote_op="none",
    +            needs_lock=False,
    +        )
             if cleanups:
                 result.extend(("", gettext("Possible cleanups:"), "", cleanups))
     
             return "\n".join(result)
     
         def compact(self) -> None:
    -        self.execute(["gc"])
    +        self.execute(["gc"], remote_op="none")
     
         def maintenance(self) -> None:
             # Expire old reflog entries (using Git defaults)
    -        self.execute(["reflog", "expire"])
    +        self.execute(["reflog", "expire"], remote_op="none")
             # Super will invoke remove_stale_branches() and compact()
             super().maintenance()
     
    @@ -763,11 +811,11 @@ def get_username_from_url(self, url) -> str:
             return ""
     
         def push(self, branch) -> None:
    -        self.validate_push_url()
             if self.needs_push():
                 try:
                     self.execute(
    -                    ["review", "--yes", self.validate_branch_name(self.branch)]
    +                    ["review", "--yes", self.validate_branch_name(self.branch)],
    +                    remote_op="push",
                     )
                 except RepositoryError as error:
                     if "(no new changes)" in str(error):
    @@ -886,16 +934,15 @@ def configure_remote(
                     raise RepositoryError(-1, "Can not switch subversion URL")
                 return
             args, self._fetch_revision = self.get_remote_args(pull_url, self.path)
    -        self.execute(["svn", "init", *args])
    +        self.execute(["svn", "init", *args], remote_op="none")
     
         def update_remote(self) -> None:
             """Update remote repository."""
    -        self.validate_pull_url()
             if self._fetch_revision:
    -            self.execute(["svn", "fetch", self._fetch_revision])
    +            self.execute(["svn", "fetch", self._fetch_revision], remote_op="pull")
                 self._fetch_revision = None
             else:
    -            self.execute(["svn", "fetch", "--parent"])
    +            self.execute(["svn", "fetch", "--parent"], remote_op="pull")
             self.clean_revision_cache()
     
         @classmethod
    @@ -929,17 +976,17 @@ def rebase(self, abort=False) -> None:
             Git-svn does not support merge.
             """
             if abort:
    -            self.execute(["rebase", "--abort"])
    +            self.execute(["rebase", "--abort"], remote_op="none")
             else:
    -            self.validate_pull_url()
    -            self.execute(["svn", "rebase"])
    +            self.execute(["svn", "rebase"], remote_op="pull")
             self.clean_revision_cache()
     
         @cached_property
         def last_remote_revision(self) -> str:
             """Return last remote revision."""
             return self.execute(
                 ["log", "-n", "1", "--format=format:%H", self.get_remote_branch_name()],
    +            remote_op="none",
                 needs_lock=False,
                 merge_err=False,
             )
    @@ -968,8 +1015,10 @@ def list_remote_branches(self) -> list[str]:
     
         def push(self, branch: str) -> None:
             """Push given branch to remote repository."""
    -        self.validate_pull_url()
    -        self.execute(["svn", "dcommit", self.validate_branch_name(self.branch)])
    +        self.execute(
    +            ["svn", "dcommit", self.validate_branch_name(self.branch)],
    +            remote_op="pull",
    +        )
     
     
     class GitForcePushRepository(GitRepository):
    @@ -1050,16 +1099,16 @@ def merge(
             # as we're expecting there will be an additional merge
             # commit created from the merge request.
             if abort:
    -            self.execute(["merge", "--abort"])
    +            self.execute(["merge", "--abort"], remote_op="none")
                 # Needed for compatibility with original merge code
    -            self.execute(["checkout", current_branch])
    +            self.execute(["checkout", current_branch], remote_op="none")
             else:
                 cmd = ["merge"]
                 if no_ff:
                     cmd.append("--no-ff")
                 cmd.extend(self.get_gpg_sign_args())
                 cmd.append(self.get_remote_branch_name())
    -            self.execute(cmd)
    +            self.execute(cmd, remote_op="none")
             self.clean_revision_cache()
     
         def parse_repo_url(
    @@ -1206,7 +1255,8 @@ def push_to_fork(
                     "--force",
                     credentials["username"],
                     f"{local_branch}:{fork_branch}",
    -            ]
    +            ],
    +            remote_op="push",
             )
     
         def configure_fork_remote(
    @@ -1242,7 +1292,7 @@ def get_remote_branch_name(self, branch: str | None = None) -> str:
     
         def fork(self, credentials: GitCredentials) -> None:
             """Create fork of original repository if one doesn't exist yet."""
    -        remotes = self.execute(["remote"]).splitlines()
    +        remotes = self.execute(["remote"], remote_op="none").splitlines()
             if credentials["username"] not in remotes:
                 self.create_fork(credentials)
     
    @@ -1532,7 +1582,7 @@ def raise_for_response(cls, response: requests.Response) -> None:
                 raise RepositoryError(0, "Invalid token")
     
         def fork(self, credentials: GitCredentials) -> None:
    -        remotes = self.execute(["remote"]).splitlines()
    +        remotes = self.execute(["remote"], remote_op="none").splitlines()
             if credentials["username"] not in remotes:
                 self.create_fork(credentials)
                 return
    @@ -1703,7 +1753,12 @@ def __get_forked_id(self, credentials: GitCredentials, remote: str) -> str:
             where the name is enough).
             """
             cmd = ["remote", "get-url", "--push", remote]
    -        fork_remotes = self.execute(cmd, needs_lock=False, merge_err=False).splitlines()
    +        fork_remotes = self.execute(
    +            cmd,
    +            remote_op="none",
    +            needs_lock=False,
    +            merge_err=False,
    +        ).splitlines()
             (_scheme, _username, _password, hostname, owner, slug) = self.parse_repo_url(
                 fork_remotes[0]
             )
    @@ -2016,6 +2071,9 @@ def merge(
         def list_remote_branches(self) -> list[str]:
             return []
     
    +    def remove_stale_branches(self) -> None:
    +        return
    +
         @classmethod
         def get_remote_branch(cls, repo: str):  # noqa: ARG003
             return cls.default_branch
    @@ -2063,7 +2121,7 @@ def build_local_repo(cls, target: str, commit_message: str):
                 # Populate files
                 yield repo
                 # Add to repository
    -            repo.execute(["add", target])
    +            repo.execute(["add", target], remote_op="none")
                 if repo.needs_commit():
                     repo.commit(commit_message)
     
    @@ -2116,7 +2174,12 @@ def get_forked_url(self, credentials: GitCredentials) -> str:
             """
             target_path = credentials["url"].rsplit("/", 1)[-1]
             cmd = ["remote", "get-url", "--push", credentials["username"]]
    -        fork_remotes = self.execute(cmd, needs_lock=False, merge_err=False).splitlines()
    +        fork_remotes = self.execute(
    +            cmd,
    +            remote_op="none",
    +            needs_lock=False,
    +            merge_err=False,
    +        ).splitlines()
             fork_path = self.get_fork_path(fork_remotes[0])
             return credentials["url"].replace(target_path, fork_path)
     
    
  • weblate/vcs/mercurial.py+47 24 modified
    @@ -25,6 +25,8 @@
     
         from django_stubs_ext import StrOrPromise
     
    +    from weblate.vcs.base import RemoteOperation
    +
     
     VERSION_RE = re.compile(r".*\(version ([^)]*)\).*")
     
    @@ -62,6 +64,7 @@ class HgRepository(Repository):
         default_branch: ClassVar[str] = "default"
         ref_to_remote: ClassVar[str] = "head() and branch(.) and not closed() - ."
         ref_from_remote: ClassVar[str] = "outgoing()"
    +    remote_revset_markers: ClassVar[tuple[str, ...]] = ("remote(", "outgoing(")
     
         @staticmethod
         def sanitize_error_message(errormessage: str) -> str:
    @@ -132,9 +135,9 @@ def set_committer(self, name, mail) -> None:
         def reset(self) -> None:
             """Reset working copy to match remote branch."""
             self.set_config_values(("extensions", "strip", ""))
    -        self.execute(["update", "--clean", "remote(.)"])
    +        self.execute(["update", "--clean", "remote(.)"], remote_op="pull")
             if self.needs_push():
    -            self.execute(["strip", "roots(outgoing())"])
    +            self.execute(["strip", "roots(outgoing())"], remote_op="pull")
             self.clean_revision_cache()
     
         def configure_merge(self) -> None:
    @@ -169,21 +172,24 @@ def rebase(self, abort=False) -> None:
             """Rebase working copy on top of remote branch."""
             self.set_config_values(("extensions", "rebase", ""))
             if abort:
    -            self.execute(["rebase", "--abort"])
    +            self.execute(["rebase", "--abort"], remote_op="none")
             elif self.needs_merge():
                 if self.needs_ff():
    -                self.execute(["update", "--clean", "remote(.)"])
    +                self.execute(["update", "--clean", "remote(.)"], remote_op="pull")
                 else:
                     self.configure_merge()
                     try:
    -                    self.execute(["rebase", "-d", "remote(.)"])
    +                    self.execute(["rebase", "-d", "remote(.)"], remote_op="pull")
                     except RepositoryError as error:
                         # Mercurial 3.8 changed error code and output
                         if (
                             error.retcode in {1, 255}
                             and "nothing to rebase" in error.args[0]
                         ):
    -                        self.execute(["update", "--clean", "remote(.)"])
    +                        self.execute(
    +                            ["update", "--clean", "remote(.)"],
    +                            remote_op="pull",
    +                        )
                             self.clean_revision_cache()
                             return
                         raise
    @@ -194,30 +200,30 @@ def merge(
         ) -> None:
             """Merge remote branch or reverts the merge."""
             if abort:
    -            self.execute(["update", "--clean", "."])
    +            self.execute(["update", "--clean", "."], remote_op="none")
             elif self.needs_merge():
                 if self.needs_ff() and not no_ff:
    -                self.execute(["update", "--clean", "remote(.)"])
    +                self.execute(["update", "--clean", "remote(.)"], remote_op="pull")
                 else:
                     self.configure_merge()
                     # Fallback to merge
                     try:
    -                    self.execute(["merge", "-r", "remote(.)"])
    +                    self.execute(["merge", "-r", "remote(.)"], remote_op="pull")
                     except RepositoryError as error:
                         if error.retcode == 255:
                             # Nothing to merge
                             self.clean_revision_cache()
                             return
                         raise
    -                self.execute(["commit", "--message", "Merge"])
    +                self.execute(["commit", "--message", "Merge"], remote_op="none")
             self.clean_revision_cache()
     
         def needs_commit(self, filenames: list[str] | None = None):
             """Check whether repository needs commit."""
             cmd = ["status", "--"]
             if filenames:
                 cmd.extend(filenames)
    -        status = self.execute(cmd, needs_lock=False)
    +        status = self.execute(cmd, remote_op="none", needs_lock=False)
             return bool(status)
     
         def _get_revision_info(self, revision):
    @@ -237,6 +243,7 @@ def _get_revision_info(self, revision):
             """
             text = self.execute(
                 ["log", "--limit", "1", "--template", template, "--rev", revision],
    +            remote_op="none",
                 needs_lock=False,
                 merge_err=False,
             )
    @@ -270,10 +277,18 @@ def log_revisions(self, refspec):
             """Return revisin log for given refspec."""
             return self.execute(
                 ["log", "--template", "{node}\n", "--rev", refspec],
    +            remote_op=self.get_revset_remote_op(refspec),
                 needs_lock=False,
                 merge_err=False,
             ).splitlines()
     
    +    @classmethod
    +    def get_revset_remote_op(cls, refspec: str) -> RemoteOperation:
    +        """Determine whether a revset can contact the configured remote."""
    +        if any(marker in refspec for marker in cls.remote_revset_markers):
    +            return "pull"
    +        return "none"
    +
         def needs_ff(self):
             """
             Check whether repository needs a fast-forward to upstream.
    @@ -312,10 +327,10 @@ def commit(
             if files is not None:
                 for name in files:
                     try:
    -                    self.execute(["add", "--", name])
    +                    self.execute(["add", "--", name], remote_op="none")
                     except RepositoryError:
                         try:
    -                        self.execute(["remove", "--", name])
    +                        self.execute(["remove", "--", name], remote_op="none")
                         except RepositoryError:
                             continue
                     cmd.append(name)
    @@ -326,7 +341,7 @@ def commit(
                 return False
     
             # Execute it
    -        self.execute(cmd)
    +        self.execute(cmd, remote_op="none")
             # Clean cache
             self.clean_revision_cache()
     
    @@ -340,7 +355,7 @@ def remove(
             extra_commit_files: list[str] | None = None,
         ) -> None:
             """Remove files and creates new revision."""
    -        self.execute(["remove", "--force", "--", *files])
    +        self.execute(["remove", "--force", "--", *files], remote_op="none")
             self.commit(message, author, files=files + (extra_commit_files or []))
     
         def configure_remote(
    @@ -370,12 +385,15 @@ def configure_remote(
             self.branch = branch
     
         def on_branch(self, branch):
    -        return branch == self.execute(["branch"], merge_err=False).strip()
    +        return (
    +            branch
    +            == self.execute(["branch"], remote_op="none", merge_err=False).strip()
    +        )
     
         def configure_branch(self, branch) -> None:
             """Configure repository branch."""
             if not self.on_branch(branch):
    -            self.execute(["update", "--", branch])
    +            self.execute(["update", "--", branch], remote_op="none")
             self.branch = branch
     
         def describe(self):
    @@ -388,15 +406,15 @@ def describe(self):
                     "--template",
                     "{latesttag}-{latesttagdistance}-{node|short}",
                 ],
    +            remote_op="none",
                 needs_lock=False,
                 merge_err=False,
             ).strip()
     
         def push(self, branch) -> None:
             """Push given branch to remote repository."""
    -        self.validate_push_url()
             try:
    -            self.execute(["push", f"--branch={self.branch}"])
    +            self.execute(["push", f"--branch={self.branch}"], remote_op="push")
             except RepositoryError as error:
                 if error.retcode == 1:
                     # No changes found
    @@ -406,7 +424,10 @@ def push(self, branch) -> None:
         def get_file(self, path, revision):
             """Return content of file at given revision."""
             return self.execute(
    -            ["cat", "--rev", revision, path], needs_lock=False, merge_err=False
    +            ["cat", "--rev", revision, path],
    +            remote_op="none",
    +            needs_lock=False,
    +            merge_err=False,
             )
     
         def remove_stale_branches(self) -> None:
    @@ -416,12 +437,11 @@ def remove_stale_branches(self) -> None:
         def cleanup_files(self) -> None:
             """Remove not tracked files from the repository."""
             self.set_config_values(("extensions", "purge", ""))
    -        self.execute(["purge"])
    +        self.execute(["purge"], remote_op="none")
     
         def update_remote(self) -> None:
             """Update remote repository."""
    -        self.validate_pull_url()
    -        self.execute(["pull", f"--branch={self.branch}"])
    +        self.execute(["pull", f"--branch={self.branch}"], remote_op="pull")
             self.clean_revision_cache()
     
         def parse_changed_files(self, lines: list[str]) -> Iterator[str]:
    @@ -450,5 +470,8 @@ def global_setup(cls) -> None:
     
         def show(self, revision: str) -> str:
             return self.execute(
    -            ["diff", "--change", revision], needs_lock=False, merge_err=False
    +            ["diff", "--change", revision],
    +            remote_op="none",
    +            needs_lock=False,
    +            merge_err=False,
             )
    
  • weblate/vcs/tests/test_vcs.py+163 14 modified
    @@ -27,8 +27,10 @@
     from weblate.utils.files import REPO_TEMP_DIRNAME
     from weblate.utils.render import render_template
     from weblate.vcs.base import (
    +    RepositoryCommandError,
         RepositoryError,
         RepositorySymlinkError,
    +    RepositoryValidationError,
         get_config_check_cache_key,
         is_ssh_host_key_mismatch_error,
         is_ssh_host_key_verification_error,
    @@ -385,7 +387,10 @@ def test_configure_branch_requires_lock_for_branch_creation(self) -> None:
     
         def test_configure_branch_recovers_temp_branch_on_next_lock(self) -> None:
             with self.repo.lock:
    -            self.repo.execute(["checkout", "-b", "weblate-squash-tmp"])
    +            self.repo.execute(
    +                ["checkout", "-b", "weblate-squash-tmp"],
    +                remote_op="none",
    +            )
                 temp_dir = self.repo.get_repo_temp_dir()
                 if temp_dir is None:
                     self.fail("Expected Git temp dir to exist")
    @@ -401,7 +406,11 @@ def test_configure_branch_recovers_temp_branch_on_next_lock(self) -> None:
                 self.assertEqual(self.repo.get_current_branch(), "main")
                 self.assertNotIn("weblate-squash-tmp", self.repo.list_branches())
                 self.assertFalse(
    -                self.repo.execute(["status", "--short"], needs_lock=False).strip()
    +                self.repo.execute(
    +                    ["status", "--short"],
    +                    remote_op="none",
    +                    needs_lock=False,
    +                ).strip()
                 )
     
     
    @@ -528,6 +537,24 @@ def get_fake_component(self):
                 pk=-1,
             )
     
    +    def assert_no_popen_sequence(
    +        self,
    +        mocked_popen,
    +        *sequence: str,
    +    ) -> None:
    +        width = len(sequence)
    +
    +        self.assertFalse(
    +            any(
    +                any(
    +                    tuple(call.args[0][index : index + width]) == sequence
    +                    for index in range(len(call.args[0]) - width + 1)
    +                )
    +                for call in mocked_popen.call_args_list
    +            ),
    +            mocked_popen.call_args_list,
    +        )
    +
         def clone_repo(self, path):
             return self._class.clone(
                 self.get_remote_repo_url(),
    @@ -613,7 +640,7 @@ def test_list_remote_branches_runtime_private_url_rejected(self) -> None:
             self.repo.component.repo = "https://private.example/repo.git"
             with (
                 self.repo.lock,
    -            patch.object(self.repo, "execute") as mock_execute,
    +            patch.object(self._class, "_popen") as mock_popen,
                 patch(
                     "weblate.utils.outbound.socket.getaddrinfo",
                     return_value=[(0, 0, 0, "", ("127.0.0.1", 443))],
    @@ -622,16 +649,51 @@ def test_list_remote_branches_runtime_private_url_rejected(self) -> None:
             ):
                 self.repo.list_remote_branches()
     
    -        mock_execute.assert_not_called()
    +        mock_popen.assert_not_called()
    +        self.assertIn("internal or non-public address", str(error.exception))
    +
    +    def test_remove_stale_branches_runtime_private_url_rejected(self) -> None:
    +        if self._class in {SubversionRepository, HgRepository, LocalRepository}:
    +            self.skipTest("Covered by backend-specific behavior")
    +        self.repo.component.repo = "https://private.example/repo.git"
    +        with (
    +            self.repo.lock,
    +            patch.object(self._class, "_popen") as mock_popen,
    +            patch(
    +                "weblate.utils.outbound.socket.getaddrinfo",
    +                return_value=[(0, 0, 0, "", ("127.0.0.1", 443))],
    +            ),
    +            self.assertRaises(RepositoryValidationError) as error,
    +        ):
    +            self.repo.remove_stale_branches()
    +
    +        self.assert_no_popen_sequence(mock_popen, "remote", "prune", "origin")
             self.assertIn("internal or non-public address", str(error.exception))
     
    +    def test_remove_stale_branches_ignores_command_errors(self) -> None:
    +        if self._class in {SubversionRepository, HgRepository, LocalRepository}:
    +            self.skipTest("Covered by backend-specific behavior")
    +
    +        original_execute = self.repo.execute
    +
    +        def mocked_execute(args: list[str], **kwargs):
    +            if tuple(args[-3:]) == ("remote", "prune", "origin"):
    +                raise RepositoryCommandError(128, "remote unavailable")
    +            return original_execute(args, **kwargs)
    +
    +        with (
    +            self.repo.lock,
    +            patch.object(self.repo, "execute", side_effect=mocked_execute),
    +        ):
    +            self.repo.remove_stale_branches()
    +
         def test_update_remote_runtime_private_url_rejected(self) -> None:
             if self._class in {SubversionRepository, HgRepository, LocalRepository}:
                 self.skipTest("Covered by backend-specific behavior")
             self.repo.component.repo = "https://private.example/repo.git"
             with (
                 self.repo.lock,
    -            patch.object(self.repo, "execute") as mock_execute,
    +            patch.object(self._class, "_popen") as mock_popen,
                 patch(
                     "weblate.utils.outbound.socket.getaddrinfo",
                     return_value=[(0, 0, 0, "", ("127.0.0.1", 443))],
    @@ -640,7 +702,7 @@ def test_update_remote_runtime_private_url_rejected(self) -> None:
             ):
                 self.repo.update_remote()
     
    -        mock_execute.assert_not_called()
    +        self.assert_no_popen_sequence(mock_popen, "fetch")
             self.assertIn("internal or non-public address", str(error.exception))
     
         def test_push(self, branch: str = "") -> None:
    @@ -653,7 +715,7 @@ def test_push_runtime_private_url_rejected(self) -> None:
             self.repo.component.push = "https://private.example/repo.git"
             with (
                 self.repo.lock,
    -            patch.object(self.repo, "execute") as mock_execute,
    +            patch.object(self._class, "_popen") as mock_popen,
                 patch(
                     "weblate.utils.outbound.socket.getaddrinfo",
                     return_value=[(0, 0, 0, "", ("127.0.0.1", 443))],
    @@ -662,7 +724,24 @@ def test_push_runtime_private_url_rejected(self) -> None:
             ):
                 self.repo.push("")
     
    -        mock_execute.assert_not_called()
    +        self.assert_no_popen_sequence(mock_popen, "push")
    +        self.assert_no_popen_sequence(mock_popen, "review")
    +        self.assertIn("internal or non-public address", str(error.exception))
    +
    +    def test_get_remote_branch_runtime_private_url_rejected(self) -> None:
    +        if not issubclass(self._class, GitRepository) or self._class is LocalRepository:
    +            self.skipTest("Covered by backend-specific behavior")
    +        with (
    +            patch.object(self._class, "_popen") as mock_popen,
    +            patch(
    +                "weblate.utils.outbound.socket.getaddrinfo",
    +                return_value=[(0, 0, 0, "", ("127.0.0.1", 443))],
    +            ),
    +            self.assertRaises(RepositoryError) as error,
    +        ):
    +            self._class.get_remote_branch("https://private.example/repo.git")
    +
    +        mock_popen.assert_not_called()
             self.assertIn("internal or non-public address", str(error.exception))
     
         def test_push_commit(self) -> None:
    @@ -697,6 +776,7 @@ def test_has_rev_uses_end_of_options(self) -> None:
                 self.assertTrue(self.repo.has_rev("--verify"))
             mocked.assert_called_once_with(
                 ["rev-parse", "--verify", "--end-of-options", "--verify"],
    +            remote_op="none",
                 needs_lock=False,
             )
     
    @@ -928,7 +1008,10 @@ def test_configure_remote(self) -> None:
                         self.repo.get_config("remote.origin.pushURL"), "pushurl"
                     )
                 # Test that we handle not set fetching
    -            self.repo.execute(["config", "--unset", "remote.origin.fetch"])
    +            self.repo.execute(
    +                ["config", "--unset", "remote.origin.fetch"],
    +                remote_op="none",
    +            )
                 self.repo.configure_remote("pullurl", "pushurl", "branch")
                 self.assertEqual(
                     self.repo.get_config("remote.origin.fetch"),
    @@ -1434,7 +1517,8 @@ def test_push_when_remote_fork_is_deleted(self, branch: str = "") -> None:
                         "add",
                         "test",
                         "git@ssh.this.does.not.exist:v3/org/proj/repo",
    -                ]
    +                ],
    +                remote_op="none",
                 )
     
             responses.post(
    @@ -2114,7 +2198,11 @@ def test_count_outgoing_after_merge(self) -> None:
             credentials = self.repo.get_credentials()
             fork_branch_name = self.repo.get_fork_branch_name()
             fork_ref = f"refs/remotes/{credentials['username']}/{fork_branch_name}"
    -        self.repo.execute(["update-ref", fork_ref, "HEAD"], needs_lock=False)
    +        self.repo.execute(
    +            ["update-ref", fork_ref, "HEAD"],
    +            remote_op="none",
    +            needs_lock=False,
    +        )
     
             # count_outgoing should now be 0 since fork has our commits
             # (even though origin doesn't yet - MR is pending)
    @@ -2124,7 +2212,11 @@ def test_count_outgoing_after_merge(self) -> None:
             # In a real scenario, after a merge request is merged and git fetch is done,
             # origin/{branch} would contain the local commits
             origin_ref = f"refs/remotes/origin/{self.repo.branch}"
    -        self.repo.execute(["update-ref", origin_ref, "HEAD"], needs_lock=False)
    +        self.repo.execute(
    +            ["update-ref", origin_ref, "HEAD"],
    +            remote_op="none",
    +            needs_lock=False,
    +        )
     
             # count_outgoing should still return 0 since origin has our commits
             self.assertEqual(self.repo.count_outgoing(), 0)
    @@ -2144,7 +2236,11 @@ def test_count_outgoing_non_default_branch(self) -> None:
     
             # Update origin ref for the different branch
             origin_ref = f"refs/remotes/origin/{different_branch}"
    -        self.repo.execute(["update-ref", origin_ref, "HEAD"], needs_lock=False)
    +        self.repo.execute(
    +            ["update-ref", origin_ref, "HEAD"],
    +            remote_op="none",
    +            needs_lock=False,
    +        )
     
             # count_outgoing with different branch should return 0
             # (commits are in origin for that branch, fork not checked)
    @@ -2157,7 +2253,11 @@ def test_needs_push_non_default_branch_ignores_stale_fork(self) -> None:
             credentials = self.repo.get_credentials()
             fork_branch_name = self.repo.get_fork_branch_name()
             fork_ref = f"refs/remotes/{credentials['username']}/{fork_branch_name}"
    -        self.repo.execute(["update-ref", fork_ref, "HEAD"], needs_lock=False)
    +        self.repo.execute(
    +            ["update-ref", fork_ref, "HEAD"],
    +            remote_op="none",
    +            needs_lock=False,
    +        )
     
             different_branch = "develop" if self.repo.branch != "develop" else "feature"
             self.assertTrue(self.repo.needs_push(different_branch))
    @@ -2356,6 +2456,23 @@ def verify_pull_url(self) -> None:
                 self.format_local_path(self.subversion_repo_path),
             )
     
    +    def test_push_runtime_private_repo_rejected_even_with_safe_push_url(self) -> None:
    +        self.repo.component.repo = "https://private.example/repo"
    +        self.repo.component.push = "https://example.com/repo"
    +        with (
    +            self.repo.lock,
    +            patch.object(self._class, "_popen") as mock_popen,
    +            patch(
    +                "weblate.utils.outbound.socket.getaddrinfo",
    +                return_value=[(0, 0, 0, "", ("127.0.0.1", 443))],
    +            ),
    +            self.assertRaises(RepositoryError) as error,
    +        ):
    +            self.repo.push("")
    +
    +        self.assert_no_popen_sequence(mock_popen, "svn", "dcommit")
    +        self.assertIn("internal or non-public address", str(error.exception))
    +
     
     class VCSSubversionBranchTest(VCSSubversionTest):
         """Cloning subversion branch directly."""
    @@ -2424,6 +2541,38 @@ def test_status(self) -> None:
             status = self.repo.status()
             self.assertEqual(status, "")
     
    +    def test_count_outgoing_runtime_private_url_rejected(self) -> None:
    +        self.repo.component.repo = "https://private.example/repo"
    +        with (
    +            patch.object(self._class, "_popen") as mock_popen,
    +            patch(
    +                "weblate.utils.outbound.socket.getaddrinfo",
    +                return_value=[(0, 0, 0, "", ("127.0.0.1", 443))],
    +            ),
    +            self.assertRaises(RepositoryError) as error,
    +        ):
    +            self.repo.count_outgoing()
    +
    +        mock_popen.assert_not_called()
    +        self.assertIn("internal or non-public address", str(error.exception))
    +
    +    def test_reset_runtime_private_url_rejected(self) -> None:
    +        self.repo.component.repo = "https://private.example/repo"
    +        with (
    +            self.repo.lock,
    +            patch.object(self._class, "_popen") as mock_popen,
    +            patch(
    +                "weblate.utils.outbound.socket.getaddrinfo",
    +                return_value=[(0, 0, 0, "", ("127.0.0.1", 443))],
    +            ),
    +            self.assertRaises(RepositoryError) as error,
    +        ):
    +            self.repo.reset()
    +
    +        self.assert_no_popen_sequence(mock_popen, "update", "--clean", "remote(.)")
    +        self.assert_no_popen_sequence(mock_popen, "strip", "roots(outgoing())")
    +        self.assertIn("internal or non-public address", str(error.exception))
    +
     
     class VCSLocalTest(VCSGitTest):
         """Local repository testing."""
    
e4b67a76d95d

fix(project-backup): validate repository before restore

https://github.com/WeblateOrg/weblateMichal ČihařApr 17, 2026via ghsa
3 files changed · +100 1
  • docs/changes.rst+1 0 modified
    @@ -25,6 +25,7 @@ Weblate 5.17.1
     * Client-side popup notifications triggered by JavaScript now use Bootstrap toasts.
     * Borg backups that finish with warnings are no longer shown as failed in the management UI, and backup logs now show ``C`` entries for files that changed during the backup.
     * Git exporter no longer rejects shared-history fetches just because the first negotiated ``have`` revisions are newer than Weblate's local history.
    +* Project backup import now revalidates component repository URLs before restore.
     
     .. rubric:: Compatibility
     
    
  • weblate/trans/backups.py+17 1 modified
    @@ -52,7 +52,11 @@
     )
     from weblate.utils.data import data_path
     from weblate.utils.hash import checksum_to_hash, hash_to_checksum
    -from weblate.utils.validators import validate_bitmap, validate_filename
    +from weblate.utils.validators import (
    +    validate_bitmap,
    +    validate_filename,
    +    validate_repo_url,
    +)
     from weblate.utils.version import VERSION
     from weblate.vcs.models import VCS_REGISTRY
     
    @@ -622,6 +626,7 @@ def load_component(
             with zipfile.open(filename) as handle:
                 data = json.load(handle)
                 validate_schema(data, "weblate-component.schema.json")
    +            self.validate_component_urls(data["component"])
                 if skip_linked and data["component"]["repo"].startswith("weblate:"):
                     return False
                 if data["component"]["vcs"] not in VCS_REGISTRY:
    @@ -648,6 +653,17 @@ def load_component(
                     self.restore_component(zipfile, data, actor, changes)
                 return True
     
    +    @staticmethod
    +    def validate_component_urls(component: dict[str, Any]) -> None:
    +        for field in ("repo", "push"):
    +            value = component.get(field)
    +            if not value:
    +                continue
    +            try:
    +                validate_repo_url(value)
    +            except ValidationError as error:
    +                raise ValidationError({field: error.messages}) from error
    +
         @overload
         def load_components(
             self,
    
  • weblate/trans/tests/test_backups.py+82 0 modified
    @@ -52,6 +52,34 @@
     class BackupsTest(ViewTestCase):
         CREATE_GLOSSARIES: bool = True
     
    +    def write_tampered_component_backup(
    +        self, *, repo: str | None = None, push: str | None = None
    +    ) -> str:
    +        backup = ProjectBackup()
    +        backup.backup_project(self.project)
    +
    +        with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as temp_handle:
    +            temp_name = temp_handle.name
    +
    +        with (
    +            ZipFile(backup.filename, "r") as source_zip,
    +            ZipFile(temp_name, "w") as target_zip,
    +        ):
    +            for item in source_zip.infolist():
    +                data = source_zip.read(item.filename)
    +                if item.filename.endswith(
    +                    f"{self.component.slug}.json"
    +                ) and item.filename.startswith("components/"):
    +                    component_data = json.loads(data.decode("utf-8"))
    +                    if repo is not None:
    +                        component_data["component"]["repo"] = repo
    +                    if push is not None:
    +                        component_data["component"]["push"] = push
    +                    data = json.dumps(component_data).encode("utf-8")
    +                target_zip.writestr(item, data)
    +
    +        return temp_name
    +
         def test_backup_creates_history_entry(self) -> None:
             backup = ProjectBackup()
     
    @@ -355,6 +383,60 @@ def test_restore_synthesizes_source_translation_check_flags(self) -> None:
     
             self.assertEqual(restored_source.check_flags, "read-only")
     
    +    def test_restore_rejects_invalid_repo_url(self) -> None:
    +        temp_name = self.write_tampered_component_backup(
    +            repo="https://private.example/repo.git"
    +        )
    +
    +        try:
    +            restore = ProjectBackup(temp_name)
    +            with (
    +                patch(
    +                    "weblate.utils.outbound.socket.getaddrinfo",
    +                    return_value=[(0, 0, 0, "", ("127.0.0.1", 443))],
    +                ),
    +                self.assertRaises(ValidationError) as error,
    +            ):
    +                restore.validate()
    +
    +            self.assertEqual(
    +                error.exception.message_dict,
    +                {
    +                    "repo": [
    +                        "This URL is prohibited because it points to an internal or non-public address."
    +                    ]
    +                },
    +            )
    +        finally:
    +            os.unlink(temp_name)
    +
    +    def test_restore_rejects_invalid_push_url(self) -> None:
    +        temp_name = self.write_tampered_component_backup(
    +            push="https://private.example/push.git"
    +        )
    +
    +        try:
    +            restore = ProjectBackup(temp_name)
    +            with (
    +                patch(
    +                    "weblate.utils.outbound.socket.getaddrinfo",
    +                    return_value=[(0, 0, 0, "", ("127.0.0.1", 443))],
    +                ),
    +                self.assertRaises(ValidationError) as error,
    +            ):
    +                restore.validate()
    +
    +            self.assertEqual(
    +                error.exception.message_dict,
    +                {
    +                    "push": [
    +                        "This URL is prohibited because it points to an internal or non-public address."
    +                    ]
    +                },
    +            )
    +        finally:
    +            os.unlink(temp_name)
    +
         def test_create_duplicate(self) -> None:
             def extract_names(qs) -> list[str]:
                 return list(qs.order_by("name").values_list("name", flat=True))
    

Vulnerability mechanics

Generated by null/stub on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.

References

8

News mentions

0

No linked articles in our index yet.