From 9f6793664ca1e96e549254de9526921eb5094a8a Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Fri, 22 Dec 2017 17:02:58 +0900 Subject: [PATCH 1/4] Modify checking pandas version to use LooseVersion. --- python/pyspark/sql/session.py | 13 ++++++------- python/pyspark/sql/tests.py | 3 ++- python/pyspark/sql/types.py | 33 +++++++++++++-------------------- python/pyspark/sql/utils.py | 9 +++++++++ 4 files changed, 30 insertions(+), 28 deletions(-) diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index 86db16eca788..835cb044752d 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -493,16 +493,15 @@ def _create_from_pandas_with_arrow(self, pdf, schema, timezone): data types will be used to coerce the data in Pandas to Arrow conversion. """ from pyspark.serializers import ArrowSerializer, _create_batch - from pyspark.sql.types import from_arrow_schema, to_arrow_type, \ - _old_pandas_exception_message, TimestampType - from pyspark.sql.utils import _require_minimum_pyarrow_version - try: - from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype - except ImportError as e: - raise ImportError(_old_pandas_exception_message(e)) + from pyspark.sql.types import from_arrow_schema, to_arrow_type, TimestampType + from pyspark.sql.utils import _require_minimum_pandas_version, \ + _require_minimum_pyarrow_version + _require_minimum_pandas_version() _require_minimum_pyarrow_version() + from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype + # Determine arrow types to coerce data when creating batches if isinstance(schema, StructType): arrow_types = [to_arrow_type(f.dataType) for f in schema.fields] diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 6fdfda1cc831..a3f939592fd2 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -53,7 +53,8 @@ try: import pandas try: - import pandas.api + from pyspark.sql.utils import _require_minimum_pandas_version + _require_minimum_pandas_version() _have_pandas = True except: _have_old_pandas = True diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index 46d9a417414b..e7bab4a7b546 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -1678,13 +1678,6 @@ def from_arrow_schema(arrow_schema): for field in arrow_schema]) -def _old_pandas_exception_message(e): - """ Create an error message for importing old Pandas. - """ - msg = "note: Pandas (>=0.19.2) must be installed and available on calling Python process" - return "%s\n%s" % (_exception_message(e), msg) - - def _check_dataframe_localize_timestamps(pdf, timezone): """ Convert timezone aware timestamps to timezone-naive in the specified timezone or local timezone @@ -1693,10 +1686,10 @@ def _check_dataframe_localize_timestamps(pdf, timezone): :param timezone: the timezone to convert. if None then use local timezone :return pandas.DataFrame where any timezone aware columns have been converted to tz-naive """ - try: - from pandas.api.types import is_datetime64tz_dtype - except ImportError as e: - raise ImportError(_old_pandas_exception_message(e)) + from pyspark.sql.utils import _require_minimum_pandas_version + _require_minimum_pandas_version() + + from pandas.api.types import is_datetime64tz_dtype tz = timezone or 'tzlocal()' for column, series in pdf.iteritems(): # TODO: handle nested timestamps, such as ArrayType(TimestampType())? 
@@ -1714,10 +1707,10 @@ def _check_series_convert_timestamps_internal(s, timezone): :param timezone: the timezone to convert. if None then use local timezone :return pandas.Series where if it is a timestamp, has been UTC normalized without a time zone """ - try: - from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype - except ImportError as e: - raise ImportError(_old_pandas_exception_message(e)) + from pyspark.sql.utils import _require_minimum_pandas_version + _require_minimum_pandas_version() + + from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype # TODO: handle nested timestamps, such as ArrayType(TimestampType())? if is_datetime64_dtype(s.dtype): tz = timezone or 'tzlocal()' @@ -1737,11 +1730,11 @@ def _check_series_convert_timestamps_localize(s, from_timezone, to_timezone): :param to_timezone: the timezone to convert to. if None then use local timezone :return pandas.Series where if it is a timestamp, has been converted to tz-naive """ - try: - import pandas as pd - from pandas.api.types import is_datetime64tz_dtype, is_datetime64_dtype - except ImportError as e: - raise ImportError(_old_pandas_exception_message(e)) + from pyspark.sql.utils import _require_minimum_pandas_version + _require_minimum_pandas_version() + + import pandas as pd + from pandas.api.types import is_datetime64tz_dtype, is_datetime64_dtype from_tz = from_timezone or 'tzlocal()' to_tz = to_timezone or 'tzlocal()' # TODO: handle nested timestamps, such as ArrayType(TimestampType())? diff --git a/python/pyspark/sql/utils.py b/python/pyspark/sql/utils.py index cc7dabb64b3e..fecd21faaf38 100644 --- a/python/pyspark/sql/utils.py +++ b/python/pyspark/sql/utils.py @@ -112,6 +112,15 @@ def toJArray(gateway, jtype, arr): return jarr +def _require_minimum_pandas_version(): + """ Raise ImportError if minimum version of Pandas is not installed + """ + from distutils.version import LooseVersion + import pandas + if LooseVersion(pandas.__version__) < LooseVersion('0.19.2'): + raise ImportError("pandas >= 0.19.2 must be installed on calling Python process") + + def _require_minimum_pyarrow_version(): """ Raise ImportError if minimum version of pyarrow is not installed """ From 21d2d4982c37f02cb2fdbe612915f827651b80b8 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Fri, 22 Dec 2017 17:53:59 +0900 Subject: [PATCH 2/4] Fix tests. --- python/pyspark/sql/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/sql/utils.py b/python/pyspark/sql/utils.py index fecd21faaf38..deeb73b1c80d 100644 --- a/python/pyspark/sql/utils.py +++ b/python/pyspark/sql/utils.py @@ -118,7 +118,7 @@ def _require_minimum_pandas_version(): from distutils.version import LooseVersion import pandas if LooseVersion(pandas.__version__) < LooseVersion('0.19.2'): - raise ImportError("pandas >= 0.19.2 must be installed on calling Python process") + raise ImportError("Pandas >= 0.19.2 must be installed on calling Python process") def _require_minimum_pyarrow_version(): From 677f6bce3f12754848e97293d8bde12819204bb6 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Fri, 22 Dec 2017 17:58:19 +0900 Subject: [PATCH 3/4] Fix tests. 
--- python/pyspark/sql/tests.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index a3f939592fd2..138322bd2f3c 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -2601,7 +2601,7 @@ def test_to_pandas(self): @unittest.skipIf(not _have_old_pandas, "Old Pandas not installed") def test_to_pandas_old(self): with QuietTest(self.sc): - with self.assertRaisesRegexp(ImportError, 'Pandas \(.*\) must be installed'): + with self.assertRaisesRegexp(ImportError, 'Pandas >= .* must be installed'): self._to_pandas() @unittest.skipIf(not _have_pandas, "Pandas not installed") @@ -2644,7 +2644,7 @@ def test_create_dataframe_from_old_pandas(self): pdf = pd.DataFrame({"ts": [datetime(2017, 10, 31, 1, 1, 1)], "d": [pd.Timestamp.now().date()]}) with QuietTest(self.sc): - with self.assertRaisesRegexp(ImportError, 'Pandas \(.*\) must be installed'): + with self.assertRaisesRegexp(ImportError, 'Pandas >= .* must be installed'): self.spark.createDataFrame(pdf) From 7ef68870077d13296b8700bfadb9a5862765351e Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Fri, 22 Dec 2017 19:01:38 +0900 Subject: [PATCH 4/4] Remove underscore from check methods. --- python/pyspark/sql/dataframe.py | 4 ++-- python/pyspark/sql/session.py | 8 ++++---- python/pyspark/sql/tests.py | 4 ++-- python/pyspark/sql/types.py | 12 ++++++------ python/pyspark/sql/udf.py | 4 ++-- python/pyspark/sql/utils.py | 4 ++-- 6 files changed, 18 insertions(+), 18 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 440684d3edfa..95eca76fa988 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -1906,9 +1906,9 @@ def toPandas(self): if self.sql_ctx.getConf("spark.sql.execution.arrow.enabled", "false").lower() == "true": try: from pyspark.sql.types import _check_dataframe_localize_timestamps - from pyspark.sql.utils import _require_minimum_pyarrow_version + from pyspark.sql.utils import require_minimum_pyarrow_version import pyarrow - _require_minimum_pyarrow_version() + require_minimum_pyarrow_version() tables = self._collectAsArrow() if tables: table = pyarrow.concat_tables(tables) diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index 835cb044752d..6e5eec48e8ac 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -494,11 +494,11 @@ def _create_from_pandas_with_arrow(self, pdf, schema, timezone): """ from pyspark.serializers import ArrowSerializer, _create_batch from pyspark.sql.types import from_arrow_schema, to_arrow_type, TimestampType - from pyspark.sql.utils import _require_minimum_pandas_version, \ - _require_minimum_pyarrow_version + from pyspark.sql.utils import require_minimum_pandas_version, \ + require_minimum_pyarrow_version - _require_minimum_pandas_version() - _require_minimum_pyarrow_version() + require_minimum_pandas_version() + require_minimum_pyarrow_version() from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 138322bd2f3c..b977160af566 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -53,8 +53,8 @@ try: import pandas try: - from pyspark.sql.utils import _require_minimum_pandas_version - _require_minimum_pandas_version() + from pyspark.sql.utils import require_minimum_pandas_version + require_minimum_pandas_version() _have_pandas = True except: _have_old_pandas = True diff --git 
a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index e7bab4a7b546..063264a89379 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -1686,8 +1686,8 @@ def _check_dataframe_localize_timestamps(pdf, timezone): :param timezone: the timezone to convert. if None then use local timezone :return pandas.DataFrame where any timezone aware columns have been converted to tz-naive """ - from pyspark.sql.utils import _require_minimum_pandas_version - _require_minimum_pandas_version() + from pyspark.sql.utils import require_minimum_pandas_version + require_minimum_pandas_version() from pandas.api.types import is_datetime64tz_dtype tz = timezone or 'tzlocal()' @@ -1707,8 +1707,8 @@ def _check_series_convert_timestamps_internal(s, timezone): :param timezone: the timezone to convert. if None then use local timezone :return pandas.Series where if it is a timestamp, has been UTC normalized without a time zone """ - from pyspark.sql.utils import _require_minimum_pandas_version - _require_minimum_pandas_version() + from pyspark.sql.utils import require_minimum_pandas_version + require_minimum_pandas_version() from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype # TODO: handle nested timestamps, such as ArrayType(TimestampType())? @@ -1730,8 +1730,8 @@ def _check_series_convert_timestamps_localize(s, from_timezone, to_timezone): :param to_timezone: the timezone to convert to. if None then use local timezone :return pandas.Series where if it is a timestamp, has been converted to tz-naive """ - from pyspark.sql.utils import _require_minimum_pandas_version - _require_minimum_pandas_version() + from pyspark.sql.utils import require_minimum_pandas_version + require_minimum_pandas_version() import pandas as pd from pandas.api.types import is_datetime64tz_dtype, is_datetime64_dtype diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py index 50c87ba1ac88..123138117fdc 100644 --- a/python/pyspark/sql/udf.py +++ b/python/pyspark/sql/udf.py @@ -37,9 +37,9 @@ def _create_udf(f, returnType, evalType): if evalType == PythonEvalType.SQL_PANDAS_SCALAR_UDF or \ evalType == PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF: import inspect - from pyspark.sql.utils import _require_minimum_pyarrow_version + from pyspark.sql.utils import require_minimum_pyarrow_version - _require_minimum_pyarrow_version() + require_minimum_pyarrow_version() argspec = inspect.getargspec(f) if evalType == PythonEvalType.SQL_PANDAS_SCALAR_UDF and len(argspec.args) == 0 and \ diff --git a/python/pyspark/sql/utils.py b/python/pyspark/sql/utils.py index deeb73b1c80d..fb7d42a35d8f 100644 --- a/python/pyspark/sql/utils.py +++ b/python/pyspark/sql/utils.py @@ -112,7 +112,7 @@ def toJArray(gateway, jtype, arr): return jarr -def _require_minimum_pandas_version(): +def require_minimum_pandas_version(): """ Raise ImportError if minimum version of Pandas is not installed """ from distutils.version import LooseVersion @@ -121,7 +121,7 @@ def _require_minimum_pandas_version(): raise ImportError("Pandas >= 0.19.2 must be installed on calling Python process") -def _require_minimum_pyarrow_version(): +def require_minimum_pyarrow_version(): """ Raise ImportError if minimum version of pyarrow is not installed """ from distutils.version import LooseVersion
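
A minimal, self-contained sketch of the version gate this series converges on. The
helper name, threshold, and error message mirror require_minimum_pandas_version()
as it stands in python/pyspark/sql/utils.py after PATCH 4/4; the assertions and the
__main__ demo are illustrative additions, not part of Spark. They show why an
explicit LooseVersion comparison replaces the old "import pandas.api" probe: a
module probe cannot distinguish versions at all, and comparing raw version strings
orders them lexicographically.

    from distutils.version import LooseVersion


    def require_minimum_pandas_version():
        """ Raise ImportError if minimum version of Pandas is not installed
        """
        import pandas
        if LooseVersion(pandas.__version__) < LooseVersion('0.19.2'):
            raise ImportError("Pandas >= 0.19.2 must be installed on calling Python process")


    # Lexicographic comparison of the raw strings gets multi-digit components
    # wrong; LooseVersion splits on the dots and compares numerically.
    assert '0.9.0' > '0.19.2'                              # string order: wrong answer
    assert LooseVersion('0.9.0') < LooseVersion('0.19.2')  # numeric order: correct

    if __name__ == '__main__':
        try:
            require_minimum_pandas_version()
            print('Pandas is new enough for the Arrow code paths')
        except ImportError as e:
            print('Skipping Pandas-dependent path: %s' % e)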
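
The test gating in python/pyspark/sql/tests.py builds on the same helper: probe once
at import time, record the outcome in module-level flags, and let skipIf decorators
and assertRaisesRegexp checks do the rest. A condensed sketch of that pattern is
below; the test class is hypothetical, and the upstream code uses bare except:
clauses where ImportError is written out here.

    import unittest

    _have_pandas = False
    _have_old_pandas = False
    try:
        import pandas
        try:
            from pyspark.sql.utils import require_minimum_pandas_version
            require_minimum_pandas_version()   # raises ImportError if Pandas < 0.19.2
            _have_pandas = True
        except ImportError:
            _have_old_pandas = True            # Pandas is present but too old
    except ImportError:
        pass                                   # no Pandas at all; dependent tests skip


    class OldPandasTests(unittest.TestCase):   # hypothetical class for illustration

        @unittest.skipIf(not _have_old_pandas, "Old Pandas not installed")
        def test_minimum_version_message(self):
            # The message originates in require_minimum_pandas_version, which is
            # why PATCH 3/4 relaxes the expected pattern to
            # 'Pandas >= .* must be installed'.
            with self.assertRaisesRegexp(ImportError, 'Pandas >= .* must be installed'):
                require_minimum_pandas_version()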