在k8s中部署Kafka高可用集群超详细讲解

🐇明明跟你说过:个人主页

🏅个人专栏:《数据流专家:Kafka探索》🏅

🔖行路有良友,便是天堂🔖

目录

一、引言

1、Kafka简介

2、为什么在Kubernetes中部署Kafka

二、Kubernetes基础 

1、Kubernetes概述

2、Pods、Services、Deployments等基本概念

三、Kafka集群架构

1、Kafka集群的组成与工作原理

2、Kafka集群的高可用性设计

四、准备部署环境 

1、准备k8s集群

2、准备StorageClass 

3、准备部署Kafka集群所需的镜像 

五、部署Kafka集群

1、部署Zookeeper集群

2、部署Kafka集群


一、引言

1、Kafka简介

阿帕奇Kafka(Apache Kafka)是一个开源的分布式流处理平台,由LinkedIn开发并于2011年开源,现在是Apache软件基金会的一部分。Kafka的设计初衷是为了处理实时数据流,具备高吞吐量、低延迟、高容错性和可扩展性的特点。

Kafka的核心概念


1. Producer(生产者):

  • 生产者是向Kafka主题(topic)发送消息的应用程序。它们负责将数据推送到Kafka集群。

2. Consumer(消费者):

  • 消费者是从Kafka主题中读取消息的应用程序。它们可以是单个消费者或消费者组(consumer group),每个消费者组中的消费者可以并行地读取数据。

3. Topic(主题):

  • 主题是Kafka中的一个逻辑分类,用于将消息进行分组。每个主题可以细分为多个分区(partition),每个分区内的消息是有序的,但不同分区之间没有顺序保证。

4. Partition(分区):

  • 分区是主题的一个子单元,每个分区可以存储大量消息,并且每个分区可以被多个消费者读取。分区使得Kafka能够水平扩展。

5. Broker(代理):

  • Broker是Kafka集群中的一个服务器节点,负责接收、存储和发送消息。一个Kafka集群可以包含多个broker,以提供高可用性和容错能力。

6. Zookeeper:

  • Kafka使用Zookeeper来进行分布式协调和管理集群状态。Zookeeper负责维护配置信息、分区的leader选举以及消费组的offset管理等。

Kafka的主要功能


1. 消息持久化:

  • Kafka将消息持久化到磁盘,并且允许设置保留策略,确保数据的可靠性。

2. 高吞吐量:

  • Kafka能够处理高吞吐量的数据流,适合处理大规模的数据流应用。

3. 低延迟:

  • Kafka的设计优化了数据传输路径,使得消息传输具有低延迟。

4. 扩展性:

  • Kafka的分区机制允许在集群中增加更多的broker和分区,从而实现水平扩展。

5. 容错性:

  • 通过复制机制,Kafka确保了数据的高可用性,即使个别节点故障,数据仍然能够恢复。

Kafka的应用场景


1. 日志收集和聚合:

  • 作为一种高效的日志收集系统,Kafka可以收集和聚合分布式系统的日志数据。

2. 实时数据流处理:

  • Kafka与流处理框架(如Apache Flink、Apache Storm等)结合,可以进行实时数据分析和处理。

3. 消息队列:

  • Kafka可以充当高吞吐量的消息队列系统,用于不同应用程序之间的解耦和异步通信。

4. 事件源:

  • Kafka可以用作事件源系统,捕获和存储所有变化事件,以便后续处理和回放。

2、为什么在Kubernetes中部署Kafka

  1. 弹性和可伸缩性:Kubernetes提供了强大的自动伸缩和调度功能,可以根据负载情况自动扩展Kafka集群的节点数量,以满足不同工作负载的需求。这种弹性能力使得Kafka能够更好地应对数据量的变化和突发性负载。
  2. 易于管理:Kubernetes提供了统一的管理接口和工具,简化了Kafka集群的部署、扩展、更新和监控。通过Kubernetes的控制面板,管理员可以轻松地管理Kafka集群的生命周期,而无需深入了解底层的部署细节。
  3. 高可用性:Kubernetes具有自动故障检测和恢复机制,可以在节点故障时自动重新调度Kafka的副本,确保数据的高可用性和持久性。
  4. 资源隔离:通过Kubernetes的命名空间和资源限制功能,可以实现Kafka集群与其他应用程序之间的资源隔离,避免因资源竞争导致的性能问题。

 
wAAACH5BAEKAAAALAAAAAABAAEAAAICRAEAOw==编辑

二、Kubernetes基础 

1、Kubernetes概述

Kubernetes(K8s)是一个开源的容器编排平台,最初由Google开发并于2014年发布,现已成为Cloud Native Computing Foundation(CNCF)的一部分。Kubernetes旨在简化容器化应用程序的部署、管理和扩展,提供了丰富的功能和工具,使得用户可以更轻松地构建和运行分布式系统。

Kubernetes的主要功能


1. 自动化部署和扩展:

  • Kubernetes提供了丰富的调度和自动伸缩功能,可以根据负载情况自动调度和扩展应用程序的副本数量,以适应不同的工作负载需求。

