VYPR
Medium severity · 5.3 · OSV Advisory · Published Jan 7, 2026 · Updated Apr 15, 2026

CVE-2023-7333

CVE-2023-7333

Description

A weakness has been identified in bluelabsio records-mover up to 1.5.4. The affected element is an unknown function of the component Table Object Handler. This manipulation causes SQL injection. The attack needs to be launched locally. Upgrading to version 1.6.0 is sufficient to fix this issue. The patch is identified as commit 3f8383aa89f45d861ca081e3e9fd2cc9d0b5dfaa. You should upgrade the affected component.

Affected packages

Versions sourced from the GitHub Security Advisory.

Package | Affected versions | Patched versions
records-mover (PyPI)
< 1.6.0 | 1.6.0

Affected products

1

Patches

2
3f8383aa89f4

RM-95 use Table object to prevent SQLInjection

12 files changed · +54 −39
  • records_mover/db/vertica/vertica_db_driver.py · +3 −5 · modified
    @@ -1,7 +1,7 @@
     from ..driver import DBDriver
     import sqlalchemy
     from sqlalchemy.sql import text
    -from records_mover.db.quoting import quote_schema_and_table
    +from sqlalchemy import select
     from sqlalchemy.schema import Table, Column
     import logging
     from typing import Optional, Union, Tuple
    @@ -47,10 +47,8 @@ def unloader(self) -> Optional[Unloader]:
     
         def has_table(self, schema: str, table: str) -> bool:
             try:
    -            sql = ("SELECT 1 "
    -                   f"from {quote_schema_and_table(None, schema, table, db_engine=self.db_engine)} "
    -                   "limit 0;")
    -            self.db_conn.execute(text(sql))
    +            table_to_check = Table(table, self.meta, schema=schema)
    +            self.db_conn.execute(select(text("1"), table_to_check))
                 return True
             except sqlalchemy.exc.ProgrammingError:
                 return False
    
  • records_mover/records/prep.py · +8 −3 · modified
    @@ -5,7 +5,8 @@
     from records_mover.db import DBDriver
     from records_mover.records.table import TargetTableDetails
     import logging
    -from sqlalchemy import text
    +from sqlalchemy import text, Table, MetaData
    +from sqlalchemy.schema import DropTable
     
     logger = logging.getLogger(__name__)
     
    @@ -61,7 +62,9 @@ def prep_table_for_load(self,
                                                                db_engine=db_engine,)
                 if (how_to_prep == ExistingTableHandling.TRUNCATE_AND_OVERWRITE):
                     logger.info("Truncating...")
    -                db_conn.execute(text(f"TRUNCATE TABLE {schema_and_table}"))
    +                meta = MetaData()
    +                table = Table(self.tbl.table_name, meta, schema=self.tbl.schema_name)
    +                db_conn.execute(table.delete())
                     logger.info("Truncated.")
                 elif (how_to_prep == ExistingTableHandling.DELETE_AND_OVERWRITE):
                     logger.info("Deleting rows...")
    @@ -72,8 +75,10 @@ def prep_table_for_load(self,
                         with conn.begin():
                             logger.info(f"The connection object is: {conn}")
                             logger.info("Dropping and recreating...")
    +                        meta = MetaData()
    +                        table = Table(self.tbl.table_name, meta, schema=self.tbl.schema_name)
                             drop_table_sql = f"DROP TABLE {schema_and_table}"
    -                        conn.execute(text(drop_table_sql))
    +                        conn.execute(DropTable(table))  # type: ignore[arg-type]
                             logger.info(f"Just ran {drop_table_sql}")
                             self.create_table(schema_sql, conn, driver)
                 elif (how_to_prep == ExistingTableHandling.APPEND):
    
  • records_mover/records/sources/table.py · +5 −2 · modified
    @@ -8,7 +8,7 @@
     from ..records_format import BaseRecordsFormat
     from ..unload_plan import RecordsUnloadPlan
     from ..results import MoveResult
    -from sqlalchemy import text
    +from sqlalchemy import Table, MetaData, select
     from sqlalchemy.engine import Engine
     from contextlib import contextmanager
     from ..schema import RecordsSchema
    @@ -88,10 +88,13 @@ def to_dataframes_source(self,
             chunksize = int(entries_per_chunk / num_columns)
             logger.info(f"Exporting in chunks of up to {chunksize} rows by {num_columns} columns")
     
    +        meta = MetaData()
    +        table = Table(self.table_name, meta, schema=self.schema_name)
             quoted_table = quote_schema_and_table(None, self.schema_name,
                                                   self.table_name, db_engine=db_engine,)
    +        logger.info(f"Reading {quoted_table}...")
             chunks: Generator['DataFrame', None, None] = \
    -            pandas.read_sql(text(f"SELECT * FROM {quoted_table}"),
    +            pandas.read_sql(select('*', table),  # type: ignore[arg-type]
                                 con=db_conn,
                                 chunksize=chunksize)
             try:
    
  • records_mover/records/targets/spectrum.py · +3 −1 · modified
    @@ -14,6 +14,7 @@
     import logging
     import sqlalchemy
     from sqlalchemy import text
    +from sqlalchemy.schema import DropTable
     
     
     logger = logging.getLogger(__name__)
    @@ -84,9 +85,10 @@ def prep_bucket(self) -> None:
                                                                    db_engine=self.db_engine)
                     logger.info(f"Dropping external table {schema_and_table}...")
                     with self.db_engine.connect() as cursor:
    +                    table = Table(self.table_name, MetaData(), schema=self.schema_name)
                         # See below note about fix from Spectrify
                         cursor.execution_options(isolation_level='AUTOCOMMIT')
    -                    cursor.execute(text(f"DROP TABLE IF EXISTS {schema_and_table}"))
    +                    cursor.execute(DropTable(table, if_exists=True))  # type: ignore[call-arg, arg-type]
     
                 logger.info(f"Deleting files in {self.output_loc}...")
                 self.output_loc.purge_directory()
    
  • tests/integration/records/purge_old_test_tables.py · +4 −2 · modified
    @@ -3,7 +3,8 @@
     from records_mover.db.quoting import quote_schema_and_table
     from records_mover import Session
     from datetime import datetime, timedelta
    -from sqlalchemy import inspect
    +from sqlalchemy import inspect, Table, MetaData
    +from sqlalchemy.schema import DropTable
     from typing import Optional
     import sys
     
    @@ -39,9 +40,10 @@ def purge_old_tables(schema_name: str, table_name_prefix: str,
                 "DROP TABLE "
                 f"{quote_schema_and_table(None, schema_name, table_name, db_engine=db_engine)}")
             print(sql)
    +        table = Table(table_name, MetaData(), schema=schema_name)
             with db_engine.connect() as connection:
                 with connection.begin():
    -                connection.exec_driver_sql(sql)
    +                connection.exec_driver_sql(DropTable(table))  # type: ignore[arg-type]
     
     
     if __name__ == '__main__':
    
  • tests/integration/records/records_database_fixture.py · +4 −3 · modified
    @@ -1,6 +1,7 @@
     from records_mover.db.quoting import quote_schema_and_table
     from records_mover.utils.retry import bigquery_retry
    -from sqlalchemy import text
    +from sqlalchemy import Table, MetaData
    +from sqlalchemy.schema import DropTable
     import logging
     
     logger = logging.getLogger(__name__)
    @@ -20,10 +21,10 @@ def __init__(self, db_engine, schema_name, table_name):
     
         @bigquery_retry()
         def drop_table_if_exists(self, schema, table):
    -        sql = f"DROP TABLE IF EXISTS {self.quote_schema_and_table(schema, table)}"
    +        table_to_drop = Table(table, MetaData(), schema=schema)
             with self.engine.connect() as connection:
                 with connection.begin():
    -                connection.execute(text(sql))
    +                connection.execute(DropTable(table_to_drop, if_exists=True))
     
         def tear_down(self):
             self.drop_table_if_exists(self.schema_name, f"{self.table_name}_frozen")
    
  • tests/integration/records/records_datetime_fixture.py · +5 −4 · modified
    @@ -6,8 +6,9 @@
         SAMPLE_YEAR, SAMPLE_MONTH, SAMPLE_DAY,
         SAMPLE_HOUR, SAMPLE_MINUTE, SAMPLE_SECOND, SAMPLE_OFFSET, SAMPLE_LONG_TZ
     )
    -from sqlalchemy import text
    +from sqlalchemy import text, Table, MetaData
     from sqlalchemy.engine import Engine, Connection
    +from sqlalchemy.schema import DropTable
     from typing import Optional
     
     import logging
    @@ -28,13 +29,13 @@ def quote_schema_and_table(self, schema, table):
     
         @bigquery_retry()
         def drop_table_if_exists(self, schema, table):
    -        sql = f"DROP TABLE IF EXISTS {self.quote_schema_and_table(schema, table)}"
    +        table_to_drop = Table(table, MetaData(), schema=schema)
             if not self.connection:
                 with self.engine.connect() as connection:
                     with connection.begin():
    -                    connection.execute(text(sql))
    +                    connection.execute(DropTable(table_to_drop, if_exists=True))
             else:
    -            self.connection.execute(text(sql))
    +            self.connection.execute(DropTable(table_to_drop, if_exists=True))
     
         def createDateTimeTzTable(self) -> None:
             if self.engine.name == 'redshift':
    
  • tests/integration/records/records_numeric_database_fixture.py · +4 −3 · modified
    @@ -1,5 +1,6 @@
     from records_mover.db.quoting import quote_schema_and_table
    -from sqlalchemy import text
    +from sqlalchemy import Table, MetaData
    +from sqlalchemy.schema import DropTable
     
     
     class RecordsNumericDatabaseFixture:
    @@ -136,10 +137,10 @@ def quote_schema_and_table(self, schema, table):
                                           db_engine=self.engine)
     
         def drop_table_if_exists(self, schema, table):
    -        sql = f"DROP TABLE IF EXISTS {self.quote_schema_and_table(schema, table)}"
    +        table_to_drop = Table(table, MetaData(), schema=schema)
             with self.engine.connect() as connection:
                 with connection.begin():
    -                connection.execute(text(sql))
    +                connection.execute(DropTable(table_to_drop, if_exists=True))
     
         def tear_down(self):
             self.drop_table_if_exists(self.schema_name, self.table_name)
    
  • tests/unit/db/vertica/test_vertica_db_driver.py · +4 −4 · modified
    @@ -54,14 +54,14 @@ def test_schema_sql_but_not_from_export_objects(self):
             self.assertTrue(sql is not None)
     
         def test_has_table_true(self):
    -        mock_schema = Mock(name='schema')
    -        mock_table = Mock(name='table')
    +        mock_schema = 'myschema'
    +        mock_table = 'mytable'
             self.assertEqual(True,
                              self.vertica_db_driver.has_table(mock_schema, mock_table))
     
         def test_has_table_false(self):
    -        mock_schema = Mock(name='schema')
    -        mock_table = Mock(name='table')
    +        mock_schema = 'myschema'
    +        mock_table = 'mytable'
             self.mock_db_engine.execute.side_effect = sqlalchemy.exc.ProgrammingError('statement', {},
                                                                                       'orig')
             self.assertEqual(False,
    
  • tests/unit/records/sources/test_table.py · +4 −4 · modified
    @@ -6,8 +6,8 @@
     
     class TestTableRecordsSource(unittest.TestCase):
         def setUp(self):
    -        self.mock_schema_name = Mock(name='schema_name')
    -        self.mock_table_name = Mock(name='table_name')
    +        self.mock_schema_name = 'mock_schema_name'
    +        self.mock_table_name = 'mock_table_name'
             self.mock_driver = MagicMock(name='driver')
             self.mock_loader = self.mock_driver.loader.return_value
             self.mock_unloader = self.mock_driver.unloader.return_value
    @@ -37,7 +37,6 @@ def test_to_dataframes_source(self,
             mock_column = Mock(name='column')
             mock_columns = [mock_column]
             mock_db_engine.dialect.get_columns.return_value = mock_columns
    -        mock_quoted_table = mock_quote_schema_and_table.return_value
             mock_chunks = mock_read_sql.return_value
             with self.table_records_source.to_dataframes_source(mock_processing_instructions) as\
                     df_source:
    @@ -49,7 +48,8 @@ def test_to_dataframes_source(self,
                                                                     self.mock_table_name,
                                                                     driver=self.mock_driver)
                 str_arg = str(mock_read_sql.call_args.args[0])
    -            self.assertEqual(str_arg, f"SELECT * FROM {mock_quoted_table}")
    +            self.assertEqual(str_arg,
    +                             f"SELECT * \nFROM {self.mock_schema_name}.{self.mock_table_name}")
                 kwargs = mock_read_sql.call_args.kwargs
                 self.assertEqual(kwargs['con'], mock_db_conn)
                 self.assertEqual(kwargs['chunksize'], 2000000)
    
  • tests/unit/records/targets/test_spectrum.py · +2 −2 · modified
    @@ -40,7 +40,6 @@ def test_init(self):
     
         @patch('records_mover.records.targets.spectrum.quote_schema_and_table')
         def test_pre_load_hook_preps_bucket_with_default_prep(self, mock_quote_schema_and_table):
    -        mock_schema_and_table = mock_quote_schema_and_table.return_value
             mock_cursor = self.target.driver.db_engine.connect.return_value.__enter__.return_value
     
             self.target.pre_load_hook()
    @@ -51,7 +50,8 @@ def test_pre_load_hook_preps_bucket_with_default_prep(self, mock_quote_schema_an
             mock_cursor.execution_options.assert_called_with(isolation_level='AUTOCOMMIT')
             arg = mock_cursor.execute.call_args.args[0]
             arg_str = str(arg)
    -        self.assertEqual(arg_str, f"DROP TABLE IF EXISTS {mock_schema_and_table}")
    +        self.assertEqual(
    +            arg_str, f"\nDROP TABLE IF EXISTS {self.target.schema_name}.{self.target.table_name}")
             self.mock_output_loc.purge_directory.assert_called_with()
     
         @patch('records_mover.records.targets.spectrum.RecordsDirectory')
    
  • tests/unit/records/test_prep.py · +8 −6 · modified
    @@ -8,6 +8,8 @@
     class TestPrep(unittest.TestCase):
         def setUp(self):
             self.mock_tbl = Mock(name='target_table_details')
    +        self.mock_tbl.schema_name = 'mock_schema_name'
    +        self.mock_tbl.table_name = 'mock_table_name'
             self.prep = TablePrep(self.mock_tbl)
     
         @patch('records_mover.records.prep.quote_schema_and_table')
    @@ -33,7 +35,6 @@ def test_prep_table_exists_truncate_implicit(self, mock_quote_schema_and_table):
             mock_quote_schema_and_table
             mock_driver.has_table.return_value = True
             how_to_prep = ExistingTableHandling.TRUNCATE_AND_OVERWRITE
    -        mock_schema_and_table = mock_quote_schema_and_table.return_value
             self.mock_tbl.existing_table_handling = how_to_prep
     
             self.prep.prep(mock_schema_sql, mock_driver)
    @@ -43,7 +44,8 @@ def test_prep_table_exists_truncate_implicit(self, mock_quote_schema_and_table):
                                                            self.mock_tbl.table_name,
                                                            db_engine=mock_driver.db_engine)
             str_arg = str(mock_driver.db_conn.execute.call_args.args[0])
    -        self.assertEqual(str_arg, f"TRUNCATE TABLE {mock_schema_and_table}")
    +        self.assertEqual(str_arg,
    +                         f"DELETE FROM {self.mock_tbl.schema_name}.{self.mock_tbl.table_name}")
     
         @patch('records_mover.records.prep.quote_schema_and_table')
         def test_prep_table_exists_delete_implicit(self, mock_quote_schema_and_table):
    @@ -79,7 +81,6 @@ def test_prep_table_exists_drop_implicit(self, mock_quote_schema_and_table):
             mock_quote_schema_and_table
             mock_driver.has_table.return_value = True
             how_to_prep = ExistingTableHandling.DROP_AND_RECREATE
    -        mock_schema_and_table = mock_quote_schema_and_table.return_value
             self.mock_tbl.existing_table_handling = how_to_prep
     
             self.prep.prep(mock_schema_sql, mock_driver)
    @@ -91,7 +92,8 @@ def test_prep_table_exists_drop_implicit(self, mock_quote_schema_and_table):
                                                            db_engine=mock_driver.db_engine)
             str_args = [str(call_arg.args[0]) for call_arg in mock_conn.execute.call_args_list]
             drop_table_str_arg, mock_schema_sql_str_arg = str_args[0], str_args[1]
    -        self.assertEqual(drop_table_str_arg, f"DROP TABLE {mock_schema_and_table}")
    +        self.assertEqual(drop_table_str_arg,
    +                         f"\nDROP TABLE {self.mock_tbl.schema_name}.{self.mock_tbl.table_name}")
             self.assertEqual(mock_schema_sql_str_arg, mock_schema_sql)
             mock_driver.set_grant_permissions_for_groups.\
                 assert_called_with(self.mock_tbl.schema_name,
    @@ -151,7 +153,6 @@ def test_prep_table_exists_drop_explicit(self, mock_quote_schema_and_table):
             mock_quote_schema_and_table
             mock_driver.has_table.return_value = True
             how_to_prep = ExistingTableHandling.DELETE_AND_OVERWRITE
    -        mock_schema_and_table = mock_quote_schema_and_table.return_value
             self.mock_tbl.existing_table_handling = how_to_prep
     
             self.prep.prep(mock_schema_sql, mock_driver,
    @@ -163,7 +164,8 @@ def test_prep_table_exists_drop_explicit(self, mock_quote_schema_and_table):
                                                            db_engine=mock_driver.db_engine)
             str_args = [str(call_arg.args[0]) for call_arg in mock_conn.execute.call_args_list]
             drop_table_str_arg, mock_schema_sql_str_arg = str_args[0], str_args[1]
    -        self.assertEqual(drop_table_str_arg, f"DROP TABLE {mock_schema_and_table}")
    +        self.assertEqual(drop_table_str_arg,
    +                         f"\nDROP TABLE {self.mock_tbl.schema_name}.{self.mock_tbl.table_name}")
             self.assertEqual(mock_schema_sql_str_arg, mock_schema_sql)
             mock_driver.set_grant_permissions_for_groups.\
                 assert_called_with(self.mock_tbl.schema_name,
    

Vulnerability mechanics

Generated by null/stub on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.

References

7

News mentions

0

No linked articles in our index yet.