Monday 23 September 2019

spark_1.x_sql_examples_2

----------------------------------------------------------------------------------
SPARK SQL EXAMPLES with SCALA / JAVA / PYTHON / R
----------------------------------------------------------------------------------


Usage of "SqlContext" in scala / java
----------------------------------------------------------------------------------

Scala:
-----------
val sc: SparkContext = // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create the DataFrame
val df = sqlContext.read.json("file:///home/orienit/spark/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/people.json")


Java:
-----------
JavaSparkContext sc = ...; // An existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

// Create the DataFrame
DataFrame df = sqlContext.read().json("file:///home/orienit/spark/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/people.json");



// Show the content of the DataFrame (the calls below use the Scala API; the Java equivalents are analogous)
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+


// Print the schema in a tree format
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



// Select only the "name" column
df.select("name").show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+


// Select everybody, but increment the age by 1
df.select(df("name"), df("age") + 1).show()

+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+


// Select people older than 21
df.filter(df("age") > 21).show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



// Count people by age
df.groupBy("age").count().show()

+----+-----+
| age|count|
+----+-----+
|null|    1|
|  19|    1|
|  30|    1|
+----+-----+
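
The same count can also be produced with SQL by first registering the DataFrame as a temporary table. A minimal Scala sketch (the table name "people" is only an illustrative choice):

// Register the DataFrame as a temporary table and run the aggregation in SQL
df.registerTempTable("people")
sqlContext.sql("SELECT age, count(*) AS cnt FROM people GROUP BY age").show()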



Usage of "SqlContext" in python
=========================================
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

# Create the DataFrame
df = sqlContext.read.json("file:///home/orienit/spark/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/people.json")


# Show the content of the DataFrame
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



# Print the schema in a tree format
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



# Select only the "name" column
df.select("name").show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()

+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+



# Select people older than 21
df.filter(df['age'] > 21).show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+


# Count people by age
df.groupBy("age").count().show()

+----+-----+
| age|count|
+----+-----+
|null|    1|
|  19|    1|
|  30|    1|
+----+-----+



Usage of "SqlContext" in R
=========================================
sqlContext <- sparkRSQL.init(sc)

# Create the DataFrame (read.json is the newer SparkR API; jsonFile is the older equivalent)
df <- read.json(sqlContext, "file:///home/orienit/spark/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/people.json")

# Older equivalent:
# df <- jsonFile(sqlContext, "file:///home/orienit/spark/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/people.json")


# Show the content of the DataFrame
showDF(df)

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+


# Print the schema in a tree format
printSchema(df)

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)


# Select only the "name" column
showDF(select(df, "name"))

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+


# Select everybody, but increment the age by 1
showDF(select(df, df$name, df$age + 1))

+-------+-----------+
|   name|(age + 1.0)|
+-------+-----------+
|Michael|       null|
|   Andy|       31.0|
| Justin|       20.0|
+-------+-----------+


# Select people older than 21
showDF(where(df, df$age > 21))

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+


# Count people by age
showDF(count(groupBy(df, "age")))

+----+-----+
| age|count|
+----+-----+
|null|    1|
|  19|    1|
|  30|    1|
+----+-----+




Creating Datasets using Scala:
=========================================
// Encoders for most common types are automatically provided by importing sqlContext.implicits._
import sqlContext.implicits._

val ds = Seq(1, 2, 3).toDS()

ds.map(_ + 1).collect()
res: Array[Int] = Array(2, 3, 4)


// Encoders are also created for case classes.

case class Person(name: String, age: Long)

val ds = Seq(Person("Andy", 32)).toDS()


// DataFrames can be converted to a Dataset by providing a class.
// Mapping will be done by name.

val path = "file:///home/orienit/spark/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/people.json"

val people1 = sqlContext.read.json(path)
people1.collect()
res: Array[org.apache.spark.sql.Row] = Array([null,Michael], [30,Andy], [19,Justin])


val people2 = sqlContext.read.json(path).as[Person]
people2.collect()
res: Array[Person] = Array(Person(Michael,-1), Person(Andy,30), Person(Justin,19))
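
Because people2 is a typed Dataset[Person], later transformations can work with the Person fields directly. A minimal sketch (not part of the original notes; it assumes the sqlContext.implicits._ import from above):

// Typed operations: filter and map receive Person objects instead of Rows
val adults = people2.filter(p => p.age >= 21)
adults.map(p => p.name).collect()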


Inferring the Schema Using Reflection (using Scala code)
===========================================================
// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("file:///home/orienit/spark/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()

people.registerTempTable("people")


// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index:
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
res: Name: Justin


// or by field name:
teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)
res: Name: Justin


// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
res: Map("name" -> "Justin", "age" -> 19)



Inferring the Schema Using Reflection (using Python code)
===========================================================
# sc is an existing SparkContext.
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)


# Load a text file and convert each line to a Row.
lines = sc.textFile("file:///home/orienit/spark/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))


# Infer the schema, and register the DataFrame as a table.
schemaPeople = sqlContext.createDataFrame(people)
schemaPeople.registerTempTable("people")


# SQL can be run over DataFrames that have been registered as a table.
teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")


# The results of SQL queries are DataFrames and support all the normal RDD operations.
teenNames = teenagers.map(lambda p: "Name: " + p.name)
for teenName in teenNames.collect():
  print(teenName)


Programmatically Specifying the Schema (using Scala code)
==============================================================

// Create an RDD
val people = sc.textFile("file:///home/orienit/spark/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/people.txt")


// The schema is encoded in a string
val schemaString = "name age"


// Import Row.
import org.apache.spark.sql.Row;


// Import Spark SQL data types
import org.apache.spark.sql.types.{StructType,StructField,StringType};


// Generate the schema based on the string of schema
val schema = StructType( schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))


// Convert records of the RDD (people) to Rows.
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

// Apply the schema to the RDD.
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)


// Register the DataFrames as a table.
peopleDataFrame.registerTempTable("people")


// SQL statements can be run by using the sql methods provided by sqlContext.
val results = sqlContext.sql("SELECT name FROM people")


// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index or by field name.
results.map(t => "Name: " + t(0)).collect().foreach(println)
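
If the column types are known up front, the programmatic schema does not have to be all strings. A hedged variant of the example above (the typedSchema / typedRowRDD names are only illustrative):

// Same data, but "age" is stored as an integer instead of a string
import org.apache.spark.sql.types.IntegerType

val typedSchema = StructType(Seq(
  StructField("name", StringType, true),
  StructField("age", IntegerType, true)))

val typedRowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim.toInt))
val typedPeople = sqlContext.createDataFrame(typedRowRDD, typedSchema)
typedPeople.printSchema()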




Programmatically Specifying the Schema (using Python code)
==============================================================

# Import SQLContext and data types
from pyspark.sql import SQLContext
from pyspark.sql.types import *

# sc is an existing SparkContext.
sqlContext = SQLContext(sc)


# Load a text file and convert each line to a tuple.
lines = sc.textFile("file:///home/orienit/spark/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: (p[0], p[1].strip()))


# The schema is encoded in a string.
schemaString = "name age"


fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)


# Apply the schema to the RDD.
schemaPeople = sqlContext.createDataFrame(people, schema)


# Register the DataFrame as a table.
schemaPeople.registerTempTable("people")


# SQL can be run over DataFrames that have been registered as a table.
results = sqlContext.sql("SELECT name FROM people")


# The results of SQL queries are DataFrames and support all the normal RDD operations.
names = results.map(lambda p: "Name: " + p.name)
for name in names.collect():
  print(name)


Generic Load/Save Functions
========================================
scala:
--------------
val df = sqlContext.read.load("file:///home/orienit/spark/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("file:///home/orienit/spark/output/namesAndFavColors.parquet")


python:
--------------
df = sqlContext.read.load("file:///home/orienit/spark/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("file:///home/orienit/spark/output/namesAndFavColors.parquet")


R:
--------------
df <- loadDF(sqlContext, "file:///home/orienit/spark/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/users.parquet")
saveDF(select(df, "name", "favorite_color"), "file:///home/orienit/spark/output/namesAndFavColors.parquet")
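
By default, write.save / saveDF fails if the output path already exists; a save mode changes that behaviour. A minimal Scala sketch continuing the Scala example above (the output path is reused for illustration only):

import org.apache.spark.sql.SaveMode

// Overwrite the output directory instead of failing when it already exists
df.select("name", "favorite_color").write.mode(SaveMode.Overwrite).save("file:///home/orienit/spark/output/namesAndFavColors.parquet")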





Manually Specifying Options
=============================================
scala:
--------------
val df = sqlContext.read.format("json").load("file:///home/orienit/spark/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/people.json")
df.select("name", "age").write.format("parquet").save("file:///home/orienit/spark/output/namesAndAges.parquet")
df.select("name", "age").write.format("json").save("file:///home/orienit/spark/output/namesAndAges.json")


python:
--------------
df = sqlContext.read.load("file:///home/orienit/spark/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/people.json", format="json")
df.select("name", "age").write.save("file:///home/orienit/spark/output/namesAndAges.parquet", format="parquet")


R:
--------------
df <- loadDF(sqlContext, "file:///home/orienit/spark/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/people.json", "json")
saveDF(select(df, "name", "age"), "file:///home/orienit/spark/output/namesAndAges.parquet", "parquet")
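
The same format/option mechanism also covers external data sources. For example, assuming the spark-csv package (com.databricks:spark-csv, not bundled with Spark 1.x) is on the classpath, a CSV file could be read like this Scala sketch (the input path is hypothetical):

// Read a CSV file through the external spark-csv data source
val csvDf = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("file:///home/orienit/spark/input/people.csv")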




Run SQL on files directly
=============================================
scala:
--------------
val df = sqlContext.sql("SELECT * FROM parquet.`file:///home/orienit/spark/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/users.parquet`")


python:
--------------
df = sqlContext.sql("SELECT * FROM parquet.`file:///home/orienit/spark/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/users.parquet`")


R:
--------------
df <- sql(sqlContext, "SELECT * FROM parquet.`file:///home/orienit/spark/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/users.parquet`")



Loading Data Programmatically
=============================================
scala:
--------------
val people: RDD[Person] = ... // An RDD of case class objects, from the previous example.


// The RDD is implicitly converted to a DataFrame by implicits, allowing it to be stored using Parquet.
people.write.parquet("people.parquet")


// Read in the parquet file created above. Parquet files are self-describing so the schema is preserved.
// The result of loading a Parquet file is also a DataFrame.
val parquetFile = sqlContext.read.parquet("people.parquet")


//Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile")
val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
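
The Parquet writer can also partition its output by a column. A minimal sketch reusing the parquetFile DataFrame from above (the output path is only illustrative):

// Each distinct "age" value becomes its own sub-directory under the output path
parquetFile.write.partitionBy("age").parquet("file:///home/orienit/spark/output/people_by_age.parquet")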



python:
--------------
# sqlContext from the previous example is used in this example.

schemaPeople # The DataFrame from the previous example.


# DataFrames can be saved as Parquet files, maintaining the schema information.
schemaPeople.write.parquet("people.parquet")


# Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved.
# The result of loading a parquet file is also a DataFrame.
parquetFile = sqlContext.read.parquet("people.parquet")


# Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile")
teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenNames = teenagers.map(lambda p: "Name: " + p.name)
for teenName in teenNames.collect():
  print(teenName)



R:
--------------
# sqlContext from the previous example is used in this example.

schemaPeople # The DataFrame from the previous example.


# DataFrames can be saved as Parquet files, maintaining the schema information.
saveAsParquetFile(schemaPeople, "people.parquet")


# Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved.
# The result of loading a parquet file is also a DataFrame.
parquetFile <- parquetFile(sqlContext, "people.parquet")


# Parquet files can also be registered as tables and then used in SQL statements.
registerTempTable(parquetFile, "parquetFile")
teenagers <- sql(sqlContext, "SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenNames <- map(teenagers, function(p) { paste("Name:", p$name)})
for (teenName in collect(teenNames)) {
  cat(teenName, "\n")
}










