Kafka 快速实战及基本原理详解解析-01

一、Kafka 介绍

1. MQ 的作用

消息队列(Message Queue,简称 MQ)是一种用于跨进程通信的技术,核心功能是通过异步消息的方式实现系统之间的解耦。它在现代分布式系统中有着广泛的应用,主要作用体现在以下三个方面:

异步处理

在传统的同步调用中,生产者和消费者需要同时在线,并且生产者在完成任务后才能继续执行其他工作。这种模式限制了系统的性能。而引入消息队列后,生产者可以将任务提交到队列中,消费者按需消费任务,从而提升系统的吞吐量。

  • 示例:快递员送快递到客户家,效率低下。而菜鸟驿站的出现让快递员只需将包裹放置在驿站,客户可以根据自己的时间安排取件。这种方式大大提高了效率。
解耦

解耦是消息队列最重要的功能之一。服务之间通过消息队列传递数据,而不是直接调用对方的服务接口,这样可以有效降低系统的耦合度。

  • 示例:《Thinking in JAVA》原书是英文版,但通过翻译社将内容翻译成多种语言,满足不同读者的需求。翻译社起到了桥梁作用,不同语言之间的沟通不再直接依赖于作者和读者。
削峰填谷

在高并发场景下,系统往往会遇到流量高峰,导致系统负载过重。通过消息队列,可以将流量暂存并按固定速率处理,从而避免系统崩溃。

  • 示例:长江每年都会涨水,但通过三峡大坝的调节,下游的出水速度保持稳定,避免了洪水泛滥。

2. 为什么要用 Kafka

Kafka 是一种高吞吐量、低延迟、分布式的消息队列系统,适合在大规模数据处理场景中使用。以下是 Kafka 的典型使用场景和优势:

日志聚合场景

在大规模分布式系统中,各个服务都会产生大量的日志信息。传统的日志收集方式往往存在以下问题:

  • 数据量大:需要快速收集和处理来自各个渠道的海量日志。
  • 容错性要求高:集群中允许少量节点出现故障而不影响整体服务。
  • 功能专注:Kafka 专注于高吞吐量、低延迟的消息传递,不追求复杂的消息处理功能。
核心优势
  • 高吞吐量:Kafka 能够处理数百万 TPS(每秒事务处理量)。
  • 低延迟:通常在毫秒级别的延迟时间内完成消息传递。
  • 可扩展性:通过增加节点和分区数量,可以线性扩展处理能力。
  • 容错性:通过副本机制保证消息的高可用性。
  • 持久化:Kafka 使用磁盘存储消息,保证消息的持久性。

二、Kafka 快速上手

1. 实验环境准备

要快速上手 Kafka,首先需要搭建实验环境。以下是推荐的实验环境配置:

  • 虚拟机数量:3 台
  • 操作系统:CentOS 7
  • Java 版本:Java 8
环境配置步骤
  1. 下载 Kafka 和 Zookeeper。
  2. 将 Kafka 解压到 /app/kafka 目录,将 Zookeeper 解压到 /app/zookeeper 目录。
  3. 配置环境变量,确保系统能够识别 Kafka 和 Zookeeper 的命令。
  4. 关闭防火墙,以避免端口阻塞:
    systemctl stop firewalld.service
    

2. 单机服务体验

为了更直观地理解 Kafka 的工作原理,我们可以先体验单机版 Kafka 服务。

步骤 1:启动 Zookeeper

Kafka 依赖 Zookeeper 进行元数据管理和选举机制。在实际部署中,通常使用独立的 Zookeeper 集群。

启动 Zookeeper 服务:

nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

检查 Zookeeper 是否正常启动:

jps

确认输出中有 QuorumPeerMain 进程。

步骤 2:启动 Kafka

启动 Kafka 服务前,需要确保 Zookeeper 服务正常运行。

启动 Kafka 服务:

nohup bin/kafka-server-start.sh config/server.properties &

确认 Kafka 是否正常启动:

jps

检查输出中是否包含 Kafka 进程。

步骤 3:创建和使用 Topic

Kafka 的基础工作机制是通过 Topic 进行消息的传递。

  1. 创建 Topic

    bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
    
  2. 发送消息 启动生产者端并发送消息:

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    > 这是一条测试消息
    
  3. 消费消息 启动消费者端并接收消息:

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    

