
Commit a701926

Added separate code file and used literalinclude
1 parent 0aaeb4e commit a701926

File tree: 3 files changed (+220, -177 lines)

advanced_source/rpc_ddp_tutorial.rst

Lines changed: 28 additions & 177 deletions
@@ -36,17 +36,17 @@ workers in our setup as follows:
    training loop on the two trainers.
 2) 1 Parameter Server, which basically holds the embedding table in memory and
    responds to RPCs from the Master and Trainers.
-3) 2 Trainers, which store a FC layer (nn.Linear) which is replicated amongst
+3) 2 Trainers, which store an FC layer (nn.Linear) which is replicated amongst
    themselves using `DistributedDataParallel <https://pytorch.org/docs/stable/nn.html#torch.nn.parallel.DistributedDataParallel>`__.
    The trainers are also responsible for executing the forward pass, backward
    pass and optimizer step.
 
 |
 The entire training process is executed as follows:
 
-1) The master creates an embedding table on the Parameter Server and holds a
+1) The master creates an embedding table on the Parameter Server and holds an
    `RRef <https://pytorch.org/docs/master/rpc.html#rref>`__ to it.
-2) The master, then kicks of the training loop on the trainers and passes the
+2) The master, then kicks off the training loop on the trainers and passes the
    embedding table RRef to the trainers.
 3) The trainers create a ``HybridModel`` which first performs an embedding lookup
    using the embedding table RRef provided by the master and then executes the
@@ -60,11 +60,13 @@ The entire training process is executed as follows:
 7) Finally, the `Distributed Optimizer <https://pytorch.org/docs/master/rpc.html#module-torch.distributed.optim>`__ is used to update all the parameters.
 
 
-|
-**NOTE**: You should always use `Distributed Autograd <https://pytorch.org/docs/master/rpc.html#distributed-autograd-framework>`__ for the backward pass if you're combining DDP and RPC.
+.. attention::
+
+    You should always use `Distributed Autograd <https://pytorch.org/docs/master/rpc.html#distributed-autograd-framework>`__
+    for the backward pass if you're combining DDP and RPC.
 
 
-Now, lets go through each part in detail. Firstly, we need to setup all of our
+Now, let's go through each part in detail. Firstly, we need to setup all of our
 workers before we can perform any training. We create 4 processes such that
 ranks 0 and 1 are our trainers, rank 2 is the master and rank 3 is the
 parameter server.
@@ -79,90 +81,24 @@ Finally, the master waits for all training to finish before exiting.
 The trainers first initialize a ``ProcessGroup`` for DDP with world_size=2
 (for two trainers) using `init_process_group <https://pytorch.org/docs/stable/distributed.html#torch.distributed.init_process_group>`__.
 Next, they initialize the RPC framework using the TCP init_method. Note that
-the ports are different in RPC initialization and ProcessGroup intialization.
+the ports are different in RPC initialization and ProcessGroup initialization.
 This is to avoid port conflicts between initialization of both frameworks.
 Once the initialization is done, the trainers just wait for the ``_run_trainer``
 RPC from the master.
 
 The parameter server just initializes the RPC framework and waits for RPCs from
 the trainers and master.
 
-.. code:: python
-
-    def run_worker(rank, world_size):
-        r"""
-        A wrapper function that initializes RPC, calls the function, and shuts down
-        RPC.
-        """
-
-        # We need to use different port numbers in TCP init_method for init_rpc and
-        # init_process_group to avoid port conflicts.
-        rpc_backend_options = ProcessGroupRpcBackendOptions()
-        rpc_backend_options.init_method='tcp://localhost:29501'
-
-        # Rank 2 is master, 3 is ps and 0 and 1 are trainers.
-        if rank == 2:
-            rpc.init_rpc(
-                "master",
-                rank=rank,
-                world_size=world_size,
-                rpc_backend_options=rpc_backend_options)
-
-            # Build the embedding table on the ps.
-            emb_rref = rpc.remote(
-                "ps",
-                torch.nn.EmbeddingBag,
-                args=(NUM_EMBEDDINGS, EMBEDDING_DIM),
-                kwargs={"mode": "sum"})
-
-            # Run the training loop on trainers.
-            futs = []
-            for trainer_rank in [0, 1]:
-                trainer_name = "trainer{}".format(trainer_rank)
-                fut = rpc.rpc_async(
-                    trainer_name, _run_trainer, args=(emb_rref, rank))
-                futs.append(fut)
-
-            # Wait for all training to finish.
-            for fut in futs:
-                fut.wait()
-        elif rank <= 1:
-            # Initialize process group for Distributed DataParallel on trainers.
-            dist.init_process_group(
-                backend="gloo", rank=rank, world_size=2,
-                init_method='tcp://localhost:29500')
-
-            # Initialize RPC.
-            trainer_name = "trainer{}".format(rank)
-            rpc.init_rpc(
-                trainer_name,
-                rank=rank,
-                world_size=world_size,
-                rpc_backend_options=rpc_backend_options)
 
-            # Trainer just waits for RPCs from master.
-        else:
-            rpc.init_rpc(
-                "ps",
-                rank=rank,
-                world_size=world_size,
-                rpc_backend_options=rpc_backend_options)
-            # parameter server do nothing
-            pass
+.. literalinclude:: ../advanced_source/rpc_ddp_tutorial/main.py
+    :language: py
+    :start-after: BEGIN run_worker
+    :end-before: END run_worker
 
-        # block until all rpcs finish
-        rpc.shutdown()
-
-
-    if __name__=="__main__":
-        # 2 trainers, 1 parameter server, 1 master.
-        world_size = 4
-        mp.spawn(run_worker, args=(world_size, ), nprocs=world_size, join=True)
-
-Before we discuss details of the Trainer, lets introduce the ``HybridModel`` that
+Before we discuss details of the Trainer, let's introduce the ``HybridModel`` that
 the trainer uses. As described below, the ``HybridModel`` is initialized using an
 RRef to the embedding table (emb_rref) on the parameter server and the ``device``
-to use for DDP. The initialization of the model wraps a
+to use for DDP. The initialization of the model wraps an
 `nn.Linear <https://pytorch.org/docs/master/generated/torch.nn.Linear.html>`__
 layer inside DDP to replicate and synchronize this layer across all trainers.
 
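The ``literalinclude`` directive introduced above pulls the worker-setup code out of ``main.py`` instead of duplicating it in the ``.rst`` file: ``:start-after:`` and ``:end-before:`` match the first lines of the included file that contain the given strings, and the marker lines themselves are excluded. ``main.py`` belongs to this commit but is not shown in this excerpt; presumably the moved code is bracketed with marker comments roughly like the following sketch (hypothetical layout, not the actual file):

.. code:: python

    # Hypothetical sketch of advanced_source/rpc_ddp_tutorial/main.py (not shown in
    # this excerpt). Sphinx's literalinclude copies everything between the lines
    # matched by :start-after: and :end-before:, excluding the marker lines.

    # BEGIN run_worker
    def run_worker(rank, world_size):
        r"""
        A wrapper function that initializes RPC, calls the function, and shuts down
        RPC.
        """
        ...  # body moved verbatim from the removed code block above
    # END run_worker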
@@ -172,30 +108,12 @@ embedding lookup on the parameter server using an
 and passes its output onto the FC layer.
 
 
-.. code:: python
-
-    class HybridModel(torch.nn.Module):
-        r"""
-        The model consists of a sparse part and a dense part. The dense part is an
-        nn.Linear module that is replicated across all trainers using
-        DistributedDataParallel. The sparse part is an nn.EmbeddingBag that is
-        stored on the parameter server.
+.. literalinclude:: ../advanced_source/rpc_ddp_tutorial/main.py
+    :language: py
+    :start-after: BEGIN hybrid_model
+    :end-before: END hybrid_model
 
-        The model holds a Remote Reference to the embedding table on the parameter
-        server.
-        """
-
-        def __init__(self, emb_rref, device):
-            super(HybridModel, self).__init__()
-            self.emb_rref = emb_rref
-            self.fc = DDP(torch.nn.Linear(16, 8).cuda(device), device_ids=[device])
-            self.device = device
-
-        def forward(self, indices, offsets):
-            emb_lookup = self.emb_rref.rpc_sync().forward(indices, offsets)
-            return self.fc(emb_lookup.cuda(self.device))
-
-Next, lets look at the setup on the Trainer. The trainer first creates the
+Next, let's look at the setup on the Trainer. The trainer first creates the
 ``HybridModel`` described above using an RRef to the embedding table on the
 parameter server and its own rank.
 
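As a quick illustration of the interface described above, here is a minimal usage sketch of ``HybridModel`` with hypothetical values that are not part of the tutorial. It assumes RPC and the trainer process group are already initialized, the current process is trainer rank 0, and ``emb_rref`` refers to an ``nn.EmbeddingBag`` with embedding dimension 16 on the parameter server, matching the ``nn.Linear(16, 8)`` layer:

.. code:: python

    # Hypothetical usage sketch: assumes RPC/DDP are initialized and emb_rref
    # points at an nn.EmbeddingBag(NUM_EMBEDDINGS, 16, mode="sum") on the
    # parameter server.
    model = HybridModel(emb_rref, device=0)

    indices = torch.LongTensor([1, 2, 3, 4, 5])  # flat indices for two bags
    offsets = torch.LongTensor([0, 2])           # bag 0 = [1, 2], bag 1 = [3, 4, 5]

    # The embedding lookup runs on the parameter server via RPC; the FC layer
    # runs locally inside DDP.
    output = model(indices, offsets)
    print(output.shape)  # torch.Size([2, 8]) given the 16 -> 8 nn.Linear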
@@ -215,42 +133,10 @@ returns local parameters and doesn't include ``emb_rref``.
 Finally, we create our DistributedOptimizer using all the RRefs and define a
 CrossEntropyLoss function.
 
-.. code:: python
-
-    def _retrieve_embedding_parameters(emb_rref):
-        return [RRef(p) for p in emb_rref.local_value().parameters()]
-
-
-    def _run_trainer(emb_rref, rank):
-        r"""
-        Each trainer runs a forward pass which involves an embedding lookup on the
-        parameter server and running nn.Linear locally. During the backward pass,
-        DDP is responsible for aggregating the gradients for the dense part
-        (nn.Linear) and distributed autograd ensures gradients updates are
-        propagated to the parameter server.
-        """
-
-        # Setup the model.
-        model = HybridModel(emb_rref, rank)
-
-        # Retrieve all model parameters as rrefs for DistributedOptimizer.
-
-        # Retrieve parameters for embedding table.
-        model_parameter_rrefs = rpc.rpc_sync(
-            "ps", _retrieve_embedding_parameters, args=(emb_rref,))
-
-        # model.parameters() only includes local parameters.
-        for param in model.parameters():
-            model_parameter_rrefs.append(RRef(param))
-
-        # Setup distributed optimizer
-        opt = DistributedOptimizer(
-            optim.SGD,
-            model_parameter_rrefs,
-            lr=0.05,
-        )
-
-        criterion = torch.nn.CrossEntropyLoss()
+.. literalinclude:: ../advanced_source/rpc_ddp_tutorial/main.py
+    :language: py
+    :start-after: BEGIN setup_trainer
+    :end-before: END setup_trainer
 
 Now we're ready to introduce the main training loop that is run on each trainer.
 ``get_next_batch`` is just a helper function to generate random inputs and
@@ -264,45 +150,10 @@ batch:
 4) Use Distributed Autograd to execute a distributed backward pass using the loss.
 5) Finally, run a Distributed Optimizer step to optimize all the parameters.
 
+.. literalinclude:: ../advanced_source/rpc_ddp_tutorial/main.py
+    :language: py
+    :start-after: BEGIN run_trainer
+    :end-before: END run_trainer
 .. code:: python
 
-    # def _run_trainer(emb_rref, rank): continued...
-
-    def get_next_batch(rank):
-        for _ in range(10):
-            num_indices = random.randint(20, 50)
-            indices = torch.LongTensor(num_indices).random_(0, NUM_EMBEDDINGS)
-
-            # Generate offsets.
-            offsets = []
-            start = 0
-            batch_size = 0
-            while start < num_indices:
-                offsets.append(start)
-                start += random.randint(1, 10)
-                batch_size += 1
-
-            offsets_tensor = torch.LongTensor(offsets)
-            target = torch.LongTensor(batch_size).random_(8).cuda(rank)
-            yield indices, offsets_tensor, target
-
-    # Train for 100 epochs
-    for epoch in range(100):
-        # create distributed autograd context
-        for indices, offsets, target in get_next_batch(rank):
-            with dist_autograd.context() as context_id:
-                output = model(indices, offsets)
-                loss = criterion(output, target)
-
-                # Run distributed backward pass
-                dist_autograd.backward(context_id, [loss])
-
-                # Tun distributed optimizer
-                opt.step(context_id)
-
-                # Not necessary to zero grads as each iteration creates a different
-                # distributed autograd context which hosts different grads
-        print("Training done for epoch {}".format(epoch))
-
-|
 Source code for the entire example can be found `here <https://github.com/pytorch/examples/tree/master/distributed/rpc/ddp_rpc>`__.
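Condensed, the per-batch steps listed above (and the code removed in this hunk) boil down to the following sketch, assuming ``model``, ``criterion``, ``opt`` (the ``DistributedOptimizer``) and ``get_next_batch`` are set up as in the trainer sections:

.. code:: python

    # Condensed sketch of the per-batch training pattern described above; model,
    # criterion, opt (DistributedOptimizer) and get_next_batch are assumed to
    # exist as set up earlier.
    import torch.distributed.autograd as dist_autograd

    for indices, offsets, target in get_next_batch(rank):
        # 1) Each iteration gets its own distributed autograd context.
        with dist_autograd.context() as context_id:
            # 2) + 3) Forward pass through HybridModel and loss computation.
            output = model(indices, offsets)
            loss = criterion(output, target)
            # 4) Distributed backward pass; gradients accumulate in the context
            #    rather than in param.grad, so nothing needs zeroing between batches.
            dist_autograd.backward(context_id, [loss])
            # 5) Distributed optimizer step across all parameter RRefs.
            opt.step(context_id)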
