Kafka消息队列python开发环境搭建

目录

引言

Kafka 的核心概念和组件

Kafka 的主要特性

使用场景

申请云服务器

安装docker及docker-compose

VSCODE配置

开发环境搭建

搭建Kafka的python编程环境

Kafka的python编程示例

引言

Apache Kafka 是一个分布式流处理平台,由 LinkedIn 开发并在 2011 年贡献给 Apache 软件基金会。虽然 Kafka 常被归类为消息队列(也称为消息传递系统或消息中间件),但它实际上提供了比传统消息队列更丰富的功能,特别是在处理大规模数据流方面。Kafka 最初被设计用于处理 LinkedIn 的高吞吐量日志数据,但现在已广泛应用于各种场景,包括网站活动跟踪、日志收集、实时分析、监控数据聚合以及流处理等。

Kafka 的核心概念和组件
  1. Broker(代理):Kafka 集群中的服务器被称为 broker。每个 broker 都可以独立地处理来自生产者的数据,并响应消费者的请求。

  2. Topic(主题):Kafka 中的消息被分类存储在名为 topic 的容器中。每个 topic 可以有多个分区(partition),每个分区都有序地存储消息。

  3. Partition(分区):分区是 Kafka 中实现水平扩展和容错的关键。每个分区可以分布在不同的 broker 上,同时每个分区内的消息都是有序的。

  4. Producer(生产者):生产者负责向 Kafka 集群发送消息到指定的 topic。生产者可以指定消息的键(key),Kafka 使用这个键来决定消息被发送到哪个分区。

  5. Consumer(消费者):消费者从 Kafka 集群订阅 topic 并消费数据。Kafka 支持多个消费者群组(consumer group)同时消费同一个 topic,每个消费者群组内的消费者可以共同分担处理数据的任务。

  6. Consumer Group(消费者群组):同一个消费者群组内的消费者可以并行地消费同一个 topic 的不同分区,但每个分区只能被一个消费者群组内的一个消费者消费,以确保消息的有序性。

  7. Offset(偏移量):Kafka 中的每条消息都有一个唯一的偏移量,用于标识消息在分区中的位置。消费者通过记录自己消费到的偏移量来跟踪消息的读取进度。

Kafka 的主要特性
  • 高吞吐量:Kafka 被设计用来处理高吞吐量的数据,可以轻松处理成千上万条消息/秒。
  • 可扩展性:Kafka 的集群可以通过增加更多的 broker 来水平扩展,以处理更大的数据量和更高的吞吐量。
  • 持久性:Kafka 通过将消息存储在磁盘上来保证消息的持久性,即使在服务器故障的情况下也不会丢失数据。
  • 容错性:Kafka 提供了强大的容错机制,包括自动的副本复制和数据冗余,以确保数据的可靠性和可用性。
  • 实时性:Kafka 支持实时数据处理,使得它可以用于构建实时流处理应用程序。
使用场景
  • 消息传递:作为传统的消息队列使用,支持解耦的生产者和消费者。
  • 网站活动跟踪:收集和分析用户的点击流、搜索查询等网站活动数据。
  • 日志收集:从分布式系统中收集日志数据,用于监控和分析。
  • 实时分析:对实时数据流进行实时处理和分析,以支持实时决策。
  • 事件流处理:处理实时事件流,如传感器数据、金融交易数据等。

申请云服务器

(以京东云为例,阿里云、腾讯云、华为云、天翼云类似)

注意在选择操作系统的时候选择ubuntu22.04或ubuntu20.04

管理员账户root

管理员密码:在安装的时候设置,记住密码

下载安装mobaXterm

https://mobaxterm.mobatek.net/download-home-edition.html

安装docker及docker-compose

#以下只安装一次即可!
sudo apt update
sudo apt install -y docker.io # intel x86_64
sudo curl -SL https://github.com/docker/compose/releases/download/v2.21.0/docker-compose-linux-x86_64 \-o /usr/local/bin/docker-compose
# 如果github不能访问,可用hub.njuu.cf或521github.com/镜像站替换github.com重试
sudo chmod +x /usr/local/bin/docker-compose #如报错,去掉sudo重试
sudo ln -s /usr/local/bin/docker-compose /usr/bin/docker-compose #如报错,去掉sudo重试

查看docker和dockers是否安装好?

docker version
docker-compose version

VSCODE配置

开发环境搭建

  • 将如下文件保存为docker-compose.yml,并上传至服务器,例如/home/ubuntu/iiot/kafka

  • 将下面代码中的localhost替换为云服务公网IP

