Monday 23 September 2019

spark_day_1_practice

--------------------------------------------------
Spark Provides Libraries
-> Spark SQL
-> Spark Streaming
-> Spark MLlib
-> Spark GraphX

`Spark Context` -> Entry Point for Spark Operations

From `Spark Context` we can create a Resilient Distributed Dataset (RDD)

RDD Features:
----------------
1. Immutable
2. Lazily evaluated
3. Cacheable
4. Type inferred


RDD Operations:
--------------------
1. Transformations

<old rdd> ====> <new rdd>


2. Actions

<rdd> ====> <result>


Note:-
Transformations are lazy: they only define a new RDD and do not trigger any computation
Actions trigger the computation of the RDD and return a result
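
To see the note above in practice, here is a minimal sketch. It assumes Spark 2.x on the classpath and a local master; the names `lazyDemo`, `touched` and `plusOne` are made up for illustration. A long accumulator counts how many elements the `map` function has actually processed.

```scala
import org.apache.spark.sql.SparkSession

// In spark-shell `spark` and `sc` already exist; getOrCreate() then reuses them.
val spark = SparkSession.builder().master("local[2]").appName("lazyDemo").getOrCreate()
val sc = spark.sparkContext

// Counts how many elements the map function has processed (illustrative name).
val touched = sc.longAccumulator("touched")

val nums = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)

// Transformation: only records what should be computed, no element is processed yet.
val plusOne = nums.map { x => touched.add(1); x + 1 }
val before = touched.sum   // read before any action has run

// Action: triggers the job, so the map function now runs on every element.
val total = plusOne.sum()
val after = touched.sum    // read after the action
```

Comparing `before` and `after` shows that the `map` function ran only once the action was called. Accumulators updated inside transformations can over-count if a task is retried, so this is a teaching device rather than a reliable counter.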

--------------------------------------------------
How to create an RDD?

We can create an RDD in 2 ways
1. from collections (List, Seq, Array, ...)
2. from datasets (text file, csv, tsv, json, ...)
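
The rest of these notes only uses the first way, so here is a small sketch of the second one. It assumes a local Spark 2.x setup; the temporary file, its contents and the name `rddFromFile` are made up so that the example has something to read.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// In spark-shell `spark` and `sc` already exist; getOrCreate() then reuses them.
val spark = SparkSession.builder().master("local[2]").appName("rddFromFile").getOrCreate()
val sc = spark.sparkContext

// Write a tiny sample file (made-up data) so the path below really exists.
val path = Files.createTempFile("sample", ".txt")
Files.write(path, "alpha\nbeta\ngamma\n".getBytes("UTF-8"))

// 2. From a dataset: each line of the text file becomes one element of the RDD.
val lines = sc.textFile(path.toUri.toString)
val lineCount = lines.count()

// 1. From a collection, for comparison.
val fromList = sc.parallelize(List("alpha", "beta", "gamma"))
```

In both cases the result is an `RDD[String]`, so the same transformations and actions apply afterwards.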

--------------------------------------------------
We can run Spark from the command line in 3 ways:

Scala => spark-shell => $SPARK_HOME/bin/spark-shell
Python => pyspark => $SPARK_HOME/bin/pyspark
R => sparkR => $SPARK_HOME/bin/sparkR


--------------------------------------------------

Spark-1.x:
------------
Spark context available as 'sc'
Sql context available as 'sqlContext'


Spark-2.x:
------------
Spark context available as 'sc'
Spark session available as 'spark'


--------------------------------------------------
1. Create an RDD from a Collection
-----------------------------------
val nums = List(1,2,3,4,5,6)
val chars = List('a','b','c','d','e','f')

val rdd1 = sc.parallelize(nums)
val rdd2 = sc.parallelize(chars)

--------------------------------------------------

scala> val nums = List(1,2,3,4,5,6)
nums: List[Int] = List(1, 2, 3, 4, 5, 6)

scala> val chars = List('a','b','c','d','e','f')
chars: List[Char] = List(a, b, c, d, e, f)

scala> val rdd1 = sc.parallelize(nums)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26

scala> val rdd2 = sc.parallelize(chars)
rdd2: org.apache.spark.rdd.RDD[Char] = ParallelCollectionRDD[1] at parallelize at <console>:26

--------------------------------------------------
How to find the number of partitions in an RDD:
--------------------------------------
scala> rdd1.partitions.length
res0: Int = 4

scala> rdd1.getNumPartitions
res1: Int = 4


2. Create an RDD from a Collection with a given number of partitions
------------------------------------------------------
val nums = List(1,2,3,4,5,6)
val chars = List('a','b','c','d','e','f')

val rdd1 = sc.parallelize(nums, 2)
val rdd2 = sc.parallelize(chars, 3)

--------------------------------------------------

scala> val rdd1 = sc.parallelize(nums, 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:26

scala> rdd1.partitions.length
res2: Int = 2

scala> rdd1.getNumPartitions
res3: Int = 2

--------------------------------------------------


scala> rdd1.
++                      groupBy                  saveAsObjectFile 
aggregate               histogram                saveAsTextFile   
cache                   id                       setName           
canEqual                intersection             sortBy           
cartesian               isCheckpointed           sparkContext     
checkpoint              isEmpty                  stats             
coalesce                iterator                 stdev             
collect                 keyBy                    subtract         
collectAsync            localCheckpoint          sum               
compute                 map                      sumApprox         
context                 mapPartitions            take             
copy                    mapPartitionsWithIndex   takeAsync         
count                   max                      takeOrdered       
countApprox             mean                     takeSample       
countApproxDistinct     meanApprox               toDF             
countAsync              min                      toDS             
countByValue            name                     toDebugString     
countByValueApprox      partitioner              toJavaRDD         
dependencies            partitions               toLocalIterator   
distinct                persist                  toString         
filter                  pipe                     top               
first                   preferredLocations       treeAggregate     
flatMap                 productArity             treeReduce       
fold                    productElement           union             
foreach                 productIterator          unpersist         
foreachAsync            productPrefix            variance         
foreachPartition        randomSplit              zip               
foreachPartitionAsync   reduce                   zipPartitions     
getCheckpointFile       repartition              zipWithIndex     
getNumPartitions        sample                   zipWithUniqueId   
getStorageLevel         sampleStdev                               
glom                    sampleVariance


--------------------------------------------------

scala> rdd1.foreach(x => println(x))
1
2
3
4
5
6

scala> rdd1.foreach(println)
4
5
6
1
2
3

Print the RDD data using collect:
------------------------------------------

scala> rdd1.collect
res16: Array[Int] = Array(1, 2, 3, 4, 5, 6)

scala> rdd1.collect
res17: Array[Int] = Array(1, 2, 3, 4, 5, 6)


scala> rdd1.collect.foreach(println)
1
2
3
4
5
6



Note:
---------
`rdd.collect` gathers the data from all partitions of the RDD onto a single machine (the driver)

Avoid `collect` on large datasets in real-world use cases: the whole result has to fit in the driver's memory
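
When only a few elements are needed on the driver, `take` and `toLocalIterator` (both appear in the method list above) are possible alternatives. A minimal sketch, assuming a local Spark 2.x setup; the name `collectAlternatives` and the sample data are illustrative:

```scala
import org.apache.spark.sql.SparkSession

// In spark-shell `spark` and `sc` already exist; getOrCreate() then reuses them.
val spark = SparkSession.builder().master("local[2]").appName("collectAlternatives").getOrCreate()
val sc = spark.sparkContext

val rdd = sc.parallelize(1 to 6, 2)

// take(n) brings only the first n elements to the driver.
val firstThree = rdd.take(3)

// toLocalIterator fetches one partition at a time, so the driver needs
// memory for the largest partition rather than for the whole RDD.
// The toList here materializes everything and is only acceptable for this tiny demo.
val streamed = rdd.toLocalIterator.toList
```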

--------------------------------------------------
scala> rdd1.map(x => x + 1)
res29: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[7] at map at <console>:29

scala> rdd1.map(x => x + 1).foreach(println)
2
3
4
5
6
7

scala> rdd1.map(x => x + 1).collect.foreach(println)
2
3
4
5
6
7


scala> rdd1.map(_ + 1).collect.foreach(println)
2
3
4
5
6
7

scala> rdd1.map(x => x * x).collect.foreach(println)
1
4
9
16
25
36

scala> rdd1.map(_ * _).collect.foreach(println)
<console>:29: error: missing parameter type for expanded function ((x$1, x$2) => x$1.$times(x$2))
       rdd1.map(_ * _).collect.foreach(println)
                ^
<console>:29: error: missing parameter type for expanded function ((x$1: <error>, x$2) => x$1.$times(x$2))
       rdd1.map(_ * _).collect.foreach(println)
                    ^
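
The error above arises because each `_` stands for a separate parameter: `_ * _` expands to a function of two arguments, while `map` expects a function of one argument. A plain-Scala sketch on a `List` (no Spark needed, the placeholder rules are the same; the value names are illustrative):

```scala
val xs = List(1, 2, 3, 4, 5, 6)

// A parameter that is used twice has to be named; `_ * _` would mean (a, b) => a * b.
val squares = xs.map(x => x * x)

// A two-placeholder function fits where two arguments are expected, e.g. reduce.
val product = xs.reduce(_ * _)
```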



scala> rdd1.filter(x => x > 2).collect.foreach(println)
3
4
5
6

scala> rdd1.filter(_ > 2).collect.foreach(println)
3
4
5
6


--------------------------------------------------

scala> rdd1.min
res38: Int = 1

scala> rdd1.max
res39: Int = 6

scala> rdd1.sum
res40: Double = 21.0

scala> rdd1.count
res41: Long = 6

--------------------------------------------------

scala> rdd1.mapPartitionsWithIndex()
<console>:29: error: not enough arguments for method mapPartitionsWithIndex: (f: (Int, Iterator[Int]) => Iterator[U], preservesPartitioning: Boolean)(implicit evidence$8: scala.reflect.ClassTag[U])org.apache.spark.rdd.RDD[U].
Unspecified value parameter f.
       rdd1.mapPartitionsWithIndex()
                                  ^

def myfunc1(index: Int, it: Iterator[Int]): Iterator[String] = {
 it.toList.map(x => s"index -> $index, value -> $x").toIterator
}

def myfunc2(index: Int, it: Iterator[Char]): Iterator[String] = {
 it.toList.map(x => s"index -> $index, value -> $x").toIterator
}

def myfunc[T](index: Int, it: Iterator[T]): Iterator[String] = {
 it.toList.map(x => s"index -> $index, value -> $x").toIterator
}
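
The three definitions above copy each partition into a `List` before mapping. A possible variant, sketched here under the hypothetical name `describe`, maps the iterator directly so the partition is never materialized. It should be usable with `mapPartitionsWithIndex` in the same way as `myfunc`.

```scala
// Generic over the element type, like `myfunc`, but without the intermediate List.
def describe[T](index: Int, it: Iterator[T]): Iterator[String] =
  it.map(x => s"index -> $index, value -> $x")

// Plain-Scala check: a hand-made iterator stands in for one partition.
val described = describe(1, Iterator('b', 'c')).toList
```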

--------------------------------------------------

scala> def myfunc1(index: Int, it: Iterator[Int]): Iterator[String] = {
     |  it.toList.map(x => s"index -> $index, value -> $x").toIterator
     | }
myfunc1: (index: Int, it: Iterator[Int])Iterator[String]


scala> rdd1.mapPartitionsWithIndex(myfunc1).collect.foreach(println)
index -> 0, value -> 1
index -> 0, value -> 2
index -> 0, value -> 3
index -> 1, value -> 4
index -> 1, value -> 5
index -> 1, value -> 6


scala> rdd1.collect
res51: Array[Int] = Array(1, 2, 3, 4, 5, 6)

scala> rdd2.collect
res52: Array[Char] = Array(a, b, c, d, e, f)

scala> rdd1.mapPartitionsWithIndex(myfunc1).collect.foreach(println)
index -> 0, value -> 1
index -> 0, value -> 2
index -> 0, value -> 3
index -> 1, value -> 4
index -> 1, value -> 5
index -> 1, value -> 6

scala> rdd2.mapPartitionsWithIndex(myfunc1).collect.foreach(println)
<console>:31: error: type mismatch;
 found   : (Int, Iterator[Int]) => Iterator[String]
 required: (Int, Iterator[Char]) => Iterator[?]
       rdd2.mapPartitionsWithIndex(myfunc1).collect.foreach(println)
                                   ^



scala> def myfunc2(index: Int, it: Iterator[Char]): Iterator[String] = {
     |  it.toList.map(x => s"index -> $index, value -> $x").toIterator
     | }
myfunc2: (index: Int, it: Iterator[Char])Iterator[String]

scala> rdd2.mapPartitionsWithIndex(myfunc2).collect.foreach(println)
index -> 0, value -> a
index -> 1, value -> b
index -> 1, value -> c
index -> 2, value -> d
index -> 3, value -> e
index -> 3, value -> f

scala> rdd2.getNumPartitions
res56: Int = 4


--------------------------------------------------

scala> def myfunc[T](index: Int, it: Iterator[T]): Iterator[String] = {
     |  it.toList.map(x => s"index -> $index, value -> $x").toIterator
     | }
myfunc: [T](index: Int, it: Iterator[T])Iterator[String]

scala> rdd1.mapPartitionsWithIndex(myfunc).collect.foreach(println)
index -> 0, value -> 1
index -> 0, value -> 2
index -> 0, value -> 3
index -> 1, value -> 4
index -> 1, value -> 5
index -> 1, value -> 6

scala> rdd2.mapPartitionsWithIndex(myfunc).collect.foreach(println)
index -> 0, value -> a
index -> 1, value -> b
index -> 1, value -> c
index -> 2, value -> d
index -> 3, value -> e
index -> 3, value -> f


--------------------------------------------------
Aggregate Action in RDD:
----------------------------------------
scala> rdd1.aggregate()()
<console>:29: error: not enough arguments for method aggregate: (seqOp: (Unit, Int) => Unit, combOp: (Unit, Unit) => Unit)(implicit evidence$29: scala.reflect.ClassTag[Unit])Unit.
Unspecified value parameters seqOp, combOp.
       rdd1.aggregate()()
                       ^


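rdd1.aggregate(intialValue)(seqOp, combOp)

// seqOp: folds one element of a partition into that partition's running result
def seqOp[U](result: U, data: Int): U = ???

// combOp: merges the results of two partitions
def combOp[U](result1: U, result2: U): U = ???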

--------------------------------------------------
val intialValue = 1

def seqOp(result: Int, data: Int): Int = {
  if( result > data) result else data
}

def combOp(result: Int, data: Int): Int = {
  result + data
}

rdd1.aggregate(intialValue)(seqOp, combOp)

--------------------------------------------------

scala> val intialValue = 1
intialValue: Int = 1

scala>

scala> def seqOp(result: Int, data: Int): Int = {
     |   if( result > data) result else data
     | }
seqOp: (result: Int, data: Int)Int

scala>

scala> def combOp(result: Int, data: Int): Int = {
     |   result + data
     | }
combOp: (result: Int, data: Int)Int

scala>

scala> rdd1.aggregate(intialValue)(seqOp, combOp)
res60: Int = 10
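
The result is 10 (rather than 6 or 21) because `aggregate` applies the initial value once inside every partition, and once more when merging the partition results. A plain-Scala trace, assuming `rdd1` holds the two partitions (1, 2, 3) and (4, 5, 6) as the `mapPartitionsWithIndex` output earlier shows; `zero`, `p0`, `p1` and `traced` are illustrative names, with `zero` playing the role of `intialValue`:

```scala
val zero = 1

def seqOp(result: Int, data: Int): Int = if (result > data) result else data
def combOp(result: Int, data: Int): Int = result + data

// seqOp runs inside each partition, starting from the initial value.
val p0 = List(1, 2, 3).foldLeft(zero)(seqOp)   // max within partition 0
val p1 = List(4, 5, 6).foldLeft(zero)(seqOp)   // max within partition 1

// combOp merges the partition results, again starting from the initial value.
val traced = List(p0, p1).foldLeft(zero)(combOp)   // 1 + 3 + 6
```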






--------------------------------------------------

def max(a: Int, b: Int): Int = {
  if( a > b) a else b
}

def min(a: Int, b: Int): Int = {
  if( a < b) a else b
}

def mul(a: Int, b: Int): Int = {
  a * b
}

def sum(a: Int, b: Int): Int = {
  a + b
}


rdd1.aggregate(1)(min, mul)


--------------------------------------------------



scala> def max(a: Int, b: Int): Int = {
     |   if( a > b) a else b
     | }
max: (a: Int, b: Int)Int

scala>

scala> def min(a: Int, b: Int): Int = {
     |   if( a < b) a else b
     | }
min: (a: Int, b: Int)Int

scala>

scala> def mul(a: Int, b: Int): Int = {
     |   a * b
     | }
mul: (a: Int, b: Int)Int

scala>

scala> def sum(a: Int, b: Int): Int = {
     |   a + b
     | }
sum: (a: Int, b: Int)Int

scala> rdd1.aggregate(1)(min, mul)
res63: Int = 1





--------------------------------------------------
Average using `aggregate`
--------------------------------------------------
val intialValue = (0, 0)

def seqOp(result: (Int,Int), data: Int): (Int,Int) = {
  (result._1 + data, result._2 + 1)
}

def combOp(result: (Int,Int), data: (Int,Int)): (Int,Int) = {
  (result._1 + data._1, result._2 + data._2)
}

val (sum, count) = rdd1.aggregate(intialValue)(seqOp, combOp)

val avg = sum / count            // Int / Int is integer division: 3

val avg = sum / count.toDouble   // converting to Double first keeps the fraction: 3.5


--------------------------------------------------

scala> val intialValue = (0, 0)
intialValue: (Int, Int) = (0,0)

scala> def seqOp(result: (Int,Int), data: Int): (Int,Int) = {
     |   (result._1 + data, result._2 + 1)
     | }
seqOp: (result: (Int, Int), data: Int)(Int, Int)

scala> def combOp(result: (Int,Int), data: (Int,Int)): (Int,Int) = {
     |   (result._1 + data._1, result._2 + data._2)
     | }
combOp: (result: (Int, Int), data: (Int, Int))(Int, Int)

scala> val (sum, count) = rdd1.aggregate(intialValue)(seqOp, combOp)
sum: Int = 21
count: Int = 6

scala> val avg = sum / count
avg: Int = 3

scala> val avg = sum / count.toDouble
avg: Double = 3.5


Note: the operators passed to `aggregate` have to follow the points below
1. associative law
2. commutative law
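
A related point is that the result should not depend on how the data happens to be partitioned. The sketch below is a simplified plain-Scala model of `aggregate`, not Spark's implementation. `modelAggregate`, `addElem`, `merge` and the partition layouts are assumptions for illustration. It applies the max-then-sum pair and the (sum, count) pair from these notes to the same six numbers split in two different ways.

```scala
// Simplified model: fold each partition with seqOp, then merge the partial results with combOp.
def modelAggregate[T, U](partitions: Seq[Seq[T]], zero: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U =
  partitions.map(_.foldLeft(zero)(seqOp)).foldLeft(zero)(combOp)

val twoParts   = Seq(Seq(1, 2, 3), Seq(4, 5, 6))
val threeParts = Seq(Seq(1, 2), Seq(3, 4), Seq(5, 6))

// max inside partitions, sum across partitions, initial value 1: the answer depends on the split.
val maxSum2 = modelAggregate(twoParts, 1)((r: Int, d: Int) => r max d, (a: Int, b: Int) => a + b)
val maxSum3 = modelAggregate(threeParts, 1)((r: Int, d: Int) => r max d, (a: Int, b: Int) => a + b)

// (sum, count) with the neutral initial value (0, 0): same answer for every split.
def addElem(acc: (Int, Int), d: Int): (Int, Int) = (acc._1 + d, acc._2 + 1)
def merge(a: (Int, Int), b: (Int, Int)): (Int, Int) = (a._1 + b._1, a._2 + b._2)
val avg2 = modelAggregate(twoParts, (0, 0))(addElem, merge)
val avg3 = modelAggregate(threeParts, (0, 0))(addElem, merge)
```

In this model `maxSum2` and `maxSum3` differ, while `avg2` and `avg3` agree. This suggests that, in addition to the two laws, the initial value should be neutral for both operators and `combOp` should be consistent with `seqOp`.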



