ashwath's Profile

Points: 3027

Questions: 225

Answers: 529

  • First, it needs to be understood that DataFrames are distributed: they cannot be accessed by index in the typical procedural way, and some analysis has to be done first. Even when working in Scala, reading the PySpark documentation is recommended here, because it contains more details.

    That said, some methods of the RDD API can be used, because every DataFrame has an RDD as an attribute.

    For instance:

    df = sqlContext.createDataFrame([("a", 1), ("b", 2), ("c", 3)], ["letter", "name"])
    myIndex = 1
    values = (df.rdd.zipWithIndex()                    # pair each row with its index
            .filter(lambda pair: pair[1] == myIndex)   # keep only the row at myIndex
            .map(lambda pair: tuple(pair[0]))          # drop the index, keep the values
            .collect())
     
    print(values[0])
    # (u'b', 2)
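
    The same idea can be wrapped in a small helper; a minimal sketch (the function name row_at_index is just for illustration, reusing the df built above):

    def row_at_index(df, index):
        """Return the values of the row at position `index` (0-based) as a plain tuple."""
        return (df.rdd.zipWithIndex()                  # pair each row with its index
                  .filter(lambda pair: pair[1] == index)
                  .map(lambda pair: tuple(pair[0]))    # Row -> tuple of values
                  .first())

    row_at_index(df, 1)  # ('b', 2)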
    
    • 13770 views
    • 15 answers
    • 0 votes
  • Asked on January 21, 2019 in Apache-spark.

    reduceByKey can be used when the data structure is RDD[(K, V)]. The original data structure here is RDD[(String, String, Int)], so the first two fields are combined into a composite key first:

    val kv = x.map(e => e._1 -> e._2 -> e._3) // kv is RDD[((String, String), Int)]
    val reduced = kv.reduceByKey(_ + _) // reduced is RDD[((String, String), Int)]
    val kv2 = reduced.map(e => e._1._1 -> (e._1._2 -> e._2)) // kv2 is RDD[(String, (String, Int))]
    val grouped = kv2.groupByKey() // grouped is RDD[(String, Iterable[(String, Int)])]
    grouped.foreach(println)
    
    • 822 views
    • 9 answers
    • 0 votes
  • When there is a need to pass several columns of the same data type to a UDF, an array can be used as the input parameter,

    for instance:

    >>> from pyspark.sql.types import IntegerType
    >>> from pyspark.sql.functions import udf, array
    >>> sum_cols = udf(lambda arr: sum(arr), IntegerType())
    >>> spark.createDataFrame([(101, 1, 16)], ['ID', 'A', 'B']) \
    ...     .withColumn('Result', sum_cols(array('A', 'B'))).show()
    +---+---+---+------+
    | ID| A | B |Result|
    +---+---+---+------+
    |101| 1 | 16| 17   |
    +---+---+---+------+
     
    >>> spark.createDataFrame([(101, 1, 16, 8)], ['ID', 'A', 'B', 'C'])\
    ...     .withColumn('Result', sum_cols(array('A', 'B', 'C'))).show()
    +---+---+---+---+------+
    | ID| A | B | C |Result|
    +---+---+---+---+------+
    |101| 1 | 16| 8 | 25   |
    +---+---+---+---+------+
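
    If every column except the identifier should feed the UDF, the names do not have to be listed by hand; a minimal sketch along the same lines (reusing sum_cols from above and assuming 'ID' is the only column to skip):

    >>> df = spark.createDataFrame([(101, 1, 16, 8)], ['ID', 'A', 'B', 'C'])
    >>> value_cols = [c for c in df.columns if c != 'ID']  # every column except 'ID'
    >>> df.withColumn('Result', sum_cols(array(*value_cols))).show()  # same output as above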
    
    • 7046 views
    • 8 answers
    • 0 votes
  • Asked on January 12, 2019 in Apache-spark.

    Once inside the spark-shell, a .jar can be added to the classpath with :require, as follows:

    scala> :require /path/to/file.jar
    Added '/path/to/file.jar' to classpath.
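
    Alternatively, the jar can be supplied when the shell is launched with --jars, which also ships it to the executors:

    $SPARK_HOME/bin/spark-shell --jars /path/to/file.jar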
    
    • 1979 views
    • 3 answers
    • 0 votes
  • Here the saveAsTable approach failed with an AnalysisException. The following works fine instead:

    data = hc.sql("select 1 as id, 10 as score")
    data.write.mode("append").insertInto("my_table")
    

    Spark v2.1.0 was used here.
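
    Note that insertInto only appends (by column position) into a table that already exists; a minimal sketch of creating it once beforehand, with column definitions that just mirror the example above:

    # my_table must already exist before insertInto is called
    hc.sql("CREATE TABLE IF NOT EXISTS my_table (id INT, score INT)")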

    • 608 views
    • 4 answers
    • 0 votes
  • Asked on January 12, 2019 in Apache-spark.

    Here the dependency was defined as:

    "org.apache.spark" %% "spark-core" % "1.0.2"
    

    In the above code, %% instructs sbt to substitute the current Scala version into the artifact name. Apparently Spark was built for the whole Scala 2.10 family, without publishing specific jars for 2.10.1, 2.10.2, and so on.

    So we need to redefine:

    "org.apache.spark" % "spark-core_2.10" % "1.0.2"
    
    • 572 views
    • 2 answers
    • 0 votes
  • This could be very helpful and it works: mark the Spark dependencies as "provided" and exclude the conflicting javax.servlet artifacts:

    libraryDependencies ++= Seq(
        "org.apache.spark" %% "spark-core" % sparkVersion.value % "provided",
        "org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided",
        ....................................................................
    ).map(_.excludeAll(ExclusionRule(organization = "javax.servlet")))
    
    • 551 views
    • 4 answers
    • 0 votes
  • Asked on January 12, 2019 in Apache-spark.

    Set spark.network.timeout to a higher value in spark-defaults.conf; a sample entry is shown below.
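
    For reference, the corresponding entry in spark-defaults.conf would look roughly like this (the value just mirrors the spark-submit command further down):

    # $SPARK_HOME/conf/spark-defaults.conf
    spark.network.timeout    10000000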

    The timeout can also be set with spark-submit, as follows:

    $SPARK_HOME/bin/spark-submit --conf spark.network.timeout=10000000 --class myclass.neuralnet.TrainNetSpark --master spark://master.cluster:7077 --driver-memory 30G --executor-memory 14G --num-executors 7 --executor-cores 8 --conf spark.driver.maxResultSize=4g --conf spark.executor.heartbeatInterval=10000000 path/to/my.jar
    
    • 993 views
    • 3 answers
    • 0 votes
  • Here a two-column RDD is written to a single CSV file in PySpark 1.6.2.

    For the RDD:

    >>> rdd.take(5)
    [(73342, u'cells'), (62861, u'cell'), (61714, u'studies'), (61377, u'aim'), (60168, u'clinical')]
    

    The code will be:

    # First convert the RDD to a DataFrame
    # (sqlContext is the SQLContext that the pyspark 1.6 shell provides)
    df = sqlContext.createDataFrame(rdd, ['count', 'word'])
    

    For the DF:

    >>> df.show()
    +-----+-----------+
    |count| word      |
    +-----+-----------+
    |73342| cells     |
    |62861| cell      |
    |61714| studies   |
    |61377| aim       |
    |60168| clinical  |
    |59275| 2         |
    |59221| 1         |
    |58274| data      |
    |58087|development|
    |56579| cancer    |
    |50243| disease   |
    |49817| provided  |
    |49216| specific  |
    |48857| health    |
    |48536| study     |
    |47827| project   |
    |45573|description|
    |45455| applicant |
    |44739| program   |
    |44522| patients  |
    +-----+-----------+
    only showing top 20 rows
    

    Then it is written to CSV:

    # Write CSV (I have HDFS storage)
    df.coalesce(1).write.format('com.databricks.spark.csv').options(header='true').save('file:///home/username/csv_out')
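
    As a side note, from Spark 2.x onward the built-in CSV writer can be used instead of the spark-csv package; a minimal sketch with the same DataFrame and output path:

    # Spark 2.x and later: no external package needed
    df.coalesce(1).write.option('header', 'true').csv('file:///home/username/csv_out')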
    
    • 1508 views
    • 4 answers
    • 0 votes
  • This comes down to creating another subproject for running the project locally, as follows:

    We need to change the build.sbt file like this:

    lazy val sparkDependencies = Seq(
      "org.apache.spark" %% "spark-streaming" % sparkVersion
    )

    libraryDependencies ++= sparkDependencies.map(_ % "provided")

    lazy val localRunner = project.in(file("mainRunner")).dependsOn(RootProject(file("."))).settings(
      libraryDependencies ++= sparkDependencies.map(_ % "compile")
    )
    

    After that, the new subproject is run locally with localRunner selected as the classpath module under the Run Configuration.
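
    The same subproject can also be started from the sbt command line (assuming the project id localRunner from the snippet above):

    sbt "localRunner/run"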

    • 426 views
    • 3 answers
    • 0 votes