------------------------------------------------------------
Installation Steps
------------------------------------------------------------
1. Install Java, Python, R
sudo apt-get install openjdk-7-jdk
sudo apt-get install python
sudo apt-get install r-base
2. Install Hadoop-2.6.0, Hive-1.2.1
3. Install scala-2.11.7
Download the file from below link
http://www.scala-lang.org/files/archive/scala-2.11.7.tgz
copy the tar file into work dir & extract it
update the ~/.bashrc file with SCALA_HOME path
4. Install spark-1.6.0
Download the file from below link
https://spark.apache.org/downloads.html
copy the tar file into work dir & extract it
update the ~/.bashrc file with SPARK_HOME path
5. reopen the terminal to reflect the changes
------------------------------------------------------------
Spark Command Line Examples
------------------------------------------------------------
1. "scala" is the command to start the "scala"
"python" is the command to start the "python"
"R" is the command to start the "R"
Verify the all are installed or not
2. spark-shell, pyspark, sparkR, spark-sql these are the main commands to work with spark
execute "spark-shell" command, it will move to "scala" prompt
execute "pyspark" command, it will move to "python" prompt
execute "sparkR" command, it will move to "R" prompt
==========================================================
Examples on Spark using Scala
==========================================================
val nums = sc.parallelize(List(1,2,3,4,5,6), 2)
val chars = sc.parallelize(List("a","b","c","d","e","f"),2)
nums.foreach(println)
chars.foreach(println)
def myfunc(index: Int, iter: Iterator[(Int)]) : Iterator[String] = {
iter.toList.map(x => "[index:" + index + ", val: " + x + "]").iterator
}
nums.mapPartitionsWithIndex(myfunc).collect
chars.mapPartitionsWithIndex(myfunc).collect
------------------------------------------------
def seqOp(a : Int,b : Int) = {math.max(a,b)}
def comOp(a : Int,b : Int) = {a * b}
nums.aggregate(5)(seqOp, comOp)
nums.aggregate(1)(math.max(_, _), _ + _)
reduce of partition 0 will be max(1, 1, 2, 3) = 3
reduce of partition 1 will be max(1, 4, 5, 6) = 6
final reduce across partitions will be 1 + 3 + 6 = 10
nums.aggregate(5)(math.max(_, _), _ + _)
reduce of partition 0 will be max(5, 1, 2, 3) = 5
reduce of partition 1 will be max(5, 4, 5, 6) = 6
final reduce across partitions will be 5 + 5 + 6 = 16
nums.aggregate(5)(math.max(_, _), _ * _)
reduce of partition 0 will be max(5, 1, 2, 3) = 5
reduce of partition 1 will be max(5, 4, 5, 6) = 6
final reduce across partitions will be 5 * 5 * 6 = 150
chars.aggregate("")(_ + _, _+_)
res: String = defabc
chars.aggregate("x")(_ + _, _+_)
res: String = xxabcxdef
------------------------------------------------
val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
def myfunc(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {
iter.toList.map(x => "[index:" + index + ", val: " + x + "]").iterator
}
pairRDD.mapPartitionsWithIndex(myfunc).collect
pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
res: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))
pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect
res: Array[(String, Int)] = Array((dog,100), (cat,200), (mouse,200))
------------------------------------------------
val x = sc.parallelize(List(1,2,3,4,5))
val y = sc.parallelize(List(6,7,8,9,10))
x.cartesian(y).collect
res: Array[(Int, Int)] = Array((1,6), (1,7), (1,8), (1,9), (1,10), (2,6), (2,7), (2,8), (2,9), (2,10), (3,6), (3,7), (3,8), (3,9), (3,10), (4,6), (5,6), (4,7), (5,7), (4,8), (5,8), (4,9), (4,10), (5,9), (5,10))
------------------------------------------------
val y = sc.parallelize(1 to 10, 10)
y.partitions.length
res: Int = 10
val z = y.coalesce(2, false)
z.partitions.length
res: Int = 2
------------------------------------------------
val a = sc.parallelize(List(1, 2, 1, 3), 1)
val b = a.map((_, "b"))
val c = a.map((_, "c"))
val d = a.map((_, "d"))
b.cogroup(c).collect
res: Array[(Int, (Iterable[String], Iterable[String]))] = Array((1,(CompactBuffer(b, b),CompactBuffer(c, c))), (3,(CompactBuffer(b),CompactBuffer(c))), (2,(CompactBuffer(b),CompactBuffer(c))))
b.cogroup(c, d).collect
res: Array[(Int, (Iterable[String], Iterable[String], Iterable[String]))] = Array((1,(CompactBuffer(b, b),CompactBuffer(c, c),CompactBuffer(d, d))), (3,(CompactBuffer(b),CompactBuffer(c),CompactBuffer(d))), (2,(CompactBuffer(b),CompactBuffer(c),CompactBuffer(d))))
------------------------------------------------
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
c.collect
res: Array[String] = Array(Gnu, Cat, Rat, Dog, Gnu, Rat)
------------------------------------------------
val a = sc.parallelize(List(1, 2, 1, 3), 1)
val b = a.zip(a)
b.collect
res: Array[(Int, Int)] = Array((1,1), (2,2), (1,1), (3,3))
b.collectAsMap
res: scala.collection.Map[Int,Int] = Map(2 -> 2, 1 -> 1, 3 -> 3)
------------------------------------------------
val a = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
val b = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
val c = b.zip(a)
c.collect
res: Array[(Int, String)] = Array((1,dog), (1,cat), (2,gnu), (2,salmon), (2,rabbit), (1,turkey), (2,wolf), (2,bear), (2,bee))
val d = c.combineByKey(List(_), (x:List[String], y:String) => y :: x, (x:List[String], y:List[String]) => x ::: y)
d.collect
res: Array[(Int, List[String])] = Array((1,List(cat, dog, turkey)), (2,List(gnu, rabbit, salmon, bee, bear, wolf)))
------------------------------------------------
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2)
c.count
res: Long = 4
------------------------------------------------
val c = sc.parallelize(List((3, "Gnu"), (3, "Yak"), (5, "Mouse"), (3, "Dog")), 2)
c.countByKey
res: scala.collection.Map[Int,Long] = Map(3 -> 3, 5 -> 1)
------------------------------------------------
val b = sc.parallelize(List(1,2,3,4,5,6,7,8,2,4,2,1,1,1,1,1))
b.countByValue
res: scala.collection.Map[Int,Long] = Map(5 -> 1, 1 -> 6, 6 -> 1, 2 -> 3, 7 -> 1, 3 -> 1, 8 -> 1, 4 -> 2)
------------------------------------------------
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
c.distinct.collect
res: Array[String] = Array(Dog, Gnu, Cat, Rat)
val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10))
a.distinct(2).partitions.length
res: Int = 2
a.distinct(3).partitions.length
res: Int = 3
------------------------------------------------
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2)
c.first
res: String = Gnu
------------------------------------------------
val a = sc.parallelize(1 to 10, 3)
val b = a.filter(_ % 2 == 0)
b.collect
res: Array[Int] = Array(2, 4, 6, 8, 10)
------------------------------------------------
val randRDD = sc.parallelize(List( (2,"cat"), (6, "mouse"),(7, "cup"), (3, "book"), (4, "tv"), (1, "screen"), (5, "heater")), 3)
val sortedRDD = randRDD.sortByKey()
sortedRDD.collect
res: Array[(Int, String)] = Array((1,screen), (2,cat), (3,book), (4,tv), (5,heater), (6,mouse), (7,cup))
sortedRDD.filterByRange(1, 3).collect
res: Array[(Int, String)] = Array((1,screen), (2,cat), (3,book))
------------------------------------------------
val a = sc.parallelize(1 to 9, 3)
val b = a.filterWith(i => i)((x,i) => x % 2 == 0 || i % 2 == 0)
b.collect
res: Array[Int] = Array(1, 2, 3, 4, 6, 7, 8, 9)
val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 5)
a.filterWith(x=> x)((a, b) => b == 0).collect
res: Array[Int] = Array(1, 2)
a.filterWith(x=> x)((a, b) => a % (b+1) == 0).collect
res: Array[Int] = Array(1, 2, 4, 6, 8, 10)
a.filterWith(x=> x.toString)((a, b) => b == "2").collect
res: Array[Int] = Array(5, 6)
------------------------------------------------
val a = sc.parallelize(1 to 10, 5)
a.flatMap(1 to _).collect
res: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
sc.parallelize(List(1, 2, 3), 2).flatMap(x => List(x, x, x)).collect
res: Array[Int] = Array(1, 1, 1, 2, 2, 2, 3, 3, 3)
// The program below generates a random number of copies (up to 10) of the items in the list.
val x = sc.parallelize(1 to 10, 3)
x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect
res: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10)
------------------------------------------------
val c = sc.parallelize(List("cat", "dog", "tiger", "lion", "gnu", "crocodile", "ant", "whale", "dolphin", "spider"), 3)
c.foreach(x => println(x + "s are yummy"))
lions are yummy
gnus are yummy
crocodiles are yummy
ants are yummy
whales are yummy
dolphins are yummy
spiders are yummy
------------------------------------------------
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.keyBy(_.length)
b.collect
res: Array[(Int, String)] = Array((3,dog), (6,salmon), (6,salmon), (3,rat), (8,elephant))
val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
val d = c.keyBy(_.length)
d.collect
res: Array[(Int, String)] = Array((3,dog), (3,cat), (3,gnu), (6,salmon), (6,rabbit), (6,turkey), (4,wolf), (4,bear), (3,bee))
b.join(d).collect
res: Array[(Int, (String, String))] = Array((6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), (6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), (3,(dog,dog)), (3,(dog,cat)), (3,(dog,gnu)), (3,(dog,bee)), (3,(rat,dog)), (3,(rat,cat)), (3,(rat,gnu)), (3,(rat,bee)))
------------------------------------------------
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))
b.keys.collect
res: Array[Int] = Array(3, 5, 4, 3, 7, 5)
------------------------------------------------
val y = sc.parallelize(10 to 30)
y.max
res: Int = 30
val a = sc.parallelize(List((10, "dog"), (3, "tiger"), (9, "lion"), (18, "cat")))
a.min
res: (Int, String) = (3,tiger)
------------------------------------------------
val a = sc.parallelize(1 to 9, 3)
a.pipe("head -n 1").collect
res: Array[String] = Array(1, 4, 7)
------------------------------------------------
val a = sc.parallelize(1 to 10, 3)
a.reduce(_ + _)
res: Int = 55
------------------------------------------------
val a = sc.parallelize(1 to 10000, 3)
a.saveAsTextFile("mydata_a")
import org.apache.hadoop.io.compress.GzipCodec
a.saveAsTextFile("mydata_b", classOf[GzipCodec])
val x = sc.textFile("mydata_b")
x.count
res: Long = 10000
val x = sc.parallelize(List(1,2,3,4,5,6,6,7,9,8,10,21), 3)
x.saveAsTextFile("hdfs://localhost:8020/user/hadoop/test");
val sp = sc.textFile("hdfs://localhost:8020/user/hadoop/sp_data")
sp.flatMap(_.split(" ")).saveAsTextFile("hdfs://localhost:8020/user/hadoop/sp_x")
val v = sc.parallelize(Array(("owl",3), ("gnu",4), ("dog",1), ("cat",2), ("ant",5)), 2)
v.saveAsSequenceFile("hd_seq_file")
val x = sc.parallelize(1 to 100, 3)
x.saveAsObjectFile("objFile")
val y = sc.objectFile[Array[Int]]("objFile")
y.collect
res: Array[Int] = Array(67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33)
------------------------------------------------
val x = sc.parallelize(List(1.0, 2.0, 3.0, 5.0, 20.0, 19.02, 19.29, 11.09, 21.0), 2)
x.stats
res: org.apache.spark.util.StatCounter = (count: 9, mean: 11.266667, stdev: 8.126859, max: 21.000000, min: 1.000000)
------------------------------------------------
val y = sc.parallelize(Array(5, 7, 1, 3, 2, 1))
y.sortBy(c => c, true).collect
res: Array[Int] = Array(1, 1, 2, 3, 5, 7)
y.sortBy(c => c, false).collect
res: Array[Int] = Array(7, 5, 3, 2, 1, 1)
val z = sc.parallelize(Array(("H", 10), ("A", 26), ("Z", 1), ("L", 5)))
z.sortBy(c => c._1, true).collect
res: Array[(String, Int)] = Array((A,26), (H,10), (L,5), (Z,1))
z.sortBy(c => c._2, true).collect
res: Array[(String, Int)] = Array((Z,1), (L,5), (H,10), (A,26))
------------------------------------------------
val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)
val b = sc.parallelize(1 to a.count.toInt, 2)
val c = a.zip(b)
c.sortByKey(true).collect
res: Array[(String, Int)] = Array((ant,5), (cat,2), (dog,1), (gnu,4), (owl,3))
c.sortByKey(false).collect
res: Array[(String, Int)] = Array((owl,3), (gnu,4), (dog,1), (cat,2), (ant,5))
val a = sc.parallelize(1 to 100, 5)
val b = a.cartesian(a)
val c = sc.parallelize(b.takeSample(true, 5, 13), 2)
val d = c.sortByKey(false)
res: Array[(Int, Int)] = Array((96,9), (84,76), (59,59), (53,65), (52,4))
------------------------------------------------
val x = sc.parallelize(List(1.0, 2.0, 3.0, 5.0, 20.0, 19.02, 19.29, 11.09, 21.0), 2)
x.sum
res: Double = 101.39999999999999
------------------------------------------------
val b = sc.parallelize(List("dog", "cat", "ape", "salmon", "gnu"), 2)
b.take(2)
res: Array[String] = Array(dog, cat)
val b = sc.parallelize(1 to 10000, 5000)
b.take(100)
res: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100)
------------------------------------------------
val b = sc.parallelize(List("dog", "cat", "ape", "salmon", "gnu"), 2)
b.takeOrdered(2)
res: Array[String] = Array(ape, cat)
------------------------------------------------
val a = sc.parallelize(1 to 9, 3)
val b = sc.parallelize(1 to 3, 3)
val c = a.subtract(b)
c.toDebugString
res156: String =
(3) MapPartitionsRDD[146] at subtract at <console>:26 []
| SubtractedRDD[145] at subtract at <console>:26 []
+-(3) MapPartitionsRDD[143] at subtract at <console>:26 []
| | ParallelCollectionRDD[141] at parallelize at <console>:22 []
+-(3) MapPartitionsRDD[144] at subtract at <console>:26 []
| ParallelCollectionRDD[142] at parallelize at <console>:22 []
------------------------------------------------
val c = sc.parallelize(Array(6, 9, 4, 7, 5, 8), 2)
c.top(2)
res: Array[Int] = Array(9, 8)
------------------------------------------------
val a = sc.parallelize(1 to 3, 1)
val b = sc.parallelize(5 to 7, 1)
(a ++ b).collect
res: Array[Int] = Array(1, 2, 3, 5, 6, 7)
------------------------------------------------
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))
b.values.collect
res: Array[String] = Array(dog, tiger, lion, cat, panther, eagle)
------------------------------------------------
val a = sc.parallelize(List(9.1, 1.0, 1.2, 2.1, 1.3, 5.0, 2.0, 2.1, 7.4, 7.5, 7.6, 8.8, 10.0, 8.9, 5.5), 3)
a.variance
res: Double = 10.605333333333332
val x = sc.parallelize(List(1.0, 2.0, 3.0, 5.0, 20.0, 19.02, 19.29, 11.09, 21.0), 2)
x.variance
res: Double = 66.04584444444443
x.sampleVariance
res: Double = 74.30157499999999
Installation Steps
------------------------------------------------------------
1. Install Java, Python, R
sudo apt-get install openjdk-7-jdk
sudo apt-get install python
sudo apt-get install r-base
2. Install Hadoop-2.6.0, Hive-1.2.1
3. Install scala-2.11.7
Download the file from below link
http://www.scala-lang.org/files/archive/scala-2.11.7.tgz
copy the tar file into work dir & extract it
update the ~/.bashrc file with SCALA_HOME path
4. Install spark-1.6.0
Download the file from below link
https://spark.apache.org/downloads.html
copy the tar file into work dir & extract it
update the ~/.bashrc file with SPARK_HOME path
5. reopen the terminal to reflect the changes
------------------------------------------------------------
Spark Command Line Examples
------------------------------------------------------------
1. "scala" is the command to start the "scala"
"python" is the command to start the "python"
"R" is the command to start the "R"
Verify the all are installed or not
2. spark-shell, pyspark, sparkR, spark-sql these are the main commands to work with spark
execute "spark-shell" command, it will move to "scala" prompt
execute "pyspark" command, it will move to "python" prompt
execute "sparkR" command, it will move to "R" prompt
==========================================================
Examples on Spark using Scala
==========================================================
val nums = sc.parallelize(List(1,2,3,4,5,6), 2)
val chars = sc.parallelize(List("a","b","c","d","e","f"),2)
nums.foreach(println)
chars.foreach(println)
def myfunc(index: Int, iter: Iterator[(Int)]) : Iterator[String] = {
iter.toList.map(x => "[index:" + index + ", val: " + x + "]").iterator
}
nums.mapPartitionsWithIndex(myfunc).collect
chars.mapPartitionsWithIndex(myfunc).collect
------------------------------------------------
def seqOp(a : Int,b : Int) = {math.max(a,b)}
def comOp(a : Int,b : Int) = {a * b}
nums.aggregate(5)(seqOp, comOp)
nums.aggregate(1)(math.max(_, _), _ + _)
reduce of partition 0 will be max(1, 1, 2, 3) = 3
reduce of partition 1 will be max(1, 4, 5, 6) = 6
final reduce across partitions will be 1 + 3 + 6 = 10
nums.aggregate(5)(math.max(_, _), _ + _)
reduce of partition 0 will be max(5, 1, 2, 3) = 5
reduce of partition 1 will be max(5, 4, 5, 6) = 6
final reduce across partitions will be 5 + 5 + 6 = 16
nums.aggregate(5)(math.max(_, _), _ * _)
reduce of partition 0 will be max(5, 1, 2, 3) = 5
reduce of partition 1 will be max(5, 4, 5, 6) = 6
final reduce across partitions will be 5 * 5 * 6 = 150
chars.aggregate("")(_ + _, _+_)
res: String = defabc
chars.aggregate("x")(_ + _, _+_)
res: String = xxabcxdef
------------------------------------------------
val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
def myfunc(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {
iter.toList.map(x => "[index:" + index + ", val: " + x + "]").iterator
}
pairRDD.mapPartitionsWithIndex(myfunc).collect
pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
res: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))
pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect
res: Array[(String, Int)] = Array((dog,100), (cat,200), (mouse,200))
------------------------------------------------
val x = sc.parallelize(List(1,2,3,4,5))
val y = sc.parallelize(List(6,7,8,9,10))
x.cartesian(y).collect
res: Array[(Int, Int)] = Array((1,6), (1,7), (1,8), (1,9), (1,10), (2,6), (2,7), (2,8), (2,9), (2,10), (3,6), (3,7), (3,8), (3,9), (3,10), (4,6), (5,6), (4,7), (5,7), (4,8), (5,8), (4,9), (4,10), (5,9), (5,10))
------------------------------------------------
val y = sc.parallelize(1 to 10, 10)
y.partitions.length
res: Int = 10
val z = y.coalesce(2, false)
z.partitions.length
res: Int = 2
------------------------------------------------
val a = sc.parallelize(List(1, 2, 1, 3), 1)
val b = a.map((_, "b"))
val c = a.map((_, "c"))
val d = a.map((_, "d"))
b.cogroup(c).collect
res: Array[(Int, (Iterable[String], Iterable[String]))] = Array((1,(CompactBuffer(b, b),CompactBuffer(c, c))), (3,(CompactBuffer(b),CompactBuffer(c))), (2,(CompactBuffer(b),CompactBuffer(c))))
b.cogroup(c, d).collect
res: Array[(Int, (Iterable[String], Iterable[String], Iterable[String]))] = Array((1,(CompactBuffer(b, b),CompactBuffer(c, c),CompactBuffer(d, d))), (3,(CompactBuffer(b),CompactBuffer(c),CompactBuffer(d))), (2,(CompactBuffer(b),CompactBuffer(c),CompactBuffer(d))))
------------------------------------------------
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
c.collect
res: Array[String] = Array(Gnu, Cat, Rat, Dog, Gnu, Rat)
------------------------------------------------
val a = sc.parallelize(List(1, 2, 1, 3), 1)
val b = a.zip(a)
b.collect
res: Array[(Int, Int)] = Array((1,1), (2,2), (1,1), (3,3))
b.collectAsMap
res: scala.collection.Map[Int,Int] = Map(2 -> 2, 1 -> 1, 3 -> 3)
------------------------------------------------
val a = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
val b = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
val c = b.zip(a)
c.collect
res: Array[(Int, String)] = Array((1,dog), (1,cat), (2,gnu), (2,salmon), (2,rabbit), (1,turkey), (2,wolf), (2,bear), (2,bee))
val d = c.combineByKey(List(_), (x:List[String], y:String) => y :: x, (x:List[String], y:List[String]) => x ::: y)
d.collect
res: Array[(Int, List[String])] = Array((1,List(cat, dog, turkey)), (2,List(gnu, rabbit, salmon, bee, bear, wolf)))
------------------------------------------------
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2)
c.count
res: Long = 4
------------------------------------------------
val c = sc.parallelize(List((3, "Gnu"), (3, "Yak"), (5, "Mouse"), (3, "Dog")), 2)
c.countByKey
res: scala.collection.Map[Int,Long] = Map(3 -> 3, 5 -> 1)
------------------------------------------------
val b = sc.parallelize(List(1,2,3,4,5,6,7,8,2,4,2,1,1,1,1,1))
b.countByValue
res: scala.collection.Map[Int,Long] = Map(5 -> 1, 1 -> 6, 6 -> 1, 2 -> 3, 7 -> 1, 3 -> 1, 8 -> 1, 4 -> 2)
------------------------------------------------
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
c.distinct.collect
res: Array[String] = Array(Dog, Gnu, Cat, Rat)
val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10))
a.distinct(2).partitions.length
res: Int = 2
a.distinct(3).partitions.length
res: Int = 3
------------------------------------------------
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2)
c.first
res: String = Gnu
------------------------------------------------
val a = sc.parallelize(1 to 10, 3)
val b = a.filter(_ % 2 == 0)
b.collect
res: Array[Int] = Array(2, 4, 6, 8, 10)
------------------------------------------------
val randRDD = sc.parallelize(List( (2,"cat"), (6, "mouse"),(7, "cup"), (3, "book"), (4, "tv"), (1, "screen"), (5, "heater")), 3)
val sortedRDD = randRDD.sortByKey()
sortedRDD.collect
res: Array[(Int, String)] = Array((1,screen), (2,cat), (3,book), (4,tv), (5,heater), (6,mouse), (7,cup))
sortedRDD.filterByRange(1, 3).collect
res: Array[(Int, String)] = Array((1,screen), (2,cat), (3,book))
------------------------------------------------
val a = sc.parallelize(1 to 9, 3)
val b = a.filterWith(i => i)((x,i) => x % 2 == 0 || i % 2 == 0)
b.collect
res: Array[Int] = Array(1, 2, 3, 4, 6, 7, 8, 9)
val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 5)
a.filterWith(x=> x)((a, b) => b == 0).collect
res: Array[Int] = Array(1, 2)
a.filterWith(x=> x)((a, b) => a % (b+1) == 0).collect
res: Array[Int] = Array(1, 2, 4, 6, 8, 10)
a.filterWith(x=> x.toString)((a, b) => b == "2").collect
res: Array[Int] = Array(5, 6)
------------------------------------------------
val a = sc.parallelize(1 to 10, 5)
a.flatMap(1 to _).collect
res: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
sc.parallelize(List(1, 2, 3), 2).flatMap(x => List(x, x, x)).collect
res: Array[Int] = Array(1, 1, 1, 2, 2, 2, 3, 3, 3)
// The program below generates a random number of copies (up to 10) of the items in the list.
val x = sc.parallelize(1 to 10, 3)
x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect
res: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10)
------------------------------------------------
val c = sc.parallelize(List("cat", "dog", "tiger", "lion", "gnu", "crocodile", "ant", "whale", "dolphin", "spider"), 3)
c.foreach(x => println(x + "s are yummy"))
lions are yummy
gnus are yummy
crocodiles are yummy
ants are yummy
whales are yummy
dolphins are yummy
spiders are yummy
------------------------------------------------
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.keyBy(_.length)
b.collect
res: Array[(Int, String)] = Array((3,dog), (6,salmon), (6,salmon), (3,rat), (8,elephant))
val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
val d = c.keyBy(_.length)
d.collect
res: Array[(Int, String)] = Array((3,dog), (3,cat), (3,gnu), (6,salmon), (6,rabbit), (6,turkey), (4,wolf), (4,bear), (3,bee))
b.join(d).collect
res: Array[(Int, (String, String))] = Array((6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), (6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), (3,(dog,dog)), (3,(dog,cat)), (3,(dog,gnu)), (3,(dog,bee)), (3,(rat,dog)), (3,(rat,cat)), (3,(rat,gnu)), (3,(rat,bee)))
------------------------------------------------
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))
b.keys.collect
res: Array[Int] = Array(3, 5, 4, 3, 7, 5)
------------------------------------------------
val y = sc.parallelize(10 to 30)
y.max
res: Int = 30
val a = sc.parallelize(List((10, "dog"), (3, "tiger"), (9, "lion"), (18, "cat")))
a.min
res: (Int, String) = (3,tiger)
------------------------------------------------
val a = sc.parallelize(1 to 9, 3)
a.pipe("head -n 1").collect
res: Array[String] = Array(1, 4, 7)
------------------------------------------------
val a = sc.parallelize(1 to 10, 3)
a.reduce(_ + _)
res: Int = 55
------------------------------------------------
val a = sc.parallelize(1 to 10000, 3)
a.saveAsTextFile("mydata_a")
import org.apache.hadoop.io.compress.GzipCodec
a.saveAsTextFile("mydata_b", classOf[GzipCodec])
val x = sc.textFile("mydata_b")
x.count
res: Long = 10000
val x = sc.parallelize(List(1,2,3,4,5,6,6,7,9,8,10,21), 3)
x.saveAsTextFile("hdfs://localhost:8020/user/hadoop/test");
val sp = sc.textFile("hdfs://localhost:8020/user/hadoop/sp_data")
sp.flatMap(_.split(" ")).saveAsTextFile("hdfs://localhost:8020/user/hadoop/sp_x")
val v = sc.parallelize(Array(("owl",3), ("gnu",4), ("dog",1), ("cat",2), ("ant",5)), 2)
v.saveAsSequenceFile("hd_seq_file")
val x = sc.parallelize(1 to 100, 3)
x.saveAsObjectFile("objFile")
val y = sc.objectFile[Array[Int]]("objFile")
y.collect
res: Array[Int] = Array(67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33)
------------------------------------------------
val x = sc.parallelize(List(1.0, 2.0, 3.0, 5.0, 20.0, 19.02, 19.29, 11.09, 21.0), 2)
x.stats
res: org.apache.spark.util.StatCounter = (count: 9, mean: 11.266667, stdev: 8.126859, max: 21.000000, min: 1.000000)
------------------------------------------------
val y = sc.parallelize(Array(5, 7, 1, 3, 2, 1))
y.sortBy(c => c, true).collect
res: Array[Int] = Array(1, 1, 2, 3, 5, 7)
y.sortBy(c => c, false).collect
res: Array[Int] = Array(7, 5, 3, 2, 1, 1)
val z = sc.parallelize(Array(("H", 10), ("A", 26), ("Z", 1), ("L", 5)))
z.sortBy(c => c._1, true).collect
res: Array[(String, Int)] = Array((A,26), (H,10), (L,5), (Z,1))
z.sortBy(c => c._2, true).collect
res: Array[(String, Int)] = Array((Z,1), (L,5), (H,10), (A,26))
------------------------------------------------
val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)
val b = sc.parallelize(1 to a.count.toInt, 2)
val c = a.zip(b)
c.sortByKey(true).collect
res: Array[(String, Int)] = Array((ant,5), (cat,2), (dog,1), (gnu,4), (owl,3))
c.sortByKey(false).collect
res: Array[(String, Int)] = Array((owl,3), (gnu,4), (dog,1), (cat,2), (ant,5))
val a = sc.parallelize(1 to 100, 5)
val b = a.cartesian(a)
val c = sc.parallelize(b.takeSample(true, 5, 13), 2)
val d = c.sortByKey(false)
res: Array[(Int, Int)] = Array((96,9), (84,76), (59,59), (53,65), (52,4))
------------------------------------------------
val x = sc.parallelize(List(1.0, 2.0, 3.0, 5.0, 20.0, 19.02, 19.29, 11.09, 21.0), 2)
x.sum
res: Double = 101.39999999999999
------------------------------------------------
val b = sc.parallelize(List("dog", "cat", "ape", "salmon", "gnu"), 2)
b.take(2)
res: Array[String] = Array(dog, cat)
val b = sc.parallelize(1 to 10000, 5000)
b.take(100)
res: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100)
------------------------------------------------
val b = sc.parallelize(List("dog", "cat", "ape", "salmon", "gnu"), 2)
b.takeOrdered(2)
res: Array[String] = Array(ape, cat)
------------------------------------------------
val a = sc.parallelize(1 to 9, 3)
val b = sc.parallelize(1 to 3, 3)
val c = a.subtract(b)
c.toDebugString
res156: String =
(3) MapPartitionsRDD[146] at subtract at <console>:26 []
| SubtractedRDD[145] at subtract at <console>:26 []
+-(3) MapPartitionsRDD[143] at subtract at <console>:26 []
| | ParallelCollectionRDD[141] at parallelize at <console>:22 []
+-(3) MapPartitionsRDD[144] at subtract at <console>:26 []
| ParallelCollectionRDD[142] at parallelize at <console>:22 []
------------------------------------------------
val c = sc.parallelize(Array(6, 9, 4, 7, 5, 8), 2)
c.top(2)
res: Array[Int] = Array(9, 8)
------------------------------------------------
val a = sc.parallelize(1 to 3, 1)
val b = sc.parallelize(5 to 7, 1)
(a ++ b).collect
res: Array[Int] = Array(1, 2, 3, 5, 6, 7)
------------------------------------------------
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))
b.values.collect
res: Array[String] = Array(dog, tiger, lion, cat, panther, eagle)
------------------------------------------------
val a = sc.parallelize(List(9.1, 1.0, 1.2, 2.1, 1.3, 5.0, 2.0, 2.1, 7.4, 7.5, 7.6, 8.8, 10.0, 8.9, 5.5), 3)
a.variance
res: Double = 10.605333333333332
val x = sc.parallelize(List(1.0, 2.0, 3.0, 5.0, 20.0, 19.02, 19.29, 11.09, 21.0), 2)
x.variance
res: Double = 66.04584444444443
x.sampleVariance
res: Double = 74.30157499999999
No comments:
Post a Comment