Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,9 @@ public final class BytesToBytesMap extends MemoryConsumer {
*/
private final Location loc;

private final boolean enablePerfMetrics;
private long numProbes = 0L;

private long numProbes = 0;

private long numKeyLookups = 0;
private long numKeyLookups = 0L;

private long peakMemoryUsedBytes = 0L;

Expand All @@ -180,16 +178,14 @@ public BytesToBytesMap(
SerializerManager serializerManager,
int initialCapacity,
double loadFactor,
long pageSizeBytes,
boolean enablePerfMetrics) {
long pageSizeBytes) {
super(taskMemoryManager, pageSizeBytes, taskMemoryManager.getTungstenMemoryMode());
this.taskMemoryManager = taskMemoryManager;
this.blockManager = blockManager;
this.serializerManager = serializerManager;
this.loadFactor = loadFactor;
this.loc = new Location();
this.pageSizeBytes = pageSizeBytes;
this.enablePerfMetrics = enablePerfMetrics;
if (initialCapacity <= 0) {
throw new IllegalArgumentException("Initial capacity must be greater than 0");
}
Expand All @@ -209,23 +205,14 @@ public BytesToBytesMap(
TaskMemoryManager taskMemoryManager,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this constructor is called, the enablePerfMetrics will be false. Where do we use this constructor?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

review the current master branch, this constructor is called by HashedRelationSuite , AggregateBenchmark, UnsafeKVExternalSorterSuite, AbstractBytesToBytesMapSuite. thanks.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK all tests, then we are fine

int initialCapacity,
long pageSizeBytes) {
this(taskMemoryManager, initialCapacity, pageSizeBytes, false);
}

public BytesToBytesMap(
TaskMemoryManager taskMemoryManager,
int initialCapacity,
long pageSizeBytes,
boolean enablePerfMetrics) {
this(
taskMemoryManager,
SparkEnv.get() != null ? SparkEnv.get().blockManager() : null,
SparkEnv.get() != null ? SparkEnv.get().serializerManager() : null,
initialCapacity,
// In order to re-use the longArray for sorting, the load factor cannot be larger than 0.5.
0.5,
pageSizeBytes,
enablePerfMetrics);
pageSizeBytes);
}

/**
Expand Down Expand Up @@ -462,15 +449,12 @@ public Location lookup(Object keyBase, long keyOffset, int keyLength, int hash)
public void safeLookup(Object keyBase, long keyOffset, int keyLength, Location loc, int hash) {
assert(longArray != null);

if (enablePerfMetrics) {
numKeyLookups++;
}
numKeyLookups++;

int pos = hash & mask;
int step = 1;
while (true) {
if (enablePerfMetrics) {
numProbes++;
}
numProbes++;
if (longArray.get(pos * 2) == 0) {
// This is a new key.
loc.with(pos, hash, false);
Expand Down Expand Up @@ -860,9 +844,6 @@ public long getPeakMemoryUsedBytes() {
* Returns the average number of probes per key lookup.
*/
public double getAverageProbesPerLookup() {
if (!enablePerfMetrics) {
throw new IllegalStateException();
}
return (1.0 * numProbes) / numKeyLookups;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ public void failureToGrow() {
@Test
public void spillInIterator() throws IOException {
BytesToBytesMap map = new BytesToBytesMap(
taskMemoryManager, blockManager, serializerManager, 1, 0.75, 1024, false);
taskMemoryManager, blockManager, serializerManager, 1, 0.75, 1024);
try {
int i;
for (i = 0; i < 1024; i++) {
Expand Down Expand Up @@ -569,7 +569,7 @@ public void spillInIterator() throws IOException {
@Test
public void multipleValuesForSameKey() {
BytesToBytesMap map =
new BytesToBytesMap(taskMemoryManager, blockManager, serializerManager, 1, 0.5, 1024, false);
new BytesToBytesMap(taskMemoryManager, blockManager, serializerManager, 1, 0.5, 1024);
try {
int i;
for (i = 0; i < 1024; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public UnsafeFixedWidthAggregationMap(
this.groupingKeyProjection = UnsafeProjection.create(groupingKeySchema);
this.groupingKeySchema = groupingKeySchema;
this.map = new BytesToBytesMap(
taskContext.taskMemoryManager(), initialCapacity, pageSizeBytes, true);
taskContext.taskMemoryManager(), initialCapacity, pageSizeBytes);

// Initialize the buffer for aggregation value
final UnsafeProjection valueProjection = UnsafeProjection.create(aggregationBufferSchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,7 @@ private[joins] class UnsafeHashedRelation(
binaryMap = new BytesToBytesMap(
taskMemoryManager,
(nKeys * 1.5 + 1).toInt, // reduce hash collision
pageSizeBytes,
true)
pageSizeBytes)

var i = 0
var keyBuffer = new Array[Byte](1024)
Expand Down Expand Up @@ -299,8 +298,7 @@ private[joins] object UnsafeHashedRelation {
taskMemoryManager,
// Only 70% of the slots can be used before growing, more capacity help to reduce collision
(sizeEstimate * 1.5 + 1).toInt,
pageSizeBytes,
true)
pageSizeBytes)

// Create a mapping of buildKeys -> rows
val keyGenerator = UnsafeProjection.create(key)
Expand Down