@@ -29,13 +29,10 @@ object BroadcastTest {

val blockSize = if (args.length > 2) args(2) else "4096"

-val sparkConf = new SparkConf()
-.set("spark.broadcast.blockSize", blockSize)
-
val spark = SparkSession
-.builder
-.config(sparkConf)
+.builder()
.appName("Broadcast Test")
+.config("spark.broadcast.blockSize", blockSize)
.getOrCreate()

val sc = spark.sparkContext
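For reference, the resulting setup in BroadcastTest reads roughly as follows once the intermediate SparkConf is gone (the literal "4096" below stands in for the blockSize parsed from args above, so it is an illustrative value only):

```scala
import org.apache.spark.sql.SparkSession

// Settings go straight onto the builder; no separate SparkConf object is needed.
val spark = SparkSession
  .builder()
  .appName("Broadcast Test")
  .config("spark.broadcast.blockSize", "4096") // normally blockSize from args(2); hard-coded here
  .getOrCreate()
```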
@@ -191,6 +191,7 @@ object LDAExample {

val spark = SparkSession
.builder
+.sparkContext(sc)
.getOrCreate()
import spark.implicits._
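The `sparkContext(...)` call added here is the pattern this whole change rolls out: instead of copying the live context's configuration with `config(sc.getConf)`, the builder is handed the SparkContext itself. As far as I can tell it is a Spark-internal (`private[spark]`) builder method, which is why only code inside the Spark tree is migrated. A minimal before/after sketch, assuming `sc` is an existing SparkContext:

```scala
// Old style: reconstruct the session configuration from the running context.
val before = SparkSession.builder().config(sc.getConf).getOrCreate()

// New style: attach the running SparkContext directly, so the session is built
// on top of `sc` itself rather than on a copy of its configuration.
val after = SparkSession.builder().sparkContext(sc).getOrCreate()
```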

@@ -22,7 +22,6 @@ import java.io.File

import com.google.common.io.{ByteStreams, Files}

-import org.apache.spark.SparkConf
import org.apache.spark.sql._

object HiveFromSpark {
@@ -35,8 +34,6 @@ object HiveFromSpark {
ByteStreams.copy(kv1Stream, Files.newOutputStreamSupplier(kv1File))

def main(args: Array[String]) {
-val sparkConf = new SparkConf().setAppName("HiveFromSpark")
-
// When working with Hive, one must instantiate `SparkSession` with Hive support, including
// connectivity to a persistent Hive metastore, support for Hive serdes, and Hive user-defined
// functions. Users who do not have an existing Hive deployment can still enable Hive support.
@@ -45,7 +42,7 @@ object HiveFromSpark {
// which defaults to the directory `spark-warehouse` in the current directory that the spark
// application is started.
val spark = SparkSession.builder
-.config(sparkConf)
+.appName("HiveFromSpark")
.enableHiveSupport()
.getOrCreate()
val sc = spark.sparkContext
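The truncated comment above is describing the default warehouse layout; if one wanted to pin the warehouse location explicitly, the builder form would look something like the sketch below (the `spark.sql.warehouse.dir` value is an illustrative path, not part of this change):

```scala
import org.apache.spark.sql.SparkSession

// Hive-enabled session with an explicit warehouse directory; omitting the
// .config(...) line falls back to ./spark-warehouse as the comment describes.
val spark = SparkSession.builder
  .appName("HiveFromSpark")
  .config("spark.sql.warehouse.dir", "/tmp/spark-warehouse") // illustrative path
  .enableHiveSupport()
  .getOrCreate()
```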
@@ -1177,7 +1177,7 @@ private[python] class PythonMLLibAPI extends Serializable {
// We use DataFrames for serialization of IndexedRows to Python,
// so return a DataFrame.
val sc = indexedRowMatrix.rows.sparkContext
-val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
spark.createDataFrame(indexedRowMatrix.rows)
}

@@ -1188,7 +1188,7 @@ private[python] class PythonMLLibAPI extends Serializable {
// We use DataFrames for serialization of MatrixEntry entries to
// Python, so return a DataFrame.
val sc = coordinateMatrix.entries.sparkContext
-val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
spark.createDataFrame(coordinateMatrix.entries)
}

@@ -1199,7 +1199,7 @@ private[python] class PythonMLLibAPI extends Serializable {
// We use DataFrames for serialization of sub-matrix blocks to
// Python, so return a DataFrame.
val sc = blockMatrix.blocks.sparkContext
-val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
spark.createDataFrame(blockMatrix.blocks)
}
}
@@ -437,7 +437,7 @@ class LogisticRegressionWithLBFGS
lr.setMaxIter(optimizer.getNumIterations())
lr.setTol(optimizer.getConvergenceTol())
// Convert our input into a DataFrame
-val spark = SparkSession.builder().config(input.context.getConf).getOrCreate()
+val spark = SparkSession.builder().sparkContext(input.context).getOrCreate()
val df = spark.createDataFrame(input.map(_.asML))
// Determine if we should cache the DF
val handlePersistence = input.getStorageLevel == StorageLevel.NONE
@@ -193,7 +193,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] {
modelType: String)

def save(sc: SparkContext, path: String, data: Data): Unit = {
-val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+val spark = SparkSession.builder().sparkContext(sc).getOrCreate()

// Create JSON metadata.
val metadata = compact(render(
@@ -207,7 +207,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] {

@Since("1.3.0")
def load(sc: SparkContext, path: String): NaiveBayesModel = {
-val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
// Load Parquet data.
val dataRDD = spark.read.parquet(dataPath(path))
// Check schema explicitly since erasure makes it hard to use match-case for checking.
@@ -238,7 +238,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] {
theta: Array[Array[Double]])

def save(sc: SparkContext, path: String, data: Data): Unit = {
-val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+val spark = SparkSession.builder().sparkContext(sc).getOrCreate()

// Create JSON metadata.
val metadata = compact(render(
@@ -251,7 +251,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] {
}

def load(sc: SparkContext, path: String): NaiveBayesModel = {
-val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
// Load Parquet data.
val dataRDD = spark.read.parquet(dataPath(path))
// Check schema explicitly since erasure makes it hard to use match-case for checking.
@@ -51,7 +51,7 @@ private[classification] object GLMClassificationModel {
weights: Vector,
intercept: Double,
threshold: Option[Double]): Unit = {
-val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+val spark = SparkSession.builder().sparkContext(sc).getOrCreate()

// Create JSON metadata.
val metadata = compact(render(
@@ -73,7 +73,7 @@
*/
def loadData(sc: SparkContext, path: String, modelClass: String): Data = {
val dataPath = Loader.dataPath(path)
-val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
val dataRDD = spark.read.parquet(dataPath)
val dataArray = dataRDD.select("weights", "intercept", "threshold").take(1)
assert(dataArray.length == 1, s"Unable to load $modelClass data from: $dataPath")
@@ -144,7 +144,7 @@ object BisectingKMeansModel extends Loader[BisectingKMeansModel] {
val thisClassName = "org.apache.spark.mllib.clustering.BisectingKMeansModel"

def save(sc: SparkContext, model: BisectingKMeansModel, path: String): Unit = {
-val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
val metadata = compact(render(
("class" -> thisClassName) ~ ("version" -> thisFormatVersion)
~ ("rootId" -> model.root.index)))
@@ -165,7 +165,7 @@ object BisectingKMeansModel extends Loader[BisectingKMeansModel] {
}

def load(sc: SparkContext, path: String, rootId: Int): BisectingKMeansModel = {
-val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
val rows = spark.read.parquet(Loader.dataPath(path))
Loader.checkSchema[Data](rows.schema)
val data = rows.select("index", "size", "center", "norm", "cost", "height", "children")
@@ -143,7 +143,7 @@ object GaussianMixtureModel extends Loader[GaussianMixtureModel] {
path: String,
weights: Array[Double],
gaussians: Array[MultivariateGaussian]): Unit = {
-val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+val spark = SparkSession.builder().sparkContext(sc).getOrCreate()

// Create JSON metadata.
val metadata = compact(render
@@ -159,7 +159,7 @@ object GaussianMixtureModel extends Loader[GaussianMixtureModel] {

def load(sc: SparkContext, path: String): GaussianMixtureModel = {
val dataPath = Loader.dataPath(path)
-val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
val dataFrame = spark.read.parquet(dataPath)
// Check schema explicitly since erasure makes it hard to use match-case for checking.
Loader.checkSchema[Data](dataFrame.schema)
@@ -123,7 +123,7 @@ object KMeansModel extends Loader[KMeansModel] {
val thisClassName = "org.apache.spark.mllib.clustering.KMeansModel"

def save(sc: SparkContext, model: KMeansModel, path: String): Unit = {
-val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
val metadata = compact(render(
("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("k" -> model.k)))
sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
@@ -135,7 +135,7 @@ object KMeansModel extends Loader[KMeansModel] {

def load(sc: SparkContext, path: String): KMeansModel = {
implicit val formats = DefaultFormats
-val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
val (className, formatVersion, metadata) = Loader.loadMetadata(sc, path)
assert(className == thisClassName)
assert(formatVersion == thisFormatVersion)
@@ -446,7 +446,7 @@ object LocalLDAModel extends Loader[LocalLDAModel] {
docConcentration: Vector,
topicConcentration: Double,
gammaShape: Double): Unit = {
-val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
val k = topicsMatrix.numCols
val metadata = compact(render
(("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~
@@ -470,7 +470,7 @@ object LocalLDAModel extends Loader[LocalLDAModel] {
topicConcentration: Double,
gammaShape: Double): LocalLDAModel = {
val dataPath = Loader.dataPath(path)
-val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
val dataFrame = spark.read.parquet(dataPath)

Loader.checkSchema[Data](dataFrame.schema)
@@ -851,7 +851,7 @@ object DistributedLDAModel extends Loader[DistributedLDAModel] {
topicConcentration: Double,
iterationTimes: Array[Double],
gammaShape: Double): Unit = {
-val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+val spark = SparkSession.builder().sparkContext(sc).getOrCreate()

val metadata = compact(render
(("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~
@@ -887,7 +887,7 @@ object DistributedLDAModel extends Loader[DistributedLDAModel] {
val dataPath = new Path(Loader.dataPath(path), "globalTopicTotals").toUri.toString
val vertexDataPath = new Path(Loader.dataPath(path), "topicCounts").toUri.toString
val edgeDataPath = new Path(Loader.dataPath(path), "tokenCounts").toUri.toString
-val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
val dataFrame = spark.read.parquet(dataPath)
val vertexDataFrame = spark.read.parquet(vertexDataPath)
val edgeDataFrame = spark.read.parquet(edgeDataPath)
@@ -70,7 +70,7 @@ object PowerIterationClusteringModel extends Loader[PowerIterationClusteringMode

@Since("1.4.0")
def save(sc: SparkContext, model: PowerIterationClusteringModel, path: String): Unit = {
-val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+val spark = SparkSession.builder().sparkContext(sc).getOrCreate()

val metadata = compact(render(
("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("k" -> model.k)))
@@ -82,7 +82,7 @@
@Since("1.4.0")
def load(sc: SparkContext, path: String): PowerIterationClusteringModel = {
implicit val formats = DefaultFormats
-val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+val spark = SparkSession.builder().sparkContext(sc).getOrCreate()

val (className, formatVersion, metadata) = Loader.loadMetadata(sc, path)
assert(className == thisClassName)
@@ -134,7 +134,7 @@ object ChiSqSelectorModel extends Loader[ChiSqSelectorModel] {
val thisClassName = "org.apache.spark.mllib.feature.ChiSqSelectorModel"

def save(sc: SparkContext, model: ChiSqSelectorModel, path: String): Unit = {
-val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+val spark = SparkSession.builder().sparkContext(sc).getOrCreate()

val metadata = compact(render(
("class" -> thisClassName) ~ ("version" -> thisFormatVersion)))
@@ -149,7 +149,7 @@ object ChiSqSelectorModel extends Loader[ChiSqSelectorModel] {

def load(sc: SparkContext, path: String): ChiSqSelectorModel = {
implicit val formats = DefaultFormats
-val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
val (className, formatVersion, metadata) = Loader.loadMetadata(sc, path)
assert(className == thisClassName)
assert(formatVersion == thisFormatVersion)
@@ -609,7 +609,7 @@ object Word2VecModel extends Loader[Word2VecModel] {
case class Data(word: String, vector: Array[Float])

def load(sc: SparkContext, path: String): Word2VecModel = {
-val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
val dataFrame = spark.read.parquet(Loader.dataPath(path))
// Check schema explicitly since erasure makes it hard to use match-case for checking.
Loader.checkSchema[Data](dataFrame.schema)
@@ -620,7 +620,7 @@ object Word2VecModel extends Loader[Word2VecModel] {
}

def save(sc: SparkContext, path: String, model: Map[String, Array[Float]]): Unit = {
-val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+val spark = SparkSession.builder().sparkContext(sc).getOrCreate()

val vectorSize = model.values.head.length
val numWords = model.size
@@ -99,7 +99,7 @@ object FPGrowthModel extends Loader[FPGrowthModel[_]] {

def save(model: FPGrowthModel[_], path: String): Unit = {
val sc = model.freqItemsets.sparkContext
-val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+val spark = SparkSession.builder().sparkContext(sc).getOrCreate()

val metadata = compact(render(
("class" -> thisClassName) ~ ("version" -> thisFormatVersion)))
@@ -123,7 +123,7 @@ object FPGrowthModel extends Loader[FPGrowthModel[_]] {

def load(sc: SparkContext, path: String): FPGrowthModel[_] = {
implicit val formats = DefaultFormats
-val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+val spark = SparkSession.builder().sparkContext(sc).getOrCreate()

val (className, formatVersion, metadata) = Loader.loadMetadata(sc, path)
assert(className == thisClassName)
@@ -616,7 +616,7 @@ object PrefixSpanModel extends Loader[PrefixSpanModel[_]] {

def save(model: PrefixSpanModel[_], path: String): Unit = {
val sc = model.freqSequences.sparkContext
-val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+val spark = SparkSession.builder().sparkContext(sc).getOrCreate()

val metadata = compact(render(
("class" -> thisClassName) ~ ("version" -> thisFormatVersion)))
@@ -640,7 +640,7 @@ object PrefixSpanModel extends Loader[PrefixSpanModel[_]] {

def load(sc: SparkContext, path: String): PrefixSpanModel[_] = {
implicit val formats = DefaultFormats
-val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+val spark = SparkSession.builder().sparkContext(sc).getOrCreate()

val (className, formatVersion, metadata) = Loader.loadMetadata(sc, path)
assert(className == thisClassName)
@@ -354,7 +354,7 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
*/
def save(model: MatrixFactorizationModel, path: String): Unit = {
val sc = model.userFeatures.sparkContext
-val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
import spark.implicits._
val metadata = compact(render(
("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("rank" -> model.rank)))
@@ -365,7 +365,7 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {

def load(sc: SparkContext, path: String): MatrixFactorizationModel = {
implicit val formats = DefaultFormats
-val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
val (className, formatVersion, metadata) = loadMetadata(sc, path)
assert(className == thisClassName)
assert(formatVersion == thisFormatVersion)
@@ -185,7 +185,7 @@ object IsotonicRegressionModel extends Loader[IsotonicRegressionModel] {
boundaries: Array[Double],
predictions: Array[Double],
isotonic: Boolean): Unit = {
-val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+val spark = SparkSession.builder().sparkContext(sc).getOrCreate()

val metadata = compact(render(
("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~
@@ -198,7 +198,7 @@ object IsotonicRegressionModel extends Loader[IsotonicRegressionModel] {
}

def load(sc: SparkContext, path: String): (Array[Double], Array[Double]) = {
-val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
val dataRDD = spark.read.parquet(dataPath(path))

checkSchema[Data](dataRDD.schema)
@@ -47,7 +47,7 @@ private[regression] object GLMRegressionModel {
modelClass: String,
weights: Vector,
intercept: Double): Unit = {
-val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+val spark = SparkSession.builder().sparkContext(sc).getOrCreate()

// Create JSON metadata.
val metadata = compact(render(
@@ -68,7 +68,7 @@ private[regression] object GLMRegressionModel {
*/
def loadData(sc: SparkContext, path: String, modelClass: String, numFeatures: Int): Data = {
val dataPath = Loader.dataPath(path)
-val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
val dataRDD = spark.read.parquet(dataPath)
val dataArray = dataRDD.select("weights", "intercept").take(1)
assert(dataArray.length == 1, s"Unable to load $modelClass data from: $dataPath")
@@ -233,13 +233,13 @@ object DecisionTreeModel extends Loader[DecisionTreeModel] with Logging {
// Create Parquet data.
val nodes = model.topNode.subtreeIterator.toSeq
val dataRDD = sc.parallelize(nodes).map(NodeData.apply(0, _))
-val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
spark.createDataFrame(dataRDD).write.parquet(Loader.dataPath(path))
}

def load(sc: SparkContext, path: String, algo: String, numNodes: Int): DecisionTreeModel = {
// Load Parquet data.
-val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
+val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
val dataPath = Loader.dataPath(path)
val dataRDD = spark.read.parquet(dataPath)
// Check schema explicitly since erasure makes it hard to use match-case for checking.