Skip to content

Commit 3e0d896

Browse files
committed
[FLINK-22445][python][docs] Add more examples of row-based operations in PyFlink doc
1 parent 0f8adc0 commit 3e0d896

File tree

6 files changed

+559
-15
lines changed

6 files changed

+559
-15
lines changed

docs/content.zh/docs/dev/python/table/python_types.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
---
22
title: "数据类型"
3-
weight: 31
3+
weight: 32
44
type: docs
55
aliases:
66
- /zh/dev/python/table-api-users-guide/python_types.html
@@ -69,4 +69,4 @@ Python Table API的用户可以在Python Table API中,或者定义Python用户
6969
| `ARRAY` | `list` | `numpy.ndarray` |
7070
| `MULTISET` | `list` | `Not Supported Yet` |
7171
| `MAP` | `dict` | `Not Supported Yet` |
72-
| `ROW` | `Row` | `dict` |
72+
| `ROW` | `Row` | `dict` |
Lines changed: 275 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,275 @@
1+
---
2+
title: "Row-based Operations"
3+
weight: 31
4+
type: docs
5+
---
6+
<!--
7+
Licensed to the Apache Software Foundation (ASF) under one
8+
or more contributor license agreements. See the NOTICE file
9+
distributed with this work for additional information
10+
regarding copyright ownership. The ASF licenses this file
11+
to you under the Apache License, Version 2.0 (the
12+
"License"); you may not use this file except in compliance
13+
with the License. You may obtain a copy of the License at
14+
15+
http://www.apache.org/licenses/LICENSE-2.0
16+
17+
Unless required by applicable law or agreed to in writing,
18+
software distributed under the License is distributed on an
19+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
20+
KIND, either express or implied. See the License for the
21+
specific language governing permissions and limitations
22+
under the License.
23+
-->
24+
25+
# Row-based Operations
26+
27+
This page describes how to use Row-based Operations in PyFlink Table API.
28+
29+
## Map
30+
31+
Performs a map operation with a python [general scalar function]({{< ref "docs/dev/python/table/udfs/python_udfs" >}}#scalar-functions) or [vectorized scalar function]({{< ref "docs/dev/python/table/udfs/vectorized_python_udfs" >}}#vectorized-scalar-functions).
32+
The output will be flattened if the output type is a composite type.
33+
34+
<span class="label label-info">Note</span> If you do not specify input args of your scalar function, all input args will be merged as a Row or Pandas.DataFrame.
35+
```python
36+
from pyflink.common import Row
37+
from pyflink.table import EnvironmentSettings, TableEnvironment
38+
from pyflink.table.expressions import col
39+
from pyflink.table.types import DataTypes
40+
41+
env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
42+
table_env = TableEnvironment.create(env_settings)
43+
44+
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
45+
46+
# 1. Specify columns
47+
@udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
48+
DataTypes.FIELD("data", DataTypes.STRING())]))
49+
def func1(id: int, data: str) -> Row:
50+
return Row(id, data * 2)
51+
52+
table.map(func1(col('id'), col('data'))).to_pandas()
53+
# result is
54+
# _c0 _c1
55+
# 0 1 HiHi
56+
# 1 2 HelloHello
57+
58+
59+
# 2. Don't specify columns in general scalar function
60+
@udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
61+
DataTypes.FIELD("data", DataTypes.STRING())]))
62+
def func2(data: Row) -> Row:
63+
return Row(data[0], data[1] * 2)
64+
65+
table.map(func2).alias('id', 'data').to_pandas()
66+
# result is
67+
# id data
68+
# 0 1 HiHi
69+
# 1 2 HelloHello
70+
71+
# 3. Don't specify columns in pandas scalar function
72+
import pandas as pd
73+
@udf(result_type=DataTypes.ROW([DataTypes.FIELD("id", DataTypes.BIGINT()),
74+
DataTypes.FIELD("data", DataTypes.STRING())]),
75+
func_type='pandas')
76+
def func3(data: pd.DataFrame) -> pd.DataFrame:
77+
res = pd.concat([data.id, data.data * 2], axis=1)
78+
return res
79+
80+
table.map(func3).alias('id', 'data').to_pandas()
81+
# result is
82+
# id data
83+
# 0 1 HiHi
84+
# 1 2 HelloHello
85+
```
86+
87+
## FlatMap
88+
89+
Performs a `flat_map` operation with a python [table function]({{< ref "docs/dev/python/table/udfs/python_udfs" >}}#table-functions).
90+
91+
```python
92+
from pyflink.common import Row
93+
from pyflink.table.udf import udtf
94+
from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
95+
96+
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
97+
table_env = TableEnvironment.create(env_settings)
98+
99+
table = table_env.from_elements([(1, 'Hi,Flink'), (2, 'Hello')], ['id', 'data'])
100+
101+
@udtf(result_types=[DataTypes.INT(), DataTypes.STRING()])
102+
def split(x: Row) -> Row:
103+
for s in x[1].split(","):
104+
yield x[0], s
105+
106+
# use table function split in `flat_map`
107+
table.flat_map(split).to_pandas()
108+
# result is
109+
# f0 f1
110+
# 0 1 Hi
111+
# 1 1 Flink
112+
# 2 2 Hello
113+
114+
# use table function in `join_lateral` or `left_outer_join_lateral`
115+
table.join_lateral(split.alias('a', 'b')).to_pandas()
116+
# result is
117+
# id data a b
118+
# 0 1 Hi,Flink 1 Hi
119+
# 1 1 Hi,Flink 1 Flink
120+
# 2 2 Hello 2 Hello
121+
```
122+
123+
## Aggregate
124+
125+
Performs an aggregate operation with a python general aggregate function or vectorized aggregate function.
126+
You have to close the "aggregate" with a select statement and the select statement does not support aggregate functions.
127+
The output of aggregate will be flattened if the output type is a composite type.
128+
129+
<span class="label label-info">Note</span> If you do not specify input args of your aggregate function, all input args including group key will be merged as a Row or Pandas.DataFrame.
130+
131+
```python
132+
from pyflink.common import Row
133+
from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
134+
from pyflink.table.expressions import col
135+
from pyflink.table.udf import AggregateFunction, udaf
136+
137+
class CountAndSumAggregateFunction(AggregateFunction):
138+
139+
def get_value(self, accumulator):
140+
return Row(accumulator[0], accumulator[1])
141+
142+
def create_accumulator(self):
143+
return Row(0, 0)
144+
145+
def accumulate(self, accumulator, *args):
146+
accumulator[0] += 1
147+
accumulator[1] += args[0][1]
148+
149+
def retract(self, accumulator, *args):
150+
accumulator[0] -= 1
151+
accumulator[1] -= args[0][1]
152+
153+
def merge(self, accumulator, accumulators):
154+
for other_acc in accumulators:
155+
accumulator[0] += other_acc[0]
156+
accumulator[1] += other_acc[1]
157+
158+
def get_accumulator_type(self):
159+
return DataTypes.ROW(
160+
[DataTypes.FIELD("a", DataTypes.BIGINT()),
161+
DataTypes.FIELD("b", DataTypes.BIGINT())])
162+
163+
def get_result_type(self):
164+
return DataTypes.ROW(
165+
[DataTypes.FIELD("a", DataTypes.BIGINT()),
166+
DataTypes.FIELD("b", DataTypes.BIGINT())])
167+
168+
function = CountAndSumAggregateFunction()
169+
agg = udaf(function,
170+
result_type=function.get_result_type(),
171+
accumulator_type=function.get_accumulator_type(),
172+
name=str(function.__class__.__name__))
173+
174+
# aggregate with a python general aggregate function
175+
176+
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
177+
table_env = TableEnvironment.create(env_settings)
178+
t = table_env.from_elements([(1, 2), (2, 1), (1, 3)], ['a', 'b'])
179+
180+
result = t.group_by(col('a')) \
181+
.aggregate(agg.alias("c", "d")) \
182+
.select(col('a'), col('c'), col('d'))
183+
result.to_pandas()
184+
185+
# the result is
186+
# a c d
187+
# 0 1 2 5
188+
# 1 2 1 1
189+
190+
191+
# aggregate with a python vectorized aggregate function
192+
env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
193+
table_env = TableEnvironment.create(env_settings)
194+
195+
t = table_env.from_elements([(1, 2), (2, 1), (1, 3)], ['a', 'b'])
196+
197+
pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()),
198+
result_type=DataTypes.ROW(
199+
[DataTypes.FIELD("a", DataTypes.FLOAT()),
200+
DataTypes.FIELD("b", DataTypes.INT())]),
201+
func_type="pandas")
202+
t.aggregate(pandas_udaf.alias("a", "b")) \
203+
.select(col('a'), col('b')).to_pandas()
204+
205+
# the result is
206+
# a b
207+
# 0 2.0 3
208+
```
209+
210+
## FlatAggregate
211+
212+
Performs a flat_aggregate operation with a python general [Table Aggregate Function]({{< ref "docs/dev/python/table/udfs/python_udfs" >}}#table-aggregate-functions)
213+
214+
Similar to a **GroupBy Aggregation**. Groups the rows on the grouping keys with the following running table aggregation operator to aggregate rows group-wise. The difference from an AggregateFunction is that TableAggregateFunction may return 0 or more records for a group. You have to close the "flat_aggregate" with a select statement. And the select statement does not support aggregate functions.
215+
216+
```python
217+
from pyflink.common import Row
218+
from pyflink.table.expressions import col
219+
from pyflink.table.udf import TableAggregateFunction, udtaf
220+
from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
221+
222+
class Top2(TableAggregateFunction):
223+
224+
def emit_value(self, accumulator):
225+
yield Row(accumulator[0])
226+
yield Row(accumulator[1])
227+
228+
def create_accumulator(self):
229+
return [None, None]
230+
231+
def accumulate(self, accumulator, *args):
232+
if args[0][0] is not None:
233+
if accumulator[0] is None or args[0][0] > accumulator[0]:
234+
accumulator[1] = accumulator[0]
235+
accumulator[0] = args[0][0]
236+
elif accumulator[1] is None or args[0][0] > accumulator[1]:
237+
accumulator[1] = args[0][0]
238+
239+
def retract(self, accumulator, *args):
240+
accumulator[0] = accumulator[0] - 1
241+
242+
def merge(self, accumulator, accumulators):
243+
for other_acc in accumulators:
244+
self.accumulate(accumulator, other_acc[0])
245+
self.accumulate(accumulator, other_acc[1])
246+
247+
def get_accumulator_type(self):
248+
return DataTypes.ARRAY(DataTypes.BIGINT())
249+
250+
def get_result_type(self):
251+
return DataTypes.ROW(
252+
[DataTypes.FIELD("a", DataTypes.BIGINT())])
253+
254+
mytop = udtaf(Top2())
255+
256+
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
257+
table_env = TableEnvironment.create(env_settings)
258+
t = table_env.from_elements([(1, 'Hi', 'Hello'),
259+
(3, 'Hi', 'hi'),
260+
(5, 'Hi2', 'hi'),
261+
(7, 'Hi', 'Hello'),
262+
(2, 'Hi', 'Hello')], ['a', 'b', 'c'])
263+
result = t.select(col('a'), col('c')) \
264+
.group_by(col('c')) \
265+
.flat_aggregate(mytop) \
266+
.select(col('b')) \
267+
.flat_aggregate(mytop.alias("b")) \
268+
.select(col('b'))
269+
270+
result.to_pandas()
271+
# the result is
272+
# b
273+
# 0 7
274+
# 1 5
275+
```

docs/content.zh/docs/dev/table/tableApi.md

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2211,8 +2211,8 @@ def map_function(a: Row) -> Row:
22112211
# map operation with a python general scalar function
22122212
func = udf(map_function, result_type=DataTypes.ROW(
22132213
[DataTypes.FIELD("a", DataTypes.BIGINT()),
2214-
DataTypes.FIELD("b", DataTypes.BIGINT()))]))
2215-
table = input.map(map_function).alias('a', 'b')
2214+
DataTypes.FIELD("b", DataTypes.BIGINT())]))
2215+
table = input.map(func).alias('a', 'b')
22162216

