Spring Boot教程之五十五:Spring Boot Kafka 消费者示例

Spring Boot Kafka 消费者示例

Spring Boot 是 Java 编程语言中最流行和使用最多的框架之一。它是一个基于微服务的框架,使用 Spring Boot 制作生产就绪的应用程序只需很少的时间。Spring Boot 可以轻松创建独立的、生产级的基于 Spring 的应用程序,您可以“直接运行”。因此,下面列出了 Spring boot 的一些主要功能。

  • 创建独立的 Spring 应用程序
  • 直接嵌入 Tomcat、Jetty 或 Undertow。
  • 提供“启动器”依赖项以简化构建配置。
  • 尽可能自动配置 Spring 和第三方库。
  • 提供可用于生产的功能,例如健康检查、指标和外部化配置。
  • 几乎不需要代码生成,也不需要 XML 配置。

Apache Kafka是一个发布-订阅消息系统。消息系统允许您在进程、应用程序和服务器之间发送消息。广义上讲,Apache Kafka 是一种可以定义和进一步处理主题(主题可能是类别)的软件。应用程序可以连接到此系统并将消息传输到主题上。消息可以包含任何类型的信息,来自您的个人博客上的任何事件,也可以是触发任何其他事件的非常简单的文本消息。在这里,我们将讨论如何使用来自 Kafka 主题的消息并使用 Spring Boot 将它们显示在控制台中,其中Kafka 是先决条件。 

例子:

先决条件:确保您已经在本地机器上安装了 Apache Kafka,因此您应该知道如何在 Windows 上安装和运行 Apache Kafka?

步骤 1:转到此链接并创建一个 Spring Boot 项目。将“ Spring for Apache Kafka ”依赖项添加到您的 Spring Boot 项目中。 

步骤2:创建一个名为KafkaConfig的配置文件。以下是KafkaConfig.java文件的代码。

  • Java

// Java Program to Illustrate Kafka Configuration

  

package com.amiya.kafka.apachekafkaconsumer.config;

  

// Importing required classes

import java.util.HashMap;

import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;

import org.apache.kafka.common.serialization.StringDeserializer;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.kafka.annotation.EnableKafka;

import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;

import org.springframework.kafka.core.ConsumerFactory;

import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

  

// Annotations

@EnableKafka

@Configuration

  

// Class

public class KafkaConfig {

  

    @Bean

    public ConsumerFactory<String, String> consumerFactory()

    {

  

        // Creating a Map of string-object pairs

        Map<String, Object> config = new HashMap<>();

  

        // Adding the Configuration

        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,

                   "127.0.0.1:9092");

        config.put(ConsumerConfig.GROUP_ID_CONFIG,

                   "group_id");

        config.put(

            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,

            StringDeserializer.class);

        config.put(

            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,

            StringDeserializer.class);

  

        return new DefaultKafkaConsumerFactory<>(config);

    }

  

    // Creating a Listener

    public ConcurrentKafkaListenerContainerFactory

    concurrentKafkaListenerContainerFactory()

    {

        ConcurrentKafkaListenerContainerFactory<

            String, String> factory

            = new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory());

        return factory;

    }

}

步骤 3:创建名为KafkaConsumer的消费者文件

  • Java

// Java Program to Illustrate Kafka Consumer

  

package com.amiya.kafka.apachekafkaconsumer.consumer;

  

// Importing required classes

import org.springframework.kafka.annotation.KafkaListener;

import org.springframework.stereotype.Component;

  

@Component

  

// Class

public class KafkaConsumer {

  

    @KafkaListener(topics = "NewTopic",

                   groupId = "group_id")

  

    // Method

    public void

    consume(String message)

    {

        // Print statement

        System.out.println("message = " + message);

    }

}

步骤 4:现在我们必须做以下事情才能使用 Spring Boot 从 Kafka 主题消费消息

  • 运行 Apache Zookeeper 服务器
  • 运行 Apache Kafka 服务器
  • 从 Kafka 主题发送消息

使用此命令运行 Apache Zookeeper 服务器

C:\kafka>.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

类似地,使用此命令运行 Apache Kafka 服务器

C:\kafka>.\bin\windows\kafka-server-start.bat .\config\server.properties

运行以下命令从 Kafka 主题发送消息

