Monday 23 September 2019

Spark_Day2_2

---------------------------------------------------
Find the `average` of 1 to 10 using `aggregate` function?
---------------------------------------------------
val list = List(1,2,3,4,5,6,7,8,9,10)
val rdd = sc.parallelize(list, 2)

---------------------------------------------------
val zeroValue = _

def seqOp(Unit, Int) : Unit = {}

def combOp(Unit, Unit) : Unit = {}

Note: `Unit` above is only a placeholder for the expected result (return) type

---------------------------------------------------

Note: replace `Unit` with `(Int, Int)` in all the places


val zeroValue = (0, 0)

/** Folds one element into the accumulator pair.
  *
  * @param res  accumulator so far: (running sum, number of elements seen)
  * @param data next element to fold in
  * @return the accumulator with `data` added to the sum and the count incremented by one
  */
def seqOp(res: (Int, Int), data: Int) : (Int, Int) = {
  val (runningSum, seen) = res
  (runningSum + data, seen + 1)
}

/** Merges two accumulator pairs component-wise.
  *
  * @param res1 first (sum, count) accumulator
  * @param res2 second (sum, count) accumulator
  * @return a pair holding the sum of both first components and the sum of both second components
  */
def combOp(res1: (Int, Int), res2: (Int, Int)) : (Int, Int) =
  (res1, res2) match {
    case ((sumA, countA), (sumB, countB)) => (sumA + sumB, countA + countB)
  }

// Aggregate the RDD into a single (sum, count) pair, starting from zeroValue;
// seqOp folds elements into an accumulator and combOp merges accumulators.
val tuple = rdd.aggregate(zeroValue)(seqOp,combOp)

// Both components are Int, so this is integer division and the fractional part is dropped.
val avg = tuple._1 / tuple._2

// Converting the sum to Double first gives floating-point division (the exact average).
// NOTE: re-declaring `avg` like this only works when the lines are entered one by one in the REPL.
val avg = tuple._1.toDouble / tuple._2

---------------------------------------------------

scala> val list = List(1,2,3,4,5,6,7,8,9,10)
list: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> val rdd = sc.parallelize(list, 2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26

scala> val zeroValue = (0, 0)
zeroValue: (Int, Int) = (0,0)

scala> def seqOp(res: (Int, Int), data: Int) : (Int, Int) = {
     |  (res._1 + data, res._2 + 1)
     | }
seqOp: (res: (Int, Int), data: Int)(Int, Int)

scala> def combOp(res1: (Int, Int), res2: (Int, Int)) : (Int, Int) = {
     |  (res1._1 + res2._1, res1._2 + res2._2)
     | }
combOp: (res1: (Int, Int), res2: (Int, Int))(Int, Int)

scala> val tuple = rdd.aggregate(zeroValue)(seqOp,combOp)
tuple: (Int, Int) = (55,10)

scala> val avg = tuple._1 / tuple._2
avg: Int = 5

scala> val avg = tuple._1.toDouble / tuple._2
avg: Double = 5.5


---------------------------------------------------
Note: For an `aggregate` operation, the operators need to follow the `associative law and commutative law`
---------------------------------------------------

val rdd1 = sc.parallelize(List(1,2,3,4,5,6), 2)
val rdd2 = sc.parallelize(List(1,2,1,3,2,4,4), 2)

---------------------------------------------------
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6), 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(List(1,2,1,3,2,4,4), 2)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24

---------------------------------------------------
scala> rdd1.count
res0: Long = 6

scala> rdd1.first
res1: Int = 1

scala> rdd1.min
res2: Int = 1

scala> rdd1.max
res3: Int = 6

scala> rdd1.sum
res4: Double = 21.0


scala> rdd1.foreach(println)
1
4
5
6
2
3

scala> rdd1.collect
res7: Array[Int] = Array(1, 2, 3, 4, 5, 6)

scala> rdd1.collect.foreach(println)
1
2
3
4
5
6

---------------------------------------------------
Importance of `coalesce` and `repartition`
---------------------------------------------------
1. using `coalesce` (with its default `shuffle = false`) we can only decrease the number of partitions

