Structured Streaming

目录

一、概述

(一)基本概念

(二)两种处理模型

(三)Structured Streaming和Spark SQL、Spark Streaming关系

二、编写Structured Streaming程序的基本步骤

(一)实现步骤

(二)运行测试

三、输入源

(一)File源

(二)Kafka源

(三)Socket源

(四)Rate源

四、输出操作

(一)启动流计算

(二)输出模式

(三)输出接收器


一、概述

        提供端到端的完全一致性是设计Structured Streaming 的关键目标之一,为了实现这一点,Spark设计了输入源、执行引擎和接收器,以便对处理的进度进行更可靠的跟踪,使之可以通过重启或重新处理,来处理任何类型的故障。如果所使用的源具有偏移量来跟踪流的读取位置,那么,引擎可以使用检查点和预写日志,来记录每个触发时期正在处理的数据的偏移范围;此外,如果使用的接收器是“幂等”的,那么通过使用重放、对“幂等”接收数据进行覆盖等操作,Structured Streaming可以确保在任何故障下达到端到端的完全一致性。
        Spark一直处于不停的更新中,从Spark 2.3.0版本开始引入持续流式处理模型后,可以将原先流处理的延迟降低到毫秒级别。

(一)基本概念

        Structured Streaming的关键思想是将实时数据流视为一张正在不断添加数据的表。可以把流计算等同于在一个静态表上的批处理查询,Spark会在不断添加数据的无界输入表上运行计算,并进行增量查询。

        在无界表上对输入的查询将生成结果表,系统每隔一定的周期会触发对无界表的计算并更新结果表。如图Structured Streaming编程模型。

(二)两种处理模型

1、微批处理

        Structured Streaming默认使用微批处理执行模型,这意味着Spark流计算引擎会定期检查流数据源,并对自上一批次结束后到达的新数据执行批量查询。数据到达和得到处理并输出结果之间的延时超过100毫秒。

2、持续处理模型

        Spark从2.3.0版本开始引入了持续处理的试验性功能,可以实现流计算的毫秒级延迟。在持续处理模式下,Spark不再根据触发器来周期性启动任务,而是启动一系列的连续读取、处理和写入结果的长时间运行的任务。

(三)Structured Streaming和Spark SQL、Spark Streaming关系

        Structured Streaming处理的数据跟Spark Streaming一样,也是源源不断的数据流,区别在于,Spark Streaming采用的数据抽象是DStream(本质上就是一系列RDD),而Structured Streaming采用的数据抽象是DataFrame。

        Structured Streaming可以使用Spark SQL的DataFrame/Dataset来处理数据流。虽然Spark SQL也是采用DataFrame作为数据抽象,但是,Spark SQL只能处理静态的数据,而Structured Streaming可以处理结构化的数据流。这样,Structured Streaming就将Spark SQL和Spark Streaming二者的特性结合了起来。

        Structured Streaming可以对DataFrame/Dataset应用前面章节提到的各种操作,包括select、where、groupBy、map、filter、flatMap等。

        Spark Streaming只能实现秒级的实时响应,而Structured Streaming由于采用了全新的设计方式,采用微批处理模型时可以实现100毫秒级别的实时响应,采用持续处理模型时可以支持毫秒级的实时响应。

二、编写Structured Streaming程序的基本步骤

编写Structured Streaming程序的基本步骤包括:
(1)导入pyspark模块
(2)创建SparkSession对象
(3)创建输入数据源
(4)定义流计算过程
(5)启动流计算并输出结果

        实例任务:一个包含很多行英文语句的数据流源源不断到达,Structured Streaming程序对每行英文语句进行拆分,并统计每个单词出现的频率。

(一)实现步骤

1、步骤一:导入pyspark模块

        导入PySpark模块,代码如下:

from pyspark.sql import SparkSession 
from pyspark.sql.functions import split 
from pyspark.sql.functions import explode

        由于程序中需要用到拆分字符串和展开数组内的所有单词的功能,所以引用了来自pyspark.sql.functions里面的split和explode函数。

2、步骤二:创建SparkSession对象

        创建一个SparkSession对象,代码如下:

