----------------------------------------------------------------------------------
SPARK SQL EXAMPLES with SCALA / JAVA / PYTHON / R
----------------------------------------------------------------------------------
Usage of "SqlContext" in scala / java
----------------------------------------------------------------------------------
Scala:
-----------
val sc: SparkContext = ... // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Create the DataFrame
val df = sqlContext.read.json("file:///home/orienit/spark/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/people.json")
Java:
-----------
JavaSparkContext sc = ...; // An existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
// Create the DataFrame
DataFrame df = sqlContext.read().json("file:///home/orienit/spark/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/people.json");
// The operations below use the Scala column syntax df("col"); in Java, use df.col("col")
// Show the content of the DataFrame
df.show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
// Print the schema in a tree format
df.printSchema()
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
// Select only the "name" column
df.select("name").show()
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
// Select everybody, but increment the age by 1
df.select(df("name"), df("age") + 1).show()
+-------+---------+
| name|(age + 1)|
+-------+---------+
|Michael| null|
| Andy| 31|
| Justin| 20|
+-------+---------+
// Select people older than 21
df.filter(df("age") > 21).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
// Count people by age
df.groupBy("age").count().show()
+----+-----+
| age|count|
+----+-----+
|null| 1|
| 19| 1|
| 30| 1|
+----+-----+
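These DataFrame operations can be chained. A minimal Scala sketch (a combination of the calls above, not part of the original example) that drops the null age, sorts by age in descending order, and keeps both columns:

// Chain a filter, a sort and a projection on the same DataFrame
df.filter(df("age").isNotNull)
  .orderBy(df("age").desc)
  .select("name", "age")
  .show()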
Usage of "SqlContext" in python
=========================================
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
# Create the DataFrame
df = sqlContext.read.json("file:///home/orienit/spark/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/people.json")
# Show the content of the DataFrame
df.show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
# Print the schema in a tree format
df.printSchema()
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
# Select only the "name" column
df.select("name").show()
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()
+-------+---------+
| name|(age + 1)|
+-------+---------+
|Michael| null|
| Andy| 31|
| Justin| 20|
+-------+---------+
# Select people older than 21
df.filter(df['age'] > 21).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
# Count people by age
df.groupBy("age").count().show()
+----+-----+
| age|count|
+----+-----+
|null| 1|
| 19| 1|
| 30| 1|
+----+-----+
Usage of "SqlContext" in R
=========================================
sqlContext <- sparkRSQL.init(sc)
# Create the DataFrame (jsonFile is the older API; read.json replaces it in Spark 1.6)
df <- jsonFile(sqlContext, "file:///home/orienit/spark/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/people.json")
df <- read.json(sqlContext, "file:///home/orienit/spark/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/people.json")
# Show the content of the DataFrame
showDF(df)
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
# Print the schema in a tree format
printSchema(df)
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
# Select only the "name" column
showDF(select(df, "name"))
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
# Select everybody, but increment the age by 1
showDF(select(df, df$name, df$age + 1))
+-------+-----------+
| name|(age + 1.0)|
+-------+-----------+
|Michael| null|
| Andy| 31.0|
| Justin| 20.0|
+-------+-----------+
# Select people older than 21
showDF(where(df, df$age > 21))
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
# Count people by age
showDF(count(groupBy(df, "age")))
+----+-----+
| age|count|
+----+-----+
|null| 1|
| 19| 1|
| 30| 1|
+----+-----+
Creating Datasets using scala:
=========================================
// Encoders for most common types are automatically provided by importing sqlContext.implicits._
import sqlContext.implicits._
val ds = Seq(1, 2, 3).toDS()
ds.map(_ + 1).collect()
res: Array[Int] = Array(2, 3, 4)
// Encoders are also created for case classes.
case class Person(name: String, age: Long)
val ds = Seq(Person("Andy", 32)).toDS()
// DataFrames can be converted to a Dataset by providing a class.
// Mapping will be done by name.
val path = "file:///home/orienit/spark/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/people.json"
val people1 = sqlContext.read.json(path)
people1.collect()
res: Array[org.apache.spark.sql.Row] = Array([null,Michael], [30,Andy], [19,Justin])
val people2 = sqlContext.read.json(path).as[Person]
people2.collect()
res: Array[Person] = Array(Person(Michael,-1), Person(Andy,30), Person(Justin,19))
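Once the DataFrame has been converted, the typed Dataset API can be used directly. A minimal sketch, assuming the people2 Dataset and Person case class above (the toUpperCase step is just an illustration):

// Typed transformation: fields are accessed as case class members, not by column name
people2.map(p => p.name.toUpperCase).collect()
// expected: Array(MICHAEL, ANDY, JUSTIN)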
Inferring the Schema Using Reflection (using scala code)
===========================================================
// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("file:///home/orienit/spark/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")
// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index:
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
res: Name: Justin
// or by field name:
teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)
res: Name: Justin
// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
res: Map("name" -> "Justin", "age" -> 19)
Inferring the Schema Using Reflection (using python code)
===========================================================
# sc is an existing SparkContext.
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)
# Load a text file and convert each line to a Row.
lines = sc.textFile("file:///home/orienit/spark/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
# Infer the schema, and register the DataFrame as a table.
schemaPeople = sqlContext.createDataFrame(people)
schemaPeople.registerTempTable("people")
# SQL can be run over DataFrames that have been registered as a table.
teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
# The results of SQL queries are DataFrames and support all the normal RDD operations.
teenNames = teenagers.map(lambda p: "Name: " + p.name)
for teenName in teenNames.collect():
    print(teenName)
Programmatically Specifying the Schema (using scala code)
==============================================================
// Create an RDD
val people = sc.textFile("file:///home/orienit/spark/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/people.txt")
// The schema is encoded in a string
val schemaString = "name age"
// Import Row.
import org.apache.spark.sql.Row
// Import Spark SQL data types
import org.apache.spark.sql.types.{StructType, StructField, StringType}
// Generate the schema from the schema string
val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true)))
// Convert records of the RDD (people) to Rows.
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
// Apply the schema to the RDD.
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
// Register the DataFrames as a table.
peopleDataFrame.registerTempTable("people")
// SQL statements can be run by using the sql methods provided by sqlContext.
val results = sqlContext.sql("SELECT name FROM people")
// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index or by field name.
results.map(t => "Name: " + t(0)).collect().foreach(println)
Programmatically Specifying the Schema (using python code)
==============================================================
# Import SQLContext and data types
from pyspark.sql import SQLContext
from pyspark.sql.types import *
# sc is an existing SparkContext.
sqlContext = SQLContext(sc)
# Load a text file and convert each line to a tuple.
lines = sc.textFile("file:///home/orienit/spark/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: (p[0], p[1].strip()))
# The schema is encoded in a string.
schemaString = "name age"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)
# Apply the schema to the RDD.
schemaPeople = sqlContext.createDataFrame(people, schema)
# Register the DataFrame as a table.
schemaPeople.registerTempTable("people")
# SQL can be run over DataFrames that have been registered as a table.
results = sqlContext.sql("SELECT name FROM people")
# The results of SQL queries are DataFrames and support all the normal RDD operations.
names = results.map(lambda p: "Name: " + p.name)
for name in names.collect():
    print(name)
Generic Load/Save Functions
========================================
scala:
--------------
val df = sqlContext.read.load("file:///home/orienit/spark/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("file:///home/orienit/spark/output/namesAndFavColors.parquet")
python:
--------------
df = sqlContext.read.load("file:///home/orienit/spark/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("file:///home/orienit/spark/output/namesAndFavColors.parquet")
R:
--------------
df <- loadDF(sqlContext, "file:///home/orienit/spark/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/users.parquet")
saveDF(select(df, "name", "favorite_color"), "file:///home/orienit/spark/output/namesAndFavColors.parquet")
Manually Specifying Options
=============================================
scala:
--------------
val df = sqlContext.read.format("json").load("file:///home/orienit/spark/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/people.json")
df.select("name", "age").write.format("parquet").save("file:///home/orienit/spark/output/namesAndAges.parquet")
df.select("name", "age").write.format("json").save("file:///home/orienit/spark/output/namesAndAges.json")
python:
--------------
df = sqlContext.read.load("file:///home/orienit/spark/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/people.json", format="json")
df.select("name", "age").write.save("file:///home/orienit/spark/output/namesAndAges.parquet", format="parquet")
R:
--------------
df <- loadDF(sqlContext, "file:///home/orienit/spark/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/people.json", "json")
saveDF(select(df, "name", "age"), "file:///home/orienit/spark/output/namesAndAges.parquet", "parquet")
Run SQL on files directly
=============================================
scala:
--------------
val df = sqlContext.sql("SELECT * FROM parquet.`file:///home/orienit/spark/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/users.parquet`")
python:
--------------
df = sqlContext.sql("SELECT * FROM parquet.`file:///home/orienit/spark/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/users.parquet`")
R:
--------------
df <- sql(sqlContext, "SELECT * FROM parquet.`file:///home/orienit/spark/spark-1.6.0-bin-hadoop2.6/examples/src/main/resources/users.parquet`")
Loading Data Programmatically
=============================================
scala:
--------------
val people: RDD[Person] = ... // An RDD of case class objects, from the previous example.
// The RDD is implicitly converted to a DataFrame by implicits, allowing it to be stored using Parquet.
people.write.parquet("people.parquet")
// Read in the parquet file created above. Parquet files are self-describing so the schema is preserved.
// The result of loading a Parquet file is also a DataFrame.
val parquetFile = sqlContext.read.parquet("people.parquet")
// Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile")
val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
python:
--------------
# sqlContext from the previous example is used in this example.
schemaPeople # The DataFrame from the previous example.
# DataFrames can be saved as Parquet files, maintaining the schema information.
schemaPeople.write.parquet("people.parquet")
# Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved.
# The result of loading a parquet file is also a DataFrame.
parquetFile = sqlContext.read.parquet("people.parquet")
# Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile")
teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenNames = teenagers.map(lambda p: "Name: " + p.name)
for teenName in teenNames.collect():
    print(teenName)
R:
--------------
# sqlContext from the previous example is used in this example.
schemaPeople # The DataFrame from the previous example.
# DataFrames can be saved as Parquet files, maintaining the schema information.
saveAsParquetFile(schemaPeople, "people.parquet")
# Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved.
# The result of loading a parquet file is also a DataFrame.
parquetFile <- parquetFile(sqlContext, "people.parquet")
# Parquet files can also be registered as tables and then used in SQL statements.
registerTempTable(parquetFile, "parquetFile")
teenagers <- sql(sqlContext, "SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenNames <- map(teenagers, function(p) { paste("Name:", p$name) })
for (teenName in collect(teenNames)) {
  cat(teenName, "\n")
}