
UnsafeRow

UnsafeRow is an InternalRow for mutable binary rows that are backed by raw memory instead of Java objects. Java objects live in the JVM heap and, when created in large numbers, lead to more frequent GCs.

UnsafeRow supports Java's Externalizable and Kryo's KryoSerializable serialization/deserialization protocols.

Creating Instance

UnsafeRow takes the following to be created:

  • Number of fields

While being created, UnsafeRow calculates the bitset width based on the number of fields.
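The bitset width is given in bytes and covers all fields in whole 8-byte words. The following is a standalone sketch. It assumes the same formula as UnsafeRow.calculateBitSetWidthInBytes, ((numFields + 63) / 64) * 8. BitSetWidth is a hypothetical name.

```scala
object BitSetWidth {
  // Null-tracking bits are stored in whole 64-bit words:
  // one word (8 bytes) covers up to 64 fields
  def calculateBitSetWidthInBytes(numFields: Int): Int =
    ((numFields + 63) / 64) * 8
}

// 1 to 64 fields need one word; the 65th field needs a second one
for (n <- Seq(0, 1, 64, 65, 128))
  println(s"$n fields -> ${BitSetWidth.calculateBitSetWidthInBytes(n)} bytes")
```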

UnsafeRow is created when:

  • FIXME

Mutable Data Types

The following DataTypes are considered mutable data types:

  • BooleanType
  • ByteType
  • CalendarIntervalType
  • DateType
  • DecimalType
  • DoubleType
  • FloatType
  • IntegerType
  • LongType
  • NullType
  • ShortType
  • TimestampType
  • UserDefinedType (over a mutable data type)

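Values of mutable data types can be updated in place (e.g. with setLong). Most of them are fixed-length and stored directly in their 8-byte slot. A DecimalType with a large precision and a CalendarIntervalType use space reserved for them in the variable-length region instead.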

Examples (possibly all) of the data types that are not mutable:

  • ArrayType
  • BinaryType
  • MapType
  • StringType
  • StructType

KryoSerializable SerDe Protocol

Learn more in KryoSerializable.

Java's Externalizable SerDe Protocol

Learn more in java.io.Externalizable.

sizeInBytes

UnsafeRow knows its size (in bytes), which getSizeInBytes returns. For the unsafeRow from the Demo:

scala> println(unsafeRow.getSizeInBytes)
32
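The 32 bytes can be worked out by hand. This sketch assumes the row from the Demo, which has a single StringType field holding "hello world". It also assumes the three-region layout: 8 bytes of null bits, one 8-byte slot per field, and the UTF-8 bytes of every string rounded up to a multiple of 8. sizeOfStringRow is a hypothetical helper, not part of Spark.

```scala
import java.nio.charset.StandardCharsets.UTF_8

object UnsafeRowSize {
  def bitSetWidthInBytes(numFields: Int): Int = ((numFields + 63) / 64) * 8

  // Round up to the next multiple of 8 (8-byte word alignment)
  def roundToWord(numBytes: Int): Int = ((numBytes + 7) / 8) * 8

  // Hypothetical helper: size of a row whose fields are all non-null strings
  def sizeOfStringRow(values: Seq[String]): Int = {
    val fixedRegion = bitSetWidthInBytes(values.size) + 8 * values.size
    val variableRegion = values.map(v => roundToWord(v.getBytes(UTF_8).length)).sum
    fixedRegion + variableRegion
  }
}

// 8 (null bits) + 8 (offset-and-size slot) + 16 (11 bytes padded to 16)
println(UnsafeRowSize.sizeOfStringRow(Seq("hello world"))) // 32
```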

Field Offsets

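Every field of a row has an 8-byte slot in the fixed-length region, at the field offset baseOffset + bitSetWidthInBytes + ordinal * 8. A fixed-length value is stored in the slot itself. For a variable-length value, the slot holds the value's offset (relative to the start of the row) and its size.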
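The following sketch shows how a field's slot is located and how a variable-length value is read back. It assumes that field i's 8-byte slot starts at bitSetWidthInBytes + i * 8 from the beginning of the row. It also assumes that a variable-length value's slot holds the value's offset in the upper 32 bits and its size in the lower 32 bits. The bytes are the ones printed by getBytes in the Demo. Reading them as little-endian assumes an x86-like platform, since UnsafeRow uses the native byte order. FieldOffsets and readString are hypothetical names.

```scala
import java.nio.{ByteBuffer, ByteOrder}
import java.nio.charset.StandardCharsets.UTF_8

object FieldOffsets {
  def bitSetWidthInBytes(numFields: Int): Int = ((numFields + 63) / 64) * 8

  // Position of the 8-byte slot of a field, relative to the row start
  def fieldOffset(numFields: Int, ordinal: Int): Int =
    bitSetWidthInBytes(numFields) + ordinal * 8

  // Decodes a variable-length (UTF-8 string) field
  def readString(row: Array[Byte], numFields: Int, ordinal: Int): String = {
    val buf = ByteBuffer.wrap(row).order(ByteOrder.LITTLE_ENDIAN)
    val offsetAndSize = buf.getLong(fieldOffset(numFields, ordinal))
    val offset = (offsetAndSize >>> 32).toInt // upper 32 bits
    val size = offsetAndSize.toInt            // lower 32 bits
    new String(row, offset, size, UTF_8)
  }
}

// Bytes of the single-field row from the Demo
val demoRow = Array[Byte](
  0, 0, 0, 0, 0, 0, 0, 0,   // null bits
  11, 0, 0, 0, 16, 0, 0, 0, // size 11, offset 16
  104, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100, 0, 0, 0, 0, 0)

println(FieldOffsets.readString(demoRow, numFields = 1, ordinal = 0)) // hello world
```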

Mutable Types

UnsafeRow considers a data type mutable if it is one of the Mutable Data Types. A UserDefinedType is judged by its underlying SQL type.
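The check can be sketched as follows. The case objects and classes are hypothetical, simplified stand-ins for Spark SQL's DataTypes, not Spark's own classes (Spark's check is UnsafeRow.isMutable). The sketch also assumes that every DecimalType and CalendarIntervalType count as mutable, following the list of Mutable Data Types.

```scala
object Mutability {
  // Hypothetical, simplified stand-ins for Spark SQL's DataTypes
  sealed trait DataType
  case object NullType extends DataType
  case object BooleanType extends DataType
  case object ByteType extends DataType
  case object ShortType extends DataType
  case object IntegerType extends DataType
  case object LongType extends DataType
  case object FloatType extends DataType
  case object DoubleType extends DataType
  case object DateType extends DataType
  case object TimestampType extends DataType
  case object CalendarIntervalType extends DataType
  case object StringType extends DataType
  case object BinaryType extends DataType
  final case class DecimalType(precision: Int, scale: Int) extends DataType
  final case class ArrayType(elementType: DataType) extends DataType
  final case class UserDefinedType(sqlType: DataType) extends DataType

  private val mutableFieldTypes: Set[DataType] = Set(
    NullType, BooleanType, ByteType, ShortType, IntegerType, LongType,
    FloatType, DoubleType, DateType, TimestampType)

  def isMutable(dt: DataType): Boolean = dt match {
    case UserDefinedType(sqlType) => isMutable(sqlType) // judged by the underlying type
    case _: DecimalType           => true
    case CalendarIntervalType     => true
    case other                    => mutableFieldTypes.contains(other)
  }
}

import Mutability._
println(isMutable(LongType))                    // true
println(isMutable(UserDefinedType(DoubleType))) // true
println(isMutable(StringType))                  // false
println(isMutable(ArrayType(IntegerType)))      // false
```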

8-Byte Word Alignment and Three Regions

UnsafeRow is composed of three regions:

  1. Null Bit Set Bitmap Region (1 bit/field) for tracking null values
  2. Fixed-Length 8-Byte Values Region
  3. Variable-Length Data Region

UnsafeRow is always 8-byte word aligned, so its size is always a multiple of 8 bytes.
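The null bit set region can be sketched as follows. The sketch assumes that field i's null bit is bit i % 64 of the (i / 64)-th 8-byte word at the start of the row, which is how Spark's BitSetMethods addresses bits. NullBits is a hypothetical helper over a plain byte array.

```scala
import java.nio.{ByteBuffer, ByteOrder}

// Hypothetical helper over a row's bytes (the null bit set starts at offset 0)
final class NullBits(row: Array[Byte]) {
  private val buf = ByteBuffer.wrap(row).order(ByteOrder.nativeOrder())

  private def wordOffset(ordinal: Int): Int = (ordinal >> 6) * 8 // which 8-byte word
  private def mask(ordinal: Int): Long = 1L << (ordinal & 0x3f)  // which bit in that word

  def setNullAt(ordinal: Int): Unit = {
    val off = wordOffset(ordinal)
    buf.putLong(off, buf.getLong(off) | mask(ordinal))
  }

  def isNullAt(ordinal: Int): Boolean =
    (buf.getLong(wordOffset(ordinal)) & mask(ordinal)) != 0
}

// 70 fields need 2 words (16 bytes) of null bits, followed by 70 8-byte slots
val numFields = 70
val row = new Array[Byte](16 + numFields * 8)
val nulls = new NullBits(row)
nulls.setNullAt(3)
nulls.setNullAt(65)
println((0 until numFields).filter(nulls.isNullAt).mkString(", ")) // 3, 65
```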

Equality and Hashing

Equality comparison and hashing of rows can be performed on raw bytes, because two identical rows have identical bit-wise representations. No type-specific interpretation is required.
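The following sketches byte-wise equality and hashing, assuming both rows are available as byte arrays. java.util.Arrays.equals and Scala's MurmurHash3.bytesHash stand in for Spark's own byte comparison and Murmur3 hashing routines, so the hash values differ from the ones Spark computes. BytesRow is a hypothetical wrapper.

```scala
import java.util.Arrays
import scala.util.hashing.MurmurHash3

// Hypothetical wrapper: compares and hashes rows purely by their bytes
final class BytesRow(val bytes: Array[Byte]) {
  override def equals(other: Any): Boolean = other match {
    case that: BytesRow => Arrays.equals(bytes, that.bytes)
    case _              => false
  }
  override def hashCode(): Int = MurmurHash3.bytesHash(bytes)
}

val a = new BytesRow(Array[Byte](0, 0, 0, 0, 0, 0, 0, 0, 42, 0, 0, 0, 0, 0, 0, 0))
val b = new BytesRow(a.bytes.clone())
val c = new BytesRow(Array[Byte](0, 0, 0, 0, 0, 0, 0, 0, 43, 0, 0, 0, 0, 0, 0, 0))

println(a == b)                   // true
println(a.hashCode == b.hashCode) // true
println(a == c)                   // false
```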

baseObject

Object baseObject

baseObject is assigned in pointTo, copyFrom, readExternal and read. In most cases, baseObject is a byte[]. Only the pointTo variant that accepts an Object can set it to something else.

writeToStream

void writeToStream(
  OutputStream out,
  byte[] writeBuffer)

writeToStream branches off based on whether the baseObject is byte[] or not.

writeToStream...FIXME
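The two branches can be sketched as follows. This is a hypothetical re-implementation, not Spark's code. It assumes that a byte[]-backed row is handed to the stream in a single call. It also assumes that any other backing memory is copied out through writeBuffer, chunk by chunk, which is what the writeBuffer parameter suggests. A direct ByteBuffer stands in for memory that is not a byte[]. Backing, OnHeap and OtherMemory are hypothetical names.

```scala
import java.io.{ByteArrayOutputStream, OutputStream}
import java.nio.ByteBuffer

object WriteToStreamSketch {
  // Hypothetical stand-ins for the memory a row can point to
  sealed trait Backing
  final case class OnHeap(bytes: Array[Byte], offset: Int) extends Backing
  final case class OtherMemory(buffer: ByteBuffer, offset: Int) extends Backing

  def writeToStream(backing: Backing, sizeInBytes: Int, out: OutputStream, writeBuffer: Array[Byte]): Unit =
    backing match {
      case OnHeap(bytes, offset) =>
        // byte[]: hand the row's bytes to the stream in one call
        out.write(bytes, offset, sizeInBytes)
      case OtherMemory(buffer, offset) =>
        // anything else: copy through writeBuffer, one chunk at a time
        val src = buffer.duplicate() // independent position, shared content
        var position = offset
        var remaining = sizeInBytes
        while (remaining > 0) {
          val toTransfer = math.min(writeBuffer.length, remaining)
          src.position(position)
          src.get(writeBuffer, 0, toTransfer)
          out.write(writeBuffer, 0, toTransfer)
          position += toTransfer
          remaining -= toTransfer
        }
    }
}

import WriteToStreamSketch._

val rowBytes = Array.tabulate[Byte](32)(_.toByte)
val offHeap = ByteBuffer.allocateDirect(32)
offHeap.put(rowBytes)

val viaArray = new ByteArrayOutputStream()
writeToStream(OnHeap(rowBytes, 0), 32, viaArray, new Array[Byte](8))

val viaChunks = new ByteArrayOutputStream()
writeToStream(OtherMemory(offHeap, 0), 32, viaChunks, new Array[Byte](12)) // chunks of 12, 12 and 8

println(viaChunks.toByteArray.sameElements(viaArray.toByteArray)) // true
```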

writeToStream is used when:

pointTo

void pointTo(
  byte[] buf,
  int sizeInBytes) // (1)
void pointTo(
  Object baseObject,
  long baseOffset,
  int sizeInBytes)

(1) Uses Platform.BYTE_ARRAY_OFFSET as baseOffset

pointTo...FIXME
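The following sketch shows what pointTo has to record. It assumes that pointTo stores the base object, base offset and size, and checks that sizeInBytes is a multiple of 8 (the check the Demo refers to). Spark's check is assumed to be a Java assert, which only fires when assertions are enabled. This sketch uses require so that the check always runs. RowPointer is a hypothetical class. The byte[] variant uses 0 where Spark uses the JVM-specific Platform.BYTE_ARRAY_OFFSET.

```scala
// Hypothetical, simplified model of the pointer state pointTo sets
final class RowPointer(val numFields: Int) {
  require(numFields >= 0, s"numFields ($numFields) should be >= 0")

  var baseObject: AnyRef = null
  var baseOffset: Long = 0L
  var sizeInBytes: Int = 0

  def pointTo(baseObject: AnyRef, baseOffset: Long, sizeInBytes: Int): Unit = {
    require(sizeInBytes % 8 == 0, s"sizeInBytes ($sizeInBytes) should be a multiple of 8")
    this.baseObject = baseObject
    this.baseOffset = baseOffset
    this.sizeInBytes = sizeInBytes
  }

  // byte[] variant: the row starts at the beginning of the array
  // (0 stands in for Platform.BYTE_ARRAY_OFFSET)
  def pointTo(buf: Array[Byte], sizeInBytes: Int): Unit =
    pointTo(buf, 0L, sizeInBytes)
}

val p = new RowPointer(1)
p.pointTo(new Array[Byte](32), 32)
println(p.sizeInBytes) // 32

val rejected = scala.util.Try(p.pointTo(new Array[Byte](30), 30))
println(rejected.isFailure) // true
```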

copyFrom

void copyFrom(
  UnsafeRow row)

copyFrom...FIXME
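The following sketches copyFrom. It assumes that copyFrom copies the other row's bytes into this row's own byte[] buffer, allocating a new buffer only when the current one is too small, and then takes over the other row's size. BufferRow is a hypothetical stand-in for a byte[]-backed UnsafeRow.

```scala
// Hypothetical byte[]-backed row that owns its buffer
final class BufferRow(var buffer: Array[Byte], var sizeInBytes: Int) {
  def copyFrom(other: BufferRow): Unit = {
    if (other.sizeInBytes > buffer.length) {
      // grow only when the current buffer cannot hold the other row
      buffer = new Array[Byte](other.sizeInBytes)
    }
    System.arraycopy(other.buffer, 0, buffer, 0, other.sizeInBytes)
    sizeInBytes = other.sizeInBytes
  }
}

val source = new BufferRow(Array.fill[Byte](32)(7), 32)
val target = new BufferRow(new Array[Byte](16), 16)
val before = target.buffer
target.copyFrom(source)
println(target.sizeInBytes)      // 32
println(target.buffer ne before) // true: the buffer was reallocated
```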

copyFrom is used when:

Deserializing UnsafeRow

Regardless of whether Java (readExternal) or Kryo (read) is used for deserialization, UnsafeRow reads the following values from the input (an ObjectInput or a Kryo Input, respectively) to assign its internal registries:

  • baseOffset: the offset of the first element in the storage allocation of a byte array (BYTE_ARRAY_OFFSET)
  • sizeInBytes: the first four bytes (Java's int) from the input
  • numFields: the next four bytes (Java's int) from the input
  • bitSetWidthInBytes: calculated from numFields
  • baseObject: a new byte[] of sizeInBytes bytes, filled with the next sizeInBytes bytes from the input
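The order of the values can be exercised with plain java.io streams. This sketch assumes the writing side mirrors the reading side: the size as an int, the number of fields as an int, then the row's bytes. DataOutputStream and DataInputStream stand in for ObjectOutput and ObjectInput, since both pairs use the same DataOutput and DataInput int encoding. RowSerDe, writeRow and readRow are hypothetical names.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object RowSerDe {
  final case class Deserialized(sizeInBytes: Int, numFields: Int, bitSetWidthInBytes: Int, bytes: Array[Byte])

  def writeRow(out: DataOutputStream, numFields: Int, bytes: Array[Byte]): Unit = {
    out.writeInt(bytes.length) // sizeInBytes
    out.writeInt(numFields)    // numFields
    out.write(bytes)           // the row itself
  }

  def readRow(in: DataInputStream): Deserialized = {
    val sizeInBytes = in.readInt()
    val numFields = in.readInt()
    val bitSetWidthInBytes = ((numFields + 63) / 64) * 8
    val bytes = new Array[Byte](sizeInBytes)
    in.readFully(bytes) // the next sizeInBytes bytes become the base object
    Deserialized(sizeInBytes, numFields, bitSetWidthInBytes, bytes)
  }
}

val original = Array.tabulate[Byte](32)(_.toByte)
val sink = new ByteArrayOutputStream()
RowSerDe.writeRow(new DataOutputStream(sink), 1, original)

val restored = RowSerDe.readRow(new DataInputStream(new ByteArrayInputStream(sink.toByteArray)))
println(restored.sizeInBytes)                  // 32
println(restored.bytes.sameElements(original)) // true
```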

Kryo

void read(
  Kryo kryo,
  Input in)

read is part of the KryoSerializable (Kryo) abstraction.

Java

void readExternal(
  ObjectInput in)

readExternal is part of the Externalizable (Java) abstraction.

Demo

// Use ExpressionEncoder for simplicity
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
val stringEncoder = ExpressionEncoder[String]
val row = stringEncoder.toRow("hello world")

import org.apache.spark.sql.catalyst.expressions.UnsafeRow
val unsafeRow = row match { case ur: UnsafeRow => ur }

scala> unsafeRow.getBytes
res0: Array[Byte] = Array(0, 0, 0, 0, 0, 0, 0, 0, 11, 0, 0, 0, 16, 0, 0, 0, 104, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100, 0, 0, 0, 0, 0)

scala> unsafeRow.getUTF8String(0)
res1: org.apache.spark.unsafe.types.UTF8String = hello world
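// a sample human-readable row representation
// id (long), txt (string), num (int)
val id: Long = 0
val txt: String = "hello world"
val num: Int = 110
val singleRow = Seq(id, txt, num)
val numFields = singleRow.size

Concatenating the bytes of the values (e.g. Array(id.toByte) ++ txt.toArray.map(_.toByte) ++ Array(num.toByte)) is not enough. The bytes have to follow the UnsafeRow layout: the null bit set, then one 8-byte slot per field (a variable-length value stores its offset and size in its slot), then the variable-length data padded to a multiple of 8 bytes.

import java.nio.{ByteBuffer, ByteOrder}
import java.nio.charset.StandardCharsets.UTF_8

val txtBytes = txt.getBytes(UTF_8)
val bitSetWidth = ((numFields + 63) / 64) * 8   // 8
val fixedLength = bitSetWidth + numFields * 8   // 32
val varLength = ((txtBytes.length + 7) / 8) * 8 // 16 (11 bytes padded)

// UnsafeRow reads values in the platform's native byte order
val buf = ByteBuffer.allocate(fixedLength + varLength).order(ByteOrder.nativeOrder())
buf.putLong(bitSetWidth, id)                                               // field 0
buf.putLong(bitSetWidth + 8, (fixedLength.toLong << 32) | txtBytes.length) // field 1: offset and size
buf.putInt(bitSetWidth + 16, num)                                          // field 2
buf.position(fixedLength)
buf.put(txtBytes)
val rowDataInBytes = buf.array

import org.apache.spark.sql.catalyst.expressions.UnsafeRow
val row = new UnsafeRow(numFields)

sizeInBytes has to be a multiple of 8 (rowDataInBytes.length is 48 here). pointTo checks sizeInBytes % 8 == 0 as a Java assert, so a wrong size is only caught when assertions are enabled.

row.pointTo(rowDataInBytes, rowDataInBytes.length)

assert(row.getLong(0) == id)
assert(row.getUTF8String(1).toString == txt)
assert(row.getInt(2) == num)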