Official documentation: http://kafka.apache.org/documentation.html
(Kafka ships with a bundled ZooKeeper, but a standalone ZooKeeper cluster is still recommended.)
Environment for the 3-node Kafka cluster:
OS: CentOS 7
Firewall: disabled on all nodes
Machines: the 3 nodes of the ZooKeeper cluster, plus 1 Logstash node
Software versions: zookeeper-3.4.12.tar.gz and kafka_2.12-2.1.0.tgz
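For reference, a minimal zoo.cfg for the 3-node ZooKeeper ensemble might look like the following sketch (dataDir is an assumption; match it to your ZooKeeper install, and put the matching myid file on each node):
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper
clientPort=2181
server.1=192.168.148.141:2888:3888
server.2=192.168.148.142:2888:3888
server.3=192.168.148.143:2888:3888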
Install the software
(on all 3 ZooKeeper cluster machines)
# tar xf kafka_2.12-2.1.0.tgz -C /usr/local/
# ln -s /usr/local/kafka_2.12-2.1.0/ /usr/local/kafka
Create the data directory (on all 3 nodes)
# mkdir -p /data/kafka-logs
Edit the configuration file on the first node
(only broker.id and listeners differ between nodes; the inline # annotations below are explanatory and must not be copied into the real file, since Java properties files treat everything after the = as part of the value)
# egrep -v "^$|^#" /usr/local/kafka/config/server.properties
broker.id=1 #unique ID of this broker in the cluster, same idea as ZooKeeper's myid
listeners=PLAINTEXT://192.168.148.141:9092 #listening socket
num.network.threads=3 #number of threads the broker uses for network processing
num.io.threads=8 #number of threads the broker uses for disk I/O
socket.send.buffer.bytes=102400 #TCP send buffer size; data is held in the buffer and sent in batches, which improves performance
socket.receive.buffer.bytes=102400 #TCP receive buffer size for data arriving at the broker
socket.request.max.bytes=104857600 #maximum size of a single produce/fetch request; must not exceed the JVM heap size
log.dirs=/data/kafka-logs #directory where messages are stored; may be a comma-separated list, and num.io.threads above should be no smaller than the number of directories
#with multiple directories, a newly created partition is placed in the directory holding the fewest partitions
num.partitions=1 #default number of partitions per topic
num.recovery.threads.per.data.dir=1 #threads per data directory for log recovery at startup and log flushing at shutdown; default 1
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168 #maximum retention time for messages: 168 hours = 7 days
message.max.bytes=5242880 #maximum size of a message, 5 MB
default.replication.factor=2 #number of replicas Kafka keeps for each partition
replica.fetch.max.bytes=5242880 #maximum bytes fetched per replica fetch request
log.segment.bytes=1073741824 #messages are appended to segment files on disk; when a segment exceeds this size, Kafka rolls a new file
log.retention.check.interval.ms=300000 #every 300000 ms (5 minutes), check whether any segments have exceeded the retention time above and delete them if so
zookeeper.connect=192.168.148.141:2181,192.168.148.142:2181,192.168.148.143:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
Edit the configuration files on the other two nodes
# scp /usr/local/kafka/config/server.properties kafka-2:/usr/local/kafka/config/
broker.id=2
listeners=PLAINTEXT://192.168.148.142:9092
# scp /usr/local/kafka/config/server.properties kafka-3:/usr/local/kafka/config/
broker.id=3
listeners=PLAINTEXT://192.168.148.143:9092
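Since only these two lines differ per node, the edits can also be scripted; a convenience sketch for kafka-2 (adjust the ID and IP for kafka-3):
# sed -i -e 's/^broker.id=1/broker.id=2/' -e 's/192.168.148.141:9092/192.168.148.142:9092/' /usr/local/kafka/config/server.properties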
Start Kafka (on all 3 nodes)
[root@host1 ~]# /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &
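Alternatively, the startup script's -daemon flag backgrounds the broker cleanly, with output going to the logs/ directory under the Kafka install:
[root@host1 ~]# /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties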
Check that it is running (on all 3 nodes)
[root@host1 ~]# jps
10754 QuorumPeerMain
11911 Kafka
12287 Jps
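Optionally confirm that each broker is listening on its port:
[root@host1 ~]# ss -lntp | grep 9092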
Create a topic to verify the cluster
[root@host1 ~]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.148.143:2181 --replication-factor 2 --partitions 1 --topic cien
The output Created topic "cien" confirms the topic was created successfully.
Start a producer on one server
[root@host2 ~]# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.148.141:9092 --topic cien
> hello kafka
> ni hao ya
>
Start a consumer on another server
[root@host3 ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.148.142:9092 --topic cien --from-beginning
...
hello kafka
ni hao ya
If the consumer receives both messages, the Kafka deployment is working.
[root@host3 ~]# /usr/local/kafka/bin/kafka-topics.sh --zookeeper 192.168.148.141:2181 --list #list all topics
[root@host3 ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.148.141:2181 --topic qianfeng #show details of a specific topic
Topic:qianfeng PartitionCount:1 ReplicationFactor:2 Configs:
Topic: qianfeng Partition: 0 Leader: 2 Replicas: 2,3 Isr: 2,3
[root@host3 ~]# /usr/local/kafka/bin/kafka-topics.sh --delete --zookeeper 192.168.148.141:2181 --topic qianfeng #delete a topic
Topic qianfeng is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
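On Kafka 2.1 delete.topic.enable already defaults to true; on older brokers, deletion only takes effect if it is set explicitly in server.properties on every broker (followed by a restart):
delete.topic.enable=true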
Integrate the ELFK cluster with ZooKeeper and Kafka
Configure the first Logstash node to produce messages into Kafka
yum -y install wget
wget https://d6.injdk.cn/oraclejdk/8/jdk-8u341-linux-x64.rpm
yum localinstall jdk-8u341-linux-x64.rpm -y
java -version
1. Install Logstash
tar xf logstash-6.4.1.tar.gz -C /usr/local
ln -s /usr/local/logstash-6.4.1 /usr/local/logstash
2. Edit the main configuration file so the Logstash monitoring API listens on all interfaces
cd /usr/local/logstash/config/
vim logstash.yml
http.host: "0.0.0.0"
3. Write the pipeline configuration
No filter stage is needed; Logstash writes the raw message content into the Kafka queue.
# cd /usr/local/logstash/config/
# vim logstash-kafka.conf
input {
  file {
    type => "sys-log"
    path => "/var/log/messages"
    start_position => "beginning"
  }
}
output {
  kafka {
    bootstrap_servers => "192.168.148.141:9092,192.168.148.142:9092,192.168.148.143:9092"  # output to the Kafka cluster
    topic_id => "sys-log-messages"  # topic name
    compression_type => "snappy"    # compression type
    codec => "json"
  }
}
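snappy keeps the CPU cost of compression low while still shrinking the payload noticeably, and the json codec serializes the whole Logstash event (including the type field) so the consuming side can decode it with a matching json codec.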
Start Logstash
# /usr/local/logstash/bin/logstash -f logstash-kafka.conf
Listing the topics on a Kafka node shows that sys-log-messages now exists, so the writes succeeded.
[root@host2 ~]# /usr/local/kafka/bin/kafka-topics.sh --zookeeper 192.168.148.141:2181 --list
__consumer_offsets
qianfeng
sys-log-messages
[root@host2 ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.148.141:2181 --topic sys-log-messages
Topic:sys-log-messages PartitionCount:1 ReplicationFactor:2 Configs:
Topic: sys-log-messages Partition: 0 Leader: 3 Replicas: 3,2 Isr: 3,2
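To watch events actually flow through the topic, generate a test log line and consume it; a quick check (the logger line lands in /var/log/messages via rsyslog on CentOS 7):
# logger "kafka pipeline test"
# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.148.141:9092 --topic sys-log-messages --from-beginning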
Configure the second Logstash node to consume the logs from Kafka and ship them to the ES cluster
# cat kafka-es.conf
input {
  kafka {
    bootstrap_servers => "192.168.148.141:9092,192.168.148.142:9092,192.168.148.143:9092"
    topics => ["sys-log-messages"]  # Kafka topic name
    codec => "json"
    auto_offset_reset => "earliest"
  }
}
output {
  elasticsearch {
    hosts => ["192.168.148.131:9200","192.168.148.132:9200"]
    index => "kafka-%{type}-%{+YYYY.MM.dd}"
  }
}
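Start this instance the same way and then confirm the daily index appears in Elasticsearch; the curl check below is a suggestion, pointed at one assumed ES node:
# /usr/local/logstash/bin/logstash -f kafka-es.conf
# curl -s '192.168.148.131:9200/_cat/indices?v' | grep kafka-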