AnalyzeColumnCommand Logical Command

AnalyzeColumnCommand is a logical command for the ANALYZE TABLE ... COMPUTE STATISTICS FOR COLUMNS SQL statement.

AnalyzeColumnCommand is not supported on views.
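For example (a minimal sketch; demo_view is a hypothetical name and the exact exception message may vary across Spark versions), analyzing a view raises an AnalysisException:

// Assumes the t1 table from the Demo below
spark.sql("CREATE OR REPLACE VIEW demo_view AS SELECT * FROM t1")
scala> spark.sql("ANALYZE TABLE demo_view COMPUTE STATISTICS FOR COLUMNS id")
org.apache.spark.sql.AnalysisException: ANALYZE TABLE is not supported on views.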

Creating Instance

AnalyzeColumnCommand takes the following to be created:

  • TableIdentifier
  • Column Names (optional)
  • allColumns Flag

AnalyzeColumnCommand is created when the ResolveSessionCatalog logical resolution rule is executed (to resolve an AnalyzeColumnStatement).
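Since AnalyzeColumnCommand is a case class, it can also be instantiated directly, e.g. for exploration in spark-shell (a sketch only; at runtime Spark creates it through the resolution rule above). Note that columnNames and the allColumns flag are mutually exclusive:

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.AnalyzeColumnCommand

// Analyze just the given columns (allColumns = false), or
// pass columnNames = None with allColumns = true to analyze them all
val cmd = AnalyzeColumnCommand(
  tableIdent = TableIdentifier("t1"),
  columnNames = Some(Seq("id", "p1")),
  allColumns = false)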

Executing Logical Command

run(
  sparkSession: SparkSession): Seq[Row]

run calculates the following statistics:

  • sizeInBytes (the total table size)
  • rowCount
  • ColumnStat for every column to analyze

run is part of RunnableCommand abstraction.
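Being part of RunnableCommand, run can also be invoked directly in spark-shell; the computed statistics are persisted to the session catalog as a side effect (a sketch that assumes the t1 table from the Demo below and the cmd value constructed above):

// Assumes the t1 table exists and cmd is the AnalyzeColumnCommand from above
cmd.run(spark)  // returns an empty Seq[Row]; the stats are saved to the catalog

val stats = spark.sessionState.catalog
  .getTableMetadata(org.apache.spark.sql.catalyst.TableIdentifier("t1")).stats
// stats now carry sizeInBytes, rowCount and the column statistics for id and p1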

Computing Statistics for Specified Columns

computeColumnStats(
  sparkSession: SparkSession,
  tableIdent: TableIdentifier,
  columnNames: Seq[String]): (Long, Map[String, ColumnStat])

computeColumnStats...FIXME
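What gets collected per column can be approximated with a single aggregation over the table (a conceptual sketch only; internally Spark builds one Aggregate plan with dedicated statistics expressions per column rather than issuing this SQL):

// Conceptual single-pass equivalent for one column (id) of the t1 table
spark.sql("""
  SELECT
    count(1)                  AS row_count,
    approx_count_distinct(id) AS distinct_count,
    count(1) - count(id)      AS num_nulls,
    min(id)                   AS min,
    max(id)                   AS max
  FROM t1
""").show(truncate = false)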

Computing Percentiles

computePercentiles(
  attributesToAnalyze: Seq[Attribute],
  sparkSession: SparkSession,
  relation: LogicalPlan): AttributeMap[ArrayData]

computePercentiles...FIXME
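Equi-height histograms need the bin boundaries, i.e. the percentiles at 0, 1/numBins, 2/numBins, ..., 1. A conceptual equivalent uses the percentile_approx SQL function (an illustration only; the internals use the ApproximatePercentile aggregate directly):

// Bin boundaries for an equi-height histogram over the p1 column of t1
val numBins = spark.conf.get("spark.sql.statistics.histogram.numBins").toInt
val percentages = (0 to numBins).map(_.toDouble / numBins)
spark.sql(
  s"SELECT percentile_approx(p1, array(${percentages.mkString(",")})) AS bin_boundaries FROM t1"
).show(truncate = false)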

Demo: Column Histograms

AnalyzeColumnCommand can generate column histograms when the spark.sql.statistics.histogram.enabled configuration property is enabled. Column histograms are supported for the following data types:

  • IntegralType
  • DecimalType
  • DoubleType
  • FloatType
  • DateType
  • TimestampType

// ./bin/spark-shell --conf spark.sql.statistics.histogram.enabled=true
// Use the second Demo (below) to create the t1 table and the tableName variable
// Make sure that ANALYZE TABLE ... COMPUTE STATISTICS FOR COLUMNS ran with histograms enabled

// There are 254 bins by default
// Use spark.sql.statistics.histogram.numBins to control the bins
val descExtSQL = s"DESC EXTENDED $tableName p1"
scala> spark.sql(descExtSQL).show(truncate = false)
+--------------+-----------------------------------------------------+
|info_name     |info_value                                           |
+--------------+-----------------------------------------------------+
|col_name      |p1                                                   |
|data_type     |double                                               |
|comment       |NULL                                                 |
|min           |0.0                                                  |
|max           |1.4                                                  |
|num_nulls     |0                                                    |
|distinct_count|2                                                    |
|avg_col_len   |8                                                    |
|max_col_len   |8                                                    |
|histogram     |height: 0.007874015748031496, num_of_bins: 254       |
|bin_0         |lower_bound: 0.0, upper_bound: 0.0, distinct_count: 1|
|bin_1         |lower_bound: 0.0, upper_bound: 0.0, distinct_count: 1|
|bin_2         |lower_bound: 0.0, upper_bound: 0.0, distinct_count: 1|
|bin_3         |lower_bound: 0.0, upper_bound: 0.0, distinct_count: 1|
|bin_4         |lower_bound: 0.0, upper_bound: 0.0, distinct_count: 1|
|bin_5         |lower_bound: 0.0, upper_bound: 0.0, distinct_count: 1|
|bin_6         |lower_bound: 0.0, upper_bound: 0.0, distinct_count: 1|
|bin_7         |lower_bound: 0.0, upper_bound: 0.0, distinct_count: 1|
|bin_8         |lower_bound: 0.0, upper_bound: 0.0, distinct_count: 1|
|bin_9         |lower_bound: 0.0, upper_bound: 0.0, distinct_count: 1|
+--------------+-----------------------------------------------------+
only showing top 20 rows

Demo: Computing Column Statistics

// Make the example reproducible
val tableName = "t1"
import org.apache.spark.sql.catalyst.TableIdentifier
val tableId = TableIdentifier(tableName)

val sessionCatalog = spark.sessionState.catalog
sessionCatalog.dropTable(tableId, ignoreIfNotExists = true, purge = true)

// toDF requires spark.implicits._ (imported by default in spark-shell)
import spark.implicits._
val df = Seq((0, 0.0, "zero"), (1, 1.4, "one")).toDF("id", "p1", "p2")
df.write.saveAsTable(tableName)

// AnalyzeColumnCommand represents ANALYZE TABLE...FOR COLUMNS SQL command
val allCols = df.columns.mkString(",")
val analyzeTableSQL = s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS $allCols"
// The parser produces an AnalyzeColumnStatement; ResolveSessionCatalog turns it
// into an AnalyzeColumnCommand during analysis, hence queryExecution.analyzed
val plan = spark.sql(analyzeTableSQL).queryExecution.analyzed
import org.apache.spark.sql.execution.command.AnalyzeColumnCommand
val cmd = plan.asInstanceOf[AnalyzeColumnCommand]
scala> println(cmd)
AnalyzeColumnCommand `t1`, [id, p1, p2]

spark.sql(analyzeTableSQL)
val stats = sessionCatalog.getTableMetadata(tableId).stats.get
scala> println(stats.simpleString)
1421 bytes, 2 rows

scala> stats.colStats.map { case (c, ss) => s"$c: $ss" }.foreach(println)
id: ColumnStat(2,Some(0),Some(1),0,4,4,None)
p1: ColumnStat(2,Some(0.0),Some(1.4),0,8,8,None)
p2: ColumnStat(2,None,None,0,4,4,None)

// Use DESC EXTENDED for friendlier output
scala> sql(s"DESC EXTENDED $tableName id").show
+--------------+----------+
|     info_name|info_value|
+--------------+----------+
|      col_name|        id|
|     data_type|       int|
|       comment|      NULL|
|           min|         0|
|           max|         1|
|     num_nulls|         0|
|distinct_count|         2|
|   avg_col_len|         4|
|   max_col_len|         4|
|     histogram|      NULL|
+--------------+----------+
