Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions R/pkg/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ export("as.DataFrame",
"read.parquet",
"read.text",
"sql",
"str",
"table",
"tableNames",
"tables",
Expand Down
73 changes: 73 additions & 0 deletions R/pkg/R/DataFrame.R
Original file line number Diff line number Diff line change
Expand Up @@ -2300,3 +2300,76 @@ setMethod("with",
newEnv <- assignNewEnv(data)
eval(substitute(expr), envir = newEnv, enclos = newEnv)
})

#' Display the structure of a DataFrame, including column names, column types, as well as
#' a small sample of rows.
#'
#' The number of observations is not displayed, because counting the rows of a DataFrame
#' is a very expensive operation. At most 100 columns are shown, and each output line is
#' truncated to 120 characters.
#' @name str
#' @title Compactly display the structure of a dataset
#' @rdname str
#' @family DataFrame functions
#' @param object a DataFrame
#' @examples \dontrun{
#' # Create a DataFrame from the Iris dataset
#' irisDF <- createDataFrame(sqlContext, iris)
#'
#' # Show the structure of the DataFrame
#' str(irisDF)
#' }
setMethod("str",
          signature(object = "DataFrame"),
          function(object) {

            # TODO: These could be made global parameters, though in R it's not the case
            MAX_CHAR_PER_ROW <- 120
            MAX_COLS <- 100

            # Get the column names and types of the DataFrame
            names <- names(object)
            types <- coltypes(object)

            # Get the first elements of the dataset. Limit number of columns accordingly
            localDF <- if (ncol(object) > MAX_COLS) {
              head(object[, c(1:MAX_COLS)])
            } else {
              head(object)
            }

            # The number of observations will not be displayed as computing the
            # number of rows is a very expensive operation
            cat(paste0("'", class(object), "': ", length(names), " variables:\n"))

            if (nrow(localDF) > 0) {
              # Width of the longest column name; every name is padded to this width so
              # that the ": " separators line up. Computed once, outside the loop.
              nameWidth <- max(nchar(names))

              # seq_len() yields an empty sequence for zero columns, unlike 1:0
              for (i in seq_len(ncol(localDF))) {
                # Get the first elements for each column; strings are shown quoted
                firstElements <- if (types[i] == "character") {
                  paste(paste0("\"", localDF[, i], "\""), collapse = " ")
                } else {
                  paste(localDF[, i], collapse = " ")
                }

                # Add the corresponding number of spaces for alignment
                spaces <- paste(rep(" ", nameWidth - nchar(names[i])), collapse = "")

                # Get the short type. For 'character', it would be 'chr';
                # for 'numeric', it's 'num', etc. Types without a short name fall back
                # to the first three characters of the type name.
                dataType <- SHORT_TYPES[[types[i]]]
                if (is.null(dataType)) {
                  dataType <- substring(types[i], 1, 3)
                }

                # Concatenate the colnames, coltypes, and first
                # elements of each column
                line <- paste0(" $ ", names[i], spaces, ": ",
                               dataType, " ", firstElements)

                # Chop off extra characters if this is too long
                cat(substr(line, 1, MAX_CHAR_PER_ROW))
                cat("\n")
              }

              if (ncol(localDF) < ncol(object)) {
                # Terminate the message with a newline so that the console prompt (or any
                # following output) does not end up on the same line
                cat(paste0("\nDisplaying first ", ncol(localDF), " columns only.\n"))
              }
            }
          })
