Monday 23 September 2019

spark_core_rdd_practice

val nums = List(1,2,3,4,5,6)

val chars = List('a', 'b', 'c', 'd', 'e', 'f')

scala> val rdd1 = sc.parallelize(nums)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:29

scala> rdd1.partitions.length
res0: Int = 4

scala> val rdd2 = sc.parallelize(chars)
rdd2: org.apache.spark.rdd.RDD[Char] = ParallelCollectionRDD[1] at parallelize at <console>:29

scala> rdd2.partitions.length
res1: Int = 4

scala> val rdd3 = sc.parallelize(nums, 1)
rdd3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:29

scala> rdd3.partitions.length
res2: Int = 1
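
The default partition count (4 above) comes from sc.defaultParallelism, which usually matches the number of local cores; the second argument to parallelize overrides it, as rdd3 shows:

sc.defaultParallelism   // 4 in this shell, hence the 4 partitions above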

-----------------------------------------
RDD characteristics:
1. Immutable
2. Cacheable
3. Lazy evaluation
4. Type inference

RDD operations:
1. Transformations: old RDD => new RDD (lazy)
2. Actions: RDD => result (trigger execution; see the sketch below)
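
A minimal sketch of the distinction, assuming the rdd1 defined above (the doubled/total names are added here for illustration):

val doubled = rdd1.map(_ * 2)       // transformation: only records the lineage, nothing runs yet
val total = doubled.reduce(_ + _)   // action: triggers the job and returns 42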

scala> val rdd4 = sc.parallelize(nums, 2)
rdd4: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:29

scala> rdd4.foreach(println)
4
5
1
2
3
6
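
Note the interleaving: foreach(println) runs in parallel on the executors, one task per partition, so the print order across partitions is nondeterministic.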


// mapPartitionsWithIndex calls this once per partition, passing the
// partition index and an iterator over that partition's elements.
def myFunc(index : Int, it : Iterator[Int]) : Iterator[String] = {
 it.toList.map(data => "[index: " + index + " value: " + data + "]").iterator
}

rdd4.mapPartitionsWithIndex(myFunc).foreach(println)

[index: 0 value: 1]
[index: 0 value: 2]
[index: 0 value: 3]
[index: 1 value: 4]
[index: 1 value: 5]
[index: 1 value: 6]


def myFunc1(index : Int, it : Iterator[Int]) : Iterator[(Int, Int)] = {
 it.toList.map(data => (index, data)).iterator
}

// Equivalent shorthand using placeholder syntax:
def myFunc1(index : Int, it : Iterator[Int]) : Iterator[(Int, Int)] = {
 it.toList.map((index, _)).iterator
}

rdd4.mapPartitionsWithIndex(myFunc1).foreach(println)


def myFunc2(index : Int, it : Iterator[Int]) : Iterator[(Int, Int)] = {
 it.toList.map((_ , index)).iterator
}

rdd4.mapPartitionsWithIndex(myFunc2).foreach(println)

-----------------------------------------
val initial = 1
def seqOp(a : Int, b : Int) = {
   if (a > b ) a else b
}

def comOp(a : Int, b : Int) = {
  a + b
}

rdd4.aggregate(initial)(seqOp, comOp)

-----------------------------------------

def max(a : Int, b : Int) = {
   if (a > b ) a else b
}

def sum(a : Int, b : Int) = {
  a + b
}

def mul(a : Int, b : Int) = {
  a * b
}

rdd4.aggregate(1)(max, sum)


rdd4.aggregate(1)(max , mul)


rdd4.aggregate(4)(max, sum)
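
Tracing these, assuming the usual even split of rdd4 into [1,2,3] and [4,5,6] (the zeroValue seeds seqOp in every partition and is folded in once more by comOp on the driver):

// aggregate(1)(max, sum): partition maxes max(1, [1,2,3]) = 3 and max(1, [4,5,6]) = 6
//                         combine: 1 + 3 + 6 = 10
// aggregate(1)(max, mul): combine: 1 * 3 * 6 = 18
// aggregate(4)(max, sum): partition maxes are 4 and 6; combine: 4 + 4 + 6 = 14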

-----------------------------------------
val nums = List(1,2,3,4,5,6)

def seqOp(a : Int, b : Int) = {
 a + b
}

// flawed combiner: averaging two partial results loses the counts
def comOp(a : Int, b : Int) = {
 ( a + b ) / 2
}

val avg = rdd4.aggregate(0)(seqOp, comOp)
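
This comes out as 9, not the true average 3.5: with partitions [1,2,3] and [4,5,6], seqOp produces the partition sums 6 and 15, and comOp then folds them into the zero value as ((0 + 6) / 2 + 15) / 2 = 9. Pairwise averaging is not associative, so no choice of comOp alone can fix this; the tuple-based version below carries (sum, count) instead and divides once at the end.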


-----------------------------------------

val nums = List(1,2,3,4,5,6)

// seqOp: fold one element into the running (sum, count) accumulator
def seqOp(t : (Int, Int), a : Int) : (Int, Int) = {
 (t._1 + a, t._2 + 1)
}

// comOp: merge two (sum, count) accumulators
def comOp(t1 : (Int, Int), t2 : (Int, Int)) : (Int, Int) = {
 (t1._1 + t2._1, t1._2 + t2._2)
}

val avgtuple = nums.aggregate((0,0))(seqOp, comOp)

val avg = avgtuple._1 / avgtuple._2
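
Here avgtuple is (21, 6), and 21 / 6 is integer division, so avg is 3. A one-line fix (the exactAvg name is added here, not from the notes):

val exactAvg = avgtuple._1.toDouble / avgtuple._2   // 3.5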

-----------------------------------------

val nums = List(1,2,3,4,5,6)
val rdd = sc.parallelize(nums,2)


def seqOp(t : (Int, Int), a : Int) : (Int, Int) = {
 ( t._1 + a, t._2 + 1)
}


def comOp(t1 : (Int, Int), t2 : (Int, Int)) : (Int, Int) = {
 (t1._1 + t2._1, t1._2 + t2._2)
}

val zeroValue = (0, 0)
rdd.aggregate(zeroValue)(seqOp, comOp)   // (21, 6)

-----------------------------------------

val names = List("anil", "venkat", "raj", "kumar", "anvith")


---------------------------------

nCr = (n * (n-1) * (n-2) * ... * (n-r+1)) / (1 * 2 * ... * r)
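
A small Scala sketch of this product form (the nCr helper is added here, not from the notes; each intermediate product is exactly divisible by i, so integer division stays exact):

def nCr(n: Int, r: Int): Long = {
  require(r >= 0 && r <= n)
  (1 to r).foldLeft(1L)((acc, i) => acc * (n - i + 1) / i)
}

nCr(6, 2)   // 15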


---------------------------------

val nums = List(1,2,3,4,5,6)
val names = List("anil", "venkat", "raj", "kumar", "anvith")

val zip1 = nums.zip(names)   // List.zip truncates to the shorter list: 5 pairs
val zip2 = names.zip(nums)

val rdd1 = sc.parallelize(nums,2)
val rdd2 = sc.parallelize(names,2)

// Note: zip can only pair RDDs with the same number of elements in each
// partition. nums has 6 elements and names only 5, so the zip itself is
// lazy and succeeds, but the foreach below fails with a SparkException.
val ziprdd1 = rdd1.zip(rdd2)
ziprdd1.foreach(println)

---------------------------------

val names = List("anil", "venkat", "raj", "kumar", "anvith", "rohit", "kalyan", "sam", "kabali", "raju")
val rdd4 = sc.parallelize(names,2)

val rdd5 = rdd4.map(x => (x.length, x))


// Works only on RDD[String]
def myfunc1(index: Int, it: Iterator[String]) : Iterator[String] = {
 it.toList.map(x => "[index: " + index + " value: " + x + "]").iterator
}

// Works only on RDD[(Int, String)]
def myfunc2(index: Int, it: Iterator[(Int, String)]) : Iterator[String] = {
 it.toList.map(x => "[index: " + index + " value: " + x + "]").iterator
}

// Generic version: works for any RDD element type
def myfunc3[T](index: Int, it: Iterator[T]) : Iterator[String] = {
 it.toList.map(x => "[index: " + index + " value: " + x + "]").iterator
}


rdd4.mapPartitionsWithIndex(myfunc1)   // matches RDD[String]
rdd4.mapPartitionsWithIndex(myfunc3)   // generic version works too


rdd5.mapPartitionsWithIndex(myfunc2)   // matches RDD[(Int, String)]
rdd5.mapPartitionsWithIndex(myfunc3)   // generic version works for both

scala> rdd5.mapPartitionsWithIndex(myfunc3).foreach(println)
[index: 1 value: (5,rohit)]
[index: 1 value: (6,kalyan)]
[index: 1 value: (3,sam)]
[index: 1 value: (6,kabali)]
[index: 1 value: (4,raju)]
[index: 0 value: (4,anil)]
[index: 0 value: (6,venkat)]
[index: 0 value: (3,raj)]
[index: 0 value: (5,kumar)]
[index: 0 value: (6,anvith)]



val zeroValue = "***"

// seqOp: within a partition, append each value to the accumulator
def seqOp(a : String, b : String) : String = {
 a + "&&&" + b
}

// comOp: merge the per-partition accumulators
def comOp(a : String, b : String) : String = {
 a + "???" + b
}

rdd5.aggregateByKey(zeroValue)(seqOp, comOp)

scala> rdd5.aggregateByKey(zeroValue)(seqOp, comOp).foreach(println)
(4,***&&&anil???***&&&raju)
(6,***&&&venkat&&&anvith???***&&&kalyan&&&kabali)
(3,***&&&raj???***&&&sam)
(5,***&&&kumar???***&&&rohit)
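
Reading the output: the "***" zero value is applied once per key per partition, "&&&" (seqOp) joins values within a partition, and "???" (comOp) joins the per-partition results. Every key here happens to occur in both partitions, which is why each line contains exactly one "???".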


---------------------------------

def mergefunc(a : String, b: String) : String = {
  a + "&&&" + b
}

def comfunc(a : String, b: String) : String = {
  a + "???" + b
}

rdd5.combineByKey(_.toString, mergefunc, comfunc).foreach(println)

---------------------------------

def mergefunc(a : List[String], b: String) : List[String] = {
  b :: a
}

def comfunc(a : List[String], b: List[String]) : List[String] = {
  a ++ b
}

rdd5.combineByKey(List(_), mergefunc, comfunc).foreach(println)
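
combineByKey takes three functions: a createCombiner that turns the first value seen for a key within a partition into the accumulator (_.toString above, List(_) here), a mergeValue that folds further same-partition values into it, and a mergeCombiners that merges the per-partition accumulators. Unlike aggregateByKey there is no zero value, so no "***"-style marker appears in the combined results.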

---------------------------------

val names = List("anil", "venkat", "raj", "kumar", "anvith", "rohit", "kalyan", "sam", "kabali", "raju")
val rdd6 = sc.parallelize(names,2)

rdd6.map(x => x).foreach(println)   // identity map: each element stays a whole String
anil
venkat
raj
kumar
anvith
rohit
kalyan
sam
kabali
raju

rdd6.flatMap(x => x).foreach(println)   // a String is a sequence of Chars, so flatMap flattens the names into characters
a
r
o
h
i
t
k
a
l
y
a
n
s
a
m
k
a
b
a
l
i
r
a
j
u
n
i
l
v
e
n
k
a
t
r
a
j
k
u
m
a
r
a
n
v
i
t
h

---------------------------------
val lines = List("I am going", "to hyd", "I am learning", "spark course")
val rdd7 = sc.parallelize(lines,2)

rdd7.map(x => x).foreach(println)
I am going
to hyd
I am learning
spark course

rdd7.flatMap(x => x).foreach(println)   // flattens each line into characters; the spaces print as blank lines
I

a
m

g
o
i
n
g
t
o

h
y
d
I

a
m

l
e
a
r
n
i
n
g
s
p
a
r
k

c
o
u
r
s
e


rdd7.flatMap(x => x.split(" ")).foreach(println)   // split on spaces: one word per element
I
am
going
to
hyd
I
am
learning
spark
course

rdd7.flatMap(x => x.reverse.split(" ")).foreach(println)   // reverses the whole line first, so words come out spelled backwards and in reverse order
rdd7.flatMap(_.reverse.split(" ")).foreach(println)        // same, with placeholder syntax

def reverseFunc(x : String) : Array[String] = {
 x.reverse.split(" ")
}
rdd7.flatMap(reverseFunc).foreach(println)
gniog
gninrael
ma
I
ma
dyh
ot
I
esruoc
kraps

---------------------------------

val names = List("anil", "venkat", "raj", "kumar", "anvith", "rohit", "kalyan", "sam", "kabali", "raju")
val rdd8 = sc.parallelize(names,2)

rdd8.filter(x => x.length > 4).foreach(println)
venkat
kumar
anvith
rohit
kalyan
kabali

rdd8.filterWith(index => index)((x, index) => x.length > 4).foreach(println)   // first arg maps the partition index to the extra value handed to the predicate
venkat
kumar
anvith
rohit
kalyan
kabali


rdd8.filterWith(index => index)((x, index) => index % 2 == 0 ).foreach(println)
anil
venkat
raj
kumar
anvith

rdd8.filterWith(index => index)((x, index) => ( index % 2 == 0 && x.length > 4 )).foreach(println)
venkat
kumar
anvith

rdd8.filterWith(index => index + 1)((x, index) => ( index % 2 == 0 && x.length > 4 )).foreach(println)
rohit
kalyan
kabali
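
filterWith is long deprecated (it was removed in Spark 2.x); a sketch of the same index-based filtering with mapPartitionsWithIndex:

rdd8.mapPartitionsWithIndex((index, it) => it.filter(x => index % 2 == 0 && x.length > 4)).foreach(println)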

rdd8.foreach(println)
anil
venkat
raj
kumar
anvith
rohit
kalyan
sam
kabali
raju

rdd8.foreachPartition(it => println(it.toList.max))   // one println per partition: the lexicographic max of each
sam
venkat


rdd8.take(2).foreach(println)   // first 2 elements in partition order
anil
venkat

rdd8.takeOrdered(2).foreach(println)   // the 2 smallest in natural (lexicographic) order
anil
anvith

---------------------------------
rdd8.sortBy(x => x).foreach(println)                    // ascending by name (default)

rdd8.sortBy(x => x, false).foreach(println)             // descending

rdd8.sortBy(x => x, true, 1).foreach(println)           // ascending, into 1 partition

rdd8.sortBy(x => x, false, 1).foreach(println)          // descending, into 1 partition

rdd8.sortBy(x => x.length, true, 1).foreach(println)    // ascending by name length

rdd8.sortBy(x => x.length, false, 1).foreach(println)   // descending by name length


---------------------------------

val names = List("anil", "venkat", "raj", "kumar", "anvith", "rohit", "kalyan", "sam", "kabali", "raju")
val rdd9 = sc.parallelize(names,2)

val rdd10 = rdd9.map(x => (x.length, x))

rdd10.foreach(println)
(5,rohit)
(6,kalyan)
(3,sam)
(6,kabali)
(4,raju)
(4,anil)
(6,venkat)
(3,raj)
(5,kumar)
(6,anvith)


rdd10.sortByKey(true, 1).foreach(println)   // ascending by key, into 1 partition; ties keep no guaranteed order
(3,raj)
(3,sam)
(4,anil)
(4,raju)
(5,kumar)
(5,rohit)
(6,venkat)
(6,anvith)
(6,kalyan)
(6,kabali)

rdd10.sortBy(x => x._1).foreach(println)   // same ordering as sortByKey

rdd10.sortBy(x => x._2).foreach(println)   // sort by name instead of key

rdd10.sortBy(x => x._2.length, true, 1).foreach(println)   // by name length, into 1 partition



val y = sc.parallelize(1 to 10, 10)
val z = y.coalesce(2, false)   // shrink 10 partitions to 2 without a shuffle
z.mapPartitionsWithIndex(myfunc3).foreach(println)
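
coalesce with shuffle = false merges existing partitions locally; to increase the partition count, or to force an even redistribution, repartition is the shuffle-based alternative:

val z2 = y.repartition(5)   // always shuffles
z2.partitions.length        // 5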