forked from apache/spark

Update upstream #14

Merged

Conversation
…om Spark runtime routines in generated Java code
## What changes were proposed in this pull request?
This PR eliminates unnecessary null checks of return values from known Spark runtime routines. We know whether a given Spark runtime routine can return ``null`` or not (e.g. ``ArrayData.toDoubleArray()`` never returns ``null``), so we can eliminate the null check on its return value.
When we run the following example program, we currently get the Java code shown under "Without this PR". Since we know ``ArrayData.toDoubleArray()`` never returns ``null``, we can eliminate the null checks at lines 90-92 and 97.
```scala
val ds = sparkContext.parallelize(Seq(Array(1.1, 2.2)), 1).toDS.cache
ds.count
ds.map(e => e).show
```
Without this PR
```java
/* 050 */ protected void processNext() throws java.io.IOException {
/* 051 */ while (inputadapter_input.hasNext() && !stopEarly()) {
/* 052 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 053 */ boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 054 */ ArrayData inputadapter_value = inputadapter_isNull ? null : (inputadapter_row.getArray(0));
/* 055 */
/* 056 */ ArrayData deserializetoobject_value1 = null;
/* 057 */
/* 058 */ if (!inputadapter_isNull) {
/* 059 */ int deserializetoobject_dataLength = inputadapter_value.numElements();
/* 060 */
/* 061 */ Double[] deserializetoobject_convertedArray = null;
/* 062 */ deserializetoobject_convertedArray = new Double[deserializetoobject_dataLength];
/* 063 */
/* 064 */ int deserializetoobject_loopIndex = 0;
/* 065 */ while (deserializetoobject_loopIndex < deserializetoobject_dataLength) {
/* 066 */ MapObjects_loopValue2 = (double) (inputadapter_value.getDouble(deserializetoobject_loopIndex));
/* 067 */ MapObjects_loopIsNull2 = inputadapter_value.isNullAt(deserializetoobject_loopIndex);
/* 068 */
/* 069 */ if (MapObjects_loopIsNull2) {
/* 070 */ throw new RuntimeException(((java.lang.String) references[0]));
/* 071 */ }
/* 072 */ if (false) {
/* 073 */ deserializetoobject_convertedArray[deserializetoobject_loopIndex] = null;
/* 074 */ } else {
/* 075 */ deserializetoobject_convertedArray[deserializetoobject_loopIndex] = MapObjects_loopValue2;
/* 076 */ }
/* 077 */
/* 078 */ deserializetoobject_loopIndex += 1;
/* 079 */ }
/* 080 */
/* 081 */ deserializetoobject_value1 = new org.apache.spark.sql.catalyst.util.GenericArrayData(deserializetoobject_convertedArray); /*###*/
/* 082 */ }
/* 083 */ boolean deserializetoobject_isNull = true;
/* 084 */ double[] deserializetoobject_value = null;
/* 085 */ if (!inputadapter_isNull) {
/* 086 */ deserializetoobject_isNull = false;
/* 087 */ if (!deserializetoobject_isNull) {
/* 088 */ Object deserializetoobject_funcResult = null;
/* 089 */ deserializetoobject_funcResult = deserializetoobject_value1.toDoubleArray();
/* 090 */ if (deserializetoobject_funcResult == null) {
/* 091 */ deserializetoobject_isNull = true;
/* 092 */ } else {
/* 093 */ deserializetoobject_value = (double[]) deserializetoobject_funcResult;
/* 094 */ }
/* 095 */
/* 096 */ }
/* 097 */ deserializetoobject_isNull = deserializetoobject_value == null;
/* 098 */ }
/* 099 */
/* 100 */ boolean mapelements_isNull = true;
/* 101 */ double[] mapelements_value = null;
/* 102 */ if (!false) {
/* 103 */ mapelements_resultIsNull = false;
/* 104 */
/* 105 */ if (!mapelements_resultIsNull) {
/* 106 */ mapelements_resultIsNull = deserializetoobject_isNull;
/* 107 */ mapelements_argValue = deserializetoobject_value;
/* 108 */ }
/* 109 */
/* 110 */ mapelements_isNull = mapelements_resultIsNull;
/* 111 */ if (!mapelements_isNull) {
/* 112 */ Object mapelements_funcResult = null;
/* 113 */ mapelements_funcResult = ((scala.Function1) references[1]).apply(mapelements_argValue);
/* 114 */ if (mapelements_funcResult == null) {
/* 115 */ mapelements_isNull = true;
/* 116 */ } else {
/* 117 */ mapelements_value = (double[]) mapelements_funcResult;
/* 118 */ }
/* 119 */
/* 120 */ }
/* 121 */ mapelements_isNull = mapelements_value == null;
/* 122 */ }
/* 123 */
/* 124 */ serializefromobject_resultIsNull = false;
/* 125 */
/* 126 */ if (!serializefromobject_resultIsNull) {
/* 127 */ serializefromobject_resultIsNull = mapelements_isNull;
/* 128 */ serializefromobject_argValue = mapelements_value;
/* 129 */ }
/* 130 */
/* 131 */ boolean serializefromobject_isNull = serializefromobject_resultIsNull;
/* 132 */ final ArrayData serializefromobject_value = serializefromobject_resultIsNull ? null : org.apache.spark.sql.catalyst.expressions.UnsafeArrayData.fromPrimitiveArray(serializefromobject_argValue);
/* 133 */ serializefromobject_isNull = serializefromobject_value == null;
/* 134 */ serializefromobject_holder.reset();
/* 135 */
/* 136 */ serializefromobject_rowWriter.zeroOutNullBytes();
/* 137 */
/* 138 */ if (serializefromobject_isNull) {
/* 139 */ serializefromobject_rowWriter.setNullAt(0);
/* 140 */ } else {
/* 141 */ // Remember the current cursor so that we can calculate how many bytes are
/* 142 */ // written later.
/* 143 */ final int serializefromobject_tmpCursor = serializefromobject_holder.cursor;
/* 144 */
/* 145 */ if (serializefromobject_value instanceof UnsafeArrayData) {
/* 146 */ final int serializefromobject_sizeInBytes = ((UnsafeArrayData) serializefromobject_value).getSizeInBytes();
/* 147 */ // grow the global buffer before writing data.
/* 148 */ serializefromobject_holder.grow(serializefromobject_sizeInBytes);
/* 149 */ ((UnsafeArrayData) serializefromobject_value).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor);
/* 150 */ serializefromobject_holder.cursor += serializefromobject_sizeInBytes;
/* 151 */
/* 152 */ } else {
/* 153 */ final int serializefromobject_numElements = serializefromobject_value.numElements();
/* 154 */ serializefromobject_arrayWriter.initialize(serializefromobject_holder, serializefromobject_numElements, 8);
/* 155 */
/* 156 */ for (int serializefromobject_index = 0; serializefromobject_index < serializefromobject_numElements; serializefromobject_index++) {
/* 157 */ if (serializefromobject_value.isNullAt(serializefromobject_index)) {
/* 158 */ serializefromobject_arrayWriter.setNullDouble(serializefromobject_index);
/* 159 */ } else {
/* 160 */ final double serializefromobject_element = serializefromobject_value.getDouble(serializefromobject_index);
/* 161 */ serializefromobject_arrayWriter.write(serializefromobject_index, serializefromobject_element);
/* 162 */ }
/* 163 */ }
/* 164 */ }
/* 165 */
/* 166 */ serializefromobject_rowWriter.setOffsetAndSize(0, serializefromobject_tmpCursor, serializefromobject_holder.cursor - serializefromobject_tmpCursor);
/* 167 */ }
/* 168 */ serializefromobject_result.setTotalSize(serializefromobject_holder.totalSize());
/* 169 */ append(serializefromobject_result);
/* 170 */ if (shouldStop()) return;
/* 171 */ }
/* 172 */ }
```
With this PR (removed most of lines 90-97 in the above code)
```java
/* 050 */ protected void processNext() throws java.io.IOException {
/* 051 */ while (inputadapter_input.hasNext() && !stopEarly()) {
/* 052 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 053 */ boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 054 */ ArrayData inputadapter_value = inputadapter_isNull ? null : (inputadapter_row.getArray(0));
/* 055 */
/* 056 */ ArrayData deserializetoobject_value1 = null;
/* 057 */
/* 058 */ if (!inputadapter_isNull) {
/* 059 */ int deserializetoobject_dataLength = inputadapter_value.numElements();
/* 060 */
/* 061 */ Double[] deserializetoobject_convertedArray = null;
/* 062 */ deserializetoobject_convertedArray = new Double[deserializetoobject_dataLength];
/* 063 */
/* 064 */ int deserializetoobject_loopIndex = 0;
/* 065 */ while (deserializetoobject_loopIndex < deserializetoobject_dataLength) {
/* 066 */ MapObjects_loopValue2 = (double) (inputadapter_value.getDouble(deserializetoobject_loopIndex));
/* 067 */ MapObjects_loopIsNull2 = inputadapter_value.isNullAt(deserializetoobject_loopIndex);
/* 068 */
/* 069 */ if (MapObjects_loopIsNull2) {
/* 070 */ throw new RuntimeException(((java.lang.String) references[0]));
/* 071 */ }
/* 072 */ if (false) {
/* 073 */ deserializetoobject_convertedArray[deserializetoobject_loopIndex] = null;
/* 074 */ } else {
/* 075 */ deserializetoobject_convertedArray[deserializetoobject_loopIndex] = MapObjects_loopValue2;
/* 076 */ }
/* 077 */
/* 078 */ deserializetoobject_loopIndex += 1;
/* 079 */ }
/* 080 */
/* 081 */ deserializetoobject_value1 = new org.apache.spark.sql.catalyst.util.GenericArrayData(deserializetoobject_convertedArray); /*###*/
/* 082 */ }
/* 083 */ boolean deserializetoobject_isNull = true;
/* 084 */ double[] deserializetoobject_value = null;
/* 085 */ if (!inputadapter_isNull) {
/* 086 */ deserializetoobject_isNull = false;
/* 087 */ if (!deserializetoobject_isNull) {
/* 088 */ Object deserializetoobject_funcResult = null;
/* 089 */ deserializetoobject_funcResult = deserializetoobject_value1.toDoubleArray();
/* 090 */ deserializetoobject_value = (double[]) deserializetoobject_funcResult;
/* 091 */
/* 092 */ }
/* 093 */
/* 094 */ }
/* 095 */
/* 096 */ boolean mapelements_isNull = true;
/* 097 */ double[] mapelements_value = null;
/* 098 */ if (!false) {
/* 099 */ mapelements_resultIsNull = false;
/* 100 */
/* 101 */ if (!mapelements_resultIsNull) {
/* 102 */ mapelements_resultIsNull = deserializetoobject_isNull;
/* 103 */ mapelements_argValue = deserializetoobject_value;
/* 104 */ }
/* 105 */
/* 106 */ mapelements_isNull = mapelements_resultIsNull;
/* 107 */ if (!mapelements_isNull) {
/* 108 */ Object mapelements_funcResult = null;
/* 109 */ mapelements_funcResult = ((scala.Function1) references[1]).apply(mapelements_argValue);
/* 110 */ if (mapelements_funcResult == null) {
/* 111 */ mapelements_isNull = true;
/* 112 */ } else {
/* 113 */ mapelements_value = (double[]) mapelements_funcResult;
/* 114 */ }
/* 115 */
/* 116 */ }
/* 117 */ mapelements_isNull = mapelements_value == null;
/* 118 */ }
/* 119 */
/* 120 */ serializefromobject_resultIsNull = false;
/* 121 */
/* 122 */ if (!serializefromobject_resultIsNull) {
/* 123 */ serializefromobject_resultIsNull = mapelements_isNull;
/* 124 */ serializefromobject_argValue = mapelements_value;
/* 125 */ }
/* 126 */
/* 127 */ boolean serializefromobject_isNull = serializefromobject_resultIsNull;
/* 128 */ final ArrayData serializefromobject_value = serializefromobject_resultIsNull ? null : org.apache.spark.sql.catalyst.expressions.UnsafeArrayData.fromPrimitiveArray(serializefromobject_argValue);
/* 129 */ serializefromobject_isNull = serializefromobject_value == null;
/* 130 */ serializefromobject_holder.reset();
/* 131 */
/* 132 */ serializefromobject_rowWriter.zeroOutNullBytes();
/* 133 */
/* 134 */ if (serializefromobject_isNull) {
/* 135 */ serializefromobject_rowWriter.setNullAt(0);
/* 136 */ } else {
/* 137 */ // Remember the current cursor so that we can calculate how many bytes are
/* 138 */ // written later.
/* 139 */ final int serializefromobject_tmpCursor = serializefromobject_holder.cursor;
/* 140 */
/* 141 */ if (serializefromobject_value instanceof UnsafeArrayData) {
/* 142 */ final int serializefromobject_sizeInBytes = ((UnsafeArrayData) serializefromobject_value).getSizeInBytes();
/* 143 */ // grow the global buffer before writing data.
/* 144 */ serializefromobject_holder.grow(serializefromobject_sizeInBytes);
/* 145 */ ((UnsafeArrayData) serializefromobject_value).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor);
/* 146 */ serializefromobject_holder.cursor += serializefromobject_sizeInBytes;
/* 147 */
/* 148 */ } else {
/* 149 */ final int serializefromobject_numElements = serializefromobject_value.numElements();
/* 150 */ serializefromobject_arrayWriter.initialize(serializefromobject_holder, serializefromobject_numElements, 8);
/* 151 */
/* 152 */ for (int serializefromobject_index = 0; serializefromobject_index < serializefromobject_numElements; serializefromobject_index++) {
/* 153 */ if (serializefromobject_value.isNullAt(serializefromobject_index)) {
/* 154 */ serializefromobject_arrayWriter.setNullDouble(serializefromobject_index);
/* 155 */ } else {
/* 156 */ final double serializefromobject_element = serializefromobject_value.getDouble(serializefromobject_index);
/* 157 */ serializefromobject_arrayWriter.write(serializefromobject_index, serializefromobject_element);
/* 158 */ }
/* 159 */ }
/* 160 */ }
/* 161 */
/* 162 */ serializefromobject_rowWriter.setOffsetAndSize(0, serializefromobject_tmpCursor, serializefromobject_holder.cursor - serializefromobject_tmpCursor);
/* 163 */ }
/* 164 */ serializefromobject_result.setTotalSize(serializefromobject_holder.totalSize());
/* 165 */ append(serializefromobject_result);
/* 166 */ if (shouldStop()) return;
/* 167 */ }
/* 168 */ }
```
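For reference, generated Java code like the listings above can be dumped from a REPL with Spark's codegen debugging helpers; a minimal sketch, assuming a `SparkSession` named `spark` and that `org.apache.spark.sql.execution.debug` provides the `debugCodegen()` extension on `Dataset` (as in recent Spark releases):
```scala
import spark.implicits._
import org.apache.spark.sql.execution.debug._  // adds debugCodegen() to Dataset

// Same Dataset[Array[Double]] as in the example program above.
val ds = spark.sparkContext.parallelize(Seq(Array(1.1, 2.2)), 1).toDS.cache
ds.count

// Prints the whole-stage-generated Java source, so the null checks discussed
// above (lines 90-92 and 97) can be inspected directly.
ds.map(e => e).debugCodegen()
```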
## How was this patch tested?
Add test suites to ``DatasetPrimitiveSuite``
Author: Kazuaki Ishizaki <[email protected]>
Closes #17569 from kiszk/SPARK-20253.
## What changes were proposed in this pull request?
The sql/core module currently declares asm as a test-scope dependency. Transitively it should actually be a normal dependency, since the actual core module defines it. This occasionally confuses IntelliJ.

## How was this patch tested?
N/A - this is a build change.

Author: Reynold Xin <[email protected]>

Closes #17574 from rxin/SPARK-20264.
…teger when the default value is in double

## What changes were proposed in this pull request?
This bug was partially addressed in SPARK-18555 #15994, but the root cause isn't completely solved. This bug is pretty critical since it silently changes a Long member id in our application when the id is too big to be represented losslessly by a Double.

Here is an example of how this happens. With
```scala
Seq[(java.lang.Long, java.lang.Double)]((null, 3.14), (9123146099426677101L, null),
  (9123146560113991650L, 1.6), (null, null)).toDF("a", "b").na.fill(0.2)
```
the logical plan will be
```
== Analyzed Logical Plan ==
a: bigint, b: double
Project [cast(coalesce(cast(a#232L as double), cast(0.2 as double)) as bigint) AS a#240L, cast(coalesce(nanvl(b#233, cast(null as double)), 0.2) as double) AS b#241]
+- Project [_1#229L AS a#232L, _2#230 AS b#233]
   +- LocalRelation [_1#229L, _2#230]
```
Note that even when the value is not null, Spark casts the Long into a Double first and then, if it's not null, casts it back to a Long, which loses precision. The original value should not be changed when it is not null, but Spark changes it, which is wrong.

With the PR, the logical plan will be
```
== Analyzed Logical Plan ==
a: bigint, b: double
Project [coalesce(a#232L, cast(0.2 as bigint)) AS a#240L, coalesce(nanvl(b#233, cast(null as double)), cast(0.2 as double)) AS b#241]
+- Project [_1#229L AS a#232L, _2#230 AS b#233]
   +- LocalRelation [_1#229L, _2#230]
```
which behaves correctly without changing the original Long values and also avoids the extra cost of unnecessary casting.

## How was this patch tested?
Unit test added.

+cc srowen rxin cloud-fan gatorsmile Thanks.

Author: DB Tsai <[email protected]>

Closes #17577 from dbtsai/fixnafill.
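The precision loss itself is easy to reproduce outside Spark; a minimal, self-contained Scala sketch (plain Scala, no Spark required) of the Long-to-Double round trip that the old plan effectively performed:
```scala
object LongDoubleRoundTrip extends App {
  // A Long that needs more than the 53 bits of mantissa a Double can hold.
  val id = 9123146099426677101L

  // Cast to Double and back to Long, as the old coalesce/cast plan effectively did.
  val roundTripped = id.toDouble.toLong

  println(s"original:      $id")
  println(s"round-tripped: $roundTripped")
  println(s"lossless?      ${id == roundTripped}")  // prints false: the id silently changed
}
```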
## What changes were proposed in this pull request?
Like `Expression`, `QueryPlan` should also have a `semanticHash` method, so that we can put plans into a hash map and look them up quickly. This PR refactors `QueryPlan` to follow `Expression` and puts all the normalization logic in `QueryPlan.canonicalized`, so that it's very natural to implement `semanticHash`.

Follow-up: improve `CacheManager` to leverage this `semanticHash` and speed up plan lookup, instead of iterating over all cached plans.

## How was this patch tested?
Existing tests. Note that we don't need to test the `semanticHash` method itself: once the existing tests prove `sameResult` is correct, we are good.

Author: Wenchen Fan <[email protected]>

Closes #17541 from cloud-fan/plan-semantic.
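The canonicalize-then-hash pattern can be shown without Catalyst; a minimal Scala sketch in which the `Plan` type and its `alias` field are hypothetical stand-ins, not Spark's `QueryPlan`:
```scala
// Hypothetical plan node: the alias is cosmetic and must not affect semantic equality.
final case class Plan(op: String, alias: Option[String], children: List[Plan]) {
  // Normalize away cosmetic differences once; reuse the result for equality and hashing.
  lazy val canonicalized: Plan = copy(alias = None, children = children.map(_.canonicalized))

  def sameResult(other: Plan): Boolean = canonicalized == other.canonicalized
  def semanticHash: Int = canonicalized.hashCode  // consistent with sameResult by construction
}

object PlanCacheDemo extends App {
  val a = Plan("scan", Some("t1"), Nil)
  val b = Plan("scan", Some("t2"), Nil)    // same plan up to the alias

  // A map keyed by semanticHash finds semantically equal plans without scanning every entry.
  val cache = scala.collection.mutable.HashMap(a.semanticHash -> a)
  println(a.sameResult(b))                 // true
  println(cache.contains(b.semanticHash))  // true: b hashes to the same bucket as a
}
```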
## What changes were proposed in this pull request?
Synchronize access to the openStreams map.

## How was this patch tested?
Existing tests.

Author: Bogdan Raducanu <[email protected]>

Closes #17592 from bogdanrdc/SPARK-20243.
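A minimal sketch of the general fix, with a hypothetical `StreamTracker` standing in for the class that records open streams (not Spark's actual code): every access to the shared map goes through a synchronized block.
```scala
import java.io.InputStream
import scala.collection.mutable

class StreamTracker {
  // Guarded by its own monitor: readers and writers all synchronize on `openStreams`.
  private val openStreams = mutable.Map.empty[InputStream, Throwable]

  def register(stream: InputStream): Unit = openStreams.synchronized {
    openStreams(stream) = new Throwable("stream opened here")  // remember where it was opened
  }

  def unregister(stream: InputStream): Unit = openStreams.synchronized {
    openStreams.remove(stream)
  }

  def openCount: Int = openStreams.synchronized { openStreams.size }
}
```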
## What changes were proposed in this pull request?
This PR proposes to add the `IGNORE NULLS` keyword to `first`/`last` in Spark's parser, as in http://docs.oracle.com/cd/B19306_01/server.102/b14200/functions057.htm. This simply maps the keywords to the existing `ignoreNullsExpr`.

**Before**

```scala
scala> sql("select first('a' IGNORE NULLS)").show()
```

```
org.apache.spark.sql.catalyst.parser.ParseException:
extraneous input 'NULLS' expecting {')', ','}(line 1, pos 24)

== SQL ==
select first('a' IGNORE NULLS)
------------------------^^^

  at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:210)
  at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:112)
  at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:46)
  at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:66)
  at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:622)
  ... 48 elided
```

**After**

```scala
scala> sql("select first('a' IGNORE NULLS)").show()
```

```
+--------------+
|first(a, true)|
+--------------+
|             a|
+--------------+
```

## How was this patch tested?
Unit tests in `ExpressionParserSuite`.

Author: hyukjinkwon <[email protected]>

Closes #17566 from HyukjinKwon/SPARK-19518.
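For comparison, the same ignore-nulls behavior is also reachable from the DataFrame API; a small sketch, assuming a `SparkSession` named `spark`:
```scala
import org.apache.spark.sql.functions.first
import spark.implicits._

val df = Seq[(Integer, String)]((null, "x"), (1, "y")).toDF("a", "b")

// first(column, ignoreNulls = true) mirrors SELECT first(a IGNORE NULLS):
// the aggregate skips the leading null in column "a" and returns 1.
df.agg(first($"a", ignoreNulls = true)).show()
```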
…oin Conditions
## What changes were proposed in this pull request?
```
sql("SELECT t1.b, rand(0) as r FROM cachedData, cachedData t1 GROUP BY t1.b having r > 0.5").show()
```
We will get the following error:
```
Job aborted due to stage failure: Task 1 in stage 4.0 failed 1 times, most recent failure: Lost task 1.0 in stage 4.0 (TID 8, localhost, executor driver): java.lang.NullPointerException
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec$$anonfun$org$apache$spark$sql$execution$joins$BroadcastNestedLoopJoinExec$$boundCondition$1.apply(BroadcastNestedLoopJoinExec.scala:87)
at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec$$anonfun$org$apache$spark$sql$execution$joins$BroadcastNestedLoopJoinExec$$boundCondition$1.apply(BroadcastNestedLoopJoinExec.scala:87)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:463)
```
Filters could be pushed down into join conditions by the optimizer rule `PushPredicateThroughJoin`. However, the Analyzer [blocks users from adding non-deterministic conditions](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala#L386-L395) (for details, see PR #7535).
We should not push down non-deterministic conditions; otherwise, we would need to explicitly initialize the non-deterministic expressions. This PR simply blocks the pushdown.
## How was this patch tested?
Added a test case.
Author: Xiao Li <[email protected]>
Closes #17585 from gatorsmile/joinRandCondition.
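The reason pushdown is unsafe for such predicates can be illustrated without Spark; a small, self-contained Scala sketch (names are illustrative) showing that a rand-style predicate keeps different rows once the optimizer changes where, and in what order, it is evaluated:
```scala
import scala.util.Random

object NonDeterministicPushdownDemo extends App {
  // A rand(seed) > 0.5 style predicate: each row it sees consumes one draw from the
  // pseudo-random stream, so its verdict depends on the row's position in that stream.
  def filterWithSeed(rows: Seq[Int], seed: Long): Seq[Int] = {
    val rng = new Random(seed)
    rows.filter(_ => rng.nextDouble() > 0.5)
  }

  val userOrder   = Seq(1, 2, 3, 4)  // rows as the filter sees them where the user wrote it
  val pushedOrder = Seq(3, 4, 1, 2)  // rows as it would see them if pushed below a reordering join

  // Same seed, same rows, different evaluation order: typically a different set of rows
  // survives, which is why the optimizer must not move non-deterministic conditions around.
  println(filterWithSeed(userOrder, seed = 0L))
  println(filterWithSeed(pushedOrder, seed = 0L))
}
```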
…urkish locale bug" causes Spark problems

## What changes were proposed in this pull request?
Add Locale.ROOT to internal calls to String `toLowerCase`, `toUpperCase`, to avoid inadvertent locale-sensitive variation in behavior (aka the "Turkish locale problem"). The change looks large, but it is just adding `Locale.ROOT` (the locale with no country or language specified) to every call to these methods.

## How was this patch tested?
Existing tests.

Author: Sean Owen <[email protected]>

Closes #17527 from srowen/SPARK-20156.
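The underlying JDK behavior is easy to demonstrate; a short sketch of why unqualified case conversion of ASCII identifiers breaks under the Turkish locale:
```scala
import java.util.Locale

object TurkishLocaleDemo extends App {
  val tr = new Locale("tr")

  // In the Turkish locale, 'i' upper-cases to dotted 'İ' and 'I' lower-cases to dotless 'ı',
  // so case-insensitive handling of identifiers like "INT" or "info" silently breaks.
  println("info".toUpperCase(tr))           // İNFO
  println("INT".toLowerCase(tr))            // ınt

  // Locale.ROOT pins the behavior to locale-neutral rules.
  println("info".toUpperCase(Locale.ROOT))  // INFO
  println("INT".toLowerCase(Locale.ROOT))   // int
}
```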
## What changes were proposed in this pull request?
`Weigher.weigh` needs to return an Int, but it is possible for an `Array[FileStatus]` to have size > `Int.MaxValue`. To avoid this, the size is scaled down by a factor of 32. The `maximumWeight` of the cache is also scaled down by the same factor.

## How was this patch tested?
New test in FileIndexSuite.

Author: Bogdan Raducanu <[email protected]>

Closes #17591 from bogdanrdc/SPARK-20280.
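A minimal sketch of the scaling trick using Guava's cache (the byte-array values and names here are illustrative, not Spark's actual file-status cache code): both the per-entry weight and the cache's `maximumWeight` are divided by the same factor so the weight always fits in an Int.
```scala
import com.google.common.cache.{Cache, CacheBuilder, Weigher}

object ScaledWeigherDemo {
  private val weightScale = 32  // divide weights so they stay well inside Int range

  def buildCache(maxSizeInBytes: Long): Cache[String, Array[Byte]] = {
    val weigher = new Weigher[String, Array[Byte]] {
      override def weigh(key: String, value: Array[Byte]): Int =
        math.max(1, (value.length.toLong / weightScale).toInt)  // scaled entry weight
    }
    CacheBuilder.newBuilder()
      .weigher(weigher)
      .maximumWeight(maxSizeInBytes / weightScale)  // scaled by the same factor
      .build[String, Array[Byte]]()
  }
}
```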
…0 seconds
## What changes were proposed in this pull request?
Saw the following failure locally:
```
Traceback (most recent call last):
File "/home/jenkins/workspace/python/pyspark/streaming/tests.py", line 351, in test_cogroup
self._test_func(input, func, expected, sort=True, input2=input2)
File "/home/jenkins/workspace/python/pyspark/streaming/tests.py", line 162, in _test_func
self.assertEqual(expected, result)
AssertionError: Lists differ: [[(1, ([1], [2])), (2, ([1], [... != []
First list contains 3 additional elements.
First extra element 0:
[(1, ([1], [2])), (2, ([1], [])), (3, ([1], []))]
+ []
- [[(1, ([1], [2])), (2, ([1], [])), (3, ([1], []))],
- [(1, ([1, 1, 1], [])), (2, ([1], [])), (4, ([], [1]))],
- [('', ([1, 1], [1, 2])), ('a', ([1, 1], [1, 1])), ('b', ([1], [1]))]]
```
It also happened on Jenkins: http://spark-tests.appspot.com/builds/spark-branch-2.1-test-sbt-hadoop-2.7/120
It happens because the timeout is not enough when the machine is overloaded. This PR just increases the timeout to 30 seconds.
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <[email protected]>
Closes #17597 from zsxwing/SPARK-20285.
…tion in tests

## What changes were proposed in this pull request?
This PR fixes the following failure:
```
sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedException:
Assert on query failed:

== Progress ==
   AssertOnQuery(<condition>, )
   StopStream
   AddData to MemoryStream[value#30891]: 1,2
   StartStream(OneTimeTrigger,org.apache.spark.util.SystemClock@35cdc93a,Map())
   CheckAnswer: [6],[3]
   StopStream
=> AssertOnQuery(<condition>, )
   AssertOnQuery(<condition>, )
   StartStream(OneTimeTrigger,org.apache.spark.util.SystemClock@cdb247d,Map())
   CheckAnswer: [6],[3]
   StopStream
   AddData to MemoryStream[value#30891]: 3
   StartStream(OneTimeTrigger,org.apache.spark.util.SystemClock@55394e4d,Map())
   CheckLastBatch: [2]
   StopStream
   AddData to MemoryStream[value#30891]: 0
   StartStream(OneTimeTrigger,org.apache.spark.util.SystemClock@749aa997,Map())
   ExpectFailure[org.apache.spark.SparkException, isFatalError: false]
   AssertOnQuery(<condition>, )
   AssertOnQuery(<condition>, incorrect start offset or end offset on exception)

== Stream ==
Output Mode: Append
Stream state: not started
Thread state: dead

== Sink ==
0: [6] [3]

== Plan ==

  at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:495)
  at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1555)
  at org.scalatest.Assertions$class.fail(Assertions.scala:1328)
  at org.scalatest.FunSuite.fail(FunSuite.scala:1555)
  at org.apache.spark.sql.streaming.StreamTest$class.failTest$1(StreamTest.scala:347)
  at org.apache.spark.sql.streaming.StreamTest$class.verify$1(StreamTest.scala:318)
  at org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:483)
  at org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:357)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.sql.streaming.StreamTest$class.liftedTree1$1(StreamTest.scala:357)
  at org.apache.spark.sql.streaming.StreamTest$class.testStream(StreamTest.scala:356)
  at org.apache.spark.sql.streaming.StreamingQuerySuite.testStream(StreamingQuerySuite.scala:41)
  at org.apache.spark.sql.streaming.StreamingQuerySuite$$anonfun$6.apply$mcV$sp(StreamingQuerySuite.scala:166)
  at org.apache.spark.sql.streaming.StreamingQuerySuite$$anonfun$6.apply(StreamingQuerySuite.scala:161)
  at org.apache.spark.sql.streaming.StreamingQuerySuite$$anonfun$6.apply(StreamingQuerySuite.scala:161)
  at org.apache.spark.sql.catalyst.util.package$.quietly(package.scala:42)
  at org.apache.spark.sql.test.SQLTestUtils$$anonfun$testQuietly$1.apply$mcV$sp(SQLTestUtils.scala:268)
  at org.apache.spark.sql.test.SQLTestUtils$$anonfun$testQuietly$1.apply(SQLTestUtils.scala:268)
  at org.apache.spark.sql.test.SQLTestUtils$$anonfun$testQuietly$1.apply(SQLTestUtils.scala:268)
  at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
  at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
  at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
  at org.scalatest.Transformer.apply(Transformer.scala:22)
  at org.scalatest.Transformer.apply(Transformer.scala:20)
  at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
  at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:68)
  at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
  at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
  at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
  at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
  at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175)
  at org.apache.spark.sql.streaming.StreamingQuerySuite.org$scalatest$BeforeAndAfterEach$$super$runTest(StreamingQuerySuite.scala:41)
  at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:255)
  at org.apache.spark.sql.streaming.StreamingQuerySuite.org$scalatest$BeforeAndAfter$$super$runTest(StreamingQuerySuite.scala:41)
  at org.scalatest.BeforeAndAfter$class.runTest(BeforeAndAfter.scala:200)
  at org.apache.spark.sql.streaming.StreamingQuerySuite.runTest(StreamingQuerySuite.scala:41)
  at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
  at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
  at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
  at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
  at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
  at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
  at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208)
  at org.scalatest.FunSuite.runTests(FunSuite.scala:1555)
  at org.scalatest.Suite$class.run(Suite.scala:1424)
  at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555)
  at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
  at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
  at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
  at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212)
  at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:31)
  at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257)
  at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256)
  at org.apache.spark.sql.streaming.StreamingQuerySuite.org$scalatest$BeforeAndAfter$$super$run(StreamingQuerySuite.scala:41)
  at org.scalatest.BeforeAndAfter$class.run(BeforeAndAfter.scala:241)
  at org.apache.spark.sql.streaming.StreamingQuerySuite.run(StreamingQuerySuite.scala:41)
  at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:357)
  at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:502)
  at sbt.ForkMain$Run$2.call(ForkMain.java:296)
  at sbt.ForkMain$Run$2.call(ForkMain.java:286)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
```
The failure happens because `CheckAnswer` will run once `committedOffsets` is updated, and writing the commit log may then be interrupted by the following `StopStream`. This PR just changes the order so that the commit log is written first.

## How was this patch tested?
Jenkins

Author: Shixiong Zhu <[email protected]>

Closes #17594 from zsxwing/SPARK-20282.
## What changes were proposed in this pull request?
We currently have postHocOptimizationBatches, but not preOptimizationBatches. This patch adds preOptimizationBatches so the optimizer debugging extensions are symmetric.

## How was this patch tested?
N/A

Author: Reynold Xin <[email protected]>

Closes #17595 from rxin/SPARK-20283.
GulajavaMinistudio pushed a commit that referenced this pull request on Nov 18, 2025:
…onicalized expressions
### What changes were proposed in this pull request?
Make PullOutNonDeterministic use canonicalized expressions to dedup group and aggregate expressions. This affects pyspark udfs in particular. Example:
```python
from pyspark.sql.functions import col, avg, udf
pythonUDF = udf(lambda x: x).asNondeterministic()
spark.range(10)\
.selectExpr("id", "id % 3 as value")\
.groupBy(pythonUDF(col("value")))\
.agg(avg("id"), pythonUDF(col("value")))\
.explain(extended=True)
```
Currently results in a plan like this:
```
Aggregate [_nondeterministic#15], [_nondeterministic#15 AS dummyNondeterministicUDF(value)#12, avg(id#0L) AS avg(id)#13, dummyNondeterministicUDF(value#6L)#8 AS dummyNondeterministicUDF(value)#14]
+- Project [id#0L, value#6L, dummyNondeterministicUDF(value#6L)#7 AS _nondeterministic#15]
   +- Project [id#0L, (id#0L % cast(3 as bigint)) AS value#6L]
      +- Range (0, 10, step=1, splits=Some(2))
```
and then it throws:
```
[MISSING_AGGREGATION] The non-aggregating expression "value" is based on columns which are not participating in the GROUP BY clause. Add the columns or the expression to the GROUP BY, aggregate the expression, or use "any_value(value)" if you do not care which of the values within a group is returned. SQLSTATE: 42803
```
- How canonicalization fixes this:
- Nondeterministic PythonUDF expressions always have distinct resultIds per udf.
- The fix is to canonicalize the expressions when matching: canonicalizing sets the resultIds to -1, which lets us dedup the PythonUDF expressions (see the sketch after this list).
- For deterministic UDFs this rule does not apply, and the "Post Analysis" batch extracts and deduplicates the expressions, as expected.
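The deduplication idea can be illustrated without Catalyst internals; a minimal, self-contained Scala sketch in which the `Udf` case class and its `resultId` field are hypothetical stand-ins for PythonUDF expressions, not Spark's classes:
```scala
// Two occurrences of the "same" nondeterministic UDF call get distinct result ids,
// just like PythonUDF expressions do after analysis.
final case class Udf(name: String, child: String, resultId: Long) {
  // Canonicalization blanks out the cosmetic id so semantically equal calls compare equal.
  def canonicalized: Udf = copy(resultId = -1L)
}

object DedupByCanonicalizedDemo extends App {
  val inGroupBy = Udf("pythonUDF", "value", resultId = 7L)
  val inSelect  = Udf("pythonUDF", "value", resultId = 8L)

  println(inGroupBy == inSelect)                              // false: the ids differ
  println(inGroupBy.canonicalized == inSelect.canonicalized)  // true: equal after canonicalization

  // Deduplicate a list of expressions by their canonicalized form, keeping one representative.
  val deduped = List(inGroupBy, inSelect).groupBy(_.canonicalized).values.map(_.head)
  println(deduped.size)                                       // 1
}
```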
### Why are the changes needed?
- the output of the query with the fix applied still makes sense - the nondeterministic UDF is invoked only once, in the project.
### Does this PR introduce _any_ user-facing change?
Yes, it's additive, it enables queries to run that previously threw errors.
### How was this patch tested?
- added unit test
### Was this patch authored or co-authored using generative AI tooling?
No
Closes apache#52061 from benrobby/adhoc-fix-pull-out-nondeterministic.
Authored-by: Ben Hurdelhey <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>