diff --git a/rfcs/20200411-fuse_recv.md b/rfcs/20200411-fuse_recv.md
new file mode 100644
index 000000000..ec73aefe8
--- /dev/null
+++ b/rfcs/20200411-fuse_recv.md
@@ -0,0 +1,232 @@
+# FuseRecv
+
+| Status        | Proposed |
+| :------------- | :---------------------------------------------------- |
+| **Author(s)** | Tongxuan Liu (tongxuan.ltx@alibaba-inc.com), Peng Tao (jiankeng.pt@alibaba-inc.com), Langshi Chen (langshi.cls@alibaba-inc.com) |
+| **Reviewer(s)** | Ayush Dubey (ayushd@google.com), Jeroen Bédorf (jeroen@minds.ai), Derek Murray (mrry@google.com), Bairen Yi (yibairen.byron@bytedance.com), Paul Tucker (paul.tucker@gmail.com) |
+| **Sponsor**   | Ayush Dubey (ayushd@google.com) |
+| **Updated**   | 2020-04-11 |
+
+## Objective
+This RFC proposes a new FuseRecv Op that receives multiple tensors of
+different types through a single Remote Procedure Call (RPC). This feature can
+significantly reduce the number of RPC calls in most ranking or matching
+models, such as those used in search, recommendation, and ad systems.
+
+## Motivation
+When many small tensors are transferred at around the same time, it is more
+efficient to combine them into a single RPC than to issue a separate RPC for
+each of them.
+
+When the neural network graph is complex, each iteration through the graph
+may introduce tens or even hundreds of RPC operations between the running
+nodes. In general, there are a large number of small tensors, such as multiple
+feature columns that gather data from the same parameter server. These tensors
+have no dependence on each other, and each feature column results in at least
+one RPC operation in the forward stage. In CTR (Click Through Rate) models or
+models that are mostly sparse (such as the Match or Rank models that are widely
+used in recommender and ad systems), there can be hundreds of feature columns.
+In our scenario, each sample includes at least hundreds of features.
+One training job normally uses thousands of workers and tens of parameter
+servers. Each worker generally has to fetch variables from all the parameter
+servers, and each feature column triggers at least one request to a parameter
+server in the forward stage. There can be hundreds of RPC operations for these
+feature columns, and even more for some of the big feature columns (such as
+ids), which are partitioned into dozens of RPCs per feature column. In summary,
+there are at least hundreds of RPCs per worker for the feature columns alone,
+and hundreds of thousands of RPCs per step for each parameter server in the
+forward stage. Most feature columns gather only very small tensors from the
+parameter server, usually less than 100KB. Logically these small tensors could
+be sent together (i.e., fused). Furthermore, tensors that belong to the same
+layer could also be fused before transfer, which would significantly reduce
+the number of RPC operations.
+
+Each RPC operation introduces fixed overhead beyond the actual tensor data
+transfer, including:
+* Serialization/deserialization performed for each RPC operation.
+* The execution engine overhead of executing a Recv node, and the
+  corresponding thread pool action required to execute the RPC callback
+  function.
+
+## User Benefit
+
+Performance improvement: benchmarking this feature on large (end-user)
+training jobs (> 400 workers), we normally see training run 1.5-2x
+faster in the parameter-server/worker setup.
+
+## Design Proposal
+
+
+
+
+In the original Recv/Send design, each Recv node receives only one tensor,
+even if several Recv Ops output to the same destination Op. Moreover, each
+Recv node triggers one RPC operation even if the received tensor is a scalar.
+
+In the proposed design, we traverse the partitioned graphs according to
+their topology and iteratively replace Recv nodes with the new FuseRecv nodes.
+Please refer to the details in Section
+[FuseRecv Optimizer in Grappler](#fuserecv-optimizer-in-grappler).
+
+As shown in Figures 1 and 2, instead of adding a Recv node for each of the
+tensors 'a' and 'x', we use a single FuseRecv node to replace the two Recv
+nodes and fetch the two tensors together. The FuseRecv node has two output
+'slots' ('ports'): slot 0 feeds inputs 'b' and 'c', and slot 1 feeds 'y'.
+Notice that, because the RPC operation is Recv driven, there is no need
+to fuse the Send nodes.
+
+A new RPC method 'FuseRecvTensorAsync' and its handler
+(FuseRecvTensorHandlerRaw) are added to WorkerInterface and WorkerService.
+FuseRecvTensor follows optimization steps similar to RecvTensor's to avoid
+copying the response buffer.
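+The toy sketch below illustrates the rewrite (exposition only, not real
+TensorFlow graph surgery; the node and tensor names are taken from Figures 1
+and 2): two Recv nodes collapse into one FuseRecv node whose output slots fan
+out to the original consumers.
+
+```python
+# Before the rewrite: one Recv node, and hence one RPC, per tensor.
+recv_nodes = [
+    {"op": "Recv", "tensor_name": "a", "consumers": ["b", "c"]},
+    {"op": "Recv", "tensor_name": "x", "consumers": ["y"]},
+]
+
+# After the rewrite: a single FuseRecv node fetches both tensors in one RPC.
+# Output slot i carries the i-th fused tensor to the original consumers.
+fuse_recv_node = {
+    "op": "FuseRecv",
+    "tensor_name": ["a", "x"],
+    "consumers_by_slot": {0: ["b", "c"], 1: ["y"]},
+}
+
+print(len(recv_nodes), "RPCs before; 1 RPC after")
+```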
+### Alternatives Considered
+#### Fuse the tensors into a single Send/Recv, solution 1 (Derek Murray)
+Pack the N tensors to be sent into a length-N DT_VARIANT vector.
+
+Pros: Reuses the current RPC and avoids potentially intricate changes to the
+zero-copy response buffer code.
+
+Cons: Introduces memcpy overhead.
+
+#### Fuse the tensors into a single Send/Recv, solution 2 (Derek Murray)
+Pack the tensor contents into a single flattened buffer. This would be very
+similar to the ScopedAllocator optimization that
+ayushd@google.com and tucker@google.com implemented for collectives, and it
+might be possible to reuse some of the graph analysis code.
+
+Pros: Reuses the current RPC and avoids potentially intricate changes to the
+zero-copy response buffer code.
+
+Cons: The fused tensors could have different types and dynamic shapes,
+which this solution could not handle.
+
+#### Dynamic fusion at runtime (Paul Tucker)
+Instead of adding a new FuseRecvTensor method to the Worker interface,
+we add a slightly different RecvSomeTensors method. The client sends the
+server a list of keys for which it is ready to receive values, and the
+server streams back one or more when it is ready. It is the responsibility of
+the client to retry any key that was not included in the response.
+
+To make this work well, there needs to be some dynamic bundling on each side.
+For example, on the client side a call to RecvTensor on the local Rendezvous
+for a remote value does not necessarily result in an immediate RPC. It might
+if the value is expected to be large, but it might also just add the key to
+a ready set associated with the remote host. An RPC may not be sent until
+the ready set reaches a certain size, or a minimum time has elapsed since the
+last RPC against that host was started. When the response is received, any
+missing keys go back into the ready set.
+
+On the server side there could be some logic to decide, for a RecvSomeTensors
+call, whether to wait for more of the requested values to be ready or to
+immediately send what is available now and let the client re-request anything
+missing.
+
+Pros: Dynamic fusion at runtime seems likely to get better results, and it
+also brings the ability to control the priority of tensors (i.e., which Recv
+is more important).
+
+Cons: The potential bottleneck of this solution is the time window of the
+ready set. The right window differs greatly across models, and setting the
+value manually would be hard. This solution remains another good candidate
+alongside FuseRecv.
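+A minimal sketch of the client-side bundling described above (Python for
+exposition; all class and parameter names here are hypothetical, not part of
+the proposal):
+
+```python
+import time
+
+class ReadySetClient:
+    """Toy model of client-side bundling for the RecvSomeTensors
+    alternative: keys accumulate in a ready set, and one RPC is issued for
+    the whole batch once the set is big enough or a deadline passes."""
+
+    def __init__(self, max_batch=32, max_wait_ms=5.0):
+        self.max_batch = max_batch
+        self.max_wait_ms = max_wait_ms
+        self.ready = []                      # keys waiting to be requested
+        self.last_rpc = time.monotonic()
+
+    def recv(self, key):
+        self.ready.append(key)
+        waited_ms = (time.monotonic() - self.last_rpc) * 1000.0
+        if len(self.ready) >= self.max_batch or waited_ms >= self.max_wait_ms:
+            batch, self.ready = self.ready, []
+            self.last_rpc = time.monotonic()
+            got = self._recv_some_tensors(batch)   # one RPC for many keys
+            # Any key the server did not return goes back into the ready set.
+            self.ready.extend(k for k in batch if k not in got)
+            return got
+        return {}
+
+    def _recv_some_tensors(self, keys):
+        # Stand-in for the real RPC; a real server may return only a subset.
+        return {k: ("tensor for " + k) for k in keys}
+
+client = ReadySetClient(max_batch=2)
+client.recv("key/a")                 # buffered, no RPC yet
+print(client.recv("key/x"))          # batch full: one RPC fetches both keys
+```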
+### Performance Implications
+With a wide and deep model, the number of RPC calls per step was reduced
+by 55%, and the overall training throughput increased by 40%.
+
+
+### Dependencies
+* None
+
+### Engineering Impact
+* Engineering impact: Once the feature is (manually) enabled (in
+  ConfigProto.GraphOptions.do_fuse_recv), test times will be longer because
+  the FuseRecv post-partitioning optimizer traverses and updates the graph.
+* Maintenance: Minimal maintenance overhead. The TensorFlow team and
+  contributors will maintain the documentation and keep it up to date.
+  Changes should be reviewed and approved by the TensorFlow team leads.
+
+### Platforms and Environments
+* Platforms: The feature is independent of platform.
+* Execution environments (cloud services, accelerator hardware): The first
+  stage supports CPU and GPU devices. We will consider supporting additional
+  devices as far as possible.
+
+### Best Practices
+* We strongly suggest enabling FuseRecv for ranking or matching models such
+  as [W&DL](https://arxiv.org/abs/1606.07792) and
+  [DIEN](https://arxiv.org/abs/1809.03672).
+
+### Tutorials and Examples
+Example of how to enable the FuseRecv feature:
+
+```
+  >>> tf.config.optimizer.set_experimental_options({"do_fuse_recv": True})
+```
+
+### Compatibility
+* This feature works with the ParameterServerStrategy.
+* This feature considers tensors on different devices, such as CPU, GPU and
+  TPU.
+* It is independent of SavedModel and checkpoints.
+
+### User Impact
+* None
+
+## Detailed Design
+
+### FuseRecv Op
+We introduce the _RecvV2 Op and an RPC operation named FuseRecvTensorAsync in
+RemoteWorker and WorkerService. The _RecvV2 Op definition is as follows:
+
+```
+REGISTER_OP("_RecvV2")
+    .Output("tensor: tensor_type")
+    .Attr("tensor_type: list(type)")
+    .Attr("tensor_name: list(string)")
+    .Attr("send_device: string")
+    .Attr("send_device_incarnation: int")
+    .Attr("recv_device: string")
+    .Attr("client_terminated: bool = false")
+    .SetIsStateful()
+    .SetShapeFn(shape_inference::UnknownShape);
+```
+
+FuseRecv requests a list of tensors of different types from remote devices.
+In general, we only fuse Recv ops that share the same recv device and the
+same send device.
+
+### FuseRecv Optimizer in Grappler
+During the post-partition phase, we add a new pass to the post-partitioning
+optimizer called "FuseRecv" that fuses Recv ops together. We traverse the
+partitioned graphs and the whole graph, replacing Recv ops with FuseRecv ops
+in the partitioned graphs according to their topology while iteratively
+searching for and fusing candidate Recv operations. See Figure 4 for the
+formal algorithm definition.
+
+
+
+The procedure RECVFUSE takes two input arguments: 1) the TF computation
+graph g, and 2) a partitioned graph. It is worth noting that the iteration
+over all nodes starts from the `root` nodes, which do not have any
+source edge (node). The process between lines 17 and 37 is executed
+iteratively and outputs key-value pairs, where each value is a group of edges
+that can be fused into one FuseRecv node. Then, based on the grouped edges,
+we find the Recv nodes in the partitioned graph that can be replaced by
+FuseRecv nodes. RECVFUSE also makes sure that no deadlock exists after the
+change to the original graph. In addition, the FuseRecvTensor RPC operation
+is able to overlap computation and communication by using the graph topology.
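+The core grouping step of the pass can be sketched as follows (Python for
+exposition; the actual optimizer is a C++ Grappler pass, and the edge
+representation here is a hypothetical simplification):
+
+```python
+from collections import defaultdict
+
+# Toy cross-device edges left behind by graph partitioning. Only Recv ops
+# that share BOTH the same send device and the same recv device may be fused.
+edges = [
+    {"send": "/job:ps/task:0", "recv": "/job:worker/task:0", "tensor": "a"},
+    {"send": "/job:ps/task:0", "recv": "/job:worker/task:0", "tensor": "x"},
+    {"send": "/job:ps/task:1", "recv": "/job:worker/task:0", "tensor": "w"},
+]
+
+groups = defaultdict(list)
+for e in edges:
+    groups[(e["send"], e["recv"])].append(e["tensor"])
+
+# Each group becomes one FuseRecv node, i.e. one RPC instead of one per tensor.
+for (send_dev, recv_dev), tensors in groups.items():
+    print(f"FuseRecv on {recv_dev}: fetch {tensors} from {send_dev}")
+```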
+### FuseRecv RPC Method and Handler
+A new RPC method 'FuseRecvTensorAsync' is added to the WorkerInterface.
+Compared with 'RecvTensorAsync', the 'FuseRecvTensorAsync' method is able to
+handle multiple rendezvous keys and fetch the tensor for each key.
+
+On the server side, we add a 'FuseRecvTensorHandlerRaw', which handles
+the multiple rendezvous keys for the 'local recv' instantiated by the local
+tensor operations. As mentioned before, the Send nodes are not fused,
+so we must perform multiple local recvs corresponding to the multiple Send
+nodes.
+
+Because the 'FuseRecvTensorAsync' handler might execute before the
+corresponding Send operations happen, a callback wrapper is required. We use
+a counter initialized to the fuse count; each Send action triggers the
+callback wrapper and atomically decreases the counter. When the counter
+reaches 0, the real callback is executed and the tensors are sent back to
+the requesting FuseRecv node.
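+The counter-based callback wrapper can be illustrated with the following
+minimal sketch (Python with a lock standing in for the atomic counter; the
+real implementation lives in the C++ runtime, and all names here are
+hypothetical):
+
+```python
+import threading
+
+class FuseRecvCallbackWrapper:
+    """Fires the real callback once all fused sends have produced a tensor."""
+
+    def __init__(self, fuse_count, done_callback):
+        self._pending = fuse_count          # number of sends still missing
+        self._lock = threading.Lock()       # stands in for an atomic counter
+        self._tensors = {}
+        self._done = done_callback
+
+    def on_send(self, rendezvous_key, tensor):
+        with self._lock:
+            self._tensors[rendezvous_key] = tensor
+            self._pending -= 1
+            ready = (self._pending == 0)
+        if ready:
+            # All fused tensors are available: answer the FuseRecv RPC.
+            self._done(self._tensors)
+
+# Usage sketch: a FuseRecv of two tensors completes only after both sends.
+wrapper = FuseRecvCallbackWrapper(2, lambda ts: print("reply with", sorted(ts)))
+wrapper.on_send("key/a", [1.0])
+wrapper.on_send("key/x", [2.0])   # counter hits 0, triggers the reply
+```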
+### Dead Tensor Handling
+We treat the output of the FuseRecv node as dead if and only if all the
+fused tensors are dead.
+
+### FuseRecv Error Handling
+The status handling of the FuseRecv node is similar to that of the Recv
+node, except that the status includes additional information for every
+received tensor.
+
+## Questions and Discussion Topics
+
diff --git a/rfcs/20200411-fuse_recv/current_graph_partition_strategy.png b/rfcs/20200411-fuse_recv/current_graph_partition_strategy.png
new file mode 100644
index 000000000..1d882cd96
Binary files /dev/null and b/rfcs/20200411-fuse_recv/current_graph_partition_strategy.png differ
diff --git a/rfcs/20200411-fuse_recv/fuse_recv_procedure.png b/rfcs/20200411-fuse_recv/fuse_recv_procedure.png
new file mode 100644
index 000000000..359b006ee
Binary files /dev/null and b/rfcs/20200411-fuse_recv/fuse_recv_procedure.png differ
diff --git a/rfcs/20200411-fuse_recv/graph_partition_strategy_with_fuse_recv.png b/rfcs/20200411-fuse_recv/graph_partition_strategy_with_fuse_recv.png
new file mode 100644
index 000000000..58c7887eb
Binary files /dev/null and b/rfcs/20200411-fuse_recv/graph_partition_strategy_with_fuse_recv.png differ
diff --git a/rfcs/20200411-fuse_recv/performance_result.png b/rfcs/20200411-fuse_recv/performance_result.png
new file mode 100644
index 000000000..54100ba5a
Binary files /dev/null and b/rfcs/20200411-fuse_recv/performance_result.png differ
diff --git a/rfcs/20200420-tfx-tuner-component.md b/rfcs/20200420-tfx-tuner-component.md
new file mode 100644
index 000000000..57f550f35
--- /dev/null
+++ b/rfcs/20200420-tfx-tuner-component.md
@@ -0,0 +1,383 @@
+# TFX Tuner Component
+
+| Status        | Proposed |
+| :------------ | :-------------------------------------------------------- |
+| **Author(s)** | Jiayi Zhao (jyzhao@google.com), Amy Wu (wuamy@google.com) |
+| **Sponsor**   | Zhitao Li (zhitaoli@google.com), Tom O'Malley (omalleyt@google.com), Matthieu Monsch (mtth@google.com), Makoto Uchida (muchida@google.com), Goutham Bhat (goutham@google.com) |
+| **Updated**   | 2020-04-20 |
+
+## Objective
+
+### Goal
+
+* A new Tuner component in TFX for automated hyperparameter tuning, based on
+  abstractions from the
+  [KerasTuner library](https://github.com/keras-team/keras-tuner), in order
+  to reuse the latter's abstractions and algorithms.
+
+### Non Goal
+
+* Natively supporting multi-worker tuning in the system. As TFX does not have
+  the ability to manage multi-worker clusters, running multiple trials in
+  parallel (parallel tuning) and running each trial in a distributed
+  environment (distributed training) are not supported natively. Parallel
+  tuning may instead be realized by a particular implementation of the TFX
+  Tuner (a custom Executor), e.g., in a Google Cloud environment.
+* Implementing custom tuners for the
+  [KerasTuner library](https://github.com/keras-team/keras-tuner) is out of
+  scope of this design discussion, e.g., built-in EstimatorTuner support.
+  However, a user project can still implement a tuner that inherits from
+  [`kerastuner.BaseTuner`](https://github.com/keras-team/keras-tuner/blob/1.0.0/kerastuner/engine/base_tuner.py)
+  and provide it to the proposed TFX Tuner component.
+
+## Background and Motivation
+
+A hyperparameter is a parameter whose value is used to control the learning
+process of a model or the model itself (e.g., the layers and number of
+nodes). By contrast, the values of other parameters (typically node weights)
+are learned.
+
+Hyperparameter optimization is a critical part of many machine learning
+pipelines. We therefore propose a new TFX component that, given a search
+space specifying the hyperparameter configuration (name, type, range, etc.),
+optimizes the hyperparameters with the chosen tuning algorithm.
+
+## User Benefit
+
+This document proposes a built-in TFX Tuner component that works seamlessly
+with the Trainer and other TFX components. As the Tuner component utilizes
+the [KerasTuner library](https://github.com/keras-team/keras-tuner), all of
+its supported tuning methods will be available to TFX, including custom
+implementations of KerasTuner.
+
+## Design Proposal
+
+The TFX Tuner component will be built on the
+[KerasTuner library](https://github.com/keras-team/keras-tuner). In the
+following sections, we will first briefly go over the KerasTuner library and
+several concepts in hyperparameter optimization. We will then focus on the
+Tuner component interface and how we utilize the KerasTuner library. After
+that, we will discuss parallel tuning and our plan for Google Cloud
+integration.
+
+### KerasTuner Library
+
+The following graph shows a typical workflow of hyperparameter tuning under
+the KerasTuner framework:



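+In user code, that workflow looks roughly like the following minimal sketch
+(a hypothetical toy model written against the KerasTuner 1.0 API; the data,
+model, and search space are illustrative only):
+
+```python
+import numpy as np
+from kerastuner.tuners import RandomSearch
+from tensorflow import keras
+
+def build_model(hp):
+    """Model-building function: reads the search space from `hp`."""
+    model = keras.Sequential([
+        keras.layers.Dense(hp.Int("units", 32, 256, step=32),
+                           activation="relu", input_shape=(10,)),
+        keras.layers.Dense(1),
+    ])
+    model.compile(
+        optimizer=keras.optimizers.Adam(
+            hp.Choice("learning_rate", [1e-2, 1e-3])),
+        loss="mse")
+    return model
+
+# Toy data so the sketch is self-contained.
+x, y = np.random.rand(100, 10), np.random.rand(100, 1)
+
+tuner = RandomSearch(
+    build_model, objective="val_loss", max_trials=4,
+    directory="/tmp/tuning", project_name="demo")
+tuner.search(x, y, epochs=2, validation_split=0.2)  # runs the trials
+best_hp = tuner.get_best_hyperparameters()[0]
+print(best_hp.values)
+```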