
Commit 1d725ad

Merge remote-tracking branch 'origin/master' into modularize-jdbc-internals
2 parents: 43cbef6 + d314677


61 files changed: +1843, -606 lines


R/pkg/R/mllib.R

Lines changed: 6 additions & 4 deletions
@@ -138,10 +138,11 @@ predict_internal <- function(object, newData) {
 #' This can be a character string naming a family function, a family function or
 #' the result of a call to a family function. Refer R family at
 #' \url{https://stat.ethz.ch/R-manual/R-devel/library/stats/html/family.html}.
-#' @param weightCol the weight column name. If this is not set or \code{NULL}, we treat all instance
-#' weights as 1.0.
 #' @param tol positive convergence tolerance of iterations.
 #' @param maxIter integer giving the maximal number of IRLS iterations.
+#' @param weightCol the weight column name. If this is not set or \code{NULL}, we treat all instance
+#' weights as 1.0.
+#' @param regParam regularization parameter for L2 regularization.
 #' @param ... additional arguments passed to the method.
 #' @aliases spark.glm,SparkDataFrame,formula-method
 #' @return \code{spark.glm} returns a fitted generalized linear model
@@ -171,7 +172,8 @@ predict_internal <- function(object, newData) {
 #' @note spark.glm since 2.0.0
 #' @seealso \link{glm}, \link{read.ml}
 setMethod("spark.glm", signature(data = "SparkDataFrame", formula = "formula"),
-          function(data, formula, family = gaussian, tol = 1e-6, maxIter = 25, weightCol = NULL) {
+          function(data, formula, family = gaussian, tol = 1e-6, maxIter = 25, weightCol = NULL,
+                   regParam = 0.0) {
             if (is.character(family)) {
               family <- get(family, mode = "function", envir = parent.frame())
             }
@@ -190,7 +192,7 @@ setMethod("spark.glm", signature(data = "SparkDataFrame", formula = "formula"),

             jobj <- callJStatic("org.apache.spark.ml.r.GeneralizedLinearRegressionWrapper",
                                 "fit", formula, data@sdf, family$family, family$link,
-                                tol, as.integer(maxIter), as.character(weightCol))
+                                tol, as.integer(maxIter), as.character(weightCol), regParam)
             new("GeneralizedLinearRegressionModel", jobj = jobj)
           })

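For readers new to the argument: regParam feeds an L2 (ridge) penalty into the GLM fit. Schematically, and glossing over Spark's exact scaling and standardization conventions, the fitted coefficients solve

    \hat{\beta} = \operatorname{arg\,min}_{\beta} \Big[ -\ell(\beta \mid X, y) + \lambda \lVert \beta \rVert_2^2 \Big], \qquad \lambda = \text{regParam},

so the new default regParam = 0.0 leaves the unpenalized fit unchanged.
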
R/pkg/R/window.R

Lines changed: 6 additions & 6 deletions
@@ -21,9 +21,9 @@
 #'
 #' Creates a WindowSpec with the partitioning defined.
 #'
-#' @param col A column name or Column by which rows are partitioned to
+#' @param col A column name or Column by which rows are partitioned to
 #' windows.
-#' @param ... Optional column names or Columns in addition to col, by
+#' @param ... Optional column names or Columns in addition to col, by
 #' which rows are partitioned to windows.
 #'
 #' @rdname windowPartitionBy
@@ -32,10 +32,10 @@
 #' @export
 #' @examples
 #' \dontrun{
-#'   ws <- windowPartitionBy("key1", "key2")
+#'   ws <- orderBy(windowPartitionBy("key1", "key2"), "key3")
 #'   df1 <- select(df, over(lead("value", 1), ws))
 #'
-#'   ws <- windowPartitionBy(df$key1, df$key2)
+#'   ws <- orderBy(windowPartitionBy(df$key1, df$key2), df$key3)
 #'   df1 <- select(df, over(lead("value", 1), ws))
 #' }
 #' @note windowPartitionBy(character) since 2.0.0
@@ -70,9 +70,9 @@ setMethod("windowPartitionBy",
 #'
 #' Creates a WindowSpec with the ordering defined.
 #'
-#' @param col A column name or Column by which rows are ordered within
+#' @param col A column name or Column by which rows are ordered within
 #' windows.
-#' @param ... Optional column names or Columns in addition to col, by
+#' @param ... Optional column names or Columns in addition to col, by
 #' which rows are ordered within windows.
 #'
 #' @rdname windowOrderBy

R/pkg/inst/tests/testthat/test_mllib.R

Lines changed: 6 additions & 0 deletions
@@ -148,6 +148,12 @@ test_that("spark.glm summary", {
   baseModel <- stats::glm(Sepal.Width ~ Sepal.Length + Species, data = iris)
   baseSummary <- summary(baseModel)
   expect_true(abs(baseSummary$deviance - 12.19313) < 1e-4)
+
+  # Test spark.glm works with regularization parameter
+  data <- as.data.frame(cbind(a1, a2, b))
+  df <- suppressWarnings(createDataFrame(data))
+  regStats <- summary(spark.glm(df, b ~ a1 + a2, regParam = 1.0))
+  expect_equal(regStats$aic, 13.32836, tolerance = 1e-4) # 13.32836 is from summary() result
 })

 test_that("spark.glm save/load", {

common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java

Lines changed: 4 additions & 4 deletions
@@ -470,7 +470,7 @@ public UTF8String trim() {
     while (e >= 0 && getByte(e) <= 0x20 && getByte(e) >= 0x00) e--;
     if (s > e) {
       // empty string
-      return UTF8String.fromBytes(new byte[0]);
+      return EMPTY_UTF8;
     } else {
       return copyUTF8String(s, e);
     }
@@ -482,7 +482,7 @@ public UTF8String trimLeft() {
     while (s < this.numBytes && getByte(s) <= 0x20 && getByte(s) >= 0x00) s++;
     if (s == this.numBytes) {
       // empty string
-      return UTF8String.fromBytes(new byte[0]);
+      return EMPTY_UTF8;
     } else {
       return copyUTF8String(s, this.numBytes - 1);
     }
@@ -495,7 +495,7 @@ public UTF8String trimRight() {

     if (e < 0) {
       // empty string
-      return UTF8String.fromBytes(new byte[0]);
+      return EMPTY_UTF8;
     } else {
       return copyUTF8String(0, e);
     }
@@ -761,7 +761,7 @@ public static UTF8String concatWs(UTF8String separator, UTF8String... inputs) {

     if (numInputs == 0) {
       // Return an empty string if there is no input, or all the inputs are null.
-      return fromBytes(new byte[0]);
+      return EMPTY_UTF8;
     }

     // Allocate a new byte array, and copy the inputs one by one into it.

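All four hunks apply the same micro-optimization: return one shared, immutable empty value instead of allocating a fresh zero-length byte array (and a wrapper around it) on every call. A minimal Scala sketch of the pattern, using a hypothetical Text type rather than Spark's UTF8String:

import java.util.Arrays

final class Text(val bytes: Array[Byte]) {
  // Trim leading control/space bytes, mirroring the shape of UTF8String.trimLeft.
  def trimLeft: Text = {
    var s = 0
    while (s < bytes.length && bytes(s) <= 0x20 && bytes(s) >= 0x00) s += 1
    if (s == bytes.length) Text.Empty                        // shared instance, no allocation
    else new Text(Arrays.copyOfRange(bytes, s, bytes.length))
  }
}

object Text {
  // Allocated once; safe to share only because instances are never mutated in place.
  val Empty: Text = new Text(Array.empty[Byte])
}
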
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 1 addition & 1 deletion
@@ -230,7 +230,7 @@ private[spark] class ExecutorAllocationManager(
         }
       }
     }
-    executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
+    executor.scheduleWithFixedDelay(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)

     client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
   }

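The one-word change matters because the two calls schedule differently: scheduleAtFixedRate measures the interval from the start of each run, so a run that overruns the period is followed immediately by the next one, while scheduleWithFixedDelay always waits the full interval after the previous run finishes. A standalone sketch with plain java.util.concurrent (nothing Spark-specific; the interval values are arbitrary):

import java.util.concurrent.{Executors, TimeUnit}

object SchedulingSketch {
  def main(args: Array[String]): Unit = {
    val executor = Executors.newSingleThreadScheduledExecutor()
    val task = new Runnable {
      override def run(): Unit = {
        println(s"tick at ${System.currentTimeMillis() % 100000} ms")
        Thread.sleep(150)  // simulate a run that overruns the 100 ms interval
      }
    }
    // With scheduleAtFixedRate the next tick would fire as soon as the slow run
    // returns; with scheduleWithFixedDelay there is always a 100 ms gap after it.
    executor.scheduleWithFixedDelay(task, 0, 100, TimeUnit.MILLISECONDS)
    Thread.sleep(1000)
    executor.shutdownNow()
  }
}
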
core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 2 additions & 2 deletions
@@ -383,7 +383,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf,

   /** Register multiple map output information for the given shuffle */
   def registerMapOutputs(shuffleId: Int, statuses: Array[MapStatus], changeEpoch: Boolean = false) {
-    mapStatuses.put(shuffleId, Array[MapStatus]() ++ statuses)
+    mapStatuses.put(shuffleId, statuses.clone())
     if (changeEpoch) {
       incrementEpoch()
     }
@@ -535,7 +535,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf,
           true
         case None =>
           logDebug("cached status not found for : " + shuffleId)
-          statuses = mapStatuses.getOrElse(shuffleId, Array[MapStatus]())
+          statuses = mapStatuses.getOrElse(shuffleId, Array.empty[MapStatus])
           epochGotten = epoch
           false
       }

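Both hunks are small allocation and idiom cleanups: statuses.clone() yields the same defensive copy as Array[MapStatus]() ++ statuses but via a single array copy rather than a collection builder, and Array.empty[MapStatus] is the conventional spelling of an empty array. A quick REPL-style Scala sketch, with String standing in for MapStatus:

val statuses = Array("s1", "s2", "s3")          // stand-in for Array[MapStatus]

val viaConcat = Array[String]() ++ statuses     // old style: builder-based copy
val viaClone  = statuses.clone()                // new style: one straight array copy

assert(viaConcat.sameElements(viaClone))        // same contents either way
assert(!(viaClone eq statuses))                 // still a copy, not an alias

val empty = Array.empty[String]                 // preferred over Array[String]()
assert(empty.isEmpty)
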
core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala

Lines changed: 14 additions & 24 deletions
@@ -21,6 +21,8 @@ import java.util.concurrent._
 import java.util.concurrent.{Future => JFuture, ScheduledFuture => JScheduledFuture}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}

+import scala.concurrent.Future
+import scala.util.{Failure, Success}
 import scala.util.control.NonFatal

 import org.apache.spark.SparkConf
@@ -79,11 +81,6 @@ private[spark] class StandaloneAppClient(
   private val registrationRetryThread =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("appclient-registration-retry-thread")

-  // A thread pool to perform receive then reply actions in a thread so as not to block the
-  // event loop.
-  private val askAndReplyThreadPool =
-    ThreadUtils.newDaemonCachedThreadPool("appclient-receive-and-reply-threadpool")
-
   override def onStart(): Unit = {
     try {
       registerWithMaster(1)
@@ -220,19 +217,13 @@
       endpointRef: RpcEndpointRef,
       context: RpcCallContext,
       msg: T): Unit = {
-    // Create a thread to ask a message and reply with the result. Allow thread to be
+    // Ask a message and create a thread to reply with the result. Allow thread to be
     // interrupted during shutdown, otherwise context must be notified of NonFatal errors.
-    askAndReplyThreadPool.execute(new Runnable {
-      override def run(): Unit = {
-        try {
-          context.reply(endpointRef.askWithRetry[Boolean](msg))
-        } catch {
-          case ie: InterruptedException => // Cancelled
-          case NonFatal(t) =>
-            context.sendFailure(t)
-        }
-      }
-    })
+    endpointRef.ask[Boolean](msg).andThen {
+      case Success(b) => context.reply(b)
+      case Failure(ie: InterruptedException) => // Cancelled
+      case Failure(NonFatal(t)) => context.sendFailure(t)
+    }(ThreadUtils.sameThread)
   }

   override def onDisconnected(address: RpcAddress): Unit = {
@@ -272,7 +263,6 @@ private[spark] class StandaloneAppClient(
     registrationRetryThread.shutdownNow()
     registerMasterFutures.get.foreach(_.cancel(true))
     registerMasterThreadPool.shutdownNow()
-    askAndReplyThreadPool.shutdownNow()
   }

 }
@@ -301,25 +291,25 @@ private[spark] class StandaloneAppClient(
   *
   * @return whether the request is acknowledged.
   */
-  def requestTotalExecutors(requestedTotal: Int): Boolean = {
+  def requestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
     if (endpoint.get != null && appId.get != null) {
-      endpoint.get.askWithRetry[Boolean](RequestExecutors(appId.get, requestedTotal))
+      endpoint.get.ask[Boolean](RequestExecutors(appId.get, requestedTotal))
     } else {
       logWarning("Attempted to request executors before driver fully initialized.")
-      false
+      Future.successful(false)
     }
   }

   /**
   * Kill the given list of executors through the Master.
   * @return whether the kill request is acknowledged.
   */
-  def killExecutors(executorIds: Seq[String]): Boolean = {
+  def killExecutors(executorIds: Seq[String]): Future[Boolean] = {
     if (endpoint.get != null && appId.get != null) {
-      endpoint.get.askWithRetry[Boolean](KillExecutors(appId.get, executorIds))
+      endpoint.get.ask[Boolean](KillExecutors(appId.get, executorIds))
     } else {
       logWarning("Attempted to kill executors before driver fully initialized.")
-      false
+      Future.successful(false)
     }
   }

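The substantive change in this file is the removal of the dedicated "ask and reply" thread pool: instead of parking a pooled thread on a blocking askWithRetry, the client now calls the non-blocking ask, which returns a Future, and completes the RPC context from a callback. As a consequence, requestTotalExecutors and killExecutors return Future[Boolean] rather than Boolean, so callers receive the acknowledgement asynchronously. A minimal sketch of the callback pattern using plain scala.concurrent types; ReplyContext and askAndReply below are stand-ins, not Spark's RpcCallContext API:

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import scala.util.control.NonFatal

object AskAndReplySketch {
  // Stand-in for the reply channel of an RPC call.
  trait ReplyContext {
    def reply(result: Boolean): Unit
    def sendFailure(t: Throwable): Unit
  }

  // Old shape: submit a Runnable to a cached pool and block it on the answer.
  // New shape: attach a callback to the Future so no pool thread sits waiting.
  def askAndReply(ask: () => Future[Boolean], context: ReplyContext)
                 (implicit ec: ExecutionContext): Unit = {
    ask().andThen {
      case Success(acknowledged)            => context.reply(acknowledged)
      case Failure(_: InterruptedException) => // cancelled during shutdown
      case Failure(NonFatal(t))             => context.sendFailure(t)
    }
  }
}
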
core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala

Lines changed: 1 addition & 1 deletion
@@ -43,7 +43,7 @@ class ZippedWithIndexRDD[T: ClassTag](prev: RDD[T]) extends RDD[(T, Long)](prev)
   @transient private val startIndices: Array[Long] = {
     val n = prev.partitions.length
     if (n == 0) {
-      Array[Long]()
+      Array.empty
     } else if (n == 1) {
       Array(0L)
     } else {

0 commit comments
