How to store custom objects in Dataset?

Asked on November 15, 2018 in Apache-spark.


  • 3 Answer(s)

    Update
          This solution is still valid and informative, although things have improved since Spark 2.2/2.3, which added built-in encoder support for Set, Seq, Map, Date, Timestamp, and BigDecimal. If you build your types using only case classes and the usual Scala types, you should be fine with just the implicits in SQLImplicits.
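
    As a quick illustration of that update, a case class built only from such types should encode out of the box on 2.2+ with nothing more than the usual import (a sketch; FancyThing is a made-up example and spark is assumed to be an existing SparkSession):

    import java.sql.Timestamp
    import spark.implicits._
    // FancyThing uses only types with built-in encoder support (Seq, Map, BigDecimal, Timestamp)
    case class FancyThing(tags: Seq[String], scores: Map[String, BigDecimal], when: Timestamp)
    val ds = spark.createDataset(Seq(
        FancyThing(Seq("a", "b"), Map("x" -> BigDecimal(1.5)), new Timestamp(0L))))
    ds.printSchema()  // expect real array/map/decimal/timestamp columns, not a binary blob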

    The exact problem is:
           When you need to create a dataset, Spark “requires an encoder (to convert a JVM object of type T to and from the internal Spark SQL representation) that is normally created automatically through implicits from a SparkSession, or can be created explicitly by calling static methods on Encoders” (taken from the docs on createDataset). An encoder takes the form Encoder[T], where T is the type you are encoding. The first suggestion is to add import spark.implicits._ (which gives you these implicit encoders) and the second suggestion is to explicitly pass in an encoder using the set of encoder-related functions on Encoders.
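
    For a type that is already supported, both suggestions look roughly like this (a sketch; SimpleObj is a made-up case class and spark an existing SparkSession):

    import org.apache.spark.sql.{Encoder, Encoders}
    import spark.implicits._
    case class SimpleObj(i: Int)
    // suggestion 1: rely on the implicit encoder brought in by spark.implicits._
    val ds1 = spark.createDataset(Seq(SimpleObj(1), SimpleObj(2)))
    // suggestion 2: build the encoder explicitly via the static methods on Encoders
    val explicitEncoder: Encoder[SimpleObj] = Encoders.product[SimpleObj]
    val ds2 = spark.createDataset(Seq(SimpleObj(3)))(explicitEncoder)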

    For regular classes, however, there is no encoder available, so:

    import spark.implicits._
    class MyObj(val i: Int)
    // ...
    val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
    

    This produces the following implicit-related compile-time error:

    Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing sqlContext.implicits._ Support for serializing other types will be added in future releases.

    However, if you wrap the type that produced the above error in some class that extends Product, the error confusingly gets delayed to runtime, so:

    import spark.implicits._
    case class Wrap[T](unwrap: T)
    class MyObj(val i: Int)
    // ...
    val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3))))
    

    This compiles fine, but fails at runtime with:

    java.lang.UnsupportedOperationException: No Encoder found for MyObj

            The reason is that the encoders Spark creates through the implicits are actually only made at runtime (via Scala reflection). In this case, all Spark checks at compile time is that the outermost class extends Product (which all case classes do), and it only realizes at runtime that it still doesn’t know what to do with MyObj (the same problem occurs if you try to make a Dataset[(Int,MyObj)] – Spark waits until runtime to barf on MyObj). These are the central problems in need of being fixed:

    Some classes that extend Product compile despite always crashing at runtime, and

        there is no way of passing in custom encoders for nested types (there is no way of feeding Spark an encoder for just MyObj such that it then knows how to encode Wrap[MyObj] or (Int,MyObj)), as sketched below.
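
    A sketch of that second point: even with a kryo encoder for MyObj alone in scope, the derived Product encoders ignore it, so (as described above) the nested case still fails only at runtime:

    import spark.implicits._
    class MyObj(val i: Int)
    // an encoder for MyObj on its own
    implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
    // the tuple encoder comes from the Product implicits, which use reflection and
    // never consult myObjEncoder, so this compiles but barfs on MyObj at runtime
    val d = spark.createDataset(Seq((1, new MyObj(1))))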

    Use kryo.

    The common suggestion is to use the kryo encoder:

    import spark.implicits._
    class MyObj(val i: Int)
    implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj]
    // ...
    val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
    

    If your code is manipulating all sorts of datasets, joining, grouping, etc., you end up racking up a bunch of extra implicits. So, why not just make an implicit that does this all automatically?

    import scala.reflect.ClassTag
    implicit def kryoEncoder[A](implicit ct: ClassTag[A]) =
        org.apache.spark.sql.Encoders.kryo[A](ct)
    

         And now, it seems like you can do almost anything you want (the example below won’t work in the spark-shell, where spark.implicits._ is automatically imported):

    class MyObj(val i: Int)
    val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3)))
    val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and ..
    val d3 = d1.map(d => (d.i, d)).alias("d3") // .. deals with the new type
    val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom!
    

    The main issue is that using kryo leads to Spark just storing every row in the dataset as a flat binary object. For map, filter, foreach that is enough, but for operations like join, Spark really needs these to be separated into columns. Inspecting the schema for d2 or d3, you see there is just one binary column:

    d2.printSchema
    // root
    // |-- value: binary (nullable = true)
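
    One way to get real columns back for a join, along the same lines as the Encoders.tuple approach shown in another answer below, is to build the tuple encoder explicitly so the key stays a proper column and only MyObj goes through kryo (a sketch, reusing d1 and MyObj from above):

    import org.apache.spark.sql.{Encoder, Encoders}
    // the Int key becomes a real column; only MyObj is stored as a kryo-serialized blob
    val tupleEncoder: Encoder[(Int, MyObj)] =
        Encoders.tuple(Encoders.scalaInt, Encoders.kryo[MyObj])
    val d2b = d1.map(d => (d.i + 1, d))(tupleEncoder).alias("d2")
    d2b.printSchema
    // root, roughly:
    //  |-- _1: integer (nullable = false)
    //  |-- _2: binary (nullable = true)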
    


    Answered on November 15, 2018.

    Use the generic encoders.

        There are two generic encoders available for now, kryo and javaSerialization, where the latter is explicitly described as:

    extremely inefficient and should only be used as the last resort.
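
    For completeness, the javaSerialization variant looks much the same as the kryo encoder below; a sketch (note that the encoded class must implement java.io.Serializable for Java serialization to work at runtime):

    import scala.reflect.ClassTag
    import org.apache.spark.sql.{Encoder, Encoders}
    // the Java-serialization counterpart of the kryo encoder; as quoted above,
    // it is extremely inefficient and should only be a last resort
    implicit def javaSerEncoder[A <: java.io.Serializable: ClassTag]: Encoder[A] =
        Encoders.javaSerialization[A]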

    Given the following class:

    class Bar(i: Int) {
        override def toString = s"bar $i"
        def bar = i
    }
    

    You can use these encoders by adding an implicit encoder:

    object BarEncoders {
        implicit def barEncoder: org.apache.spark.sql.Encoder[Bar] =
        org.apache.spark.sql.Encoders.kryo[Bar]
    }
    

    These can be used together as follows:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext
    object Main {
        def main(args: Array[String]) {
            val sc = new SparkContext("local", "test", new SparkConf())
            val sqlContext = new SQLContext(sc)
            import sqlContext.implicits._
            import BarEncoders._
     
            val ds = Seq(new Bar(1)).toDS
            ds.show
            sc.stop()
        }
    }
    

    Kryo stores objects as a binary column, so when converted to a DataFrame you get the following schema:

    root
        |-- value: binary (nullable = true)
    

    It is also possible to encode tuples using the kryo encoder for a specific field:

    val longBarEncoder = Encoders.tuple(Encoders.scalaLong, Encoders.kryo[Bar])
     
    spark.createDataset(Seq((1L, new Bar(1))))(longBarEncoder)
    // org.apache.spark.sql.Dataset[(Long, Bar)] = [_1: bigint, _2: binary]
    

    Note: the encoder is passed explicitly here rather than picked up implicitly, so this most likely won’t work with the toDS method.

    You can also use implicit conversions:

    Provide implicit conversions between a representation that can be encoded and the custom class. For instance:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.SQLContext
    object BarConversions {
        implicit def toInt(bar: Bar): Int = bar.bar
        implicit def toBar(i: Int): Bar = new Bar(i)
    }
    object Main {
        def main(args: Array[String]) {
            val sc = new SparkContext("local", "test", new SparkConf())
            val sqlContext = new SQLContext(sc)
            import sqlContext.implicits._
            import BarConversions._
     
            type EncodedBar = Int
     
            val bars: RDD[EncodedBar] = sc.parallelize(Seq(new Bar(1)))
            val barsDS = bars.toDS
     
            barsDS.show
            barsDS.map(_.bar).show
            sc.stop()
        }
    }
    
    Answered on November 15, 2018.

    In case of a Java Bean class, this can be useful:

    import spark.sqlContext.implicits._
    import org.apache.spark.sql.Encoders
    implicit val encoder = Encoders.bean[MyClass](classOf[MyClass])
    

    Now you can simply read the DataFrame as a custom DataFrame:

    dataFrame.as[MyClass]
    

    This will create a custom class encoder and not a binary one.
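
    For reference, Encoders.bean expects a Java-Bean-style class with a no-argument constructor and getters/setters; a hypothetical sketch of what MyClass could look like in Scala:

    import scala.beans.BeanProperty
    // a hypothetical MyClass: @BeanProperty generates the getters and setters
    // that Encoders.bean relies on; the default constructor takes no arguments
    class MyClass {
        @BeanProperty var id: Long = 0L
        @BeanProperty var name: String = ""
    }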

    Answered on November 15, 2018.

