http://stackoverflow.com/questions/36648128/how-to-store-custom-objects-in-a-dataset
http://stackoverflow.com/questions/39433419/encoder-error-while-trying-to-map-dataframe-row-to-updated-row
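Independent of version, you can simply use the DataFrame API:
import org.apache.spark.sql.functions.{when, lower}
// toDF and the $"..." column syntax need the session implicits (spark-shell imports them for you)
import spark.implicits._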
val df = Seq(
  (2012, "Tesla", "S"), (1997, "Ford", "E350"),
  (2015, "Chevy", "Volt")
).toDF("year", "make", "model")
df.withColumn("make", when(lower($"make") === "tesla", "S").otherwise($"make"))
If you really want to use map, you should use a statically typed Dataset:
import spark.implicits._
case class Record(year: Int, make: String, model: String)
df.as[Record].map {
  case tesla if tesla.make.toLowerCase == "tesla" => tesla.copy(make = "S")
  case rec => rec
}
Or at least return an object that has an implicit encoder, such as a tuple:
import org.apache.spark.sql.Row  // needed for the Row(...) pattern
df.map {
  case Row(year: Int, make: String, model: String) =>
    (year, if (make.toLowerCase == "tesla") "S" else make, model)
}
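One side effect of returning a tuple is that the result is a Dataset[(Int, String, String)], so the columns come back named _1, _2, _3. A minimal sketch of restoring the original names with toDF follows. It assumes a local SparkSession; the `spark` value, the app name, and the `mapped` / `renamed` names are only illustrative (in spark-shell, `spark` already exists).

```scala
import org.apache.spark.sql.{Row, SparkSession}

// Assumption: a throwaway local session, only needed outside spark-shell.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("tuple-map-sketch")
  .getOrCreate()
import spark.implicits._

val df = Seq(
  (2012, "Tesla", "S"), (1997, "Ford", "E350"),
  (2015, "Chevy", "Volt")
).toDF("year", "make", "model")

// Mapping to a tuple uses the implicit tuple encoder,
// but the resulting columns are named _1, _2, _3.
val mapped = df.map {
  case Row(year: Int, make: String, model: String) =>
    (year, if (make.toLowerCase == "tesla") "S" else make, model)
}

// Put the original column names back.
val renamed = mapped.toDF("year", "make", "model")
```

Note that the pattern match is not exhaustive: a row with a null in any column would raise a MatchError, so this only suits data without nulls.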
Finally, if for some completely crazy reason you really want to map over Dataset[Row], you have to provide the required encoder:
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
// Yup, it would be possible to reuse df.schema here
val schema = StructType(Seq(
  StructField("year", IntegerType),
  StructField("make", StringType),
  StructField("model", StringType)
))
val encoder = RowEncoder(schema)
df.map {
  case Row(year, make: String, model) if make.toLowerCase == "tesla" =>
    Row(year, "S", model)
  case row => row
}(encoder)
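As the comment in the code hints, the schema does not have to be spelled out by hand when the mapped rows keep the same columns and types as the input. A minimal sketch of that variant follows. It assumes a Spark version where RowEncoder(schema) is available, as in the code above, and a local SparkSession whose name and app name are only illustrative. I believe Spark 3.5 and later expose Encoders.row(schema) for the same purpose, but treat that as something to verify against your version.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.encoders.RowEncoder

// Assumption: a throwaway local session, only needed outside spark-shell.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("row-encoder-sketch")
  .getOrCreate()
import spark.implicits._

val df = Seq(
  (2012, "Tesla", "S"), (1997, "Ford", "E350"),
  (2015, "Chevy", "Volt")
).toDF("year", "make", "model")

// Reuse the input schema. This is only valid because every
// returned Row has the same fields and types as the input.
val encoder = RowEncoder(df.schema)

val updated = df.map {
  case Row(year, make: String, model) if make.toLowerCase == "tesla" =>
    Row(year, "S", model)
  case row => row
}(encoder)
```

If the function changes the shape of the row (adds, drops, or retypes a field), df.schema no longer describes the output and an explicit StructType is needed again.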