Skip to content

Commit 52a9981

Browse files
committed
Fix some bugs in the address packing code.
The problem is that TaskMemoryManager expects offsets to include the page base address whereas PackedRecordPointer did not.
1 parent 51812a7 commit 52a9981

File tree

4 files changed

+63
-22
lines changed

4 files changed

+63
-22
lines changed

core/src/main/java/org/apache/spark/shuffle/unsafe/PackedRecordPointer.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,8 @@ public static long packPointer(long recordPointer, int partitionId) {
6868
assert (partitionId <= MAXIMUM_PARTITION_ID);
6969
// Note that without word alignment we can address 2^27 bytes = 128 megabytes per page.
7070
// Also note that this relies on some internals of how TaskMemoryManager encodes its addresses.
71-
final int pageNumber = (int) ((recordPointer & MASK_LONG_UPPER_13_BITS) >>> 51);
72-
final long compressedAddress =
73-
(((long) pageNumber) << 27) | (recordPointer & MASK_LONG_LOWER_27_BITS);
71+
final long pageNumber = (recordPointer & MASK_LONG_UPPER_13_BITS) >>> 24;
72+
final long compressedAddress = pageNumber | (recordPointer & MASK_LONG_LOWER_27_BITS);
7473
return (((long) partitionId) << 40) | compressedAddress;
7574
}
7675

@@ -85,9 +84,8 @@ public int getPartitionId() {
8584
}
8685

8786
public long getRecordPointer() {
88-
final long compressedAddress = packedRecordPointer & MASK_LONG_LOWER_40_BITS;
89-
final long pageNumber = (compressedAddress << 24) & MASK_LONG_UPPER_13_BITS;
90-
final long offsetInPage = compressedAddress & MASK_LONG_LOWER_27_BITS;
87+
final long pageNumber = (packedRecordPointer << 24) & MASK_LONG_UPPER_13_BITS;
88+
final long offsetInPage = packedRecordPointer & MASK_LONG_LOWER_27_BITS;
9189
return pageNumber | offsetInPage;
9290
}
9391

core/src/test/java/org/apache/spark/shuffle/unsafe/PackedRecordPointerSuite.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,15 @@ public void heap() {
3434
new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.HEAP));
3535
final MemoryBlock page0 = memoryManager.allocatePage(100);
3636
final MemoryBlock page1 = memoryManager.allocatePage(100);
37-
final long addressInPage1 = memoryManager.encodePageNumberAndOffset(page1, 42);
37+
final long addressInPage1 = memoryManager.encodePageNumberAndOffset(page1,
38+
page1.getBaseOffset() + 42);
3839
PackedRecordPointer packedPointer = new PackedRecordPointer();
3940
packedPointer.set(PackedRecordPointer.packPointer(addressInPage1, 360));
4041
assertEquals(360, packedPointer.getPartitionId());
41-
assertEquals(addressInPage1, packedPointer.getRecordPointer());
42+
final long recordPointer = packedPointer.getRecordPointer();
43+
assertEquals(1, TaskMemoryManager.decodePageNumber(recordPointer));
44+
assertEquals(page1.getBaseOffset() + 42, memoryManager.getOffsetInPage(recordPointer));
45+
assertEquals(addressInPage1, recordPointer);
4246
memoryManager.cleanUpAllAllocatedMemory();
4347
}
4448

@@ -48,11 +52,15 @@ public void offHeap() {
4852
new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.UNSAFE));
4953
final MemoryBlock page0 = memoryManager.allocatePage(100);
5054
final MemoryBlock page1 = memoryManager.allocatePage(100);
51-
final long addressInPage1 = memoryManager.encodePageNumberAndOffset(page1, 42);
55+
final long addressInPage1 = memoryManager.encodePageNumberAndOffset(page1,
56+
page1.getBaseOffset() + 42);
5257
PackedRecordPointer packedPointer = new PackedRecordPointer();
5358
packedPointer.set(PackedRecordPointer.packPointer(addressInPage1, 360));
5459
assertEquals(360, packedPointer.getPartitionId());
55-
assertEquals(addressInPage1, packedPointer.getRecordPointer());
60+
final long recordPointer = packedPointer.getRecordPointer();
61+
assertEquals(1, TaskMemoryManager.decodePageNumber(recordPointer));
62+
assertEquals(page1.getBaseOffset() + 42, memoryManager.getOffsetInPage(recordPointer));
63+
assertEquals(addressInPage1, recordPointer);
5664
memoryManager.cleanUpAllAllocatedMemory();
5765
}
5866

unsafe/src/main/java/org/apache/spark/unsafe/memory/TaskMemoryManager.java

