From 760cc78d6d9b20cd66f5706767c5d7a42974833c Mon Sep 17 00:00:00 2001 From: Dian Fu Date: Fri, 30 Apr 2021 17:03:42 +0800 Subject: [PATCH 1/3] [FLINK-22539][python][docs] Restructure the Python dependency management documentation --- .../{table => }/dependency_management.md | 8 +- docs/content.zh/docs/dev/python/faq.md | 4 +- .../dev/python/table/intro_to_table_api.md | 2 +- .../table/python_table_api_connectors.md | 2 +- .../dev/python/table/table_environment.md | 2 +- docs/content.zh/docs/dev/table/sql/create.md | 2 +- .../datastream/dependency_management.md | 95 ------ .../docs/dev/python/dependency_management.md | 316 ++++++++++++++++++ docs/content/docs/dev/python/faq.md | 4 +- .../dev/python/table/dependency_management.md | 134 -------- .../dev/python/table/intro_to_table_api.md | 2 +- .../table/python_table_api_connectors.md | 2 +- .../dev/python/table/table_environment.md | 2 +- docs/content/docs/dev/table/sql/create.md | 2 +- 14 files changed, 332 insertions(+), 245 deletions(-) rename docs/content.zh/docs/dev/python/{table => }/dependency_management.md (98%) delete mode 100644 docs/content/docs/dev/python/datastream/dependency_management.md create mode 100644 docs/content/docs/dev/python/dependency_management.md delete mode 100644 docs/content/docs/dev/python/table/dependency_management.md diff --git a/docs/content.zh/docs/dev/python/table/dependency_management.md b/docs/content.zh/docs/dev/python/dependency_management.md similarity index 98% rename from docs/content.zh/docs/dev/python/table/dependency_management.md rename to docs/content.zh/docs/dev/python/dependency_management.md index 0f327656ca55c..aceeadd3acc8d 100644 --- a/docs/content.zh/docs/dev/python/table/dependency_management.md +++ b/docs/content.zh/docs/dev/python/dependency_management.md @@ -1,6 +1,6 @@ --- title: "依赖管理" -weight: 46 +weight: 30 type: docs aliases: - /zh/dev/python/table-api-users-guide/dependency_management.html @@ -26,7 +26,7 @@ under the License. # 依赖管理 -# Java Dependency in Python Program +## Java Dependency in Python Program 如果应用了第三方 Java 依赖, 用户可以通过以下 Python Table API进行配置,或者在提交作业时直接通过[命令行参数]({{< ref "docs/deployment/cli" >}}#usage) 配置. @@ -40,7 +40,7 @@ table_env.get_config().get_configuration().set_string("pipeline.jars", "file:/// table_env.get_config().get_configuration().set_string("pipeline.classpaths", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar") ``` -# Python 依赖管理 +## Python 依赖管理 如果程序中应用到了 Python 第三方依赖,用户可以使用以下 Table API 配置依赖信息,或在提交作业时直接通过命令行参数配置。 @@ -102,7 +102,7 @@ table_env.get_config().set_python_executable("py_env.zip/py_env/bin/python") 请确保配置的 Python 环境和集群运行环境匹配。 -# Java/Scala程序中的Python依赖管理 +## Java/Scala程序中的Python依赖管理 It also supports to use Python UDFs in the Java Table API programs or pure SQL programs. 
The following example shows how to use the Python UDFs in a Java Table API program: diff --git a/docs/content.zh/docs/dev/python/faq.md b/docs/content.zh/docs/dev/python/faq.md index 6db7b0b32e76e..251c25979f898 100644 --- a/docs/content.zh/docs/dev/python/faq.md +++ b/docs/content.zh/docs/dev/python/faq.md @@ -55,7 +55,7 @@ $ # 指定用于执行python UDF workers (用户自定义函数工作者) 的pyt $ table_env.get_config().set_python_executable("venv.zip/venv/bin/python") ``` -如果需要了解`add_python_archive`和`set_python_executable`用法的详细信息,请参阅[相关文档]({{< ref "docs/dev/python/table/dependency_management" >}}#python-dependency-in-python-program)。 +如果需要了解`add_python_archive`和`set_python_executable`用法的详细信息,请参阅[相关文档]({{< ref "docs/dev/python/dependency_management" >}}#python-dependency-in-python-program)。 ## 添加Jar文件 @@ -70,7 +70,7 @@ table_env.get_config().get_configuration().set_string("pipeline.jars", "file:/// table_env.get_config().get_configuration().set_string("pipeline.classpaths", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar") ``` -有关添加Java依赖项的API的详细信息,请参阅[相关文档]({{< ref "docs/dev/python/table/dependency_management" >}}#java-dependency-in-python-program)。 +有关添加Java依赖项的API的详细信息,请参阅[相关文档]({{< ref "docs/dev/python/dependency_management" >}}#java-dependency-in-python-program)。 ## 添加Python文件 您可以使用命令行参数`pyfs`或TableEnvironment的API `add_python_file`添加python文件依赖,这些依赖可以是python文件,python软件包或本地目录。 diff --git a/docs/content.zh/docs/dev/python/table/intro_to_table_api.md b/docs/content.zh/docs/dev/python/table/intro_to_table_api.md index 6fdd27921d942..bb4ba470c7809 100644 --- a/docs/content.zh/docs/dev/python/table/intro_to_table_api.md +++ b/docs/content.zh/docs/dev/python/table/intro_to_table_api.md @@ -113,7 +113,7 @@ table_env = TableEnvironment.create(env_settings) * 执行 SQL 查询,更多细节可查阅 [SQL]({{< ref "docs/dev/table/sql/overview" >}}) * 注册用户自定义的 (标量,表值,或者聚合) 函数, 更多细节可查阅 [普通的用户自定义函数]({{< ref "docs/dev/python/table/udfs/python_udfs" >}}) 和 [向量化的用户自定义函数]({{< ref "docs/dev/python/table/udfs/vectorized_python_udfs" >}}) * 配置作业,更多细节可查阅 [Python 配置]({{< ref "docs/dev/python/python_config" >}}) -* 管理 Python 依赖,更多细节可查阅 [依赖管理]({{< ref "docs/dev/python/table/dependency_management" >}}) +* 管理 Python 依赖,更多细节可查阅 [依赖管理]({{< ref "docs/dev/python/dependency_management" >}}) * 提交作业执行 目前有2个可用的执行器 : flink 执行器 和 blink 执行器。 diff --git a/docs/content.zh/docs/dev/python/table/python_table_api_connectors.md b/docs/content.zh/docs/dev/python/table/python_table_api_connectors.md index 1a6658d0ae7c8..2ebfbce6681a6 100644 --- a/docs/content.zh/docs/dev/python/table/python_table_api_connectors.md +++ b/docs/content.zh/docs/dev/python/table/python_table_api_connectors.md @@ -32,7 +32,7 @@ under the License. 
## 下载连接器(connector)和格式(format)jar 包 -由于 Flink 是一个基于 Java/Scala 的项目,连接器(connector)和格式(format)的实现是作为 jar 包存在的,要在 PyFlink 作业中使用,首先需要将其指定为作业的 [依赖]({{< ref "docs/dev/python/table/dependency_management" >}})。 +由于 Flink 是一个基于 Java/Scala 的项目,连接器(connector)和格式(format)的实现是作为 jar 包存在的,要在 PyFlink 作业中使用,首先需要将其指定为作业的 [依赖]({{< ref "docs/dev/python/dependency_management" >}})。 ```python diff --git a/docs/content.zh/docs/dev/python/table/table_environment.md b/docs/content.zh/docs/dev/python/table/table_environment.md index d3d498ba5a7f0..b595b64f996a7 100644 --- a/docs/content.zh/docs/dev/python/table/table_environment.md +++ b/docs/content.zh/docs/dev/python/table/table_environment.md @@ -512,7 +512,7 @@ TableEnvironment API ### 依赖管理 这些 APIs 用来管理 Python UDFs 所需要的 Python 依赖。 -更多细节可查阅 [依赖管理]({{< ref "docs/dev/python/table/dependency_management" >}}#python-dependency-in-python-program)。 +更多细节可查阅 [依赖管理]({{< ref "docs/dev/python/dependency_management" >}}#python-dependency-in-python-program)。 diff --git a/docs/content.zh/docs/dev/table/sql/create.md b/docs/content.zh/docs/dev/table/sql/create.md index 8d4d045ca3871..6004dbb0e3292 100644 --- a/docs/content.zh/docs/dev/table/sql/create.md +++ b/docs/content.zh/docs/dev/table/sql/create.md @@ -587,7 +587,7 @@ CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION 如果 language tag 是 PYTHON,则 identifier 是 UDF 对象的全限定名,例如 `pyflink.table.tests.test_udf.add`。关于 PYTHON UDF 的实现,请参考 [Python UDFs]({{< ref "docs/dev/python/table/udfs/python_udfs" >}})。 -如果 language tag 是 PYTHON,而当前程序是 Java/Scala 程序或者纯 SQL 程序,则需要[配置 Python 相关的依赖]({{< ref "docs/dev/python/table/dependency_management" >}}#python-dependency-in-javascala-program)。 +如果 language tag 是 PYTHON,而当前程序是 Java/Scala 程序或者纯 SQL 程序,则需要[配置 Python 相关的依赖]({{< ref "docs/dev/python/dependency_management" >}}#python-dependency-in-javascala-program)。 **TEMPORARY** diff --git a/docs/content/docs/dev/python/datastream/dependency_management.md b/docs/content/docs/dev/python/datastream/dependency_management.md deleted file mode 100644 index f890c242c1dde..0000000000000 --- a/docs/content/docs/dev/python/datastream/dependency_management.md +++ /dev/null @@ -1,95 +0,0 @@ ---- -title: "Dependency Management" -weight: 41 -type: docs -aliases: - - /dev/python/datastream-api-users-guide/dependency_management.html ---- - - -# Dependency Management - -# Java Dependency - -If third-party Java dependencies are used, you can specify the dependencies with the following Python DataStream APIs or -through [command line arguments]({{< ref "docs/deployment/cli" >}}#usage) directly when submitting the job. - -```python -# Use the add_jars() to add local jars and the jars will be uploaded to the cluster. -# NOTE: Only local file URLs (start with "file://") are supported. -stream_execution_environment.add_jars("file:///my/jar/path/connector.jar", ...) - -# Use the add_classpaths() to add the dependent jars URL into -# the classpath. And the URL will also be added to the classpath of the cluster. -# NOTE: The Paths must specify a protocol (e.g. file://) and users should ensure that the -# URLs are accessible on both the client and the cluster. -stream_execution_environment.add_classpaths("file:///my/jar/path/connector.jar", ...) -``` -**Note:** These APIs could be called multiple times. - -# Python Dependency - -If third-party Python dependencies are used, you can specify the dependencies with the following Python DataStream -APIs or through [command line arguments]({{< ref "docs/deployment/cli" >}}#usage) directly when submitting the job. 
- -Please make sure the uploaded python environment matches the platform that the cluster is running on. Currently only zip-format is supported. i.e. zip, jar, whl, egg, etc. - -#### add_python_file(file_path) - -Adds python file dependencies which could be python files, python packages or local directories. They will be added to the PYTHONPATH of the python UDF worker. - -```python -stream_execution_environment.add_python_file(file_path) -``` - -#### set_python_requirements(requirements_file_path, requirements_cache_dir=None) - -Specifies a requirements.txt file which defines the third-party dependencies. These dependencies will be installed to a temporary directory and added to the PYTHONPATH of the python UDF worker. For the dependencies which could not be accessed in the cluster, a directory which contains the installation packages of these dependencies could be specified using the parameter "requirements_cached_dir". It will be uploaded to the cluster to support offline installation. - -```python -# commands executed in shell -echo numpy==1.16.5 > requirements.txt -pip download -d cached_dir -r requirements.txt --no-binary :all: - -# python code -stream_execution_environment.set_python_requirements("/path/to/requirements.txt", "cached_dir") -``` - -Please make sure the installation packages matches the platform of the cluster and the python version used. These packages will be installed using pip, so also make sure the version of Pip (version >= 7.1.0) and the version of Setuptools (version >= 37.0.0). - -#### add_python_archive(archive_path, target_dir=None) - -Adds a python archive file dependency. The file will be extracted to the working directory of python UDF worker. If the parameter "target_dir" is specified, the archive file will be extracted to a directory named "target_dir". Otherwise, the archive file will be extracted to a directory with the same name of the archive file. - -```python -# command executed in shell -# assert the relative path of python interpreter is py_env/bin/python -zip -r py_env.zip py_env - -# python code -stream_execution_environment.add_python_archive("/path/to/py_env.zip") -# or -stream_execution_environment.add_python_archive("/path/to/py_env.zip", "myenv") - -# the files contained in the archive file can be accessed in UDF -def my_func(): - with open("myenv/py_env/data/data.txt") as f: - ... -``` diff --git a/docs/content/docs/dev/python/dependency_management.md b/docs/content/docs/dev/python/dependency_management.md new file mode 100644 index 0000000000000..fc96b9cad7e3c --- /dev/null +++ b/docs/content/docs/dev/python/dependency_management.md @@ -0,0 +1,316 @@ +--- +title: "Dependency Management" +weight: 30 +type: docs +aliases: + - /dev/python/table-api-users-guide/dependency_management.html + - /dev/python/datastream-api-users-guide/dependency_management.html +--- + + +# Dependency Management + +Users may need to access third-party Python libraries in Python user-defined functions. +In addition, in scenarios such as machine learning prediction, users may want to load a machine +learning model inside the Python user-defined functions. When the PyFlink job is executed +locally, users could install the third-party Python libraries in the local Python environment, +download the machine learning model to local, etc. However, this approach doesn't work well when +users want to submit the PyFlink jobs to remote clusters. In the following sections, +we will introduce the options provided in PyFlink for these requirements. 
+ +## JAR Dependencies + +If third-party JARs are used, you can specify the JARs in the Python Table API as following: + +```python +# Specify a list of jar URLs via "pipeline.jars". The jars are separated by ";" +# and will be uploaded to the cluster. +# NOTE: Only local file URLs (start with "file://") are supported. +table_env.get_config().get_configuration().set_string("pipeline.jars", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar") + +# Specify a list of URLs via "pipeline.classpaths". The URLs are separated by ";" +# and will be added to the classpath during job execution. +# NOTE: The paths must specify a protocol (e.g. file://) and users should ensure that the URLs are accessible on both the client and the cluster. +table_env.get_config().get_configuration().set_string("pipeline.classpaths", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar") +``` + +or in the Python DataStream API as following: + +```python +# Use the add_jars() to add local jars and the jars will be uploaded to the cluster. +# NOTE: Only local file URLs (start with "file://") are supported. +stream_execution_environment.add_jars("file:///my/jar/path/connector1.jar", "file:///my/jar/path/connector2.jar") + +# Use the add_classpaths() to add the dependent jars URLs into the classpath. +# The URLs will also be added to the classpath of both the client and the cluster. +# NOTE: The paths must specify a protocol (e.g. file://) and users should ensure that the +# URLs are accessible on both the client and the cluster. +stream_execution_environment.add_classpaths("file:///my/jar/path/connector1.jar", "file:///my/jar/path/connector2.jar") +``` + +or through the [command line arguments]({{< ref "docs/deployment/cli" >}}#submitting-pyflink-jobs) `--jarfile` when submitting the job. + +Note It only supports to specify one jar file with the command +line argument `--jarfile` and so you need to build a fat jar if there are multiple jar files. + + +## Python Dependencies + +If third-party Python dependencies are used, you can specify the dependencies with the following +Python Table APIs or through [command line arguments]({{< ref "docs/deployment/cli" >}}#submitting-pyflink-jobs) +directly when submitting the job. + +### Python libraries + +You may want to use third-part Python libraries in Python user-defined functions. +There are multiple ways to specify the Python libraries. + +You could specify them inside the code using Python Table API as following: + +```python +table_env.add_python_file(file_path) +``` + +or using Python DataStream API as following: + +```python +stream_execution_environment.add_python_file(file_path) +``` + +You could also specify the Python libraries using configuration +[`python.files`]({{< ref "docs/dev/python/python_config" >}}#python-files) +or via [command line arguments]({{< ref "docs/deployment/cli" >}}#submitting-pyflink-jobs) `-pyfs` or `--pyFiles` +when submitting the job. + +Note The Python libraries dependencies could be files or +local directories. They will be added to the PYTHONPATH of the Python UDF worker. + +### requirements.txt + +It also allows to specify a `requirements.txt` file which defines the third-party Python dependencies. +These Python dependencies will be installed to the working directory and added to the PYTHONPATH of +the Python UDF worker. 
+ +You could prepare the `requirements.txt` manually as following: + +```shell +echo numpy==1.16.5 > requirements.txt +echo pandas==1.0.0 > requirements.txt +``` + +or using `pip freeze` which lists all the packages installed in the current Python environment: + +```shell +pip freeze > requirements.txt +``` + +The requirements.txt may look like the following: + +```shell +numpy==1.16.5 +pandas==1.0.0 +``` + +You could also manually edit it by removing unnecessary items or adding extra items, etc. + +You could then specify the `requirements.txt` file inside the code using Python Table API as following: + +```python +# requirements_cache_dir is optional +table_env.set_python_requirements( + requirements_file_path="/path/to/requirements.txt", + requirements_cache_dir="cached_dir") +``` + +or using Python DataStream API as following: + +```python +# requirements_cache_dir is optional +stream_execution_environment.set_python_requirements( + requirements_file_path="/path/to/requirements.txt", + requirements_cache_dir="cached_dir") +``` + +Note For the dependencies which could not be accessed in +the cluster, a directory which contains the installation packages of these dependencies could be +specified using the parameter `requirements_cached_dir`. It will be uploaded to the cluster to +support offline installation. You could prepare the `cached_dir` as following: + +```shell +pip download -d cached_dir -r requirements.txt --no-binary :all: +``` + +Note Please make sure that the prepared packages match +the platform of the cluster, and the Python version used. + +You could also specify the `requirements.txt` file using configuration +[`python.requirements`]({{< ref "docs/dev/python/python_config" >}}#python-requirements) +or via [command line arguments]({{< ref "docs/deployment/cli" >}}#submitting-pyflink-jobs) +`-pyreq` or `--pyRequirements` when submitting the job. + +Note It will install the packages specified in the +`requirements.txt` file using pip, so please make sure that pip (version >= 7.1.0) +and setuptools (version >= 37.0.0) are available. + +### Archives + +You may also want to specify archive files. The archive files could be used to specify custom +Python virtual environments, data files, etc. + +You could specify the archive files inside the code using Python Table API as following: + +```python +table_env.add_python_archive(archive_path="/path/to/archive_file", target_dir=None) +``` + +or using Python DataStream API as following: + +```python +stream_execution_environment.add_python_archive(archive_path="/path/to/archive_file", target_dir=None) +``` + +Note The parameter `target_dir` is optional. If specified, +the archive file will be extracted to a directory named `target_dir` during execution. Otherwise, +the archive file will be extracted to a directory with the same name as the archive file. + +Suppose you have specified the archive file as following: + +```python +table_env.add_python_archive("/path/to/py_env.zip", "myenv") +``` + +Then, you could access the content of the archive file in Python user-defined functions as following: + +```python +def my_udf(): + with open("myenv/py_env/data/data.txt") as f: + ... +``` + +If you have not specified the parameter `target_dir`: + +```python +table_env.add_python_archive("/path/to/py_env.zip") +``` + +You could then access the content of the archive file in Python user-defined functions as following: + +```python +def my_udf(): + with open("py_env.zip/py_env/data/data.txt") as f: + ... 
+``` + +Note The archive file will be extracted to the working +directory of python UDF worker and so you could access the files inside the archive file using +relative path. + +You could also specify the archive files using configuration +[`python.archives`]({{< ref "docs/dev/python/python_config" >}}#python-archives) +or via [command line arguments]({{< ref "docs/deployment/cli" >}}#submitting-pyflink-jobs) +`-pyarch` or `--pyArchives` when submitting the job. + +Note If the archive file contains a Python virtual environment, +please make sure that the Python virtual environment matches the platform that the cluster is running on. + +Note Currently, only zip-format is supported, i.e. zip, jar, whl, egg, etc. + +### Python interpreter + +It supports to specify the path of the Python interpreter to execute Python worker. + +You could specify the Python interpreter inside the code using Python Table API as following: + +```python +table_env.set_python_executable("/path/to/python") +``` + +or using Python DataStream API as following: + +```python +stream_execution_environment.set_python_executable("/path/to/python") +``` + +It also supports to use the Python interpreter inside an archive file. + +```python +# Python Table API +table_env.add_python_archive("/path/to/py_env.zip", "venv") +table_env.set_python_executable("venv/py_env/bin/python") + +# Python DataStream API +stream_execution_environment.add_python_archive("/path/to/py_env.zip", "venv") +stream_execution_environment.set_python_executable("venv/py_env/bin/python") +``` + +You could also specify the Python interpreter using configuration +[`python.executable`]({{< ref "docs/dev/python/python_config" >}}#python-executable) +or via [command line arguments]({{< ref "docs/deployment/cli" >}}#submitting-pyflink-jobs) +`-pyexec` or `--pyExecutable` when submitting the job. + +Note If the path of the Python interpreter refers to the +uploaded Python archive file, it should be a relative path instead of absolute path. + +### Python interpreter of client + +Python is needed at the client side to parse the Python user-defined functions during +compiling the job. + +You could specify the custom Python environment used at the client side by activating it in the current session. + +```shell +source my_env/bin/activate +``` + +or specify it using configuration +[`python.client.executable`]({{< ref "docs/dev/python/python_config" >}}#python-client-executable) +or environment variable [PYFLINK_CLIENT_EXECUTABLE]({{< ref "docs/dev/python/environment_variables" >}}) + +## How to specify Python Dependencies in Java/Scala Program + +It also supports to use Python user-defined functions in the Java Table API programs or pure SQL programs. 
+The following code shows a simple example on how to use the Python user-defined functions in a +Java Table API program: + +```java +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; + +TableEnvironment tEnv = TableEnvironment.create( + EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()); +tEnv.getConfig().getConfiguration().set(CoreOptions.DEFAULT_PARALLELISM, 1); + +// register the Python UDF +tEnv.executeSql("create temporary system function add_one as 'add_one.add_one' language python"); + +tEnv.createTemporaryView("source", tEnv.fromValues(1L, 2L, 3L).as("a")); + +// use Python UDF in the Java Table API program +tEnv.executeSql("select add_one(a) as a from source").collect(); +``` + +You can refer to the SQL statement about [CREATE FUNCTION]({{< ref "docs/dev/table/sql/create" >}}#create-function) +for more details on how to create Python user-defined functions using SQL statements. + +The Python dependencies could then be specified via the Python [config options]({{< ref "docs/dev/python/python_config" >}}#python-options), +such as **python.archives**, **python.files**, **python.requirements**, **python.client.executable**, +**python.executable**. etc or through [command line arguments]({{< ref "docs/deployment/cli" >}}#usage) +when submitting the job. diff --git a/docs/content/docs/dev/python/faq.md b/docs/content/docs/dev/python/faq.md index cee91e8482826..592e2a4ed990d 100644 --- a/docs/content/docs/dev/python/faq.md +++ b/docs/content/docs/dev/python/faq.md @@ -65,7 +65,7 @@ $ # specify the path of the python interpreter which is used to execute the pyth $ table_env.get_config().set_python_executable("venv.zip/venv/bin/python") ``` -For details on the usage of `add_python_archive` and `set_python_executable`, you can refer to [the relevant documentation]({{< ref "docs/dev/python/table/dependency_management" >}}#usage). +For details on the usage of `add_python_archive` and `set_python_executable`, you can refer to [the relevant documentation]({{< ref "docs/dev/python/dependency_management" >}}#usage). ## Adding Jar Files @@ -80,7 +80,7 @@ table_env.get_config().get_configuration().set_string("pipeline.jars", "file:/// table_env.get_config().get_configuration().set_string("pipeline.classpaths", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar") ``` -For details about the APIs of adding Java dependency, you can refer to [the relevant documentation]({{< ref "docs/dev/python/table/dependency_management" >}}#java-dependency-in-python-program) +For details about the APIs of adding Java dependency, you can refer to [the relevant documentation]({{< ref "docs/dev/python/dependency_management" >}}#java-dependency-in-python-program) ## Adding Python Files You can use the command-line arguments `pyfs` or the API `add_python_file` of `TableEnvironment` to add python file dependencies which could be python files, python packages or local directories. 
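+As a minimal sketch of the API option (the paths and the `my_udfs.py` module below are placeholders, not files shipped with Flink):
+
+```python
+# both single .py files and local directories containing packages are accepted;
+# add_python_file can be called multiple times to add several dependencies
+table_env.add_python_file("/path/to/my_udfs.py")
+table_env.add_python_file("/path/to/my_package_dir")
+```
+
+When submitting via the command line instead, the same files can be passed with `pyfs`.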
diff --git a/docs/content/docs/dev/python/table/dependency_management.md b/docs/content/docs/dev/python/table/dependency_management.md deleted file mode 100644 index 2a68f3363d71f..0000000000000 --- a/docs/content/docs/dev/python/table/dependency_management.md +++ /dev/null @@ -1,134 +0,0 @@ ---- -title: "Dependency Management" -weight: 46 -type: docs -aliases: - - /dev/python/table-api-users-guide/dependency_management.html ---- - - -# Dependency Management - -# Java Dependency in Python Program - -If third-party Java dependencies are used, you can specify the dependencies with the following Python Table APIs or through [command line arguments]({{< ref "docs/deployment/cli" >}}#usage) directly when submitting the job. - -```python -# Specify a list of jar URLs via "pipeline.jars". The jars are separated by ";" -#and will be uploaded to the cluster. -# NOTE: Only local file URLs (start with "file://") are supported. -table_env.get_config().get_configuration().set_string("pipeline.jars", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar") - -# Specify a list of URLs via "pipeline.classpaths". The URLs are separated by ";" -# and will be added to the classpath of the cluster. -# NOTE: The Paths must specify a protocol (e.g. file://) and users should ensure that the URLs are accessible on both the client and the cluster. -table_env.get_config().get_configuration().set_string("pipeline.classpaths", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar") -``` - -# Python Dependency in Python Program - -If third-party Python dependencies are used, you can specify the dependencies with the following Python Table APIs or through [command line arguments]({{< ref "docs/deployment/cli" >}}#usage) directly when submitting the job. - -#### add_python_file(file_path) - -Adds python file dependencies which could be python files, python packages or local directories. They will be added to the PYTHONPATH of the python UDF worker. - -```python -table_env.add_python_file(file_path) -``` - -#### set_python_requirements(requirements_file_path, requirements_cache_dir=None) - -Specifies a requirements.txt file which defines the third-party dependencies. These dependencies will be installed to a temporary directory and added to the PYTHONPATH of the python UDF worker. For the dependencies which could not be accessed in the cluster, a directory which contains the installation packages of these dependencies could be specified using the parameter "requirements_cached_dir". It will be uploaded to the cluster to support offline installation. - -```python -# commands executed in shell -echo numpy==1.16.5 > requirements.txt -pip download -d cached_dir -r requirements.txt --no-binary :all: - -# python code -table_env.set_python_requirements("/path/to/requirements.txt", "cached_dir") -``` - -Please make sure the installation packages matches the platform of the cluster and the python version used. These packages will be installed using pip, so also make sure the version of Pip (version >= 7.1.0) and the version of SetupTools (version >= 37.0.0). - -#### add_python_archive(archive_path, target_dir=None) - -Adds a python archive file dependency. The file will be extracted to the working directory of python UDF worker. If the parameter "target_dir" is specified, the archive file will be extracted to a directory named "target_dir". Otherwise, the archive file will be extracted to a directory with the same name of the archive file. 
- -```python -# command executed in shell -# assert the relative path of python interpreter is py_env/bin/python -zip -r py_env.zip py_env - -# python code -table_env.add_python_archive("/path/to/py_env.zip") -# or -table_env.add_python_archive("/path/to/py_env.zip", "myenv") - -# the files contained in the archive file can be accessed in UDF -def my_udf(): - with open("myenv/py_env/data/data.txt") as f: - ... -``` - -Please make sure the uploaded python environment matches the platform that the cluster is running on. Currently only zip-format is supported. i.e. zip, jar, whl, egg, etc. - -#### set_python_executable(python_exec) - -Sets the path of the python interpreter which is used to execute the python udf workers, e.g., "/usr/local/bin/python3". - -```python -table_env.add_python_archive("/path/to/py_env.zip") -table_env.get_config().set_python_executable("py_env.zip/py_env/bin/python") -``` - -Please note that if the path of the python interpreter comes from the uploaded python archive, the path specified in set_python_executable should be a relative path. - -Please make sure that the specified environment matches the platform that the cluster is running on. - -# Python Dependency in Java/Scala Program - -It also supports to use Python UDFs in the Java Table API programs or pure SQL programs. The following example shows how to -use the Python UDFs in a Java Table API program: - -```java -import org.apache.flink.configuration.CoreOptions; -import org.apache.flink.table.api.EnvironmentSettings; -import org.apache.flink.table.api.TableEnvironment; - -TableEnvironment tEnv = TableEnvironment.create( - EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()); -tEnv.getConfig().getConfiguration().set(CoreOptions.DEFAULT_PARALLELISM, 1); - -// register the Python UDF -tEnv.executeSql("create temporary system function add_one as 'add_one.add_one' language python"); - -tEnv.createTemporaryView("source", tEnv.fromValues(1L, 2L, 3L).as("a")); - -// use Python UDF in the Java Table API program -tEnv.executeSql("select add_one(a) as a from source").collect(); -``` - -You can refer to the SQL statement about [CREATE FUNCTION]({{< ref "docs/dev/table/sql/create" >}}#create-function) for more details -on how to create Python user-defined functions using SQL statements. - -The Python dependencies could be specified via the Python [config options]({{< ref "docs/dev/python/python_config" >}}#python-options), -such as **python.archives**, **python.files**, **python.requirements**, **python.client.executable**, **python.executable**. etc or through [command line arguments]({{< ref "docs/deployment/cli" >}}#usage) when submitting the job. 
diff --git a/docs/content/docs/dev/python/table/intro_to_table_api.md b/docs/content/docs/dev/python/table/intro_to_table_api.md index d7b74d36f400e..6e2cfbfac3790 100644 --- a/docs/content/docs/dev/python/table/intro_to_table_api.md +++ b/docs/content/docs/dev/python/table/intro_to_table_api.md @@ -111,7 +111,7 @@ The `TableEnvironment` is responsible for: * Executing SQL queries, see [SQL]({{< ref "docs/dev/table/sql/overview" >}}) for more details * Registering user-defined (scalar, table, or aggregation) functions, see [General User-defined Functions]({{< ref "docs/dev/python/table/udfs/python_udfs" >}}) and [Vectorized User-defined Functions]({{< ref "docs/dev/python/table/udfs/vectorized_python_udfs" >}}) for more details * Configuring the job, see [Python Configuration]({{< ref "docs/dev/python/python_config" >}}) for more details -* Managing Python dependencies, see [Dependency Management]({{< ref "docs/dev/python/table/dependency_management" >}}) for more details +* Managing Python dependencies, see [Dependency Management]({{< ref "docs/dev/python/dependency_management" >}}) for more details * Submitting the jobs for execution Currently there are 2 planners available: flink planner and blink planner. diff --git a/docs/content/docs/dev/python/table/python_table_api_connectors.md b/docs/content/docs/dev/python/table/python_table_api_connectors.md index d18ad4b87a62d..1f8ad70d514fa 100644 --- a/docs/content/docs/dev/python/table/python_table_api_connectors.md +++ b/docs/content/docs/dev/python/table/python_table_api_connectors.md @@ -32,7 +32,7 @@ This page describes how to use connectors in PyFlink and highlights the details ## Download connector and format jars -Since Flink is a Java/Scala-based project, for both connectors and formats, implementations are available as jars that need to be specified as job [dependencies]({{< ref "docs/dev/python/table/dependency_management" >}}). +Since Flink is a Java/Scala-based project, for both connectors and formats, implementations are available as jars that need to be specified as job [dependencies]({{< ref "docs/dev/python/dependency_management" >}}). ```python diff --git a/docs/content/docs/dev/python/table/table_environment.md b/docs/content/docs/dev/python/table/table_environment.md index e4b285f989397..3baa67185698d 100644 --- a/docs/content/docs/dev/python/table/table_environment.md +++ b/docs/content/docs/dev/python/table/table_environment.md @@ -514,7 +514,7 @@ For more details about the different kinds of UDFs, please refer to [User Define ### Dependency Management These APIs are used to manage the Python dependencies which are required by the Python UDFs. -Please refer to the [Dependency Management]({{< ref "docs/dev/python/table/dependency_management" >}}#python-dependency-in-python-program) documentation for more details. +Please refer to the [Dependency Management]({{< ref "docs/dev/python/dependency_management" >}}#python-dependency-in-python-program) documentation for more details.
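+For quick orientation, the following sketch shows how these APIs are typically combined in a job (all paths are placeholders):
+
+```python
+# ship a Python file, a requirements.txt and an archived virtual environment
+table_env.add_python_file("/path/to/my_udfs.py")
+table_env.set_python_requirements("/path/to/requirements.txt")
+table_env.add_python_archive("/path/to/py_env.zip", "venv")
+# use the interpreter shipped inside the archive (note the relative path)
+table_env.get_config().set_python_executable("venv/py_env/bin/python")
+```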
diff --git a/docs/content/docs/dev/table/sql/create.md b/docs/content/docs/dev/table/sql/create.md index 07a2d956f310a..95f78ff30419f 100644 --- a/docs/content/docs/dev/table/sql/create.md +++ b/docs/content/docs/dev/table/sql/create.md @@ -588,7 +588,7 @@ If the language tag is JAVA/SCALA, the identifier is the full classpath of the U If the language tag is PYTHON, the identifier is the fully qualified name of the UDF, e.g. `pyflink.table.tests.test_udf.add`. For the implementation of Python UDF, please refer to [Python UDFs]({{< ref "docs/dev/python/table/udfs/python_udfs" >}}) for more details. -If the language tag is PYTHON, however the current program is written in Java/Scala or pure SQL, then you need to [configure the Python dependencies]({{< ref "docs/dev/python/table/dependency_management" >}}#python-dependency-in-javascala-program). +If the language tag is PYTHON, however the current program is written in Java/Scala or pure SQL, then you need to [configure the Python dependencies]({{< ref "docs/dev/python/dependency_management" >}}#python-dependency-in-javascala-program). **TEMPORARY** From 21c909a7845f93acf9aea096d6af4dc0da8d3403 Mon Sep 17 00:00:00 2001 From: Dian Fu Date: Fri, 30 Apr 2021 17:21:46 +0800 Subject: [PATCH 2/3] minor --- .../datastream/dependency_management.md | 95 ------- .../docs/dev/python/dependency_management.md | 266 +++++++++++++++--- .../docs/dev/python/dependency_management.md | 41 ++- 3 files changed, 245 insertions(+), 157 deletions(-) delete mode 100644 docs/content.zh/docs/dev/python/datastream/dependency_management.md diff --git a/docs/content.zh/docs/dev/python/datastream/dependency_management.md b/docs/content.zh/docs/dev/python/datastream/dependency_management.md deleted file mode 100644 index 4e8ea85736368..0000000000000 --- a/docs/content.zh/docs/dev/python/datastream/dependency_management.md +++ /dev/null @@ -1,95 +0,0 @@ ---- -title: "Dependency Management" -weight: 41 -type: docs -aliases: - - /zh/dev/python/datastream-api-users-guide/dependency_management.html ---- - - -# Dependency Management - -# Java Dependency - -If third-party Java dependencies are used, you can specify the dependencies with the following Python DataStream APIs or -through [command line arguments]({{< ref "docs/deployment/cli" >}}#usage) directly when submitting the job. - -```python -# Use the add_jars() to add local jars and the jars will be uploaded to the cluster. -# NOTE: Only local file URLs (start with "file://") are supported. -stream_execution_environment.add_jars("file:///my/jar/path/connector.jar", ...) - -# Use the add_classpaths() to add the dependent jars URL into -# the classpath. And the URL will also be added to the classpath of the cluster. -# NOTE: The Paths must specify a protocol (e.g. file://) and users should ensure that the -# URLs are accessible on both the client and the cluster. -stream_execution_environment.add_classpaths("file:///my/jar/path/connector.jar", ...) -``` -**Note:** These APIs could be called multiple times. - -# Python Dependency - -If third-party Python dependencies are used, you can specify the dependencies with the following Python DataStream -APIs or through [command line arguments]({{< ref "docs/deployment/cli" >}}#usage) directly when submitting the job. - -Please make sure the uploaded python environment matches the platform that the cluster is running on. Currently only zip-format is supported. i.e. zip, jar, whl, egg, etc. 
- -#### add_python_file(file_path) - -Adds python file dependencies which could be python files, python packages or local directories. They will be added to the PYTHONPATH of the python UDF worker. - -```python -stream_execution_environment.add_python_file(file_path) -``` - -#### set_python_requirements(requirements_file_path, requirements_cache_dir=None) - -Specifies a requirements.txt file which defines the third-party dependencies. These dependencies will be installed to a temporary directory and added to the PYTHONPATH of the python UDF worker. For the dependencies which could not be accessed in the cluster, a directory which contains the installation packages of these dependencies could be specified using the parameter "requirements_cached_dir". It will be uploaded to the cluster to support offline installation. - -```python -# commands executed in shell -echo numpy==1.16.5 > requirements.txt -pip download -d cached_dir -r requirements.txt --no-binary :all: - -# python code -stream_execution_environment.set_python_requirements("/path/to/requirements.txt", "cached_dir") -``` - -Please make sure the installation packages matches the platform of the cluster and the python version used. These packages will be installed using pip, so also make sure the version of Pip (version >= 7.1.0) and the version of Setuptools (version >= 37.0.0). - -#### add_python_archive(archive_path, target_dir=None) - -Adds a python archive file dependency. The file will be extracted to the working directory of python UDF worker. If the parameter "target_dir" is specified, the archive file will be extracted to a directory named "target_dir". Otherwise, the archive file will be extracted to a directory with the same name of the archive file. - -```python -# command executed in shell -# assert the relative path of python interpreter is py_env/bin/python -zip -r py_env.zip py_env - -# python code -stream_execution_environment.add_python_archive("/path/to/py_env.zip") -# or -stream_execution_environment.add_python_archive("/path/to/py_env.zip", "myenv") - -# the files contained in the archive file can be accessed in UDF -def my_func(): - with open("myenv/py_env/data/data.txt") as f: - ... -``` diff --git a/docs/content.zh/docs/dev/python/dependency_management.md b/docs/content.zh/docs/dev/python/dependency_management.md index aceeadd3acc8d..e8f8ba75a1e96 100644 --- a/docs/content.zh/docs/dev/python/dependency_management.md +++ b/docs/content.zh/docs/dev/python/dependency_management.md @@ -4,6 +4,7 @@ weight: 30 type: docs aliases: - /zh/dev/python/table-api-users-guide/dependency_management.html + - /zh/dev/python/datastream-api-users-guide/dependency_management.html --- -# 依赖管理 +# Dependency Management -## Java Dependency in Python Program +There are requirements to use dependencies inside the Python API programs. For example, users +may need to use third-party Python libraries in Python user-defined functions. +In addition, in scenarios such as machine learning prediction, users may want to load a machine +learning model inside the Python user-defined functions. -如果应用了第三方 Java 依赖, 用户可以通过以下 Python Table API进行配置,或者在提交作业时直接通过[命令行参数]({{< ref "docs/deployment/cli" >}}#usage) 配置. +When the PyFlink job is executed locally, users could install the third-party Python libraries into +the local Python environment, download the machine learning model to local, etc. +However, this approach doesn't work well when users want to submit the PyFlink jobs to remote clusters. 
+In the following sections, we will introduce the options provided in PyFlink for these requirements. + +## JAR Dependencies + +If third-party JARs are used, you can specify the JARs in the Python Table API as following: ```python -# 通过 "pipeline.jars" 参数指定 jar 包 URL列表, 每个 URL 使用 ";" 分隔。这些 jar 包最终会被上传到集群中。 -# 注意:当前支持通过本地文件 URL 进行上传(以 "file://" 开头)。 +# Specify a list of jar URLs via "pipeline.jars". The jars are separated by ";" +# and will be uploaded to the cluster. +# NOTE: Only local file URLs (start with "file://") are supported. table_env.get_config().get_configuration().set_string("pipeline.jars", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar") -# 通过 "pipeline.jars" 参数指定依赖 URL 列表, 这些 URL 通过 ";" 分隔,它们最终会被添加到集群的 classpath 中。 -# 注意: 必须指定这些文件路径的协议 (如 file://), 并确保这些 URL 在本地客户端和集群都能访问。 +# Specify a list of URLs via "pipeline.classpaths". The URLs are separated by ";" +# and will be added to the classpath during job execution. +# NOTE: The paths must specify a protocol (e.g. file://) and users should ensure that the URLs are accessible on both the client and the cluster. table_env.get_config().get_configuration().set_string("pipeline.classpaths", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar") ``` -## Python 依赖管理 +or in the Python DataStream API as following: + +```python +# Use the add_jars() to add local jars and the jars will be uploaded to the cluster. +# NOTE: Only local file URLs (start with "file://") are supported. +stream_execution_environment.add_jars("file:///my/jar/path/connector1.jar", "file:///my/jar/path/connector2.jar") + +# Use the add_classpaths() to add the dependent jars URLs into the classpath. +# The URLs will also be added to the classpath of both the client and the cluster. +# NOTE: The paths must specify a protocol (e.g. file://) and users should ensure that the +# URLs are accessible on both the client and the cluster. +stream_execution_environment.add_classpaths("file:///my/jar/path/connector1.jar", "file:///my/jar/path/connector2.jar") +``` + +or through the [command line arguments]({{< ref "docs/deployment/cli" >}}#submitting-pyflink-jobs) `--jarfile` when submitting the job. -如果程序中应用到了 Python 第三方依赖,用户可以使用以下 Table API 配置依赖信息,或在提交作业时直接通过命令行参数配置。 +Note It only supports to specify one jar file with the command +line argument `--jarfile` and so you need to build a fat jar if there are multiple jar files. -#### add_python_file(file_path) -添加 Python 文件依赖,可以是 Python文件、Python 包或本地文件目录。他们最终会被添加到 Python Worker 的 PYTHONPATH 中,从而让 Python 函数能够正确访问读取。 +## Python Dependencies + +### Python libraries + +You may want to use third-part Python libraries in Python user-defined functions. +There are multiple ways to specify the Python libraries. 
+ +You could specify them inside the code using Python Table API as following: ```python table_env.add_python_file(file_path) ``` -#### set_python_requirements(requirements_file_path, requirements_cache_dir=None) - -配置一个 requirements.txt 文件用于指定 Python 第三方依赖,这些依赖会被安装到一个临时目录并添加到 Python Worker 的 PYTHONPATH 中。对于在集群中无法访问的外部依赖,用户可以通过 "requirements_cached_dir" 参数指定一个包含这些依赖安装包的目录,这个目录文件会被上传到集群并实现离线安装。 +or using Python DataStream API as following: ```python -# commands executed in shell +stream_execution_environment.add_python_file(file_path) +``` + +You could also specify the Python libraries using configuration +[`python.files`]({{< ref "docs/dev/python/python_config" >}}#python-files) +or via [command line arguments]({{< ref "docs/deployment/cli" >}}#submitting-pyflink-jobs) `-pyfs` or `--pyFiles` +when submitting the job. + +Note The Python libraries could be local files or +local directories. They will be added to the PYTHONPATH of the Python UDF worker. + +### requirements.txt + +It also allows to specify a `requirements.txt` file which defines the third-party Python dependencies. +These Python dependencies will be installed into the working directory and added to the PYTHONPATH of +the Python UDF worker. + +You could prepare the `requirements.txt` manually as following: + +```shell echo numpy==1.16.5 > requirements.txt -pip download -d cached_dir -r requirements.txt --no-binary :all: +echo pandas==1.0.0 > requirements.txt +``` -# python code -table_env.set_python_requirements("/path/to/requirements.txt", "cached_dir") +or using `pip freeze` which lists all the packages installed in the current Python environment: + +```shell +pip freeze > requirements.txt ``` -请确保这些依赖安装包和集群运行环境所使用的 Python 版本相匹配。此外,这些依赖将通过 Pip 安装, 请确保 Pip 的版本(version >= 7.1.0) 和 Setuptools 的版本(version >= 37.0.0)符合要求。 +The content of the requirements.txt file may look like the following: -#### add_python_archive(archive_path, target_dir=None) +```shell +numpy==1.16.5 +pandas==1.0.0 +``` -添加 Python 归档文件依赖。归档文件内的文件将会被提取到 Python Worker 的工作目录下。如果指定了 "target_dir" 参数,归档文件则会被提取到指定名字的目录文件中,否则文件被提取到和归档文件名相同的目录中。 +You could manually edit it by removing unnecessary entries or adding extra entries, etc. + +The `requirements.txt` file could then be specified inside the code using Python Table API as following: ```python -# command executed in shell -# assert the relative path of python interpreter is py_env/bin/python -zip -r py_env.zip py_env +# requirements_cache_dir is optional +table_env.set_python_requirements( + requirements_file_path="/path/to/requirements.txt", + requirements_cache_dir="cached_dir") +``` -# python code -table_env.add_python_archive("/path/to/py_env.zip") -# or +or using Python DataStream API as following: + +```python +# requirements_cache_dir is optional +stream_execution_environment.set_python_requirements( + requirements_file_path="/path/to/requirements.txt", + requirements_cache_dir="cached_dir") +``` + +Note For the dependencies which could not be accessed in +the cluster, a directory which contains the installation packages of these dependencies could be +specified using the parameter `requirements_cached_dir`. It will be uploaded to the cluster to +support offline installation. You could prepare the `requirements_cache_dir` as following: + +```shell +pip download -d cached_dir -r requirements.txt --no-binary :all: +``` + +Note Please make sure that the prepared packages match +the platform of the cluster, and the Python version used. 
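+For example, if the cluster runs on 64-bit Linux while the client does not, binary wheels matching the cluster could be downloaded instead of source packages. This is a sketch assuming a manylinux x86_64 cluster running Python 3.7; adjust the platform tag and Python version to your environment:
+
+```shell
+pip download -d cached_dir -r requirements.txt \
+    --only-binary :all: --platform manylinux1_x86_64 --python-version 37
+```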
+ +You could also specify the `requirements.txt` file using configuration +[`python.requirements`]({{< ref "docs/dev/python/python_config" >}}#python-requirements) +or via [command line arguments]({{< ref "docs/deployment/cli" >}}#submitting-pyflink-jobs) +`-pyreq` or `--pyRequirements` when submitting the job. + +Note It will install the packages specified in the +`requirements.txt` file using pip, so please make sure that pip (version >= 7.1.0) +and setuptools (version >= 37.0.0) are available. + +### Archives + +You may also want to specify archive files. The archive files could be used to specify custom +Python virtual environments, data files, etc. + +You could specify the archive files inside the code using Python Table API as following: + +```python +table_env.add_python_archive(archive_path="/path/to/archive_file", target_dir=None) +``` + +or using Python DataStream API as following: + +```python +stream_execution_environment.add_python_archive(archive_path="/path/to/archive_file", target_dir=None) +``` + +Note The parameter `target_dir` is optional. If specified, +the archive file will be extracted to a directory with the specified name of `target_dir` during execution. +Otherwise, the archive file will be extracted to a directory with the same name as the archive file. + +Suppose you have specified the archive file as following: + +```python table_env.add_python_archive("/path/to/py_env.zip", "myenv") +``` + +Then, you could access the content of the archive file in Python user-defined functions as following: -# the files contained in the archive file can be accessed in UDF +```python def my_udf(): with open("myenv/py_env/data/data.txt") as f: ... ``` -请确保上传的 Python 环境和集群运行环境匹配。目前只支持上传 zip 格式的文件,如 zip, jar, whl, egg等等。 +If you have not specified the parameter `target_dir`: -#### set_python_executable(python_exec) +```python +table_env.add_python_archive("/path/to/py_env.zip") +``` -配置用于执行 Python Worker 的 Python 解释器路径,如 "/usr/local/bin/python3"。 +You could then access the content of the archive file in Python user-defined functions as following: ```python -table_env.add_python_archive("/path/to/py_env.zip") -table_env.get_config().set_python_executable("py_env.zip/py_env/bin/python") +def my_udf(): + with open("py_env.zip/py_env/data/data.txt") as f: + ... ``` -如果 Python 解释器的路径指向上传的 Python 归档文件,那么通过 set_python_executable 设置的 Python 解释器的路径必须是相对路径。 +Note The archive file will be extracted to the working +directory of Python UDF worker and so you could access the files inside the archive file using +relative path. + +You could also specify the archive files using configuration +[`python.archives`]({{< ref "docs/dev/python/python_config" >}}#python-archives) +or via [command line arguments]({{< ref "docs/deployment/cli" >}}#submitting-pyflink-jobs) +`-pyarch` or `--pyArchives` when submitting the job. + +Note If the archive file contains a Python virtual environment, +please make sure that the Python virtual environment matches the platform that the cluster is running on. + +Note Currently, only zip-format is supported, i.e. zip, jar, whl, egg, etc. + +### Python interpreter + +It supports to specify the path of the Python interpreter to execute Python worker. 
+ +You could specify the Python interpreter inside the code using Python Table API as following: + +```python +table_env.set_python_executable("/path/to/python") +``` + +or using Python DataStream API as following: + +```python +stream_execution_environment.set_python_executable("/path/to/python") +``` + +It also supports to use the Python interpreter inside an archive file. + +```python +# Python Table API +table_env.add_python_archive("/path/to/py_env.zip", "venv") +table_env.set_python_executable("venv/py_env/bin/python") + +# Python DataStream API +stream_execution_environment.add_python_archive("/path/to/py_env.zip", "venv") +stream_execution_environment.set_python_executable("venv/py_env/bin/python") +``` + +You could also specify the Python interpreter using configuration +[`python.executable`]({{< ref "docs/dev/python/python_config" >}}#python-executable) +or via [command line arguments]({{< ref "docs/deployment/cli" >}}#submitting-pyflink-jobs) +`-pyexec` or `--pyExecutable` when submitting the job. + +Note If the path of the Python interpreter refers to the +Python archive file, relative path should be used instead of absolute path. + +### Python interpreter of client + +Python is needed at the client side to parse the Python user-defined functions during +compiling the job. + +You could specify the custom Python interpreter used at the client side by activating +it in the current session. + +```shell +source my_env/bin/activate +``` -请确保配置的 Python 环境和集群运行环境匹配。 +or specify it using configuration +[`python.client.executable`]({{< ref "docs/dev/python/python_config" >}}#python-client-executable) +or environment variable [PYFLINK_CLIENT_EXECUTABLE]({{< ref "docs/dev/python/environment_variables" >}}) -## Java/Scala程序中的Python依赖管理 +## How to specify Python Dependencies in Java/Scala Program -It also supports to use Python UDFs in the Java Table API programs or pure SQL programs. The following example shows how to use the Python UDFs in a Java Table API program: +It also supports to use Python user-defined functions in the Java Table API programs or pure SQL programs. +The following code shows a simple example on how to use the Python user-defined functions in a +Java Table API program: ```java import org.apache.flink.configuration.CoreOptions; @@ -124,8 +306,10 @@ tEnv.createTemporaryView("source", tEnv.fromValues(1L, 2L, 3L).as("a")); tEnv.executeSql("select add_one(a) as a from source").collect(); ``` -You can refer to the SQL statement about [CREATE FUNCTION]({{< ref "docs/dev/table/sql/create" >}}#create-function) for more details -on how to create Python user-defined functions using SQL statements. +You can refer to the SQL statement about [CREATE FUNCTION]({{< ref "docs/dev/table/sql/create" >}}#create-function) +for more details on how to create Python user-defined functions using SQL statements. -The Python dependencies could be specified via the Python [config options]({{< ref "docs/dev/python/python_config" >}}#python-options), -such as **python.archives**, **python.files**, **python.requirements**, **python.client.executable**, **python.executable**. etc or through [command line arguments]({{< ref "docs/deployment/cli" >}}#usage) when submitting the job. +The Python dependencies could then be specified via the Python [config options]({{< ref "docs/dev/python/python_config" >}}#python-options), +such as **python.archives**, **python.files**, **python.requirements**, **python.client.executable**, +**python.executable**. 
etc or through [command line arguments]({{< ref "docs/deployment/cli" >}}#usage) +when submitting the job. diff --git a/docs/content/docs/dev/python/dependency_management.md b/docs/content/docs/dev/python/dependency_management.md index fc96b9cad7e3c..6a2f21affccdc 100644 --- a/docs/content/docs/dev/python/dependency_management.md +++ b/docs/content/docs/dev/python/dependency_management.md @@ -27,13 +27,15 @@ under the License. # Dependency Management -Users may need to access third-party Python libraries in Python user-defined functions. +There are requirements to use dependencies inside the Python API programs. For example, users +may need to use third-party Python libraries in Python user-defined functions. In addition, in scenarios such as machine learning prediction, users may want to load a machine -learning model inside the Python user-defined functions. When the PyFlink job is executed -locally, users could install the third-party Python libraries in the local Python environment, -download the machine learning model to local, etc. However, this approach doesn't work well when -users want to submit the PyFlink jobs to remote clusters. In the following sections, -we will introduce the options provided in PyFlink for these requirements. +learning model inside the Python user-defined functions. + +When the PyFlink job is executed locally, users could install the third-party Python libraries into +the local Python environment, download the machine learning model to local, etc. +However, this approach doesn't work well when users want to submit the PyFlink jobs to remote clusters. +In the following sections, we will introduce the options provided in PyFlink for these requirements. ## JAR Dependencies @@ -73,10 +75,6 @@ line argument `--jarfile` and so you need to build a fat jar if there are multip ## Python Dependencies -If third-party Python dependencies are used, you can specify the dependencies with the following -Python Table APIs or through [command line arguments]({{< ref "docs/deployment/cli" >}}#submitting-pyflink-jobs) -directly when submitting the job. - ### Python libraries You may want to use third-part Python libraries in Python user-defined functions. @@ -99,13 +97,13 @@ You could also specify the Python libraries using configuration or via [command line arguments]({{< ref "docs/deployment/cli" >}}#submitting-pyflink-jobs) `-pyfs` or `--pyFiles` when submitting the job. -Note The Python libraries dependencies could be files or +Note The Python libraries could be local files or local directories. They will be added to the PYTHONPATH of the Python UDF worker. ### requirements.txt It also allows to specify a `requirements.txt` file which defines the third-party Python dependencies. -These Python dependencies will be installed to the working directory and added to the PYTHONPATH of +These Python dependencies will be installed into the working directory and added to the PYTHONPATH of the Python UDF worker. You could prepare the `requirements.txt` manually as following: @@ -121,16 +119,16 @@ or using `pip freeze` which lists all the packages installed in the current Pyth pip freeze > requirements.txt ``` -The requirements.txt may look like the following: +The content of the requirements.txt file may look like the following: ```shell numpy==1.16.5 pandas==1.0.0 ``` -You could also manually edit it by removing unnecessary items or adding extra items, etc. +You could manually edit it by removing unnecessary entries or adding extra entries, etc. 
-You could then specify the `requirements.txt` file inside the code using Python Table API as following: +The `requirements.txt` file could then be specified inside the code using Python Table API as following: ```python # requirements_cache_dir is optional @@ -151,7 +149,7 @@ stream_execution_environment.set_python_requirements( Note For the dependencies which could not be accessed in the cluster, a directory which contains the installation packages of these dependencies could be specified using the parameter `requirements_cached_dir`. It will be uploaded to the cluster to -support offline installation. You could prepare the `cached_dir` as following: +support offline installation. You could prepare the `requirements_cache_dir` as following: ```shell pip download -d cached_dir -r requirements.txt --no-binary :all: @@ -187,8 +185,8 @@ stream_execution_environment.add_python_archive(archive_path="/path/to/archive_f ``` Note The parameter `target_dir` is optional. If specified, -the archive file will be extracted to a directory named `target_dir` during execution. Otherwise, -the archive file will be extracted to a directory with the same name as the archive file. +the archive file will be extracted to a directory with the specified name of `target_dir` during execution. +Otherwise, the archive file will be extracted to a directory with the same name as the archive file. Suppose you have specified the archive file as following: @@ -219,7 +217,7 @@ def my_udf(): ``` Note The archive file will be extracted to the working -directory of python UDF worker and so you could access the files inside the archive file using +directory of Python UDF worker and so you could access the files inside the archive file using relative path. You could also specify the archive files using configuration @@ -266,14 +264,15 @@ or via [command line arguments]({{< ref "docs/deployment/cli" >}}#submitting-pyf `-pyexec` or `--pyExecutable` when submitting the job. Note If the path of the Python interpreter refers to the -uploaded Python archive file, it should be a relative path instead of absolute path. +Python archive file, relative path should be used instead of absolute path. ### Python interpreter of client Python is needed at the client side to parse the Python user-defined functions during compiling the job. -You could specify the custom Python environment used at the client side by activating it in the current session. +You could specify the custom Python interpreter used at the client side by activating +it in the current session. 
```shell source my_env/bin/activate From d44f98b523a36e71a1f265768f4ea55a3aeb28ff Mon Sep 17 00:00:00 2001 From: Dian Fu Date: Fri, 30 Apr 2021 18:53:46 +0800 Subject: [PATCH 3/3] minor --- docs/content.zh/docs/dev/python/dependency_management.md | 4 ++-- docs/content/docs/dev/python/dependency_management.md | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/content.zh/docs/dev/python/dependency_management.md b/docs/content.zh/docs/dev/python/dependency_management.md index e8f8ba75a1e96..c5b897e12b34e 100644 --- a/docs/content.zh/docs/dev/python/dependency_management.md +++ b/docs/content.zh/docs/dev/python/dependency_management.md @@ -237,7 +237,7 @@ It supports to specify the path of the Python interpreter to execute Python work You could specify the Python interpreter inside the code using Python Table API as following: ```python -table_env.set_python_executable("/path/to/python") +table_env.get_config().set_python_executable("/path/to/python") ``` or using Python DataStream API as following: @@ -251,7 +251,7 @@ It also supports to use the Python interpreter inside an archive file. ```python # Python Table API table_env.add_python_archive("/path/to/py_env.zip", "venv") -table_env.set_python_executable("venv/py_env/bin/python") +table_env.get_config().set_python_executable("venv/py_env/bin/python") # Python DataStream API stream_execution_environment.add_python_archive("/path/to/py_env.zip", "venv") diff --git a/docs/content/docs/dev/python/dependency_management.md b/docs/content/docs/dev/python/dependency_management.md index 6a2f21affccdc..8e0f1e5e1f23f 100644 --- a/docs/content/docs/dev/python/dependency_management.md +++ b/docs/content/docs/dev/python/dependency_management.md @@ -237,7 +237,7 @@ It supports to specify the path of the Python interpreter to execute Python work You could specify the Python interpreter inside the code using Python Table API as following: ```python -table_env.set_python_executable("/path/to/python") +table_env.get_config().set_python_executable("/path/to/python") ``` or using Python DataStream API as following: @@ -251,7 +251,7 @@ It also supports to use the Python interpreter inside an archive file. ```python # Python Table API table_env.add_python_archive("/path/to/py_env.zip", "venv") -table_env.set_python_executable("venv/py_env/bin/python") +table_env.get_config().set_python_executable("venv/py_env/bin/python") # Python DataStream API stream_execution_environment.add_python_archive("/path/to/py_env.zip", "venv")