2.部署kafka:9092

官方文档:http://kafka.apache.org/documentation.html

(虽然kafka中集成了zookeeper,但还是建议使用独立的zk集群)

Kafka3台集群搭建环境:

操作系统: centos7

防火墙:全关

3台zookeeper集群内的机器,1台logstash

软件版本: zookeeper-3.4.12.tar.gz

软件版本kafka_2.12-2.1.0.tgz

安装软件

(3台zookeeper集群的机器)

# tar xf kafka_2.12-2.1.0.tgz -C /usr/local/

# ln -s /usr/local/kafka_2.12-2.1.0/ /usr/local/kafka

创建数据目录(3台)

# mkdir /data/kafka-logs

修改第一台配置文件

(注意不同颜色标记的部分)

# egrep -v "^$|^#" /usr/local/kafka/config/server.properties

broker.id=1 #当前机器在集群中的唯一标识,和zookeeper的myid性质一样

listeners=PLAINTEXT://192.168.148.141:9092 #监听套接字

num.network.threads=3 #这个是borker进行网络处理的线程数

num.io.threads=8 #这个是borker进行I/O处理的线程数

socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能

socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘

socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小

log.dirs=/data/kafka-logs #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数

#如果配置多个目录,新创建的topic把消息持久化在分区数最少那一个目录中

num.partitions=1 #默认的分区数,一个topic默认1个分区数

num.recovery.threads.per.data.dir=1 #在启动时恢复日志和关闭时刷新日志时每个数据目录的线程的数量,默认1

offsets.topic.replication.factor=2

transaction.state.log.replication.factor=1

transaction.state.log.min.isr=1

log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天

message.max.byte=5242880 #消息保存的最大值5M

default.replication.factor=2 #kafka保存消息的副本数

replica.fetch.max.bytes=5242880 #取消息的最大字节数

log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件

log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间,到目录查看是否有过期的消息如果有,删除

zookeeper.connect=192.168.148.141:2181,192.168.148.142:2181,192.168.148.143:2181

zookeeper.connection.timeout.ms=6000

group.initial.rebalance.delay.ms=0

修改另外两台配置文件

#scp /usr/local/kafka/config/server.properties kafka-2:/usr/local/kafka/config/

broker.id=2

listeners=PLAINTEXT://192.168.148.142:9092

# scp /usr/local/kafka/config/server.properties kafka-3:/usr/local/kafka/config/

broker.id=3

listeners=PLAINTEXT://192.168.148.143:9092

启动kafka(3台)

[root@host1 ~]# /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &

查看启动情况(3台)

[root@host1 ~]# jps

10754 QuorumPeerMain

11911 Kafka

12287 Jps

创建topic来验证

[root@host1 ~]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.148.143:2181 --replication-factor 2 --partitions 1 --topic cien

出现Created topic "cien"验证成功运行

在一台服务器上创建一个发布者

[root@host2 ~]# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.148.141:9092 --topic cien

> hello kafka

> ni hao ya

>

在另一台服务器上创建一个订阅者