2. using `repartition` we can increase or decrease the number of partitions

3. in `coalesce`, the `shuffle` value is `false` by default

4. in `repartition`, the `shuffle` value is always `true`

5. prefer `coalesce` over `repartition` when reducing partitions, to avoid the `shuffle` operation


---------------------------------------------------

scala> rdd1.coalesce(3)
res9: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[4] at coalesce at <console>:27

scala> rdd1.coalesce(3).getNumPartitions
res10: Int = 2

scala> rdd1.coalesce(1).getNumPartitions
res11: Int = 1

scala> rdd1.repartition(3)
res12: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[10] at repartition at <console>:27

scala> rdd1.repartition(3).getNumPartitions
res13: Int = 3

scala> rdd1.repartition(1).getNumPartitions
res14: Int = 1

---------------------------------------------------

scala> rdd2.collect
res18: Array[Int] = Array(1, 2, 1, 3, 2, 4, 4)

scala> rdd2.distinct
res19: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[21] at distinct at <console>:27

scala> rdd2.distinct.collect
res20: Array[Int] = Array(4, 2, 1, 3)

scala> rdd2.distinct.getNumPartitions
res22: Int = 2

scala> rdd2.distinct(1).getNumPartitions
res23: Int = 1

scala> rdd2.distinct(3).getNumPartitions
res24: Int = 3

---------------------------------------------------
scala> rdd2.collect
res25: Array[Int] = Array(1, 2, 1, 3, 2, 4, 4)

scala> rdd2.sortBy(x => x)
res26: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[41] at sortBy at <console>:27

scala> rdd2.sortBy(x => x).collect
res27: Array[Int] = Array(1, 1, 2, 2, 3, 4, 4)

scala> rdd2.sortBy(x => -x).collect
res28: Array[Int] = Array(4, 4, 3, 2, 2, 1, 1)

---------------------------------------------------

scala> rdd1.collect
res31: Array[Int] = Array(1, 2, 3, 4, 5, 6)

scala> rdd1.keyBy(x => x)
res32: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[52] at keyBy at <console>:27

scala> rdd1.keyBy(x => x).collect
res33: Array[(Int, Int)] = Array((1,1), (2,2), (3,3), (4,4), (5,5), (6,6))

scala> rdd2.collect
res34: Array[Int] = Array(1, 2, 1, 3, 2, 4, 4)

scala> rdd2.keyBy(x => x).collect
res35: Array[(Int, Int)] = Array((1,1), (2,2), (1,1), (3,3), (2,2), (4,4), (4,4))

---------------------------------------------------

scala> rdd1.keyBy(x => x).collect
res36: Array[(Int, Int)] = Array((1,1), (2,2), (3,3), (4,4), (5,5), (6,6))

scala> rdd1.keyBy(x => x * x).collect
res37: Array[(Int, Int)] = Array((1,1), (4,2), (9,3), (16,4), (25,5), (36,6))

scala> rdd1.map(x => (x * x, x)).collect
res38: Array[(Int, Int)] = Array((1,1), (4,2), (9,3), (16,4), (25,5), (36,6))

---------------------------------------------------
scala> rdd1.countByValue()
res40: scala.collection.Map[Int,Long] = Map(5 -> 1, 1 -> 1, 6 -> 1, 2 -> 1, 3 -> 1, 4 -> 1)

scala> rdd2.countByValue()
res41: scala.collection.Map[Int,Long] = Map(4 -> 2, 2 -> 2, 1 -> 2, 3 -> 1)

---------------------------------------------------

scala> rdd1.collect
res42: Array[Int] = Array(1, 2, 3, 4, 5, 6)

scala> rdd2.collect
res43: Array[Int] = Array(1, 2, 1, 3, 2, 4, 4)

scala> rdd1.union(rdd2)
res44: org.apache.spark.rdd.RDD[Int] = UnionRDD[64] at union at <console>:29

