Friday, 7 July 2017

Apache Spark : RDD vs DataFrame vs Dataset



With the Spark 2.0 release, Spark officially provides three data abstractions: RDD, DataFrame and Dataset.
For a new user, it can be confusing to understand the relevance of each one and decide which to use and which to avoid. In this post, I will discuss each of them in detail, along with their differences, pros and cons.


Short Combined Intro :
Before I discuss each one in detail, I want to start with a short combined intro.
These abstractions evolved in this order:
RDD (Spark 1.0) -> DataFrame (Spark 1.3) -> Dataset (Spark 1.6)
RDD is the oldest, available since version 1.0, and Dataset is the newest, available since version 1.6.
Given the same data, each of the three abstractions will compute and return the same results to the user. They differ in performance and in the way they compute.
RDD lets us decide HOW we want to do the computation, which limits the optimisation Spark can do underneath, whereas DataFrame/Dataset lets us decide WHAT we want to do and leaves it to Spark to decide how to do the computation.

We will understand in 2 minutes what is meant by HOW and WHAT.

DataFrame came as a major performance improvement over RDD, but not without some downsides.
This led to the development of Dataset, which is an effort to unify the best of RDD and DataFrame.
In the future, Dataset will eventually replace RDD and DataFrame to become the only API Spark users should be using in code.
Let's understand them in detail one by one.

RDD:

-It is the building block of Spark. No matter which abstraction we use, DataFrame or Dataset, internally the final computation is done on RDDs.
-RDD is a lazily evaluated, immutable, parallel collection of objects exposed with lambda functions.
-The best part about RDD is that it is simple. It provides familiar OOP-style APIs with compile-time safety. We can load data from any source, convert it into an RDD and store it in memory to compute results. An RDD can be easily cached if the same set of data needs to be recomputed.
-But the disadvantage is performance limitations. Being in-memory JVM objects, RDDs involve the overhead of garbage collection and Java (or the slightly better Kryo) serialisation, which are expensive when data grows.
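
The lazy and immutable behaviour described in the points above can be modelled in a few lines of plain Python. This is only a sketch and not the Spark API: LazyCollection, square and the sample numbers are hypothetical names made up for illustration. Transformations return a new collection without computing anything, and only the action (collect) runs the chain.

```python
from typing import Callable, Iterable, List


class LazyCollection:
    """Toy stand-in for an RDD: immutable, and nothing runs until an action."""

    def __init__(self, source: Callable[[], Iterable]):
        # `source` is a zero-argument function that produces the data on demand.
        self._source = source

    def map(self, fn: Callable) -> "LazyCollection":
        # Transformation: returns a NEW collection, computes nothing yet.
        return LazyCollection(lambda: (fn(x) for x in self._source()))

    def filter(self, pred: Callable) -> "LazyCollection":
        # Transformation: also lazy; the original collection is left untouched.
        return LazyCollection(lambda: (x for x in self._source() if pred(x)))

    def collect(self) -> List:
        # Action: only now is the whole chain actually evaluated.
        return list(self._source())


calls = []  # records every element the map function really touches


def square(x: int) -> int:
    calls.append(x)
    return x * x


numbers = LazyCollection(lambda: range(1, 6))
evens_squared = numbers.map(square).filter(lambda x: x % 2 == 0)

touched_before_action = len(calls)  # still 0: no action has run yet
result = evens_squared.collect()
print(touched_before_action, result)  # 0 [4, 16]
```

Because every transformation builds a new object, `numbers` can be reused for other chains, which is the property that lets Spark recompute lost partitions from lineage.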


RDD example:


Dataframe:

-DataFrame is an abstraction which gives a schema view of data. This means it gives us a view of data as columns with column names and type information. We can think of the data in a DataFrame like a table in a database.
-Like RDD, execution in DataFrame is also lazily triggered.
-It offers a huge performance improvement over RDDs because of 2 powerful features it has:

1. Custom Memory management  (aka Project Tungsten)
Data is stored in off-heap memory in binary format. This saves a lot of memory space. Also, there is no garbage collection overhead involved. By knowing the schema of the data in advance and storing it efficiently in binary format, expensive Java serialization is also avoided.

2. Optimized Execution Plans (aka Catalyst Optimizer)
Query plans are created for execution using the Spark Catalyst optimiser. After an optimised execution plan is prepared by going through several steps, the final execution still happens internally on RDDs, but that is completely hidden from the users.

Execution plan stages

Just to give an example of optimisation with respect to the above picture, let's consider the query below:


inefficient query: filter after join



In the above query, the filter is applied after the join, which is a costly shuffle operation. The optimiser sees that and, in the optimised logical plan, pushes the filter to execute before the join. In the optimised execution plan, it can also leverage data source capabilities and push the filter further down to the data source, so that the filter is applied at the disk level rather than bringing all the data into memory and filtering there (which Spark cannot do automatically when we use RDDs directly). So the filter method now effectively works like a WHERE clause in a database query. Also, with optimised data sources like Parquet, if Spark sees that only a few columns are needed to compute the results, it will read and fetch only those columns, saving both disk IO and memory.
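
To see why pushing a filter below a join is safe and cheaper, here is a small plain-Python sketch. It does not use Spark or Catalyst; the people and orders data and the join helper are made up for illustration. Both orderings return the same rows, but the pushed-down version feeds fewer rows into the join.

```python
from typing import Dict, List, Tuple

# Made-up sample data, purely for illustration.
people = [
    {"id": 1, "name": "Ann", "age": 34},
    {"id": 2, "name": "Bob", "age": 19},
    {"id": 3, "name": "Cid", "age": 52},
    {"id": 4, "name": "Dee", "age": 23},
]
orders = [
    {"person_id": 1, "amount": 250},
    {"person_id": 2, "amount": 40},
    {"person_id": 3, "amount": 90},
    {"person_id": 3, "amount": 15},
]


def join(left: List[Dict], right: List[Dict]) -> Tuple[List[Dict], int]:
    """Hash join on id = person_id; also reports how many left rows entered the join."""
    by_id = {row["id"]: row for row in left}
    joined = [
        {**by_id[o["person_id"]], **o} for o in right if o["person_id"] in by_id
    ]
    return joined, len(left)


# As written by the user: join first, filter afterwards.
joined_all, rows_joined_naive = join(people, orders)
naive = [r for r in joined_all if r["age"] > 30]

# What the optimiser effectively does: filter first, then join fewer rows.
older = [p for p in people if p["age"] > 30]
pushed, rows_joined_pushed = join(older, orders)

print(naive == pushed, rows_joined_naive, rows_joined_pushed)  # True 4 2
```

In a real cluster the saving is larger than the row count suggests, because the rows removed early never have to be shuffled across the network.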

-Drawback :

 Lack of Type Safety. As a developer, I do not like using DataFrame as it doesn't seem developer friendly. Referring to attributes by String names means no compile-time safety, so things can fail at runtime. Also, the APIs don't look programmatic and are more SQL-like.
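
The drawback of string-named columns can be illustrated without Spark. In this plain-Python sketch (the rows and the select helper are hypothetical), a column is looked up by its string name, so a missing column is only noticed when the lookup actually executes.

```python
# Made-up rows with only `name` and `age`, mimicking a schema without `salary`.
rows = [{"name": "Ann", "age": 34}, {"name": "Bob", "age": 19}]


def select(rows, column):
    """Look a column up by its string name, the way untyped column access does."""
    return [row[column] for row in rows]


ages = select(rows, "age")  # fine

try:
    select(rows, "salary")  # the missing column is only noticed when this runs
    error = None
except KeyError as exc:
    error = f"column not found: {exc.args[0]}"

print(ages, error)  # [34, 19] column not found: salary
```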

Dataframe example:  

 2 ways to define: 
  
 1. Expression BuilderStyle   
 2. SQL Style


As discussed, if we try to use a column that is not present in the schema, the problem shows up only at runtime. For example, if we try to access salary when only name and age are present in the schema, we will get an exception like the one below:


Dataset:

-It is an extension to the DataFrame API, the latest abstraction, which tries to provide the best of both RDD and DataFrame.
-It comes with OOP style and developer-friendly compile-time safety like RDD, as well as the performance-boosting features of DataFrame: the Catalyst optimiser and custom memory management.
-Where Dataset scores over DataFrame is an additional feature it has: Encoders.
-Encoders act as an interface between JVM objects and the off-heap custom memory binary format data.
-Encoders generate byte code to interact with off-heap data and provide on-demand access to individual attributes without having to de-serialize an entire object.
-A case class is used to define the structure of the data schema in a Dataset. Using a case class, it's very easy to work with a Dataset. The names of the attributes in the case class are directly mapped to the attributes in the Dataset. It feels like working with an RDD, but underneath it works the same as a DataFrame.
A DataFrame is in fact treated as a Dataset of generic Row objects: DataFrame = Dataset[Row]. So we can always convert a DataFrame into a Dataset at any point by calling the ‘as’ method on the DataFrame.
 e.g.  df.as[MyClass]
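
The idea behind converting generic rows into typed objects can be sketched in plain Python with a dataclass standing in for the Scala case class. This is an analogy only: Person, as_typed and the sample rows are hypothetical, and Python itself performs no compile-time check, although a static checker such as mypy can flag an access to a field that does not exist on the class.

```python
from dataclasses import dataclass, fields
from typing import Dict, List


@dataclass(frozen=True)
class Person:
    # Plays the role the case class plays in Scala: it names and types the attributes.
    name: str
    age: int


def as_typed(rows: List[Dict], cls):
    """Rough analogue of df.as[MyClass]: map generic rows onto typed objects by field name."""
    names = [f.name for f in fields(cls)]
    return [cls(**{n: row[n] for n in names}) for row in rows]


generic_rows = [{"name": "Ann", "age": 34}, {"name": "Bob", "age": 19}]  # made-up data
typed_people = as_typed(generic_rows, Person)

# Attributes are now accessed by name on a typed object, as with an RDD of objects.
adults = [p.name for p in typed_people if p.age >= 21]
print(adults)  # ['Ann']
```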

Dataset Example :


The important point to remember is that both Dataset and DataFrame internally do the final execution on RDD objects. The difference is that users do not write code to create the RDD collections and have no direct control over the RDDs. The RDDs are created as the last stage of the execution plan, after going through all the optimizations (see Execution Plan Diagram).
That's why at the beginning of this post I emphasized that RDD lets us decide HOW we want to do the computation, whereas DataFrame/Dataset lets us decide WHAT we want to do.
All these optimisations are possible because the data is structured and Spark knows the schema of the data in advance. So it can apply powerful features like Tungsten's custom off-heap binary memory storage, the Catalyst optimiser and encoders to get performance that would not be possible if users were working directly on RDDs.


Conclusion:

In short, Spark is moving from unstructured computation (RDDs) towards structured computation because of the many performance optimisations it allows. DataFrame was a step in the direction of structured computation but lacked the developer friendliness of compile-time safety and lambda functions. Finally, Dataset is the unification of DataFrame and RDD to bring out the best of the two abstractions.
Going forward, developers should only be concerned with Dataset, while the use of DataFrame and RDD will be discouraged. But it's always better to be aware of the legacy for a better understanding of the internals.
Interestingly, most of these new concepts, like custom memory management (Tungsten), logical/physical plans (Catalyst optimizer) and encoders (Dataset), seem to be inspired by its competitor Apache Flink, which has supported them since its inception. There are other powerful feature enhancements like windowing and sessions coming to Spark which are already in Flink. So it's better to keep a close watch on both Spark and Flink in the coming days.

Wednesday, 21 June 2017

Converting Airline dataset from the row format to columnar format using AWS EMR


To process Big Data, a huge number of machines is required. Instead of buying them, it's better to process the data in the Cloud, as it provides lower CAPEX and OPEX costs. In this blog we will look at processing the airline data set in AWS EMR (Elastic MapReduce). EMR provides Big Data as a service. We don't need to worry about installing, configuring, patching or securing the Big Data software. EMR takes care of that; we just need to specify the size and the number of machines in the cluster, the location of the input/output data and finally the program to run. It's as easy as this.
The Airline dataset is in CSV format, which is efficient for fetching whole rows based on some condition, but not really efficient when we want to do aggregations. So, we will convert the CSV data into Parquet format and then run the same query on the CSV and Parquet data to observe the performance improvement.
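
The difference between a row layout (CSV) and a columnar layout (Parquet) can be pictured with plain Python containers. This is a conceptual sketch with three made-up records and is not how either file format is physically encoded.

```python
# Three made-up flight records, purely for illustration.
rows = [
    ("SFO", 1987, 12),
    ("JFK", 1987, -3),
    ("SFO", 1989, 7),
]

# Row-oriented (CSV-like): each record is stored together.
row_store = rows

# Column-oriented (Parquet-like): each column is stored together.
column_store = {
    "origin": [r[0] for r in rows],
    "year": [r[1] for r in rows],
    "dep_delay": [r[2] for r in rows],
}

# Fetching one whole record is natural in the row layout...
second_record = row_store[1]

# ...while an aggregate over one column touches only that column in the columnar layout.
total_delay_rows = sum(r[2] for r in row_store)    # walks every full record
total_delay_cols = sum(column_store["dep_delay"])  # reads a single list

print(second_record, total_delay_rows, total_delay_cols)  # ('JFK', 1987, -3) 16 16
```

Values of one column also tend to resemble each other, which is one reason columnar files usually compress better.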


Note that using AWS EMR will incur cost and doesn't fall under the AWS free tier, as we would be launching not t2.micro EC2 instances but somewhat bigger ones. I will try to keep the cost to a minimum as this is a demo. Also, I prepared the required scripts ahead of time and tested them on my local machine on small data sets instead of on AWS EMR. This saves on the AWS expenses.

So here are the steps :

Step 1 : Download the Airline data set from here and uncompress it. All the data sets can be downloaded and uncompressed, but to keep the cost to a minimum I downloaded only the 1987, 1989, 1991, 1993 and 2007 data and uploaded it to S3 as shown below.






Step 2 : Create a folder called scripts and upload the scripts to S3.


The '1-create-tables-move-data.sql' script will create the ontime and ontime_parquet_snappy tables, map the data to the ontime table and finally move the data from the ontime table to the ontime_parquet_snappy table, transforming it from the csv to the Parquet format. Below is the SQL for the same.

create external table ontime (
  Year INT,
  Month INT,
  DayofMonth INT,
  DayOfWeek INT,
  DepTime  INT,
  CRSDepTime INT,
  ArrTime INT,
  CRSArrTime INT,
  UniqueCarrier STRING,
  FlightNum INT,
  TailNum STRING,
  ActualElapsedTime INT,
  CRSElapsedTime INT,
  AirTime INT,
  ArrDelay INT,
  DepDelay INT,
  Origin STRING,
  Dest STRING,
  Distance INT,
  TaxiIn INT,
  TaxiOut INT,
  Cancelled INT,
  CancellationCode STRING,
  Diverted STRING,
  CarrierDelay INT,
  WeatherDelay INT,
  NASDelay INT,
  SecurityDelay INT,
  LateAircraftDelay INT
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 's3://airline-dataset/airline-csv/';

create external table ontime_parquet_snappy (
  Year INT,
  Month INT,
  DayofMonth INT,
  DayOfWeek INT,
  DepTime  INT,
  CRSDepTime INT,
  ArrTime INT,
  CRSArrTime INT,
  UniqueCarrier STRING,
  FlightNum INT,
  TailNum STRING,
  ActualElapsedTime INT,
  CRSElapsedTime INT,
  AirTime INT,
  ArrDelay INT,
  DepDelay INT,
  Origin STRING,
  Dest STRING,
  Distance INT,
  TaxiIn INT,
  TaxiOut INT,
  Cancelled INT,
  CancellationCode STRING,
  Diverted STRING,
  CarrierDelay INT,
  WeatherDelay INT,
  NASDelay INT,
  SecurityDelay INT,
  LateAircraftDelay INT
) STORED AS PARQUET LOCATION 's3://airline-dataset/airline-parquet-snappy/' TBLPROPERTIES ("parquet.compression"="SNAPPY");

INSERT OVERWRITE TABLE ontime_parquet_snappy SELECT * FROM ontime;

The '2-run-queries-csv.sql' script will run the query on the ontime table which maps to the csv data. Below is the query.

INSERT OVERWRITE DIRECTORY 's3://airline-dataset/csv-query-output' select Origin, count(*) from ontime where DepTime > CRSDepTime group by Origin;

The '3-run-queries-parquet.sql' script will run the query on the ontime_parquet_snappy table which maps to the Parquet-Snappy data. Below is the query. 

INSERT OVERWRITE DIRECTORY 's3://airline-dataset/parquet-snappy-query-output' select Origin, count(*) from ontime_parquet_snappy where DepTime > CRSDepTime group by Origin;
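
To make clear what both queries compute, here is a plain-Python sketch of the same logic on a few made-up records. The sample rows are invented and contain only the first 17 columns, in the column order of the table definition (DepTime at index 4, CRSDepTime at index 5, Origin at index 16). Values such as NA that cannot be read as integers never satisfy the comparison, much as a NULL would not in Hive.

```python
import csv
import io
from collections import Counter

# Made-up sample in the airline CSV column order (first 17 columns only).
sample = io.StringIO(
    "1987,10,14,3,741,730,912,849,PS,1451,NA,91,79,NA,23,11,SAN\n"
    "1987,10,15,4,729,730,903,849,PS,1451,NA,94,79,NA,14,-1,SAN\n"
    "1987,10,17,6,1320,1300,1430,1400,PS,1500,NA,70,60,NA,30,20,SFO\n"
    "1987,10,18,7,NA,1300,NA,1400,PS,1500,NA,NA,60,NA,NA,NA,SFO\n"
)

delayed_per_origin = Counter()
for record in csv.reader(sample):
    dep_time, crs_dep_time, origin = record[4], record[5], record[16]
    # Mirror `where DepTime > CRSDepTime`; non-numeric values never match.
    if dep_time.isdigit() and crs_dep_time.isdigit() and int(dep_time) > int(crs_dep_time):
        delayed_per_origin[origin] += 1  # group by Origin, count(*)

print(dict(delayed_per_origin))  # {'SAN': 1, 'SFO': 1}
```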

Step 3 : Go to the EMR management console and click on 'Go to advanced options'.


Step 4 : Here select the software to be installed on the instances. For this blog we need Hadoop 2.7.3 and Hive 2.1.1. Make sure these are selected; the rest are optional. Here we can also add a step. According to the AWS documentation, this is the definition of a Step: 'Each step is a unit of work that contains instructions to manipulate data for processing by software installed on the cluster.' This can be an MR program, Hive query, Pig script or something else. The steps can be added here or later. We will add steps later. Click on Next.


Step 5 : In this step, we can select the number of instances we want to run and the size of each instance. We will leave them as default and click on next.


Step 6 : In this step, we can select additional settings like the cluster name, the S3 log path location and so on. Make sure the 'S3 folder' points to the log folder in S3, uncheck the 'Termination protection' option and click on Next.


Step 7 : In this screen again all the default options are good enough. If we want to ssh into the EC2 instances then the 'EC2 key pair' has to be selected. Here are the instructions on how to create a key pair. Finally click on 'Create cluster' to launch the cluster.


Initially the cluster will be in a Starting state and the EC2 instances will be launched as shown below.



Within a few minutes, the cluster will be in a running state and the Steps (data processing programs) can be added to the cluster.

Step 8 : Add a Step to the cluster by clicking on 'Add step' and pointing to the '1-create-tables-move-data.sql' file as shown below, then click on Add. The processing will start on the cluster.



The Step will be in a Pending status for some time and then move to the Completed status after the processing has been done.



Once the processing has completed, the csv data will have been converted into the Parquet format with Snappy compression and put into S3 as shown below.


Note that the csv data was close to 2,192 MB and the Parquet Snappy data is around 190 MB. The Parquet data is in a columnar format and compresses better than the csv format. This makes it possible to fit more data into memory and so get quicker results.
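
As a quick back-of-the-envelope check of the two sizes reported above, the reduction works out as follows. Only the two reported sizes are used; nothing else is assumed.

```python
csv_mb = 2192      # size of the csv data reported above
parquet_mb = 190   # size of the Parquet Snappy data reported above

ratio = csv_mb / parquet_mb
saving_pct = (1 - parquet_mb / csv_mb) * 100

print(f"{ratio:.1f}x smaller, {saving_pct:.0f}% less storage")  # 11.5x smaller, 91% less storage
```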

Step 9 : Now add 2 more steps using the '2-run-queries-csv.sql' and the '3-run-queries-parquet.sql'. The first sql file will run the query on the csv data table and the second will run the query on the Parquet Snappy table. Both the queries are the same, returning the same results in S3.

Step 10 : Check the step log files for the steps to get the execution times in the EMR management console.

Converting the CSV to Parquet Snappy format - 148 seconds
Executing the query on the csv data - 90 seconds
Executing the query on the Parquet Snappy data - 56 seconds

Note that the query runs faster on the Parquet Snappy data than on the csv data. I was expecting the query to run a bit faster still and need to look into this a bit more.
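
Taking the reported step timings at face value, a small calculation shows the speed-up and how many runs of this query it takes before the one-off conversion time is won back. The timings most likely include job start-up overhead, so this is a rough sketch rather than a benchmark.

```python
import math

convert_s = 148        # one-off CSV to Parquet Snappy conversion
csv_query_s = 90       # query on the csv table
parquet_query_s = 56   # same query on the Parquet Snappy table

speedup = csv_query_s / parquet_query_s
saved_per_query_s = csv_query_s - parquet_query_s
# Number of queries after which the conversion time has been won back.
break_even_queries = math.ceil(convert_s / saved_per_query_s)

print(f"{speedup:.2f}x faster, {saved_per_query_s}s saved per query, "
      f"break-even after {break_even_queries} queries")
# 1.61x faster, 34s saved per query, break-even after 5 queries
```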

Step 11 : Now that the processing has been done, it's time to terminate the cluster. Click on Terminate and again on Terminate. It will take a few minutes for the cluster to terminate.


Note that the EMR cluster will be terminated and EC2 instances will also terminate.



Step 12 : Go back to the S3 management console, where the below folders should be present. Clean up by deleting the bucket. I will be keeping the data, so that I can try Athena and Redshift on the CSV and the Parquet Snappy data. Note that 5 GB of S3 data can be stored for free for up to one year. More details about the AWS free tier here.



Thursday, 18 May 2017

Hadoop/MR vs Spark/RDD Example by Word count analysis



Apache Spark provides an efficient way of running iterative algorithms by keeping the intermediate data in memory. This avoids the overhead of writing the intermediate data to disk and reading it back, as happens in MR.

Also, when running the same operation again and again, data can be cached in and fetched from memory without performing the same operation again. MR is stateless: let's say a program/application in MR is executed 10 times, then the whole data set has to be scanned 10 times.
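
The effect of caching can be sketched in plain Python by counting how often an expensive scan really runs. The scan_dataset function and its numbers are made up; in Spark the equivalent of keeping `cached` around is calling cache() on an RDD.

```python
scans = {"count": 0}  # counts how often the "expensive" full scan really happens


def scan_dataset():
    """Stand-in for reading and parsing the whole input (made-up data)."""
    scans["count"] += 1
    return [3, 1, 4, 1, 5, 9, 2, 6]


# Stateless style: every run starts from the raw input again.
for _ in range(10):
    total = sum(scan_dataset())
scans_without_cache = scans["count"]

# Cached style: scan once, keep the result in memory, reuse it.
scans["count"] = 0
cached = scan_dataset()
for _ in range(10):
    total_cached = sum(cached)
scans_with_cache = scans["count"]

print(scans_without_cache, scans_with_cache, total == total_cached)  # 10 1 True
```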


Also, as the name suggests, MR supports only Map and Reduce operations, and everything else (join, groupby etc) has to be fitted into the Map and Reduce model, which might not be the most efficient way. Spark supports many other transformations and actions besides Map and Reduce, as mentioned here and here.

Spark code is also compact when compared to the MR code. Below is the program for performing the WordCount using Python in Spark.

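from pyspark import SparkContext

# Input path is a placeholder (not given in the original); point it at your own data
logFile = "hdfs://localhost:9000/user/bigdatavm/input"

sc = SparkContext("spark://bigdata-vm:7077", "WordCount")

textFile = sc.textFile(logFile)

# split lines into words, emit (word, 1) and sum the counts per word
wordCounts = textFile.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)

wordCounts.saveAsTextFile("hdfs://localhost:9000/user/bigdatavm/output")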

The same in the MR model using Hadoop is a bit more verbose, as shown below.
package org.apache.hadoop.examples;


import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

    public static class TokenizerMapper extends
        Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends
        Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
        Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args)
            .getRemainingArgs();

        Job job = Job.getInstance(conf, "word count");

        job.setJarByClass(WordCount.class);

        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
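
Both programs go through the same three phases: emit (word, 1) pairs, group the pairs by word, and sum each group. The plain-Python sketch below walks through those phases on two made-up lines. It runs on a single machine and only illustrates the data flow, not the distribution across a cluster.

```python
from collections import defaultdict
from itertools import chain

lines = ["to be or not to be", "to see or not to see"]  # made-up input

# flatMap / Mapper.map: split every line into words and emit (word, 1) pairs.
pairs = [(word, 1) for word in chain.from_iterable(line.split() for line in lines)]

# Shuffle: group all values that belong to the same key.
grouped = defaultdict(list)
for word, one in pairs:
    grouped[word].append(one)

# reduceByKey / Reducer.reduce: sum the values of each key.
word_counts = {word: sum(ones) for word, ones in grouped.items()}

print(word_counts)  # {'to': 4, 'be': 2, 'or': 2, 'not': 2, 'see': 2}
```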

Here we looked into WordCount, which, like a HelloWorld program, is a simple way to get started with a new concept/technology/language. In future blogs, we will look into some more advanced features of Spark.

Analyzing the airline dataset with Spark/Python


In this blog we will see how to do analytics on the airline dataset with Spark using Python. Programs in Spark can be implemented in Scala (Spark is built using Scala), Java, Python and the recently added R language.

It took 5 min 30 sec for the processing. I am not sure why Spark is not faster than MR as claimed and need to look into it a bit more. Here is the output (12) of the Python program.



The Spark program is a bit more concise (short) when compared to the corresponding MR program. For those who are new to Spark, here is the Spark Python API.


from pyspark import SparkConf
from pyspark import SparkContext

import csv


def extractData(record):
    # split the input record by comma
    splits = record.split(',')

    # extract the actual/scheduled departure time and the origin
    actualDepTime = splits[4]
    scheduledDepTime = splits[5]
    origin = splits[16]

    # 1 delayed
    # 0 don't know or not delayed
    delayed = 0

    # check if the actual/scheduled departure times are digits
    if actualDepTime.isdigit() and scheduledDepTime.isdigit():
        # check whether the flight got delayed
        if int(actualDepTime) > int(scheduledDepTime):
            delayed = 1

    # return the origin and delayed status as a tuple
    return origin, delayed


# create the SparkConf() instance on which the different configurations can be done
conf = SparkConf().setMaster("spark://bigdata-server:7077")

# establish a connection to Spark
sc = SparkContext(conf = conf, appName = "flightDataAnalysis")

# load the input data
lines = sc.textFile("hdfs://localhost:9000/user/bigdata/airline/input")

# figure out the delayed flights and cache the data as it is being processed twice later
delayedFlights = lines.map(extractData).cache()

# get the number of delayed flights per origin by summing the 0/1 flags
delayedFlights.reduceByKey(lambda a, b : a + b).saveAsTextFile("hdfs://localhost:9000/user/bigdata/airline/spark-output/delayedFlights")

# get the total flights per origin
totalFlights = delayedFlights.countByKey()

# totalFlights is a dictionary. Iterate over it and write to a file
with open("/home/bigdata/WorkSpace/PyCharm/AirlineDataset/output/totalFlights/totalFlights.csv", "w") as f:
    w = csv.writer(f)
    for key, val in totalFlights.items():
        w.writerow([key, val])
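
The per-origin result depends on which function is passed to reduceByKey, because that function decides how two values of the same key are merged. The plain-Python model below uses a hypothetical reduce_by_key helper and made-up (origin, delayed) pairs, not Spark itself. It shows that adding the values counts the delayed flights, whereas returning only the second argument merely keeps one of the flags.

```python
from collections import Counter, defaultdict
from functools import reduce

# Made-up (origin, delayed) pairs in the shape extractData returns.
pairs = [("SFO", 1), ("JFK", 0), ("SFO", 1), ("SFO", 0), ("JFK", 1)]


def reduce_by_key(pairs, fn):
    """Plain-Python model of reduceByKey: fold the values of each key with fn."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return {key: reduce(fn, values) for key, values in grouped.items()}


delayed = reduce_by_key(pairs, lambda a, b: a + b)  # sums the 0/1 flags per origin
last_seen = reduce_by_key(pairs, lambda a, b: b)    # merely keeps one of the flags

# countByKey counts records per key regardless of the flag value.
total = dict(Counter(key for key, _ in pairs))

print(delayed, last_seen, total)
# {'SFO': 2, 'JFK': 1} {'SFO': 0, 'JFK': 1} {'SFO': 3, 'JFK': 2}
```

In Spark the order in which values are merged is not guaranteed, so a function that just returns one argument gives an arbitrary flag, while addition is safe because it is associative and commutative.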


As I mentioned in the previous blog, I have a fairly high-end machine for the data processing, and below is the resource usage in the Ubuntu system monitor while the above program is running. The CPU has 8 cores and all of them are running at full steam.



Now that we have seen how to convert CSV into the Parquet format using Hive in the previous blog, we will look into how to process the same Parquet data with Spark using the DataFrame feature. After processing the data, the output is stored in the JSON format, so as to make it human readable.

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext

conf = SparkConf().setMaster("spark://bigdata-server:7077")
sc = SparkContext(conf = conf, appName = "flightDataAnalysis")
sqlContext = SQLContext(sc)

# load the parquet data
df = sqlContext.read.load("hdfs://localhost:9000/user/bigdata/airline/input-parquet")

# find the number of delayed flights per origin
delayedCount = df.filter(df.deptime > df.crsdeptime).groupBy(df.origin).count()

# store the delayed count as JSON

delayedCount.write.json("hdfs://localhost:9000/user/bigdata/airline/spark-output/delayedFlights")


PyCharm was used to develop the program in Python, which was then executed in the Spark Standalone mode using the spark-submit command. As shown below, it took 24 s to get the delayed flights.



Here is the output back in HDFS.



Here is the matrix, which shows the execution times with different software and formats on the same dataset and for the same problem.



In future blogs, we will look into how to do the same processing with other software and also how to optimize it.


Creating a Linux EC2 instance and logging into it

Now that we know how to create a Linux EC2 instance in the AWS cloud and access it, I will create a simple static website on it.

1. When I created the security group, port 22 was opened. This enables access to the remote machine. But a WebServer listens on port 80, and that port has to be opened as well.

2. Select the Security Group and click on the Inbound tab.



3. Click on Edit and add the rule for the HTTP inbound traffic as shown below.



Now, the Security Group rules should appear like this.



Note that if the Linux or the Windows instance is running, then any changes to the Security Group will take place immediately. There is no need to restart the EC2 instance.

4. It is a good practice to update the packages on the Linux instance with `sudo yum update -y`. The -y option answers yes to any prompts.

5.  Install the WebServer and start it. The first command elevates the user to root, the second installs the WebServer and the final one starts it.

sudo su
yum install -y httpd
service httpd start

The WebServer won't start automatically if the Linux instance is rebooted. If it has to be started automatically on boot/reboot, use the below command.

chkconfig httpd on

6. Go to the /var/www/html folder and create an index.html file using the below commands


cd /var/www/html
echo "Welcome to thecloudavenue.com" > index.html

7. Get the ip address of the Linux instance from the EC2 management console.


8. Open the same in the browser to get the web page.

The above sequence of steps simply installs a WebServer on the Linux instance and puts a simple web page on it. Much more interesting things can be done. One thing to note is that all the above commands were executed as root, which is not really a good practice, but was done for the sake of simplicity.

spark_streaming_examples

Create Spark Streaming Context: ========================================== scala: --------------- import org.apache.spark._ import ...