From 8abcf821ee608981f09b69893162f04223682941 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 26 Jul 2015 14:24:56 -0700 Subject: [PATCH 1/3] Properly decrement freeSpaceInCurrentPage in UnsafeExternalSorter --- .../unsafe/sort/UnsafeExternalSorter.java | 7 ++++++- .../sort/UnsafeExternalSorterSuite.java | 19 +++++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index 4d6731ee60af..80b03d7e99e2 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -150,6 +150,11 @@ private long getMemoryUsage() { return sorter.getMemoryUsage() + (allocatedPages.size() * (long) PAGE_SIZE); } + @VisibleForTesting + public int getNumberOfAllocatedPages() { + return allocatedPages.size(); + } + public long freeMemory() { long memoryFreed = 0; for (MemoryBlock block : allocatedPages) { @@ -257,7 +262,7 @@ public void insertRecord( currentPagePosition, lengthInBytes); currentPagePosition += lengthInBytes; - + freeSpaceInCurrentPage -= totalSpaceRequired; sorter.insertRecord(recordAddress, prefix); } diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java index ea8755e21eb6..0e391b751226 100644 --- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java +++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java @@ -199,4 +199,23 @@ public void testSortingEmptyArrays() throws Exception { } } + @Test + public void testFillingPage() throws Exception { + final UnsafeExternalSorter sorter = new UnsafeExternalSorter( + memoryManager, + shuffleMemoryManager, + blockManager, + taskContext, + recordComparator, + prefixComparator, + 1024, + new SparkConf()); + + byte[] record = new byte[16]; + while (sorter.getNumberOfAllocatedPages() < 2) { + sorter.insertRecord(record, PlatformDependent.BYTE_ARRAY_OFFSET, record.length, 0); + } + sorter.freeMemory(); + } + } From f4cf91df7b3b71eb881c0d08e069b1f81c25c929 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 26 Jul 2015 15:23:32 -0700 Subject: [PATCH 2/3] Fix use-after-free bug in UnsafeExternalRowSorter. --- .../spark/sql/execution/UnsafeExternalRowSorter.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java index be4ff400c475..811d595a64ec 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java @@ -141,10 +141,13 @@ public InternalRow next() { numFields, sortedIterator.getRecordLength()); if (!hasNext()) { - row.copy(); // so that we don't have dangling pointers to freed page + UnsafeRow copy = row.copy(); // so that we don't have dangling pointers to freed page + row.pointTo(null, 0, 0, 0); // so that we don't keep references to the base object cleanupResources(); + return copy; + } else { + return row; } - return row; } catch (IOException e) { cleanupResources(); // Scala iterators don't declare any checked exceptions, so we need to use this hack From 590f3114a41d9a710bbe0bdd36846f7b3c75e97c Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Sun, 26 Jul 2015 22:27:56 -0700 Subject: [PATCH 3/3] null out row --- .../apache/spark/sql/execution/UnsafeExternalRowSorter.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java index 811d595a64ec..4c3f2c655714 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java @@ -124,7 +124,7 @@ Iterator sort() throws IOException { return new AbstractScalaRowIterator() { private final int numFields = schema.length(); - private final UnsafeRow row = new UnsafeRow(); + private UnsafeRow row = new UnsafeRow(); @Override public boolean hasNext() { @@ -142,7 +142,7 @@ public InternalRow next() { sortedIterator.getRecordLength()); if (!hasNext()) { UnsafeRow copy = row.copy(); // so that we don't have dangling pointers to freed page - row.pointTo(null, 0, 0, 0); // so that we don't keep references to the base object + row = null; // so that we don't keep references to the base object cleanupResources(); return copy; } else {