scala> rdd1.union(rdd2).collect
res45: Array[Int] = Array(1, 2, 3, 4, 5, 6, 1, 2, 1, 3, 2, 4, 4)

scala> rdd1.intersection(rdd2).collect
res46: Array[Int] = Array(4, 2, 1, 3)

scala> rdd1.subtract(rdd2)
res48: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[75] at subtract at <console>:29

scala> rdd1.subtract(rdd2).collect
res49: Array[Int] = Array(6, 5)

scala> rdd2.subtract(rdd1).collect
res50: Array[Int] = Array()

---------------------------------------------------

scala> rdd2.take(4)
res52: Array[Int] = Array(1, 2, 1, 3)

scala> rdd2.takeOrdered(4)
res53: Array[Int] = Array(1, 1, 2, 2)

scala> rdd2.collect
res54: Array[Int] = Array(1, 2, 1, 3, 2, 4, 4)

---------------------------------------------------
val lines = List("I am going", "to hyd", "I am learning", "spark course")

val rdd3 = sc.parallelize(lines, 1)

---------------------------------------------------
scala> val lines = List("I am going", "to hyd", "I am learning", "spark course")
lines: List[String] = List(I am going, to hyd, I am learning, spark course)

scala> val rdd3 = sc.parallelize(lines, 1)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[86] at parallelize at <console>:26


---------------------------------------------------
Importance of `flatMap`
---------------------------------------------------
scala> rdd3.flatMap(x => x)
res57: org.apache.spark.rdd.RDD[Char] = MapPartitionsRDD[87] at flatMap at <console>:29

scala> rdd3.flatMap(x => x).collect
res58: Array[Char] = Array(I,  , a, m,  , g, o, i, n, g, t, o,  , h, y, d, I,  , a, m,  , l, e, a, r, n, i, n, g, s, p, a, r, k,  , c, o, u, r, s, e)

scala> rdd3.flatMap(x => x.split(" "))
res59: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[89] at flatMap at <console>:29

scala> rdd3.flatMap(x => x.split(" ")).collect
res60: Array[String] = Array(I, am, going, to, hyd, I, am, learning, spark, course)

scala> rdd3.flatMap(x => x.split(" ")).countByValue().foreach(println)
(course,1)
(am,2)
(going,1)
(I,2)
(hyd,1)
(to,1)
(spark,1)
(learning,1)

---------------------------------------------------

val llines = List(List("I am going"), List("to hyd"), List("I am learning"), List("spark course"))

val rdd4 = sc.parallelize(llines, 1)

---------------------------------------------------
scala> val llines = List(List("I am going"), List("to hyd"), List("I am learning"), List("spark course"))
llines: List[List[String]] = List(List(I am going), List(to hyd), List(I am learning), List(spark course))

scala> val rdd4 = sc.parallelize(llines, 1)
rdd4: org.apache.spark.rdd.RDD[List[String]] = ParallelCollectionRDD[99] at parallelize at <console>:26

scala> rdd4.flatMap(x => x)
res65: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[100] at flatMap at <console>:29

scala> rdd4.flatMap(x => x).collect
res66: Array[String] = Array(I am going, to hyd, I am learning, spark course)

---------------------------------------------------
scala> rdd1.cartesian(rdd2)
res67: org.apache.spark.rdd.RDD[(Int, Int)] = CartesianRDD[102] at cartesian at <console>:29

scala> rdd1.cartesian(rdd2).collect
res68: Array[(Int, Int)] = Array((1,1), (1,2), (1,1), (2,1), (2,2), (2,1), (3,1), (3,2), (3,1), (1,3), (1,2), (1,4), (1,4), (2,3), (2,2), (2,4), (2,4), (3,3), (3,2), (3,4), (3,4), (4,1), (4,2), (4,1), (5,1), (5,2), (5,1), (6,1), (6,2), (6,1), (4,3), (4,2), (4,4), (4,4), (5,3), (5,2), (5,4), (5,4), (6,3), (6,2), (6,4), (6,4))

scala> rdd1.cartesian(rdd2).count
res69: Long = 42

