Spark:
---------------------
Apache Spark™ is a fast and general engine for large-scale data processing.
Spark Libraries built on Spark Core:
--------------------------------
1. Spark SQL
2. Spark Streaming
3. Spark MLlib
4. Spark GraphX
Spark Supports 4 Programming Languages:
------------------------------------------
Scala, Java, Python, R
How to Start the Spark in Command Line:
--------------------------------------------
Scala => $SPARK_HOME/bin/spark-shell
Python => $SPARK_HOME/bin/pyspark
R => $SPARK_HOME/bin/sparkR
Spark-2.x:
--------------------------------------------
Spark context available as 'sc'
Spark session available as 'spark'
Spark-1.x:
--------------------------------------------
Spark context available as 'sc'
Spark SQLContext available as 'sqlContext'
Note:
------------
`SparkContext` is the `Entry Point` for any `Spark Operations`.
Resilient Distributed DataSets (RDD):
--------------------------------------------
RDD Features:
----------------
1. Immutability
2. Lazy Evaluation
3. Cacheable
4. Type Inference
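A minimal sketch of two of these features, laziness and caching, assuming the `sc` available in spark-shell:

// Lazy evaluation: map only builds a plan, nothing executes yet
val squares = sc.parallelize(1 to 5).map(x => x * x)

// cache marks the RDD to be kept in memory once an action computes it
squares.cache()

// The first action triggers the computation; later actions reuse the cached data
squares.count()     // 5
squares.collect()   // Array(1, 4, 9, 16, 25)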
RDD Operations:
-----------------
1. Transformations
   Convert an `RDD` into another `RDD`
   ex: old_rdd ==> new_rdd
2. Actions
   Convert an `RDD` into a `Result` (see the sketch below)
   ex: rdd ==> result
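A minimal sketch of the difference, assuming the `sc` from the shell:

// Transformation: returns a new RDD, evaluated lazily
val doubled = sc.parallelize(List(1, 2, 3)).map(_ * 2)

// Action: triggers the computation and returns a result to the driver
val total = doubled.reduce(_ + _)   // 12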
How to Create an RDD?
-----------------------------------
We can create an RDD in 2 ways:
1. from collections (List, Set, Seq, ...)
2. from data sets (text, csv, tsv, json, ...)
How to create an RDD from `collections`?
---------------------------------------
val list = List(1,2,3,4,5,6)
val rdd = sc.parallelize(<your collection>, <no.of partitions>)
val rdd = sc.parallelize(list,6)
Create an RDD with default Partitions:
--------------------------------------
val rdd = sc.parallelize(list)
scala> val list = List(1,2,3,4,5,6)
list: List[Int] = List(1, 2, 3, 4, 5, 6)
scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26
scala> rdd.getNumPartitions
res0: Int = 4
Create an RDD with 2 Partitions:
--------------------------------------
val rdd = sc.parallelize(list, 2)
scala> val rdd = sc.parallelize(list, 2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:26
scala> rdd.getNumPartitions
res1: Int = 2
---------------------------------------------------
scala> rdd.collect
res3: Array[Int] = Array(1, 2, 3, 4, 5, 6)
scala> rdd.collect.foreach(println)
1
2
3
4
5
6
---------------------------------------------------
scala> rdd.foreach(println)
1
2
3
4
5
6
scala> rdd.foreach(println)
4
1
2
3
5
6
---------------------------------------------------
Note:
------------
`collect` will gather the data from `all the partitions` into the driver
`collect` will preserve the `order of the data` (unlike `foreach` above, which prints in whatever order the executors run)
Don't use `collect` on large datasets in `Real-Time Use Cases`: pulling everything to the driver will end up with MEMORY (OutOfMemory) issues
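A safer way to peek at a large RDD is to pull back only a few elements, e.g. with `take` (a small sketch using the same `rdd` as above):

// take(n) brings only n elements to the driver instead of the whole dataset
rdd.take(3).foreach(println)   // prints 1, 2, 3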
---------------------------------------------------
Equivalent ways to pair each element with itself using `map` (here `x` is any RDD[Int]):
x.map(data => (data, data))
or
x.map(y => (y, y))
or
x.map(z => (z, z))
or
def f(p : Int) : (Int, Int) = {
(p,p)
}
x.map(f)
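For example, on the `rdd` created above, all of these forms produce the same pairs:

rdd.map(x => (x, x)).collect
// Array((1,1), (2,2), (3,3), (4,4), (5,5), (6,6))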
---------------------------------------------------
Importance of `mapPartitionsWithIndex`
---------------------------------------------------
scala> rdd.mapPartitionsWithIndex
<console>:29: error: missing argument list for method mapPartitionsWithIndex in class RDD
Unapplied methods are only converted to functions when a function type is expected.
You can make this conversion explicit by writing `mapPartitionsWithIndex _` or `mapPartitionsWithIndex(_,_)(_)` instead of `mapPartitionsWithIndex`.
rdd.mapPartitionsWithIndex
^
scala> rdd.mapPartitionsWithIndex()
<console>:29: error: not enough arguments for method mapPartitionsWithIndex: (f: (Int, Iterator[Int]) => Iterator[U], preservesPartitioning: Boolean)(implicit evidence$8: scala.reflect.ClassTag[U])org.apache.spark.rdd.RDD[U].
Unspecified value parameter f.
rdd.mapPartitionsWithIndex()
^
---------------------------------------------------
def mapPartitionsWithIndex[U](f: (Int, Iterator[Int]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
i.e. we need to supply a function of the shape: def f(index: Int, it: Iterator[Int]): Iterator[U]
---------------------------------------------------
def myFunc1(index: Int, it: Iterator[Int]): Iterator[String] = {
  it.toList.map(data => s"index: $index, data: $data").iterator
}
---------------------------------------------------
rdd.mapPartitionsWithIndex(myFunc1)
rdd.mapPartitionsWithIndex(myFunc1).collect
rdd.mapPartitionsWithIndex(myFunc1).collect.foreach(println)
---------------------------------------------------
scala> def myFunc1(index: Int, it: Iterator[Int]) : Iterator[String]= {
| it.toList.map(data => s"index: $index, data: $data").iterator
| }
myFunc1: (index: Int, it: Iterator[Int])Iterator[String]
---------------------------------------------------
scala> rdd.mapPartitionsWithIndex(myFunc1)
res28: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at mapPartitionsWithIndex at <console>:31
scala> rdd.mapPartitionsWithIndex(myFunc1).collect
res29: Array[String] = Array(index: 0, data: 1, index: 0, data: 2, index: 0, data: 3, index: 1, data: 4, index: 1, data: 5, index: 1, data: 6)
scala> rdd.mapPartitionsWithIndex(myFunc1).collect.foreach(println)
index: 0, data: 1
index: 0, data: 2
index: 0, data: 3
index: 1, data: 4
index: 1, data: 5
index: 1, data: 6
---------------------------------------------------
val names = List("kalyan", "anil", "raj", "sunil", "rajesh", "dev")
val rdd1 = sc.parallelize(names, 2)
// the call below won't compile: myFunc1 expects Iterator[Int] but rdd1 contains Strings
rdd1.mapPartitionsWithIndex(myFunc1)
---------------------------------------------------
scala> val names = List("kalyan", "anil", "raj", "sunil", "rajesh", "dev")
names: List[String] = List(kalyan, anil, raj, sunil, rajesh, dev)
scala> val rdd1 = sc.parallelize(names, 2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at <console>:26
---------------------------------------------------
scala> rdd1.mapPartitionsWithIndex(myFunc1)
<console>:31: error: type mismatch;
found : (Int, Iterator[Int]) => Iterator[String]
required: (Int, Iterator[String]) => Iterator[?]
Error occurred in an application involving default arguments.
rdd1.mapPartitionsWithIndex(myFunc1)
^
---------------------------------------------------
def myFunc2(index: Int, it: Iterator[String]): Iterator[String] = {
  it.toList.map(data => s"index: $index, data: $data").iterator
}
rdd1.mapPartitionsWithIndex(myFunc2)
---------------------------------------------------
scala> def myFunc2(index: Int, it: Iterator[String]) : Iterator[String]= {
| it.toList.map(data => s"index: $index, data: $data").iterator
| }
myFunc2: (index: Int, it: Iterator[String])Iterator[String]
scala> rdd1.mapPartitionsWithIndex(myFunc2)
res32: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at mapPartitionsWithIndex at <console>:31
scala> rdd1.mapPartitionsWithIndex(myFunc2).collect
res33: Array[String] = Array(index: 0, data: kalyan, index: 0, data: anil, index: 0, data: raj, index: 1, data: sunil, index: 1, data: rajesh, index: 1, data: dev)
scala> rdd1.mapPartitionsWithIndex(myFunc2).collect.foreach(println)
index: 0, data: kalyan
index: 0, data: anil
index: 0, data: raj
index: 1, data: sunil
index: 1, data: rajesh
index: 1, data: dev
---------------------------------------------------
How to create `Generalized Function`:
------------------------------------------
def myFunc[T](index: Int, it: Iterator[T]): Iterator[String] = {
  it.toList.map(data => s"index: $index, data: $data").iterator
}
rdd.mapPartitionsWithIndex(myFunc).collect.foreach(println)
rdd1.mapPartitionsWithIndex(myFunc).collect.foreach(println)
---------------------------------------------------
scala> def myFunc[T](index: Int, it: Iterator[T]) : Iterator[String]= {
| it.toList.map(data => s"index: $index, data: $data").iterator
| }
myFunc: [T](index: Int, it: Iterator[T])Iterator[String]
scala> rdd.mapPartitionsWithIndex(myFunc).collect.foreach(println)
index: 0, data: 1
index: 0, data: 2
index: 0, data: 3
index: 1, data: 4
index: 1, data: 5
index: 1, data: 6
scala> rdd1.mapPartitionsWithIndex(myFunc).collect.foreach(println)
index: 0, data: kalyan
index: 0, data: anil
index: 0, data: raj
index: 1, data: sunil
index: 1, data: rajesh
index: 1, data: dev
---------------------------------------------------
scala> rdd.aggregate
<console>:29: error: missing argument list for method aggregate in class RDD
Unapplied methods are only converted to functions when a function type is expected.
You can make this conversion explicit by writing `aggregate _` or `aggregate(_)(_,_)(_)` instead of `aggregate`.
rdd.aggregate
^
scala> rdd.aggregate()
<console>:29: error: missing argument list for method aggregate in class RDD
Unapplied methods are only converted to functions when a function type is expected.
You can make this conversion explicit by writing `aggregate _` or `aggregate(_)(_,_)(_)` instead of `aggregate`.
rdd.aggregate()
^
scala> rdd.aggregate()()
<console>:29: error: not enough arguments for method aggregate: (seqOp: (Unit, Int) => Unit, combOp: (Unit, Unit) => Unit)(implicit evidence$29: scala.reflect.ClassTag[Unit])Unit.
Unspecified value parameters seqOp, combOp.
rdd.aggregate()()
^
---------------------------------------------------
def aggregate[U](zeroValue: U)(seqOp: (U, Int) => U, combOp: (U, U) => U): U
---------------------------------------------------
val zeroValue: U
def seqOp(res: U, data: Int): U
def combOp(res1: U, res2: U): U
Note: `U` is the Return Type we choose (the errors above show `Unit` only because no arguments were supplied)
---------------------------------------------------
Implementation of `aggregate`
---------------------------------------------------
Example1: Sum the RDD (U => Int)
---------------------------------------------
Note: replace `U` with `Int` in all the places
val zeroValue = 0
def seqOp(res: Int, data: Int): Int = {
  res + data
}
def combOp(res1: Int, res2: Int): Int = {
  res1 + res2
}
rdd.aggregate(zeroValue)(seqOp, combOp)
---------------------------------------------------
scala> val zeroValue = 0
zeroValue: Int = 0
scala> def seqOp(res: Int, data: Int) : Int = {
| res + data
| }
seqOp: (res: Int, data: Int)Int
scala> def combOp(res1: Int, res2: Int) : Int = {
| res1 + res2
| }
combOp: (res1: Int, res2: Int)Int
scala> rdd.aggregate(zeroValue)(seqOp,combOp)
res40: Int = 21
---------------------------------------------------
Example2: Multiply the RDD (U => Int)
---------------------------------------------
Note: replace `U` with `Int` in all the places
val zeroValue = 1
def seqOp(res: Int, data: Int): Int = {
  res * data
}
def combOp(res1: Int, res2: Int): Int = {
  res1 * res2
}
rdd.aggregate(zeroValue)(seqOp, combOp)
---------------------------------------------------
scala> val zeroValue = 1
zeroValue: Int = 1
scala> def seqOp(res: Int, data: Int) : Int = {
| res * data
| }
seqOp: (res: Int, data: Int)Int
scala> def combOp(res1: Int, res2: Int) : Int = {
| res1 * res2
| }
combOp: (res1: Int, res2: Int)Int
scala> rdd.aggregate(zeroValue)(seqOp,combOp)
res41: Int = 720
---------------------------------------------------
Find the `average` of 1 to 10 using `aggregate` function?
---------------------------------------------------
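One possible solution (a sketch, not from the original session): carry a running `(sum, count)` pair as the aggregate's result type `U`, then divide at the end.

val nums = sc.parallelize(1 to 10)

// zeroValue: (running sum, running count)
val zero = (0, 0)

// seqOp: fold one element into a partition's (sum, count)
def sumCount(acc: (Int, Int), data: Int): (Int, Int) = (acc._1 + data, acc._2 + 1)

// combOp: merge the (sum, count) pairs from two partitions
def mergeCounts(a: (Int, Int), b: (Int, Int)): (Int, Int) = (a._1 + b._1, a._2 + b._2)

val (sum, count) = nums.aggregate(zero)(sumCount, mergeCounts)
val avg = sum.toDouble / count   // 5.5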
---------------------------------------------------
scala> rdd.reduce
<console>:29: error: missing argument list for method reduce in class RDD
Unapplied methods are only converted to functions when a function type is expected.
You can make this conversion explicit by writing `reduce _` or `reduce(_)` instead of `reduce`.
rdd.reduce
^
scala> rdd.reduce()
<console>:29: error: not enough arguments for method reduce: (f: (Int, Int) => Int)Int.
Unspecified value parameter f.
rdd.reduce()
^
---------------------------------------------------
i.e. we need a function of the shape: def f(res: Int, data: Int): Int
---------------------------------------------------
def sum(res: Int, data: Int) : Int = {res + data}
def mul(res: Int, data: Int) : Int = {res * data}
---------------------------------------------------
scala> def sum(res: Int, data: Int) : Int = {res + data}
sum: (res: Int, data: Int)Int
scala> def mul(res: Int, data: Int) : Int = {res * data}
mul: (res: Int, data: Int)Int
scala> rdd.reduce(sum)
res44: Int = 21
scala> rdd.reduce(mul)
res45: Int = 720
---------------------------------------------------
Compare `aggregate` and `reduce`:
-----------------------------------------
aggregate(zeroValue)(seqOp, combOp)
aggregate(initialValue)(seqOp, seqOp)
<==>
reduce(seqOp)
(the equivalence holds when initialValue is the identity of seqOp, e.g. 0 for sum, 1 for multiplication)
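A quick check in the shell, using the `sum` function defined above (a sketch; again, the zero value must be the identity of the operation):

rdd.aggregate(0)(sum, sum)   // 21
rdd.reduce(sum)              // 21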
---------------------------------------------------
Transformations in Spark:
----------------------------
scala> rdd.collect.foreach(println)
1
2
3
4
5
6
scala> rdd.map(x => x + 1)
res49: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[15] at map at <console>:29
scala> rdd.map(x => x + 1).collect.foreach(println)
2
3
4
5
6
7
scala> rdd.map(x => x * x).collect.foreach(println)
1
4
9
16
25
36
scala> rdd.map(x => x * x * x).collect.foreach(println)
1
8
27
64
125
216
---------------------------------------------------
scala> rdd.filter(x => x > 3)
res53: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[19] at filter at <console>:29
scala> rdd.filter(x => x > 3).collect.foreach(println)
4
5
6
scala> rdd.filter(x => x % 2 == 1).collect.foreach(println)
1
3
5
scala> rdd.filter(x => x % 2 == 0).collect.foreach(println)
2
4
6
---------------------------------------------------
scala> rdd.filter(x => x % 2 == 0).collect.foreach(println)
2
4
6
scala> rdd.filter(_ % 2 == 0).collect.foreach(println)
2
4
6
scala> rdd.filter(x => x > 3).collect.foreach(println)
4
5
6
scala> rdd.filter(_ > 3).collect.foreach(println)
4
5
6
scala> rdd.filter(_ > 3).collect.foreach(x => println(x))
4
5
6
scala> rdd.filter(_ > 3).collect.foreach(println)
4
5
6
scala> rdd.filter(_ > 3).collect.foreach(println(_))
4
5
6
---------------------------------------------------
val data = sc.textFile("hdfs:///user/tsipl1059/lakshmi/spark_sample.txt")
data: org.apache.spark.rdd.RDD[String] = hdfs:///user/tsipl1059/lakshmi/spark_sample.txt MapPartitionsRDD[1] at textFile at <console>:24
data.collect()
res0: Array[String] = Array(this is my first spark smaple data., Load data into RDD using scala language., "testing the data ", count the number of lines in a file)
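For example, counting the lines of the loaded file is an action (a small sketch using the `data` RDD above):

// count returns the number of elements (here, lines) in the RDD
data.count()   // 4 for the sample file above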
---------------------------------------------------
---------------------------------------------------
val r2 = sc.textFile("/user/tsipl1115/input.txt") =====================> the file is located in HDFS; this loads it into an RDD
---------------------------------------------------
val r1 = sc.textFile("file:///home/tsipl1115/input.txt") ========================> the file is located on the local file system; the file:// prefix keeps Spark from resolving the path against HDFS
---------------------------------------------------