PyTorch API Reference

User Guide

PyTorch API

determined.pytorch.PyTorchTrial

class determined.pytorch.PyTorchTrial(context: determined.pytorch._pytorch_context.PyTorchTrialContext)

PyTorch trials are created by subclassing this abstract class.

We can do the following things in this trial class:

  • Define models, optimizers, and LR schedulers.

    In the __init__() method, initialize models, optimizers, and LR schedulers and wrap them with wrap_model, wrap_optimizer, wrap_lr_scheduler provided by PyTorchTrialContext.

  • Run forward and backward passes.

    In train_batch(), call backward and step_optimizer provided by PyTorchTrialContext. We support arbitrary numbers of models, optimizers, and LR schedulers and arbitrary orders of running forward and backward passes.

  • Configure automatic mixed precision.

    In the __init__() method, call configure_apex_amp provided by PyTorchTrialContext.

  • Clip gradients.

    In train_batch(), pass a function into step_optimizer(optimizer, clip_grads=...) provided by PyTorchTrialContext.

trial_controller_class

alias of determined.pytorch._pytorch_trial.PyTorchTrialController

trial_context_class

alias of determined.pytorch._pytorch_context.PyTorchTrialContext

abstract __init__(context: determined.pytorch._pytorch_context.PyTorchTrialContext) None

Initializes a trial using the provided context. The general steps are:

  1. Initialize model(s) and wrap them with context.wrap_model.

  2. Initialize optimizer(s) and wrap them with context.wrap_optimizer.

  3. Initialize learning rate schedulers and wrap them with context.wrap_lr_scheduler.

  4. If desired, wrap models and optimizer with context.configure_apex_amp to use apex.amp for automatic mixed precision.

  5. Define custom loss function and metric functions.

Warning

You may see metrics for trials that are paused and later continued that are significantly different from trials that are not paused if some of your models, optimizers, and learning rate schedulers are not wrapped. The reason is that the model’s state may not be restored accurately or completely from the checkpoint, which is saved to a checkpoint and then later loaded into the trial during resumed training. When using PyTorch, this can sometimes happen if the PyTorch API is not used correctly.

Here is a code example.

self.context = context

self.a = self.context.wrap_model(MyModelA())
self.b = self.context.wrap_model(MyModelB())
self.opt1 = self.context.wrap_optimizer(torch.optm.Adam(self.a))
self.opt2 = self.context.wrap_optimizer(torch.optm.Adam(self.b))

(self.a, self.b), (self.opt1, self.opt2) = self.context.configure_apex_amp(
    models=[self.a, self.b],
    optimizers=[self.opt1, self.opt2],
    num_losses=2,
)

self.lrs1 = self.context.wrap_lr_scheduler(
    lr_scheduler=LambdaLR(self.opt1, lr_lambda=lambda epoch: 0.95 ** epoch),
    step_mode=LRScheduler.StepMode.STEP_EVERY_EPOCH,
))
abstract train_batch(batch: Union[Dict[str, torch.Tensor], Sequence[torch.Tensor], torch.Tensor], epoch_idx: int, batch_idx: int) Union[torch.Tensor, Dict[str, Any]]

Train on one batch.

Users should implement this function by doing the following things:

  1. Run forward passes on the models.

  2. Calculate the gradients with the losses with context.backward.

  3. Call an optimization step for the optimizers with context.step_optimizer. You can clip gradients by specifying the argument clip_grads.

  4. Step LR schedulers if using manual step mode.

  5. Return training metrics in a dictionary.

Here is a code example.

# Assume two models, two optimizers, and two LR schedulers were initialized
# in ``__init__``.

# Calculate the losses using the models.
loss1 = self.model1(batch)
loss2 = self.model2(batch)

# Run backward passes on losses and step optimizers. These can happen
# in arbitrary orders.
self.context.backward(loss1)
self.context.backward(loss2)
self.context.step_optimizer(
    self.opt1,
    clip_grads=lambda params: torch.nn.utils.clip_grad_norm_(params, 0.0001),
)
self.context.step_optimizer(self.opt2)

# Step the learning rate.
self.lrs1.step()
self.lrs2.step()

return {"loss1": loss1, "loss2": loss2}
Parameters
  • batch (Dict[str, torch.Tensor], Sequence[torch.Tensor], torch.Tensor) – batch of data for training.

  • epoch_idx (integer) – index of the current epoch among all the batches processed per device (slot) since the start of training.

  • batch_idx (integer) – index of the current batch among all the epochs processed per device (slot) since the start of training.

Returns

training metrics to return.

Return type

torch.Tensor or Dict[str, Any]

abstract build_training_data_loader() determined.pytorch._data.DataLoader

Defines the data loader to use during training.

Must return an instance of determined.pytorch.DataLoader.

abstract build_validation_data_loader() determined.pytorch._data.DataLoader

Defines the data loader to use during validation.

Must return an instance of determined.pytorch.DataLoader.

build_callbacks() Dict[str, determined.pytorch._callback.PyTorchCallback]

Defines a dictionary of string names to callbacks to be used during training and/or validation.

The string name will be used as the key to save and restore callback state for any callback that defines load_state_dict() and state_dict().

evaluate_batch(batch: Union[Dict[str, torch.Tensor], Sequence[torch.Tensor], torch.Tensor], batch_idx: int) Dict[str, Any]

Calculate validation metrics for a batch and return them as a dictionary mapping metric names to metric values. Per-batch validation metrics are reduced (aggregated) to produce a single set of validation metrics for the entire validation set (see evaluation_reducer()).

There are two ways to specify evaluation metrics. Either override evaluate_batch() or evaluate_full_dataset(). While evaluate_full_dataset() is more flexible, evaluate_batch() should be preferred, since it can be parallelized in distributed environments, whereas evaluate_full_dataset() cannot. Only one of evaluate_full_dataset() and evaluate_batch() should be overridden by a trial.

The metrics returned from this function must be JSON-serializable.

