spring boot 使用 Kafka

一、Kafka作为消息队列的好处

  1. 高吞吐量:Kafka能够处理大规模的数据流,并支持高吞吐量的消息传输。

  2. 持久性:Kafka将消息持久化到磁盘上,保证了消息不会因为系统故障而丢失。

  3. 分布式:Kafka是一个分布式系统,可以在多个节点上运行,具有良好的可扩展性和容错性。

  4. 支持多种协议:Kafka支持多种协议,如TCP、HTTP、UDP等,可以与不同的系统进行集成。

  5. 灵活的消费模式:Kafka支持多种消费模式,如拉取和推送,可以根据需要选择合适的消费模式。

  6. 可配置性强:Kafka的配置参数非常丰富,可以根据需要进行灵活配置。

  7. 社区支持:Kafka作为Apache旗下的开源项目,拥有庞大的用户基础和活跃的社区支持,方便用户得到及时的技术支持。

二、springboot中使用Kafka

  1. 添加依赖:在pom.xml文件中添加Kafka的依赖,包括spring-kafka和kafka-clients。确保版本与你的项目兼容。

  2. 创建生产者:创建一个Kafka生产者类,实现Producer接口,并使用KafkaTemplate发送消息。

  3. 配置生产者:在Spring Boot的配置文件中配置Kafka生产者的相关参数,例如bootstrap服务器地址、Kafka主题等。

  4. 发送消息:在需要发送消息的地方,注入Kafka生产者,并使用其发送消息到指定的Kafka主题。

  5. 创建消费者:创建一个Kafka消费者类,实现Consumer接口,并使用KafkaTemplate订阅指定的Kafka主题。

  6. 配置消费者:在Spring Boot的配置文件中配置Kafka消费者的相关参数,例如group id、auto offset reset等。

  7. 接收消息:在需要接收消息的地方,注入Kafka消费者,并使用其接收消息。

  8. 处理消息:对接收到的消息进行处理,例如保存到数据库或进行其他业务逻辑处理。

三、使用Kafka

pom中填了依赖

<dependency>  <groupId>org.springframework.kafka</groupId>  <artifactId>spring-kafka</artifactId>  <version>2.8.1</version>  
</dependency>  
<dependency>  <groupId>org.apache.kafka</groupId>  <artifactId>kafka-clients</artifactId>  <version>2.8.1</version>  
</dependency>
  1. 创建生产者:创建一个Kafka生产者类,实现Producer接口,并使用KafkaTemplate发送消息。

import org.apache.kafka.clients.producer.*;  
import org.springframework.beans.factory.annotation.Value;  
import org.springframework.kafka.core.KafkaTemplate;  
import org.springframework.stereotype.Component;  @Component  
public class KafkaProducer {  @Value("${kafka.bootstrap}")  private String bootstrapServers;  @Value("${kafka.topic}")  private String topic;  private KafkaTemplate<String, String> kafkaTemplate;  public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {  this.kafkaTemplate = kafkaTemplate;  }  public void sendMessage(String message) {  Producer<String, String> producer = new KafkaProducer<>(bootstrapServers, new StringSerializer(), new StringSerializer());  try {  producer.send(new ProducerRecord<>(topic, message));  } catch (Exception e) {  e.printStackTrace();  } finally {  producer.close();  }  }  
}
  1. 配置生产者:在Spring Boot的配置文件中配置Kafka生产者的相关参数,例如bootstrap服务器地址、Kafka主题等。

import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;  
import org.springframework.kafka.core.DefaultKafkaProducerFactory;  
import org.springframework.kafka.core.KafkaTemplate;  
import org.springframework.kafka.core.ProducerFactory;  
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;  
import org.springframework.kafka.core.ConsumerFactory;  
import org.springframework.kafka.core.ConsumerConfig;  
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;  
import org.springframework.kafka.listener.MessageListener;  
import org.springframework.context.annotation.PropertySource;  
import java.util.*;  
import org.springframework.beans.factory.*;  
import org.springframework.*;  
import org.springframework.*;expression.*;value; 																																		 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	  @Value("${kafka}")   Properties kafkaProps = new Properties(); @Bean public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf){ KafkaTemplate<String, String> template = new KafkaTemplate<>(pf); template .setMessageConverter(new StringJsonMessageConverter()); template .setSendTimeout(Duration .ofSeconds(30)); return template ; } @Bean public ProducerFactory<String, String> producerFactory(){ DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(kafkaProps); factory .setBootstrapServers(bootstrapServers); factory .setKeySerializer(new StringSerializer()); factory .setValueSerializer(new StringSerializer()); return factory ; } @Bean public ConsumerFactory<String, String> consumerFactory(){ DefaultKafkaConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<>(consumerConfigProps); factory .setBootstrapServers(bootstrapServers); factory .setKeyDeserializer(new StringDeserializer()); factory .setValueDeserializer(new StringDeserializer()); return factory ; } @Bean public ConcurrentMessageListenerContainer<String, String> container(ConsumerFactory<String, String> consumerFactory, MessageListener listener){ ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory); container .setMessageListener(listener); container .setConcurrency(3); return container ; } @Bean public MessageListener

