Monday 23 September 2019

spark_command_line_examples

------------------------------------------------------------
Installation Steps
------------------------------------------------------------

1. Install Java, Python, R
sudo apt-get install openjdk-7-jdk
sudo apt-get install python
sudo apt-get install r-base

2. Install Hadoop-2.6.0, Hive-1.2.1

3. Install scala-2.11.7
Download the archive from the link below:
http://www.scala-lang.org/files/archive/scala-2.11.7.tgz

Copy the tar file into your work directory and extract it.
Update the ~/.bashrc file to set the SCALA_HOME path.

4. Install spark-1.6.0
Download the archive from the link below:
https://spark.apache.org/downloads.html

Copy the tar file into your work directory and extract it.
Update the ~/.bashrc file to set the SPARK_HOME path.

5. Reopen the terminal so that the changes take effect

------------------------------------------------------------
Spark Command Line Examples
------------------------------------------------------------
1. "scala" is the command that starts the Scala shell
   "python" is the command that starts the Python shell
   "R" is the command that starts the R shell

    Run each command to verify that all three are installed

2. spark-shell, pyspark, sparkR and spark-sql are the main commands for working with Spark

Run "spark-shell" to get a Scala prompt
Run "pyspark" to get a Python prompt
Run "sparkR" to get an R prompt



==========================================================
Examples on Spark using Scala
==========================================================

val nums = sc.parallelize(List(1,2,3,4,5,6), 2)

val chars = sc.parallelize(List("a","b","c","d","e","f"),2)

nums.foreach(println)

chars.foreach(println)


/** Tags every element of a partition with the index of that partition.
  *
  * Generic in the element type so the same function can be handed to
  * `mapPartitionsWithIndex` on both `nums` (Int elements) and `chars`
  * (String elements); an Int-only signature does not type-check for `chars`.
  *
  * @param index partition index passed in by `mapPartitionsWithIndex`
  * @param iter  iterator over the elements of that partition
  * @return one "[index:<index>, val: <element>]" string per element, in order
  */
def myfunc[T](index: Int, iter: Iterator[T]): Iterator[String] = {
  // Map the iterator directly instead of copying the whole partition into a List first.
  iter.map(x => s"[index:$index, val: $x]")
}

nums.mapPartitionsWithIndex(myfunc).collect

chars.mapPartitionsWithIndex(myfunc).collect

------------------------------------------------

// Within-partition step for aggregate: keep the larger of the two values.
def seqOp(a: Int, b: Int): Int = a max b
// Cross-partition step for aggregate: multiply the per-partition results.
def comOp(a: Int, b: Int): Int = a * b
nums.aggregate(5)(seqOp, comOp)


nums.aggregate(1)(math.max(_, _), _ + _)

reduce of partition 0 will be max(1, 1, 2, 3) = 3
reduce of partition 1 will be max(1, 4, 5, 6) = 6
final reduce across partitions will be 1 + 3 + 6 = 10


nums.aggregate(5)(math.max(_, _), _ + _)

reduce of partition 0 will be max(5, 1, 2, 3) = 5
reduce of partition 1 will be max(5, 4, 5, 6) = 6
final reduce across partitions will be 5 + 5 + 6 = 16


nums.aggregate(5)(math.max(_, _), _ * _)

reduce of partition 0 will be max(5, 1, 2, 3) = 5
reduce of partition 1 will be max(5, 4, 5, 6) = 6
final reduce across partitions will be 5 * 5 * 6 = 150


chars.aggregate("")(_ + _, _+_)
res: String = defabc

chars.aggregate("x")(_ + _, _+_)
res: String = xxabcxdef


------------------------------------------------

val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12), ("mouse", 2)), 2)

// Labels each (key, value) pair of a partition with that partition's index,
// producing strings of the form "[index:<index>, val: (<key>,<value>)]".
def myfunc(index: Int, iter: Iterator[(String, Int)]): Iterator[String] = {
  val labelled = for (pair <- iter.toList) yield s"[index:$index, val: $pair]"
  labelled.iterator
}

pairRDD.mapPartitionsWithIndex(myfunc).collect


pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
res: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))


pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect
res: Array[(String, Int)] = Array((dog,100), (cat,200), (mouse,200))

------------------------------------------------

val x = sc.parallelize(List(1,2,3,4,5))
val y = sc.parallelize(List(6,7,8,9,10))

