Skip to content

Commit 81d52c5

Browse files
committed
WIP on UnsafeSorter
1 parent 27de6fe commit 81d52c5

File tree

3 files changed

+236
-0
lines changed

3 files changed

+236
-0
lines changed
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.unsafe.sort;
19+
20+
import org.apache.spark.util.collection.SortDataFormat;
21+
22+
/**
23+
* TODO: finish writing this description
24+
*
25+
* Within each long[] buffer, position {@code 2 * i} holds a pointer pointer to the record at
26+
* index {@code i}, while position {@code 2 * i + 1} in the array holds an 8-byte key prefix.
27+
*/
28+
final class UnsafeSortDataFormat
29+
extends SortDataFormat<UnsafeSortDataFormat.KeyPointerAndPrefix, long[]> {
30+
31+
public static final UnsafeSortDataFormat INSTANCE = new UnsafeSortDataFormat();
32+
33+
private UnsafeSortDataFormat() { };
34+
35+
public static final class KeyPointerAndPrefix {
36+
/**
37+
* A pointer to a record; see {@link org.apache.spark.unsafe.memory.TaskMemoryManager} for a
38+
* description of how these addresses are encoded.
39+
*/
40+
long recordPointer;
41+
42+
/**
43+
* A key prefix, for use in comparisons.
44+
*/
45+
long keyPrefix;
46+
}
47+
48+
@Override
49+
public KeyPointerAndPrefix getKey(long[] data, int pos) {
50+
// Since we re-use keys, this method shouldn't be called.
51+
throw new UnsupportedOperationException();
52+
}
53+
54+
@Override
55+
public KeyPointerAndPrefix newKey() {
56+
return new KeyPointerAndPrefix();
57+
}
58+
59+
@Override
60+
public KeyPointerAndPrefix getKey(long[] data, int pos, KeyPointerAndPrefix reuse) {
61+
reuse.recordPointer = data[pos * 2];
62+
reuse.keyPrefix = data[pos * 2 + 1];
63+
return reuse;
64+
}
65+
66+
@Override
67+
public void swap(long[] data, int pos0, int pos1) {
68+
long tempPointer = data[pos0 * 2];
69+
long tempKeyPrefix = data[pos0 * 2 + 1];
70+
data[pos0 * 2] = data[pos1 * 2];
71+
data[pos0 * 2 + 1] = data[pos1 * 2 + 1];
72+
data[pos1 * 2] = tempPointer;
73+
data[pos1 * 2 + 1] = tempKeyPrefix;
74+
}
75+
76+
@Override
77+
public void copyElement(long[] src, int srcPos, long[] dst, int dstPos) {
78+
dst[dstPos * 2] = src[srcPos * 2];
79+
dst[dstPos * 2 + 1] = src[srcPos * 2 + 1];
80+
}
81+
82+
@Override
83+
public void copyRange(long[] src, int srcPos, long[] dst, int dstPos, int length) {
84+
System.arraycopy(src, srcPos * 2, dst, dstPos * 2, length * 2);
85+
}
86+
87+
@Override
88+
public long[] allocate(int length) {
89+
assert (length < Integer.MAX_VALUE / 2) : "Length " + length + " is too large";
90+
return new long[length * 2];
91+
}
92+
93+
}
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.unsafe.sort;
19+
20+
import java.util.Comparator;
21+
import java.util.Iterator;
22+
23+
import org.apache.spark.unsafe.memory.MemoryLocation;
24+
import org.apache.spark.util.collection.Sorter;
25+
import org.apache.spark.unsafe.memory.TaskMemoryManager;
26+
import static org.apache.spark.unsafe.sort.UnsafeSortDataFormat.KeyPointerAndPrefix;
27+
28+
public final class UnsafeSorter {
29+
30+
public static abstract class RecordComparator {
31+
public abstract int compare(
32+
Object leftBaseObject,
33+
long leftBaseOffset,
34+
Object rightBaseObject,
35+
long rightBaseOffset);
36+
}
37+
38+
public static abstract class PrefixComputer {
39+
public abstract long computePrefix(Object baseObject, long baseOffset);
40+
}
41+
42+
/**
43+
* Compares 8-byte key prefixes in prefix sort. Subclasses may implement type-specific comparisons,
44+
* such as lexicographic comparison for strings.
45+
*/
46+
public static abstract class PrefixComparator {
47+
public abstract int compare(long prefix1, long prefix2);
48+
}
49+
50+
private final TaskMemoryManager memoryManager;
51+
private final PrefixComputer prefixComputer;
52+
private final Sorter<KeyPointerAndPrefix, long[]> sorter;
53+
private final Comparator<KeyPointerAndPrefix> sortComparator;
54+
55+
/**
56+
* Within this buffer, position {@code 2 * i} holds a pointer pointer to the record at
57+
* index {@code i}, while position {@code 2 * i + 1} in the array holds an 8-byte key prefix.
58+
*/
59+
private long[] sortBuffer = new long[1024];
60+
61+
private int sortBufferInsertPosition = 0;
62+
63+
private void expandSortBuffer(int newSize) {
64+
assert (newSize > sortBuffer.length);
65+
final long[] oldBuffer = sortBuffer;
66+
sortBuffer = new long[newSize];
67+
System.arraycopy(oldBuffer, 0, sortBuffer, 0, oldBuffer.length);
68+
}
69+
70+
public UnsafeSorter(
71+
final TaskMemoryManager memoryManager,
72+
final RecordComparator recordComparator,
73+
PrefixComputer prefixComputer,
74+
final PrefixComparator prefixComparator) {
75+
this.memoryManager = memoryManager;
76+
this.prefixComputer = prefixComputer;
77+
this.sorter =
78+
new Sorter<KeyPointerAndPrefix, long[]>(UnsafeSortDataFormat.INSTANCE);
79+
this.sortComparator = new Comparator<KeyPointerAndPrefix>() {
80+
@Override
81+
public int compare(KeyPointerAndPrefix left, KeyPointerAndPrefix right) {
82+
if (left.keyPrefix == right.keyPrefix) {
83+
final Object leftBaseObject = memoryManager.getPage(left.recordPointer);
84+
final long leftBaseOffset = memoryManager.getOffsetInPage(left.recordPointer);
85+
final Object rightBaseObject = memoryManager.getPage(right.recordPointer);
86+
final long rightBaseOffset = memoryManager.getOffsetInPage(right.recordPointer);
87+
return recordComparator.compare(
88+
leftBaseObject, leftBaseOffset, rightBaseObject, rightBaseOffset);
89+
} else {
90+
return prefixComparator.compare(left.keyPrefix, right.keyPrefix);
91+
}
92+
}
93+
};
94+
}
95+
96+
public void insertRecord(long objectAddress) {
97+
if (sortBufferInsertPosition + 2 == sortBuffer.length) {
98+
expandSortBuffer(sortBuffer.length * 2);
99+
}
100+
final Object baseObject = memoryManager.getPage(objectAddress);
101+
final long baseOffset = memoryManager.getOffsetInPage(objectAddress);
102+
final long keyPrefix = prefixComputer.computePrefix(baseObject, baseOffset);
103+
sortBuffer[sortBufferInsertPosition] = objectAddress;
104+
sortBuffer[sortBufferInsertPosition + 1] = keyPrefix;
105+
sortBufferInsertPosition += 2;
106+
}
107+
108+
public Iterator<MemoryLocation> getSortedIterator() {
109+
final MemoryLocation memoryLocation = new MemoryLocation();
110+
sorter.sort(sortBuffer, 0, sortBufferInsertPosition, sortComparator);
111+
return new Iterator<MemoryLocation>() {
112+
int position = 0;
113+
114+
@Override
115+
public boolean hasNext() {
116+
return position < sortBufferInsertPosition;
117+
}
118+
119+
@Override
120+
public MemoryLocation next() {
121+
final long address = sortBuffer[position];
122+
position += 2;
123+
final Object baseObject = memoryManager.getPage(address);
124+
final long baseOffset = memoryManager.getOffsetInPage(address);
125+
memoryLocation.setObjAndOffset(baseObject, baseOffset);
126+
return memoryLocation;
127+
}
128+
129+
@Override
130+
public void remove() {
131+
throw new UnsupportedOperationException();
132+
}
133+
};
134+
}
135+
136+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package org.apache.spark.unsafe.sort;
2+
3+
/**
 * Placeholder test suite; it declares no test cases yet.
 * NOTE(review): presumably intended to hold the tests for UnsafeSorter -- confirm.
 */
public class UnsafeSorterSuite {
}

0 commit comments

Comments
 (0)