VYPR
Low severity · NVD Advisory · Published Nov 10, 2022 · Updated May 1, 2025

Nomad Event Stream Subscriber Using a Token with TTL Receives Updates Until Garbage Collected

CVE-2022-3867

Description

In HashiCorp Nomad and Nomad Enterprise 1.4.0 through 1.4.1, event stream subscribers using a token with a TTL continue to receive updates until the token is garbage collected. Fixed in 1.4.2.

AI Insight

LLM-synthesized narrative grounded in this CVE's description and references.

HashiCorp Nomad 1.4.0-1.4.1 does not validate ACL token expiry for event stream subscribers, allowing continued access beyond TTL until garbage collection.

Vulnerability

Description

CVE-2022-3867 affects HashiCorp Nomad and Nomad Enterprise versions 1.4.0 through 1.4.1. The vulnerability lies in the event stream subscription mechanism: when a subscriber authenticates with an ACL token that has a Time-To-Live (TTL) expiry, the system fails to re-check that the token is still valid before delivering subsequent events. Instead, the subscriber continues to receive updates until the token is garbage collected, which may be significantly after the intended expiry time [1][3][4].

Exploitation

An attacker must have authenticated access to Nomad with a valid ACL token that has a TTL set. No additional privileges are required beyond those already granted to the token. The event stream is a built-in feature for subscribing to state changes (jobs, allocations, evaluations, deployments, nodes) in near real-time [2][4]. The attacker can initiate a subscription and then, after the token has expired, still receive events until the system performs garbage collection on the old token [1][4].
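As an illustration of what an event stream subscription looks like from the client side, the sketch below builds the HTTP request for Nomad's event stream endpoint (/v1/event/stream) with the ACL token passed in the X-Nomad-Token header. The helper name newEventStreamRequest, the agent address, and the token value are assumptions made for this example; the request is only constructed, not sent.

```go
package main

import (
	"fmt"
	"net/http"
	"net/url"
)

// newEventStreamRequest is a hypothetical helper (not part of Nomad) that
// builds a GET request for the event stream endpoint. Each entry in topics
// becomes one "topic" query parameter.
func newEventStreamRequest(addr, token string, topics []string) (*http.Request, error) {
	u, err := url.Parse(addr)
	if err != nil {
		return nil, fmt.Errorf("parsing agent address: %w", err)
	}
	u.Path = "/v1/event/stream"

	q := url.Values{}
	for _, t := range topics {
		q.Add("topic", t)
	}
	u.RawQuery = q.Encode()

	req, err := http.NewRequest(http.MethodGet, u.String(), nil)
	if err != nil {
		return nil, err
	}
	// The ACL token's secret ID authenticates the subscription.
	req.Header.Set("X-Nomad-Token", token)
	return req, nil
}

func main() {
	// Address and token are placeholders for illustration only.
	req, err := newEventStreamRequest("http://127.0.0.1:4646", "example-secret-id", []string{"Job"})
	if err != nil {
		panic(err)
	}
	fmt.Println(req.Method, req.URL.String())
}
```

Because the connection stays open for as long as the server keeps the subscription alive, the server-side decision about when to stop sending events is what determines how long an expiring token remains useful.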

Impact

A malicious operator or third party with authenticated access can continue to monitor sensitive Nomad state changes beyond the authorized time window. This could lead to unauthorized disclosure of operational information, including job specifications, allocation details, or node events, violating the intended access control policy enforced by token TTLs [4].

Mitigation

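HashiCorp fixed this vulnerability in Nomad 1.4.2. With the fix, SubscribeWithACLCheck returns the token's expiry time alongside the subscription, and the event stream handler checks that time before sending each batch of events, returning a token-expired error and ending the stream once it has passed. The event broker also closes subscriptions for expired tokens when it processes ACL changes [1][4]. Users should upgrade to version 1.4.2 or later to remediate the issue. There is no workaround other than upgrading [4].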

AI Insight generated on May 21, 2026. Synthesized from this CVE's description and the cited reference URLs; citations are validated against the source bundle.

Affected packages

Versions sourced from the GitHub Security Advisory.

Package | Affected versions | Patched versions
github.com/hashicorp/nomad (Go) | >= 1.4.0, < 1.4.2 | 1.4.2

Affected products

3

Patches

1
dd6a4634a965

event stream: ensure token expiry is correctly checked for subs.

https://github.com/hashicorp/nomad · James Rasell · Oct 24, 2022 · via ghsa
5 files changed · +181 -22
  • .changelog/15013.txt +3 -0 added
    @@ -0,0 +1,3 @@
    +```release-note:security
    +event stream: Fixed a bug where ACL token expiration was not checked when emitting events
    +```
    
  • nomad/event_endpoint.go +15 -1 modified
    @@ -59,9 +59,13 @@ func (e *Event) stream(conn io.ReadWriteCloser) {
     	// start subscription to publisher
     	var subscription *stream.Subscription
     	var subErr error
    +
    +	// Track whether the ACL token being used has an expiry time.
    +	var expiryTime *time.Time
    +
     	// Check required ACL permissions for requested Topics
     	if e.srv.config.ACLEnabled {
    -		subscription, subErr = publisher.SubscribeWithACLCheck(subReq)
    +		subscription, expiryTime, subErr = publisher.SubscribeWithACLCheck(subReq)
     	} else {
     		subscription, subErr = publisher.Subscribe(subReq)
     	}
    @@ -93,6 +97,16 @@ func (e *Event) stream(conn io.ReadWriteCloser) {
     				return
     			}
     
    +			// Ensure the token being used is not expired before we any events
    +			// to subscribers.
    +			if expiryTime != nil && expiryTime.Before(time.Now().UTC()) {
    +				select {
    +				case errCh <- structs.ErrTokenExpired:
    +				case <-ctx.Done():
    +				}
    +				return
    +			}
    +
     			// Continue if there are no events
     			if len(events.Events) == 0 {
     				continue
    
  • nomad/event_endpoint_test.go +116 -0 modified
    @@ -15,11 +15,13 @@ import (
     	msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
     	"github.com/hashicorp/nomad/acl"
     	"github.com/hashicorp/nomad/ci"
    +	"github.com/hashicorp/nomad/helper/pointer"
     	"github.com/hashicorp/nomad/nomad/mock"
     	"github.com/hashicorp/nomad/nomad/stream"
     	"github.com/hashicorp/nomad/nomad/structs"
     	"github.com/hashicorp/nomad/testutil"
     	"github.com/mitchellh/mapstructure"
    +	"github.com/shoenig/test/must"
     	"github.com/stretchr/testify/require"
     )
     
    @@ -625,3 +627,117 @@ OUTER:
     		}
     	}
     }
    +
    +// TestEventStream_ACLTokenExpiry ensure a subscription does not receive events
    +// and is closed once the token has expired.
    +func TestEventStream_ACLTokenExpiry(t *testing.T) {
    +	ci.Parallel(t)
    +
    +	// Start our test server and wait until we have a leader.
    +	testServer, _, testServerCleanup := TestACLServer(t, nil)
    +	defer testServerCleanup()
    +	testutil.WaitForLeader(t, testServer.RPC)
    +
    +	// Create and upsert and ACL token which has a short expiry set.
    +	aclTokenWithExpiry := mock.ACLManagementToken()
    +	aclTokenWithExpiry.ExpirationTime = pointer.Of(time.Now().Add(2 * time.Second))
    +
    +	must.NoError(t, testServer.fsm.State().UpsertACLTokens(
    +		structs.MsgTypeTestSetup, 10, []*structs.ACLToken{aclTokenWithExpiry}))
    +
    +	req := structs.EventStreamRequest{
    +		Topics: map[structs.Topic][]string{"Job": {"*"}},
    +		QueryOptions: structs.QueryOptions{
    +			Region:    testServer.Region(),
    +			Namespace: structs.DefaultNamespace,
    +			AuthToken: aclTokenWithExpiry.SecretID,
    +		},
    +	}
    +
    +	handler, err := testServer.StreamingRpcHandler("Event.Stream")
    +	must.NoError(t, err)
    +
    +	p1, p2 := net.Pipe()
    +	defer p1.Close()
    +	defer p2.Close()
    +
    +	errCh := make(chan error)
    +	streamMsg := make(chan *structs.EventStreamWrapper)
    +
    +	go handler(p2)
    +
    +	go func() {
    +		decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
    +		for {
    +			var msg structs.EventStreamWrapper
    +			if err := decoder.Decode(&msg); err != nil {
    +				if err == io.EOF || strings.Contains(err.Error(), "closed") {
    +					return
    +				}
    +				errCh <- fmt.Errorf("error decoding: %w", err)
    +			}
    +
    +			streamMsg <- &msg
    +		}
    +	}()
    +
    +	publisher, err := testServer.State().EventBroker()
    +	must.NoError(t, err)
    +
    +	jobEvent := structs.JobEvent{
    +		Job: mock.Job(),
    +	}
    +
    +	// send req
    +	encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
    +	must.Nil(t, encoder.Encode(req))
    +
    +	// publish some events
    +	publisher.Publish(&structs.Events{Index: uint64(1), Events: []structs.Event{{Topic: structs.TopicJob, Payload: jobEvent}}})
    +	publisher.Publish(&structs.Events{Index: uint64(2), Events: []structs.Event{{Topic: structs.TopicJob, Payload: jobEvent}}})
    +
    +	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(4*time.Second))
    +	defer cancel()
    +
    +	errChStream := make(chan error, 1)
    +	go func() {
    +		for {
    +			select {
    +			case <-ctx.Done():
    +				errChStream <- ctx.Err()
    +				return
    +			case err := <-errCh:
    +				errChStream <- err
    +				return
    +			case msg := <-streamMsg:
    +				if msg.Error == nil {
    +					continue
    +				}
    +
    +				errChStream <- msg.Error
    +				return
    +			}
    +		}
    +	}()
    +
    +	// Generate a timeout for the test and for the expiry. The expiry timeout
    +	// is used to trigger an update which will close the subscription as the
    +	// event stream only reacts to change in state.
    +	testTimeout := time.After(4 * time.Second)
    +	expiryTimeout := time.After(time.Until(*aclTokenWithExpiry.ExpirationTime))
    +
    +	for {
    +		select {
    +		case <-testTimeout:
    +			t.Fatal("timeout waiting for event stream to close")
    +		case err := <-errCh:
    +			t.Fatal(err)
    +		case <-expiryTimeout:
    +			publisher.Publish(&structs.Events{Index: uint64(1), Events: []structs.Event{{Topic: structs.TopicJob, Payload: jobEvent}}})
    +		case err := <-errChStream:
    +			// Success
    +			must.StrContains(t, err.Error(), "ACL token expired")
    +			return
    +		}
    +	}
    +}
    
  • nomad/stream/event_broker.go +41 -18 modified
    @@ -109,19 +109,25 @@ func (e *EventBroker) Publish(events *structs.Events) {
     	e.publishCh <- events
     }
     
    -// SubscribeWithACLCheck validates the SubscribeRequest's token and requested Topics
    -// to ensure that the tokens privileges are sufficient enough.
    -func (e *EventBroker) SubscribeWithACLCheck(req *SubscribeRequest) (*Subscription, error) {
    -	aclObj, err := aclObjFromSnapshotForTokenSecretID(e.aclDelegate.TokenProvider(), e.aclCache, req.Token)
    +// SubscribeWithACLCheck validates the SubscribeRequest's token and requested
    +// topics to ensure that the tokens privileges are sufficient. It will also
    +// return the token expiry time, if any. It is the callers responsibility to
    +// check this before publishing events to the caller.
    +func (e *EventBroker) SubscribeWithACLCheck(req *SubscribeRequest) (*Subscription, *time.Time, error) {
    +	aclObj, expiryTime, err := aclObjFromSnapshotForTokenSecretID(e.aclDelegate.TokenProvider(), e.aclCache, req.Token)
     	if err != nil {
    -		return nil, structs.ErrPermissionDenied
    +		return nil, nil, structs.ErrPermissionDenied
     	}
     
     	if allowed := aclAllowsSubscription(aclObj, req); !allowed {
    -		return nil, structs.ErrPermissionDenied
    +		return nil, nil, structs.ErrPermissionDenied
     	}
     
    -	return e.Subscribe(req)
    +	sub, err := e.Subscribe(req)
    +	if err != nil {
    +		return nil, nil, err
    +	}
    +	return sub, expiryTime, nil
     }
     
     // Subscribe returns a new Subscription for a given request. A Subscription
    @@ -203,13 +209,19 @@ func (e *EventBroker) handleACLUpdates(ctx context.Context) {
     					continue
     				}
     
    -				aclObj, err := aclObjFromSnapshotForTokenSecretID(e.aclDelegate.TokenProvider(), e.aclCache, tokenSecretID)
    +				aclObj, expiryTime, err := aclObjFromSnapshotForTokenSecretID(e.aclDelegate.TokenProvider(), e.aclCache, tokenSecretID)
     				if err != nil || aclObj == nil {
     					e.logger.Error("failed resolving ACL for secretID, closing subscriptions", "error", err)
     					e.subscriptions.closeSubscriptionsForTokens([]string{tokenSecretID})
     					continue
     				}
     
    +				if expiryTime != nil && expiryTime.Before(time.Now().UTC()) {
    +					e.logger.Info("ACL token is expired, closing subscriptions")
    +					e.subscriptions.closeSubscriptionsForTokens([]string{tokenSecretID})
    +					continue
    +				}
    +
     				e.subscriptions.closeSubscriptionFunc(tokenSecretID, func(sub *Subscription) bool {
     					return !aclAllowsSubscription(aclObj, sub.req)
     				})
    @@ -245,45 +257,52 @@ func (e *EventBroker) checkSubscriptionsAgainstACLChange() {
     			continue
     		}
     
    -		aclObj, err := aclObjFromSnapshotForTokenSecretID(aclSnapshot, e.aclCache, tokenSecretID)
    +		aclObj, expiryTime, err := aclObjFromSnapshotForTokenSecretID(aclSnapshot, e.aclCache, tokenSecretID)
     		if err != nil || aclObj == nil {
     			e.logger.Debug("failed resolving ACL for secretID, closing subscriptions", "error", err)
     			e.subscriptions.closeSubscriptionsForTokens([]string{tokenSecretID})
     			continue
     		}
     
    +		if expiryTime != nil && expiryTime.Before(time.Now().UTC()) {
    +			e.logger.Info("ACL token is expired, closing subscriptions")
    +			e.subscriptions.closeSubscriptionsForTokens([]string{tokenSecretID})
    +			continue
    +		}
    +
     		e.subscriptions.closeSubscriptionFunc(tokenSecretID, func(sub *Subscription) bool {
     			return !aclAllowsSubscription(aclObj, sub.req)
     		})
     	}
     }
     
     func aclObjFromSnapshotForTokenSecretID(
    -	aclSnapshot ACLTokenProvider, aclCache *lru.TwoQueueCache, tokenSecretID string) (*acl.ACL, error) {
    +	aclSnapshot ACLTokenProvider, aclCache *lru.TwoQueueCache, tokenSecretID string) (
    +	*acl.ACL, *time.Time, error) {
     
     	aclToken, err := aclSnapshot.ACLTokenBySecretID(nil, tokenSecretID)
     	if err != nil {
    -		return nil, err
    +		return nil, nil, err
     	}
     
     	if aclToken == nil {
    -		return nil, structs.ErrTokenNotFound
    +		return nil, nil, structs.ErrTokenNotFound
     	}
     	if aclToken.IsExpired(time.Now().UTC()) {
    -		return nil, structs.ErrTokenExpired
    +		return nil, nil, structs.ErrTokenExpired
     	}
     
     	// Check if this is a management token
     	if aclToken.Type == structs.ACLManagementToken {
    -		return acl.ManagementACL, nil
    +		return acl.ManagementACL, aclToken.ExpirationTime, nil
     	}
     
     	aclPolicies := make([]*structs.ACLPolicy, 0, len(aclToken.Policies)+len(aclToken.Roles))
     
     	for _, policyName := range aclToken.Policies {
     		policy, err := aclSnapshot.ACLPolicyByName(nil, policyName)
     		if err != nil || policy == nil {
    -			return nil, errors.New("error finding acl policy")
    +			return nil, nil, errors.New("error finding acl policy")
     		}
     		aclPolicies = append(aclPolicies, policy)
     	}
    @@ -294,7 +313,7 @@ func aclObjFromSnapshotForTokenSecretID(
     
     		role, err := aclSnapshot.GetACLRoleByID(nil, roleLink.ID)
     		if err != nil {
    -			return nil, err
    +			return nil, nil, err
     		}
     		if role == nil {
     			continue
    @@ -303,13 +322,17 @@ func aclObjFromSnapshotForTokenSecretID(
     		for _, policyLink := range role.Policies {
     			policy, err := aclSnapshot.ACLPolicyByName(nil, policyLink.Name)
     			if err != nil || policy == nil {
    -				return nil, errors.New("error finding acl policy")
    +				return nil, nil, errors.New("error finding acl policy")
     			}
     			aclPolicies = append(aclPolicies, policy)
     		}
     	}
     
    -	return structs.CompileACLObject(aclCache, aclPolicies)
    +	aclObj, err := structs.CompileACLObject(aclCache, aclPolicies)
    +	if err != nil {
    +		return nil, nil, err
    +	}
    +	return aclObj, aclToken.ExpirationTime, nil
     }
     
     type ACLTokenProvider interface {
    
  • nomad/stream/event_broker_test.go +6 -3 modified
    @@ -514,13 +514,14 @@ func TestEventBroker_handleACLUpdates_policyUpdated(t *testing.T) {
     				ns = structs.DefaultNamespace
     			}
     
    -			sub, err := publisher.SubscribeWithACLCheck(&SubscribeRequest{
    +			sub, expiryTime, err := publisher.SubscribeWithACLCheck(&SubscribeRequest{
     				Topics: map[structs.Topic][]string{
     					tc.event.Topic: {"*"},
     				},
     				Namespace: ns,
     				Token:     secretID,
     			})
    +			require.Nil(t, expiryTime)
     
     			if tc.initialSubErr {
     				require.Error(t, err)
    @@ -811,11 +812,12 @@ func TestEventBroker_handleACLUpdates_roleUpdated(t *testing.T) {
     				ns = tc.event.Namespace
     			}
     
    -			sub, err := publisher.SubscribeWithACLCheck(&SubscribeRequest{
    +			sub, expiryTime, err := publisher.SubscribeWithACLCheck(&SubscribeRequest{
     				Topics:    map[structs.Topic][]string{tc.event.Topic: {"*"}},
     				Namespace: ns,
     				Token:     tokenSecretID,
     			})
    +			require.Nil(t, expiryTime)
     
     			if tc.initialSubErr {
     				require.Error(t, err)
    @@ -931,12 +933,13 @@ func TestEventBroker_handleACLUpdates_tokenExpiry(t *testing.T) {
     				Payload: structs.NewACLTokenEvent(&structs.ACLToken{SecretID: tc.inputToken.SecretID}),
     			}
     
    -			sub, err := publisher.SubscribeWithACLCheck(&SubscribeRequest{
    +			sub, expiryTime, err := publisher.SubscribeWithACLCheck(&SubscribeRequest{
     				Topics: map[structs.Topic][]string{structs.TopicAll: {"*"}},
     				Token:  tc.inputToken.SecretID,
     			})
     			require.NoError(t, err)
     			require.NotNil(t, sub)
    +			require.NotNil(t, expiryTime)
     
     			// Publish an event and check that there is a new item in the
     			// subscription queue.
    

Vulnerability mechanics

Generated on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.

References

4
