From e477da835f673a56e3c53d5754682a0104da95f9 Mon Sep 17 00:00:00 2001
From: Rahij Ramsharan
Date: Wed, 25 Sep 2019 16:53:14 +0100
Subject: [PATCH 1/4] optimization: call fs.exists only when necessary

---
 .../InsertIntoHadoopFsRelationCommand.scala   | 42 +++++++++++--------
 1 file changed, 25 insertions(+), 17 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index d43fa3893df1d..21dfd2fbda057 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -90,8 +90,6 @@ case class InsertIntoHadoopFsRelationCommand(
         fs, catalogTable.get, qualifiedOutputPath, matchingPartitions)
     }
 
-    val pathExists = fs.exists(qualifiedOutputPath)
-
     val parameters = CaseInsensitiveMap(options)
 
     val partitionOverwriteMode = parameters.get("partitionOverwriteMode")
@@ -111,24 +109,34 @@ case class InsertIntoHadoopFsRelationCommand(
       outputPath = outputPath.toString,
       dynamicPartitionOverwrite = dynamicPartitionOverwrite)
 
-    val doInsertion = (mode, pathExists) match {
-      case (SaveMode.ErrorIfExists, true) =>
-        throw new AnalysisException(s"path $qualifiedOutputPath already exists.")
-      case (SaveMode.Overwrite, true) =>
-        if (ifPartitionNotExists && matchingPartitions.nonEmpty) {
-          false
-        } else if (dynamicPartitionOverwrite) {
-          // For dynamic partition overwrite, do not delete partition directories ahead.
-          true
+    val pathExists = () => fs.exists(qualifiedOutputPath)
+
+    val doInsertion = mode match {
+      case SaveMode.Append =>
+        true
+      case SaveMode.Ignore =>
+        !pathExists()
+      case SaveMode.ErrorIfExists =>
+        if (pathExists()) {
+          throw new AnalysisException(s"path $qualifiedOutputPath already exists.")
+        }
+        true
+      case SaveMode.Overwrite =>
+        if (pathExists()) {
+          if (ifPartitionNotExists && matchingPartitions.nonEmpty) {
+            false
+          } else if (dynamicPartitionOverwrite) {
+            // For dynamic partition overwrite, do not delete partition directories ahead.
+            true
+          } else {
+            deleteMatchingPartitions(fs, qualifiedOutputPath, customPartitionLocations, committer)
+            true
+          }
         } else {
-          deleteMatchingPartitions(fs, qualifiedOutputPath, customPartitionLocations, committer)
           true
         }
-      case (SaveMode.Append, _) | (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) =>
-        true
-      case (SaveMode.Ignore, exists) =>
-        !exists
-      case (s, exists) =>
+      case s =>
+        val exists = pathExists()
         throw new IllegalStateException(s"unsupported save mode $s ($exists)")
     }
 
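Taken on its own, PATCH 1/4 turns the eager existence check into a thunk: val pathExists = () => fs.exists(qualifiedOutputPath) is only invoked by the save modes that actually consult the path, so SaveMode.Append never touches the filesystem at all. On HDFS an exists call costs a NameNode round trip, and on object stores it is typically a HEAD request, which is presumably the cost the patch subject ("call fs.exists only when necessary") is after. Below is a minimal standalone sketch of the deferral pattern; nothing in it is Spark code, and the counter merely stands in for the real filesystem call.

    object DeferredCheckSketch {
      def main(args: Array[String]): Unit = {
        var calls = 0
        // Stand-in for fs.exists(qualifiedOutputPath): on HDFS a NameNode
        // round trip, on object stores typically a HEAD request.
        val pathExists: () => Boolean = () => { calls += 1; true }

        val appendResult = true          // SaveMode.Append branch: the thunk is never invoked
        val ignoreResult = !pathExists() // SaveMode.Ignore branch: invoked on demand
        println(s"exists calls: $calls (append=$appendResult, ignore=$ignoreResult)") // exists calls: 1
      }
    }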
From f9d008e8d8ddd21ebf08d3fd5f6411c1a1a21d5d Mon Sep 17 00:00:00 2001
From: Rahij Ramsharan
Date: Thu, 26 Sep 2019 10:06:17 +0100
Subject: [PATCH 2/4] CR comments

---
 .../InsertIntoHadoopFsRelationCommand.scala   | 34 ++++++++-----------
 1 file changed, 14 insertions(+), 20 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 21dfd2fbda057..91d925aaa6b28 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -20,11 +20,10 @@ package org.apache.spark.sql.execution.datasources
 import java.io.IOException
 
 import org.apache.hadoop.fs.{FileSystem, Path}
-
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTablePartition}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTablePartition}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
@@ -109,20 +108,14 @@ case class InsertIntoHadoopFsRelationCommand(
       outputPath = outputPath.toString,
       dynamicPartitionOverwrite = dynamicPartitionOverwrite)
 
-    val pathExists = () => fs.exists(qualifiedOutputPath)
-
-    val doInsertion = mode match {
-      case SaveMode.Append =>
-        true
-      case SaveMode.Ignore =>
-        !pathExists()
-      case SaveMode.ErrorIfExists =>
-        if (pathExists()) {
+    val doInsertion = if (mode == SaveMode.Append) {
+      true
+    } else {
+      val pathExists = fs.exists(qualifiedOutputPath)
+      (mode, pathExists) match {
+        case (SaveMode.ErrorIfExists, true) =>
           throw new AnalysisException(s"path $qualifiedOutputPath already exists.")
-        }
-        true
-      case SaveMode.Overwrite =>
-        if (pathExists()) {
+        case (SaveMode.Overwrite, true) =>
           if (ifPartitionNotExists && matchingPartitions.nonEmpty) {
             false
           } else if (dynamicPartitionOverwrite) {
@@ -132,12 +125,13 @@ case class InsertIntoHadoopFsRelationCommand(
             deleteMatchingPartitions(fs, qualifiedOutputPath, customPartitionLocations, committer)
             true
           }
-        } else {
+        case (SaveMode.Overwrite, _) | (SaveMode.ErrorIfExists, false) =>
           true
-        }
-      case s =>
-        val exists = pathExists()
-        throw new IllegalStateException(s"unsupported save mode $s ($exists)")
+        case (SaveMode.Ignore, exists) =>
+          !exists
+        case (s, exists) =>
+          throw new IllegalStateException(s"unsupported save mode $s ($exists)")
+      }
     }
 
     if (doInsertion) {
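PATCH 2/4 reworks the same decision after code review ("CR comments"). Instead of threading a () => Boolean through every case, it short-circuits SaveMode.Append with a plain if/else and evaluates fs.exists exactly once for all other modes, which lets the original exhaustive (mode, pathExists) match survive almost unchanged. A standalone sketch of that shape follows; the object, Mode values, and pathLookup are illustrative stubs standing in for Spark's SaveMode and Hadoop's FileSystem, not Spark's actual names.

    object CheckOnceSketch {
      sealed trait Mode
      case object Append extends Mode
      case object Overwrite extends Mode
      case object ErrorIfExists extends Mode
      case object Ignore extends Mode

      var existsCalls = 0
      // Stand-in for fs.exists(qualifiedOutputPath).
      def pathLookup(path: String): Boolean = { existsCalls += 1; true }

      def doInsertion(mode: Mode, path: String): Boolean =
        if (mode == Append) {
          true // never touches the filesystem
        } else {
          val pathExists = pathLookup(path) // at most one lookup per insert
          (mode, pathExists) match {
            case (ErrorIfExists, true) =>
              throw new IllegalStateException(s"path $path already exists.")
            case (Overwrite, true) =>
              true // the real command first deletes matching partitions here
            case (Overwrite, false) | (ErrorIfExists, false) =>
              true
            case (Ignore, exists) =>
              !exists
            case (m, exists) =>
              throw new IllegalStateException(s"unsupported save mode $m ($exists)")
          }
        }

      def main(args: Array[String]): Unit = {
        println(doInsertion(Append, "/tmp/out")) // true
        println(existsCalls)                     // 0: Append skipped the lookup
        println(doInsertion(Ignore, "/tmp/out")) // false: the stub reports the path as existing
        println(existsCalls)                     // 1
      }
    }

Either version calls fs.exists at most once per insert; the reviewed form presumably won out because the single val makes that invariant obvious and keeps the match close to the pre-patch code.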
 .../datasources/InsertIntoHadoopFsRelationCommand.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 91d925aaa6b28..f73fdd4e6a83b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -22,8 +22,9 @@ import java.io.IOException
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTablePartition}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

From 9edfa7b2dd951955f6dae7674e2fb5890b966dfe Mon Sep 17 00:00:00 2001
From: Rahij Ramsharan
Date: Thu, 26 Sep 2019 10:08:24 +0100
Subject: [PATCH 4/4] fix imports

---
 .../datasources/InsertIntoHadoopFsRelationCommand.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index f73fdd4e6a83b..fbe874b3e8bc5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -20,10 +20,10 @@ package org.apache.spark.sql.execution.datasources
 import java.io.IOException
 
 import org.apache.hadoop.fs.{FileSystem, Path}
+
 import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTablePartition}
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
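The last two patches only shuffle the import block. Piecing the hunks together: PATCH 3/4 ("revert") restores the pre-series ordering of the catalog imports but leaves behind a stray duplicate import of {BucketSpec, CatalogTable}, and PATCH 4/4 ("fix imports") removes that duplicate and restores the blank line separating the Hadoop and Spark import groups. Assuming no other edits in between, the import section after PATCH 4/4 should therefore read exactly as it did before PATCH 2/4 touched it:

    import java.io.IOException

    import org.apache.hadoop.fs.{FileSystem, Path}

    import org.apache.spark.internal.io.FileCommitProtocol
    import org.apache.spark.sql._
    import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTablePartition}
    import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
    import org.apache.spark.sql.catalyst.expressions.Attribute
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap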