Parameters
  • batch (Dict[str, torch.Tensor], Sequence[torch.Tensor], torch.Tensor) – batch of data for evaluating.

  • batch_idx (integer) – index of the current batch among all the epochs processed per device (slot) since the start of training.

evaluation_reducer() Union[determined.pytorch._reducer.Reducer, Dict[str, determined.pytorch._reducer.Reducer]]

Return a reducer for all evaluation metrics, or a dict mapping metric names to individual reducers. Defaults to determined.pytorch.Reducer.AVG.

evaluate_full_dataset(data_loader: torch.utils.data.dataloader.DataLoader) Dict[str, Any]

Calculate validation metrics on the entire validation dataset and return them as a dictionary mapping metric names to reduced metric values (i.e., each returned metric is the average or sum of that metric across the entire validation set).

This validation cannot be distributed and is performed on a single device, even when multiple devices (slots) are used for training. Only one of evaluate_full_dataset() and evaluate_batch() should be overridden by a trial.

The metrics returned from this function must be JSON-serializable.

Parameters

data_loader (torch.utils.data.DataLoader) – data loader for evaluating.

get_batch_length(batch: Any) int

Count the number of records in a given batch.

Override this method when you are using custom batch types, as produced when iterating over the DataLoader. For example, when using pytorch_geometric:

# Extra imports:
from determined.pytorch import DataLoader
from torch_geometric.data.dataloader import Collater

# Trial methods:
def build_training_data_loader(self):
    return DataLoader(
        self.train_subset,
        batch_size=self.context.get_per_slot_batch_size(),
        collate_fn=Collater([], []),
    )

def get_batch_length(self, batch):
    # `batch` is `torch_geometric.data.batch.Batch`.
    return batch.num_graphs
Parameters

batch (Any) – input training or validation data batch object.

determined.pytorch.PyTorchTrialContext

class determined.pytorch.PyTorchTrialContext(*args: Any, **kwargs: Any)

Bases: determined._trial_context.TrialContext, determined.pytorch._reducer._PyTorchReducerContext

Contains runtime information for any Determined workflow that uses the PyTorch API.

With this class, users can do the following things:

  1. Wrap PyTorch models, optimizers, and LR schedulers with their Determined-compatible counterparts using wrap_model(), wrap_optimizer(), wrap_lr_scheduler(), respectively. The Determined-compatible objects are capable of transparent distributed training, checkpointing and exporting, mixed-precision training, and gradient aggregation.

  2. Configure apex amp by calling configure_apex_amp() (optional).

  3. Calculate the gradients with backward() on a specified loss.

  4. Run an optimization step with step_optimizer().

  5. Functionalities inherited from determined.TrialContext, including getting the runtime information and properly handling training data in distributed training.

backward(loss: torch.Tensor, gradient: Optional[torch.Tensor] = None, retain_graph: bool = False, create_graph: bool = False) None

Compute the gradient of current tensor w.r.t. graph leaves.

The arguments are used in the same way as torch.Tensor.backward. See https://pytorch.org/docs/1.4.0/_modules/torch/tensor.html#Tensor.backward for details.

Warning

When using distributed training, we don’t support manual gradient accumulation. That means the gradient on each parameter can only be calculated once on each batch. If a parameter is associated with multiple losses, you can either choose to call backward'' on only one of those losses, or you can set the ``require_grads flag of a parameter or module to False to avoid manual gradient accumulation on that parameter. However, you can do gradient accumulation across batches by setting optimizations.aggregation_frequency in the experiment configuration to be greater than 1.

Parameters
  • gradient (Tensor or None) – Gradient w.r.t. the tensor. If it is a tensor, it will be automatically converted to a Tensor that does not require grad unless create_graph is True. None values can be specified for scalar Tensors or ones that don’t require grad. If a None value would be acceptable then this argument is optional.

  • retain_graph (bool, optional) – If False, the graph used to compute the grads will be freed. Note that in nearly all cases setting this option to True is not needed and often can be worked around in a much more efficient way. Defaults to the value of create_graph.

  • create_graph (bool, optional) – If True, graph of the derivative will be constructed, allowing to compute higher order derivative products. Defaults to False.

configure_apex_amp(models: Union[torch.nn.modules.module.Module, List[torch.nn.modules.module.Module]], optimizers: Union[torch.optim.optimizer.Optimizer, List[torch.optim.optimizer.Optimizer]], enabled: Optional[bool] = True, opt_level: Optional[str] = 'O1', cast_model_type: Optional[torch.dtype] = None, patch_torch_functions: Optional[bool] = None, keep_batchnorm_fp32: Optional[Union[bool, str]] = None, master_weights: Optional[bool] = None, loss_scale: Optional[Union[float, str]] = None, cast_model_outputs: Optional[torch.dtype] = None, num_losses: Optional[int] = 1, verbosity: Optional[int] = 1, min_loss_scale: Optional[float] = None, max_loss_scale: Optional[float] = 16777216.0) Tuple

Configure automatic mixed precision for your models and optimizers using NVIDIA’s Apex PyTorch extension. Note that details for apex.amp are handled automatically within Determined after this call.

This function must be called after you have finished constructing your models and optimizers with wrap_model() and wrap_optimizer().

This function has the same arguments as apex.amp.initialize.

Warning

When using distributed training and automatic mixed precision, we only support num_losses=1 and calling backward on the loss once.

