High severityNVD Advisory· Published Oct 22, 2009· Updated Apr 23, 2026
CVE-2009-2940
CVE-2009-2940
Description
The pygresql module 3.8.1 and 4.0 for Python does not properly support the PQescapeStringConn function, which might allow remote attackers to leverage escaping issues involving multibyte character encodings.
Affected packages
Versions sourced from the GitHub Security Advisory.
| Package | Affected versions | Patched versions |
|---|---|---|
PyGreSQLPyPI | <= 3.8.1 | — |
PyGreSQLPyPI | >= 4.0, < 4.1 | 4.1 |
Affected products
2Patches
2f7237d773e6fAdded support for PQescapeStringConn() and PQescapeByteaConn(). Promoted the _quote() functions to methods of the connection. Made them use the former functions instead of manually and faulitly handling backslashes and quotes.
6 files changed · +227 −214
docs/changelog.txt+3 −1 modified@@ -19,7 +19,9 @@ Version 4.0 (2008-12-01) - Allow DB wrapper to be used with DB-API 2 connections (as suggested by Chris Hilton). - Made private attributes of DB wrapper accessible. -- Dropped dependence on mx.DateTime module +- Dropped dependence on mx.DateTime module. +- Support for PQescapeStringConn() and PQescapeByteaConn(); + these are now also used by the internal _quote() functions. Version 3.8.1 (2006-06-05) --------------------------
docs/future.txt+0 −4 modified@@ -6,13 +6,9 @@ PyGreSQL future directions To Do ----- -- Use PQescapeStringConn in the _quote() function of pg and pgdb - (escaping via backslash is not standard and produces warnings - in newer PostgreSQL versions). - Code marked with XXX in the pg module - pg relies on OIDs, but these are not generated by default any more (at least docs should recommend setting default_with_oids=true). -- install.txt talks about MSVC 2003, we should try using 2005/2008 - 64 bit int typecasting in pgdb module (see http://mailman.vex.net/pipermail/pygresql/2008-March/001943.html). - Support composite primary keys
module/pgdb.py+43 −40 modified@@ -4,7 +4,7 @@ # # Written by D'Arcy J.M. Cain # -# $Id: pgdb.py,v 1.47 2008-11-21 13:10:51 cito Exp $ +# $Id: pgdb.py,v 1.48 2008-11-21 21:17:53 cito Exp $ # """pgdb - DB-API 2.0 compliant module for PygreSQL. @@ -145,47 +145,15 @@ def getdescr(self, oid): return res -class _quoteitem(dict): - """Dictionary with auto quoting of its items.""" +class _quotedict(dict): + """Dictionary with auto quoting of its items. - def __getitem__(self, key): - return _quote(super(_quoteitem, self).__getitem__(key)) - - -def _quote(val): - """Quote value depending on its type.""" - if isinstance(val, datetime): - val = str(val) - elif isinstance(val, unicode): - val = val.encode( 'utf-8' ) - if isinstance(val, str): - val = "'%s'" % str(val).replace("\\", "\\\\").replace("'", "''") - elif isinstance(val, (int, long, float)): - pass - elif val is None: - val = 'NULL' - elif isinstance(val, (list, tuple)): - val = '(%s)' % ','.join(map(lambda v: str(_quote(v)), val)) - elif Decimal is not float and isinstance(val, Decimal): - pass - elif hasattr(val, '__pg_repr__'): - val = val.__pg_repr__() - else: - raise InterfaceError('do not know how to handle type %s' % type(val)) - return val - - -def _quoteparams(string, params): - """Quote parameters. - - This function works for both mappings and sequences. + The quote attribute must be set to the desired quote function. """ - if hasattr(params, 'has_key'): - params = _quoteitem(params) - else: - params = tuple(map(_quote, params)) - return string % params + + def __getitem__(self, key): + return self.quote(super(_quotedict, self).__getitem__(key)) ### Cursor Object @@ -204,6 +172,41 @@ def __init__(self, dbcnx): self.arraysize = 1 self.lastrowid = None + def _quote(self, val): + """Quote value depending on its type.""" + if isinstance(val, datetime): + val = str(val) + elif isinstance(val, unicode): + val = val.encode( 'utf8' ) + if isinstance(val, str): + val = "'%s'" % self._cnx.escape_string(val) + elif isinstance(val, (int, long, float)): + pass + elif val is None: + val = 'NULL' + elif isinstance(val, (list, tuple)): + val = '(%s)' % ','.join(map(lambda v: str(self._quote(v)), val)) + elif Decimal is not float and isinstance(val, Decimal): + pass + elif hasattr(val, '__pg_repr__'): + val = val.__pg_repr__() + else: + raise InterfaceError('do not know how to handle type %s' % type(val)) + return val + + def _quoteparams(self, string, params): + """Quote parameters. + + This function works for both mappings and sequences. + + """ + if isinstance(params, dict): + params = _quotedict(params) + params.quote = self._quote + else: + params = tuple(map(self._quote, params)) + return string % params + def row_factory(row): """Process rows before they are returned. @@ -260,7 +263,7 @@ def executemany(self, operation, param_seq): self._dbcnx._tnx = True for params in param_seq: if params: - sql = _quoteparams(operation, params) + sql = self._quoteparams(operation, params) else: sql = operation rows = self._src.execute(sql)
module/pg.py+102 −94 modified@@ -5,7 +5,7 @@ # Written by D'Arcy J.M. Cain # Improved by Christoph Zwerschke # -# $Id: pg.py,v 1.62 2008-11-21 19:25:27 cito Exp $ +# $Id: pg.py,v 1.63 2008-11-21 21:17:53 cito Exp $ # """PyGreSQL classic interface. @@ -32,58 +32,6 @@ # Auxiliary functions which are independent from a DB connection: -def _quote_text(d): - """Quote text value.""" - if not isinstance(d, basestring): - d = str(d) - return "'%s'" % d.replace("\\", "\\\\").replace("'", "''") - -_bool_true = frozenset('t true 1 y yes on'.split()) - -def _quote_bool(d): - """Quote boolean value.""" - if isinstance(d, basestring): - if not d: - return 'NULL' - d = d.lower() in _bool_true - else: - d = bool(d) - return ("'f'", "'t'")[d] - -_date_literal = frozenset('current_date current_time' - ' current_timestamp localtime localtimestamp'.split()) - -def _quote_date(d): - """Quote date value.""" - if not d: - return 'NULL' - if isinstance(d, basestring) and d.lower() in _date_literal: - return d - return _quote_text(d) - -def _quote_num(d): - """Quote numeric value.""" - if not d: - return 'NULL' - return str(d) - -def _quote_money(d): - """Quote money value.""" - if not d: - return 'NULL' - return "'%.2f'" % float(d) - -_quote_funcs = dict( # quote functions for each type - text=_quote_text, bool=_quote_bool, date=_quote_date, - int=_quote_num, num=_quote_num, float=_quote_num, money=_quote_money) - -def _quote(d, t): - """Return quotes if needed.""" - if d is None: - return 'NULL' - else: - return _quote_funcs.get(t, _quote_text)(d) - def _is_quoted(s): """Check whether this string is a quoted identifier.""" s = s.replace('_', 'a') @@ -188,9 +136,7 @@ def __getattr__(self, name): else: raise InternalError('Connection is not valid') - # escape_string and escape_bytea exist as methods, - # so we define unescape_bytea as a method as well - unescape_bytea = staticmethod(unescape_bytea) + # Auxiliary methods def _do_debug(self, s): """Print a debug message.""" @@ -202,46 +148,61 @@ def _do_debug(self, s): elif callable(self.debug): self.debug(s) - def close(self): - """Close the database connection.""" - # Wraps shared library function so we can track state. - if self._closeable: - if self.db: - self.db.close() - self.db = None - else: - raise InternalError('Connection already closed') + def _quote_text(self, d): + """Quote text value.""" + if not isinstance(d, basestring): + d = str(d) + return "'%s'" % self.escape_string(d) - def reopen(self): - """Reopen connection to the database. + _bool_true = frozenset('t true 1 y yes on'.split()) - Used in case we need another connection to the same database. - Note that we can still reopen a database that we have closed. + def _quote_bool(self, d): + """Quote boolean value.""" + if isinstance(d, basestring): + if not d: + return 'NULL' + d = d.lower() in self._bool_true + else: + d = bool(d) + return ("'f'", "'t'")[d] - """ - # There is no such shared library function. - if self._closeable: - db = connect(*self._args[0], **self._args[1]) - if self.db: - self.db.close() - self.db = db + _date_literals = frozenset('current_date current_time' + ' current_timestamp localtime localtimestamp'.split()) - def query(self, qstr): - """Executes a SQL command string. + def _quote_date(self, d): + """Quote date value.""" + if not d: + return 'NULL' + if isinstance(d, basestring) and d.lower() in self._date_literals: + return d + return self._quote_text(d) - This method simply sends a SQL query to the database. If the query is - an insert statement, the return value is the OID of the newly - inserted row. If it is otherwise a query that does not return a result - (ie. is not a some kind of SELECT statement), it returns None. - Otherwise, it returns a pgqueryobject that can be accessed via the - getresult or dictresult method or simply printed. + def _quote_num(self, d): + """Quote numeric value.""" + if not d: + return 'NULL' + return str(d) - """ - # Wraps shared library function for debugging. - if not self.db: - raise InternalError('Connection is not valid') - self._do_debug(qstr) - return self.db.query(qstr) + def _quote_money(self, d): + """Quote money value.""" + if not d: + return 'NULL' + return "'%.2f'" % float(d) + + _quote_funcs = dict( # quote methods for each type + text=_quote_text, bool=_quote_bool, date=_quote_date, + int=_quote_num, num=_quote_num, float=_quote_num, + money=_quote_money) + + def _quote(self, d, t): + """Return quotes if needed.""" + if d is None: + return 'NULL' + try: + quote_func = self._quote_funcs[t] + except KeyError: + quote_func = self._quote_funcs['text'] + return quote_func(self, d) def _split_schema(self, cl): """Return schema and name of object separately. @@ -282,6 +243,53 @@ def _split_schema(self, cl): schema = 'public' return schema, cl + # Public methods + + # escape_string and escape_bytea exist as methods, + # so we define unescape_bytea as a method as well + unescape_bytea = staticmethod(unescape_bytea) + + def close(self): + """Close the database connection.""" + # Wraps shared library function so we can track state. + if self._closeable: + if self.db: + self.db.close() + self.db = None + else: + raise InternalError('Connection already closed') + + def reopen(self): + """Reopen connection to the database. + + Used in case we need another connection to the same database. + Note that we can still reopen a database that we have closed. + + """ + # There is no such shared library function. + if self._closeable: + db = connect(*self._args[0], **self._args[1]) + if self.db: + self.db.close() + self.db = db + + def query(self, qstr): + """Executes a SQL command string. + + This method simply sends a SQL query to the database. If the query is + an insert statement, the return value is the OID of the newly + inserted row. If it is otherwise a query that does not return a result + (ie. is not a some kind of SELECT statement), it returns None. + Otherwise, it returns a pgqueryobject that can be accessed via the + getresult or dictresult method or simply printed. + + """ + # Wraps shared library function for debugging. + if not self.db: + raise InternalError('Connection is not valid') + self._do_debug(qstr) + return self.db.query(qstr) + def pkey(self, cl, newpkey = None): """This method gets or sets the primary key of a class. @@ -451,12 +459,12 @@ def get(self, cl, arg, keyname = None, view = 0): else: q = 'SELECT %s FROM %s WHERE %s=%s LIMIT 1' % \ (','.join(fnames.keys()), qcl, \ - keyname, _quote(k, fnames[keyname])) + keyname, self._quote(k, fnames[keyname])) self._do_debug(q) res = self.db.query(q).dictresult() if not res: raise DatabaseError('No such record in %s where %s=%s' - % (qcl, keyname, _quote(k, fnames[keyname]))) + % (qcl, keyname, self._quote(k, fnames[keyname]))) for k, d in res[0].items(): if k == 'oid': k = foid @@ -488,7 +496,7 @@ def insert(self, cl, d = None, **kw): n = [] for f in fnames.keys(): if f != 'oid' and f in a: - t.append(_quote(a[f], fnames[f])) + t.append(self._quote(a[f], fnames[f])) n.append('"%s"' % f) q = 'INSERT INTO %s (%s) VALUES (%s)' % \ (qcl, ','.join(n), ','.join(t)) @@ -548,7 +556,7 @@ def update(self, cl, d = None, **kw): fnames = self.get_attnames(qcl) for ff in fnames.keys(): if ff != 'oid' and ff in a: - v.append('%s=%s' % (ff, _quote(a[ff], fnames[ff]))) + v.append('%s=%s' % (ff, self._quote(a[ff], fnames[ff]))) if v == []: return None q = 'UPDATE %s SET %s WHERE %s' % (qcl, ','.join(v), where)
module/test_pg.py+78 −74 modified@@ -4,7 +4,7 @@ # # Written by Christoph Zwerschke # -# $Id: test_pg.py,v 1.16 2008-11-21 19:25:27 cito Exp $ +# $Id: test_pg.py,v 1.17 2008-11-21 21:17:53 cito Exp $ # """Test the classic PyGreSQL interface in the pg module. @@ -27,15 +27,17 @@ import pg import unittest +debug = 0 + # Try to load german locale for Umlaut tests german = 1 try: import locale locale.setlocale(locale.LC_ALL, ('de', 'latin1')) -except: +except Exception: try: locale.setlocale(locale.LC_ALL, 'german') - except: + except Exception: german = 0 try: @@ -68,71 +70,6 @@ def smart_ddl(conn, cmd): class TestAuxiliaryFunctions(unittest.TestCase): """Test the auxiliary functions external to the connection class.""" - def testQuote(self): - f = pg._quote - self.assertEqual(f(None, None), 'NULL') - self.assertEqual(f(None, 'int'), 'NULL') - self.assertEqual(f(None, 'float'), 'NULL') - self.assertEqual(f(None, 'num'), 'NULL') - self.assertEqual(f(None, 'money'), 'NULL') - self.assertEqual(f(None, 'bool'), 'NULL') - self.assertEqual(f(None, 'date'), 'NULL') - self.assertEqual(f('', 'int'), 'NULL') - self.assertEqual(f('', 'float'), 'NULL') - self.assertEqual(f('', 'num'), 'NULL') - self.assertEqual(f('', 'money'), 'NULL') - self.assertEqual(f('', 'bool'), 'NULL') - self.assertEqual(f('', 'date'), 'NULL') - self.assertEqual(f('', 'text'), "''") - self.assertEqual(f(123456789, 'int'), '123456789') - self.assertEqual(f(123456987, 'num'), '123456987') - self.assertEqual(f(1.23654789, 'num'), '1.23654789') - self.assertEqual(f(12365478.9, 'num'), '12365478.9') - self.assertEqual(f('123456789', 'num'), '123456789') - self.assertEqual(f('1.23456789', 'num'), '1.23456789') - self.assertEqual(f('12345678.9', 'num'), '12345678.9') - self.assertEqual(f(123, 'money'), "'123.00'") - self.assertEqual(f('123', 'money'), "'123.00'") - self.assertEqual(f(123.45, 'money'), "'123.45'") - self.assertEqual(f('123.45', 'money'), "'123.45'") - self.assertEqual(f(123.454, 'money'), "'123.45'") - self.assertEqual(f('123.454', 'money'), "'123.45'") - self.assertEqual(f(123.456, 'money'), "'123.46'") - self.assertEqual(f('123.456', 'money'), "'123.46'") - self.assertEqual(f('f', 'bool'), "'f'") - self.assertEqual(f('F', 'bool'), "'f'") - self.assertEqual(f('false', 'bool'), "'f'") - self.assertEqual(f('False', 'bool'), "'f'") - self.assertEqual(f('FALSE', 'bool'), "'f'") - self.assertEqual(f(0, 'bool'), "'f'") - self.assertEqual(f('0', 'bool'), "'f'") - self.assertEqual(f('-', 'bool'), "'f'") - self.assertEqual(f('n', 'bool'), "'f'") - self.assertEqual(f('N', 'bool'), "'f'") - self.assertEqual(f('no', 'bool'), "'f'") - self.assertEqual(f('off', 'bool'), "'f'") - self.assertEqual(f('t', 'bool'), "'t'") - self.assertEqual(f('T', 'bool'), "'t'") - self.assertEqual(f('true', 'bool'), "'t'") - self.assertEqual(f('True', 'bool'), "'t'") - self.assertEqual(f('TRUE', 'bool'), "'t'") - self.assertEqual(f(1, 'bool'), "'t'") - self.assertEqual(f(2, 'bool'), "'t'") - self.assertEqual(f(-1, 'bool'), "'t'") - self.assertEqual(f(0.5, 'bool'), "'t'") - self.assertEqual(f('1', 'bool'), "'t'") - self.assertEqual(f('y', 'bool'), "'t'") - self.assertEqual(f('Y', 'bool'), "'t'") - self.assertEqual(f('yes', 'bool'), "'t'") - self.assertEqual(f('on', 'bool'), "'t'") - self.assertEqual(f('01.01.2000', 'date'), "'01.01.2000'") - self.assertEqual(f(123, 'text'), "'123'") - self.assertEqual(f(1.23, 'text'), "'1.23'") - self.assertEqual(f('abc', 'text'), "'abc'") - self.assertEqual(f("ab'c", 'text'), "'ab''c'") - self.assertEqual(f('ab\\c', 'text'), "'ab\\\\c'") - self.assertEqual(f("a\\b'c", 'text'), "'a\\\\b''c'") - def testIsQuoted(self): f = pg._is_quoted self.assert_(f('A')) @@ -383,11 +320,11 @@ def testCanConnectTemplate1(self): dbname = 'template1' try: connection = pg.connect(dbname) - except: + except Exception: self.fail('Cannot connect to database ' + dbname) try: connection.close() - except: + except Exception: self.fail('Cannot close the database connection') @@ -465,13 +402,13 @@ def testMethodClose(self): try: self.connection.reset() fail('Reset should give an error for a closed connection') - except: + except Exception: pass self.assertRaises(pg.InternalError, self.connection.close) try: self.connection.query('select 1') self.fail('Query should give an error for a closed connection') - except: + except Exception: pass self.connection = pg.connect(self.dbname) @@ -597,7 +534,7 @@ def testPrint(self): stdout, sys.stdout = sys.stdout, s try: print r - except: + except Exception: pass sys.stdout = stdout s.close() @@ -775,7 +712,7 @@ def testMethodClose(self): try: self.db.reset() fail('Reset should give an error for a closed connection') - except: + except Exception: pass self.assertRaises(pg.InternalError, self.db.close) self.assertRaises(pg.InternalError, self.db.query, 'select 1') @@ -812,6 +749,8 @@ def setUp(self): dbname = DBTestSuite.dbname self.dbname = dbname self.db = pg.DB(dbname) + if debug: + self.db.debug = 'DEBUG: %s' def tearDown(self): self.db.close() @@ -838,6 +777,71 @@ def testUnescapeBytea(self): self.assertEqual(pg.unescape_bytea( r'O\000ps\377!'), 'O\x00ps\xff!') + def testQuote(self): + f = self.db._quote + self.assertEqual(f(None, None), 'NULL') + self.assertEqual(f(None, 'int'), 'NULL') + self.assertEqual(f(None, 'float'), 'NULL') + self.assertEqual(f(None, 'num'), 'NULL') + self.assertEqual(f(None, 'money'), 'NULL') + self.assertEqual(f(None, 'bool'), 'NULL') + self.assertEqual(f(None, 'date'), 'NULL') + self.assertEqual(f('', 'int'), 'NULL') + self.assertEqual(f('', 'float'), 'NULL') + self.assertEqual(f('', 'num'), 'NULL') + self.assertEqual(f('', 'money'), 'NULL') + self.assertEqual(f('', 'bool'), 'NULL') + self.assertEqual(f('', 'date'), 'NULL') + self.assertEqual(f('', 'text'), "''") + self.assertEqual(f(123456789, 'int'), '123456789') + self.assertEqual(f(123456987, 'num'), '123456987') + self.assertEqual(f(1.23654789, 'num'), '1.23654789') + self.assertEqual(f(12365478.9, 'num'), '12365478.9') + self.assertEqual(f('123456789', 'num'), '123456789') + self.assertEqual(f('1.23456789', 'num'), '1.23456789') + self.assertEqual(f('12345678.9', 'num'), '12345678.9') + self.assertEqual(f(123, 'money'), "'123.00'") + self.assertEqual(f('123', 'money'), "'123.00'") + self.assertEqual(f(123.45, 'money'), "'123.45'") + self.assertEqual(f('123.45', 'money'), "'123.45'") + self.assertEqual(f(123.454, 'money'), "'123.45'") + self.assertEqual(f('123.454', 'money'), "'123.45'") + self.assertEqual(f(123.456, 'money'), "'123.46'") + self.assertEqual(f('123.456', 'money'), "'123.46'") + self.assertEqual(f('f', 'bool'), "'f'") + self.assertEqual(f('F', 'bool'), "'f'") + self.assertEqual(f('false', 'bool'), "'f'") + self.assertEqual(f('False', 'bool'), "'f'") + self.assertEqual(f('FALSE', 'bool'), "'f'") + self.assertEqual(f(0, 'bool'), "'f'") + self.assertEqual(f('0', 'bool'), "'f'") + self.assertEqual(f('-', 'bool'), "'f'") + self.assertEqual(f('n', 'bool'), "'f'") + self.assertEqual(f('N', 'bool'), "'f'") + self.assertEqual(f('no', 'bool'), "'f'") + self.assertEqual(f('off', 'bool'), "'f'") + self.assertEqual(f('t', 'bool'), "'t'") + self.assertEqual(f('T', 'bool'), "'t'") + self.assertEqual(f('true', 'bool'), "'t'") + self.assertEqual(f('True', 'bool'), "'t'") + self.assertEqual(f('TRUE', 'bool'), "'t'") + self.assertEqual(f(1, 'bool'), "'t'") + self.assertEqual(f(2, 'bool'), "'t'") + self.assertEqual(f(-1, 'bool'), "'t'") + self.assertEqual(f(0.5, 'bool'), "'t'") + self.assertEqual(f('1', 'bool'), "'t'") + self.assertEqual(f('y', 'bool'), "'t'") + self.assertEqual(f('Y', 'bool'), "'t'") + self.assertEqual(f('yes', 'bool'), "'t'") + self.assertEqual(f('on', 'bool'), "'t'") + self.assertEqual(f('01.01.2000', 'date'), "'01.01.2000'") + self.assertEqual(f(123, 'text'), "'123'") + self.assertEqual(f(1.23, 'text'), "'1.23'") + self.assertEqual(f('abc', 'text'), "'abc'") + self.assertEqual(f("ab'c", 'text'), "'ab''c'") + self.assertEqual(f('ab\\c', 'text'), "'ab\\\\c'") + self.assertEqual(f("a\\b'c", 'text'), "'a\\\\b''c'") + def testPkey(self): smart_ddl(self.db, 'drop table pkeytest0') smart_ddl(self.db, "create table pkeytest0 ("
module/TEST_PyGreSQL_classic.py+1 −1 modified@@ -138,7 +138,7 @@ def test_update(self): self.assertEqual(r['dvar'], 456) def test_quote(self): - from pg import _quote + _quote = db._quote self.assertEqual(_quote(1, 'int'), "1") self.assertEqual(_quote(1, 'text'), "'1'") self.assertEqual(_quote(1, 'num'), "1")
8e19320b1309Support for PQescapeStringConn and PQescapeByteaConn.
5 files changed · +108 −29
docs/future.txt+0 −2 modified@@ -6,8 +6,6 @@ PyGreSQL future directions To Do ----- -- Support PQescapeStringConn and PQescapeByteaConn - (see also http://www.postgresql.org/docs/techdocs.49). - Use PQescapeStringConn in the _quote() function of pg and pgdb (escaping via backslash is not standard and produces warnings in newer PostgreSQL versions).
docs/pg.txt+10 −2 modified@@ -303,6 +303,8 @@ Description: in SQL commands. Certain characters (such as quotes and backslashes) must be escaped to prevent them from being interpreted specially by the SQL parser. `escape_string` performs this operation. + Note that there is also a `pgobject` method with the same name + which takes connection properties into account. .. caution:: It is especially important to do proper escaping when handling strings that were received from an untrustworthy source. @@ -334,6 +336,8 @@ Description: Escapes binary data for use within an SQL command with the type `bytea`. As with `escape_string`, this is only used when inserting data directly into an SQL command string. + Note that there is also a `pgobject` method with the same name + which takes connection properties into account. Example:: @@ -965,7 +969,9 @@ Return type: :str: the escaped string Description: - See the module function with the same name. + Similar to the module function with the same name, but the + behavior of this method is adjusted depending on the connection properties + (such as character encoding). escape_bytea - escape binary data for use within SQL as type `bytea` -------------------------------------------------------------------- @@ -980,7 +986,9 @@ Return type: :str: the escaped string Description: - See the module function with the same name. + Similar to the module function with the same name, but the + behavior of this method is adjusted depending on the connection properties + (in particular, whether standard-conforming strings are enabled). unescape_bytea -- unescape `bytea` data that has been retrieved as text -----------------------------------------------------------------------
module/pgmodule.c+58 −2 modified@@ -1,5 +1,5 @@ /* - * $Id: pgmodule.c,v 1.84 2008-11-21 17:25:28 cito Exp $ + * $Id: pgmodule.c,v 1.85 2008-11-21 19:25:27 cito Exp $ * PyGres, version 2.2 A Python interface for PostgreSQL database. Written by * D'Arcy J.M. Cain, (darcy@druid.net). Based heavily on code written by * Pascal Andre, andre@chimay.via.ecp.fr. Copyright (c) 1995, Pascal Andre @@ -2724,6 +2724,59 @@ pg_parameter(pgobject * self, PyObject * args) return Py_None; } +/* escape string */ +static char pg_escape_string__doc__[] = +"pg_escape_string(str) -- escape a string for use within SQL."; + +static PyObject * +pg_escape_string(pgobject *self, PyObject *args) { + char *from; /* our string argument */ + char *to=NULL; /* the result */ + int from_length; /* length of string */ + int to_length; /* length of result */ + PyObject *ret; /* string object to return */ + + if (!PyArg_ParseTuple(args, "s#", &from, &from_length)) + return NULL; + to_length = 2*from_length + 1; + if (to_length < from_length) { /* overflow */ + to_length = from_length; + from_length = (from_length - 1)/2; + } + to = (char *)malloc(to_length); + to_length = (int)PQescapeStringConn(self->cnx, + to, from, (size_t)from_length, NULL); + ret = Py_BuildValue("s#", to, to_length); + if (to) + free(to); + if (!ret) /* pass on exception */ + return NULL; + return ret; +} + +/* escape bytea */ +static char pg_escape_bytea__doc__[] = +"pg_escape_bytea(data) -- escape binary data for use within SQL as type bytea."; + +static PyObject * +pg_escape_bytea(pgobject *self, PyObject *args) { + unsigned char *from; /* our string argument */ + unsigned char *to; /* the result */ + int from_length; /* length of string */ + size_t to_length; /* length of result */ + PyObject *ret; /* string object to return */ + + if (!PyArg_ParseTuple(args, "s#", &from, &from_length)) + return NULL; + to = PQescapeByteaConn(self->cnx, from, (int)from_length, &to_length); + ret = Py_BuildValue("s", to); + if (to) + PQfreemem((void *)to); + if (!ret) /* pass on exception */ + return NULL; + return ret; +} + #ifdef LARGE_OBJECTS /* creates large object */ static char pg_locreate__doc__[] = @@ -2821,7 +2874,6 @@ pg_loimport(pgobject * self, PyObject * args) } #endif /* LARGE_OBJECTS */ - /* connection object methods */ static struct PyMethodDef pgobj_methods[] = { {"source", (PyCFunction) pg_source, METH_VARARGS, pg_source__doc__}, @@ -2838,6 +2890,10 @@ static struct PyMethodDef pgobj_methods[] = { pg_transaction__doc__}, {"parameter", (PyCFunction) pg_parameter, METH_VARARGS, pg_parameter__doc__}, + {"escape_string", (PyCFunction) pg_escape_string, METH_VARARGS, + pg_escape_string__doc__}, + {"escape_bytea", (PyCFunction) pg_escape_bytea, METH_VARARGS, + pg_escape_bytea__doc__}, #ifdef DIRECT_ACCESS {"putline", (PyCFunction) pg_putline, 1, pg_putline__doc__},
module/pg.py+4 −4 modified@@ -5,7 +5,7 @@ # Written by D'Arcy J.M. Cain # Improved by Christoph Zwerschke # -# $Id: pg.py,v 1.61 2008-11-21 13:10:51 cito Exp $ +# $Id: pg.py,v 1.62 2008-11-21 19:25:27 cito Exp $ # """PyGreSQL classic interface. @@ -188,9 +188,9 @@ def __getattr__(self, name): else: raise InternalError('Connection is not valid') - # For convenience, define some module functions as static methods also: - escape_string, escape_bytea, unescape_bytea = map(staticmethod, - (escape_string, escape_bytea, unescape_bytea)) + # escape_string and escape_bytea exist as methods, + # so we define unescape_bytea as a method as well + unescape_bytea = staticmethod(unescape_bytea) def _do_debug(self, s): """Print a debug message."""
module/test_pg.py+36 −19 modified@@ -4,7 +4,7 @@ # # Written by Christoph Zwerschke # -# $Id: test_pg.py,v 1.15 2008-11-21 13:00:21 cito Exp $ +# $Id: test_pg.py,v 1.16 2008-11-21 19:25:27 cito Exp $ # """Test the classic PyGreSQL interface in the pg module. @@ -354,18 +354,24 @@ class TestEscapeFunctions(unittest.TestCase): """"Test pg escape and unescape functions.""" def testEscapeString(self): - self.assertEqual(pg.escape_string('hello'), 'hello') + self.assertEqual(pg.escape_string('plain'), 'plain') + self.assertEqual(pg.escape_string( + "that's k\xe4se"), "that''s k\xe4se") self.assertEqual(pg.escape_string( r"It's fine to have a \ inside."), r"It''s fine to have a \\ inside.") def testEscapeBytea(self): - self.assertEqual(pg.escape_bytea('hello'), 'hello') + self.assertEqual(pg.escape_bytea('plain'), 'plain') + self.assertEqual(pg.escape_bytea( + "that's k\xe4se"), "that''s k\\\\344se") self.assertEqual(pg.escape_bytea( 'O\x00ps\xff!'), r'O\\000ps\\377!') def testUnescapeBytea(self): - self.assertEqual(pg.unescape_bytea('hello'), 'hello') + self.assertEqual(pg.unescape_bytea('plain'), 'plain') + self.assertEqual(pg.unescape_bytea( + "that's k\\344se"), "that's k\xe4se") self.assertEqual(pg.unescape_bytea( r'O\000ps\377!'), 'O\x00ps\xff!') @@ -405,18 +411,13 @@ def testAllConnectAttributes(self): def testAllConnectMethods(self): methods = ['cancel', 'close', 'endcopy', + 'escape_bytea', 'escape_string', 'fileno', 'getline', 'getlo', 'getnotify', 'inserttable', 'locreate', 'loimport', 'parameter', 'putline', 'query', 'reset', 'source', 'transaction'] connection_methods = [a for a in dir(self.connection) if callable(eval("self.connection." + a))] - if 'transaction' not in connection_methods: - # this may be the case for PostgreSQL < 7.4 - connection_methods.append('transaction') - if 'parameter' not in connection_methods: - # this may be the case for PostgreSQL < 7.4 - connection_methods.append('parameter') self.assertEqual(methods, connection_methods) def testAttributeDb(self): @@ -695,12 +696,6 @@ def testAllDBAttributes(self): 'update', 'user'] db_attributes = [a for a in dir(self.db) if not a.startswith('_')] - if 'transaction' not in db_attributes: - # this may be the case for PostgreSQL < 7.4 - db_attributes.insert(-4, 'transaction') - if 'parameter' not in db_attributes: - # this may be the case for PostgreSQL < 7.4 - db_attributes.insert(-12, 'parameter') self.assertEqual(attributes, db_attributes) def testAttributeDb(self): @@ -745,13 +740,26 @@ def testAttributeUser(self): self.assertEqual(self.db.db.user, def_user) def testMethodEscapeString(self): - self.assertEqual(self.db.escape_string('hello'), 'hello') + self.assertEqual(self.db.escape_string("plain"), "plain") + self.assertEqual(self.db.escape_string( + "that's k\xe4se"), "that''s k\xe4se") + self.assertEqual(self.db.escape_string( + r"It's fine to have a \ inside."), + r"It''s fine to have a \\ inside.") def testMethodEscapeBytea(self): - self.assertEqual(self.db.escape_bytea('hello'), 'hello') + self.assertEqual(self.db.escape_bytea("plain"), "plain") + self.assertEqual(self.db.escape_bytea( + "that's k\xe4se"), "that''s k\\\\344se") + self.assertEqual(self.db.escape_bytea( + 'O\x00ps\xff!'), r'O\\000ps\\377!') def testMethodUnescapeBytea(self): - self.assertEqual(self.db.unescape_bytea('hello'), 'hello') + self.assertEqual(self.db.unescape_bytea("plain"), "plain") + self.assertEqual(self.db.unescape_bytea( + "that's k\\344se"), "that's k\xe4se") + self.assertEqual(pg.unescape_bytea( + r'O\000ps\377!'), 'O\x00ps\xff!') def testMethodQuery(self): self.db.query("select 1+1") @@ -809,16 +817,25 @@ def tearDown(self): self.db.close() def testEscapeString(self): + self.assertEqual(self.db.escape_string("plain"), "plain") + self.assertEqual(self.db.escape_string( + "that's k\xe4se"), "that''s k\xe4se") self.assertEqual(self.db.escape_string( r"It's fine to have a \ inside."), r"It''s fine to have a \\ inside.") def testEscapeBytea(self): + self.assertEqual(self.db.escape_bytea("plain"), "plain") + self.assertEqual(self.db.escape_bytea( + "that's k\xe4se"), "that''s k\\\\344se") self.assertEqual(self.db.escape_bytea( 'O\x00ps\xff!'), r'O\\000ps\\377!') def testUnescapeBytea(self): + self.assertEqual(self.db.unescape_bytea("plain"), "plain") self.assertEqual(self.db.unescape_bytea( + "that's k\\344se"), "that's k\xe4se") + self.assertEqual(pg.unescape_bytea( r'O\000ps\377!'), 'O\x00ps\xff!') def testPkey(self):
Vulnerability mechanics
Generated by null/stub on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.
References
9- secunia.com/advisories/37046nvdVendor Advisory
- github.com/advisories/GHSA-xv6x-43gq-4hfjghsaADVISORY
- nvd.nist.gov/vuln/detail/CVE-2009-2940ghsaADVISORY
- ubuntu.com/usn/usn-870-1nvdWEB
- www.debian.org/security/2009/dsa-1911nvdWEB
- github.com/PyGreSQL/PyGreSQL/commit/8e19320b130946eed6f043297e3e4e005a523612ghsaWEB
- github.com/PyGreSQL/PyGreSQL/commit/f7237d773e6f4d5a7da3d99bb6bc5062bd07935eghsaWEB
- secunia.com/advisories/37654nvd
- www.osvdb.org/59028nvd
News mentions
0No linked articles in our index yet.