Encoder

An Encoder is the fundamental concept in Spark SQL's serialization and deserialization (SerDe) framework. Spark SQL uses the SerDe framework for IO to make it efficient in both time and space.

Note

Spark has borrowed the idea from the Hive SerDe library so it might be worthwhile to get familiar with Hive a little bit, too.

Encoders are modelled in Spark SQL 2.0 as the Encoder[T] trait.

[source, scala]
----
trait Encoder[T] extends Serializable {
  def schema: StructType
  def clsTag: ClassTag[T]
}
----


The type T stands for the type of records an Encoder[T] can deal with. An encoder of type T, i.e. Encoder[T], is used to convert (encode and decode) any JVM object or primitive of type T (that could be your domain object) to and from Spark SQL's InternalRow, which is the internal binary row format representation (using Catalyst expressions and code generation).

NOTE: Encoder is also called "a container of serde expressions in Dataset".

NOTE: The one and only implementation of the Encoder trait in Spark SQL 2 is ExpressionEncoder.

Encoders are an integral (and internal) part of any Dataset.md[Dataset[T]] (of records of type T): every dataset comes with an Encoder[T] that is used to serialize and deserialize its records.

NOTE: Dataset[T] type is a Scala type constructor with the type parameter T. So is Encoder[T] that handles serialization and deserialization of T to the internal representation.

Encoders know the spark-sql-schema.md[schema] of the records. This is how they offer significantly faster serialization and deserialization (compared to the default Java or Kryo serializers).

[source, scala]
----
// The domain object for your records in a large dataset
case class Person(id: Long, name: String)

import org.apache.spark.sql.Encoders

scala> val personEncoder = Encoders.product[Person]
personEncoder: org.apache.spark.sql.Encoder[Person] = class[id[0]: bigint, name[0]: string]

scala> personEncoder.schema
res0: org.apache.spark.sql.types.StructType = StructType(StructField(id,LongType,false), StructField(name,StringType,true))

scala> personEncoder.clsTag
res1: scala.reflect.ClassTag[Person] = Person

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

scala> val personExprEncoder = personEncoder.asInstanceOf[ExpressionEncoder[Person]]
personExprEncoder: org.apache.spark.sql.catalyst.encoders.ExpressionEncoder[Person] = class[id[0]: bigint, name[0]: string]

// ExpressionEncoders may or may not be flat
scala> personExprEncoder.flat
res2: Boolean = false

// The Serializer part of the encoder
scala> personExprEncoder.serializer
res3: Seq[org.apache.spark.sql.catalyst.expressions.Expression] = List(assertnotnull(input[0, Person, true], top level non-flat input object).id AS id#0L, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, Person, true], top level non-flat input object).name, true) AS name#1)

// The Deserializer part of the encoder
scala> personExprEncoder.deserializer
res4: org.apache.spark.sql.catalyst.expressions.Expression = newInstance(class Person)

scala> personExprEncoder.namedExpressions
res5: Seq[org.apache.spark.sql.catalyst.expressions.NamedExpression] = List(assertnotnull(input[0, Person, true], top level non-flat input object).id AS id#2L, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, Person, true], top level non-flat input object).name, true) AS name#3)

// A record in a Dataset[Person]
// A mere instance of the Person case class
// There could be thousands of Person instances in a large dataset
val jacek = Person(0, "Jacek")

// Serialize a record to the internal representation, i.e. InternalRow
scala> val row = personExprEncoder.toRow(jacek)
row: org.apache.spark.sql.catalyst.InternalRow = [0,0,1800000005,6b6563614a]

// Spark uses InternalRows internally for IO
// Let's deserialize it to a JVM object, i.e. a Scala object
import org.apache.spark.sql.catalyst.dsl.expressions._

// In spark-shell there are competing implicits
// That's why DslSymbol is used explicitly in the following line
scala> val attrs = Seq(DslSymbol('id).long, DslSymbol('name).string)
attrs: Seq[org.apache.spark.sql.catalyst.expressions.AttributeReference] = List(id#8L, name#9)

scala> val jacekReborn = personExprEncoder.resolveAndBind(attrs).fromRow(row)
jacekReborn: Person = Person(0,Jacek)

// Are the two Person instances equal?
scala> jacek == jacekReborn
res6: Boolean = true
----
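The session above relies on internal ExpressionEncoder members, which are not guaranteed to stay the same across Spark versions. As a rough sketch of the same round trip using only the public API, an encoder can be passed explicitly to createDataset. The local master setting and the value names below are assumptions made for illustration only.

```scala
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}

case class Person(id: Long, name: String)

// Assumption: a local session is good enough for the illustration
val spark = SparkSession.builder.master("local[*]").getOrCreate()

val personEncoder: Encoder[Person] = Encoders.product[Person]
val jacek = Person(0, "Jacek")

// createDataset uses the given encoder to serialize the records
// to the internal row format
val people: Dataset[Person] = spark.createDataset(Seq(jacek))(personEncoder)

// collect uses the same encoder to deserialize the internal rows
// back to Person objects
val jacekReborn = people.collect().head
```

Comparing jacek and jacekReborn for equality is one way to check that encoding followed by decoding preserves the record.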


You can <<creating-encoders, create custom encoders using the Encoders object>>. Note, however, that encoders for common Scala types and their product types are already available in the SparkSession.md#implicits[implicits object].

[source, scala]
----
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()
import spark.implicits._
----


TIP: The default encoders are already imported in spark-shell.md[spark-shell].
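With the implicits in scope, encoders rarely have to be named at all. The following is a minimal sketch, assuming a local session and a hypothetical Person case class with made-up sample records, of how toDS and map pick up their encoders implicitly.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical domain class used only for this illustration
case class Person(id: Long, name: String)

// Assumption: a local session is good enough for the illustration
val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._

// toDS requires an implicit Encoder[Person] that spark.implicits
// provides for case classes
val people: Dataset[Person] = Seq(Person(0, "Jacek"), Person(1, "Anna")).toDS()

// map requires an implicit Encoder[String] for its result type
val names: Dataset[String] = people.map(_.name)
```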

Encoders map columns (of your dataset) to fields (of your JVM object) by name. Encoders are what let you bridge JVM objects to data sources (CSV, JDBC, Parquet, Avro, JSON, Cassandra, Elasticsearch, MemSQL) and vice versa.
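The by-name mapping can be observed with a DataFrame whose columns come in a different order than the fields of the target class. This is a small sketch, assuming a local session and a hypothetical Person case class.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical domain class used only for this illustration
case class Person(id: Long, name: String)

// Assumption: a local session is good enough for the illustration
val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._

// The column order (name, id) differs from the field order of Person (id, name)
val df = Seq(("Jacek", 0L)).toDF("name", "id")

// as[Person] resolves the columns by name, not by position
val people = df.as[Person]

val first = people.collect().head
```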

NOTE: In Spark SQL 2.0 the DataFrame type is a mere type alias for Dataset[Row] with RowEncoder being the encoder.
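A quick way to convince yourself of the alias is to assign a DataFrame to a value of type Dataset[Row] without any conversion. The sketch below assumes a local session; the value names are arbitrary.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

// Assumption: a local session is good enough for the illustration
val spark = SparkSession.builder.master("local[*]").getOrCreate()

val df: DataFrame = spark.range(3).toDF("id")

// No conversion is needed since DataFrame and Dataset[Row] are the same type
val rows: Dataset[Row] = df

// Records of a DataFrame are generic Row objects
val ids = rows.collect().map(_.getLong(0)).toSeq.sorted
```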

=== [[creating-encoders]][[encoders]] Creating Custom Encoders (Encoders object)

The Encoders factory object defines methods to create Encoder instances.

Import the Encoders factory object from the org.apache.spark.sql package to have access to these methods.

[source, scala]
----
import org.apache.spark.sql.Encoders

scala> Encoders.LONG
res1: org.apache.spark.sql.Encoder[Long] = class[value[0]: bigint]
----


You can find methods to create encoders for Java's object types, e.g. Boolean, Integer, Long, Double, String, java.sql.Timestamp or Byte array. There is also the bean method to create more advanced encoders for Java bean classes.

[source, scala]
----
import org.apache.spark.sql.Encoders

scala> Encoders.STRING
res2: org.apache.spark.sql.Encoder[String] = class[value[0]: string]
----
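The bean method is not demonstrated above, so here is a minimal sketch of it. PersonBean is a hypothetical class written in Scala with @BeanProperty to get Java-style getters and setters; a plain Java bean class would be used the same way.

```scala
import scala.beans.BeanProperty
import org.apache.spark.sql.{Encoder, Encoders}

// Hypothetical bean class: a no-argument constructor plus
// getters and setters generated by @BeanProperty
class PersonBean extends Serializable {
  @BeanProperty var id: Long = 0L
  @BeanProperty var name: String = _
}

val beanEncoder: Encoder[PersonBean] = Encoders.bean(classOf[PersonBean])

// The schema is inferred from the bean properties
val fieldNames = beanEncoder.schema.fieldNames.toSet
```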


You can also create encoders based on Kryo or Java serializers.

[source, scala]
----
import org.apache.spark.sql.Encoders

case class Person(id: Int, name: String, speaksPolish: Boolean)

scala> Encoders.kryo[Person]
res3: org.apache.spark.sql.Encoder[Person] = class[value[0]: binary]

scala> Encoders.javaSerialization[Person]
res5: org.apache.spark.sql.Encoder[Person] = class[value[0]: binary]
----
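The output above hints at the trade-off: a Kryo or Java serialization encoder stores the whole object in a single binary column, so Spark SQL cannot see the individual fields. The following sketch compares the schemas of a product encoder and a Kryo encoder for the same case class; the value names are chosen for illustration only.

```scala
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types.BinaryType

case class Person(id: Int, name: String, speaksPolish: Boolean)

// Product encoder: one column per field of the case class
val productFields = Encoders.product[Person].schema.fieldNames.toSeq

// Kryo encoder: the whole object ends up in a single binary column
val kryoSchema = Encoders.kryo[Person].schema
val kryoFields = kryoSchema.fieldNames.toSeq
val kryoType = kryoSchema.fields.head.dataType
```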


You can create encoders for Scala's tuples and case classes, Int, Long, Double, etc.

[source, scala]
----
import org.apache.spark.sql.Encoders

scala> Encoders.tuple(Encoders.scalaLong, Encoders.STRING, Encoders.scalaBoolean)
res9: org.apache.spark.sql.Encoder[(Long, String, Boolean)] = class[_1[0]: bigint, _2[0]: string, _3[0]: boolean]
----
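A composed tuple encoder can be handed to createDataset like any other encoder. This is a small sketch, assuming a local session and a made-up sample record, that shows the column names such a dataset ends up with.

```scala
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

// Assumption: a local session is good enough for the illustration
val spark = SparkSession.builder.master("local[*]").getOrCreate()

val tupleEncoder: Encoder[(Long, String, Boolean)] =
  Encoders.tuple(Encoders.scalaLong, Encoders.STRING, Encoders.scalaBoolean)

// The encoder is passed explicitly instead of being resolved implicitly
val ds = spark.createDataset(Seq((0L, "Jacek", true)))(tupleEncoder)

// Columns are named after the tuple fields
val columns = ds.columns.toSeq
val records = ds.collect().toSeq
```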


=== [[i-want-more]] Further Reading and Watching


Last update: 2020-11-15