Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions python/pyspark/sql/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -530,9 +530,10 @@ def lead(col, count=1, default=None):
@since(1.4)
def ntile(n):
"""
Window function: returns a group id from 1 to `n` (inclusive) in a round-robin fashion in
a window partition. Fow example, if `n` is 3, the first row will get 1, the second row will
get 2, the third row will get 3, and the fourth row will get 1...
Window function: returns the ntile group id (from 1 to `n` inclusive)
in an ordered window partition. Fow example, if `n` is 4, the first
quarter of the rows will get value 1, the second quarter will get 2,
the third quarter will get 3, and the last quarter will get 4.

This is equivalent to the NTILE function in SQL.

Expand Down
23 changes: 23 additions & 0 deletions python/pyspark/sql/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -1124,5 +1124,28 @@ def test_window_functions(self):
for r, ex in zip(rs, expected):
self.assertEqual(tuple(r), ex[:len(r)])

def test_window_functions_without_partitionBy(self):
df = self.sqlCtx.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"])
w = Window.orderBy("key", df.value)
from pyspark.sql import functions as F
sel = df.select(df.value, df.key,
F.max("key").over(w.rowsBetween(0, 1)),
F.min("key").over(w.rowsBetween(0, 1)),
F.count("key").over(w.rowsBetween(float('-inf'), float('inf'))),
F.rowNumber().over(w),
F.rank().over(w),
F.denseRank().over(w),
F.ntile(2).over(w))
rs = sorted(sel.collect())
expected = [
("1", 1, 1, 1, 4, 1, 1, 1, 1),
("2", 1, 1, 1, 4, 2, 2, 2, 1),
("2", 1, 2, 1, 4, 3, 2, 2, 2),
("2", 2, 2, 2, 4, 4, 4, 3, 2)
]
for r, ex in zip(rs, expected):
self.assertEqual(tuple(r), ex[:len(r)])


if __name__ == "__main__":
unittest.main()
2 changes: 1 addition & 1 deletion python/pyspark/sql/window.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def orderBy(*cols):
Creates a :class:`WindowSpec` with the partitioning defined.
"""
sc = SparkContext._active_spark_context
jspec = sc._jvm.org.apache.spark.sql.expressions.Window.partitionBy(_to_java_cols(cols))
jspec = sc._jvm.org.apache.spark.sql.expressions.Window.orderBy(_to_java_cols(cols))
return WindowSpec(jspec)


Expand Down