简单实现,在nodejs中简单使用kafka

什么是 Kafka

Kafka 是由 Linkedin 公司开发的,它是一个分布式的,支持多分区、多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的基于发布订阅模式的消息引擎系统

Kafka 的基本术语

消息:Kafka 中的数据单元被称为消息,也被称为记录,可以把它看作数据库表中某一行的记录。

批次:为了提高效率, 消息会分批次写入 Kafka,批次就代指的是一组消息。

主题:消息的种类称为 主题(Topic),可以说一个主题代表了一类消息。相当于是对消息进行分类。主题就像是数据库中的表。

分区:主题可以被分为若干个分区(partition),同一个主题中的分区可以不在一个机器上,有可能会部署在多个机器上,由此来实现 kafka 的伸缩性,单一主题中的分区有序,但是无法保证主题中所有的分区有序

一、本地简单搭建kafka

1.1 下载并解压2.8.1版本 或者其他版本(点击下载:Apache Kafka)

下载后,解压到指定文件夹,并创建两个文件夹以后使用

1.2修改(如下图)配置文件

1.config下的zookeeper.properties,修改为刚才创建data的真实路径

2.config下的server.properties,修改为刚才创建kafka-log的真实路径

1.3启动项目 顺序启动

1.启动zookeeper

到解压文件夹下执行:不要关闭窗口

.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

2.启动kafka

到解压文件夹下执行:不要关闭窗口

.\bin\windows\kafka-server-start.bat .\config\server.properties

3.创建一个为test的topic

不要关闭窗口

.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1

4.创建生产者

到解压文件夹下执行:

.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test1


5.创建消费者

.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test1 --from-beginning

6.测试一下

生产者发消息后消费者是否可以收到

二.在nodejs中使用kafka

2.1安装kafka-node
命令 npm install kafka-node

2.2 创建生产者
import kafka  = require('kafka-node');
var client = new kafka.KafkaClient({kafkaHost:'127.0.0.1:9092'});
var Producer = kafka.Producer;
var topic = 'test1';
var payloads = [{  // 需要发送的一些配置信息
        topic: topic,
        messages: 'arrange'  // 需要生产的消息
    }];  // 此处必须要使用数组的形式,因为payloads的遍历采用的是foreach
producer.on('ready', function () {
   producer.send(payloads, function (err, data) {
      console.log(payloads);
      console.log("=======");
      console.log(data);
   });
})

producer.on('error', function (err) {
  console.log('error', err);
});

2.3创建消费者

import kafka  = require('kafka-node');
var client = new kafka.KafkaClient({kafkaHost:'127.0.0.1:9092'});
var topic = 'test1';
var payloads = [{  // 需要发送的一些配置信息
        topic: topic,
        messages: 'arrange'  // 需要生产的消息
    }];  // 此处必须要使用数组的形式,因为payloads的遍历采用的是foreach
var options = {  // 消费者的选择
    host: 'localhost:9092',
    sessionTimeout: 15000,
    autoCommit: true
};
var consumer = new kafka.Consumer(client, payloads, options);
consumer.on('message', function (message) {
    console.log(message);
});
 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/191782.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深入理解 Django 单元测试

概要 在现代软件开发流程中,单元测试是确保代码质量和可维护性的关键组成部分。对于使用 Django 框架的项目来说,Django 提供了一套强大的测试工具来帮助开发者编写和运行单元测试。本文将深入探讨 Django 中的单元测试,包括测试原理、编写测…

vue3 ref 与shallowRef reactive与shallowReactive

