From 25d6a08feac3931a70508859a582715f39536520 Mon Sep 17 00:00:00 2001 From: zero323 Date: Wed, 3 May 2017 13:06:17 +0200 Subject: [PATCH 1/3] Add DataFrame.hint --- python/pyspark/sql/dataframe.py | 29 +++++++++++++++++++++++++++++ python/pyspark/sql/tests.py | 16 ++++++++++++++++ 2 files changed, 45 insertions(+) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index ab6d35bfa7c5..e9d34603c6dc 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -380,6 +380,35 @@ def withWatermark(self, eventTime, delayThreshold): jdf = self._jdf.withWatermark(eventTime, delayThreshold) return DataFrame(jdf, self.sql_ctx) + @since(2.3) + def hint(self, name, *parameters): + """Specifies some hint on the current DataFrame. + + :param name: A name of the hint. + :param parameters: Optional parameters. + :return: + + >>> df.join(df2.hint("broadcast"), "name").show() + +----+---+------+ + |name|age|height| + +----+---+------+ + | Bob| 5| 85| + +----+---+------+ + """ + if len(parameters) == 1 and isinstance(parameters[0], list): + parameters = parameters[0] + + if not isinstance(name, str): + raise TypeError("name should be provided as str, got {0}".format(type(name))) + + for p in parameters: + if not isinstance(p, str): + raise TypeError( + "all parameters should be str, got {0} of type {1}".format(p, type(p))) + + jdf = self._jdf.hint(name, self._jseq(parameters)) + return DataFrame(jdf, self.sql_ctx) + @since(1.3) def count(self): """Returns the number of rows in this :class:`DataFrame`. diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index ce4abf8fb7e5..f644624f7f31 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -1906,6 +1906,22 @@ def test_functions_broadcast(self): # planner should not crash without a join broadcast(df1)._jdf.queryExecution().executedPlan() + def test_generic_hints(self): + from pyspark.sql import DataFrame + + df1 = self.spark.range(10e10).toDF("id") + df2 = self.spark.range(10e10).toDF("id") + + self.assertIsInstance(df1.hint("broadcast"), DataFrame) + self.assertIsInstance(df1.hint("broadcast", []), DataFrame) + + # Dummy rules + self.assertIsInstance(df1.hint("broadcast", "foo", "bar"), DataFrame) + self.assertIsInstance(df1.hint("broadcast", ["foo", "bar"]), DataFrame) + + plan = df1.join(df2.hint("broadcast"), "id")._jdf.queryExecution().executedPlan() + self.assertEqual(1, plan.toString().count("BroadcastHashJoin")) + def test_toDF_with_schema_string(self): data = [Row(key=i, value=str(i)) for i in range(100)] rdd = self.sc.parallelize(data, 5) From fe6611d53c2ae63971228c7b3232fbeb746470c8 Mon Sep 17 00:00:00 2001 From: zero323 Date: Thu, 4 May 2017 01:43:53 +0200 Subject: [PATCH 2/3] Fix whitespace and add return type --- python/pyspark/sql/dataframe.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index e9d34603c6dc..3632df84316c 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -383,11 +383,11 @@ def withWatermark(self, eventTime, delayThreshold): @since(2.3) def hint(self, name, *parameters): """Specifies some hint on the current DataFrame. - + :param name: A name of the hint. :param parameters: Optional parameters. - :return: - + :return: :class:`DataFrame` + >>> df.join(df2.hint("broadcast"), "name").show() +----+---+------+ |name|age|height| From b2198efcadf471bf2ca2ce872aaa29ada5ab8dcc Mon Sep 17 00:00:00 2001 From: zero323 Date: Thu, 4 May 2017 01:45:26 +0200 Subject: [PATCH 3/3] Change since to 2.2 --- python/pyspark/sql/dataframe.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 3632df84316c..7b67985f2b32 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -380,7 +380,7 @@ def withWatermark(self, eventTime, delayThreshold): jdf = self._jdf.withWatermark(eventTime, delayThreshold) return DataFrame(jdf, self.sql_ctx) - @since(2.3) + @since(2.2) def hint(self, name, *parameters): """Specifies some hint on the current DataFrame.