Kafka中的Topic

在Kafka中,Topic是消息的逻辑容器,用于组织和分类消息。本文将深入探讨Kafka Topic的各个方面,包括创建、配置、生产者和消费者,以及一些实际应用中的示例代码。

1. 介绍

在Kafka中,Topic是消息的逻辑通道,生产者将消息发布到Topic,而消费者从Topic订阅消息。每个Topic可以有多个分区(Partitions),每个分区可以在不同的服务器上,以实现横向扩展。

2. 创建和配置Topic

2.1 创建Topic

使用Kafka提供的命令行工具(kafka-topics.sh)或Kafka的API来创建Topic。下面是一个使用命令行工具创建Topic的示例:

bin/kafka-topics.sh --create --topic my_topic --partitions 3 --replication-factor 2 --bootstrap-server localhost:9092

这将创建一个名为my_topic的Topic,有3个分区,复制因子为2。

2.2 配置Topic

Kafka的Topic有各种配置选项,可以通过修改Topic的属性来满足不同的需求。例如,可以设置消息保留时间、清理策略等。以下是一个配置Topic属性的示例:

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my_topic --alter --add-config max.message.bytes=1048576

这将修改my_topic的配置,将最大消息字节数设置为1 MB。

3. 生产者和消费者

3.1 生产者

生产者负责将消息发布到Topic。使用Kafka的Producer API,可以轻松地创建一个生产者。以下是一个简单的Java示例代码:

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(properties);producer.send(new ProducerRecord<>("my_topic", "key1", "value1"));
producer.close();

3.2 消费者

消费者从Topic中读取消息。Kafka的Consumer API提供了强大而灵活的方式来实现消费者。

以下是一个简单的Java示例代码:

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("group.id", "my_group");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");Consumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("my_topic"));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("Offset = %d, Key = %s, Value = %s%n", record.offset(), record.key(), record.value());}
}

4. 实际应用示例

4.1 实时日志处理

在实时日志处理的场景中,Kafka的Topic可以按照日志类型进行划分,每个Topic代表一种日志类型。这样的设计可以使得系统更具可维护性、可扩展性,并且允许不同类型的日志通过独立的消费者进行处理。以下是一个更详细的示例代码,展示如何在实时日志处理中使用Kafka Topic:

4.1.1 创建日志类型Topic

首先,为不同的日志类型创建各自的Topic。以错误日志和访问日志为例:

# 创建错误日志Topic
bin/kafka-topics.sh --create --topic error_logs --partitions 3 --replication-factor 2 --bootstrap-server localhost:9092# 创建访问日志Topic
bin/kafka-topics.sh --create --topic access_logs --partitions 3 --replication-factor 2 --bootstrap-server localhost:9092
4.1.2 生产者发布日志消息

在应用中,生成错误日志和访问日志的代码可能如下:

// 错误日志生产者
Producer<String, String> errorLogProducer = new KafkaProducer<>(errorLogProperties);
errorLogProducer.send(new ProducerRecord<>("error_logs", "Error message"));// 访问日志生产者
Producer<String, String> accessLogProducer = new KafkaProducer<>(accessLogProperties);
accessLogProducer.send(new ProducerRecord<>("access_logs", "Access log message"));
4.1.3 消费者实时处理日志

创建独立的消费者来处理错误日志和访问日志:

// 错误日志消费者
Consumer<String, String> errorLogConsumer = new KafkaConsumer<>(errorLogProperties);
errorLogConsumer.subscribe(Collections.singletonList("error_logs"));while (true) {ConsumerRecords<String, String> records = errorLogConsumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 处理错误日志System.out.printf("Error Log - Offset = %d, Value = %s%n", record.offset(), record.value());}
}// 访问日志消费者
Consumer<String, String> accessLogConsumer = new KafkaConsumer<>(accessLogProperties);
accessLogConsumer.subscribe(Collections.singletonList("access_logs"));while (true) {ConsumerRecords<String, String> records = accessLogConsumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 处理访问日志System.out.printf("Access Log - Offset = %d, Value = %s%n", record.offset(), record.value());}
}
4.1.4 实时监控和分析

消费者可以通过实时处理日志来进行监控和分析。例如,可以使用流处理框架(如Kafka Streams)对日志进行聚合、过滤或转换。以下是一个简化的示例:

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> errorLogsStream = builder.stream("error_logs");
KStream<String, String> accessLogsStream = builder.stream("access_logs");// 在这里进行实时处理,如聚合、过滤等// 通过输出Topic将处理结果发送到下游系统
errorLogsStream.to("processed_error_logs");
accessLogsStream.to("processed_access_logs");KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();

通过这种设计,可以根据实际需要扩展不同类型的日志处理,同时确保系统具有高度的灵活性和可扩展性。在实际应用中,可能需要更详细的配置和处理逻辑,以满足具体的监控和分析需求。

