Metadata operations in Spark
Use case
Quite often a Spark application starts out interactively: existing files with lots of attributes are read quickly using the DataFrame API, and Spark SQL is then used to transform and export the data.
Sooner or later the quickly developed interactive script becomes “mission critical”: it has to be maintained by several people, documentation is required, and tests would be nice.
Using case classes as metadata
One possibility is to convert the DataFrame application to the Dataset API, based on case classes.
The benefits are:
- The internal code is much more readable, since generic DataFrame declarations can be replaced by something more explicit, for example Dataset[Person].
- The case class can carry documentation for the record and for each individual attribute. Column names are often abbreviated, so a meaningful explanation can be added, along with notes on lineage, example values etc.
- The case class can serve as the basis for test data.
- It’s possible to derive CREATE TABLE SQL commands from the case class (this normally requires some metadata on the table level as well).
- Documentation, including foreign keys, lineage etc., can be generated, using Graphviz for example.
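The CREATE TABLE point can be illustrated with a minimal sketch. It assumes that Spark's Encoders.product and StructType.toDDL are available (Spark 2.4 or later); the City case class, the object name and the table name are hypothetical, and a real table would need further table-level metadata such as storage format or partitioning.

```scala
import org.apache.spark.sql.Encoders
import scala.reflect.runtime.universe.TypeTag

// Hypothetical record type, used only for this illustration
case class City(name: String, population: Long, mayor: Option[String])

object CreateTableSketch {
  /** Builds a CREATE TABLE statement whose column list is derived from the case class T. */
  def createTable[T <: Product : TypeTag](tableName: String): String = {
    // The encoder derives a Spark schema from the case class; toDDL renders it as a column list
    val columns = Encoders.product[T].schema.toDDL
    s"CREATE TABLE $tableName ($columns)"
  }
}

// Usage: println(CreateTableSketch.createTable[City]("city"))
```

The exact quoting of column names in the generated column list depends on the Spark version, so the statement should be reviewed before it is executed.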
The problem: where do the case classes come from, especially when the data frames have lots of attributes?
Spark already provides useful metadata (DataFrame.schema), which can be used to generate the case classes from a given DataFrame.
Sample data set
The examples use the following data frame, which represents a person:
+---+-----+---+----------------------+------+------------------------+
|id |name |age|address               |active|salary                  |
+---+-----+---+----------------------+------+------------------------+
|1  |Alice|25 |{London, Baker street}|true  |50000.000000000000000000|
|2  |Bob  |35 |{London, Baker street}|false |null                    |
+---+-----+---+----------------------+------+------------------------+
root
 |-- id: long (nullable = false)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- road: string (nullable = true)
 |-- active: boolean (nullable = false)
 |-- salary: decimal(38,18) (nullable = true)
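For readers who want to reproduce the examples, here is one way such a data frame could be built by hand. This is only a sketch: the object name PersonSample and the SparkSession parameter are assumptions made for illustration, and the data may just as well come from files.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types._

object PersonSample {
  // Schema of the nested address record
  val addressType: StructType = StructType(Seq(
    StructField("city", StringType, nullable = true),
    StructField("road", StringType, nullable = true)))

  // Schema of the person record, mirroring the printSchema() output above
  val personSchema: StructType = StructType(Seq(
    StructField("id", LongType, nullable = false),
    StructField("name", StringType, nullable = true),
    StructField("age", IntegerType, nullable = false),
    StructField("address", addressType, nullable = true),
    StructField("active", BooleanType, nullable = false),
    StructField("salary", DecimalType(38, 18), nullable = true)))

  /** Hypothetical helper: builds the two sample rows with an explicit schema. */
  def readPersonDataFrame(spark: SparkSession): DataFrame = {
    val rows = Seq(
      Row(1L, "Alice", 25, Row("London", "Baker street"), true, new java.math.BigDecimal("50000")),
      Row(2L, "Bob", 35, Row("London", "Baker street"), false, null))
    spark.createDataFrame(spark.sparkContext.parallelize(rows), personSchema)
  }
}
```

Passing the schema explicitly keeps the nullable flags under control; with inferred schemas Spark decides on nullability itself.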
Generate case class: Derive from data frame
The DataFrameToCaseClassConverter provides two methods:
- generateCaseClass() uses the schema to generate the case class
- generateCompanionObject() generates the companion object which can be used in tests
Besides relying on the nullable flag of the schema, the converter offers an alternative way to decide whether an attribute is mandatory or optional: it inspects the data in the given data frame, and if a column contains at least one null value, the attribute is declared as a Scala Option in the case class.
package com.rhaag.spark.notebook.core.gen

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{BooleanType, DateType, DecimalType, DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType}

import java.time.{LocalDateTime, ZoneOffset}

case class ConverterConfig(dataFrame: DataFrame,
                           packagename: String,
                           className: String,
                           deriveNullableFromDataFrame: Boolean,
                           deriveSampleValueFromDataFrame: Boolean,
                           emitPackage: Boolean,
                           emitImports: Boolean,
                           verbose: Boolean = false)

/**
 * @param name        The name of the attribute
 * @param dataType    A Scala type declaration
 * @param nullable    True if the attribute can hold an empty value
 * @param sampleValue A Scala expression to create an instance as defined by [[dataType]]
 */
case class Field(name: String,
                 dataType: String,
                 nullable: Boolean,
                 sampleValue: String) {

  /**
   * The full data type possibly wraps the [[dataType]] into a Scala Option
   */
  val fullDataType: String =
    if (nullable) {
      s"Option[$dataType]"
    } else {
      dataType
    }

  /**
   * The full sample value possibly creates an instance of a Scala Option
   */
  val fullSampleValue: String =
    if (nullable) {
      s"Some($sampleValue)"
    } else {
      sampleValue
    }

  val column: Column = col(name)
}

object DataFrameToCaseClassConverter {

  val ImportStatement = "import java.time.{LocalDate, ZoneOffset, ZonedDateTime}"

  def generateCaseClass(config: ConverterConfig): String = {
    val heading = generateHeading(config)
    val classSnippet =
      s"""
         |case class ${config.className}(""".stripMargin
    val attributes = extractFields(config)
      .map { f =>
        s"""  ${f.name}: ${f.fullDataType}"""
      }
    val footer =
      s"""
         |)
         |""".stripMargin
    heading + "\n" + classSnippet + "\n" + attributes.mkString(",\n") + footer
  }

  def generateCompanionObject(config: ConverterConfig): String = {
    val heading = generateHeading(config)
    val objectSnippet =
      s"""
         |object ${config.className} {
         |  def apply(): ${config.className} = {
         |    ${config.className}(""".stripMargin
    val attributes = extractFields(config)
      .map { f =>
        s"""      ${f.name} = ${f.fullSampleValue}"""
      }
    val footer =
      s"""
         |    )
         |  }
         |}
         |""".stripMargin
    heading + "\n" + objectSnippet + "\n" + attributes.mkString(",\n") + footer
  }

  private def generateHeading(config: ConverterConfig) = {
    val packageSnippet =
      if (config.emitPackage) {
        s"""
           |package ${config.packagename}""".stripMargin
      } else {
        ""
      }
    val importsSnippet =
      if (config.emitImports) {
        s"""
           |$ImportStatement""".stripMargin
      } else {
        ""
      }
    packageSnippet + "\n" + importsSnippet
  }

  private def extractFields(config: ConverterConfig): List[Field] = {
    val schema = config.dataFrame.schema
    if (config.verbose) {
      config.dataFrame.printSchema()
      schema.foreach(println)
    }
    schema
      .map { sf =>
        val column = col(sf.name)
        val dataType = deriveDataType(sf)
        val nullable =
          if (config.deriveNullableFromDataFrame) {
            // Nullable if the data contains at least one null value (one Spark job per column)
            !config.dataFrame
              .select(column)
              .filter(column.isNull)
              .isEmpty
          } else {
            // Otherwise trust the nullable flag of the schema
            sf.nullable
          }
        val sampleValue =
          if (config.deriveSampleValueFromDataFrame) {
            deriveSampleValueFromDataFrame(config, sf, column)
          } else {
            deriveSampleValueUsingConstants(sf)
          }
        Field(sf.name, dataType, nullable, sampleValue)
      }
      .toList
  }

  // Maps a Spark data type to the name of the corresponding Scala type
  private def deriveDataType(sf: StructField): String = {
    sf.dataType match {
      case StringType =>
        "String"
      case IntegerType =>
        "Int"
      case LongType =>
        "Long"
      case DoubleType =>
        "Double"
      case _: DecimalType =>
        "BigDecimal"
      case BooleanType =>
        "Boolean"
      case DateType =>
        "java.sql.Date"
      case TimestampType =>
        "java.sql.Timestamp"
      case _: StructType =>
        // A nested record refers to a case class named after the attribute
        sf.name.capitalize
      case _ =>
        throw new IllegalArgumentException(s"Not supported: ${sf.name}: ${sf.dataType}")
    }
  }

  // Returns a Scala expression with a fixed sample value for the given data type
  private def deriveSampleValueUsingConstants(sf: StructField): String = {
    sf.dataType match {
      case StringType =>
        s"""\"${sf.name}\""""
      case IntegerType =>
        "1"
      case LongType =>
        "2L"
      case DoubleType =>
        "3.45"
      case _: DecimalType =>
        "BigDecimal(\"4.56\")"
      case BooleanType =>
        "true"
      case DateType =>
        "java.sql.Date.valueOf(LocalDate.of(2000, 12, 31))"
      case TimestampType =>
        "java.sql.Timestamp.valueOf(ZonedDateTime.of(2005, 6, 1, 23, 58, 59, 0, ZoneOffset.UTC).toLocalDateTime)"
      case _: StructType =>
        // Relies on the apply() of the nested record's companion object
        s"${sf.name.capitalize}()"
      case _ =>
        throw new IllegalArgumentException(s"Not supported: ${sf.name}: ${sf.dataType}")
    }
  }

  // Returns a Scala expression that recreates the first non-null value found in the column
  private def deriveSampleValueFromDataFrame(config: ConverterConfig, sf: StructField, column: Column): String = {
    // head() throws a NoSuchElementException if the column holds only null values
    val value = config.dataFrame
      .select(column)
      .filter(column.isNotNull)
      .head()
      .get(0)
    sf.dataType match {
      case StringType =>
        s"""\"$value\""""
      case IntegerType =>
        s"$value"
      case LongType =>
        s"${value}L"
      case DoubleType =>
        s"$value"
      case _: DecimalType =>
        // Triple quotes avoid escaped quotes inside a single-quoted interpolated string,
        // which older Scala compilers reject
        s"""BigDecimal("$value")"""
      case BooleanType =>
        s"$value"
      case DateType =>
        val date = value.asInstanceOf[java.sql.Date].toLocalDate
        s"java.sql.Date.valueOf(LocalDate.of(${date.getYear}, ${date.getMonthValue}, ${date.getDayOfMonth}))"
      case TimestampType =>
        val javaSqlTs = value.asInstanceOf[java.sql.Timestamp]
        // UTC wall-clock time, truncated to whole seconds; note that the generated
        // Timestamp.valueOf call interprets it in the JVM default time zone
        val ts = LocalDateTime.ofEpochSecond(javaSqlTs.getTime / 1000, 0, ZoneOffset.UTC)
        s"java.sql.Timestamp.valueOf(ZonedDateTime.of(${ts.getYear}, ${ts.getMonthValue}, ${ts.getDayOfMonth}, ${ts.getHour}, ${ts.getMinute}, ${ts.getSecond}, 0, ZoneOffset.UTC).toLocalDateTime)"
      case _: StructType =>
        s"${sf.name.capitalize}()"
      case _ =>
        throw new IllegalArgumentException(s"Not supported: ${sf.name}: ${sf.dataType}")
    }
  }
}
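The null detection in extractFields runs one Spark job per column. On wide data frames it may be cheaper to count the nulls of all columns in a single aggregation. The following is a sketch of that alternative, not part of the converter: the object and method names are hypothetical, and it assumes top-level column names without dots or other special characters.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, count, when}

object NullableColumns {
  /** Names of all top-level columns that hold at least one null value. */
  def columnsWithNulls(df: DataFrame): Set[String] = {
    // when(...) yields 1 for null cells and null otherwise; count ignores nulls,
    // so each aggregate is the number of null cells in that column
    val nullCounts = df.columns.map(c => count(when(col(c).isNull, 1)).as(c))
    val row = df.select(nullCounts: _*).head()
    df.columns.zipWithIndex.collect {
      case (name, index) if row.getLong(index) > 0 => name
    }.toSet
  }
}
```

The resulting set could then be consulted per attribute instead of filtering the data frame once for every column.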
Usage
First a little helper method to generate both the case class and the companion object:
val packageName = "com.rhaag.spark.notebook.core.gen.output"

def gen(df: DataFrame, className: String): Unit = {
  val caseClassConverterConfig = ConverterConfig(
    df,
    packageName,
    className,
    deriveNullableFromDataFrame = true,
    deriveSampleValueFromDataFrame = true,
    emitPackage = true,
    emitImports = true)
  val caseClass = DataFrameToCaseClassConverter
    .generateCaseClass(caseClassConverterConfig)
  println(caseClass)
  val companionObjectConverterConfig =
    caseClassConverterConfig.copy(
      emitPackage = false,
      emitImports = false)
  val companionObject = DataFrameToCaseClassConverter
    .generateCompanionObject(companionObjectConverterConfig)
  println(companionObject)
}
The helper is used like this:
val personDf = readPersonDataFrame()
personDf.show(truncate = false)
personDf.printSchema()

val addressDf = personDf.select($"address.*")
addressDf.show(truncate = false)
addressDf.printSchema()

println("----")
locally {
  val className = "Address"
  gen(addressDf, className)
}
locally {
  val className = "Person"
  gen(personDf, className)
}
println("----")
The nested address record is extracted as a separate data frame, and that one is fed into the converter as well.
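With many nested records, extracting each one by hand becomes tedious. As a sketch under the assumption that only top-level struct columns matter, the nested data frames could be collected from the schema; the object and method names below are hypothetical.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType

object NestedStructs {
  /** One (class name, flattened data frame) pair per top-level struct column. */
  def structDataFrames(df: DataFrame): Seq[(String, DataFrame)] =
    df.schema.fields.toSeq.collect {
      case sf if sf.dataType.isInstanceOf[StructType] =>
        // "address.*" expands the struct into its attributes, as in the manual example
        sf.name.capitalize -> df.select(col(s"${sf.name}.*"))
    }
}
```

Each pair can be passed to the gen helper, for example NestedStructs.structDataFrames(personDf).foreach { case (name, nested) => gen(nested, name) }. Structs nested more deeply would need a recursive variant.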
Result
The snippet above emits the following two pieces of code:
package com.rhaag.spark.notebook.core.gen.output
import java.time.{LocalDate, ZoneOffset, ZonedDateTime}
case class Address(
  city: String,
  road: String
)
object Address {
  def apply(): Address = {
    Address(
      city = "London",
      road = "Baker street"
    )
  }
}

package com.rhaag.spark.notebook.core.gen.output
import java.time.{LocalDate, ZoneOffset, ZonedDateTime}
case class Person(
  id: Long,
  name: String,
  age: Int,
  address: Address,
  active: Boolean,
  salary: Option[BigDecimal]
)
object Person {
  def apply(): Person = {
    Person(
      id = 1L,
      name = "Alice",
      age = 25,
      address = Address(),
      active = true,
      salary = Some(BigDecimal("50000.000000000000000000"))
    )
  }
}
The companion object provides an apply() to create an instance quickly for tests.
Conclusion
The converter is far from perfect, but it generates at least 80% of the code needed to get started. The case class can now be used in the declaration of data sets (Dataset[Person]), comments can be added for each attribute, and the case class can serve as a source of test data.
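To make these uses concrete, here is a small sketch that combines a typed transformation with test data taken from the companion objects. The generated classes are repeated in shortened form so that the example is complete; TypedPersons, activePersons and sample are hypothetical names.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Generated classes from the Result section, condensed
case class Address(city: String, road: String)
object Address {
  def apply(): Address = Address(city = "London", road = "Baker street")
}

case class Person(id: Long, name: String, age: Int, address: Address,
                  active: Boolean, salary: Option[BigDecimal])
object Person {
  def apply(): Person = Person(id = 1L, name = "Alice", age = 25, address = Address(),
    active = true, salary = Some(BigDecimal("50000.000000000000000000")))
}

object TypedPersons {
  /** Hypothetical transformation: the signature states the input and output record types. */
  def activePersons(persons: Dataset[Person]): Dataset[Person] =
    persons.filter(p => p.active)

  /** Test-style usage: sample records start from the companion object's apply(). */
  def sample(spark: SparkSession): Dataset[Person] = {
    import spark.implicits._
    val alice = Person()
    // copy() varies only the attributes a test cares about
    val bob = alice.copy(id = 2L, name = "Bob", age = 35, active = false, salary = None)
    activePersons(Seq(alice, bob).toDS())
  }
}
```

An existing data frame with a matching schema can be converted in the same way with personDf.as[Person] once spark.implicits._ is imported.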