CVE-2026-30625
Description
Upsonic 0.71.6 contains a remote code execution vulnerability in its MCP server/task creation functionality. The application allows users to define MCP tasks with arbitrary command and args values. Although an allowlist exists, certain allowed commands (npm, npx) accept argument flags that enable execution of arbitrary OS commands. Maliciously crafted MCP tasks may lead to remote code execution with the privileges of the Upsonic process. In version 0.72.0, Upsonic added a warning that Stdio servers are able to execute commands directly on the machine.
Affected packages
Versions sourced from the GitHub Security Advisory.
| Package | Affected versions | Patched versions |
|---|---|---|
| upsonic (PyPI) | < 0.72.0 | 0.72.0 |
Patches
1855053fce066 feat: workspace feature, autonomous agent
25 files changed · +4831 −430
pytest.ini+1 −0 modified@@ -1,4 +1,5 @@ [pytest] +addopts = -s --capture=no asyncio_mode = strict asyncio_default_fixture_loop_scope = function testpaths = tests
src/upsonic/agent/agent.py+207 −145 modified@@ -15,7 +15,6 @@ from upsonic.tools.processor import ExternalExecutionPause from upsonic.run.cancel import register_run, cleanup_run, raise_if_cancelled, cancel_run as cancel_run_func, is_cancelled from upsonic.session.base import SessionType -# Constants for structured output from upsonic.output import DEFAULT_OUTPUT_TOOL_NAME if TYPE_CHECKING: @@ -126,8 +125,8 @@ def __init__( company_name: Optional[str] = None, system_prompt: Optional[str] = None, reflection: bool = False, - compression_strategy: Literal["none", "simple", "llmlingua"] = "none", - compression_settings: Optional[Dict[str, Any]] = None, + context_management: bool = False, + context_management_keep_recent: int = 5, reliability_layer: Optional[Any] = None, agent_id_: Optional[str] = None, canvas: Optional["Canvas"] = None, @@ -168,6 +167,8 @@ def __init__( culture: Optional["Culture"] = None, # Agent metadata (passed to prompt) metadata: Optional[Dict[str, Any]] = None, + # Workspace settings + workspace: Optional[str] = None, ): """ Initialize the Agent with comprehensive configuration options. @@ -185,10 +186,12 @@ def __init__( company_description: Company description for context system_prompt: Custom system prompt reflection: Reflection capabilities (default is False) - compression_strategy: The method for context compression ('none', 'simple', 'llmlingua'). - compression_settings: A dictionary of settings for the chosen strategy. - - For "simple": {"max_length": 2000} - - For "llmlingua": {"ratio": 0.5, "model_name": "...", "instruction": "..."} + context_management: Enable automatic context window management (default True). + When enabled, the middleware automatically prunes tool call history and + summarizes old messages when the context approaches the model's limit. + context_management_keep_recent: Number of recent tool-call events / + messages to preserve when the context management middleware prunes or + summarizes the history (default 5). 
reliability_layer: Reliability layer for robustness agent_id_: Specific agent ID canvas: Canvas instance for visual interactions @@ -229,6 +232,9 @@ def __init__( culture: Culture instance defining agent behavior and communication guidelines. Includes description, add_system_prompt, repeat, and repeat_interval settings. + workspace: Path to workspace folder containing Agents.md file with agent configuration. + When set, the Agents.md content is included in system prompt and a greeting + message is generated before the first task/chat, integrated into message history. """ from upsonic.models import infer_model self.model = infer_model(model) @@ -288,25 +294,16 @@ def __init__( self.use_llm_for_selection = use_llm_for_selection self._model_recommendation: Optional[Any] = None # Store last recommendation - self.compression_strategy = compression_strategy - self.compression_settings = compression_settings or {} - self._prompt_compressor = None - - if self.compression_strategy == "llmlingua": - try: - from llmlingua import PromptCompressor - except ImportError: - from upsonic.utils.printing import import_error - import_error( - package_name="llmlingua", - install_command="pip install llmlingua", - feature_name="llmlingua compression strategy" - ) + self.context_management: bool = context_management + self.context_management_keep_recent: int = context_management_keep_recent + self._context_management_middleware: Optional[Any] = None - model_name = self.compression_settings.get( - "model_name", "microsoft/llmlingua-2-xlm-roberta-large-meetingbank" + if self.context_management: + from upsonic.agent.context_managers import ContextManagementMiddleware + self._context_management_middleware = ContextManagementMiddleware( + model=self.model, + keep_recent_count=self.context_management_keep_recent, ) - self._prompt_compressor = PromptCompressor(model_name=model_name, use_llmlingua2=True) self.reliability_layer = reliability_layer @@ -429,6 +426,122 @@ def __init__( 
self._setup_policy_models() self.session_type = SessionType.AGENT + + # Workspace settings + self.workspace: Optional[str] = workspace + self._workspace_greeting_executed: bool = False + self._workspace_agents_md_content: Optional[str] = None + + # Pre-load workspace Agents.md content if workspace is set + if self.workspace: + self._workspace_agents_md_content = self._read_workspace_agents_md() + + def _read_workspace_agents_md(self) -> Optional[str]: + """Read the Agents.md file from the workspace folder. + + Returns: + Content of the Agents.md file, or None if not found. + """ + import os + + if not self.workspace: + return None + + agents_md_path = os.path.join(self.workspace, "Agents.md") + + try: + with open(agents_md_path, "r", encoding="utf-8") as f: + content = f.read() + return content + except FileNotFoundError: + if self.debug: + from upsonic.utils.printing import warning_log + warning_log( + f"Agents.md not found at {agents_md_path}", + "Workspace" + ) + return None + except Exception as e: + if self.debug: + from upsonic.utils.printing import error_log + error_log( + f"Error reading Agents.md from {agents_md_path}: {str(e)}", + "Workspace" + ) + return None + + async def execute_workspace_greeting_async( + self, + return_output: bool = False, + ) -> Any: + """Execute the workspace greeting as a proper agent run. + + This method is called before the first task/chat when workspace is set. + It makes an LLM request with a greeting prompt and returns the result + just like do_async. + + Args: + return_output: If True, return full AgentRunOutput. If False, return content only. + + Returns: + Same as do_async - Task content or AgentRunOutput based on return_output. + """ + if not self.workspace or self._workspace_greeting_executed: + return None + + from upsonic.tasks.tasks import Task + + # Build the greeting prompt + greeting_prompt = ( + "A new session was started. Say hi briefly (1-2 sentences) and ask what the user wants to do next. 
" + "If the runtime model differs from default_model in the system prompt, mention the default model in the greeting. " + "Do not mention internal steps, files, tools, or reasoning." + ) + + # Create a greeting task + greeting_task = Task(description=greeting_prompt) + + # Mark greeting as executed BEFORE calling do_async to prevent recursion + self._workspace_greeting_executed = True + + # Execute the greeting using do_async + result = await self.do_async( + task=greeting_task, + return_output=return_output, + _print_method_default=False + ) + print("Greeting result: ", result) + + return result + + def execute_workspace_greeting( + self, + return_output: bool = False, + ) -> Any: + """Synchronous version of execute_workspace_greeting_async. + + Args: + return_output: If True, return full AgentRunOutput. If False, return content only. + + Returns: + Same as do - Task content or AgentRunOutput based on return_output. + """ + import asyncio + + if not self.workspace or self._workspace_greeting_executed: + return None + + try: + loop = asyncio.get_running_loop() + import concurrent.futures + with concurrent.futures.ThreadPoolExecutor() as executor: + future = executor.submit( + asyncio.run, + self.execute_workspace_greeting_async(return_output) + ) + return future.result() + except RuntimeError: + return asyncio.run(self.execute_workspace_greeting_async(return_output)) def _setup_policy_models(self) -> None: """Setup model references for safety policies.""" @@ -1236,26 +1349,27 @@ async def _build_model_request( task: "Task", memory_handler: Optional["MemoryManager"], state: Optional["State"] = None, - ) -> List["ModelRequest"]: - """Build the complete message history for the model request.""" + ) -> tuple[List["ModelRequest"], Optional["ModelResponse"]]: + """Build the complete message history for the model request. + + Returns: + A tuple of (messages, context_full_response). 
+ context_full_response is None when the context fits within the + model's window, or a ModelResponse with a fixed message when + the context is full after all reduction strategies. + """ from upsonic.agent.context_managers import SystemPromptManager, ContextManager from upsonic.messages import SystemPromptPart, UserPromptPart, ModelRequest - messages = [] + messages: List["ModelRequest"] = [] message_history = memory_handler.get_message_history() messages.extend(message_history) system_prompt_manager = SystemPromptManager(self, task) context_manager = ContextManager(self, task, state) async with system_prompt_manager.manage_system_prompt(memory_handler) as sp_handler, \ - context_manager.manage_context(memory_handler) as ctx_handler: - - if self.compression_strategy != "none" and ctx_handler: - context_prompt = ctx_handler.get_context_prompt() - if context_prompt: - compressed_context = self._compress_context(context_prompt) - task.context_formatted = compressed_context + context_manager.manage_context(memory_handler) as _ctx_handler: if hasattr(self, '_agent_run_output') and self._agent_run_output and self._agent_run_output.input: run_input = self._agent_run_output.input @@ -1271,7 +1385,6 @@ async def _build_model_request( parts = [] - # Use SystemPromptManager to determine if system prompt should be included if sp_handler.should_include_system_prompt(messages): system_prompt = sp_handler.get_system_prompt() if system_prompt: @@ -1282,7 +1395,18 @@ async def _build_model_request( current_request = ModelRequest(parts=parts) messages.append(current_request) - return messages + + # Apply context management middleware + context_full_response: Optional["ModelResponse"] = None + if self.context_management and self._context_management_middleware: + managed_msgs, ctx_full = await self._context_management_middleware.apply(messages) + messages = managed_msgs + if ctx_full: + context_full_response = self._context_management_middleware._build_context_full_response( + 
model_name=self.model.model_name + ) + + return messages, context_full_response async def _build_model_request_with_input( self, @@ -1291,18 +1415,25 @@ async def _build_model_request_with_input( current_input: Any, temporary_message_history: List["ModelRequest"], state: Optional["State"] = None, - ) -> List["ModelRequest"]: - """Build model request with custom input and message history for guardrail retries.""" + ) -> tuple[List["ModelRequest"], Optional["ModelResponse"]]: + """Build model request with custom input and message history for guardrail retries. + + Returns: + A tuple of (messages, context_full_response). + context_full_response is None when the context fits within the + model's window, or a ModelResponse with a fixed message when + the context is full after all reduction strategies. + """ from upsonic.agent.context_managers import SystemPromptManager, ContextManager from upsonic.messages import SystemPromptPart, UserPromptPart, ModelRequest - messages = list(temporary_message_history) + messages: List["ModelRequest"] = list(temporary_message_history) system_prompt_manager = SystemPromptManager(self, task) context_manager = ContextManager(self, task, state) async with system_prompt_manager.manage_system_prompt(memory_handler) as sp_handler, \ - context_manager.manage_context(memory_handler) as ctx_handler: + context_manager.manage_context(memory_handler) as _ctx_handler: user_part = UserPromptPart(content=current_input) @@ -1318,14 +1449,18 @@ async def _build_model_request_with_input( current_request = ModelRequest(parts=parts) messages.append(current_request) - - if self.compression_strategy != "none" and ctx_handler: - context_prompt = ctx_handler.get_context_prompt() - if context_prompt: - compressed_context = self._compress_context(context_prompt) - task.context_formatted = compressed_context - - return messages + + # Apply context management middleware + context_full_response: Optional["ModelResponse"] = None + if self.context_management and 
self._context_management_middleware: + managed_msgs, ctx_full = await self._context_management_middleware.apply(messages) + messages = managed_msgs + if ctx_full: + context_full_response = self._context_management_middleware._build_context_full_response( + model_name=self.model.model_name + ) + + return messages, context_full_response def _build_model_request_parameters(self, task: "Task") -> "ModelRequestParameters": """Build model request parameters including tools and structured output.""" @@ -1751,6 +1886,16 @@ async def _handle_model_response( limit_message = ModelRequest(parts=[limit_notification]) messages.append(limit_message) + # Apply context management middleware before model request + if self.context_management and self._context_management_middleware: + managed_msgs, ctx_full = await self._context_management_middleware.apply(messages) + messages.clear() + messages.extend(managed_msgs) + if ctx_full: + return self._context_management_middleware._build_context_full_response( + model_name=self.model.model_name + ) + model_params = self._build_model_request_parameters(getattr(self, 'current_task', None)) model_params = self.model.customize_request_parameters(model_params) @@ -1794,6 +1939,16 @@ async def _handle_model_response( ) return stop_response + # Apply context management middleware before follow-up model request + if self.context_management and self._context_management_middleware: + managed_msgs, ctx_full = await self._context_management_middleware.apply(messages) + messages.clear() + messages.extend(managed_msgs) + if ctx_full: + return self._context_management_middleware._build_context_full_response( + model_name=self.model.model_name + ) + model_params = self._build_model_request_parameters(getattr(self, 'current_task', None)) model_params = self.model.customize_request_parameters(model_params) @@ -2039,7 +2194,10 @@ async def _execute_with_guardrail(self, task: "Task", memory_handler: Optional[" max_retries = 1 while not validation_passed and 
retry_counter < max_retries: - messages = await self._build_model_request_with_input(task, memory_handler, current_input, temporary_message_history, state) + messages, context_full_response = await self._build_model_request_with_input(task, memory_handler, current_input, temporary_message_history, state) + + if context_full_response is not None: + return context_full_response model_params = self._build_model_request_parameters(task) model_params = self.model.customize_request_parameters(model_params) @@ -2152,102 +2310,6 @@ async def _execute_with_guardrail(self, task: "Task", memory_handler: Optional[" return final_model_response - def _compress_context(self, context: str) -> str: - """Compress context based on the selected strategy.""" - if self.compression_strategy == "simple": - return self._compress_simple(context) - elif self.compression_strategy == "llmlingua": - return self._compress_llmlingua(context) - return context - - def _compress_simple(self, context: str) -> str: - """Compress context using simple whitespace removal and truncation.""" - if not context: - return "" - - original_length = len(context) - compressed = " ".join(context.split()) - - max_length = self.compression_settings.get("max_length", 2000) - - if len(compressed) > max_length: - part_size = max_length // 2 - 20 - compressed = compressed[:part_size] + " ... [COMPRESSED] ... 
" + compressed[-part_size:] - - if self.debug and self.debug_level >= 2: - from upsonic.utils.printing import debug_log_level2 - compression_ratio = len(compressed) / original_length if original_length > 0 else 1.0 - debug_log_level2( - "Context compression (simple)", - "Agent", - debug=self.debug, - debug_level=self.debug_level, - compression_strategy="simple", - original_length=original_length, - compressed_length=len(compressed), - compression_ratio=compression_ratio, - max_length=max_length, - was_truncated=len(compressed) > max_length - ) - - return compressed - - - def _compress_llmlingua(self, context: str) -> str: - """Compress context using the LLMLingua library.""" - if not context or not self._prompt_compressor: - return context - - original_length = len(context) - ratio = self.compression_settings.get("ratio", 0.5) - instruction = self.compression_settings.get("instruction", "") - - try: - result = self._prompt_compressor.compress_prompt( - context.split('\n'), - instruction=instruction, - rate=ratio - ) - compressed = result['compressed_prompt'] - - if self.debug and self.debug_level >= 2: - from upsonic.utils.printing import debug_log_level2 - compression_ratio = len(compressed) / original_length if original_length > 0 else 1.0 - debug_log_level2( - "Context compression (llmlingua)", - "Agent", - debug=self.debug, - debug_level=self.debug_level, - compression_strategy="llmlingua", - original_length=original_length, - compressed_length=len(compressed), - compression_ratio=compression_ratio, - target_ratio=ratio, - instruction=instruction[:200] if instruction else None, - compression_stats=result.get('stats', {}) - ) - - return compressed - except Exception as e: - if self.debug: - from upsonic.utils.printing import compression_fallback, debug_log_level2 - compression_fallback("llmlingua", "simple", str(e)) - - # Level 2: Compression fallback details - if self.debug_level >= 2: - debug_log_level2( - "Context compression fallback", - "Agent", - 
debug=self.debug, - debug_level=self.debug_level, - original_strategy="llmlingua", - fallback_strategy="simple", - error_type=type(e).__name__, - error_message=str(e), - original_length=original_length - ) - return self._compress_simple(context) - async def recommend_model_for_task_async( self, task: Union["Task", str],
src/upsonic/agent/autonomous_agent/autonomous_agent.py+471 −0 added@@ -0,0 +1,471 @@ +from __future__ import annotations + +from pathlib import Path +from typing import Any, Dict, List, Literal, Optional, Type, Union, TYPE_CHECKING + +from upsonic.agent.agent import Agent + +if TYPE_CHECKING: + from pydantic import BaseModel + from upsonic.storage.base import Storage + from upsonic.storage.memory.memory import Memory + from upsonic.canvas.canvas import Canvas + from upsonic.models.settings import ModelSettings + from upsonic.profiles import ModelProfile + from upsonic.reflection import ReflectionConfig + from upsonic.safety_engine.base import Policy + from upsonic.models import Model + from upsonic.culture.culture import Culture + from upsonic.db.database import DatabaseBase + + +RetryMode = Literal["raise", "return_false"] + + +class AutonomousAgent(Agent): + """ + A pre-configured AI Agent with filesystem and shell capabilities. + + AutonomousAgent inherits from Agent and provides: + - **Default Storage**: Uses InMemoryStorage automatically if no storage is provided + - **Default Memory**: Creates Memory instance with session persistence + - **Filesystem Tools**: Read, write, edit, search, list, move, copy, delete files + - **Shell Tools**: Execute terminal commands with timeout and output capture + - **Workspace Sandboxing**: All file/shell operations are restricted to workspace + + This is the ideal choice for: + - Coding assistants that need to read/write files + - DevOps automation agents + - System administration tasks + - Any task requiring filesystem or shell access + + Usage: + ```python + from upsonic import AutonomousAgent + + # Simple usage with defaults + agent = AutonomousAgent( + model="openai/gpt-4o", + workspace="/path/to/project" + ) + + result = agent.do("Read the main.py file and add error handling") + + # Advanced usage with custom configuration + agent = AutonomousAgent( + model="anthropic/claude-sonnet-4-20250514", + 
workspace="/path/to/project", + name="CodeAssistant", + enable_filesystem=True, + enable_shell=True, + shell_timeout=60, + full_session_memory=True, # Enable chat history + ) + ``` + + Attributes: + autonomous_workspace: Path to the workspace directory (all operations sandboxed here) + filesystem_toolkit: Filesystem operations toolkit (if enabled) + shell_toolkit: Shell command execution toolkit (if enabled) + """ + + def __init__( + self, + model: Union[str, "Model"] = "openai/gpt-4o", + *, + workspace: Optional[str] = None, + name: Optional[str] = None, + # Storage/Memory configuration + storage: Optional["Storage"] = None, + memory: Optional["Memory"] = None, + db: Optional["DatabaseBase"] = None, + session_id: Optional[str] = None, + user_id: Optional[str] = None, + # Memory features + full_session_memory: bool = True, + summary_memory: bool = False, + user_analysis_memory: bool = False, + user_profile_schema: Optional[Type["BaseModel"]] = None, + dynamic_user_profile: bool = False, + num_last_messages: Optional[int] = None, + feed_tool_call_results: Optional[bool] = None, + # Toolkit configuration + enable_filesystem: bool = True, + enable_shell: bool = True, + shell_timeout: int = 120, + shell_max_output: int = 10000, + blocked_commands: Optional[List[str]] = None, + # Standard Agent parameters + debug: bool = False, + debug_level: int = 1, + print: Optional[bool] = None, + company_url: Optional[str] = None, + company_objective: Optional[str] = None, + company_description: Optional[str] = None, + company_name: Optional[str] = None, + system_prompt: Optional[str] = None, + reflection: bool = False, + context_management: bool = False, + context_management_keep_recent: int = 5, + reliability_layer: Optional[Any] = None, + agent_id_: Optional[str] = None, + canvas: Optional["Canvas"] = None, + retry: int = 1, + mode: RetryMode = "raise", + role: Optional[str] = None, + goal: Optional[str] = None, + instructions: Optional[str] = None, + education: Optional[str] = 
None, + work_experience: Optional[str] = None, + show_tool_calls: bool = True, + tool_call_limit: int = 100, + enable_thinking_tool: bool = False, + enable_reasoning_tool: bool = False, + tools: Optional[List[Any]] = None, + user_policy: Optional[Union["Policy", List["Policy"]]] = None, + agent_policy: Optional[Union["Policy", List["Policy"]]] = None, + tool_policy_pre: Optional[Union["Policy", List["Policy"]]] = None, + tool_policy_post: Optional[Union["Policy", List["Policy"]]] = None, + user_policy_feedback: bool = False, + agent_policy_feedback: bool = False, + user_policy_feedback_loop: int = 1, + agent_policy_feedback_loop: int = 1, + settings: Optional["ModelSettings"] = None, + profile: Optional["ModelProfile"] = None, + reflection_config: Optional["ReflectionConfig"] = None, + model_selection_criteria: Optional[Dict[str, Any]] = None, + use_llm_for_selection: bool = False, + reasoning_effort: Optional[Literal["low", "medium", "high"]] = None, + reasoning_summary: Optional[Literal["concise", "detailed"]] = None, + thinking_enabled: Optional[bool] = None, + thinking_budget: Optional[int] = None, + thinking_include_thoughts: Optional[bool] = None, + reasoning_format: Optional[Literal["hidden", "raw", "parsed"]] = None, + culture: Optional["Culture"] = None, + metadata: Optional[Dict[str, Any]] = None, + ) -> None: + """ + Initialize AutonomousAgent with default storage, memory, and tools. + + Args: + model: Model identifier or Model instance (default: "openai/gpt-4o") + workspace: Workspace directory path. Defaults to current working directory. + All file and shell operations are restricted to this directory. + name: Agent name for identification + + # Storage/Memory configuration + storage: Custom storage backend. If None, InMemoryStorage is created. + memory: Custom Memory instance. If None, Memory is created with storage. + db: Database instance (overrides memory if provided) + session_id: Session identifier. Auto-generated if None. 
+ user_id: User identifier. Auto-generated if None. + + # Memory features + full_session_memory: Enable chat history persistence (default: True) + summary_memory: Enable session summary generation (default: False) + user_analysis_memory: Enable user profile extraction (default: False) + user_profile_schema: Pydantic model for user profile structure + dynamic_user_profile: Generate profile schema dynamically + num_last_messages: Limit on message turns to keep in history + feed_tool_call_results: Include tool call results in history + + # Toolkit configuration + enable_filesystem: Enable filesystem tools (default: True) + enable_shell: Enable shell command tools (default: True) + shell_timeout: Default timeout for shell commands in seconds (default: 120) + shell_max_output: Maximum output length before truncation (default: 10000) + blocked_commands: List of command patterns to block for security + + # Standard Agent parameters - see Agent class for full documentation + """ + from upsonic.storage import InMemoryStorage, Memory + from .filesystem_toolkit import AutonomousFilesystemToolKit + from .shell_toolkit import AutonomousShellToolKit + + if workspace is not None: + self.autonomous_workspace: Path = Path(workspace).resolve() + else: + self.autonomous_workspace = Path.cwd().resolve() + + if not self.autonomous_workspace.exists(): + self.autonomous_workspace.mkdir(parents=True, exist_ok=True) + + effective_storage: Optional[Storage] + if db is not None: + effective_storage = None + elif memory is not None: + effective_storage = memory.storage + elif storage is not None: + effective_storage = storage + else: + effective_storage = InMemoryStorage() + + self._autonomous_storage: Optional[Storage] = effective_storage + + effective_memory: Optional[Memory] + if db is not None: + effective_memory = None + elif memory is not None: + effective_memory = memory + elif effective_storage is not None: + effective_memory = Memory( + storage=effective_storage, + 
session_id=session_id, + user_id=user_id, + full_session_memory=full_session_memory, + summary_memory=summary_memory, + user_analysis_memory=user_analysis_memory, + user_profile_schema=user_profile_schema, + dynamic_user_profile=dynamic_user_profile, + num_last_messages=num_last_messages, + model=model if isinstance(model, str) else None, + debug=debug, + debug_level=debug_level, + feed_tool_call_results=feed_tool_call_results if feed_tool_call_results is not None else False, + ) + else: + effective_memory = None + + self._autonomous_memory: Optional[Memory] = effective_memory + + self.filesystem_toolkit: Optional[AutonomousFilesystemToolKit] = None + self.shell_toolkit: Optional[AutonomousShellToolKit] = None + + default_tools: List[Any] = [] + + if enable_filesystem: + self.filesystem_toolkit = AutonomousFilesystemToolKit(workspace=self.autonomous_workspace) + default_tools.append(self.filesystem_toolkit) + + if enable_shell: + self.shell_toolkit = AutonomousShellToolKit( + workspace=self.autonomous_workspace, + default_timeout=shell_timeout, + max_output_length=shell_max_output, + blocked_commands=blocked_commands, + ) + default_tools.append(self.shell_toolkit) + + all_tools = default_tools + (tools or []) + + effective_system_prompt = self._build_autonomous_system_prompt( + user_system_prompt=system_prompt, + enable_filesystem=enable_filesystem, + enable_shell=enable_shell, + ) + + super().__init__( + model=model, + name=name, + memory=effective_memory, + db=db, + session_id=session_id, + user_id=user_id, + debug=debug, + debug_level=debug_level, + print=print, + company_url=company_url, + company_objective=company_objective, + company_description=company_description, + company_name=company_name, + system_prompt=effective_system_prompt, + reflection=reflection, + context_management=context_management, + context_management_keep_recent=context_management_keep_recent, + reliability_layer=reliability_layer, + agent_id_=agent_id_, + canvas=canvas, + retry=retry, + 
mode=mode, + role=role, + goal=goal, + instructions=instructions, + education=education, + work_experience=work_experience, + feed_tool_call_results=feed_tool_call_results, + show_tool_calls=show_tool_calls, + tool_call_limit=tool_call_limit, + enable_thinking_tool=enable_thinking_tool, + enable_reasoning_tool=enable_reasoning_tool, + tools=all_tools, + user_policy=user_policy, + agent_policy=agent_policy, + tool_policy_pre=tool_policy_pre, + tool_policy_post=tool_policy_post, + user_policy_feedback=user_policy_feedback, + agent_policy_feedback=agent_policy_feedback, + user_policy_feedback_loop=user_policy_feedback_loop, + agent_policy_feedback_loop=agent_policy_feedback_loop, + settings=settings, + profile=profile, + reflection_config=reflection_config, + model_selection_criteria=model_selection_criteria, + use_llm_for_selection=use_llm_for_selection, + reasoning_effort=reasoning_effort, + reasoning_summary=reasoning_summary, + thinking_enabled=thinking_enabled, + thinking_budget=thinking_budget, + thinking_include_thoughts=thinking_include_thoughts, + reasoning_format=reasoning_format, + culture=culture, + metadata=metadata, + workspace=str(self.autonomous_workspace), + ) + + def _build_autonomous_system_prompt( + self, + user_system_prompt: Optional[str], + enable_filesystem: bool, + enable_shell: bool, + ) -> Optional[str]: + """ + Build the system prompt for AutonomousAgent. + + If user provides a custom system_prompt, it's used as-is. + Otherwise, builds a dynamic prompt based on enabled toolkits. 
+ + Args: + user_system_prompt: User-provided system prompt (if any) + enable_filesystem: Whether filesystem tools are enabled + enable_shell: Whether shell tools are enabled + + Returns: + The effective system prompt to use + """ + # If user provides custom prompt, use it directly + if user_system_prompt is not None: + return user_system_prompt + + # If no tools enabled, no special prompt needed + if not enable_filesystem and not enable_shell: + return None + + # Build dynamic prompt based on enabled tools + prompt_parts = [ + "You are an Autonomous Agent with access to tools for completing tasks within your designated workspace." + ] + + # Capabilities section + capabilities = [] + + if enable_filesystem: + capabilities.append("""### Filesystem Tools +You have comprehensive filesystem access within the workspace: + +- **read_file**: Read file contents. Always read a file before editing it. Use offset/limit for large files. +- **write_file**: Create new files or completely overwrite existing files. Parent directories are created automatically. +- **edit_file**: Make precise text replacements in files. You MUST read the file first before editing. +- **list_files**: List directory contents. Use pattern parameter for filtering (e.g., "*.py"). +- **search_files**: Find files by name pattern across the workspace. +- **grep_files**: Search for text/regex patterns within file contents. +- **file_info**: Get file metadata (size, permissions, modification time). +- **create_directory**: Create directories (parents created automatically). +- **move_file**: Move or rename files and directories. +- **copy_file**: Copy files or directories. +- **delete_file**: Delete files or directories.""") + + if enable_shell: + capabilities.append("""### Shell Tools +You can execute commands in the workspace directory: + +- **run_command**: Execute shell commands (ls, git, pip, npm, etc.). Commands run in the workspace directory. 
+- **run_python**: Execute Python code snippets and get the output. +- **check_command_exists**: Verify if a command is available on the system.""") + + if capabilities: + prompt_parts.append("\n## Your Capabilities\n") + prompt_parts.append("\n\n".join(capabilities)) + + # Guidelines section + guidelines = [] + + if enable_filesystem: + guidelines.append("""### When to Use Filesystem Tools +- **Reading code**: Use read_file to understand existing code before making changes. +- **Creating files**: Use write_file for new files. The file path is relative to the workspace. +- **Editing code**: ALWAYS read_file first, then use edit_file with precise old_string/new_string. +- **Finding files**: Use search_files to locate files by name, grep_files to search content. +- **Project exploration**: Use list_files to understand project structure.""") + + if enable_shell: + guidelines.append("""### When to Use Shell Tools +- **Running tests**: Use run_command to execute test suites (pytest, npm test, etc.). +- **Installing dependencies**: Use run_command for pip install, npm install, etc. +- **Git operations**: Use run_command for git status, git diff, git commit, etc. +- **Building/compiling**: Use run_command for build commands. +- **Quick Python snippets**: Use run_python for calculations or quick scripts.""") + + if guidelines: + prompt_parts.append("\n## Guidelines\n") + prompt_parts.append("\n\n".join(guidelines)) + + # Best practices + best_practices = ["### Best Practices"] + practice_num = 1 + + if enable_filesystem: + best_practices.append(f"{practice_num}. **Read before edit**: Always read a file before attempting to edit it. This ensures you understand the current state.") + practice_num += 1 + best_practices.append(f"{practice_num}. **Use precise edits**: When editing, provide enough context in old_string to uniquely identify the location.") + practice_num += 1 + + if enable_shell: + best_practices.append(f"{practice_num}. 
**Check command availability**: Use check_command_exists before running commands that might not be installed.") + practice_num += 1 + + best_practices.append(f"{practice_num}. **Handle errors gracefully**: If a tool fails, analyze the error and try an alternative approach.") + practice_num += 1 + best_practices.append(f"{practice_num}. **Work incrementally**: For complex tasks, break them into steps and verify each step works.") + practice_num += 1 + + if enable_filesystem: + best_practices.append(f"{practice_num}. **Stay in workspace**: All paths are relative to the workspace. You cannot access files outside it.") + + prompt_parts.append("\n".join(best_practices)) + + # Security section + security_notes = ["\n## Security Restrictions"] + if enable_filesystem: + security_notes.append("- All file operations are sandboxed to the workspace directory.") + security_notes.append("- Path traversal (../) outside the workspace is blocked.") + if enable_shell: + security_notes.append("- Dangerous shell commands are blocked for security.") + security_notes.append("- Shell commands execute in the workspace directory.") + + prompt_parts.append("\n".join(security_notes)) + + # Closing + prompt_parts.append("\nWhen given a task, think about what tools you need and use them methodically to accomplish the goal.") + + return "\n".join(prompt_parts) + + @property + def autonomous_storage(self) -> Optional["Storage"]: + """Get the storage backend created by AutonomousAgent.""" + return self._autonomous_storage + + @property + def autonomous_memory(self) -> Optional["Memory"]: + """Get the memory instance created by AutonomousAgent.""" + return self._autonomous_memory + + def reset_filesystem_tracking(self) -> None: + """ + Reset filesystem read tracking. + + This clears the record of which files have been read, + which affects edit_file's read-before-edit enforcement. 
+ """ + if self.filesystem_toolkit: + self.filesystem_toolkit.reset_read_tracking() + + def __repr__(self) -> str: + """String representation of AutonomousAgent.""" + return ( + f"AutonomousAgent(" + f"model={self.model_name!r}, " + f"workspace={str(self.autonomous_workspace)!r}, " + f"name={self.name!r}, " + f"tools={len(self.tools)}" + f")" + )
src/upsonic/agent/autonomous_agent/filesystem_toolkit.py+952 −0 added@@ -0,0 +1,952 @@ +""" +Autonomous Agent Filesystem Toolkit - Comprehensive filesystem operations. + +Provides production-ready tools for file operations: +- read, write, edit, search, list, move, copy, delete +""" +from __future__ import annotations + +import asyncio +import fnmatch +import re +import shutil +from pathlib import Path +from typing import List, Optional, Set + +from upsonic.tools import ToolKit, tool + + +class AutonomousFilesystemToolKit(ToolKit): + """ + Comprehensive filesystem toolkit for AutonomousAgent. + + Provides all essential filesystem operations: + - read_file: Read file content with pagination support + - write_file: Create/overwrite files with automatic directory creation + - edit_file: Precise string replacement with read-tracking enforcement + - list_files: List directory contents (recursive or non-recursive) + - search_files: Find files by glob pattern + - grep_files: Search text within files with regex support + - move_file: Move or rename files/directories + - copy_file: Copy files/directories + - delete_file: Delete files/directories + - file_info: Get detailed file/directory metadata + - create_directory: Create directory with parents + + Features: + - Workspace sandboxing for security + - Read-before-edit enforcement for safe edits + - Automatic parent directory creation + - Comprehensive error handling + - Line-numbered output for precise editing + + Usage: + ```python + from upsonic import AutonomousAgent + + agent = AutonomousAgent(workspace="/path/to/project") + result = agent.do("Read the main.py file and add logging") + ``` + """ + + def __init__(self, workspace: Path) -> None: + """ + Initialize filesystem toolkit with workspace sandboxing. + + Args: + workspace: Workspace directory path. All operations are restricted to this directory. 
+ """ + super().__init__() + self.workspace: Path = Path(workspace).resolve() + self._read_files: Set[str] = set() + + def _validate_path(self, path: str) -> Path: + """ + Validate and resolve path within workspace for security. + + Args: + path: Path string (relative or absolute) + + Returns: + Resolved absolute Path object + + Raises: + ValueError: If path escapes workspace sandbox + """ + if path.startswith("/"): + resolved = Path(path).resolve() + else: + resolved = (self.workspace / path).resolve() + + try: + resolved.relative_to(self.workspace) + except ValueError: + raise ValueError(f"Path '{path}' is outside workspace '{self.workspace}'") + + return resolved + + def _get_relative_path(self, path: Path) -> str: + """Get path relative to workspace for clean output.""" + try: + return str(path.relative_to(self.workspace)) + except ValueError: + return str(path) + + @tool + def read_file( + self, + file_path: str, + offset: Optional[int] = None, + limit: Optional[int] = None, + ) -> str: + """ + Read content from a file with optional pagination. + + Use this tool to read the contents of a file. The output includes line numbers + for easy reference when editing. For large files, use offset and limit to + read specific sections. + + Args: + file_path: Path to file (relative to workspace or absolute) + offset: Starting line number (0-indexed). If None, starts from beginning. + limit: Maximum number of lines to read. If None, reads all remaining lines. 
+ + Returns: + File content with line numbers in format "LINE_NUM| CONTENT" + + Example: + read_file("src/main.py") # Read entire file + read_file("src/main.py", offset=10, limit=20) # Read lines 11-30 + """ + try: + resolved = self._validate_path(file_path) + + if not resolved.exists(): + return f"Error: File not found: {file_path}" + + if not resolved.is_file(): + return f"Error: Not a file: {file_path}" + + content = resolved.read_text(encoding="utf-8") + lines = content.split("\n") + + self._read_files.add(str(resolved)) + + start = offset if offset is not None else 0 + if start < 0: + start = 0 + if start >= len(lines): + return f"Error: Offset {start} exceeds file length ({len(lines)} lines)" + + if limit is not None and limit > 0: + end = min(start + limit, len(lines)) + else: + end = len(lines) + + selected = lines[start:end] + formatted: List[str] = [] + for i, line in enumerate(selected, start=start + 1): + formatted.append(f"{i:6d}| {line}") + + result = "\n".join(formatted) + + if end < len(lines): + result += f"\n\n[Showing lines {start + 1}-{end} of {len(lines)} total]" + result += f"\n[Use offset={end} to continue reading]" + else: + result += f"\n\n[Showing lines {start + 1}-{end} of {len(lines)} total]" + + return result + + except UnicodeDecodeError: + return f"Error: File appears to be binary and cannot be read as text: {file_path}" + except PermissionError: + return f"Error: Permission denied reading: {file_path}" + except Exception as e: + return f"Error reading file: {str(e)}" + + @tool + async def aread_file( + self, + file_path: str, + offset: Optional[int] = None, + limit: Optional[int] = None, + ) -> str: + """ + Async version of read_file. + + Read content from a file with optional pagination. 
+ + Args: + file_path: Path to file (relative to workspace or absolute) + offset: Starting line number (0-indexed) + limit: Maximum number of lines to read + + Returns: + File content with line numbers + """ + return await asyncio.get_event_loop().run_in_executor( + None, lambda: self.read_file(file_path, offset, limit) + ) + + @tool + def write_file( + self, + file_path: str, + content: str, + create_dirs: bool = True, + ) -> str: + """ + Write content to a file (creates or overwrites). + + Creates the file if it doesn't exist, overwrites if it does. + Parent directories are created automatically if create_dirs is True. + + IMPORTANT for Python files: + - Put ALL imports at the TOP of the file + - Use 4-space indentation + - Include proper type annotations + + For modifying existing files, consider using edit_file() instead + after reading the file with read_file(). + + Args: + file_path: Path to file (relative to workspace or absolute) + content: Complete content to write to the file + create_dirs: If True, create parent directories if they don't exist + + Returns: + Confirmation message with bytes written + + Example: + write_file("config.json", '{"key": "value"}') + """ + try: + resolved = self._validate_path(file_path) + + if create_dirs: + resolved.parent.mkdir(parents=True, exist_ok=True) + + resolved.write_text(content, encoding="utf-8") + self._read_files.add(str(resolved)) + + size_bytes = len(content.encode("utf-8")) + line_count = content.count("\n") + 1 + + return ( + f"✅ Successfully wrote to {self._get_relative_path(resolved)}\n" + f" Size: {size_bytes} bytes\n" + f" Lines: {line_count}" + ) + + except PermissionError: + return f"Error: Permission denied writing to: {file_path}" + except OSError as e: + return f"Error writing file: {str(e)}" + except Exception as e: + return f"Error writing file: {str(e)}" + + @tool + async def awrite_file( + self, + file_path: str, + content: str, + create_dirs: bool = True, + ) -> str: + """ + Async version of 
write_file. + + Write content to a file (creates or overwrites). + + Args: + file_path: Path to file + content: Complete content to write + create_dirs: Create parent directories if needed + + Returns: + Confirmation message + """ + return await asyncio.get_event_loop().run_in_executor( + None, lambda: self.write_file(file_path, content, create_dirs) + ) + + @tool + def edit_file( + self, + file_path: str, + old_string: str, + new_string: str, + replace_all: bool = False, + ) -> str: + """ + Edit a file by replacing text. + + CRITICAL: You MUST call read_file() FIRST before using this tool! + + This ensures you have the correct, up-to-date file content. + The old_string must exactly match content in the file, including + whitespace and indentation. + + Steps for safe editing: + 1. Call read_file(file_path) to see current content + 2. Copy the exact text you want to replace (including whitespace) + 3. Call edit_file with old_string and new_string + + Args: + file_path: Path to file + old_string: Exact text to find and replace (must match file content exactly) + new_string: Replacement text + replace_all: If True, replace all occurrences. If False, replace only first. + + Returns: + Confirmation message with replacement count + + Example: + edit_file("main.py", "def old_name()", "def new_name()") + """ + resolved = self._validate_path(file_path) + resolved_str = str(resolved) + + if resolved_str not in self._read_files: + return ( + f"❌ Error: You must call read_file('{file_path}') before editing.\n\n" + f"This ensures you have the correct file content and line numbers.\n" + f"Please use: read_file(\"{file_path}\") first, then retry your edit." + ) + + try: + if not resolved.exists(): + return f"Error: File not found: {file_path}" + + content = resolved.read_text(encoding="utf-8") + + if old_string not in content: + old_preview = old_string[:100] + "..." 
if len(old_string) > 100 else old_string + return ( + f"❌ Error: old_string not found in {file_path}\n\n" + f"Looking for: {repr(old_preview)}\n\n" + f"Please verify:\n" + f"1. You're using the exact string from read_file output\n" + f"2. Include sufficient context to make it unique\n" + f"3. Match indentation exactly (tabs vs spaces)\n" + f"4. Consider re-reading the file as it may have changed" + ) + + occurrence_count = content.count(old_string) + + if not replace_all and occurrence_count > 1: + return ( + f"❌ Error: old_string appears {occurrence_count} times in {file_path}\n\n" + f"Options:\n" + f"1. Use replace_all=True to replace all {occurrence_count} occurrences\n" + f"2. Provide more context in old_string to make it unique\n" + f" (Include surrounding lines or unique identifiers)" + ) + + if replace_all: + new_content = content.replace(old_string, new_string) + replaced_count = occurrence_count + else: + new_content = content.replace(old_string, new_string, 1) + replaced_count = 1 + + resolved.write_text(new_content, encoding="utf-8") + + old_lines = len(content.split("\n")) + new_lines = len(new_content.split("\n")) + line_diff = new_lines - old_lines + + return ( + f"✅ Successfully edited {self._get_relative_path(resolved)}\n" + f" Replaced: {replaced_count} occurrence(s)\n" + f" Lines: {old_lines} → {new_lines} ({line_diff:+d})" + ) + + except PermissionError: + return f"Error: Permission denied editing: {file_path}" + except Exception as e: + return f"Error editing file: {str(e)}" + + @tool + async def aedit_file( + self, + file_path: str, + old_string: str, + new_string: str, + replace_all: bool = False, + ) -> str: + """ + Async version of edit_file. + + Edit a file by replacing text. Requires prior read_file() call. 
+ + Args: + file_path: Path to file + old_string: Text to find + new_string: Replacement text + replace_all: Replace all occurrences + + Returns: + Confirmation message + """ + return await asyncio.get_event_loop().run_in_executor( + None, lambda: self.edit_file(file_path, old_string, new_string, replace_all) + ) + + @tool + def list_files( + self, + directory: str = ".", + recursive: bool = False, + max_depth: Optional[int] = None, + exclude_patterns: Optional[List[str]] = None, + ) -> str: + """ + List files and directories. + + Args: + directory: Directory path (relative to workspace) + recursive: If True, list files recursively + max_depth: Maximum depth for recursive listing (None = unlimited) + exclude_patterns: Glob patterns to exclude (e.g., ["*.pyc", "__pycache__"]) + + Returns: + Formatted list of files and directories + + Example: + list_files("src") # List src directory + list_files(".", recursive=True, max_depth=2) # Recursive with depth limit + """ + try: + resolved = self._validate_path(directory) + + if not resolved.exists(): + return f"Error: Directory not found: {directory}" + + if not resolved.is_dir(): + return f"Error: Not a directory: {directory}" + + default_excludes = ["__pycache__", "node_modules", ".git", "venv", ".venv", "dist", "build", ".tox", "*.egg-info"] + excludes = set(exclude_patterns or []) | set(default_excludes) + + def should_exclude(path: Path) -> bool: + name = path.name + for pattern in excludes: + if fnmatch.fnmatch(name, pattern): + return True + return False + + entries: List[str] = [] + + if recursive: + def walk_dir(current: Path, depth: int = 0) -> None: + if max_depth is not None and depth > max_depth: + return + + try: + items = sorted(current.iterdir(), key=lambda x: (not x.is_dir(), x.name.lower())) + except PermissionError: + return + + for item in items: + if should_exclude(item): + continue + + try: + rel_path = item.relative_to(resolved) + except ValueError: + continue + + if item.is_dir(): + 
entries.append(f"[DIR] {rel_path}/") + walk_dir(item, depth + 1) + else: + entries.append(f"[FILE] {rel_path}") + + walk_dir(resolved) + else: + try: + items = sorted(resolved.iterdir(), key=lambda x: (not x.is_dir(), x.name.lower())) + except PermissionError: + return f"Error: Permission denied listing: {directory}" + + for entry in items: + if should_exclude(entry): + continue + if entry.is_dir(): + entries.append(f"[DIR] {entry.name}/") + else: + entries.append(f"[FILE] {entry.name}") + + if not entries: + return f"Directory '{directory}' is empty (or all contents are excluded)" + + result = f"Contents of {directory}:\n" + result += "\n".join(entries) + result += f"\n\nTotal: {len(entries)} entries" + + return result + + except PermissionError: + return f"Error: Permission denied: {directory}" + except Exception as e: + return f"Error listing directory: {str(e)}" + + @tool + async def alist_files( + self, + directory: str = ".", + recursive: bool = False, + max_depth: Optional[int] = None, + exclude_patterns: Optional[List[str]] = None, + ) -> str: + """Async version of list_files.""" + return await asyncio.get_event_loop().run_in_executor( + None, lambda: self.list_files(directory, recursive, max_depth, exclude_patterns) + ) + + @tool + def search_files( + self, + pattern: str, + directory: str = ".", + exclude_patterns: Optional[List[str]] = None, + max_results: int = 100, + ) -> str: + """ + Search for files matching a glob pattern. 
+ + Args: + pattern: Glob pattern (e.g., "*.py", "**/*.md", "test_*.py") + directory: Directory to search in + exclude_patterns: Directories/files to exclude + max_results: Maximum number of results to return + + Returns: + List of matching file paths + + Example: + search_files("*.py") # All Python files + search_files("**/*.test.js") # All test files recursively + """ + try: + resolved = self._validate_path(directory) + + if not resolved.exists(): + return f"Error: Directory not found: {directory}" + + default_excludes = ["__pycache__", "node_modules", ".git", "venv", ".venv", "dist", "build"] + excludes = set(exclude_patterns or []) | set(default_excludes) + + matches: List[str] = [] + for path in resolved.rglob(pattern): + if len(matches) >= max_results: + break + + try: + rel_path = path.relative_to(resolved) + if any(excluded in rel_path.parts for excluded in excludes): + continue + matches.append(str(rel_path)) + except ValueError: + continue + + if not matches: + return f"No files matching '{pattern}' found in {directory}" + + result = f"Files matching '{pattern}':\n" + result += "\n".join(f" {m}" for m in sorted(matches)) + + if len(matches) >= max_results: + result += f"\n\n⚠️ Results truncated at {max_results}" + else: + result += f"\n\nTotal: {len(matches)} file(s)" + + return result + + except Exception as e: + return f"Error searching files: {str(e)}" + + @tool + async def asearch_files( + self, + pattern: str, + directory: str = ".", + exclude_patterns: Optional[List[str]] = None, + max_results: int = 100, + ) -> str: + """Async version of search_files.""" + return await asyncio.get_event_loop().run_in_executor( + None, lambda: self.search_files(pattern, directory, exclude_patterns, max_results) + ) + + @tool + def grep_files( + self, + text: str, + directory: str = ".", + file_pattern: str = "*", + case_sensitive: bool = False, + is_regex: bool = False, + max_results: int = 100, + context_lines: int = 0, + ) -> str: + """ + Search for text within 
files. + + Args: + text: Text or regex pattern to search for + directory: Directory to search in + file_pattern: Glob pattern for files to search (e.g., "*.py") + case_sensitive: If True, search is case-sensitive + is_regex: If True, treat text as regex pattern + max_results: Maximum number of matching lines to return + context_lines: Number of lines of context before/after each match + + Returns: + Matching lines with file paths and line numbers + + Example: + grep_files("TODO", file_pattern="*.py") # Find TODOs in Python files + grep_files("def.*async", is_regex=True) # Find async functions + """ + try: + resolved = self._validate_path(directory) + + if not resolved.exists(): + return f"Error: Directory not found: {directory}" + + default_excludes = ["__pycache__", "node_modules", ".git", "venv", ".venv", "dist", "build"] + + flags = 0 if case_sensitive else re.IGNORECASE + + if is_regex: + try: + pattern = re.compile(text, flags) + except re.error as e: + return f"Error: Invalid regex pattern: {e}" + else: + pattern = re.compile(re.escape(text), flags) + + matches: List[str] = [] + files_searched = 0 + + for file_path in resolved.rglob(file_pattern): + if not file_path.is_file(): + continue + + try: + rel_path = file_path.relative_to(resolved) + if any(excluded in rel_path.parts for excluded in default_excludes): + continue + except ValueError: + continue + + try: + content = file_path.read_text(encoding="utf-8", errors="ignore") + lines = content.split("\n") + files_searched += 1 + + for line_num, line in enumerate(lines, start=1): + if pattern.search(line): + match_entry = f"{rel_path}:{line_num}: {line.strip()[:150]}" + + if context_lines > 0: + context_start = max(0, line_num - context_lines - 1) + context_end = min(len(lines), line_num + context_lines) + for ctx_num in range(context_start, context_end): + if ctx_num + 1 != line_num: + match_entry += f"\n {rel_path}:{ctx_num + 1}: {lines[ctx_num].strip()[:100]}" + + matches.append(match_entry) + + if 
len(matches) >= max_results: + break + + if len(matches) >= max_results: + break + + except (UnicodeDecodeError, PermissionError): + continue + + if not matches: + return f"No matches for '{text}' in {files_searched} files searched" + + result = f"Matches for '{text}':\n" + result += f"Searched: {files_searched} file(s)\n\n" + result += "\n".join(matches) + + if len(matches) >= max_results: + result += f"\n\n⚠️ Results truncated at {max_results} matches" + + return result + + except Exception as e: + return f"Error searching files: {str(e)}" + + @tool + async def agrep_files( + self, + text: str, + directory: str = ".", + file_pattern: str = "*", + case_sensitive: bool = False, + is_regex: bool = False, + max_results: int = 100, + context_lines: int = 0, + ) -> str: + """Async version of grep_files.""" + return await asyncio.get_event_loop().run_in_executor( + None, lambda: self.grep_files( + text, directory, file_pattern, case_sensitive, is_regex, max_results, context_lines + ) + ) + + @tool + def move_file( + self, + source: str, + destination: str, + ) -> str: + """ + Move or rename a file or directory. 
+ + Args: + source: Source file/directory path + destination: Destination path + + Returns: + Confirmation message + + Example: + move_file("old_name.py", "new_name.py") # Rename + move_file("file.py", "src/file.py") # Move + """ + try: + src = self._validate_path(source) + dst = self._validate_path(destination) + + if not src.exists(): + return f"Error: Source not found: {source}" + + dst.parent.mkdir(parents=True, exist_ok=True) + + shutil.move(str(src), str(dst)) + + if str(src) in self._read_files: + self._read_files.discard(str(src)) + self._read_files.add(str(dst)) + + return f"✅ Moved: {self._get_relative_path(src)} → {self._get_relative_path(dst)}" + + except PermissionError: + return "Error: Permission denied" + except Exception as e: + return f"Error moving: {str(e)}" + + @tool + async def amove_file(self, source: str, destination: str) -> str: + """Async version of move_file.""" + return await asyncio.get_event_loop().run_in_executor( + None, lambda: self.move_file(source, destination) + ) + + @tool + def copy_file( + self, + source: str, + destination: str, + ) -> str: + """ + Copy a file or directory. 
+ + Args: + source: Source file/directory path + destination: Destination path + + Returns: + Confirmation message + """ + try: + src = self._validate_path(source) + dst = self._validate_path(destination) + + if not src.exists(): + return f"Error: Source not found: {source}" + + dst.parent.mkdir(parents=True, exist_ok=True) + + if src.is_dir(): + shutil.copytree(str(src), str(dst)) + return f"✅ Copied directory: {self._get_relative_path(src)} → {self._get_relative_path(dst)}" + else: + shutil.copy2(str(src), str(dst)) + return f"✅ Copied: {self._get_relative_path(src)} → {self._get_relative_path(dst)}" + + except PermissionError: + return "Error: Permission denied" + except Exception as e: + return f"Error copying: {str(e)}" + + @tool + async def acopy_file(self, source: str, destination: str) -> str: + """Async version of copy_file.""" + return await asyncio.get_event_loop().run_in_executor( + None, lambda: self.copy_file(source, destination) + ) + + @tool + def delete_file( + self, + path: str, + recursive: bool = False, + ) -> str: + """ + Delete a file or directory. + + CAUTION: This operation is irreversible! + + Args: + path: File or directory path to delete + recursive: If True, delete directories with contents. Required for non-empty directories. + + Returns: + Confirmation message + """ + try: + resolved = self._validate_path(path) + + if not resolved.exists(): + return f"Error: Path not found: {path}" + + if resolved.is_dir(): + if recursive: + shutil.rmtree(str(resolved)) + return f"✅ Deleted directory and contents: {self._get_relative_path(resolved)}" + else: + try: + resolved.rmdir() + return f"✅ Deleted empty directory: {self._get_relative_path(resolved)}" + except OSError: + return "Error: Directory not empty. Use recursive=True to delete with contents." 
+ else: + resolved.unlink() + self._read_files.discard(str(resolved)) + return f"✅ Deleted: {self._get_relative_path(resolved)}" + + except PermissionError: + return "Error: Permission denied" + except Exception as e: + return f"Error deleting: {str(e)}" + + @tool + async def adelete_file(self, path: str, recursive: bool = False) -> str: + """Async version of delete_file.""" + return await asyncio.get_event_loop().run_in_executor( + None, lambda: self.delete_file(path, recursive) + ) + + @tool + def file_info(self, path: str) -> str: + """ + Get detailed information about a file or directory. + + Args: + path: File or directory path + + Returns: + Detailed metadata including size, permissions, timestamps + """ + try: + resolved = self._validate_path(path) + + if not resolved.exists(): + return f"Error: Path not found: {path}" + + stat = resolved.stat() + + info: List[str] = [f"Path: {self._get_relative_path(resolved)}"] + info.append(f"Type: {'Directory' if resolved.is_dir() else 'File'}") + + if resolved.is_file(): + size = stat.st_size + if size < 1024: + size_str = f"{size} bytes" + elif size < 1024 * 1024: + size_str = f"{size / 1024:.1f} KB" + else: + size_str = f"{size / (1024 * 1024):.1f} MB" + info.append(f"Size: {size_str}") + info.append(f"Lines: {len(resolved.read_text(errors='ignore').split(chr(10)))}") + + import datetime + mtime = datetime.datetime.fromtimestamp(stat.st_mtime) + ctime = datetime.datetime.fromtimestamp(stat.st_ctime) + + info.append(f"Modified: {mtime.isoformat()}") + info.append(f"Created: {ctime.isoformat()}") + info.append(f"Permissions: {oct(stat.st_mode)[-3:]}") + + if resolved.is_dir(): + try: + contents = list(resolved.iterdir()) + dirs = sum(1 for c in contents if c.is_dir()) + files = len(contents) - dirs + info.append(f"Contents: {dirs} directories, {files} files") + except PermissionError: + info.append("Contents: Permission denied") + + return "\n".join(info) + + except Exception as e: + return f"Error getting file info: 
{str(e)}" + + @tool + async def afile_info(self, path: str) -> str: + """Async version of file_info.""" + return await asyncio.get_event_loop().run_in_executor( + None, lambda: self.file_info(path) + ) + + @tool + def create_directory(self, path: str) -> str: + """ + Create a directory (including parent directories). + + Args: + path: Directory path to create + + Returns: + Confirmation message + """ + try: + resolved = self._validate_path(path) + + if resolved.exists(): + if resolved.is_dir(): + return f"Directory already exists: {self._get_relative_path(resolved)}" + else: + return f"Error: Path exists as a file: {path}" + + resolved.mkdir(parents=True, exist_ok=True) + return f"✅ Created directory: {self._get_relative_path(resolved)}" + + except PermissionError: + return "Error: Permission denied" + except Exception as e: + return f"Error creating directory: {str(e)}" + + @tool + async def acreate_directory(self, path: str) -> str: + """Async version of create_directory.""" + return await asyncio.get_event_loop().run_in_executor( + None, lambda: self.create_directory(path) + ) + + def reset_read_tracking(self) -> None: + """Reset the read file tracking for edit_file enforcement.""" + self._read_files.clear() + + def get_read_files(self) -> Set[str]: + """Get the set of files that have been read.""" + return self._read_files.copy()
src/upsonic/agent/autonomous_agent/__init__.py+49 −0 added@@ -0,0 +1,49 @@ +""" +Autonomous Agent module for the Upsonic AI Agent Framework. + +This module provides a pre-configured agent with: +- Default InMemoryStorage for session persistence +- Built-in filesystem tools for file operations +- Built-in shell command execution tools +- Memory management out of the box +""" +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from .autonomous_agent import AutonomousAgent + from .filesystem_toolkit import AutonomousFilesystemToolKit + from .shell_toolkit import AutonomousShellToolKit + + +def _get_classes() -> dict[str, Any]: + """Lazy import of autonomous agent classes.""" + from .autonomous_agent import AutonomousAgent + from .filesystem_toolkit import AutonomousFilesystemToolKit + from .shell_toolkit import AutonomousShellToolKit + + return { + "AutonomousAgent": AutonomousAgent, + "AutonomousFilesystemToolKit": AutonomousFilesystemToolKit, + "AutonomousShellToolKit": AutonomousShellToolKit, + } + + +def __getattr__(name: str) -> Any: + """Lazy loading of autonomous agent classes.""" + classes = _get_classes() + if name in classes: + return classes[name] + + raise AttributeError( + f"module '{__name__}' has no attribute '{name}'. " + f"Available: {list(classes.keys())}" + ) + + +__all__ = [ + "AutonomousAgent", + "AutonomousFilesystemToolKit", + "AutonomousShellToolKit", +]
src/upsonic/agent/autonomous_agent/shell_toolkit.py+338 −0 added@@ -0,0 +1,338 @@ +""" +Autonomous Agent Shell Toolkit - Command execution operations. + +Provides production-ready tools for shell command execution with: +- Timeout support +- Working directory enforcement +- Environment variable management +- Output capturing and formatting +""" +from __future__ import annotations + +import asyncio +import os +import subprocess +from pathlib import Path +from typing import Dict, List, Optional + +from upsonic.tools import ToolKit, tool + + +class AutonomousShellToolKit(ToolKit): + """ + Shell command execution toolkit for AutonomousAgent. + + Provides safe command execution with: + - Timeout support + - Working directory enforcement + - Environment variable management + - Output capturing and formatting + - Security command blocking + + Features: + - Workspace sandboxing (commands run in workspace directory) + - Configurable timeouts to prevent hanging + - Combined stdout/stderr capture + - Exit code reporting + - Output truncation for large results + + Usage: + ```python + from upsonic import AutonomousAgent + + agent = AutonomousAgent(workspace="/path/to/project") + result = agent.do("Run the test suite") + ``` + """ + + def __init__( + self, + workspace: Path, + default_timeout: int = 120, + max_output_length: int = 10000, + allowed_commands: Optional[List[str]] = None, + blocked_commands: Optional[List[str]] = None, + ) -> None: + """ + Initialize shell toolkit. 
+ + Args: + workspace: Working directory for command execution + default_timeout: Default command timeout in seconds + max_output_length: Maximum output length before truncation + allowed_commands: If set, only these commands are allowed (whitelist) + blocked_commands: Commands that are blocked (blacklist) + """ + super().__init__() + self.workspace: Path = Path(workspace).resolve() + self.default_timeout: int = default_timeout + self.max_output_length: int = max_output_length + self.allowed_commands: Optional[List[str]] = allowed_commands + self.blocked_commands: List[str] = blocked_commands or [ + "rm -rf /", + "rm -rf /*", + ":(){:|:&};:", + "mkfs", + "dd if=/dev/zero", + ] + + def _validate_command(self, command: str) -> Optional[str]: + """ + Validate command against security rules. + + Returns: + Error message if command is blocked, None if allowed + """ + cmd_lower = command.lower().strip() + + for blocked in self.blocked_commands: + if blocked.lower() in cmd_lower: + return f"Command blocked for security: contains '{blocked}'" + + if self.allowed_commands is not None: + cmd_parts = command.split() + if cmd_parts: + base_cmd = cmd_parts[0] + if base_cmd not in self.allowed_commands: + return f"Command not in allowed list: '{base_cmd}'" + + return None + + @tool + def run_command( + self, + command: str, + timeout: Optional[int] = None, + env: Optional[Dict[str, str]] = None, + shell: bool = True, + ) -> str: + """ + Execute a shell command in the workspace directory. + + Commands are executed with the workspace as the working directory. + Output is captured and returned along with the exit code. + + Args: + command: Shell command to execute + timeout: Timeout in seconds (defaults to 120). Set to None for no timeout. + env: Additional environment variables to set + shell: If True, run through shell (allows pipes, etc.) 
+ + Returns: + Command output (stdout + stderr) and exit code + + Example: + run_command("python --version") + run_command("pip install -r requirements.txt") + run_command("ls -la") + """ + validation_error = self._validate_command(command) + if validation_error: + return f"Error: {validation_error}" + + effective_timeout = timeout if timeout is not None else self.default_timeout + + environment = os.environ.copy() + if env: + environment.update(env) + + try: + result = subprocess.run( + command, + shell=shell, + cwd=str(self.workspace), + capture_output=True, + text=True, + timeout=effective_timeout, + env=environment, + ) + + output_parts: List[str] = [] + + if result.stdout: + stdout = result.stdout + if len(stdout) > self.max_output_length: + stdout = stdout[:self.max_output_length] + f"\n... [truncated, {len(result.stdout)} total chars]" + output_parts.append(f"STDOUT:\n{stdout}") + + if result.stderr: + stderr = result.stderr + if len(stderr) > self.max_output_length: + stderr = stderr[:self.max_output_length] + f"\n... [truncated, {len(result.stderr)} total chars]" + output_parts.append(f"STDERR:\n{stderr}") + + if not output_parts: + output_parts.append("(no output)") + + exit_indicator = "✅" if result.returncode == 0 else "❌" + output_parts.append(f"\n{exit_indicator} Exit code: {result.returncode}") + + return "\n".join(output_parts) + + except subprocess.TimeoutExpired: + return f"❌ Error: Command timed out after {effective_timeout} seconds" + except FileNotFoundError as e: + return f"❌ Error: Command not found: {e}" + except PermissionError: + return "❌ Error: Permission denied" + except Exception as e: + return f"❌ Error running command: {str(e)}" + + @tool + async def arun_command( + self, + command: str, + timeout: Optional[int] = None, + env: Optional[Dict[str, str]] = None, + shell: bool = True, + ) -> str: + """ + Async version of run_command. + + Execute a shell command asynchronously. 
+ + Args: + command: Shell command to execute + timeout: Timeout in seconds + env: Additional environment variables + shell: Run through shell + + Returns: + Command output and exit code + """ + validation_error = self._validate_command(command) + if validation_error: + return f"Error: {validation_error}" + + effective_timeout = timeout if timeout is not None else self.default_timeout + + environment = os.environ.copy() + if env: + environment.update(env) + + try: + if shell: + proc = await asyncio.create_subprocess_shell( + command, + cwd=str(self.workspace), + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + env=environment, + ) + else: + proc = await asyncio.create_subprocess_exec( + *command.split(), + cwd=str(self.workspace), + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + env=environment, + ) + + try: + stdout_bytes, stderr_bytes = await asyncio.wait_for( + proc.communicate(), + timeout=effective_timeout + ) + except asyncio.TimeoutError: + proc.kill() + await proc.wait() + return f"❌ Error: Command timed out after {effective_timeout} seconds" + + stdout = stdout_bytes.decode("utf-8", errors="replace") if stdout_bytes else "" + stderr = stderr_bytes.decode("utf-8", errors="replace") if stderr_bytes else "" + + output_parts: List[str] = [] + + if stdout: + if len(stdout) > self.max_output_length: + stdout = stdout[:self.max_output_length] + "\n... [truncated]" + output_parts.append(f"STDOUT:\n{stdout}") + + if stderr: + if len(stderr) > self.max_output_length: + stderr = stderr[:self.max_output_length] + "\n... 
[truncated]" + output_parts.append(f"STDERR:\n{stderr}") + + if not output_parts: + output_parts.append("(no output)") + + exit_code = proc.returncode if proc.returncode is not None else -1 + exit_indicator = "✅" if exit_code == 0 else "❌" + output_parts.append(f"\n{exit_indicator} Exit code: {exit_code}") + + return "\n".join(output_parts) + + except FileNotFoundError as e: + return f"❌ Error: Command not found: {e}" + except PermissionError: + return "❌ Error: Permission denied" + except Exception as e: + return f"❌ Error running command: {str(e)}" + + @tool + def run_python( + self, + code: str, + timeout: Optional[int] = None, + ) -> str: + """ + Execute Python code directly. + + Runs the provided Python code using the Python interpreter. + Useful for quick computations, testing snippets, or running scripts. + + Args: + code: Python code to execute + timeout: Timeout in seconds + + Returns: + Code output and exit status + + Example: + run_python("print(2 + 2)") + run_python("import sys; print(sys.version)") + """ + escaped_code = code.replace("'", "'\"'\"'") + command = f"python3 -c '{escaped_code}'" + return self.run_command(command, timeout=timeout) + + @tool + async def arun_python( + self, + code: str, + timeout: Optional[int] = None, + ) -> str: + """Async version of run_python.""" + escaped_code = code.replace("'", "'\"'\"'") + command = f"python3 -c '{escaped_code}'" + return await self.arun_command(command, timeout=timeout) + + @tool + def check_command_exists(self, command: str) -> str: + """ + Check if a command is available in the system. 
+ + Args: + command: Command name to check + + Returns: + Information about the command availability + """ + result = subprocess.run( + f"which {command}", + shell=True, + capture_output=True, + text=True, + ) + + if result.returncode == 0: + return f"✅ Command '{command}' is available at: {result.stdout.strip()}" + else: + return f"❌ Command '{command}' is not available" + + @tool + async def acheck_command_exists(self, command: str) -> str: + """Async version of check_command_exists.""" + return await asyncio.get_event_loop().run_in_executor( + None, lambda: self.check_command_exists(command) + )
src/upsonic/agent/context_managers/context_management_middleware.py+510 −0 added@@ -0,0 +1,510 @@ +from __future__ import annotations + +import json +from typing import TYPE_CHECKING, Any, List, Literal, Optional, Union + +from pydantic import BaseModel, Field + +if TYPE_CHECKING: + from upsonic.messages.messages import ModelMessage, ModelResponse + from upsonic.models import Model + + +CONTEXT_FULL_MESSAGE: str = ( + "[SYSTEM] The conversation context window has been exceeded. " + "I am unable to process further messages in this session. " + "Please start a new conversation or reduce the context size." +) + +DEFAULT_KEEP_RECENT_COUNT: int = 5 + + +class SummarizedRequestPart(BaseModel): + """A single part inside a summarized ModelRequest.""" + part_kind: Literal["system-prompt", "user-prompt", "tool-return"] = Field( + description="The type of part: 'system-prompt', 'user-prompt', or 'tool-return'." + ) + content: str = Field( + description="The text content for this part." + ) + tool_name: Optional[str] = Field( + default=None, + description="Tool name (required when part_kind is 'tool-return')." + ) + tool_call_id: Optional[str] = Field( + default=None, + description="Tool call ID linking a tool-return to its tool-call (required when part_kind is 'tool-return')." + ) + + +class SummarizedResponsePart(BaseModel): + """A single part inside a summarized ModelResponse.""" + part_kind: Literal["text", "tool-call"] = Field( + description="The type of part: 'text' for assistant text, 'tool-call' for a tool invocation." + ) + content: Optional[str] = Field( + default=None, + description="The text content (required when part_kind is 'text')." + ) + tool_name: Optional[str] = Field( + default=None, + description="Tool name (required when part_kind is 'tool-call')." + ) + tool_call_id: Optional[str] = Field( + default=None, + description="Tool call identifier (required when part_kind is 'tool-call')." 
+ ) + args: Optional[str] = Field( + default=None, + description="Tool call arguments as a JSON string (required when part_kind is 'tool-call')." + ) + + +class SummarizedRequest(BaseModel): + """A summarized ModelRequest (user/system → model).""" + kind: Literal["request"] = "request" + parts: List[SummarizedRequestPart] = Field( + description="Ordered list of parts in this request." + ) + + +class SummarizedResponse(BaseModel): + """A summarized ModelResponse (model → user).""" + kind: Literal["response"] = "response" + parts: List[SummarizedResponsePart] = Field( + description="Ordered list of parts in this response." + ) + + +class ConversationSummary(BaseModel): + """The complete summarized conversation returned by the LLM.""" + messages: List[Union[SummarizedRequest, SummarizedResponse]] = Field( + description=( + "Ordered list of summarized messages. " + "Must alternate between 'request' and 'response' kinds. " + "The first message must be a 'request'." + ) + ) + + + +SUMMARY_SYSTEM_PROMPT: str = """\ +You are a conversation summarizer. You will receive a structured conversation \ +and must return a CONDENSED version of it as valid JSON matching the schema below. + +RULES: +1. Preserve the request/response alternation order. +2. Merge multiple user messages into fewer user messages when possible. +3. Merge multiple assistant text responses into fewer responses when possible. +4. For tool calls and their results: keep only the MOST IMPORTANT ones. \ + Summarize or drop tool calls that are redundant or whose results were not used. +5. Condense long text content into concise summaries while preserving key facts, \ + decisions, and outcomes. +6. Keep system prompts intact — do NOT summarize them. +7. Every tool-return MUST have a matching tool-call with the same tool_call_id \ + in a preceding response. +8. Return ONLY the JSON object. No markdown fences, no extra text. 
+ +JSON SCHEMA: +{schema} +""" + + + +class ContextManagementMiddleware: + """Middleware that manages context window overflow for agent conversations. + + When the message history exceeds the model's maximum context window, + this middleware applies a series of strategies in order: + + 1. Prune old tool call/return pairs, keeping only the last ``keep_recent_count``. + 2. Summarize old messages via the LLM into properly structured + ModelRequest / ModelResponse objects. + 3. If the context is still full after all strategies, inject a fixed + "context full" response and stop further processing. + """ + + def __init__( + self, + model: "Model", + keep_recent_count: int = DEFAULT_KEEP_RECENT_COUNT, + safety_margin_ratio: float = 0.90, + ) -> None: + """ + Args: + model: The Model instance, used for token counting and context window lookup. + keep_recent_count: Number of recent tool-call events / messages + to preserve when pruning or summarizing (default 5). + safety_margin_ratio: Use this fraction of the max context window as + the effective limit (default 0.90 = 90%). + """ + self.model = model + self.keep_recent_count: int = keep_recent_count + self.safety_margin_ratio: float = safety_margin_ratio + + def _get_max_context_window(self) -> Optional[int]: + """Get the max context window for the current model.""" + from upsonic.utils.usage import get_model_context_window + + model_name: str = self.model.model_name + return get_model_context_window(model_name) + + def _estimate_message_tokens(self, messages: List["ModelMessage"]) -> int: + """Estimate the total token count of the conversation. + + The ``messages`` list may span multiple agent runs. Each + ``ModelResponse`` carries a ``usage`` field (``RequestUsage``) + whose ``input_tokens`` reflects the input context sent to the + model for *that particular turn*, and ``output_tokens`` reflects + the tokens the model generated in that turn. 
+ + Because the list can contain responses from different runs, + we accumulate **all** ``input_tokens`` and **all** + ``output_tokens`` across every ``ModelResponse`` to get the + total token footprint of the conversation. + + Falls back to a character-based heuristic (~4 chars/token) when no + ``ModelResponse`` with usage data exists in the message list. + """ + from upsonic.messages import ModelResponse + + total_input_tokens: int = 0 + total_output_tokens: int = 0 + has_usage: bool = False + + for message in messages: + if isinstance(message, ModelResponse) and hasattr(message, 'usage'): + usage = message.usage + if usage.input_tokens > 0 or usage.output_tokens > 0: + has_usage = True + total_input_tokens += usage.input_tokens + total_output_tokens += usage.output_tokens + + if has_usage: + return total_input_tokens + total_output_tokens + + # Fallback: character-based heuristic when no usage data is available + total_chars: int = 0 + for message in messages: + if hasattr(message, 'parts'): + for part in message.parts: + if hasattr(part, 'content'): + content = part.content + if isinstance(content, str): + total_chars += len(content) + elif isinstance(content, (dict, list)): + total_chars += len(json.dumps(content, default=str)) + else: + total_chars += len(str(content)) + if hasattr(part, 'tool_name'): + total_chars += len(str(getattr(part, 'tool_name', ''))) + if hasattr(part, 'args'): + args = getattr(part, 'args', '') + if isinstance(args, str): + total_chars += len(args) + elif isinstance(args, dict): + total_chars += len(json.dumps(args, default=str)) + return total_chars // 4 + + def _is_context_exceeded(self, messages: List["ModelMessage"]) -> bool: + """Check if the current messages exceed the model's context window.""" + max_window: Optional[int] = self._get_max_context_window() + if max_window is None: + return False + + effective_limit: int = int(max_window * self.safety_margin_ratio) + estimated_tokens: int = self._estimate_message_tokens(messages) 
+ return estimated_tokens > effective_limit + + + def _prune_tool_call_history( + self, + messages: List["ModelMessage"], + ) -> List["ModelMessage"]: + """Remove old tool call/return pairs, keeping only the most recent ones. + + Args: + messages: The full message list. + + Returns: + A new list of messages with old tool call history pruned. + """ + from upsonic.messages import ToolCallPart, ToolReturnPart + + tool_related_indices: List[int] = [] + for i, msg in enumerate(messages): + if not hasattr(msg, 'parts'): + continue + for part in msg.parts: + if isinstance(part, (ToolCallPart, ToolReturnPart)): + tool_related_indices.append(i) + break + + if len(tool_related_indices) <= self.keep_recent_count: + return list(messages) + + indices_to_remove: set[int] = set(tool_related_indices[:-self.keep_recent_count]) + return [msg for i, msg in enumerate(messages) if i not in indices_to_remove] + + + def _serialize_messages_for_prompt( + self, + messages: List["ModelMessage"], + ) -> str: + """Serialize a list of ModelMessage objects into a human-readable + structured text representation for the LLM prompt. 
+ """ + from upsonic.messages import ( + ModelRequest, + ModelResponse, + SystemPromptPart, + TextPart, + ToolCallPart, + ToolReturnPart, + UserPromptPart, + ) + + lines: List[str] = [] + for idx, msg in enumerate(messages): + if isinstance(msg, ModelRequest): + lines.append(f"MESSAGE {idx + 1} [REQUEST]:") + for p_idx, part in enumerate(msg.parts): + prefix = f" Part {p_idx + 1}" + if isinstance(part, SystemPromptPart): + lines.append(f"{prefix} [system-prompt]: {part.content}") + elif isinstance(part, UserPromptPart): + content_str = part.content if isinstance(part.content, str) else str(part.content) + lines.append(f"{prefix} [user-prompt]: {content_str}") + elif isinstance(part, ToolReturnPart): + content_str = part.content if isinstance(part.content, str) else json.dumps(part.content, default=str) + lines.append( + f"{prefix} [tool-return] tool_name={part.tool_name} " + f"tool_call_id={part.tool_call_id}: {content_str}" + ) + else: + lines.append(f"{prefix} [unknown-request-part]: {part}") + elif isinstance(msg, ModelResponse): + lines.append(f"MESSAGE {idx + 1} [RESPONSE]:") + for p_idx, part in enumerate(msg.parts): + prefix = f" Part {p_idx + 1}" + if isinstance(part, TextPart): + lines.append(f"{prefix} [text]: {part.content}") + elif isinstance(part, ToolCallPart): + args_str = part.args if isinstance(part.args, str) else json.dumps(part.args, default=str) + lines.append( + f"{prefix} [tool-call] tool_name={part.tool_name} " + f"tool_call_id={part.tool_call_id} args={args_str}" + ) + else: + lines.append(f"{prefix} [other-response-part]: {part}") + else: + lines.append(f"MESSAGE {idx + 1} [UNKNOWN]: {msg}") + + return "\n".join(lines) + + def _reconstruct_messages( + self, + summary: ConversationSummary, + ) -> List["ModelMessage"]: + """Reconstruct proper ModelRequest / ModelResponse objects from + the Pydantic ``ConversationSummary`` returned by the LLM. 
+ """ + from upsonic.messages import ( + ModelRequest, + ModelResponse, + SystemPromptPart, + TextPart, + ToolCallPart, + ToolReturnPart, + UserPromptPart, + ) + from upsonic._utils import now_utc + + reconstructed: List["ModelMessage"] = [] + + for msg in summary.messages: + if msg.kind == "request": + parts: List[Any] = [] + for p in msg.parts: + if p.part_kind == "system-prompt": + parts.append(SystemPromptPart(content=p.content)) + elif p.part_kind == "user-prompt": + parts.append(UserPromptPart(content=p.content)) + elif p.part_kind == "tool-return": + parts.append(ToolReturnPart( + tool_name=p.tool_name or "", + content=p.content, + tool_call_id=p.tool_call_id or "", + )) + if parts: + reconstructed.append(ModelRequest(parts=parts)) + + elif msg.kind == "response": + parts_resp: List[Any] = [] + for p in msg.parts: + if p.part_kind == "text": + parts_resp.append(TextPart(content=p.content or "")) + elif p.part_kind == "tool-call": + parts_resp.append(ToolCallPart( + tool_name=p.tool_name or "", + args=p.args, + tool_call_id=p.tool_call_id or "", + )) + if parts_resp: + reconstructed.append(ModelResponse( + parts=parts_resp, + model_name=self.model.model_name, + timestamp=now_utc(), + )) + + return reconstructed + + async def _summarize_old_messages( + self, + messages: List["ModelMessage"], + ) -> List["ModelMessage"]: + """Summarize old messages via the LLM into structured ModelRequest / + ModelResponse objects, keeping the last ``self.keep_recent_count`` + messages verbatim. + + Args: + messages: The full message list. + + Returns: + A new list with old messages replaced by LLM-summarized messages. 
+ """ + from upsonic.messages import ( + ModelRequest, + SystemPromptPart, + UserPromptPart, + ) + + if len(messages) <= self.keep_recent_count: + return list(messages) + + # Separate system prompt (first message if it contains SystemPromptPart) + system_messages: List["ModelMessage"] = [] + non_system_messages: List["ModelMessage"] = [] + + for i, msg in enumerate(messages): + if i == 0 and isinstance(msg, ModelRequest): + if any(isinstance(p, SystemPromptPart) for p in msg.parts): + system_messages.append(msg) + continue + non_system_messages.append(msg) + + if len(non_system_messages) <= self.keep_recent_count: + return list(messages) + + old_messages: List["ModelMessage"] = non_system_messages[:-self.keep_recent_count] + recent_messages: List["ModelMessage"] = non_system_messages[-self.keep_recent_count:] + + serialized_conversation: str = self._serialize_messages_for_prompt(old_messages) + + if not serialized_conversation.strip(): + return system_messages + recent_messages + + schema_json: str = json.dumps( + ConversationSummary.model_json_schema(), indent=2 + ) + system_instruction: str = SUMMARY_SYSTEM_PROMPT.format(schema=schema_json) + + summary_prompt: str = ( + f"<conversation>\n{serialized_conversation}\n</conversation>" + ) + + from upsonic.models import ModelRequestParameters + + request_msg = ModelRequest(parts=[ + SystemPromptPart(content=system_instruction), + UserPromptPart(content=summary_prompt), + ]) + model_params = ModelRequestParameters( + function_tools=[], + allow_text_output=True, + output_tools=[], + ) + + from upsonic.messages import TextPart + + llm_response: "ModelResponse" = await self.model.request( + messages=[request_msg], + model_settings=self.model.settings, + model_request_parameters=model_params, + ) + + raw_text: str = "" + for part in llm_response.parts: + if isinstance(part, TextPart): + raw_text += part.content + + raw_text = raw_text.strip() + # Strip markdown code fences if the LLM wrapped the JSON + if 
raw_text.startswith("```"): + first_newline = raw_text.find("\n") + if first_newline != -1: + raw_text = raw_text[first_newline + 1:] + if raw_text.endswith("```"): + raw_text = raw_text[:-3].strip() + + summary: ConversationSummary = ConversationSummary.model_validate_json(raw_text) + summarized_messages: List["ModelMessage"] = self._reconstruct_messages(summary) + + if not summarized_messages: + return system_messages + recent_messages + + return system_messages + summarized_messages + recent_messages + + + + def _build_context_full_response( + self, + model_name: Optional[str] = None, + ) -> "ModelResponse": + """Build a fixed ModelResponse indicating the context window is full.""" + from upsonic._utils import now_utc + from upsonic.messages import ModelResponse, TextPart + + return ModelResponse( + parts=[TextPart(content=CONTEXT_FULL_MESSAGE)], + model_name=model_name, + timestamp=now_utc(), + finish_reason="length", + ) + + async def apply( + self, + messages: List["ModelMessage"], + ) -> tuple[List["ModelMessage"], bool]: + """Apply context management strategies to messages. + + Checks if the context window is exceeded and applies strategies in order: + 1. Prune old tool call history (keep last ``self.keep_recent_count``). + 2. Summarize old messages via LLM (keep last ``self.keep_recent_count`` verbatim). + 3. If still exceeded, return a context_full flag. + + Args: + messages: The current message list (will NOT be mutated). + + Returns: + A tuple of (processed_messages, context_full). + If context_full is True, the caller should stop processing and + return a context-full response. 
+ """ + if not self._is_context_exceeded(messages): + return list(messages), False + + # Step 1: Prune tool call history + pruned: List["ModelMessage"] = self._prune_tool_call_history(messages) + + if not self._is_context_exceeded(pruned): + return pruned, False + + # Step 2: Summarize old messages via LLM + summarized: List["ModelMessage"] = await self._summarize_old_messages(pruned) + + if not self._is_context_exceeded(summarized): + return summarized, False + + # Step 3: Context is still full — signal to caller + return summarized, True
src/upsonic/agent/context_managers/__init__.py+4 −0 modified@@ -9,6 +9,7 @@ from .system_prompt_manager import SystemPromptManager from .context_manager import ContextManager from .memory_manager import MemoryManager + from .context_management_middleware import ContextManagementMiddleware def _get_context_manager_classes(): """Lazy import of context manager classes.""" @@ -19,6 +20,7 @@ def _get_context_manager_classes(): from .system_prompt_manager import SystemPromptManager from .context_manager import ContextManager from .memory_manager import MemoryManager + from .context_management_middleware import ContextManagementMiddleware return { 'CallManager': CallManager, @@ -28,6 +30,7 @@ def _get_context_manager_classes(): 'SystemPromptManager': SystemPromptManager, 'ContextManager': ContextManager, 'MemoryManager': MemoryManager, + 'ContextManagementMiddleware': ContextManagementMiddleware, } def __getattr__(name: str) -> Any: @@ -49,4 +52,5 @@ def __getattr__(name: str) -> Any: 'ReliabilityManager', 'MemoryManager', 'LLMManager', + 'ContextManagementMiddleware', ]
src/upsonic/agent/context_managers/system_prompt_manager.py+4 −0 modified@@ -224,6 +224,10 @@ def _build_system_prompt( if culture_formatted: has_culture = True + # Add workspace Agents.md content if available (high priority, before other prompts) + if hasattr(self.agent, '_workspace_agents_md_content') and self.agent._workspace_agents_md_content: + prompt_parts.append(f"<AgentConfiguration>\n{self.agent._workspace_agents_md_content}\n</AgentConfiguration>") + if self.agent.system_prompt is not None: base_prompt = self.agent.system_prompt
src/upsonic/agent/__init__.py+30 −0 modified@@ -8,6 +8,11 @@ from .agent import Agent from .base import BaseAgent from .deepagent import DeepAgent + from .autonomous_agent import ( + AutonomousAgent, + AutonomousFilesystemToolKit, + AutonomousShellToolKit, + ) from upsonic.run.events.events import ( AgentEvent, PipelineStartEvent, @@ -57,6 +62,21 @@ def _get_agent_classes(): 'BaseAgent': BaseAgent, 'DeepAgent': DeepAgent, } + + +def _get_autonomous_agent_classes(): + """Lazy import of autonomous agent classes.""" + from .autonomous_agent import ( + AutonomousAgent, + AutonomousFilesystemToolKit, + AutonomousShellToolKit, + ) + + return { + 'AutonomousAgent': AutonomousAgent, + 'AutonomousFilesystemToolKit': AutonomousFilesystemToolKit, + 'AutonomousShellToolKit': AutonomousShellToolKit, + } def _get_event_classes(): """Lazy import of event classes.""" from upsonic.run.events.events import ( @@ -143,6 +163,11 @@ def __getattr__(name: str) -> Any: if name in agent_classes: return agent_classes[name] + # Autonomous agent classes + autonomous_classes = _get_autonomous_agent_classes() + if name in autonomous_classes: + return autonomous_classes[name] + # Event classes event_classes = _get_event_classes() if name in event_classes: @@ -159,6 +184,11 @@ def __getattr__(name: str) -> Any: 'BaseAgent', 'DeepAgent', + # Autonomous agent classes + 'AutonomousAgent', + 'AutonomousFilesystemToolKit', + 'AutonomousShellToolKit', + # Base event 'AgentEvent',
src/upsonic/agent/pipeline/steps.py+16 −2 modified@@ -861,13 +861,17 @@ async def execute(self, context: "AgentRunOutput", task: "Task", agent: "Agent", context.start_new_run() # Now build the full messages including the new request - messages = await agent._build_model_request( + messages, context_full_response = await agent._build_model_request( task, memory_manager, None, ) context.chat_history = messages - + + if context_full_response is not None: + context.response = context_full_response + context._context_window_full = True + if agent.debug and agent.debug_level >= 2: from upsonic.utils.printing import debug_log_level2 from upsonic.utils.messages import analyze_model_request_messages @@ -983,6 +987,16 @@ async def execute(self, context: "AgentRunOutput", task: "Task", agent: "Agent", execution_time=time.time() - start_time, ) return step_result + if getattr(context, '_context_window_full', False): + context.chat_history.append(context.response) + step_result = StepResult( + name=self.name, + step_number=step_number, + status=StepStatus.COMPLETED, + message="Skipped due to context window full", + execution_time=time.time() - start_time, + ) + return step_result if context.is_streaming: has_tools = bool(agent.tools or (task and task.tools))
src/upsonic/chat/cost_calculator.py+1 −14 modified@@ -17,7 +17,6 @@ get_estimated_cost_from_usage, get_estimated_cost_from_run_output, get_model_name, - get_model_pricing ) @@ -126,19 +125,7 @@ def extract_cost_from_string(cost_string: str) -> float: return float(cost_string) except (ValueError, TypeError): return 0.0 - - @staticmethod - def get_pricing(model_name: str) -> Dict[str, float]: - """ - Get pricing information for a model. - - Args: - model_name: Name of the model - - Returns: - Dictionary with pricing information - """ - return get_model_pricing(model_name) + @staticmethod def get_name(model: Optional[Union["Model", str]]) -> str:
src/upsonic/__init__.py+6 −0 modified@@ -84,6 +84,9 @@ def _get_Simulation(): def _get_RalphLoop(): return _lazy_import("upsonic.ralph.loop", "RalphLoop")() +def _get_AutonomousAgent(): + return _lazy_import("upsonic.agent.autonomous_agent.autonomous_agent", "AutonomousAgent")() + def hello() -> str: return "Hello from upsonic!" @@ -113,6 +116,8 @@ def __getattr__(name: str) -> Any: return _get_Simulation() elif name == "RalphLoop": return _get_RalphLoop() + elif name == "AutonomousAgent": + return _get_AutonomousAgent() # All other imports must come from sub-modules raise AttributeError( @@ -126,6 +131,7 @@ def __getattr__(name: str) -> Any: "Task", "KnowledgeBase", "Agent", + "AutonomousAgent", "Graph", "Team", "Chat",
src/upsonic/interfaces/slack/slack.py+15 −4 modified@@ -359,10 +359,21 @@ async def _handle_reset_command( # Send confirmation if was_reset: - reply_text = ( - "✅ Your conversation has been reset. " - "I'm ready to start fresh! How can I help you?" - ) + # Check if agent has workspace and execute greeting + if self.agent.workspace: + greeting_result = await self.agent.execute_workspace_greeting_async() + if greeting_result: + reply_text = str(greeting_result) + else: + reply_text = ( + "✅ Your conversation has been reset. " + "I'm ready to start fresh! How can I help you?" + ) + else: + reply_text = ( + "✅ Your conversation has been reset. " + "I'm ready to start fresh! How can I help you?" + ) else: reply_text = ( "No active conversation found to reset. "
src/upsonic/interfaces/telegram/telegram.py+9 −1 modified@@ -440,7 +440,15 @@ async def _handle_reset_command( was_reset = await self.areset_chat_session(str(user_id)) if was_reset: - reply_text = "✅ Your conversation has been reset. I'm ready to start fresh!" + # Check if agent has workspace and execute greeting + if self.agent.workspace: + greeting_result = await self.agent.execute_workspace_greeting_async() + if greeting_result: + reply_text = str(greeting_result) + else: + reply_text = "✅ Your conversation has been reset. I'm ready to start fresh!" + else: + reply_text = "✅ Your conversation has been reset. I'm ready to start fresh!" else: reply_text = "No active conversation found. Send me a message to start!"
src/upsonic/interfaces/whatsapp/whatsapp.py+15 −4 modified@@ -620,10 +620,21 @@ async def _handle_reset_command(self, sender: str) -> None: # Send confirmation if was_reset: - reply_text = ( - "✅ Your conversation has been reset. " - "I'm ready to start fresh! How can I help you?" - ) + # Check if agent has workspace and execute greeting + if self.agent.workspace: + greeting_result = await self.agent.execute_workspace_greeting_async() + if greeting_result: + reply_text = str(greeting_result) + else: + reply_text = ( + "✅ Your conversation has been reset. " + "I'm ready to start fresh! How can I help you?" + ) + else: + reply_text = ( + "✅ Your conversation has been reset. " + "I'm ready to start fresh! How can I help you?" + ) else: reply_text = ( "No active conversation found to reset. "
src/upsonic/ralph/tools/filesystem.py+1 −1 modified@@ -441,7 +441,7 @@ def run_command( if result.stdout: output_parts.append(f"STDOUT:\n{result.stdout}") - + if result.stderr: output_parts.append(f"STDERR:\n{result.stderr}")
src/upsonic/run/agent/output.py+3 −0 modified@@ -155,6 +155,9 @@ class AgentRunOutput: pause_reason: Optional[Literal["external_tool"]] = None # "external_tool" only now error_details: Optional[str] = None + # --- Context management --- + _context_window_full: bool = False + # --- Message tracking (internal) --- # _run_boundaries tracks where each run starts in chat_history # This allows extracting new messages from just this run
src/upsonic/tools/mcp.py+37 −6 modified@@ -32,6 +32,27 @@ HAS_STREAMABLE_HTTP = False +_MCP_SECURITY_WARNING_EMITTED = False + + +def _emit_mcp_security_warning() -> None: + """ + Emit a one-time security warning before MCP tool initialization. + Must be called before any MCP connection/initialization starts. + """ + global _MCP_SECURITY_WARNING_EMITTED + if _MCP_SECURITY_WARNING_EMITTED: + return + _MCP_SECURITY_WARNING_EMITTED = True + + from upsonic.utils.printing import console + + console.print( + "[yellow]⚠️ MCP Security: Only connect to MCP servers you trust. " + "Stdio servers run arbitrary processes on your machine; " + "do not use commands or config from untrusted sources.[/yellow]" + ) + def prepare_command(command: str) -> List[str]: """ @@ -239,7 +260,12 @@ class MCPHandler: - Lazy connection support - Tool name prefixing for avoiding collisions """ - + + def __new__(cls, *args: Any, **kwargs: Any) -> "MCPHandler": + """Emit MCP security warning before any initialization.""" + _emit_mcp_security_warning() + return super().__new__(cls) + def __init__( self, config: Type = None, @@ -422,9 +448,9 @@ async def connect(self) -> None: """Initialize and connect to the MCP server.""" if self._initialized: return - + from upsonic.utils.printing import console - + if self.session is not None: await self._initialize_with_session() return @@ -880,7 +906,12 @@ class MultiMCPHandler: - Proper cleanup delegation to individual handlers - Tool name prefixing for avoiding collisions """ - + + def __new__(cls, *args: Any, **kwargs: Any) -> "MultiMCPHandler": + """Emit MCP security warning before any initialization (before __init__).""" + _emit_mcp_security_warning() + return super().__new__(cls) + def __init__( self, commands: Optional[List[str]] = None, @@ -971,9 +1002,9 @@ async def connect(self) -> None: """Initialize and connect to all MCP servers.""" if self._initialized: return - + from upsonic.utils.printing import console - + console.print(f"[cyan]🔌 Connecting to 
{len(self.server_params_list)} MCP server(s)...[/cyan]") # Validate tool_name_prefixes length if provided
src/upsonic/utils/__init__.py+4 −8 modified@@ -17,9 +17,8 @@ calculate_cost_from_run_output, calculate_cost_from_agent, get_model_name, - get_model_pricing, + get_model_context_window, format_cost, - MODEL_PRICING, ) def _get_utils_classes(): @@ -39,9 +38,8 @@ def _get_utils_classes(): calculate_cost_from_run_output, calculate_cost_from_agent, get_model_name, - get_model_pricing, + get_model_context_window, format_cost, - MODEL_PRICING, ) return { @@ -59,9 +57,8 @@ def _get_utils_classes(): 'calculate_cost_from_agent': calculate_cost_from_agent, # Helper functions 'get_model_name': get_model_name, - 'get_model_pricing': get_model_pricing, + 'get_model_context_window': get_model_context_window, 'format_cost': format_cost, - 'MODEL_PRICING': MODEL_PRICING, } def __getattr__(name: str) -> Any: @@ -91,7 +88,6 @@ def __getattr__(name: str) -> Any: "calculate_cost_from_agent", # Helper functions "get_model_name", - "get_model_pricing", + "get_model_context_window", "format_cost", - "MODEL_PRICING", ] \ No newline at end of file
src/upsonic/utils/printing.py+0 −5 modified@@ -921,11 +921,6 @@ def display_tool_results_table( spacing() -def _get_model_pricing(model_name: str) -> Optional[Dict[str, float]]: - """Get comprehensive pricing data for a model.""" - from upsonic.utils.usage import get_model_pricing - return get_model_pricing(model_name) - def get_estimated_cost_from_usage(usage: Union[Dict[str, int], Any], model: Union["Model", str]) -> str: """Calculate estimated cost from usage data."""
src/upsonic/utils/usage.py+287 −234 modified@@ -1,178 +1,251 @@ -""" -Usage and cost calculation utilities for the Upsonic framework. - -This module provides functions for calculating and estimating costs based on -token usage and model pricing data. These are pure calculation utilities -without any console/terminal printing dependencies. -""" -from typing import TYPE_CHECKING, Any, Dict, Union +from typing import TYPE_CHECKING, Any, Dict, Optional, Union if TYPE_CHECKING: from upsonic.model_base import Model from upsonic.usage import RequestUsage, RunUsage -# ============================================================================= -# Model Pricing Data -# ============================================================================= - -MODEL_PRICING: Dict[str, Dict[str, float]] = { - # OpenAI GPT-4o family - 'gpt-4o': {'input_cost_per_1m': 2.50, 'output_cost_per_1m': 10.00}, - 'gpt-4o-2024-05-13': {'input_cost_per_1m': 2.50, 'output_cost_per_1m': 10.00}, - 'gpt-4o-2024-08-06': {'input_cost_per_1m': 2.50, 'output_cost_per_1m': 10.00}, - 'gpt-4o-2024-11-20': {'input_cost_per_1m': 2.50, 'output_cost_per_1m': 10.00}, - 'gpt-4o-mini': {'input_cost_per_1m': 0.15, 'output_cost_per_1m': 0.60}, - 'gpt-4o-mini-2024-07-18': {'input_cost_per_1m': 0.15, 'output_cost_per_1m': 0.60}, - - # OpenAI GPT-4 family - 'gpt-4-turbo': {'input_cost_per_1m': 10.00, 'output_cost_per_1m': 30.00}, - 'gpt-4-turbo-2024-04-09': {'input_cost_per_1m': 10.00, 'output_cost_per_1m': 30.00}, - 'gpt-4': {'input_cost_per_1m': 30.00, 'output_cost_per_1m': 60.00}, - 'gpt-4-0613': {'input_cost_per_1m': 30.00, 'output_cost_per_1m': 60.00}, - 'gpt-4-32k': {'input_cost_per_1m': 60.00, 'output_cost_per_1m': 120.00}, - 'gpt-4-32k-0613': {'input_cost_per_1m': 60.00, 'output_cost_per_1m': 120.00}, - - # OpenAI GPT-3.5 family - 'gpt-3.5-turbo': {'input_cost_per_1m': 0.50, 'output_cost_per_1m': 1.50}, - 'gpt-3.5-turbo-1106': {'input_cost_per_1m': 0.50, 'output_cost_per_1m': 1.50}, - 'gpt-3.5-turbo-16k': 
{'input_cost_per_1m': 3.00, 'output_cost_per_1m': 4.00}, - 'gpt-3.5-turbo-16k-0613': {'input_cost_per_1m': 3.00, 'output_cost_per_1m': 4.00}, - - # OpenAI GPT-5 family (future) - 'gpt-5': {'input_cost_per_1m': 5.00, 'output_cost_per_1m': 15.00}, - 'gpt-5-2025-08-07': {'input_cost_per_1m': 5.00, 'output_cost_per_1m': 15.00}, - 'gpt-5-mini': {'input_cost_per_1m': 0.30, 'output_cost_per_1m': 1.20}, - 'gpt-5-mini-2025-08-07': {'input_cost_per_1m': 0.30, 'output_cost_per_1m': 1.20}, - 'gpt-5-nano': {'input_cost_per_1m': 0.10, 'output_cost_per_1m': 0.40}, - 'gpt-5-nano-2025-08-07': {'input_cost_per_1m': 0.10, 'output_cost_per_1m': 0.40}, - - # OpenAI GPT-4.1 family - 'gpt-4.1': {'input_cost_per_1m': 3.00, 'output_cost_per_1m': 12.00}, - 'gpt-4.1-2025-04-14': {'input_cost_per_1m': 3.00, 'output_cost_per_1m': 12.00}, - 'gpt-4.1-mini': {'input_cost_per_1m': 0.20, 'output_cost_per_1m': 0.80}, - 'gpt-4.1-mini-2025-04-14': {'input_cost_per_1m': 0.20, 'output_cost_per_1m': 0.80}, - 'gpt-4.1-nano': {'input_cost_per_1m': 0.08, 'output_cost_per_1m': 0.32}, - 'gpt-4.1-nano-2025-04-14': {'input_cost_per_1m': 0.08, 'output_cost_per_1m': 0.32}, - - # OpenAI O-series reasoning models - 'o1': {'input_cost_per_1m': 15.00, 'output_cost_per_1m': 60.00}, - 'o1-2024-12-17': {'input_cost_per_1m': 15.00, 'output_cost_per_1m': 60.00}, - 'o1-mini': {'input_cost_per_1m': 3.00, 'output_cost_per_1m': 12.00}, - 'o1-mini-2024-09-12': {'input_cost_per_1m': 3.00, 'output_cost_per_1m': 12.00}, - 'o1-preview': {'input_cost_per_1m': 15.00, 'output_cost_per_1m': 60.00}, - 'o1-preview-2024-09-12': {'input_cost_per_1m': 15.00, 'output_cost_per_1m': 60.00}, - 'o1-pro': {'input_cost_per_1m': 60.00, 'output_cost_per_1m': 180.00}, - 'o1-pro-2025-03-19': {'input_cost_per_1m': 60.00, 'output_cost_per_1m': 180.00}, - 'o3': {'input_cost_per_1m': 20.00, 'output_cost_per_1m': 80.00}, - 'o3-2025-04-16': {'input_cost_per_1m': 20.00, 'output_cost_per_1m': 80.00}, - 'o3-mini': {'input_cost_per_1m': 4.00, 
'output_cost_per_1m': 16.00}, - 'o3-mini-2025-01-31': {'input_cost_per_1m': 4.00, 'output_cost_per_1m': 16.00}, - 'o3-pro': {'input_cost_per_1m': 80.00, 'output_cost_per_1m': 240.00}, - 'o3-pro-2025-06-10': {'input_cost_per_1m': 80.00, 'output_cost_per_1m': 240.00}, - 'o3-deep-research': {'input_cost_per_1m': 100.00, 'output_cost_per_1m': 300.00}, - 'o3-deep-research-2025-06-26': {'input_cost_per_1m': 100.00, 'output_cost_per_1m': 300.00}, - - # Anthropic Claude family - 'claude-3-5-sonnet-20241022': {'input_cost_per_1m': 3.00, 'output_cost_per_1m': 15.00}, - 'claude-3-5-sonnet-latest': {'input_cost_per_1m': 3.00, 'output_cost_per_1m': 15.00}, - 'claude-3-5-sonnet-20240620': {'input_cost_per_1m': 3.00, 'output_cost_per_1m': 15.00}, - 'claude-3-5-haiku-20241022': {'input_cost_per_1m': 0.80, 'output_cost_per_1m': 4.00}, - 'claude-3-5-haiku-latest': {'input_cost_per_1m': 0.80, 'output_cost_per_1m': 4.00}, - 'claude-3-7-sonnet-20250219': {'input_cost_per_1m': 3.00, 'output_cost_per_1m': 15.00}, - 'claude-3-7-sonnet-latest': {'input_cost_per_1m': 3.00, 'output_cost_per_1m': 15.00}, - 'claude-3-opus-20240229': {'input_cost_per_1m': 15.00, 'output_cost_per_1m': 75.00}, - 'claude-3-opus-latest': {'input_cost_per_1m': 15.00, 'output_cost_per_1m': 75.00}, - 'claude-3-haiku-20240307': {'input_cost_per_1m': 0.25, 'output_cost_per_1m': 1.25}, - 'claude-4-opus-20250514': {'input_cost_per_1m': 20.00, 'output_cost_per_1m': 100.00}, - 'claude-4-sonnet-20250514': {'input_cost_per_1m': 4.00, 'output_cost_per_1m': 20.00}, - 'claude-opus-4-0': {'input_cost_per_1m': 20.00, 'output_cost_per_1m': 100.00}, - 'claude-opus-4-1-20250805': {'input_cost_per_1m': 20.00, 'output_cost_per_1m': 100.00}, - 'claude-opus-4-20250514': {'input_cost_per_1m': 20.00, 'output_cost_per_1m': 100.00}, - 'claude-sonnet-4-0': {'input_cost_per_1m': 4.00, 'output_cost_per_1m': 20.00}, - 'claude-sonnet-4-20250514': {'input_cost_per_1m': 4.00, 'output_cost_per_1m': 20.00}, - - # Google Gemini family - 
'gemini-2.0-flash': {'input_cost_per_1m': 0.075, 'output_cost_per_1m': 0.30}, - 'gemini-2.0-flash-lite': {'input_cost_per_1m': 0.0375, 'output_cost_per_1m': 0.15}, - 'gemini-2.5-flash': {'input_cost_per_1m': 0.075, 'output_cost_per_1m': 0.30}, - 'gemini-2.5-flash-lite': {'input_cost_per_1m': 0.0375, 'output_cost_per_1m': 0.15}, - 'gemini-2.5-pro': {'input_cost_per_1m': 1.25, 'output_cost_per_1m': 5.00}, - 'gemini-1.5-pro': {'input_cost_per_1m': 1.25, 'output_cost_per_1m': 5.00}, - 'gemini-1.5-flash': {'input_cost_per_1m': 0.075, 'output_cost_per_1m': 0.30}, - 'gemini-1.0-pro': {'input_cost_per_1m': 0.50, 'output_cost_per_1m': 1.50}, - - # Groq / Meta Llama - 'llama-3.3-70b-versatile': {'input_cost_per_1m': 0.59, 'output_cost_per_1m': 0.79}, - 'llama-3.1-8b-instant': {'input_cost_per_1m': 0.05, 'output_cost_per_1m': 0.05}, - 'llama3-70b-8192': {'input_cost_per_1m': 0.59, 'output_cost_per_1m': 0.79}, - 'llama3-8b-8192': {'input_cost_per_1m': 0.05, 'output_cost_per_1m': 0.05}, - 'mixtral-8x7b-32768': {'input_cost_per_1m': 0.24, 'output_cost_per_1m': 0.24}, - 'gemma2-9b-it': {'input_cost_per_1m': 0.10, 'output_cost_per_1m': 0.10}, - - # Mistral - 'mistral-large-latest': {'input_cost_per_1m': 2.00, 'output_cost_per_1m': 6.00}, - 'mistral-small-latest': {'input_cost_per_1m': 1.00, 'output_cost_per_1m': 3.00}, - 'codestral-latest': {'input_cost_per_1m': 0.20, 'output_cost_per_1m': 0.20}, - - # Cohere - 'command': {'input_cost_per_1m': 1.00, 'output_cost_per_1m': 2.00}, - 'command-light': {'input_cost_per_1m': 0.30, 'output_cost_per_1m': 0.30}, - 'command-r': {'input_cost_per_1m': 0.50, 'output_cost_per_1m': 1.50}, - 'command-r-plus': {'input_cost_per_1m': 3.00, 'output_cost_per_1m': 15.00}, - - # DeepSeek - 'deepseek-chat': {'input_cost_per_1m': 0.14, 'output_cost_per_1m': 0.28}, - 'deepseek-reasoner': {'input_cost_per_1m': 0.55, 'output_cost_per_1m': 2.19}, - - # xAI Grok - 'grok-4': {'input_cost_per_1m': 0.01, 'output_cost_per_1m': 0.03}, - 'grok-4-0709': 
{'input_cost_per_1m': 0.01, 'output_cost_per_1m': 0.03}, - 'grok-3': {'input_cost_per_1m': 0.01, 'output_cost_per_1m': 0.03}, - 'grok-3-mini': {'input_cost_per_1m': 0.01, 'output_cost_per_1m': 0.03}, - 'grok-3-fast': {'input_cost_per_1m': 0.01, 'output_cost_per_1m': 0.03}, - 'grok-3-mini-fast': {'input_cost_per_1m': 0.01, 'output_cost_per_1m': 0.03}, - - # Moonshot / Kimi - 'moonshot-v1-8k': {'input_cost_per_1m': 0.012, 'output_cost_per_1m': 0.012}, - 'moonshot-v1-32k': {'input_cost_per_1m': 0.024, 'output_cost_per_1m': 0.024}, - 'moonshot-v1-128k': {'input_cost_per_1m': 0.06, 'output_cost_per_1m': 0.06}, - 'kimi-latest': {'input_cost_per_1m': 0.012, 'output_cost_per_1m': 0.012}, - 'kimi-thinking-preview': {'input_cost_per_1m': 0.012, 'output_cost_per_1m': 0.012}, - - # Open source / Cerebras / Hugging Face - 'gpt-oss-120b': {'input_cost_per_1m': 0.10, 'output_cost_per_1m': 0.10}, - 'llama3.1-8b': {'input_cost_per_1m': 0.05, 'output_cost_per_1m': 0.05}, - 'llama-3.3-70b': {'input_cost_per_1m': 0.20, 'output_cost_per_1m': 0.20}, - 'llama-4-scout-17b-16e-instruct': {'input_cost_per_1m': 0.15, 'output_cost_per_1m': 0.15}, - 'llama-4-maverick-17b-128e-instruct': {'input_cost_per_1m': 0.15, 'output_cost_per_1m': 0.15}, - 'qwen-3-235b-a22b-instruct-2507': {'input_cost_per_1m': 0.30, 'output_cost_per_1m': 0.30}, - 'qwen-3-32b': {'input_cost_per_1m': 0.10, 'output_cost_per_1m': 0.10}, - 'qwen-3-coder-480b': {'input_cost_per_1m': 0.50, 'output_cost_per_1m': 0.50}, - 'qwen-3-235b-a22b-thinking-2507': {'input_cost_per_1m': 0.30, 'output_cost_per_1m': 0.30}, - - # HuggingFace / Together AI paths - 'Qwen/QwQ-32B': {'input_cost_per_1m': 0.10, 'output_cost_per_1m': 0.10}, - 'Qwen/Qwen2.5-72B-Instruct': {'input_cost_per_1m': 0.20, 'output_cost_per_1m': 0.20}, - 'Qwen/Qwen3-235B-A22B': {'input_cost_per_1m': 0.30, 'output_cost_per_1m': 0.30}, - 'Qwen/Qwen3-32B': {'input_cost_per_1m': 0.10, 'output_cost_per_1m': 0.10}, - 'deepseek-ai/DeepSeek-R1': {'input_cost_per_1m': 0.55, 
'output_cost_per_1m': 2.19}, - 'meta-llama/Llama-3.3-70B-Instruct': {'input_cost_per_1m': 0.20, 'output_cost_per_1m': 0.20}, - 'meta-llama/Llama-4-Maverick-17B-128E-Instruct': {'input_cost_per_1m': 0.15, 'output_cost_per_1m': 0.15}, - 'meta-llama/Llama-4-Scout-17B-16E-Instruct': {'input_cost_per_1m': 0.15, 'output_cost_per_1m': 0.15}, - - # Test model - 'test': {'input_cost_per_1m': 0.00, 'output_cost_per_1m': 0.00}, -} -# Default pricing for unknown models -DEFAULT_PRICING: Dict[str, float] = { - 'input_cost_per_1m': 0.50, - 'output_cost_per_1m': 1.50 +MODEL_CONTEXT_WINDOWS: Dict[str, Optional[int]] = { + # GPT-5 family + 'gpt-5.2': 400000, + 'gpt-5.1': 400000, + 'gpt-5': 400000, + 'gpt-5-mini': 400000, + 'gpt-5-nano': 400000, + 'gpt-5.2-chat-latest': 128000, + 'gpt-5.1-chat-latest': 128000, + 'gpt-5-chat-latest': 128000, + 'gpt-5.1-codex-max': 400000, + 'gpt-5.1-codex': 400000, + 'gpt-5.1-codex-mini': 400000, + 'gpt-5-codex': 400000, + 'gpt-5.2-pro': 400000, + 'gpt-5-pro': 400000, + 'gpt-5-search-api': 128000, + + # GPT-4.1 family + 'gpt-4.1': 1047576, + 'gpt-4.1-mini': 1047576, + 'gpt-4.1-nano': 1047576, + + # GPT-4o family + 'gpt-4o': 128000, + 'gpt-4o-2024-05-13': 128000, + 'gpt-4o-mini': 128000, + 'gpt-4o-search-preview': 128000, + 'gpt-4o-mini-search-preview': 128000, + + # GPT Realtime models + 'gpt-realtime': 32000, + 'gpt-realtime-mini': 32000, + 'gpt-4o-realtime-preview': 32000, + 'gpt-4o-mini-realtime-preview': 16000, + + # GPT Audio models + 'gpt-audio': 128000, + 'gpt-audio-mini': 128000, + 'gpt-4o-audio-preview': 128000, + 'gpt-4o-mini-audio-preview': 128000, + + # O-series reasoning models + 'o1': 200000, + 'o1-pro': 200000, + 'o1-mini': 128000, + 'o3': 200000, + 'o3-pro': 200000, + 'o3-mini': 200000, + 'o3-deep-research': 200000, + 'o4-mini': 200000, + 'o4-mini-deep-research': 200000, + + # Codex and special models + 'codex-mini-latest': 200000, + 'computer-use-preview': 8192, + + # Image models (no text context window) + 'gpt-image-1.5': None, + 
'chatgpt-image-latest': None, + 'gpt-image-1': None, + 'gpt-image-1-mini': None, + + 'claude-3-5-sonnet-20241022': 200000, + 'claude-3-5-sonnet-latest': 200000, + 'claude-3-5-sonnet-20240620': 200000, + 'claude-3-5-haiku-20241022': 200000, + 'claude-3-5-haiku-latest': 200000, + 'claude-3-7-sonnet-20250219': 200000, + 'claude-3-7-sonnet-latest': 200000, + 'claude-3-opus-20240229': 200000, + 'claude-3-opus-latest': 200000, + 'claude-3-haiku-20240307': 200000, + 'claude-4-opus-20250514': 200000, + 'claude-4-sonnet-20250514': 200000, + 'claude-opus-4-0': 200000, + 'claude-opus-4-1-20250805': 200000, + 'claude-opus-4-20250514': 200000, + 'claude-sonnet-4-0': 200000, + 'claude-sonnet-4-20250514': 200000, + + 'gemini-3-pro-preview': 1048576, + 'gemini-3-pro-image-preview': 65536, + 'gemini-3-flash-preview': 1048576, + 'gemini-2.5-flash': 1048576, + 'gemini-2.5-flash-preview-09-2025': 1048576, + 'gemini-2.5-flash-image': 65536, + 'gemini-2.5-flash-lite': 1048576, + 'gemini-2.5-flash-lite-preview-09-2025': 1048576, + 'gemini-2.5-pro': 1048576, + 'gemini-2.0-flash': 1048576, + 'gemini-2.0-flash-lite': 1048576, + 'gemini-1.5-pro': 1048576, + 'gemini-1.5-flash': 1048576, + 'gemini-1.0-pro': 32000, + + 'llama-3-8b': 8000, + 'llama-3-70b': 8000, + 'llama-3.1-8b': 128000, + 'llama-3.1-8b-instant': 128000, + 'llama-3.1-70b': 128000, + 'llama-3.1-405b': 128000, + 'llama-3.2-1b': 128000, + 'llama-3.2-3b': 128000, + 'llama-3.2-11b-vision': 128000, + 'llama-3.2-90b-vision': 128000, + 'llama-3.3-70b': 128000, + 'llama-3.3-70b-versatile': 128000, + 'llama3-70b-8192': 8192, + 'llama3-8b-8192': 8192, + 'llama-4-maverick': 128000, + 'llama-4-maverick-17b-128e-instruct': 128000, + 'llama-4-scout': 10000000, + 'llama-4-scout-17b-16e-instruct': 10000000, + 'meta-llama/Llama-3.3-70B-Instruct': 128000, + 'meta-llama/Llama-4-Maverick-17B-128E-Instruct': 128000, + 'meta-llama/Llama-4-Scout-17B-16E-Instruct': 10000000, + + 'grok-3': 131000, + 'grok-3-mini': None, + 'grok-3-fast': 131000, + 
'grok-3-mini-fast': None, + 'grok-4': 256000, + 'grok-4-0709': 256000, + 'grok-4-fast-reasoning': None, + 'grok-4.1': 256000, + 'grok-4-1-fast-reasoning': 2000000, + 'grok-4-1-fast-non-reasoning': 2000000, + 'grok-code-fast-1': 2000000, + + 'qwen-max': 32000, + 'qwen-plus': 1000000, + 'qwen-turbo': 1000000, + 'qwen-flash': 1000000, + 'qwen-long': 10000000, + 'qwen3-0.6b': 32000, + 'qwen3-1.7b': 32000, + 'qwen3-4b': 32000, + 'qwen3-8b': 128000, + 'qwen3-14b': 128000, + 'qwen3-32b': 128000, + 'qwen-3-32b': 128000, + 'qwen3-235b-a22b': 128000, + 'qwen-3-235b-a22b-instruct-2507': 128000, + 'qwen3-235b-a22b-thinking': 128000, + 'qwen-3-235b-a22b-thinking-2507': 128000, + 'qwen3-coder-plus': 1000000, + 'qwen-3-coder-480b': 1000000, + 'qwen-vl-plus': None, + 'qwen-vl-max': None, + 'qwen-audio-turbo': None, + 'Qwen/QwQ-32B': 128000, + 'Qwen/Qwen2.5-72B-Instruct': 128000, + 'Qwen/Qwen3-235B-A22B': 128000, + 'Qwen/Qwen3-32B': 128000, + + 'deepseek-v2-base': 128000, + 'deepseek-v2-chat': 128000, + 'deepseek-v3-base': 128000, + 'deepseek-v3': 64000, + 'deepseek-chat': 64000, + 'deepseek-v3.1': 128000, + 'deepseek-v3.2-exp': 128000, + 'deepseek-r1': 128000, + 'deepseek-reasoner': 64000, + 'deepseek-r1-zero': 128000, + 'deepseek-r1-distill-qwen-1.5b': 128000, + 'deepseek-r1-distill-qwen-7b': 128000, + 'deepseek-r1-distill-llama-8b': 128000, + 'deepseek-r1-distill-qwen-14b': 128000, + 'deepseek-r1-distill-qwen-32b': 128000, + 'deepseek-r1-distill-llama-70b': 128000, + 'deepseek-coder-1.3b': 16000, + 'deepseek-coder-6.7b': 16000, + 'deepseek-coder-33b': 16000, + 'deepseek-coder-v2-lite-base': 128000, + 'deepseek-coder-v2-base': 128000, + 'deepseek-coder-v2-instruct': 128000, + 'deepseek-math-v2': 128000, + 'deepseek-prover-v2-7b': 128000, + 'deepseek-prover-v2-671b': 128000, + 'deepseek-vl-7b-chat': None, + 'deepseek-ocr-3b': None, + 'janus-pro-7b': None, + 'deepseek-ai/DeepSeek-R1': 128000, + + 'mistral-7b-v0.1': 8000, + 'open-mistral-7b': 8000, + 'mistral-7b-v0.2': 32000, + 
'mistral-7b-v0.3': 32000, + 'mistral-small': 32000, + 'mistral-small-latest': 128000, + 'mistral-small-2409': 128000, + 'mistral-small-3-1': 128000, + 'mistral-small-2506': 128000, + 'mistral-medium-3': 128000, + 'mistral-medium-3-1': 128000, + 'mistral-large': 32000, + 'mistral-large-latest': 128000, + 'mistral-large-2407': 128000, + 'mistral-large-2411': 131000, + 'mistral-large-3': 256000, + 'mistral-nemo': 128000, + 'mixtral-8x7b': 32000, + 'open-mixtral-8x7b': 32000, + 'mixtral-8x7b-32768': 32768, + 'mixtral-8x22b': 64000, + 'open-mixtral-8x22b': 64000, + 'mixtral-8x22b-v0.3': 64000, + 'ministral-3b-2410': 128000, + 'ministral-8b-2410': 128000, + 'ministral-14b': 128000, + 'codestral-2405': 256000, + 'codestral-latest': 256000, + 'codestral-2501': 256000, + 'codestral-2508': 256000, + 'codestral-mamba': None, + 'devstral-small-2507': None, + 'devstral-medium-2507': None, + 'mathstral-7b-v0.1': 32000, + 'pixtral-12b-2409': 128000, + 'pixtral-large-2411': 131000, + 'magistral-small-2507': None, + 'magistral-medium-2507': None, + 'voxtral-small-2507': 32000, + 'voxtral-mini-2507': None, + 'command': 4096, + 'command-light': 4096, + 'command-r': 128000, + 'command-r-plus': 128000, + 'gemma2-9b-it': 8192, + 'gpt-oss-120b': None, + 'llama3.1-8b': 128000, + + 'test': None, } -# Provider prefixes to strip from model names -PROVIDER_PREFIXES = [ +DEFAULT_CONTEXT_WINDOW: Optional[int] = None + +PROVIDER_PREFIXES: list[str] = [ 'anthropic:', 'google-gla:', 'google-vertex:', @@ -190,10 +263,6 @@ ] -# ============================================================================= -# Helper Functions -# ============================================================================= - def get_model_name(model: Union["Model", str]) -> str: """ Extract the model name from a Model instance or string. 
@@ -219,16 +288,15 @@ def get_model_name(model: Union["Model", str]) -> str: return str(model) -def get_model_pricing(model_name: str) -> Dict[str, float]: +def normalize_model_name(model_name: str) -> str: """ - Get pricing data for a model. + Normalize a model name by stripping provider prefixes. Args: - model_name: The model name to look up pricing for. + model_name: The model name to normalize. Returns: - Dictionary with 'input_cost_per_1m' and 'output_cost_per_1m' keys. - Returns default pricing if model is not found. + The normalized model name without provider prefixes. """ # Handle case where model_name might be a coroutine (in tests) if hasattr(model_name, '__await__'): @@ -243,12 +311,24 @@ def get_model_pricing(model_name: str) -> Dict[str, float]: model_name = model_name[len(prefix):] break - return MODEL_PRICING.get(model_name, DEFAULT_PRICING) + return model_name -# ============================================================================= -# Cost Calculation Functions -# ============================================================================= +def get_model_context_window(model: Union["Model", str]) -> Optional[int]: + """ + Get the context window size for a model. + + Args: + model: Model instance or model name string. + + Returns: + The context window size in tokens, or None if unknown. + """ + model_name = get_model_name(model) + normalized_name = normalize_model_name(model_name) + + return MODEL_CONTEXT_WINDOWS.get(normalized_name, DEFAULT_CONTEXT_WINDOW) + def calculate_cost( input_tokens: int, @@ -261,9 +341,7 @@ def calculate_cost( """ Calculate the cost in dollars based on token usage and model. - This is the primary cost calculation function that returns a float value. - It first attempts to use genai_prices library for accurate pricing, - then falls back to built-in pricing data. + Uses the genai_prices library for accurate pricing calculations. Args: input_tokens: Number of input/prompt tokens. 
@@ -275,6 +353,9 @@ def calculate_cost( Returns: The calculated cost as a float (in dollars). + + Raises: + ImportError: If genai_prices library is not installed. """ if input_tokens is None or output_tokens is None: return 0.0 @@ -288,51 +369,31 @@ def calculate_cost( except (ValueError, TypeError): return 0.0 - # Try genai_prices first for accurate pricing - try: - from genai_prices import calculate_cost as genai_calculate_cost - from upsonic.usage import RequestUsage - - usage = RequestUsage( - input_tokens=input_tokens, - output_tokens=output_tokens, - cache_write_tokens=cache_write_tokens, - cache_read_tokens=cache_read_tokens, - ) - - model_name = get_model_name(model) - cost = genai_calculate_cost(usage, model_name) - - # Add reasoning token cost if applicable (o1/o3 models) - if reasoning_tokens > 0: - pricing = get_model_pricing(model_name) - # Reasoning tokens are typically priced at output rate - reasoning_cost = (reasoning_tokens / 1_000_000) * pricing['output_cost_per_1m'] - cost += reasoning_cost - - return float(cost) - except ImportError: - pass - except Exception: - pass - - # Fall back to built-in pricing + from genai_prices import calc_price + from upsonic.usage import RequestUsage + + usage = RequestUsage( + input_tokens=input_tokens, + output_tokens=output_tokens, + cache_write_tokens=cache_write_tokens, + cache_read_tokens=cache_read_tokens, + ) + model_name = get_model_name(model) - pricing = get_model_pricing(model_name) - - input_cost = (input_tokens / 1_000_000) * pricing['input_cost_per_1m'] - output_cost = (output_tokens / 1_000_000) * pricing['output_cost_per_1m'] - - # Cache tokens typically have different pricing (usually discounted) - # For simplicity, we use 50% of input rate for cache reads, 100% for writes - cache_read_cost = (cache_read_tokens / 1_000_000) * (pricing['input_cost_per_1m'] * 0.5) - cache_write_cost = (cache_write_tokens / 1_000_000) * pricing['input_cost_per_1m'] - - # Reasoning tokens are typically priced at 
output rate - reasoning_cost = (reasoning_tokens / 1_000_000) * pricing['output_cost_per_1m'] - - total_cost = input_cost + output_cost + cache_read_cost + cache_write_cost + reasoning_cost - return total_cost + price_calc = calc_price(usage, model_name) + cost = float(price_calc.total_price) + + if reasoning_tokens > 0: + reasoning_usage = RequestUsage( + input_tokens=0, + output_tokens=reasoning_tokens, + cache_write_tokens=0, + cache_read_tokens=0, + ) + reasoning_price = calc_price(reasoning_usage, model_name) + cost += float(reasoning_price.total_price) + + return cost def calculate_cost_from_usage( @@ -438,10 +499,6 @@ def calculate_cost_from_agent(agent: Any) -> float: return 0.0 -# ============================================================================= -# Formatted String Functions (for display) -# ============================================================================= - def format_cost(cost: float, approximate: bool = True) -> str: """ Format a cost value as a string. @@ -471,9 +528,6 @@ def get_estimated_cost( """ Calculate and format estimated cost as a string. - This function is provided for backward compatibility and display purposes. - For programmatic use, prefer calculate_cost() which returns a float. - Args: input_tokens: Number of input/prompt tokens. output_tokens: Number of output/completion tokens. @@ -534,4 +588,3 @@ def get_estimated_cost_from_agent(agent: Any) -> str: """ cost = calculate_cost_from_agent(agent) return format_cost(cost, approximate=True) -
tests/conftest.py+8 −6 modified@@ -1,14 +1,16 @@ """Pytest configuration to handle DisallowedOperation exceptions gracefully.""" -import os import pytest import sys from pathlib import Path -# Load environment variables from .env file before any tests run +_root = Path(__file__).resolve().parent.parent +_src = _root / "src" +if _src.exists() and str(_src) not in sys.path: + sys.path.insert(0, str(_src)) + try: from dotenv import load_dotenv - # Load .env from project root - env_path = Path(__file__).parent.parent / ".env" + env_path = _root / ".env" if env_path.exists(): load_dotenv(env_path) print(f"✓ Loaded environment variables from: {env_path}") @@ -17,7 +19,7 @@ except ImportError: print("⚠️ python-dotenv not installed. Install with: pip install python-dotenv") -from upsonic.safety_engine.exceptions import DisallowedOperation +from upsonic.safety_engine.exceptions import DisallowedOperation # noqa: E402 @pytest.hookimpl(tryfirst=True, hookwrapper=True) @@ -40,7 +42,7 @@ def pytest_runtest_makereport(item, call): if is_disallowed: # Print the error message print(f"\n⚠️ DisallowedOperation caught in {item.name}: {exc_value}", file=sys.stderr) - print(f" Handling gracefully - test will not fail\n", file=sys.stderr) + print(" Handling gracefully - test will not fail\n", file=sys.stderr) # Modify the report to mark as passed while preserving structure report.outcome = "passed"
tests/smoke_tests/agent/test_context_management_middleware.py+1095 −0 added@@ -0,0 +1,1095 @@ +""" +Tests for ContextManagementMiddleware. + +All tests use real objects — no mocking. Tests that exercise +LLM summarization make actual API calls to OpenAI. +""" + +import json +from typing import Any, List + +import pytest + +from upsonic.agent.context_managers.context_management_middleware import ( + CONTEXT_FULL_MESSAGE, + DEFAULT_KEEP_RECENT_COUNT, + ConversationSummary, + ContextManagementMiddleware, + SummarizedRequest, + SummarizedRequestPart, + SummarizedResponse, + SummarizedResponsePart, +) +from upsonic.messages import ( + ModelRequest, + ModelResponse, + SystemPromptPart, + TextPart, + ToolCallPart, + ToolReturnPart, + UserPromptPart, +) +from upsonic.models import infer_model +from upsonic.usage import RequestUsage + +pytestmark = pytest.mark.timeout(120) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _get_real_model(): + """Create a real model instance via the framework's infer_model.""" + return infer_model("openai/gpt-4o") + + +def _make_user_request(text: str) -> ModelRequest: + return ModelRequest(parts=[UserPromptPart(content=text)]) + + +def _make_system_request(text: str) -> ModelRequest: + return ModelRequest(parts=[SystemPromptPart(content=text)]) + + +def _make_text_response( + text: str, + input_tokens: int = 0, + output_tokens: int = 0, + model_name: str = "gpt-4o", +) -> ModelResponse: + return ModelResponse( + parts=[TextPart(content=text)], + model_name=model_name, + usage=RequestUsage(input_tokens=input_tokens, output_tokens=output_tokens), + ) + + +def _make_tool_call_response( + tool_name: str, + tool_call_id: str, + args: str = "{}", + input_tokens: int = 0, + output_tokens: int = 0, +) -> ModelResponse: + return ModelResponse( + parts=[ToolCallPart(tool_name=tool_name, tool_call_id=tool_call_id, 
args=args)], + usage=RequestUsage(input_tokens=input_tokens, output_tokens=output_tokens), + ) + + +def _make_tool_return_request( + tool_name: str, + tool_call_id: str, + content: str = "result", +) -> ModelRequest: + return ModelRequest( + parts=[ToolReturnPart(tool_name=tool_name, content=content, tool_call_id=tool_call_id)] + ) + + +def _total_content_chars(messages: List[Any]) -> int: + """Sum the character length of every part's content across all messages.""" + total: int = 0 + for msg in messages: + if not hasattr(msg, 'parts'): + continue + for part in msg.parts: + if hasattr(part, 'content'): + content = part.content + if isinstance(content, str): + total += len(content) + elif isinstance(content, (dict, list)): + total += len(json.dumps(content, default=str)) + else: + total += len(str(content)) + if hasattr(part, 'args'): + args = getattr(part, 'args', '') + if isinstance(args, str): + total += len(args) + elif isinstance(args, dict): + total += len(json.dumps(args, default=str)) + return total + + +def _old_messages_chars( + messages: List[Any], + keep_recent_count: int, +) -> int: + """Compute total content chars of only the 'old' portion that gets summarized. + + Excludes the system prompt (first message if it has SystemPromptPart) + and the last ``keep_recent_count`` non-system messages. 
+ """ + non_system: List[Any] = [] + for i, msg in enumerate(messages): + if i == 0 and isinstance(msg, ModelRequest): + if any(isinstance(p, SystemPromptPart) for p in msg.parts): + continue + non_system.append(msg) + + if len(non_system) <= keep_recent_count: + return 0 + + old_msgs = non_system[:-keep_recent_count] + return _total_content_chars(old_msgs) + + +def _build_conversation( + n_tool_pairs: int, + input_tokens: int = 0, + output_tokens: int = 0, +) -> List[Any]: + """Build a conversation with a system prompt, user message, and n tool call/return pairs.""" + msgs: List[Any] = [ + _make_system_request("You are a helpful assistant."), + _make_user_request("Do something."), + _make_text_response("Sure, let me work on that.", input_tokens=input_tokens, output_tokens=output_tokens), + ] + for i in range(n_tool_pairs): + tc_id = f"tc_{i}" + msgs.append(_make_tool_call_response(f"tool_{i}", tc_id, f'{{"arg": {i}}}', input_tokens=input_tokens, output_tokens=output_tokens)) + msgs.append(_make_tool_return_request(f"tool_{i}", tc_id, f"result_{i}")) + return msgs + + +# =========================================================================== +# Pydantic schema tests +# =========================================================================== + +class TestPydanticSchemas: + def test_summarized_request_part_system_prompt(self) -> None: + part = SummarizedRequestPart(part_kind="system-prompt", content="You are helpful.") + assert part.part_kind == "system-prompt" + assert part.content == "You are helpful." 
+ assert part.tool_name is None + assert part.tool_call_id is None + + def test_summarized_request_part_user_prompt(self) -> None: + part = SummarizedRequestPart(part_kind="user-prompt", content="Hello there") + assert part.part_kind == "user-prompt" + assert part.content == "Hello there" + + def test_summarized_request_part_tool_return(self) -> None: + part = SummarizedRequestPart( + part_kind="tool-return", + content="42", + tool_name="calculator", + tool_call_id="tc_1", + ) + assert part.tool_name == "calculator" + assert part.tool_call_id == "tc_1" + assert part.content == "42" + + def test_summarized_response_part_text(self) -> None: + part = SummarizedResponsePart(part_kind="text", content="Hello!") + assert part.part_kind == "text" + assert part.content == "Hello!" + + def test_summarized_response_part_tool_call(self) -> None: + part = SummarizedResponsePart( + part_kind="tool-call", + tool_name="search", + tool_call_id="tc_5", + args='{"q": "test"}', + ) + assert part.tool_name == "search" + assert part.args == '{"q": "test"}' + + def test_conversation_summary_round_trip(self) -> None: + summary = ConversationSummary(messages=[ + SummarizedRequest(parts=[ + SummarizedRequestPart(part_kind="user-prompt", content="Hi"), + ]), + SummarizedResponse(parts=[ + SummarizedResponsePart(part_kind="text", content="Hello!"), + ]), + ]) + json_str: str = summary.model_dump_json() + restored = ConversationSummary.model_validate_json(json_str) + assert len(restored.messages) == 2 + assert restored.messages[0].kind == "request" + assert restored.messages[1].kind == "response" + + def test_conversation_summary_json_schema_has_required_keys(self) -> None: + schema = ConversationSummary.model_json_schema() + assert "properties" in schema + assert "messages" in schema["properties"] + + def test_summarized_request_with_multiple_parts(self) -> None: + req = SummarizedRequest(parts=[ + SummarizedRequestPart(part_kind="system-prompt", content="sys"), + 
class TestGetMaxContextWindow:
    """Context-window lookup exposed through ``_get_max_context_window``."""

    def test_returns_known_model_window(self) -> None:
        """The model from ``_get_real_model()`` is expected to resolve to 128,000 tokens."""
        mw = ContextManagementMiddleware(model=_get_real_model())
        result = mw._get_max_context_window()
        assert result is not None
        assert result == 128_000

    def test_returns_none_for_unknown_model(self) -> None:
        """An unrecognised model name yields ``None`` from the window lookup.

        The lookup is shadowed on this one middleware instance so that it asks
        ``get_model_context_window`` about a name that is not in the table.
        The real model object is left untouched: its private name attributes
        are not overwritten, so nothing needs restoring afterwards.
        """
        from upsonic.utils.usage import get_model_context_window

        unknown_name = "totally-unknown-model-xyz-999"
        mw = ContextManagementMiddleware(model=_get_real_model())

        # Instance attribute shadows the class method for this object only.
        # NOTE(review): this verifies the lookup helper's fallback, not the
        # middleware's own name resolution; to cover that path the middleware
        # would need a model reporting an unknown name — confirm how the
        # middleware reads the model name before adding such a test.
        mw._get_max_context_window = lambda: get_model_context_window(unknown_name)

        assert mw._get_max_context_window() is None
class TestEstimateMessageTokens:
    """Token estimation from recorded usage, with a character-based fallback."""

    def test_usage_based_accumulation(self) -> None:
        """Input and output tokens of every response are added together."""
        middleware = ContextManagementMiddleware(model=_get_real_model())
        history = [
            _make_user_request("Hello"),
            _make_text_response("Hi", input_tokens=100, output_tokens=20),
            _make_user_request("Next question"),
            _make_text_response("Answer", input_tokens=200, output_tokens=30),
        ]
        expected = (100 + 200) + (20 + 30)
        assert middleware._estimate_message_tokens(history) == expected

    def test_single_response_usage(self) -> None:
        """A single response contributes its input plus output tokens."""
        middleware = ContextManagementMiddleware(model=_get_real_model())
        history = [
            _make_user_request("Hey"),
            _make_text_response("World", input_tokens=50, output_tokens=10),
        ]
        estimate = middleware._estimate_message_tokens(history)
        assert estimate == 60

    def test_char_fallback_when_no_usage(self) -> None:
        """Without any response usage, 400 characters are expected to estimate as 100 tokens."""
        middleware = ContextManagementMiddleware(model=_get_real_model())
        long_prompt = "a" * 400  # 400 chars -> 100 tokens
        estimate = middleware._estimate_message_tokens([_make_user_request(long_prompt)])
        assert estimate == 100

    def test_char_fallback_with_zero_usage(self) -> None:
        """Zero recorded usage is expected to trigger the characters // 4 estimate."""
        middleware = ContextManagementMiddleware(model=_get_real_model())
        history = [
            _make_user_request("a" * 80),
            _make_text_response("b" * 80, input_tokens=0, output_tokens=0),
        ]
        expected = (80 + 80) // 4
        assert middleware._estimate_message_tokens(history) == expected

    def test_tool_parts_counted_in_fallback(self) -> None:
        """A lone tool-return request still produces a positive estimate."""
        middleware = ContextManagementMiddleware(model=_get_real_model())
        history = [_make_tool_return_request("my_tool", "tc_1", "x" * 200)]
        estimate = middleware._estimate_message_tokens(history)
        assert estimate > 0

    def test_mixed_usage_and_no_usage(self) -> None:
        """Responses reporting zero usage add nothing to the usage-based total."""
        middleware = ContextManagementMiddleware(model=_get_real_model())
        history = [
            _make_user_request("Hello"),
            _make_text_response("Hi", input_tokens=500, output_tokens=100),
            _make_user_request("More"),
            _make_text_response("End", input_tokens=0, output_tokens=0),
        ]
        estimate = middleware._estimate_message_tokens(history)
        assert estimate == 500 + 100

    def test_multiple_runs_accumulated(self) -> None:
        """Usage from three consecutive runs is summed."""
        middleware = ContextManagementMiddleware(model=_get_real_model())
        history = [
            _make_user_request("Run 1"),
            _make_text_response("R1 answer", input_tokens=1000, output_tokens=200),
            _make_user_request("Run 2"),
            _make_text_response("R2 answer", input_tokens=2000, output_tokens=300),
            _make_user_request("Run 3"),
            _make_text_response("R3 answer", input_tokens=3000, output_tokens=400),
        ]
        # All input+output accumulated: (1000+2000+3000) + (200+300+400) = 6900
        estimate = middleware._estimate_message_tokens(history)
        assert estimate == 6900
class TestIsContextExceeded:
    """Threshold checks for ``_is_context_exceeded`` at several usage levels."""

    @staticmethod
    def _exceeded(safety_margin_ratio: float, input_tokens: int, output_tokens: int):
        """Run ``_is_context_exceeded`` on one response carrying the given usage."""
        middleware = ContextManagementMiddleware(
            model=_get_real_model(),
            safety_margin_ratio=safety_margin_ratio,
        )
        history = [
            _make_text_response("hi", input_tokens=input_tokens, output_tokens=output_tokens),
        ]
        return middleware._is_context_exceeded(history)

    def test_not_exceeded_with_small_context(self) -> None:
        """110 tokens of usage is far below the limit."""
        assert self._exceeded(0.90, 100, 10) is False

    def test_exceeded_with_large_context(self) -> None:
        """120,000 tokens of usage is above the limit."""
        # gpt-4o has 128000 window; 90% = 115200; we exceed that
        assert self._exceeded(0.90, 110_000, 10_000) is True

    def test_exactly_at_limit_not_exceeded(self) -> None:
        """Usage equal to the limit does not count as exceeded."""
        # 128000 * 0.90 = 115200; exactly 115200 is not exceeded (> not >=)
        assert self._exceeded(0.90, 115_000, 200) is False

    def test_just_over_limit_exceeded(self) -> None:
        """One token above the limit counts as exceeded."""
        assert self._exceeded(0.90, 115_000, 201) is True

    def test_custom_safety_margin(self) -> None:
        """A 0.50 ratio lowers the limit so 65,000 tokens exceeds it."""
        # 128000 * 0.50 = 64000
        assert self._exceeded(0.50, 60_000, 5_000) is True
class TestPruneToolCallHistory:
    """Pruning of tool call/return messages by ``_prune_tool_call_history``."""

    @staticmethod
    def _count_tool_messages(history) -> int:
        """Count messages holding at least one tool-call or tool-return part."""
        return sum(
            1
            for message in history
            if any(isinstance(part, (ToolCallPart, ToolReturnPart)) for part in message.parts)
        )

    def test_no_pruning_when_under_limit(self) -> None:
        """History is left at full length when ``keep_recent_count`` is not reached."""
        middleware = ContextManagementMiddleware(model=_get_real_model(), keep_recent_count=10)
        history = _build_conversation(3)
        assert len(middleware._prune_tool_call_history(history)) == len(history)

    def test_prunes_old_tool_messages(self) -> None:
        """With ``keep_recent_count=2`` only two tool messages survive."""
        middleware = ContextManagementMiddleware(model=_get_real_model(), keep_recent_count=2)
        history = _build_conversation(5)
        remaining = middleware._prune_tool_call_history(history)
        assert len(remaining) < len(history)
        assert self._count_tool_messages(remaining) == 2

    def test_returns_new_list_object(self) -> None:
        """The returned list is a different object from the input list."""
        middleware = ContextManagementMiddleware(model=_get_real_model(), keep_recent_count=10)
        history = _build_conversation(2)
        remaining = middleware._prune_tool_call_history(history)
        assert remaining is not history

    def test_non_tool_messages_preserved(self) -> None:
        """System, user and text parts are still present after pruning."""
        middleware = ContextManagementMiddleware(model=_get_real_model(), keep_recent_count=1)
        history = [
            _make_system_request("system"),
            _make_user_request("hello"),
            _make_text_response("hi"),
            _make_tool_call_response("t1", "tc_0"),
            _make_tool_return_request("t1", "tc_0"),
            _make_tool_call_response("t2", "tc_1"),
            _make_tool_return_request("t2", "tc_1"),
            _make_tool_call_response("t3", "tc_2"),
            _make_tool_return_request("t3", "tc_2"),
        ]
        remaining = middleware._prune_tool_call_history(history)
        for part_type in (SystemPromptPart, UserPromptPart, TextPart):
            assert any(isinstance(p, part_type) for m in remaining for p in m.parts)

    def test_keep_recent_count_1(self) -> None:
        """With ``keep_recent_count=1`` exactly one tool message survives."""
        middleware = ContextManagementMiddleware(model=_get_real_model(), keep_recent_count=1)
        remaining = middleware._prune_tool_call_history(_build_conversation(4))
        assert self._count_tool_messages(remaining) == 1

    def test_empty_messages(self) -> None:
        """Pruning an empty history returns an empty list."""
        middleware = ContextManagementMiddleware(model=_get_real_model(), keep_recent_count=5)
        assert middleware._prune_tool_call_history([]) == []
class TestSerializeMessagesForPrompt:
    """Text rendering of message history by ``_serialize_messages_for_prompt``."""

    def test_system_prompt_serialized(self) -> None:
        """A system prompt is rendered with its tag and content."""
        middleware = ContextManagementMiddleware(model=_get_real_model())
        history = [ModelRequest(parts=[SystemPromptPart(content="Be helpful.")])]
        rendered = middleware._serialize_messages_for_prompt(history)
        for fragment in ("[system-prompt]", "Be helpful."):
            assert fragment in rendered

    def test_user_prompt_serialized(self) -> None:
        """A user prompt is rendered with its tag and content."""
        middleware = ContextManagementMiddleware(model=_get_real_model())
        history = [ModelRequest(parts=[UserPromptPart(content="What time is it?")])]
        rendered = middleware._serialize_messages_for_prompt(history)
        for fragment in ("[user-prompt]", "What time is it?"):
            assert fragment in rendered

    def test_text_response_serialized(self) -> None:
        """A text response is rendered with its tag and content."""
        middleware = ContextManagementMiddleware(model=_get_real_model())
        history = [ModelResponse(parts=[TextPart(content="It's 3pm.")])]
        rendered = middleware._serialize_messages_for_prompt(history)
        for fragment in ("[text]", "It's 3pm."):
            assert fragment in rendered

    def test_tool_call_serialized(self) -> None:
        """A tool call is rendered with its tag, tool name and call id."""
        middleware = ContextManagementMiddleware(model=_get_real_model())
        call = ToolCallPart(tool_name="clock", tool_call_id="tc_99", args='{"tz": "UTC"}')
        rendered = middleware._serialize_messages_for_prompt([ModelResponse(parts=[call])])
        for fragment in ("[tool-call]", "tool_name=clock", "tool_call_id=tc_99"):
            assert fragment in rendered

    def test_tool_return_serialized(self) -> None:
        """A tool return is rendered with its tag, tool name, call id and content."""
        middleware = ContextManagementMiddleware(model=_get_real_model())
        history = [_make_tool_return_request("search", "tc_7", "some result")]
        rendered = middleware._serialize_messages_for_prompt(history)
        for fragment in ("[tool-return]", "tool_name=search", "tool_call_id=tc_7", "some result"):
            assert fragment in rendered

    def test_message_indices_are_1_based(self) -> None:
        """Message headers are numbered starting from 1."""
        middleware = ContextManagementMiddleware(model=_get_real_model())
        history = [_make_user_request("A"), _make_text_response("B")]
        rendered = middleware._serialize_messages_for_prompt(history)
        for header in ("MESSAGE 1 [REQUEST]:", "MESSAGE 2 [RESPONSE]:"):
            assert header in rendered

    def test_dict_tool_args_serialized(self) -> None:
        """Dict-valued tool args appear with quoted key and value."""
        middleware = ContextManagementMiddleware(model=_get_real_model())
        call = ToolCallPart(tool_name="fn", tool_call_id="tc_1", args={"key": "value"})
        rendered = middleware._serialize_messages_for_prompt([ModelResponse(parts=[call])])
        for fragment in ('"key"', '"value"'):
            assert fragment in rendered

    def test_mixed_request_parts(self) -> None:
        """Every part kind in a multi-part request gets its own tag."""
        middleware = ContextManagementMiddleware(model=_get_real_model())
        request = ModelRequest(parts=[
            SystemPromptPart(content="System"),
            UserPromptPart(content="User"),
            ToolReturnPart(tool_name="fn", content="data", tool_call_id="tc_1"),
        ])
        rendered = middleware._serialize_messages_for_prompt([request])
        for tag in ("[system-prompt]", "[user-prompt]", "[tool-return]"):
            assert tag in rendered

    def test_empty_list_returns_empty_string(self) -> None:
        """An empty history serializes to the empty string."""
        middleware = ContextManagementMiddleware(model=_get_real_model())
        assert middleware._serialize_messages_for_prompt([]) == ""
class TestReconstructMessages:
    """Rebuilding message objects from a ``ConversationSummary``."""

    def test_request_with_user_prompt(self) -> None:
        """A summarized user prompt becomes a ``ModelRequest`` with a ``UserPromptPart``."""
        middleware = ContextManagementMiddleware(model=_get_real_model())
        summary = ConversationSummary(messages=[
            SummarizedRequest(parts=[
                SummarizedRequestPart(part_kind="user-prompt", content="Hello"),
            ]),
        ])
        rebuilt = middleware._reconstruct_messages(summary)
        assert len(rebuilt) == 1
        assert isinstance(rebuilt[0], ModelRequest)
        first_part = rebuilt[0].parts[0]
        assert isinstance(first_part, UserPromptPart)
        assert first_part.content == "Hello"

    def test_request_with_system_prompt(self) -> None:
        """A summarized system prompt becomes a ``SystemPromptPart``."""
        middleware = ContextManagementMiddleware(model=_get_real_model())
        summary = ConversationSummary(messages=[
            SummarizedRequest(parts=[
                SummarizedRequestPart(part_kind="system-prompt", content="Be nice"),
            ]),
        ])
        first_part = middleware._reconstruct_messages(summary)[0].parts[0]
        assert isinstance(first_part, SystemPromptPart)
        assert first_part.content == "Be nice"

    def test_request_with_tool_return(self) -> None:
        """A summarized tool return keeps tool name, content and call id."""
        middleware = ContextManagementMiddleware(model=_get_real_model())
        summarized_part = SummarizedRequestPart(
            part_kind="tool-return",
            content="42",
            tool_name="calc",
            tool_call_id="tc_1",
        )
        summary = ConversationSummary(messages=[SummarizedRequest(parts=[summarized_part])])
        rebuilt_part = middleware._reconstruct_messages(summary)[0].parts[0]
        assert isinstance(rebuilt_part, ToolReturnPart)
        assert rebuilt_part.tool_name == "calc"
        assert rebuilt_part.content == "42"
        assert rebuilt_part.tool_call_id == "tc_1"

    def test_response_with_text(self) -> None:
        """A summarized text response becomes a ``ModelResponse`` with a ``TextPart``."""
        middleware = ContextManagementMiddleware(model=_get_real_model())
        summary = ConversationSummary(messages=[
            SummarizedResponse(parts=[
                SummarizedResponsePart(part_kind="text", content="Answer"),
            ]),
        ])
        rebuilt = middleware._reconstruct_messages(summary)
        assert len(rebuilt) == 1
        assert isinstance(rebuilt[0], ModelResponse)
        first_part = rebuilt[0].parts[0]
        assert isinstance(first_part, TextPart)
        assert first_part.content == "Answer"

    def test_response_with_tool_call(self) -> None:
        """A summarized tool call keeps tool name, call id and raw args."""
        middleware = ContextManagementMiddleware(model=_get_real_model())
        summarized_part = SummarizedResponsePart(
            part_kind="tool-call",
            tool_name="search",
            tool_call_id="tc_5",
            args='{"q": "test"}',
        )
        summary = ConversationSummary(messages=[SummarizedResponse(parts=[summarized_part])])
        rebuilt_part = middleware._reconstruct_messages(summary)[0].parts[0]
        assert isinstance(rebuilt_part, ToolCallPart)
        assert rebuilt_part.tool_name == "search"
        assert rebuilt_part.tool_call_id == "tc_5"
        assert rebuilt_part.args == '{"q": "test"}'

    def test_response_model_name_set(self) -> None:
        """Rebuilt responses are expected to report ``gpt-4o`` as model name."""
        middleware = ContextManagementMiddleware(model=_get_real_model())
        summary = ConversationSummary(messages=[
            SummarizedResponse(parts=[
                SummarizedResponsePart(part_kind="text", content="ok"),
            ]),
        ])
        rebuilt = middleware._reconstruct_messages(summary)
        assert rebuilt[0].model_name == "gpt-4o"

    def test_empty_parts_skipped(self) -> None:
        """Summarized messages without parts produce no output messages."""
        middleware = ContextManagementMiddleware(model=_get_real_model())
        summary = ConversationSummary(messages=[
            SummarizedRequest(parts=[]),
            SummarizedResponse(parts=[]),
        ])
        rebuilt = middleware._reconstruct_messages(summary)
        assert len(rebuilt) == 0

    def test_full_conversation_reconstruction(self) -> None:
        """A four-message summary rebuilds as request/response/request/response."""
        middleware = ContextManagementMiddleware(model=_get_real_model())
        summary = ConversationSummary(messages=[
            SummarizedRequest(parts=[
                SummarizedRequestPart(part_kind="system-prompt", content="You are helpful."),
                SummarizedRequestPart(part_kind="user-prompt", content="Search for X."),
            ]),
            SummarizedResponse(parts=[
                SummarizedResponsePart(part_kind="tool-call", tool_name="search", tool_call_id="tc_1", args='{"q":"X"}'),
            ]),
            SummarizedRequest(parts=[
                SummarizedRequestPart(part_kind="tool-return", content="Found X", tool_name="search", tool_call_id="tc_1"),
            ]),
            SummarizedResponse(parts=[
                SummarizedResponsePart(part_kind="text", content="Here is X."),
            ]),
        ])
        rebuilt = middleware._reconstruct_messages(summary)
        assert len(rebuilt) == 4
        expected_types = (ModelRequest, ModelResponse, ModelRequest, ModelResponse)
        for message, expected_type in zip(rebuilt, expected_types):
            assert isinstance(message, expected_type)

    def test_tool_return_without_ids_uses_empty_string(self) -> None:
        """Missing tool name and call id are rebuilt as empty strings."""
        middleware = ContextManagementMiddleware(model=_get_real_model())
        summary = ConversationSummary(messages=[
            SummarizedRequest(parts=[
                SummarizedRequestPart(part_kind="tool-return", content="data"),
            ]),
        ])
        rebuilt_part = middleware._reconstruct_messages(summary)[0].parts[0]
        assert isinstance(rebuilt_part, ToolReturnPart)
        assert rebuilt_part.tool_name == ""
        assert rebuilt_part.tool_call_id == ""
class TestBuildContextFullResponse:
    """Shape of the response built by ``_build_context_full_response``."""

    def test_contains_context_full_message(self) -> None:
        """The response holds exactly one text part carrying ``CONTEXT_FULL_MESSAGE``."""
        middleware = ContextManagementMiddleware(model=_get_real_model())
        response = middleware._build_context_full_response(model_name="gpt-4o")
        assert isinstance(response, ModelResponse)
        assert len(response.parts) == 1
        only_part = response.parts[0]
        assert isinstance(only_part, TextPart)
        assert only_part.content == CONTEXT_FULL_MESSAGE

    def test_model_name_propagated(self) -> None:
        """The supplied model name is copied onto the response."""
        middleware = ContextManagementMiddleware(model=_get_real_model())
        response = middleware._build_context_full_response(model_name="my-model")
        assert response.model_name == "my-model"

    def test_finish_reason_is_length(self) -> None:
        """The finish reason is ``length``."""
        middleware = ContextManagementMiddleware(model=_get_real_model())
        response = middleware._build_context_full_response()
        assert response.finish_reason == "length"

    def test_none_model_name(self) -> None:
        """An explicit ``None`` model name stays ``None`` on the response."""
        middleware = ContextManagementMiddleware(model=_get_real_model())
        response = middleware._build_context_full_response(model_name=None)
        assert response.model_name is None

    def test_timestamp_is_set(self) -> None:
        """The response carries a timestamp."""
        middleware = ContextManagementMiddleware(model=_get_real_model())
        response = middleware._build_context_full_response()
        assert response.timestamp is not None
class TestSummarizeOldMessages:
    """Checks for ``_summarize_old_messages``; the section header marks these as real LLM calls."""

    @pytest.mark.asyncio
    async def test_short_conversation_unchanged(self) -> None:
        """A two-message history with ``keep_recent_count=5`` keeps its length."""
        middleware = ContextManagementMiddleware(model=_get_real_model(), keep_recent_count=5)
        history = [_make_user_request("A"), _make_text_response("B")]
        summarized = await middleware._summarize_old_messages(history)
        assert len(summarized) == 2

    @pytest.mark.asyncio
    async def test_system_prompt_preserved(self) -> None:
        """The system prompt survives summarization and the old portion shrinks."""
        middleware = ContextManagementMiddleware(model=_get_real_model(), keep_recent_count=2)

        history = [
            _make_system_request("You are a math tutor who explains concepts step by step."),
            _make_user_request(
                "Can you explain to me what calculus is, how it was invented, "
                "who are the key historical figures involved in its development, "
                "and what are the main branches of calculus used today?"
            ),
            _make_text_response(
                "Calculus is a branch of mathematics that deals with continuous change. "
                "It was independently developed by Isaac Newton and Gottfried Wilhelm Leibniz "
                "in the late 17th century. Newton focused on fluxions and their applications to "
                "physics, while Leibniz developed a more systematic notation that is still used today. "
                "The main branches are differential calculus, which studies rates of change and slopes "
                "of curves, and integral calculus, which studies accumulation of quantities and areas "
                "under curves. The Fundamental Theorem of Calculus connects these two branches."
            ),
            _make_user_request(
                "Now explain the difference between limits, derivatives, and integrals "
                "with real-world examples for each one."
            ),
            _make_text_response(
                "A limit describes the value a function approaches as its input approaches a point. "
                "For example, speed at an exact instant is the limit of average speed over shorter intervals. "
                "A derivative measures instantaneous rate of change — like a car's speedometer reading. "
                "An integral accumulates quantities — like calculating total distance from a speed-vs-time graph. "
                "In engineering, derivatives help optimize designs by finding maxima/minima, "
                "while integrals help compute areas, volumes, and total accumulated quantities."
            ),
            _make_user_request("What is the Fundamental Theorem of Calculus?"),
            _make_text_response("It connects differentiation and integration as inverse operations."),
        ]
        chars_before: int = _old_messages_chars(history, keep_recent_count=2)
        summarized = await middleware._summarize_old_messages(history)

        leading = summarized[0]
        assert isinstance(leading, ModelRequest)
        system_parts = [p for p in leading.parts if isinstance(p, SystemPromptPart)]
        assert system_parts
        assert "math tutor" in system_parts[0].content.lower()

        chars_after: int = _old_messages_chars(summarized, keep_recent_count=2)
        assert chars_after < chars_before, (
            f"Summarized old portion ({chars_after} chars) must be smaller "
            f"than original old portion ({chars_before} chars)"
        )

    @pytest.mark.asyncio
    async def test_recent_messages_kept_verbatim(self) -> None:
        """The last two messages come back as the very same objects."""
        middleware = ContextManagementMiddleware(model=_get_real_model(), keep_recent_count=2)

        recent_1 = _make_user_request("KEEP_THIS_EXACT_MESSAGE_1")
        recent_2 = _make_text_response("KEEP_THIS_EXACT_MESSAGE_2")

        history = [
            _make_system_request("You are a helpful research assistant."),
            _make_user_request(
                "I need a comprehensive analysis of the environmental impact of electric vehicles "
                "compared to traditional combustion engine vehicles. Please cover manufacturing, "
                "battery production, daily operation, and end-of-life recycling considerations."
            ),
            _make_text_response(
                "Electric vehicles have a complex environmental profile. During manufacturing, EVs "
                "require significant energy for battery production, particularly lithium-ion batteries "
                "which involve mining lithium, cobalt, and nickel. The carbon footprint of producing an "
                "EV battery can be 30-40% higher than producing a comparable combustion engine. However, "
                "during operation, EVs produce zero tailpipe emissions. Over the vehicle's lifetime, "
                "the total emissions depend heavily on the electricity grid mix. In regions with clean "
                "energy grids, EVs can reduce lifetime emissions by 50-70%. Battery recycling remains "
                "a challenge, though advances in lithium recovery technology are improving."
            ),
            _make_user_request(
                "What about the impact of lithium mining on local water supplies and ecosystems? "
                "Are there any sustainable alternatives being developed?"
            ),
            _make_text_response(
                "Lithium mining has significant environmental consequences. In South America's Lithium "
                "Triangle, brine extraction consumes vast quantities of water — approximately 500,000 "
                "gallons per ton of lithium. This depletes aquifers and affects local agriculture. "
                "Hard rock mining in Australia produces chemical waste and requires forest clearing. "
                "Alternatives include sodium-ion batteries, solid-state batteries using abundant materials, "
                "and direct lithium extraction (DLE) technology that reduces water usage by 90%. "
                "Companies like Tesla are also researching iron-phosphate cathodes to eliminate cobalt."
            ),
            recent_1,
            recent_2,
        ]
        chars_before: int = _old_messages_chars(history, keep_recent_count=2)
        summarized = await middleware._summarize_old_messages(history)

        # Identity, not equality: the recent tail must not be copied or rewritten.
        assert summarized[-2] is recent_1
        assert summarized[-1] is recent_2

        chars_after: int = _old_messages_chars(summarized, keep_recent_count=2)
        assert chars_after < chars_before, (
            f"Summarized old portion ({chars_after} chars) must be smaller "
            f"than original old portion ({chars_before} chars)"
        )

    @pytest.mark.asyncio
    async def test_result_is_structured_messages(self) -> None:
        """Every returned item is a request or response with at least one part."""
        middleware = ContextManagementMiddleware(model=_get_real_model(), keep_recent_count=2)

        history = [
            _make_system_request("You are a knowledgeable programming assistant."),
            _make_user_request(
                "Explain the key differences between Python, JavaScript, and Rust. "
                "Include information about their type systems, memory management, "
                "concurrency models, and typical use cases in the industry."
            ),
            _make_text_response(
                "Python is a dynamically-typed, interpreted language with garbage collection. "
                "It uses the GIL for thread safety, making true parallelism difficult. "
                "Python excels in data science, machine learning, scripting, and web backends with Django/Flask. "
                "JavaScript is also dynamically-typed and uses an event-loop concurrency model. "
                "It runs in browsers and on servers via Node.js, dominating web development. "
                "Rust is a statically-typed, compiled systems language with no garbage collector. "
                "It uses ownership and borrowing for memory safety without runtime overhead. "
                "Rust's async model uses futures and tokio for high-performance concurrent applications. "
                "Rust is used for systems programming, WebAssembly, embedded systems, and performance-critical backends."
            ),
            _make_user_request(
                "Can you go deeper into Rust's ownership model? Explain borrowing, lifetimes, "
                "and how the borrow checker prevents data races at compile time."
            ),
            _make_text_response(
                "Rust's ownership model ensures memory safety without a garbage collector. Each value "
                "has exactly one owner, and when the owner goes out of scope, the value is dropped. "
                "Borrowing allows references to values: immutable borrows (&T) allow multiple simultaneous "
                "readers, while mutable borrows (&mut T) enforce exclusive access. Lifetimes are annotations "
                "that tell the compiler how long references are valid, preventing dangling pointers. "
                "The borrow checker enforces these rules at compile time: you cannot have a mutable reference "
                "while immutable references exist, and all references must be valid for their declared lifetime. "
                "This prevents data races because concurrent mutable access is statically disallowed."
            ),
            _make_user_request("What about async in Rust?"),
            _make_text_response("Rust uses async/await with the tokio runtime for concurrent I/O."),
        ]
        chars_before: int = _old_messages_chars(history, keep_recent_count=2)
        summarized = await middleware._summarize_old_messages(history)

        assert all(isinstance(item, (ModelRequest, ModelResponse)) for item in summarized)

        for item in summarized:
            assert hasattr(item, 'parts')
            assert len(item.parts) > 0

        chars_after: int = _old_messages_chars(summarized, keep_recent_count=2)
        assert chars_after < chars_before, (
            f"Summarized old portion ({chars_after} chars) must be smaller "
            f"than original old portion ({chars_before} chars)"
        )

    @pytest.mark.asyncio
    async def test_conversation_with_tool_calls_summarized(self) -> None:
        """A history containing tool calls keeps its system prompt and recent tail."""
        middleware = ContextManagementMiddleware(model=_get_real_model(), keep_recent_count=2)

        history = [
            _make_system_request("You are a data analysis assistant with access to database and search tools."),
            _make_user_request(
                "I need a full analysis of our Q3 revenue data. First search for the report, "
                "then query the database for detailed breakdowns by region and product category."
            ),
            ModelResponse(parts=[
                TextPart(content=(
                    "I'll start by searching for the Q3 revenue report in your document system, "
                    "then I'll query the database for regional and category breakdowns."
                )),
                ToolCallPart(
                    tool_name="document_search",
                    tool_call_id="tc_1",
                    args='{"query": "Q3 2025 revenue report", "filters": {"type": "financial"}}',
                ),
            ]),
            _make_tool_return_request(
                "document_search",
                "tc_1",
                "Found: Q3 Revenue Report - Total revenue $12.4M, up 15% YoY. "
                "Key highlights: North America grew 22%, Europe grew 8%, APAC declined 3%. "
                "SaaS subscriptions contributed 65% of total revenue.",
            ),
            ModelResponse(parts=[
                TextPart(content=(
                    "I found the Q3 report showing $12.4M total revenue. Now let me query "
                    "the database for the detailed regional and product breakdowns."
                )),
                ToolCallPart(
                    tool_name="database_query",
                    tool_call_id="tc_2",
                    args='{"sql": "SELECT region, product_category, SUM(revenue) FROM sales WHERE quarter=3 GROUP BY region, product_category ORDER BY revenue DESC"}',
                ),
            ]),
            _make_tool_return_request(
                "database_query",
                "tc_2",
                "Results: NA-SaaS: $5.2M, NA-Services: $1.8M, EU-SaaS: $2.1M, "
                "EU-Services: $0.9M, APAC-SaaS: $1.5M, APAC-Services: $0.4M, Other: $0.5M",
            ),
            _make_text_response(
                "Here's the complete Q3 revenue analysis: Total revenue reached $12.4M, a 15% "
                "increase year-over-year. North America led with $7.0M (56% of total), driven by "
                "strong SaaS growth at $5.2M. Europe contributed $3.0M with steady SaaS performance. "
                "APAC saw a slight decline to $1.9M, primarily in services. SaaS subscriptions "
                "remain the dominant revenue driver across all regions at 65% of total revenue."
            ),
            _make_user_request(
                "Now compare this with Q2 numbers and identify the biggest changes."
            ),
            ModelResponse(parts=[
                TextPart(content="Let me pull the Q2 data for comparison."),
                ToolCallPart(
                    tool_name="database_query",
                    tool_call_id="tc_3",
                    args='{"sql": "SELECT region, product_category, SUM(revenue) FROM sales WHERE quarter=2 GROUP BY region, product_category"}',
                ),
            ]),
            _make_tool_return_request(
                "database_query",
                "tc_3",
                "Results: NA-SaaS: $4.5M, NA-Services: $1.6M, EU-SaaS: $2.0M, "
                "EU-Services: $0.85M, APAC-SaaS: $1.6M, APAC-Services: $0.45M, Other: $0.4M",
            ),
            _make_text_response(
                "Comparing Q3 vs Q2: The biggest improvement was NA-SaaS, growing from $4.5M to "
                "$5.2M (+15.6%). NA-Services also grew from $1.6M to $1.8M (+12.5%). The most "
                "concerning trend is APAC-SaaS declining from $1.6M to $1.5M (-6.3%) and APAC-Services "
                "dropping from $0.45M to $0.4M (-11.1%). EU showed modest growth across both categories. "
                "Overall, the quarter-over-quarter growth was driven almost entirely by North America."
            ),
            # Recent messages (last 2)
            _make_user_request("What should we focus on for Q4?"),
            _make_text_response("Focus on APAC recovery and maintaining NA SaaS momentum."),
        ]
        summarized = await middleware._summarize_old_messages(history)

        assert all(isinstance(item, (ModelRequest, ModelResponse)) for item in summarized)

        assert summarized[-1] is history[-1]
        assert summarized[-2] is history[-2]

        leading = summarized[0]
        assert isinstance(leading, ModelRequest)
        assert any(isinstance(p, SystemPromptPart) for p in leading.parts)

        # The old portion must not grow in character count. Strict shrinkage is
        # not required here because tool-call metadata has to be preserved.
        chars_after: int = _old_messages_chars(summarized, keep_recent_count=2)
        chars_before: int = _old_messages_chars(history, keep_recent_count=2)
        assert chars_after <= chars_before, (
            f"Summarized old portion ({chars_after} chars) must not exceed "
            f"original old portion ({chars_before} chars)"
        )

    @pytest.mark.asyncio
    async def test_all_messages_within_keep_count_no_summarization(self) -> None:
        """With ``keep_recent_count=10`` a three-message history keeps its length."""
        middleware = ContextManagementMiddleware(model=_get_real_model(), keep_recent_count=10)

        history = [
            _make_system_request("sys"),
            _make_user_request("q1"),
            _make_text_response("a1"),
        ]
        summarized = await middleware._summarize_old_messages(history)
        # Nothing to summarize, should return same length
        assert len(summarized) == len(history)
_make_text_response("Hi", input_tokens=100, output_tokens=20), + ] + result, ctx_full = await mw.apply(msgs) + assert ctx_full is False + assert len(result) == len(msgs) + + @pytest.mark.asyncio + async def test_returns_new_list_not_same_reference(self) -> None: + """Verify the aliasing fix: apply() must return a NEW list.""" + model = _get_real_model() + mw = ContextManagementMiddleware(model=model) + msgs = [ + _make_user_request("Hello"), + _make_text_response("Hi", input_tokens=100, output_tokens=20), + ] + result, _ = await mw.apply(msgs) + assert result is not msgs + # Mutating result must NOT affect original + result.clear() + assert len(msgs) == 2 + + @pytest.mark.asyncio + async def test_clear_extend_pattern_safe(self) -> None: + """Simulate the exact pattern used in _handle_model_response.""" + model = _get_real_model() + mw = ContextManagementMiddleware(model=model) + msgs = [ + _make_user_request("Hello"), + _make_text_response("Hi", input_tokens=100, output_tokens=20), + ] + original_len = len(msgs) + + managed_msgs, ctx_full = await mw.apply(msgs) + msgs.clear() + msgs.extend(managed_msgs) + + assert len(msgs) == original_len + assert ctx_full is False + + @pytest.mark.asyncio + async def test_empty_messages_not_exceeded(self) -> None: + model = _get_real_model() + mw = ContextManagementMiddleware(model=model) + result, ctx_full = await mw.apply([]) + assert ctx_full is False + assert len(result) == 0 + + +# =========================================================================== +# Constructor defaults +# =========================================================================== + +class TestConstructorDefaults: + def test_default_keep_recent_count(self) -> None: + model = _get_real_model() + mw = ContextManagementMiddleware(model=model) + assert mw.keep_recent_count == DEFAULT_KEEP_RECENT_COUNT + + def test_default_safety_margin_ratio(self) -> None: + model = _get_real_model() + mw = ContextManagementMiddleware(model=model) + assert 
mw.safety_margin_ratio == 0.90 + + def test_custom_keep_recent_count(self) -> None: + model = _get_real_model() + mw = ContextManagementMiddleware(model=model, keep_recent_count=10) + assert mw.keep_recent_count == 10 + + def test_custom_safety_margin(self) -> None: + model = _get_real_model() + mw = ContextManagementMiddleware(model=model, safety_margin_ratio=0.75) + assert mw.safety_margin_ratio == 0.75 + + def test_model_stored(self) -> None: + model = _get_real_model() + mw = ContextManagementMiddleware(model=model) + assert mw.model is model
tests/smoke_tests/autonomous_agent/test_autonomous_agent.py+768 −0 added@@ -0,0 +1,768 @@ +""" +Comprehensive smoke tests for AutonomousAgent. + +Tests cover: +- Class inheritance from Agent +- Default storage and memory setup +- Filesystem toolkit operations +- Shell toolkit operations +- Real LLM integration with tools +- Workspace sandboxing + +All tests use REAL LLM requests - no mocking. +""" + +import os +import shutil +import tempfile +from pathlib import Path + +import pytest + +from upsonic import AutonomousAgent +from upsonic.agent import Agent +from upsonic.agent.autonomous_agent import ( + AutonomousFilesystemToolKit, + AutonomousShellToolKit, +) +from upsonic.storage import InMemoryStorage, Memory + +pytestmark = pytest.mark.timeout(180) + + + + +@pytest.fixture +def temp_workspace(): + """Create a temporary workspace directory for tests.""" + workspace = tempfile.mkdtemp(prefix="autonomous_agent_test_") + yield workspace + # Cleanup after test + if os.path.exists(workspace): + shutil.rmtree(workspace) + + +@pytest.fixture +def sample_python_file(temp_workspace): + """Create a sample Python file in the workspace.""" + file_path = Path(temp_workspace) / "sample.py" + content = '''"""Sample Python module for testing.""" + +def hello(name: str) -> str: + """Say hello to someone.""" + return f"Hello, {name}!" 
+ + +def add(a: int, b: int) -> int: + """Add two numbers.""" + return a + b + + +if __name__ == "__main__": + print(hello("World")) +''' + file_path.write_text(content) + return str(file_path) + + +@pytest.fixture +def sample_json_file(temp_workspace): + """Create a sample JSON file in the workspace.""" + file_path = Path(temp_workspace) / "config.json" + content = '{"name": "test", "version": "1.0.0", "enabled": true}' + file_path.write_text(content) + return str(file_path) + + +# --------------------------------------------------------------------------- +# Test: Class Inheritance +# --------------------------------------------------------------------------- + +class TestAutonomousAgentInheritance: + """Tests verifying AutonomousAgent properly inherits from Agent.""" + + def test_inherits_from_agent(self): + """Verify AutonomousAgent is a subclass of Agent.""" + assert issubclass(AutonomousAgent, Agent) + + def test_mro_includes_agent(self): + """Verify Method Resolution Order includes Agent and BaseAgent.""" + mro_names = [cls.__name__ for cls in AutonomousAgent.__mro__] + assert "Agent" in mro_names + assert "BaseAgent" in mro_names + + def test_instance_is_agent(self, temp_workspace): + """Verify AutonomousAgent instance is also an Agent instance.""" + agent = AutonomousAgent( + model="openai/gpt-4o-mini", + workspace=temp_workspace, + ) + assert isinstance(agent, Agent) + assert isinstance(agent, AutonomousAgent) + + +# --------------------------------------------------------------------------- +# Test: Initialization and Default Storage/Memory +# --------------------------------------------------------------------------- + +class TestAutonomousAgentInitialization: + """Tests for AutonomousAgent initialization with default storage and memory.""" + + def test_default_storage_created(self, temp_workspace): + """Verify InMemoryStorage is created by default.""" + agent = AutonomousAgent( + model="openai/gpt-4o-mini", + workspace=temp_workspace, + ) + assert 
agent.autonomous_storage is not None + assert isinstance(agent.autonomous_storage, InMemoryStorage) + + def test_default_memory_created(self, temp_workspace): + """Verify Memory is created by default with the storage.""" + agent = AutonomousAgent( + model="openai/gpt-4o-mini", + workspace=temp_workspace, + ) + assert agent.autonomous_memory is not None + assert isinstance(agent.autonomous_memory, Memory) + assert agent.memory is not None + + def test_custom_storage_used(self, temp_workspace): + """Verify custom storage is used when provided.""" + custom_storage = InMemoryStorage() + agent = AutonomousAgent( + model="openai/gpt-4o-mini", + workspace=temp_workspace, + storage=custom_storage, + ) + assert agent.autonomous_storage is custom_storage + + def test_custom_memory_used(self, temp_workspace): + """Verify custom memory is used when provided.""" + custom_storage = InMemoryStorage() + custom_memory = Memory(storage=custom_storage) + agent = AutonomousAgent( + model="openai/gpt-4o-mini", + workspace=temp_workspace, + memory=custom_memory, + ) + assert agent.memory is custom_memory + + def test_workspace_resolved(self, temp_workspace): + """Verify workspace path is properly resolved.""" + agent = AutonomousAgent( + model="openai/gpt-4o-mini", + workspace=temp_workspace, + ) + assert agent.autonomous_workspace == Path(temp_workspace).resolve() + + def test_default_workspace_is_cwd(self): + """Verify default workspace is current working directory.""" + agent = AutonomousAgent(model="openai/gpt-4o-mini") + assert agent.autonomous_workspace == Path.cwd().resolve() + + def test_toolkits_created(self, temp_workspace): + """Verify filesystem and shell toolkits are created.""" + agent = AutonomousAgent( + model="openai/gpt-4o-mini", + workspace=temp_workspace, + ) + assert agent.filesystem_toolkit is not None + assert agent.shell_toolkit is not None + assert isinstance(agent.filesystem_toolkit, AutonomousFilesystemToolKit) + assert isinstance(agent.shell_toolkit, 
AutonomousShellToolKit) + + def test_toolkits_disabled(self, temp_workspace): + """Verify toolkits can be disabled.""" + agent = AutonomousAgent( + model="openai/gpt-4o-mini", + workspace=temp_workspace, + enable_filesystem=False, + enable_shell=False, + ) + assert agent.filesystem_toolkit is None + assert agent.shell_toolkit is None + + def test_tools_registered(self, temp_workspace): + """Verify toolkits are registered as tools.""" + agent = AutonomousAgent( + model="openai/gpt-4o-mini", + workspace=temp_workspace, + ) + # The toolkits should be in the tools list + assert len(agent.tools) >= 2 + + def test_agent_properties(self, temp_workspace): + """Verify agent properties are accessible.""" + agent = AutonomousAgent( + model="openai/gpt-4o-mini", + workspace=temp_workspace, + name="TestAgent", + ) + assert agent.name == "TestAgent" + assert agent.agent_id is not None + assert agent.model is not None + + +# --------------------------------------------------------------------------- +# Test: Filesystem Toolkit Operations (Direct) +# --------------------------------------------------------------------------- + +class TestFilesystemToolkitDirect: + """Tests for filesystem toolkit operations called directly.""" + + def test_read_file(self, temp_workspace, sample_python_file): + """Test reading a file directly.""" + toolkit = AutonomousFilesystemToolKit(workspace=temp_workspace) + result = toolkit.read_file("sample.py") + + assert "def hello" in result + assert "def add" in result + assert "1|" in result # Line numbers + + def test_read_file_with_pagination(self, temp_workspace, sample_python_file): + """Test reading a file with offset and limit.""" + toolkit = AutonomousFilesystemToolKit(workspace=temp_workspace) + result = toolkit.read_file("sample.py", offset=0, limit=5) + + assert "Sample Python module" in result + assert "Showing lines 1-5" in result + + def test_write_file(self, temp_workspace): + """Test writing a new file.""" + toolkit = 
AutonomousFilesystemToolKit(workspace=temp_workspace) + content = "print('Hello, World!')" + result = toolkit.write_file("new_file.py", content) + + assert "✅" in result + assert "Successfully wrote" in result + + # Verify file exists + file_path = Path(temp_workspace) / "new_file.py" + assert file_path.exists() + assert file_path.read_text() == content + + def test_edit_file_requires_read(self, temp_workspace, sample_python_file): + """Test that edit_file requires prior read_file call.""" + toolkit = AutonomousFilesystemToolKit(workspace=temp_workspace) + + # Try to edit without reading first + result = toolkit.edit_file( + "sample.py", + "def hello", + "def greet" + ) + + assert "❌" in result + assert "must call read_file" in result + + def test_edit_file_after_read(self, temp_workspace, sample_python_file): + """Test editing a file after reading it.""" + toolkit = AutonomousFilesystemToolKit(workspace=temp_workspace) + + # Read first + toolkit.read_file("sample.py") + + # Then edit + result = toolkit.edit_file( + "sample.py", + "def hello(name: str) -> str:", + "def greet(name: str) -> str:" + ) + + assert "✅" in result + assert "Successfully edited" in result + + # Verify change + content = (Path(temp_workspace) / "sample.py").read_text() + assert "def greet" in content + assert "def hello" not in content + + def test_list_files(self, temp_workspace, sample_python_file, sample_json_file): + """Test listing directory contents.""" + toolkit = AutonomousFilesystemToolKit(workspace=temp_workspace) + result = toolkit.list_files(".") + + assert "sample.py" in result + assert "config.json" in result + + def test_list_files_recursive(self, temp_workspace): + """Test recursive directory listing.""" + # Create nested structure + subdir = Path(temp_workspace) / "src" + subdir.mkdir() + (subdir / "main.py").write_text("# Main") + + toolkit = AutonomousFilesystemToolKit(workspace=temp_workspace) + result = toolkit.list_files(".", recursive=True) + + assert "[DIR]" in result 
+ assert "src" in result + assert "main.py" in result + + def test_search_files(self, temp_workspace, sample_python_file, sample_json_file): + """Test searching files by pattern.""" + toolkit = AutonomousFilesystemToolKit(workspace=temp_workspace) + result = toolkit.search_files("*.py") + + assert "sample.py" in result + assert "config.json" not in result + + def test_grep_files(self, temp_workspace, sample_python_file): + """Test searching text within files.""" + toolkit = AutonomousFilesystemToolKit(workspace=temp_workspace) + result = toolkit.grep_files("def", file_pattern="*.py") + + assert "sample.py" in result + assert "def hello" in result or "hello" in result + + def test_file_info(self, temp_workspace, sample_python_file): + """Test getting file information.""" + toolkit = AutonomousFilesystemToolKit(workspace=temp_workspace) + result = toolkit.file_info("sample.py") + + assert "sample.py" in result + assert "Type: File" in result + assert "Size:" in result + + def test_create_directory(self, temp_workspace): + """Test creating a directory.""" + toolkit = AutonomousFilesystemToolKit(workspace=temp_workspace) + result = toolkit.create_directory("new_dir/sub_dir") + + assert "✅" in result + assert (Path(temp_workspace) / "new_dir" / "sub_dir").exists() + + def test_copy_file(self, temp_workspace, sample_python_file): + """Test copying a file.""" + toolkit = AutonomousFilesystemToolKit(workspace=temp_workspace) + result = toolkit.copy_file("sample.py", "sample_backup.py") + + assert "✅" in result + assert (Path(temp_workspace) / "sample_backup.py").exists() + + def test_move_file(self, temp_workspace, sample_python_file): + """Test moving a file.""" + toolkit = AutonomousFilesystemToolKit(workspace=temp_workspace) + result = toolkit.move_file("sample.py", "moved_sample.py") + + assert "✅" in result + assert not (Path(temp_workspace) / "sample.py").exists() + assert (Path(temp_workspace) / "moved_sample.py").exists() + + def test_delete_file(self, 
temp_workspace, sample_python_file): + """Test deleting a file.""" + toolkit = AutonomousFilesystemToolKit(workspace=temp_workspace) + result = toolkit.delete_file("sample.py") + + assert "✅" in result + assert not (Path(temp_workspace) / "sample.py").exists() + + def test_workspace_sandboxing(self, temp_workspace): + """Test that paths outside workspace are blocked.""" + toolkit = AutonomousFilesystemToolKit(workspace=temp_workspace) + + # Try to read file outside workspace + with pytest.raises(ValueError) as exc_info: + toolkit._validate_path("/etc/passwd") + + assert "outside workspace" in str(exc_info.value) + + +# --------------------------------------------------------------------------- +# Test: Shell Toolkit Operations (Direct) +# --------------------------------------------------------------------------- + +class TestShellToolkitDirect: + """Tests for shell toolkit operations called directly.""" + + def test_run_echo_command(self, temp_workspace): + """Test running a simple echo command.""" + toolkit = AutonomousShellToolKit(workspace=temp_workspace) + result = toolkit.run_command("echo 'Hello, World!'") + + assert "Hello, World!" 
in result + assert "Exit code: 0" in result + + def test_run_pwd_command(self, temp_workspace): + """Test running pwd command in workspace.""" + toolkit = AutonomousShellToolKit(workspace=temp_workspace) + result = toolkit.run_command("pwd") + + assert temp_workspace in result + assert "Exit code: 0" in result + + def test_run_ls_command(self, temp_workspace, sample_python_file): + """Test running ls command.""" + toolkit = AutonomousShellToolKit(workspace=temp_workspace) + result = toolkit.run_command("ls -la") + + assert "sample.py" in result + assert "Exit code: 0" in result + + def test_run_python_version(self, temp_workspace): + """Test running python --version.""" + toolkit = AutonomousShellToolKit(workspace=temp_workspace) + result = toolkit.run_command("python3 --version") + + assert "Python" in result + assert "Exit code: 0" in result + + def test_run_python_code(self, temp_workspace): + """Test running Python code directly.""" + toolkit = AutonomousShellToolKit(workspace=temp_workspace) + result = toolkit.run_python("print(2 + 2)") + + assert "4" in result + assert "Exit code: 0" in result + + def test_check_command_exists(self, temp_workspace): + """Test checking if a command exists.""" + toolkit = AutonomousShellToolKit(workspace=temp_workspace) + + # Check for a command that should exist + result = toolkit.check_command_exists("python3") + assert "✅" in result or "is available" in result + + def test_check_command_not_exists(self, temp_workspace): + """Test checking for a non-existent command.""" + toolkit = AutonomousShellToolKit(workspace=temp_workspace) + result = toolkit.check_command_exists("nonexistent_command_xyz123") + + assert "❌" in result or "not available" in result + + def test_command_timeout(self, temp_workspace): + """Test command timeout handling.""" + toolkit = AutonomousShellToolKit( + workspace=temp_workspace, + default_timeout=1, + ) + result = toolkit.run_command("sleep 5", timeout=1) + + assert "timed out" in result.lower() + + 
def test_blocked_command(self, temp_workspace): + """Test that dangerous commands are blocked.""" + toolkit = AutonomousShellToolKit(workspace=temp_workspace) + result = toolkit.run_command("rm -rf /") + + assert "blocked" in result.lower() or "error" in result.lower() + + def test_environment_variables(self, temp_workspace): + """Test passing environment variables.""" + toolkit = AutonomousShellToolKit(workspace=temp_workspace) + result = toolkit.run_command( + "echo $MY_TEST_VAR", + env={"MY_TEST_VAR": "test_value_123"} + ) + + assert "test_value_123" in result + + +# --------------------------------------------------------------------------- +# Test: Real LLM Integration +# --------------------------------------------------------------------------- + +class TestAutonomousAgentWithLLM: + """Tests for AutonomousAgent with real LLM requests.""" + + def test_simple_task_without_tools(self, temp_workspace): + """Test simple task that doesn't require tools.""" + agent = AutonomousAgent( + model="openai/gpt-4o-mini", + workspace=temp_workspace, + ) + result = agent.do("What is 2 + 2? Reply with just the number.") + + assert result is not None + assert "4" in result + + def test_read_file_via_llm(self, temp_workspace, sample_python_file): + """Test LLM using read_file tool.""" + agent = AutonomousAgent( + model="openai/gpt-4o-mini", + workspace=temp_workspace, + ) + result = agent.do("Read the sample.py file and tell me what functions are defined in it.") + + assert result is not None + # The LLM should mention the functions + assert "hello" in result.lower() or "add" in result.lower() + + def test_write_file_via_llm(self, temp_workspace): + """Test LLM using write_file tool.""" + agent = AutonomousAgent( + model="openai/gpt-4o-mini", + workspace=temp_workspace, + ) + result = agent.do( + "Create a file called 'greeting.txt' with the content 'Hello from LLM!'. " + "Confirm when done." 
+ ) + + assert result is not None + # Verify file was created + greeting_file = Path(temp_workspace) / "greeting.txt" + assert greeting_file.exists() + content = greeting_file.read_text() + assert "Hello" in content or "hello" in content.lower() + + def test_list_files_via_llm(self, temp_workspace, sample_python_file, sample_json_file): + """Test LLM using list_files tool.""" + agent = AutonomousAgent( + model="openai/gpt-4o-mini", + workspace=temp_workspace, + ) + result = agent.do("List all files in the current directory and tell me what file types you see.") + + assert result is not None + # Should mention Python and/or JSON files + assert "py" in result.lower() or "python" in result.lower() or "json" in result.lower() + + def test_run_command_via_llm(self, temp_workspace): + """Test LLM using run_command tool.""" + agent = AutonomousAgent( + model="openai/gpt-4o-mini", + workspace=temp_workspace, + ) + result = agent.do("Run 'echo Hello from shell' and tell me what the output was.") + + assert result is not None + assert "Hello" in result or "hello" in result.lower() + + def test_python_execution_via_llm(self, temp_workspace): + """Test LLM executing Python code.""" + agent = AutonomousAgent( + model="openai/gpt-4o-mini", + workspace=temp_workspace, + ) + result = agent.do( + "Use the run_python tool to calculate 15 * 7 and tell me the result." + ) + + assert result is not None + assert "105" in result + + def test_edit_file_via_llm(self, temp_workspace, sample_python_file): + """Test LLM using edit_file tool (requires read first).""" + agent = AutonomousAgent( + model="openai/gpt-4o-mini", + workspace=temp_workspace, + ) + result = agent.do( + "Read sample.py, then edit it to change the function 'hello' to 'greet'. " + "Keep the same functionality. Confirm the change was made." 
+ ) + + assert result is not None + # Verify the change was made + content = (Path(temp_workspace) / "sample.py").read_text() + assert "greet" in content.lower() + + def test_complex_multi_tool_task(self, temp_workspace): + """Test LLM using multiple tools in sequence.""" + agent = AutonomousAgent( + model="openai/gpt-4o-mini", + workspace=temp_workspace, + ) + result = agent.do( + "1. Create a directory called 'project'\n" + "2. Inside 'project', create a file called 'main.py' with a simple hello world program\n" + "3. List the contents of the 'project' directory\n" + "4. Tell me what you created" + ) + + assert result is not None + # Verify directory and file were created + project_dir = Path(temp_workspace) / "project" + main_file = project_dir / "main.py" + assert project_dir.exists() + assert main_file.exists() + + +# --------------------------------------------------------------------------- +# Test: Async Operations +# --------------------------------------------------------------------------- + +class TestAutonomousAgentAsync: + """Tests for async operations.""" + + @pytest.mark.asyncio + async def test_async_read_file(self, temp_workspace, sample_python_file): + """Test async read_file operation.""" + toolkit = AutonomousFilesystemToolKit(workspace=temp_workspace) + result = await toolkit.aread_file("sample.py") + + assert "def hello" in result + + @pytest.mark.asyncio + async def test_async_write_file(self, temp_workspace): + """Test async write_file operation.""" + toolkit = AutonomousFilesystemToolKit(workspace=temp_workspace) + result = await toolkit.awrite_file("async_test.txt", "Async content") + + assert "✅" in result + assert (Path(temp_workspace) / "async_test.txt").exists() + + @pytest.mark.asyncio + async def test_async_run_command(self, temp_workspace): + """Test async run_command operation.""" + toolkit = AutonomousShellToolKit(workspace=temp_workspace) + result = await toolkit.arun_command("echo 'Async test'") + + assert "Async test" in 
result + + @pytest.mark.asyncio + async def test_async_agent_do(self, temp_workspace): + """Test async agent.do_async operation with LLM.""" + agent = AutonomousAgent( + model="openai/gpt-4o-mini", + workspace=temp_workspace, + ) + result = await agent.do_async("What is 3 + 3? Reply with just the number.") + + assert result is not None + assert "6" in result + + +# --------------------------------------------------------------------------- +# Test: Agent Methods Inherited from Agent +# --------------------------------------------------------------------------- + +class TestInheritedAgentMethods: + """Tests verifying inherited Agent methods work correctly.""" + + def test_print_do_method(self, temp_workspace): + """Test print_do method inherited from Agent.""" + agent = AutonomousAgent( + model="openai/gpt-4o-mini", + workspace=temp_workspace, + ) + result = agent.print_do("Say hello") + + assert result is not None + + def test_agent_id_property(self, temp_workspace): + """Test agent_id property inherited from Agent.""" + agent = AutonomousAgent( + model="openai/gpt-4o-mini", + workspace=temp_workspace, + ) + assert agent.agent_id is not None + assert isinstance(agent.agent_id, str) + + def test_get_cache_stats(self, temp_workspace): + """Test get_cache_stats method inherited from Agent.""" + agent = AutonomousAgent( + model="openai/gpt-4o-mini", + workspace=temp_workspace, + ) + stats = agent.get_cache_stats() + + assert stats is not None + assert isinstance(stats, dict) + + def test_add_tools(self, temp_workspace): + """Test add_tools method inherited from Agent.""" + agent = AutonomousAgent( + model="openai/gpt-4o-mini", + workspace=temp_workspace, + ) + initial_tool_count = len(agent.tools) + + def custom_tool(x: int) -> int: + """Multiply by 2.""" + return x * 2 + + agent.add_tools(custom_tool) + assert len(agent.tools) > initial_tool_count + + def test_repr(self, temp_workspace): + """Test __repr__ method.""" + agent = AutonomousAgent( + 
model="openai/gpt-4o-mini", + workspace=temp_workspace, + name="TestAgent", + ) + repr_str = repr(agent) + + assert "AutonomousAgent" in repr_str + assert "TestAgent" in repr_str + assert temp_workspace in repr_str + + +# --------------------------------------------------------------------------- +# Test: Memory Integration +# --------------------------------------------------------------------------- + +class TestMemoryIntegration: + """Tests for memory integration with AutonomousAgent.""" + + def test_memory_session_persistence(self, temp_workspace): + """Test that memory persists across tasks.""" + agent = AutonomousAgent( + model="openai/gpt-4o-mini", + workspace=temp_workspace, + full_session_memory=True, + ) + + # First task + agent.do("Remember this number: 42") + + # Second task should recall + result = agent.do("What number did I ask you to remember?") + + assert result is not None + assert "42" in result + + def test_session_id_accessible(self, temp_workspace): + """Test that session_id is accessible.""" + agent = AutonomousAgent( + model="openai/gpt-4o-mini", + workspace=temp_workspace, + session_id="test_session_123", + ) + + assert agent.session_id == "test_session_123" + + def test_user_id_accessible(self, temp_workspace): + """Test that user_id is accessible.""" + agent = AutonomousAgent( + model="openai/gpt-4o-mini", + workspace=temp_workspace, + user_id="test_user_456", + ) + + assert agent.user_id == "test_user_456" + + +# --------------------------------------------------------------------------- +# Test: Reset Tracking +# --------------------------------------------------------------------------- + +class TestFilesystemTracking: + """Tests for filesystem read tracking reset.""" + + def test_reset_filesystem_tracking(self, temp_workspace, sample_python_file): + """Test reset_filesystem_tracking method.""" + agent = AutonomousAgent( + model="openai/gpt-4o-mini", + workspace=temp_workspace, + ) + + # Read a file through the toolkit + 
agent.filesystem_toolkit.read_file("sample.py") + + # Verify it's tracked (use resolve to handle macOS symlinks like /var -> /private/var) + read_files = agent.filesystem_toolkit.get_read_files() + assert len(read_files) > 0 + # Check that sample.py is in the tracked files (comparing resolved paths) + expected_path = Path(temp_workspace).resolve() / "sample.py" + tracked_paths = [Path(f).resolve() for f in read_files] + assert expected_path in tracked_paths + + # Reset tracking + agent.reset_filesystem_tracking() + + # Verify it's cleared + assert len(agent.filesystem_toolkit.get_read_files()) == 0
Vulnerability mechanics
Generated by null/stub on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.
References (5)
- github.com/advisories/GHSA-cw73-5f7h-m4gv (source: ghsa; type: ADVISORY)
- nvd.nist.gov/vuln/detail/CVE-2026-30625 (source: ghsa; type: ADVISORY)
- github.com/Upsonic/Upsonic/commit/855053fce0662227d9246268ff4a0844b481a305 (source: nvd; type: WEB)
- www.ox.security/blog/mcp-supply-chain-advisory-rce-vulnerabilities-across-the-ai-ecosystem (source: ghsa; type: WEB)
- www.ox.security/blog/mcp-supply-chain-advisory-rce-vulnerabilities-across-the-ai-ecosystem/ (source: nvd)
News mentions (0)
No linked articles in our index yet.