Parameters
  • models (torch.nn.Module or list of torch.nn.Module s) – Model(s) to modify/cast.

  • optimizers (torch.optim.Optimizer or list of torch.optim.Optimizer s) – Optimizers to modify/cast. REQUIRED for training.

  • enabled (bool, optional, default=True) – If False, renders all Amp calls no-ops, so your script should run as if Amp were not present.

  • opt_level (str, optional, default="O1") – Pure or mixed precision optimization level. Accepted values are “O0”, “O1”, “O2”, and “O3”, explained in detail above.

  • cast_model_type (torch.dtype, optional, default=None) – Optional property override, see above.

  • patch_torch_functions (bool, optional, default=None) – Optional property override.

  • keep_batchnorm_fp32 (bool or str, optional, default=None) – Optional property override. If passed as a string, must be the string “True” or “False”.

  • master_weights (bool, optional, default=None) – Optional property override.

  • loss_scale (float or str, optional, default=None) – Optional property override. If passed as a string, must be a string representing a number, e.g., “128.0”, or the string “dynamic”.

  • cast_model_outputs (torch.dtype, optional, default=None) – Option to ensure that the outputs of your model is always cast to a particular type regardless of opt_level.

  • num_losses (int, optional, default=1) – Option to tell Amp in advance how many losses/backward passes you plan to use. When used in conjunction with the loss_id argument to amp.scale_loss, enables Amp to use a different loss scale per loss/backward pass, which can improve stability. If num_losses is left to 1, Amp will still support multiple losses/backward passes, but use a single global loss scale for all of them.

  • verbosity (int, default=1) – Set to 0 to suppress Amp-related output.

  • min_loss_scale (float, default=None) – Sets a floor for the loss scale values that can be chosen by dynamic loss scaling. The default value of None means that no floor is imposed. If dynamic loss scaling is not used, min_loss_scale is ignored.

  • max_loss_scale (float, default=2.**24) – Sets a ceiling for the loss scale values that can be chosen by dynamic loss scaling. If dynamic loss scaling is not used, max_loss_scale is ignored.

Returns

Model(s) and optimizer(s) modified according to the opt_level. If optimizers args were lists, the corresponding return value will also be a list.

current_train_batch() int

Current global batch index

classmethod from_config(config: Dict[str, Any]) determined._trial_context.TrialContext

Create a context object suitable for debugging outside of Determined.

An example for a subclass of PyTorchTrial:

config = { ... }
context = det.pytorch.PyTorchTrialContext.from_config(config)
my_trial = MyPyTorchTrial(context)

train_ds = my_trial.build_training_data_loader()
for epoch_idx in range(3):
    for batch_idx, batch in enumerate(train_ds):
        metrics = my_trial.train_batch(batch, epoch_idx, batch_idx)
        ...

An example for a subclass of TFKerasTrial:

config = { ... }
context = det.keras.TFKerasTrialContext.from_config(config)
my_trial = tf_keras_one_var_model.OneVarTrial(context)

model = my_trial.build_model()
model.fit(my_trial.build_training_data_loader())
eval_metrics = model.evaluate(my_trial.build_validation_data_loader())
Parameters

config – An experiment config file, in dictionary form.

get_data_config() Dict[str, Any]

Return the data configuration.

get_experiment_config() Dict[str, Any]

Return the experiment configuration.

get_experiment_id() int

Return the experiment ID of the current trial.

get_global_batch_size() int

Return the global batch size.

get_hparam(name: str) Any

Return the current value of the hyperparameter with the given name.

get_hparams() Dict[str, Any]

Return a dictionary of hyperparameter names to values.

get_per_slot_batch_size() int

Return the per-slot batch size. When a model is trained with a single GPU, this is equal to the global batch size. When multi-GPU training is used, this is equal to the global batch size divided by the number of GPUs used to train the model.

get_stop_requested() bool

Return whether a trial stoppage has been requested.

get_tensorboard_path() pathlib.Path

Get the path where files for consumption by TensorBoard should be written

get_trial_id() int

Return the trial ID of the current trial.

is_epoch_end() bool

Returns true if the current batch is the last batch of the epoch.

Warning

Not accurate for variable size epochs.

is_epoch_start() bool

Returns true if the current batch is the first batch of the epoch.

Warning

Not accurate for variable size epochs.

set_profiler(*args: List[str], **kwargs: Any) None

Sets a torch profiler instance on the trial context to be called in _pytorch_trial when training.

set_stop_requested(stop_requested: bool) None

Set a flag to request a trial stoppage. When this flag is set to True, we finish the step, checkpoint, then exit.

step_optimizer(optimizer: torch.optim.optimizer.Optimizer, clip_grads: Optional[Callable[[Iterator], None]] = None, auto_zero_grads: bool = True, scaler: Optional[Any] = None) None

Perform a single optimization step.

This function must be called once for each optimizer. However, the order of different optimizers’ steps can be specified by calling this function in different orders. Also, gradient accumulation across iterations is performed by the Determined training loop by setting the experiment configuration field optimizations.aggregation_frequency.

Here is a code example:

def clip_grads(params):
    torch.nn.utils.clip_grad_norm_(params, 0.0001),

self.context.step_optimizer(self.opt1, clip_grads)
Parameters
  • optimizer (torch.optim.Optimizer) – Which optimizer should be stepped.

  • clip_grads (a function, optional) – This function should have one argument for parameters in order to clip the gradients.

  • auto_zero_grads (bool, optional) – Automatically zero out gradients automatically after stepping the optimizer. If false, you need to call optimizer.zero_grad() manually. Note that if optimizations.aggregation_frequency is greater than 1, auto_zero_grads must be true.

  • scaler (torch.cuda.amp.GradScaler, optional) – The scaler to use for stepping the optimizer. This should be unset if not using AMP, and is necessary if wrap_scaler() was called directly.

to_device(data: Union[Dict[str, Union[numpy.ndarray, torch.Tensor]], Sequence[Union[numpy.ndarray, torch.Tensor]], numpy.ndarray, torch.Tensor]) Union[Dict[str, torch.Tensor], Sequence[torch.Tensor], torch.Tensor]

Map generated data to the device allocated by the Determined cluster.

All the data in the data loader and the models are automatically moved to the allocated device. This method aims at providing a function for the data generated on the fly.

wrap_lr_scheduler(lr_scheduler: torch.optim.lr_scheduler._LRScheduler, step_mode: determined.pytorch._lr_scheduler.LRScheduler.StepMode, frequency: int = 1) torch.optim.lr_scheduler._LRScheduler