Lines changed: 42 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,18 @@ public final class TaskMemoryManager {
4848

4949
private final Logger logger = LoggerFactory.getLogger(TaskMemoryManager.class);
5050

51-
/**
52-
* The number of entries in the page table.
53-
*/
54-
private static final int PAGE_TABLE_SIZE = 1 << 13;
51+
/** The number of bits used to address the page table. */
52+
private static final int PAGE_NUMBER_BITS = 13;
53+
54+
/** The number of bits used to encode offsets in data pages. */
55+
@VisibleForTesting
56+
static final int OFFSET_BITS = 64 - PAGE_NUMBER_BITS; // 51
57+
58+
/** The number of entries in the page table. */
59+
private static final int PAGE_TABLE_SIZE = 1 << PAGE_NUMBER_BITS;
60+
61+
/** Maximum supported data page size */
62+
private static final long MAXIMUM_PAGE_SIZE = (1L << OFFSET_BITS);
5563

5664
/** Bit mask for the lower 51 bits of a long. */
5765
private static final long MASK_LONG_LOWER_51_BITS = 0x7FFFFFFFFFFFFL;
@@ -102,8 +110,9 @@ public TaskMemoryManager(ExecutorMemoryManager executorMemoryManager) {
102110
* intended for allocating large blocks of memory that will be shared between operators.
103111
*/
104112
public MemoryBlock allocatePage(long size) {
105-
if (size >= (1L << 51)) {
106-
throw new IllegalArgumentException("Cannot allocate a page with more than 2^51 bytes");
113+
if (size > MAXIMUM_PAGE_SIZE) {
114+
throw new IllegalArgumentException(
115+
"Cannot allocate a page with more than " + MAXIMUM_PAGE_SIZE + " bytes");
107116
}
108117

109118
final int pageNumber;
@@ -168,15 +177,36 @@ public void free(MemoryBlock memory) {
168177
/**
169178
* Given a memory page and offset within that page, encode this address into a 64-bit long.
170179
* This address will remain valid as long as the corresponding page has not been freed.
180+
*
181+
* @param page a data page allocated by {@link TaskMemoryManager#allocate(long)}.
182+
* @param offsetInPage an offset in this page which incorporates the base offset. In other words,
183+
* this should be the value that you would pass as the base offset into an
184+
* UNSAFE call (e.g. page.baseOffset() + something).
185+
* @return an encoded page address.
171186
*/
172187
public long encodePageNumberAndOffset(MemoryBlock page, long offsetInPage) {
188+
if (!inHeap) {
189+
// In off-heap mode, an offset is an absolute address that may require a full 64 bits to
190+
// encode. Due to our page size limitation, though, we can convert this into an offset that's
191+
// relative to the page's base offset; this relative offset will fit in 51 bits.
192+
offsetInPage -= page.getBaseOffset();
193+
}
173194
return encodePageNumberAndOffset(page.pageNumber, offsetInPage);
174195
}
175196

176197
@VisibleForTesting
177198
public static long encodePageNumberAndOffset(int pageNumber, long offsetInPage) {
178199
assert (pageNumber != -1) : "encodePageNumberAndOffset called with invalid page";
179-
return (((long) pageNumber) << 51) | (offsetInPage & MASK_LONG_LOWER_51_BITS);
200+
return (((long) pageNumber) << OFFSET_BITS) | (offsetInPage & MASK_LONG_LOWER_51_BITS);
201+
}
202+
203+
@VisibleForTesting
204+
public static int decodePageNumber(long pagePlusOffsetAddress) {
205+
return (int) ((pagePlusOffsetAddress & MASK_LONG_UPPER_13_BITS) >>> OFFSET_BITS);
206+
}
207+
208+
private static long decodeOffset(long pagePlusOffsetAddress) {
209+
return (pagePlusOffsetAddress & MASK_LONG_LOWER_51_BITS);
180210
}
181211

182212
/**
@@ -185,7 +215,7 @@ public static long encodePageNumberAndOffset(int pageNumber, long offsetInPage)
185215
*/
186216
public Object getPage(long pagePlusOffsetAddress) {
187217
if (inHeap) {
188-
final int pageNumber = (int) ((pagePlusOffsetAddress & MASK_LONG_UPPER_13_BITS) >>> 51);
218+
final int pageNumber = decodePageNumber(pagePlusOffsetAddress);
189219
assert (pageNumber >= 0 && pageNumber < PAGE_TABLE_SIZE);
190220
final Object page = pageTable[pageNumber].getBaseObject();
191221
assert (page != null);
@@ -200,11 +230,13 @@ public Object getPage(long pagePlusOffsetAddress) {
200230
* {@link TaskMemoryManager#encodePageNumberAndOffset(MemoryBlock, long)}
201231
*/
202232
public long getOffsetInPage(long pagePlusOffsetAddress) {
203-
final long offsetInPage = (pagePlusOffsetAddress & MASK_LONG_LOWER_51_BITS);
233+
final long offsetInPage = decodeOffset(pagePlusOffsetAddress);
204234
if (inHeap) {
205235
return offsetInPage;
206236
} else {
207-
final int pageNumber = (int) ((pagePlusOffsetAddress & MASK_LONG_UPPER_13_BITS) >>> 51);
237+
// In off-heap mode, an offset is an absolute address. In encodePageNumberAndOffset, we
238+
// converted the absolute address into a relative address. Here, we invert that operation:
239+
final int pageNumber = decodePageNumber(pagePlusOffsetAddress);
208240
assert (pageNumber >= 0 && pageNumber < PAGE_TABLE_SIZE);
209241
return pageTable[pageNumber].getBaseOffset() + offsetInPage;
210242
}

unsafe/src/test/java/org/apache/spark/unsafe/memory/TaskMemoryManagerSuite.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,12 @@ public void encodePageNumberAndOffsetOffHeap() {
4343
final TaskMemoryManager manager =
4444
new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.UNSAFE));
4545
final MemoryBlock dataPage = manager.allocatePage(256);
46-
final long encodedAddress = manager.encodePageNumberAndOffset(dataPage, 64);
46+
// In off-heap mode, an offset is an absolute address that may require more than 51 bits to
47+
// encode. This test exercises that corner-case:
48+
final long offset = ((1L << TaskMemoryManager.OFFSET_BITS) + 10);
49+
final long encodedAddress = manager.encodePageNumberAndOffset(dataPage, offset);
4750
Assert.assertEquals(null, manager.getPage(encodedAddress));
48-
Assert.assertEquals(dataPage.getBaseOffset() + 64, manager.getOffsetInPage(encodedAddress));
51+
Assert.assertEquals(offset, manager.getOffsetInPage(encodedAddress));
4952
}
5053

5154
@Test

0 commit comments

Comments
 (0)