6 changes: 1 addition & 5 deletions R/pkg/R/mllib_clustering.R
@@ -498,11 +498,7 @@ setMethod("write.ml", signature(object = "KMeansModel", path = "character"),
#' @export
#' @examples
#' \dontrun{
#' # nolint start
#' # An example "path/to/file" can be
#' # paste0(Sys.getenv("SPARK_HOME"), "/data/mllib/sample_lda_libsvm_data.txt")
#' # nolint end
#' text <- read.df("path/to/file", source = "libsvm")
#' text <- read.df("data/mllib/sample_lda_libsvm_data.txt", source = "libsvm")
#' model <- spark.lda(data = text, optimizer = "em")
#'
#' # get a summary of the model
4 changes: 4 additions & 0 deletions R/pkg/R/mllib_fpm.R
@@ -27,6 +27,10 @@ setClass("FPGrowthModel", slots = list(jobj = "jobj"))
#' FP-growth
#'
#' A parallel FP-growth algorithm to mine frequent itemsets.
#' \code{spark.fpGrowth} fits an FP-growth model on a SparkDataFrame. Users can call
#' \code{spark.freqItemsets} to get frequent itemsets, \code{spark.associationRules} to get
#' association rules, \code{predict} to make predictions on new data based on generated association
#' rules, and \code{write.ml}/\code{read.ml} to save/load fitted models.
#' For more details, see
#' \href{https://spark.apache.org/docs/latest/mllib-frequent-pattern-mining.html#fp-growth}{
#' FP-growth}.
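
For readers more familiar with the Scala side of MLlib, a rough sketch of the equivalent `org.apache.spark.ml.fpm.FPGrowth` usage follows; the input DataFrame and the threshold values are illustrative only, and an active SparkSession `spark` is assumed:

```scala
import org.apache.spark.ml.fpm.FPGrowth

// Illustrative transactions; each row carries an array<string> "items" column.
val transactions = spark.createDataFrame(Seq(
  (0, Array("a", "b", "c")),
  (1, Array("a", "b")),
  (2, Array("a", "c"))
)).toDF("id", "items")

// Illustrative thresholds; tune minSupport/minConfidence for real data.
val fpgrowth = new FPGrowth()
  .setItemsCol("items")
  .setMinSupport(0.5)
  .setMinConfidence(0.6)

val model = fpgrowth.fit(transactions)

model.freqItemsets.show()            // analogous to spark.freqItemsets(model) in R
model.associationRules.show()        // analogous to spark.associationRules(model) in R
model.transform(transactions).show() // analogous to predict(model, df) in R
```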
19 changes: 16 additions & 3 deletions core/src/main/resources/org/apache/spark/ui/static/log-view.js
@@ -51,13 +51,26 @@ function noNewAlert() {
window.setTimeout(function () {alert.css("display", "none");}, 4000);
}


function getRESTEndPoint() {
// If the worker is served from the master through a proxy (see doc on spark.ui.reverseProxy),
// we need to retain the leading ../proxy/<workerid>/ part of the URL when making REST requests.
// Similar logic is contained in executorspage.js function createRESTEndPoint.
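// For example (hypothetical URL), a document.baseURI of
// http://master:8080/proxy/worker-20170101-123456/logPage/ resolves to the
// endpoint http://master:8080/proxy/worker-20170101-123456/log below, while a
// worker UI served directly falls back to the plain /log endpoint.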
var words = document.baseURI.split('/');
var ind = words.indexOf("proxy");
if (ind > 0) {
return words.slice(0, ind + 2).join('/') + "/log";
}
return "/log"
}

function loadMore() {
var offset = Math.max(startByte - byteLength, 0);
var moreByteLength = Math.min(byteLength, startByte);

$.ajax({
type: "GET",
url: "/log" + baseParams + "&offset=" + offset + "&byteLength=" + moreByteLength,
url: getRESTEndPoint() + baseParams + "&offset=" + offset + "&byteLength=" + moreByteLength,
success: function (data) {
var oldHeight = $(".log-content")[0].scrollHeight;
var newlineIndex = data.indexOf('\n');
@@ -83,14 +96,14 @@
function loadNew() {
$.ajax({
type: "GET",
url: "/log" + baseParams + "&byteLength=0",
url: getRESTEndPoint() + baseParams + "&byteLength=0",
success: function (data) {
var dataInfo = data.substring(0, data.indexOf('\n')).match(/\d+/g);
var newDataLen = dataInfo[2] - totalLogLength;
if (newDataLen != 0) {
$.ajax({
type: "GET",
url: "/log" + baseParams + "&byteLength=" + newDataLen,
url: getRESTEndPoint() + baseParams + "&byteLength=" + newDataLen,
success: function (data) {
var newlineIndex = data.indexOf('\n');
var dataInfo = data.substring(0, newlineIndex).match(/\d+/g);
@@ -147,6 +147,14 @@ private[spark] class TypedConfigBuilder[T](
}
}

/** Creates a [[ConfigEntry]] with a function to determine the default value */
def createWithDefaultFunction(defaultFunc: () => T): ConfigEntry[T] = {
val entry = new ConfigEntryWithDefaultFunction[T](parent.key, defaultFunc, converter,
stringConverter, parent._doc, parent._public)
parent._onCreate.foreach(_ (entry))
entry
}

/**
* Creates a [[ConfigEntry]] that has a default value. The default value is provided as a
* [[String]] and must be a valid value for the entry.
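
A minimal sketch of how the new builder method behaves (the config key below is hypothetical, and this assumes code inside Spark where the internal `ConfigBuilder` and `SparkConf.get(entry)` APIs are accessible). Unlike `createWithDefault`, the default is recomputed on every read, which the new unit test at the end of this change also exercises:

```scala
// Hypothetical entry whose default tracks a mutable value at read time.
var currentDefault = 8

val EXAMPLE_PARALLELISM = ConfigBuilder("spark.example.parallelism")
  .intConf
  .createWithDefaultFunction(() => currentDefault)

val conf = new SparkConf()
assert(conf.get(EXAMPLE_PARALLELISM) == 8)   // default function evaluated on this read
currentDefault = 16
assert(conf.get(EXAMPLE_PARALLELISM) == 16)  // and re-evaluated on the next read
```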
@@ -78,7 +78,24 @@ private class ConfigEntryWithDefault[T] (
def readFrom(reader: ConfigReader): T = {
reader.get(key).map(valueConverter).getOrElse(_defaultValue)
}
}

private class ConfigEntryWithDefaultFunction[T] (
key: String,
_defaultFunction: () => T,
valueConverter: String => T,
stringConverter: T => String,
doc: String,
isPublic: Boolean)
extends ConfigEntry(key, valueConverter, stringConverter, doc, isPublic) {

override def defaultValue: Option[T] = Some(_defaultFunction())

override def defaultValueString: String = stringConverter(_defaultFunction())

def readFrom(reader: ConfigReader): T = {
reader.get(key).map(valueConverter).getOrElse(_defaultFunction())
}
}

private class ConfigEntryWithDefaultString[T] (
@@ -42,7 +42,7 @@ private[ui] class StagesTab(parent: SparkUI) extends SparkUITab(parent, "stages"
val stageId = Option(request.getParameter("id")).map(_.toInt)
stageId.foreach { id =>
if (progressListener.activeStages.contains(id)) {
sc.foreach(_.cancelStage(id))
sc.foreach(_.cancelStage(id, "killed via the Web UI"))
// Do a quick pause here to give Spark time to kill the stage so it shows up as
// killed after the refresh. Note that this will block the serving thread so the
// time should be limited in duration.
@@ -251,4 +251,13 @@ class ConfigEntrySuite extends SparkFunSuite {
.createWithDefault(null)
testEntryRef(nullConf, ref(nullConf))
}

test("conf entry : default function") {
var data = 0
val conf = new SparkConf()
val iConf = ConfigBuilder(testKey("intval")).intConf.createWithDefaultFunction(() => data)
assert(conf.get(iConf) === 0)
data = 2
assert(conf.get(iConf) === 2)
}
}
98 changes: 89 additions & 9 deletions docs/structured-streaming-programming-guide.md
@@ -1,6 +1,6 @@
---
layout: global
displayTitle: Structured Streaming Programming Guide [Alpha]
displayTitle: Structured Streaming Programming Guide [Experimental]
title: Structured Streaming Programming Guide
---

@@ -871,6 +871,65 @@ streamingDf.join(staticDf, "type", "right_join") # right outer join with a stat
</div>
</div>

### Streaming Deduplication
You can deduplicate records in data streams using a unique identifier in the events. This is exactly the same as deduplication on static DataFrames using a unique identifier column. The query will store the necessary amount of data from previous records so that it can filter duplicate records. Similar to aggregations, you can use deduplication with or without watermarking.

- *With watermark* - If there is an upper bound on how late a duplicate record may arrive, then you can define a watermark on an event time column and deduplicate using both the guid and the event time columns. The query will use the watermark to remove old state data from past records that are not expected to get any duplicates any more. This bounds the amount of state the query has to maintain.

- *Without watermark* - Since there are no bounds on when a duplicate record may arrive, the query stores the data from all the past records as state.

<div class="codetabs">
<div data-lang="scala" markdown="1">

{% highlight scala %}
val streamingDf = spark.readStream. ... // columns: guid, eventTime, ...

// Without watermark using guid column
streamingDf.dropDuplicates("guid")

// With watermark using guid and eventTime columns
streamingDf
.withWatermark("eventTime", "10 seconds")
.dropDuplicates("guid", "eventTime")
{% endhighlight %}

</div>
<div data-lang="java" markdown="1">

{% highlight java %}
Dataset<Row> streamingDf = spark.readStream. ...; // columns: guid, eventTime, ...

// Without watermark using guid column
streamingDf.dropDuplicates("guid");

// With watermark using guid and eventTime columns
streamingDf
.withWatermark("eventTime", "10 seconds")
.dropDuplicates("guid", "eventTime");
{% endhighlight %}


</div>
<div data-lang="python" markdown="1">

{% highlight python %}
streamingDf = spark.readStream. ...

# Without watermark using guid column
streamingDf.dropDuplicates("guid")

# With watermark using guid and eventTime columns
streamingDf \
.withWatermark("eventTime", "10 seconds") \
.dropDuplicates("guid", "eventTime")
{% endhighlight %}

</div>
</div>

### Arbitrary Stateful Operations
Many use cases require more advanced stateful operations than aggregations. For example, in many use cases, you have to track sessions from data streams of events. To do such sessionization, you will have to save arbitrary types of data as state, and perform arbitrary operations on the state using the data stream events in every trigger. Since Spark 2.2, this can be done using the operation `mapGroupsWithState` and the more powerful operation `flatMapGroupsWithState`. Both operations allow you to apply user-defined code on grouped Datasets to update user-defined state. For more concrete details, take a look at the API documentation ([Scala](api/scala/index.html#org.apache.spark.sql.streaming.GroupState)/[Java](api/java/org/apache/spark/sql/streaming/GroupState.html)) and the examples ([Scala]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala)/[Java]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java)).
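
As a rough sketch of what `mapGroupsWithState` looks like in practice — the `Event`, `SessionInfo`, and `SessionUpdate` case classes, the streaming `events` Dataset, and the session key are hypothetical, and an active SparkSession `spark` is assumed — per-session event counting might be written as:

{% highlight scala %}
import org.apache.spark.sql.streaming.GroupState
import spark.implicits._

// Hypothetical types for illustration only.
case class Event(sessionId: String, timestamp: java.sql.Timestamp)
case class SessionInfo(numEvents: Int)
case class SessionUpdate(sessionId: String, numEvents: Int)

// events is assumed to be a streaming Dataset[Event].
val sessionUpdates = events
  .groupByKey(_.sessionId)
  .mapGroupsWithState[SessionInfo, SessionUpdate] {
    (sessionId: String, eventsInGroup: Iterator[Event], state: GroupState[SessionInfo]) =>
      // Merge the previously saved state (if any) with the events seen in this trigger.
      val count = state.getOption.map(_.numEvents).getOrElse(0) + eventsInGroup.size
      state.update(SessionInfo(count))
      SessionUpdate(sessionId, count)
  }
{% endhighlight %}

The resulting `sessionUpdates` Dataset can then be written out with `writeStream` in Update output mode like any other streaming result (see the compatibility matrix below).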

### Unsupported Operations
There are a few DataFrame/Dataset operations that are not supported with streaming DataFrames/Datasets.
Some of them are as follows.
@@ -891,7 +950,7 @@ Some of them are as follows.

+ Right outer join with a streaming Dataset on the left is not supported

- Any kind of joins between two streaming Datasets are not yet supported.
- Any kind of joins between two streaming Datasets is not yet supported.

In addition, there are some Dataset methods that will not work on streaming Datasets. They are actions that will immediately run queries and return results, which does not make sense on a streaming Dataset. Rather, those functionalities can be done by explicitly starting a streaming query (see the next section regarding that).

@@ -951,13 +1010,6 @@ Here is the compatibility matrix.
<th>Supported Output Modes</th>
<th>Notes</th>
</tr>
<tr>
<td colspan="2" style="vertical-align: middle;">Queries without aggregation</td>
<td style="vertical-align: middle;">Append, Update</td>
<td style="vertical-align: middle;">
Complete mode not supported as it is infeasible to keep all data in the Result Table.
</td>
</tr>
<tr>
<td rowspan="2" style="vertical-align: middle;">Queries with aggregation</td>
<td style="vertical-align: middle;">Aggregation on event-time with watermark</td>
@@ -986,6 +1038,33 @@ Here is the compatibility matrix.
this mode.
</td>
</tr>
<tr>
<td colspan="2" style="vertical-align: middle;">Queries with <code>mapGroupsWithState</code></td>
<td style="vertical-align: middle;">Update</td>
<td style="vertical-align: middle;"></td>
</tr>
<tr>
<td rowspan="2" style="vertical-align: middle;">Queries with <code>flatMapGroupsWithState</code></td>
<td style="vertical-align: middle;">Append operation mode</td>
<td style="vertical-align: middle;">Append</td>
<td style="vertical-align: middle;">
Aggregations are allowed after <code>flatMapGroupsWithState</code>.
</td>
</tr>
<tr>
<td style="vertical-align: middle;">Update operation mode</td>
<td style="vertical-align: middle;">Update</td>
<td style="vertical-align: middle;">
Aggregations not allowed after <code>flatMapGroupsWithState</code>.
</td>
</tr>
<tr>
<td colspan="2" style="vertical-align: middle;">Other queries</td>
<td style="vertical-align: middle;">Append, Update</td>
<td style="vertical-align: middle;">
Complete mode not supported as it is infeasible to keep all unaggregated data in the Result Table.
</td>
</tr>
<tr>
<td></td>
<td></td>
@@ -994,6 +1073,7 @@ Here is the compatibility matrix.
</tr>
</table>


#### Output Sinks
There are a few types of built-in output sinks.