2. 服务发现和负载均衡:

  • Kubernetes通过Service对象提供了统一的服务发现和负载均衡功能,使得应用程序可以通过简单的域名访问其他服务,并自动实现负载均衡。

3. 存储管理:

  • Kubernetes支持多种存储后端和存储卷类型,可以为应用程序提供持久化存储和共享存储功能,如PersistentVolume、PersistentVolumeClaim等。

4. 自我修复:

  • Kubernetes具有自我修复和健康检查机制,可以监控容器和节点的健康状态,并在出现故障时自动进行修复和恢复。

   

2、Pods、Services、Deployments等基本概念

1. Pods(容器组)


Pod是Kubernetes中最小的可部署单元,它可以包含一个或多个紧密相关的容器。这些容器共享相同的网络命名空间和存储卷,并且通常在同一节点上运行。Pod提供了一种逻辑主机的概念,使得容器之间可以共享资源和通信。Pod通常用于部署一个应用程序的一组相关容器。

2. Services(服务)


服务是Kubernetes中一种抽象,用于定义一组Pod的逻辑集合,并为它们提供统一的访问入口。服务通过标签选择器(Selector)将请求路由到对应的Pod,并实现负载均衡和服务发现功能。Kubernetes支持多种类型的服务,如ClusterIP、NodePort、LoadBalancer和ExternalName。

3. Deployments(部署)


部署是Kubernetes中用于管理Pod副本数量和应用程序版本更新的对象。部署定义了应用程序的期望状态,包括所需的副本数量、容器镜像和其他配置信息。部署控制器负责根据部署的定义创建、更新和删除Pod实例,同时提供滚动更新和回滚的功能。

三、Kafka集群架构

1、Kafka集群的组成与工作原理

Kafka集群由多个服务器节点(Broker)组成,每个Broker负责存储和处理一部分数据。Kafka的工作原理基于发布/订阅模式,其中生产者(Producer)将消息发布到一个或多个主题(Topic),而消费者(Consumer)订阅这些主题并从Broker拉取消息进行处理。

工作原理


1. 消息存储:

  • 生产者将消息发布到主题,并根据主题的分区策略选择将消息存储到对应的分区中。每个分区的消息按顺序存储,并根据消息的偏移量(Offset)进行标识。

2. 消息复制:

  • Kafka通过副本集机制实现数据的高可用性和持久性。每个分区都有一个主副本(Leader Replica)和多个副本(Follower Replica),主副本负责处理读写请求,而副本用于备份数据。当主副本故障时,Kafka会自动选举一个新的主副本来接管工作。

3. 消息传输:

  • 生产者将消息发送到Broker的主副本,Broker负责将消息复制到副本并进行持久化存储。消费者从Broker的主副本拉取消息进行处理,可以选择从不同的副本读取消息以实现负载均衡和故障恢复。

4. 消息保留策略:

  • Kafka支持设置消息的保留策略,包括基于时间、基于大小或基于日志段的策略。一旦消息达到保留期限或超过指定的大小限制,Kafka将自动删除过期消息,以释放存储空间。

   

2、Kafka集群的高可用性设计

1. 多副本复制

  • Kafka通过在多个Broker之间复制分区的副本来实现数据的冗余和高可用性。每个分区通常有多个副本,其中一个是主副本(Leader Replica),负责处理读写请求;其他副本是从副本(Follower Replica),用于备份数据。当主副本发生故障时,Kafka会自动选举一个新的主副本来接管工作,确保数据的可用性和一致性。

2. 自动故障检测与恢复

  • Kafka集群内置了自动故障检测和恢复机制,可以监控Broker和分区的健康状态,并在发现故障时自动进行故障转移和数据恢复。当Broker或分区发生故障时,Kafka会触发重新选举过程,选举出新的主副本并将数据复制到新的副本中,以实现快速的故障恢复。

3. 副本分布和均衡

  • Kafka会将分区的副本分布在不同的Broker上,以确保数据的冗余和均衡。通过将副本分布在不同的机架、数据中心或云区域中,可以提高系统的容错能力,即使整个机架或数据中心发生故障,系统仍然能够继续运行。

4. 水平扩展和动态伸缩

  • Kafka支持水平扩展和动态伸缩,可以根据负载情况和性能需求来增加或减少Broker的数量和容量。通过添加更多的Broker和分区,可以提高系统的吞吐量和容量,同时降低单点故障的风险,实现更高的可用性和可伸缩性。

   

四、准备部署环境 

1、准备k8s集群

这里我们使用的k8s集群版本为 1.23,也可以使用其他的版本,只是镜像导入命令不通,如果还未搭建k8s集群,请参考《在Centos中搭建 K8s 1.23 集群超详细讲解》这篇文章

2、准备StorageClass 

因为我们要对Kafka中的数据进行持久化,避免Pod漂移后数据丢失,保证数据的完整性与可用性

如果还未创建存储类,请参考《k8s 存储类(StorageClass)创建与动态生成PV解析,(附带镜像)》这篇文件。

3、准备部署Kafka集群所需的镜像 

 在k8s Node节点上执行以下命令

