Java技术栈 —— Spark入门(三)之实时视频流

Java技术栈 —— Spark入门(三)之实时视频流转灰度图像

  • 一、将摄像头数据发送至kafka
  • 二、Kafka准备topic
  • 三、spark读取kafka图像数据并处理
  • 四、本地显示灰度图像(存在卡顿现象,待优化)

项目整体结构图如下

在这里插入图片描述

参考文章或视频链接
[1] Architecture-for-real-time-video-streaming-analytics

一、将摄像头数据发送至kafka

这个代码将运行在你有摄像头的机器上,缺依赖就装依赖

import cv2
import kafka
import numpy as np# 设置 Kafka Producer
# 注意修改你的kafka地址
producer = kafka.KafkaProducer(bootstrap_servers='localhost:9092')# 打开摄像头(0 为默认摄像头)
cap = cv2.VideoCapture(0)while True:# 从摄像头捕获帧ret, frame = cap.read()if not ret:break# 将图像编码为 JPEG 格式_, buffer = cv2.imencode('.jpg', frame)# 将图像作为字节数组发送到 Kafkaproducer.send('camera-images', buffer.tobytes())# 显示当前捕获的帧cv2.imshow('Video', frame)# 按 'q' 键退出if cv2.waitKey(1) & 0xFF == ord('q'):break# 释放资源
cap.release()
cv2.destroyAllWindows()
producer.close()

二、Kafka准备topic

在准备topic之前,要先配置kafka中的config/server.properties文件,否则其它机器无法联通kafka,配置好后重启kafka。

# 找到这两个选项并修改成如下内容
listeners=PLAINTEXT://0.0.0.0:9092
# 改成你的kafka所在服务器ip
advertised.listeners=PLAINTEXT://{your_ip}:9092

如果你之前创建过topic,那就清空这些topic中的数据

# 设置保留时间为0,相当于立即清空数据
#bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name {your_topic_name} --add-config retention.ms=0
# 恢复原始保留设置,立即清空数据后,将数据的保留时间恢复至原有状态
#bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name {your_topic_name} --add-config retention.ms=604800000

开始正式创建topic

# 创建输入图片所在topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic camera-images --partitions 1 --replication-factor 1
# 创建输出的gray灰度图片所在topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic result-gray-images --partitions 1 --replication-factor 1# 准备好后查看下topic list进行验证
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
# 查看某topic中的数据
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic {your_topic_name} --from-beginning

三、spark读取kafka图像数据并处理

首先给你的spark脚本所运行的python环境(这个环境一般可以为conda等虚拟环境),安装必要的依赖库

pip install opencv-python-headless

准备脚本文件

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import BinaryType
import cv2
import numpy as npbootstrapServers = "localhost:9092"# 创建 SparkSession
spark = SparkSession.builder \.appName("Kafka-Spark-OpenCV") \.getOrCreate()# 初始化 Kafka Producer,用于发送处理后的图像
# 如果不这样做,会出现PicklingError,因为如果UDF中,包含了无法被序列化的对象,例如线程锁(_thread.RLock)或 Kafka 的 KafkaProducer 实例,序列化就会失败。
# 因此,在每个执行器内部,创建 KafkaProducer 实例
producer = None# 从 Kafka 读取数据流
df = spark \.readStream \.format("kafka") \.option("kafka.bootstrap.servers", "localhost:9092") \.option("subscribe", "camera-images") \.load()# UDF 用于将图像转换为灰度
def convert_to_gray(image_bytes):global producer# 创建 KafkaProducer 实例(在每个执行器上只初始化一次)if producer is None:producer = KafkaProducer(bootstrap_servers = bootstrapServers)# 将字节数组转换为 numpy 数组nparr = np.frombuffer(image_bytes, np.uint8)# 将 numpy 数组解码为图像img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)# 将图像转换为灰度gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)# 将灰度图像编码为 JPEG_, buffer = cv2.imencode('.jpg', gray)# 将处理后的图像发送到 Kafka 'result-gray-images' 主题producer.send('result-gray-images', buffer.tobytes())return buffer.tobytes()# 注册 UDF
convert_to_gray_udf = udf(convert_to_gray, BinaryType())# 应用 UDF 对数据进行灰度化处理
gray_df = df.withColumn("gray_image", convert_to_gray_udf("value"))# 将处理后的数据写入文件或其他输出
query = gray_df.writeStream \.outputMode("append") \.format("console") \.start()# query = gray_df\
#     .writeStream \
#     .format('kafka') \
#     .outputMode('update') \
#     .option("kafka.bootstrap.servers", bootstrapServers) \
#     .option('checkpointLocation', '/spark/job-checkpoint') \
#     .option("topic", "result-gray-images") \
#     .start()query.awaitTermination()