---------------------------------------------------
scala> rdd1.context
res70: org.apache.spark.SparkContext = org.apache.spark.SparkContext@6780f4f2

scala> rdd1.dependencies
res71: Seq[org.apache.spark.Dependency[_]] = List()


---------------------------------------------------
Importance of `fold`
---------------------------------------------------

scala> rdd1.fold()()
<console>:27: error: not enough arguments for method fold: (zeroValue: Int)(op: (Int, Int) => Int)Int.
Unspecified value parameter zeroValue.
       rdd1.fold()()
                ^

---------------------------------------------------
Compare `aggregate` and `fold`:
-----------------------------------------

aggregate(zeroValue)(seqOp, combOp)


aggregate(initialValue)(seqOp, seqOp)

<==>

fold(initialValue)(seqOp)

---------------------------------------------------

val zeroValue = 1

/** Multiplies the running result by the next element.
  *
  * @param res  product accumulated so far
  * @param data next element to multiply in
  * @return `res` multiplied by `data`
  */
def seqOp(res: Int, data: Int) : Int = res * data

rdd1.fold(zeroValue)(seqOp)

---------------------------------------------------
scala> val zeroValue = 1
zeroValue: Int = 1

scala> def seqOp(res: Int, data: Int) : Int = {
     |  res * data
     | }
seqOp: (res: Int, data: Int)Int

scala> rdd1.fold(zeroValue)(seqOp)
res73: Int = 720


---------------------------------------------------
scala> rdd1.glom()
res74: org.apache.spark.rdd.RDD[Array[Int]] = MapPartitionsRDD[105] at glom at <console>:27

scala> rdd1.glom().collect
res75: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6))

scala> rdd1.collect
res76: Array[Int] = Array(1, 2, 3, 4, 5, 6)

---------------------------------------------------
scala> rdd1.groupBy(x => x)
res77: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[108] at groupBy at <console>:27

scala> rdd1.groupBy(x => x).collect
res78: Array[(Int, Iterable[Int])] = Array((4,CompactBuffer(4)), (6,CompactBuffer(6)), (2,CompactBuffer(2)), (1,CompactBuffer(1)), (3,CompactBuffer(3)), (5,CompactBuffer(5)))

scala> rdd2.groupBy(x => x).collect
res79: Array[(Int, Iterable[Int])] = Array((4,CompactBuffer(4, 4)), (2,CompactBuffer(2, 2)), (1,CompactBuffer(1, 1)), (3,CompactBuffer(3)))


---------------------------------------------------
scala> rdd1.partitions.length
res80: Int = 2

scala> rdd1.getNumPartitions
res81: Int = 2

scala> rdd1.mean
res82: Double = 3.5

---------------------------------------------------

scala> rdd1.pipe("head -n 1")
res85: org.apache.spark.rdd.RDD[String] = PipedRDD[116] at pipe at <console>:27

scala> rdd1.pipe("head -n 1").collect
res86: Array[String] = Array(1, 4)

scala> rdd1.pipe("head -n 2").collect
res87: Array[String] = Array(1, 2, 4, 5)

scala> rdd1.pipe("head -n 3").collect
res88: Array[String] = Array(1, 2, 3, 4, 5, 6)


---------------------------------------------------
scala> rdd1.sample(true, 0.1, 3).collect
res92: Array[Int] = Array(1)

scala> rdd1.sample(true, 0.5, 3).collect
res93: Array[Int] = Array()

scala> rdd1.sample(true, 0.5, 3).collect
res94: Array[Int] = Array()

scala> rdd1.sample(true, 0.5, 7).collect
res95: Array[Int] = Array(1, 4, 5)

scala> rdd1.sample(true, 0.5, 7).collect
res96: Array[Int] = Array(1, 4, 5)

scala> rdd1.sample(true, 0.5, 7).collect
res97: Array[Int] = Array(1, 4, 5)

scala> rdd1.sample(true, 0.9, 7).collect
res98: Array[Int] = Array(1, 4, 4, 4, 5, 6, 6)

