在Nodejs中使用kafka(一)安装使用

安装 

方法一、使用docker-compose安装

1、创建docker-compose.yml文件。

services:zookeeper:image: docker.io/bitnami/zookeeper:3.9ports:- "2181:2181"volumes:- "./data/zookeeper:/bitnami"environment:- ALLOW_ANONYMOUS_LOGIN=yeskafka:image: docker.io/bitnami/kafka:3.4ports:- "9092:9092"volumes:- "./data/kafka:/bitnami"environment:- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092  # 关键配置- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=truedepends_on:- zookeepervolumes:zookeeper_data:driver: localkafka_data:driver: local

2、在文件所在位置运行下面命令

docker compose up -d

方法二、 

直接下载kafka,zookeeper。(注:CentOS环境 )  

java安装:yum install java-17-openjdk-devel 

kafka下载地址:Apache Kafka

zookeeper下载地址:Apache ZooKeeper

 下载好后选择路径直接解压即可。

Kafka 的核心功能

1.消息队列:Kafka 最初被设计为一个高吞吐量、分布式的消息队列系统,用于处理大规模的消息流。

2.持久化:Kafka 不仅仅是一个消息队列,它还提供消息的持久化功能,数据存储在磁盘中,这使得 Kafka 适合用于长期的数据存储和分析。

3.高吞吐量和低延迟:Kafka 设计上非常注重高吞吐量和低延迟,支持每秒钟处理百万级别的消息。

4.分布式系统:Kafka 是一个分布式系统,能够跨多台机器进行水平扩展,支持高可用性和容错。

5.流处理:除了消息传递,Kafka 还提供了流处理 API(例如 Kafka Streams),使得用户能够在消息流中进行实时计算和分析。

 Kafka 的核心组件

1.Producer(生产者)

  • Producer 是负责将消息发送到 Kafka 的客户端应用程序。
  • 生产者将消息发布到 Kafka 的特定 Topic(主题)中。
  • 生产者可以选择将消息发布到一个或多个 Kafka 分区中。

2.Consumer(消费者) 

  • Consumer 是从 Kafka 中读取消息的客户端应用程序。
  • 消费者订阅一个或多个 Topic,并从中读取消息。
  • Kafka 支持消费者组机制,消费者组中的多个消费者可以共同消费一个主题的消息,达到负载均衡和容错的效果。

3.Broker(代理)

  • Kafka Broker 是 Kafka 集群中的一个节点,负责存储消息并处理生产者和消费者的请求。
  • Kafka 集群通常由多个 Broker 组成,每个 Broker 负责存储特定的分区数据。
  • Kafka 的分布式架构保证了可扩展性和高可用性,所有的消息都会在多个 Broker 之间进行分布和复制。

4.Topic(主题)

  • Topic 是 Kafka 中的消息分类,用来组织消息。
  • 每个 Topic 可以有多个 Partition(分区),消息被写入到分区中,分区内的消息是有序的。
  • Kafka 支持将一个 Topic 的消息分布到多个分区中,允许高并发的生产者和消费者。

5.Partition(分区)

  • Partition 是 Topic 的实际存储单位,一个 Topic 可以有多个 Partition,每个 Partition 作为一个日志文件进行存储。
  • 分区保证了消息的顺序性,即同一个分区内的消息是有序的。
  • 分区的数量可以通过配置来控制,多个分区能够提供水平扩展和并行处理能力。

7. Zookeeper(协调者)(对于传统 Kafka 集群来说)

  • Zookeeper 是 Kafka 中的一个分布式协调组件,用于管理集群的元数据(如 Broker 的信息、Topic 和 Partition 的分配等)。
  • Zookeeper 保证 Kafka 集群中的 Broker 之间的协调和一致性。
  • 从 Kafka 2.8.0 开始,Kafka 引入了 KRaft 模式,它在未来版本中逐步摆脱了对 Zookeeper 的依赖,转而使用内置的 Raft 协议进行集群管理。

  

kafka基本命令

在解压的kafka根文件夹下运行

1、启动zookeeper

nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

2、启动kafka

nohup bin/kafka-server-start.sh config/server.properties &

3、 查看kafka,zookeeper启动进程

jps

4、 创建topic

bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092

5、 删除topic

bin/kafka-topics.sh --delete --topic test --bootstrap-server localhost:9092

6、查看topic详情

bin/kafka-topics.sh --describe --topic test --bootstrap-server localhost:9092

 7、查看创建的所有topic

bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

8、运行生产者

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test

9、消费者(默认接收实时消息)

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

10、从最开头开始消费

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

11、指定分区和位置开始消费

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 0 --offset 2

