From 19480b7e5934d920dec2f9e06e6c3b1270e2f6e9 Mon Sep 17 00:00:00 2001
From: anitatailor
Date: Thu, 6 Mar 2014 08:16:13 +0530
Subject: [PATCH 1/2] Example for Cassandra CQL read/write from Spark

---
 .../spark/examples/CassandraCQLTest.scala | 124 ++++++++++++++++++
 1 file changed, 124 insertions(+)
 create mode 100644 examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala

diff --git a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala
new file mode 100644
index 0000000000000..d0228c21d1ed0
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.hadoop.mapreduce.Job
+import org.apache.cassandra.hadoop.ConfigHelper
+import org.apache.cassandra.utils.ByteBufferUtil
+import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat
+import org.apache.cassandra.db.IColumn
+import org.apache.cassandra.hadoop.cql3.CqlConfigHelper
+import org.apache.cassandra.hadoop.cql3.CqlOutputFormat
+import scala.collection.JavaConversions._
+import java.nio.ByteBuffer
+import scala.collection.mutable.ListBuffer
+import scala.collection.immutable.Map
+
+/*
+  Create the following keyspace and column family in Cassandra before running this example.
+  Start the CQL shell using ./bin/cqlsh and execute the following commands:
+  CREATE KEYSPACE retail WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
+  use retail;
+
+  CREATE TABLE salecount (product_id text,sale_count int, PRIMARY KEY (product_id));
+  CREATE TABLE ordercf (user_id text,
+    time timestamp,
+    product_id text,
+    quantity int,
+    PRIMARY KEY (user_id, time));
+  INSERT INTO ordercf (user_id, time, product_id, quantity) VALUES ('bob', 1385983646000, 'iphone', 1);
+  INSERT INTO ordercf (user_id, time, product_id, quantity) VALUES ('tom', 1385983647000, 'samsung', 4);
+  INSERT INTO ordercf (user_id, time, product_id, quantity) VALUES ('dora', 1385983648000, 'nokia', 2);
+  INSERT INTO ordercf (user_id, time, product_id, quantity) VALUES ('charlie', 1385983649000, 'iphone', 2);
+*/
+
+/*
+ * This example demonstrates how to use Spark to read from and write to a Cassandra column family
+ * created using CQL3.
+ * Parameters:
+ * Usage: ./bin/run-example org.apache.spark.examples.CassandraCQLTest local[2] localhost 9160
+ *
+ */
+object CassandraCQLTest {
+
+  def main(args: Array[String]) {
+    val sc = new SparkContext(args(0), "CQLTestApp", System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
+    val cHost: String = args(1)
+    val cPort: String = args(2)
+    val KeySpace = "retail"
+    val InputColumnFamily = "ordercf"
+    val OutputColumnFamily = "salecount"
+
+    val job = new Job()
+    job.setInputFormatClass(classOf[CqlPagingInputFormat])
+    ConfigHelper.setInputInitialAddress(job.getConfiguration(), cHost)
+    ConfigHelper.setInputRpcPort(job.getConfiguration(), cPort)
+    ConfigHelper.setInputColumnFamily(job.getConfiguration(), KeySpace, InputColumnFamily)
+    ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner")
+    CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3");
+
+    // CqlConfigHelper.setInputWhereClauses(job.getConfiguration(), "user_id='bob'");
+
+    // An UPDATE writes one or more columns to a record in a Cassandra column family.
+    val query:String = "UPDATE " + KeySpace + "." + OutputColumnFamily + " SET sale_count = ? "
+    CqlConfigHelper.setOutputCql(job.getConfiguration(), query);
+
+    job.setOutputFormatClass(classOf[CqlOutputFormat]);
+    ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KeySpace, OutputColumnFamily);
+    ConfigHelper.setOutputInitialAddress(job.getConfiguration(), cHost);
+    ConfigHelper.setOutputRpcPort(job.getConfiguration(), cPort)
+    ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
+
+    val casRdd = sc.newAPIHadoopRDD(job.getConfiguration(),
+      classOf[CqlPagingInputFormat],
+      classOf[java.util.Map[String,ByteBuffer]],
+      classOf[java.util.Map[String,ByteBuffer]])
+
+    println("Count: " + casRdd.count);
+    val productSaleRDD = casRdd.map {
+      case (key, value) => {
+        (ByteBufferUtil.string(value.get("product_id")),ByteBufferUtil.toInt(value.get("quantity")))
+      }
+    }
+    val aggregatedRDD = productSaleRDD.reduceByKey(_+_)
+    aggregatedRDD.collect().foreach {
+      case (productId, saleCount) => println(productId + ":" + saleCount)
+    }
+
+    val casoutputCF = aggregatedRDD.map {
+      case (productId, saleCount) => {
+        val outColFamKey = Map("product_id" ->ByteBufferUtil.bytes(productId))
+        val outKey : java.util.Map[String, ByteBuffer] = outColFamKey
+        var outColFamVal = new ListBuffer[ByteBuffer]
+        outColFamVal += ByteBufferUtil.bytes(saleCount)
+        val outVal : java.util.List[ByteBuffer] = outColFamVal
+        (outKey,outVal)
+      }
+    }
+
+    casoutputCF.saveAsNewAPIHadoopFile(
+        KeySpace,
+        classOf[java.util.Map[String, ByteBuffer]],
+        classOf[java.util.List[ByteBuffer]],
+        classOf[CqlOutputFormat],
+        job.getConfiguration()
+      )
+  }
+}

From 3493f81e81b783afa35cd996219bfe36db383c1b Mon Sep 17 00:00:00 2001
From: anitatailor
Date: Thu, 6 Mar 2014 12:31:37 +0530
Subject: [PATCH 2/2] Fixed Scala style as per review

---
 .../spark/examples/CassandraCQLTest.scala | 83 +++++++++++--------
 1 file changed, 48 insertions(+), 35 deletions(-)

diff --git a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala
index d0228c21d1ed0..ee283ce6abac2 100644
--- a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala
@@ -17,39 +17,49 @@
 
 package org.apache.spark.examples
 
-import org.apache.spark.SparkContext
-import org.apache.spark.SparkContext._
-import org.apache.hadoop.mapreduce.Job
+import java.nio.ByteBuffer
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ListBuffer
+import scala.collection.immutable.Map
 import org.apache.cassandra.hadoop.ConfigHelper
-import org.apache.cassandra.utils.ByteBufferUtil
 import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat
-import org.apache.cassandra.db.IColumn
 import org.apache.cassandra.hadoop.cql3.CqlConfigHelper
 import org.apache.cassandra.hadoop.cql3.CqlOutputFormat
-import scala.collection.JavaConversions._
-import java.nio.ByteBuffer
-import scala.collection.mutable.ListBuffer
-import scala.collection.immutable.Map
+import org.apache.cassandra.utils.ByteBufferUtil
+import org.apache.hadoop.mapreduce.Job
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
 
 /*
   Create the following keyspace and column family in Cassandra before running this example.
   Start the CQL shell using ./bin/cqlsh and execute the following commands:
   CREATE KEYSPACE retail WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
   use retail;
-
-  CREATE TABLE salecount (product_id text,sale_count int, PRIMARY KEY (product_id));
+  CREATE TABLE salecount (prod_id text, sale_count int, PRIMARY KEY (prod_id));
   CREATE TABLE ordercf (user_id text,
     time timestamp,
-    product_id text,
+    prod_id text,
     quantity int,
     PRIMARY KEY (user_id, time));
-  INSERT INTO ordercf (user_id, time, product_id, quantity) VALUES ('bob', 1385983646000, 'iphone', 1);
-  INSERT INTO ordercf (user_id, time, product_id, quantity) VALUES ('tom', 1385983647000, 'samsung', 4);
-  INSERT INTO ordercf (user_id, time, product_id, quantity) VALUES ('dora', 1385983648000, 'nokia', 2);
-  INSERT INTO ordercf (user_id, time, product_id, quantity) VALUES ('charlie', 1385983649000, 'iphone', 2);
+  INSERT INTO ordercf (user_id,
+    time,
+    prod_id,
+    quantity) VALUES ('bob', 1385983646000, 'iphone', 1);
+  INSERT INTO ordercf (user_id,
+    time,
+    prod_id,
+    quantity) VALUES ('tom', 1385983647000, 'samsung', 4);
+  INSERT INTO ordercf (user_id,
+    time,
+    prod_id,
+    quantity) VALUES ('dora', 1385983648000, 'nokia', 2);
+  INSERT INTO ordercf (user_id,
+    time,
+    prod_id,
+    quantity) VALUES ('charlie', 1385983649000, 'iphone', 2);
 */
 
-/*
+/**
 * This example demonstrates how to use Spark to read from and write to a Cassandra column family
 * created using CQL3.
 * Parameters:
@@ -59,7 +69,10 @@
 object CassandraCQLTest {
 
   def main(args: Array[String]) {
-    val sc = new SparkContext(args(0), "CQLTestApp", System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
+    val sc = new SparkContext(args(0),
+        "CQLTestApp",
+        System.getenv("SPARK_HOME"),
+        SparkContext.jarOfClass(this.getClass))
     val cHost: String = args(1)
     val cPort: String = args(2)
     val KeySpace = "retail"
@@ -72,44 +85,44 @@ object CassandraCQLTest {
 
     val job = new Job()
     job.setInputFormatClass(classOf[CqlPagingInputFormat])
     ConfigHelper.setInputInitialAddress(job.getConfiguration(), cHost)
     ConfigHelper.setInputRpcPort(job.getConfiguration(), cPort)
     ConfigHelper.setInputColumnFamily(job.getConfiguration(), KeySpace, InputColumnFamily)
     ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner")
-    CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3");
+    CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3")
 
-    // CqlConfigHelper.setInputWhereClauses(job.getConfiguration(), "user_id='bob'");
+    /** CqlConfigHelper.setInputWhereClauses(job.getConfiguration(), "user_id='bob'") */
 
-    // An UPDATE writes one or more columns to a record in a Cassandra column family.
-    val query:String = "UPDATE " + KeySpace + "." + OutputColumnFamily + " SET sale_count = ? "
-    CqlConfigHelper.setOutputCql(job.getConfiguration(), query);
+    /** An UPDATE writes one or more columns to a record in a Cassandra column family */
+    val query = "UPDATE " + KeySpace + "." + OutputColumnFamily + " SET sale_count = ? "
+    CqlConfigHelper.setOutputCql(job.getConfiguration(), query)
 
-    job.setOutputFormatClass(classOf[CqlOutputFormat]);
-    ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KeySpace, OutputColumnFamily);
-    ConfigHelper.setOutputInitialAddress(job.getConfiguration(), cHost);
+    job.setOutputFormatClass(classOf[CqlOutputFormat])
+    ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KeySpace, OutputColumnFamily)
+    ConfigHelper.setOutputInitialAddress(job.getConfiguration(), cHost)
     ConfigHelper.setOutputRpcPort(job.getConfiguration(), cPort)
-    ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
+    ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner")
 
     val casRdd = sc.newAPIHadoopRDD(job.getConfiguration(),
       classOf[CqlPagingInputFormat],
       classOf[java.util.Map[String,ByteBuffer]],
       classOf[java.util.Map[String,ByteBuffer]])
 
-    println("Count: " + casRdd.count);
+    println("Count: " + casRdd.count)
     val productSaleRDD = casRdd.map {
       case (key, value) => {
-        (ByteBufferUtil.string(value.get("product_id")),ByteBufferUtil.toInt(value.get("quantity")))
+        (ByteBufferUtil.string(value.get("prod_id")), ByteBufferUtil.toInt(value.get("quantity")))
       }
     }
-    val aggregatedRDD = productSaleRDD.reduceByKey(_+_)
+    val aggregatedRDD = productSaleRDD.reduceByKey(_ + _)
     aggregatedRDD.collect().foreach {
       case (productId, saleCount) => println(productId + ":" + saleCount)
     }
 
     val casoutputCF = aggregatedRDD.map {
       case (productId, saleCount) => {
-        val outColFamKey = Map("product_id" ->ByteBufferUtil.bytes(productId))
-        val outKey : java.util.Map[String, ByteBuffer] = outColFamKey
-        var outColFamVal = new ListBuffer[ByteBuffer]
+        val outColFamKey = Map("prod_id" -> ByteBufferUtil.bytes(productId))
+        val outKey: java.util.Map[String, ByteBuffer] = outColFamKey
+        var outColFamVal = new ListBuffer[ByteBuffer]
         outColFamVal += ByteBufferUtil.bytes(saleCount)
-        val outVal : java.util.List[ByteBuffer] = outColFamVal
-        (outKey,outVal)
+        val outVal: java.util.List[ByteBuffer] = outColFamVal
+        (outKey, outVal)
       }
     }
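
Verification note (not part of the patch series above): the reduceByKey step in CassandraCQLTest
can be sanity-checked without a Cassandra cluster by running the same aggregation over in-memory
pairs. This is a minimal sketch; the object name SaleCountCheck is made up for illustration, and
the sample pairs mirror the ordercf rows inserted in the setup comment, so the expected output,
in some order, is iphone:3, samsung:4, nokia:2.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object SaleCountCheck {
  def main(args: Array[String]) {
    val sc = new SparkContext("local[2]", "SaleCountCheck")
    // Same (product id, quantity) pairs as the rows inserted into ordercf above.
    val orders = sc.parallelize(Seq(("iphone", 1), ("samsung", 4), ("nokia", 2), ("iphone", 2)))
    // The same aggregation the example applies to the rows read from Cassandra.
    val saleCounts = orders.reduceByKey(_ + _)
    saleCounts.collect().foreach {
      case (productId, saleCount) => println(productId + ":" + saleCount)
    }
    sc.stop()
  }
}

After CassandraCQLTest itself has run against a cluster, the written counts can be checked from
the CQL shell with: SELECT * FROM retail.salecount;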