
Commit 38e784d

addressed comments v1.0

1 parent 8279d4d commit 38e784d

6 files changed: +198 −164 lines changed

sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala

Lines changed: 13 additions & 34 deletions
@@ -20,6 +20,8 @@ package org.apache.spark.sql
 import java.io.CharArrayWriter
 import java.sql.DriverManager
 
+import org.apache.spark.sql.execution.stat.FrequentItems
+
 import scala.collection.JavaConversions._
 import scala.language.implicitConversions
 import scala.reflect.ClassTag
@@ -41,7 +43,6 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, LogicalRDD}
 import org.apache.spark.sql.jdbc.JDBCWriteDetails
 import org.apache.spark.sql.json.JsonRDD
-import org.apache.spark.sql.ml.FrequentItems
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.sources.{ResolvedDataSource, CreateTableUsingAsSelect}
 import org.apache.spark.util.Utils
@@ -331,6 +332,17 @@ class DataFrame private[sql](
    */
   def na: DataFrameNaFunctions = new DataFrameNaFunctions(this)
 
+  /**
+   * Returns a [[DataFrameStatFunctions]] for working with statistic functions.
+   * {{{
+   *   // Finding frequent items in column with name 'a'.
+   *   df.stat.freqItems(Seq("a"))
+   * }}}
+   *
+   * @group dfops
+   */
+  def stat: DataFrameStatFunctions = new DataFrameStatFunctions(this)
+
   /**
    * Cartesian join with another [[DataFrame]].
    *
@@ -1415,37 +1427,4 @@ class DataFrame private[sql](
     val jrdd = rdd.map(EvaluatePython.rowToArray(_, fieldTypes)).toJavaRDD()
     SerDeUtil.javaToPython(jrdd)
   }
-
-  /////////////////////////////////////////////////////////////////////////////
-  // Statistic functions
-  /////////////////////////////////////////////////////////////////////////////
-
-  // scalastyle:off
-  object stat {
-  // scalastyle:on
-
-    /**
-     * Finding frequent items for columns, possibly with false positives. Using the algorithm
-     * described in `http://www.cs.umd.edu/~samir/498/karp.pdf`.
-     *
-     * @param cols the names of the columns to search frequent items in
-     * @param support The minimum frequency for an item to be considered `frequent`
-     * @return A Local DataFrame with the Array of frequent items for each column.
-     */
-    def freqItems(cols: Array[String], support: Double): DataFrame = {
-      FrequentItems.singlePassFreqItems(toDF(), cols, support)
-    }
-
-    /**
-     * Finding frequent items for columns, possibly with false positives. Using the algorithm
-     * described in `http://www.cs.umd.edu/~samir/498/karp.pdf`.
-     * Returns items more frequent than 1/1000'th of the time.
-     *
-     * @param cols the names of the columns to search frequent items in
-     * @return A Local DataFrame with the Array of frequent items for each column.
-     */
-    def freqItems(cols: Array[String]): DataFrame = {
-      FrequentItems.singlePassFreqItems(toDF(), cols, 0.001)
-    }
-  }
 }
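For context, a minimal usage sketch of the entry point this diff adds. The `sqlContext` value and the toy data are illustrative assumptions, not part of the commit:

import sqlContext.implicits._

// A small DataFrame in which 1 dominates the "num" column
// and "a" dominates the "letter" column.
val df = Seq((1, "a"), (1, "b"), (1, "a"), (2, "c")).toDF("num", "letter")

// Candidate items occurring in more than half of the rows of each column;
// the result may contain false positives, but misses no frequent item.
val freq = df.stat.freqItems(Seq("num", "letter"), 0.5)
freq.show()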
sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala

Lines changed: 55 additions & 0 deletions
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.execution.stat.FrequentItems
+
+/**
+ * :: Experimental ::
+ * Statistic functions for [[DataFrame]]s.
+ */
+@Experimental
+final class DataFrameStatFunctions private[sql](df: DataFrame) {
+
+  /**
+   * Finding frequent items for columns, possibly with false positives. Using the
+   * frequent element count algorithm described in
+   * [[http://dx.doi.org/10.1145/762471.762473, proposed by Karp, Schenker, and Papadimitriou]].
+   *
+   * @param cols the names of the columns to search frequent items in
+   * @param support The minimum frequency for an item to be considered `frequent`
+   * @return A Local DataFrame with the Array of frequent items for each column.
+   */
+  def freqItems(cols: Seq[String], support: Double): DataFrame = {
+    FrequentItems.singlePassFreqItems(df, cols, support)
+  }
+
+  /**
+   * Finding frequent items for columns, possibly with false positives. Using the
+   * frequent element count algorithm described in
+   * [[http://dx.doi.org/10.1145/762471.762473, proposed by Karp, Schenker, and Papadimitriou]].
+   * Returns items more frequent than 1/1000'th of the time.
+   *
+   * @param cols the names of the columns to search frequent items in
+   * @return A Local DataFrame with the Array of frequent items for each column.
+   */
+  def freqItems(cols: Seq[String]): DataFrame = {
+    FrequentItems.singlePassFreqItems(df, cols, 0.001)
+  }
+}
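Since the returned DataFrame is local (a single row with one ArrayType column per input column, named "<column>_freqItems"), results can be collected directly. A sketch, reusing the hypothetical `df` from the earlier example:

// The single-argument overload uses the default support of 0.001.
val items = df.stat.freqItems(Seq("letter"))
val row = items.collect().head
// Column 0 is "letter_freqItems": the array of candidate frequent items.
val letters = row.getAs[Seq[String]](0)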
sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala

Lines changed: 127 additions & 0 deletions
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.stat
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.{Column, DataFrame, Row}
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.types.{ArrayType, StructField, StructType}
+
+import scala.collection.mutable.{Map => MutableMap}
+
+private[sql] object FrequentItems extends Logging {
+
+  /** A helper class wrapping `MutableMap[Any, Long]` for simplicity. */
+  private class FreqItemCounter(size: Int) extends Serializable {
+    val baseMap: MutableMap[Any, Long] = MutableMap.empty[Any, Long]
+
+    /**
+     * Add a new example to the counts if it exists, otherwise deduct the count
+     * from existing items.
+     */
+    def add(key: Any, count: Long): this.type = {
+      if (baseMap.contains(key)) {
+        baseMap(key) += count
+      } else {
+        if (baseMap.size < size) {
+          baseMap += key -> count
+        } else {
+          // TODO: Make this more efficient... A flatMap?
+          baseMap.retain((k, v) => v > count)
+          baseMap.transform((k, v) => v - count)
+        }
+      }
+      this
+    }
+
+    /**
+     * Merge two maps of counts.
+     * @param other The map containing the counts for that partition
+     */
+    def merge(other: FreqItemCounter): this.type = {
+      other.toSeq.foreach { case (k, v) =>
+        add(k, v)
+      }
+      this
+    }
+
+    def toSeq: Seq[(Any, Long)] = baseMap.toSeq
+
+    def foldLeft[A, B](start: A)(f: (A, (Any, Long)) => A): A = baseMap.foldLeft(start)(f)
+
+    def freqItems: Seq[Any] = baseMap.keys.toSeq
+  }
+
+  /**
+   * Finding frequent items for columns, possibly with false positives. Using the
+   * frequent element count algorithm described in
+   * [[http://dx.doi.org/10.1145/762471.762473, proposed by Karp, Schenker, and Papadimitriou]].
+   * For internal use only.
+   *
+   * @param df The input DataFrame
+   * @param cols the names of the columns to search frequent items in
+   * @param support The minimum frequency for an item to be considered `frequent`
+   * @return A Local DataFrame with the Array of frequent items for each column.
+   */
+  private[sql] def singlePassFreqItems(
+      df: DataFrame,
+      cols: Seq[String],
+      support: Double): DataFrame = {
+    if (support < 1e-6) {
+      logWarning(s"The selected support ($support) is too small, and might cause memory problems.")
+    }
+    val numCols = cols.length
+    // number of max items to keep counts for
+    val sizeOfMap = (1 / support).toInt
+    val countMaps = Seq.tabulate(numCols)(i => new FreqItemCounter(sizeOfMap))
+    val originalSchema = df.schema
+    val colInfo = cols.map { name =>
+      val index = originalSchema.fieldIndex(name)
+      (name, originalSchema.fields(index).dataType)
+    }
+
+    val freqItems = df.select(cols.map(Column(_)) : _*).rdd.aggregate(countMaps)(
+      seqOp = (counts, row) => {
+        var i = 0
+        while (i < numCols) {
+          val thisMap = counts(i)
+          val key = row.get(i)
+          thisMap.add(key, 1L)
+          i += 1
+        }
+        counts
+      },
+      combOp = (baseCounts, counts) => {
+        var i = 0
+        while (i < numCols) {
+          baseCounts(i).merge(counts(i))
+          i += 1
+        }
+        baseCounts
+      }
+    )
+    val justItems = freqItems.map(m => m.freqItems)
+    val resultRow = Row(justItems : _*)
+    // append frequent items to the column name for easy debugging
+    val outputCols = colInfo.map { v =>
+      StructField(v._1 + "_freqItems", ArrayType(v._2, false))
+    }
+    val schema = StructType(outputCols).toAttributes
+    new DataFrame(df.sqlContext, LocalRelation(schema, Seq(resultRow)))
+  }
+}
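The add method above is the heart of the Karp, Schenker, and Papadimitriou scheme: a key either increments its own counter, takes a free slot, or decrements every existing counter. A standalone sketch of that rule (an illustrative reimplementation, not the private class itself):

import scala.collection.mutable.{Map => MutableMap}

// Keep at most `size` counters. Any item with frequency above 1/size is
// guaranteed to survive; survivors with lower frequency are false positives.
def addKey(counts: MutableMap[Any, Long], key: Any, size: Int): Unit = {
  if (counts.contains(key)) {
    counts(key) += 1L
  } else if (counts.size < size) {
    counts += key -> 1L
  } else {
    counts.retain((_, v) => v > 1L)    // drop counters that would hit zero
    counts.transform((_, v) => v - 1L) // decrement the rest
  }
}

val counts = MutableMap.empty[Any, Long]
Seq("a", "b", "a", "c", "a", "d").foreach(k => addKey(counts, k, size = 2))
// counts now holds "a" (truly frequent) and "d" (a false positive that a
// second counting pass over the data would prune).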

sql/core/src/main/scala/org/apache/spark/sql/ml/FrequentItems.scala

Lines changed: 0 additions & 124 deletions
This file was deleted.

sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java

Lines changed: 0 additions & 1 deletion
@@ -178,5 +178,4 @@ public void testCreateDataFrameFromJavaBeans() {
       Assert.assertEquals(bean.getD().get(i), d.apply(i));
     }
   }
-
 }