version: '3'
services:zookeeper:image: confluentinc/cp-zookeeper:latestenvironment:ZOOKEEPER_CLIENT_PORT: 2181ports:- "2181:2181"
dkafka:image: confluentinc/cp-kafka:latestcontainer_name: kafka-devdepends_on:- zookeeperenvironment:KAFKA_BROKER_ID: 1KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1ports:- "9092:9092"
  • 上传metaldigi.zip

解压缩文件

sudo apt updatesudo apt install zipunzip metaladigi.zip

cd metaldigi/kafka
  • 启动kafka消息服务器

    • 在命令行执行消息生产者

docker-compose up -d

搭建Kafka的python编程环境

  • 进入metaldigi文件夹

  • 执行 docker ps

cd metaldigi
docker-compose up -d

Kafka的python编程示例

  • 进入metaldigi文件夹

  • 执行 docker ps

 docker exec -it metal-digi-backend bash

消息生产者

from kafka import KafkaProducer# 创建一个 Kafka 生产者实例# 这里指定了 Kafka 服务器的地址和端口
producer = KafkaProducer(bootstrap_servers='150.158.11.142:9092')# 循环发送 10 条消息到 'demo-topic' 主题
for_in range(10):
# 将要发送的消息转换为字节格式message =f'message{_}'.encode('utf-8')# 发送消息到 Kafka 的 'demo-topic' 主题producer.send('demo-topic', value=message)# 打印已发送的消息print(f'Sent message: message{_}')# 关闭生产者实例
producer.close()这段代码创建一个 Kafka 生产者,用于向 Kafka 集群发送消息。它循环发送10条消息到名为 'demo-topic' 的主题。每条消息都是一个简单的文本字符串,转换为字节格式后发送。

消息消费者

from kafka import KafkaConsumer# 创建一个 Kafka 消费者实例
# 指定 Kafka 服务器地址、端口以及其他一些配置
consumer = KafkaConsumer('demo-topic',  # 指定要消费的主题bootstrap_servers='150.158.11.142:9092',  # Kafka 服务器地址和端口auto_offset_reset='earliest',  # 从最早的消息开始消费enable_auto_commit=True,  # 自动提交偏移量group_id='demo-group'  # 消费者组标识
)# 循环消费并打印收到的消息
for message in consumer:# 解码并打印消息内容print(f"Received message: {message.value.decode()}")

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/379137.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Android View的绘制流程

