PySpark3.4.4_基于StreamingContext实现网络字节流统计分析

网络字节流与嵌套字节流的区别

  1. 概念解释

    • 网络嵌套字节流
      • 在网络编程的情境下,网络嵌套字节流通常是指将字节流(字节序列)以一种分层或者包含的方式进行组织,用于在网络传输过程中更好地处理数据。例如,在一个复杂的网络协议栈中,高层协议的数据单元(往往也是字节流形式)可以嵌套在底层协议的字节流之中。这就好比包裹的嵌套,外层包裹可能包含了内层包裹的相关信息以及内层包裹本身。以 HTTP 协议在 TCP/IP 协议之上传输为例,HTTP 消息(本身是字节流)被嵌套在 TCP 的字节流中进行传输。TCP 协议负责将 HTTP 消息切割成合适的片段(字节流形式),加上 TCP 头信息(也是字节流),然后通过网络发送。接收端的 TCP 协议先处理接收到的字节流,提取出 HTTP 消息的字节流部分,再交给上层的 HTTP 协议处理。
    • 套字节流
      • 这个概念不是很常见,如果理解为 “包裹字节流” 的意思,和网络嵌套字节流有相似之处。不过,“套字节流” 可能更强调简单的封装形式,即将一个字节流作为另一个字节流的一部分进行简单包装。比如,在加密通信中,原始的字节流(如要传输的文件内容字节流)被加密算法处理后,会生成一个新的字节流,这个新字节流可以看作是原始字节流被 “套” 上了一层加密后的字节流。它可能没有像网络嵌套字节流那样涉及复杂的网络协议层次关系。
  2. 应用场景区别

    • 网络嵌套字节流
      • 广泛应用于网络通信的各个层次。在构建网络服务器和客户端应用时,不同层次的网络协议交互都涉及网络嵌套字节流。例如,在电子邮件传输(SMTP、POP3 等协议)中,邮件内容字节流被嵌套在相应的协议字节流中在网络上传输。它主要用于保证数据在不同网络环境和协议间的正确传递和解析,确保数据能够从源端的应用层通过层层协议封装,经过网络传输,最终在目的端的应用层被正确还原。
    • 套字节流
      • 更多地用于数据安全和简单的数据封装场景。如在数字签名的应用中,消息的字节流被 “套” 上签名信息的字节流,用于验证消息的来源和完整性。或者在数据存储中,为了区分不同类型的数据,将数据字节流 “套” 上一个标识头字节流进行存储,方便后续读取和分类处理。
  3. 处理方式区别

    • 网络嵌套字节流
      • 需要严格按照网络协议栈的规则进行处理。在发送端,数据从高层协议开始,一层一层地进行字节流的嵌套和封装,添加每层协议所需的头部、尾部等信息。在接收端,则是相反的过程,从最外层的协议字节流开始,逐步解包和解析,根据每层协议的规范提取出内层协议的字节流,直到最终得到应用层的数据字节流。这需要对各种网络协议的格式、功能和交互流程有深入的了解。
    • 套字节流
      • 处理相对简单,主要关注封装和提取两个操作。在封装时,根据具体的需求添加包裹字节流(如加密后的字节流添加到原始字节流外层)。在提取时,按照预先定义的规则(如加密算法对应的解密规则、数据标识头的解析规则等)去除外层字节流,获取内部的原始字节流或者所需的数据。

PySpark代码开发

需要在ubuntu环境下或windows环境下,提前安装好spark执行环境

软件说明:

  1. spark 3.4.4
  2. python 3.9.20
  3. java jdk1.8.0_431

代码说明

DataSourceSoket.py 用于模拟生成实时字节流数据的脚本

# coding:utf8
import random
from socket import socketserver = socket()server.bind(('localhost', 9999))
server.listen(1)
while True:# 为了方便识别,输出一个"I'm waiting the connect ..."print("I'm waiting the connect ...")conn, addr = server.accept()print("Connected by {0}".format(addr))print(f"Connected by {addr}")# 输出发送数据# 自定义10条中文数据在一个数据容器里,并随机选取一条中文数据集输出# 步骤1:创建一个列表作为数据容器data_container = []# 步骤2:向列表中添加10条不同的中文数据chinese_data = ["你好,世界","今天天气真好","学习是一件快乐的事","分享知识,传递快乐","探索未知的世界","坚持就是胜利","努力不懈,梦想终会实现","失败乃成功之母","平凡造就非凡","相信自己,你是最棒的","I like Spark","I like Flink","I like Hadoop"]data_container.extend(chinese_data)# 步骤3:使用random.choice()随机选择并输出一条数据random_item = random.choice(data_container)print(random_item)conn.sendall(random_item.encode())conn.close()print("Connection closed")

pysparkStreamingNetwordCountCN.py  SparkStreaming处理实时数据流

