VYPR
High severity 7.5 · NVD Advisory · Published Jun 5, 2026 · Updated Jun 9, 2026

klever-go: Unbounded goroutine spawn on direct-message ingress enables peer-driven DoS

CVE-2026-52879

Description

Summary

networkMessenger.directMessageHandler in network/p2p/libp2p/netMessenger.go spawns a fresh goroutine for every incoming direct message before the antiflood layer makes an admission decision. There is no semaphore, throttler, or bound on concurrent in-flight spawns.

A single connected libp2p peer can open a DirectSendID stream and send well-formed TopicMessage envelopes with varying sequence numbers. Each accepted direct message reaches directMessageHandler and triggers a fresh goroutine before processor.ProcessReceivedMessage runs. This allows unbounded goroutine growth and node availability degradation from one peer.

This remains present in the latest release v1.7.17: network/p2p/libp2p/netMessenger.go:1060 still spawns go func(msg p2p.MessageP2P) before processor.ProcessReceivedMessage. I also verified current develop commit 10bcfd50, where the same spawn remains at network/p2p/libp2p/netMessenger.go:1115.

This is distinct from GHSA-74m6-4hjp-7226 and GHSA-87m7-qffr-542v. Those advisories concern MultiDataInterceptor decompression/throttler behavior. This report concerns the libp2p direct-message ingress wrapper spawning an unbounded goroutine before processor-level antiflood/admission logic runs. A patch to Batch.Decompress or MultiDataInterceptor does not bound this direct-message goroutine spawn.

Details

The affected path is network/p2p/libp2p/netMessenger.go in directMessageHandler.

The direct-message path transforms and validates the message, looks up the topic processor, then immediately spawns a goroutine:

func (netMes *networkMessenger) directMessageHandler(message *pubsub.Message, fromConnectedPeer core.PeerID) error {
    var processor p2p.MessageProcessor

    topic := *message.Topic
    msg, err := netMes.transformAndCheckMessage(message, fromConnectedPeer, topic)
    if err != nil {
        return err
    }

    netMes.mutTopics.RLock()
    processor = netMes.processors[topic]
    netMes.mutTopics.RUnlock()

    if processor == nil {
        return fmt.Errorf("%w on directMessageHandler for topic %s", p2p.ErrNilValidator, topic)
    }

    go func(msg p2p.MessageP2P) {
        if check.IfNil(msg) {
            return
        }

        errProcess := processor.ProcessReceivedMessage(msg, fromConnectedPeer)
        // ...
    }(msg)

    return nil
}

The processor-level antiflood decision happens inside ProcessReceivedMessage, after the goroutine, its stack, and the cloned message reference already exist. That means antiflood can bound processing rate, but not goroutine creation rate.

The existing goRoutinesThrottler with capacity broadcastGoRoutines = 1000 is wired into outgoing broadcast paths such as BroadcastOnChannelBlocking, not this incoming direct-message path.

The parallel pubsub ingress path in the same file handles a comparable inbound message surface synchronously:

err = handler.ProcessReceivedMessage(msg, fromConnectedPeer)

So the direct-message path is asymmetric: same transform/check function, same ProcessReceivedMessage callee, but direct-message ingress adds an unbounded goroutine spawn.
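The ordering problem can be sketched in isolation. The following is a minimal model, not klever-go code: `simulate` and its toy admission counter are hypothetical stand-ins for directMessageHandler and the processor-level antiflood check. It counts how many goroutines exist by the time admission is decided.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// simulate delivers n messages to a toy handler and returns how many goroutines
// were spawned and how many messages a toy antiflood check admitted.
// All names here are hypothetical; this is not the klever-go implementation.
func simulate(n int, limit int64, spawn bool) (goroutines, admitted int64) {
	var seen int64
	var wg sync.WaitGroup

	// process models ProcessReceivedMessage: the admission decision happens inside it.
	process := func() {
		if atomic.AddInt64(&seen, 1) <= limit {
			atomic.AddInt64(&admitted, 1)
		}
	}

	for i := 0; i < n; i++ {
		if !spawn {
			process() // synchronous shape: admission runs on the caller's goroutine
			continue
		}
		goroutines++ // spawning shape: the goroutine exists before admission runs
		wg.Add(1)
		go func() {
			defer wg.Done()
			process()
		}()
	}
	wg.Wait()
	return goroutines, admitted
}

func main() {
	g, a := simulate(500, 10, true)
	fmt.Printf("spawn: goroutines=%d admitted=%d\n", g, a) // spawn: goroutines=500 admitted=10
	g, a = simulate(500, 10, false)
	fmt.Printf("sync:  goroutines=%d admitted=%d\n", g, a) // sync:  goroutines=0 admitted=10
}
```

In both shapes the toy antiflood admits the same 10 messages. Only the spawning shape pays for 500 goroutines to reach that decision.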

Reachability:

  • directSender.go registers DirectSendID as a libp2p stream protocol.
  • directStreamHandler reads framed pubsub.Message envelopes from the stream.
  • directStreamHandler forwards each message to networkMessenger.directMessageHandler.
  • Any connected peer can send well-formed envelopes to registered topics.
  • The seenMessages cache keys on From + Seqno; Seqno is attacker-controlled in the envelope, so incrementing it bypasses dedupe.
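The dedupe bypass in the last bullet can be illustrated with a toy cache. This sketch assumes a plain map keyed on the sender ID concatenated with the sequence number; the real seenMessages structure and key encoding in klever-go may differ, and `seenCache` and `countAccepted` are hypothetical names.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// seenCache is a toy stand-in for a dedupe cache keyed on From + Seqno.
type seenCache map[string]struct{}

// accept reports whether the (from, seqno) pair is new, recording it if so.
func (c seenCache) accept(from string, seqno uint64) bool {
	var b [8]byte
	binary.BigEndian.PutUint64(b[:], seqno)
	key := from + string(b[:])
	if _, dup := c[key]; dup {
		return false
	}
	c[key] = struct{}{}
	return true
}

// countAccepted sends n envelopes from one peer and counts how many pass dedupe.
// With bumpSeqno the sender increments the attacker-controlled Seqno each time.
func countAccepted(n int, bumpSeqno bool) int {
	cache := seenCache{}
	accepted := 0
	for i := 0; i < n; i++ {
		seqno := uint64(1)
		if bumpSeqno {
			seqno = uint64(i)
		}
		if cache.accept("peer-A", seqno) {
			accepted++
		}
	}
	return accepted
}

func main() {
	fmt.Println(countAccepted(1000, false)) // 1: replays of one Seqno are dropped
	fmt.Println(countAccepted(1000, true))  // 1000: every fresh Seqno passes
}
```

A cache keyed on values the sender chooses filters accidental duplicates, not a sender that deliberately varies them.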

PoC

GitHub Private Vulnerability Reporting does not appear to allow file attachments in this form, so I am including the reproduction command and captured output inline. I can provide the full Go test file immediately if useful.

The PoC is a Go test file intended to be placed under network/p2p/libp2p/ in a klever-go checkout. It exercises the real network/p2p/libp2p package with NewMockMessenger.

Reproduction:

git clone https://github.com/klever-io/klever-go
cd klever-go
git checkout v1.7.16

# Place dos_directmsg_test.go into:
# network/p2p/libp2p/

go test ./network/p2p/libp2p/ -run TestPoC_ -count=1 -v -timeout 60s

Captured output:

=== RUN   TestPoC_DirectMessageHandler_SpawnsGoroutinePerMessage
    baseline goroutines: 43
    peak goroutines after 500 direct messages: 543 (delta = 500)
    final goroutines after drain + GC: 43
POC_RESULT direct=spawn N=500 baseline=43 peak=543 delta=500 threshold=400 final=43
--- PASS

=== RUN   TestPoC_SynchronousHandler_NoGoroutineGrowth
    baseline goroutines: 47
    peak goroutines after 500 synchronous calls: 47 (delta = 0)
POC_RESULT sync=block N=500 baseline=47 peak=47 delta=0
--- PASS

=== RUN   TestPoC_DirectMessageHandler_NoThrottlerInPath
    all 2000 SendToConnectedPeer calls returned in 2.490708ms -- no throttler blocking
POC_RESULT throttler=absent N=2000 elapsed=2.490708ms
--- PASS

Reading:

  1. 500 direct messages with slow processors produced exactly 500 new goroutines.
  2. The synchronous control path produced zero goroutine growth.
  3. 2000 messages, twice the outgoing broadcastGoRoutines = 1000 capacity, returned immediately, showing no ingress throttler blocks this path.

Impact

A single connected peer can drive unbounded goroutine growth on a klever-go node. Each spawned goroutine allocates its own stack, holds message references until the processor returns, and adds scheduler and GC pressure before antiflood admission can reject the message.

Under realistic attacker line rate and non-trivial processor latency, goroutine count can grow faster than the runtime drains it, degrading the node's ability to process legitimate traffic. This maps to the SECURITY.md High category: "Denial of Service affecting network availability."
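A rough way to size this is Little's law: in-flight goroutines are approximately the arrival rate multiplied by the processor latency. The sketch below uses made-up inputs (10,000 messages per second, 500 ms latency) that are illustrative assumptions, not measurements from a klever-go node. It also assumes Go's initial goroutine stack of about 2 KiB, which only gives a floor because stacks grow on demand.

```go
package main

import "fmt"

// assumedInitialStackBytes is Go's approximate starting stack size per goroutine.
// Real usage is higher once the stack grows or the goroutine holds heap references.
const assumedInitialStackBytes = 2048

// inFlight applies Little's law (L = rate * latency) to estimate how many
// goroutines exist at once if latency stays constant.
func inFlight(msgsPerSec, latencySec float64) float64 {
	return msgsPerSec * latencySec
}

// minStackBytes returns a floor on stack memory for the given goroutine count.
func minStackBytes(goroutines float64) float64 {
	return goroutines * assumedInitialStackBytes
}

func main() {
	// Hypothetical inputs chosen only to illustrate the arithmetic.
	n := inFlight(10000, 0.5)
	fmt.Printf("%.0f goroutines in flight, at least %.1f MiB of stacks\n",
		n, minStackBytes(n)/(1<<20))
}
```

The estimate assumes latency is independent of load. If processors contend on shared locks or the scheduler, latency rises with the goroutine count and the in-flight number climbs further instead of settling.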

All testing was local only. I did not contact Klever mainnet, public testnet, hosted RPCs, explorers, or third-party production infrastructure.

Suggested fixes:

  1. Wire goRoutinesThrottler.CanProcess() or a dedicated ingress throttler before the go func() spawn in directMessageHandler.
  2. Or remove the goroutine and call ProcessReceivedMessage synchronously, matching the existing pubsubCallback path.
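The first option can be sketched with a buffered channel used as a counting semaphore. This is a minimal illustration, not the goRoutinesThrottler API: `ingressThrottler`, `tryAcquire`, `release`, `handle`, and `flood` are hypothetical names, and a real fix would reuse whatever throttler interface the codebase already defines.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errSystemBusy = errors.New("ingress throttler: too many in-flight direct messages")

// ingressThrottler bounds concurrent processing with a fixed number of slots.
type ingressThrottler struct{ slots chan struct{} }

func newIngressThrottler(max int) *ingressThrottler {
	return &ingressThrottler{slots: make(chan struct{}, max)}
}

// tryAcquire takes a slot without blocking and reports whether one was free.
func (t *ingressThrottler) tryAcquire() bool {
	select {
	case t.slots <- struct{}{}:
		return true
	default:
		return false
	}
}

func (t *ingressThrottler) release() { <-t.slots }

// handle checks the throttler before spawning, so a rejected message costs no goroutine.
func handle(t *ingressThrottler, wg *sync.WaitGroup, process func()) error {
	if !t.tryAcquire() {
		return errSystemBusy
	}
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer t.release()
		process()
	}()
	return nil
}

// flood sends n messages whose processors park until every send has been attempted,
// and returns how many were accepted and how many were rejected.
func flood(n, max int) (accepted, rejected int) {
	t := newIngressThrottler(max)
	var wg sync.WaitGroup
	gate := make(chan struct{})
	for i := 0; i < n; i++ {
		if err := handle(t, &wg, func() { <-gate }); err != nil {
			rejected++
		} else {
			accepted++
		}
	}
	close(gate) // let the parked processors finish
	wg.Wait()
	return accepted, rejected
}

func main() {
	a, r := flood(2000, 1000)
	fmt.Printf("accepted=%d rejected=%d\n", a, r) // accepted=1000 rejected=1000
}
```

Judging by its title, the patch listed under Patches (2cf19442b62c) took the second option instead and processes direct messages synchronously, which removes the spawn rather than bounding it.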

Disclosure note: I originally sent this report to security@klever.org on 2026-05-13. Since SECURITY.md lists GitHub Private Vulnerability Reporting as the recommended channel, I am resubmitting it here.

Affected products: 1

Patches: 3
2cf19442b62c

[KLC-2427] p2p: process direct-message ingress synchronously (GHSA-hf2g-6j7h-98wg)

https://github.com/klever-io/klever-go · Fernando Sobreira · May 30, 2026 · Fixed in 1.7.18 · via ghsa-release-walk
5 files changed · +361 -36
  • data/retriever/storageResolvers/selfSend_importdb_test.go (+190 -0, added)
    @@ -0,0 +1,190 @@
    +package storageResolvers
    +
    +import (
    +	"bytes"
    +	"fmt"
    +	"sync"
    +	"sync/atomic"
    +	"testing"
    +	"time"
    +
    +	commonMock "github.com/klever-io/klever-go/common/mock"
    +	"github.com/klever-io/klever-go/config"
    +	"github.com/klever-io/klever-go/core"
    +	"github.com/klever-io/klever-go/data/batch"
    +	"github.com/klever-io/klever-go/data/endProcess"
    +	"github.com/klever-io/klever-go/network/p2p"
    +	"github.com/klever-io/klever-go/network/p2p/libp2p"
    +	p2pMock "github.com/klever-io/klever-go/network/p2p/mock"
    +	mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
    +	"github.com/stretchr/testify/require"
    +)
    +
    +// selfSendTimeout bounds each self-send so a deadlock regression fails fast instead of hanging.
    +const selfSendTimeout = 2 * time.Second
    +
    +// newLoopbackMessenger builds a real in-memory networkMessenger to exercise the genuine
    +// SendToConnectedPeer(self) -> sendDirectToSelf -> directMessageHandler loopback.
    +func newLoopbackMessenger(t *testing.T) p2p.Messenger {
    +	t.Helper()
    +
    +	mes, err := libp2p.NewMockMessenger(
    +		libp2p.ArgsNetworkMessenger{
    +			Marshalizer:   &commonMock.ProtoMarshalizerMock{},
    +			ListenAddress: libp2p.ListenLocalhostAddrWithIp4AndTcp,
    +			P2pConfig: config.P2PConfig{
    +				Node:                config.NodeConfig{Port: "0"},
    +				KadDhtPeerDiscovery: config.KadDhtPeerDiscoveryConfig{Enabled: false},
    +				Sharding:            config.ShardingConfig{Type: p2p.NilListSharder},
    +			},
    +			SyncTimer: &libp2p.LocalSyncTimer{},
    +		},
    +		mocknet.New(),
    +	)
    +	require.Nil(t, err)
    +
    +	return mes
    +}
    +
    +// runWithin runs fn and fails the test if it does not return before selfSendTimeout.
    +func runWithin(t *testing.T, what string, fn func() error) error {
    +	t.Helper()
    +
    +	done := make(chan error, 1)
    +	go func() {
    +		// Turn a panic in fn into a clean test failure instead of crashing the test binary.
    +		defer func() {
    +			if r := recover(); r != nil {
    +				done <- fmt.Errorf("%s panicked: %v", what, r)
    +			}
    +		}()
    +		done <- fn()
    +	}()
    +
    +	select {
    +	case err := <-done:
    +		return err
    +	case <-time.After(selfSendTimeout):
    +		// On timeout the worker goroutine is intentionally leaked: this is a fail-fast guard and
    +		// the process exits via Fatalf, so the leak does not outlive the test.
    +		t.Fatalf("%s deadlocked: self-send did not return within %s — import-db replay would hang", what, selfSendTimeout)
    +		return nil
    +	}
    +}
    +
    +// TestSliceResolver_RequestDataFromHash_SelfSendIsSynchronous guards the import-db replay path:
    +// storage resolvers answer local requests via sendToSelf -> SendToConnectedPeer(self). After the
    +// GHSA-hf2g-6j7h-98wg fix this must deliver to the response-topic processor synchronously (already
    +// processed by the time RequestDataFromHash returns), with no error and no deadlock.
    +func TestSliceResolver_RequestDataFromHash_SelfSendIsSynchronous(t *testing.T) {
    +	// Not t.Parallel: messenger construction races on the libp2p global pubsub.TimeCacheDuration
    +	// (KLC-2430), so two messengers cannot be built concurrently.
    +	const responseTopic = "txBlockBodies_0_RESPONSE"
    +	hash := []byte("mb-hash")
    +	value := []byte("marshalled-miniblock-bytes")
    +
    +	mes := newLoopbackMessenger(t)
    +	defer func() { _ = mes.Close() }()
    +
    +	marshalizer := &commonMock.MarshalizerMock{}
    +
    +	var processed int32
    +	var received atomic.Value // []byte
    +	err := mes.RegisterMessageProcessor(responseTopic, &p2pMock.MessageProcessorStub{
    +		ProcessMessageCalled: func(msg p2p.MessageP2P, _ core.PeerID) error {
    +			received.Store(append([]byte(nil), msg.Data()...))
    +			atomic.AddInt32(&processed, 1)
    +			return nil
    +		},
    +	})
    +	require.Nil(t, err)
    +
    +	storer := commonMock.NewStorerMock("Storage", 0)
    +	require.Nil(t, storer.Put(hash, value))
    +
    +	arg := ArgSliceResolver{
    +		Messenger:                mes,
    +		ResponseTopicName:        responseTopic,
    +		Storage:                  storer,
    +		DataPacker:               &commonMock.DataPackerStub{},
    +		Marshalizer:              marshalizer,
    +		ManualEpochStartNotifier: &commonMock.ManualEpochStartNotifierStub{},
    +		ChanGracefullyClose:      make(chan endProcess.ArgEndProcess, 1),
    +	}
    +	res, err := NewSliceResolver(arg)
    +	require.Nil(t, err)
    +
    +	err = runWithin(t, "RequestDataFromHash", func() error {
    +		return res.RequestDataFromHash(hash, 0)
    +	})
    +	require.Nil(t, err)
    +
    +	// Synchronous: processing must be done by the time RequestDataFromHash returns.
    +	require.Equal(t, int32(1), atomic.LoadInt32(&processed),
    +		"self-sent response must be processed synchronously before RequestDataFromHash returns")
    +
    +	// Payload must match the resolver's marshalled batch.
    +	expected, err := marshalizer.Marshal(&batch.Batch{Data: [][]byte{value}})
    +	require.Nil(t, err)
    +	got, _ := received.Load().([]byte)
    +	require.True(t, bytes.Equal(expected, got),
    +		"delivered payload must match the resolver's marshalled batch")
    +}
    +
    +// TestSliceResolver_RequestDataFromHashArray_SelfSendDeliversEveryChunk guards the multi-message
    +// self-send loop (one sendToSelf per packed chunk): every chunk must arrive synchronously, in order.
    +func TestSliceResolver_RequestDataFromHashArray_SelfSendDeliversEveryChunk(t *testing.T) {
    +	// Not t.Parallel: see TestSliceResolver_RequestDataFromHash_SelfSendIsSynchronous.
    +	const responseTopic = "miniBlocks_0_RESPONSE"
    +	hashes := [][]byte{[]byte("h1"), []byte("h2"), []byte("h3")}
    +	values := [][]byte{[]byte("v1"), []byte("v2"), []byte("v3")}
    +
    +	mes := newLoopbackMessenger(t)
    +	defer func() { _ = mes.Close() }()
    +
    +	var mut sync.Mutex
    +	deliveries := make([][]byte, 0, len(hashes))
    +	err := mes.RegisterMessageProcessor(responseTopic, &p2pMock.MessageProcessorStub{
    +		ProcessMessageCalled: func(msg p2p.MessageP2P, _ core.PeerID) error {
    +			mut.Lock()
    +			deliveries = append(deliveries, append([]byte(nil), msg.Data()...))
    +			mut.Unlock()
    +			return nil
    +		},
    +	})
    +	require.Nil(t, err)
    +
    +	storer := commonMock.NewStorerMock("Storage", 0)
    +	for i := range hashes {
    +		require.Nil(t, storer.Put(hashes[i], values[i]))
    +	}
    +
    +	// Identity packer: one chunk per stored value, preserving order.
    +	packer := &commonMock.DataPackerStub{
    +		PackDataInChunksCalled: func(data [][]byte, _ int) ([][]byte, error) {
    +			return data, nil
    +		},
    +	}
    +
    +	arg := ArgSliceResolver{
    +		Messenger:                mes,
    +		ResponseTopicName:        responseTopic,
    +		Storage:                  storer,
    +		DataPacker:               packer,
    +		Marshalizer:              &commonMock.MarshalizerMock{},
    +		ManualEpochStartNotifier: &commonMock.ManualEpochStartNotifierStub{},
    +		ChanGracefullyClose:      make(chan endProcess.ArgEndProcess, 1),
    +	}
    +	res, err := NewSliceResolver(arg)
    +	require.Nil(t, err)
    +
    +	err = runWithin(t, "RequestDataFromHashArray", func() error {
    +		return res.RequestDataFromHashArray(hashes, 0)
    +	})
    +	require.Nil(t, err)
    +
    +	mut.Lock()
    +	defer mut.Unlock()
    +	require.Equal(t, values, deliveries,
    +		"every self-sent chunk must be delivered synchronously and in order")
    +}
    
  • network/p2p/libp2p/directSender.go (+9 -0, modified)
    @@ -27,6 +27,10 @@ var _ p2p.DirectSender = (*directSender)(nil)
     const timeSeenMessages = time.Second * 120
     const maxMutexes = 10000
     
    +// directSendWriteTimeout bounds a single direct-message write so a stalled peer cannot pin the
    +// writer. 10s covers legitimate ~1 MiB responses. See GHSA-hf2g-6j7h-98wg.
    +const directSendWriteTimeout = time.Second * 10
    +
     type directSender struct {
     	counter         uint64
     	ctx             context.Context
    @@ -170,6 +174,8 @@ func (ds *directSender) Send(topic string, buff []byte, peer core.PeerID) error
     		return err
     	}
     
    +	_ = stream.SetWriteDeadline(time.Now().Add(directSendWriteTimeout))
    +
     	msg := ds.createMessage(topic, buff, conn)
     
     	bufw := bufio.NewWriter(stream)
    @@ -189,6 +195,9 @@ func (ds *directSender) Send(topic string, buff []byte, peer core.PeerID) error
     		return err
     	}
     
    +	// Clear the deadline on the reused stream.
    +	_ = stream.SetWriteDeadline(time.Time{})
    +
     	return nil
     }
     
    
  • network/p2p/libp2p/dos_directmsg_test.go (+116 -0, added)
    @@ -0,0 +1,116 @@
    +package libp2p_test
    +
    +import (
    +	"fmt"
    +	"sync"
    +	"sync/atomic"
    +	"testing"
    +	"time"
    +
    +	"github.com/klever-io/klever-go/core"
    +	"github.com/klever-io/klever-go/network/p2p"
    +	"github.com/klever-io/klever-go/network/p2p/libp2p"
    +	"github.com/klever-io/klever-go/network/p2p/mock"
    +	mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
    +	"github.com/stretchr/testify/require"
    +)
    +
    +// TestNetworkMessenger_DirectMessageHandler_ProcessesSynchronously guards GHSA-hf2g-6j7h-98wg:
    +// direct-message processing must be synchronous, so in-flight processing is bounded by the number
    +// of streams, not the number of messages. N senders (=> N streams) flood one receiver whose
    +// processor parks until released; the test asserts peak in-flight stays at N (pre-fix it would
    +// approach the message count, as each message spawned its own goroutine).
    +func TestNetworkMessenger_DirectMessageHandler_ProcessesSynchronously(t *testing.T) {
    +	const numSenders = 4
    +	const perSender = 25 // 100 messages total; pre-fix this would approach 100 in-flight
    +
    +	netw := mocknet.New()
    +
    +	receiver, err := libp2p.NewMockMessenger(createMockNetworkArgs(), netw)
    +	require.Nil(t, err)
    +	defer func() { _ = receiver.Close() }()
    +
    +	senders := make([]p2p.Messenger, numSenders)
    +	for i := range senders {
    +		s, errS := libp2p.NewMockMessenger(createMockNetworkArgs(), netw)
    +		require.Nil(t, errS)
    +		senders[i] = s
    +		defer func(m p2p.Messenger) { _ = m.Close() }(s)
    +	}
    +
    +	require.Nil(t, netw.LinkAll())
    +
    +	var current, maxObserved, exceeded int32
    +	release := make(chan struct{})
    +
    +	err = receiver.RegisterMessageProcessor("test", &mock.MessageProcessorStub{
    +		ProcessMessageCalled: func(_ p2p.MessageP2P, _ core.PeerID) error {
    +			c := atomic.AddInt32(&current, 1)
    +			for {
    +				m := atomic.LoadInt32(&maxObserved)
    +				if c <= m || atomic.CompareAndSwapInt32(&maxObserved, m, c) {
    +					break
    +				}
    +			}
    +			if c > int32(numSenders) {
    +				atomic.StoreInt32(&exceeded, 1)
    +			}
    +			<-release // park, keeping every concurrently-processing message in-flight
    +			atomic.AddInt32(&current, -1)
    +			return nil
    +		},
    +	})
    +	require.Nil(t, err)
    +
    +	for _, s := range senders {
    +		require.Nil(t, s.ConnectToPeer(receiver.Addresses()[0]))
    +	}
    +
    +	// Wait until every sender is connected before flooding (polling beats a fixed sleep, which
    +	// flakes under load). A direct send needs a live connection.
    +	connectDeadline := time.Now().Add(5 * time.Second)
    +	for _, s := range senders {
    +		for !s.IsConnected(receiver.ID()) {
    +			require.True(t, time.Now().Before(connectDeadline),
    +				"senders did not connect to the receiver within the deadline")
    +			time.Sleep(10 * time.Millisecond)
    +		}
    +	}
    +
    +	// Each sender floods on its own goroutine. With a parked processor, only the first message per
    +	// stream is consumed; the rest block on backpressure rather than soaking into goroutines.
    +	var wg sync.WaitGroup
    +	for _, s := range senders {
    +		wg.Add(1)
    +		go func(m p2p.Messenger) {
    +			defer wg.Done()
    +			for j := 0; j < perSender; j++ {
    +				_ = m.SendToConnectedPeer("test", []byte(fmt.Sprintf("m-%d", j)), receiver.ID())
    +			}
    +		}(s)
    +	}
    +
    +	// Wait until one message per stream is parked, asserting in-flight never exceeds stream count.
    +	reached := false
    +	deadline := time.Now().Add(5 * time.Second)
    +	for time.Now().Before(deadline) {
    +		require.Zero(t, atomic.LoadInt32(&exceeded),
    +			"in-flight exceeded live-stream count — per-message goroutine spawn has regressed")
    +		if atomic.LoadInt32(&current) == int32(numSenders) {
    +			reached = true
    +			break
    +		}
    +		time.Sleep(20 * time.Millisecond)
    +	}
    +	require.True(t, reached,
    +		"in-flight never reached the stream count (%d); sent %d total", numSenders, numSenders*perSender)
    +
    +	// Hold to confirm in-flight does NOT climb toward the message count.
    +	time.Sleep(300 * time.Millisecond)
    +	require.LessOrEqual(t, atomic.LoadInt32(&maxObserved), int32(numSenders),
    +		"peak in-flight must stay at stream count (%d), not message count (%d)",
    +		numSenders, numSenders*perSender)
    +
    +	close(release) // unblock parked processors; backpressured sends now drain
    +	wg.Wait()
    +}
    
  • network/p2p/libp2p/netMessenger.go (+43 -33, modified)
    @@ -1131,10 +1131,30 @@ func (netMes *networkMessenger) sendDirectToSelf(topic string, buff []byte) erro
     	return netMes.directMessageHandler(msg, netMes.ID())
     }
     
    -func (netMes *networkMessenger) directMessageHandler(message *pubsub.Message, fromConnectedPeer core.PeerID) error {
    +func (netMes *networkMessenger) directMessageHandler(message *pubsub.Message, fromConnectedPeer core.PeerID) (err error) {
     	var processor p2p.MessageProcessor
     
    +	// Guard before dereferencing: callers validate this today, but the deref must not panic
    +	// outside the recover below, or it would crash the node it is meant to protect.
    +	if message == nil || message.Topic == nil {
    +		return p2p.ErrNilMessage
    +	}
     	topic := *message.Topic
    +
    +	// Recover so a panic on untrusted data cannot crash the node: this now runs synchronously
    +	// in the stream reader goroutine, which has no recover of its own (GHSA-rm5c-5x2p-48wr).
    +	defer func() {
    +		if r := recover(); r != nil {
    +			netMes.recoverFromMessagePanic(r, topic, fromConnectedPeer, message)
    +			dataLen := 0
    +			if message != nil {
    +				dataLen = len(message.GetData())
    +			}
    +			netMes.processDebugMessage(topic, fromConnectedPeer, uint64(dataLen), true)
    +			err = nil
    +		}
    +	}()
    +
     	msg, err := netMes.transformAndCheckMessage(message, fromConnectedPeer, topic)
     	if err != nil {
     		return err
    @@ -1148,38 +1168,28 @@ func (netMes *networkMessenger) directMessageHandler(message *pubsub.Message, fr
     		return fmt.Errorf("%w on directMessageHandler for topic %s", p2p.ErrNilValidator, topic)
     	}
     
    -	go func(msg p2p.MessageP2P) {
    -		// Circuit-breaker for the direct-send path: the gossip recover does not cover
    -		// this goroutine (GHSA-rm5c-5x2p-48wr).
    -		defer func() {
    -			if r := recover(); r != nil {
    -				netMes.recoverFromMessagePanic(r, topic, fromConnectedPeer, message)
    -				dataLen := 0
    -				if message != nil {
    -					dataLen = len(message.GetData())
    -				}
    -				netMes.processDebugMessage(topic, fromConnectedPeer, uint64(dataLen), true)
    -			}
    -		}()
    -
    -		if check.IfNil(msg) {
    -			return
    -		}
    -
    -		//we won't recheck the message id against the cacher here as there might be collisions since we are using
    -		// a separate sequence counter for direct sender
    -		errProcess := processor.ProcessReceivedMessage(msg, fromConnectedPeer)
    -		if errProcess != nil {
    -			log.Trace("p2p validator directMessageHandler",
    -				"error", errProcess.Error(),
    -				"topic", msg.Topic(),
    -				"originator", p2p.MessageOriginatorPid(msg),
    -				"from connected peer", p2p.PeerIDToShortString(fromConnectedPeer),
    -				"seq no", p2p.MessageOriginatorSeq(msg),
    -			)
    -		}
    -		netMes.debugger.AddIncomingMessage(msg.Topic(), uint64(len(msg.Data())), errProcess != nil)
    -	}(msg)
    +	if check.IfNil(msg) {
    +		return p2p.ErrNilMessage
    +	}
    +
    +	// Process synchronously: the per-stream reader bounds concurrency to one in-flight per stream,
    +	// instead of one goroutine per message which a single peer could grow unbounded (GHSA-hf2g-6j7h-98wg).
    +	// Trade-off: a slow processor head-of-line-blocks its own stream, so direct-send processors
    +	// must stay non-blocking.
    +	//
    +	//we won't recheck the message id against the cacher here as there might be collisions since we are using
    +	// a separate sequence counter for direct sender
    +	errProcess := processor.ProcessReceivedMessage(msg, fromConnectedPeer)
    +	if errProcess != nil {
    +		log.Trace("p2p validator directMessageHandler",
    +			"error", errProcess.Error(),
    +			"topic", msg.Topic(),
    +			"originator", p2p.MessageOriginatorPid(msg),
    +			"from connected peer", p2p.PeerIDToShortString(fromConnectedPeer),
    +			"seq no", p2p.MessageOriginatorSeq(msg),
    +		)
    +	}
    +	netMes.debugger.AddIncomingMessage(msg.Topic(), uint64(len(msg.Data())), errProcess != nil)
     
     	return nil
     }
    
  • network/p2p/mock/streamMock.go (+3 -3, modified)
    @@ -86,17 +86,17 @@ func (sm *streamMock) Reset() error {
     
     // SetDeadline -
     func (sm *streamMock) SetDeadline(time.Time) error {
    -	panic("implement me")
    +	return nil
     }
     
     // SetReadDeadline -
     func (sm *streamMock) SetReadDeadline(time.Time) error {
    -	panic("implement me")
    +	return nil
     }
     
     // SetWriteDeadline -
     func (sm *streamMock) SetWriteDeadline(time.Time) error {
    -	panic("implement me")
    +	return nil
     }
     
     // Protocol -
    
a3c2e67ac6a8

[KLC-2432] fix nil-Header panic in block interceptor + cover direct-send recover gap (GHSA-rm5c-5x2p-48wr)

https://github.com/klever-io/klever-go · Fernando Sobreira · May 30, 2026 · Fixed in 1.7.18 · via ghsa-release-walk
5 files changed · +162 -22
  • core/process/block/interceptedBlocks/interceptedBlock.go (+14 -5, modified)
    @@ -149,18 +149,27 @@ func (inMb *InterceptedBlock) Type() string {
     
     // String returns the transactions body's most important fields as string
     func (inMb *InterceptedBlock) String() string {
    +	// Runs pre-CheckValidity on untrusted data; guard block + nil-safe getters so an
    +	// omitted Header cannot panic (GHSA-rm5c-5x2p-48wr).
    +	if inMb.block == nil {
    +		return ""
    +	}
     	return fmt.Sprintf("epoch=%d, slot=%d, nonce=%d, block numTxs=%d",
    -		inMb.block.Header.Epoch,
    -		inMb.block.Header.Slot,
    -		inMb.block.Header.Nonce,
    +		inMb.block.GetEpoch(),
    +		inMb.block.GetSlot(),
    +		inMb.block.GetNonce(),
     		len(inMb.block.TxHashes),
     	)
     }
     
     // Identifiers returns the identifiers used in requests
     func (inMb *InterceptedBlock) Identifiers() [][]byte {
    -	keyNonce := []byte(fmt.Sprintf("nonce-%d", inMb.block.Header.Nonce))
    -	keyEpoch := []byte(core.EpochStartIdentifier(inMb.block.Header.Epoch))
    +	// Same guard as String: runs pre-CheckValidity on untrusted data (GHSA-rm5c-5x2p-48wr).
    +	if inMb.block == nil {
    +		return [][]byte{inMb.hash}
    +	}
    +	keyNonce := []byte(fmt.Sprintf("nonce-%d", inMb.block.GetNonce()))
    +	keyEpoch := []byte(core.EpochStartIdentifier(inMb.block.GetEpoch()))
     
     	return [][]byte{inMb.hash, keyNonce, keyEpoch}
     }
    
  • core/process/block/interceptedBlocks/interceptedBlock_test.go (+27 -0, modified)
    @@ -437,3 +437,30 @@ func TestInterceptedBlock_CheckValidity(t *testing.T) {
     	}
     
     }
    +
    +// Regression test for GHSA-rm5c-5x2p-48wr (block variant): a nil Header must not panic
    +// Identifiers()/String() (they run before CheckValidity, which still rejects the block).
    +func TestInterceptedBlock_NilHeaderIdentifiersAndStringShouldNotPanic(t *testing.T) {
    +	t.Parallel()
    +
    +	arg := createDefaultBlockArgument()
    +	// Use the proto marshaler to mirror the real gossip/direct-send wire path. Header
    +	// omitted — stays nil after unmarshal, mirroring the PoC payload.
    +	protoMarshalizer := &mock.ProtoMarshalizerMock{}
    +	arg.Marshalizer = protoMarshalizer
    +	arg.BlockBuff, _ = protoMarshalizer.Marshal(&block.Block{PubKeysBitmap: []byte{1}})
    +
    +	inBlk, err := interceptedBlocks.NewInterceptedBlock(arg)
    +	require.Nil(t, err)
    +	require.False(t, check.IfNil(inBlk))
    +
    +	require.NotPanics(t, func() {
    +		_ = inBlk.Identifiers()
    +		_ = inBlk.String()
    +	})
    +
    +	// CheckValidity must still reject the malformed (nil-header) block.
    +	err = inBlk.CheckValidity()
    +	require.Error(t, err)
    +	assert.ErrorIs(t, err, common.ErrNilPreviousBlockHash)
    +}
    
  • network/p2p/libp2p/export_test.go (+5 -0, modified)
    @@ -3,6 +3,7 @@ package libp2p
     import (
     	"context"
     
    +	"github.com/klever-io/klever-go/core"
     	"github.com/klever-io/klever-go/network/p2p"
     	"github.com/klever-io/klever-go/storage"
     	pubsub "github.com/libp2p/go-libp2p-pubsub"
    @@ -41,6 +42,10 @@ func (netMes *networkMessenger) PubsubCallback(handler p2p.MessageProcessor, top
     	return netMes.pubsubCallback(handler, topic)
     }
     
    +func (netMes *networkMessenger) DirectMessageHandler(message *pubsub.Message, fromConnectedPeer core.PeerID) error {
    +	return netMes.directMessageHandler(message, fromConnectedPeer)
    +}
    +
     func (netMes *networkMessenger) ValidMessageByTimestamp(msg p2p.MessageP2P) error {
     	return netMes.validMessageByTimestamp(msg)
     }
    
  • network/p2p/libp2p/netMessenger.go (+37 -17, modified)
    @@ -892,28 +892,15 @@ func (netMes *networkMessenger) pubsubCallback(handler p2p.MessageProcessor, top
     	return func(ctx context.Context, pid peer.ID, message *pubsub.Message) (isValid bool) {
     		fromConnectedPeer := core.PeerID(pid)
     
    -		// Circuit-breaker: this callback runs in an unrecovered pubsub goroutine, so a
    -		// panic on an untrusted message would crash the node (GHSA-rm5c-5x2p-48wr).
    +		// Circuit-breaker: recover so a panic on an untrusted message cannot crash the
    +		// node (GHSA-rm5c-5x2p-48wr).
     		defer func() {
     			if r := recover(); r != nil {
    -				// Read message via nil-safe getters: the recover must not panic itself.
    -				var dataLen int
    -				var originator core.PeerID
    +				netMes.recoverFromMessagePanic(r, topic, fromConnectedPeer, message)
    +				dataLen := 0
     				if message != nil {
     					dataLen = len(message.GetData())
    -					originator = core.PeerID(message.GetFrom())
     				}
    -
    -				log.Error("recovered from panic in pubsubCallback",
    -					"topic", topic,
    -					"originator", p2p.PeerIDToShortString(originator),
    -					"from connected peer", p2p.PeerIDToShortString(fromConnectedPeer),
    -					"panic", fmt.Sprintf("%v", r),
    -					"stack", string(debug.Stack()),
    -				)
    -				// Blacklist both peers, as transformAndCheckMessage does on severe errors.
    -				netMes.blacklistPid(fromConnectedPeer, core.WrongP2PMessageBlacklistDuration)
    -				netMes.blacklistPid(originator, core.WrongP2PMessageBlacklistDuration)
     				netMes.processDebugMessage(topic, fromConnectedPeer, uint64(dataLen), true)
     				isValid = false
     			}
    @@ -943,6 +930,26 @@ func (netMes *networkMessenger) pubsubCallback(handler p2p.MessageProcessor, top
     	}
     }
     
    +// recoverFromMessagePanic logs a recovered panic from processing an untrusted p2p
    +// message and blacklists both the connected peer and the originator, using nil-safe
    +// accessors. Shared by the gossip and direct-send receive paths (GHSA-rm5c-5x2p-48wr).
    +func (netMes *networkMessenger) recoverFromMessagePanic(r interface{}, topic string, fromConnectedPeer core.PeerID, message *pubsub.Message) {
    +	var originator core.PeerID
    +	if message != nil {
    +		originator = core.PeerID(message.GetFrom())
    +	}
    +
    +	log.Error("recovered from panic while processing p2p message",
    +		"topic", topic,
    +		"originator", p2p.PeerIDToShortString(originator),
    +		"from connected peer", p2p.PeerIDToShortString(fromConnectedPeer),
    +		"panic", fmt.Sprintf("%v", r),
    +		"stack", string(debug.Stack()),
    +	)
    +	netMes.blacklistPid(fromConnectedPeer, core.WrongP2PMessageBlacklistDuration)
    +	netMes.blacklistPid(originator, core.WrongP2PMessageBlacklistDuration)
    +}
    +
     func (netMes *networkMessenger) transformAndCheckMessage(pbMsg *pubsub.Message, pid core.PeerID, topic string) (p2p.MessageP2P, error) {
     	msg, errUnmarshal := NewMessage(pbMsg, netMes.marshalizer)
     	if errUnmarshal != nil {
    @@ -1142,6 +1149,19 @@ func (netMes *networkMessenger) directMessageHandler(message *pubsub.Message, fr
     	}
     
     	go func(msg p2p.MessageP2P) {
    +		// Circuit-breaker for the direct-send path: the gossip recover does not cover
    +		// this goroutine (GHSA-rm5c-5x2p-48wr).
    +		defer func() {
    +			if r := recover(); r != nil {
    +				netMes.recoverFromMessagePanic(r, topic, fromConnectedPeer, message)
    +				dataLen := 0
    +				if message != nil {
    +					dataLen = len(message.GetData())
    +				}
    +				netMes.processDebugMessage(topic, fromConnectedPeer, uint64(dataLen), true)
    +			}
    +		}()
    +
     		if check.IfNil(msg) {
     			return
     		}
    
  • network/p2p/libp2p/netMessenger_test.go +79 -0 modified
    @@ -1599,6 +1599,85 @@ func TestNetworkMessenger_PubsubCallbackRecoversFromHandlerPanic(t *testing.T) {
     	_ = mes.Close()
     }
     
    +// Regression test for GHSA-rm5c-5x2p-48wr (direct-send variant): a panic in the
    +// direct-send goroutine must be recovered (no crash) and blacklist the sender.
    +func TestNetworkMessenger_DirectMessageHandlerRecoversFromPanic(t *testing.T) {
    +	args := libp2p.ArgsNetworkMessenger{
    +		Marshalizer:   &commonMock.ProtoMarshalizerMock{},
    +		ListenAddress: libp2p.ListenLocalhostAddrWithIp4AndTcp,
    +		P2pConfig: config.P2PConfig{
    +			Node: config.NodeConfig{
    +				Port: "0",
    +			},
    +			KadDhtPeerDiscovery: config.KadDhtPeerDiscoveryConfig{
    +				Enabled: false,
    +			},
    +			Sharding: config.ShardingConfig{
    +				Type: p2p.NilListSharder,
    +			},
    +		},
    +		SyncTimer: &libp2p.LocalSyncTimer{},
    +	}
    +
    +	mes, _ := libp2p.NewNetworkMessenger(args)
    +
    +	// processReceivedDirectMessage enforces From == fromConnectedPeer, so the originator
    +	// and connected peer are the same on the direct path.
    +	peerID := mes.ID()
    +
    +	blacklistedCh := make(chan string, 4)
    +	_ = mes.SetPeerDenialEvaluator(&mock.PeerDenialEvaluatorStub{
    +		UpsertPeerIDCalled: func(pid core.PeerID, duration time.Duration) error {
    +			blacklistedCh <- string(pid)
    +			return nil
    +		},
    +		IsDeniedCalled: func(pid core.PeerID) bool {
    +			return false
    +		},
    +	})
    +
    +	topic := "topic"
    +	_ = mes.CreateTopic(topic, false)
    +	numCalled := uint32(0)
    +	_ = mes.RegisterMessageProcessor(topic, &mock.MessageProcessorStub{
    +		ProcessMessageCalled: func(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error {
    +			atomic.AddUint32(&numCalled, 1)
    +			panic("simulated nil-pointer dereference while processing a malicious block")
    +		},
    +	})
    +
    +	innerMessage := &data.TopicMessage{
    +		Payload:   []byte("data"),
    +		Timestamp: time.Now().Unix(),
    +		Version:   libp2p.CurrentTopicMessageVersion,
    +	}
    +	buff, _ := args.Marshalizer.Marshal(innerMessage)
    +	msg := &pubsub.Message{
    +		Message: &pubsub_pb.Message{
    +			From:  []byte(peerID),
    +			Data:  buff,
    +			Seqno: []byte{0, 0, 0, 1},
    +			Topic: &topic,
    +		},
    +	}
    +
    +	// The recover runs in a goroutine: if it failed, the test binary would crash.
    +	require.NotPanics(t, func() {
    +		_ = mes.DirectMessageHandler(msg, peerID)
    +	})
    +
    +	// Wait for the recover to blacklist the sender.
    +	select {
    +	case pid := <-blacklistedCh:
    +		assert.Equal(t, string(peerID), pid, "sender should be blacklisted on panic")
    +	case <-time.After(2 * time.Second):
    +		t.Fatal("timed out waiting for the sender to be blacklisted")
    +	}
    +	assert.Equal(t, uint32(1), atomic.LoadUint32(&numCalled))
    +
    +	_ = mes.Close()
    +}
    +
     func TestNetworkMessenger_UnjoinAllTopicsShouldWork(t *testing.T) {
     	args := libp2p.ArgsNetworkMessenger{
     		Marshalizer:   &commonMock.ProtoMarshalizerMock{},
    
23b74e1e69ce

[KLC-2424] blockchain: atomic Set/Get current header+hash primitive (#65)

https://github.com/klever-io/klever-go · Fernando Sobreira · May 29, 2026 · Fixed in 1.7.18 · via ghsa-release-walk
10 files changed · +411 -75
  • common/mock/blockChainMock.go +34 -16 modified
    @@ -6,22 +6,24 @@ import (
     
     // BlockChainMock is a mock implementation of the blockchain interface
     type BlockChainMock struct {
    -	GetGenesisHeaderCalled          func() data.HeaderHandler
    -	SetGenesisHeaderCalled          func(handler data.HeaderHandler) error
    -	GetGenesisHeaderHashCalled      func() []byte
    -	SetGenesisHeaderHashCalled      func([]byte)
    -	GetCurrentBlockHeaderCalled     func() data.HeaderHandler
    -	SetCurrentBlockHeaderCalled     func(data.HeaderHandler) error
    -	GetCurrentBlockHeaderHashCalled func() []byte
    -	SetCurrentBlockHeaderHashCalled func([]byte)
    -	GetCurrentBlockRootHashCalled   func() []byte
    -	GetLocalHeightCalled            func() int64
    -	SetLocalHeightCalled            func(int64)
    -	GetNetworkHeightCalled          func() int64
    -	SetNetworkHeightCalled          func(int64)
    -	HasBadBlockCalled               func([]byte) bool
    -	PutBadBlockCalled               func([]byte)
    -	CreateNewHeaderCalled           func() data.HeaderHandler
    +	GetGenesisHeaderCalled             func() data.HeaderHandler
    +	SetGenesisHeaderCalled             func(handler data.HeaderHandler) error
    +	GetGenesisHeaderHashCalled         func() []byte
    +	SetGenesisHeaderHashCalled         func([]byte)
    +	GetCurrentBlockHeaderCalled        func() data.HeaderHandler
    +	SetCurrentBlockHeaderCalled        func(data.HeaderHandler) error
    +	GetCurrentBlockHeaderHashCalled    func() []byte
    +	SetCurrentBlockHeaderHashCalled    func([]byte)
    +	SetCurrentBlockHeaderAndHashCalled func(data.HeaderHandler, []byte) error
    +	GetCurrentBlockHeaderAndHashCalled func() (data.HeaderHandler, []byte)
    +	GetCurrentBlockRootHashCalled      func() []byte
    +	GetLocalHeightCalled               func() int64
    +	SetLocalHeightCalled               func(int64)
    +	GetNetworkHeightCalled             func() int64
    +	SetNetworkHeightCalled             func(int64)
    +	HasBadBlockCalled                  func([]byte) bool
    +	PutBadBlockCalled                  func([]byte)
    +	CreateNewHeaderCalled              func() data.HeaderHandler
     }
     
     // GetGenesisHeader returns the genesis block header pointer
    @@ -86,6 +88,22 @@ func (bc *BlockChainMock) SetCurrentBlockHeaderHash(hash []byte) {
     	}
     }
     
    +// SetCurrentBlockHeaderAndHash atomically sets the current block header and its hash
    +func (bc *BlockChainMock) SetCurrentBlockHeaderAndHash(header data.HeaderHandler, hash []byte) error {
    +	if bc.SetCurrentBlockHeaderAndHashCalled != nil {
    +		return bc.SetCurrentBlockHeaderAndHashCalled(header, hash)
    +	}
    +	return nil
    +}
    +
    +// GetCurrentBlockHeaderAndHash atomically returns the current block header and its hash
    +func (bc *BlockChainMock) GetCurrentBlockHeaderAndHash() (data.HeaderHandler, []byte) {
    +	if bc.GetCurrentBlockHeaderAndHashCalled != nil {
    +		return bc.GetCurrentBlockHeaderAndHashCalled()
    +	}
    +	return nil, nil
    +}
    +
     // GetCurrentBlockRootHash returns the current block root hash
     func (bc *BlockChainMock) GetCurrentBlockRootHash() []byte {
     	if bc.GetCurrentBlockRootHashCalled != nil {
    
  • common/mock/blockChainStub.go +27 -9 modified
    @@ -6,15 +6,17 @@ import (
     
     // BlockChainStub is a mock implementation of the blockchain interface
     type BlockChainStub struct {
    -	GetGenesisHeaderCalled          func() data.HeaderHandler
    -	SetGenesisHeaderCalled          func(handler data.HeaderHandler) error
    -	GetGenesisHeaderHashCalled      func() []byte
    -	SetGenesisHeaderHashCalled      func([]byte)
    -	GetCurrentBlockHeaderCalled     func() data.HeaderHandler
    -	SetCurrentBlockHeaderCalled     func(data.HeaderHandler) error
    -	GetCurrentBlockHeaderHashCalled func() []byte
    -	SetCurrentBlockHeaderHashCalled func([]byte)
    -	CreateNewHeaderCalled           func() data.HeaderHandler
    +	GetGenesisHeaderCalled             func() data.HeaderHandler
    +	SetGenesisHeaderCalled             func(handler data.HeaderHandler) error
    +	GetGenesisHeaderHashCalled         func() []byte
    +	SetGenesisHeaderHashCalled         func([]byte)
    +	GetCurrentBlockHeaderCalled        func() data.HeaderHandler
    +	SetCurrentBlockHeaderCalled        func(data.HeaderHandler) error
    +	GetCurrentBlockHeaderHashCalled    func() []byte
    +	SetCurrentBlockHeaderHashCalled    func([]byte)
    +	SetCurrentBlockHeaderAndHashCalled func(data.HeaderHandler, []byte) error
    +	GetCurrentBlockHeaderAndHashCalled func() (data.HeaderHandler, []byte)
    +	CreateNewHeaderCalled              func() data.HeaderHandler
     }
     
     // GetGenesisHeader returns the genesis block header pointer
    @@ -79,6 +81,22 @@ func (bcs *BlockChainStub) SetCurrentBlockHeaderHash(hash []byte) {
     	}
     }
     
    +// SetCurrentBlockHeaderAndHash atomically sets the current block header and its hash
    +func (bcs *BlockChainStub) SetCurrentBlockHeaderAndHash(header data.HeaderHandler, hash []byte) error {
    +	if bcs.SetCurrentBlockHeaderAndHashCalled != nil {
    +		return bcs.SetCurrentBlockHeaderAndHashCalled(header, hash)
    +	}
    +	return nil
    +}
    +
    +// GetCurrentBlockHeaderAndHash atomically returns the current block header and its hash
    +func (bcs *BlockChainStub) GetCurrentBlockHeaderAndHash() (data.HeaderHandler, []byte) {
    +	if bcs.GetCurrentBlockHeaderAndHashCalled != nil {
    +		return bcs.GetCurrentBlockHeaderAndHashCalled()
    +	}
    +	return nil, nil
    +}
    +
     // IsInterfaceNil returns true if there is no value under the interface
     func (bcs *BlockChainStub) IsInterfaceNil() bool {
     	return bcs == nil
    
  • core/process/block/block.go +3 -3 modified
    @@ -673,12 +673,12 @@ func (mp *metaProcessor) CommitBlock(
     	lastMetaBlock := mp.blockChain.GetCurrentBlockHeader()
     	mp.updateState(lastMetaBlock)
     
    -	// set blockchain header info
    -	err = mp.blockChain.SetCurrentBlockHeader(header)
    +	// set blockchain header info atomically so readers cannot observe a
    +	// mismatched (header, hash) pair between the two updates
    +	err = mp.blockChain.SetCurrentBlockHeaderAndHash(header, headerHash)
     	if err != nil {
     		return err
     	}
    -	mp.blockChain.SetCurrentBlockHeaderHash(headerHash)
     
     	mp.tpsBenchmark.Update(lastMetaBlock)
     
    
  • core/process/sync/baseSync.go +5 -14 modified
    @@ -886,32 +886,23 @@ func (boot *baseBootstrap) restoreState(
     		"nonce", currHeader.GetNonce(),
     		"hash", currHeaderHash)
     
    -	err := boot.chainHandler.SetCurrentBlockHeader(currHeader)
    +	err := boot.chainHandler.SetCurrentBlockHeaderAndHash(currHeader, currHeaderHash)
     	if err != nil {
    -		log.Debug("SetCurrentBlockHeader", "error", err.Error())
    +		// recovery path after a failed rollback — surface double-failures so ops can react
    +		log.Warn("restoreState: SetCurrentBlockHeaderAndHash", "error", err.Error())
     	}
     
    -	boot.chainHandler.SetCurrentBlockHeaderHash(currHeaderHash)
    -
     	err = boot.blockProcessor.RevertStateToBlock(currHeader)
     	if err != nil {
    -		log.Debug("RevertState", "error", err.Error())
    +		log.Warn("restoreState: RevertStateToBlock", "error", err.Error())
     	}
     }
     
     func (boot *baseBootstrap) setCurrentBlockInfo(
     	headerHash []byte,
     	header data.HeaderHandler,
     ) error {
    -
    -	err := boot.chainHandler.SetCurrentBlockHeader(header)
    -	if err != nil {
    -		return err
    -	}
    -
    -	boot.chainHandler.SetCurrentBlockHeaderHash(headerHash)
    -
    -	return nil
    +	return boot.chainHandler.SetCurrentBlockHeaderAndHash(header, headerHash)
     }
     
     func (boot *baseBootstrap) init() {
    
  • core/process/sync/storageBootstrap/baseStorageBootstrapper.go +3 -12 modified
    @@ -379,14 +379,7 @@ func (st *storageBootstrapper) cleanupStorage(headerInfo *bootstrapStorage.Boots
     }
     
     func (st *storageBootstrapper) applyBlock(header data.HeaderHandler, headerHash []byte) error {
    -	err := st.blkc.SetCurrentBlockHeader(header)
    -	if err != nil {
    -		return err
    -	}
    -
    -	st.blkc.SetCurrentBlockHeaderHash(headerHash)
    -
    -	return nil
    +	return st.blkc.SetCurrentBlockHeaderAndHash(header, headerHash)
     }
     
     func (st *storageBootstrapper) restoreBlockChainToGenesis() {
    @@ -396,12 +389,10 @@ func (st *storageBootstrapper) restoreBlockChainToGenesis() {
     		log.Debug("cannot recreate trie for genesis header with nonce", "nonce", genesisHeader.GetNonce())
     	}
     
    -	err = st.blkc.SetCurrentBlockHeader(nil)
    +	err = st.blkc.SetCurrentBlockHeaderAndHash(nil, nil)
     	if err != nil {
    -		log.Debug("cannot set current block header", "error", err.Error())
    +		log.Debug("cannot set current block header and hash", "error", err.Error())
     	}
    -
    -	st.blkc.SetCurrentBlockHeaderHash(nil)
     }
     
     func checkBaseStorageBootstrapperArguments(args ArgsBaseStorageBootstrapper) error {
    
  • core/process/sync/storageBootstrap/baseStorageBootstrapper_test.go +4 -5 modified
    @@ -176,8 +176,9 @@ func TestStorageBootstrapper_LoadAndApplyBlocks(t *testing.T) {
     		var headerCalled []byte
     
     		sb := newValidStorageBootstrapper(bootstrapData)
    -		sb.blkc.(*mock.BlockChainMock).SetCurrentBlockHeaderHashCalled = func(hash []byte) {
    +		sb.blkc.(*mock.BlockChainMock).SetCurrentBlockHeaderAndHashCalled = func(_ data.HeaderHandler, hash []byte) error {
     			headerCalled = hash
    +			return nil
     		}
     
     		result, err := sb.loadAndApplyBlocks(98)
    @@ -268,16 +269,14 @@ func TestStorageBootstrapper_HandleBlockLoadingError(t *testing.T) {
     		sb.blkc.(*mock.BlockChainMock).GetGenesisHeaderCalled = func() data.HeaderHandler {
     			return &block.Block{}
     		}
    -		sb.blkc.(*mock.BlockChainMock).SetCurrentBlockHeaderCalled = func(header data.HeaderHandler) error {
    +		sb.blkc.(*mock.BlockChainMock).SetCurrentBlockHeaderAndHashCalled = func(header data.HeaderHandler, hash []byte) error {
     			if header == nil {
     				headerCleared = true
     			}
    -			return nil
    -		}
    -		sb.blkc.(*mock.BlockChainMock).SetCurrentBlockHeaderHashCalled = func(hash []byte) {
     			if hash == nil {
     				headerHashCleared = true
     			}
    +			return nil
     		}
     
     		testErr := errors.New("test error")
    
  • data/blockchain/blockchain.go +60 -10 modified
    @@ -56,28 +56,36 @@ func (bc *blockChain) SetGenesisHeader(genesisBlock data.HeaderHandler) error {
     	return nil
     }
     
    -// SetCurrentBlockHeader sets current block header pointer
    -func (bc *blockChain) SetCurrentBlockHeader(header data.HeaderHandler) error {
    +// prepareCurrentBlockHeader validates the header, updates status metrics, and returns
    +// a clone ready to be stored. Returns (nil, nil) when header is the nil interface.
    +// Callers must assign the returned value under the appropriate mutex.
    +func (bc *blockChain) prepareCurrentBlockHeader(header data.HeaderHandler) (data.HeaderHandler, error) {
     	if check.IfNil(header) {
    -		bc.mut.Lock()
    -		bc.currentBlockHeader = nil
    -		bc.mut.Unlock()
    -
    -		return nil
    +		return nil, nil
     	}
     
     	h, ok := header.(*block.Block)
     	if !ok {
    -		return common.ErrInvalidHeaderType
    +		return nil, common.ErrInvalidHeaderType
     	}
     
    -	log.Trace("SetCurrentBlockHeader", "nonce", h.Header.Nonce)
    +	log.Trace("setCurrentBlockHeader", "nonce", h.Header.Nonce)
     
     	bc.appStatusHandler.SetUInt64Value(core.MetricSynchronizedSlot, h.Header.Slot)
     	bc.appStatusHandler.SetUInt64Value(core.MetricNonce, h.Header.Nonce)
     
    +	return h.Clone(), nil
    +}
    +
    +// SetCurrentBlockHeader sets current block header pointer
    +func (bc *blockChain) SetCurrentBlockHeader(header data.HeaderHandler) error {
    +	clone, err := bc.prepareCurrentBlockHeader(header)
    +	if err != nil {
    +		return err
    +	}
    +
     	bc.mut.Lock()
    -	bc.currentBlockHeader = h.Clone()
    +	bc.currentBlockHeader = clone
     	bc.mut.Unlock()
     
     	return nil
    @@ -147,13 +155,55 @@ func (bc *blockChain) GetCurrentBlockHeaderHash() []byte {
     	return bc.currentBlockHeaderHash
     }
     
    +// GetCurrentBlockHeaderAndHash atomically returns the current block header and
    +// its hash under a single read lock acquisition. Use this in preference to
    +// calling GetCurrentBlockHeader and GetCurrentBlockHeaderHash separately when
    +// the (header, hash) pair must be consistent — those two calls take separate
    +// RLocks and can observe an update interleaved between them.
    +//
    +// The returned hash is a defensive copy so callers cannot mutate the
    +// backing array and corrupt the atomic snapshot.
    +func (bc *blockChain) GetCurrentBlockHeaderAndHash() (data.HeaderHandler, []byte) {
    +	bc.mut.RLock()
    +	defer bc.mut.RUnlock()
    +
    +	hashCopy := append([]byte(nil), bc.currentBlockHeaderHash...)
    +	if check.IfNil(bc.currentBlockHeader) {
    +		return nil, hashCopy
    +	}
    +
    +	return bc.currentBlockHeader.Clone(), hashCopy
    +}
    +
     // SetCurrentBlockHeaderHash returns the current block header hash
     func (bc *blockChain) SetCurrentBlockHeaderHash(hash []byte) {
     	bc.mut.Lock()
     	bc.currentBlockHeaderHash = hash
     	bc.mut.Unlock()
     }
     
    +// SetCurrentBlockHeaderAndHash atomically sets the current block header and its hash
    +// under a single mutex acquisition, preventing concurrent readers from observing a
    +// mismatched (header, hash) pair between the two updates.
    +//
    +// The hash bytes are defensively copied so subsequent caller-side mutation of the
    +// input slice cannot corrupt the stored snapshot.
    +func (bc *blockChain) SetCurrentBlockHeaderAndHash(header data.HeaderHandler, hash []byte) error {
    +	clone, err := bc.prepareCurrentBlockHeader(header)
    +	if err != nil {
    +		return err
    +	}
    +
    +	hashCopy := append([]byte(nil), hash...)
    +
    +	bc.mut.Lock()
    +	bc.currentBlockHeader = clone
    +	bc.currentBlockHeaderHash = hashCopy
    +	bc.mut.Unlock()
    +
    +	return nil
    +}
    +
     // GetCurrentBlockRootHash returns the current committed block root hash. The returned byte slice is a new copy
     // of the contained root hash.
     func (bc *blockChain) GetCurrentBlockRootHash() []byte {
    
  • data/blockchain/blockchain_test.go +222 -0 modified
    @@ -1,7 +1,10 @@
     package blockchain_test
     
     import (
    +	"fmt"
    +	"runtime"
     	"testing"
    +	"time"
     
     	"github.com/klever-io/klever-go/common"
     	"github.com/klever-io/klever-go/common/mock"
    @@ -113,6 +116,225 @@ func TestBlockChain_SettersWithInvalidBlock(t *testing.T) {
     	assert.Equal(t, common.ErrInvalidHeaderType, err)
     }
     
    +func TestBlockChain_SetCurrentBlockHeaderAndHashShouldWork(t *testing.T) {
    +	t.Parallel()
    +
    +	bc := blockchain.NewBlockChain()
    +
    +	hdr := &block.Block{
    +		Header: &block.BlockHeader{
    +			Nonce: 7,
    +		},
    +	}
    +	hash := []byte("matching-hash")
    +
    +	err := bc.SetCurrentBlockHeaderAndHash(hdr, hash)
    +	assert.Nil(t, err)
    +
    +	assert.Equal(t, hdr.Clone(), bc.GetCurrentBlockHeader())
    +	assert.Equal(t, hash, bc.GetCurrentBlockHeaderHash())
    +}
    +
    +func TestBlockChain_SetCurrentBlockHeaderAndHashNilHeader(t *testing.T) {
    +	t.Parallel()
    +
    +	bc := blockchain.NewBlockChain()
    +
    +	hdr := &block.Block{
    +		Header: &block.BlockHeader{
    +			Nonce: 3,
    +		},
    +	}
    +	require.NoError(t, bc.SetCurrentBlockHeader(hdr))
    +	bc.SetCurrentBlockHeaderHash([]byte("old"))
    +
    +	err := bc.SetCurrentBlockHeaderAndHash(nil, []byte("new"))
    +	assert.Nil(t, err)
    +
    +	assert.Nil(t, bc.GetCurrentBlockHeader())
    +	assert.Equal(t, []byte("new"), bc.GetCurrentBlockHeaderHash())
    +}
    +
    +func TestBlockChain_SetCurrentBlockHeaderAndHashInvalidHeaderType(t *testing.T) {
    +	t.Parallel()
    +
    +	bc := blockchain.NewBlockChain()
    +
    +	hdr := &block.Block{
    +		Header: &block.BlockHeader{
    +			Nonce: 1,
    +		},
    +	}
    +	hash := []byte("preserved")
    +	require.NoError(t, bc.SetCurrentBlockHeader(hdr))
    +	bc.SetCurrentBlockHeaderHash(hash)
    +
    +	err := bc.SetCurrentBlockHeaderAndHash(&mock.HeaderHandlerStub{}, []byte("ignored"))
    +	assert.Equal(t, common.ErrInvalidHeaderType, err)
    +
    +	assert.Equal(t, hdr.Clone(), bc.GetCurrentBlockHeader())
    +	assert.Equal(t, hash, bc.GetCurrentBlockHeaderHash())
    +}
    +
    +func TestBlockChain_GetCurrentBlockHeaderAndHashAtomicPairs(t *testing.T) {
    +	t.Parallel()
    +
    +	bc := blockchain.NewBlockChain()
    +
    +	hdrA := &block.Block{Header: &block.BlockHeader{Nonce: 10}}
    +	hashA := []byte("hash-10")
    +	hdrB := &block.Block{Header: &block.BlockHeader{Nonce: 9}}
    +	hashB := []byte("hash-9")
    +
    +	// The pair (nonce, hash) must always be one of these two valid combinations.
    +	pairs := map[uint64]string{
    +		10: "hash-10",
    +		9:  "hash-9",
    +	}
    +	require.NoError(t, bc.SetCurrentBlockHeaderAndHash(hdrA, hashA))
    +
    +	stop := make(chan struct{})
    +	mismatch := make(chan string, 1)
    +	writeErr := make(chan error, 1)
    +	done := make(chan struct{}, 2)
    +
    +	go func() {
    +		defer func() { done <- struct{}{} }()
    +		for {
    +			select {
    +			case <-stop:
    +				return
    +			default:
    +			}
    +			h, hash := bc.GetCurrentBlockHeaderAndHash()
    +			if h != nil {
    +				want, ok := pairs[h.GetNonce()]
    +				if !ok || want != string(hash) {
    +					select {
    +					case mismatch <- fmt.Sprintf("nonce=%d hash=%q", h.GetNonce(), string(hash)):
    +					default:
    +					}
    +					return
    +				}
    +			}
    +			runtime.Gosched()
    +		}
    +	}()
    +
    +	go func() {
    +		defer func() { done <- struct{}{} }()
    +		recordErr := func(err error) {
    +			if err == nil {
    +				return
    +			}
    +			select {
    +			case writeErr <- err:
    +			default:
    +			}
    +		}
    +		for {
    +			select {
    +			case <-stop:
    +				return
    +			default:
    +			}
    +			recordErr(bc.SetCurrentBlockHeaderAndHash(hdrB, hashB))
    +			recordErr(bc.SetCurrentBlockHeaderAndHash(hdrA, hashA))
    +			runtime.Gosched()
    +		}
    +	}()
    +
    +	time.Sleep(100 * time.Millisecond)
    +	close(stop)
    +	<-done
    +	<-done
    +
    +	select {
    +	case m := <-mismatch:
    +		t.Fatalf("observed mismatched (header, hash) pair via paired-read getter: %s", m)
    +	default:
    +	}
    +	select {
    +	case err := <-writeErr:
    +		t.Fatalf("writer goroutine reported error: %v", err)
    +	default:
    +	}
    +}
    +
    +func TestBlockChain_GetCurrentBlockHeaderAndHashEmpty(t *testing.T) {
    +	t.Parallel()
    +
    +	bc := blockchain.NewBlockChain()
    +
    +	h, hash := bc.GetCurrentBlockHeaderAndHash()
    +	assert.Nil(t, h)
    +	assert.Nil(t, hash)
    +}
    +
    +func TestBlockChain_SetCurrentBlockHeaderAndHashRaceClean(t *testing.T) {
    +	t.Parallel()
    +
    +	bc := blockchain.NewBlockChain()
    +
    +	hdrA := &block.Block{Header: &block.BlockHeader{Nonce: 10}}
    +	hashA := []byte("hash-10")
    +	hdrB := &block.Block{Header: &block.BlockHeader{Nonce: 9}}
    +	hashB := []byte("hash-9")
    +	require.NoError(t, bc.SetCurrentBlockHeaderAndHash(hdrA, hashA))
    +
    +	stop := make(chan struct{})
    +	writeErr := make(chan error, 1)
    +	done := make(chan struct{}, 2)
    +
    +	go func() {
    +		defer func() { done <- struct{}{} }()
    +		for {
    +			select {
    +			case <-stop:
    +				return
    +			default:
    +			}
    +			_ = bc.GetCurrentBlockHeader()
    +			_ = bc.GetCurrentBlockHeaderHash()
    +			runtime.Gosched()
    +		}
    +	}()
    +
    +	go func() {
    +		defer func() { done <- struct{}{} }()
    +		recordErr := func(err error) {
    +			if err == nil {
    +				return
    +			}
    +			select {
    +			case writeErr <- err:
    +			default:
    +			}
    +		}
    +		for {
    +			select {
    +			case <-stop:
    +				return
    +			default:
    +			}
    +			recordErr(bc.SetCurrentBlockHeaderAndHash(hdrB, hashB))
    +			recordErr(bc.SetCurrentBlockHeaderAndHash(hdrA, hashA))
    +			runtime.Gosched()
    +		}
    +	}()
    +
    +	time.Sleep(50 * time.Millisecond)
    +	close(stop)
    +	<-done
    +	<-done
    +
    +	select {
    +	case err := <-writeErr:
    +		t.Fatalf("writer goroutine reported error: %v", err)
    +	default:
    +	}
    +}
    +
     func TestBlockChain_GetCurrentBlockRootHash(t *testing.T) {
     	t.Parallel()
     
    
  • data/interface.go +2 -0 modified
    @@ -101,6 +101,8 @@ type ChainHandler interface {
     	SetCurrentBlockHeader(bh HeaderHandler) error
     	GetCurrentBlockHeaderHash() []byte
     	SetCurrentBlockHeaderHash(hash []byte)
    +	SetCurrentBlockHeaderAndHash(bh HeaderHandler, hash []byte) error
    +	GetCurrentBlockHeaderAndHash() (HeaderHandler, []byte)
     	GetCurrentBlockRootHash() []byte
     	CreateNewHeader() HeaderHandler
     	IsInterfaceNil() bool
    
  • integrationTest/processorNode/processorNode.go +51 -6 modified
    @@ -1465,29 +1465,41 @@ func (n *ProcessorNode) ConnectTo(connectable Connectable) error {
     }
     
     func (n *ProcessorNode) GetCurrentBlockHeaderAndHash() (data.HeaderHandler, []byte) {
    -	return n.Blkc.GetCurrentBlockHeader(), n.Blkc.GetCurrentBlockHeaderHash()
    +	return n.Blkc.GetCurrentBlockHeaderAndHash()
     }
     
    +// RevertOneBlock mirrors the production rollBackOneBlock flow in baseBootstrap:
    +// atomically swap the current header/hash to the previous block, revert state,
    +// restore the current block's body to pools, then clean caches and storage so
    +// the rolled-back block does not snap back via fork detector or nonce->hash storage.
     func (n *ProcessorNode) RevertOneBlock(nonce uint64) error {
    -	// get current block
     	currHeader, err := n.GetBlock(nonce)
     	if err != nil {
     		return err
     	}
     
    -	// get last block
     	prevHeader, err := n.GetBlock(nonce - 1)
     	if err != nil {
     		return err
     	}
     
    -	err = n.Blkc.SetCurrentBlockHeader(prevHeader)
    +	// Mirror production rollBackOneBlock: when reverting block 1 (back to
    +	// genesis), clear the current pair instead of pointing at the genesis
    +	// block. See core/process/sync/baseSync.go rollBackOneBlock.
    +	if nonce <= 1 {
    +		err = n.Blkc.SetCurrentBlockHeaderAndHash(nil, nil)
    +	} else {
    +		var prevHash []byte
    +		prevHash, err = tools.CalculateHash(n.InternalMarshalizer, n.Hasher, prevHeader.GetBlockHeader())
    +		if err != nil {
    +			return err
    +		}
    +		err = n.Blkc.SetCurrentBlockHeaderAndHash(prevHeader, prevHash)
    +	}
     	if err != nil {
     		return err
     	}
     
    -	n.Blkc.SetCurrentBlockHeaderHash(prevHeader.GetParentHash())
    -
     	err = n.BlockProcessor.RevertStateToBlock(prevHeader)
     	if err != nil {
     		return err
    @@ -1500,9 +1512,42 @@ func (n *ProcessorNode) RevertOneBlock(nonce uint64) error {
     		return err
     	}
     
    +	n.cleanCachesAndStorageOnRollback(currHeader)
     	return nil
     }
     
    +// cleanCachesAndStorageOnRollback mirrors baseBootstrap.cleanCachesAndStorageOnRollback:
    +// removes the rolled-back header from the data pool, the fork detector, and the
    +// nonce->hash storage unit. Returns silently on any per-step failure (mirroring
    +// production, which logs at Debug and ignores errors here).
    +func (n *ProcessorNode) cleanCachesAndStorageOnRollback(header data.HeaderHandler) {
    +	hash := n.removeHeaderFromPools(header)
    +	n.ForkDetector.RemoveHeader(header.GetNonce(), hash)
    +	nonceBytes := n.Uint64ByteSliceConverter.ToByteSlice(header.GetNonce())
    +	storer := n.Store.GetStorer(retriever.HdrNonceHashDataUnit)
    +	if storer != nil {
    +		_ = storer.Remove(nonceBytes)
    +	}
    +}
    +
    +// removeHeaderFromPools mirrors baseBootstrap.removeHeaderFromPools: compute the
    +// block's own header hash and evict it from the data pool. Returns nil on any
    +// failure (matching production), with a Debug-level log to preserve symmetry.
    +func (n *ProcessorNode) removeHeaderFromPools(hdr data.HeaderHandler) []byte {
    +	blk, ok := hdr.(*block.Block)
    +	if !ok {
    +		log.Debug("removeHeaderFromPools: type assertion to *block.Block failed")
    +		return nil
    +	}
    +	hash, err := tools.CalculateHash(n.InternalMarshalizer, n.Hasher, blk.Header)
    +	if err != nil {
    +		log.Debug("removeHeaderFromPools: CalculateHash failed", "error", err.Error())
    +		return nil
    +	}
    +	n.DataPool.Headers().RemoveHeaderByHash(hash)
    +	return hash
    +}
    +
     // SyncNode tries to process and commit a block already stored in data pool with provided nonce
     func (n *ProcessorNode) SyncNode(nonce uint64) error {
     	header, err := n.GetBlock(nonce)
    

Vulnerability mechanics

Root cause

"The direct message handler spawns a new goroutine for every incoming message before any admission control logic is applied."

Attack vector

An attacker establishes a libp2p connection and opens a `DirectSendID` stream. By sending well-formed `TopicMessage` envelopes with incrementing sequence numbers, the attacker can bypass deduplication. Each message processed by `directMessageHandler` triggers a new goroutine before the antiflood layer can make an admission decision, leading to unbounded goroutine growth [ref_id=1].

Affected code

The vulnerability resides in the `directMessageHandler` function within the `network/p2p/libp2p/netMessenger.go` file. Specifically, the code spawns a new goroutine using `go func(...)` immediately after validating the message and retrieving the topic processor, but before calling `processor.ProcessReceivedMessage` [ref_id=1].

What the fix does

The advisory suggests two potential fixes: either integrate a throttler, such as `goRoutinesThrottler.CanProcess()`, before spawning the goroutine in `directMessageHandler`, or remove the goroutine entirely and process messages synchronously, mirroring the behavior of the parallel pubsub ingress path [ref_id=1]. The advisory does not specify which fix, if any, has been implemented.

Preconditions

  • network: The attacker must have an established libp2p connection with the target node.
  • input: The attacker must send well-formed `TopicMessage` envelopes with varying sequence numbers to bypass deduplication.

Reproduction

To reproduce, clone the `klever-go` repository, check out version `v1.7.16`, place the advisory's Go test file (`dos_directmsg_test.go`) in the `network/p2p/libp2p/` directory, and run `go test ./network/p2p/libp2p/ -run TestPoC_ -count=1 -v -timeout 60s` [ref_id=1].

Generated on Jun 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.

References

3
