Monday 23 September 2019

spark_day_3_practice

-------------------------------------------------------

// helper for mapPartitionsWithIndex: tags every element with the index of its partition
def myfunc[T](index: Int, it: Iterator[T]): Iterator[String] = {
  it.toList.map(x => s"index -> $index, value -> $x").toIterator
}

val names = List("raj", "venkat", "anil", "ravi", "sunil", "anvith", "rajesh", "kiran", "surya", "kalyan")

val rdd1 = sc.parallelize(names, 2)

val prdd1 = rdd1.map(x => (x.length, x))
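
// myfunc above has the (Int, Iterator[T]) => Iterator[U] shape expected by
// mapPartitionsWithIndex, so it can show which partition each name landed in.
// A minimal sketch (output shape assumed, not copied from a run):
rdd1.mapPartitionsWithIndex(myfunc).collect
// e.g. Array(index -> 0, value -> raj, ..., index -> 1, value -> anvith, ...)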

-------------------------------------------------------

val r1 = sc.parallelize(List(1,2,3,4,5), 2)
val p1 = r1.map(x => (x, 'a'))
val p2 = r1.map(x => (x, 'b'))
val p3 = r1.map(x => (x, 'c'))
val p4 = r1.map(x => (x, 'd'))
val p5 = r1.map(x => (x, 'e'))

-------------------------------------------------------
scala> val r1 = sc.parallelize(List(1,2,3,4,5), 2)
r1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24

scala> val p1 = r1.map(x => (x, 'a'))
p1: org.apache.spark.rdd.RDD[(Int, Char)] = MapPartitionsRDD[3] at map at <console>:26

scala> val p2 = r1.map(x => (x, 'b'))
p2: org.apache.spark.rdd.RDD[(Int, Char)] = MapPartitionsRDD[4] at map at <console>:26

scala> val p3 = r1.map(x => (x, 'c'))
p3: org.apache.spark.rdd.RDD[(Int, Char)] = MapPartitionsRDD[5] at map at <console>:26

scala> val p4 = r1.map(x => (x, 'd'))
p4: org.apache.spark.rdd.RDD[(Int, Char)] = MapPartitionsRDD[6] at map at <console>:26

scala> val p5 = r1.map(x => (x, 'e'))
p5: org.apache.spark.rdd.RDD[(Int, Char)] = MapPartitionsRDD[7] at map at <console>:26



scala> p1.cogroup(p2).collect
res2: Array[(Int, (Iterable[Char], Iterable[Char]))] = Array((4,(CompactBuffer(a),CompactBuffer(b))), (2,(CompactBuffer(a),CompactBuffer(b))), (1,(CompactBuffer(a),CompactBuffer(b))), (3,(CompactBuffer(a),CompactBuffer(b))), (5,(CompactBuffer(a),CompactBuffer(b))))

scala> p1.cogroup(p2, p3).collect
res3: Array[(Int, (Iterable[Char], Iterable[Char], Iterable[Char]))] = Array((4,(CompactBuffer(a),CompactBuffer(b),CompactBuffer(c))), (2,(CompactBuffer(a),CompactBuffer(b),CompactBuffer(c))), (1,(CompactBuffer(a),CompactBuffer(b),CompactBuffer(c))), (3,(CompactBuffer(a),CompactBuffer(b),CompactBuffer(c))), (5,(CompactBuffer(a),CompactBuffer(b),CompactBuffer(c))))

scala> p1.cogroup(p2, p3, p4).collect
res4: Array[(Int, (Iterable[Char], Iterable[Char], Iterable[Char], Iterable[Char]))] = Array((4,(CompactBuffer(a),CompactBuffer(b),CompactBuffer(c),CompactBuffer(d))), (2,(CompactBuffer(a),CompactBuffer(b),CompactBuffer(c),CompactBuffer(d))), (1,(CompactBuffer(a),CompactBuffer(b),CompactBuffer(c),CompactBuffer(d))), (3,(CompactBuffer(a),CompactBuffer(b),CompactBuffer(c),CompactBuffer(d))), (5,(CompactBuffer(a),CompactBuffer(b),CompactBuffer(c),CompactBuffer(d))))



Note: cogroup accepts at most three other pair RDDs, i.e. only 4 pair RDDs can be cogrouped in a single call
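
// If more than four pair RDDs need to be combined, one option is to chain the
// calls, since the result of a cogroup is itself a pair RDD.
// A rough sketch (the value type gets nested; shape assumed, not copied from a run):
val grouped4 = p1.cogroup(p2, p3, p4)
grouped4.cogroup(p5).collect
// values are now (Iterable[(Iterable[Char], Iterable[Char], Iterable[Char], Iterable[Char])], Iterable[Char])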

-------------------------------------------------------

scala> prdd1.collect
res8: Array[(Int, String)] = Array((3,raj), (6,venkat), (4,anil), (4,ravi), (5,sunil), (6,anvith), (6,rajesh), (5,kiran), (5,surya), (6,kalyan))

scala> prdd1.countByKey
res9: scala.collection.Map[Int,Long] = Map(4 -> 2, 6 -> 4, 3 -> 1, 5 -> 3)

scala> prdd1.countByValue
res10: scala.collection.Map[(Int, String),Long] = Map((5,kiran) -> 1, (6,kalyan) -> 1, (3,raj) -> 1, (4,ravi) -> 1, (6,anvith) -> 1, (4,anil) -> 1, (6,rajesh) -> 1, (5,surya) -> 1, (6,venkat) -> 1, (5,sunil) -> 1)


-------------------------------------------------------

val lines = List("I am going", "to hyd", "I am learning", "spark course")
val l1 = sc.parallelize(lines, 2)
val pl1 = l1.flatMap(x => x.split(" ")).map(x => (x.length, x))

scala> val pl1 = l1.flatMap(x => x.split(" ")).map(x => (x.length, x))
pl1: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[24] at map at <console>:28

scala> pl1.collect
res12: Array[(Int, String)] = Array((1,I), (2,am), (5,going), (2,to), (3,hyd), (1,I), (2,am), (8,learning), (5,spark), (6,course))

scala> pl1.flatMap(x => x._2).collect
res20: Array[Char] = Array(I, a, m, g, o, i, n, g, t, o, h, y, d, I, a, m, l, e, a, r, n, i, n, g, s, p, a, r, k, c, o, u, r, s, e)

-------------------------------------------------------

l1.flatMap(x => x.split(" ")).collect

val pl2 = l1.map(x => (x.length, x))
pl2.flatMapValues(x => x).collect

-------------------------------------------------------


scala> val pl2 = l1.map(x => (x.length, x))
pl2: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[29] at map at <console>:28

scala> pl2.collect
res24: Array[(Int, String)] = Array((10,I am going), (6,to hyd), (13,I am learning), (12,spark course))

scala> pl2.flatMapValues(x => x).collect
res25: Array[(Int, Char)] = Array((10,I), (10, ), (10,a), (10,m), (10, ), (10,g), (10,o), (10,i), (10,n), (10,g), (6,t), (6,o), (6, ), (6,h), (6,y), (6,d), (13,I), (13, ), (13,a), (13,m), (13, ), (13,l), (13,e), (13,a), (13,r), (13,n), (13,i), (13,n), (13,g), (12,s), (12,p), (12,a), (12,r), (12,k), (12, ), (12,c), (12,o), (12,u), (12,r), (12,s), (12,e))

scala> pl2.flatMapValues(x => x.split(" ")).collect
res26: Array[(Int, String)] = Array((10,I), (10,am), (10,going), (6,to), (6,hyd), (13,I), (13,am), (13,learning), (12,spark), (12,course))


-------------------------------------------------------

scala> prdd1.foldByKey("*")((res, data) => res + ":" + data).collect
res27: Array[(Int, String)] = Array((4,*:anil:ravi), (6,*:venkat:*:anvith:rajesh:kalyan), (3,*:raj), (5,*:sunil:*:kiran:surya))

scala> prdd1.foldByKey("*")((res, data) => res + ":" + data).collect.foreach(println)
(4,*:anil:ravi)
(6,*:venkat:*:anvith:rajesh:kalyan)
(3,*:raj)
(5,*:sunil:*:kiran:surya)


scala> prdd1.fold((0,"*"))((res, data) => (res._1 + data._1, res._2 + ":" + data._2))
res33: (Int, String) = (50,*:*:anvith:rajesh:kiran:surya:kalyan:*:raj:venkat:anil:ravi:sunil)
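
// The repeated "*" in the results is expected: the zero value is applied once per
// partition (and, for fold, once more when the per-partition results are merged on
// the driver), and prdd1 has 2 partitions.
// A quick way to confirm, assuming prdd1 is collapsed to a single partition first:
prdd1.repartition(1).foldByKey("*")((res, data) => res + ":" + data).collect
// with one partition, the zero value "*" appears exactly once per key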

-------------------------------------------------------

val j1 = sc.parallelize(List((1, "aa"), (2, "bb"), (3, "cc")), 1)

val j2 = sc.parallelize(List((2, "xx"), (3, "yy"), (4, "zz")), 1)


j1.join(j2).collect

j1.leftOuterJoin(j2).collect

j1.rightOuterJoin(j2).collect

j1.fullOuterJoin(j2).collect
-------------------------------------------------------

scala> j1.join(j2).collect
res34: Array[(Int, (String, String))] = Array((3,(cc,yy)), (2,(bb,xx)))

scala> j1.leftOuterJoin(j2).collect
res35: Array[(Int, (String, Option[String]))] = Array((1,(aa,None)), (3,(cc,Some(yy))), (2,(bb,Some(xx))))

scala> j1.rightOuterJoin(j2).collect
res36: Array[(Int, (Option[String], String))] = Array((4,(None,zz)), (3,(Some(cc),yy)), (2,(Some(bb),xx)))

scala> j1.fullOuterJoin(j2).collect
res37: Array[(Int, (Option[String], Option[String]))] = Array((4,(None,Some(zz))), (1,(Some(aa),None)), (3,(Some(cc),Some(yy))), (2,(Some(bb),Some(xx))))


-------------------------------------------------------


scala> prdd1.reduce((t1, t2) => (t1._1 + t2._1, t1._2 + ":" + t2._2))
res39: (Int, String) = (50,anvith:rajesh:kiran:surya:kalyan:raj:venkat:anil:ravi:sunil)

scala> prdd1.reduceByKey((a,b) => a + ":" + b).collect
res40: Array[(Int, String)] = Array((4,anil:ravi), (6,venkat:anvith:rajesh:kalyan), (3,raj), (5,sunil:kiran:surya))

-------------------------------------------------------

WordCount Solutions using RDD & PairRDD
---------------------------------------------

val linesRdd = sc.parallelize(List("I am going", "to hyd", "I am learning", "spark course"), 2)

val wordsRdd = linesRdd.flatMap(line => line.split(" "))

// not a good approach: countByValue is an action that collects the whole word -> count map onto the driver
wordsRdd.countByValue


// better practice: keep the aggregation distributed using pair RDD transformations

val pairRdd = wordsRdd.map(word => (word, 1))

pairRdd.groupByKey().map(x => (x._1, x._2.sum)).sortByKey().collect

pairRdd.reduceByKey((a,b) => a + b).sortByKey().collect

-------------------------------------------------------

scala> pairRdd.reduceByKey((a,b) => a + b).collect
res51: Array[(String, Int)] = Array((learning,1), (am,2), (going,1), (spark,1), (hyd,1), (I,2), (to,1), (course,1))

scala> pairRdd.reduceByKey((a,b) => a + b).sortByKey().collect
res52: Array[(String, Int)] = Array((I,2), (am,2), (course,1), (going,1), (hyd,1), (learning,1), (spark,1), (to,1))
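
// The same count can also be written with foldByKey from the earlier section,
// using 0 as the zero value; a minimal sketch over the same pairRdd:
pairRdd.foldByKey(0)((acc, one) => acc + one).sortByKey().collect
// same result as reduceByKey above: Array((I,2), (am,2), (course,1), ...)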

-------------------------------------------------------