这套配置适用于TCP代理和公网访问,kafka版本2.8,版本如果不同配置参数会有一些差异,原理一致
分几种场景,正常来说我们直接使用kafka IP地址访问就行,考虑到网络架构和环境安全,需要使用公网或代理访问kafka时就需要对kafka进行一些额外配置
EXTERNAL这个地址需要监听本地地址,之后kafka会监听这个端口,此端口不能和INTERNAL端口一一样,用9093或者其他
advertised.listeners已经测试了NLB的连接,这里需要获取到NLB IP,DNS我没有测试,DNS域名太长
EXTERNAL可以配置多个,但一般很少这么用的
如果只是调试,暴露一个节点就行,如果需要暴露整个集群,那就在其他节点上也进行一样的配置
broker.id=15
delete.topic.enable=true
# 不需要外部访问时的配置
#inter.broker.listener.name=SASL_PLAINTEXT
#listeners=SASL_PLAINTEXT://10.198.170.25:9092
# 监听器和安全配置
listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
advertised.listeners=INTERNAL://10.198.170.15:9092,EXTERNAL://NLB或公网IP:9093
listener.security.protocol.map=INTERNAL:SASL_PLAINTEXT,EXTERNAL:SASL_PLAINTEXT
inter.broker.listener.name=INTERNAL
如果只配置一个节点,其他节点也需要按照如下进行配置,只是一个节点拥有公网IP
1、监听器名称需要统一:
SASL_PLAINTEXT 需要是INTERNAL
inter.broker.listener.name 也要对应修改
2、需要添加的配置:
listener.security.protocol.map
advertised.listeners
3、格式统一:
listeners 使用 0.0.0.0 而不是具体 IP
advertised.listeners 使用具体 IP
其他节点配置参考,和主节点配置完全一样
# 外部监听器配置
listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
advertised.listeners=INTERNAL://10.198.170.26:9092,EXTERNAL://NLB或公网IP:9093
listener.security.protocol.map=INTERNAL:SASL_PLAINTEXT
inter.broker.listener.name=INTERNAL
完整配置
如果出现了无法监听的问题,考虑将0.0.0.0:9092改为10.198.170.25:9092
broker.id=25
delete.topic.enable=true# 监听器和安全配置
#inter.broker.listener.name=SASL_PLAINTEXT
#listeners=SASL_PLAINTEXT://10.198.170.25:9092
listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
advertised.listeners=INTERNAL://10.198.170.25:9092,EXTERNAL://175.41.173.200:9093
listener.security.protocol.map=INTERNAL:SASL_PLAINTEXT,EXTERNAL:SASL_PLAINTEXT
inter.broker.listener.name=INTERNAL
# 认证相关配置
security.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
sasl.enabled.mechanisms=SCRAM-SHA-256,PLAIN
allow.everyone.if.no.acl.found=true
super.users=User:admin
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer# Kafka Basic Settings
num.network.threads=5
num.io.threads=8
socket.send.buffer.bytes=10240000
socket.receive.buffer.bytes=10240000
socket.request.max.bytes=1048576000
log.dirs=/data/kafka/data
num.partitions=1
default.replication.factor=3
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.198.170.27:2181,10.198.170.26:2181,10.198.170.25:2181
zookeeper.connection.timeout.ms=12000
zookeeper.session.timeout.ms=12000
group.initial.rebalance.delay.ms=500
log.flush.interval.messages=10000
log.flush.interval.ms=1000
num.replica.fetchers=3
replica.fetch.min.bytes=1
replica.fetch.max.bytes=104857600
unclean.leader.election.enable=false
auto.create.topics.enable=true
min.isync.replicas=2
replica.socket.receive.buffer.bytes=65536
replica.socket.timeout.ms=30000
replica.lag.time.max.ms=5000
replica.fetch.wait.max.ms=1000
log.message.timestamp.type=LogAppendTime
log.cleanup.policy=delete
log.roll.hours=168
broker.rack=kafka-rac25
message.max.bytes=10000000
request.timeout.ms=30000
错误处理
1、集群节点监听器不一致导致无法load数据
25配置了两个监听器 ,26 27配置了一个监听器,这种情况就是26 27同步25的配置
[2025-01-02 19:53:48,249] ERROR [MetadataCache brokerId=25] Listeners are not identical across brokers: LongMap(26 -> Map(ListenerName(INTERNAL) -> 10.198.170.26:9092 (id: 26 rack: null)), 27 -> Map(ListenerName(INTERNAL) -> 10.198.170.27:9092 (id: 27 rack: null)), 25 -> Map(ListenerName(EXTERNAL) -> 175.41.173.200:9093 (id: 25 rack: null), ListenerName(INTERNAL) -> 10.198.170.25:9092 (id: 25 rack: null))) (kafka.server.ZkMetadataCache)
[2025-01-02 19:53:48,340] ERROR [MetadataCache brokerId=25] Listeners are not identical across brokers: LongMap(26 -> Map(ListenerName(INTERNAL) -> 10.198.170.26:9092 (id: 26 rack: null)), 27 -> Map(ListenerName(INTERNAL) -> 10.198.170.27:9092 (id: 27 rack: null)), 25 -> Map(ListenerName(EXTERNAL) -> 175.41.173.200:9093 (id: 25 rack: null), ListenerName(INTERNAL) -> 10.198.170.25:9092 (id: 25 rack: null))) (kafka.server.ZkMetadataCache)
2、集群配置有问题,集群节点无法同步
可用的broker就一个,应该只有本机是可用的,无法联系到其他broker,确认集群节点都监听了同一个监听器,每台节点上都查看下日志,确认kafka端口正常启动tail -100f /data/kafka/logs/server.log
org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 1.
[2025-01-02 20:24:46,117] INFO [Admin Manager on Broker 15]: Error processing create topic request CreatableTopic(name='__consumer_offsets', numPartitions=50, replicationFactor=3, assignments=[], configs=[CreateableTopicConfig(name='compression.type', value='producer'), CreateableTopicConfig(name='cleanup.policy', value='compact'), CreateableTopicConfig(name='segment.bytes', value='104857600')]) (kafka.server.ZkAdminManager)
org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 1.
3、EXTERNAL监听器未配置安全协议
监听的安全协议里面加上EXTERNAL
listener.security.protocol.map=INTERNAL:SASL_PLAINTEXT,EXTERNAL:SASL_PLAINTEXT
[2025-01-02 20:26:23,988] ERROR Exiting Kafka due to fatal exception (kafka.Kafka$)
java.lang.IllegalArgumentException: Error creating broker listeners from 'INTERNAL://10.198.170.16:9092,EXTERNAL://公网IP或者NLB:9093': No security protocol defined for listener EXTERNAL
已经测试两个集群,访问没有问题,NLB同理