11 changes: 1 addition & 10 deletions R/pkg/NAMESPACE
@@ -28,9 +28,8 @@ importFrom("utils", "download.file", "object.size", "packageVersion", "tail", "u

# S3 methods exported
export("sparkR.session")
export("sparkR.init")
export("sparkR.stop")
export("sparkR.session.stop")
export("sparkR.stop")
export("sparkR.conf")
export("sparkR.version")
export("sparkR.uiWebUrl")
@@ -42,9 +41,6 @@ export("sparkR.callJStatic")

export("install.spark")

export("sparkRSQL.init",
"sparkRHive.init")

# MLlib integration
exportMethods("glm",
"spark.glm",
@@ -151,15 +147,13 @@ exportMethods("arrange",
"printSchema",
"randomSplit",
"rbind",
"registerTempTable",
"rename",
"repartition",
"repartitionByRange",
"rollup",
"sample",
"sample_frac",
"sampleBy",
"saveAsParquetFile",
"saveAsTable",
"saveDF",
"schema",
@@ -175,7 +169,6 @@ exportMethods("arrange",
"toJSON",
"transform",
"union",
"unionAll",
"unionByName",
"unique",
"unpersist",
@@ -415,10 +408,8 @@ export("as.DataFrame",
"cacheTable",
"clearCache",
"createDataFrame",
"createExternalTable",
"createTable",
"currentDatabase",
"dropTempTable",
"dropTempView",
"listColumns",
"listDatabases",
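
The exports dropped above all correspond to APIs that were deprecated in earlier releases. A minimal migration sketch in R, assuming the replacement functions named in SparkR's deprecation messages and in the code changes below (paths, table names, and the data frame df are illustrative only):

# sparkR.init()/sparkRSQL.init()/sparkRHive.init()  ->  sparkR.session()
sparkR.session()

df <- read.json("path/to/file.json")          # illustrative path
createOrReplaceTempView(df, "json_df")        # was registerTempTable(df, "json_df")
write.parquet(df, "/tmp/sparkr-out/")         # was saveAsParquetFile(df, "/tmp/sparkr-out/")
combined <- union(df, df)                     # was unionAll(df, df)
createTable("t", path = "path/to/data", source = "parquet")  # was createExternalTable(...)
dropTempView("json_df")                       # was dropTempTable("json_df")
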
52 changes: 1 addition & 51 deletions R/pkg/R/DataFrame.R
@@ -227,7 +227,7 @@ setMethod("showDF",
#' show
#'
#' If eager evaluation is enabled and the Spark object is a SparkDataFrame, evaluate the
#' SparkDataFrame and print top rows of the SparkDataFrame, otherwise, print the class
#' SparkDataFrame and print top rows of the SparkDataFrame, otherwise, print the class
#' and type information of the Spark object.
#'
#' @param object a Spark object. Can be a SparkDataFrame, Column, GroupedData, WindowSpec.
@@ -521,32 +521,6 @@ setMethod("createOrReplaceTempView",
invisible(callJMethod(x@sdf, "createOrReplaceTempView", viewName))
})

#' (Deprecated) Register Temporary Table
#'
#' Registers a SparkDataFrame as a Temporary Table in the SparkSession
#' @param x A SparkDataFrame
#' @param tableName A character vector containing the name of the table
#'
#' @seealso \link{createOrReplaceTempView}
#' @rdname registerTempTable-deprecated
#' @name registerTempTable
#' @aliases registerTempTable,SparkDataFrame,character-method
#' @examples
#'\dontrun{
#' sparkR.session()
#' path <- "path/to/file.json"
#' df <- read.json(path)
#' registerTempTable(df, "json_df")
#' new_df <- sql("SELECT * FROM json_df")
#'}
#' @note registerTempTable since 1.4.0
setMethod("registerTempTable",
signature(x = "SparkDataFrame", tableName = "character"),
function(x, tableName) {
.Deprecated("createOrReplaceTempView")
invisible(callJMethod(x@sdf, "createOrReplaceTempView", tableName))
})
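
As the .Deprecated() call above indicates, createOrReplaceTempView is the drop-in replacement; the removed method delegated to the same JVM call. A short usage sketch, mirroring the removed example (illustrative path):

sparkR.session()
df <- read.json("path/to/file.json")
createOrReplaceTempView(df, "json_df")        # replaces registerTempTable(df, "json_df")
new_df <- sql("SELECT * FROM json_df")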

#' insertInto
#'
#' Insert the contents of a SparkDataFrame into a table registered in the current SparkSession.
@@ -956,7 +930,6 @@ setMethod("write.orc",
#' path <- "path/to/file.json"
#' df <- read.json(path)
#' write.parquet(df, "/tmp/sparkr-tmp1/")
#' saveAsParquetFile(df, "/tmp/sparkr-tmp2/")
#'}
#' @note write.parquet since 1.6.0
setMethod("write.parquet",
@@ -967,17 +940,6 @@ setMethod("write.parquet",
invisible(handledCallJMethod(write, "parquet", path))
})

#' @rdname write.parquet
#' @name saveAsParquetFile
#' @aliases saveAsParquetFile,SparkDataFrame,character-method
#' @note saveAsParquetFile since 1.4.0
setMethod("saveAsParquetFile",
signature(x = "SparkDataFrame", path = "character"),
function(x, path) {
.Deprecated("write.parquet")
write.parquet(x, path)
})
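
The removed wrapper only emitted a deprecation warning and delegated to write.parquet, so existing calls can be rewritten directly (illustrative path):

write.parquet(df, "/tmp/sparkr-tmp2/")        # replaces saveAsParquetFile(df, "/tmp/sparkr-tmp2/")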

#' Save the content of SparkDataFrame in a text file at the specified path.
#'
#' Save the content of the SparkDataFrame in a text file at the specified path.
@@ -2762,18 +2724,6 @@ setMethod("union",
dataFrame(unioned)
})

#' unionAll is deprecated - use union instead
#' @rdname union
#' @name unionAll
#' @aliases unionAll,SparkDataFrame,SparkDataFrame-method
#' @note unionAll since 1.4.0
setMethod("unionAll",
signature(x = "SparkDataFrame", y = "SparkDataFrame"),
function(x, y) {
.Deprecated("union")
union(x, y)
})
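
Likewise, the removed unionAll wrapper simply delegated to union, which keeps SQL UNION ALL semantics (duplicates are not removed); use unionByName when columns should be matched by name rather than by position. Sketch, where df1 and df2 stand for any two compatible SparkDataFrames:

combined <- union(df1, df2)                   # replaces unionAll(df1, df2)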

#' Return a new SparkDataFrame containing the union of rows, matched by column names
#'
#' Return a new SparkDataFrame containing the union of rows in this SparkDataFrame
95 changes: 10 additions & 85 deletions R/pkg/R/SQLContext.R
@@ -37,37 +37,6 @@ getInternalType <- function(x) {
stop(paste("Unsupported type for SparkDataFrame:", class(x))))
}

#' Temporary function to reroute old S3 Method call to new
#' This function is specifically implemented to remove SQLContext from the parameter list.
#' It determines the target to route the call by checking the parent of this callsite (say 'func').
#' The target should be called 'func.default'.
#' We need to check the class of x to ensure it is SQLContext/HiveContext before dispatching.
#' @param newFuncSig name of the function the user should call instead in the deprecation message
#' @param x the first parameter of the original call
#' @param ... the rest of parameter to pass along
#' @return whatever the target returns
#' @noRd
dispatchFunc <- function(newFuncSig, x, ...) {
# When called with SparkR::createDataFrame, sys.call()[[1]] returns c(::, SparkR, createDataFrame)
callsite <- as.character(sys.call(sys.parent())[[1]])
funcName <- callsite[[length(callsite)]]
f <- get(paste0(funcName, ".default"))
# Strip sqlContext from list of parameters and then pass the rest along.
contextNames <- c("org.apache.spark.sql.SQLContext",
"org.apache.spark.sql.hive.HiveContext",
"org.apache.spark.sql.hive.test.TestHiveContext",
"org.apache.spark.sql.SparkSession")
if (missing(x) && length(list(...)) == 0) {
f()
} else if (class(x) == "jobj" &&
any(grepl(paste(contextNames, collapse = "|"), getClassName.jobj(x)))) {
.Deprecated(newFuncSig, old = paste0(funcName, "(sqlContext...)"))
f(...)
} else {
f(x, ...)
}
}
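
The helper removed above existed only to bridge pre-2.0 call sites that passed a SQLContext or HiveContext as the first argument, rerouting them to the session-based *.default implementations with a deprecation warning. With it gone, the SQLContext-style form is no longer accepted. A sketch of the two call styles, assuming localDF is an arbitrary local data.frame (the name is illustrative):

# old, pre-2.0 style previously rerouted (and warned about) by dispatchFunc:
# df <- createDataFrame(sqlContext, localDF)

# current style, resolved against the active SparkSession:
df <- createDataFrame(localDF)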

#' return the SparkSession
#' @noRd
getSparkSession <- function() {
@@ -198,11 +167,10 @@ getDefaultSqlSource <- function() {
#' df4 <- createDataFrame(cars, numPartitions = 2)
#' }
#' @name createDataFrame
#' @method createDataFrame default
#' @note createDataFrame since 1.4.0
# TODO(davies): support sampling and infer type from NA
createDataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0,
numPartitions = NULL) {
createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
numPartitions = NULL) {
sparkSession <- getSparkSession()

if (is.data.frame(data)) {
@@ -285,31 +253,18 @@ createDataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0,
dataFrame(sdf)
}

createDataFrame <- function(x, ...) {
dispatchFunc("createDataFrame(data, schema = NULL)", x, ...)
}

#' @rdname createDataFrame
#' @aliases createDataFrame
#' @method as.DataFrame default
#' @note as.DataFrame since 1.6.0
as.DataFrame.default <- function(data, schema = NULL, samplingRatio = 1.0, numPartitions = NULL) {
as.DataFrame <- function(data, schema = NULL, samplingRatio = 1.0, numPartitions = NULL) {
createDataFrame(data, schema, samplingRatio, numPartitions)
}

#' @param ... additional argument(s).
#' @rdname createDataFrame
#' @aliases as.DataFrame
as.DataFrame <- function(data, ...) {
dispatchFunc("as.DataFrame(data, schema = NULL)", data, ...)
}

#' toDF
#'
#' Converts an RDD to a SparkDataFrame by inferring the types.
#'
#' @param x An RDD
#'
#' @rdname SparkDataFrame
#' @noRd
#' @examples
@@ -345,9 +300,8 @@ setMethod("toDF", signature(x = "RDD"),
#' df <- read.json(path, multiLine = TRUE)
#' }
#' @name read.json
#' @method read.json default
#' @note read.json since 1.6.0
read.json.default <- function(path, ...) {
read.json <- function(path, ...) {
sparkSession <- getSparkSession()
options <- varargsToStrEnv(...)
# Allow the user to have a more flexible definition of the text file path
@@ -358,10 +312,6 @@ read.json.default <- function(path, ...) {
dataFrame(sdf)
}

read.json <- function(x, ...) {
dispatchFunc("read.json(path)", x, ...)
}

#' Create a SparkDataFrame from an ORC file.
#'
#' Loads an ORC file, returning the result as a SparkDataFrame.
@@ -388,13 +338,12 @@ read.orc <- function(path, ...) {
#' Loads a Parquet file, returning the result as a SparkDataFrame.
#'
#' @param path path of file to read. A vector of multiple paths is allowed.
#' @param ... additional external data source specific named properties.
#' @param ... additional data source specific named properties.
#' @return SparkDataFrame
#' @rdname read.parquet
#' @name read.parquet
#' @method read.parquet default
#' @note read.parquet since 1.6.0
read.parquet.default <- function(path, ...) {
read.parquet <- function(path, ...) {
sparkSession <- getSparkSession()
options <- varargsToStrEnv(...)
# Allow the user to have a more flexible definition of the Parquet file path
@@ -405,10 +354,6 @@ read.parquet.default <- function(path, ...) {
dataFrame(sdf)
}

read.parquet <- function(x, ...) {
dispatchFunc("read.parquet(...)", x, ...)
}

#' Create a SparkDataFrame from a text file.
#'
#' Loads text files and returns a SparkDataFrame whose schema starts with
@@ -428,9 +373,8 @@ read.parquet <- function(x, ...) {
#' df <- read.text(path)
#' }
#' @name read.text
#' @method read.text default
#' @note read.text since 1.6.1
read.text.default <- function(path, ...) {
read.text <- function(path, ...) {
sparkSession <- getSparkSession()
options <- varargsToStrEnv(...)
# Allow the user to have a more flexible definition of the text file path
@@ -441,10 +385,6 @@ read.text.default <- function(path, ...) {
dataFrame(sdf)
}

read.text <- function(x, ...) {
dispatchFunc("read.text(path)", x, ...)
}

#' SQL Query
#'
#' Executes a SQL query using Spark, returning the result as a SparkDataFrame.
@@ -461,18 +401,13 @@ read.text <- function(x, ...) {
#' new_df <- sql("SELECT * FROM table")
#' }
#' @name sql
#' @method sql default
#' @note sql since 1.4.0
sql.default <- function(sqlQuery) {
sql <- function(sqlQuery) {
sparkSession <- getSparkSession()
sdf <- callJMethod(sparkSession, "sql", sqlQuery)
dataFrame(sdf)
}

sql <- function(x, ...) {
dispatchFunc("sql(sqlQuery)", x, ...)
}

#' Create a SparkDataFrame from a SparkSQL table or view
#'
#' Returns the specified table or view as a SparkDataFrame. The table or view must already exist or
@@ -531,9 +466,8 @@ tableToDF <- function(tableName) {
#' df4 <- read.df(mapTypeJsonPath, "json", stringSchema, multiLine = TRUE)
#' }
#' @name read.df
#' @method read.df default
#' @note read.df since 1.4.0
read.df.default <- function(path = NULL, source = NULL, schema = NULL, na.strings = "NA", ...) {
read.df <- function(path = NULL, source = NULL, schema = NULL, na.strings = "NA", ...) {
if (!is.null(path) && !is.character(path)) {
stop("path should be character, NULL or omitted.")
}
@@ -568,22 +502,13 @@ read.df.default <- function(path = NULL, source = NULL, schema = NULL, na.string
dataFrame(sdf)
}

read.df <- function(x = NULL, ...) {
dispatchFunc("read.df(path = NULL, source = NULL, schema = NULL, ...)", x, ...)
}

#' @rdname read.df
#' @name loadDF
#' @method loadDF default
#' @note loadDF since 1.6.0
loadDF.default <- function(path = NULL, source = NULL, schema = NULL, ...) {
loadDF <- function(path = NULL, source = NULL, schema = NULL, ...) {
read.df(path, source, schema, ...)
}

loadDF <- function(x = NULL, ...) {
dispatchFunc("loadDF(path = NULL, source = NULL, schema = NULL, ...)", x, ...)
}

#' Create a SparkDataFrame representing the database table accessible via JDBC URL
#'
#' Additional JDBC database connection properties can be set (...)