spark-submit提交脚本文件:

# 1.提高内存
# 2.调整 Kafka 批次大小,减少单个批次的数据量,从而降低内存使用(这个步骤存疑)
/opt/spark-3.5.2-bin-hadoop3/bin/spark-submit \
--executor-memory 4g \
--driver-memory 4g \
--conf "spark.kafka.maxOffsetsPerTrigger=1000" \
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.2,org.apache.kafka:kafka-clients:3.5.2 \
/opt/spark-3.5.2-bin-hadoop3/jobs/pyjobs/kafka_to_spark.py

四、本地显示灰度图像(存在卡顿现象,待优化)

import cv2
import numpy as np
from kafka import KafkaConsumer# 设置 Kafka Consumer
consumer = KafkaConsumer('result-gray-images',bootstrap_servers='{your_kafka_ip}:9092',auto_offset_reset='latest',enable_auto_commit=True,# group_id='image-display-group'
)# 从 Kafka 主题读取灰度图像并显示
for message in consumer:# print("reading gray image.... ")# 将消息转换为 numpy 数组nparr = np.frombuffer(message.value, np.uint8)# 解码为图像gray_img = cv2.imdecode(nparr, cv2.IMREAD_GRAYSCALE)# 显示灰度图像cv2.imshow('Gray Video', gray_img)if cv2.waitKey(1) & 0xFF == ord('q'):break# 释放资源
cv2.destroyAllWindows()
consumer.close()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/412986.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

破解“目录名称无效”难题:数据恢复实战指南

在数字化生活日益普及的今天,数据存储与管理成为了我们日常不可或缺的一部分。然而,当您尝试访问某个文件夹时,却遇到了“目录名称无效”的错误提示,这无疑会让人感到焦虑和困惑。本文将深入探讨“目录名称无效”这一问题的根源&a…

Excel中使用VBS自定义函数将中文转为拼音首字母

1、在“开发工具”中&#xff0c;点击“Visual Basic”。如果没有“开发工具”&#xff0c;则添加。 2、添加“模块”&#xff0c;在窗口中添加自定义函数。 Function MyGetPYChar(char) MyCodeNumber 65536 Asc(char) If (MyCodeNumber > 45217 And MyCodeNumber <…

2d椭圆拟合学习

算法来自论文《 Direct Least Square Fitting of Ellipses》 《NUMERICALLY STABLE DIRECT LEAST SQUARES FITTING OF ELLIPSES》 相关文章 论文阅读&#xff1a;直接拟合椭圆 Direct Least Square Fitting of Ellipseshttps://zhuanlan.zhihu.com/p/645391510Fitting Elli…

线段树离散化、二分搜索、特别修改

699. 掉落的方块 - 力扣&#xff08;LeetCode&#xff09; 1.如果直接按照原落点的值构造线段树&#xff0c;空间开辟会过大&#xff0c;所以收集所有出现过的点进行离散化 2.方块a落在1--3点&#xff0c;b落在3--4点&#xff0c;如果直接按照落点修改&#xff0c;查询3时位置…

基于Docker搭建Graylog分布式日志采集系统

文章目录 一、简介二、Graylog1、主要特点2、组件3、工作流程介绍4、使用场景 三、Graylog 安装部署1、 安装 docker2、安装docker compose3、 安装graylog4、Graylog控制台 四、springboot集成Graylog 一、简介 Graylog是一个开源的日志管理工具&#xff0c;主要功能包括日志…

go 切片slice学习总结

切片的结构 切片的底层结构&#xff1a; type SliceHeader struct {Data uintptr // 指向底层数组的指针 Len int //长度Cap int //空间容量 } 切片的初始化 1 通过数组或者已有的slice创建新的slice 1.1 使用数组创建切片 通过数组的一部分来初始化切片。 …

数据结构-c/c++实现栈(详解,栈容量可以动态增长)

一.栈的基本介绍 栈是一种只能够在一端进行插入和删除的顺序表。如下图 空栈&#xff1a;表示不含任何元素的栈 栈顶&#xff1a;表示允许进行插入和删除元素的一端 栈底&#xff1a;表示不允许进行插入和删除元素的一端 即栈是一种后进先出的线性表数据结构 二.栈的常见操…

为什么我的手机卡需要快递员给激活?这到底安全吗?

网友咨询&#xff1a;网上申请了一张新卡&#xff0c;本来想着自己激活&#xff0c;没想到快递员先打电话过来说&#xff0c;要身份证给帮助激活&#xff0c;所以我想问一下&#xff0c;网上申请的卡是不是都是快递给激活呢&#xff1f;安不安全呢&#xff1f; 首先要说一下&a…