[root@node1 ~]# docker pull zookeeper:3.8
[root@node1 ~]# docker pull kafka:3.1.0
[root@node2 ~]# docker pull zookeeper:3.8
[root@node2 ~]# docker pull kafka:3.1.0
[root@node3 ~]# docker pull zookeeper:3.8
[root@node3 ~]# docker pull kafka:3.1.0

五、部署Kafka集群

1、部署Zookeeper集群

为什么部署kafka前要部署zookeeper?

Kafka依赖Zookeeper来实现分布式协调和配置管理。在Kafka集群中,Zookeeper扮演着多种角色,包括:

  1. 配置管理:Kafka集群的配置信息和元数据存储在Zookeeper中,包括主题(topics)、分区(partitions)、副本(replicas)等配置信息。
  2. Leader选举:Kafka的分区(partitions)被分布式存储在集群中的多个Broker上,每个分区都有一个Leader和多个Follower。Zookeeper负责Leader选举,确保每个分区都有一个活跃的Leader。
  3. Broker注册:Kafka Broker在启动时会向Zookeeper注册自己的信息,包括地址、ID等,以便其他Broker和客户端发现和连接。
  4. 健康监测:Zookeeper监控Kafka集群中各个节点的健康状态,并在节点出现故障或宕机时触发相应的处理操作。

因此,在部署Kafka之前,需要先部署Zookeeper,确保Kafka集群正常运行所需的分布式协调和配置管理功能可用。没有Zookeeper,Kafka无法正常运行,并且无法实现高可用性、数据一致性和故障容错等特性。

编写部署Zookeeper集群的YAML文件

[root@master ~]# vim zook.yaml
# 添加如下内容
apiVersion: v1
kind: Namespace
metadata:name: kafka
---
apiVersion: v1
kind: Service
metadata:name: zookeeper-cluster  #无头服务的名称,需要通过这个获取ip,与主机的对应关系namespace: kafkalabels:app: zookeeper
spec:ports:- port: 2181name: zookeeper- port: 2188name: cluster1- port: 3888name: cluster2clusterIP: Noneselector:app: zookeeper
---
apiVersion: v1
kind: Service
metadata:name: zookeeper-nodeport-service-0namespace: kafka
spec:type: NodePortselector:statefulset.kubernetes.io/pod-name: zookeeper-0ports:- protocol: TCPport: 80        # Service 暴露的端口targetPort: 2181   # Pod 中容器的端口nodePort: 32181   # NodePort 类型的端口范围为 30000-32767,可以根据需要调整
---
apiVersion: v1
kind: Service
metadata:name: zookeeper-nodeport-service-1namespace: kafka
spec:type: NodePortselector:statefulset.kubernetes.io/pod-name: zookeeper-1ports:- protocol: TCPport: 80        # Service 暴露的端口targetPort: 2181   # Pod 中容器的端口nodePort: 32182   # NodePort 类型的端口范围为 30000-32767,可以根据需要调整
---
apiVersion: v1
kind: Service
metadata:name: zookeeper-nodeport-service-2namespace: kafka
spec:type: NodePortselector:statefulset.kubernetes.io/pod-name: zookeeper-2ports:- protocol: TCPport: 80        # Service 暴露的端口targetPort: 2181   # Pod 中容器的端口nodePort: 32183   # NodePort 类型的端口范围为 30000-32767,可以根据需要调整
---
apiVersion: v1
kind: ConfigMap
metadata:name: zookeeper-confignamespace: kafkalabels:app: zookeeper
data:             #具体挂载的配置文件zoo.cfg: |+tickTime=2000initLimit=10syncLimit=5dataDir=/datadataLogDir=/datalogclientPort=2181server.1=zookeeper-0.zookeeper-cluster.kafka:2188:3888server.2=zookeeper-1.zookeeper-cluster.kafka:2188:3888server.3=zookeeper-2.zookeeper-cluster.kafka:2188:38884lw.commands.whitelist=*
---
apiVersion: apps/v1
kind: StatefulSet
metadata:name: zookeepernamespace: kafka
spec:serviceName: "zookeeper-cluster"   #填写无头服务的名称replicas: 3selector:matchLabels:app: zookeepertemplate:metadata:labels:app: zookeeperspec:initContainers:- name: set-zk-idimage: busybox:latestcommand: ['sh', '-c', "hostname | cut -d '-' -f 2 | awk '{print $0 + 1}' > /data/myid"]volumeMounts:- name: datamountPath: /datacontainers:- name: zookeeperimage: zookeeper:3.8imagePullPolicy: Neverresources:requests:memory: "500Mi"cpu: "500m"limits:memory: "1000Mi"cpu: "1000m"ports:- containerPort: 2181name: zookeeper- containerPort: 2188name: cluster1- containerPort: 3888name: cluster2volumeMounts:- name: zook-config            #挂载配置mountPath: /conf/zoo.cfgsubPath: zoo.cfg- name: datamountPath: /dataenv:- name: MY_POD_NAMEvalueFrom:fieldRef:fieldPath: metadata.name  #metadata.name获取自己pod名称添加到变量MY_POD_NAMEvolumes:- name: zook-configconfigMap:                                #configMap挂载name: zookeeper-configvolumeClaimTemplates:                     #这步自动创建pvc,并挂载动态pv- metadata:name: dataspec:accessModes: ["ReadWriteMany"]storageClassName: nfsresources:requests:storage: 10Gi

