Kafka Streaming Platform
The Kafka datastack is used to configure and deploy an Apache Kafka event streaming platform into dataspaces and to provide Kafka messaging services to applications in the SCALE environment.
Apache Kafka is an open-source distributed event streaming platform optimized for ingesting and processing streaming data in real time. Kafka can connect to external systems via Kafka Connect and provides Kafka Streams, a Java stream processing library. Kafka uses a binary TCP-based protocol.
Strimzi Operator
Strimzi is an open-source project that provides container images and operators for running Apache Kafka on Kubernetes. Operators are a method of packaging, deploying, and managing a Kubernetes application. Strimzi Operators extend Kubernetes functionality, automating common and complex tasks related to a Kafka deployment. By implementing knowledge of Kafka operations in code, Kafka administration tasks are simplified and require less manual intervention.
Strimzi simplifies the process of:
- Deploying and running Kafka clusters
- Deploying and running Kafka components
- Configuring access to Kafka
- Securing access to Kafka
- Upgrading Kafka
- Managing brokers
- Creating and managing topics
- Creating and managing users
Strimzi provides the following Operators for managing a Kafka cluster running within a Kubernetes cluster:
- Cluster Operator: Deploys and manages Apache Kafka clusters, Kafka Connect, Kafka MirrorMaker, Kafka Bridge, Kafka Exporter, Cruise Control, and the Entity Operator
- Entity Operator: Comprises the Topic Operator and User Operator
- Topic Operator: Manages Kafka topics
- User Operator: Manages Kafka users
More information about Apache Kafka and the Strimzi Operators can be found in the Strimzi Overview guide.
Provisioning
When provisioning a Kafka datastack, the CI/CD pipeline generates a Helm chart to provision the Kafka cluster and its other components. Behind the scenes, this Helm chart includes built-in Kubernetes objects as well as special Kafka-related objects: YAML structures passed to the Strimzi Operator that define how to provision the new Kafka cluster.
The specifications of the resources for the Kafka cluster provisioned by SCALE depend on the size of the datastack set at the time of onboarding, according to the following table:
| Size | Total CPU | Total Memory | Pod count | PVC count | PVC Storage (Min) | PVC Storage (Max) |
|---|---|---|---|---|---|---|
| Large | 16 | 32Gi | 30 | 12 | 1Gi | 100Gi |
| X-Large | 24 | 48Gi | 50 | 20 | 1Gi | 200Gi |
| XX-Large | 32 | 64Gi | 60 | 30 | 1Gi | 300Gi |
| XXX-Large | 64 | 128Gi | 120 | 40 | 1Gi | 400Gi |
Getting Started in the Azure DevOps environment
Refer to the following link to learn about getting started in the Azure DevOps environment: Getting Started in Azure DevOps Environment
Repository Structure
All of the information that instructs the CI/CD pipeline how to provision the Kafka cluster, and to what specifications, is contained within a Git repository for the datastack.
.
├── azure-pipelines.yml
├── <appcode>-db-<kafka>
│ ├── Chart.yaml
│ ├── templates
│ │ ├── _annotations.tpl
│ │ ├── _helpers.tpl
│ │ ├── kafka-admin.yaml
│ │ ├── kafka-bridge
│ │ │ └── kafka-bridges.yaml
│ │ ├── kafka-connect
│ │ │ ├── file-stream-source-connector.yaml
│ │ │ └── kafka-connect.yaml
│ │ ├── kafka-metrics-configmap.yaml
│ │ ├── kafka-topics.yaml
│ │ ├── kafka-users.yaml
│ │ ├── kafka.yaml
│ │ ├── NOTES.txt
│ │ └── tests
│ │ └── kafka-debug-console.yaml
│ ├── values.dataspace.yaml
│ └── values.yaml
└── README.md
The Git repository for the Kafka datastack contains the aforementioned Helm chart directory. The files under this directory are automatically generated with some default configuration for the Kafka cluster and can be modified based on the Kafka cluster's requirements.
The Helm chart directory contains values.dataspace.yaml, which should be used to override the default values stored in values.yaml. If more advanced configuration is required, other files within the templates directory can be modified.
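Only the keys being overridden need to appear in values.dataspace.yaml; anything not set there falls back to the defaults in values.yaml. A minimal sketch, using the kafka settings described later in this document:

```yaml
# values.dataspace.yaml -- overrides are merged over the defaults in values.yaml
kafka:
  replicas: 3
  storage:
    size: 80Gi
```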
Also at the top of the repository is a file called azure-pipelines.yml, which contains a reference to the appropriate version of the CI/CD pipeline logic.
Continuous Integration and Continuous Delivery Pipelines
Please note that the document "SCALE Concepts and Procedures" contains more details about the specific flow of the application through the CI/CD pipelines.
The CI/CD pipeline that provisions the datastack is triggered manually. Details for navigating to the CI/CD pipeline in Azure DevOps, triggering a run, and examining the results can be found here: Continuous Integration and Continuous Delivery Pipelines
Getting Started with Kafka
Kafka version
By default, the pipeline is configured to use Apache Kafka version 3.0.0, which can be changed in azure-pipelines.yml. The currently supported Kafka versions are 2.8.0, 2.8.1, and 3.0.0.

```yaml
extends:
  template: dbaas/kafka-<dataspace name>.yml@spaces
  parameters:
    kafka:
      version: 3.0.0
```
Kafka Configuration
Strimzi provides the Kafka custom resource to deploy and manage a Kafka cluster in Kubernetes. The Kafka schema reference can be found here.
The Kafka datastack is provisioned with some default configurations for Kafka and its components.
Once the Kafka datastack is provisioned, the first thing to do is configure Kafka based on the application's requirements. This means configuring the Kafka brokers and setting the resources required for the Kafka brokers and ZooKeeper.
This can be configured in the kafka section of values.dataspace.yaml in the Helm chart directory.
Kafka configuration:

```yaml
kafka:
  replicas: 3
  config:
    auto.create.topics.enable: false
    default.replication.factor: 3
    min.insync.replicas: 2
    offsets.topic.replication.factor: 3
    transaction.state.log.replication.factor: 3
    transaction.state.log.min.isr: 2
    log.message.format.version: "3.0"
    inter.broker.protocol.version: "3.0"
  storage:
    size: 80Gi
  resources:
    limits:
      cpu: 2
      memory: 4Gi
    requests:
      cpu: 200m
      memory: 4Gi
  jvmOptions:
    -Xms: 2048m
    -Xmx: 2048m
```
ZooKeeper configuration:

```yaml
zookeeper:
  replicas: 3
  resources:
    limits:
      cpu: 1
      memory: 2Gi
    requests:
      cpu: 100m
      memory: 2Gi
  jvmOptions:
    -Xms: 1536m
    -Xmx: 1536m
  storage:
    size: 20Gi
```
Storage configuration
When the Kafka cluster is provisioned, each Kafka broker is configured to use a 25Gi PersistentVolumeClaim. This can be changed in values.dataspace.yaml.
```yaml
kafka:
  storage:
    size: 100Gi
```
The PersistentVolumeClaim size can be increased within the limits listed in the Provisioning section.
Please note that while the PersistentVolumeClaim size can be increased, it cannot be decreased without recreating the PersistentVolumeClaim, which results in data loss.
Kafka Topics
Strimzi uses the Topic Operator to manage Kafka topics. It provides a declarative way to manage topics using the KafkaTopic custom resource. The KafkaTopic schema reference can be found here.
When the Kafka cluster is provisioned, it is configured with two test topics, test-topic and test-topic-2. These can be removed, and new Kafka topics can be added.
Kafka topics can be configured in the kafkaTopics section of values.dataspace.yaml.
```yaml
kafkaTopics:
  - topicName: test-topic
    partitions: 6
    replicas: 3
    config:
      min.insync.replicas: 2
      compression.type: zstd
  - topicName: test-topic-2
```
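Because topics are managed declaratively through KafkaTopic resources, the topics the Topic Operator has created can be checked with kubectl after deployment; a quick sketch, assuming access to the dataspace namespace:

```shell
$ kubectl get kafkatopics -n <kafka-namespace>
$ kubectl describe kafkatopic test-topic -n <kafka-namespace>
```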
Kafka Users
Strimzi uses the User Operator to manage Kafka users and configure access control for them. It provides a declarative way to manage users using the KafkaUser custom resource. The KafkaUser schema reference can be found here.
The AclRule schema for KafkaUser can be found here.
When the Kafka cluster is provisioned, it is configured with three Kafka users by default. New users can always be added in the kafkaUsers section of values.dataspace.yaml.
- admin

  This user is a super user for Kafka and has full access to the Kafka cluster. This user is created using kafka-admin.yaml.

- producer

  This user is provided as a demo for configuring ACLs for producers. This user can be removed or repurposed from values.dataspace.yaml.

  ```yaml
  kafkaUsers:
    - username: producer
      acls:
        - resource:
            type: topic
            name: test-topic
            patternType: literal
          operation: Write
          type: allow
  ```

- consumer

  This user is provided as a demo for configuring ACLs for consumers. This user can be removed or repurposed from values.dataspace.yaml.

  ```yaml
  kafkaUsers:
    - username: consumer
      acls:
        - resource:
            type: topic
            name: test-topic
            patternType: literal
          operation: Read
          type: allow
        - resource:
            type: topic
            name: test-topic
            patternType: literal
          operation: Describe
        - resource:
            type: topic
            name: test-topic
            patternType: literal
          operation: DescribeConfigs
  ```
Authentication and Authorization
- Authentication

  Kafka uses Simple Authentication and Security Layer (SASL) for authentication and configures SASL using the Java Authentication and Authorization Service (JAAS). When provisioned, the Kafka cluster is configured to use SASL/SCRAM-SHA-512 with TLS.

- Authorization

  When provisioned, the Kafka cluster is configured to use AclAuthorizer as the authorizer.

This configuration can be found in the kafka section of values.dataspace.yaml.
```yaml
kafka:
  listeners:
    internal:
      port: 9092
      tls: true
      authentication:
        type: scram-sha-512
  authorization:
    enabled: true
    type: simple
    superUsers:
      - admin
```
JAAS Configuration for Authentication
When users are created with the KafkaUser custom resource, the user's credentials and SASL configuration are stored in a Kubernetes secret with the same name as the Kafka user. They can be retrieved from the Rancher web console or using the following command:

```shell
$ kubectl get secret --namespace <kafka-namespace> <username> -o jsonpath="{.data.password}" | base64 --decode
```

The following value is used to configure the sasl.jaas.config property in the JAAS configuration file:

```shell
$ kubectl get secret --namespace <kafka-namespace> <username> -o jsonpath="{.data.sasl\.jaas\.config}" | base64 --decode
```
Similarly, when using Kafka with TLS enabled, the Kafka cluster certificate can be found in the secret named <kafka-cluster-name>-cluster-ca-cert. This certificate is autogenerated and managed by Strimzi. It can be retrieved from the Rancher web console or using the following command:

```shell
$ kubectl get secret --namespace <kafka-namespace> <kafka-cluster-name>-cluster-ca-cert -o jsonpath="{.data.ca\.crt}" | base64 --decode > cluster-ca-cert.crt
```

The Kafka cluster certificate is also stored in PKCS12 format, along with its password, in the same secret. These values are used to configure ssl.truststore.location and ssl.truststore.password in the JAAS configuration file for SASL authentication.

```shell
$ kubectl get secret --namespace <kafka-namespace> <kafka-cluster-name>-cluster-ca-cert -o jsonpath="{.data.ca\.p12}" | base64 --decode > cluster-ca-cert.p12
$ kubectl get secret --namespace <kafka-namespace> <kafka-cluster-name>-cluster-ca-cert -o jsonpath="{.data.ca\.password}" | base64 --decode
```
JAAS configuration file examples:

- Using SASL/SCRAM-SHA-512 with TLS:

  ```properties
  security.protocol=SASL_SSL
  ssl.truststore.location=<truststore-location>
  ssl.truststore.password=<truststore-password>
  ssl.truststore.type=PKCS12
  sasl.mechanism=SCRAM-SHA-512
  sasl.jaas.config=<sasl-jaas-config>
  ```

- Using SASL/SCRAM-SHA-512 with PLAINTEXT:

  ```properties
  security.protocol=SASL_PLAINTEXT
  sasl.mechanism=SCRAM-SHA-512
  sasl.jaas.config=<sasl-jaas-config>
  ```
Testing Kafka using Kafka Debug Console
A Kafka debug console deployment is provided with the Helm chart in order to interact with the Kafka cluster. It is disabled by default and can be enabled from values.dataspace.yaml.

```yaml
tests:
  kafkaDebugConsole: true
```

This creates a deployment named kafka-debug-console with one pod in Kubernetes. The pod generates a JAAS configuration file on startup and stores it at /tmp/config/admin.properties.
To access the console from Rancher, go to Cluster Explorer and select the namespace where the Kafka cluster is deployed. Then go to Pods in the left menu and click Execute Shell in the kafka-debug-console pod's kebab menu.
The Kafka debug console can also be accessed using the following command:

```shell
$ kubectl exec -n <kafka-namespace> -it <kafka-debug-console-pod-name> -c kafka -- /bin/bash
```

Once inside the console, basic Kafka commands can be executed with the /tmp/config/admin.properties JAAS config file.
Basic Kafka commands
- List all Kafka topics

  ```shell
  $ bin/kafka-topics.sh --bootstrap-server <kafka-cluster-name>-kafka-bootstrap:9092 --command-config /tmp/config/admin.properties --list
  ```

- Describe a topic

  ```shell
  $ bin/kafka-topics.sh --bootstrap-server <kafka-cluster-name>-kafka-bootstrap:9092 --command-config /tmp/config/admin.properties --describe --topic test-topic
  ```

- Write events to a topic

  ```shell
  $ bin/kafka-console-producer.sh --bootstrap-server <kafka-cluster-name>-kafka-bootstrap:9092 --producer.config /tmp/config/admin.properties --topic test-topic
  ```

- Read events from the beginning

  ```shell
  $ bin/kafka-console-consumer.sh --bootstrap-server <kafka-cluster-name>-kafka-bootstrap:9092 --consumer.config /tmp/config/admin.properties --from-beginning --topic test-topic
  ```

- List ACLs

  ```shell
  $ bin/kafka-acls.sh --bootstrap-server <kafka-cluster-name>-kafka-bootstrap:9092 --command-config /tmp/config/admin.properties --list
  ```
Kafka Connect
Kafka Connect is an integration toolkit for streaming data between Kafka brokers and other systems. The other system is typically an external data source or target, such as a database.
Kafka Connect uses a plugin architecture. Plugins allow connections to other systems and provide additional configuration to manipulate data. Plugins include connectors and other components, such as data converters and transforms. More details about using Kafka Connect with Strimzi can be found here.
The Strimzi container images for Kafka Connect include two built-in file connectors for moving file-based data into and out of your Kafka cluster.
| File Connector | Description |
|---|---|
| FileStreamSourceConnector | Transfers data to your Kafka cluster from a file (the source). |
| FileStreamSinkConnector | Transfers data from your Kafka cluster to a file (the sink). |
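The file connectors can be configured declaratively through the KafkaConnector custom resource, as in the file-stream-source-connector.yaml template in the repository. The following is a sketch for the source connector; the metadata name, cluster label value, and file path are illustrative:

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: file-stream-source-connector  # illustrative name
  labels:
    # must match the name of the KafkaConnect cluster the connector runs in
    strimzi.io/cluster: <kafka-connect-cluster-name>
spec:
  class: org.apache.kafka.connect.file.FileStreamSourceConnector
  tasksMax: 1
  config:
    file: /tmp/test.txt  # file to stream from (the source)
    topic: test-topic    # destination Kafka topic
```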
Using Custom Kafka Connect Docker image
This procedure shows how to use a custom image with additional connector plug-ins.
1. Create a new Dockerfile using quay.io/strimzi/kafka:0.27.1-kafka-3.0.0 as the base image.

   ```dockerfile
   FROM quay.io/strimzi/kafka:0.27.1-kafka-3.0.0
   USER root:root
   COPY ./my-plugins/ /opt/kafka/plugins/
   USER 1001
   ```

   Example plug-in files:
$ tree ./my-plugins/
./my-plugins/
├── debezium-connector-mongodb
│ ├── bson-3.4.2.jar
│ ├── CHANGELOG.md
│ ├── CONTRIBUTE.md
│ ├── COPYRIGHT.txt
│ ├── debezium-connector-mongodb-0.7.1.jar
│ ├── debezium-core-0.7.1.jar
│ ├── LICENSE.txt
│ ├── mongodb-driver-3.4.2.jar
│ ├── mongodb-driver-core-3.4.2.jar
│ └── README.md
├── debezium-connector-mysql
│ ├── CHANGELOG.md
│ ├── CONTRIBUTE.md
│ ├── COPYRIGHT.txt
│ ├── debezium-connector-mysql-0.7.1.jar
│ ├── debezium-core-0.7.1.jar
│ ├── LICENSE.txt
│ ├── mysql-binlog-connector-java-0.13.0.jar
│ ├── mysql-connector-java-5.1.40.jar
│ ├── README.md
│ └── wkb-1.0.2.jar
└── debezium-connector-postgres
├── CHANGELOG.md
├── CONTRIBUTE.md
├── COPYRIGHT.txt
├── debezium-connector-postgres-0.7.1.jar
├── debezium-core-0.7.1.jar
├── LICENSE.txt
├── postgresql-42.0.0.jar
├── protobuf-java-2.6.1.jar
    └── README.md
2. Build the container image. Here <appcode> is the three-letter appcode and 1.0.0 is the image tag.

   ```shell
   $ docker build -t docker-<appcode>.repo-hio.cloudone.netapp.com/kafka/kafka-connect:1.0.0 .
   ```
3. Push the custom image to JFrog Artifactory.

   ```shell
   $ docker push docker-<appcode>.repo-hio.cloudone.netapp.com/kafka/kafka-connect:1.0.0
   ```
4. Enable Kafka Connect and provide the image tag in azure-pipelines.yml. Here imageTag is a reference to the image tag from step 2.

   ```yaml
   template: dbaas/kafka-devexp-stg-4.yml@spaces
   parameters:
     kafka:
       version: 3.0.0
       connect:
         enabled: true
         imageTag: 1.0.0
   ```
Kafka Bridge
The Kafka Bridge provides a RESTful interface that allows HTTP-based clients to interact with a Kafka cluster. It offers the advantages of a web API connection to Strimzi, without the need for client applications to interpret the Kafka protocol.
The Kafka Bridge supports HTTP requests to a Kafka cluster, with methods to:
- Send messages to a topic.
- Retrieve messages from topics.
- Retrieve a list of partitions for a topic.
- Create and delete consumers.
- Subscribe consumers to topics, so that they start receiving messages from those topics.
- Retrieve a list of topics that a consumer is subscribed to.
- Unsubscribe consumers from topics.
- Assign partitions to consumers.
- Commit a list of consumer offsets.
- Seek on a partition, so that a consumer starts receiving messages from the first or last offset position, or a given offset position.
To view the API documentation, including example requests and responses, see the Kafka Bridge API reference.
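As an illustration of the request format, sending a message to a topic over the bridge looks like the following; the topic name and the JSON embedded data format content type are assumptions here, so check the Kafka Bridge API reference for the exact endpoints and media types:

```
POST /topics/test-topic HTTP/1.1
Host: <bridge-host>:8080
Content-Type: application/vnd.kafka.json.v2+json

{"records":[{"key":"key-1","value":"hello"}]}
```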
Configuring Kafka Bridge
A Kafka Bridge interacts with the Kafka cluster as a Kafka client and provides an HTTP-based RESTful interface. It therefore needs to be configured with a Kafka user in order to authenticate itself with the Kafka cluster. The Kafka Bridge is disabled by default.
Multiple Kafka bridges with different Kafka users can be configured as well.
The Kafka Bridge can be configured in the kafkaBridge section of values.dataspace.yaml, which also provides ambassador configuration for exposing the Kafka bridge internally and externally.
```yaml
kafkaBridge:
  enabled: false
  bridges:
    - name: consumer
      enabled: true
      enableMetrics: false
      replicas: 1
      authentication:
        username: consumer
      http:
        port: 8080
      resources:
        limits:
          cpu: 1
          memory: 2Gi
        requests:
          cpu: 100m
          memory: 2Gi
      ambassador:
        internal:
          enabled: true
          prefix: /
          # host:
          # tls:
          # ca:
          # crt:
          # key:
        external:
          enabled: false
          prefix: /
          # host:
          # tls:
          # ca:
          # crt:
          # key:
```
Troubleshooting
If something fails to deploy, the information about why the deployment failed (or was not even initiated) can be found in the logs of the CI/CD pipeline and tracked down using the instructions at the link in the Continuous Integration and Continuous Delivery Pipelines section.
However, additional information may be required, either to better troubleshoot a failed deployment or to investigate the runtime behavior of a successfully deployed Kafka cluster. In those cases, much of the information can be found in the Rancher web console.
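Beyond the Rancher web console, much of the same runtime information can be gathered with kubectl, assuming access to the dataspace namespace:

```shell
# List pods for the Kafka cluster, operators, and related components
$ kubectl get pods -n <kafka-namespace>

# Show recent events and state transitions for a failing pod
$ kubectl describe pod <pod-name> -n <kafka-namespace>

# Read container logs, for example from a Kafka broker pod
$ kubectl logs <pod-name> -n <kafka-namespace>
```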