VYPR
High severity · 8.3 · NVD Advisory · Published Mar 27, 2026 · Updated Apr 22, 2026

CVE-2026-33980

CVE-2026-33980

Description

Azure Data Explorer MCP Server is a Model Context Protocol (MCP) server that enables AI assistants to execute KQL queries and explore Azure Data Explorer (ADX/Kusto) databases through standardized interfaces. Versions up to and including 0.1.1 contain KQL (Kusto Query Language) injection vulnerabilities in three MCP tool handlers: get_table_schema, sample_table_data, and get_table_details. The table_name parameter is interpolated directly into KQL queries via f-strings without any validation or sanitization, allowing an attacker (or a prompt-injected AI agent) to execute arbitrary KQL queries against the Azure Data Explorer cluster. Commit 0abe0ee55279e111281076393e5e966335fffd30 patches the issue.

Affected packages

Versions sourced from the GitHub Security Advisory.

Package | Affected versions | Patched versions
adx-mcp-server (PyPI) | <= 1.1.0 | (not listed)

Patches

1
0abe0ee55279

Merge commit from fork

https://github.com/pab1it0/adx-mcp-server · Pavel Shklovsky · Mar 25, 2026 · via ghsa
2 files changed · +200 -1
  • src/adx_mcp_server/server.py · +30 -0 · modified
    @@ -5,6 +5,7 @@
     """
     
     import os
    +import re
     import sys
     from typing import Any, Dict, List, Optional
     from dataclasses import dataclass
    @@ -170,6 +171,31 @@ def format_query_results(result_set) -> List[Dict[str, Any]]:
             )
             raise
     
    +_TABLE_NAME_PATTERN = re.compile(r'^[a-zA-Z_][a-zA-Z0-9_]*(\.[a-zA-Z_][a-zA-Z0-9_]*)*$')
    +
    +def validate_table_name(table_name: str) -> str:
    +    """Validate a KQL table name to prevent injection attacks.
    +
    +    Allows simple identifiers (my_table) and dot-qualified names (database.table).
    +    Rejects any characters that could enable KQL injection.
    +    """
    +    if not table_name or not table_name.strip():
    +        raise ValueError("Table name cannot be empty")
    +    table_name = table_name.strip()
    +    if not _TABLE_NAME_PATTERN.match(table_name):
    +        raise ValueError(
    +            f"Invalid table name: '{table_name}'. "
    +            "Table names must contain only letters, digits, underscores, "
    +            "and dots (for qualified names like 'database.table')."
    +        )
    +    return table_name
    +
    +def validate_sample_size(sample_size: int) -> int:
    +    """Validate sample_size is a positive integer."""
    +    if not isinstance(sample_size, int) or sample_size <= 0:
    +        raise ValueError(f"sample_size must be a positive integer, got: {sample_size}")
    +    return sample_size
    +
     @mcp.tool(description="Executes a Kusto Query Language (KQL) query against the configured Azure Data Explorer database and returns the results as a list of dictionaries.")
     async def execute_query(query: str) -> List[Dict[str, Any]]:
         """Execute a KQL query against the configured ADX database."""
    @@ -217,6 +243,7 @@ async def list_tables() -> List[Dict[str, Any]]:
     @mcp.tool(description="Retrieves the schema information for a specified table in the Azure Data Explorer database, including column names, data types, and other schema-related metadata.")
     async def get_table_schema(table_name: str) -> List[Dict[str, Any]]:
         """Get schema information for a specific table."""
    +    table_name = validate_table_name(table_name)
         logger.info("Getting table schema", table_name=table_name, database=config.database)
     
         if not config.cluster_url or not config.database:
    @@ -237,6 +264,8 @@ async def get_table_schema(table_name: str) -> List[Dict[str, Any]]:
     @mcp.tool(description="Retrieves a random sample of rows from the specified table in the Azure Data Explorer database. The sample_size parameter controls how many rows to return (default: 10).")
     async def sample_table_data(table_name: str, sample_size: int = 10) -> List[Dict[str, Any]]:
         """Get sample data from a table."""
    +    table_name = validate_table_name(table_name)
    +    sample_size = validate_sample_size(sample_size)
         logger.info("Sampling table data", table_name=table_name, sample_size=sample_size, database=config.database)
     
         if not config.cluster_url or not config.database:
    @@ -257,6 +286,7 @@ async def sample_table_data(table_name: str, sample_size: int = 10) -> List[Dict
     @mcp.tool(description="Retrieves table details including TotalRowCount, HotExtentSize")
     async def get_table_details(table_name: str) -> List[Dict[str, Any]]:
         """Get detailed statistics and metadata for a table."""
    +    table_name = validate_table_name(table_name)
         logger.info("Getting table details", table_name=table_name, database=config.database)
     
         if not config.cluster_url or not config.database:
    
  • tests/test_all_tools.py · +170 -1 · modified
    @@ -6,7 +6,7 @@
     import pytest
     from unittest.mock import patch, MagicMock
     
    -from adx_mcp_server.server import config
    +from adx_mcp_server.server import config, validate_table_name, validate_sample_size
     
     
     class TestListTablesTool:
    @@ -352,3 +352,172 @@ async def test_get_table_details_error(self):
             finally:
                 config.cluster_url = original_url
                 config.database = original_db
    +
    +
    +class TestValidateTableName:
    +    """Tests for table name validation to prevent KQL injection."""
    +
    +    def test_simple_table_name(self):
    +        assert validate_table_name("my_table") == "my_table"
    +
    +    def test_qualified_table_name(self):
    +        assert validate_table_name("database.table") == "database.table"
    +
    +    def test_multi_qualified_name(self):
    +        assert validate_table_name("db.schema.table") == "db.schema.table"
    +
    +    def test_underscore_prefix(self):
    +        assert validate_table_name("_private_table") == "_private_table"
    +
    +    def test_alphanumeric(self):
    +        assert validate_table_name("table123") == "table123"
    +
    +    def test_strips_whitespace(self):
    +        assert validate_table_name("  my_table  ") == "my_table"
    +
    +    def test_pipe_injection(self):
    +        with pytest.raises(ValueError, match="Invalid table name"):
    +            validate_table_name("sensitive_data | project Secret | take 100 //")
    +
    +    def test_newline_injection(self):
    +        with pytest.raises(ValueError, match="Invalid table name"):
    +            validate_table_name("users\n.drop table critical_data")
    +
    +    def test_semicolon_injection(self):
    +        with pytest.raises(ValueError, match="Invalid table name"):
    +            validate_table_name("table; .drop table other")
    +
    +    def test_bracket_notation(self):
    +        with pytest.raises(ValueError, match="Invalid table name"):
    +            validate_table_name("['injected query']")
    +
    +    def test_space_in_name(self):
    +        with pytest.raises(ValueError, match="Invalid table name"):
    +            validate_table_name("table name with spaces")
    +
    +    def test_empty_string(self):
    +        with pytest.raises(ValueError, match="cannot be empty"):
    +            validate_table_name("")
    +
    +    def test_whitespace_only(self):
    +        with pytest.raises(ValueError, match="cannot be empty"):
    +            validate_table_name("   ")
    +
    +    def test_starts_with_digit(self):
    +        with pytest.raises(ValueError, match="Invalid table name"):
    +            validate_table_name("123table")
    +
    +    def test_hyphen_in_name(self):
    +        with pytest.raises(ValueError, match="Invalid table name"):
    +            validate_table_name("my-table")
    +
    +    def test_slash_comment_injection(self):
    +        with pytest.raises(ValueError, match="Invalid table name"):
    +            validate_table_name("table // comment")
    +
    +    def test_trailing_dot(self):
    +        with pytest.raises(ValueError, match="Invalid table name"):
    +            validate_table_name("database.")
    +
    +    def test_leading_dot(self):
    +        with pytest.raises(ValueError, match="Invalid table name"):
    +            validate_table_name(".table")
    +
    +
    +class TestValidateSampleSize:
    +    """Tests for sample_size validation."""
    +
    +    def test_valid_sample_size(self):
    +        assert validate_sample_size(10) == 10
    +
    +    def test_zero(self):
    +        with pytest.raises(ValueError, match="sample_size must be a positive integer"):
    +            validate_sample_size(0)
    +
    +    def test_negative(self):
    +        with pytest.raises(ValueError, match="sample_size must be a positive integer"):
    +            validate_sample_size(-5)
    +
    +
    +class TestTableNameInjectionPrevention:
    +    """Integration tests proving tool handlers reject KQL injection payloads."""
    +
    +    @pytest.mark.asyncio
    +    async def test_get_table_schema_rejects_injection(self):
    +        original_url = config.cluster_url
    +        original_db = config.database
    +        config.cluster_url = "https://test.kusto.windows.net"
    +        config.database = "testdb"
    +
    +        try:
    +            from adx_mcp_server import server
    +            fn = server.get_table_schema
    +
    +            with pytest.raises(ValueError, match="Invalid table name"):
    +                if hasattr(fn, 'fn'):
    +                    await fn.fn("sensitive_data | project Secret | take 100 //")
    +                else:
    +                    await fn("sensitive_data | project Secret | take 100 //")
    +        finally:
    +            config.cluster_url = original_url
    +            config.database = original_db
    +
    +    @pytest.mark.asyncio
    +    async def test_sample_table_data_rejects_injection(self):
    +        original_url = config.cluster_url
    +        original_db = config.database
    +        config.cluster_url = "https://test.kusto.windows.net"
    +        config.database = "testdb"
    +
    +        try:
    +            from adx_mcp_server import server
    +            fn = server.sample_table_data
    +
    +            with pytest.raises(ValueError, match="Invalid table name"):
    +                if hasattr(fn, 'fn'):
    +                    await fn.fn("data | take 100 //", 10)
    +                else:
    +                    await fn("data | take 100 //", 10)
    +        finally:
    +            config.cluster_url = original_url
    +            config.database = original_db
    +
    +    @pytest.mark.asyncio
    +    async def test_sample_table_data_rejects_invalid_sample_size(self):
    +        original_url = config.cluster_url
    +        original_db = config.database
    +        config.cluster_url = "https://test.kusto.windows.net"
    +        config.database = "testdb"
    +
    +        try:
    +            from adx_mcp_server import server
    +            fn = server.sample_table_data
    +
    +            with pytest.raises(ValueError, match="sample_size must be a positive integer"):
    +                if hasattr(fn, 'fn'):
    +                    await fn.fn("valid_table", -1)
    +                else:
    +                    await fn("valid_table", -1)
    +        finally:
    +            config.cluster_url = original_url
    +            config.database = original_db
    +
    +    @pytest.mark.asyncio
    +    async def test_get_table_details_rejects_injection(self):
    +        original_url = config.cluster_url
    +        original_db = config.database
    +        config.cluster_url = "https://test.kusto.windows.net"
    +        config.database = "testdb"
    +
    +        try:
    +            from adx_mcp_server import server
    +            fn = server.get_table_details
    +
    +            with pytest.raises(ValueError, match="Invalid table name"):
    +                if hasattr(fn, 'fn'):
    +                    await fn.fn("users details\n.drop table critical_data")
    +                else:
    +                    await fn("users details\n.drop table critical_data")
    +        finally:
    +            config.cluster_url = original_url
    +            config.database = original_db
    

Vulnerability mechanics

Generated by null/stub on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.

References

4

News mentions

0

No linked articles in our index yet.