3. 理解 Kafka 的消息传递机制

Kafka 的消息传递机制可以通过以下核心组件来理解:

  • 生产者(Producer):将消息发送到指定的 Topic。
  • 消费者(Consumer):从指定的 Topic 消费消息。
  • Topic:逻辑概念,表示一类业务消息的集合。
  • Partition:物理概念,实际存储消息的分区。
  • Broker:Kafka 服务器实例,存储和管理 Partition。

Kafka 的设计目标是通过这些组件实现高效、可靠的消息传递,满足企业级数据管道的需求。


四、Kafka 集群服务

1. 为什么要使用集群

单机部署的 Kafka 在性能上虽然已经非常出色,但在实际生产环境中通常需要使用 Kafka 集群来进一步提升数据存储能力和系统的高可用性。集群可以解决以下问题:

1.1 解决海量数据存储问题

单个 Broker 服务器的存储能力有限,当数据量增长到一定程度时,单机难以承载。通过集群部署,可以将数据分散存储在多个 Broker 中,从而提升整体存储能力。

1.2 提高系统容错能力

单机环境中,如果 Broker 崩溃,所有数据都会丢失。而集群环境下,每个 Partition 都有多个副本,即使部分 Broker 节点宕机,系统依然可以正常运行,保证数据的高可用性。


五、理解服务端的 Topic、Partition 和 Broker

Kafka 的核心架构由 Topic、Partition 和 Broker 组成,这三者之间的关系至关重要:

  • Topic:一个逻辑的消息分类,每个 Topic 包含多条消息。
  • Partition:每个 Topic 可以分成多个 Partition,每个 Partition 是一个消息队列。
  • Broker:Kafka 的服务器实例,负责存储 Partition 数据,并处理客户端请求。

5.1 创建分布式 Topic 示例

bin/kafka-topics.sh --bootstrap-server worker1:9092 --create --replication-factor 2 --partitions 4 --topic distributedTopic

5.2 查看 Topic 信息

bin/kafka-topics.sh --bootstrap-server worker1:9092 --describe --topic distributedTopic

六、章节总结:Kafka 集群的整体结构

通过前面的学习,我们可以总结 Kafka 集群的整体结构:

  1. Topic 是逻辑概念,Producer 和 Consumer 通过 Topic 进行消息传递。
  2. Partition 是实际存储单元,保证数据分散存储和负载均衡。
  3. Broker 是 Kafka 的服务器实例,存储 Partition 数据并处理客户端请求。
  4. Zookeeper 管理 Kafka 集群的元数据和选举过程。
  5. Controller 是 Kafka 集群的核心管理节点,负责管理 Topic 和 Partition 的分配。

七、Spring Boot 实现 Kafka 消息有序性

为了保证 Kafka 的消息有序性,可以使用 Spring Boot 和 Kafka 的整合来实现。在 Java 的 Spring Boot 项目中,我们通过指定消息的 Key 和自定义分区器来确保消息发送到相同的 Partition,从而实现有序性。

7.1 依赖配置

在 Maven 项目中,引入 Kafka 的依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-kafka</artifactId>
</dependency>

7.2 配置 KafkaProducer

创建 Kafka 的生产者配置类:

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;
import java.util.Map;@Configuration
public class KafkaProducerConfig {@Beanpublic ProducerFactory<String, String> producerFactory() {Map<String, Object> configProps = new HashMap<>();configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return new DefaultKafkaProducerFactory<>(configProps);}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}
}

7.3 发送有序消息

创建一个消息发送服务,确保消息使用相同的 Key 发送到同一个 Partition:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;@Service
public class KafkaProducerService {private static final String TOPIC = "test_topic";@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String key, String message) {kafkaTemplate.send(TOPIC, key, message);}
}

7.4 自定义分区器(可选)

如果有更复杂的分区逻辑,可以自定义分区器:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;import java.util.Map;public class CustomPartitioner implements Partitioner {@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 自定义分区逻辑return Math.abs(key.hashCode()) % cluster.partitionCountForTopic(topic);}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

7.5 设置一个 Topic 对应一个 Partition 的方法

