
Commit e0cebf4

Merge remote-tracking branch 'origin/master' into json-encoding-line-sep
2 parents: a7be182 + 3fd297a

216 files changed (+8,348 / -1,730 lines)


R/pkg/NAMESPACE

Lines changed: 4 additions & 0 deletions
@@ -201,6 +201,9 @@ exportMethods("%<=>%",
               "approxCountDistinct",
               "approxQuantile",
               "array_contains",
+              "array_max",
+              "array_min",
+              "array_position",
               "asc",
               "ascii",
               "asin",
@@ -245,6 +248,7 @@ exportMethods("%<=>%",
               "decode",
               "dense_rank",
               "desc",
+              "element_at",
               "encode",
               "endsWith",
               "exp",

R/pkg/R/functions.R

Lines changed: 67 additions & 2 deletions
@@ -189,6 +189,11 @@ NULL
 #'          the map or array of maps.
 #'          \item \code{from_json}: it is the column containing the JSON string.
 #'          }
+#' @param value A value to compute on.
+#'          \itemize{
+#'          \item \code{array_contains}: a value to be checked if contained in the column.
+#'          \item \code{array_position}: a value to locate in the given array.
+#'          }
 #' @param ... additional argument(s). In \code{to_json} and \code{from_json}, this contains
 #'            additional named properties to control how it is converted, accepts the same
 #'            options as the JSON data source.
@@ -201,14 +206,17 @@ NULL
 #' df <- createDataFrame(cbind(model = rownames(mtcars), mtcars))
 #' tmp <- mutate(df, v1 = create_array(df$mpg, df$cyl, df$hp))
 #' head(select(tmp, array_contains(tmp$v1, 21), size(tmp$v1)))
+#' head(select(tmp, array_max(tmp$v1), array_min(tmp$v1)))
+#' head(select(tmp, array_position(tmp$v1, 21)))
 #' tmp2 <- mutate(tmp, v2 = explode(tmp$v1))
 #' head(tmp2)
 #' head(select(tmp, posexplode(tmp$v1)))
 #' head(select(tmp, sort_array(tmp$v1)))
 #' head(select(tmp, sort_array(tmp$v1, asc = FALSE)))
 #' tmp3 <- mutate(df, v3 = create_map(df$model, df$cyl))
 #' head(select(tmp3, map_keys(tmp3$v3)))
-#' head(select(tmp3, map_values(tmp3$v3)))}
+#' head(select(tmp3, map_values(tmp3$v3)))
+#' head(select(tmp3, element_at(tmp3$v3, "Valiant")))}
 NULL

 #' Window functions for Column operations
@@ -2975,7 +2983,6 @@ setMethod("row_number",
 #' \code{array_contains}: Returns null if the array is null, true if the array contains
 #' the value, and false otherwise.
 #'
-#' @param value a value to be checked if contained in the column
 #' @rdname column_collection_functions
 #' @aliases array_contains array_contains,Column-method
 #' @note array_contains since 1.6.0
@@ -2986,6 +2993,48 @@ setMethod("array_contains",
             column(jc)
           })

+#' @details
+#' \code{array_max}: Returns the maximum value of the array.
+#'
+#' @rdname column_collection_functions
+#' @aliases array_max array_max,Column-method
+#' @note array_max since 2.4.0
+setMethod("array_max",
+          signature(x = "Column"),
+          function(x) {
+            jc <- callJStatic("org.apache.spark.sql.functions", "array_max", x@jc)
+            column(jc)
+          })
+
+#' @details
+#' \code{array_min}: Returns the minimum value of the array.
+#'
+#' @rdname column_collection_functions
+#' @aliases array_min array_min,Column-method
+#' @note array_min since 2.4.0
+setMethod("array_min",
+          signature(x = "Column"),
+          function(x) {
+            jc <- callJStatic("org.apache.spark.sql.functions", "array_min", x@jc)
+            column(jc)
+          })
+
+#' @details
+#' \code{array_position}: Locates the position of the first occurrence of the given value
+#' in the given array. Returns NA if either of the arguments are NA.
+#' Note: The position is not zero based, but 1 based index. Returns 0 if the given
+#' value could not be found in the array.
+#'
+#' @rdname column_collection_functions
+#' @aliases array_position array_position,Column-method
+#' @note array_position since 2.4.0
+setMethod("array_position",
+          signature(x = "Column", value = "ANY"),
+          function(x, value) {
+            jc <- callJStatic("org.apache.spark.sql.functions", "array_position", x@jc, value)
+            column(jc)
+          })
+
 #' @details
 #' \code{map_keys}: Returns an unordered array containing the keys of the map.
 #'
@@ -3012,6 +3061,22 @@ setMethod("map_values",
             column(jc)
           })

+#' @details
+#' \code{element_at}: Returns element of array at given index in \code{extraction} if
+#' \code{x} is array. Returns value for the given key in \code{extraction} if \code{x} is map.
+#' Note: The position is not zero based, but 1 based index.
+#'
+#' @param extraction index to check for in array or key to check for in map
+#' @rdname column_collection_functions
+#' @aliases element_at element_at,Column-method
+#' @note element_at since 2.4.0
+setMethod("element_at",
+          signature(x = "Column", extraction = "ANY"),
+          function(x, extraction) {
+            jc <- callJStatic("org.apache.spark.sql.functions", "element_at", x@jc, extraction)
+            column(jc)
+          })
+
 #' @details
 #' \code{explode}: Creates a new row for each element in the given array or map column.
 #'
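
Taken together, the roxygen examples above translate into the following SparkR usage sketch (illustration only, not part of the patch; it assumes an active Spark session and reuses the mtcars-based frames documented above):

df <- createDataFrame(cbind(model = rownames(mtcars), mtcars))
tmp <- mutate(df, v1 = create_array(df$mpg, df$cyl, df$hp))
head(select(tmp, array_max(tmp$v1), array_min(tmp$v1)))   # per-row max and min of the array column
head(select(tmp, array_position(tmp$v1, 21)))             # 1-based position of 21; 0 if not present
tmp3 <- mutate(df, v3 = create_map(df$model, df$cyl))
head(select(tmp3, element_at(tmp3$v3, "Valiant")))        # value for key "Valiant"; NA for a missing key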

R/pkg/R/generics.R

Lines changed: 16 additions & 0 deletions
@@ -757,6 +757,18 @@ setGeneric("approxCountDistinct", function(x, ...) { standardGeneric("approxCountDistinct") })
 #' @name NULL
 setGeneric("array_contains", function(x, value) { standardGeneric("array_contains") })

+#' @rdname column_collection_functions
+#' @name NULL
+setGeneric("array_max", function(x) { standardGeneric("array_max") })
+
+#' @rdname column_collection_functions
+#' @name NULL
+setGeneric("array_min", function(x) { standardGeneric("array_min") })
+
+#' @rdname column_collection_functions
+#' @name NULL
+setGeneric("array_position", function(x, value) { standardGeneric("array_position") })
+
 #' @rdname column_string_functions
 #' @name NULL
 setGeneric("ascii", function(x) { standardGeneric("ascii") })
@@ -886,6 +898,10 @@ setGeneric("decode", function(x, charset) { standardGeneric("decode") })
 #' @name NULL
 setGeneric("dense_rank", function(x = "missing") { standardGeneric("dense_rank") })

+#' @rdname column_collection_functions
+#' @name NULL
+setGeneric("element_at", function(x, extraction) { standardGeneric("element_at") })
+
 #' @rdname column_string_functions
 #' @name NULL
 setGeneric("encode", function(x, charset) { standardGeneric("encode") })

R/pkg/tests/fulltests/test_sparkSQL.R

Lines changed: 18 additions & 2 deletions
@@ -1479,24 +1479,40 @@ test_that("column functions", {
   df5 <- createDataFrame(list(list(a = "010101")))
   expect_equal(collect(select(df5, conv(df5$a, 2, 16)))[1, 1], "15")

-  # Test array_contains() and sort_array()
+  # Test array_contains(), array_max(), array_min(), array_position(), element_at()
+  # and sort_array()
   df <- createDataFrame(list(list(list(1L, 2L, 3L)), list(list(6L, 5L, 4L))))
   result <- collect(select(df, array_contains(df[[1]], 1L)))[[1]]
   expect_equal(result, c(TRUE, FALSE))

+  result <- collect(select(df, array_max(df[[1]])))[[1]]
+  expect_equal(result, c(3, 6))
+
+  result <- collect(select(df, array_min(df[[1]])))[[1]]
+  expect_equal(result, c(1, 4))
+
+  result <- collect(select(df, array_position(df[[1]], 1L)))[[1]]
+  expect_equal(result, c(1, 0))
+
+  result <- collect(select(df, element_at(df[[1]], 1L)))[[1]]
+  expect_equal(result, c(1, 6))
+
   result <- collect(select(df, sort_array(df[[1]], FALSE)))[[1]]
   expect_equal(result, list(list(3L, 2L, 1L), list(6L, 5L, 4L)))
   result <- collect(select(df, sort_array(df[[1]])))[[1]]
   expect_equal(result, list(list(1L, 2L, 3L), list(4L, 5L, 6L)))

-  # Test map_keys() and map_values()
+  # Test map_keys(), map_values() and element_at()
   df <- createDataFrame(list(list(map = as.environment(list(x = 1, y = 2)))))
   result <- collect(select(df, map_keys(df$map)))[[1]]
   expect_equal(result, list(list("x", "y")))

   result <- collect(select(df, map_values(df$map)))[[1]]
   expect_equal(result, list(list(1, 2)))

+  result <- collect(select(df, element_at(df$map, "y")))[[1]]
+  expect_equal(result, 2)
+
   # Test that stats::lag is working
   expect_equal(length(lag(ldeaths, 12)), 72)


assembly/pom.xml

Lines changed: 8 additions & 0 deletions
@@ -254,6 +254,14 @@
       <artifactId>spark-hadoop-cloud_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <!--
+      Redeclare this dependency to force it into the distribution.
+    -->
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-util</artifactId>
+      <scope>${hadoop.deps.scope}</scope>
+    </dependency>
   </dependencies>
 </profile>
 </profiles>

common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java

Lines changed: 2 additions & 0 deletions
@@ -32,6 +32,7 @@
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
+import org.apache.commons.lang3.SystemUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -98,6 +99,7 @@ private void init(String hostToBind, int portToBind) {
       .group(bossGroup, workerGroup)
       .channel(NettyUtils.getServerChannelClass(ioMode))
       .option(ChannelOption.ALLOCATOR, allocator)
+      .option(ChannelOption.SO_REUSEADDR, !SystemUtils.IS_OS_WINDOWS)
       .childOption(ChannelOption.ALLOCATOR, allocator);

     this.metrics = new NettyMemoryMetrics(

common/unsafe/src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java

Lines changed: 5 additions & 1 deletion
@@ -33,7 +33,11 @@ public static long nextPowerOf2(long num) {
   }

   public static int roundNumberOfBytesToNearestWord(int numBytes) {
-    int remainder = numBytes & 0x07;  // This is equivalent to `numBytes % 8`
+    return (int)roundNumberOfBytesToNearestWord((long)numBytes);
+  }
+
+  public static long roundNumberOfBytesToNearestWord(long numBytes) {
+    long remainder = numBytes & 0x07;  // This is equivalent to `numBytes % 8`
     if (remainder == 0) {
       return numBytes;
     } else {
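
The widened long overload keeps the same rounding logic as before. As a side illustration (R, not part of the patch), the bitmask is just a cheap `numBytes % 8`, and the byte count is rounded up to the next 8-byte word:

round_to_word <- function(num_bytes) {
  remainder <- bitwAnd(num_bytes, 0x07L)  # equivalent to num_bytes %% 8
  if (remainder == 0L) num_bytes else num_bytes + (8L - remainder)
}
round_to_word(13L)  # 16
round_to_word(16L)  # 16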

common/unsafe/src/test/java/org/apache/spark/unsafe/memory/MemoryBlockSuite.java

Lines changed: 4 additions & 0 deletions
@@ -120,6 +120,8 @@ private void check(MemoryBlock memory, Object obj, long offset, int length) {
     } catch (Exception expected) {
       Assert.assertThat(expected.getMessage(), containsString("should not be larger than"));
     }
+
+    memory.setPageNumber(MemoryBlock.NO_PAGE_NUMBER);
   }

   @Test
@@ -165,11 +167,13 @@ public void testOffHeapArrayMemoryBlock() {
     int length = 56;

     check(memory, obj, offset, length);
+    memoryAllocator.free(memory);

     long address = Platform.allocateMemory(112);
     memory = new OffHeapMemoryBlock(address, length);
     obj = memory.getBaseObject();
     offset = memory.getBaseOffset();
     check(memory, obj, offset, length);
+    Platform.freeMemory(address);
   }
 }

core/pom.xml

Lines changed: 6 additions & 0 deletions
@@ -95,6 +95,12 @@
       <groupId>org.apache.curator</groupId>
       <artifactId>curator-recipes</artifactId>
     </dependency>
+    <!-- With curator 2.12 SBT/Ivy doesn't get ZK on the build classpath.
+         Explicitly declaring it as a dependency fixes this. -->
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+    </dependency>

     <!-- Jetty dependencies promoted to compile here so they are shaded
          and inlined into spark-core jar -->

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 19 additions & 5 deletions
@@ -26,7 +26,7 @@ import scala.util.control.{ControlThrowable, NonFatal}
 import com.codahale.metrics.{Gauge, MetricRegistry}

 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.{DYN_ALLOCATION_MAX_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS}
+import org.apache.spark.internal.config._
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.BlockManagerMaster
@@ -69,6 +69,10 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
 *   spark.dynamicAllocation.maxExecutors - Upper bound on the number of executors
 *   spark.dynamicAllocation.initialExecutors - Number of executors to start with
 *
+*   spark.dynamicAllocation.executorAllocationRatio -
+*     This is used to reduce the parallelism of the dynamic allocation that can waste
+*     resources when tasks are small
+*
 *   spark.dynamicAllocation.schedulerBacklogTimeout (M) -
 *     If there are backlogged tasks for this duration, add new executors
 *
@@ -116,9 +120,12 @@ private[spark] class ExecutorAllocationManager(
   // TODO: The default value of 1 for spark.executor.cores works right now because dynamic
   // allocation is only supported for YARN and the default number of cores per executor in YARN is
   // 1, but it might need to be attained differently for different cluster managers
-  private val tasksPerExecutor =
+  private val tasksPerExecutorForFullParallelism =
     conf.getInt("spark.executor.cores", 1) / conf.getInt("spark.task.cpus", 1)

+  private val executorAllocationRatio =
+    conf.get(DYN_ALLOCATION_EXECUTOR_ALLOCATION_RATIO)
+
   validateSettings()

   // Number of executors to add in the next round
@@ -209,8 +216,13 @@ private[spark] class ExecutorAllocationManager(
       throw new SparkException("Dynamic allocation of executors requires the external " +
         "shuffle service. You may enable this through spark.shuffle.service.enabled.")
     }
-    if (tasksPerExecutor == 0) {
-      throw new SparkException("spark.executor.cores must not be less than spark.task.cpus.")
+    if (tasksPerExecutorForFullParallelism == 0) {
+      throw new SparkException("spark.executor.cores must not be < spark.task.cpus.")
+    }
+
+    if (executorAllocationRatio > 1.0 || executorAllocationRatio <= 0.0) {
+      throw new SparkException(
+        "spark.dynamicAllocation.executorAllocationRatio must be > 0 and <= 1.0")
     }
   }

@@ -273,7 +285,9 @@ private[spark] class ExecutorAllocationManager(
    */
   private def maxNumExecutorsNeeded(): Int = {
     val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
-    (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
+    math.ceil(numRunningOrPendingTasks * executorAllocationRatio /
+              tasksPerExecutorForFullParallelism)
+      .toInt
   }

   private def totalRunningTasks(): Int = synchronized {
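
To make the effect of the new setting concrete, a small worked example of the target-executor formula (R, illustration only; the numbers are hypothetical):

num_tasks <- 1000              # pending + running tasks
tasks_per_executor <- 4        # spark.executor.cores / spark.task.cpus
ratio <- 0.5                   # spark.dynamicAllocation.executorAllocationRatio

(num_tasks + tasks_per_executor - 1) %/% tasks_per_executor   # old target: 250 executors
ceiling(num_tasks * ratio / tasks_per_executor)               # new target: 125 executors

With ratio = 1.0 the two formulas give the same result, so the change only lowers the requested executor count when the ratio is reduced.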
