VYPR
Medium severity (5.9) · GHSA Advisory · Published Jun 5, 2026

Klever-Go KVM: Throttler slot leak in trie account-data sync causes epoch bootstrap / state sync DoS

CVE-2026-49343

Description

Summary

The account-data trie syncers leak bounded throttler slots on error paths in syncDataTrie(). Each failed trie sync permanently consumes one slot from the NumGoRoutinesThrottler, and the slot is never returned unless the sync succeeds or the root hash was already present.

I confirmed this on the current default branch develop at commit 9640d63 (observed on May 20, 2026). I also confirmed the bug with a runtime PoC using the real timeout path in trieSyncer.StartSyncing(): two timed-out sync attempts are enough to exhaust a throttler with capacity 2.

This affects the epoch bootstrap path because syncUserAccountsState() and syncKappAccountsState() create bounded throttlers and abort bootstrap immediately if the syncer returns an error. Once enough trie-root sync attempts fail, the syncer cannot make forward progress and bootstrap fails.

Affected Components

  • data/syncer/userAccountsSyncer.go
  • data/syncer/kappAccountsSyncer.go
  • data/trie/sync.go
  • core/throttler/numGoRoutinesThrottler.go
  • core/bootstrap/process.go

Affected Version

Verified on:

  • develop HEAD 9640d63

Please check whether the same code is present in supported 1.7.x releases.

Suggested Severity

High

Vulnerability Details

Root Cause

Both account-data syncers call StartProcessing() before creating / starting the trie syncer, but they only call EndProcessing() on the success path and on the duplicate-root early return.

userAccountsSyncer.syncDataTrie():

  func (u *userAccountsSyncer) syncDataTrie(rootHash []byte, ssh data.SyncStatisticsHandler, ctx context.Context) error {
      u.throttler.StartProcessing()

      u.syncerMutex.Lock()
      if _, ok := u.dataTries[string(rootHash)]; ok {
          u.syncerMutex.Unlock()
          u.throttler.EndProcessing()
          return nil
      }

      dataTrie, err := trie.NewTrie(...)
      if err != nil {
          u.syncerMutex.Unlock()
          return err // leak: no EndProcessing() on this path
      }

      trieSyncer, err := trie.NewTrieSyncer(arg)
      if err != nil {
          u.syncerMutex.Unlock()
          return err // leak: no EndProcessing() on this path
      }

      u.syncerMutex.Unlock()

      err = trieSyncer.StartSyncing(rootHash, ctx)
      if err != nil {
          return err // leak: no EndProcessing() on this path (e.g. timeout)
      }

      u.throttler.EndProcessing()
      return nil
  }

The same bug exists in kappAccountsSyncer.syncDataTrie().

Missing slot release paths

After StartProcessing(), the following error paths return without EndProcessing():

  1. trie.NewTrie(...) returns an error
  2. trie.NewTrieSyncer(...) returns an error
  3. trieSyncer.StartSyncing(...) returns an error
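The effect of these unguarded returns can be reproduced in isolation. The following is a minimal sketch, not code from klever-go: `slotThrottler`, `leakySync`, and `errSyncFailed` are hypothetical names, and the throttler is assumed to be a plain atomic counter with a fixed maximum.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// slotThrottler is a hypothetical stand-in for a bounded counter throttler.
type slotThrottler struct {
	counter int32
	max     int32
}

func (t *slotThrottler) CanProcess() bool { return atomic.LoadInt32(&t.counter) < t.max }
func (t *slotThrottler) StartProcessing() { atomic.AddInt32(&t.counter, 1) }
func (t *slotThrottler) EndProcessing()   { atomic.AddInt32(&t.counter, -1) }
func (t *slotThrottler) InUse() int32     { return atomic.LoadInt32(&t.counter) }

// errSyncFailed is a hypothetical error standing in for any failing step.
var errSyncFailed = errors.New("sync failed")

// leakySync takes a slot, runs step, and releases the slot only on success.
// The early return on error skips EndProcessing, which is the leak pattern.
func leakySync(t *slotThrottler, step func() error) error {
	t.StartProcessing()
	if err := step(); err != nil {
		return err // slot stays consumed
	}
	t.EndProcessing()
	return nil
}

func main() {
	th := &slotThrottler{max: 2}
	fail := func() error { return errSyncFailed }

	_ = leakySync(th, fail)
	fmt.Println(th.InUse(), th.CanProcess()) // 1 true

	_ = leakySync(th, fail)
	fmt.Println(th.InUse(), th.CanProcess()) // 2 false
}
```

With a capacity of 2, two failed calls are enough to make `CanProcess()` report false, which matches the behaviour described in the summary.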

Why this matters

NumGoRoutinesThrottler is a strict bounded counter:

  func (ngrt *NumGoRoutinesThrottler) CanProcess() bool {
      valCounter := atomic.LoadInt32(&ngrt.counter)
      return valCounter < ngrt.max
  }

  func (ngrt *NumGoRoutinesThrottler) StartProcessing() {
      atomic.AddInt32(&ngrt.counter, 1)
  }

  func (ngrt *NumGoRoutinesThrottler) EndProcessing() {
      atomic.AddInt32(&ngrt.counter, -1)
  }

Once leaked, a slot remains consumed for the lifetime of that throttler instance. The parent loops in both syncers wait for capacity before starting the next account-data trie sync:

  for !u.throttler.CanProcess() {
      select {
      case <-time.After(timeBetweenRetries):
          continue
      case <-ctx.Done():
          return common.ErrTimeIsOut
      }
  }

So after enough failures, no further roots can start syncing, and the sync operation eventually returns common.ErrTimeIsOut ("time is out") when the context expires.
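The stall can be sketched on its own, assuming the wait loop only exits when capacity returns or the context ends. Everything here is illustrative: `slotThrottler`, `waitForCapacity`, `waitWithLeakedSlots`, and `errTimeIsOut` are hypothetical names, and the 10 ms retry interval is an arbitrary choice.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// errTimeIsOut is a hypothetical stand-in for the syncer's timeout error.
var errTimeIsOut = errors.New("time is out")

// slotThrottler is a hypothetical bounded counter throttler.
type slotThrottler struct {
	counter int32
	max     int32
}

func (t *slotThrottler) CanProcess() bool { return atomic.LoadInt32(&t.counter) < t.max }
func (t *slotThrottler) StartProcessing() { atomic.AddInt32(&t.counter, 1) }

// waitForCapacity polls the throttler until a slot is free or ctx is done.
func waitForCapacity(ctx context.Context, t *slotThrottler, retry time.Duration) error {
	for !t.CanProcess() {
		select {
		case <-time.After(retry):
			continue
		case <-ctx.Done():
			return errTimeIsOut
		}
	}
	return nil
}

// waitWithLeakedSlots consumes `leaked` slots without releasing them, then
// waits for capacity for at most timeoutMs milliseconds.
func waitWithLeakedSlots(leaked, capacity int32, timeoutMs int) error {
	th := &slotThrottler{max: capacity}
	for i := int32(0); i < leaked; i++ {
		th.StartProcessing() // never paired with EndProcessing
	}
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeoutMs)*time.Millisecond)
	defer cancel()
	return waitForCapacity(ctx, th, 10*time.Millisecond)
}

func main() {
	fmt.Println(waitWithLeakedSlots(1, 2, 100)) // <nil>
	fmt.Println(waitWithLeakedSlots(2, 2, 100)) // time is out
}
```

Because nothing ever decrements the counter in the second call, the loop can only end through the context, which is why the failure surfaces as a timeout rather than as the original sync error.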

Bootstrap impact

Epoch bootstrap uses these syncers directly and aborts on any error:

  err = e.syncUserAccountsState(e.epochStartMeta.Header.TrieRoot)
  if err != nil {
      return nil, nil, err
  }

  err = e.syncKappAccountsState(e.epochStartMeta.Header.KAppsTrieRoot)
  if err != nil {
      return nil, nil, err
  }

The throttlers for these paths are real bounded throttlers created from numConcurrentTrieSyncers.

Proof of Concept

I verified the bug with the real timeout path, not only with a canceled context.

The PoC below uses:

  • a real NumGoRoutinesThrottler with capacity 2
  • a real trieSyncer.StartSyncing()
  • an empty trie-node cache and a request handler that never supplies nodes
  • a short sync timeout (1s) so StartSyncing() returns trie.ErrTimeIsOut

After the first failed sync, one slot remains leaked. After the second failed sync, the throttler is exhausted.

PoC test

  package syncer

  import (
        "context"
        "testing"
        "time"

        commonmock "github.com/klever-io/klever-go/common/mock"
        corethrottler "github.com/klever-io/klever-go/core/throttler"
        "github.com/klever-io/klever-go/data"
        "github.com/klever-io/klever-go/data/trie"
        triestats "github.com/klever-io/klever-go/data/trie/statistics"
        "github.com/stretchr/testify/require"
  )

  func newBaseSyncerForTimeoutPOC(t *testing.T) *baseAccountsSyncer {
        t.Helper()

        storageManager, err := trie.NewTrieStorageManagerWithoutPruning(commonmock.NewMemDbMock())
        require.NoError(t, err)

        return &baseAccountsSyncer{
                hasher:                    commonmock.HasherMock{},
                marshalizer:               &commonmock.MarshalizerMock{},
                trieSyncers:               make(map[string]data.TrieSyncer),
                dataTries:                 make(map[string]data.Trie),
                trieStorageManager:        storageManager,
                requestHandler:            &commonmock.RequestHandlerStub{},
                timeout:                   time.Second,
                cacher:                    commonmock.NewCacherStub(),
                maxTrieLevelInMemory:      5,
                name:                      "timeout-poc",
                maxHardCapForMissingNodes: 1,
        }
  }

  func TestPOC_UserAccountsSyncer_LeaksThrottlerSlotOnTrieTimeout(t *testing.T) {
        thr, err := corethrottler.NewNumGoRoutinesThrottler(2)
        require.NoError(t, err)

        s := &userAccountsSyncer{
                baseAccountsSyncer: newBaseSyncerForTimeoutPOC(t),
                throttler:          thr,
        }

        err = s.syncDataTrie([]byte("missing-root-1"), triestats.NewTrieSyncStatistics(), context.Background())
        require.ErrorIs(t, err, trie.ErrTimeIsOut)
        require.True(t, thr.CanProcess())

        err = s.syncDataTrie([]byte("missing-root-2"), triestats.NewTrieSyncStatistics(), context.Background())
        require.ErrorIs(t, err, trie.ErrTimeIsOut)
        require.False(t, thr.CanProcess())
  }

  func TestPOC_KappAccountsSyncer_LeaksThrottlerSlotOnTrieTimeout(t *testing.T) {
        thr, err := corethrottler.NewNumGoRoutinesThrottler(2)
        require.NoError(t, err)

        s := &kappAccountsSyncer{
                baseAccountsSyncer: newBaseSyncerForTimeoutPOC(t),
                throttler:          thr,
        }

        err = s.syncDataTrie([]byte("missing-root-1"), triestats.NewTrieSyncStatistics(), context.Background())
        require.ErrorIs(t, err, trie.ErrTimeIsOut)
        require.True(t, thr.CanProcess())

        err = s.syncDataTrie([]byte("missing-root-2"), triestats.NewTrieSyncStatistics(), context.Background())
        require.ErrorIs(t, err, trie.ErrTimeIsOut)
        require.False(t, thr.CanProcess())
  }

Command used

  go test ./data/syncer -run 'TestPOC_(User|Kapp)AccountsSyncer_LeaksThrottlerSlotOnTrieTimeout' -count=1

Result

  ok    github.com/klever-io/klever-go/data/syncer      4.005s

This confirms the leak with the real timeout path from trieSyncer.StartSyncing().

Impact

An attacker who can repeatedly cause trie-node sync failures or timeouts during bootstrap can consume the bounded sync throttler until no capacity remains.

Once enough slots are leaked:

  • additional account-data trie sync attempts stop making progress
  • the parent loop waits until context timeout
  • SyncAccounts() fails
  • epoch bootstrap fails

This is a core node availability issue. It affects fresh/restarting nodes and validators that need to bootstrap or resync state.

This is not a theoretical issue:

  • StartSyncing() performs network-dependent trie-node retrieval
  • it already has explicit timeout / failure paths
  • the leaked throttler slots are confirmed by runtime PoC

Recommended Fix

Release the slot with defer immediately after StartProcessing() and cancel the defer only if ownership is intentionally transferred, which is not the case here.

Example fix pattern:

  func (u *userAccountsSyncer) syncDataTrie(rootHash []byte, ssh data.SyncStatisticsHandler, ctx context.Context) error {
      u.throttler.StartProcessing()
      defer u.throttler.EndProcessing()

      u.syncerMutex.Lock()
      defer u.syncerMutex.Unlock()

      if _, ok := u.dataTries[string(rootHash)]; ok {
          return nil
      }

      dataTrie, err := trie.NewTrie(...)
      if err != nil {
          return err
      }

      trieSyncer, err := trie.NewTrieSyncer(arg)
      if err != nil {
          return err
      }

      u.trieSyncers[string(rootHash)] = trieSyncer
      return trieSyncer.StartSyncing(rootHash, ctx)
  }

The same pattern should be applied to:

  • data/syncer/userAccountsSyncer.go
  • data/syncer/kappAccountsSyncer.go
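A self-contained sketch of the defer-based release is given below. It is an illustration under stated assumptions rather than the project's patch: `slotThrottler`, `rootSyncer`, `syncRoot`, and `errSyncFailed` are hypothetical names, and the `work` callback stands in for trie creation and StartSyncing().

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// slotThrottler is a hypothetical bounded counter throttler.
type slotThrottler struct {
	counter int32
	max     int32
}

func (t *slotThrottler) CanProcess() bool { return atomic.LoadInt32(&t.counter) < t.max }
func (t *slotThrottler) StartProcessing() { atomic.AddInt32(&t.counter, 1) }
func (t *slotThrottler) EndProcessing()   { atomic.AddInt32(&t.counter, -1) }
func (t *slotThrottler) InUse() int32     { return atomic.LoadInt32(&t.counter) }

// errSyncFailed is a hypothetical error standing in for any failing step.
var errSyncFailed = errors.New("sync failed")

// rootSyncer is a hypothetical syncer that tracks already-synced roots.
type rootSyncer struct {
	mu        sync.Mutex
	synced    map[string]struct{}
	throttler *slotThrottler
}

func newRootSyncer(capacity int32) *rootSyncer {
	return &rootSyncer{
		synced:    make(map[string]struct{}),
		throttler: &slotThrottler{max: capacity},
	}
}

// syncRoot pairs StartProcessing with a deferred EndProcessing, so the slot
// is released on every return path, including errors.
func (s *rootSyncer) syncRoot(root string, work func() error) error {
	s.throttler.StartProcessing()
	defer s.throttler.EndProcessing()

	s.mu.Lock()
	_, done := s.synced[root]
	s.mu.Unlock() // released before the long-running work
	if done {
		return nil
	}

	if err := work(); err != nil {
		return err // slot is still released by the deferred call
	}

	s.mu.Lock()
	s.synced[root] = struct{}{}
	s.mu.Unlock()
	return nil
}

func main() {
	s := newRootSyncer(2)
	fail := func() error { return errSyncFailed }

	_ = s.syncRoot("root-1", fail)
	_ = s.syncRoot("root-2", fail)
	fmt.Println(s.throttler.InUse(), s.throttler.CanProcess()) // 0 true
}
```

One design choice in this sketch: the mutex is unlocked explicitly before the long-running call, as the original syncDataTrie() listing does, so that concurrent syncs are not serialized on the lock. Whether the real syncers need that property is an assumption to verify against the codebase.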

References

  • data/syncer/userAccountsSyncer.go
  • data/syncer/kappAccountsSyncer.go
  • data/trie/sync.go
  • core/throttler/numGoRoutinesThrottler.go
  • core/bootstrap/process.go
  • SECURITY.md

Affected products (1)

Patches (4)
785b77ccb271

[KLC-2434] bound REST slow-body reads and tighten header/body caps (GHSA-w4c6-7r69-w7j9)

https://github.com/klever-io/klever-go · Fernando Sobreira · May 31, 2026 · Fixed in 1.7.18 · via llm-release-walk
3 files changed · +270 -35
  • cmd/seednode/api/api_test.go · +38 -0 · modified
    @@ -1,7 +1,9 @@
     package api
     
     import (
    +	"bufio"
     	"encoding/json"
    +	"net"
     	"net/http"
     	"net/http/httptest"
     	"strings"
    @@ -10,6 +12,7 @@ import (
     
     	"github.com/gin-gonic/gin"
     	"github.com/klever-io/klever-go/core"
    +	"github.com/klever-io/klever-go/network/api/httpserver"
     )
     
     type stubMessenger struct {
    @@ -45,6 +48,41 @@ func setup(t *testing.T) (*gin.Engine, *stubMessenger) {
     	return r, stub
     }
     
    +// TestSeednodeAPI_HardenedServerDropsSlowHeader serves the real seednode routes through
    +// NewHardenedServer and confirms the seednode listener (the reporter's PoC path) drops a
    +// slow-header connection — GHSA-w4c6-7r69-w7j9, verified end-to-end, not just wired.
    +func TestSeednodeAPI_HardenedServerDropsSlowHeader(t *testing.T) {
    +	r, _ := setup(t)
    +
    +	ln, err := net.Listen("tcp", "127.0.0.1:0")
    +	if err != nil {
    +		t.Fatalf("listen: %v", err)
    +	}
    +
    +	srv := httpserver.NewHardenedServer(ln.Addr().String(), r.Handler())
    +	srv.ReadHeaderTimeout = 200 * time.Millisecond // tighten for a fast test
    +	go func() { _ = srv.Serve(ln) }()
    +	defer func() { _ = srv.Close() }()
    +
    +	conn, err := net.Dial("tcp", ln.Addr().String())
    +	if err != nil {
    +		t.Fatalf("dial: %v", err)
    +	}
    +	defer func() { _ = conn.Close() }()
    +
    +	// Partial header, never terminated: the seednode listener must drop it.
    +	if _, err := conn.Write([]byte("GET /node/status HTTP/1.1\r\nHost: x\r\n")); err != nil {
    +		t.Fatalf("write: %v", err)
    +	}
    +
    +	start := time.Now()
    +	_ = conn.SetReadDeadline(time.Now().Add(3 * time.Second))
    +	_, _ = bufio.NewReader(conn).ReadString('\n')
    +	if elapsed := time.Since(start); elapsed > time.Second {
    +		t.Fatalf("slow-header connection not dropped promptly: %v", elapsed)
    +	}
    +}
    +
     func TestPeers_returnsCountsAndSortedAddresses(t *testing.T) {
     	r, _ := setup(t)
     
    
  • network/api/httpserver/httpserver.go · +74 -33 · modified
    @@ -1,61 +1,102 @@
    -// Package httpserver builds the hardened *http.Server shared by both REST start
    -// paths (seednode and node). Gin's Engine.Run uses http.ListenAndServe with no
    -// ReadHeaderTimeout, leaving it open to slow-header connection exhaustion
    -// (GHSA-w4c6-7r69-w7j9); this helper hardens both listeners identically.
    +// Package httpserver builds the hardened *http.Server shared by both REST start paths
    +// (seednode and node). Gin's Engine.Run uses http.ListenAndServe with no
    +// ReadHeaderTimeout, leaving it open to slow-header exhaustion (GHSA-w4c6-7r69-w7j9).
     package httpserver
     
     import (
    +	"io"
     	"net/http"
    +	"sync"
     	"time"
     )
     
     const (
    -	// ReadHeaderTimeout is the slow-header (slowloris) mitigation: it bounds the
    -	// time to send the complete header. Header-only, so it is safe for the
    -	// long-lived websocket streams these APIs serve (cleared before hijack).
    +	// ReadHeaderTimeout bounds header read time — the slow-header (slowloris) fix.
     	ReadHeaderTimeout = 10 * time.Second
     
    -	// IdleTimeout bounds how long an idle keep-alive connection stays open.
    +	// BodyReadTimeout bounds body read time — the slow-body fix.
    +	BodyReadTimeout = 30 * time.Second
    +
    +	// IdleTimeout bounds idle keep-alive connections.
     	IdleTimeout = 120 * time.Second
     
    -	// MaxHeaderBytes caps request header size (Go's default, set explicitly).
    -	MaxHeaderBytes = 1 << 20 // 1 MiB
    +	// MaxHeaderBytes caps total request header size (Go's 1 MiB default is a no-op).
    +	MaxHeaderBytes = 32 << 10 // 32 KiB
     
    -	// MaxBodyBytes caps the request body. A single tx is bounded by the ~960 KiB
    -	// P2P wire limit (~1.9 MiB once JSON-encoded), so 4 MiB covers the largest
    -	// legitimate request with margin; bulk /transaction/broadcast is additionally
    -	// bounded by an explicit tx count. Over-cap bodies are refused (400 on bind,
    -	// 413 raw). Bounds body size, not read time — see the body read-deadline follow-up.
    -	MaxBodyBytes = 4 << 20 // 4 MiB
    +	// MaxBodyBytes caps the request body; over-cap bodies are refused (400/413).
    +	MaxBodyBytes = 2 << 20 // 2 MiB
     )
     
    -// NewHardenedServer returns an *http.Server for addr serving handler, hardened
    -// against slow-header exhaustion and oversized bodies. ReadTimeout/WriteTimeout
    -// are left unset on purpose: a whole-connection deadline would sever the
    -// long-lived websocket streams these APIs serve (/log, /subscribe).
    +// NewHardenedServer returns an *http.Server hardened against slow-header/slow-body
    +// exhaustion. ReadTimeout/WriteTimeout are left unset: a whole-connection deadline
    +// would sever the long-lived websocket streams these APIs serve (/log, /subscribe).
     func NewHardenedServer(addr string, handler http.Handler) *http.Server {
     	return &http.Server{
     		Addr:              addr,
    -		Handler:           limitRequestBody(handler),
    +		Handler:           guardRequestBody(handler),
     		ReadHeaderTimeout: ReadHeaderTimeout,
     		IdleTimeout:       IdleTimeout,
     		MaxHeaderBytes:    MaxHeaderBytes,
     	}
     }
     
    -// limitRequestBody caps every request body at MaxBodyBytes.
    -func limitRequestBody(next http.Handler) http.Handler {
    -	return limitRequestBodyN(next, MaxBodyBytes)
    -}
    -
    -// limitRequestBodyN caps each request body at limit bytes. Applied ahead of gin so
    -// w is the *http.response MaxBytesReader needs to close an over-cap connection.
    -// Websocket upgrades hijack the connection and never read r.Body, so are unaffected.
    -func limitRequestBodyN(next http.Handler, limit int64) http.Handler {
    +// guardRequestBody caps body size (MaxBodyBytes) and body read time (BodyReadTimeout).
    +// Applied ahead of gin so w is the *http.response both mechanisms need.
    +func guardRequestBody(next http.Handler) http.Handler {
     	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
    -		if r.Body != nil {
    -			r.Body = http.MaxBytesReader(w, r.Body, limit)
    -		}
    +		capRequestBody(w, r, MaxBodyBytes)
    +		applyBodyReadDeadline(w, r, BodyReadTimeout)
     		next.ServeHTTP(w, r)
     	})
     }
    +
    +// capRequestBody limits r.Body to limit bytes.
    +func capRequestBody(w http.ResponseWriter, r *http.Request, limit int64) {
    +	if r.Body != nil {
    +		r.Body = http.MaxBytesReader(w, r.Body, limit)
    +	}
    +}
    +
    +// applyBodyReadDeadline bounds body read time at d, clearing the deadline once the body
    +// is read so it never cancels a slow post-read handler. Bodiless requests are skipped —
    +// that covers the websocket handshakes (/log, /subscribe), which are bodiless GETs, and
    +// leaves no Upgrade-header check for a body-bearing request to spoof.
    +func applyBodyReadDeadline(w http.ResponseWriter, r *http.Request, d time.Duration) {
    +	if r.Body == nil || !requestHasBody(r) {
    +		return
    +	}
    +	rc := http.NewResponseController(w)
    +	if rc.SetReadDeadline(time.Now().Add(d)) != nil {
    +		return
    +	}
    +	r.Body = &deadlineClearingBody{ReadCloser: r.Body, rc: rc}
    +}
    +
    +func requestHasBody(r *http.Request) bool {
    +	return r.ContentLength != 0 // 0 = no body; -1 (chunked) and positive = body
    +}
    +
    +// deadlineClearingBody clears the read deadline once the body read completes (EOF or
    +// error) or the body is closed, so the deadline never cancels a later slow handler.
    +type deadlineClearingBody struct {
    +	io.ReadCloser
    +	rc        *http.ResponseController
    +	clearOnce sync.Once
    +}
    +
    +func (b *deadlineClearingBody) clear() {
    +	b.clearOnce.Do(func() { _ = b.rc.SetReadDeadline(time.Time{}) })
    +}
    +
    +func (b *deadlineClearingBody) Read(p []byte) (int, error) {
    +	n, err := b.ReadCloser.Read(p)
    +	if err != nil {
    +		b.clear()
    +	}
    +	return n, err
    +}
    +
    +func (b *deadlineClearingBody) Close() error {
    +	b.clear()
    +	return b.ReadCloser.Close()
    +}
    
  • network/api/httpserver/httpserver_test.go · +158 -2 · modified
    @@ -11,6 +11,7 @@ import (
     	"testing"
     	"time"
     
    +	"github.com/gorilla/websocket"
     	"github.com/stretchr/testify/require"
     )
     
    @@ -125,13 +126,14 @@ func TestNewHardenedServer_CapsRequestBody(t *testing.T) {
     
     	// Status carries the read outcome, so assertions read only the response code —
     	// no state shared across the server/test goroutines.
    -	handler := limitRequestBodyN(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
    +	handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
    +		capRequestBody(w, r, limit)
     		if _, err := io.ReadAll(r.Body); err != nil {
     			w.WriteHeader(http.StatusRequestEntityTooLarge)
     			return
     		}
     		w.WriteHeader(http.StatusOK)
    -	}), limit)
    +	})
     
     	srv := httptest.NewServer(handler)
     	defer srv.Close()
    @@ -148,3 +150,157 @@ func TestNewHardenedServer_CapsRequestBody(t *testing.T) {
     	_ = resp.Body.Close()
     	require.Equal(t, http.StatusOK, resp.StatusCode, "body within the cap must be accepted")
     }
    +
    +// TestApplyBodyReadDeadline_CutsSlowBody: a client that completes the header, promises
    +// a large body, then stalls is cut at ~BodyReadTimeout instead of pinning the connection.
    +func TestApplyBodyReadDeadline_CutsSlowBody(t *testing.T) {
    +	t.Parallel()
    +
    +	const deadline = 300 * time.Millisecond
    +
    +	readDone := make(chan error, 1)
    +	handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
    +		applyBodyReadDeadline(w, r, deadline)
    +		_, err := io.ReadAll(r.Body)
    +		readDone <- err
    +	})
    +
    +	srv := httptest.NewServer(handler)
    +	defer srv.Close()
    +
    +	conn, err := net.Dial("tcp", srv.Listener.Addr().String())
    +	require.Nil(t, err)
    +	defer func() { _ = conn.Close() }()
    +
    +	// Promise 1 MiB but send only a few bytes, then stall — the body read blocks.
    +	_, err = fmt.Fprint(conn, "POST / HTTP/1.1\r\nHost: x\r\nContent-Length: 1048576\r\n\r\npartial")
    +	require.Nil(t, err)
    +
    +	start := time.Now()
    +	select {
    +	case err := <-readDone:
    +		require.Error(t, err, "a stalled body read must be cut by the deadline")
    +		require.Less(t, time.Since(start), 2*time.Second, "body must be cut promptly (~BodyReadTimeout)")
    +	case <-time.After(2 * time.Second):
    +		t.Fatal("stalled body read was not cut by the deadline")
    +	}
    +}
    +
    +// TestApplyBodyReadDeadline_DoesNotCancelSlowHandler: a handler that reads the body then
    +// works past the deadline still returns 200 — the deadline bounds the body read, not
    +// post-read CPU work (e.g. a VM query) or the response write. (clear() has no observable
    +// effect to assert in standard net/http, so this checks the property that matters.)
    +func TestApplyBodyReadDeadline_DoesNotCancelSlowHandler(t *testing.T) {
    +	t.Parallel()
    +
    +	const deadline = 200 * time.Millisecond
    +
    +	handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
    +		applyBodyReadDeadline(w, r, deadline)
    +		if _, err := io.ReadAll(r.Body); err != nil {
    +			w.WriteHeader(http.StatusInternalServerError)
    +			return
    +		}
    +		time.Sleep(3 * deadline) // work well past the deadline
    +		w.WriteHeader(http.StatusOK)
    +	})
    +
    +	srv := httptest.NewServer(handler)
    +	defer srv.Close()
    +
    +	resp, err := http.Post(srv.URL, "text/plain", bytes.NewReader([]byte("hello")))
    +	require.Nil(t, err)
    +	_ = resp.Body.Close()
    +	require.Equal(t, http.StatusOK, resp.StatusCode,
    +		"the body deadline must not cancel a slow post-read handler")
    +}
    +
    +// TestRequestHasBody covers the predicate that gates the body deadline: bodiless requests
    +// (including the bodiless-GET websocket handshakes) are skipped.
    +func TestRequestHasBody(t *testing.T) {
    +	t.Parallel()
    +
    +	require.False(t, requestHasBody(&http.Request{ContentLength: 0}), "no body (incl. websocket handshake)")
    +	require.True(t, requestHasBody(&http.Request{ContentLength: 5}), "known body")
    +	require.True(t, requestHasBody(&http.Request{ContentLength: -1}), "chunked body")
    +}
    +
    +// TestApplyBodyReadDeadline_SkipsBodilessUpgrade: a bodiless GET with Upgrade: websocket
    +// is not wrapped (no deadline armed), so the /log and /subscribe streams are never severed.
    +func TestApplyBodyReadDeadline_SkipsBodilessUpgrade(t *testing.T) {
    +	t.Parallel()
    +
    +	handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
    +		before := r.Body
    +		applyBodyReadDeadline(w, r, BodyReadTimeout)
    +		// r.Body must be unchanged — a wrapped body would mean a deadline was armed.
    +		if r.Body != before {
    +			w.WriteHeader(http.StatusInternalServerError)
    +			return
    +		}
    +		w.WriteHeader(http.StatusOK)
    +	})
    +
    +	srv := httptest.NewServer(handler)
    +	defer srv.Close()
    +
    +	req, err := http.NewRequest(http.MethodGet, srv.URL, nil) // bodiless GET, like a handshake
    +	require.Nil(t, err)
    +	req.Header.Set("Connection", "Upgrade")
    +	req.Header.Set("Upgrade", "websocket")
    +
    +	resp, err := http.DefaultClient.Do(req)
    +	require.Nil(t, err)
    +	_ = resp.Body.Close()
    +	require.Equal(t, http.StatusOK, resp.StatusCode, "bodiless websocket handshake must not be wrapped")
    +}
    +
    +// TestNewHardenedServer_WebSocketStreamWorks: a real websocket upgrade survives the
    +// hardened server — frames round-trip across idle gaps without the stream being severed.
    +func TestNewHardenedServer_WebSocketStreamWorks(t *testing.T) {
    +	t.Parallel()
    +
    +	upgrader := websocket.Upgrader{CheckOrigin: func(*http.Request) bool { return true }}
    +	mux := http.NewServeMux()
    +	mux.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
    +		conn, err := upgrader.Upgrade(w, r, nil)
    +		if err != nil {
    +			return
    +		}
    +		defer func() { _ = conn.Close() }()
    +		for { // echo until the client closes
    +			mt, msg, err := conn.ReadMessage()
    +			if err != nil {
    +				return
    +			}
    +			if err := conn.WriteMessage(mt, msg); err != nil {
    +				return
    +			}
    +		}
    +	})
    +
    +	ln, err := net.Listen("tcp", "127.0.0.1:0")
    +	require.Nil(t, err)
    +	srv := NewHardenedServer(ln.Addr().String(), mux)
    +	go func() { _ = srv.Serve(ln) }()
    +	defer func() { _ = srv.Close() }()
    +
    +	dialer := websocket.Dialer{}
    +	wsURL := "ws://" + ln.Addr().String() + "/ws"
    +	conn, resp, err := dialer.Dial(wsURL, nil)
    +	require.Nil(t, err, "websocket upgrade must succeed through the hardened server")
    +	require.Equal(t, http.StatusSwitchingProtocols, resp.StatusCode)
    +	defer func() { _ = conn.Close() }()
    +
    +	// Exchange frames with an idle gap between them to show the stream stays open and is
    +	// not cut by ReadHeaderTimeout/IdleTimeout/body deadline once upgraded.
    +	for i, gap := range []time.Duration{0, 250 * time.Millisecond, 250 * time.Millisecond} {
    +		time.Sleep(gap)
    +		want := fmt.Sprintf("frame-%d", i)
    +		require.Nil(t, conn.WriteMessage(websocket.TextMessage, []byte(want)))
    +		_ = conn.SetReadDeadline(time.Now().Add(2 * time.Second))
    +		_, got, err := conn.ReadMessage()
    +		require.Nil(t, err, "frame %d must round-trip", i)
    +		require.Equal(t, want, string(got))
    +	}
    +}
    
a8d6682f8009

[KLC-2428] harden REST listeners against slow-header DoS (GHSA-w4c6-7r69-w7j9)

https://github.com/klever-io/klever-go · Fernando Sobreira · May 30, 2026 · Fixed in 1.7.18 · via llm-release-walk
6 files changed · +305 -2
  • cmd/seednode/api/api.go · +4 -1 · modified
    @@ -12,6 +12,7 @@ import (
     	"github.com/gorilla/websocket"
     	logger "github.com/klever-io/klever-go-logger"
     	"github.com/klever-io/klever-go/core"
    +	"github.com/klever-io/klever-go/network/api/httpserver"
     	"github.com/klever-io/klever-go/network/api/logs"
     	"github.com/klever-io/klever-go/tools/marshal"
     )
    @@ -56,7 +57,9 @@ func Start(restAPIInterface string, marshalizer marshal.Marshalizer, messenger p
     
     	srv.registerRoutes(ws)
     
    -	return ws.Run(restAPIInterface)
    +	// Hardened http.Server instead of ws.Run: adds the ReadHeaderTimeout that
    +	// http.ListenAndServe lacks (slow-header DoS, GHSA-w4c6-7r69-w7j9).
    +	return httpserver.NewHardenedServer(restAPIInterface, ws.Handler()).ListenAndServe()
     }
     
     func (s *server) registerRoutes(ws *gin.Engine) {
    
  • network/api/api.go · +4 -1 · modified
    @@ -14,6 +14,7 @@ import (
     	ginSwagger "github.com/swaggo/gin-swagger"
     
     	indexer "github.com/klever-io/klever-go/indexer"
    +	"github.com/klever-io/klever-go/network/api/httpserver"
     	clientSocket "github.com/klever-io/klever-go/websocket"
     
     	"github.com/gin-contrib/cors"
    @@ -95,7 +96,9 @@ func Start(ctx context.Context, kleverFacade MainAPIHandler, routesConfig config
     	ws.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler, ginSwagger.InstanceName(docs.SwaggerInfonode.InstanceName())))
     	RegisterRoutes(ctx, ws, routesConfig, kleverFacade)
     
    -	return ws.Run(kleverFacade.RestAPIInterface())
    +	// Hardened http.Server instead of ws.Run: adds the ReadHeaderTimeout that
    +	// http.ListenAndServe lacks (slow-header DoS, GHSA-w4c6-7r69-w7j9).
    +	return httpserver.NewHardenedServer(kleverFacade.RestAPIInterface(), ws.Handler()).ListenAndServe()
     }
     
     func registerRouteGroup(ws *gin.Engine, name string, routesConfig config.APIRoutesConfig, authHandler gin.HandlerFunc, register func(*wrapper.RouterWrapper)) {
    
  • network/api/httpserver/httpserver.go · +61 -0 · added
    @@ -0,0 +1,61 @@
    +// Package httpserver builds the hardened *http.Server shared by both REST start
    +// paths (seednode and node). Gin's Engine.Run uses http.ListenAndServe with no
    +// ReadHeaderTimeout, leaving it open to slow-header connection exhaustion
    +// (GHSA-w4c6-7r69-w7j9); this helper hardens both listeners identically.
    +package httpserver
    +
    +import (
    +	"net/http"
    +	"time"
    +)
    +
    +const (
    +	// ReadHeaderTimeout is the slow-header (slowloris) mitigation: it bounds the
    +	// time to send the complete header. Header-only, so it is safe for the
    +	// long-lived websocket streams these APIs serve (cleared before hijack).
    +	ReadHeaderTimeout = 10 * time.Second
    +
    +	// IdleTimeout bounds how long an idle keep-alive connection stays open.
    +	IdleTimeout = 120 * time.Second
    +
    +	// MaxHeaderBytes caps request header size (Go's default, set explicitly).
    +	MaxHeaderBytes = 1 << 20 // 1 MiB
    +
    +	// MaxBodyBytes caps the request body. A single tx is bounded by the ~960 KiB
    +	// P2P wire limit (~1.9 MiB once JSON-encoded), so 4 MiB covers the largest
    +	// legitimate request with margin; bulk /transaction/broadcast is additionally
    +	// bounded by an explicit tx count. Over-cap bodies are refused (400 on bind,
    +	// 413 raw). Bounds body size, not read time — see the body read-deadline follow-up.
    +	MaxBodyBytes = 4 << 20 // 4 MiB
    +)
    +
    +// NewHardenedServer returns an *http.Server for addr serving handler, hardened
    +// against slow-header exhaustion and oversized bodies. ReadTimeout/WriteTimeout
    +// are left unset on purpose: a whole-connection deadline would sever the
    +// long-lived websocket streams these APIs serve (/log, /subscribe).
    +func NewHardenedServer(addr string, handler http.Handler) *http.Server {
    +	return &http.Server{
    +		Addr:              addr,
    +		Handler:           limitRequestBody(handler),
    +		ReadHeaderTimeout: ReadHeaderTimeout,
    +		IdleTimeout:       IdleTimeout,
    +		MaxHeaderBytes:    MaxHeaderBytes,
    +	}
    +}
    +
    +// limitRequestBody caps every request body at MaxBodyBytes.
    +func limitRequestBody(next http.Handler) http.Handler {
    +	return limitRequestBodyN(next, MaxBodyBytes)
    +}
    +
    +// limitRequestBodyN caps each request body at limit bytes. Applied ahead of gin so
    +// w is the *http.response MaxBytesReader needs to close an over-cap connection.
    +// Websocket upgrades hijack the connection and never read r.Body, so are unaffected.
    +func limitRequestBodyN(next http.Handler, limit int64) http.Handler {
    +	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
    +		if r.Body != nil {
    +			r.Body = http.MaxBytesReader(w, r.Body, limit)
    +		}
    +		next.ServeHTTP(w, r)
    +	})
    +}
    
  • network/api/httpserver/httpserver_test.go · +150 -0 · added
    @@ -0,0 +1,150 @@
    +package httpserver
    +
    +import (
    +	"bufio"
    +	"bytes"
    +	"fmt"
    +	"io"
    +	"net"
    +	"net/http"
    +	"net/http/httptest"
    +	"testing"
    +	"time"
    +
    +	"github.com/stretchr/testify/require"
    +)
    +
    +// TestNewHardenedServer_SetsSlowHeaderDefenses guards GHSA-w4c6-7r69-w7j9: the
    +// hardening fields are set, and ReadTimeout/WriteTimeout stay zero so the APIs'
    +// long-lived websocket streams are not severed.
    +func TestNewHardenedServer_SetsSlowHeaderDefenses(t *testing.T) {
    +	t.Parallel()
    +
    +	srv := NewHardenedServer(":8080", http.NewServeMux())
    +
    +	require.Equal(t, ":8080", srv.Addr)
    +	require.NotNil(t, srv.Handler)
    +	require.Equal(t, ReadHeaderTimeout, srv.ReadHeaderTimeout)
    +	require.Positive(t, srv.ReadHeaderTimeout, "ReadHeaderTimeout must be set to defeat slow-header DoS")
    +	require.Equal(t, IdleTimeout, srv.IdleTimeout)
    +	require.Equal(t, MaxHeaderBytes, srv.MaxHeaderBytes)
    +
    +	// Websocket safety: a whole-connection deadline would kill long-lived streams.
    +	require.Zero(t, srv.ReadTimeout, "ReadTimeout must stay unset to not sever websocket streams")
    +	require.Zero(t, srv.WriteTimeout, "WriteTimeout must stay unset to not sever websocket streams")
    +}
    +
    +// TestNewHardenedServer_ClosesSlowHeaderConnection proves end-to-end that a connection
    +// whose header never completes is dropped, not pinned. Short ReadHeaderTimeout for speed.
    +func TestNewHardenedServer_ClosesSlowHeaderConnection(t *testing.T) {
    +	t.Parallel()
    +
    +	ln, err := net.Listen("tcp", "127.0.0.1:0")
    +	require.Nil(t, err)
    +
    +	srv := NewHardenedServer(ln.Addr().String(), http.NewServeMux())
    +	srv.ReadHeaderTimeout = 200 * time.Millisecond // tighten for a fast test
    +	go func() { _ = srv.Serve(ln) }()
    +	defer func() { _ = srv.Close() }()
    +
    +	conn, err := net.Dial("tcp", ln.Addr().String())
    +	require.Nil(t, err)
    +	defer func() { _ = conn.Close() }()
    +
    +	// Send headers but never the terminating blank line, so the header never completes.
    +	_, err = fmt.Fprint(conn, "GET / HTTP/1.1\r\nHost: x\r\n")
    +	require.Nil(t, err)
    +
    +	// With ReadHeaderTimeout the server drops the connection ~200ms in; a sub-second
    +	// return proves the defense (without it, the read blocks until our 3s deadline).
    +	start := time.Now()
    +	_ = conn.SetReadDeadline(time.Now().Add(3 * time.Second))
    +	_, _ = bufio.NewReader(conn).ReadString('\n')
    +	require.Less(t, time.Since(start), time.Second,
    +		"server must drop the stalled slow-header connection promptly (ReadHeaderTimeout)")
    +}
    +
    +// TestNewHardenedServer_DropsDrippingSlowHeader proves ReadHeaderTimeout is an absolute
    +// deadline, not a reset-on-read idle timer: a client actively dripping header bytes
    +// (never completing the header) is still dropped at ~ReadHeaderTimeout.
    +func TestNewHardenedServer_DropsDrippingSlowHeader(t *testing.T) {
    +	t.Parallel()
    +
    +	const headerTimeout = 400 * time.Millisecond
    +
    +	ln, err := net.Listen("tcp", "127.0.0.1:0")
    +	require.Nil(t, err)
    +
    +	srv := NewHardenedServer(ln.Addr().String(), http.NewServeMux())
    +	srv.ReadHeaderTimeout = headerTimeout // tighten for a fast test
    +	go func() { _ = srv.Serve(ln) }()
    +	defer func() { _ = srv.Close() }()
    +
    +	conn, err := net.Dial("tcp", ln.Addr().String())
    +	require.Nil(t, err)
    +	defer func() { _ = conn.Close() }()
    +
    +	_, err = fmt.Fprint(conn, "GET / HTTP/1.1\r\nHost: x\r\n")
    +	require.Nil(t, err)
    +
    +	// Drip header lines well under the timeout interval, never ending the header.
    +	dripStop := make(chan struct{})
    +	go func() {
    +		ticker := time.NewTicker(headerTimeout / 8)
    +		defer ticker.Stop()
    +		for i := 0; ; i++ {
    +			select {
    +			case <-dripStop:
    +				return
    +			case <-ticker.C:
    +				if _, werr := fmt.Fprintf(conn, "X-Pad-%d: y\r\n", i); werr != nil {
    +					return // server closed the connection
    +				}
    +			}
    +		}
    +	}()
    +	defer close(dripStop)
    +
    +	start := time.Now()
    +	_ = conn.SetReadDeadline(time.Now().Add(3 * time.Second))
    +	_, _ = bufio.NewReader(conn).ReadString('\n')
    +	elapsed := time.Since(start)
    +
    +	require.Less(t, elapsed, time.Second,
    +		"absolute ReadHeaderTimeout must drop an actively-dripping slow-header connection")
    +	require.GreaterOrEqual(t, elapsed, headerTimeout/2,
    +		"connection should survive until ~the header deadline, not be dropped on the first read")
    +}
    +
    +// TestNewHardenedServer_CapsRequestBody guards the body-size cap: a body over the
    +// limit is refused, one within it is read normally. Small explicit limit for speed.
    +func TestNewHardenedServer_CapsRequestBody(t *testing.T) {
    +	t.Parallel()
    +
    +	const limit = 16
    +
    +	// Status carries the read outcome, so assertions read only the response code —
    +	// no state shared across the server/test goroutines.
    +	handler := limitRequestBodyN(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
    +		if _, err := io.ReadAll(r.Body); err != nil {
    +			w.WriteHeader(http.StatusRequestEntityTooLarge)
    +			return
    +		}
    +		w.WriteHeader(http.StatusOK)
    +	}), limit)
    +
    +	srv := httptest.NewServer(handler)
    +	defer srv.Close()
    +
    +	// Over the cap: MaxBytesReader fails the body read, so the handler replies 413.
    +	resp, err := http.Post(srv.URL, "application/octet-stream", bytes.NewReader(make([]byte, limit+1)))
    +	require.Nil(t, err)
    +	_ = resp.Body.Close()
    +	require.Equal(t, http.StatusRequestEntityTooLarge, resp.StatusCode, "body over the cap must be refused")
    +
    +	// Within the cap: the body reads cleanly and the handler replies 200.
    +	resp, err = http.Post(srv.URL, "application/octet-stream", bytes.NewReader(make([]byte, limit)))
    +	require.Nil(t, err)
    +	_ = resp.Body.Close()
    +	require.Equal(t, http.StatusOK, resp.StatusCode, "body within the cap must be accepted")
    +}
    
  • network/api/transaction/routes.go+18 0 modified
    @@ -41,6 +41,11 @@ const (
     	queryParamWithResults     = "withResults"
     )
     
    +// maxBulkBroadcastTxs caps the transactions a single /transaction/broadcast may
    +// carry, bounding per-request work by intent rather than incidentally by body size.
    +// Clients needing to submit more should send multiple requests.
    +const maxBulkBroadcastTxs = 100
    +
     // FacadeHandler interface defines methods that can be used by the gin webserver
     type FacadeHandler interface {
     	CreateTransaction(txType uint32, base *transaction.TXBaseInfo, contracts []json.RawMessage, skipValidate bool) (*transaction.Transaction, []byte, error)
    @@ -285,6 +290,19 @@ func BroadcastTX(c *gin.Context) {
     		return
     	}
     
    +	if len(gtx.TXs) > maxBulkBroadcastTxs {
    +		c.JSON(
    +			http.StatusBadRequest,
    +			shared.GenericAPIResponse{
    +				Data: nil,
    +				Error: fmt.Sprintf("%s: bulk broadcast exceeds the maximum of %d transactions per request, got %d",
    +					errors.ErrValidation.Error(), maxBulkBroadcastTxs, len(gtx.TXs)),
    +				Code: shared.ReturnCodeRequestError,
    +			},
    +		)
    +		return
    +	}
    +
     	txsHashes, err := facade.SendBulkTransactions(gtx.TXs)
     
     	if err != nil {
    
  • network/api/transaction/routes_test.go+68 0 modified
    @@ -508,6 +508,74 @@ func TestBroadcastTX_BulkTransactions_ShouldWork(t *testing.T) {
     	}
     }
     
    +func makeBroadcastTxs(n int) []*transaction.Transaction {
    +	txs := make([]*transaction.Transaction, n)
    +	for i := 0; i < n; i++ {
    +		txs[i] = transaction.NewBaseTransaction([]byte("sender"), uint64(i), nil, 0, 0)
    +	}
    +	return txs
    +}
    +
    +func TestBroadcastTX_BulkTransactions_ExceedsLimitShouldError(t *testing.T) {
    +	t.Parallel()
    +
    +	facade := mock.Facade{
    +		SendBulkTransactionsHandler: func(txs []*transaction.Transaction) ([]string, error) {
    +			require.Fail(t, "SendBulkTransactions must not be called when the batch exceeds the limit")
    +			return nil, nil
    +		},
    +	}
    +
    +	ws := startNodeServer(&facade)
    +
    +	requestData := tr.BroadcastTXRequest{
    +		TXs: makeBroadcastTxs(101),
    +	}
    +	requestBytes, _ := json.Marshal(requestData)
    +
    +	req, _ := http.NewRequest("POST", "/transaction/broadcast", bytes.NewReader(requestBytes))
    +	resp := httptest.NewRecorder()
    +	ws.ServeHTTP(resp, req)
    +
    +	response := shared.GenericAPIResponse{}
    +	loadResponse(resp.Body, &response)
    +
    +	assert.Equal(t, http.StatusBadRequest, resp.Code)
    +	assert.Equal(t, shared.ReturnCodeRequestError, response.Code)
    +	assert.True(t, strings.Contains(response.Error, apiErrors.ErrValidation.Error()))
    +	assert.True(t, strings.Contains(response.Error, "maximum of 100"))
    +	assert.Nil(t, response.Data)
    +}
    +
    +func TestBroadcastTX_BulkTransactions_AtLimitShouldWork(t *testing.T) {
    +	t.Parallel()
    +
    +	facade := mock.Facade{
    +		SendBulkTransactionsHandler: func(txs []*transaction.Transaction) ([]string, error) {
    +			require.Len(t, txs, 100)
    +			return make([]string, len(txs)), nil
    +		},
    +	}
    +
    +	ws := startNodeServer(&facade)
    +
    +	requestData := tr.BroadcastTXRequest{
    +		TXs: makeBroadcastTxs(100),
    +	}
    +	requestBytes, _ := json.Marshal(requestData)
    +
    +	req, _ := http.NewRequest("POST", "/transaction/broadcast", bytes.NewReader(requestBytes))
    +	resp := httptest.NewRecorder()
    +	ws.ServeHTTP(resp, req)
    +
    +	response := shared.GenericAPIResponse{}
    +	loadResponse(resp.Body, &response)
    +
    +	assert.Equal(t, http.StatusOK, resp.Code)
    +	assert.Equal(t, shared.ReturnCodeSuccess, response.Code)
    +	assert.Empty(t, response.Error)
    +}
    +
     func TestBroadcastTX_BulkTransactions_SendBulkTransactionsError(t *testing.T) {
     	t.Parallel()
     
    
25ba0ba4ae70

Merge commit from fork

https://github.com/klever-io/klever-go · Fernando Sobreira · May 29, 2026 · Fixed in 1.7.18 · via llm-release-walk
20 files changed · +1031 61
  • common/errors.go+12 0 modified
    @@ -784,6 +784,18 @@ var ErrDecompressionTooLarge = errors.New("decompressed payload exceeds maximum
     // ErrDecompressedSizeMismatch signals that the inflated payload size does not match the advertised DataSize
     var ErrDecompressedSizeMismatch = errors.New("decompressed payload size does not match advertised data size")
     
    +// ErrTooManyItemsInBatch signals that a received Batch carries more items than allowed
    +var ErrTooManyItemsInBatch = errors.New("too many items in batch")
    +
    +// ErrBatchWireTooLarge signals that an incoming Batch wire payload exceeds the
    +// pre-Unmarshal byte cap (MaxBatchWireSize / MaxHashArrayBuffSize).
    +// Distinct from ErrTooManyItemsInBatch so logs / metrics can tell a byte-size
    +// rejection apart from an entry-count rejection.
    +var ErrBatchWireTooLarge = errors.New("batch wire payload too large")
    +
    +// ErrProcessReceivedMessagePanicked signals that ProcessReceivedMessage recovered from a panic
    +var ErrProcessReceivedMessagePanicked = errors.New("process received message panicked")
    +
     // ErrInvalidParameter signals that a wrong parameter has been provided
     var ErrInvalidParameter = errors.New("invalid parameter")
     
    
  • common/mock/throttlerStub.go+16 0 modified
    @@ -1,12 +1,16 @@
     package mock
     
    +import "sync/atomic"
    +
     // ThrottlerStub -
     type ThrottlerStub struct {
     	CanProcessCalled      func() bool
     	StartProcessingCalled func()
     	EndProcessingCalled   func()
     	StartWasCalled        bool
     	EndWasCalled          bool
    +	startProcessingCount  int32
    +	endProcessingCount    int32
     }
     
     // CanProcess -
    @@ -21,6 +25,7 @@ func (ts *ThrottlerStub) CanProcess() bool {
     // StartProcessing -
     func (ts *ThrottlerStub) StartProcessing() {
     	ts.StartWasCalled = true
    +	atomic.AddInt32(&ts.startProcessingCount, 1)
     	if ts.StartProcessingCalled != nil {
     		ts.StartProcessingCalled()
     	}
    @@ -29,11 +34,22 @@ func (ts *ThrottlerStub) StartProcessing() {
     // EndProcessing -
     func (ts *ThrottlerStub) EndProcessing() {
     	ts.EndWasCalled = true
    +	atomic.AddInt32(&ts.endProcessingCount, 1)
     	if ts.EndProcessingCalled != nil {
     		ts.EndProcessingCalled()
     	}
     }
     
    +// StartProcessingCount returns the number of StartProcessing invocations.
    +func (ts *ThrottlerStub) StartProcessingCount() int32 {
    +	return atomic.LoadInt32(&ts.startProcessingCount)
    +}
    +
    +// EndProcessingCount returns the number of EndProcessing invocations.
    +func (ts *ThrottlerStub) EndProcessingCount() int32 {
    +	return atomic.LoadInt32(&ts.endProcessingCount)
    +}
    +
     // IsInterfaceNil -
     func (ts *ThrottlerStub) IsInterfaceNil() bool {
     	return ts == nil
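The Start/End counters added to the stub make slot leaks observable in tests. A minimal sketch of the idea, assuming a hypothetical `countingThrottler` and two toy sync functions (none of these names come from klever-go): the leaky variant returns on error without releasing, while the deferred variant releases on every path.

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// countingThrottler is a hypothetical throttler that only counts Start/End calls.
type countingThrottler struct {
	starts int32
	ends   int32
}

func (c *countingThrottler) StartProcessing() { atomic.AddInt32(&c.starts, 1) }
func (c *countingThrottler) EndProcessing()   { atomic.AddInt32(&c.ends, 1) }

// Leaked reports how many acquired slots were never released.
func (c *countingThrottler) Leaked() int32 {
	return atomic.LoadInt32(&c.starts) - atomic.LoadInt32(&c.ends)
}

var errSync = errors.New("sync failed")

// syncWithDefer releases the slot on every return path, including errors.
func syncWithDefer(t *countingThrottler, fail bool) error {
	t.StartProcessing()
	defer t.EndProcessing()
	if fail {
		return errSync
	}
	return nil
}

// syncLeaky mirrors the failure mode: the error path returns without releasing.
func syncLeaky(t *countingThrottler, fail bool) error {
	t.StartProcessing()
	if fail {
		return errSync
	}
	t.EndProcessing()
	return nil
}

func main() {
	good, bad := &countingThrottler{}, &countingThrottler{}
	for i := 0; i < 2; i++ {
		_ = syncWithDefer(good, true)
		_ = syncLeaky(bad, true)
	}
	fmt.Println("leaked with defer:", good.Leaked())
	fmt.Println("leaked without defer:", bad.Leaked())
}
```

Asserting that the Start and End counts match after an error path is the same check the regression tests in this commit perform against the stub.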
    
  • core/partitioning/simpleDataPacker.go+5 2 modified
    @@ -57,9 +57,12 @@ func (sdp *SimpleDataPacker) PackDataInChunks(data [][]byte, limit int) ([][]byt
     			ba := &batch.Batch{Data: currentChunk}
     			err := ba.Compress(sdp.marshalizer)
     			if err != nil {
    -				continue
    +				return nil, err
    +			}
    +			marshaledChunk, err := sdp.marshalizer.Marshal(ba)
    +			if err != nil {
    +				return nil, err
     			}
    -			marshaledChunk, _ := sdp.marshalizer.Marshal(ba)
     			compressedSize += len(marshaledChunk)
     			returningBuff = append(returningBuff, marshaledChunk)
     			currentChunk = make([][]byte, 0)
    
  • core/partitioning/simpleDataPacker_test.go+33 0 modified
    @@ -2,12 +2,14 @@ package partitioning_test
     
     import (
     	"crypto/rand"
    +	"errors"
     	"testing"
     
     	"github.com/klever-io/klever-go/common"
     	"github.com/klever-io/klever-go/common/mock"
     	"github.com/klever-io/klever-go/core/partitioning"
     	"github.com/stretchr/testify/assert"
    +	"github.com/stretchr/testify/require"
     )
     
     func TestNewSimpleDataPacker_NilMarshalizerShouldErr(t *testing.T) {
    @@ -116,3 +118,34 @@ func TestSimpleSplitter_SendDataInChunksWithOnlyOneLargeElementShouldWork(t *tes
     	assert.Equal(t, 1, len(buffSent))
     	assert.Nil(t, checkExpectedElements(buffSent[0], marshalizer, [][]byte{elemLarge}))
     }
    +
    +// Regression: a Marshal error inside the chunk-flush branch must propagate to
    +// the caller, not silently drop the chunk and the current element.
    +func TestSimpleDataPacker_PackDataInChunks_PropagatesMarshalError(t *testing.T) {
    +	t.Parallel()
    +
    +	expectedErr := errors.New("forced marshal error")
    +	// Fail every Marshal call (the Compress path marshals the Batch
    +	// internally, so it fails first). The trailing block already returned
    +	// errors; this exercises the in-loop flush branch.
    +	marshalizer := &mock.MarshalizerStub{
    +		MarshalCalled: func(obj interface{}) ([]byte, error) {
    +			return nil, expectedErr
    +		},
    +		UnmarshalCalled: func(obj interface{}, buff []byte) error {
    +			return nil
    +		},
    +	}
    +
    +	sdp, err := partitioning.NewSimpleDataPacker(marshalizer)
    +	require.NoError(t, err)
    +
    +	// Two elements summing to >= limit so the in-loop flush branch is entered.
    +	elem1 := make([]byte, 600)
    +	elem2 := make([]byte, 600)
    +	buffSent, err := sdp.PackDataInChunks([][]byte{elem1, elem2}, 1000)
    +
    +	require.Error(t, err)
    +	require.ErrorIs(t, err, expectedErr)
    +	assert.Nil(t, buffSent)
    +}
    
  • core/process/errors.go+12 4 modified
    @@ -1,6 +1,10 @@
     package process
     
    -import "errors"
    +import (
    +	"errors"
    +
    +	"github.com/klever-io/klever-go/common"
    +)
     
     // ErrNilQuotaStatusHandler signals that a nil quota status handler has been provided
     var ErrNilQuotaStatusHandler = errors.New("nil quota status handler")
    @@ -56,9 +60,13 @@ var ErrEmptyPeerID = errors.New("empty peer ID")
     // ErrNoDataInMessage signals that no data was found after parsing received p2p message
     var ErrNoDataInMessage = errors.New("no data found in received message")
     
    -// ErrTooManyItemsInBatch signals that a received Batch carries more items than allowed,
    -// guarding against pre-allocation amplification (CWE-789 / CWE-770).
    -var ErrTooManyItemsInBatch = errors.New("too many items in batch")
    +// ErrTooManyItemsInBatch aliases common.ErrTooManyItemsInBatch so every Batch consumer
    +// shares a single sentinel under errors.Is.
    +var ErrTooManyItemsInBatch = common.ErrTooManyItemsInBatch
    +
    +// ErrBatchWireTooLarge aliases common.ErrBatchWireTooLarge for the byte-size
    +// pre-Unmarshal rejection path; see common.ErrBatchWireTooLarge.
    +var ErrBatchWireTooLarge = common.ErrBatchWireTooLarge
     
     // ErrInterceptedDataNotForCurrentShard signals that intercepted data is not for current shard
     var ErrInterceptedDataNotForCurrentShard = errors.New("intercepted data not for current shard")
    
  • core/process/interceptors/multiDataInterceptor.go+9 17 modified
    @@ -12,16 +12,9 @@ import (
     	"github.com/klever-io/klever-go/tools/marshal"
     )
     
    -// MaxItemsPerBatch is the hard upper bound on the number of items a single P2P Batch
    -// may carry. It guards ProcessReceivedMessage's pre-allocation
    -// (make([]process.InterceptedData, len(b.Data))) against attacker-controlled lengths
    -// that would otherwise force ~16 B per entry of allocation before any anti-flood check.
    -// See GHSA-74m6-4hjp-7226 / KLC-2353 (CWE-789 / CWE-770).
    -//
    -// Sized at 8192 — comfortably above the ~1700 minimum-sized txs that fit inside
    -// core.MaxBulkTransactionSize (256 KiB), and above any legitimate trie-node response
    -// bounded by core.MaxBufferSizeToSendTrieNodes (also 256 KiB).
    -const MaxItemsPerBatch = 8192
    +// MaxItemsPerBatch aliases batch.MaxItemsPerBatch so every Batch consumer shares a
    +// single cap. See GHSA-74m6-4hjp-7226 / KLC-2353 and GHSA-w342-mj6g-v9c4.
    +const MaxItemsPerBatch = batch.MaxItemsPerBatch
     
     // ArgMultiDataInterceptor is the argument for the multi-data interceptor
     type ArgMultiDataInterceptor struct {
    @@ -105,13 +98,12 @@ func (mdi *MultiDataInterceptor) ProcessReceivedMessage(message p2p.MessageP2P,
     		}
     	}()
     
    -	// We deliberately do NOT add a wire-size pre-check before Unmarshal:
    -	// ProcessReceivedMessage is only invoked from networkMessenger.pubsubCallback
    -	// (network/p2p/libp2p/netMessenger.go), so the libp2p pubsub
    -	// DefaultMaxMessageSize (1 MiB) is always upstream of us. A duplicate cap at
    -	// our layer would be dead code today. If pubsub.WithMaxMessageSize is ever
    -	// raised, reintroduce a MaxRawBatchSize check here. See GHSA-74m6-4hjp-7226 /
    -	// KLC-2353.
    +	// Reject oversized wire payloads before Unmarshal; see MaxBatchWireSize doc.
    +	// No blacklist (logical bound, antiflood gates abuse). Currently unreachable
    +	// under default libp2p; forward-defense for raised pubsub.WithMaxMessageSize.
    +	if len(message.Data()) > batch.MaxBatchWireSize {
    +		return process.ErrBatchWireTooLarge
    +	}
     	b := batch.Batch{}
     	err = mdi.marshalizer.Unmarshal(&b, message.Data())
     	if err != nil {
    
  • core/process/interceptors/multiDataInterceptor_test.go+74 0 modified
    @@ -17,6 +17,7 @@ import (
     	"github.com/klever-io/klever-go/core/throttler"
     	"github.com/klever-io/klever-go/data/batch"
     	"github.com/klever-io/klever-go/tools/check"
    +	"github.com/klever-io/klever-go/tools/marshal/factory"
     	"github.com/stretchr/testify/assert"
     	"github.com/stretchr/testify/require"
     )
    @@ -822,6 +823,79 @@ func TestMultiDataInterceptor_ProcessReceivedMessage_CompressedItemCountBomb_Rel
     		"the post-Decompress items-per-batch rejection path must release the throttler slot")
     }
     
    +// Regression GHSA-w342-mj6g-v9c4 defense-in-depth: oversized wire payload must
    +// be rejected before Unmarshal (slice-header amplification window).
    +func TestMultiDataInterceptor_ProcessReceivedMessage_RejectsOversizedWirePayload(t *testing.T) {
    +	t.Parallel()
    +
    +	marshalizer := &mock.MarshalizerMock{}
    +
    +	// Junk bytes (not a marshaled Batch): on vulnerable code Unmarshal would
    +	// either error or decode to empty; only the fix returns ErrBatchWireTooLarge.
    +	payload := make([]byte, batch.MaxBatchWireSize+1)
    +
    +	countingThrottler := &mock.InterceptorThrottlerStub{
    +		CanProcessCalled: func() bool { return true },
    +	}
    +
    +	arg := createMockArgMultiDataInterceptor()
    +	arg.Marshalizer = marshalizer
    +	arg.Throttler = countingThrottler
    +
    +	mdi, err := interceptors.NewMultiDataInterceptor(arg)
    +	require.NoError(t, err)
    +
    +	msg := &mock.P2PMessageMock{
    +		DataField:  payload,
    +		PeerField:  core.PeerID("origin-peer"),
    +		SeqNoField: []byte("seq-1"),
    +	}
    +
    +	processErr := mdi.ProcessReceivedMessage(msg, fromConnectedPeerID)
    +	require.ErrorIs(t, processErr, process.ErrBatchWireTooLarge)
    +	assert.Equal(t, int32(1), countingThrottler.StartProcessingCount())
    +	assert.Equal(t, int32(1), countingThrottler.EndProcessingCount(),
    +		"the wire-size rejection path must release the throttler slot")
    +}
    +
    +// Regression GHSA-w342-mj6g-v9c4: real amplification pattern. Payload is `0x0a 0x00` × N —
    +// proto3 field-1 LEN tag + length-0 varint = one empty `repeated bytes` entry per pair.
    +// Pre-check must reject on byte length before Unmarshal allocates N empty []byte headers.
    +func TestMultiDataInterceptor_ProcessReceivedMessage_RejectsRealAmplificationPattern(t *testing.T) {
    +	t.Parallel()
    +
    +	// Proto marshalizer so the pattern decodes (not just a size rejection) if pre-check is bypassed.
    +	protoMarsh, err := factory.NewMarshalizer(factory.ProtoMarshalizer)
    +	require.NoError(t, err)
    +
    +	const numEmptyEntries = batch.MaxBatchWireSize/2 + 1
    +	payload := bytes.Repeat([]byte{0x0a, 0x00}, numEmptyEntries)
    +
    +	countingThrottler := &mock.InterceptorThrottlerStub{
    +		CanProcessCalled: func() bool { return true },
    +	}
    +
    +	arg := createMockArgMultiDataInterceptor()
    +	arg.Marshalizer = protoMarsh
    +	arg.Throttler = countingThrottler
    +
    +	mdi, err := interceptors.NewMultiDataInterceptor(arg)
    +	require.NoError(t, err)
    +
    +	msg := &mock.P2PMessageMock{
    +		DataField:  payload,
    +		PeerField:  core.PeerID("origin-peer"),
    +		SeqNoField: []byte("seq-1"),
    +	}
    +
    +	processErr := mdi.ProcessReceivedMessage(msg, fromConnectedPeerID)
    +	require.ErrorIs(t, processErr, process.ErrBatchWireTooLarge,
    +		"wire-size pre-check must fire before proto Unmarshal allocates %d empty entries", numEmptyEntries)
    +	assert.Equal(t, int32(1), countingThrottler.StartProcessingCount())
    +	assert.Equal(t, int32(1), countingThrottler.EndProcessingCount(),
    +		"the wire-size rejection path must release the throttler slot")
    +}
    +
     //------- regression: data race on bdi.debugHandler from worker goroutine (Finding 4.2)
     
     // MultiDataInterceptor.ProcessReceivedMessage spawns a worker goroutine that
    
  • data/batch/batch.go+31 9 modified
    @@ -12,15 +12,33 @@ import (
     	"github.com/klever-io/klever-go/tools/marshal"
     )
     
    -// MaxDecompressedBatchSize is the hard upper bound on the inflated size of a Batch payload.
    -// It guards against gzip "decompression bomb" attacks where a tiny wire payload expands to
    -// many gigabytes inside io.ReadAll. See GHSA-74m6-4hjp-7226 / KLC-2352 (CWE-409).
    -//
    -// Sized at 10 MiB — ~40x the legitimate single-batch ceiling enforced upstream
    -// (core.MaxBulkTransactionSize and core.MaxBufferSizeToSendTrieNodes are both 256 KiB),
    -// equal to one second of the outOfSpecs per-peer antiflood budget, and well below the
    -// 30-second blacklist threshold (~36 MiB).
    -const MaxDecompressedBatchSize = 10 * 1024 * 1024
    +// MaxBatchWireSize caps the COMPRESSED wire payload at the multi-data interceptor
    +// pre-Unmarshal site. Equal to libp2p's DefaultMaxMessageSize; coordinated with
    +// the tighter outbound network/p2p/libp2p.maxSendBuffSize (1 MiB − 64 KiB framing).
    +// Does NOT bound Decompress output — see MaxDecompressedBatchSize for that.
    +// See GHSA-74m6-4hjp-7226 / KLC-2352 (CWE-409), GHSA-w342-mj6g-v9c4.
    +const MaxBatchWireSize = 1 << 20 // 1 MiB
    +
    +// MaxDecompressedBatchSize caps the INFLATED output of Batch.Decompress.
    +// Sized at 2 MiB — ~2× the worst-case legitimate marshaled batch, which is a
    +// single oversized-element chunk carrying one tx near core.MaxDataSize (1 MiB)
    +// plus tx + Batch proto framing. Distinct from MaxBatchWireSize because a
    +// highly-compressible payload below the wire cap can legitimately inflate
    +// past 1 MiB on Decompress. Bounds transient slice-header amplification to
    +// ~24 MB before MaxItemsPerBatch fires.
    +// See GHSA-74m6-4hjp-7226 / KLC-2352 (CWE-409), GHSA-w342-mj6g-v9c4.
    +const MaxDecompressedBatchSize = 1 << 21 // 2 MiB
    +
    +// MaxItemsPerBatch caps Batch entries. A byte cap alone isn't enough: 2 MiB of
    +// mostly-empty proto entries still encodes ~1 M items.
    +// See GHSA-w342-mj6g-v9c4 and GHSA-74m6-4hjp-7226 / KLC-2353.
    +const MaxItemsPerBatch = 8192
    +
    +// MaxHashArrayBuffSize is the tighter pre-Unmarshal cap for resolver hash-array
    +// requests. Sized at 512 KiB — ~1.9× the security ceiling
    +// (MaxItemsPerBatch × ~34 B proto framing for 32-byte hash entries ≈ 272 KiB).
    +// Defense-in-depth for GHSA-w342-mj6g-v9c4.
    +const MaxHashArrayBuffSize = 1 << 19 // 512 KiB
     
     // New returns a new batch from given buffers
     func New(buffs ...[]byte) *Batch {
    @@ -154,6 +172,10 @@ func (ba *Batch) Decompress(m marshal.Marshalizer) error {
     		return err
     	}
     
    +	if len(ba.Data) > MaxItemsPerBatch {
    +		return common.ErrTooManyItemsInBatch
    +	}
    +
     	ba.Stream = nil
     	ba.IsCompressed = false
     	return nil
    
  • data/batch/batch_test.go+112 0 modified
    @@ -180,6 +180,118 @@ func TestDecompress_RejectsDataSizeMismatch(t *testing.T) {
     	}
     }
     
    +// Regression: GHSA-w342-mj6g-v9c4 — inflated entry count over MaxItemsPerBatch.
    +func TestDecompress_RejectsItemCountBomb(t *testing.T) {
    +	t.Parallel()
    +
    +	internalMarshalizer, err := factory.NewMarshalizer(factory.ProtoMarshalizer)
    +	require.NoError(t, err)
    +
    +	bomb := &batch.Batch{
    +		Algo: batch.CType_GZip,
    +		Data: make([][]byte, batch.MaxItemsPerBatch+1),
    +	}
    +	require.NoError(t, bomb.Compress(internalMarshalizer))
    +
    +	tampered := &batch.Batch{
    +		IsCompressed: true,
    +		Algo:         bomb.Algo,
    +		Stream:       bomb.Stream,
    +		DataSize:     bomb.DataSize,
    +	}
    +
    +	err = tampered.Decompress(internalMarshalizer)
    +	require.Error(t, err)
    +	require.Truef(t, errors.Is(err, common.ErrTooManyItemsInBatch),
    +		"expected ErrTooManyItemsInBatch, got %v", err)
    +}
    +
    +// Pins the cap boundary against off-by-one regressions.
    +func TestDecompress_AcceptsItemCountAtCap(t *testing.T) {
    +	t.Parallel()
    +
    +	internalMarshalizer, err := factory.NewMarshalizer(factory.ProtoMarshalizer)
    +	require.NoError(t, err)
    +
    +	ok := &batch.Batch{
    +		Algo: batch.CType_GZip,
    +		Data: make([][]byte, batch.MaxItemsPerBatch),
    +	}
    +	require.NoError(t, ok.Compress(internalMarshalizer))
    +	require.NoError(t, ok.Decompress(internalMarshalizer))
    +	require.Equal(t, batch.MaxItemsPerBatch, len(ok.Data))
    +}
    +
    +// Pins the Decompress cap boundary against off-by-one regressions.
    +// Builds a Batch whose marshaled-pre-compression size equals exactly
    +// MaxDecompressedBatchSize — one repeated-bytes entry of size N has framing
    +// of tag(1B) + length-varint(varies) + N. For N up to 2^21 − 1 the length
    +// varint is 3 B, so entrySize = MaxDecompressedBatchSize − 4.
    +func TestDecompress_AcceptsAtDecompressedBoundary(t *testing.T) {
    +	t.Parallel()
    +
    +	internalMarshalizer, err := factory.NewMarshalizer(factory.ProtoMarshalizer)
    +	require.NoError(t, err)
    +
    +	const entrySize = batch.MaxDecompressedBatchSize - 4
    +
    +	b := &batch.Batch{
    +		Algo: batch.CType_GZip,
    +		Data: [][]byte{make([]byte, entrySize)},
    +	}
    +	require.NoError(t, b.Compress(internalMarshalizer))
    +	require.Equal(t, int32(batch.MaxDecompressedBatchSize), b.DataSize, // #nosec G115 — fits int32 by construction
    +		"sanity: marshaled-pre-compression size must equal MaxDecompressedBatchSize")
    +
    +	incoming := &batch.Batch{
    +		IsCompressed: true,
    +		Algo:         b.Algo,
    +		Stream:       b.Stream,
    +		DataSize:     b.DataSize,
    +	}
    +	require.NoError(t, incoming.Decompress(internalMarshalizer),
    +		"Decompress must accept an inflated payload of exactly MaxDecompressedBatchSize")
    +	require.Equal(t, 1, len(incoming.Data))
    +	require.Equal(t, entrySize, len(incoming.Data[0]))
    +}
    +
    +// Regression for the F1 finding: a max-cap single-tx batch's marshaled size
    +// EXCEEDS MaxBatchWireSize by the size of proto framing. That payload must
    +// still round-trip cleanly through Compress → Decompress — which is the whole
    +// reason MaxDecompressedBatchSize is sized above MaxBatchWireSize. If
    +// MaxDecompressedBatchSize ever drops to MaxBatchWireSize, this test fails.
    +func TestDecompress_AcceptsMaxCapSingleTxBatch(t *testing.T) {
    +	t.Parallel()
    +
    +	internalMarshalizer, err := factory.NewMarshalizer(factory.ProtoMarshalizer)
    +	require.NoError(t, err)
    +
    +	// One entry sized at core.MaxDataSize (1 MiB) — proxy for a max-cap tx.
    +	// Marshaled batch is 1 + 3 + (1 << 20) = 1 MiB + 4 bytes, i.e. just above
    +	// MaxBatchWireSize. The all-zero content is realistic for SC-deploy
    +	// bytecode with padding and is the case that triggers the F1 gap.
    +	const txInner = 1 << 20
    +
    +	b := &batch.Batch{
    +		Algo: batch.CType_GZip,
    +		Data: [][]byte{make([]byte, txInner)},
    +	}
    +	require.NoError(t, b.Compress(internalMarshalizer))
    +	require.Greater(t, b.DataSize, int32(batch.MaxBatchWireSize), // #nosec G115 — fits int32 by construction
    +		"sanity: a max-cap single-tx batch's marshaled size DOES exceed MaxBatchWireSize — that's the framing gap MaxDecompressedBatchSize accommodates")
    +
    +	incoming := &batch.Batch{
    +		IsCompressed: true,
    +		Algo:         b.Algo,
    +		Stream:       b.Stream,
    +		DataSize:     b.DataSize,
    +	}
    +	require.NoError(t, incoming.Decompress(internalMarshalizer),
    +		"Decompress must accept a max-cap single-tx batch (DataSize > MaxBatchWireSize but ≤ MaxDecompressedBatchSize)")
    +	require.Equal(t, 1, len(incoming.Data))
    +	require.Equal(t, txInner, len(incoming.Data[0]))
    +}
    +
     func BenchmarkCompress(b *testing.B) {
     	algos := []batch.CType{batch.CType_GZip, batch.CType_LZ4}
     	sizes := []int{100, 1000, 3000}
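The boundary tests above depend on the protobuf framing of a single length-delimited entry: one tag byte, a varint length, then the payload. A short sketch of that arithmetic, using `encoding/binary`'s unsigned varint (the same base-128 encoding); `varintLen` and `framedSize` are hypothetical helpers written for this example.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// varintLen returns how many bytes a base-128 varint needs to encode x.
func varintLen(x uint64) int {
	var buf [binary.MaxVarintLen64]byte
	return binary.PutUvarint(buf[:], x)
}

// framedSize is the marshaled size of a message holding one length-delimited
// field with a 1-byte tag (field numbers 1 to 15) and an n-byte payload.
func framedSize(n int) int {
	return 1 + varintLen(uint64(n)) + n
}

func main() {
	// Entry sized so the framed message lands exactly on the 2 MiB cap.
	fmt.Println(framedSize((1<<21)-4) == 1<<21)

	// A 1 MiB entry overshoots 1 MiB by the framing overhead.
	fmt.Println(framedSize(1<<20) - (1 << 20))
}
```

Any payload length from 16,384 up to 2^21 − 1 takes a 3-byte length varint, which is why both tests use 4 bytes of framing.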
    
  • data/retriever/resolvers/headerResolver.go+17 2 modified
    @@ -1,6 +1,9 @@
     package resolvers
     
     import (
    +	"fmt"
    +	"runtime/debug"
    +
     	logger "github.com/klever-io/klever-go-logger"
     	"github.com/klever-io/klever-go/common"
     	"github.com/klever-io/klever-go/core"
    @@ -106,8 +109,17 @@ func (hdrRes *HeaderResolver) SetEpochHandler(epochHandler retriever.EpochHandle
     
     // ProcessReceivedMessage will be the callback func from the p2p.Messenger and will be called each time a new message was received
     // (for the topic this validator was registered to, usually a request topic)
    -func (hdrRes *HeaderResolver) ProcessReceivedMessage(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error {
    -	err := hdrRes.canProcessMessage(message, fromConnectedPeer)
    +func (hdrRes *HeaderResolver) ProcessReceivedMessage(message p2p.MessageP2P, fromConnectedPeer core.PeerID) (err error) {
    +	defer func() {
    +		if r := recover(); r != nil {
    +			log.Error("HeaderResolver.ProcessReceivedMessage panicked",
    +				"panic", r,
    +				"stack", string(debug.Stack()))
    +			err = fmt.Errorf("%w: %v", common.ErrProcessReceivedMessagePanicked, r)
    +		}
    +	}()
    +
    +	err = hdrRes.canProcessMessage(message, fromConnectedPeer)
     	if err != nil {
     		return err
     	}
    @@ -194,6 +206,9 @@ func (hdrRes *HeaderResolver) searchInCache(nonce uint64) ([]byte, error) {
     	if err != nil {
     		return nil, err
     	}
    +	if len(headers) == 0 {
    +		return nil, common.ErrMissingData
    +	}
     
     	hdr := headers[len(headers)-1]
     	buff, err := hdrRes.marshalizer.Marshal(hdr)
    
  • data/retriever/resolvers/headerResolver_test.go+58 0 modified
    @@ -695,3 +695,61 @@ func TestHeaderResolver_SetAndGetNumPeersToQuery(t *testing.T) {
     	assert.Equal(t, expectedIntra, actualIntra)
     	assert.Equal(t, expectedCross, actualCross)
     }
    +
    +// Regression: searchInCache must not panic when the headers pool returns
    +// (emptySlice, _, nil). HeadersCacherStub default returns exactly that.
    +func TestHeaderResolver_ProcessReceivedMessage_NonceType_EmptyHeadersNoPanic(t *testing.T) {
    +	t.Parallel()
    +
    +	arg := createMockArgHeaderResolver()
    +	// Force the storage lookup to fail so the code falls through to searchInCache.
    +	arg.HeadersNoncesStorage = &mock.StorerStub{
    +		SearchFirstCalled: func(key []byte) ([]byte, error) {
    +			return nil, errKeyNotFound
    +		},
    +	}
    +	// arg.Headers (HeadersCacherStub) returns (nil, nil, nil) by default — the
    +	// exact empty-with-nil-err state that previously panicked at headers[len-1].
    +
    +	hdrRes, err := resolvers.NewHeaderResolver(arg)
    +	require.NoError(t, err)
    +
    +	nonceBytes := arg.NonceConverter.ToByteSlice(uint64(42))
    +	processErr := hdrRes.ProcessReceivedMessage(
    +		createRequestMsg(retriever.RequestDataType_NonceType, nonceBytes),
    +		fromConnectedPeerId,
    +	)
    +	require.Error(t, processErr)
    +	require.Truef(t, errors.Is(processErr, common.ErrMissingData),
    +		"expected ErrMissingData, got %v", processErr)
    +	assert.True(t, arg.Throttler.(*mock.ThrottlerStub).StartWasCalled)
    +	assert.True(t, arg.Throttler.(*mock.ThrottlerStub).EndWasCalled)
    +}
    +
    +// Regression: a panic in any dependency must be recovered by
    +// ProcessReceivedMessage rather than killing the message-handling goroutine.
    +func TestHeaderResolver_ProcessReceivedMessage_RecoversFromPanic(t *testing.T) {
    +	t.Parallel()
    +
    +	arg := createMockArgHeaderResolver()
    +	// Inject a marshalizer whose Unmarshal panics — this fires inside
    +	// parseReceivedMessage, after canProcessMessage / throttler.Start.
    +	arg.Marshalizer = &mock.MarshalizerStub{
    +		UnmarshalCalled: func(obj interface{}, buff []byte) error {
    +			panic("injected panic during Unmarshal")
    +		},
    +	}
    +
    +	hdrRes, err := resolvers.NewHeaderResolver(arg)
    +	require.NoError(t, err)
    +
    +	processErr := hdrRes.ProcessReceivedMessage(
    +		createRequestMsg(retriever.RequestDataType_HashType, []byte("x")),
    +		fromConnectedPeerId,
    +	)
    +	require.Error(t, processErr)
    +	require.Truef(t, errors.Is(processErr, common.ErrProcessReceivedMessagePanicked),
    +		"expected ErrProcessReceivedMessagePanicked, got %v", processErr)
    +	assert.True(t, arg.Throttler.(*mock.ThrottlerStub).StartWasCalled)
    +	assert.True(t, arg.Throttler.(*mock.ThrottlerStub).EndWasCalled)
    +}
    
  • data/retriever/resolvers/topicResolverSender/topicResolverSender.go+6 5 modified
    @@ -105,12 +105,13 @@ func (trs *topicResolverSender) SendOnRequestTopic(rd *retriever.RequestData, or
     	}
     
     	topicToSendRequest := trs.topicName + topicRequestSuffix
    +	numConsensusPeers, numCommonPeers := trs.NumPeersToQuery()
     
     	commonPeers := trs.peerListCreator.PeerList()
    -	numSentCommon := trs.sendOnTopic(commonPeers, topicToSendRequest, buff, trs.numCommonPeers, "common peer")
    +	numSentCommon := trs.sendOnTopic(commonPeers, topicToSendRequest, buff, numCommonPeers, "common peer")
     
     	consensusPeers := trs.peerListCreator.ConsensusPeerList()
    -	numSentConsensus := trs.sendOnTopic(consensusPeers, topicToSendRequest, buff, trs.numConsensusPeers, "consensus peer")
    +	numSentConsensus := trs.sendOnTopic(consensusPeers, topicToSendRequest, buff, numConsensusPeers, "consensus peer")
     
     	trs.callDebugHandler(originalHashes, numSentConsensus, numSentCommon)
     
    @@ -146,9 +147,9 @@ func (trs *topicResolverSender) SendOnRequestTopicTo(rd *retriever.RequestData,
     		}
     	}
     
    -	numConsensusPeers := trs.numConsensusPeers
    -	if numSentDirect > 0 {
    -		// if sent to origin, remove one from max to send
    +	numConsensusPeers, _ := trs.NumPeersToQuery()
    +	if numSentDirect > 0 && numConsensusPeers > 0 {
    +		// origin already got direct send; the > 0 guard prevents fan-out on NumConsensusPeers=0
     		numConsensusPeers = numConsensusPeers - 1
     	}
     	numSentConsensus := trs.sendOnTopic(consensusPeers, topicToSendRequest, buff, numConsensusPeers, "consensus peer")
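
Why a budget of -1 turns into a broadcast can be sketched as follows. This is an illustration under stated assumptions: the real `sendOnTopic` body is not part of this diff, so the loop below is a hypothetical reconstruction based only on the regression test's description (an equality break on the sent counter, and a floor of 0 to 1). `consensusBudget` is likewise a hypothetical helper that isolates the patched clamp.

```go
package main

import "fmt"

// sendOnTopic is a simplified, hypothetical stand-in for the real method.
// It returns how many peers were contacted for a given budget.
func sendOnTopic(peers []string, maxToSend int) int {
	if maxToSend == 0 {
		maxToSend = 1 // assumed floor, as described in the regression test
	}
	sent := 0
	for range peers {
		sent++
		// Equality check: never true when maxToSend is negative,
		// so the loop runs over the whole peer list.
		if sent == maxToSend {
			break
		}
	}
	return sent
}

// consensusBudget applies the patched clamp: the budget is reduced by one
// after a direct send only if it is still positive.
func consensusBudget(numConsensusPeers int, sentDirect bool) int {
	if sentDirect && numConsensusPeers > 0 {
		numConsensusPeers--
	}
	return numConsensusPeers
}

func main() {
	peers := []string{"c1", "c2", "c3", "c4", "c5"}

	// Pre-fix arithmetic: 0 - 1 = -1, which the equality break never matches.
	fmt.Println("contacted with budget -1:", sendOnTopic(peers, 0-1))

	// Patched arithmetic: the budget stays at 0.
	fmt.Println("contacted with clamped budget:", sendOnTopic(peers, consensusBudget(0, true)))
}
```

A `sent >= maxToSend` comparison would also bound the loop for negative budgets; the patch instead keeps the budget from going negative at the call site.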
    
  • data/retriever/resolvers/topicResolverSender/topicResolverSender_test.go+94 0 modified
    @@ -14,6 +14,7 @@ import (
     	"github.com/klever-io/klever-go/network/p2p"
     	"github.com/klever-io/klever-go/tools/check"
     	"github.com/stretchr/testify/assert"
    +	"github.com/stretchr/testify/require"
     )
     
     var defaultHashes = [][]byte{[]byte("hash")}
    @@ -512,3 +513,96 @@ func TestTopicResolverSender_NumPeersToQueryr(t *testing.T) {
     	assert.Equal(t, intra, recoveredIntra)
     	assert.Equal(t, cross, recoveredCross)
     }
    +
    +// Regression: SendOnRequestTopicTo must not fan out to every consensus peer when
    +// the resolver is configured with NumConsensusPeers=0 (a valid config when
    +// NumCommonPeers>=2). The pre-fix code computed numConsensusPeers-1 = -1 and the
    +// sendOnTopic break (`msgSentCounter == maxToSend`) never fired, broadcasting to
    +// the entire consensus list. With the clamp, the per-call budget stays at 0
    +// (which sendOnTopic floors to 1), so at most one consensus peer is contacted.
    +func TestTopicResolverSender_SendOnRequestTopicTo_ZeroConsensusPeersNoFanout(t *testing.T) {
    +	t.Parallel()
    +
    +	directPeer := core.PeerID("direct")
    +	consensusList := []core.PeerID{
    +		core.PeerID("c1"), core.PeerID("c2"), core.PeerID("c3"),
    +		core.PeerID("c4"), core.PeerID("c5"),
    +	}
    +
    +	consensusSendCount := 0
    +	directSendCount := 0
    +	arg := createMockArgTopicResolverSender()
    +	arg.NumConsensusPeers = 0
    +	arg.NumCommonPeers = 2 // satisfy Common+Consensus >= 2 constructor validation
    +	arg.PeerListCreator = &mock.PeerListCreatorStub{
    +		PeerListCalled:          func() []core.PeerID { return nil },
    +		ConsensusPeerListCalled: func() []core.PeerID { return consensusList },
    +	}
    +	arg.Messenger = &mock.MessageHandlerStub{
    +		SendToConnectedPeerCalled: func(topic string, buff []byte, peerID core.PeerID) error {
    +			if bytes.Equal(peerID.Bytes(), directPeer.Bytes()) {
    +				directSendCount++
    +			} else {
    +				consensusSendCount++
    +			}
    +			return nil
    +		},
    +	}
    +	trs, err := topicResolverSender.NewTopicResolverSender(arg)
    +	require.NoError(t, err)
    +
    +	err = trs.SendOnRequestTopicTo(&retriever.RequestData{}, defaultHashes, directPeer)
    +	require.NoError(t, err)
    +
    +	assert.Equal(t, 1, directSendCount, "direct peer should receive exactly one send")
    +	assert.LessOrEqualf(t, consensusSendCount, 1, "without fan-out the consensus budget caps at 1 (sendOnTopic floors maxToSend=0 to 1); got %d", consensusSendCount)
    +}
    +
    +// Regression: SendOnRequestTopic and SendOnRequestTopicTo must read
    +// numConsensusPeers / numCommonPeers under the same mutex as SetNumPeersToQuery.
    +// Run with `go test -race` to confirm; without the fix the race detector flags
    +// concurrent read/write of numConsensusPeers / numCommonPeers.
    +func TestTopicResolverSender_ConcurrentSendAndSetNumPeers_NoDataRace(t *testing.T) {
    +	t.Parallel()
    +
    +	arg := createMockArgTopicResolverSender()
    +	arg.PeerListCreator = &mock.PeerListCreatorStub{
    +		PeerListCalled:          func() []core.PeerID { return []core.PeerID{"p1", "p2", "p3"} },
    +		ConsensusPeerListCalled: func() []core.PeerID { return []core.PeerID{"p4", "p5"} },
    +	}
    +	arg.Messenger = &mock.MessageHandlerStub{
    +		SendToConnectedPeerCalled: func(topic string, buff []byte, peerID core.PeerID) error {
    +			return nil
    +		},
    +	}
    +	trs, err := topicResolverSender.NewTopicResolverSender(arg)
    +	require.NoError(t, err)
    +
    +	rd := &retriever.RequestData{Type: retriever.RequestDataType_HashType, Value: []byte("x")}
    +
    +	const iters = 500
    +	done := make(chan struct{}, 3)
    +
    +	go func() {
    +		for i := 0; i < iters; i++ {
    +			trs.SetNumPeersToQuery(i%5, (i+1)%5)
    +		}
    +		done <- struct{}{}
    +	}()
    +	go func() {
    +		for i := 0; i < iters; i++ {
    +			_ = trs.SendOnRequestTopic(rd, defaultHashes)
    +		}
    +		done <- struct{}{}
    +	}()
    +	go func() {
    +		for i := 0; i < iters; i++ {
    +			_ = trs.SendOnRequestTopicTo(rd, defaultHashes, "peerX")
    +		}
    +		done <- struct{}{}
    +	}()
    +
    +	for i := 0; i < 3; i++ {
    +		<-done
    +	}
    +}
    
  • data/retriever/resolvers/transactionResolver.go+21 2 modified
    @@ -2,6 +2,7 @@ package resolvers
     
     import (
     	"fmt"
    +	"runtime/debug"
     
     	logger "github.com/klever-io/klever-go-logger"
     	"github.com/klever-io/klever-go/common"
    @@ -84,8 +85,17 @@ func NewTxResolver(arg ArgTxResolver) (*TxResolver, error) {
     
     // ProcessReceivedMessage will be the callback func from the p2p.Messenger and will be called each time a new message was received
     // (for the topic this validator was registered to, usually a request topic)
    -func (txRes *TxResolver) ProcessReceivedMessage(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error {
    -	err := txRes.canProcessMessage(message, fromConnectedPeer)
    +func (txRes *TxResolver) ProcessReceivedMessage(message p2p.MessageP2P, fromConnectedPeer core.PeerID) (err error) {
    +	defer func() {
    +		if r := recover(); r != nil {
    +			log.Error("TxResolver.ProcessReceivedMessage panicked",
    +				"panic", r,
    +				"stack", string(debug.Stack()))
    +			err = fmt.Errorf("%w: %v", common.ErrProcessReceivedMessagePanicked, r)
    +		}
    +	}()
    +
    +	err = txRes.canProcessMessage(message, fromConnectedPeer)
     	if err != nil {
     		return err
     	}
    @@ -175,11 +185,20 @@ func (txRes *TxResolver) fetchTxAsByteSlice(hash []byte) ([]byte, error) {
     
     func (txRes *TxResolver) resolveTxRequestByHashArray(hashesBuff []byte, pid core.PeerID) error {
     	//TODO this can be optimized by searching in corresponding datapool (taken by topic name)
    +	// Reject oversized hash-array payloads before Unmarshal — see MaxHashArrayBuffSize.
    +	// No blacklist (logical bound, antiflood gates abuse).
    +	if len(hashesBuff) > batch.MaxHashArrayBuffSize {
    +		return common.ErrBatchWireTooLarge
    +	}
     	b := batch.Batch{}
     	err := txRes.marshalizer.Unmarshal(&b, hashesBuff)
     	if err != nil {
     		return err
     	}
    +	// Cap uncompressed batches; the compressed path is bounded inside b.Decompress.
    +	if len(b.Data) > batch.MaxItemsPerBatch {
    +		return common.ErrTooManyItemsInBatch
    +	}
     	if b.IsCompressed {
     		err = b.Decompress(txRes.marshalizer)
     		if err != nil {
    
  • data/retriever/resolvers/transactionResolver_test.go+154 0 modified
    @@ -16,6 +16,7 @@ import (
     	"github.com/klever-io/klever-go/tools/check"
     	"github.com/klever-io/klever-go/tools/marshal"
     	"github.com/stretchr/testify/assert"
    +	"github.com/stretchr/testify/require"
     )
     
     var connectedPeerId = core.PeerID("connected peer id")
    @@ -552,3 +553,156 @@ func TestTxResolver_SetAndGetNumPeersToQuery(t *testing.T) {
     	assert.Equal(t, expectedIntra, actualIntra)
     	assert.Equal(t, expectedCross, actualCross)
     }
    +
    +//------- regression: GHSA-w342-mj6g-v9c4
    +
    +func TestTxResolver_ProcessReceivedMessage_RejectsHashArrayItemBomb_Uncompressed(t *testing.T) {
    +	t.Parallel()
    +
    +	marshalizer := &mock.MarshalizerMock{}
    +
    +	arg := createMockArgTxResolver()
    +	arg.Marshalizer = marshalizer
    +	txRes, err := resolvers.NewTxResolver(arg)
    +	require.NoError(t, err)
    +
    +	bomb := &batch.Batch{Data: make([][]byte, batch.MaxItemsPerBatch+1)}
    +	buff, err := marshalizer.Marshal(bomb)
    +	require.NoError(t, err)
    +
    +	data, err := marshalizer.Marshal(&retriever.RequestData{
    +		Type:  retriever.RequestDataType_HashArrayType,
    +		Value: buff,
    +	})
    +	require.NoError(t, err)
    +
    +	msg := &mock.P2PMessageMock{DataField: data}
    +
    +	processErr := txRes.ProcessReceivedMessage(msg, connectedPeerId)
    +	require.ErrorIs(t, processErr, common.ErrTooManyItemsInBatch)
    +	assert.True(t, arg.Throttler.(*mock.ThrottlerStub).StartWasCalled)
    +	assert.True(t, arg.Throttler.(*mock.ThrottlerStub).EndWasCalled)
    +}
    +
    +// Regression GHSA-w342-mj6g-v9c4 defense-in-depth: oversized hashesBuff must
    +// be rejected before Unmarshal (slice-header amplification window).
    +func TestTxResolver_ProcessReceivedMessage_RejectsOversizedRawHashArrayBuff(t *testing.T) {
    +	t.Parallel()
    +
    +	marshalizer := &mock.MarshalizerMock{}
    +
    +	arg := createMockArgTxResolver()
    +	arg.Marshalizer = marshalizer
    +	txRes, err := resolvers.NewTxResolver(arg)
    +	require.NoError(t, err)
    +
    +	// Junk bytes (not a marshaled Batch): on vulnerable code Unmarshal would
    +	// either error or decode to empty; only the fix returns ErrBatchWireTooLarge.
    +	hashesBuff := make([]byte, batch.MaxHashArrayBuffSize+1)
    +
    +	data, err := marshalizer.Marshal(&retriever.RequestData{
    +		Type:  retriever.RequestDataType_HashArrayType,
    +		Value: hashesBuff,
    +	})
    +	require.NoError(t, err)
    +
    +	msg := &mock.P2PMessageMock{DataField: data}
    +
    +	processErr := txRes.ProcessReceivedMessage(msg, connectedPeerId)
    +	require.ErrorIs(t, processErr, common.ErrBatchWireTooLarge)
    +	thr := arg.Throttler.(*mock.ThrottlerStub)
    +	assert.Equal(t, int32(1), thr.StartProcessingCount())
    +	assert.Equal(t, int32(1), thr.EndProcessingCount(),
    +		"the wire-size rejection path must release the throttler slot exactly once")
    +}
    +
    +// Regression GHSA-w342-mj6g-v9c4: real amplification pattern. Payload is `0x0a 0x00` × N —
    +// proto3 field-1 LEN tag + length-0 varint = one empty `repeated bytes` entry per pair.
    +// Pre-check must reject on byte length before Unmarshal allocates N empty []byte headers.
    +func TestTxResolver_ProcessReceivedMessage_RejectsRealAmplificationPattern(t *testing.T) {
    +	t.Parallel()
    +
    +	// Proto marshalizer so the pattern decodes (not just a size rejection) if pre-check is bypassed.
    +	protoMarsh := marshal.NewProtoMarshalizer()
    +
    +	arg := createMockArgTxResolver()
    +	arg.Marshalizer = protoMarsh
    +	txRes, err := resolvers.NewTxResolver(arg)
    +	require.NoError(t, err)
    +
    +	const numEmptyEntries = batch.MaxHashArrayBuffSize/2 + 1 // 2 B per entry → just over cap
    +	hashesBuff := bytes.Repeat([]byte{0x0a, 0x00}, numEmptyEntries)
    +
    +	data, err := protoMarsh.Marshal(&retriever.RequestData{
    +		Type:  retriever.RequestDataType_HashArrayType,
    +		Value: hashesBuff,
    +	})
    +	require.NoError(t, err)
    +
    +	msg := &mock.P2PMessageMock{DataField: data}
    +
    +	processErr := txRes.ProcessReceivedMessage(msg, connectedPeerId)
    +	require.ErrorIs(t, processErr, common.ErrBatchWireTooLarge,
    +		"wire-size pre-check must fire before proto Unmarshal allocates %d empty entries", numEmptyEntries)
    +	thr := arg.Throttler.(*mock.ThrottlerStub)
    +	assert.Equal(t, int32(1), thr.EndProcessingCount(),
    +		"throttler slot must be released on the pre-check rejection path")
    +}
    +
    +func TestTxResolver_ProcessReceivedMessage_RejectsHashArrayItemBomb_Compressed(t *testing.T) {
    +	t.Parallel()
    +
    +	marshalizer := &mock.MarshalizerMock{}
    +
    +	arg := createMockArgTxResolver()
    +	arg.Marshalizer = marshalizer
    +	txRes, err := resolvers.NewTxResolver(arg)
    +	require.NoError(t, err)
    +
    +	bomb := &batch.Batch{
    +		Algo: batch.CType_GZip,
    +		Data: make([][]byte, batch.MaxItemsPerBatch+1),
    +	}
    +	require.NoError(t, bomb.Compress(marshalizer))
    +
    +	buff, err := marshalizer.Marshal(bomb)
    +	require.NoError(t, err)
    +
    +	data, err := marshalizer.Marshal(&retriever.RequestData{
    +		Type:  retriever.RequestDataType_HashArrayType,
    +		Value: buff,
    +	})
    +	require.NoError(t, err)
    +
    +	msg := &mock.P2PMessageMock{DataField: data}
    +
    +	processErr := txRes.ProcessReceivedMessage(msg, connectedPeerId)
    +	require.ErrorIs(t, processErr, common.ErrTooManyItemsInBatch)
    +	assert.True(t, arg.Throttler.(*mock.ThrottlerStub).StartWasCalled)
    +	assert.True(t, arg.Throttler.(*mock.ThrottlerStub).EndWasCalled)
    +}
    +
    +// Regression: a panic in any dependency must be recovered by
    +// ProcessReceivedMessage rather than killing the message-handling goroutine.
    +func TestTxResolver_ProcessReceivedMessage_RecoversFromPanic(t *testing.T) {
    +	t.Parallel()
    +
    +	arg := createMockArgTxResolver()
    +	arg.Marshalizer = &mock.MarshalizerStub{
    +		UnmarshalCalled: func(obj interface{}, buff []byte) error {
    +			panic("injected panic during Unmarshal")
    +		},
    +	}
    +
    +	txRes, err := resolvers.NewTxResolver(arg)
    +	require.NoError(t, err)
    +
    +	msg := &mock.P2PMessageMock{DataField: []byte("anything")}
    +
    +	processErr := txRes.ProcessReceivedMessage(msg, connectedPeerId)
    +	require.Error(t, processErr)
    +	require.Truef(t, errors.Is(processErr, common.ErrProcessReceivedMessagePanicked),
    +		"expected ErrProcessReceivedMessagePanicked, got %v", processErr)
    +	assert.True(t, arg.Throttler.(*mock.ThrottlerStub).StartWasCalled)
    +	assert.True(t, arg.Throttler.(*mock.ThrottlerStub).EndWasCalled)
    +}
    
  • data/retriever/resolvers/trieNodeResolver.go+23 2 modified
    @@ -1,6 +1,9 @@
     package resolvers
     
     import (
    +	"fmt"
    +	"runtime/debug"
    +
     	"github.com/klever-io/klever-go/common"
     	"github.com/klever-io/klever-go/core"
     	"github.com/klever-io/klever-go/data/batch"
    @@ -63,8 +66,17 @@ func NewTrieNodeResolver(arg ArgTrieNodeResolver) (*TrieNodeResolver, error) {
     
     // ProcessReceivedMessage will be the callback func from the p2p.Messenger and will be called each time a new message was received
     // (for the topic this validator was registered to, usually a request topic)
    -func (tnRes *TrieNodeResolver) ProcessReceivedMessage(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error {
    -	err := tnRes.canProcessMessage(message, fromConnectedPeer)
    +func (tnRes *TrieNodeResolver) ProcessReceivedMessage(message p2p.MessageP2P, fromConnectedPeer core.PeerID) (err error) {
    +	defer func() {
    +		if r := recover(); r != nil {
    +			log.Error("TrieNodeResolver.ProcessReceivedMessage panicked",
    +				"panic", r,
    +				"stack", string(debug.Stack()))
    +			err = fmt.Errorf("%w: %v", common.ErrProcessReceivedMessagePanicked, r)
    +		}
    +	}()
    +
    +	err = tnRes.canProcessMessage(message, fromConnectedPeer)
     	if err != nil {
     		return err
     	}
    @@ -88,11 +100,20 @@ func (tnRes *TrieNodeResolver) ProcessReceivedMessage(message p2p.MessageP2P, fr
     }
     
     func (tnRes *TrieNodeResolver) resolveMultipleHashes(hashesBuff []byte, message p2p.MessageP2P) error {
    +	// Reject oversized hash-array payloads before Unmarshal — see MaxHashArrayBuffSize.
    +	// No blacklist (logical bound, antiflood gates abuse).
    +	if len(hashesBuff) > batch.MaxHashArrayBuffSize {
    +		return common.ErrBatchWireTooLarge
    +	}
     	b := batch.Batch{}
     	err := tnRes.marshalizer.Unmarshal(&b, hashesBuff)
     	if err != nil {
     		return err
     	}
    +	// Cap uncompressed batches; the compressed path is bounded inside b.Decompress.
    +	if len(b.Data) > batch.MaxItemsPerBatch {
    +		return common.ErrTooManyItemsInBatch
    +	}
     	if b.IsCompressed {
     		err = b.Decompress(tnRes.marshalizer)
     		if err != nil {
    
  • data/retriever/resolvers/trieNodeResolver_test.go+156 0 modified
    @@ -8,11 +8,14 @@ import (
     	"github.com/klever-io/klever-go/common"
     	"github.com/klever-io/klever-go/common/mock"
     	"github.com/klever-io/klever-go/core"
    +	"github.com/klever-io/klever-go/data/batch"
     	"github.com/klever-io/klever-go/data/retriever"
     	"github.com/klever-io/klever-go/data/retriever/resolvers"
     	"github.com/klever-io/klever-go/network/p2p"
     	"github.com/klever-io/klever-go/tools/check"
    +	"github.com/klever-io/klever-go/tools/marshal"
     	"github.com/stretchr/testify/assert"
    +	"github.com/stretchr/testify/require"
     )
     
     var fromConnectedPeer = core.PeerID("from connected peer")
    @@ -297,3 +300,156 @@ func TestTrieNodeResolver_SetAndGetNumPeersToQuery(t *testing.T) {
     	assert.Equal(t, expectedIntra, actualIntra)
     	assert.Equal(t, expectedCross, actualCross)
     }
    +
    +//------- regression: GHSA-w342-mj6g-v9c4
    +
    +func TestTrieNodeResolver_ProcessReceivedMessage_RejectsHashArrayItemBomb_Uncompressed(t *testing.T) {
    +	t.Parallel()
    +
    +	marshalizer := &mock.MarshalizerMock{}
    +
    +	arg := createMockArgTrieNodeResolver()
    +	arg.Marshalizer = marshalizer
    +	tnRes, err := resolvers.NewTrieNodeResolver(arg)
    +	require.NoError(t, err)
    +
    +	bomb := &batch.Batch{Data: make([][]byte, batch.MaxItemsPerBatch+1)}
    +	buff, err := marshalizer.Marshal(bomb)
    +	require.NoError(t, err)
    +
    +	data, err := marshalizer.Marshal(&retriever.RequestData{
    +		Type:  retriever.RequestDataType_HashArrayType,
    +		Value: buff,
    +	})
    +	require.NoError(t, err)
    +
    +	msg := &mock.P2PMessageMock{DataField: data}
    +
    +	processErr := tnRes.ProcessReceivedMessage(msg, fromConnectedPeer)
    +	require.ErrorIs(t, processErr, common.ErrTooManyItemsInBatch)
    +	assert.True(t, arg.Throttler.(*mock.ThrottlerStub).StartWasCalled)
    +	assert.True(t, arg.Throttler.(*mock.ThrottlerStub).EndWasCalled)
    +}
    +
    +// Regression GHSA-w342-mj6g-v9c4 defense-in-depth: oversized hashesBuff must
    +// be rejected before Unmarshal (slice-header amplification window).
    +func TestTrieNodeResolver_ProcessReceivedMessage_RejectsOversizedRawHashArrayBuff(t *testing.T) {
    +	t.Parallel()
    +
    +	marshalizer := &mock.MarshalizerMock{}
    +
    +	arg := createMockArgTrieNodeResolver()
    +	arg.Marshalizer = marshalizer
    +	tnRes, err := resolvers.NewTrieNodeResolver(arg)
    +	require.NoError(t, err)
    +
    +	// Junk bytes (not a marshaled Batch): on vulnerable code Unmarshal would
    +	// either error or decode to empty; only the fix returns ErrBatchWireTooLarge.
    +	hashesBuff := make([]byte, batch.MaxHashArrayBuffSize+1)
    +
    +	data, err := marshalizer.Marshal(&retriever.RequestData{
    +		Type:  retriever.RequestDataType_HashArrayType,
    +		Value: hashesBuff,
    +	})
    +	require.NoError(t, err)
    +
    +	msg := &mock.P2PMessageMock{DataField: data}
    +
    +	processErr := tnRes.ProcessReceivedMessage(msg, fromConnectedPeer)
    +	require.ErrorIs(t, processErr, common.ErrBatchWireTooLarge)
    +	thr := arg.Throttler.(*mock.ThrottlerStub)
    +	assert.Equal(t, int32(1), thr.StartProcessingCount())
    +	assert.Equal(t, int32(1), thr.EndProcessingCount(),
    +		"the wire-size rejection path must release the throttler slot exactly once")
    +}
    +
    +// Regression GHSA-w342-mj6g-v9c4: real amplification pattern. Payload is `0x0a 0x00` × N —
    +// proto3 field-1 LEN tag + length-0 varint = one empty `repeated bytes` entry per pair.
    +// Pre-check must reject on byte length before Unmarshal allocates N empty []byte headers.
    +func TestTrieNodeResolver_ProcessReceivedMessage_RejectsRealAmplificationPattern(t *testing.T) {
    +	t.Parallel()
    +
    +	// Proto marshalizer so the pattern decodes (not just a size rejection) if pre-check is bypassed.
    +	protoMarsh := marshal.NewProtoMarshalizer()
    +
    +	arg := createMockArgTrieNodeResolver()
    +	arg.Marshalizer = protoMarsh
    +	tnRes, err := resolvers.NewTrieNodeResolver(arg)
    +	require.NoError(t, err)
    +
    +	const numEmptyEntries = batch.MaxHashArrayBuffSize/2 + 1
    +	hashesBuff := bytes.Repeat([]byte{0x0a, 0x00}, numEmptyEntries)
    +
    +	data, err := protoMarsh.Marshal(&retriever.RequestData{
    +		Type:  retriever.RequestDataType_HashArrayType,
    +		Value: hashesBuff,
    +	})
    +	require.NoError(t, err)
    +
    +	msg := &mock.P2PMessageMock{DataField: data}
    +
    +	processErr := tnRes.ProcessReceivedMessage(msg, fromConnectedPeer)
    +	require.ErrorIs(t, processErr, common.ErrBatchWireTooLarge,
    +		"wire-size pre-check must fire before proto Unmarshal allocates %d empty entries", numEmptyEntries)
    +	thr := arg.Throttler.(*mock.ThrottlerStub)
    +	assert.Equal(t, int32(1), thr.EndProcessingCount(),
    +		"throttler slot must be released on the pre-check rejection path")
    +}
    +
    +func TestTrieNodeResolver_ProcessReceivedMessage_RejectsHashArrayItemBomb_Compressed(t *testing.T) {
    +	t.Parallel()
    +
    +	marshalizer := &mock.MarshalizerMock{}
    +
    +	arg := createMockArgTrieNodeResolver()
    +	arg.Marshalizer = marshalizer
    +	tnRes, err := resolvers.NewTrieNodeResolver(arg)
    +	require.NoError(t, err)
    +
    +	bomb := &batch.Batch{
    +		Algo: batch.CType_GZip,
    +		Data: make([][]byte, batch.MaxItemsPerBatch+1),
    +	}
    +	require.NoError(t, bomb.Compress(marshalizer))
    +
    +	buff, err := marshalizer.Marshal(bomb)
    +	require.NoError(t, err)
    +
    +	data, err := marshalizer.Marshal(&retriever.RequestData{
    +		Type:  retriever.RequestDataType_HashArrayType,
    +		Value: buff,
    +	})
    +	require.NoError(t, err)
    +
    +	msg := &mock.P2PMessageMock{DataField: data}
    +
    +	processErr := tnRes.ProcessReceivedMessage(msg, fromConnectedPeer)
    +	require.ErrorIs(t, processErr, common.ErrTooManyItemsInBatch)
    +	assert.True(t, arg.Throttler.(*mock.ThrottlerStub).StartWasCalled)
    +	assert.True(t, arg.Throttler.(*mock.ThrottlerStub).EndWasCalled)
    +}
    +
    +// Regression: a panic in any dependency must be recovered by
    +// ProcessReceivedMessage rather than killing the message-handling goroutine.
    +func TestTrieNodeResolver_ProcessReceivedMessage_RecoversFromPanic(t *testing.T) {
    +	t.Parallel()
    +
    +	arg := createMockArgTrieNodeResolver()
    +	arg.Marshalizer = &mock.MarshalizerStub{
    +		UnmarshalCalled: func(obj interface{}, buff []byte) error {
    +			panic("injected panic during Unmarshal")
    +		},
    +	}
    +
    +	tnRes, err := resolvers.NewTrieNodeResolver(arg)
    +	require.NoError(t, err)
    +
    +	msg := &mock.P2PMessageMock{DataField: []byte("anything")}
    +
    +	processErr := tnRes.ProcessReceivedMessage(msg, fromConnectedPeer)
    +	require.Error(t, processErr)
    +	require.Truef(t, errors.Is(processErr, common.ErrProcessReceivedMessagePanicked),
    +		"expected ErrProcessReceivedMessagePanicked, got %v", processErr)
    +	assert.True(t, arg.Throttler.(*mock.ThrottlerStub).StartWasCalled)
    +	assert.True(t, arg.Throttler.(*mock.ThrottlerStub).EndWasCalled)
    +}
    
  • data/syncer/kappAccountsSyncer.go+4 9 modified
    @@ -130,11 +130,11 @@ func (k *kappAccountsSyncer) syncAccountDataTries(rootHashes [][]byte, ssh data.
     
     func (k *kappAccountsSyncer) syncDataTrie(rootHash []byte, ssh data.SyncStatisticsHandler, ctx context.Context) error {
     	k.throttler.StartProcessing()
    +	defer k.throttler.EndProcessing()
     
     	k.syncerMutex.Lock()
     	if _, ok := k.dataTries[string(rootHash)]; ok {
     		k.syncerMutex.Unlock()
    -		k.throttler.EndProcessing()
     		return nil
     	}
     
    @@ -160,16 +160,11 @@ func (k *kappAccountsSyncer) syncDataTrie(rootHash []byte, ssh data.SyncStatisti
     		return err
     	}
     	k.trieSyncers[string(rootHash)] = trieSyncer
    +	// Released before the blocking StartSyncing — do NOT move to defer or
    +	// numConcurrentTrieSyncers parallelism collapses.
     	k.syncerMutex.Unlock()
     
    -	err = trieSyncer.StartSyncing(rootHash, ctx)
    -	if err != nil {
    -		return err
    -	}
    -
    -	k.throttler.EndProcessing()
    -
    -	return nil
    +	return trieSyncer.StartSyncing(rootHash, ctx)
     }
     
     func (k *kappAccountsSyncer) findAllAccountRootHashes(mainTrie data.Trie, ctx context.Context) ([][]byte, error) {
    
  • data/syncer/syncDataTrieThrottler_test.go+190 0 added
    @@ -0,0 +1,190 @@
    +package syncer
    +
    +import (
    +	"testing"
    +	"time"
    +
    +	"github.com/klever-io/klever-go/common/mock"
    +	"github.com/klever-io/klever-go/core/throttler"
    +	"github.com/klever-io/klever-go/crypto/hashing/sha256"
    +	"github.com/klever-io/klever-go/data/trie"
    +	"github.com/klever-io/klever-go/data/trie/statistics"
    +	"github.com/klever-io/klever-go/storage"
    +	"github.com/klever-io/klever-go/storage/memorydb"
    +	"github.com/klever-io/klever-go/storage/storageUnit"
    +	"github.com/klever-io/klever-go/tools/marshal"
    +	"github.com/stretchr/testify/require"
    +)
    +
    +// Regression GHSA-fw38-pc54-jvx9 (KLC-2420): every error path out of
    +// syncDataTrie must release the throttler slot. Two timeouts on a throttler
    +// sized to 2 must leave CanProcess()==true; otherwise the parent fan-out
    +// in syncAccountDataTries spins on !CanProcess() forever.
    +
    +func makeSyncerStorageCacher(t *testing.T) (storage.Storer, storage.Cacher) {
    +	t.Helper()
    +
    +	cache, err := storageUnit.NewCache(storageUnit.CacheConfig{
    +		Type:        storageUnit.LRUCache,
    +		Capacity:    16,
    +		Shards:      1,
    +		SizeInBytes: 0,
    +	})
    +	require.NoError(t, err)
    +
    +	persist, err := memorydb.NewlruDB(1024)
    +	require.NoError(t, err)
    +
    +	unit, err := storageUnit.NewStorageUnit(cache, persist)
    +	require.NoError(t, err)
    +
    +	interceptedNodes, err := storageUnit.NewCache(storageUnit.CacheConfig{
    +		Type:        storageUnit.LRUCache,
    +		Capacity:    16,
    +		Shards:      1,
    +		SizeInBytes: 0,
    +	})
    +	require.NoError(t, err)
    +
    +	return unit, interceptedNodes
    +}
    +
    +func newTestUserAccountsSyncer(t *testing.T, max int32) (*userAccountsSyncer, *throttler.NumGoRoutinesThrottler) {
    +	t.Helper()
    +
    +	hasher := &sha256.Sha256{}
    +	marshalizer := marshal.NewProtoMarshalizer()
    +
    +	unit, interceptedNodes := makeSyncerStorageCacher(t)
    +	storageManager, err := trie.NewTrieStorageManagerWithoutPruning(unit)
    +	require.NoError(t, err)
    +
    +	thr, err := throttler.NewNumGoRoutinesThrottler(max)
    +	require.NoError(t, err)
    +
    +	args := ArgsNewUserAccountsSyncer{
    +		ArgsNewBaseAccountsSyncer: ArgsNewBaseAccountsSyncer{
    +			Hasher:                    hasher,
    +			Marshalizer:               marshalizer,
    +			TrieStorageManager:        storageManager,
    +			RequestHandler:            &mock.RequestHandlerStub{},
    +			Timeout:                   time.Second,
    +			Cacher:                    interceptedNodes,
    +			MaxTrieLevelInMemory:      5,
    +			MaxHardCapForMissingNodes: 100,
    +		},
    +		ShardId:   0,
    +		Throttler: thr,
    +	}
    +
    +	syncer, err := NewUserAccountsSyncer(args)
    +	require.NoError(t, err)
    +
    +	return syncer, thr
    +}
    +
    +func newTestKappAccountsSyncer(t *testing.T, max int32) (*kappAccountsSyncer, *throttler.NumGoRoutinesThrottler) {
    +	t.Helper()
    +
    +	hasher := &sha256.Sha256{}
    +	marshalizer := marshal.NewProtoMarshalizer()
    +
    +	unit, interceptedNodes := makeSyncerStorageCacher(t)
    +	storageManager, err := trie.NewTrieStorageManagerWithoutPruning(unit)
    +	require.NoError(t, err)
    +
    +	thr, err := throttler.NewNumGoRoutinesThrottler(max)
    +	require.NoError(t, err)
    +
    +	args := ArgsNewKappAccountsSyncer{
    +		ArgsNewBaseAccountsSyncer: ArgsNewBaseAccountsSyncer{
    +			Hasher:                    hasher,
    +			Marshalizer:               marshalizer,
    +			TrieStorageManager:        storageManager,
    +			RequestHandler:            &mock.RequestHandlerStub{},
    +			Timeout:                   time.Second,
    +			Cacher:                    interceptedNodes,
    +			MaxTrieLevelInMemory:      5,
    +			MaxHardCapForMissingNodes: 100,
    +		},
    +		Throttler: thr,
    +	}
    +
    +	syncer, err := NewKappAccountsSyncer(args)
    +	require.NoError(t, err)
    +
    +	return syncer, thr
    +}
    +
    +func TestUserAccountsSyncer_syncDataTrie_releasesThrottlerOnTimeout(t *testing.T) {
    +	t.Parallel()
    +
    +	syncer, thr := newTestUserAccountsSyncer(t, 2)
    +
    +	ctx := t.Context()
    +
    +	ssh := statistics.NewTrieSyncStatistics()
    +
    +	rootHashes := [][]byte{
    +		[]byte("11111111111111111111111111111111"),
    +		[]byte("22222222222222222222222222222222"),
    +	}
    +
    +	for _, rh := range rootHashes {
    +		err := syncer.syncDataTrie(rh, ssh, ctx)
    +		require.ErrorIs(t, err, trie.ErrTimeIsOut,
    +			"empty storage and no peer responses must surface trie.ErrTimeIsOut")
    +	}
    +
    +	require.True(t, thr.CanProcess(),
    +		"throttler slot must be released on every error path; CanProcess()==false here means the slot leaked (GHSA-fw38-pc54-jvx9)")
    +}
    +
    +func TestKappAccountsSyncer_syncDataTrie_releasesThrottlerOnTimeout(t *testing.T) {
    +	t.Parallel()
    +
    +	syncer, thr := newTestKappAccountsSyncer(t, 2)
    +
    +	ctx := t.Context()
    +
    +	ssh := statistics.NewTrieSyncStatistics()
    +
    +	rootHashes := [][]byte{
    +		[]byte("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"),
    +		[]byte("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"),
    +	}
    +
    +	for _, rh := range rootHashes {
    +		err := syncer.syncDataTrie(rh, ssh, ctx)
    +		require.ErrorIs(t, err, trie.ErrTimeIsOut,
    +			"empty storage and no peer responses must surface trie.ErrTimeIsOut")
    +	}
    +
    +	require.True(t, thr.CanProcess(),
    +		"throttler slot must be released on every error path; CanProcess()==false here means the slot leaked (GHSA-fw38-pc54-jvx9)")
    +}
    +
    +func TestUserAccountsSyncer_syncDataTrie_duplicateRootStillReleasesThrottler(t *testing.T) {
    +	t.Parallel()
    +
    +	syncer, thr := newTestUserAccountsSyncer(t, 2)
    +
    +	ctx := t.Context()
    +
    +	ssh := statistics.NewTrieSyncStatistics()
    +	rootHash := []byte("ccccccccccccccccccccccccccccccccc")
    +
    +	// Pre-seed the dataTries map so the duplicate-root early return is exercised
    +	// without going through trieSyncer.StartSyncing().
    +	syncer.syncerMutex.Lock()
    +	syncer.dataTries[string(rootHash)] = nil
    +	syncer.syncerMutex.Unlock()
    +
    +	for range 3 {
    +		err := syncer.syncDataTrie(rootHash, ssh, ctx)
    +		require.NoError(t, err)
    +	}
    +
    +	require.True(t, thr.CanProcess(),
    +		"duplicate-root early return must release the throttler slot")
    +}
    
  • data/syncer/userAccountsSyncer.go +4 -9 modified
    @@ -136,11 +136,11 @@ func (u *userAccountsSyncer) syncAccountDataTries(rootHashes [][]byte, ssh data.
     
     func (u *userAccountsSyncer) syncDataTrie(rootHash []byte, ssh data.SyncStatisticsHandler, ctx context.Context) error {
     	u.throttler.StartProcessing()
    +	defer u.throttler.EndProcessing()
     
     	u.syncerMutex.Lock()
     	if _, ok := u.dataTries[string(rootHash)]; ok {
     		u.syncerMutex.Unlock()
    -		u.throttler.EndProcessing()
     		return nil
     	}
     
    @@ -166,16 +166,11 @@ func (u *userAccountsSyncer) syncDataTrie(rootHash []byte, ssh data.SyncStatisti
     		return err
     	}
     	u.trieSyncers[string(rootHash)] = trieSyncer
    +	// Released before the blocking StartSyncing — do NOT move to defer or
    +	// numConcurrentTrieSyncers parallelism collapses.
     	u.syncerMutex.Unlock()
     
    -	err = trieSyncer.StartSyncing(rootHash, ctx)
    -	if err != nil {
    -		return err
    -	}
    -
    -	u.throttler.EndProcessing()
    -
    -	return nil
    +	return trieSyncer.StartSyncing(rootHash, ctx)
     }
     
     func (u *userAccountsSyncer) findAllAccountRootHashes(mainTrie data.Trie, ctx context.Context) ([][]byte, error) {
    
036eb81c63f9

[KLC-2405] expose NTP clock offset + last-sync timestamp as metrics (#60)

https://github.com/klever-io/klever-go · Fernando Sobreira · May 22, 2026 · Fixed in 1.7.18 via ghsa-release-walk
10 files changed · +574 -21
  • cmd/node/metrics/clockMetrics.go +79 -0 added
    @@ -0,0 +1,79 @@
    +package metrics
    +
    +import (
    +	"errors"
    +	"fmt"
    +	"io"
    +	"time"
    +
    +	"github.com/klever-io/klever-go/core"
    +	"github.com/klever-io/klever-go/core/appStatusPolling"
    +	"github.com/klever-io/klever-go/ntp"
    +	"github.com/klever-io/klever-go/tools/check"
    +)
    +
    +// StartClockMetricsPolling registers a polling function that publishes the NTP
    +// syncer's current clock offset and the timestamp of the last successful sync.
    +// These two signals cover every blockchain-operator alarm:
    +//
    +//   - abs(klv_clock_offset_ns) > tolerance         → drift will hurt consensus
    +//   - time() - klv_clock_last_sync_timestamp > 2×sync_period → NTP path broken
    +//
    +// Returned io.Closer stops the polling goroutine; nil if validation fails.
    +func StartClockMetricsPolling(
    +	ash core.AppStatusHandler,
    +	syncTimer ntp.SyncTimer,
    +	pollingInterval time.Duration,
    +) (io.Closer, error) {
    +	if check.IfNil(ash) {
    +		return nil, errors.New("nil AppStatusHandler")
    +	}
    +	if check.IfNil(syncTimer) {
    +		return nil, errors.New("nil SyncTimer")
    +	}
    +
    +	appStatusPollingHandler, err := appStatusPolling.NewAppStatusPolling(ash, pollingInterval)
    +	if err != nil {
    +		return nil, fmt.Errorf("cannot init AppStatusPolling for clock metrics: %w", err)
    +	}
    +
    +	pollingFunc := buildClockMetricsPollingFunc(syncTimer)
    +	err = appStatusPollingHandler.RegisterPollingFunc(pollingFunc)
    +	if err != nil {
    +		return nil, fmt.Errorf("cannot register clock metrics polling function: %w", err)
    +	}
    +
    +	// Prime before the recurring poll so /node/status returns the current
    +	// offset on request 1 (AppStatusPolling.Poll sleeps before its first tick).
    +	pollingFunc(ash)
    +
    +	appStatusPollingHandler.Poll()
    +	return appStatusPollingHandler, nil
    +}
    +
    +func buildClockMetricsPollingFunc(syncTimer ntp.SyncTimer) func(core.AppStatusHandler) {
    +	return func(appStatusHandler core.AppStatusHandler) {
    +		// Read offset and last-sync timestamp under a single RLock so a scrape
    +		// can't observe a fresh offset paired with a stale timestamp (or vice
    +		// versa) if a sync lands between two separate getter calls.
    +		offset, lastSync := syncTimer.ClockSnapshot()
    +
    +		// ClockOffset is signed: negative means OS clock is fast relative to NTP
    +		// servers, positive means it is slow. SetInt64Value (not SetUInt64Value)
    +		// is required because uint64 can't represent negative values.
    +		appStatusHandler.SetInt64Value(core.MetricClockOffsetNs, offset.Nanoseconds())
    +
    +		// Zero before the first successful sync — publish 0 explicitly so
    +		// `time() - value > threshold` alarms only fire once a sync has
    +		// actually happened. The unix > 0 guard defends against a pathological
    +		// pre-1970 RTC: uint64(negative-int64) wraps to a huge value that
    +		// would silently keep "stale-sync" alarms quiet forever.
    +		var lastSyncUnix uint64
    +		if !lastSync.IsZero() {
    +			if unix := lastSync.Unix(); unix > 0 {
    +				lastSyncUnix = uint64(unix)
    +			}
    +		}
    +		appStatusHandler.SetUInt64Value(core.MetricClockLastSyncTimestamp, lastSyncUnix)
    +	}
    +}
    
  • cmd/node/metrics/clockMetrics_test.go +284 -0 added
    @@ -0,0 +1,284 @@
    +package metrics
    +
    +import (
    +	"fmt"
    +	"strings"
    +	"sync"
    +	"testing"
    +	"time"
    +
    +	"github.com/klever-io/klever-go/common/mock"
    +	"github.com/klever-io/klever-go/core"
    +	consensusMock "github.com/klever-io/klever-go/core/consensus/mock"
    +	"github.com/klever-io/klever-go/statusHandler"
    +	"github.com/stretchr/testify/assert"
    +	"github.com/stretchr/testify/require"
    +)
    +
    +// captureHandler records SetInt64Value and SetUInt64Value calls so tests can
    +// assert which keys the polling function published and with what values.
    +type captureHandler struct {
    +	mu  sync.Mutex
    +	i64 map[string]int64
    +	u64 map[string]uint64
    +}
    +
    +func newCaptureHandler() (*captureHandler, *mock.AppStatusHandlerStub) {
    +	cap := &captureHandler{
    +		i64: map[string]int64{},
    +		u64: map[string]uint64{},
    +	}
    +	stub := &mock.AppStatusHandlerStub{
    +		SetInt64ValueHandler: func(key string, value int64) {
    +			cap.mu.Lock()
    +			cap.i64[key] = value
    +			cap.mu.Unlock()
    +		},
    +		SetUInt64ValueHandler: func(key string, value uint64) {
    +			cap.mu.Lock()
    +			cap.u64[key] = value
    +			cap.mu.Unlock()
    +		},
    +	}
    +	return cap, stub
    +}
    +
    +func TestStartClockMetricsPolling_NilAppStatusHandler(t *testing.T) {
    +	syncTimer := &consensusMock.SyncTimerMock{}
    +
    +	closer, err := StartClockMetricsPolling(nil, syncTimer, time.Second)
    +
    +	assert.Error(t, err)
    +	assert.Nil(t, closer)
    +}
    +
    +func TestStartClockMetricsPolling_NilSyncTimer(t *testing.T) {
    +	_, stub := newCaptureHandler()
    +
    +	closer, err := StartClockMetricsPolling(stub, nil, time.Second)
    +
    +	assert.Error(t, err)
    +	assert.Nil(t, closer)
    +}
    +
    +func TestStartClockMetricsPolling_InvalidPollingInterval(t *testing.T) {
    +	_, stub := newCaptureHandler()
    +	syncTimer := &consensusMock.SyncTimerMock{}
    +
    +	closer, err := StartClockMetricsPolling(stub, syncTimer, 0)
    +
    +	assert.Error(t, err)
    +	assert.Nil(t, closer)
    +}
    +
    +func TestStartClockMetricsPolling_PrimesMetricsBeforeFirstTick(t *testing.T) {
    +	// Verify the polling function runs once synchronously so /node/status
    +	// returns real values on the first scrape, not zero.
    +	const offset = time.Duration(-38) * time.Millisecond
    +	syncTs := time.Unix(1_700_000_000, 0)
    +
    +	cap, stub := newCaptureHandler()
    +	syncTimer := &consensusMock.SyncTimerMock{
    +		ClockOffsetCalled:       func() time.Duration { return offset },
    +		LastSyncTimestampCalled: func() time.Time { return syncTs },
    +	}
    +
    +	closer, err := StartClockMetricsPolling(stub, syncTimer, time.Hour)
    +	require.NoError(t, err)
    +	require.NotNil(t, closer)
    +	defer func() { _ = closer.Close() }()
    +
    +	cap.mu.Lock()
    +	defer cap.mu.Unlock()
    +	assert.Equal(t, offset.Nanoseconds(), cap.i64[core.MetricClockOffsetNs])
    +	assert.Equal(t, uint64(syncTs.Unix()), cap.u64[core.MetricClockLastSyncTimestamp])
    +}
    +
    +func TestBuildClockMetricsPollingFunc_NegativeOffsetPreservesSign(t *testing.T) {
    +	// The "OS clock fast" case (operator-relevant: a fast clock makes the node
    +	// emit timestamps in the future relative to peers, breaking quorum). Sign
    +	// must survive the SetInt64Value round-trip.
    +	const offset = time.Duration(-123_456_789) * time.Nanosecond
    +
    +	cap, stub := newCaptureHandler()
    +	syncTimer := &consensusMock.SyncTimerMock{
    +		ClockOffsetCalled: func() time.Duration { return offset },
    +	}
    +
    +	pollingFunc := buildClockMetricsPollingFunc(syncTimer)
    +	pollingFunc(stub)
    +
    +	cap.mu.Lock()
    +	defer cap.mu.Unlock()
    +	assert.Equal(t, offset.Nanoseconds(), cap.i64[core.MetricClockOffsetNs])
    +	assert.Less(t, cap.i64[core.MetricClockOffsetNs], int64(0), "negative offset must remain negative")
    +}
    +
    +func TestBuildClockMetricsPollingFunc_PositiveOffset(t *testing.T) {
    +	const offset = 42 * time.Millisecond
    +
    +	cap, stub := newCaptureHandler()
    +	syncTimer := &consensusMock.SyncTimerMock{
    +		ClockOffsetCalled: func() time.Duration { return offset },
    +	}
    +
    +	pollingFunc := buildClockMetricsPollingFunc(syncTimer)
    +	pollingFunc(stub)
    +
    +	cap.mu.Lock()
    +	defer cap.mu.Unlock()
    +	assert.Equal(t, offset.Nanoseconds(), cap.i64[core.MetricClockOffsetNs])
    +}
    +
    +func TestBuildClockMetricsPollingFunc_ZeroTimestampBeforeFirstSync(t *testing.T) {
    +	// A node that has never successfully synced (e.g. NTP timeouts at boot)
    +	// must publish timestamp=0, not a Unix-epoch translation of the zero time.
    +	// Operators alarm on `time() - klv_clock_last_sync_timestamp > threshold`;
    +	// leaking time.Time{}.Unix() (≈ -6e10) would make the alarm fire spuriously
    +	// or never fire at all depending on the operator's filter expression.
    +	cap, stub := newCaptureHandler()
    +	syncTimer := &consensusMock.SyncTimerMock{
    +		// LastSyncTimestampCalled left nil → returns time.Time{}
    +	}
    +
    +	pollingFunc := buildClockMetricsPollingFunc(syncTimer)
    +	pollingFunc(stub)
    +
    +	cap.mu.Lock()
    +	defer cap.mu.Unlock()
    +	assert.Equal(t, int64(0), cap.i64[core.MetricClockOffsetNs])
    +	assert.Equal(t, uint64(0), cap.u64[core.MetricClockLastSyncTimestamp],
    +		"zero time.Time must translate to 0, not the Unix epoch of the zero time")
    +}
    +
    +func TestBuildClockMetricsPollingFunc_LastSyncTimestampPublished(t *testing.T) {
    +	syncTs := time.Unix(1_700_000_000, 0)
    +
    +	cap, stub := newCaptureHandler()
    +	syncTimer := &consensusMock.SyncTimerMock{
    +		LastSyncTimestampCalled: func() time.Time { return syncTs },
    +	}
    +
    +	pollingFunc := buildClockMetricsPollingFunc(syncTimer)
    +	pollingFunc(stub)
    +
    +	cap.mu.Lock()
    +	defer cap.mu.Unlock()
    +	assert.Equal(t, uint64(syncTs.Unix()), cap.u64[core.MetricClockLastSyncTimestamp])
    +}
    +
    +func TestBuildClockMetricsPollingFunc_NegativeUnixGuard(t *testing.T) {
    +	// Pathological case: a non-zero timestamp whose Unix() is negative (pre-1970
    +	// RTC after a successful sync). Without the guard, uint64(negative-int64)
    +	// wraps to ~1.8e19, which would silently bypass any "stale sync" alarm
    +	// expressed as `time() - value > threshold`. Must publish 0 instead.
    +	preEpoch := time.Date(1969, 6, 1, 0, 0, 0, 0, time.UTC)
    +	require.True(t, preEpoch.Unix() < 0, "test setup: timestamp must precede 1970")
    +
    +	cap, stub := newCaptureHandler()
    +	syncTimer := &consensusMock.SyncTimerMock{
    +		LastSyncTimestampCalled: func() time.Time { return preEpoch },
    +	}
    +
    +	pollingFunc := buildClockMetricsPollingFunc(syncTimer)
    +	pollingFunc(stub)
    +
    +	cap.mu.Lock()
    +	defer cap.mu.Unlock()
    +	assert.Equal(t, uint64(0), cap.u64[core.MetricClockLastSyncTimestamp],
    +		"negative Unix seconds must publish 0, not the uint64 wrap-around")
    +}
    +
    +func TestBuildClockMetricsPollingFunc_UsesAtomicSnapshot(t *testing.T) {
    +	// Verify the polling func reads (offset, lastSync) via the single-RLock
    +	// snapshot, not as two independent getter calls. Forcing the legacy getters
    +	// to fail the assertion proves the snapshot path is what publishes the metrics.
    +	const offset = 1234 * time.Nanosecond
    +	syncTs := time.Unix(1_700_000_000, 0)
    +
    +	cap, stub := newCaptureHandler()
    +	syncTimer := &consensusMock.SyncTimerMock{
    +		ClockOffsetCalled: func() time.Duration {
    +			t.Errorf("ClockOffset() must not be called — polling must use ClockSnapshot()")
    +			return 0
    +		},
    +		LastSyncTimestampCalled: func() time.Time {
    +			t.Errorf("LastSyncTimestamp() must not be called — polling must use ClockSnapshot()")
    +			return time.Time{}
    +		},
    +		ClockSnapshotCalled: func() (time.Duration, time.Time) {
    +			return offset, syncTs
    +		},
    +	}
    +
    +	pollingFunc := buildClockMetricsPollingFunc(syncTimer)
    +	pollingFunc(stub)
    +
    +	cap.mu.Lock()
    +	defer cap.mu.Unlock()
    +	assert.Equal(t, offset.Nanoseconds(), cap.i64[core.MetricClockOffsetNs])
    +	assert.Equal(t, uint64(syncTs.Unix()), cap.u64[core.MetricClockLastSyncTimestamp])
    +}
    +
    +// TestClockMetricsAppearInPrometheusOutput is an end-to-end check that
    +// exercises the real statusHandler + clockMetrics polling combination,
    +// ensuring both metrics appear in the /node/metrics scrape output with the
    +// sign of klv_clock_offset_ns surviving the int64 → Prometheus number
    +// formatting.
    +func TestClockMetricsAppearInPrometheusOutput(t *testing.T) {
    +	const offset = time.Duration(-38) * time.Millisecond
    +	syncTs := time.Unix(1_700_000_000, 0)
    +
    +	sm := statusHandler.NewStatusMetrics()
    +	sm.SetStringValue(core.MetricChainID, "testnet")
    +
    +	syncTimer := &consensusMock.SyncTimerMock{
    +		ClockOffsetCalled:       func() time.Duration { return offset },
    +		LastSyncTimestampCalled: func() time.Time { return syncTs },
    +	}
    +
    +	// Long interval so only the synchronous prime call runs during the assert;
    +	// avoids racing against the background poller.
    +	closer, err := StartClockMetricsPolling(sm, syncTimer, time.Hour)
    +	require.NoError(t, err)
    +	defer func() { _ = closer.Close() }()
    +
    +	prom := sm.StatusMetricsWithoutP2PPrometheusString()
    +
    +	assert.True(t, strings.Contains(prom, core.MetricClockOffsetNs),
    +		"missing %s in: %s", core.MetricClockOffsetNs, prom)
    +	assert.True(t, strings.Contains(prom, core.MetricClockLastSyncTimestamp),
    +		"missing %s in: %s", core.MetricClockLastSyncTimestamp, prom)
    +
    +	expectedOffsetLine := fmt.Sprintf(`%s{chainID="testnet"} %d`,
    +		core.MetricClockOffsetNs, offset.Nanoseconds())
    +	assert.Contains(t, prom, expectedOffsetLine,
    +		"negative offset must appear with sign preserved")
    +
    +	expectedTimestampLine := fmt.Sprintf(`%s{chainID="testnet"} %d`,
    +		core.MetricClockLastSyncTimestamp, syncTs.Unix())
    +	assert.Contains(t, prom, expectedTimestampLine)
    +}
    +
    +// TestClockMetricsAppearInStatusMap checks that both metrics land in the JSON
    +// map returned by /node/status (used by dashboards/operator tooling that read
    +// the structured map rather than the scrape format).
    +func TestClockMetricsAppearInStatusMap(t *testing.T) {
    +	const offset = 42 * time.Millisecond
    +	syncTs := time.Unix(1_700_000_000, 0)
    +
    +	sm := statusHandler.NewStatusMetrics()
    +	syncTimer := &consensusMock.SyncTimerMock{
    +		ClockOffsetCalled:       func() time.Duration { return offset },
    +		LastSyncTimestampCalled: func() time.Time { return syncTs },
    +	}
    +
    +	closer, err := StartClockMetricsPolling(sm, syncTimer, time.Hour)
    +	require.NoError(t, err)
    +	defer func() { _ = closer.Close() }()
    +
    +	statusMap := sm.StatusMetricsMapWithoutP2P()
    +
    +	assert.Equal(t, offset.Nanoseconds(), statusMap[core.MetricClockOffsetNs])
    +	assert.Equal(t, uint64(syncTs.Unix()), statusMap[core.MetricClockLastSyncTimestamp])
    +}
    
  • cmd/node/startup.go +6 -0 modified
    @@ -988,12 +988,18 @@ func startNode(ctx *cli.Context, log logger.Logger, version string) error {
     		return err
     	}
     
    +	clockMetricsCloser, err := metrics.StartClockMetricsPolling(coreComponents.StatusHandler, syncer, statusPollingInterval)
    +	if err != nil {
    +		return err
    +	}
    +
     	// Background goroutines that must be drained before component teardown.
     	// Order within the slice doesn't matter; they're independent of each other.
     	backgroundClosers := []io.Closer{
     		statusPollingCloser,
     		machineStatsCloser,
     		nodeMetricsCloser,
    +		clockMetricsCloser,
     		currentNode.GetPeerHonestyHandler(),
     	}
     
    
  • config/node/config.yaml +1 -1 modified
    @@ -55,7 +55,7 @@ ntp:
         - time.windows.com
         - time.apple.com
       port: 123
    -  timeoutMilliseconds: 100
    +  timeoutMilliseconds: 500
       syncPeriodSeconds: 3600
       version: 0
     
    
  • core/consensus/mock/syncTimerMock.go +25 -2 modified
    @@ -6,8 +6,10 @@ import (
     
     // SyncTimerMock mocks the implementation for a SyncTimer
     type SyncTimerMock struct {
    -	ClockOffsetCalled func() time.Duration
    -	CurrentTimeCalled func() time.Time
    +	ClockOffsetCalled       func() time.Duration
    +	LastSyncTimestampCalled func() time.Time
    +	ClockSnapshotCalled     func() (time.Duration, time.Time)
    +	CurrentTimeCalled       func() time.Time
     }
     
     // StartSyncingTime method does the time synchronization at every syncPeriod time elapsed. This should be started as a go routine
    @@ -24,6 +26,27 @@ func (stm *SyncTimerMock) ClockOffset() time.Duration {
     	return time.Duration(0)
     }
     
    +// LastSyncTimestamp returns the configured last sync timestamp or the zero time
    +func (stm *SyncTimerMock) LastSyncTimestamp() time.Time {
    +	if stm.LastSyncTimestampCalled != nil {
    +		return stm.LastSyncTimestampCalled()
    +	}
    +
    +	return time.Time{}
    +}
    +
    +// ClockSnapshot returns the configured (offset, lastSync) pair under a single
    +// callback so tests can assert atomic-pair semantics. Falls back to the
    +// individual getters when no explicit snapshot stub is set, which preserves
    +// the existing behavior for tests that only configure one or both fields.
    +func (stm *SyncTimerMock) ClockSnapshot() (time.Duration, time.Time) {
    +	if stm.ClockSnapshotCalled != nil {
    +		return stm.ClockSnapshotCalled()
    +	}
    +
    +	return stm.ClockOffset(), stm.LastSyncTimestamp()
    +}
    +
     // FormattedCurrentTime method gets the formatted current time on which is added a given offset
     func (stm *SyncTimerMock) FormattedCurrentTime() string {
     	return time.Unix(0, 0).String()
    
  • core/metrics.go +6 -0 modified
    @@ -300,3 +300,9 @@ const MetricNodeStartTimestamp = "klv_node_start_timestamp"
     
     // MetricRedundancySlotsInactive is the metric for the redundancy slots of inactivity counter
     const MetricRedundancySlotsInactive = "klv_redundancy_slots_inactive"
    +
    +// MetricClockOffsetNs is the metric for the NTP-corrected clock offset in nanoseconds (signed)
    +const MetricClockOffsetNs = "klv_clock_offset_ns"
    +
    +// MetricClockLastSyncTimestamp is the metric for the unix-seconds timestamp of the last successful NTP sync (0 before first success)
    +const MetricClockLastSyncTimestamp = "klv_clock_last_sync_timestamp"
    
  • integrationTest/config/config.yaml +1 -1 modified
    @@ -38,7 +38,7 @@ ntp:
         - time.cloudflare.com
         - time.apple.com
       port: 123
    -  timeoutMilliseconds: 100
    +  timeoutMilliseconds: 500
       syncPeriodSeconds: 3600
       version: 0
     
    
  • ntp/interface.go +2 -0 modified
    @@ -9,6 +9,8 @@ type SyncTimer interface {
     	Close() error
     	StartSyncingTime()
     	ClockOffset() time.Duration
    +	LastSyncTimestamp() time.Time
    +	ClockSnapshot() (offset time.Duration, lastSync time.Time)
     	FormattedCurrentTime() string
     	CurrentTime() time.Time
     	IsInterfaceNil() bool
    
  • ntp/syncTime.go +61 -17 modified
    @@ -96,12 +96,13 @@ func queryNTP(options NTPOptions, hostIndex int) (*ntp.Response, error) {
     
     // syncTime defines an object for time synchronization
     type syncTime struct {
    -	mut         sync.RWMutex
    -	clockOffset time.Duration
    -	syncPeriod  time.Duration
    -	ntpOptions  NTPOptions
    -	query       func(options NTPOptions, hostIndex int) (*ntp.Response, error)
    -	cancelFunc  func()
    +	mut               sync.RWMutex
    +	clockOffset       time.Duration
    +	lastSyncTimestamp time.Time
    +	syncPeriod        time.Duration
    +	ntpOptions        NTPOptions
    +	query             func(options NTPOptions, hostIndex int) (*ntp.Response, error)
    +	cancelFunc        func()
     }
     
     // NewSyncTime creates a syncTime object. The customQueryFunc argument allows the caller to set a different NTP-querying
    @@ -162,11 +163,13 @@ func (s *syncTime) getSleepTime() time.Duration {
     // and servers time which have been used in synchronization
     func (s *syncTime) sync() {
     	clockOffsets := make([]time.Duration, 0)
    +	queryFailures := 0
     	for hostIndex := 0; hostIndex < len(s.ntpOptions.Hosts); hostIndex++ {
     		for requests := 0; requests < numRequestsFromHost; requests++ {
     			response, err := s.query(s.ntpOptions, hostIndex)
     			if err != nil {
    -				log.Debug("sync.query",
    +				queryFailures++
    +				log.Trace("sync.query",
     					"host", s.ntpOptions.Hosts[hostIndex],
     					"port", s.ntpOptions.Port,
     					"error", err.Error())
    @@ -189,9 +192,11 @@ func (s *syncTime) sync() {
     	numTotalRequests := len(s.ntpOptions.Hosts) * numRequestsFromHost
     	minClockOffsetsToAllowUpdate := math.Ceil(float64(numTotalRequests) * minResponsesPercent / (1 - cuttingOutPercent))
     	if len(clockOffsets) < int(minClockOffsetsToAllowUpdate) {
    -		log.Debug("sync.setClockOffset NOT done",
    -			"clock offsets", len(clockOffsets),
    -			"min clock offsets to allow update", int(minClockOffsetsToAllowUpdate))
    +		log.Warn("ntp sync FAILED: insufficient responses",
    +			"ok", len(clockOffsets),
    +			"failed", queryFailures,
    +			"total", numTotalRequests,
    +			"min required", int(minClockOffsetsToAllowUpdate))
     
     		return
     	}
    @@ -200,17 +205,21 @@ func (s *syncTime) sync() {
     	clockOffsetHarmonicMean := s.getHarmonicMean(clockOffsetsWithoutEdges)
     	isOutOfBounds := tools.AbsDuration(clockOffsetHarmonicMean)-outOfBoundsDuration > 0
     	if isOutOfBounds {
    -		log.Error("syncTime.sync: clock offset is out of expected bounds",
    -			"clock offset harmonic mean", clockOffsetHarmonicMean)
    +		log.Error("ntp sync FAILED: offset out of bounds",
    +			"ok", len(clockOffsets),
    +			"failed", queryFailures,
    +			"total", numTotalRequests,
    +			"offset", clockOffsetHarmonicMean)
     
     		return
     	}
    -	s.setClockOffset(clockOffsetHarmonicMean)
    +	s.recordSyncSuccess(clockOffsetHarmonicMean, time.Now())
     
    -	log.Debug("sync.setClockOffset done",
    -		"num clock offsets", len(clockOffsets),
    -		"num clock offsets without edges", len(clockOffsetsWithoutEdges),
    -		"clock offset harmonic mean", clockOffsetHarmonicMean)
    +	log.Debug("ntp sync OK",
    +		"ok", len(clockOffsets),
    +		"failed", queryFailures,
    +		"total", numTotalRequests,
    +		"offset", clockOffsetHarmonicMean)
     }
     
     func (s *syncTime) getClockOffsetsWithoutEdges(clockOffsets []time.Duration) []time.Duration {
    @@ -263,6 +272,41 @@ func (s *syncTime) setClockOffset(clockOffset time.Duration) {
     	s.mut.Unlock()
     }
     
    +// LastSyncTimestamp returns the wall-clock time of the most recent successful
    +// sync. Zero-value time before the first success — exposed as metric value 0
    +// so operators alarming on `time() - last_sync > threshold` only fire once a
    +// sync has actually happened.
    +func (s *syncTime) LastSyncTimestamp() time.Time {
    +	s.mut.RLock()
    +	ts := s.lastSyncTimestamp
    +	s.mut.RUnlock()
    +
    +	return ts
    +}
    +
    +// ClockSnapshot returns the current offset and last-sync timestamp under a
    +// single RLock so callers see a coherent pair. Without this, a sync landing
    +// between two separate getter calls could mix a fresh offset with a stale
    +// timestamp on the same scrape.
    +func (s *syncTime) ClockSnapshot() (offset time.Duration, lastSync time.Time) {
    +	s.mut.RLock()
    +	offset = s.clockOffset
    +	lastSync = s.lastSyncTimestamp
    +	s.mut.RUnlock()
    +
    +	return
    +}
    +
    +// recordSyncSuccess updates offset and timestamp under a single lock so
    +// /node/metrics scrapes can't observe a fresh offset paired with a stale
    +// timestamp (or vice versa) mid-update.
    +func (s *syncTime) recordSyncSuccess(offset time.Duration, at time.Time) {
    +	s.mut.Lock()
    +	s.clockOffset = offset
    +	s.lastSyncTimestamp = at
    +	s.mut.Unlock()
    +}
    +
     // FormattedCurrentTime method gets the formatted current time on which is added the current offset
     func (s *syncTime) FormattedCurrentTime() string {
     	return s.formatTime(s.CurrentTime())
    
  • ntp/syncTime_test.go +109 -0 modified
    @@ -319,3 +319,112 @@ func TestCallQueryShouldNotUpdateOnOutOfBoundValuesNegative(t *testing.T) {
     
     	assert.Equal(t, currentValue, st.ClockOffset())
     }
    +
    +func TestLastSyncTimestampStartsAtZero(t *testing.T) {
    +	t.Parallel()
    +
    +	st := ntp.NewSyncTime(config.NTPConfig{SyncPeriodSeconds: 1}, nil)
    +
    +	assert.True(t, st.LastSyncTimestamp().IsZero(),
    +		"timestamp must be zero before first successful sync — the polling code "+
    +			"checks IsZero() to publish 0 (operator alarms on `time() - value > threshold`)")
    +}
    +
    +func TestLastSyncTimestampStaysZeroWhenAllQueriesFail(t *testing.T) {
    +	t.Parallel()
    +
    +	// Mirrors the "i/o timeout" failure mode at startup: every UDP query fails.
    +	// The metric must honestly say "this node has never synced" so operator
    +	// alarms can fire.
    +	st := ntp.NewSyncTime(
    +		config.NTPConfig{
    +			SyncPeriodSeconds: 1,
    +			Hosts:             []string{"host1"},
    +		},
    +		func(options ntp.NTPOptions, hostIndex int) (*beevikNtp.Response, error) {
    +			return nil, errNtpMock
    +		},
    +	)
    +
    +	st.Sync()
    +
    +	assert.True(t, st.LastSyncTimestamp().IsZero(),
    +		"unreachable NTP must not stamp a timestamp")
    +}
    +
    +func TestLastSyncTimestampStaysZeroOnOutOfBoundsRejection(t *testing.T) {
    +	t.Parallel()
    +
    +	// All queries returned but the harmonic mean was rejected (> 1s). No
    +	// usable correction was produced, so the timestamp must not advance —
    +	// otherwise the "sync stalled" alarm would silently clear on a rejected
    +	// sync, hiding the broken state.
    +	st := ntp.NewSyncTime(
    +		config.NTPConfig{
    +			SyncPeriodSeconds: 3600,
    +			Hosts:             []string{"host1"},
    +		},
    +		func(options ntp.NTPOptions, hostIndex int) (*beevikNtp.Response, error) {
    +			return &beevikNtp.Response{
    +				ClockOffset: ntp.OutOfBoundsDuration + time.Nanosecond,
    +			}, nil
    +		},
    +	)
    +
    +	st.Sync()
    +
    +	assert.True(t, st.LastSyncTimestamp().IsZero())
    +}
    +
    +func TestLastSyncTimestampStampedOnSuccess(t *testing.T) {
    +	t.Parallel()
    +
    +	st := ntp.NewSyncTime(
    +		config.NTPConfig{
    +			SyncPeriodSeconds: 1,
    +			Hosts:             []string{"host1"},
    +		},
    +		func(options ntp.NTPOptions, hostIndex int) (*beevikNtp.Response, error) {
    +			return &beevikNtp.Response{ClockOffset: 23456}, nil
    +		},
    +	)
    +
    +	before := time.Now()
    +	st.Sync()
    +	after := time.Now()
    +
    +	ts := st.LastSyncTimestamp()
    +	assert.False(t, ts.IsZero())
    +	assert.False(t, ts.Before(before), "timestamp must be set on or after the sync started")
    +	assert.False(t, ts.After(after), "timestamp must not be set in the future")
    +}
    +
    +func TestClockSnapshotReturnsConsistentPair(t *testing.T) {
    +	t.Parallel()
    +
    +	// After a successful sync, ClockSnapshot must return the offset and
    +	// timestamp written by that same sync — they're set together under one
    +	// lock in recordSyncSuccess and must be read together for callers
    +	// publishing them as paired metrics.
    +	st := ntp.NewSyncTime(
    +		config.NTPConfig{
    +			SyncPeriodSeconds: 1,
    +			Hosts:             []string{"host1"},
    +		},
    +		func(options ntp.NTPOptions, hostIndex int) (*beevikNtp.Response, error) {
    +			return &beevikNtp.Response{ClockOffset: 23456}, nil
    +		},
    +	)
    +
    +	offset, ts := st.ClockSnapshot()
    +	assert.Equal(t, time.Duration(0), offset, "no sync yet, offset is zero")
    +	assert.True(t, ts.IsZero(), "no sync yet, timestamp is zero")
    +
    +	st.Sync()
    +
    +	offset, ts = st.ClockSnapshot()
    +	assert.Equal(t, time.Duration(23456), offset)
    +	assert.False(t, ts.IsZero())
    +	assert.Equal(t, st.ClockOffset(), offset, "snapshot offset must match individual getter")
    +	assert.Equal(t, st.LastSyncTimestamp(), ts, "snapshot timestamp must match individual getter")
    +}
    

Vulnerability mechanics

Root cause

The `syncDataTrie` function in the account-data trie syncers calls `StartProcessing()` up front but calls `EndProcessing()` only on the success path and on the duplicate-root early return. Every other error path therefore leaks a throttler slot permanently.

Attack vector

An attacker can repeatedly trigger failures in `trieSyncer.StartSyncing()`, for example by causing timeouts during trie-node retrieval. Each failure consumes a slot from the `NumGoRoutinesThrottler` without releasing it. Once enough slots have leaked, the throttler is exhausted and no further sync attempts can start. Because the epoch bootstrap process aborts on sync errors, the result is a denial of service for new or restarting nodes [ref_id=1].

Affected code

The vulnerability exists in the `syncDataTrie` function within `data/syncer/userAccountsSyncer.go` and `data/syncer/kappAccountsSyncer.go`. The leaked slots belong to the throttler in `core/throttler/numGoRoutinesThrottler.go`, and the failure surfaces during epoch bootstrap in `core/bootstrap/process.go`.

What the fix does

The recommended fix involves adding a `defer u.throttler.EndProcessing()` call immediately after `u.throttler.StartProcessing()` in the `syncDataTrie` function for both `userAccountsSyncer` and `kappAccountsSyncer`. This ensures that the throttler slot is always released, even if errors occur during trie synchronization, preventing the slot from being permanently consumed and thus resolving the leak.

Preconditions

  • Input: The attacker must be able to trigger failures in trie synchronization, such as network timeouts or errors during trie-node retrieval.

Reproduction

The provided PoC tests `TestPOC_UserAccountsSyncer_LeaksThrottlerSlotOnTrieTimeout` and `TestPOC_KappAccountsSyncer_LeaksThrottlerSlotOnTrieTimeout` demonstrate the throttler slot leak. Running these tests with a throttler capacity of 2 shows that after two failed sync attempts, the throttler becomes exhausted (`thr.CanProcess()` returns false), confirming the leak. The command to run the tests is `go test ./data/syncer -run 'TestPOC_(User|Kapp)AccountsSyncer_LeaksThrottlerSlotOnTrieTimeout' -count=1` [ref_id=1].

Generated on Jun 5, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.

