Single-Client Parameter Server Training #214
Conversation
Fix extra period.
| dataset_fn = # a function that returns a dataset | ||
|
|
||
| # Clone the dataset on all workers, shuffled with different seeds. | ||
| distributed_dataset = strategy.experimental_distribute_datasets_from_function( |
cc @tensorflow/sig-io-maintainers Do we know if this would work with our current TF IO datasets? How do we make sure our implementations of the different datasets are safe to use when cloned on all workers?
If it is a tf dataset, it is supposed to work. But we need to verify.
I think the API should follow the base class's contract to the extent possible: 1) it needs to return a tf.data.Dataset, and 2) the Dataset should have a per-replica batch size. Like yuefengz@ said, verification should be done.
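As a rough illustration of that contract, a `dataset_fn` might look like the following minimal sketch (the toy data, the global batch size of 64, and the seeding choice are assumptions for illustration, not part of the RFC):

```python
import tensorflow as tf

def dataset_fn(input_context):
    # Illustrative source; any tf.data.Dataset (including TF IO datasets) fits here.
    dataset = tf.data.Dataset.from_tensor_slices(tf.range(1000))
    # Shuffle with a per-pipeline seed so each cloned pipeline sees a different order.
    dataset = dataset.shuffle(1000, seed=input_context.input_pipeline_id)
    # Contract (2): batch with the per-replica batch size, not the global one.
    per_replica_batch_size = input_context.get_per_replica_batch_size(
        global_batch_size=64)
    # Contract (1): return a tf.data.Dataset.
    return dataset.batch(per_replica_batch_size).repeat()

# distributed_dataset = strategy.experimental_distribute_datasets_from_function(dataset_fn)
```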
| **We will start with this kind of training loop in our first version.** | ||
|
|
||
|
|
||
| ### Fault Tolerance |
cc ElasticDL team @wangkuiyi @LiMinghao1994 @skydoorkai @QiJune @brightcoder01 @workingloong @chunyang-wen
|
Will this approach cause a huge graph that takes a long time to compile if there are a lot of parameter servers in the training?
| The second challenge is that calling `get_next` on an iterator is synchronous. This means that the training loop is not truly asynchronous. It is tricky to make `get_next` asynchronous because the client doesn’t know how many items will be in the iterator and thus doesn’t know how many functions to schedule. | ||
|
|
||
|
|
||
| ##### Alternative: passing iterators to `strategy.schedule` |
Is it possible to call model.save() when some replica_fns are still running?
Yes, this is what happens in Estimator. If it is really a concern, users can choose to join before saving the model.
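A minimal sketch of that "join before saving" pattern, assuming the `schedule`/`join` API proposed in this RFC and the `strategy`, `train_step`, iterator, and `model` names from its examples:

```python
# Sketch only: strategy, train_step, per_worker_iterator, model, and num_steps
# are assumed to be set up as in the RFC's earlier examples.
for _ in range(num_steps):
    strategy.schedule(train_step, args=(per_worker_iterator,))

# Wait for all scheduled replica_fns to finish so the exported model
# reflects a settled set of variable values.
strategy.join()
model.save("/tmp/exported_model")  # illustrative path
```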
| x, y = next(iterator) | ||
| with tf.GradientTape() as tape: | ||
| predictions = model(x, table, training=True) | ||
| loss = compute_loss(y, predictions) |
How do you think this should work with mixed precision loss scaling?
We can either have the PS store the loss scale or have each worker keep its own loss scale. If we also want to send gradients to the PS in fp16, this becomes more difficult, since then we have to send scaled gradients to the PS, so the workers would also have to send the loss scale over.
@reedwm Interesting question there. Just curious, how does current MWMS work with the loss scale?
Currently for multi-worker mirrored strategies, each worker has its own independent loss scale. Each worker independently scales the loss, unscales the gradients, and updates its loss scale. MWMS currently does not support allreducing in fp16, since then you must allreduce scaled gradients, but each worker may have a different loss scale. But at least it supports mixed precision for the forward and backward passes of the model with loss scaling.
With this proposal, extra work will have to be done even if we don't care about transferring gradients in fp16. We could have each worker have its own loss scale, like in MWMS, but this is more unnatural and difficult since all other variables are on the PS.
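For reference, the per-worker scheme described above is roughly the standard custom-training-loop recipe for `tf.keras.mixed_precision.LossScaleOptimizer` (a sketch only; `model` and `compute_loss` are assumed from the snippet quoted above, and whether the loss-scale variable should instead live on the PS is exactly the open question here):

```python
import tensorflow as tf

# Wrap the optimizer so the loss is scaled and the scale is updated dynamically.
optimizer = tf.keras.mixed_precision.LossScaleOptimizer(tf.keras.optimizers.SGD(0.1))

@tf.function
def train_step(x, y):
    with tf.GradientTape() as tape:
        predictions = model(x, training=True)
        loss = compute_loss(y, predictions)
        # Scale the loss so fp16 gradients do not underflow.
        scaled_loss = optimizer.get_scaled_loss(loss)
    scaled_grads = tape.gradient(scaled_loss, model.trainable_variables)
    # Unscale before applying; apply_gradients also updates the loss scale.
    grads = optimizer.get_unscaled_gradients(scaled_grads)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))
    return loss
```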
What if the loss scale is also synchronized, like mean/variance in SyncBatchNormalization? Like you said, it could be aggregated over workers in MWMS or stored on PS.
I didn't know SyncBatchNormalization existed until now, but I just read the documentation. I'm not sure how SyncBatchNormalization would work with this proposal either. The SyncBatchNormalization documentation states it synchronizes "the global batch statistics across all devices that are training the model". But you cannot synchronize with this proposal, as it uses asynchronous training. In general, I'm not sure what happens if you call all_reduce with a PS Strategy.
I think aggregating the loss scale over workers is tricky since they are asynchronous, but storing it on the PS would work, I think.
For the common use case in which the batch norm statistics are updated using exponential decay, storing them on the PS and updating them asynchronously might work without too much harm to accuracy.
But I agree calling AllReduce in a PS strategy is weird. Ping @anj-s who implemented SyncBN.
| 1. Connect to all remote workers and parameter servers. | ||
| 2. Create variables on parameter servers and hold references to them. | ||
| 3. Create datasets and iterators on workers. | ||
| 4. Create the replica function that takes an iterator as input, trace it and register it on all workers. Note: a function may create variables as well. If not specified, they will be created on parameter servers as well. |
If the workers are running the function that creates the variables, who creates the variables on which parameter server? How are they synced?
If variables are defined inside a tf.function, they will be created when the function is traced. The tracing happens on the client, so the client will create these variables under the device scope of the parameter servers.
After the variables are created, each worker takes care of reading and updating them.
To the second question, since only the client is responsible for creating variables, they don't need to be synced. This is one of the benefits of the single-client design.
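A small sketch of that flow, assuming the `ParameterServerStrategyV2` scope proposed in this RFC places variables on parameter servers (the variable shapes and names here are illustrative):

```python
import tensorflow as tf

# strategy is assumed to be the proposed ParameterServerStrategyV2 instance.
with strategy.scope():
    # Created once by the client (eagerly or while tracing) and placed on a
    # parameter server; workers never create their own copies.
    weights = tf.Variable(tf.random.normal([784, 10]))
    bias = tf.Variable(tf.zeros([10]))

@tf.function
def replica_fn(x):
    # Workers only read and update the variables created above.
    return tf.matmul(x, weights) + bias
```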
|
|
||
| #### Constraints | ||
|
|
||
| Functions are first-class citizens. Users should only schedule functions instead of running individual ops, in addition to creating variables. |
tf.functions? Or Python functions?
Yes, tf.functions. We cannot support arbitrary Python functions now. I'll update it.
|
|
||
| Functions are first-class citizens. Users should only schedule functions instead of running individual ops, in addition to creating variables. | ||
|
|
||
| Users can occasionally run individual ops on the client, only for reporting purposes such as printing a metric’s value. |
Why allow this at all? Does this increase the future support surface? It seems like restricting to functions could potentially save some "if" statements in the future.
It is common for users to call metrics.result(): fetching the sum and count variables and dividing the sum by the count happen on the client. Users can just do that without calling schedule. We won't have special conditioning in our implementation.
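A sketch of the pattern being described (hedged; `model`, `strategy`, and the iterator setup are assumed from the RFC's examples, and the metric choice is arbitrary):

```python
import tensorflow as tf

accuracy = tf.keras.metrics.SparseCategoricalAccuracy()  # its variables live on a PS

@tf.function
def train_step(iterator):
    x, y = next(iterator)
    predictions = model(x, training=True)
    accuracy.update_state(y, predictions)  # runs on a worker via schedule()

strategy.schedule(train_step, args=(per_worker_iterator,))

# result() just reads the sum/count variables and divides them on the client;
# no need to go through schedule() for this.
print("accuracy so far:", float(accuracy.result()))
```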
| class ParameterServerStrategyV2: | ||
|
|
||
| def schedule(self, replica_fn, args=(), kwargs=()): | ||
| """Schedule the `replica_fn` on all replicas in a sync group (a worker). |
Is it possible to have one group in sync within the larger async group? What would that look like? If that is off the table for now, maybe we should be more specific and say this is one worker?
"Sync group" is a concept of tf.distribute and some of our APIs depend on this concept. (You can search for sync group in the code base.) It has been defined that a sync group is one worker in PSStrategy and all workers in MWMS. So I followed this concept to be consistent with rest of the tf.distribute API.
| out of order and function inputs are shared between sync groups. | ||
|
|
||
| We don't support the cases where `args` or `kwargs` are bound to a specific | ||
| sync group. We will consider supporting them in the future. |
What does this mean? Can you clarify what it means to bind something to a sync group?
If we schedule two functions where one takes the output of the other as input, then they should be scheduled on the same worker. The return value of the first function (i.e. the input of the second function) is bound to that worker. Does that make sense to you?
@yuefengz How about we remove this paragraph to avoid confusion, since we don't support it anyway? We can always add it back when it's supported.
SG, will you remove it?
Sg, I'll do that
|
|
||
| Historically, `tf.estimator.Estimator` uses a dedicated evaluator that periodically loads from a checkpoint, and performs evaluation with evaluation data. However `tf.keras` typically evaluates in an alternating manner after every epoch of training, and this is also the case with `tf.keras` + `MultiWorkerMirroredStrategy`. | ||
|
|
||
| With `ParameterServerStrategyV2`, we will start with a dedicated** evaluator that runs alongside the training cluster**, **aka “sidecar evaluation”**; in this scheme, training client is required to generate checkpoints periodically, and the evaluator reads the latest checkpoint as it becomes available. The evaluation is asynchronous to the training progress. With this we provide the functionality Estimator has been able to with Keras API, which is important to attract updates from Estimator users to TF 2.0. |
nit: the bolding is mis-spaced in the first line here.
|
|
||
| Historically, `tf.estimator.Estimator` uses a dedicated evaluator that periodically loads from a checkpoint, and performs evaluation with evaluation data. However `tf.keras` typically evaluates in an alternating manner after every epoch of training, and this is also the case with `tf.keras` + `MultiWorkerMirroredStrategy`. | ||
|
|
||
| With `ParameterServerStrategyV2`, we will start with a dedicated** evaluator that runs alongside the training cluster**, **aka “sidecar evaluation”**; in this scheme, training client is required to generate checkpoints periodically, and the evaluator reads the latest checkpoint as it becomes available. The evaluation is asynchronous to the training progress. With this we provide the functionality Estimator has been able to with Keras API, which is important to attract updates from Estimator users to TF 2.0. |
In estimator, we auto-generated checkpoints, IIRC. In this case, is it the user's responsibility? What if they forget? How can we make this easier/part of the examples?
For a custom training loop, yes, it is their responsibility (and we should update the example accordingly). One thing we're working on is an EvaluationLoop API with which they gain automatic sidecar eval. If they use this, they are forced to provide a checkpoint_dir to __init__, matching what they use in the client when saving.
In the future, once we integrate with Keras model.fit(), users should use a callback as they would without distributed training.
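If that API materializes, usage might look roughly like the following; `EvaluationLoop`, its constructor arguments, and `run()` are hypothetical placeholders for whatever the final API provides, not a committed interface:

```python
# Hypothetical sketch of the sidecar evaluation helper mentioned above.
evaluation_loop = EvaluationLoop(
    eval_fn=eval_step,                  # a tf.function running one evaluation pass
    checkpoint_dir="/tmp/train_ckpts",  # must match where the training client saves
)
evaluation_loop.run()  # blocks: wait for a checkpoint, load it, evaluate, repeat
```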
| run_training_loop() | ||
| elif cluster_resolver.task_type == "evaluator": | ||
| run_evaluation_loop() | ||
| ``` |
It seems sort of sad to have this top level forking. Is there a better way?
Users can definitely use the client to drive the evaluation on the evaluator (or any worker) with our current design. This is one way of running evaluation if users prefer the Estimator-style side-car eval. I think we need more paragraphs for evaluation. @rchao
Yes, if we use an async executor for the evaluator's function scheduling (with a checkpoint file), it should be feasible to have a true single client (aka inline evaluation). One implication is that the evaluator would need to be considered part of the same cluster as the chief and workers. The plan now is to develop an API for sidecar eval, provide an example of how inline eval can be done (possibly some util if useful), and leave which one to use at the user's discretion.
|
|
||
|
|
||
| In the evaluation client, the user loads the checkpoints that were periodically saved into the model (by the training client), does evaluation over a full pass of eval dataset, and does whatever they want to do with eval results. Examples include exporting them to files which can be read by the training client for actions (such as reducing learning rate, early stopping, etc.) | ||
|
|
Can we see the e2e code example with training_loop and the fork as well?
Yes - we have some code samples and will update the doc.
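Until the doc is updated, a rough end-to-end sketch of the fork could look like this (hedged: `ParameterServerStrategyV2`, `build_model`, `train_step`, `per_worker_iterator`, `num_steps`, and the checkpoint-watching helper are placeholders following the RFC's examples; the cluster resolution follows the standard TF_CONFIG pattern):

```python
import tensorflow as tf

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()

def run_training_loop():
    strategy = ParameterServerStrategyV2(cluster_resolver)  # proposed API
    with strategy.scope():
        model, optimizer = build_model()  # user-defined
    for _ in range(num_steps):
        strategy.schedule(train_step, args=(per_worker_iterator,))
    strategy.join()

def run_evaluation_loop():
    # Sidecar evaluator: repeatedly restore the latest checkpoint and evaluate.
    evaluate_checkpoints_as_they_appear("/tmp/train_ckpts")  # user-defined

if cluster_resolver.task_type == "chief":
    run_training_loop()
elif cluster_resolver.task_type == "evaluator":
    run_evaluation_loop()
else:
    # Workers and parameter servers just run a server and wait for the client.
    server = tf.distribute.Server(
        cluster_resolver.cluster_spec(),
        job_name=cluster_resolver.task_type,
        task_index=cluster_resolver.task_id,
        protocol="grpc")
    server.join()
```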
|
|
||
|
|
||
| We can potentially merge this `Future` class with our `Tensor` class. | ||
|
|
Note that if this is returned by a public API, we generally require that the returned type is public, or some interface to it. I don't love the idea of a generically named "Future" type in TF, especially because it is suspiciously like the two-stage-Session-running of v1, so we should consider what the contract is here and what the user needs to know about this.
You can compare the Future to the PerReplica value in MirroredStrategy. In MirroredStrategy, run returns an opaque type, PerReplica, and users can only get its concrete values through local_results. Similarly, our schedule method returns a Future (or a PerReplica of Futures once we support multi-GPU), and users can only get its concrete values through local_results.
Our Tensor object in async eager is a future-like object as well. It doesn't have any concrete value until we materialize it via numpy. Therefore, we can think of merging this logic into the Tensor class in the future.
With this, it sounds to me that the contract is "one would need to use local_results to deal with whatever they get from run() or schedule()", without mentioning the type of it.
Well, if possible and it makes things easier for users, we can encourage users not to rely on the return value of schedule(). For many things that are involved in training, such as loss or metrics, they can simply be variables that get updated within the remote function call. The user simply reads those values from the client.
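Expressed as code, that contract might read as follows (a sketch; the `Future` name is an implementation detail, and `train_loss` is an assumed `tf.keras.metrics.Mean` updated inside `replica_fn`):

```python
# schedule() returns an opaque, future-like object (possibly a PerReplica of
# them once multi-GPU workers are supported)...
result = strategy.schedule(replica_fn, args=(per_worker_iterator,))

# ...and local_results() is the only sanctioned way to materialize it.
concrete_values = strategy.local_results(result)

# Alternatively, ignore the return value entirely: have replica_fn update a
# metric variable on the PS and read it from the client whenever convenient.
print("mean loss so far:", float(train_loss.result()))
```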
|
|
||
| # Print accuracy value every one minute. | ||
| while not strategy.done(): | ||
| print("Current accuracy: %f" % accuracy.result()) |
What are the consistency/atomicity guarantees we're aiming for?
E.g. computing accuracy may require 2 (or more) variables. Depending on the parameter server updates, the metric could be computed from an invalid state.
My understanding is that, by design, there's no consistency guaranteed by PS training. Values at any moment can be read from the PS. Can you give an example of how there can be an invalid state? My view of this is that someone might be reading while others are writing, and there is no guarantee that the value read is from before or after the write.
In addition to what Rick said, if strong consistency is really required, users can return something from the function and read that return value, e.g. some computed metric that doesn't depend on multiple variables on the PS.
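For example, a step function can compute and return the quantity of interest inside a single call, so the client never has to read several PS variables at an arbitrary moment (a sketch using the proposed `schedule`/`local_results` API; `model`, `optimizer`, `compute_loss`, and the iterator are assumed from the RFC's examples):

```python
import tensorflow as tf

@tf.function
def train_step(iterator):
    x, y = next(iterator)
    with tf.GradientTape() as tape:
        predictions = model(x, training=True)
        loss = compute_loss(y, predictions)
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    # Return a value computed entirely within this call rather than a
    # snapshot of multiple PS variables.
    return loss

loss_future = strategy.schedule(train_step, args=(per_worker_iterator,))
print(strategy.local_results(loss_future))
```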
Do you think it is possible for the Keras team to provide non-variable-based metrics? This may be useful for evaluation and for users who want a strong consistency guarantee.
|
|
||
| #### The Unknown of Scheduled Functions | ||
|
|
||
| For functions that have been scheduled, it is difficult for the client to know whether they have actually been executed or not when the client detects their corresponding worker failure. Therefore, in addition to informing users of this uncertainty in the case of worker failure, we should do the following to reduce this uncertainty: |
Depending on the consistency guarantees we make for the parameter server: would it be possible (or even make sense) for the parameter server to track which functions have and haven't been completed? This may let us do things like know exactly which functions need to be re-executed if a parameter server fails and a checkpoint needs to be reloaded.
Then again, if people are training asynchronously I guess it's not necessarily worth trying to provide reproducibility guarantees, or guarantees that each step will be run exactly once.
Our current design allows users to receive an object from strategy.schedule(), and when calling local_results(object) one gets the result. If it's not executed or not fully executed due to worker preemption, the strategy will find another worker to take over. For other kinds of errors, it is possible to design it in a way that such an error would be discovered upon calling strategy.local_results() or strategy.join(). For the case where a PS fails, the whole cluster would be restarted and a previously saved checkpoint is loaded.
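A hedged sketch of how that could look from the client's side (the exact exception types are not pinned down in the RFC, so a broad handler stands in for them; `strategy`, `train_step`, `num_steps`, `checkpoint`, and `checkpoint_dir` are assumed to exist):

```python
import tensorflow as tf

for _ in range(num_steps):
    strategy.schedule(train_step, args=(per_worker_iterator,))

try:
    # Worker preemptions are handled by rescheduling onto another worker;
    # other failures of scheduled functions would surface here (or from
    # strategy.local_results()).
    strategy.join()
except Exception as error:  # placeholder for the RFC's eventual error types
    print("Scheduled functions failed:", error)
    # If a parameter server failed, restart the cluster and restore state.
    checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))
```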
|
|
||
| Historically, `tf.estimator.Estimator` uses a dedicated evaluator that periodically loads from a checkpoint, and performs evaluation with evaluation data. However `tf.keras` typically evaluates in an alternating manner after every epoch of training, and this is also the case with `tf.keras` + `MultiWorkerMirroredStrategy`. | ||
|
|
||
| With `ParameterServerStrategyV2`, we will start with a dedicated** evaluator that runs alongside the training cluster**, **aka “sidecar evaluation”**; in this scheme, training client is required to generate checkpoints periodically, and the evaluator reads the latest checkpoint as it becomes available. The evaluation is asynchronous to the training progress. With this we provide the functionality Estimator has been able to with Keras API, which is important to attract updates from Estimator users to TF 2.0. |
So just to clarify: The parameter server won't be shared by the training & evaluation job. Instead, they'll do what estimator does and communicate via checkpoints & the file system?
This is the current design for our first cut.
Given the large sizes of some embedding models, incremental checkpoints might ease the file system pressure if users would like to do frequent evaluation without sharing the parameter servers.
|
|
||
| With `ParameterServerStrategyV2`, we will start with a dedicated** evaluator that runs alongside the training cluster**, **aka “sidecar evaluation”**; in this scheme, training client is required to generate checkpoints periodically, and the evaluator reads the latest checkpoint as it becomes available. The evaluation is asynchronous to the training progress. With this we provide the functionality Estimator has been able to with Keras API, which is important to attract updates from Estimator users to TF 2.0. | ||
|
|
||
| With our recommendation, users should create a separate evaluation client that runs the same python binary as the training client. This python binary will contain the if-else clause as it bifurcates into two paths: |
Given that these are two separate jobs, why require the same Python binary instead of having two separate Python binaries? Internally we have pretty good tooling for bringing up one binary as a training job and another as an evaluation job.
I believe users can do so. @rchao
There's the flexibility to do it either way. We were assuming the convenience of letting every job run the same binary, regardless of whether it is the chief, a worker, a PS, or the evaluator. But yes, users are free to have two or even more binaries for different jobs.
| # evaluation pass | ||
| return eval_accuracy.result() | ||
|
|
||
| while True: |
There's definitely room for non-parameter-server-specific utilities here for sidecar evaluation. Plenty of users (e.g. object detection) want to use dedicated evaluation jobs with MirroredStrategy, TPUStrategy, and others.
The common patterns are:
- Wait until a checkpoint is present
- Evaluate
- Check if a new one is present; if not, wait a while until it is
- Finish evaluating when no new checkpoint has been written for a while

With timeouts, meaningful error messages, etc.
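Much of that pattern is already covered by `tf.train.checkpoints_iterator`; a sidecar evaluator built on it might look like this (a sketch only; the directory, intervals, and `eval_fn` are placeholders):

```python
import tensorflow as tf

def sidecar_evaluate(checkpoint_dir, checkpoint, eval_fn):
    # Yields each new checkpoint path as it appears, waiting at most `timeout`
    # seconds for one; `timeout_fn` returning True ends the iteration.
    for checkpoint_path in tf.train.checkpoints_iterator(
            checkpoint_dir,
            min_interval_secs=60,
            timeout=3600,
            timeout_fn=lambda: True):  # stop if no new checkpoint for an hour
        checkpoint.restore(checkpoint_path).expect_partial()
        print("eval result:", eval_fn())
```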
Exactly and agreed. We're working on this and will update the design doc accordingly. Thanks for pointing this out.
| ``` | ||
|
|
||
|
|
||
| In the evaluation client, the user loads the checkpoints that were periodically saved into the model (by the training client), does evaluation over a full pass of eval dataset, and does whatever they want to do with eval results. Examples include exporting them to files which can be read by the training client for actions (such as reducing learning rate, early stopping, etc.) |
So the training job and sidecar evaluation have two-way communication, all via the file system? Does this make sense?
It might, since the training client can run asynchronously and then somehow signal the workers, but it feels a little unwieldy...
How hard would it be to allow multiple clients/jobs to connect to the same parameter server? Then they could communicate via reads/writes directly to the parameter server.
If users want the training cluster to read eval results, they probably shouldn't do sidecar eval. They should do inline eval -- alternating between training and eval, preferably using the same worker pool. Our current design allows that with some limitations.
It is future work to fully support inline eval since we need to think carefully about how to support a visitation guarantee with a sharded dataset, especially when workers go down or when there are stragglers.
One option yuefengz@ and I discussed is inline-evaluation-by-single-evaluator, where one schedules an evaluation function onto the dedicated evaluator job. That way, a variable (on a PS) can be used for the evaluator to update, and for the client to read and act on accordingly.
| * (Future work) It captures the lineage between functions and return values so that we can rebuild any poisoned objects. | ||
|
|
||
| ```python | ||
| class Future(object): |
So no on_complete asynchronous callbacks on futures? The clients check the value of futures exclusively by joining/polling?
We can definitely add asynchronous callbacks if this is needed. Maybe you can give me some use cases?
It will trace a function for one worker and schedule the function to all workers. So the graph size should be small, as opposed to in-graph distributed training, where we have a single large graph that contains the definitions for all workers. If there are a lot of parameter servers, the function will be partitioned into smaller functions, one for each PS. We don't see any problems so far. Please let us know if you have encountered any problems before.
|
This went through a design review on 4/9/2020 and the RFC has been updated based on the review comments. Accepting.
Objective
This RFC proposes a design and tentative interfaces for parameter server training in TensorFlow 2 using a single-client architecture.