23 changes: 21 additions & 2 deletions python/pyspark/pandas/data_type_ops/num_ops.py
@@ -55,7 +55,7 @@ def _non_fractional_astype(
     elif isinstance(spark_type, BooleanType):
         return _as_bool_type(index_ops, dtype)
     elif isinstance(spark_type, StringType):
-        return _as_string_type(index_ops, dtype, null_str=str(np.nan))
+        return _as_string_type(index_ops, dtype, null_str="NaN")
     else:
         return _as_other_type(index_ops, dtype, spark_type)
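
Note (an illustration, not part of the diff): the literal presumably changes because str(np.nan) is the lowercase "nan", while a missing Decimal prints as "NaN", matching how pandas displays missing values:

    import decimal
    import numpy as np

    str(np.nan)                  # 'nan' -- the old null_str
    str(decimal.Decimal("nan"))  # 'NaN' -- the new literal, as a missing Decimal prints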

@@ -447,10 +447,29 @@ def nan_to_null(self, index_ops: IndexOpsLike) -> IndexOpsLike:
         return index_ops.copy()
 
     def astype(self, index_ops: IndexOpsLike, dtype: Union[str, type, Dtype]) -> IndexOpsLike:
-        # TODO(SPARK-36230): check index_ops.hasnans after fixing SPARK-36230
         dtype, spark_type = pandas_on_spark_type(dtype)
+        if is_integer_dtype(dtype) and not isinstance(dtype, extension_dtypes):
+            if index_ops.hasnans:
+                raise ValueError(
+                    "Cannot convert %s with missing values to integer" % self.pretty_name
+                )
         return _non_fractional_astype(index_ops, dtype, spark_type)
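
Note: this guard mirrors plain pandas, which also refuses to cast a float column containing NaN to an integer dtype. A minimal sketch of the intended behavior (pandas' own exception text varies by version):

    import numpy as np
    import pandas as pd
    import pyspark.pandas as ps

    pd.Series([1.0, np.nan]).astype(int)  # pandas raises ValueError
    ps.Series([1.0, np.nan]).astype(int)  # now raises ValueError:
    # "Cannot convert fractions with missing values to integer"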

+    def rpow(self, left: IndexOpsLike, right: Any) -> SeriesOrIndex:
+        _sanitize_list_like(right)
+        if not isinstance(right, numbers.Number):
+            raise TypeError("Exponentiation can not be applied to given types.")
+
+        def rpow_func(left: Column, right: Any) -> Column:
+            return (
+                F.when(left.isNull(), np.nan)
+                .when(SF.lit(right == 1), right)
+                .otherwise(Column.__rpow__(left, right))
+            )
+
+        right = transform_boolean_operand_to_numeric(right)
+        return column_op(rpow_func)(left, right)
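
Note: the .when(SF.lit(right == 1), right) branch presumably encodes a pandas corner case: 1 ** NaN is 1, so a missing value must not propagate when the base is 1, while the isNull() branch maps null to NaN for every other base. A pandas-only sketch of the semantics being matched:

    import numpy as np
    import pandas as pd

    pser = pd.Series([1.0, 2.0, np.nan])
    print((1 ** pser).tolist())  # [1.0, 1.0, 1.0] -- 1 ** NaN == 1.0, the special-cased branch
    print((2 ** pser).tolist())  # [2.0, 4.0, nan] -- any other base propagates the missing value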


 class IntegralExtensionOps(IntegralOps):
     """
56 changes: 21 additions & 35 deletions python/pyspark/pandas/tests/data_type_ops/test_num_ops.py
@@ -172,30 +172,20 @@ def test_pow(self):
         pdf, psdf = self.pdf, self.psdf
         for col in self.numeric_df_cols:
             pser, psser = pdf[col], psdf[col]
-            if col == "float":
+            if col in ["float", "float_w_nan"]:
                 self.assert_eq(pser ** pser, psser ** psser)
                 self.assert_eq(pser ** pser.astype(bool), psser ** psser.astype(bool))
                 self.assert_eq(pser ** True, psser ** True)
                 self.assert_eq(pser ** False, psser ** False)
                 self.assert_eq(pser ** 1, psser ** 1)
                 self.assert_eq(pser ** 0, psser ** 0)
 
             for n_col in self.non_numeric_df_cols:
                 if n_col == "bool":
                     self.assert_eq(pdf["float"] ** pdf[n_col], psdf["float"] ** psdf[n_col])
                 else:
                     self.assertRaises(TypeError, lambda: psser ** psdf[n_col])
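
Note: the nan column can reuse exactly the same assertions because NumPy and pandas follow the IEEE 754 pow edge cases: any base raised to 0 is 1, and a base of 1 raised to anything is 1, even when NaN is involved:

    import numpy as np

    np.nan ** 0  # 1.0
    1 ** np.nan  # 1.0
    np.nan ** 1  # nan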

-    # TODO(SPARK-36031): Merge test_pow_with_nan into test_pow
-    def test_pow_with_float_nan(self):
-        for col in self.numeric_w_nan_df_cols:
-            if col == "float_w_nan":
-                pser, psser = self.numeric_w_nan_pdf[col], self.numeric_w_nan_psdf[col]
-                self.assert_eq(pser ** pser, psser ** psser)
-                self.assert_eq(pser ** pser.astype(bool), psser ** psser.astype(bool))
-                self.assert_eq(pser ** True, psser ** True)
-                self.assert_eq(pser ** False, psser ** False)
-                self.assert_eq(pser ** 1, psser ** 1)
-                self.assert_eq(pser ** 0, psser ** 0)
-
     def test_radd(self):
         pdf, psdf = self.pdf, self.psdf
         for col in self.numeric_df_cols:
@@ -344,40 +334,36 @@ def test_from_to_pandas(self):
         self.assert_eq(ps.from_pandas(pser), psser)
 
     def test_isnull(self):
-        pdf, psdf = self.numeric_w_nan_pdf, self.numeric_w_nan_psdf
-        for col in self.numeric_w_nan_df_cols:
+        pdf, psdf = self.pdf, self.psdf
+        for col in self.numeric_df_cols:
             self.assert_eq(pdf[col].isnull(), psdf[col].isnull())
 
     def test_astype(self):
         pdf, psdf = self.pdf, self.psdf
         for col in self.numeric_df_cols:
             pser, psser = pdf[col], psdf[col]
-            self.assert_eq(pser.astype(int), psser.astype(int))
+
+            for int_type in [int, np.int32, np.int16, np.int8]:
+                if not pser.hasnans:
+                    self.assert_eq(pser.astype(int_type), psser.astype(int_type))
+                else:
+                    self.assertRaisesRegex(
+                        ValueError,
+                        "Cannot convert %s with missing "
+                        "values to integer" % psser._dtype_op.pretty_name,
+                        lambda: psser.astype(int_type),
+                    )
+
+            # TODO(SPARK-37039): the np.nan series.astype(bool) should be True
+            if not pser.hasnans:
+                self.assert_eq(pser.astype(bool), psser.astype(bool))
+
             self.assert_eq(pser.astype(float), psser.astype(float))
             self.assert_eq(pser.astype(np.float32), psser.astype(np.float32))
-            self.assert_eq(pser.astype(np.int32), psser.astype(np.int32))
-            self.assert_eq(pser.astype(np.int16), psser.astype(np.int16))
-            self.assert_eq(pser.astype(np.int8), psser.astype(np.int8))
             self.assert_eq(pser.astype(str), psser.astype(str))
-            self.assert_eq(pser.astype(bool), psser.astype(bool))
             self.assert_eq(pser.astype("category"), psser.astype("category"))
             cat_type = CategoricalDtype(categories=[2, 1, 3])
             self.assert_eq(pser.astype(cat_type), psser.astype(cat_type))
-        self.assertRaisesRegex(
-            ValueError,
-            "Cannot convert fractions with missing values to integer",
-            lambda: self.float_withnan_psser.astype(int),
-        )
-        self.assertRaisesRegex(
-            ValueError,
-            "Cannot convert fractions with missing values to integer",
-            lambda: self.float_withnan_psser.astype(np.int32),
-        )
-        self.assert_eq(self.float_withnan_psser.astype(str), self.float_withnan_psser.astype(str))
-        self.assert_eq(self.float_withnan_psser.astype(bool), self.float_withnan_psser.astype(bool))
-        self.assert_eq(
-            self.float_withnan_psser.astype("category"), self.float_withnan_psser.astype("category")
-        )
         if extension_object_dtypes_available and extension_float_dtypes_available:
             pser = pd.Series(pd.Categorical([1.0, 2.0, 3.0]), dtype=pd.Float64Dtype())
             psser = ps.from_pandas(pser)
57 changes: 14 additions & 43 deletions python/pyspark/pandas/tests/data_type_ops/testing_utils.py
@@ -49,8 +49,21 @@ def numeric_pdf(self):
         dtypes = [np.int32, int, np.float32, float]
         sers = [pd.Series([1, 2, 3], dtype=dtype) for dtype in dtypes]
         sers.append(pd.Series([decimal.Decimal(1), decimal.Decimal(2), decimal.Decimal(3)]))
+        sers.append(pd.Series([1, 2, np.nan], dtype=float))
+        # Skip the decimal_nan test before v1.3.0; it is not supported by pandas-on-Spark yet.
+        if LooseVersion(pd.__version__) >= LooseVersion("1.3.0"):

Member: Just to clarify, old versions of pandas fail the same with and without this PR, right? Supporting it only with 1.3.0 is okay, as it's not a regression at least.

Member (author) @Yikun, Nov 23, 2021:

  • pandas-on-Spark with old versions of pandas fails the same with or without this PR (it's very weird; see https://gist.github.com/Yikun/6b88920652fc535b336a03746fe3b04f). I added the note: "# Skip the decimal_nan test before v1.3.0; it is not supported by pandas-on-Spark yet."
  • pandas-on-Spark with pandas v1.3.0+ passes with this PR.
  • Old versions of native pandas do support decimal.

This PR only enables the test case for pandas-on-Spark with pandas v1.3.0+.

Member: Thanks, the changes look pretty good to me. Thanks for redoing this, @Yikun.

+            sers.append(
+                pd.Series([decimal.Decimal(1), decimal.Decimal(2), decimal.Decimal(np.nan)])
+            )
         pdf = pd.concat(sers, axis=1)
-        pdf.columns = [dtype.__name__ for dtype in dtypes] + ["decimal"]
+        if LooseVersion(pd.__version__) >= LooseVersion("1.3.0"):
+            pdf.columns = [dtype.__name__ for dtype in dtypes] + [
+                "decimal",
+                "float_nan",
+                "decimal_nan",
+            ]
+        else:
+            pdf.columns = [dtype.__name__ for dtype in dtypes] + ["decimal", "float_nan"]
         return pdf
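
Note: derived from the property above, the fixture's columns end up as:

    # pandas >= 1.3.0
    ['int32', 'int', 'float32', 'float', 'decimal', 'float_nan', 'decimal_nan']
    # older pandas (decimal_nan is skipped)
    ['int32', 'int', 'float32', 'float', 'decimal', 'float_nan']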

     @property
@@ -69,25 +82,6 @@ def integral_pdf(self):
     def integral_psdf(self):
         return ps.from_pandas(self.integral_pdf)
 
-    # TODO(SPARK-36031): Merge self.numeric_w_nan_p(s)df into self.numeric_p(s)df
-    @property
-    def numeric_w_nan_pdf(self):
-        psers = {
-            "float_w_nan": pd.Series([1, 2, np.nan]),
-            "decimal_w_nan": pd.Series(
-                [decimal.Decimal(1), decimal.Decimal(2), decimal.Decimal(np.nan)]
-            ),
-        }
-        return pd.concat(psers, axis=1)
-
-    @property
-    def numeric_w_nan_psdf(self):
-        return ps.from_pandas(self.numeric_w_nan_pdf)
-
-    @property
-    def numeric_w_nan_df_cols(self):
-        return self.numeric_w_nan_pdf.columns
-
     @property
     def non_numeric_pdf(self):
         psers = {
@@ -132,33 +126,10 @@ def numeric_psers(self):
     def numeric_pssers(self):
         return [ps.from_pandas(pser) for pser in self.numeric_psers]
 
-    @property
-    def decimal_withnan_pser(self):
-        return pd.Series([decimal.Decimal(1.0), decimal.Decimal(2.0), decimal.Decimal(np.nan)])
-
-    @property
-    def decimal_withnan_psser(self):
-        return ps.from_pandas(self.decimal_withnan_pser)
-
-    @property
-    def float_withnan_pser(self):
-        return pd.Series([1, 2, np.nan])
-
-    @property
-    def float_withnan_psser(self):
-        return ps.from_pandas(self.float_withnan_pser)
-
     @property
     def numeric_pser_psser_pairs(self):
         return zip(self.numeric_psers, self.numeric_pssers)
 
-    @property
-    def numeric_withnan_pser_psser_pairs(self):
-        return zip(
-            self.numeric_psers + [self.decimal_withnan_pser, self.float_withnan_pser],
-            self.numeric_pssers + [self.decimal_withnan_psser, self.float_withnan_psser],
-        )
-
     @property
     def non_numeric_psers(self):
         psers = {