PySpark functions
PySpark SQL provides a set of built-in standard functions in the pyspark.sql.functions module for working with DataFrames and SQL queries. All of these functions return a pyspark.sql.Column.
get_json_object
/**
 * Extracts json object from a json string based on json path specified, and returns json string
 * of the extracted json object. It will return null if the input json string is invalid.
 *
 * @group json_funcs
 * @since 1.6.0
 */
def get_json_object(e: Column, path: String): Column =
  Column.fn("get_json_object", e, lit(path))
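To make the documented behavior concrete, here is a minimal pure-Python sketch that imitates it for simple dot paths such as `$.f1`. The name `get_json_object_sketch` is hypothetical, this is not how Spark implements the function, and array indexing and other JSONPath features are deliberately left out. The exact string formatting of nested objects is also an assumption.

```python
import json
from typing import Optional


def get_json_object_sketch(json_str: Optional[str], path: str) -> Optional[str]:
    """Toy imitation of the documented behavior (hypothetical helper, dot paths only)."""
    if not path.startswith("$"):
        return None
    try:
        node = json.loads(json_str)
    except (TypeError, ValueError):
        # Invalid (or missing) JSON input yields None, mirroring the documented null.
        return None
    # "$.f1.f2" -> ["f1", "f2"]; array indexing such as "$.a[0]" is NOT handled here.
    for key in filter(None, path[1:].split(".")):
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    if node is None:
        return None
    if isinstance(node, str):
        # String values come back without the surrounding JSON quotes.
        return node
    # Assumption: nested objects and numbers are returned as compact JSON text.
    return json.dumps(node, separators=(",", ":"))


# Same sample data as the docstring example of the PySpark function.
rows = [("1", '{"f1": "value1", "f2": "value2"}'), ("2", '{"f1": "value12"}')]
for key, jstring in rows:
    print(key, get_json_object_sketch(jstring, "$.f1"), get_json_object_sketch(jstring, "$.f2"))
```

The second row has no `f2` key, so the sketch returns None for it, which matches the `c1=None` shown in the PySpark docstring example.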
As the code below shows, the Python function reuses the JVM implementation.
@_try_remote_functions
def get_json_object(col: "ColumnOrName", path: str) -> Column:
    """
    Extracts json object from a json string based on json `path` specified, and returns json string
    of the extracted json object. It will return null if the input json string is invalid.

    .. versionadded:: 1.6.0

    .. versionchanged:: 3.4.0
        Supports Spark Connect.

    Parameters
    ----------
    col : :class:`~pyspark.sql.Column` or str
        string column in json format
    path : str
        path to the json object to extract

    Returns
    -------
    :class:`~pyspark.sql.Column`
        string representation of given JSON object value.

    Examples
    --------
    >>> data = [("1", '''{"f1": "value1", "f2": "value2"}'''), ("2", '''{"f1": "value12"}''')]
    >>> df = spark.createDataFrame(data, ("key", "jstring"))
    >>> df.select(df.key, get_json_object(df.jstring, '$.f1').alias("c0"), \\
    ...                   get_json_object(df.jstring, '$.f2').alias("c1") ).collect()
    [Row(key='1', c0='value1', c1='value2'), Row(key='2', c0='value12', c1=None)]
    """
    from pyspark.sql.classic.column import _to_java_column

    return _invoke_function("get_json_object", _to_java_column(col), path)
def _invoke_function(name: str, *args: Any) -> Column:
    """
    Invokes JVM function identified by name with args
    and wraps the result with :class:`~pyspark.sql.Column`.
    """
    from pyspark import SparkContext

    assert SparkContext._active_spark_context is not None
    jf = _get_jvm_function(name, SparkContext._active_spark_context)
    return Column(jf(*args))
def _get_jvm_function(name: str, sc: "SparkContext") -> Callable:
    """
    Retrieves JVM function identified by name from
    Java gateway associated with sc.
    """
    assert sc._jvm is not None
    return getattr(getattr(sc._jvm, "org.apache.spark.sql.functions"), name)
def _to_java_column(col: "ColumnOrName") -> "JavaObject":
    if isinstance(col, Column):
        jcol = col._jc
    elif isinstance(col, str):
        jcol = _create_column_from_name(col)
    else:
        raise PySparkTypeError(
            error_class="NOT_COLUMN_OR_STR",
            message_parameters={"arg_name": "col", "arg_type": type(col).__name__},
        )
    return jcol
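The lookup-by-name pattern in the helpers above can be tried without a running JVM. The sketch below is only an illustration under stated assumptions: `FakeJvm`, `FakeColumn`, and the lambda standing in for the JVM function are all hypothetical stand-ins, not PySpark or Py4J classes. Only the nested `getattr` lookup and the wrap-the-result step mirror the quoted source.

```python
from dataclasses import dataclass
from types import SimpleNamespace
from typing import Any, Callable, Dict


@dataclass
class FakeColumn:
    """Hypothetical stand-in for pyspark.sql.Column: just wraps the 'JVM' result."""
    jc: Any


class FakeJvm:
    """Hypothetical stand-in for sc._jvm: resolves a fully qualified name to an object."""

    def __init__(self, namespaces: Dict[str, Any]) -> None:
        self._namespaces = namespaces

    def __getattr__(self, qualified_name: str) -> Any:
        # Only called when normal attribute lookup fails, e.g. for dotted names.
        try:
            return self.__dict__["_namespaces"][qualified_name]
        except KeyError:
            raise AttributeError(qualified_name) from None


def get_jvm_function(name: str, jvm: FakeJvm) -> Callable:
    # Same two-step lookup as _get_jvm_function: namespace first, then function name.
    return getattr(getattr(jvm, "org.apache.spark.sql.functions"), name)


def invoke_function(jvm: FakeJvm, name: str, *args: Any) -> FakeColumn:
    # Same shape as _invoke_function: find the function, call it, wrap the result.
    jf = get_jvm_function(name, jvm)
    return FakeColumn(jf(*args))


# A fake "functions" object whose get_json_object only builds a description string.
fake_functions = SimpleNamespace(
    get_json_object=lambda col, path: f"get_json_object({col}, {path})"
)
jvm = FakeJvm({"org.apache.spark.sql.functions": fake_functions})

result = invoke_function(jvm, "get_json_object", "jstring", "$.f1")
print(result.jc)
```

The point of the design is that the Python side carries no JSON logic of its own: it forwards the function name and arguments, and the object on the other side does the work.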
FAQ
ModuleNotFoundError: No module named 'numpy'
If you hit this error, first check which Python version PySpark is using. The startup banner of the pyspark shell prints it:
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.3.2
      /_/

Using Python version 3.9.9 (main, Mar 3 2024 19:54:45)
Spark context Web UI available at http://172.17.48.107:4040
Spark context available as 'sc' (master = local[10], app id = local-1719386128955).
SparkSession available as 'spark'.
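Then install numpy with the pip that belongs to that same Python version. If you install it with a different interpreter's pip, PySpark still will not find the module.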
pip3.9 install numpy
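When it is unclear which pip matches the interpreter, a small check run with that same interpreter (for example inside the pyspark shell) can help. This is a minimal sketch using only the standard library. The helper name `describe_python_env` is hypothetical, and reading the `PYSPARK_PYTHON` environment variable assumes your setup uses it to select the interpreter.

```python
import importlib.util
import os
import sys


def describe_python_env(module_name: str = "numpy") -> dict:
    """Report which interpreter is running and whether a module is importable from it."""
    return {
        "executable": sys.executable,
        "version": "{}.{}.{}".format(*sys.version_info[:3]),
        # None when the variable is not set in this process.
        "pyspark_python": os.environ.get("PYSPARK_PYTHON"),
        "module_found": importlib.util.find_spec(module_name) is not None,
        # Invoking pip through the interpreter guarantees both refer to the same Python.
        "install_command": f"{sys.executable} -m pip install {module_name}",
    }


for field, value in describe_python_env().items():
    print(f"{field}: {value}")
```

If `module_found` is False, running the printed `install_command` installs the package for exactly the interpreter that reported the error, which avoids guessing between `pip`, `pip3`, and `pip3.9`.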