Deploying a Kafka Cluster on K8S - Deployment Notes
I. Background
Kafka and zookeeper are two typical examples of stateful, clustered services. First, both need persistent storage to keep their state. Second, every kafka and zookeeper instance needs its own instance ID (broker.id for Kafka, the myid file for zookeeper) to identify itself as a member of the cluster; these IDs are used in internal communication between cluster nodes.
Deploying this kind of service means solving two big problems: state persistence and cluster management (managing multiple service instances). The StatefulSet provided by kubernetes makes deploying and managing stateful clustered services much easier. In general, a stateful cluster deployment relies on three mechanisms:
- Init Containers handle cluster initialization work.
- A Headless Service maintains stable membership identities within the cluster.
- Persistent Volumes and Persistent Volume Claims provide network storage to persist data.
Therefore, stateful services such as kafka and zookeeper cannot be deployed in a K8S cluster with a Deployment; they must be deployed with a StatefulSet. "Stateful" simply means the service needs persisted data: logs, database files, service state, and so on.
StatefulSet use cases:
- Stable persistent storage: after a Pod is rescheduled it still accesses the same persisted data, implemented via PVCs
- Stable network identity: after a Pod is rescheduled its PodName and HostName stay the same, implemented via a Headless Service (a Service without a Cluster IP)
- Ordered deployment and ordered scaling: Pods are ordered and created in the defined order (from 0 to N-1; all preceding Pods must be Running and Ready before the next one starts), implemented by the StatefulSet controller's ordered Pod management
- Ordered scale-down and ordered deletion (from N-1 down to 0)
A StatefulSet consists of:
- a Headless Service that defines the network identity (DNS domain)
- volumeClaimTemplates that create the PersistentVolumes
- the StatefulSet itself, which defines the application
The DNS name of each Pod in a StatefulSet has the form:
statefulSetName-{0..N-1}.serviceName.namespace.svc.cluster.local, where:
- statefulSetName is the name of the StatefulSet
- 0..N-1 is the ordinal of the Pod, from 0 up to N-1
- serviceName is the name of the Headless Service
- namespace is the namespace the service lives in; the Headless Service and the StatefulSet must be in the same namespace
- svc.cluster.local is the K8S Cluster Domain (the cluster root domain)
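As a small sketch, the stable per-Pod DNS names can be derived directly from this pattern; the values below match the zk StatefulSet deployed later in this note (StatefulSet "zok", Headless Service "zk-hs", namespace "wiseco", 3 replicas):

```shell
# Derive the stable per-Pod DNS names for a StatefulSet
STS=zok        # StatefulSet name
SVC=zk-hs      # Headless Service name
NS=wiseco      # namespace
REPLICAS=3
for i in $(seq 0 $((REPLICAS - 1))); do
  echo "${STS}-${i}.${SVC}.${NS}.svc.cluster.local"
done
```

Inside the cluster these names resolve to the individual Pod IPs through the Headless Service.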
II. Deployment Walkthrough (NAS Storage)
This section builds a three-node kafka container cluster on K8S. Since the kafka cluster needs storage, Persistent Volumes (PVs) have to be prepared first.
1. Configure dynamic persistent storage for the StatefulSets
1) Use Alibaba Cloud NAS storage
NAS storage address created on the Alibaba Cloud console: 1*********-beijing.nas.aliyuncs.com
First create the zk and kafka subdirectories in the NAS file system.
The NAS subdirectory to be mounted must actually exist, otherwise the mount fails with an error that the subdirectory cannot be accessed.
How to create them: mount the NAS storage on any node, create the subdirectories, then unmount.

[root@k8s-node04 ~]# ls /mnt/
[root@k8s-node04 ~]# mkdir /mnt/test
[root@k8s-node04 ~]# mount -t nfs -o vers=3,nolock,proto=tcp,rsize=1048576,wsize=1048576,hard,timeo=600,retrans=2,noresvport 1*********-beijing.nas.aliyuncs.com:/ /mnt/test/
[root@k8s-node04 ~]# cd /mnt/test/
[root@k8s-node04 test]# mkdir kafka
[root@k8s-node04 test]# mkdir zk
[root@k8s-node04 test]# ll
total 2
drwxr-xr-x 2 root root 4096 Jan 24 22:53 kafka
drwxr-xr-x 2 root root 4096 Jan 24 22:53 zk
[root@k8s-node04 ~]# df -h|grep mnt
1*********-beijing.nas.aliyuncs.com:/  10P  0  10P  0%  /mnt/test
[root@k8s-node04 ~]# umount /mnt/test
[root@k8s-node04 ~]# rm -rf /mnt/test
[root@k8s-node04 ~]# ls /mnt/
2) Create the RBAC for NFS
[root@k8s-master01 ~]# mkdir -p /opt/k8s/k8s_project/kafka_zk
[root@k8s-master01 ~]# cd /opt/k8s/k8s_project/kafka_zk
[root@k8s-master01 kafka_zk]# vim nfs-rbac.yaml
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: nfs-provisioner
  namespace: wiseco
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: nfs-provisioner-runner
  namespace: wiseco
rules:
  - apiGroups: [""]
    resources: ["persistentvolumes"]
    verbs: ["get", "list", "watch", "create", "delete"]
  - apiGroups: [""]
    resources: ["persistentvolumeclaims"]
    verbs: ["get", "list", "watch", "update"]
  - apiGroups: ["storage.k8s.io"]
    resources: ["storageclasses"]
    verbs: ["get", "list", "watch"]
  - apiGroups: [""]
    resources: ["events"]
    verbs: ["watch", "create", "update", "patch"]
  - apiGroups: [""]
    resources: ["services", "endpoints"]
    verbs: ["get", "create", "list", "watch", "update"]
  - apiGroups: ["extensions"]
    resources: ["podsecuritypolicies"]
    resourceNames: ["nfs-provisioner"]
    verbs: ["use"]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: run-nfs-provisioner
subjects:
  - kind: ServiceAccount
    name: nfs-provisioner
    namespace: wiseco
roleRef:
  kind: ClusterRole
  name: nfs-provisioner-runner
  apiGroup: rbac.authorization.k8s.io
Apply and verify:
[root@k8s-master01 kafka_zk]# kubectl apply -f nfs-rbac.yaml
serviceaccount/nfs-provisioner created
clusterrole.rbac.authorization.k8s.io/nfs-provisioner-runner created
clusterrolebinding.rbac.authorization.k8s.io/run-nfs-provisioner created
[root@k8s-master01 kafka_zk]# kubectl get sa -n wiseco|grep nfs
nfs-provisioner          1         14s
[root@k8s-master01 kafka_zk]# kubectl get clusterrole -n wiseco|grep nfs
nfs-provisioner-runner   2021-01-04T13:03:55Z
[root@k8s-master01 kafka_zk]# kubectl get clusterrolebinding -n wiseco|grep nfs
run-nfs-provisioner      ClusterRole/nfs-provisioner-runner   36s
3) Create the StorageClass for the zk cluster
[root@k8s-master01 kafka_zk]# mkdir zk
[root@k8s-master01 kafka_zk]# cd zk
[root@k8s-master01 zk]# pwd
/opt/k8s/k8s_project/kafka_zk/zk
[root@k8s-master01 zk]# cat zk-nfs-class.yaml
apiVersion: storage.k8s.io/v1beta1
kind: StorageClass
metadata:
  name: zk-nfs-storage
  namespace: wiseco
mountOptions:
  - vers=4.0
  - nolock,tcp,noresvport
provisioner: zk/nfs
reclaimPolicy: Retain
Apply and verify:
[root@k8s-master01 zk]# kubectl apply -f zk-nfs-class.yaml
storageclass.storage.k8s.io/zk-nfs-storage created
[root@k8s-master01 zk]# kubectl get sc -n wiseco
NAME             PROVISIONER   RECLAIMPOLICY   VOLUMEBINDINGMODE   ALLOWVOLUMEEXPANSION   AGE
zk-nfs-storage   zk/nfs        Retain          Immediate           false                  28s
4) Create the StorageClass for the kafka cluster
[root@k8s-master01 zk]# mkdir ../kafka
[root@k8s-master01 zk]# cd ../kafka
[root@k8s-master01 kafka]# pwd
/opt/k8s/k8s_project/kafka_zk/kafka
[root@k8s-master01 kafka]# cat kafka-nfs-class.yaml
apiVersion: storage.k8s.io/v1beta1
kind: StorageClass
metadata:
  name: kafka-nfs-storage
  namespace: wiseco
mountOptions:
  - vers=4.0
  - nolock,tcp,noresvport
provisioner: kafka/nfs
reclaimPolicy: Retain
Apply and verify:
[root@k8s-master01 kafka]# kubectl apply -f kafka-nfs-class.yaml
storageclass.storage.k8s.io/kafka-nfs-storage created
[root@k8s-master01 kafka]# kubectl get sc -n wiseco
NAME                PROVISIONER   RECLAIMPOLICY   VOLUMEBINDINGMODE   ALLOWVOLUMEEXPANSION   AGE
kafka-nfs-storage   kafka/nfs     Retain          Immediate           false                  4s
zk-nfs-storage      zk/nfs        Retain          Immediate           false                  12m
5) Create the nfs-client-provisioner for the zk cluster
[root@k8s-master01 zk]# pwd
/opt/k8s/k8s_project/kafka_zk/zk
[root@k8s-master01 zk]# cat zk-nfs.yml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: zk-nfs-client-provisioner
  namespace: wiseco
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zk-nfs-client-provisioner
  strategy:
    type: Recreate
  template:
    metadata:
      labels:
        app: zk-nfs-client-provisioner
    spec:
      serviceAccount: nfs-provisioner
      containers:
      - name: zk-nfs-client-provisioner
        image: registry.cn-hangzhou.aliyuncs.com/open-ali/nfs-client-provisioner
        imagePullPolicy: IfNotPresent
        volumeMounts:
        - name: nfs-client-root
          mountPath: /persistentvolumes
        env:
        - name: PROVISIONER_NAME
          value: zk/nfs
        - name: NFS_SERVER
          value: 1*********-beijing.nas.aliyuncs.com
        - name: NFS_PATH
          value: /zk
      volumes:
      - name: nfs-client-root
        nfs:
          server: 1*********-beijing.nas.aliyuncs.com
          path: /zk
Apply and verify:
[root@k8s-master01 zk]# kubectl apply -f zk-nfs.yml
deployment.apps/zk-nfs-client-provisioner created
[root@k8s-master01 zk]# kubectl get pods -n wiseco|grep nfs
zk-nfs-client-provisioner-bd8d65798-qrz87   1/1   Running   0   39s
6) Create the nfs-client-provisioner for the kafka cluster
[root@k8s-master01 kafka]# pwd
/opt/k8s/k8s_project/kafka_zk/kafka
[root@k8s-master01 kafka]# cat kafka-nfs.yml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-nfs-client-provisioner
  namespace: wiseco
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-nfs-client-provisioner
  strategy:
    type: Recreate
  template:
    metadata:
      labels:
        app: kafka-nfs-client-provisioner
    spec:
      serviceAccount: nfs-provisioner
      containers:
      - name: kafka-nfs-client-provisioner
        image: registry.cn-hangzhou.aliyuncs.com/open-ali/nfs-client-provisioner
        imagePullPolicy: IfNotPresent
        volumeMounts:
        - name: nfs-client-root
          mountPath: /persistentvolumes
        env:
        - name: PROVISIONER_NAME
          value: kafka/nfs
        - name: NFS_SERVER
          value: 1*********-beijing.nas.aliyuncs.com
        - name: NFS_PATH
          value: /kafka
      volumes:
      - name: nfs-client-root
        nfs:
          server: 1*********-beijing.nas.aliyuncs.com
          path: /kafka
Apply and verify:
[root@k8s-master01 kafka]# kubectl apply -f kafka-nfs.yml
deployment.apps/kafka-nfs-client-provisioner created
[root@k8s-master01 kafka]# kubectl get pods -n wiseco|grep nfs
kafka-nfs-client-provisioner-6747dc587c-8qjn7   1/1   Running   0   16s
zk-nfs-client-provisioner-bd8d65798-cdf6h       1/1   Running   0   47s
2. Create the ZK cluster
[root@k8s-master01 zk]# pwd
/opt/k8s/k8s_project/kafka_zk/zk
[root@k8s-master01 zk]# ls
zk-nfs-class.yaml  zk-nfs.yml
[root@k8s-master01 zk]# vim zk.yaml
apiVersion: v1
kind: Service
metadata:
  namespace: wiseco
  name: zk-hs
  labels:
    app: zk
spec:
  ports:
  - port: 2888
    name: server
  - port: 3888
    name: leader-election
  clusterIP: None
  selector:
    app: zk
---
apiVersion: v1
kind: Service
metadata:
  namespace: wiseco
  name: zk-cs
  labels:
    app: zk
spec:
  type: NodePort
  ports:
  - port: 2181
    targetPort: 2181
    name: client
    nodePort: 32181
  selector:
    app: zk
---
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:
  namespace: wiseco
  name: zk-pdb
spec:
  selector:
    matchLabels:
      app: zk
  maxUnavailable: 1
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  namespace: wiseco
  name: zok
spec:
  serviceName: zk-hs
  replicas: 3
  selector:
    matchLabels:
      app: zk
  template:
    metadata:
      labels:
        app: zk
    spec:
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
          - labelSelector:
              matchExpressions:
              - key: "app"
                operator: In
                values:
                - zk
            topologyKey: "kubernetes.io/hostname"
      containers:
      - name: kubernetes-zookeeper
        imagePullPolicy: Always
        image: leolee32/kubernetes-library:kubernetes-zookeeper1.0-3.4.10
        resources:
          requests:
            memory: "1024Mi"
            cpu: "500m"
        ports:
        - containerPort: 2181
          name: client
        - containerPort: 2888
          name: server
        - containerPort: 3888
          name: leader-election
        command:
        - sh
        - -c
        - "start-zookeeper \
          --servers=3 \
          --data_dir=/var/lib/zookeeper/data \
          --data_log_dir=/var/lib/zookeeper/data/log \
          --conf_dir=/opt/zookeeper/conf \
          --client_port=2181 \
          --election_port=3888 \
          --server_port=2888 \
          --tick_time=2000 \
          --init_limit=10 \
          --sync_limit=5 \
          --heap=512M \
          --max_client_cnxns=60 \
          --snap_retain_count=3 \
          --purge_interval=12 \
          --max_session_timeout=40000 \
          --min_session_timeout=4000 \
          --log_level=INFO"
        readinessProbe:
          exec:
            command:
            - sh
            - -c
            - "zookeeper-ready 2181"
          initialDelaySeconds: 10
          timeoutSeconds: 5
        livenessProbe:
          exec:
            command:
            - sh
            - -c
            - "zookeeper-ready 2181"
          initialDelaySeconds: 10
          timeoutSeconds: 5
        volumeMounts:
        - name: datadir
          mountPath: /var/lib/zookeeper
  volumeClaimTemplates:
  - metadata:
      name: datadir
      annotations:
        volume.beta.kubernetes.io/storage-class: "zk-nfs-storage"
    spec:
      accessModes:
      - ReadWriteMany
      resources:
        requests:
          storage: 10Gi
Apply and verify:
[root@k8s-master01 zk]# kubectl apply -f zk.yaml
service/zk-hs created
service/zk-cs created
poddisruptionbudget.policy/zk-pdb created
statefulset.apps/zok created
Inspect the created zk cluster:
# zk cluster Pods
[root@k8s-master01 zk]# kubectl get pods -n wiseco|grep zok
zok-0   1/1   Running   0   2m1s
zok-1   1/1   Running   0   88s
zok-2   1/1   Running   0   51s
# zk cluster Services
[root@k8s-master01 zk]# kubectl get svc -n wiseco|grep zk
zk-cs   NodePort    10.254.223.248   <none>   2181:32181/TCP      2m24s
zk-hs   ClusterIP   None             <none>   2888/TCP,3888/TCP   2m25s
# zk cluster PVs and PVCs
# Note: PVs are cluster-scoped, so "-n <namespace>" is not required when listing them;
# PVCs are namespaced, so "-n <namespace>" is required.
[root@k8s-master01 zk]# kubectl get pv|grep zok
pvc-2bace45f-4567-4751-a7ee-233c7034bb09   10Gi   RWX   Delete   Bound   wiseco/datadir-zok-0   zk-nfs-storage   2m59s
pvc-e5a6e15a-5765-4889-b4e0-0a3b03856c1e   10Gi   RWX   Delete   Bound   wiseco/datadir-zok-1   zk-nfs-storage   2m24s
pvc-e6f00ae5-aaf3-496f-8012-c4bf714f9219   10Gi   RWX   Delete   Bound   wiseco/datadir-zok-2   zk-nfs-storage   107s
[root@k8s-master01 zk]# kubectl get pvc -n wiseco|grep zok
datadir-zok-0   Bound   pvc-2bace45f-4567-4751-a7ee-233c7034bb09   10Gi   RWX   zk-nfs-storage   3m2s
datadir-zok-1   Bound   pvc-e5a6e15a-5765-4889-b4e0-0a3b03856c1e   10Gi   RWX   zk-nfs-storage   2m29s
datadir-zok-2   Bound   pvc-e6f00ae5-aaf3-496f-8012-c4bf714f9219   10Gi   RWX   zk-nfs-storage   112s
On the NFS server, inspect the zk cluster's persisted shared directories.
Note that the persisted directory names on NFS are composed as: <namespace>-<PVC name>-<PV name>.
As long as the PVC and PV are not deleted, these directory names never change.
[root@k8s-harbor01 ~]# cd /data/storage/k8s/zk/
[root@k8s-harbor01 zk]# ll
total 0
drwxrwxrwx 3 root root 18 Jan  4 22:22 wiseco-datadir-zok-0-pvc-2bace45f-4567-4751-a7ee-233c7034bb09
drwxrwxrwx 3 root root 18 Jan  4 22:23 wiseco-datadir-zok-1-pvc-e5a6e15a-5765-4889-b4e0-0a3b03856c1e
drwxrwxrwx 3 root root 18 Jan  4 22:23 wiseco-datadir-zok-2-pvc-e6f00ae5-aaf3-496f-8012-c4bf714f9219
[root@k8s-harbor01 zk]# ls *
wiseco-datadir-zok-0-pvc-2bace45f-4567-4751-a7ee-233c7034bb09:
data
wiseco-datadir-zok-1-pvc-e5a6e15a-5765-4889-b4e0-0a3b03856c1e:
data
wiseco-datadir-zok-2-pvc-e6f00ae5-aaf3-496f-8012-c4bf714f9219:
data
[root@k8s-harbor01 zk]# ls */*
wiseco-datadir-zok-0-pvc-2bace45f-4567-4751-a7ee-233c7034bb09/data:
log  myid  version-2
wiseco-datadir-zok-1-pvc-e5a6e15a-5765-4889-b4e0-0a3b03856c1e/data:
log  myid  version-2
wiseco-datadir-zok-2-pvc-e6f00ae5-aaf3-496f-8012-c4bf714f9219/data:
log  myid  version-2
[root@k8s-harbor01 zk]# cat wiseco-datadir-zok-0-pvc-2bace45f-4567-4751-a7ee-233c7034bb09/data/myid
1
[root@k8s-harbor01 zk]# cat wiseco-datadir-zok-1-pvc-e5a6e15a-5765-4889-b4e0-0a3b03856c1e/data/myid
2
[root@k8s-harbor01 zk]# cat wiseco-datadir-zok-2-pvc-e6f00ae5-aaf3-496f-8012-c4bf714f9219/data/myid
3
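The naming rule can be checked mechanically; a minimal sketch using the first zk PVC/PV from this deployment:

```shell
# NFS directory name = <namespace>-<PVC name>-<PV name>
NS=wiseco
PVC=datadir-zok-0
PV=pvc-2bace45f-4567-4751-a7ee-233c7034bb09
DIR="${NS}-${PVC}-${PV}"
echo "$DIR"
```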
Check the zk cluster leader/follower roles:
[root@k8s-master01 kafka]# kubectl get pods -n wiseco|grep zok
zok-0   1/1   Running   0   17h
zok-1   1/1   Running   0   17h
zok-2   1/1   Running   0   17h
[root@k8s-master01 kafka]# kubectl exec -ti zok-0 -n wiseco -- zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/bin/../etc/zookeeper/zoo.cfg
Mode: follower
[root@k8s-master01 kafka]# kubectl exec -ti zok-1 -n wiseco -- zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/bin/../etc/zookeeper/zoo.cfg
Mode: leader
[root@k8s-master01 kafka]# kubectl exec -ti zok-2 -n wiseco -- zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/bin/../etc/zookeeper/zoo.cfg
Mode: follower
Verify connectivity to the zk cluster:
[root@k8s-master01 kafka]# kubectl get svc -n wiseco|grep zk
zk-cs   NodePort    10.254.223.248   <none>   2181:32181/TCP      17h
zk-hs   ClusterIP   None             <none>   2888/TCP,3888/TCP   17h
[root@k8s-master01 ~]# kubectl exec -ti zok-1 -n wiseco -- zkCli.sh -server zk-cs:2181
Connecting to zk-cs:2181
2021-01-05 08:16:10,169 [myid:] - INFO  [main:Environment@100] - Client environment:zookeeper.version=3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT
2021-01-05 08:16:10,173 [myid:] - INFO  [main:Environment@100] - Client environment:host.name=zok-1.zk-hs.wiseco.svc.cluster.local
(remaining client environment log lines omitted)
2021-01-05 08:16:10,177 [myid:] - INFO  [main:ZooKeeper@438] - Initiating client connection, connectString=zk-cs:2181 sessionTimeout=30000 watcher=org.apache.zookeeper.ZooKeeperMain$MyWatcher@22d8cfe0
Welcome to ZooKeeper!
JLine support is enabled
2021-01-05 08:16:10,261 [myid:] - INFO  [main-SendThread(zk-cs.wiseco.svc.cluster.local:2181):ClientCnxn$SendThread@876] - Socket connection established to zk-cs.wiseco.svc.cluster.local/10.254.223.248:2181, initiating session
2021-01-05 08:16:10,316 [myid:] - INFO  [main-SendThread(zk-cs.wiseco.svc.cluster.local:2181):ClientCnxn$SendThread@1299] - Session establishment complete on server zk-cs.wiseco.svc.cluster.local/10.254.223.248:2181, sessionid = 0x376cdc83254000a, negotiated timeout = 30000

WATCHER::

WatchedEvent state:SyncConnected type:None path:null      # press Enter here
[zk: zk-cs:2181(CONNECTED) 0] ls /
[zookeeper]
[zk: zk-cs:2181(CONNECTED) 1]
3. Create the KAFKA cluster
Here the Kafka cluster must be reachable from outside the K8S cluster. That requires either three public addresses, or one public address plus three ports, each proxying to one of the three Kafka Pod instances. For external clients to connect, the advertised.listeners setting must be configured:
- advertised.listeners defines the endpoints exposed to external clients; inter-broker communication inside kafka uses listeners;
- the advertised.listeners endpoints are registered in zookeeper;
- when a client connects to "<public IP>:<port>", the kafka broker looks up the listeners registered in zookeeper, finds the matching public listener (the default listener name is PLAINTEXT; custom listener names are also possible), and maps it back to the actual communication IP and port in listeners.
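As a sketch of the relationship between the two settings (not taken verbatim from the manifests below; the public IP and NodePort follow the mapping used in this note for broker 0):

```properties
# The socket the broker actually binds inside the Pod
listeners=PLAINTEXT://0.0.0.0:9092
# The endpoint registered in zookeeper and handed back to clients:
# the public address plus the NodePort that proxies to this broker
advertised.listeners=PLAINTEXT://156.56.46.37:32092
```

If advertised.listeners pointed at the Pod-internal address instead, external clients could complete the bootstrap connection but would then fail when redirected to an unreachable broker address.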
1) Build the kafka 2.12-2.5.0 image (Dockerfile)
FROM 192.168.10.10/wiseco/jdk1.8.0_192

RUN rm -f /etc/localtime \
    && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime \
    && echo "Asia/Shanghai" > /etc/timezone

ENV LANG en_US.UTF-8
ENV KAFKA_DATA_DIR /var/lib/kafka/data
ENV JAVA_HOME /usr/java/jdk1.8.0_192
ENV KAFKA_HOME /opt/kafka
ENV PATH /usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/opt/kafka/bin

WORKDIR /opt
RUN yum install -y wget \
    && wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.5.0/kafka_2.12-2.5.0.tgz \
    && tar -zvxf kafka_2.12-2.5.0.tgz \
    && ln -s /opt/kafka_2.12-2.5.0 /opt/kafka \
    && rm -rf kafka_2.12-2.5.0.tgz \
    && mkdir -p /var/lib/kafka/data

CMD ["/bin/bash"]
Build the image and push it to Harbor:
# docker build -t 192.168.10.10/wiseco/kafka:v2.12-2.5.0 .
# docker push 192.168.10.10/wiseco/kafka:v2.12-2.5.0
2) Create the kafka cluster
[root@k8s-master01 ~]# mkdir -p /opt/k8s/k8s_project/kafka/
[root@k8s-master01 ~]# cd /opt/k8s/k8s_project/kafka/
Contents of kafka1.yaml:
[root@k8s-master01 kafka]# vim kafka1.yaml
apiVersion: v1
kind: Service
metadata:
  namespace: wiseco
  name: kafka-hs1
  labels:
    app: kafka1
spec:
  ports:
  - port: 1099
    name: jmx1
  clusterIP: None
  selector:
    app: kafka1
---
apiVersion: v1
kind: Service
metadata:
  namespace: wiseco
  name: kafka-cs1
  labels:
    app: kafka1
spec:
  type: NodePort
  ports:
  - port: 9092
    targetPort: 9092
    name: client1
    nodePort: 32092
  selector:
    app: kafka1
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  namespace: wiseco
  name: kafoka1
spec:
  serviceName: kafka-hs1
  replicas: 1
  selector:
    matchLabels:
      app: kafka1
  template:
    metadata:
      labels:
        app: kafka1
    spec:
      containers:
      - name: k8skafka
        imagePullPolicy: Always
        image: 192.168.10.10/wiseco/kafka:v2.12-2.5.0
        ports:
        - containerPort: 9092
          name: client1
        - containerPort: 1099
          name: jmx1
        command:
        - sh
        - -c
        - "exec kafka-server-start.sh /opt/kafka/config/server.properties \
          --override broker.id=0 \
          --override advertised.listeners=PLAINTEXT://156.56.46.37:32092 \
          --override zookeeper.connect=zok-0.zk-hs.wiseco.svc.cluster.local:2181,zok-1.zk-hs.wiseco.svc.cluster.local:2181,zok-2.zk-hs.wiseco.svc.cluster.local:2181 \
          --override log.dirs=/var/lib/kafka \
          --override auto.create.topics.enable=true \
          --override auto.leader.rebalance.enable=true \
          --override background.threads=10 \
          --override compression.type=producer \
          --override delete.topic.enable=true \
          --override leader.imbalance.check.interval.seconds=300 \
          --override leader.imbalance.per.broker.percentage=10 \
          --override log.flush.interval.messages=9223372036854775807 \
          --override log.flush.offset.checkpoint.interval.ms=60000 \
          --override log.flush.scheduler.interval.ms=9223372036854775807 \
          --override log.retention.bytes=-1 \
          --override log.retention.hours=168 \
          --override log.roll.hours=168 \
          --override log.roll.jitter.hours=0 \
          --override log.segment.bytes=1073741824 \
          --override log.segment.delete.delay.ms=60000 \
          --override message.max.bytes=1000012 \
          --override min.insync.replicas=1 \
          --override num.io.threads=8 \
          --override num.network.threads=3 \
          --override num.recovery.threads.per.data.dir=1 \
          --override num.replica.fetchers=1 \
          --override offset.metadata.max.bytes=4096 \
          --override offsets.commit.required.acks=-1 \
          --override offsets.commit.timeout.ms=5000 \
          --override offsets.load.buffer.size=5242880 \
          --override offsets.retention.check.interval.ms=600000 \
          --override offsets.retention.minutes=1440 \
          --override offsets.topic.compression.codec=0 \
          --override offsets.topic.num.partitions=50 \
          --override offsets.topic.replication.factor=3 \
          --override offsets.topic.segment.bytes=104857600 \
          --override queued.max.requests=500 \
          --override quota.consumer.default=9223372036854775807 \
          --override quota.producer.default=9223372036854775807 \
          --override replica.fetch.min.bytes=1 \
          --override replica.fetch.wait.max.ms=500 \
          --override replica.high.watermark.checkpoint.interval.ms=5000 \
          --override replica.lag.time.max.ms=10000 \
          --override replica.socket.receive.buffer.bytes=65536 \
          --override replica.socket.timeout.ms=30000 \
          --override request.timeout.ms=30000 \
          --override socket.receive.buffer.bytes=102400 \
          --override socket.request.max.bytes=104857600 \
          --override socket.send.buffer.bytes=102400 \
          --override unclean.leader.election.enable=true \
          --override zookeeper.session.timeout.ms=6000 \
          --override zookeeper.set.acl=false \
          --override broker.id.generation.enable=true \
          --override connections.max.idle.ms=600000 \
          --override controlled.shutdown.enable=true \
          --override controlled.shutdown.max.retries=3 \
          --override controlled.shutdown.retry.backoff.ms=5000 \
          --override controller.socket.timeout.ms=30000 \
          --override default.replication.factor=1 \
          --override fetch.purgatory.purge.interval.requests=1000 \
          --override group.max.session.timeout.ms=300000 \
          --override group.min.session.timeout.ms=6000 \
          --override log.cleaner.backoff.ms=15000 \
          --override log.cleaner.dedupe.buffer.size=134217728 \
          --override log.cleaner.delete.retention.ms=86400000 \
          --override log.cleaner.enable=true \
          --override log.cleaner.io.buffer.load.factor=0.9 \
          --override log.cleaner.io.buffer.size=524288 \
          --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \
          --override log.cleaner.min.cleanable.ratio=0.5 \
          --override log.cleaner.min.compaction.lag.ms=0 \
          --override log.cleaner.threads=1 \
          --override log.cleanup.policy=delete \
          --override log.index.interval.bytes=4096 \
          --override log.index.size.max.bytes=10485760 \
          --override log.message.timestamp.difference.max.ms=9223372036854775807 \
          --override log.message.timestamp.type=CreateTime \
          --override log.preallocate=false \
          --override log.retention.check.interval.ms=300000 \
          --override max.connections.per.ip=2147483647 \
          --override num.partitions=1 \
          --override producer.purgatory.purge.interval.requests=1000 \
          --override replica.fetch.backoff.ms=1000 \
          --override replica.fetch.max.bytes=1048576 \
          --override replica.fetch.response.max.bytes=10485760 \
          --override reserved.broker.max.id=1000 "
        env:
        - name: KAFKA_HEAP_OPTS
          value: "-Xmx512M -Xms512M"
        - name: KAFKA_OPTS
          value: "-Dlogging.level=INFO"
        volumeMounts:
        - name: kafkadatadir
          mountPath: /var/lib/kafka
        lifecycle:
          postStart:
            exec:
              command: ["/bin/sh","-c","touch /tmp/health"]
        livenessProbe:
          exec:
            command: ["test","-e","/tmp/health"]
          initialDelaySeconds: 5
          timeoutSeconds: 5
          periodSeconds: 10
        readinessProbe:
          tcpSocket:
            port: client1
          initialDelaySeconds: 15
          timeoutSeconds: 5
          periodSeconds: 20
  volumeClaimTemplates:
  - metadata:
      name: kafkadatadir
      annotations:
        volume.beta.kubernetes.io/storage-class: "kafka-nfs-storage"
    spec:
      accessModes:
      - ReadWriteMany
      resources:
        requests:
          storage: 10Gi
Contents of kafka2.yaml:
[root@k8s-master01 kafka]# vim kafka2.yaml

kafka2.yaml is identical to kafka1.yaml except for the broker-specific fields below; the full --override list, env, probes, lifecycle hook, and volumeClaimTemplates are unchanged:
- Service names: kafka-hs2 (headless) and kafka-cs2 (NodePort); labels, selectors, and port names use kafka2 / jmx2 / client2
- nodePort: 32093
- StatefulSet name: kafoka2, with serviceName: kafka-hs2
- --override broker.id=1
- --override advertised.listeners=PLAINTEXT://156.56.46.37:32093
Contents of kafka3.yaml:
[root@k8s-master01 kafka]# vim kafka3.yaml
apiVersion: v1
kind: Service
metadata:
  namespace: wiseco
  name: kafka-hs3
  labels:
    app: kafka3
spec:
  ports:
  - port: 1099
    name: jmx3
  clusterIP: None
  selector:
    app: kafka3
---
apiVersion: v1
kind: Service
metadata:
  namespace: wiseco
  name: kafka-cs3
  labels:
    app: kafka3
spec:
  type: NodePort
  ports:
  - port: 9092
    targetPort: 9092
    name: client3
    nodePort: 32094
  selector:
    app: kafka3
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  namespace: wiseco
  name: kafoka3
spec:
  serviceName: kafka-hs3
  replicas: 1
  selector:
    matchLabels:
      app: kafka3
  template:
    metadata:
      labels:
        app: kafka3
    spec:
      containers:
      - name: k8skafka
        imagePullPolicy: Always
        image: 192.168.10.10/wiseco/kafka:v2.12-2.5.0
        ports:
        - containerPort: 9092
          name: client3
        - containerPort: 1099
          name: jmx3
        command:
        - sh
        - -c
        - "exec kafka-server-start.sh /opt/kafka/config/server.properties --override broker.id=2 \
          --override advertised.listeners=PLAINTEXT://156.56.46.37:32094 \
          --override zookeeper.connect=zok-0.zk-hs.wiseco.svc.cluster.local:2181,zok-1.zk-hs.wiseco.svc.cluster.local:2181,zok-2.zk-hs.wiseco.svc.cluster.local:2181 \
          --override log.dirs=/var/lib/kafka \
          --override auto.create.topics.enable=true \
          --override auto.leader.rebalance.enable=true \
          --override background.threads=10 \
          --override compression.type=producer \
          --override delete.topic.enable=true \
          --override leader.imbalance.check.interval.seconds=300 \
          --override leader.imbalance.per.broker.percentage=10 \
          --override log.flush.interval.messages=9223372036854775807 \
          --override log.flush.offset.checkpoint.interval.ms=60000 \
          --override log.flush.scheduler.interval.ms=9223372036854775807 \
          --override log.retention.bytes=-1 \
          --override log.retention.hours=168 \
          --override log.roll.hours=168 \
          --override log.roll.jitter.hours=0 \
          --override log.segment.bytes=1073741824 \
          --override log.segment.delete.delay.ms=60000 \
          --override message.max.bytes=1000012 \
          --override min.insync.replicas=1 \
          --override num.io.threads=8 \
          --override num.network.threads=3 \
          --override num.recovery.threads.per.data.dir=1 \
          --override num.replica.fetchers=1 \
          --override offset.metadata.max.bytes=4096 \
          --override offsets.commit.required.acks=-1 \
          --override offsets.commit.timeout.ms=5000 \
          --override offsets.load.buffer.size=5242880 \
          --override offsets.retention.check.interval.ms=600000 \
          --override offsets.retention.minutes=1440 \
          --override offsets.topic.compression.codec=0 \
          --override offsets.topic.num.partitions=50 \
          --override offsets.topic.replication.factor=3 \
          --override offsets.topic.segment.bytes=104857600 \
          --override queued.max.requests=500 \
          --override quota.consumer.default=9223372036854775807 \
          --override quota.producer.default=9223372036854775807 \
          --override replica.fetch.min.bytes=1 \
          --override replica.fetch.wait.max.ms=500 \
          --override replica.high.watermark.checkpoint.interval.ms=5000 \
          --override replica.lag.time.max.ms=10000 \
          --override replica.socket.receive.buffer.bytes=65536 \
          --override replica.socket.timeout.ms=30000 \
          --override request.timeout.ms=30000 \
          --override socket.receive.buffer.bytes=102400 \
          --override socket.request.max.bytes=104857600 \
          --override socket.send.buffer.bytes=102400 \
          --override unclean.leader.election.enable=true \
          --override zookeeper.session.timeout.ms=6000 \
          --override zookeeper.set.acl=false \
          --override broker.id.generation.enable=true \
          --override connections.max.idle.ms=600000 \
          --override controlled.shutdown.enable=true \
          --override controlled.shutdown.max.retries=3 \
          --override controlled.shutdown.retry.backoff.ms=5000 \
          --override controller.socket.timeout.ms=30000 \
          --override default.replication.factor=1 \
          --override fetch.purgatory.purge.interval.requests=1000 \
          --override group.max.session.timeout.ms=300000 \
          --override group.min.session.timeout.ms=6000 \
          --override log.cleaner.backoff.ms=15000 \
          --override log.cleaner.dedupe.buffer.size=134217728 \
          --override log.cleaner.delete.retention.ms=86400000 \
          --override log.cleaner.enable=true \
          --override log.cleaner.io.buffer.load.factor=0.9 \
          --override log.cleaner.io.buffer.size=524288 \
          --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \
          --override log.cleaner.min.cleanable.ratio=0.5 \
          --override log.cleaner.min.compaction.lag.ms=0 \
          --override log.cleaner.threads=1 \
          --override log.cleanup.policy=delete \
          --override log.index.interval.bytes=4096 \
          --override log.index.size.max.bytes=10485760 \
          --override log.message.timestamp.difference.max.ms=9223372036854775807 \
          --override log.message.timestamp.type=CreateTime \
          --override log.preallocate=false \
          --override log.retention.check.interval.ms=300000 \
          --override max.connections.per.ip=2147483647 \
          --override num.partitions=1 \
          --override producer.purgatory.purge.interval.requests=1000 \
          --override replica.fetch.backoff.ms=1000 \
          --override replica.fetch.max.bytes=1048576 \
          --override replica.fetch.response.max.bytes=10485760 \
          --override reserved.broker.max.id=1000"
        env:
        - name: KAFKA_HEAP_OPTS
          value: "-Xmx512M -Xms512M"
        - name: KAFKA_OPTS
          value: "-Dlogging.level=INFO"
        volumeMounts:
        - name: kafkadatadir
          mountPath: /var/lib/kafka
        lifecycle:
          postStart:
            exec:
              command: ["/bin/sh","-c","touch /tmp/health"]
        livenessProbe:
          exec:
            command: ["test","-e","/tmp/health"]
          initialDelaySeconds: 5
          timeoutSeconds: 5
          periodSeconds: 10
        readinessProbe:
          tcpSocket:
            port: client3
          initialDelaySeconds: 15
          timeoutSeconds: 5
          periodSeconds: 20
  volumeClaimTemplates:
  - metadata:
      name: kafkadatadir
      annotations:
        volume.beta.kubernetes.io/storage-class: "kafka-nfs-storage"
    spec:
      accessModes:
      - ReadWriteMany
      resources:
        requests:
          storage: 10Gi
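The kafka1.yaml and kafka2.yaml manifests are identical to kafka3.yaml except for a handful of per-broker values. A small Python sketch (a hypothetical helper, not part of the actual deployment) makes those differences explicit, assuming the naming and port scheme used above:

```python
# Hypothetical helper: the only values that differ between kafka1.yaml,
# kafka2.yaml and kafka3.yaml, derived from the ordinal (0-based broker index).
def broker_overrides(ordinal, external_ip="156.56.46.37"):
    """Per-broker settings for the (ordinal+1)-th single-replica StatefulSet."""
    node_port = 32092 + ordinal  # kafka-cs1..kafka-cs3 NodePorts
    return {
        "statefulset": "kafoka%d" % (ordinal + 1),
        "headless_service": "kafka-hs%d" % (ordinal + 1),
        "client_service": "kafka-cs%d" % (ordinal + 1),
        "broker.id": ordinal,
        "nodePort": node_port,
        "advertised.listeners": "PLAINTEXT://%s:%d" % (external_ip, node_port),
    }

for i in range(3):
    print(broker_overrides(i))
```

Everything else in the three files (the overrides, probes, and volumeClaimTemplates) stays the same, which is why splitting the cluster into three one-replica StatefulSets still yields distinct broker identities and stable external ports.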
Create the resources and check them:
[root@k8s-vm01 new_kafka]# kubectl apply -f kafka1.yaml
[root@k8s-vm01 new_kafka]# kubectl apply -f kafka2.yaml
[root@k8s-vm01 new_kafka]# kubectl apply -f kafka3.yaml
[root@k8s-vm01 new_kafka]# kubectl get pods -n wiseco|grep kafoka
kafoka1-0   1/1     Running   0     5d21h
kafoka2-0   1/1     Running   0     5d21h
kafoka3-0   1/1     Running   0     5d21h
[root@k8s-vm01 new_kafka]# kubectl get svc -n wiseco|grep kafka
kafka-cs1   NodePort    10.254.66.1      <none>   9092:32092/TCP   5d21h
kafka-cs2   NodePort    10.254.177.227   <none>   9092:32093/TCP   5d21h
kafka-cs3   NodePort    10.254.136.242   <none>   9092:32094/TCP   5d21h
kafka-hs1   ClusterIP   None             <none>   1099/TCP         5d21h
kafka-hs2   ClusterIP   None             <none>   1099/TCP         5d21h
kafka-hs3   ClusterIP   None             <none>   1099/TCP         5d21h
[root@k8s-vm01 new_kafka]# kubectl get statefulset -n wiseco|grep kafoka
kafoka1   1/1     5d21h
kafoka2   1/1     5d21h
kafoka3   1/1     5d21h
Check the Kafka cluster's PVs and PVCs:
[root@k8s-vm01 new_kafka]# kubectl get pv -n wiseco|grep kafoka
pvc-17aeea13-4f2a-4f10-8f63-67888c468831   10Gi   RWX   Delete   Bound   wiseco/kafkadatadir-kafoka1-0   kafka-nfs-storage   6d15h
pvc-447d344d-2eaa-4db5-baf8-473ce9811378   10Gi   RWX   Delete   Bound   wiseco/kafkadatadir-kafoka2-0   kafka-nfs-storage   6d15h
pvc-c3b26e12-3c96-48ca-9019-6211e9e6843b   10Gi   RWX   Delete   Bound   wiseco/kafkadatadir-kafoka3-0   kafka-nfs-storage   6d15h
[root@k8s-vm01 new_kafka]# kubectl get pvc -n wiseco|grep kafoka
kafkadatadir-kafoka1-0   Bound   pvc-17aeea13-4f2a-4f10-8f63-67888c468831   10Gi   RWX   kafka-nfs-storage   6d15h
kafkadatadir-kafoka2-0   Bound   pvc-447d344d-2eaa-4db5-baf8-473ce9811378   10Gi   RWX   kafka-nfs-storage   6d15h
kafkadatadir-kafoka3-0   Bound   pvc-c3b26e12-3c96-48ca-9019-6211e9e6843b   10Gi   RWX   kafka-nfs-storage   6d15h
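The PVC names above are not arbitrary: a StatefulSet derives each claim name from its volumeClaimTemplates as <templateName>-<statefulSetName>-<ordinal>, which is what keeps a rescheduled Pod bound to the same data. A minimal sketch of that convention:

```python
def pvc_name(claim_template, statefulset, ordinal):
    """StatefulSet volumeClaimTemplates yield PVCs named <template>-<sts>-<ordinal>."""
    return "%s-%s-%d" % (claim_template, statefulset, ordinal)

# The three claims observed in `kubectl get pvc` above:
for sts in ("kafoka1", "kafoka2", "kafoka3"):
    print(pvc_name("kafkadatadir", sts, 0))
```

Because each StatefulSet here has replicas=1, every claim ends in ordinal 0.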
Check the Kafka cluster's persisted data on the NFS server:
[root@k8s_storage ~]# ll /data/storage/kafka/
total 12
drwxrwxrwx 54 root root 4096 Jan 18 14:30 wiseco-kafkadatadir-kafoka1-0-pvc-17aeea13-4f2a-4f10-8f63-67888c468831
drwxrwxrwx 54 root root 4096 Jan 18 14:30 wiseco-kafkadatadir-kafoka2-0-pvc-447d344d-2eaa-4db5-baf8-473ce9811378
drwxrwxrwx 52 root root 4096 Jan 18 14:30 wiseco-kafkadatadir-kafoka3-0-pvc-c3b26e12-3c96-48ca-9019-6211e9e6843b
[root@k8s_storage ~]# cat /data/storage/kafka/wiseco-kafkadatadir-kafoka1-0-pvc-17aeea13-4f2a-4f10-8f63-67888c468831/meta.properties
#
#Tue Jan 12 17:16:03 CST 2021
cluster.id=8H9vYTT_RBOiwUKqFRvn0w
version=0
broker.id=0
[root@k8s_storage ~]# cat /data/storage/kafka/wiseco-kafkadatadir-kafoka2-0-pvc-447d344d-2eaa-4db5-baf8-473ce9811378/meta.properties
#
#Tue Jan 12 17:17:11 CST 2021
cluster.id=8H9vYTT_RBOiwUKqFRvn0w
version=0
broker.id=1
[root@k8s_storage ~]# cat /data/storage/kafka/wiseco-kafkadatadir-kafoka3-0-pvc-c3b26e12-3c96-48ca-9019-6211e9e6843b/meta.properties
#
#Tue Jan 12 17:17:29 CST 2021
cluster.id=8H9vYTT_RBOiwUKqFRvn0w
version=0
broker.id=2
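meta.properties is a plain Java-properties file, so it is easy to verify programmatically that all three brokers joined the same cluster (identical cluster.id) with distinct broker.ids. A minimal parser sketch, illustrative only, using the kafoka1 file above as sample input:

```python
def parse_meta_properties(text):
    """Parse a Kafka meta.properties file (Java properties; '#' lines are comments)."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key] = value
    return props

sample = """\
#
#Tue Jan 12 17:16:03 CST 2021
cluster.id=8H9vYTT_RBOiwUKqFRvn0w
version=0
broker.id=0
"""
print(parse_meta_properties(sample))
```

Running such a check across all three PV directories would confirm one cluster.id (8H9vYTT_RBOiwUKqFRvn0w) and broker.ids 0, 1, 2, matching the --override broker.id values in the three manifests.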
3) Verify production and consumption of data in the Kafka cluster
[root@k8s-vm01 new_kafka]# kubectl get svc -n wiseco|grep kafka
kafka-cs1   NodePort    10.254.66.1      <none>   9092:32092/TCP   5d21h
kafka-cs2   NodePort    10.254.177.227   <none>   9092:32093/TCP   5d21h
kafka-cs3   NodePort    10.254.136.242   <none>   9092:32094/TCP   5d21h
kafka-hs1   ClusterIP   None             <none>   1099/TCP         5d21h
kafka-hs2   ClusterIP   None             <none>   1099/TCP         5d21h
kafka-hs3   ClusterIP   None             <none>   1099/TCP         5d21h
[root@k8s-vm01 new_kafka]# kubectl get svc -n wiseco|grep zk
zk-cs   NodePort    10.254.209.162   <none>   2181:32181/TCP      6d16h
zk-hs   ClusterIP   None             <none>   2888/TCP,3888/TCP   6d16h
Log into any one of the three Kafka pod instances (for example the kafka2 node) and produce some data:
[root@k8s-vm01 new_kafka]# kubectl get pods -n wiseco|grep kafoka
kafoka1-0   1/1     Running   0     5d21h
kafoka2-0   1/1     Running   0     5d21h
kafoka3-0   1/1     Running   0     5d21h
[root@k8s-vm01 new_kafka]# kubectl exec -ti kafoka2-0 -n wiseco -- /bin/bash
[root@kafoka2-0 opt]#

List the topics:
[root@kafoka2-0 opt]# kafka-topics.sh --list --zookeeper zk-cs:2181

Create a topic, for example one named wisecotest:
[root@kafoka2-0 opt]# kafka-topics.sh --create --topic wisecotest --zookeeper zk-cs:2181 --partitions 1 --replication-factor 1
Created topic wisecotest.

List the topics again:
[root@kafoka2-0 opt]# kafka-topics.sh --list --zookeeper zk-cs:2181
wisecotest

Produce data into the wisecotest topic created above:
[root@kafoka2-0 opt]# kafka-console-producer.sh --broker-list 156.56.46.37:32092,156.56.46.37:32093,156.56.46.37:32094 --topic wisecotest
>test1
>test2
>test3
>
Then log into another Kafka pod instance, for example kafka3, and verify that the data can be consumed:
The data produced above can now be consumed:
[root@k8s-vm01 ~]# kubectl exec -ti kafoka3-0 -n wiseco -- /bin/bash
[root@kafoka3-0 opt]# kafka-console-consumer.sh --bootstrap-server 156.56.46.37:32092,156.56.46.37:32093,156.56.46.37:32094 --topic wisecotest --from-beginning
test1
test2
test3

Produce some more data in the kafka2 container:
[root@kafoka2-0 opt]# kafka-console-producer.sh --broker-list 156.56.46.37:32092,156.56.46.37:32093,156.56.46.37:32094 --topic wisecotest
>test1
>test2
>test3
>hahahaha
>heiheihei
>

Then check consumption again in the kafka3 container:
[root@kafoka3-0 opt]# kafka-console-consumer.sh --bootstrap-server 156.56.46.37:32092,156.56.46.37:32093,156.56.46.37:32094 --topic wisecotest --from-beginning
test1
test2
test3
hahahaha
heiheihei

A single external address also works for consumption:
[root@kafoka3-0 opt]# kafka-console-consumer.sh --bootstrap-server 156.56.46.37:32094 --topic wisecotest --from-beginning
test1
test2
test3
hahahaha
heiheihei

Using the internal Kafka addresses, data can also be consumed normally from inside the K8S cluster:
[root@kafoka3-0 opt]# kafka-console-consumer.sh --bootstrap-server kafka-cs1:9092 --topic wisecotest --from-beginning
test1
test2
test3
hahahaha
heiheihei
[root@kafoka3-0 opt]# kafka-console-consumer.sh --bootstrap-server kafka-cs2:9092 --topic wisecotest --from-beginning
test1
test2
test3
hahahaha
heiheihei

The warnings below mean the kafka-cs3 bootstrap address cannot be reached from inside the kafoka3 pod itself. Two things are worth noting: the command mistakenly uses port 9093 for kafka-cs3, while that Service only exposes 9092; and, depending on the kube-proxy hairpin configuration, a Pod may fail to reach itself through its own Service address, which is why the same command run from a pod other than the kafka3 instance does not show this warning. Consumption still succeeds through the other two bootstrap addresses:
[root@kafoka3-0 opt]# kafka-console-consumer.sh --bootstrap-server kafka-cs1:9092,kafka-cs2:9092,kafka-cs3:9093 --topic wisecotest --from-beginning
[2021-01-18 14:47:33,820] WARN [Consumer clientId=consumer-console-consumer-4149-1, groupId=console-consumer-4149] Connection to node -3 (kafka-cs3/10.254.136.242:9093) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2021-01-18 14:47:33,821] WARN [Consumer clientId=consumer-console-consumer-4149-1, groupId=console-consumer-4149] Bootstrap broker kafka-cs3:9093 (id: -3 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
test1
test2
test3
hahahaha
heiheihei
Now verify that Kafka data can be consumed from outside the K8S cluster through the external address.
Log into an external server that has the Kafka client installed:
[root@dev-env ~]# cd /usr/kafka/kafka_2.11-2.1.0/bin/
[root@dev-env bin]#

As shown below, the Kafka data produced inside the cluster can be consumed from outside the K8S cluster via the external address:
[root@dev-env bin]# ./kafka-console-consumer.sh --bootstrap-server 156.56.46.37:32092,156.56.46.37:32093,156.56.46.37:32094 --topic wisecotest --from-beginning
test1
test2
test3
hahahaha
heiheihei

Produce more data inside the Kafka pod:
[root@kafoka2-0 opt]# kafka-console-producer.sh --broker-list 156.56.46.37:32092,156.56.46.37:32093,156.56.46.37:32094 --topic wisecotest
>test1
>test2
>test3
>hahahaha
>heiheihei
>123456789
>abcdefghijk
>

Check from outside again; the new data is consumed normally:
[root@dev-env bin]# ./kafka-console-consumer.sh --bootstrap-server 156.56.46.37:32092,156.56.46.37:32093,156.56.46.37:32094 --topic wisecotest --from-beginning
test1
test2
test3
hahahaha
heiheihei
123456789
abcdefghijk
Internal connection address of the Kafka cluster:
kafka-cs1:9092,kafka-cs2:9092,kafka-cs3:9092
External connection address of the Kafka cluster:
156.56.46.37:32092,156.56.46.37:32093,156.56.46.37:32094
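Both address lists follow directly from the Service definitions: inside the cluster, clients use the three kafka-cs Services on port 9092; outside, they use the node's external IP with the three NodePorts. A small sketch that derives both forms (illustrative helper, not part of the deployment):

```python
def bootstrap_servers(external, external_ip="156.56.46.37", brokers=3):
    """Build the comma-separated bootstrap list for in-cluster vs external clients."""
    if external:
        # NodePorts 32092..32094 map to the kafka-cs1..3 Services.
        return ",".join("%s:%d" % (external_ip, 32092 + i) for i in range(brokers))
    # In-cluster: each client Service exposes the standard Kafka port 9092.
    return ",".join("kafka-cs%d:9092" % (i + 1) for i in range(brokers))

print(bootstrap_servers(external=False))
print(bootstrap_servers(external=True))
```

Note that external clients work only because each broker's advertised.listeners was set to the external IP and its NodePort; without that override, brokers would advertise unreachable in-cluster addresses to outside consumers.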