JMeter KAFKA connection using SCRAM authentication
JAAS.conf file content.
JMeter { org.apache.kafka.common.security.scram.ScramLoginModule required doNotPrompt=false useKeyTab=false username="" password="" storeKey=false; };
Plugin used : Kafka pepper box
System properties added to enable SCRAM authentication
• sasl.enabled.mechanisms=SCRAM-SHA-256
• security.inter.broker.protocol=SASL_SSL
• sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
JMeter version used: 5.4.1; JDK installation: 1.7; Kafka version: 2.3; jar: kafka-1.0.0.jar
After taking a close look at the logs and the contents of the Kafka jar files, I believe the SaslClientAuthenticator class, in the package org.apache.kafka.common.security.authenticator of the Kafka jar file, is what throws "Failed to configure SaslClientAuthenticator" (seen in the JMeter logs).
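Its constructor compares the configured mechanism against "GSSAPI" and, when it matches, calls firstPrincipal(subject), which is the method that fails in the stack trace. The ProducerConfig dump in the logs below shows sasl.mechanism = GSSAPI and security.protocol = SASL_PLAINTEXT, so the client takes the GSSAPI (Kerberos) path. The authentication deployed on the Kafka topic is SCRAM-SHA-256, and the classes and methods related to it sit under the scram package, which is not being called at all.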
JMeter LOGS
2-10-10 12:14:46,324 INFO o.a.z.ClientCnxn: Session establishment complete on server **, sessionid = 0x1f01e376955b0063, negotiated timeout = 10000 2022-10-10 12:14:46,324 ERROR o.a.z.ClientCnxn: Error while calling watcher java.lang.NullPointerException: Cannot invoke "org.apache.zookeeper.Watcher.process(org.apache.zookeeper.WatchedEvent)" because "watcher" is null at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:522) [pepper-box-1.0.jar:?] at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:498) [pepper-box-1.0.jar:?] 2022-10-10 12:14:46,324 INFO o.a.k.c.p.ProducerConfig: ProducerConfig values: acks = 1 batch.size = 16384 bootstrap.servers = [.com:9041, ***.com:9041, ***.com:9041, ***.com:9041] buffer.memory = 33554432 client.dns.lookup = default client.id = compression.type = none connections.max.idle.ms = 540000 delivery.timeout.ms = 120000 enable.idempotence = false interceptor.classes = [] key.serializer = class org.apache.kafka.common.serialization.StringSerializer linger.ms = 0 max.block.ms = 60000 max.in.flight.requests.per.connection = 5 max.request.size = 1048576 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner receive.buffer.bytes = 32768 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retries = 2147483647 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = kafka sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 
sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = SASL_PLAINTEXT send.buffer.bytes = 131072 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = https ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS transaction.timeout.ms = 60000 transactional.id = null value.serializer = class org.apache.kafka.common.serialization.StringSerializer
2022-10-10 12:14:46,367 INFO o.a.k.c.s.a.AbstractLogin: Successfully logged in.
2022-10-10 12:14:46,398 INFO o.a.k.c.u.AppInfoParser: Kafka version: 2.3.0
2022-10-10 12:14:46,398 INFO o.a.k.c.u.AppInfoParser: Kafka commitId: fc1aaa116b661c8a
2022-10-10 12:14:46,398 INFO o.a.k.c.u.AppInfoParser: Kafka startTimeMs: 1665418486398
2022-10-10 12:14:46,398 INFO o.a.k.c.n.SaslChannelBuilder: Failed to create channel due to
org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator
Caused by: org.apache.kafka.common.KafkaException: Principal could not be determined from Subject, this may be a transient failure due to Kerberos re-login
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.firstPrincipal(SaslClientAuthenticator.java:562) ~[jmeter.backendlistener.kafka-1.0.0.jar:?]
at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.
JMeter logs
[jmeter.backendlistener.kafka-1.0.0.jar:?]
at org.apache.kafka.common.network.SaslChannelBuilder.lambda$buildChannel$1(SaslChannelBuilder.java:202) ~[jmeter.backendlistener.kafka-1.0.0.jar:?]
at org.apache.kafka.common.network.KafkaChannel.
Jar file contents
SaslClientAuthenticator code (taken from the jar file):
package org.apache.kafka.common.security.authenticator;
import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.GatheringByteChannel; import java.nio.channels.ScatteringByteChannel; import java.security.Principal; import java.security.PrivilegedActionException; import java.security.PrivilegedExceptionAction; import java.util.Arrays; import java.util.Iterator; import java.util.Map; import java.util.Set; import javax.security.auth.Subject; import javax.security.sasl.Sasl; import javax.security.sasl.SaslClient; import javax.security.sasl.SaslException; import org.apache.kafka.clients.NetworkClient; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.errors.IllegalSaslStateException; import org.apache.kafka.common.errors.SaslAuthenticationException; import org.apache.kafka.common.errors.UnsupportedSaslMechanismException; import org.apache.kafka.common.network.Authenticator; import org.apache.kafka.common.network.Mode; import org.apache.kafka.common.network.NetworkReceive; import org.apache.kafka.common.network.NetworkSend; import org.apache.kafka.common.network.Send; import org.apache.kafka.common.network.TransportLayer; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.SchemaException; import org.apache.kafka.common.requests.AbstractResponse; import org.apache.kafka.common.requests.ApiVersionsRequest; import org.apache.kafka.common.requests.ApiVersionsResponse; import org.apache.kafka.common.requests.RequestHeader; import org.apache.kafka.common.requests.SaslAuthenticateRequest; import org.apache.kafka.common.requests.SaslAuthenticateResponse; import org.apache.kafka.common.requests.SaslHandshakeRequest; import org.apache.kafka.common.requests.SaslHandshakeResponse; import org.apache.kafka.common.security.auth.KafkaPrincipal; import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
/**
 * Client-side SASL authenticator (implements {@link Authenticator}).
 *
 * <p>{@code SaslState} enumerates the steps stored in {@code saslState}. The constructor starts in
 * {@code SEND_APIVERSIONS_REQUEST} when the handshake request is enabled and in {@code INITIAL}
 * otherwise; {@code complete()} reports {@code COMPLETE}; send failures, error responses and
 * unparsable responses move the authenticator to {@code FAILED}.
 */
public class SaslClientAuthenticator implements Authenticator { public enum SaslState { SEND_APIVERSIONS_REQUEST, RECEIVE_APIVERSIONS_RESPONSE, SEND_HANDSHAKE_REQUEST, RECEIVE_HANDSHAKE_RESPONSE, INITIAL, INTERMEDIATE, CLIENT_COMPLETE, COMPLETE, FAILED; }
private static final Logger LOG = LoggerFactory.getLogger(SaslClientAuthenticator.class);
// Sentinel version value; the methods in this class compare saslAuthenticateVersion against the literal -1.
private static final short DISABLE_KAFKA_SASL_AUTHENTICATE_HEADER = -1;
// Subject under which the SaslClient is created and challenges are evaluated (Subject.doAs).
private final Subject subject;
private final String servicePrincipal;
private final String host;
// Passed as the destination when building NetworkSend / NetworkReceive objects and request sends.
private final String node;
// SASL mechanism name; the constructor special-cases "GSSAPI".
private final String mechanism;
private final TransportLayer transportLayer;
private final SaslClient saslClient;
private final Map<String, ?> configs;
// First principal name of the subject for GSSAPI, null for every other mechanism.
private final String clientPrincipalName;
private final AuthCallbackHandler callbackHandler;
// Partially read incoming packet; reset to null once a complete packet has been extracted.
private NetworkReceive netInBuffer;
// Most recent outgoing send; may still be incomplete and is flushed on the next authenticate() call.
private Send netOutBuffer;
private SaslState saslState;
// State requested while netOutBuffer was still incomplete; applied once the flush completes.
private SaslState pendingSaslState;
// Initialised to -1 by the constructor and post-incremented for every request header created.
private int correlationId;
// Header of the last request sent; used to parse the matching response, then cleared.
private RequestHeader currentRequestHeader;
// -1 until a SASL_AUTHENTICATE version is set via saslAuthenticateVersion(short).
private short saslAuthenticateVersion;
/**
 * Creates the authenticator, picks the initial state and builds the underlying {@link SaslClient}.
 *
 * <p>For the {@code "GSSAPI"} mechanism the client principal name is taken from the first
 * principal of {@code subject}; for any other mechanism it is {@code null}.
 *
 * @param configs client configuration, handed to the callback handler and to the SaslClient
 * @param node identifier used as destination for sends and receives
 * @param subject subject used for {@code Subject.doAs} calls
 * @param servicePrincipal service name passed to {@code Sasl.createSaslClient}
 * @param host server host name passed to {@code Sasl.createSaslClient}
 * @param mechanism SASL mechanism name
 * @param handshakeRequestEnable when true, start with {@code SEND_APIVERSIONS_REQUEST}; otherwise with {@code INITIAL}
 * @param transportLayer channel used for reading and writing
 * @throws SaslAuthenticationException wrapping any exception raised during setup
 */
public SaslClientAuthenticator(Map<String, ?> configs, String node, Subject subject, String servicePrincipal, String host, String mechanism, boolean handshakeRequestEnable, TransportLayer transportLayer) throws IOException {
    this.node = node;
    this.subject = subject;
    this.host = host;
    this.servicePrincipal = servicePrincipal;
    this.mechanism = mechanism;
    this.correlationId = -1;
    this.transportLayer = transportLayer;
    this.configs = configs;
    this.saslAuthenticateVersion = -1;
    try {
        SaslState initialState;
        if (handshakeRequestEnable) {
            initialState = SaslState.SEND_APIVERSIONS_REQUEST;
        } else {
            initialState = SaslState.INITIAL;
        }
        setSaslState(initialState);

        // Only GSSAPI needs a principal name taken from the Subject.
        this.clientPrincipalName = mechanism.equals("GSSAPI") ? firstPrincipal(subject) : null;

        this.callbackHandler = new SaslClientCallbackHandler();
        this.callbackHandler.configure(configs, Mode.CLIENT, subject, mechanism);
        this.saslClient = createSaslClient();
    } catch (Exception e) {
        throw new SaslAuthenticationException("Failed to configure SaslClientAuthenticator", e);
    }
}
/**
 * Creates the {@link SaslClient} for the configured mechanism while running as {@code subject}.
 *
 * <p>The debug log reads the enclosing instance's fields directly; the compiler-synthetic
 * accessor calls ({@code access$100(this.this$0)} and friends) that appeared here are not
 * valid Java source.
 *
 * @return the client returned by {@code Sasl.createSaslClient}
 * @throws SaslAuthenticationException if the privileged action fails; carries the original cause
 */
private SaslClient createSaslClient() {
    try {
        return Subject.doAs(this.subject, new PrivilegedExceptionAction<SaslClient>() {
            @Override
            public SaslClient run() throws SaslException {
                String[] mechs = { SaslClientAuthenticator.this.mechanism };
                // Same values, in the same order, as those passed to Sasl.createSaslClient below.
                SaslClientAuthenticator.LOG.debug("Creating SaslClient: client={};service={};serviceHostname={};mechs={}",
                        new Object[] {
                            SaslClientAuthenticator.this.clientPrincipalName,
                            SaslClientAuthenticator.this.servicePrincipal,
                            SaslClientAuthenticator.this.host,
                            Arrays.toString((Object[]) mechs)
                        });
                return Sasl.createSaslClient(mechs,
                        SaslClientAuthenticator.this.clientPrincipalName,
                        SaslClientAuthenticator.this.servicePrincipal,
                        SaslClientAuthenticator.this.host,
                        SaslClientAuthenticator.this.configs,
                        SaslClientAuthenticator.this.callbackHandler);
            }
        });
    } catch (PrivilegedActionException e) {
        throw new SaslAuthenticationException("Failed to create SaslClient with mechanism " + this.mechanism, e.getCause());
    }
}
/**
 * Advances the SASL exchange by one step, chosen by the current {@code saslState}.
 *
 * <p>If a previous send is still incomplete it is flushed first, and the method returns without
 * doing anything else when the flush does not finish. The case labels are the {@code SaslState}
 * constants; each case body sets the state that the next label handles.
 *
 * @throws IOException if reading from or writing to the transport layer fails
 * @throws IllegalStateException if called after the state has become {@code FAILED}
 */
public void authenticate() throws IOException {
    short saslHandshakeVersion = 0;
    if (this.netOutBuffer != null && !flushNetOutBufferAndUpdateInterestOps())
        return;

    switch (this.saslState) {
        case SEND_APIVERSIONS_REQUEST:
            ApiVersionsRequest apiVersionsRequest = new ApiVersionsRequest((short) 0);
            send(apiVersionsRequest.toSend(this.node, nextRequestHeader(ApiKeys.API_VERSIONS, apiVersionsRequest.version())));
            setSaslState(SaslState.RECEIVE_APIVERSIONS_RESPONSE);
            break;
        case RECEIVE_APIVERSIONS_RESPONSE:
            ApiVersionsResponse apiVersionsResponse = (ApiVersionsResponse) receiveKafkaResponse();
            if (apiVersionsResponse == null)
                break; // response not fully received yet
            saslHandshakeVersion = (apiVersionsResponse.apiVersion(ApiKeys.SASL_HANDSHAKE.id)).maxVersion;
            ApiVersionsResponse.ApiVersion authenticateVersion = apiVersionsResponse.apiVersion(ApiKeys.SASL_AUTHENTICATE.id);
            if (authenticateVersion != null)
                saslAuthenticateVersion((short) Math.min(authenticateVersion.maxVersion, ApiKeys.SASL_AUTHENTICATE.latestVersion()));
            setSaslState(SaslState.SEND_HANDSHAKE_REQUEST);
            // Intentional fall-through: send the handshake request right away.
        case SEND_HANDSHAKE_REQUEST:
            SaslHandshakeRequest handshakeRequest = createSaslHandshakeRequest(saslHandshakeVersion);
            send(handshakeRequest.toSend(this.node, nextRequestHeader(ApiKeys.SASL_HANDSHAKE, handshakeRequest.version())));
            setSaslState(SaslState.RECEIVE_HANDSHAKE_RESPONSE);
            break;
        case RECEIVE_HANDSHAKE_RESPONSE:
            SaslHandshakeResponse handshakeResponse = (SaslHandshakeResponse) receiveKafkaResponse();
            if (handshakeResponse == null)
                break; // response not fully received yet
            handleSaslHandshakeResponse(handshakeResponse);
            setSaslState(SaslState.INITIAL);
            // Intentional fall-through: start the token exchange right away.
        case INITIAL:
            sendSaslClientToken(new byte[0], true);
            setSaslState(SaslState.INTERMEDIATE);
            break;
        case INTERMEDIATE:
            byte[] serverToken = receiveToken();
            boolean noResponsesPending = (serverToken != null && !sendSaslClientToken(serverToken, false));
            if (this.saslClient.isComplete()) {
                // -1 means no SASL_AUTHENTICATE version was set, i.e. tokens are exchanged unwrapped.
                if (this.saslAuthenticateVersion == -1 || noResponsesPending) {
                    setSaslState(SaslState.COMPLETE);
                } else {
                    setSaslState(SaslState.CLIENT_COMPLETE);
                }
            }
            break;
        case CLIENT_COMPLETE:
            byte[] serverResponse = receiveToken();
            if (serverResponse != null)
                setSaslState(SaslState.COMPLETE);
            break;
        case COMPLETE:
            break;
        case FAILED:
            throw new IllegalStateException("SASL handshake has already failed");
    }
}
/**
 * Builds the header for the next request and remembers it in {@code currentRequestHeader}.
 *
 * <p>The header carries the current correlation id; the counter is incremented afterwards.
 *
 * @param apiKey API the request belongs to
 * @param version request version
 * @return the newly created header
 */
private RequestHeader nextRequestHeader(ApiKeys apiKey, short version) {
    final String clientId = (String) this.configs.get("client.id");
    final int headerCorrelationId = this.correlationId;
    this.correlationId = headerCorrelationId + 1;
    final RequestHeader header = new RequestHeader(apiKey, version, clientId, headerCorrelationId);
    this.currentRequestHeader = header;
    return header;
}
/**
 * Builds a SASL handshake request for the configured mechanism.
 *
 * @param version handshake request version to build
 * @return the handshake request
 */
protected SaslHandshakeRequest createSaslHandshakeRequest(short version) {
    final SaslHandshakeRequest.Builder requestBuilder = new SaslHandshakeRequest.Builder(this.mechanism);
    return requestBuilder.build(version);
}
/**
 * Records the SASL_AUTHENTICATE request version to use for subsequent token exchanges.
 *
 * @param version version to store
 */
protected void saslAuthenticateVersion(short version) {
    this.saslAuthenticateVersion = version;
}
/**
 * Moves to {@code saslState}, or parks it as pending while an outgoing send is unfinished.
 *
 * <p>When the state is applied and it is {@code COMPLETE}, write interest is removed from the
 * transport layer. The interest-op value is spelled as {@code SelectionKey.OP_WRITE} (value 4)
 * instead of a bare number.
 *
 * @param saslState state to apply or defer
 */
private void setSaslState(SaslState saslState) {
    if (this.netOutBuffer != null && !this.netOutBuffer.completed()) {
        // Defer: flushNetOutBufferAndUpdateInterestOps applies it once the send completes.
        this.pendingSaslState = saslState;
    } else {
        this.pendingSaslState = null;
        this.saslState = saslState;
        LOG.debug("Set SASL client state to {}", saslState);
        if (saslState == SaslState.COMPLETE)
            this.transportLayer.removeInterestOps(java.nio.channels.SelectionKey.OP_WRITE);
    }
}
/**
 * Produces the next client token from {@code serverToken} and sends it, if there is one.
 *
 * <p>When a SASL_AUTHENTICATE version is set (not -1) the token is wrapped in a
 * {@code SaslAuthenticateRequest}; otherwise the bare token bytes are sent.
 *
 * @param serverToken token received from the server (or the empty initial token)
 * @param isInitial whether this is the first token of the exchange
 * @return true if a token was sent; false if the client is already complete or no token was produced
 * @throws IOException if sending fails
 */
private boolean sendSaslClientToken(byte[] serverToken, boolean isInitial) throws IOException {
    if (this.saslClient.isComplete()) {
        return false;
    }
    final byte[] clientToken = createSaslToken(serverToken, isInitial);
    if (clientToken == null) {
        return false;
    }
    ByteBuffer outgoing = ByteBuffer.wrap(clientToken);
    if (this.saslAuthenticateVersion != -1) {
        SaslAuthenticateRequest.Builder requestBuilder = new SaslAuthenticateRequest.Builder(outgoing);
        SaslAuthenticateRequest request = requestBuilder.build(this.saslAuthenticateVersion);
        outgoing = request.serialize(nextRequestHeader(ApiKeys.SASL_AUTHENTICATE, this.saslAuthenticateVersion));
    }
    send((Send) new NetworkSend(this.node, outgoing));
    return true;
}
/**
 * Makes {@code send} the current outgoing buffer and attempts to flush it.
 *
 * @param send data to write
 * @throws IOException if the flush fails; the state is set to {@code FAILED} before rethrowing
 */
private void send(Send send) throws IOException {
    this.netOutBuffer = send;
    try {
        flushNetOutBufferAndUpdateInterestOps();
    } catch (IOException ioFailure) {
        setSaslState(SaslState.FAILED);
        throw ioFailure;
    }
}
/**
 * Flushes the outgoing buffer and adjusts write interest on the transport layer accordingly.
 *
 * <p>After a complete flush, write interest is removed and any pending state is applied; after a
 * partial flush, write interest is added. The interest-op value is spelled as
 * {@code SelectionKey.OP_WRITE} (value 4) instead of a bare number.
 *
 * @return true if the outgoing buffer was written completely
 * @throws IOException if writing fails
 */
private boolean flushNetOutBufferAndUpdateInterestOps() throws IOException {
    boolean flushedCompletely = flushNetOutBuffer();
    if (flushedCompletely) {
        this.transportLayer.removeInterestOps(java.nio.channels.SelectionKey.OP_WRITE);
        if (this.pendingSaslState != null)
            setSaslState(this.pendingSaslState);
    } else {
        this.transportLayer.addInterestOps(java.nio.channels.SelectionKey.OP_WRITE);
    }
    return flushedCompletely;
}
/**
 * Reads from the transport layer and returns the packet bytes once a full packet has arrived.
 *
 * @return the complete packet payload, or null while the packet is still incomplete
 * @throws IOException if reading fails
 */
private byte[] receiveResponseOrToken() throws IOException {
    if (this.netInBuffer == null) {
        this.netInBuffer = new NetworkReceive(this.node);
    }
    this.netInBuffer.readFrom((ScatteringByteChannel) this.transportLayer);
    if (!this.netInBuffer.complete()) {
        return null;
    }
    this.netInBuffer.payload().rewind();
    final byte[] packet = new byte[this.netInBuffer.payload().remaining()];
    this.netInBuffer.payload().get(packet, 0, packet.length);
    // Drop the buffer so the next call starts a fresh receive.
    this.netInBuffer = null;
    return packet;
}
/**
 * Returns a principal of type {@code "User"} named after {@code clientPrincipalName}.
 *
 * @return a new {@code KafkaPrincipal}
 */
public KafkaPrincipal principal() {
    final String principalType = "User";
    return new KafkaPrincipal(principalType, this.clientPrincipalName);
}
/**
 * Tells whether authentication has finished.
 *
 * @return true when the current state is {@code COMPLETE}
 */
public boolean complete() {
    return SaslState.COMPLETE == this.saslState;
}
/**
 * Disposes the SASL client and closes the callback handler, skipping whichever is null.
 *
 * @throws IOException declared by the signature
 */
public void close() throws IOException {
    if (this.saslClient != null) {
        this.saslClient.dispose();
    }
    if (this.callbackHandler != null) {
        this.callbackHandler.close();
    }
}
/**
 * Receives the next server token.
 *
 * <p>With no SASL_AUTHENTICATE version set (-1) the raw packet is returned. Otherwise the packet
 * is parsed as a {@code SaslAuthenticateResponse}; an error response sets the state to
 * {@code FAILED} and throws the exception for that error.
 *
 * @return the token bytes, or null if nothing complete has been received yet
 * @throws IOException if reading fails
 */
private byte[] receiveToken() throws IOException {
    if (this.saslAuthenticateVersion == -1) {
        return receiveResponseOrToken();
    }
    final SaslAuthenticateResponse response = (SaslAuthenticateResponse) receiveKafkaResponse();
    if (response == null) {
        return null;
    }
    final Errors error = response.error();
    if (error == Errors.NONE) {
        return Utils.readBytes(response.saslAuthBytes());
    }
    setSaslState(SaslState.FAILED);
    final String errMsg = response.errorMessage();
    if (errMsg == null) {
        throw error.exception();
    }
    throw error.exception(errMsg);
}
/**
 * Evaluates a server challenge and returns the client's response token.
 *
 * <p>For the initial token, when the SASL client has no initial response, the given token is
 * returned unchanged. Otherwise the challenge is evaluated while running as {@code subject}.
 *
 * @param saslToken challenge received from the server; must not be null
 * @param isInitial whether this is the first token of the exchange
 * @return the token to send to the server
 * @throws IllegalSaslStateException if {@code saslToken} is null
 * @throws SaslAuthenticationException if challenge evaluation fails; carries the original cause
 * @throws SaslException declared by the signature
 */
private byte[] createSaslToken(final byte[] saslToken, boolean isInitial) throws SaslException {
    if (saslToken == null)
        throw new IllegalSaslStateException("Error authenticating with the Kafka Broker: received a null saslToken.");
    try {
        if (isInitial && !this.saslClient.hasInitialResponse())
            return saslToken;
        // No raw-type cast: the anonymous class already has the exact generic type doAs expects.
        return Subject.doAs(this.subject, new PrivilegedExceptionAction<byte[]>() {
            @Override
            public byte[] run() throws SaslException {
                return SaslClientAuthenticator.this.saslClient.evaluateChallenge(saslToken);
            }
        });
    } catch (PrivilegedActionException e) {
        String error = "An error: (" + e + ") occurred when evaluating SASL token received from the Kafka Broker.";
        final String unknownServerErrorText = "(Mechanism level: Server not found in Kerberos database (7) - UNKNOWN_SERVER)";
        // Add a troubleshooting hint when the failure text matches the Kerberos UNKNOWN_SERVER message.
        if (e.toString().contains(unknownServerErrorText))
            error = error + " This may be caused by Java's being unable to resolve the Kafka Broker's hostname correctly. You may want to try to adding '-Dsun.net.spi.nameservice.provider.1=dns,sun' to your client's JVMFLAGS environment. Users must configure FQDN of kafka brokers when authenticating using SASL and socketChannel.socket().getInetAddress().getHostName() must match the hostname in principal/hostname@realm";
        error = error + " Kafka Client will go to AUTHENTICATION_FAILED state.";
        throw new SaslAuthenticationException(error, e.getCause());
    }
}
/**
 * Writes the outgoing buffer to the transport layer unless it is already complete.
 *
 * @return whether the outgoing buffer is complete after the attempt
 * @throws IOException if writing fails
 */
private boolean flushNetOutBuffer() throws IOException {
    final boolean alreadyWritten = this.netOutBuffer.completed();
    if (!alreadyWritten) {
        this.netOutBuffer.writeTo((GatheringByteChannel) this.transportLayer);
    }
    return this.netOutBuffer.completed();
}
/**
 * Receives a packet and parses it as the response to {@code currentRequestHeader}.
 *
 * <p>After a successful parse the remembered request header is cleared.
 *
 * @return the parsed response, or null if no complete packet has arrived yet
 * @throws IOException if reading fails
 * @throws IllegalSaslStateException if the packet cannot be parsed; the state is set to {@code FAILED} first
 */
private AbstractResponse receiveKafkaResponse() throws IOException {
    try {
        final byte[] rawResponse = receiveResponseOrToken();
        if (rawResponse == null) {
            return null;
        }
        final ByteBuffer wrapped = ByteBuffer.wrap(rawResponse);
        final AbstractResponse parsed = NetworkClient.parseResponse(wrapped, this.currentRequestHeader);
        this.currentRequestHeader = null;
        return parsed;
    } catch (SchemaException | IllegalArgumentException parseFailure) {
        LOG.debug("Invalid SASL mechanism response, server may be expecting only GSSAPI tokens");
        setSaslState(SaslState.FAILED);
        throw new IllegalSaslStateException("Invalid SASL mechanism response, server may be expecting a different protocol", parseFailure);
    }
}
/**
 * Checks the error code of a handshake response.
 *
 * <p>Any code other than {@code NONE} sets the state to {@code FAILED} and then throws.
 *
 * @param response handshake response from the server
 * @throws UnsupportedSaslMechanismException for {@code UNSUPPORTED_SASL_MECHANISM}
 * @throws IllegalSaslStateException for {@code ILLEGAL_SASL_STATE} and any other error code
 */
private void handleSaslHandshakeResponse(SaslHandshakeResponse response) {
    final Errors error = response.error();
    if (error != Errors.NONE) {
        setSaslState(SaslState.FAILED);
    }
    switch (error) {
        case NONE:
            return;
        case UNSUPPORTED_SASL_MECHANISM:
            throw new UnsupportedSaslMechanismException(
                    String.format("Client SASL mechanism '%s' not enabled in the server, enabled mechanisms are %s",
                            this.mechanism, response.enabledMechanisms()));
        case ILLEGAL_SASL_STATE:
            throw new IllegalSaslStateException(
                    String.format("Unexpected handshake request with client mechanism %s, enabled mechanisms are %s",
                            this.mechanism, response.enabledMechanisms()));
        default:
            throw new IllegalSaslStateException(
                    String.format("Unknown error code %s, client mechanism is %s, enabled mechanisms are %s",
                            response.error(), this.mechanism, response.enabledMechanisms()));
    }
}
/**
 * Returns the name of the first principal in {@code subject}.
 *
 * <p>The principal set is iterated while holding its monitor.
 *
 * @param subject subject whose principals are inspected
 * @return name of the first principal
 * @throws KafkaException if the subject has no principals
 */
static final String firstPrincipal(Subject subject) {
    final Set<Principal> principalSet = subject.getPrincipals();
    synchronized (principalSet) {
        final Iterator<Principal> it = principalSet.iterator();
        if (!it.hasNext()) {
            throw new KafkaException("Principal could not be determined from Subject, this may be a transient failure due to Kerberos re-login");
        }
        final Principal first = it.next();
        return first.getName();
    }
}
}