Monday 23 September 2019

SPARK_DAY1_PRACTICE

Spark:
---------------------
Apache Spark™ is a fast and general engine for large-scale data processing.


Spark Libraries on Spark-Core:
--------------------------------
1. Spark SQL
2. Spark Streaming
3. Spark MLlib
4. Spark GraphX


Spark Supports 4 Programming Languages:
------------------------------------------
Scala, Java, Python, R


How to Start the Spark in Command Line:
--------------------------------------------
Scala => $SPARK_HOME/bin/spark-shell
Python => $SPARK_HOME/bin/pyspark
R => $SPARK_HOME/bin/sparkR


Spark-2.x:
--------------------------------------------
Spark context available as 'sc'
Spark session available as 'spark'


Spark-1.x:
--------------------------------------------
Spark context available as 'sc'
Spark SQLContext available as 'sqlContext'


Note:
------------
The `Spark Context` is the `Entry Point` for any `Spark Operations`.
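
Outside the shell (where `sc` is created for you automatically), the SparkContext has to be built explicitly. A minimal sketch of a standalone Scala app, assuming a local master and a placeholder app name:

import org.apache.spark.{SparkConf, SparkContext}

object Day1App {
  def main(args: Array[String]): Unit = {
    // "local[*]" and the app name are placeholders; adjust them for your cluster
    val conf = new SparkConf().setAppName("spark-day1-practice").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(1 to 6)   // every Spark operation goes through sc
    println(rdd.count())

    sc.stop()
  }
}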


Resilient Distributed Datasets (RDD):
--------------------------------------------

RDD Features:
----------------
1. Immutability
2. Lazy Evaluation
3. Cacheable
4. Type Inference
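
A minimal spark-shell sketch illustrating these features (assuming `sc` from the shell):

val nums = sc.parallelize(1 to 6)    // type is inferred as RDD[Int]
val doubled = nums.map(_ * 2)        // immutable: map returns a new RDD, nums is untouched
doubled.cache()                      // cacheable: kept in memory after the first computation
doubled.collect()                    // lazy: nothing runs until an action like collect is called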


RDD Operations:
-----------------
1. Transformations

Convert `RDD` into `RDD`

ex: old_rdd ==> new_rdd


2. Actions

Convert `RDD` into `Result`

ex: rdd ==> result
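
A quick sketch of the difference (assuming `rdd = sc.parallelize(List(1,2,3,4,5,6))`):

val newRdd = rdd.map(_ + 1)      // transformation: RDD => RDD, evaluated lazily
val total  = rdd.reduce(_ + _)   // action: RDD => result (21 here), triggers execution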



How to Create an RDD?
-----------------------------------
We can create an RDD in 2 ways:
1. from collections (List, Set, Seq, ...)
2. from data sets (text, csv, tsv, json, ...)


How to create an RDD from `collections`?
---------------------------------------
val list = List(1,2,3,4,5,6)

val rdd = sc.parallelize(<your collection>, <no.of partitions>)
val rdd = sc.parallelize(list,6)

Create an RDD with the default number of Partitions:
--------------------------------------
val rdd = sc.parallelize(list)


scala> val list = List(1,2,3,4,5,6)
list: List[Int] = List(1, 2, 3, 4, 5, 6)

scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26

scala> rdd.getNumPartitions
res0: Int = 4
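
The default partition count for `parallelize` comes from `sc.defaultParallelism` (typically the number of cores available to the shell), which can be checked directly:

sc.defaultParallelism    // 4 in this shell, matching rdd.getNumPartitions above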


Create an RDD with 2 Partitions:
--------------------------------------
val rdd = sc.parallelize(list, 2)

scala> val rdd = sc.parallelize(list, 2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:26

scala> rdd.getNumPartitions
res1: Int = 2

---------------------------------------------------

scala> rdd.collect
res3: Array[Int] = Array(1, 2, 3, 4, 5, 6)

scala> rdd.collect.foreach(println)
1
2
3
4
5
6


---------------------------------------------------

scala> rdd.foreach(println)
1
2
3
4
5
6

scala> rdd.foreach(println)
4
1
2
3
5
6


---------------------------------------------------

Note:
------------
`collect` will collect the data from `all the partitions` onto the driver.

`collect` will ensure the `order of the data` (partition order), unlike `rdd.foreach(println)` above, which runs on the executors and may print in any order.

Don't use `collect` on large RDDs in `Real-Time Use Cases`; bringing everything to the driver `will end up with MEMORY issues`. Prefer actions such as `take(n)` when only a sample is needed.

---------------------------------------------------

Different ways to pass a function to `map` (here `x` stands for any RDD, e.g. the `rdd` above):

x.map(data => (data, data))

or

x.map(y => (y, y))

or

x.map(z => (z, z))

or

def f(p : Int) : (Int, Int) = {
 (p,p)
}

x.map(f)
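
Applying the pair-producing map above to our rdd (all of the forms give the same result):

rdd.map(data => (data, data)).collect.foreach(println)
// prints (1,1), (2,2), (3,3), (4,4), (5,5), (6,6)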


---------------------------------------------------
Importance of `mapPartitionsWithIndex`
---------------------------------------------------

scala> rdd.mapPartitionsWithIndex
<console>:29: error: missing argument list for method mapPartitionsWithIndex in class RDD
Unapplied methods are only converted to functions when a function type is expected.
You can make this conversion explicit by writing `mapPartitionsWithIndex _` or `mapPartitionsWithIndex(_,_)(_)` instead of `mapPartitionsWithIndex`.
       rdd.mapPartitionsWithIndex
           ^

scala> rdd.mapPartitionsWithIndex()
<console>:29: error: not enough arguments for method mapPartitionsWithIndex: (f: (Int, Iterator[Int]) => Iterator[U], preservesPartitioning: Boolean)(implicit evidence$8: scala.reflect.ClassTag[U])org.apache.spark.rdd.RDD[U].
Unspecified value parameter f.
       rdd.mapPartitionsWithIndex()
                                 ^

---------------------------------------------------

def mapPartitionsWithIndex(f: (Int, Iterator[Int]) => Iterator[U], preservesPartitioning: Boolean)

def f(Int, Iterator[Int]) : Iterator[U]

---------------------------------------------------


def myFunc1(index: Int, it: Iterator[Int]) : Iterator[String]= {
 it.toList.map(data => s"index: $index, data: $data").iterator
}




---------------------------------------------------

rdd.mapPartitionsWithIndex(myFunc1)

rdd.mapPartitionsWithIndex(myFunc1).collect

rdd.mapPartitionsWithIndex(myFunc1).collect.foreach(println)

---------------------------------------------------

scala> def myFunc1(index: Int, it: Iterator[Int]) : Iterator[String]= {
     |  it.toList.map(data => s"index: $index, data: $data").iterator
     | }
myFunc1: (index: Int, it: Iterator[Int])Iterator[String]


---------------------------------------------------

scala> rdd.mapPartitionsWithIndex(myFunc1)
res28: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at mapPartitionsWithIndex at <console>:31

scala> rdd.mapPartitionsWithIndex(myFunc1).collect
res29: Array[String] = Array(index: 0, data: 1, index: 0, data: 2, index: 0, data: 3, index: 1, data: 4, index: 1, data: 5, index: 1, data: 6)

scala> rdd.mapPartitionsWithIndex(myFunc1).collect.foreach(println)
index: 0, data: 1
index: 0, data: 2
index: 0, data: 3
index: 1, data: 4
index: 1, data: 5
index: 1, data: 6


---------------------------------------------------

val names = List("kalyan", "anil", "raj", "sunil", "rajesh", "dev")

val rdd1 = sc.parallelize(names, 2)


// the call below won't compile: rdd1 contains Strings, but myFunc1 expects an Iterator[Int]
rdd1.mapPartitionsWithIndex(myFunc1)


---------------------------------------------------

scala> val names = List("kalyan", "anil", "raj", "sunil", "rajesh", "dev")
names: List[String] = List(kalyan, anil, raj, sunil, rajesh, dev)

scala> val rdd1 = sc.parallelize(names, 2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at <console>:26

---------------------------------------------------

scala> rdd1.mapPartitionsWithIndex(myFunc1)
<console>:31: error: type mismatch;
 found   : (Int, Iterator[Int]) => Iterator[String]
 required: (Int, Iterator[String]) => Iterator[?]
Error occurred in an application involving default arguments.
       rdd1.mapPartitionsWithIndex(myFunc1)
                                   ^




---------------------------------------------------


def myFunc2(index: Int, it: Iterator[String]) : Iterator[String]= {
 it.toList.map(data => s"index: $index, data: $data").iterator
}

rdd1.mapPartitionsWithIndex(myFunc2)

---------------------------------------------------

scala> def myFunc2(index: Int, it: Iterator[String]) : Iterator[String]= {
     |  it.toList.map(data => s"index: $index, data: $data").iterator
     | }
myFunc2: (index: Int, it: Iterator[String])Iterator[String]

scala> rdd1.mapPartitionsWithIndex(myFunc2)
res32: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at mapPartitionsWithIndex at <console>:31

scala> rdd1.mapPartitionsWithIndex(myFunc2).collect
res33: Array[String] = Array(index: 0, data: kalyan, index: 0, data: anil, index: 0, data: raj, index: 1, data: sunil, index: 1, data: rajesh, index: 1, data: dev)

scala> rdd1.mapPartitionsWithIndex(myFunc2).collect.foreach(println)
index: 0, data: kalyan
index: 0, data: anil
index: 0, data: raj
index: 1, data: sunil
index: 1, data: rajesh
index: 1, data: dev



---------------------------------------------------
How to create a `Generalized Function`:
------------------------------------------
def myFunc[T](index: Int, it: Iterator[T]) : Iterator[String]= {
 it.toList.map(data => s"index: $index, data: $data").iterator
}

rdd.mapPartitionsWithIndex(myFunc).collect.foreach(println)

rdd1.mapPartitionsWithIndex(myFunc).collect.foreach(println)


---------------------------------------------------


scala> def myFunc[T](index: Int, it: Iterator[T]) : Iterator[String]= {
     |  it.toList.map(data => s"index: $index, data: $data").iterator
     | }
myFunc: [T](index: Int, it: Iterator[T])Iterator[String]

scala> rdd.mapPartitionsWithIndex(myFunc).collect.foreach(println)
index: 0, data: 1
index: 0, data: 2
index: 0, data: 3
index: 1, data: 4
index: 1, data: 5
index: 1, data: 6

scala> rdd1.mapPartitionsWithIndex(myFunc).collect.foreach(println)
index: 0, data: kalyan
index: 0, data: anil
index: 0, data: raj
index: 1, data: sunil
index: 1, data: rajesh
index: 1, data: dev

---------------------------------------------------

scala> rdd.aggregate
<console>:29: error: missing argument list for method aggregate in class RDD
Unapplied methods are only converted to functions when a function type is expected.
You can make this conversion explicit by writing `aggregate _` or `aggregate(_)(_,_)(_)` instead of `aggregate`.
       rdd.aggregate
           ^

scala> rdd.aggregate()
<console>:29: error: missing argument list for method aggregate in class RDD
Unapplied methods are only converted to functions when a function type is expected.
You can make this conversion explicit by writing `aggregate _` or `aggregate(_)(_,_)(_)` instead of `aggregate`.
       rdd.aggregate()
                    ^

scala> rdd.aggregate()()
<console>:29: error: not enough arguments for method aggregate: (seqOp: (Unit, Int) => Unit, combOp: (Unit, Unit) => Unit)(implicit evidence$29: scala.reflect.ClassTag[Unit])Unit.
Unspecified value parameters seqOp, combOp.
       rdd.aggregate()()
                      ^

---------------------------------------------------

def aggregate(zeroValue: Unit)(seqOp: (Unit, Int) => Unit, combOp: (Unit, Unit) => Unit)

---------------------------------------------------

val zeroValue : Unit

def seqOp(res: Unit, data: Int) : Unit = {...}

def combOp(res1: Unit, res2: Unit) : Unit = {...}

Note: `Unit` here is the expected `Return Type` of the aggregation; the compiler inferred `Unit` only because we passed `()` as the zero value. Replace it with the result type you actually want (e.g. `Int` in the examples below).

---------------------------------------------------
Implementation of `aggregate`
---------------------------------------------------

Example1: Sum the RDD (Unit => Int)
---------------------------------------------
Note: replace `Unit` with `Int` in all the places

val zeroValue = 0

def seqOp(res: Int, data: Int) : Int = {
 res + data
}

def combOp(res1: Int, res2: Int) : Int = {
 res1 + res2
}

rdd.aggregate(zeroValue)(seqOp,combOp)

---------------------------------------------------

scala> val zeroValue = 0
zeroValue: Int = 0

scala> def seqOp(res: Int, data: Int) : Int = {
     |  res + data
     | }
seqOp: (res: Int, data: Int)Int

scala> def combOp(res1: Int, res2: Int) : Int = {
     |  res1 + res2
     | }
combOp: (res1: Int, res2: Int)Int

scala> rdd.aggregate(zeroValue)(seqOp,combOp)
res40: Int = 21



---------------------------------------------------

Example2: Multiply the RDD (Unit => Int)
---------------------------------------------
Note: replace `Unit` with `Int` in all the places

val zeroValue = 1

def seqOp(res: Int, data: Int) : Int = {
 res * data
}

def combOp(res1: Int, res2: Int) : Int = {
 res1 * res2
}

rdd.aggregate(zeroValue)(seqOp,combOp)


---------------------------------------------------

scala> val zeroValue = 1
zeroValue: Int = 1

scala> def seqOp(res: Int, data: Int) : Int = {
     |  res * data
     | }
seqOp: (res: Int, data: Int)Int

scala> def combOp(res1: Int, res2: Int) : Int = {
     |  res1 * res2
     | }
combOp: (res1: Int, res2: Int)Int

scala> rdd.aggregate(zeroValue)(seqOp,combOp)
res41: Int = 720


---------------------------------------------------
Find the `average` of 1 to 10 using the `aggregate` function?
---------------------------------------------------
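
One possible solution sketch (not the only way): carry a `(sum, count)` pair through the aggregation and divide at the end.

val nums = sc.parallelize(1 to 10)

val zeroValue = (0, 0)                                   // (sum, count)

def seqOp(acc: (Int, Int), data: Int) : (Int, Int) = {
 (acc._1 + data, acc._2 + 1)
}

def combOp(a: (Int, Int), b: (Int, Int)) : (Int, Int) = {
 (a._1 + b._1, a._2 + b._2)
}

val (total, count) = nums.aggregate(zeroValue)(seqOp, combOp)
val avg = total.toDouble / count                         // 5.5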


---------------------------------------------------

scala> rdd.reduce
<console>:29: error: missing argument list for method reduce in class RDD
Unapplied methods are only converted to functions when a function type is expected.
You can make this conversion explicit by writing `reduce _` or `reduce(_)` instead of `reduce`.
       rdd.reduce
           ^

scala> rdd.reduce()
<console>:29: error: not enough arguments for method reduce: (f: (Int, Int) => Int)Int.
Unspecified value parameter f.
       rdd.reduce()
                 ^
---------------------------------------------------

def f(Int, Int) : Int

---------------------------------------------------

def sum(res: Int, data: Int) : Int = {res + data}

def mul(res: Int, data: Int) : Int = {res * data}

---------------------------------------------------

scala> def sum(res: Int, data: Int) : Int = {res + data}
sum: (res: Int, data: Int)Int

scala> def mul(res: Int, data: Int) : Int = {res * data}
mul: (res: Int, data: Int)Int

scala> rdd.reduce(sum)
res44: Int = 21

scala> rdd.reduce(mul)
res45: Int = 720

---------------------------------------------------
Compare `aggregate` and `reduce`:
-----------------------------------------

aggregate(zeroValue)(seqOp, combOp)

When the `seqOp` and the `combOp` are the same function and the zero value is the identity element (0 for sum, 1 for product):

aggregate(initialValue)(seqOp, seqOp)

<==>

reduce(seqOp)
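
Verifying on our rdd with the `sum` function defined above (both return 21):

rdd.aggregate(0)(sum, sum)   // the same function as seqOp and combOp, identity as zero
rdd.reduce(sum)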

---------------------------------------------------

Transformations in Spark:
----------------------------
scala> rdd.collect.foreach(println)
1
2
3
4
5
6

scala> rdd.map(x => x + 1)
res49: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[15] at map at <console>:29

scala> rdd.map(x => x + 1).collect.foreach(println)
2
3
4
5
6
7

scala> rdd.map(x => x * x).collect.foreach(println)
1
4
9
16
25
36

scala> rdd.map(x => x * x * x).collect.foreach(println)
1
8
27
64
125
216

---------------------------------------------------

scala> rdd.filter(x => x > 3)
res53: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[19] at filter at <console>:29

scala> rdd.filter(x => x > 3).collect.foreach(println)
4
5
6

scala> rdd.filter(x => x % 2 == 1).collect.foreach(println)
1
3
5

scala> rdd.filter(x => x % 2 == 0).collect.foreach(println)
2
4
6


---------------------------------------------------

scala> rdd.filter(x => x % 2 == 0).collect.foreach(println)
2
4
6

scala> rdd.filter(_ % 2 == 0).collect.foreach(println)
2
4
6

scala> rdd.filter(x => x > 3).collect.foreach(println)
4
5
6

scala> rdd.filter(_ > 3).collect.foreach(println)
4
5
6

scala> rdd.filter(_ > 3).collect.foreach(x => println(x))
4
5
6

scala> rdd.filter(_ > 3).collect.foreach(println)
4
5
6

scala> rdd.filter(_ > 3).collect.foreach(println(_))
4
5
6


---------------------------------------------------

val data = sc.textFile("hdfs:///user/tsipl1059/lakshmi/spark_sample.txt")
data: org.apache.spark.rdd.RDD[String] = hdfs:///user/tsipl1059/lakshmi/spark_sample.txt MapPartitionsRDD[1] at textFile at <console>:24

data.collect()
res0: Array[String] = Array(this is my first spark smaple data., Load data into RDD using scala language., "testing the data ", count the number of lines in a file)

---------------------------------------------------

---------------------------------------------------

val r2 = sc.textFile("/user/tsipl1115/input.txt") =====================> loads a file located in HDFS into an RDD

---------------------------------------------------

val r1 = sc.textFile("/home/tsipl1115/input.txt") ========================> loads a file located on the local file system into an RDD (when the default filesystem is HDFS, prefix the path with `file://`, e.g. `file:///home/tsipl1115/input.txt`)

---------------------------------------------------
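
A small follow-up sketch using the paths above (adjust to your environment):

val lines = sc.textFile("/user/tsipl1115/input.txt")          // HDFS file into an RDD[String]
lines.count()                                                 // action: number of lines in the file
lines.filter(_.contains("data")).collect.foreach(println)     // lines containing "data"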

