Commit f0f75c2

Merge branch 'master' of https://github.com/apache/spark into text-plan-size

# Conflicts:
#	sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

2 parents: a4be985 + a24e1a1

File tree: 299 files changed (+6968, -5407 lines)


R/pkg/NAMESPACE: 2 additions & 0 deletions

@@ -351,6 +351,8 @@ exportMethods("%<=>%",
               "row_number",
               "rpad",
               "rtrim",
+              "schema_of_csv",
+              "schema_of_json",
               "second",
               "sha1",
               "sha2",

R/pkg/R/DataFrame.R: 16 additions & 6 deletions

@@ -766,7 +766,6 @@ setMethod("repartition",
 #' \item{2.} {Return a new SparkDataFrame range partitioned by the given column(s),
 #'            using \code{spark.sql.shuffle.partitions} as number of partitions.}
 #'}
-#'
 #' At least one partition-by expression must be specified.
 #' When no explicit sort order is specified, "ascending nulls first" is assumed.
 #'

@@ -828,7 +827,6 @@ setMethod("repartitionByRange",
 #' toJSON
 #'
 #' Converts a SparkDataFrame into a SparkDataFrame of JSON string.
-#'
 #' Each row is turned into a JSON document with columns as different fields.
 #' The returned SparkDataFrame has a single character column with the name \code{value}
 #'

@@ -2732,13 +2730,25 @@ setMethod("union",
             dataFrame(unioned)
           })

-#' Return a new SparkDataFrame containing the union of rows
+#' Return a new SparkDataFrame containing the union of rows.
 #'
-#' This is an alias for `union`.
+#' This is an alias for \code{union}.
 #'
-#' @rdname union
-#' @name unionAll
+#' @param x a SparkDataFrame.
+#' @param y a SparkDataFrame.
+#' @return A SparkDataFrame containing the result of the unionAll operation.
+#' @family SparkDataFrame functions
 #' @aliases unionAll,SparkDataFrame,SparkDataFrame-method
+#' @rdname unionAll
+#' @name unionAll
+#' @seealso \link{union}
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' df1 <- read.json(path)
+#' df2 <- read.json(path2)
+#' unionAllDF <- unionAll(df1, df2)
+#' }
 #' @note unionAll since 1.4.0
 setMethod("unionAll",
           signature(x = "SparkDataFrame", y = "SparkDataFrame"),
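A quick illustration of the documented behaviour: unionAll is simply an alias for union, so both calls below return the same result. This is a minimal sketch; the toy data frames are illustrative and not part of the commit.

  sparkR.session()
  df1 <- createDataFrame(data.frame(x = 1:2))
  df2 <- createDataFrame(data.frame(x = 3:4))
  # unionAll delegates to union, so both produce the same 4-row SparkDataFrame.
  count(union(df1, df2))     # 4
  count(unionAll(df1, df2))  # 4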

R/pkg/R/functions.R: 70 additions & 7 deletions

@@ -205,11 +205,18 @@ NULL
 #' also supported for the schema.
 #' \item \code{from_csv}: a DDL-formatted string
 #' }
-#' @param ... additional argument(s). In \code{to_json}, \code{to_csv} and \code{from_json},
-#'            this contains additional named properties to control how it is converted, accepts
-#'            the same options as the JSON/CSV data source. Additionally \code{to_json} supports
-#'            the "pretty" option which enables pretty JSON generation. In \code{arrays_zip},
-#'            this contains additional Columns of arrays to be merged.
+#' @param ... additional argument(s).
+#' \itemize{
+#' \item \code{to_json}, \code{from_json} and \code{schema_of_json}: this contains
+#'       additional named properties to control how it is converted and accepts the
+#'       same options as the JSON data source.
+#' \item \code{to_json}: it supports the "pretty" option which enables pretty
+#'       JSON generation.
+#' \item \code{to_csv}, \code{from_csv} and \code{schema_of_csv}: this contains
+#'       additional named properties to control how it is converted and accepts the
+#'       same options as the CSV data source.
+#' \item \code{arrays_zip}, this contains additional Columns of arrays to be merged.
+#' }
 #' @name column_collection_functions
 #' @rdname column_collection_functions
 #' @family collection functions

@@ -1771,12 +1778,16 @@ setMethod("to_date",
 #' df2 <- mutate(df2, people_json = to_json(df2$people))
 #'
 #' # Converts a map into a JSON object
-#' df2 <- sql("SELECT map('name', 'Bob')) as people")
+#' df2 <- sql("SELECT map('name', 'Bob') as people")
 #' df2 <- mutate(df2, people_json = to_json(df2$people))
 #'
 #' # Converts an array of maps into a JSON array
 #' df2 <- sql("SELECT array(map('name', 'Bob'), map('name', 'Alice')) as people")
-#' df2 <- mutate(df2, people_json = to_json(df2$people))}
+#' df2 <- mutate(df2, people_json = to_json(df2$people))
+#'
+#' # Converts a map into a pretty JSON object
+#' df2 <- sql("SELECT map('name', 'Bob') as people")
+#' df2 <- mutate(df2, people_json = to_json(df2$people, pretty = TRUE))}
 #' @note to_json since 2.2.0
 setMethod("to_json", signature(x = "Column"),
           function(x, ...) {

@@ -2285,6 +2296,32 @@ setMethod("from_json", signature(x = "Column", schema = "characterOrstructType")
             column(jc)
           })

+#' @details
+#' \code{schema_of_json}: Parses a JSON string and infers its schema in DDL format.
+#'
+#' @rdname column_collection_functions
+#' @aliases schema_of_json schema_of_json,characterOrColumn-method
+#' @examples
+#'
+#' \dontrun{
+#' json <- "{\"name\":\"Bob\"}"
+#' df <- sql("SELECT * FROM range(1)")
+#' head(select(df, schema_of_json(json)))}
+#' @note schema_of_json since 3.0.0
+setMethod("schema_of_json", signature(x = "characterOrColumn"),
+          function(x, ...) {
+            if (class(x) == "character") {
+              col <- callJStatic("org.apache.spark.sql.functions", "lit", x)
+            } else {
+              col <- x@jc
+            }
+            options <- varargsToStrEnv(...)
+            jc <- callJStatic("org.apache.spark.sql.functions",
+                              "schema_of_json",
+                              col, options)
+            column(jc)
+          })
+
 #' @details
 #' \code{from_csv}: Parses a column containing a CSV string into a Column of \code{structType}
 #' with the specified \code{schema}.

@@ -2315,6 +2352,32 @@ setMethod("from_csv", signature(x = "Column", schema = "characterOrColumn"),
             column(jc)
           })

+#' @details
+#' \code{schema_of_csv}: Parses a CSV string and infers its schema in DDL format.
+#'
+#' @rdname column_collection_functions
+#' @aliases schema_of_csv schema_of_csv,characterOrColumn-method
+#' @examples
+#'
+#' \dontrun{
+#' csv <- "Amsterdam,2018"
+#' df <- sql("SELECT * FROM range(1)")
+#' head(select(df, schema_of_csv(csv)))}
+#' @note schema_of_csv since 3.0.0
+setMethod("schema_of_csv", signature(x = "characterOrColumn"),
+          function(x, ...) {
+            if (class(x) == "character") {
+              col <- callJStatic("org.apache.spark.sql.functions", "lit", x)
+            } else {
+              col <- x@jc
+            }
+            options <- varargsToStrEnv(...)
+            jc <- callJStatic("org.apache.spark.sql.functions",
+                              "schema_of_csv",
+                              col, options)
+            column(jc)
+          })
+
 #' @details
 #' \code{from_utc_timestamp}: This is a common function for databases supporting TIMESTAMP WITHOUT
 #' TIMEZONE. This function takes a timestamp which is timezone-agnostic, and interprets it as a
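Taken together, the new schema_of_json and schema_of_csv helpers let a SparkR user infer a DDL schema string from a sample record and feed it straight into from_json or from_csv. A minimal sketch, assuming a one-row data frame with a JSON string column (the sample values are illustrative, not from the commit):

  sparkR.session()
  df <- createDataFrame(data.frame(value = '{"name":"Alice"}', stringsAsFactors = FALSE))

  # Infer the schema from one sample record, then parse the column with it.
  ddl <- collect(select(df, schema_of_json('{"name":"Bob"}')))[[1]]   # "struct<name:string>"
  head(select(df, from_json(df$value, ddl)))

  # The same pattern works for CSV strings.
  collect(select(df, schema_of_csv("Amsterdam,2018")))[[1]]           # "struct<_c0:string,_c1:int>"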

R/pkg/R/generics.R: 9 additions & 1 deletion

@@ -631,7 +631,7 @@ setGeneric("toRDD", function(x) { standardGeneric("toRDD") })
 #' @rdname union
 setGeneric("union", function(x, y) { standardGeneric("union") })

-#' @rdname union
+#' @rdname unionAll
 setGeneric("unionAll", function(x, y) { standardGeneric("unionAll") })

 #' @rdname unionByName

@@ -1206,6 +1206,14 @@ setGeneric("rpad", function(x, len, pad) { standardGeneric("rpad") })
 #' @name NULL
 setGeneric("rtrim", function(x, trimString) { standardGeneric("rtrim") })

+#' @rdname column_collection_functions
+#' @name NULL
+setGeneric("schema_of_csv", function(x, ...) { standardGeneric("schema_of_csv") })
+
+#' @rdname column_collection_functions
+#' @name NULL
+setGeneric("schema_of_json", function(x, ...) { standardGeneric("schema_of_json") })
+
 #' @rdname column_aggregate_functions
 #' @name NULL
 setGeneric("sd", function(x, na.rm = FALSE) { standardGeneric("sd") })

R/pkg/tests/fulltests/test_sparkSQL.R: 14 additions & 2 deletions

@@ -1620,14 +1620,20 @@ test_that("column functions", {
   expect_equal(collect(select(df, bround(df$x, 0)))[[1]][1], 2)
   expect_equal(collect(select(df, bround(df$x, 0)))[[1]][2], 4)

-  # Test from_csv()
+  # Test from_csv(), schema_of_csv()
   df <- as.DataFrame(list(list("col" = "1")))
   c <- collect(select(df, alias(from_csv(df$col, "a INT"), "csv")))
   expect_equal(c[[1]][[1]]$a, 1)
   c <- collect(select(df, alias(from_csv(df$col, lit("a INT")), "csv")))
   expect_equal(c[[1]][[1]]$a, 1)

-  # Test to_json(), from_json()
+  df <- as.DataFrame(list(list("col" = "1")))
+  c <- collect(select(df, schema_of_csv("Amsterdam,2018")))
+  expect_equal(c[[1]], "struct<_c0:string,_c1:int>")
+  c <- collect(select(df, schema_of_csv(lit("Amsterdam,2018"))))
+  expect_equal(c[[1]], "struct<_c0:string,_c1:int>")
+
+  # Test to_json(), from_json(), schema_of_json()
   df <- sql("SELECT array(named_struct('name', 'Bob'), named_struct('name', 'Alice')) as people")
   j <- collect(select(df, alias(to_json(df$people), "json")))
   expect_equal(j[order(j$json), ][1], "[{\"name\":\"Bob\"},{\"name\":\"Alice\"}]")

@@ -1654,6 +1660,12 @@ test_that("column functions", {
     expect_true(any(apply(s, 1, function(x) { x[[1]]$age == 16 })))
   }

+  df <- as.DataFrame(list(list("col" = "1")))
+  c <- collect(select(df, schema_of_json('{"name":"Bob"}')))
+  expect_equal(c[[1]], "struct<name:string>")
+  c <- collect(select(df, schema_of_json(lit('{"name":"Bob"}'))))
+  expect_equal(c[[1]], "struct<name:string>")
+
   # Test to_json() supports arrays of primitive types and arrays
   df <- sql("SELECT array(19, 42, 70) as age")
   j <- collect(select(df, alias(to_json(df$age), "json")))

bin/docker-image-tool.sh: 13 additions & 3 deletions

@@ -146,6 +146,12 @@ function build {
   fi

   local BUILD_ARGS=(${BUILD_PARAMS})
+
+  # If a custom SPARK_UID was set add it to build arguments
+  if [ -n "$SPARK_UID" ]; then
+    BUILD_ARGS+=(--build-arg spark_uid=$SPARK_UID)
+  fi
+
   local BINDING_BUILD_ARGS=(
     ${BUILD_PARAMS}
     --build-arg

@@ -207,8 +213,10 @@ Options:
   -t tag      Tag to apply to the built image, or to identify the image to be pushed.
   -m          Use minikube's Docker daemon.
   -n          Build docker image with --no-cache
-  -b arg      Build arg to build or push the image. For multiple build args, this option needs to
-              be used separately for each build arg.
+  -u uid      UID to use in the USER directive to set the user the main Spark process runs as inside the
+              resulting container
+  -b arg      Build arg to build or push the image. For multiple build args, this option needs to
+              be used separately for each build arg.

 Using minikube when building images will do so directly into minikube's Docker daemon.
 There is no need to push the images into minikube in that case, they'll be automatically

@@ -243,7 +251,8 @@ PYDOCKERFILE=
 RDOCKERFILE=
 NOCACHEARG=
 BUILD_PARAMS=
-while getopts f:p:R:mr:t:nb: option
+SPARK_UID=
+while getopts f:p:R:mr:t:nb:u: option
 do
   case "${option}"
   in

@@ -263,6 +272,7 @@ do
       fi
       eval $(minikube docker-env)
       ;;
+    u) SPARK_UID=${OPTARG};;
   esac
 done

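With the new -u option, the images can be built so that the main Spark process runs as a caller-chosen UID. A hedged usage sketch (the repository, tag, and UID below are placeholders, not values from the commit):

  # Build images whose USER directive is set to UID 185, then push them.
  ./bin/docker-image-tool.sh -r docker.io/myrepo -t my-tag -u 185 build
  ./bin/docker-image-tool.sh -r docker.io/myrepo -t my-tag push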

common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java: 8 additions & 4 deletions

@@ -165,10 +165,14 @@ public void writeMinusZeroIsReplacedWithZero() {
     byte[] floatBytes = new byte[Float.BYTES];
     Platform.putDouble(doubleBytes, Platform.BYTE_ARRAY_OFFSET, -0.0d);
     Platform.putFloat(floatBytes, Platform.BYTE_ARRAY_OFFSET, -0.0f);
-    double doubleFromPlatform = Platform.getDouble(doubleBytes, Platform.BYTE_ARRAY_OFFSET);
-    float floatFromPlatform = Platform.getFloat(floatBytes, Platform.BYTE_ARRAY_OFFSET);

-    Assert.assertEquals(Double.doubleToLongBits(0.0d), Double.doubleToLongBits(doubleFromPlatform));
-    Assert.assertEquals(Float.floatToIntBits(0.0f), Float.floatToIntBits(floatFromPlatform));
+    byte[] doubleBytes2 = new byte[Double.BYTES];
+    byte[] floatBytes2 = new byte[Float.BYTES];
+    Platform.putDouble(doubleBytes2, Platform.BYTE_ARRAY_OFFSET, 0.0d);
+    Platform.putFloat(floatBytes2, Platform.BYTE_ARRAY_OFFSET, 0.0f);
+
+    // Make sure the bytes we write from 0.0 and -0.0 are same.
+    Assert.assertArrayEquals(doubleBytes, doubleBytes2);
+    Assert.assertArrayEquals(floatBytes, floatBytes2);
   }
 }

core/pom.xml: 13 additions & 0 deletions

@@ -408,6 +408,19 @@
       <scope>provided</scope>
     </dependency>

+    <!--
+      The following kafka dependency used to obtain delegation token.
+      In order to prevent spark-core from depending on kafka, these deps have been placed in the
+      "provided" scope, rather than the "compile" scope, and NoClassDefFoundError exceptions are
+      handled when the user explicitly use neither spark-streaming-kafka nor spark-sql-kafka modules.
+    -->
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>${kafka.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>

core/src/main/resources/org/apache/spark/ui/static/stagepage.js: 8 additions & 2 deletions

@@ -221,7 +221,10 @@ function createDataTableForTaskSummaryMetricsTable(taskSummaryMetricsTable) {
     "searching": false,
     "order": [[0, "asc"]],
     "bSort": false,
-    "bAutoWidth": false
+    "bAutoWidth": false,
+    "oLanguage": {
+      "sEmptyTable": "No tasks have reported metrics yet"
+    }
   };
   taskSummaryMetricsDataTable = $(taskMetricsTable).DataTable(taskConf);
 }

@@ -426,7 +429,10 @@ $(document).ready(function () {
       }
     ],
     "order": [[0, "asc"]],
-    "bAutoWidth": false
+    "bAutoWidth": false,
+    "oLanguage": {
+      "sEmptyTable": "No data to show yet"
+    }
   }
   var executorSummaryTableSelector =
     $("#summary-executor-table").DataTable(executorSummaryConf);

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala: 25 additions & 4 deletions

@@ -660,6 +660,7 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial
   with Logging {

   private var encryptionServer: PythonServer[Unit] = null
+  private var decryptionServer: PythonServer[Unit] = null

   /**
    * Read data from disks, then copy it to `out`

@@ -708,16 +709,36 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial
       override def handleConnection(sock: Socket): Unit = {
         val env = SparkEnv.get
         val in = sock.getInputStream()
-        val dir = new File(Utils.getLocalDir(env.conf))
-        val file = File.createTempFile("broadcast", "", dir)
-        path = file.getAbsolutePath
-        val out = env.serializerManager.wrapForEncryption(new FileOutputStream(path))
+        val abspath = new File(path).getAbsolutePath
+        val out = env.serializerManager.wrapForEncryption(new FileOutputStream(abspath))
         DechunkedInputStream.dechunkAndCopyToOutput(in, out)
       }
     }
     Array(encryptionServer.port, encryptionServer.secret)
   }

+  def setupDecryptionServer(): Array[Any] = {
+    decryptionServer = new PythonServer[Unit]("broadcast-decrypt-server-for-driver") {
+      override def handleConnection(sock: Socket): Unit = {
+        val out = new DataOutputStream(new BufferedOutputStream(sock.getOutputStream()))
+        Utils.tryWithSafeFinally {
+          val in = SparkEnv.get.serializerManager.wrapForEncryption(new FileInputStream(path))
+          Utils.tryWithSafeFinally {
+            Utils.copyStream(in, out, false)
+          } {
+            in.close()
+          }
+          out.flush()
+        } {
+          JavaUtils.closeQuietly(out)
+        }
+      }
+    }
+    Array(decryptionServer.port, decryptionServer.secret)
+  }
+
+  def waitTillBroadcastDataSent(): Unit = decryptionServer.getResult()
+
   def waitTillDataReceived(): Unit = encryptionServer.getResult()
 }
 // scalastyle:on no.finalize