消费者

import org.apache.kafka.clients.consumer.*;  
import org.springframework.kafka.core.KafkaTemplate;  
import org.springframework.stereotype.Component;  @Component  
public class KafkaConsumer {  @Value("${kafka.bootstrap}")  private String bootstrapServers;  @Value("${kafka.group}")  private String groupId;  @Value("${kafka.topic}")  private String topic;  private KafkaTemplate<String, String> kafkaTemplate;  public KafkaConsumer(KafkaTemplate<String, String> kafkaTemplate) {  this.kafkaTemplate = kafkaTemplate;  }  public void consume() {  Consumer<String, String> consumer = new KafkaConsumer<>(consumerConfigs());  consumer.subscribe(Collections.singletonList(topic));  while (true) {  ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));  for (ConsumerRecord<String, String> record : records) {  System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());  }  }  }  private Properties consumerConfigs() {  Properties props = new Properties();  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);  props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");  return props;  }  
}

四、kafka与rocketMQ比较

Kafka和RocketMQ都是开源的消息队列系统,它们具有许多相似之处,但在一些关键方面也存在差异。以下是它们在数据可靠性、性能、消息传递方式等方面的比较:

  1. 数据可靠性:
  • Kafka使用异步刷盘方式,而RocketMQ支持异步实时刷盘、同步刷盘、同步复制和异步复制。这使得RocketMQ在单机可靠性上比Kafka更高,因为它不会因为操作系统崩溃而导致数据丢失。此外,RocketMQ新增的同步刷盘机制也进一步保证了数据的可靠性。
  1. 性能:
  • Kafka和RocketMQ在性能方面各有千秋。由于Kafka的数据以partition为单位,一个Kafka实例上可能有多达上百个partition,而一个RocketMQ实例上只有一个partition。这使得RocketMQ可以充分利用IO组的commit机制,批量传输数据,从而在replication时具有更好的性能。然而,Kafka的异步replication性能理论上低于RocketMQ的replication,因为同步replication与异步replication相比,性能上会有约20%-30%的损耗。
  1. 消息传递方式:
  • Kafka和RocketMQ在消息传递方式上也有所不同。Kafka采用Producer发送消息后,broker马上把消息投递给consumer,这种方式实时性较高,但会增加broker的负载。而RocketMQ基于Pull模式和Push模式的长轮询机制,来平衡Push和Pull模式各自的优缺点。RocketMQ的消息及时性较好,严格的消息顺序得到了保证。
  1. 其他特性:
  • Kafka在单机支持的队列数超过64个队列,而RocketMQ最高支持5万个队列。队列越多,可以支持的业务就越多。

五、kafka使用场景

  1. 实时数据流处理:Kafka可以处理大量的实时数据流,这些数据流可以来自不同的源,如用户行为、传感器数据、日志文件等。通过Kafka,可以将这些数据流进行实时的处理和分析,例如进行实时数据分析和告警。
  2. 消息队列:Kafka可以作为一个消息队列使用,用于在分布式系统中传递消息。它能够处理高吞吐量的消息,并保证消息的有序性和可靠性。
  3. 事件驱动架构:Kafka可以作为事件驱动架构的核心组件,将事件数据发布到不同的消费者,以便进行实时处理。这种架构可以简化应用程序的设计和开发,提高系统的可扩展性和灵活性。
  4. 数据管道:Kafka可以用于数据管道,将数据从一个系统传输到另一个系统。例如,可以将数据从数据库或日志文件传输到大数据平台或数据仓库。
  5. 业务事件通知:Kafka可以用于通知业务事件,例如订单状态变化、库存更新等。通过订阅Kafka主题,相关的应用程序和服务可以实时地接收到这些事件通知,并进行相应的处理。
  6. 流数据处理框架集成:Kafka可以与流处理框架集成,如Apache Flink、Apache Spark等。通过集成,可以将流数据从Kafka中实时导入到流处理框架中进行处理,实现流式计算和实时分析。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/248463.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