[root@host3 ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.148.142:9092 --topic cien --from-beginning

...

hello kafka

ni hao ya

如果都能接收到,说明kafka部署成功!

[root@host3 ~]# /usr/local/kafka/bin/kafka-topics.sh --zookeeper 192.168.10.23:2181 --list #查看所有topic

[root@host3 ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.10.23:2181 --topic qianfeng #查看指定topic的详细信息

Topic:qianfeng PartitionCount:1 ReplicationFactor:2 Configs:

Topic: qianfeng Partition: 0 Leader: 2 Replicas: 2,3 Isr: 2,3

[root@host3 ~]# /usr/local/kafka/bin/kafka-topics.sh --delete --zookeeper 192.168.10.23:2181 --topic qianfeng #删除topic

Topic qianfeng is marked for deletion.

Note: This will have no impact if delete.topic.enable is not set to true.

配置elfk集群订阅和zookeeper和kafka

配置第一台logstash生产消息输出到kafka

yum -y install wget

wget https://d6.injdk.cn/oraclejdk/8/jdk-8u341-linux-x64.rpm

yum localinstall jdk-8u341-linux-x64.rpm -y

java -version

1.安装logstash

tar xf logstash-6.4.1.tar.gz -C /usr/local

ln -s /usr/local/logstash-6.4.1 /usr/local/logstash

2.修改配置文件

cd /usr/local/logstash/config/

vim logstash.yml

http.host: "0.0.0.0"

3.编写配置文件

不要过滤, logstash会将message内容写入到队列中

# cd /usr/local/logstash/config/

# vim logstash-kafka.conf

input {file {type => "sys-log"path => "/var/log/messages"start_position => beginning}
}
output {kafka {bootstrap_servers => "192.168.148.141:9092,192.168.148.142:9092,192.168.148.143:9092"     #输出到kafka集群topic_id => "sys-log-messages"         #主题名称compression_type => "snappy"         #压缩类型codec =>  "json"}
}

启动logstash

# /usr/local/logstash/bin/logstash -f logstash-kafka.conf

在kafka上查看主题,发现已经有了sys-log-messages,说明写入成功了

[root@host2 ~]# /usr/local/kafka/bin/kafka-topics.sh --zookeeper 192.168.148.141:2181 --list

__consumer_offsets

qianfeng

sys-log-messages

[root@host2 ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.148.141:2181 --topic sys-log-messages

Topic:sys-log-messages PartitionCount:1 ReplicationFactor:2 Configs:

Topic: sys-log-messages Partition: 0 Leader: 3 Replicas: 3,2 Isr: 3,2

配置第二台logstash,订阅kafka日志,输出到es集群

# cat kafka-es.conf

input {kafka {bootstrap_servers => "192.168.148.141:9092,192.168.148.142:9092,192.168.148.143:9092" topics => "sys-log-messages"          #kafka主题名称codec => "json"auto_offset_reset => "earliest"}
}output {elasticsearch {hosts => ["192.168.148.131:9200","192.168.148.132:9200"]index => "kafka-%{type}-%{+YYYY.MM.dd}"}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/24077.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

IO进程 day05

IO进程 day05 9. 进程9. 9. 守护进程守护进程的特点守护进程创建步骤 10. 线程10.1. 线程的概念10.2. 进程和线程的区别10.2. 线程资源10.3. 线程的函数接口1. pthread_create-创建线程线程函数和普通函数的区别 2. pthread_exit3.线程资源回收函数join和detach的区别 获取线程…

数字IC低功耗后端设计实现之power gating和isolation技术

考虑低功耗设计需求,下图中间那个功能模块是需要做power domain的,即这个模块需要插MTCMOS。需要开启时,外面的VDD会和这个模块的LOCAL VDD形成通路,否则就是断开即power off状态。 这些低功耗设计实现经验,你真的懂了…

使用 Open3D 批量渲染并导出固定视角点云截图

一、前言 在三维点云处理与可视化中,固定视角批量生成点云渲染截图是一个常见的需求。例如,想要将同一系列的点云(PCD 文件)在同样的视角下生成序列图片,以便后续合成为视频或进行其他可视化演示。本文将介绍如何使用…

c++的继承

封装、继承和多态是c的三大特性,他们的关系甚为紧密 封装的概念简单易懂,其实就是将数据和操作数据的方法结合在一起,形成一个独立的单元(类),通过访问控制符(如private、protected和public&…

3dtiles平移旋转工具制作

3dtiles平移旋转缩放原理及可视化工具实现 背景 平时工作中,通过cesium平台来搭建一个演示场景是很常见的事情。一般来说,演示场景不需要多完善的功能,但是需要一批三维模型搭建,如厂房、电力设备、园区等。在实际搭建过程中&…

我是如何从 0 到 1 找到 Web3 工作的?

作者:Lotus的人生实验 关于我花了一个月的时间,从 0 到 1 学习 Web3 相关的知识和编程知识。然后找到了一个 Web3 创业公司实习的远程工作。 👇👇👇 我的背景: 计算机科班,学历还可以(大厂门槛水平) 毕业工…

进程状态(R|S|D|t|T|X|Z)、僵尸进程及孤儿进程

文章目录 一.进程状态进程排队状态:运行、阻塞、挂起 二.Linux下的进程状态R 运行状态(running)S 睡眠状态(sleeping)D 磁盘休眠状态(Disk sleep)t 停止、暂停状态(tracing stopped)T 停止、暂停状态(stopp…

为什么要将PDF转换为CSV?CSV是Excel吗?

在企业和数据管理的日常工作中,PDF文件和CSV文件承担着各自的任务。PDF通常用于传输和展示静态的文档,而CSV因其简洁、易操作的特性,广泛应用于数据存储和交换。如果需要从PDF中提取、分析或处理数据,转换为CSV格式可能是一个高效…

Starlink卫星动力学系统仿真建模第十讲-基于SMC和四元数的卫星姿态控制示例及Python实现

基于四元数与滑模控制的卫星姿态控制 一、基本原理 1. 四元数姿态表示 四元数运动学方程: 3. 滑模控制设计 二、代码实现(Python) 1. 四元数运算工具 import numpy as npdef quat_mult(q1, q2):"""四元数乘法""…

CSS—引入方式、选择器、复合选择器、文字控制属性、CSS特性

目录 CSS 1.引入方式 2.选择器 3.复合选择器 4.文字控制属性 5.CSS特性 CSS 层叠样式表,是一种样式表语言,用来描述HTML文档的呈现 书写时一般按照顺序:盒子模型属性—>文字样式—>圆角、阴影等修饰属性 1.引入方式 引入方式方…

OpenHarmony-4.基于dayu800 GPIO 实践(2)

基于dayu800 GPIO 进行开发 1.DAYU800开发板硬件接口 LicheePi 4A 板载 2x10pin 插针,其中有 16 个原生 IO,包括 6 个普通 IO,3 对串口,一个 SPI。TH1520 SOC 具有4个GPIO bank,每个bank最大有32个IO:  …

win11 24h2 远程桌面 频繁断开 已失去连接 2025

一、现象 Windows11自升级2025年2月补丁后版本号为系统版本是26100.3194,远程桌面频繁断开连接,尝试连接,尤其在连接旧的server2012 二、临时解决方案 目前经测试,在组策略中,远程桌面连接客户端,关闭客户…

rust学习笔记6-数组练习704. 二分查找

上次说到rust所有权看看它和其他语言比有什么优势,就以python为例 # Python3 def test():a [1, 3, -4, 7, 9]print(a[4])b a # 所有权没有发生转移del b[4]print(a[4]) # 由于b做了删除,导致a再度访问报数组越界if __name__ __main__:test() 运行结…

Windows安装NVIDIA显卡CUDAD调用GPU,适用于部署deepseek r1

显卡、显卡驱动、CUDA之间的关系 显卡:(GPU),主流是NVIDIA的GPU,因为深度学习本身需要大量计算。GPU的并行计算能力,在过去几年里恰当地满足了深度学习的需求。AMD的GPU基本没有什么支持,可以不…

基于无人机遥感的烟株提取和计数研究

一.研究的背景、目的和意义 1.研究背景及意义 烟草作为我国重要的经济作物之一,其种植面积和产量的准确统计对于烟草产业的发展和管理至关重要。传统的人工烟株计数方法存在效率低、误差大、难以覆盖大面积烟田等问题,已无法满足现代烟草种植管理的需求…

《深度学习实战》第3集:循环神经网络(RNN)与序列建模

第3集:循环神经网络(RNN)与序列建模 引言 在深度学习领域,处理序列数据(如文本、语音、时间序列等)是一个重要的研究方向。传统的全连接网络和卷积神经网络(CNN)难以直接捕捉序列中…

【前沿探索篇七】【DeepSeek自动驾驶:端到端决策网络】

第一章 自动驾驶的"感官革命":多模态神经交响乐团 1.1 传感器矩阵的量子纠缠 我们把8路摄像头+4D毫米波雷达+128线激光雷达的融合称为"传感器交响乐",其数据融合公式可以简化为: def sensor_fusion(cam, radar, lidar):# 像素级特征提取 (ResNet-152…

可狱可囚的爬虫系列课程 13:Requests使用代理IP

一、什么是代理 IP 代理 IP(Proxy IP)是一个充当“中间人”的服务器IP地址,用于代替用户设备(如电脑、手机等)直接与目标网站或服务通信。用户通过代理IP访问互联网时,目标网站看到的是代理服务器的IP地址&…

https:原理

目录 1.数据的加密 1.1对称加密 1.2非对称加密 2.数据指纹 2.1数据指纹实际的应用 3.数据加密的方式 3.1只使用对称加密 3.2只使用非对称加密 3.3双方都使用对称加密 3.4非对称加密和对称加密一起使用 4.中间人攻击 5.CA证书 5.1什么是CA证书 CA证书的验证 6.https的原理 1.数据…

Github项目管理之 其余分支同步main分支

文章目录 方法:通过 Pull Request 同步分支1. **创建一个从 main 到目标分支的 Pull Request**2. **合并 Pull Request** 注意事项总结 在 GitHub 网页上,你可以通过 Pull Request 的方式将一个分支(例如 main 分支)的修改同步到…