diff --git a/feature_examples/tensorflow2/README.md b/feature_examples/tensorflow2/README.md
index d8ce4552..71676159 100644
--- a/feature_examples/tensorflow2/README.md
+++ b/feature_examples/tensorflow2/README.md
@@ -6,4 +6,6 @@ This directory contains several examples showing how to use TensorFlow 2 on the
 
 - [IMDB Sentiment Prediction](embeddings): These examples train an IPU model with an embedding layer and an LSTM to predict the sentiment of an IMDB review.
 
-- [Inspecting tensors using custom outfeed layers and a custom optimizer](inspecting_tensors): This example trains a choice of simple fully connected models on the MNIST numeral data set and shows how tensors (containing activations and gradients) can be returned to the host via outfeeds for inspection.
\ No newline at end of file
+- [Inspecting tensors using custom outfeed layers and a custom optimizer](inspecting_tensors): This example trains a choice of simple fully connected models on the MNIST numeral data set and shows how tensors (containing activations and gradients) can be returned to the host via outfeeds for inspection.
+
+- [Pipelining](pipelining): Examples demonstrating and explaining different ways of using multiple IPUs. Pipelining is applied to parallelise and speed up training.
diff --git a/feature_examples/tensorflow2/pipelining/README.md b/feature_examples/tensorflow2/pipelining/README.md
new file mode 100644
index 00000000..d05a705a
--- /dev/null
+++ b/feature_examples/tensorflow2/pipelining/README.md
@@ -0,0 +1,542 @@
+# TensorFlow 2: Model Parallelism with IPU Pipelining
+
+In this tutorial you will train a selection of simple fully connected models
+on the MNIST numeral data set and see how training can be parallelised over
+multiple IPU devices.
+
+## Model Parallelism With Pipelining
+
+With pipelining, as with sharding, the model is split into stages, where each
+stage can fit and be run on a single IPU. However, unlike sharding, the compute
+for separate batches is overlapped so that execution of the model
+is parallelised. That is, each stage (part of the original model) is executed
+on its IPU while the IPUs allocated to previous stages are already working on
+subsequent batches. This provides improved utilisation compared to sharding.
+
+![Pipelining outline](static/pipelining_outline.png)
+
+Refer to the technical note on TensorFlow Model Parallelism for full details:
+[TensorFlow Model Parallelism - Pipelining]().
+
+Pipelining provides a way to run larger models that is conceptually less
+straightforward than sharding. However, it offers better utilisation of
+the allocated IPU resource and, for this reason, pipelining is recommended
+where performance is critical.
+This tutorial focuses on how to apply pipelining in TensorFlow 2.
+
+### Pipeline Execution Phases
+It is important to understand the key phases of pipeline execution:
+
+1. Ramp up - the pipeline is being filled; work is flowing into each stage
+until all stages are filled (all IPUs are busy).
+2. Main execution - all stages are filled and IPU utilisation is maximised.
+3. Ramp down - the pipeline is being drained; work is flowing out of each stage
+until all stages are empty (no IPUs are busy).
+4. Weight updates - all pipeline batches have been processed, so accumulated
+gradients can be processed (gradient descent) and weights updated.
+
+Note:
+* Each individual batch passed through the pipeline is called a **mini-batch**.
+* Weights are updated only once a set of mini-batches has been fully processed.
+* Gradients are accumulated across a set of mini-batches.
+* Weight updates are applied once the complete set of mini-batches has been
+processed.
+
+In short, pipelining enforces **gradient accumulation**, where:
+`effective batch size = mini-batch size * gradient accumulation count`
+Performing gradient accumulation is still valid because summing the gradients
+across all the examples in a batch immediately and accumulating them over
+several steps are equivalent.
+
+Increasing the gradient accumulation count has these benefits:
+1. A smaller proportion of time is spent in the ramp up and ramp down - that is,
+more time is spent in the main execution phase where maximum utilisation of the
+IPUs is made.
+2. Fewer overall weight updates are made, which saves compute time.
+
+Here is the pipeline outline extended to show the progress of 16 mini-batches
+followed by a weight update. Notice that the best utilisation of the IPUs is
+during the main phase and that this is sustained until the last mini-batch enters
+the pipeline, following which the ramp down begins. Also notice that weight
+updates are only applied once, following the ramp down (after the pipeline has
+been drained of all mini-batches).
+
+## Pipelining Schedules
+
+In this tutorial, we will create models using the Keras Model class and IPU
+pipelining features. We are going to use Pipeline Stages to assign operations
+to devices and to configure parallelism.
+
+In the following graphics, FWD and BWD refer to forward and backward passes.
+
+The computational stages can be interleaved on the devices in three different
+ways, as described by the `pipeline_schedule` parameter. By default the API
+will use the `PipelineSchedule.Grouped` mode, where the forward passes are
+grouped together, and the backward passes are grouped together.
+![Grouped pipeline](static/grouped_pipeline.png)
+
+The main alternative is the `PipelineSchedule.Interleaved` mode, where the
+forward and backward passes are interleaved, so that fewer activations need
+to be stored.
+![Interleaved pipeline](static/interleaved_pipeline.png)
+
+Additionally, the `PipelineSchedule.Sequential` mode, where the pipeline is
+scheduled in the same way as if it were a sharded model, may be useful when
+debugging your model.
+![Sharded pipeline](static/sharded_pipeline.png)
+
+## Upgrading to TensorFlow 2
+
+IPU computation can be enabled in both TensorFlow 1 and TensorFlow 2, so it is
+worth explaining the major differences between the two versions and how they
+affect the implementation of IPU-specific code.
+
+### Device scopes
+In the IPU APIs for TF2, the scope context `ipu.scopes.ipu_scope(device_id)` was
+replaced with a strategy context `ipu.ipu_strategy.IPUStrategy().scope()`.
+
+### Training loop
+Since TF2 moved in the direction of eager execution, we are no longer required
+to create sessions and use them as contexts (`with tf.Session()...`). Instead,
+when using the Keras API, we can use the model instance directly and invoke its
+`model.compile()`, `model.fit()`, and `model.predict()` methods without
+explicitly specifying the training loop. To enable IPUs, these invocations just
+need to be executed within an `IPUStrategy` scope.
+
+### Keras extensions to facilitate IPU computing
+You can find the main documentation on the [Graphcore Keras for IPU](https://docs.graphcore.ai/projects/tensorflow-user-guide/en/latest/keras_tf2.html) page.
+The framework has been extended to enable the use and configuration of IPU devices.
+All the new code can be found within the `tensorflow.python.ipu` package. + +### TF2 specific changes +There is a guide prepared by the TensorFlow team to conduct migration between +versions of TensorFlow library, which you can study [here](https://www.tensorflow.org/guide/migrate). + +A very exhaustive comparison of both versions can be found [here](https://www.tensorflow.org/guide/migrate/tf1_vs_tf2). + +## Tutorial Walkthrough + +This cell contains the constants applied to the whole tutorial. When running +this tutorial in a Jupyter Notebook, make sure all the cells below +are re-run (including this one). + + +```python +# Number of samples per batch. +BATCH_SIZE = 32 + +# Number of steps to run per execution. The number of batches to run for +# each TensorFlow function call. At most it would execute a full epoch. +STEPS_PER_EXECUTION = 500 + +# Number of steps per epoch. The total number of steps (batches of samples) +# for one epoch to finish and starting the next one. The default `None` is +# equal to the number of samples divided by the batch size. +STEPS_PER_EPOCH = STEPS_PER_EXECUTION + +# Number of epochs +EPOCHS = 4 + +# Optimizer parameters. +LEARNING_RATE = 0.01 +MOMENTUM = 0.9 + +# Number of devices that will be attached to this model for training and +# inference. +NUM_IPUS = 2 + +# Number of steps for which the gradients should be accumulated, for each +# configured replica. +STEPS_PER_REPLICA = 4 +``` + +## Importing libraries + + +```python +from typing import Optional + +import tensorflow as tf +from tensorflow import keras +from tensorflow.python import ipu +from tensorflow.keras import Model +from tensorflow.python.keras.engine.sequential import Sequential +from tensorflow.keras.layers import Flatten, Dense, Input +``` + +## Dataset preparation +We need to load the dataset and perform some normalization of values. Below +you will find a helper function to use inside IPU context, which will load +the input data with labels. + + +```python +def create_dataset(batch_size: int, repeat=True): + mnist = keras.datasets.mnist + (x_train, y_train), (x_test, y_test) = mnist.load_data() + x_train, x_test = x_train / 255.0, x_test / 255.0 + + train_ds = tf.data.Dataset.from_tensor_slices((x_train, y_train)) + train_ds = train_ds.shuffle(10000).batch(batch_size, drop_remainder=True) + train_ds = train_ds.map( + lambda d, l: (tf.cast(d, tf.float32), tf.cast(l, tf.float32)) + ) + if repeat: + return train_ds.repeat() + else: + return train_ds + + +train_ds = create_dataset(batch_size=BATCH_SIZE) +``` + +Initialise the IPU configuration - more details can be found in `IPUConfig` +[documentation](https://docs.graphcore.ai/projects/tensorflow-user-guide/en/latest/api.html#tensorflow.python.ipu.config.IPUConfig). +Creating new instance of `IPUConfig` and running `configure_ipu_system` always +reattaches the devices, freeing the resources if they were occupied by this +process. It is important when a Jupyter Notebook is used and the kernel +is still running, and it does not release IPU devices automatically. + + +```python +def configure_ipus(num_ipus: int): + ipu_configuration = ipu.config.IPUConfig() + ipu_configuration.auto_select_ipus = num_ipus + ipu_configuration.configure_ipu_system() +``` + +This will be the training function reused by all the kinds of models and modes +of pipelining. 
+> Note: model creation needs to be processed under the `IPUStrategy().scope()`, +> hence this function accepts only the reference to the function which performs +> the model creation, not the model instance (as `model_factory` argument). + + +```python +def train(strategy, + model_factory, + train_ds, + steps_per_replica: int = STEPS_PER_REPLICA, + steps_per_execution: int = STEPS_PER_EXECUTION, + steps_per_epoch: int = STEPS_PER_EPOCH, + epochs: int = 4): + + with strategy.scope(): + model = model_factory() + + model.compile( + loss=tf.keras.losses.SparseCategoricalCrossentropy( + from_logits=True + ), + optimizer=tf.keras.optimizers.SGD( + learning_rate=LEARNING_RATE, + momentum=MOMENTUM + ), + steps_per_execution=steps_per_execution + ) + + if steps_per_replica: + model.set_pipelining_options( + gradient_accumulation_steps_per_replica=steps_per_replica + ) + + model.fit(train_ds, steps_per_epoch=steps_per_epoch, epochs=epochs) +``` + +## Training a Keras `Functional` model on a single IPU + +Next let's define a function which returns a `Functional` Keras model. This +implementation looks very similar to a regular non-IPU Keras model definition. + +### Define the model + + +```python +def create_functional_model(batch_size=BATCH_SIZE): + input_layer = Input( + shape=(28, 28, 1), + dtype=tf.float32, + batch_size=batch_size + ) + x = Flatten(name='flatten')(input_layer) + x = Dense(256, activation='relu', name="dense256")(x) + x = Dense(128, activation='relu', name="dense128")(x) + x = Dense(64, activation='relu', name="dense64")(x) + x = Dense(32, activation='relu', name="dense32")(x) + x = Dense(10, name="logits")(x) + + model = Model( + inputs=input_layer, + outputs=x, + name="singleIPU" + ) + return model +``` + +### Execute Training + +It is essential to create a fresh instance of `IPUConfig` and `IPUStrategy` +before training. + + +```python +configure_ipus(num_ipus=1) + +train( + strategy=ipu.ipu_strategy.IPUStrategy(), + model_factory=create_functional_model, + train_ds=train_ds +) +``` + +## Training a Keras `Sequential` model on a single IPU + +Let us organize the same layers using the `Sequential` Keras model API. +This class groups a linear stack of layers into a `tf.Keras.Model`. +Then, `Sequential` provides training and inference features on this model. + +### Define the model + + +```python +def create_sequential_model(): + seq_model = Sequential( + layers=[ + Flatten(name='flatten'), + Dense(256, activation='relu', name="dense256"), + Dense(128, activation='relu', name="dense128"), + Dense(64, activation='relu', name="dense64"), + Dense(32, activation='relu', name="dense32"), + Dense(10, activation='softmax', name="logits") + ], + name="singleIPU" + ) + return seq_model +``` + +### Execute Training + +Next we refresh IPU device configuration and train again with the new model. + + +```python +configure_ipus(num_ipus=1) + +train( + strategy=ipu.ipu_strategy.IPUStrategy(), + model_factory=create_sequential_model, + train_ds=train_ds +) +``` + +## Training a Keras `Functional` model with pipelining for two devices + +The documentation of Pipeline Stages can be found [here](https://docs.graphcore.ai/projects/tensorflow-user-guide/en/latest/perf_training.html#pipelined-training). +There are two ways to enable IPU pipelining for a Keras model, depending on +if the user is writing a new model or using an existing model. + +To pipeline a `Functional` model you are writing yourself, each layer call must +happen within the scope of an `ipu.keras.PipelineStage` context. 
+In the function below, we assign layers to two different stages. + +### Define the model + + +```python +def create_functional_model_with_stages(): + input_layer = Input(shape=(28, 28, 1), + dtype=tf.float32, + batch_size=BATCH_SIZE) + with ipu.keras.PipelineStage(0): + x = Flatten(name='flatten')(input_layer) + x = Dense(256, activation='relu', name="dense256")(x) + x = Dense(128, activation='relu', name="dense128")(x) + x = Dense(64, activation='relu', name="dense64")(x) + + with ipu.keras.PipelineStage(1): + x = Dense(32, activation='relu', name="dense32")(x) + x = Dense(10, name="logits")(x) + + model = Model(inputs=input_layer, + outputs=x, + name="multipleIPUfunctional") + return model +``` + +In case an existing `TensorFlow` model is imported, an additional API +is provided to facilitate managing Pipeline Stages assignments. + +This feature is implemented with `model.get_pipeline_stage_assignment()` +and `model.set_pipeline_stage_assignment(assignments)` where `assignments` is +the result of calling `get_pipeline_stage_assignment`. +For an example with the ResNet50 please check this [documentation](https://docs.graphcore.ai/projects/tensorflow-user-guide/en/latest/keras_tf2.html#pipelining-an-existing-functional-model). + +### Execute Training + +Next we refresh IPU device configuration and train again with the new model. + + +```python +configure_ipus(num_ipus=2) + +train( + strategy=ipu.ipu_strategy.IPUStrategy(), + model_factory=create_functional_model_with_stages, + train_ds=train_ds +) +``` + +## Training a Keras `Sequential model` with pipelining + +Next we will write a function to create the model using the Keras `Sequential` +class as we did above, but with explicit mapping of layers to stages through +`set_pipeline_stage_assignment`, which accepts a list of integers as +a parameter. This function sets the pipeline stage assignment for all +the invocations of all the layers (excluding input layers) in the model +which is used to create a model-parallel execution when calling `fit()`, +`evaluate()` and `predict()`. + +>This pipelining stage assignment is ignored when using the `call()` function +>on this model. + +Below you will see pipeline stage assignment like this: `[0, 0, 0, 0, 1, 1])`. +This means that first four layers of `Sequential` model are assigned to +the first stage, and the remaining layers to the second stage. + +This list has to be has to be of the same length as the total number +of invocations of all the layers in this model, excluding input layers. + +### Define the model + + +```python +def create_pipeline_sequential_model(): + seq_model = Sequential( + layers=[ + Flatten(name='flatten'), + Dense(256, activation='relu', name="dense256"), + Dense(128, activation='relu', name="dense128"), + Dense(64, activation='relu', name="dense64"), + Dense(32, activation='relu', name="dense32"), + Dense(10, activation='softmax', name="logits") + ], + name="multipleIPUsequential" + ) + seq_model.set_pipeline_stage_assignment([0, 0, 0, 0, 1, 1]) + + return seq_model +``` + +### Execute Training + +Next we refresh IPU device configuration and train again with the new model. + + +```python +configure_ipus(num_ipus=2) + +train( + strategy=ipu.ipu_strategy.IPUStrategy(), + model_factory=create_pipeline_sequential_model, + train_ds=train_ds +) +``` + +## Other `PipelineSchedule` settings + +Next we can reuse the previous example and apply a different scheduling mode. 
+The modes can be characterized in detail like so (quoting the docstring of +`PipelineSchedule`): + +- `Grouped`: This groups the forward passes on multiple IPUs. This requires + more memory since activations need to be stored until the backward stages run + together. However, since forward passes tend to be smaller than backward + passes, `Grouped` tends to improve the speed of the execution, as different + IPUs don't spend so much time waiting for each other. + +- `Interleaved`: This schedules the backward passes whenever the forward passes + have just generated some activations. Consequently fewer activations are + required to be stored between the forward and backward pipeline stages, so + less memory is required. However, since forward and backward stages tend to + be very different in terms of execution cycles, the overall performance + of the pipeline tends to be slower. + +- `Sequential`: This is a debug mode, where the pipeline is scheduled in + the same way as if it were a sharded model. + +### Defining the model with `Interleaved` schedule + +The mode `Grouped` was used in the previous example, as it is the default +setting. In this next example we will use the `Interleaved` mode. + + +```python +def create_pipeline_sequential_model_interleaved(): + seq_model = Sequential( + layers=[ + Flatten(name='flatten'), + Dense(256, activation='relu', name="dense256"), + Dense(128, activation='relu', name="dense128"), + Dense(64, activation='relu', name="dense64"), + Dense(32, activation='relu', name="dense32"), + Dense(10, activation='softmax', name="logits") + ], + name="multipleIPUsequential" + ) + seq_model.set_pipeline_stage_assignment([0, 0, 1, 1, 1, 1]) + + seq_model.set_pipelining_options( + schedule=ipu.ops.pipelining_ops.PipelineSchedule.Interleaved + ) + return seq_model +``` + +### Execute training + + +```python +configure_ipus(num_ipus=2) + +train( + strategy=ipu.ipu_strategy.IPUStrategy(), + model_factory=create_pipeline_sequential_model_interleaved, + train_ds=train_ds +) +``` + +## Summary and further reading + +In the course of this tutorial multiple examples of model parallelism with IPU +devices were presented. Try and change some hyperparameters or IPU count and +observe the differences. You can investigate details of execution using +the PopVision tool. + +If you execute this code with environmental variable: +```bash +POPLAR_ENGINE_OPTIONS='{"autoReport.all":"true"}' python3 pipelining.py +``` +Or set this variable inside Jupyter Notebook: +```python +import os +os.environ['POPLAR_ENGINE_OPTIONS']='{"autoReport.all":"true"}' +``` +Then you could use the generated report, which for this tutorial might look +like this: +```bash +ls . +> ./tf_report__2021-10-06__02-24-24.631__70052: +> archive.a +> debug.cbor +> framework.json +> profile.pop +> profile.pop_cache +``` + +## PopVision - reading the reports +When you open such a report, you could navigate to multiple tabs which present +different aspects of the IPU computation. You can find more information on the +[PopVision User Guide](https://docs.graphcore.ai/projects/graphcore-popvision-user-guide/en/latest/index.html) page. 
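+
+When reading the execution trace, it can also help to relate the pipeline
+phases back to the gradient accumulation settings used in this tutorial. The
+following is a minimal sketch of the arithmetic only, using the constants
+defined at the start of the tutorial (`BATCH_SIZE`, `STEPS_PER_REPLICA`,
+`STEPS_PER_EXECUTION`); the exact counts you observe may differ.
+
+```python
+BATCH_SIZE = 32            # samples per mini-batch
+STEPS_PER_REPLICA = 4      # gradient accumulation count per replica
+STEPS_PER_EXECUTION = 500  # mini-batches run per compiled execution
+
+# effective batch size = mini-batch size * gradient accumulation count
+effective_batch_size = BATCH_SIZE * STEPS_PER_REPLICA  # 128
+
+# Each weight update consumes one full set of accumulated mini-batches,
+# so roughly this many weight updates happen per execution:
+weight_updates_per_execution = STEPS_PER_EXECUTION // STEPS_PER_REPLICA  # 125
+
+print(effective_batch_size, weight_updates_per_execution)
+```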
+ +## Further reading + +For further reading about related topics please check: +- [Keras API docs](https://docs.graphcore.ai/projects/tensorflow-user-guide/en/latest/api.html#keras) +- [Upgrading from TF2.1](https://docs.graphcore.ai/projects/tensorflow-user-guide/en/latest/keras_tf2.html#porting-models-from-tensorflow-2-1) +- [Automatic Data Parallelism](https://docs.graphcore.ai/projects/tensorflow-user-guide/en/latest/keras_tf2.html#automatic-data-parallelism) diff --git a/feature_examples/tensorflow2/pipelining/pipelining.ipynb b/feature_examples/tensorflow2/pipelining/pipelining.ipynb new file mode 100644 index 00000000..d4cbd7b6 --- /dev/null +++ b/feature_examples/tensorflow2/pipelining/pipelining.ipynb @@ -0,0 +1,815 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "4cfeecb9", + "metadata": {}, + "source": [ + "Copyright (c) 2021 Graphcore Ltd. All rights reserved." + ] + }, + { + "cell_type": "markdown", + "id": "e28d2189", + "metadata": {}, + "source": [ + "# TensorFlow 2: Model Parallelism with IPU Pipelining\n", + "\n", + "In this tutorial you will train a selection of simple fully connected models\n", + "on the MNIST numeral data set and see how training can be parallelised over\n", + "multiple IPU devices." + ] + }, + { + "cell_type": "markdown", + "id": "641761fd", + "metadata": {}, + "source": [ + "## Model Parallelism With Pipelining\n", + "\n", + "With pipelining, as with sharding, the model is split into stages where each\n", + "stage can fit and be run on a single IPU. However, unlike sharding, the compute\n", + "for separate batches is overlapped so that execution of the model\n", + "is parallelised. That is, each stage (part of the original model) is executed\n", + "on its IPU while the IPUs allocated to previous stages are already working on\n", + "subsequent batches. This provides improved utilisation compared to sharding." + ] + }, + { + "cell_type": "markdown", + "id": "1c1b743d", + "metadata": {}, + "source": [ + "![Pipelining outline](static/pipelining_outline.png)" + ] + }, + { + "cell_type": "markdown", + "id": "46d942dd", + "metadata": {}, + "source": [ + "Refer to the technical note on TensorFlow Model Parallelism for full details:\n", + "[TensorFlow Model Parallelism - Pipelining]()/\n", + "\n", + "Pipelining provides a method to run larger models that is conceptually less\n", + "straightforward compared to sharding. However, it offers better utilisation of\n", + "the allocated IPU resource and, for this reason, pipelining is recommended\n", + "where performance is critical.\n", + "This tutorial focuses on how to apply pipelining in TensorFlow 1." + ] + }, + { + "cell_type": "markdown", + "id": "ab82f3d9", + "metadata": {}, + "source": [ + "### Pipeline Execution Phases\n", + "It is important to understand the key phases of pipeline execution:\n", + "\n", + "1. Ramp up - the pipeline is being filled; work is flowing into each stage \n", + "until all stages are filled (all IPUs are busy).\n", + "2. Main execution - all stages are filled and IPU utilisation is maximised.\n", + "3. Ramp down - the pipeline is being drained; work is flowing out of each stage \n", + "until all stages are empty (no IPUs are busy).\n", + "4. Weight updates - all pipeline batches have been processed, so accumulated \n", + "gradients can be processed (gradient descent) and weights updated.\n", + "Note: \n", + "* Each individual batch passed through the pipeline is called a **mini-batch**. 
\n", + "* Weights are updated only once a set of mini-batches has been fully processed. \n", + "* Gradients are accumulated across a set of mini-batches. \n", + "* Weight updates are applied once all the complete set of mini-batches are \n", + "processed. \n", + "\n", + "In short, pipelining enforces **gradient accumulation** where: \n", + "`effective batch size = mini-batch size * gradient accumulation count` \n", + "Performing gradient accumulation is still valid because summing the gradients \n", + "across all the examples in a batch immediately and accumulating them over \n", + "several steps are equivalent. \n", + "Increasing the gradient accumulation count has these benefits:\n", + "1. A smaller proportion of time is spent in the ramp up and ramp down - that is, \n", + "more time is spent in the main execution phase where maximum utilisation of the \n", + "IPUs is made.\n", + "2. Fewer overall weight updates are made, which saves compute time.\n", + "Here is the pipeline outline extended to show the progress of 16 mini-batches \n", + "followed by a weight update. Notice that the best utilization of the IPUs is \n", + "during the main phase and that this is sustained until the last mini-batch enters \n", + "the pipeline, following which the ramp down begins. Also notice that weight \n", + "updates are only applied once, following the ramp down (after the pipeline has \n", + "been drained of all mini-batches)." + ] + }, + { + "cell_type": "markdown", + "id": "8faeb2ab", + "metadata": {}, + "source": [ + "## Pipelining Schedules\n", + "\n", + "In this tutorial, we will create models using the Keras Model class and IPU \n", + "pipelining features. We are going to use Pipeline Stages to assign operations\n", + "to devices and to configure parallelism.\n", + "\n", + "In the following graphics, FWD and BWD refer to forward and backward passes.\n", + "\n", + "The computational stages can be interleaved on the devices in three different \n", + "ways as described by the `pipeline_schedule` parameter. By default the API \n", + "will use the `PipelineSchedule.Grouped` mode, where the forward passes are \n", + "grouped together, and the backward passes are grouped together. \n", + "![Grouped pipeline](static/grouped_pipeline.png)\n", + "\n", + "The main alternative is the `PipelineSchedule.Interleaved` mode, where the \n", + "forward and backward passes are interleaved, so that fewer activations need \n", + "to be stored. \n", + "![Interleaved pipeline](static/interleaved_pipeline.png)\n", + "\n", + "Additionally, the `PipelineSchedule.Sequential` mode, where the pipeline is \n", + "scheduled in the same way as if it were a sharded model, may be useful when \n", + "debugging your model.\n", + "![Sharded pipeline](static/sharded_pipeline.png)" + ] + }, + { + "cell_type": "markdown", + "id": "489e63c3", + "metadata": {}, + "source": [ + "## Upgrading to TensorFlow 2\n", + "\n", + "Considering that IPU computation can be enabled on both TensorFlow 1 \n", + "and Tensorflow 2 it is necessary to explain the major differences between them\n", + "and how it affects implementation of IPU specific code.\n", + "\n", + "### Device scopes\n", + "In IPU APIs for TF2, the scope context `ipu.scopes.ipu_scope(device_id)` was\n", + "replaced with a strategy context `ipu.ipu_strategy.IPUStrategy().scope()`.\n", + "\n", + "### Training loop\n", + "Since TF2 moved in the direction of eager execution, we no longer are required\n", + "to create sessions and use them as context (`with tf.Session()...`). 
Instead, \n", + "when using the Keras API, we can use the model instance directly and invoke\n", + "`model.compile()`, `model.fit()`, and `model.predict()` methods without\n", + "specifing explicitly the training loop. To enable IPUs, it is just required\n", + "that these invocations are executed under `IPUStrategy` scope.\n", + "\n", + "### Keras extensions to facilitate IPU computing\n", + "You can find the main documentation on the [GraphCore Keras for IPU](https://docs.graphcore.ai/projects/tensorflow-user-guide/en/latest/keras_tf2.html) page.\n", + "The framework has been extended to enable IPU devices usage and configuration.\n", + "All the new code can be found within the `tensorflow.python.ipu` package.\n", + "\n", + "### TF2 specific changes\n", + "There is a guide prepared by the TensorFlow team to conduct migration between\n", + "versions of TensorFlow library, which you can study [here](https://www.tensorflow.org/guide/migrate).\n", + "\n", + "A very exhaustive comparison of both versions can be found [here](https://www.tensorflow.org/guide/migrate/tf1_vs_tf2)." + ] + }, + { + "cell_type": "markdown", + "id": "50424306", + "metadata": {}, + "source": [ + "## Tutorial Walkthrough" + ] + }, + { + "cell_type": "markdown", + "id": "dbb3ee67", + "metadata": {}, + "source": [ + "This cell contains the constants applied to the whole tutorial. When running\n", + "this tutorial in a Jupyter Notebook, make sure all the cells below \n", + "are re-run (including this one)." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "abb345a6", + "metadata": {}, + "outputs": [], + "source": [ + "# Number of samples per batch.\n", + "BATCH_SIZE = 32\n", + "\n", + "# Number of steps to run per execution. The number of batches to run for\n", + "# each TensorFlow function call. At most it would execute a full epoch.\n", + "STEPS_PER_EXECUTION = 500\n", + "\n", + "# Number of steps per epoch. The total number of steps (batches of samples)\n", + "# for one epoch to finish and starting the next one. The default `None` is\n", + "# equal to the number of samples divided by the batch size.\n", + "STEPS_PER_EPOCH = STEPS_PER_EXECUTION\n", + "\n", + "# Number of epochs\n", + "EPOCHS = 4\n", + "\n", + "# Optimizer parameters.\n", + "LEARNING_RATE = 0.01\n", + "MOMENTUM = 0.9\n", + "\n", + "# Number of devices that will be attached to this model for training and\n", + "# inference.\n", + "NUM_IPUS = 2\n", + "\n", + "# Number of steps for which the gradients should be accumulated, for each\n", + "# configured replica.\n", + "STEPS_PER_REPLICA = 4" + ] + }, + { + "cell_type": "markdown", + "id": "35f95e94", + "metadata": {}, + "source": [ + "## Importing libraries" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4f3ee720", + "metadata": {}, + "outputs": [], + "source": [ + "from typing import Optional\n", + "\n", + "import tensorflow as tf\n", + "from tensorflow import keras\n", + "from tensorflow.python import ipu\n", + "from tensorflow.keras import Model\n", + "from tensorflow.python.keras.engine.sequential import Sequential\n", + "from tensorflow.keras.layers import Flatten, Dense, Input" + ] + }, + { + "cell_type": "markdown", + "id": "143c0614", + "metadata": {}, + "source": [ + "## Dataset preparation\n", + "We need to load the dataset and perform some normalization of values. Below\n", + "you will find a helper function to use inside IPU context, which will load\n", + "the input data with labels." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c8937645", + "metadata": {}, + "outputs": [], + "source": [ + "def create_dataset(batch_size: int, repeat=True):\n", + " mnist = keras.datasets.mnist\n", + " (x_train, y_train), (x_test, y_test) = mnist.load_data()\n", + " x_train, x_test = x_train / 255.0, x_test / 255.0\n", + "\n", + " train_ds = tf.data.Dataset.from_tensor_slices((x_train, y_train))\n", + " train_ds = train_ds.shuffle(10000).batch(batch_size, drop_remainder=True)\n", + " train_ds = train_ds.map(\n", + " lambda d, l: (tf.cast(d, tf.float32), tf.cast(l, tf.float32))\n", + " )\n", + " if repeat:\n", + " return train_ds.repeat()\n", + " else:\n", + " return train_ds\n", + "\n", + "\n", + "train_ds = create_dataset(batch_size=BATCH_SIZE)" + ] + }, + { + "cell_type": "markdown", + "id": "fec9ed3b", + "metadata": {}, + "source": [ + "Initialise the IPU configuration - more details can be found in `IPUConfig`\n", + "[documentation](https://docs.graphcore.ai/projects/tensorflow-user-guide/en/latest/api.html#tensorflow.python.ipu.config.IPUConfig).\n", + "Creating new instance of `IPUConfig` and running `configure_ipu_system` always\n", + "reattaches the devices, freeing the resources if they were occupied by this\n", + "process. It is important when a Jupyter Notebook is used and the kernel\n", + "is still running, and it does not release IPU devices automatically." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "519d4298", + "metadata": {}, + "outputs": [], + "source": [ + "def configure_ipus(num_ipus: int):\n", + " ipu_configuration = ipu.config.IPUConfig()\n", + " ipu_configuration.auto_select_ipus = num_ipus\n", + " ipu_configuration.configure_ipu_system()" + ] + }, + { + "cell_type": "markdown", + "id": "d7e6ed6d", + "metadata": {}, + "source": [ + "This will be the training function reused by all the kinds of models and modes\n", + "of pipelining.\n", + "> Note: model creation needs to be processed under the `IPUStrategy().scope()`,\n", + "> hence this function accepts only the reference to the function which performs\n", + "> the model creation, not the model instance (as `model_factory` argument)." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "05ece0b7", + "metadata": {}, + "outputs": [], + "source": [ + "def train(strategy,\n", + " model_factory,\n", + " train_ds,\n", + " steps_per_replica: int = STEPS_PER_REPLICA,\n", + " steps_per_execution: int = STEPS_PER_EXECUTION,\n", + " steps_per_epoch: int = STEPS_PER_EPOCH,\n", + " epochs: int = 4):\n", + "\n", + " with strategy.scope():\n", + " model = model_factory()\n", + "\n", + " model.compile(\n", + " loss=tf.keras.losses.SparseCategoricalCrossentropy(\n", + " from_logits=True\n", + " ),\n", + " optimizer=tf.keras.optimizers.SGD(\n", + " learning_rate=LEARNING_RATE,\n", + " momentum=MOMENTUM\n", + " ),\n", + " steps_per_execution=steps_per_execution\n", + " )\n", + "\n", + " if steps_per_replica:\n", + " model.set_pipelining_options(\n", + " gradient_accumulation_steps_per_replica=steps_per_replica\n", + " )\n", + "\n", + " model.fit(train_ds, steps_per_epoch=steps_per_epoch, epochs=epochs)" + ] + }, + { + "cell_type": "markdown", + "id": "20317fb1", + "metadata": {}, + "source": [ + "## Training a Keras `Functional` model on a single IPU\n", + "\n", + "Next let's define a function which returns a `Functional` Keras model. 
This\n", + "implementation looks very similar to a regular non-IPU Keras model definition.\n", + "\n", + "### Define the model" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5f8d2131", + "metadata": {}, + "outputs": [], + "source": [ + "def create_functional_model(batch_size=BATCH_SIZE):\n", + " input_layer = Input(\n", + " shape=(28, 28, 1),\n", + " dtype=tf.float32,\n", + " batch_size=batch_size\n", + " )\n", + " x = Flatten(name='flatten')(input_layer)\n", + " x = Dense(256, activation='relu', name=\"dense256\")(x)\n", + " x = Dense(128, activation='relu', name=\"dense128\")(x)\n", + " x = Dense(64, activation='relu', name=\"dense64\")(x)\n", + " x = Dense(32, activation='relu', name=\"dense32\")(x)\n", + " x = Dense(10, name=\"logits\")(x)\n", + "\n", + " model = Model(\n", + " inputs=input_layer,\n", + " outputs=x,\n", + " name=\"singleIPU\"\n", + " )\n", + " return model" + ] + }, + { + "cell_type": "markdown", + "id": "9ff173f5", + "metadata": {}, + "source": [ + "### Execute Training\n", + "\n", + "It is essential to create a fresh instance of `IPUConfig` and `IPUStrategy`\n", + "before training." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "efec19ff", + "metadata": { + "tags": [ + "sst_hide_output" + ] + }, + "outputs": [], + "source": [ + "configure_ipus(num_ipus=1)\n", + "\n", + "train(\n", + " strategy=ipu.ipu_strategy.IPUStrategy(),\n", + " model_factory=create_functional_model,\n", + " train_ds=train_ds\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "db6bea5a", + "metadata": {}, + "source": [ + "## Training a Keras `Sequential` model on a single IPU\n", + "\n", + "Let us organize the same layers using the `Sequential` Keras model API.\n", + "This class groups a linear stack of layers into a `tf.Keras.Model`. \n", + "Then, `Sequential` provides training and inference features on this model.\n", + "\n", + "### Define the model" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a46dee71", + "metadata": {}, + "outputs": [], + "source": [ + "def create_sequential_model():\n", + " seq_model = Sequential(\n", + " layers=[\n", + " Flatten(name='flatten'),\n", + " Dense(256, activation='relu', name=\"dense256\"),\n", + " Dense(128, activation='relu', name=\"dense128\"),\n", + " Dense(64, activation='relu', name=\"dense64\"),\n", + " Dense(32, activation='relu', name=\"dense32\"),\n", + " Dense(10, activation='softmax', name=\"logits\")\n", + " ],\n", + " name=\"singleIPU\"\n", + " )\n", + " return seq_model" + ] + }, + { + "cell_type": "markdown", + "id": "2d7378ae", + "metadata": {}, + "source": [ + "### Execute Training\n", + "\n", + "Next we refresh IPU device configuration and train again with the new model." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "41285970", + "metadata": { + "tags": [ + "sst_hide_output" + ] + }, + "outputs": [], + "source": [ + "configure_ipus(num_ipus=1)\n", + "\n", + "train(\n", + " strategy=ipu.ipu_strategy.IPUStrategy(),\n", + " model_factory=create_sequential_model,\n", + " train_ds=train_ds\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "5dfe42c7", + "metadata": {}, + "source": [ + "## Training a Keras `Functional` model with pipelining for two devices\n", + "\n", + "The documentation of Pipeline Stages can be found [here](https://docs.graphcore.ai/projects/tensorflow-user-guide/en/latest/perf_training.html#pipelined-training).\n", + "There are two ways to enable IPU pipelining for a Keras model, depending on\n", + "if the user is writing a new model or using an existing model.\n", + "\n", + "To pipeline a `Functional` model you are writing yourself, each layer call must\n", + "happen within the scope of an `ipu.keras.PipelineStage` context.\n", + "In the function below, we assign layers to two different stages.\n", + "\n", + "### Define the model" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "66fdb7f5", + "metadata": {}, + "outputs": [], + "source": [ + "def create_functional_model_with_stages():\n", + " input_layer = Input(shape=(28, 28, 1),\n", + " dtype=tf.float32,\n", + " batch_size=BATCH_SIZE)\n", + " with ipu.keras.PipelineStage(0):\n", + " x = Flatten(name='flatten')(input_layer)\n", + " x = Dense(256, activation='relu', name=\"dense256\")(x)\n", + " x = Dense(128, activation='relu', name=\"dense128\")(x)\n", + " x = Dense(64, activation='relu', name=\"dense64\")(x)\n", + "\n", + " with ipu.keras.PipelineStage(1):\n", + " x = Dense(32, activation='relu', name=\"dense32\")(x)\n", + " x = Dense(10, name=\"logits\")(x)\n", + "\n", + " model = Model(inputs=input_layer,\n", + " outputs=x,\n", + " name=\"multipleIPUfunctional\")\n", + " return model" + ] + }, + { + "cell_type": "markdown", + "id": "7a195d9e", + "metadata": {}, + "source": [ + "In case an existing `TensorFlow` model is imported, an additional API\n", + "is provided to facilitate managing Pipeline Stages assignments.\n", + "\n", + "This feature is implemented with `model.get_pipeline_stage_assignment()`\n", + "and `model.set_pipeline_stage_assignment(assignments)` where `assignments` is\n", + "the result of calling `get_pipeline_stage_assignment`.\n", + "For an example with the ResNet50 please check this [documentation](https://docs.graphcore.ai/projects/tensorflow-user-guide/en/latest/keras_tf2.html#pipelining-an-existing-functional-model)." + ] + }, + { + "cell_type": "markdown", + "id": "f57421e3", + "metadata": {}, + "source": [ + "### Execute Training\n", + "\n", + "Next we refresh IPU device configuration and train again with the new model." 
+ ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "c848941e", + "metadata": { + "tags": [ + "sst_hide_output" + ] + }, + "outputs": [], + "source": [ + "configure_ipus(num_ipus=2)\n", + "\n", + "train(\n", + " strategy=ipu.ipu_strategy.IPUStrategy(),\n", + " model_factory=create_functional_model_with_stages,\n", + " train_ds=train_ds\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "2b59f3a5", + "metadata": {}, + "source": [ + "## Training a Keras `Sequential model` with pipelining\n", + "\n", + "Next we will write a function to create the model using the Keras `Sequential`\n", + "class as we did above, but with explicit mapping of layers to stages through \n", + "`set_pipeline_stage_assignment`, which accepts a list of integers as\n", + "a parameter. This function sets the pipeline stage assignment for all\n", + "the invocations of all the layers (excluding input layers) in the model\n", + "which is used to create a model-parallel execution when calling `fit()`,\n", + "`evaluate()` and `predict()`. \n", + "\n", + ">This pipelining stage assignment is ignored when using the `call()` function\n", + ">on this model.\n", + "\n", + "Below you will see pipeline stage assignment like this: `[0, 0, 0, 0, 1, 1])`. \n", + "This means that first four layers of `Sequential` model are assigned to \n", + "the first stage, and the remaining layers to the second stage.\n", + "\n", + "This list has to be has to be of the same length as the total number\n", + "of invocations of all the layers in this model, excluding input layers.\n", + "\n", + "### Define the model" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e1f125ec", + "metadata": {}, + "outputs": [], + "source": [ + "def create_pipeline_sequential_model():\n", + " seq_model = Sequential(\n", + " layers=[\n", + " Flatten(name='flatten'),\n", + " Dense(256, activation='relu', name=\"dense256\"),\n", + " Dense(128, activation='relu', name=\"dense128\"),\n", + " Dense(64, activation='relu', name=\"dense64\"),\n", + " Dense(32, activation='relu', name=\"dense32\"),\n", + " Dense(10, activation='softmax', name=\"logits\")\n", + " ],\n", + " name=\"multipleIPUsequential\"\n", + " )\n", + " seq_model.set_pipeline_stage_assignment([0, 0, 0, 0, 1, 1])\n", + "\n", + " return seq_model" + ] + }, + { + "cell_type": "markdown", + "id": "de905953", + "metadata": {}, + "source": [ + "### Execute Training\n", + "\n", + "Next we refresh IPU device configuration and train again with the new model." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b7d6e5b3", + "metadata": { + "tags": [ + "sst_hide_output" + ] + }, + "outputs": [], + "source": [ + "configure_ipus(num_ipus=2)\n", + "\n", + "train(\n", + " strategy=ipu.ipu_strategy.IPUStrategy(),\n", + " model_factory=create_pipeline_sequential_model,\n", + " train_ds=train_ds\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "ef39ec01", + "metadata": {}, + "source": [ + "## Other `PipelineSchedule` settings\n", + "\n", + "Next we can reuse the previous example and apply a different scheduling mode.\n", + "The modes can be characterized in detail like so (quoting the docstring of \n", + "`PipelineSchedule`):\n", + "\n", + "- `Grouped`: This groups the forward passes on multiple IPUs. This requires\n", + " more memory since activations need to be stored until the backward stages run\n", + " together. 
However, since forward passes tend to be smaller than backward \n", + " passes, `Grouped` tends to improve the speed of the execution, as different \n", + " IPUs don't spend so much time waiting for each other.\n", + "\n", + "- `Interleaved`: This schedules the backward passes whenever the forward passes\n", + " have just generated some activations. Consequently fewer activations are \n", + " required to be stored between the forward and backward pipeline stages, so \n", + " less memory is required. However, since forward and backward stages tend to \n", + " be very different in terms of execution cycles, the overall performance \n", + " of the pipeline tends to be slower.\n", + "\n", + "- `Sequential`: This is a debug mode, where the pipeline is scheduled in\n", + " the same way as if it were a sharded model." + ] + }, + { + "cell_type": "markdown", + "id": "33b96326", + "metadata": {}, + "source": [ + "### Defining the model with `Interleaved` schedule\n", + "\n", + "The mode `Grouped` was used in the previous example, as it is the default\n", + "setting. In this next example we will use the `Interleaved` mode." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "fd7e752f", + "metadata": {}, + "outputs": [], + "source": [ + "def create_pipeline_sequential_model_interleaved():\n", + " seq_model = Sequential(\n", + " layers=[\n", + " Flatten(name='flatten'),\n", + " Dense(256, activation='relu', name=\"dense256\"),\n", + " Dense(128, activation='relu', name=\"dense128\"),\n", + " Dense(64, activation='relu', name=\"dense64\"),\n", + " Dense(32, activation='relu', name=\"dense32\"),\n", + " Dense(10, activation='softmax', name=\"logits\")\n", + " ],\n", + " name=\"multipleIPUsequential\"\n", + " )\n", + " seq_model.set_pipeline_stage_assignment([0, 0, 1, 1, 1, 1])\n", + "\n", + " seq_model.set_pipelining_options(\n", + " schedule=ipu.ops.pipelining_ops.PipelineSchedule.Interleaved\n", + " )\n", + " return seq_model" + ] + }, + { + "cell_type": "markdown", + "id": "8cc202da", + "metadata": {}, + "source": [ + "### Execute training" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "188eb794", + "metadata": { + "tags": [ + "sst_hide_output" + ] + }, + "outputs": [], + "source": [ + "configure_ipus(num_ipus=2)\n", + "\n", + "train(\n", + " strategy=ipu.ipu_strategy.IPUStrategy(),\n", + " model_factory=create_pipeline_sequential_model_interleaved,\n", + " train_ds=train_ds\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "37f1cd1d", + "metadata": {}, + "source": [ + "## Summary and further reading\n", + "\n", + "In the course of this tutorial multiple examples of model parallelism with IPU\n", + "devices were presented. Try and change some hyperparameters or IPU count and\n", + "observe the differences. 
You can investigate details of execution using \n", + "the PopVision tool.\n", + "\n", + "If you execute this code with environmental variable:\n", + "```bash\n", + "POPLAR_ENGINE_OPTIONS='{\"autoReport.all\":\"true\"}' python3 pipelining.py\n", + "```\n", + "Or set this variable inside Jupyter Notebook:\n", + "```python\n", + "import os\n", + "os.environ['POPLAR_ENGINE_OPTIONS']='{\"autoReport.all\":\"true\"}'\n", + "```\n", + "Then you could use the generated report, which for this tutorial might look\n", + "like this:\n", + "```bash\n", + "ls .\n", + "> ./tf_report__2021-10-06__02-24-24.631__70052:\n", + "> archive.a\n", + "> debug.cbor\n", + "> framework.json\n", + "> profile.pop\n", + "> profile.pop_cache\n", + "```" + ] + }, + { + "cell_type": "markdown", + "id": "b3850ffa", + "metadata": {}, + "source": [ + "## PopVision - reading the reports\n", + "When you open such a report, you could navigate to multiple tabs which present\n", + "different aspects of the IPU computation. You can find more information on the\n", + "[PopVision User Guide](https://docs.graphcore.ai/projects/graphcore-popvision-user-guide/en/latest/index.html) page.\n", + "\n", + "## Further reading\n", + "\n", + "For further reading about related topics please check:\n", + "- [Keras API docs](https://docs.graphcore.ai/projects/tensorflow-user-guide/en/latest/api.html#keras)\n", + "- [Upgrading from TF2.1](https://docs.graphcore.ai/projects/tensorflow-user-guide/en/latest/keras_tf2.html#porting-models-from-tensorflow-2-1)\n", + "- [Automatic Data Parallelism](https://docs.graphcore.ai/projects/tensorflow-user-guide/en/latest/keras_tf2.html#automatic-data-parallelism)" + ] + } + ], + "metadata": {}, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/feature_examples/tensorflow2/pipelining/pipelining.py b/feature_examples/tensorflow2/pipelining/pipelining.py new file mode 100644 index 00000000..ec52dfea --- /dev/null +++ b/feature_examples/tensorflow2/pipelining/pipelining.py @@ -0,0 +1,521 @@ +""" +Copyright (c) 2021 Graphcore Ltd. All rights reserved. +""" +""" +# TensorFlow 2: Model Parallelism with IPU Pipelining + +In this tutorial you will train a selection of simple fully connected models +on the MNIST numeral data set and see how training can be parallelised over +multiple IPU devices. +""" +""" +## Model Parallelism With Pipelining + +With pipelining, as with sharding, the model is split into stages where each +stage can fit and be run on a single IPU. However, unlike sharding, the compute +for separate batches is overlapped so that execution of the model +is parallelised. That is, each stage (part of the original model) is executed +on its IPU while the IPUs allocated to previous stages are already working on +subsequent batches. This provides improved utilisation compared to sharding. +""" +""" +![Pipelining outline](static/pipelining_outline.png) +""" +""" +Refer to the technical note on TensorFlow Model Parallelism for full details: +[TensorFlow Model Parallelism - Pipelining]()/ + +Pipelining provides a method to run larger models that is conceptually less +straightforward compared to sharding. However, it offers better utilisation of +the allocated IPU resource and, for this reason, pipelining is recommended +where performance is critical. +This tutorial focuses on how to apply pipelining in TensorFlow 1. +""" +""" +### Pipeline Execution Phases +It is important to understand the key phases of pipeline execution: + +1. 
Ramp up - the pipeline is being filled; work is flowing into each stage +until all stages are filled (all IPUs are busy). +2. Main execution - all stages are filled and IPU utilisation is maximised. +3. Ramp down - the pipeline is being drained; work is flowing out of each stage +until all stages are empty (no IPUs are busy). +4. Weight updates - all pipeline batches have been processed, so accumulated +gradients can be processed (gradient descent) and weights updated. +Note: +* Each individual batch passed through the pipeline is called a **mini-batch**. +* Weights are updated only once a set of mini-batches has been fully processed. +* Gradients are accumulated across a set of mini-batches. +* Weight updates are applied once all the complete set of mini-batches are +processed. + +In short, pipelining enforces **gradient accumulation** where: +`effective batch size = mini-batch size * gradient accumulation count` +Performing gradient accumulation is still valid because summing the gradients +across all the examples in a batch immediately and accumulating them over +several steps are equivalent. +Increasing the gradient accumulation count has these benefits: +1. A smaller proportion of time is spent in the ramp up and ramp down - that is, +more time is spent in the main execution phase where maximum utilisation of the +IPUs is made. +2. Fewer overall weight updates are made, which saves compute time. +Here is the pipeline outline extended to show the progress of 16 mini-batches +followed by a weight update. Notice that the best utilization of the IPUs is +during the main phase and that this is sustained until the last mini-batch enters +the pipeline, following which the ramp down begins. Also notice that weight +updates are only applied once, following the ramp down (after the pipeline has +been drained of all mini-batches). +""" +""" +## Pipelining Schedules + +In this tutorial, we will create models using the Keras Model class and IPU +pipelining features. We are going to use Pipeline Stages to assign operations +to devices and to configure parallelism. + +In the following graphics, FWD and BWD refer to forward and backward passes. + +The computational stages can be interleaved on the devices in three different +ways as described by the `pipeline_schedule` parameter. By default the API +will use the `PipelineSchedule.Grouped` mode, where the forward passes are +grouped together, and the backward passes are grouped together. +![Grouped pipeline](static/grouped_pipeline.png) + +The main alternative is the `PipelineSchedule.Interleaved` mode, where the +forward and backward passes are interleaved, so that fewer activations need +to be stored. +![Interleaved pipeline](static/interleaved_pipeline.png) + +Additionally, the `PipelineSchedule.Sequential` mode, where the pipeline is +scheduled in the same way as if it were a sharded model, may be useful when +debugging your model. +![Sharded pipeline](static/sharded_pipeline.png) +""" +""" +## Upgrading to TensorFlow 2 + +Considering that IPU computation can be enabled on both TensorFlow 1 +and Tensorflow 2 it is necessary to explain the major differences between them +and how it affects implementation of IPU specific code. + +### Device scopes +In IPU APIs for TF2, the scope context `ipu.scopes.ipu_scope(device_id)` was +replaced with a strategy context `ipu.ipu_strategy.IPUStrategy().scope()`. 
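+
+As a minimal sketch of what this change looks like in practice (the TF1 form is
+shown in comments purely for comparison, and `make_model` is a hypothetical
+helper that builds a Keras model):
+
+```python
+# TensorFlow 1 on the IPU used a device scope (for comparison only):
+#     with ipu.scopes.ipu_scope("/device:IPU:0"):
+#         logits = network(inputs)
+
+# TensorFlow 2 on the IPU uses an IPUStrategy scope instead:
+strategy = ipu.ipu_strategy.IPUStrategy()
+with strategy.scope():
+    model = make_model()  # build the Keras model inside the scope
+    model.compile(loss="mse", optimizer="sgd")
+```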
+ +### Training loop +Since TF2 moved in the direction of eager execution, we no longer are required +to create sessions and use them as context (`with tf.Session()...`). Instead, +when using the Keras API, we can use the model instance directly and invoke +`model.compile()`, `model.fit()`, and `model.predict()` methods without +specifing explicitly the training loop. To enable IPUs, it is just required +that these invocations are executed under `IPUStrategy` scope. + +### Keras extensions to facilitate IPU computing +You can find the main documentation on the [GraphCore Keras for IPU](https://docs.graphcore.ai/projects/tensorflow-user-guide/en/latest/keras_tf2.html) page. +The framework has been extended to enable IPU devices usage and configuration. +All the new code can be found within the `tensorflow.python.ipu` package. + +### TF2 specific changes +There is a guide prepared by the TensorFlow team to conduct migration between +versions of TensorFlow library, which you can study [here](https://www.tensorflow.org/guide/migrate). + +A very exhaustive comparison of both versions can be found [here](https://www.tensorflow.org/guide/migrate/tf1_vs_tf2). +""" +""" +## Tutorial Walkthrough +""" +""" +This cell contains the constants applied to the whole tutorial. When running +this tutorial in a Jupyter Notebook, make sure all the cells below +are re-run (including this one). +""" +# Number of samples per batch. +BATCH_SIZE = 32 + +# Number of steps to run per execution. The number of batches to run for +# each TensorFlow function call. At most it would execute a full epoch. +STEPS_PER_EXECUTION = 500 + +# Number of steps per epoch. The total number of steps (batches of samples) +# for one epoch to finish and starting the next one. The default `None` is +# equal to the number of samples divided by the batch size. +STEPS_PER_EPOCH = STEPS_PER_EXECUTION + +# Number of epochs +EPOCHS = 4 + +# Optimizer parameters. +LEARNING_RATE = 0.01 +MOMENTUM = 0.9 + +# Number of devices that will be attached to this model for training and +# inference. +NUM_IPUS = 2 + +# Number of steps for which the gradients should be accumulated, for each +# configured replica. +STEPS_PER_REPLICA = 4 +""" +## Importing libraries +""" +from typing import Optional + +import tensorflow as tf +from tensorflow import keras +from tensorflow.python import ipu +from tensorflow.keras import Model +from tensorflow.python.keras.engine.sequential import Sequential +from tensorflow.keras.layers import Flatten, Dense, Input +""" +## Dataset preparation +We need to load the dataset and perform some normalization of values. Below +you will find a helper function to use inside IPU context, which will load +the input data with labels. +""" +def create_dataset(batch_size: int, repeat=True): + mnist = keras.datasets.mnist + (x_train, y_train), (x_test, y_test) = mnist.load_data() + x_train, x_test = x_train / 255.0, x_test / 255.0 + + train_ds = tf.data.Dataset.from_tensor_slices((x_train, y_train)) + train_ds = train_ds.shuffle(10000).batch(batch_size, drop_remainder=True) + train_ds = train_ds.map( + lambda d, l: (tf.cast(d, tf.float32), tf.cast(l, tf.float32)) + ) + if repeat: + return train_ds.repeat() + else: + return train_ds + + +train_ds = create_dataset(batch_size=BATCH_SIZE) + +""" +Initialise the IPU configuration - more details can be found in `IPUConfig` +[documentation](https://docs.graphcore.ai/projects/tensorflow-user-guide/en/latest/api.html#tensorflow.python.ipu.config.IPUConfig). 
+Creating new instance of `IPUConfig` and running `configure_ipu_system` always +reattaches the devices, freeing the resources if they were occupied by this +process. It is important when a Jupyter Notebook is used and the kernel +is still running, and it does not release IPU devices automatically. +""" +def configure_ipus(num_ipus: int): + ipu_configuration = ipu.config.IPUConfig() + ipu_configuration.auto_select_ipus = num_ipus + ipu_configuration.configure_ipu_system() +""" +This will be the training function reused by all the kinds of models and modes +of pipelining. +> Note: model creation needs to be processed under the `IPUStrategy().scope()`, +> hence this function accepts only the reference to the function which performs +> the model creation, not the model instance (as `model_factory` argument). +""" +def train(strategy, + model_factory, + train_ds, + steps_per_replica: int = STEPS_PER_REPLICA, + steps_per_execution: int = STEPS_PER_EXECUTION, + steps_per_epoch: int = STEPS_PER_EPOCH, + epochs: int = 4): + + with strategy.scope(): + model = model_factory() + + model.compile( + loss=tf.keras.losses.SparseCategoricalCrossentropy( + from_logits=True + ), + optimizer=tf.keras.optimizers.SGD( + learning_rate=LEARNING_RATE, + momentum=MOMENTUM + ), + steps_per_execution=steps_per_execution + ) + + if steps_per_replica: + model.set_pipelining_options( + gradient_accumulation_steps_per_replica=steps_per_replica + ) + + model.fit(train_ds, steps_per_epoch=steps_per_epoch, epochs=epochs) + +""" +## Training a Keras `Functional` model on a single IPU + +Next let's define a function which returns a `Functional` Keras model. This +implementation looks very similar to a regular non-IPU Keras model definition. + +### Define the model +""" +def create_functional_model(batch_size=BATCH_SIZE): + input_layer = Input( + shape=(28, 28, 1), + dtype=tf.float32, + batch_size=batch_size + ) + x = Flatten(name='flatten')(input_layer) + x = Dense(256, activation='relu', name="dense256")(x) + x = Dense(128, activation='relu', name="dense128")(x) + x = Dense(64, activation='relu', name="dense64")(x) + x = Dense(32, activation='relu', name="dense32")(x) + x = Dense(10, name="logits")(x) + + model = Model( + inputs=input_layer, + outputs=x, + name="singleIPU" + ) + return model + +""" +### Execute Training + +It is essential to create a fresh instance of `IPUConfig` and `IPUStrategy` +before training. +""" +configure_ipus(num_ipus=1) + +train( + strategy=ipu.ipu_strategy.IPUStrategy(), + model_factory=create_functional_model, + train_ds=train_ds +) +# sst_hide_output +""" +## Training a Keras `Sequential` model on a single IPU + +Let us organize the same layers using the `Sequential` Keras model API. +This class groups a linear stack of layers into a `tf.Keras.Model`. +Then, `Sequential` provides training and inference features on this model. + +### Define the model +""" +def create_sequential_model(): + seq_model = Sequential( + layers=[ + Flatten(name='flatten'), + Dense(256, activation='relu', name="dense256"), + Dense(128, activation='relu', name="dense128"), + Dense(64, activation='relu', name="dense64"), + Dense(32, activation='relu', name="dense32"), + Dense(10, activation='softmax', name="logits") + ], + name="singleIPU" + ) + return seq_model +""" +### Execute Training + +Next we refresh IPU device configuration and train again with the new model. 
+
"""
configure_ipus(num_ipus=1)

train(
    strategy=ipu.ipu_strategy.IPUStrategy(),
    model_factory=create_sequential_model,
    train_ds=train_ds
)
# sst_hide_output
"""
## Training a Keras `Functional` model with pipelining for two devices

The documentation for pipeline stages can be found [here](https://docs.graphcore.ai/projects/tensorflow-user-guide/en/latest/perf_training.html#pipelined-training).
There are two ways to enable IPU pipelining for a Keras model, depending on
whether you are writing a new model or reusing an existing one.

To pipeline a `Functional` model you are writing yourself, each layer call must
happen within the scope of an `ipu.keras.PipelineStage` context.
In the function below, we assign layers to two different stages.

### Define the model
"""
def create_functional_model_with_stages():
    input_layer = Input(shape=(28, 28, 1),
                        dtype=tf.float32,
                        batch_size=BATCH_SIZE)
    with ipu.keras.PipelineStage(0):
        x = Flatten(name='flatten')(input_layer)
        x = Dense(256, activation='relu', name="dense256")(x)
        x = Dense(128, activation='relu', name="dense128")(x)
        x = Dense(64, activation='relu', name="dense64")(x)

    with ipu.keras.PipelineStage(1):
        x = Dense(32, activation='relu', name="dense32")(x)
        x = Dense(10, name="logits")(x)

    model = Model(inputs=input_layer,
                  outputs=x,
                  name="multipleIPUfunctional")
    return model
"""
If an existing TensorFlow model is imported, an additional API is provided
to facilitate managing the pipeline stage assignments.

This feature is implemented with `model.get_pipeline_stage_assignment()`
and `model.set_pipeline_stage_assignment(assignments)`, where `assignments` is
the result of calling `get_pipeline_stage_assignment`.
For an example using ResNet50, please check this [documentation](https://docs.graphcore.ai/projects/tensorflow-user-guide/en/latest/keras_tf2.html#pipelining-an-existing-functional-model).
"""
"""
### Execute Training

Next, we refresh the IPU device configuration and train again with the new model.
"""
configure_ipus(num_ipus=2)

train(
    strategy=ipu.ipu_strategy.IPUStrategy(),
    model_factory=create_functional_model_with_stages,
    train_ds=train_ds
)
# sst_hide_output
"""
## Training a Keras `Sequential` model with pipelining

Next, we will write a function that creates the model using the Keras
`Sequential` class as we did above, but with an explicit mapping of layers to
stages through `set_pipeline_stage_assignment`, which accepts a list of
integers as a parameter. This method sets the pipeline stage assignment of
every layer invocation in the model (excluding input layers), which is used to
create a model-parallel execution when calling `fit()`, `evaluate()` and
`predict()`.

> This pipeline stage assignment is ignored when using the `call()` function
> on this model.

Below you will see a pipeline stage assignment like this: `[0, 0, 0, 0, 1, 1]`.
This means that the first four layers of the `Sequential` model are assigned to
the first stage, and the remaining layers to the second stage.

This list has to be of the same length as the total number of layer
invocations in this model, excluding input layers.
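
For instance, a hypothetical alternative split that places three layers on each
IPU would only change the assignment list; everything else stays the same. This
variant is shown purely to illustrate the format and is not used below:

```python
def create_pipeline_sequential_model_alt():
    # Hypothetical alternative split: three layers per pipeline stage.
    # Shown only to illustrate the assignment format.
    seq_model = Sequential(
        layers=[
            Flatten(name='flatten'),
            Dense(256, activation='relu', name="dense256"),
            Dense(128, activation='relu', name="dense128"),
            Dense(64, activation='relu', name="dense64"),
            Dense(32, activation='relu', name="dense32"),
            Dense(10, activation='softmax', name="logits")
        ],
        name="altSplitSequential"
    )
    seq_model.set_pipeline_stage_assignment([0, 0, 0, 1, 1, 1])
    return seq_model
```

As with every other model factory in this tutorial, such a function would be
passed to `train`, which builds the model inside the `IPUStrategy` scope.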
+ +### Define the model +""" +def create_pipeline_sequential_model(): + seq_model = Sequential( + layers=[ + Flatten(name='flatten'), + Dense(256, activation='relu', name="dense256"), + Dense(128, activation='relu', name="dense128"), + Dense(64, activation='relu', name="dense64"), + Dense(32, activation='relu', name="dense32"), + Dense(10, activation='softmax', name="logits") + ], + name="multipleIPUsequential" + ) + seq_model.set_pipeline_stage_assignment([0, 0, 0, 0, 1, 1]) + + return seq_model +""" +### Execute Training + +Next we refresh IPU device configuration and train again with the new model. +""" +configure_ipus(num_ipus=2) + +train( + strategy=ipu.ipu_strategy.IPUStrategy(), + model_factory=create_pipeline_sequential_model, + train_ds=train_ds +) +# sst_hide_output +""" +## Other `PipelineSchedule` settings + +Next we can reuse the previous example and apply a different scheduling mode. +The modes can be characterized in detail like so (quoting the docstring of +`PipelineSchedule`): + +- `Grouped`: This groups the forward passes on multiple IPUs. This requires + more memory since activations need to be stored until the backward stages run + together. However, since forward passes tend to be smaller than backward + passes, `Grouped` tends to improve the speed of the execution, as different + IPUs don't spend so much time waiting for each other. + +- `Interleaved`: This schedules the backward passes whenever the forward passes + have just generated some activations. Consequently fewer activations are + required to be stored between the forward and backward pipeline stages, so + less memory is required. However, since forward and backward stages tend to + be very different in terms of execution cycles, the overall performance + of the pipeline tends to be slower. + +- `Sequential`: This is a debug mode, where the pipeline is scheduled in + the same way as if it were a sharded model. +""" +""" +### Defining the model with `Interleaved` schedule + +The mode `Grouped` was used in the previous example, as it is the default +setting. In this next example we will use the `Interleaved` mode. +""" +def create_pipeline_sequential_model_interleaved(): + seq_model = Sequential( + layers=[ + Flatten(name='flatten'), + Dense(256, activation='relu', name="dense256"), + Dense(128, activation='relu', name="dense128"), + Dense(64, activation='relu', name="dense64"), + Dense(32, activation='relu', name="dense32"), + Dense(10, activation='softmax', name="logits") + ], + name="multipleIPUsequential" + ) + seq_model.set_pipeline_stage_assignment([0, 0, 1, 1, 1, 1]) + + seq_model.set_pipelining_options( + schedule=ipu.ops.pipelining_ops.PipelineSchedule.Interleaved + ) + return seq_model +""" +### Execute training +""" +configure_ipus(num_ipus=2) + +train( + strategy=ipu.ipu_strategy.IPUStrategy(), + model_factory=create_pipeline_sequential_model_interleaved, + train_ds=train_ds +) +# sst_hide_output +""" +## Summary and further reading + +In the course of this tutorial multiple examples of model parallelism with IPU +devices were presented. Try and change some hyperparameters or IPU count and +observe the differences. You can investigate details of execution using +the PopVision tool. 
+
If you execute this code with the environment variable set on the command line:
```bash
POPLAR_ENGINE_OPTIONS='{"autoReport.all":"true"}' python3 pipelining.py
```
or set this variable inside the Jupyter Notebook:
```python
import os
os.environ['POPLAR_ENGINE_OPTIONS'] = '{"autoReport.all":"true"}'
```
then you can use the generated report, which for this tutorial might look
like this:
```bash
ls .
> ./tf_report__2021-10-06__02-24-24.631__70052:
> archive.a
> debug.cbor
> framework.json
> profile.pop
> profile.pop_cache
```
"""
"""
## PopVision - reading the reports
When you open such a report, you can navigate between multiple tabs, each
presenting a different aspect of the IPU computation. You can find more
information on the [PopVision User Guide](https://docs.graphcore.ai/projects/graphcore-popvision-user-guide/en/latest/index.html) page.

## Further reading

For further reading on related topics, please check:
- [Keras API docs](https://docs.graphcore.ai/projects/tensorflow-user-guide/en/latest/api.html#keras)
- [Upgrading from TF2.1](https://docs.graphcore.ai/projects/tensorflow-user-guide/en/latest/keras_tf2.html#porting-models-from-tensorflow-2-1)
- [Automatic Data Parallelism](https://docs.graphcore.ai/projects/tensorflow-user-guide/en/latest/keras_tf2.html#automatic-data-parallelism)
"""
diff --git a/feature_examples/tensorflow2/pipelining/pipelining_code_only.py b/feature_examples/tensorflow2/pipelining/pipelining_code_only.py
new file mode 100644
index 00000000..6fe62832
--- /dev/null
+++ b/feature_examples/tensorflow2/pipelining/pipelining_code_only.py
@@ -0,0 +1,216 @@
# Copyright (c) 2021 Graphcore Ltd. All rights reserved.
# Number of samples per batch.
BATCH_SIZE = 32

# Number of steps to run per execution. The number of batches to run for
# each TensorFlow function call. At most it would execute a full epoch.
STEPS_PER_EXECUTION = 500

# Number of steps per epoch. The total number of steps (batches of samples)
# for one epoch to finish and starting the next one. The default `None` is
# equal to the number of samples divided by the batch size.
STEPS_PER_EPOCH = STEPS_PER_EXECUTION

# Number of epochs
EPOCHS = 4

# Optimizer parameters.
LEARNING_RATE = 0.01
MOMENTUM = 0.9

# Number of devices that will be attached to this model for training and
# inference.
NUM_IPUS = 2

# Number of steps for which the gradients should be accumulated, for each
# configured replica.
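# With BATCH_SIZE = 32 this corresponds to an effective batch size of
# 32 * 4 = 128 samples per weight update when the model is pipelined
# (effective batch size = mini-batch size * gradient accumulation count).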
+STEPS_PER_REPLICA = 4 + +from typing import Optional + +import tensorflow as tf +from tensorflow import keras +from tensorflow.python import ipu +from tensorflow.keras import Model +from tensorflow.python.keras.engine.sequential import Sequential +from tensorflow.keras.layers import Flatten, Dense, Input + +def create_dataset(batch_size: int, repeat=True): + mnist = keras.datasets.mnist + (x_train, y_train), (x_test, y_test) = mnist.load_data() + x_train, x_test = x_train / 255.0, x_test / 255.0 + + train_ds = tf.data.Dataset.from_tensor_slices((x_train, y_train)) + train_ds = train_ds.shuffle(10000).batch(batch_size, drop_remainder=True) + train_ds = train_ds.map( + lambda d, l: (tf.cast(d, tf.float32), tf.cast(l, tf.float32)) + ) + if repeat: + return train_ds.repeat() + else: + return train_ds + + +train_ds = create_dataset(batch_size=BATCH_SIZE) + +def configure_ipus(num_ipus: int): + ipu_configuration = ipu.config.IPUConfig() + ipu_configuration.auto_select_ipus = num_ipus + ipu_configuration.configure_ipu_system() + +def train(strategy, + model_factory, + train_ds, + steps_per_replica: int = STEPS_PER_REPLICA, + steps_per_execution: int = STEPS_PER_EXECUTION, + steps_per_epoch: int = STEPS_PER_EPOCH, + epochs: int = 4): + + with strategy.scope(): + model = model_factory() + + model.compile( + loss=tf.keras.losses.SparseCategoricalCrossentropy( + from_logits=True + ), + optimizer=tf.keras.optimizers.SGD( + learning_rate=LEARNING_RATE, + momentum=MOMENTUM + ), + steps_per_execution=steps_per_execution + ) + + if steps_per_replica: + model.set_pipelining_options( + gradient_accumulation_steps_per_replica=steps_per_replica + ) + + model.fit(train_ds, steps_per_epoch=steps_per_epoch, epochs=epochs) + +def create_functional_model(batch_size=BATCH_SIZE): + input_layer = Input( + shape=(28, 28, 1), + dtype=tf.float32, + batch_size=batch_size + ) + x = Flatten(name='flatten')(input_layer) + x = Dense(256, activation='relu', name="dense256")(x) + x = Dense(128, activation='relu', name="dense128")(x) + x = Dense(64, activation='relu', name="dense64")(x) + x = Dense(32, activation='relu', name="dense32")(x) + x = Dense(10, name="logits")(x) + + model = Model( + inputs=input_layer, + outputs=x, + name="singleIPU" + ) + return model + +configure_ipus(num_ipus=1) + +train( + strategy=ipu.ipu_strategy.IPUStrategy(), + model_factory=create_functional_model, + train_ds=train_ds +) + +def create_sequential_model(): + seq_model = Sequential( + layers=[ + Flatten(name='flatten'), + Dense(256, activation='relu', name="dense256"), + Dense(128, activation='relu', name="dense128"), + Dense(64, activation='relu', name="dense64"), + Dense(32, activation='relu', name="dense32"), + Dense(10, activation='softmax', name="logits") + ], + name="singleIPU" + ) + return seq_model + +configure_ipus(num_ipus=1) + +train( + strategy=ipu.ipu_strategy.IPUStrategy(), + model_factory=create_sequential_model, + train_ds=train_ds +) + +def create_functional_model_with_stages(): + input_layer = Input(shape=(28, 28, 1), + dtype=tf.float32, + batch_size=BATCH_SIZE) + with ipu.keras.PipelineStage(0): + x = Flatten(name='flatten')(input_layer) + x = Dense(256, activation='relu', name="dense256")(x) + x = Dense(128, activation='relu', name="dense128")(x) + x = Dense(64, activation='relu', name="dense64")(x) + + with ipu.keras.PipelineStage(1): + x = Dense(32, activation='relu', name="dense32")(x) + x = Dense(10, name="logits")(x) + + model = Model(inputs=input_layer, + outputs=x, + name="multipleIPUfunctional") + return model + 
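# Train the pipelined functional model on two IPUs: layers created under
# PipelineStage(0) run on one IPU and layers under PipelineStage(1) on the other.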
+configure_ipus(num_ipus=2) + +train( + strategy=ipu.ipu_strategy.IPUStrategy(), + model_factory=create_functional_model_with_stages, + train_ds=train_ds +) + +def create_pipeline_sequential_model(): + seq_model = Sequential( + layers=[ + Flatten(name='flatten'), + Dense(256, activation='relu', name="dense256"), + Dense(128, activation='relu', name="dense128"), + Dense(64, activation='relu', name="dense64"), + Dense(32, activation='relu', name="dense32"), + Dense(10, activation='softmax', name="logits") + ], + name="multipleIPUsequential" + ) + seq_model.set_pipeline_stage_assignment([0, 0, 0, 0, 1, 1]) + + return seq_model + +configure_ipus(num_ipus=2) + +train( + strategy=ipu.ipu_strategy.IPUStrategy(), + model_factory=create_pipeline_sequential_model, + train_ds=train_ds +) + +def create_pipeline_sequential_model_interleaved(): + seq_model = Sequential( + layers=[ + Flatten(name='flatten'), + Dense(256, activation='relu', name="dense256"), + Dense(128, activation='relu', name="dense128"), + Dense(64, activation='relu', name="dense64"), + Dense(32, activation='relu', name="dense32"), + Dense(10, activation='softmax', name="logits") + ], + name="multipleIPUsequential" + ) + seq_model.set_pipeline_stage_assignment([0, 0, 1, 1, 1, 1]) + + seq_model.set_pipelining_options( + schedule=ipu.ops.pipelining_ops.PipelineSchedule.Interleaved + ) + return seq_model + +configure_ipus(num_ipus=2) + +train( + strategy=ipu.ipu_strategy.IPUStrategy(), + model_factory=create_pipeline_sequential_model_interleaved, + train_ds=train_ds +) diff --git a/feature_examples/tensorflow2/pipelining/static/grouped_pipeline.png b/feature_examples/tensorflow2/pipelining/static/grouped_pipeline.png new file mode 100644 index 00000000..980e4471 Binary files /dev/null and b/feature_examples/tensorflow2/pipelining/static/grouped_pipeline.png differ diff --git a/feature_examples/tensorflow2/pipelining/static/interleaved_pipeline.png b/feature_examples/tensorflow2/pipelining/static/interleaved_pipeline.png new file mode 100644 index 00000000..4d3c61da Binary files /dev/null and b/feature_examples/tensorflow2/pipelining/static/interleaved_pipeline.png differ diff --git a/feature_examples/tensorflow2/pipelining/static/pipelining_outline.png b/feature_examples/tensorflow2/pipelining/static/pipelining_outline.png new file mode 100644 index 00000000..4ae39127 Binary files /dev/null and b/feature_examples/tensorflow2/pipelining/static/pipelining_outline.png differ diff --git a/feature_examples/tensorflow2/pipelining/static/sharded_pipeline.png b/feature_examples/tensorflow2/pipelining/static/sharded_pipeline.png new file mode 100644 index 00000000..c964c514 Binary files /dev/null and b/feature_examples/tensorflow2/pipelining/static/sharded_pipeline.png differ