第4章-08-用Python Requests库模拟浏览器访问接口

🏆作者简介,黑夜开发者,CSDN领军人物,全栈领域优质创作者✌,CSDN博客专家,阿里云社区专家博主,2023年CSDN全站百大博主。 🏆数年电商行业从业经验,历任核心研发工程师,项目技术负责人。 🏆本文已收录于专栏:Web爬虫入门与实战精讲,后续完整更新内容如下。 文章…

CSRF漏洞的预防

目录 CSRF漏洞预防措施 深入研究 CSRF Token的工作原理是什么&#xff1f; 为什么仅依靠Referer头字段来防范CSRF攻击不是完全可靠&#xff1f; SameSite cookie属性如何防止CSRF攻击&#xff1f; SameSite Cookie属性的作用 如何通过SameSite属性防止CSRF攻击 导图 CS…

Eclipse 自定义字体大小

常用编程软件自定义字体大全首页 文章目录 前言具体操作1. 打开设置对话框2. 打开字体设置页面3. 找到Text Font&#xff0c;点击修改4. 修改字体 前言 Eclipse 自定义字体大小&#xff0c;统一设置为 Courier New &#xff0c;大小为 三号 具体操作 【Windows】>【Perfer…

Qt第二课----信号和槽

作者前言 &#x1f382; ✨✨✨✨✨✨&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f382; ​&#x1f382; 作者介绍&#xff1a; &#x1f382;&#x1f382; &#x1f382; &#x1f389;&#x1f389;&#x1f389…

C#笔记4 详细解释事件及其原型、匿名方法和委托的关系

匿名方法 定义 匿名方法允许一个与委托关联的代码被内联的写入使用委托的位置。 语法形式 delegate(参数列表) {代码块 } 前文说过&#xff0c;委托是定义了一个公司&#xff0c;公司专门承接某一类型的任务。 委托的实例化就是公司把任务交给了具体的职员&#xff08;方…

掌握测试的艺术:深入探索Python的pytest库

文章目录 **掌握测试的艺术&#xff1a;深入探索Python的pytest库**背景&#xff1a;为什么选择pytest&#xff1f;pytest是什么&#xff1f;如何安装pytest&#xff1f;5个简单的库函数使用方法1. pytest.main()2. pytest.skip()3. pytest.mark.parametrize()4. pytest.raises…

基于物联网的低成本便携式传感器节点用于火灾和空气污染的检测与报警

目录 摘要 引言 材料和方法 传感器节点 IoT 微控制器 颗粒物传感器 环境和气体传感器 MQTT代理 Node-Red监控平台 系统结构 数据存储 工作描述 实验结果 讨论 结论 致谢 参考文献 这篇论文的标题是《Low-cost IoT-based Portable Sensor Node for Fire and Air…

STM32G474之TIM1捕获1模式

STM32G474采用TIM8产生方波信号&#xff0c;使用TIM1工作于捕获1模式&#xff0c;并计算方波频率。捕获方波周期&#xff0c;在有些开发中&#xff0c;还是能用到。建议开发时使用HAL库自带的库函数。使用寄存器方法也可以实现&#xff0c;但是后期修改不太方便。 测试时&…

利用 Web 浏览器构建 Java Media Player

如果您需要在 Java 桌面应用程序中嵌入媒体播放器&#xff0c;有几种方法可供选择&#xff1a; 您可以使用 JavaFX Media API 来实现所有必需的媒体播放器功能。虽然稍显过时但仍然可用的 Java Media Framework 也可以作为一种解决方案。您可以集成像 VLCJ 这样的第三方 Java …

如何选择适合企业的财税自动化解决方案

财税自动化解决方案是现代企业提升财务管理效率、降低运营成本的关键工具。然而&#xff0c;市场上的财税自动化产品琳琅满目&#xff0c;功能各异&#xff0c;企业在选择时常常感到困惑。本文金智维将从中小型的需求出发&#xff0c;帮助企业了解如何选择适合自身的财税自动化…

QT实战项目之音乐播放器

项目效果演示 myMusicShow 项目概述 在本QT音乐播放器实战项目中,开发环境使用的是QT Creator5.14版本。该项目实现了音乐播放器的基本功能,例如开始播放、停止播放、下一首播放、上一首播放、调节音量、调节倍速、设置音乐播放模式等。同时还具备搜索功能,通过搜索歌曲名字…

另一种关于类的小例

前言 我们还是以一段关于构造函数的代码作为开端&#xff0c;我们以之前银行家的小项目为背景 class Account {constructor(owner, currency, pin) {this.owner owner;this.currency currency;this.pin pin;} }const ITshare new Account(ITshare, EUR, 21211); console.…