Installing Single-Node Kafka 3.7 on a Tencent Cloud Server

1. What Is Kafka

Kafka is an open-source distributed messaging system from Apache, coordinated through ZooKeeper. It offers high throughput, high performance, and high reliability, and can process streaming data in real time. It was originally developed at LinkedIn and is written in Scala. Over the years Kafka has grown from a pure message engine into the core of a stream-processing platform ecosystem, and several organizations and companies now publish distributions with different feature sets. Common distributions include:

- **Apache Kafka**: the "canonical" open-source Kafka and the basis for all other distributions. It remains the distribution with the most developers and the fastest release cadence.
- **Cloudera/Hortonworks Kafka**: integrates Kafka with the mainstream big-data stack, covering the full pipeline from distributed storage, cluster scheduling, and stream processing to machine learning and real-time databases.
- **Confluent Kafka**: primarily provides enterprise-grade stream-processing solutions built on top of Kafka.

(Kafka architecture diagram omitted.)

1.1 What Kafka Can Do

Features:

- High throughput, low latency: even on very ordinary hardware, Kafka can handle millions of messages per second, with latency as low as a few milliseconds.
- Persistence: messages are persisted to disk, and performance stays stable over long periods even with terabytes of stored messages.
- Reliability: data replication protects against loss.
- Fault tolerance: messages are partitioned across a cluster of Kafka servers and consumers, and node failures are tolerated (with a replica count of n, up to n-1 nodes may fail).
- High concurrency: a single machine supports thousands of clients reading and writing simultaneously, and the cluster can scale out online. Kafka integrates seamlessly with Hadoop, Storm, Spark, and similar systems, and supports parallel data loading into Hadoop.

Kafka Download

Kafka website: https://kafka.apache.org/
Kafka downloads: https://kafka.apache.org/downloads
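
On a headless server it is usually easier to fetch the tarball directly. A sketch, assuming the 3.7.0 release is still available on the Apache archive (exact URLs can change as releases rotate off the main mirrors):

# Download Kafka 3.7.0 built for Scala 2.12 from the Apache archive
wget https://archive.apache.org/dist/kafka/3.7.0/kafka_2.12-3.7.0.tgz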

2. Kafka Installation

2.1 Extract Kafka

tar -zxvf kafka_2.12-3.7.0.tgz
cd kafka_2.12-3.7.0/

2.2 Configure zookeeper.properties

# 1. Create a data directory
mkdir zookeeper-data
# 2. In config/zookeeper.properties, change the data path to:
dataDir=/opt/software/kafka/kafka_2.12-3.7.0/zookeeper-data
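
To script this edit instead of editing by hand, a minimal sed sketch (assuming the stock config/zookeeper.properties shipped with the distribution):

# Back up the original file, then point dataDir at the new directory
cp config/zookeeper.properties config/zookeeper.properties.bak
sed -i 's|^dataDir=.*|dataDir=/opt/software/kafka/kafka_2.12-3.7.0/zookeeper-data|' config/zookeeper.properties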

Full configuration:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the directory where the snapshot is stored.
dataDir=/opt/software/kafka/kafka_2.12-3.7.0/zookeeper-data
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080

2.3 Configure Kafka's server.properties

Edit server.properties under kafka_2.12-3.7.0/config:
Modify log.dirs and zookeeper.connect. The former is the directory where Kafka stores its log data; the latter is the ZooKeeper connection address (its port must match the clientPort set above).

Create a log directory:
mkdir kafka-logs

Then modify the configuration (wherever "ip" appears, substitute the server's public IP):

listeners=PLAINTEXT://0.0.0.0:9092
# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
#advertised.listeners=PLAINTEXT://your.host.name:9092
advertised.listeners=PLAINTEXT://ip:9092
# Log directory
log.dirs=/opt/software/kafka/kafka_2.12-3.7.0/kafka-logs
# ZooKeeper connection address
zookeeper.connect=ip:2181
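
These edits can also be scripted. A minimal sketch, assuming the stock server.properties and a hypothetical PUBLIC_IP variable holding the server's public address:

# Hypothetical placeholder; substitute your real public IP
PUBLIC_IP=203.0.113.10
cp config/server.properties config/server.properties.bak
sed -i \
  -e "s|^#\?listeners=.*|listeners=PLAINTEXT://0.0.0.0:9092|" \
  -e "s|^#\?advertised.listeners=.*|advertised.listeners=PLAINTEXT://${PUBLIC_IP}:9092|" \
  -e "s|^log.dirs=.*|log.dirs=/opt/software/kafka/kafka_2.12-3.7.0/kafka-logs|" \
  -e "s|^zookeeper.connect=.*|zookeeper.connect=${PUBLIC_IP}:2181|" \
  config/server.properties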

2.3.1 Full Configuration File

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#
# This configuration file is intended for use in ZK-based mode, where Apache ZooKeeper is required.
# See kafka.server.KafkaConfig for additional details and defaults
#

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

# The address the socket server listens on. If not configured, the host name will be equal to the value of
# java.net.InetAddress.getCanonicalHostName(), with PLAINTEXT listener name, and port 9092.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://0.0.0.0:9092

# Listener name, hostname and port the broker will advertise to clients.
# If not set, it uses the value for "listeners".
#advertised.listeners=PLAINTEXT://your.host.name:9092
advertised.listeners=PLAINTEXT://ip:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/opt/software/kafka/kafka_2.12-3.7.0/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
#log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=ip:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000

############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
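
Note that because advertised.listeners and zookeeper.connect point at the public IP, external clients can only connect if ports 9092 (Kafka) and 2181 (ZooKeeper) are open in the Tencent Cloud security group / server firewall. Once the services are running (next section), reachability can be checked from another machine; a sketch with a placeholder address:

# Substitute the server's public IP; -z only tests whether the port accepts connections
nc -vz 203.0.113.10 9092
nc -vz 203.0.113.10 2181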

3. Startup

3.1 Start ZooKeeper

# Start ZooKeeper in the background, redirecting output to a log file
nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties > ./zookeeper-run.log 2>&1 &
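
Before starting Kafka, it is worth confirming that ZooKeeper is actually up. A quick sketch (assuming nc is installed; srvr is on ZooKeeper's default four-letter-word whitelist):

# Should print server stats, including "Mode: standalone"
echo srvr | nc localhost 2181
# Or inspect the startup log for errors
tail -n 20 ./zookeeper-run.log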

3.2 Start Kafka

# Start Kafka in the background, redirecting output to a log file
nohup ./bin/kafka-server-start.sh ./config/server.properties > ./kafka-run.log 2>&1 &
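
To verify the broker started cleanly, check the log for the startup line, or query the broker with kafka-broker-api-versions.sh, which ships in the bin directory:

# A successful start logs a line ending in "started (kafka.server.KafkaServer)"
grep "started (kafka.server.KafkaServer)" ./kafka-run.log
# Or ask the broker which API versions it supports
bin/kafka-broker-api-versions.sh --bootstrap-server localhost:9092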

4. Testing

4.1 Create a topic:

bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
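
To confirm the topic exists and inspect its partition and replica assignment:

bin/kafka-topics.sh --describe --topic test --bootstrap-server localhost:9092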

4.2 Produce messages:

bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
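
The console producer reads one message per line from stdin; type a few lines and press Ctrl+C to exit. It also works non-interactively, for example:

# Send a single test message from the shell
echo "hello kafka" | bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092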

4.3 Consume messages

bin/kafka-console-consumer.sh --topic test --bootstrap-server localhost:9092 --from-beginning
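
--from-beginning replays the topic from the earliest offset, so the messages produced above should appear; press Ctrl+C to stop. To read a fixed number of messages and exit automatically, the console consumer also accepts --max-messages:

bin/kafka-console-consumer.sh --topic test --bootstrap-server localhost:9092 --from-beginning --max-messages 1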

5. Stopping the Services

5.1 Stop Kafka

bin/kafka-server-stop.sh 

5.2 Stop ZooKeeper

bin/zookeeper-server-stop.sh  
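
Stop Kafka before ZooKeeper. To confirm both JVM processes have exited (jps ships with the JDK):

# Neither "Kafka" nor "QuorumPeerMain" should appear once shutdown completes
jps | grep -E "Kafka|QuorumPeerMain"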
