Tuesday, September 29, 2020

Spark DataFrames example




https://medium.com/expedia-group-tech/deep-dive-into-apache-spark-array-functions-720b8fbfa729



// in spark-shell the implicits and SQL functions below are pre-imported;
// a standalone application needs these imports explicitly
import org.apache.spark.sql.functions._
import spark.implicits._

val initial_df = Seq(
("x", 4, 1),
("x", 6, 2),
("z", 7, 3),
("a", 3, 4),
("z", 5, 2),
("x", 7, 3),
("x", 9, 7),
("z", 1, 8),
("z", 4, 9),
("z", 7, 4),
("a", 8, 5),
("a", 5, 2),
("a", 3, 8),
("x", 2, 7),
("z", 1, 9)
).toDF("col1", "col2", "col3")
// Generate array columns by grouping rows and collecting their values into lists

scala> initial_df
res58: org.apache.spark.sql.DataFrame = [col1: string, col2: int ... 1 more field]

scala> initial_df.show
+----+----+----+
|col1|col2|col3|
+----+----+----+
|   x|   4|   1|
|   x|   6|   2|
|   z|   7|   3|
|   a|   3|   4|
|   z|   5|   2|
|   x|   7|   3|
|   x|   9|   7|
|   z|   1|   8|
|   z|   4|   9|
|   z|   7|   4|
|   a|   8|   5|
|   a|   5|   2|
|   a|   3|   8|
|   x|   2|   7|
|   z|   1|   9|
+----+----+----+

scala> val full_df = (initial_df.groupBy("col1").agg(collect_list($"col2")))
full_df: org.apache.spark.sql.DataFrame = [col1: string, collect_list(col2): array<int>]

scala> val full_df = (initial_df.groupBy("col1").agg(collect_list($"col2"))).show
+----+------------------+
|col1|collect_list(col2)|
+----+------------------+
|   x|   [4, 6, 7, 9, 2]|
|   z|[7, 5, 1, 4, 7, 1]|
|   a|      [3, 8, 5, 3]|
+----+------------------+
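
collect_list keeps every value, duplicates included, and element order is not guaranteed after a shuffle. When only the distinct values per group matter, collect_set is the usual alternative; a minimal sketch on the same initial_df:

val set_df = initial_df
  .groupBy("col1")
  .agg(collect_set($"col2").as("distinct_col2"))  // drops duplicates such as z's repeated 7 and 1
set_df.show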

scala> initial_df.schema
res61: org.apache.spark.sql.types.StructType = StructType(StructField(col1,StringType,true), StructField(col2,IntegerType,false), StructField(col3,IntegerType,false))
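
For a more readable tree rendering of the same schema, printSchema prints:

initial_df.printSchema
// root
//  |-- col1: string (nullable = true)
//  |-- col2: integer (nullable = false)
//  |-- col3: integer (nullable = false)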

scala> initial_df.toJSON
res62: org.apache.spark.sql.Dataset[String] = [value: string]

scala> initial_df.toJSON.show
+--------------------+
|               value|
+--------------------+
|{"col1":"x","col2...|
|{"col1":"x","col2...|
|{"col1":"z","col2...|
|{"col1":"a","col2...|
|{"col1":"z","col2...|
|{"col1":"x","col2...|
|{"col1":"x","col2...|
|{"col1":"z","col2...|
|{"col1":"z","col2...|
|{"col1":"z","col2...|
|{"col1":"a","col2...|
|{"col1":"a","col2...|
|{"col1":"a","col2...|
|{"col1":"x","col2...|
|{"col1":"z","col2...|
+--------------------+
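
show truncates each cell to 20 characters by default, which is why the JSON documents above are cut off; passing truncate = false displays them in full:

initial_df.toJSON.show(false)
// first row prints as {"col1":"x","col2":4,"col3":1}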

scala> val full_df = (initial_df.groupBy("col1")).agg(collect_list($"col2"))
full_df: org.apache.spark.sql.DataFrame = [col1: string, collect_list(col2): array<int>]

scala> val full_df = (initial_df.groupBy("col1")).agg(collect_list($"col2")).show
+----+------------------+
|col1|collect_list(col2)|
+----+------------------+
|   x|   [4, 6, 7, 9, 2]|
|   z|[7, 5, 1, 4, 7, 1]|
|   a|      [3, 8, 5, 3]|
+----+------------------+

scala> val full_df = (initial_df.groupBy("col1").agg(collect_list($"col2").as("array_col1"),collect_list($"col3").as("array_col2")))
full_df: org.apache.spark.sql.DataFrame = [col1: string, array_col1: array<int> ... 1 more field]

scala> full_df.show
+----+------------------+------------------+                                    
|col1|        array_col1|        array_col2|
+----+------------------+------------------+
|   x|   [4, 6, 7, 9, 2]|   [1, 2, 3, 7, 7]|
|   z|[7, 5, 1, 4, 7, 1]|[3, 2, 8, 9, 4, 9]|
|   a|      [3, 8, 5, 3]|      [4, 5, 2, 8]|
+----+------------------+------------------+
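
Because both collect_list calls run in the same aggregation, the two arrays line up by position (x's 4 pairs with 1, 6 with 2, and so on). arrays_zip, available since Spark 2.4, makes that pairing explicit as an array of structs; a small sketch:

val zipped_df = full_df.withColumn("zipped", arrays_zip($"array_col1", $"array_col2"))
zipped_df.show(false)
// each element of "zipped" is a struct such as {4, 1}, re-pairing col2 with its original col3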


scala> val df = full_df.drop("array_col1")
df: org.apache.spark.sql.DataFrame = [col1: string, array_col2: array<int>]

scala> df.show
+----+------------------+
|col1|        array_col2|
+----+------------------+
|   x|   [1, 2, 3, 7, 7]|
|   z|[3, 2, 8, 9, 4, 9]|
|   a|      [4, 5, 2, 8]|
+----+------------------+
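
explode is the inverse of collect_list: it turns each array element back into its own row. A quick sketch:

val exploded_df = df.select($"col1", explode($"array_col2").as("value"))
exploded_df.show
// yields one (col1, value) row per array element, e.g. (x,1), (x,2), (x,3), (x,7), (x,7), ...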

scala> val arr_contains_df = df.withColumn("result", array_contains($"array_col2", 3))
arr_contains_df: org.apache.spark.sql.DataFrame = [col1: string, array_col2: array<int> ... 1 more field]

scala> arr_contains_df.show()
+----+------------------+------+
|col1|        array_col2|result|
+----+------------------+------+
|   x|   [1, 2, 3, 7, 7]|  true|
|   z|[3, 2, 8, 9, 4, 9]|  true|
|   a|      [4, 5, 2, 8]| false|
+----+------------------+------+
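
array_contains can also be used directly inside filter when you want only the matching groups rather than a boolean column; a minimal sketch:

val matching_df = df.filter(array_contains($"array_col2", 3))
matching_df.show
// keeps just the x and z rows, whose arrays contain a 3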


scala> val arr_distinct_df = df.withColumn("result", array_distinct($"array_col2"))
arr_distinct_df: org.apache.spark.sql.DataFrame = [col1: string, array_col2: array<int> ... 1 more field]

scala> arr_distinct_df.show()
+----+------------------+---------------+
|col1|        array_col2|         result|
+----+------------------+---------------+
|   x|   [1, 2, 3, 7, 7]|   [1, 2, 3, 7]|
|   z|[3, 2, 8, 9, 4, 9]|[3, 2, 8, 9, 4]|
|   a|      [4, 5, 2, 8]|   [4, 5, 2, 8]|
+----+------------------+---------------+
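
A few more functions from the same array family, all available since Spark 2.4, sketched on the same df:

val more_df = df
  .withColumn("sorted", array_sort($"array_col2"))  // ascending sort, nulls placed last
  .withColumn("max", array_max($"array_col2"))      // largest element of each array
  .withColumn("size", size($"array_col2"))          // number of elements (size predates 2.4)
more_df.show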