if __name__ == "__main__":spark = SparkSession \.builder \.appName("StructuredNetworkWordCount") \.getOrCreate()spark.sparkContext.setLogLevel('WARN')

3、步骤三:创建输入数据源

        创建一个输入数据源,从“监听在本机(localhost)的9999端口上的服务”那里接收文本数据,具体语句如下:    

    lines = spark \.readStream \.format("socket") \.option("host", "localhost") \.option("port", 9999) \.load()

4、步骤四:定义流计算过程

        有了输入数据源以后,接着需要定义相关的查询语句,具体如下:    

    words = lines.select(explode(split(lines.value, " ")).alias("word"))wordCounts = words.groupBy("word").count()

5、步骤五:启动流计算并输出结果

        定义完查询语句后,下面就可以开始真正执行流计算,具体语句如下:

    query = wordCounts \.writeStream \.outputMode("complete") \.format("console") \.trigger(processingTime="8 seconds") \.start()query.awaitTermination()

(二)运行测试

        把上述五步的代码写入文件StructuredNetworkWordCount.py。在执行StructuredNetworkWordCount.py之前,需要启动HDFS。启动HDFS的命令如下:

start-dfs.sh

        新建一个终端(记作“数据源终端”),输入如下命令:

nc -lk 9999

        再新建一个终端(记作“流计算终端”),执行如下命令:

cd /usr/local/mycode/structuredstreaming/
spark-submit StructuredNetworkWordCount.py

        为了模拟文本数据流,可以在“数据源终端”内用键盘不断敲入一行行英文语句,nc程序会把这些数据发送给StructuredNetworkWordCount.py程序进行处理,比如输入如下数据:

apache spark
apache hadoop

        则在“流计算终端”窗口内会输出类似以下的结果信息:

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
|  word|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
|  word|count|
+------+-----+
|apache|    2|
| spark|    1|
|hadoop|    1|
+------+-----+

三、输入源

(一)File源

        File源(或称为“文件源”)以文件流的形式读取某个目录中的文件,支持的文件格式为csv、json、orc、parquet、text等。需要注意的是,文件放置到给定目录的操作应当是原子性的,即不能长时间在给定目录内打开文件写入内容,而是应当采取大部分操作系统都支持的、通过写入到临时文件后移动文件到给定目录的方式来完成。

        File源的选项(option)主要包括如下几个。
(1)path:输入路径的目录,所有文件格式通用。path支持glob通配符路径,但是目录或glob通配符路径的格式不支持以多个逗号分隔的形式。
(2)maxFilesPerTrigger:每个触发器中要处理的最大新文件数(默认无最大值)。
(3)latestFirst:是否优先处理最新的文件,当有大量文件积压时,设置为True可以优先处理新文件,默认为False。
(4)fileNameOnly:是否仅根据文件名而不是完整路径来检査新文件,默认为False。如果设置
为True,则以下文件将被视为相同的文件,因为它们的文件名"dataset.txt"相同:

        这里以一个JSON格式文件的处理来演示File源的使用方法,主要包括以下两个步骤:

(1)创建程序生成JSON格式的File源测试数据

(2)创建程序对数据进行统计

1、创建程序生成JSON格式的File源测试数据 

        为了演示JSON格式文件的处理,这里随机生成一些JSON格式的文件来进行测试。代码文件spark_ss_filesource_generate.py内容如下:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-# 导入需要用到的模块
