Thursday, June 29, 2017

SPARK Dataframe filters


https://stackoverflow.com/questions/42951905/spark-dataframe-filter

Creating a DataFrame test case

val df = sc.parallelize(Seq((1,"Emailab"), (2,"Phoneab"), (3, "Faxab"),(4,"Mail"),(5,"Other"),(6,"MSL12"),(7,"MSL"),(8,"HCP"),(9,"HCP12"))).toDF("c1","c2")

scala> df.show()
+---+-------+
| c1|     c2|
+---+-------+
|  1|Emailab|
|  2|Phoneab|
|  3|  Faxab|
|  4|   Mail|
|  5|  Other|
|  6|  MSL12|
|  7|    MSL|
|  8|    HCP|
|  9|  HCP12|
+---+-------+


scala> df.filter($"c2".like("HCP")).show()
+---+---+
| c1| c2|
+---+---+
|  8|HCP|
+---+---+



scala> df.filter($"c2".like("HC")).show()
+---+---+
| c1| c2|
+---+---+
+---+---+


scala> df.filter($"c2".rlike("HC")).show()
+---+-----+
| c1|   c2|
+---+-----+
|  8|  HCP|
|  9|HCP12|
+---+-----+


scala> df.filter(df("c2")==="HCP").show()
+---+---+
| c1| c2|
+---+---+
|  8|HCP|
+---+---+


scala> df.filter($"c2".contains("HCP")).show()
+---+-----+
| c1|   c2|
+---+-----+
|  8|  HCP|
|  9|HCP12|
+---+-----+
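The results above follow from the matching semantics: like is a full-string match against a SQL pattern (% matches any run of characters, _ matches one character), while rlike does a regex find anywhere in the string. That is why like("HC") matched nothing but rlike("HC") matched both HCP and HCP12. A sketch in plain Scala (no Spark needed) of those semantics:

```scala
// Sketch of like/rlike semantics with plain Scala. sqlLikeToRegex is a
// hypothetical helper, not a Spark API: it turns a SQL LIKE pattern into
// a full-match regex the way like() conceptually does.
def sqlLikeToRegex(pattern: String): String =
  "^" + pattern.flatMap {
    case '%' => ".*"
    case '_' => "."
    case c   => java.util.regex.Pattern.quote(c.toString)
  } + "$"

val values = Seq("Emailab", "Phoneab", "Faxab", "Mail", "Other",
                 "MSL12", "MSL", "HCP", "HCP12")

// like("HC"): full match required, so nothing passes
val likeHC = values.filter(_.matches(sqlLikeToRegex("HC")))
// rlike("HC"): regex find anywhere in the string, so HCP and HCP12 pass
val rlikeHC = values.filter(v =>
  java.util.regex.Pattern.compile("HC").matcher(v).find())
// like("HC%"): the % wildcard makes the prefix match work
val likeHCpct = values.filter(_.matches(sqlLikeToRegex("HC%")))
```

So to get the prefix behavior with like, the pattern needs an explicit wildcard: $"c2".like("HC%").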

https://www.tutorialspoint.com/spark_sql/spark_sql_dataframes.htm

Put this in employees.json. Spark expects line-delimited JSON, which differs from standard JSON: one JSON record per line, with no commas between records and no square brackets around the list of records.


   {"id" : "1201", "name" : "satish", "age" : "25"}
   {"id" : "1202", "name" : "krishna", "age" : "28"}
   {"id" : "1203", "name" : "amith", "age" : "39"}
   {"id" : "1204", "name" : "javed", "age" : "23"}
   {"id" : "1205", "name" : "prudvi", "age" : "23"}


val dfs = spark.read.json("employees.json")

dfs.printSchema()

dfs.select("name").show()

dfs.filter(dfs("age") > 23).show()

dfs.groupBy("age").count().show()

scala> val dfs = spark.read.json("employees.json")
dfs: org.apache.spark.sql.DataFrame = [age: string, id: string ... 1 more field]

scala> 

scala> dfs.printSchema()
root
 |-- age: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)


scala> 

scala> dfs.select("name").show()
+-------+
|   name|
+-------+
| satish|
|krishna|
|  amith|
|  javed|
| prudvi|
+-------+


scala> 

scala> dfs.filter(dfs("age") > 23).show()
+---+----+-------+
|age|  id|   name|
+---+----+-------+
| 25|1201| satish|
| 28|1202|krishna|
| 39|1203|  amith|
+---+----+-------+
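The transcript stops before the groupBy. What dfs.groupBy("age").count() would compute on these five records can be sketched with plain Scala collections (note Spark does not guarantee row order in the output):

```scala
// The five "age" values from employees.json
val ages = Seq("25", "28", "39", "23", "23")
// Equivalent of groupBy("age").count() on plain collections
val counts = ages.groupBy(identity).map { case (age, xs) => (age, xs.size) }
// counts: Map("25" -> 1, "28" -> 1, "39" -> 1, "23" -> 2)
```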






Wednesday, June 28, 2017

SPARK SQL and Dataframe links

http://blog.antlypls.com/blog/2016/01/30/processing-json-data-with-sparksql/


// construct RDD[String]
val events = sc.parallelize(
  """{"action":"create","timestamp":"2016-01-07T00:01:17Z"}""" :: Nil)
// read it
val df = sqlContext.read.json(events)


scala> df.show
+------+--------------------+
|action|           timestamp|
+------+--------------------+
|create|2016-01-07T00:01:17Z|
+------+--------------------+

scala> df.printSchema
root
 |-- action: string (nullable = true)
 |-- timestamp: string (nullable = true)

val schema = (new StructType).add("action", StringType).add("timestamp", TimestampType)
val df = sqlContext.read.schema(schema).json(events)
df.show
// +------+--------------------+
// |action|           timestamp|
// +------+--------------------+
// |create|2016-01-07 01:01:...|
// +------+--------------------+





val events = sc.parallelize(
  """{"action":"create","timestamp":1452121277}""" ::
  """{"action":"create","timestamp":"1452121277"}""" ::
  """{"action":"create","timestamp":""}""" ::
  """{"action":"create","timestamp":null}""" ::
  """{"action":"create","timestamp":"null"}""" ::
  Nil
)

val schema = (new StructType).add("action", StringType).add("timestamp", LongType)

sqlContext.read.schema(schema).json(events).show

// +------+----------+
// |action| timestamp|
// +------+----------+
// |create|1452121277|
// |  null|      null|
// |create|      null|
// |create|      null|
// |  null|      null|
// +------+----------+

https://www.supergloo.com/fieldnotes/spark-sql-json-examples/

[{
"Year": "2013",
"First Name": "DAVID",
"County": "KINGS",
"Sex": "M",
"Count": "272"
}, {
"Year": "2013",
"First Name": "JAYDEN",
"County": "KINGS",
"Sex": "M",
"Count": "268"
}, {
"Year": "2013",
"First Name": "JAYDEN",
"County": "QUEENS",
"Sex": "M",
"Count": "219"
}, {
"Year": "2013",
"First Name": "MOSHE",
"County": "KINGS",
"Sex": "M",
"Count": "219"
}, {
"Year": "2013",
"First Name": "ETHAN",
"County": "QUEENS",
"Sex": "M",
"Count": "216"
}]

STEPS

1. Start the spark-shell from the same directory containing the baby_names.json file
2. Load the JSON using the SparkContext wholeTextFiles method, which produces a PairRDD. Use map to create a new RDD from the value portion of each pair.
3. Read this RDD as JSON and confirm the schema
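
The steps above can be sketched in spark-shell (assuming baby_names.json is in the launch directory, and sqlContext as the pre-Spark-2 entry point; read.json on an RDD[String] was later deprecated in favor of Dataset[String]):

```scala
// 1. wholeTextFiles yields a PairRDD of (file path, file contents)
val pairRdd = sc.wholeTextFiles("baby_names.json")
// 2. keep only the value portion: the file's full JSON text
val jsonRdd = pairRdd.map { case (_, contents) => contents }
// 3. parse the JSON array into rows and confirm the schema
val namesDf = sqlContext.read.json(jsonRdd)
namesDf.printSchema()
namesDf.show()
```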


https://medium.com/@InDataLabs/converting-spark-rdd-to-dataframe-and-dataset-expert-opinion-826db069eb5

https://www.tutorialspoint.com/spark_sql/spark_sql_dataframes.htm