== Standard Functions -- functions Object

The org.apache.spark.sql.functions object defines the built-in <<standard-functions, standard functions>> to work with (values in) columns.

You can access the standard functions using the following import statement in your Scala application:

[source, scala]
----
import org.apache.spark.sql.functions._
----
[[standard-functions]]
.(Subset of) Standard Functions in Spark SQL
[align="center",cols="1,1,2",width="100%",options="header"]
|===
| |Name |Description

.26+.| [[aggregate-functions]][[agg_funcs]] Aggregate functions
| approx_count_distinct
a|
[source, scala]
----
approx_count_distinct(e: Column): Column
approx_count_distinct(columnName: String): Column
approx_count_distinct(e: Column, rsd: Double): Column
approx_count_distinct(columnName: String, rsd: Double): Column
----

| avg
a|
[source, scala]
----
avg(e: Column): Column
avg(columnName: String): Column
----

| collect_list
a|
[source, scala]
----
collect_list(e: Column): Column
collect_list(columnName: String): Column
----

| collect_set
a|
[source, scala]
----
collect_set(e: Column): Column
collect_set(columnName: String): Column
----

| corr
a|
[source, scala]
----
corr(column1: Column, column2: Column): Column
corr(columnName1: String, columnName2: String): Column
----

| count
a|
[source, scala]
----
count(e: Column): Column
count(columnName: String): TypedColumn[Any, Long]
----

| countDistinct
a|
[source, scala]
----
countDistinct(expr: Column, exprs: Column*): Column
countDistinct(columnName: String, columnNames: String*): Column
----

| covar_pop
a|
[source, scala]
----
covar_pop(column1: Column, column2: Column): Column
covar_pop(columnName1: String, columnName2: String): Column
----

| covar_samp
a|
[source, scala]
----
covar_samp(column1: Column, column2: Column): Column
covar_samp(columnName1: String, columnName2: String): Column
----

| first
a|
[source, scala]
----
first(e: Column): Column
first(e: Column, ignoreNulls: Boolean): Column
first(columnName: String): Column
first(columnName: String, ignoreNulls: Boolean): Column
----

Returns the first value in a group. Returns the first non-null value when the ignoreNulls flag is on. If all values are null, then returns null.

| grouping
a|
[source, scala]
----
grouping(e: Column): Column
grouping(columnName: String): Column
----

Indicates whether a given column is aggregated or not

| grouping_id
a|
[source, scala]
----
grouping_id(cols: Column*): Column
grouping_id(colName: String, colNames: String*): Column
----

Computes the level of grouping

| kurtosis
a|
[source, scala]
----
kurtosis(e: Column): Column
kurtosis(columnName: String): Column
----

| last
a|
[source, scala]
----
last(e: Column, ignoreNulls: Boolean): Column
last(columnName: String, ignoreNulls: Boolean): Column
last(e: Column): Column
last(columnName: String): Column
----

| max
a|
[source, scala]
----
max(e: Column): Column
max(columnName: String): Column
----

| mean
a|
[source, scala]
----
mean(e: Column): Column
mean(columnName: String): Column
----

| min
a|
[source, scala]
----
min(e: Column): Column
min(columnName: String): Column
----

| skewness
a|
[source, scala]
----
skewness(e: Column): Column
skewness(columnName: String): Column
----

| stddev
a|
[source, scala]
----
stddev(e: Column): Column
stddev(columnName: String): Column
----

| stddev_pop
a|
[source, scala]
----
stddev_pop(e: Column): Column
stddev_pop(columnName: String): Column
----

| stddev_samp
a|
[source, scala]
----
stddev_samp(e: Column): Column
stddev_samp(columnName: String): Column
----

| sum
a|
[source, scala]
----
sum(e: Column): Column
sum(columnName: String): Column
----

| sumDistinct
a|
[source, scala]
----
sumDistinct(e: Column): Column
sumDistinct(columnName: String): Column
----

| variance
a|
[source, scala]
----
variance(e: Column): Column
variance(columnName: String): Column
----

| var_pop
a|
[source, scala]
----
var_pop(e: Column): Column
var_pop(columnName: String): Column
----

| var_samp
a|
[source, scala]
----
var_samp(e: Column): Column
var_samp(columnName: String): Column
----
.31+.| [[collection_funcs]] Collection functions
| array_contains
a|
[source, scala]
----
array_contains(column: Column, value: Any): Column
----

| array_distinct
a|
[source, scala]
----
array_distinct(e: Column): Column
----

(New in 2.4.0)

| array_except
a|
[source, scala]
----
array_except(col1: Column, col2: Column): Column
----

(New in 2.4.0)

| array_intersect
a|
[source, scala]
----
array_intersect(col1: Column, col2: Column): Column
----

(New in 2.4.0)

| array_join
a|
[source, scala]
----
array_join(column: Column, delimiter: String): Column
array_join(column: Column, delimiter: String, nullReplacement: String): Column
----

(New in 2.4.0)

| array_max
a|
[source, scala]
----
array_max(e: Column): Column
----

(New in 2.4.0)

| array_min
a|
[source, scala]
----
array_min(e: Column): Column
----

(New in 2.4.0)

| array_position
a|
[source, scala]
----
array_position(column: Column, value: Any): Column
----

(New in 2.4.0)

| array_remove
a|
[source, scala]
----
array_remove(column: Column, element: Any): Column
----

(New in 2.4.0)

| array_repeat
a|
[source, scala]
----
array_repeat(e: Column, count: Int): Column
array_repeat(left: Column, right: Column): Column
----

(New in 2.4.0)

| array_sort
a|
[source, scala]
----
array_sort(e: Column): Column
----

(New in 2.4.0)

| array_union
a|
[source, scala]
----
array_union(col1: Column, col2: Column): Column
----

(New in 2.4.0)

| arrays_zip
a|
[source, scala]
----
arrays_zip(e: Column*): Column
----

(New in 2.4.0)

| arrays_overlap
a|
[source, scala]
----
arrays_overlap(a1: Column, a2: Column): Column
----

(New in 2.4.0)

| element_at
a|
[source, scala]
----
element_at(column: Column, value: Any): Column
----

(New in 2.4.0)

| spark-sql-functions-collection.md#explode[explode]
a| [[explode]]
[source, scala]
----
explode(e: Column): Column
----

| spark-sql-functions-collection.md#explode_outer[explode_outer]
a| [[explode_outer]]
[source, scala]
----
explode_outer(e: Column): Column
----

Creates a new row for each element in the given array or map column. If the array/map is null or empty then null is produced.

| flatten
a|
[source, scala]
----
flatten(e: Column): Column
----

(New in 2.4.0)

| from_json
a|
[source, scala]
----
from_json(e: Column, schema: Column): Column // <1>
from_json(e: Column, schema: DataType): Column
from_json(e: Column, schema: DataType, options: Map[String, String]): Column
from_json(e: Column, schema: String, options: Map[String, String]): Column
from_json(e: Column, schema: StructType): Column
from_json(e: Column, schema: StructType, options: Map[String, String]): Column
----
<1> New in 2.4.0

Parses a column with a JSON string into a StructType or ArrayType of StructType elements with the specified schema.

| map_concat
a|
[source, scala]
----
map_concat(cols: Column*): Column
----

(New in 2.4.0)

| map_from_entries
a|
[source, scala]
----
map_from_entries(e: Column): Column
----

(New in 2.4.0)

| map_keys
a|
[source, scala]
----
map_keys(e: Column): Column
----

| map_values
a|
[source, scala]
----
map_values(e: Column): Column
----

| posexplode
a|
[source, scala]
----
posexplode(e: Column): Column
----

| posexplode_outer
a|
[source, scala]
----
posexplode_outer(e: Column): Column
----

| reverse
a|
[source, scala]
----
reverse(e: Column): Column
----

Returns a reversed string or an array with reverse order of elements

NOTE: Support for reversing arrays is new in 2.4.0.

| schema_of_json
a|
[source, scala]
----
schema_of_json(json: Column): Column
schema_of_json(json: String): Column
----

(New in 2.4.0)

| sequence
a|
[source, scala]
----
sequence(start: Column, stop: Column): Column
sequence(start: Column, stop: Column, step: Column): Column
----

(New in 2.4.0)

| shuffle
a|
[source, scala]
----
shuffle(e: Column): Column
----

(New in 2.4.0)

| size
a|
[source, scala]
----
size(e: Column): Column
----

Returns the size of the given array or map. Returns -1 if null.

| slice
a|
[source, scala]
----
slice(x: Column, start: Int, length: Int): Column
----

(New in 2.4.0)
.9+.| [[datetime_funcs]] Date and time functions
| current_date
a|
[source, scala]
----
current_date(): Column
----

| current_timestamp
a|
[source, scala]
----
current_timestamp(): Column
----

| from_utc_timestamp
a|
[source, scala]
----
from_utc_timestamp(ts: Column, tz: String): Column
from_utc_timestamp(ts: Column, tz: Column): Column // <1>
----
<1> New in 2.4.0

| months_between
a|
[source, scala]
----
months_between(end: Column, start: Column): Column
months_between(end: Column, start: Column, roundOff: Boolean): Column // <1>
----
<1> New in 2.4.0

| to_date
a|
[source, scala]
----
to_date(e: Column): Column
to_date(e: Column, fmt: String): Column
----

| to_timestamp
a|
[source, scala]
----
to_timestamp(s: Column): Column
to_timestamp(s: Column, fmt: String): Column
----

| to_utc_timestamp
a|
[source, scala]
----
to_utc_timestamp(ts: Column, tz: String): Column
to_utc_timestamp(ts: Column, tz: Column): Column // <1>
----
<1> New in 2.4.0

| unix_timestamp
a|
[source, scala]
----
unix_timestamp(): Column
unix_timestamp(s: Column): Column
unix_timestamp(s: Column, p: String): Column
----

| window
a|
[source, scala]
----
window(
  timeColumn: Column,
  windowDuration: String): Column
window(
  timeColumn: Column,
  windowDuration: String,
  slideDuration: String): Column
window(
  timeColumn: Column,
  windowDuration: String,
  slideDuration: String,
  startTime: String): Column
----
.1+.| Math functions
| <<bin, bin>>
| Converts the value of a long column to binary format (in a string representation)
.11+.| Regular functions (Non-aggregate functions)
| [[array]] spark-sql-functions-regular-functions.md#array[array]
|

| [[broadcast]] spark-sql-functions-regular-functions.md#broadcast[broadcast]
|

| [[coalesce]] spark-sql-functions-regular-functions.md#coalesce[coalesce]
| Gives the first non-null value among the given columns or null

| [[col]][[column]] spark-sql-functions-regular-functions.md#col[col] and spark-sql-functions-regular-functions.md#column[column]
| Creating spark-sql-Column.md[Columns]

| spark-sql-functions-regular-functions.md#expr[expr]
| [[expr]]

| [[lit]] spark-sql-functions-regular-functions.md#lit[lit]
|

| [[map]] spark-sql-functions-regular-functions.md#map[map]
|
| monotonically_increasing_id
|
| [[struct]] spark-sql-functions-regular-functions.md#struct[struct]
|

| [[typedLit]] spark-sql-functions-regular-functions.md#typedLit[typedLit]
|

| [[when]] spark-sql-functions-regular-functions.md#when[when]
|
.2+.| String functions
| <<split, split>>
|

| <<upper, upper>>
|

.2+.| UDF functions
| <<udf, udf>>
| Creating UDFs

| <<callUDF, callUDF>>
| Executing a UDF by name with a variable-length list of columns
.11+.| [[window-functions]] Window functions
| [[cume_dist]] cume_dist
a|
[source, scala]
----
cume_dist(): Column
----

Computes the cumulative distribution of records across window partitions

| [[currentRow]] currentRow
a|
[source, scala]
----
currentRow(): Column
----

| [[dense_rank]] dense_rank
a|
[source, scala]
----
dense_rank(): Column
----

Computes the rank of records per window partition

| [[lag]] lag
a|
[source, scala]
----
lag(e: Column, offset: Int): Column
lag(columnName: String, offset: Int): Column
lag(columnName: String, offset: Int, defaultValue: Any): Column
----

| [[lead]] lead
a|
[source, scala]
----
lead(columnName: String, offset: Int): Column
lead(e: Column, offset: Int): Column
lead(columnName: String, offset: Int, defaultValue: Any): Column
lead(e: Column, offset: Int, defaultValue: Any): Column
----

| [[ntile]] ntile
a|
[source, scala]
----
ntile(n: Int): Column
----

Computes the ntile group

| [[percent_rank]] percent_rank
a|
[source, scala]
----
percent_rank(): Column
----

Computes the rank of records per window partition

| [[rank]] rank
a|
[source, scala]
----
rank(): Column
----

Computes the rank of records per window partition

| [[row_number]] row_number
a|
[source, scala]
----
row_number(): Column
----

Computes the sequential numbering per window partition

| [[unboundedFollowing]] unboundedFollowing
a|
[source, scala]
----
unboundedFollowing(): Column
----

| [[unboundedPreceding]] unboundedPreceding
a|
[source, scala]
----
unboundedPreceding(): Column
----
|===
TIP: The page gives only a brief overview of the many functions available in the functions object, so you should also read the http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$[official documentation of the functions object].
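
To get a feel for a few of the entries above, here is a minimal sketch that combines a collection function, two aggregate functions and a window (ranking) function. The dataset, the column names and the spark value are assumptions for illustration only; a SparkSession available as spark (e.g. in spark-shell) is expected.

[source, scala]
----
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._

// a tiny dataset with an array column (made up for illustration)
val scores = Seq(("ann", Seq(1, 2, 3)), ("bob", Seq(4, 5))).toDF("name", "values")

// collection function: explode gives one row per array element
val exploded = scores.withColumn("value", explode($"values"))

// aggregate functions: the average and the collected values per name
val stats = exploded
  .groupBy("name")
  .agg(avg($"value") as "avg_value", collect_list($"value") as "values")

// window function: rank rows within each name by value
val byName = Window.partitionBy("name").orderBy($"value".desc)
val ranked = exploded.withColumn("rank", rank().over(byName))
----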
=== [[callUDF]] Executing UDF by Name and Variable-Length Column List -- callUDF Function

[source, scala]
----
callUDF(udfName: String, cols: Column*): Column
----

callUDF executes a UDF by its name (udfName) and a variable-length list of columns (cols).
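
For example (a minimal sketch; the UDF name lengthUDF and the sample data are made up for illustration, and a SparkSession available as spark with spark.implicits._ in scope is assumed):

[source, scala]
----
import org.apache.spark.sql.functions._
import spark.implicits._

// register a UDF under a name first
spark.udf.register("lengthUDF", (s: String) => s.length)

// then execute it by that name, passing the columns it should be applied to
val words = Seq("hello", "world").toDF("word")
val withLen = words.withColumn("len", callUDF("lengthUDF", $"word"))
----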
=== [[udf]] Defining UDFs -- udf Function

[source, scala]
----
udf(f: FunctionN[...]): UserDefinedFunction
----

The udf family of functions allows you to create spark-sql-udfs.md[user-defined functions (UDFs)] based on a user-defined function in Scala. It accepts a function f of 0 to 10 arguments, and the input and output types are automatically inferred from the types of the function f.

[source, scala]
----
import org.apache.spark.sql.functions._
val _length: String => Int = _.length
val _lengthUDF = udf(_length)

// define a dataframe
val df = sc.parallelize(0 to 3).toDF("num")

// apply the user-defined function to "num" column
scala> df.withColumn("len", _lengthUDF($"num")).show
+---+---+
|num|len|
+---+---+
|  0|  1|
|  1|  1|
|  2|  1|
|  3|  1|
+---+---+
----
Since Spark 2.0.0, there is another variant of the udf function:

[source, scala]
----
udf(f: AnyRef, dataType: DataType): UserDefinedFunction
----

udf(f: AnyRef, dataType: DataType) allows you to use a Scala closure for the function argument (as f) and explicitly declare the output data type (as dataType).

[source, scala]
----
// given the dataframe above

import org.apache.spark.sql.types.IntegerType
val byTwo = udf((n: Int) => n * 2, IntegerType)

scala> df.withColumn("len", byTwo($"num")).show
+---+---+
|num|len|
+---+---+
|  0|  0|
|  1|  2|
|  2|  4|
|  3|  6|
+---+---+
----
=== [[split]] split Function

[source, scala]
----
split(str: Column, pattern: String): Column
----

The split function splits the str column using the pattern regular expression. It returns a new Column.

NOTE: split uses the https://docs.oracle.com/javase/8/docs/api/java/lang/String.html#split-java.lang.String-int-[java.lang.String.split(String regex, int limit)] method.

[source, scala]
----
val df = Seq((0, "hello|world"), (1, "witaj|swiecie")).toDF("num", "input")
val withSplit = df.withColumn("split", split($"input", "[|]"))

scala> withSplit.show
+---+-------------+----------------+
|num|        input|           split|
+---+-------------+----------------+
|  0|  hello|world|  [hello, world]|
|  1|witaj|swiecie|[witaj, swiecie]|
+---+-------------+----------------+
----

NOTE: .$|()[{^?*+\ are RegEx's meta characters and are considered special.
=== [[upper]] upper Function

[source, scala]
----
upper(e: Column): Column
----

The upper function converts a string column into one with all letters in upper case. It returns a new Column.

NOTE: The following example uses two functions that accept a Column and return another to showcase how to chain them.

[source, scala]
----
val df = Seq((0,1,"hello"), (2,3,"world"), (2,4, "ala")).toDF("id", "val", "name")
val withUpperReversed = df.withColumn("upper", reverse(upper($"name")))

scala> withUpperReversed.show
+---+---+-----+-----+
| id|val| name|upper|
+---+---+-----+-----+
|  0|  1|hello|OLLEH|
|  2|  3|world|DLROW|
|  2|  4|  ala|  ALA|
+---+---+-----+-----+
----
=== [[bin]] Converting Long to Binary Format (in String Representation) -- bin Function

[source, scala]
----
bin(e: Column): Column
bin(columnName: String): Column // <1>
----
<1> Calls the first bin with columnName as a Column

bin converts the long value in a column to its binary format (i.e. as an unsigned integer in base 2) with no extra leading 0s.

[source, scala]
----
scala> spark.range(5).withColumn("binary", bin('id)).show
+---+------+
| id|binary|
+---+------+
|  0|     0|
|  1|     1|
|  2|    10|
|  3|    11|
|  4|   100|
+---+------+

val withBin = spark.range(5).withColumn("binary", bin('id))

scala> withBin.printSchema
root
 |-- id: long (nullable = false)
 |-- binary: string (nullable = false)
----

Internally, bin creates a spark-sql-Column.md[Column] with Bin unary expression.

[source, scala]
----
scala> withBin.queryExecution.logical
res2: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
'Project [*, bin('id) AS binary#14]
+- Range (0, 5, step=1, splits=Some(8))
----

NOTE: Bin unary expression uses ++https://docs.oracle.com/javase/8/docs/api/java/lang/Long.html#toBinaryString-long-++[java.lang.Long.toBinaryString] for the conversion.
[NOTE]
====
Bin expression supports expressions/Expression.md#doGenCode[code generation] (aka CodeGen).

[source, scala]
----
val withBin = spark.range(5).withColumn("binary", bin('id))

scala> withBin.queryExecution.debug.codegen
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 ==
*Project [id#19L, bin(id#19L) AS binary#22]
+- *Range (0, 5, step=1, splits=Some(8))
...
/* 103 */ UTF8String project_value1 = null;
/* 104 */ project_value1 = UTF8String.fromString(java.lang.Long.toBinaryString(range_value));
----
====