import os
import shutil
import random
import timeTEST_DATA_TEMP_DIR = '/tmp/'
TEST_DATA_DIR = '/tmp/testdata/'ACTION_DEF = ['login', 'logout', 'purchase']
DISTRICT_DEF = ['fujian', 'beijing', 'shanghai', 'guangzhou']
JSON_LINE_PATTERN = '{{"eventTime": {}, "action": "{}", "district": "{}"}}\n‘# 测试的环境搭建,判断文件夹是否存在,如果存在则删除旧数据,并建立文件夹
def test_setUp():if os.path.exists(TEST_DATA_DIR):shutil.rmtree(TEST_DATA_DIR, ignore_errors=True)os.mkdir(TEST_DATA_DIR) # 测试环境的恢复,对文件夹进行清理
def test_tearDown():if os.path.exists(TEST_DATA_DIR):shutil.rmtree(TEST_DATA_DIR, ignore_errors=True)# 生成测试文件
def write_and_move(filename, data):with open(TEST_DATA_TEMP_DIR + filename,"wt", encoding="utf-8") as f:f.write(data)shutil.move(TEST_DATA_TEMP_DIR + filename,TEST_DATA_DIR + filename)if __name__ == "__main__":test_setUp()for i in range(1000):filename = 'e-mall-{}.json'.format(i)content = ''rndcount = list(range(100))random.shuffle(rndcount)for _ in rndcount:content += JSON_LINE_PATTERN.format(str(int(time.time())),random.choice(ACTION_DEF),random.choice(DISTRICT_DEF))write_and_move(filename, content)time.sleep(1)test_tearDown()

        这段程序首先建立测试环境,清空测试数据所在的目录,接着使用for循环一千次来生成一千个文件,文件名为“e-mall-数字.json”, 文件内容是不超过100行的随机JSON行,行的格式是类似如下:

 {"eventTime": 1546939167, "action": "logout", "district": "fujian"}\n

2、创建程序对数据进行统计

        spark_ss_filesource.py”,其代码内容如下:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-# 导入需要用到的模块
import os
import shutil
from pprint import pprintfrom pyspark.sql import SparkSession
from pyspark.sql.functions import window, asc
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import TimestampType, StringType# 定义JSON文件的路径常量
TEST_DATA_DIR_SPARK = 'file:///tmp/testdata/'if __name__ == "__main__":# 定义模式,为时间戳类型的eventTime、字符串类型的操作和省份组成schema = StructType([StructField("eventTime", TimestampType(), True),StructField("action", StringType(), True),StructField("district", StringType(), True)])spark = SparkSession \.builder \.appName("StructuredEMallPurchaseCount") \.getOrCreate()spark.sparkContext.setLogLevel('WARN')lines = spark \.readStream \.format("json") \.schema(schema) \.option("maxFilesPerTrigger", 100) \.load(TEST_DATA_DIR_SPARK)# 定义窗口windowDuration = '1 minutes'windowedCounts = lines \.filter("action = 'purchase'") \.groupBy('district', window('eventTime', windowDuration)) \.count() \.sort(asc('window')) query = windowedCounts \.writeStream \.outputMode("complete") \.format("console") \.option('truncate', 'false') \.trigger(processingTime="10 seconds") \.start()query.awaitTermination()

3、测试运行程序

        程序运行过程需要访问HDFS,因此,需要启动HDFS,命令如下:

start-dfs.sh

        新建一个终端,执行如下命令生成测试数据:

cd /usr/local/mycode/structuredstreaming/file
python3 spark_ss_filesource_generate.py

        新建一个终端,执行如下命令运行数据统计程序:

cd /usr/local/mycode/structuredstreaming/file
spark-submit spark_ss_filesource.py

        运行程序以后,可以看到类似如下的输出结果:

-------------------------------------------                                     
Batch: 0
-------------------------------------------
+---------+------------------------------------------+-----+
|district |window                                    |count|
+---------+------------------------------------------+-----+
|guangzhou|[2019-01-08 17:19:00, 2019-01-08 17:20:00]|283  |
|shanghai |[2019-01-08 17:19:00, 2019-01-08 17:20:00]|251  |
|fujian   |[2019-01-08 17:19:00, 2019-01-08 17:20:00]|258  |
|beijing  |[2019-01-08 17:19:00, 2019-01-08 17:20:00]|258  |
|guangzhou|[2019-01-08 17:20:00, 2019-01-08 17:21:00]|492  |
|beijing  |[2019-01-08 17:20:00, 2019-01-08 17:21:00]|499  |
|fujian   |[2019-01-08 17:20:00, 2019-01-08 17:21:00]|513  |
|shanghai |[2019-01-08 17:20:00, 2019-01-08 17:21:00]|503  |
|guangzhou|[2019-01-08 17:21:00, 2019-01-08 17:22:00]|71   |
|fujian   |[2019-01-08 17:21:00, 2019-01-08 17:22:00]|74   |
|shanghai |[2019-01-08 17:21:00, 2019-01-08 17:22:00]|66   |
|beijing  |[2019-01-08 17:21:00, 2019-01-08 17:22:00]|52   |
+---------+------------------------------------------+-----+

