kafka安装和使用的入门教程

这篇文章简单介绍如何在ubuntu上安装kafka,并使用kafka完成消息的发送和接收。

一、安装kafka

访问kafka官网Apache Kafka,然后点击快速开始

紧接着,点击Download

最后点击下载链接下载安装包

如果下载缓慢,博主已经把安装包上传到百度网盘:

链接:https://pan.baidu.com/s/1nZ1duIt64ZVUsimaQ1meZA?pwd=3aoh
提取码:3aoh
--来自百度网盘超级会员V3的分享

二、启动kafka

经过上一步下载完成后,按照页面的提示启动kafka

1、通过远程连接工具,如finalshell、xshell上传kafka_2.13-3.6.0.tgz到服务器上的usr目录

2、切换到usr目录,解压kafka_2.13-3.6.0.tgz

cd /usrtar -zxzf kafka_2.13-3.6.0.tgz

3、启动zookeeper

修改配置文件confg/zookeeper.properties,修改一下数据目录

dataDir=/usr/local/zookeeper

然后通过以下命令启动kafka自带的zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties

4、启动kafka

修改配置文件confg/server.properties,修改一下kafka保存日志的目录

log.dirs=/usr/local/kafka/logs

然后新开一个连接窗口,通过以下命令启动kafka

bin/kafka-server-start.sh config/server.properties

三、kafka发送、接收消息

创建topic

bin/kafka-topics.sh --create --topic hello --bootstrap-server localhost:9092

生产消息

往刚刚创建的topic里发送消息,可以一次性发送多条消息,点击Ctrl+C完成发送

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic hello

消费消息

消费最新的消息

新开一个连接窗口,在命令行输入以下命令拉取topic为hello上的消息

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello

消费之前的消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic hello

指定偏移量消费

 指定从第几条消息开始消费,这里--offset参数设置的偏移量是从0开始的。

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --partition 0 --offset 1 --topic hello

消息的分组消费
每个消费者都可以指定一个消费者组, kafka 中的同一条消息,只能被同一个消费者组下的某一个消费 者消费。而不属于同一个消费者组的其他消费者,也可以消费到这一条消息。
通过以下命令在启动消费者时设置分组:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=helloGroup --topic hello

四、Java中使用kafka

通过maven官网搜索kafka的maven依赖版本

https://central.sonatype.com/search?q=kafkaicon-default.png?t=N7T8https://central.sonatype.com/search?q=kafka然后通过IntelliJ IDEA创建一个maven项目kafka,在pom.xml中添加kafka的依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>kafka</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.12</artifactId><version>3.6.0</version></dependency></dependencies>
</project>

创建消息生产者

