diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala index 5794f542b7564..97fc139179628 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala @@ -53,7 +53,22 @@ private[spark] class BlockStoreShuffleReader[K, C]( // Wrap the streams for compression based on configuration val wrappedStreams = blockFetcherItr.map { case (blockId, inputStream) => - serializerManager.wrapForCompression(blockId, inputStream) + try { + serializerManager.wrapForCompression(blockId, inputStream) + } catch { + case e: IOException => { + if ((e.getMessage.contains("FAILED_TO_UNCOMPRESS(5)") || + e.getMessage.contains("PARSING_ERROR(2)") || + e.getMessage.contains("Stream is corrupted")) && blockId.isShuffle) { + val shuffleBlockId = blockId.asInstanceOf[ShuffleBlockId] + throw new FetchFailedException( + blockManager.blockManagerId, shuffleBlockId.shuffleId, + shuffleBlockId.mapId, shuffleBlockId.reduceId, e) + } else { + throw e + } + } + } } val serializerInstance = dep.serializer.newInstance()