seena's Profile

3042
Points

Questions
217

Answers
555

  • Asked on January 21, 2019 in Apache-spark.

    The following method can be used to solve the issue:

    reduceByKey(func: Function2[V, V, V]): JavaPairRDD[K, V]
    

    reduceByKey takes the values that share the same key in an RDD (which are necessarily of the same type) and combines them pairwise with func. The result has the same value type as the parent RDD.
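
    To make the semantics concrete, here is a minimal local sketch in plain Python. It is not the Spark API: reduce_by_key is a hypothetical helper that only mimics what reduceByKey does with a two-argument function, on an in-memory list instead of an RDD.

```python
from functools import reduce
from itertools import groupby
from operator import itemgetter


def reduce_by_key(pairs, func):
    """Hypothetical local stand-in for reduceByKey: fold the values of each key with func."""
    ordered = sorted(pairs, key=itemgetter(0))  # groupby needs input sorted by key
    return {
        key: reduce(func, (value for _, value in group))
        for key, group in groupby(ordered, key=itemgetter(0))
    }


pairs = [("a", 1), ("b", 2), ("a", 3), ("b", 4), ("c", 5)]
# func takes two values of type V and returns a V, like Function2[V, V, V]
totals = reduce_by_key(pairs, lambda x, y: x + y)
print(totals)
```

    Because func receives two values and returns one value of the same type, the reduced values keep the type they had in the input pairs.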

    • 757 views
    • 9 answers
    • 0 votes
  • Alternatively, struct can be used instead of array.

    from pyspark.sql.types import IntegerType
    from pyspark.sql.functions import udf, struct
    sum_cols = udf(lambda x: x[0]+x[1], IntegerType())
    a=spark.createDataFrame([(101, 1, 16)], ['ID', 'A', 'B'])
    a.show()
    a.withColumn('Result', sum_cols(struct('A', 'B'))).show()
    
    • 6852 views
    • 8 answers
    • 0 votes
  • Asked on January 12, 2019 in Apache-spark.

    The Cartesian product and combinations are two separate things. cartesian creates an RDD of size rdd.size() ^ 2, whereas combinations produce an RDD of size rdd.size() choose 2.

    val rdd = sc.parallelize(1 to 5)
    val combinations = rdd.cartesian(rdd).filter{ case (a,b) => a < b }
    combinations.collect()
    

    Note that this only works if an ordering is defined on the elements of the list, since we use <. This version only chooses two elements, but it can easily be extended by enforcing the relationship a < b for every pair of consecutive elements a and b in the sequence.
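
    The size difference and the extension to three elements can be checked locally. The sketch below uses plain Python lists and itertools rather than RDDs, so it only illustrates the filtering idea, not Spark itself.

```python
from itertools import combinations, product

elements = list(range(1, 6))  # same values as sc.parallelize(1 to 5)

# Cartesian product: every ordered pair, n^2 entries
cartesian = list(product(elements, repeat=2))

# Keeping only a < b leaves one representative per unordered pair: n choose 2
pairs = [(a, b) for a, b in cartesian if a < b]

# Choosing three: require the ordering between consecutive elements
triples = [(a, b, c) for a, b, c in product(elements, repeat=3) if a < b < c]

print(len(cartesian), len(pairs), len(triples))
```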

    • 648 views
    • 7 answers
    • 0 votes
  • This could be the solution for this issue:

    Properties file (mypropsfile.conf). Note: prefix each key with "spark.", otherwise the property will be ignored.

    spark.myapp.input /input/path
    spark.myapp.output /output/path
    

    Launch the application with:

    $SPARK_HOME/bin/spark-submit --properties-file mypropsfile.conf
    

    Inside the code:

    sc.getConf.get("spark.driver.host") // localhost
    sc.getConf.get("spark.myapp.input") // /input/path
    sc.getConf.get("spark.myapp.output") // /output/path
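
    The prefix rule can be illustrated with a small sketch in plain Python. This is not Spark's own parser: load_spark_props is a hypothetical helper, the key myapp.debug is made up for the example, and only whitespace-separated key/value lines are handled.

```python
def load_spark_props(text):
    """Hypothetical helper: keep only the keys that start with 'spark.'."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        parts = line.split(None, 1)  # split on the first run of whitespace
        key = parts[0]
        value = parts[1].strip() if len(parts) > 1 else ""
        if key.startswith("spark."):
            props[key] = value  # unprefixed keys are dropped
    return props


sample = """
spark.myapp.input /input/path
spark.myapp.output /output/path
myapp.debug true
"""
props = load_spark_props(sample)
print(props)
```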
    
    • 942 views
    • 2 answers
    • 0 votes
  • Asked on January 12, 2019 in Apache-spark.

    Alternatively, try the following dependencies:

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % "1.1.0",
      "org.eclipse.jetty.orbit" % "javax.servlet" % "3.0.0.v201112011016",
      "org.eclipse.jetty.orbit" % "javax.transaction" % "1.1.1.v201105210645",
      "org.eclipse.jetty.orbit" % "javax.mail.glassfish" % "1.4.1.v201005082020"
    )
    
    • 493 views
    • 2 answers
    • 0 votes
  • Here a window function is used to compute the rank of each row within its user_id partition, ordered by score, and the results are then filtered to keep only the top two rows per user.

    from pyspark.sql.window import Window
    from pyspark.sql.functions import rank, col
     
    window = Window.partitionBy(df['user_id']).orderBy(df['score'].desc())
     
    # Parentheses let the chained calls span several lines
    (df.select('*', rank().over(window).alias('rank'))
       .filter(col('rank') <= 2)
       .show())
    #+-------+---------+-----+----+
    #|user_id|object_id|score|rank|
    #+-------+---------+-----+----+
    #| user_1| object_1| 3   | 1  |
    #| user_1| object_2| 2   | 2  |
    #| user_2| object_2| 6   | 1  |
    #| user_2| object_1| 5   | 2  |
    #+-------+---------+-----+----+
    

    The Spark programming guide is a useful resource for learning Spark.

    Data

    rdd = sc.parallelize([("user_1", "object_1", 3),
                        ("user_1", "object_2", 2),
                        ("user_2", "object_1", 5),
                        ("user_2", "object_2", 2),
                        ("user_2", "object_2", 6)])
    df = sqlContext.createDataFrame(rdd, ["user_id", "object_id", "score"])
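
    For readers without a cluster at hand, the ranking logic can be reproduced locally. The sketch below is plain Python rather than PySpark; top_n_per_user is a hypothetical helper that mimics rank() over a window partitioned by user_id and ordered by descending score, with ties receiving the same rank.

```python
from collections import defaultdict

rows = [("user_1", "object_1", 3),
        ("user_1", "object_2", 2),
        ("user_2", "object_1", 5),
        ("user_2", "object_2", 2),
        ("user_2", "object_2", 6)]


def top_n_per_user(rows, n):
    """Hypothetical helper: rank rows per user by descending score, keep rank <= n."""
    by_user = defaultdict(list)
    for row in rows:
        by_user[row[0]].append(row)  # partition by user_id

    result = []
    for user in sorted(by_user):
        ordered = sorted(by_user[user], key=lambda r: r[2], reverse=True)
        for row in ordered:
            # rank = 1 + number of rows in the partition with a strictly higher score
            rank = 1 + sum(1 for other in ordered if other[2] > row[2])
            if rank <= n:
                result.append(row + (rank,))
    return result


top_two = top_n_per_user(rows, 2)
for row in top_two:
    print(row)
```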
    
    • 1260 views
    • 4 answers
    • 0 votes
  • Asked on January 12, 2019 in Apache-spark.

    The executors are being killed by YARN because of OOMs. The logs on the individual executors should be inspected (look for the text “running beyond physical memory”). If there are many executors and inspecting all of the logs manually is cumbersome, monitor the job in the Spark UI while it runs. As soon as a task fails, the UI reports the cause. Note that some tasks will report failure due to missing executors that have already been killed, so be sure to look at the cause for each individual failing task.

    Note: Most OOM problems can be solved quickly by simply repartitioning the data at appropriate places in the code. Otherwise, the machines need to be scaled up to accommodate the memory demand.

    • 948 views
    • 3 answers
    • 0 votes
  • Joining by commas is not a good idea, because fields that contain commas will not be properly quoted. For instance, ','.join(['a', 'b', '1,2,3', 'c']) gives a,b,1,2,3,c when what is needed is a,b,"1,2,3",c. Instead, use Python's csv module to convert each list in the RDD to a properly formatted CSV string:

    # python 3
    import csv, io
     
    def list_to_csv_str(x):
        """Given a list of strings, returns a properly-csv-formatted string."""
        output = io.StringIO("")
        csv.writer(output).writerow(x)
        return output.getvalue().strip() # remove extra newline
     
    # ... do stuff with your rdd ...
    rdd = rdd.map(list_to_csv_str)
    rdd.saveAsTextFile("output_directory")
    

    The csv module only writes to file objects, so an empty "file" is first created with io.StringIO("") and csv.writer is told to write the CSV-formatted string into it. After that, output.getvalue() returns the string that was written to the "file". To make this code work in Python 2, simply replace io with the StringIO module.
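
    The difference between the two approaches can be checked without Spark. The sketch below is a local Python 3 example that repeats the helper on a single list and parses both results back with csv.reader.

```python
import csv
import io


def list_to_csv_str(fields):
    """Return the fields as one properly quoted CSV line."""
    output = io.StringIO()
    csv.writer(output).writerow(fields)
    return output.getvalue().strip()  # drop the trailing line terminator


fields = ["a", "b", "1,2,3", "c"]

naive = ",".join(fields)          # the embedded commas are not quoted
quoted = list_to_csv_str(fields)  # the field with commas is wrapped in quotes

# Parsing each line back shows which one preserves the four original fields
naive_parsed = next(csv.reader([naive]))
quoted_parsed = next(csv.reader([quoted]))
print(naive, len(naive_parsed))
print(quoted, len(quoted_parsed))
```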

    If the Spark DataFrames API is used, it is also worth looking into the Databricks save function, which has a csv format.

    • 1419 views
    • 4 answers
    • 0 votes
  • Asked on January 12, 2019 in Apache-spark.

    The Spark version in the Maven config differs from the version of the Spark server installation. The two must match.

    For instance:

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>1.3.0</version>
        </dependency>
     
    </dependencies>
    
    • 468 views
    • 2 answers
    • 0 votes
  • The following steps should solve the issue:

    • First, download the Hadoop binaries.
    • Unpack the archive into a directory as your user.
    • Set HADOOP_HOME to point to that directory.
    • Add $HADOOP_HOME/lib/native to LD_LIBRARY_PATH.
    • 475 views
    • 2 answers
    • 0 votes