创建Zookeeper集群

[root@master ~]# kubectl apply -f  zook.yaml

查看Pod状态  

2、部署Kafka集群

编写部署Kafka集群的YAML文件

[root@master ~]# vim kafka.yaml
# 输入如下内容 
apiVersion: v1
kind: Service
metadata:name: kafka-cluster  #无头服务的名称,需要通过这个获取ip,与主机的对应关系namespace: kafkalabels:app: kafka
spec:ports:- port: 9092name: kafkaclusterIP: Noneselector:app: kafka
---
apiVersion: v1
kind: Service
metadata:name: kafka-nodeport-service-0namespace: kafka
spec:type: NodePortselector:statefulset.kubernetes.io/pod-name: kafka0-0ports:- protocol: TCPport: 9092        # Service 暴露的端口targetPort: 9092   # Pod 中容器的端口nodePort: 30092   # NodePort 类型的端口范围为 30000-32767,可以根据需要调整name: kafka
---
apiVersion: v1
kind: Service
metadata:name: kafka-nodeport-service-1namespace: kafka
spec:type: NodePortselector:statefulset.kubernetes.io/pod-name: kafka1-0ports:- protocol: TCPport: 9092        # Service 暴露的端口targetPort: 9092   # Pod 中容器的端口nodePort: 30093   # NodePort 类型的端口范围为 30000-32767,可以根据需要调整name: kafka
---
apiVersion: v1
kind: Service
metadata:name: kafka-nodeport-service-2namespace: kafka
spec:type: NodePortselector:statefulset.kubernetes.io/pod-name: kafka2-0ports:- protocol: TCPport: 9092        # Service 暴露的端口targetPort: 9092   # Pod 中容器的端口nodePort: 30094   # NodePort 类型的端口范围为 30000-32767,可以根据需要调整name: kafka
---
apiVersion: apps/v1
kind: StatefulSet
metadata:name: kafka0namespace: kafka
spec:serviceName: "kafka-cluster"   #填写无头服务的名称replicas: 1selector:matchLabels:app: kafka0template:metadata:labels:app: kafka0spec:containers:- name: kafkaimage: kafka:3.1.0imagePullPolicy: Neverresources:requests:memory: "500Mi"cpu: "500m"limits:memory: "1000Mi"cpu: "2000m"ports:- containerPort: 9092name: kafkacommand:- sh - -c - "exec /app/kafka/bin/kafka-server-start.sh /app/kafka/config/server.properties --override broker.id=0 \--override listeners=PLAINTEXT://:9092 \--override advertised.listeners=PLAINTEXT://192.168.40.181:30092 \--override zookeeper.connect=192.168.40.181:32181,192.168.40.181:32182,192.168.40.181:32183/kafka \--override log.dirs=/var/lib/kafka/data \--override auto.create.topics.enable=true \--override auto.leader.rebalance.enable=true \--override background.threads=10 \--override compression.type=producer \--override delete.topic.enable=true \--override leader.imbalance.check.interval.seconds=300 \--override leader.imbalance.per.broker.percentage=10 \--override log.flush.interval.messages=9223372036854775807 \--override log.flush.offset.checkpoint.interval.ms=60000 \--override log.flush.scheduler.interval.ms=9223372036854775807 \--override log.retention.bytes=-1 \--override log.retention.hours=168 \--override log.roll.hours=168 \--override log.roll.jitter.hours=0 \--override log.segment.bytes=1073741824 \--override log.segment.delete.delay.ms=60000 \--override message.max.bytes=1000012 \--override min.insync.replicas=1 \--override num.io.threads=8 \--override num.network.threads=3 \--override num.recovery.threads.per.data.dir=1 \--override num.replica.fetchers=1 \--override offset.metadata.max.bytes=4096 \--override offsets.commit.required.acks=-1 \--override offsets.commit.timeout.ms=5000 \--override offsets.load.buffer.size=5242880 \--override offsets.retention.check.interval.ms=600000 \--override offsets.retention.minutes=1440 \--override offsets.topic.compression.codec=0 \--override offsets.topic.num.partitions=50 \--override offsets.topic.replication.factor=3 \--override offsets.topic.segment.bytes=104857600 \--override queued.max.requests=500 \--override quota.consumer.default=9223372036854775807 \--override quota.producer.default=9223372036854775807 \--override replica.fetch.min.bytes=1 \--override replica.fetch.wait.max.ms=500 \--override replica.high.watermark.checkpoint.interval.ms=5000 \--override replica.lag.time.max.ms=10000 \--override replica.socket.receive.buffer.bytes=65536 \--override replica.socket.timeout.ms=30000 \--override request.timeout.ms=30000 \--override socket.receive.buffer.bytes=102400 \--override socket.request.max.bytes=104857600 \--override socket.send.buffer.bytes=102400 \--override unclean.leader.election.enable=true \--override zookeeper.session.timeout.ms=6000 \--override zookeeper.set.acl=false \--override broker.id.generation.enable=true \--override connections.max.idle.ms=600000 \--override controlled.shutdown.enable=true \--override controlled.shutdown.max.retries=3 \--override controlled.shutdown.retry.backoff.ms=5000 \--override controller.socket.timeout.ms=30000 \--override default.replication.factor=1 \--override fetch.purgatory.purge.interval.requests=1000 \--override group.max.session.timeout.ms=300000 \--override group.min.session.timeout.ms=6000 \--override log.cleaner.backoff.ms=15000 \--override log.cleaner.dedupe.buffer.size=134217728 \--override log.cleaner.delete.retention.ms=86400000 \--override log.cleaner.enable=true \--override log.cleaner.io.buffer.load.factor=0.9 \--override log.cleaner.io.buffer.size=524288 \--override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \--override log.cleaner.min.cleanable.ratio=0.5 \--override log.cleaner.min.compaction.lag.ms=0 \--override log.cleaner.threads=1 \--override log.cleanup.policy=delete \--override log.index.interval.bytes=4096 \--override log.index.size.max.bytes=10485760 \--override log.message.timestamp.difference.max.ms=9223372036854775807 \--override log.message.timestamp.type=CreateTime \--override log.preallocate=false \--override log.retention.check.interval.ms=300000 \--override max.connections.per.ip=2147483647 \--override num.partitions=1 \--override producer.purgatory.purge.interval.requests=1000 \--override replica.fetch.backoff.ms=1000 \--override replica.fetch.max.bytes=1048576 \--override replica.fetch.response.max.bytes=10485760 \--override reserved.broker.max.id=1000"volumeMounts:- name: data0mountPath: /var/lib/kafka/dataenv:- name: MY_POD_NAMEvalueFrom:fieldRef:fieldPath: metadata.name  #metadata.name获取自己pod名称添加到变量MY_POD_NAME- name: ALLOW_PLAINTEXT_LISTENERvalue: "yes"- name: KAFKA_HEAP_OPTSvalue : "-Xms1g -Xmx1g"- name: JMX_PORTvalue: "5555"volumeClaimTemplates:                     #这步自动创建pvc,并挂载动态pv- metadata:name: data0spec:accessModes: ["ReadWriteMany"]storageClassName: nfsresources:requests:storage: 10Gi
---
apiVersion: apps/v1
kind: StatefulSet
metadata:name: kafka1namespace: kafka
spec:serviceName: "kafka-cluster"   #填写无头服务的名称replicas: 1selector:matchLabels:app: kafka1template:metadata:labels:app: kafka1spec:containers:- name: kafkaimage: kafka:3.1.0imagePullPolicy: Neverresources:requests:memory: "500Mi"cpu: "500m"limits:memory: "1000Mi"cpu: "2000m"ports:- containerPort: 9092name: kafkacommand:- sh - -c - "exec /app/kafka/bin/kafka-server-start.sh /app/kafka/config/server.properties --override broker.id=1 \--override listeners=PLAINTEXT://:9092 \--override advertised.listeners=PLAINTEXT://192.168.40.181:30093 \--override zookeeper.connect=192.168.40.181:32181,192.168.40.181:32182,192.168.40.181:32183/kafka \--override log.dirs=/var/lib/kafka/data \--override auto.create.topics.enable=true \--override auto.leader.rebalance.enable=true \--override background.threads=10 \--override compression.type=producer \--override delete.topic.enable=true \--override leader.imbalance.check.interval.seconds=300 \--override leader.imbalance.per.broker.percentage=10 \--override log.flush.interval.messages=9223372036854775807 \--override log.flush.offset.checkpoint.interval.ms=60000 \--override log.flush.scheduler.interval.ms=9223372036854775807 \--override log.retention.bytes=-1 \--override log.retention.hours=168 \--override log.roll.hours=168 \--override log.roll.jitter.hours=0 \--override log.segment.bytes=1073741824 \--override log.segment.delete.delay.ms=60000 \--override message.max.bytes=1000012 \--override min.insync.replicas=1 \--override num.io.threads=8 \--override num.network.threads=3 \--override num.recovery.threads.per.data.dir=1 \--override num.replica.fetchers=1 \--override offset.metadata.max.bytes=4096 \--override offsets.commit.required.acks=-1 \--override offsets.commit.timeout.ms=5000 \--override offsets.load.buffer.size=5242880 \--override offsets.retention.check.interval.ms=600000 \--override offsets.retention.minutes=1440 \--override offsets.topic.compression.codec=0 \--override offsets.topic.num.partitions=50 \--override offsets.topic.replication.factor=3 \--override offsets.topic.segment.bytes=104857600 \--override queued.max.requests=500 \--override quota.consumer.default=9223372036854775807 \--override quota.producer.default=9223372036854775807 \--override replica.fetch.min.bytes=1 \--override replica.fetch.wait.max.ms=500 \--override replica.high.watermark.checkpoint.interval.ms=5000 \--override replica.lag.time.max.ms=10000 \--override replica.socket.receive.buffer.bytes=65536 \--override replica.socket.timeout.ms=30000 \--override request.timeout.ms=30000 \--override socket.receive.buffer.bytes=102400 \--override socket.request.max.bytes=104857600 \--override socket.send.buffer.bytes=102400 \--override unclean.leader.election.enable=true \--override zookeeper.session.timeout.ms=6000 \--override zookeeper.set.acl=false \--override broker.id.generation.enable=true \--override connections.max.idle.ms=600000 \--override controlled.shutdown.enable=true \--override controlled.shutdown.max.retries=3 \--override controlled.shutdown.retry.backoff.ms=5000 \--override controller.socket.timeout.ms=30000 \--override default.replication.factor=1 \--override fetch.purgatory.purge.interval.requests=1000 \--override group.max.session.timeout.ms=300000 \--override group.min.session.timeout.ms=6000 \--override log.cleaner.backoff.ms=15000 \--override log.cleaner.dedupe.buffer.size=134217728 \--override log.cleaner.delete.retention.ms=86400000 \--override log.cleaner.enable=true \--override log.cleaner.io.buffer.load.factor=0.9 \--override log.cleaner.io.buffer.size=524288 \--override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \--override log.cleaner.min.cleanable.ratio=0.5 \--override log.cleaner.min.compaction.lag.ms=0 \--override log.cleaner.threads=1 \--override log.cleanup.policy=delete \--override log.index.interval.bytes=4096 \--override log.index.size.max.bytes=10485760 \--override log.message.timestamp.difference.max.ms=9223372036854775807 \--override log.message.timestamp.type=CreateTime \--override log.preallocate=false \--override log.retention.check.interval.ms=300000 \--override max.connections.per.ip=2147483647 \--override num.partitions=1 \--override producer.purgatory.purge.interval.requests=1000 \--override replica.fetch.backoff.ms=1000 \--override replica.fetch.max.bytes=1048576 \--override replica.fetch.response.max.bytes=10485760 \--override reserved.broker.max.id=1000"volumeMounts:- name: data1mountPath: /var/lib/kafka/dataenv:- name: MY_POD_NAMEvalueFrom:fieldRef:fieldPath: metadata.name  #metadata.name获取自己pod名称添加到变量MY_POD_NAME- name: ALLOW_PLAINTEXT_LISTENERvalue: "yes"- name: KAFKA_HEAP_OPTSvalue : "-Xms1g -Xmx1g"- name: JMX_PORTvalue: "5555"volumeClaimTemplates:                     #这步自动创建pvc,并挂载动态pv- metadata:name: data1spec:accessModes: ["ReadWriteMany"]storageClassName: nfsresources:requests:storage: 10Gi
---
apiVersion: apps/v1
kind: StatefulSet
metadata:name: kafka2namespace: kafka
spec:serviceName: "kafka-cluster"   #填写无头服务的名称replicas: 1selector:matchLabels:app: kafka2template:metadata:labels:app: kafka2spec:containers:- name: kafkaimage: kafka:3.1.0imagePullPolicy: Neverresources:requests:memory: "500Mi"cpu: "500m"limits:memory: "1000Mi"cpu: "2000m"ports:- containerPort: 9092name: kafkacommand:- sh - -c - "exec /app/kafka/bin/kafka-server-start.sh /app/kafka/config/server.properties --override broker.id=2 \--override listeners=PLAINTEXT://:9092 \--override advertised.listeners=PLAINTEXT://192.168.40.181:30094 \--override zookeeper.connect=192.168.40.181:32181,192.168.40.181:32182,192.168.40.181:32183/kafka \--override log.dirs=/var/lib/kafka/data \--override auto.create.topics.enable=true \--override auto.leader.rebalance.enable=true \--override background.threads=10 \--override compression.type=producer \--override delete.topic.enable=true \--override leader.imbalance.check.interval.seconds=300 \--override leader.imbalance.per.broker.percentage=10 \--override log.flush.interval.messages=9223372036854775807 \--override log.flush.offset.checkpoint.interval.ms=60000 \--override log.flush.scheduler.interval.ms=9223372036854775807 \--override log.retention.bytes=-1 \--override log.retention.hours=168 \--override log.roll.hours=168 \--override log.roll.jitter.hours=0 \--override log.segment.bytes=1073741824 \--override log.segment.delete.delay.ms=60000 \--override message.max.bytes=1000012 \--override min.insync.replicas=1 \--override num.io.threads=8 \--override num.network.threads=3 \--override num.recovery.threads.per.data.dir=1 \--override num.replica.fetchers=1 \--override offset.metadata.max.bytes=4096 \--override offsets.commit.required.acks=-1 \--override offsets.commit.timeout.ms=5000 \--override offsets.load.buffer.size=5242880 \--override offsets.retention.check.interval.ms=600000 \--override offsets.retention.minutes=1440 \--override offsets.topic.compression.codec=0 \--override offsets.topic.num.partitions=50 \--override offsets.topic.replication.factor=3 \--override offsets.topic.segment.bytes=104857600 \--override queued.max.requests=500 \--override quota.consumer.default=9223372036854775807 \--override quota.producer.default=9223372036854775807 \--override replica.fetch.min.bytes=1 \--override replica.fetch.wait.max.ms=500 \--override replica.high.watermark.checkpoint.interval.ms=5000 \--override replica.lag.time.max.ms=10000 \--override replica.socket.receive.buffer.bytes=65536 \--override replica.socket.timeout.ms=30000 \--override request.timeout.ms=30000 \--override socket.receive.buffer.bytes=102400 \--override socket.request.max.bytes=104857600 \--override socket.send.buffer.bytes=102400 \--override unclean.leader.election.enable=true \--override zookeeper.session.timeout.ms=6000 \--override zookeeper.set.acl=false \--override broker.id.generation.enable=true \--override connections.max.idle.ms=600000 \--override controlled.shutdown.enable=true \--override controlled.shutdown.max.retries=3 \--override controlled.shutdown.retry.backoff.ms=5000 \--override controller.socket.timeout.ms=30000 \--override default.replication.factor=1 \--override fetch.purgatory.purge.interval.requests=1000 \--override group.max.session.timeout.ms=300000 \--override group.min.session.timeout.ms=6000 \--override log.cleaner.backoff.ms=15000 \--override log.cleaner.dedupe.buffer.size=134217728 \--override log.cleaner.delete.retention.ms=86400000 \--override log.cleaner.enable=true \--override log.cleaner.io.buffer.load.factor=0.9 \--override log.cleaner.io.buffer.size=524288 \--override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \--override log.cleaner.min.cleanable.ratio=0.5 \--override log.cleaner.min.compaction.lag.ms=0 \--override log.cleaner.threads=1 \--override log.cleanup.policy=delete \--override log.index.interval.bytes=4096 \--override log.index.size.max.bytes=10485760 \--override log.message.timestamp.difference.max.ms=9223372036854775807 \--override log.message.timestamp.type=CreateTime \--override log.preallocate=false \--override log.retention.check.interval.ms=300000 \--override max.connections.per.ip=2147483647 \--override num.partitions=1 \--override producer.purgatory.purge.interval.requests=1000 \--override replica.fetch.backoff.ms=1000 \--override replica.fetch.max.bytes=1048576 \--override replica.fetch.response.max.bytes=10485760 \--override reserved.broker.max.id=1000"volumeMounts:- name: data2mountPath: /var/lib/kafka/dataenv:- name: MY_POD_NAMEvalueFrom:fieldRef:fieldPath: metadata.name  #metadata.name获取自己pod名称添加到变量MY_POD_NAME- name: ALLOW_PLAINTEXT_LISTENERvalue: "yes"- name: KAFKA_HEAP_OPTSvalue : "-Xms1g -Xmx1g"- name: JMX_PORTvalue: "5555"volumeClaimTemplates:                     #这步自动创建pvc,并挂载动态pv- metadata:name: data2spec:accessModes: ["ReadWriteMany"]storageClassName: nfsresources:requests:storage: 10Gi

