Thursday, 18 May 2017

Hadoop/MR vs Spark/RDD Example by Word count analysis



Apache Spark provides an efficient way to run iterative algorithms by keeping the intermediate data in memory. This avoids the overhead of writing the intermediate data to disk and reading it back, as happens with MapReduce (MR).

Also, when the same operation is run again and again, the data can be cached in memory and fetched from there instead of being recomputed. MR is stateless: if an MR program is executed 10 times, the whole data set has to be scanned 10 times.
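To make the caching point concrete, here is a minimal sketch in plain Python. It is not Spark's API: ToyDataset, read_input and the sample lines are made up for illustration, and the only thing it shows is how often the input gets scanned with and without caching.

```python
# Toy stand-in for a lazily evaluated dataset; this is NOT Spark's API.
class ToyDataset:
    def __init__(self, source):
        self.source = source      # callable that "scans" the input and returns a list
        self.scans = 0            # how many times the input was scanned
        self._cached = None
        self._use_cache = False

    def cache(self):
        """Ask for the data to be kept in memory after the first scan."""
        self._use_cache = True
        return self

    def collect(self):
        # Serve from memory when caching is on and the data is already there.
        if self._use_cache and self._cached is not None:
            return self._cached
        self.scans += 1
        data = self.source()
        if self._use_cache:
            self._cached = data
        return data


def read_input():
    # Hypothetical input; in practice this would be a read from disk or HDFS.
    return ["a b", "b c", "c a"]


uncached = ToyDataset(read_input)
cached = ToyDataset(read_input).cache()

# Run the same operation 10 times on both datasets.
for _ in range(10):
    uncached.collect()
    cached.collect()

print("scans without cache:", uncached.scans)
print("scans with cache:", cached.scans)
```

Without caching every run goes back to the input, which is roughly the situation with repeated MR jobs; with caching only the first run does.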


Also, as the name suggests, MR supports only the Map and Reduce operations, and everything else (join, groupby etc.) has to be fitted into the Map and Reduce model, which might not be the most efficient way. Spark supports several other transformations and actions besides Map and Reduce, as mentioned here and here.
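As a rough illustration of what "fitting a join into the Map and Reduce model" means, below is a sketch of a reduce-side join in plain Python. The users and orders data, and the function names, are hypothetical; the shuffle function only imitates the grouping a MapReduce framework performs between the two phases.

```python
from collections import defaultdict


def map_phase(users, orders):
    """Emit (key, tagged_value) pairs, tagging each value with its source."""
    for user_id, name in users:
        yield user_id, ("user", name)
    for user_id, item in orders:
        yield user_id, ("order", item)


def shuffle(pairs):
    """Group values by key, as the framework does between map and reduce."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups


def reduce_phase(key, values):
    """Pair every user record with every order record for the same key."""
    names = [v for tag, v in values if tag == "user"]
    items = [v for tag, v in values if tag == "order"]
    return [(key, name, item) for name in names for item in items]


# Made-up sample data: (user_id, name) and (user_id, item).
users = [(1, "alice"), (2, "bob")]
orders = [(1, "book"), (1, "pen"), (2, "lamp"), (3, "mug")]

joined = []
for key, values in sorted(shuffle(map_phase(users, orders)).items()):
    joined.extend(reduce_phase(key, values))

print(joined)
```

The order with user_id 3 has no matching user and drops out, which makes this an inner join. In Spark the same thing is a single join call on two pair RDDs.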

Spark code is also compact when compared to the MR code. Below is the program for performing the WordCount using Python in Spark.

from pyspark import SparkContext


sc = SparkContext("spark://bigdata-vm:7077", "WordCount")

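# logFile was not defined in the original listing; the path below is only a placeholder
logFile = "hdfs://localhost:9000/user/bigdatavm/input"

textFile = sc.textFile(logFile)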

wordCounts = textFile.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)

wordCounts.saveAsTextFile("hdfs://localhost:9000/user/bigdatavm/output")

The same program in the MR model using Hadoop is a bit more verbose, as shown below.
package org.apache.hadoop.examples;


import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

    public static class TokenizerMapper extends
        Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends
        Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
        Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args)
            .getRemainingArgs();

        // Job.getInstance replaces the deprecated Job constructor
        Job job = Job.getInstance(conf, "word count");

        job.setJarByClass(WordCount.class);

        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Here we looked into WordCount, which is similar to a HelloWorld program in terms of simplicity and is a good way to get started with a new concept/technology/language. In future blogs, we will look into some more advanced features of Spark.
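For anyone who wants to see what the three steps of the Spark WordCount compute without a cluster, here is a small plain-Python sketch. The two input lines are made up, and the list comprehensions only mimic flatMap, map and reduceByKey on in-memory data.

```python
from collections import defaultdict

# Made-up input standing in for the lines of the text file.
lines = ["to be or not to be", "to do"]

# flatMap: one line in, many words out
words = [word for line in lines for word in line.split()]

# map: each word becomes a (word, 1) pair
pairs = [(word, 1) for word in words]

# reduceByKey: values with the same key are merged with a + b
counts = defaultdict(int)
for word, one in pairs:
    counts[word] = counts[word] + one

word_counts = dict(counts)
print(word_counts)
```

Here "to" ends up with a count of 3 and "be" with 2, and the rest of the words with 1 each.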

Analyzing the airline dataset with Spark/Python


In this blog we will see how to do analytics on the airline dataset with Spark using Python. Programs in Spark can be implemented in Scala (Spark is built using Scala), Java, Python and the recently added R.

The processing took 5 min 30 sec. I am not sure why Spark is not faster than MR as claimed, and need to look into it a bit more. Here is the output (12) of the Python program.



The Spark program is a bit more concise (short) when compared to the corresponding MR program. For those who are new to Spark, here is the Spark Python API.


from pyspark import SparkConf
from pyspark import SparkContext

import csv


def extractData(record):
    # split the input record by comma
    splits = record.split(',')

    # extract the actual/scheduled departure time and the origin
    actualDepTime = splits[4]
    scheduledDepTime = splits[5]
    origin = splits[16]

    # 1 delayed
    # 0 don't know or not delayed
    delayed = 0

    # check if the actual/scheduled departure times are digits
    if actualDepTime.isdigit() and scheduledDepTime.isdigit():
        # check if the flight got delayed or not
        if int(actualDepTime) > int(scheduledDepTime):
            delayed = 1

    # return the origin and delayed status as a tuple
    return origin, delayed


# create the SparkConf() instance on which the different configurations can be done
conf = SparkConf().setMaster("spark://bigdata-server:7077")

# establish a connection to Spark
sc = SparkContext(conf=conf, appName="flightDataAnalysis")

# load the input data
lines = sc.textFile("hdfs://localhost:9000/user/bigdata/airline/input")

# figure out the delayed flights and cache the data as it is being processed twice later
delayedFlights = lines.map(extractData).cache()

# get the number of delayed flights per origin by summing the 0/1 flags
delayedFlights.reduceByKey(lambda a, b: a + b).saveAsTextFile("hdfs://localhost:9000/user/bigdata/airline/spark-output/delayedFlights")

# get the total flights per origin
totalFlights = delayedFlights.countByKey()

# totalFlights is a dictionary; iterate over it and write it to a file
with open("/home/bigdata/WorkSpace/PyCharm/AirlineDataset/output/totalFlights/totalFlights.csv", "w") as f:
    w = csv.writer(f)
    for key, val in totalFlights.items():
        w.writerow([key, val])
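The per-record logic can be tried out locally without Spark. The sketch below is plain Python under a few assumptions: the records are made up (only columns 4, 5 and 16 are filled in), make_record is a hypothetical helper for building them, and the delayed flags are summed per origin on the assumption that a per-origin count of delayed flights is what is wanted.

```python
from collections import Counter


def extract_data(record):
    """Return (origin, delayed) where delayed is 1 if the flight left late, else 0."""
    splits = record.split(',')
    actual, scheduled, origin = splits[4], splits[5], splits[16]
    delayed = 0
    if actual.isdigit() and scheduled.isdigit():
        if int(actual) > int(scheduled):
            delayed = 1
    return origin, delayed


def make_record(actual, scheduled, origin):
    """Hypothetical helper: build a 17-column CSV row with only the used columns set."""
    fields = [""] * 17
    fields[4], fields[5], fields[16] = actual, scheduled, origin
    return ",".join(fields)


# Made-up sample rows, not taken from the real airline dataset.
records = [
    make_record("1010", "1000", "SFO"),  # delayed
    make_record("0955", "1000", "SFO"),  # not delayed
    make_record("NA", "1200", "JFK"),    # unknown, counted as not delayed
    make_record("1305", "1300", "JFK"),  # delayed
]

pairs = [extract_data(r) for r in records]

delayed_by_origin = Counter()   # sum of the 0/1 flags per origin
total_by_origin = Counter()     # number of records per origin
for origin, delayed in pairs:
    delayed_by_origin[origin] += delayed
    total_by_origin[origin] += 1

print(dict(delayed_by_origin))
print(dict(total_by_origin))
```

Note that the comparison treats the times as plain integers in HHMM form, so a flight scheduled shortly before midnight that leaves after midnight would not be flagged as delayed.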


As I mentioned in the previous blog, I have a fairly high-end machine for the data processing. Below is the resource usage in the Ubuntu system monitor while the above program is running. The CPU has 8 cores and all of them are running at full steam.



In the previous blog we saw how to convert CSV into the Parquet format using Hive. Now we will look into how to process the same Parquet data with Spark using the DataFrame feature. After processing, the output is stored in the JSON format so that it is human readable.

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext

conf = SparkConf().setMaster("spark://bigdata-server:7077")
sc = SparkContext(conf = conf, appName = "flightDataAnalysis")
sqlContext = SQLContext(sc)

# load the parquet data
df = sqlContext.read.load("hdfs://localhost:9000/user/bigdata/airline/input-parquet")

# find the number of delayed flights per origin
delayedCount = df.filter(df.deptime > df.crsdeptime).groupBy(df.origin).count()

# store the delayed count as JSON
delayedCount.write.json("hdfs://localhost:9000/user/bigdata/airline/spark-output/delayedFlights")
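To get a feel for what the filter, groupBy and count steps produce, here is a local stand-in in plain Python. The rows are made up and use the same column names as the DataFrame; the output is written as one JSON object per line, which is also how Spark lays out its JSON output files.

```python
import json
from collections import Counter

# Made-up rows with the same column names as the Parquet data.
rows = [
    {"origin": "SFO", "deptime": 1010, "crsdeptime": 1000},
    {"origin": "SFO", "deptime": 955, "crsdeptime": 1000},
    {"origin": "JFK", "deptime": 1305, "crsdeptime": 1300},
    {"origin": "SFO", "deptime": 1500, "crsdeptime": 1450},
]

# filter: keep the rows where the actual departure is after the scheduled one
delayed = [row for row in rows if row["deptime"] > row["crsdeptime"]]

# groupBy(origin).count(): number of delayed rows per origin
counts = Counter(row["origin"] for row in delayed)

# one JSON object per line
json_lines = [
    json.dumps({"origin": origin, "count": count})
    for origin, count in sorted(counts.items())
]
print("\n".join(json_lines))
```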


PyCharm was used to develop the program in Python, which was then executed in Spark Standalone mode using the spark-submit command. As shown below, it took 24 s to get the delayed flights.



Here is the output back in HDFS.



Here is the matrix showing the execution times with different software and formats on the same dataset and for the same problem.



In future blogs, we will look into how to do the same processing with other software and also how to optimize it.


Creating a Linux EC2 instance and logging into it

Now that we know how to create a Linux EC2 instance in the AWS cloud and access it, I will create a simple static website on it.

1. When I created the Security Group, port 22 was opened. This enables access to the remote machine. But a WebServer listens on port 80, and that port has to be opened as well.

2. Select the Security Group and click on the Inbound tab.



3. Click on Edit and add the rule for the HTTP inbound traffic as shown below.



Now, the Security Group rules should appear like this.



Note that if the Linux or the Windows instance is running, any changes to the Security Group take effect immediately. There is no need to restart the EC2 instance.

4. The best practice is to update the packages on the Linux instance with `sudo yum update -y`. The `-y` option answers yes to the prompts automatically.

5. Install the WebServer and start it. The first command elevates the user to root, the second installs the WebServer and the last one starts it.

sudo su
yum install -y httpd
service httpd start

The WebServer won't start automatically if the Linux instance is rebooted. To have it started automatically on boot/reboot, use the below command.

chkconfig httpd on

6. Go to the /var/www/html folder and create an index.html file using the below commands


cd /var/www/html
echo "Welcome to thecloudavenue.com" > index.html

7. Get the IP address of the Linux instance from the EC2 management console.


8. Open the same in the browser to get the web page.

The above sequence of steps simply installs a WebServer on the Linux instance and puts a simple web page on it. Much more interesting things can be done. One thing to note is that all the above commands were executed as root, which is not really a good practice, but was done for the sake of simplicity.
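The check in the last step can also be done from a script instead of the browser. Below is a minimal sketch using only the Python standard library. Since the IP address of the instance is not known here, the sketch starts a throwaway local server as a stand-in for the WebServer and fetches the page from it; fetch_page and the temporary folder are made up for illustration. To check the real instance, pass its address to fetch_page instead.

```python
import http.server
import os
import tempfile
import threading
import urllib.request
from functools import partial

# Opener that ignores any proxy settings, so that the local address is reached directly.
opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))


def fetch_page(url, timeout=5):
    """Return (status_code, body_text) for an HTTP GET on url."""
    with opener.open(url, timeout=timeout) as response:
        return response.status, response.read().decode("utf-8")


# Local stand-in for /var/www/html on the instance (assumption for this sketch).
site_dir = tempfile.mkdtemp()
with open(os.path.join(site_dir, "index.html"), "w") as f:
    f.write("Welcome to thecloudavenue.com\n")

# Serve the folder on a free local port in a background thread.
handler = partial(http.server.SimpleHTTPRequestHandler, directory=site_dir)
server = http.server.HTTPServer(("127.0.0.1", 0), handler)  # port 0 = pick a free port
threading.Thread(target=server.serve_forever, daemon=True).start()

status, body = fetch_page("http://127.0.0.1:%d/" % server.server_port)

server.shutdown()
server.server_close()

print(status, body.strip())
```

If the request to the real instance times out, the usual suspect is the inbound rule for port 80 in the Security Group.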

spark_streaming_examples

Create Spark Streaming Context: ========================================== scala: --------------- import org.apache.spark._ import ...