ref 给数据添加响应式,基本类型采用object.defineProperty进行数据劫持,对象类型是借助reactive 实现响应式,采用proxy 实现数据劫持,利用reflect进行源数据的操作 let country ref({count:20,names:[河南,山东,陕西],objs:{key…

19C进入数据库出现问号

问题情况如图所示: 解决方法: su - oracle echo "NLS_LANGAMERICAN_AMERICA.ZHS16GBK;export NLS_LANG" >> ~/.bash_profilesource ~/.bash_profileofile

《网络协议》05. 网络通信安全 · 密码技术

title: 《网络协议》05. 网络通信安全 密码技术 date: 2022-09-10 15:16:15 updated: 2023-11-12 07:03:52 categories: 学习记录:网络协议 excerpt: 网络通信安全(ARP 欺骗,DoS & DDoS,SYN 洪水攻击,LAND 攻击&a…

activiti7审批驳回,控制变量无法覆盖,导致无限循环驳回,流程无法结束

项目开发过程中使用工作流,因此考虑使用activiti7做完工作流引擎。项目开发过程中,发现流程驳回时,再次执行流程,控制变量无法覆盖,导致无限循环驳回,流程无法结束。流程图如下图所示: 驳回控制…

数据结构 栈(C语言实现)

目录 1.栈的概念及结构2.栈的代码实现 1.栈的概念及结构 栈:一种特殊的线性表,其只允许在固定的一端进行插入和删除元素操作。进行数据插入和删除操作的一端 称为栈顶,另一端称为栈底。栈中的数据元素遵守后进先出LIFO(Last In F…

【JavaEE】Servlet API 详解(HttpServletRequest类)

二、HttpServletRequest Tomcat 通过 Socket API 读取 HTTP 请求(字符串), 并且按照 HTTP 协议的格式把字符串解析成 HttpServletRequest 对象(内容和HTTP请求报文一样) 1.1 HttpServletRequest核心方法 1.2 方法演示 WebServlet("/showRequest&…

线性代数理解笔记

一.向量引入: 向量:只由大小和方向决定,不由位置决定。 二.向量加减法 向量的加法是首尾相连,减法是尾尾相连。 而向量v向量w为平行四边形主对角线。 向量v-向量w为平行四边形副对角线。 2.向量内积点乘(内积) 内积…

谈谈如何沟通

序言 如果你是对的,就要试着温和地、技巧地让对方同意你;如果你错了,就要迅速热忱地承认。这比为自己争辩有效和有趣的多。——卡耐基【美】 通过上篇文章谈谈如何写作(一),我们了解了如何表达的一些基本的…

国际阿里云:云服务器灾备方案!!!

保障企业业务稳定、IT系统功能正常、数据安全十分重要,可以同时保障数据备份与系统、应用容灾的灾备解决方案应势而生,且发展迅速。ECS可使用快照、镜像进行备份。 灾备设计 快照备份 阿里云ECS可使用快照进行系统盘、数据盘的备份。目前,阿…

【10套模拟】【3】

关键字: 物理存储、完全二叉树、出栈入栈时间复杂度、线索二叉树

Vue基础必备掌握知识点-Vue的指令系统讲解(二)

Vue指令系统继续讲解 v-for 作用:基于数据进行循环,多次渲染整个元素 数据类型:数组.对象.数字。。。 遍历数组语法:v-for"(item,index)" in 数组 item:表示每一项 index:则是表现下标 注意:v-for中的key值,key属性唯一的…

MyBatis解析全局配置文件

MyBatis解析全局配置文件 MyBaits基础应用: 文档:MyBatis 链接:http://note.youdao.com/noteshare?id5d41fd41d970f1af9185ea2ec0647b64 传统JDBC和Mybatis相比的弊病 传统JDBC ​ Connection conn null; PreparedStatement pstmt …

负公差轧钢测径仪 多规格可定制 普通智能随意选择

负公差轧制的意义: 轧钢厂生产的螺纹钢是按理论重量销,因此稳定的高负差产品极具市场竞争力。负差率即实际重量与理论重量的差值,除以理论重量,乘100%。以螺纹12为例,不按负差生产,在坯重2450kg的情况下&am…

通过Workstation工具制作CentOS8虚拟机模板

通过Workstation工具制作CentOS8虚拟机模板 1. 需求说明2. 安装模板虚拟机3. 配置模板虚拟机 1. 需求说明 说明:在做集群实验过程中,需要创建多台虚拟机,如果逐台安装虚拟机,很消耗时间,所以最简洁的办法就是通过模板克…

【无标题】通用工作站设计方案:ORI-D3R600服务器-多路PCIe3.0的双CPU通用工作站

ORI-D3R600服务器-多路PCIe3.0的双CPU通用工作站 一、机箱功能和技术指标: 系统 系统型号 ORI-SR630 主板支持 EEB(12*13)/CEB(12*10.5)/ATX(12*9.6)/Micro ATX 前置硬盘 最大支持8个3.5寸(兼容25寸)SATA硬盘 2*2.5(后置) 电源类型 CRPS元余电源&#xff0…

Python开源项目CodeFormer——人脸重建(Face Restoration),模糊清晰、划痕修复及黑白上色的实践

无论是自己、家人或是朋友、客户的照片,免不了有些是黑白的、被污损的、模糊的,总想着修复一下。作为一个程序员 或者 程序员的家属,当然都有责任满足他们的需求、实现他们的想法。除了这个,学习了本文的成果,或许你还…

JAVA毕业设计110—基于Java+Springboot+Vue的房屋租赁系统小程序(源码+数据库)

基于JavaSpringbootVue的房屋租赁系统小程序(源码数据库)110 一、系统介绍 本系统前后端分离 本系统分为用户、房东、超级管理员三种角色 1、用户: 登录、注册、房屋搜索、房屋收藏、看房预约、租房申请、租房记录、看房记录、收藏记录、我的消息、个人信息修改…

Center Smoothing Certified Robustness for Networks with Structured Outputs

文章目录 Center Smoothing: Certified Robustness for Networks with Structured OutputsSummaryResearch ObjectiveProblem StatementMethodsEvaluationConclusionNotesGaussian Smoothing常用希腊字母霍夫丁不等式(Hoeffdings inequality)1.简述2.霍夫…