diff --git a/core/pom.xml b/core/pom.xml
index 66180035e61f..cac1874873d1 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -359,6 +359,11 @@
       <artifactId>py4j</artifactId>
       <version>0.8.2.1</version>
     </dependency>
+    <dependency>
+      <groupId>com.intel.chimera</groupId>
+      <artifactId>chimera</artifactId>
+      <version>0.0.1</version>
+    </dependency>
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
diff --git a/core/src/main/scala/org/apache/spark/crypto/CommonConfigurationKeys.scala b/core/src/main/scala/org/apache/spark/crypto/CommonConfigurationKeys.scala
new file mode 100644
index 000000000000..04216f90041f
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/crypto/CommonConfigurationKeys.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.crypto
+
+import org.apache.hadoop.io.Text
+
+/**
+ * Constant variables
+ */
+private[spark] object CommonConfigurationKeys {
+  val SPARK_SHUFFLE_TOKEN: Text = new Text("SPARK_SHUFFLE_TOKEN")
+  val SPARK_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB: String = "spark.job" +
+    ".encrypted-intermediate-data.buffer.kb"
+  val DEFAULT_SPARK_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB: Int = 128
+  val SPARK_ENCRYPTED_INTERMEDIATE_DATA: String =
+    "spark.job.encrypted-intermediate-data"
+  val DEFAULT_SPARK_ENCRYPTED_INTERMEDIATE_DATA: Boolean = false
+  val SPARK_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS: String =
+    "spark.job.encrypted-intermediate-data-key-size-bits"
+  val DEFAULT_SPARK_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS: Int = 128
+}
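For orientation, here is a minimal sketch (not part of the patch) of how these keys are meant to be consumed from a SparkConf. The object name is invented for illustration; "spark.encrypted.shuffle" is the on/off switch read by CryptoConf.parse, which the next file adds.

  import org.apache.spark.SparkConf
  import org.apache.spark.crypto.CommonConfigurationKeys._

  object CryptoKeysExample {
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf()
        .set("spark.encrypted.shuffle", "true")                       // feature switch read by CryptoConf.parse
        .set(SPARK_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB, "256")      // crypto stream buffer size, in KB
        .set(SPARK_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS, "128")  // AES key length, in bits

      // These reads mirror the call sites added later in this patch.
      val bufferBytes = conf.getInt(SPARK_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB,
        DEFAULT_SPARK_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB) * 1024
      val keyBits = conf.getInt(SPARK_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS,
        DEFAULT_SPARK_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS)
      println(s"crypto buffer = $bufferBytes bytes, key size = $keyBits bits")
    }
  }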
diff --git a/core/src/main/scala/org/apache/spark/crypto/CryptoConf.scala b/core/src/main/scala/org/apache/spark/crypto/CryptoConf.scala
new file mode 100644
index 000000000000..c7e6eca30462
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/crypto/CryptoConf.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.crypto
+
+import java.security.NoSuchAlgorithmException
+import javax.crypto.{KeyGenerator, SecretKey}
+
+import org.apache.hadoop.security.Credentials
+
+import org.apache.spark.crypto.CommonConfigurationKeys._
+import org.apache.spark.SparkConf
+
+/**
+ * CryptoConf is a class for Crypto configuration
+ */
+private[spark] case class CryptoConf(enabled: Boolean = false) {
+
+}
+
+private[spark] object CryptoConf {
+  def parse(sparkConf: SparkConf): CryptoConf = {
+    val enabled = if (sparkConf != null) {
+      sparkConf.getBoolean("spark.encrypted.shuffle", false)
+    } else {
+      false
+    }
+    new CryptoConf(enabled)
+  }
+
+  def initSparkShuffleCredentials(conf:SparkConf, credentials: Credentials) {
+    if (credentials.getSecretKey(SPARK_SHUFFLE_TOKEN) == null) {
+      var keyGen: KeyGenerator = null
+      try {
+        val SHUFFLE_KEY_LENGTH: Int = 64
+        var keyLen: Int = if (conf.getBoolean(SPARK_ENCRYPTED_INTERMEDIATE_DATA,
+          DEFAULT_SPARK_ENCRYPTED_INTERMEDIATE_DATA) == true) {
+          conf.getInt(SPARK_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS,
+            DEFAULT_SPARK_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS)
+        } else {
+          SHUFFLE_KEY_LENGTH
+        }
+        val SHUFFLE_KEYGEN_ALGORITHM = "HmacSHA1";
+        keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM)
+        keyGen.init(keyLen)
+      } catch {
+        case e: NoSuchAlgorithmException => throw new RuntimeException("Error generating " +
+          "shuffle secret key")
+      }
+      val shuffleKey: SecretKey = keyGen.generateKey
+      credentials.addSecretKey(SPARK_SHUFFLE_TOKEN, shuffleKey.getEncoded)
+    }
+  }
+}
+
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 86dbd89f0ffb..523f59d85a47 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -651,7 +651,7 @@ private[spark] class BlockManager(
     val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _)
     val syncWrites = conf.getBoolean("spark.shuffle.sync", false)
     new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream, syncWrites,
-      writeMetrics)
+      writeMetrics).setSparkConf(conf)
   }
 
   /**
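A rough usage sketch of the API above, using only what the patch itself defines: generate the shuffle secret once, stash it in Hadoop Credentials under SPARK_SHUFFLE_TOKEN, and read it back by that name. This is essentially what the YARN Client and ApplicationMaster changes at the end of the patch do with the current user's credentials; the object name here is hypothetical.

  import org.apache.hadoop.security.Credentials
  import org.apache.spark.SparkConf
  import org.apache.spark.crypto.CryptoConf
  import org.apache.spark.crypto.CommonConfigurationKeys._

  object ShuffleKeySketch {
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().set("spark.encrypted.shuffle", "true")
      val credentials = new Credentials()

      if (CryptoConf.parse(conf).enabled) {
        // Idempotent: only generates and adds a key if SPARK_SHUFFLE_TOKEN is not present yet.
        CryptoConf.initSparkShuffleCredentials(conf, credentials)
      }

      val key = credentials.getSecretKey(SPARK_SHUFFLE_TOKEN)
      println(s"shuffle key length: ${key.length * 8} bits")
    }
  }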
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index 81164178b9e8..81c12bd0a936 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -20,9 +20,15 @@ package org.apache.spark.storage
 import java.io.{BufferedOutputStream, FileOutputStream, File, OutputStream}
 import java.nio.channels.FileChannel
 
-import org.apache.spark.Logging
-import org.apache.spark.serializer.{SerializationStream, Serializer}
+import org.apache.hadoop.mapreduce.security.TokenCache
+import com.intel.chimera.{CipherSuite, CryptoCodec, CryptoOutputStream}
+
+import org.apache.spark.{Logging,SparkConf}
+import org.apache.spark.crypto.CommonConfigurationKeys._
+import org.apache.spark.crypto.CryptoConf
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.executor.ShuffleWriteMetrics
+import org.apache.spark.serializer.{SerializationStream, Serializer}
 
 /**
  * An interface for writing JVM objects to some underlying storage. This interface allows
@@ -123,12 +129,29 @@ private[spark] class DiskBlockObjectWriter(
    */
   private var numRecordsWritten = 0
 
+  private var sparkConf:SparkConf = null
+
   override def open(): BlockObjectWriter = {
     if (hasBeenClosed) {
       throw new IllegalStateException("Writer already closed. Cannot be reopened.")
     }
     fos = new FileOutputStream(file, true)
-    ts = new TimeTrackingOutputStream(fos)
+    val cryptoConf = CryptoConf.parse(sparkConf)
+    if (cryptoConf.enabled) {
+      val cryptoCodec: CryptoCodec = CryptoCodec.getInstance(CipherSuite.AES_CTR_NOPADDING)
+      val bufferSize: Int = sparkConf.getInt(
+        SPARK_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB,
+        DEFAULT_SPARK_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB) * 1024
+      val iv: Array[Byte] = createInitializationVector(cryptoCodec)
+      val credentials = SparkHadoopUtil.get.getCurrentUserCredentials()
+      var key: Array[Byte] = credentials.getSecretKey(SPARK_SHUFFLE_TOKEN)
+      fos.write(iv)
+      val cos = new CryptoOutputStream(fos, cryptoCodec,
+        bufferSize, key, iv, iv.length)
+      ts = new TimeTrackingOutputStream(cos)
+    } else {
+      ts = new TimeTrackingOutputStream(fos)
+    }
     channel = fos.getChannel()
     bs = compressStream(new BufferedOutputStream(ts, bufferSize))
     objOut = serializer.newInstance().serializeStream(bs)
@@ -136,6 +159,17 @@ private[spark] class DiskBlockObjectWriter(
     this
   }
 
+  def createInitializationVector(cryptoCodec: CryptoCodec): Array[Byte] = {
+    val iv: Array[Byte] = new Array[Byte](cryptoCodec.getCipherSuite.getAlgorithmBlockSize)
+    cryptoCodec.generateSecureRandom(iv)
+    iv
+  }
+
+  def setSparkConf(sparkConfVal: SparkConf): DiskBlockObjectWriter = {
+    sparkConf = sparkConfVal
+    this
+  }
+
   override def close() {
     if (initialized) {
       if (syncWrites) {
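A minimal standalone sketch of the on-disk framing that open() now produces, using only the Chimera calls that appear above (the object name is made up): the 16-byte IV is written in the clear, and everything after it is AES-CTR ciphertext starting at stream offset iv.length.

  import java.io.ByteArrayOutputStream
  import com.intel.chimera.{CipherSuite, CryptoCodec, CryptoOutputStream}

  object WriteSideSketch {
    def main(args: Array[String]): Unit = {
      val codec: CryptoCodec = CryptoCodec.getInstance(CipherSuite.AES_CTR_NOPADDING)
      val key = new Array[Byte](16)
      val iv = new Array[Byte](codec.getCipherSuite.getAlgorithmBlockSize)
      codec.generateSecureRandom(key)
      codec.generateSecureRandom(iv)

      val out = new ByteArrayOutputStream()
      out.write(iv)  // the IV goes out unencrypted, exactly as in open()
      val cos = new CryptoOutputStream(out, codec, 4 * 1024, key, iv, iv.length)
      cos.write("shuffle bytes".getBytes("UTF-8"))
      cos.close()

      println(s"framed length: ${out.size()} bytes (16-byte IV + ciphertext)")
    }
  }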
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index ab9ee4f0096b..07e2c3b22be7 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -17,16 +17,21 @@
 
 package org.apache.spark.storage
 
-import java.io.{InputStream, IOException}
+import java.io.InputStream
 import java.util.concurrent.LinkedBlockingQueue
 
 import scala.collection.mutable.{ArrayBuffer, HashSet, Queue}
 import scala.util.{Failure, Success, Try}
 
+import com.intel.chimera.{CipherSuite, CryptoCodec, CryptoInputStream}
+
 import org.apache.spark.{Logging, TaskContext}
+import org.apache.spark.crypto.CommonConfigurationKeys._
+import org.apache.spark.crypto.CryptoConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.network.buffer.ManagedBuffer
 import org.apache.spark.network.BlockTransferService
 import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient}
-import org.apache.spark.network.buffer.ManagedBuffer
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.util.{CompletionIterator, Utils}
 
@@ -296,8 +301,27 @@ final class ShuffleBlockFetcherIterator(
         // There is a chance that createInputStream can fail (e.g. fetching a local file that does
         // not exist, SPARK-4085). In that case, we should propagate the right exception so
         // the scheduler gets a FetchFailedException.
+        // is0:InputStream
         Try(buf.createInputStream()).map { is0 =>
-          val is = blockManager.wrapForCompression(blockId, is0)
+          var is: InputStream = null
+          val sparkConf = blockManager.conf
+          val cryptoConf = CryptoConf.parse(sparkConf)
+          if (cryptoConf.enabled) {
+            val cryptoCodec: CryptoCodec = CryptoCodec.getInstance(CipherSuite.AES_CTR_NOPADDING)
+            val bufferSize: Int = sparkConf.getInt(
+              SPARK_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB,
+              DEFAULT_SPARK_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB) * 1024
+            val iv: Array[Byte] = new Array[Byte](16)
+            is0.read(iv, 0, iv.length)
+            val streamOffset: Long = iv.length
+            val credentials = SparkHadoopUtil.get.getCurrentUserCredentials()
+            var key: Array[Byte] = credentials.getSecretKey(SPARK_SHUFFLE_TOKEN)
+            var cos = new CryptoInputStream(is0, cryptoCodec, bufferSize, key,
+              iv, streamOffset)
+            is = blockManager.wrapForCompression(blockId, cos)
+          } else {
+            is = blockManager.wrapForCompression(blockId, is0)
+          }
           val iter = serializer.newInstance().deserializeStream(is).asIterator
           CompletionIterator[Any, Iterator[Any]](iter, {
             // Once the iterator is exhausted, release the buffer and set currentResult to null
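The read-side counterpart under the same assumptions, decrypting data framed as in the previous sketch. One caveat about the hunk above: a bare is0.read(iv, 0, iv.length) may in principle return fewer than 16 bytes on a network stream, which is why this sketch uses readFully; the helper name is hypothetical and only illustrates the framing.

  import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream}
  import com.intel.chimera.{CipherSuite, CryptoCodec, CryptoInputStream}

  object ReadSideSketch {
    def decrypt(framed: Array[Byte], key: Array[Byte]): Array[Byte] = {
      val codec = CryptoCodec.getInstance(CipherSuite.AES_CTR_NOPADDING)
      val in = new ByteArrayInputStream(framed)
      val iv = new Array[Byte](16)
      // readFully avoids the short-read risk of a bare read(iv, 0, iv.length).
      new DataInputStream(in).readFully(iv)
      val cis = new CryptoInputStream(in, codec, 4 * 1024, key, iv, iv.length.toLong)
      val buf = new Array[Byte](4 * 1024)
      val plain = new ByteArrayOutputStream()
      var n = cis.read(buf)
      while (n != -1) {
        plain.write(buf, 0, n)
        n = cis.read(buf)
      }
      cis.close()
      plain.toByteArray
    }
  }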
diff --git a/core/src/test/scala/org/apache/spark/crypto/JceAesCtrCryptoCodecSuite.scala b/core/src/test/scala/org/apache/spark/crypto/JceAesCtrCryptoCodecSuite.scala
new file mode 100644
index 000000000000..7f8a873738c1
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/crypto/JceAesCtrCryptoCodecSuite.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.crypto
+
+import java.io.{BufferedOutputStream, ByteArrayInputStream, ByteArrayOutputStream}
+import java.security.SecureRandom
+
+import org.apache.spark.Logging
+import org.scalatest.FunSuite
+import com.intel.chimera.{CryptoCodec, CryptoInputStream, CryptoOutputStream, JceAesCtrCryptoCodec}
+
+/**
+ * test JceAesCtrCryptoCodec
+ */
+class JceAesCtrCryptoCodecSuite extends FunSuite with Logging {
+
+  test("TestJceAesCtrCryptoCodecSuite"){
+    val random: SecureRandom = new SecureRandom
+    val dataLen: Int = 10000000
+    val inputData: Array[Byte] = new Array[Byte](dataLen)
+    val outputData: Array[Byte] = new Array[Byte](dataLen)
+    random.nextBytes(inputData)
+    // encrypt
+    val codec: CryptoCodec = new JceAesCtrCryptoCodec()
+    val aos: ByteArrayOutputStream = new ByteArrayOutputStream
+    val bos: BufferedOutputStream = new BufferedOutputStream(aos)
+    val key: Array[Byte] = new Array[Byte](16)
+    val iv: Array[Byte] = new Array[Byte](16)
+    random.nextBytes(key)
+    random.nextBytes(iv)
+
+    val cos: CryptoOutputStream = new CryptoOutputStream(bos, codec, 1024, key, iv)
+    cos.write(inputData, 0, inputData.length)
+    cos.flush
+    // decrypt
+    val cis: CryptoInputStream = new CryptoInputStream(new ByteArrayInputStream(aos.toByteArray),
+      codec, 1024, key, iv)
+    var readLen: Int = 0
+    var outOffset: Int = 0
+    while (readLen < dataLen) {
+      val n: Int = cis.read(outputData, outOffset, outputData.length - outOffset)
+      if (n >= 0) {
+        readLen += n
+        outOffset += n
+      }
+    }
+    var i: Int = 0
+    for(i <- 0 until dataLen )
+    {
+      if (inputData(i) != outputData(i)) {
+        logInfo(s"decrypt failed:$i")
+      }
+    }
+  }
+}
+
+
diff --git a/core/src/test/scala/org/apache/spark/crypto/OpensslAesCtrCryptoCodecSuite.scala b/core/src/test/scala/org/apache/spark/crypto/OpensslAesCtrCryptoCodecSuite.scala
new file mode 100644
index 000000000000..f2bbe89a6b94
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/crypto/OpensslAesCtrCryptoCodecSuite.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.crypto
+
+import java.io.{BufferedOutputStream, ByteArrayInputStream, ByteArrayOutputStream}
+import java.security.SecureRandom
+
+import org.apache.spark.Logging
+import org.scalatest.FunSuite
+import com.intel.chimera.{CryptoCodec, CryptoInputStream, CryptoOutputStream, OpensslAesCtrCryptoCodec}
+
+/**
+ * Test OpensslAesCtrCryptoCodecSuite
+ */
+class OpensslAesCtrCryptoCodecSuite extends FunSuite with Logging {
+
+  test("TestOpensslAesCtrCryptoCodecSuite"){
+    val random: SecureRandom = new SecureRandom
+    val dataLen: Int = 10000000
+    val inputData: Array[Byte] = new Array[Byte](dataLen)
+    val outputData: Array[Byte] = new Array[Byte](dataLen)
+    random.nextBytes(inputData)
+    // encrypt
+    val codec: CryptoCodec = new OpensslAesCtrCryptoCodec()
+    val aos: ByteArrayOutputStream = new ByteArrayOutputStream
+    val bos: BufferedOutputStream = new BufferedOutputStream(aos)
+    val key: Array[Byte] = new Array[Byte](16)
+    val iv: Array[Byte] = new Array[Byte](16)
+    random.nextBytes(key)
+    random.nextBytes(iv)
+
+    val cos: CryptoOutputStream = new CryptoOutputStream(bos, codec, 1024, key, iv)
+    cos.write(inputData, 0, inputData.length)
+    cos.flush
+    // decrypt
+    val cis: CryptoInputStream = new CryptoInputStream(new ByteArrayInputStream(aos.toByteArray),
+      codec, 1024, key, iv)
+    var readLen: Int = 0
+    var outOffset: Int = 0
+    while (readLen < dataLen) {
+      val n: Int = cis.read(outputData, outOffset, outputData.length - outOffset)
+      if (n >= 0) {
+        readLen += n
+        outOffset += n
+      }
+    }
+    var i: Int = 0
+    for(i <- 0 until dataLen )
+    {
+      if (inputData(i) != outputData(i)) {
+        logInfo(s"decrypt failed:$i")
+      }
+    }
+  }
+}
+
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index a9bf861d160c..5c03d421abf9 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicReference
 
 import akka.actor._
 import akka.remote._
+
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.util.ShutdownHookManager
 import org.apache.hadoop.yarn.api._
@@ -34,6 +35,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv}
 import org.apache.spark.SparkException
+import org.apache.spark.crypto.CryptoConf
 import org.apache.spark.deploy.{PythonRunner, SparkHadoopUtil}
 import org.apache.spark.deploy.history.HistoryServer
 import org.apache.spark.scheduler.cluster.YarnSchedulerBackend
@@ -273,6 +275,7 @@
       sc.getConf.get("spark.driver.host"),
       sc.getConf.get("spark.driver.port"),
       isClusterMode = true)
+    initJobCredentialsAndUGI()
     registerAM(sc.ui.map(_.appUIAddress).getOrElse(""), securityMgr)
     userClassThread.join()
   }
@@ -283,6 +286,7 @@
       conf = sparkConf, securityManager = securityMgr)._1
     waitForSparkDriver()
     addAmIpFilter()
+    initJobCredentialsAndUGI()
     registerAM(sparkConf.get("spark.driver.appUIAddress", ""), securityMgr)
 
     // In client mode the actor will stop the reporter thread.
@@ -552,8 +556,22 @@ private[spark] class ApplicationMaster(
     }
   }
 
+  /** set "SPARK_SHUFFLE_TOKEN" before registerAM
+    * @return
+    */
+  private def initJobCredentialsAndUGI() = {
+    val sc = sparkContextRef.get()
+    val conf = if (sc != null) sc.getConf else sparkConf
+    val cryptoConf = CryptoConf.parse(conf)
+    if (cryptoConf.enabled) {
+      val credentials = SparkHadoopUtil.get.getCurrentUserCredentials
+      CryptoConf.initSparkShuffleCredentials(conf, credentials)
+      SparkHadoopUtil.get.addCurrentUserCredentials(credentials)
+    }
+  }
 }
+
 
 object ApplicationMaster extends Logging {
 
   val SHUTDOWN_HOOK_PRIORITY: Int = 30
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 46d9df93488c..36483be11aa9 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.deploy.yarn
 
-import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
+import java.net.{InetAddress, UnknownHostException, URI}
 import java.nio.ByteBuffer
 
 import scala.collection.JavaConversions._
@@ -26,12 +26,13 @@ import scala.util.{Try, Success, Failure}
 
 import com.google.common.base.Objects
 
-import org.apache.hadoop.io.DataOutputBuffer
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.hadoop.io.DataOutputBuffer
 import org.apache.hadoop.mapred.Master
 import org.apache.hadoop.mapreduce.MRJobConfig
+import org.apache.hadoop.mapreduce.security.TokenCache
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 import org.apache.hadoop.util.StringUtils
 import org.apache.hadoop.yarn.api._
@@ -43,6 +44,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.util.Records
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkException}
+import org.apache.spark.crypto.CommonConfigurationKeys._
+import org.apache.spark.crypto.CryptoConf
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.util.Utils
 
@@ -534,6 +537,11 @@
     // send the acl settings into YARN to control who has access via YARN interfaces
     val securityManager = new SecurityManager(sparkConf)
     amContainer.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager))
+
+    val cryptoConf = CryptoConf.parse(sparkConf)
+    if (cryptoConf.enabled) {
+      CryptoConf.initSparkShuffleCredentials(sparkConf, credentials)
+    }
     setupSecurityToken(amContainer)
     UserGroupInformation.getCurrentUser().addCredentials(credentials)
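Taken together, the changes hinge on a single switch. A hypothetical end-to-end job (not part of the patch, names invented) would enable it as below; on YARN, the Client and ApplicationMaster changes above seed SPARK_SHUFFLE_TOKEN when the flag is set, and shuffle writes and fetches then pass through the Chimera streams added earlier.

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.SparkContext._  // pair-RDD implicits on older Spark versions; harmless later

  object EncryptedShuffleJob {
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf()
        .setAppName("encrypted-shuffle-demo")
        .set("spark.encrypted.shuffle", "true")  // Client/ApplicationMaster seed SPARK_SHUFFLE_TOKEN when this is set
      val sc = new SparkContext(conf)  // master supplied by spark-submit on YARN

      // Any wide dependency below goes through DiskBlockObjectWriter on the map side and
      // ShuffleBlockFetcherIterator on the reduce side, i.e. through the encrypted streams.
      val counts = sc.parallelize(1 to 1000000).map(i => (i % 100, 1L)).reduceByKey(_ + _)
      println(counts.count())
      sc.stop()
    }
  }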