比Filebeat更强大的日志收集工具-Fluent bit的http插件实战

文章目录 1.前言2. fluent bit http插件配置以及参数详解3. Http 接口服务3.1 开发Http 接口服务3.2 重启fluent bit向http web服务发送数据 1.前言 Fluent Bit 的 HTTP 插件提供了一种灵活而通用的机制&#xff0c;可用于将日志数据 从各种环境中传输到指定的远程服务器&#…

C++核心编程:类和对象 笔记

4.类和对象 C面向对象的三大特性为:封装,继承,多态C认为万事万物都皆为对象&#xff0c;对象上有其属性和行为 例如&#xff1a; 人可以作为对象&#xff0c;属性有姓名、年龄、身高、体重...,行为有走、跑、跳、说话...车可以作为对象&#xff0c;属性有轮胎、方向盘、车灯…

力扣hot100 子集 回溯 超简洁

Problem: 78. 子集 文章目录 思路复杂度Code 思路 &#x1f468;‍&#x1f3eb; 参考题解 复杂度 时间复杂度: 添加时间复杂度, 示例&#xff1a; O ( n ) O(n) O(n) 空间复杂度: 添加空间复杂度, 示例&#xff1a; O ( n ) O(n) O(n) Code class Solution {List<Li…

设计模式之框架源码剖析(实战+图解)

Java设计模式 1&#xff0c;概述 随着软件开发人员人数的增多&#xff0c;一些公司急需一些高端人才。作为一个高端人才&#xff0c;设计面向对象软件是必不可少的能力&#xff0c;而软件设计是需要很深的功力&#xff0c;设计模式就要求你必须掌握。 2&#xff0c;本章特色…

[GN] 设计模式——面向对象设计原则概述

文章目录 面向对象设计原则概述单一职责原则开闭原则里氏代换原则依赖倒转原则接口隔离原则合成复用原则迪米特法则 总结 提示&#xff1a;以下是本篇文章正文内容&#xff0c;下面案例可供参考 面向对象设计原则概述 单一职责原则 一个类只负责一个功能领域中的相应职责 类…

C#,数据检索算法之三元搜索(Ternary Search)的源代码

数据检索算法是指从数据集合&#xff08;数组、表、哈希表等&#xff09;中检索指定的数据项。 数据检索算法是所有算法的基础算法之一。 本文发布 三元搜索&#xff08;Ternary Search&#xff09;的源代码。 1 文本格式 using System; namespace Legalsoft.Truffer.Algo…

【Java异常处理与try-catch-finally】

Java异常处理与try-catch-finally try块是被监视的代码块&#xff0c;可能会发生异常的地方。当try块中的代码抛出了异常&#xff0c;程序会立即转入catch块&#xff0c;catch块根据捕获的异常类型进行处理。 Java异常处理是一种机制&#xff0c;用于捕获并处理在程序执行过程中…

flutter module打包成framework引入原生工程

Flutter - 将 Flutter 集成到现有项目&#xff08;iOS - Framework篇&#xff09; 本篇文章大幅参考了 caijinglong 大佬的总结文章&#xff1a; 把flutter作为framework添加到已存在的iOS中[1] 用 Flutter 来开发&#xff0c;从来都不可能是新开的一个纯 Flutter 项目&#xf…

基于JavaWeb开发的礼品商贸服务平台【附源码】

基于JavaWeb开发的礼品商贸服务平台【附源码】 &#x1f345; 作者主页 央顺技术团队 &#x1f345; 欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd; &#x1f345; 文末获取源码联系方式 &#x1f4dd; &#x1f345; 查看下方微信号获取联系方式 承接各种定制系统 &#…

内网安全:NTLM-Relay

