Monday 23 September 2019

Spark_Day3_3

---------------------------------------------------
val t1 = List((1, "kalyan"), (2, "raj"), (3, "venkat"), (4, "raju"))

val t2 = List((1, 10000), (2, 20000), (3, 30000), (5, 50000))

val prdd1 = sc.parallelize(t1, 1)

val prdd2 = sc.parallelize(t2, 1)

---------------------------------------------------

scala> val t1 = List((1, "kalyan"), (2, "raj"), (3, "venkat"), (4, "raju"))
t1: List[(Int, String)] = List((1,kalyan), (2,raj), (3,venkat), (4,raju))

scala> val t2 = List((1, 10000), (2, 20000), (3, 30000), (5, 50000))
t2: List[(Int, Int)] = List((1,10000), (2,20000), (3,30000), (5,50000))

scala> val prdd1 = sc.parallelize(t1, 1)
prdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[1] at parallelize at <console>:26

scala> val prdd2 = sc.parallelize(t2, 1)
prdd2: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[2] at parallelize at <console>:26

---------------------------------------------------

scala> prdd1.join(prdd2).collect.foreach(println)
(1,(kalyan,10000))
(3,(venkat,30000))
(2,(raj,20000))

scala> prdd1.leftOuterJoin(prdd2).collect.foreach(println)
(4,(raju,None))
(1,(kalyan,Some(10000)))
(3,(venkat,Some(30000)))
(2,(raj,Some(20000)))

scala> prdd1.rightOuterJoin(prdd2).collect.foreach(println)
(1,(Some(kalyan),10000))
(3,(Some(venkat),30000))
(5,(None,50000))
(2,(Some(raj),20000))

scala> prdd1.fullOuterJoin(prdd2).collect.foreach(println)
(4,(Some(raju),None))
(1,(Some(kalyan),Some(10000)))
(3,(Some(venkat),Some(30000)))
(5,(None,Some(50000)))
(2,(Some(raj),Some(20000)))

---------------------------------------------------
scala> prdd1.cartesian(prdd2).collect.foreach(println)
((1,kalyan),(1,10000))
((1,kalyan),(2,20000))
((1,kalyan),(3,30000))
((1,kalyan),(5,50000))
((2,raj),(1,10000))
((2,raj),(2,20000))
((2,raj),(3,30000))
((2,raj),(5,50000))
((3,venkat),(1,10000))
((3,venkat),(2,20000))
((3,venkat),(3,30000))
((3,venkat),(5,50000))
((4,raju),(1,10000))
((4,raju),(2,20000))
((4,raju),(3,30000))
((4,raju),(5,50000))


---------------------------------------------------

scala> prdd1.cogroup(prdd2).collect.foreach(println)
(4,(CompactBuffer(raju),CompactBuffer()))
(1,(CompactBuffer(kalyan),CompactBuffer(10000)))
(3,(CompactBuffer(venkat),CompactBuffer(30000)))
(5,(CompactBuffer(),CompactBuffer(50000)))
(2,(CompactBuffer(raj),CompactBuffer(20000)))

scala> prdd1.cogroup(prdd2, prdd2).collect.foreach(println)
(4,(CompactBuffer(raju),CompactBuffer(),CompactBuffer()))
(1,(CompactBuffer(kalyan),CompactBuffer(10000),CompactBuffer(10000)))
(3,(CompactBuffer(venkat),CompactBuffer(30000),CompactBuffer(30000)))
(5,(CompactBuffer(),CompactBuffer(50000),CompactBuffer(50000)))
(2,(CompactBuffer(raj),CompactBuffer(20000),CompactBuffer(20000)))

scala> prdd1.cogroup(prdd2, prdd2, prdd2).collect.foreach(println)
(4,(CompactBuffer(raju),CompactBuffer(),CompactBuffer(),CompactBuffer()))
(1,(CompactBuffer(kalyan),CompactBuffer(10000),CompactBuffer(10000),CompactBuffer(10000)))
(3,(CompactBuffer(venkat),CompactBuffer(30000),CompactBuffer(30000),CompactBuffer(30000)))
(5,(CompactBuffer(),CompactBuffer(50000),CompactBuffer(50000),CompactBuffer(50000)))
(2,(CompactBuffer(raj),CompactBuffer(20000),CompactBuffer(20000),CompactBuffer(20000)))

---------------------------------------------------
Note: cogroup works with at most 4 PairRDDs in total (the source RDD plus up to 3 others)
---------------------------------------------------

scala> prdd1.cogroup(prdd2, prdd2, prdd2, prdd2).collect.foreach(println)
<console>:33: error: overloaded method value cogroup with alternatives:
  [W1, W2, W3](other1: org.apache.spark.rdd.RDD[(Int, W1)], other2: org.apache.spark.rdd.RDD[(Int, W2)], other3: org.apache.spark.rdd.RDD[(Int, W3)], numPartitions: Int)org.apache.spark.rdd.RDD[(Int, (Iterable[String], Iterable[W1], Iterable[W2], Iterable[W3]))] <and>
  [W1, W2, W3](other1: org.apache.spark.rdd.RDD[(Int, W1)], other2: org.apache.spark.rdd.RDD[(Int, W2)], other3: org.apache.spark.rdd.RDD[(Int, W3)], partitioner: org.apache.spark.Partitioner)org.apache.spark.rdd.RDD[(Int, (Iterable[String], Iterable[W1], Iterable[W2], Iterable[W3]))]
 cannot be applied to (org.apache.spark.rdd.RDD[(Int, Int)], org.apache.spark.rdd.RDD[(Int, Int)], org.apache.spark.rdd.RDD[(Int, Int)], org.apache.spark.rdd.RDD[(Int, Int)])
       prdd1.cogroup(prdd2, prdd2, prdd2, prdd2).collect.foreach(println)
             ^

---------------------------------------------------

val ltuples1 = List(("raj",1), ("venkat",4), ("anil",2), ("anil",5), ("kalyan",2), ("raj",4), ("kalyan",6), ("raju",4))

val prdd3 = sc.parallelize(ltuples1, 2)

---------------------------------------------------

scala> prdd3.collectAsMap()
res1: scala.collection.Map[String,Int] = Map(kalyan -> 6, raju -> 4, raj -> 4, anil -> 5, venkat -> 4)

scala> prdd3.collect
res2: Array[(String, Int)] = Array((raj,1), (venkat,4), (anil,2), (anil,5), (kalyan,2), (raj,4), (kalyan,6), (raju,4))

---------------------------------------------------
scala> prdd3.collect.foreach(println)
(raj,1)
(venkat,4)
(anil,2)
(anil,5)
(kalyan,2)
(raj,4)
(kalyan,6)
(raju,4)

---------------------------------------------------
scala> prdd3.countByKey()
res14: scala.collection.Map[String,Long] = Map(raj -> 2, raju -> 1, venkat -> 1, anil -> 2, kalyan -> 2)

scala> prdd3.countByValue()
res15: scala.collection.Map[(String, Int),Long] = Map((venkat,4) -> 1, (kalyan,6) -> 1, (anil,5) -> 1, (raj,1) -> 1, (anil,2) -> 1, (raju,4) -> 1, (raj,4) -> 1, (kalyan,2) -> 1)


---------------------------------------------------

scala> prdd3.filter( t => t._1.length > 2).collect.foreach(println)
(raj,1)
(venkat,4)
(anil,2)
(anil,5)
(kalyan,2)
(raj,4)
(kalyan,6)
(raju,4)

scala> prdd3.filter( t => t._1.length > 4).collect.foreach(println)
(venkat,4)
(kalyan,2)
(kalyan,6)

scala> prdd3.filter( t => t._2 > 4).collect.foreach(println)
(anil,5)
(kalyan,6)

---------------------------------------------------

val ltuples2 = List((0,"I am going"), (11, "to hyd"), (18, "I am learning"), (32, "spark course"))

val prdd4 = sc.parallelize(ltuples2, 1)

---------------------------------------------------

scala> val ltuples2 = List((0,"I am going"), (11, "to hyd"), (18, "I am learning"), (32, "spark course"))
ltuples2: List[(Int, String)] = List((0,I am going), (11,to hyd), (18,I am learning), (32,spark course))

scala> val prdd4 = sc.parallelize(ltuples2, 1)
prdd4: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[32] at parallelize at <console>:26


---------------------------------------------------

scala> prdd4.flatMap(t => t._2)
res20: org.apache.spark.rdd.RDD[Char] = MapPartitionsRDD[33] at flatMap at <console>:29

scala> prdd4.flatMap(t => t._2).collect.foreach(println)
I

a
m

g
o
i
n
g
t
o

h
y
d
I

a
m

l
e
a
r
n
i
n
g
s
p
a
r
k

c
o
u
r
s
e

---------------------------------------------------

scala> prdd4.flatMapValues(t => t)
res22: org.apache.spark.rdd.RDD[(Int, Char)] = MapPartitionsRDD[35] at flatMapValues at <console>:29

scala> prdd4.flatMapValues(t => t).collect.foreach(println)
(0,I)
(0, )
(0,a)
(0,m)
(0, )
(0,g)
(0,o)
(0,i)
(0,n)
(0,g)
(11,t)
(11,o)
(11, )
(11,h)
(11,y)
(11,d)
(18,I)
(18, )
(18,a)
(18,m)
(18, )
(18,l)
(18,e)
(18,a)
(18,r)
(18,n)
(18,i)
(18,n)
(18,g)
(32,s)
(32,p)
(32,a)
(32,r)
(32,k)
(32, )
(32,c)
(32,o)
(32,u)
(32,r)
(32,s)
(32,e)

---------------------------------------------------


scala> prdd4.flatMap(t => t._2.split(" ")).collect.foreach(println)
I
am
going
to
hyd
I
am
learning
spark
course

scala> prdd4.flatMapValues(t => t.split(" ")).collect.foreach(println)
(0,I)
(0,am)
(0,going)
(11,to)
(11,hyd)
(18,I)
(18,am)
(18,learning)
(32,spark)
(32,course)


---------------------------------------------------

---------------------------------------------------
Compare `aggregate` and `fold`:
-----------------------------------------

aggregate(zeroValue)(seqOp, combOp)


aggregate(initialValue)(seqOp, seqOp)

<==>

fold(initialValue)(seqOp)


fold(firstValue)(seqOp)

<==>

reduce(seqOp)
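
A minimal sketch of this equivalence on a plain RDD of Ints (the RDD `nums` below is illustrative, not part of the session):

val nums = sc.parallelize(List(1, 2, 3, 4), 2)

// aggregate: initial value + seqOp (within a partition) + combOp (across partitions)
nums.aggregate(0)((acc, x) => acc + x, (a, b) => a + b)   // 10

// fold: equivalent to aggregate when seqOp and combOp are the same function
nums.fold(0)((a, b) => a + b)                             // 10

// reduce: like fold, but without an explicit initial value
nums.reduce((a, b) => a + b)                              // 10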

---------------------------------------------------
Compare `aggregateByKey` and `foldByKey`:
-----------------------------------------

aggregateByKey(zeroValue)(seqOp, combOp)


aggregateByKey(initialValue)(seqOp, seqOp)

<==>

foldByKey(initialValue)(seqOp)


foldByKey(firstValue)(seqOp)

<==>

reduceByKey(seqOp)
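
A minimal sketch of the same equivalence per key, reusing `prdd3` from above (output order may vary):

// aggregateByKey: initial value per key + seqOp (within a partition) + combOp (across partitions)
prdd3.aggregateByKey(0)((acc, v) => acc + v, (a, b) => a + b).collect

// foldByKey: equivalent to aggregateByKey when seqOp and combOp are the same function
prdd3.foldByKey(0)((a, b) => a + b).collect

// reduceByKey: like foldByKey, but without an explicit initial value
prdd3.reduceByKey((a, b) => a + b).collect

// all three produce the per-key totals, e.g. (raj,5), (anil,7), (kalyan,8), (venkat,4), (raju,4)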


---------------------------------------------------

scala> prdd3.collect
res26: Array[(String, Int)] = Array((raj,1), (venkat,4), (anil,2), (anil,5), (kalyan,2), (raj,4), (kalyan,6), (raju,4))


scala> prdd3.groupBy(x => x._1).collect.foreach(println)
(kalyan,CompactBuffer((kalyan,2), (kalyan,6)))
(anil,CompactBuffer((anil,2), (anil,5)))
(raju,CompactBuffer((raju,4)))
(venkat,CompactBuffer((venkat,4)))
(raj,CompactBuffer((raj,1), (raj,4)))

scala> prdd3.groupByKey().collect.foreach(println)
(kalyan,CompactBuffer(2, 6))
(anil,CompactBuffer(2, 5))
(raju,CompactBuffer(4))
(venkat,CompactBuffer(4))
(raj,CompactBuffer(1, 4))

-----------------------------------------
scala> prdd3.keys.collect
res33: Array[String] = Array(raj, venkat, anil, anil, kalyan, raj, kalyan, raju)

scala> prdd3.values.collect
res34: Array[Int] = Array(1, 4, 2, 5, 2, 4, 6, 4)


---------------------------------------------------
scala> prdd4.collect
res35: Array[(Int, String)] = Array((0,I am going), (11,to hyd), (18,I am learning), (32,spark course))


scala> prdd4.map(t => t._2.reverse).collect
res37: Array[String] = Array(gniog ma I, dyh ot, gninrael ma I, esruoc kraps)

scala> prdd4.mapValues(t => t.reverse).collect
res38: Array[(Int, String)] = Array((0,gniog ma I), (11,dyh ot), (18,gninrael ma I), (32,esruoc kraps))


---------------------------------------------------
Note:
----------
rdd.partitions.length <==> rdd.getNumPartitions
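
For example, on `prdd4` (created above with 1 partition) both of the following return 1:

prdd4.partitions.length
prdd4.getNumPartitions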

---------------------------------------------------

scala> prdd4.collect
res39: Array[(Int, String)] = Array((0,I am going), (11,to hyd), (18,I am learning), (32,spark course))

scala> prdd4.map(t => t._2).collect
res40: Array[String] = Array(I am going, to hyd, I am learning, spark course)

scala> prdd4.map(t => t._2).flatMap(x => x.split(" ")).collect
res41: Array[String] = Array(I, am, going, to, hyd, I, am, learning, spark, course)

---------------------------------------------------

scala> val pr1 = prdd4.map(t => t._2).flatMap(x => x.split(" ")).map(x => (x ,1))
pr1: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[56] at map at <console>:28

scala> pr1.collect
res42: Array[(String, Int)] = Array((I,1), (am,1), (going,1), (to,1), (hyd,1), (I,1), (am,1), (learning,1), (spark,1), (course,1))

scala> pr1.reduceByKey((a,b) => a + b)
res43: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[57] at reduceByKey at <console>:31

scala> pr1.reduceByKey((a,b) => a + b).collect
res44: Array[(String, Int)] = Array((learning,1), (spark,1), (am,2), (hyd,1), (I,2), (to,1), (going,1), (course,1))

scala> pr1.groupByKey().collect
res56: Array[(String, Iterable[Int])] = Array((learning,CompactBuffer(1)), (spark,CompactBuffer(1)), (am,CompactBuffer(1, 1)), (hyd,CompactBuffer(1)), (I,CompactBuffer(1, 1)), (to,CompactBuffer(1)), (going,CompactBuffer(1)), (course,CompactBuffer(1)))

scala> pr1.groupByKey().map(t => (t._1, t._2.sum))
res57: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[63] at map at <console>:31

scala> pr1.groupByKey().map(t => (t._1, t._2.sum)).collect
res58: Array[(String, Int)] = Array((learning,1), (spark,1), (am,2), (hyd,1), (I,2), (to,1), (going,1), (course,1))

---------------------------------------------------

scala> prdd3.collect.foreach(println)
(raj,1)
(venkat,4)
(anil,2)
(anil,5)
(kalyan,2)
(raj,4)
(kalyan,6)
(raju,4)

scala> prdd3.sortBy(t => t._1).collect.foreach(println)
(anil,2)
(anil,5)
(kalyan,2)
(kalyan,6)
(raj,1)
(raj,4)
(raju,4)
(venkat,4)

scala> prdd3.sortBy(t => t._2).collect.foreach(println)
(raj,1)
(anil,2)
(kalyan,2)
(venkat,4)
(raj,4)
(raju,4)
(anil,5)
(kalyan,6)

scala> prdd3.sortByKey().collect.foreach(println)
(anil,2)
(anil,5)
(kalyan,2)
(kalyan,6)
(raj,1)
(raj,4)
(raju,4)
(venkat,4)


---------------------------------------------------
Playing with Load & Store operations
---------------------------------------------------
1. Create an RDD from a dataset (text file / csv / tsv / ...)
---------------------------------------------------
val rdd = sc.textFile(<file path>, <no. of partitions>)

---------------------------------------------------
val filePath = "file:///home/orienit/spark/input/demoinput"

val rdd1 = sc.textFile(filePath)

rdd1.getNumPartitions


val rdd1 = sc.textFile(filePath, 1)

rdd1.getNumPartitions

---------------------------------------------------

scala> val filePath = "file:///home/orienit/spark/input/demoinput"
filePath: String = file:///home/orienit/spark/input/demoinput

scala> val rdd1 = sc.textFile(filePath)
rdd1: org.apache.spark.rdd.RDD[String] = file:///home/orienit/spark/input/demoinput MapPartitionsRDD[90] at textFile at <console>:26

scala> rdd1.getNumPartitions
res69: Int = 2

scala> val rdd1 = sc.textFile(filePath, 1)
rdd1: org.apache.spark.rdd.RDD[String] = file:///home/orienit/spark/input/demoinput MapPartitionsRDD[92] at textFile at <console>:26

scala> rdd1.getNumPartitions
res70: Int = 1

---------------------------------------------------
WordCount in Spark with Scala:
---------------------------------------------------
val filePath = "file:///home/orienit/spark/input/demoinput"

val fileRdd = sc.textFile(filePath, 1)

val wordsRdd = fileRdd.flatMap(line => line.split(" "))

val tupleRdd = wordsRdd.map(word => (word, 1))

val wordCountRdd = tupleRdd.reduceByKey((a,b) => a + b)

val sortedRdd = wordCountRdd.sortByKey()

sortedRdd.collect()

val outputPath = "file:///home/orienit/spark/output/wordcount"

sortedRdd.saveAsTextFile(outputPath)

---------------------------------------------------

val filePath = "file:///home/orienit/spark/input/demoinput"

val outputPath = "file:///home/orienit/spark/output/wordcount"

val fileRdd = sc.textFile(filePath, 1)

val sortedRdd = fileRdd.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a,b) => a + b).sortByKey()

sortedRdd.saveAsTextFile(outputPath)

---------------------------------------------------

scala> val filePath = "file:///home/orienit/spark/input/demoinput"
filePath: String = file:///home/orienit/spark/input/demoinput

scala> val fileRdd = sc.textFile(filePath, 1)
fileRdd: org.apache.spark.rdd.RDD[String] = file:///home/orienit/spark/input/demoinput MapPartitionsRDD[94] at textFile at <console>:26

scala> fileRdd.collect
res72: Array[String] = Array(I am going, to hyd, I am learning, spark course)

scala> val wordsRdd = fileRdd.flatMap(line => line.split(" "))
wordsRdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[95] at flatMap at <console>:28

scala> wordsRdd.collect
res73: Array[String] = Array(I, am, going, to, hyd, I, am, learning, spark, course)

scala> val tupleRdd = wordsRdd.map(word => (word, 1))
tupleRdd: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[96] at map at <console>:30

scala> tupleRdd.collect
res74: Array[(String, Int)] = Array((I,1), (am,1), (going,1), (to,1), (hyd,1), (I,1), (am,1), (learning,1), (spark,1), (course,1))


scala> val wordCountRdd = tupleRdd.reduceByKey((a,b) => a + b)
wordCountRdd: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[97] at reduceByKey at <console>:32

scala> wordCountRdd.collect
res75: Array[(String, Int)] = Array((learning,1), (spark,1), (am,2), (hyd,1), (I,2), (to,1), (going,1), (course,1))

scala> val sortedRdd = wordCountRdd.sortByKey()
sortedRdd: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[98] at sortByKey at <console>:34

scala> sortedRdd.collect
res76: Array[(String, Int)] = Array((I,2), (am,2), (course,1), (going,1), (hyd,1), (learning,1), (spark,1), (to,1))

scala> val sortedRdd = wordCountRdd.sortBy(x => x._1)
sortedRdd: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[101] at sortBy at <console>:34

scala> sortedRdd.collect
res77: Array[(String, Int)] = Array((I,2), (am,2), (course,1), (going,1), (hyd,1), (learning,1), (spark,1), (to,1))


scala> val outputPath = "file:///home/orienit/spark/output/wordcount"
outputPath: String = file:///home/orienit/spark/output/wordcount

scala> sortedRdd.saveAsTextFile(outputPath)


---------------------------------------------------
Spark Web UI:
--------------
http://localhost:4040/


---------------------------------------------------
val filePath = "file:///home/orienit/spark/input/demoinput"

val outputPath = "file:///home/orienit/spark/output/wordcount"

val fileRdd = sc.textFile(filePath, 1)

val sortedRdd = fileRdd.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a,b) => a + b).sortByKey()

val wordCountRdd = fileRdd.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a,b) => a + b)

wordCountRdd.saveAsTextFile(outputPath)

---------------------------------------------------

scala> wordCountRdd.toDebugString
res82: String =
(1) ShuffledRDD[113] at reduceByKey at <console>:28 []
 +-(1) MapPartitionsRDD[112] at map at <console>:28 []
    |  MapPartitionsRDD[111] at flatMap at <console>:28 []
    |  file:///home/orienit/spark/input/demoinput MapPartitionsRDD[104] at textFile at <console>:26 []
    |  file:///home/orienit/spark/input/demoinput HadoopRDD[103] at textFile at <console>:26 []


scala> sortedRdd.toDebugString
res83: String =
(1) ShuffledRDD[108] at sortByKey at <console>:28 []
 +-(1) ShuffledRDD[107] at reduceByKey at <console>:28 []
    +-(1) MapPartitionsRDD[106] at map at <console>:28 []
       |  MapPartitionsRDD[105] at flatMap at <console>:28 []
       |  file:///home/orienit/spark/input/demoinput MapPartitionsRDD[104] at textFile at <console>:26 []
       |  file:///home/orienit/spark/input/demoinput HadoopRDD[103] at textFile at <console>:26 []


---------------------------------------------------

Refer:
------------------------
http://kalyanbigdatatraining.blogspot.com/2016/10/more-details-about-input-format-and.html

---------------------------------------------------