5 changes: 5 additions & 0 deletions core/pom.xml
@@ -359,6 +359,11 @@
<artifactId>py4j</artifactId>
<version>0.8.2.1</version>
</dependency>
<dependency>
<groupId>com.intel.chimera</groupId>
<artifactId>chimera</artifactId>
<version>0.0.1</version>

Contributor: So, how stable is this project? It seems, from the GitHub page, to be a re-packaging of the Hadoop code with maybe some more bells & whistles, but the version number doesn't scream stability...

</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.crypto

import org.apache.hadoop.io.Text

/**
* Constant variables
*/
private[spark] object CommonConfigurationKeys {
val SPARK_SHUFFLE_TOKEN: Text = new Text("SPARK_SHUFFLE_TOKEN")
val SPARK_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB: String = "spark.job" +
".encrypted-intermediate-data.buffer.kb"
val DEFAULT_SPARK_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB: Int = 128
val SPARK_ENCRYPTED_INTERMEDIATE_DATA: String =
"spark.job.encrypted-intermediate-data"
val DEFAULT_SPARK_ENCRYPTED_INTERMEDIATE_DATA: Boolean = false
val SPARK_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS: String =
"spark.job.encrypted-intermediate-data-key-size-bits"
val DEFAULT_SPARK_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS: Int = 128
}
68 changes: 68 additions & 0 deletions core/src/main/scala/org/apache/spark/crypto/CryptoConf.scala
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.crypto

import java.security.NoSuchAlgorithmException
import javax.crypto.{KeyGenerator, SecretKey}

import org.apache.hadoop.security.Credentials

import org.apache.spark.crypto.CommonConfigurationKeys._
import org.apache.spark.SparkConf

/**
* CryptoConf is a class for Crypto configuration
*/
private[spark] case class CryptoConf(enabled: Boolean = false) {

}

private[spark] object CryptoConf {
def parse(sparkConf: SparkConf): CryptoConf = {
val enabled = if (sparkConf != null) {
sparkConf.getBoolean("spark.encrypted.shuffle", false)
} else {
false
}
new CryptoConf(enabled)
}

def initSparkShuffleCredentials(conf:SparkConf, credentials: Credentials) {
if (credentials.getSecretKey(SPARK_SHUFFLE_TOKEN) == null) {
var keyGen: KeyGenerator = null
try {
val SHUFFLE_KEY_LENGTH: Int = 64
var keyLen: Int = if (conf.getBoolean(SPARK_ENCRYPTED_INTERMEDIATE_DATA,
DEFAULT_SPARK_ENCRYPTED_INTERMEDIATE_DATA) == true) {
conf.getInt(SPARK_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS,
DEFAULT_SPARK_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS)
} else {
SHUFFLE_KEY_LENGTH
}
val SHUFFLE_KEYGEN_ALGORITHM = "HmacSHA1";
keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM)
keyGen.init(keyLen)
} catch {
case e: NoSuchAlgorithmException => throw new RuntimeException("Error generating " +
"shuffle secret key")
}
val shuffleKey: SecretKey = keyGen.generateKey
credentials.addSecretKey(SPARK_SHUFFLE_TOKEN, shuffleKey.getEncoded)
}
}
}

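For reference, a minimal sketch (not part of the patch) of how this configuration surface would be exercised: the spark.encrypted.shuffle flag switches the feature on, and initSparkShuffleCredentials seeds the shuffle secret into a Hadoop Credentials object. The CryptoConfSketch wrapper and the standalone Credentials instance are stand-ins for illustration; in the patch itself the credentials come from SparkHadoopUtil.get.getCurrentUserCredentials().

import org.apache.hadoop.security.Credentials

import org.apache.spark.SparkConf
import org.apache.spark.crypto.CommonConfigurationKeys._
import org.apache.spark.crypto.CryptoConf

object CryptoConfSketch {
  def main(args: Array[String]): Unit = {
    // Turn the feature on; CryptoConf.parse only reads this one flag today.
    val conf = new SparkConf(false).set("spark.encrypted.shuffle", "true")
    val cryptoConf = CryptoConf.parse(conf)
    assert(cryptoConf.enabled)

    // Generate the shuffle secret once and stash it under SPARK_SHUFFLE_TOKEN.
    val credentials = new Credentials()
    CryptoConf.initSparkShuffleCredentials(conf, credentials)
    val key = credentials.getSecretKey(SPARK_SHUFFLE_TOKEN)
    // 64 bits here; 128 if spark.job.encrypted-intermediate-data is enabled.
    println(s"shuffle key: ${key.length * 8} bits")
  }
}
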
@@ -651,7 +651,7 @@ private[spark] class BlockManager(
val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _)
val syncWrites = conf.getBoolean("spark.shuffle.sync", false)
new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream, syncWrites,
writeMetrics)
writeMetrics).setSparkConf(conf)
}

/**
@@ -20,9 +20,15 @@ package org.apache.spark.storage
import java.io.{BufferedOutputStream, FileOutputStream, File, OutputStream}
import java.nio.channels.FileChannel

import org.apache.spark.Logging
import org.apache.spark.serializer.{SerializationStream, Serializer}
import org.apache.hadoop.mapreduce.security.TokenCache
import com.intel.chimera.{CipherSuite, CryptoCodec, CryptoOutputStream}

import org.apache.spark.{Logging,SparkConf}
import org.apache.spark.crypto.CommonConfigurationKeys._
import org.apache.spark.crypto.CryptoConf
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.serializer.{SerializationStream, Serializer}

/**
* An interface for writing JVM objects to some underlying storage. This interface allows
@@ -123,19 +129,47 @@ private[spark] class DiskBlockObjectWriter(
*/
private var numRecordsWritten = 0

private var sparkConf:SparkConf = null

override def open(): BlockObjectWriter = {
if (hasBeenClosed) {
throw new IllegalStateException("Writer already closed. Cannot be reopened.")
}
fos = new FileOutputStream(file, true)
ts = new TimeTrackingOutputStream(fos)
val cryptoConf = CryptoConf.parse(sparkConf)
if (cryptoConf.enabled) {
val cryptoCodec: CryptoCodec = CryptoCodec.getInstance(CipherSuite.AES_CTR_NOPADDING)
val bufferSize: Int = sparkConf.getInt(
SPARK_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB,
DEFAULT_SPARK_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB) * 1024
val iv: Array[Byte] = createInitializationVector(cryptoCodec)
val credentials = SparkHadoopUtil.get.getCurrentUserCredentials()
var key: Array[Byte] = credentials.getSecretKey(SPARK_SHUFFLE_TOKEN)

Contributor: If you cannot get the user credentials, you could write a default value to the configuration file each time a user starts a job, and retrieve that value in this case. However, I think it may be unsafe to store SPARK_SHUFFLE_TOKEN and related information in plain text in a configuration file; it would be better to encrypt the values before storing them.

fos.write(iv)
val cos = new CryptoOutputStream(fos, cryptoCodec,
bufferSize, key, iv, iv.length)
ts = new TimeTrackingOutputStream(cos)
} else {
ts = new TimeTrackingOutputStream(fos)
}
channel = fos.getChannel()
bs = compressStream(new BufferedOutputStream(ts, bufferSize))
objOut = serializer.newInstance().serializeStream(bs)
initialized = true
this
}

def createInitializationVector(cryptoCodec: CryptoCodec): Array[Byte] = {
val iv: Array[Byte] = new Array[Byte](cryptoCodec.getCipherSuite.getAlgorithmBlockSize)
cryptoCodec.generateSecureRandom(iv)
iv
}

def setSparkConf(sparkConfVal: SparkConf): DiskBlockObjectWriter = {
sparkConf = sparkConfVal
this
}

override def close() {
if (initialized) {
if (syncWrites) {
@@ -17,16 +17,21 @@

package org.apache.spark.storage

import java.io.{InputStream, IOException}
import java.io.InputStream
import java.util.concurrent.LinkedBlockingQueue

import scala.collection.mutable.{ArrayBuffer, HashSet, Queue}
import scala.util.{Failure, Success, Try}

import com.intel.chimera.{CipherSuite, CryptoCodec, CryptoInputStream}

import org.apache.spark.{Logging, TaskContext}
import org.apache.spark.crypto.CommonConfigurationKeys._
import org.apache.spark.crypto.CryptoConf
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.network.BlockTransferService
import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient}
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.serializer.Serializer
import org.apache.spark.util.{CompletionIterator, Utils}

@@ -296,8 +301,27 @@ final class ShuffleBlockFetcherIterator(
// There is a chance that createInputStream can fail (e.g. fetching a local file that does
// not exist, SPARK-4085). In that case, we should propagate the right exception so
// the scheduler gets a FetchFailedException.
// is0:InputStream
Try(buf.createInputStream()).map { is0 =>
val is = blockManager.wrapForCompression(blockId, is0)
var is: InputStream = null
val sparkConf = blockManager.conf
val cryptoConf = CryptoConf.parse(sparkConf)
if (cryptoConf.enabled) {
val cryptoCodec: CryptoCodec = CryptoCodec.getInstance(CipherSuite.AES_CTR_NOPADDING)
val bufferSize: Int = sparkConf.getInt(
SPARK_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB,
DEFAULT_SPARK_ENCRYPTED_INTERMEDIATE_DATA_BUFFER_KB) * 1024
val iv: Array[Byte] = new Array[Byte](16)
is0.read(iv, 0, iv.length)
val streamOffset: Long = iv.length
val credentials = SparkHadoopUtil.get.getCurrentUserCredentials()
var key: Array[Byte] = credentials.getSecretKey(SPARK_SHUFFLE_TOKEN)
var cos = new CryptoInputStream(is0, cryptoCodec, bufferSize, key,
iv, streamOffset)
is = blockManager.wrapForCompression(blockId, cos)
} else {
is = blockManager.wrapForCompression(blockId, is0)
}
val iter = serializer.newInstance().deserializeStream(is).asIterator
CompletionIterator[Any, Iterator[Any]](iter, {
// Once the iterator is exhausted, release the buffer and set currentResult to null
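Taken together, the writer (DiskBlockObjectWriter) and the reader (ShuffleBlockFetcherIterator) agree on a simple framing: the raw IV is the first block of the stream, and everything after it is AES-CTR ciphertext keyed by the shuffle token. Below is a minimal round-trip sketch of that framing, using only the Chimera calls that already appear in this diff; the in-memory streams, the IvFramingSketch wrapper, and the literal key/payload are stand-ins for the shuffle file and the SPARK_SHUFFLE_TOKEN secret.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.security.SecureRandom

import com.intel.chimera.{CipherSuite, CryptoCodec, CryptoInputStream, CryptoOutputStream}

object IvFramingSketch {
  def main(args: Array[String]): Unit = {
    val codec = CryptoCodec.getInstance(CipherSuite.AES_CTR_NOPADDING)
    val key = new Array[Byte](16)        // stands in for the shuffle token secret
    new SecureRandom().nextBytes(key)
    val iv = new Array[Byte](codec.getCipherSuite.getAlgorithmBlockSize)
    codec.generateSecureRandom(iv)

    // Writer side: emit the plain IV first, then wrap the sink with a CryptoOutputStream
    // whose stream offset starts just past the IV, as open() does above.
    val sink = new ByteArrayOutputStream()
    sink.write(iv)
    val cos = new CryptoOutputStream(sink, codec, 4 * 1024, key, iv, iv.length)
    cos.write("some serialized shuffle records".getBytes("UTF-8"))
    cos.close()

    // Reader side: read the IV prefix back, then decrypt from streamOffset = iv.length,
    // mirroring the fetcher's handling of is0.
    val source = new ByteArrayInputStream(sink.toByteArray)
    val ivIn = new Array[Byte](iv.length)
    source.read(ivIn, 0, ivIn.length)
    val cis = new CryptoInputStream(source, codec, 4 * 1024, key, ivIn, ivIn.length)
    val plain = new Array[Byte](1024)
    val n = cis.read(plain)              // one read suffices for this small in-memory payload
    println(new String(plain, 0, n, "UTF-8"))
  }
}

Both sides pass the same streamOffset (the IV length), so the CTR keystream positions line up on encrypt and decrypt.
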
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.crypto

import java.io.{BufferedOutputStream, ByteArrayInputStream, ByteArrayOutputStream}
import java.security.SecureRandom

import org.apache.spark.Logging
import org.scalatest.FunSuite
import com.intel.chimera.{CryptoCodec, CryptoInputStream, CryptoOutputStream, JceAesCtrCryptoCodec}

/**
* test JceAesCtrCryptoCodec
*/
class JceAesCtrCryptoCodecSuite extends FunSuite with Logging {

test("TestJceAesCtrCryptoCodecSuite"){
val random: SecureRandom = new SecureRandom
val dataLen: Int = 10000000
val inputData: Array[Byte] = new Array[Byte](dataLen)
val outputData: Array[Byte] = new Array[Byte](dataLen)
random.nextBytes(inputData)
// encrypt
val codec: CryptoCodec = new JceAesCtrCryptoCodec()
val aos: ByteArrayOutputStream = new ByteArrayOutputStream
val bos: BufferedOutputStream = new BufferedOutputStream(aos)
val key: Array[Byte] = new Array[Byte](16)
val iv: Array[Byte] = new Array[Byte](16)
random.nextBytes(key)
random.nextBytes(iv)

val cos: CryptoOutputStream = new CryptoOutputStream(bos, codec, 1024, key, iv)
cos.write(inputData, 0, inputData.length)
cos.flush
// decrypt
val cis: CryptoInputStream = new CryptoInputStream(new ByteArrayInputStream(aos.toByteArray),
codec, 1024, key, iv)
var readLen: Int = 0
var outOffset: Int = 0
while (readLen < dataLen) {
val n: Int = cis.read(outputData, outOffset, outputData.length - outOffset)
if (n >= 0) {
readLen += n
outOffset += n
}
}
var i: Int = 0
for(i <- 0 until dataLen )
{
if (inputData(i) != outputData(i)) {
logInfo(s"decrypt failed:$i")
}
}
}
}


@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.crypto

import java.io.{BufferedOutputStream, ByteArrayInputStream, ByteArrayOutputStream}
import java.security.SecureRandom

import org.apache.spark.Logging
import org.scalatest.FunSuite
import com.intel.chimera.{CryptoCodec, CryptoInputStream, CryptoOutputStream, OpensslAesCtrCryptoCodec}

/**
* Test OpensslAesCtrCryptoCodecSuite
*/
class OpensslAesCtrCryptoCodecSuite extends FunSuite with Logging {

test("TestOpensslAesCtrCryptoCodecSuite"){
val random: SecureRandom = new SecureRandom
val dataLen: Int = 10000000
val inputData: Array[Byte] = new Array[Byte](dataLen)
val outputData: Array[Byte] = new Array[Byte](dataLen)
random.nextBytes(inputData)
// encrypt
val codec: CryptoCodec = new OpensslAesCtrCryptoCodec()
val aos: ByteArrayOutputStream = new ByteArrayOutputStream
val bos: BufferedOutputStream = new BufferedOutputStream(aos)
val key: Array[Byte] = new Array[Byte](16)
val iv: Array[Byte] = new Array[Byte](16)
random.nextBytes(key)
random.nextBytes(iv)

val cos: CryptoOutputStream = new CryptoOutputStream(bos, codec, 1024, key, iv)
cos.write(inputData, 0, inputData.length)
cos.flush
// decrypt
val cis: CryptoInputStream = new CryptoInputStream(new ByteArrayInputStream(aos.toByteArray),
codec, 1024, key, iv)
var readLen: Int = 0
var outOffset: Int = 0
while (readLen < dataLen) {
val n: Int = cis.read(outputData, outOffset, outputData.length - outOffset)
if (n >= 0) {
readLen += n
outOffset += n
}
}
var i: Int = 0
for(i <- 0 until dataLen )
{
if (inputData(i) != outputData(i)) {
logInfo(s"decrypt failed:$i")
}
}
}
}
