@@ -45,18 +45,24 @@ def _df(self, jdf):
 
     @since(1.4)
     def format(self, source):
-        """
-        Specifies the input data source format.
+        """Specifies the input data source format.
+
+        :param source: string, name of the data source, e.g. 'json', 'parquet'.
+
+        >>> df = sqlContext.read.format('json').load('python/test_support/sql/people.json')
+        >>> df.dtypes
+        [('age', 'bigint'), ('name', 'string')]
+
         """
         self._jreader = self._jreader.format(source)
         return self
 
     @since(1.4)
     def schema(self, schema):
-        """
-        Specifies the input schema. Some data sources (e.g. JSON) can
-        infer the input schema automatically from data. By specifying
-        the schema here, the underlying data source can skip the schema
+        """Specifies the input schema.
+
+        Some data sources (e.g. JSON) can infer the input schema automatically from data.
+        By specifying the schema here, the underlying data source can skip the schema
         inference step, and thus speed up data loading.
 
         :param schema: a StructType object
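For illustration, a minimal sketch of pairing schema() with an explicit StructType; the field layout is assumed to mirror the people.json sample used in the doctests above, and everything else is standard pyspark.sql.types usage:

    from pyspark.sql.types import StructType, StructField, StringType, LongType

    # Assumed layout matching the people.json sample above.
    people_schema = StructType([
        StructField("age", LongType(), True),
        StructField("name", StringType(), True),
    ])
    # With an explicit schema, the JSON source can skip its inference pass.
    df = sqlContext.read.schema(people_schema).json('python/test_support/sql/people.json')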
@@ -69,8 +75,7 @@ def schema(self, schema):
 
     @since(1.4)
     def options(self, **options):
-        """
-        Adds input options for the underlying data source.
+        """Adds input options for the underlying data source.
         """
         for k in options:
             self._jreader = self._jreader.option(k, options[k])
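Option keys are passed through as strings and interpreted by the underlying source, so valid keys depend entirely on that source. A hedged sketch of chaining options() into a read, where 'samplingRatio' is assumed to be a JSON-source option and serves only as an example:

    # 'samplingRatio' is an assumed JSON-source option key, shown for illustration.
    df = (sqlContext.read
          .format('json')
          .options(samplingRatio='0.5')
          .load('python/test_support/sql/people.json'))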
@@ -84,6 +89,10 @@ def load(self, path=None, format=None, schema=None, **options):
         :param format: optional string for format of the data source. Default to 'parquet'.
         :param schema: optional :class:`StructType` for the input schema.
         :param options: all other string options
+
+        >>> df = sqlContext.read.load('python/test_support/sql/parquet_partitioned')
+        >>> df.dtypes
+        [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
         """
         if format is not None:
             self.format(format)
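Since load() also accepts format, schema, and keyword options directly, the builder calls can be collapsed into a single call; a small sketch reusing the people.json sample:

    # Equivalent to .format('json').load(...): the format can be passed to load() directly.
    df = sqlContext.read.load('python/test_support/sql/people.json', format='json')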
@@ -107,31 +116,10 @@ def json(self, path, schema=None):
         :param path: string, path to the JSON dataset.
         :param schema: an optional :class:`StructType` for the input schema.
 
-        >>> import tempfile, shutil
-        >>> jsonFile = tempfile.mkdtemp()
-        >>> shutil.rmtree(jsonFile)
-        >>> with open(jsonFile, 'w') as f:
-        ...     f.writelines(jsonStrings)
-        >>> df1 = sqlContext.read.json(jsonFile)
-        >>> df1.printSchema()
-        root
-         |-- field1: long (nullable = true)
-         |-- field2: string (nullable = true)
-         |-- field3: struct (nullable = true)
-         |    |-- field4: long (nullable = true)
-
-        >>> from pyspark.sql.types import *
-        >>> schema = StructType([
-        ...     StructField("field2", StringType()),
-        ...     StructField("field3",
-        ...         StructType([StructField("field5", ArrayType(IntegerType()))]))])
-        >>> df2 = sqlContext.read.json(jsonFile, schema)
-        >>> df2.printSchema()
-        root
-         |-- field2: string (nullable = true)
-         |-- field3: struct (nullable = true)
-         |    |-- field5: array (nullable = true)
-         |    |    |-- element: integer (containsNull = true)
+        >>> df = sqlContext.read.json('python/test_support/sql/people.json')
+        >>> df.dtypes
+        [('age', 'bigint'), ('name', 'string')]
+
         """
         if schema is not None:
             self.schema(schema)
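The removed doctest also demonstrated reading JSON under a user-supplied schema; a shorter hedged sketch of the same idea against people.json, under the assumption that a partial schema makes the source read only the listed fields:

    from pyspark.sql.types import StructType, StructField, StringType

    # Assumption: a partial schema projects just the listed fields and
    # skips inference entirely.
    name_only = StructType([StructField("name", StringType(), True)])
    df = sqlContext.read.json('python/test_support/sql/people.json', schema=name_only)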
@@ -141,24 +129,22 @@ def json(self, path, schema=None):
     def table(self, tableName):
         """Returns the specified table as a :class:`DataFrame`.
 
-        >>> sqlContext.registerDataFrameAsTable(df, "table1")
-        >>> df2 = sqlContext.read.table("table1")
-        >>> sorted(df.collect()) == sorted(df2.collect())
-        True
+        :param tableName: string, name of the table.
+
+        >>> df = sqlContext.read.parquet('python/test_support/sql/parquet_partitioned')
+        >>> df.registerTempTable('tmpTable')
+        >>> sqlContext.read.table('tmpTable').dtypes
+        [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
         """
         return self._df(self._jreader.table(tableName))
 
     @since(1.4)
     def parquet(self, *path):
         """Loads a Parquet file, returning the result as a :class:`DataFrame`.
 
-        >>> import tempfile, shutil
-        >>> parquetFile = tempfile.mkdtemp()
-        >>> shutil.rmtree(parquetFile)
-        >>> df.saveAsParquetFile(parquetFile)
-        >>> df2 = sqlContext.read.parquet(parquetFile)
-        >>> sorted(df.collect()) == sorted(df2.collect())
-        True
+        >>> df = sqlContext.read.parquet('python/test_support/sql/parquet_partitioned')
+        >>> df.dtypes
+        [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
         """
         return self._df(self._jreader.parquet(_to_seq(self._sqlContext._sc, path)))
 
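Note the variadic *path signature: several Parquet directories can be read into one DataFrame in a single call. A sketch with hypothetical paths:

    # Hypothetical paths; *path is variadic, so multiple Parquet
    # directories are combined into one DataFrame.
    df = sqlContext.read.parquet('data/year=2014', 'data/year=2015')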
@@ -221,43 +207,49 @@ def __init__(self, df):
 
     @since(1.4)
     def mode(self, saveMode):
-        """
-        Specifies the behavior when data or table already exists. Options include:
+        """Specifies the behavior when data or table already exists.
+
+        Options include:
 
         * `append`: Append contents of this :class:`DataFrame` to existing data.
         * `overwrite`: Overwrite existing data.
         * `error`: Throw an exception if data already exists.
         * `ignore`: Silently ignore this operation if data already exists.
+
+        >>> df.write.mode('append').parquet(os.path.join(tempfile.mkdtemp(), 'data'))
         """
         self._jwrite = self._jwrite.mode(saveMode)
         return self
 
     @since(1.4)
     def format(self, source):
-        """
-        Specifies the underlying output data source. Built-in options include
-        "parquet", "json", etc.
+        """Specifies the underlying output data source.
+
+        :param source: string, name of the data source, e.g. 'json', 'parquet'.
+
+        >>> df.write.format('json').save(os.path.join(tempfile.mkdtemp(), 'data'))
         """
         self._jwrite = self._jwrite.format(source)
         return self
 
     @since(1.4)
     def options(self, **options):
-        """
-        Adds output options for the underlying data source.
+        """Adds output options for the underlying data source.
         """
         for k in options:
             self._jwrite = self._jwrite.option(k, options[k])
         return self
 
     @since(1.4)
     def partitionBy(self, *cols):
-        """
-        Partitions the output by the given columns on the file system.
+        """Partitions the output by the given columns on the file system.
+
         If specified, the output is laid out on the file system similar
         to Hive's partitioning scheme.
 
         :param cols: name of columns
+
+        >>> df.write.partitionBy('year', 'month').parquet(os.path.join(tempfile.mkdtemp(), 'data'))
         """
         if len(cols) == 1 and isinstance(cols[0], (list, tuple)):
             cols = cols[0]
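Taken together, the writer methods above form one fluent chain. A hedged sketch, where 'compression' stands in for whatever option key the target source actually accepts:

    import os, tempfile

    (df.write
       .format('parquet')
       .mode('overwrite')
       .options(compression='gzip')   # hypothetical option key, for illustration only
       .partitionBy('year', 'month')
       .save(os.path.join(tempfile.mkdtemp(), 'data')))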
@@ -266,8 +258,7 @@ def partitionBy(self, *cols):
 
     @since(1.4)
     def save(self, path=None, format=None, mode="error", **options):
-        """
-        Saves the contents of the :class:`DataFrame` to a data source.
+        """Saves the contents of the :class:`DataFrame` to a data source.
 
         The data source is specified by the ``format`` and a set of ``options``.
         If ``format`` is not specified, the default data source configured by
@@ -285,6 +276,8 @@ def save(self, path=None, format=None, mode="error", **options):
         :param format: the format used to save
         :param mode: one of `append`, `overwrite`, `error`, `ignore` (default: error)
         :param options: all other string options
+
+        >>> df.write.mode('append').parquet(os.path.join(tempfile.mkdtemp(), 'data'))
         """
         self.mode(mode).options(**options)
         if format is not None:
@@ -296,8 +289,8 @@ def save(self, path=None, format=None, mode="error", **options):
 
     @since(1.4)
     def insertInto(self, tableName, overwrite=False):
-        """
-        Inserts the content of the :class:`DataFrame` to the specified table.
+        """Inserts the content of the :class:`DataFrame` to the specified table.
+
         It requires that the schema of the :class:`DataFrame` is the same as the
         schema of the table.
 
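A short sketch of insertInto() with a hypothetical table name; it presupposes an existing table (typically a Hive table) whose schema matches the DataFrame:

    # 'records' is a hypothetical, pre-existing table with a matching schema.
    df.write.insertInto('records')                  # append rows
    df.write.insertInto('records', overwrite=True)  # replace the contents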
@@ -307,8 +300,7 @@ def insertInto(self, tableName, overwrite=False):
 
     @since(1.4)
     def saveAsTable(self, name, format=None, mode="error", **options):
-        """
-        Saves the content of the :class:`DataFrame` as the specified table.
+        """Saves the content of the :class:`DataFrame` as the specified table.
 
         In the case the table already exists, behavior of this function depends on the
         save mode, specified by the `mode` function (default to throwing an exception).
@@ -328,13 +320,11 @@ def saveAsTable(self, name, format=None, mode="error", **options):
         self.mode(mode).options(**options)
         if format is not None:
             self.format(format)
-        return self._jwrite.saveAsTable(name)
+        self._jwrite.saveAsTable(name)
 
     @since(1.4)
     def json(self, path, mode="error"):
-        """
-        Saves the content of the :class:`DataFrame` in JSON format at the
-        specified path.
+        """Saves the content of the :class:`DataFrame` in JSON format at the specified path.
 
         Additionally, mode is used to specify the behavior of the save operation when
         data already exists in the data source. There are four modes:
@@ -346,14 +336,14 @@ def json(self, path, mode="error"):
 
         :param path: the path in any Hadoop supported file system
         :param mode: one of `append`, `overwrite`, `error`, `ignore` (default: error)
+
+        >>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
         """
-        return self._jwrite.mode(mode).json(path)
+        self._jwrite.mode(mode).json(path)
 
     @since(1.4)
     def parquet(self, path, mode="error"):
-        """
-        Saves the content of the :class:`DataFrame` in Parquet format at the
-        specified path.
+        """Saves the content of the :class:`DataFrame` in Parquet format at the specified path.
 
         Additionally, mode is used to specify the behavior of the save operation when
         data already exists in the data source. There are four modes:
@@ -365,14 +355,14 @@ def parquet(self, path, mode="error"):
 
         :param path: the path in any Hadoop supported file system
         :param mode: one of `append`, `overwrite`, `error`, `ignore` (default: error)
+
+        >>> df.write.parquet(os.path.join(tempfile.mkdtemp(), 'data'))
         """
-        return self._jwrite.mode(mode).parquet(path)
+        self._jwrite.mode(mode).parquet(path)
 
     @since(1.4)
     def jdbc(self, url, table, mode="error", properties={}):
-        """
-        Saves the content of the :class:`DataFrame` to an external database table
-        via JDBC.
+        """Saves the content of the :class:`DataFrame` to an external database table via JDBC.
 
         In the case the table already exists in the external database,
         behavior of this function depends on the save mode, specified by the `mode`
@@ -383,12 +373,15 @@ def jdbc(self, url, table, mode="error", properties={}):
         * `error`: Throw an exception if data already exists.
         * `ignore`: Silently ignore this operation if data already exists.
 
-        :param url: a JDBC URL of the form `jdbc:subprotocol:subname`
+        .. warning:: Don't create too many partitions in parallel on a large cluster;
+            otherwise Spark might crash your external database systems.
+
+        :param url: a JDBC URL of the form ``jdbc:subprotocol:subname``
         :param table: Name of the table in the external database.
-        :param mode: one of `append`, `overwrite`, `error`, `ignore` (default: error)
+        :param mode: one of ``append``, ``overwrite``, ``error``, ``ignore`` (default: ``error``)
         :param properties: JDBC database connection arguments, a list of
-                           arbitrary string tag/value. Normally at least a
-                           "user" and "password" property should be included.
+            arbitrary string tag/value. Normally at least a
+            "user" and "password" property should be included.
         """
         jprop = JavaClass("java.util.Properties", self._sqlContext._sc._gateway._gateway_client)()
         for k in properties:
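A hedged sketch of the writer side of jdbc(); the URL, table name, and credentials below are placeholders, and the target database must be reachable with its JDBC driver on the classpath:

    # All connection details are hypothetical placeholders.
    props = {"user": "scott", "password": "tiger"}
    df.write.jdbc("jdbc:postgresql://localhost:5432/testdb", "people",
                  mode="append", properties=props)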
@@ -398,24 +391,23 @@ def jdbc(self, url, table, mode="error", properties={}):
 
 def _test():
     import doctest
+    import os
+    import tempfile
     from pyspark.context import SparkContext
     from pyspark.sql import Row, SQLContext
     import pyspark.sql.readwriter
+
+    os.chdir(os.environ["SPARK_HOME"])
+
     globs = pyspark.sql.readwriter.__dict__.copy()
     sc = SparkContext('local[4]', 'PythonTest')
+
+    globs['tempfile'] = tempfile
+    globs['os'] = os
     globs['sc'] = sc
     globs['sqlContext'] = SQLContext(sc)
-    globs['df'] = sc.parallelize([(2, 'Alice'), (5, 'Bob')]) \
-        .toDF(StructType([StructField('age', IntegerType()),
-                          StructField('name', StringType())]))
-    jsonStrings = [
-        '{"field1": 1, "field2": "row1", "field3":{"field4":11}}',
-        '{"field1" : 2, "field3":{"field4":22, "field5": [10, 11]},'
-        '"field6":[{"field7": "row2"}]}',
-        '{"field1" : null, "field2": "row3", '
-        '"field3":{"field4":33, "field5": []}}'
-    ]
-    globs['jsonStrings'] = jsonStrings
+    globs['df'] = globs['sqlContext'].read.parquet('python/test_support/sql/parquet_partitioned')
+
     (failure_count, test_count) = doctest.testmod(
         pyspark.sql.readwriter, globs=globs,
         optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
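Because the harness chdirs into SPARK_HOME before resolving the relative test_support paths, the doctests only run with that variable set. A minimal sketch of the conventional entry point; the file's actual tail is elided from this hunk, so this is the usual pattern, not the committed code:

    if __name__ == "__main__":
        _test()  # e.g. SPARK_HOME=/path/to/spark python python/pyspark/sql/readwriter.py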