# coding:utf8from __future__ import print_functionimport os
import sys
import jieba
from pyspark import SparkContext
from pyspark.streaming import StreamingContext# 设置环境变量,确保指向正确的 Java 解释器
os.environ['JAVA_HOME'] = '/opt/HadoopEco/jdk1.8.0_431'  # 替换为你的 JDK 8 安装路径
os.environ['SPARK_HOME'] = '/opt/HadoopEco/spark-3.4.4-bin-without-hadoop'# 加载停用词表
def load_stopwords(file_path):"""从指定文件或文件夹中加载停用词列表。参数:file_path (str): 停用词文件或文件夹的路径。返回:set: 包含停用词的集合。"""stopwords = set()try:if os.path.isfile(file_path):with open(file_path, 'r', encoding='utf-8') as f:stopwords.update(line.strip() for line in f)elif os.path.isdir(file_path):for filename in os.listdir(file_path):file_full_path = os.path.join(file_path, filename)if os.path.isfile(file_full_path):with open(file_full_path, 'r', encoding='utf-8') as f:stopwords.update(line.strip() for line in f)else:print(f"Error: The path {file_path} is neither a file nor a directory.")except FileNotFoundError:print(f"Error: The file or directory {file_path} does not exist.")except PermissionError:print(f"Error: Permission denied for the file or directory {file_path}.")except Exception as e:print(f"An unexpected error occurred: {e}")return stopwords# 替换为你的停用词表路径或文件夹路径
stopwords = load_stopwords(sys.argv[3])  # 或 'path/to/stopwords_folder'def sparkstreamingnetworkcount():global sc, ssc, linessc = SparkContext(appName="PythonStreamingNetworkWordCount")ssc = StreamingContext(sc, 10)lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))def split_words(line):try:# 使用 jieba 进行中文分词chinese_words = jieba.lcut(line.strip())# 使用空格进行英文分词english_words = line.strip().split(" ")# 合并分词结果并过滤掉空字符串words = set(chinese_words + english_words) - {''}# 过滤掉停用词filtered_words = [word.lower() for word in words if word not in stopwords]return filtered_wordsexcept Exception as e:print(f"Error processing line: {line}, Error: {e}", file=sys.stderr)return []counts = lines.flatMap(split_words).map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b)counts.pprint()ssc.start()ssc.awaitTermination()if __name__ == "__main__":if len(sys.argv) != 4:print("Usage: networkcount.py <hostname> <port> <stopwords>", file=sys.stderr)exit(-1)sparkstreamingnetworkcount()

运行时的运行参数配置

运行结果如下

DataSourceSoket.py

pysparkStreamingNetwordCountCN.py 运行结果

注意事项:

1. 需要先启动 DataSourceSocket.py, 在启动 pysparkStreamingNetwordCountCN.py

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/487286.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【JS】简单CSS简单JS写的上传进度条

纯JS写的&#xff0c;简单的上传进度条&#xff0c;当上传的文件较大&#xff0c;加一个动态画面&#xff0c;就不会让人觉得出错了或网络卡了 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"v…

47 基于单片机的书库环境监测

目录 一、主要功能 二、硬件资源 三、程序编程 四、实现现象 一、主要功能 基于51单片机&#xff0c;采用DHT11湿度传感器检测湿度&#xff0c;DS18B20温度传感器检测温度&#xff0c; 采用滑动变阻器连接数模转换器模拟二氧化碳和氧气浓度检测&#xff0c;各项数值通过lc…

解决:IDEA中@Autowired自动注入MyBatis Mapper报红警告的几种解决方法

文章目录 解决&#xff1a;IDEA中Autowired自动注入MyBatis Mapper报红警告的几种解决方法问题描述&#xff1a;解决办法&#xff1a;1.将Autowired注解改成Resource2.给Autowired(required false)设置属性3.给Mapper层加注解Mapper/Repository4.改变写法,用RequiredArgsConst…

Spring Boot中实现JPA多数据源配置指南

本文还有配套的精品资源&#xff0c;点击获取 简介&#xff1a;本文详细介绍了在Spring Boot项目中配置和使用JPA进行多数据源管理的步骤。从引入依赖开始&#xff0c;到配置数据源、创建DataSource bean、定义实体和Repository&#xff0c;最后到配置事务管理器和使用多数据…

Ubuntu 安装 web 服务器

安装 apach sudo apt install apache2 -y 查看 apach2 版本号 apache2 -v 检查是否启动服务器 sudo service apache2 status 检查可用的 ufw 防火墙应用程序配置 sudo ufw app list 关闭防火墙 sudo ufw disable 更改允许通过端口流量 sudo ufw allow Apache Full 开启…

go语言的成神之路-标准库篇-fmt标准库

目录 一、三种类型的输出 print&#xff1a; println&#xff1a; printf&#xff1a; 总结&#xff1a; 代码展示&#xff1a; 二、格式化占位符 %s&#xff1a;用于格式化字符串。 %d&#xff1a;用于格式化整数。 %f&#xff1a;用于格式化浮点数。 %v&#xff1…

【Linux操作系统】Linux常用一键脚本

Linux网络加速脚本 Linux网络加速脚本可以替换Linux内核和更改TCP拥塞算法的一键脚本&#xff0c;包括安装BBR内核、XANMOD官方内核&#xff0c;开启BBR加速等功能&#xff0c;总之非常强大。 不卸载内核脚本&#xff08;一般用这个&#xff09; wget -O tcpx.sh "http…