(二)Kafka源

        Kafka源是流处理最理想的输入源,因为它可以保证实时和容错。Kafka源的选项(option)包括如下几个。
(1)assign:指定所消费的Kafka主题和分区。
(2)subscribe:订阅的Kafka主题,为逗号分隔的主题列表。
(3)subscribePattern:订阅的Kafka主题正则表达式,可匹配多个主题。
(4)kafka.bootstrap.servers:Kafka服务器的列表,逗号分隔的 "host:port"列表。
(5)startingOffsets:起始位置偏移量。
(6)endingOffsets:结束位置偏移量。
(7)failOnDataLoss:布尔值,表示是否在Kafka数据可能丢失时(主题被删除或位置偏移量超出范围等)触发流计算失败。一般应当禁止,以免误报。

        在这个实例中,使用生产者程序每0.1秒生成一个包含2个字母的单词,并写入Kafka的名称为“wordcount-topic”的主题(Topic)内。Spark的消费者程序通过订阅wordcount-topic,会源源不断收到单词,并且每隔8秒钟对收到的单词进行一次词频统计,把统计结果输出到Kafka的主题wordcount-result-topic内,同时,通过2个监控程序检查Spark处理的输入和输出结果。

1、启动Kafka 

        在Linux系统中新建一个终端(记作“Zookeeper终端”),输入下面命令启动Zookeeper服务:

cd /usr/local/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

        不要关闭这个终端窗口,一旦关闭,Zookeeper服务就停止了。另外打开第二个终端(记作“Kafka终端”),然后输入下面命令启动Kafka服务:

cd /usr/local/kafka
bin/kafka-server-start.sh config/server.properties

        不要关闭这个终端窗口,一旦关闭,Kafka服务就停止了。

        再新开一个终端(记作“监控输入终端”),执行如下命令监控Kafka收到的文本:

cd /usr/local/kafka
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wordcount-topic

        再新开一个终端(记作“监控输出终端”),执行如下命令监控输出的结果文本:

cd /usr/local/kafka
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wordcount-result-topic

2、编写生产者(Producer)程序

        代码文件spark_ss_kafka_producer.py内容如下:

#!/usr/bin/env python3import string
import random
import timefrom kafka import KafkaProduceif __name__ == "__main__":producer = KafkaProducer(bootstrap_servers=['localhost:9092'])while True:s2 = (random.choice(string.ascii_lowercase) for _ in range(2))word = ''.join(s2)value = bytearray(word, 'utf-8')producer.send('wordcount-topic', value=value).get(timeout=10)time.sleep(0.1)

        如果还没有安装Python3的Kafka支持,需要按照如下操作进行安装:

(1)首先确认有没有安装pip3,如果没有,使用如下命令安装:

apt-get install pip3

(2)安装kafka-python模块,命令如下:

pip3 install kafka-python

然后在终端中执行如下命令运行生产者程序:

cd /usr/local/mycode/structuredstreaming/kafka/
python3 spark_ss_kafka_producer.py

生产者程序执行以后,在“监控输入终端”的窗口内就可以看到持续输出包含2个字母的单词。

3、编写消费者(Consumer)程序

        代码文件spark_ss_kafka_consumer.py内容如下:

#!/usr/bin/env python3from pyspark.sql import SparkSessionif __name__ == "__main__":spark = SparkSession \.builder \.appName("StructuredKafkaWordCount") \.getOrCreate()spark.sparkContext.setLogLevel('WARN‘)lines = spark \.readStream \.format("kafka") \.option("kafka.bootstrap.servers", "localhost:9092") \.option("subscribe", 'wordcount-topic') \.load() \.selectExpr("CAST(value AS STRING)")wordCounts = lines.groupBy("value").count()query = wordCounts \.selectExpr("CAST(value AS STRING) as key", "CONCAT(CAST(value AS STRING), ':', CAST(count AS STRING)) as value") \.writeStream \.outputMode("complete") \.format("kafka") \.option("kafka.bootstrap.servers", "localhost:9092") \.option("topic", "wordcount-result-topic") \.option("checkpointLocation", "file:///tmp/kafka-sink-cp") \.trigger(processingTime="8 seconds") \.start()query.awaitTermination()

        在终端中执行如下命令运行消费者程序:

