val nums = List(1,2,3,4,5,6)
val chars = List('a', 'b', 'c', 'd', 'e', 'f')
scala> val rdd1 = sc.parallelize(nums)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:29
scala> rdd1.partitions.length
res0: Int = 4
scala> val rdd2 = sc.parallelize(chars)
rdd2: org.apache.spark.rdd.RDD[Char] = ParallelCollectionRDD[1] at parallelize at <console>:29
scala> rdd2.partitions.length
res1: Int = 4
scala> val rdd3 = sc.parallelize(nums, 1)
rdd3: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:29
scala> rdd3.partitions.length
res2: Int = 1
-----------------------------------------
RDD:
1. Immutable
2. Cacheable
3. Lazy Evaluation
4. Type Inference
RDD operations:
1. Transformations
old RDD => new RDD (lazy; builds a new RDD without running anything)
2. Actions
RDD => result (triggers execution; see the sketch below)
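A minimal sketch of the split, reusing the spark-shell's sc as above: the map is only recorded, and nothing runs until the reduce action is called.
val doubled = sc.parallelize(List(1, 2, 3)).map(_ * 2)  // transformation: lazy, just returns a new RDD
val total = doubled.reduce(_ + _)                       // action: runs the job and returns 12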
scala> val rdd4 = sc.parallelize(nums, 2)
rdd4: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:29
scala> rdd4.foreach(println)
4
5
1
2
3
6
def myFunc(index : Int, it : Iterator[Int]) : Iterator[String] = {
it.toList.map(data => "[index: " + index + " value: " + data + "]").iterator
}
rdd4.mapPartitionsWithIndex(myFunc).foreach(println)
[index: 0 value: 1]
[index: 0 value: 2]
[index: 0 value: 3]
[index: 1 value: 4]
[index: 1 value: 5]
[index: 1 value: 6]
def myFunc1(index : Int, it : Iterator[Int]) : Iterator[(Int, Int)] = {
it.toList.map(data => (index, data)).iterator
}
def myFunc1(index : Int, it : Iterator[Int]) : Iterator[(Int, Int)] = {
it.toList.map((index, _)).iterator
}
rdd4.mapPartitionsWithIndex(myFunc1).foreach(println)
def myFunc2(index : Int, it : Iterator[Int]) : Iterator[(Int, Int)] = {
it.toList.map((_ , index)).iterator
}
rdd4.mapPartitionsWithIndex(myFunc2).foreach(println)
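A hedged trace of the two variants on the same 1,2,3 / 4,5,6 partitioning (not captured REPL output):
// myFunc1 emits (partition index, value): (0,1) (0,2) (0,3) (1,4) (1,5) (1,6)
// myFunc2 emits (value, partition index): (1,0) (2,0) (3,0) (4,1) (5,1) (6,1)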
-----------------------------------------
val initial = 1
def seqOp(a : Int, b : Int) = {
if (a > b ) a else b
}
def comOp(a : Int, b : Int) = {
a + b
}
rdd4.aggregate(initial)(seqOp, comOp)
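Assuming rdd4 still holds 1,2,3 in partition 0 and 4,5,6 in partition 1, the result can be traced by hand:
// seqOp (max) within each partition, seeded with initial = 1:
//   partition 0: max(1, 1, 2, 3) = 3
//   partition 1: max(1, 4, 5, 6) = 6
// comOp (sum) across partition results, seeded with initial again:
//   1 + 3 + 6 = 10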
-----------------------------------------
def max(a : Int, b : Int) = {
if (a > b ) a else b
}
def sum(a : Int, b : Int) = {
a + b
}
def mul(a : Int, b : Int) = {
a * b
}
rdd4.aggregate(1)(max, sum)
rdd4.aggregate(1)(max , mul)
rdd4.aggregate(4)(max, sum)
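Hand-traced expectations for the three calls above, assuming the same 1,2,3 / 4,5,6 partitioning (not captured REPL output):
// aggregate(1)(max, sum) => 1 + 3 + 6 = 10
// aggregate(1)(max, mul) => 1 * 3 * 6 = 18
// aggregate(4)(max, sum) => 4 + max(4,1,2,3) + max(4,4,5,6) = 4 + 4 + 6 = 14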
-----------------------------------------
val nums = List(1,2,3,4,5,6)
def seqOp(a : Int, b : Int) = {
a + b
}
def comOp(a : Int, b : Int) = {
( a + b ) / 2
}
// aggregate returns a plain Int here (there is no (sum, count) tuple to unpack),
// and averaging partial results with (a + b) / 2 does not give a correct average;
// the tuple-based version below fixes both problems.
val avg = rdd4.aggregate(0)(seqOp, comOp)
-----------------------------------------
val nums = List(1,2,3,4,5,6)
def seqOp(t : (Int, Int), a : Int) : (Int, Int) = {
(t._1 + a, t._2 + 1)
}
def comOp(t1 : (Int, Int), t2 : (Int, Int)) : (Int, Int) = {
(t1._1 + t2._1, t1._2 + t2._2)
}
val avgtuple = nums.aggregate((0,0))(seqOp, comOp)
val avg = avgtuple._1 / avgtuple._2
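On the local List this yields avgtuple = (21, 6), so the integer division makes avg = 3 rather than the true mean 3.5.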
-----------------------------------------
val nums = List(1,2,3,4,5,6)
val rdd = sc.parallelize(nums,2)
def seqOp(t : (Int, Int), a : Int) : (Int, Int) = {
( t._1 + a, t._2 + 1)
}
def comOp(t1 : (Int, Int), t2: (Int, Int)) : (Int, Int) = {
( t1._1 + t2._1 , t1._2 + t2._2)
}
val zeroValue = (0, 0)
rdd.aggregate(zeroValue)(seqOp, comOp)
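For nums this returns (21, 6); a sketch of reading the average off the pair, with toDouble to avoid the integer-division truncation seen above:
val (sum, count) = rdd.aggregate(zeroValue)(seqOp, comOp)
val avg = sum.toDouble / count  // 3.5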
-----------------------------------------
val names = List("anil", "venkat", "raj", "kumar", "anvith")
---------------------------------
nCr = (n * (n-1) * (n-2) * ... * (n-r+1)) / (1 * 2 * ... * r)
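A direct Scala transcription of the formula (a sketch, with no validation that 0 <= r <= n):
def nCr(n: Int, r: Int): BigInt = {
  val numerator = (n - r + 1 to n).map(BigInt(_)).product    // n * (n-1) * ... * (n-r+1)
  val denominator = (1 to r).map(BigInt(_)).product          // 1 * 2 * ... * r
  numerator / denominator
}
// nCr(6, 2) == 15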
---------------------------------
val nums = List(1,2,3,4,5,6)
val names = List("anil", "venkat", "raj", "kumar", "anvith")
val zip1 = nums.zip(names)
val zip2 = names.zip(nums)
val rdd1 = sc.parallelize(nums,2)
val rdd2 = sc.parallelize(names,2)
// Note: Can only zip RDDs with same number of elements in each partition
val ziprdd1 = rdd1.zip(rdd2)
ziprdd1.foreach(println)
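Note that nums has six elements but names only five, so the two RDDs split as 3+3 and 2+3: the partition counts match but the element counts per partition do not, and the foreach above fails at runtime with the error quoted in the note. List.zip, by contrast, silently truncates to the shorter side. A sketch of a version that lines up:
val rdd1b = sc.parallelize(nums.take(5), 2)  // drop the 6th number so partition sizes match
rdd1b.zip(rdd2).foreach(println)             // (1,anil), (2,venkat), (3,raj), (4,kumar), (5,anvith)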
---------------------------------
val names = List("anil", "venkat", "raj", "kumar", "anvith", "rohit", "kalyan", "sam", "kabali", "raju")
val rdd4 = sc.parallelize(names,2)
val rdd5 = rdd4.map(x => (x.length, x))
def myfunc1(index: Int, it: Iterator[String]) : Iterator[String] = {
it.toList.map(x => "[index: " + index + " value: " + x + "]").iterator
}
def myfunc2(index: Int, it: Iterator[(Int, String)]) : Iterator[String] = {
it.toList.map(x => "[index: " + index + " value: " + x + "]").iterator
}
def myfunc3[T](index: Int, it: Iterator[T]) : Iterator[String] = {
it.toList.map(x => "[index: " + index + " value: " + x + "]").iterator
}
rdd2.mapPartitionsWithIndex(myfunc1)
rdd2.mapPartitionsWithIndex(myfunc3)
rdd5.mapPartitionsWithIndex(myfunc3)
scala> rdd5.mapPartitionsWithIndex(myfunc3).foreach(println)
[index: 1 value: (5,rohit)]
[index: 1 value: (6,kalyan)]
[index: 1 value: (3,sam)]
[index: 1 value: (6,kabali)]
[index: 1 value: (4,raju)]
[index: 0 value: (4,anil)]
[index: 0 value: (6,venkat)]
[index: 0 value: (3,raj)]
[index: 0 value: (5,kumar)]
[index: 0 value: (6,anvith)]
val zeroValue = "***"
def seqOp(a : String, b : String) : String = {
a + "&&&" + b
}
def comOp(a : String, b : String) : String = {
a + "???" + b
}
rdd5.aggregateByKey(zeroValue)(seqOp, comOp)
scala> rdd5.aggregateByKey(zeroValue)(seqOp, comOp).foreach(println)
(4,***&&&anil???***&&&raju)
(6,***&&&venkat&&&anvith???***&&&kalyan&&&kabali)
(3,***&&&raj???***&&&sam)
(5,***&&&kumar???***&&&rohit)
---------------------------------
def mergefunc(a : String, b: String) : String = {
a + "&&&" + b
}
def comfunc(a : String, b: String) : String = {
a + "???" + b
}
rdd5.combineByKey(_.toString, mergefunc, comfunc).foreach(println)
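A hedged trace of the expected output; it mirrors the aggregateByKey result above, minus the *** zero value:
// (4,anil???raju)
// (6,venkat&&&anvith???kalyan&&&kabali)
// (3,raj???sam)
// (5,kumar???rohit)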
---------------------------------
def mergefunc(a : List[String], b: String) : List[String] = {
b :: a
}
def comfunc(a : List[String], b: List[String]) : List[String] = {
a ++ b
}
rdd5.combineByKey(List(_), mergefunc, comfunc).foreach(println)
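Because mergefunc prepends with ::, values within a partition come out in reverse encounter order; a hedged trace of the expected result (assuming partition 0's combiner is merged first):
// (4,List(anil, raju))
// (6,List(anvith, venkat, kabali, kalyan))
// (3,List(raj, sam))
// (5,List(kumar, rohit))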
---------------------------------
val names = List("anil", "venkat", "raj", "kumar", "anvith", "rohit", "kalyan", "sam", "kabali", "raju")
val rdd6 = sc.parallelize(names,2)
rdd6.map(x => x).foreach(println)
anil
venkat
raj
kumar
anvith
rohit
kalyan
sam
kabali
raju
rdd6.flatMap(x => x).foreach(println)
a
r
o
h
i
t
k
a
l
y
a
n
s
a
m
k
a
b
a
l
i
r
a
j
u
n
i
l
v
e
n
k
a
t
r
a
j
k
u
m
a
r
a
n
v
i
t
h
---------------------------------
val lines = List("I am going", "to hyd", "I am learning", "spark course")
val rdd7 = sc.parallelize(lines,2)
rdd7.map(x => x).foreach(println)
I am going
to hyd
I am learning
spark course
rdd7.flatMap(x => x).foreach(println)
I
a
m
g
o
i
n
g
t
o
h
y
d
I
a
m
l
e
a
r
n
i
n
g
s
p
a
r
k
c
o
u
r
s
e
rdd7.flatMap(x => x.split(" ")).foreach(println)
I
am
going
to
hyd
I
am
learning
spark
course
rdd7.flatMap(x => x.reverse.split(" ")).foreach(println)
rdd7.flatMap(_.reverse.split(" ")).foreach(println)
def reverseFunc(x : String) : Array[String] = {
x.reverse.split(" ")
}
rdd7.flatMap(reverseFunc).foreach(println)
gniog
gninrael
ma
I
ma
dyh
ot
I
esruoc
kraps
---------------------------------
val names = List("anil", "venkat", "raj", "kumar", "anvith", "rohit", "kalyan", "sam", "kabali", "raju")
val rdd8 = sc.parallelize(names,2)
rdd8.filter(x => x.length > 4).foreach(println)
venkat
kumar
anvith
rohit
kalyan
kabali
rdd8.filterWith(index => index)((x, index) => x.length > 4).foreach(println)
venkat
kumar
anvith
rohit
kalyan
kabali
rdd8.filterWith(index => index)((x, index) => index % 2 == 0 ).foreach(println)
anil
venkat
raj
kumar
anvith
rdd8.filterWith(index => index)((x, index) => ( index % 2 == 0 && x.length > 4 )).foreach(println)
venkat
kumar
anvith
rdd8.filterWith(index => index + 1)((x, index) => ( index % 2 == 0 && x.length > 4 )).foreach(println)
rohit
kalyan
kabali
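filterWith is deprecated (and removed in later Spark releases); the same partition-aware filter can be sketched with mapPartitionsWithIndex, here reproducing the index % 2 == 0 && x.length > 4 case:
rdd8.mapPartitionsWithIndex((index, it) =>
  it.filter(x => index % 2 == 0 && x.length > 4)
).foreach(println)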
rdd8.foreach(println)
anil
venkat
raj
kumar
anvith
rohit
kalyan
sam
kabali
raju
rdd8.foreachPartition(it => println(it.toList.max))
sam
venkat
rdd8.take(2).foreach(println)
anil
venkat
rdd8.takeOrdered(2).foreach(println)
anil
anvith
---------------------------------
rdd8.sortBy(x => x).foreach(println)
rdd8.sortBy(x => x, false).foreach(println)
rdd8.sortBy(x => x, true, 1).foreach(println)
rdd8.sortBy(x => x, false, 1).foreach(println)
rdd8.sortBy(x => x.length, true, 1).foreach(println)
rdd8.sortBy(x => x.length, false, 1).foreach(println)
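In sortBy(keyFunc, ascending, numPartitions), the boolean flips the sort order and numPartitions = 1 collapses the result into a single partition, so foreach prints in fully sorted order instead of interleaving output from parallel tasks.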
---------------------------------
val names = List("anil", "venkat", "raj", "kumar", "anvith", "rohit", "kalyan", "sam", "kabali", "raju")
val rdd9 = sc.parallelize(names,2)
val rdd10 = rdd9.map(x => (x.length, x))
rdd10.foreach(println)
(5,rohit)
(6,kalyan)
(3,sam)
(6,kabali)
(4,raju)
(4,anil)
(6,venkat)
(3,raj)
(5,kumar)
(6,anvith)
rdd10.sortByKey(true, 1).foreach(println)
(3,raj)
(3,sam)
(4,anil)
(4,raju)
(5,kumar)
(5,rohit)
(6,venkat)
(6,anvith)
(6,kalyan)
(6,kabali)
rdd10.sortBy(x => x._1).foreach(println)
rdd10.sortBy(x => x._2).foreach(println)
rdd10.sortBy(x => x._2.length, true, 1).foreach(println)
val y = sc.parallelize(1 to 10, 10)
val z = y.coalesce(2, false)
z.mapPartitionsWithIndex(myfunc3).foreach(println)
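With shuffle = false, coalesce only merges existing partitions without moving data, so the ten one-element partitions collapse into two groups of consecutive values (a hedged trace):
// index 0 -> values 1, 2, 3, 4, 5
// index 1 -> values 6, 7, 8, 9, 10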