
Commit a886497 (parent aea79b9)

[FLINK-20720][docs][python] Add documentation about output types for Python DataStream API

This closes #15733.

4 files changed (+197, -47 lines)


docs/content.zh/docs/dev/datastream/operators/overview.md

Lines changed: 2 additions & 2 deletions
@@ -3,7 +3,7 @@ title: 概览
 weight: 1
 type: docs
 aliases:
-  - /dev/stream/operators/
+  - /zh/dev/stream/operators/
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
@@ -182,7 +182,7 @@ keyedStream.reduce { _ + _ }
 {{< /tab >}}
 {{< tab "Python" >}}
 ```python
-data_stream = env.from_collection(collection=[(1, 'a'), (2, 'a'), (3, 'a'), (4, 'b')], type_info=Types.ROW([Types.INT(), Types.STRING()]))
+data_stream = env.from_collection(collection=[(1, 'a'), (2, 'a'), (3, 'a'), (4, 'b')], type_info=Types.TUPLE([Types.INT(), Types.STRING()]))
 data_stream.key_by(lambda x: x[1]).reduce(lambda a, b: (a[0] + b[0], b[1]))
 ```
 {{< /tab >}}
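
The change above swaps `Types.ROW` for `Types.TUPLE` in the `reduce` example. A plausible reading is that the reduce lambda returns plain Python tuples, so the declared element type should be a tuple type rather than a row type. The following self-contained sketch (the environment setup, the `print()` sink, and the job name are illustrative assumptions, not part of this commit) shows the corrected snippet end to end:

```python
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

# Minimal sketch of the corrected example: the reduce lambda produces plain
# Python tuples, so the elements are declared as Types.TUPLE to keep the
# declared type consistent with the values actually emitted.
env = StreamExecutionEnvironment.get_execution_environment()
data_stream = env.from_collection(
    collection=[(1, 'a'), (2, 'a'), (3, 'a'), (4, 'b')],
    type_info=Types.TUPLE([Types.INT(), Types.STRING()]))
data_stream.key_by(lambda x: x[1]) \
    .reduce(lambda a, b: (a[0] + b[0], b[1])) \
    .print()
env.execute("reduce_with_tuple_type")
```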

docs/content.zh/docs/dev/python/datastream/operators/overview.md

Lines changed: 97 additions & 22 deletions
@@ -26,28 +26,24 @@ under the License.
 
 # Operators
 
-
 Operators transform one or more DataStreams into a new DataStream. Programs can combine multiple transformations into
 sophisticated dataflow topologies.
 
-
-
-# DataStream Transformations
+## DataStream Transformations
 
 DataStream programs in Flink are regular programs that implement transformations on data streams (e.g., mapping,
 filtering, reducing). Please see [operators]({{< ref "docs/dev/datastream/operators/overview" >}})
-for an overview of the available stream transformations in Python DataStream API.
+for an overview of the available transformations in Python DataStream API.
 
-# Functions
-Most transformations require a user-defined function as input to define the functionality of the transformation. The
-following describes different ways of defining user-defined functions.
+## Functions
+Transformations accept user-defined functions as input to define the functionality of the transformations.
+The following section describes different ways of defining Python user-defined functions in Python DataStream API.
 
-## Implementing Function Interfaces
+### Implementing Function Interfaces
 Different Function interfaces are provided for different transformations in the Python DataStream API. For example,
 `MapFunction` is provided for the `map` transformation, `FilterFunction` is provided for the `filter` transformation, etc.
 Users can implement the corresponding Function interface according to the type of the transformation. Take MapFunction for
-instance:
-
+instance:
 
 ```python
 # Implementing MapFunction
@@ -60,24 +56,19 @@ data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
 mapped_stream = data_stream.map(MyMapFunction(), output_type=Types.INT())
 ```
 
-
-<span class="label label-info">Note</span> In Python DataStream API, users can specify the output type information of the transformation explicityly. If not
-specified, the output type will be `Types.PICKLED_BYTE_ARRAY` so that data will be in a form of byte array generated by
-the pickle seriallizer. For more details about the `Pickle Serialization`, please refer to [DataTypes]({{< ref "docs/dev/python/datastream/data_types" >}}#pickle-serialization).
-
-## Lambda Function
-As shown in the following example, all the transformations can also accept a lambda function to define the functionality of the transformation:
+### Lambda Function
+As shown in the following example, the transformations can also accept a lambda function to define the functionality of the transformation:
 
 ```python
 data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
 mapped_stream = data_stream.map(lambda x: x + 1, output_type=Types.INT())
 ```
 
-<span class="label label-info">Note</span> Operations ConnectedStream.map() and ConnectedStream.flat_map() do not support
-lambda function and must accept `CoMapFunction` and `CoFlatMapFunction` seperately.
+<span class="label label-info">Note</span> `ConnectedStream.map()` and `ConnectedStream.flat_map()` do not support
+lambda function and must accept `CoMapFunction` and `CoFlatMapFunction` separately.
 
-## Python Function
-Users can also use Python function:
+### Python Function
+Users could also use Python function to define the functionality of the transformation:
 
 ```python
 def my_map_func(value):
@@ -87,3 +78,87 @@ data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
 mapped_stream = data_stream.map(my_map_func, output_type=Types.INT())
 ```
 
+## Output Type
+
+Users could specify the output type information of the transformation explicitly in Python DataStream API. If not
+specified, the output type will be `Types.PICKLED_BYTE_ARRAY` by default, and the result data will be serialized using pickle serializer.
+For more details about the pickle serializer, please refer to [Pickle Serialization]({{< ref "docs/dev/python/datastream/data_types" >}}#pickle-serialization).
+
+Generally, the output type needs to be specified in the following scenarios.
+
+### Convert DataStream into Table
+
+```python
+from pyflink.common.typeinfo import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.table import StreamTableEnvironment
+
+
+def data_stream_api_demo():
+    env = StreamExecutionEnvironment.get_execution_environment()
+    t_env = StreamTableEnvironment.create(stream_execution_environment=env)
+
+    t_env.execute_sql("""
+            CREATE TABLE my_source (
+              a INT,
+              b VARCHAR
+            ) WITH (
+              'connector' = 'datagen',
+              'number-of-rows' = '10'
+            )
+        """)
+
+    ds = t_env.to_append_stream(
+        t_env.from_path('my_source'),
+        Types.ROW([Types.INT(), Types.STRING()]))
+
+    def split(s):
+        splits = s[1].split("|")
+        for sp in splits:
+            yield s[0], sp
+
+    ds = ds.map(lambda i: (i[0] + 1, i[1])) \
+           .flat_map(split, Types.TUPLE([Types.INT(), Types.STRING()])) \
+           .key_by(lambda i: i[1]) \
+           .reduce(lambda i, j: (i[0] + j[0], i[1]))
+
+    t_env.execute_sql("""
+            CREATE TABLE my_sink (
+              a INT,
+              b VARCHAR
+            ) WITH (
+              'connector' = 'print'
+            )
+        """)
+
+    table = t_env.from_data_stream(ds)
+    table_result = table.execute_insert("my_sink")
+
+    # 1) wait for the job to finish; only needed for local execution, otherwise the script may exit while the job is still running, which would shut down the minicluster too early
+    # 2) this call should be removed when the job is submitted in detach mode to a remote cluster, e.g. YARN/Standalone/K8s
+    table_result.wait()
+
+
+if __name__ == '__main__':
+    data_stream_api_demo()
+```
+
+The output type must be specified for the flat_map operation in the above example which will be used as
+the output type of the reduce operation implicitly. The reason is that
+`t_env.from_data_stream(ds)` requires the output type of `ds` must be a composite type.
+
+### Write DataStream to Sink
+
+```python
+from pyflink.common.typeinfo import Types
+
+def split(s):
+    splits = s[1].split("|")
+    for sp in splits:
+        yield s[0], sp
+
+ds.map(lambda i: (i[0] + 1, i[1]), Types.TUPLE([Types.INT(), Types.STRING()])) \
+  .sink_to(...)
+```
+
+Generally, the output type needs to be specified for the map operation in the above example if the sink only accepts special kinds of data, e.g. Row, etc.
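
For context that the hunks above only show the tail of, a complete version of the "Implementing Function Interfaces" example might look like the sketch below (the import paths, the trailing `print()` sink, and the job name are assumptions based on a recent PyFlink release, not part of this commit):

```python
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import MapFunction


# Subclass the Function interface that matches the transformation and pass an
# instance of it to the operator, together with an explicit output type.
class MyMapFunction(MapFunction):

    def map(self, value):
        return value + 1


env = StreamExecutionEnvironment.get_execution_environment()
data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
mapped_stream = data_stream.map(MyMapFunction(), output_type=Types.INT())
mapped_stream.print()
env.execute("map_function_example")
```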

docs/content/docs/dev/datastream/operators/overview.md

Lines changed: 1 addition & 1 deletion
@@ -184,7 +184,7 @@ keyedStream.reduce { _ + _ }
 {{< /tab >}}
 {{< tab "Python" >}}
 ```python
-data_stream = env.from_collection(collection=[(1, 'a'), (2, 'a'), (3, 'a'), (4, 'b')], type_info=Types.ROW([Types.INT(), Types.STRING()]))
+data_stream = env.from_collection(collection=[(1, 'a'), (2, 'a'), (3, 'a'), (4, 'b')], type_info=Types.TUPLE([Types.INT(), Types.STRING()]))
 data_stream.key_by(lambda x: x[1]).reduce(lambda a, b: (a[0] + b[0], b[1]))
 ```
 {{< /tab >}}

docs/content/docs/dev/python/datastream/operators/overview.md

Lines changed: 97 additions & 22 deletions
@@ -26,28 +26,24 @@ under the License.
 
 # Operators
 
-
 Operators transform one or more DataStreams into a new DataStream. Programs can combine multiple transformations into
 sophisticated dataflow topologies.
 
-
-
-# DataStream Transformations
+## DataStream Transformations
 
 DataStream programs in Flink are regular programs that implement transformations on data streams (e.g., mapping,
 filtering, reducing). Please see [operators]({{< ref "docs/dev/datastream/operators/overview" >}})
-for an overview of the available stream transformations in Python DataStream API.
+for an overview of the available transformations in Python DataStream API.
 
-# Functions
-Most transformations require a user-defined function as input to define the functionality of the transformation. The
-following describes different ways of defining user-defined functions.
+## Functions
+Transformations accept user-defined functions as input to define the functionality of the transformations.
+The following section describes different ways of defining Python user-defined functions in Python DataStream API.
 
-## Implementing Function Interfaces
+### Implementing Function Interfaces
 Different Function interfaces are provided for different transformations in the Python DataStream API. For example,
 `MapFunction` is provided for the `map` transformation, `FilterFunction` is provided for the `filter` transformation, etc.
 Users can implement the corresponding Function interface according to the type of the transformation. Take MapFunction for
-instance:
-
+instance:
 
 ```python
 # Implementing MapFunction
@@ -60,24 +56,19 @@ data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
 mapped_stream = data_stream.map(MyMapFunction(), output_type=Types.INT())
 ```
 
-
-<span class="label label-info">Note</span> In Python DataStream API, users can specify the output type information of the transformation explicityly. If not
-specified, the output type will be `Types.PICKLED_BYTE_ARRAY` so that data will be in a form of byte array generated by
-the pickle seriallizer. For more details about the `Pickle Serialization`, please refer to [DataTypes]({{< ref "docs/dev/python/datastream/data_types" >}}#pickle-serialization).
-
-## Lambda Function
-As shown in the following example, all the transformations can also accept a lambda function to define the functionality of the transformation:
+### Lambda Function
+As shown in the following example, the transformations can also accept a lambda function to define the functionality of the transformation:
 
 ```python
 data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
 mapped_stream = data_stream.map(lambda x: x + 1, output_type=Types.INT())
 ```
 
-<span class="label label-info">Note</span> Operations ConnectedStream.map() and ConnectedStream.flat_map() do not support
-lambda function and must accept `CoMapFunction` and `CoFlatMapFunction` seperately.
+<span class="label label-info">Note</span> `ConnectedStream.map()` and `ConnectedStream.flat_map()` do not support
+lambda function and must accept `CoMapFunction` and `CoFlatMapFunction` separately.
 
-## Python Function
-Users can also use Python function:
+### Python Function
+Users could also use Python function to define the functionality of the transformation:
 
 ```python
 def my_map_func(value):
@@ -87,3 +78,87 @@ data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
 mapped_stream = data_stream.map(my_map_func, output_type=Types.INT())
 ```
 
+## Output Type
+
+Users could specify the output type information of the transformation explicitly in Python DataStream API. If not
+specified, the output type will be `Types.PICKLED_BYTE_ARRAY` by default, and the result data will be serialized using pickle serializer.
+For more details about the pickle serializer, please refer to [Pickle Serialization]({{< ref "docs/dev/python/datastream/data_types" >}}#pickle-serialization).
+
+Generally, the output type needs to be specified in the following scenarios.
+
+### Convert DataStream into Table
+
+```python
+from pyflink.common.typeinfo import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.table import StreamTableEnvironment
+
+
+def data_stream_api_demo():
+    env = StreamExecutionEnvironment.get_execution_environment()
+    t_env = StreamTableEnvironment.create(stream_execution_environment=env)
+
+    t_env.execute_sql("""
+            CREATE TABLE my_source (
+              a INT,
+              b VARCHAR
+            ) WITH (
+              'connector' = 'datagen',
+              'number-of-rows' = '10'
+            )
+        """)
+
+    ds = t_env.to_append_stream(
+        t_env.from_path('my_source'),
+        Types.ROW([Types.INT(), Types.STRING()]))
+
+    def split(s):
+        splits = s[1].split("|")
+        for sp in splits:
+            yield s[0], sp
+
+    ds = ds.map(lambda i: (i[0] + 1, i[1])) \
+           .flat_map(split, Types.TUPLE([Types.INT(), Types.STRING()])) \
+           .key_by(lambda i: i[1]) \
+           .reduce(lambda i, j: (i[0] + j[0], i[1]))
+
+    t_env.execute_sql("""
+            CREATE TABLE my_sink (
+              a INT,
+              b VARCHAR
+            ) WITH (
+              'connector' = 'print'
+            )
+        """)
+
+    table = t_env.from_data_stream(ds)
+    table_result = table.execute_insert("my_sink")
+
+    # 1)wait for job finishes and only used in local execution, otherwise, it may happen that the script exits with the job is still running
+    # 2)should be removed when submitting the job to a remote cluster such as YARN, standalone, K8s etc in detach mode
+    table_result.wait()
+
+
+if __name__ == '__main__':
+    data_stream_api_demo()
+```
+
+The output type must be specified for the flat_map operation in the above example which will be used as
+the output type of the reduce operation implicitly. The reason is that
+`t_env.from_data_stream(ds)` requires the output type of `ds` must be a composite type.
+
+### Write DataStream to Sink
+
+```python
+from pyflink.common.typeinfo import Types
+
+def split(s):
+    splits = s[1].split("|")
+    for sp in splits:
+        yield s[0], sp
+
+ds.map(lambda i: (i[0] + 1, i[1]), Types.TUPLE([Types.INT(), Types.STRING()])) \
+  .sink_to(...)
+```
+
+Generally, the output type needs to be specified for the map operation in the above example if the sink only accepts special kinds of data, e.g. Row, etc.
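
As a closing illustration of the last point in the diff above (declaring the map output type when the downstream consumer expects structured data), the sketch below attaches explicit composite output types to the Python operators before the stream reaches a sink; the input data, the trailing `print()` sink, and the job name are illustrative assumptions rather than part of the committed documentation:

```python
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
ds = env.from_collection(
    [(1, 'a|b'), (2, 'c')],
    type_info=Types.TUPLE([Types.INT(), Types.STRING()]))


def split(s):
    for sp in s[1].split("|"):
        yield s[0], sp


# Without an explicit output type these operators would default to
# Types.PICKLED_BYTE_ARRAY, which is enough for further Python operators but
# not for sinks or the Table API that expect structured records.
ds.flat_map(split, Types.TUPLE([Types.INT(), Types.STRING()])) \
    .map(lambda t: (t[0] + 1, t[1]), Types.TUPLE([Types.INT(), Types.STRING()])) \
    .print()

env.execute("output_type_sketch")
```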
