----------------------------------------------------------------------------------
SPARK SQL INSTALLATION STEPS
----------------------------------------------------------------------------------
1. copy "kalyan_spark_jars" foder to "$SPARK_HOME" folder
2. copy "$SPARK_HOME/conf/spark-env.sh.template" file as "$SPARK_HOME/conf/spark-env.sh"
3. add the below line to "$SPARK_HOME/conf/spark-env.sh" file
export SPARK_CLASSPATH="$(echo $SPARK_HOME/kalyan_spark_jars/*.jar | tr ' ' ':')"
4. copy "$SPARK_HOME/conf/spark-defaults.conf.template" file as "$SPARK_HOME/conf/spark-defaults.conf"
5. add the below line to "$SPARK_HOME/conf/spark-defaults.conf" file
spark.driver.memory 5g
6. start the `spark-shell` with below command
$SPARK_HOME/bin/spark-shell
7. copy "input" foder to "/home/orienit/spark" folder
----------------------------------------------------------------------------------
SPARK SQL EXAMPLES with SCALA
----------------------------------------------------------------------------------
import org.apache.spark._
import org.apache.spark.sql._
val conf: SparkConf = new SparkConf().setAppName("Kalyan Sql Practice").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val sqlContext: SQLContext = new org.apache.spark.sql.SQLContext(sc)
// needed for the $"..." column syntax and the toDF()/toDS() conversions used later
import sqlContext.implicits._
val df = sqlContext.read.json("file:///home/orienit/spark/input/student.json")
(or)
val df = sqlContext.read.parquet("file:///home/orienit/spark/input/student.parquet")
scala> df.show
+------+---+------+----+
|course| id| name|year|
+------+---+------+----+
| spark| 1| anil|2016|
|hadoop| 5|anvith|2015|
|hadoop| 6| dev|2015|
| spark| 3| raj|2016|
|hadoop| 4| sunil|2015|
| spark| 2|venkat|2016|
+------+---+------+----+
scala> df.select("name", "id").show
scala> df.select($"name", $"id").show
+------+---+
| name| id|
+------+---+
| anil| 1|
|anvith| 5|
| dev| 6|
| raj| 3|
| sunil| 4|
|venkat| 2|
+------+---+
scala> df.select(df("name"), df("id") + 1).show
scala> df.select($"name", $"id" + 1).show
+------+--------+
| name|(id + 1)|
+------+--------+
| anil| 2|
|anvith| 6|
| dev| 7|
| raj| 4|
| sunil| 5|
|venkat| 3|
+------+--------+
scala> df.filter(df("id") > 4).show
scala> df.filter($"id" > 4).show
+------+---+------+----+
|course| id| name|year|
+------+---+------+----+
|hadoop| 5|anvith|2015|
|hadoop| 6| dev|2015|
+------+---+------+----+
scala> df.filter(df("id") > 4 && df("id") < 6).show
+------+---+------+----+
|course| id| name|year|
+------+---+------+----+
|hadoop| 5|anvith|2015|
+------+---+------+----+
scala> df.where(df("id") > 2).show
+------+---+------+----+
|course| id| name|year|
+------+---+------+----+
|hadoop| 5|anvith|2015|
|hadoop| 6| dev|2015|
| spark| 3| raj|2016|
|hadoop| 4| sunil|2015|
+------+---+------+----+
scala> df.limit(4).show
+------+---+------+----+
|course| id| name|year|
+------+---+------+----+
| spark| 1| anil|2016|
|hadoop| 5|anvith|2015|
|hadoop| 6| dev|2015|
| spark| 3| raj|2016|
+------+---+------+----+
scala> df.toJSON.foreach(println)
{"name":"anil","id":1,"course":"spark","year":2016}
{"name":"anvith","id":5,"course":"hadoop","year":2015}
{"name":"dev","id":6,"course":"hadoop","year":2015}
{"name":"raj","id":3,"course":"spark","year":2016}
{"name":"sunil","id":4,"course":"hadoop","year":2015}
{"name":"venkat","id":2,"course":"spark","year":2016}
scala> df.groupBy("course","year").count.show
+------+----+-----+
|course|year|count|
+------+----+-----+
| spark|2016| 3|
|hadoop|2015| 3|
+------+----+-----+
scala> df.groupBy("id","course","year").count.show
+---+------+----+-----+
| id|course|year|count|
+---+------+----+-----+
| 6|hadoop|2015| 1|
| 5|hadoop|2015| 1|
| 3| spark|2016| 1|
| 4|hadoop|2015| 1|
| 2| spark|2016| 1|
| 1| spark|2016| 1|
+---+------+----+-----+
scala> df.orderBy($"course").show
+------+---+------+----+
|course| id| name|year|
+------+---+------+----+
|hadoop| 4| sunil|2015|
|hadoop| 5|anvith|2015|
|hadoop| 6| dev|2015|
| spark| 2|venkat|2016|
| spark| 3| raj|2016|
| spark| 1| anil|2016|
+------+---+------+----+
scala> df.orderBy($"course", $"name").show
+------+---+------+----+
|course| id| name|year|
+------+---+------+----+
|hadoop| 5|anvith|2015|
|hadoop| 6| dev|2015|
|hadoop| 4| sunil|2015|
| spark| 1| anil|2016|
| spark| 3| raj|2016|
| spark| 2|venkat|2016|
+------+---+------+----+
scala> df.sort("course", "id").show
+------+---+------+----+
|course| id| name|year|
+------+---+------+----+
|hadoop| 4| sunil|2015|
|hadoop| 5|anvith|2015|
|hadoop| 6| dev|2015|
| spark| 1| anil|2016|
| spark| 2|venkat|2016|
| spark| 3| raj|2016|
+------+---+------+----+
case class Contact(cid: Int, name: String, loc: String, pincode: Int)
case class Orders(oid: Int, cid: Int, status: String)
// load the CSV / TSV files as RDDs, split each line and map the fields onto the case classes
val contact = sc.textFile("file:///home/orienit/spark/input/contact.csv").map(_.split(","))
val cdf = contact.map(c => Contact(c(0).trim.toInt, c(1), c(2), c(3).trim.toInt)).toDF()
val orders = sc.textFile("file:///home/orienit/spark/input/orders.tsv").map(_.split("\t"))
val odf = orders.map(x => Orders(x(0).trim.toInt, x(1).trim.toInt, x(2))).toDF()
scala> cdf.show
+---+------+----+-------+
|cid| name| loc|pincode|
+---+------+----+-------+
| 1|kalyan| hyd| 500072|
| 2|venkat| hyd| 500073|
| 3|prasad|bang| 600076|
| 4|anvith|bang| 600075|
+---+------+----+-------+
scala> odf.show
+---+---+-------+
|oid|cid| status|
+---+---+-------+
|111| 1|success|
|112| 1|failure|
|113| 2|success|
|114| 3|success|
|115| 2|failure|
+---+---+-------+
scala> cdf.join(odf).show
+---+------+----+-------+---+---+-------+
|cid| name| loc|pincode|oid|cid| status|
+---+------+----+-------+---+---+-------+
| 1|kalyan| hyd| 500072|111| 1|success|
| 1|kalyan| hyd| 500072|112| 1|failure|
| 1|kalyan| hyd| 500072|113| 2|success|
| 2|venkat| hyd| 500073|111| 1|success|
| 2|venkat| hyd| 500073|112| 1|failure|
| 2|venkat| hyd| 500073|113| 2|success|
| 3|prasad|bang| 600076|111| 1|success|
| 3|prasad|bang| 600076|112| 1|failure|
| 3|prasad|bang| 600076|113| 2|success|
| 1|kalyan| hyd| 500072|114| 3|success|
| 1|kalyan| hyd| 500072|115| 2|failure|
| 2|venkat| hyd| 500073|114| 3|success|
| 2|venkat| hyd| 500073|115| 2|failure|
| 3|prasad|bang| 600076|114| 3|success|
| 3|prasad|bang| 600076|115| 2|failure|
| 4|anvith|bang| 600075|111| 1|success|
| 4|anvith|bang| 600075|112| 1|failure|
| 4|anvith|bang| 600075|113| 2|success|
| 4|anvith|bang| 600075|114| 3|success|
| 4|anvith|bang| 600075|115| 2|failure|
+---+------+----+-------+---+---+-------+
scala> cdf.join(odf, cdf("cid") === odf("cid")).show
+---+------+----+-------+---+---+-------+
|cid| name| loc|pincode|oid|cid| status|
+---+------+----+-------+---+---+-------+
| 1|kalyan| hyd| 500072|111| 1|success|
| 1|kalyan| hyd| 500072|112| 1|failure|
| 2|venkat| hyd| 500073|113| 2|success|
| 2|venkat| hyd| 500073|115| 2|failure|
| 3|prasad|bang| 600076|114| 3|success|
+---+------+----+-------+---+---+-------+
scala> cdf.join(odf, cdf("cid") === odf("cid"), "left_outer").show
+---+------+----+-------+----+----+-------+
|cid| name| loc|pincode| oid| cid| status|
+---+------+----+-------+----+----+-------+
| 1|kalyan| hyd| 500072| 111| 1|success|
| 1|kalyan| hyd| 500072| 112| 1|failure|
| 2|venkat| hyd| 500073| 113| 2|success|
| 2|venkat| hyd| 500073| 115| 2|failure|
| 3|prasad|bang| 600076| 114| 3|success|
| 4|anvith|bang| 600075|null|null| null|
+---+------+----+-------+----+----+-------+
scala> cdf.join(odf, cdf("cid") === odf("cid"), "right_outer").show
+---+------+----+-------+---+---+-------+
|cid| name| loc|pincode|oid|cid| status|
+---+------+----+-------+---+---+-------+
| 1|kalyan| hyd| 500072|111| 1|success|
| 1|kalyan| hyd| 500072|112| 1|failure|
| 2|venkat| hyd| 500073|113| 2|success|
| 2|venkat| hyd| 500073|115| 2|failure|
| 3|prasad|bang| 600076|114| 3|success|
+---+------+----+-------+---+---+-------+
scala> cdf.join(odf, cdf("cid") === odf("cid"), "full_outer").show
+---+------+----+-------+----+----+-------+
|cid| name| loc|pincode| oid| cid| status|
+---+------+----+-------+----+----+-------+
| 1|kalyan| hyd| 500072| 111| 1|success|
| 1|kalyan| hyd| 500072| 112| 1|failure|
| 2|venkat| hyd| 500073| 113| 2|success|
| 2|venkat| hyd| 500073| 115| 2|failure|
| 3|prasad|bang| 600076| 114| 3|success|
| 4|anvith|bang| 600075|null|null| null|
+---+------+----+-------+----+----+-------+
scala> cdf.join(odf, cdf("cid") === odf("cid"), "inner").show
+---+------+----+-------+---+---+-------+
|cid| name| loc|pincode|oid|cid| status|
+---+------+----+-------+---+---+-------+
| 1|kalyan| hyd| 500072|111| 1|success|
| 1|kalyan| hyd| 500072|112| 1|failure|
| 2|venkat| hyd| 500073|113| 2|success|
| 2|venkat| hyd| 500073|115| 2|failure|
| 3|prasad|bang| 600076|114| 3|success|
+---+------+----+-------+---+---+-------+
scala> cdf.unionAll(cdf).show
+---+------+----+-------+
|cid| name| loc|pincode|
+---+------+----+-------+
| 1|kalyan| hyd| 500072|
| 2|venkat| hyd| 500073|
| 3|prasad|bang| 600076|
| 4|anvith|bang| 600075|
| 1|kalyan| hyd| 500072|
| 2|venkat| hyd| 500073|
| 3|prasad|bang| 600076|
| 4|anvith|bang| 600075|
+---+------+----+-------+
scala> cdf.unionAll(df).show
+------+------+------+-------+
| cid| name| loc|pincode|
+------+------+------+-------+
| 1|kalyan| hyd| 500072|
| 2|venkat| hyd| 500073|
| 3|prasad| bang| 600076|
| 4|anvith| bang| 600075|
| spark| 1| anil| 2016|
|hadoop| 5|anvith| 2015|
|hadoop| 6| dev| 2015|
| spark| 3| raj| 2016|
|hadoop| 4| sunil| 2015|
| spark| 2|venkat| 2016|
+------+------+------+-------+
scala> cdf.intersect(cdf).show
+---+------+----+-------+
|cid| name| loc|pincode|
+---+------+----+-------+
| 2|venkat| hyd| 500073|
| 4|anvith|bang| 600075|
| 1|kalyan| hyd| 500072|
| 3|prasad|bang| 600076|
+---+------+----+-------+
scala> cdf.intersect(df).show
+---+----+---+-------+
|cid|name|loc|pincode|
+---+----+---+-------+
+---+----+---+-------+
----------------------------------------------------------------------------------
RDBMS SPARK EXAMPLES
----------------------------------------------------------------------------------
Using JDBC To Connect Databases
----------------------------------------------------------------------------------
Add the MySQL connector jar to the `spark-shell` classpath in any one of the following ways:
SPARK_CLASSPATH=<path-to-mysql-jar>/mysql-connector-java-5.1.34-bin.jar $SPARK_HOME/bin/spark-shell
(or)
$SPARK_HOME/bin/spark-shell --driver-class-path <path-to-mysql-jar>/mysql-connector-java-5.1.34-bin.jar
(or)
$SPARK_HOME/bin/spark-shell --jars <path-to-mysql-jar>/mysql-connector-java-5.1.34-bin.jar
MySQL Operations
----------------------------------------------------------------------------------
mysql -u root -p
CREATE DATABASE IF NOT EXISTS kalyan;
CREATE TABLE kalyan.student(name VARCHAR(50) PRIMARY KEY, id INT, course VARCHAR(50), year INT);
INSERT INTO kalyan.student(name, id, course, year) VALUES ('anil', 1, 'spark', 2016);
INSERT INTO kalyan.student(name, id, course, year) VALUES ('venkat', 2, 'spark', 2016);
INSERT INTO kalyan.student(name, id, course, year) VALUES ('raj', 3, 'spark', 2016);
INSERT INTO kalyan.student(name, id, course, year) VALUES ('sunil', 4, 'hadoop', 2015);
INSERT INTO kalyan.student(name, id, course, year) VALUES ('anvith', 5, 'hadoop', 2015);
INSERT INTO kalyan.student(name, id, course, year) VALUES ('dev', 6, 'hadoop', 2015);
SELECT * FROM kalyan.student;
----------------------------------------------------------------------------------
To connect to any RDBMS you need:
--------------------------
1. the connection URL
2. the user name & password
3. the driver class name
4. the client jar (on the classpath)
--------------------------
val jdbcDF = sqlContext.read.format("jdbc").options(
Map("url" -> "jdbc:mysql://localhost:3306/kalyan?user=root&password=hadoop", "driver" -> "com.mysql.jdbc.Driver", "dbtable" -> "student")).load()
(or)
val jdbcDF = sqlContext.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/kalyan?user=root&password=hadoop").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "student").load()
(or)
val jdbcDF = sqlContext.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/kalyan").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "student").option("user", "root").option("password", "hadoop").load()
(or)
val prop = new java.util.Properties
val jdbcDF = sqlContext.read.jdbc("jdbc:mysql://localhost:3306/kalyan?user=root&password=hadoop", "student", prop)
(or)
val prop = new java.util.Properties
prop.setProperty("driver","com.mysql.jdbc.Driver")
prop.setProperty("user","root")
prop.setProperty("password","hadoop")
val jdbcDF = sqlContext.read.jdbc("jdbc:mysql://localhost:3306/kalyan", "student", prop)
(or)
val jdbcDF = sqlContext.read.jdbc("jdbc:mysql://localhost:3306/kalyan?user=root&password=hadoop", "student", Array("course='spark'"), prop)
jdbcDF.show()
val jdbcDF = sqlContext.read.jdbc("jdbc:mysql://localhost:3306/kalyan?user=root&password=hadoop", "student", Array("course='spark'", "year=2015"), prop)
jdbcDF.show()
val userdata = jdbcDF.select("name", "id")
userdata.show()
----------------------------------------------------------------------------------
Saving data from a `DataFrame` to `other systems`
----------------------------------------------------------------------------------
val prop = new java.util.Properties
prop.setProperty("driver","com.mysql.jdbc.Driver")
prop.setProperty("user","root")
prop.setProperty("password","hadoop")
val jdbcDF = sqlContext.read.jdbc("jdbc:mysql://localhost:3306/kalyan", "student", prop)
jdbcDF.show()
----------------------------------------------------------------------------------
jdbcDF.save("file:///home/orienit/spark/output/student_json", "json")
jdbcDF.save("file:///home/orienit/spark/output/student_json", "json", SaveMode.Overwrite)
jdbcDF.save("file:///home/orienit/spark/output/student_json", "json", SaveMode.Append)
----------------------------------------------------------------------------------
jdbcDF.save("file:///home/orienit/spark/output/student_orc", "orc")
jdbcDF.save("file:///home/orienit/spark/output/student_orc", "orc", SaveMode.Overwrite)
jdbcDF.save("file:///home/orienit/spark/output/student_orc", "orc", SaveMode.Append)
----------------------------------------------------------------------------------
jdbcDF.save("file:///home/orienit/spark/output/student_parquet")
jdbcDF.save("file:///home/orienit/spark/output/student_parquet", SaveMode.Overwrite)
jdbcDF.save("file:///home/orienit/spark/output/student_parquet", SaveMode.Append)
----------------------------------------------------------------------------------
jdbcDF.save("file:///home/orienit/spark/output/student_parquet", "parquet")
jdbcDF.save("file:///home/orienit/spark/output/student_parquet", "parquet", SaveMode.Overwrite)
jdbcDF.save("file:///home/orienit/spark/output/student_parquet", "parquet", SaveMode.Append)
jdbcDF.saveAsParquetFile("file:///home/orienit/spark/output/student_parquet_1")
----------------------------------------------------------------------------------
val prop = new java.util.Properties
prop.setProperty("driver","com.mysql.jdbc.Driver")
prop.setProperty("user","root")
prop.setProperty("password","hadoop")
jdbcDF.write.jdbc("jdbc:mysql://localhost:3306/kalyan", "student1", prop)
jdbcDF.write.mode("overwrite").jdbc("jdbc:mysql://localhost:3306/kalyan", "student1", prop)
jdbcDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/kalyan", "student1", prop)
jdbcDF.insertIntoJDBC("jdbc:mysql://localhost:3306/kalyan?user=root&password=hadoop", "student1", false)
jdbcDF.insertIntoJDBC("jdbc:mysql://localhost:3306/kalyan?user=root&password=hadoop", "student1", true)
jdbcDF.createJDBCTable("jdbc:mysql://localhost:3306/kalyan?user=root&password=hadoop", "student2", true)
----------------------------------------------------------------------------------
import org.apache.spark.sql._
jdbcDF.saveAsTable("student")
jdbcDF.saveAsTable("student", SaveMode.Overwrite)
jdbcDF.saveAsTable("student", SaveMode.Append)
jdbcDF.saveAsTable("kalyan.student")
jdbcDF.insertInto("kalyan.student")
----------------------------------------------------------------------------------
Creating UDFs in Spark
----------------------------------------------------------------------------------
val students1 = List(("rajesh", 11, "spark" , 2016), ("nagesh", 12, "spark" , 2016), ("ganesh", 13, "spark" , 2016))
val studentsRdd1 = sc.parallelize(students1)
case class Student(name: String, id: Int, course: String, year:Int)
val students2 = students1.map( t => Student(t._1, t._2, t._3, t._4))
val studentsRdd2 = sc.parallelize(students2)
val df1 = sqlContext.createDataFrame(studentsRdd1)
df1.registerTempTable("student1")
val df2 = sqlContext.createDataFrame(studentsRdd2)
df2.registerTempTable("student2")
scala> sqlContext.sql("select * from student1").show
+------+---+-----+----+
| _1| _2| _3| _4|
+------+---+-----+----+
|rajesh| 11|spark|2016|
|nagesh| 12|spark|2016|
|ganesh| 13|spark|2016|
+------+---+-----+----+
scala> sqlContext.sql("select * from student2").show
+------+---+------+----+
| name| id|course|year|
+------+---+------+----+
|rajesh| 11| spark|2016|
|nagesh| 12| spark|2016|
|ganesh| 13| spark|2016|
+------+---+------+----+
scala> df1.select("_1", "_2").show
+------+---+
| _1| _2|
+------+---+
|rajesh| 11|
|nagesh| 12|
|ganesh| 13|
+------+---+
scala> df2.select("name", "id").show
+------+---+
| name| id|
+------+---+
|rajesh| 11|
|nagesh| 12|
|ganesh| 13|
+------+---+
----------------------------------------------------------------------------------
def lengthFunc(name: String) = { name.length }
sqlContext.udf.register("mylength", lengthFunc(_:String))
// Now we can use the function directly in Spark SQL queries.
sqlContext.sql("SELECT name, mylength(name) from student2").show
sqlContext.sql("SELECT name, mylength(name) as len from student2").show
// but not in the DataFrame API, since `mylength` is only registered with the SQL engine
df2.select($"name", mylength($"name"), $"id").show // fails
import org.apache.spark.sql.functions.udf
val lengthUdf = udf(lengthFunc(_:String))
// now this works
df2.select($"name", lengthUdf($"name"), $"id").show
----------------------------------------------------------------------------------
SPARK SQL with `DATASETS`
----------------------------------------------------------------------------------
scala> val df = sc.makeRDD(1 to 10).toDF()
df: org.apache.spark.sql.DataFrame = [_1: int]
scala> df.map(_ + 1).collect()
<console>:30: error: type mismatch;
found : Int(1)
required: String
df.map(_ + 1).collect()
scala> df.map(row => row(0).toString.toInt + 1).collect
res83: Array[Int] = Array(2, 3, 4, 5, 6, 7, 8, 9, 10, 11)
scala> df.map(row => row.getAs[Int]("_1") + 1).collect
res97: Array[Int] = Array(2, 3, 4, 5, 6, 7, 8, 9, 10, 11)
scala> val ds = (1 to 10).toDS()
ds: org.apache.spark.sql.Dataset[Int] = [value: int]
scala> ds.map(_ + 1).collect()
res71: Array[Int] = Array(2, 3, 4, 5, 6, 7, 8, 9, 10, 11)
----------------------------------------------------------------------------------
case class Student(name: String, id: Long, course: String, year: Long)
val path = "file:///home/orienit/spark/input/student.json"
scala> val df = sqlContext.read.json(path)
df: org.apache.spark.sql.DataFrame = [course: string, id: bigint, name: string, year: bigint]
scala> val ds = sqlContext.read.json(path).as[Student]
ds: org.apache.spark.sql.Dataset[Student] = [name: string, id: bigint, course: string, year: bigint]
Note:
1. A DataFrame can be converted to a Dataset by providing a case class (as shown below).
2. A Dataset can be converted back to a DataFrame directly (see the sketch after the next example).
scala> val ds = df.as[Student]
ds: org.apache.spark.sql.Dataset[Student] = [name: string, id: bigint, course: string, year: bigint]
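For point 2 of the note, a minimal sketch of going back from the Dataset to an untyped DataFrame, assuming the `ds` created above:
// Dataset -> DataFrame: drops the Student type but keeps the same rows and schema
val backToDf = ds.toDF()
backToDf.show()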
scala> df.show
+------+---+------+----+
|course| id| name|year|
+------+---+------+----+
| spark| 1| anil|2016|
|hadoop| 5|anvith|2015|
|hadoop| 6| dev|2015|
| spark| 3| raj|2016|
|hadoop| 4| sunil|2015|
| spark| 2|venkat|2016|
+------+---+------+----+
scala> ds.show
+------+---+------+----+
|course| id| name|year|
+------+---+------+----+
| spark| 1| anil|2016|
|hadoop| 5|anvith|2015|
|hadoop| 6| dev|2015|
| spark| 3| raj|2016|
|hadoop| 4| sunil|2015|
| spark| 2|venkat|2016|
+------+---+------+----+
scala> ds.collect.foreach(println)
Student(anil,1,spark,2016)
Student(anvith,5,hadoop,2015)
Student(dev,6,hadoop,2015)
Student(raj,3,spark,2016)
Student(sunil,4,hadoop,2015)
Student(venkat,2,spark,2016)
scala> df.collect.foreach(println)
[spark,1,anil,2016]
[hadoop,5,anvith,2015]
[hadoop,6,dev,2015]
[spark,3,raj,2016]
[hadoop,4,sunil,2015]
[spark,2,venkat,2016]
----------------------------------------------------------------------------------
HIVE SPARK EXAMPLES
----------------------------------------------------------------------------------
import org.apache.spark.sql.hive.HiveContext
val hiveContext = new HiveContext(sc)
hiveContext.sql("CREATE DATABASE IF NOT EXISTS kalyan")
hiveContext.sql("CREATE TABLE IF NOT EXISTS kalyan.src (key INT, value STRING)")
hiveContext.sql("LOAD DATA LOCAL INPATH '${env:SPARK_HOME}/examples/src/main/resources/kv1.txt' INTO TABLE kalyan.src")
val countdata = hiveContext.sql("SELECT count(*) from kalyan.src")
countdata.collect()
val input = hiveContext.sql("FROM kalyan.src SELECT key, value")
val data = input.map(_.getInt(0))
println(input.collect().toList)
println(data.collect().toList)
input.collect().foreach(println)
----------------------------------------------------------------------------------
CASSANDRA SPARK EXAMPLES
----------------------------------------------------------------------------------
DROP KEYSPACE kalyan;
CREATE KEYSPACE kalyan WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };
CREATE TABLE kalyan.student(name text PRIMARY KEY, id int, course text, year int);
INSERT INTO kalyan.student(name, id, course, year) VALUES ('anil', 1, 'spark', 2016);
INSERT INTO kalyan.student(name, id, course, year) VALUES ('venkat', 2, 'spark', 2016);
INSERT INTO kalyan.student(name, id, course, year) VALUES ('raj', 3, 'spark', 2016);
INSERT INTO kalyan.student(name, id, course, year) VALUES ('sunil', 4, 'hadoop', 2015);
INSERT INTO kalyan.student(name, id, course, year) VALUES ('anvith', 5, 'hadoop', 2015);
INSERT INTO kalyan.student(name, id, course, year) VALUES ('dev', 6, 'hadoop', 2015);
SELECT * FROM kalyan.student;
----------------------------------------------------------------------------------
sc.stop()
import org.apache.spark._
import com.datastax.spark.connector._
val conf = new SparkConf(true).set("spark.cassandra.connection.host", "127.0.0.1").set("spark.cassandra.input.split.size_in_mb", "268435456")
val sc = new SparkContext("local[*]", "kalyan", conf)
val rdd = sc.cassandraTable("kalyan", "student")
println(rdd.count)
println(rdd.first)
rdd.groupBy(x => x.getInt("year")).foreach(println)
----------------------------------------------------------------------------------
val students = List(("rajesh", 11, "spark" , 2016), ("nagesh", 12, "spark" , 2016), ("ganesh", 13, "spark" , 2016))
val studentsRdd = sc.parallelize(students)
studentsRdd.saveToCassandra("kalyan", "student", SomeColumns("name", "id", "course", "year"))
----------------------------------------------------------------------------------
import org.apache.spark.sql._
import org.apache.spark.sql.cassandra._
val csc = new CassandraSQLContext(sc)
val df = csc.sql("SELECT * from kalyan.student")
df.foreach(println)
----------------------------------------------------------------------------------
PHOENIX SPARK EXAMPLES
----------------------------------------------------------------------------------
CREATE TABLE STUDENT(name VARCHAR PRIMARY KEY, id INTEGER, course VARCHAR, year INTEGER);
UPSERT INTO STUDENT(name, id, course, year) VALUES ('anil', 1, 'spark', 2016);
UPSERT INTO STUDENT(name, id, course, year) VALUES ('venkat', 2, 'spark', 2016);
UPSERT INTO STUDENT(name, id, course, year) VALUES ('raj', 3, 'spark', 2016);
UPSERT INTO STUDENT(name, id, course, year) VALUES ('sunil', 4, 'hadoop', 2015);
UPSERT INTO STUDENT(name, id, course, year) VALUES ('anvith', 5, 'hadoop', 2015);
UPSERT INTO STUDENT(name, id, course, year) VALUES ('dev', 6, 'hadoop', 2015);
SELECT * FROM STUDENT;
----------------------------------------------------------------------------------
sc.stop()
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.rdd.RDD
import org.apache.phoenix.spark._
val sc = new SparkContext("local", "kalyan")
val sqlContext = new SQLContext(sc)
val df = sqlContext.load(
"org.apache.phoenix.spark",
Map("table" -> "STUDENT", "zkUrl" -> "localhost:2181")
)
----------------------------------------------------------------------------------
df.show
df.select("NAME").show()
df.select(df("NAME")).show()
df.select("NAME", "ID").show()
df.select(df("NAME"), df("ID")).show()
df.filter(df("ID") === 1L).show()
df.filter(df("ID") >= 2L).show()
df.filter(df("COURSE") === "spark" && df("ID") >= 1L).show
df.filter(df("COURSE") === "spark" && df("ID") === 1L).select(df("ID")).show
--------------------------------------------------------------------------
import org.apache.hadoop.conf.Configuration
val configuration = new Configuration()
val df = sqlContext.phoenixTableAsDataFrame(
"STUDENT", Seq[String](), zkUrl = Some("localhost:2181"), conf = configuration
)
df.show
val df = sqlContext.phoenixTableAsDataFrame(
"STUDENT", Seq[String]("ID", "NAME"), zkUrl = Some("localhost:2181"), conf = configuration
)
df.show
--------------------------------------------------------------------------
val rdd = sc.phoenixTableAsRDD(
"STUDENT", Seq[String](), zkUrl = Some("localhost:2181"), conf = configuration
)
rdd.foreach(println)
val rdd = sc.phoenixTableAsRDD(
"STUDENT", Seq[String]("ID", "NAME"), zkUrl = Some("localhost:2181"), conf = configuration
)
rdd.foreach(println)
val firstId = rdd.first()("ID").asInstanceOf[Int]
val firstNAME = rdd.first()("NAME").asInstanceOf[String]
--------------------------------------------------------------------------
val students = List(("rajesh", 11, "spark" , 2016), ("nagesh", 12, "spark" , 2016), ("ganesh", 13, "spark" , 2016))
val studentsRdd = sc.parallelize(students)
studentsRdd.saveToPhoenix("STUDENT", Seq("NAME", "ID", "COURSE", "YEAR"), zkUrl = Some("localhost:2181"))
--------------------------------------------------------------------------
import org.apache.spark.sql._
import org.apache.spark.sql.types._
// Load from `STUDENT`
val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "STUDENT", "zkUrl" -> "localhost:2181"))
// Save to `STUDENT1` (create the target table first, e.g. from the Phoenix sqlline shell):
CREATE TABLE STUDENT1(name VARCHAR PRIMARY KEY, id INTEGER, course VARCHAR, year INTEGER);
df.save("org.apache.phoenix.spark", SaveMode.Overwrite, Map("table" -> "STUDENT1", "zkUrl" -> "localhost:2181"))
----------------------------------------------------------------------------------