Usage Guide#

This usage guide introduces DeepSpeed and guides you through how to train a PyTorch model with the DeepSpeed engine. To implement DeepSpeedTrial, you need to overwrite specific functions corresponding to common training aspects. It is helpful to work from a skeleton trial to keep track of what is required, as the following example template shows:

from typing import Any, Dict, Iterator, Optional,  Union
from attrdict import AttrDict

import torch
import deepspeed

import determined.pytorch import DataLoader, TorchData
from determined.pytorch.deepspeed import DeepSpeedTrial, DeepSpeedTrialContext

class MyTrial(DeepSpeedTrial):
    def __init__(self, context: DeepSpeedTrialContext) -> None:
        self.context = context
        self.args = AttrDict(self.context.get_hparams())

    def build_training_data_loader(self) -> DataLoader:
        return DataLoader()

    def build_validation_data_loader(self) -> DataLoader:
        return DataLoader()

    def train_batch(
        self,
        dataloader_iter: Optional[Iterator[TorchData]],
        epoch_idx: int,
        batch_idx: int,
    ) -> Union[torch.Tensor, Dict[str, Any]]:
        return {}

    def evaluate_batch(
        self, dataloader_iter: Optional[Iterator[TorchData]], batch_idx: int
    ) -> Dict[str, Any]:
        return {}

The DeepSpeed API organizes training routines into common steps like creating the data loaders and training and evaluating the model. The provided template shows the function signatures, including the expected return types, for these methods.

Because DeepSpeed is built on top of PyTorch, there are many similarities between the API for PyTorchTrial and DeepSpeedTrial. The following steps show you how to implement each of the DeepSpeedTrial methods beginning with training objects initialization.

Step 1- Configure and Initialize Training Objects#

DeepSpeed training initialization consists of two steps:

  1. Initialize the distributed backend.

  2. Create the DeepSpeed model engine.

Refer to the DeepSpeed Getting Started guide for more information.

Outside of Determined, this is typically done in the following way:

deepspeed.init_distributed(dist_backend=args.backend)
net = ...
model_engine, optimizer, lr_scheduler, _ = deepspeed.initialize(args=args, net=net, ...)

DeepSpeedTrial automatically initializes the distributed training backend so all you need to do is initialize the model engine and other training objects in the DeepSpeedTrial __init__() method.

Configuration#

DeepSpeed behavior during training is configured by passing arguments when initializing the model engine. This can be done in two ways:

  • Using a configuration file specified as an argument with a field named deepspeed_config.

  • Using a dictionary, which is passed in directly when initializing a model engine.

Both approaches can be used in combination with the Determined experiment configuration. See the DeepSpeed documentation for more information on what can be specified in the configuration.

If you want to use a DeepSpeed configuration file, the hyperparameters section can be used as arguments to pass to deepspeed.initialize. For example, if the DeepSpeed configuration file is named ds_config.json, the hyperparameter section of the Determined experiment configuration is:

hyperparameters:
  deepspeed_config: ds_config.json
  ...

If you want to overwrite some values in an existing DeepSpeed configuration file, use overwrite_deepspeed_config() and an experiment configuration similar to:

hyperparameters:
  deepspeed_config: ds_config.json
  overwrite_deepspeed_args:
      train_batch_size: 16
      optimizer:
        params:
          lr: 0.005
  ...

If you want to use a dictionary directly, specify a DeepSpeed configuration dictionary in the hyperparameters section:

hyperparameters:
  optimizer:
    type: Adam
    params:
      betas:
        - 0.8
        - 0.999
      eps: 1.0e-08
      lr: 0.001
      weight_decay: 3.0e-07
  train_batch_size: 16
  zero_optimization:
    stage: 0
    allgather_bucket_size: 50000000
    allgather_partitions: true
    contiguous_gradients: true
    cpu_offload: false
    overlap_comm: true
    reduce_bucket_size: 50000000
    reduce_scatter: true

Initialization#

After configuration, you can initialize the model engine in the DeepSpeedTrial. The following example corresponds to the experiment configuration above, with a field in the hyperparameters section named overwrite_deepspeed_args.

class MyTrial(DeepSpeedTrial):
    def __init__(self, context: DeepSpeedTrialContext) -> None:
        self.context = context
        self.args = AttrDict(self.context.get_hparams())

        model = Net(self.args)
        ds_config = overwrite_deepspeed_config(
            self.args.deepspeed_config, self.args.get("overwrite_deepspeed_args", {})
        )
        parameters = filter(lambda p: p.requires_grad, model.parameters())
        model_engine, __, __, __ = deepspeed.initialize(
            model=model, model_parameters=parameters, config=ds_config
        )
        self.model_engine = self.context.wrap_model_engine(model_engine)

