From 6ad24536fabf34e3abe77ac6b74f93e3f6a8160c Mon Sep 17 00:00:00 2001
From: Zhongshuai Pei
Date: Thu, 21 Apr 2016 11:22:54 +0800
Subject: [PATCH] try catch uncompress

---
 .../spark/shuffle/BlockStoreShuffleReader.scala    | 17 ++++++++++++++++-
 1 file changed, 16 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
index 5794f542b7564..97fc139179628 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
@@ -53,7 +53,22 @@ private[spark] class BlockStoreShuffleReader[K, C](
 
     // Wrap the streams for compression based on configuration
     val wrappedStreams = blockFetcherItr.map { case (blockId, inputStream) =>
-      serializerManager.wrapForCompression(blockId, inputStream)
+      try {
+        serializerManager.wrapForCompression(blockId, inputStream)
+      } catch {
+        case e: IOException => {
+          if ((e.getMessage.contains("FAILED_TO_UNCOMPRESS(5)") ||
+            e.getMessage.contains("PARSING_ERROR(2)") ||
+            e.getMessage.contains("Stream is corrupted")) && blockId.isShuffle) {
+            val shuffleBlockId = blockId.asInstanceOf[ShuffleBlockId]
+            throw new FetchFailedException(
+              blockManager.blockManagerId, shuffleBlockId.shuffleId,
+              shuffleBlockId.mapId, shuffleBlockId.reduceId, e)
+          } else {
+            throw e
+          }
+        }
+      }
     }
 
     val serializerInstance = dep.serializer.newInstance()