Using reduceByKey in Apache Spark (Scala)

Asked on January 21, 2019 in Apache-spark.


  • 9 Answer(s)

    Start by mapping each (id, uri, count) triple to a ((id, uri), count) pair:

    val byKey = x.map({case (id,uri,count) => (id,uri)->count})
    

    Then sum the counts for each (id, uri) key:

    val reducedByKey = byKey.reduceByKey(_ + _)
     
    scala> reducedByKey.collect.foreach(println)
    ((a,d),1)
    ((a,b),2)
    ((c,b),1)
    

    PairRDDFunctions[K,V].reduceByKey takes a reduce function that is applied to the values (of type V) of an RDD[(K,V)]; Spark requires this function to be associative and commutative.

    In other words, you need a function f(e1: V, e2: V): V. In this case, with a sum on Ints, that is (x: Int, y: Int) => x + y, or _ + _ in short underscore notation.

    Here reduceByKey performs better than groupByKey because it applies the reduce function locally within each partition before the shuffle/reduce phase, so fewer records are sent over the network. groupByKey forces a shuffle of all elements before grouping.
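To make the partition-local step concrete, here is a minimal sketch that simulates it with plain Scala collections rather than Spark. Each inner List stands for one partition, and the helper names (combineLocally, reduceByKeyLocal, shuffledWithGroupByKey, shuffledWithReduceByKey) are hypothetical, not Spark APIs. It assumes the example data happens to be split into two partitions as shown, and counts how many records would leave the partitions under each strategy.

```scala
// Spark-free simulation: each inner List stands for one partition.
// All helper names are hypothetical; none of them are Spark APIs.
object MapSideCombineSketch {
  type Partition[K, V] = List[(K, V)]

  // Partition-local step: merge values per key inside one partition,
  // so at most one record per key leaves that partition.
  def combineLocally[K, V](part: Partition[K, V], f: (V, V) => V): Partition[K, V] =
    part.groupBy(_._1).toList.map { case (k, kvs) => k -> kvs.map(_._2).reduce(f) }

  // Post-shuffle step: merge the pre-combined records from all partitions.
  def reduceByKeyLocal[K, V](parts: List[Partition[K, V]], f: (V, V) => V): Map[K, V] =
    parts
      .flatMap(p => combineLocally(p, f))
      .groupBy(_._1)
      .map { case (k, kvs) => k -> kvs.map(_._2).reduce(f) }

  // Records that would be shuffled: every record for groupByKey ...
  def shuffledWithGroupByKey[K, V](parts: List[Partition[K, V]]): Int =
    parts.map(_.size).sum

  // ... but only the locally combined records for reduceByKey.
  def shuffledWithReduceByKey[K, V](parts: List[Partition[K, V]], f: (V, V) => V): Int =
    parts.map(p => combineLocally(p, f).size).sum

  val add: (Int, Int) => Int = _ + _

  // The question's data, assumed to be split into two partitions.
  val partitions: List[Partition[(String, String), Int]] = List(
    List((("a", "b"), 1), (("a", "b"), 1)),
    List((("c", "b"), 1), (("a", "d"), 1))
  )

  def main(args: Array[String]): Unit = {
    println(reduceByKeyLocal(partitions, add))
    println(s"groupByKey would shuffle ${shuffledWithGroupByKey(partitions)} records")
    println(s"reduceByKey would shuffle ${shuffledWithReduceByKey(partitions, add)} records")
  }
}
```

With this split, groupByKey would ship all 4 records, while the local combine leaves 3, because the two ((a,b),1) records in the first partition collapse into one. Real Spark shuffles also involve serialization and partitioner details that this sketch ignores.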

    Answered on January 21, 2019.

    reduceByKey can only be used on an RDD[(K, V)], while the original data structure is RDD[(String, String, Int)]. So map it to key-value form first, reduce, and then regroup by id:

    val kv = x.map(e => e._1 -> e._2 -> e._3) // kv is RDD[((String, String), Int)]
    val reduced = kv.reduceByKey(_ + _) // reduced is RDD[((String, String), Int)]
    val kv2 = reduced.map(e => e._1._1 -> (e._1._2 -> e._2)) // kv2 is RDD[(String, (String, Int))]
    val grouped = kv2.groupByKey() // grouped is RDD[(String, Iterable[(String, Int)])]
    grouped.foreach(println)
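The same four steps can be traced without a cluster. The sketch below assumes plain Scala Lists and groupBy in place of RDDs (it is not Spark code), applies each transformation to the example x, and returns a Map from id to the set of (uri, count) pairs. A Set is used because the order inside a Spark group is not guaranteed.

```scala
// Plain Scala Lists stand in for RDDs; this mirrors the answer's
// map -> reduceByKey -> map -> groupByKey steps but is not Spark code.
object PipelineSketch {
  val x: List[(String, String, Int)] = List(
    ("a", "b", 1), ("a", "b", 1), ("c", "b", 1), ("a", "d", 1)
  )

  def run(data: List[(String, String, Int)]): Map[String, Set[(String, Int)]] = {
    // (id, uri, count) -> ((id, uri), count); `->` associates to the left
    val kv = data.map(e => e._1 -> e._2 -> e._3)
    // reduceByKey(_ + _): sum the counts for each (id, uri) key
    val reduced = kv.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).sum }
    // Re-key by id alone: ((id, uri), n) -> (id, (uri, n))
    val kv2 = reduced.toList.map(e => e._1._1 -> (e._1._2 -> e._2))
    // groupByKey: collect each id's (uri, n) pairs into a Set,
    // since Spark does not guarantee the order within a group
    kv2.groupBy(_._1).map { case (id, pairs) => id -> pairs.map(_._2).toSet }
  }

  def main(args: Array[String]): Unit =
    run(x).foreach(println)
}
```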
    
    Answered on January 21, 2019.

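    The signature, as declared on JavaPairRDD (Spark's Java-facing API), is:

    reduceByKey(func: Function2[V, V, V]): JavaPairRDD[K, V]

    For each key, it takes that key's values (which all have the same type V), combines them with the given function, and returns a value of that same type, so the result has the same element type as the parent RDD. On a Scala RDD[(K, V)], the equivalent method in PairRDDFunctions is reduceByKey(func: (V, V) => V): RDD[(K, V)].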
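Because the function has type (V, V) => V, a result whose type differs from the values (an average, say) needs the values reshaped first. Below is a minimal, Spark-free sketch; reduceByKeyLocal is a hypothetical helper that mimics what reduceByKey computes, and the scores data is made up for illustration.

```scala
// Spark-free illustration of the (V, V) => V constraint.
// reduceByKeyLocal is a hypothetical helper that mimics what reduceByKey computes.
object SameTypeSketch {
  def reduceByKeyLocal[K, V](data: Seq[(K, V)])(f: (V, V) => V): Map[K, V] =
    data.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(f) }

  // Made-up example data: (key, score)
  val scores: Seq[(String, Int)] = Seq(("a", 4), ("b", 3), ("a", 3), ("b", 5), ("b", 1))

  // A sum fits directly: (Int, Int) => Int is (V, V) => V with V = Int.
  val sums: Map[String, Int] = reduceByKeyLocal(scores)(_ + _)

  // An average does not fit (Int, Int) => Int, so each value is first turned
  // into a (sum, count) pair; V becomes (Int, Int) and f still returns V.
  val sumCounts: Map[String, (Int, Int)] =
    reduceByKeyLocal(scores.map { case (k, v) => (k, (v, 1)) }) { (p, q) =>
      (p._1 + q._1, p._2 + q._2)
    }

  // The division happens after the reduce, outside the (V, V) => V function.
  val averages: Map[String, Double] =
    sumCounts.map { case (k, (s, n)) => k -> s.toDouble / n }

  def main(args: Array[String]): Unit = {
    println(sums)
    println(averages)
  }
}
```

In Spark, the reshaping step would usually be written with mapValues on the pair RDD, for example rdd.mapValues(v => (v, 1)) followed by the same tuple-summing reduceByKey.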

    Answered on January 21, 2019.

    The signature (on JavaPairRDD) is:

    reduceByKey(func: Function2[V, V, V]): JavaPairRDD[K, V]

    For the same key, it takes the values (which all have the same type), combines them with the function you pass in, and returns a value of that same type, just as in the parent RDD.

    Answered on January 24, 2019.

    val x = sc.parallelize(List(
    ("a", "b", 1),
    ("a", "b", 1),
    ("c", "b", 1),
    ("a", "d", 1))
    )

    Answered on January 26, 2019.

    I have a list of tuples of type (user id, name, count).

    For example,

    val x = sc.parallelize(List(
    ("a", "b", 1),
    ("a", "b", 1),
    ("c", "b", 1),
    ("a", "d", 1))
    )

    I'm trying to reduce this collection so that, for each user id, the counts of each name are summed.

    So x above should become:

    (a,ArrayBuffer((d,1), (b,2)))
    (c,ArrayBuffer((b,1)))

    Here is the code I am currently using:

    val byKey = x.map({case (id,uri,count) => (id,uri)->count})

    val grouped = byKey.groupByKey
    val count = grouped.map{case ((id,uri),count) => ((id),(uri,count.sum))}
    val grouped2: org.apache.spark.rdd.RDD[(String, Iterable[(String, Int)])] = count.groupByKey

    grouped2.foreach(println)
    I'm trying to use reduceByKey because it performs better than groupByKey.

    How can I use reduceByKey instead of the code above to produce the same mapping?

    Following your code:

    val byKey = x.map({case (id,uri,count) => (id,uri)->count})
    You could do:

    val reducedByKey = byKey.reduceByKey(_ + _)

    scala> reducedByKey.collect.foreach(println)
    ((a,d),1)
    ((a,b),2)
    ((c,b),1)
    PairRDDFunctions[K,V].reduceByKey takes an associative reduce function that is applied to the values of type V in an RDD[(K,V)]. In other words, you need a function f(e1: V, e2: V): V. In this particular case, with a sum on Ints, that is (x: Int, y: Int) => x + y, or _ + _ in short underscore notation.
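Associativity matters because Spark reduces each partition separately and then merges the partial results, so how the operations are grouped depends on how the data is partitioned. The local simulation below (the combine helper and the two splits are hypothetical, not Spark's implementation) reduces the same four values for one key under two partitionings: addition gives 18 both times, while subtraction gives 4 and 10.

```scala
// Local simulation of "reduce each partition, then merge the partial results".
// combine and the two splits are hypothetical, not Spark's implementation.
object AssociativitySketch {
  def combine(partitions: List[List[Int]], f: (Int, Int) => Int): Int =
    partitions.map(_.reduce(f)).reduce(f)

  // The same four values for one key, partitioned in two different ways
  val splitA: List[List[Int]] = List(List(10, 4), List(3, 1))
  val splitB: List[List[Int]] = List(List(10), List(4, 3, 1))

  val plus: (Int, Int) => Int = _ + _  // associative and commutative
  val minus: (Int, Int) => Int = _ - _ // neither

  def main(args: Array[String]): Unit = {
    println(s"plus:  ${combine(splitA, plus)} vs ${combine(splitB, plus)}")   // same result
    println(s"minus: ${combine(splitA, minus)} vs ${combine(splitB, minus)}") // differs
  }
}
```

Spark's documentation also asks for commutativity, since the order in which partial results are merged is not fixed either.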

    For the record: reduceByKey performs better than groupByKey because it attempts to apply the reduce function locally before the shuffle/reduce phase, whereas groupByKey forces a shuffle of all elements before grouping.

    Your original data structure is RDD[(String, String, Int)], and reduceByKey can only be used if the data structure is RDD[(K, V)]:

    val kv = x.map(e => e._1 -> e._2 -> e._3) // kv is RDD[((String, String), Int)]
    val reduced = kv.reduceByKey(_ + _) // reduced is RDD[((String, String), Int)]
    val kv2 = reduced.map(e => e._1._1 -> (e._1._2 -> e._2)) // kv2 is RDD[(String, (String, Int))]
    val grouped = kv2.groupByKey() // grouped is RDD[(String, Iterable[(String, Int)])]
    grouped.foreach(println)

    Answered on January 27, 2019.

    scala> val rdd = sc.parallelize(List(("a", (1,1,1)), ("b", (1,1,1)), ("a", (1,1,1)), ("a", (1,1,1)), ("b", (1,1,1)), ("b", (1,1,1)), ("b", (1,1,1)), ("b", (1,1,1))))
    scala> rdd.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2, x._3 + y._3)).collect
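For the data in this answer, the component-wise sum gives 3 in every field for key a (which appears three times) and 5 in every field for key b (five times). Here is a local check with plain Scala collections, assuming groupBy plus reduce as a stand-in for reduceByKey (this is not Spark code):

```scala
// Plain-collection check of the answer's reduceByKey on triples (not Spark code).
object TupleSumSketch {
  val data: List[(String, (Int, Int, Int))] = List(
    ("a", (1, 1, 1)), ("b", (1, 1, 1)), ("a", (1, 1, 1)), ("a", (1, 1, 1)),
    ("b", (1, 1, 1)), ("b", (1, 1, 1)), ("b", (1, 1, 1)), ("b", (1, 1, 1))
  )

  // The answer's reduce function: component-wise sum of two triples
  val addTriples: ((Int, Int, Int), (Int, Int, Int)) => (Int, Int, Int) =
    (x, y) => (x._1 + y._1, x._2 + y._2, x._3 + y._3)

  // Stand-in for rdd.reduceByKey(addTriples).collect
  val reduced: Map[String, (Int, Int, Int)] =
    data.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).reduce(addTriples) }

  def main(args: Array[String]): Unit =
    reduced.foreach(println)
}
```

In the Spark shell, collect returns these same pairs as an Array, in an order that is not guaranteed.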

    Answered on February 3, 2019.
    val byKey = x.map({case (id,uri,count) => (id,uri)->count})

    You could do:

    val reducedByKey = byKey.reduceByKey(_ + _)
    
    scala> reducedByKey.collect.foreach(println)
    ((a,d),1)
    ((a,b),2)
    ((c,b),1)
    Answered on February 5, 2019.

    The signature is:

    reduceByKey(func: Function2[V, V, V]): JavaPairRDD[K, V]

    That is, for the same key it takes the values (which all have the same type), applies the function you provide to combine them, and returns a value of that same type, as in the parent RDD.

    Answered on February 23, 2019.