Task not serializable: java.io.NotSerializableException when calling function outside closure only on classes not objects


Asked on November 14, 2018 in Apache-spark.


  • 3 Answer(s)

    Spark is a distributed computing engine, and its main abstraction is the resilient distributed dataset (RDD), which can be viewed as a distributed collection.

    An RDD's elements are partitioned across the nodes of the cluster, but Spark abstracts this away from the user, letting the user interact with the RDD (collection) as if it were a local one.

    Not to get into too many details, but when you run different transformations on an RDD (map, flatMap, filter and others), your transformation code (closure) is:

    • serialized on the driver node,
    • shipped to the appropriate nodes in the cluster,
    • deserialized,
    • and finally executed on the nodes
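
    To make this concrete, here is a minimal sketch of the failing case (a reconstruction based on the question title and the stack trace quoted below, not necessarily the asker's exact code):

    import org.apache.spark.{SparkContext,SparkConf}

    object Spark {
        val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
    }

    // Not Serializable: calling the method someFunc from inside map forces
    // Spark to serialize the whole enclosing class testing, which fails.
    class testing {
        val rddList = Spark.ctx.parallelize(List(1,2,3))

        def doIT() = {
            // throws org.apache.spark.SparkException: Task not serializable
            val after = rddList.map(someFunc(_))
            after.collect().foreach(println)
        }

        def someFunc(a: Int) = a + 1
    }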

     

     

          The problem in the question's second case is that you are calling a method, defined in class testing, from inside the map function. Spark sees that, and since methods cannot be serialized on their own, Spark tries to serialize the whole testing class, so that the code will still work when executed in another JVM. Here you have two possibilities:

    Either you make class testing serializable, so the whole class can be serialized by Spark:

    import org.apache.spark.{SparkContext,SparkConf}

    object Spark {
        val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
    }

    object NOTworking extends App {
        new Test().doIT
    }

    // Extending java.io.Serializable lets Spark serialize the whole class.
    class Test extends java.io.Serializable {
        val rddList = Spark.ctx.parallelize(List(1,2,3))

        def doIT() = {
            val after = rddList.map(someFunc)
            after.collect().foreach(println)
        }

        def someFunc(a: Int) = a + 1
    }
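
    Running NOTworking now succeeds and prints 2, 3 and 4 (the object keeps its name from the failing example, but it does work).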
    

    Or you make someFunc a function instead of a method (functions are objects in Scala), so that Spark can serialize it on its own:

    import org.apache.spark.{SparkContext,SparkConf}

    object Spark {
        val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
    }

    object NOTworking extends App {
        new Test().doIT
    }

    class Test {
        val rddList = Spark.ctx.parallelize(List(1,2,3))

        def doIT() = {
            val after = rddList.map(someFunc)
            after.collect().foreach(println)
        }

        // A function value: an ordinary (serializable) object, unlike a method.
        val someFunc = (a: Int) => a + 1
    }
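
    This works even though Test itself is not Serializable: someFunc is evaluated on the driver to a function object that captures nothing, so only that small object is shipped to the executors, not the whole Test instance.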
    

    Note: rddList.map(someFunc(_)) and rddList.map(someFunc) are exactly the same. Usually the second form is preferred, as it is less verbose and easier to read.
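
    For illustration (inc is a hypothetical function value, rddList an RDD as in the examples above):

    val inc = (a: Int) => a + 1
    rddList.map(inc(_))   // placeholder syntax, wraps inc in another function
    rddList.map(inc)      // passes inc directly; identical behaviour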

    In the failing case (a method called from inside the closure, as in the sketch above), this is what gets printed to stdout:

    Serialization stack:
        - object not serializable (class: testing, value: testing@...)
        - field (class: testing$$anonfun$1, name: $outer, type: class testing)
        - object (class testing$$anonfun$1, <function1>)
    
    Answered on November 14, 2018.

         The above solution is not always flexible; consider the case where your closure includes a method call on a non-Serializable class that you have no control over. You can neither add the Serializable tag to that class nor change the underlying implementation to turn the method into a function.

    However, the solution can be made both cleaner and more general:

    def genMapper[A, B](f: A => B): A => B = {
        // MeatLocker (from Twitter's chill library) serializes f with Kryo
        // up front, so f itself does not need to be Java-serializable.
        val locker = com.twitter.chill.MeatLocker(f)
        x => locker.get.apply(x)
    }
    

    This function-serializer can then be used to automatically wrap closures and method calls:

    rdd map genMapper(someFunc)
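
    For example, here is a sketch of wrapping a method on a class you cannot modify (NotSerializableHelper, GenMapperExample and run are hypothetical names; this assumes the chill library is on the classpath):

    import org.apache.spark.rdd.RDD

    // Hypothetical third-party class that is not Serializable and cannot be modified.
    class NotSerializableHelper {
        def addOne(a: Int): Int = a + 1
    }

    object GenMapperExample {
        def genMapper[A, B](f: A => B): A => B = {
            val locker = com.twitter.chill.MeatLocker(f)
            x => locker.get.apply(x)
        }

        def run(rdd: RDD[Int]): Array[Int] = {
            val helper = new NotSerializableHelper
            // MeatLocker Kryo-serializes the wrapped closure (including its
            // reference to helper) up front, so the usual
            // java.io.NotSerializableException never fires.
            rdd.map(genMapper(helper.addOne _)).collect()
        }
    }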
    

     

    Answered on November 14, 2018.

        All of the above applies to Scala, but in Java I solved the NotSerializableException by refactoring my code so that the closure did not access a non-serializable final field.
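
    The same refactoring pattern works in Scala as well: copy the value you need out of the non-serializable object into a local val before building the closure, so the closure captures only that local copy. A minimal sketch (NonSerializableConfig and RefactoredJob are hypothetical):

    import org.apache.spark.rdd.RDD

    // Hypothetical non-serializable holder we cannot change.
    class NonSerializableConfig {
        val offset: Int = 10
    }

    object RefactoredJob {
        def run(rdd: RDD[Int]): Array[Int] = {
            val config = new NonSerializableConfig
            // Copy the field into a local val first: the closure below then
            // captures only `offset` (an Int), not the non-serializable `config`.
            val offset = config.offset
            rdd.map(_ + offset).collect()
        }
    }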

    Answered on November 14, 2018.

