VYPR
High severity · NVD Advisory · Published Mar 6, 2026 · Updated Mar 6, 2026

CocoIndex Doris target connector didn't verify table name when constructing ALTER TABLE statements

CVE-2026-28438

Description

CocoIndex is a data transformation framework for AI. Prior to version 0.3.34, the Doris target connector did not validate the configured table name before building some SQL statements (ALTER TABLE). If application code takes the table name from an untrusted upstream source, this exposes the application to SQL injection when the target schema changes. This issue has been patched in version 0.3.34.

AI Insight

LLM-synthesized narrative grounded in this CVE's description and references.

CocoIndex versions before 0.3.34 had a SQL injection vulnerability in the Doris target connector due to insufficient validation of table names used in ALTER TABLE statements.

Root Cause

Before version 0.3.34, CocoIndex, a data transformation framework for AI, contained a SQL injection vulnerability in its Doris target connector [1][2]. The connector did not validate the configured table name before incorporating it into SQL ALTER TABLE statements [1]. Without that validation, an attacker who could control or influence the table name could inject arbitrary SQL [1][2].
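
The pattern described above can be sketched in a few lines. This is an illustration only, not CocoIndex's actual code: the function name `build_add_column_sql` and the exact statement shape are hypothetical, chosen to show how a table name interpolated into an ALTER TABLE string without validation ends up verbatim in the statement text.

```rust
/// Hypothetical helper (not taken from CocoIndex): builds an ALTER TABLE
/// statement by interpolating the table name directly, with no validation.
fn build_add_column_sql(table: &str, column: &str, col_type: &str) -> String {
    // Whatever `table` contains becomes part of the SQL text as-is.
    format!("ALTER TABLE {table} ADD COLUMN {column} {col_type}")
}
```

With a benign name such as `docs`, the result is the intended single statement. With a name such as `docs ADD COLUMN x INT; DROP TABLE users; --`, the extra SQL text is carried into the generated string unchanged, which is the condition the advisory describes.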

Attack Surface and Exploitation

Exploitation requires that the table name is provided by an untrusted upstream source within the application code [1][2]. If a user or system configures a CocoIndex target with a table name originating from an untrusted input — for example, from an end-user request or an external configuration — that input can contain malicious SQL fragments [2][4]. The vulnerability is specifically triggered during target schema changes when the Doris target connector constructs an ALTER TABLE statement without proper validation [1][2].

Impact

Successful exploitation could allow an attacker to execute unauthorized SQL statements on the Doris backend, potentially leading to data manipulation, unauthorized data access, or other database-level compromises [1][2]. The impact is tied to the privileges of the database user used by the connector.

Mitigation

The vulnerability has been patched in CocoIndex version 0.3.34, which validates table names at the entry point to ensure they are valid identifiers [1][4]. As a workaround, users must ensure that table names are sourced from trusted locations and are validated before being used to configure CocoIndex targets [4].
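
For applications that cannot upgrade immediately, the workaround can be approximated with an allow-list check before the name reaches the target configuration. The sketch below is an assumption, not the check shipped in 0.3.34: the function name `validate_identifier` and the rule it enforces (ASCII letters, digits, and underscores, not starting with a digit) are illustrative, and the actual rule in the patched release may differ.

```rust
/// Hypothetical allow-list check (the real rule in 0.3.34 may differ):
/// accept only names made of ASCII letters, digits, and underscores,
/// and reject empty names or names that start with a digit.
fn validate_identifier(name: &str) -> Result<&str, String> {
    let mut chars = name.chars();
    // The first character must be a letter or underscore; an empty name fails here.
    let first_ok = matches!(chars.next(), Some(c) if c.is_ascii_alphabetic() || c == '_');
    // Every remaining character must be a letter, digit, or underscore.
    if first_ok && chars.all(|c| c.is_ascii_alphanumeric() || c == '_') {
        Ok(name)
    } else {
        Err(format!("invalid table name: {name:?}"))
    }
}
```

Rejecting the name outright, rather than trying to escape it, keeps the check simple: anything containing spaces, semicolons, quotes, or comment markers never reaches SQL construction.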

AI Insight generated on May 18, 2026. Synthesized from this CVE's description and the cited reference URLs; citations are validated against the source bundle.

Affected packages

Versions sourced from the GitHub Security Advisory.

Package | Affected versions | Patched versions
cocoindex (PyPI) | < 0.3.34 | 0.3.34

Patches

1
ba2fc4a89e22

fix: make sure execution plan initialized after target setup done (#1715)

https://github.com/cocoindex-io/cocoindex · Jiangzhou · Feb 28, 2026 · via ghsa
4 files changed · +88 -64
  • rust/cocoindex/src/builder/exec_ctx.rs · +1 -0 · modified
    @@ -8,6 +8,7 @@ pub struct ImportOpExecutionContext {
         pub source_id: i32,
     }
     
    +#[derive(Debug, Clone)]
     pub struct ExportOpExecutionContext {
         pub target_id: i32,
         pub schema_version_id: usize,
    
  • rust/cocoindex/src/execution/db_tracking_setup.rs · +58 -13 · modified
    @@ -1,8 +1,10 @@
    +use crate::builder::exec_ctx::ExportOpExecutionContext;
     use crate::prelude::*;
     
     use crate::execution::db_tracking;
     use crate::lib_context::get_settings;
     use crate::setup::{CombinedState, ResourceSetupChange, ResourceSetupInfo, SetupChangeType};
    +use cocoindex_utils::error::{SharedError, SharedResultExt};
     use serde::{Deserialize, Serialize};
     use sqlx::PgPool;
     use std::collections::{BTreeMap, BTreeSet};
    @@ -120,7 +122,6 @@ pub struct TargetInfoForCleanup {
         pub key_field_schemas: Vec<schema::FieldSchema>,
     }
     
    -#[derive(Debug)]
     pub struct TrackingTableSetupChange {
         pub desired_state: Option<TrackingTableSetupState>,
     
    @@ -135,18 +136,55 @@ pub struct TrackingTableSetupChange {
         /// Target information for cleanup (target_id -> TargetInfoForCleanup)
         pub desired_targets: Option<BTreeMap<i32, TargetInfoForCleanup>>,
     
    -    /// Export contexts for targets (target_id -> export_context)
    -    pub export_contexts: Option<Arc<BTreeMap<i32, Arc<dyn Any + Send + Sync>>>>,
    +    /// Lazily resolved execution plan (awaited only when cleanup needs export contexts)
    +    pub execution_plan:
    +        Shared<BoxFuture<'static, std::result::Result<Arc<plan::ExecutionPlan>, SharedError>>>,
    +    pub export_op_execution_contexts: Vec<ExportOpExecutionContext>,
     
         has_state_change: bool,
     }
     
    +impl std::fmt::Debug for TrackingTableSetupChange {
    +    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    +        f.debug_struct("TrackingTableSetupChange")
    +            .field("desired_state", &self.desired_state)
    +            .field("min_existing_version_id", &self.min_existing_version_id)
    +            .field(
    +                "legacy_tracking_table_names",
    +                &self.legacy_tracking_table_names,
    +            )
    +            .field(
    +                "source_state_table_always_exists",
    +                &self.source_state_table_always_exists,
    +            )
    +            .field(
    +                "legacy_source_state_table_names",
    +                &self.legacy_source_state_table_names,
    +            )
    +            .field(
    +                "source_names_need_state_cleanup",
    +                &self.source_names_need_state_cleanup,
    +            )
    +            .field("desired_targets", &self.desired_targets)
    +            .field(
    +                "export_op_execution_contexts",
    +                &self.export_op_execution_contexts,
    +            )
    +            .field("has_state_change", &self.has_state_change)
    +            .finish_non_exhaustive()
    +    }
    +}
    +
     impl TrackingTableSetupChange {
         pub fn new(
             desired: Option<&TrackingTableSetupState>,
             existing: &CombinedState<TrackingTableSetupState>,
             source_names_need_state_cleanup: BTreeMap<i32, BTreeSet<String>>,
             desired_targets: Option<BTreeMap<i32, TargetInfoForCleanup>>,
    +        execution_plan: Shared<
    +            BoxFuture<'static, std::result::Result<Arc<plan::ExecutionPlan>, SharedError>>,
    +        >,
    +        export_op_execution_contexts: Vec<ExportOpExecutionContext>,
         ) -> Option<Self> {
             let legacy_tracking_table_names = existing
                 .legacy_values(desired, |v| &v.table_name)
    @@ -174,19 +212,15 @@ impl TrackingTableSetupChange {
                     min_existing_version_id,
                     source_names_need_state_cleanup,
                     desired_targets,
    -                export_contexts: None,
    +                execution_plan,
    +                export_op_execution_contexts,
                     has_state_change: existing.has_state_diff(desired, |v| v),
                 })
             } else {
                 None
             }
         }
     
    -    /// Attach export contexts for targets (called after targets are built)
    -    pub fn attach_export_contexts(&mut self, contexts: BTreeMap<i32, Arc<dyn Any + Send + Sync>>) {
    -        self.export_contexts = Some(Arc::new(contexts));
    -    }
    -
         pub fn into_setup_info(
             self,
         ) -> ResourceSetupInfo<(), TrackingTableSetupState, TrackingTableSetupChange> {
    @@ -436,20 +470,31 @@ impl TrackingTableSetupChange {
             let mut total_processed = 0u64;
             let mut total_deleted = 0u64;
     
    +        // Lazily resolve the execution plan and build export contexts map
    +        let execution_plan = self.execution_plan.clone().await.into_result()?;
    +        let export_contexts: Arc<BTreeMap<i32, Arc<dyn Any + Send + Sync>>> = Arc::new(
    +            execution_plan
    +                .export_ops
    +                .iter()
    +                .zip(self.export_op_execution_contexts.iter())
    +                .map(|(op, ctx)| (ctx.target_id, op.export_context.clone()))
    +                .collect(),
    +        );
    +
             // Stream entries one by one
             while let Some(entry) = entries_stream.try_next().await? {
                 total_processed += 1;
     
                 let permit = semaphore.clone().acquire_owned().await?;
     
                 let desired_targets = self.desired_targets.clone();
    -            let export_contexts = self.export_contexts.clone();
    +            let export_contexts = export_contexts.clone();
     
                 join_set.spawn(async move {
                     let result = Self::cleanup_single_tracking_entry(
                         entry,
                         desired_targets.as_ref(),
    -                    export_contexts.as_ref(),
    +                    &export_contexts,
                     )
                     .await;
                     drop(permit);
    @@ -481,7 +526,7 @@ impl TrackingTableSetupChange {
         async fn cleanup_single_tracking_entry(
             entry: db_tracking::SourceTrackingEntryForCleanup,
             desired_targets: Option<&BTreeMap<i32, TargetInfoForCleanup>>,
    -        export_contexts: Option<&Arc<BTreeMap<i32, Arc<dyn Any + Send + Sync>>>>,
    +        export_contexts: &BTreeMap<i32, Arc<dyn Any + Send + Sync>>,
         ) -> Result<u64> {
             let Some(desired_targets) = desired_targets else {
                 return Ok(0);
    @@ -531,7 +576,7 @@ impl TrackingTableSetupChange {
     
                 // Get export context for this target
                 let export_context = export_contexts
    -                .and_then(|ctxs| ctxs.get(&target_id))
    +                .get(&target_id)
                     .ok_or_else(|| internal_error!("No export context for target {}", target_id))?;
     
                 // Delete via target factory
    
  • rust/cocoindex/src/lib_context.rs · +2 -0 · modified
    @@ -39,6 +39,8 @@ async fn build_setup_context(
             Some(&setup_execution_context.setup_state),
             existing_flow_ss,
             &analyzed_flow.flow_instance_ctx,
    +        analyzed_flow.execution_plan.clone(),
    +        setup_execution_context.export_ops.clone(),
         )
         .await?;
     
    
  • rust/cocoindex/src/setup/driver.rs · +27 -51 · modified
    @@ -387,6 +387,13 @@ pub async fn diff_flow_setup_states(
         desired_state: Option<&FlowSetupState<DesiredMode>>,
         existing_state: Option<&FlowSetupState<ExistingMode>>,
         flow_instance_ctx: &Arc<FlowInstanceContext>,
    +    execution_plan: Shared<
    +        BoxFuture<
    +            'static,
    +            std::result::Result<Arc<plan::ExecutionPlan>, cocoindex_utils::error::SharedError>,
    +        >,
    +    >,
    +    export_op_execution_contexts: Vec<exec_ctx::ExportOpExecutionContext>,
     ) -> Result<FlowSetupChange> {
         let metadata_change = diff_state(
             existing_state.map(|e| &e.metadata),
    @@ -453,6 +460,8 @@ pub async fn diff_flow_setup_states(
                 .unwrap_or_default(),
             source_names_needs_states_cleanup,
             desired_targets_for_cleanup,
    +        execution_plan,
    +        export_op_execution_contexts,
         );
     
         let mut target_resources = Vec::new();
    @@ -674,16 +683,6 @@ async fn apply_changes_for_flow(
         )
         .await?;
     
    -    if let Some(tracking_table) = &flow_setup_change.tracking_table {
    -        maybe_update_resource_setup(
    -            "tracking table",
    -            write,
    -            std::iter::once(tracking_table),
    -            |setup_change| setup_change[0].setup_change.apply_change(),
    -        )
    -        .await?;
    -    }
    -
         let mut setup_change_by_target_kind = IndexMap::<&str, Vec<_>>::new();
         for target_resource in &flow_setup_change.target_resources {
             setup_change_by_target_kind
    @@ -750,6 +749,16 @@ async fn apply_changes_for_flow(
             )
             .await?;
         }
    +    // Apply tracking table change after target changes, since it may need to clean up tracking states based on the target info.
    +    if let Some(tracking_table) = &flow_setup_change.tracking_table {
    +        maybe_update_resource_setup(
    +            "tracking table",
    +            write,
    +            std::iter::once(tracking_table),
    +            |setup_change| setup_change[0].setup_change.apply_change(),
    +        )
    +        .await?;
    +    }
     
         let is_deletion = status == ObjectStatus::Deleted;
         db_metadata::commit_changes_for_flow(
    @@ -949,8 +958,14 @@ async fn get_flow_setup_change<'a>(
             FlowSetupChangeAction::Drop => {
                 let existing_state = setup_ctx.all_setup_states.flows.get(flow_ctx.flow_name());
                 buffer.insert(
    -                diff_flow_setup_states(None, existing_state, &flow_ctx.flow.flow_instance_ctx)
    -                    .await?,
    +                diff_flow_setup_states(
    +                    None,
    +                    existing_state,
    +                    &flow_ctx.flow.flow_instance_ctx,
    +                    flow_ctx.flow.execution_plan.clone(),
    +                    vec![],
    +                )
    +                .await?,
                 )
             }
         };
    @@ -966,45 +981,6 @@ pub(crate) async fn apply_changes_for_flow_ctx(
         db_pool: &PgPool,
         write: &mut (dyn std::io::Write + Send),
     ) -> Result<()> {
    -    // Attach export contexts to tracking table setup change
    -    if let FlowSetupChangeAction::Setup = action
    -        && let Some(tracking_table) = &mut flow_exec_ctx.setup_change.tracking_table
    -        && let Some(setup_change) = &mut tracking_table.setup_change
    -    {
    -        // Get execution plan with export contexts
    -        let execution_plan = flow_ctx
    -            .flow
    -            .execution_plan
    -            .clone()
    -            .await
    -            .map_err(|e| internal_error!("Failed to get execution plan: {:?}", e))?;
    -
    -        // Build map of target_id -> export_context using flow_exec_ctx.setup_execution_context
    -        let mut export_contexts = BTreeMap::new();
    -        for (export_op, exec_ctx_export_op) in execution_plan
    -            .export_ops
    -            .iter()
    -            .zip(flow_exec_ctx.setup_execution_context.export_ops.iter())
    -        {
    -            export_contexts.insert(
    -                exec_ctx_export_op.target_id,
    -                export_op.export_context.clone(),
    -            );
    -        }
    -
    -        // Attach to tracking change
    -        setup_change.attach_export_contexts(export_contexts);
    -
    -        tracing::debug!(
    -            "Attached export contexts for {} targets to tracking table setup change",
    -            setup_change
    -                .export_contexts
    -                .as_ref()
    -                .map(|c| c.len())
    -                .unwrap_or(0)
    -        );
    -    }
    -
         let mut setup_change_buffer = None;
         let setup_change = get_flow_setup_change(
             setup_ctx,
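
In the diff above, the cleanup path pairs each export op with its execution context by position and keys the result by `target_id`. A simplified, stand-alone sketch of that zip-and-collect step follows. The types `ExportOp` and `ExportCtx` and the function `build_export_contexts` are stand-ins invented for illustration, and an `Arc<String>` takes the place of the `Arc<dyn Any + Send + Sync>` export context used in the real code.

```rust
use std::collections::BTreeMap;
use std::sync::Arc;

/// Stand-in for an export op in the execution plan (hypothetical, simplified).
struct ExportOp {
    export_context: Arc<String>,
}

/// Stand-in for ExportOpExecutionContext, reduced to the field used here.
struct ExportCtx {
    target_id: i32,
}

/// Pair ops and contexts by position and build a target_id -> context map.
fn build_export_contexts(ops: &[ExportOp], ctxs: &[ExportCtx]) -> BTreeMap<i32, Arc<String>> {
    ops.iter()
        .zip(ctxs.iter())
        // Cloning an Arc only bumps a reference count; the context is shared.
        .map(|(op, ctx)| (ctx.target_id, op.export_context.clone()))
        .collect()
}
```

Because `zip` stops at the shorter of the two sequences, passing an empty context list, as the `Drop` branch in the diff does with `vec![]`, yields an empty map.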
    

Vulnerability mechanics

Generated on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.

References

4
