Commit 33c4d45

Egor Krivokon, d-popkov, and akhalymon-cv authored and committed
MapR [SPARK-979] Backport all needed 3.1.2 EEP commits to 3.2 branch (apache#913)
* MapR [SPARK-953] Investigate and add all needed changes for Spark services (apache#905)
* [EZSPA-347] Find a way to pass sensitive configs in secure manner (apache#907)
* MapR [SPARK-961] Spark job can't be properly killed using yarn API or CLI (apache#908)
* MapR [SPARK-962] MSSQL can not handle SQL syntax which is used in Spark (apache#909)
* MapR [SPARK-963] select from hbase table which was created via hive fails (apache#910)

Co-authored-by: Dmitry Popkov <[email protected]>
Co-authored-by: Andrew Khalymon <[email protected]>
1 parent a95ba07 commit 33c4d45

File tree

8 files changed: +68 -12 lines changed


conf/spark-env.sh

Lines changed: 3 additions & 0 deletions
@@ -170,3 +170,6 @@ export SPARK_WORKER_DIR=$SPARK_HOME/tmp
 
 #UI
 export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djava.library.path=$SPARK_MAPR_HOME/lib"
+export SPARK_HISTORY_OPTS="$SPARK_HISTORY_OPTS -Djava.library.path=$SPARK_MAPR_HOME/lib"
+export SPARK_MASTER_HOST=$(hostname --fqdn)
+export SPARK_MASTER_IP=$(hostname --fqdn)

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 1 addition & 0 deletions
@@ -300,6 +300,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     } else {
       logInfo("Skip exiting executor since it's been already asked to exit before.")
     }
+    self.send(Shutdown)
   }
 
   private def decommissionSelf(): Unit = {

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala

Lines changed: 6 additions & 0 deletions
@@ -664,6 +664,12 @@ private[spark] object Config extends Logging {
       .stringConf
       .createOptional
 
+  val MAPR_SPARK_EXTRACONF_SECRET_NAME =
+    ConfigBuilder("spark.mapr.extraconf.secret")
+      .doc("Name of the secret with Spark extra configurations that will be added to sparkConf")
+      .stringConf
+      .createOptional
+
   val MAPR_CLUSTER_CONFIGMAP =
     ConfigBuilder("spark.mapr.cluster.configMap")
       .doc("Name of the mapr cluster config map")

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala

Lines changed: 1 addition & 0 deletions
@@ -126,6 +126,7 @@ private[spark] object Constants {
   val MAPR_USER_TICKET_SUBPATH = "CONTAINER_TICKET"
   val MAPR_USER_SECRET_MOUNT_PATH = "/tmp/usersecret"
   val MAPR_USER_TICKET_MOUNT_PATH = s"$MAPR_USER_SECRET_MOUNT_PATH/$MAPR_USER_TICKET_SUBPATH"
+  val MAPR_SPARK_EXTRA_CONFIG_MOUNT_PATH = "/opt/mapr/kubernetes/spark_secrets"
 
   val ENV_MAPR_METRICSFILE_LOCATION = "MAPR_METRICSFILE_LOCATION"
   val MAPR_METRICS_TICKET_SUBPATH = "maprmetricsticket"

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/MaprConfigFeatureStep.scala

Lines changed: 27 additions & 0 deletions
@@ -42,6 +42,7 @@ private[spark] class MaprConfigFeatureStep(conf: KubernetesConf)
     applySSSDSecret(podBuilder, containerBuilder)
     applySSHSecret(podBuilder, containerBuilder)
     applyClientSecret(podBuilder, containerBuilder)
+    applySparkExtraConfigs(podBuilder, containerBuilder)
 
     SparkPod(podBuilder.build(), containerBuilder.build())
   }
@@ -130,6 +131,32 @@ private[spark] class MaprConfigFeatureStep(conf: KubernetesConf)
       .endVolumeMount()
   }
 
+  private def applySparkExtraConfigs(podBuilder: PodBuilder, containerBuilder: ContainerBuilder): Unit = {
+    val confSecretName = sparkConf.get(MAPR_SPARK_EXTRACONF_SECRET_NAME).get
+
+    if (confSecretName.isEmpty) {
+      return
+    }
+
+    val confSecretVolumeName = "spark-extraconf-secret"
+
+    podBuilder.editOrNewSpec()
+      .addNewVolume()
+      .withName(confSecretVolumeName)
+      .withNewSecret()
+      .withSecretName(confSecretName)
+      .endSecret()
+      .endVolume()
+      .endSpec()
+
+    containerBuilder
+      .addNewVolumeMount()
+      .withName(confSecretVolumeName)
+      .withMountPath(MAPR_SPARK_EXTRA_CONFIG_MOUNT_PATH)
+      .endVolumeMount()
+  }
+
+
   private def applyUserSecret(podBuilder: PodBuilder, containerBuilder: ContainerBuilder): Unit = {
     val userSecretNameConfig = sparkConf.get(MAPR_USER_SECRET)
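
For context (not part of the diff): the new feature step reads spark.mapr.extraconf.secret and, when it is set, mounts the named Kubernetes secret into the Spark pods at MAPR_SPARK_EXTRA_CONFIG_MOUNT_PATH, so sensitive Spark properties can be delivered without appearing in spark-defaults.conf or on the command line. A minimal usage sketch, assuming a secret named "spark-extra-conf" already exists in the job namespace and that the property is supplied at submission time so the pods are built with it; the secret name and master URL are placeholders, not values from the commit:

// Hedged sketch only; secret name and API server URL are hypothetical.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setMaster("k8s://https://kubernetes.default.svc")       // placeholder API server URL
  .set("spark.mapr.extraconf.secret", "spark-extra-conf")  // name of the pre-created secret

// With this property set, MaprConfigFeatureStep adds a volume backed by the
// secret and mounts it at "/opt/mapr/kubernetes/spark_secrets" in the pod,
// where the extra configuration can be picked up and added to sparkConf.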

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala

Lines changed: 7 additions & 1 deletion
@@ -89,7 +89,12 @@ class JDBCOptions(
       if (subquery.isEmpty) {
         throw QueryExecutionErrors.emptyOptionError(JDBC_QUERY_STRING)
       } else {
-        s"(${subquery}) SPARK_GEN_SUBQ_${curId.getAndIncrement()}"
+        val runQueryAsIs = parameters.getOrElse(JDBC_USE_RAW_QUERY, "false").toBoolean
+        if (runQueryAsIs) {
+          s"${subquery}"
+        } else {
+          s"(${subquery}) SPARK_GEN_SUBQ_${curId.getAndIncrement()}"
+        }
       }
     }
 
@@ -258,6 +263,7 @@ object JDBCOptions {
   val JDBC_URL = newOption("url")
   val JDBC_TABLE_NAME = newOption("dbtable")
   val JDBC_QUERY_STRING = newOption("query")
+  val JDBC_USE_RAW_QUERY = newOption("useRawQuery")
   val JDBC_DRIVER_CLASS = newOption("driver")
   val JDBC_PARTITION_COLUMN = newOption("partitionColumn")
   val JDBC_LOWER_BOUND = newOption("lowerBound")

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala

Lines changed: 20 additions & 6 deletions
@@ -23,13 +23,15 @@ import scala.math.BigDecimal.RoundingMode
 import org.apache.spark.Partition
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession, SQLContext}
+import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SQLContext, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.analysis._
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, stringToDate, stringToTimestamp}
 import org.apache.spark.sql.connector.expressions.SortOrder
 import org.apache.spark.sql.connector.expressions.filter.Predicate
 import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.{JDBC_CUSTOM_DATAFRAME_COLUMN_TYPES, JDBC_USE_RAW_QUERY}
 import org.apache.spark.sql.execution.datasources.v2.TableSampleInfo
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.jdbc.JdbcDialects
@@ -239,11 +241,23 @@ private[sql] object JDBCRelation extends Logging {
    * @return resolved Catalyst schema of a JDBC table
    */
   def getSchema(resolver: Resolver, jdbcOptions: JDBCOptions): StructType = {
-    val tableSchema = JDBCRDD.resolveTable(jdbcOptions)
-    jdbcOptions.customSchema match {
-      case Some(customSchema) => JdbcUtils.getCustomSchema(
-        tableSchema, customSchema, resolver)
-      case None => tableSchema
+    val runQueryAsIs = jdbcOptions.parameters.getOrElse(JDBC_USE_RAW_QUERY, "false").toBoolean
+    if (runQueryAsIs) {
+      val customSchema = jdbcOptions.parameters.get(JDBC_CUSTOM_DATAFRAME_COLUMN_TYPES)
+      val newSchema = jdbcOptions.customSchema match {
+        case Some(customSchema) => CatalystSqlParser.parseTableSchema(customSchema)
+        case None => throw new IllegalArgumentException(
+          s"Field $JDBC_CUSTOM_DATAFRAME_COLUMN_TYPES is mandatory when using $JDBC_USE_RAW_QUERY")
+      }
+      logInfo(s"Option $JDBC_USE_RAW_QUERY is enabled, parsed $newSchema from the filed $JDBC_CUSTOM_DATAFRAME_COLUMN_TYPES with value $customSchema")
+      newSchema
+    } else {
+      val tableSchema = JDBCRDD.resolveTable(jdbcOptions)
+      jdbcOptions.customSchema match {
+        case Some(customSchema) => JdbcUtils.getCustomSchema(
+          tableSchema, customSchema, resolver)
+        case None => tableSchema
+      }
     }
   }
 
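
For context (not part of the diff): with useRawQuery enabled, the query is sent to the database exactly as written instead of being wrapped as "(query) SPARK_GEN_SUBQ_n", which is the wrapping that MSSQL rejects for some T-SQL constructs. Because Spark then skips resolving the schema through a wrapped probe query, the schema must be declared up front via the customSchema option (the JDBC_CUSTOM_DATAFRAME_COLUMN_TYPES key) and is parsed with CatalystSqlParser. A minimal read-side sketch, assuming the SQL Server JDBC driver is on the classpath; the URL, credentials, query, and schema below are placeholders, not values from the commit:

// Hedged usage sketch; not taken from the commit.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("useRawQuery-example").getOrCreate()

val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:sqlserver://mssql-host:1433;databaseName=test")  // placeholder
  .option("query", "SELECT id, name FROM dbo.users OPTION (MAXDOP 1)")  // hypothetical raw T-SQL
  .option("useRawQuery", "true")                  // send the query as-is, no SPARK_GEN_SUBQ wrapper
  .option("customSchema", "id INT, name STRING")  // mandatory when useRawQuery is enabled
  .option("user", "spark")                        // placeholder
  .option("password", "********")                 // placeholder
  .load()

df.show()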

sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala

Lines changed: 3 additions & 5 deletions
@@ -23,7 +23,6 @@ import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, PathFilter}
-import org.apache.hadoop.hive.maprdb.json.input.HiveMapRDBJsonInputFormat
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants._
 import org.apache.hadoop.hive.ql.exec.Utilities
 import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
@@ -322,11 +321,10 @@ class HadoopTableReader(
    */
   private def createHadoopRDD(localTableDesc: TableDesc, inputPathStr: String): RDD[Writable] = {
     val inputFormatClazz = localTableDesc.getInputFileFormatClass
-    if (classOf[newInputClass[_, _]].isAssignableFrom(inputFormatClazz)
-      && !inputFormatClazz.isAssignableFrom(classOf[HiveMapRDBJsonInputFormat])) {
-      createNewHadoopRDD(localTableDesc, inputPathStr)
-    } else {
+    if (classOf[oldInputClass[_, _]].isAssignableFrom(inputFormatClazz)) {
       createOldHadoopRDD(localTableDesc, inputPathStr)
+    } else {
+      createNewHadoopRDD(localTableDesc, inputPathStr)
     }
   }
 