目录 NTLM认证过程以及攻击面 NTLM Relay攻击 NTLM攻击总结 实验环境说明 域横向移动&#xff1a;NTLM中继攻击 攻击条件 实战一&#xff1a;NTLM中继攻击-CS转发上线MSF 原理示意图 一. CS代理转发 二. MSF架设路由 三. 适用smb_relay模块进行中继攻击 域横向移动…

JavaWeb,Vue的学习(下)

Router路由 路由&#xff08;Router&#xff09;简介 定义&#xff1a;路由就是根据不同的 URL 地址展示不同的内容或页面。通俗理解&#xff1a;路由就像是一个地图&#xff0c;我们要去不同的地方&#xff0c;需要通过不同的路线进行导航。 路由的作用 单页应用程序&…

【Java程序设计】【C00184】基于SSM的旅游网站管理系统(论文+PPT)

基于SSM的旅游网站管理系统&#xff08;论文PPT&#xff09; 项目简介项目获取开发环境项目技术运行截图 项目简介 这是一个基于ssm的旅游网站管理系统 本系统分为前台用户、后台管理员2个功能模块。 前台用户&#xff1a;当游客打开系统的网址后&#xff0c;首先看到的就是首…

手机屏幕生产厂污废水处理需要哪些工艺设备

随着手机行业的快速发展&#xff0c;手机屏幕生产厂的规模也越来越大&#xff0c;但同时也带来了大量的污废水排放问题。为了保护环境和人类的健康&#xff0c;手机屏幕生产厂需要采取适当的工艺设备来处理污废水。本文将介绍手机屏幕生产厂污废水处理所需的工艺设备。 首先&am…

LeetCode —— 43. 字符串相乘

&#x1f636;‍&#x1f32b;️&#x1f636;‍&#x1f32b;️&#x1f636;‍&#x1f32b;️&#x1f636;‍&#x1f32b;️Take your time ! &#x1f636;‍&#x1f32b;️&#x1f636;‍&#x1f32b;️&#x1f636;‍&#x1f32b;️&#x1f636;‍&#x1f32b;️…

OpenCV 2 - 矩阵的掩膜操作

1知识点 1-1 CV_Assert(myImage.depth() == CV_8U); 确保输入图像是无符号字符类型,若该函数括号内的表达式为false,则会抛出一个错误。 1-2 Mat.ptr(int i = 0); 获取像素矩阵的指针,索引 i 表示第几行,从0开始计行数。 1-3 const uchar* current = mylmage.ptr(row); 获得…

锁定 MinIO 操作员权限

虽然您可以使用 deployment 或 statefulset 在 Kubernetes 上部署 MinIO&#xff0c;但在 Kubernetes 上部署 MinIO 的推荐方法是通过官方的 MinIO Operator。为什么&#xff1f; MinIO Operator 简化了 Kubernetes 集群上的 MinIO 管理&#xff0c;不仅在初始部署期间&#x…

【LVGL源码移植环境搭建】

LVGL源码移植&环境搭建 ■ LVGL源码移植■ 下载LVGL源码■ 修改LVGL文件夹■■■■ 视频链接 Ubuntu模拟器环境建置 ■ LVGL源码移植 ■ 下载LVGL源码 LVGL源码 我们以选择v8.2.0为例&#xff0c;选择8.2.0下载 ■ 修改LVGL文件夹 1.我们只需要关注这5个文件即可&…

世界坐标系转换为平面地图坐标

将世界坐标系转换为平面地图坐标的方法通常涉及地图投影。地图投影是一种将地球(一个三维球体)上的点转换为平面(二维)地图上的点的方法。 这里介绍几种常见的地图投影方法: 墨卡托投影(Mercator Projection): 这是最常见的投影方式之一,尤其用于航海地图。它将经纬度…

解决:AttributeError: ‘str’ object has no attribute ‘capabilities’

解决&#xff1a;AttributeError: ‘str’ object has no attribute ‘capabilities’ 文章目录 解决&#xff1a;AttributeError: str object has no attribute capabilities背景报错问题报错翻译报错位置代码报错原因解决方法方法一&#xff1a;使用Service对象方法二&#x…

线性代数----------学习记录

线性代数发展历程 &#xff08;1&#xff09;线性方程组&#xff1a;例如二元一次方程组&#xff1b; &#xff08;2&#xff09;行列式&#xff1a;determinant,克莱默&#xff0c;莱布尼兹&#xff1b; &#xff08;3&#xff09;矩阵&#xff1a;方程个数与未知数的个数可…