
Commit 12f4414

Merge pull request #224 from liutongxuan/master
RFC: FuseRecv

Authored by ematejska
2 parents d1c022d + f4b925c

5 files changed: +232 -0 lines

rfcs/20200411-fuse_recv.md (232 additions, 0 deletions)

# FuseRecv

| Status        | Proposed |
:-------------- |:---------------------------------------------------- |
| **Author(s)** | Tongxuan Liu ([email protected]), Peng Tao ([email protected]), Langshi Chen ([email protected]) |
| **Reviewer(s)** | Ayush Dubey ([email protected]), Jeroen Bédorf ([email protected]), Derek Murray ([email protected]), Bairen Yi ([email protected]), Paul Tucker ([email protected]) |
| **Sponsor**   | Ayush Dubey ([email protected]) |
| **Updated**   | 2020-04-11 |
## Objective

This RFC proposes a new FuseRecv Op which would receive multiple tensors with
different types through one Remote Procedure Call (RPC). This feature could
significantly reduce the number of RPC calls in most ranking or matching models,
such as those used in search, recommendation, or advertising systems.

## Motivation
When many small tensors are transferred around the same time, it is more
efficient to transfer them in a single RPC than to issue a separate RPC for
each of them.

When the neural network graph is complex, each iteration through the graph may
introduce tens or even hundreds of RPC operations between the running nodes.
In general, there are a large number of small tensors, such as the multiple
feature columns that gather data from the same parameter server. These tensors
have no dependence on each other, and each feature column results in at least
one RPC operation in the forward stage. In CTR (Click Through Rate) models or
models that are mostly sparse (such as the matching or ranking models widely
used in recommender and ad systems), there can be hundreds of feature columns.
In our scenario, each sample includes at least hundreds of features. One
training job normally uses thousands of workers and tens of parameter servers.
One worker generally has to get variables from all the parameter servers, and
each feature column issues at least one request to a parameter server in the
forward stage. There could be hundreds of RPC operations for these feature
columns alone, and even more for some of the big feature columns (such as ids),
which are partitioned into dozens of RPCs per feature column. In summary, there
would be at least hundreds of RPCs per worker for these feature columns only,
and hundreds of thousands of RPCs per step for each parameter server in the
forward stage. Most feature columns only gather very small tensors from the
parameter server, usually less than 100 KB. Logically these small tensors could
be sent together (i.e. fused). Furthermore, tensors that belong to the same
layer can also be fused before transfer, which would significantly reduce the
number of RPC operations.

Each RPC operation also introduces overhead beyond the actual tensor data
transfer, including:
* Serialization/deserialization, which adds cost to every RPC operation.
* Execution engine overhead for executing a Recv node operation, plus the
corresponding thread pool action required to execute the RPC callback function.

## User Benefit

Performance improvement: from performance benchmarking of this feature during
large (end-user) training jobs (> 400 workers), we normally see that training
is 1.5-2x faster in the parameter-server/worker setup.

## Design Proposal

![Figure 1: Current graph partition strategy](20200411-fuse_recv/current_graph_partition_strategy.png "Current graph partition strategy")
![Figure 2: Graph partition strategy with FuseRecv](20200411-fuse_recv/graph_partition_strategy_with_fuse_recv.png "Graph partition strategy with FuseRecv")

In the original Recv/Send design, each Recv node receives only one tensor, even
when several Recv Ops feed the same destination Op. Moreover, each Recv node
triggers one RPC operation even if the received tensor is a scalar.

In the proposed design, we traverse the (partitioned) graphs according to their
topology and iteratively replace Recv nodes with the new FuseRecv nodes.
Please refer to the details in the section [FuseRecv Optimizer in Grappler](#fuserecv-optimizer-in-grappler).

As shown in Figures 1 and 2, instead of adding a Recv node for each of the
tensors ‘a’ and ‘x’, we use a single FuseRecv node to replace the two Recv
nodes and fetch the two tensors together. The FuseRecv node has two output
‘slots’ (‘ports’): slot 0 feeds inputs ‘b’ and ‘c’, and slot 1 feeds ‘y’.
Note that, because the RPC operation is Recv driven, there is no need to fuse
the send nodes.

A new RPC method ‘FuseRecvTensorAsync’ and its handler (FuseRecvTensorHandlerRaw)
are added to WorkerInterface and WorkerService. FuseRecvTensor follows similar
optimization steps as RecvTensor to avoid copying the response buffer.

### Alternatives Considered

#### Fuse the tensors into a single Send/Recv, Solution 1 (Derek Murray)
Pack the N tensors to be sent into a length-N DT_VARIANT vector.

Pros: Reuses the current RPC and avoids potentially intricate changes to the
zero-copy response buffer code.

Cons: Introduces memcpy overhead.

#### Fuse the tensors into a single Send/Recv, Solution 2 (Derek Murray)
Pack the tensor contents into a single flattened buffer. This would be very
similar to the ScopedAllocator optimization that [email protected] and
[email protected] implemented for collectives, and it might be possible
to reuse some of the graph analysis code.

Pros: Reuses the current RPC and avoids potentially intricate changes to the
zero-copy response buffer code.

Cons: The fused tensors could be of different types and dynamic shapes,
which couldn't be handled by this solution.

#### Dynamic fusion at runtime (Paul Tucker)
Instead of adding a new FuseRecvTensor method to the Worker interface,
we add a slightly different RecvSomeTensors method. The client sends the
server a list of keys for which it is ready to receive values, and the
server streams back one or more when they are ready. It is the responsibility
of the client to retry any key that was not included in the response.

To make this work well, there needs to be some dynamic bundling on each side.
For example, on the client side a call to RecvTensor on the local Rendezvous
for a remote value does not necessarily result in an immediate RPC. It might
if the value is expected to be large, but it might also just add the key to
a ready set associated with the remote host. An RPC may not be sent until
the ready set reaches a certain size, or a minimum time has elapsed since the
last RPC against that host was started. When the response is received, any
missing keys go back into the ready set.

On the server side there could be some logic to decide, for a RecvSomeTensors
request, whether to wait for more of the requested values to become ready or to
immediately send what is available now and let the client re-request anything
missing.
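
As an illustration of this alternative (not part of the proposal), a minimal
client-side bundler might look like the sketch below. `ReadySetBundler`, its
methods, and the batch/interval thresholds are hypothetical names chosen for
this sketch; the RPC itself is stubbed out as a function argument, and the
flush is done synchronously for simplicity.

```c++
#include <chrono>
#include <functional>
#include <mutex>
#include <set>
#include <string>
#include <vector>

// Collects rendezvous keys into a "ready set" and only issues an RPC once the
// set is large enough or enough time has passed since the last RPC. Keys the
// server did not return go back into the ready set for retry.
class ReadySetBundler {
 public:
  // The stubbed RPC takes the requested keys and returns the keys it served.
  using SendRpcFn =
      std::function<std::set<std::string>(const std::vector<std::string>&)>;

  ReadySetBundler(size_t max_batch, std::chrono::milliseconds min_interval,
                  SendRpcFn send_rpc)
      : max_batch_(max_batch),
        min_interval_(min_interval),
        send_rpc_(std::move(send_rpc)) {}

  // Called when the local Rendezvous wants a remote value: the key is queued
  // rather than fetched immediately.
  void RecvTensor(const std::string& key) {
    std::lock_guard<std::mutex> lock(mu_);
    ready_.insert(key);
    MaybeFlushLocked();
  }

 private:
  void MaybeFlushLocked() {
    const auto now = std::chrono::steady_clock::now();
    if (ready_.size() < max_batch_ && now - last_rpc_ < min_interval_) return;
    std::vector<std::string> batch(ready_.begin(), ready_.end());
    ready_.clear();
    last_rpc_ = now;
    // Any key missing from the response is re-queued, as described above.
    const std::set<std::string> received = send_rpc_(batch);
    for (const auto& key : batch) {
      if (received.count(key) == 0) ready_.insert(key);
    }
  }

  const size_t max_batch_;
  const std::chrono::milliseconds min_interval_;
  SendRpcFn send_rpc_;
  std::mutex mu_;
  std::set<std::string> ready_;
  std::chrono::steady_clock::time_point last_rpc_ =
      std::chrono::steady_clock::now();
};
```
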
Pros: Dynamic fusion at runtime seems to give better results, and it also
brings the ability to control the priority of tensors (i.e., which Recv is
more important).

Cons: A potential bottleneck of this solution is the time window of the ready
set. It would differ considerably between models, and setting the value
manually would be hard. This solution remains another good candidate alongside
FuseRecv.

### Performance Implications

With a Wide & Deep model, the number of RPC calls per step was reduced by 55%,
and the overall training throughput increased by 40%.
![Figure 3: performance_result](20200411-fuse_recv/performance_result.png "Performance result")

### Dependencies
* None

### Engineering Impact
* Engineering impact: Once the feature is (manually) enabled (via ConfigProto.GraphOptions.do_fuse_recv), test times would be longer because the FuseRecv post-partitioning optimizer traverses and updates the graph.
* Maintenance: Minimal maintenance overhead. The TensorFlow team and contributors will maintain the documentation and keep it up to date. Changes should be reviewed and approved by the TensorFlow team leads.

### Platforms and Environments
* Platforms: The feature is independent of the platform.
* Execution environments (Cloud services, accelerator hardware): The first stage would support CPU and GPU devices. We will consider supporting additional devices as much as possible.

### Best Practices
* We strongly suggest enabling FuseRecv in ranking or matching models such as [W&DL](https://arxiv.org/abs/1606.07792) and [DIEN](https://arxiv.org/abs/1809.03672).

### Tutorials and Examples
Example of how to enable the FuseRecv feature:

```python
>>> tf.config.optimizer.set_experimental_options({"do_fuse_recv": True})
```

### Compatibility
* This feature works with the ParameterServerStrategy.
* This feature considers tensors on different devices such as CPU, GPU and TPU.
* Independent of SavedModel or checkpoints.

### User Impact
* None

## Detailed Design

### FuseRecv Op
We introduce the _RecvV2 Op and an RPC operation named FuseRecvTensorAsync in
RemoteWorker and WorkerService. The _RecvV2 Op definition is as follows:

```c++
REGISTER_OP("_RecvV2")
    .Output("tensor: tensor_type")
    .Attr("tensor_type: list(type)")
    .Attr("tensor_name: list(string)")
    .Attr("send_device: string")
    .Attr("send_device_incarnation: int")
    .Attr("recv_device: string")
    .Attr("client_terminated: bool = false")
    .SetIsStateful()
    .SetShapeFn(shape_inference::UnknownShape);
```

FuseRecv requests a list of tensors with different types from remote devices.
Generally we only fuse Recv ops that share the same recv device and the same
send device.

### FuseRecv Optimizer in Grappler
During the post-partition phase, we add a new pass to the post-partitioning
optimizer called “FuseRecv” to fuse Recv ops together. We traverse the
partitioned graphs and the whole graph, replacing Recv ops with FuseRecv ops in
the partitioned graphs according to their topology while iteratively searching
for and fusing potential Recv operations. See Figure 4 for the formal algorithm
definition.

![Figure 4: fuse_recv_procedure](20200411-fuse_recv/fuse_recv_procedure.png "Fuse Recv Procedure")

The procedure RECVFUSE takes two input arguments: 1) the TF computation
graph g, and 2) a partitioned graph. It is worth noting that the iteration over
all nodes starts from the `root` nodes, which do not have any source edge
(node). The process between lines 17 and 37 is executed iteratively and outputs
key-value pairs (each value being a group of edges that can be fused into one
FuseRecv node). Then, based on the grouped edges, we find the Recv nodes in the
partitioned graph that can be replaced by FuseRecv nodes. RECVFUSE also makes
sure that no deadlock exists after the change to the original graph. Also, the
FuseRecvTensor RPC operation is able to overlap computation and communication
by using the graph topology.
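
The grouping step can be illustrated with a small, self-contained sketch. This
is not the actual Grappler pass: `RecvEdge`, `FusedRecv` and `GroupRecvEdges`
are hypothetical stand-ins for the real graph types, and the real pass
additionally uses the graph topology to avoid deadlocks and to preserve
computation/communication overlap.

```c++
#include <map>
#include <string>
#include <utility>
#include <vector>

// A Recv edge crossing a partition boundary, reduced to the fields that
// matter for fusion.
struct RecvEdge {
  std::string tensor_name;   // rendezvous key component
  std::string dtype;         // e.g. "DT_FLOAT"
  std::string send_device;
  std::string recv_device;
};

// One fused node: its attr lists mirror the tensor_name/tensor_type list
// attrs of _RecvV2 above.
struct FusedRecv {
  std::string send_device;
  std::string recv_device;
  std::vector<std::string> tensor_names;
  std::vector<std::string> tensor_types;
};

// Group Recv edges so that only edges sharing both endpoints are fused,
// mirroring the "same recv device and same send device" constraint.
std::vector<FusedRecv> GroupRecvEdges(const std::vector<RecvEdge>& edges) {
  std::map<std::pair<std::string, std::string>, FusedRecv> buckets;
  for (const RecvEdge& e : edges) {
    FusedRecv& f = buckets[{e.send_device, e.recv_device}];
    f.send_device = e.send_device;
    f.recv_device = e.recv_device;
    f.tensor_names.push_back(e.tensor_name);
    f.tensor_types.push_back(e.dtype);
  }
  std::vector<FusedRecv> result;
  for (auto& kv : buckets) result.push_back(std::move(kv.second));
  return result;
}
```
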
### FuseRecv RPC Method and Handler
A new RPC method ‘FuseRecvTensorAsync’ is added to the WorkerInterface.
The ‘FuseRecvTensorAsync’ method is able to handle multiple rendezvous keys
and to fetch the tensors for those keys.

On the server side, we add a ‘FuseRecvTensorHandlerRaw’, which handles the
multiple rendezvous keys for the ‘local recv’ instantiated by the local tensor
operations. As mentioned before, the sending nodes are not fused, so we must
perform multiple local recvs corresponding to the multiple send nodes.

Because the ‘FuseRecvTensorAsync’ handler might be executed before the send
operations happen, a callback wrapper is required. We use a counter,
initialized with the fuse count; each send action triggers the callback
wrapper and atomically decreases the counter. When the counter reaches 0, the
real callback is executed and the tensors are sent to the FuseRecv node.
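
A minimal sketch of this counter-based callback wrapper, using only the
standard library; `FuseCounterBarrier` and its method names are illustrative,
not the actual TensorFlow classes.

```c++
#include <atomic>
#include <functional>

// Fires the real RPC completion callback only after all of the (unfused)
// local sends have produced their tensors.
class FuseCounterBarrier {
 public:
  // `fuse_count` is the number of tensors fused into one FuseRecv request;
  // `done` is the real callback that sends the fused response.
  FuseCounterBarrier(int fuse_count, std::function<void()> done)
      : pending_(fuse_count), done_(std::move(done)) {}

  // Each local send calls this once its tensor is ready; the last one to
  // arrive drops the counter to 0 and runs the real callback.
  void OneTensorReady() {
    if (pending_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
      done_();
    }
  }

 private:
  std::atomic<int> pending_;
  std::function<void()> done_;
};
```
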

### Dead Tensor Handling
We treat the output of the FuseRecv node as dead if and only if all of the
fused tensors are dead.
fused tensors are dead.
226+
227+
### FuseRecv Error Handling
228+
The status of the FuseRecv node would be similar as the Recv node, which
229+
include additional information for every Recv tensor.
230+
231+
## Questions and Discussion Topics
232+
344 KB
Loading
165 KB
Loading
345 KB
Loading
80.4 KB
Loading

0 commit comments

Comments
 (0)