After the model engine is initialized, you need to register it with Determined by calling wrap_model_engine(). Differing from PyTorchTrial, you do not need to register the optimizer or learning rate scheduler with Determined because both are attributes of the model engine.

If you want to use pipeline parallelism with a given model, pass layers of the model for partitioning to the DeepSpeed PipelineModule before creating the pipeline model engine:

net = ...
net = deepspeed.PipelineModule(
    layers=get_layers(net),
    loss_fn=torch.nn.CrossEntropyLoss(),
    num_stages=args.pipeline_parallel_size,
    ...,
)

Step 2 - Load Data#

The next step is to build the data loader used for training and validation. The same process is used to download the data for PyTorchTrial. Building the data loaders is also similar, except for the batch size specification for the returned data loaders, which differs because the DeepSpeed model engines automatically handle gradient aggregation.

Automatic gradient aggregation in DeepSpeed is specified in configuration fields:

  • train_batch_size

  • train_micro_batch_size

  • gradient_accumulation_steps

which are related as follows:

train_batch_size = train_micro_batch_size * gradient_accumulation_steps * data_parallel_size,

where data_parallel_size is the number of model replicas across all GPUs used during training. Therefore, a single train batch consists of multiple micro batches, specified by the gradient_accumulation_steps argument. Given a model parallelization scheme, you can specify two fields and the third can be inferred.

The DeepSpeed model engines assume the model is processing micro batches and automatically handle stepping the optimizer and learning rate scheduler every gradient_accumulation_steps micro batches. This means that the build_training_data_loader should return batches of size train_micro_batch_size_per_gpu. In most cases, build_validation_data_loader also returns batches of size train_micro_batch_size_per_gpu.

If you want exact epoch boundaries to be respected, the number of micro batches in the training data loader should be divisible by gradient_accumulation_steps.

If you are using pipeline parallelism, the validation data loader needs to have at least gradient_accumulation_steps worth of batches.

Step 3 - Training and Evaluation#

This step covers the training and evaluation routine for the standard data parallel model engine and the pipeline parallel engine available in DeepSpeed.

After you create the DeepSpeed model engine and data loaders, define the training and evaluation routines for the DeepSpeedTrial. Differing from PyTorchTrial, train_batch() and evaluate_batch() take an iterator over the corresponding data loader built from build_training_data_loader() and build_validation_dataloader() instead of a batch.

Data Parallel Training#

For data parallel training, only, the training and evaluation routines are:

def train_batch(
    self,
    dataloader_iter: Optional[Iterator[TorchData]],
    epoch_idx: int,
    batch_idx: int,
) -> Union[torch.Tensor, Dict[str, Any]]:
    inputs = self.context.to_device(next(dataloader_iter))
    loss = self.model_engine(inputs)
    self.model_engine.backward(loss)
    self.model_engine.step()
    return {"loss": loss}


def evaluate_batch(
    self, dataloader_iter: Optional[Iterator[TorchData]], batch_idx: int
) -> Dict[str, Any]:
    inputs = self.context.to_device(next(dataloader_iter))
    loss = self.model_engine(inputs)
    return {"loss": loss}

You need to manually get a batch from the iterator and move it to the GPU using the provided to_device() helper function, which knows the GPU assigned to a given distributed training process.

Pipeline Parallel Training#

When using pipeline parallelism, the forward and backward steps during training are combined into a single function call because DeepSpeed automatically interleaves multiple micro batches for processing in a single training step. In this case, there is no need to manually get a batch from the dataloader_iter iterator because the pipeline model engine requests it as needed while interleaving micro batches:

def train_batch(
    self,
    dataloader_iter: Optional[Iterator[TorchData]],
    epoch_idx: int,
    batch_idx: int,
) -> Union[torch.Tensor, Dict[str, Any]]:
    loss = self.model_engine.train_batch()
    return {"loss": loss}


def evaluate_batch(
    self, dataloader_iter: Optional[Iterator[TorchData]], batch_idx: int
) -> Dict[str, Any]:
    loss = self.model_engine.eval_batch()
    return {"loss": loss}

Known DeepSpeed Constraints#

Some DeepSpeed constraints are inherited concerning supported feature compatibility:

  • Pipeline parallelism can only be combined with Zero Redundancy Optimizer (ZeRO) stage 1.

  • Parameter offloading is only supported with ZeRO stage 3.

  • Optimizer offloading is only supported with ZeRO stage 2 and 3.