// Source: https://medium.com/expedia-group-tech/deep-dive-into-apache-spark-array-functions-720b8fbfa729
// Sample input: fifteen (col1, col2, col3) rows with a one-letter string key
// and two integer columns. Keys "x", "z" and "a" each repeat, so grouping on
// col1 produces multi-element arrays. Row order is significant for the printed
// output of `show`.
// NOTE(review): `toDF` needs the SparkSession implicits in scope
// (e.g. `import spark.implicits._`, which spark-shell provides automatically).
val initial_df = {
  val rows = Seq(
    ("x", 4, 1), ("x", 6, 2), ("z", 7, 3),
    ("a", 3, 4), ("z", 5, 2), ("x", 7, 3),
    ("x", 9, 7), ("z", 1, 8), ("z", 4, 9),
    ("z", 7, 4), ("a", 8, 5), ("a", 5, 2),
    ("a", 3, 8), ("x", 2, 7), ("z", 1, 9)
  )
  rows.toDF("col1", "col2", "col3")
}
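// Generate array columns: group the rows by col1 and gather each group's values with collect_list.
// Note: collect_list does not guarantee element order after the groupBy shuffle, so the array order shown below may vary between runs.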
scala> initial_df
res58: org.apache.spark.sql.DataFrame = [col1: string, col2: int ... 1 more field]
scala> initial_df.show
+----+----+----+
|col1|col2|col3|
+----+----+----+
| x| 4| 1|
| x| 6| 2|
| z| 7| 3|
| a| 3| 4|
| z| 5| 2|
| x| 7| 3|
| x| 9| 7|
| z| 1| 8|
| z| 4| 9|
| z| 7| 4|
| a| 8| 5|
| a| 5| 2|
| a| 3| 8|
| x| 2| 7|
| z| 1| 9|
+----+----+----+
scala> val full_df = (initial_df.groupBy("col1").agg(collect_list($"col2")))
full_df: org.apache.spark.sql.DataFrame = [col1: string, collect_list(col2): array<int>]
scala> val full_df = (initial_df.groupBy("col1").agg(collect_list($"col2"))).show
+----+------------------+
|col1|collect_list(col2)|
+----+------------------+
| x| [4, 6, 7, 9, 2]|
| z|[7, 5, 1, 4, 7, 1]|
| a| [3, 8, 5, 3]|
+----+------------------+
scala> initial_df.schema
res61: org.apache.spark.sql.types.StructType = StructType(StructField(col1,StringType,true), StructField(col2,IntegerType,false), StructField(col3,IntegerType,false))
scala> initial_df.toJSON
res62: org.apache.spark.sql.Dataset[String] = [value: string]
scala> initial_df.toJSON.show
+--------------------+
| value|
+--------------------+
|{"col1":"x","col2...|
|{"col1":"x","col2...|
|{"col1":"z","col2...|
|{"col1":"a","col2...|
|{"col1":"z","col2...|
|{"col1":"x","col2...|
|{"col1":"x","col2...|
|{"col1":"z","col2...|
|{"col1":"z","col2...|
|{"col1":"z","col2...|
|{"col1":"a","col2...|
|{"col1":"a","col2...|
|{"col1":"a","col2...|
|{"col1":"x","col2...|
|{"col1":"z","col2...|
+--------------------+
scala> val full_df = (initial_df.groupBy("col1")).agg(collect_list($"col2"))
full_df: org.apache.spark.sql.DataFrame = [col1: string, collect_list(col2): array<int>]
scala> val full_df = (initial_df.groupBy("col1")).agg(collect_list($"col2")).show
+----+------------------+
|col1|collect_list(col2)|
+----+------------------+
| x| [4, 6, 7, 9, 2]|
| z|[7, 5, 1, 4, 7, 1]|
| a| [3, 8, 5, 3]|
+----+------------------+
scala> val full_df = (initial_df.groupBy("col1").agg(collect_list($"col2").as("array_col1"),collect_list($"col3").as("array_col2")))
full_df: org.apache.spark.sql.DataFrame = [col1: string, array_col1: array<int> ... 1 more field]
scala> full_df.show
+----+------------------+------------------+
|col1| array_col1| array_col2|
+----+------------------+------------------+
| x| [4, 6, 7, 9, 2]| [1, 2, 3, 7, 7]|
| z|[7, 5, 1, 4, 7, 1]|[3, 2, 8, 9, 4, 9]|
| a| [3, 8, 5, 3]| [4, 5, 2, 8]|
+----+------------------+------------------+
scala> initial_df.show
+----+----+----+
|col1|col2|col3|
+----+----+----+
| x| 4| 1|
| x| 6| 2|
| z| 7| 3|
| a| 3| 4|
| z| 5| 2|
| x| 7| 3|
| x| 9| 7|
| z| 1| 8|
| z| 4| 9|
| z| 7| 4|
| a| 8| 5|
| a| 5| 2|
| a| 3| 8|
| x| 2| 7|
| z| 1| 9|
+----+----+----+
scala> val df = full_df.drop("array_col1")
df: org.apache.spark.sql.DataFrame = [col1: string, array_col2: array<int>]
scala> df.show
+----+------------------+
|col1| array_col2|
+----+------------------+
| x| [1, 2, 3, 7, 7]|
| z|[3, 2, 8, 9, 4, 9]|
| a| [4, 5, 2, 8]|
+----+------------------+
scala> val arr_contains_df = df.withColumn("result", array_contains($"array_col2", 3))
arr_contains_df: org.apache.spark.sql.DataFrame = [col1: string, array_col2: array<int> ... 1 more field]
scala>
scala> arr_contains_df.show()
+----+------------------+------+
|col1| array_col2|result|
+----+------------------+------+
| x| [1, 2, 3, 7, 7]| true|
| z|[3, 2, 8, 9, 4, 9]| true|
| a| [4, 5, 2, 8]| false|
+----+------------------+------+
scala> val arr_distinct_df = df.withColumn("result", array_distinct($"array_col2"))
arr_distinct_df: org.apache.spark.sql.DataFrame = [col1: string, array_col2: array<int> ... 1 more field]
scala>
scala> arr_distinct_df.show()
+----+------------------+---------------+
|col1| array_col2| result|
+----+------------------+---------------+
| x| [1, 2, 3, 7, 7]| [1, 2, 3, 7]|
| z|[3, 2, 8, 9, 4, 9]|[3, 2, 8, 9, 4]|
| a| [4, 5, 2, 8]| [4, 5, 2, 8]|
+----+------------------+---------------+
No comments:
Post a Comment