1.不管是View的添加,还是调用View的刷新方法invalidate()或者requestLayout(),绘制都是从ViewRootImpl的scheduleTraversals()方法开始 void scheduleTraversals() {if (!mTraversalScheduled) {mTraversalScheduled true;mTraversalBarrier mHandler…

SpringCloud教程 | 第九篇: 使用API Gateway

1、参考资料 SpringCloud基础篇-10-服务网关-Gateway_springcloud gateway-CSDN博客 2、先学习路由,参考了5.1 2.1、建了一个cloudGatewayDemo,这是用来配置网关的工程,配置如下: http://localhost:18080/aaa/name 该接口代码如…

科普文:详解23种设计模式

概叙 设计模式是对大家实际工作中写的各种代码进行高层次抽象的总结,其中最出名的当属 Gang of Four(GoF)的分类了,他们将设计模式分类为 23 种经典的模式,根据用途我们又可以分为三大类,分别为创建型模式…

等保-Linux等保测评

等保-Linux等保测评 1.查看相应文件,账户xiaoming的密码设定多久过期 rootdengbap:~# chage -l xiaoming Last password change : password must be changed Password expires : pass…

理解类与对象:面向对象基础

目录 1. 类的定义1.1 格式1.2 访问限定符1.3 类域 2.实例化2.1 实例化概念2.2 对象大小 3.this指针 1. 类的定义 1.1 格式 class为定义类的关键字,Date为类的名字,{ }中为类的主体,注意类定义结束后面的分号不能省略。类体中内容称为类的成…

【博士每天一篇文献-算法】连续学习算法之HNet:Continual learning with hypernetworks

阅读时间:2023-12-26 1 介绍 年份:2019 作者:Johannes von Oswald,Google Research;Christian Henning,EthonAI AG;Benjamin F. Grewe,苏黎世联邦理工学院神经信息学研究所 期刊&a…

如何在项目中打印sql和执行的时间

目标:打印DAO方法中sql和执行的时间 一种方式是去实现Mybatis的拦截器Interceptor ,比较麻烦; 这里介绍一种比较简单的实现方式; 1、如何打印sql? 配置文件加这个可以打印出com.zhenhui.ids.busi.watch包下执行的sq…

3D线上展厅:元宇宙时代的营销利器,流量暴增的秘密武器!

在体验经济蓬勃发展的当下,企业营销领域正以前所未有的热情探索创新路径,元宇宙这一融合了无限想象与未来科技的概念,成为了众多品牌竞相追逐的新蓝海。3D技术、增强现实(AR)、虚拟现实(VR)以及…

【ProtoBuf】proto 3 语法 -- 详解

这个部分会对通讯录进行多次升级,使用 2.x 表示升级的版本,最终将会升级如下内容: 不再打印联系人的序列化结果,而是将通讯录序列化后并写入文件中。 从文件中将通讯录解析出来,并进行打印。 新增联系人属性&#xff…

谈谈大数据采集和常见问题

01 什么是数据采集 数据采集是大数据的基石,不论是现在的互联网公司,物联网公司或者传统的IT公司,每个业务流程环节都会产生大量的数据,同时用户操作的日志也会产生大量的数据,为了将这些结构化和非结构化的数据进行…

【常见开源库的二次开发】基于openssl的加密与解密——单向散列函数(四)

目录: 目录: 一、什么是单项散列函数? 1.1 如何验证文件是否被修改过 1.2 单项散列函数: 二、单向hash抗碰撞 2.1 弱抗碰撞(Weak Collision Resistance) 2.2 强抗碰撞(Strong Collision Resista…

Webpack详解

Webpack Webpack 是一个现代 JavaScript 应用程序的静态模块打包器(module bundler)。它允许开发者将项目中的资源(如 JavaScript、CSS、图片等)视为模块,通过分析和处理这些模块之间的依赖关系,将它们打包…

Python酷库之旅-第三方库Pandas(024)

目录 一、用法精讲 61、pandas.to_numeric函数 61-1、语法 61-2、参数 61-3、功能 61-4、返回值 61-5、说明 61-6、用法 61-6-1、数据准备 61-6-2、代码示例 61-6-3、结果输出 62、pandas.to_datetime函数 62-1、语法 62-2、参数 62-3、功能 62-4、返回值 62-…

ospf的MGRE实验

第一步:配IP [R1-GigabitEthernet0/0/0]ip address 12.0.0.1 24 [R1-GigabitEthernet0/0/1]ip address 21.0.0.1 24 [R1-LoopBack0]ip address 192.168.1.1 24 [ISP-GigabitEthernet0/0/0]ip address 12.0.0.2 24 [ISP-GigabitEthernet0/0/1]ip address 21.0.0.2 24…

Hadoop3:HDFS-存储优化之纠删码

一、集群环境 集群一共5个节点,102/103/104/105/106 二、纠删码原理 1、简介 HDFS默认情况下,一个文件有3个副本,这样提高了数据的可靠性,但也带来了2倍的冗余开销。Hadoop3.x引入了纠删码,采用计算的方式&#x…

新一代大语言模型 GPT-5 对工作与生活的影响及应对策略

文章目录 📒一、引言 📒二、GPT-5 的发展背景 🚀(一)GPT-4 的表现与特点 🚀(二)GPT-5 的预期进步 📒三、GPT-5 对工作的影响 🚀(一&#xf…

交叉编译ethtool(ubuntu 2018)

参考文章:https://www.cnblogs.com/nazhen/p/16800427.html https://blog.csdn.net/weixin_43128044/article/details/137953913 1、下载相关安装包 //ethtool依赖libmul git clone http://git.netfilter.org/libmnl //ethtool源码 git clone http://git.kernel.or…

OpenGL笔记十三之Uniform向量数据传输、使用glUniform3f和glUniform3fv

OpenGL笔记十三之Uniform向量数据传输、使用glUniform3f和glUniform3fv —— 2024-07-14 晚上 bilibili赵新政老师的教程看后笔记 code review! 文章目录 OpenGL笔记十三之Uniform向量数据传输、使用glUniform3f和glUniform3fv1.glUniform3f1.1.运行1.2.vs1.3.fs1.4.shader.…

科研绘图系列:R语言雨云图(Raincloud plot)

介绍 雨云图(Raincloud plot)是一种数据可视化工具,它结合了多种数据展示方式,旨在提供对数据集的全面了解。雨云图通常包括以下几个部分: 密度图(Density plot):表示数据的分布情况,密度图的曲线可以展示数据在不同数值区间的密度。箱线图(Box plot):显示数据的中…

.NET MAUI开源架构_1.学习资源分享

最近需要开发Android的App,想预研下使用.NET开源架构.NET MAUI来开发App程序。因此网上搜索了下相关资料,现在把我查询的结果记录下,方便后面学习。 1.官方文档 1.1MAUI官方学习网站 .NET Multi-Platform App UI 文档 - .NET MAUI | Micro…