UDFs are Blackbox -- Don't Use Them Unless You've Got No Choice
Let's review an example with a UDF. The example converts city names that are exactly 6 characters long to upper case, using the standard Dataset operators first and then a custom UDF to do the same transformation.
[source, scala]
----
scala> spark.conf.get("spark.sql.parquet.filterPushdown")
res0: String = true
----
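The spark.sql.parquet.filterPushdown property is enabled by default. If you want to see what difference the optimization makes, you can toggle it per session and re-run the queries below (a sketch using the standard runtime configuration API):

[source, scala]
----
// Disable Parquet filter pushdown for the current session (to compare plans)...
spark.conf.set("spark.sql.parquet.filterPushdown", false)

// ...and re-enable it afterwards
spark.conf.set("spark.sql.parquet.filterPushdown", true)
----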
You are going to use the following cities dataset that is based on a Parquet file (as used in the PushDownPredicate.md#parquet[Predicate Pushdown / Filter Pushdown for Parquet Data Source] section). The reason for Parquet is that it is an external data source that supports the optimizations Spark applies, such as predicate pushdown.
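The dataset definition itself is not shown here. A minimal sketch of how such a Parquet-backed Dataset[City] could be created follows (the City case class and the cities.parquet path are assumptions inferred from the query plans below):

[source, scala]
----
// Schema matching the plans below: id is a bigint (Long), name is a string
case class City(id: Long, name: String)

// Read the Parquet file and convert it to a strongly-typed Dataset[City]
import spark.implicits._
val cities = spark.read.parquet("cities.parquet").as[City]
----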
[source, scala]
----
// no optimization as it is a more involved Scala function in filter
// 08/30 Asked on dev@spark mailing list for explanation
val cities6chars = cities.filter(_.name.length == 6).map(_.name.toUpperCase)
cities6chars.explain(true)
// or simpler when only concerned with PushedFilters attribute in Parquet
scala> cities6chars.queryExecution.optimizedPlan
res33: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#248]
+- MapElements
// no optimization for Dataset[City]?!
// 08/30 Asked on dev@spark mailing list for explanation
val cities6chars = cities.filter(_.name == "Warsaw").map(_.name.toUpperCase)
cities6chars.explain(true)
// The filter predicate is pushed down fine for Dataset's Column-based query in where operator
scala> cities.where('name === "Warsaw").queryExecution.executedPlan
res29: org.apache.spark.sql.execution.SparkPlan =
*Project [id#128L, name#129]
+- *Filter (isnotnull(name#129) && (name#129 = Warsaw))
   +- *FileScan parquet [id#128L,name#129] Batched: true, Format: ParquetFormat, InputPaths: file:/Users/jacek/dev/oss/spark/cities.parquet, PartitionFilters: [], PushedFilters: [IsNotNull(name), EqualTo(name,Warsaw)], ReadSchema: struct
// Let's define a UDF to do the filtering
val isWarsaw = udf { (s: String) => s == "Warsaw" }
// Use the UDF in where (replacing the Column-based query)
scala> cities.where(isWarsaw('name)).queryExecution.executedPlan
res33: org.apache.spark.sql.execution.SparkPlan =
*Filter UDF(name#129)
+- *FileScan parquet [id#128L,name#129] Batched: true, Format: ParquetFormat, InputPaths: file:/Users/jacek/dev/oss/spark/cities.parquet, PartitionFilters: [], PushedFilters: [], ReadSchema: struct
----
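Note how PushedFilters is empty for the UDF-based query: the isWarsaw UDF is a blackbox to the Optimizer, so the filtering happens in Spark only after all rows have been read from Parquet. Whenever possible, express the logic with built-in Column expressions instead. A sketch of a Column-based equivalent of the 6-character filter and uppercase map follows (using the standard functions length and upper; column names are taken from the plans above):

[source, scala]
----
import org.apache.spark.sql.functions.{length, upper}

// Column-based equivalent of filter(_.name.length == 6).map(_.name.toUpperCase).
// Unlike the opaque Scala closures and the UDF above, Catalyst can analyze these
// expressions and optimize the whole plan (no deserialization into City objects);
// simple predicates like 'name === "Warsaw" can additionally be pushed down to Parquet.
val cities6charsCols = cities
  .where(length('name) === 6)
  .select(upper('name) as "name")

cities6charsCols.explain(true)
----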