In this blog we will see how to do the analytics on the airline dataset with Spark using Python. Programs in Spark can be implemented in Scala (Spark itself is built with Scala), Java, Python, and the recently added R.
The Spark program is a bit more concise (shorter) when compared to the corresponding MR program. For those who are new to Spark, here is the Spark Python API.
from pyspark import SparkConf
from pyspark import SparkContext
import csv

def extractData(record):
    # split the input record by comma
    splits = record.split(',')

    # extract the actual/scheduled departure time and the origin
    actualDepTime = splits[4]
    scheduledDepTime = splits[5]
    origin = splits[16]

    # 1 -> delayed, 0 -> don't know or not delayed
    delayed = 0

    # check if the actual/scheduled departure times are digits
    if actualDepTime.isdigit() and scheduledDepTime.isdigit():
        # check if the flight got delayed or not
        if int(actualDepTime) > int(scheduledDepTime):
            delayed = 1

    # return the origin and delayed status as a tuple
    return origin, delayed

# create the SparkConf() instance on which the different configurations can be done
conf = SparkConf().setMaster("spark://bigdata-server:7077")

# establish a connection to Spark
sc = SparkContext(conf=conf, appName="flightDataAnalysis")

# load the input data
lines = sc.textFile("hdfs://localhost:9000/user/bigdata/airline/input")

# map each record to an (origin, delayed) tuple and cache it, as the data is processed twice below
delayedFlights = lines.map(extractData).cache()

# get the number of delayed flights per origin
delayedFlights.reduceByKey(lambda a, b: a + b).saveAsTextFile("hdfs://localhost:9000/user/bigdata/airline/spark-output/delayedFlights")

# get the total flights per origin; countByKey() returns a dictionary
totalFlights = delayedFlights.countByKey()

# iterate over the dictionary and write it to a local CSV file
w = csv.writer(open("/home/bigdata/WorkSpace/PyCharm/AirlineDataset/output/totalFlights/totalFlights.csv", "w"))
for key, val in totalFlights.items():
    w.writerow([key, val])
It took 5 min 30 sec for the processing. I am not sure why Spark is not faster than MR as claimed; I need to look into it a bit more. Here is the output (1, 2) of the Python program.

As I mentioned in the previous blog, I do have a bit of a high-end machine for the data processing, and below is the resource usage in the Ubuntu System Monitor while the above program is running. The CPU has 8 cores and all of them are running at full steam.
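By default the Spark standalone master gives an application all of the available cores, which is why every core gets used here. If that is not desired, the number of cores can be capped through the configuration. A minimal sketch, assuming the same master URL as above; the limit of 4 is only an example:

from pyspark import SparkConf

# cap the total number of cores the application may use across the cluster
conf = SparkConf() \
    .setMaster("spark://bigdata-server:7077") \
    .setAppName("flightDataAnalysis") \
    .set("spark.cores.max", "4")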
Now that we have seen how to convert CSV into the Parquet format using Hive in the previous blog, we will look into how to process the same Parquet data with Spark using the DataFrame feature. After processing, the output is stored in the JSON format so as to make it human readable.
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext
conf = SparkConf().setMaster("spark://bigdata-server:7077")
sc = SparkContext(conf = conf, appName = "flightDataAnalysis")
sqlContext = SQLContext(sc)
# load the parquet data
df = sqlContext.read.load("hdfs://localhost:9000/user/bigdata/airline/input-parquet")
# find the number of delayed flights per origin
delayedCount = df.filter(df.deptime > df.crsdeptime).groupBy(df.origin).count()
# store the delayed count as JSON
delayedCount.write.json("hdfs://localhost:9000/user/bigdata/airline/spark-output/delayedFlights")
PyCharm has been used to develop the program in Python, which is then executed in the Spark standalone mode using the spark-submit command. It took 24 seconds to get the delayed flights.
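For reference, a typical spark-submit invocation for this program would look something like the following; the script file name is an assumption, and the master URL matches the one set in the program:

spark-submit --master spark://bigdata-server:7077 flightDataAnalysis.py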
Here is the output back in HDFS.
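Since the output is plain JSON, it can also be read straight back into a DataFrame for a quick sanity check. A minimal sketch, assuming the same sqlContext as in the program above:

# read the JSON output back and display a few rows
result = sqlContext.read.json("hdfs://localhost:9000/user/bigdata/airline/spark-output/delayedFlights")
result.show()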
Here is the matrix, which shows the execution times with different software and formats on the same dataset and for the same problem.
In future blogs, we will look into how to do the same processing with other software and also how to optimize it.