VYPR
Moderate severityNVD Advisory· Published Oct 9, 2025· Updated Nov 4, 2025

Apache Flink CDC, Apache Flink CDC, Apache Flink CDC, Apache Flink CDC, Apache Flink CDC: SQL injection via maliciously crafted identifiers

CVE-2025-62228

Description

Apache Flink CDC version 3.4.0 was vulnerable to a SQL injection via maliciously crafted identifiers eg. crafted database name or crafted table name. Even through only the logged-in database user can trigger the attack, we recommend users update Flink CDC version to 3.5.0 which address this issue.

AI Insight

LLM-synthesized narrative grounded in this CVE's description and references.

Apache Flink CDC 3.4.0 is vulnerable to SQL injection via crafted database or table names, allowing authenticated users to execute arbitrary SQL.

Vulnerability

Overview

CVE-2025-62228 is a SQL injection vulnerability in Apache Flink CDC version 3.4.0. The root cause is the use of string formatting to construct SQL statements with user-controlled identifiers (database names and table names) without proper sanitization. For example, in the OceanBaseMySQLCatalog class, queries like SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = '%s' were built using String.format(), directly embedding the identifier into the SQL string [1][3]. This allows an attacker to inject arbitrary SQL by providing a crafted identifier containing SQL metacharacters.

Exploitation

Exploitation requires the attacker to be a logged-in database user who can supply a maliciously crafted database or table name to the Flink CDC pipeline. The attack does not require network-level access beyond normal usage; the injection occurs when Flink CDC processes the identifier in internal SQL queries. The fix, introduced in version 3.5.0, replaces string formatting with parameterized queries (using ? placeholders) and adds escaping functions like escapeSingleQuote() and quote() to safely handle special characters [1][3].

Impact

A successful SQL injection could allow an authenticated attacker to execute arbitrary SQL statements on the underlying database. This could lead to unauthorized data access, modification, or deletion, depending on the database user's privileges. The vulnerability is limited to authenticated users, but the potential impact is high due to the sensitive nature of CDC pipelines that often handle critical data streams.

Mitigation

Users are strongly advised to upgrade Apache Flink CDC to version 3.5.0, which contains the security fix [2]. No workarounds have been provided for version 3.4.0. The fix is implemented in commit d5766187a9a4b191820e10238d4594ae665cdb89 [3]. As of the publication date, this vulnerability is not listed in CISA's Known Exploited Vulnerabilities (KEV) catalog.

AI Insight generated on May 19, 2026. Synthesized from this CVE's description and the cited reference URLs; citations are validated against the source bundle.

Affected packages

Versions sourced from the GitHub Security Advisory.

PackageAffected versionsPatched versions
org.apache.flink:flink-cdc-pipeline-connectorsMaven
>= 3.0.0, < 3.5.03.5.0
org.apache.flink:flink-connector-oracle-cdcMaven
>= 3.0.0, < 3.5.03.5.0
org.apache.flink:flink-connector-db2-cdcMaven
>= 3.0.0, < 3.5.03.5.0
org.apache.flink:flink-connector-sqlserver-cdcMaven
>= 3.0.0, < 3.5.03.5.0
org.apache.flink:flink-connector-mysql-cdcMaven
>= 3.0.0, < 3.5.03.5.0

Affected products

2
  • Apache/Flinkllm-fuzzy
    Range: =3.4.0
  • Apache Software Foundation/Apache Flink CDCv5
    Range: 3.3.0

Patches

1
d5766187a9a4