12、创建消费者组,可以指定多个组,多个消费者接收消息

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group testGroup

13、查看消费者组详情

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group testGroup

示例 

kafkajs文档地址:Getting Started · KafkaJS 

producer.ts

import { Kafka } from 'kafkajs';async function run() {const kafka = new Kafka({clientId: 'test1',brokers: ['localhost:9092'],});const producer = kafka.producer();await producer.connect();for (let i = 1; i <= 10; ++i) {await producer.send({topic: 'topic1',messages: [{ value: `hello kafka${i}` },],});}await producer.disconnect();
}run();

consumer.ts

import { Kafka } from 'kafkajs'const kafka = new Kafka({clientId: 'test1',brokers: ['localhost:9092'],connectionTimeout: 1000, // 1 秒连接超时
});const consumer = kafka.consumer({groupId: 'group1',
});await consumer.connect();await consumer.subscribe({ topic: 'topic1',fromBeginning: true, // 从头开始消费
});await consumer.run({eachMessage: async ({ topic, partition, message }) => {console.log(message.value?.toString());},
});// consumer.seek({ topic: 'topic1', partition: 0, offset: '2' });

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/18984.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CRISPR spacers数据库;CRT和PILER-CR用于MAGs的spacers搜索

iPHoP&#xff1a;病毒宿主预测-CSDN博客 之前介绍了这个方法来预测病毒宿主&#xff0c;今天来介绍另一种比较用的多的方法CRISPR比对 CRISPR spacers数据库 Dash 在这可以下载作者搜集的spacers用于后期比对 CRT和PILER-CR 使用 CRT 和 PILERCR 识别 CRISPR 间隔区&#x…

深入理解Java的 JIT(即时编译器)

&#x1f9d1; 博主简介&#xff1a;CSDN博客专家&#xff0c;历代文学网&#xff08;PC端可以访问&#xff1a;https://literature.sinhy.com/#/literature?__c1000&#xff0c;移动端可微信小程序搜索“历代文学”&#xff09;总架构师&#xff0c;15年工作经验&#xff0c;…

LabVIEW开发CANopen紧急对象读取

本示例展示了如何通过CANopen协议读取设备的紧急对象&#xff08;Emergency object&#xff09;。紧急对象用于报告设备发生故障或异常情况时的紧急信息。通过该示例&#xff0c;用户可以配置并读取设备发送的紧急消息&#xff0c;确保在设备发生紧急状况时能够及时响应。 主要…

DeepSeek官方推荐的AI集成系统

DeepSeek模型虽然强大先进&#xff0c;但是模型相当于大脑&#xff0c;再聪明的大脑如果没有输入输出以及执行工具也白搭&#xff0c;所以需要有配套工具才能让模型发挥最大的作用。下面是一个典型AI Agent架构图&#xff0c;包含核心组件与数据流转关系&#xff1a; #mermaid-…

【第13章:自监督学习与少样本学习—13.4 自监督学习与少样本学习的未来研究方向与挑战】

凌晨三点的实验室里,博士生小张盯着屏幕上的训练曲线——他设计的跨模态少样本学习模型在医疗影像诊断任务上突然出现了诡异的性能断崖。前一秒还在92%的准确率高位运行,下一秒就暴跌到47%。这个看似灾难性的现象,却意外揭开了自监督学习与少样本学习技术深藏的核心挑战… 一…

论文解读之DeepSeek R1

今天带来DeepSeek R1的解读 一、介绍 deepseek主打复杂推理任务&#xff0c;如数学、代码任务。 R1以预训练过的V1-base初始化&#xff0c;主要发挥了RL在长思维链上的优势&#xff0c;R1-Zero直接RL而在前置步骤中不进行SFT&#xff0c;即缺少了有监督的指令微调阶段&#…

【Java学习】类和对象

目录 一、选择取块解 二、类变量 三、似复刻变量 四、类变量的指向对象 五、变量的解引用访问 1.new 类变量(参) 2.this(参) 3.类变量/似复刻变量. 六、代码块 七、复制变量的赋值顺序 八、访问限定符 1.private 2.default 九、导类 一、选择取块解 解引用都有可以…

使用css实现镂空效果

前言&#xff1a; 最近在公司完成小程序的新手引导中遇到了要将蒙层挖空&#xff0c;漏出后面内容的功能&#xff0c;找了各种资料之后&#xff0c;发现了一种就使用几行css代码就实现这个效果的方式&#xff0c;在这里分享给各位小伙伴们。 功能描述&#xff1a;实现下图的镂…

15.1 Process(进程)类

