Kafka is a popular stream processing platform combining features from pub/sub and traditional queues.
Using Ondat persistent volumes with Apache Kafka means that if a pod fails, the cluster is only in a degraded state for as long as it takes Kubernetes to restart the pod. When the pod comes back up, the pod data is immediately available. Should Kubernetes schedule the kafka pod on a new node, Ondat allows for the data to be available to the pod, irrespective of whether or not the original Ondat master volume is located on the same node.
Kafka has features to allow it to handle replication, and as such careful consideration of whether to allow Ondat or Kafka to handle replication is required.
Before you start, ensure you have Ondat installed and ready on a Kubernetes cluster. See our guide on how to install Ondat on Kubernetes for more information.
- Apache Zookeeper is required by Kafka to function; we assume it to already
exist and be accessible within the Kubernetes cluster as
zookeeper, see how to run Zookeeper with Ondat here
- Ondat is assumed to have been installed; please check for the latest available version here
- Kafka pods require 1536 MB of memory for successful scheduling
To simplify the deployment of kafka, we’ve used this
Kafka helm chart (incubator)
0.13.8, app version
5.0.1) and rendered it into the
example deployment files you can find in our GitHub
Clone the use cases repo
You can find the latest files in the Ondat use cases repository
git clone https://github.com/storageos/use-cases.git storageos-usecases cd storageos-usecases
--- apiVersion: apps/v1beta1 kind: StatefulSet metadata: name: kafka labels: app: kafka ... spec: serviceName: kafka-headless podManagementPolicy: OrderedReady updateStrategy: type: OnDelete replicas: 3 # <--- number of kafa pods to run template: ... spec: serviceAccountName: kafka containers: ... - name: kafka-broker image: "confluentinc/cp-kafka:5.0.1" imagePullPolicy: "IfNotPresent" ... volumeMounts: - name: datadir mountPath: "/var/data" volumes: - name: jmx-config configMap: name: kafka-metrics terminationGracePeriodSeconds: 60 volumeClaimTemplates: - metadata: name: datadir spec: accessModes: ["ReadWriteOnce"] resources: requests: storage: 10Gi # <--- storage requested for each pod storageClassName: "storageos" # <--- the StorageClass to use
This excerpt is from the StatefulSet definition (
file contains the PersistentVolumeClaim template that will dynamically
provision the necessary storage, using the Ondat storage class.
Dynamic provisioning occurs as a volumeMount has been declared with the same name as a VolumeClaimTemplate.
Create the kubernetes objects
kubectl apply -f ./kafka/
Confirm kafka is up and running
$ kubectl get pods -l app=kafka NAME READY STATUS RESTARTS AGE kafka-0 2/2 Running 0 10m kafka-1 2/2 Running 0 9m26s kafka-2 2/2 Running 0 7m59s
Connect to kafka
Connect to the kafka test client pod and send some test data to kafka through its service endpoint
Connect to the pod
kubectl exec -it kafka-test-client /bin/bash
Create a topic
/usr/bin/kafka-topics --zookeeper zookeeper:2181 --create --topic test-rep-one --partitions 6 --replication-factor 1
Send some test data
/usr/bin/kafka-run-class org.apache.kafka.tools.ProducerPerformance --topic test-rep-one --num-records 5000 --record-size 100 --throughput -1 --print-metrics --producer-props acks=1 bootstrap.servers=kafka:9092 buffer.memory=67108864 batch.size=8196