from pyspark. sql import SparkSession
from pyspark. sql. types import StringType
from pyspark. sql. functions import udf, col
from pyspark. sql. types import BooleanType
import pandas as pd
spark = SparkSession. builder. appName( "StringFilter" ) . getOrCreate( )
string_columns = [ field. name for field in df. schema. fields if isinstance ( field. dataType, StringType) ]
allowed_chars = set ( 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!"#$%&\'()*+,-./:;<=>?@[]^_`{|}~' )
def has_invalid_chars ( s) : if s is None : return False return any ( c not in allowed_chars for c in s) has_invalid_udf = udf( has_invalid_chars, BooleanType( ) )
if not string_columns: result_df = spark. createDataFrame( [ ] , df. schema)
else : condition = None for col_name in string_columns: col_condition = has_invalid_udf( col( col_name) ) if condition is None : condition = col_conditionelse : condition = condition | col_conditionfiltered_df = df. filter ( condition) empty_df = spark. createDataFrame( [ ] , df. schema) result_df = empty_df. union( filtered_df)
pd_df = result_df. toPandas( )
pd_df. to_excel( "output.xlsx" , index= False )
spark. stop( )
代码说明:
初始化与数据读取 :需要根据实际数据源替换读取逻辑(示例中被注释掉的spark.read.csv
部分)获取字符串列 :通过分析Schema获取所有字符串类型的字段定义字符白名单 :使用集合类型提升查询效率UDF定义 :用于检查字符串是否包含非法字符条件构建 :使用逻辑或组合所有字符串列的检查条件结果处理 : 直接处理空字符串列的边界情况 使用union
保持与原DataFrame结构一致 Excel导出 : 通过转换为Pandas DataFrame实现 注意大数据量时可能存在的内存问题
注意事项:
大数据量场景下建议分批次处理或使用分布式写入方式 Excel导出操作会触发数据收集到Driver节点,需确保资源充足 实际应用中建议添加异常处理机制 空值处理逻辑可根据业务需求调整(当前版本忽略NULL值)