get specific row from spark dataframe
Is there any alternative to R's df[100, c("column")] for Scala Spark DataFrames? I want to select a specific row from a column of a Spark DataFrame, for example the 100th row.
apache-spark apache-spark-sql
First, you have to understand that DataFrames are distributed, which means you cannot access them in a typical procedural way; you have to run an analysis first. Although the question is about Scala, I recommend reading the PySpark documentation as well, because it goes into more detail.
That said, you can use some methods of the RDD API, since every DataFrame exposes its underlying RDD as an attribute. For instance:
df = sqlContext.createDataFrame([("a", 1), ("b", 2), ("c", 3)], ["letter", "name"])
myIndex = 1
values = (df.rdd.zipWithIndex()                     # pairs each Row with its index: (Row, i)
          .filter(lambda pair: pair[1] == myIndex)
          .map(lambda pair: tuple(pair[0]))
          .collect())
print(values[0])   # ('b', 2)
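Since the question is about Scala, a rough Scala equivalent of the snippet above looks like this (a sketch, assuming spark.implicits._ is in scope so that toDF is available):

import spark.implicits._

val df = Seq(("a", 1), ("b", 2), ("c", 3)).toDF("letter", "name")
val myIndex = 1L

val values = df.rdd
  .zipWithIndex()                               // pairs each Row with its position
  .filter { case (_, idx) => idx == myIndex }   // keep only the wanted position
  .map    { case (row, _) => row }
  .collect()

println(values(0))   // [b,2]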
The getrows() function below should get the specific rows you want.
For completeness, I have written down the full code in order to reproduce the output.
# Create SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local').appName('scratch').getOrCreate()

# Create the dataframe
df = spark.createDataFrame([("a", 1), ("b", 2), ("c", 3)], ["letter", "name"])

# Function to get rows at `rownums`
def getrows(df, rownums=None):
    return df.rdd.zipWithIndex().filter(lambda x: x[1] in rownums).map(lambda x: x[0])

# Get rows at positions 0 and 2.
getrows(df, rownums=[0, 2]).collect()

# Output:
#> [Row(letter='a', name=1), Row(letter='c', name=3)]
Another approach is to carry an explicit ID column in the DataFrame and filter on column values rather than on row positions (ss below is the SparkSession):

import ss.implicits._

val index = ss.sparkContext
  .parallelize(Seq((1, 1), (2, 0), (3, 2), (4, 0), (5, 1)))
  .toDF("ID", "value")

index.where($"value" === 0).select("ID").show()
There is a Scala way (if you have enough memory on the working machine):

val arr = df.select("column").rdd.collect
println(arr(100))
If the DataFrame schema is unknown but you do know the actual type of the "column" field (for example, Double), then you can get arr as follows:
val arr = df.select($"column".cast("Double")).as[Double].rdd.collect
Each element you get back is an org.apache.spark.sql.Row. Its fields can be accessed by position, and both apply and get return Any:
scala> val row = Row(1, "hello")
row: org.apache.spark.sql.Row = [1,hello]
scala> row(1)
res0: Any = hello
scala> row.get(1)
res1: Any = hello
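Because the untyped accessors return Any, the typed getters on Row are usually more convenient; a minimal sketch using the same row as above:

import org.apache.spark.sql.Row

val row = Row(1, "hello")
val i  = row.getInt(0)          // 1: Int
val s  = row.getString(1)       // "hello": String
val s2 = row.getAs[String](1)   // generic typed getter, also "hello"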
DataFrame — Dataset of Rows with RowEncoder
Spark SQL introduces a tabular functional data abstraction called DataFrame. It is designed to ease developing Spark applications for processing large amounts of structured tabular data on Spark infrastructure.
DataFrame is a data abstraction or a domain-specific language (DSL) for working with structured and semi-structured data, i.e. datasets that you can specify a schema for.
DataFrame is a collection of rows with a schema that is the result of executing a structured query (once it has been executed).
DataFrame uses the immutable, in-memory, resilient, distributed and parallel capabilities of RDD, and applies a structure called schema to the data.
Note: In Spark 2.0.0, DataFrame is a mere type alias for Dataset[Row].
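A quick sketch of what that type alias means in practice, assuming spark is the active SparkSession and its implicits are imported (the Person case class and values are made up):

import org.apache.spark.sql.{DataFrame, Dataset, Row}
import spark.implicits._

case class Person(name: String, age: Int)

val df: DataFrame = Seq(("Alice", 29), ("Bob", 31)).toDF("name", "age")
val rows: Dataset[Row] = df                     // DataFrame is just Dataset[Row]
val people: Dataset[Person] = df.as[Person]     // attach a type via an Encoder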
DataFrame is a distributed collection of tabular data organized into rows and named columns. It is conceptually equivalent to a table in a relational database, with operations to project (select), filter, intersect, join, group, sort, aggregate, or convert to an RDD (consult the DataFrame API), for example:

data.groupBy('Product_ID).sum('Score)
Spark SQL borrowed the concept of DataFrame from pandas’ DataFrame and made it immutable, parallel (one machine, perhaps with many processors and cores) and distributed (many machines, perhaps with many processors and cores).
Note: Hey, big data consultants, time to help teams migrate the code from pandas' DataFrame into Spark's DataFrames (at least to PySpark's DataFrame) and offer services to set up large clusters!
DataFrames in Spark SQL strongly rely on the features of RDD – a DataFrame is basically an RDD exposed as a structured DataFrame through appropriate operations to handle very big data from day one. So, petabytes of data should not scare you (unless you're the administrator who has to create such a clustered Spark environment – contact me when you feel alone with the task).
val df = Seq(("one", 1), ("one", 1), ("two", 1))
.toDF("word", "count")
scala> df.show
+----+-----+
|word|count|
+----+-----+
| one| 1|
| one| 1|
| two| 1|
+----+-----+
val counted = df.groupBy('word).count
scala> counted.show
+----+-----+
|word|count|
+----+-----+
| two| 1|
| one| 2|
+----+-----+
You can create DataFrames by loading data from structured files (JSON, Parquet, CSV), RDDs, tables in Hive, or external databases (JDBC). You can also create DataFrames from scratch and build upon them (as in the example above). See the DataFrame API. You can read datasets in any format as long as there is a suitable Spark SQL extension of DataFrameReader for that format.
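For instance, a minimal sketch of loading from a few of those sources (all file paths, the table name, and the JDBC URL are hypothetical):

val fromJson    = spark.read.json("people.json")
val fromCsv     = spark.read.option("header", "true").csv("people.csv")
val fromParquet = spark.read.parquet("people.parquet")
val fromJdbc    = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost/testdb")   // hypothetical database
  .option("dbtable", "people")
  .load()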
You can execute queries over DataFrames using two approaches (both are sketched right after this list):
- the good ol' SQL – helps migrating from the "SQL databases" world into the world of DataFrame in Spark SQL
- the Query DSL – an API that helps ensure proper syntax at compile time.
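A minimal sketch of both approaches, reusing the word/count DataFrame from the example above (the view name words is arbitrary):

// 1. Plain SQL over a temporary view
df.createOrReplaceTempView("words")
spark.sql("SELECT word, count(*) AS count FROM words GROUP BY word").show()

// 2. Query DSL – the same query as method calls
df.groupBy("word").count().show()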
Dataset is a strongly-typed data structure in Spark SQL that represents a structured query.
Note: A structured query can be written using SQL or the Dataset API.
Several Spark SQL entities work together to give the Dataset data structure. It is therefore fair to say that a Dataset consists of the following three elements:
- QueryExecution (with the parsed unanalyzed LogicalPlan of a structured query)
- Encoder (of the type of the records for fast serialization and deserialization to and from InternalRow)
- SparkSession
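As a rough illustration of those three pieces, assuming spark is the active SparkSession (the Word case class is made up for the example):

import org.apache.spark.sql.Dataset
import spark.implicits._                 // derives the Encoder for the case class

case class Word(word: String, count: Long)

val ds: Dataset[Word] = Seq(Word("one", 1), Word("two", 1)).toDS()

ds.filter(_.count > 0).show()            // strongly-typed, compile-checked access
ds.sparkSession                          // the SparkSession the Dataset belongs to
ds.queryExecution                        // the QueryExecution behind the structured query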