cd /opt/software
把kafka压缩包拉进来 并解压到/usr下
tar -xzvf /opt/software/kafka_2.12-2.7.0.tgz -C /usr/
改名
mv /usr/kafka_2.12-2.7.0/ /usr/kafka
配置环境变量
vim /etc/profile大写G定位到最后一行 在 o 在下一行添加kefka环境变量export JAVA_HOME=/usr/java/jdk1.8.0_151export KAFKA_HOME=/usr/kafkaexport PATH=$PATH:$JAVA_HOME/bin:$KAFKA_HOME/bin
让其生效
source /etc/profile
建一个logs目录,下面配置日志放在这里
cd /usr/kafka && mkdir logs
修改kafka/config/server.properties
vim /usr/kafka/config/server.properties#每一台机子的唯一识别
broker.id=0
#topic删除策略 把消息放在kafka要先建topic,把消息放在topic,如果没有这句话就是假删除(一旦公司收集到你的信息,就不会轻易删掉,java代码中会设置个字段让你查不到,他的数据库还会有你的信息,有了这句话就是真删除)
delete.topic.enable=true
#这里去公司了让你做优化可能会用到这里,最多用三个线程处理请求,只要搞不死服务器,线程越大越好
num.network.threads=3
#向磁盘上读写数据 八个磁盘io处理数据
num.io.threads=8
#kafka接受消息,存消息一类一类存用topic,消息凑够一批(一个一个存太慢)放在topic,凑够这一批的地方叫缓存区buffer,放在缓存里面,达到设置的发送缓存标准,就发送给类topic
#发送的缓存区 够了就发送
socket.send.buffer.bytes=102400
#接收的缓存区 够了就接收
socket.receive.buffer.bytes=102400
#请求的缓冲区最大值,超过这个会报OOM异常
socket.request.max.bytes=104857600
#存日志的地方,上面建了个logs目录
log.dirs=/usr/kafka/logs
#把类topic分区
num.partitions=1
#恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
#消息保存一周,就删除
log.retention.hours=168
#大于3G就删,独立于log.retention.hours的功能
#log.retention.bytes=3*1073741824
#超过1G就创建新的 不用上面的(3*1073741824)无限大
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
#配置连接Zookeeper集群地址
zookeeper.connect=zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
#连zk连了十八秒还连不上就不连了
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0#保存退出
:wq
配置连接Zookeeper集群地址
vim /etc/hosts192.168.58.91 kafka1
192.168.58.92 kafka2
192.168.58.93 kafka3
192.168.58.81 zookeeper-1
192.168.58.82 zookeeper-2
192.168.58.83 zookeeper-3#保存退出
:wq
修改生产者配置
vim /usr/kafka/config/producer.propertiesbootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092
#消息太多压缩,这里是none指定了不对数据进行任何压缩,可选其他压缩方式(gzip, snappy, lz4, zstd)
compression.type=none#保存退出
:wq
修改消费者配置
vim /usr/kafka/config/consumer.propertieskafka1:9092,kafka2:9092,kafka3:9092
#组名称,一个组的消费者组名一样
group.id=test-consumer-group:wq
kafka1配好了,准备把配好的发射给kafka2,kafka3 ,但是有的还得改
生成 SSH 密钥对
#这里为了方便不设盐值,直接回车到底
ssh-keygen
将 SSH 公钥复制到远程服务器
ssh-copy-id kafka1yes
123456ssh-copy-id kafka2yes
123456ssh-copy-id kafka3yes
123456
发射 因为是目录里面有东西 所以要 -r 递归发射
scp -r /usr/kafka/ kafka2:/usr/scp -r /usr/kafka/ kafka3:/usr/
再次发射
scp /etc/profile/ kafka2:/etc/scp /etc/profile/ kafka3:/etc/在all Sessions 里面 ,查看是否添加成功
tail -5 /etc/profile在all Sessions 里面 让环境变量生效
source /etc/profile
再再发射
scp /etc/hosts kafka2:/etc
scp /etc/hosts kafka3:/etc
kafka2,kafka3
vim /usr/kafka/config/server.properties进去后把broker.id=0改改,三台机子不能一样kafka1 : broker.id=0kafka2 : broker.id=1kafka3 : broker.id=2 随便改只要不一样,是正数就行
在zookeeper-1 ,zookeeper-2 ,zookeeper-3 vim /etc/hosts 里面添加
192.168.58.91 kafka1
192.168.58.92 kafka2
192.168.58.93 kafka3
启动kafka之前一定要保证zk在启动,并且可用
启动zk
./shell/zk_start_stop.sh#连接
zkCli.sh
启动kafka 在kafka1,kafka2,kafka3 运行下面两个命令
#启动kafka
kafka-server-start.sh -daemon /usr/kafka/config/server.properties#查看是否运行
jps
查看当前服务器中topic主题 发现没有
kafka-topics.sh --zookeeper zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181 --list
创建topic: list
#--zookeeper zookeeper-1:2181指定了ZooKeeper服务器的地址和端口
#--create创建一个新的主题
#--replication-factor 3设置了主题的副本因子,Kafka集群中至少需要有3个可用的broker(Kafka代理),不能大于zookeeper机子数量
#--partitions 5 主题的分区数量
#--topic tp1 主题的名称
kafka-topics.sh --zookeeper zookeeper-1:2181 --create --replication-factor 3 --partitions 5 --topic tp1在查topic主题kafka-topics.sh --zookeeper zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181 --list
在kafka1上 模拟消费者
kafka-console-consumer.sh --bootstrap-server kafka1:9092 --from-beginning --topic tp1
在kafka2上 模拟生产者
kafka-console-producer.sh --broker-list kafka2:9092 --topic tp1
生产完后,kafka1上 消费者那里就会有
--------------------------------------------
删除topic:
kafka-topics.sh --zookeeper cluster1:2181 --delete --topic tp1
----------------------------------------------------------------------------------------------------------------
如果你有错误,可以去看日志文件
cat /usr/kafka/logs/kafkaServer.out