scala> rdd1.sample(true, 0.9, 7).collect
res99: Array[Int] = Array(1, 4, 4, 4, 5, 6, 6)


---------------------------------------------------

scala> rdd1.id
res100: Int = 1

scala> rdd1.name
res101: String = null

scala> rdd1.setName("kalyan")
res102: rdd1.type = kalyan ParallelCollectionRDD[1] at parallelize at <console>:24

scala> rdd1.name
res103: String = kalyan

scala> rdd1.setName("kalyan spark")
res104: rdd1.type = kalyan spark ParallelCollectionRDD[1] at parallelize at <console>:24

scala> rdd1.name
res105: String = kalyan spark

---------------------------------------------------
Importance of `sortBy`
---------------------------------------------------
scala> val rdd5 = rdd3.flatMap(x => x.split(" "))
rdd5: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[130] at flatMap at <console>:28

scala> val rdd5 = rdd3.flatMap(_.split(" "))
rdd5: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[131] at flatMap at <console>:28

scala> rdd5.collect
res107: Array[String] = Array(I, am, going, to, hyd, I, am, learning, spark, course)

---------------------------------------------------

scala> rdd5.sortBy(x => x)
res108: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[134] at sortBy at <console>:31

scala> rdd5.sortBy(x => x).collect
res109: Array[String] = Array(I, I, am, am, course, going, hyd, learning, spark, to)

scala> rdd5.sortBy(x => -x).collect
<console>:31: error: value unary_- is not a member of String
Error occurred in an application involving default arguments.
       rdd5.sortBy(x => -x).collect
                        ^
<console>:31: error: diverging implicit expansion for type Ordering[K]
starting with method Tuple9 in object Ordering
Error occurred in an application involving default arguments.
       rdd5.sortBy(x => -x).collect
                  ^

---------------------------------------------------

scala> rdd5.sortBy(x => x.reverse).collect
res111: Array[String] = Array(I, I, hyd, course, learning, going, spark, am, am, to)

scala> rdd5.sortBy(x => x.indexOf).collect
indexOf   indexOfSlice

scala> rdd5.sortBy(x => x.indexOf(0)).collect
res112: Array[String] = Array(I, am, going, to, hyd, I, am, learning, spark, course)

scala> rdd5.sortBy(x => x.indexOf(1)).collect
res113: Array[String] = Array(I, am, going, to, hyd, I, am, learning, spark, course)

scala> rdd5.sortBy(x => x.indexOf(3)).collect
res114: Array[String] = Array(I, am, going, to, hyd, I, am, learning, spark, course)

---------------------------------------------------
Importance of `zip`
---------------------------------------------------
In Scala:
-------------
val l1 = List(1,2,3,4,5,6)
val l2 = List('a','b','c','d')
val l3 = List("raj", "venkat", "ravi", "anil")

scala> l1.zip(l2).foreach(println)
(1,a)
(2,b)
(3,c)
(4,d)

scala> l2.zip(l1).foreach(println)
(a,1)
(b,2)
(c,3)
(d,4)

scala> l2.zip(l3).foreach(println)
(a,raj)
(b,venkat)
(c,ravi)
(d,anil)

scala> l3.zip(l2).foreach(println)
(raj,a)
(venkat,b)
(ravi,c)
(anil,d)


In Spark:
-------------
val r1 = sc.parallelize(List(1,2,3,4,5,6),1)
val r2 = sc.parallelize(List('a','b','c','d'),1)
val r3 = sc.parallelize(List("raj", "venkat", "ravi", "anil"),1)

---------------------------------------------------

r1.zip(r2).foreach(println)
r2.zip(r1).foreach(println)
r2.zip(r3).foreach(println)
r3.zip(r2).foreach(println)

---------------------------------------------------
Note: Can only zip RDDs with the same number of partitions and the same number of elements in each partition

The below 2 statements are going to fail (r1 has 6 elements but r2 has only 4):
----------------------------------------------
r1.zip(r2).foreach(println)
r2.zip(r1).foreach(println)

The below 2 statements are going to work:
----------------------------------------------
r2.zip(r3).foreach(println)
r3.zip(r2).foreach(println)

