CVE-2026-32311
Description
Flowsint is an open-source OSINT graph exploration tool designed for cybersecurity investigation, transparency, and verification. Flowsint allows a user to create investigations, which are used to manage sketches and analyses. Sketches have controllable graphs, which are comprised of nodes and relationships. The sketches contain information on an OSINT target (usernames, websites, etc) within these nodes and relationships. The nodes can have automated processes execute on them called 'transformers'. A remote attacker can create a sketch, then trigger the 'org_to_asn' transform on an organization node to execute arbitrary OS commands as root on the host machine via shell metacharacters and a docker container escape. Commit b52cbbb904c8013b74308d58af88bc7dbb1b055c appears to remove the code that causes this issue.
Affected products
1Patches
1b52cbbb904c8feat(app): add key requirement for org_to_asn
1 file changed · +74 −65
flowsint-transforms/src/flowsint_transforms/organization/to_asn.py+74 −65 modified@@ -1,10 +1,11 @@ -import json -import subprocess -from typing import List, Dict, Any, Union +import os +from typing import List, Dict, Any, Union, Optional from flowsint_core.core.transform_base import Transform +from flowsint_core.core.graph_db import Neo4jConnection from flowsint_types.organization import Organization from flowsint_types.asn import ASN from flowsint_core.core.logger import Logger +from tools.network.asnmap import AsnmapTool class OrgToAsnTransform(Transform): @@ -14,6 +15,39 @@ class OrgToAsnTransform(Transform): InputType = List[Organization] OutputType = List[ASN] + def __init__( + self, + sketch_id: Optional[str] = None, + scan_id: Optional[str] = None, + neo4j_conn: Optional[Neo4jConnection] = None, + vault=None, + params: Optional[Dict[str, Any]] = None, + ): + super().__init__( + sketch_id=sketch_id, + scan_id=scan_id, + neo4j_conn=neo4j_conn, + params_schema=self.get_params_schema(), + vault=vault, + params=params, + ) + + @classmethod + def required_params(cls) -> bool: + return True + + @classmethod + def get_params_schema(cls) -> List[Dict[str, Any]]: + """Declare required parameters for this transform""" + return [ + { + "name": "PDCP_API_KEY", + "type": "vaultSecret", + "description": "The ProjectDiscovery Cloud Platform API key for asnmap.", + "required": True, + }, + ] + @classmethod def name(cls) -> str: return "org_to_asn" @@ -42,74 +76,49 @@ def preprocess(self, data: Union[List[str], List[dict], InputType]) -> InputType async def scan(self, data: InputType) -> OutputType: """Find ASN information for organizations using asnmap.""" - asns: OutputType = [] + results: OutputType = [] + asnmap = AsnmapTool() + + # Retrieve API key from vault or environment + api_key = self.get_secret("PDCP_API_KEY", os.getenv("PDCP_API_KEY")) for org in data: - asn_data = self.__get_asn_from_asnmap(org.name) - if asn_data: - asns.append( - ASN( - number=int(asn_data["as_number"].lstrip("AS")), - name=asn_data["as_name"], - country=asn_data["as_country"], - cidrs=[], - ) - ) - else: - Logger.info( - self.sketch_id, {"message": f"No ASN found for org {org.name}."} - ) - return asns - - def __get_asn_from_asnmap(self, name: str) -> Dict[str, Any]: - try: - # Properly run the shell pipeline using shell=True - command = f"echo {name} | asnmap -silent -json | jq -s" - result = subprocess.run( - command, shell=True, capture_output=True, text=True, timeout=60 - ) - if not result.stdout.strip(): - return None try: - # Parse the JSON array - data_array = json.loads(result.stdout) - if not data_array: - return None - - combined_data = { - "as_range": [], - "as_name": None, - "as_country": None, - "as_number": None, - } - - for data in data_array: - if "as_range" in data: - combined_data["as_range"].extend(data["as_range"]) - if data.get("as_name") and not combined_data["as_name"]: - combined_data["as_name"] = data["as_name"] - if data.get("as_country") and not combined_data["as_country"]: - combined_data["as_country"] = data["as_country"] - if data.get("as_number") and not combined_data["as_number"]: - combined_data["as_number"] = data["as_number"] - - return combined_data if combined_data["as_number"] else None - - except json.JSONDecodeError as e: + # Use asnmap tool to get ASN info, passing the API key + asn_data = asnmap.launch(org.name, type="org", api_key=api_key) + if asn_data and "as_number" in asn_data: + # Parse ASN number from string like "AS16276" to integer 16276 + asn_string = asn_data["as_number"] + asn_number = int(asn_string.replace("AS", "").replace("as", "")) + # Create ASN object with correct field mapping + asn = ASN( + number=asn_number, + name=asn_data.get("as_name", ""), + country=asn_data.get("as_country", ""), + description=asn_data.get("as_name", ""), + ) + results.append(asn) + Logger.info( + self.sketch_id, + { + "message": f"[ASNMAP] Found AS{asn.number} ({asn.name}) for organization {org.name}" + }, + ) + else: + Logger.warn( + self.sketch_id, + { + "message": f"[ASNMAP] No ASN data or missing 'as_number' field for organization {org.name}. Data keys: {list(asn_data.keys()) if asn_data else 'None'}" + }, + ) + except Exception as e: Logger.error( self.sketch_id, - { - "message": f"An error occurred while parsing the JSON output from asnmap: {str(e)}" - }, + {"message": f"Error getting ASN for organization {org.name}: {e}"}, ) - return None - - except Exception as e: - Logger.error( - self.sketch_id, - {"message": f"An error occurred while running asnmap: {str(e)}"}, - ) - return None + continue + + return results def postprocess(self, results: OutputType, original_input: InputType) -> OutputType: # Create Neo4j relationships between organizations and their corresponding ASNs
Vulnerability mechanics
Generated by null/stub on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.
References
2News mentions
0No linked articles in our index yet.