C:\kafka>.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic NewTopic

步骤 5:现在运行你的 Spring Boot 应用程序。确保已在application.properties文件中更改了端口号

server.port=8081

让我们在 ApacheKafkaConsumerApplication 文件中运行 Spring Boot 应用程序

输出:在输出中,您可以看到当您从 Kafka 主题发送消息时,它会实时显示在控制台上。 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/2548.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

网络安全——常用语及linux系统

一、网络安全概念及法规 网络安全&#xff1a;网络空间安全 cyber security 信息系统&#xff1a;由计算机硬件、网络和通信设备、计算机软件、信息资源、信息用户和规章制度组成的已处理信息流为目的的人机一体化系统 信息系统安全三要素&#xff08;CIA&#xff09; 保密…

Windows 正确配置android adb调试的方法

下载适用于 Windows 的 SDK Platform-Tools https://developer.android.google.cn/tools/releases/platform-tools?hlzh-cn 设置系统变量&#xff0c;路径为platform-tools文件夹的绝对路径 点击Path添加环境变量 %adb%打开终端输入adb shell 这就成功了&#xff01;

如何优化Elasticsearch大文档查询?

记录一次业务复杂场景下DSL优化的过程 背景 B端商城业务有一个场景就是客户可见的产品列表是需要N多闸口及各种其它逻辑组合过滤的&#xff0c;各种闸口数据及产品数据都是存储在ES的(有的是独立索引&#xff0c;有的是作为产品属性存储在产品文档上)。 在实际使用的过程中&a…

使用 WPF 和 C# 将纹理应用于三角形

此示例展示了如何将纹理应用于三角形,以使场景比覆盖纯色的场景更逼真。以下是为三角形添加纹理的基本步骤。 创建一个MeshGeometry3D对象。像往常一样定义三角形的点和法线。通过向网格的TextureCoordinates集合添加值来设置三角形的纹理坐标。创建一个使用想要显示的纹理的 …

探索 Transformer²:大语言模型自适应的新突破

目录 一、来源&#xff1a; 论文链接&#xff1a;https://arxiv.org/pdf/2501.06252 代码链接&#xff1a;SakanaAI/self-adaptive-llms 论文发布时间&#xff1a;2025年1月14日 二、论文概述&#xff1a; 图1 Transformer 概述 图2 训练及推理方法概述 图3 基于提示的…

Android Studio历史版本包加载不出来,怎么办?

为什么需要下载历史版本呢&#xff1f; 虽然官网推荐使用最新版本&#xff0c;但是最新版本如果自己碰到问题&#xff0c;根本找不到答案&#xff0c;所以博主这里推荐使用历史版本&#xff01;&#xff01;&#xff01; Android Studio历史版本包加载不出来&#xff1f; 下…

citrix netscaler13.1 重写负载均衡响应头(基础版)

在 Citrix NetScaler 13.1 中&#xff0c;Rewrite Actions 用于对负载均衡响应进行修改&#xff0c;包括替换、删除和插入 HTTP 响应头。这些操作可以通过自定义策略来完成&#xff0c;帮助你根据需求调整请求内容。以下是三种常见的操作&#xff1a; 1. Replace (替换响应头)…

STM32 FreeRTOS移植

目录 FreeRTOS源码结构介绍 获取源码 1、 官网下载 2、 Github下载 源码结构介绍 源码整体结构 FreeRTOS文件夹结构 Source文件夹结构如下 portable文件夹结构 RVDS文件夹 MemMang文件夹 FreeRTOS在基于寄存器项目中移植步骤 目录添加源码文件 工程添加源码文件 …

[Qt]常用控件介绍-按钮类控件-QPushButton、QRedioButton、QCheckBox、QToolButton控件

目录 1.QPushButton按钮 介绍 属性 Demo&#xff1a;键盘方向键控制人物移动 2.Redio Button按钮 属性 clicked、pressed、released、toggled区别 单选按钮的分组 Demo&#xff1a;点餐小程序 3.CheckBox按钮 属性 Demo&#xff1a;获取今天的形成计划 4.ToolBu…

SpringBoot链接Kafka