scala> r2.zip(r3).foreach(println)
(a,raj)
(b,venkat)
(c,ravi)
(d,anil)

scala> r3.zip(r2).foreach(println)
(raj,a)
(venkat,b)
(ravi,c)
(anil,d)
---------------------------------------------------

scala> rdd1.++(rdd2).collect
res127: Array[Int] = Array(1, 2, 3, 4, 5, 6, 1, 2, 1, 3, 2, 4, 4)

scala> rdd1.union(rdd2).collect
res128: Array[Int] = Array(1, 2, 3, 4, 5, 6, 1, 2, 1, 3, 2, 4, 4)

scala> rdd1.union(rdd2).distinct.collect
res129: Array[Int] = Array(4, 1, 5, 6, 2, 3)


---------------------------------------------------
 RDD Types:
 ---------------
 1. Normal RDD
 2. Pair RDD

---------------------------------------------------
Importance of `Pair RDD`
---------------------------------------------------
val r1 = sc.parallelize(List(1,2,3,4,5,6,7,8),1)
val r2 = sc.parallelize(List("raj", "venkat", "ravi", "anil", "rajesh", "dev", "kalyan", "raju"),1)

---------------------------------------------------
scala> val r1 = sc.parallelize(List(1,2,3,4,5,6,7,8),1)
r1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[166] at parallelize at <console>:24

scala> val r2 = sc.parallelize(List("raj", "venkat", "ravi", "anil", "rajesh", "dev", "kalyan", "raju"),1)
r2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[167] at parallelize at <console>:24

scala> r1.zip(r2)
res130: org.apache.spark.rdd.RDD[(Int, String)] = ZippedPartitionsRDD2[168] at zip at <console>:29

scala> val prdd1 = r1.zip(r2)
prdd1: org.apache.spark.rdd.RDD[(Int, String)] = ZippedPartitionsRDD2[169] at zip at <console>:28

scala> prdd1.collect
res131: Array[(Int, String)] = Array((1,raj), (2,venkat), (3,ravi), (4,anil), (5,rajesh), (6,dev), (7,kalyan), (8,raju))

scala> prdd1.collect.foreach(println)
(1,raj)
(2,venkat)
(3,ravi)
(4,anil)
(5,rajesh)
(6,dev)
(7,kalyan)
(8,raju)

---------------------------------------------------
val ltuples = List((1,"raj"), (2,"venkat"), (3,"ravi"), (4,"anil"), (5,"rajesh"), (6,"dev"), (7,"kalyan"), (8,"raju"))

val prdd1 = sc.parallelize(ltuples, 2)

---------------------------------------------------
scala> val ltuples = List((1,"raj"), (2,"venkat"), (3,"ravi"), (4,"anil"), (5,"rajesh"), (6,"dev"), (7,"kalyan"), (8,"raju"))
ltuples: List[(Int, String)] = List((1,raj), (2,venkat), (3,ravi), (4,anil), (5,rajesh), (6,dev), (7,kalyan), (8,raju))

scala> val prdd1 = sc.parallelize(ltuples, 2)
prdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[170] at parallelize at <console>:26

---------------------------------------------------
Importance of `aggregateByKey`
---------------------------------------------------

scala> prdd1.aggregateByKey
<console>:29: error: ambiguous reference to overloaded definition,
both method aggregateByKey in class PairRDDFunctions of type [U](zeroValue: U, numPartitions: Int)(seqOp: (U, String) => U, combOp: (U, U) => U)(implicit evidence$2: scala.reflect.ClassTag[U])org.apache.spark.rdd.RDD[(Int, U)]
and  method aggregateByKey in class PairRDDFunctions of type [U](zeroValue: U, partitioner: org.apache.spark.Partitioner)(seqOp: (U, String) => U, combOp: (U, U) => U)(implicit evidence$1: scala.reflect.ClassTag[U])org.apache.spark.rdd.RDD[(Int, U)]
match expected type ?
       prdd1.aggregateByKey
             ^