cd /usr/local/mycode/structuredstreaming/kafka/
/usr/local/spark/bin/spark-submit \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 \
spark_ss_kafka_consumer.py

        消费者程序运行起来以后,可以在“监控输出终端”看到类似如下的输出结果:

sq:3
bl:6
lo:8
…

(三)Socket源

        Socket源从一个本地或远程主机的某个端口服务上读取数据,数据的编码为UTF8。因为Socket源使用内存保存读取到的所有数据,并且远端服务不能保证数据在出错后可以使用检查点或者指定当前已处理的偏移量来重放数据,所以,它无法提供端到端的容错保障。Socket源一般仅用于测试或学习用途。

        Socket源的选项(option)包括如下几个。
(1)host:主机IP地址或者域名,必须设置。
(2)port:端口号,必须设置。
(3)includeTimestamp:是否在数据行内包含时间戳。使用时间戳可以用来测试基于时间聚合的
功能。

        Socket源的实例可以参考“二、编写Structured Streaming程序的基本步骤”的StructuredNetworkWordCount.py。

(四)Rate源

        Rate源可每秒生成特定个数的数据行,每个数据行包括时间戳和值字段。时间戳是消息发送的时间,值是从开始到当前消息发送的总个数,从0开始。Rate源一般用来作为调试或性能基准测试。

        Rate源的选项(option)包括如下几个。
(1)rOwsPerSecond:每秒产生多少行数据,默认为1。
(2)rampUpTime:生成速度达到rowsPerSecond需要多少启动时间,使用比秒更精细的粒度将
会被截断为整数秒,默认为0秒。
(3)numPartitions:使用的分区数,默认为Spark的默认分区数。

        Rate源会尽可能地使每秒生成的数据量达到rowsPerSecond,可以通过调整numPartitions以尽快达到所需的速度。这几个参数的作用类似一辆汽车从0加速到100千米/小时并以100千米/小时进行巡航的过程,通过增加“马力”(numPartitions),可以使得加速时间(rampUpTime)更短。

        代码文件spark_ss_rate.py内容如下:

#!/usr/bin/env python3from pyspark.sql import SparkSessionif __name__ == "__main__":spark = SparkSession \.builder \.appName("TestRateStreamSource") \.getOrCreate()spark.sparkContext.setLogLevel('WARN‘)lines = spark \.readStream \.format("rate") \.option('rowsPerSecond', 5) \.load()print(lines.schema)query = lines \.writeStream \.outputMode("update") \.format("console") \.option('truncate', 'false') \.start()query.awaitTermination()

        在Linux终端中执行如下命令执行spark_ss_rate.py:

cd /usr/local/mycode/structuredstreaming/rate/
spark-submit spark_ss_rate.py

        上述命令执行后,会得到类似如下的结果:

StructType(List(StructField(timestamp,TimestampType,true),StructField(value,LongType,true)))
-------------------------------------------
Batch: 0
-------------------------------------------
+---------+-----+
|timestamp|value|
+---------+-----+
+---------+-----+ -------------------------------------------
Batch: 1
-------------------------------------------
+-----------------------+-----+
|timestamp              |value|
+-----------------------+-----+
|2018-10-01 15:42:38.595|0    |
|2018-10-01 15:42:38.795|1    |
|2018-10-01 15:42:38.995|2    |
|2018-10-01 15:42:39.195|3    |
|2018-10-01 15:42:39.395|4    |
+-----------------------+-----+

四、输出操作

(一)启动流计算

        DataFrame/Dataset的.writeStream()方法将会返回DataStreamWriter接口,接口通过.start()真正启动流计算,并将DataFrame/Dataset写入到外部的输出接收器,DataStreamWriter接口有以下几个主要函数:

