CVE-2026-41654
Description
Weblate is a web based localization tool. Prior to version 5.17.1, an authenticated user with project.add permission (default on hosted Weblate SaaS and for any user holding an active billing/trial plan) can import a crafted project backup ZIP whose components/<name>.json contains an attacker-chosen repo URL pointing at a private address (e.g. http://127.0.0.1:9999/) or using a non-allow-listed scheme (e.g. file://, git://). Weblate persists the component via Component.objects.bulk_create([component])[0], which bypasses Django's full_clean() and therefore never runs the validate_repo_url validator. The URL is subsequently written verbatim into .git/config by configure_repo(pull=False). This issue has been patched in version 5.17.1.
Affected packages
Versions sourced from the GitHub Security Advisory.
| Package | Affected versions | Patched versions |
|---|---|---|
weblatePyPI | < 5.17.1 | 5.17.1 |
Patches
2e1eff1f517c1fix(vcs): annotate remote operation for validation
13 files changed · +426 −140
weblate/addons/git.py+19 −14 modified@@ -44,7 +44,7 @@ def squash_repo( author: str | None = None, ) -> None: message = self.get_squash_commit_message(repository, "%B", remote) - repository.execute(["reset", "--mixed", remote]) + repository.execute(["reset", "--mixed", remote], remote_op="none") # Can happen for added and removed translation component.commit_files( author=author, message=message, signals=False, skip_push=True @@ -78,7 +78,7 @@ def get_git_commit_messages( if filenames: command += ["--", *filenames] - return repository.execute(command) + return repository.execute(command, remote_op="none") def get_squash_commit_message( self, @@ -100,7 +100,7 @@ def get_squash_commit_message( trailer_lines = set() change_id_line = None - for trailer in repository.execute(command).split("\n"): + for trailer in repository.execute(command, remote_op="none").split("\n"): # Skip blank lines if not trailer.strip(): continue @@ -158,7 +158,7 @@ def squash_language(self, component: Component, repository: GitRepository) -> No repository, "%B", remote, filenames ) - repository.execute(["reset", "--mixed", remote]) + repository.execute(["reset", "--mixed", remote], remote_op="none") for code, message in messages.items(): if not message: @@ -178,7 +178,7 @@ def squash_file(self, component: Component, repository: GitRepository) -> None: repository, "%B", remote, [filename] ) - repository.execute(["reset", "--mixed", remote]) + repository.execute(["reset", "--mixed", remote], remote_op="none") for filename, message in messages.items(): if not message: @@ -194,7 +194,8 @@ def squash_author(self, component: Component, repository: GitRepository) -> None x.split(None, 1) for x in reversed( repository.execute( - ["log", "--no-merges", "--format=%H %aE", f"{remote}..HEAD"] + ["log", "--no-merges", "--format=%H %aE", f"{remote}..HEAD"], + remote_op="none", ).splitlines() ) ] @@ -204,9 +205,9 @@ def squash_author(self, component: Component, repository: GitRepository) -> None repository.delete_branch(tmp) try: # Create local branch for upstream - repository.execute(["branch", tmp, remote]) + repository.execute(["branch", tmp, remote], remote_op="none") # Checkout upstream branch - repository.execute(["checkout", tmp]) + repository.execute(["checkout", tmp], remote_op="none") while commits: commit, author = commits.pop(0) # Remember current revision for final squash @@ -216,11 +217,12 @@ def squash_author(self, component: Component, repository: GitRepository) -> None try: repository.execute( ["cherry-pick", "--empty=drop", commit, *gpg_sign], + remote_op="none", environment={"WEBLATE_MERGE_SKIP": "1"}, ) except RepositoryError: if repository.has_git_file("CHERRY_PICK_HEAD"): - repository.execute(["cherry-pick", "--abort"]) + repository.execute(["cherry-pick", "--abort"], remote_op="none") raise handled = [] # Pick other commits by same author @@ -230,14 +232,17 @@ def squash_author(self, component: Component, repository: GitRepository) -> None try: repository.execute( ["cherry-pick", "--empty=drop", other[0], *gpg_sign], + remote_op="none", environment={"WEBLATE_MERGE_SKIP": "1"}, ) handled.append(i) except RepositoryError: # If fails, continue to another author, we will # pick this commit later (it depends on some other) if repository.has_git_file("CHERRY_PICK_HEAD"): - repository.execute(["cherry-pick", "--abort"]) + repository.execute( + ["cherry-pick", "--abort"], remote_op="none" + ) break # Remove processed commits from list for i in reversed(handled): @@ -246,15 +251,15 @@ def squash_author(self, component: Component, repository: GitRepository) -> None self.squash_repo(component, repository, base, author) # Update working copy with squashed commits - repository.execute(["checkout", repository.branch]) - repository.execute(["reset", "--hard", tmp]) + repository.execute(["checkout", repository.branch], remote_op="none") + repository.execute(["reset", "--hard", tmp], remote_op="none") repository.delete_branch(tmp) except Exception: report_error("Failed squash", project=component.project) # Revert to original branch without any changes - repository.execute(["reset", "--hard"]) - repository.execute(["checkout", repository.branch]) + repository.execute(["reset", "--hard"], remote_op="none") + repository.execute(["checkout", repository.branch], remote_op="none") repository.delete_branch(tmp) def post_commit(
weblate/addons/tests.py+8 −2 modified@@ -3373,7 +3373,10 @@ def test_cleanup(self) -> None: addon = CleanupAddon.create(component=self.component) # Unshallow the local repo with self.component.repository.lock: - self.component.repository.execute(["fetch", "--unshallow", "origin"]) + self.component.repository.execute( + ["fetch", "--unshallow", "origin"], + remote_op="pull", + ) addon.post_update( self.component, "da07dc0dc7052dc44eadfa8f3a2f2609ec634303", False ) @@ -3387,7 +3390,10 @@ def test_update(self) -> None: rev = self.component.repository.last_revision # Unshallow the local repo with self.component.repository.lock: - self.component.repository.execute(["fetch", "--unshallow", "origin"]) + self.component.repository.execute( + ["fetch", "--unshallow", "origin"], + remote_op="pull", + ) addon.post_update( self.component, "da07dc0dc7052dc44eadfa8f3a2f2609ec634303", False )
weblate/api/tests.py+3 −1 modified@@ -1953,7 +1953,9 @@ def __init__(self) -> None: def has_branch(self, _branch: str) -> bool: return False - def execute(self, args: list[str]) -> None: + def execute(self, args: list[str], *, remote_op: str) -> None: + if remote_op != "none": + raise AssertionError(remote_op) calls.append(args) def clean_revision_cache(self) -> None:
weblate/gitexport/tests.py+5 −0 modified@@ -306,6 +306,7 @@ def test_ignore_missing_requested_have(self) -> None: self.mark_component_shallow() present_revision = self.component.repository.execute( ["rev-parse", "HEAD"], + remote_op="none", needs_lock=False, ).strip() body = ( @@ -320,6 +321,7 @@ def test_ignore_missing_requested_have_with_present_want(self) -> None: self.mark_component_shallow() present_revision = self.component.repository.execute( ["rev-parse", "HEAD"], + remote_op="none", needs_lock=False, ).strip() body = ( @@ -333,6 +335,7 @@ def test_ignore_missing_requested_have_with_present_want(self) -> None: def test_stop_parsing_after_initial_want_block(self) -> None: present_revision = self.component.repository.execute( ["rev-parse", "HEAD"], + remote_op="none", needs_lock=False, ).strip() body = ( @@ -410,6 +413,7 @@ def test_missing_have_does_not_short_circuit_upload_pack(self) -> None: self.mark_component_shallow() present_revision = self.component.repository.execute( ["rev-parse", "HEAD"], + remote_op="none", needs_lock=False, ).strip() body = ( @@ -533,6 +537,7 @@ def create_export_commit(self) -> str: self.component.repository.commit("Test export change", files=[filename]) return self.component.repository.execute( ["rev-parse", "HEAD"], + remote_op="none", needs_lock=False, ).strip()
weblate/gitexport/views.py+1 −0 modified@@ -229,6 +229,7 @@ def get_precheck_failure_reason(component: Component, body: bytes) -> str | None try: output = execute( ["cat-file", "--batch-check"], + remote_op="none", needs_lock=False, stdin="".join(f"{revision}\n" for revision in wanted_revisions), )
weblate/trans/component_copy.py+2 −2 modified@@ -122,9 +122,9 @@ def normalize_local_copy_branch(component: Component) -> None: with repository.lock: if repository.has_branch(component.branch): - repository.execute(["checkout", component.branch]) + repository.execute(["checkout", component.branch], remote_op="none") else: - repository.execute(["checkout", "-B", component.branch]) + repository.execute(["checkout", "-B", component.branch], remote_op="none") repository.branch = component.branch repository.clean_revision_cache()
weblate/trans/tests/test_component.py+2 −1 modified@@ -1232,7 +1232,8 @@ def restore_local_missing_translation_files( self.local_missing_translation_contents[translation.pk] ) translation.component.repository.execute( - ["add", "--force", "--", translation.filename] + ["add", "--force", "--", translation.filename], + remote_op="none", ) def test_reset_keep_recreates_missing_translation_file(self) -> None:
weblate/trans/tests/test_git_views.py+4 −1 modified@@ -175,7 +175,10 @@ def setUp(self) -> None: super().setUp() repo = self.component.repository with repo.lock: - repo.execute(["branch", "--delete", "--remotes", "origin/main"]) + repo.execute( + ["branch", "--delete", "--remotes", "origin/main"], + remote_op="none", + ) class GitBrokenComponentTest(GitBrokenProjectTest):
weblate/trans/tests/test_remote.py+3 −2 modified@@ -595,9 +595,10 @@ def test_pending_changes_preserved_on_non_translation_update(self) -> None: "# Test Project\n\nThis is a test README.\n", encoding="utf-8" ) with self.component.repository.lock: - self.component.repository.execute(["add", "README.md"]) + self.component.repository.execute(["add", "README.md"], remote_op="none") self.component.repository.execute( - ["commit", "-m", "Add README.md (non-translation file)"] + ["commit", "-m", "Add README.md (non-translation file)"], + remote_op="none", ) self.component.repository.push(self.component.push_branch)
weblate/vcs/base.py+39 −12 modified@@ -13,7 +13,7 @@ import subprocess # noqa: S404 from contextlib import suppress from pathlib import Path -from typing import TYPE_CHECKING, ClassVar, Self, TypedDict +from typing import TYPE_CHECKING, ClassVar, Literal, Self, TypedDict from dateutil import parser from django.core.cache import cache @@ -61,6 +61,9 @@ class SubprocessArgs(TypedDict, total=False): input: str +type RemoteOperation = Literal["none", "pull", "push"] + + class RepositoryLock: def __init__(self, repository: Repository, lock: WeblateLock) -> None: self.repository = repository @@ -129,6 +132,14 @@ def __str__(self) -> str: return self.get_message() +class RepositoryCommandError(RepositoryError): + """Error raised by the underlying VCS command.""" + + +class RepositoryValidationError(RepositoryError): + """Error raised when repository configuration violates runtime policy.""" + + class RepositorySymlinkError(ValueError): """Raised when symlink resolution fails due to links outside the repository tree or excessive symlink depth.""" @@ -395,7 +406,7 @@ def _popen( if isinstance(error.stderr, bytes) else error.stderr ) - raise RepositoryError( + raise RepositoryCommandError( 0, f"Subprocess didn't complete before {error.timeout} seconds\n{stdout}{stderr or ''}", ) from error @@ -423,7 +434,7 @@ def _popen( retry=False, ) - raise RepositoryError(process.returncode, errormessage) + raise RepositoryCommandError(process.returncode, errormessage) return process.stdout @staticmethod @@ -451,6 +462,7 @@ def execute( self, args: list[str], *, + remote_op: RemoteOperation, needs_lock: bool = True, fullcmd: bool = False, merge_err: bool = True, @@ -465,6 +477,10 @@ def execute( raise RuntimeError(msg) if self.component: self.ensure_config_updated() + if remote_op == "pull": + self.validate_pull_url() + elif remote_op == "push": + self.validate_push_url() is_status = args[0] == self._cmd_status[0] try: self.last_output = self._popen( @@ -476,14 +492,14 @@ def execute( stdin=stdin, environment=environment, ) - except RepositoryError as error: + except RepositoryCommandError as error: if not is_status and not self.local: self.log_status(error) raise return self.last_output def log_status(self, error: str | RepositoryError) -> None: - with suppress(RepositoryError): + with suppress(RepositoryCommandError): self.log(f"failure {error}") self.log(self.status()) @@ -499,13 +515,21 @@ def last_revision(self): return self.get_last_revision() def get_last_revision(self): - return self.execute(self._cmd_last_revision, needs_lock=False, merge_err=False) + return self.execute( + self._cmd_last_revision, + remote_op="none", + needs_lock=False, + merge_err=False, + ) @cached_property def last_remote_revision(self): """Return last remote revision.""" return self.execute( - self._cmd_last_remote_revision, needs_lock=False, merge_err=False + self._cmd_last_remote_revision, + remote_op="none", + needs_lock=False, + merge_err=False, ) @classmethod @@ -523,7 +547,7 @@ def validate_remote_url(url: str) -> None: try: validate_repo_url(url) except ValidationError as error: - raise RepositoryError(0, "; ".join(error.messages)) from error + raise RepositoryValidationError(0, "; ".join(error.messages)) from error def validate_pull_url(self, url: str | None = None) -> None: """Validate the pull URL in the current runtime context.""" @@ -560,7 +584,7 @@ def update_remote(self) -> None: def status(self) -> str: """Return status of the repository.""" - return self.execute(self._cmd_status, needs_lock=False) + return self.execute(self._cmd_status, remote_op="none", needs_lock=False) def push(self, branch: str) -> None: """Push given branch to remote repository.""" @@ -820,9 +844,9 @@ def cleanup_files(self) -> None: def cleanup(self) -> None: """Cleanup repository status.""" # Recover from failed merge/rebase - with suppress(RepositoryError): + with suppress(RepositoryCommandError): self.merge(abort=True) - with suppress(RepositoryError): + with suppress(RepositoryCommandError): self.rebase(abort=True) # Remove stale branches self.remove_stale_branches() @@ -844,7 +868,10 @@ def list_changed_files(self, refspec: str) -> list: This is not universal as refspec is different per vcs. """ lines = self.execute( - [*self._cmd_list_changed_files, refspec], needs_lock=False, merge_err=False + [*self._cmd_list_changed_files, refspec], + remote_op="none", + needs_lock=False, + merge_err=False, ).splitlines() return list(self.parse_changed_files(lines))
weblate/vcs/git.py+130 −67 modified@@ -46,7 +46,7 @@ from weblate.utils.lock import WeblateLock, WeblateLockTimeoutError from weblate.utils.render import render_template from weblate.utils.xml import parse_xml -from weblate.vcs.base import Repository, RepositoryError +from weblate.vcs.base import Repository, RepositoryCommandError, RepositoryError from weblate.vcs.gpg import get_gpg_sign_key if TYPE_CHECKING: @@ -183,6 +183,7 @@ def validate_branch_name(cls, branch: str) -> str: def get_remote_branch(cls, repo: str): if not repo: return super().get_remote_branch(repo) + cls.validate_remote_url(repo) try: result = cls._popen(["ls-remote", "--symref", "--", repo, "HEAD"]) except RepositoryError: @@ -251,23 +252,26 @@ def recover_lock_session(self) -> None: current_branch = self.get_current_branch() if current_branch not in TEMPORARY_BRANCHES: return - self.execute(["reset", "--hard"]) + self.execute(["reset", "--hard"], remote_op="none") self.checkout_with_temp_cleanup(self.branch) - with suppress(RepositoryError): - self.execute(["branch", "-D", current_branch]) + with suppress(RepositoryCommandError): + self.execute(["branch", "-D", current_branch], remote_op="none") self.clean_revision_cache() def get_current_branch(self) -> str: return self.execute( - ["branch", "--show-current"], needs_lock=False, merge_err=False + ["branch", "--show-current"], + remote_op="none", + needs_lock=False, + merge_err=False, ).strip() def checkout_with_temp_cleanup(self, branch: str) -> None: current_branch = self.get_current_branch() - self.execute(["checkout", branch]) + self.execute(["checkout", branch], remote_op="none") if current_branch in TEMPORARY_BRANCHES and current_branch != branch: - with suppress(RepositoryError): - self.execute(["branch", "-D", current_branch]) + with suppress(RepositoryCommandError): + self.execute(["branch", "-D", current_branch], remote_op="none") self.clean_revision_cache() @staticmethod @@ -309,29 +313,37 @@ def _clone(cls, source: str, target: str, branch: str) -> None: def get_config(self, path): """Read entry from configuration.""" - return self.execute(["config", path], needs_lock=False, merge_err=False).strip() + return self.execute( + ["config", path], + remote_op="none", + needs_lock=False, + merge_err=False, + ).strip() def set_committer(self, name, mail) -> None: """Configure committer name.""" self.config_update(("user", "name", name), ("user", "email", mail)) def reset(self) -> None: """Reset working copy to match remote branch.""" - self.execute(["reset", "--hard", self.get_remote_branch_name()]) + self.execute( + ["reset", "--hard", self.get_remote_branch_name()], + remote_op="none", + ) self.clean_revision_cache() def rebase(self, abort=False) -> None: """Rebase working copy on top of remote branch.""" if abort: if self.has_git_file("rebase-apply") or self.has_git_file("rebase-merge"): - self.execute(["rebase", "--abort"]) + self.execute(["rebase", "--abort"], remote_op="none") if self.needs_commit(): - self.execute(["reset", "--hard"]) + self.execute(["reset", "--hard"], remote_op="none") else: cmd = ["rebase"] cmd.extend(self.get_gpg_sign_args()) cmd.append(self.get_remote_branch_name()) - self.execute(cmd, environment={"WEBLATE_MERGE_SKIP": "1"}) + self.execute(cmd, remote_op="none", environment={"WEBLATE_MERGE_SKIP": "1"}) self.clean_revision_cache() def has_git_file(self, name): @@ -341,6 +353,7 @@ def has_rev(self, rev) -> bool: try: self.execute( ["rev-parse", "--verify", "--end-of-options", rev], + remote_op="none", needs_lock=False, ) except RepositoryError: @@ -356,11 +369,11 @@ def merge( if abort: # Abort merge if there is one to abort if self.has_rev("MERGE_HEAD"): - self.execute(["merge", "--abort"]) + self.execute(["merge", "--abort"], remote_op="none") if self.needs_commit(): - self.execute(["reset", "--hard"]) + self.execute(["reset", "--hard"], remote_op="none") # Checkout original branch (we might be on tmp) - self.execute(["checkout", current_branch]) + self.execute(["checkout", current_branch], remote_op="none") else: self.delete_branch(tmp) # We don't do simple git merge origin/branch as that leads @@ -369,9 +382,9 @@ def merge( # changes merged into Weblate) remote = self.get_remote_branch_name() # Create local branch for upstream - self.execute(["branch", tmp, remote]) + self.execute(["branch", tmp, remote], remote_op="none") # Checkout upstream branch - self.execute(["checkout", tmp]) + self.execute(["checkout", tmp], remote_op="none") # Merge current Weblate changes, this can lead to conflict cmd = [ "merge", @@ -382,20 +395,20 @@ def merge( cmd.append("--no-ff") cmd.extend(self.get_gpg_sign_args()) cmd.append(current_branch) - self.execute(cmd) + self.execute(cmd, remote_op="none") # Checkout branch with Weblate changes - self.execute(["checkout", current_branch]) + self.execute(["checkout", current_branch], remote_op="none") # Merge temporary branch (this is fast forward so does not create # merge commit) - self.execute(["merge", tmp]) + self.execute(["merge", tmp], remote_op="none") # Delete temporary branch self.delete_branch(tmp) self.clean_revision_cache() def delete_branch(self, name) -> None: if self.has_branch(name): - self.execute(["branch", "-D", name]) + self.execute(["branch", "-D", name], remote_op="none") def needs_commit(self, filenames: list[str] | None = None) -> bool: """Check whether repository needs commit.""" @@ -404,7 +417,7 @@ def needs_commit(self, filenames: list[str] | None = None) -> bool: cmd.extend(["--untracked-files=all", "--ignored=traditional", "--"]) cmd.extend(filenames) with self.lock: - status = self.execute(cmd, merge_err=False) + status = self.execute(cmd, remote_op="none", merge_err=False) return bool(status) def show(self, revision: str) -> str: @@ -413,7 +426,12 @@ def show(self, revision: str) -> str: Used in tests. """ - return self.execute(["show", revision], needs_lock=False, merge_err=False) + return self.execute( + ["show", revision], + remote_op="none", + needs_lock=False, + merge_err=False, + ) @staticmethod def get_gpg_sign_args(): @@ -426,6 +444,7 @@ def _get_revision_info(self, revision): """Return dictionary with detailed revision info.""" text = self.execute( ["log", "-1", "--format=fuller", "--date=rfc", "--abbrev-commit", revision], + remote_op="none", needs_lock=False, merge_err=False, ) @@ -463,6 +482,7 @@ def log_revisions(self, refspec): """Return revision log for given refspec.""" return self.execute( ["log", "--format=format:%H", refspec, "--"], + remote_op="none", needs_lock=False, merge_err=False, ).splitlines() @@ -486,11 +506,14 @@ def commit( for name in files: try: # Resolving symlinks is needed for symlinks in directory structure - self.execute(["add", "--force", "--", self.resolve_symlinks(name)]) + self.execute( + ["add", "--force", "--", self.resolve_symlinks(name)], + remote_op="none", + ) except RepositoryError: continue else: - self.execute(["add", self.path]) + self.execute(["add", self.path], remote_op="none") # Bail out if there is nothing to commit. # This can easily happen with squashing and reverting changes. @@ -506,7 +529,7 @@ def commit( cmd.extend(self.get_gpg_sign_args()) # Execute it - self.execute(cmd, stdin=message) + self.execute(cmd, remote_op="none", stdin=message) # Clean cache self.clean_revision_cache() @@ -520,7 +543,7 @@ def remove( extra_commit_files: list[str] | None = None, ) -> None: """Remove files and creates new revision.""" - self.execute(["rm", "--force", "--", *files]) + self.execute(["rm", "--force", "--", *files], remote_op="none") self.commit(message, author, files=files + (extra_commit_files or [])) def get_remote_configure( @@ -570,7 +593,12 @@ def list_branches(self, *args: str) -> list[str]: # (we get additional * there indicating current branch) return [ x.lstrip("*").strip() - for x in self.execute(cmd, needs_lock=False, merge_err=False).splitlines() + for x in self.execute( + cmd, + remote_op="none", + needs_lock=False, + merge_err=False, + ).splitlines() ] def has_branch(self, branch): @@ -582,7 +610,10 @@ def configure_branch(self, branch) -> None: branch = self.validate_branch_name(branch) # Add branch if not self.has_branch(branch): - self.execute(["checkout", "-b", branch, f"origin/{branch}"]) + self.execute( + ["checkout", "-b", branch, f"origin/{branch}"], + remote_op="none", + ) else: # Ensure it tracks correct upstream self.config_update((f'branch "{branch}"', "remote", "origin")) @@ -594,7 +625,10 @@ def configure_branch(self, branch) -> None: def describe(self) -> str: """Verbosely describes current revision.""" return self.execute( - ["describe", "--always"], needs_lock=False, merge_err=False + ["describe", "--always"], + remote_op="none", + needs_lock=False, + merge_err=False, ).strip() @classmethod @@ -655,32 +689,42 @@ def get_file(self, path, revision) -> str: """Return content of file at given revision.""" return self.execute( ["show", f"{revision}:{path}"], + remote_op="none", needs_lock=False, merge_err=False, ) def remove_stale_branches(self) -> None: """Remove stale branches and tags from the repository.""" # Prune remote branches, this can fail if repository is unreachable - with suppress(RepositoryError): - self.execute([*self.get_auth_args(), "remote", "prune", "origin"]) + with suppress(RepositoryCommandError): + self.execute( + [*self.get_auth_args(), "remote", "prune", "origin"], + remote_op="pull", + ) # Remove possible stale branches for branch in self.list_branches(): if branch != self.branch: - self.execute(["branch", "--delete", "--force", branch]) + self.execute( + ["branch", "--delete", "--force", branch], remote_op="none" + ) # Remove any tags - for tag in self.execute(["tag", "--list"], merge_err=False).splitlines(): - self.execute(["tag", "--delete", tag]) + for tag in self.execute( + ["tag", "--list"], + remote_op="none", + merge_err=False, + ).splitlines(): + self.execute(["tag", "--delete", tag], remote_op="none") def cleanup_files(self) -> None: """Remove not tracked files from the repository.""" - self.execute(["clean", "-f", "-d"]) + self.execute(["clean", "-f", "-d"], remote_op="none") def list_remote_branches(self) -> list[str]: """Return a list of remote branch names by querying the remote repository using 'git ls-remote --heads origin'.""" - self.validate_pull_url() branches = self.execute( [*self.get_auth_args(), "ls-remote", "--heads", "origin"], + remote_op="pull", needs_lock=False, merge_err=False, ) @@ -693,8 +737,10 @@ def update_remote(self) -> None: """Update remote repository.""" branch = self.validate_branch_name(self.branch) # Update existing branch only, not changing depth - self.validate_pull_url() - self.execute([*self.get_auth_args(), "fetch", "origin", branch]) + self.execute( + [*self.get_auth_args(), "fetch", "origin", branch], + remote_op="pull", + ) self.clean_revision_cache() def push(self, branch: str) -> None: @@ -705,12 +751,10 @@ def push(self, branch: str) -> None: if branch else current_branch ) - self.validate_push_url() - self.execute([*self._cmd_push, "origin", refspec]) + self.execute([*self._cmd_push, "origin", refspec], remote_op="push") def unshallow(self) -> None: - self.validate_pull_url() - self.execute([*self.get_auth_args(), "fetch", "--unshallow"]) + self.execute([*self.get_auth_args(), "fetch", "--unshallow"], remote_op="pull") def parse_changed_files(self, lines: list[str]) -> Iterator[str]: """Parse output with changed files.""" @@ -720,18 +764,22 @@ def parse_changed_files(self, lines: list[str]) -> Iterator[str]: def status(self) -> str: result = [super().status()] - cleanups = self.execute(["clean", "-f", "-d", "-n"], needs_lock=False) + cleanups = self.execute( + ["clean", "-f", "-d", "-n"], + remote_op="none", + needs_lock=False, + ) if cleanups: result.extend(("", gettext("Possible cleanups:"), "", cleanups)) return "\n".join(result) def compact(self) -> None: - self.execute(["gc"]) + self.execute(["gc"], remote_op="none") def maintenance(self) -> None: # Expire old reflog entries (using Git defaults) - self.execute(["reflog", "expire"]) + self.execute(["reflog", "expire"], remote_op="none") # Super will invoke remove_stale_branches() and compact() super().maintenance() @@ -763,11 +811,11 @@ def get_username_from_url(self, url) -> str: return "" def push(self, branch) -> None: - self.validate_push_url() if self.needs_push(): try: self.execute( - ["review", "--yes", self.validate_branch_name(self.branch)] + ["review", "--yes", self.validate_branch_name(self.branch)], + remote_op="push", ) except RepositoryError as error: if "(no new changes)" in str(error): @@ -886,16 +934,15 @@ def configure_remote( raise RepositoryError(-1, "Can not switch subversion URL") return args, self._fetch_revision = self.get_remote_args(pull_url, self.path) - self.execute(["svn", "init", *args]) + self.execute(["svn", "init", *args], remote_op="none") def update_remote(self) -> None: """Update remote repository.""" - self.validate_pull_url() if self._fetch_revision: - self.execute(["svn", "fetch", self._fetch_revision]) + self.execute(["svn", "fetch", self._fetch_revision], remote_op="pull") self._fetch_revision = None else: - self.execute(["svn", "fetch", "--parent"]) + self.execute(["svn", "fetch", "--parent"], remote_op="pull") self.clean_revision_cache() @classmethod @@ -929,17 +976,17 @@ def rebase(self, abort=False) -> None: Git-svn does not support merge. """ if abort: - self.execute(["rebase", "--abort"]) + self.execute(["rebase", "--abort"], remote_op="none") else: - self.validate_pull_url() - self.execute(["svn", "rebase"]) + self.execute(["svn", "rebase"], remote_op="pull") self.clean_revision_cache() @cached_property def last_remote_revision(self) -> str: """Return last remote revision.""" return self.execute( ["log", "-n", "1", "--format=format:%H", self.get_remote_branch_name()], + remote_op="none", needs_lock=False, merge_err=False, ) @@ -968,8 +1015,10 @@ def list_remote_branches(self) -> list[str]: def push(self, branch: str) -> None: """Push given branch to remote repository.""" - self.validate_pull_url() - self.execute(["svn", "dcommit", self.validate_branch_name(self.branch)]) + self.execute( + ["svn", "dcommit", self.validate_branch_name(self.branch)], + remote_op="pull", + ) class GitForcePushRepository(GitRepository): @@ -1050,16 +1099,16 @@ def merge( # as we're expecting there will be an additional merge # commit created from the merge request. if abort: - self.execute(["merge", "--abort"]) + self.execute(["merge", "--abort"], remote_op="none") # Needed for compatibility with original merge code - self.execute(["checkout", current_branch]) + self.execute(["checkout", current_branch], remote_op="none") else: cmd = ["merge"] if no_ff: cmd.append("--no-ff") cmd.extend(self.get_gpg_sign_args()) cmd.append(self.get_remote_branch_name()) - self.execute(cmd) + self.execute(cmd, remote_op="none") self.clean_revision_cache() def parse_repo_url( @@ -1206,7 +1255,8 @@ def push_to_fork( "--force", credentials["username"], f"{local_branch}:{fork_branch}", - ] + ], + remote_op="push", ) def configure_fork_remote( @@ -1242,7 +1292,7 @@ def get_remote_branch_name(self, branch: str | None = None) -> str: def fork(self, credentials: GitCredentials) -> None: """Create fork of original repository if one doesn't exist yet.""" - remotes = self.execute(["remote"]).splitlines() + remotes = self.execute(["remote"], remote_op="none").splitlines() if credentials["username"] not in remotes: self.create_fork(credentials) @@ -1532,7 +1582,7 @@ def raise_for_response(cls, response: requests.Response) -> None: raise RepositoryError(0, "Invalid token") def fork(self, credentials: GitCredentials) -> None: - remotes = self.execute(["remote"]).splitlines() + remotes = self.execute(["remote"], remote_op="none").splitlines() if credentials["username"] not in remotes: self.create_fork(credentials) return @@ -1703,7 +1753,12 @@ def __get_forked_id(self, credentials: GitCredentials, remote: str) -> str: where the name is enough). """ cmd = ["remote", "get-url", "--push", remote] - fork_remotes = self.execute(cmd, needs_lock=False, merge_err=False).splitlines() + fork_remotes = self.execute( + cmd, + remote_op="none", + needs_lock=False, + merge_err=False, + ).splitlines() (_scheme, _username, _password, hostname, owner, slug) = self.parse_repo_url( fork_remotes[0] ) @@ -2016,6 +2071,9 @@ def merge( def list_remote_branches(self) -> list[str]: return [] + def remove_stale_branches(self) -> None: + return + @classmethod def get_remote_branch(cls, repo: str): # noqa: ARG003 return cls.default_branch @@ -2063,7 +2121,7 @@ def build_local_repo(cls, target: str, commit_message: str): # Populate files yield repo # Add to repository - repo.execute(["add", target]) + repo.execute(["add", target], remote_op="none") if repo.needs_commit(): repo.commit(commit_message) @@ -2116,7 +2174,12 @@ def get_forked_url(self, credentials: GitCredentials) -> str: """ target_path = credentials["url"].rsplit("/", 1)[-1] cmd = ["remote", "get-url", "--push", credentials["username"]] - fork_remotes = self.execute(cmd, needs_lock=False, merge_err=False).splitlines() + fork_remotes = self.execute( + cmd, + remote_op="none", + needs_lock=False, + merge_err=False, + ).splitlines() fork_path = self.get_fork_path(fork_remotes[0]) return credentials["url"].replace(target_path, fork_path)
weblate/vcs/mercurial.py+47 −24 modified@@ -25,6 +25,8 @@ from django_stubs_ext import StrOrPromise + from weblate.vcs.base import RemoteOperation + VERSION_RE = re.compile(r".*\(version ([^)]*)\).*") @@ -62,6 +64,7 @@ class HgRepository(Repository): default_branch: ClassVar[str] = "default" ref_to_remote: ClassVar[str] = "head() and branch(.) and not closed() - ." ref_from_remote: ClassVar[str] = "outgoing()" + remote_revset_markers: ClassVar[tuple[str, ...]] = ("remote(", "outgoing(") @staticmethod def sanitize_error_message(errormessage: str) -> str: @@ -132,9 +135,9 @@ def set_committer(self, name, mail) -> None: def reset(self) -> None: """Reset working copy to match remote branch.""" self.set_config_values(("extensions", "strip", "")) - self.execute(["update", "--clean", "remote(.)"]) + self.execute(["update", "--clean", "remote(.)"], remote_op="pull") if self.needs_push(): - self.execute(["strip", "roots(outgoing())"]) + self.execute(["strip", "roots(outgoing())"], remote_op="pull") self.clean_revision_cache() def configure_merge(self) -> None: @@ -169,21 +172,24 @@ def rebase(self, abort=False) -> None: """Rebase working copy on top of remote branch.""" self.set_config_values(("extensions", "rebase", "")) if abort: - self.execute(["rebase", "--abort"]) + self.execute(["rebase", "--abort"], remote_op="none") elif self.needs_merge(): if self.needs_ff(): - self.execute(["update", "--clean", "remote(.)"]) + self.execute(["update", "--clean", "remote(.)"], remote_op="pull") else: self.configure_merge() try: - self.execute(["rebase", "-d", "remote(.)"]) + self.execute(["rebase", "-d", "remote(.)"], remote_op="pull") except RepositoryError as error: # Mercurial 3.8 changed error code and output if ( error.retcode in {1, 255} and "nothing to rebase" in error.args[0] ): - self.execute(["update", "--clean", "remote(.)"]) + self.execute( + ["update", "--clean", "remote(.)"], + remote_op="pull", + ) self.clean_revision_cache() return raise @@ -194,30 +200,30 @@ def merge( ) -> None: """Merge remote branch or reverts the merge.""" if abort: - self.execute(["update", "--clean", "."]) + self.execute(["update", "--clean", "."], remote_op="none") elif self.needs_merge(): if self.needs_ff() and not no_ff: - self.execute(["update", "--clean", "remote(.)"]) + self.execute(["update", "--clean", "remote(.)"], remote_op="pull") else: self.configure_merge() # Fallback to merge try: - self.execute(["merge", "-r", "remote(.)"]) + self.execute(["merge", "-r", "remote(.)"], remote_op="pull") except RepositoryError as error: if error.retcode == 255: # Nothing to merge self.clean_revision_cache() return raise - self.execute(["commit", "--message", "Merge"]) + self.execute(["commit", "--message", "Merge"], remote_op="none") self.clean_revision_cache() def needs_commit(self, filenames: list[str] | None = None): """Check whether repository needs commit.""" cmd = ["status", "--"] if filenames: cmd.extend(filenames) - status = self.execute(cmd, needs_lock=False) + status = self.execute(cmd, remote_op="none", needs_lock=False) return bool(status) def _get_revision_info(self, revision): @@ -237,6 +243,7 @@ def _get_revision_info(self, revision): """ text = self.execute( ["log", "--limit", "1", "--template", template, "--rev", revision], + remote_op="none", needs_lock=False, merge_err=False, ) @@ -270,10 +277,18 @@ def log_revisions(self, refspec): """Return revisin log for given refspec.""" return self.execute( ["log", "--template", "{node}\n", "--rev", refspec], + remote_op=self.get_revset_remote_op(refspec), needs_lock=False, merge_err=False, ).splitlines() + @classmethod + def get_revset_remote_op(cls, refspec: str) -> RemoteOperation: + """Determine whether a revset can contact the configured remote.""" + if any(marker in refspec for marker in cls.remote_revset_markers): + return "pull" + return "none" + def needs_ff(self): """ Check whether repository needs a fast-forward to upstream. @@ -312,10 +327,10 @@ def commit( if files is not None: for name in files: try: - self.execute(["add", "--", name]) + self.execute(["add", "--", name], remote_op="none") except RepositoryError: try: - self.execute(["remove", "--", name]) + self.execute(["remove", "--", name], remote_op="none") except RepositoryError: continue cmd.append(name) @@ -326,7 +341,7 @@ def commit( return False # Execute it - self.execute(cmd) + self.execute(cmd, remote_op="none") # Clean cache self.clean_revision_cache() @@ -340,7 +355,7 @@ def remove( extra_commit_files: list[str] | None = None, ) -> None: """Remove files and creates new revision.""" - self.execute(["remove", "--force", "--", *files]) + self.execute(["remove", "--force", "--", *files], remote_op="none") self.commit(message, author, files=files + (extra_commit_files or [])) def configure_remote( @@ -370,12 +385,15 @@ def configure_remote( self.branch = branch def on_branch(self, branch): - return branch == self.execute(["branch"], merge_err=False).strip() + return ( + branch + == self.execute(["branch"], remote_op="none", merge_err=False).strip() + ) def configure_branch(self, branch) -> None: """Configure repository branch.""" if not self.on_branch(branch): - self.execute(["update", "--", branch]) + self.execute(["update", "--", branch], remote_op="none") self.branch = branch def describe(self): @@ -388,15 +406,15 @@ def describe(self): "--template", "{latesttag}-{latesttagdistance}-{node|short}", ], + remote_op="none", needs_lock=False, merge_err=False, ).strip() def push(self, branch) -> None: """Push given branch to remote repository.""" - self.validate_push_url() try: - self.execute(["push", f"--branch={self.branch}"]) + self.execute(["push", f"--branch={self.branch}"], remote_op="push") except RepositoryError as error: if error.retcode == 1: # No changes found @@ -406,7 +424,10 @@ def push(self, branch) -> None: def get_file(self, path, revision): """Return content of file at given revision.""" return self.execute( - ["cat", "--rev", revision, path], needs_lock=False, merge_err=False + ["cat", "--rev", revision, path], + remote_op="none", + needs_lock=False, + merge_err=False, ) def remove_stale_branches(self) -> None: @@ -416,12 +437,11 @@ def remove_stale_branches(self) -> None: def cleanup_files(self) -> None: """Remove not tracked files from the repository.""" self.set_config_values(("extensions", "purge", "")) - self.execute(["purge"]) + self.execute(["purge"], remote_op="none") def update_remote(self) -> None: """Update remote repository.""" - self.validate_pull_url() - self.execute(["pull", f"--branch={self.branch}"]) + self.execute(["pull", f"--branch={self.branch}"], remote_op="pull") self.clean_revision_cache() def parse_changed_files(self, lines: list[str]) -> Iterator[str]: @@ -450,5 +470,8 @@ def global_setup(cls) -> None: def show(self, revision: str) -> str: return self.execute( - ["diff", "--change", revision], needs_lock=False, merge_err=False + ["diff", "--change", revision], + remote_op="none", + needs_lock=False, + merge_err=False, )
weblate/vcs/tests/test_vcs.py+163 −14 modified@@ -27,8 +27,10 @@ from weblate.utils.files import REPO_TEMP_DIRNAME from weblate.utils.render import render_template from weblate.vcs.base import ( + RepositoryCommandError, RepositoryError, RepositorySymlinkError, + RepositoryValidationError, get_config_check_cache_key, is_ssh_host_key_mismatch_error, is_ssh_host_key_verification_error, @@ -385,7 +387,10 @@ def test_configure_branch_requires_lock_for_branch_creation(self) -> None: def test_configure_branch_recovers_temp_branch_on_next_lock(self) -> None: with self.repo.lock: - self.repo.execute(["checkout", "-b", "weblate-squash-tmp"]) + self.repo.execute( + ["checkout", "-b", "weblate-squash-tmp"], + remote_op="none", + ) temp_dir = self.repo.get_repo_temp_dir() if temp_dir is None: self.fail("Expected Git temp dir to exist") @@ -401,7 +406,11 @@ def test_configure_branch_recovers_temp_branch_on_next_lock(self) -> None: self.assertEqual(self.repo.get_current_branch(), "main") self.assertNotIn("weblate-squash-tmp", self.repo.list_branches()) self.assertFalse( - self.repo.execute(["status", "--short"], needs_lock=False).strip() + self.repo.execute( + ["status", "--short"], + remote_op="none", + needs_lock=False, + ).strip() ) @@ -528,6 +537,24 @@ def get_fake_component(self): pk=-1, ) + def assert_no_popen_sequence( + self, + mocked_popen, + *sequence: str, + ) -> None: + width = len(sequence) + + self.assertFalse( + any( + any( + tuple(call.args[0][index : index + width]) == sequence + for index in range(len(call.args[0]) - width + 1) + ) + for call in mocked_popen.call_args_list + ), + mocked_popen.call_args_list, + ) + def clone_repo(self, path): return self._class.clone( self.get_remote_repo_url(), @@ -613,7 +640,7 @@ def test_list_remote_branches_runtime_private_url_rejected(self) -> None: self.repo.component.repo = "https://private.example/repo.git" with ( self.repo.lock, - patch.object(self.repo, "execute") as mock_execute, + patch.object(self._class, "_popen") as mock_popen, patch( "weblate.utils.outbound.socket.getaddrinfo", return_value=[(0, 0, 0, "", ("127.0.0.1", 443))], @@ -622,16 +649,51 @@ def test_list_remote_branches_runtime_private_url_rejected(self) -> None: ): self.repo.list_remote_branches() - mock_execute.assert_not_called() + mock_popen.assert_not_called() + self.assertIn("internal or non-public address", str(error.exception)) + + def test_remove_stale_branches_runtime_private_url_rejected(self) -> None: + if self._class in {SubversionRepository, HgRepository, LocalRepository}: + self.skipTest("Covered by backend-specific behavior") + self.repo.component.repo = "https://private.example/repo.git" + with ( + self.repo.lock, + patch.object(self._class, "_popen") as mock_popen, + patch( + "weblate.utils.outbound.socket.getaddrinfo", + return_value=[(0, 0, 0, "", ("127.0.0.1", 443))], + ), + self.assertRaises(RepositoryValidationError) as error, + ): + self.repo.remove_stale_branches() + + self.assert_no_popen_sequence(mock_popen, "remote", "prune", "origin") self.assertIn("internal or non-public address", str(error.exception)) + def test_remove_stale_branches_ignores_command_errors(self) -> None: + if self._class in {SubversionRepository, HgRepository, LocalRepository}: + self.skipTest("Covered by backend-specific behavior") + + original_execute = self.repo.execute + + def mocked_execute(args: list[str], **kwargs): + if tuple(args[-3:]) == ("remote", "prune", "origin"): + raise RepositoryCommandError(128, "remote unavailable") + return original_execute(args, **kwargs) + + with ( + self.repo.lock, + patch.object(self.repo, "execute", side_effect=mocked_execute), + ): + self.repo.remove_stale_branches() + def test_update_remote_runtime_private_url_rejected(self) -> None: if self._class in {SubversionRepository, HgRepository, LocalRepository}: self.skipTest("Covered by backend-specific behavior") self.repo.component.repo = "https://private.example/repo.git" with ( self.repo.lock, - patch.object(self.repo, "execute") as mock_execute, + patch.object(self._class, "_popen") as mock_popen, patch( "weblate.utils.outbound.socket.getaddrinfo", return_value=[(0, 0, 0, "", ("127.0.0.1", 443))], @@ -640,7 +702,7 @@ def test_update_remote_runtime_private_url_rejected(self) -> None: ): self.repo.update_remote() - mock_execute.assert_not_called() + self.assert_no_popen_sequence(mock_popen, "fetch") self.assertIn("internal or non-public address", str(error.exception)) def test_push(self, branch: str = "") -> None: @@ -653,7 +715,7 @@ def test_push_runtime_private_url_rejected(self) -> None: self.repo.component.push = "https://private.example/repo.git" with ( self.repo.lock, - patch.object(self.repo, "execute") as mock_execute, + patch.object(self._class, "_popen") as mock_popen, patch( "weblate.utils.outbound.socket.getaddrinfo", return_value=[(0, 0, 0, "", ("127.0.0.1", 443))], @@ -662,7 +724,24 @@ def test_push_runtime_private_url_rejected(self) -> None: ): self.repo.push("") - mock_execute.assert_not_called() + self.assert_no_popen_sequence(mock_popen, "push") + self.assert_no_popen_sequence(mock_popen, "review") + self.assertIn("internal or non-public address", str(error.exception)) + + def test_get_remote_branch_runtime_private_url_rejected(self) -> None: + if not issubclass(self._class, GitRepository) or self._class is LocalRepository: + self.skipTest("Covered by backend-specific behavior") + with ( + patch.object(self._class, "_popen") as mock_popen, + patch( + "weblate.utils.outbound.socket.getaddrinfo", + return_value=[(0, 0, 0, "", ("127.0.0.1", 443))], + ), + self.assertRaises(RepositoryError) as error, + ): + self._class.get_remote_branch("https://private.example/repo.git") + + mock_popen.assert_not_called() self.assertIn("internal or non-public address", str(error.exception)) def test_push_commit(self) -> None: @@ -697,6 +776,7 @@ def test_has_rev_uses_end_of_options(self) -> None: self.assertTrue(self.repo.has_rev("--verify")) mocked.assert_called_once_with( ["rev-parse", "--verify", "--end-of-options", "--verify"], + remote_op="none", needs_lock=False, ) @@ -928,7 +1008,10 @@ def test_configure_remote(self) -> None: self.repo.get_config("remote.origin.pushURL"), "pushurl" ) # Test that we handle not set fetching - self.repo.execute(["config", "--unset", "remote.origin.fetch"]) + self.repo.execute( + ["config", "--unset", "remote.origin.fetch"], + remote_op="none", + ) self.repo.configure_remote("pullurl", "pushurl", "branch") self.assertEqual( self.repo.get_config("remote.origin.fetch"), @@ -1434,7 +1517,8 @@ def test_push_when_remote_fork_is_deleted(self, branch: str = "") -> None: "add", "test", "git@ssh.this.does.not.exist:v3/org/proj/repo", - ] + ], + remote_op="none", ) responses.post( @@ -2114,7 +2198,11 @@ def test_count_outgoing_after_merge(self) -> None: credentials = self.repo.get_credentials() fork_branch_name = self.repo.get_fork_branch_name() fork_ref = f"refs/remotes/{credentials['username']}/{fork_branch_name}" - self.repo.execute(["update-ref", fork_ref, "HEAD"], needs_lock=False) + self.repo.execute( + ["update-ref", fork_ref, "HEAD"], + remote_op="none", + needs_lock=False, + ) # count_outgoing should now be 0 since fork has our commits # (even though origin doesn't yet - MR is pending) @@ -2124,7 +2212,11 @@ def test_count_outgoing_after_merge(self) -> None: # In a real scenario, after a merge request is merged and git fetch is done, # origin/{branch} would contain the local commits origin_ref = f"refs/remotes/origin/{self.repo.branch}" - self.repo.execute(["update-ref", origin_ref, "HEAD"], needs_lock=False) + self.repo.execute( + ["update-ref", origin_ref, "HEAD"], + remote_op="none", + needs_lock=False, + ) # count_outgoing should still return 0 since origin has our commits self.assertEqual(self.repo.count_outgoing(), 0) @@ -2144,7 +2236,11 @@ def test_count_outgoing_non_default_branch(self) -> None: # Update origin ref for the different branch origin_ref = f"refs/remotes/origin/{different_branch}" - self.repo.execute(["update-ref", origin_ref, "HEAD"], needs_lock=False) + self.repo.execute( + ["update-ref", origin_ref, "HEAD"], + remote_op="none", + needs_lock=False, + ) # count_outgoing with different branch should return 0 # (commits are in origin for that branch, fork not checked) @@ -2157,7 +2253,11 @@ def test_needs_push_non_default_branch_ignores_stale_fork(self) -> None: credentials = self.repo.get_credentials() fork_branch_name = self.repo.get_fork_branch_name() fork_ref = f"refs/remotes/{credentials['username']}/{fork_branch_name}" - self.repo.execute(["update-ref", fork_ref, "HEAD"], needs_lock=False) + self.repo.execute( + ["update-ref", fork_ref, "HEAD"], + remote_op="none", + needs_lock=False, + ) different_branch = "develop" if self.repo.branch != "develop" else "feature" self.assertTrue(self.repo.needs_push(different_branch)) @@ -2356,6 +2456,23 @@ def verify_pull_url(self) -> None: self.format_local_path(self.subversion_repo_path), ) + def test_push_runtime_private_repo_rejected_even_with_safe_push_url(self) -> None: + self.repo.component.repo = "https://private.example/repo" + self.repo.component.push = "https://example.com/repo" + with ( + self.repo.lock, + patch.object(self._class, "_popen") as mock_popen, + patch( + "weblate.utils.outbound.socket.getaddrinfo", + return_value=[(0, 0, 0, "", ("127.0.0.1", 443))], + ), + self.assertRaises(RepositoryError) as error, + ): + self.repo.push("") + + self.assert_no_popen_sequence(mock_popen, "svn", "dcommit") + self.assertIn("internal or non-public address", str(error.exception)) + class VCSSubversionBranchTest(VCSSubversionTest): """Cloning subversion branch directly.""" @@ -2424,6 +2541,38 @@ def test_status(self) -> None: status = self.repo.status() self.assertEqual(status, "") + def test_count_outgoing_runtime_private_url_rejected(self) -> None: + self.repo.component.repo = "https://private.example/repo" + with ( + patch.object(self._class, "_popen") as mock_popen, + patch( + "weblate.utils.outbound.socket.getaddrinfo", + return_value=[(0, 0, 0, "", ("127.0.0.1", 443))], + ), + self.assertRaises(RepositoryError) as error, + ): + self.repo.count_outgoing() + + mock_popen.assert_not_called() + self.assertIn("internal or non-public address", str(error.exception)) + + def test_reset_runtime_private_url_rejected(self) -> None: + self.repo.component.repo = "https://private.example/repo" + with ( + self.repo.lock, + patch.object(self._class, "_popen") as mock_popen, + patch( + "weblate.utils.outbound.socket.getaddrinfo", + return_value=[(0, 0, 0, "", ("127.0.0.1", 443))], + ), + self.assertRaises(RepositoryError) as error, + ): + self.repo.reset() + + self.assert_no_popen_sequence(mock_popen, "update", "--clean", "remote(.)") + self.assert_no_popen_sequence(mock_popen, "strip", "roots(outgoing())") + self.assertIn("internal or non-public address", str(error.exception)) + class VCSLocalTest(VCSGitTest): """Local repository testing."""
e4b67a76d95dfix(project-backup): validate repository before restore
3 files changed · +100 −1
docs/changes.rst+1 −0 modified@@ -25,6 +25,7 @@ Weblate 5.17.1 * Client-side popup notifications triggered by JavaScript now use Bootstrap toasts. * Borg backups that finish with warnings are no longer shown as failed in the management UI, and backup logs now show ``C`` entries for files that changed during the backup. * Git exporter no longer rejects shared-history fetches just because the first negotiated ``have`` revisions are newer than Weblate's local history. +* Project backup import now revalidates component repository URLs before restore. .. rubric:: Compatibility
weblate/trans/backups.py+17 −1 modified@@ -52,7 +52,11 @@ ) from weblate.utils.data import data_path from weblate.utils.hash import checksum_to_hash, hash_to_checksum -from weblate.utils.validators import validate_bitmap, validate_filename +from weblate.utils.validators import ( + validate_bitmap, + validate_filename, + validate_repo_url, +) from weblate.utils.version import VERSION from weblate.vcs.models import VCS_REGISTRY @@ -622,6 +626,7 @@ def load_component( with zipfile.open(filename) as handle: data = json.load(handle) validate_schema(data, "weblate-component.schema.json") + self.validate_component_urls(data["component"]) if skip_linked and data["component"]["repo"].startswith("weblate:"): return False if data["component"]["vcs"] not in VCS_REGISTRY: @@ -648,6 +653,17 @@ def load_component( self.restore_component(zipfile, data, actor, changes) return True + @staticmethod + def validate_component_urls(component: dict[str, Any]) -> None: + for field in ("repo", "push"): + value = component.get(field) + if not value: + continue + try: + validate_repo_url(value) + except ValidationError as error: + raise ValidationError({field: error.messages}) from error + @overload def load_components( self,
weblate/trans/tests/test_backups.py+82 −0 modified@@ -52,6 +52,34 @@ class BackupsTest(ViewTestCase): CREATE_GLOSSARIES: bool = True + def write_tampered_component_backup( + self, *, repo: str | None = None, push: str | None = None + ) -> str: + backup = ProjectBackup() + backup.backup_project(self.project) + + with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as temp_handle: + temp_name = temp_handle.name + + with ( + ZipFile(backup.filename, "r") as source_zip, + ZipFile(temp_name, "w") as target_zip, + ): + for item in source_zip.infolist(): + data = source_zip.read(item.filename) + if item.filename.endswith( + f"{self.component.slug}.json" + ) and item.filename.startswith("components/"): + component_data = json.loads(data.decode("utf-8")) + if repo is not None: + component_data["component"]["repo"] = repo + if push is not None: + component_data["component"]["push"] = push + data = json.dumps(component_data).encode("utf-8") + target_zip.writestr(item, data) + + return temp_name + def test_backup_creates_history_entry(self) -> None: backup = ProjectBackup() @@ -355,6 +383,60 @@ def test_restore_synthesizes_source_translation_check_flags(self) -> None: self.assertEqual(restored_source.check_flags, "read-only") + def test_restore_rejects_invalid_repo_url(self) -> None: + temp_name = self.write_tampered_component_backup( + repo="https://private.example/repo.git" + ) + + try: + restore = ProjectBackup(temp_name) + with ( + patch( + "weblate.utils.outbound.socket.getaddrinfo", + return_value=[(0, 0, 0, "", ("127.0.0.1", 443))], + ), + self.assertRaises(ValidationError) as error, + ): + restore.validate() + + self.assertEqual( + error.exception.message_dict, + { + "repo": [ + "This URL is prohibited because it points to an internal or non-public address." + ] + }, + ) + finally: + os.unlink(temp_name) + + def test_restore_rejects_invalid_push_url(self) -> None: + temp_name = self.write_tampered_component_backup( + push="https://private.example/push.git" + ) + + try: + restore = ProjectBackup(temp_name) + with ( + patch( + "weblate.utils.outbound.socket.getaddrinfo", + return_value=[(0, 0, 0, "", ("127.0.0.1", 443))], + ), + self.assertRaises(ValidationError) as error, + ): + restore.validate() + + self.assertEqual( + error.exception.message_dict, + { + "push": [ + "This URL is prohibited because it points to an internal or non-public address." + ] + }, + ) + finally: + os.unlink(temp_name) + def test_create_duplicate(self) -> None: def extract_names(qs) -> list[str]: return list(qs.order_by("name").values_list("name", flat=True))
Vulnerability mechanics
Generated by null/stub on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.
References
8- github.com/WeblateOrg/weblate/commit/e1eff1f517c1ee315d69581910baaabb724e5ef0nvdPatchWEB
- github.com/WeblateOrg/weblate/commit/e4b67a76d95d5165ecb9937f7485fd79223b7f14nvdPatchWEB
- github.com/WeblateOrg/weblate/pull/19061nvdIssue TrackingPatchWEB
- github.com/WeblateOrg/weblate/pull/19062nvdIssue TrackingPatchWEB
- github.com/WeblateOrg/weblate/security/advisories/GHSA-cwcx-382v-8m9gnvdPatchVendor AdvisoryWEB
- github.com/advisories/GHSA-cwcx-382v-8m9gghsaADVISORY
- nvd.nist.gov/vuln/detail/CVE-2026-41654ghsaADVISORY
- github.com/WeblateOrg/weblate/releases/tag/weblate-5.17.1nvdRelease NotesWEB
News mentions
0No linked articles in our index yet.