Moderate severity · NVD Advisory · Published Feb 25, 2026 · Updated Feb 26, 2026

zae-limiter: DynamoDB hot partition throttling enables per-entity Denial of Service

CVE-2026-27695

Description

zae-limiter is a rate limiting library using the token bucket algorithm. Prior to version 0.10.1, all rate limit buckets for a single entity share the same DynamoDB partition key (namespace/ENTITY#{id}). A high-traffic entity can exceed DynamoDB's per-partition throughput limits (~1,000 WCU/sec), causing throttling that degrades service for that entity — and potentially co-located entities in the same partition. Version 0.10.1 fixes the issue.

AI Insight

LLM-synthesized narrative grounded in this CVE's description and references.

Before version 0.10.1, the token bucket rate limiting library zae-limiter used one shared DynamoDB partition key per entity, so a high-traffic entity could trigger throttling and denial of service for itself and for co-located entities.

Vulnerability

CVE-2026-27695 affects the zae-limiter token bucket rate limiting library backed by DynamoDB. Prior to version 0.10.1, all rate limit buckets for a single entity shared the same DynamoDB partition key (namespace/ENTITY#{id}). This design meant that a single entity's write operations were confined to one partition, which has a maximum throughput of approximately 1000 Write Capacity Units (WCU) per second [1][2].
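
The effect of the key layout can be sketched in a few lines. This is a minimal illustration, not the library's code: `legacy_pk` follows the `namespace/ENTITY#{id}` format quoted in the advisory, while `sharded_pk` and its string format are assumptions made up for this example (the patch's tests only show that a `pk_bucket` builder takes a namespace, entity, resource, and shard index).

```python
# Hypothetical key builders for illustration only. The sharded string format
# below is an assumption, not the library's confirmed schema.

def legacy_pk(namespace: str, entity_id: str) -> str:
    """Pre-0.10.1 layout described in the advisory: one key per entity."""
    return f"{namespace}/ENTITY#{entity_id}"


def sharded_pk(namespace: str, entity_id: str, resource: str, shard_id: int) -> str:
    """Assumed post-fix layout: resource and shard index are part of the key."""
    return f"{namespace}/BUCKET#{entity_id}#{resource}#{shard_id}"


resources = ["gpt-4", "gpt-3.5-turbo", "embeddings"]

# Every bucket of the entity maps to the same partition key.
legacy_keys = {legacy_pk("ns1", "user-1") for _ in resources}

# With two shards per resource, writes can land on several distinct keys.
sharded_keys = {
    sharded_pk("ns1", "user-1", resource, shard)
    for resource in resources
    for shard in range(2)
}

print(len(legacy_keys), len(sharded_keys))
```

With a single key, all of the entity's writes count against one partition's throughput budget; distinct keys give DynamoDB the option of placing the items on different partitions.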

Exploitation

An attacker controlling a high-traffic entity (e.g., an API key with high request volume) can exceed the per-partition WCU limit. Because all buckets for that entity use the same partition key, the resulting DynamoDB throttling degrades rate limiting performance for that entity. Moreover, the throttling can affect other entities that happen to be co-located in the same partition, leading to a broader denial-of-service condition [2]. The vulnerability does not require authentication beyond normal API access, as it stems from the infrastructure design rather than a code injection or access control issue.
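
A rough back-of-the-envelope calculation shows how little traffic is needed. The sketch below assumes standard (non-transactional) writes, which consume one WCU per 1 KB of item size rounded up, and assumes one DynamoDB write per rate-limited request; both the function names and the one-write-per-request assumption are illustrative rather than taken from the library.

```python
import math

PARTITION_WCU_PER_SEC = 1000  # approximate per-partition limit cited in the advisory


def wcu_per_write(item_size_bytes: int) -> int:
    """Standard writes consume 1 WCU per 1 KB of item size, rounded up."""
    return max(1, math.ceil(item_size_bytes / 1024))


def writes_per_sec_to_saturate(item_size_bytes: int) -> int:
    """Smallest sustained write rate that reaches the per-partition limit."""
    return math.ceil(PARTITION_WCU_PER_SEC / wcu_per_write(item_size_bytes))


small_item = writes_per_sec_to_saturate(512)   # item fits in 1 KB
large_item = writes_per_sec_to_saturate(3000)  # item needs 3 WCU per write
print(small_item, large_item)
```

Larger bucket items lower the request rate at which throttling starts, because each write consumes more of the same fixed budget.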

Impact

Successful exploitation degrades the rate limiting service, potentially causing legitimate requests to be incorrectly throttled or delayed. This can impact the availability of downstream services that rely on accurate rate limiting, especially in multi-tenant environments where entities share the same partition [1][2].

Mitigation

The issue is fixed in version 0.10.1, which introduces per-shard partition keys to distribute write load across multiple DynamoDB partitions [1][3]. Users are strongly advised to upgrade to version 0.10.1 or later. No workarounds are documented for versions prior to the fix, and the patch is included in the release [3].
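
The patch plan attached to this advisory describes doubling an entity's shard count when write capacity is exhausted. As a hedged sketch of what that implies for capacity, the helper below (a hypothetical name, not part of the library) doubles from one shard until the aggregate budget covers a target write rate, assuming about 1000 WCU per shard, one WCU per write, and an even spread of writes across shards.

```python
PER_SHARD_WCU = 1000  # approximate per-partition limit from the advisory


def shard_count_for(target_writes_per_sec: int) -> int:
    """Double the shard count from 1 until aggregate capacity covers the target."""
    shard_count = 1
    while shard_count * PER_SHARD_WCU < target_writes_per_sec:
        shard_count *= 2
    return shard_count


for target in (1000, 1001, 5000, 32000):
    print(target, shard_count_for(target))
```

Because the count only ever doubles, it overshoots for targets that sit just above a power-of-two boundary; real traffic is also rarely spread evenly, so the figures are an upper bound on what each shard count can absorb.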

AI Insight generated on May 19, 2026. Synthesized from this CVE's description and the cited reference URLs; citations are validated against the source bundle.

Affected packages

Versions sourced from the GitHub Security Advisory.

Package              Affected versions   Patched versions
zae-limiter (PyPI)   < 0.10.1            0.10.1
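
One way to check an environment against this range is to compare the installed version with 0.10.1. The sketch below uses only the standard library; the helper names are made up for this example, and the parser deliberately ignores pre-release suffixes, so a build such as `0.10.1rc1` would be reported as patched even though it precedes the final release.

```python
from __future__ import annotations

import re
from importlib.metadata import PackageNotFoundError, version

PATCHED = (0, 10, 1)  # first fixed release according to the advisory


def release_tuple(version_string: str) -> tuple[int, ...]:
    """Parse the leading dotted numeric release, ignoring any suffix."""
    match = re.match(r"\d+(?:\.\d+)*", version_string)
    if match is None:
        raise ValueError(f"unrecognized version: {version_string!r}")
    return tuple(int(part) for part in match.group().split("."))


def is_patched(version_string: str) -> bool:
    """True when the release is 0.10.1 or later."""
    return release_tuple(version_string) >= PATCHED


def installed_is_patched(package: str = "zae-limiter") -> bool | None:
    """Check the installed distribution; None means it is not installed."""
    try:
        return is_patched(version(package))
    except PackageNotFoundError:
        return None


print(is_patched("0.10.0"), is_patched("0.10.1"))
```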

Affected products

1
  • zeroae/zae-limiter v5
    Range: < 0.10.1

Patches

1
481ce44d818d

🐛 fix(security): pre-shard bucket design review fixes (GHSA-76rv) (#415)

https://github.com/zeroae/zae-limiter · Patrick Sodré · Feb 23, 2026 · via ghsa
17 files changed · +1993 -111
  • docs/plans/2026-02-21-pre-shard-buckets-implementation.md +1231 -0 modified
    @@ -1434,3 +1434,1234 @@ Phase 8: Sync                                                   │
     Phase 9: Integration                                            │
       Task 19 (CFN) ── Task 20 (version) ── Task 21 (integration) ── Task 22 (docs)
     ```
    +
    +---
    +
    +## Phase 10: Design Review Fixes
    +
    +Review: https://github.com/zeroae/zae-limiter-ghsa-76rv-2r9v-c5m6/pull/1#pullrequestreview-3838469366
    +
    +### Task 32: Add SpeculativeFailureReason enum
    +
    +**Problem:** On speculative failure, the limiter inspects `old_buckets` token values via `_is_wcu_exhausted()` to distinguish between app-limit exhaustion (retry on another shard) vs wcu exhaustion (double shards). This is implicit and fragile — the decision logic is spread across the limiter with no structured failure classification.
    +
    +**Fix:** Add a `SpeculativeFailureReason` enum to `repository_protocol.py` and populate it in `_speculative_consume_single()` when constructing the failure `SpeculativeResult`. Replace `_is_wcu_exhausted()` in the limiter with a match on `result.failure_reason`.
    +
    +**Files:**
    +- Modify: `src/zae_limiter/repository_protocol.py` (add enum, add field to `SpeculativeResult`)
    +- Modify: `src/zae_limiter/repository.py:2402-2415` (set `failure_reason` on failure results)
    +- Modify: `src/zae_limiter/limiter.py:732-752` (replace `_is_wcu_exhausted` with `result.failure_reason`)
    +- Modify: `src/zae_limiter/limiter.py:1007-1026` (remove `_is_wcu_exhausted` method)
    +- Generate: sync code (`hatch run generate-sync`)
    +- Test: `tests/unit/test_repository.py`, `tests/unit/test_limiter.py`
    +
    +**Step 1: Write failing tests**
    +
    +In `tests/unit/test_repository.py`:
    +
    +```python
    +from zae_limiter.repository_protocol import SpeculativeFailureReason
    +
    +@pytest.mark.asyncio
    +async def test_speculative_failure_reason_wcu_exhausted():
    +    """Failure reason is WCU_EXHAUSTED when wcu tokens < 1000 milli."""
    +    # Setup: bucket with wcu at 0, rpm has tokens
    +    result = await repo.speculative_consume(entity_id="user-1", resource="api", consume={"rpm": 1})
    +    assert not result.success
    +    assert result.failure_reason == SpeculativeFailureReason.WCU_EXHAUSTED
    +
    +@pytest.mark.asyncio
    +async def test_speculative_failure_reason_app_limit_exhausted():
    +    """Failure reason is APP_LIMIT_EXHAUSTED when user limit exhausted but wcu has tokens."""
    +    # Setup: bucket with rpm at 0, wcu full
    +    result = await repo.speculative_consume(entity_id="user-1", resource="api", consume={"rpm": 100})
    +    assert not result.success
    +    assert result.failure_reason == SpeculativeFailureReason.APP_LIMIT_EXHAUSTED
    +
    +@pytest.mark.asyncio
    +async def test_speculative_failure_reason_bucket_missing():
    +    """Failure reason is BUCKET_MISSING when no bucket exists."""
    +    result = await repo.speculative_consume(entity_id="no-bucket", resource="api", consume={"rpm": 1})
    +    assert not result.success
    +    assert result.failure_reason == SpeculativeFailureReason.BUCKET_MISSING
    +```
    +
    +In `tests/unit/test_limiter.py`:
    +
    +```python
    +def test_wcu_exhausted_triggers_doubling_via_failure_reason():
    +    """Limiter uses failure_reason instead of inspecting old_buckets."""
    +    # Setup: mock speculative result with failure_reason=WCU_EXHAUSTED
    +    # Verify bump_shard_count is called
    +```
    +
    +**Step 2: Run tests to verify they fail**
    +
    +Run: `uv run pytest tests/unit/test_repository.py -k "test_speculative_failure_reason" -v`
    +Expected: FAIL with `ImportError: cannot import name 'SpeculativeFailureReason'`
    +
    +**Step 3: Add SpeculativeFailureReason enum**
    +
    +In `repository_protocol.py` before `SpeculativeResult`:
    +
    +```python
    +from enum import Enum
    +
    +class SpeculativeFailureReason(Enum):
    +    """Classifies why a speculative write failed (GHSA-76rv).
    +
    +    Used by the limiter to decide the recovery path without
    +    inspecting individual BucketState token values.
    +    """
    +    APP_LIMIT_EXHAUSTED = "app_limit_exhausted"
    +    WCU_EXHAUSTED = "wcu_exhausted"
    +    BOTH_EXHAUSTED = "both_exhausted"
    +    BUCKET_MISSING = "bucket_missing"
    +```
    +
    +Add field to `SpeculativeResult`:
    +
    +```python
    +    failure_reason: SpeculativeFailureReason | None = None
    +```
    +
    +**Step 4: Populate failure_reason in `_speculative_consume_single`**
    +
    +In `repository.py:2402-2415`, classify the failure:
    +
    +```python
    +if old_item:
    +    old_buckets = self._deserialize_composite_bucket(old_item)
    +    old_shard_count = int(old_item.get("shard_count", {}).get("N", "1"))
    +
    +    # Classify failure reason
    +    wcu_exhausted = any(
    +        b.limit_name == WCU_LIMIT_NAME and b.tokens_milli < 1000
    +        for b in old_buckets
    +    )
    +    app_exhausted = any(
    +        b.limit_name != WCU_LIMIT_NAME
    +        and b.tokens_milli < consume.get(b.limit_name, 0) * 1000
    +        for b in old_buckets
    +    )
    +    if wcu_exhausted and app_exhausted:
    +        reason = SpeculativeFailureReason.BOTH_EXHAUSTED
    +    elif wcu_exhausted:
    +        reason = SpeculativeFailureReason.WCU_EXHAUSTED
    +    else:
    +        reason = SpeculativeFailureReason.APP_LIMIT_EXHAUSTED
    +
    +    return SpeculativeResult(
    +        success=False,
    +        old_buckets=old_buckets,
    +        shard_id=shard_id,
    +        shard_count=old_shard_count,
    +        failure_reason=reason,
    +    )
    +else:
    +    return SpeculativeResult(
    +        success=False,
    +        shard_id=shard_id,
    +        failure_reason=SpeculativeFailureReason.BUCKET_MISSING,
    +    )
    +```
    +
    +**Step 5: Replace `_is_wcu_exhausted` in limiter**
    +
    +In `limiter.py:732-752`, replace:
    +
    +```python
    +# Before:
    +if self._is_wcu_exhausted(result.old_buckets):
    +    ...
    +if result.shard_count > 1:
    +    ...
    +
    +# After:
    +if result.failure_reason in (
    +    SpeculativeFailureReason.WCU_EXHAUSTED,
    +    SpeculativeFailureReason.BOTH_EXHAUSTED,
    +):
    +    await self._repository.bump_shard_count(entity_id, resource, result.shard_count)
    +    return None
    +
    +if result.shard_count > 1 and result.failure_reason == SpeculativeFailureReason.APP_LIMIT_EXHAUSTED:
    +    retry_result = await self._retry_on_other_shard(...)
    +    ...
    +```
    +
    +Remove `_is_wcu_exhausted()` static method (lines 1007-1026).
    +
    +**Step 6: Generate sync code**
    +
    +Run: `hatch run generate-sync`
    +
    +**Step 7: Run all unit tests**
    +
    +Run: `uv run pytest tests/unit/ -v`
    +Expected: PASS
    +
    +**Step 8: Commit**
    +
    +```bash
    +git add src/zae_limiter/repository_protocol.py src/zae_limiter/repository.py \
    +    src/zae_limiter/limiter.py src/zae_limiter/sync_*.py \
    +    tests/unit/test_repository.py tests/unit/test_limiter.py \
    +    tests/unit/test_sync_*.py
    +git commit -m "♻️ refactor(limiter): add SpeculativeFailureReason enum for deterministic shard decisions"
    +```
    +
    +---
    +
    +### Task 33: Add high shard count warning
    +
    +**Problem:** Shard count doubles without any observability signal. At 32 shards, `propagate_shard_count` issues 31 sequential DynamoDB writes per Lambda invocation. Operators have no visibility into runaway shard growth.
    +
    +**Fix:** Add `WCU_SHARD_WARN_THRESHOLD = 32` constant in `schema.py`. Emit a warning log when doubling crosses the threshold at both doubling sites: `Repository.bump_shard_count()` (client) and `try_proactive_shard()` (aggregator). No hard cap — doubling continues.
    +
    +**Files:**
    +- Modify: `src/zae_limiter/schema.py` (add `WCU_SHARD_WARN_THRESHOLD` constant)
    +- Modify: `src/zae_limiter/repository.py:2435` (warning after doubling in `bump_shard_count`)
    +- Modify: `src/zae_limiter_aggregator/processor.py:665` (warning after doubling in `try_proactive_shard`)
    +- Generate: sync code (`hatch run generate-sync`)
    +- Test: `tests/unit/test_schema.py`, `tests/unit/test_repository.py`, `tests/unit/test_processor.py`
    +
    +**Step 1: Write failing tests**
    +
    +In `tests/unit/test_schema.py`:
    +
    +```python
    +def test_wcu_shard_warn_threshold_constant():
    +    assert schema.WCU_SHARD_WARN_THRESHOLD == 32
    +```
    +
    +In `tests/unit/test_repository.py`:
    +
    +```python
    +@pytest.mark.asyncio
    +async def test_bump_shard_count_warns_above_threshold(caplog):
    +    """bump_shard_count logs warning when new count exceeds threshold."""
    +    repo = make_repo(mock_dynamodb, unique_name)
    +    # Create entity + bucket with shard_count=32 (at threshold)
    +    # ...
    +    result = await repo.bump_shard_count("user-1", "api", 32)
    +    assert result == 64  # doubling still happens
    +    assert "shard count exceeded" in caplog.text.lower() or any(
    +        "shard" in r.message.lower() and "64" in r.message for r in caplog.records
    +    )
    +```
    +
    +In `tests/unit/test_processor.py`:
    +
    +```python
    +def test_try_proactive_shard_warns_above_threshold(caplog):
    +    """Proactive sharding logs warning when new count exceeds threshold."""
    +    state = BucketRefillState(
    +        ..., shard_id=0, shard_count=32, ...
    +    )
    +    result = try_proactive_shard(mock_table, state, 900_000, 1_000_000)
    +    assert result is True  # doubling still happens
    +    # Verify warning was logged
    +```
    +
    +**Step 2: Run tests to verify they fail**
    +
    +Run: `uv run pytest tests/unit/test_schema.py -k "test_wcu_shard_warn" -v`
    +Expected: FAIL with `AttributeError`
    +
    +**Step 3: Add constant to schema.py**
    +
    +After `WCU_LIMIT_REFILL_PERIOD_SECONDS`:
    +
    +```python
    +WCU_SHARD_WARN_THRESHOLD = 32  # Log warning when shard count exceeds this (GHSA-76rv)
    +```
    +
    +**Step 4: Add warning in bump_shard_count (repository.py)**
    +
    +After the successful `update_item` (line ~2451), before updating the cache:
    +
    +```python
    +if new_count > schema.WCU_SHARD_WARN_THRESHOLD:
    +    logger.warning(
    +        "High shard count after doubling",
    +        entity_id=entity_id,
    +        resource=resource,
    +        shard_count=new_count,
    +        threshold=schema.WCU_SHARD_WARN_THRESHOLD,
    +    )
    +```
    +
    +**Step 5: Add warning in try_proactive_shard (processor.py)**
    +
    +After the successful `table.update_item` in the existing `logger.info` block:
    +
    +```python
    +if new_count > WCU_SHARD_WARN_THRESHOLD:
    +    logger.warning(
    +        "High shard count after proactive doubling",
    +        entity_id=state.entity_id,
    +        resource=state.resource,
    +        shard_count=new_count,
    +        threshold=WCU_SHARD_WARN_THRESHOLD,
    +    )
    +```
    +
    +Import `WCU_SHARD_WARN_THRESHOLD` from `zae_limiter.schema`.
    +
    +**Step 6: Generate sync code**
    +
    +Run: `hatch run generate-sync`
    +
    +**Step 7: Run all unit tests**
    +
    +Run: `uv run pytest tests/unit/ -v`
    +Expected: PASS
    +
    +**Step 8: Commit**
    +
    +```bash
    +git add src/zae_limiter/schema.py src/zae_limiter/repository.py \
    +    src/zae_limiter_aggregator/processor.py src/zae_limiter/sync_*.py \
    +    tests/unit/test_schema.py tests/unit/test_repository.py \
    +    tests/unit/test_processor.py tests/unit/test_sync_*.py
    +git commit -m "⚠️ fix(schema): warn when shard count exceeds WCU_SHARD_WARN_THRESHOLD=32"
    +```
    +
    +---
    +
    +### Task 34: Add missing test coverage for review findings
    +
    +**Problem:** Two test gaps identified in the design review (the third — propagation GSI attributes — is addressed by Task #29 which rewrites propagation with full attributes).
    +
    +1. **PK round-trip with `/` in resources** — `parse_bucket_pk` is tested with `gpt-4` but not with resources containing `/`, `.`, `-`, `_`. Since resources like `openai/gpt-4` and `anthropic/claude-3/opus` are valid (see CLAUDE.md naming rules), the parser must handle `#`-delimited splitting correctly when the resource contains `/`.
    +2. **Batch size boundary condition** — The test for `try_proactive_shard` passes `wcu_tc_delta=900_000` directly, but at `BatchSize=100`, the maximum achievable delta is `100 * 1000 = 100_000 milli` (100 writes × 1 WCU each × 1000 milli). A test should document that the threshold is unreachable at the current batch size.
    +
    +**Files:**
    +- Modify: `tests/unit/test_schema.py` (parametric PK round-trip tests)
    +- Modify: `tests/unit/test_processor.py` (batch size boundary test)
    +
    +**Step 1: Add parametric PK round-trip tests**
    +
    +In `tests/unit/test_schema.py`, add to `TestBucketPKBuilders`:
    +
    +```python
    +@pytest.mark.parametrize(
    +    "resource",
    +    [
    +        "gpt-4",                   # hyphen
    +        "gpt_4",                   # underscore
    +        "gpt-3.5-turbo",           # dot
    +        "openai/gpt-4",            # slash (provider/model)
    +        "anthropic/claude-3/opus", # nested slash
    +    ],
    +)
    +def test_parse_bucket_pk_round_trip(self, resource):
    +    """pk_bucket → parse_bucket_pk round-trips for all valid resource chars."""
    +    pk = schema.pk_bucket("ns1", "user-1", resource, 0)
    +    ns, entity, res, shard = schema.parse_bucket_pk(pk)
    +    assert ns == "ns1"
    +    assert entity == "user-1"
    +    assert res == resource
    +    assert shard == 0
    +```
    +
    +**Step 2: Add batch size boundary test**
    +
    +In `tests/unit/test_processor.py`, add to `TestTryProactiveShard`:
    +
    +```python
    +def test_unreachable_at_batch_size_100(self) -> None:
    +    """At BatchSize=100, max wcu tc_delta is 100_000 milli (10% of capacity).
    +
    +    Documents that proactive sharding requires BatchSize >= 800 to trigger
    +    at the 80% threshold (WCU_PROACTIVE_THRESHOLD). With the default
    +    BatchSize=100, the maximum achievable ratio is 100/1000 = 10%.
    +    See Task #28 for the BatchSize fix.
    +    """
    +    mock_table = MagicMock()
    +    state = self._make_state()
    +    # Max possible: 100 records × 1 WCU × 1000 milli = 100_000
    +    max_tc_delta_at_batch_100 = 100 * 1000
    +    wcu_capacity_milli = 1_000_000
    +
    +    result = try_proactive_shard(
    +        mock_table, state, max_tc_delta_at_batch_100, wcu_capacity_milli
    +    )
    +
    +    assert result is False  # 10% < 80% threshold
    +    mock_table.update_item.assert_not_called()
    +```
    +
    +**Step 3: Run tests to verify they pass**
    +
    +Run: `uv run pytest tests/unit/test_schema.py::TestBucketPKBuilders -v`
    +Run: `uv run pytest tests/unit/test_processor.py::TestTryProactiveShard -v`
    +Expected: PASS (these test existing behavior, not new code)
    +
    +**Step 4: Commit**
    +
    +```bash
    +git add tests/unit/test_schema.py tests/unit/test_processor.py
    +git commit -m "✅ test: add PK round-trip and batch size boundary tests (review #34)"
    +```
    +
    +---
    +
    +### Task 31: Add gsi4_sk_bucket schema builder
    +
    +**Problem:** The GSI4SK format `BUCKET#{entity_id}#{resource}#{shard_id}` is inlined in `repository.py:1892` and `sync_repository.py:1552`. All other GSI SK formats have dedicated builders (`gsi2_sk_bucket`, `gsi3_sk_bucket`).
    +
    +**Fix:** Add `gsi4_sk_bucket(entity_id, resource, shard_id)` to `schema.py`, use it in `repository.py`, and regenerate sync code.
    +
    +**Files:**
    +- Modify: `src/zae_limiter/schema.py` (add builder)
    +- Modify: `src/zae_limiter/repository.py:1891-1893` (use builder)
    +- Generate: sync code (`hatch run generate-sync`)
    +- Test: `tests/unit/test_schema.py`
    +
    +**Step 1: Write failing test**
    +
    +In `tests/unit/test_schema.py`, add to `TestBucketPKBuilders`:
    +
    +```python
    +def test_gsi4_sk_bucket(self):
    +    assert schema.gsi4_sk_bucket("user-1", "gpt-4", 0) == "BUCKET#user-1#gpt-4#0"
    +    assert schema.gsi4_sk_bucket("user-1", "gpt-4", 3) == "BUCKET#user-1#gpt-4#3"
    +```
    +
    +**Step 2: Run test to verify it fails**
    +
    +Run: `uv run pytest tests/unit/test_schema.py -k "test_gsi4_sk_bucket" -v`
    +Expected: FAIL with `AttributeError: module 'zae_limiter.schema' has no attribute 'gsi4_sk_bucket'`
    +
    +**Step 3: Add builder to schema.py**
    +
    +After `gsi3_sk_bucket()` (~line 415):
    +
    +```python
    +def gsi4_sk_bucket(entity_id: str, resource: str, shard_id: int) -> str:
    +    """Build GSI4 sort key for bucket item (namespace-scoped discovery).
    +
    +    Args:
    +        entity_id: Entity owning the bucket
    +        resource: Resource name
    +        shard_id: Shard index (0-based)
    +
    +    Returns:
    +        GSI4SK string in format ``BUCKET#{entity_id}#{resource}#{shard_id}``
    +    """
    +    return f"{BUCKET_PREFIX}{entity_id}#{resource}#{shard_id}"
    +```
    +
    +**Step 4: Replace inline in repository.py**
    +
    +At line 1891-1893, change:
    +
    +```python
    +# Before:
    +"GSI4SK": {
    +    "S": f"{schema.BUCKET_PREFIX}{entity_id}#{resource}#{shard_id}",
    +},
    +
    +# After:
    +"GSI4SK": {"S": schema.gsi4_sk_bucket(entity_id, resource, shard_id)},
    +```
    +
    +**Step 5: Generate sync code**
    +
    +Run: `hatch run generate-sync`
    +
    +**Step 6: Run all unit tests**
    +
    +Run: `uv run pytest tests/unit/ -v`
    +Expected: PASS
    +
    +**Step 7: Commit**
    +
    +```bash
    +git add src/zae_limiter/schema.py src/zae_limiter/repository.py \
    +    src/zae_limiter/sync_repository.py tests/unit/test_schema.py
    +git commit -m "♻️ refactor(schema): add gsi4_sk_bucket builder and use in repository"
    +```
    +
    +---
    +
    +### Task 30: Fix inflated capacity in get_resource_capacity
    +
    +**Problem:**
    +
    +`get_resource_capacity()` (`limiter.py:1844`) iterates all buckets from
    +`get_resource_buckets()` and sums `capacity` and `available`. With pre-shard buckets,
    +GSI2 returns one item per `(entity, shard)`. Each shard stores the **full, undivided**
    +capacity, so a 2-shard entity with `capacity=1000` produces `total_capacity=2000` and
    +two duplicate `EntityCapacity` entries.
    +
    +**Fix:** Deduplicate in `get_resource_capacity()` — group buckets by `entity_id`, use
    +capacity from shard 0, sum available tokens across shards.
    +
    +Note: `get_resource_buckets()` still returns per-shard items to other callers. This is
    +acceptable — callers consuming raw buckets can decide how to aggregate.
    +
    +**Files:**
    +- Modify: `src/zae_limiter/limiter.py` (`get_resource_capacity` — deduplicate by entity)
    +- Test: `tests/unit/test_limiter.py` (new test with sharded entity)
    +
    +**Step 1: Write failing test**
    +
    +Add to `TestGetResourceCapacity` in `tests/unit/test_limiter.py`:
    +
    +```python
    +@pytest.mark.asyncio
    +async def test_get_resource_capacity_sharded_entity_deduplication(self, limiter):
    +    """Sharded entities should report capacity once, not per-shard."""
    +    await limiter.create_entity("sharded-user")
    +    limits = [Limit.per_minute("rpm", 100)]
    +
    +    async with limiter.acquire("sharded-user", "gpt-4", {"rpm": 10}, limits=limits):
    +        pass
    +
    +    # Manually create shard 1 to simulate sharding
    +    repo = limiter._repository
    +    client = await repo._get_client()
    +    shard0_key = {
    +        "PK": {"S": schema.pk_bucket(repo._namespace_id, "sharded-user", "gpt-4", 0)},
    +        "SK": {"S": schema.sk_state()},
    +    }
    +    shard0_resp = await client.get_item(TableName=repo.table_name, Key=shard0_key)
    +    shard0_item = shard0_resp["Item"]
    +
    +    shard1_item = dict(shard0_item)
    +    shard1_item["PK"] = {"S": schema.pk_bucket(repo._namespace_id, "sharded-user", "gpt-4", 1)}
    +    shard1_item["GSI2SK"] = {"S": schema.gsi2_sk_bucket("sharded-user", 1)}
    +    shard1_item["GSI3SK"] = {"S": schema.gsi3_sk_bucket("gpt-4", 1)}
    +    shard1_item["shard_count"] = {"N": "2"}
    +    await client.update_item(
    +        TableName=repo.table_name, Key=shard0_key,
    +        UpdateExpression="SET shard_count = :sc",
    +        ExpressionAttributeValues={":sc": {"N": "2"}},
    +    )
    +    await client.put_item(TableName=repo.table_name, Item=shard1_item)
    +
    +    capacity = await limiter.get_resource_capacity("gpt-4", "rpm")
    +
    +    assert capacity.total_capacity == 100  # Not 200
    +    assert len(capacity.entities) == 1  # Not 2
    +    assert capacity.entities[0].entity_id == "sharded-user"
    +    assert capacity.entities[0].capacity == 100
    +```
    +
    +**Step 2: Run test — expect FAIL**
    +
    +Run: `uv run pytest tests/unit/test_limiter.py::TestGetResourceCapacity::test_get_resource_capacity_sharded_entity_deduplication -v`
    +Expected: FAIL (`total_capacity == 200`, `len(entities) == 2`)
    +
    +**Step 3: Deduplicate by entity in `get_resource_capacity()`**
    +
    +In `limiter.py`, replace the per-bucket loop (lines 1879-1895) with entity-grouped
    +aggregation:
    +
    +```python
    +from collections import defaultdict
    +
    +entity_buckets: dict[str, list[BucketState]] = defaultdict(list)
    +for bucket in buckets:
    +    entity_buckets[bucket.entity_id].append(bucket)
    +
    +entities: list[EntityCapacity] = []
    +total_capacity = 0
    +total_available = 0
    +
    +for entity_id, entity_bucket_list in entity_buckets.items():
    +    # Capacity is the same on all shards (undivided); take from first
    +    capacity = entity_bucket_list[0].capacity
    +    # Available tokens are distributed across shards; sum them
    +    available = sum(calculate_available(b, now_ms) for b in entity_bucket_list)
    +    available = min(available, capacity)
    +
    +    total_capacity += capacity
    +    total_available += available
    +
    +    entities.append(
    +        EntityCapacity(
    +            entity_id=entity_id,
    +            capacity=capacity,
    +            available=available,
    +            utilization_pct=(
    +                ((capacity - available) / capacity * 100) if capacity > 0 else 0
    +            ),
    +        )
    +    )
    +```
    +
    +**Step 4: Generate sync code**
    +
    +Run: `hatch run generate-sync`
    +
    +**Step 5: Run all unit tests**
    +
    +Run: `uv run pytest tests/unit/ -v`
    +Expected: PASS
    +
    +**Step 6: Commit**
    +
    +```bash
    +git add src/zae_limiter/limiter.py src/zae_limiter/sync_limiter.py \
    +    tests/unit/test_limiter.py tests/unit/test_sync_limiter.py
    +git commit -m "🐛 fix(limiter): deduplicate sharded entities in get_resource_capacity"
    +```
    +
    +---
    +
    +### Task 35: Handle DynamoDB per-partition throttling with shard probe
    +
    +**Problem:** The design plan's Failure Handling table (row 4) specifies catching `ProvisionedThroughputExceededException` for DynamoDB throttle retry with shard probe. This is only correct for provisioned tables. On-demand tables (`PAY_PER_REQUEST`) return `ThrottlingException` with `ThrottlingReason: TableWriteKeyRangeThroughputExceeded` for per-partition throttling. The code must handle both capacity modes as a defensive fallback behind the `wcu` infrastructure limit.
    +
    +**References:**
    +- [DynamoDB Error Handling](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Programming.Errors.html): On-demand → `ThrottlingException`, provisioned → `ProvisionedThroughputExceededException`
    +- [Key Range Throughput Exceeded](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/throttling-key-range-limit-exceeded-mitigation.html): Per-partition throttling uses `TableWriteKeyRangeThroughputExceeded` reason
    +
    +**Fix:** In `speculative_consume()` or the limiter's `_try_speculative_acquire()`, catch both:
    +1. `ProvisionedThroughputExceededException` (provisioned tables)
    +2. `ThrottlingException` where `ThrottlingReason` contains `KeyRangeThroughputExceeded` (on-demand tables)
    +
    +On either, probe shard 1 with a GetItem. If it exists, read `shard_count` and update entity cache, then fall through to slow path on a different shard. If shard 1 doesn't exist, raise `RateLimiterUnavailable`.
    +
    +Ignore `ThrottlingException` with other reasons (table-level caps, control plane throttling) — retry on a different shard won't help for those.
    +
    +**Files:**
    +- Modify: `src/zae_limiter/repository.py` (catch both exceptions in `_speculative_consume_single`)
    +- Modify: `src/zae_limiter/limiter.py` (shard probe logic on throttle)
    +- Modify: `docs/plans/2026-02-21-pre-shard-buckets-design.md` (update Failure Handling row 4)
    +- Generate: sync code (`hatch run generate-sync`)
    +- Test: `tests/unit/test_repository.py`, `tests/unit/test_limiter.py`
    +
    +**Step 1: Write failing tests**
    +
    +In `tests/unit/test_repository.py`:
    +
    +```python
    +@pytest.mark.asyncio
    +async def test_speculative_consume_handles_provisioned_throttle():
    +    """ProvisionedThroughputExceededException returns throttled SpeculativeResult."""
    +    # Mock client to raise ProvisionedThroughputExceededException
    +    result = await repo.speculative_consume("user-1", "api", {"rpm": 1})
    +    assert not result.success
    +    assert result.failure_reason == SpeculativeFailureReason.PARTITION_THROTTLED
    +
    +@pytest.mark.asyncio
    +async def test_speculative_consume_handles_on_demand_throttle():
    +    """ThrottlingException with KeyRangeThroughputExceeded returns throttled result."""
    +    # Mock client to raise ThrottlingException with ThrottlingReason
    +    result = await repo.speculative_consume("user-1", "api", {"rpm": 1})
    +    assert not result.success
    +    assert result.failure_reason == SpeculativeFailureReason.PARTITION_THROTTLED
    +
    +@pytest.mark.asyncio
    +async def test_speculative_consume_reraises_non_partition_throttle():
    +    """ThrottlingException without KeyRange reason is re-raised."""
    +    # Mock client to raise ThrottlingException with MaxOnDemandThroughputExceeded
    +    with pytest.raises(ClientError):
    +        await repo.speculative_consume("user-1", "api", {"rpm": 1})
    +```
    +
    +**Step 2: Run tests to verify they fail**
    +
    +Run: `uv run pytest tests/unit/test_repository.py -k "test_speculative_consume_handles" -v`
    +Expected: FAIL
    +
    +**Step 3: Add throttle handling in `_speculative_consume_single`**
    +
    +```python
    +def _is_partition_throttle(e: ClientError) -> bool:
    +    """Check if a ClientError is a per-partition throttle (hot key)."""
    +    code = e.response.get("Error", {}).get("Code", "")
    +    if code == "ProvisionedThroughputExceededException":
    +        return True
    +    if code == "ThrottlingException":
    +        reason = e.response.get("Error", {}).get("ThrottlingReason", "")
    +        return "KeyRangeThroughputExceeded" in reason
    +    return False
    +```
    +
    +In `_speculative_consume_single`, extend the `except ClientError` block:
    +
    +```python
    +except ClientError as e:
    +    if e.response.get("Error", {}).get("Code") == "ConditionalCheckFailedException":
    +        ...  # existing ConditionalCheckFailedException handling (unchanged)
    +    elif _is_partition_throttle(e):
    +        return SpeculativeResult(
    +            success=False,
    +            shard_id=shard_id,
    +            failure_reason=SpeculativeFailureReason.PARTITION_THROTTLED,
    +        )
    +    raise
    +```
    +
    +**Step 4: Add shard probe in limiter**
    +
    +In `_try_speculative_acquire()`, after the existing wcu/shard-retry logic:
    +
    +```python
    +if result.failure_reason == SpeculativeFailureReason.PARTITION_THROTTLED:
    +    # Probe shard 1 to discover shard_count
    +    probe = await self._repository.get_buckets(entity_id, resource, shard_id=1)
    +    if probe:
    +        shard_count = probe[0].shard_count  # or from item attributes
    +        # Update entity cache, fall through to slow path on different shard
    +        return None
    +    raise RateLimiterUnavailable("DynamoDB partition throttled, no shards available")
    +```
    +
    +**Step 5: Update design plan Failure Handling table**
    +
    +Replace row 4 in `docs/plans/2026-02-21-pre-shard-buckets-design.md`:
    +
    +```markdown
    +| `ProvisionedThroughputExceededException` or `ThrottlingException` (KeyRange) | DynamoDB per-partition throttle | Probe shard 1; if exists, discover `shard_count` and retry; if not, `RateLimiterUnavailable` |
    +```
    +
    +**Step 6: Generate sync code**
    +
    +Run: `hatch run generate-sync`
    +
    +**Step 7: Run all unit tests**
    +
    +Run: `uv run pytest tests/unit/ -v`
    +Expected: PASS
    +
    +**Step 8: Commit**
    +
    +```bash
    +git add src/zae_limiter/repository.py src/zae_limiter/limiter.py \
    +    src/zae_limiter/sync_*.py docs/plans/2026-02-21-pre-shard-buckets-design.md \
    +    tests/unit/test_repository.py tests/unit/test_limiter.py tests/unit/test_sync_*.py
    +git commit -m "🐛 fix(repository): handle both ProvisionedThroughput and ThrottlingException for partition throttle"
    +```
    +
    +---
    +
    +### Task 36: Fix propagate_shard_count to pre-create new shard items
    +
    +**Problem:** `propagate_shard_count()` (`processor.py:700-768`) uses `attribute_not_exists(shard_count) OR shard_count < :new` as the condition expression. When the target shard doesn't exist, this creates an incomplete item with only `PK`, `SK`, and `shard_count` — missing all GSI attributes (`GSI2PK`, `GSI2SK`, `GSI3PK`, `GSI3SK`, `GSI4PK`, `GSI4SK`), limit attributes, `cascade`, `parent_id`, and `wcu` infrastructure limit. These incomplete items are invisible to GSI3 bucket discovery and GSI2 resource capacity queries.
    +
    +Without pre-creation, every new shard's first client access hits the slow path (+2 round trips), which defeats the architecture's core principle of keeping clients on the speculative fast path. The aggregator also cannot refill new shards until a client creates them.
    +
    +**Fix:** Split propagation into two code paths based on `old_count`/`new_count` from the stream record's OldImage/NewImage:
    +1. **Existing shards** `range(1, old_count)`: UpdateItem with `shard_count < :new` (drop `attribute_not_exists`)
    +2. **New shards** `range(old_count, new_count)`: PutItem cloned from shard 0's NewImage with adjusted PK/GSI keys, tokens set to effective capacity, condition `attribute_not_exists(PK)` to avoid overwriting client-created items
    +
    +**Cost:** +N WCU per doubling event in the aggregator Lambda (async, rare). Keeps clients on the fast path.
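
The split described in the Fix can be sketched as a pure function. `plan_propagation` is a hypothetical helper, not part of the codebase. It only computes which shard ids fall on each path and performs no DynamoDB calls.

```python
def plan_propagation(old_count: int, new_count: int) -> tuple[list[int], list[int]]:
    """Return (existing shard ids to update, new shard ids to pre-create)."""
    if new_count <= old_count:
        return [], []  # nothing to propagate when the count did not grow
    existing = list(range(1, old_count))  # shard 0 is the source of truth, so skip it
    new = list(range(old_count, new_count))
    return existing, new


existing, new = plan_propagation(old_count=2, new_count=4)
writes = len(existing) + len(new)  # at most one write per non-zero shard
```

For a doubling from 2 to 4, shard 1 takes the UpdateItem path and shards 2 and 3 take the PutItem path, so the aggregator issues at most `new_count - 1 = 3` writes for the event.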
    +
    +**Files:**
    +- Modify: `src/zae_limiter_aggregator/processor.py:700-768` (`propagate_shard_count`)
    +- Test: `tests/unit/test_processor.py`
    +
    +**Step 1: Write failing tests**
    +
    +In `tests/unit/test_processor.py`:
    +
    +```python
    +def test_propagate_creates_full_items_for_new_shards():
    +    """New shards get full items cloned from shard 0's NewImage."""
    +    record = make_modify_record(
    +        pk="ns1/BUCKET#user-1#gpt-4#0",
    +        sk="#STATE",
    +        new_image={
    +            "PK": {"S": "ns1/BUCKET#user-1#gpt-4#0"},
    +            "SK": {"S": "#STATE"},
    +            "shard_count": {"N": "4"},
    +            "GSI2PK": {"S": "ns1/RESOURCE#gpt-4"},
    +            "GSI2SK": {"S": "BUCKET#user-1#0"},
    +            "GSI3PK": {"S": "ns1/ENTITY#user-1"},
    +            "GSI3SK": {"S": "BUCKET#gpt-4#0"},
    +            "GSI4PK": {"S": "ns1"},
    +            "GSI4SK": {"S": "BUCKET#user-1#gpt-4#0"},
    +            "b_rpm_tk": {"N": "50000000"},
    +            "b_rpm_cp": {"N": "100000000"},
    +            "b_rpm_ra": {"N": "100000000"},
    +            "b_rpm_rp": {"N": "60000"},
    +            "b_rpm_tc": {"N": "0"},
    +            "b_wcu_tk": {"N": "1000000"},
    +            "b_wcu_cp": {"N": "1000000"},
    +            "b_wcu_ra": {"N": "1000000"},
    +            "b_wcu_rp": {"N": "1000"},
    +            "b_wcu_tc": {"N": "500"},
    +            "cascade": {"BOOL": False},
    +            "rf": {"N": "1000"},
    +        },
    +        old_image={
    +            "PK": {"S": "ns1/BUCKET#user-1#gpt-4#0"},
    +            "SK": {"S": "#STATE"},
    +            "shard_count": {"N": "2"},
    +        },
    +    )
    +    mock_table = MagicMock()
    +    propagated = propagate_shard_count(mock_table, record)
    +
    +    # Shards 2 and 3 are NEW (old_count=2, new_count=4)
    +    # Shard 1 is EXISTING (update only)
    +    calls = mock_table.update_item.call_args_list
    +    put_calls = mock_table.put_item.call_args_list
    +
    +    # 1 existing shard updated
    +    assert len(calls) == 1
    +    assert "shard_count < :new" in calls[0].kwargs["ConditionExpression"]
    +
    +    # 2 new shards pre-created with full attributes
    +    assert len(put_calls) == 2
    +    for put_call in put_calls:
    +        item = put_call.kwargs["Item"]
    +        assert "GSI3PK" in item  # Has GSI attributes
    +        assert "GSI2PK" in item
    +        assert "b_rpm_tk" in item  # Has limit attributes
    +        assert "b_wcu_tk" in item  # Has wcu
    +
    +    # Verify PKs are correct for new shards
    +    new_pks = {c.kwargs["Item"]["PK"]["S"] for c in put_calls}
    +    assert "ns1/BUCKET#user-1#gpt-4#2" in new_pks
    +    assert "ns1/BUCKET#user-1#gpt-4#3" in new_pks
    +
    +
    +def test_propagate_existing_shards_not_created():
    +    """Existing shards get UpdateItem, not PutItem."""
    +    record = make_modify_record(
    +        pk="ns1/BUCKET#user-1#gpt-4#0",
    +        sk="#STATE",
    +        new_image={"shard_count": {"N": "4"}, "PK": {"S": "ns1/BUCKET#user-1#gpt-4#0"}, "SK": {"S": "#STATE"}, ...},
    +        old_image={"shard_count": {"N": "2"}, ...},
    +    )
    +    mock_table = MagicMock()
    +    propagate_shard_count(mock_table, record)
    +
    +    # Shard 1 is existing — UpdateItem, not PutItem
    +    update_pks = {c.kwargs["Key"]["PK"] for c in mock_table.update_item.call_args_list}
    +    assert "ns1/BUCKET#user-1#gpt-4#1" in update_pks
    +
    +    put_pks = {c.kwargs["Item"]["PK"]["S"] for c in mock_table.put_item.call_args_list}
    +    assert "ns1/BUCKET#user-1#gpt-4#1" not in put_pks
    +
    +
    +def test_propagate_new_shard_skips_if_client_created():
    +    """PutItem with attribute_not_exists(PK) skips client-created shards."""
    +    mock_table = MagicMock()
    +    mock_table.put_item.side_effect = ClientError(
    +        {"Error": {"Code": "ConditionalCheckFailedException"}}, "PutItem"
    +    )
    +    record = make_modify_record(
    +        pk="ns1/BUCKET#user-1#gpt-4#0",
    +        sk="#STATE",
    +        new_image={"shard_count": {"N": "2"}, "PK": {"S": "ns1/BUCKET#user-1#gpt-4#0"}, "SK": {"S": "#STATE"}, ...},
    +        old_image={"shard_count": {"N": "1"}, ...},
    +    )
    +    propagated = propagate_shard_count(mock_table, record)
    +    assert propagated == 0  # Client already created it
    +```
    +
    +**Step 2: Run tests to verify they fail**
    +
    +Run: `uv run pytest tests/unit/test_processor.py -k "test_propagate" -v`
    +Expected: FAIL (current code uses UpdateItem for all shards, no PutItem)
    +
    +**Step 3: Rewrite propagate_shard_count with two code paths**
    +
    +```python
    +def propagate_shard_count(
    +    table: Any,
    +    record: dict[str, Any],
    +) -> int:
    +    """Propagate shard_count changes to all other shard items.
    +
    +    Detects shard_count change in stream record (OldImage vs NewImage).
    +    Only propagates from shard 0. Two code paths:
    +    - Existing shards (1..old_count-1): UpdateItem with shard_count < :new
    +    - New shards (old_count..new_count-1): PutItem cloned from shard 0's
    +      NewImage with adjusted PK/GSI keys and effective token capacity.
    +      Uses attribute_not_exists(PK) to avoid overwriting client-created items.
    +
    +    Args:
    +        table: boto3 Table resource
    +        record: DynamoDB stream record
    +
    +    Returns:
    +        Number of shard items created or updated
    +    """
    +    dynamodb_data = record.get("dynamodb", {})
    +    new_image = dynamodb_data.get("NewImage", {})
    +    old_image = dynamodb_data.get("OldImage", {})
    +
    +    new_count_raw = new_image.get("shard_count", {}).get("N")
    +    old_count_raw = old_image.get("shard_count", {}).get("N")
    +    if not new_count_raw or not old_count_raw:
    +        return 0
    +
    +    new_count = int(new_count_raw)
    +    old_count = int(old_count_raw)
    +    if new_count <= old_count:
    +        return 0
    +
    +    pk = new_image.get("PK", {}).get("S", "")
    +    try:
    +        namespace_id, entity_id, resource, shard_id = parse_bucket_pk(pk)
    +    except ValueError:
    +        return 0
    +
    +    if shard_id != 0:
    +        return 0  # Only propagate from source of truth
    +
    +    updated = 0
    +
    +    # Path 1: Update existing shards (lightweight shard_count update)
    +    for target_shard in range(1, old_count):
    +        try:
    +            table.update_item(
    +                Key={
    +                    "PK": pk_bucket(namespace_id, entity_id, resource, target_shard),
    +                    "SK": sk_state(),
    +                },
    +                UpdateExpression="SET shard_count = :new",
    +                ConditionExpression="shard_count < :new",
    +                ExpressionAttributeValues={
    +                    ":new": new_count,
    +                },
    +            )
    +            updated += 1
    +        except ClientError as e:
    +            if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
    +                continue  # Higher value already present
    +            raise
    +
    +    # Path 2: Pre-create new shards (full item cloned from shard 0)
    +    for target_shard in range(old_count, new_count):
    +        try:
    +            item = dict(new_image)  # Clone shard 0's NewImage
    +            item["PK"] = {"S": pk_bucket(namespace_id, entity_id, resource, target_shard)}
    +            item["GSI2SK"] = {"S": gsi2_sk_bucket(entity_id, target_shard)}
    +            item["GSI3SK"] = {"S": gsi3_sk_bucket(resource, target_shard)}
    +            item["GSI4SK"] = {"S": gsi4_sk_bucket(entity_id, resource, target_shard)}
    +            item["shard_count"] = {"N": str(new_count)}
    +            # Reset tokens to effective per-shard capacity (full bucket)
    +            for limit_name, info in _extract_limit_attrs(new_image).items():
    +                cp_milli = info["cp_milli"]
    +                if limit_name == WCU_LIMIT_NAME:
    +                    effective_cp = cp_milli  # wcu is per-partition, not divided
    +                else:
    +                    effective_cp = cp_milli // new_count
    +                item[bucket_attr(limit_name, BUCKET_FIELD_TK)] = {"N": str(effective_cp)}
    +                item[bucket_attr(limit_name, BUCKET_FIELD_TC)] = {"N": "0"}
    +            table.put_item(
    +                Item=item,
    +                ConditionExpression="attribute_not_exists(PK)",
    +            )
    +            updated += 1
    +        except ClientError as e:
    +            if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
    +                continue  # Client already created this shard
    +            raise
    +
    +    if updated > 0:
    +        logger.info(
    +            "Shard count propagated",
    +            entity_id=entity_id,
    +            resource=resource,
    +            new_count=new_count,
    +            shards_updated=updated,
    +        )
    +    return updated
    +```
    +
    +**Note:** `_extract_limit_attrs` is a helper that parses `b_{name}_cp` attributes from the NewImage to get limit names and their capacity values. This may already exist in the parser or can be extracted from `_parse_bucket_record`.
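
To make the token reset concrete, here is a small sketch of the per-shard capacity rule applied to the capacities from the Step 1 fixture. `effective_capacity_milli` is a hypothetical helper, and the value `"wcu"` for `WCU_LIMIT_NAME` is an assumption inferred from the `b_wcu_*` attribute names.

```python
WCU_LIMIT_NAME = "wcu"  # assumed value, inferred from the b_wcu_* attributes


def effective_capacity_milli(limit_name: str, cp_milli: int, shard_count: int) -> int:
    """Token level a freshly created shard starts with."""
    if limit_name == WCU_LIMIT_NAME:
        return cp_milli  # wcu is a per-partition limit, so it is not divided
    return cp_milli // shard_count  # other limits are split across shards


# Capacities from the Step 1 fixture (b_rpm_cp, b_wcu_cp), with new shard_count = 4.
fixture_cp = {"rpm": 100_000_000, "wcu": 1_000_000}
per_shard = {
    name: effective_capacity_milli(name, cp, 4) for name, cp in fixture_cp.items()
}
```

With four shards, each new shard would start with 25,000,000 millitokens for `rpm` and the full 1,000,000 for `wcu`.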
    +
    +**Step 4: Run tests**
    +
    +Run: `uv run pytest tests/unit/test_processor.py -v`
    +Expected: PASS
    +
    +**Step 5: Commit**
    +
    +```bash
    +git add src/zae_limiter_aggregator/processor.py tests/unit/test_processor.py
    +git commit -m "🐛 fix(aggregator): pre-create full shard items during propagation to keep clients on fast path"
    +```
    +
    +---
    +
    +### Task 37: Fix proactive sharding signal to use wcu token level
    +
    +**Problem:** `try_proactive_shard()` (`processor.py:635-697`) checks `wcu_tc_delta / wcu_capacity_milli >= 0.8`. The two quantities are not comparable: `tc_delta` is a count (total consumed in this batch), while `capacity_milli` is a rate (tokens per second). The resulting ratio scales with batch size instead of measuring actual partition pressure. With `BatchSize=100`, the maximum achievable ratio is `100 × 1000 / 1_000_000 = 0.1` (10%), so proactive sharding can never trigger.
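
The ceiling can be checked with a few lines of arithmetic. This sketch assumes, as the figures above imply, that each stream record accounts for at most 1,000 millitokens of wcu consumption. `max_consumption_ratio` is a hypothetical helper used only for illustration.

```python
WCU_CAPACITY_MILLI = 1_000_000  # capacity used in the arithmetic above
COST_PER_RECORD_MILLI = 1_000   # assumption: one wcu token (1,000 millitokens) per record
OLD_THRESHOLD = 0.8             # the old WCU_PROACTIVE_THRESHOLD


def max_consumption_ratio(batch_size: int) -> float:
    """Largest tc_delta / capacity ratio a single stream batch can produce."""
    return batch_size * COST_PER_RECORD_MILLI / WCU_CAPACITY_MILLI


ratio_at_100 = max_consumption_ratio(100)
can_trigger_at_100 = ratio_at_100 >= OLD_THRESHOLD
can_trigger_at_800 = max_consumption_ratio(800) >= OLD_THRESHOLD
```

Under that assumption a batch of 100 records tops out at a ratio of 0.1, and the old check only becomes reachable once a batch holds 800 records.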
    +
    +**Three options considered:**
    +- **Option A (increase BatchSize):** Requires `BatchSize >= 800` — increases Lambda latency and cost, tight coupling to a constant.
    +- **Option B (normalize by time span):** `tc_delta / (time_span_sec × capacity_rate)` — requires per-bucket first/last timestamps, division-by-zero edge cases for sub-second batches.
    +- **Option C (use remaining wcu token level):** Check `wcu_tk_milli / wcu_cp_milli < 0.2` (less than 20% tokens remaining). Token level directly measures partition headroom, naturally accounts for refill, requires no time span computation, and the data is already available in `BucketRefillState.tk_milli`.
    +
    +**Decision: Option C.** The token level from the last NewImage in the batch directly measures how much headroom the partition has. It naturally accounts for both consumption and refill. The only weakness is that aggregator refill can temporarily mask sustained pressure for one batch cycle, but this self-corrects: if pressure continues, tokens drain again and the next batch triggers sharding.
    +
    +**Fix:** Change `try_proactive_shard` signature and body:
    +- Remove `wcu_tc_delta` parameter
    +- Add `wcu_tk_milli` parameter (remaining wcu tokens from last NewImage)
    +- Change threshold check to `wcu_tk_milli / wcu_capacity_milli < WCU_PROACTIVE_THRESHOLD_LOW` where `WCU_PROACTIVE_THRESHOLD_LOW = 0.2` (less than 20% remaining)
    +- Rename `WCU_PROACTIVE_THRESHOLD` to `WCU_PROACTIVE_THRESHOLD_LOW` to clarify it's a low-water mark
    +
    +**BatchSize stays at 100** — no CloudFormation change needed.
    +
    +**Files:**
    +- Modify: `src/zae_limiter_aggregator/processor.py` (`try_proactive_shard`, callers)
    +- Modify: `src/zae_limiter_aggregator/__init__.py` (if re-exports change)
    +- Test: `tests/unit/test_processor.py`
    +
    +**Step 1: Write failing tests**
    +
    +In `tests/unit/test_processor.py`, update `TestTryProactiveShard`:
    +
    +```python
    +def test_proactive_shard_triggers_at_low_token_level(self) -> None:
    +    """Proactive sharding triggers when wcu tokens < 20% of capacity."""
    +    mock_table = MagicMock()
    +    state = self._make_state(shard_id=0, shard_count=1)
    +
    +    # 15% remaining → below 20% threshold → should trigger
    +    wcu_tk_milli = 150_000  # 15% of 1_000_000
    +    wcu_capacity_milli = 1_000_000
    +
    +    result = try_proactive_shard(mock_table, state, wcu_tk_milli, wcu_capacity_milli)
    +
    +    assert result is True
    +    mock_table.update_item.assert_called_once()
    +
    +def test_proactive_shard_skips_above_threshold(self) -> None:
    +    """No sharding when wcu tokens >= 20% of capacity."""
    +    mock_table = MagicMock()
    +    state = self._make_state(shard_id=0, shard_count=1)
    +
    +    # 25% remaining → above 20% threshold → no sharding
    +    wcu_tk_milli = 250_000
    +    wcu_capacity_milli = 1_000_000
    +
    +    result = try_proactive_shard(mock_table, state, wcu_tk_milli, wcu_capacity_milli)
    +
    +    assert result is False
    +    mock_table.update_item.assert_not_called()
    +
    +def test_proactive_shard_triggers_at_zero_tokens(self) -> None:
    +    """Proactive sharding triggers when wcu tokens are completely exhausted."""
    +    mock_table = MagicMock()
    +    state = self._make_state(shard_id=0, shard_count=1)
    +
    +    result = try_proactive_shard(mock_table, state, 0, 1_000_000)
    +
    +    assert result is True
    +
    +def test_proactive_shard_boundary_at_exactly_20_percent(self) -> None:
    +    """At exactly 20% remaining, no sharding (threshold is strictly less than)."""
    +    mock_table = MagicMock()
    +    state = self._make_state(shard_id=0, shard_count=1)
    +
    +    wcu_tk_milli = 200_000  # Exactly 20%
    +    wcu_capacity_milli = 1_000_000
    +
    +    result = try_proactive_shard(mock_table, state, wcu_tk_milli, wcu_capacity_milli)
    +
    +    assert result is False  # Not strictly less than 20%
    +
    +def test_proactive_shard_negative_tokens(self) -> None:
    +    """Negative tokens (overdrawn wcu) trigger sharding."""
    +    mock_table = MagicMock()
    +    state = self._make_state(shard_id=0, shard_count=1)
    +
    +    result = try_proactive_shard(mock_table, state, -50_000, 1_000_000)
    +
    +    assert result is True
    +```
    +
    +**Step 2: Run tests to verify they fail**
    +
    +Run: `uv run pytest tests/unit/test_processor.py::TestTryProactiveShard -v`
    +Expected: FAIL (old signature takes `wcu_tc_delta`, not `wcu_tk_milli`; old logic checks `>=0.8` not `<0.2`)
    +
    +**Step 3: Update try_proactive_shard**
    +
    +In `processor.py`, replace the function:
    +
    +```python
    +WCU_PROACTIVE_THRESHOLD_LOW = 0.2  # Shard when wcu tokens < 20% of capacity
    +
    +
    +def try_proactive_shard(
    +    table: Any,
    +    state: BucketRefillState,
    +    wcu_tk_milli: int,
    +    wcu_capacity_milli: int,
    +) -> bool:
    +    """Proactively double shard_count when wcu token level is low.
    +
    +    Checks remaining wcu tokens against capacity. When tokens drop
    +    below 20% of capacity, the partition is under sustained write
    +    pressure and should be split.
    +
    +    Only acts on shard 0 (source of truth for shard_count).
    +    Uses conditional write to prevent double-bumping.
    +
    +    Args:
    +        table: boto3 Table resource
    +        state: Aggregated bucket state
    +        wcu_tk_milli: Remaining wcu tokens in millitokens (from last NewImage)
    +        wcu_capacity_milli: wcu capacity in millitokens
    +
    +    Returns:
    +        True if shard_count was bumped, False otherwise
    +    """
    +    if state.shard_id != 0:
    +        return False
    +
    +    if wcu_capacity_milli <= 0:
    +        return False
    +
    +    token_ratio = wcu_tk_milli / wcu_capacity_milli
    +    if token_ratio >= WCU_PROACTIVE_THRESHOLD_LOW:
    +        return False
    +
    +    new_count = state.shard_count * 2
    +
    +    try:
    +        table.update_item(
    +            Key={
    +                "PK": pk_bucket(state.namespace_id, state.entity_id, state.resource, 0),
    +                "SK": sk_state(),
    +            },
    +            UpdateExpression="SET shard_count = :new",
    +            ConditionExpression="shard_count = :old",
    +            ExpressionAttributeValues={
    +                ":old": state.shard_count,
    +                ":new": new_count,
    +            },
    +        )
    +        logger.info(
    +            "Proactive shard doubling",
    +            entity_id=state.entity_id,
    +            resource=state.resource,
    +            old_count=state.shard_count,
    +            new_count=new_count,
    +            token_ratio=round(token_ratio, 2),
    +        )
    +        return True
    +    except ClientError as e:
    +        if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
    +            logger.debug(
    +                "Proactive shard skipped - concurrent bump",
    +                entity_id=state.entity_id,
    +                resource=state.resource,
    +            )
    +            return False
    +        raise
    +```
    +
    +**Step 4: Update callers**
    +
    +In the `process_stream_records()` function, change the call site from:
    +
    +```python
    +# Before:
    +wcu_tc_delta = ...  # accumulated from extract_deltas
    +try_proactive_shard(table, state, wcu_tc_delta, wcu_capacity_milli)
    +
    +# After:
    +wcu_tk_milli = state.limits[WCU_LIMIT_NAME].tk_milli  # from last NewImage
    +try_proactive_shard(table, state, wcu_tk_milli, wcu_capacity_milli)
    +```
    +
    +**Step 5: Remove Task 34's `test_unreachable_at_batch_size_100` test**
    +
    +This test documents that proactive sharding can't trigger at BatchSize=100 using the old tc_delta metric. With the new token-level signal, proactive sharding can trigger at any batch size, so this test becomes incorrect. Remove it.
    +
    +**Step 6: Run all unit tests**
    +
    +Run: `uv run pytest tests/unit/ -v`
    +Expected: PASS
    +
    +**Step 7: Commit**
    +
    +```bash
    +git add src/zae_limiter_aggregator/processor.py tests/unit/test_processor.py
    +git commit -m "🐛 fix(aggregator): use wcu token level for proactive sharding signal instead of tc_delta"
    +```
    +
    +#### Task 37 Test Plan
    +
    +**Scope:** 24 tests across 4 layers validating the signal change from `tc_delta/capacity >= 0.8` to `tk_milli/capacity < 0.2`.
    +
    +##### Unit Tests — `tests/unit/test_processor.py::TestTryProactiveShard`
    +
    +**Existing tests to update** (rename `wcu_tc_delta` → `wcu_tk_milli`, invert threshold logic):
    +
    +| # | Test | Old Params | New Params | Expected |
    +|---|------|-----------|------------|----------|
    +| 1 | `test_triggers_at_threshold` | `tc_delta=900k` (90%>80%) | `tk_milli=100k` (10%<20%) | True |
    +| 2 | `test_skips_below_threshold` | `tc_delta=500k` (50%<80%) | `tk_milli=500k` (50%>=20%) | False |
    +| 3 | `test_skips_non_shard_0` | param rename | `tk_milli=100k` | False |
    +| 4 | `test_conditional_check_failure_returns_false` | param rename | `tk_milli=100k` | False |
    +| 5 | `test_zero_capacity_skips` | `tc_delta=900k, cap=0` | `tk_milli=0, cap=0` | False |
    +
    +**New unit tests:**
    +
    +| # | Test | Token Level | Capacity | Expected | Edge Case |
    +|---|------|-------------|----------|----------|-----------|
    +| 6 | `test_triggers_at_low_token_level` | 150,000 (15%) | 1,000,000 | True | Core happy path |
    +| 7 | `test_skips_above_threshold` | 250,000 (25%) | 1,000,000 | False | Core negative path |
    +| 8 | `test_triggers_at_zero_tokens` | 0 | 1,000,000 | True | Complete exhaustion |
    +| 9 | `test_boundary_at_exactly_20_percent` | 200,000 (20%) | 1,000,000 | False | Boundary: `>=` is safe, `<` shards |
    +| 10 | `test_negative_tokens_triggers` | -50,000 | 1,000,000 | True | Overdrawn wcu via adjustment |
    +| 11 | `test_one_millitoken_below_threshold` | 199,999 | 1,000,000 | True | Off-by-one: 19.9999% < 20% |
    +| 12 | `test_doubles_shard_count_from_2` | 100,000 | 1,000,000 | True, new=4 | Verify `shard_count * 2` with existing count=2 |
    +| 13 | `test_negative_capacity_skips` | 100,000 | -1,000,000 | False | Corrupted data guard |
    +| 14 | `test_batch_size_independent` | 100,000 | 1,000,000 | True | Token level is invariant to BatchSize |
    +
    +**Test to remove:**
    +- `test_unreachable_at_batch_size_100` (Task 34) — documents the tc_delta limitation; no longer applicable with token-level signal.
    +
    +**Constant rename regression:**
    +
    +| # | Test | Description |
    +|---|------|-------------|
    +| 15 | `test_constant_renamed` | Assert `WCU_PROACTIVE_THRESHOLD_LOW` exists and `WCU_PROACTIVE_THRESHOLD` is removed |
    +
    +##### Caller Wiring Tests — `tests/unit/test_processor.py::TestProcessStreamRecords`
    +
    +| # | Test | Description | Validate |
    +|---|------|-------------|----------|
    +| 16 | `test_process_records_passes_tk_milli` | Mock `try_proactive_shard`, verify call uses `wcu_tk_milli=wcu_info.tk_milli` | Correct parameter wiring at `processor.py:233-238` |
    +| 17 | `test_process_records_proactive_shard_selective` | Two bucket states: one with 15% wcu remaining, one with 50% | Only the low-level bucket triggers sharding |
    +
    +##### Integration Tests — `tests/integration/test_bucket_sharding.py::TestProactiveShardingIntegration`
    +
    +| # | Test | Description | Validate |
    +|---|------|-------------|----------|
    +| 18 | `test_proactive_shard_doubles_count` (update) | Change call from `wcu_tc_delta=900k` to `wcu_tk_milli=100k` | DynamoDB conditional write succeeds, shard_count 1→2 |
    +| 19 | `test_proactive_shard_skips_healthy_bucket` | Seed bucket with `wcu_tk=500k` (50%), call with `wcu_tk_milli=500k` | No write, shard_count unchanged |
    +| 20 | `test_proactive_shard_after_refill` | Seed bucket with `wcu_tk=800k` (80% after aggregator refill) | No sharding — refill masks pressure for one cycle (expected) |
    +| 21 | `test_proactive_shard_sustained_pressure` | Two sequential calls: first 80% tokens (no shard), second 10% (shard) | Self-correction: sustained pressure triggers on next batch |
    +
    +##### E2E Tests — `tests/e2e/test_localstack.py` (new class `TestProactiveShardingE2E`)
    +
    +| # | Test | Description | Validate |
    +|---|------|-------------|----------|
    +| 22 | `test_aggregator_proactive_sharding_e2e` | Create entity, exhaust wcu via rapid speculative writes, wait for Lambda stream processing | Full pipeline: writes → Stream → Lambda → proactive shard → propagation |
    +| 23 | `test_aggregator_no_shard_under_normal_load` | Create entity, moderate writes (below threshold), wait for Lambda | No false positives under normal load |
    +| 24 | `test_aggregator_refill_then_drain_e2e` | Exhaust, wait for refill (tokens recover), exhaust again, verify shard on second cycle | End-to-end validation of the refill-masking self-correction |
    +
    +**E2E markers:** `@pytest.mark.e2e`, `@pytest.mark.slow` (stream processing delay ~30s).
    +
    +##### Summary
    +
    +| Layer | Count | Files | Backend |
    +|-------|-------|-------|---------|
    +| Unit (function-level) | 14 | `test_processor.py` | Mocked |
    +| Constant regression | 1 | `test_processor.py` | Mocked |
    +| Caller wiring | 2 | `test_processor.py` | Mocked |
    +| Integration (DynamoDB) | 4 | `test_bucket_sharding.py` | LocalStack |
    +| E2E (full pipeline) | 3 | `test_localstack.py` | LocalStack + Lambda |
    +| **Total** | **24** | | |
    +
    +##### Key Edge Cases
    +
    +1. **Boundary precision:** Exactly 20% is safe, 19.9999% triggers
    +2. **Negative tokens:** Overdrawn wcu still triggers (strongest signal)
    +3. **Zero/negative capacity:** Division-by-zero and corrupted data guards
    +4. **Aggregator refill masking:** After refill, tokens recover → no shard for one cycle → self-corrects next batch
    +5. **Batch size independence:** Token level is invariant to `BatchSize` (unlike tc_delta)
    +6. **Concurrent bumps:** `ConditionalCheckFailedException` returns `False`
    +7. **Non-shard-0:** Only shard 0 triggers (source of truth)
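
The numeric edge cases above can be checked against a pure form of the threshold test. This is a sketch only: `should_shard` is a hypothetical helper that leaves out the shard-0 check and the conditional DynamoDB write, and the cases are copied from the unit test tables.

```python
WCU_PROACTIVE_THRESHOLD_LOW = 0.2


def should_shard(wcu_tk_milli: int, wcu_capacity_milli: int) -> bool:
    """Threshold decision only, with no shard-0 check and no DynamoDB write."""
    if wcu_capacity_milli <= 0:
        return False  # guards against division by zero and corrupted capacity
    return wcu_tk_milli / wcu_capacity_milli < WCU_PROACTIVE_THRESHOLD_LOW


# (tokens, capacity) -> expected decision, taken from the unit test tables.
cases = {
    (150_000, 1_000_000): True,     # 15% remaining
    (250_000, 1_000_000): False,    # 25% remaining
    (0, 1_000_000): True,           # complete exhaustion
    (200_000, 1_000_000): False,    # exactly 20%, threshold is strict
    (199_999, 1_000_000): True,     # one millitoken below the boundary
    (-50_000, 1_000_000): True,     # overdrawn wcu
    (0, 0): False,                  # zero capacity
    (100_000, -1_000_000): False,   # negative capacity
}
mismatches = [case for case, expected in cases.items() if should_shard(*case) != expected]
```

An empty `mismatches` list means the predicate agrees with every row. Batch size never appears as an input, which is the batch-size independence property from item 5.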
    
  • src/zae_limiter_aggregator/processor.py (+102 -14, modified)
    @@ -8,6 +8,7 @@
     from typing import Any
     
     import boto3
    +from boto3.dynamodb.types import TypeDeserializer
     from botocore.exceptions import ClientError
     
     from zae_limiter.bucket import refill_bucket
    @@ -21,9 +22,13 @@
         BUCKET_PREFIX,
         SK_BUCKET,
         WCU_LIMIT_NAME,
    +    WCU_SHARD_WARN_THRESHOLD,
         bucket_attr,
         gsi2_pk_resource,
    +    gsi2_sk_bucket,
         gsi2_sk_usage,
    +    gsi3_sk_bucket,
    +    gsi4_sk_bucket,
         parse_bucket_pk,
         parse_namespace,
         pk_bucket,
    @@ -225,15 +230,15 @@ def process_stream_records(
                 )
                 errors.append(error_msg)
     
    -    # Proactive sharding (check wcu consumption per bucket)
    +    # Proactive sharding (check wcu token level per bucket)
         for state in bucket_states.values():
             wcu_info = state.limits.get(WCU_LIMIT_NAME)
             if wcu_info:
                 try:
                     try_proactive_shard(
                         table,
                         state,
    -                    wcu_tc_delta=wcu_info.tc_delta,
    +                    wcu_tk_milli=wcu_info.tk_milli,
                         wcu_capacity_milli=wcu_info.cp_milli,
                     )
                 except Exception as e:
    @@ -629,24 +634,28 @@ def try_refill_bucket(
             raise
     
     
    -WCU_PROACTIVE_THRESHOLD = 0.8  # Shard when wcu consumption >= 80% of capacity
    +WCU_PROACTIVE_THRESHOLD_LOW = 0.2  # Shard when wcu tokens < 20% of capacity
     
     
     def try_proactive_shard(
         table: Any,
         state: BucketRefillState,
    -    wcu_tc_delta: int,
    +    wcu_tk_milli: int,
         wcu_capacity_milli: int,
     ) -> bool:
    -    """Proactively double shard_count when wcu consumption approaches capacity.
    +    """Proactively double shard_count when wcu token level is low.
    +
    +    Checks remaining wcu tokens against capacity. When tokens drop
    +    below 20% of capacity, the partition is under sustained write
    +    pressure and should be split.
     
         Only acts on shard 0 (source of truth for shard_count).
         Uses conditional write to prevent double-bumping.
     
         Args:
             table: boto3 Table resource
             state: Aggregated bucket state
    -        wcu_tc_delta: Accumulated wcu tc_delta in this batch (millitokens)
    +        wcu_tk_milli: Remaining wcu tokens in millitokens (from last NewImage)
             wcu_capacity_milli: wcu capacity in millitokens
     
         Returns:
    @@ -658,8 +667,8 @@ def try_proactive_shard(
         if wcu_capacity_milli <= 0:
             return False
     
    -    consumption_ratio = wcu_tc_delta / wcu_capacity_milli
    -    if consumption_ratio < WCU_PROACTIVE_THRESHOLD:
    +    token_ratio = wcu_tk_milli / wcu_capacity_milli
    +    if token_ratio >= WCU_PROACTIVE_THRESHOLD_LOW:
             return False
     
         new_count = state.shard_count * 2
    @@ -683,8 +692,16 @@ def try_proactive_shard(
                 resource=state.resource,
                 old_count=state.shard_count,
                 new_count=new_count,
    -            consumption_ratio=round(consumption_ratio, 2),
    +            token_ratio=round(token_ratio, 2),
             )
    +        if new_count > WCU_SHARD_WARN_THRESHOLD:
    +            logger.warning(
    +                "High shard count after proactive doubling",
    +                entity_id=state.entity_id,
    +                resource=state.resource,
    +                shard_count=new_count,
    +                threshold=WCU_SHARD_WARN_THRESHOLD,
    +            )
             return True
         except ClientError as e:
             if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
    @@ -697,22 +714,58 @@ def try_proactive_shard(
             raise
     
     
    +def _extract_limit_attrs(
    +    image: dict[str, Any],
    +) -> dict[str, dict[str, int]]:
    +    """Extract limit names and their capacity/token fields from a stream image.
    +
    +    Scans ``b_{name}_cp`` attributes to discover limits and their values.
    +
    +    Returns:
    +        Dict of limit_name -> {"cp_milli": int, "tk_milli": int}
    +    """
    +    limits: dict[str, dict[str, int]] = {}
    +    for attr_name in image:
    +        if not attr_name.startswith(BUCKET_ATTR_PREFIX):
    +            continue
    +        rest = attr_name[len(BUCKET_ATTR_PREFIX) :]
    +        idx = rest.rfind("_")
    +        if idx <= 0:
    +            continue
    +        if rest[idx + 1 :] != BUCKET_FIELD_CP:
    +            continue
    +        limit_name = rest[:idx]
    +        if not limit_name:
    +            continue
    +
    +        cp_attr = bucket_attr(limit_name, BUCKET_FIELD_CP)
    +        tk_attr = bucket_attr(limit_name, BUCKET_FIELD_TK)
    +        limits[limit_name] = {
    +            "cp_milli": int(image.get(cp_attr, {}).get("N", "0")),
    +            "tk_milli": int(image.get(tk_attr, {}).get("N", "0")),
    +        }
    +    return limits
    +
    +
     def propagate_shard_count(
         table: Any,
         record: dict[str, Any],
     ) -> int:
         """Propagate shard_count changes to all other shard items.
     
         Detects shard_count change in stream record (OldImage vs NewImage).
    -    Only propagates from shard 0. Uses conditional write to prevent
    -    overwriting a higher shard_count set by another writer.
    +    Only propagates from shard 0. Two code paths:
    +    - Existing shards (1..old_count-1): UpdateItem with shard_count < :new
    +    - New shards (old_count..new_count-1): PutItem cloned from shard 0's
    +      NewImage with adjusted PK/GSI keys and effective token capacity.
    +      Uses attribute_not_exists(PK) to avoid overwriting client-created items.
     
         Args:
             table: boto3 Table resource
             record: DynamoDB stream record
     
         Returns:
    -        Number of shard items updated
    +        Number of shard items created or updated
         """
         dynamodb_data = record.get("dynamodb", {})
         new_image = dynamodb_data.get("NewImage", {})
    @@ -738,15 +791,17 @@ def propagate_shard_count(
             return 0  # Only propagate from source of truth
     
         updated = 0
    -    for target_shard in range(1, new_count):
    +
    +    # Path 1: Update existing shards (lightweight shard_count update)
    +    for target_shard in range(1, old_count):
             try:
                 table.update_item(
                     Key={
                         "PK": pk_bucket(namespace_id, entity_id, resource, target_shard),
                         "SK": sk_state(),
                     },
                     UpdateExpression="SET shard_count = :new",
    -                ConditionExpression=("attribute_not_exists(shard_count) OR shard_count < :new"),
    +                ConditionExpression="shard_count < :new",
                     ExpressionAttributeValues={
                         ":new": new_count,
                     },
    @@ -757,6 +812,39 @@ def propagate_shard_count(
                     continue  # Higher value already present
                 raise
     
    +    # Path 2: Pre-create new shards (full item cloned from shard 0)
    +    # Deserialize wire format ({"S": "val"}, {"N": "1"}) to Python types
    +    # because table.put_item() (boto3 Table resource) auto-serializes.
    +    deserializer = TypeDeserializer()
    +    base_item = {k: deserializer.deserialize(v) for k, v in new_image.items()}
    +    limit_attrs = _extract_limit_attrs(new_image)
    +    for target_shard in range(old_count, new_count):
    +        try:
    +            item = dict(base_item)
    +            item["PK"] = pk_bucket(namespace_id, entity_id, resource, target_shard)
    +            item["GSI2SK"] = gsi2_sk_bucket(entity_id, target_shard)
    +            item["GSI3SK"] = gsi3_sk_bucket(resource, target_shard)
    +            item["GSI4SK"] = gsi4_sk_bucket(entity_id, resource, target_shard)
    +            item["shard_count"] = new_count
    +            # Reset tokens to effective per-shard capacity (full bucket)
    +            for limit_name, info in limit_attrs.items():
    +                cp_milli = info["cp_milli"]
    +                if limit_name == WCU_LIMIT_NAME:
    +                    effective_cp = cp_milli  # wcu is per-partition, not divided
    +                else:
    +                    effective_cp = cp_milli // new_count
    +                item[bucket_attr(limit_name, BUCKET_FIELD_TK)] = effective_cp
    +                item[bucket_attr(limit_name, BUCKET_FIELD_TC)] = 0
    +            table.put_item(
    +                Item=item,
    +                ConditionExpression="attribute_not_exists(PK)",
    +            )
    +            updated += 1
    +        except ClientError as e:
    +            if e.response["Error"]["Code"] == "ConditionalCheckFailedException":
    +                continue  # Client already created this shard
    +            raise
    +
         if updated > 0:
             logger.info(
                 "Shard count propagated",
    
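The two propagation paths in the hunk above can be sketched in isolation. This is a minimal illustration, not the library's code: `plan_shard_propagation` and `initial_tokens` are hypothetical helper names, and only the ranges and the division rule are taken from the diff (existing shards `1..old_count-1` get a `shard_count` update, new shards `old_count..new_count-1` are created with a full per-shard bucket, and the `wcu` limit is not divided).

```python
def plan_shard_propagation(old_count: int, new_count: int) -> tuple[list[int], list[int]]:
    """Split target shard ids into (updated in place, newly created)."""
    existing = list(range(1, old_count))          # path 1: conditional UpdateItem
    created = list(range(old_count, new_count))   # path 2: PutItem cloned from shard 0
    return existing, created


def initial_tokens(cp_milli: int, new_count: int, is_wcu: bool) -> int:
    """Starting token value (in millitokens) for a newly created shard."""
    # The wcu limit models per-partition write capacity, so it keeps its full
    # value; application limits are split evenly across all shards.
    return cp_milli if is_wcu else cp_milli // new_count


existing, created = plan_shard_propagation(old_count=2, new_count=4)
print(existing, created)
print(initial_tokens(100_000, 4, is_wcu=False))
print(initial_tokens(1_000_000, 4, is_wcu=True))
```

Shard 0 is excluded from both ranges because the diff treats it as the source of truth that triggers propagation.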
  • src/zae_limiter/limiter.py +23 -30 modified
    @@ -44,7 +44,8 @@
         validate_resource,
     )
     from .repository import Repository
    -from .schema import DEFAULT_RESOURCE, WCU_LIMIT_NAME
    +from .repository_protocol import SpeculativeFailureReason
    +from .schema import DEFAULT_RESOURCE
     
     _UNSET: Any = object()  # sentinel for detecting explicitly-passed deprecated params
     
    @@ -736,13 +737,19 @@ async def _try_speculative_acquire(
                     await self._compensate_speculative(result.parent_id, resource, consume)
     
                 # Shard doubling: if wcu exhausted, double shard_count and update cache
    -            if self._is_wcu_exhausted(result.old_buckets):
    +            if result.failure_reason in (
    +                SpeculativeFailureReason.WCU_EXHAUSTED,
    +                SpeculativeFailureReason.BOTH_EXHAUSTED,
    +            ):
                     await self._repository.bump_shard_count(entity_id, resource, result.shard_count)
                     # Fall through to slow path (new shard bucket will be created)
                     return None
     
    -            # Shard retry: if multi-shard, try another shard
    -            if result.shard_count > 1:
    +            # Shard retry: if multi-shard and app limit exhausted, try another shard
    +            if (
    +                result.shard_count > 1
    +                and result.failure_reason == SpeculativeFailureReason.APP_LIMIT_EXHAUSTED
    +            ):
                     retry_result = await self._retry_on_other_shard(
                         entity_id, resource, consume, ttl_seconds=None, result=result
                     )
    @@ -1003,28 +1010,6 @@ def _check_speculative_failure(
     
         _MAX_SHARD_RETRIES = 2
     
    -    @staticmethod
    -    def _is_wcu_exhausted(old_buckets: "list[BucketState] | None") -> bool:
    -        """Check if the wcu infrastructure limit is exhausted.
    -
    -        The wcu limit tracks per-partition write pressure (GHSA-76rv). When
    -        exhausted (<1000 millitokens = <1 WCU), the caller should double
    -        shard_count to distribute writes across more DynamoDB partitions.
    -
    -        Args:
    -            old_buckets: BucketStates from a failed speculative result's
    -                ALL_OLD response. None if the bucket does not exist.
    -
    -        Returns:
    -            True if the wcu limit exists and has fewer than 1000 millitokens.
    -        """
    -        if old_buckets is None:
    -            return False
    -        for b in old_buckets:
    -            if b.limit_name == WCU_LIMIT_NAME and b.tokens_milli < 1000:
    -                return True
    -        return False
    -
         async def _retry_on_other_shard(
             self,
             entity_id: str,
    @@ -1872,20 +1857,28 @@ async def get_resource_capacity(
                         parent_ids.add(bucket.entity_id)
                 buckets = [b for b in buckets if b.entity_id in parent_ids]
     
    +        # Group buckets by entity_id to deduplicate shards (GHSA-76rv).
    +        # Each shard stores full undivided capacity; available tokens are
    +        # distributed across shards.
    +        entity_buckets: dict[str, list[BucketState]] = {}
    +        for bucket in buckets:
    +            entity_buckets.setdefault(bucket.entity_id, []).append(bucket)
    +
             entities: list[EntityCapacity] = []
             total_capacity = 0
             total_available = 0
     
    -        for bucket in buckets:
    -            available = calculate_available(bucket, now_ms)
    -            capacity = bucket.capacity
    +        for entity_id, entity_bucket_list in entity_buckets.items():
    +            capacity = entity_bucket_list[0].capacity
    +            available = sum(calculate_available(b, now_ms) for b in entity_bucket_list)
    +            available = min(available, capacity)
     
                 total_capacity += capacity
                 total_available += available
     
                 entities.append(
                     EntityCapacity(
    -                    entity_id=bucket.entity_id,
    +                    entity_id=entity_id,
                         capacity=capacity,
                         available=available,
                         utilization_pct=(
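
The per-entity deduplication in `get_resource_capacity` can be illustrated with a small standalone sketch. `ShardBucket` and `summarize` are hypothetical names, and `available` is passed in directly here, whereas the patched code derives it with `calculate_available(b, now_ms)`.

```python
from dataclasses import dataclass


@dataclass
class ShardBucket:
    """Hypothetical stand-in for BucketState: one item per shard."""

    entity_id: str
    capacity: int
    available: int


def summarize(buckets: list[ShardBucket]) -> dict[str, tuple[int, int]]:
    """Return entity_id -> (capacity, available), counting each entity once."""
    grouped: dict[str, list[ShardBucket]] = {}
    for bucket in buckets:
        grouped.setdefault(bucket.entity_id, []).append(bucket)

    summary: dict[str, tuple[int, int]] = {}
    for entity_id, shards in grouped.items():
        # Every shard stores the full undivided capacity, so read it once.
        capacity = shards[0].capacity
        # Available tokens are spread across shards; sum them, capped at capacity.
        available = min(sum(s.available for s in shards), capacity)
        summary[entity_id] = (capacity, available)
    return summary


shards = [
    ShardBucket("sharded-user", capacity=100, available=45),
    ShardBucket("sharded-user", capacity=100, available=50),
    ShardBucket("other-user", capacity=100, available=100),
]
print(summarize(shards))
```

Without the grouping step, a two-shard entity would be reported twice and its capacity counted as 200 rather than 100, which is the case the new unit test guards against.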
    
  • src/zae_limiter/repository_protocol.py +15 -0 modified
    @@ -8,6 +8,7 @@
     """
     
     from dataclasses import dataclass, field
    +from enum import Enum
     from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable
     
     if TYPE_CHECKING:
    @@ -24,6 +25,19 @@
         )
     
     
    +class SpeculativeFailureReason(Enum):
    +    """Classifies why a speculative write failed (GHSA-76rv).
    +
    +    Used by the limiter to decide the recovery path without
    +    inspecting individual BucketState token values.
    +    """
    +
    +    APP_LIMIT_EXHAUSTED = "app_limit_exhausted"
    +    WCU_EXHAUSTED = "wcu_exhausted"
    +    BOTH_EXHAUSTED = "both_exhausted"
    +    BUCKET_MISSING = "bucket_missing"
    +
    +
     @dataclass
     class SpeculativeResult:
         """Result of a speculative UpdateItem attempt.
    @@ -52,6 +66,7 @@ class SpeculativeResult:
         parent_result: "SpeculativeResult | None" = None
         shard_id: int = 0
         shard_count: int = 1
    +    failure_reason: SpeculativeFailureReason | None = None
     
     
     @runtime_checkable
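
As a rough sketch of how the limiter consumes this enum, the function below mirrors the branch order shown in the `limiter.py` hunk. The enum values are copied from the diff; `recovery_action` and the string labels it returns are hypothetical and exist only for illustration.

```python
from enum import Enum
from typing import Optional


class SpeculativeFailureReason(Enum):
    APP_LIMIT_EXHAUSTED = "app_limit_exhausted"
    WCU_EXHAUSTED = "wcu_exhausted"
    BOTH_EXHAUSTED = "both_exhausted"
    BUCKET_MISSING = "bucket_missing"


def recovery_action(reason: Optional[SpeculativeFailureReason], shard_count: int) -> str:
    """Name the recovery path for a failed speculative write (labels are illustrative)."""
    if reason in (
        SpeculativeFailureReason.WCU_EXHAUSTED,
        SpeculativeFailureReason.BOTH_EXHAUSTED,
    ):
        # Partition write pressure: double shard_count, then use the slow path.
        return "double_shards_then_slow_path"
    if shard_count > 1 and reason == SpeculativeFailureReason.APP_LIMIT_EXHAUSTED:
        # Only this shard's share of an application limit ran out.
        return "retry_on_other_shard"
    return "existing_failure_handling"


print(recovery_action(SpeculativeFailureReason.WCU_EXHAUSTED, 1))
print(recovery_action(SpeculativeFailureReason.APP_LIMIT_EXHAUSTED, 4))
print(recovery_action(SpeculativeFailureReason.BUCKET_MISSING, 4))
```

The design point is that a retry on another shard is attempted only for `APP_LIMIT_EXHAUSTED`; before the patch, any failure on a multi-shard bucket triggered the retry.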
    
  • src/zae_limiter/repository.py +35 -5 modified
    @@ -29,7 +29,7 @@
         validate_resource,
     )
     from .naming import normalize_stack_name
    -from .repository_protocol import SpeculativeResult
    +from .repository_protocol import SpeculativeFailureReason, SpeculativeResult
     
     if TYPE_CHECKING:
         from .repository_builder import RepositoryBuilder
    @@ -1888,9 +1888,7 @@ def build_composite_create(
                 "GSI3SK": {"S": schema.gsi3_sk_bucket(resource, shard_id)},
                 # GSI4: namespace-scoped item discovery
                 "GSI4PK": {"S": self._namespace_id},
    -            "GSI4SK": {
    -                "S": f"{schema.BUCKET_PREFIX}{entity_id}#{resource}#{shard_id}",
    -            },
    +            "GSI4SK": {"S": schema.gsi4_sk_bucket(entity_id, resource, shard_id)},
                 "shard_count": {"N": str(shard_count)},
             }
             if parent_id is not None:
    @@ -2405,14 +2403,37 @@ async def _speculative_consume_single(
                     if old_item:
                         old_buckets = self._deserialize_composite_bucket(old_item)
                         old_shard_count = int(old_item.get("shard_count", {}).get("N", "1"))
    +
    +                    # Classify failure reason (GHSA-76rv)
    +                    wcu_exhausted = any(
    +                        b.limit_name == schema.WCU_LIMIT_NAME and b.tokens_milli < 1000
    +                        for b in old_buckets
    +                    )
    +                    app_exhausted = any(
    +                        b.limit_name != schema.WCU_LIMIT_NAME
    +                        and b.tokens_milli < consume.get(b.limit_name, 0) * 1000
    +                        for b in old_buckets
    +                    )
    +                    if wcu_exhausted and app_exhausted:
    +                        reason = SpeculativeFailureReason.BOTH_EXHAUSTED
    +                    elif wcu_exhausted:
    +                        reason = SpeculativeFailureReason.WCU_EXHAUSTED
    +                    else:
    +                        reason = SpeculativeFailureReason.APP_LIMIT_EXHAUSTED
    +
                         return SpeculativeResult(
                             success=False,
                             old_buckets=old_buckets,
                             shard_id=shard_id,
                             shard_count=old_shard_count,
    +                        failure_reason=reason,
                         )
                     else:
    -                    return SpeculativeResult(success=False, shard_id=shard_id)
    +                    return SpeculativeResult(
    +                        success=False,
    +                        shard_id=shard_id,
    +                        failure_reason=SpeculativeFailureReason.BUCKET_MISSING,
    +                    )
                 raise
     
         async def bump_shard_count(self, entity_id: str, resource: str, current_count: int) -> int:
    @@ -2449,6 +2470,15 @@ async def bump_shard_count(self, entity_id: str, resource: str, current_count: i
                     },
                 )
                 effective_count = new_count
    +            if new_count > schema.WCU_SHARD_WARN_THRESHOLD:
    +                logger.warning(
    +                    "High shard count after doubling: entity_id=%s resource=%s "
    +                    "shard_count=%d threshold=%d",
    +                    entity_id,
    +                    resource,
    +                    new_count,
    +                    schema.WCU_SHARD_WARN_THRESHOLD,
    +                )
             except ClientError as e:
                 if e.response.get("Error", {}).get("Code") == "ConditionalCheckFailedException":
                     effective_count = current_count  # Another client already doubled
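
The failure classification added to `_speculative_consume_single` can be reproduced on plain dictionaries. This is a sketch under stated assumptions: `classify_failure` is a hypothetical name, the bucket state is reduced to a `limit_name -> tokens_milli` mapping, and the value `"wcu"` for `WCU_LIMIT_NAME` is assumed because the diff does not show it.

```python
from enum import Enum

WCU_LIMIT_NAME = "wcu"  # assumed value; the real constant lives in schema.py


class SpeculativeFailureReason(Enum):
    APP_LIMIT_EXHAUSTED = "app_limit_exhausted"
    WCU_EXHAUSTED = "wcu_exhausted"
    BOTH_EXHAUSTED = "both_exhausted"


def classify_failure(
    tokens_milli: dict[str, int], consume: dict[str, int]
) -> SpeculativeFailureReason:
    """Classify a failed speculative write from the old bucket state."""
    # wcu is exhausted when fewer than 1000 millitokens (under 1 WCU) remain.
    wcu_exhausted = any(
        name == WCU_LIMIT_NAME and tk < 1000 for name, tk in tokens_milli.items()
    )
    # An application limit is exhausted when it cannot cover the requested amount.
    app_exhausted = any(
        name != WCU_LIMIT_NAME and tk < consume.get(name, 0) * 1000
        for name, tk in tokens_milli.items()
    )
    if wcu_exhausted and app_exhausted:
        return SpeculativeFailureReason.BOTH_EXHAUSTED
    if wcu_exhausted:
        return SpeculativeFailureReason.WCU_EXHAUSTED
    return SpeculativeFailureReason.APP_LIMIT_EXHAUSTED


print(classify_failure({"wcu": 500, "rpm": 60_000}, {"rpm": 10}))
print(classify_failure({"wcu": 900_000, "rpm": 5_000}, {"rpm": 10}))
print(classify_failure({"wcu": 0, "rpm": 0}, {"rpm": 10}))
```

As in the patch, `APP_LIMIT_EXHAUSTED` is the fallthrough, so it is also returned when neither check matches.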
    
  • src/zae_limiter/schema.py +15 -0 modified
    @@ -69,6 +69,7 @@
     WCU_LIMIT_CAPACITY = 1000  # DynamoDB per-partition WCU/sec limit
     WCU_LIMIT_REFILL_AMOUNT = 1000  # Refills to full capacity each second
     WCU_LIMIT_REFILL_PERIOD_SECONDS = 1
    +WCU_SHARD_WARN_THRESHOLD = 32  # Log warning when shard count exceeds this (GHSA-76rv)
     
     # Composite limit config attribute prefix and field suffixes (ADR-114 for configs)
     LIMIT_ATTR_PREFIX = "l_"
    @@ -411,6 +412,20 @@ def gsi3_sk_bucket(resource: str, shard_id: int) -> str:
         return f"{BUCKET_PREFIX}{resource}#{shard_id}"
     
     
    +def gsi4_sk_bucket(entity_id: str, resource: str, shard_id: int) -> str:
    +    """Build GSI4 sort key for bucket item (namespace-scoped discovery).
    +
    +    Args:
    +        entity_id: Entity owning the bucket
    +        resource: Resource name
    +        shard_id: Shard index (0-based)
    +
    +    Returns:
    +        GSI4SK string in format ``BUCKET#{entity_id}#{resource}#{shard_id}``
    +    """
    +    return f"{BUCKET_PREFIX}{entity_id}#{resource}#{shard_id}"
    +
    +
     def get_table_definition(table_name: str) -> dict[str, Any]:
         """
         Get the DynamoDB table definition for CreateTable.
    
  • src/zae_limiter/sync_limiter.py +18 -29 modified
    @@ -42,10 +42,11 @@
         validate_identifier,
         validate_resource,
     )
    -from .schema import DEFAULT_RESOURCE, WCU_LIMIT_NAME
    +from .schema import DEFAULT_RESOURCE
     from .sync_config_cache import ConfigSource
     from .sync_lease import LeaseEntry, SyncLease
     from .sync_repository import SyncRepository
    +from .sync_repository_protocol import SpeculativeFailureReason
     
     _UNSET: Any = object()
     logger = logging.getLogger(__name__)
    @@ -622,10 +623,16 @@ def _try_speculative_acquire(
                 if result.parent_result is not None and result.parent_result.success:
                     assert result.parent_id is not None
                     self._compensate_speculative(result.parent_id, resource, consume)
    -            if self._is_wcu_exhausted(result.old_buckets):
    +            if result.failure_reason in (
    +                SpeculativeFailureReason.WCU_EXHAUSTED,
    +                SpeculativeFailureReason.BOTH_EXHAUSTED,
    +            ):
                     self._repository.bump_shard_count(entity_id, resource, result.shard_count)
                     return None
    -            if result.shard_count > 1:
    +            if (
    +                result.shard_count > 1
    +                and result.failure_reason == SpeculativeFailureReason.APP_LIMIT_EXHAUSTED
    +            ):
                     retry_result = self._retry_on_other_shard(
                         entity_id, resource, consume, ttl_seconds=None, result=result
                     )
    @@ -843,28 +850,6 @@ def _check_speculative_failure(
     
         _MAX_SHARD_RETRIES = 2
     
    -    @staticmethod
    -    def _is_wcu_exhausted(old_buckets: "list[BucketState] | None") -> bool:
    -        """Check if the wcu infrastructure limit is exhausted.
    -
    -        The wcu limit tracks per-partition write pressure (GHSA-76rv). When
    -        exhausted (<1000 millitokens = <1 WCU), the caller should double
    -        shard_count to distribute writes across more DynamoDB partitions.
    -
    -        Args:
    -            old_buckets: BucketStates from a failed speculative result's
    -                ALL_OLD response. None if the bucket does not exist.
    -
    -        Returns:
    -            True if the wcu limit exists and has fewer than 1000 millitokens.
    -        """
    -        if old_buckets is None:
    -            return False
    -        for b in old_buckets:
    -            if b.limit_name == WCU_LIMIT_NAME and b.tokens_milli < 1000:
    -                return True
    -        return False
    -
         def _retry_on_other_shard(
             self,
             entity_id: str,
    @@ -1532,17 +1517,21 @@ def get_resource_capacity(
                     if entity and entity.is_parent:
                         parent_ids.add(bucket.entity_id)
                 buckets = [b for b in buckets if b.entity_id in parent_ids]
    +        entity_buckets: dict[str, list[BucketState]] = {}
    +        for bucket in buckets:
    +            entity_buckets.setdefault(bucket.entity_id, []).append(bucket)
             entities: list[EntityCapacity] = []
             total_capacity = 0
             total_available = 0
    -        for bucket in buckets:
    -            available = calculate_available(bucket, now_ms)
    -            capacity = bucket.capacity
    +        for entity_id, entity_bucket_list in entity_buckets.items():
    +            capacity = entity_bucket_list[0].capacity
    +            available = sum(calculate_available(b, now_ms) for b in entity_bucket_list)
    +            available = min(available, capacity)
                 total_capacity += capacity
                 total_available += available
                 entities.append(
                     EntityCapacity(
    -                    entity_id=bucket.entity_id,
    +                    entity_id=entity_id,
                         capacity=capacity,
                         available=available,
                         utilization_pct=(capacity - available) / capacity * 100 if capacity > 0 else 0,
    
  • src/zae_limiter/sync_repository_protocol.py +15 -0 modified
    @@ -7,6 +7,7 @@
     """
     
     from dataclasses import dataclass, field
    +from enum import Enum
     from typing import TYPE_CHECKING, Any, Protocol, runtime_checkable
     
     from .config_cache import CacheStats as CacheStats
    @@ -26,6 +27,19 @@
         from .sync_config_cache import ConfigSource
     
     
    +class SpeculativeFailureReason(Enum):
    +    """Classifies why a speculative write failed (GHSA-76rv).
    +
    +    Used by the limiter to decide the recovery path without
    +    inspecting individual BucketState token values.
    +    """
    +
    +    APP_LIMIT_EXHAUSTED = "app_limit_exhausted"
    +    WCU_EXHAUSTED = "wcu_exhausted"
    +    BOTH_EXHAUSTED = "both_exhausted"
    +    BUCKET_MISSING = "bucket_missing"
    +
    +
     @dataclass
     class SpeculativeResult:
         """Result of a speculative UpdateItem attempt.
    @@ -54,6 +68,7 @@ class SpeculativeResult:
         parent_result: "SpeculativeResult | None" = None
         shard_id: int = 0
         shard_count: int = 1
    +    failure_reason: SpeculativeFailureReason | None = None
     
     
     @runtime_checkable
    
  • src/zae_limiter/sync_repository.py +31 -3 modified
    @@ -36,7 +36,7 @@
     )
     from .naming import normalize_stack_name
     from .sync_config_cache import ConfigSource, SyncConfigCache
    -from .sync_repository_protocol import SpeculativeResult
    +from .sync_repository_protocol import SpeculativeFailureReason, SpeculativeResult
     
     if TYPE_CHECKING:
         from .sync_repository_builder import SyncRepositoryBuilder
    @@ -1549,7 +1549,7 @@ def build_composite_create(
                 "GSI3PK": {"S": schema.gsi3_pk_entity(self._namespace_id, entity_id)},
                 "GSI3SK": {"S": schema.gsi3_sk_bucket(resource, shard_id)},
                 "GSI4PK": {"S": self._namespace_id},
    -            "GSI4SK": {"S": f"{schema.BUCKET_PREFIX}{entity_id}#{resource}#{shard_id}"},
    +            "GSI4SK": {"S": schema.gsi4_sk_bucket(entity_id, resource, shard_id)},
                 "shard_count": {"N": str(shard_count)},
             }
             if parent_id is not None:
    @@ -1974,14 +1974,34 @@ def _speculative_consume_single(
                     if old_item:
                         old_buckets = self._deserialize_composite_bucket(old_item)
                         old_shard_count = int(old_item.get("shard_count", {}).get("N", "1"))
    +                    wcu_exhausted = any(
    +                        b.limit_name == schema.WCU_LIMIT_NAME and b.tokens_milli < 1000
    +                        for b in old_buckets
    +                    )
    +                    app_exhausted = any(
    +                        b.limit_name != schema.WCU_LIMIT_NAME
    +                        and b.tokens_milli < consume.get(b.limit_name, 0) * 1000
    +                        for b in old_buckets
    +                    )
    +                    if wcu_exhausted and app_exhausted:
    +                        reason = SpeculativeFailureReason.BOTH_EXHAUSTED
    +                    elif wcu_exhausted:
    +                        reason = SpeculativeFailureReason.WCU_EXHAUSTED
    +                    else:
    +                        reason = SpeculativeFailureReason.APP_LIMIT_EXHAUSTED
                         return SpeculativeResult(
                             success=False,
                             old_buckets=old_buckets,
                             shard_id=shard_id,
                             shard_count=old_shard_count,
    +                        failure_reason=reason,
                         )
                     else:
    -                    return SpeculativeResult(success=False, shard_id=shard_id)
    +                    return SpeculativeResult(
    +                        success=False,
    +                        shard_id=shard_id,
    +                        failure_reason=SpeculativeFailureReason.BUCKET_MISSING,
    +                    )
                 raise
     
         def bump_shard_count(self, entity_id: str, resource: str, current_count: int) -> int:
    @@ -2018,6 +2038,14 @@ def bump_shard_count(self, entity_id: str, resource: str, current_count: int) ->
                     },
                 )
                 effective_count = new_count
    +            if new_count > schema.WCU_SHARD_WARN_THRESHOLD:
    +                logger.warning(
    +                    "High shard count after doubling: entity_id=%s resource=%s shard_count=%d threshold=%d",
    +                    entity_id,
    +                    resource,
    +                    new_count,
    +                    schema.WCU_SHARD_WARN_THRESHOLD,
    +                )
             except ClientError as e:
                 if e.response.get("Error", {}).get("Code") == "ConditionalCheckFailedException":
                     effective_count = current_count
    
  • tests/benchmark/test_capacity.py +7 -3 modified
    @@ -339,7 +339,7 @@ def test_delete_entity_capacity(self, sync_limiter, capacity_counter):
             """Verify: delete_entity() batches in 25-item chunks.
     
             Expected calls:
    -        - 1 Query to find all entity items = 1 RCU
    +        - 2 Queries: 1 table query (entity items) + 1 GSI3 query (bucket items)
             - BatchWriteItem in chunks of 25 items
     
             For small entities (few items), only 1 BatchWriteItem call.
    @@ -365,7 +365,9 @@ def test_delete_entity_capacity(self, sync_limiter, capacity_counter):
                 sync_limiter.delete_entity("cap-delete")
     
             # Verify capacity consumption
    -        assert capacity_counter.query == 1, "Should have 1 Query to find entity items"
    +        assert capacity_counter.query == 2, (
    +            "Should have 2 Queries: entity items + GSI3 bucket discovery"
    +        )
             assert len(capacity_counter.batch_write_item) >= 1, "Should have at least 1 BatchWriteItem"
             # All items should be deleted in the batch
             assert sum(capacity_counter.batch_write_item) >= 1, "Should delete at least 1 item"
    @@ -396,7 +398,9 @@ def test_delete_entity_large_capacity(self, sync_limiter, capacity_counter):
                 sync_limiter.delete_entity("cap-delete-large")
     
             # Verify capacity consumption
    -        assert capacity_counter.query == 1, "Should have 1 Query to find entity items"
    +        assert capacity_counter.query == 2, (
    +            "Should have 2 Queries: entity items + GSI3 bucket discovery"
    +        )
             # Should have at least 2 batch write calls (30 items / 25 per batch = 2 batches)
             assert len(capacity_counter.batch_write_item) >= 2, (
                 "Should have at least 2 BatchWriteItem calls for >25 items"
    
  • tests/integration/test_bucket_sharding.py +1 -1 modified
    @@ -279,7 +279,7 @@ def test_proactive_shard_doubles_count(self, dynamodb_table) -> None:
             result = try_proactive_shard(
                 dynamodb_table,
                 state,
    -            wcu_tc_delta=900_000,
    +            wcu_tk_milli=100_000,  # 10% remaining < 20% threshold
                 wcu_capacity_milli=1_000_000,
             )
             assert result is True
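
The switch from a consumption ratio to a remaining-token ratio comes down to one comparison, sketched below. `should_shard` is a hypothetical name, and the value `0.2` for `WCU_PROACTIVE_THRESHOLD_LOW` is inferred from the "20% threshold" wording in the tests rather than shown in the diff. The real `try_proactive_shard` also acts only on shard 0 and performs a conditional write, both of which are omitted here.

```python
WCU_PROACTIVE_THRESHOLD_LOW = 0.2  # assumed from the "20%" in the test comments


def should_shard(wcu_tk_milli: int, wcu_capacity_milli: int) -> bool:
    """Return True when the remaining wcu tokens fall below the low-water mark."""
    if wcu_capacity_milli <= 0:
        return False  # no capacity configured: nothing to compare against
    token_ratio = wcu_tk_milli / wcu_capacity_milli
    # Strictly less than: exactly 20% remaining does not trigger sharding.
    return token_ratio < WCU_PROACTIVE_THRESHOLD_LOW


print(should_shard(100_000, 1_000_000))
print(should_shard(200_000, 1_000_000))
print(should_shard(-50_000, 1_000_000))
```

Negative token values (an overdrawn bucket) produce a negative ratio and therefore trigger sharding as well, matching the `test_negative_tokens_triggers` case in the unit tests.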
    
  • tests/unit/test_limiter.py +41 -0 modified
    @@ -2115,6 +2115,47 @@ async def test_get_resource_capacity_empty_result(self, limiter):
             assert len(capacity.entities) == 0
             assert capacity.utilization_pct == 0.0
     
    +    @pytest.mark.asyncio
    +    async def test_get_resource_capacity_sharded_entity_deduplication(self, limiter):
    +        """Sharded entities should report capacity once, not per-shard."""
    +        from zae_limiter import schema
    +
    +        await limiter.create_entity("sharded-user")
    +        limits = [Limit.per_minute("rpm", 100)]
    +
    +        async with limiter.acquire("sharded-user", "gpt-4", {"rpm": 10}, limits=limits):
    +            pass
    +
    +        # Manually create shard 1 to simulate sharding
    +        repo = limiter._repository
    +        client = await repo._get_client()
    +        shard0_key = {
    +            "PK": {"S": schema.pk_bucket(repo._namespace_id, "sharded-user", "gpt-4", 0)},
    +            "SK": {"S": schema.sk_state()},
    +        }
    +        shard0_resp = await client.get_item(TableName=repo.table_name, Key=shard0_key)
    +        shard0_item = shard0_resp["Item"]
    +
    +        shard1_item = dict(shard0_item)
    +        shard1_item["PK"] = {"S": schema.pk_bucket(repo._namespace_id, "sharded-user", "gpt-4", 1)}
    +        shard1_item["GSI2SK"] = {"S": schema.gsi2_sk_bucket("sharded-user", 1)}
    +        shard1_item["GSI3SK"] = {"S": schema.gsi3_sk_bucket("gpt-4", 1)}
    +        shard1_item["shard_count"] = {"N": "2"}
    +        await client.update_item(
    +            TableName=repo.table_name,
    +            Key=shard0_key,
    +            UpdateExpression="SET shard_count = :sc",
    +            ExpressionAttributeValues={":sc": {"N": "2"}},
    +        )
    +        await client.put_item(TableName=repo.table_name, Item=shard1_item)
    +
    +        capacity = await limiter.get_resource_capacity("gpt-4", "rpm")
    +
    +        assert capacity.total_capacity == 100  # Not 200
    +        assert len(capacity.entities) == 1  # Not 2
    +        assert capacity.entities[0].entity_id == "sharded-user"
    +        assert capacity.entities[0].capacity == 100
    +
     
     class TestRateLimiterCapacityEdgeCases:
         """Tests for edge cases in capacity calculations."""
    
  • tests/unit/test_processor.py +282 -26 modified
    @@ -1764,7 +1764,11 @@ def test_try_refill_bucket_old_pk_shard_0(self) -> None:
     
     
     class TestTryProactiveShard:
    -    """Tests for try_proactive_shard function."""
    +    """Tests for try_proactive_shard function.
    +
    +    Uses wcu token level (remaining / capacity) as the sharding signal.
    +    Triggers when tokens < 20% of capacity (WCU_PROACTIVE_THRESHOLD_LOW).
    +    """
     
         def _make_state(
             self,
    @@ -1780,14 +1784,14 @@ def _make_state(
                 rf_ms=1704067200000,
             )
     
    -    def test_triggers_at_threshold(self) -> None:
    -        """When wcu tc_delta >= 80% of capacity, aggregator bumps shard_count."""
    +    def test_triggers_at_low_token_level(self) -> None:
    +        """Proactive sharding triggers when wcu tokens < 20% of capacity."""
             mock_table = MagicMock()
             state = self._make_state()
    -        wcu_tc_delta = 900_000  # 90% > 80% threshold
    -        wcu_capacity_milli = 1000_000
    +        wcu_tk_milli = 150_000  # 15% remaining < 20% threshold
    +        wcu_capacity_milli = 1_000_000
     
    -        result = try_proactive_shard(mock_table, state, wcu_tc_delta, wcu_capacity_milli)
    +        result = try_proactive_shard(mock_table, state, wcu_tk_milli, wcu_capacity_milli)
     
             assert result is True
             mock_table.update_item.assert_called_once()
    @@ -1796,14 +1800,14 @@ def test_triggers_at_threshold(self) -> None:
             assert call_kwargs["ExpressionAttributeValues"][":new"] == 2
             assert call_kwargs["ConditionExpression"] == "shard_count = :old"
     
    -    def test_skips_below_threshold(self) -> None:
    -        """Below threshold, no sharding."""
    +    def test_skips_above_threshold(self) -> None:
    +        """No sharding when wcu tokens >= 20% of capacity."""
             mock_table = MagicMock()
             state = self._make_state()
    -        wcu_tc_delta = 500_000  # 50% < 80%
    -        wcu_capacity_milli = 1000_000
    +        wcu_tk_milli = 250_000  # 25% remaining >= 20% threshold
    +        wcu_capacity_milli = 1_000_000
     
    -        result = try_proactive_shard(mock_table, state, wcu_tc_delta, wcu_capacity_milli)
    +        result = try_proactive_shard(mock_table, state, wcu_tk_milli, wcu_capacity_milli)
     
             assert result is False
             mock_table.update_item.assert_not_called()
    @@ -1812,10 +1816,8 @@ def test_skips_non_shard_0(self) -> None:
             """Only shard 0 can be bumped."""
             mock_table = MagicMock()
             state = self._make_state(shard_id=1, shard_count=2)
    -        wcu_tc_delta = 900_000
    -        wcu_capacity_milli = 1000_000
     
    -        result = try_proactive_shard(mock_table, state, wcu_tc_delta, wcu_capacity_milli)
    +        result = try_proactive_shard(mock_table, state, 100_000, 1_000_000)
     
             assert result is False
             mock_table.update_item.assert_not_called()
    @@ -1829,7 +1831,7 @@ def test_conditional_check_failure_returns_false(self) -> None:
             )
             state = self._make_state()
     
    -        result = try_proactive_shard(mock_table, state, 900_000, 1000_000)
    +        result = try_proactive_shard(mock_table, state, 100_000, 1_000_000)
     
             assert result is False
     
    @@ -1838,11 +1840,83 @@ def test_zero_capacity_skips(self) -> None:
             mock_table = MagicMock()
             state = self._make_state()
     
    -        result = try_proactive_shard(mock_table, state, 900_000, 0)
    +        result = try_proactive_shard(mock_table, state, 100_000, 0)
     
             assert result is False
             mock_table.update_item.assert_not_called()
     
    +    def test_triggers_at_zero_tokens(self) -> None:
    +        """Proactive sharding triggers when wcu tokens are completely exhausted."""
    +        mock_table = MagicMock()
    +        state = self._make_state()
    +
    +        result = try_proactive_shard(mock_table, state, 0, 1_000_000)
    +
    +        assert result is True
    +
    +    def test_boundary_at_exactly_20_percent(self) -> None:
    +        """At exactly 20% remaining, no sharding (threshold is strictly less than)."""
    +        mock_table = MagicMock()
    +        state = self._make_state()
    +
    +        result = try_proactive_shard(mock_table, state, 200_000, 1_000_000)
    +
    +        assert result is False
    +        mock_table.update_item.assert_not_called()
    +
    +    def test_negative_tokens_triggers(self) -> None:
    +        """Negative tokens (overdrawn wcu) trigger sharding."""
    +        mock_table = MagicMock()
    +        state = self._make_state()
    +
    +        result = try_proactive_shard(mock_table, state, -50_000, 1_000_000)
    +
    +        assert result is True
    +
    +    def test_one_millitoken_below_threshold(self) -> None:
    +        """Off-by-one: 19.9999% < 20% triggers sharding."""
    +        mock_table = MagicMock()
    +        state = self._make_state()
    +
    +        result = try_proactive_shard(mock_table, state, 199_999, 1_000_000)
    +
    +        assert result is True
    +
    +    def test_doubles_shard_count_from_2(self) -> None:
    +        """Verify shard_count * 2 with existing count=2."""
    +        mock_table = MagicMock()
    +        state = self._make_state(shard_count=2)
    +
    +        result = try_proactive_shard(mock_table, state, 100_000, 1_000_000)
    +
    +        assert result is True
    +        call_kwargs = mock_table.update_item.call_args[1]
    +        assert call_kwargs["ExpressionAttributeValues"][":new"] == 4
    +
    +    def test_warns_above_threshold(self, capsys) -> None:
    +        """Proactive sharding logs warning when new count exceeds threshold."""
    +        mock_table = MagicMock()
    +        state = self._make_state(shard_id=0, shard_count=32)
    +
    +        result = try_proactive_shard(mock_table, state, 100_000, 1_000_000)
    +
    +        assert result is True
    +        captured = capsys.readouterr().out
    +        assert '"level": "WARNING"' in captured
    +        assert "High shard count" in captured
    +        assert '"shard_count": 64' in captured
    +
    +    def test_batch_size_independent(self) -> None:
    +        """Token level is invariant to BatchSize (unlike tc_delta)."""
    +        mock_table = MagicMock()
    +        state = self._make_state()
    +
    +        # With old tc_delta metric, BatchSize=100 could only reach 10%.
    +        # With token level, we check remaining tokens regardless of batch size.
    +        result = try_proactive_shard(mock_table, state, 100_000, 1_000_000)
    +
    +        assert result is True  # 10% remaining < 20% threshold
    +
     
     class TestPropagateShardsCount:
         """Tests for propagate_shard_count function."""
    @@ -1872,7 +1946,10 @@ def _make_shard_change_record(
             }
     
         def test_propagate_on_change(self) -> None:
    -        """When shard_count changes, propagate to other shards."""
    +        """When shard_count changes, propagate to other shards.
    +
    +        Existing shards (1) get UpdateItem, new shards (2, 3) get PutItem.
    +        """
             mock_table = MagicMock()
             record = self._make_shard_change_record(
                 old_shard_count=2,
    @@ -1881,13 +1958,15 @@ def test_propagate_on_change(self) -> None:
     
             propagated = propagate_shard_count(mock_table, record)
     
    -        assert propagated == 3  # Updated shards 1, 2, 3 (not 0)
    -        calls = mock_table.update_item.call_args_list
    -        updated_pks = {c[1]["Key"]["PK"] for c in calls}
    -        assert "ns1/BUCKET#user-1#gpt-4#1" in updated_pks
    -        assert "ns1/BUCKET#user-1#gpt-4#2" in updated_pks
    -        assert "ns1/BUCKET#user-1#gpt-4#3" in updated_pks
    -        assert "ns1/BUCKET#user-1#gpt-4#0" not in updated_pks
    +        assert propagated == 3  # 1 updated + 2 created (not shard 0)
    +        # Existing shard 1 updated via UpdateItem
    +        update_pks = {c[1]["Key"]["PK"] for c in mock_table.update_item.call_args_list}
    +        assert "ns1/BUCKET#user-1#gpt-4#1" in update_pks
    +        assert "ns1/BUCKET#user-1#gpt-4#0" not in update_pks
    +        # New shards 2, 3 created via PutItem
    +        put_pks = {c.kwargs["Item"]["PK"] for c in mock_table.put_item.call_args_list}
    +        assert "ns1/BUCKET#user-1#gpt-4#2" in put_pks
    +        assert "ns1/BUCKET#user-1#gpt-4#3" in put_pks
     
         def test_no_change_no_propagation(self) -> None:
             """No propagation when shard_count unchanged."""
    @@ -1904,11 +1983,13 @@ def test_no_change_no_propagation(self) -> None:
     
         def test_conditional_prevents_downgrade(self) -> None:
             """Conditional write prevents overwriting a higher shard_count."""
    -        mock_table = MagicMock()
    -        mock_table.update_item.side_effect = ClientError(
    +        conditional_error = ClientError(
                 {"Error": {"Code": "ConditionalCheckFailedException", "Message": ""}},
                 "UpdateItem",
             )
    +        mock_table = MagicMock()
    +        mock_table.update_item.side_effect = conditional_error
    +        mock_table.put_item.side_effect = conditional_error
             record = self._make_shard_change_record(
                 old_shard_count=2,
                 new_shard_count=4,
    @@ -1943,3 +2024,178 @@ def test_skips_non_bucket_pk(self) -> None:
     
             assert propagated == 0
             mock_table.update_item.assert_not_called()
    +
    +    def test_creates_full_items_for_new_shards(self) -> None:
    +        """New shards get full items cloned from shard 0's NewImage."""
    +        record = {
    +            "eventName": "MODIFY",
    +            "dynamodb": {
    +                "NewImage": {
    +                    "PK": {"S": "ns1/BUCKET#user-1#gpt-4#0"},
    +                    "SK": {"S": "#STATE"},
    +                    "shard_count": {"N": "4"},
    +                    "entity_id": {"S": "user-1"},
    +                    "GSI2PK": {"S": "ns1/RESOURCE#gpt-4"},
    +                    "GSI2SK": {"S": "BUCKET#user-1#0"},
    +                    "GSI3PK": {"S": "ns1/ENTITY#user-1"},
    +                    "GSI3SK": {"S": "BUCKET#gpt-4#0"},
    +                    "GSI4PK": {"S": "ns1"},
    +                    "GSI4SK": {"S": "BUCKET#user-1#gpt-4#0"},
    +                    "b_rpm_tk": {"N": "50000000"},
    +                    "b_rpm_cp": {"N": "100000000"},
    +                    "b_rpm_ra": {"N": "100000000"},
    +                    "b_rpm_rp": {"N": "60000"},
    +                    "b_rpm_tc": {"N": "0"},
    +                    "b_wcu_tk": {"N": "1000000"},
    +                    "b_wcu_cp": {"N": "1000000"},
    +                    "b_wcu_ra": {"N": "1000000"},
    +                    "b_wcu_rp": {"N": "1000"},
    +                    "b_wcu_tc": {"N": "500"},
    +                    "cascade": {"BOOL": False},
    +                    "rf": {"N": "1000"},
    +                },
    +                "OldImage": {
    +                    "PK": {"S": "ns1/BUCKET#user-1#gpt-4#0"},
    +                    "SK": {"S": "#STATE"},
    +                    "shard_count": {"N": "2"},
    +                },
    +            },
    +        }
    +        mock_table = MagicMock()
    +        propagated = propagate_shard_count(mock_table, record)
    +
    +        # Shard 1 is EXISTING (update only), shards 2 and 3 are NEW (put)
    +        update_calls = mock_table.update_item.call_args_list
    +        put_calls = mock_table.put_item.call_args_list
    +
    +        assert len(update_calls) == 1  # shard 1
    +        assert "shard_count < :new" in update_calls[0].kwargs["ConditionExpression"]
    +
    +        assert len(put_calls) == 2  # shards 2 and 3
    +        for put_call in put_calls:
    +            item = put_call.kwargs["Item"]
    +            assert "GSI3PK" in item
    +            assert "GSI2PK" in item
    +            assert "b_rpm_tk" in item
    +            assert "b_wcu_tk" in item
    +
    +        new_pks = {c.kwargs["Item"]["PK"] for c in put_calls}
    +        assert "ns1/BUCKET#user-1#gpt-4#2" in new_pks
    +        assert "ns1/BUCKET#user-1#gpt-4#3" in new_pks
    +
    +        assert propagated == 3  # 1 updated + 2 created
    +
    +    def test_existing_shards_not_put(self) -> None:
    +        """Existing shards get UpdateItem, not PutItem."""
    +        record = {
    +            "eventName": "MODIFY",
    +            "dynamodb": {
    +                "NewImage": {
    +                    "PK": {"S": "ns1/BUCKET#user-1#gpt-4#0"},
    +                    "SK": {"S": "#STATE"},
    +                    "shard_count": {"N": "4"},
    +                    "entity_id": {"S": "user-1"},
    +                    "b_rpm_tk": {"N": "50000000"},
    +                    "b_rpm_cp": {"N": "100000000"},
    +                    "b_rpm_ra": {"N": "100000000"},
    +                    "b_rpm_rp": {"N": "60000"},
    +                    "b_rpm_tc": {"N": "0"},
    +                    "cascade": {"BOOL": False},
    +                    "rf": {"N": "1000"},
    +                },
    +                "OldImage": {
    +                    "PK": {"S": "ns1/BUCKET#user-1#gpt-4#0"},
    +                    "SK": {"S": "#STATE"},
    +                    "shard_count": {"N": "2"},
    +                },
    +            },
    +        }
    +        mock_table = MagicMock()
    +        propagate_shard_count(mock_table, record)
    +
    +        # Shard 1 is existing — UpdateItem, not PutItem
    +        update_pks = {c.kwargs["Key"]["PK"] for c in mock_table.update_item.call_args_list}
    +        assert "ns1/BUCKET#user-1#gpt-4#1" in update_pks
    +
    +        put_pks = {c.kwargs["Item"]["PK"] for c in mock_table.put_item.call_args_list}
    +        assert "ns1/BUCKET#user-1#gpt-4#1" not in put_pks
    +
    +    def test_new_shard_skips_if_client_created(self) -> None:
    +        """PutItem with attribute_not_exists(PK) skips client-created shards."""
    +        mock_table = MagicMock()
    +        mock_table.put_item.side_effect = ClientError(
    +            {"Error": {"Code": "ConditionalCheckFailedException", "Message": ""}},
    +            "PutItem",
    +        )
    +        record = {
    +            "eventName": "MODIFY",
    +            "dynamodb": {
    +                "NewImage": {
    +                    "PK": {"S": "ns1/BUCKET#user-1#gpt-4#0"},
    +                    "SK": {"S": "#STATE"},
    +                    "shard_count": {"N": "2"},
    +                    "entity_id": {"S": "user-1"},
    +                    "b_rpm_tk": {"N": "50000000"},
    +                    "b_rpm_cp": {"N": "100000000"},
    +                    "b_rpm_ra": {"N": "100000000"},
    +                    "b_rpm_rp": {"N": "60000"},
    +                    "b_rpm_tc": {"N": "0"},
    +                    "cascade": {"BOOL": False},
    +                    "rf": {"N": "1000"},
    +                },
    +                "OldImage": {
    +                    "PK": {"S": "ns1/BUCKET#user-1#gpt-4#0"},
    +                    "SK": {"S": "#STATE"},
    +                    "shard_count": {"N": "1"},
    +                },
    +            },
    +        }
    +
    +        propagated = propagate_shard_count(mock_table, record)
    +        assert propagated == 0  # Client already created it
    +
    +    def test_new_shard_tokens_set_to_effective_capacity(self) -> None:
    +        """New shard tokens are set to effective per-shard capacity."""
    +        record = {
    +            "eventName": "MODIFY",
    +            "dynamodb": {
    +                "NewImage": {
    +                    "PK": {"S": "ns1/BUCKET#user-1#gpt-4#0"},
    +                    "SK": {"S": "#STATE"},
    +                    "shard_count": {"N": "2"},
    +                    "entity_id": {"S": "user-1"},
    +                    "b_rpm_tk": {"N": "50000000"},
    +                    "b_rpm_cp": {"N": "100000000"},
    +                    "b_rpm_ra": {"N": "100000000"},
    +                    "b_rpm_rp": {"N": "60000"},
    +                    "b_rpm_tc": {"N": "5000"},
    +                    "b_wcu_tk": {"N": "500000"},
    +                    "b_wcu_cp": {"N": "1000000"},
    +                    "b_wcu_ra": {"N": "1000000"},
    +                    "b_wcu_rp": {"N": "1000"},
    +                    "b_wcu_tc": {"N": "200"},
    +                    "cascade": {"BOOL": False},
    +                    "rf": {"N": "1000"},
    +                },
    +                "OldImage": {
    +                    "PK": {"S": "ns1/BUCKET#user-1#gpt-4#0"},
    +                    "SK": {"S": "#STATE"},
    +                    "shard_count": {"N": "1"},
    +                },
    +            },
    +        }
    +        mock_table = MagicMock()
    +        propagate_shard_count(mock_table, record)
    +
    +        put_calls = mock_table.put_item.call_args_list
    +        assert len(put_calls) == 1  # shard 1 is new
    +        item = put_calls[0].kwargs["Item"]
    +
    +        # rpm tokens = effective capacity = 100000000 // 2 = 50000000
    +        assert item["b_rpm_tk"] == 50000000
    +        # rpm tc reset to 0
    +        assert item["b_rpm_tc"] == 0
    +        # wcu tokens = full capacity (not divided) = 1000000
    +        assert item["b_wcu_tk"] == 1000000
    +        # wcu tc reset to 0
    +        assert item["b_wcu_tc"] == 0
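
The threshold and new-shard token assertions in the tests above can be condensed into a small sketch. It is inferred only from the test expectations shown here, not taken from the library's processor code; the function names `should_shard` and `new_shard_tokens` are hypothetical, and the 20% figure comes from the test docstrings.

```python
# Illustrative sketch only: names are hypothetical, behaviour is inferred
# from the test assertions in the diff above.

def should_shard(shard_id: int, wcu_tk_milli: int, wcu_capacity_milli: int) -> bool:
    """Return True when shard 0 has strictly less than 20% of its wcu tokens left."""
    if shard_id != 0:
        # The tests state that only shard 0 can bump the shard count.
        return False
    if wcu_capacity_milli <= 0:
        # Zero capacity gives nothing to compare against, so skip.
        return False
    # Integer comparison avoids float rounding at the exact 20% boundary.
    return wcu_tk_milli * 100 < wcu_capacity_milli * 20


def new_shard_tokens(capacity_milli: int, shard_count: int, is_wcu: bool) -> int:
    """Initial token level for a newly created shard.

    Per the tests, application limits start at capacity // shard_count,
    while the infrastructure wcu limit starts at full capacity (not divided).
    """
    if is_wcu:
        return capacity_milli
    return capacity_milli // shard_count


if __name__ == "__main__":
    print(should_shard(0, 150_000, 1_000_000))
    print(new_shard_tokens(100_000_000, 2, is_wcu=False))
```

Checking remaining tokens rather than consumed tokens keeps the decision independent of batch size, which is the point the `test_batch_size_independent` case makes.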
    
  • tests/unit/test_repository.py (+67 -0, modified)
    @@ -10,6 +10,7 @@
     from zae_limiter.exceptions import EntityExistsError, InvalidIdentifierError
     from zae_limiter.models import BucketState
     from zae_limiter.repository import Repository
    +from zae_limiter.repository_protocol import SpeculativeFailureReason
     from zae_limiter.schema import (
         calculate_bucket_ttl,
         limit_attr,
    @@ -2830,6 +2831,49 @@ async def test_speculative_missing_item(self, repo):
             result = await repo.speculative_consume("nonexistent", "gpt-4", {"rpm": 1})
             assert result.success is False
             assert result.old_buckets is None
    +        assert result.failure_reason == SpeculativeFailureReason.BUCKET_MISSING
    +
    +    @pytest.mark.asyncio
    +    async def test_speculative_failure_reason_app_limit_exhausted(self, repo):
    +        """Failure reason is APP_LIMIT_EXHAUSTED when user limit exhausted but wcu ok."""
    +        now_ms = int(time.time() * 1000)
    +        limits = [Limit.per_minute("rpm", 10)]
    +        state = BucketState.from_limit("e1", "gpt-4", limits[0], now_ms)
    +
    +        put_item = repo.build_composite_create("e1", "gpt-4", [state], now_ms)
    +        await repo.transact_write([put_item])
    +
    +        # Exhaust rpm tokens
    +        result = await repo.speculative_consume("e1", "gpt-4", {"rpm": 10})
    +        assert result.success is True
    +
    +        # Next attempt should fail with APP_LIMIT_EXHAUSTED (wcu still has tokens)
    +        result = await repo.speculative_consume("e1", "gpt-4", {"rpm": 5})
    +        assert result.success is False
    +        assert result.failure_reason == SpeculativeFailureReason.APP_LIMIT_EXHAUSTED
    +
    +    async def test_speculative_failure_reason_both_exhausted(self, repo):
    +        """Failure reason is BOTH_EXHAUSTED when both wcu and app limit exhausted."""
    +        now_ms = int(time.time() * 1000)
    +        # Use a very small capacity so both wcu and rpm exhaust quickly
    +        limits = [Limit.per_minute("rpm", 1)]
    +        state = BucketState.from_limit("e1", "gpt-4", limits[0], now_ms)
    +
    +        put_item = repo.build_composite_create("e1", "gpt-4", [state], now_ms)
    +        await repo.transact_write([put_item])
    +
    +        # First consume exhausts the 1 rpm token AND uses 1 wcu
    +        result = await repo.speculative_consume("e1", "gpt-4", {"rpm": 1})
    +        assert result.success is True
    +
    +        # Consume 999 more to exhaust the 1000 wcu capacity
    +        for _ in range(999):
    +            await repo.speculative_consume("e1", "gpt-4", {"rpm": 0})
    +
    +        # Now both rpm (0 tokens) and wcu (0 tokens) should be exhausted
    +        result = await repo.speculative_consume("e1", "gpt-4", {"rpm": 1})
    +        assert result.success is False
    +        assert result.failure_reason == SpeculativeFailureReason.BOTH_EXHAUSTED
     
         async def test_speculative_with_ttl(self, repo):
             """Speculative consume handles TTL correctly."""
    @@ -3667,3 +3711,26 @@ async def test_bump_shard_count_reraises_other_errors(self, repo):
                 assert (
                     exc_info.value.response["Error"]["Code"] == "ProvisionedThroughputExceededException"
                 )
    +
    +    @pytest.mark.asyncio
    +    async def test_bump_shard_count_warns_above_threshold(self, repo, caplog):
    +        """bump_shard_count logs warning when new count exceeds threshold."""
    +        import logging
    +
    +        now_ms = int(time.time() * 1000)
    +        limits = [Limit.per_minute("rpm", 100_000)]
    +        states = [BucketState.from_limit("e1", "gpt-4", lim, now_ms) for lim in limits]
    +        put_item = repo.build_composite_create(
    +            "e1", "gpt-4", states, now_ms, shard_id=0, shard_count=32
    +        )
    +        await repo.transact_write([put_item])
    +
    +        with caplog.at_level(logging.WARNING, logger="zae_limiter.repository"):
    +            result = await repo.bump_shard_count("e1", "gpt-4", current_count=32)
    +
    +        assert result == 64
    +        assert any(
    +            "shard count" in r.message.lower() and "64" in str(r.message)
    +            for r in caplog.records
    +            if r.levelno >= logging.WARNING
    +        )
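
The failure-reason tests above distinguish three outcomes of a failed speculative consume. A minimal sketch of that classification follows, assuming the caller already knows the remaining tokens per limit. `FailureReason` and `classify_failure` are hypothetical stand-ins, and only the three members that appear in the tests are modelled; the real `SpeculativeFailureReason` enum may have more.

```python
# Illustrative sketch only: this enum mirrors the three member names seen in
# the tests and is not the library's SpeculativeFailureReason.
from enum import Enum
from typing import Mapping, Optional


class FailureReason(Enum):
    BUCKET_MISSING = "bucket_missing"
    APP_LIMIT_EXHAUSTED = "app_limit_exhausted"
    BOTH_EXHAUSTED = "both_exhausted"


WCU = "wcu"  # assumed name of the infrastructure limit, taken from the b_wcu_* attributes


def classify_failure(
    tokens: Optional[Mapping[str, int]],
    consume: Mapping[str, int],
    wcu_cost: int = 1,
) -> Optional[FailureReason]:
    """Classify why a consume request could not be satisfied.

    `tokens` maps limit name to remaining tokens, or is None when no bucket
    item exists. Returns None for cases the tests shown do not cover.
    """
    if tokens is None:
        return FailureReason.BUCKET_MISSING
    app_short = any(tokens.get(name, 0) < amount for name, amount in consume.items())
    wcu_short = tokens.get(WCU, 0) < wcu_cost
    if app_short and wcu_short:
        return FailureReason.BOTH_EXHAUSTED
    if app_short:
        return FailureReason.APP_LIMIT_EXHAUSTED
    return None


if __name__ == "__main__":
    print(classify_failure({"rpm": 0, "wcu": 990}, {"rpm": 5}))
```

Separating the two exhaustion causes matters because only wcu pressure indicates a hot partition; an exhausted application limit alone is ordinary rate limiting.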
    
  • tests/unit/test_schema.py (+26 -0, modified)
    @@ -415,6 +415,29 @@ def test_gsi3_sk_bucket(self):
             assert schema.gsi3_sk_bucket("gpt-4", 0) == "BUCKET#gpt-4#0"
             assert schema.gsi3_sk_bucket("gpt-4", 3) == "BUCKET#gpt-4#3"
     
    +    @pytest.mark.parametrize(
    +        "resource",
    +        [
    +            "gpt-4",  # hyphen
    +            "gpt_4",  # underscore
    +            "gpt-3.5-turbo",  # dot
    +            "openai/gpt-4",  # slash (provider/model)
    +            "anthropic/claude-3/opus",  # nested slash
    +        ],
    +    )
    +    def test_parse_bucket_pk_round_trip(self, resource):
    +        """pk_bucket -> parse_bucket_pk round-trips for all valid resource chars."""
    +        pk = schema.pk_bucket("ns1", "user-1", resource, 0)
    +        ns, entity, res, shard = schema.parse_bucket_pk(pk)
    +        assert ns == "ns1"
    +        assert entity == "user-1"
    +        assert res == resource
    +        assert shard == 0
    +
    +    def test_gsi4_sk_bucket(self):
    +        assert schema.gsi4_sk_bucket("user-1", "gpt-4", 0) == "BUCKET#user-1#gpt-4#0"
    +        assert schema.gsi4_sk_bucket("user-1", "gpt-4", 3) == "BUCKET#user-1#gpt-4#3"
    +
         def test_gsi2_sk_bucket_with_shard(self):
             assert gsi2_sk_bucket("user-1", 0) == "BUCKET#user-1#0"
             assert gsi2_sk_bucket("user-1", 3) == "BUCKET#user-1#3"
    @@ -428,3 +451,6 @@ def test_wcu_limit_constants(self):
             assert schema.WCU_LIMIT_CAPACITY == 1000
             assert schema.WCU_LIMIT_REFILL_AMOUNT == 1000
             assert schema.WCU_LIMIT_REFILL_PERIOD_SECONDS == 1
    +
    +    def test_wcu_shard_warn_threshold_constant(self):
    +        assert schema.WCU_SHARD_WARN_THRESHOLD == 32
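
The round-trip test above implies a per-shard partition key of the form `namespace/BUCKET#entity#resource#shard`, where the resource itself may contain slashes. One way such a key could be built and parsed is sketched below. `make_bucket_pk` and `split_bucket_pk` are hypothetical names, and the sketch assumes the namespace contains no `/` and the entity id contains no `#`; the library's own validation rules are not shown in this diff.

```python
# Illustrative sketch only: key layout inferred from the test strings such as
# "ns1/BUCKET#user-1#gpt-4#0"; function names are hypothetical.
from typing import Tuple

_PREFIX = "BUCKET#"


def make_bucket_pk(namespace: str, entity_id: str, resource: str, shard_id: int) -> str:
    """Build a per-shard partition key so each shard lands on its own key."""
    return f"{namespace}/{_PREFIX}{entity_id}#{resource}#{shard_id}"


def split_bucket_pk(pk: str) -> Tuple[str, str, str, int]:
    """Parse a key built by make_bucket_pk back into its four parts."""
    # Split on the FIRST slash only: the resource may contain further slashes.
    namespace, rest = pk.split("/", 1)
    if not rest.startswith(_PREFIX):
        raise ValueError(f"not a bucket key: {pk!r}")
    # Entity id ends at the first '#'; the shard id starts after the last '#'.
    entity_id, tail = rest[len(_PREFIX):].split("#", 1)
    resource, shard = tail.rsplit("#", 1)
    return namespace, entity_id, resource, int(shard)


if __name__ == "__main__":
    key = make_bucket_pk("ns1", "user-1", "anthropic/claude-3/opus", 0)
    print(key, split_bucket_pk(key))
```

Putting the shard id in the partition key is what spreads one entity's writes across several partitions, which addresses the shared `namespace/ENTITY#{id}` key described in the advisory.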
    
  • tests/unit/test_sync_limiter.py (+34 -0, modified)
    @@ -1499,6 +1499,40 @@ def test_get_resource_capacity_empty_result(self, sync_limiter):
             assert len(capacity.entities) == 0
             assert capacity.utilization_pct == 0.0
     
    +    def test_get_resource_capacity_sharded_entity_deduplication(self, sync_limiter):
    +        """Sharded entities should report capacity once, not per-shard."""
    +        from zae_limiter import schema
    +
    +        sync_limiter.create_entity("sharded-user")
    +        limits = [Limit.per_minute("rpm", 100)]
    +        with sync_limiter.acquire("sharded-user", "gpt-4", {"rpm": 10}, limits=limits):
    +            pass
    +        repo = sync_limiter._repository
    +        client = repo._get_client()
    +        shard0_key = {
    +            "PK": {"S": schema.pk_bucket(repo._namespace_id, "sharded-user", "gpt-4", 0)},
    +            "SK": {"S": schema.sk_state()},
    +        }
    +        shard0_resp = client.get_item(TableName=repo.table_name, Key=shard0_key)
    +        shard0_item = shard0_resp["Item"]
    +        shard1_item = dict(shard0_item)
    +        shard1_item["PK"] = {"S": schema.pk_bucket(repo._namespace_id, "sharded-user", "gpt-4", 1)}
    +        shard1_item["GSI2SK"] = {"S": schema.gsi2_sk_bucket("sharded-user", 1)}
    +        shard1_item["GSI3SK"] = {"S": schema.gsi3_sk_bucket("gpt-4", 1)}
    +        shard1_item["shard_count"] = {"N": "2"}
    +        client.update_item(
    +            TableName=repo.table_name,
    +            Key=shard0_key,
    +            UpdateExpression="SET shard_count = :sc",
    +            ExpressionAttributeValues={":sc": {"N": "2"}},
    +        )
    +        client.put_item(TableName=repo.table_name, Item=shard1_item)
    +        capacity = sync_limiter.get_resource_capacity("gpt-4", "rpm")
    +        assert capacity.total_capacity == 100
    +        assert len(capacity.entities) == 1
    +        assert capacity.entities[0].entity_id == "sharded-user"
    +        assert capacity.entities[0].capacity == 100
    +
     
     class TestRateLimiterCapacityEdgeCases:
         """Tests for edge cases in capacity calculations."""
    
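The deduplication test above expects a sharded entity to be counted once when resource capacity is aggregated. A minimal sketch of that idea follows, assuming every shard item of an entity carries the same full capacity value (as in the test, where shard 1 is a copy of shard 0). `ShardItem` and `aggregate_capacity` are hypothetical names, not the library's API.

```python
# Illustrative sketch only: types and names are hypothetical.
from typing import Dict, Iterable, NamedTuple, Tuple


class ShardItem(NamedTuple):
    entity_id: str
    shard_id: int
    capacity: int  # full (undivided) capacity, assumed identical across shards


def aggregate_capacity(items: Iterable[ShardItem]) -> Tuple[int, Dict[str, int]]:
    """Sum capacity per entity, counting each entity once regardless of shard count."""
    per_entity: Dict[str, int] = {}
    for item in items:
        # Keep the first capacity seen for an entity; later shards are duplicates.
        per_entity.setdefault(item.entity_id, item.capacity)
    return sum(per_entity.values()), per_entity


if __name__ == "__main__":
    shards = [ShardItem("sharded-user", 0, 100), ShardItem("sharded-user", 1, 100)]
    print(aggregate_capacity(shards))
```

Without this step, a query that returns one item per shard would report twice the real capacity for a two-shard entity.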
  • tests/unit/test_sync_repository.py (+50 -0, modified)
    @@ -24,6 +24,7 @@
         sk_config,
     )
     from zae_limiter.sync_repository import SyncRepository
    +from zae_limiter.sync_repository_protocol import SpeculativeFailureReason
     
     
     @pytest.fixture
    @@ -2056,6 +2057,35 @@ def test_speculative_missing_item(self, repo):
             result = repo.speculative_consume("nonexistent", "gpt-4", {"rpm": 1})
             assert result.success is False
             assert result.old_buckets is None
    +        assert result.failure_reason == SpeculativeFailureReason.BUCKET_MISSING
    +
    +    def test_speculative_failure_reason_app_limit_exhausted(self, repo):
    +        """Failure reason is APP_LIMIT_EXHAUSTED when user limit exhausted but wcu ok."""
    +        now_ms = int(time.time() * 1000)
    +        limits = [Limit.per_minute("rpm", 10)]
    +        state = BucketState.from_limit("e1", "gpt-4", limits[0], now_ms)
    +        put_item = repo.build_composite_create("e1", "gpt-4", [state], now_ms)
    +        repo.transact_write([put_item])
    +        result = repo.speculative_consume("e1", "gpt-4", {"rpm": 10})
    +        assert result.success is True
    +        result = repo.speculative_consume("e1", "gpt-4", {"rpm": 5})
    +        assert result.success is False
    +        assert result.failure_reason == SpeculativeFailureReason.APP_LIMIT_EXHAUSTED
    +
    +    def test_speculative_failure_reason_both_exhausted(self, repo):
    +        """Failure reason is BOTH_EXHAUSTED when both wcu and app limit exhausted."""
    +        now_ms = int(time.time() * 1000)
    +        limits = [Limit.per_minute("rpm", 1)]
    +        state = BucketState.from_limit("e1", "gpt-4", limits[0], now_ms)
    +        put_item = repo.build_composite_create("e1", "gpt-4", [state], now_ms)
    +        repo.transact_write([put_item])
    +        result = repo.speculative_consume("e1", "gpt-4", {"rpm": 1})
    +        assert result.success is True
    +        for _ in range(999):
    +            repo.speculative_consume("e1", "gpt-4", {"rpm": 0})
    +        result = repo.speculative_consume("e1", "gpt-4", {"rpm": 1})
    +        assert result.success is False
    +        assert result.failure_reason == SpeculativeFailureReason.BOTH_EXHAUSTED
     
         def test_speculative_with_ttl(self, repo):
             """Speculative consume handles TTL correctly."""
    @@ -2711,3 +2741,23 @@ def test_bump_shard_count_reraises_other_errors(self, repo):
                 assert (
                     exc_info.value.response["Error"]["Code"] == "ProvisionedThroughputExceededException"
                 )
    +
    +    def test_bump_shard_count_warns_above_threshold(self, repo, caplog):
    +        """bump_shard_count logs warning when new count exceeds threshold."""
    +        import logging
    +
    +        now_ms = int(time.time() * 1000)
    +        limits = [Limit.per_minute("rpm", 100000)]
    +        states = [BucketState.from_limit("e1", "gpt-4", lim, now_ms) for lim in limits]
    +        put_item = repo.build_composite_create(
    +            "e1", "gpt-4", states, now_ms, shard_id=0, shard_count=32
    +        )
    +        repo.transact_write([put_item])
    +        with caplog.at_level(logging.WARNING, logger="zae_limiter.sync_repository"):
    +            result = repo.bump_shard_count("e1", "gpt-4", current_count=32)
    +        assert result == 64
    +        assert any(
    +            "shard count" in r.message.lower() and "64" in str(r.message)
    +            for r in caplog.records
    +            if r.levelno >= logging.WARNING
    +        )
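
The `bump_shard_count` tests above show the shard count doubling and a warning being logged once the new count passes `WCU_SHARD_WARN_THRESHOLD` (32). A sketch of that step, without the DynamoDB conditional write, might look like this. `next_shard_count` and the logger name are hypothetical, and the strict `>` comparison is an assumption: the tests only show that going from 32 to 64 warns.

```python
# Illustrative sketch only: the real method also performs a conditional
# UpdateItem on shard 0, which is omitted here.
import logging

logger = logging.getLogger("shard_sketch")  # hypothetical logger name

WARN_THRESHOLD = 32  # value asserted for schema.WCU_SHARD_WARN_THRESHOLD in the tests


def next_shard_count(current_count: int) -> int:
    """Double the shard count and warn when the result exceeds the threshold."""
    new_count = current_count * 2
    if new_count > WARN_THRESHOLD:  # assumption: strictly greater than
        logger.warning("High shard count: %d", new_count)
    return new_count


if __name__ == "__main__":
    logging.basicConfig(level=logging.WARNING)
    print(next_shard_count(32))
```

Doubling keeps the number of resharding events logarithmic in the final shard count, while the warning flags entities whose write volume keeps growing.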
    

Vulnerability mechanics

Generated on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.

References

5