(1)format:接收器类型。
(2)outputMode:输出模式,指定写入接收器的内容,可以是Append模式、Complete模式或Update模式。
(3)queryName:查询的名称,可选,用于标识查询的唯一名称。
(4)trigger:触发间隔,可选,设定触发间隔,如果未指定,则系统将在上一次处理完成后立即检查新数据的可用性。如果由于先前的处理尚未完成导致超过触发间隔,则系统将在处理完成后立即触发新的查询。

(二)输出模式

        输出模式用于指定写入接收器的内容,主要有以下几种:
(1)Append模式:只有结果表中自上次触发间隔后增加的新行,才会被写入外部存储器。这种模式一般适用于“不希望更改结果表中现有行的内容”的使用场景。
(2)Complete模式:已更新的完整的结果表可被写入外部存储器。
(3)Update模式:只有自上次触发间隔后结果表中发生更新的行,才会被写入外部存储器。这种模式与Complete模式相比,输出较少,如果结果表的部分行没有更新,则不会输出任何内容。当查询不包括聚合时,这个模式等同于Append模式。

        不同的流计算查询类型支持不同的输出模式,二者之间的兼容性如下表所示。

查询类型支持的输出模式备注
聚合查询在事件时间字段上使用水印的聚合Append
Complete
Update
Append模式使用水印来清理旧的聚合状态
其他聚合Complete
Update
连接查询Append
其他查询Append
Update
不支持Complete模式,因为无法将所有未分组数据保存在结果表内

(三)输出接收器

        系统内置的输出接收器包括File接收器、Kafka接收器、Foreach接收器、Console接收器、Memory接收器等,其中,Console接收器和Memory接收器仅用于调试用途。有些接收器由于无法保证输出的持久性,导致其不是容错的。Spark内置的输出接收器的详细信息如下表所示。

接收器支持的输出模式选项容错
File接收器Appendpath:输出目录的路径必须指定是。数据只会被处理一次
Kafka接收器Append
Complete
Update
选项较多,具体可查看Kafka对接指南是。数据至少被处理一次
Foreach接收器Append
Complete
Update
依赖于ForeachWriter的实现
Console接收器Append
Complete
Update
numRows:每次触发后打印多少行,默认为20;
truncate:如果行太长是否截断,默认为“是”
Memory接收器Append
Complete
否。在Complete输出模式下,重启查询会重建全表

        以File接收器为例,这里把“二、编写Structured Streaming程序的基本步骤”的实例修改为使用File接收器,修改后的代码文件为StructuredNetworkWordCountFileSink.py:

#!/usr/bin/env python3from pyspark.sql import SparkSession
from pyspark.sql.functions import split
from pyspark.sql.functions import explode
from pyspark.sql.functions import lengthif __name__ == "__main__":spark = SparkSession \.builder \.appName("StructuredNetworkWordCountFileSink") \.getOrCreate()spark.sparkContext.setLogLevel('WARN')lines = spark \.readStream \.format("socket") \.option("host", "localhost") \.option("port", 9999) \.load() words = lines.select(explode(split(lines.value, " ")).alias("word"))all_length_5_words = words.filter(length("word") == 5)query = all_length_5_words \.writeStream \.outputMode("append") \.format("parquet") \.option("path", "file:///tmp/filesink") \.option("checkpointLocation", "file:///tmp/file-sink-cp") \.trigger(processingTime="8 seconds") \.start() query.awaitTermination()

        在Linux系统中新建一个终端(记作“数据源终端”),输入如下命令:

nc -lk 9999

        再新建一个终端(记作“流计算终端”),执行如下命令执行StructuredNetworkWordCountFileSink.py:

cd /usr/local/mycode/structuredstreaming
spark-submit StructuredNetworkWordCountFileSink.py

        为了模拟文本数据流,可以在数据源终端内用键盘不断敲入一行行英文语句,并且让其中部分英语单词长度等于5。

        由于程序执行后不会在终端输出信息,这时可新建一个终端,执行如下命令查看File接收器保存的位置:

cd /tmp/filesink
ls

        可以看到以parquet格式保存的类似如下的文件列表:

