@@ -2615,6 +2615,42 @@ object SQLConf {
.checkValue(bit => bit >= 10 && bit <= 30, "The bit value must be in [10, 30].")
.createWithDefault(16)

val SKIP_PARTIAL_AGGREGATE_MIN_ROWS =
buildConf("spark.sql.aggregate.skipPartialAggregate.minNumRows")
.internal()
.doc("The minimal number of input rows processed before hash aggregate checks if it can be" +
" skipped. Only applies to partial hash aggregate.")
.version("3.1.2")
.longConf
.checkValue(minNumRows => minNumRows > 0, "Invalid value for " +
"spark.sql.aggregate.skipPartialAggregate.minNumRows. The value must be" +
" greater than 0.")
.createWithDefault(100000)

val SKIP_PARTIAL_AGGREGATE_MIN_RATIO =
buildConf("spark.sql.aggregate.skipPartialAggregate.minRatio")
.internal()
.doc("The minimal ratio between input and output rows for partial hash aggregate allows it" +
" to be skipped")
.version("3.1.2")
.doubleConf
.checkValue(ratio => ratio > 0 && ratio < 1, "Invalid value for " +
"spark.sql.aggregate.skipPartialAggregate.minRatio. The value must be" +
" between 0 and 1.")
.createWithDefault(0.5)

val SKIP_PARTIAL_AGGREGATE_ENABLED =
buildConf("spark.sql.aggregate.skipPartialAggregate.enabled")
.internal()
.doc("When enabled, the partial aggregation is skipped when the following " +
"two conditions are met. 1. When the total number of records processed is greater " +
s"than threshold defined by ${SKIP_PARTIAL_AGGREGATE_MIN_ROWS.key} 2. When the ratio " +
"of record count in map to the total records is less that value defined by " +
s"${SKIP_PARTIAL_AGGREGATE_MIN_RATIO.key}")
.version("3.1.2")
.booleanConf
.createWithDefault(true)
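
The three configs above work together; below is a hedged sketch (not the actual HashAggregateExec change in this PR, and the helper name is hypothetical) of how a partial hash aggregate could combine them into a single skip decision, using the SQLConf accessors added further down in this diff:

// Hypothetical sketch: decide whether a partial hash aggregate may stop
// aggregating and start passing rows through unchanged.
import org.apache.spark.sql.internal.SQLConf

def shouldSkipPartialAggregate(
    conf: SQLConf,
    inputRows: Long,   // rows consumed by the partial aggregate so far
    mapRows: Long      // rows currently held in the aggregation hash map
  ): Boolean = {
  conf.skipPartialAggregate &&
    inputRows > conf.skipPartialAggregateThreshold &&
    mapRows.toDouble / inputRows < conf.skipPartialAggregateRatio
}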

val AVRO_COMPRESSION_CODEC = buildConf("spark.sql.avro.compression.codec")
.doc("Compression codec used in writing of AVRO files. Supported codecs: " +
"uncompressed, deflate, snappy, bzip2, xz and zstandard. Default codec is snappy.")
@@ -3605,6 +3641,12 @@ class SQLConf extends Serializable with Logging {

def fastHashAggregateRowMaxCapacityBit: Int = getConf(FAST_HASH_AGGREGATE_MAX_ROWS_CAPACITY_BIT)

def skipPartialAggregate: Boolean = getConf(SKIP_PARTIAL_AGGREGATE_ENABLED)

def skipPartialAggregateThreshold: Long = getConf(SKIP_PARTIAL_AGGREGATE_MIN_ROWS)

def skipPartialAggregateRatio: Double = getConf(SKIP_PARTIAL_AGGREGATE_MIN_RATIO)

def datetimeJava8ApiEnabled: Boolean = getConf(DATETIME_JAVA8API_ENABLED)

def uiExplainMode: String = getConf(UI_EXPLAIN_MODE)
@@ -63,6 +63,13 @@ public final class UnsafeFixedWidthAggregationMap {
*/
private final UnsafeRow currentAggregationBuffer;

/**
* Number of rows that were added to the map.
* This includes the rows that were passed on to the sorter
* via {@link #destructAndCreateExternalSorter()}.
*/
private long numRowsAdded = 0L;

/**
* @return true if UnsafeFixedWidthAggregationMap supports aggregation buffers with the given
* schema, false otherwise.
@@ -147,6 +154,8 @@ public UnsafeRow getAggregationBufferFromUnsafeRow(UnsafeRow key, int hash) {
);
if (!putSucceeded) {
return null;
} else {
numRowsAdded++;
}
}

@@ -249,4 +258,8 @@ public UnsafeKVExternalSorter destructAndCreateExternalSorter() throws IOException {
package$.MODULE$.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD()),
map);
}

public long getNumRows() {
return numRowsAdded;
}
}
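
For context, a hedged caller-side usage sketch (hypothetical, not part of this diff): because numRowsAdded is never reset, getNumRows() reports the total number of grouping rows the map has ever absorbed, including rows already handed to the external sorter, so the caller can derive the output/input ratio that spark.sql.aggregate.skipPartialAggregate.minRatio is compared against.

// Hypothetical Scala sketch; `hashMap` is the UnsafeFixedWidthAggregationMap
// above and `inputRows` is a count the calling operator maintains itself.
val mapRows: Long = hashMap.getNumRows()       // grouping rows ever added, spills included
val ratio: Double = mapRows.toDouble / inputRows
// A ratio close to 1.0 means almost no reduction, which is the case the
// skip-partial-aggregate logic targets.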