pylintrc (2 changes: 1 addition & 1 deletion)
@@ -84,7 +84,7 @@ enable=
# If you would like to improve the code quality of pyspark, remove any of these disabled errors
# run ./dev/lint-python and see if the errors raised by pylint can be fixed.

disable=invalid-name,missing-docstring,protected-access,unused-argument,no-member,unused-wildcard-import,redefined-builtin,too-many-arguments,unused-variable,too-few-public-methods,bad-continuation,duplicate-code,redefined-outer-name,too-many-ancestors,import-error,superfluous-parens,unused-import,line-too-long,no-name-in-module,unnecessary-lambda,import-self,no-self-use,unidiomatic-typecheck,fixme,too-many-locals,cyclic-import,too-many-branches,bare-except,wildcard-import,dangerous-default-value,broad-except,too-many-public-methods,deprecated-lambda,anomalous-backslash-in-string,too-many-lines,reimported,too-many-statements,bad-whitespace,unpacking-non-sequence,too-many-instance-attributes,abstract-method,old-style-class,global-statement,attribute-defined-outside-init,arguments-differ,undefined-all-variable,no-init,useless-else-on-loop,super-init-not-called,notimplemented-raised,too-many-return-statements,pointless-string-statement,global-variable-undefined,bad-classmethod-argument,too-many-format-args,parse-error,no-self-argument,pointless-statement,undefined-variable
disable=invalid-name,missing-docstring,protected-access,unused-argument,no-member,unused-wildcard-import,redefined-builtin,too-many-arguments,unused-variable,too-few-public-methods,bad-continuation,duplicate-code,redefined-outer-name,too-many-ancestors,import-error,superfluous-parens,unused-import,line-too-long,no-name-in-module,unnecessary-lambda,import-self,no-self-use,unidiomatic-typecheck,fixme,too-many-locals,cyclic-import,too-many-branches,bare-except,wildcard-import,dangerous-default-value,broad-except,too-many-public-methods,deprecated-lambda,anomalous-backslash-in-string,too-many-lines,reimported,too-many-statements,bad-whitespace,unpacking-non-sequence,too-many-instance-attributes,abstract-method,old-style-class,global-statement,attribute-defined-outside-init,arguments-differ,undefined-all-variable,no-init,useless-else-on-loop,super-init-not-called,notimplemented-raised,too-many-return-statements,pointless-string-statement,global-variable-undefined,bad-classmethod-argument,too-many-format-args,parse-error,no-self-argument,pointless-statement,undefined-variable,undefined-loop-variable


[REPORTS]
python/pyspark/cloudpickle.py (38 changes: 37 additions & 1 deletion)
@@ -350,7 +350,26 @@ def save_global(self, obj, name=None, pack=struct.pack):
if new_override:
d['__new__'] = obj.__new__

self.save_reduce(typ, (obj.__name__, obj.__bases__, d), obj=obj)
self.save(_load_class)
self.save_reduce(typ, (obj.__name__, obj.__bases__, {"__doc__": obj.__doc__}), obj=obj)
d.pop('__doc__', None)
# handle property and staticmethod
dd = {}
for k, v in d.items():
if isinstance(v, property):
k = ('property', k)
v = (v.fget, v.fset, v.fdel, v.__doc__)
elif isinstance(v, staticmethod) and hasattr(v, '__func__'):
Contributor:
Looks like PyLint is complaining about a possibly undefined loop variable v at this line. If this isn't a legitimate error, then we can just add a comment to bypass that warning here.

Contributor Author:
@JoshRosen I think it's annoying to let PyLint report warnings as errors; should we only fail on real errors?

Contributor:
Good point; let's see if we can update the configuration to do that.

Contributor Author:
I tried passing -E to pylint; it seems even worse, with lots of false-negative errors.
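For reference, a minimal sketch of the per-line bypass discussed in this thread (the function and variable names here are illustrative, not the actual cloudpickle code):

    def last_item(items):
        for v in items:
            pass
        # pylint warns that v may be unbound when the loop body never runs;
        # an inline comment silences only this occurrence:
        return v  # pylint: disable=undefined-loop-variable

The pylintrc change at the top of this diff takes the project-wide route instead, appending undefined-loop-variable to the global disable list.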

k = ('staticmethod', k)
v = v.__func__
elif isinstance(v, classmethod) and hasattr(v, '__func__'):
k = ('classmethod', k)
v = v.__func__
dd[k] = v
self.save(dd)
self.write(pickle.TUPLE2)
self.write(pickle.REDUCE)

else:
raise pickle.PicklingError("Can't pickle %r" % obj)

@@ -708,6 +727,23 @@ def _make_skel_func(code, closures, base_globals = None):
None, None, closure)


def _load_class(cls, d):
Contributor:
missing doc

"""
Loads additional properties into class `cls`.
"""
for k, v in d.items():
if isinstance(k, tuple):
typ, k = k
if typ == 'property':
v = property(*v)
elif typ == 'staticmethod':
v = staticmethod(v)
elif typ == 'classmethod':
v = classmethod(v)
setattr(cls, k, v)
return cls
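As a rough illustration of the two-step reconstruction that save_global now emits for classes (a standalone sketch that assumes the _load_class above; the class name and attributes are made up):

    # step 1: the reduce call rebuilds a bare class carrying only its docstring
    cls = type('Point', (object,), {'__doc__': 'example point'})

    # step 2: _load_class re-attaches the remaining members from the tagged dict
    cls = _load_class(cls, {
        ('property', 'x'): (lambda self: 1.0, None, None, 'x coordinate'),
        ('staticmethod', 'origin'): lambda: (0.0, 0.0),
        'scale': 2.0,  # plain attributes keep a string key and are set as-is
    })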


"""Constructors for 3rd party libraries
Note: These can never be renamed due to client compatibility issues"""

python/pyspark/shuffle.py (2 changes: 1 addition & 1 deletion)
@@ -606,7 +606,7 @@ def _open_file(self):
if not os.path.exists(d):
os.makedirs(d)
p = os.path.join(d, str(id(self)))
self._file = open(p, "wb+", 65536)
self._file = open(p, "w+b", 65536)
self._ser = BatchedSerializer(CompressedSerializer(PickleSerializer()), 1024)
os.unlink(p)

python/pyspark/sql/context.py (108 changes: 67 additions & 41 deletions)
@@ -277,6 +277,66 @@ def applySchema(self, rdd, schema):

return self.createDataFrame(rdd, schema)

def _createFromRDD(self, rdd, schema, samplingRatio):
Contributor:
missing doc, especially the return values

"""
Create an RDD for DataFrame from an existing RDD, returning the RDD and schema.
"""
if schema is None or isinstance(schema, (list, tuple)):
struct = self._inferSchema(rdd, samplingRatio)
converter = _create_converter(struct)
rdd = rdd.map(converter)
if isinstance(schema, (list, tuple)):
for i, name in enumerate(schema):
struct.fields[i].name = name
struct.names[i] = name
schema = struct

elif isinstance(schema, StructType):
# take the first few rows to verify schema
rows = rdd.take(10)
for row in rows:
_verify_type(row, schema)

else:
raise TypeError("schema should be StructType or list or None, but got: %s" % schema)

# convert python objects to sql data
rdd = rdd.map(schema.toInternal)
return rdd, schema
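A hedged usage sketch of the RDD path above (assuming an existing SparkContext sc and SQLContext sqlCtx; the data is illustrative):

    rdd = sc.parallelize([(1, 'a'), (2, 'b')])
    # with a list of names as the schema, _createFromRDD samples the RDD to
    # infer the field types and then renames the inferred struct fields
    df = sqlCtx.createDataFrame(rdd, ['id', 'label'])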

def _createFromLocal(self, data, schema):
Contributor:
missing doc

"""
Create an RDD for DataFrame from a list or pandas.DataFrame, returning
the RDD and schema.
"""
if has_pandas and isinstance(data, pandas.DataFrame):
if schema is None:
schema = [str(x) for x in data.columns]
data = [r.tolist() for r in data.to_records(index=False)]

# make sure data can be consumed multiple times
if not isinstance(data, list):
data = list(data)

if schema is None or isinstance(schema, (list, tuple)):
struct = self._inferSchemaFromList(data)
if isinstance(schema, (list, tuple)):
for i, name in enumerate(schema):
struct.fields[i].name = name
struct.names[i] = name
schema = struct

elif isinstance(schema, StructType):
for row in data:
_verify_type(row, schema)

else:
raise TypeError("schema should be StructType or list or None, but got: %s" % schema)

# convert python objects to sql data
data = [schema.toInternal(row) for row in data]
return self._sc.parallelize(data), schema
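A similar sketch for the local path (assuming sqlCtx as above and that pandas is installed; the frame contents are made up):

    import pandas
    pdf = pandas.DataFrame({'id': [1, 2], 'label': ['a', 'b']})
    # with schema=None the column names come from the pandas frame and the
    # field types are inferred from the rows via _inferSchemaFromList
    df = sqlCtx.createDataFrame(pdf)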

@since(1.3)
@ignore_unicode_prefix
def createDataFrame(self, data, schema=None, samplingRatio=None):
@@ -340,49 +400,15 @@ def createDataFrame(self, data, schema=None, samplingRatio=None):
if isinstance(data, DataFrame):
raise TypeError("data is already a DataFrame")

if has_pandas and isinstance(data, pandas.DataFrame):
if schema is None:
schema = [str(x) for x in data.columns]
data = [r.tolist() for r in data.to_records(index=False)]

if not isinstance(data, RDD):
if not isinstance(data, list):
data = list(data)
try:
# data could be list, tuple, generator ...
rdd = self._sc.parallelize(data)
except Exception:
raise TypeError("cannot create an RDD from type: %s" % type(data))
if isinstance(data, RDD):
rdd, schema = self._createFromRDD(data, schema, samplingRatio)
else:
rdd = data

if schema is None or isinstance(schema, (list, tuple)):
if isinstance(data, RDD):
struct = self._inferSchema(rdd, samplingRatio)
else:
struct = self._inferSchemaFromList(data)
if isinstance(schema, (list, tuple)):
for i, name in enumerate(schema):
struct.fields[i].name = name
schema = struct
converter = _create_converter(schema)
rdd = rdd.map(converter)

elif isinstance(schema, StructType):
# take the first few rows to verify schema
rows = rdd.take(10)
for row in rows:
_verify_type(row, schema)

else:
raise TypeError("schema should be StructType or list or None")

# convert python objects to sql data
rdd = rdd.map(schema.toInternal)

rdd, schema = self._createFromLocal(data, schema)
jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
df = self._ssql_ctx.applySchemaToPythonRDD(jrdd.rdd(), schema.json())
return DataFrame(df, self)
jdf = self._ssql_ctx.applySchemaToPythonRDD(jrdd.rdd(), schema.json())
df = DataFrame(jdf, self)
df._schema = schema
return df
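A brief sketch of the refactored dispatch, assuming sc and sqlCtx as before (values illustrative):

    df1 = sqlCtx.createDataFrame(sc.parallelize([(1, 'a')]), ['id', 'label'])  # RDD branch
    df2 = sqlCtx.createDataFrame([(1, 'a')], ['id', 'label'])                  # local branch
    # both branches now also cache the Python-side schema on the result (df._schema)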

@since(1.3)
def registerDataFrameAsTable(self, df, tableName):
python/pyspark/sql/tests.py (112 changes: 103 additions & 9 deletions)
@@ -75,7 +75,7 @@ def sqlType(self):

@classmethod
def module(cls):
return 'pyspark.tests'
return 'pyspark.sql.tests'

@classmethod
def scalaUDT(cls):
@@ -106,10 +106,45 @@ def __str__(self):
return "(%s,%s)" % (self.x, self.y)

def __eq__(self, other):
return isinstance(other, ExamplePoint) and \
return isinstance(other, self.__class__) and \
other.x == self.x and other.y == self.y


class PythonOnlyUDT(UserDefinedType):
"""
User-defined type (UDT) for PythonOnlyPoint.
"""

@classmethod
def sqlType(self):
return ArrayType(DoubleType(), False)

@classmethod
def module(cls):
return '__main__'

def serialize(self, obj):
return [obj.x, obj.y]

def deserialize(self, datum):
return PythonOnlyPoint(datum[0], datum[1])

@staticmethod
def foo():
pass

@property
def props(self):
return {}


class PythonOnlyPoint(ExamplePoint):
"""
An example class to demonstrate a UDT implemented only in Python
"""
__UDT__ = PythonOnlyUDT()


class DataTypeTests(unittest.TestCase):
# regression test for SPARK-6055
def test_data_type_eq(self):
@@ -395,47 +430,106 @@ def test_convert_row_to_dict(self):
self.assertEqual(1, row.asDict()["l"][0].a)
self.assertEqual(1.0, row.asDict()['d']['key'].c)

def test_udt(self):
from pyspark.sql.types import _parse_datatype_json_string, _infer_type, _verify_type
from pyspark.sql.tests import ExamplePointUDT, ExamplePoint

def check_datatype(datatype):
pickled = pickle.loads(pickle.dumps(datatype))
assert datatype == pickled
scala_datatype = self.sqlCtx._ssql_ctx.parseDataType(datatype.json())
python_datatype = _parse_datatype_json_string(scala_datatype.json())
assert datatype == python_datatype

check_datatype(ExamplePointUDT())
structtype_with_udt = StructType([StructField("label", DoubleType(), False),
StructField("point", ExamplePointUDT(), False)])
check_datatype(structtype_with_udt)
p = ExamplePoint(1.0, 2.0)
self.assertEqual(_infer_type(p), ExamplePointUDT())
_verify_type(ExamplePoint(1.0, 2.0), ExamplePointUDT())
self.assertRaises(ValueError, lambda: _verify_type([1.0, 2.0], ExamplePointUDT()))

check_datatype(PythonOnlyUDT())
structtype_with_udt = StructType([StructField("label", DoubleType(), False),
StructField("point", PythonOnlyUDT(), False)])
check_datatype(structtype_with_udt)
p = PythonOnlyPoint(1.0, 2.0)
self.assertEqual(_infer_type(p), PythonOnlyUDT())
_verify_type(PythonOnlyPoint(1.0, 2.0), PythonOnlyUDT())
self.assertRaises(ValueError, lambda: _verify_type([1.0, 2.0], PythonOnlyUDT()))

def test_infer_schema_with_udt(self):
from pyspark.sql.tests import ExamplePoint, ExamplePointUDT
row = Row(label=1.0, point=ExamplePoint(1.0, 2.0))
df = self.sc.parallelize([row]).toDF()
df = self.sqlCtx.createDataFrame([row])
schema = df.schema
field = [f for f in schema.fields if f.name == "point"][0]
self.assertEqual(type(field.dataType), ExamplePointUDT)
df.registerTempTable("labeled_point")
point = self.sqlCtx.sql("SELECT point FROM labeled_point").head().point
self.assertEqual(point, ExamplePoint(1.0, 2.0))

row = Row(label=1.0, point=PythonOnlyPoint(1.0, 2.0))
df = self.sqlCtx.createDataFrame([row])
schema = df.schema
field = [f for f in schema.fields if f.name == "point"][0]
self.assertEqual(type(field.dataType), PythonOnlyUDT)
df.registerTempTable("labeled_point")
point = self.sqlCtx.sql("SELECT point FROM labeled_point").head().point
self.assertEqual(point, PythonOnlyPoint(1.0, 2.0))

def test_apply_schema_with_udt(self):
from pyspark.sql.tests import ExamplePoint, ExamplePointUDT
row = (1.0, ExamplePoint(1.0, 2.0))
rdd = self.sc.parallelize([row])
schema = StructType([StructField("label", DoubleType(), False),
StructField("point", ExamplePointUDT(), False)])
df = rdd.toDF(schema)
df = self.sqlCtx.createDataFrame([row], schema)
point = df.head().point
self.assertEquals(point, ExamplePoint(1.0, 2.0))

row = (1.0, PythonOnlyPoint(1.0, 2.0))
schema = StructType([StructField("label", DoubleType(), False),
StructField("point", PythonOnlyUDT(), False)])
df = self.sqlCtx.createDataFrame([row], schema)
point = df.head().point
self.assertEquals(point, PythonOnlyPoint(1.0, 2.0))

def test_udf_with_udt(self):
from pyspark.sql.tests import ExamplePoint, ExamplePointUDT
row = Row(label=1.0, point=ExamplePoint(1.0, 2.0))
df = self.sc.parallelize([row]).toDF()
df = self.sqlCtx.createDataFrame([row])
self.assertEqual(1.0, df.map(lambda r: r.point.x).first())
udf = UserDefinedFunction(lambda p: p.y, DoubleType())
self.assertEqual(2.0, df.select(udf(df.point)).first()[0])
udf2 = UserDefinedFunction(lambda p: ExamplePoint(p.x + 1, p.y + 1), ExamplePointUDT())
self.assertEqual(ExamplePoint(2.0, 3.0), df.select(udf2(df.point)).first()[0])

row = Row(label=1.0, point=PythonOnlyPoint(1.0, 2.0))
df = self.sqlCtx.createDataFrame([row])
self.assertEqual(1.0, df.map(lambda r: r.point.x).first())
udf = UserDefinedFunction(lambda p: p.y, DoubleType())
self.assertEqual(2.0, df.select(udf(df.point)).first()[0])
udf2 = UserDefinedFunction(lambda p: PythonOnlyPoint(p.x + 1, p.y + 1), PythonOnlyUDT())
self.assertEqual(PythonOnlyPoint(2.0, 3.0), df.select(udf2(df.point)).first()[0])

def test_parquet_with_udt(self):
from pyspark.sql.tests import ExamplePoint
from pyspark.sql.tests import ExamplePoint, ExamplePointUDT
row = Row(label=1.0, point=ExamplePoint(1.0, 2.0))
df0 = self.sc.parallelize([row]).toDF()
df0 = self.sqlCtx.createDataFrame([row])
output_dir = os.path.join(self.tempdir.name, "labeled_point")
df0.saveAsParquetFile(output_dir)
df0.write.parquet(output_dir)
df1 = self.sqlCtx.parquetFile(output_dir)
point = df1.head().point
self.assertEquals(point, ExamplePoint(1.0, 2.0))

row = Row(label=1.0, point=PythonOnlyPoint(1.0, 2.0))
df0 = self.sqlCtx.createDataFrame([row])
df0.write.parquet(output_dir, mode='overwrite')
df1 = self.sqlCtx.parquetFile(output_dir)
point = df1.head().point
self.assertEquals(point, PythonOnlyPoint(1.0, 2.0))

def test_column_operators(self):
ci = self.df.key
cs = self.df.value