Support Kerberos authentication in Kafka
Currently we only support mutual TLS as an authentication mechanism. Especially in existing installations, Kerberos may be a preferred way of doing this.
See stackabletech/decisions#28
Below is a draft custom resource for a Kafka cluster, using overrides. A few things to note:

- the custom image `docker.stackable.tech/apoc/stackable/kafka:3.7.1-stackable0.0.0-cyrus-sasl-gssapi` adds (`microdnf install`) `cyrus-sasl-gssapi` to the Kafka image
- the kcat readiness probe command below times out. `/stackable/kcat -V` can be used if we just want the pods to come up, but the full command can be successfully executed when shelling into the kcat container! (a hedged sketch of such a full command follows this list)
- the `args` need to be overridden to be able to change `listener.security.protocol.map`
- running a similar kcat command from a client (e.g. a Job) allows us to fetch metadata and to create topics, ~~but consuming from a topic results in the following error: `Server kafka/[email protected] not found in Kerberos database`, i.e. the advertised listener needs to be kerberos-ized as well~~
  - Update: this works if the advertised listener is changed to `kafka.stackable-products.svc.cluster.local:9093` as shown below
  - Update 2: service replaced with `scope: pod` for multiple brokers
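For reference, the "full" probe command mentioned above would look roughly like the client call shown later in this issue, just run from the kcat-prober container against the local broker. This is a sketch only; the CA path and the principal are assumptions borrowed from the client example further down, not the exact probe that was tested:

```sh
# Sketch of a full kcat readiness check (the variant that times out as a probe).
# CA location and principal are assumptions taken from the client example below;
# POD_NAME is assumed to be set, as it is in the kafka container args.
/stackable/kcat \
  -b "${POD_NAME}.kafka-broker-default.stackable-products.svc.cluster.local:9093" \
  -X security.protocol=SASL_SSL \
  -X ssl.ca.location=/stackable/tls_cert_server_mount/ca.crt \
  -X sasl.mechanism=GSSAPI \
  -X sasl.kerberos.service.name=kafka \
  -X sasl.kerberos.keytab=/stackable/kerberos/keytab \
  -X sasl.kerberos.principal="kafka/${POD_NAME}.kafka-broker-default.stackable-products.svc.cluster.local@CLUSTER.LOCAL" \
  -L
```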
```yaml
---
apiVersion: zookeeper.stackable.tech/v1alpha1
kind: ZookeeperZnode
metadata:
  name: kafka-znode
  namespace: stackable-products
spec:
  clusterRef:
    name: zookeeper
---
apiVersion: kafka.stackable.tech/v1alpha1
kind: KafkaCluster
metadata:
  name: kafka
  namespace: stackable-products
spec:
  image:
    productVersion: 3.7.1
    repo: docker.stackable.tech/apoc/stackable
    pullPolicy: IfNotPresent
  clusterConfig:
    tls:
      serverSecretClass: tls
    zookeeperConfigMapName: kafka-znode
  brokers:
    config:
      logging:
        enableVectorAgent: False
      resources:
        memory:
          limit: '1.3Gi'
    roleGroups:
      default:
        replicas: 1
    envOverrides:
      KRB5_CONFIG: "/etc/krb5.conf"
    configOverrides:
      server.properties:
        sasl.enabled.mechanisms: "GSSAPI"
        sasl.kerberos.service.name: "kafka"
        sasl.mechanism.inter.broker.protocol: "GSSAPI"
    podOverrides:
      spec:
        containers:
          - name: kafka
            volumeMounts:
              - name: kerberos
                mountPath: /stackable/kerberos
              - name: kerberos
                mountPath: /etc/krb5.conf
                subPath: krb5.conf
            args:
              - |
                prepare_signal_handlers()
                {
                  unset term_child_pid
                  unset term_kill_needed
                  trap 'handle_term_signal' TERM
                }
                handle_term_signal()
                {
                  if [ "${term_child_pid}" ]; then
                    kill -TERM "${term_child_pid}" 2>/dev/null
                  else
                    term_kill_needed="yes"
                  fi
                }
                wait_for_termination()
                {
                  set +e
                  term_child_pid=$1
                  if [[ -v term_kill_needed ]]; then
                    kill -TERM "${term_child_pid}" 2>/dev/null
                  fi
                  wait ${term_child_pid} 2>/dev/null
                  trap - TERM
                  wait ${term_child_pid} 2>/dev/null
                  set -e
                }
                rm -f /stackable/log/_vector/shutdown
                prepare_signal_handlers
                bin/kafka-server-start.sh \
                  /stackable/config/server.properties \
                  --override "zookeeper.connect=$ZOOKEEPER" \
                  --override "listeners=CLIENT://0.0.0.0:9093,INTERNAL://0.0.0.0:19093" \
                  --override "advertised.listeners=CLIENT://$POD_NAME.kafka-broker-default.stackable-products.svc.cluster.local:9093,INTERNAL://$POD_NAME.kafka-broker-default.stackable-products.svc.cluster.local:19093" \
                  --override "listener.security.protocol.map=CLIENT:SASL_SSL,INTERNAL:SSL" \
                  --override "listener.name.client.gssapi.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/stackable/kerberos/keytab\" principal=\"kafka/$POD_NAME.kafka-broker-default.stackable-products.svc.cluster.local@CLUSTER.LOCAL\";" &
                wait_for_termination $!
                mkdir -p /stackable/log/_vector && touch /stackable/log/_vector/shutdown
            command:
              - /bin/bash
              - -x
              - -euo
              - pipefail
              - -c
          - name: kcat-prober
            env:
              - name: KRB5_CONFIG
                value: /etc/krb5.conf
            volumeMounts:
              - name: kerberos
                mountPath: /stackable/kerberos
              - name: kerberos
                mountPath: /etc/krb5.conf
                subPath: krb5.conf
            readinessProbe:
              exec:
                command:
                  - /stackable/kcat
                  - -V
        volumes:
          - name: kerberos
            ephemeral:
              volumeClaimTemplate:
                metadata:
                  annotations:
                    secrets.stackable.tech/class: kerberos
                    secrets.stackable.tech/scope: pod
                    secrets.stackable.tech/kerberos.service.names: kafka
                spec:
                  storageClassName: secrets.stackable.tech
                  accessModes:
                    - ReadWriteOnce
                  resources:
                    requests:
                      storage: "1"
```
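Before digging into Kafka itself, it can help to confirm that the secret operator actually provisioned usable credentials into the pod-scoped volume. A minimal check, assuming the krb5 client tools are present in the image (they may need to be installed alongside `cyrus-sasl-gssapi`):

```sh
# List the principals contained in the provisioned keytab ...
klist -kt /stackable/kerberos/keytab
# ... and verify that a ticket can be obtained for the broker principal used in the args above.
# POD_NAME and the realm CLUSTER.LOCAL are taken from the manifest; adjust as needed.
kinit -kt /stackable/kerberos/keytab \
  "kafka/${POD_NAME}.kafka-broker-default.stackable-products.svc.cluster.local@CLUSTER.LOCAL"
klist
```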
Here is a sample client job:
```yaml
---
apiVersion: batch/v1
kind: Job
metadata:
  name: access-kafka
  namespace: stackable-products
spec:
  template:
    spec:
      containers:
        - name: access-kafka
          image: docker.stackable.tech/stackable/kafka:3.7.1-stackable0.0.0-cyrus-sasl-gssapi
          command:
            - /bin/bash
            - /tmp/script/script.sh
          env:
            - name: KRB5_CONFIG
              value: /etc/krb5.conf
          volumeMounts:
            - name: script
              mountPath: /tmp/script
            - mountPath: /stackable/tls_keystore_internal
              name: tls-keystore-internal
            - mountPath: /stackable/tls_keystore_server
              name: tls-keystore-server
            - mountPath: /stackable/tls_cert_server_mount
              name: tls-cert-server-mount
            - name: config-emptydir
              mountPath: /stackable/conf/hbase
            - name: kerberos
              mountPath: /stackable/kerberos
            - name: kerberos
              mountPath: /etc/krb5.conf
              subPath: krb5.conf
      volumes:
        - name: script
          configMap:
            name: access-kafka-script
        - name: tls-keystore-server
          ephemeral:
            volumeClaimTemplate:
              metadata:
                annotations:
                  secrets.stackable.tech/class: tls
                  secrets.stackable.tech/format: tls-pkcs12
                  secrets.stackable.tech/scope: pod,node
              spec:
                storageClassName: secrets.stackable.tech
                accessModes:
                  - ReadWriteOnce
                resources:
                  requests:
                    storage: "1"
        - name: tls-keystore-internal
          ephemeral:
            volumeClaimTemplate:
              metadata:
                annotations:
                  secrets.stackable.tech/class: tls
                  secrets.stackable.tech/format: tls-pkcs12
                  secrets.stackable.tech/scope: pod,node
              spec:
                storageClassName: secrets.stackable.tech
                accessModes:
                  - ReadWriteOnce
                resources:
                  requests:
                    storage: "1"
        - name: config-emptydir
          emptyDir: {}
        - name: kerberos
          ephemeral:
            volumeClaimTemplate:
              metadata:
                annotations:
                  secrets.stackable.tech/class: kerberos
                  secrets.stackable.tech/scope: service=access-kafka
                  secrets.stackable.tech/kerberos.service.names: admin
              spec:
                storageClassName: secrets.stackable.tech
                accessModes:
                  - ReadWriteOnce
                resources:
                  requests:
                    storage: "1"
        - name: tls-cert-server-mount
          ephemeral:
            volumeClaimTemplate:
              metadata:
                annotations:
                  secrets.stackable.tech/class: tls
                  secrets.stackable.tech/scope: pod,node,service=kafka
                creationTimestamp: null
              spec:
                accessModes:
                  - ReadWriteOnce
                resources:
                  requests:
                    storage: "1"
                storageClassName: secrets.stackable.tech
                volumeMode: Filesystem
      securityContext:
        fsGroup: 1000
        runAsGroup: 1000
        runAsUser: 1000
      restartPolicy: OnFailure
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: access-kafka-script
  namespace: stackable-products
data:
  script.sh: |
    set -ex
    sleep infinity
```
A sample kcat callout from this pod:
```sh
/stackable/kcat -b kafka-broker-default-0.kafka-broker-default.stackable-products.svc.cluster.local:9093 \
  -X security.protocol=SASL_SSL \
  -X ssl.ca.location=/stackable/tls_cert_server_mount/ca.crt \
  -X sasl.kerberos.keytab=/stackable/kerberos/keytab \
  -X sasl.kerberos.service.name=kafka \
  -X sasl.kerberos.principal=admin/[email protected] \
  -X sasl.mechanism=GSSAPI \
  -L # -t test-topic -C
```
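For completeness, a producer counterpart with the same connection settings might look like this (untested sketch; `ADMIN_PRINCIPAL` is a placeholder for the admin principal used above):

```sh
# Untested sketch: same connection settings as the call above, but producing one message.
# ADMIN_PRINCIPAL is a placeholder for the admin principal shown in the previous command.
echo "hello kerberos" | /stackable/kcat \
  -b kafka-broker-default-0.kafka-broker-default.stackable-products.svc.cluster.local:9093 \
  -X security.protocol=SASL_SSL \
  -X ssl.ca.location=/stackable/tls_cert_server_mount/ca.crt \
  -X sasl.kerberos.keytab=/stackable/kerberos/keytab \
  -X sasl.kerberos.service.name=kafka \
  -X sasl.kerberos.principal="$ADMIN_PRINCIPAL" \
  -X sasl.mechanism=GSSAPI \
  -t test-topic -P
```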
See also https://github.com/stackabletech/docker-images/pull/874
Not sure how to proceed with this: we have a working solution that allows clients to connect to one or more of the broker listeners, but not via the bootstrap listener. Here are some open points/questions:

- I've tried adding the bootstrap listener to the list of `advertised.listeners`, but this does not seem to be the recommended approach, and it is also not straightforward (each listener tag, e.g. `CLIENT`, `INTERNAL`, must be unique and can contain only one element, has to be entered in the protocol map, and requires a corresponding internal listener, which is problematic when binding two addresses to the same port).
- The advertised listener for each broker is presumably only needed once the bootstrap has provided one of the brokers with which the client can interact.
- Connectivity via the bootstrap listener does not work with non-Kerberos setups, either.
- Isn't `bootstrap.servers` mainly a client setting? Can't we define a quorum of the broker listeners, knowing that they will be consistently reachable? (See the sketch after this list.)
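On the last point: a client can already be given all broker listeners directly, which amounts to a hand-maintained `bootstrap.servers` list. A sketch of that idea (hostnames assume three replicas of the `default` role group; `ADMIN_PRINCIPAL` is a placeholder as above):

```sh
# kcat/librdkafka accept a comma-separated broker list, so listing every broker pod
# is effectively a static bootstrap.servers. Hostnames assume 3 replicas of "default".
/stackable/kcat \
  -b kafka-broker-default-0.kafka-broker-default.stackable-products.svc.cluster.local:9093,kafka-broker-default-1.kafka-broker-default.stackable-products.svc.cluster.local:9093,kafka-broker-default-2.kafka-broker-default.stackable-products.svc.cluster.local:9093 \
  -X security.protocol=SASL_SSL \
  -X ssl.ca.location=/stackable/tls_cert_server_mount/ca.crt \
  -X sasl.kerberos.keytab=/stackable/kerberos/keytab \
  -X sasl.kerberos.service.name=kafka \
  -X sasl.kerberos.principal="$ADMIN_PRINCIPAL" \
  -X sasl.mechanism=GSSAPI \
  -L
```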
We have a fix (a client-side verification sketch follows these steps):

- add a new internal listener: `BOOTSTRAP://0.0.0.0:9094`
- advertised as: `BOOTSTRAP://test-kafka-broker-default-0-listener-broker.kuttl-test-tight-shad.svc.cluster.local:9093`
- add a JAAS config for this listener:
  `--override "listener.name.bootstrap.gssapi.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/stackable/kerberos/keytab\" principal=\"kafka/$(cat /stackable/listener-bootstrap/default-address/address)@$KERBEROS_REALM\";"`
- add a new service in code for this:
```diff
+        ListenerPort {
+            name: "bootstrap".to_string(),
+            port: 9094,
+            protocol: Some("TCP".to_string()),
+        },
     ]
 }
@@ -1217,5 +1222,11 @@ fn container_ports(kafka_security: &KafkaTlsSecurity) -> Vec<ContainerPort> {
             protocol: Some("TCP".to_string()),
             ..ContainerPort::default()
         },
+        ContainerPort {
+            name: Some("bootstrap".to_string()),
+            container_port: 9094,
+            protocol: Some("TCP".to_string()),
+            ..ContainerPort::default()
+        },
     ]
 }
```
- extend `server.properties` for this listener so that the certificate is available:

```yaml
configOverrides:
  server.properties:
    listener.name.bootstrap.ssl.keystore.location: /stackable/tls-kafka-server/keystore.p12
    listener.name.bootstrap.ssl.keystore.password: ""
    listener.name.bootstrap.ssl.keystore.type: PKCS12
    listener.name.bootstrap.ssl.truststore.location: /stackable/tls-kafka-server/truststore.p12
    listener.name.bootstrap.ssl.truststore.password: ""
    listener.name.bootstrap.ssl.truststore.type: PKCS12
```
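To verify the fix from a client, the same kcat call as before could be pointed at the advertised bootstrap address instead of an individual broker pod. A hedged sketch (the address is the one from the example above; whether it is reachable from the client's namespace depends on the listener setup):

```sh
# Hedged sketch: bootstrap via the new kerberos-ized bootstrap listener address
# instead of a concrete broker pod. Other settings as in the earlier client call;
# ADMIN_PRINCIPAL is again a placeholder.
/stackable/kcat \
  -b test-kafka-broker-default-0-listener-broker.kuttl-test-tight-shad.svc.cluster.local:9093 \
  -X security.protocol=SASL_SSL \
  -X ssl.ca.location=/stackable/tls_cert_server_mount/ca.crt \
  -X sasl.kerberos.keytab=/stackable/kerberos/keytab \
  -X sasl.kerberos.service.name=kafka \
  -X sasl.kerberos.principal="$ADMIN_PRINCIPAL" \
  -X sasl.mechanism=GSSAPI \
  -L
```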
Docs are here.
Thank you! And could you also include a snippet for the release notes please?
Support for Kerberos authentication has been extended to Apache Kafka in this release. This feature is currently experimental as the details regarding client connectivity may be subject to change.