如果业务需求是保证某个 Topic 的消息全局有序,可以在创建 Topic 时将 Partition 数量设置为 1,从而保证所有消息存储在同一个 Partition 中,实现全局有序。

创建一个 Partition 的 Topic
bin/kafka-topics.sh --create --topic singlePartitionTopic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
在 Spring Boot 中发送消息到该 Topic
@Service
public class KafkaSinglePartitionProducerService {private static final String TOPIC = "singlePartitionTopic";@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;public void sendMessage(String message) {kafkaTemplate.send(TOPIC, message);}
}

通过这种方式,所有发送到 singlePartitionTopic 的消息都会进入同一个 Partition,确保消息顺序性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/501531.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Flutter中的网络请求图片存储为缓存,与定制删除本地缓存

Flutter中的网络请求图片存储为缓存&#xff0c;与定制删除本地缓存 1&#xff1a;封装请求图片函数 2&#xff1a;访问的图片都会转为本地缓存&#xff0c;当相同的请求url&#xff0c;会在本地调用图片 3&#xff1a;本地缓存管理【windows与andriod已经测试】【有页面】【有…

无线AP安装注意事项

现在的办公楼、酒店等项目中都设计含有网络无线覆盖这一项&#xff0c;在项目实施中&#xff0c;往往采用的是便捷并且后期便于网络无线设备管理的无线ap设备&#xff0c;作为前端无线信号的覆盖。在具体安装无线AP过程中&#xff0c;我们必须要注意以下几点才能保证项目实施完…

Golang的容器编排实践

Golang的容器编排实践 一、Golang中的容器编排概述 作为一种高效的编程语言&#xff0c;其在容器编排领域也有着广泛的运用。容器编排是指利用自动化工具对容器化的应用进行部署、管理和扩展的过程&#xff0c;典型的容器编排工具包括Docker Swarm、Kubernetes等。在Golang中&a…

计算机毕业设计Django+Tensorflow音乐推荐系统 音乐可视化 卷积神经网络CNN LSTM音乐情感分析 机器学习 深度学习 Flask

温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 作者简介&#xff1a;Java领…

C# 在PDF中添加和删除水印注释 (Watermark Annotation)

目录 使用工具 C# 在PDF文档中添加水印注释 C# 在PDF文档中删除水印注释 PDF中的水印注释是一种独特的注释类型&#xff0c;它通常以透明的文本或图片形式叠加在页面内容之上&#xff0c;为文档添加标识或信息提示。与传统的静态水印不同&#xff0c;水印注释并不会永久嵌入…

分析服务器 systemctl 启动gozero项目报错的解决方案

### 分析 systemctl start beisen.service 报错 在 Linux 系统中&#xff0c;systemctl 是管理系统和服务的主要工具。当我们尝试重启某个服务时&#xff0c;如果服务启动失败&#xff0c;systemctl 会输出错误信息&#xff0c;帮助我们诊断和解决问题。 本文将通过一个实际的…

Dubbo扩展点加载机制

加载机制中已经存在的一些关键注解&#xff0c;如SPI、©Adaptive> ©Activateo然后介绍整个加载机制中最核心的ExtensionLoader的工作流程及实现原理。最后介绍扩展中使用的类动态编译的实 现原理。 Java SPI Java 5 中的服务提供商https://docs.oracle.com/jav…

如何利用Logo设计免费生成器创建专业级Logo

在当今的商业世界中&#xff0c;一个好的Logo是品牌身份的象征&#xff0c;它承载着公司的形象与理念。设计一个专业级的Logo不再需要花费大量的金钱和时间&#xff0c;尤其是当我们拥有Logo设计免费生成器这样的工具时。接下来&#xff0c;让我们深入探讨如何利用这些工具来创…

游戏如何检测iOS越狱

不同于安卓的开源生态&#xff0c;iOS一直秉承着安全性更高的闭源生态&#xff0c;系统中的硬件、软件和服务会经过严格审核和测试&#xff0c;来保障安全性与稳定性。 据FairGurd观察&#xff0c;虽然iOS系统具备一定的安全性&#xff0c;但并非没有漏洞&#xff0c;如市面上…

智联视频超融合平台:电力行业的智能守护者