创建Kafka集群

[root@master ~]# kubectl apply -f  kafka.yaml 

 查看Pod状态

  

至此,Kafka集群部署完成

💕💕💕每一次的分享都是一次成长的旅程,感谢您的陪伴和关注。希望这些关于Kafka的文章能陪伴您走过技术的一段旅程,共同见证成长和进步!😺😺😺

🧨🧨🧨让我们一起在技术的海洋中探索前行,共同书写美好的未来!!! 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/341920.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

金融科技发展报告:移动支付的市场格局、趋势与优缺点

一、引言 随着科技的飞速发展,金融科技已成为推动全球经济发展的重要力量。移动支付作为金融科技的重要分支,其市场格局与趋势日益受到业界的关注。本报告将详细剖析移动支付的市场格局,并深入探讨其发展趋势,同时结合相关案例和数据,为读者呈现移动支付的优缺点,以及国…

idea 中:运行 Application 时出错。命令行过长

一、问题描述: idea 导入新项目,在编译后,运行项目时,报以下错误: 14:47 运行 Application 时出错运行 Application 时出错。命令行过长。通过 JAR 清单或通过类路径文件缩短命令行,然后重新运行。二、问题…

小程序丨最大填表限制如何开启?

老师在新建填表时,希望设置最大数量限制,若填表达到限制,后续的学生将不能继续提交填表。 通过开启【表格最大限制】功能即可实现,下面就来教大家如何制作吧。 🔎如何开启表格最大限制功能? 按照常规流程…

