VYPR
High severityNVD Advisory· Published Oct 22, 2009· Updated Apr 23, 2026

CVE-2009-2940

CVE-2009-2940

Description

The pygresql module 3.8.1 and 4.0 for Python does not properly support the PQescapeStringConn function, which might allow remote attackers to leverage escaping issues involving multibyte character encodings.

Affected packages

Versions sourced from the GitHub Security Advisory.

PackageAffected versionsPatched versions
PyGreSQLPyPI
<= 3.8.1
PyGreSQLPyPI
>= 4.0, < 4.14.1

Affected products

2
  • Pygresql/Pygresql2 versions
    cpe:2.3:a:pygresql:pygresql:3.8.1:*:*:*:*:*:*:*+ 1 more
    • cpe:2.3:a:pygresql:pygresql:3.8.1:*:*:*:*:*:*:*
    • cpe:2.3:a:pygresql:pygresql:4.0:*:*:*:*:*:*:*

Patches

2
f7237d773e6f

Added support for PQescapeStringConn() and PQescapeByteaConn(). Promoted the _quote() functions to methods of the connection. Made them use the former functions instead of manually and faulitly handling backslashes and quotes.

https://github.com/PyGreSQL/PyGreSQLChristoph ZwerschkeNov 21, 2008via ghsa
6 files changed · +227 214
  • docs/changelog.txt+3 1 modified
    @@ -19,7 +19,9 @@ Version 4.0 (2008-12-01)
     - Allow DB wrapper to be used with DB-API 2 connections
       (as suggested by Chris Hilton).
     - Made private attributes of DB wrapper accessible.
    -- Dropped dependence on mx.DateTime module
    +- Dropped dependence on mx.DateTime module.
    +- Support for PQescapeStringConn() and PQescapeByteaConn();
    +  these are now also used by the internal _quote() functions.
     
     Version 3.8.1 (2006-06-05)
     --------------------------
    
  • docs/future.txt+0 4 modified
    @@ -6,13 +6,9 @@ PyGreSQL future directions
     To Do
     -----
     
    -- Use PQescapeStringConn in the _quote() function of pg and pgdb
    -  (escaping via backslash is not standard and produces warnings
    -  in newer PostgreSQL versions).
     - Code marked with XXX in the pg module
     - pg relies on OIDs, but these are not generated by default any more
       (at least docs should recommend setting default_with_oids=true).
    -- install.txt talks about MSVC 2003, we should try using 2005/2008
     - 64 bit int typecasting in pgdb module
       (see http://mailman.vex.net/pipermail/pygresql/2008-March/001943.html).
     - Support composite primary keys
    
  • module/pgdb.py+43 40 modified
    @@ -4,7 +4,7 @@
     #
     # Written by D'Arcy J.M. Cain
     #
    -# $Id: pgdb.py,v 1.47 2008-11-21 13:10:51 cito Exp $
    +# $Id: pgdb.py,v 1.48 2008-11-21 21:17:53 cito Exp $
     #
     
     """pgdb - DB-API 2.0 compliant module for PygreSQL.
    @@ -145,47 +145,15 @@ def getdescr(self, oid):
                 return res
     
     
    -class _quoteitem(dict):
    -    """Dictionary with auto quoting of its items."""
    +class _quotedict(dict):
    +    """Dictionary with auto quoting of its items.
     
    -    def __getitem__(self, key):
    -        return _quote(super(_quoteitem, self).__getitem__(key))
    -
    -
    -def _quote(val):
    -    """Quote value depending on its type."""
    -    if isinstance(val, datetime):
    -        val = str(val)
    -    elif isinstance(val, unicode):
    -        val = val.encode( 'utf-8' )
    -    if isinstance(val, str):
    -        val = "'%s'" % str(val).replace("\\", "\\\\").replace("'", "''")
    -    elif isinstance(val, (int, long, float)):
    -        pass
    -    elif val is None:
    -        val = 'NULL'
    -    elif isinstance(val, (list, tuple)):
    -        val = '(%s)' % ','.join(map(lambda v: str(_quote(v)), val))
    -    elif Decimal is not float and isinstance(val, Decimal):
    -        pass
    -    elif hasattr(val, '__pg_repr__'):
    -        val = val.__pg_repr__()
    -    else:
    -        raise InterfaceError('do not know how to handle type %s' % type(val))
    -    return val
    -
    -
    -def _quoteparams(string, params):
    -    """Quote parameters.
    -
    -    This function works for both mappings and sequences.
    +    The quote attribute must be set to the desired quote function.
     
         """
    -    if hasattr(params, 'has_key'):
    -        params = _quoteitem(params)
    -    else:
    -        params = tuple(map(_quote, params))
    -    return string % params
    +
    +    def __getitem__(self, key):
    +        return self.quote(super(_quotedict, self).__getitem__(key))
     
     
     ### Cursor Object
    @@ -204,6 +172,41 @@ def __init__(self, dbcnx):
             self.arraysize = 1
             self.lastrowid = None
     
    +    def _quote(self, val):
    +        """Quote value depending on its type."""
    +        if isinstance(val, datetime):
    +            val = str(val)
    +        elif isinstance(val, unicode):
    +            val = val.encode( 'utf8' )
    +        if isinstance(val, str):
    +            val = "'%s'" % self._cnx.escape_string(val)
    +        elif isinstance(val, (int, long, float)):
    +            pass
    +        elif val is None:
    +            val = 'NULL'
    +        elif isinstance(val, (list, tuple)):
    +            val = '(%s)' % ','.join(map(lambda v: str(self._quote(v)), val))
    +        elif Decimal is not float and isinstance(val, Decimal):
    +            pass
    +        elif hasattr(val, '__pg_repr__'):
    +            val = val.__pg_repr__()
    +        else:
    +            raise InterfaceError('do not know how to handle type %s' % type(val))
    +        return val
    +
    +    def _quoteparams(self, string, params):
    +        """Quote parameters.
    +
    +        This function works for both mappings and sequences.
    +
    +        """
    +        if isinstance(params, dict):
    +            params = _quotedict(params)
    +            params.quote = self._quote
    +        else:
    +            params = tuple(map(self._quote, params))
    +        return string % params
    +
         def row_factory(row):
             """Process rows before they are returned.
     
    @@ -260,7 +263,7 @@ def executemany(self, operation, param_seq):
                     self._dbcnx._tnx = True
                 for params in param_seq:
                     if params:
    -                    sql = _quoteparams(operation, params)
    +                    sql = self._quoteparams(operation, params)
                     else:
                         sql = operation
                     rows = self._src.execute(sql)
    
  • module/pg.py+102 94 modified
    @@ -5,7 +5,7 @@
     # Written by D'Arcy J.M. Cain
     # Improved by Christoph Zwerschke
     #
    -# $Id: pg.py,v 1.62 2008-11-21 19:25:27 cito Exp $
    +# $Id: pg.py,v 1.63 2008-11-21 21:17:53 cito Exp $
     #
     
     """PyGreSQL classic interface.
    @@ -32,58 +32,6 @@
     
     # Auxiliary functions which are independent from a DB connection:
     
    -def _quote_text(d):
    -    """Quote text value."""
    -    if not isinstance(d, basestring):
    -        d = str(d)
    -    return "'%s'" % d.replace("\\", "\\\\").replace("'", "''")
    -
    -_bool_true = frozenset('t true 1 y yes on'.split())
    -
    -def _quote_bool(d):
    -    """Quote boolean value."""
    -    if isinstance(d, basestring):
    -        if not d:
    -            return 'NULL'
    -        d = d.lower() in _bool_true
    -    else:
    -        d = bool(d)
    -    return ("'f'", "'t'")[d]
    -
    -_date_literal = frozenset('current_date current_time'
    -    ' current_timestamp localtime localtimestamp'.split())
    -
    -def _quote_date(d):
    -    """Quote date value."""
    -    if not d:
    -        return 'NULL'
    -    if isinstance(d, basestring) and d.lower() in _date_literal:
    -        return d
    -    return _quote_text(d)
    -
    -def _quote_num(d):
    -    """Quote numeric value."""
    -    if not d:
    -        return 'NULL'
    -    return str(d)
    -
    -def _quote_money(d):
    -    """Quote money value."""
    -    if not d:
    -        return 'NULL'
    -    return "'%.2f'" % float(d)
    -
    -_quote_funcs = dict( # quote functions for each type
    -    text=_quote_text, bool=_quote_bool, date=_quote_date,
    -    int=_quote_num, num=_quote_num, float=_quote_num, money=_quote_money)
    -
    -def _quote(d, t):
    -    """Return quotes if needed."""
    -    if d is None:
    -        return 'NULL'
    -    else:
    -        return _quote_funcs.get(t, _quote_text)(d)
    -
     def _is_quoted(s):
         """Check whether this string is a quoted identifier."""
         s = s.replace('_', 'a')
    @@ -188,9 +136,7 @@ def __getattr__(self, name):
             else:
                 raise InternalError('Connection is not valid')
     
    -    # escape_string and escape_bytea exist as methods,
    -    # so we define unescape_bytea as a method as well
    -    unescape_bytea = staticmethod(unescape_bytea)
    +    # Auxiliary methods
     
         def _do_debug(self, s):
             """Print a debug message."""
    @@ -202,46 +148,61 @@ def _do_debug(self, s):
                 elif callable(self.debug):
                     self.debug(s)
     
    -    def close(self):
    -        """Close the database connection."""
    -        # Wraps shared library function so we can track state.
    -        if self._closeable:
    -            if self.db:
    -                self.db.close()
    -                self.db = None
    -            else:
    -                raise InternalError('Connection already closed')
    +    def _quote_text(self, d):
    +        """Quote text value."""
    +        if not isinstance(d, basestring):
    +            d = str(d)
    +        return "'%s'" % self.escape_string(d)
     
    -    def reopen(self):
    -        """Reopen connection to the database.
    +    _bool_true = frozenset('t true 1 y yes on'.split())
     
    -        Used in case we need another connection to the same database.
    -        Note that we can still reopen a database that we have closed.
    +    def _quote_bool(self, d):
    +        """Quote boolean value."""
    +        if isinstance(d, basestring):
    +            if not d:
    +                return 'NULL'
    +            d = d.lower() in self._bool_true
    +        else:
    +            d = bool(d)
    +        return ("'f'", "'t'")[d]
     
    -        """
    -        # There is no such shared library function.
    -        if self._closeable:
    -            db = connect(*self._args[0], **self._args[1])
    -            if self.db:
    -                self.db.close()
    -            self.db = db
    +    _date_literals = frozenset('current_date current_time'
    +        ' current_timestamp localtime localtimestamp'.split())
     
    -    def query(self, qstr):
    -        """Executes a SQL command string.
    +    def _quote_date(self, d):
    +        """Quote date value."""
    +        if not d:
    +            return 'NULL'
    +        if isinstance(d, basestring) and d.lower() in self._date_literals:
    +            return d
    +        return self._quote_text(d)
     
    -        This method simply sends a SQL query to the database. If the query is
    -        an insert statement, the return value is the OID of the newly
    -        inserted row.  If it is otherwise a query that does not return a result
    -        (ie. is not a some kind of SELECT statement), it returns None.
    -        Otherwise, it returns a pgqueryobject that can be accessed via the
    -        getresult or dictresult method or simply printed.
    +    def _quote_num(self, d):
    +        """Quote numeric value."""
    +        if not d:
    +            return 'NULL'
    +        return str(d)
     
    -        """
    -        # Wraps shared library function for debugging.
    -        if not self.db:
    -            raise InternalError('Connection is not valid')
    -        self._do_debug(qstr)
    -        return self.db.query(qstr)
    +    def _quote_money(self, d):
    +        """Quote money value."""
    +        if not d:
    +            return 'NULL'
    +        return "'%.2f'" % float(d)
    +
    +    _quote_funcs = dict( # quote methods for each type
    +        text=_quote_text, bool=_quote_bool, date=_quote_date,
    +        int=_quote_num, num=_quote_num, float=_quote_num,
    +        money=_quote_money)
    +
    +    def _quote(self, d, t):
    +        """Return quotes if needed."""
    +        if d is None:
    +            return 'NULL'
    +        try:
    +            quote_func = self._quote_funcs[t]
    +        except KeyError:
    +            quote_func = self._quote_funcs['text']
    +        return quote_func(self, d)
     
         def _split_schema(self, cl):
             """Return schema and name of object separately.
    @@ -282,6 +243,53 @@ def _split_schema(self, cl):
                     schema = 'public'
             return schema, cl
     
    +    # Public methods
    +
    +    # escape_string and escape_bytea exist as methods,
    +    # so we define unescape_bytea as a method as well
    +    unescape_bytea = staticmethod(unescape_bytea)
    +
    +    def close(self):
    +        """Close the database connection."""
    +        # Wraps shared library function so we can track state.
    +        if self._closeable:
    +            if self.db:
    +                self.db.close()
    +                self.db = None
    +            else:
    +                raise InternalError('Connection already closed')
    +
    +    def reopen(self):
    +        """Reopen connection to the database.
    +
    +        Used in case we need another connection to the same database.
    +        Note that we can still reopen a database that we have closed.
    +
    +        """
    +        # There is no such shared library function.
    +        if self._closeable:
    +            db = connect(*self._args[0], **self._args[1])
    +            if self.db:
    +                self.db.close()
    +            self.db = db
    +
    +    def query(self, qstr):
    +        """Executes a SQL command string.
    +
    +        This method simply sends a SQL query to the database. If the query is
    +        an insert statement, the return value is the OID of the newly
    +        inserted row.  If it is otherwise a query that does not return a result
    +        (ie. is not a some kind of SELECT statement), it returns None.
    +        Otherwise, it returns a pgqueryobject that can be accessed via the
    +        getresult or dictresult method or simply printed.
    +
    +        """
    +        # Wraps shared library function for debugging.
    +        if not self.db:
    +            raise InternalError('Connection is not valid')
    +        self._do_debug(qstr)
    +        return self.db.query(qstr)
    +
         def pkey(self, cl, newpkey = None):
             """This method gets or sets the primary key of a class.
     
    @@ -451,12 +459,12 @@ def get(self, cl, arg, keyname = None, view = 0):
             else:
                 q = 'SELECT %s FROM %s WHERE %s=%s LIMIT 1' % \
                     (','.join(fnames.keys()), qcl, \
    -                    keyname, _quote(k, fnames[keyname]))
    +                    keyname, self._quote(k, fnames[keyname]))
             self._do_debug(q)
             res = self.db.query(q).dictresult()
             if not res:
                 raise DatabaseError('No such record in %s where %s=%s'
    -                % (qcl, keyname, _quote(k, fnames[keyname])))
    +                % (qcl, keyname, self._quote(k, fnames[keyname])))
             for k, d in res[0].items():
                 if k == 'oid':
                     k = foid
    @@ -488,7 +496,7 @@ def insert(self, cl, d = None, **kw):
             n = []
             for f in fnames.keys():
                 if f != 'oid' and f in a:
    -                t.append(_quote(a[f], fnames[f]))
    +                t.append(self._quote(a[f], fnames[f]))
                     n.append('"%s"' % f)
             q = 'INSERT INTO %s (%s) VALUES (%s)' % \
                 (qcl, ','.join(n), ','.join(t))
    @@ -548,7 +556,7 @@ def update(self, cl, d = None, **kw):
             fnames = self.get_attnames(qcl)
             for ff in fnames.keys():
                 if ff != 'oid' and ff in a:
    -                v.append('%s=%s' % (ff, _quote(a[ff], fnames[ff])))
    +                v.append('%s=%s' % (ff, self._quote(a[ff], fnames[ff])))
             if v == []:
                 return None
             q = 'UPDATE %s SET %s WHERE %s' % (qcl, ','.join(v), where)
    
  • module/test_pg.py+78 74 modified
    @@ -4,7 +4,7 @@
     #
     # Written by Christoph Zwerschke
     #
    -# $Id: test_pg.py,v 1.16 2008-11-21 19:25:27 cito Exp $
    +# $Id: test_pg.py,v 1.17 2008-11-21 21:17:53 cito Exp $
     #
     
     """Test the classic PyGreSQL interface in the pg module.
    @@ -27,15 +27,17 @@
     import pg
     import unittest
     
    +debug = 0
    +
     # Try to load german locale for Umlaut tests
     german = 1
     try:
         import locale
         locale.setlocale(locale.LC_ALL, ('de', 'latin1'))
    -except:
    +except Exception:
         try:
             locale.setlocale(locale.LC_ALL, 'german')
    -    except:
    +    except Exception:
             german = 0
     
     try:
    @@ -68,71 +70,6 @@ def smart_ddl(conn, cmd):
     class TestAuxiliaryFunctions(unittest.TestCase):
         """Test the auxiliary functions external to the connection class."""
     
    -    def testQuote(self):
    -        f = pg._quote
    -        self.assertEqual(f(None, None), 'NULL')
    -        self.assertEqual(f(None, 'int'), 'NULL')
    -        self.assertEqual(f(None, 'float'), 'NULL')
    -        self.assertEqual(f(None, 'num'), 'NULL')
    -        self.assertEqual(f(None, 'money'), 'NULL')
    -        self.assertEqual(f(None, 'bool'), 'NULL')
    -        self.assertEqual(f(None, 'date'), 'NULL')
    -        self.assertEqual(f('', 'int'), 'NULL')
    -        self.assertEqual(f('', 'float'), 'NULL')
    -        self.assertEqual(f('', 'num'), 'NULL')
    -        self.assertEqual(f('', 'money'), 'NULL')
    -        self.assertEqual(f('', 'bool'), 'NULL')
    -        self.assertEqual(f('', 'date'), 'NULL')
    -        self.assertEqual(f('', 'text'), "''")
    -        self.assertEqual(f(123456789, 'int'), '123456789')
    -        self.assertEqual(f(123456987, 'num'), '123456987')
    -        self.assertEqual(f(1.23654789, 'num'), '1.23654789')
    -        self.assertEqual(f(12365478.9, 'num'), '12365478.9')
    -        self.assertEqual(f('123456789', 'num'), '123456789')
    -        self.assertEqual(f('1.23456789', 'num'), '1.23456789')
    -        self.assertEqual(f('12345678.9', 'num'), '12345678.9')
    -        self.assertEqual(f(123, 'money'), "'123.00'")
    -        self.assertEqual(f('123', 'money'), "'123.00'")
    -        self.assertEqual(f(123.45, 'money'), "'123.45'")
    -        self.assertEqual(f('123.45', 'money'), "'123.45'")
    -        self.assertEqual(f(123.454, 'money'), "'123.45'")
    -        self.assertEqual(f('123.454', 'money'), "'123.45'")
    -        self.assertEqual(f(123.456, 'money'), "'123.46'")
    -        self.assertEqual(f('123.456', 'money'), "'123.46'")
    -        self.assertEqual(f('f', 'bool'), "'f'")
    -        self.assertEqual(f('F', 'bool'), "'f'")
    -        self.assertEqual(f('false', 'bool'), "'f'")
    -        self.assertEqual(f('False', 'bool'), "'f'")
    -        self.assertEqual(f('FALSE', 'bool'), "'f'")
    -        self.assertEqual(f(0, 'bool'), "'f'")
    -        self.assertEqual(f('0', 'bool'), "'f'")
    -        self.assertEqual(f('-', 'bool'), "'f'")
    -        self.assertEqual(f('n', 'bool'), "'f'")
    -        self.assertEqual(f('N', 'bool'), "'f'")
    -        self.assertEqual(f('no', 'bool'), "'f'")
    -        self.assertEqual(f('off', 'bool'), "'f'")
    -        self.assertEqual(f('t', 'bool'), "'t'")
    -        self.assertEqual(f('T', 'bool'), "'t'")
    -        self.assertEqual(f('true', 'bool'), "'t'")
    -        self.assertEqual(f('True', 'bool'), "'t'")
    -        self.assertEqual(f('TRUE', 'bool'), "'t'")
    -        self.assertEqual(f(1, 'bool'), "'t'")
    -        self.assertEqual(f(2, 'bool'), "'t'")
    -        self.assertEqual(f(-1, 'bool'), "'t'")
    -        self.assertEqual(f(0.5, 'bool'), "'t'")
    -        self.assertEqual(f('1', 'bool'), "'t'")
    -        self.assertEqual(f('y', 'bool'), "'t'")
    -        self.assertEqual(f('Y', 'bool'), "'t'")
    -        self.assertEqual(f('yes', 'bool'), "'t'")
    -        self.assertEqual(f('on', 'bool'), "'t'")
    -        self.assertEqual(f('01.01.2000', 'date'), "'01.01.2000'")
    -        self.assertEqual(f(123, 'text'), "'123'")
    -        self.assertEqual(f(1.23, 'text'), "'1.23'")
    -        self.assertEqual(f('abc', 'text'), "'abc'")
    -        self.assertEqual(f("ab'c", 'text'), "'ab''c'")
    -        self.assertEqual(f('ab\\c', 'text'), "'ab\\\\c'")
    -        self.assertEqual(f("a\\b'c", 'text'), "'a\\\\b''c'")
    -
         def testIsQuoted(self):
             f = pg._is_quoted
             self.assert_(f('A'))
    @@ -383,11 +320,11 @@ def testCanConnectTemplate1(self):
             dbname = 'template1'
             try:
                 connection = pg.connect(dbname)
    -        except:
    +        except Exception:
                 self.fail('Cannot connect to database ' + dbname)
             try:
                 connection.close()
    -        except:
    +        except Exception:
                 self.fail('Cannot close the database connection')
     
     
    @@ -465,13 +402,13 @@ def testMethodClose(self):
             try:
                 self.connection.reset()
                 fail('Reset should give an error for a closed connection')
    -        except:
    +        except Exception:
                 pass
             self.assertRaises(pg.InternalError, self.connection.close)
             try:
                 self.connection.query('select 1')
                 self.fail('Query should give an error for a closed connection')
    -        except:
    +        except Exception:
                 pass
             self.connection = pg.connect(self.dbname)
     
    @@ -597,7 +534,7 @@ def testPrint(self):
             stdout, sys.stdout = sys.stdout, s
             try:
                 print r
    -        except:
    +        except Exception:
                 pass
             sys.stdout = stdout
             s.close()
    @@ -775,7 +712,7 @@ def testMethodClose(self):
             try:
                 self.db.reset()
                 fail('Reset should give an error for a closed connection')
    -        except:
    +        except Exception:
                 pass
             self.assertRaises(pg.InternalError, self.db.close)
             self.assertRaises(pg.InternalError, self.db.query, 'select 1')
    @@ -812,6 +749,8 @@ def setUp(self):
             dbname = DBTestSuite.dbname
             self.dbname = dbname
             self.db = pg.DB(dbname)
    +        if debug:
    +            self.db.debug = 'DEBUG: %s'
     
         def tearDown(self):
             self.db.close()
    @@ -838,6 +777,71 @@ def testUnescapeBytea(self):
             self.assertEqual(pg.unescape_bytea(
                 r'O\000ps\377!'), 'O\x00ps\xff!')
     
    +    def testQuote(self):
    +        f = self.db._quote
    +        self.assertEqual(f(None, None), 'NULL')
    +        self.assertEqual(f(None, 'int'), 'NULL')
    +        self.assertEqual(f(None, 'float'), 'NULL')
    +        self.assertEqual(f(None, 'num'), 'NULL')
    +        self.assertEqual(f(None, 'money'), 'NULL')
    +        self.assertEqual(f(None, 'bool'), 'NULL')
    +        self.assertEqual(f(None, 'date'), 'NULL')
    +        self.assertEqual(f('', 'int'), 'NULL')
    +        self.assertEqual(f('', 'float'), 'NULL')
    +        self.assertEqual(f('', 'num'), 'NULL')
    +        self.assertEqual(f('', 'money'), 'NULL')
    +        self.assertEqual(f('', 'bool'), 'NULL')
    +        self.assertEqual(f('', 'date'), 'NULL')
    +        self.assertEqual(f('', 'text'), "''")
    +        self.assertEqual(f(123456789, 'int'), '123456789')
    +        self.assertEqual(f(123456987, 'num'), '123456987')
    +        self.assertEqual(f(1.23654789, 'num'), '1.23654789')
    +        self.assertEqual(f(12365478.9, 'num'), '12365478.9')
    +        self.assertEqual(f('123456789', 'num'), '123456789')
    +        self.assertEqual(f('1.23456789', 'num'), '1.23456789')
    +        self.assertEqual(f('12345678.9', 'num'), '12345678.9')
    +        self.assertEqual(f(123, 'money'), "'123.00'")
    +        self.assertEqual(f('123', 'money'), "'123.00'")
    +        self.assertEqual(f(123.45, 'money'), "'123.45'")
    +        self.assertEqual(f('123.45', 'money'), "'123.45'")
    +        self.assertEqual(f(123.454, 'money'), "'123.45'")
    +        self.assertEqual(f('123.454', 'money'), "'123.45'")
    +        self.assertEqual(f(123.456, 'money'), "'123.46'")
    +        self.assertEqual(f('123.456', 'money'), "'123.46'")
    +        self.assertEqual(f('f', 'bool'), "'f'")
    +        self.assertEqual(f('F', 'bool'), "'f'")
    +        self.assertEqual(f('false', 'bool'), "'f'")
    +        self.assertEqual(f('False', 'bool'), "'f'")
    +        self.assertEqual(f('FALSE', 'bool'), "'f'")
    +        self.assertEqual(f(0, 'bool'), "'f'")
    +        self.assertEqual(f('0', 'bool'), "'f'")
    +        self.assertEqual(f('-', 'bool'), "'f'")
    +        self.assertEqual(f('n', 'bool'), "'f'")
    +        self.assertEqual(f('N', 'bool'), "'f'")
    +        self.assertEqual(f('no', 'bool'), "'f'")
    +        self.assertEqual(f('off', 'bool'), "'f'")
    +        self.assertEqual(f('t', 'bool'), "'t'")
    +        self.assertEqual(f('T', 'bool'), "'t'")
    +        self.assertEqual(f('true', 'bool'), "'t'")
    +        self.assertEqual(f('True', 'bool'), "'t'")
    +        self.assertEqual(f('TRUE', 'bool'), "'t'")
    +        self.assertEqual(f(1, 'bool'), "'t'")
    +        self.assertEqual(f(2, 'bool'), "'t'")
    +        self.assertEqual(f(-1, 'bool'), "'t'")
    +        self.assertEqual(f(0.5, 'bool'), "'t'")
    +        self.assertEqual(f('1', 'bool'), "'t'")
    +        self.assertEqual(f('y', 'bool'), "'t'")
    +        self.assertEqual(f('Y', 'bool'), "'t'")
    +        self.assertEqual(f('yes', 'bool'), "'t'")
    +        self.assertEqual(f('on', 'bool'), "'t'")
    +        self.assertEqual(f('01.01.2000', 'date'), "'01.01.2000'")
    +        self.assertEqual(f(123, 'text'), "'123'")
    +        self.assertEqual(f(1.23, 'text'), "'1.23'")
    +        self.assertEqual(f('abc', 'text'), "'abc'")
    +        self.assertEqual(f("ab'c", 'text'), "'ab''c'")
    +        self.assertEqual(f('ab\\c', 'text'), "'ab\\\\c'")
    +        self.assertEqual(f("a\\b'c", 'text'), "'a\\\\b''c'")
    +
         def testPkey(self):
             smart_ddl(self.db, 'drop table pkeytest0')
             smart_ddl(self.db, "create table pkeytest0 ("
    
  • module/TEST_PyGreSQL_classic.py+1 1 modified
    @@ -138,7 +138,7 @@ def test_update(self):
             self.assertEqual(r['dvar'], 456)
     
         def test_quote(self):
    -        from pg import _quote
    +        _quote = db._quote
             self.assertEqual(_quote(1, 'int'), "1")
             self.assertEqual(_quote(1, 'text'), "'1'")
             self.assertEqual(_quote(1, 'num'), "1")
    
8e19320b1309

Support for PQescapeStringConn and PQescapeByteaConn.

https://github.com/PyGreSQL/PyGreSQLChristoph ZwerschkeNov 21, 2008via ghsa
5 files changed · +108 29
  • docs/future.txt+0 2 modified
    @@ -6,8 +6,6 @@ PyGreSQL future directions
     To Do
     -----
     
    -- Support PQescapeStringConn and PQescapeByteaConn
    -  (see also http://www.postgresql.org/docs/techdocs.49).
     - Use PQescapeStringConn in the _quote() function of pg and pgdb
       (escaping via backslash is not standard and produces warnings
       in newer PostgreSQL versions).
    
  • docs/pg.txt+10 2 modified
    @@ -303,6 +303,8 @@ Description:
       in SQL commands. Certain characters (such as quotes and backslashes)
       must be escaped to prevent them from being interpreted specially
       by the SQL parser. `escape_string` performs this operation.
    +  Note that there is also a `pgobject` method with the same name
    +  which takes connection properties into account.
     
     .. caution:: It is especially important to do proper escaping when
       handling strings that were received from an untrustworthy source.
    @@ -334,6 +336,8 @@ Description:
       Escapes binary data for use within an SQL command with the type `bytea`.
       As with `escape_string`, this is only used when inserting data directly
       into an SQL command string.
    +  Note that there is also a `pgobject` method with the same name
    +  which takes connection properties into account.
     
     Example::
     
    @@ -965,7 +969,9 @@ Return type:
       :str: the escaped string
     
     Description:
    -  See the module function with the same name.
    +  Similar to the module function with the same name, but the
    +  behavior of this method is adjusted depending on the connection properties
    +  (such as character encoding).
     
     escape_bytea - escape binary data for use within SQL as type `bytea`
     --------------------------------------------------------------------
    @@ -980,7 +986,9 @@ Return type:
       :str: the escaped string
     
     Description:
    -  See the module function with the same name.
    +  Similar to the module function with the same name, but the
    +  behavior of this method is adjusted depending on the connection properties
    +  (in particular, whether standard-conforming strings are enabled).
     
     unescape_bytea -- unescape `bytea` data that has been retrieved as text
     -----------------------------------------------------------------------
    
  • module/pgmodule.c+58 2 modified
    @@ -1,5 +1,5 @@
     /*
    - * $Id: pgmodule.c,v 1.84 2008-11-21 17:25:28 cito Exp $
    + * $Id: pgmodule.c,v 1.85 2008-11-21 19:25:27 cito Exp $
      * PyGres, version 2.2 A Python interface for PostgreSQL database. Written by
      * D'Arcy J.M. Cain, (darcy@druid.net).  Based heavily on code written by
      * Pascal Andre, andre@chimay.via.ecp.fr. Copyright (c) 1995, Pascal Andre
    @@ -2724,6 +2724,59 @@ pg_parameter(pgobject * self, PyObject * args)
     	return Py_None;
     }
     
    +/* escape string */
    +static char pg_escape_string__doc__[] =
    +"pg_escape_string(str) -- escape a string for use within SQL.";
    +
    +static PyObject *
    +pg_escape_string(pgobject *self, PyObject *args) {
    +	char *from; /* our string argument */
    +	char *to=NULL; /* the result */
    +	int from_length; /* length of string */
    +	int to_length; /* length of result */
    +	PyObject *ret; /* string object to return */
    +
    +	if (!PyArg_ParseTuple(args, "s#", &from, &from_length))
    +		return NULL;
    +	to_length = 2*from_length + 1;
    +	if (to_length < from_length) { /* overflow */
    +		to_length = from_length;
    +		from_length = (from_length - 1)/2;
    +	}
    +	to = (char *)malloc(to_length);
    +	to_length = (int)PQescapeStringConn(self->cnx,
    +		to, from, (size_t)from_length, NULL);
    +	ret = Py_BuildValue("s#", to, to_length);
    +	if (to)
    +		free(to);
    +	if (!ret) /* pass on exception */
    +		return NULL;
    +	return ret;
    +}
    +
    +/* escape bytea */
    +static char pg_escape_bytea__doc__[] =
    +"pg_escape_bytea(data) -- escape binary data for use within SQL as type bytea.";
    +
    +static PyObject *
    +pg_escape_bytea(pgobject *self, PyObject *args) {
    +	unsigned char *from; /* our string argument */
    +	unsigned char *to; /* the result */
    +	int from_length; /* length of string */
    +	size_t to_length; /* length of result */
    +	PyObject *ret; /* string object to return */
    +
    +	if (!PyArg_ParseTuple(args, "s#", &from, &from_length))
    +		return NULL;
    +	to = PQescapeByteaConn(self->cnx, from, (int)from_length, &to_length);
    +	ret = Py_BuildValue("s", to);
    +	if (to)
    +		PQfreemem((void *)to);
    +	if (!ret) /* pass on exception */
    +		return NULL;
    +	return ret;
    +}
    +
     #ifdef LARGE_OBJECTS
     /* creates large object */
     static char pg_locreate__doc__[] =
    @@ -2821,7 +2874,6 @@ pg_loimport(pgobject * self, PyObject * args)
     }
     #endif /* LARGE_OBJECTS */
     
    -
     /* connection object methods */
     static struct PyMethodDef pgobj_methods[] = {
     	{"source", (PyCFunction) pg_source, METH_VARARGS, pg_source__doc__},
    @@ -2838,6 +2890,10 @@ static struct PyMethodDef pgobj_methods[] = {
     			pg_transaction__doc__},
     	{"parameter", (PyCFunction) pg_parameter, METH_VARARGS,
     			pg_parameter__doc__},
    +	{"escape_string", (PyCFunction) pg_escape_string, METH_VARARGS,
    +			pg_escape_string__doc__},
    +	{"escape_bytea", (PyCFunction) pg_escape_bytea, METH_VARARGS,
    +			pg_escape_bytea__doc__},
     
     #ifdef DIRECT_ACCESS
     	{"putline", (PyCFunction) pg_putline, 1, pg_putline__doc__},
    
  • module/pg.py+4 4 modified
    @@ -5,7 +5,7 @@
     # Written by D'Arcy J.M. Cain
     # Improved by Christoph Zwerschke
     #
    -# $Id: pg.py,v 1.61 2008-11-21 13:10:51 cito Exp $
    +# $Id: pg.py,v 1.62 2008-11-21 19:25:27 cito Exp $
     #
     
     """PyGreSQL classic interface.
    @@ -188,9 +188,9 @@ def __getattr__(self, name):
             else:
                 raise InternalError('Connection is not valid')
     
    -    # For convenience, define some module functions as static methods also:
    -    escape_string, escape_bytea, unescape_bytea = map(staticmethod,
    -        (escape_string, escape_bytea, unescape_bytea))
    +    # escape_string and escape_bytea exist as methods,
    +    # so we define unescape_bytea as a method as well
    +    unescape_bytea = staticmethod(unescape_bytea)
     
         def _do_debug(self, s):
             """Print a debug message."""
    
  • module/test_pg.py+36 19 modified
    @@ -4,7 +4,7 @@
     #
     # Written by Christoph Zwerschke
     #
    -# $Id: test_pg.py,v 1.15 2008-11-21 13:00:21 cito Exp $
    +# $Id: test_pg.py,v 1.16 2008-11-21 19:25:27 cito Exp $
     #
     
     """Test the classic PyGreSQL interface in the pg module.
    @@ -354,18 +354,24 @@ class TestEscapeFunctions(unittest.TestCase):
         """"Test pg escape and unescape functions."""
     
         def testEscapeString(self):
    -        self.assertEqual(pg.escape_string('hello'), 'hello')
    +        self.assertEqual(pg.escape_string('plain'), 'plain')
    +        self.assertEqual(pg.escape_string(
    +            "that's k\xe4se"), "that''s k\xe4se")
             self.assertEqual(pg.escape_string(
                 r"It's fine to have a \ inside."),
                 r"It''s fine to have a \\ inside.")
     
         def testEscapeBytea(self):
    -        self.assertEqual(pg.escape_bytea('hello'), 'hello')
    +        self.assertEqual(pg.escape_bytea('plain'), 'plain')
    +        self.assertEqual(pg.escape_bytea(
    +            "that's k\xe4se"), "that''s k\\\\344se")
             self.assertEqual(pg.escape_bytea(
                 'O\x00ps\xff!'), r'O\\000ps\\377!')
     
         def testUnescapeBytea(self):
    -        self.assertEqual(pg.unescape_bytea('hello'), 'hello')
    +        self.assertEqual(pg.unescape_bytea('plain'), 'plain')
    +        self.assertEqual(pg.unescape_bytea(
    +            "that's k\\344se"), "that's k\xe4se")
             self.assertEqual(pg.unescape_bytea(
                 r'O\000ps\377!'), 'O\x00ps\xff!')
     
    @@ -405,18 +411,13 @@ def testAllConnectAttributes(self):
     
         def testAllConnectMethods(self):
             methods = ['cancel', 'close', 'endcopy',
    +            'escape_bytea', 'escape_string',
                 'fileno', 'getline', 'getlo', 'getnotify',
                 'inserttable', 'locreate', 'loimport',
                 'parameter', 'putline', 'query', 'reset',
                 'source', 'transaction']
             connection_methods = [a for a in dir(self.connection)
                 if callable(eval("self.connection." + a))]
    -        if 'transaction' not in connection_methods:
    -            # this may be the case for PostgreSQL < 7.4
    -            connection_methods.append('transaction')
    -        if 'parameter' not in connection_methods:
    -            # this may be the case for PostgreSQL < 7.4
    -            connection_methods.append('parameter')
             self.assertEqual(methods, connection_methods)
     
         def testAttributeDb(self):
    @@ -695,12 +696,6 @@ def testAllDBAttributes(self):
                 'update', 'user']
             db_attributes = [a for a in dir(self.db)
                 if not a.startswith('_')]
    -        if 'transaction' not in db_attributes:
    -            # this may be the case for PostgreSQL < 7.4
    -            db_attributes.insert(-4, 'transaction')
    -        if 'parameter' not in db_attributes:
    -            # this may be the case for PostgreSQL < 7.4
    -            db_attributes.insert(-12, 'parameter')
             self.assertEqual(attributes, db_attributes)
     
         def testAttributeDb(self):
    @@ -745,13 +740,26 @@ def testAttributeUser(self):
             self.assertEqual(self.db.db.user, def_user)
     
         def testMethodEscapeString(self):
    -        self.assertEqual(self.db.escape_string('hello'), 'hello')
    +        self.assertEqual(self.db.escape_string("plain"), "plain")
    +        self.assertEqual(self.db.escape_string(
    +            "that's k\xe4se"), "that''s k\xe4se")
    +        self.assertEqual(self.db.escape_string(
    +            r"It's fine to have a \ inside."),
    +            r"It''s fine to have a \\ inside.")
     
         def testMethodEscapeBytea(self):
    -        self.assertEqual(self.db.escape_bytea('hello'), 'hello')
    +        self.assertEqual(self.db.escape_bytea("plain"), "plain")
    +        self.assertEqual(self.db.escape_bytea(
    +            "that's k\xe4se"), "that''s k\\\\344se")
    +        self.assertEqual(self.db.escape_bytea(
    +            'O\x00ps\xff!'), r'O\\000ps\\377!')
     
         def testMethodUnescapeBytea(self):
    -        self.assertEqual(self.db.unescape_bytea('hello'), 'hello')
    +        self.assertEqual(self.db.unescape_bytea("plain"), "plain")
    +        self.assertEqual(self.db.unescape_bytea(
    +            "that's k\\344se"), "that's k\xe4se")
    +        self.assertEqual(pg.unescape_bytea(
    +            r'O\000ps\377!'), 'O\x00ps\xff!')
     
         def testMethodQuery(self):
             self.db.query("select 1+1")
    @@ -809,16 +817,25 @@ def tearDown(self):
             self.db.close()
     
         def testEscapeString(self):
    +        self.assertEqual(self.db.escape_string("plain"), "plain")
    +        self.assertEqual(self.db.escape_string(
    +            "that's k\xe4se"), "that''s k\xe4se")
             self.assertEqual(self.db.escape_string(
                 r"It's fine to have a \ inside."),
                 r"It''s fine to have a \\ inside.")
     
         def testEscapeBytea(self):
    +        self.assertEqual(self.db.escape_bytea("plain"), "plain")
    +        self.assertEqual(self.db.escape_bytea(
    +            "that's k\xe4se"), "that''s k\\\\344se")
             self.assertEqual(self.db.escape_bytea(
                 'O\x00ps\xff!'), r'O\\000ps\\377!')
     
         def testUnescapeBytea(self):
    +        self.assertEqual(self.db.unescape_bytea("plain"), "plain")
             self.assertEqual(self.db.unescape_bytea(
    +            "that's k\\344se"), "that's k\xe4se")
    +        self.assertEqual(pg.unescape_bytea(
                 r'O\000ps\377!'), 'O\x00ps\xff!')
     
         def testPkey(self):
    

Vulnerability mechanics

Generated by null/stub on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.

References

9

News mentions

0

No linked articles in our index yet.