# Assumes Oracle Java 8 is already installed
# If not, follow the Java 8 install instructions on this page first
# Install Cassandra
curl https://www.apache.org/dist/cassandra/KEYS | sudo apt-key add -
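# Add the Apache Cassandra Debian repository so apt-get can find the package
# (the 311x series below is an assumption matching the Cassandra 3.11.1 shown later; adjust for other releases)
echo "deb http://www.apache.org/dist/cassandra/debian 311x main" | sudo tee /etc/apt/sources.list.d/cassandra.sources.list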
sudo apt-get update
sudo apt-get install cassandra
# Build the Scala 2.11 compatible spark-cassandra-connector assembly
# (assumes you cloned the Spark Cassandra Connector repo into $HOME, ran the sbt build from there, and have spark-shell on your PATH)
$ sbt/sbt -Dscala-2.11=true assembly
# the assembly jar lands under spark-cassandra-connector/target/full/scala-2.11/ and is passed to --jars below
# Start Cassandra
# Start the Apache Cassandra service:
sudo systemctl start cassandra.service
# Stop the service:
sudo systemctl stop cassandra.service
# Enable the service at boot, if it is not already enabled:
sudo systemctl enable cassandra.service
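# Confirm the node is up before continuing (nodetool ships with Cassandra);
# the local node should report status UN (Up/Normal)
nodetool status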
# Add a keyspace and table for the tutorial using the Cassandra shell, cqlsh:
$ cqlsh
Connected to Test Cluster at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 3.11.1 | CQL spec 3.4.4 | Native protocol v4]
Use HELP for help.
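# Note: replication_factor 3 assumes a multi-node cluster; on a single test node, a factor of 1 is sufficient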
cqlsh> CREATE KEYSPACE test_spark WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 };
cqlsh> CREATE TABLE test_spark.test (value int PRIMARY KEY);
cqlsh> INSERT INTO test_spark.test (value) VALUES (1);
# In another shell, start spark-shell
$ cd   # back to $HOME
$ spark-shell
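# Quick sanity check that Spark works: 1 + 2 + ... + 50 = 50*51/2 = 1275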
scala> sc.parallelize( 1 to 50 ).sum()
res1: Double = 1275.0
scala> Ctrl-D # to exit
# Restart spark-shell with the Cassandra connector jar
$ spark-shell --jars ~/spark-cassandra-connector/spark-cassandra-connector/target/full/scala-2.11/spark-cassandra-connector-assembly-2.0.5-70-g2ee41fc.jar
scala> import com.datastax.spark.connector._, org.apache.spark.SparkContext, org.apache.spark.SparkContext._, org.apache.spark.SparkConf
scala> val conf = new SparkConf(true).set("spark.cassandra.connection.host","localhost")
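# spark-shell already started a SparkContext named sc; stop it before creating
# a new one configured for the connector, or the constructor below will fail
scala> sc.stop()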
scala> val sc = new SparkContext(conf)
scala> val test_spark_rdd = sc.cassandraTable("test_spark", "test")
test_spark_rdd: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[4] at RDD at CassandraRDD.scala:16
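# Verify the read; with the single row inserted above, this should print something like:
scala> test_spark_rdd.collect.foreach(println)
CassandraRow{value: 1}
# The same call reads any table, shown here with a hypothetical my_keyspace.my_table: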
scala> val data = sc.cassandraTable("my_keyspace", "my_table")
data: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[5] at RDD at CassandraRDD.scala:16
#########
# Start the movie tutorial
# Make the keyspace and table for movies
cqlsh> CREATE KEYSPACE spark_demo WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 };
cqlsh> USE spark_demo;
cqlsh:spark_demo> CREATE TABLE spark_demo.movies (id int PRIMARY KEY, title text, genres text);
cqlsh:spark_demo> describe table movies;
cqlsh:spark_demo> INSERT INTO spark_demo.movies (id, title, genres) VALUES (1, 'Bladerunner', 'Scifi');
cqlsh:spark_demo> INSERT INTO spark_demo.movies (id, title, genres) VALUES (2, 'The Big Short', 'Finance');
cqlsh:spark_demo> SELECT * FROM spark_demo.movies ;
id | genres | title
----+---------+---------------
1 | Scifi | Bladerunner
2 | Finance | The Big Short
(2 rows)
# Spark code for movies (run in the spark-shell session started above with the connector jar)
import com.datastax.spark.connector._, org.apache.spark.SparkContext, org.apache.spark.SparkContext._, org.apache.spark.SparkConf
val conf = new SparkConf(true).set("spark.cassandra.connection.host","localhost")
case class Movie(Id: Int, Title: String, Genres: String)
val data = sc.cassandraTable("sparc_demo", "movies")
val data = sc.cassandraTable[Movie]("spark_demo", "movies")
data.foreach(println)
#output
Movie(1,Bladerunner,Scifi)
Movie(2,The Big Short,Finance)
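# The connector can also write an RDD back to Cassandra: saveToCassandra comes from the
# com.datastax.spark.connector._ import above and reuses the same column mapping as the read.
# A small sketch; the Movie row here is a made-up example, not part of the tutorial data:
scala> sc.parallelize(Seq(Movie(3, "Arrival", "Scifi"))).saveToCassandra("spark_demo", "movies")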