Thursday, August 31, 2017

Spark-shell: put everything on one line

Many blog examples of starting a Spark session spread the builder code across multiple lines:

import org.apache.spark.sql.SparkSession
val spark: SparkSession = SparkSession.builder
  .master("local[*]")
  .appName("My Spark Application")
  .config("spark.sql.warehouse.dir", "c:/Temp") (1)
  .getOrCreate
If the code is being executed in the spark-shell, put it all on one line:

 
val spark = SparkSession.builder.master("local[*]").appName("My Spark Application").config("spark.sql.warehouse.dir", "/tmp").getOrCreate
Otherwise the following errors will occur:

scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession
scala> val spark = SparkSession.builder
spark: org.apache.spark.sql.SparkSession.Builder = org.apache.spark.sql.SparkSession$Builder@5a20f793
scala>   .master("local[*]")
:1: error: illegal start of definition
  .master("local[*]")
  ^
scala>   .appName("My Spark Application")
:1: error: illegal start of definition
  .appName("My Spark Application")
  ^
scala>   .config("spark.sql.warehouse.dir", "/tmp") (1)
:1: error: illegal start of definition
  .config("spark.sql.warehouse.dir", "/tmp") (1)
  ^
scala>   .getOrCreate
:1: error: illegal start of definition
  .getOrCreate
  ^
scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession
This is the correct execution of the session code in the spark-shell:
scala> val spark = SparkSession.builder.master("local[*]").appName("My Spark Application").config("spark.sql.warehouse.dir", "/tmp").getOrCreate
17/08/31 23:38:16 WARN SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@62b0792
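
Since the WARN above notes that "some configuration may not take effect" when an existing session is reused, a quick sanity check in the same shell is worthwhile (a minimal sketch, assuming Spark 2.x):

// confirm the session is up and the warehouse setting took effect
spark.version
spark.conf.get("spark.sql.warehouse.dir")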

Spark 2

Hortonworks: setting up Spark with Scala

Spark data frames/data sets

Spark2 data sets
https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/8599738367597028/2201444230243967/3601578643761083/latest.html



https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-sql-DataFrame.html

Spark SQL borrowed the concept of DataFrame from pandas' DataFrame and made it immutable, parallel (one machine, perhaps with many processors and cores) and distributed (many machines, perhaps with many processors and cores).

http://pandas.pydata.org/pandas-docs/stable/dsintro.html

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Column

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset

https://stackoverflow.com/questions/38137741/how-to-write-a-dataframe-schema-to-file-in-scala

import java.io.PrintWriter

// write the schema of an existing DataFrame (df) to a file as the printSchema-style tree
val filePath = "/tmp/schema_file"
new PrintWriter(filePath) { write(df.schema.treeString); close() }
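
If the schema needs to be rebuilt later rather than just read by a human, a JSON round-trip is another option (a sketch, assuming Spark 2.x; the /tmp/schema_json path is made up):

import java.io.PrintWriter
import org.apache.spark.sql.types.{DataType, StructType}

// save the schema as JSON instead of the tree string
new PrintWriter("/tmp/schema_json") { write(df.schema.json); close() }

// later: reconstruct the StructType from the saved JSON
val restored = DataType.fromJson(scala.io.Source.fromFile("/tmp/schema_json").mkString).asInstanceOf[StructType]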
https://docs.databricks.com/spark/latest/spark-sql/complex-types.html#transform-complex-data-types-scala

https://www.balabit.com/blog/spark-scala-dataset-tutorial/

Tuesday, August 29, 2017

Spark and AVRO

Automatically and Elegantly flatten DataFrame in Spark SQL

https://stackoverflow.com/questions/37471346/automatically-and-elegantly-flatten-dataframe-in-spark-sql


The short answer is, there's no "accepted" way to do this, but you can do it very elegantly with a recursive function that generates your select(...) statement by walking through the DataFrame.schema.

The recursive function should return an Array[Column]. Every time the function hits a StructType, it would call itself and append the returned Array[Column] to its own Array[Column].
Something like:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType

// recursively walk the schema, producing one Column per leaf field
def flattenSchema(schema: StructType, prefix: String = null): Array[Column] = {
  schema.fields.flatMap(f => {
    // build the dotted path to this field, e.g. "address.city"
    val colName = if (prefix == null) f.name else (prefix + "." + f.name)

    f.dataType match {
      case st: StructType => flattenSchema(st, colName) // recurse into nested structs
      case _ => Array(col(colName))                     // leaf field: select it by its full path
    }
  })
}
 
You would then use it like this:

df.select(flattenSchema(df.schema):_*)
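
A minimal end-to-end sketch of the idea in spark-shell, assuming an existing SparkSession named spark (the column names and sample row are made up):

import org.apache.spark.sql.functions.struct
import spark.implicits._

// build a small DataFrame with a nested struct column (one line, per the spark-shell note above)
val nested = Seq(("Ann", "Oslo", "0150")).toDF("name", "city", "zip").select($"name", struct($"city", $"zip").as("address"))

nested.printSchema()   // address is a struct with city and zip nested under it

// flatten: one top-level column per leaf field
val flat = nested.select(flattenSchema(nested.schema): _*)
flat.printSchema()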

SOLR: search by count greater than 2

r_n:foo.com retCnt:[2 TO *]

Square brackets make the range inclusive, so [2 TO *] matches retCnt >= 2; for a strictly-greater-than-2 match use retCnt:{2 TO *]. Depending on the handler's default operator (q.op), an explicit AND between the two clauses may be needed.

Monday, August 28, 2017

Caused by: ERROR XSDB6: Another instance of Derby may have already booted the database /metastore_db.

https://stackoverflow.com/questions/34465516/caused-by-error-xsdb6-another-instance-of-derby-may-have-already-booted-the-da/35230223


I was getting the same error while creating data frames in the Spark shell:
Caused by: ERROR XSDB6: Another instance of Derby may have already booted the database /metastore_db.
Cause:
I found that this happens when multiple other spark-shell instances are already running and holding the Derby DB, so when I started yet another spark-shell and created a data frame in it using RDD.toDF(), it threw the error.
Solution:
I ran ps to find the other spark-shell instances:
ps -ef | grep spark-shell
and killed them all with the kill command:
kill -9 Spark-Shell-processID (example: kill -9 4848)
After all the spark-shell instances were gone, I started a new spark-shell, reran my data frame code, and it ran just fine.

Scalatra

SBT G8 installation

Mac OSX: Cassandra and Scala tutorial link

Golang Receiver vs Function Argument

https://grisha.org/blog/2016/09/22/golang-receiver-vs-function/

Golang Receiver vs Function Argument

What is the difference between a Go receiver (as in “method receiver”) and a function argument? Consider these two bits of code:
func (d *duck) quack() { // receiver
     // do something
}
versus
func quack(d *duck) { // function argument
    // do something
}
The “do something” part above would work exactly the same regardless of how you declare the function. Which begs the question, which should you use?
In the object-oriented world we were used to objects doing things, and in that context d.quack() may seem more intuitive or familiar than quack(d) because it “reads better”. After all, one could argue that the former is a duck quacking, but the latter reads like you’re quacking a duck, and what does that even mean? I have learned that you should not think this way in the Go universe, and here is why.
First, what is the essential difference? It is that at the time of the call, the receiver is an interface and the function to be called is determined dynamically. If you are not using interfaces, then this doesn’t matter whatsoever and the only benefit you are getting from using a method is syntactic sweetness.
But what if you need to write a test where you want to stub out quack()? If your code looks like this, then it is not possible, because methods are attached to their types inflexibly, you cannot change them, and there is no such thing as a "method variable":
type duck struct{}

func (d *duck) quack() {
     // do something
}

// the function we are testing:
func testme(d *duck) {
    d.quack() // cannot be stubbed
}
However, if you used a function argument, it would be easy:
type duck struct{}

var quack = func(d *duck) {
     // do something
}

// the function we are testing:
func foo(d *duck) {
    quack(d)
}
Now you can assign another function to quack at test time, e.g. quack = func(d *duck) { // do something else } and all is well.
Alternatively, you can use an interface:
type quacker interface {
    quack()
}

type duck struct{}

func (d *duck) quack() { // satisfies quacker
     // do something
}

// the function we are testing:
func foo(d quacker) {
    d.quack()
}
Here, if we need to test foo() we can provide a different quacker.
Bottom line is that it only makes sense to use a receiver if this function is part of an interface implementation, OR if you never ever need to augment (stub) that function for testing or some other reason. As a practical matter, it seems like (contrary to how it’s done in the OO world) it is better to always start out with quack(d) rather than d.quack().