C++结合OpenCV进行图像处理与分类

⭐️我叫忆_恒心,一名喜欢书写博客的在读研究生👨‍🎓。 如果觉得本文能帮到您,麻烦点个赞👍呗! 近期会不断在专栏里进行更新讲解博客~~~ 有什么问题的小伙伴 欢迎留言提问欧,喜欢的小伙伴给个三…

GPT-4o:重塑人机交互的未来

一个愿意伫立在巨人肩膀上的农民...... 一、推出 在人工智能(AI)领域,自然语言处理(NLP)技术一直被视为连接人类与机器的桥梁。近年来,随着深度学习技术的快速发展,NLP领域迎来了前所未有的变革…

使用正则表达式分割字符串

自学python如何成为大佬(目录):https://blog.csdn.net/weixin_67859959/article/details/139049996?spm1001.2014.3001.5501 split()方法用于实现根据正则表达式分割字符串,并以列表的形式返回。其作用同字符串对象的split()方法类似,所不同的就是分割…

【Vue】路由的基本使用

文章目录 一、固定5个固定的步骤二、代码示例三、两个核心步骤四、完整代码 vue-router插件作用 修改地址栏路径时,切换显示匹配的组件 说明 Vue 官方的一个路由插件,是一个第三方包 官网 https://v3.router.vuejs.org/zh/ VueRouter的使用&#xff0…

