joe's Profile

• Asked on January 21, 2019 in

From Spark closures: when a closure accesses a large array, for instance some dummy lookup data, that array is shipped to each Spark node together with the closure.

E.g.: on a 10-node cluster with 100 partitions (10 partitions per node), the array is shipped about 100 times in total, i.e. 10 times to each node.

When a broadcast variable is used instead, the array is distributed only once per node, using an efficient p2p protocol.
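The copy counts in the example above can be tallied with plain arithmetic (a sketch; the node and partition counts come from the example, not from a measurement):

```python
# 10-node cluster, 100 partitions spread evenly (10 per node).
nodes = 10
partitions = 100

# With a plain closure, the array is serialized once per task/partition.
closure_copies = partitions            # 100 copies in total
copies_per_node = partitions // nodes  # 10 copies sent to every node

# With a broadcast variable, the array travels once per node.
broadcast_copies = nodes               # 10 copies in total
```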

```
val array: Array[Int] = ??? // some huge array
```

With the RDD:

```
val rdd: RDD[Int] = ???
```

In this case, the array is shipped with the closure every time:

```
rdd.map(i => array.contains(i))
```

With a broadcast variable, the array crosses the wire only once per node, which performs better:

```
val broadcasted = sc.broadcast(array)
rdd.map(i => broadcasted.value.contains(i))
```
• Asked on January 21, 2019 in

The problem can be solved without using array and struct:

```
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

def sum_xy(x, y):
    return x + y

sum_cols = udf(sum_xy, IntegerType())

a = spark.createDataFrame([(101, 1, 16)], ['ID', 'A', 'B'])
a.show()
a.withColumn('Result', sum_cols('A', 'B')).show()
```
• Asked on January 12, 2019 in

As we have seen, cartesian gives the n^2 elements of the cartesian product of the RDD with itself. The algorithm below computes the combinations C(n,2) of an RDD without having to compute the n^2 elements first. (String is used as the element type; generalizing to a type T takes some plumbing with ClassTags that would obscure the purpose here.)

It is more space efficient, as it calculates only the C(n,2) = n!/(2!(n-2)!) = n(n-1)/2 elements instead of the n^2 elements of the cartesian product.

```
import org.apache.spark.rdd._

// assumes a SparkContext `sc` in scope
def combs(rdd: RDD[String]): RDD[(String, String)] = {
  val count = rdd.count // count is an action; compute it once
  if (count < 2) {
    sc.makeRDD[(String, String)](Seq.empty)
  } else if (count == 2) {
    val values = rdd.collect
    sc.makeRDD[(String, String)](Seq((values(0), values(1))))
  } else {
    val elem = rdd.take(1)
    val elemRdd = sc.makeRDD(elem)
    val subtracted = rdd.subtract(elemRdd)
    val comb = subtracted.map(e => (elem(0), e))
    comb.union(combs(subtracted))
  }
}
```
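The count saving can be sanity-checked with plain Python (a sketch; no Spark involved):

```python
from itertools import combinations
from math import comb

n = 6
items = list(range(n))

# The C(n,2) unordered pairs the algorithm produces.
pairs = list(combinations(items, 2))

# All n^2 ordered pairs the cartesian product would produce.
cartesian = [(a, b) for a in items for b in items]

# C(n,2) = n(n-1)/2, far fewer than n^2 for large n.
assert len(pairs) == comb(n, 2) == n * (n - 1) // 2
assert len(cartesian) == n * n
```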
• Asked on January 12, 2019 in

The append mode on the DataFrameWriter lets data be appended to an existing Hive table:

```
data = hc.sql("select 1 as id, 10 as score")
data.write.mode("append").saveAsTable("my_table")
```

The result is the same as an INSERT.

• Asked on January 12, 2019 in

Using row_number instead of rank gives an exact top-n even when there are rank ties (rank assigns equal values the same rank, so it can return more than n rows):

```
from pyspark.sql.functions import col, row_number
# `window` is a previously defined Window spec,
# e.g. Window.partitionBy(...).orderBy(...)

n = 5
df.select(col('*'), row_number().over(window).alias('row_number')) \
    .where(col('row_number') <= n) \
    .limit(20) \
    .toPandas()
```

Make sure that limit(20).toPandas() is used instead of show() for Jupyter notebooks.
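The difference between rank and row_number for top-n can be illustrated in plain Python (a sketch with made-up scores; no Spark involved):

```python
# Scores already sorted descending; note the tie at 90.
scores = [95, 90, 90, 85, 80]

# rank: ties share a rank, and a gap follows (1, 2, 2, 4, 5).
rank = []
for i, s in enumerate(scores):
    rank.append(rank[i - 1] if i > 0 and s == scores[i - 1] else i + 1)

# row_number: a unique, gapless sequence (1, 2, 3, 4, 5).
row_number = list(range(1, len(scores) + 1))

n = 2
top_by_rank = [s for s, r in zip(scores, rank) if r <= n]              # 3 rows: the tie slips in
top_by_row_number = [s for s, r in zip(scores, row_number) if r <= n]  # exactly 2 rows
```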

• Asked on January 12, 2019 in

The class conflict arises because HBase depends on org.mortbay.jetty while Spark depends on org.eclipse.jetty; excluding the org.mortbay.jetty dependencies from HBase solves the problem.

In the case of hadoop-common, javax.servlet should be excluded as well. A working HBase/Spark setup with sbt dependencies looks like:

```
val clouderaVersion = "cdh5.2.0"
val hbaseVersion = s"0.98.6-$clouderaVersion"
val sparkVersion = s"1.1.0-$clouderaVersion"

val hbaseCommon = "org.apache.hbase" % "hbase-common" % hbaseVersion % "provided"
val hbaseClient = "org.apache.hbase" % "hbase-client" % hbaseVersion % "provided"
val hbaseProtocol = "org.apache.hbase" % "hbase-protocol" % hbaseVersion % "provided"
val hbaseServer = "org.apache.hbase" % "hbase-server" % hbaseVersion % "provided" excludeAll ExclusionRule(organization = "org.mortbay.jetty")
val sparkCore = "org.apache.spark" %% "spark-core" % sparkVersion % "provided"
val sparkStreaming = "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided"
val sparkStreamingKafka = "org.apache.spark" %% "spark-streaming-kafka" % sparkVersion exclude("org.apache.spark", "spark-streaming_2.10")
```
• Asked on January 12, 2019 in

Here lit(null) can be used:

```
import org.apache.spark.sql.functions.{lit, udf}

case class Record(foo: Int, bar: String)
val df = Seq(Record(1, "foo"), Record(2, "bar")).toDF

val dfWithFoobar = df.withColumn("foobar", lit(null: String))
```

The reason for the issue is that the resulting column type is null:

```
scala> dfWithFoobar.printSchema
root
 |-- foo: integer (nullable = false)
 |-- bar: string (nullable = true)
 |-- foobar: null (nullable = true)
```

It is not retained by the csv writer. If a concrete type is a hard requirement, the null can be cast with either a DataType:

```
import org.apache.spark.sql.types.StringType

df.withColumn("foobar", lit(null).cast(StringType))
```

or a string description of the type:

```
df.withColumn("foobar", lit(null).cast("string"))
```

Alternatively, a UDF can be used as follows:

```
val getNull = udf(() => None: Option[String]) // Or some other type

df.withColumn("foobar", getNull()).printSchema
root
 |-- foo: integer (nullable = false)
 |-- bar: string (nullable = true)
 |-- foobar: string (nullable = true)
```

• Asked on January 12, 2019 in

The Spark query engine is limited here at the moment: Spark only handles predicate pushdown of simple types in Parquet, not nested StructTypes. There is a relevant JIRA ticket tracking this.

• Asked on January 12, 2019 in

Alternatively, the problem can be fixed by:

• Unzipping the folder into the home directory with the command tar -zxvf hadoop_file_name
• Adding export HADOOP_HOME=~/hadoop-2.8.0 to the .bashrc file, then trying again in a new terminal.
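Put together, the steps look like this (a sketch; hadoop_file_name is the placeholder from the answer for the actual downloaded archive, and the version in the path must match it):

```shell
# Unpack the Hadoop tarball into the home directory.
tar -zxvf hadoop_file_name -C ~

# Point HADOOP_HOME at the unpacked folder and persist it.
echo 'export HADOOP_HOME=~/hadoop-2.8.0' >> ~/.bashrc

# Reload the shell configuration (or open a new terminal).
source ~/.bashrc
```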
```object Launch {