seena's Profile

• Asked on January 21, 2019

The following method can be used to solve the issue:

```
reduceByKey(func: Function2[V, V, V]): JavaPairRDD[K, V]
```

This merges the values for the same key in an RDD (which are necessarily of the same type), and returns a pair RDD whose values have the same type as those of the parent RDD.
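To illustrate the merge semantics outside Spark, here is a plain-Python sketch (no Spark required; a dict stands in for the pair RDD, and the helper name is made up for this example):

```python
# reduce_by_key: fold a binary function over all values sharing a key,
# mirroring what reduceByKey does per key in a pair RDD.
def reduce_by_key(pairs, func):
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return list(merged.items())

pairs = [("a", 1), ("b", 2), ("a", 3)]
print(reduce_by_key(pairs, lambda x, y: x + y))  # [('a', 4), ('b', 2)]
```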

• 757 views
• Asked on January 21, 2019

Alternatively, a struct can be used here rather than an array:

```
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf, struct

# UDF that adds the two fields of the struct
sum_cols = udf(lambda x: x[0] + x[1], IntegerType())

a = spark.createDataFrame([(101, 1, 16)], ['ID', 'A', 'B'])
a.show()
a.withColumn('Result', sum_cols(struct('A', 'B'))).show()
```
• 6852 views
• Asked on January 12, 2019

The Cartesian product and combinations are two separate things: `cartesian` creates an RDD of size rdd.size() ^ 2, while combinations produce an RDD of size rdd.size() choose 2.

```
val rdd = sc.parallelize(1 to 5)
// keep only one ordered pair per combination
val combinations = rdd.cartesian(rdd).filter { case (a, b) => a < b }
combinations.collect()
```

Note that this only works if an ordering is defined on the elements of the list; here we use <. As written it only chooses pairs, but it can be extended by requiring the relationship a < b for all a and b in the sequence.
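The size claims can be checked without a cluster using a plain-Python sketch, with itertools standing in for the RDD operations:

```python
from itertools import product, combinations

data = [1, 2, 3, 4, 5]

# cartesian: every ordered pair, n^2 of them
cart = list(product(data, data))

# combinations: unordered pairs, n choose 2, obtained by keeping a < b
combs = [(a, b) for (a, b) in cart if a < b]

print(len(cart))   # 25
print(len(combs))  # 10
print(combs == list(combinations(data, 2)))  # True
```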

• 648 views
• Asked on January 12, 2019

This could be the solution for this issue.

Props file (mypropsfile.conf). Note: prefix your keys with "spark.", else the props will be ignored:

```
spark.myapp.input /input/path
spark.myapp.output /output/path
```

Launch with:

```
$SPARK_HOME/bin/spark-submit --properties-file mypropsfile.conf
```

Inside the code:

```
sc.getConf.get("spark.driver.host")  // localhost
sc.getConf.get("spark.myapp.input")  // /input/path
sc.getConf.get("spark.myapp.output") // /output/path
```
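
The prefix rule can be sketched in plain Python. This is a hypothetical loader for illustration only, not Spark's actual implementation; it just shows that keys without the "spark." prefix are dropped:

```python
# Hypothetical illustration: keep spark.-prefixed keys, drop the rest.
def load_spark_props(text):
    conf = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition(" ")
        if key.startswith("spark."):
            conf[key] = value.strip()
    return conf

props = """spark.myapp.input /input/path
spark.myapp.output /output/path
myapp.ignored /nope"""

conf = load_spark_props(props)
print(conf["spark.myapp.input"])  # /input/path
print("myapp.ignored" in conf)    # False
```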
• 942 views
• Asked on January 12, 2019

Alternatively, try the below dependencies:

```
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.1.0",
  "org.eclipse.jetty.orbit" % "javax.servlet" % "3.0.0.v201112011016",
  "org.eclipse.jetty.orbit" % "javax.transaction" % "1.1.1.v201105210645",
  "org.eclipse.jetty.orbit" % "javax.mail.glassfish" % "1.4.1.v201005082020"
)
```
• 493 views
• Asked on January 12, 2019

Here a window function is used to compute the rank of each row within its user_id partition, ordered by score, and the result is then filtered to keep only the top two rows per user.

```
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col

window = Window.partitionBy(df['user_id']).orderBy(df['score'].desc())

(df.select('*', rank().over(window).alias('rank'))
   .filter(col('rank') <= 2)
   .show())
#+-------+---------+-----+----+
#|user_id|object_id|score|rank|
#+-------+---------+-----+----+
#| user_1| object_1|    3|   1|
#| user_1| object_2|    2|   2|
#| user_2| object_2|    6|   1|
#| user_2| object_1|    5|   2|
#+-------+---------+-----+----+
```

For learning Spark, the programming guide will be useful.

Data:

```
rdd = sc.parallelize([("user_1", "object_1", 3),
                      ("user_1", "object_2", 2),
                      ("user_2", "object_1", 5),
                      ("user_2", "object_2", 2),
                      ("user_2", "object_2", 6)])
df = sqlContext.createDataFrame(rdd, ["user_id", "object_id", "score"])
```
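
The same top-N-per-group logic can be sketched without Spark in plain Python, sorting each group by descending score and keeping the first two (assuming unique scores per user, so ties need no special handling):

```python
from collections import defaultdict

rows = [("user_1", "object_1", 3),
        ("user_1", "object_2", 2),
        ("user_2", "object_1", 5),
        ("user_2", "object_2", 2),
        ("user_2", "object_2", 6)]

# group rows by user_id (mimics Window.partitionBy)
groups = defaultdict(list)
for user_id, object_id, score in rows:
    groups[user_id].append((object_id, score))

# within each group, order by descending score and keep the top two
top2 = {}
for user_id, items in groups.items():
    ranked = sorted(items, key=lambda t: t[1], reverse=True)
    top2[user_id] = ranked[:2]

print(top2["user_1"])  # [('object_1', 3), ('object_2', 2)]
print(top2["user_2"])  # [('object_2', 6), ('object_1', 5)]
```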
• 1260 views
• Asked on January 12, 2019

Executors are being killed by YARN due to OOMs. The logs on the individual executors should be inspected (look for the text "running beyond physical memory"). If there are many executors, rather than inspecting all of the logs manually, monitor the job in the Spark UI while it runs; when a task fails, the UI will report the cause. Note that some tasks will report failure because their executor has already been killed, so make sure to look at the cause of each individual failing task.

Note: most OOM problems can be solved quickly by simply repartitioning the data at appropriate places in the code. Otherwise, scale up the machines to accommodate the need for memory.

• 948 views
• Asked on January 12, 2019

Joining on commas is not a good idea, because fields that themselves contain commas will not be properly quoted. For instance, ','.join(['a', 'b', '1,2,3', 'c']) gives you a,b,1,2,3,c when you need a,b,"1,2,3",c. Instead, use Python's csv module to convert each list in the RDD to a properly formatted csv string:

```
# python 3
import csv, io

def list_to_csv_str(x):
    """Given a list of strings, returns a properly-csv-formatted string."""
    output = io.StringIO("")
    csv.writer(output).writerow(x)
    return output.getvalue().strip()  # remove extra newline

# ... do stuff with your rdd ...
rdd = rdd.map(list_to_csv_str)
rdd.saveAsTextFile("output_directory")
```

Since the csv module only writes to file objects, an empty "file" is first created with io.StringIO("") and csv.writer is told to write the csv-formatted string into it. Then output.getvalue() is used to get the string that was written to the "file". In Python 2 this code works by simply replacing io with the StringIO module.

When the Spark DataFrames API is used, you can instead go through the DataBricks save function, which has a csv format.

• 1419 views
• Asked on January 12, 2019

In the Spark server installation, the Maven config is different.

For instance:

```
<dependencies>
  <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>1.3.0</version>
  </dependency>
</dependencies>
```
• 468 views