4.2 事件溯源

在事件驱动的架构中,事件溯源是一种强大的方式,通过创建一个专门的Kafka Topic来记录每个业务事件的发生,以便随时追踪和回溯整个系统的状态。以下是一个基于Kafka的事件溯源的详细示例代码:

4.2.1 创建事件Topic

首先,为每个关键的业务事件创建一个专用的Kafka Topic,例如order_createdorder_shipped等:

# 创建订单创建事件Topic
bin/kafka-topics.sh --create --topic order_created --partitions 3 --replication-factor 2 --bootstrap-server localhost:9092# 创建订单发货事件Topic
bin/kafka-topics.sh --create --topic order_shipped --partitions 3 --replication-factor 2 --bootstrap-server localhost:9092
4.2.2 发布业务事件

在应用中,当业务事件发生时,将事件发布到相应的Topic。以下是一个订单创建事件和订单发货事件的示例:

// 订单创建事件生产者
Producer<String, String> orderCreatedProducer = new KafkaProducer<>(orderCreatedProperties);
orderCreatedProducer.send(new ProducerRecord<>("order_created", "order_id", "Order created - Order ID: 123"));// 订单发货事件生产者
Producer<String, String> orderShippedProducer = new KafkaProducer<>(orderShippedProperties);
orderShippedProducer.send(new ProducerRecord<>("order_shipped", "order_id", "Order shipped - Order ID: 123"));
4.2.3 事件溯源消费者

为了实现事件溯源,我们需要一个专用的消费者来订阅所有的事件Topic,并将事件记录到一个持久化存储中(如数据库、日志文件等):

// 事件溯源消费者
Consumer<String, String> eventTraceConsumer = new KafkaConsumer<>(eventTraceProperties);
eventTraceConsumer.subscribe(Arrays.asList("order_created", "order_shipped"));while (true) {ConsumerRecords<String, String> records = eventTraceConsumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 处理事件,可以将事件记录到数据库或日志文件中System.out.printf("Event Trace - Offset = %d, Key = %s, Value = %s%n", record.offset(), record.key(), record.value());// 持久化处理逻辑}
}
4.2.4 事件回溯和分析

通过上述设置,可以在任何时候回溯系统中的每个事件,了解事件的发生时间、顺序和内容。通过将事件存储到持久化存储中,可以建立一个事件溯源系统,支持系统状态的分析、回滚和审计。

还可以使用流处理来实时分析事件,例如计算每个订单的处理时间、统计每个事件类型的发生频率等。以下是一个简单的流处理示例:

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> eventStream = builder.stream(Arrays.asList("order_created", "order_shipped"));// 在这里进行实时处理,如计算处理时间、统计频率等// 通过输出Topic将处理结果发送到下游系统
eventStream.to("processed_events");KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();

通过这种方式,可以在事件溯源系统中实现强大的监控、分析和管理功能,提高系统的可观察性和可维护性。

5. 消息处理语义

Kafka支持不同的消息处理语义,包括最多一次、最少一次和正好一次。这些语义由消费者的配置决定,可以根据应用的要求进行选择。以下是一个使用最多一次语义的消费者示例代码:

properties.put("enable.auto.commit", "false"); // 禁用自动提交偏移量
properties.put("auto.offset.reset", "earliest"); // 设置偏移量重置策略为最早Consumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("my_topic"));try {while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 处理消息System.out.printf("Offset = %d, Key = %s, Value = %s%n", record.offset(), record.key(), record.value());}consumer.commitSync(); // 手动提交偏移量}
} finally {consumer.close();
}

6. 安全性和权限控制

Kafka提供了安全性特性,包括SSL加密、SASL认证等。在生产环境中,确保适当的安全性设置是至关重要的。

以下是一个使用SSL连接的生产者示例:

properties.put("security.protocol", "SSL");
properties.put("ssl.truststore.location", "/path/to/truststore");
properties.put("ssl.truststore.password", "truststore_password");Producer<String, String> producer = new KafkaProducer<>(properties);

7. 故障容忍和可伸缩性

7.1 多节点分布和分区

在Kafka中,分布式的设计允许数据分布在多个节点上,这提供了高度的可伸缩性。每个Topic可以分成多个分区,而这些分区可以分布在不同的服务器上。这种分布式设计使得Kafka可以轻松地处理大规模数据,并实现水平扩展。

7.1.1 增加分区数

要增加Topic的分区数,可以使用以下命令:

bin/kafka-topics.sh --alter --topic my_topic --partitions 5 --bootstrap-server localhost:9092

这将把my_topic的分区数增加到5,从而提高系统的吞吐量和可伸缩性。

7.2 复制因子

Kafka通过数据的复制来实现容错性。每个分区可以有多个副本,这些副本分布在不同的节点上。在节点发生故障时,其他副本可以继续提供服务。