【java基础】内部类

1、 非静态成员内部类可以访问所在类的全部方法和对象(就相当于一个对象方法(属于对象阶层和非静态方法同时加载在类加载之后)) 2、非静态成员内部类无法在该类(就是非静态成员内部类所在的类)的静态方法中…

期望18K,4年前端Cvte 视源股份一面挂

一面 1、自我介绍?毕业的时候一直在 xx 公司,你基本都在做什么项目? 2、你讲一下你主要负责哪一块的?balabala 3、你们的 json 是怎么定义组件间的联动的? 4、怎么确定区分两个 input? 5、你们是怎么触…

3. 使用tcpdump抓取rdma数据包

系列文章 第1章 多机多卡运行nccl-tests 和channel获取第2章 多机多卡nccl-tests 对比分析第3章 使用tcpdump抓取rdma数据包 目录 系列文章一、准备工作1. 源码编译tcpdump2. 安装wireshark 二、Tcpdump抓包三、Wireshark分析 一、准备工作 1. 源码编译tcpdump 使用 tcpdump…

基于web的垃圾分类回收系统的设计

管理员账户功能包括:系统首页,个人中心,管理员管理,用户管理,公告管理,运输管理,基础数据管理 用户账户功能包括:系统首页,个人中心,运输管理,公告…