---------------------------------------------------
def aggregateByKey(zeroValue: U, numPartitions: Int)(seqOp: (U, String) => U, combOp: (U, U) => U)
---------------------------------------------------
val ltuples1 = List(("raj",1), ("venkat",4), ("anil",2), ("anil",5), ("kalyan",2), ("raj",4), ("kalyan",6), ("raju",4))

val prdd2 = sc.parallelize(ltuples1, 2)

---------------------------------------------------
scala> prdd2.aggregateByKey
<console>:29: error: ambiguous reference to overloaded definition,
both method aggregateByKey in class PairRDDFunctions of type [U](zeroValue: U, numPartitions: Int)(seqOp: (U, Int) => U, combOp: (U, U) => U)(implicit evidence$2: scala.reflect.ClassTag[U])org.apache.spark.rdd.RDD[(String, U)]
and  method aggregateByKey in class PairRDDFunctions of type [U](zeroValue: U, partitioner: org.apache.spark.Partitioner)(seqOp: (U, Int) => U, combOp: (U, U) => U)(implicit evidence$1: scala.reflect.ClassTag[U])org.apache.spark.rdd.RDD[(String, U)]
match expected type ?
       prdd2.aggregateByKey
             ^



---------------------------------------------------
def aggregateByKey(zeroValue: U, numPartitions: Int)(seqOp: (U, Int) => U, combOp: (U, U) => U)
---------------------------------------------------
val zeroValue = U
def seqOp(U, Int) : U = {}
def combOp(U, U) : U = {}
---------------------------------------------------
Note: replace `U` with `Int` to sum the values

val zeroValue = 0
def seqOp(Int, Int) : Int = {}
def combOp(Int, Int) : Int = {}

---------------------------------------------------
val zeroValue = 0

/** Adds the next value to the running total.
  *
  * @param res  total accumulated so far
  * @param data next value to add
  * @return `res` plus `data`
  */
def seqOp(res: Int, data: Int) : Int = res + data

/** Merges two partial totals.
  *
  * @param res1 first partial total
  * @param res2 second partial total
  * @return the sum of both partial totals
  */
def combOp(res1: Int, res2: Int) : Int = res1 + res2

prdd2.aggregateByKey(zeroValue)(seqOp, combOp)

---------------------------------------------------


scala> val zeroValue = 0
zeroValue: Int = 0

scala> def seqOp(res: Int, data: Int) : Int = {
     |  res + data
     | }
seqOp: (res: Int, data: Int)Int

scala> def combOp(res1: Int, res2: Int) : Int = {
     |  res1 + res2
     | }
combOp: (res1: Int, res2: Int)Int


---------------------------------------------------

scala> prdd2.aggregateByKey(zeroValue)(seqOp, combOp)
res135: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[172] at aggregateByKey at <console>:41

scala> prdd2.aggregateByKey(zeroValue)(seqOp, combOp).collect
res136: Array[(String, Int)] = Array((kalyan,8), (anil,7), (raju,4), (venkat,4), (raj,5))

scala> prdd2.aggregateByKey(zeroValue)(seqOp, combOp).collect.foreach(println)
(kalyan,8)
(anil,7)
(raju,4)
(venkat,4)
(raj,5)

---------------------------------------------------
Importance of `combineByKey`
---------------------------------------------------
scala> prdd2.combineByKey
<console>:29: error: ambiguous reference to overloaded definition,
both method combineByKey in class PairRDDFunctions of type [C](createCombiner: Int => C, mergeValue: (C, Int) => C, mergeCombiners: (C, C) => C)org.apache.spark.rdd.RDD[(String, C)]
and  method combineByKey in class PairRDDFunctions of type [C](createCombiner: Int => C, mergeValue: (C, Int) => C, mergeCombiners: (C, C) => C, numPartitions: Int)org.apache.spark.rdd.RDD[(String, C)]
match expected type ?
       prdd2.combineByKey
             ^


---------------------------------------------------
def combineByKey(createCombiner: Int => C, mergeValue: (C, Int) => C, mergeCombiners: (C, C) => C)
---------------------------------------------------