x.cartesian(y).collect
res: Array[(Int, Int)] = Array((1,6), (1,7), (1,8), (1,9), (1,10), (2,6), (2,7), (2,8), (2,9), (2,10), (3,6), (3,7), (3,8), (3,9), (3,10), (4,6), (5,6), (4,7), (5,7), (4,8), (5,8), (4,9), (4,10), (5,9), (5,10))

------------------------------------------------

val y = sc.parallelize(1 to 10, 10)
y.partitions.length
res: Int = 10

val z = y.coalesce(2, false)
z.partitions.length
res: Int = 2

------------------------------------------------

val a = sc.parallelize(List(1, 2, 1, 3), 1)
val b = a.map((_, "b"))
val c = a.map((_, "c"))
val d = a.map((_, "d"))

b.cogroup(c).collect
res: Array[(Int, (Iterable[String], Iterable[String]))] = Array((1,(CompactBuffer(b, b),CompactBuffer(c, c))), (3,(CompactBuffer(b),CompactBuffer(c))), (2,(CompactBuffer(b),CompactBuffer(c))))


b.cogroup(c, d).collect
res:  Array[(Int, (Iterable[String], Iterable[String], Iterable[String]))] = Array((1,(CompactBuffer(b, b),CompactBuffer(c, c),CompactBuffer(d, d))), (3,(CompactBuffer(b),CompactBuffer(c),CompactBuffer(d))), (2,(CompactBuffer(b),CompactBuffer(c),CompactBuffer(d))))

------------------------------------------------

val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)

c.collect
res: Array[String] = Array(Gnu, Cat, Rat, Dog, Gnu, Rat)

------------------------------------------------

val a = sc.parallelize(List(1, 2, 1, 3), 1)
val b = a.zip(a)

b.collect
res: Array[(Int, Int)] = Array((1,1), (2,2), (1,1), (3,3))

b.collectAsMap
res: scala.collection.Map[Int,Int] = Map(2 -> 2, 1 -> 1, 3 -> 3)

------------------------------------------------

val a = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
val b = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
val c = b.zip(a)

c.collect
res: Array[(Int, String)] = Array((1,dog), (1,cat), (2,gnu), (2,salmon), (2,rabbit), (1,turkey), (2,wolf), (2,bear), (2,bee))


val d = c.combineByKey(List(_), (x:List[String], y:String) => y :: x, (x:List[String], y:List[String]) => x ::: y)

d.collect
res: Array[(Int, List[String])] = Array((1,List(cat, dog, turkey)), (2,List(gnu, rabbit, salmon, bee, bear, wolf)))

------------------------------------------------

val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2)

c.count
res: Long = 4

------------------------------------------------

val c = sc.parallelize(List((3, "Gnu"), (3, "Yak"), (5, "Mouse"), (3, "Dog")), 2)

c.countByKey
res: scala.collection.Map[Int,Long] = Map(3 -> 3, 5 -> 1)

------------------------------------------------

val b = sc.parallelize(List(1,2,3,4,5,6,7,8,2,4,2,1,1,1,1,1))

b.countByValue
res: scala.collection.Map[Int,Long] = Map(5 -> 1, 1 -> 6, 6 -> 1, 2 -> 3, 7 -> 1, 3 -> 1, 8 -> 1, 4 -> 2)

------------------------------------------------

val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2)

c.distinct.collect
res: Array[String] = Array(Dog, Gnu, Cat, Rat)

val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10))

a.distinct(2).partitions.length
res: Int = 2

a.distinct(3).partitions.length
res: Int = 3

------------------------------------------------

val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2)

c.first
res: String = Gnu

------------------------------------------------

val a = sc.parallelize(1 to 10, 3)
val b = a.filter(_ % 2 == 0)

b.collect
res: Array[Int] = Array(2, 4, 6, 8, 10)

------------------------------------------------

val randRDD = sc.parallelize(List( (2,"cat"), (6, "mouse"),(7, "cup"), (3, "book"), (4, "tv"), (1, "screen"), (5, "heater")), 3)
val sortedRDD = randRDD.sortByKey()

sortedRDD.collect
res: Array[(Int, String)] = Array((1,screen), (2,cat), (3,book), (4,tv), (5,heater), (6,mouse), (7,cup))

sortedRDD.filterByRange(1, 3).collect
res: Array[(Int, String)] = Array((1,screen), (2,cat), (3,book))

------------------------------------------------

