joe's Profile

3044 Points · 239 Questions · 535 Answers

  • With Spark closures, when a large array is accessed inside a transformation (for instance some dummy lookup data), that array is shipped to each Spark node as part of the closure.

    E.g. on a 10-node cluster with 100 partitions (10 partitions per node), the array is shipped 100 times in total, i.e. 10 times to each node.

    When a broadcast variable is used instead, the array is distributed only once per node, using an efficient p2p protocol.

    val array: Array[Int] = ??? // some huge array
    
    val broadcasted = sc.broadcast(array)
    
    

    With a plain RDD:

    val rdd: RDD[Int] = ???
    

    In this case the array will be shipped with the closure every time:

    rdd.map(i => array.contains(i))
    

    With the broadcast variable, better performance is seen:

    rdd.map(i => broadcasted.value.contains(i))
    
    • 667 views
    • 10 answers
    • 0 votes
  • The problem can be solved without using array and struct:

    from pyspark.sql.types import IntegerType
    from pyspark.sql.functions import udf

    # plain Python function wrapped as a UDF that adds two columns
    def add(x, y):
        return x + y

    sum_cols = udf(add, IntegerType())

    a = spark.createDataFrame([(101, 1, 16)], ['ID', 'A', 'B'])
    a.show()
    a.withColumn('Result', sum_cols('A', 'B')).show()
    
    • 6935 views
    • 8 answers
    • 0 votes
  • Asked on January 12, 2019 in Apache-spark.

    As we have seen, cartesian gives the n^2 elements of the cartesian product of the RDD with itself. The algorithm below computes the combinations C(n,2) of an RDD without having to compute the n^2 elements first. (String is used as the element type; generalizing to a type T takes some plumbing with ClassTags that would obscure the purpose here.)

    This is more space-efficient, since it computes only the C(n,2) = n!/(2!*(n-2)!) = n*(n-1)/2 elements rather than the n^2 elements of the cartesian product.

    import org.apache.spark.rdd._

    // Recursively build all unordered pairs: pair the first element with the rest,
    // then recurse on the remaining elements.
    def combs(rdd: RDD[String]): RDD[(String, String)] = {
        val count = rdd.count
        if (count < 2) {
            sc.makeRDD[(String, String)](Seq.empty)
        } else if (count == 2) {
            val values = rdd.collect
            sc.makeRDD[(String, String)](Seq((values(0), values(1))))
        } else {
            val elem = rdd.take(1)
            val elemRdd = sc.makeRDD(elem)
            val subtracted = rdd.subtract(elemRdd)
            val comb = subtracted.map(e => (elem(0), e))
            comb.union(combs(subtracted))
        }
    }
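
    For instance (a quick sketch), on a three-element RDD the function yields the C(3,2) = 3 pairs:

    val small = sc.parallelize(Seq("a", "b", "c"))
    combs(small).collect()   // Array((a,b), (a,c), (b,c)) -- order may vary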
    
    • 664 views
    • 7 answers
    • 0 votes
  • Use append mode on the DataFrameWriter; the data will be appended to the Hive table:

    data = hc.sql("select 1 as id, 10 as score")
    data.write.mode("append").saveAsTable("my_table")
    

    The output will be the same as an INSERT.
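
    For comparison, the same row could also be written with a SQL INSERT (a minimal Scala sketch, assuming a SparkSession built with Hive support; my_table is the table from the snippet above):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
    spark.sql("INSERT INTO my_table SELECT 1 AS id, 10 AS score")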

    • 584 views
    • 4 answers
    • 0 votes
  • Using row_number instead of rank gives a more precise top-n when there is rank equality (ties):

    from pyspark.sql.functions import col, row_number

    # `window` is the Window specification (partitioning/ordering) defined earlier
    n = 5
    df.select(col('*'), row_number().over(window).alias('row_number')) \
      .where(col('row_number') <= n) \
      .limit(20) \
      .toPandas()
    

    Make sure limit(20).toPandas() is used instead of show() when working in Jupyter notebooks.


    • 1298 views
    • 4 answers
    • 0 votes
  • The class conflict comes about because HBase depends on org.mortbay.jetty while Spark depends on org.eclipse.jetty. The problem can be solved by excluding the org.mortbay.jetty dependencies from HBase.

    In the case of hadoop-common, we also want to exclude javax.servlet. A working HBase/Spark setup with sbt dependencies looks like this:

    val clouderaVersion = "cdh5.2.0"
    val hadoopVersion = s"2.5.0-$clouderaVersion"
    val hbaseVersion = s"0.98.6-$clouderaVersion"
    val sparkVersion = s"1.1.0-$clouderaVersion"
     
    val hadoopCommon = "org.apache.hadoop" % "hadoop-common" % hadoopVersion % "provided" excludeAll ExclusionRule(organization = "javax.servlet")
    val hbaseCommon = "org.apache.hbase" % "hbase-common" % hbaseVersion % "provided"
    val hbaseClient = "org.apache.hbase" % "hbase-client" % hbaseVersion % "provided"
    val hbaseProtocol = "org.apache.hbase" % "hbase-protocol" % hbaseVersion % "provided"
    val hbaseHadoop2Compat = "org.apache.hbase" % "hbase-hadoop2-compat" % hbaseVersion % "provided"
    val hbaseServer = "org.apache.hbase" % "hbase-server" % hbaseVersion % "provided" excludeAll ExclusionRule(organization = "org.mortbay.jetty")
    val sparkCore = "org.apache.spark" %% "spark-core" % sparkVersion % "provided"
    val sparkStreaming = "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided"
    val sparkStreamingKafka = "org.apache.spark" %% "spark-streaming-kafka" % sparkVersion exclude("org.apache.spark", "spark-streaming_2.10")
    
    • 523 views
    • 4 answers
    • 0 votes
  • Here lit(null) can be used:

    import org.apache.spark.sql.functions.{lit, udf}
     
    case class Record(foo: Int, bar: String)
    val df = Seq(Record(1, "foo"), Record(2, "bar")).toDF
     
    val dfWithFoobar = df.withColumn("foobar", lit(null: String))
    

    The problem is that the type of this column is null:

    scala> dfWithFoobar.printSchema
    root
      |-- foo: integer (nullable = false)
      |-- bar: string (nullable = true)
      |-- foobar: null (nullable = true)
      

    It is not retained by the csv writer. If it is a hard requirement, cast the column to a specific type,

    either with a DataType:

    import org.apache.spark.sql.types.StringType
     
    df.withColumn("foobar", lit(null).cast(StringType))
    

    or with a string description:

    df.withColumn("foobar", lit(null).cast("string"))
    

    Alternatively, a UDF can be used as follows:

    val getNull = udf(() => None: Option[String]) // Or some other type
     
    df.withColumn("foobar", getNull()).printSchema
    root
      |-- foo: integer (nullable = false)
      |-- bar: string (nullable = true)
      |-- foobar: string (nullable = true)
    


    • 1112 views
    • 2 answers
    • 0 votes
  • The Spark query engine is currently limited here: Spark only handles predicate pushdown of simple types in Parquet, not nested StructTypes. The relevant JIRA ticket is linked below.

    Refer to this link: https://issues.apache.org/jira/browse/SPARK-17636
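
    To illustrate the limitation (a sketch with a hypothetical Parquet path and a hypothetical nested struct column meta): in the versions covered by the ticket, a filter on a simple top-level column shows up under PushedFilters in the Parquet scan of the physical plan, while a filter on a nested field does not.

    // assumes an existing SparkSession named `spark`
    import spark.implicits._

    val df = spark.read.parquet("/path/to/data")   // hypothetical path

    df.filter($"id" > 10).explain()             // simple column: pushed down to Parquet
    df.filter($"meta.version" === 1).explain()  // nested StructType field: not pushed down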

    • 499 views
    • 2 answers
    • 0 votes
  • Alternatively, the problem can be fixed as follows:

    • First, download the Hadoop binary (link) and place it in your home directory. If you choose a different Hadoop version, adjust the next steps accordingly.
    • Unpack the archive in the home directory with the following command: tar -zxvf hadoop_file_name
    • Add export HADOOP_HOME=~/hadoop-2.8.0 to your .bashrc file, then open a new terminal and try again.
    • 491 views
    • 2 answers
    • 0 votes
  • To run the Spark application from IntelliJ IDEA, create a main class in the src/test/scala directory (test, not main); provided dependencies will be picked up by IntelliJ there.

    object Launch {
      def main(args: Array[String]) {
        Main.main(args)
      }
    }
    
    • 403 views
    • 3 answers
    • 0 votes