Returns a wrapped LR scheduler.

The LR scheduler must use an optimizer wrapped by wrap_optimizer(). If apex.amp is in use, the optimizer must also have been configured with configure_apex_amp().

wrap_model(model: torch.nn.modules.module.Module) torch.nn.modules.module.Module

Returns a wrapped model.

wrap_optimizer(optimizer: torch.optim.optimizer.Optimizer, backward_passes_per_step: int = 1) torch.optim.optimizer.Optimizer

Returns a wrapped optimizer.

The optimizer must use the models wrapped by wrap_model(). This function creates a horovod.DistributedOptimizer if using parallel/distributed training.

backward_passes_per_step can be used to specify how many gradient aggregation steps will be performed in a single train_batch call per optimizer step. In most cases, this will just be the default value 1. However, this advanced functionality can be used to support training loops like the one shown below:

def train_batch(
    self, batch: TorchData, epoch_idx: int, batch_idx: int
) -> Dict[str, torch.Tensor]:
    data, labels = batch
    output = self.model(data)
    loss1 = output['loss1']
    loss2 = output['loss2']
    self.context.backward(loss1)
    self.context.backward(loss2)
    self.context.step_optimizer(self.optimizer, backward_passes_per_step=2)
    return {"loss1": loss1, "loss2": loss2}
wrap_reducer(reducer: Union[Callable, determined.pytorch._reducer.MetricReducer], name: Optional[str] = None, for_training: bool = True, for_validation: bool = True) determined.pytorch._reducer.MetricReducer

Register a custom reducer that will calculate a metric properly, even with distributed training.

During distributed training and evaluation, many types of metrics must be calculated globally, rather than calculating the metric on each shard of the dataset and averaged or summed. For example, an accurate ROC AUC for dataset cannot be derived from the individual ROC AUC metrics calculated on by each worker.

Determined solves this problem by offering fully customizable metric reducers which are distributed-aware. These are registered by calling context.wrap_reducer() and are updated by the user during train_batch() or evaluate_batch().

Parameters
  • reducer (Union[Callable, pytorch.MetricReducer]) – Either a reducer function or a pytorch.MetricReducer. See below for more details.

  • name – (Optional[str] = None): Either a string name to associate with the metric returned by the reducer, or None to indicate the metric will return a dict mapping string names to metric values. This allows for a single reducer to return many metrics, such as for a per-class mean IOU calculation. Note that if name is a string, the returned metric must NOT be a dict-type metric.

  • for_training – (bool = True): Indicate that the reducer should be used for training workloads.

  • for_validation – (bool = True): Indicate that the reducer should be used for validation workloads.

Return Value:
pytorch.MetricReducer:

If reducer was a function, the returned MetricReducer will have a single user-facing method like def update(value: Any) -> None that you should call during train_batch or evaluate_batch. Otherwise, the return value will just be the reducer that was passed in.

Reducer functions: the simple API

If the reducer parameter is a function, it must have the following properties:

  • It accepts a single parameter, which will be a flat list of all inputs the users pass when they call .update() on the object returned by wrap_reducer(). See the code example below for more details.

  • It returns either a single (non-dict) metric or a dictionary mapping names to metrics, as described above.

The primary motivation for passing a function as the reducer is simplicity. Metrics from all batches will be buffered in memory and passed over the network before they are reduced all at once. This introduces some overhead, but it is likely unnoticeable for scalar metrics or on validation datasets of small or medium size. This single function strategy can be useful for quick prototyping or for calculating metrics that are difficult or impossible to calculate incrementally.

For example, ROC AUC could be properly calculated by passing a small wrapper function calling sklearn.metrics.roc_auc_score:

# Custom reducer function.
def roc_auc_reducer(values):
    # values will be a flat list of all inputs to
    # .update(), which in this code example are
    # tuples of (y_true, y_score).  We reshape
    # that list into two separate lists:
    y_trues, y_scores = zip(*values)

    # Then we return a metric value:
    return sklearn.metrics.roc_auc_score(
        np.array(y_trues), np.array(y_scores)
    )

class MyPyTorchTrial(PyTorchTrial):
    def __init__(self, context):
        self.roc_auc = context.wrap_reducer(
            roc_auc_reducer, name="roc_auc"
        )
        ...

    def evaluate_batch(self, batch):
        ...
        # Function-based reducers are updated with .update().
        # The roc_auc_reducer function will get a list of all
        # inputs that we pass in here:
        self.roc_auc.update((y_true, y_score))

        # The "roc_auc" metric will be included in the final
        # metrics after the workload has completed; no need
        # to return it here.  If that is your only metric,
        # just return an empty dict.
        return {}

MetricReducer objects: the advanced API

The primary motivation for passing a det.pytorch.MetricReducer as the reducer is performance. det.pytorch.MetricReducer allows the user more control in how values are stored and exposes a per_slot_reduce() call which lets users minimize the cost of the network communication before the final cross_slot_reduce().

An additional reason for using the det.pytorch.MetricReducer is for flexibility of the update mechanism, which is completely user-defined when subclassing MetricReducer.

For the full details and a code example, see: MetricReducer.

wrap_scaler(scaler: Any) Any

Prepares to use automatic mixed precision through PyTorch’s native AMP API. The returned scaler should be passed to step_optimizer, but usage does not otherwise differ from vanilla PyTorch APIs. Loss should be scaled before calling backward, unscale_ should be called before clipping gradients, update should be called after stepping all optimizers, etc.

PyTorch 1.6 or greater is required for this feature.

Parameters

scaler (torch.cuda.amp.GradScaler) – Scaler to wrap and track.

Returns

The scaler. It may be wrapped to add additional functionality for use in Determined.

determined.pytorch.PyTorchTrialContext.distributed

class determined.core._distributed.DistributedContext(*, rank: int, size: int, local_rank: int, local_size: int, cross_rank: int, cross_size: int, chief_ip: Optional[str] = None, pub_port: int = 12360, pull_port: int = 12376, port_offset: int = 0, force_tcp: bool = False)