36 changes: 18 additions & 18 deletions R/pkg/R/generics.R
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,6 @@ setGeneric("subtractByKey",
setGeneric("value", function(bcast) { standardGeneric("value") })



#################### DataFrame Methods ########################

#' @rdname agg
Expand All @@ -389,6 +388,14 @@ setGeneric("agg", function (x, ...) { standardGeneric("agg") })
#' @export
setGeneric("arrange", function(x, col, ...) { standardGeneric("arrange") })

#' @rdname as.data.frame
#' @export
setGeneric("as.data.frame")

#' @rdname attach
#' @export
setGeneric("attach")

#' @rdname columns
#' @export
setGeneric("colnames", function(x, do.NULL = TRUE, prefix = "col") { standardGeneric("colnames") })
Expand Down Expand Up @@ -525,13 +532,12 @@ setGeneric("saveAsTable", function(df, tableName, source, mode, ...) {
standardGeneric("saveAsTable")
})

#' @rdname withColumn
#' @export
setGeneric("transform", function(`_data`, ...) {standardGeneric("transform") })
setGeneric("str")

#' @rdname write.df
#' @rdname mutate
#' @export
setGeneric("write.df", function(df, path, ...) { standardGeneric("write.df") })
setGeneric("transform", function(`_data`, ...) {standardGeneric("transform") })

#' @rdname write.df
#' @export
Expand Down Expand Up @@ -593,6 +599,10 @@ setGeneric("unionAll", function(x, y) { standardGeneric("unionAll") })
#' @export
setGeneric("where", function(x, condition) { standardGeneric("where") })

#' @rdname with
#' @export
setGeneric("with")

#' @rdname withColumn
#' @export
setGeneric("withColumn", function(x, colName, col) { standardGeneric("withColumn") })
Expand All @@ -602,6 +612,9 @@ setGeneric("withColumn", function(x, colName, col) { standardGeneric("withColumn
setGeneric("withColumnRenamed",
function(x, existingCol, newCol) { standardGeneric("withColumnRenamed") })

#' @rdname write.df
#' @export
setGeneric("write.df", function(df, path, ...) { standardGeneric("write.df") })

###################### Column Methods ##########################

Expand Down Expand Up @@ -1105,7 +1118,6 @@ setGeneric("weekofyear", function(x) { standardGeneric("weekofyear") })
#' @export
setGeneric("year", function(x) { standardGeneric("year") })


#' @rdname glm
#' @export
setGeneric("glm")
Expand All @@ -1117,15 +1129,3 @@ setGeneric("predict", function(object, ...) { standardGeneric("predict") })
#' @rdname rbind
#' @export
setGeneric("rbind", signature = "...")

#' @rdname as.data.frame
#' @export
setGeneric("as.data.frame")

#' @rdname attach
#' @export
setGeneric("attach")

#' @rdname with
#' @export
setGeneric("with")
21 changes: 17 additions & 4 deletions R/pkg/R/types.R
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,23 @@ COMPLEX_TYPES <- list(
# The full list of data types.
DATA_TYPES <- as.environment(c(as.list(PRIMITIVE_TYPES), COMPLEX_TYPES))

# Abbreviated type names keyed by R type name. A lookup with `[[` on this environment
# returns NULL for a type that has no entry.
SHORT_TYPES <- list2env(
  list(
    character = "chr",
    logical = "logi",
    POSIXct = "POSIXct",
    integer = "int",
    numeric = "num",
    raw = "raw",
    Date = "Date",
    map = "map",
    array = "array",
    struct = "struct"
  ),
  parent = emptyenv())

# An environment for mapping R to Scala, names are R types and values are Scala types.
rToSQLTypes <- as.environment(list(
"integer" = "integer", # in R, integer is 32bit
"numeric" = "double", # in R, numeric == double which is 64bit
"double" = "double",
"integer" = "integer", # in R, integer is 32bit
"numeric" = "double", # in R, numeric == double which is 64bit
"double" = "double",
"character" = "string",
"logical" = "boolean"))
"logical" = "boolean"))
31 changes: 31 additions & 0 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
Original file line number Diff line number Diff line change
Expand Up @@ -1795,6 +1795,37 @@ test_that("Method coltypes() to get and set R's data types of a DataFrame", {
"Only atomic type is supported for column types")
})

test_that("Method str()", {
  # Structure of Iris, with Spark-friendly column names and an extra logical column
  iris2 <- iris
  colnames(iris2) <- c("Sepal_Length", "Sepal_Width", "Petal_Length", "Petal_Width", "Species")
  iris2$col <- TRUE
  irisDF2 <- createDataFrame(sqlContext, iris2)

  # str() right-pads every column name to the width of the longest one (12 characters
  # here) before the ": " separator. Build the expected lines with explicit padding so
  # the alignment does not depend on literal runs of spaces.
  pad <- function(name) format(name, width = 12)

  out <- capture.output(str(irisDF2))
  expect_equal(length(out), 7)
  expect_equal(out[1], "'DataFrame': 6 variables:")
  expect_equal(out[2], paste0(" $ ", pad("Sepal_Length"), ": num 5.1 4.9 4.7 4.6 5 5.4"))
  expect_equal(out[3], paste0(" $ ", pad("Sepal_Width"), ": num 3.5 3 3.2 3.1 3.6 3.9"))
  expect_equal(out[4], paste0(" $ ", pad("Petal_Length"), ": num 1.4 1.4 1.3 1.5 1.4 1.7"))
  expect_equal(out[5], paste0(" $ ", pad("Petal_Width"), ": num 0.2 0.2 0.2 0.2 0.2 0.4"))
  expect_equal(out[6], paste0(" $ ", pad("Species"), ": chr ",
                              paste(rep("\"setosa\"", 6), collapse = " ")))
  expect_equal(out[7], paste0(" $ ", pad("col"), ": logi TRUE TRUE TRUE TRUE TRUE TRUE"))

  # A random dataset with many columns. This test is to check that str limits
  # the number of columns. Therefore, it will suffice to check the number of
  # returned lines: 1 header + 100 columns + 1 blank line + 1 truncation message
  x <- runif(200, 1, 10)
  df <- data.frame(t(as.matrix(data.frame(x,x,x,x,x,x,x,x,x))))
  DF <- createDataFrame(sqlContext, df)
  out <- capture.output(str(DF))
  expect_equal(length(out), 103)

  # Test utils:::str: a local data.frame must still dispatch to the base implementation
  expect_equal(capture.output(utils:::str(iris)), capture.output(str(iris)))
})

unlink(parquetPath)
unlink(jsonPath)
unlink(jsonPathNa)
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,10 @@ public UnsafeInMemorySorter(
* Free the memory used by pointer array.
*/
public void free() {
consumer.freeArray(array);
array = null;
if (consumer != null) {
consumer.freeArray(array);
array = null;
}
}

public void reset() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,13 @@ private[v1] class ExecutorListResource(ui: SparkUI) {
@GET
def executorList(): Seq[ExecutorSummary] = {
val listener = ui.executorsListener
val storageStatusList = listener.storageStatusList
(0 until storageStatusList.size).map { statusId =>
ExecutorsPage.getExecInfo(listener, statusId)
listener.synchronized {
// The following code must run while holding the `listener` lock to make sure no executors
// are removed before we query their status. See SPARK-12784.
val storageStatusList = listener.storageStatusList
(0 until storageStatusList.size).map { statusId =>
ExecutorsPage.getExecInfo(listener, statusId)
}
}
}
}
16 changes: 16 additions & 0 deletions core/src/main/scala/org/apache/spark/ui/UIUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.ui

import java.net.URLDecoder
import java.text.SimpleDateFormat
import java.util.{Date, Locale}

Expand Down Expand Up @@ -451,4 +452,19 @@ private[spark] object UIUtils extends Logging {
<span class="description-input">{desc}</span>
}
}

/**
 * Decode a URL parameter that may have been URL-encoded more than once.
 *
 * Due to YARN-2844, YARN's WebAppProxyServlet cannot handle URLs which contain encoded
 * characters, so a parameter can arrive here still encoded one or more times. The value is
 * therefore decoded repeatedly until decoding no longer changes it.
 *
 * If the current value cannot be decoded at all (for example, the real value contains a
 * literal '%' that is not followed by two hex digits), decoding stops and the value
 * obtained so far is returned instead of propagating the decoder's exception.
 *
 * @param urlParam the raw parameter value as received from the request
 * @return the fully decoded parameter value
 */
def decodeURLParameter(urlParam: String): String = {
  @scala.annotation.tailrec
  def decodeFully(current: String): String = {
    val decoded =
      try {
        URLDecoder.decode(current, "UTF-8")
      } catch {
        // A malformed escape sequence means `current` is not (or no longer) URL-encoded;
        // treat it as the fixed point.
        case _: IllegalArgumentException => current
      }
    if (decoded == current) current else decodeFully(decoded)
  }

  decodeFully(urlParam)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.spark.ui.exec

import java.net.URLDecoder
import javax.servlet.http.HttpServletRequest

import scala.util.Try
Expand All @@ -30,18 +29,8 @@ private[ui] class ExecutorThreadDumpPage(parent: ExecutorsTab) extends WebUIPage
private val sc = parent.sc

def render(request: HttpServletRequest): Seq[Node] = {
val executorId = Option(request.getParameter("executorId")).map {
executorId =>
// Due to YARN-2844, "<driver>" in the url will be encoded to "%25253Cdriver%25253E" when
// running in yarn-cluster mode. `request.getParameter("executorId")` will return
// "%253Cdriver%253E". Therefore we need to decode it until we get the real id.
var id = executorId
var decodedId = URLDecoder.decode(id, "UTF-8")
while (id != decodedId) {
id = decodedId
decodedId = URLDecoder.decode(id, "UTF-8")
}
id
val executorId = Option(request.getParameter("executorId")).map { executorId =>
UIUtils.decodeURLParameter(executorId)
}.getOrElse {
throw new IllegalArgumentException(s"Missing executorId parameter")
}
Expand Down
13 changes: 10 additions & 3 deletions core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,19 @@ private[ui] class ExecutorsPage(
private val listener = parent.listener

def render(request: HttpServletRequest): Seq[Node] = {
val storageStatusList = listener.storageStatusList
val (storageStatusList, execInfo) = listener.synchronized {
// The following code must run while holding the `listener` lock to make sure no executors
// are removed before we query their status. See SPARK-12784.
val _storageStatusList = listener.storageStatusList
val _execInfo = {
for (statusId <- 0 until _storageStatusList.size)
yield ExecutorsPage.getExecInfo(listener, statusId)
}
(_storageStatusList, _execInfo)
}
val maxMem = storageStatusList.map(_.maxMem).sum
val memUsed = storageStatusList.map(_.memUsed).sum
val diskUsed = storageStatusList.map(_.diskUsed).sum
val execInfo = for (statusId <- 0 until storageStatusList.size) yield
ExecutorsPage.getExecInfo(listener, statusId)
val execInfoSorted = execInfo.sortBy(_.id)
val logsExist = execInfo.filter(_.executorLogs.nonEmpty).nonEmpty

Expand Down
11 changes: 8 additions & 3 deletions core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,11 @@ private[ui] class PoolPage(parent: StagesTab) extends WebUIPage("pool") {

def render(request: HttpServletRequest): Seq[Node] = {
listener.synchronized {
val poolName = request.getParameter("poolname")
require(poolName != null && poolName.nonEmpty, "Missing poolname parameter")
val poolName = Option(request.getParameter("poolname")).map { poolname =>
UIUtils.decodeURLParameter(poolname)
}.getOrElse {
throw new IllegalArgumentException(s"Missing poolname parameter")
}

val poolToActiveStages = listener.poolToActiveStages
val activeStages = poolToActiveStages.get(poolName) match {
Expand All @@ -44,7 +47,9 @@ private[ui] class PoolPage(parent: StagesTab) extends WebUIPage("pool") {
killEnabled = parent.killEnabled)

// For now, pool information is only accessible in live UIs
val pools = sc.map(_.getPoolForName(poolName).get).toSeq
val pools = sc.map(_.getPoolForName(poolName).getOrElse {
throw new IllegalArgumentException(s"Unknown poolname: $poolName")
}).toSeq
val poolTable = new PoolTable(pools, parent)

val content =
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.ui.jobs

import java.net.URLEncoder

import scala.collection.mutable.HashMap
import scala.xml.Node

Expand Down Expand Up @@ -59,7 +61,7 @@ private[ui] class PoolTable(pools: Seq[Schedulable], parent: StagesTab) {
case None => 0
}
val href = "%s/stages/pool?poolname=%s"
.format(UIUtils.prependBaseUri(parent.basePath), p.name)
.format(UIUtils.prependBaseUri(parent.basePath), URLEncoder.encode(p.name, "UTF-8"))
<tr>
<td>
<a href={href}>{p.name}</a>
Expand Down
Loading