
Commit df95a90

Felix Cheung authored and committed
[SPARK-22933][SPARKR] R Structured Streaming API for withWatermark, trigger, partitionBy
## What changes were proposed in this pull request?

R Structured Streaming API for withWatermark, trigger, partitionBy

## How was this patch tested?

manual, unit tests

Author: Felix Cheung <[email protected]>

Closes #20129 from felixcheung/rwater.
1 parent 7d045c5 commit df95a90

7 files changed: +214 −6 lines

R/pkg/NAMESPACE

Lines changed: 1 addition & 0 deletions
@@ -179,6 +179,7 @@ exportMethods("arrange",
               "with",
               "withColumn",
               "withColumnRenamed",
+              "withWatermark",
               "write.df",
               "write.jdbc",
               "write.json",

R/pkg/R/DataFrame.R

Lines changed: 92 additions & 4 deletions
@@ -3661,7 +3661,8 @@ setMethod("getNumPartitions",
 #' isStreaming
 #'
 #' Returns TRUE if this SparkDataFrame contains one or more sources that continuously return data
-#' as it arrives.
+#' as it arrives. A dataset that reads data from a streaming source must be executed as a
+#' \code{StreamingQuery} using \code{write.stream}.
 #'
 #' @param x A SparkDataFrame
 #' @return TRUE if this SparkDataFrame is from a streaming source

@@ -3707,7 +3708,17 @@ setMethod("isStreaming",
 #' @param df a streaming SparkDataFrame.
 #' @param source a name for external data source.
 #' @param outputMode one of 'append', 'complete', 'update'.
-#' @param ... additional argument(s) passed to the method.
+#' @param partitionBy a name or a list of names of columns to partition the output by on the file
+#'        system. If specified, the output is laid out on the file system similar to Hive's
+#'        partitioning scheme.
+#' @param trigger.processingTime a processing time interval as a string, e.g. '5 seconds',
+#'        '1 minute'. This is a trigger that runs a query periodically based on the processing
+#'        time. If value is '0 seconds', the query will run as fast as possible, this is the
+#'        default. Only one trigger can be set.
+#' @param trigger.once a logical, must be set to \code{TRUE}. This is a trigger that processes only
+#'        one batch of data in a streaming query then terminates the query. Only one trigger can be
+#'        set.
+#' @param ... additional external data source specific named options.
 #'
 #' @family SparkDataFrame functions
 #' @seealso \link{read.stream}

@@ -3725,7 +3736,8 @@ setMethod("isStreaming",
 #' # console
 #' q <- write.stream(wordCounts, "console", outputMode = "complete")
 #' # text stream
-#' q <- write.stream(df, "text", path = "/home/user/out", checkpointLocation = "/home/user/cp")
+#' q <- write.stream(df, "text", path = "/home/user/out", checkpointLocation = "/home/user/cp",
+#'                   partitionBy = c("year", "month"), trigger.processingTime = "30 seconds")
 #' # memory stream
 #' q <- write.stream(wordCounts, "memory", queryName = "outs", outputMode = "complete")
 #' head(sql("SELECT * from outs"))

@@ -3737,7 +3749,8 @@ setMethod("isStreaming",
 #' @note experimental
 setMethod("write.stream",
           signature(df = "SparkDataFrame"),
-          function(df, source = NULL, outputMode = NULL, ...) {
+          function(df, source = NULL, outputMode = NULL, partitionBy = NULL,
+                   trigger.processingTime = NULL, trigger.once = NULL, ...) {
             if (!is.null(source) && !is.character(source)) {
               stop("source should be character, NULL or omitted. It is the data source specified ",
                    "in 'spark.sql.sources.default' configuration by default.")

@@ -3748,12 +3761,43 @@ setMethod("write.stream",
             if (is.null(source)) {
               source <- getDefaultSqlSource()
             }
+            cols <- NULL
+            if (!is.null(partitionBy)) {
+              if (!all(sapply(partitionBy, function(c) { is.character(c) }))) {
+                stop("All partitionBy column names should be characters.")
+              }
+              cols <- as.list(partitionBy)
+            }
+            jtrigger <- NULL
+            if (!is.null(trigger.processingTime) && !is.na(trigger.processingTime)) {
+              if (!is.null(trigger.once)) {
+                stop("Multiple triggers not allowed.")
+              }
+              interval <- as.character(trigger.processingTime)
+              if (nchar(interval) == 0) {
+                stop("Value for trigger.processingTime must be a non-empty string.")
+              }
+              jtrigger <- handledCallJStatic("org.apache.spark.sql.streaming.Trigger",
+                                             "ProcessingTime",
+                                             interval)
+            } else if (!is.null(trigger.once) && !is.na(trigger.once)) {
+              if (!is.logical(trigger.once) || !trigger.once) {
+                stop("Value for trigger.once must be TRUE.")
+              }
+              jtrigger <- callJStatic("org.apache.spark.sql.streaming.Trigger", "Once")
+            }
             options <- varargsToStrEnv(...)
             write <- handledCallJMethod(df@sdf, "writeStream")
             write <- callJMethod(write, "format", source)
             if (!is.null(outputMode)) {
               write <- callJMethod(write, "outputMode", outputMode)
             }
+            if (!is.null(cols)) {
+              write <- callJMethod(write, "partitionBy", cols)
+            }
+            if (!is.null(jtrigger)) {
+              write <- callJMethod(write, "trigger", jtrigger)
+            }
             write <- callJMethod(write, "options", options)
             ssq <- handledCallJMethod(write, "start")
             streamingQuery(ssq)

@@ -3967,3 +4011,47 @@ setMethod("broadcast",
             sdf <- callJStatic("org.apache.spark.sql.functions", "broadcast", x@sdf)
             dataFrame(sdf)
           })
+
+#' withWatermark
+#'
+#' Defines an event time watermark for this streaming SparkDataFrame. A watermark tracks a point in
+#' time before which we assume no more late data is going to arrive.
+#'
+#' Spark will use this watermark for several purposes:
+#' \itemize{
+#'  \item{-} To know when a given time window aggregation can be finalized and thus can be emitted
+#'           when using output modes that do not allow updates.
+#'  \item{-} To minimize the amount of state that we need to keep for on-going aggregations.
+#' }
+#' The current watermark is computed by looking at the \code{MAX(eventTime)} seen across
+#' all of the partitions in the query minus a user specified \code{delayThreshold}. Due to the cost
+#' of coordinating this value across partitions, the actual watermark used is only guaranteed
+#' to be at least \code{delayThreshold} behind the actual event time. In some cases we may still
+#' process records that arrive more than \code{delayThreshold} late.
+#'
+#' @param x a streaming SparkDataFrame
+#' @param eventTime a string specifying the name of the Column that contains the event time of the
+#'        row.
+#' @param delayThreshold a string specifying the minimum delay to wait for data to arrive late,
+#'        relative to the latest record that has been processed in the form of an
+#'        interval (e.g. "1 minute" or "5 hours"). NOTE: This should not be negative.
+#' @return a SparkDataFrame.
+#' @aliases withWatermark,SparkDataFrame,character,character-method
+#' @family SparkDataFrame functions
+#' @rdname withWatermark
+#' @name withWatermark
+#' @export
+#' @examples
+#' \dontrun{
+#' sparkR.session()
+#' schema <- structType(structField("time", "timestamp"), structField("value", "double"))
+#' df <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = 1)
+#' df <- withWatermark(df, "time", "10 minutes")
+#' }
+#' @note withWatermark since 2.3.0
+setMethod("withWatermark",
+          signature(x = "SparkDataFrame", eventTime = "character", delayThreshold = "character"),
+          function(x, eventTime, delayThreshold) {
+            sdf <- callJMethod(x@sdf, "withWatermark", eventTime, delayThreshold)
+            dataFrame(sdf)
+          })
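
How the new pieces fit together: a minimal usage sketch, not part of this commit, assuming a running SparkR session, a hypothetical jsonDir of JSON input with "name" (string) and "time" (timestamp) columns, and hypothetical /tmp output and checkpoint paths.

sparkR.session()
schema <- structType(structField("name", "string"), structField("time", "timestamp"))
events <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = 1)

# Tolerate events arriving up to 10 minutes after the latest event time seen so far,
# then count per name and 5-minute event-time window.
events <- withWatermark(events, "time", "10 minutes")
counts <- count(group_by(events, events$name, window(events$time, "5 minutes")))

# File sinks use 'append' mode; output is laid out in name=<value>/ subdirectories,
# and a micro-batch is attempted every 30 seconds. Paths are placeholders.
q <- write.stream(counts, "parquet",
                  path = "/tmp/stream-out",              # hypothetical output directory
                  checkpointLocation = "/tmp/stream-cp", # hypothetical checkpoint directory
                  outputMode = "append",
                  partitionBy = "name",
                  trigger.processingTime = "30 seconds")
awaitTermination(q, 10 * 1000)
stopQuery(q)

An alternative is trigger.once = TRUE, which processes whatever input is available as a single batch and then stops the query; only one of the two trigger arguments may be supplied per call.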

R/pkg/R/SQLContext.R

Lines changed: 3 additions & 1 deletion
@@ -727,7 +727,9 @@ read.jdbc <- function(url, tableName,
 #' @param schema The data schema defined in structType or a DDL-formatted string, this is
 #'        required for file-based streaming data source
 #' @param ... additional external data source specific named options, for instance \code{path} for
-#'        file-based streaming data source
+#'        file-based streaming data source. \code{timeZone} to indicate a timezone to be used to
+#'        parse timestamps in the JSON/CSV data sources or partition values; If it isn't set, it
+#'        uses the default value, session local timezone.
 #' @return SparkDataFrame
 #' @rdname read.stream
 #' @name read.stream
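
A usage sketch for the newly documented timeZone option, not part of this commit; the CSV directory, DDL schema string, and header option below are illustrative placeholders.

# read a stream of CSV files whose "ts" column should be parsed as UTC
# rather than the session local time zone (hypothetical input directory)
df <- read.stream("csv", path = "/tmp/csv-in",
                  schema = "ts TIMESTAMP, value DOUBLE",
                  header = "true",
                  timeZone = "UTC")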

R/pkg/R/generics.R

Lines changed: 6 additions & 0 deletions
@@ -799,6 +799,12 @@ setGeneric("withColumn", function(x, colName, col) { standardGeneric("withColumn
 setGeneric("withColumnRenamed",
            function(x, existingCol, newCol) { standardGeneric("withColumnRenamed") })

+#' @rdname withWatermark
+#' @export
+setGeneric("withWatermark", function(x, eventTime, delayThreshold) {
+  standardGeneric("withWatermark")
+})
+
 #' @rdname write.df
 #' @export
 setGeneric("write.df", function(df, path = NULL, ...) { standardGeneric("write.df") })

R/pkg/tests/fulltests/test_streaming.R

Lines changed: 107 additions & 0 deletions
@@ -172,6 +172,113 @@ test_that("Terminated by error", {
   stopQuery(q)
 })

+test_that("PartitionBy", {
+  parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet")
+  checkpointPath <- tempfile(pattern = "sparkr-test", fileext = ".checkpoint")
+  textPath <- tempfile(pattern = "sparkr-test", fileext = ".text")
+  df <- read.df(jsonPath, "json", stringSchema)
+  write.df(df, parquetPath, "parquet", "overwrite")
+
+  df <- read.stream(path = parquetPath, schema = stringSchema)
+
+  expect_error(write.stream(df, "json", path = textPath, checkpointLocation = "append",
+                            partitionBy = c(1, 2)),
+               "All partitionBy column names should be characters")
+
+  q <- write.stream(df, "json", path = textPath, checkpointLocation = "append",
+                    partitionBy = "name")
+  awaitTermination(q, 5 * 1000)
+  callJMethod(q@ssq, "processAllAvailable")
+
+  dirs <- list.files(textPath)
+  expect_equal(length(dirs[substring(dirs, 1, nchar("name=")) == "name="]), 3)
+
+  unlink(checkpointPath)
+  unlink(textPath)
+  unlink(parquetPath)
+})
+
+test_that("Watermark", {
+  parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet")
+  schema <- structType(structField("value", "string"))
+  t <- Sys.time()
+  df <- as.DataFrame(lapply(list(t), as.character), schema)
+  write.df(df, parquetPath, "parquet", "append")
+  df <- read.stream(path = parquetPath, schema = "value STRING")
+  df <- withColumn(df, "eventTime", cast(df$value, "timestamp"))
+  df <- withWatermark(df, "eventTime", "10 seconds")
+  counts <- count(group_by(df, "eventTime"))
+  q <- write.stream(counts, "memory", queryName = "times", outputMode = "append")
+
+  # first events
+  df <- as.DataFrame(lapply(list(t + 1, t, t + 2), as.character), schema)
+  write.df(df, parquetPath, "parquet", "append")
+  awaitTermination(q, 5 * 1000)
+  callJMethod(q@ssq, "processAllAvailable")
+
+  # advance watermark to 15
+  df <- as.DataFrame(lapply(list(t + 25), as.character), schema)
+  write.df(df, parquetPath, "parquet", "append")
+  awaitTermination(q, 5 * 1000)
+  callJMethod(q@ssq, "processAllAvailable")
+
+  # old events, should be dropped
+  df <- as.DataFrame(lapply(list(t), as.character), schema)
+  write.df(df, parquetPath, "parquet", "append")
+  awaitTermination(q, 5 * 1000)
+  callJMethod(q@ssq, "processAllAvailable")
+
+  # evict events less than previous watermark
+  df <- as.DataFrame(lapply(list(t + 25), as.character), schema)
+  write.df(df, parquetPath, "parquet", "append")
+  awaitTermination(q, 5 * 1000)
+  callJMethod(q@ssq, "processAllAvailable")
+
+  times <- collect(sql("SELECT * FROM times"))
+  # looks like write timing can affect the first bucket; but it should be t
+  expect_equal(times[order(times$eventTime),][1, 2], 2)
+
+  stopQuery(q)
+  unlink(parquetPath)
+})
+
+test_that("Trigger", {
+  parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet")
+  schema <- structType(structField("value", "string"))
+  df <- as.DataFrame(lapply(list(Sys.time()), as.character), schema)
+  write.df(df, parquetPath, "parquet", "append")
+  df <- read.stream(path = parquetPath, schema = "value STRING")
+
+  expect_error(write.stream(df, "memory", queryName = "times", outputMode = "append",
+               trigger.processingTime = "", trigger.once = ""), "Multiple triggers not allowed.")
+
+  expect_error(write.stream(df, "memory", queryName = "times", outputMode = "append",
+               trigger.processingTime = ""),
+               "Value for trigger.processingTime must be a non-empty string.")
+
+  expect_error(write.stream(df, "memory", queryName = "times", outputMode = "append",
+               trigger.processingTime = "invalid"), "illegal argument")
+
+  expect_error(write.stream(df, "memory", queryName = "times", outputMode = "append",
+               trigger.once = ""), "Value for trigger.once must be TRUE.")
+
+  expect_error(write.stream(df, "memory", queryName = "times", outputMode = "append",
+               trigger.once = FALSE), "Value for trigger.once must be TRUE.")
+
+  q <- write.stream(df, "memory", queryName = "times", outputMode = "append", trigger.once = TRUE)
+  awaitTermination(q, 5 * 1000)
+  callJMethod(q@ssq, "processAllAvailable")
+  df <- as.DataFrame(lapply(list(Sys.time()), as.character), schema)
+  write.df(df, parquetPath, "parquet", "append")
+  awaitTermination(q, 5 * 1000)
+  callJMethod(q@ssq, "processAllAvailable")
+
+  expect_equal(nrow(collect(sql("SELECT * FROM times"))), 1)
+
+  stopQuery(q)
+  unlink(parquetPath)
+})
+
 unlink(jsonPath)
 unlink(jsonPathNa)

python/pyspark/sql/streaming.py

Lines changed: 4 additions & 0 deletions
@@ -793,6 +793,10 @@ def trigger(self, processingTime=None, once=None):
         .. note:: Evolving.

         :param processingTime: a processing time interval as a string, e.g. '5 seconds', '1 minute'.
+                               Set a trigger that runs a query periodically based on the processing
+                               time. Only one trigger can be set.
+        :param once: if set to True, set a trigger that processes only one batch of data in a
+                     streaming query then terminates the query. Only one trigger can be set.

         >>> # trigger the query for execution every 5 seconds
         >>> writer = sdf.writeStream.trigger(processingTime='5 seconds')

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala

Lines changed: 1 addition & 1 deletion
@@ -21,7 +21,7 @@ import org.apache.spark.annotation.{Experimental, InterfaceStability}
 import org.apache.spark.sql.streaming.Trigger

 /**
- * A [[Trigger]] that process only one batch of data in a streaming query then terminates
+ * A [[Trigger]] that processes only one batch of data in a streaming query then terminates
  * the query.
  */
 @Experimental