DistributedContext provides useful methods for effective distributed training.

A DistributedContext has the following required args:
  • rank: the index of this worker in the entire job

  • size: the number of workers in the entire job

  • local_rank: the index of this worker on this machine

  • local_size: the number of workers on this machine

  • cross_rank: the index of this machine in the entire job

  • cross_size: the number of machines in the entire job

Additionally, any time that cross_size > 1, you must also provide:
  • chief_ip: the ip address to reach the chief worker (where rank==0)

Note

DistributedContext has .allgather(), .gather(), and .broadcast() methods, which are easy to use and which can be useful for coordinating work across workers, but it is not a replacement for the allgather/gather/broadcast operations in your particular distributed training framework.

classmethod from_horovod(hvd: Any, chief_ip: Optional[str] = None) determined.core._distributed.DistributedContext

Create a DistributedContext using the provided hvd module to determine rank information.

Example:

import horovod.torch as hvd
hvd.init()
distributed = DistributedContext.from_horovod(hvd)

The IP address for the chief worker is required whenever hvd.cross_size() > 1. The value may be provided using the chief_ip argument or the DET_CHIEF_IP environment variable.

classmethod from_deepspeed(chief_ip: Optional[str] = None) determined.core._distributed.DistributedContext

Create a DistributedContext using the standard deepspeed environment variables to determine rank information.

The IP address for the chief worker is required whenever CROSS_SIZE > 1. The value may be provided using the chief_ip argument or the DET_CHIEF_IP environment variable.

classmethod from_torch_distributed(chief_ip: Optional[str] = None) determined.core._distributed.DistributedContext

Create a DistributedContext using the standard torch distributed environment variables to determine rank information.

The IP address for the chief worker is required whenever CROSS_SIZE > 1. The value may be provided via the chief_ip argument or the DET_CHIEF_IP environment variable.

get_rank() int

Return the rank of the process in the trial. The rank of a process is a unique ID within the trial. That is, no two processes in the same trial are assigned the same rank.

get_local_rank() int

Return the rank of the process on the agent. The local rank of a process is a unique ID within a given agent and trial; that is, no two processes in the same trial that are executing on the same agent are assigned the same rank.

get_size() int

Return the number of slots this trial is running on.

get_num_agents() int

Return the number of agents this trial is running on.

gather(stuff: Any) Optional[List]

Gather stuff to the chief. The chief returns a list of all stuff, and workers return None.

gather() is not a replacement for the gather functionality of your distributed training framework.

gather_local(stuff: Any) Optional[List]

Gather stuff to the local chief. The local chief returns a list of all stuff, and local workers return None.

gather_local() is not a replacement for the gather functionality of your distributed training framework.

allgather(stuff: Any) List

Gather stuff to the chief and broadcast all of it back to the workers.

allgather() is not a replacement for the allgather functionality of your distributed training framework.

allgather_local(stuff: Any) List

Gather stuff to the local chief and broadcast all of it back to the local workers.

allgather_local() is not a replacement for the allgather functionality of your distributed training framework.

broadcast(stuff: Any) Any

Every worker gets the stuff sent by the chief.

broadcast() is not a replacement for the broadcast functionality of your distributed training framework.

broadcast_local(stuff: Optional[Any] = None) Any

Every worker gets the stuff sent by the local chief.

broadcast_local() is not a replacement for the broadcast functionality of your distributed training framework.

determined.pytorch.PyTorchExperimentalContext

class determined.pytorch.PyTorchExperimentalContext(parent: Any)
disable_auto_to_device() None

Prevent the PyTorchTrialController from automatically moving batched data to device. Call this if you want to override the default behavior of moving all items of a list, tuple, and/or dict to the GPU. Then, you can control how data is moved to the GPU directly in the train_batch and evaluate_batch methods of your PyTorchTrial definition. You should call context.to_device on primitive data types that you do want to move to GPU as in the example below.

# PyTorchTrial methods.
def __init__(context): # PyTorchTrial init
    self.context.experimental.disable_auto_to_device()
    ...

def train_batch(self, context, batch):
    for k, item in batch.items():
        if k == "img":
            batch["img"] = self.context.to_device(batch["img"])
    ...
disable_dataset_reproducibility_checks() None

disable_dataset_reproducibility_checks() allows you to return an arbitrary DataLoader from build_training_data_loader() or build_validation_data_loader().

Normally you would be required to return a det.pytorch.DataLoader instead, which would guarantee that an appropriate Sampler is used that ensures:

  • When shuffle=True, the shuffle is reproducible.

  • The dataset will start at the right location, even after pausing/continuing.

  • Proper sharding is used during distributed training.

However, there may be cases where either reproducibility of the dataset is not needed or where the nature of the dataset may cause the det.pytorch.DataLoader to be unsuitable.

In those cases, you may call disable_dataset_reproducibility_checks() and you will be free to return any torch.utils.data.DataLoader you like. Dataset reproducibility will still be possible, but it will be your responsibility. If desired, you may find the Sampler classes in determined.pytorch.samplers to be helpful.

use_amp() None

Handles all operations for the most simple cases automatically with a default gradient scaler. Specifically, wraps forward pass in an autocast context, scales loss before backward pass, unscales before clipping gradients, uses scaler when stepping optimizer(s), and updates scaler afterwards. Do not call wrap_scaler directly when using this method.

PyTorch 1.6 or greater is required for this feature.

determined.pytorch.DataLoader

class determined.pytorch.DataLoader(dataset: torch.utils.data.dataset.Dataset, batch_size: Optional[int] = 1, shuffle: bool = False, sampler: Optional[torch.utils.data.sampler.Sampler] = None, batch_sampler: Optional[torch.utils.data.sampler.BatchSampler] = None, num_workers: int = 0, collate_fn: Optional[Callable[[List[determined.pytorch._data.T]], Any]] = None, pin_memory: bool = False, drop_last: bool = False, timeout: float = 0, worker_init_fn: Optional[Callable[[int], None]] = None, multiprocessing_context: Optional[Any] = None, generator: Optional[Any] = None, *, prefetch_factor: int = 2, persistent_workers: bool = False)