【全攻略】React Native与环信UIKit:Expo项目从创建到云打包完整指南

前言 在当今快速发展的移动应用领域&#xff0c;React Native 因其跨平台开发能力和高效的开发周期而受到开发者的青睐。而 Expo&#xff0c;作为一个基于 React Native 的框架&#xff0c;进一步简化了开发流程&#xff0c;提供了一套完整的工具链&#xff0c;使得开发者能够…

乌龟咬人,小意外中的大警示

近日&#xff0c;听闻有朋友被自家乌龟咬了手指&#xff0c;这看似滑稽的小意外&#xff0c;实则蕴含着不少值得我们深思的安全与责任问题。 乌龟&#xff0c;在大众的认知里&#xff0c;向来是行动迟缓、性情温和的宠物代表。它们慢悠悠地爬行&#xff0c;憨态可掬的模样常常…

java+springboot+mysql论文分享平台

项目介绍&#xff1a; 使用javaspringbootmysql开发的论文分享平台&#xff0c;系统包含超级管理员、管理员、用户角色&#xff0c;功能如下&#xff1a; 用户&#xff1a;系统前台首页&#xff1b;论文分类&#xff1b;论文共享&#xff08;用户可以上传、下载、评论论文文档…

《探索形象克隆:科技与未来的奇妙融合》

目录 一、什么是形象克隆 二、形象克隆的技术原理 三、形象克隆的发展现状 四、形象克隆的未来趋势 五、形象克隆的应用场景 六、形象克隆简单代码案例 Python 实现数字人形象克隆 Scratch 实现角色克隆效果&#xff08;以猫为例&#xff09; JavaScript 实现 Scratc…

帝可得-运营管理App

运营管理App Android模拟器 本项目的App客户端部分已经由前端团队进行开发完成&#xff0c;并且以apk的方式提供出来&#xff0c;供我们测试使用&#xff0c;如果要运行apk&#xff0c;需要先安装安卓的模拟器。 可以选择国内的安卓模拟器产品&#xff0c;比如&#xff1a;网…

案例-商品列表(组件封装)

标签组件封装 1.双击显示&#xff0c;自动聚焦 2.失去焦点&#xff0c;隐藏输入框 标签一列&#xff0c;不同行的标签内容不同&#xff0c;但是除此之外其他基本一致&#xff0c;所以选择用 标签组件 将这一部分封装为一个组件&#xff0c;需要时组件标签展示。 首先标签处一进…

Linux socket编程

目录 基础概念端口和端口号Socket&#xff08;套接字&#xff09;UDP和TCP的概念 Socket编程实战socket的类型struct sockaddrstruct sockaddr_in网络字节序socket APITCP网络编程流程分析UDP Demo样例 other概念补充setsockopt函数心跳机制面向字节流与面向报文的理解 参考 基…

Linux update-alternatives 命令详解

1、查看所有候选项 sudo update-alternatives --list &#xff08;java筛选​​​​​​​sudo update-alternatives --list java&#xff09; 2、​​​​​​​更换候选项 sudo update-alternatives --config java 3、自动选择优先级最高的作为默认项 sudo update-alterna…

unity3d—demo(2d人物左右移动发射子弹)

目录 人物代码示例&#xff1a; 子弹代码示例&#xff1a; 总结上面代码&#xff1a; 注意点&#xff1a; 人物代码示例&#xff1a; using System.Collections; using System.Collections.Generic; using UnityEngine;public class PlayerTiao : MonoBehaviour {public f…

JSP技术发展现状

多年前&#xff0c;Java入门时学习的JSP可谓时风光无限&#xff0c;J2EE如日中天&#xff0c;短短数年&#xff0c;技术迭代更新光速般发展&#xff0c;有些技术慢慢就退出历史舞台。 JSP&#xff08;Java Server Pages&#xff09; 技术在早期 Java Web 开发中曾是构建动态网…

科技绽放-EtherCAT转Profinet网关智能连接项目

一、项目名称 备选名称及含义&#xff1a;开疆智能EtherCAT转Profinet网关智能连接项目&#xff1a;直接体现了从Profinet到EtherCAT的连接核心内容&#xff0c;智能连接突出了该项目的技术特点。工业互联方案强调了该项目在工业领域实现不同协议设备互联的目标&#xff0c;方案…

《计算机网络》(408大题)

2009 路由转发和静态路由的计算 子网划分、路由聚合的计算 注&#xff1a;CIDR中的子网号可以全为0或1&#xff0c;但是其主机号不允许。 注&#xff1a; 这里其实是把到互联网的路由当做了一个默认路由&#xff08;当一个目的网络地址与路由表中其他都不匹配时&#xff0c;…

matlab读取NetCDF文件

matlab对NetCDF文件进行信息获取和读取数据 文章目录 前言一、什么是NetCDF文件二、读取NetCDF文件数据 1.引入库 2.读入数据总结 前言 在气象学中&#xff0c;许多气象数据存储在NetCDF文件中&#xff0c;后缀为.nc&#xff0c;通常可以用NCL、python和MATLAB等对该…