
Commit 7265784

Merge remote-tracking branch 'origin/master' into pin-pages

2 parents: 76cfebd + b313bad

138 files changed: +1363 additions, -2341 deletions

.gitignore

Lines changed: 0 additions & 1 deletion

@@ -60,7 +60,6 @@ dev/create-release/*final
 spark-*-bin-*.tgz
 unit-tests.log
 /lib/
-ec2/lib/
 rat-results.txt
 scalastyle.txt
 scalastyle-output.xml

R/pkg/NAMESPACE

Lines changed: 1 addition & 0 deletions

@@ -130,6 +130,7 @@ exportMethods("%in%",
               "count",
               "countDistinct",
               "crc32",
+              "hash",
               "cume_dist",
               "date_add",
               "date_format",

R/pkg/R/functions.R

Lines changed: 20 additions & 0 deletions

@@ -340,6 +340,26 @@ setMethod("crc32",
             column(jc)
           })
 
+#' hash
+#'
+#' Calculates the hash code of given columns, and returns the result as a int column.
+#'
+#' @rdname hash
+#' @name hash
+#' @family misc_funcs
+#' @export
+#' @examples \dontrun{hash(df$c)}
+setMethod("hash",
+          signature(x = "Column"),
+          function(x, ...) {
+            jcols <- lapply(list(x, ...), function (x) {
+              stopifnot(class(x) == "Column")
+              x@jc
+            })
+            jc <- callJStatic("org.apache.spark.sql.functions", "hash", jcols)
+            column(jc)
+          })
+
 #' dayofmonth
 #'
 #' Extracts the day of the month as an integer from a given date/timestamp/string.
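
The SparkR hash() added above is a thin wrapper: it collects the JVM handles of the given Columns and forwards them to org.apache.spark.sql.functions.hash via callJStatic. A minimal sketch of the equivalent call made directly from Scala follows; the local SparkSession setup, table, and column names are illustrative only and assume a Spark build (2.x or later) where the SparkSession entry point exists.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.hash

object HashColumnsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("hash-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq(("alice", 1), ("bob", 2)).toDF("name", "value")

    // functions.hash accepts one or more columns, mirroring hash(x, ...) in the R wrapper,
    // and returns the combined hash as an integer column.
    df.select($"name", hash($"name", $"value").alias("row_hash")).show()

    spark.stop()
  }
}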

R/pkg/R/generics.R

Lines changed: 4 additions & 0 deletions

@@ -736,6 +736,10 @@ setGeneric("countDistinct", function(x, ...) { standardGeneric("countDistinct")
 #' @export
 setGeneric("crc32", function(x) { standardGeneric("crc32") })
 
+#' @rdname hash
+#' @export
+setGeneric("hash", function(x, ...) { standardGeneric("hash") })
+
 #' @rdname cume_dist
 #' @export
 setGeneric("cume_dist", function(x) { standardGeneric("cume_dist") })

R/pkg/inst/tests/testthat/test_sparkSQL.R

Lines changed: 1 addition & 1 deletion

@@ -922,7 +922,7 @@ test_that("column functions", {
   c <- column("a")
   c1 <- abs(c) + acos(c) + approxCountDistinct(c) + ascii(c) + asin(c) + atan(c)
   c2 <- avg(c) + base64(c) + bin(c) + bitwiseNOT(c) + cbrt(c) + ceil(c) + cos(c)
-  c3 <- cosh(c) + count(c) + crc32(c) + exp(c)
+  c3 <- cosh(c) + count(c) + crc32(c) + hash(c) + exp(c)
   c4 <- explode(c) + expm1(c) + factorial(c) + first(c) + floor(c) + hex(c)
   c5 <- hour(c) + initcap(c) + last(c) + last_day(c) + length(c)
   c6 <- log(c) + (c) + log1p(c) + log2(c) + lower(c) + ltrim(c) + max(c) + md5(c)

core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala

Lines changed: 4 additions & 4 deletions

@@ -77,7 +77,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
      This implementation is non-blocking, asynchronously handling the
      results of each job and triggering the next job using callbacks on futures.
     */
-    def continue(partsScanned: Long)(implicit jobSubmitter: JobSubmitter) : Future[Seq[T]] =
+    def continue(partsScanned: Int)(implicit jobSubmitter: JobSubmitter): Future[Seq[T]] =
       if (results.size >= num || partsScanned >= totalParts) {
         Future.successful(results.toSeq)
       } else {
@@ -99,7 +99,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
         }
 
         val left = num - results.size
-        val p = partsScanned.toInt until math.min(partsScanned + numPartsToTry, totalParts).toInt
+        val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
 
         val buf = new Array[Array[T]](p.size)
         self.context.setCallSite(callSite)
@@ -109,13 +109,13 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
           p,
          (index: Int, data: Array[T]) => buf(index) = data,
          Unit)
-        job.flatMap {_ =>
+        job.flatMap { _ =>
          buf.foreach(results ++= _.take(num - results.size))
          continue(partsScanned + p.size)
        }
      }
 
-    new ComplexFutureAction[Seq[T]](continue(0L)(_))
+    new ComplexFutureAction[Seq[T]](continue(0)(_))
   }
 
   /**
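
The continue() signature change above preserves the non-blocking shape of takeAsync: each scan of a batch of partitions is a Future, and flatMap chains the next batch until enough results (or all partitions) have been collected, instead of blocking between jobs. A Spark-free sketch of that recursion, with every name and the fixed batch size of 2 invented purely for illustration:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object TakeContinuationSketch {
  // Stand-in for a submitted job over a range of partitions; returns its "rows".
  def runBatch(parts: Range): Future[Seq[Int]] = Future(parts.toList)

  // Chain batches with flatMap so no thread ever blocks waiting for a job.
  def continue(partsScanned: Int, totalParts: Int, num: Int, acc: Vector[Int]): Future[Seq[Int]] =
    if (acc.size >= num || partsScanned >= totalParts) {
      Future.successful(acc)
    } else {
      val p = partsScanned.until(math.min(partsScanned + 2, totalParts))
      runBatch(p).flatMap { data =>
        continue(partsScanned + p.size, totalParts, num, acc ++ data.take(num - acc.size))
      }
    }
}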

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 2 additions & 2 deletions

@@ -1190,7 +1190,7 @@ abstract class RDD[T: ClassTag](
     } else {
       val buf = new ArrayBuffer[T]
       val totalParts = this.partitions.length
-      var partsScanned = 0L
+      var partsScanned = 0
       while (buf.size < num && partsScanned < totalParts) {
         // The number of partitions to try in this iteration. It is ok for this number to be
         // greater than totalParts because we actually cap it at totalParts in runJob.
@@ -1209,7 +1209,7 @@ abstract class RDD[T: ClassTag](
         }
 
         val left = num - buf.size
-        val p = partsScanned.toInt until math.min(partsScanned + numPartsToTry, totalParts).toInt
+        val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
         val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)
 
         res.foreach(buf ++= _.take(num - buf.size))
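
In both take code paths above, partsScanned is now an Int and the upper bound of the partition range is capped at totalParts before being narrowed back to Int. Assuming numPartsToTry is a Long in the surrounding code, the addition happens in Long arithmetic, so a very large request such as the new take(2147483638) test cannot overflow the range bounds. A minimal sketch of just that range construction, with illustrative values:

object PartitionRangeSketch {
  // Mirrors partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt):
  // the sum is evaluated as a Long, and only the capped value is converted to Int.
  def nextPartitions(partsScanned: Int, numPartsToTry: Long, totalParts: Int): Range =
    partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)

  def main(args: Array[String]): Unit = {
    // 2 partitions, but the caller asked for ~Int.MaxValue elements, so numPartsToTry is huge.
    val p = nextPartitions(partsScanned = 0, numPartsToTry = 4L * Int.MaxValue, totalParts = 2)
    println(p.toList)  // List(0, 1): every partition scanned once, no Int overflow
  }
}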

core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala

Lines changed: 4 additions & 0 deletions

@@ -482,6 +482,10 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
     assert(nums.take(501) === (1 to 501).toArray)
     assert(nums.take(999) === (1 to 999).toArray)
     assert(nums.take(1000) === (1 to 999).toArray)
+
+    nums = sc.parallelize(1 to 2, 2)
+    assert(nums.take(2147483638).size === 2)
+    assert(nums.takeAsync(2147483638).get.size === 2)
   }
 
   test("top with predefined ordering") {

dev/create-release/release-tag.sh

Lines changed: 0 additions & 3 deletions

@@ -64,9 +64,6 @@ git commit -a -m "Preparing Spark release $RELEASE_TAG"
 echo "Creating tag $RELEASE_TAG at the head of $GIT_BRANCH"
 git tag $RELEASE_TAG
 
-# TODO: It would be nice to do some verifications here
-# i.e. check whether ec2 scripts have the new version
-
 # Create next version
 $MVN versions:set -DnewVersion=$NEXT_VERSION | grep -v "no value" # silence logs
 git commit -a -m "Preparing development version $NEXT_VERSION"

dev/create-release/releaseutils.py

Lines changed: 0 additions & 1 deletion

@@ -159,7 +159,6 @@ def get_commits(tag):
     "build": CORE_COMPONENT,
     "deploy": CORE_COMPONENT,
     "documentation": CORE_COMPONENT,
-    "ec2": "EC2",
     "examples": CORE_COMPONENT,
     "graphx": "GraphX",
     "input/output": CORE_COMPONENT,