DataLoader is meant to contain a user’s Dataset, configuration for sampling data in batches, and performance configuration like multiprocessing.

The __init__ function determines the defaults in the same way as a torch.utils.data.DataLoader would, so the behavior should be familiar. However, the torch.utils.data.Dataloader that is used for training and validation is not created until get_data_loader(…) is called. This is done so that Determined can ensure that sampling restarts from the right location and distributed sampling is handled correctly.

Note that the arguments are from PyTorch.

Parameters
  • dataset (Dataset) – dataset from which to load the data.

  • batch_size (int, optional) – how many samples per batch to load (default: 1).

  • shuffle (bool, optional) – set to True to have the data reshuffled at every epoch (default: False).

  • sampler (Sampler, optional) – defines the strategy to draw samples from the dataset. If specified, shuffle must be False.

  • batch_sampler (Sampler, optional) – like sampler, but returns a batch of indices at a time. Mutually exclusive with batch_size, shuffle, sampler, and drop_last.

  • num_workers (int, optional) – how many subprocesses to use for data loading. 0 means that the data will be loaded in the main process. (default: 0)

  • collate_fn (callable, optional) – merges a list of samples to form a mini-batch of Tensor(s). Used when using batched loading from a map-style dataset.

  • pin_memory (bool, optional) – If True, the data loader will copy Tensors into CUDA pinned memory before returning them. If your data elements are a custom type, or your collate_fn returns a batch that is a custom type, see the example below.

  • drop_last (bool, optional) – set to True to drop the last incomplete batch, if the dataset size is not divisible by the batch size. If False and the size of dataset is not divisible by the batch size, then the last batch will be smaller. (default: False)

  • timeout (numeric, optional) – if positive, the timeout value for collecting a batch from workers. Should always be non-negative. (default: 0)

  • worker_init_fn (callable, optional) – If not None, this will be called on each worker subprocess with the worker id (an int in [0, num_workers - 1]) as input, after seeding and before data loading. (default: None)

  • generator (torch.Generator, optional) – If not None, this RNG will be used by RandomSampler to generate random indexes and multiprocessing to generate base_seed for workers. (default: None)

  • prefetch_factor (int, optional, keyword-only arg) – Number of samples loaded in advance by each worker. 2 means there will be a total of 2 * num_workers samples prefetched across all workers. (default: 2)

  • persistent_workers (bool, optional) – If True, the data loader will not shut down the worker processes after a dataset has been consumed once. This allows to maintain the workers Dataset instances alive. (default: False)

determined.pytorch.LRScheduler

class determined.pytorch.LRScheduler(scheduler: torch.optim.lr_scheduler._LRScheduler, step_mode: determined.pytorch._lr_scheduler.LRScheduler.StepMode, frequency: int = 1)

Wrapper for a PyTorch LRScheduler.

This wrapper fulfills two main functions:

  1. Save and restore the learning rate when a trial is paused, preempted, etc.

  2. Step the learning rate scheduler at the configured frequency (e.g., every batch or every epoch).

class StepMode(value)

Specifies when and how scheduler.step() should be executed.

STEP_EVERY_EPOCH
STEP_EVERY_BATCH
MANUAL_STEP
STEP_EVERY_OPTIMIZER_STEP
__init__(scheduler: torch.optim.lr_scheduler._LRScheduler, step_mode: determined.pytorch._lr_scheduler.LRScheduler.StepMode, frequency: int = 1)

LRScheduler constructor.

Parameters
  • scheduler (torch.optim.lr_scheduler._LRScheduler) – Learning rate scheduler to be used by Determined.

  • step_mode (determined.pytorch.LRSchedulerStepMode) –

    The strategy Determined will use to call (or not call) scheduler.step().

    1. STEP_EVERY_EPOCH: Determined will call scheduler.step() after every frequency training epoch(s). No arguments will be passed to step().

    2. STEP_EVERY_BATCH: Determined will call scheduler.step() after every frequency training batch(es). No arguments will be passed to step(). This option does not take into account gradient aggregation; STEP_EVERY_OPTIMIZER_STEP which is recommended.

    3. STEP_EVERY_OPTIMIZER_STEP: Determined will call scheduler.step() in sync with optimizer steps. With optimizations.aggregation_frequency unset, this is equivalent to STEP_EVERY_BATCH; when it is set, it ensures the LR scheduler is stepped every _effective_ batch.

      If the option frequency is set to some value N, Determined will step the LR scheduler every N optimizer steps.

    4. MANUAL_STEP: Determined will not call scheduler.step() at all. It is up to the user to decide when to call scheduler.step(), and whether to pass any arguments.

  • frequency – Sets the frequency at which the batch and epoch step modes get triggered.

determined.pytorch.Reducer

class determined.pytorch.Reducer(value)

A Reducer defines a method for reducing (aggregating) evaluation metrics. See evaluation_reducer() for details.

AVG
SUM
MAX
MIN

determined.pytorch.MetricReducer

class determined.pytorch.MetricReducer

Efficiently aggregating validation metrics during a multi-slot distributed trial is done in three steps:

  1. Gather all the values to be reduced during the reduction window (either a training or a validation workload). In a multi-slot trial, this is done on each slot in parallel.

  2. Calculate the per-slot reduction. This will return some intermediate value that each slot will contribute to the final metric calculation. It can be as simple as a list of all the raw values from step 1, but reducing the intermediate value locally will distribute the final metric calculation more efficiently and will reduce network communication costs.

  3. Reduce the per-slot reduction values from Step 2 into a final metric.

