VYPR
Critical severity 9.1 · NVD Advisory · Published Apr 20, 2026 · Updated Apr 22, 2026

CVE-2026-33557

CVE-2026-33557

Description

A possible security vulnerability has been identified in Apache Kafka.

By default, the broker property sasl.oauthbearer.jwt.validator.class is set to org.apache.kafka.common.security.oauthbearer.DefaultJwtValidator. It accepts any JWT token without validating its signature, issuer, or audience. An attacker can generate a JWT token from any issuer with the preferred_username set to any user, and the broker will accept it.

We advise Kafka users running v4.1.0 or v4.1.1 to explicitly set the config sasl.oauthbearer.jwt.validator.class to org.apache.kafka.common.security.oauthbearer.BrokerJwtValidator to avoid this vulnerability. The issue is fixed in Kafka v4.1.2 and in v4.2.0 and later, where the broker correctly validates the JWT token.

Affected packages

Versions sourced from the GitHub Security Advisory.

Package | Affected versions | Patched versions
org.apache.kafka:kafka-clients (Maven) | >= 4.1.0, < 4.1.2 | 4.1.2

Affected products

1
  • cpe:2.3:a:apache:kafka:*:*:*:*:*:*:*:*
    Range: >=4.1.0,<4.1.2

Patches

1
01d8e7db8d08

MINOR: Code cleanup and additional tests for DefaultJwtValidator