val a = sc.parallelize(1 to 9, 3)
val b = a.filterWith(i => i)((x,i) => x % 2 == 0 || i % 2 == 0)

b.collect
res: Array[Int] = Array(1, 2, 3, 4, 6, 7, 8, 9)

val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 5)
a.filterWith(x=> x)((a, b) =>  b == 0).collect
res: Array[Int] = Array(1, 2)

a.filterWith(x=> x)((a, b) =>  a % (b+1) == 0).collect
res: Array[Int] = Array(1, 2, 4, 6, 8, 10)

a.filterWith(x=> x.toString)((a, b) =>  b == "2").collect
res: Array[Int] = Array(5, 6)


------------------------------------------------

val a = sc.parallelize(1 to 10, 5)

a.flatMap(1 to _).collect
res: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

sc.parallelize(List(1, 2, 3), 2).flatMap(x => List(x, x, x)).collect
res: Array[Int] = Array(1, 1, 1, 2, 2, 2, 3, 3, 3)

// The program below generates a random number of copies (0 to 9) of each item in the list.
val x  = sc.parallelize(1 to 10, 3)
x.flatMap(List.fill(scala.util.Random.nextInt(10))(_)).collect
res: Array[Int] = Array(1, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 10)

------------------------------------------------

val c = sc.parallelize(List("cat", "dog", "tiger", "lion", "gnu", "crocodile", "ant", "whale", "dolphin", "spider"), 3)

c.foreach(x => println(x + "s are yummy"))
lions are yummy
gnus are yummy
crocodiles are yummy
ants are yummy
whales are yummy
dolphins are yummy
spiders are yummy

------------------------------------------------


val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
val b = a.keyBy(_.length)

b.collect
res: Array[(Int, String)] = Array((3,dog), (6,salmon), (6,salmon), (3,rat), (8,elephant))


val c = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
val d = c.keyBy(_.length)

d.collect
res: Array[(Int, String)] = Array((3,dog), (3,cat), (3,gnu), (6,salmon), (6,rabbit), (6,turkey), (4,wolf), (4,bear), (3,bee))


b.join(d).collect

res: Array[(Int, (String, String))] = Array((6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), (6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), (3,(dog,dog)), (3,(dog,cat)), (3,(dog,gnu)), (3,(dog,bee)), (3,(rat,dog)), (3,(rat,cat)), (3,(rat,gnu)), (3,(rat,bee)))

------------------------------------------------

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))
b.keys.collect
res: Array[Int] = Array(3, 5, 4, 3, 7, 5)

------------------------------------------------

val y = sc.parallelize(10 to 30)
y.max
res: Int = 30

val a = sc.parallelize(List((10, "dog"), (3, "tiger"), (9, "lion"), (18, "cat")))
a.min
res: (Int, String) = (3,tiger)
------------------------------------------------

val a = sc.parallelize(1 to 9, 3)

a.pipe("head -n 1").collect
res: Array[String] = Array(1, 4, 7)

------------------------------------------------

val a = sc.parallelize(1 to 10, 3)
a.reduce(_ + _)
res: Int = 55

------------------------------------------------
val a = sc.parallelize(1 to 10000, 3)
a.saveAsTextFile("mydata_a")

import org.apache.hadoop.io.compress.GzipCodec
a.saveAsTextFile("mydata_b", classOf[GzipCodec])

val x = sc.textFile("mydata_b")
x.count
res: Long = 10000

val x = sc.parallelize(List(1,2,3,4,5,6,6,7,9,8,10,21), 3)
x.saveAsTextFile("hdfs://localhost:8020/user/hadoop/test");

val sp = sc.textFile("hdfs://localhost:8020/user/hadoop/sp_data")
sp.flatMap(_.split(" ")).saveAsTextFile("hdfs://localhost:8020/user/hadoop/sp_x")


val v = sc.parallelize(Array(("owl",3), ("gnu",4), ("dog",1), ("cat",2), ("ant",5)), 2)
v.saveAsSequenceFile("hd_seq_file")


val x = sc.parallelize(1 to 100, 3)
x.saveAsObjectFile("objFile")

// The type parameter is the element type of the saved RDD (Int), not the
// type of the collected result; collect then yields an Array[Int].
val y = sc.objectFile[Int]("objFile")
y.collect
res: Array[Int] = Array(67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33)
------------------------------------------------