The MetricReducer API makes it possible for users to define a maximally efficient custom metric by exposing these steps to users:

  • Step 1 is defined by the user; it is not part of the interface. This flexibility gives the user full control when gathering individual values for reduction.

  • Step 2 is the MetricReducer.per_slot_reduce() interface.

  • Step 3 is the MetricReducer.cross_slot_reduce() interface.

  • The MetricReducer.reset() interface allows for MetricReducer reuse across many train and validation workloads.

Example implementation and usage:

class MyAvgMetricReducer(pytorch.MetricReducer):
    def __init__(self):
        self.reset()

    def reset(self):
        self.sum = 0
        self.counts = 0

    # User-defined mechanism for collecting values throughout
    # training or validation. This update() mechanism demonstrates
    # a computationally- and memory-efficient way to store the values.
    def update(self, value):
        self.sum += sum(value)
        self.counts += 1

    def per_slot_reduce(self):
        # Because the chosen update() mechanism is so
        # efficient, this is basically a noop.
        return self.sum, self.counts

    def cross_slot_reduce(self, per_slot_metrics):
        # per_slot_metrics is a list of (sum, counts) tuples
        # returned by the self.pre_slot_reduce() on each slot
        sums, counts = zip(*per_slot_metrics)
        return sum(sums) / sum(counts)


class MyPyTorchTrial(pytorch.PyTorchTrial):
    def __init__(self, context):
        # Register your custom reducer.
        self.my_avg = context.wrap_reducer(
            MyAvgMetricReducer(), name="my_avg"
        )
        ...

    def train_batch(self, batch, epoch_idx, batch_idx):
        ...
        # You decide how/when you call update().
        self.my_avg.update(my_val)

        # The "my_avg" metric will be included in the final
        # metrics after the workload has completed; no need
        # to return it here.
        return {"loss": loss}

See also: determined.pytorch.PyTorchExperimentalContext.wrap_reducer().

abstract reset() None

Reset reducer state for another set of values.

This will be called before any train or validation workload begins.

abstract per_slot_reduce() Any

This will be called after all workers have finished (even when there is only one worker).

It should return some picklable value that is meaningful for cross_slot_reduce.

This will be called after any train or validation workload ends.

abstract cross_slot_reduce(per_slot_metrics: List) Any

This will be called after per_slot_reduce has finished (even when there is only one worker).

The per_slot_metrics will be a list containing the output of per_slot_reduce() from each worker.

The return value should either be:
  • A dict mapping string metric names to metric values, if the call to context.wrap_reducer() omitted the name parameter, or

  • A non-dict metric value if the call to context.wrap_reducer() had name set to a string (an error will be raised if a dict-type metric is returned but name was set).

This will be called after per_slot_reduce.

determined.pytorch.samplers

class determined.pytorch.samplers.DistributedBatchSampler(batch_sampler: torch.utils.data.sampler.BatchSampler, num_workers: int, rank: int)

DistributedBatchSampler will iterate through an underlying batch sampler and return batches which belong to this shard.

DistributedBatchSampler is different from the PyTorch built-in torch.utils.data.distributed.DistributedSampler, because that DistributedSampler expects to bbe called before the BatchSampler, and additionally the DistributedSampler is meant to be a stand-alone sampler.

DistributedBatchSampler has the potential gotcha that when wrapping a non-repeating BatchSampler, if the length of the BatchSampler is not divisible by the number of replicas the length of the resulting DistributedBatchSampler will differ based on the rank. In that case, the divergent paths of multiple workers could cause problems during training. PyTorchTrial always uses RepeatBatchSampler during training, PyTorchTrial does not require that the workers stay in step during validation, so this potential gotcha is not a problem in Determined.

class determined.pytorch.samplers.DistributedSampler(sampler: torch.utils.data.sampler.Sampler, num_workers: int, rank: int)

DistributedSampler will iterate through an underlying sampler and return samples which belong to this shard.

DistributedSampler is different from the PyTorch built-in torch.utils.data.DistributedSampler because theirs is meant to be a standalone sampler. Theirs does shuffling and assumes a constant size dataset as an input. Ours is meant to be used a building block in a chain of samplers, so it accepts a sampler as input that may or may not be constant-size.

class determined.pytorch.samplers.RepeatBatchSampler(batch_sampler: torch.utils.data.sampler.BatchSampler)

RepeatBatchSampler yields infinite batches indices by repeatedly iterating through the batches of another BatchSampler. __len__ is just the length of the underlying BatchSampler.

class determined.pytorch.samplers.RepeatSampler(sampler: torch.utils.data.sampler.Sampler)

RepeatSampler yields infinite batches indices by repeatedly iterating through the batches of another Sampler. __len__ is just the length of the underlying Sampler.

class determined.pytorch.samplers.ReproducibleShuffleBatchSampler(batch_sampler: torch.utils.data.sampler.BatchSampler, seed: int)

ReproducibleShuffleBatchSampler will apply a deterministic shuffle based on a seed.

Warning

Always shuffle before skipping and before repeating. Skip-before-shuffle would break the reproducibility of the shuffle, and repeat-before-shuffle would cause the shuffle to hang as it iterates through an infinite sampler.

Warning

Always prefer ReproducibleShuffleSampler over this class when possible. The reason is that shuffling at the batch level results in a superior shuffle, where the contents of each batch are varied between epochs, rather than just the order of batches.

class determined.pytorch.samplers.ReproducibleShuffleSampler(sampler: torch.utils.data.sampler.Sampler, seed: int)

ReproducibleShuffleSampler will apply a deterministic shuffle based on a seed.

Warning

Always shuffle before skipping and before repeating. Skip-before-shuffle would break the reproducibility of the shuffle, and repeat-before-shuffle would cause the shuffle to hang as it iterates through an infinite sampler.

class determined.pytorch.samplers.SkipBatchSampler(batch_sampler: torch.utils.data.sampler.BatchSampler, skip: int)

SkipBatchSampler skips some batches from an underlying BatchSampler, and yield the rest.

Always skip before you repeat when you are continuing training, or you will apply the skip on every epoch.

