Monday 23 September 2019

spark_day_2_practice

---------------------------------------------------
// Tag each element with the index of the partition it sits in
// (used with mapPartitionsWithIndex to see how data is distributed).
def myfunc[T](index: Int, it: Iterator[T]): Iterator[String] = {
  it.toList.map(x => s"index -> $index, value -> $x").toIterator
}


val names = List("raj", "venkat", "anil", "ravi", "sunil", "anvith", "rajesh", "kiran", "surya", "kalyan")

val rdd1 = sc.parallelize(names, 2)


val nums = List(1, 2, 3, 4, 5)

val rdd2 = sc.parallelize(nums, 2)

---------------------------------------------------

scala> val names = List("raj", "venkat", "anil", "ravi", "sunil", "anvith", "rajesh", "kiran", "surya", "kalyan")
names: List[String] = List(raj, venkat, anil, ravi, sunil, anvith, rajesh, kiran, surya, kalyan)

scala> val rdd1 = sc.parallelize(names, 2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:26

scala> rdd1.collect
res0: Array[String] = Array(raj, venkat, anil, ravi, sunil, anvith, rajesh, kiran, surya, kalyan)

scala> rdd1.collect.foreach(println)
raj
venkat
anil
ravi
sunil
anvith
rajesh
kiran
surya
kalyan


---------------------------------------------------

scala> def myfunc[T](index: Int, it: Iterator[T]): Iterator[String] = {
     |  it.toList.map(x => s"index -> $index, value -> $x").toIterator
     | }
myfunc: [T](index: Int, it: Iterator[T])Iterator[String]

scala> rdd1.mapPartitionsWithIndex(myfunc).collect.foreach(println)
index -> 0, value -> raj
index -> 0, value -> venkat
index -> 0, value -> anil
index -> 0, value -> ravi
index -> 0, value -> sunil
index -> 1, value -> anvith
index -> 1, value -> rajesh
index -> 1, value -> kiran
index -> 1, value -> surya
index -> 1, value -> kalyan



---------------------------------------------------


scala> rdd1.cartesian(rdd2).collect.foreach(println)
(raj,1)
(raj,2)
(venkat,1)
(venkat,2)
(anil,1)
(anil,2)
(ravi,1)
(ravi,2)
(sunil,1)
(sunil,2)
(raj,3)
(raj,4)
(raj,5)
(venkat,3)
(venkat,4)
(venkat,5)
(anil,3)
(anil,4)
(anil,5)
(ravi,3)
(ravi,4)
(ravi,5)
(sunil,3)
(sunil,4)
(sunil,5)
(anvith,1)
(anvith,2)
(rajesh,1)
(rajesh,2)
(kiran,1)
(kiran,2)
(surya,1)
(surya,2)
(kalyan,1)
(kalyan,2)
(anvith,3)
(anvith,4)
(anvith,5)
(rajesh,3)
(rajesh,4)
(rajesh,5)
(kiran,3)
(kiran,4)
(kiran,5)
(surya,3)
(surya,4)
(surya,5)
(kalyan,3)
(kalyan,4)
(kalyan,5)

---------------------------------------------------

scala> rdd1.cartesian(rdd2).count
res5: Long = 50
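(As expected, the count is the product of the two sizes: 10 names × 5 nums = 50 pairs.)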

---------------------------------------------------

val nums = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12)
val rdd3 = sc.parallelize(nums, 5)
---------------------------------------------------

scala> val nums = List(1, 2, 3, 4, 5, 6, 7,8, 9, 10, 11, 12)
nums: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12)

scala> val rdd3 = sc.parallelize(nums, 5)
rdd3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:26

scala> rdd3.getNumPartitions
res6: Int = 5

scala> rdd3.coalesce(3).getNumPartitions
res7: Int = 3

scala> rdd3.coalesce(3, false).getNumPartitions
res8: Int = 3

scala> rdd3.coalesce(3, true).getNumPartitions
res9: Int = 3

---------------------------------------------------

scala> rdd3.repartition(3).getNumPartitions
res10: Int = 3

scala> rdd3.repartition(10).getNumPartitions
res11: Int = 10

scala> rdd3.coalesce(10).getNumPartitions
res12: Int = 5
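
Note: without a shuffle, coalesce can only reduce the number of partitions, which is why coalesce(10) above stays at 5. repartition(n) is just coalesce(n, shuffle = true), which can also increase them. A quick sketch:

// coalesce without shuffle cannot grow beyond the current 5 partitions
rdd3.coalesce(10).getNumPartitions          // 5

// with shuffle = true it behaves like repartition and can grow to 10
rdd3.coalesce(10, true).getNumPartitions    // 10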

---------------------------------------------------
scala> rdd3.context
res13: org.apache.spark.SparkContext = org.apache.spark.SparkContext@185b9c70

scala> rdd3.countByValue
res14: scala.collection.Map[Int,Long] = Map(5 -> 1, 10 -> 1, 1 -> 1, 6 -> 1, 9 -> 1, 2 -> 1, 12 -> 1, 7 -> 1, 3 -> 1, 11 -> 1, 8 -> 1, 4 -> 1)

scala> rdd1.countByValue
res15: scala.collection.Map[String,Long] = Map(raj -> 1, anvith -> 1, kiran -> 1, ravi -> 1, rajesh -> 1, surya -> 1, venkat -> 1, anil -> 1, sunil -> 1, kalyan -> 1)

---------------------------------------------------

val duplicates = List(1, 2, 3, 4, 5, 1, 2, 3, 1, 4)
val rdd4 = sc.parallelize(duplicates, 2)

---------------------------------------------------

scala> val duplicates = List(1, 2, 3, 4, 5, 1, 2, 3, 1, 4)
duplicates: List[Int] = List(1, 2, 3, 4, 5, 1, 2, 3, 1, 4)

scala> val rdd4 = sc.parallelize(duplicates, 2)
rdd4: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[28] at parallelize at <console>:26

scala> rdd4.distinct
res17: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[31] at distinct at <console>:29

scala> rdd4.distinct.collect
res18: Array[Int] = Array(4, 2, 1, 3, 5)


scala> rdd3.filter(x => x > 10).collect
res19: Array[Int] = Array(11, 12)

scala> rdd3.filter(x => x > 5).collect
res20: Array[Int] = Array(6, 7, 8, 9, 10, 11, 12)

---------------------------------------------------
Note: filterWith is only available in Spark 1.x (it was removed in Spark 2.x).

rdd.filterWith(partitionIndex => a)((data, a) => booleanCondition)

rdd.filterWith(x => x)((data, index) => data % 2 == 0 && index % 2 == 0)
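
A sketch of the same idea on Spark 2.x, using mapPartitionsWithIndex; the even-value-in-even-partition condition mirrors the example above:

// keep only even values that live in even-numbered partitions
val filtered = rdd3.mapPartitionsWithIndex { (index, it) =>
  it.filter(data => data % 2 == 0 && index % 2 == 0)
}
filtered.collect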

---------------------------------------------------

val lines = List("I am going", "to hyd", "I am learning", "spark course")
val rdd5 = sc.parallelize(lines, 2)

---------------------------------------------------

scala> val lines = List("I am going", "to hyd", "I am learning", "spark course")
lines: List[String] = List(I am going, to hyd, I am learning, spark course)

scala> val rdd5 = sc.parallelize(lines, 2)
rdd5: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[37] at parallelize at <console>:26

scala> rdd5.collect.foreach(println)
I am going
to hyd
I am learning
spark course


---------------------------------------------------

scala> rdd5.flatMap(x => x).collect
res23: Array[Char] = Array(I,  , a, m,  , g, o, i, n, g, t, o,  , h, y, d, I,  , a, m,  , l, e, a, r, n, i, n, g, s, p, a, r, k,  , c, o, u, r, s, e)

scala> rdd5.flatMap(x => x.split("")).collect
res24: Array[String] = Array(I, " ", a, m, " ", g, o, i, n, g, t, o, " ", h, y, d, I, " ", a, m, " ", l, e, a, r, n, i, n, g, s, p, a, r, k, " ", c, o, u, r, s, e)

scala> rdd5.flatMap(x => x.split(" ")).collect
res25: Array[String] = Array(I, am, going, to, hyd, I, am, learning, spark, course)

scala> rdd5.flatMap(x => x.split(" ")).countByValue
res27: scala.collection.Map[String,Long] = Map(course -> 1, am -> 2, going -> 1, I -> 2, hyd -> 1, to -> 1, spark -> 1, learning -> 1)

---------------------------------------------------
rdd.aggregate(zeroVal)(op, op)
<==>
rdd.fold(zeroVal)(op)


rdd.reduce(op)
<==>
rdd.fold(0)(op)   (only when 0 is the identity element of op, e.g. addition)
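
A minimal sketch checking these equivalences on rdd3 (1..12) from above; all three return the 78 seen below in the session:

// all three compute the same sum because 0 is the identity of +
val viaReduce    = rdd3.reduce(_ + _)               // 78
val viaFold      = rdd3.fold(0)(_ + _)              // 78
val viaAggregate = rdd3.aggregate(0)(_ + _, _ + _)  // 78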


---------------------------------------------------

scala> val rdd6 = rdd3.coalesce(2)
rdd6: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[53] at coalesce at <console>:28

scala> rdd6.fold(1)((a,b) => a + b)
res34: Int = 81

scala> rdd6.reduce((a,b) => a + b)
res36: Int = 78

scala> rdd6.fold(0)((a,b) => a + b)
res37: Int = 78
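
(With 2 partitions, the zero value 1 is applied once per partition and once more when the partition results are merged, so fold(1) gives 78 + 3 * 1 = 81; with the neutral zero 0 it matches reduce at 78.)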


---------------------------------------------------

scala> rdd5.collect
res39: Array[String] = Array(I am going, to hyd, I am learning, spark course)

scala> rdd4.collect
res40: Array[Int] = Array(1, 2, 3, 4, 5, 1, 2, 3, 1, 4)

scala> rdd4.groupBy(x => x).collect
res41: Array[(Int, Iterable[Int])] = Array((4,CompactBuffer(4, 4)), (2,CompactBuffer(2, 2)), (1,CompactBuffer(1, 1, 1)), (3,CompactBuffer(3, 3)), (5,CompactBuffer(5)))


---------------------------------------------------

scala> val rdd7 = rdd5.flatMap(x => x.split(" "))
rdd7: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[57] at flatMap at <console>:28

scala> rdd7.collect
res43: Array[String] = Array(I, am, going, to, hyd, I, am, learning, spark, course)

scala> rdd7.groupBy(x => x).collect
res44: Array[(String, Iterable[String])] = Array((learning,CompactBuffer(learning)), (am,CompactBuffer(am, am)), (going,CompactBuffer(going)), (spark,CompactBuffer(spark)), (hyd,CompactBuffer(hyd)), (I,CompactBuffer(I, I)), (to,CompactBuffer(to)), (course,CompactBuffer(course)))

scala> rdd7.groupBy(x => x.length).collect
res45: Array[(Int, Iterable[String])] = Array((6,CompactBuffer(course)), (8,CompactBuffer(learning)), (2,CompactBuffer(am, to, am)), (1,CompactBuffer(I, I)), (3,CompactBuffer(hyd)), (5,CompactBuffer(going, spark)))

scala> rdd7.groupBy(_.length).collect
res46: Array[(Int, Iterable[String])] = Array((6,CompactBuffer(course)), (8,CompactBuffer(learning)), (2,CompactBuffer(am, to, am)), (1,CompactBuffer(I, I)), (3,CompactBuffer(hyd)), (5,CompactBuffer(going, spark)))


---------------------------------------------------

val a = sc.parallelize(List(1,2,3,4,5),1)
val b = sc.parallelize(List(4,5,6,7,8),1)

a.intersection(b).collect

a.subtract(b).collect

a.union(b).collect

---------------------------------------------------


scala> val a = sc.parallelize(List(1,2,3,4,5),1)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at <console>:24

scala> val b = sc.parallelize(List(4,5,6,7,8),1)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[66] at parallelize at <console>:24

scala> a.intersection(b).collect
res47: Array[Int] = Array(4, 5)

scala> a.subtract(b).collect
res48: Array[Int] = Array(1, 2, 3)

scala> a.union(b).collect
res49: Array[Int] = Array(1, 2, 3, 4, 5, 4, 5, 6, 7, 8)

scala> a.union(b).distinct
res50: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[81] at distinct at <console>:29

scala> a.union(b).distinct.collect
res51: Array[Int] = Array(4, 6, 8, 2, 1, 3, 7, 5)



---------------------------------------------------

scala> rdd7.collect
res52: Array[String] = Array(I, am, going, to, hyd, I, am, learning, spark, course)

scala> rdd7.keyBy(x => x).collect
res53: Array[(String, String)] = Array((I,I), (am,am), (going,going), (to,to), (hyd,hyd), (I,I), (am,am), (learning,learning), (spark,spark), (course,course))

scala> rdd7.keyBy(x => x.length).collect
res54: Array[(Int, String)] = Array((1,I), (2,am), (5,going), (2,to), (3,hyd), (1,I), (2,am), (8,learning), (5,spark), (6,course))


---------------------------------------------------

scala> rdd7.collect
res55: Array[String] = Array(I, am, going, to, hyd, I, am, learning, spark, course)

scala> rdd7.sortBy(x => x).collect
res56: Array[String] = Array(I, I, am, am, course, going, hyd, learning, spark, to)

scala> rdd7.sortBy(x => x, false).collect
res57: Array[String] = Array(to, spark, learning, hyd, going, course, am, am, I, I)

scala> rdd7.sortBy(x => x, true).collect
res58: Array[String] = Array(I, I, am, am, course, going, hyd, learning, spark, to)


---------------------------------------------------

scala> rdd7.sortBy(x => x).getNumPartitions
res61: Int = 2

scala> rdd7.sortBy(x => x,false).getNumPartitions
res62: Int = 2

scala> rdd7.sortBy(x => x,false, 3).getNumPartitions
res63: Int = 3


---------------------------------------------------

scala> rdd7.sortBy(x => x).collect
res64: Array[String] = Array(I, I, am, am, course, going, hyd, learning, spark, to)

scala> rdd7.sortBy(x => x.reverse).collect
res65: Array[String] = Array(I, I, hyd, course, learning, going, spark, am, am, to)

scala> rdd7.sortBy(x => x.reverse, true , 1).collect
res66: Array[String] = Array(I, I, hyd, course, learning, going, spark, am, am, to)

---------------------------------------------------

scala> rdd7.pipe("grep am").collect
res69: Array[String] = Array(am, am)

scala> rdd7.pipe("grep g").collect
res70: Array[String] = Array(going, learning)

scala> rdd7.pipe("cat").collect
res71: Array[String] = Array(I, am, going, to, hyd, I, am, learning, spark, course)

scala> rdd7.pipe("head -n 2").collect
res72: Array[String] = Array(I, am, I, am)

scala> rdd7.pipe("head -n 3").collect
res73: Array[String] = Array(I, am, going, I, am, learning)
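
(pipe runs the shell command once per partition, so "head -n 2" returns the first 2 lines of each of the 2 partitions, not of the whole RDD.)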


---------------------------------------------------

scala> rdd6.collect
res74: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12)

scala> rdd6.take(5)
res75: Array[Int] = Array(1, 2, 3, 4, 5)

scala> rdd7.take(5)
res78: Array[String] = Array(I, am, going, to, hyd)

scala> rdd7.takeOrdered(5)
res79: Array[String] = Array(I, I, am, am, course)

scala> rdd7.top(5)
res80: Array[String] = Array(to, spark, learning, hyd, going)

scala> rdd7.sortBy(x => x).take(5)
res81: Array[String] = Array(I, I, am, am, course)

scala> rdd7.sortBy(x => x, false).take(5)
res82: Array[String] = Array(to, spark, learning, hyd, going)
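
(takeOrdered(n) is equivalent to sortBy ascending followed by take(n), and top(n) to sortBy descending followed by take(n), as the matching results above show.)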

---------------------------------------------------

scala> rdd7.toDebugString
res83: String =
(2) MapPartitionsRDD[57] at flatMap at <console>:28 []
 |  ParallelCollectionRDD[37] at parallelize at <console>:26 []

---------------------------------------------------
Difference between Scala & Spark usage of zip:
---------------------------------------------------

val list1 = List(1,2,3,4,5)
val list2 = List('a','b','c','d','e')
val list3 = List("raj", "venkat", "anil", "ravi")

list1.zip(list2).foreach(println)
list1.zip(list3).foreach(println)
list3.zip(list1).foreach(println)

val l1 = sc.parallelize(list1, 2)
val l2 = sc.parallelize(list2, 2)
val l3 = sc.parallelize(list3, 2)

l1.zip(l2).foreach(println)

These are not allowed in Spark, because l3 has only 4 elements while l1 and l2 have 5, so the element counts per partition don't match:
Note: Caused by: org.apache.spark.SparkException: Can only zip RDDs with same number of elements in each partition

l1.zip(l3).foreach(println)
l2.zip(l3).foreach(println)
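
A minimal sketch of the failing case (the error only surfaces when a job actually runs, e.g. on collect):

// l1 has 5 elements and l3 has 4, both in 2 partitions, so the per-partition
// element counts differ and Spark throws the SparkException quoted above
try {
  l1.zip(l3).collect()
} catch {
  case e: org.apache.spark.SparkException => println(e.getMessage)
}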

---------------------------------------------------
scala> val list1 = List(1,2,3,4,5)
list1: List[Int] = List(1, 2, 3, 4, 5)

scala> val list2 = List('a','b','c','d','e')
list2: List[Char] = List(a, b, c, d, e)

scala> val list3 = List("raj", "venkat", "anil", "ravi")
list3: List[String] = List(raj, venkat, anil, ravi)

scala> list1.zip(list2).foreach(println)
(1,a)
(2,b)
(3,c)
(4,d)
(5,e)


scala> list1.zip(list3).foreach(println)
(1,raj)
(2,venkat)
(3,anil)
(4,ravi)

scala> list3.zip(list1).foreach(println)
(raj,1)
(venkat,2)
(anil,3)
(ravi,4)

---------------------------------------------------

scala> val l1 = sc.parallelize(list1, 2)
l1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[149] at parallelize at <console>:26

scala> val l2 = sc.parallelize(list2, 2)
l2: org.apache.spark.rdd.RDD[Char] = ParallelCollectionRDD[150] at parallelize at <console>:26

scala> val l3 = sc.parallelize(list3, 2)
l3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[151] at parallelize at <console>:26

scala> l1.zip(l2).foreach(println)
(1,a)
(3,c)
(2,b)
(4,d)
(5,e)

---------------------------------------------------
RDDs are of 2 types:
----------------------
1. normal RDD
2. pair RDD (key-value pairs, RDD[(K, V)])

scala> val prdd1 = l1.zip(l2)
prdd1: org.apache.spark.rdd.RDD[(Int, Char)] = ZippedPartitionsRDD2[155] at zip at <console>:32

scala> prdd1.keys
res90: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[156] at keys at <console>:35

scala> prdd1.keys.collect
res91: Array[Int] = Array(1, 2, 3, 4, 5)

scala> prdd1.values
res92: org.apache.spark.rdd.RDD[Char] = MapPartitionsRDD[158] at values at <console>:35

scala> prdd1.values.collect
res93: Array[Char] = Array(a, b, c, d, e)

scala> prdd1.collect
res94: Array[(Int, Char)] = Array((1,a), (2,b), (3,c), (4,d), (5,e))

scala> prdd1.collectAsMap
res95: scala.collection.Map[Int,Char] = Map(2 -> b, 5 -> e, 4 -> d, 1 -> a, 3 -> c)

---------------------------------------------------

val names = List("raj", "venkat", "anil", "ravi", "sunil", "anvith", "rajesh", "kiran", "surya", "kalyan")

val rdd1 = sc.parallelize(names, 2)

val prdd1 = rdd1.map(x => (x.length, x))

val prdd2 = rdd1.map(x => (x, x.length))


---------------------------------------------------

scala> val names = List("raj", "venkat", "anil", "ravi", "sunil", "anvith", "rajesh", "kiran", "surya", "kalyan")
names: List[String] = List(raj, venkat, anil, ravi, sunil, anvith, rajesh, kiran, surya, kalyan)

scala> val rdd1 = sc.parallelize(names, 2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[160] at parallelize at <console>:26

scala> val prdd1 = rdd1.map(x => (x.length, x))
prdd1: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[161] at map at <console>:28

scala> val prdd2 = rdd1.map(x => (x, x.length))
prdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[162] at map at <console>:28

scala> rdd1.collect
res96: Array[String] = Array(raj, venkat, anil, ravi, sunil, anvith, rajesh, kiran, surya, kalyan)

scala> prdd1.collect
res97: Array[(Int, String)] = Array((3,raj), (6,venkat), (4,anil), (4,ravi), (5,sunil), (6,anvith), (6,rajesh), (5,kiran), (5,surya), (6,kalyan))

scala> prdd2.collect
res98: Array[(String, Int)] = Array((raj,3), (venkat,6), (anil,4), (ravi,4), (sunil,5), (anvith,6), (rajesh,6), (kiran,5), (surya,5), (kalyan,6))

---------------------------------------------------
Difference between `aggregate` and `aggregateByKey`
---------------------------------------------------
aggregate[U](zeroValue: U)(seqOp: (U, (Int, String)) => U, combOp: (U, U) => U): U

def seqOp(res: U, data: (Int, String)): U = {
  ...
}

def combOp(res1: U, res2: U): U = {
  ...
}

---------------------------------------------------

scala> prdd1.mapPartitionsWithIndex(myfunc).collect.foreach(println)
index -> 0, value -> (3,raj)
index -> 0, value -> (6,venkat)
index -> 0, value -> (4,anil)
index -> 0, value -> (4,ravi)
index -> 0, value -> (5,sunil)
index -> 1, value -> (6,anvith)
index -> 1, value -> (6,rajesh)
index -> 1, value -> (5,kiran)
index -> 1, value -> (5,surya)
index -> 1, value -> (6,kalyan)

---------------------------------------------------
val intialVal = (0, "")

def seqOp(res: (Int, String), data: (Int, String)) : (Int, String) = {
 (res._1 + data._1 , res._2 + ":" + data._2)
}

def combOp(res: (Int, String), data: (Int, String)) : (Int, String) = {
 (res._1 + data._1 , res._2 + "*" + data._2)
}

prdd1.aggregate(intialVal)(seqOp, combOp)

---------------------------------------------------

scala> val intialVal = (0, "")
intialVal: (Int, String) = (0,"")

scala> def seqOp(res: (Int, String), data: (Int, String)) : (Int, String) = {
     |  (res._1 + data._1 , res._2 + ":" + data._2)
     | }
seqOp: (res: (Int, String), data: (Int, String))(Int, String)

scala> def combOp(res: (Int, String), data: (Int, String)) : (Int, String) = {
     |  (res._1 + data._1 , res._2 + "*" + data._2)
     | }
combOp: (res: (Int, String), data: (Int, String))(Int, String)

scala> prdd1.aggregate(intialVal)(seqOp, combOp)
res103: (Int, String) = (50,*:raj:venkat:anil:ravi:sunil*:anvith:rajesh:kiran:surya:kalyan)



---------------------------------------------------

aggregateByKey[U](zeroValue: U)(seqOp: (U, String) => U, combOp: (U, U) => U): RDD[(Int, U)]

def seqOp(res: U, data: String): U = {
  ...
}

def combOp(res1: U, res2: U): U = {
  ...
}

---------------------------------------------------
val intialVal = "#"

def seqOp(res: String, data: String) :  String = {
 res + ":" + data
}

def combOp(res: String, data: String) :  String = {
 res + "*" + data
}

prdd1.aggregateByKey(intialVal)(seqOp, combOp)


---------------------------------------------------

scala> val intialVal = "#"
intialVal: String = #

scala> def seqOp(res: String, data: String) :  String = {
     |  res + ":" + data
     | }
seqOp: (res: String, data: String)String

scala> def combOp(res: String, data: String) :  String = {
     |  res + "*" + data
     | }
combOp: (res: String, data: String)String

scala> prdd1.aggregateByKey(intialVal)(seqOp, combOp)
res105: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[164] at aggregateByKey at <console>:37

scala> prdd1.aggregateByKey(intialVal)(seqOp, combOp).collect
res106: Array[(Int, String)] = Array((4,#:anil:ravi), (6,#:venkat*#:anvith:rajesh:kalyan), (3,#:raj), (5,#:sunil*#:kiran:surya))


---------------------------------------------------

Note:

1. In `aggregate`, the initial value is used in both the seq operation and the combine operation.

2. In `aggregateByKey`, the initial value is used only in the seq operation, not in the combine operation.
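
A hypothetical numeric illustration of this note (the data here is made up for the sketch, not from the session):

val p = sc.parallelize(List(("a", 1), ("a", 2), ("b", 3)), 2)

// aggregate: the zero (100) is used once per partition and once more when the
// partition results are combined on the driver -> (1+2+3) + 3*100 = 306
p.values.aggregate(100)(_ + _, _ + _)

// aggregateByKey: the zero (100) is used once per (partition, key) on the map
// side only -> a: (100+1) + (100+2) = 203, b: 100+3 = 103
p.aggregateByKey(100)(_ + _, _ + _).collect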

---------------------------------------------------

combineByKey[C](createCombiner: String => C, mergeValue: (C, String) => C, mergeCombiners: (C, C) => C): RDD[(Int, C)]


def createCombiner(data: String): C = {
  ...
}

def mergeValue(res: C, data: String): C = {
  ...
}

def mergeCombiners(res1: C, res2: C): C = {
  ...
}
---------------------------------------------------
Example1 on `combineByKey`
---------------------------------------------------

def createCombiner(res: String) : String = {
  res
}

def mergeValue(res: String, data: String) : String = {
 res +  ":" + data
}

def mergeCombiners(res: String, data: String) : String = {
 res +  "*" + data
}

prdd1.combineByKey(createCombiner, mergeValue, mergeCombiners)

---------------------------------------------------

scala> def createCombiner(res: String) : String = {
     |   res
     | }
createCombiner: (res: String)String

scala> def mergeValue(res: String, data: String) : String = {
     |  res +  ":" + data
     | }
mergeValue: (res: String, data: String)String

scala> def mergeCombiners(res: String, data: String) : String = {
     |  res +  "*" + data
     | }
mergeCombiners: (res: String, data: String)String

scala> prdd1.combineByKey(createCombiner, mergeValue, mergeCombiners)
res108: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[166] at combineByKey at <console>:37

scala> prdd1.combineByKey(createCombiner, mergeValue, mergeCombiners).collect
res109: Array[(Int, String)] = Array((4,anil:ravi), (6,venkat*anvith:rajesh:kalyan), (3,raj), (5,sunil*kiran:surya))


---------------------------------------------------
Example2 on `combineByKey`
---------------------------------------------------

def createCombiner(res: String) : List[String] = {
  List(res)
}

def mergeValue(res: List[String], data: String) : List[String] = {
 res :+ data
}

def mergeCombiners(res: List[String], data: List[String]) : List[String] = {
 res ::: data
}

prdd1.combineByKey(createCombiner, mergeValue, mergeCombiners)



---------------------------------------------------

scala> def createCombiner(res: String) : List[String] = {
     |   List(res)
     | }
createCombiner: (res: String)List[String]

scala> def mergeValue(res: List[String], data: String) : List[String] = {
     |  res :+ data
     | }
mergeValue: (res: List[String], data: String)List[String]

scala> def mergeCombiners(res: List[String], data: List[String]) : List[String] = {
     |  res ::: data
     | }
mergeCombiners: (res: List[String], data: List[String])List[String]

scala> prdd1.combineByKey(createCombiner, mergeValue, mergeCombiners)
res110: org.apache.spark.rdd.RDD[(Int, List[String])] = ShuffledRDD[168] at combineByKey at <console>:37

scala> prdd1.combineByKey(createCombiner, mergeValue, mergeCombiners).collect
res111: Array[(Int, List[String])] = Array((4,List(anil, ravi)), (6,List(venkat, anvith, rajesh, kalyan)), (3,List(raj)), (5,List(sunil, kiran, surya)))




---------------------------------------------------
