@@ -2297,6 +2297,7 @@ setClassUnion("characterOrColumn", c("character", "Column"))
 #' @param ... additional sorting fields
 #' @param decreasing a logical argument indicating sorting order for columns when
 #'                   a character vector is specified for col
+#' @param withinPartitions a logical argument indicating whether to sort only within each partition
 #' @return A SparkDataFrame where all elements are sorted.
 #' @family SparkDataFrame functions
 #' @aliases arrange,SparkDataFrame,Column-method
@@ -2312,16 +2313,21 @@ setClassUnion("characterOrColumn", c("character", "Column"))
 #' arrange(df, asc(df$col1), desc(abs(df$col2)))
 #' arrange(df, "col1", decreasing = TRUE)
 #' arrange(df, "col1", "col2", decreasing = c(TRUE, FALSE))
+#' arrange(df, "col1", "col2", withinPartitions = TRUE)
 #' }
 #' @note arrange(SparkDataFrame, Column) since 1.4.0
 setMethod("arrange",
           signature(x = "SparkDataFrame", col = "Column"),
-          function(x, col, ...) {
+          function(x, col, ..., withinPartitions = FALSE) {
             jcols <- lapply(list(col, ...), function(c) {
               c@jc
             })

-            sdf <- callJMethod(x@sdf, "sort", jcols)
+            if (withinPartitions) {
+              sdf <- callJMethod(x@sdf, "sortWithinPartitions", jcols)
+            } else {
+              sdf <- callJMethod(x@sdf, "sort", jcols)
+            }
             dataFrame(sdf)
           })

@@ -2332,7 +2338,7 @@ setMethod("arrange",
 #' @note arrange(SparkDataFrame, character) since 1.4.0
 setMethod("arrange",
           signature(x = "SparkDataFrame", col = "character"),
-          function(x, col, ..., decreasing = FALSE) {
+          function(x, col, ..., decreasing = FALSE, withinPartitions = FALSE) {

             # all sorting columns
             by <- list(col, ...)
@@ -2356,7 +2362,7 @@ setMethod("arrange",
               }
             })

-            do.call("arrange", c(x, jcols))
+            do.call("arrange", c(x, jcols, withinPartitions = withinPartitions))
           })

 #' @rdname arrange
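Below is a short usage sketch for the new withinPartitions argument (the data frame and column names are illustrative, and a running SparkR session is assumed). With withinPartitions = TRUE, arrange delegates to the JVM-side sortWithinPartitions, so each partition is ordered independently and no global shuffle is performed; with the default FALSE it performs the usual global sort.

# Illustrative only: assumes an active SparkR session.
df <- createDataFrame(data.frame(col1 = c(3, 1, 2), col2 = c("b", "c", "a")))

# Global sort: a single total ordering across all partitions.
globallySorted <- arrange(df, "col1", "col2")

# Per-partition sort: rows are ordered only inside each partition
# (maps to sortWithinPartitions on the underlying Dataset), avoiding a full shuffle.
locallySorted <- arrange(df, "col1", "col2", withinPartitions = TRUE)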
@@ -3655,7 +3661,8 @@ setMethod("getNumPartitions",
 #' isStreaming
 #'
 #' Returns TRUE if this SparkDataFrame contains one or more sources that continuously return data
-#' as it arrives.
+#' as it arrives. A dataset that reads data from a streaming source must be executed as a
+#' \code{StreamingQuery} using \code{write.stream}.
 #'
 #' @param x A SparkDataFrame
 #' @return TRUE if this SparkDataFrame is from a streaming source
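As a minimal sketch of the behaviour described above (the input path and schema are placeholder assumptions): a SparkDataFrame created with read.stream reports isStreaming(...) as TRUE and is only actually executed once it is started as a StreamingQuery through write.stream.

# Placeholder path and schema; illustrative only.
schema <- structType(structField("value", "string"))
streamDF <- read.stream("json", path = "/tmp/stream-in", schema = schema)

isStreaming(streamDF)              # TRUE: backed by a streaming source
isStreaming(createDataFrame(iris)) # FALSE: an ordinary batch SparkDataFrame

# The streaming SparkDataFrame is not computed until started as a StreamingQuery.
q <- write.stream(streamDF, "console", outputMode = "append")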
@@ -3701,7 +3708,17 @@ setMethod("isStreaming",
 #' @param df a streaming SparkDataFrame.
 #' @param source a name for external data source.
 #' @param outputMode one of 'append', 'complete', 'update'.
-#' @param ... additional argument(s) passed to the method.
+#' @param partitionBy a name or a list of names of columns to partition the output by on the file
+#'        system. If specified, the output is laid out on the file system similar to Hive's
+#'        partitioning scheme.
+#' @param trigger.processingTime a processing time interval as a string, e.g. '5 seconds',
+#'        '1 minute'. This is a trigger that runs a query periodically based on the processing
+#'        time. If the value is '0 seconds', the query runs as fast as possible; this is the
+#'        default. Only one trigger can be set.
+#' @param trigger.once a logical that must be set to \code{TRUE}. This is a trigger that processes
+#'        only one batch of data in a streaming query and then terminates the query. Only one
+#'        trigger can be set.
+#' @param ... additional external data source specific named options.
 #'
 #' @family SparkDataFrame functions
 #' @seealso \link{read.stream}
@@ -3719,7 +3736,8 @@ setMethod("isStreaming",
 #' # console
 #' q <- write.stream(wordCounts, "console", outputMode = "complete")
 #' # text stream
-#' q <- write.stream(df, "text", path = "/home/user/out", checkpointLocation = "/home/user/cp")
+#' q <- write.stream(df, "text", path = "/home/user/out", checkpointLocation = "/home/user/cp",
+#'                   partitionBy = c("year", "month"), trigger.processingTime = "30 seconds")
 #' # memory stream
 #' q <- write.stream(wordCounts, "memory", queryName = "outs", outputMode = "complete")
 #' head(sql("SELECT * from outs"))
@@ -3731,7 +3749,8 @@ setMethod("isStreaming",
 #' @note experimental
 setMethod("write.stream",
           signature(df = "SparkDataFrame"),
-          function(df, source = NULL, outputMode = NULL, ...) {
+          function(df, source = NULL, outputMode = NULL, partitionBy = NULL,
+                   trigger.processingTime = NULL, trigger.once = NULL, ...) {
             if (!is.null(source) && !is.character(source)) {
               stop("source should be character, NULL or omitted. It is the data source specified ",
                    "in 'spark.sql.sources.default' configuration by default.")
@@ -3742,12 +3761,43 @@ setMethod("write.stream",
             if (is.null(source)) {
               source <- getDefaultSqlSource()
             }
+            cols <- NULL
+            if (!is.null(partitionBy)) {
+              if (!all(sapply(partitionBy, function(c) { is.character(c) }))) {
+                stop("All partitionBy column names should be characters.")
+              }
+              cols <- as.list(partitionBy)
+            }
+            jtrigger <- NULL
+            if (!is.null(trigger.processingTime) && !is.na(trigger.processingTime)) {
+              if (!is.null(trigger.once)) {
+                stop("Multiple triggers not allowed.")
+              }
+              interval <- as.character(trigger.processingTime)
+              if (nchar(interval) == 0) {
+                stop("Value for trigger.processingTime must be a non-empty string.")
+              }
+              jtrigger <- handledCallJStatic("org.apache.spark.sql.streaming.Trigger",
+                                             "ProcessingTime",
+                                             interval)
+            } else if (!is.null(trigger.once) && !is.na(trigger.once)) {
+              if (!is.logical(trigger.once) || !trigger.once) {
+                stop("Value for trigger.once must be TRUE.")
+              }
+              jtrigger <- callJStatic("org.apache.spark.sql.streaming.Trigger", "Once")
+            }
             options <- varargsToStrEnv(...)
             write <- handledCallJMethod(df@sdf, "writeStream")
             write <- callJMethod(write, "format", source)
             if (!is.null(outputMode)) {
               write <- callJMethod(write, "outputMode", outputMode)
             }
+            if (!is.null(cols)) {
+              write <- callJMethod(write, "partitionBy", cols)
+            }
+            if (!is.null(jtrigger)) {
+              write <- callJMethod(write, "trigger", jtrigger)
+            }
             write <- callJMethod(write, "options", options)
             ssq <- handledCallJMethod(write, "start")
             streamingQuery(ssq)
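The sketch below shows how the new write.stream arguments are meant to compose (paths, schema, and column names are placeholder assumptions): partitionBy takes one or more column names, and at most one of trigger.processingTime / trigger.once may be supplied, otherwise the checks above raise an error.

# Placeholder schema and paths; illustrative only.
schema <- structType(structField("year", "integer"), structField("month", "integer"),
                     structField("value", "double"))
events <- read.stream("json", path = "/tmp/stream-in", schema = schema)

# Periodic trigger with Hive-style partitioned output.
q1 <- write.stream(events, "parquet",
                   path = "/tmp/out", checkpointLocation = "/tmp/cp",
                   partitionBy = c("year", "month"),
                   trigger.processingTime = "30 seconds")

# One-shot trigger: process a single batch of data, then terminate the query.
q2 <- write.stream(events, "parquet",
                   path = "/tmp/out-once", checkpointLocation = "/tmp/cp-once",
                   trigger.once = TRUE)

# Supplying both trigger arguments fails with "Multiple triggers not allowed."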
@@ -3782,6 +3832,33 @@ setMethod("checkpoint",
             dataFrame(df)
           })

+#' localCheckpoint
+#'
+#' Returns a locally checkpointed version of this SparkDataFrame. Checkpointing can be used to
+#' truncate the logical plan, which is especially useful in iterative algorithms where the plan
+#' may grow exponentially. Local checkpoints are stored in the executors using the caching
+#' subsystem and therefore they are not reliable.
+#'
+#' @param x A SparkDataFrame
+#' @param eager whether to locally checkpoint this SparkDataFrame immediately
+#' @return a new locally checkpointed SparkDataFrame
+#' @family SparkDataFrame functions
+#' @aliases localCheckpoint,SparkDataFrame-method
+#' @rdname localCheckpoint
+#' @name localCheckpoint
+#' @export
+#' @examples
+#' \dontrun{
+#' df <- localCheckpoint(df)
+#' }
+#' @note localCheckpoint since 2.3.0
+setMethod("localCheckpoint",
+          signature(x = "SparkDataFrame"),
+          function(x, eager = TRUE) {
+            df <- callJMethod(x@sdf, "localCheckpoint", as.logical(eager))
+            dataFrame(df)
+          })
+
 #' cube
 #'
 #' Create a multi-dimensional cube for the SparkDataFrame using the specified columns.
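A usage sketch for the localCheckpoint method added above (the loop and transformation are made-up illustrations): periodically truncating the lineage keeps the logical plan from growing without bound, and because local checkpoints rely on executor caches, no checkpoint directory has to be configured, unlike checkpoint.

# Illustrative iterative job; the data and transformation are assumptions.
df <- createDataFrame(data.frame(x = 1:100))
for (i in 1:10) {
  df <- withColumn(df, "x", df$x + 1)
  if (i %% 5 == 0) {
    # eager = TRUE (the default) materializes the checkpoint immediately;
    # eager = FALSE defers it until the SparkDataFrame is first computed.
    df <- localCheckpoint(df, eager = TRUE)
  }
}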
@@ -3934,3 +4011,47 @@ setMethod("broadcast",
             sdf <- callJStatic("org.apache.spark.sql.functions", "broadcast", x@sdf)
             dataFrame(sdf)
           })
+
+#' withWatermark
+#'
+#' Defines an event time watermark for this streaming SparkDataFrame. A watermark tracks a point in
+#' time before which we assume no more late data is going to arrive.
+#'
+#' Spark will use this watermark for several purposes:
+#' \itemize{
+#'  \item To know when a given time window aggregation can be finalized and thus can be emitted
+#'  when using output modes that do not allow updates.
+#'  \item To minimize the amount of state that we need to keep for on-going aggregations.
+#' }
+#' The current watermark is computed by looking at the \code{MAX(eventTime)} seen across
+#' all of the partitions in the query minus a user-specified \code{delayThreshold}. Due to the cost
+#' of coordinating this value across partitions, the actual watermark used is only guaranteed
+#' to be at least \code{delayThreshold} behind the actual event time. In some cases we may still
+#' process records that arrive more than \code{delayThreshold} late.
+#'
+#' @param x a streaming SparkDataFrame
+#' @param eventTime a string specifying the name of the Column that contains the event time of the
+#'                  row.
+#' @param delayThreshold a string specifying the minimum delay to wait for data to arrive late,
+#'                       relative to the latest record that has been processed, in the form of an
+#'                       interval (e.g. "1 minute" or "5 hours"). NOTE: This should not be negative.
+#' @return a SparkDataFrame.
+#' @aliases withWatermark,SparkDataFrame,character,character-method
+#' @family SparkDataFrame functions
+#' @rdname withWatermark
+#' @name withWatermark
+#' @export
+#' @examples
+#' \dontrun{
+#' sparkR.session()
+#' schema <- structType(structField("time", "timestamp"), structField("value", "double"))
+#' df <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = 1)
+#' df <- withWatermark(df, "time", "10 minutes")
+#' }
+#' @note withWatermark since 2.3.0
+setMethod("withWatermark",
+          signature(x = "SparkDataFrame", eventTime = "character", delayThreshold = "character"),
+          function(x, eventTime, delayThreshold) {
+            sdf <- callJMethod(x@sdf, "withWatermark", eventTime, delayThreshold)
+            dataFrame(sdf)
+          })
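To round this out, here is a sketch of how the watermark is typically combined with a windowed aggregation (the input path and schema are placeholder assumptions): with a 10-minute watermark, state for windows older than the watermark can be dropped, and in append output mode a window's counts are emitted only after the watermark passes the end of that window.

# Placeholder path and schema; illustrative only.
schema <- structType(structField("time", "timestamp"), structField("value", "double"))
events <- read.stream("json", path = "/tmp/events", schema = schema, maxFilesPerTrigger = 1)
events <- withWatermark(events, "time", "10 minutes")

# Count events per 5-minute event-time window; rows later than the watermark are dropped.
counts <- count(groupBy(events, window(events$time, "5 minutes")))

# In append mode a window is emitted only once the watermark moves past its end.
q <- write.stream(counts, "console", outputMode = "append")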