Monday, 23 September 2019

spark_streaming_examples

Create Spark Streaming Context:
==========================================
scala:
---------------


import org.apache.spark._

import org.apache.spark.streaming._

import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3


// Create a local StreamingContext with two worker threads and a batch interval of 1 second.
// The master requires 2 cores to prevent a starvation scenario.

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))

// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)

// Split each line into words
val words = lines.flatMap(_.split(" "))


import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()

ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate



java:
---------------
import java.util.Arrays;

import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;

import scala.Tuple2;


// Create a local StreamingContext with two worker threads and a batch interval of 1 second
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

// Create a DStream that will connect to hostname:port, like localhost:9999
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

// Split each line into words
JavaDStream<String> words = lines.flatMap(
  new FlatMapFunction<String, String>() {
    @Override public Iterable<String> call(String x) {
      return Arrays.asList(x.split(" "));
    }
  });


// Count each word in each batch
JavaPairDStream<String, Integer> pairs = words.mapToPair(
  new PairFunction<String, String, Integer>() {
    @Override public Tuple2<String, Integer> call(String s) {
      return new Tuple2<String, Integer>(s, 1);
    }
  });
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
  new Function2<Integer, Integer, Integer>() {
    @Override public Integer call(Integer i1, Integer i2) {
      return i1 + i2;
    }
  });


// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print();


jssc.start();              // Start the computation
jssc.awaitTermination();   // Wait for the computation to terminate





python:
---------------
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with two worker threads and a batch interval of 1 second
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)

# Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream("localhost", 9999)

# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))

# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()

ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate





==================================================================

nc -lk 9999

$SPARK_HOME/bin/run-example streaming.NetworkWordCount localhost 9999

$SPARK_HOME/bin/run-example streaming.JavaNetworkWordCount localhost 9999

$SPARK_HOME/bin/spark-submit $SPARK_HOME/examples/src/main/python/streaming/network_wordcount.py localhost 9999

==================================================================

$SPARK_HOME/bin/run-example streaming.TwitterPopularTags <consumer key> <consumer secret> <access token> <access token secret>

==================================================================

$FLUME_HOME/bin/flume-ng agent -n spark-flume --conf $FLUME_HOME/conf -f $FLUME_HOME/test/spark-flume.conf -Dflume.root.logger=DEBUG,console

telnet localhost 3000

$SPARK_HOME/bin/run-example org.apache.spark.examples.streaming.FlumeEventCount localhost 1234

==================================================================

$SPARK_HOME/bin/run-example streaming.HdfsWordCount file:/home/orienit/spark/input/test

==================================================================
$SPARK_HOME/bin/run-example streaming.QueueStream

==================================================================

$SPARK_HOME/bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewGenerator 44444 10

$SPARK_HOME/bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewStream errorRatePerZipCode localhost 44444

$SPARK_HOME/bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewStream activeUserCount localhost 44444

$SPARK_HOME/bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewStream popularUsersSeen localhost 44444



==================================================================
Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>

$SPARK_HOME/bin/run-example org.apache.spark.examples.streaming.KafkaWordCount localhost:2181 my-consumer-group topic1,topic2 1


==================================================================



$SPARK_HOME/bin/run-example streaming.StatefulNetworkWordCount localhost 9999









spark_projects_build_commands

How to Build Eclipse Project:
--------------------------------------------

1. Scala + Maven + Eclipse project
--------------------------------------------
cd /home/orienit/spark/workspace/KalyanScalaProjectMaven
mvn clean eclipse:eclipse
mvn package

scala -cp /home/orienit/spark/workspace/KalyanScalaProjectMaven/target/KalyanScalaProjectMaven-0.0.1-SNAPSHOT.jar com.orienit.scala.training.HelloWorld


2. Scala + Sbt + Eclipse project
--------------------------------------------
cd /home/orienit/spark/workspace/KalyanScalaProjectSbt
sbt clean eclipse
sbt package

scala -cp /home/orienit/spark/workspace/KalyanScalaProjectSbt/target/scala-2.11/kalyanscalaprojectsbt_2.11-1.0.jar com.orienit.scala.training.HelloWorld



3. Spark + Maven + Eclipse project
--------------------------------------------
cd /home/orienit/spark/workspace/KalyanSparkProjectMaven
mvn clean eclipse:eclipse
mvn package

Scala Code:
------------------
$SPARK_HOME/bin/spark-submit --class com.orienit.spark.scala.training.WordCount --master local /home/orienit/spark/workspace/KalyanSparkProjectMaven/target/KalyanSparkProjectMaven-0.0.1-SNAPSHOT.jar /etc/hosts /home/orienit/spark/ouput/maven-op-1

Java Code:
------------------
$SPARK_HOME/bin/spark-submit --class com.orienit.spark.java.training.WordCount --master local /home/orienit/spark/workspace/KalyanSparkProjectMaven/target/KalyanSparkProjectMaven-0.0.1-SNAPSHOT.jar /etc/hosts /home/orienit/spark/ouput/maven-op-2

Python Code:
------------------
$SPARK_HOME/bin/spark-submit /home/orienit/spark/workspace/KalyanSparkProjectMaven/src/main/python/com/orienit/spark/python/training/WordCount.py /etc/hosts /home/orienit/spark/ouput/maven-op-3


4. Spark + Sbt + Eclipse project
--------------------------------------------
cd /home/orienit/spark/workspace/KalyanSparkProjectSbt
sbt clean eclipse
sbt package


Scala Code:
------------------
$SPARK_HOME/bin/spark-submit --class com.orienit.spark.scala.training.WordCount --master local /home/orienit/spark/workspace/KalyanSparkProjectSbt/target/scala-2.11/kalyansparkprojectsbt_2.11-1.0.jar /etc/hosts /home/orienit/spark/ouput/sbt-op-1

Java Code:
------------------
$SPARK_HOME/bin/spark-submit --class com.orienit.spark.java.training.WordCount --master local /home/orienit/spark/workspace/KalyanSparkProjectSbt/target/scala-2.11/kalyansparkprojectsbt_2.11-1.0.jar /etc/hosts /home/orienit/spark/ouput/sbt-op-2

Python Code:
------------------
$SPARK_HOME/bin/spark-submit /home/orienit/spark/workspace/KalyanSparkProjectSbt/src/main/python/com/orienit/spark/python/training/WordCount.py /etc/hosts /home/orienit/spark/ouput/sbt-op-3








spark_mllib_examples

----------------------------------------------
hadoop fs -put $SPARK_HOME/data data


Basic Statistics:
--------------------------

$SPARK_HOME/bin/run-example org.apache.spark.examples.mllib.MultivariateSummarizer --input $SPARK_HOME/data/mllib/sample_linear_regression_data.txt


$SPARK_HOME/bin/run-example org.apache.spark.examples.mllib.Correlations --input $SPARK_HOME/data/mllib/sample_linear_regression_data.txt


$SPARK_HOME/bin/run-example org.apache.spark.examples.mllib.RandomRDDGeneration


Classification and regression:
--------------------------
---------------------------------------------------------------------------
Problem Type ==> Supported Methods
---------------------------------------------------------------------------
Binary Classification ==> linear SVMs, logistic regression, decision trees, random forests, gradient-boosted trees, naive Bayes

Multiclass Classification ==> logistic regression, decision trees, random forests, naive Bayes

Regression ==> linear least squares, Lasso, ridge regression, decision trees, random forests, gradient-boosted trees, isotonic regression
---------------------------------------------------------------------------

linear SVMs:- $SPARK_HOME/bin/run-example mllib.BinaryClassificationMetricsExample

logistic regression:- $SPARK_HOME/bin/run-example mllib.MulticlassMetricsExample

naive Bayes:-  $SPARK_HOME/bin/run-example mllib.NaiveBayesExample

decision trees:-  $SPARK_HOME/bin/run-example mllib.DecisionTreeRegressionExample
decision trees:-  $SPARK_HOME/bin/run-example mllib.DecisionTreeClassificationExample

random forests:-  $SPARK_HOME/bin/run-example mllib.RandomForestClassificationExample
random forests:-  $SPARK_HOME/bin/run-example mllib.RandomForestRegressionExample

gradient-boosted trees:-  $SPARK_HOME/bin/run-example mllib.GradientBoostingRegressionExample
gradient-boosted trees:-  $SPARK_HOME/bin/run-example mllib.GradientBoostingClassificationExample

isotonic regression:-  $SPARK_HOME/bin/run-example mllib.IsotonicRegressionExample


$SPARK_HOME/bin/run-example org.apache.spark.examples.mllib.BinaryClassification $SPARK_HOME/data/mllib/sample_binary_classification_data.txt


$SPARK_HOME/bin/run-example org.apache.spark.examples.mllib.LinearRegression $SPARK_HOME/data/mllib/sample_linear_regression_data.txt



Collaborative filtering:
--------------------------

$SPARK_HOME/bin/run-example mllib.RecommendationExample


Clustering:
--------------------------
kmeans:- $SPARK_HOME/bin/run-example org.apache.spark.examples.mllib.DenseKMeans $SPARK_HOME/data/mllib/kmeans_data.txt --k 3 --numIterations 5

Gaussian mixture:- $SPARK_HOME/bin/run-example org.apache.spark.examples.mllib.DenseGaussianMixture $SPARK_HOME/data/mllib/kmeans_data.txt 3 5

power iteration clustering (PIC):- $SPARK_HOME/bin/run-example org.apache.spark.examples.mllib.PowerIterationClusteringExample

latent Dirichlet allocation (LDA):- $SPARK_HOME/bin/run-example org.apache.spark.examples.mllib.LDAExample $SPARK_HOME/data/mllib/sample_lda_data.txt


streaming k-means:- $SPARK_HOME/bin/run-example org.apache.spark.examples.mllib.StreamingKMeansExample

bisecting k-means:-

---------------------------------------------------------

$SPARK_HOME/bin/run-example mllib.CosineSimilarity --threshold 0.1 $SPARK_HOME/data/mllib/sample_svm_data.txt


$SPARK_HOME/bin/run-example mllib.FPGrowthExample --minSupport 0.8 --numPartition 2 $SPARK_HOME/data/mllib/sample_fpgrowth.txt


$SPARK_HOME/bin/run-example org.apache.spark.examples.mllib.MovieLensALS $SPARK_HOME/data/mllib/sample_movielens_data.txt



$SPARK_HOME/bin/run-example org.apache.spark.examples.mllib.SampledRDDs --input $SPARK_HOME/data/mllib/sample_linear_regression_data.txt


$SPARK_HOME/bin/run-example mllib.StreamingTestExample file:/home/orienit/spark/input/test


----------------------------------------------

$SPARK_HOME/bin/run-example ml.CrossValidatorExample


$SPARK_HOME/bin/run-example org.apache.spark.examples.ml.DataFrameExample --input $SPARK_HOME/data/mllib/sample_libsvm_data.txt


$SPARK_HOME/bin/run-example org.apache.spark.examples.ml.KMeansExample $SPARK_HOME/data/mllib/kmeans_data.txt 3


$SPARK_HOME/bin/run-example org.apache.spark.examples.ml.LinearRegressionExample --regParam 0.15 --elasticNetParam 1.0 $SPARK_HOME/data/mllib/sample_linear_regression_data.txt


$SPARK_HOME/bin/run-example org.apache.spark.examples.ml.LogisticRegressionExample --regParam 0.3 --elasticNetParam 0.8 $SPARK_HOME/data/mllib/sample_libsvm_data.txt


$SPARK_HOME/bin/run-example org.apache.spark.examples.ml.MovieLensALS --rank 10 --maxIter 15 --regParam 0.1 --movies $SPARK_HOME/data/mllib/als/sample_movielens_movies.txt --ratings $SPARK_HOME/data/mllib/als/sample_movielens_ratings.txt



Estimator, Transformer, and Param
----------------------------------------------

$SPARK_HOME/bin/run-example org.apache.spark.examples.ml.SimpleParamsExample


model selection via train validation split
----------------------------------------------
$SPARK_HOME/bin/run-example org.apache.spark.examples.ml.TrainValidationSplitExample



----------------------------------------------

cd /home/orienit/spark/machine_learning_examples/ml-100k

val rawData = sc.textFile("file:/home/orienit/spark/input/ml-100k/u.data")
rawData.first()


val rawRatings = rawData.map(_.split("\t").take(3))
rawRatings.first()


import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.Rating

val ratings = rawRatings.map { case Array(user, movie, rating) => Rating(user.toInt, movie.toInt, rating.toDouble) }
ratings.first()

val model = ALS.train(ratings, 50, 10, 0.01)

model.userFeatures

model.userFeatures.count

model.productFeatures.count


val predictedRating = model.predict(789, 123)

val userId = 789
val K = 10
val topKRecs = model.recommendProducts(userId, K)
println(topKRecs.mkString("\n"))

----------------------------------------------

val movies = sc.textFile("file:/home/orienit/spark/input/ml-100k/u.item")
val titles = movies.map(line => line.split("\\|").take(2)).map(array => (array(0).toInt, array(1))).collectAsMap()
titles(123)


val moviesForUser = ratings.keyBy(_.user).lookup(789)







spark_hadoop_commands

===================================================================
Hadoop Spark Examples using scala
===================================================================

val key = sc.parallelize(List(1,2,3,4))

val value = sc.parallelize(List("a","b","c","d"))

val mapRdd = key.zip(value)

val tabData = mapRdd.map(x => x._1 + "\t" + x._2)

val tupleData = mapRdd.map(x => (x._1, x._2))


tabData.saveAsTextFile("/home/orienit/spark/output/tabData")

tabData.saveAsTextFile("file:///home/orienit/spark/output/tabData")


tupleData.saveAsSequenceFile("/home/orienit/spark/output/tupleData")

tupleData.saveAsSequenceFile("file:///home/orienit/spark/output/tupleData")




import org.apache.hadoop.conf._

import org.apache.hadoop.io._


import org.apache.hadoop.mapred._


import org.apache.hadoop.mapreduce.lib.input._

import org.apache.hadoop.mapreduce.lib.output._




==============================================
Store data into Hadoop using Hadoop APIs
==============================================


// for text format only


val data_1 = mapRdd.map(x => (x._1, x._2))



(or)

 // any other formats //


val data_1 = mapRdd.map(x => (new LongWritable(x._1), new Text(x._2)))


val conf = new Configuration()


data_1.saveAsHadoopFile("file:///home/orienit/spark/output/data_1", classOf[LongWritable], classOf[Text], classOf[org.apache.hadoop.mapred.TextOutputFormat[LongWritable, Text]]);



data_1.saveAsHadoopFile("file:///home/orienit/spark/output/tupleData_1", classOf[LongWritable], classOf[Text], classOf[org.apache.hadoop.mapred.SequenceFileOutputFormat[LongWritable, Text]]);
data_1.saveAsNewAPIHadoopFile("file:///home/orienit/spark/output/data_2", classOf[LongWritable], classOf[Text], classOf[org.apache.hadoop.mapreduce.lib.output.TextOutputFormat[LongWritable, Text]], conf);



data_1.saveAsNewAPIHadoopFile("file:///home/orienit/spark/output/tupleData_2", classOf[LongWritable], classOf[Text], classOf[org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat[LongWritable, Text]], conf);





==============================================
Load data from Hadoop using Hadoop APIs
==============================================




val data = sc.textFile("/home/orienit/spark/output/data").map(x => (x.split("\t")(0),x.split("\t")(1)))
data.collect


Hadoop Old Api:
=====================


val hadoop1 = sc.hadoopFile[Text, Text, org.apache.hadoop.mapred.KeyValueTextInputFormat]("/home/orienit/spark/output/data").map{ case (x, y) => (x.toString, y.toString) }
hadoop1.collect


val hadoop11 = sc.hadoopFile[LongWritable, Text, org.apache.hadoop.mapred.TextInputFormat]("/home/orienit/spark/output/data").map{ case (x, y) => (y.toString.split("\t")(0),y.toString.split("\t")(1)) }
hadoop11.collect


val hadoop1_seq = sc.hadoopFile[IntWritable, Text, org.apache.hadoop.mapred.SequenceFileInputFormat[IntWritable, Text]]("/home/orienit/spark/output/tupleData").map{ case (x, y) => (x.toString, y.toString) }

(or)

val hadoop1_seq = sc.hadoopFile("/home/orienit/spark/output/tupleData",classOf[org.apache.hadoop.mapred.SequenceFileInputFormat[IntWritable,Text]],classOf[IntWritable], classOf[Text]).map{ case (x, y) => (x.toString, y.toString) }

hadoop1_seq.collect



Hadoop New Api:
=====================

val hadoop2 = sc.newAPIHadoopFile[Text, Text, org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat]("/home/orienit/spark/output/data").map{ case (x, y) => (x.toString, y.toString) }
hadoop2.collect
val hadoop22 = sc.newAPIHadoopFile[LongWritable, Text, org.apache.hadoop.mapreduce.lib.input.TextInputFormat]("/home/orienit/spark/output/data").map{ case (x, y) => (y.toString.split("\t")(0),y.toString.split("\t")(1)) }
hadoop22.collect

val hadoop2_seq = sc.newAPIHadoopFile[IntWritable, Text, org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat[IntWritable, Text]]("/home/orienit/spark/output/tupleData").map{ case (x, y) => (x.toString, y.toString) }


(or)


val hadoop2_seq = sc.newAPIHadoopFile("/home/orienit/spark/output/tupleData", classOf[org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat[IntWritable, Text]], classOf[IntWritable], classOf[Text]).map{ case (x, y) => (x.toString, y.toString) }

hadoop2_seq.collect






Frequent Calls in Spark:
==============================

rdd.saveAsHadoopFile(outputpath, classOf[Key], classOf[Value], classOf[OutputFormat[Key,Value]]);


rdd.saveAsNewAPIHadoopFile(outputpath, classOf[Key], classOf[Value], classOf[OutputFormat[Key,Value]]);



val rdd = sc.hadoopFile[Key, Value, InputFormat[Key,Value]](inputpath)

val rdd = sc.hadoopFile(inputpath, classOf[InputFormat[Key,Value]], classOf[Key], classOf[Value])


val rdd = sc.newAPIHadoopFile[Key, Value, InputFormat[Key,Value]](inputpath)

val rdd = sc.newAPIHadoopFile(inputpath, classOf[InputFormat[Key,Value]], classOf[Key], classOf[Value])



val rdd = sc.textFile(inputpath)
val rdd = sc.parallelize(Object)

rdd.saveAsTextFile(outputpath)
rdd.saveAsSequenceFile(outputpath)
rdd.saveAsObjectFile(outputpath)











===================================================================
Hadoop Spark Examples using python
===================================================================


key = sc.parallelize([1,2,3,4])
value = sc.parallelize(["a","b","c","d"])
mapRdd = key.zip(value)

data = mapRdd.map(lambda x : "\t".join([str(x[0]), x[1]]))

tupleData = mapRdd.map(lambda x : (x[0], x[1]))


data.saveAsTextFile("/home/orienit/spark/output/data")

data.saveAsTextFile("file:///home/orienit/spark/output/data")


tupleData.saveAsSequenceFile("/home/orienit/spark/output/tupleData")

tupleData.saveAsSequenceFile("file:///home/orienit/spark/output/tupleData")





==============================================
Store data into Hadoop using Hadoop APIs
==============================================



# for text format only


data_1 = mapRdd.map(lambda x : (x[0], x[1]))


(or)


# any other formats

data_1 = mapRdd.map(lambda x : (long(x[0]), x[1]))


data_1.saveAsHadoopFile("file:///home/orienit/spark/output/data_1","org.apache.hadoop.mapred.TextOutputFormat",
"org.apache.hadoop.io.LongWritable", "org.apache.hadoop.io.Text");


data_1.saveAsHadoopFile("file:///home/orienit/spark/output/tupleData_1",
"org.apache.hadoop.mapred.SequenceFileOutputFormat",
"org.apache.hadoop.io.LongWritable",
"org.apache.hadoop.io.Text");



data_1.saveAsNewAPIHadoopFile("file:///home/orienit/spark/output/data_2","org.apache.hadoop.mapreduce.lib.output.TextOutputFormat",
"org.apache.hadoop.io.LongWritable",
"org.apache.hadoop.io.Text");


data_1.saveAsNewAPIHadoopFile("file:///home/orienit/spark/output/tupleData_2",
"org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat",
"org.apache.hadoop.io.LongWritable",
"org.apache.hadoop.io.Text");




==============================================
Load data from Hadoop using Hadoop APIs
==============================================



data = sc.textFile("/home/orienit/spark/output/data").map(lambda x : (x.split("\t")[0],x.split("\t")[1]))

data.collect()



Hadoop Old Api:
=====================

hadoop1 = sc.hadoopFile("file:///home/orienit/spark/output/data",
"org.apache.hadoop.mapred.KeyValueTextInputFormat",
"org.apache.hadoop.io.Text",
"org.apache.hadoop.io.Text").map(lambda x: (str(x[0]), str(x[1])))
hadoop1.collect()



hadoop11 = sc.hadoopFile("file:///home/orienit/spark/output/data",
"org.apache.hadoop.mapred.TextInputFormat","org.apache.hadoop.io.LongWritable",
"org.apache.hadoop.io.Text").map(lambda x : (long(x[1].split("\t")[0]), str(x[1].split("\t")[1])))
hadoop11.collect()



hadoop1_seq = sc.hadoopFile("file:///home/orienit/spark/output/tupleData",
"org.apache.hadoop.mapred.SequenceFileInputFormat",
"org.apache.hadoop.io.LongWritable",
"org.apache.hadoop.io.Text").map(lambda x: (long(x[0]), str(x[1])))
hadoop1_seq.collect()





Hadoop New Api:
=====================


hadoop2 = sc.newAPIHadoopFile("file:///home/orienit/spark/output/data",
"org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat","org.apache.hadoop.io.Text",
"org.apache.hadoop.io.Text").map(lambda x: (str(x[0]), str(x[1])))
hadoop2.collect()


hadoop22 = sc.newAPIHadoopFile("file:///home/orienit/spark/output/data",
"org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
"org.apache.hadoop.io.LongWritable", "org.apache.hadoop.io.Text").map(lambda x : (long(x[1].split("\t")[0]), str(x[1].split("\t")[1])))

hadoop22.collect()



hadoop2_seq = sc.newAPIHadoopFile("file:///home/orienit/spark/output/tupleData",
"org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
"org.apache.hadoop.io.LongWritable", "org.apache.hadoop.io.Text").map(lambda x: (long(x[0]), str(x[1])))

hadoop2_seq.collect()






spark_graphx_examples

------------------------------------------------

$SPARK_HOME/bin/run-example org.apache.spark.examples.graphx.LiveJournalPageRank file:/home/orienit/spark/input/graphx/pagerank.txt --numEPart=10 --output=file:/home/orienit/spark/output/pagerank-1

------------------------------------------------

$SPARK_HOME/bin/run-example org.apache.spark.examples.graphx.Analytics pagerank file:/home/orienit/spark/input/graphx/pagerank.txt --numEPart=10 --output=file:/home/orienit/spark/output/pagerank-2

$SPARK_HOME/bin/run-example org.apache.spark.examples.graphx.Analytics cc file:/home/orienit/spark/input/graphx/pagerank.txt --numEPart=10

$SPARK_HOME/bin/run-example org.apache.spark.examples.graphx.Analytics triangles file:/home/orienit/spark/input/graphx/pagerank.txt --numEPart=10

------------------------------------------------

$SPARK_HOME/bin/run-example org.apache.spark.examples.graphx.SynthBenchmark -app=pagerank niters=5 nverts=100

------------------------------------------------



------------------------------------------------

Spark_Day3_3

---------------------------------------------------
val t1 = List((1, "kalyan"), (2, "raj"), (3, "venkat"), (4, "raju"))

val t2 = List((1, 10000), (2, 20000), (3, 30000), (5, 50000))

val prdd1 = sc.parallelize(t1, 1)

val prdd2 = sc.parallelize(t2, 1)

---------------------------------------------------

scala> val t1 = List((1, "kalyan"), (2, "raj"), (3, "venkat"), (4, "raju"))
t1: List[(Int, String)] = List((1,kalyan), (2,raj), (3,venkat), (4,raju))

scala> val t2 = List((1, 10000), (2, 20000), (3, 30000), (5, 50000))
t2: List[(Int, Int)] = List((1,10000), (2,20000), (3,30000), (5,50000))

scala> val prdd1 = sc.parallelize(t1, 1)
prdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[1] at parallelize at <console>:26

scala> val prdd2 = sc.parallelize(t2, 1)
prdd2: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[2] at parallelize at <console>:26

---------------------------------------------------

scala> prdd1.join(prdd2).collect.foreach(println)
(1,(kalyan,10000))
(3,(venkat,30000))
(2,(raj,20000))

scala> prdd1.leftOuterJoin(prdd2).collect.foreach(println)
(4,(raju,None))
(1,(kalyan,Some(10000)))
(3,(venkat,Some(30000)))
(2,(raj,Some(20000)))

scala> prdd1.rightOuterJoin(prdd2).collect.foreach(println)
(1,(Some(kalyan),10000))
(3,(Some(venkat),30000))
(5,(None,50000))
(2,(Some(raj),20000))

scala> prdd1.fullOuterJoin(prdd2).collect.foreach(println)
(4,(Some(raju),None))
(1,(Some(kalyan),Some(10000)))
(3,(Some(venkat),Some(30000)))
(5,(None,Some(50000)))
(2,(Some(raj),Some(20000)))

---------------------------------------------------
scala> prdd1.cartesian(prdd2).collect.foreach(println)
((1,kalyan),(1,10000))
((1,kalyan),(2,20000))
((1,kalyan),(3,30000))
((1,kalyan),(5,50000))
((2,raj),(1,10000))
((2,raj),(2,20000))
((2,raj),(3,30000))
((2,raj),(5,50000))
((3,venkat),(1,10000))
((3,venkat),(2,20000))
((3,venkat),(3,30000))
((3,venkat),(5,50000))
((4,raju),(1,10000))
((4,raju),(2,20000))
((4,raju),(3,30000))
((4,raju),(5,50000))


---------------------------------------------------

scala> prdd1.cogroup(prdd2).collect.foreach(println)
(4,(CompactBuffer(raju),CompactBuffer()))
(1,(CompactBuffer(kalyan),CompactBuffer(10000)))
(3,(CompactBuffer(venkat),CompactBuffer(30000)))
(5,(CompactBuffer(),CompactBuffer(50000)))
(2,(CompactBuffer(raj),CompactBuffer(20000)))

scala> prdd1.cogroup(prdd2, prdd2).collect.foreach(println)
(4,(CompactBuffer(raju),CompactBuffer(),CompactBuffer()))
(1,(CompactBuffer(kalyan),CompactBuffer(10000),CompactBuffer(10000)))
(3,(CompactBuffer(venkat),CompactBuffer(30000),CompactBuffer(30000)))
(5,(CompactBuffer(),CompactBuffer(50000),CompactBuffer(50000)))
(2,(CompactBuffer(raj),CompactBuffer(20000),CompactBuffer(20000)))

scala> prdd1.cogroup(prdd2, prdd2, prdd2).collect.foreach(println)
(4,(CompactBuffer(raju),CompactBuffer(),CompactBuffer(),CompactBuffer()))
(1,(CompactBuffer(kalyan),CompactBuffer(10000),CompactBuffer(10000),CompactBuffer(10000)))
(3,(CompactBuffer(venkat),CompactBuffer(30000),CompactBuffer(30000),CompactBuffer(30000)))
(5,(CompactBuffer(),CompactBuffer(50000),CompactBuffer(50000),CompactBuffer(50000)))
(2,(CompactBuffer(raj),CompactBuffer(20000),CompactBuffer(20000),CompactBuffer(20000)))

---------------------------------------------------
Note: cogroup accepts at most 3 other pair RDDs (4 pair RDDs in total, including the one it is called on)
---------------------------------------------------

scala> prdd1.cogroup(prdd2, prdd2, prdd2, prdd2).collect.foreach(println)
<console>:33: error: overloaded method value cogroup with alternatives:
  [W1, W2, W3](other1: org.apache.spark.rdd.RDD[(Int, W1)], other2: org.apache.spark.rdd.RDD[(Int, W2)], other3: org.apache.spark.rdd.RDD[(Int, W3)], numPartitions: Int)org.apache.spark.rdd.RDD[(Int, (Iterable[String], Iterable[W1], Iterable[W2], Iterable[W3]))] <and>
  [W1, W2, W3](other1: org.apache.spark.rdd.RDD[(Int, W1)], other2: org.apache.spark.rdd.RDD[(Int, W2)], other3: org.apache.spark.rdd.RDD[(Int, W3)], partitioner: org.apache.spark.Partitioner)org.apache.spark.rdd.RDD[(Int, (Iterable[String], Iterable[W1], Iterable[W2], Iterable[W3]))]
 cannot be applied to (org.apache.spark.rdd.RDD[(Int, Int)], org.apache.spark.rdd.RDD[(Int, Int)], org.apache.spark.rdd.RDD[(Int, Int)], org.apache.spark.rdd.RDD[(Int, Int)])
       prdd1.cogroup(prdd2, prdd2, prdd2, prdd2).collect.foreach(println)
             ^

---------------------------------------------------

val ltuples1 = List(("raj",1), ("venkat",4), ("anil",2), ("anil",5), ("kalyan",2), ("raj",4), ("kalyan",6), ("raju",4))

val prdd3 = sc.parallelize(ltuples1, 2)

---------------------------------------------------

scala> prdd3.collectAsMap()
res1: scala.collection.Map[String,Int] = Map(kalyan -> 6, raju -> 4, raj -> 4, anil -> 5, venkat -> 4)

scala> prdd3.collect
res2: Array[(String, Int)] = Array((raj,1), (venkat,4), (anil,2), (anil,5), (kalyan,2), (raj,4), (kalyan,6), (raju,4))

---------------------------------------------------
scala> prdd3.collect.foreach(println)
(raj,1)
(venkat,4)
(anil,2)
(anil,5)
(kalyan,2)
(raj,4)
(kalyan,6)
(raju,4)

---------------------------------------------------
scala> prdd3.countByKey()
res14: scala.collection.Map[String,Long] = Map(raj -> 2, raju -> 1, venkat -> 1, anil -> 2, kalyan -> 2)

scala> prdd3.countByValue()
res15: scala.collection.Map[(String, Int),Long] = Map((venkat,4) -> 1, (kalyan,6) -> 1, (anil,5) -> 1, (raj,1) -> 1, (anil,2) -> 1, (raju,4) -> 1, (raj,4) -> 1, (kalyan,2) -> 1)


---------------------------------------------------

scala> prdd3.filter( t => t._1.length > 2).collect.foreach(println)
(raj,1)
(venkat,4)
(anil,2)
(anil,5)
(kalyan,2)
(raj,4)
(kalyan,6)
(raju,4)

scala> prdd3.filter( t => t._1.length > 4).collect.foreach(println)
(venkat,4)
(kalyan,2)
(kalyan,6)

scala> prdd3.filter( t => t._2 > 4).collect.foreach(println)
(anil,5)
(kalyan,6)

---------------------------------------------------

val ltuples2 = List((0,"I am going"), (11, "to hyd"), (18, "I am learning"), (32, "spark course"))

val prdd4 = sc.parallelize(ltuples2, 1)

---------------------------------------------------

scala> val ltuples2 = List((0,"I am going"), (11, "to hyd"), (18, "I am learning"), (32, "spark course"))
ltuples2: List[(Int, String)] = List((0,I am going), (11,to hyd), (18,I am learning), (32,spark course))

scala> val prdd4 = sc.parallelize(ltuples2, 1)
prdd4: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[32] at parallelize at <console>:26


---------------------------------------------------

scala> prdd4.flatMap(t => t._2)
res20: org.apache.spark.rdd.RDD[Char] = MapPartitionsRDD[33] at flatMap at <console>:29

scala> prdd4.flatMap(t => t._2).collect.foreach(println)
I

a
m

g
o
i
n
g
t
o

h
y
d
I

a
m

l
e
a
r
n
i
n
g
s
p
a
r
k

c
o
u
r
s
e

---------------------------------------------------

scala> prdd4.flatMapValues(t => t)
res22: org.apache.spark.rdd.RDD[(Int, Char)] = MapPartitionsRDD[35] at flatMapValues at <console>:29

scala> prdd4.flatMapValues(t => t).collect.foreach(println)
(0,I)
(0, )
(0,a)
(0,m)
(0, )
(0,g)
(0,o)
(0,i)
(0,n)
(0,g)
(11,t)
(11,o)
(11, )
(11,h)
(11,y)
(11,d)
(18,I)
(18, )
(18,a)
(18,m)
(18, )
(18,l)
(18,e)
(18,a)
(18,r)
(18,n)
(18,i)
(18,n)
(18,g)
(32,s)
(32,p)
(32,a)
(32,r)
(32,k)
(32, )
(32,c)
(32,o)
(32,u)
(32,r)
(32,s)
(32,e)

---------------------------------------------------


scala> prdd4.flatMap(t => t._2.split(" ")).collect.foreach(println)
I
am
going
to
hyd
I
am
learning
spark
course

scala> prdd4.flatMapValues(t => t.split(" ")).collect.foreach(println)
(0,I)
(0,am)
(0,going)
(11,to)
(11,hyd)
(18,I)
(18,am)
(18,learning)
(32,spark)
(32,course)


---------------------------------------------------

---------------------------------------------------
Compare `aggregate` and `fold`:
-----------------------------------------

aggregate(zeroValue)(seqOp, combOp)


aggregate(initialValue)(seqOp, seqOp)

<==>

fold(initialValue)(seqOp)


fold(firstValue)(seqOp)

<==>

reduce(seqOp)

---------------------------------------------------
Compare `aggregateByKey` and `foldByKey`:
-----------------------------------------

aggregateByKey(zeroValue)(seqOp, combOp)


aggregateByKey(initialValue)(seqOp, seqOp)

<==>

foldByKey(initialValue)(seqOp)


foldByKey(firstValue)(seqOp)

<==>

reduceByKey(seqOp)


---------------------------------------------------

scala> prdd3.collect
res26: Array[(String, Int)] = Array((raj,1), (venkat,4), (anil,2), (anil,5), (kalyan,2), (raj,4), (kalyan,6), (raju,4))


scala> prdd3.groupBy(x => x._1).collect.foreach(println)
(kalyan,CompactBuffer((kalyan,2), (kalyan,6)))
(anil,CompactBuffer((anil,2), (anil,5)))
(raju,CompactBuffer((raju,4)))
(venkat,CompactBuffer((venkat,4)))
(raj,CompactBuffer((raj,1), (raj,4)))

scala> prdd3.groupByKey().collect.foreach(println)
(kalyan,CompactBuffer(2, 6))
(anil,CompactBuffer(2, 5))
(raju,CompactBuffer(4))
(venkat,CompactBuffer(4))
(raj,CompactBuffer(1, 4))

-----------------------------------------
scala> prdd3.keys.collect
res33: Array[String] = Array(raj, venkat, anil, anil, kalyan, raj, kalyan, raju)

scala> prdd3.values.collect
res34: Array[Int] = Array(1, 4, 2, 5, 2, 4, 6, 4)


---------------------------------------------------
scala> prdd4.collect
res35: Array[(Int, String)] = Array((0,I am going), (11,to hyd), (18,I am learning), (32,spark course))


scala> prdd4.map(t => t._2.reverse).collect
res37: Array[String] = Array(gniog ma I, dyh ot, gninrael ma I, esruoc kraps)

scala> prdd4.mapValues(t => t.reverse).collect
res38: Array[(Int, String)] = Array((0,gniog ma I), (11,dyh ot), (18,gninrael ma I), (32,esruoc kraps))


---------------------------------------------------
Note:
----------
rdd.partitions.length <==> rdd.getNumPartitions

---------------------------------------------------

scala> prdd4.collect
res39: Array[(Int, String)] = Array((0,I am going), (11,to hyd), (18,I am learning), (32,spark course))

scala> prdd4.map(t => t._2).collect
res40: Array[String] = Array(I am going, to hyd, I am learning, spark course)

scala> prdd4.map(t => t._2).flatMap(x => x.split(" ")).collect
res41: Array[String] = Array(I, am, going, to, hyd, I, am, learning, spark, course)

---------------------------------------------------

scala> val pr1 = prdd4.map(t => t._2).flatMap(x => x.split(" ")).map(x => (x ,1))
pr1: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[56] at map at <console>:28

scala> pr1.collect
res42: Array[(String, Int)] = Array((I,1), (am,1), (going,1), (to,1), (hyd,1), (I,1), (am,1), (learning,1), (spark,1), (course,1))

scala> pr1.reduceByKey((a,b) => a + b)
res43: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[57] at reduceByKey at <console>:31

scala> pr1.reduceByKey((a,b) => a + b).collect
res44: Array[(String, Int)] = Array((learning,1), (spark,1), (am,2), (hyd,1), (I,2), (to,1), (going,1), (course,1))

scala> pr1.groupByKey().collect
res56: Array[(String, Iterable[Int])] = Array((learning,CompactBuffer(1)), (spark,CompactBuffer(1)), (am,CompactBuffer(1, 1)), (hyd,CompactBuffer(1)), (I,CompactBuffer(1, 1)), (to,CompactBuffer(1)), (going,CompactBuffer(1)), (course,CompactBuffer(1)))

scala> pr1.groupByKey().map(t => (t._1, t._2.sum))
res57: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[63] at map at <console>:31

scala> pr1.groupByKey().map(t => (t._1, t._2.sum)).collect
res58: Array[(String, Int)] = Array((learning,1), (spark,1), (am,2), (hyd,1), (I,2), (to,1), (going,1), (course,1))

---------------------------------------------------

scala> prdd3.collect.foreach(println)
(raj,1)
(venkat,4)
(anil,2)
(anil,5)
(kalyan,2)
(raj,4)
(kalyan,6)
(raju,4)

scala> prdd3.sortBy(t => t._1).collect.foreach(println)
(anil,2)
(anil,5)
(kalyan,2)
(kalyan,6)
(raj,1)
(raj,4)
(raju,4)
(venkat,4)

scala> prdd3.sortBy(t => t._2).collect.foreach(println)
(raj,1)
(anil,2)
(kalyan,2)
(venkat,4)
(raj,4)
(raju,4)
(anil,5)
(kalyan,6)

scala> prdd3.sortByKey().collect.foreach(println)
(anil,2)
(anil,5)
(kalyan,2)
(kalyan,6)
(raj,1)
(raj,4)
(raju,4)
(venkat,4)


---------------------------------------------------
Playing with Load & Store operations
---------------------------------------------------
1. Create an RDD from datasets (text file / CSV / TSV / ...)
---------------------------------------------------
val rdd = sc.textFile(<file path> , <no.of partitions>)

---------------------------------------------------
val filePath = "file:///home/orienit/spark/input/demoinput"

val rdd1 = sc.textFile(filePath)

rdd1.getNumPartitions


val rdd1 = sc.textFile(filePath, 1)

rdd1.getNumPartitions

---------------------------------------------------

scala> val filePath = "file:///home/orienit/spark/input/demoinput"
filePath: String = file:///home/orienit/spark/input/demoinput

scala> val rdd1 = sc.textFile(filePath)
rdd1: org.apache.spark.rdd.RDD[String] = file:///home/orienit/spark/input/demoinput MapPartitionsRDD[90] at textFile at <console>:26

scala> rdd1.getNumPartitions
res69: Int = 2

scala> val rdd1 = sc.textFile(filePath, 1)
rdd1: org.apache.spark.rdd.RDD[String] = file:///home/orienit/spark/input/demoinput MapPartitionsRDD[92] at textFile at <console>:26

scala> rdd1.getNumPartitions
res70: Int = 1

---------------------------------------------------
WordCount in Spark with Scala:
---------------------------------------------------
val filePath = "file:///home/orienit/spark/input/demoinput"

val fileRdd = sc.textFile(filePath, 1)

val wordsRdd = fileRdd.flatMap(line => line.split(" "))

val tupleRdd = wordsRdd.map(word => (word, 1))

val wordCountRdd = tupleRdd.reduceByKey((a,b) => a + b)

val sortedRdd = wordCountRdd.sortByKey()

sortedRdd.collect()

val outputPath = "file:///home/orienit/spark/output/wordcount"

sortedRdd.saveAsTextFile(outputPath)

---------------------------------------------------

val filePath = "file:///home/orienit/spark/input/demoinput"

val outputPath = "file:///home/orienit/spark/output/wordcount"

val fileRdd = sc.textFile(filePath, 1)

val sortedRdd = fileRdd.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a,b) => a + b).sortByKey()

sortedRdd.saveAsTextFile(outputPath)

---------------------------------------------------

scala> val filePath = "file:///home/orienit/spark/input/demoinput"
filePath: String = file:///home/orienit/spark/input/demoinput

scala> val fileRdd = sc.textFile(filePath, 1)
fileRdd: org.apache.spark.rdd.RDD[String] = file:///home/orienit/spark/input/demoinput MapPartitionsRDD[94] at textFile at <console>:26

scala> fileRdd.collect
res72: Array[String] = Array(I am going, to hyd, I am learning, spark course)

scala> val wordsRdd = fileRdd.flatMap(line => line.split(" "))
wordsRdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[95] at flatMap at <console>:28

scala> wordsRdd.collect
res73: Array[String] = Array(I, am, going, to, hyd, I, am, learning, spark, course)

scala> val tupleRdd = wordsRdd.map(word => (word, 1))
tupleRdd: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[96] at map at <console>:30

scala> tupleRdd.collect
res74: Array[(String, Int)] = Array((I,1), (am,1), (going,1), (to,1), (hyd,1), (I,1), (am,1), (learning,1), (spark,1), (course,1))


scala> val wordCountRdd = tupleRdd.reduceByKey((a,b) => a + b)
wordCountRdd: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[97] at reduceByKey at <console>:32

scala> wordCountRdd.collect
res75: Array[(String, Int)] = Array((learning,1), (spark,1), (am,2), (hyd,1), (I,2), (to,1), (going,1), (course,1))

scala> val sortedRdd = wordCountRdd.sortByKey()
sortedRdd: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[98] at sortByKey at <console>:34

scala> sortedRdd.collect
res76: Array[(String, Int)] = Array((I,2), (am,2), (course,1), (going,1), (hyd,1), (learning,1), (spark,1), (to,1))

scala> val sortedRdd = wordCountRdd.sortBy(x => x._1)
sortedRdd: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[101] at sortBy at <console>:34

scala> sortedRdd.collect
res77: Array[(String, Int)] = Array((I,2), (am,2), (course,1), (going,1), (hyd,1), (learning,1), (spark,1), (to,1))


scala> val outputPath = "file:///home/orienit/spark/output/wordcount"
outputPath: String = file:///home/orienit/spark/output/wordcount

scala> sortedRdd.saveAsTextFile(outputPath)


---------------------------------------------------
Spark Web UI URL:
--------------
http://localhost:4040/


---------------------------------------------------
val filePath = "file:///home/orienit/spark/input/demoinput"

val outputPath = "file:///home/orienit/spark/output/wordcount"

val fileRdd = sc.textFile(filePath, 1)

val sortedRdd = fileRdd.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a,b) => a + b).sortByKey()

val wordCountRdd = fileRdd.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a,b) => a + b)

wordCountRdd.saveAsTextFile(outputPath)

---------------------------------------------------

scala> wordCountRdd.toDebugString
res82: String =
(1) ShuffledRDD[113] at reduceByKey at <console>:28 []
 +-(1) MapPartitionsRDD[112] at map at <console>:28 []
    |  MapPartitionsRDD[111] at flatMap at <console>:28 []
    |  file:///home/orienit/spark/input/demoinput MapPartitionsRDD[104] at textFile at <console>:26 []
    |  file:///home/orienit/spark/input/demoinput HadoopRDD[103] at textFile at <console>:26 []


scala> sortedRdd.toDebugString
res83: String =
(1) ShuffledRDD[108] at sortByKey at <console>:28 []
 +-(1) ShuffledRDD[107] at reduceByKey at <console>:28 []
    +-(1) MapPartitionsRDD[106] at map at <console>:28 []
       |  MapPartitionsRDD[105] at flatMap at <console>:28 []
       |  file:///home/orienit/spark/input/demoinput MapPartitionsRDD[104] at textFile at <console>:26 []
       |  file:///home/orienit/spark/input/demoinput HadoopRDD[103] at textFile at <console>:26 []


---------------------------------------------------

Refer:
------------------------
http://kalyanbigdatatraining.blogspot.com/2016/10/more-details-about-input-format-and.html

---------------------------------------------------


---------------------------------------------------

---------------------------------------------------


---------------------------------------------------



---------------------------------------------------


---------------------------------------------------

---------------------------------------------------



---------------------------------------------------


---------------------------------------------------

---------------------------------------------------


---------------------------------------------------



---------------------------------------------------


---------------------------------------------------


---------------------------------------------------



---------------------------------------------------


---------------------------------------------------

---------------------------------------------------


---------------------------------------------------



---------------------------------------------------


---------------------------------------------------


---------------------------------------------------



---------------------------------------------------


---------------------------------------------------

---------------------------------------------------


---------------------------------------------------



---------------------------------------------------


---------------------------------------------------

Spark_Day2_2

---------------------------------------------------
Find the `average` of 1 to 10 using `aggregate` function?
---------------------------------------------------
val list = List(1,2,3,4,5,6,7,8,9,10)
val rdd = sc.parallelize(list, 2)

---------------------------------------------------
val zeroValue = _

def seqOp(Unit, Int) : Unit = {}

def combOp(Unit, Unit) : Unit = {}

Note: `Unit` here is a placeholder for the expected result (return) type

---------------------------------------------------

Note: replace `Unit` with `(Int, Int)` in all places


val zeroValue = (0, 0)

def seqOp(res: (Int, Int), data: Int) : (Int, Int) = {
 (res._1 + data, res._2 + 1)
}

def combOp(res1: (Int, Int), res2: (Int, Int)) : (Int, Int) = {
 (res1._1 + res2._1, res1._2 + res2._2)
}

val tuple = rdd.aggregate(zeroValue)(seqOp,combOp)

val avg = tuple._1 / tuple._2

val avg = tuple._1.toDouble / tuple._2

---------------------------------------------------

scala> val list = List(1,2,3,4,5,6,7,8,9,10)
list: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> val rdd = sc.parallelize(list, 2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26

scala> val zeroValue = (0, 0)
zeroValue: (Int, Int) = (0,0)

scala> def seqOp(res: (Int, Int), data: Int) : (Int, Int) = {
     |  (res._1 + data, res._2 + 1)
     | }
seqOp: (res: (Int, Int), data: Int)(Int, Int)

scala> def combOp(res1: (Int, Int), res2: (Int, Int)) : (Int, Int) = {
     |  (res1._1 + res2._1, res1._2 + res2._2)
     | }
combOp: (res1: (Int, Int), res2: (Int, Int))(Int, Int)

scala> val tuple = rdd.aggregate(zeroValue)(seqOp,combOp)
tuple: (Int, Int) = (55,10)

scala> val avg = tuple._1 / tuple._2
avg: Int = 5

scala> val avg = tuple._1.toDouble / tuple._2
avg: Double = 5.5


---------------------------------------------------
Note: For an `aggregate` operation, the operator needs to follow the `associative law and commutative law`
---------------------------------------------------

val rdd1 = sc.parallelize(List(1,2,3,4,5,6), 2)
val rdd2 = sc.parallelize(List(1,2,1,3,2,4,4), 2)

---------------------------------------------------
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5,6), 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> val rdd2 = sc.parallelize(List(1,2,1,3,2,4,4), 2)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24

---------------------------------------------------
scala> rdd1.count
res0: Long = 6

scala> rdd1.first
res1: Int = 1

scala> rdd1.min
res2: Int = 1

scala> rdd1.max
res3: Int = 6

scala> rdd1.sum
res4: Double = 21.0


scala> rdd1.foreach(println)
1
4
5
6
2
3

scala> rdd1.collect
res7: Array[Int] = Array(1, 2, 3, 4, 5, 6)

scala> rdd1.collect.foreach(println)
1
2
3
4
5
6

---------------------------------------------------
Importance of `coalesce` and `repartition`
---------------------------------------------------
1. using `coalesce` (with the default `shuffle = false`) we can only decrease the number of partitions

2. using `repartition` we can increase or decrease the number of partitions

3. in `coalesce`, the `shuffle` value is `false` by default

4. in `repartition`, the `shuffle` value is always `true`

5. prefer `coalesce` over `repartition` when decreasing partitions, to avoid the `shuffle` operation


---------------------------------------------------

scala> rdd1.coalesce(3)
res9: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[4] at coalesce at <console>:27

scala> rdd1.coalesce(3).getNumPartitions
res10: Int = 2

scala> rdd1.coalesce(1).getNumPartitions
res11: Int = 1

scala> rdd1.repartition(3)
res12: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[10] at repartition at <console>:27

scala> rdd1.repartition(3).getNumPartitions
res13: Int = 3

scala> rdd1.repartition(1).getNumPartitions
res14: Int = 1

---------------------------------------------------

scala> rdd2.collect
res18: Array[Int] = Array(1, 2, 1, 3, 2, 4, 4)

scala> rdd2.distinct
res19: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[21] at distinct at <console>:27

scala> rdd2.distinct.collect
res20: Array[Int] = Array(4, 2, 1, 3)

scala> rdd2.distinct.getNumPartitions
res22: Int = 2

scala> rdd2.distinct(1).getNumPartitions
res23: Int = 1

scala> rdd2.distinct(3).getNumPartitions
res24: Int = 3

---------------------------------------------------
scala> rdd2.collect
res25: Array[Int] = Array(1, 2, 1, 3, 2, 4, 4)

scala> rdd2.sortBy(x => x)
res26: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[41] at sortBy at <console>:27

scala> rdd2.sortBy(x => x).collect
res27: Array[Int] = Array(1, 1, 2, 2, 3, 4, 4)

scala> rdd2.sortBy(x => -x).collect
res28: Array[Int] = Array(4, 4, 3, 2, 2, 1, 1)

---------------------------------------------------

scala> rdd1.collect
res31: Array[Int] = Array(1, 2, 3, 4, 5, 6)

scala> rdd1.keyBy(x => x)
res32: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[52] at keyBy at <console>:27

scala> rdd1.keyBy(x => x).collect
res33: Array[(Int, Int)] = Array((1,1), (2,2), (3,3), (4,4), (5,5), (6,6))

scala> rdd2.collect
res34: Array[Int] = Array(1, 2, 1, 3, 2, 4, 4)

scala> rdd2.keyBy(x => x).collect
res35: Array[(Int, Int)] = Array((1,1), (2,2), (1,1), (3,3), (2,2), (4,4), (4,4))

---------------------------------------------------

scala> rdd1.keyBy(x => x).collect
res36: Array[(Int, Int)] = Array((1,1), (2,2), (3,3), (4,4), (5,5), (6,6))

scala> rdd1.keyBy(x => x * x).collect
res37: Array[(Int, Int)] = Array((1,1), (4,2), (9,3), (16,4), (25,5), (36,6))

scala> rdd1.map(x => (x * x, x)).collect
res38: Array[(Int, Int)] = Array((1,1), (4,2), (9,3), (16,4), (25,5), (36,6))

---------------------------------------------------
scala> rdd1.countByValue()
res40: scala.collection.Map[Int,Long] = Map(5 -> 1, 1 -> 1, 6 -> 1, 2 -> 1, 3 -> 1, 4 -> 1)

scala> rdd2.countByValue()
res41: scala.collection.Map[Int,Long] = Map(4 -> 2, 2 -> 2, 1 -> 2, 3 -> 1)

---------------------------------------------------

scala> rdd1.collect
res42: Array[Int] = Array(1, 2, 3, 4, 5, 6)

scala> rdd2.collect
res43: Array[Int] = Array(1, 2, 1, 3, 2, 4, 4)

scala> rdd1.union(rdd2)
res44: org.apache.spark.rdd.RDD[Int] = UnionRDD[64] at union at <console>:29

scala> rdd1.union(rdd2).collect
res45: Array[Int] = Array(1, 2, 3, 4, 5, 6, 1, 2, 1, 3, 2, 4, 4)

scala> rdd1.intersection(rdd2).collect
res46: Array[Int] = Array(4, 2, 1, 3)

scala> rdd1.subtract(rdd2)
res48: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[75] at subtract at <console>:29

scala> rdd1.subtract(rdd2).collect
res49: Array[Int] = Array(6, 5)

scala> rdd2.subtract(rdd1).collect
res50: Array[Int] = Array()

---------------------------------------------------

scala> rdd2.take(4)
res52: Array[Int] = Array(1, 2, 1, 3)

scala> rdd2.takeOrdered(4)
res53: Array[Int] = Array(1, 1, 2, 2)

scala> rdd2.collect
res54: Array[Int] = Array(1, 2, 1, 3, 2, 4, 4)

---------------------------------------------------
val lines = List("I am going", "to hyd", "I am learning", "spark course")

val rdd3 = sc.parallelize(lines, 1)

---------------------------------------------------
scala> val lines = List("I am going", "to hyd", "I am learning", "spark course")
lines: List[String] = List(I am going, to hyd, I am learning, spark course)

scala> val rdd3 = sc.parallelize(lines, 1)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[86] at parallelize at <console>:26


---------------------------------------------------
Importance of `flatMap`
---------------------------------------------------
scala> rdd3.flatMap(x => x)
res57: org.apache.spark.rdd.RDD[Char] = MapPartitionsRDD[87] at flatMap at <console>:29

scala> rdd3.flatMap(x => x).collect
res58: Array[Char] = Array(I,  , a, m,  , g, o, i, n, g, t, o,  , h, y, d, I,  , a, m,  , l, e, a, r, n, i, n, g, s, p, a, r, k,  , c, o, u, r, s, e)

scala> rdd3.flatMap(x => x.split(" "))
res59: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[89] at flatMap at <console>:29

scala> rdd3.flatMap(x => x.split(" ")).collect
res60: Array[String] = Array(I, am, going, to, hyd, I, am, learning, spark, course)

scala> rdd3.flatMap(x => x.split(" ")).countByValue().foreach(println)
(course,1)
(am,2)
(going,1)
(I,2)
(hyd,1)
(to,1)
(spark,1)
(learning,1)

---------------------------------------------------

val llines = List(List("I am going"), List("to hyd"), List("I am learning"), List("spark course"))

val rdd4 = sc.parallelize(llines, 1)

---------------------------------------------------
scala> val llines = List(List("I am going"), List("to hyd"), List("I am learning"), List("spark course"))
llines: List[List[String]] = List(List(I am going), List(to hyd), List(I am learning), List(spark course))

scala> val rdd4 = sc.parallelize(llines, 1)
rdd4: org.apache.spark.rdd.RDD[List[String]] = ParallelCollectionRDD[99] at parallelize at <console>:26

scala> rdd4.flatMap(x => x)
res65: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[100] at flatMap at <console>:29

scala> rdd4.flatMap(x => x).collect
res66: Array[String] = Array(I am going, to hyd, I am learning, spark course)

---------------------------------------------------
scala> rdd1.cartesian(rdd2)
res67: org.apache.spark.rdd.RDD[(Int, Int)] = CartesianRDD[102] at cartesian at <console>:29

scala> rdd1.cartesian(rdd2).collect
res68: Array[(Int, Int)] = Array((1,1), (1,2), (1,1), (2,1), (2,2), (2,1), (3,1), (3,2), (3,1), (1,3), (1,2), (1,4), (1,4), (2,3), (2,2), (2,4), (2,4), (3,3), (3,2), (3,4), (3,4), (4,1), (4,2), (4,1), (5,1), (5,2), (5,1), (6,1), (6,2), (6,1), (4,3), (4,2), (4,4), (4,4), (5,3), (5,2), (5,4), (5,4), (6,3), (6,2), (6,4), (6,4))

scala> rdd1.cartesian(rdd2).count
res69: Long = 42

---------------------------------------------------
scala> rdd1.context
res70: org.apache.spark.SparkContext = org.apache.spark.SparkContext@6780f4f2

scala> rdd1.dependencies
res71: Seq[org.apache.spark.Dependency[_]] = List()


---------------------------------------------------
Importance of `fold`
---------------------------------------------------

scala> rdd1.fold()()
<console>:27: error: not enough arguments for method fold: (zeroValue: Int)(op: (Int, Int) => Int)Int.
Unspecified value parameter zeroValue.
       rdd1.fold()()
                ^

---------------------------------------------------
Compare `aggregate` and `fold`:
-----------------------------------------

aggregate(zeroValue)(seqOp, combOp)


aggregate(initialValue)(seqOp, seqOp)

<==>

fold(initialValue)(seqOp)

---------------------------------------------------

val zeroValue = 1

def seqOp(res: Int, data: Int) : Int = {
 res * data
}

rdd1.fold(zeroValue)(seqOp)

---------------------------------------------------
scala> val zeroValue = 1
zeroValue: Int = 1

scala> def seqOp(res: Int, data: Int) : Int = {
     |  res * data
     | }
seqOp: (res: Int, data: Int)Int

scala> rdd1.fold(zeroValue)(seqOp)
res73: Int = 720


---------------------------------------------------
scala> rdd1.glom()
res74: org.apache.spark.rdd.RDD[Array[Int]] = MapPartitionsRDD[105] at glom at <console>:27

scala> rdd1.glom().collect
res75: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6))

scala> rdd1.collect
res76: Array[Int] = Array(1, 2, 3, 4, 5, 6)

---------------------------------------------------
scala> rdd1.groupBy(x => x)
res77: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[108] at groupBy at <console>:27

scala> rdd1.groupBy(x => x).collect
res78: Array[(Int, Iterable[Int])] = Array((4,CompactBuffer(4)), (6,CompactBuffer(6)), (2,CompactBuffer(2)), (1,CompactBuffer(1)), (3,CompactBuffer(3)), (5,CompactBuffer(5)))

scala> rdd2.groupBy(x => x).collect
res79: Array[(Int, Iterable[Int])] = Array((4,CompactBuffer(4, 4)), (2,CompactBuffer(2, 2)), (1,CompactBuffer(1, 1)), (3,CompactBuffer(3)))


---------------------------------------------------
scala> rdd1.partitions.length
res80: Int = 2

scala> rdd1.getNumPartitions
res81: Int = 2

scala> rdd1.mean
res82: Double = 3.5

---------------------------------------------------

scala> rdd1.pipe("head -n 1")
res85: org.apache.spark.rdd.RDD[String] = PipedRDD[116] at pipe at <console>:27

scala> rdd1.pipe("head -n 1").collect
res86: Array[String] = Array(1, 4)

scala> rdd1.pipe("head -n 2").collect
res87: Array[String] = Array(1, 2, 4, 5)

scala> rdd1.pipe("head -n 3").collect
res88: Array[String] = Array(1, 2, 3, 4, 5, 6)


---------------------------------------------------
scala> rdd1.sample(true, 0.1, 3).collect
res92: Array[Int] = Array(1)

scala> rdd1.sample(true, 0.5, 3).collect
res93: Array[Int] = Array()

scala> rdd1.sample(true, 0.5, 3).collect
res94: Array[Int] = Array()

scala> rdd1.sample(true, 0.5, 7).collect
res95: Array[Int] = Array(1, 4, 5)

scala> rdd1.sample(true, 0.5, 7).collect
res96: Array[Int] = Array(1, 4, 5)

scala> rdd1.sample(true, 0.5, 7).collect
res97: Array[Int] = Array(1, 4, 5)

scala> rdd1.sample(true, 0.9, 7).collect
res98: Array[Int] = Array(1, 4, 4, 4, 5, 6, 6)

scala> rdd1.sample(true, 0.9, 7).collect
res99: Array[Int] = Array(1, 4, 4, 4, 5, 6, 6)


---------------------------------------------------

scala> rdd1.id
res100: Int = 1

scala> rdd1.name
res101: String = null

scala> rdd1.setName("kalyan")
res102: rdd1.type = kalyan ParallelCollectionRDD[1] at parallelize at <console>:24

scala> rdd1.name
res103: String = kalyan

scala> rdd1.setName("kalyan spark")
res104: rdd1.type = kalyan spark ParallelCollectionRDD[1] at parallelize at <console>:24

scala> rdd1.name
res105: String = kalyan spark

---------------------------------------------------
Importance of `sortBy`
---------------------------------------------------
scala> val rdd5 = rdd3.flatMap(x => x.split(" "))
rdd5: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[130] at flatMap at <console>:28

scala> val rdd5 = rdd3.flatMap(_.split(" "))
rdd5: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[131] at flatMap at <console>:28

scala> rdd5.collect
res107: Array[String] = Array(I, am, going, to, hyd, I, am, learning, spark, course)

---------------------------------------------------

scala> rdd5.sortBy(x => x)
res108: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[134] at sortBy at <console>:31

scala> rdd5.sortBy(x => x).collect
res109: Array[String] = Array(I, I, am, am, course, going, hyd, learning, spark, to)

scala> rdd5.sortBy(x => -x).collect
<console>:31: error: value unary_- is not a member of String
Error occurred in an application involving default arguments.
       rdd5.sortBy(x => -x).collect
                        ^
<console>:31: error: diverging implicit expansion for type Ordering[K]
starting with method Tuple9 in object Ordering
Error occurred in an application involving default arguments.
       rdd5.sortBy(x => -x).collect
                  ^

---------------------------------------------------

scala> rdd5.sortBy(x => x.reverse).collect
res111: Array[String] = Array(I, I, hyd, course, learning, going, spark, am, am, to)

scala> rdd5.sortBy(x => x.indexOf).collect
indexOf   indexOfSlice

scala> rdd5.sortBy(x => x.indexOf(0)).collect
res112: Array[String] = Array(I, am, going, to, hyd, I, am, learning, spark, course)

scala> rdd5.sortBy(x => x.indexOf(1)).collect
res113: Array[String] = Array(I, am, going, to, hyd, I, am, learning, spark, course)

scala> rdd5.sortBy(x => x.indexOf(3)).collect
res114: Array[String] = Array(I, am, going, to, hyd, I, am, learning, spark, course)

---------------------------------------------------
Importance of `zip`
---------------------------------------------------
In Scala:
-------------
val l1 = List(1,2,3,4,5,6)
val l2 = List('a','b','c','d')
val l3 = List("raj", "venkat", "ravi", "anil")

scala> l1.zip(l2).foreach(println)
(1,a)
(2,b)
(3,c)
(4,d)

scala> l2.zip(l1).foreach(println)
(a,1)
(b,2)
(c,3)
(d,4)

scala> l2.zip(l3).foreach(println)
(a,raj)
(b,venkat)
(c,ravi)
(d,anil)

scala> l3.zip(l2).foreach(println)
(raj,a)
(venkat,b)
(ravi,c)
(anil,d)


In Spark:
-------------
val r1 = sc.parallelize(List(1,2,3,4,5,6),1)
val r2 = sc.parallelize(List('a','b','c','d'),1)
val r3 = sc.parallelize(List("raj", "venkat", "ravi", "anil"),1)

---------------------------------------------------

r1.zip(r2).foreach(println)
r2.zip(r1).foreach(println)
r2.zip(r3).foreach(println)
r3.zip(r2).foreach(println)

---------------------------------------------------
Note: Can only zip RDDs with same number of elements in each partition

The below 2 statements are going to fail:
----------------------------------------------
r1.zip(r2).foreach(println)
r2.zip(r1).foreach(println)

The below 2 statements are going to work:
----------------------------------------------
r2.zip(r3).foreach(println)
r3.zip(r2).foreach(println)

scala> r2.zip(r3).foreach(println)
(a,raj)
(b,venkat)
(c,ravi)
(d,anil)

scala> r3.zip(r2).foreach(println)
(raj,a)
(venkat,b)
(ravi,c)
(anil,d)
---------------------------------------------------

scala> rdd1.++(rdd2).collect
res127: Array[Int] = Array(1, 2, 3, 4, 5, 6, 1, 2, 1, 3, 2, 4, 4)

scala> rdd1.union(rdd2).collect
res128: Array[Int] = Array(1, 2, 3, 4, 5, 6, 1, 2, 1, 3, 2, 4, 4)

scala> rdd1.union(rdd2).distinct.collect
res129: Array[Int] = Array(4, 1, 5, 6, 2, 3)


---------------------------------------------------
 RDD Types:
 ---------------
 1. Normal RDD
 2. Pair RDD

---------------------------------------------------
Importance of `Pair RDD`
---------------------------------------------------
val r1 = sc.parallelize(List(1,2,3,4,5,6,7,8),1)
val r2 = sc.parallelize(List("raj", "venkat", "ravi", "anil", "rajesh", "dev", "kalyan", "raju"),1)

---------------------------------------------------
scala> val r1 = sc.parallelize(List(1,2,3,4,5,6,7,8),1)
r1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[166] at parallelize at <console>:24

scala> val r2 = sc.parallelize(List("raj", "venkat", "ravi", "anil", "rajesh", "dev", "kalyan", "raju"),1)
r2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[167] at parallelize at <console>:24

scala> r1.zip(r2)
res130: org.apache.spark.rdd.RDD[(Int, String)] = ZippedPartitionsRDD2[168] at zip at <console>:29

scala> val prdd1 = r1.zip(r2)
prdd1: org.apache.spark.rdd.RDD[(Int, String)] = ZippedPartitionsRDD2[169] at zip at <console>:28

scala> prdd1.collect
res131: Array[(Int, String)] = Array((1,raj), (2,venkat), (3,ravi), (4,anil), (5,rajesh), (6,dev), (7,kalyan), (8,raju))

scala> prdd1.collect.foreach(println)
(1,raj)
(2,venkat)
(3,ravi)
(4,anil)
(5,rajesh)
(6,dev)
(7,kalyan)
(8,raju)

---------------------------------------------------
val ltuples = List((1,"raj"), (2,"venkat"), (3,"ravi"), (4,"anil"), (5,"rajesh"), (6,"dev"), (7,"kalyan"), (8,"raju"))

val prdd1 = sc.parallelize(ltuples, 2)

---------------------------------------------------
scala> val ltuples = List((1,"raj"), (2,"venkat"), (3,"ravi"), (4,"anil"), (5,"rajesh"), (6,"dev"), (7,"kalyan"), (8,"raju"))
ltuples: List[(Int, String)] = List((1,raj), (2,venkat), (3,ravi), (4,anil), (5,rajesh), (6,dev), (7,kalyan), (8,raju))

scala> val prdd1 = sc.parallelize(ltuples, 2)
prdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[170] at parallelize at <console>:26

---------------------------------------------------
Importance of `aggregateByKey`
---------------------------------------------------

scala> prdd1.aggregateByKey
<console>:29: error: ambiguous reference to overloaded definition,
both method aggregateByKey in class PairRDDFunctions of type [U](zeroValue: U, numPartitions: Int)(seqOp: (U, String) => U, combOp: (U, U) => U)(implicit evidence$2: scala.reflect.ClassTag[U])org.apache.spark.rdd.RDD[(Int, U)]
and  method aggregateByKey in class PairRDDFunctions of type [U](zeroValue: U, partitioner: org.apache.spark.Partitioner)(seqOp: (U, String) => U, combOp: (U, U) => U)(implicit evidence$1: scala.reflect.ClassTag[U])org.apache.spark.rdd.RDD[(Int, U)]
match expected type ?
       prdd1.aggregateByKey
             ^

---------------------------------------------------
def aggregateByKey(zeroValue: U, numPartitions: Int)(seqOp: (U, String) => U, combOp: (U, U) => U)
---------------------------------------------------
val ltuples1 = List(("raj",1), ("venkat",4), ("anil",2), ("anil",5), ("kalyan",2), ("raj",4), ("kalyan",6), ("raju",4))

val prdd2 = sc.parallelize(ltuples1, 2)

---------------------------------------------------
scala> prdd2.aggregateByKey
<console>:29: error: ambiguous reference to overloaded definition,
both method aggregateByKey in class PairRDDFunctions of type [U](zeroValue: U, numPartitions: Int)(seqOp: (U, Int) => U, combOp: (U, U) => U)(implicit evidence$2: scala.reflect.ClassTag[U])org.apache.spark.rdd.RDD[(String, U)]
and  method aggregateByKey in class PairRDDFunctions of type [U](zeroValue: U, partitioner: org.apache.spark.Partitioner)(seqOp: (U, Int) => U, combOp: (U, U) => U)(implicit evidence$1: scala.reflect.ClassTag[U])org.apache.spark.rdd.RDD[(String, U)]
match expected type ?
       prdd2.aggregateByKey
             ^



---------------------------------------------------
def aggregateByKey(zeroValue: U, numPartitions: Int)(seqOp: (U, Int) => U, combOp: (U, U) => U)
---------------------------------------------------
val zeroValue = U
def seqOp(U, Int) : U = {}
def combOp(U, U) : U = {}
---------------------------------------------------
Note: replace `U` with `Int` to sum the values

val zeroValue = 0
def seqOp(Int, Int) : Int = {}
def combOp(Int, Int) : Int = {}

---------------------------------------------------
val zeroValue = 0

/** Adds the next value `data` to the running total `res`; used as the `seqOp` of `aggregateByKey`. */
def seqOp(res: Int, data: Int): Int = res + data

/** Adds two partial totals `res1` and `res2`; used as the `combOp` of `aggregateByKey`. */
def combOp(res1: Int, res2: Int): Int = res1 + res2

prdd2.aggregateByKey(zeroValue)(seqOp, combOp)

---------------------------------------------------


scala> val zeroValue = 0
zeroValue: Int = 0

scala> def seqOp(res: Int, data: Int) : Int = {
     |  res + data
     | }
seqOp: (res: Int, data: Int)Int

scala> def combOp(res1: Int, res2: Int) : Int = {
     |  res1 + res2
     | }
combOp: (res1: Int, res2: Int)Int


---------------------------------------------------

scala> prdd2.aggregateByKey(zeroValue)(seqOp, combOp)
res135: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[172] at aggregateByKey at <console>:41

scala> prdd2.aggregateByKey(zeroValue)(seqOp, combOp).collect
res136: Array[(String, Int)] = Array((kalyan,8), (anil,7), (raju,4), (venkat,4), (raj,5))

scala> prdd2.aggregateByKey(zeroValue)(seqOp, combOp).collect.foreach(println)
(kalyan,8)
(anil,7)
(raju,4)
(venkat,4)
(raj,5)

---------------------------------------------------
Importance of `combineByKey`
---------------------------------------------------
scala> prdd2.combineByKey
<console>:29: error: ambiguous reference to overloaded definition,
both method combineByKey in class PairRDDFunctions of type [C](createCombiner: Int => C, mergeValue: (C, Int) => C, mergeCombiners: (C, C) => C)org.apache.spark.rdd.RDD[(String, C)]
and  method combineByKey in class PairRDDFunctions of type [C](createCombiner: Int => C, mergeValue: (C, Int) => C, mergeCombiners: (C, C) => C, numPartitions: Int)org.apache.spark.rdd.RDD[(String, C)]
match expected type ?
       prdd2.combineByKey
             ^


---------------------------------------------------
def combineByKey(createCombiner: Int => C, mergeValue: (C, Int) => C, mergeCombiners: (C, C) => C)
---------------------------------------------------

def createCombiner(Int) : C = {}
def mergeValue(C, Int) : C = {}
def mergeCombiners(C, C) : C = {}

---------------------------------------------------
/** Starts the combiner for a key: a one-element list holding the first value `data`. */
def createCombiner(data: Int): List[Int] = List(data)

/** Appends the next value `data` to the end of the list `res` collected so far. */
def mergeValue(res: List[Int], data: Int): List[Int] = res ::: List(data)

/** Joins two partial lists, keeping the elements of `res1` ahead of those of `res2`. */
def mergeCombiners(res1: List[Int], res2: List[Int]): List[Int] = res1 ++ res2

prdd2.combineByKey(createCombiner, mergeValue, mergeCombiners)

---------------------------------------------------


scala> def createCombiner(data: Int) : List[Int] = {
     |  data :: Nil
     | }
createCombiner: (data: Int)List[Int]

scala> def mergeValue(res: List[Int], data: Int) : List[Int] = {
     |  res :+ data
     | }
mergeValue: (res: List[Int], data: Int)List[Int]

scala> def mergeCombiners(res1: List[Int], res2: List[Int]) : List[Int] = {
     |  res1 ::: res2
     | }
mergeCombiners: (res1: List[Int], res2: List[Int])List[Int]

scala> prdd2.combineByKey(createCombiner, mergeValue, mergeCombiners)
res139: org.apache.spark.rdd.RDD[(String, List[Int])] = ShuffledRDD[175] at combineByKey at <console>:41

scala> prdd2.combineByKey(createCombiner, mergeValue, mergeCombiners).collect
res140: Array[(String, List[Int])] = Array((kalyan,List(2, 6)), (anil,List(2, 5)), (raju,List(4)), (venkat,List(4)), (raj,List(1, 4)))

scala> prdd2.combineByKey(createCombiner, mergeValue, mergeCombiners).collect.foreach(println)
(kalyan,List(2, 6))
(anil,List(2, 5))
(raju,List(4))
(venkat,List(4))
(raj,List(1, 4))


---------------------------------------------------
scala> val pr1 = prdd2.combineByKey(createCombiner, mergeValue, mergeCombiners)
pr1: org.apache.spark.rdd.RDD[(String, List[Int])] = ShuffledRDD[178] at combineByKey at <console>:40

scala> pr1.collect
res142: Array[(String, List[Int])] = Array((kalyan,List(2, 6)), (anil,List(2, 5)), (raju,List(4)), (venkat,List(4)), (raj,List(1, 4)))

scala> pr1.map( t => (t._1, t._2.sum))
res143: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[179] at map at <console>:43

scala> pr1.map( t => (t._1, t._2.sum)).collect
res144: Array[(String, Int)] = Array((kalyan,8), (anil,7), (raju,4), (venkat,4), (raj,5))


scala> pr1.map( t => (t._1, t._2.size)).collect
res146: Array[(String, Int)] = Array((kalyan,2), (anil,2), (raju,1), (venkat,1), (raj,2))

scala> pr1.map( t => (t._1, t._2.min)).collect
res147: Array[(String, Int)] = Array((kalyan,2), (anil,2), (raju,4), (venkat,4), (raj,1))

scala> pr1.map( t => (t._1, t._2.max)).collect
res148: Array[(String, Int)] = Array((kalyan,6), (anil,5), (raju,4), (venkat,4), (raj,4))


---------------------------------------------------



---------------------------------------------------


---------------------------------------------------

---------------------------------------------------


---------------------------------------------------



---------------------------------------------------


---------------------------------------------------

SPARK_DAY1_PRACTICE

Spark:
---------------------
Apache Spark™ is a fast and general engine for large-scale data processing.


Spark Libraries on Spark-Core:
--------------------------------
1. Spark SQL
2. Spark Streaming
3. Spark MLLib
4. Spark GraphX


Spark Supports 4 Programming Languages:
------------------------------------------
Scala, Java, Python, R


How to Start Spark from the Command Line:
--------------------------------------------
Scala => $SPARK_HOME/bin/spark-shell
Python => $SPARK_HOME/bin/pyspark
R => $SPARK_HOME/bin/sparkR


Spark-2.x:
--------------------------------------------
Spark context available as 'sc'
Spark session available as 'spark'


Spark-1.x:
--------------------------------------------
Spark context available as 'sc'
Spark SQLContext available as 'sqlContext'


Note:
------------
`Spark Context` is the `Entry Point` for any `Spark Operations`.


Resilient Distributed DataSets (RDD):
--------------------------------------------

RDD Features:
----------------
1. Immutability
2. Lazy Evaluation
3. Cacheable
4. Type Inference


RDD Operations:
-----------------
1. Transformations

Convert `RDD` into `RDD`

ex: old_rdd ==> new_rdd


2. Actions

Convert `RDD` into `Result`

ex: rdd ==> result



How to Create a RDD?
-----------------------------------
We can create RDD in 2 ways
1. from collections (List, Set, Seq, ...)
2. from data sets (text, csv, tsv, json, ...)


How to create RDD from `collections`?
---------------------------------------
val list = List(1,2,3,4,5,6)

val rdd = sc.parallelize(<your collection>, <no.of partitions>)
val rdd = sc.parallelize(list,6)

Create an RDD with the default number of Partitions:
--------------------------------------
val rdd = sc.parallelize(list)


scala> val list = List(1,2,3,4,5,6)
list: List[Int] = List(1, 2, 3, 4, 5, 6)

scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26

scala> rdd.getNumPartitions
res0: Int = 4


Create a RDD with 2 Partitions:
--------------------------------------
val rdd = sc.parallelize(list, 2)

scala> val rdd = sc.parallelize(list, 2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:26

scala> rdd.getNumPartitions
res1: Int = 2

---------------------------------------------------

scala> rdd.collect
res3: Array[Int] = Array(1, 2, 3, 4, 5, 6)

scala> rdd.collect.foreach(println)
1
2
3
4
5
6


---------------------------------------------------

scala> rdd.foreach(println)
1
2
3
4
5
6

scala> rdd.foreach(println)
4
1
2
3
5
6


---------------------------------------------------

Note:
------------
`collect` will collect the data from `all the partitions`

`collect` will ensure the `order of the data`

Don't use `collect` in `Real-Time Use Cases`: it returns the data of all the partitions to the driver program, so on large data sets it will end up with `MEMORY issues`.

---------------------------------------------------

x.map(data => (data, data))

or

x.map(y => (y, y))

or

x.map(z => (z, z))

or

/** Pairs the value `p` with itself, i.e. returns `(p, p)`. */
def f(p: Int): (Int, Int) = p -> p

x.map(f)


---------------------------------------------------
Importance of `mapPartitionsWithIndex`
---------------------------------------------------

scala> rdd.mapPartitionsWithIndex
<console>:29: error: missing argument list for method mapPartitionsWithIndex in class RDD
Unapplied methods are only converted to functions when a function type is expected.
You can make this conversion explicit by writing `mapPartitionsWithIndex _` or `mapPartitionsWithIndex(_,_)(_)` instead of `mapPartitionsWithIndex`.
       rdd.mapPartitionsWithIndex
           ^

scala> rdd.mapPartitionsWithIndex()
<console>:29: error: not enough arguments for method mapPartitionsWithIndex: (f: (Int, Iterator[Int]) => Iterator[U], preservesPartitioning: Boolean)(implicit evidence$8: scala.reflect.ClassTag[U])org.apache.spark.rdd.RDD[U].
Unspecified value parameter f.
       rdd.mapPartitionsWithIndex()
                                 ^

---------------------------------------------------

def mapPartitionsWithIndex(f: (Int, Iterator[Int]) => Iterator[U], preservesPartitioning: Boolean)

def f(Int, Iterator[Int]) : Iterator[U]

---------------------------------------------------


/**
 * Labels every element of a partition with the index of that partition.
 *
 * @param index index of the partition, as passed in by `mapPartitionsWithIndex`
 * @param it    the `Int` elements of that partition
 * @return one `index: <index>, data: <element>` string per element, in the original order
 */
def myFunc1(index: Int, it: Iterator[Int]): Iterator[String] =
  // Map the iterator directly: going through `toList` first would copy the whole partition into memory.
  it.map(data => s"index: $index, data: $data")




---------------------------------------------------

rdd.mapPartitionsWithIndex(myFunc1)

rdd.mapPartitionsWithIndex(myFunc1).collect

rdd.mapPartitionsWithIndex(myFunc1).collect.foreach(println)

---------------------------------------------------

scala> def myFunc1(index: Int, it: Iterator[Int]) : Iterator[String]= {
     |  it.toList.map(data => s"index: $index, data: $data").iterator
     | }
myFunc1: (index: Int, it: Iterator[Int])Iterator[String]


---------------------------------------------------

scala> rdd.mapPartitionsWithIndex(myFunc1)
res28: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at mapPartitionsWithIndex at <console>:31

scala> rdd.mapPartitionsWithIndex(myFunc1).collect
res29: Array[String] = Array(index: 0, data: 1, index: 0, data: 2, index: 0, data: 3, index: 1, data: 4, index: 1, data: 5, index: 1, data: 6)

scala> rdd.mapPartitionsWithIndex(myFunc1).collect.foreach(println)
index: 0, data: 1
index: 0, data: 2
index: 0, data: 3
index: 1, data: 4
index: 1, data: 5
index: 1, data: 6


---------------------------------------------------

val names = List("kalyan", "anil", "raj", "sunil", "rajesh", "dev")

val rdd1 = sc.parallelize(names, 2)


// the below one won't work: `myFunc1` expects an `Iterator[Int]`, but `rdd1` is an `RDD[String]`
rdd1.mapPartitionsWithIndex(myFunc1)


---------------------------------------------------

scala> val names = List("kalyan", "anil", "raj", "sunil", "rajesh", "dev")
names: List[String] = List(kalyan, anil, raj, sunil, rajesh, dev)

scala> val rdd1 = sc.parallelize(names, 2)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at <console>:26

---------------------------------------------------

scala> rdd1.mapPartitionsWithIndex(myFunc1)
<console>:31: error: type mismatch;
 found   : (Int, Iterator[Int]) => Iterator[String]
 required: (Int, Iterator[String]) => Iterator[?]
Error occurred in an application involving default arguments.
       rdd1.mapPartitionsWithIndex(myFunc1)
                                   ^




---------------------------------------------------


/**
 * Labels every element of a partition with the index of that partition.
 *
 * @param index index of the partition, as passed in by `mapPartitionsWithIndex`
 * @param it    the `String` elements of that partition
 * @return one `index: <index>, data: <element>` string per element, in the original order
 */
def myFunc2(index: Int, it: Iterator[String]): Iterator[String] =
  // Map the iterator directly: going through `toList` first would copy the whole partition into memory.
  it.map(data => s"index: $index, data: $data")

rdd1.mapPartitionsWithIndex(myFunc2)

---------------------------------------------------

scala> def myFunc2(index: Int, it: Iterator[String]) : Iterator[String]= {
     |  it.toList.map(data => s"index: $index, data: $data").iterator
     | }
myFunc2: (index: Int, it: Iterator[String])Iterator[String]

scala> rdd1.mapPartitionsWithIndex(myFunc2)
res32: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at mapPartitionsWithIndex at <console>:31

scala> rdd1.mapPartitionsWithIndex(myFunc2).collect
res33: Array[String] = Array(index: 0, data: kalyan, index: 0, data: anil, index: 0, data: raj, index: 1, data: sunil, index: 1, data: rajesh, index: 1, data: dev)

scala> rdd1.mapPartitionsWithIndex(myFunc2).collect.foreach(println)
index: 0, data: kalyan
index: 0, data: anil
index: 0, data: raj
index: 1, data: sunil
index: 1, data: rajesh
index: 1, data: dev



---------------------------------------------------
How to create `Generalized Function`:
------------------------------------------
/**
 * Labels every element of a partition with the index of that partition, for any element type.
 *
 * @tparam T element type of the partition
 * @param index index of the partition, as passed in by `mapPartitionsWithIndex`
 * @param it    the elements of that partition
 * @return one `index: <index>, data: <element>` string per element, in the original order
 */
def myFunc[T](index: Int, it: Iterator[T]): Iterator[String] =
  // Map the iterator directly: going through `toList` first would copy the whole partition into memory.
  it.map(data => s"index: $index, data: $data")

rdd.mapPartitionsWithIndex(myFunc).collect.foreach(println)

rdd1.mapPartitionsWithIndex(myFunc).collect.foreach(println)


---------------------------------------------------


scala> def myFunc[T](index: Int, it: Iterator[T]) : Iterator[String]= {
     |  it.toList.map(data => s"index: $index, data: $data").iterator
     | }
myFunc: [T](index: Int, it: Iterator[T])Iterator[String]

scala> rdd.mapPartitionsWithIndex(myFunc).collect.foreach(println)
index: 0, data: 1
index: 0, data: 2
index: 0, data: 3
index: 1, data: 4
index: 1, data: 5
index: 1, data: 6

scala> rdd1.mapPartitionsWithIndex(myFunc).collect.foreach(println)
index: 0, data: kalyan
index: 0, data: anil
index: 0, data: raj
index: 1, data: sunil
index: 1, data: rajesh
index: 1, data: dev

---------------------------------------------------

scala> rdd.aggregate
<console>:29: error: missing argument list for method aggregate in class RDD
Unapplied methods are only converted to functions when a function type is expected.
You can make this conversion explicit by writing `aggregate _` or `aggregate(_)(_,_)(_)` instead of `aggregate`.
       rdd.aggregate
           ^

scala> rdd.aggregate()
<console>:29: error: missing argument list for method aggregate in class RDD
Unapplied methods are only converted to functions when a function type is expected.
You can make this conversion explicit by writing `aggregate _` or `aggregate(_)(_,_)(_)` instead of `aggregate`.
       rdd.aggregate()
                    ^

scala> rdd.aggregate()()
<console>:29: error: not enough arguments for method aggregate: (seqOp: (Unit, Int) => Unit, combOp: (Unit, Unit) => Unit)(implicit evidence$29: scala.reflect.ClassTag[Unit])Unit.
Unspecified value parameters seqOp, combOp.
       rdd.aggregate()()
                      ^

---------------------------------------------------

def aggregate(zeroValue)(seqOp: (Unit, Int) => Unit, combOp: (Unit, Unit) => Unit)

---------------------------------------------------

val zeroValue = Unit

def seqOp(Unit, Int) : Unit = {}

def combOp(Unit, Unit) : Unit = {}

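Note: `Unit` stands for the expected `Return Type`; it shows up here only because `aggregate()` was called without a zero value, so replace it with the type you want back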

---------------------------------------------------
Implementation of `aggregate`
---------------------------------------------------

Example1: Sum the RDD (Unit => Int)
---------------------------------------------
Note: replace `Unit` with `Int` in all the places

val zeroValue = 0

/** Adds the next element `data` to the running sum `res`; used as the `seqOp` of `aggregate`. */
def seqOp(res: Int, data: Int): Int = res + data

/** Adds two partial sums `res1` and `res2`; used as the `combOp` of `aggregate`. */
def combOp(res1: Int, res2: Int): Int = res1 + res2

rdd.aggregate(zeroValue)(seqOp,combOp)

---------------------------------------------------

scala> val zeroValue = 0
zeroValue: Int = 0

scala> def seqOp(res: Int, data: Int) : Int = {
     |  res + data
     | }
seqOp: (res: Int, data: Int)Int

scala> def combOp(res1: Int, res2: Int) : Int = {
     |  res1 + res2
     | }
combOp: (res1: Int, res2: Int)Int

scala> rdd.aggregate(zeroValue)(seqOp,combOp)
res40: Int = 21



---------------------------------------------------

Example2: Multiply the RDD (Unit => Int)
---------------------------------------------
Note: replace `Unit` with `Int` in all the places

val zeroValue = 1

/** Multiplies the running product `res` by the next element `data`; used as the `seqOp` of `aggregate`. */
def seqOp(res: Int, data: Int): Int = res * data

/** Multiplies two partial products `res1` and `res2`; used as the `combOp` of `aggregate`. */
def combOp(res1: Int, res2: Int): Int = res1 * res2

rdd.aggregate(zeroValue)(seqOp,combOp)


---------------------------------------------------

scala> val zeroValue = 1
zeroValue: Int = 1

scala> def seqOp(res: Int, data: Int) : Int = {
     |  res * data
     | }
seqOp: (res: Int, data: Int)Int

scala> def combOp(res1: Int, res2: Int) : Int = {
     |  res1 * res2
     | }
combOp: (res1: Int, res2: Int)Int

scala> rdd.aggregate(zeroValue)(seqOp,combOp)
res41: Int = 720


---------------------------------------------------
Find the `average` of 1 to 10 using `aggregate` function?
---------------------------------------------------


---------------------------------------------------



---------------------------------------------------

scala> rdd.reduce
<console>:29: error: missing argument list for method reduce in class RDD
Unapplied methods are only converted to functions when a function type is expected.
You can make this conversion explicit by writing `reduce _` or `reduce(_)` instead of `reduce`.
       rdd.reduce
           ^

scala> rdd.reduce()
<console>:29: error: not enough arguments for method reduce: (f: (Int, Int) => Int)Int.
Unspecified value parameter f.
       rdd.reduce()
                 ^
---------------------------------------------------

def f(Int, Int) : Int

---------------------------------------------------

/** Adds `data` to `res`; passed to `reduce` to sum the RDD. */
def sum(res: Int, data: Int): Int = res + data

/** Multiplies `res` by `data`; passed to `reduce` to multiply the RDD. */
def mul(res: Int, data: Int): Int = res * data

---------------------------------------------------

scala> def sum(res: Int, data: Int) : Int = {res + data}
sum: (res: Int, data: Int)Int

scala> def mul(res: Int, data: Int) : Int = {res * data}
mul: (res: Int, data: Int)Int

scala> rdd.reduce(sum)
res44: Int = 21

scala> rdd.reduce(mul)
res45: Int = 720

---------------------------------------------------
Compare `aggregate` and `reduce`:
-----------------------------------------

aggregate(zeroValue)(seqOp, combOp)


aggregate(initialValue)(seqOp, seqOp)

<==>

reduce(seqOp)

---------------------------------------------------

Transformations in Spark:
----------------------------
scala> rdd.collect.foreach(println)
1
2
3
4
5
6

scala> rdd.map(x => x + 1)
res49: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[15] at map at <console>:29

scala> rdd.map(x => x + 1).collect.foreach(println)
2
3
4
5
6
7

scala> rdd.map(x => x * x).collect.foreach(println)
1
4
9
16
25
36

scala> rdd.map(x => x * x * x).collect.foreach(println)
1
8
27
64
125
216

---------------------------------------------------

scala> rdd.filter(x => x > 3)
res53: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[19] at filter at <console>:29

scala> rdd.filter(x => x > 3).collect.foreach(println)
4
5
6

scala> rdd.filter(x => x % 2 == 1).collect.foreach(println)
1
3
5

scala> rdd.filter(x => x % 2 == 0).collect.foreach(println)
2
4
6


---------------------------------------------------

scala> rdd.filter(x => x % 2 == 0).collect.foreach(println)
2
4
6

scala> rdd.filter(_ % 2 == 0).collect.foreach(println)
2
4
6

scala> rdd.filter(x => x > 3).collect.foreach(println)
4
5
6

scala> rdd.filter(_ > 3).collect.foreach(println)
4
5
6

scala> rdd.filter(_ > 3).collect.foreach(x => println(x))
4
5
6

scala> rdd.filter(_ > 3).collect.foreach(println)
4
5
6

scala> rdd.filter(_ > 3).collect.foreach(println(_))
4
5
6


---------------------------------------------------

val data = sc.textFile("hdfs:///user/tsipl1059/lakshmi/spark_sample.txt")
data: org.apache.spark.rdd.RDD[String] = hdfs:///user/tsipl1059/lakshmi/spark_sample.txt MapPartitionsRDD[1] at textFile at <console>:24

data.collect()
res0: Array[String] = Array(this is my first spark smaple data., Load data into RDD using scala language., "testing the data ", count the number of lines in a file)

---------------------------------------------------

---------------------------------------------------

val r2 = sc.textFile("/user/tsipl1115/input.txt") // the file is located in HDFS and is loaded into an RDD

---------------------------------------------------

val r1 = sc.textFile("/home/tsipl1115/input.txt") // the file is located on the local file system and is loaded into an RDD (if the default file system is HDFS, use the `file://` prefix for a local path)

---------------------------------------------------


---------------------------------------------------

---------------------------------------------------


---------------------------------------------------



---------------------------------------------------


---------------------------------------------------

---------------------------------------------------


---------------------------------------------------



---------------------------------------------------


---------------------------------------------------

spark_streaming_examples

Create Spark Streaming Context: ========================================== scala: --------------- import org.apache.spark._ import ...