文章目录 一、远程实时监控与设备状态监测二、提高应急响应能力三、实现无人值守与减员增效四、保障电力设施安全与防范外部破坏五、提升电网运行管理效率与决策科学性六、助力电力企业数字化转型与智能化发展七、智联视频超融合平台 在当今数字化浪潮下&#xff0c;视频联网平…

卸载干净 IDEA(图文讲解)

目录 1、卸载 IDEA 程序 2、注册表清理 3、残留清理 1、卸载 IDEA 程序 点击屏幕左下角 Windows 图标 -> 设置-控制面板->intellij idea 勾选第一栏 Delete IntelliJ IDEA 2022.2 caches and local history&#xff0c;表示同时删除 IDEA 本地缓存以及历史。 Delete I…

计算机网络•自顶向下方法:路由选路算法

路由选路算法 在网络层中&#xff0c;选路是指数据包从源主机到目的主机的传输过程中&#xff0c;如何通过网络中的路由器选择一条合适的路径。路由器根据网络拓扑、路由表、协议规则等来决定如何将数据包转发到下一跳&#xff0c;直到数据包到达目的地。 选路算法分类 静态算…

Qemu配置QXL显卡支持分辨率

默认情况下&#xff0c;创建的vm的视频RAM限制为16MB。在win操作系统中分辨率最高就只能调到1024x768。 <video><model typecirrus vram16384 heads1 primaryyes/><address typepci domain0x0000 bus0x00 slot0x02 function0x0/> </video>单单修改ram…

【区块链】零知识证明基础概念详解

&#x1f308;个人主页: 鑫宝Code &#x1f525;热门专栏: 闲话杂谈&#xff5c; 炫酷HTML | JavaScript基础 ​&#x1f4ab;个人格言: "如无必要&#xff0c;勿增实体" 文章目录 零知识证明基础概念详解引言1. 零知识证明的定义与特性1.1 基本定义1.2 三个核心…

Redis面试相关

Redis开篇 使用场景 缓存 缓存穿透 解决方法一&#xff1a; 方法二&#xff1a; 通过多次hash来获取对应的值。 小结 缓存击穿 缓存雪崩 打油诗 双写一致性 两种不同的要求 强一致 读锁代码 写锁代码 强一致&#xff0c;性能低。 延迟一致 方案一&#xff1a;消息队列 方…

以太网协议和LWIP协议详解

一、以太网协议简介 以太网是一种产生较早&#xff0c;使用相当广泛的局域网技术。目前以太网根据速度等级分类大概分为&#xff1a;标准以太网&#xff08;10Mbit/s&#xff09;&#xff0c;快速以太网&#xff08;100Mbit/s&#xff09;&#xff0c;千兆以太网&#xff08;1…

Qt|QWidget窗口支持旋转

功能实现&#xff1a;使用QWidget创建的窗口支持窗口旋转功能。 展示的示例中支持由水平方向旋转至垂直方向。至于其它角度旋转的问题&#xff0c;看完这篇文章后应该会很简单能实现的&#xff01; 开发环境&#xff1a;win VS2019 Qt 5.15.2 在实现之前也有想用使用 QProp…

微信小程序滑动解锁、滑动验证

微信小程序简单滑动解锁 效果 通过 movable-view &#xff08;可移动的视图容器&#xff0c;在页面中可以拖拽滑动&#xff09;实现的简单微信小程序滑动验证 movable-view 官方说明&#xff1a;https://developers.weixin.qq.com/miniprogram/dev/component/movable-view.ht…

微服务实战——购物车模块实战

购物车 1. 数据模型分析 1.1. 需求描述 用户可以在登录状态下将商品添加到购物车【用户购物车/在线购物车】 放入数据库mongodb放入 redis&#xff08;采用&#xff09; 登录以后&#xff0c;会将临时购物车的数据全部合并过来&#xff0c;并清空临时购物车&#xff1b; 用…

1961-2022年中国大陆多干旱指数数据集(SPI/SPEI/EDDI/PDSI/SC-PDSI/VPD)

DOI: 10.5194/essd-2024-270 干旱指数对于评估和管理缺水和农业风险至关重要;然而&#xff0c;现有数据集中缺乏统一的数据基础&#xff0c;导致不一致&#xff0c;对干旱指数的可比性提出了挑战。本研究致力于创建CHM_Drought&#xff0c;这是一个创新且全面的长期气象干旱数…