网络字节流与嵌套字节流的区别
-
概念解释
- 网络嵌套字节流:
- 在网络编程的情境下,网络嵌套字节流通常是指将字节流(字节序列)以一种分层或者包含的方式进行组织,用于在网络传输过程中更好地处理数据。例如,在一个复杂的网络协议栈中,高层协议的数据单元(往往也是字节流形式)可以嵌套在底层协议的字节流之中。这就好比包裹的嵌套,外层包裹可能包含了内层包裹的相关信息以及内层包裹本身。以 HTTP 协议在 TCP/IP 协议之上传输为例,HTTP 消息(本身是字节流)被嵌套在 TCP 的字节流中进行传输。TCP 协议负责将 HTTP 消息切割成合适的片段(字节流形式),加上 TCP 头信息(也是字节流),然后通过网络发送。接收端的 TCP 协议先处理接收到的字节流,提取出 HTTP 消息的字节流部分,再交给上层的 HTTP 协议处理。
- 套字节流:
- 这个概念不是很常见,如果理解为 “包裹字节流” 的意思,和网络嵌套字节流有相似之处。不过,“套字节流” 可能更强调简单的封装形式,即将一个字节流作为另一个字节流的一部分进行简单包装。比如,在加密通信中,原始的字节流(如要传输的文件内容字节流)被加密算法处理后,会生成一个新的字节流,这个新字节流可以看作是原始字节流被 “套” 上了一层加密后的字节流。它可能没有像网络嵌套字节流那样涉及复杂的网络协议层次关系。
- 网络嵌套字节流:
-
应用场景区别
- 网络嵌套字节流:
- 广泛应用于网络通信的各个层次。在构建网络服务器和客户端应用时,不同层次的网络协议交互都涉及网络嵌套字节流。例如,在电子邮件传输(SMTP、POP3 等协议)中,邮件内容字节流被嵌套在相应的协议字节流中在网络上传输。它主要用于保证数据在不同网络环境和协议间的正确传递和解析,确保数据能够从源端的应用层通过层层协议封装,经过网络传输,最终在目的端的应用层被正确还原。
- 套字节流:
- 更多地用于数据安全和简单的数据封装场景。如在数字签名的应用中,消息的字节流被 “套” 上签名信息的字节流,用于验证消息的来源和完整性。或者在数据存储中,为了区分不同类型的数据,将数据字节流 “套” 上一个标识头字节流进行存储,方便后续读取和分类处理。
- 网络嵌套字节流:
-
处理方式区别
- 网络嵌套字节流:
- 需要严格按照网络协议栈的规则进行处理。在发送端,数据从高层协议开始,一层一层地进行字节流的嵌套和封装,添加每层协议所需的头部、尾部等信息。在接收端,则是相反的过程,从最外层的协议字节流开始,逐步解包和解析,根据每层协议的规范提取出内层协议的字节流,直到最终得到应用层的数据字节流。这需要对各种网络协议的格式、功能和交互流程有深入的了解。
- 套字节流:
- 处理相对简单,主要关注封装和提取两个操作。在封装时,根据具体的需求添加包裹字节流(如加密后的字节流添加到原始字节流外层)。在提取时,按照预先定义的规则(如加密算法对应的解密规则、数据标识头的解析规则等)去除外层字节流,获取内部的原始字节流或者所需的数据。
- 网络嵌套字节流:
PySpark代码开发
需要在ubuntu环境下或windows环境下,提前安装好spark执行环境
软件说明:
- spark 3.4.4
- python 3.9.20
- java jdk1.8.0_431
代码说明
DataSourceSoket.py 用于模拟生成实时字节流数据的脚本
# coding:utf8
import random
from socket import socketserver = socket()server.bind(('localhost', 9999))
server.listen(1)
while True:# 为了方便识别,输出一个"I'm waiting the connect ..."print("I'm waiting the connect ...")conn, addr = server.accept()print("Connected by {0}".format(addr))print(f"Connected by {addr}")# 输出发送数据# 自定义10条中文数据在一个数据容器里,并随机选取一条中文数据集输出# 步骤1:创建一个列表作为数据容器data_container = []# 步骤2:向列表中添加10条不同的中文数据chinese_data = ["你好,世界","今天天气真好","学习是一件快乐的事","分享知识,传递快乐","探索未知的世界","坚持就是胜利","努力不懈,梦想终会实现","失败乃成功之母","平凡造就非凡","相信自己,你是最棒的","I like Spark","I like Flink","I like Hadoop"]data_container.extend(chinese_data)# 步骤3:使用random.choice()随机选择并输出一条数据random_item = random.choice(data_container)print(random_item)conn.sendall(random_item.encode())conn.close()print("Connection closed")
pysparkStreamingNetwordCountCN.py SparkStreaming处理实时数据流
# coding:utf8from __future__ import print_functionimport os
import sys
import jieba
from pyspark import SparkContext
from pyspark.streaming import StreamingContext# 设置环境变量,确保指向正确的 Java 解释器
os.environ['JAVA_HOME'] = '/opt/HadoopEco/jdk1.8.0_431' # 替换为你的 JDK 8 安装路径
os.environ['SPARK_HOME'] = '/opt/HadoopEco/spark-3.4.4-bin-without-hadoop'# 加载停用词表
def load_stopwords(file_path):"""从指定文件或文件夹中加载停用词列表。参数:file_path (str): 停用词文件或文件夹的路径。返回:set: 包含停用词的集合。"""stopwords = set()try:if os.path.isfile(file_path):with open(file_path, 'r', encoding='utf-8') as f:stopwords.update(line.strip() for line in f)elif os.path.isdir(file_path):for filename in os.listdir(file_path):file_full_path = os.path.join(file_path, filename)if os.path.isfile(file_full_path):with open(file_full_path, 'r', encoding='utf-8') as f:stopwords.update(line.strip() for line in f)else:print(f"Error: The path {file_path} is neither a file nor a directory.")except FileNotFoundError:print(f"Error: The file or directory {file_path} does not exist.")except PermissionError:print(f"Error: Permission denied for the file or directory {file_path}.")except Exception as e:print(f"An unexpected error occurred: {e}")return stopwords# 替换为你的停用词表路径或文件夹路径
stopwords = load_stopwords(sys.argv[3]) # 或 'path/to/stopwords_folder'def sparkstreamingnetworkcount():global sc, ssc, linessc = SparkContext(appName="PythonStreamingNetworkWordCount")ssc = StreamingContext(sc, 10)lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))def split_words(line):try:# 使用 jieba 进行中文分词chinese_words = jieba.lcut(line.strip())# 使用空格进行英文分词english_words = line.strip().split(" ")# 合并分词结果并过滤掉空字符串words = set(chinese_words + english_words) - {''}# 过滤掉停用词filtered_words = [word.lower() for word in words if word not in stopwords]return filtered_wordsexcept Exception as e:print(f"Error processing line: {line}, Error: {e}", file=sys.stderr)return []counts = lines.flatMap(split_words).map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b)counts.pprint()ssc.start()ssc.awaitTermination()if __name__ == "__main__":if len(sys.argv) != 4:print("Usage: networkcount.py <hostname> <port> <stopwords>", file=sys.stderr)exit(-1)sparkstreamingnetworkcount()
运行时的运行参数配置
运行结果如下
DataSourceSoket.py
pysparkStreamingNetwordCountCN.py 运行结果
注意事项:
1. 需要先启动 DataSourceSocket.py, 在启动 pysparkStreamingNetwordCountCN.py