# Dataset
Dataset[T] is a strongly-typed data structure that represents a structured query over rows of type T.

A Dataset is created using SQL or the Dataset high-level declarative "languages".

The following figure shows the relationship of the low-level entities of Spark SQL that together build up the Dataset data structure.

It is fair to say that Dataset is a Spark SQL developer-friendly layer over the following three low-level entities:
- QueryExecution (with the parsed yet unanalyzed LogicalPlan of a structured query)
- Encoder (of the type of the records for fast serialization and deserialization to and from InternalRow)
- SparkSession
## Creating Instance
Dataset takes the following when created:

- SparkSession
- QueryExecution
- Encoder (of type T)
Note

Dataset can also be created with a LogicalPlan that is immediately executed using the SessionState (to produce the QueryExecution).
When created, Dataset requests the QueryExecution to assert that the analysis phase was successful.
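Because analysis happens eagerly, referencing a non-existent column fails as soon as the Dataset is defined, not when an action runs. A minimal spark-shell sketch (the column name is made up):

```scala
val df = spark.range(1).toDF("id")

// Throws org.apache.spark.sql.AnalysisException right here,
// before any action is executed
df.select("no_such_column")
```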
Dataset is created when (a quick user-facing example follows the list):
- Dataset.apply (for a LogicalPlan and a SparkSession with the Encoder in a Scala implicit scope)
- Dataset.ofRows (for a LogicalPlan and a SparkSession)
- Dataset.toDF untyped transformation is used
- Dataset.select, Dataset.randomSplit and Dataset.mapPartitions typed transformations are used
- KeyValueGroupedDataset.agg operator is used (that requests KeyValueGroupedDataset to aggUntyped)
- SparkSession.emptyDataset and SparkSession.range operators are used
- CatalogImpl is requested to makeDataset (when requested to list databases, tables, functions and columns)
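For illustration, a few of these creation paths as they look to an end user in spark-shell (assuming the usual spark session and spark.implicits._ in scope):

```scala
import spark.implicits._

val empty   = spark.emptyDataset[Int]               // SparkSession.emptyDataset
val ids     = spark.range(5)                        // SparkSession.range
val df      = ids.toDF("id")                        // Dataset.toDF untyped transformation
val halves  = ids.randomSplit(Array(0.5, 0.5))      // Dataset.randomSplit typed transformation
val doubled = ids.mapPartitions(_.map(n => n * 2))  // Dataset.mapPartitions typed transformation
```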
## writeTo

```scala
writeTo(
  table: String): DataFrameWriterV2[T]
```

writeTo creates a DataFrameWriterV2 for the input table and this Dataset.
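As a usage sketch, writeTo is the entry point to the DataFrameWriterV2 API. The catalog and table name below (demo.db.ids) are made up and assume a catalog plugin that supports the V2 write path:

```scala
val ids = spark.range(5).toDF("id")

// CREATE TABLE AS SELECT
ids.writeTo("demo.db.ids").create()

// INSERT INTO (append to the existing table)
spark.range(5, 10).toDF("id").writeTo("demo.db.ids").append()
```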
## write

```scala
write: DataFrameWriter[T]
```

write creates a DataFrameWriter for this Dataset.
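A usage sketch of the (older) DataFrameWriter API; the output path is made up:

```scala
val ids = spark.range(5).toDF("id")

ids.write
  .mode("overwrite")
  .parquet("/tmp/ids")   // hypothetical output path
```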
## Others to be Reviewed
Datasets are lazy: structured query operators and expressions are only evaluated when an action is invoked.
```scala
import org.apache.spark.sql.SparkSession
val spark: SparkSession = ...

scala> val dataset = spark.range(5)
dataset: org.apache.spark.sql.Dataset[Long] = [id: bigint]

// Variant 1: filter operator accepts a Scala function
dataset.filter(n => n % 2 == 0).count

// Variant 2: filter operator accepts a Column-based SQL expression
dataset.filter('id % 2 === 0).count

// Variant 3: filter operator accepts a SQL query
dataset.filter("id % 2 = 0").count
```
Dataset offers the convenience of RDDs with the performance optimizations of DataFrames and the strong static type-safety of Scala. The last feature, bringing strong type-safety to spark-sql-DataFrame.md[DataFrame], is what makes Dataset so appealing. All the features together give you a more functional programming interface to work with structured data.
```scala
scala> spark.range(1).filter('id === 0).explain(true)
== Parsed Logical Plan ==
'Filter ('id = 0)
+- Range (0, 1, splits=8)

== Analyzed Logical Plan ==
id: bigint
Filter (id#51L = cast(0 as bigint))
+- Range (0, 1, splits=8)

== Optimized Logical Plan ==
Filter (id#51L = 0)
+- Range (0, 1, splits=8)

== Physical Plan ==
*Filter (id#51L = 0)
+- *Range (0, 1, splits=8)

scala> spark.range(1).filter(_ == 0).explain(true)
== Parsed Logical Plan ==
'TypedFilter ...

== Analyzed Logical Plan ==
id: bigint
TypedFilter ...

== Optimized Logical Plan ==
TypedFilter ...

== Physical Plan ==
*Filter ...
```
Only Datasets give you both syntax and analysis checks at compile time (which is not possible using spark-sql-DataFrame.md[DataFrame], regular SQL queries or even RDDs).

Using Dataset objects turns DataFrames of spark-sql-Row.md[Row] instances into Datasets of case classes with proper names and types (following their equivalents in the case classes). Instead of using indices to access the fields of a DataFrame and casting them to a type, all this is handled automatically by Datasets and checked by the Scala compiler.
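As an illustration (the Person case class is made up for this example), converting a DataFrame of Rows into a Dataset of case class instances lets the Scala compiler check field names and types:

```scala
import spark.implicits._

final case class Person(id: Long, name: String)

// DataFrame: fields are accessed by name (or index) and cast manually
val df = Seq((0L, "Zero"), (1L, "One")).toDF("id", "name")

// Dataset[Person]: field access is checked at compile time
val people = df.as[Person]
people.filter(_.name.startsWith("Z")).map(_.id).show()
```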
If however a logical-operators/LogicalPlan.md[LogicalPlan] is used to create a Dataset, the logical plan is first executed (using the current SessionState in the SparkSession) to yield the QueryExecution plan.
A Dataset is <<Queryable, Queryable>> and Serializable, i.e. can be saved to a persistent storage.
NOTE: SparkSession.md[SparkSession] and QueryExecution are transient attributes of a Dataset and therefore do not participate in Dataset serialization. The only firmly-tied feature of a Dataset is the Encoder.
You can request the "untyped" view of a Dataset or access the spark-sql-dataset-operators.md#rdd[RDD] that is generated after executing the query. It is supposed to give you a more pleasant experience while transitioning from the legacy RDD-based or DataFrame-based APIs you may have used in the earlier versions of Spark SQL or encourage migrating from Spark Core's RDD API to Spark SQL's Dataset API.
The default storage level for Datasets is spark-rdd-caching.md[MEMORY_AND_DISK] because recomputing the in-memory columnar representation of the underlying table is expensive. You can however spark-sql-caching-and-persistence.md#persist[persist a Dataset].
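For instance, a minimal sketch in spark-shell:

```scala
import org.apache.spark.storage.StorageLevel

val ids = spark.range(1000)

ids.cache()   // same as persist() with the default storage level

// Datasets default to MEMORY_AND_DISK
assert(ids.storageLevel == StorageLevel.MEMORY_AND_DISK)
```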
NOTE: Spark 2.0 has introduced a new query model called spark-structured-streaming.md[Structured Streaming] for continuous incremental execution of structured queries. That made it possible to consider Datasets both as static, bounded data sets and as streaming, unbounded data sets with a single unified API for different execution models.
A Dataset is spark-sql-dataset-operators.md#isLocal[local] if it was created from local collections using SparkSession.md#emptyDataset[SparkSession.emptyDataset] or SparkSession.md#createDataset[SparkSession.createDataset] methods and their derivatives like toDF. If so, queries on the Dataset can be run locally, i.e. without using executors.
NOTE: Dataset makes sure that the underlying QueryExecution is analyzed and spark-sql-Analyzer-CheckAnalysis.md#checkAnalysis[checked].
[[properties]] [[attributes]]
.Dataset's Properties
[cols="1,2",options="header",width="100%",separator="!"]
!===
! Name
! Description
! [[boundEnc]] boundEnc
! ExpressionEncoder
Used when...FIXME
! [[deserializer]] deserializer
a! Deserializer expressions/Expression.md[expression] to convert internal rows to objects of type T
Created lazily by requesting the <<exprEnc, ExpressionEncoder>> to resolve and bind to the output attributes of the <<logicalPlan, logicalPlan>>.
Used when:
- Dataset is created (for a logical plan in a given SparkSession)
- spark-sql-dataset-operators.md#toLocalIterator[Dataset.toLocalIterator] operator is used (to create a Java Iterator of objects of type T)
- Dataset is requested to <<collectFromPlan, collect all rows from a spark plan>>
! [[exprEnc]] exprEnc
! Implicit ExpressionEncoder
Used when...FIXME
! logicalPlan
a! [[logicalPlan]] Analyzed logical plan of the structured query

```scala
logicalPlan: LogicalPlan
```

When initialized, logicalPlan requests the QueryExecution for the analyzed logical plan.
! planWithBarrier
a! [[planWithBarrier]]

```scala
planWithBarrier: AnalysisBarrier
```
! [[rdd]] rdd
a! (lazily-created) spark-rdd.md[RDD] of JVM objects of type T (as converted from rows in Dataset in the internal binary row format).

```scala
rdd: RDD[T]
```

NOTE: rdd gives an RDD with the extra execution step to convert rows from their internal binary row format to JVM objects that will impact the JVM memory as the objects are inside JVM (while they were outside before). You should not use rdd directly.

Internally, rdd first spark-sql-CatalystSerde.md#deserialize[creates a new logical plan that deserializes] the Dataset's <<logicalPlan, logicalPlan>>.
```scala
val dataset = spark.range(5).withColumn("group", 'id % 2)

scala> dataset.rdd.toDebugString
res1: String = (8) MapPartitionsRDD[8] at rdd at ...

// Compare with a more memory-optimized alternative
// Avoids copies and has no schema
scala> dataset.queryExecution.toRdd.toDebugString
res2: String = (8) MapPartitionsRDD[11] at toRdd at ...
```
rdd then requests SessionState to SessionState.md#executePlan[execute the logical plan] to get the corresponding RDD of binary rows.

NOTE: rdd uses the SparkSession to access the SessionState.
rdd then requests the Dataset's <<exprEnc, ExpressionEncoder>> for the data type of the rows (from the deserializer expression) and maps over them (per partition) to create records of the expected type T.
NOTE: rdd is at the "boundary" between the internal binary row format and the JVM type of the dataset. Avoid the extra deserialization step to lower JVM memory requirements of your Spark application.
! [[sqlContext]] sqlContext
! Lazily-created SQLContext

Used when...FIXME
!===
=== [[inputFiles]] Getting Input Files of Relations (in Structured Query) -- inputFiles Method

```scala
inputFiles: Array[String]
```
inputFiles requests the QueryExecution for the optimized logical plan and collects the following logical operators:

- LogicalRelation with FileRelation (as the BaseRelation)
- FileRelation
- HiveTableRelation

inputFiles then requests the logical operators for their underlying files (see the example after the list):
- inputFiles of the FileRelations
- locationUri of the HiveTableRelation
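For example (the parquet path below is made up), inputFiles of a file-based relation lists the part files that back it:

```scala
spark.range(5).toDF("id").write.mode("overwrite").parquet("/tmp/ids")

val files = spark.read.parquet("/tmp/ids").inputFiles
files.foreach(println)   // file:/tmp/ids/part-...parquet entries
```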
=== [[resolve]] resolve Internal Method

```scala
resolve(colName: String): NamedExpression
```
CAUTION: FIXME
=== [[isLocal]] Is Dataset Local? -- isLocal Method

```scala
isLocal: Boolean
```
isLocal flag is enabled (i.e. true) when operators like collect or take could be run locally, i.e. without using executors.

Internally, isLocal checks whether the logical query plan of a Dataset is LocalRelation.md[LocalRelation].
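For example:

```scala
import spark.implicits._

Seq(1, 2, 3).toDS.isLocal   // true  -- the plan is a LocalRelation
spark.range(3).isLocal      // false -- the plan is a Range operator
```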
=== [[isStreaming]] Is Dataset Streaming? -- isStreaming Method

```scala
isStreaming: Boolean
```
isStreaming is enabled (i.e. true) when the logical plan logical-operators/LogicalPlan.md#isStreaming[is streaming].

Internally, isStreaming takes the Dataset's logical-operators/LogicalPlan.md[logical plan] and gives logical-operators/LogicalPlan.md#isStreaming[whether the plan is streaming or not].
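For example (using the built-in rate streaming source):

```scala
spark.range(3).isStreaming                          // false

spark.readStream.format("rate").load.isStreaming    // true
```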
=== [[Queryable]] Queryable
CAUTION: FIXME
=== [[withNewRDDExecutionId]] withNewRDDExecutionId Internal Method

```scala
withNewRDDExecutionId[U](body: => U): U
```
withNewRDDExecutionId executes the input body action under a new execution id.

CAUTION: FIXME What's the difference between withNewRDDExecutionId and <<withNewExecutionId, withNewExecutionId>>?
NOTE: withNewRDDExecutionId is used when the Dataset.foreach and Dataset.foreachPartition actions are used.
## Creating DataFrame (For Logical Query Plan and SparkSession)

```scala
ofRows(
  sparkSession: SparkSession,
  logicalPlan: LogicalPlan): DataFrame
```
Note

ofRows is part of the Dataset Scala object that is marked as private[sql] and so can only be accessed from code in the org.apache.spark.sql package.
ofRows returns spark-sql-DataFrame.md[DataFrame] (which is the type alias for Dataset[Row]). ofRows uses RowEncoder to convert the schema (based on the input logicalPlan).

Internally, ofRows SessionState.md#executePlan[prepares the input logicalPlan for execution] and creates a Dataset[Row] with the current SparkSession.md[SparkSession], the QueryExecution and RowEncoder.
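A minimal sketch (not Spark's own code) of calling ofRows directly: since the method is private[sql], the caller has to live in the org.apache.spark.sql package (e.g. a custom extension), and OneRowRelation is used here merely as a simple LogicalPlan:

```scala
package org.apache.spark.sql

import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation

object OfRowsSketch {
  // DataFrame is the type alias for Dataset[Row]
  def singleRowFrame(spark: SparkSession): DataFrame =
    Dataset.ofRows(spark, OneRowRelation())
}
```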
ofRows is used when:

- DataFrameReader is requested to load data from a data source
- Dataset is requested to execute <>, mapPartitionsInR, <> and <>
- RelationalGroupedDataset is requested to create a DataFrame from aggregate expressions, flatMapGroupsInR and flatMapGroupsInPandas
- SparkSession is requested to <>, <>, <>, <> and <>
- CacheTableCommand, <>, <> and SaveIntoDataSourceCommand logical commands are executed (run)
- DataSource is requested to writeAndRead (for a CreatableRelationProvider)
- FrequentItems is requested to singlePassFreqItems
- StatFunctions is requested to crossTabulate and summary
- Spark Structured Streaming's DataStreamReader is requested to load
- Spark Structured Streaming's DataStreamWriter is requested to start
- Spark Structured Streaming's FileStreamSource is requested to getBatch
- Spark Structured Streaming's MemoryStream is requested to toDF
=== [[withNewExecutionId]] Tracking Multi-Job Structured Query Execution (PySpark) -- withNewExecutionId Internal Method

```scala
withNewExecutionId[U](body: => U): U
```
withNewExecutionId executes the input body action under a new execution id.

NOTE: withNewExecutionId sets a unique execution id so that all Spark jobs belong to the Dataset action execution.
NOTE: withNewExecutionId is used exclusively when Dataset is executing Python-based actions (i.e. collectToPython, collectAsArrowToPython and toPythonIterator) that are not of much interest in this gitbook.

Feel free to contact me at jacek@japila.pl if you think I should re-consider my decision.
=== [[withAction]] Executing Action Under New Execution ID -- withAction Internal Method

```scala
withAction[U](name: String, qe: QueryExecution)(action: SparkPlan => U): U
```
withAction requests QueryExecution for the optimized physical query plan and SparkPlan.md[resets the metrics] of every physical operator (in the physical plan).

withAction requests SQLExecution to execute the input action with the executable physical plan (tracked under a new execution id).

In the end, withAction notifies ExecutionListenerManager that the name action has finished ExecutionListenerManager.md#onSuccess[successfully] or ExecutionListenerManager.md#onFailure[with an exception].
NOTE: withAction uses the SparkSession to access the ExecutionListenerManager.
NOTE: withAction is used when Dataset is requested for the following:

- <<logicalPlan, logicalPlan>> (and executing a Command.md[logical command] or their Union)
- Dataset operators: <>, <>, <> and <>
=== [[apply]] Creating Dataset Instance (For LogicalPlan and SparkSession) -- apply Internal Factory Method

```scala
apply[T: Encoder](sparkSession: SparkSession, logicalPlan: LogicalPlan): Dataset[T]
```
NOTE: apply is part of the Dataset Scala object that is marked as private[sql] and so can only be accessed from code in the org.apache.spark.sql package.

apply ...FIXME
NOTE: apply is used when:

- Dataset is requested to execute <> and <>
- Spark Structured Streaming's MemoryStream is requested to toDS
=== [[collectFromPlan]] Collecting All Rows From Spark Plan -- collectFromPlan Internal Method

```scala
collectFromPlan(plan: SparkPlan): Array[T]
```
collectFromPlan ...FIXME

NOTE: collectFromPlan is used for the spark-sql-dataset-operators.md#head[Dataset.head], spark-sql-dataset-operators.md#collect[Dataset.collect] and spark-sql-dataset-operators.md#collectAsList[Dataset.collectAsList] operators.
=== [[selectUntyped]] selectUntyped Internal Method

```scala
selectUntyped(columns: TypedColumn[_, _]*): Dataset[_]
```
selectUntyped ...FIXME

NOTE: selectUntyped is used exclusively when the Dataset.select typed transformation (with TypedColumns) is used.
=== [[withTypedPlan]] Helper Method for Typed Transformations -- withTypedPlan Internal Method

```scala
withTypedPlan[U: Encoder](logicalPlan: LogicalPlan): Dataset[U]
```
withTypedPlan ...FIXME

NOTE: withTypedPlan is annotated with Scala's https://www.scala-lang.org/api/current/scala/inline.html[@inline] annotation that requests the Scala compiler to try especially hard to inline it.

NOTE: withTypedPlan is used in the Dataset typed transformations.
=== [[withSetOperator]] Helper Method for Set-Based Typed Transformations -- withSetOperator Internal Method

```scala
withSetOperator[U: Encoder](logicalPlan: LogicalPlan): Dataset[U]
```
withSetOperator ...FIXME

NOTE: withSetOperator is annotated with Scala's https://www.scala-lang.org/api/current/scala/inline.html[@inline] annotation that requests the Scala compiler to try especially hard to inline it.

NOTE: withSetOperator is used for the spark-sql-Dataset-typed-transformations.md[Dataset's typed transformations] (i.e. spark-sql-Dataset-typed-transformations.md#union[union], spark-sql-Dataset-typed-transformations.md#unionByName[unionByName], spark-sql-Dataset-typed-transformations.md#intersect[intersect], spark-sql-Dataset-typed-transformations.md#intersectAll[intersectAll], spark-sql-Dataset-typed-transformations.md#except[except] and spark-sql-Dataset-typed-transformations.md#exceptAll[exceptAll]).
=== [[sortInternal]] sortInternal Internal Method

```scala
sortInternal(global: Boolean, sortExprs: Seq[Column]): Dataset[T]
```
sortInternal creates a Dataset with a Sort unary logical operator (with the logicalPlan as the child).
```scala
val nums = Seq((0, "zero"), (1, "one")).toDF("id", "name")

// Creates a Sort logical operator:
// - descending sort direction for id column (specified explicitly)
// - name column is wrapped with ascending sort direction
val numsSorted = nums.sort('id.desc, 'name)
val logicalPlan = numsSorted.queryExecution.logical

scala> println(logicalPlan.numberedTreeString)
00 'Sort ['id DESC NULLS LAST, 'name ASC NULLS FIRST], true
01 +- Project [_1#11 AS id#14, _2#12 AS name#15]
02    +- LocalRelation [_1#11, _2#12]
```
Internally, sortInternal firstly builds ordering expressions for the given sortExprs columns, i.e. takes the sortExprs columns and makes sure that they are SortOrder expressions already (and if not, wraps them in SortOrder expressions with the Ascending sort direction).

In the end, sortInternal creates a Dataset with a Sort unary logical operator (with the ordering expressions, the given global flag, and the logicalPlan as the child).
NOTE: sortInternal is used for the sort and sortWithinPartitions typed transformations (with the global flag being enabled and disabled, respectively).
=== [[withPlan]] Helper Method for Untyped Transformations and Basic Actions -- withPlan Internal Method

```scala
withPlan(logicalPlan: LogicalPlan): DataFrame
```
withPlan simply uses the ofRows factory method to create a DataFrame for the input LogicalPlan and the current SparkSession.
NOTE: withPlan is annotated with Scala's https://www.scala-lang.org/api/current/scala/inline.html[@inline] annotation that requests the Scala compiler to try especially hard to inline it.

withPlan is used in untyped transformations.
=== [[i-want-more]] Further Reading and Watching
- (video) https://youtu.be/i7l3JQRx7Qw[Structuring Spark: DataFrames, Datasets, and Streaming]