7.2.1 增加复制因子

要增加Topic的复制因子,可以使用以下命令:

bin/kafka-topics.sh --alter --topic my_topic --partitions 3 --replication-factor 3 --bootstrap-server localhost:9092

这将把my_topic的复制因子增加到3,确保每个分区有3个副本。增加复制因子提高了系统的容错性,因为每个分区都有多个副本,即使一个节点发生故障,其他节点上的副本仍然可用。

7.3 节点故障处理

Kafka能够处理节点故障,确保系统的可用性。当一个节点发生故障时,Kafka会自动将该节点上的分区重新分配到其他可用节点上,以保持分区的复制因子。

7.3.1 节点故障模拟

为了模拟节点故障,你可以通过停止一个Kafka broker进程来模拟。Kafka会自动感知到该节点的故障,并进行分区的重新分配。

# 停止一个Kafka broker进程
bin/kafka-server-stop.sh config/server-1.properties

7.4 性能调优

在实际应用中,通过监控系统的性能指标,你可以调整Kafka的配置以满足不同的性能需求。例如,调整日志刷写频率、调整内存和磁盘的配置等,都可以对系统的性能产生影响。

总结

Kafka的Topic是构建实时流数据处理系统的核心组件之一。通过深入了解Topic的创建、配置、生产者和消费者,以及实际应用中的示例代码,可以更好地理解和应用Kafka。在实际项目中,根据具体需求和场景进行灵活配置,以确保系统的可靠性、性能和安全性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/210996.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LeetCode-478. 在圆内随机生成点【几何 数学 拒绝采样 随机化】

LeetCode-478. 在圆内随机生成点【几何 数学 拒绝采样 随机化】 题目描述&#xff1a;解题思路一&#xff1a;一个最简单的方法就是在一个正方形内生成随机采样的点&#xff0c;然后拒绝不在内切圆中的采样点。解题思路二&#xff1a;具体思想是先生成一个0到r的随机数len&…

加强网站稳定性!学习如何进行高效压力测试!

前言 1、什么是压力测试&#xff1f; 软件压力测试是一种基本的质量保证行为&#xff0c;它是每个重要软件测试工作的一部分。 软件压力测试的基本思路很简单&#xff1a;不是在常规条件下运行手动或自动测试&#xff0c;而是在计算机数量较少或系统资源匮乏的条件下运行测试…

k8s之镜像拉取时使用secret

k8s之secret使用 一、说明二、secret使用2.1 secret类型2.2 创建secret2.3 配置secret 一、说明 从公司搭建的网站镜像仓库&#xff0c;使用k8s部署服务时拉取镜像失败&#xff0c;显示未授权&#xff1a; 需要在拉取镜像时添加认证信息. 关于secret信息,参考: https://www.…

AntDesignBlazor示例——创建项目

本示例是AntDesign Blazor的入门示例&#xff0c;在学习的同时分享出来&#xff0c;以供新手参考。 示例代码仓库&#xff1a;https://gitee.com/known/AntDesignDemo 1. 开发环境 VS2022 17.8.2.NET8AntDesign 0.16.2 2. 学习目标 创建新项目安装AntDesign组件包及使用方…

2D与3D图形的基本变换

1. 2d transformations 1.1缩放(Scaling) 其实这个转换非常简单&#xff0c;如图所示就是把x与y进行s倍的缩放&#xff0c;而我们图中的这个矩阵正好满足这一算法。 1.2镜像(Reflection) 这个镜像变换可以和上面的做类比&#xff0c;简单看一下就行。 1.3错切(Shearing) 当然…

《数据结构、算法与应用C++语言描述》-线索二叉树的定义与C++实现

_23Threaded BinaryTree 可编译运行代码见&#xff1a;GIithub::Data-Structures-Algorithms-and-Applications/_24Threaded_BinaryTree 线索二叉树定义 在普通二叉树中&#xff0c;有很多nullptr指针被浪费了&#xff0c;可以将其利用起来。 首先我们要来看看这空指针有多少…

单片机怎么实现真正的多线程?

单片机怎么实现真正的多线程? 不考虑多核情况时&#xff0c;CPU在一个时间点只能做一件事&#xff0c;因为切换的速度快所以看起来好像是同时执行多个线程而已。 实际上就是用定时器来做时基&#xff0c;以时间片的方式分别执行来实现的&#xff0c;只不过实现起来细节比较复…

C语言--每日选择题--Day37

第一题 1. 有以下说明语句&#xff1a;则下面引用形式错误的是&#xff08;&#xff09; struct Student {int num;double score; };struct Student stu[3] {{1001,80}, {1002,75}, {1003,91}} struct Student *p stu; A&#xff1a;p->num B&#xff1a;(p).num C&#…