[minor] Fix potential sql connection statement issue. (#4123)

https://github.com/apache/flink-cdcKunniSep 18, 2025via ghsa
5 files changed · +107 28
  • flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oceanbase/src/main/java/org/apache/flink/cdc/connectors/oceanbase/catalog/OceanBaseCatalog.java+5 1 modified
    @@ -52,9 +52,13 @@ public void open() {
             LOG.info("Open OceanBase catalog");
         }
     
    -    protected List<String> executeSingleColumnStatement(String sql) throws SQLException {
    +    protected List<String> executeSingleColumnStatement(String sql, Object... params)
    +            throws SQLException {
             try (Connection conn = connectionProvider.getConnection();
                     PreparedStatement statement = conn.prepareStatement(sql)) {
    +            for (int i = 0; i < params.length; i++) {
    +                statement.setObject(i + 1, params[i]);
    +            }
                 List<String> columnValues = Lists.newArrayList();
                 try (ResultSet rs = statement.executeQuery()) {
                     while (rs.next()) {
    
  • flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oceanbase/src/main/java/org/apache/flink/cdc/connectors/oceanbase/catalog/OceanBaseMySQLCatalog.java+46 25 modified
    @@ -36,9 +36,8 @@
      */
     public class OceanBaseMySQLCatalog extends OceanBaseCatalog {
     
    -    private static final String RENAME_DDL = "ALTER TABLE `%s`.`%s` RENAME COLUMN `%s` TO `%s`";
    -    private static final String ALTER_COLUMN_TYPE_DDL =
    -            "ALTER TABLE `%s`.`%s` MODIFY COLUMN `%s` %s;";
    +    private static final String RENAME_DDL = "ALTER TABLE %s.%s RENAME COLUMN %s TO %s";
    +    private static final String ALTER_COLUMN_TYPE_DDL = "ALTER TABLE %s.%s MODIFY COLUMN %s %s;";
     
         private static final Logger LOG = LoggerFactory.getLogger(OceanBaseMySQLCatalog.class);
     
    @@ -59,11 +58,10 @@ public boolean databaseExists(String databaseName) throws OceanBaseCatalogExcept
                     !StringUtils.isNullOrWhitespaceOnly(databaseName),
                     "database name cannot be null or empty.");
             String querySql =
    -                String.format(
    -                        "SELECT `SCHEMA_NAME` FROM `INFORMATION_SCHEMA`.`SCHEMATA` WHERE SCHEMA_NAME = '%s';",
    -                        databaseName);
    +                "SELECT `SCHEMA_NAME` FROM `INFORMATION_SCHEMA`.`SCHEMATA` WHERE SCHEMA_NAME = ?;";
             try {
    -            List<String> dbList = executeSingleColumnStatement(querySql);
    +            List<String> dbList =
    +                    executeSingleColumnStatement(querySql, escapeSingleQuote(databaseName));
                 return !dbList.isEmpty();
             } catch (Exception e) {
                 LOG.error(
    @@ -76,6 +74,14 @@ public boolean databaseExists(String databaseName) throws OceanBaseCatalogExcept
             }
         }
     
    +    public static String escapeSingleQuote(String dbOrTableName) {
    +        return dbOrTableName.replace("'", "\\'");
    +    }
    +
    +    public static String quote(String dbOrTableName) {
    +        return "`" + dbOrTableName.replace("`", "``") + "`";
    +    }
    +
         /**
          * Create a database.
          *
    @@ -90,7 +96,7 @@ public void createDatabase(String databaseName, boolean ignoreIfExists)
             Preconditions.checkArgument(
                     !StringUtils.isNullOrWhitespaceOnly(databaseName),
                     "database name cannot be null or empty.");
    -        String sql = buildCreateDatabaseSql(databaseName, ignoreIfExists);
    +        String sql = buildCreateDatabaseSql(quote(databaseName), ignoreIfExists);
             try {
                 executeUpdateStatement(sql);
                 LOG.info("Successful to create database {}, sql: {}", databaseName, sql);
    @@ -114,16 +120,25 @@ public boolean tableExists(String databaseName, String tableName)
                     !StringUtils.isNullOrWhitespaceOnly(tableName),
                     "table name cannot be null or empty.");
             String querySql =
    -                String.format(
    -                        "SELECT `TABLE_NAME` FROM `INFORMATION_SCHEMA`.`TABLES` WHERE TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s';",
    -                        databaseName, tableName);
    +                "SELECT `TABLE_NAME` FROM `INFORMATION_SCHEMA`.`TABLES` WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?;";
             try {
    -            List<String> dbList = executeSingleColumnStatement(querySql);
    +            List<String> dbList =
    +                    executeSingleColumnStatement(
    +                            querySql,
    +                            escapeSingleQuote(databaseName),
    +                            escapeSingleQuote(tableName));
                 return !dbList.isEmpty();
             } catch (Exception e) {
    -            LOG.error("Failed to check table exist, table: {}, sql: {}", tableName, querySql, e);
    +            LOG.error(
    +                    "Failed to check table exist, table: {}.{}, sql: {}",
    +                    databaseName,
    +                    tableName,
    +                    querySql,
    +                    e);
                 throw new OceanBaseCatalogException(
    -                    String.format("Failed to check table exist, table: %s", tableName), e);
    +                    String.format(
    +                            "Failed to check table exist, table: %s.%s", databaseName, tableName),
    +                    e);
             }
         }
     
    @@ -242,9 +257,9 @@ public void alterColumnType(
             String alterTypeSql =
                     String.format(
                             ALTER_COLUMN_TYPE_DDL,
    -                        databaseName,
    -                        tableName,
    -                        columnName,
    +                        quote(databaseName),
    +                        quote(tableName),
    +                        quote(columnName),
                             oceanBaseColumn.getDataType());
     
             try {
    @@ -316,7 +331,7 @@ public void dropTable(String databaseName, String tableName) {
                     "table name cannot be null or empty.");
     
             String dropTableDDL =
    -                String.format("DROP TABLE IF EXISTS `%s`.`%s`", databaseName, tableName);
    +                String.format("DROP TABLE IF EXISTS %s.%s", quote(databaseName), quote(tableName));
             try {
                 long startTimeMillis = System.currentTimeMillis();
                 executeUpdateStatement(dropTableDDL);
    @@ -347,7 +362,8 @@ public void truncateTable(String databaseName, String tableName) {
                     !StringUtils.isNullOrWhitespaceOnly(tableName),
                     "table name cannot be null or empty.");
     
    -        String dropTableDDL = String.format("TRUNCATE TABLE `%s`.`%s`", databaseName, tableName);
    +        String dropTableDDL =
    +                String.format("TRUNCATE TABLE %s.%s", quote(databaseName), quote(tableName));
             try {
                 long startTimeMillis = System.currentTimeMillis();
                 executeUpdateStatement(dropTableDDL);
    @@ -382,10 +398,10 @@ protected String buildCreateTableSql(OceanBaseTable table, boolean ignoreIfExist
             StringBuilder builder = new StringBuilder();
             builder.append(
                     String.format(
    -                        "CREATE TABLE %s`%s`.`%s`",
    +                        "CREATE TABLE %s%s.%s",
                             ignoreIfExists ? "IF NOT EXISTS " : "",
    -                        table.getDatabaseName(),
    -                        table.getTableName()));
    +                        quote(table.getDatabaseName()),
    +                        quote(table.getTableName())));
             builder.append(" (\n");
             String columnsStmt =
                     table.getColumns().stream()
    @@ -421,7 +437,7 @@ protected String buildCreateTableSql(OceanBaseTable table, boolean ignoreIfExist
         private String buildAlterDropColumnsSql(
                 String databaseName, String tableName, List<String> dropColumns) {
             StringBuilder builder = new StringBuilder();
    -        builder.append(String.format("ALTER TABLE `%s`.`%s` ", databaseName, tableName));
    +        builder.append(String.format("ALTER TABLE %s.%s ", quote(databaseName), quote(tableName)));
             String columnsStmt =
                     dropColumns.stream()
                             .map(col -> String.format("DROP COLUMN `%s`", col))
    @@ -480,7 +496,7 @@ protected String getFullColumnType(
         protected String buildAlterAddColumnsSql(
                 String databaseName, String tableName, List<OceanBaseColumn> addColumns) {
             StringBuilder builder = new StringBuilder();
    -        builder.append(String.format("ALTER TABLE `%s`.`%s` ", databaseName, tableName));
    +        builder.append(String.format("ALTER TABLE %s.%s ", quote(databaseName), quote(tableName)));
             String columnsStmt =
                     addColumns.stream()
                             .map(col -> "ADD COLUMN " + buildColumnStmt(col))
    @@ -492,6 +508,11 @@ protected String buildAlterAddColumnsSql(
     
         private static String buildRenameColumnSql(
                 String schemaName, String tableName, String oldColumnName, String newColumnName) {
    -        return String.format(RENAME_DDL, schemaName, tableName, oldColumnName, newColumnName);
    +        return String.format(
    +                RENAME_DDL,
    +                quote(schemaName),
    +                quote(tableName),
    +                quote(oldColumnName),
    +                quote(newColumnName));
         }
     }
    
  • flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oceanbase/src/test/java/org/apache/flink/cdc/connectors/oceanbase/sink/OceanBaseMetadataApplierTest.java+28 0 modified
    @@ -145,6 +145,34 @@ void testCreateTable() {
                             .build();
     
             assertThat(actualTable).isEqualTo(expectTable);
    +
    +        tableId = TableId.tableId("nonexistent' OR '1'='1", "tabl1");
    +        metadataApplier.applySchemaChange(new CreateTableEvent(tableId, schema));
    +        actualTable =
    +                catalog.getTable(tableId.getSchemaName(), tableId.getTableName()).orElse(null);
    +        assertThat(actualTable).isNotNull();
    +        assertThat(actualTable.getDatabaseName()).isEqualTo("nonexistent' OR '1'='1");
    +
    +        tableId = TableId.tableId("test", "nonexistent' OR '1'='1");
    +        metadataApplier.applySchemaChange(new CreateTableEvent(tableId, schema));
    +        actualTable =
    +                catalog.getTable(tableId.getSchemaName(), tableId.getTableName()).orElse(null);
    +        assertThat(actualTable).isNotNull();
    +        assertThat(actualTable.getTableName()).isEqualTo("nonexistent' OR '1'='1");
    +
    +        tableId = TableId.tableId("nonexistent` OR `1`=`1", "tabl1");
    +        metadataApplier.applySchemaChange(new CreateTableEvent(tableId, schema));
    +        actualTable =
    +                catalog.getTable(tableId.getSchemaName(), tableId.getTableName()).orElse(null);
    +        assertThat(actualTable).isNotNull();
    +        assertThat(actualTable.getDatabaseName()).isEqualTo("nonexistent` OR `1`=`1");
    +
    +        tableId = TableId.tableId("test", "nonexistent` OR `1`=`1");
    +        metadataApplier.applySchemaChange(new CreateTableEvent(tableId, schema));
    +        actualTable =
    +                catalog.getTable(tableId.getSchemaName(), tableId.getTableName()).orElse(null);
    +        assertThat(actualTable).isNotNull();
    +        assertThat(actualTable.getTableName()).isEqualTo("nonexistent` OR `1`=`1");
         }
     
         @Test
    
  • flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oceanbase/src/test/java/org/apache/flink/cdc/connectors/oceanbase/utils/OceanBaseTestMySQLCatalog.java+3 2 modified
    @@ -122,8 +122,9 @@ public Optional<OceanBaseTable> getTable(String databaseName, String tableName)
         }
     
         @Override
    -    public List<String> executeSingleColumnStatement(String sql) throws SQLException {
    -        return super.executeSingleColumnStatement(sql);
    +    public List<String> executeSingleColumnStatement(String sql, Object... params)
    +            throws SQLException {
    +        return super.executeSingleColumnStatement(sql, params);
         }
     
         @Override
    
  • flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oceanbase/src/test/resources/log4j2-test.properties+25 0 added
    @@ -0,0 +1,25 @@
    +################################################################################
    +# Licensed to the Apache Software Foundation (ASF) under one or more
    +# contributor license agreements.  See the NOTICE file distributed with
    +# this work for additional information regarding copyright ownership.
    +# The ASF licenses this file to You under the Apache License, Version 2.0
    +# (the "License"); you may not use this file except in compliance with
    +# the License.  You may obtain a copy of the License at
    +#      http://www.apache.org/licenses/LICENSE-2.0
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +################################################################################
    +
    +# Set root logger level to ERROR to not flood build logs
    +# set manually to INFO for debugging purposes
    +rootLogger.level=ERROR
    +rootLogger.appenderRef.test.ref = TestLogger
    +
    +appender.testlogger.name = TestLogger
    +appender.testlogger.type = CONSOLE
    +appender.testlogger.target = SYSTEM_ERR
    +appender.testlogger.layout.type = PatternLayout
    +appender.testlogger.layout.pattern = %-4r [%t] %-5p %c - %m%n
    

Vulnerability mechanics

Generated on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.

References

6

News mentions

0

No linked articles in our index yet.