part-00000-2bd184d2-e9b0-4110-9018-a7f2d14602a9-c000.snappy.parquet
part-00000-36eed4ab-b8c4-4421-adc6-76560699f6f5-c000.snappy.parquet
part-00000-dde601ad-1b49-4b78-a658-865e54d28fb7-c000.snappy.parquet
part-00001-eedddae2-fb96-4ce9-9000-566456cd5e8e-c000.snappy.parquet
_spark_metadata

        可以使用strings命令查看文件内的字符串,具体如下:

strings part-00003-89584d0a-db83-467b-84d8-53d43baa4755-c000.snappy.parquet

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/256112.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

人工智能|深度学习——使用多层级注意力机制和keras实现问题分类

代码下载 使用多层级注意力机制和keras实现问题分类资源-CSDN文库 1 准备工作 1.1 什么是词向量? ”词向量”(词嵌入)是将一类将词的语义映射到向量空间中去的自然语言处理技术。即将一个词用特定的向量来表示,向量之间的距离(例…

【 buuctf--刷新过的图片】

前言:这题主要运用到了新的工具F5-steganography由于 java 环境不合适的原因,我不得不重新配java11.0.18。 具体思路:非常帅气的一张图片。。。用 binwalk,stegsolve,zsteg,exiftool 等工具无果后&#xf…

【Java程序设计】【C00253】基于Springboot的在线考试管理系统(有论文)

基于Springboot的在线考试管理系统(有论文) 项目简介项目获取开发环境项目技术运行截图 项目简介 这是一个基于Springboot的在线考试系统 本系统分为系统功能模块、管理员功能模块以及用户功能模块。 系统功能模块:系统登录,管理…

【Django】Django文件上传

文件上传 1 定义&场景 定义&#xff1a;用户可以通过浏览器将图片等文件上传至网站。 场景&#xff1a; 用户上传头像。 上传流程性的文档[pdf&#xff0c;txt等] 2 上传规范-前端[html] 文件上传必须为POST提交方式 表单 <form> 中文件上传时必须带有 enctype…

图(高阶数据结构)

目录 一、图的基本概念 二、图的存储结构 2.1 邻接矩阵 2.2 邻接表 三、图的遍历 3.1 广度优先遍历 3.2 深度优先遍历 四、最小生成树 4.1 Kruskal算法 4.2 Prim算法 五、最短路径 5.1 单源最短路径-Dijkstra算法 5.2 单源最短路径-Bellman-Ford算法 5.3 多源最…

JCIM | MD揭示PTP1B磷酸酶激活RtcB连接酶的机制

Background 内质网应激反应&#xff08;UPR&#xff09; 中的一个重要过程。UPR是由内质网中的三种跨膜传感器&#xff08;IRE1、PERK和ATF6&#xff09;控制的细胞应激反应&#xff0c;当内质网中的蛋白质折叠能力受到压力时&#xff0c;UPR通过减少蛋白质合成和增加未折叠或错…

springcloud分布式架构网上商城源码和论文

首先,论文一开始便是清楚的论述了系统的研究内容。其次,剖析系统需求分析,弄明白“做什么”,分析包括业务分析和业务流程的分析以及用例分析,更进一步明确系统的需求。然后在明白了系统的需求基础上需要进一步地设计系统,主要包罗软件架构模式、整体功能模块、数据库设计。本项…

集合进阶(双列集合、HashMap、LinkedHashMap、TreeMap、Collections)

目录 一、双列集合 1、双列集合的特点 2、双列集合的常见API 3、Map的遍历方式 3.1第一种遍历方式&#xff1a;键找值&#xff08;keySet&#xff09; 3.2第二种遍历方式&#xff1a;键值对&#xff08;entrySet&#xff09;Entry&#xff1a;键值对对象 3.3第三种遍历方…

vue 引入 百度地图API 和 路书

公司项目中&#xff0c;偶尔都会涉及到地图的使用&#xff0c;这里以百度地图为例&#xff0c;我们梳理一下引用流程及注意点 账号和获取密钥 百度地图示例 百度地图 类参考 1、账号和获取密钥 // api.map.baidu.com/api?typewebgl&v3.0&ak您的密钥<script type…

EasyExcel下载带下拉框和批注模板

EasyExcel下载带下拉框和批注模板 一、 代码实现 controller下载入口 /***下载excel模板* author youlu* date 2023/8/14 17:31* param response* param request* return void*/PostMapping("/downloadTemplate")public void downloadExcel(HttpServletResponse r…

Linux笔记之xhost +和docker的关系以及GDK_SCALE和GDK_DPI_SCALE详解

Linux笔记之xhost 和docker的关系以及GDK_SCALE和GDK_DPI_SCALE详解 ——2024-02-11 code review! 文章目录 Linux笔记之xhost 和docker的关系以及GDK_SCALE和GDK_DPI_SCALE详解xhost 的作用xhost 与 Docker 的关系 -e GDK_SCALE 和 -e GDK_DPI_SCALE详解GDK_SCALEGDK_DPI_SC…

Java设计模式大全:23种常见的设计模式详解(一)

本系列文章简介&#xff1a; 设计模式是在软件开发过程中&#xff0c;经过实践和总结得到的一套解决特定问题的可复用的模板。它是一种在特定情境中经过验证的经验和技巧的集合&#xff0c;可以帮助开发人员设计出高效、可维护、可扩展和可复用的软件系统。设计模式提供了一种在…

华为 huawei 交换机 接口 MAC 地址学习限制接入用户数量 配置示例

目录 组网需求: 配置思路&#xff1a; 操作步骤&#xff1a; 配置文件&#xff1a; 组网需求: 如 图 2-14 所示&#xff0c;用户网络 1 和用户网络 2 通过 LSW 与 Switch 相连&#xff0c; Switch 连接 LSW 的接口为GE0/0/1 。用户网络 1 和用户网络 2 分别属于 VLAN10 和 V…

幻兽帕鲁PalWorld服务器2024年配置选择

幻兽帕鲁PalWorld是一款备受期待的虚拟游戏&#xff0c;其独特的幻兽系统和丰富的世界观吸引了大量玩家。然而&#xff0c;随着游戏日益受到关注&#xff0c;服务器的配置选择成为了关键问题。2024年&#xff0c;随着技术不断发展&#xff0c;玩家对于游戏体验的需求也在不断提…

Quartus工程的qsf配置约束文件介绍

一、qsf文件概述 qsf&#xff1a;Quartus Setting File&#xff0c;是Quartus工程的配置文件&#xff1b; 包含一个Quartus工程的所有约束&#xff0c;包括工程的软件版本信息、FPGA器件信息、引脚约分配、引脚电平分配&#xff0c;编译约束和用于Classic TimingAnalyzer的时…

通过QT制作一个模仿微信主界面的界面(不要求实现具体通信功能)

main.cpp #include "widget.h" #include "second.h"#include <QApplication>int main(int argc, char *argv[]) {QApplication a(argc, argv);Widget w;w.show();//实例化第二个界面Second s;QObject::connect(&w, &Widget::my_jump, &…

微信,支付宝在线换钱平台系统源码

探索全新、全开源的在线换钱系统源码&#xff0c;它将以前所未有的方式改变您的支付体验。我们为您精心打造了一个集简单易用与安全高效于一身的优质产品&#xff0c;它采用最新的技术开发&#xff0c;为您带来前所未有的便捷与安心。 这款在线换钱系统源码设计直观&#xff0…

netty-websocket扩展协议及token鉴权补充

文章源码&#xff1a;gitee 源码部分可以看上一篇文章中的源码分析netty-websocket 鉴权token及统一请求和响应头&#xff08;鉴权控制器&#xff09; 最近刚好没事&#xff0c;看到有朋友说自定义协议好搞&#xff0c;我就想了想&#xff0c;发现上面那种方式实现确实麻烦&…

分享springboot框架的一个开源的本地开发部署教程(若依开源项目开发部署过程分享持续更新二开宝藏项目MySQL数据库版)

1首先介绍下若依项目&#xff1a; 若依是一个基于Spring Boot和Spring Cloud技术栈开发的多租户权限管理系统。该开源项目提供了一套完整的权限管理解决方案&#xff0c;包括用户管理、角色管理、菜单管理、部门管理、岗位管理等功能。 若依项目采用前后端分离的架构&#xf…