--------------------------------------------------
Spark Provides Libraries
-> Spark SQL
-> Spark Streaming
-> Spark MLlib
-> Spark GraphX
`Spark Context` -> Entry Point for Spark Operations
From the `Spark Context` we can create a Resilient Distributed Dataset (RDD)
RDD Features:
----------------
1. Immutable
2. Lazily evaluated
3. Cacheable
4. Type inferred
RDD Operations:
--------------------
1. Transformations
<old rdd> ====> <new rdd>
2. Actions
<rdd> ====> <result>
Note:-
Transformations are lazy: they only describe a new RDD and do not run any computation
Actions trigger the computation of the RDD and return a result
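The note above can be observed with a counter. This is a minimal sketch, assuming Spark is on the classpath and a local master is acceptable; the object name `LazyEvaluationSketch`, the app name and `local[2]` are illustrative choices, not part of the original notes. The accumulator only counts how often the function passed to `map` runs.

```scala
import org.apache.spark.sql.SparkSession

object LazyEvaluationSketch {
  // Returns the number of map-function calls (before the action, after the action).
  def run(): (Long, Long) = {
    val spark = SparkSession.builder()
      .appName("lazy-evaluation-sketch") // hypothetical app name
      .master("local[2]")                // assumption: run locally with 2 threads
      .getOrCreate()
    val sc = spark.sparkContext
    try {
      val calls = sc.longAccumulator("map calls")
      val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)

      // Transformation: only the lineage is recorded, the function does not run yet.
      val plusOne = rdd.map { x => calls.add(1); x + 1 }
      val before = calls.sum

      // Action: triggers the job, so the map function runs for every element.
      plusOne.count()
      val after = calls.sum

      (before, after)
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (before, after) = run()
    println(s"map calls before action: $before, after action: $after")
  }
}
```

On a local run without task retries one would expect 0 calls before `count()` and 6 after it.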
--------------------------------------------------
How to create an RDD?
We can create an RDD in 2 ways
1. from collections (List, Seq, ...)
2. from datasets (text file, csv, tsv, json, ...)
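Both ways can be sketched side by side. This is an illustration only, assuming Spark is on the classpath; the object name `CreateRddSketch`, the local master and the temporary file are made up for the example, and `textFile` is used as one possible reader for a plain text dataset.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

object CreateRddSketch {
  // Returns (element count of the collection RDD, sum of the numbers read from the file).
  def run(): (Long, Double) = {
    val spark = SparkSession.builder()
      .appName("create-rdd-sketch") // hypothetical app name
      .master("local[2]")           // assumption: local run
      .getOrCreate()
    val sc = spark.sparkContext
    try {
      // 1. From a collection
      val fromCollection = sc.parallelize(Seq(1, 2, 3, 4, 5, 6))

      // 2. From a dataset: a small temporary text file with one number per line
      val file = Files.createTempFile("numbers", ".txt")
      Files.write(file, "1\n2\n3\n".getBytes(StandardCharsets.UTF_8))
      val fromFile = sc.textFile(file.toUri.toString).map(_.trim.toInt)

      (fromCollection.count(), fromFile.sum())
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```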
--------------------------------------------------
We can run Spark from the command line in 3 ways:
Scala => spark-shell => $SPARK_HOME/bin/spark-shell
Python => pyspark => $SPARK_HOME/bin/pyspark
R => sparkR => $SPARK_HOME/bin/sparkR
--------------------------------------------------
Spark-1.x:
------------
Spark context available as 'sc'
Sql context available as 'sqlContext'
Spark-2.x:
------------
Spark context available as 'sc'
Spark session available as 'spark'
--------------------------------------------------
1. Create an RDD from a collection
-----------------------------------
val nums = List(1,2,3,4,5,6)
val chars = List('a','b','c','d','e','f')
val rdd1 = sc.parallelize(nums)
val rdd2 = sc.parallelize(chars)
--------------------------------------------------
scala> val nums = List(1,2,3,4,5,6)
nums: List[Int] = List(1, 2, 3, 4, 5, 6)
scala> val chars = List('a','b','c','d','e','f')
chars: List[Char] = List(a, b, c, d, e, f)
scala> val rdd1 = sc.parallelize(nums)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26
scala> val rdd2 = sc.parallelize(chars)
rdd2: org.apache.spark.rdd.RDD[Char] = ParallelCollectionRDD[1] at parallelize at <console>:26
--------------------------------------------------
How to find the number of partitions in an RDD:
------------------------------------------------
scala> rdd1.partitions.length
res0: Int = 4
scala> rdd1.getNumPartitions
res1: Int = 4
2. Create an RDD from a collection with a given number of partitions
---------------------------------------------------------------------
val nums = List(1,2,3,4,5,6)
val chars = List('a','b','c','d','e','f')
val rdd1 = sc.parallelize(nums, 2)
val rdd2 = sc.parallelize(chars, 3)
--------------------------------------------------
scala> val rdd1 = sc.parallelize(nums, 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:26
scala> rdd1.partitions.length
res2: Int = 2
scala> rdd1.getNumPartitions
res3: Int = 2
--------------------------------------------------
scala> rdd1.
++ groupBy saveAsObjectFile
aggregate histogram saveAsTextFile
cache id setName
canEqual intersection sortBy
cartesian isCheckpointed sparkContext
checkpoint isEmpty stats
coalesce iterator stdev
collect keyBy subtract
collectAsync localCheckpoint sum
compute map sumApprox
context mapPartitions take
copy mapPartitionsWithIndex takeAsync
count max takeOrdered
countApprox mean takeSample
countApproxDistinct meanApprox toDF
countAsync min toDS
countByValue name toDebugString
countByValueApprox partitioner toJavaRDD
dependencies partitions toLocalIterator
distinct persist toString
filter pipe top
first preferredLocations treeAggregate
flatMap productArity treeReduce
fold productElement union
foreach productIterator unpersist
foreachAsync productPrefix variance
foreachPartition randomSplit zip
foreachPartitionAsync reduce zipPartitions
getCheckpointFile repartition zipWithIndex
getNumPartitions sample zipWithUniqueId
getStorageLevel sampleStdev
glom sampleVariance
--------------------------------------------------
scala> rdd1.foreach(x => println(x))
1
2
3
4
5
6
scala> rdd1.foreach(println)
4
5
6
1
2
3
Print the RDD data using collect:
------------------------------------------
scala> rdd1.collect
res16: Array[Int] = Array(1, 2, 3, 4, 5, 6)
scala> rdd1.collect
res17: Array[Int] = Array(1, 2, 3, 4, 5, 6)
scala> rdd1.collect.foreach(println)
1
2
3
4
5
6
scala> rdd1.collect.foreach(println)
1
2
3
4
5
6
Note:
---------
`rdd.collect` brings the data from all partitions of the RDD onto a single machine (the driver)
Don't use `collect` on large RDDs in real-world jobs: the result may not fit in the driver's memory
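When only a peek at the data is needed, `take` and `count` (both listed in the tab completion above) return small results instead of the whole RDD. A minimal sketch, assuming Spark is on the classpath; the object name, app name, local master and the 1 to 1000 range are illustrative.

```scala
import org.apache.spark.sql.SparkSession

object AvoidCollectSketch {
  // Returns (the first three elements, the total number of elements).
  def run(): (List[Int], Long) = {
    val spark = SparkSession.builder()
      .appName("avoid-collect-sketch") // hypothetical app name
      .master("local[2]")              // assumption: local run
      .getOrCreate()
    val sc = spark.sparkContext
    try {
      val rdd = sc.parallelize(1 to 1000, 4)

      // take(n) ships only n elements to the driver, not the whole RDD.
      val firstThree = rdd.take(3).toList

      // count() ships a single number per partition to the driver.
      val total = rdd.count()

      (firstThree, total)
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```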
--------------------------------------------------
scala> rdd1.map(x => x + 1)
res29: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[7] at map at <console>:29
scala> rdd1.map(x => x + 1).foreach(println)
2
3
4
5
6
7
scala> rdd1.map(x => x + 1).collect.foreach(println)
2
3
4
5
6
7
scala> rdd1.map(x => x + 1).collect.foreach(println)
2
3
4
5
6
7
scala> rdd1.map(_ + 1).collect.foreach(println)
2
3
4
5
6
7
scala> rdd1.map(x => x * x).collect.foreach(println)
1
4
9
16
25
36
scala> rdd1.map(_ * _).collect.foreach(println)
<console>:29: error: missing parameter type for expanded function ((x$1, x$2) => x$1.$times(x$2))
rdd1.map(_ * _).collect.foreach(println)
^
<console>:29: error: missing parameter type for expanded function ((x$1: <error>, x$2) => x$1.$times(x$2))
rdd1.map(_ * _).collect.foreach(println)
^
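The error above comes from Scala's placeholder syntax: every `_` stands for a separate parameter, so `_ * _` is a function of two arguments, while `map` expects a function of one. A small sketch on a plain Scala `List` (used instead of an RDD so that it runs without Spark; the object name is made up):

```scala
object PlaceholderSketch {
  val nums = List(1, 2, 3, 4, 5, 6)

  // One parameter used twice: it has to be named.
  val squares: List[Int] = nums.map(x => x * x)

  // Two placeholders mean two parameters, which fits a two-argument operation such as reduce.
  val product: Int = nums.reduce(_ * _)

  def main(args: Array[String]): Unit = {
    println(squares) // List(1, 4, 9, 16, 25, 36)
    println(product) // 720
  }
}
```

`reduce` also appears in the RDD method list above and takes a two-argument function in the same way.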
scala> rdd1.filter(x => x > 2).collect.foreach(println)
3
4
5
6
scala> rdd1.filter(_ > 2).collect.foreach(println)
3
4
5
6
--------------------------------------------------
scala> rdd1.min
res38: Int = 1
scala> rdd1.max
res39: Int = 6
scala> rdd1.sum
res40: Double = 21.0
scala> rdd1.count
res41: Long = 6
--------------------------------------------------
scala> rdd1.mapPartitionsWithIndex()
<console>:29: error: not enough arguments for method mapPartitionsWithIndex: (f: (Int, Iterator[Int]) => Iterator[U], preservesPartitioning: Boolean)(implicit evidence$8: scala.reflect.ClassTag[U])org.apache.spark.rdd.RDD[U].
Unspecified value parameter f.
rdd1.mapPartitionsWithIndex()
^
def myfunc1(index: Int, it: Iterator[Int]): Iterator[String] = {
it.toList.map(x => s"index -> $index, value -> $x").toIterator
}
def myfunc2(index: Int, it: Iterator[Char]): Iterator[String] = {
it.toList.map(x => s"index -> $index, value -> $x").toIterator
}
def myfunc[T](index: Int, it: Iterator[T]): Iterator[String] = {
it.toList.map(x => s"index -> $index, value -> $x").toIterator
}
--------------------------------------------------
scala> def myfunc1(index: Int, it: Iterator[Int]): Iterator[String] = {
| it.toList.map(x => s"index -> $index, value -> $x").toIterator
| }
myfunc1: (index: Int, it: Iterator[Int])Iterator[String]
scala> rdd1.mapPartitionsWithIndex(myfunc1).collect.foreach(println)
index -> 0, value -> 1
index -> 0, value -> 2
index -> 0, value -> 3
index -> 1, value -> 4
index -> 1, value -> 5
index -> 1, value -> 6
scala> rdd1.collect
res51: Array[Int] = Array(1, 2, 3, 4, 5, 6)
scala> rdd2.collect
res52: Array[Char] = Array(a, b, c, d, e, f)
scala> rdd1.mapPartitionsWithIndex(myfunc1).collect.foreach(println)
index -> 0, value -> 1
index -> 0, value -> 2
index -> 0, value -> 3
index -> 1, value -> 4
index -> 1, value -> 5
index -> 1, value -> 6
scala> rdd2.mapPartitionsWithIndex(myfunc1).collect.foreach(println)
<console>:31: error: type mismatch;
found : (Int, Iterator[Int]) => Iterator[String]
required: (Int, Iterator[Char]) => Iterator[?]
rdd2.mapPartitionsWithIndex(myfunc1).collect.foreach(println)
^
scala> def myfunc2(index: Int, it: Iterator[Char]): Iterator[String] = {
| it.toList.map(x => s"index -> $index, value -> $x").toIterator
| }
myfunc2: (index: Int, it: Iterator[Char])Iterator[String]
scala> rdd2.mapPartitionsWithIndex(myfunc2).collect.foreach(println)
index -> 0, value -> a
index -> 1, value -> b
index -> 1, value -> c
index -> 2, value -> d
index -> 3, value -> e
index -> 3, value -> f
scala> rdd2.getNumPartitions
res56: Int = 4
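The uneven layout above (a | b c | d | e f) is what one gets when 6 elements are cut into 4 index ranges. The sketch below is a plain Scala model, not Spark code: it assumes slice i covers the indices from i * n / k up to (but not including) (i + 1) * n / k, which reproduces the output shown in these notes; `SliceSketch` and `slice` are hypothetical names.

```scala
object SliceSketch {
  // Assumption: slice i holds the indices [i * n / k, (i + 1) * n / k) using integer division.
  def slice[T](data: Seq[T], numSlices: Int): Seq[Seq[T]] = {
    require(numSlices >= 1, "need at least one slice")
    val n = data.length.toLong
    (0 until numSlices).map { i =>
      val start = ((i * n) / numSlices).toInt
      val end = (((i + 1) * n) / numSlices).toInt
      data.slice(start, end)
    }
  }

  def main(args: Array[String]): Unit = {
    // 6 elements in 4 slices: sizes 1, 2, 1, 2
    println(slice(List('a', 'b', 'c', 'd', 'e', 'f'), 4))
    // 6 elements in 2 slices: sizes 3, 3
    println(slice(List(1, 2, 3, 4, 5, 6), 2))
  }
}
```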
--------------------------------------------------
scala> def myfunc[T](index: Int, it: Iterator[T]): Iterator[String] = {
| it.toList.map(x => s"index -> $index, value -> $x").toIterator
| }
myfunc: [T](index: Int, it: Iterator[T])Iterator[String]
scala> rdd1.mapPartitionsWithIndex(myfunc).collect.foreach(println)
index -> 0, value -> 1
index -> 0, value -> 2
index -> 0, value -> 3
index -> 1, value -> 4
index -> 1, value -> 5
index -> 1, value -> 6
scala> rdd2.mapPartitionsWithIndex(myfunc).collect.foreach(println)
index -> 0, value -> a
index -> 1, value -> b
index -> 1, value -> c
index -> 2, value -> d
index -> 3, value -> e
index -> 3, value -> f
--------------------------------------------------
Aggregate Action in RDD:
----------------------------------------
scala> rdd1.aggregate()()
<console>:29: error: not enough arguments for method aggregate: (seqOp: (Unit, Int) => Unit, combOp: (Unit, Unit) => Unit)(implicit evidence$29: scala.reflect.ClassTag[Unit])Unit.
Unspecified value parameters seqOp, combOp.
rdd1.aggregate()()
^
rdd1.aggregate(intialValue)(seqOp, combOp)
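// seqOp folds one element of a partition into that partition's running result
def seqOp(result: Unit, data: Int): Unit = {
}
// combOp merges the results of two partitions
def combOp(result: Unit, data: Unit): Unit = {
}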
--------------------------------------------------
val intialValue = 1
def seqOp(result: Int, data: Int): Int = {
if( result > data) result else data
}
def combOp(result: Int, data: Int): Int = {
result + data
}
rdd1.aggregate(intialValue)(seqOp, combOp)
--------------------------------------------------
scala> val intialValue = 1
intialValue: Int = 1
scala>
scala> def seqOp(result: Int, data: Int): Int = {
| if( result > data) result else data
| }
seqOp: (result: Int, data: Int)Int
scala>
scala> def combOp(result: Int, data: Int): Int = {
| result + data
| }
combOp: (result: Int, data: Int)Int
scala>
scala> rdd1.aggregate(intialValue)(seqOp, combOp)
res60: Int = 10
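Where the 10 comes from can be traced with a plain Scala model of `aggregate`. This is a sketch of the idea, not Spark's implementation: it assumes each partition is folded with seqOp starting from the initial value and that the per-partition results are then merged with combOp, again starting from the initial value. `AggregateModel` and its members are hypothetical names; the partition contents are taken from the `mapPartitionsWithIndex` output above.

```scala
object AggregateModel {
  // Fold every partition with seqOp, then merge the partial results with combOp.
  def aggregate[T, U](partitions: Seq[Seq[T]], zero: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = {
    val perPartition = partitions.map(_.foldLeft(zero)(seqOp))
    perPartition.foldLeft(zero)(combOp)
  }

  def seqOp(result: Int, data: Int): Int = if (result > data) result else data
  def combOp(result: Int, data: Int): Int = result + data

  val twoPartitions = List(List(1, 2, 3), List(4, 5, 6))
  val threePartitions = List(List(1, 2), List(3, 4), List(5, 6))

  def main(args: Array[String]): Unit = {
    // max per partition is 3 and 6, so the merge is 1 + 3 + 6 = 10
    println(aggregate(twoPartitions, 1)(seqOp, combOp))
    // max per partition is 2, 4 and 6, so the merge is 1 + 2 + 4 + 6 = 13
    println(aggregate(threePartitions, 1)(seqOp, combOp))
  }
}
```

With max as seqOp and sum as combOp the answer depends on the number of partitions, so the 10 above is specific to an RDD with 2 partitions.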
--------------------------------------------------
def max(a: Int, b: Int): Int = {
if( a > b) a else b
}
def min(a: Int, b: Int): Int = {
if( a < b) a else b
}
def mul(a: Int, b: Int): Int = {
a * b
}
def sum(a: Int, b: Int): Int = {
a + b
}
rdd1.aggregate(1)(min, mul)
--------------------------------------------------
scala> def max(a: Int, b: Int): Int = {
| if( a > b) a else b
| }
max: (a: Int, b: Int)Int
scala>
scala> def min(a: Int, b: Int): Int = {
| if( a < b) a else b
| }
min: (a: Int, b: Int)Int
scala>
scala> def mul(a: Int, b: Int): Int = {
| a * b
| }
mul: (a: Int, b: Int)Int
scala>
scala> def sum(a: Int, b: Int): Int = {
| a + b
| }
sum: (a: Int, b: Int)Int
scala> rdd1.aggregate(1)(min, mul)
res63: Int = 1
--------------------------------------------------
Average using `aggregate`
--------------------------------------------------
val intialValue = (0, 0)
def seqOp(result: (Int,Int), data: Int): (Int,Int) = {
(result._1 + data, result._2 + 1)
}
def combOp(result: (Int,Int), data: (Int,Int)): (Int,Int) = {
(result._1 + data._1, result._2 + data._2)
}
val (sum, count) = rdd1.aggregate(intialValue)(seqOp, combOp)
val avg = sum / count            // Int / Int is integer division: 3
val avg = sum / count.toDouble   // convert to Double first for the exact average: 3.5
--------------------------------------------------
scala> val intialValue = (0, 0)
intialValue: (Int, Int) = (0,0)
scala> def seqOp(result: (Int,Int), data: Int): (Int,Int) = {
| (result._1 + data, result._2 + 1)
| }
seqOp: (result: (Int, Int), data: Int)(Int, Int)
scala> def combOp(result: (Int,Int), data: (Int,Int)): (Int,Int) = {
| (result._1 + data._1, result._2 + data._2)
| }
combOp: (result: (Int, Int), data: (Int, Int))(Int, Int)
scala> val (sum, count) = rdd1.aggregate(intialValue)(seqOp, combOp)
sum: Int = 21
count: Int = 6
scala> val avg = sum / count
avg: Int = 3
scala> val avg = sum / count.toDouble
avg: Double = 3.5
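Note: the `operator` used in an aggregate operation has to follow the laws below, because partitions are processed independently and their results can be merged in any order
1. associative law
2. commutative law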
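Why the commutative law matters can be shown without Spark. The sketch below only models the merge step on plain Scala lists, under the assumption that per-partition results may reach the driver in any order; `MergeOrderSketch` and `merge` are hypothetical names.

```scala
object MergeOrderSketch {
  // Merge per-partition results in the given arrival order, starting from zero.
  def merge[U](partials: Seq[U], zero: U)(combOp: (U, U) => U): U =
    partials.foldLeft(zero)(combOp)

  val sums = List(6, 15)           // per-partition sums of (1, 2, 3) and (4, 5, 6)
  val strings = List("123", "456") // per-partition string concatenations

  def main(args: Array[String]): Unit = {
    // Addition is associative and commutative: the arrival order does not matter.
    println(merge(sums, 0)(_ + _))         // 21
    println(merge(sums.reverse, 0)(_ + _)) // 21

    // String concatenation is associative but not commutative: the order changes the result.
    println(merge(strings, "")(_ + _))         // 123456
    println(merge(strings.reverse, "")(_ + _)) // 456123
  }
}
```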
--------------------------------------------------