基于Django的博客系统之用HayStack连接elasticsearch增加搜索功能(五)

上一篇:搭建基于Django的博客系统数据库迁移从Sqlite3到MySQL(四) 下一篇:基于Django的博客系统之增加类别导航栏(六) 功能概述 添加搜索框用于搜索博客。 需求详细描述 1. 添加搜索框用于搜索博客 描…

C语言 | Leetcode C语言题解之第133题克隆图

题目: 题解: struct Node** visited; int* state; //数组存放结点状态 0:结点未创建 1:仅创建结点 2:结点已创建并已填入所有内容void bfs(struct Node* s) {if (visited[s->val] && state[s->val] 2…

图片和PDF展示预览、并支持下载

需求 展示图片和PDF类型&#xff0c;并且点击图片或者PDF可以预览 第一步&#xff1a;遍历所有的图片和PDF列表 <div v-for"(data,index) in parerFont(item.fileInfo)" :key"index" class"data-list-item"><downloadCard :file-inf…

【面试八股总结】锁:互斥锁、自旋锁、读写锁、乐观锁、悲观锁

使用加锁操作和解锁操作可以解决并发线程/进程的互斥问题。任何想进入临界区的线程&#xff0c;必须先执行加锁操作。若加锁操作顺利通过&#xff0c;则线程可进入临界区&#xff1b;在完成对临界资源的访问后再执行解锁操作&#xff0c;以释放该临界资源。 一、互斥锁与自旋锁…

【C语言】详解函数(下)(庖丁解牛版)

文章目录 1. 前言2. 数组做函数形参3. 函数嵌套调用和链式访问3.1 嵌套调用3.2 链式访问 1. 前言 详解C语言函数(上)的链接&#xff1a;http://t.csdnimg.cn/EGsfe 经过对函数的初步了解之后,相信大家已经对C语言标准库里的函数已经有初步的认知了&#xff0c;并且还学会了如…

Linux系统Docker部署Apache Superset并实现远程访问详细流程

目录 前言 1. 使用Docker部署Apache Superset 1.1 第一步安装docker 、docker compose 1.2 克隆superset代码到本地并使用docker compose启动 2. 安装cpolar内网穿透&#xff0c;实现公网访问 3. 设置固定连接公网地址 前言 作者简介&#xff1a; 懒大王敲代码&#xff0…

x86国产化麒麟系统上安装docker及问题解决

以前感觉安装docker没有问题&#xff0c;所以没有记录怎么安装的&#xff0c;最近在国产化系统上安装docker总是失败&#xff0c;经过仔细研究完全解决了该问题&#xff0c;特此记录。 参考链接&#xff1a; 在 OpenKylin 上安装 Docker 按照上面的链接可以知道整个docker安装…

浏览器工作原理

主要分为导航、获取数据、HTML解析、css解析、执行javaScript、渲染树几个步骤。 1.导航 DNS查询 DNS服务器类似于电话簿&#xff0c;里面包含公共的IP地址以及相关主机名数据库&#xff0c;我们输入一个域名&#xff0c;他能帮我们映射到对应的IP地址。&#xff08;第一次查…

Latex之图片排列的简单使用(以MiKTeX工具为例)

一、参考资料 Latex如何插入图片 Latex 学术撰写工具推荐&#xff08;在线、Windows、Mac、Linux&#xff09; 关于Latex并排多张图片及加入图片说明的方法 二、准备工作 1. 在线LaTex工具 Overleaf 2. 本地LaTex工具 MiKTeX 3. 测试用例 \documentclass{article} \ti…