一、SpringBoot生产者 &#xff08;1&#xff09;修改SpringBoot核心配置文件application.propeties, 添加生产者相关信息 # 连接 Kafka 集群 spring.kafka.bootstrap-servers192.168.134.47:9093# SASL_PLAINTEXT 和 SCRAM-SHA-512 认证配置 spring.kafka.properties.securi…

zerotier搭建虚拟局域网,自建planet

基于该开源项目 自建planet节点&#xff0c;更快速&#xff0c;更安全 本教程依据docker-zerotier-planet 项目文档书写&#xff0c;并以linux(centos 7)和windows作为示例&#xff0c;需要其他系统配置方法&#xff0c;可移步项目文档 一. 前置资源 具有外网ip的服务器 后面…

计算机网络 (44)电子邮件

一、概述 电子邮件&#xff08;Electronic Mail&#xff0c;简称E-mail&#xff09;是因特网上最早流行的应用之一&#xff0c;并且至今仍然是因特网上最重要、最实用的应用之一。它利用计算机技术和互联网&#xff0c;实现了信息的快速、便捷传递。与传统的邮政系统相比&#…

《机器学习》——DBSCAN算法

文章目录 DBSCAN算法简介DBSCAN算法原理核心概念聚类过程 DBSCAN模型模型API主要参数其他参数 DBSCAN算法实例实例步骤导入所需库导入数据文件传入变量DBSCAN聚类分析添加数据进原数据框对聚类结果进行评分 DBSCAN算法简介 DBSCAN&#xff08;Density - Based Spatial Cluster…

【2024年华为OD机试】 (C卷,100分)- 用连续自然数之和来表达整数(Java JS PythonC/C++)

一、问题描述 题目描述 一个整数可以由连续的自然数之和来表示。 给定一个整数&#xff0c;计算该整数有几种连续自然数之和的表达式&#xff0c;且打印出每种表达式。 输入描述 一个目标整数T (1 <T< 1000) 输出描述 该整数的所有表达式和表达式的个数。 如果有…

Redis--21--大Key问题解决方案

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言Redis--20--大Key问题解析 一、如何发现Redis大Key1. 使用Redis命令行工具**MEMORY USAGE****RANDOMKEY****DEBUG OBJECT****SCAN命令****redis-cli 工具&#…

[操作系统] 深入理解约翰·冯·诺伊曼体系

约翰冯诺依曼&#xff08;John von Neumann&#xff0c;1903年12月28日—1957年2月8日&#xff09;&#xff0c;原名诺伊曼亚诺什拉约什&#xff08;Neumann Jnos Lajos&#xff09;&#xff0c;出生于匈牙利的美国籍犹太人数学家&#xff0c;20世纪最重要的数学家之一&#xf…

OpenCV实现Kuwahara滤波

Kuwahara滤波是一种非线性的平滑滤波技术&#xff0c;其基本原理在于通过计算图像模板中邻域内的均值和方差&#xff0c;选择图像灰度值较为均匀的区域的均值来替代模板中心像素的灰度值。以下是Kuwahara滤波的详细原理说明&#xff1a; 一、基本思想 Kuwahara滤波的基本思想…

vue项目引入阿里云svg资源图标

1&#xff1a;生成svg图标 登录阿里云官网 1.1 创建项目组 1.2 从阿里云网站上面获取喜欢的图标加入到已有的项目组 1.3 如果团队有自己的设计师&#xff0c;也可以让设计师上传自己的svg图标到阿里云指定的项目组&#xff1b; 使用的时候&#xff0c;把 资源包下载到本地项…

软件测试 —— 自动化测试(Selenium)

软件测试 —— 自动化测试&#xff08;Selenium&#xff09; 什么是SeleniumPython安装Selenium1.安装webdirver-manager2.安装Selenium 写一个简单用例CSS_SELECTOR和XPATH浏览器快速定位页面元素浏览器的前进&#xff08;forward&#xff09;&#xff0c;后退&#xff08;bac…

新垂直电商的社交传播策略与AI智能名片2+1链动模式S2B2C商城小程序的应用探索

摘要&#xff1a;随着互联网技术的不断进步和电商行业的快速发展&#xff0c;传统电商模式已难以满足消费者日益增长的个性化和多元化需求。新垂直电商在此背景下应运而生&#xff0c;通过精准定位、用户细分以及深度社交传播策略&#xff0c;实现了用户群体的快速裂变与高效营…