diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index a3f2fb2ed1491..78d5d8ec110e3 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -226,7 +226,7 @@ jobs: run: | # TODO(SPARK-32407): Sphinx 3.1+ does not correctly index nested classes. # See also https://github.com/sphinx-doc/sphinx/issues/7551. - pip3 install flake8 'sphinx<3.1.0' numpy pydata_sphinx_theme + pip3 install flake8 'sphinx<3.1.0' numpy pydata_sphinx_theme ipython nbsphinx - name: Install R 4.0 run: | sudo sh -c "echo 'deb https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/' >> /etc/apt/sources.list" @@ -245,10 +245,11 @@ jobs: ruby-version: 2.7 - name: Install dependencies for documentation generation run: | + # pandoc is also required by nbsphinx to generate the PySpark API docs. sudo apt-get install -y libcurl4-openssl-dev pandoc # TODO(SPARK-32407): Sphinx 3.1+ does not correctly index nested classes. # See also https://github.com/sphinx-doc/sphinx/issues/7551. - pip install 'sphinx<3.1.0' mkdocs numpy pydata_sphinx_theme + pip install 'sphinx<3.1.0' mkdocs numpy pydata_sphinx_theme ipython nbsphinx gem install jekyll jekyll-redirect-from rouge sudo Rscript -e "install.packages(c('devtools', 'testthat', 'knitr', 'rmarkdown', 'roxygen2'), repos='https://cloud.r-project.org/')" - name: Scala linter diff --git a/binder/apt.txt b/binder/apt.txt new file mode 100644 index 0000000000000..385f5b0fba754 --- /dev/null +++ b/binder/apt.txt @@ -0,0 +1 @@ +openjdk-8-jre diff --git a/binder/postBuild b/binder/postBuild new file mode 100644 index 0000000000000..42bb3514c5a2e --- /dev/null +++ b/binder/postBuild @@ -0,0 +1,24 @@ +#!/bin/bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# This file is used for Binder integration; it installs PySpark so that it is +# available in the Jupyter notebook. + +VERSION=$(python -c "exec(open('python/pyspark/version.py').read()); print(__version__)") +pip install "pyspark[sql,ml,mllib]<=$VERSION" diff --git a/dev/create-release/spark-rm/Dockerfile b/dev/create-release/spark-rm/Dockerfile index 200f8b97f8d66..4e007a5eeb93a 100644 --- a/dev/create-release/spark-rm/Dockerfile +++ b/dev/create-release/spark-rm/Dockerfile @@ -36,7 +36,7 @@ ARG APT_INSTALL="apt-get install --no-install-recommends -y" # TODO(SPARK-32407): Sphinx 3.1+ does not correctly index nested classes. # See also https://github.com/sphinx-doc/sphinx/issues/7551. # We should use the latest Sphinx version once this is fixed.
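(Editorial aside on the new `binder/postBuild` script above: it resolves the PySpark version by exec'ing `python/pyspark/version.py` and then pins the PyPI install to at most that version. A minimal sketch of that lookup in plain Python, assuming `version.py` simply assigns a `__version__` string; the concrete version value in the comment is illustrative only.)

```python
# Sketch of the version lookup performed by binder/postBuild; run from the repo root.
# Assumption: python/pyspark/version.py contains a single assignment such as
#   __version__ = "3.1.0.dev0"   (illustrative value; it depends on the branch)
namespace = {}
exec(open('python/pyspark/version.py').read(), namespace)
version = namespace['__version__']
print(version)
# postBuild then effectively runs: pip install "pyspark[sql,ml,mllib]<=<version>"
```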
-ARG PIP_PKGS="sphinx==3.0.4 mkdocs==1.0.4 numpy==1.18.1 pydata_sphinx_theme==0.3.1" +ARG PIP_PKGS="sphinx==3.0.4 mkdocs==1.0.4 numpy==1.18.1 pydata_sphinx_theme==0.3.1 ipython==7.16.1 nbsphinx==0.7.1" ARG GEM_PKGS="jekyll:4.0.0 jekyll-redirect-from:0.16.0 rouge:3.15.0" # Install extra needed repos and refresh. @@ -75,6 +75,7 @@ RUN apt-get clean && apt-get update && $APT_INSTALL gnupg ca-certificates && \ pip3 install $PIP_PKGS && \ # Install R packages and dependencies used when building. # R depends on pandoc*, libssl (which are installed above). + # Note that PySpark documentation generation also needs pandoc because of nbsphinx. $APT_INSTALL r-base r-base-dev && \ $APT_INSTALL texlive-latex-base texlive texlive-fonts-extra texinfo qpdf && \ Rscript -e "install.packages(c('curl', 'xml2', 'httr', 'devtools', 'testthat', 'knitr', 'rmarkdown', 'roxygen2', 'e1071', 'survival'), repos='https://cloud.r-project.org/')" && \ diff --git a/dev/lint-python b/dev/lint-python index 07897eb499895..14c7f7f2a00d8 100755 --- a/dev/lint-python +++ b/dev/lint-python @@ -196,6 +196,22 @@ function sphinx_test { return fi + # TODO(SPARK-32666): Install nbsphinx in Jenkins machines + PYTHON_HAS_NBSPHINX=$("$PYTHON_EXECUTABLE" -c 'import importlib.util; print(importlib.util.find_spec("nbsphinx") is not None)') + if [[ "$PYTHON_HAS_NBSPHINX" == "False" ]]; then + echo "$PYTHON_EXECUTABLE does not have nbsphinx installed. Skipping Sphinx build for now." + echo + return + fi + + # TODO(SPARK-32666): Install ipython in Jenkins machines + PYTHON_HAS_IPYTHON=$("$PYTHON_EXECUTABLE" -c 'import importlib.util; print(importlib.util.find_spec("IPython") is not None)') + if [[ "$PYTHON_HAS_IPYTHON" == "False" ]]; then + echo "$PYTHON_EXECUTABLE does not have IPython installed. Skipping Sphinx build for now." + echo + return + fi + echo "starting $SPHINX_BUILD tests..." pushd python/docs &> /dev/null make clean &> /dev/null diff --git a/dev/requirements.txt b/dev/requirements.txt index a862a6e986791..b11f24fdbd4b2 100644 --- a/dev/requirements.txt +++ b/dev/requirements.txt @@ -4,3 +4,5 @@ PyGithub==1.26.0 Unidecode==0.04.19 sphinx pydata_sphinx_theme +ipython +nbsphinx diff --git a/docs/README.md b/docs/README.md index e2002a66b0433..09982c1301163 100644 --- a/docs/README.md +++ b/docs/README.md @@ -63,7 +63,7 @@ See also https://github.com/sphinx-doc/sphinx/issues/7551. --> ```sh -$ sudo pip install 'sphinx<3.1.0' mkdocs numpy pydata_sphinx_theme +$ sudo pip install 'sphinx<3.1.0' mkdocs numpy pydata_sphinx_theme ipython nbsphinx ``` ## Generating the Documentation HTML diff --git a/python/docs/source/conf.py b/python/docs/source/conf.py index 7b1939d976080..738765a576290 100644 --- a/python/docs/source/conf.py +++ b/python/docs/source/conf.py @@ -45,8 +45,20 @@ 'sphinx.ext.viewcode', 'sphinx.ext.mathjax', 'sphinx.ext.autosummary', + 'nbsphinx', # Converts Jupyter Notebook to reStructuredText files for Sphinx. + # For the ipython directive in the reStructuredText files generated from the notebooks. + 'IPython.sphinxext.ipython_console_highlighting' ] +# Links used globally in the RST files. +# These are defined here to allow link substitutions to be resolved dynamically. +rst_epilog = """ +.. |binder| replace:: Live Notebook +.. _binder: https://mybinder.org/v2/gh/apache/spark/{0}?filepath=python%2Fdocs%2Fsource%2Fgetting_started%2Fquickstart.ipynb +.. |examples| replace:: Examples +.. 
_examples: https://github.com/apache/spark/tree/{0}/examples/src/main/python +""".format(os.environ.get("RELEASE_TAG", "master")) + # Add any paths that contain templates here, relative to this directory. templates_path = ['_templates'] @@ -84,7 +96,7 @@ # List of patterns, relative to source directory, that match files and # directories to ignore when looking for source files. -exclude_patterns = ['_build'] +exclude_patterns = ['_build', '.DS_Store', '**.ipynb_checkpoints'] # The reST default role (used for this markup: `text`) to use for all # documents. diff --git a/python/docs/source/getting_started/index.rst b/python/docs/source/getting_started/index.rst index 457368c8194cb..cf4f7de11dbe3 100644 --- a/python/docs/source/getting_started/index.rst +++ b/python/docs/source/getting_started/index.rst @@ -20,3 +20,7 @@ Getting Started =============== +.. toctree:: + :maxdepth: 2 + + quickstart diff --git a/python/docs/source/getting_started/quickstart.ipynb b/python/docs/source/getting_started/quickstart.ipynb new file mode 100644 index 0000000000000..34a3641205364 --- /dev/null +++ b/python/docs/source/getting_started/quickstart.ipynb @@ -0,0 +1,1177 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Quickstart\n", + "\n", + "This is a short introduction and quickstart for the PySpark DataFrame API. PySpark DataFrames are lazily evaluated. They are implemented on top of [RDD](https://spark.apache.org/docs/latest/rdd-programming-guide.html#overview)s. When Spark [transforms](https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations) data, it does not compute the transformation immediately but plans how to compute it later. When [actions](https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions) such as `collect()` are explicitly called, the computation starts.\n", + "This notebook shows the basic usage of the DataFrame API, geared mainly toward new users. You can run the latest version of these examples yourself in a live notebook [here](https://mybinder.org/v2/gh/apache/spark/master?filepath=python%2Fdocs%2Fsource%2Fgetting_started%2Fquickstart.ipynb).\n", + "\n", + "There is also other useful information on the Apache Spark documentation site; see the latest versions of [Spark SQL and DataFrames](https://spark.apache.org/docs/latest/sql-programming-guide.html), [RDD Programming Guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html), [Structured Streaming Programming Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html), [Spark Streaming Programming Guide](https://spark.apache.org/docs/latest/streaming-programming-guide.html) and [Machine Learning Library (MLlib) Guide](https://spark.apache.org/docs/latest/ml-guide.html).\n", + "\n", + "PySpark applications start with initializing a `SparkSession`, which is the entry point of PySpark, as below. When running in the PySpark shell via the `pyspark` executable, the shell automatically creates the session in the variable `spark` for users."
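(Editorial aside, not part of the notebook: the first cell below simply calls `SparkSession.builder.getOrCreate()`. In a standalone application the builder can also be configured before the session is created; a minimal sketch follows, where the application name, master URL and configuration value are purely illustrative.)

```python
from pyspark.sql import SparkSession

# Minimal sketch of an explicitly configured session; the app name, master URL
# and shuffle-partition setting below are illustrative, not recommendations.
spark = (SparkSession.builder
         .appName("quickstart-sketch")
         .master("local[*]")
         .config("spark.sql.shuffle.partitions", "4")
         .getOrCreate())
```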
+ ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "from pyspark.sql import SparkSession\n", + "\n", + "spark = SparkSession.builder.getOrCreate()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## DataFrame Creation\n", + "\n", + "A PySpark DataFrame can be created via `pyspark.sql.SparkSession.createDataFrame`, typically by passing a list of lists, tuples, dictionaries or `pyspark.sql.Row`s, a [pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html), or an RDD consisting of such a list.\n", + "`pyspark.sql.SparkSession.createDataFrame` takes the `schema` argument to specify the schema of the DataFrame. When it is omitted, PySpark infers the corresponding schema by taking a sample from the data.\n", + "\n", + "Firstly, you can create a PySpark DataFrame from a list of rows." + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "from datetime import datetime, date\n", + "import pandas as pd\n", + "from pyspark.sql import Row\n", + "\n", + "df = spark.createDataFrame([\n", + " Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),\n", + " Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),\n", + " Row(a=3, b=4., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))\n", + "])\n", + "df" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Create a PySpark DataFrame with an explicit schema." + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]" + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df = spark.createDataFrame([\n", + " (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),\n", + " (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),\n", + " (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))\n", + "], schema='a long, b double, c string, d date, e timestamp')\n", + "df" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Create a PySpark DataFrame from a pandas DataFrame." + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "pandas_df = pd.DataFrame({\n", + " 'a': [1, 2, 3],\n", + " 'b': [2., 3., 4.],\n", + " 'c': ['string1', 'string2', 'string3'],\n", + " 'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],\n", + " 'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]\n", + "})\n", + "df = spark.createDataFrame(pandas_df)\n", + "df" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Create a PySpark DataFrame from an RDD consisting of a list of tuples."
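(Editorial aside, not part of the notebook: the explicit schema passed as a DDL string a couple of cells above, `'a long, b double, c string, d date, e timestamp'`, can also be expressed programmatically with `StructType`; a minimal sketch reusing the same column layout and the `spark` session from the first cell, with the sample data abbreviated to one row for brevity.)

```python
from datetime import datetime, date
from pyspark.sql.types import (StructType, StructField, LongType, DoubleType,
                               StringType, DateType, TimestampType)

# Programmatic equivalent of the DDL string schema used above (all columns nullable).
schema = StructType([
    StructField('a', LongType()),
    StructField('b', DoubleType()),
    StructField('c', StringType()),
    StructField('d', DateType()),
    StructField('e', TimestampType()),
])
df_explicit = spark.createDataFrame(
    [(1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0))], schema=schema)
```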
+ ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "rdd = spark.sparkContext.parallelize([\n", + " (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),\n", + " (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),\n", + " (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))\n", + "])\n", + "df = spark.createDataFrame(rdd, schema=['a', 'b', 'c', 'd', 'e'])\n", + "df" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The DataFrames created above all have the same results and schema." + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---+---+-------+----------+-------------------+\n", + "| a| b| c| d| e|\n", + "+---+---+-------+----------+-------------------+\n", + "| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|\n", + "| 2|3.0|string2|2000-02-01|2000-01-02 12:00:00|\n", + "| 3|4.0|string3|2000-03-01|2000-01-03 12:00:00|\n", + "+---+---+-------+----------+-------------------+\n", + "\n", + "root\n", + " |-- a: long (nullable = true)\n", + " |-- b: double (nullable = true)\n", + " |-- c: string (nullable = true)\n", + " |-- d: date (nullable = true)\n", + " |-- e: timestamp (nullable = true)\n", + "\n" + ] + } + ], + "source": [ + "# All DataFrames above yield the same results and schema.\n", + "df.show()\n", + "df.printSchema()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Viewing Data\n", + "\n", + "The top rows of a DataFrame can be displayed using `DataFrame.show()`." + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---+---+-------+----------+-------------------+\n", + "| a| b| c| d| e|\n", + "+---+---+-------+----------+-------------------+\n", + "| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|\n", + "+---+---+-------+----------+-------------------+\n", + "only showing top 1 row\n", + "\n" + ] + } + ], + "source": [ + "df.show(1)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Alternatively, you can enable the `spark.sql.repl.eagerEval.enabled` configuration for eager evaluation of PySpark DataFrames in notebooks such as Jupyter. The number of rows to show can be controlled via the `spark.sql.repl.eagerEval.maxNumRows` configuration." + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "<table border='1'>\n", + "<tr><th>a</th><th>b</th><th>c</th><th>d</th><th>e</th></tr>\n", + "<tr><td>1</td><td>2.0</td><td>string1</td><td>2000-01-01</td><td>2000-01-01 12:00:00</td></tr>\n", + "<tr><td>2</td><td>3.0</td><td>string2</td><td>2000-02-01</td><td>2000-01-02 12:00:00</td></tr>\n", + "<tr><td>3</td><td>4.0</td><td>string3</td><td>2000-03-01</td><td>2000-01-03 12:00:00</td></tr>\n", + "</table>\n" + ], + "text/plain": [ + "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]" + ] + }, + "execution_count": 8, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "spark.conf.set('spark.sql.repl.eagerEval.enabled', True)\n", + "df" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The rows can also be shown vertically. This is useful when rows are too long to show horizontally." + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "-RECORD 0------------------\n", + " a | 1 \n", + " b | 2.0 \n", + " c | string1 \n", + " d | 2000-01-01 \n", + " e | 2000-01-01 12:00:00 \n", + "only showing top 1 row\n", + "\n" + ] + } + ], + "source": [ + "df.show(1, vertical=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "You can see the DataFrame's schema and column names as follows:" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "['a', 'b', 'c', 'd', 'e']" + ] + }, + "execution_count": 10, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df.columns" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "root\n", + " |-- a: long (nullable = true)\n", + " |-- b: double (nullable = true)\n", + " |-- c: string (nullable = true)\n", + " |-- d: date (nullable = true)\n", + " |-- e: timestamp (nullable = true)\n", + "\n" + ] + } + ], + "source": [ + "df.printSchema()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Show the summary of the DataFrame." + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+-------+---+---+-------+\n", + "|summary| a| b| c|\n", + "+-------+---+---+-------+\n", + "| count| 3| 3| 3|\n", + "| mean|2.0|3.0| null|\n", + "| stddev|1.0|1.0| null|\n", + "| min| 1|2.0|string1|\n", + "| max| 3|4.0|string3|\n", + "+-------+---+---+-------+\n", + "\n" + ] + } + ], + "source": [ + "df.select(\"a\", \"b\", \"c\").describe().show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "`DataFrame.collect()` collects the distributed data to the driver side as local data in Python. Note that this can throw an out-of-memory error when the dataset is too large to fit on the driver side, because it collects all the data from the executors to the driver." + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0)),\n", + " Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1), e=datetime.datetime(2000, 1, 2, 12, 0)),\n", + " Row(a=3, b=4.0, c='string3', d=datetime.date(2000, 3, 1), e=datetime.datetime(2000, 1, 3, 12, 0))]" + ] + }, + "execution_count": 13, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df.collect()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "In order to avoid throwing an out-of-memory exception, use `DataFrame.take()` or `DataFrame.tail()`."
+ ] + }, + { + "cell_type": "code", + "execution_count": 14, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0))]" + ] + }, + "execution_count": 14, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df.take(1)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "A PySpark DataFrame can also be converted back to a [pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) to leverage the pandas API. Note that `toPandas` also collects all the data to the driver side, which can easily cause an out-of-memory error when the data is too large to fit on the driver side." + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "<div>\n", + "<table border='1' class='dataframe'>\n", + "  <thead>\n", + "    <tr><th></th><th>a</th><th>b</th><th>c</th><th>d</th><th>e</th></tr>\n", + "  </thead>\n", + "  <tbody>\n", + "    <tr><th>0</th><td>1</td><td>2.0</td><td>string1</td><td>2000-01-01</td><td>2000-01-01 12:00:00</td></tr>\n", + "    <tr><th>1</th><td>2</td><td>3.0</td><td>string2</td><td>2000-02-01</td><td>2000-01-02 12:00:00</td></tr>\n", + "    <tr><th>2</th><td>3</td><td>4.0</td><td>string3</td><td>2000-03-01</td><td>2000-01-03 12:00:00</td></tr>\n", + "  </tbody>\n", + "</table>\n", + "</div>" + ], + "text/plain": [ + " a b c d e\n", + "0 1 2.0 string1 2000-01-01 2000-01-01 12:00:00\n", + "1 2 3.0 string2 2000-02-01 2000-01-02 12:00:00\n", + "2 3 4.0 string3 2000-03-01 2000-01-03 12:00:00" + ] + }, + "execution_count": 15, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df.toPandas()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Selecting and Accessing Data\n", + "\n", + "PySpark DataFrames are lazily evaluated, so simply selecting a column does not trigger the computation; it returns a `Column` instance instead." + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "Column" + ] + }, + "execution_count": 16, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df.a" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "In fact, most column-wise operations return `Column`s." + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "True" + ] + }, + "execution_count": 17, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "from pyspark.sql import Column\n", + "from pyspark.sql.functions import upper\n", + "\n", + "type(df.c) == type(upper(df.c)) == type(df.c.isNull())" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "These `Column`s can be used to select columns from a DataFrame. For example, `DataFrame.select()` takes `Column` instances and returns another DataFrame." + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+-------+\n", + "| c|\n", + "+-------+\n", + "|string1|\n", + "|string2|\n", + "|string3|\n", + "+-------+\n", + "\n" + ] + } + ], + "source": [ + "df.select(df.c).show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Assign a new `Column` instance." + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---+---+-------+----------+-------------------+-------+\n", + "| a| b| c| d| e|upper_c|\n", + "+---+---+-------+----------+-------------------+-------+\n", + "| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|STRING1|\n", + "| 2|3.0|string2|2000-02-01|2000-01-02 12:00:00|STRING2|\n", + "| 3|4.0|string3|2000-03-01|2000-01-03 12:00:00|STRING3|\n", + "+---+---+-------+----------+-------------------+-------+\n", + "\n" + ] + } + ], + "source": [ + "df.withColumn('upper_c', upper(df.c)).show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "To select a subset of rows, use `DataFrame.filter()`." + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---+---+-------+----------+-------------------+\n", + "| a| b| c| d| e|\n", + "+---+---+-------+----------+-------------------+\n", + "| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|\n", + "+---+---+-------+----------+-------------------+\n", + "\n" + ] + } + ], + "source": [ + "df.filter(df.a == 1).show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Applying a Function\n", + "\n", + "PySpark supports various UDFs and APIs to allow users to execute Python native functions. 
See also the latest [Pandas UDFs](https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html#pandas-udfs-aka-vectorized-udfs) and [Pandas Function APIs](https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html#pandas-function-apis). For instance, the example below allows users to directly use the APIs of [a pandas Series](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Series.html) within a Python native function." + ] + }, + { + "cell_type": "code", + "execution_count": 21, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+------------------+\n", + "|pandas_plus_one(a)|\n", + "+------------------+\n", + "| 2|\n", + "| 3|\n", + "| 4|\n", + "+------------------+\n", + "\n" + ] + } + ], + "source": [ + "import pandas as pd\n", + "from pyspark.sql.functions import pandas_udf\n", + "\n", + "@pandas_udf('long')\n", + "def pandas_plus_one(series: pd.Series) -> pd.Series:\n", + " # Simply add one by using the pandas Series API.\n", + " return series + 1\n", + "\n", + "df.select(pandas_plus_one(df.a)).show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Another example is `DataFrame.mapInPandas`, which allows users to directly use the APIs of a [pandas DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) without any restrictions such as on the result length." + ] + }, + { + "cell_type": "code", + "execution_count": 22, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---+---+-------+----------+-------------------+\n", + "| a| b| c| d| e|\n", + "+---+---+-------+----------+-------------------+\n", + "| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|\n", + "+---+---+-------+----------+-------------------+\n", + "\n" + ] + } + ], + "source": [ + "def pandas_filter_func(iterator):\n", + " for pandas_df in iterator:\n", + " yield pandas_df[pandas_df.a == 1]\n", + "\n", + "df.mapInPandas(pandas_filter_func, schema=df.schema).show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Grouping Data\n", + "\n", + "PySpark DataFrames also provide a way of handling grouped data by using the common split-apply-combine strategy.\n", + "It groups the data by a certain condition, applies a function to each group, and then combines them back into the DataFrame." + ] + }, + { + "cell_type": "code", + "execution_count": 23, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+-----+------+---+---+\n", + "|color| fruit| v1| v2|\n", + "+-----+------+---+---+\n", + "| red|banana| 1| 10|\n", + "| blue|banana| 2| 20|\n", + "| red|carrot| 3| 30|\n", + "| blue| grape| 4| 40|\n", + "| red|carrot| 5| 50|\n", + "|black|carrot| 6| 60|\n", + "| red|banana| 7| 70|\n", + "| red| grape| 8| 80|\n", + "+-----+------+---+---+\n", + "\n" + ] + } + ], + "source": [ + "df = spark.createDataFrame([\n", + " ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],\n", + " ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],\n", + " ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])\n", + "df.show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Grouping and then applying the `avg()` function to the resulting groups."
+ ] + }, + { + "cell_type": "code", + "execution_count": 24, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+-----+-------+-------+\n", + "|color|avg(v1)|avg(v2)|\n", + "+-----+-------+-------+\n", + "| red| 4.8| 48.0|\n", + "|black| 6.0| 60.0|\n", + "| blue| 3.0| 30.0|\n", + "+-----+-------+-------+\n", + "\n" + ] + } + ], + "source": [ + "df.groupby('color').avg().show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "You can also apply a Python native function to each group by using the pandas API." + ] + }, + { + "cell_type": "code", + "execution_count": 25, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+-----+------+---+---+\n", + "|color| fruit| v1| v2|\n", + "+-----+------+---+---+\n", + "| red|banana| -3| 10|\n", + "| red|carrot| -1| 30|\n", + "| red|carrot| 0| 50|\n", + "| red|banana| 2| 70|\n", + "| red| grape| 3| 80|\n", + "|black|carrot| 0| 60|\n", + "| blue|banana| -1| 20|\n", + "| blue| grape| 1| 40|\n", + "+-----+------+---+---+\n", + "\n" + ] + } + ], + "source": [ + "def plus_mean(pandas_df):\n", + " return pandas_df.assign(v1=pandas_df.v1 - pandas_df.v1.mean())\n", + "\n", + "df.groupby('color').applyInPandas(plus_mean, schema=df.schema).show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Co-grouping and applying a function." + ] + }, + { + "cell_type": "code", + "execution_count": 26, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+--------+---+---+---+\n", + "| time| id| v1| v2|\n", + "+--------+---+---+---+\n", + "|20000101| 1|1.0| x|\n", + "|20000102| 1|3.0| x|\n", + "|20000101| 2|2.0| y|\n", + "|20000102| 2|4.0| y|\n", + "+--------+---+---+---+\n", + "\n" + ] + } + ], + "source": [ + "df1 = spark.createDataFrame(\n", + " [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],\n", + " ('time', 'id', 'v1'))\n", + "\n", + "df2 = spark.createDataFrame(\n", + " [(20000101, 1, 'x'), (20000101, 2, 'y')],\n", + " ('time', 'id', 'v2'))\n", + "\n", + "def asof_join(l, r):\n", + " return pd.merge_asof(l, r, on='time', by='id')\n", + "\n", + "df1.groupby('id').cogroup(df2.groupby('id')).applyInPandas(\n", + " asof_join, schema='time int, id int, v1 double, v2 string').show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Getting Data In/Out\n", + "\n", + "CSV is straightforward and easy to use. Parquet and ORC are efficient and compact file formats that are faster to read and write.\n", + "\n", + "There are many other data sources available in PySpark such as JDBC, text, binaryFile, Avro, etc. See also the latest [Spark SQL, DataFrames and Datasets Guide](https://spark.apache.org/docs/latest/sql-programming-guide.html) in the Apache Spark documentation."
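(Editorial aside, not part of the notebook: the `csv`, `parquet` and `orc` shortcuts used in the following cells are convenience methods over the generic `DataFrameReader`/`DataFrameWriter` interface; a minimal sketch, where the `json` format, the path and the option are illustrative only.)

```python
# Minimal sketch of the generic reader/writer API, reusing the df defined above;
# the format name, output path and read option are illustrative, not prescriptive.
df.write.format('json').mode('overwrite').save('example_json')
spark.read.format('json').option('multiLine', 'false').load('example_json').show()
```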
+ ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### CSV" + ] + }, + { + "cell_type": "code", + "execution_count": 27, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+-----+------+---+---+\n", + "|color| fruit| v1| v2|\n", + "+-----+------+---+---+\n", + "| red|banana| 1| 10|\n", + "| blue|banana| 2| 20|\n", + "| red|carrot| 3| 30|\n", + "| blue| grape| 4| 40|\n", + "| red|carrot| 5| 50|\n", + "|black|carrot| 6| 60|\n", + "| red|banana| 7| 70|\n", + "| red| grape| 8| 80|\n", + "+-----+------+---+---+\n", + "\n" + ] + } + ], + "source": [ + "df.write.csv('foo.csv', header=True)\n", + "spark.read.csv('foo.csv', header=True).show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Parquet" + ] + }, + { + "cell_type": "code", + "execution_count": 28, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+-----+------+---+---+\n", + "|color| fruit| v1| v2|\n", + "+-----+------+---+---+\n", + "| red|banana| 1| 10|\n", + "| blue|banana| 2| 20|\n", + "| red|carrot| 3| 30|\n", + "| blue| grape| 4| 40|\n", + "| red|carrot| 5| 50|\n", + "|black|carrot| 6| 60|\n", + "| red|banana| 7| 70|\n", + "| red| grape| 8| 80|\n", + "+-----+------+---+---+\n", + "\n" + ] + } + ], + "source": [ + "df.write.parquet('bar.parquet')\n", + "spark.read.parquet('bar.parquet').show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### ORC" + ] + }, + { + "cell_type": "code", + "execution_count": 29, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+-----+------+---+---+\n", + "|color| fruit| v1| v2|\n", + "+-----+------+---+---+\n", + "| red|banana| 1| 10|\n", + "| blue|banana| 2| 20|\n", + "| red|carrot| 3| 30|\n", + "| blue| grape| 4| 40|\n", + "| red|carrot| 5| 50|\n", + "|black|carrot| 6| 60|\n", + "| red|banana| 7| 70|\n", + "| red| grape| 8| 80|\n", + "+-----+------+---+---+\n", + "\n" + ] + } + ], + "source": [ + "df.write.orc('zoo.orc')\n", + "spark.read.orc('zoo.orc').show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Working with SQL\n", + "\n", + "DataFrame and Spark SQL share the same execution engine so they can be interchangeably used seamlessly. 
For example, you can register the DataFrame as a table and run SQL queries on it easily, as below:" + ] + }, + { + "cell_type": "code", + "execution_count": 30, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+--------+\n", + "|count(1)|\n", + "+--------+\n", + "| 8|\n", + "+--------+\n", + "\n" + ] + } + ], + "source": [ + "df.createOrReplaceTempView(\"tableA\")\n", + "spark.sql(\"SELECT count(*) from tableA\").show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "In addition, UDFs can be registered and invoked in SQL out of the box:" + ] + }, + { + "cell_type": "code", + "execution_count": 31, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+-----------+\n", + "|add_one(v1)|\n", + "+-----------+\n", + "| 2|\n", + "| 3|\n", + "| 4|\n", + "| 5|\n", + "| 6|\n", + "| 7|\n", + "| 8|\n", + "| 9|\n", + "+-----------+\n", + "\n" + ] + } + ], + "source": [ + "@pandas_udf(\"integer\")\n", + "def add_one(s: pd.Series) -> pd.Series:\n", + " return s + 1\n", + "\n", + "spark.udf.register(\"add_one\", add_one)\n", + "spark.sql(\"SELECT add_one(v1) FROM tableA\").show()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "These SQL expressions can be directly mixed with and used as PySpark columns." + ] + }, + { + "cell_type": "code", + "execution_count": 32, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+-----------+\n", + "|add_one(v1)|\n", + "+-----------+\n", + "| 2|\n", + "| 3|\n", + "| 4|\n", + "| 5|\n", + "| 6|\n", + "| 7|\n", + "| 8|\n", + "| 9|\n", + "+-----------+\n", + "\n", + "+--------------+\n", + "|(count(1) > 0)|\n", + "+--------------+\n", + "| true|\n", + "+--------------+\n", + "\n" + ] + } + ], + "source": [ + "from pyspark.sql.functions import expr\n", + "\n", + "df.selectExpr('add_one(v1)').show()\n", + "df.select(expr('count(*)') > 0).show()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.8" + }, + "name": "quickstart", + "notebookId": 1927513300154480 + }, + "nbformat": 4, + "nbformat_minor": 1 +} diff --git a/python/docs/source/index.rst b/python/docs/source/index.rst index b9180cefe5dcc..4286f616374c5 100644 --- a/python/docs/source/index.rst +++ b/python/docs/source/index.rst @@ -21,7 +21,7 @@ PySpark Documentation ===================== -.. TODO(SPARK-32204): Add Binder integration at Live Notebook. +|binder|_ | `GitHub <https://github.com/apache/spark>`_ | `Issues <https://issues.apache.org/jira/projects/SPARK/issues>`_ | |examples|_ | `Community <https://spark.apache.org/community.html>`_ PySpark is an interface for Apache Spark in Python. It not only allows you to write Spark applications using Python APIs, but also provides the PySpark shell for