Because the SkipBatchSampler is only meant to be used on a training dataset (we never checkpoint during evaluation), and because the training dataset should always be repeated before applying the skip (so you only skip once rather than many times), the length reported is always the length of the underlying sampler, regardless of the size of the skip.

class determined.pytorch.samplers.SkipSampler(sampler: torch.utils.data.sampler.BatchSampler, skip: int)

SkipSampler skips some records from an underlying Sampler, and yields the rest.

Always skip before you repeat when you are continuing training, or you will apply the skip on every epoch.

Warning

When trying to achieve reproducibility after pausing and restarting, you should never prefer this SkipSampler over the SkipBatchSampler, unless you are sure that your dataset will always yield identically sized batches. This is due to how Determined counts batches trained but does not count records trained. Reproducibility when skipping records is only possible if the records to skip can be reliably calculated based on batch size and batches trained.

Because the SkipSampler is only meant to be used on a training dataset (we never checkpoint during evaluation), and because the training dataset should always be repeated before applying the skip (so you only skip once rather than many times), the length reported is always the length of the underlying sampler, regardless of the size of the skip.

determined.pytorch.PyTorchCallback

class determined.pytorch.PyTorchCallback

Abstract base class used to define a callback that should execute during the lifetime of a PyTorchTrial.

Warning

If you are defining a stateful callback (e.g., it mutates a self attribute over its lifetime), you must also override state_dict() and load_state_dict() to ensure this state can be serialized and deserialized over checkpoints.

Warning

If distributed training is enabled, every GPU will execute a copy of this callback (except for on_validation_end(), on_validation_step_end() and on_checkpoint_write_end()). To configure a callback implementation to execute on a subset of GPUs, please condition your implementation on trial.context.distributed.get_rank().

load_state_dict(state_dict: Dict[str, Any]) None

Load the state of this using the deserialized state_dict.

on_checkpoint_end(checkpoint_dir: str) None

Deprecated. Please use on_checkpoint_write_end() instead.

Warning

This callback only executes on the chief GPU when doing distributed training.

on_checkpoint_load_start(checkpoint: Dict[str, Any]) None

Run before state_dict is restored.

on_checkpoint_save_start(checkpoint: Dict[str, Any]) None

Run before checkpoint is persisted.

on_checkpoint_upload_end(uuid: str) None

Run after every checkpoint finishes uploading.

on_checkpoint_write_end(checkpoint_dir: str) None

Run after every checkpoint finishes writing to checkpoint_dir.

Warning

This callback only executes on the chief GPU when doing distributed training.

on_training_epoch_end(epoch_idx: int) None

Run on end of a training epoch

on_training_epoch_start(epoch_idx: int) None

Run on start of a new training epoch

on_training_start() None

Run after checkpoint loads and before training begins.

on_training_workload_end(avg_metrics: Dict[str, Any], batch_metrics: Dict[str, Any]) None

Run on end of a training workload. Workloads can contain varying numbers of batches. In the current implementation of PyTorchTrial, the maximum number of batches in a workload is equal to the scheduling_unit field defined in the experiment config.

on_trial_shutdown() None

Runs just before shutting down training to get off of the cluster. This does not imply that the trial is complete; it may just be paused or preempted by a higher-priority task.

Warning

This callback runs each time a Trial shuts down gracefully to come off the cluster. This callback does not mean that the Trial is done training. Additionally, if the trial is killed the container will be destroyed without this callback running.

on_trial_startup(first_batch_idx: int, checkpoint_uuid: Optional[str]) None

Runs before training, validation, or building dataloaders.

Parameters
  • first_batch_idx (int) – The first batch index to be trained. If the trial has already completed some amount of training in a previous allocation on the cluster, this will be nonzero.

  • checkpoint_uuid (str or None) – The checkpoint from which weight, optimizer state, etc. will be loaded. When first_batch_idx > 0 this will contain the uuid of the most recent checkpoint saved by this trial. Otherwise, it will contain the uuid of the checkpoint from which this trial was configured to warm start from (via source_trial_id or source_checkpoint_uuid in the searcher config), or None if no warm start was configured.

on_validation_end(metrics: Dict[str, Any]) None

Run after every validation ends.

on_validation_epoch_end(outputs: List[Any]) None

Run after a new validation epoch has finished

on_validation_epoch_start() None

Run on start of a new validation epoch

on_validation_start() None

Run before every validation begins.

on_validation_step_end(metrics: Dict[str, Any]) None

Run after every validation step ends.

on_validation_step_start() None

Run before every validation step begins.

state_dict() Dict[str, Any]

Serialize the state of this callback to a dictionary. Return value must be pickle-able.

determined.tensorboard.metric_writers.pytorch.TorchWriter

class determined.tensorboard.metric_writers.pytorch.TorchWriter

TorchWriter uses PyTorch file writers and summary operations to write out tfevent files containing scalar batch metrics. It creates an instance of torch.utils.tensorboard.SummaryWriter which can be accessed via the writer field and configures the SummaryWriter to write to the correct directory inside the trial container.

Usage example:

from determined.tensorboard.metric_writers.pytorch import TorchWriter

class MyModel(PyTorchTrial):
    def __init__(self, context):
        ...
        self.logger = TorchWriter()

    def train_batch(self, batch, epoch_idx, batch_idx):
        self.logger.writer.add_scalar('my_metric', np.random.random(), batch_idx)

determined.pytorch.load_trial_from_checkpoint_path

determined.pytorch.load_trial_from_checkpoint_path(path: str, **kwargs: Any) determined.pytorch._pytorch_trial.PyTorchTrial

Loads a checkpoint written by a PyTorchTrial.

You should have already downloaded the checkpoint files, likely with Checkpoint.download().

The return value will be a restored instance of the subclass PyTorchTrial you used for training.

Parameters
  • path (string) – Top level directory to load the checkpoint from.

  • kwargs (optional) – Any keyword arguments will be applied to torch.load. See documentation for torch.load.