val x = sc.parallelize(List(1.0, 2.0, 3.0, 5.0, 20.0, 19.02, 19.29, 11.09, 21.0), 2)
x.stats
res: org.apache.spark.util.StatCounter = (count: 9, mean: 11.266667, stdev: 8.126859, max: 21.000000, min: 1.000000)

------------------------------------------------

val y = sc.parallelize(Array(5, 7, 1, 3, 2, 1))
y.sortBy(c => c, true).collect
res: Array[Int] = Array(1, 1, 2, 3, 5, 7)

y.sortBy(c => c, false).collect
res: Array[Int] = Array(7, 5, 3, 2, 1, 1)

val z = sc.parallelize(Array(("H", 10), ("A", 26), ("Z", 1), ("L", 5)))
z.sortBy(c => c._1, true).collect
res: Array[(String, Int)] = Array((A,26), (H,10), (L,5), (Z,1))

z.sortBy(c => c._2, true).collect
res: Array[(String, Int)] = Array((Z,1), (L,5), (H,10), (A,26))

------------------------------------------------

val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)
val b = sc.parallelize(1 to a.count.toInt, 2)
val c = a.zip(b)

c.sortByKey(true).collect
res: Array[(String, Int)] = Array((ant,5), (cat,2), (dog,1), (gnu,4), (owl,3))

c.sortByKey(false).collect
res: Array[(String, Int)] = Array((owl,3), (gnu,4), (dog,1), (cat,2), (ant,5))

val a = sc.parallelize(1 to 100, 5)
val b = a.cartesian(a)
// Draw 5 pairs with replacement (seed 13) and redistribute them over 2 partitions.
val c = sc.parallelize(b.takeSample(true, 5, 13), 2)
val d = c.sortByKey(false)
// sortByKey only defines the sorted RDD; collect is the action that returns the pairs.
d.collect
res: Array[(Int, Int)] = Array((96,9), (84,76), (59,59), (53,65), (52,4))

------------------------------------------------

val x = sc.parallelize(List(1.0, 2.0, 3.0, 5.0, 20.0, 19.02, 19.29, 11.09, 21.0), 2)
x.sum
res: Double = 101.39999999999999

------------------------------------------------

val b = sc.parallelize(List("dog", "cat", "ape", "salmon", "gnu"), 2)
b.take(2)
res: Array[String] = Array(dog, cat)

val b = sc.parallelize(1 to 10000, 5000)
b.take(100)
res: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100)

------------------------------------------------

val b = sc.parallelize(List("dog", "cat", "ape", "salmon", "gnu"), 2)
b.takeOrdered(2)
res: Array[String] = Array(ape, cat)

------------------------------------------------

val a = sc.parallelize(1 to 9, 3)
val b = sc.parallelize(1 to 3, 3)
val c = a.subtract(b)
c.toDebugString

res156: String =
(3) MapPartitionsRDD[146] at subtract at <console>:26 []
 |  SubtractedRDD[145] at subtract at <console>:26 []
 +-(3) MapPartitionsRDD[143] at subtract at <console>:26 []
 |  |  ParallelCollectionRDD[141] at parallelize at <console>:22 []
 +-(3) MapPartitionsRDD[144] at subtract at <console>:26 []
    |  ParallelCollectionRDD[142] at parallelize at <console>:22 []


------------------------------------------------

val c = sc.parallelize(Array(6, 9, 4, 7, 5, 8), 2)
c.top(2)
res: Array[Int] = Array(9, 8)

------------------------------------------------

val a = sc.parallelize(1 to 3, 1)
val b = sc.parallelize(5 to 7, 1)
(a ++ b).collect
res: Array[Int] = Array(1, 2, 3, 5, 6, 7)

------------------------------------------------

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))
b.values.collect
res: Array[String] = Array(dog, tiger, lion, cat, panther, eagle)

------------------------------------------------

val a = sc.parallelize(List(9.1, 1.0, 1.2, 2.1, 1.3, 5.0, 2.0, 2.1, 7.4, 7.5, 7.6, 8.8, 10.0, 8.9, 5.5), 3)
a.variance
res: Double = 10.605333333333332

val x = sc.parallelize(List(1.0, 2.0, 3.0, 5.0, 20.0, 19.02, 19.29, 11.09, 21.0), 2)
x.variance
res: Double = 66.04584444444443

x.sampleVariance
res: Double = 74.30157499999999




No comments:

Post a Comment

spark_streaming_examples

Create Spark Streaming Context: ========================================== scala: --------------- import org.apache.spark._ import ...