LeetCode:2477. 到达首都的最少油耗(DFS C++、Java)

目录 2477. 到达首都的最少油耗 题目描述&#xff1a; 实现代码与解析&#xff1a; dfs 2477. 到达首都的最少油耗 题目描述&#xff1a; 给你一棵 n 个节点的树&#xff08;一个无向、连通、无环图&#xff09;&#xff0c;每个节点表示一个城市&#xff0c;编号从 0 到 n…

1-4节电池升降压充电IC解决方案

描述 MP2760是一款集成窄电压DC&#xff08;NVDC&#xff09;电源路径管理功能和USB On-the-Go(OTG)功能的升降压充电IC&#xff0c;兼容USB PD&#xff0c;适用于单节至4节串联的电池包应用。该芯片的充电输入电压范围广&#xff0c;可支持最高22V。 当启用电池放电模式&…

线性可分SVM摘记

线性可分SVM摘记 0. 线性可分1. 训练样本到分类面的距离2. 函数间隔和几何间隔、(硬)间隔最大化3. 支持向量 \qquad 线性可分的支持向量机是一种二分类模型&#xff0c;支持向量机通过核技巧可以成为非线性分类器。本文主要分析了线性可分的支持向量机模型&#xff0c;主要取自…

企业级SQL开发:如何审核发布到生产环境的SQL性能

自从上世纪 70 年代数据库开始普及以来&#xff0c;DBA 们就不停地遭遇各种各样的数据库管理难题&#xff0c;其中最为显著的&#xff0c;可能就是日常的开发任务中&#xff0c;研发人员们对于核心库进行变更带来的一系列风险。由于针对数据库的数据变更是一项非常常见的任务&a…

对抗生成网络-G与D的loss异常问题

我最近在**使用DCGAN训练个人的数据集**时&#xff0c;出现了D loss 下降趋于0&#xff0c;但是G loss 却不停上升。我总结了一下几点原因&#xff1a; 生成器损失为1或者大于1通常表明生成器的训练可能存在问题&#xff0c;这可能是由于训练不稳定、超参数设置不当或网络结构问…

基于阿里云服务网格流量泳道的全链路流量管理(一):严格模式流量泳道

作者&#xff1a;尹航 概述 灰度发布是一种常见的对新版本应用服务的发布手段&#xff0c;其特点在于能够将流量在服务的稳定版本和灰度版本之间时刻切换&#xff0c;以帮助我们用更加可靠的方式实现服务的升级。在流量比例切换的过程中&#xff0c;我们可以逐步验证新版本服…

【网络奇缘】- 如何自己动手做一个五类|以太网|RJ45|网络电缆

​ ​ &#x1f308;个人主页: Aileen_0v0&#x1f525;系列专栏: 一见倾心,再见倾城 --- 计算机网络~&#x1f4ab;个人格言:"没有罗马,那就自己创造罗马~" 本篇文章关于计算机网络的动手小实验---如何自己动手做一个网线&#xff0c; 也是为后面的物理层学习进…

C# WPF上位机开发(图形显示软件)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 在实际应用中&#xff0c;有一种情况就是&#xff0c;我们需要经常对数据进行图形化显示&#xff0c;这样会比较直观一点。比如经济统计里面的同比…

软件设计之桥接模式

实现茶水间&#xff1a;茶可以分红茶和绿茶&#xff0c;每种茶又可以分大杯和中杯&#xff0c;现在你是服务员需要计算茶水的价格。 package Bridge;public class BlackTea implements TeaKind{private float redTeaPrice 2.0f;Overridepublic float price() {return redTeaPr…

WordPiece词表的创建

文章目录 一、简单介绍二、步骤流程2.1 预处理2.2 计数2.3 分割2.4 添加subword 三、代码实现 本篇内容主要介绍如何根据提供的文本内容创建 WordPiece vocabulary&#xff0c;代码来自谷歌&#xff1b; 一、简单介绍 wordpiece的目的是&#xff1a;通过考虑单词内部构造&…

Canal笔记:安装与整合Springboot模式Mysql同步Redis

官方文档 https://github.com/alibaba/canal 使用场景 学习一件东西前&#xff0c;要知道为什么使用它。 1、同步mysql数据到redis 常规情况下&#xff0c;产生数据的方法可能有很多地方&#xff0c;那么就需要在多个地方中&#xff0c;都去做mysql数据同步到redis的处理&…

2005-2021年地级市绿色发展注意力数据(根据政府报告文本词频统计)

2005-2021年地级市绿色发展注意力数据&#xff08;根据政府报告文本词频统计&#xff09; 1、时间&#xff1a;2005-2021年 2、指标&#xff1a;省、市、年份、一级指标、关键词、关键词词频、总词频 3、范围&#xff1a;270个地级市 4、来源&#xff1a;地级市政府工作报告…