1 change: 1 addition & 0 deletions docs/ss-migration-guide.md
@@ -30,3 +30,4 @@ Please refer [Migration Guide: SQL, Datasets and DataFrame](sql-migration-guide.

- In Spark 3.0, Structured Streaming forces the source schema into nullable when file-based datasources such as text, json, csv, parquet and orc are used via `spark.readStream(...)`. Previously, it respected the nullability in the source schema; however, that caused issues that were tricky to debug with NPEs. To restore the previous behavior, set `spark.sql.streaming.fileSource.schema.forceNullable` to `false`.

- Spark 3.0 fixes a correctness issue in stream-stream outer join, which changes the schema of state (see SPARK-26154 for more details). Spark 3.0 will fail the query if you start it from a checkpoint constructed by Spark 2.x that uses stream-stream outer join. Please discard the checkpoint and replay previous inputs to recalculate outputs.
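  For illustration, a minimal sketch of the kind of query this note applies to — a stream-stream left outer join with watermarks. Source formats, column names, and the checkpoint path below are hypothetical, loosely following the Structured Streaming guide:

  ```scala
  import org.apache.spark.sql.functions.expr

  // Hypothetical input streams; `spark` is an existing SparkSession.
  val impressions = spark.readStream.format("rate").load()
    .selectExpr("value AS impressionId", "timestamp AS impressionTime")
    .withWatermark("impressionTime", "10 seconds")
  val clicks = spark.readStream.format("rate").load()
    .selectExpr("value AS clickId", "timestamp AS clickTime")
    .withWatermark("clickTime", "20 seconds")

  // Stream-stream outer join: its state schema changed in Spark 3.0, so a
  // checkpoint written by Spark 2.x for such a query must be discarded.
  val joined = impressions.join(
    clicks,
    expr("impressionId = clickId AND " +
      "clickTime >= impressionTime AND clickTime <= impressionTime + interval 1 minute"),
    "leftOuter")

  joined.writeStream
    .format("console")
    .option("checkpointLocation", "/tmp/checkpoints/join-query")  // hypothetical path
    .start()
  ```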
@@ -1069,6 +1069,16 @@ object SQLConf {
.checkValue(v => Set(1, 2).contains(v), "Valid versions are 1 and 2")
.createWithDefault(2)

val STREAMING_JOIN_STATE_FORMAT_VERSION =
buildConf("spark.sql.streaming.join.stateFormatVersion")
Member
Hi, @tgravescs. The incompatibility of the state format was the reason for not having this in branch-2.4.

.internal()
.doc("State format version used by streaming join operations in a streaming query. " +
"State between versions are tend to be incompatible, so state format version shouldn't " +
"be modified after running.")
.intConf
.checkValue(v => Set(1, 2).contains(v), "Valid versions are 1 and 2")
Contributor
`checkValues(Set(1, 2))`

Contributor Author
@HeartSaVioR Oct 25, 2019
Nice suggestion. Btw, the line is actually consistent with the existing ones, STREAMING_AGGREGATION_STATE_FORMAT_VERSION and FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION. Would we like to change them as well, or leave this one consistent with them? Maybe in a follow-up PR then, to exclude unrelated changes.

Contributor
If you want to change the others, feel free to open another PR. But I'd like the better API to be used here.
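For reference, a sketch of what the suggested form would look like — assuming the `checkValues` helper is available on the typed conf builder, as it is elsewhere in `SQLConf`:

```scala
// Hypothetical: the same conf definition using the suggested checkValues helper.
val STREAMING_JOIN_STATE_FORMAT_VERSION =
  buildConf("spark.sql.streaming.join.stateFormatVersion")
    .internal()
    .doc("State format version used by streaming join operations in a streaming query. " +
      "State between versions tends to be incompatible, so the state format version " +
      "shouldn't be modified after running.")
    .intConf
    .checkValues(Set(1, 2))
    .createWithDefault(2)
```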

Contributor
(Ignoring the discussion about removing this in the other comment below.)

Contributor Author
I'd like to leave this as it is and handle them all in the follow-up. That keeps things consistent regardless of whether the follow-up PR is accepted or not.

.createWithDefault(2)

val UNSUPPORTED_OPERATION_CHECK_ENABLED =
buildConf("spark.sql.streaming.unsupportedOperationCheck")
.internal()
@@ -474,8 +474,9 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right, _)
if left.isStreaming && right.isStreaming =>

- new StreamingSymmetricHashJoinExec(
-   leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil
+ val stateVersion = conf.getConf(SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION)
+ new StreamingSymmetricHashJoinExec(leftKeys, rightKeys, joinType, condition,
+   stateVersion, planLater(left), planLater(right)) :: Nil

case Join(left, right, _, _, _) if left.isStreaming && right.isStreaming =>
throw new AnalysisException(
Expand Down
@@ -23,7 +23,7 @@ import org.json4s.jackson.Serialization
import org.apache.spark.internal.Logging
import org.apache.spark.sql.RuntimeConfig
import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2, SparkDataStream}
- import org.apache.spark.sql.execution.streaming.state.{FlatMapGroupsWithStateExecHelper, StreamingAggregationStateManager}
+ import org.apache.spark.sql.execution.streaming.state.{FlatMapGroupsWithStateExecHelper, StreamingAggregationStateManager, SymmetricHashJoinStateManager}
import org.apache.spark.sql.internal.SQLConf.{FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION, _}


@@ -91,7 +91,8 @@ object OffsetSeqMetadata extends Logging {
private implicit val format = Serialization.formats(NoTypeHints)
private val relevantSQLConfs = Seq(
SHUFFLE_PARTITIONS, STATE_STORE_PROVIDER_CLASS, STREAMING_MULTIPLE_WATERMARK_POLICY,
- FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION, STREAMING_AGGREGATION_STATE_FORMAT_VERSION)
+ FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION, STREAMING_AGGREGATION_STATE_FORMAT_VERSION,
+ STREAMING_JOIN_STATE_FORMAT_VERSION)

/**
* Default values of relevant configurations that are used for backward compatibility.
@@ -108,7 +109,9 @@ object OffsetSeqMetadata extends Logging {
FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION.key ->
FlatMapGroupsWithStateExecHelper.legacyVersion.toString,
STREAMING_AGGREGATION_STATE_FORMAT_VERSION.key ->
- StreamingAggregationStateManager.legacyVersion.toString
+ StreamingAggregationStateManager.legacyVersion.toString,
+ STREAMING_JOIN_STATE_FORMAT_VERSION.key ->
+   SymmetricHashJoinStateManager.legacyVersion.toString
)

def apply(json: String): OffsetSeqMetadata = Serialization.read[OffsetSeqMetadata](json)
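// Illustration only (not part of this change): the defaults above exist so that
// a query resumed from an older checkpoint keeps reading its existing state.
// Confs recorded in the checkpoint's offset metadata take precedence; anything
// missing falls back to the legacy default. Hypothetical sketch, assuming the
// default-values map above is named relevantSQLConfDefaultValues:
//   val fromCheckpoint = Map(SHUFFLE_PARTITIONS.key -> "200")   // written by Spark 2.x
//   val effective = relevantSQLConfDefaultValues ++ fromCheckpoint
//   effective(STREAMING_JOIN_STATE_FORMAT_VERSION.key)          // "1", the legacy version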
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper._
import org.apache.spark.sql.execution.streaming.state._
import org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager.KeyToValuePair
import org.apache.spark.sql.internal.SessionState
import org.apache.spark.util.{CompletionIterator, SerializableConfiguration}

@@ -131,6 +132,7 @@ case class StreamingSymmetricHashJoinExec(
stateInfo: Option[StatefulOperatorStateInfo],
eventTimeWatermark: Option[Long],
stateWatermarkPredicates: JoinStateWatermarkPredicates,
stateFormatVersion: Int,
left: SparkPlan,
right: SparkPlan) extends SparkPlan with BinaryExecNode with StateStoreWriter {

@@ -139,13 +141,20 @@
rightKeys: Seq[Expression],
joinType: JoinType,
condition: Option[Expression],
stateFormatVersion: Int,
left: SparkPlan,
right: SparkPlan) = {

this(
leftKeys, rightKeys, joinType, JoinConditionSplitPredicates(condition, left, right),
stateInfo = None, eventTimeWatermark = None,
- stateWatermarkPredicates = JoinStateWatermarkPredicates(), left, right)
+ stateWatermarkPredicates = JoinStateWatermarkPredicates(), stateFormatVersion, left, right)
}

if (stateFormatVersion < 2 && joinType != Inner) {
throw new IllegalArgumentException("The query is using stream-stream outer join with state" +
s" format version ${stateFormatVersion} - a correctness issue has been discovered. Please" +
" discard the checkpoint and rerun the query. See SPARK-26154 for more details.")
}

private def throwBadJoinTypeException(): Nothing = {
@@ -270,20 +279,30 @@ case class StreamingSymmetricHashJoinExec(
// * Getting an iterator over the rows that have aged out on the left side. These rows are
// candidates for being null joined. Note that to avoid doing two passes, this iterator
// removes the rows from the state manager as they're processed.
- // * Checking whether the current row matches a key in the right side state, and that key
- //   has any value which satisfies the filter function when joined. If it doesn't,
- //   we know we can join with null, since there was never (including this batch) a match
- //   within the watermark period. If it does, there must have been a match at some point, so
- //   we know we can't join with null.
+ // * (state format version 1) Checking whether the current row matches a key in the
Contributor Author
I've left the original comment as it is since it still applies to state format version 1, and added a comment explaining state format version 2 and the reason for the change. Please let me know if we'd rather just remove the explanation of state format version 1.

+ //   right side state, and that key has any value which satisfies the filter function when
+ //   joined. If it doesn't, we know we can join with null, since there was never
+ //   (including this batch) a match within the watermark period. If it does, there must have
+ //   been a match at some point, so we know we can't join with null.
+ // * (state format version 2) The approach above turned out to have an edge case that
+ //   causes a correctness issue, so another approach was taken (see SPARK-26154); Spark now
+ //   stores a 'matched' flag along with each row, set to true once any matching row arrives
+ //   on the right.
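// Illustration only (not part of this change): conceptually, version 2 widens the
// state value from "row" to "row plus matched flag" (names below are hypothetical):
//   case class ValueV1(value: UnsafeRow)                     // state format version 1
//   case class ValueV2(value: UnsafeRow, matched: Boolean)   // state format version 2
// This is why an outer-join checkpoint written with version 1 cannot be resumed:
// the persisted layout has no slot for the flag.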

def matchesWithRightSideState(leftKeyValue: UnsafeRowPair) = {
rightSideJoiner.get(leftKeyValue.key).exists { rightValue =>
postJoinFilter(joinedRow.withLeft(leftKeyValue.value).withRight(rightValue))
}
}
val removedRowIter = leftSideJoiner.removeOldState()
- val outerOutputIter = removedRowIter
-   .filterNot(pair => matchesWithRightSideState(pair))
-   .map(pair => joinedRow.withLeft(pair.value).withRight(nullRight))
+ val outerOutputIter = removedRowIter.filterNot { kv =>
+   stateFormatVersion match {
+     case 1 => matchesWithRightSideState(new UnsafeRowPair(kv.key, kv.value))
+     case 2 => kv.matched
+     case _ =>
+       throw new IllegalStateException("Unexpected state format version! " +
+         s"version $stateFormatVersion")
+   }
+ }.map(pair => joinedRow.withLeft(pair.value).withRight(nullRight))

innerOutputIter ++ outerOutputIter
case RightOuter =>
@@ -294,9 +313,15 @@
}
}
val removedRowIter = rightSideJoiner.removeOldState()
- val outerOutputIter = removedRowIter
-   .filterNot(pair => matchesWithLeftSideState(pair))
-   .map(pair => joinedRow.withLeft(nullLeft).withRight(pair.value))
+ val outerOutputIter = removedRowIter.filterNot { kv =>
+   stateFormatVersion match {
+     case 1 => matchesWithLeftSideState(new UnsafeRowPair(kv.key, kv.value))
+     case 2 => kv.matched
+     case _ =>
+       throw new IllegalStateException("Unexpected state format version! " +
+         s"version $stateFormatVersion")
+   }
+ }.map(pair => joinedRow.withLeft(nullLeft).withRight(pair.value))

innerOutputIter ++ outerOutputIter
case _ => throwBadJoinTypeException()
@@ -395,7 +420,8 @@ case class StreamingSymmetricHashJoinExec(
newPredicate(preJoinFilterExpr.getOrElse(Literal(true)), inputAttributes).eval _

private val joinStateManager = new SymmetricHashJoinStateManager(
- joinSide, inputAttributes, joinKeys, stateInfo, storeConf, hadoopConfBcast.value.value)
+ joinSide, inputAttributes, joinKeys, stateInfo, storeConf, hadoopConfBcast.value.value,
+ stateFormatVersion)
private[this] val keyGenerator = UnsafeProjection.create(joinKeys, inputAttributes)

private[this] val stateKeyWatermarkPredicateFunc = stateWatermarkPredicate match {
@@ -445,16 +471,9 @@
// the case of inner join).
if (preJoinFilter(thisRow)) {
val key = keyGenerator(thisRow)
- val outputIter = otherSideJoiner.joinStateManager.get(key).map { thatRow =>
-   generateJoinedRow(thisRow, thatRow)
- }.filter(postJoinFilter)
- val shouldAddToState = // add only if both removal predicates do not match
-   !stateKeyWatermarkPredicateFunc(key) && !stateValueWatermarkPredicateFunc(thisRow)
- if (shouldAddToState) {
-   joinStateManager.append(key, thisRow)
-   updatedStateRowsCount += 1
- }
- outputIter
+ val outputIter: Iterator[JoinedRow] = otherSideJoiner.joinStateManager
+   .getJoinedRows(key, thatRow => generateJoinedRow(thisRow, thatRow), postJoinFilter)
+ new AddingProcessedRowToStateCompletionIterator(key, thisRow, outputIter)
} else {
joinSide match {
case LeftSide if joinType == LeftOuter =>
@@ -467,6 +486,23 @@
}
}

private class AddingProcessedRowToStateCompletionIterator(
key: UnsafeRow,
thisRow: UnsafeRow,
subIter: Iterator[JoinedRow])
extends CompletionIterator[JoinedRow, Iterator[JoinedRow]](subIter) {
private val iteratorNotEmpty: Boolean = super.hasNext

override def completion(): Unit = {
val shouldAddToState = // add only if both removal predicates do not match
!stateKeyWatermarkPredicateFunc(key) && !stateValueWatermarkPredicateFunc(thisRow)
if (shouldAddToState) {
joinStateManager.append(key, thisRow, matched = iteratorNotEmpty)
updatedStateRowsCount += 1
}
}
}
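// Illustration only (not part of this change): CompletionIterator invokes
// completion() exactly once, after the wrapped iterator is exhausted, so the
// state-store append happens only after downstream has consumed the joined rows.
// Note 'matched' is captured eagerly above via super.hasNext. Standalone sketch:
//   import org.apache.spark.util.CompletionIterator
//   val it = new CompletionIterator[Int, Iterator[Int]](Iterator(1, 2, 3)) {
//     override def completion(): Unit = println("consumed; appending to state")
//   }
//   it.foreach(println)  // prints 1, 2, 3, then the completion message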

/**
* Get an iterator over the values stored in this joiner's state manager for the given key.
*
@@ -486,7 +522,7 @@
* We do this to avoid requiring either two passes or full materialization when
* processing the rows for outer join.
*/
- def removeOldState(): Iterator[UnsafeRowPair] = {
+ def removeOldState(): Iterator[KeyToValuePair] = {
def removeOldState(): Iterator[KeyToValuePair] = {
stateWatermarkPredicate match {
case Some(JoinStateKeyWatermarkPredicate(expr)) =>
joinStateManager.removeByKeyCondition(stateKeyWatermarkPredicateFunc)