Implementing Distributed Training#
Connectivity#
Multi-machine training requires that all machines in the cluster can establish direct connections
to one another. Firewall rules or network configurations might prevent machines in your cluster
from communicating with each other. You can verify that agent machines can reach each other
outside of Determined by using tools such as ping or netcat.
More rarely, if agents have multiple network interfaces and some of them are not routable, Determined may pick one of those interfaces rather than one that allows one agent to contact another. In this case, it is possible to explicitly set the network interface used for distributed training, as described in Basic Setup: Step 7 - Configure the Cluster.
Configuration#
Slots Per Trial#
The resources.slots_per_trial field in the experiment configuration controls the number of GPUs
used to train a single trial. By default, this field is set to 1, which disables distributed
training. Increasing slots_per_trial automatically enables multi-GPU training. Bear in mind that
these GPUs can be located on a single machine or distributed across multiple machines. The
experiment configuration only dictates the number of GPUs used for training; the Determined job
scheduler decides whether to run the trial on a single agent or across multiple agents, depending
on the machines in the cluster and other active workloads.
Multi-machine parallelism allows you to further parallelize training across more GPUs. To use this
feature, set slots_per_trial
to a multiple of the total number of GPUs on an agent machine. For
example, if your resource pool consists of multiple 8-GPU agent machines, valid slots_per_trial
values would be 16, 24, 32, and so on.
In the following configuration, trials will use the combined resources of multiple machines to train a model:
resources:
  slots_per_trial: 16  # Two 8-GPU agent machines will be used in a trial
For distributed multi-machine training, Determined will automatically detect a common network
interface that is shared by the agent machines. If your cluster has multiple common network
interfaces, we advise specifying the fastest one in Step 7 - Configure the Cluster under
task_container_defaults.dtrain_network_interface.
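As a minimal sketch, this setting lives in the master (cluster) configuration file; the interface name ens5 below is only a placeholder for whatever fast interface your agents actually share:

task_container_defaults:
  dtrain_network_interface: ens5  # placeholder; use the fast interface shared by your agents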
When the slots_per_trial field is set, the per-slot (i.e., per-GPU) batch size is set to
global_batch_size // slots_per_trial. The per-slot and global batch sizes can be accessed
through the context using context.get_per_slot_batch_size() and context.get_global_batch_size(),
respectively. If global_batch_size is not evenly divisible by slots_per_trial, the remainder is
dropped.
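As a sketch of how the per-slot value is typically used, a Trial's training dataloader is usually sized with context.get_per_slot_batch_size(); the trial class name and self.train_dataset below are placeholders, and model/optimizer setup is omitted:

from determined import pytorch

class MyTrial(pytorch.PyTorchTrial):  # hypothetical trial; __init__, model, and optimizer setup omitted
    def build_training_data_loader(self) -> pytorch.DataLoader:
        # Equal to global_batch_size // slots_per_trial, as described above.
        per_slot_batch_size = self.context.get_per_slot_batch_size()
        return pytorch.DataLoader(self.train_dataset, batch_size=per_slot_batch_size)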
When scheduling a multi-machine distributed training job, Determined prefers that the job use all of the slots (GPUs) on an agent. The section on Scheduling Behavior describes this preference in more detail.
Note
Existing tasks running on a single machine might prevent your multi-GPU trials from acquiring
sufficient GPUs. To alleviate this, consider adjusting slots_per_trial or terminating existing
tasks to free up slots in your cluster.
Global Batch Size#
You can reduce computational overhead by setting global_batch_size to the largest batch size
that fits on a single GPU multiplied by the number of slots.
Note
The global_batch_size
field will be automatically respected by the Trial APIs. To use this
hyperparameter with the Core API, you’ll need to reference global_batch_size
explicitly and
organize your code to respect its value.
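A minimal sketch of what that might look like with the Core API; the hyperparameter name, the entry point, and the distributed-context setup are assumptions, and the dataloader construction is omitted:

import determined as det


def main(core_context: det.core.Context) -> None:
    info = det.get_cluster_info()
    global_batch_size = info.trial.hparams["global_batch_size"]  # assumed hyperparameter name
    slots_per_trial = core_context.distributed.size
    per_slot_batch_size = global_batch_size // slots_per_trial
    # ... build your dataloader with per_slot_batch_size and run your training loop ...


if __name__ == "__main__":
    # For multi-slot training, pass the DistributedContext that matches your launcher
    # to det.core.init(); that setup is omitted here for brevity.
    with det.core.init() as core_context:
        main(core_context)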
During distributed training, the global_batch_size
specified in the experiment
configuration file is partitioned across slots_per_trial
GPUs.
The per-GPU batch size is set to global_batch_size // slots_per_trial. Recall that if
global_batch_size is not evenly divisible by slots_per_trial, the remainder is dropped. For
convenience, the per-GPU batch size can be accessed via the Trial API using
context.get_per_slot_batch_size().
For improved performance, weak scaling is recommended: increasing your global_batch_size
proportionally with slots_per_trial. For example, you might change global_batch_size and
slots_per_trial from 32 and 1 to 128 and 4, respectively. To learn more about weak scaling,
visit the blog post, Scaling deep learning workloads.
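In the experiment configuration, that weak-scaling change might look like the following sketch (assuming global_batch_size is defined under hyperparameters, as it is with the Trial APIs):

hyperparameters:
  global_batch_size: 128  # was 32 when slots_per_trial was 1
resources:
  slots_per_trial: 4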
Note that adjusting global_batch_size
can impact your model convergence, which in turn can
affect your training and/or testing accuracy. You might need to adjust model hyperparameters, such
as the learning rate, or consider using a different optimizer when training with larger batch sizes.
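As one example of such an adjustment, a common heuristic (not something Determined applies automatically) is to scale the learning rate linearly with the global batch size:

# Hypothetical values illustrating the linear learning-rate scaling heuristic.
base_lr = 0.1            # learning rate tuned for the original batch size
base_batch_size = 32     # batch size that base_lr was tuned for
global_batch_size = 128  # new, larger global batch size

scaled_lr = base_lr * (global_batch_size / base_batch_size)  # 0.4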
Advanced Optimizations#
The following optimizations can further reduce training time.
- optimizations.aggregation_frequency controls how many batches are evaluated before exchanging gradients. This optimization increases your effective batch size to aggregation_frequency * global_batch_size. optimizations.aggregation_frequency is useful in scenarios where directly increasing the batch size is not possible (for example, due to GPU memory limitations).
- optimizations.gradient_compression reduces the time it takes to transfer gradients between GPUs.
- optimizations.auto_tune_tensor_fusion automatically identifies the optimal message size during gradient transfers, thereby reducing communication overhead.
- optimizations.average_training_metrics averages the training metrics across GPUs at the end of every training workload, a process that requires communication. average_training_metrics is set to true by default and typically does not have a significant impact on training performance. However, if you have a very small scheduling_unit, disabling this option could improve performance. When disabled, only the training metrics from the chief GPU are reported. This impacts results shown in the WebUI and TensorBoard but does not influence model behavior or hyperparameter search.
To learn more about these optimizations, visit the optimizations section in the Experiment Configuration Reference.
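These settings live under the optimizations block of the experiment configuration. A sketch with illustrative (not recommended) values:

optimizations:
  aggregation_frequency: 4        # effective batch size becomes 4 * global_batch_size
  gradient_compression: true
  auto_tune_tensor_fusion: true
  average_training_metrics: true  # the default; disable only if scheduling_unit is very small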
If you’re not seeing improved performance with distributed training, your model might have a performance bottleneck that can’t be directly alleviated by using multiple GPUs, such as data loading. You’re encouraged to experiment with a synthetic dataset to verify the performance of multi-GPU training.
Warning
Multi-machine distributed training is designed to maximize performance by training with all the resources of a machine. This can lead to situations where an experiment is created but never becomes active: for example, when the requested number of GPUs does not divide evenly into the GPUs on the available machines, or when another experiment is already using some GPUs on a machine.
If an experiment does not become active after a minute or so, please ensure that
slots_per_trial
is a multiple of the number of GPUs available on a machine. You can also use
the CLI command det task list
to check if any other tasks are using GPUs and preventing your
experiment from using all the GPUs on a machine.
Downloading Data#
When performing distributed training, Determined automatically creates one process for each GPU that is being used for training. Each of these processes attempts to download training and/or validation data, so it is important to ensure that concurrent data downloads do not conflict with one another.
One way to achieve this is to include a unique identifier in the local file system path where the
downloaded data is stored. A convenient identifier is the rank
of the current process. The
process rank
is automatically assigned by Determined and is unique among all trial processes.
You can accomplish this by leveraging the self.context.distributed.get_rank()
function.
The following example demonstrates how to accomplish this when downloading data from S3. In this
example, the S3 bucket name is configured via a data.bucket
field in the experiment
configuration file.
import boto3
import os


def download_data_from_s3(self):
    s3_bucket = self.context.get_data_config()["bucket"]
    # Include the process rank in the path so concurrent downloads do not conflict.
    download_directory = f"/tmp/data-rank{self.context.distributed.get_rank()}"
    data_file = "data.csv"

    s3 = boto3.client("s3")
    os.makedirs(download_directory, exist_ok=True)
    filepath = os.path.join(download_directory, data_file)
    if not os.path.exists(filepath):
        s3.download_file(s3_bucket, data_file, filepath)
    return download_directory
Scheduling Behavior#
The Determined master schedules distributed training jobs automatically, ensuring that all of the
compute resources required for a job are available before the job is launched. Here are some
important details regarding slots_per_trial
and the scheduler’s behavior:
- If slots_per_trial is less than or equal to the number of slots on a single agent, Determined considers scheduling multiple distributed training jobs on a single agent. This approach is designed to improve utilization and to allow multiple small training jobs to run on a single agent. For example, an agent with eight GPUs could be assigned two 4-GPU jobs or four 2-GPU jobs.
- If slots_per_trial is greater than the number of slots on a single agent, Determined schedules the distributed training job onto multiple agents. To ensure good performance, make full use of each machine's network bandwidth, and minimize inter-machine networking, Determined prefers to use all of the GPUs on an agent machine. For example, if all the agents in your cluster have eight GPUs each, you should submit jobs with slots_per_trial set to a multiple of eight, such as 8, 16, or 24.
Note
The scheduler can find fits for distributed jobs across agents of different sizes. This behavior
is configured via the allowing_heterogeneous_fits parameter, which defaults to false; by default,
Determined requires that the job use all of the slots (GPUs) on an agent.
Warning
If these scheduling constraints for multi-machine distributed training are not satisfied, and you
have not configured the allowing_heterogeneous_fits parameter,
distributed training jobs are not scheduled and wait indefinitely. For example, if every agent in
the cluster has eight GPUs, a job with slots_per_trial
set to 12
is never scheduled.
If a multi-GPU experiment does not become active after a minute or so, please ensure that
slots_per_trial
is set so that it can be scheduled within these constraints. You can also use
the CLI command det task list
to check if any other tasks are using GPUs and preventing your
experiment from using all the GPUs on a machine.
Distributed Inference#
PyTorch users can use the existing distributed training workflow with PyTorchTrial to accelerate their inference workloads. This workflow is not yet officially supported; therefore, users must specify certain training-specific artifacts that are not used for inference. To run a distributed batch inference job, create a new PyTorchTrial and follow these steps:
1. Load the trained model and build the inference dataset using build_validation_data_loader().
2. Specify the inference step using evaluate_batch() or evaluate_full_dataset().
3. Register a dummy optimizer.
4. Specify a build_training_data_loader() that returns a dummy dataloader.
5. Specify a no-op train_batch() that returns an empty map of metrics.
Once the new PyTorchTrial object is created, use the experiment configuration to distribute inference in the same way as training. cifar10_pytorch_inference serves as an example of distributed batch inference.
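The following is a minimal sketch of such a trial; load_my_model() and build_inference_dataset() are placeholders for your own model-loading and dataset code, and the official cifar10_pytorch_inference example remains the reference implementation:

from typing import Any, Dict

import torch
from determined import pytorch


class InferenceTrial(pytorch.PyTorchTrial):
    def __init__(self, context: pytorch.PyTorchTrialContext) -> None:
        self.context = context
        # Load the previously trained model (load_my_model is a placeholder).
        self.model = self.context.wrap_model(load_my_model())
        # Step 3: register a dummy optimizer; it is never stepped during inference.
        self.optimizer = self.context.wrap_optimizer(
            torch.optim.SGD(self.model.parameters(), lr=0.0)
        )

    def build_validation_data_loader(self) -> pytorch.DataLoader:
        # Step 1: the inference dataset (build_inference_dataset is a placeholder).
        return pytorch.DataLoader(
            build_inference_dataset(),
            batch_size=self.context.get_per_slot_batch_size(),
        )

    def evaluate_batch(self, batch: pytorch.TorchData) -> Dict[str, Any]:
        # Step 2: run the model on each batch; adapt this to your dataset's batch structure.
        predictions = self.model(batch)
        return {"num_predictions": len(predictions)}

    def build_training_data_loader(self) -> pytorch.DataLoader:
        # Step 4: a dummy dataloader that is never meaningfully consumed.
        return pytorch.DataLoader(build_inference_dataset(), batch_size=1)

    def train_batch(self, batch, epoch_idx: int, batch_idx: int) -> Dict[str, Any]:
        # Step 5: a no-op training step that returns an empty map of metrics.
        return {}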