Thursday, 18 May 2017

Analyzing the airline dataset with Spark/Python


In this blog we will see how to do the analytics on the airline dataset with Spark using Python. Programs in Spark can be implemented in Scala (Spark is built using Scala), Java, Python and the recently added R language.

It took 5 min 30 sec for the processing. I'm not sure why Spark is not faster than MR as claimed, so I need to look into it a bit more. Here is the output (12) of the Python program.



The Spark program is a bit more concise (shorter) than the corresponding MR program. For those who are new to Spark, here is the Spark Python API.


from pyspark import SparkConf
from pyspark import SparkContext

import csv


def extractData(record):
    # split the input record by comma
    splits = record.split(',')

    # extract the actual/scheduled departure time and the origin
    actualDepTime = splits[4]
    scheduledDepTime = splits[5]
    origin = splits[16]

    # 1 = delayed
    # 0 = not delayed, or unknown (e.g. "NA" when there is no departure time)
    delayed = 0

    # check that both the actual and the scheduled departure times are numbers
    if actualDepTime.isdigit() and scheduledDepTime.isdigit():
        # the flight is delayed if it left later than scheduled
        if int(actualDepTime) > int(scheduledDepTime):
            delayed = 1

    # return the origin and delayed status as a tuple
    return origin, delayed


# create the SparkConf() instance on which the different configurations can be done
conf = SparkConf().setMaster("spark://bigdata-server:7077")

# establish a connection to Spark
sc = SparkContext(conf=conf, appName="flightDataAnalysis")

# load the input data
lines = sc.textFile("hdfs://localhost:9000/user/bigdata/airline/input")

# figure out the delayed flights and cache the data, as it is processed twice later
delayedFlights = lines.map(extractData).cache()

# get the number of delayed flights per origin by summing the 0/1 flags
delayedFlights.reduceByKey(lambda a, b: a + b).saveAsTextFile("hdfs://localhost:9000/user/bigdata/airline/spark-output/delayedFlights")

# get the total number of flights per origin
totalFlights = delayedFlights.countByKey()

# totalFlights is a dictionary; iterate over it and write it to a CSV file
with open("/home/bigdata/WorkSpace/PyCharm/AirlineDataset/output/totalFlights/totalFlights.csv", "w") as f:
    w = csv.writer(f)
    for key, val in totalFlights.items():
        w.writerow([key, val])


As I mentioned in the previous blog, I have a fairly high-end machine for the data processing, and below is the resource usage in the Ubuntu System Monitor while the above program is running. The CPU has 8 cores and all of them are running at full steam.
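To sanity-check the per-record logic of extractData without a cluster, here is a minimal plain-Python sketch of what the RDD program computes. reduce_by_key and count_by_key are hypothetical local stand-ins for RDD.reduceByKey and RDD.countByKey, and make_record is a hypothetical helper that fills in only the DepTime (index 4), CRSDepTime (index 5) and Origin (index 16) fields of a 29-column record; the sample flights are made up.

```python
from collections import Counter


def extract_data(record):
    # mirrors extractData: index 4 = DepTime, 5 = CRSDepTime, 16 = Origin
    splits = record.split(",")
    actual_dep_time, scheduled_dep_time, origin = splits[4], splits[5], splits[16]
    delayed = 0
    if actual_dep_time.isdigit() and scheduled_dep_time.isdigit():
        if int(actual_dep_time) > int(scheduled_dep_time):
            delayed = 1
    return origin, delayed


def reduce_by_key(pairs, func):
    # local stand-in for RDD.reduceByKey: fold all values of a key with func
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return result


def count_by_key(pairs):
    # local stand-in for RDD.countByKey: number of records per key
    return dict(Counter(key for key, _ in pairs))


def make_record(dep_time, crs_dep_time, origin, n_fields=29):
    # hypothetical helper: a CSV line with only the three used fields filled in
    fields = ["NA"] * n_fields
    fields[4], fields[5], fields[16] = dep_time, crs_dep_time, origin
    return ",".join(fields)


records = [
    make_record("1015", "1000", "SFO"),  # left 15 minutes late
    make_record("955", "1000", "SFO"),   # left early
    make_record("NA", "1000", "SFO"),    # no departure time
    make_record("1230", "1200", "ORD"),  # left 30 minutes late
]

pairs = [extract_data(r) for r in records]
delayed_per_origin = reduce_by_key(pairs, lambda a, b: a + b)
total_per_origin = count_by_key(pairs)
last_flag_per_origin = reduce_by_key(pairs, lambda a, b: b)

print(delayed_per_origin)    # {'SFO': 1, 'ORD': 1}
print(total_per_origin)      # {'SFO': 3, 'ORD': 1}
print(last_flag_per_origin)  # {'SFO': 0, 'ORD': 1}
```

Summing the 0/1 flags with lambda a, b: a + b gives the number of delayed flights per origin, while a reducer that simply returns b keeps a single flag per origin (and on a cluster, which flag survives depends on how the data is partitioned). If the input files still contain the CSV header line, extractData turns it into the pair ('Origin', 0), so an extra Origin key shows up in both outputs.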



Now that we have seen in the previous blog how to convert CSV into Parquet format using Hive, we will look into how to process the same Parquet data with Spark using the DataFrame API. After processing the data, the output is stored in JSON format to make it human readable.

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext

conf = SparkConf().setMaster("spark://bigdata-server:7077")
sc = SparkContext(conf = conf, appName = "flightDataAnalysis")
sqlContext = SQLContext(sc)

# load the parquet data
df = sqlContext.read.parquet("hdfs://localhost:9000/user/bigdata/airline/input-parquet")

# find the number of delayed flights per origin
delayedCount = df.filter(df.deptime > df.crsdeptime).groupBy(df.origin).count()

# store the delayed count per origin as JSON

delayedCount.write.json("hdfs://localhost:9000/user/bigdata/airline/spark-output/delayedFlights")


PyCharm was used to develop the program in Python, which was then executed in Spark Standalone mode using the spark-submit command. As shown below, it took 24 s to get the delayed flights.
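For anyone who wants to script the submission step, here is a small sketch that builds the spark-submit command line from Python. The script name flightDataAnalysis.py and the helpers build_spark_submit_cmd and run_spark_submit are hypothetical; --master is a real spark-submit flag, but it is optional here because the program already calls setMaster() on its SparkConf, and properties set directly on a SparkConf take precedence over spark-submit flags.

```python
import shlex
import subprocess


def build_spark_submit_cmd(script, master=None):
    # assemble the argument list for spark-submit; --master is only added if given
    cmd = ["spark-submit"]
    if master:
        cmd += ["--master", master]
    cmd.append(script)
    return cmd


def run_spark_submit(cmd):
    # runs the job and raises CalledProcessError on a non-zero exit code;
    # needs spark-submit on the PATH
    subprocess.run(cmd, check=True)


cmd = build_spark_submit_cmd("flightDataAnalysis.py", master="spark://bigdata-server:7077")
print(" ".join(shlex.quote(arg) for arg in cmd))
# spark-submit --master spark://bigdata-server:7077 flightDataAnalysis.py
```

Calling run_spark_submit(cmd) on the machine where Spark is installed would then start the job, much like typing the printed command in a terminal.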



Here is the output back in HDFS.
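The JSON part files in HDFS hold one object per line, with one line per origin. Here is a plain-Python sketch of what the DataFrame query computes and roughly what those lines look like. The rows are made up, None stands in for a SQL NULL, and it assumes deptime and crsdeptime are stored as integers in the Parquet data; if they were strings, Spark would compare them as text instead.

```python
import json
from collections import Counter

# hypothetical rows standing in for the Parquet data (only the columns the query uses)
rows = [
    {"origin": "SFO", "deptime": 1015, "crsdeptime": 1000},
    {"origin": "SFO", "deptime": 955, "crsdeptime": 1000},
    {"origin": "SFO", "deptime": None, "crsdeptime": 1000},
    {"origin": "ORD", "deptime": 1230, "crsdeptime": 1200},
    {"origin": "ORD", "deptime": 1301, "crsdeptime": 1300},
]


def delayed_count(rows):
    # df.filter(df.deptime > df.crsdeptime): a comparison with NULL is not true,
    # so rows with a missing time are dropped
    delayed = [
        r for r in rows
        if r["deptime"] is not None and r["crsdeptime"] is not None
        and r["deptime"] > r["crsdeptime"]
    ]
    # .groupBy(df.origin).count(): one row per origin with a column named "count";
    # sorted here only to make the output deterministic (Spark gives no ordering)
    counts = Counter(r["origin"] for r in delayed)
    return [{"origin": origin, "count": n} for origin, n in sorted(counts.items())]


def to_json_lines(records):
    # one compact JSON object per line, similar to the part files from write.json
    return "\n".join(json.dumps(r, separators=(",", ":")) for r in records)


print(to_json_lines(delayed_count(rows)))
# {"origin":"ORD","count":2}
# {"origin":"SFO","count":1}
```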



Here is the matrix showing the execution times with different software and formats on the same dataset for the same problem.
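For reference, the two Spark timings quoted in this post work out to the ratio below. The two runs differ in API (RDD vs. DataFrame), input format (CSV vs. Parquet) and amount of work (the first program also counts the total flights per origin), so the ratio alone doesn't say which change mattered most.

```latex
t_{\text{RDD, CSV}} = 5\,\text{min}\ 30\,\text{s} = 330\,\text{s}, \qquad
t_{\text{DataFrame, Parquet}} = 24\,\text{s}, \qquad
\frac{330\,\text{s}}{24\,\text{s}} = 13.75 \approx 14\times
```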



In future blogs, we will look into how to do the same processing with other software and how to optimize it.

