== Dataset API — Basic Actions

Basic actions are a set of operators (methods) of the Dataset API for transforming a Dataset into a session-scoped or global temporary view and other basic actions (FIXME).

NOTE: Basic actions are the methods in the Dataset Scala class that are grouped under the basic group name, i.e. @group basic.

[[methods]]
.Dataset API's Basic Actions
[cols="1,3",options="header",width="100%"]
|===
| Action
| Description

| cache a| [[cache]]

[source, scala]
----
cache(): this.type
----

Marks the Dataset to be persisted (cached) and is actually a synonym of the <<persist, persist>> basic action

| <<checkpoint, checkpoint>> a|

[source, scala]
----
checkpoint(): Dataset[T]
checkpoint(eager: Boolean): Dataset[T]
----

Checkpoints the Dataset in a reliable way (using a reliable HDFS-compliant file system, e.g. Hadoop HDFS or Amazon S3)

| columns a|

[source, scala]
----
columns: Array[String]
----

| <<createGlobalTempView, createGlobalTempView>> a|

[source, scala]
----
createGlobalTempView(viewName: String): Unit
----

| <<createOrReplaceGlobalTempView, createOrReplaceGlobalTempView>> a|

[source, scala]
----
createOrReplaceGlobalTempView(viewName: String): Unit
----

| <<createOrReplaceTempView, createOrReplaceTempView>> a|

[source, scala]
----
createOrReplaceTempView(viewName: String): Unit
----

| <<createTempView, createTempView>> a|

[source, scala]
----
createTempView(viewName: String): Unit
----

| dtypes a|

[source, scala]
----
dtypes: Array[(String, String)]
----

| <<explain, explain>> a|

[source, scala]
----
explain(): Unit
explain(extended: Boolean): Unit
----

Displays the logical and physical plans of the Dataset (with optional cost and codegen summaries) to the standard output

| <<hint, hint>> a|

[source, scala]
----
hint(name: String, parameters: Any*): Dataset[T]
----

| inputFiles a|

[source, scala]
----
inputFiles: Array[String]
----

| <<isEmpty, isEmpty>> a|

[source, scala]
----
isEmpty: Boolean
----

(New in 2.4.0)

| <<isLocal, isLocal>> a|

[source, scala]
----
isLocal: Boolean
----

| <<localCheckpoint, localCheckpoint>> a|

[source, scala]
----
localCheckpoint(): Dataset[T]
localCheckpoint(eager: Boolean): Dataset[T]
----

Checkpoints the Dataset locally on executors (and therefore unreliably)

| persist a| [[persist]]

[source, scala]
----
persist(): this.type // <1>
persist(newLevel: StorageLevel): this.type
----
<1> Assumes the default storage level MEMORY_AND_DISK

Marks the Dataset to be persisted (cached) the next time an action is executed

Internally, persist simply requests the CacheManager to cache the structured query.

NOTE: persist uses the CacheManager from the SharedState associated with the SparkSession (of the Dataset).

| printSchema a|

[source, scala]
----
printSchema(): Unit
----

| <<rdd, rdd>> a|

[source, scala]
----
rdd: RDD[T]
----

| <<schema, schema>> a|

[source, scala]
----
schema: StructType
----

| storageLevel a|

[source, scala]
----
storageLevel: StorageLevel
----

| <<toDF, toDF>> a|

[source, scala]
----
toDF(): DataFrame
toDF(colNames: String*): DataFrame
----


| <<unpersist, unpersist>> a|

[source, scala]
----
unpersist(): this.type
unpersist(blocking: Boolean): this.type
----

Unpersists the Dataset

| <<write, write>> a|

[source, scala]
----
write: DataFrameWriter[T]
----

Returns a DataFrameWriter for saving the content of the (non-streaming) Dataset out to an external storage
|===

=== [[checkpoint]] Reliably Checkpointing Dataset -- checkpoint Basic Action

[source, scala]
----
checkpoint(): Dataset[T] // <1>
checkpoint(eager: Boolean): Dataset[T] // <2>
----
<1> eager and reliableCheckpoint flags enabled
<2> reliableCheckpoint flag enabled

NOTE: checkpoint is an experimental operator and the API is evolving towards becoming stable.

checkpoint simply requests the Dataset to <<checkpoint-internal, checkpoint>> with the given eager flag and the reliableCheckpoint flag enabled.
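
A quick usage sketch (the checkpoint directory below is illustrative; reliable checkpointing fails unless one has been set):

[source, scala]
----
// Reliable checkpointing requires a checkpoint directory
// (the path is just an example)
spark.sparkContext.setCheckpointDir("/tmp/checkpoints")

val nums = spark.range(5)
val checkpointed = nums.checkpoint() // eager by default, so runs immediately
----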

=== [[createTempView]] createTempView Basic Action

[source, scala]
----
createTempView(viewName: String): Unit
----

createTempView...FIXME

NOTE: createTempView is used when...FIXME
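
A usage sketch (the view name is illustrative); createTempView throws an AnalysisException if the view name is already taken:

[source, scala]
----
// Register the Dataset as a session-scoped temporary view
spark.range(3).createTempView("ids")

// The view is queryable with SQL within the same SparkSession
spark.sql("SELECT * FROM ids").show()
----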

=== [[createOrReplaceTempView]] createOrReplaceTempView Basic Action

[source, scala]
----
createOrReplaceTempView(viewName: String): Unit
----

createOrReplaceTempView...FIXME

NOTE: createOrReplaceTempView is used when...FIXME
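
A sketch contrasting it with <<createTempView, createTempView>> (the view name is illustrative):

[source, scala]
----
spark.range(3).createOrReplaceTempView("ids")

// Replacing an existing view does not throw an exception,
// whereas createTempView would fail with the name already taken
spark.range(5).createOrReplaceTempView("ids")
----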

=== [[createGlobalTempView]] createGlobalTempView Basic Action

[source, scala]
----
createGlobalTempView(viewName: String): Unit
----

createGlobalTempView...FIXME

NOTE: createGlobalTempView is used when...FIXME
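
A usage sketch (the view name is illustrative); a global temporary view is registered in the global_temp database and is shared across the SparkSessions of a Spark application:

[source, scala]
----
spark.range(3).createGlobalTempView("gids")

// Global temporary views live in the global_temp database...
spark.sql("SELECT * FROM global_temp.gids").show()

// ...and are visible from other sessions of the same application
spark.newSession().sql("SELECT * FROM global_temp.gids").show()
----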

=== [[createOrReplaceGlobalTempView]] createOrReplaceGlobalTempView Basic Action

[source, scala]
----
createOrReplaceGlobalTempView(viewName: String): Unit
----

createOrReplaceGlobalTempView...FIXME

NOTE: createOrReplaceGlobalTempView is used when...FIXME

=== [[createTempViewCommand]] createTempViewCommand Internal Method

[source, scala]
----
createTempViewCommand(
  viewName: String,
  replace: Boolean,
  global: Boolean): CreateViewCommand
----


createTempViewCommand...FIXME

NOTE: createTempViewCommand is used when the following Dataset operators are used: <<createTempView, createTempView>>, <<createOrReplaceTempView, createOrReplaceTempView>>, <<createGlobalTempView, createGlobalTempView>> and <<createOrReplaceGlobalTempView, createOrReplaceGlobalTempView>>.

=== [[explain]] Displaying Logical and Physical Plans, Their Cost and Codegen -- explain Basic Action

[source, scala]
----
explain(): Unit // <1>
explain(extended: Boolean): Unit
----
<1> Turns the extended flag off

explain prints the SparkPlan.md[physical] plan and, with the extended flag enabled, the spark-sql-LogicalPlan.md[logical] plans, their cost and codegen to the console.

TIP: Use explain to review the structured queries and optimizations applied.

Internally, explain creates an ExplainCommand.md[ExplainCommand] logical command and requests SessionState to SessionState.md#executePlan[execute it] (to get a QueryExecution back).

NOTE: explain uses the ExplainCommand.md[ExplainCommand] logical command that, when ExplainCommand.md#run[executed], gives different text representations of QueryExecution (for the Dataset's spark-sql-LogicalPlan.md[LogicalPlan]) depending on the flags (e.g. extended, codegen, and cost which are disabled by default).

explain then requests QueryExecution for the optimized physical query plan and collects the records (as InternalRow objects).

[NOTE]
====
explain uses the Dataset's Dataset.md#sparkSession[SparkSession] to SparkSession.md#sessionState[access the current SessionState].
====

In the end, explain goes over the InternalRow records and converts them to lines to display to the console.

NOTE: explain "converts" an InternalRow record to a line using getString at position 0.

TIP: If you are serious about query debugging you could also use the spark-sql-debugging-query-execution.md[Debugging Query Execution facility].

[source, scala]
----
scala> spark.range(10).explain(extended = true)
== Parsed Logical Plan ==
Range (0, 10, step=1, splits=Some(8))

== Analyzed Logical Plan ==
id: bigint
Range (0, 10, step=1, splits=Some(8))

== Optimized Logical Plan ==
Range (0, 10, step=1, splits=Some(8))

== Physical Plan ==
*Range (0, 10, step=1, splits=Some(8))
----


=== [[hint]] Specifying Hint -- hint Basic Action

[source, scala]
----
hint(
  name: String,
  parameters: Any*): Dataset[T]
----

hint operator is part of Hint Framework to specify a hint (by name and parameters) for a Dataset.

Internally, hint simply attaches UnresolvedHint.md[UnresolvedHint] unary logical operator to an "analyzed" Dataset (i.e. the Dataset.md#logicalPlan[analyzed logical plan] of a Dataset).

[source, scala]
----
val ds = spark.range(3)
val plan = ds.queryExecution.logical
scala> println(plan.numberedTreeString)
00 Range (0, 3, step=1, splits=Some(8))

// Attach a hint
val dsHinted = ds.hint("myHint", 100, true)
val plan = dsHinted.queryExecution.logical
scala> println(plan.numberedTreeString)
00 'UnresolvedHint myHint, [100, true]
01 +- Range (0, 3, step=1, splits=Some(8))
----

NOTE: hint adds an UnresolvedHint.md[UnresolvedHint] unary logical operator to an analyzed logical plan that indirectly triggers the analysis phase that executes logical commands and their unions as well as resolves all hints that have already been added to a logical plan.

=== [[localCheckpoint]] Locally Checkpointing Dataset -- localCheckpoint Basic Action

[source, scala]
----
localCheckpoint(): Dataset[T] // <1>
localCheckpoint(eager: Boolean): Dataset[T]
----
<1> eager flag enabled

localCheckpoint simply uses the <<checkpoint-internal, checkpoint>> internal method with the input eager flag and the reliableCheckpoint flag disabled (false).
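
A minimal sketch; unlike <<checkpoint, checkpoint>>, localCheckpoint requires no checkpoint directory since the data is stored with the executors:

[source, scala]
----
// No setCheckpointDir needed for local checkpointing
val ds = spark.range(10).localCheckpoint() // eager by default
----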

=== [[checkpoint-internal]] checkpoint Internal Method

[source, scala]
----
checkpoint(eager: Boolean, reliableCheckpoint: Boolean): Dataset[T]
----

checkpoint requests Dataset.md#queryExecution[QueryExecution] (of the Dataset) to generate an RDD of internal binary rows (internalRdd) and then requests the RDD to make a copy of all the rows (by adding a MapPartitionsRDD).

Depending on reliableCheckpoint flag, checkpoint marks the RDD for (reliable) checkpointing (true) or local checkpointing (false).

With eager flag on, checkpoint counts the number of records in the RDD (by executing RDD.count) that gives the effect of immediate eager checkpointing.

checkpoint requests Dataset.md#queryExecution[QueryExecution] (of the Dataset) for optimized physical query plan (the plan is used to get the SparkPlan.md#outputPartitioning[outputPartitioning] and SparkPlan.md#outputOrdering[outputOrdering] for the result Dataset).

In the end, checkpoint Dataset.md#ofRows[creates a DataFrame] with a new LogicalRDD.md#creating-instance[logical plan node for scanning data from an RDD of InternalRows] (LogicalRDD).

NOTE: checkpoint is used in the Dataset basic actions, i.e. <<checkpoint, checkpoint>> and <<localCheckpoint, localCheckpoint>>.

=== [[rdd]] Converting Dataset to RDD -- rdd Basic Action

[source, scala]
----
rdd: RDD[T]
----

Whenever you need to convert a Dataset into an RDD, the rdd method gives you the RDD of the proper input object type (not Row as in DataFrames) that sits behind the Dataset.

[source, scala]
----
scala> val rdd = tokens.rdd
rdd: org.apache.spark.rdd.RDD[Token] = MapPartitionsRDD[11] at rdd at <console>:30
----


Internally, it looks up the ExpressionEncoder (for the Dataset) and accesses the deserializer expression. That gives the DataType of the result of evaluating the expression.

NOTE: A deserializer expression is used to decode an InternalRow to an object of type T. See ExpressionEncoder.

It then executes a DeserializeToObject.md[DeserializeToObject logical operator] that produces an RDD[InternalRow] that is converted into the proper RDD[T] using the DataType and T.

NOTE: It is a lazy operation that "produces" an RDD[T].

=== [[schema]] Accessing Schema -- schema Basic Action

A Dataset has a schema.

[source, scala]
----
schema: StructType
----

[TIP]
====
You may also use the following methods to learn about the schema:

* printSchema(): Unit
* <<explain, explain>>
====
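
A short sketch of inspecting a schema:

[source, scala]
----
import org.apache.spark.sql.types.StructType

val schema: StructType = spark.range(1).schema
println(schema.treeString)
// root
//  |-- id: long (nullable = false)
----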

=== [[toDF]] Converting Typed Dataset to Untyped DataFrame -- toDF Basic Action

[source, scala]
----
toDF(): DataFrame
toDF(colNames: String*): DataFrame
----


toDF converts a Dataset.md[Dataset] into a DataFrame.

Internally, the empty-argument toDF creates a Dataset[Row] using the Dataset's SparkSession.md[SparkSession] and QueryExecution with the encoder being RowEncoder.

CAUTION: FIXME Describe toDF(colNames: String*)
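
Until then, a sketch of the colNames variant: it renames all columns of the resulting DataFrame and expects exactly as many names as there are columns.

[source, scala]
----
// Rename the single column (id) while converting to a DataFrame
val df = spark.range(3).toDF("n")
df.printSchema()
// root
//  |-- n: long (nullable = false)
----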

=== [[unpersist]] Unpersisting Cached Dataset -- unpersist Basic Action

[source, scala]
----
unpersist(): this.type
unpersist(blocking: Boolean): this.type
----


unpersist uncaches the Dataset, optionally blocking until all its blocks are removed.

Internally, unpersist requests CacheManager to uncache the query.

CAUTION: FIXME
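
A usage sketch pairing unpersist with <<cache, cache>>:

[source, scala]
----
val ds = spark.range(10).cache()
ds.count() // an action materializes the cache

ds.unpersist(blocking = true) // block until all cached blocks are removed
----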

=== [[write]] Accessing DataFrameWriter (to Describe Writing Dataset) -- write Basic Action

[source, scala]
----
write: DataFrameWriter[T]
----

write gives a DataFrameWriter for records of type T.

[source, scala]
----
import org.apache.spark.sql.{DataFrameWriter, Dataset}
val ints: Dataset[Int] = (0 to 5).toDS
val writer: DataFrameWriter[Int] = ints.write
----

=== [[isEmpty]] isEmpty Basic Action

[source, scala]
----
isEmpty: Boolean
----

isEmpty...FIXME
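
A sketch of what isEmpty (available since 2.4.0) reports:

[source, scala]
----
scala> spark.range(0).isEmpty
res0: Boolean = true

scala> spark.range(1).isEmpty
res1: Boolean = false
----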

=== [[isLocal]] isLocal Basic Action

[source, scala]
----
isLocal: Boolean
----

isLocal...FIXME
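
A sketch (assuming spark-shell with implicits in scope): isLocal is true when collect and take can run locally without executors, e.g. for a Dataset created from a local collection:

[source, scala]
----
scala> Seq(1, 2, 3).toDS.isLocal
res0: Boolean = true

scala> spark.range(3).isLocal
res1: Boolean = false
----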