def createCombiner(Int) : C = {}
def mergeValue(C, Int) : C = {}
def mergeCombiners(C, C) : C = {}

---------------------------------------------------
/** Starts a new combiner from a single value.
  *
  * @param data the value to wrap
  * @return a one-element list containing `data`
  */
def createCombiner(data: Int) : List[Int] = List(data)

/** Adds one more value to an existing combiner.
  *
  * @param res  values collected so far
  * @param data value to add
  * @return a new list with `data` placed after all elements of `res`
  */
def mergeValue(res: List[Int], data: Int) : List[Int] = {
  val tail = List(data)
  res ::: tail
}

/** Merges two combiners into one.
  *
  * @param res1 values collected by the first combiner
  * @param res2 values collected by the second combiner
  * @return a list holding all elements of `res1` followed by all elements of `res2`
  */
def mergeCombiners(res1: List[Int], res2: List[Int]) : List[Int] = res1 ++ res2

prdd2.combineByKey(createCombiner, mergeValue, mergeCombiners)

---------------------------------------------------


scala> def createCombiner(data: Int) : List[Int] = {
     |  data :: Nil
     | }
createCombiner: (data: Int)List[Int]

scala> def mergeValue(res: List[Int], data: Int) : List[Int] = {
     |  res :+ data
     | }
mergeValue: (res: List[Int], data: Int)List[Int]

scala> def mergeCombiners(res1: List[Int], res2: List[Int]) : List[Int] = {
     |  res1 ::: res2
     | }
mergeCombiners: (res1: List[Int], res2: List[Int])List[Int]

scala> prdd2.combineByKey(createCombiner, mergeValue, mergeCombiners)
res139: org.apache.spark.rdd.RDD[(String, List[Int])] = ShuffledRDD[175] at combineByKey at <console>:41

scala> prdd2.combineByKey(createCombiner, mergeValue, mergeCombiners).collect
res140: Array[(String, List[Int])] = Array((kalyan,List(2, 6)), (anil,List(2, 5)), (raju,List(4)), (venkat,List(4)), (raj,List(1, 4)))

scala> prdd2.combineByKey(createCombiner, mergeValue, mergeCombiners).collect.foreach(println)
(kalyan,List(2, 6))
(anil,List(2, 5))
(raju,List(4))
(venkat,List(4))
(raj,List(1, 4))


---------------------------------------------------
scala> val pr1 = prdd2.combineByKey(createCombiner, mergeValue, mergeCombiners)
pr1: org.apache.spark.rdd.RDD[(String, List[Int])] = ShuffledRDD[178] at combineByKey at <console>:40

scala> pr1.collect
res142: Array[(String, List[Int])] = Array((kalyan,List(2, 6)), (anil,List(2, 5)), (raju,List(4)), (venkat,List(4)), (raj,List(1, 4)))

scala> pr1.map( t => (t._1, t._2.sum))
res143: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[179] at map at <console>:43

scala> pr1.map( t => (t._1, t._2.sum)).collect
res144: Array[(String, Int)] = Array((kalyan,8), (anil,7), (raju,4), (venkat,4), (raj,5))


scala> pr1.map( t => (t._1, t._2.size)).collect
res146: Array[(String, Int)] = Array((kalyan,2), (anil,2), (raju,1), (venkat,1), (raj,2))

scala> pr1.map( t => (t._1, t._2.min)).collect
res147: Array[(String, Int)] = Array((kalyan,2), (anil,2), (raju,4), (venkat,4), (raj,1))

scala> pr1.map( t => (t._1, t._2.max)).collect
res148: Array[(String, Int)] = Array((kalyan,6), (anil,5), (raju,4), (venkat,4), (raj,4))


---------------------------------------------------



---------------------------------------------------


---------------------------------------------------

---------------------------------------------------


---------------------------------------------------



---------------------------------------------------


---------------------------------------------------

1 comment:

spark_streaming_examples

Create Spark Streaming Context: ========================================== scala: --------------- import org.apache.spark._ import ...