版权声明&#xff1a;本文为博主原创文章&#xff0c;转载请在显著位置标明本文出处以及作者网名&#xff0c;未经作者允许不得用于商业目的。 通常开发时想要获得进程是比较困难的事&#xff0c;必须要调用CreateToolhelpSnapshot、ProcessFirst、ProcessNext等API或者诸如 Zw…

【全栈开发】----Mysql基本配置与使用

本篇是在已下载Mysql的情况下进行的&#xff0c;若还未下载或未创建Mysql服务&#xff0c;请转到这篇: 2024 年 MySQL 8.0.40 安装配置、Workbench汉化教程最简易&#xff08;保姆级&#xff09;_mysql8.0.40下载安装教程-CSDN博客 本文对于mysql的操作均使用控制台sql原生代码…

数据恢复-01-机械硬盘的物理与逻辑结构

磁盘存储原理 磁盘存储数据的原理&#xff1a; 磁盘存储数据的原理是利用磁性材料在磁场作用下的磁化性质&#xff0c;通过在磁盘表面上划分成许多小区域&#xff0c;根据不同的磁化方向来表示0和1的二进制数据&#xff0c;通过读写磁头在磁盘上的移动&#xff0c;可以实现数据…

神经网络新手入门(3)光明顶复出(2006-2012)

让我们继续这场科技江湖的传奇&#xff0c;见证神经网络如何从寒冬中涅槃重生&#xff1a; 第五章&#xff1a;光明顶复出&#xff08;2006-2012&#xff09; 2006年&#xff0c;江湖人称"深度学习教主"的辛顿&#xff08;Geoffrey Hinton&#xff09;闭关修炼二十…

【C++】基础入门(详解)

&#x1f31f; Hello&#xff0c;我是egoist2023&#xff01; &#x1f30d; 种一棵树最好是十年前&#xff0c;其次是现在&#xff01; 目录 输入&输出 缺省参数(默认参数) 函数重载 引用 概念及定义 特性及使用 const引用 与指针的关系 内联inline和nullptr in…

【2025最新版】软件测试面试题总结(150道题含答案解析)

接口测试面试题 1&#xff1a;你平常做接口测试的过程中发现过哪些 bug? 2&#xff1a;平常你是怎么测试接口的&#xff1f; 3&#xff1a;平常用什么工具测接口? 4: webService 接口是如何测试的? 5&#xff1a;没有接口文档&#xff0c;如何做接口测试&#xff1f; 6&…

使用EVE-NE-锐捷实现NAT+ACL服务限制

一、项目拓扑 二、项目实现 1.NET配置 点击左侧的NetWorks,设置与图相同的配置&#xff0c;实现实验环境桥接到物理网络 2.GW配置 进入特权模式 enable进入全局模式 configure terminal 更改名称为GW hostname GW进入g0/0接口 interface g0/0将g0/0接口IP地址配置为192.168.…

nginx 实战配置

一、配置一个默认80端口的&#xff0c;静态页面&#xff0c;路径是path1。 http://192.168.0.111/path1 &#xff0c; /path1路径指向linux的/data/index1.html vi /data/nginx-1.24.0/conf/nginx.conf 文件添加以下配置 location /path1 { alias /data/…

kubekey一键部署k8s高可用与kubesphere

kubekey一键安装k8s与kubesphere还是蛮方便的&#xff0c;kubesphere官网上面也提到了高可用安装的一些事宜&#xff0c;但是没有涉及到kubesphere资深的redis的系统的部署问题&#xff0c;本文简单给出对应配置&#xff0c;其实这个配置在kubephere的cluster-configuration.ya…

怎么使用服务器运行pySCENIC

前言 我们注意到在其他的一些论坛&#xff0c;有一些用户反馈&#xff0c;在服务器上面运行pyscenic不太顺畅。本文我们整理了在服务器上运行pyscenic的三个方法供大家参考&#xff0c;分别是conda安装pyscenic运行、arboreto_with_multiprocessing运行、容器化运行。总的来说&…

QEMU 搭建arm linux开发环境

Qemu 作为一款强大的开源虚拟化软件&#xff0c;为我们提供了一个便捷且经济实惠的方式来模拟各种硬件环境&#xff0c;从而在上面安装和学习 Linux 系统。本文将详细介绍如何使用 Qemu 搭建 Linux 学习环境&#xff0c; 环境准备 操作系统&#xff1a;建议使用 Ubuntu 20.04…

坐井说天阔---DeepSeek-R1

前言 DeepSeek-R1这么火&#xff0c;虽然网上很多介绍和解读&#xff0c;但听人家的总不如自己去看看原论文。于是花了大概一周的时间&#xff0c;下班后有进入了研究生的状态---读论文。 DeepSeek这次的目标是探索在没有任何监督数据的情况下训练具有推理能力的大模型&#…