Installation Steps
1. Install Java, Python, R
sudo apt-get install openjdk-7-jdk
sudo apt-get install python
sudo apt-get install r-base
2. Install Hadoop-2.6.0, Hive-1.2.1
3. Install scala-2.11.7
Download the file from below link
copy the tar file into work dir & extract it
update the ~/.bashrc file with SCALA_HOME path
4. Install spark-1.6.0
Download the file from below link
copy the tar file into work dir & extract it
update the ~/.bashrc file with SPARK_HOME path
5. reopen the terminal to reflect the changes
Spark Command Line Examples
1. "scala" is the command to start the "scala"
"python" is the command to start the "python"
"R" is the command to start the "R"
Verify the all are installed or not
2. spark-shell, pyspark, sparkR, spark-sql these are the main commands to work with spark
execute "spark-shell" command, it will move to "scala" prompt
execute "pyspark" command, it will move to "python" prompt
execute "sparkR" command, it will move to "R" prompt
Examples on Spark using Scala
val nums = sc.parallelize(List(1,2,3,4,5,6), 2)
val chars = sc.parallelize(List("a","b","c","d","e","f"),2)
def myfunc(index: Int, iter: Iterator[(Int)]) : Iterator[String] = {
iter.toList.map(x => "[index:" + index + ", val: " + x + "]").iterator
def seqOp(a : Int,b : Int) = {math.max(a,b)}
def comOp(a : Int,b : Int) = {a * b}
nums.aggregate(5)(seqOp, comOp)
nums.aggregate(1)(math.max(_, _), _ + _)
reduce of partition 0 will be max(1, 1, 2, 3) = 3
reduce of partition 1 will be max(1, 4, 5, 6) = 6
final reduce across partitions will be 1 + 3 + 6 = 10
nums.aggregate(5)(math.max(_, _), _ + _)
reduce of partition 0 will be max(5, 1, 2, 3) = 5
reduce of partition 1 will be max(5, 4, 5, 6) = 6
final reduce across partitions will be 5 + 5 + 6 = 16
nums.aggregate(5)(math.max(_, _), _ * _)
reduce of partition 0 will be max(5, 1, 2, 3) = 5
reduce of partition 1 will be max(5, 4, 5, 6) = 6
final reduce across partitions will be 5 * 5 * 6 = 150
chars.aggregate("")(_ + _, _+_)
res: String = defabc
chars.aggregate("x")(_ + _, _+_)
res: String = xxabcxdef
val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)
def myfunc(index: Int, iter: Iterator[(String, Int)]) : Iterator[String] = {
iter.toList.map(x => "[index:" + index + ", val: " + x + "]").iterator
pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
res: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))
pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect
res: Array[(String, Int)] = Array((dog,100), (cat,200), (mouse,200))
val x = sc.parallelize(List(1,2,3,4,5))
val y = sc.parallelize(List(6,7,8,9,10))
res: Array[(Int, Int)] = Array((1,6), (1,7), (1,8), (1,9), (1,10), (2,6), (2,7), (2,8), (2,9), (2,10), (3,6), (3,7), (3,8), (3,9), (3,10), (4,6), (5,6), (4,7), (5,7), (4,8), (5,8), (4,9), (4,10), (5,9), (5,10))
val y = sc.parallelize(1 to 10, 10)
res: Int = 10
val z = y.coalesce(2, false)
res: Int = 2
val a = sc.parallelize(List(1, 2, 1, 3), 1)
val b = a.map((_, "b"))
val c = a.map((_, "c"))
val d = a.map((_, "d"))
res: Array[(Int, (Iterable[String], Iterable[String]))] = Array((1,(CompactBuffer(b, b),CompactBuffer(c, c))), (3,(CompactBuffer(b),CompactBuffer(c))), (2,(CompactBuffer(b),CompactBuffer(c))))
b.cogroup(c, d).collect
res: Array[(Int, (Iterable[String], Iterable[String], Iterable[String]))] = Array((1,(CompactBuffer(b, b),CompactBuffer(c, c),CompactBuffer(d, d))), (3,(CompactBuffer(b),CompactBuffer(c),CompactBuffer(d))), (2,(CompactBuffer(b),CompactBuffer(c),CompactBuffer(d))))
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
res: Array[String] = Array(Gnu, Cat, Rat, Dog, Gnu, Rat)
val a = sc.parallelize(List(1, 2, 1, 3), 1)
val b = a.zip(a)
res: Array[(Int, Int)] = Array((1,1), (2,2), (1,1), (3,3))
res: scala.collection.Map[Int,Int] = Map(2 -> 2, 1 -> 1, 3 -> 3)
val a = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
val b = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
val c = b.zip(a)
res: Array[(Int, String)] = Array((1,dog), (1,cat), (2,gnu), (2,salmon), (2,rabbit), (1,turkey), (2,wolf), (2,bear), (2,bee))
val d = c.combineByKey(List(_), (x:List[String], y:String) => y :: x, (x:List[String], y:List[String]) => x ::: y)
res: Array[(Int, List[String])] = Array((1,List(cat, dog, turkey)), (2,List(gnu, rabbit, salmon, bee, bear, wolf)))
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2)
res: Long = 4
val c = sc.parallelize(List((3, "Gnu"), (3, "Yak"), (5, "Mouse"), (3, "Dog")), 2)
res: scala.collection.Map[Int,Long] = Map(3 -> 3, 5 -> 1)
val b = sc.parallelize(List(1,2,3,4,5,6,7,8,2,4,2,1,1,1,1,1))
res: scala.collection.Map[Int,Long] = Map(5 -> 1, 1 -> 6, 6 -> 1, 2 -> 3, 7 -> 1, 3 -> 1, 8 -> 1, 4 -> 2)
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)
res: Array[String] = Array(Dog, Gnu, Cat, Rat)
val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10))
res: Int = 2
res: Int = 3
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2)
res: String = Gnu
val a = sc.parallelize(1 to 10, 3)
val b = a.filter(_ % 2 == 0)
res: Array[Int] = Array(2, 4, 6, 8, 10)
val randRDD = sc.parallelize(List( (2,"cat"), (6, "mouse"),(7, "cup"), (3, "book"), (4, "tv"), (1, "screen"), (5, "heater")), 3)
val sortedRDD = randRDD.sortByKey()
res: Array[(Int, String)] = Array((1,screen), (2,cat), (3,book), (4,tv), (5,heater), (6,mouse), (7,cup))
sortedRDD.filterByRange(1, 3).collect
res: Array[(Int, String)] = Array((1,screen), (2,cat), (3,book))
val a = sc.parallelize(1 to 9, 3)
val b = a.filterWith(i => i)((x,i) => x % 2 == 0 || i % 2 == 0)
res: Array[Int] = Array(1, 2, 3, 4, 6, 7, 8, 9)
val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 5)
a.filterWith(x=> x)((a, b) => b == 0).collect
res: Array[Int] = Array(1, 2)
a.filterWith(x=> x)((a, b) => a % (b+1) == 0).collect
res: Array[Int] = Array(1, 2, 4, 6, 8, 10)
a.filterWith(x=> x.toString)((a, b) => b == "2").collect
res: Array[Int] = Array(5, 6)
val a = sc.parallelize(1 to 10, 5)
a.flatMap(1 to _).collect
res: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
sc.parallelize(List(1, 2, 3), 2).flatMap(x => List(x, x, x)).collect
res: Array[Int] = Array(1, 1, 1, 2, 2, 2, 3, 3, 3)
// The program below generates a random number of copies (up to 10) of the items in the list.
val x = sc.parallelize(1 to 10, 3)
res: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10)
val c = sc.parallelize(List("cat", "dog", "tiger", "lion", "gnu", "crocodile", "ant", "whale", "dolphin", "spider"), 3)
c.foreach(x => println(x + "s are yummy"))
lions are yummy
gnus are yummy
crocodiles are yummy
ants are yummy
whales are yummy
dolphins are yummy
spiders are yummy
val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.keyBy(_.length)
res: Array[(Int, String)] = Array((3,dog), (6,salmon), (6,salmon), (3,rat), (8,elephant))
val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
val d = c.keyBy(_.length)
res: Array[(Int, String)] = Array((3,dog), (3,cat), (3,gnu), (6,salmon), (6,rabbit), (6,turkey), (4,wolf), (4,bear), (3,bee))
res: Array[(Int, (String, String))] = Array((6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), (6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), (3,(dog,dog)), (3,(dog,cat)), (3,(dog,gnu)), (3,(dog,bee)), (3,(rat,dog)), (3,(rat,cat)), (3,(rat,gnu)), (3,(rat,bee)))
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))
res: Array[Int] = Array(3, 5, 4, 3, 7, 5)
val y = sc.parallelize(10 to 30)
res: Int = 30
val a = sc.parallelize(List((10, "dog"), (3, "tiger"), (9, "lion"), (18, "cat")))
res: (Int, String) = (3,tiger)
val a = sc.parallelize(1 to 9, 3)
a.pipe("head -n 1").collect
res: Array[String] = Array(1, 4, 7)
val a = sc.parallelize(1 to 10, 3)
a.reduce(_ + _)
res: Int = 55
val a = sc.parallelize(1 to 10000, 3)
import org.apache.hadoop.io.compress.GzipCodec
a.saveAsTextFile("mydata_b", classOf[GzipCodec])
val x = sc.textFile("mydata_b")
res: Long = 10000
val x = sc.parallelize(List(1,2,3,4,5,6,6,7,9,8,10,21), 3)
val sp = sc.textFile("hdfs://localhost:8020/user/hadoop/sp_data")
sp.flatMap(_.split(" ")).saveAsTextFile("hdfs://localhost:8020/user/hadoop/sp_x")
val v = sc.parallelize(Array(("owl",3), ("gnu",4), ("dog",1), ("cat",2), ("ant",5)), 2)
val x = sc.parallelize(1 to 100, 3)
val y = sc.objectFile[Array[Int]]("objFile")
res: Array[Int] = Array(67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33)
val x = sc.parallelize(List(1.0, 2.0, 3.0, 5.0, 20.0, 19.02, 19.29, 11.09, 21.0), 2)
res: org.apache.spark.util.StatCounter = (count: 9, mean: 11.266667, stdev: 8.126859, max: 21.000000, min: 1.000000)
val y = sc.parallelize(Array(5, 7, 1, 3, 2, 1))
y.sortBy(c => c, true).collect
res: Array[Int] = Array(1, 1, 2, 3, 5, 7)
y.sortBy(c => c, false).collect
res: Array[Int] = Array(7, 5, 3, 2, 1, 1)
val z = sc.parallelize(Array(("H", 10), ("A", 26), ("Z", 1), ("L", 5)))
z.sortBy(c => c._1, true).collect
res: Array[(String, Int)] = Array((A,26), (H,10), (L,5), (Z,1))
z.sortBy(c => c._2, true).collect
res: Array[(String, Int)] = Array((Z,1), (L,5), (H,10), (A,26))
val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)
val b = sc.parallelize(1 to a.count.toInt, 2)
val c = a.zip(b)
res: Array[(String, Int)] = Array((ant,5), (cat,2), (dog,1), (gnu,4), (owl,3))
res: Array[(String, Int)] = Array((owl,3), (gnu,4), (dog,1), (cat,2), (ant,5))
val a = sc.parallelize(1 to 100, 5)
val b = a.cartesian(a)
val c = sc.parallelize(b.takeSample(true, 5, 13), 2)
val d = c.sortByKey(false)
res: Array[(Int, Int)] = Array((96,9), (84,76), (59,59), (53,65), (52,4))
val x = sc.parallelize(List(1.0, 2.0, 3.0, 5.0, 20.0, 19.02, 19.29, 11.09, 21.0), 2)
res: Double = 101.39999999999999
val b = sc.parallelize(List("dog", "cat", "ape", "salmon", "gnu"), 2)
res: Array[String] = Array(dog, cat)
val b = sc.parallelize(1 to 10000, 5000)
res: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100)
val b = sc.parallelize(List("dog", "cat", "ape", "salmon", "gnu"), 2)
res: Array[String] = Array(ape, cat)
val a = sc.parallelize(1 to 9, 3)
val b = sc.parallelize(1 to 3, 3)
val c = a.subtract(b)
res156: String =
(3) MapPartitionsRDD[146] at subtract at <console>:26 []
| SubtractedRDD[145] at subtract at <console>:26 []
+-(3) MapPartitionsRDD[143] at subtract at <console>:26 []
| | ParallelCollectionRDD[141] at parallelize at <console>:22 []
+-(3) MapPartitionsRDD[144] at subtract at <console>:26 []
| ParallelCollectionRDD[142] at parallelize at <console>:22 []
val c = sc.parallelize(Array(6, 9, 4, 7, 5, 8), 2)
res: Array[Int] = Array(9, 8)
val a = sc.parallelize(1 to 3, 1)
val b = sc.parallelize(5 to 7, 1)
(a ++ b).collect
res: Array[Int] = Array(1, 2, 3, 5, 6, 7)
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))
res: Array[String] = Array(dog, tiger, lion, cat, panther, eagle)
val a = sc.parallelize(List(9.1, 1.0, 1.2, 2.1, 1.3, 5.0, 2.0, 2.1, 7.4, 7.5, 7.6, 8.8, 10.0, 8.9, 5.5), 3)
res: Double = 10.605333333333332
val x = sc.parallelize(List(1.0, 2.0, 3.0, 5.0, 20.0, 19.02, 19.29, 11.09, 21.0), 2)
res: Double = 66.04584444444443
res: Double = 74.30157499999999
