Branch 2.1 #19187 (Closed)
…umentation

## What changes were proposed in this pull request?

SPARK-19444: fix imports not being present in the documentation.

## How was this patch tested?

Manual.

## Disclaimer

The contribution is original work and I license the work to the project under the project's open source license.

Author: Aseem Bansal <[email protected]>

Closes #16789 from anshbansal/patch-1.

(cherry picked from commit aee2bd2)
Signed-off-by: Sean Owen <[email protected]>
Today, you can start a stream that reads from Kafka. However, given Kafka's configurable retention period, sometimes you might just want to read all of the data that is available now. As such, we should add a version that works with `spark.read` as well. The options should be the same as the streaming Kafka source, with the following differences:
- `startingOffsets` should default to `earliest`, and should not allow `latest` (which would always be empty).
- `endingOffsets` should also be allowed and should default to `latest`.
- The same assign JSON format as `startingOffsets` should also be accepted.

It would be really good if things like `.limit(n)` were enough to prevent all the data from being read (this might just work). `KafkaRelationSuite` was added for testing batch queries via `KafkaUtils`.

Author: Tyson Condie <[email protected]>

Closes #16686 from tcondie/SPARK-18682.

(cherry picked from commit 8df4444)
Signed-off-by: Shixiong Zhu <[email protected]>
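For illustration, a minimal sketch of the batch Kafka read described above, with the defaults made explicit (topic and broker names are hypothetical):

```scala
// Assuming a SparkSession `spark` in scope; broker and topic are placeholders.
val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest") // batch default; "latest" is not allowed
  .option("endingOffsets", "latest")     // batch default
  .load()
```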
## What changes were proposed in this pull request?

The `addBatch` method in the `Sink` trait is supposed to be a synchronous method, to coordinate with the fault-tolerance design in `StreamingExecution` (unlike the `compute()` method in `DStream`). We need to add more notes in the comments of this method to remind developers.

## How was this patch tested?

Existing tests.

Author: CodingCat <[email protected]>

Closes #16840 from CodingCat/SPARK-19499.

(cherry picked from commit d4cd975)
Signed-off-by: Shixiong Zhu <[email protected]>
…treaming doc

There is a typo in http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#creating-a-kafka-source-stream: the Python example uses `readStream()` instead of `readStream`. Just removed the parentheses.

Author: manugarri <[email protected]>

Closes #16836 from manugarri/fix_kafka_python_doc.

(cherry picked from commit 5a0569c)
Signed-off-by: Shixiong Zhu <[email protected]>
…oval in the optimizer

This is a backport of 73ee739.

## What changes were proposed in this pull request?

The optimizer tries to remove redundant alias-only projections from the query plan using the `RemoveAliasOnlyProject` rule. The current rule identifies and removes such a project, and rewrites the project's attributes in the **entire** tree. This causes problems when parts of the tree are duplicated (for instance a self join on a temporary view/CTE) and the duplicated part contains the alias-only project; in this case the rewrite breaks the tree.

This PR fixes these problems by using a blacklist for attributes that are not to be moved, and by making sure that attribute remapping is only done for the parent tree, and not for unrelated parts of the query plan.

The current tree transformation infrastructure works very well if the transformation at hand requires little or only global contextual information. In this case we need to know both which attributes were not to be moved and which child attributes were modified. This cannot be done easily using the current infrastructure, and solutions typically involve traversing the query plan multiple times (which is super slow). I have moved around some code in `TreeNode`, `QueryPlan` and `LogicalPlan` to make this much more straightforward; this basically allows you to manually traverse the tree.

## How was this patch tested?

I have added unit tests to `RemoveRedundantAliasAndProjectSuite` and integration tests to the `SQLQueryTestSuite.union` and `SQLQueryTestSuite.cte` test cases.

Author: Herman van Hovell <[email protected]>

Closes #16843 from hvanhovell/SPARK-18609-2.1.
…s for branch-2.1

This is a follow-up PR for merging #16758 to the Spark 2.1 branch.

## What changes were proposed in this pull request?

`mapGroupsWithState` is a new API for arbitrary stateful operations in Structured Streaming, similar to `DStream.mapWithState`.

*Requirements*

Users should be able to specify a function that can do the following:
- Access the input rows corresponding to a key
- Access the previous state corresponding to a key
- Optionally, update or remove the state
- Output any number of new rows (or none at all)

*Proposed API*

```
// ------------ New methods on KeyValueGroupedDataset ------------
class KeyValueGroupedDataset[K, V] {
  // Scala friendly
  def mapGroupsWithState[S: Encoder, U: Encoder](func: (K, Iterator[V], KeyedState[S]) => U)
  def flatMapGroupsWithState[S: Encoder, U: Encoder](func: (K, Iterator[V], KeyedState[S]) => Iterator[U])
  // Java friendly
  def mapGroupsWithState[S, U](func: MapGroupsWithStateFunction[K, V, S, U], stateEncoder: Encoder[S], resultEncoder: Encoder[U])
  def flatMapGroupsWithState[S, U](func: FlatMapGroupsWithStateFunction[K, V, S, U], stateEncoder: Encoder[S], resultEncoder: Encoder[U])
}

// ------------------- New Java-friendly function classes -------------------
public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable {
  R call(K key, Iterator<V> values, KeyedState<S> state) throws Exception;
}
public interface FlatMapGroupsWithStateFunction<K, V, S, R> extends Serializable {
  Iterator<R> call(K key, Iterator<V> values, KeyedState<S> state) throws Exception;
}

// ---------------------- Wrapper class for state data ----------------------
trait KeyedState[S] {
  def exists(): Boolean
  def get(): S                  // throws an exception if state does not exist
  def getOption(): Option[S]
  def update(newState: S): Unit
  def remove(): Unit            // exists() will be false after this
}
```

Key semantics of the state class:
- The state can be null.
- If state.remove() is called, then state.exists() will return false, and getOption will return None.
- After state.update(newState) is called, state.exists() will return true, and getOption will return Some(...).
- None of the operations are thread-safe. This is to avoid memory barriers.

*Usage*

```
val stateFunc = (word: String, words: Iterator[String], runningCount: KeyedState[Long]) => {
  val newCount = words.size + runningCount.getOption.getOrElse(0L)
  runningCount.update(newCount)
  (word, newCount)
}

dataset                                                // type is Dataset[String]
  .groupByKey[String](w => w)                          // generates KeyValueGroupedDataset[String, String]
  .mapGroupsWithState[Long, (String, Long)](stateFunc) // returns Dataset[(String, Long)]
```

## How was this patch tested?

New unit tests.

Author: Tathagata Das <[email protected]>

Closes #16850 from tdas/mapWithState-branch-2.1.
…cancelOnInterrupt

## What changes were proposed in this pull request?

`Signaling.cancelOnInterrupt` leaks a SparkContext per call, which makes ReplSuite unstable. This PR adds `SparkContext.getActive` to allow `Signaling.cancelOnInterrupt` to get the active `SparkContext` and avoid the leak.

## How was this patch tested?

Jenkins.

Author: Shixiong Zhu <[email protected]>

Closes #16825 from zsxwing/SPARK-19481.

(cherry picked from commit 303f00a)
Signed-off-by: Davies Liu <[email protected]>
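A hedged sketch of the pattern this enables (illustrative only; `getActive` is Spark-internal, so this is how the signal handler can use it, not user-facing code):

```scala
// Look up the already-running SparkContext at signal time instead of
// capturing (and leaking) one per cancelOnInterrupt call.
org.apache.spark.SparkContext.getActive.foreach(_.cancelAllJobs())
```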
…umns

## What changes were proposed in this pull request?

The analyzer currently does not check if a column used in grouping sets is actually nullable itself. This can cause the nullability of the column to be incorrect, which can cause null pointer exceptions down the line. This PR fixes that by also considering the nullability of the column.

This is only a problem for Spark 2.1 and below. The latest master uses a different approach.

Closes #16874

## How was this patch tested?

Added a regression test to `SQLQueryTestSuite.grouping_set`.

Author: Herman van Hovell <[email protected]>

Closes #16873 from hvanhovell/SPARK-19509.
…6852

## What changes were proposed in this pull request?

Set `currentVars` to null in `GenerateOrdering.genComparisons` before `genCode` is called. `genCode` ignores `INPUT_ROW` if `currentVars` is not null, and in `genComparisons` we want it to use `INPUT_ROW`.

## How was this patch tested?

Added a test with 2 queries in `WholeStageCodegenSuite`.

Author: Bogdan Raducanu <[email protected]>

Closes #16875 from bogdanrdc/SPARK-19512-2.1.
## What changes were proposed in this pull request?

Using `from_json` on a column with an empty string results in `java.util.NoSuchElementException: head of empty list`. This is because `parser.parse(input)` may return `Nil` when `input.trim.isEmpty`.

## How was this patch tested?

Regression test in `JsonExpressionsSuite`.

Author: Burak Yavuz <[email protected]>

Closes #16881 from brkyvz/json-fix.

(cherry picked from commit d5593f7)
Signed-off-by: Herman van Hovell <[email protected]>
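A minimal sketch of the failure mode (assuming a SparkSession `spark` in scope; column and schema names are illustrative):

```scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{IntegerType, StructType}
import spark.implicits._

// Before this fix, the empty-string row threw NoSuchElementException;
// after it, from_json simply yields null for that row.
val schema = new StructType().add("a", IntegerType)
val df = Seq("""{"a": 1}""", "").toDF("json")
df.select(from_json($"json", schema)).show()
```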
…able.Map also

## What changes were proposed in this pull request?

Fixes compile errors in generated code when the user has a case class with a `scala.collection.immutable.Map` instead of a `scala.collection.Map`. Since `ArrayBasedMapData.toScalaMap` returns the immutable version, we can make it work with both.

## How was this patch tested?

Additional unit tests.

Author: Andrew Ray <[email protected]>

Closes #16161 from aray/fix-map-codegen.

(cherry picked from commit 46d30ac)
Signed-off-by: Cheng Lian <[email protected]>
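A sketch of the previously failing pattern (assuming a SparkSession `spark`; names are illustrative):

```scala
import spark.implicits._

// A field explicitly typed as scala.collection.immutable.Map used to
// produce non-compiling generated code when encoded as a Dataset;
// plain scala.collection.Map already worked.
case class MapHolder(m: scala.collection.immutable.Map[String, Int])

val ds = Seq(MapHolder(Map("a" -> 1, "b" -> 2))).toDS()
ds.collect()
```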
…stamp column
## What changes were proposed in this pull request?
Fix a bug in the collect method for collecting a timestamp column; the bug can be reproduced with the following code and output:
```
library(SparkR)
sparkR.session(master = "local")
df <- data.frame(col1 = c(0, 1, 2),
col2 = c(as.POSIXct("2017-01-01 00:00:01"), NA, as.POSIXct("2017-01-01 12:00:01")))
sdf1 <- createDataFrame(df)
print(dtypes(sdf1))
df1 <- collect(sdf1)
print(lapply(df1, class))
sdf2 <- filter(sdf1, "col1 > 0")
print(dtypes(sdf2))
df2 <- collect(sdf2)
print(lapply(df2, class))
```
As we can see from the printed output, the column type of col2 in df2 is unexpectedly converted to numeric when NA is at the top of the column.
This is caused by the method `do.call(c, list)`: if we combine a list this way, e.g. `do.call(c, list(NA, as.POSIXct("2017-01-01 12:00:01")))`, the class of the result is numeric instead of POSIXct.
Therefore, we need to cast the data type of the vector explicitly.
## How was this patch tested?
The patch can be tested manually with the same code above.
Author: titicaca <[email protected]>
Closes #16689 from titicaca/sparkr-dev.
(cherry picked from commit bc0a0e6)
Signed-off-by: Felix Cheung <[email protected]>
…or when the cluster size doesn't equal k

## What changes were proposed in this pull request?

Backport of the fix in #16666.

## How was this patch tested?

Backported unit tests.

Author: [email protected] <[email protected]>

Closes #16761 from wangmiao1981/kmeansport.
…hould not be in the same group

## What changes were proposed in this pull request?

In `KafkaOffsetReader`, when an error occurs, we abort the existing consumer and create a new consumer. In our current implementation, the first consumer and the second consumer would be in the same group (which leads to SPARK-19559), **_violating our intention of the two consumers not being in the same group._**

The cause is that, in our current implementation, the first consumer is created before `groupId` and `nextId` are initialized in the constructor. Then even if `groupId` and `nextId` are increased during the creation of that first consumer, `groupId` and `nextId` would still be initialized to default values in the constructor for the second consumer. We should make sure that `groupId` and `nextId` are initialized before any consumer is created.

## How was this patch tested?

Ran `KafkaSourceSuite` 100 times; all passed.

Author: Liwei Lin <[email protected]>

Closes #16902 from lw-lin/SPARK-19564-.

(cherry picked from commit 2bdbc87)
Signed-off-by: Shixiong Zhu <[email protected]>
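A self-contained sketch of the initialization-order bug described above (hypothetical names, not the actual classes):

```scala
// A constructor-time call reads a field declared later in the class body,
// which still holds its default value (null) at that point.
class OffsetReader {
  private val consumer = createConsumer() // runs before groupId is assigned
  private val groupId = "spark-kafka-source-" + java.util.UUID.randomUUID()

  private def createConsumer(): String = {
    println(s"creating consumer in group: $groupId") // prints "group: null"
    groupId
  }
}

new OffsetReader()
```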
…amount is not equal to end indices amount

### What changes were proposed in this pull request?

```
Liquid Exception: Start indices amount is not equal to end indices amount, see /Users/xiao/IdeaProjects/sparkDelivery/docs/../examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java. in ml-features.md
```

The documentation build has been broken since merging #16789. This PR fixes it.

## How was this patch tested?

Manual.

Author: Xiao Li <[email protected]>

Closes #16908 from gatorsmile/docMLFix.

(cherry picked from commit 855a1b7)
Signed-off-by: Sean Owen <[email protected]>
## What changes were proposed in this pull request?

Add the missing `warnings` import.

## How was this patch tested?

Manual tests.

Author: zero323 <[email protected]>

Closes #16846 from zero323/SPARK-19506.

(cherry picked from commit 5e7cd33)
Signed-off-by: Holden Karau <[email protected]>
…thout errors

## What changes were proposed in this pull request?

When a query uses a temp checkpoint dir, it's better to delete it if the query is stopped without errors.

## How was this patch tested?

New unit tests.

Author: Shixiong Zhu <[email protected]>

Closes #16880 from zsxwing/delete-temp-checkpoint.

(cherry picked from commit 3dbff9b)
Signed-off-by: Burak Yavuz <[email protected]>
…lassLoader to load Netty generated classes

## What changes were proposed in this pull request?

Netty's `MessageToMessageEncoder` uses [Javassist](https://github.com/netty/netty/blob/91a0bdc17a8298437d6de08a8958d753799bd4a6/common/src/main/java/io/netty/util/internal/JavassistTypeParameterMatcherGenerator.java#L62) to generate a matcher class, and the implementation calls `Class.forName` to check if this class is already generated. If `MessageEncoder` or `MessageDecoder` is created in `ExecutorClassLoader.findClass`, it will cause `ClassCircularityError`. This is because loading this Netty-generated class calls `ExecutorClassLoader.findClass` to search for it, and `ExecutorClassLoader` tries to use RPC to load it, causing the non-existent matcher class to be loaded again. The JVM reports `ClassCircularityError` to prevent such infinite recursion.

##### Why it only happens in Maven builds

It's because Maven and SBT have different class loader trees. The Maven build sets a URLClassLoader as the current context class loader to run the tests, which exposes this issue. The class loader tree is as follows:

```
bootstrap class loader ------ ... ----- REPL class loader ---- ExecutorClassLoader
|
URLClassLoader
```

The SBT build uses the bootstrap class loader directly, and `ReplSuite.test("propagation of local properties")` is the first test in ReplSuite, which happens to load `io/netty/util/internal/__matchers__/org/apache/spark/network/protocol/MessageMatcher` into the bootstrap class loader. (Note: in the Maven build, it's loaded into the URLClassLoader, so it cannot be found in ExecutorClassLoader.) This issue can be reproduced in SBT as well. Here are the reproduction steps:
- Enable `hadoop.caller.context.enabled`.
- Replace `Class.forName` with `Utils.classForName` in `object CallerContext`.
- Ignore `ReplSuite.test("propagation of local properties")`.
- Run `ReplSuite` using SBT.

This PR just creates a singleton MessageEncoder and MessageDecoder and makes sure they are created before switching to ExecutorClassLoader. TransportContext is created when creating RpcEnv, which happens before creating ExecutorClassLoader.

## How was this patch tested?

Jenkins.

Author: Shixiong Zhu <[email protected]>

Closes #16859 from zsxwing/SPARK-17714.

(cherry picked from commit 905fdf0)
Signed-off-by: Shixiong Zhu <[email protected]>
…aitUninterruptibly()

This patch replaces a single `awaitUninterruptibly()` call with a plain `await()` call in Spark's `network-common` library in order to fix a bug which may cause tasks to be uncancellable.

In Spark's Netty RPC layer, `TransportClientFactory.createClient()` calls `awaitUninterruptibly()` on a Netty future while waiting for a connection to be established. This creates a problem when a Spark task is interrupted while blocking in this call (which can happen in the event of a slow connection which will eventually time out). This has a bad impact on task cancellation when `interruptOnCancel = true`.

As an example of the impact of this problem, I experienced significant numbers of uncancellable "zombie tasks" on a production cluster where several tasks were blocked trying to connect to a dead shuffle server and then continued running as zombies after I cancelled the associated Spark stage. The zombie tasks ran for several minutes with the following stack:

```
java.lang.Object.wait(Native Method)
java.lang.Object.wait(Object.java:460)
io.netty.util.concurrent.DefaultPromise.await0(DefaultPromise.java:607)
io.netty.util.concurrent.DefaultPromise.awaitUninterruptibly(DefaultPromise.java:301)
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:224)
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179) => holding Monitor(java.lang.Object@1849476028)
org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:105)
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
org.apache.spark.network.shuffle.ExternalShuffleClient.fetchBlocks(ExternalShuffleClient.java:114)
org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:169)
org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:350)
org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:286)
org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:120)
org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:45)
org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:169)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
[...]
```

As far as I can tell, `awaitUninterruptibly()` might have been used in order to avoid having to declare that methods throw `InterruptedException` (this code is written in Java, hence the need to use checked exceptions). This patch simply replaces this with a regular, interruptible `await()` call. This required several interface changes to declare a new checked exception (these are internal interfaces, though, and this change doesn't significantly impact binary compatibility).

An alternative approach would be to wrap `InterruptedException` into `IOException` in order to avoid having to change interfaces. The problem with this approach is that the `network-shuffle` project's `RetryingBlockFetcher` code treats `IOExceptions` as transient failures when deciding whether to retry fetches, so throwing a wrapped `IOException` might cause an interrupted shuffle fetch to be retried, further prolonging the lifetime of a cancelled zombie task.

Note that there are three other `awaitUninterruptibly()` calls in the codebase, but those have a hard 10 second timeout and are waiting on a `close()` operation which is expected to complete near instantaneously, so the impact of uninterruptibility there is much smaller.

## How was this patch tested?

Manually.

Author: Josh Rosen <[email protected]>

Closes #16866 from JoshRosen/SPARK-19529.

(cherry picked from commit 1c4d10b)
Signed-off-by: Cheng Lian <[email protected]>
Spark's I/O encryption uses an ephemeral key for each driver instance, so driver B cannot decrypt data written by driver A since it doesn't have the correct key.

The write ahead log is used for recovery, and thus needs to be readable by a different driver, so it cannot be encrypted by Spark's I/O encryption code. The BlockManager APIs used by the WAL code to write the data automatically encrypt data, so changes are needed so that callers can opt out of encryption.

Aside from that, the `putBytes` API in the BlockManager does not do encryption, so a separate situation arose where the WAL would write unencrypted data to the BM and, when those blocks were read, decryption would fail. So the WAL code needs to ask the BM to encrypt that data when encryption is enabled; this code is not optimal since it results in a (temporary) second copy of the data block in memory, but should be OK for now until a more performant solution is added. The non-encryption case should not be affected.

Tested with new unit tests, and by running streaming apps that do recovery using the WAL data with I/O encryption turned on.

Author: Marcelo Vanzin <[email protected]>

Closes #16862 from vanzin/SPARK-19520.

(cherry picked from commit 0169360)
Signed-off-by: Marcelo Vanzin <[email protected]>
…in the doc

## What changes were proposed in this pull request?

https://spark.apache.org/docs/latest/sql-programming-guide.html#caching-data-in-memory

In the doc, the calls `spark.cacheTable("tableName")` and `spark.uncacheTable("tableName")` actually need to be `spark.catalog.cacheTable` and `spark.catalog.uncacheTable`.

## How was this patch tested?

Built the docs and verified the change shows up fine.

Author: Sunitha Kambhampati <[email protected]>

Closes #16919 from skambha/docChange.

(cherry picked from commit 9b5e460)
Signed-off-by: Xiao Li <[email protected]>
…ment

## What changes were proposed in this pull request?

As discussed in [JIRA](https://issues.apache.org/jira/browse/SPARK-19501), this patch addresses the problem where too many HDFS RPCs are made when there are many URIs specified in `spark.yarn.jars`, potentially adding hundreds of RTTs to YARN before the application launches. This becomes significant when submitting the application to a non-local YARN cluster (where the RTT may be on the order of 100ms, for example). For each URI specified, the current implementation makes at least two HDFS RPCs, for:

- [Calling `getFileStatus()` before uploading each file to the distributed cache in `ClientDistributedCacheManager.addResource()`](https://github.com/apache/spark/blob/v2.1.0/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala#L71).
- [Resolving any symbolic links in each file URI](https://github.com/apache/spark/blob/v2.1.0/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala#L377-L379), which repeatedly makes HDFS RPCs until all symlinks are resolved. (See [`FileContext.resolve(Path)`](https://github.com/apache/hadoop/blob/release-2.7.1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileContext.java#L2189-L2195), [`FSLinkResolver.resolve(FileContext, Path)`](https://github.com/apache/hadoop/blob/release-2.7.1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSLinkResolver.java#L79-L112), and [`AbstractFileSystem.resolvePath()`](https://github.com/apache/hadoop/blob/release-2.7.1/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java#L464-L468).)

The first `getFileStatus` RPC can be removed by using a `statCache` populated with the file statuses retrieved by [the previous `globStatus` call](https://github.com/apache/spark/blob/v2.1.0/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala#L531). The second can be largely reduced by caching the symlink resolution results in a `mutable.HashMap`. This patch adds a local variable in `yarn.Client.prepareLocalResources()` and passes it as an additional parameter to `yarn.Client.copyFileToRemote`. [The symlink resolution code was added in 2013](a35472e#diff-b050df3f55b82065803d6e83453b9706R187) and has not changed since. I am assuming that this is still required, but otherwise we can remove the `symlinkCache` and symlink resolution altogether.

## How was this patch tested?

This patch is based off 8e8afb3, currently the latest YARN patch on master. All tests except a few in spark-hive passed with `./dev/run-tests` on my machine, using JDK 1.8.0_112 on macOS 10.12.3; I also tested this modified version of Spark 2.2.0-SNAPSHOT myself, which performed a normal deployment and execution on a YARN cluster without errors.

Author: Jong Wook Kim <[email protected]>

Closes #16916 from jongwook/SPARK-19501.

(cherry picked from commit ab9872d)
Signed-off-by: Marcelo Vanzin <[email protected]>
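A hedged sketch of the symlink-resolution cache idea (names are illustrative, not the actual patch):

```scala
import scala.collection.mutable
import org.apache.hadoop.fs.{FileContext, Path}

// Each distinct path is resolved against the NameNode once; subsequent
// lookups for the same path are served from the in-memory cache.
val symlinkCache = mutable.HashMap.empty[String, Path]

def resolveOnce(fc: FileContext, path: Path): Path =
  symlinkCache.getOrElseUpdate(path.toString, fc.resolvePath(path))
```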
…CRAN check

## What changes were proposed in this pull request?

- This is caused by changes in SPARK-18444 and SPARK-18643: we no longer install Spark when `master = ""` (the default). It is also related to SPARK-18449, since the real `master` value is not known at the time the R code in `sparkR.session` is run. (`master` cannot default to "local" since it could be overridden by the spark-submit command line or spark config.)
- As a result, while running SparkR as a package in an IDE works fine, the CRAN check does not, as it launches SparkR via a non-interactive script.
- The fix is to add a check to the beginning of each test and vignette; the same would also work by changing `sparkR.session()` to `sparkR.session(master = "local")` in tests, but I think being more explicit is better.

## How was this patch tested?

Tested this by reverting the version to 2.1, since it needs to download the release jar with a matching version. But since there are changes in 2.2 (specifically around SparkR ML) that are incompatible with 2.1, some tests are failing in this config. Will need to port this to branch-2.1 and retest with the 2.1 release jar.

Manually, as:
```
# modify DESCRIPTION to revert version to 2.1.0
SPARK_HOME=/usr/spark R CMD build pkg
# run cran check without SPARK_HOME
R CMD check --as-cran SparkR_2.1.0.tar.gz
```

Author: Felix Cheung <[email protected]>

Closes #16720 from felixcheung/rcranchecktest.

(cherry picked from commit a3626ca)
Signed-off-by: Shivaram Venkataraman <[email protected]>
…und batch mode

## What changes were proposed in this pull request?

Revision to structured-streaming-kafka-integration.md to reflect the new batch query specification and options. zsxwing tdas

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Tyson Condie <[email protected]>

Closes #16918 from tcondie/kafka-docs.

(cherry picked from commit 447b2b5)
Signed-off-by: Tathagata Das <[email protected]>
Add `coalesce` on DataFrame for down-partitioning without shuffle, and `coalesce` on Column.

Tested manually and with unit tests.

Author: Felix Cheung <[email protected]>

Closes #16739 from felixcheung/rcoalesce.

(cherry picked from commit 671bc08)
Signed-off-by: Felix Cheung <[email protected]>
## What changes were proposed in this pull request?

SPARK-19464 removed support for Hadoop 2.5 and earlier, so we can do some cleanup for HDFSMetadataLog. This PR includes the following changes:

- ~~Remove the workaround codes for HADOOP-10622.~~ Unfortunately, there is another issue, [HADOOP-14084](https://issues.apache.org/jira/browse/HADOOP-14084), that prevents us from removing the workaround codes.
- Remove the unnecessary `writer: (T, OutputStream) => Unit` and just call `serialize` directly.
- Remove catching FileNotFoundException.

## How was this patch tested?

Jenkins.

Author: Shixiong Zhu <[email protected]>

Closes #16932 from zsxwing/metadata-cleanup.

(cherry picked from commit 21b4ba2)
Signed-off-by: Shixiong Zhu <[email protected]>
## What changes were proposed in this pull request?

Right now, we only have an info-level log after we finish the tests of a Python test file. We should also log the start of a test, so if a test is hanging, we can tell which test file is running.

## How was this patch tested?

This is a change for Python tests.

Author: Yin Huai <[email protected]>

Closes #16935 from yhuai/SPARK-19604.

(cherry picked from commit f6c3bba)
Signed-off-by: Yin Huai <[email protected]>
## What changes were proposed in this pull request?
`StreamingQuery.explain` doesn't show the correct streaming physical plan right now because `ExplainCommand` receives a runtime batch plan and its `logicalPlan.isStreaming` is always false.
This PR adds `streaming` parameter to `ExplainCommand` to allow `StreamExecution` to specify that it's a streaming plan.
Examples of the explain outputs:
- streaming DataFrame.explain()
```
== Physical Plan ==
*HashAggregate(keys=[value#518], functions=[count(1)])
+- StateStoreSave [value#518], OperatorStateId(<unknown>,0,0), Append, 0
+- *HashAggregate(keys=[value#518], functions=[merge_count(1)])
+- StateStoreRestore [value#518], OperatorStateId(<unknown>,0,0)
+- *HashAggregate(keys=[value#518], functions=[merge_count(1)])
+- Exchange hashpartitioning(value#518, 5)
+- *HashAggregate(keys=[value#518], functions=[partial_count(1)])
+- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518]
+- *MapElements <function1>, obj#517: java.lang.String
+- *DeserializeToObject value#513.toString, obj#516: java.lang.String
+- StreamingRelation MemoryStream[value#513], [value#513]
```
- StreamingQuery.explain(extended = false)
```
== Physical Plan ==
*HashAggregate(keys=[value#518], functions=[count(1)])
+- StateStoreSave [value#518], OperatorStateId(...,0,0), Complete, 0
+- *HashAggregate(keys=[value#518], functions=[merge_count(1)])
+- StateStoreRestore [value#518], OperatorStateId(...,0,0)
+- *HashAggregate(keys=[value#518], functions=[merge_count(1)])
+- Exchange hashpartitioning(value#518, 5)
+- *HashAggregate(keys=[value#518], functions=[partial_count(1)])
+- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518]
+- *MapElements <function1>, obj#517: java.lang.String
+- *DeserializeToObject value#543.toString, obj#516: java.lang.String
+- LocalTableScan [value#543]
```
- StreamingQuery.explain(extended = true)
```
== Parsed Logical Plan ==
Aggregate [value#518], [value#518, count(1) AS count(1)#524L]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518]
+- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#517: java.lang.String
+- DeserializeToObject cast(value#543 as string).toString, obj#516: java.lang.String
+- LocalRelation [value#543]
== Analyzed Logical Plan ==
value: string, count(1): bigint
Aggregate [value#518], [value#518, count(1) AS count(1)#524L]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518]
+- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#517: java.lang.String
+- DeserializeToObject cast(value#543 as string).toString, obj#516: java.lang.String
+- LocalRelation [value#543]
== Optimized Logical Plan ==
Aggregate [value#518], [value#518, count(1) AS count(1)#524L]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518]
+- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#517: java.lang.String
+- DeserializeToObject value#543.toString, obj#516: java.lang.String
+- LocalRelation [value#543]
== Physical Plan ==
*HashAggregate(keys=[value#518], functions=[count(1)], output=[value#518, count(1)#524L])
+- StateStoreSave [value#518], OperatorStateId(...,0,0), Complete, 0
+- *HashAggregate(keys=[value#518], functions=[merge_count(1)], output=[value#518, count#530L])
+- StateStoreRestore [value#518], OperatorStateId(...,0,0)
+- *HashAggregate(keys=[value#518], functions=[merge_count(1)], output=[value#518, count#530L])
+- Exchange hashpartitioning(value#518, 5)
+- *HashAggregate(keys=[value#518], functions=[partial_count(1)], output=[value#518, count#530L])
+- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#518]
+- *MapElements <function1>, obj#517: java.lang.String
+- *DeserializeToObject value#543.toString, obj#516: java.lang.String
+- LocalTableScan [value#543]
```
## How was this patch tested?
The updated unit test.
Author: Shixiong Zhu <[email protected]>
Closes #16934 from zsxwing/SPARK-19603.
(cherry picked from commit fc02ef9)
Signed-off-by: Shixiong Zhu <[email protected]>
## What changes were proposed in this pull request?

Fix a test broken by the git merge for #16739.

## How was this patch tested?

Manual.

Author: Felix Cheung <[email protected]>

Closes #16950 from felixcheung/fixrtest.
…uite should clean up stopped sessions.

## What changes were proposed in this pull request?

`SparkSessionBuilderSuite` should clean up stopped sessions. Otherwise, it leaves behind some stopped `SparkContext`s interfering with other test suites using `SharedSQLContext`. Recently, the master branch has been failing consecutively.

- https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/

## How was this patch tested?

Pass Jenkins with an updated suite.

Author: Dongjoon Hyun <[email protected]>

Closes #18572 from dongjoon-hyun/SPARK-21345-BRANCH-2.1.
…lyzing empty table

## What changes were proposed in this pull request?

We should be able to store zero size and row count after analyzing an empty table. This is a backport of 9fccc36.

## How was this patch tested?

Added a new test.

Author: Zhenhua Wang <[email protected]>

Closes #18577 from wzhfy/analyzeEmptyTable-2.1.
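An illustrative check of the behavior (assuming a SparkSession `spark`; the table name is hypothetical):

```scala
// After this change, analyzing an empty table records zero statistics
// (size and row count) rather than leaving them unset.
spark.sql("CREATE TABLE empty_tbl (key INT) USING parquet")
spark.sql("ANALYZE TABLE empty_tbl COMPUTE STATISTICS")
spark.sql("DESCRIBE EXTENDED empty_tbl").show(truncate = false)
```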
…rison

## What changes were proposed in this pull request?

This PR fixes a wrong comparison for `BinaryType`. It enables unsigned comparison and unsigned prefix generation for arrays for `BinaryType`. Previous implementations used signed operations.

## How was this patch tested?

Added a test suite in `OrderingSuite`.

Author: Kazuaki Ishizaki <[email protected]>

Closes #18571 from kiszk/SPARK-21344.

(cherry picked from commit ac5d5d7)
Signed-off-by: gatorsmile <[email protected]>
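A small sketch of why signed byte comparison gives the wrong order for binary data:

```scala
// 0x80 is 128 when treated as unsigned (correct for byte-wise binary
// ordering) but -128 when treated as a signed Java/Scala Byte.
val hi: Byte = 0x80.toByte
val lo: Byte = 0x01.toByte

java.lang.Byte.compare(hi, lo)                  // < 0: signed order (wrong)
java.lang.Integer.compare(hi & 0xff, lo & 0xff) // > 0: unsigned order (correct)
```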
…alyst should be global

## What changes were proposed in this pull request?

This PR is a backport of #18418 to Spark 2.1. [SPARK-21391](https://issues.apache.org/jira/browse/SPARK-21391) reported this problem in Spark 2.1.

The issue happens in `ExternalMapToCatalyst`. For example, the following code creates `ExternalMapToCatalyst` to convert a Scala `Map` to the catalyst map format.

```
val data = Seq.tabulate(10)(i => NestedData(1, Map("key" -> InnerData("name", i + 100))))
val ds = spark.createDataset(data)
```

The `valueConverter` in `ExternalMapToCatalyst` looks like:

```
if (isnull(lambdavariable(ExternalMapToCatalyst_value52, ExternalMapToCatalyst_value_isNull52, ObjectType(class org.apache.spark.sql.InnerData), true))) null else named_struct(name, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(lambdavariable(ExternalMapToCatalyst_value52, ExternalMapToCatalyst_value_isNull52, ObjectType(class org.apache.spark.sql.InnerData), true)).name, true), value, assertnotnull(lambdavariable(ExternalMapToCatalyst_value52, ExternalMapToCatalyst_value_isNull52, ObjectType(class org.apache.spark.sql.InnerData), true)).value)
```

There is a `CreateNamedStruct` expression (`named_struct`) to create a row of `InnerData.name` and `InnerData.value` that are referred to by `ExternalMapToCatalyst_value52`. Because `ExternalMapToCatalyst_value52` is a local variable, when `CreateNamedStruct` splits expressions into individual functions, the local variable can't be accessed anymore.

## How was this patch tested?

Added a new test suite to `DatasetPrimitiveSuite`.

Author: Kazuaki Ishizaki <[email protected]>

Closes #18627 from kiszk/SPARK-21391.
…pressions
## What changes were proposed in this pull request?
This PR changes the direction of expression transformation in the DecimalPrecision rule. Previously, the expressions were transformed down, which led to incorrect result types when decimal expressions had other decimal expressions as their operands. The root cause of this issue was in visiting outer nodes before their children. Consider the example below:
```
val inputSchema = StructType(StructField("col", DecimalType(26, 6)) :: Nil)
val sc = spark.sparkContext
val rdd = sc.parallelize(1 to 2).map(_ => Row(BigDecimal(12)))
val df = spark.createDataFrame(rdd, inputSchema)
// Works correctly since no nested decimal expression is involved
// Expected result type: (26, 6) * (26, 6) = (38, 12)
df.select($"col" * $"col").explain(true)
df.select($"col" * $"col").printSchema()
// Gives a wrong result since there is a nested decimal expression that should be visited first
// Expected result type: ((26, 6) * (26, 6)) * (26, 6) = (38, 12) * (26, 6) = (38, 18)
df.select($"col" * $"col" * $"col").explain(true)
df.select($"col" * $"col" * $"col").printSchema()
```
The example above gives the following output:
```
// Correct result without sub-expressions
== Parsed Logical Plan ==
'Project [('col * 'col) AS (col * col)#4]
+- LogicalRDD [col#1]
== Analyzed Logical Plan ==
(col * col): decimal(38,12)
Project [CheckOverflow((promote_precision(cast(col#1 as decimal(26,6))) * promote_precision(cast(col#1 as decimal(26,6)))), DecimalType(38,12)) AS (col * col)#4]
+- LogicalRDD [col#1]
== Optimized Logical Plan ==
Project [CheckOverflow((col#1 * col#1), DecimalType(38,12)) AS (col * col)#4]
+- LogicalRDD [col#1]
== Physical Plan ==
*Project [CheckOverflow((col#1 * col#1), DecimalType(38,12)) AS (col * col)#4]
+- Scan ExistingRDD[col#1]
// Schema
root
|-- (col * col): decimal(38,12) (nullable = true)
// Incorrect result with sub-expressions
== Parsed Logical Plan ==
'Project [(('col * 'col) * 'col) AS ((col * col) * col)#11]
+- LogicalRDD [col#1]
== Analyzed Logical Plan ==
((col * col) * col): decimal(38,12)
Project [CheckOverflow((promote_precision(cast(CheckOverflow((promote_precision(cast(col#1 as decimal(26,6))) * promote_precision(cast(col#1 as decimal(26,6)))), DecimalType(38,12)) as decimal(26,6))) * promote_precision(cast(col#1 as decimal(26,6)))), DecimalType(38,12)) AS ((col * col) * col)#11]
+- LogicalRDD [col#1]
== Optimized Logical Plan ==
Project [CheckOverflow((cast(CheckOverflow((col#1 * col#1), DecimalType(38,12)) as decimal(26,6)) * col#1), DecimalType(38,12)) AS ((col * col) * col)#11]
+- LogicalRDD [col#1]
== Physical Plan ==
*Project [CheckOverflow((cast(CheckOverflow((col#1 * col#1), DecimalType(38,12)) as decimal(26,6)) * col#1), DecimalType(38,12)) AS ((col * col) * col)#11]
+- Scan ExistingRDD[col#1]
// Schema
root
|-- ((col * col) * col): decimal(38,12) (nullable = true)
```
## How was this patch tested?
This PR was tested with available unit tests. Moreover, there are tests to cover previously failing scenarios.
Author: aokolnychyi <[email protected]>
Closes #18583 from aokolnychyi/spark-21332.
(cherry picked from commit 0be5fb4)
Signed-off-by: gatorsmile <[email protected]>
…lures in some cases

## What changes were proposed in this pull request?

https://issues.apache.org/jira/projects/SPARK/issues/SPARK-21441

This issue can be reproduced by the following example:

```
val spark = SparkSession
  .builder()
  .appName("smj-codegen")
  .master("local")
  .config("spark.sql.autoBroadcastJoinThreshold", "1")
  .getOrCreate()

val df1 = spark.createDataFrame(Seq((1, 1), (2, 2), (3, 3))).toDF("key", "int")
val df2 = spark.createDataFrame(Seq((1, "1"), (2, "2"), (3, "3"))).toDF("key", "str")
val df = df1.join(df2, df1("key") === df2("key"))
  .filter("int = 2 or reflect('java.lang.Integer', 'valueOf', str) = 1")
  .select("int")
df.show()
```

To conclude, the issue happens when:
1. The SortMergeJoin condition contains CodegenFallback expressions.
2. In the physical plan tree, the SortMergeJoin node is the child of the root node, e.g., the Project in the above example.

This patch fixes the logic in the `CollapseCodegenStages` rule.

## How was this patch tested?

Unit test and manual verification in our cluster.

Author: donnyzone <[email protected]>

Closes #18656 from DonnyZone/Fix_SortMergeJoinExec.

(cherry picked from commit 6b6dd68)
Signed-off-by: Wenchen Fan <[email protected]>
## What changes were proposed in this pull request?

JIRA issue: https://issues.apache.org/jira/browse/SPARK-21446

`options.asConnectionProperties` cannot contain `fetchsize`, because `fetchsize` belongs to the Spark-only options, and Spark-only options are excluded from the connection properties. So change the properties passed to `beforeFetch` from `options.asConnectionProperties.asScala.toMap` to `options.asProperties.asScala.toMap`.

## How was this patch tested?

Author: DFFuture <[email protected]>

Closes #18665 from DFFuture/sparksql_pg.

(cherry picked from commit c972918)
Signed-off-by: gatorsmile <[email protected]>
## What changes were proposed in this pull request?

Add a `setWeightCol` method for OneVsRest. `weightCol` is ignored if the classifier doesn't inherit the HasWeightCol trait.

## How was this patch tested?

- [x] Added a unit test.

Author: Yan Facai (颜发才) <[email protected]>

Closes #18554 from facaiy/BUG/oneVsRest_missing_weightCol.

(cherry picked from commit a5a3189)
Signed-off-by: Yanbo Liang <[email protected]>
This reverts commit 8520d7c.
… by its canonicalized child

## What changes were proposed in this pull request?

When there are aliases (added for nested fields) as parameters in `RuntimeReplaceable`, since they are not in the children expressions, those aliases can't be cleaned up by the analyzer rule `CleanupAliases`.

An expression such as `nvl(foo.foo1, "value")` can be resolved to two semantically different expressions in a group-by query because they contain different aliases. Because those aliases are not children of `RuntimeReplaceable`, which is a `UnaryExpression`, we can't trim the aliases out by simply transforming the expressions in `CleanupAliases`. If we want to replace the non-children aliases in `RuntimeReplaceable`, we need to add more code to `RuntimeReplaceable` and modify all of its expressions. That makes the interface ugly, IMO.

Considering that those aliases will be replaced later during optimization and so are harmless, this patch chooses to simply override `canonicalized` of `RuntimeReplaceable`.

One concern is about `CleanupAliases`, because it actually cannot clean up ALL aliases inside a plan. To make callers of this rule notice that, this patch adds a comment to `CleanupAliases`.

## How was this patch tested?

Added a test.

Author: Liang-Chi Hsieh <[email protected]>

Closes #18761 from viirya/SPARK-21555.

(cherry picked from commit 9c8109e)
Signed-off-by: gatorsmile <[email protected]>
Handle the case where the server closes the socket before the full message has been written by the client.

Author: Marcelo Vanzin <[email protected]>

Closes #18727 from vanzin/SPARK-21522.

(cherry picked from commit b133501)
Signed-off-by: Marcelo Vanzin <[email protected]>
… registry

## What changes were proposed in this pull request?

When using PySpark broadcast variables in a multi-threaded environment, `SparkContext._pickled_broadcast_vars` becomes a shared resource. A race condition can occur when broadcast variables that are pickled from one thread get added to the shared `_pickled_broadcast_vars` and become part of the Python command from another thread. This PR introduces a thread-safe pickled registry using thread-local storage, so that when the Python command is pickled (causing the broadcast variables to be pickled and added to the registry) each thread has its own view of the pickle registry to retrieve and clear the broadcast variables used.

## How was this patch tested?

Added a unit test that causes this race condition using another thread.

Author: Bryan Cutler <[email protected]>

Closes #18825 from BryanCutler/pyspark-bcast-threadsafe-SPARK-12717-2_1.
…le with extreme values on the partition column

## What changes were proposed in this pull request?

An overflow of the difference of the bounds on the partitioning column leads to no data being read. This patch checks for this overflow.

## How was this patch tested?

New unit test.

Author: Andrew Ray <[email protected]>

Closes #18800 from aray/SPARK-21330.

(cherry picked from commit 25826c7)
Signed-off-by: Sean Owen <[email protected]>
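The arithmetic behind the bug, as a quick sketch:

```scala
// With Long bounds at the extremes, the difference used to compute the
// partition stride wraps around instead of producing a huge positive value.
val lower = Long.MinValue
val upper = Long.MaxValue
println(upper - lower) // -1, not 2^64 - 1: every computed range is empty
```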
## What changes were proposed in this pull request?

`SQLContext.get(key, null)`, for a key that is not defined in the conf and doesn't have a default value defined, throws an NPE. It happens only when the conf has a value converter.

Added a null check on `defaultValue` inside `SQLConf.getConfString` to avoid calling `entry.valueConverter(defaultValue)`.

## How was this patch tested?

Added a unit test.

Author: vinodkc <[email protected]>

Closes #18852 from vinodkc/br_Fix_SPARK-21588.

(cherry picked from commit 1ba967b)
Signed-off-by: gatorsmile <[email protected]>
…mation

## What changes were proposed in this pull request?

Backporting SPARK-18535 and SPARK-19720 to Spark 2.1. This backport PR redacts sensitive information, controlled by configuration, in the Spark UI and spark-submit console logs, using Mark Grover's PRs as reference.

## How was this patch tested?

The same tests from the referenced PRs were applied.

Author: Mark Grover <[email protected]>

Closes #18802 from dmvieira/feature-redact.
This PR is related to #18554, modified for branch 2.1.

## What changes were proposed in this pull request?

Add a `setWeightCol` method for OneVsRest. `weightCol` is ignored if the classifier doesn't inherit the HasWeightCol trait.

## How was this patch tested?

- [x] Added a unit test.

Author: Yan Facai (颜发才) <[email protected]>

Closes #18763 from facaiy/BUG/branch-2.1_OneVsRest_support_setWeightCol.
…when paths are successfully removed

## What changes were proposed in this pull request?

Backport SPARK-21721 to branch 2.1.

We put the staging path to delete into the deleteOnExit cache of FileSystem in case the path can't be successfully removed. But when we do successfully remove the path, we don't remove it from the cache. We should do so, to avoid continually growing the cache.

## How was this patch tested?

Added a test.

Author: Liang-Chi Hsieh <[email protected]>

Closes #18947 from viirya/SPARK-21721-backport-2.1.
…SurvivalRegression

## What changes were proposed in this pull request?

The line `SchemaUtils.appendColumn(schema, $(predictionCol), IntegerType)` did not modify the variable `schema`, hence only the last line had any effect. A temporary variable is used to correctly append the two columns predictionCol and probabilityCol.

## How was this patch tested?

Manually.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Cédric Pelvet <[email protected]>

Closes #18980 from sharp-pixel/master.

(cherry picked from commit 73e04ec)
Signed-off-by: Sean Owen <[email protected]>
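A hedged sketch of the underlying immutability issue, using the public `StructType#add` rather than the internal helper (column names and types are illustrative):

```scala
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// StructType#add returns a NEW schema; the result of the first append must
// be carried forward, otherwise only the last append takes effect.
def appendPredictionColumns(schema: StructType): StructType = {
  var out = schema.add(StructField("prediction", IntegerType))
  out = out.add(StructField("probability", IntegerType))
  out
}
```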
…xit cache when paths are successfully removed

## What changes were proposed in this pull request?

Fix a typo in a test.

## How was this patch tested?

Jenkins tests.

Author: Liang-Chi Hsieh <[email protected]>

Closes #19006 from viirya/SPARK-21721-backport-2.1-followup.
…ow NPE

Backport #19036 to branch 2.1 and 2.0.

Author: Wenchen Fan <[email protected]>

Closes #19040 from cloud-fan/bug.
## What changes were proposed in this pull request?

The killExecutor API currently does not allow killing an executor without updating the total number of executors needed. When dynamic allocation is turned on and the allocator tries to kill an executor, the scheduler reduces the total number of executors needed (see https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala#L635), which is incorrect because the allocator already takes care of setting the required number of executors itself.

## How was this patch tested?

Ran a job on the cluster and made sure the executor request is correct.

Author: Sital Kedia <[email protected]>

Closes #19081 from sitalkedia/skedia/oss_fix_executor_allocation.

(cherry picked from commit 6949a9c)
Signed-off-by: Marcelo Vanzin <[email protected]>
…top SparkContext.

## What changes were proposed in this pull request?

`pyspark.sql.tests.SQLTests2` doesn't stop the newly created SparkContext in the test, which might affect the following tests. This PR makes `pyspark.sql.tests.SQLTests2` stop the `SparkContext`.

## How was this patch tested?

Existing tests.

Author: Takuya UESHIN <[email protected]>

Closes #19158 from ueshin/issues/SPARK-21950.

(cherry picked from commit 57bc1e9)
Signed-off-by: Takuya UESHIN <[email protected]>
## What changes were proposed in this pull request?

Cherry-pick or manually port changes to 2.1.

## How was this patch tested?

Jenkins.

Author: Felix Cheung <[email protected]>
Author: hyukjinkwon <[email protected]>
Author: Wayne Zhang <[email protected]>

Closes #19165 from felixcheung/rbackportpkg21.
Can one of the admins verify this patch?

@engineeyao close this
## What changes were proposed in this pull request?
Fixed wrong documentation for Mean Absolute Error.
Even though the code is correct for the MAE:
```scala
@Since("1.2.0")
def meanAbsoluteError: Double = {
summary.normL1(1) / summary.count
}
```
In the documentation the division by N is missing.
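For reference, the standard definition of MAE that the documentation should show:

```
\mathrm{MAE} = \frac{1}{N} \sum_{i=1}^{N} \left| y_i - \hat{y}_i \right|
```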
## How was this patch tested?
All of the Spark tests were run.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: FavioVazquez <[email protected]>
Author: faviovazquez <[email protected]>
Author: Favio André Vázquez <[email protected]>
Closes #19190 from FavioVazquez/mae-fix.
(cherry picked from commit e2ac2f1)
Signed-off-by: Sean Owen <[email protected]>
srowen added a commit to srowen/spark that referenced this pull request on Sep 12, 2017:
Closes apache#18522 Closes apache#17722 Closes apache#18879 Closes apache#18891 Closes apache#18806 Closes apache#18948 Closes apache#18949 Closes apache#19070 Closes apache#19039 Closes apache#19142 Closes apache#18515 Closes apache#19154 Closes apache#19162 Closes apache#19187