生产者工厂类
package producer;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;import java.util.Properties;/*** 消息生产者工厂类* @author heyunlin* @version 1.0*/
public class MessageProducerFactory {private static final String BOOTSTRAP_SERVERS = "192.168.254.128:9092";public static Producer<String, String> getProducer() {//PART1:设置发送者相关属性Properties props = new Properties();// 此处配置的是kafka的端口props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);// 配置key的序列化类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// 配置value的序列化类props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");return new KafkaProducer<>(props);}}

测试发送消息
package producer;import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;/*** @author heyunlin* @version 1.0*/
public class MessageProducer {private static final String TOPIC = "hello";public static void main(String[] args) {ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, "1", "Message From Producer.");Producer<String, String> producer = MessageProducerFactory.getProducer();// 同步发送消息producer.send(record);// 异步发送消息producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {String topic = recordMetadata.topic();long offset = recordMetadata.offset();int partition = recordMetadata.partition();String message = recordMetadata.toString();System.out.println("topic = " + topic);System.out.println("offset = " + offset);System.out.println("message = " + message);System.out.println("partition = " + partition);}});// 加上这行代码才会发送消息producer.close();}}

创建消息消费者

消费者工厂类
package consumer;import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Properties;/*** 消息生产者工厂类* @author heyunlin* @version 1.0*/
public class MessageConsumerFactory {private static final String BOOTSTRAP_SERVERS = "192.168.254.128:9092";public static Consumer<String, String> getConsumer() {//PART1:设置发送者相关属性Properties props = new Properties();//kafka地址props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);//每个消费者要指定一个groupprops.put(ConsumerConfig.GROUP_ID_CONFIG, "helloGroup");//key序列化类props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//value序列化类props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");return new KafkaConsumer<>(props);}}

测试消费消息
package consumer;import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;import java.time.Duration;
import java.util.Collections;/*** @author heyunlin* @version 1.0*/
public class MessageConsumer {private static final String TOPIC = "hello";public static void main(String[] args) {Consumer<String, String> consumer = MessageConsumerFactory.getConsumer();consumer.subscribe(Collections.singletonList(TOPIC));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofNanos(100));for (ConsumerRecord<String, String> record : records) {System.out.println(record.key() + ": " + record.value());}// 提交偏移量,避免消息重复推送consumer.commitSync(); // 同步提交// consumer.commitAsync(); // 异步提交}}}

五、springboot整合kafka

开始前的准备工作

然后通过IntelliJ IDEA创建一个springboot项目springboot-kafka,在pom.xml中添加kafka的依赖

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

然后修改application.yml,添加kafka相关配置

spring:kafka:bootstrap-servers: 192.168.254.128:9092producer:acks: 1retries: 3batch-size: 16384properties:linger:ms: 0buffer-memory: 33554432key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: helloGroupenable-auto-commit: falseauto-commit-interval: 1000auto-offset-reset: latestproperties:request:timeout:ms: 18000session:timeout:ms: 12000key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

创建消息生产者

package com.example.springboot.kafka.producer;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;/*** @author heyunlin* @version 1.0*/
@RestController
@RequestMapping(path = "/producer", produces = "application/json;charset=utf-8")
public class KafkaProducer {private final KafkaTemplate<String, Object> kafkaTemplate;@Autowiredpublic KafkaProducer(KafkaTemplate<String, Object> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}@RequestMapping(value = "/sendMessage", method = RequestMethod.GET)public String sendMessage(String message) {kafkaTemplate.send("hello", message);return "发送成功~";}}

创建消息消费者

package com.example.springboot.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;/*** @author heyunlin* @version 1.0*/
@Component
public class KafkaConsumer {@KafkaListener(topics = "hello")public void receiveMessage(ConsumerRecord<String, String> record) {String topic = record.topic();long offset = record.offset();int partition = record.partition();System.out.println("topic = " + topic);System.out.println("offset = " + offset);System.out.println("partition = " + partition);}}

然后访问网址http://localhost:8080/producer/sendMessage?message=hello往topic为hello的消息队列发送消息。控制台打印了参数,成功监听到发送的消息。

 

文章涉及的项目已经上传到gitee,按需获取~

Java中操作kafka的基本项目icon-default.png?t=N7T8https://gitee.com/he-yunlin/kafka.git

springboot整合kafka案例项目icon-default.png?t=N7T8https://gitee.com/he-yunlin/springboot-kafka.git

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/161905.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

旧手机热点机改造成服务器方案

如果你也跟我一样有这种想法, 那真的太酷了!!! ok,前提是得有root,不然体验大打折扣 目录 目录 1.做一个能爬墙能走百度直连的热点机(做热点机用) 2.做emby视频服务器 3.做文件服务, 存取文件 4.装青龙面板,跑一些定时任务 5.做远程摄像头监控 6.做web服务器 7.内网穿…

基于springboot的高校科研管理系统(源码+调试+LW)

项目描述 临近学期结束&#xff0c;还是毕业设计&#xff0c;你还在做java程序网络编程&#xff0c;期末作业&#xff0c;老师的作业要求觉得大了吗?不知道毕业设计该怎么办?网页功能的数量是否太多?没有合适的类型或系统?等等。这里根据你想解决的问题&#xff0c;今天给…

2023年全网最新 Windows10 搭建 Python 环境教程

目录 一、查看计算机操作系统的位数二、安装Python2.1 下载Python安装包2.2 在Windows 64位系统中安装Python2.3 测试Python是否安装成功 三、Windows环境下安装第2个Python(不同版本) ----不需要安装多个Python版本的读者此小节可以忽略 一、查看计算机操作系统的位数 目前&a…

手机通过WiFi连接调试UR机器人

1.测试物料 1.1ur机器人 https://item.taobao.com/item.htm?spma1z10.1-c.w4004-25069442759.18.2ff56d6bmuxX0Z&id740002623764 1.2 路由器&#xff08;TPLINK&#xff09; https://detail.tmall.com/item.htm?abbucket7&id548610924784&ns1&spma21n57.1.…

基于Java的留学生交流互动论坛系统设计与实现(源码+lw+部署文档+讲解等)

文章目录 前言具体实现截图论文参考详细视频演示为什么选择我自己的网站自己的小程序&#xff08;小蔡coding&#xff09; 代码参考数据库参考源码获取 前言 &#x1f497;博主介绍&#xff1a;✌全网粉丝10W,CSDN特邀作者、博客专家、CSDN新星计划导师、全栈领域优质创作者&am…

OJ项目——用户的登录拦截,我是如何实现的?

目录 前言 1、关于Session该如何处理 简单session回顾&#xff1a; 回顾session的setAttribute、getAttribute : 项目中如何做&#xff1f; 2、登陆拦截器实现 自定义拦截器&#xff1a; 自定义拦截&#xff1a; 前言 博主之前也有出过一期关于拦截器的&#xff0c;大…

Stable Diffusion WebUI几种解决手崩溃的方法

1. 添加与手相关负面提示词 如何提价提示词呢? 首先有一个embeddings模型文件bad-hands-5,我们可以去各个大模型网站去搜,我是在C站上面下载的。 附上C站地址:https://civitai.com/ 下载好之后,你需要将文件放入stable-diffusion-webui\embeddings目录中。位置如下所示…

论文阅读之《Learn to see in the dark》

Learning to See in the Dark-CVPR2018 Chen ChenUIUC&#xff08;伊利诺伊大学厄巴纳-香槟分校&#xff09; Qifeng Chen, Jia Xu, Vladlen Koltun Intel Labs(英特尔研究院) 文章链接&#xff1a;https://arxiv.org/pdf/1805.01934.pdfhttps://arxiv.org/pdf/1805.01934.p…

从Flink的Kafka消费者看算子联合列表状态的使用

背景 算子的联合列表状态是平时使用的比较少的一种状态&#xff0c;本文通过kafka的消费者实现来看一下怎么使用算子列表联合状态 算子联合列表状态 首先我们看一下算子联合列表状态的在进行故障恢复或者从某个保存点进行扩缩容启动应用时状态的恢复情况 算子联合列表状态主…

UE5 Python脚本自动化Sequence Key帧

前言 码上1024了&#xff0c;给大家分享一个UE5的脚本小功能&#xff0c;UE5中Sequence动态Key功能&#xff0c;这样我们就可以根据我们的数据动态更新了&#xff0c;非常实用&#xff0c;适合刚入门或者小白&#xff0c;接下来我就把整个过程分享给大家。 过程 新建一个工程…

【微信小程序开发】自定义组件以及页面布局设计

&#x1f973;&#x1f973;Welcome Huihuis Code World ! !&#x1f973;&#x1f973; 接下来看看由辉辉所写的关于小程序的相关操作吧 目录 &#x1f973;&#x1f973;Welcome Huihuis Code World ! !&#x1f973;&#x1f973; 一.自定义组件的使用步骤&#xff08;附实…

Sandboxie+Buster Sandbox Analyzer打造个人沙箱

一、运行环境和需要安装的软件 实验环境&#xff1a;win7_x32或win7_x64 用到的软件&#xff1a;WinPcap_4_1_3.exe、Sandboxie-3-70.exe、Buster Sandbox Analyzer 重点是Sandboxie必须是3.70版本。下载地址&#xff1a;https://github.com/sandboxie-plus/sandboxie-old/blo…

制作linux系统内部yum源仓库

需求说明 制作内网linux系统yum源仓库&#xff0c;比较简单的方式就是添加系统镜像&#xff0c;此种yum配置方式可参考文章 https://blog.csdn.net/d1240673769/article/details/108477661 如果无法提供系统镜像&#xff0c;那该如何创建内网的yum源仓库呢&#xff1f;本文提…

HarmonyOS 音视频开发概述

在音视频开发指导中&#xff0c;将介绍各种涉及音频、视频播放或录制功能场景的开发方式&#xff0c;指导开发者如何使用系统提供的音视频 API 实现对应功能。比如使用 TonePlayer 实现简单的提示音&#xff0c;当设备接收到新消息时&#xff0c;会发出短促的“滴滴”声&#x…

JMeter 随机数生成器简介:使用 Random 和 UUID 算法

在压力测试中&#xff0c;经常需要生成随机值来模拟用户行为。JMeter 提供了多种方式来生成随机值&#xff0c;本文来具体介绍一下。 随机数函数 JMeter 提供了多个用于生成随机数的函数&#xff0c;其中最常用的是 __Random 函数。该函数可以生成一个指定范围内的随机整数或…

JAVA转GO

GO 环境配置 go环境 下载go并安装(win下),环境变量他自己要配置上 https://dl.google.com/go/go1.21.3.windows-amd64.msi 验证是否安装成功: //打开cmd go versionVSCODE环境 下载VSCODE…略 配置VSCODE的环境 下载插件 go开发工具包 打开cmd,或者VSCODE自带的终端,…

IDEA使用http client无法识别http-client.env.json的环境配置

http-client.env.json的配置 {"dev": {"baseUrl": "http://192.168.60.176:9160","accessToken": "eyJhbPNOQ"} }选择不到环境 问题原因&#xff1a; 安装了Alibaba Cloud Toolkit插件后&#xff0c;被Alibaba Cloud ROS …

[初始java]——java为什么这么火,java如何实现跨平台、什么是JDK/JRE/JVM

java的名言&#xff1a; ”一次编译、到处运行“ 一、编译语言与解释语言 编译&#xff1a; 是将整份源代码转换成机器码再进行下面的操作&#xff0c;最终形成可执行文件 解释&#xff1a; 是将源代码逐行转换成机器码并直接执行的过程&#xff0c;不需要生成目标文件 jav…

springboot 导出word模板

一、安装依赖 <dependency><groupId>com.deepoove</groupId><artifactId>poi-tl</artifactId><version>1.12.1</version></dependency>二、定义工具类 package com.example.springbootmp.utils;import com.deepoove.poi.XWP…

如何加入开源项目维护并提交代码?本地搭建源码阅读开发构建环境示例: kafka

如何加入开源项目维护并提交代码?本地搭建源码阅读开发构建环境示例: kafka。 大家对开源项目有兴趣、想成为committer,或者工作需要,会从github上获取最新的开源项目源码。本文做一个示例,怎样搭建本地的源码阅读、开发、构建环境。 首先,在github上找到项目的链接,…