Monday 23 September 2019

spark_2.x_sql_examples_1.

----------------------------------------------------------------------------------
SPARK SQL INSTALLATION STEPS
----------------------------------------------------------------------------------

----------------------------------------------------------------------------------
SPARK-2.x SQL installation steps
----------------------------------------------------------------------------------
1. copy "kalyan_spark_2.x_jars" foder to "$SPARK_HOME" folder

2. copy "$SPARK_HOME/conf/spark-defaults.conf.template" file as "$SPARK_HOME/conf/spark-defaults.conf"

3. add the below lines to "$SPARK_HOME/conf/spark-defaults.conf" file

spark.driver.memory 5g

spark.jars    /home/orienit/spark/spark-2.0.2-bin-hadoop2.6/kalyan_spark_2.x_jars/guava-16.0.1.jar,/home/orienit/spark/spark-2.0.2-bin-hadoop2.6/kalyan_spark_2.x_jars/spark-cassandra-connector_2.11-2.0.0-M3.jar,/home/orienit/spark/spark-2.0.2-bin-hadoop2.6/kalyan_spark_2.x_jars/cassandra-driver-core-3.0.2.jar,/home/orienit/spark/spark-2.0.2-bin-hadoop2.6/kalyan_spark_2.x_jars/jackson-core-2.5.2.jar,/home/orienit/spark/spark-2.0.2-bin-hadoop2.6/kalyan_spark_2.x_jars/jackson-databind-2.5.2.jar,/home/orienit/spark/spark-2.0.2-bin-hadoop2.6/kalyan_spark_2.x_jars/jackson-module-scala_2.11-2.5.2.jar,/home/orienit/spark/spark-2.0.2-bin-hadoop2.6/kalyan_spark_2.x_jars/jsr166e-1.1.0.jar,/home/orienit/spark/spark-2.0.2-bin-hadoop2.6/kalyan_spark_2.x_jars/mysql-connector-java-5.1.38.jar,/home/orienit/spark/spark-2.0.2-bin-hadoop2.6/kalyan_spark_2.x_jars/phoenix-4.7.0-HBase-1.1-client.jar,/home/orienit/spark/spark-2.0.2-bin-hadoop2.6/kalyan_spark_2.x_jars/phoenix-spark-4.7.0-HBase-1.1.jar

4. start the `spark-shell` with the command below

$SPARK_HOME/bin/spark-shell

5. copy "input" foder to "/home/orienit/spark" folder

----------------------------------------------------------------------------------
SPARK-1.x SQL installation steps
----------------------------------------------------------------------------------
1. copy "kalyan_spark_1.x_jars" foder to "$SPARK_HOME" folder

2. copy "$SPARK_HOME/conf/spark-defaults.conf.template" file as "$SPARK_HOME/conf/spark-defaults.conf"

3. add the below lines to "$SPARK_HOME/conf/spark-defaults.conf" file

spark.driver.memory 5g

spark.jars    /home/orienit/spark/spark-1.6.0-bin-hadoop2.6/kalyan_spark_1.x_jars/guava-18.0.jar,/home/orienit/spark/spark-1.6.0-bin-hadoop2.6/kalyan_spark_1.x_jars/cassandra-driver-core-3.0.0.jar,/home/orienit/spark/spark-1.6.0-bin-hadoop2.6/kalyan_spark_1.x_jars/jackson-core-2.5.2.jar,/home/orienit/spark/spark-1.6.0-bin-hadoop2.6/kalyan_spark_1.x_jars/jackson-databind-2.5.2.jar,/home/orienit/spark/spark-1.6.0-bin-hadoop2.6/kalyan_spark_1.x_jars/jackson-module-scala_2.10-2.5.2.jar,/home/orienit/spark/spark-1.6.0-bin-hadoop2.6/kalyan_spark_1.x_jars/jsr166e-1.1.0.jar,/home/orienit/spark/spark-1.6.0-bin-hadoop2.6/kalyan_spark_1.x_jars/mysql-connector-java-5.1.38.jar,/home/orienit/spark/spark-1.6.0-bin-hadoop2.6/kalyan_spark_1.x_jars/phoenix-4.7.0-HBase-1.1-client.jar,/home/orienit/spark/spark-1.6.0-bin-hadoop2.6/kalyan_spark_1.x_jars/phoenix-spark-4.7.0-HBase-1.1.jar,/home/orienit/spark/spark-1.6.0-bin-hadoop2.6/kalyan_spark_1.x_jars/spark-cassandra-connector_2.10-1.6.0-M2.jar

4. start the `spark-shell` with the command below

$SPARK_HOME/bin/spark-shell

5. copy "input" foder to "/home/orienit/spark" folder



----------------------------------------------------------------------------------
SPARK SQL EXAMPLES with SCALA
----------------------------------------------------------------------------------
import org.apache.spark._
import org.apache.spark.sql._

Spark-2.x:
-------------------------------------------------
val spark = SparkSession.builder().appName("Kalyan Spark SQL Examples").getOrCreate()
val sc: SparkContext = spark.sparkContext
val sqlContext: SQLContext = spark.sqlContext


Spark-1.x & Spark-2.x:
-------------------------------------------------
val conf: SparkConf = new SparkConf().setAppName("Kalyan Spark SQL Examples").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val sqlContext: SQLContext = new org.apache.spark.sql.SQLContext(sc)



Spark-2.x:
-------------------------------------------------
val df = spark.read.json("file:///home/orienit/spark/input/student.json")
val df = spark.read.parquet("file:///home/orienit/spark/input/student.parquet")


Spark-1.x & Spark-2.x:
-------------------------------------------------
val df = sqlContext.read.json("file:///home/orienit/spark/input/student.json")
val df = sqlContext.read.parquet("file:///home/orienit/spark/input/student.parquet")


scala> df.show
+------+---+------+----+
|course| id|  name|year|
+------+---+------+----+
| spark|  1|  anil|2016|
|hadoop|  5|anvith|2015|
|hadoop|  6|   dev|2015|
| spark|  3|   raj|2016|
|hadoop|  4| sunil|2015|
| spark|  2|venkat|2016|
+------+---+------+----+


scala> df.select("name", "id").show
scala> df.select($"name", $"id").show
+------+---+
|  name| id|
+------+---+
|  anil|  1|
|anvith|  5|
|   dev|  6|
|   raj|  3|
| sunil|  4|
|venkat|  2|
+------+---+


scala> df.select(df("name"), df("id") + 1).show
scala> df.select($"name", $"id" + 1).show
+------+--------+
|  name|(id + 1)|
+------+--------+
|  anil|       2|
|anvith|       6|
|   dev|       7|
|   raj|       4|
| sunil|       5|
|venkat|       3|
+------+--------+

scala> df.filter(df("id") > 4).show
scala> df.filter($"id" > 4).show
scala> df.filter("id > 4").show
+------+---+------+----+
|course| id|  name|year|
+------+---+------+----+
|hadoop|  5|anvith|2015|
|hadoop|  6|   dev|2015|
+------+---+------+----+


scala> df.filter(df("id") > 4 && df("id") < 6).show
scala> df.filter($"id" > 4 && $"id" < 6).show
scala> df.filter("id > 4 and id < 6").show
+------+---+------+----+
|course| id|  name|year|
+------+---+------+----+
|hadoop|  5|anvith|2015|
+------+---+------+----+


scala> df.where(df("id") > 2).show
scala> df.where($"id" > 2).show
scala> df.where("id > 2").show
+------+---+------+----+
|course| id|  name|year|
+------+---+------+----+
|hadoop|  5|anvith|2015|
|hadoop|  6|   dev|2015|
| spark|  3|   raj|2016|
|hadoop|  4| sunil|2015|
+------+---+------+----+


scala> df.limit(4).show
+------+---+------+----+
|course| id|  name|year|
+------+---+------+----+
| spark|  1|  anil|2016|
|hadoop|  5|anvith|2015|
|hadoop|  6|   dev|2015|
| spark|  3|   raj|2016|
+------+---+------+----+


scala> df.toJSON.foreach(println)
{"name":"anil","id":1,"course":"spark","year":2016}
{"name":"anvith","id":5,"course":"hadoop","year":2015}
{"name":"dev","id":6,"course":"hadoop","year":2015}
{"name":"raj","id":3,"course":"spark","year":2016}
{"name":"sunil","id":4,"course":"hadoop","year":2015}
{"name":"venkat","id":2,"course":"spark","year":2016}


scala> df.groupBy(df("course"), df("year")).count.show
scala> df.groupBy($"course", $"year").count.show
scala> df.groupBy("course", "year").count.show
+------+----+-----+
|course|year|count|
+------+----+-----+
| spark|2016|    3|
|hadoop|2015|    3|
+------+----+-----+


scala> df.groupBy(df("id"), df("course"), df("year")).count.show
scala> df.groupBy($"id", $"course", $"year").count.show
scala> df.groupBy("id", "course", "year").count.show
+---+------+----+-----+
| id|course|year|count|
+---+------+----+-----+
|  6|hadoop|2015|    1|
|  5|hadoop|2015|    1|
|  3| spark|2016|    1|
|  4|hadoop|2015|    1|
|  2| spark|2016|    1|
|  1| spark|2016|    1|
+---+------+----+-----+


scala> df.orderBy(df("course")).show
scala> df.orderBy($"course").show
scala> df.orderBy("course").show
+------+---+------+----+
|course| id|  name|year|
+------+---+------+----+
|hadoop|  4| sunil|2015|
|hadoop|  5|anvith|2015|
|hadoop|  6|   dev|2015|
| spark|  2|venkat|2016|
| spark|  3|   raj|2016|
| spark|  1|  anil|2016|
+------+---+------+----+


scala> df.orderBy(df("course"), df("name")).show
scala> df.orderBy($"course", $"name").show
scala> df.orderBy("course", "name").show
+------+---+------+----+
|course| id|  name|year|
+------+---+------+----+
|hadoop|  5|anvith|2015|
|hadoop|  6|   dev|2015|
|hadoop|  4| sunil|2015|
| spark|  1|  anil|2016|
| spark|  3|   raj|2016|
| spark|  2|venkat|2016|
+------+---+------+----+


scala> df.sort(df("course"), df("id")).show
scala> df.sort($"course", $"id").show
scala> df.sort("course", "id").show
+------+---+------+----+
|course| id|  name|year|
+------+---+------+----+
|hadoop|  4| sunil|2015|
|hadoop|  5|anvith|2015|
|hadoop|  6|   dev|2015|
| spark|  1|  anil|2016|
| spark|  2|venkat|2016|
| spark|  3|   raj|2016|
+------+---+------+----+


Creating a `DataFrame` using a `case class`
-------------------------------------------------------------------
case class Contact(cid: Int, name: String, loc: String, pincode:Int)
case class Orders(oid: Int, cid: Int, status: String)
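
For reference, the input files are assumed to look roughly like this (reconstructed from the `show` outputs further down); contact.csv is comma-separated and orders.tsv is tab-separated:

contact.csv:
1,kalyan,hyd,500072
2,venkat,hyd,500073
3,prasad,bang,600076
4,anvith,bang,600075

orders.tsv (columns separated by tabs):
111    1    success
112    1    failure
113    2    success
114    3    success
115    2    failure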


Spark-2.x
----------------------------------------------------------------------
val contact = spark.read.csv("file:///home/orienit/spark/input/contact.csv")
val cdf = contact.map(c => Contact(c(0).toString.toInt, c(1).toString, c(2).toString, c(3).toString.toInt)).toDF()

val orders = spark.read.text("file:///home/orienit/spark/input/orders.tsv").map(_.getAs[String]("value").split("\t"))
val odf = orders.map(x => Orders(x(0).trim.toInt, x(1).trim.toInt, x(2))).toDF()


Spark-2.x & Spark-1.x
----------------------------------------------------------------------
val contact = sc.textFile("file:///home/orienit/spark/input/contact.csv").map(_.split(","))
val cdf = contact.map(c => Contact(c(0).toInt, c(1), c(2), c(3).toInt)).toDF()

val orders = sc.textFile("file:///home/orienit/spark/input/orders.tsv").map(_.split("\t"))
val odf = orders.map(x => Orders(x(0).trim.toInt, x(1).trim.toInt, x(2))).toDF()


scala> cdf.show
+---+------+----+-------+
|cid|  name| loc|pincode|
+---+------+----+-------+
|  1|kalyan| hyd| 500072|
|  2|venkat| hyd| 500073|
|  3|prasad|bang| 600076|
|  4|anvith|bang| 600075|
+---+------+----+-------+


scala> odf.show
+---+---+-------+
|oid|cid| status|
+---+---+-------+
|111|  1|success|
|112|  1|failure|
|113|  2|success|
|114|  3|success|
|115|  2|failure|
+---+---+-------+


scala> cdf.join(odf).show
+---+------+----+-------+---+---+-------+
|cid|  name| loc|pincode|oid|cid| status|
+---+------+----+-------+---+---+-------+
|  1|kalyan| hyd| 500072|111|  1|success|
|  1|kalyan| hyd| 500072|112|  1|failure|
|  1|kalyan| hyd| 500072|113|  2|success|
|  2|venkat| hyd| 500073|111|  1|success|
|  2|venkat| hyd| 500073|112|  1|failure|
|  2|venkat| hyd| 500073|113|  2|success|
|  3|prasad|bang| 600076|111|  1|success|
|  3|prasad|bang| 600076|112|  1|failure|
|  3|prasad|bang| 600076|113|  2|success|
|  1|kalyan| hyd| 500072|114|  3|success|
|  1|kalyan| hyd| 500072|115|  2|failure|
|  2|venkat| hyd| 500073|114|  3|success|
|  2|venkat| hyd| 500073|115|  2|failure|
|  3|prasad|bang| 600076|114|  3|success|
|  3|prasad|bang| 600076|115|  2|failure|
|  4|anvith|bang| 600075|111|  1|success|
|  4|anvith|bang| 600075|112|  1|failure|
|  4|anvith|bang| 600075|113|  2|success|
|  4|anvith|bang| 600075|114|  3|success|
|  4|anvith|bang| 600075|115|  2|failure|
+---+------+----+-------+---+---+-------+


scala> cdf.join(odf, cdf("cid") === odf("cid")).show
+---+------+----+-------+---+---+-------+
|cid|  name| loc|pincode|oid|cid| status|
+---+------+----+-------+---+---+-------+
|  1|kalyan| hyd| 500072|111|  1|success|
|  1|kalyan| hyd| 500072|112|  1|failure|
|  2|venkat| hyd| 500073|113|  2|success|
|  2|venkat| hyd| 500073|115|  2|failure|
|  3|prasad|bang| 600076|114|  3|success|
+---+------+----+-------+---+---+-------+


scala> cdf.join(odf, cdf("cid") === odf("cid"), "left_outer").show
+---+------+----+-------+----+----+-------+
|cid|  name| loc|pincode| oid| cid| status|
+---+------+----+-------+----+----+-------+
|  1|kalyan| hyd| 500072| 111|   1|success|
|  1|kalyan| hyd| 500072| 112|   1|failure|
|  2|venkat| hyd| 500073| 113|   2|success|
|  2|venkat| hyd| 500073| 115|   2|failure|
|  3|prasad|bang| 600076| 114|   3|success|
|  4|anvith|bang| 600075|null|null|   null|
+---+------+----+-------+----+----+-------+


scala> cdf.join(odf, cdf("cid") === odf("cid"), "right_outer").show
+---+------+----+-------+---+---+-------+
|cid|  name| loc|pincode|oid|cid| status|
+---+------+----+-------+---+---+-------+
|  1|kalyan| hyd| 500072|111|  1|success|
|  1|kalyan| hyd| 500072|112|  1|failure|
|  2|venkat| hyd| 500073|113|  2|success|
|  2|venkat| hyd| 500073|115|  2|failure|
|  3|prasad|bang| 600076|114|  3|success|
+---+------+----+-------+---+---+-------+


scala> cdf.join(odf, cdf("cid") === odf("cid"), "full_outer").show
+---+------+----+-------+----+----+-------+
|cid|  name| loc|pincode| oid| cid| status|
+---+------+----+-------+----+----+-------+
|  1|kalyan| hyd| 500072| 111|   1|success|
|  1|kalyan| hyd| 500072| 112|   1|failure|
|  2|venkat| hyd| 500073| 113|   2|success|
|  2|venkat| hyd| 500073| 115|   2|failure|
|  3|prasad|bang| 600076| 114|   3|success|
|  4|anvith|bang| 600075|null|null|   null|
+---+------+----+-------+----+----+-------+


scala> cdf.join(odf, cdf("cid") === odf("cid"), "inner").show
+---+------+----+-------+---+---+-------+
|cid|  name| loc|pincode|oid|cid| status|
+---+------+----+-------+---+---+-------+
|  1|kalyan| hyd| 500072|111|  1|success|
|  1|kalyan| hyd| 500072|112|  1|failure|
|  2|venkat| hyd| 500073|113|  2|success|
|  2|venkat| hyd| 500073|115|  2|failure|
|  3|prasad|bang| 600076|114|  3|success|
+---+------+----+-------+---+---+-------+


scala> cdf.unionAll(cdf).show
+---+------+----+-------+
|cid|  name| loc|pincode|
+---+------+----+-------+
|  1|kalyan| hyd| 500072|
|  2|venkat| hyd| 500073|
|  3|prasad|bang| 600076|
|  4|anvith|bang| 600075|
|  1|kalyan| hyd| 500072|
|  2|venkat| hyd| 500073|
|  3|prasad|bang| 600076|
|  4|anvith|bang| 600075|
+---+------+----+-------+


scala> cdf.unionAll(df).show
+------+------+------+-------+
|   cid|  name|   loc|pincode|
+------+------+------+-------+
|     1|kalyan|   hyd| 500072|
|     2|venkat|   hyd| 500073|
|     3|prasad|  bang| 600076|
|     4|anvith|  bang| 600075|
| spark|     1|  anil|   2016|
|hadoop|     5|anvith|   2015|
|hadoop|     6|   dev|   2015|
| spark|     3|   raj|   2016|
|hadoop|     4| sunil|   2015|
| spark|     2|venkat|   2016|
+------+------+------+-------+

Note: unionAll resolves columns by position, not by name, which is why the student rows land under the contact headers. In Spark-2.x `unionAll` is deprecated in favour of `union`.


scala> cdf.intersect(cdf).show
+---+------+----+-------+
|cid|  name| loc|pincode|
+---+------+----+-------+
|  2|venkat| hyd| 500073|
|  4|anvith|bang| 600075|
|  1|kalyan| hyd| 500072|
|  3|prasad|bang| 600076|
+---+------+----+-------+


scala> cdf.intersect(df).show
+---+----+---+-------+
|cid|name|loc|pincode|
+---+----+---+-------+
+---+----+---+-------+


----------------------------------------------------------------------------------
RDBMS SPARK EXAMPLES
----------------------------------------------------------------------------------

----------------------------------------------------------------------------------
Using JDBC to Connect to Databases
----------------------------------------------------------------------------------
Use any one of the following options to put the MySQL JDBC driver on the classpath when launching the shell:

SPARK_CLASSPATH=<path-to-mysql-jar>/mysql-connector-java-5.1.38.jar $SPARK_HOME/bin/spark-shell

$SPARK_HOME/bin/spark-shell --driver-class-path <path-to-mysql-jar>/mysql-connector-java-5.1.38.jar

$SPARK_HOME/bin/spark-shell --jars <path-to-mysql-jar>/mysql-connector-java-5.1.38.jar


----------------------------------------------------------------------------------
Mysql Operations
----------------------------------------------------------------------------------
mysql -u root -p

CREATE DATABASE IF NOT EXISTS kalyan;

CREATE TABLE kalyan.student(name VARCHAR(50) PRIMARY KEY, id INT, course VARCHAR(50), year INT);

INSERT INTO kalyan.student(name, id, course, year) VALUES ('anil', 1, 'spark', 2016);
INSERT INTO kalyan.student(name, id, course, year) VALUES ('venkat', 2, 'spark', 2016);
INSERT INTO kalyan.student(name, id, course, year) VALUES ('raj', 3, 'spark', 2016);
INSERT INTO kalyan.student(name, id, course, year) VALUES ('sunil', 4, 'hadoop', 2015);
INSERT INTO kalyan.student(name, id, course, year) VALUES ('anvith', 5, 'hadoop', 2015);
INSERT INTO kalyan.student(name, id, course, year) VALUES ('dev', 6, 'hadoop', 2015);

SELECT * FROM kalyan.student;

----------------------------------------------------------------------------------

To connect to any RDBMS you need:
--------------------------
1. connection url
2. username & password
3. driver class name
4. client jar
--------------------------
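
For the MySQL examples on this page these map as follows (a sketch using the values that appear in the sections above and below):

val url      = "jdbc:mysql://localhost:3306/kalyan"   // 1. connection url
val user     = "root"                                 // 2. username & password
val password = "hadoop"
val driver   = "com.mysql.jdbc.Driver"                // 3. driver class name
// 4. client jar: mysql-connector-java-5.1.38.jar, passed via --jars / --driver-class-path as shown above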


Spark-2.x
--------------------------------------------------
spark.read.jdbc
spark.read.format("jdbc")

df.write.jdbc            (the write API hangs off the DataFrame, not the session)
df.write.format("jdbc")


Spark-2.x & Spark-1.x
--------------------------------------------------
sqlContext.read.jdbc
sqlContext.read.format("jdbc")

df.write.jdbc
df.write.format("jdbc")


----------------------------------------------------------------------------------

val jdbcDF = sqlContext.read.format("jdbc").options(
  Map("url" -> "jdbc:mysql://localhost:3306/kalyan?user=root&password=hadoop", "driver" -> "com.mysql.jdbc.Driver", "dbtable" -> "student")).load()

(or)

val jdbcDF = sqlContext.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/kalyan?user=root&password=hadoop").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "student").load()

(or)

val jdbcDF = sqlContext.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/kalyan").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "student").option("user", "root").option("password", "hadoop").load()


(or)

val prop = new java.util.Properties

val jdbcDF = sqlContext.read.jdbc("jdbc:mysql://localhost:3306/kalyan?user=root&password=hadoop", "student", prop)


(or)

val prop = new java.util.Properties
prop.setProperty("driver","com.mysql.jdbc.Driver")
prop.setProperty("user","root")
prop.setProperty("password","hadoop")

val jdbcDF = sqlContext.read.jdbc("jdbc:mysql://localhost:3306/kalyan", "student", prop)


(or)

val jdbcDF = sqlContext.read.jdbc("jdbc:mysql://localhost:3306/kalyan?user=root&password=hadoop", "student", Array("course='spark'"), prop)

jdbcDF.show()


val jdbcDF = sqlContext.read.jdbc("jdbc:mysql://localhost:3306/kalyan?user=root&password=hadoop", "student", Array("course='spark'", "year=2015"), prop)

jdbcDF.show()


val userdata = jdbcDF.select("name", "id")
userdata.show()


----------------------------------------------------------------------------------
Saving data from a `DataFrame` to other systems
----------------------------------------------------------------------------------

val prop = new java.util.Properties
prop.setProperty("driver","com.mysql.jdbc.Driver")
prop.setProperty("user","root")
prop.setProperty("password","hadoop")

val jdbcDF = sqlContext.read.jdbc("jdbc:mysql://localhost:3306/kalyan", "student", prop)

jdbcDF.show()

----------------------------------------------------------------------------------
Spark-2.x & Spark-1.x
----------------------------------------------------------------------------------

Write Modes:
-----------------------
1. overwrite
2. append
3. error
4. ignore
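
Each of these string modes has a `SaveMode` constant equivalent; a minimal sketch with the same `jdbcDF` and output path as below ("error" maps to SaveMode.ErrorIfExists and is the default):

import org.apache.spark.sql.SaveMode

jdbcDF.write.mode(SaveMode.Overwrite).json("file:///home/orienit/spark/output/student_json")
jdbcDF.write.mode(SaveMode.Append).json("file:///home/orienit/spark/output/student_json")
jdbcDF.write.mode(SaveMode.Ignore).json("file:///home/orienit/spark/output/student_json")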



Write DataFrame data into JSON file
----------------------------------------------------------------------------------
jdbcDF.write.json("file:///home/orienit/spark/output/student_json")

jdbcDF.write.mode("overwrite").json("file:///home/orienit/spark/output/student_json")

jdbcDF.write.mode("append").json("file:///home/orienit/spark/output/student_json")


----------------------------------------------------------------------------------
Write DataFrame data into ORC file
----------------------------------------------------------------------------------
jdbcDF.write.orc("file:///home/orienit/spark/output/student_orc")

jdbcDF.write.mode("overwrite").orc("file:///home/orienit/spark/output/student_orc")

jdbcDF.write.mode("append").orc("file:///home/orienit/spark/output/student_orc")


----------------------------------------------------------------------------------
Write DataFrame data into PARQUET file
----------------------------------------------------------------------------------
jdbcDF.write.parquet("file:///home/orienit/spark/output/student_parquet")

jdbcDF.write.mode("overwrite").parquet("file:///home/orienit/spark/output/student_parquet")

jdbcDF.write.mode("append").parquet("file:///home/orienit/spark/output/student_parquet")


----------------------------------------------------------------------------------
Write DataFrame data using the default format (PARQUET)
----------------------------------------------------------------------------------
jdbcDF.write.save("file:///home/orienit/spark/output/student_parquet")

jdbcDF.write.mode("overwrite").save("file:///home/orienit/spark/output/student_parquet")

jdbcDF.write.mode("append").save("file:///home/orienit/spark/output/student_parquet")


----------------------------------------------------------------------------------
Write DataFrame data into CSV file
----------------------------------------------------------------------------------
jdbcDF.write.csv("file:///home/orienit/spark/output/student_csv")

jdbcDF.write.mode("overwrite").csv("file:///home/orienit/spark/output/student_csv")

jdbcDF.write.mode("append").csv("file:///home/orienit/spark/output/student_csv")


----------------------------------------------------------------------------------

val prop = new java.util.Properties
prop.setProperty("driver","com.mysql.jdbc.Driver")
prop.setProperty("user","root")
prop.setProperty("password","hadoop")

----------------------------------------------------------------------------------
Write DataFrame data into RDBMS table
----------------------------------------------------------------------------------
jdbcDF.write.jdbc("jdbc:mysql://localhost:3306/kalyan", "student1", prop)

jdbcDF.write.mode("overwrite").jdbc("jdbc:mysql://localhost:3306/kalyan", "student1", prop)

jdbcDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/kalyan", "student1", prop)


----------------------------------------------------------------------------------
Write DataFrame data into HIVE table
----------------------------------------------------------------------------------
jdbcDF.write.saveAsTable("student")

jdbcDF.write.saveAsTable("kalyan.student")

jdbcDF.write.insertInto("kalyan.student")

----------------------------------------------------------------------------------






----------------------------------------------------------------------------------

The actions below are supported only in Spark-1.x

Note: df.save() was removed in Spark-2.x; use df.write (shown above) instead
----------------------------------------------------------------------------------
Write DataFrame data into JSON file
----------------------------------------------------------------------------------
jdbcDF.save("file:///home/orienit/spark/output/student_json", "json")

jdbcDF.save("file:///home/orienit/spark/output/student_json", "json", SaveMode.Overwrite)

jdbcDF.save("file:///home/orienit/spark/output/student_json", "json", SaveMode.Append)


----------------------------------------------------------------------------------
Write DataFrame data into ORC file
----------------------------------------------------------------------------------
jdbcDF.save("file:///home/orienit/spark/output/student_orc", "orc")

jdbcDF.save("file:///home/orienit/spark/output/student_orc", "orc", SaveMode.Overwrite)

jdbcDF.save("file:///home/orienit/spark/output/student_orc", "orc", SaveMode.Append)


----------------------------------------------------------------------------------
Write DataFrame data using the default format (PARQUET)
----------------------------------------------------------------------------------
jdbcDF.save("file:///home/orienit/spark/output/student_parquet")

jdbcDF.save("file:///home/orienit/spark/output/student_parquet", SaveMode.Overwrite)

jdbcDF.save("file:///home/orienit/spark/output/student_parquet", SaveMode.Append)


----------------------------------------------------------------------------------
Write DataFrame data into PARQUET file
----------------------------------------------------------------------------------
jdbcDF.save("file:///home/orienit/spark/output/student_parquet", "parquet")

jdbcDF.save("file:///home/orienit/spark/output/student_parquet", "parquet", SaveMode.Overwrite)

jdbcDF.save("file:///home/orienit/spark/output/student_parquet", "parquet", SaveMode.Append)

jdbcDF.saveAsParquetFile("file:///home/orienit/spark/output/student_parquet_1")


----------------------------------------------------------------------------------
Write DataFrame data into RDBMS table
----------------------------------------------------------------------------------
jdbcDF.insertIntoJDBC("jdbc:mysql://localhost:3306/kalyan?user=root&password=hadoop", "student1", false)

jdbcDF.insertIntoJDBC("jdbc:mysql://localhost:3306/kalyan?user=root&password=hadoop", "student1", true)

jdbcDF.createJDBCTable("jdbc:mysql://localhost:3306/kalyan?user=root&password=hadoop", "student2", true)


----------------------------------------------------------------------------------
Write DataFrame data into HIVE table
----------------------------------------------------------------------------------

import org.apache.spark.sql._

jdbcDF.saveAsTable("student")

jdbcDF.saveAsTable("student", SaveMode.Overwrite)

jdbcDF.saveAsTable("student", SaveMode.Append)

jdbcDF.saveAsTable("kalyan.student")

jdbcDF.insertInto("kalyan.student")

----------------------------------------------------------------------------------



----------------------------------------------------------------------------------
CREATING UDFs IN SPARK
----------------------------------------------------------------------------------

val students1 = List(("rajesh", 11, "spark" , 2016), ("nagesh", 12, "spark" , 2016), ("ganesh", 13, "spark" , 2016))

val studentsRdd1 = sc.parallelize(students1)


case class Student(name: String, id: Int, course: String, year:Int)

val students2 = students1.map( t => Student(t._1, t._2, t._3, t._4))

val studentsRdd2 = sc.parallelize(students2)


val df1 = sqlContext.createDataFrame(studentsRdd1)
df1.registerTempTable("student1")

val df2 = sqlContext.createDataFrame(studentsRdd2)
df2.registerTempTable("student2")


scala> sqlContext.sql("select * from student1").show
+------+---+-----+----+
|    _1| _2|   _3|  _4|
+------+---+-----+----+
|rajesh| 11|spark|2016|
|nagesh| 12|spark|2016|
|ganesh| 13|spark|2016|
+------+---+-----+----+


scala> sqlContext.sql("select * from student2").show
+------+---+------+----+
|  name| id|course|year|
+------+---+------+----+
|rajesh| 11| spark|2016|
|nagesh| 12| spark|2016|
|ganesh| 13| spark|2016|
+------+---+------+----+

scala> df1.select("_1", "_2").show
+------+---+
|    _1| _2|
+------+---+
|rajesh| 11|
|nagesh| 12|
|ganesh| 13|
+------+---+


scala> df2.select("name", "id").show
+------+---+
|  name| id|
+------+---+
|rajesh| 11|
|nagesh| 12|
|ganesh| 13|
+------+---+

----------------------------------------------------------------------------------

def lengthFunc(name: String) = { name.length }
sqlContext.udf.register("mylength", lengthFunc(_:String))

// Now we can use our function directly in SparkSQL.
sqlContext.sql("SELECT name, mylength(name) from student2").show
sqlContext.sql("SELECT name, mylength(name) as len from student2").show

// but not outside
df2.select($"name", mylength($"name"), $"id").show // fails


import org.apache.spark.sql.functions.udf
val lengthUdf = udf(lengthFunc(_:String))

// now this works
df2.select($"name", lengthUdf($"name"), $"id").show


----------------------------------------------------------------------------------
SPARK SQL with `DATASETS`
----------------------------------------------------------------------------------

scala> val df = sc.makeRDD(1 to 10).toDF()
df: org.apache.spark.sql.DataFrame = [_1: int]

scala> df.map(_ + 1).collect()
<console>:30: error: type mismatch;
 found   : Int(1)
 required: String
              df.map(_ + 1).collect()

(df.map operates on Row objects, so the column value has to be pulled out of the Row first, as below)

scala> df.map(row => row(0).toString.toInt + 1).collect
res83: Array[Int] = Array(2, 3, 4, 5, 6, 7, 8, 9, 10, 11)

scala> df.map(row => row.getAs[Int]("_1") + 1).collect
res97: Array[Int] = Array(2, 3, 4, 5, 6, 7, 8, 9, 10, 11)



scala> val ds = (1 to 10).toDS()
ds: org.apache.spark.sql.Dataset[Int] = [value: int]

scala> ds.map(_ + 1).collect()
res71: Array[Int] = Array(2, 3, 4, 5, 6, 7, 8, 9, 10, 11)

----------------------------------------------------------------------------------

case class Student(name: String, id: Long, course: String, year: Long)

val path = "file:///home/orienit/spark/input/student.json"

scala> val df = sqlContext.read.json(path)
df: org.apache.spark.sql.DataFrame = [course: string, id: bigint, name: string, year: bigint]


scala> val ds = sqlContext.read.json(path).as[Student]
ds: org.apache.spark.sql.Dataset[Student] = [name: string, id: bigint, course: string, year: bigint]


Note:
1. A DataFrame can be converted to a Dataset by providing a case class.
2. A Dataset can be converted back to a DataFrame directly (a sketch of this follows the example below).

scala> val ds = df.as[Student]
ds: org.apache.spark.sql.Dataset[Student] = [name: string, id: bigint, course: string, year: bigint]
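
The reverse conversion (point 2 of the note above) is a one-liner; a minimal sketch:

val backToDf = ds.toDF()   // Dataset[Student] -> DataFrame with the same columns
backToDf.show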


scala> df.show
+------+---+------+----+
|course| id|  name|year|
+------+---+------+----+
| spark|  1|  anil|2016|
|hadoop|  5|anvith|2015|
|hadoop|  6|   dev|2015|
| spark|  3|   raj|2016|
|hadoop|  4| sunil|2015|
| spark|  2|venkat|2016|
+------+---+------+----+


scala> ds.show
+------+---+------+----+
|course| id|  name|year|
+------+---+------+----+
| spark|  1|  anil|2016|
|hadoop|  5|anvith|2015|
|hadoop|  6|   dev|2015|
| spark|  3|   raj|2016|
|hadoop|  4| sunil|2015|
| spark|  2|venkat|2016|
+------+---+------+----+


scala> ds.collect.foreach(println)
Student(anil,1,spark,2016)
Student(anvith,5,hadoop,2015)
Student(dev,6,hadoop,2015)
Student(raj,3,spark,2016)
Student(sunil,4,hadoop,2015)
Student(venkat,2,spark,2016)


scala> df.collect.foreach(println)
[spark,1,anil,2016]
[hadoop,5,anvith,2015]
[hadoop,6,dev,2015]
[spark,3,raj,2016]
[hadoop,4,sunil,2015]
[spark,2,venkat,2016]



----------------------------------------------------------------------------------
HIVE SPARK EXAMPLES
----------------------------------------------------------------------------------
import org.apache.spark.sql._

// Hive tables need Hive support: in Spark-1.x use a HiveContext (a plain SQLContext cannot run Hive DDL);
// in the Spark-2.x shell the default `spark` session already has it (see the sketch at the end of this section).
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

sqlContext.sql("CREATE DATABASE IF NOT EXISTS kalyan")
sqlContext.sql("CREATE TABLE IF NOT EXISTS kalyan.src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH '${env:SPARK_HOME}/examples/src/main/resources/kv1.txt' INTO TABLE kalyan.src")

val countdata = sqlContext.sql("SELECT count(*) from kalyan.src")
countdata.collect()

val input = sqlContext.sql("FROM kalyan.src SELECT key, value")
val data = input.map(_.getInt(0))

println(input.collect().toList)
println(data.collect().toList)

input.collect().foreach(println)
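
In Spark-2.x the same statements can be run directly on the `spark` session (a minimal sketch; assumes the shell was built with Hive support, as the pre-built binaries are):

spark.sql("CREATE DATABASE IF NOT EXISTS kalyan")
spark.sql("CREATE TABLE IF NOT EXISTS kalyan.src (key INT, value STRING)")
spark.sql("LOAD DATA LOCAL INPATH '${env:SPARK_HOME}/examples/src/main/resources/kv1.txt' INTO TABLE kalyan.src")
spark.sql("SELECT count(*) FROM kalyan.src").show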




----------------------------------------------------------------------------------
CASSANDRA SPARK EXAMPLES
----------------------------------------------------------------------------------

DROP KEYSPACE kalyan;

CREATE KEYSPACE kalyan WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };

CREATE TABLE kalyan.student(name text PRIMARY KEY, id int, course text, year int);

INSERT INTO kalyan.student(name, id, course, year) VALUES ('anil', 1, 'spark', 2016);
INSERT INTO kalyan.student(name, id, course, year) VALUES ('venkat', 2, 'spark', 2016);
INSERT INTO kalyan.student(name, id, course, year) VALUES ('raj', 3, 'spark', 2016);
INSERT INTO kalyan.student(name, id, course, year) VALUES ('sunil', 4, 'hadoop', 2015);
INSERT INTO kalyan.student(name, id, course, year) VALUES ('anvith', 5, 'hadoop', 2015);
INSERT INTO kalyan.student(name, id, course, year) VALUES ('dev', 6, 'hadoop', 2015);

SELECT * FROM kalyan.student;


----------------------------------------------------------------------------------
Spark-2.x  & Spark-1.x Support
----------------------------------------------------------------------------------

sc.stop()

import org.apache.spark._
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector

val conf = new SparkConf(true).set("spark.cassandra.connection.host", "127.0.0.1").set("spark.cassandra.input.split.size_in_mb", "268435456")

val sc = new SparkContext("local[*]", "kalyan", conf)

val rdd = sc.cassandraTable("kalyan", "student")

println(rdd.count)

println(rdd.first)

rdd.groupBy(x => x.getInt("year")).foreach(println)

----------------------------------------------------------------------------------

val students = List(("rajesh", 11, "spark" , 2016), ("nagesh", 12, "spark" , 2016), ("ganesh", 13, "spark" , 2016))

val studentsRdd = sc.parallelize(students)

studentsRdd.saveToCassandra("kalyan", "student", SomeColumns("name", "id", "course", "year"))     
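
To verify the write, re-read the table with the same API used above (a quick check):

val updated = sc.cassandraTable("kalyan", "student")
println(updated.count)              // should now be 9: the 6 rows inserted in cqlsh plus the 3 written above
updated.collect.foreach(println)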


----------------------------------------------------------------------------------
Spark-2.x Support
----------------------------------------------------------------------------------

import org.apache.spark.sql._
import org.apache.spark.sql.cassandra._
import com.datastax.spark.connector.cql.CassandraConnector

val sqlContext = new SQLContext(sc)

val df = sqlContext.read.cassandraFormat("student", "kalyan").load()
df.show()

df.createOrReplaceTempView("student")

val sdf = sqlContext.sql("SELECT * from student")
sdf.show()


----------------------------------------------------------------------------------
Spark-1.x only (CassandraSQLContext was removed in Spark-2.x)
----------------------------------------------------------------------------------

import org.apache.spark.sql._
import org.apache.spark.sql.cassandra._

val csc = new CassandraSQLContext(sc)

val df = csc.sql("SELECT * from kalyan.student")
df.show()

df.registerTempTable("student")

val sdf = csc.sql("SELECT * from student")
sdf.show()


----------------------------------------------------------------------------------
PHOENIX SPARK EXAMPLES
----------------------------------------------------------------------------------

CREATE TABLE STUDENT(name VARCHAR PRIMARY KEY, id INTEGER, course VARCHAR, year INTEGER);

UPSERT INTO STUDENT(name, id, course, year) VALUES ('anil', 1, 'spark', 2016);
UPSERT INTO STUDENT(name, id, course, year) VALUES ('venkat', 2, 'spark', 2016);
UPSERT INTO STUDENT(name, id, course, year) VALUES ('raj', 3, 'spark', 2016);
UPSERT INTO STUDENT(name, id, course, year) VALUES ('sunil', 4, 'hadoop', 2015);
UPSERT INTO STUDENT(name, id, course, year) VALUES ('anvith', 5, 'hadoop', 2015);
UPSERT INTO STUDENT(name, id, course, year) VALUES ('dev', 6, 'hadoop', 2015);

SELECT * FROM STUDENT;

----------------------------------------------------------------------------------

sc.stop()

import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.rdd.RDD
import org.apache.phoenix.spark._

val sc = new SparkContext("local", "kalyan")
val sqlContext = new SQLContext(sc)

// sqlContext.load(...) was deprecated in Spark-1.4 and removed in Spark-2.x; read.format works in both
val df = sqlContext.read.format("org.apache.phoenix.spark").options(
  Map("table" -> "STUDENT", "zkUrl" -> "localhost:2181")
).load()

----------------------------------------------------------------------------------

df.show

df.select("NAME").show()

df.select(df("NAME")).show()

df.select("NAME", "ID").show()

df.select(df("NAME"), df("ID")).show()

df.filter(df("ID") === 1L).show()

df.filter(df("ID") >= 2L).show()

df.filter(df("COURSE") === "spark" && df("ID") >= 1L).show

df.filter(df("COURSE") === "spark" && df("ID") === 1L).select(df("ID")).show

--------------------------------------------------------------------------

import org.apache.hadoop.conf.Configuration

val configuration = new Configuration()

val df = sqlContext.phoenixTableAsDataFrame(
 "STUDENT", Seq[String](), zkUrl = Some("localhost:2181"), conf = configuration
)

df.show


val df = sqlContext.phoenixTableAsDataFrame(
 "STUDENT", Seq[String]("ID", "NAME"), zkUrl = Some("localhost:2181"), conf = configuration
)

df.show

--------------------------------------------------------------------------

val rdd = sc.phoenixTableAsRDD(
 "STUDENT", Seq[String](), zkUrl = Some("localhost:2181"), conf = configuration
)

rdd.foreach(println)

val rdd = sc.phoenixTableAsRDD(
 "STUDENT", Seq[String]("ID", "NAME"), zkUrl = Some("localhost:2181"), conf = configuration
)

rdd.foreach(println)

val firstId = rdd.first()("ID").asInstanceOf[Int]
val firstNAME = rdd.first()("NAME").asInstanceOf[String]

--------------------------------------------------------------------------

val students = List(("rajesh", 11, "spark" , 2016), ("nagesh", 12, "spark" , 2016), ("ganesh", 13, "spark" , 2016))

val studentsRdd = sc.parallelize(students)

studentsRdd.saveToPhoenix("STUDENT", Seq("NAME", "ID", "COURSE", "YEAR"), zkUrl = Some("localhost:2181"))

--------------------------------------------------------------------------

import org.apache.spark.sql._
import org.apache.spark.sql.types._

// Load from `STUDENT`
val df = sqlContext.read.format("org.apache.phoenix.spark").options(Map("table" -> "STUDENT", "zkUrl" -> "localhost:2181")).load()


// Save to `STUDENT1`
CREATE TABLE STUDENT1(name VARCHAR PRIMARY KEY, id INTEGER, course VARCHAR, year INTEGER);

df.save("org.apache.phoenix.spark", SaveMode.Overwrite, Map("table" -> "STUDENT1", "zkUrl" -> "localhost:2181"))

----------------------------------------------------------------------------------














