VYPR
Critical severity · 9.8 · NVD Advisory · Published Mar 31, 2026 · Updated Apr 3, 2026

CVE-2026-32714

CVE-2026-32714

Description

SciTokens is a reference library for generating and using SciTokens. Prior to version 1.9.6, the KeyCache class in scitokens was vulnerable to SQL Injection because it used Python's str.format() to construct SQL queries with user-supplied data (such as issuer and key_id). This allowed an attacker to execute arbitrary SQL commands against the local SQLite database. This issue has been patched in version 1.9.6.

Affected packages

Versions sourced from the GitHub Security Advisory.

Package | Affected versions | Patched versions
scitokens (PyPI) | < 1.9.6 | 1.9.6

Affected products

1

Patches

1
3dba108853f2

Refactor KeyCache SQL queries to use parameterized statements for security and add regression tests for SQL injection prevention

https://github.com/scitokens/scitokens · Derek Weitzel · Mar 13, 2026 · via ghsa
2 files changed · +149 −16
  • src/scitokens/utils/keycache.py · +15 −16 · modified
    @@ -80,7 +80,7 @@ def addkeyinfo(self, issuer, key_id, public_key, cache_timer=0, next_update=0):
                 conn = sqlite3.connect(self.cache_location)
                 conn.row_factory = sqlite3.Row
                 curs = conn.cursor()
    -            curs.execute("DELETE FROM keycache WHERE issuer = '{}' AND key_id = '{}'".format(issuer, key_id))
    +            curs.execute("DELETE FROM keycache WHERE issuer = ? AND key_id = ?", [issuer, key_id])
                 KeyCache._addkeyinfo(curs, issuer, key_id, public_key, cache_timer=cache_timer, next_update=next_update)
                 conn.commit()
                 conn.close()
    @@ -94,14 +94,13 @@ def _addkeyinfo(curs, issuer, key_id, public_key, cache_timer=0, next_update=0):
             Given an open database cursor to a key cache, insert a key.
             """
             # Add the key to the cache
    -        insert_key_statement = "INSERT OR REPLACE INTO keycache VALUES('{issuer}', '{expiration}', '{key_id}', \
    -                               '{keydata}', '{next_update}')"
    +        insert_key_statement = "INSERT OR REPLACE INTO keycache VALUES(?, ?, ?, ?, ?)"
             keydata = {
                 'pub_key': public_key.public_bytes(Encoding.PEM, PublicFormat.SubjectPublicKeyInfo).decode('ascii'),
             }
     
    -        curs.execute(insert_key_statement.format(issuer=issuer, expiration=time.time()+cache_timer, key_id=key_id,
    -                                                 keydata=json.dumps(keydata), next_update=time.time()+next_update))
    +        curs.execute(insert_key_statement, [issuer, time.time()+cache_timer, key_id,
    +                                            json.dumps(keydata), time.time()+next_update])
             if curs.rowcount != 1:
                 raise UnableToWriteKeyCache("Unable to insert into key cache")
     
    @@ -137,8 +136,7 @@ def _delete_cache_entry(self, issuer, key_id):
             try:
                 conn = sqlite3.connect(self.cache_location)
                 curs = conn.cursor()
    -            curs.execute("DELETE FROM keycache WHERE issuer = '{}' AND key_id = '{}'".format(issuer,
    -                        key_id))
    +            curs.execute("DELETE FROM keycache WHERE issuer = ? AND key_id = ?", [issuer, key_id])
                 conn.commit()
                 conn.close()
             except Exception as ex:
    @@ -154,11 +152,10 @@ def _add_negative_cache_entry(self, issuer, key_id, cache_retry_interval):
                 conn = sqlite3.connect(self.cache_location)
                 conn.row_factory = sqlite3.Row
                 curs = conn.cursor()
    -            insert_key_statement = "INSERT OR REPLACE INTO keycache VALUES('{issuer}', '{expiration}', '{key_id}', \
    -                                '{keydata}', '{next_update}')"
    +            insert_key_statement = "INSERT OR REPLACE INTO keycache VALUES(?, ?, ?, ?, ?)"
                 keydata = ''
    -            curs.execute(insert_key_statement.format(issuer=issuer, expiration=time.time()+cache_retry_interval, key_id=key_id,
    -                                                    keydata=keydata, next_update=time.time()+cache_retry_interval))
    +            curs.execute(insert_key_statement, [issuer, time.time()+cache_retry_interval, key_id,
    +                                                keydata, time.time()+cache_retry_interval])
                 if curs.rowcount != 1:
                     logger = logging.getLogger("scitokens")
                     logger.error(UnableToWriteKeyCache("Unable to insert into key cache"))
    @@ -214,16 +211,18 @@ def getkeyinfo(self, issuer, key_id=None, insecure=False, force_refresh=False, c
             logger = logging.getLogger("scitokens")
             
             # Check the sql database
    -        key_query = ("SELECT * FROM keycache WHERE "
    -                     "issuer = '{issuer}'")
    -        if key_id != None:
    -            key_query += " AND key_id = '{key_id}'"
    +        if key_id is not None:
    +            key_query = "SELECT * FROM keycache WHERE issuer = ? AND key_id = ?"
    +            query_params = [issuer, key_id]
    +        else:
    +            key_query = "SELECT * FROM keycache WHERE issuer = ?"
    +            query_params = [issuer]
             row = None
             try:
                 conn = sqlite3.connect(self.cache_location)
                 conn.row_factory = sqlite3.Row
                 curs = conn.cursor()
    -            curs.execute(key_query.format(issuer=issuer, key_id=key_id))
    +            curs.execute(key_query, query_params)
                 row = curs.fetchone()
                 conn.commit()
                 conn.close()
    
  • tests/test_keycache.py · +134 −0 · modified
    @@ -333,5 +333,139 @@ def test_cache_update_trigger(self):
             create_webserver.shutdown_server()
     
     
    +import sqlite3
    +
    +class TestKeyCacheSQLInjection(unittest.TestCase):
    +    """
    +    Regression tests to verify that SQL injection via issuer/key_id is not possible.
    +    """
    +
    +    def setUp(self):
    +        self.tmp_dir = tempfile.mkdtemp()
    +        self.old_xdg = os.environ.get('XDG_CACHE_HOME', None)
    +        os.environ['XDG_CACHE_HOME'] = self.tmp_dir
    +        self.keycache = KeyCache()
    +
    +        # Generate a test key pair
    +        self.private_key = generate_private_key(
    +            public_exponent=65537,
    +            key_size=2048,
    +            backend=default_backend()
    +        )
    +        self.public_key = self.private_key.public_key()
    +
    +    def tearDown(self):
    +        shutil.rmtree(self.tmp_dir)
    +        if self.old_xdg:
    +            os.environ['XDG_CACHE_HOME'] = self.old_xdg
    +
    +    def _count_rows(self):
    +        conn = sqlite3.connect(self.keycache.cache_location)
    +        curs = conn.cursor()
    +        curs.execute("SELECT COUNT(*) FROM keycache")
    +        count = curs.fetchone()[0]
    +        conn.close()
    +        return count
    +
    +    def test_injection_in_issuer_does_not_delete_other_rows(self):
    +        """
    +        With the old .format() pattern, an issuer like "x' OR '1'='1" in a
    +        DELETE would wipe every row. Parameterized queries treat it as a
    +        literal value, so no rows other than the exact match are affected.
    +        """
    +        # Insert a legitimate row
    +        self.keycache.addkeyinfo("https://legit.example.com/", "key1",
    +                                 self.public_key, cache_timer=3600)
    +        self.assertEqual(self._count_rows(), 1)
    +
    +        # Attempt injection via issuer in addkeyinfo (which DELETEs first)
    +        malicious_issuer = "x' OR '1'='1"
    +        self.keycache.addkeyinfo(malicious_issuer, "evil_key",
    +                                 self.public_key, cache_timer=3600)
    +
    +        # The legitimate row must still exist, plus the new malicious-literal row
    +        self.assertEqual(self._count_rows(), 2)
    +
    +    def test_injection_in_key_id_does_not_delete_other_rows(self):
    +        """
    +        A malicious key_id should not be able to affect other rows.
    +        """
    +        self.keycache.addkeyinfo("https://legit.example.com/", "key1",
    +                                 self.public_key, cache_timer=3600)
    +        self.assertEqual(self._count_rows(), 1)
    +
    +        malicious_key_id = "x' OR '1'='1"
    +        self.keycache.addkeyinfo("https://other.example.com/", malicious_key_id,
    +                                 self.public_key, cache_timer=3600)
    +
    +        self.assertEqual(self._count_rows(), 2)
    +
    +    def test_delete_cache_entry_with_injection_string(self):
    +        """
    +        _delete_cache_entry with a crafted issuer must not delete unrelated rows.
    +        """
    +        self.keycache.addkeyinfo("https://legit.example.com/", "key1",
    +                                 self.public_key, cache_timer=3600)
    +        self.assertEqual(self._count_rows(), 1)
    +
    +        # Try to delete with an injection string — should match nothing
    +        self.keycache._delete_cache_entry("x' OR '1'='1", "key1")
    +        self.assertEqual(self._count_rows(), 1)
    +
    +    def test_union_select_injection_is_literal(self):
    +        """
    +        A UNION SELECT payload in the issuer should be stored as a literal
    +        value, not interpreted as SQL.
    +        """
    +        malicious_issuer = "x' UNION SELECT * FROM keycache --"
    +        self.keycache.addkeyinfo(malicious_issuer, "key1",
    +                                 self.public_key, cache_timer=3600)
    +        self.assertEqual(self._count_rows(), 1)
    +
    +        # The stored issuer should be the literal malicious string
    +        conn = sqlite3.connect(self.keycache.cache_location)
    +        curs = conn.cursor()
    +        curs.execute("SELECT issuer FROM keycache")
    +        row = curs.fetchone()
    +        conn.close()
    +        self.assertEqual(row[0], malicious_issuer)
    +
    +    def test_getkeyinfo_injection_issuer_no_leak(self):
    +        """
    +        getkeyinfo with an injection payload in issuer must not return
    +        rows belonging to a different issuer.
    +        """
    +        self.keycache.addkeyinfo("https://legit.example.com/", "key1",
    +                                 self.public_key, cache_timer=3600)
    +
    +        # This injection string would match all rows with the old code
    +        malicious_issuer = "x' OR '1'='1"
    +        # getkeyinfo will not find a cached row and will try to download,
    +        # which will fail — that's expected.  The important thing is it
    +        # does NOT return the legit key.
    +        try:
    +            result = self.keycache.getkeyinfo(malicious_issuer, "key1")
    +        except Exception:
    +            result = None
    +        self.assertIsNone(result)
    +
    +    def test_negative_cache_with_injection_string(self):
    +        """
    +        _add_negative_cache_entry with injection strings stores them literally.
    +        """
    +        malicious_issuer = "x' OR '1'='1"
    +        malicious_key_id = "y' DROP TABLE keycache --"
    +        self.keycache._add_negative_cache_entry(malicious_issuer, malicious_key_id, 300)
    +        self.assertEqual(self._count_rows(), 1)
    +
    +        conn = sqlite3.connect(self.keycache.cache_location)
    +        curs = conn.cursor()
    +        curs.execute("SELECT issuer, key_id FROM keycache")
    +        row = curs.fetchone()
    +        conn.close()
    +        self.assertEqual(row[0], malicious_issuer)
    +        self.assertEqual(row[1], malicious_key_id)
    +
    +
     if __name__ == '__main__':
         unittest.main()
    

Vulnerability mechanics

Generated by null/stub on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.

References

5

News mentions

0

No linked articles in our index yet.