https://github.com/apache/kafka · Kirk True · Nov 24, 2025 · via ghsa
3 files changed · +74 -4
  • clients/src/main/java/org/apache/kafka/common/security/oauthbearer/DefaultJwtValidator.java · +9 -1 · modified
    @@ -17,7 +17,9 @@
     
     package org.apache.kafka.common.security.oauthbearer;
     
    +import org.apache.kafka.common.config.SaslConfigs;
     import org.apache.kafka.common.security.oauthbearer.internals.secured.CloseableVerificationKeyResolver;
    +import org.apache.kafka.common.security.oauthbearer.internals.secured.ConfigurationUtils;
     import org.apache.kafka.common.utils.Utils;
     
     import org.jose4j.keys.resolvers.VerificationKeyResolver;
    @@ -54,7 +56,13 @@ public void configure(Map<String, ?> configs, String saslMechanism, List<AppConf
             if (verificationKeyResolver.isPresent()) {
                 delegate = new BrokerJwtValidator(verificationKeyResolver.get());
             } else {
    -            delegate = new ClientJwtValidator();
    +            ConfigurationUtils cu = new ConfigurationUtils(configs, saslMechanism);
    +
    +            if (cu.containsKey(SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_URL)) {
    +                delegate = new BrokerJwtValidator();
    +            } else {
    +                delegate = new ClientJwtValidator();
    +            }
             }
     
             delegate.configure(configs, saslMechanism, jaasConfigEntries);
    
  • clients/src/test/java/org/apache/kafka/common/security/oauthbearer/DefaultJwtValidatorTest.java · +33 -0 · modified
    @@ -17,21 +17,34 @@
     
     package org.apache.kafka.common.security.oauthbearer;
     
    +import org.apache.kafka.common.config.SaslConfigs;
    +import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
     import org.apache.kafka.common.security.oauthbearer.internals.secured.AccessTokenBuilder;
     import org.apache.kafka.common.security.oauthbearer.internals.secured.CloseableVerificationKeyResolver;
     import org.apache.kafka.common.security.oauthbearer.internals.secured.OAuthBearerTest;
     
    +import org.jose4j.jwk.JsonWebKey;
    +import org.jose4j.jwk.JsonWebKeySet;
    +import org.jose4j.jwk.PublicJsonWebKey;
     import org.jose4j.jws.AlgorithmIdentifiers;
    +import org.junit.jupiter.api.AfterEach;
     import org.junit.jupiter.api.Test;
     
     import java.util.Map;
     
    +import static org.apache.kafka.common.config.internals.BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG;
     import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.OAUTHBEARER_MECHANISM;
    +import static org.apache.kafka.test.TestUtils.tempFile;
     import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
     import static org.junit.jupiter.api.Assertions.assertInstanceOf;
     
     public class DefaultJwtValidatorTest extends OAuthBearerTest {
     
    +    @AfterEach
    +    public void tearDown() {
    +        System.clearProperty(BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG);
    +    }
    +
         @Test
         public void testConfigureWithVerificationKeyResolver() {
             AccessTokenBuilder builder = new AccessTokenBuilder()
    @@ -51,6 +64,26 @@ public void testConfigureWithoutVerificationKeyResolver() {
             assertInstanceOf(ClientJwtValidator.class, jwtValidator.delegate());
         }
     
    +    @Test
    +    public void testConfigureWithJwksUrl() throws Exception {
    +        PublicJsonWebKey jwk = createRsaJwk();
    +        AccessTokenBuilder builder = new AccessTokenBuilder()
    +            .jwk(jwk)
    +            .alg(AlgorithmIdentifiers.RSA_USING_SHA256);
    +        String accessToken = builder.build();
    +
    +        JsonWebKeySet jwks = new JsonWebKeySet(jwk);
    +        String jwksJson = jwks.toJson(JsonWebKey.OutputControlLevel.PUBLIC_ONLY);
    +        String fileUrl = tempFile(jwksJson).toURI().toString();
    +        System.setProperty(ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG, fileUrl);
    +        Map<String, ?> configs = getSaslConfigs(SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_URL, fileUrl);
    +
    +        DefaultJwtValidator jwtValidator = new DefaultJwtValidator();
    +        assertDoesNotThrow(() -> jwtValidator.configure(configs, OAUTHBEARER_MECHANISM, getJaasConfigEntries()));
    +        assertInstanceOf(BrokerJwtValidator.class, jwtValidator.delegate());
    +        assertDoesNotThrow(() -> jwtValidator.validate(accessToken));
    +    }
    +
         private CloseableVerificationKeyResolver createVerificationKeyResolver(AccessTokenBuilder builder) {
             return (jws, nestingContext) -> builder.jwk().getPublicKey();
         }
    
  • core/src/test/scala/integration/kafka/api/ClientOAuthIntegrationTest.scala · +32 -3 · modified
    @@ -26,10 +26,11 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, TestInfo}
     import java.util.{Base64, Collections, Properties}
     import no.nav.security.mock.oauth2.{MockOAuth2Server, OAuth2Config}
     import no.nav.security.mock.oauth2.token.{KeyProvider, OAuth2TokenProvider}
    -import org.apache.kafka.common.KafkaException
    +import org.apache.kafka.common.{KafkaException, TopicPartition}
     import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
    +import org.apache.kafka.common.errors.SaslAuthenticationException
     import org.apache.kafka.common.security.auth.SecurityProtocol
    -import org.apache.kafka.common.security.oauthbearer.{OAuthBearerLoginCallbackHandler, OAuthBearerLoginModule, OAuthBearerValidatorCallbackHandler}
    +import org.apache.kafka.common.security.oauthbearer.{JwtRetriever, OAuthBearerLoginCallbackHandler, OAuthBearerLoginModule, OAuthBearerValidatorCallbackHandler}
     import org.apache.kafka.common.utils.Utils
     import org.apache.kafka.test.TestUtils
     import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertThrows}
    @@ -244,6 +245,27 @@ class ClientOAuthIntegrationTest extends IntegrationTestHarness with SaslSetup {
         assertThrows(classOf[ConfigException], () => createAdminClient(configOverrides = configs))
       }
     
    +  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
    +  @MethodSource(Array("getTestGroupProtocolParametersAll"))
    +  def testAuthenticationErrorOnTamperedJwt(groupProtocol: String): Unit = {
    +    val className = classOf[TamperedJwtRetriever].getName
    +
    +    val configs = defaultOAuthConfigs()
    +    configs.put(SaslConfigs.SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS, className)
    +
    +    val tp = new TopicPartition("test-topic", 0)
    +
    +    val admin = createAdminClient(configOverrides = configs)
    +    TestUtils.assertFutureThrows(classOf[SaslAuthenticationException], admin.describeCluster().clusterId())
    +
    +    val producer = createProducer(configOverrides = configs)
    +    assertThrows(classOf[SaslAuthenticationException], () => producer.partitionsFor(tp.topic()))
    +
    +    val consumer = createConsumer(configOverrides = configs)
    +    consumer.assign(Collections.singleton(tp))
    +    assertThrows(classOf[SaslAuthenticationException], () => consumer.position(tp))
    +  }
    +
       def generatePrivateKeyFile(): File = {
         val file = File.createTempFile("private-", ".key")
         val bytes = Base64.getEncoder.encode(privateKey.getEncoded)
    @@ -258,4 +280,11 @@ class ClientOAuthIntegrationTest extends IntegrationTestHarness with SaslSetup {
     
         file
       }
    -}
    \ No newline at end of file
    +}
    +
    +class TamperedJwtRetriever extends JwtRetriever {
    +
    +  override def retrieve(): String = {
    +    "eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9.eyJzdWIiOiAiMTIzNDU2Nzg5MCIsICJuYW1lIjogIkpvaG4gRG9lIiwgInJvbGUiOiAiYWRtaW4iLCAiaWF0IjogMTUxNjIzOTAyMiwgImV4cCI6IDE5MTYyMzkwMjJ9.vVT5ylQCGvb0B-wv1YXHjmlMd-DZKCThUt5-enry_sA"
    +  }
    +}
    

Vulnerability mechanics

Generated by null/stub on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.

References

6

News mentions

0

No linked articles in our index yet.