22172217
# map operation with a python vectorized scalar function
22182218
pandas_func = udf(lambda x: x * 2, result_type=DataTypes.ROW(
@@ -2414,11 +2414,9 @@ from pyflink.table.udf import AggregateFunction, udaf
24142414
class CountAndSumAggregateFunction(AggregateFunction):
24152415

24162416
def get_value(self, accumulator):
2417-
from pyflink.common import Row
24182417
return Row(accumulator[0], accumulator[1])
24192418

24202419
def create_accumulator(self):
2421-
from pyflink.common import Row
24222420
return Row(0, 0)
24232421

24242422
def accumulate(self, accumulator, *args):
@@ -2461,8 +2459,7 @@ pandas_udaf = udaf(lambda pd: (pd.b.mean(), pd.b.max()),
24612459
[DataTypes.FIELD("a", DataTypes.FLOAT()),
24622460
DataTypes.FIELD("b", DataTypes.INT())]),
24632461
func_type="pandas")
2464-
t.select(t.b) \
2465-
.aggregate(pandas_udaf.alias("a", "b")) \
2462+
t.aggregate(pandas_udaf.alias("a", "b")) \
24662463
.select("a, b")
24672464

24682465
```

docs/content/docs/dev/python/table/python_types.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
---
22
title: "Data Types"
3-
weight: 31
3+
weight: 32
44
type: docs
55
aliases:
66
- /dev/python/table-api-users-guide/python_types.html

0 commit comments

Comments
 (0)