
UDFs are Blackbox -- Don't Use Them Unless You've Got No Choice

Let's review an example with a UDF. The example works with city names of exactly 6 characters and uses the standard Dataset operators first and then a custom UDF to do the same transformation.

[source, scala]
----
scala> spark.conf.get("spark.sql.parquet.filterPushdown")
res0: String = true
----
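`spark.sql.parquet.filterPushdown` is enabled by default; if it happens to be disabled in your environment, you can switch it back on for the current session, e.g.:

[source, scala]
----
// Re-enable Parquet filter pushdown for this SparkSession (it defaults to true)
spark.conf.set("spark.sql.parquet.filterPushdown", true)
----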


You are going to use the following cities dataset that is based on a Parquet file (as used in the PushDownPredicate.md#parquet[Predicate Pushdown / Filter Pushdown for Parquet Data Source] section). The reason for Parquet is that it is an external data source that supports optimizations Spark can apply on its own, like predicate pushdown.
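The `cities` dataset and the `City` case class are used below but not defined in this section. A minimal setup sketch for spark-shell (the schema, sample rows, and the local `cities.parquet` path are assumptions based on the query plans that follow):

[source, scala]
----
// Assumed setup for the cities dataset used in the examples below
case class City(id: Long, name: String)

import spark.implicits._

// Write a small Parquet file so that Parquet acts as the external data source
Seq(City(0, "Warsaw"), City(1, "Toronto"), City(2, "Kyiv")).toDS
  .write.mode("overwrite").parquet("cities.parquet")

// Read it back as a strongly-typed Dataset[City]
val cities = spark.read.parquet("cities.parquet").as[City]
----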

[source, scala]
----
// no optimization as it is a more involved Scala function in filter
// 08/30 Asked on dev@spark mailing list for explanation
val cities6chars = cities.filter(_.name.length == 6).map(_.name.toUpperCase)

cities6chars.explain(true)

// or simpler when only concerned with PushedFilters attribute in Parquet
scala> cities6chars.queryExecution.optimizedPlan
res33: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#248]
+- MapElements <function1>, class City, [StructField(id,LongType,false), StructField(name,StringType,true)], obj#247: java.lang.String
   +- Filter <function1>.apply
      +- DeserializeToObject newInstance(class City), obj#246: City
         +- Relation[id#236L,name#237] parquet

// no optimization for Dataset[City]?!
// 08/30 Asked on dev@spark mailing list for explanation
val cities6chars = cities.filter(_.name == "Warsaw").map(_.name.toUpperCase)

cities6chars.explain(true)

// The filter predicate is pushed down fine for Dataset's Column-based query in where operator
scala> cities.where('name === "Warsaw").queryExecution.executedPlan
res29: org.apache.spark.sql.execution.SparkPlan =
*Project [id#128L, name#129]
+- *Filter (isnotnull(name#129) && (name#129 = Warsaw))
   +- *FileScan parquet [id#128L,name#129] Batched: true, Format: ParquetFormat, InputPaths: file:/Users/jacek/dev/oss/spark/cities.parquet, PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,Warsaw)], ReadSchema: struct<id:bigint,name:string>

// Let's define a UDF to do the filtering
import org.apache.spark.sql.functions.udf
val isWarsaw = udf { (s: String) => s == "Warsaw" }

// Use the UDF in where (replacing the Column-based query)
scala> cities.where(isWarsaw('name)).queryExecution.executedPlan
res33: org.apache.spark.sql.execution.SparkPlan =
*Filter UDF(name#129)
+- *FileScan parquet [id#128L,name#129] Batched: true, Format: ParquetFormat, InputPaths: file:/Users/jacek/dev/oss/spark/cities.parquet, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint,name:string>
----
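If a UDF really cannot be avoided, one way to limit the damage is to combine it with an ordinary Column-based predicate: Catalyst pushes the translatable conjuncts down to the data source and only applies the blackbox UDF to the rows that survive the scan. A minimal sketch, reusing the `isWarsaw` UDF above (the exact plan output depends on your Spark version):

[source, scala]
----
// The Column-based predicate can be pushed down (EqualTo(name,Warsaw)),
// while the UDF remains a plain post-scan Filter
cities
  .where('name === "Warsaw")
  .where(isWarsaw('name))
  .explain()
----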

