用SparkSQL和PySpark完成以下数据转换。
源数据:
userid,page_name,visit_time
1,A,2021-2-1
2,B,2024-1-1
1,C,2020-5-4
2,D,2028-9-1
目的数据:
user_id,page_name_path
1,C->A
2,B->D
PySpark:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window# 初始化SparkSession(如果在已有环境中可以直接使用已有的spark对象)
spark = SparkSession.builder.appName("DataTransformation").getOrCreate()# 创建示例数据的DataFrame
data = [(1, "A", "2021-2-1"),(2, "B", "2024-1-1"),(1, "C", "2020-5-4"),(2, "D", "2028-9-1")
]
columns = ["userid", "page_name", "visit_time"]
df = spark.createDataFrame(data, columns)# 将visit_time转换为日期类型,方便后续排序
df = df.withColumn("visit_time", F.to_date(F.col("visit_time")))# 按照userid分区,根据visit_time排序创建窗口
window_spec = Window.partitionBy("userid").orderBy("visit_time")# 使用collect_list函数收集每个userid对应的page_name列表,然后使用concat_ws函数将其拼接为指定格式
result_df = df.withColumn("page_name_list", F.collect_list("page_name").over(window_spec)) \.groupBy("userid") \.agg(F.concat_ws("->", F.col("page_name_list")).alias("page_name_path")) \.select("userid", "page_name_path")# 重命名userid列为user_id(和目标数据列名一致)
result_df = result_df.withColumnRenamed("userid", "user_id")# 展示结果
result_df.show()
SparkSQL:
SELECT userid AS user_id,CONCAT_WS('->', collect_list(page_name) OVER (PARTITION BY userid ORDER BY visit_time)) AS page_name_path
FROM page_visits
GROUP BY userid