PyTorch API Reference
determined.pytorch.PyTorchTrial
- class determined.pytorch.PyTorchTrial(context: determined.pytorch._pytorch_context.PyTorchTrialContext)¶
PyTorch trials are created by subclassing this abstract class.
We can do the following things in this trial class:
- Define models, optimizers, and LR schedulers. In the __init__() method, initialize models, optimizers, and LR schedulers and wrap them with wrap_model, wrap_optimizer, and wrap_lr_scheduler provided by PyTorchTrialContext.
- Run forward and backward passes. In train_batch(), call backward and step_optimizer provided by PyTorchTrialContext. We support arbitrary numbers of models, optimizers, and LR schedulers and arbitrary orders of running forward and backward passes.
- Configure automatic mixed precision. In the __init__() method, call configure_apex_amp provided by PyTorchTrialContext.
- Clip gradients. In train_batch(), pass a function into step_optimizer(optimizer, clip_grads=...) provided by PyTorchTrialContext.
- trial_controller_class¶
alias of
determined.pytorch._pytorch_trial.PyTorchTrialController
- trial_context_class¶
alias of
determined.pytorch._pytorch_context.PyTorchTrialContext
- abstract __init__(context: determined.pytorch._pytorch_context.PyTorchTrialContext) None ¶
Initializes a trial using the provided context. The general steps are:
- Initialize model(s) and wrap them with context.wrap_model.
- Initialize optimizer(s) and wrap them with context.wrap_optimizer.
- Initialize learning rate schedulers and wrap them with context.wrap_lr_scheduler.
- If desired, wrap models and optimizers with context.configure_apex_amp to use apex.amp for automatic mixed precision.
- Define custom loss function and metric functions.
Warning
Metrics for a trial that is paused and later continued may differ significantly from those of a trial that is never paused if some of your models, optimizers, or learning rate schedulers are not wrapped. The reason is that the state of an unwrapped object may not be restored accurately or completely from the checkpoint that is saved when the trial pauses and loaded again when training resumes. When using PyTorch, this can sometimes also happen if the PyTorch API is not used correctly.
Here is a code example.
    self.context = context

    self.a = self.context.wrap_model(MyModelA())
    self.b = self.context.wrap_model(MyModelB())
    # Optimizers are built from the parameters of the wrapped models.
    self.opt1 = self.context.wrap_optimizer(torch.optim.Adam(self.a.parameters()))
    self.opt2 = self.context.wrap_optimizer(torch.optim.Adam(self.b.parameters()))

    (self.a, self.b), (self.opt1, self.opt2) = self.context.configure_apex_amp(
        models=[self.a, self.b],
        optimizers=[self.opt1, self.opt2],
        num_losses=2,
    )

    self.lrs1 = self.context.wrap_lr_scheduler(
        lr_scheduler=LambdaLR(self.opt1, lr_lambda=lambda epoch: 0.95 ** epoch),
        step_mode=LRScheduler.StepMode.STEP_EVERY_EPOCH,
    )
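To make the schedule in the example above concrete, the sketch below computes the learning rates that lr_lambda=lambda epoch: 0.95 ** epoch would produce. It relies on LambdaLR multiplying the optimizer's initial learning rate by the value the lambda returns. The helper name lambda_lr and the base rate of 0.1 are illustrative assumptions, not part of the Determined or PyTorch API.

```python
def lambda_lr(base_lr: float, epoch: int, decay: float = 0.95) -> float:
    """Learning rate for a given epoch when lr_lambda returns decay ** epoch."""
    # LambdaLR scales the initial learning rate by the lambda's return value.
    return base_lr * decay ** epoch


# Hypothetical base learning rate of 0.1, evaluated for the first three epochs.
schedule = [lambda_lr(0.1, epoch) for epoch in range(3)]
print(schedule)
```

With STEP_EVERY_EPOCH, the rate would shrink by 5% each epoch under these assumptions.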
- abstract train_batch(batch: Union[Dict[str, torch.Tensor], Sequence[torch.Tensor], torch.Tensor], epoch_idx: int, batch_idx: int) Union[torch.Tensor, Dict[str, Any]] ¶
Train on one batch.
Users should implement this function by doing the following things:
- Run forward passes on the models.
- Calculate the gradients of the losses with context.backward.
- Call an optimization step for the optimizers with context.step_optimizer. You can clip gradients by specifying the argument clip_grads.
- Step LR schedulers if using manual step mode.
- Return training metrics in a dictionary.
Here is a code example.
    # Assume two models, two optimizers, and two LR schedulers were initialized
    # in ``__init__``.

    # Calculate the losses using the models.
    loss1 = self.model1(batch)
    loss2 = self.model2(batch)

    # Run backward passes on losses and step optimizers. These can happen
    # in arbitrary orders.
    self.context.backward(loss1)
    self.context.backward(loss2)
    self.context.step_optimizer(
        self.opt1,
        clip_grads=lambda params: torch.nn.utils.clip_grad_norm_(params, 0.0001),
    )
    self.context.step_optimizer(self.opt2)

    # Step the learning rate.
    self.lrs1.step()
    self.lrs2.step()

    return {"loss1": loss1, "loss2": loss2}
- Parameters
batch (Dict[str, torch.Tensor], Sequence[torch.Tensor], torch.Tensor) – batch of data for training.
epoch_idx (integer) – index of the current epoch, counted per device (slot) since the start of training.
batch_idx (integer) – index of the current batch, counted per device (slot) since the start of training.
- Returns
training metrics to return.
- Return type
torch.Tensor or Dict[str, Any]
- abstract build_training_data_loader() determined.pytorch._data.DataLoader ¶
Defines the data loader to use during training.
Must return an instance of
determined.pytorch.DataLoader
.
- abstract build_validation_data_loader() determined.pytorch._data.DataLoader ¶
Defines the data loader to use during validation.
Must return an instance of
determined.pytorch.DataLoader
.
- build_callbacks() Dict[str, determined.pytorch._callback.PyTorchCallback] ¶
Defines a dictionary of string names to callbacks to be used during training and/or validation.
The string name will be used as the key to save and restore callback state for any callback that defines load_state_dict() and state_dict().
- evaluate_batch(batch: Union[Dict[str, torch.Tensor], Sequence[torch.Tensor], torch.Tensor], batch_idx: int) Dict[str, Any] ¶
Calculate validation metrics for a batch and return them as a dictionary mapping metric names to metric values. Per-batch validation metrics are reduced (aggregated) to produce a single set of validation metrics for the entire validation set (see evaluation_reducer()).
There are two ways to specify evaluation metrics. Either override evaluate_batch() or evaluate_full_dataset(). While evaluate_full_dataset() is more flexible, evaluate_batch() should be preferred, since it can be parallelized in distributed environments, whereas evaluate_full_dataset() cannot. Only one of evaluate_full_dataset() and evaluate_batch() should be overridden by a trial.
The metrics returned from this function must be JSON-serializable.
- Parameters
batch (Dict[str, torch.Tensor], Sequence[torch.Tensor], torch.Tensor) – batch of data for evaluating.
batch_idx (integer) – index of the current batch, counted per device (slot) since the start of training.
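One way to meet the JSON-serializable requirement is to convert scalar-like values to plain Python numbers before returning them. The sketch below is only an illustration: to_jsonable is a hypothetical helper, and FakeScalar is a stand-in for a zero-dimensional tensor so that the example runs without PyTorch installed.

```python
import json
from typing import Any, Dict


def to_jsonable(metrics: Dict[str, Any]) -> Dict[str, Any]:
    """Replace values that expose .item() with the plain number they hold."""
    return {
        name: value.item() if hasattr(value, "item") else value
        for name, value in metrics.items()
    }


class FakeScalar:
    """Hypothetical stand-in for a zero-dimensional tensor."""

    def __init__(self, value: float) -> None:
        self._value = value

    def item(self) -> float:
        return self._value


metrics = to_jsonable({"val_loss": FakeScalar(0.25), "num_correct": 7})
encoded = json.dumps(metrics)  # Succeeds because every value is a plain number.
print(encoded)
```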
- evaluation_reducer() Union[determined.pytorch._reducer.Reducer, Dict[str, determined.pytorch._reducer.Reducer]] ¶
Return a reducer for all evaluation metrics, or a dict mapping metric names to individual reducers. Defaults to
determined.pytorch.Reducer.AVG
.
- evaluate_full_dataset(data_loader: torch.utils.data.dataloader.DataLoader) Dict[str, Any] ¶
Calculate validation metrics on the entire validation dataset and return them as a dictionary mapping metric names to reduced metric values (i.e., each returned metric is the average or sum of that metric across the entire validation set).
This validation cannot be distributed and is performed on a single device, even when multiple devices (slots) are used for training. Only one of evaluate_full_dataset() and evaluate_batch() should be overridden by a trial.
The metrics returned from this function must be JSON-serializable.
- Parameters
data_loader (torch.utils.data.DataLoader) – data loader for evaluating.
- get_batch_length(batch: Any) int ¶
Count the number of records in a given batch.
Override this method when you are using custom batch types, as produced when iterating over the DataLoader. For example, when using pytorch_geometric:
    # Extra imports:
    from determined.pytorch import DataLoader
    from torch_geometric.data.dataloader import Collater

    # Trial methods:
    def build_training_data_loader(self):
        return DataLoader(
            self.train_subset,
            batch_size=self.context.get_per_slot_batch_size(),
            collate_fn=Collater([], []),
        )

    def get_batch_length(self, batch):
        # `batch` is `torch_geometric.data.batch.Batch`.
        return batch.num_graphs
- Parameters
batch (Any) – input training or validation data batch object.
determined.pytorch.PyTorchTrialContext
- class determined.pytorch.PyTorchTrialContext(*args: Any, **kwargs: Any)¶
Bases: determined._trial_context.TrialContext, determined.pytorch._reducer._PyTorchReducerContext
Contains runtime information for any Determined workflow that uses the PyTorch API.
With this class, users can do the following things:
- Wrap PyTorch models, optimizers, and LR schedulers with their Determined-compatible counterparts using wrap_model(), wrap_optimizer(), and wrap_lr_scheduler(), respectively. The Determined-compatible objects are capable of transparent distributed training, checkpointing and exporting, mixed-precision training, and gradient aggregation.
- Configure apex amp by calling configure_apex_amp() (optional).
- Calculate the gradients with backward() on a specified loss.
- Run an optimization step with step_optimizer().
- Use the functionality inherited from determined.TrialContext, including getting the runtime information and properly handling training data in distributed training.
- backward(loss: torch.Tensor, gradient: Optional[torch.Tensor] = None, retain_graph: bool = False, create_graph: bool = False) None ¶
Compute the gradient of current tensor w.r.t. graph leaves.
The arguments are used in the same way as torch.Tensor.backward. See https://pytorch.org/docs/1.4.0/_modules/torch/tensor.html#Tensor.backward for details.
Warning
When using distributed training, we don’t support manual gradient accumulation. That means the gradient on each parameter can only be calculated once on each batch. If a parameter is associated with multiple losses, you can either choose to call backward on only one of those losses, or you can set the requires_grad flag of a parameter or module to False to avoid manual gradient accumulation on that parameter. However, you can do gradient accumulation across batches by setting optimizations.aggregation_frequency in the experiment configuration to be greater than 1.
- Parameters
gradient (Tensor or None) – Gradient w.r.t. the tensor. If it is a tensor, it will be automatically converted to a Tensor that does not require grad unless create_graph is True. None values can be specified for scalar Tensors or ones that don’t require grad. If a None value would be acceptable then this argument is optional.
retain_graph (bool, optional) – If False, the graph used to compute the grads will be freed. Note that in nearly all cases setting this option to True is not needed and often can be worked around in a much more efficient way. Defaults to the value of create_graph.
create_graph (bool, optional) – If True, graph of the derivative will be constructed, allowing to compute higher order derivative products. Defaults to False.
- configure_apex_amp(models: Union[torch.nn.modules.module.Module, List[torch.nn.modules.module.Module]], optimizers: Union[torch.optim.optimizer.Optimizer, List[torch.optim.optimizer.Optimizer]], enabled: Optional[bool] = True, opt_level: Optional[str] = 'O1', cast_model_type: Optional[torch.dtype] = None, patch_torch_functions: Optional[bool] = None, keep_batchnorm_fp32: Optional[Union[bool, str]] = None, master_weights: Optional[bool] = None, loss_scale: Optional[Union[float, str]] = None, cast_model_outputs: Optional[torch.dtype] = None, num_losses: Optional[int] = 1, verbosity: Optional[int] = 1, min_loss_scale: Optional[float] = None, max_loss_scale: Optional[float] = 16777216.0) Tuple ¶
Configure automatic mixed precision for your models and optimizers using NVIDIA’s Apex PyTorch extension. Note that details for apex.amp are handled automatically within Determined after this call.
This function must be called after you have finished constructing your models and optimizers with wrap_model() and wrap_optimizer().
This function has the same arguments as apex.amp.initialize.
Warning
When using distributed training and automatic mixed precision, we only support num_losses=1 and calling backward on the loss once.
- Parameters
models (torch.nn.Module or list of torch.nn.Modules) – Model(s) to modify/cast.
optimizers (torch.optim.Optimizer or list of torch.optim.Optimizers) – Optimizers to modify/cast. REQUIRED for training.
enabled (bool, optional, default=True) – If False, renders all Amp calls no-ops, so your script should run as if Amp were not present.
opt_level (str, optional, default="O1") – Pure or mixed precision optimization level. Accepted values are “O0”, “O1”, “O2”, and “O3”, explained in detail in the documentation for apex.amp.initialize.
cast_model_type (torch.dtype, optional, default=None) – Optional property override; see the documentation for apex.amp.initialize.
patch_torch_functions (bool, optional, default=None) – Optional property override.
keep_batchnorm_fp32 (bool or str, optional, default=None) – Optional property override. If passed as a string, must be the string “True” or “False”.
master_weights (bool, optional, default=None) – Optional property override.
loss_scale (float or str, optional, default=None) – Optional property override. If passed as a string, must be a string representing a number, e.g., “128.0”, or the string “dynamic”.
cast_model_outputs (torch.dtype, optional, default=None) – Option to ensure that the outputs of your model are always cast to a particular type regardless of opt_level.
num_losses (int, optional, default=1) – Option to tell Amp in advance how many losses/backward passes you plan to use. When used in conjunction with the loss_id argument to amp.scale_loss, enables Amp to use a different loss scale per loss/backward pass, which can improve stability. If num_losses is left to 1, Amp will still support multiple losses/backward passes, but use a single global loss scale for all of them.
verbosity (int, default=1) – Set to 0 to suppress Amp-related output.
min_loss_scale (float, default=None) – Sets a floor for the loss scale values that can be chosen by dynamic loss scaling. The default value of None means that no floor is imposed. If dynamic loss scaling is not used, min_loss_scale is ignored.
max_loss_scale (float, default=2.**24) – Sets a ceiling for the loss scale values that can be chosen by dynamic loss scaling. If dynamic loss scaling is not used, max_loss_scale is ignored.
- Returns
Model(s) and optimizer(s) modified according to the opt_level. If optimizers args were lists, the corresponding return value will also be a list.
- current_train_batch() int ¶
Current global batch index
- classmethod from_config(config: Dict[str, Any]) determined._trial_context.TrialContext ¶
Create a context object suitable for debugging outside of Determined.
An example for a subclass of PyTorchTrial:
    config = { ... }
    context = det.pytorch.PyTorchTrialContext.from_config(config)
    my_trial = MyPyTorchTrial(context)

    train_ds = my_trial.build_training_data_loader()
    for epoch_idx in range(3):
        for batch_idx, batch in enumerate(train_ds):
            metrics = my_trial.train_batch(batch, epoch_idx, batch_idx)
            ...
An example for a subclass of TFKerasTrial:
    config = { ... }
    context = det.keras.TFKerasTrialContext.from_config(config)
    my_trial = tf_keras_one_var_model.OneVarTrial(context)

    model = my_trial.build_model()
    model.fit(my_trial.build_training_data_loader())
    eval_metrics = model.evaluate(my_trial.build_validation_data_loader())
- Parameters
config – An experiment config file, in dictionary form.
- get_data_config() Dict[str, Any] ¶
Return the data configuration.
- get_experiment_config() Dict[str, Any] ¶
Return the experiment configuration.
- get_experiment_id() int ¶
Return the experiment ID of the current trial.
- get_global_batch_size() int ¶
Return the global batch size.
- get_hparam(name: str) Any ¶
Return the current value of the hyperparameter with the given name.
- get_hparams() Dict[str, Any] ¶
Return a dictionary of hyperparameter names to values.
- get_per_slot_batch_size() int ¶
Return the per-slot batch size. When a model is trained with a single GPU, this is equal to the global batch size. When multi-GPU training is used, this is equal to the global batch size divided by the number of GPUs used to train the model.
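The relationship described above can be sketched as simple arithmetic. The function per_slot_batch_size is a hypothetical illustration, not the Determined implementation, and it assumes the global batch size divides evenly across slots.

```python
def per_slot_batch_size(global_batch_size: int, num_slots: int) -> int:
    """Split a global batch size evenly across the slots (GPUs) in use."""
    if num_slots < 1:
        raise ValueError("num_slots must be at least 1")
    # Assumption: the global batch size is a multiple of the slot count.
    return global_batch_size // num_slots


single_gpu = per_slot_batch_size(64, 1)  # Same as the global batch size.
eight_gpus = per_slot_batch_size(64, 8)  # Each slot sees one eighth.
print(single_gpu, eight_gpus)
```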
- get_stop_requested() bool ¶
Return whether a trial stoppage has been requested.
- get_tensorboard_path() pathlib.Path ¶
Get the path where files for consumption by TensorBoard should be written
- get_trial_id() int ¶
Return the trial ID of the current trial.
- is_epoch_end() bool ¶
Returns true if the current batch is the last batch of the epoch.
Warning
Not accurate for variable size epochs.
- is_epoch_start() bool ¶
Returns true if the current batch is the first batch of the epoch.
Warning
Not accurate for variable size epochs.
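The warnings on is_epoch_start() and is_epoch_end() are easier to see with a sketch. The code below assumes, as an illustration only, that epoch boundaries are derived from a fixed number of batches per epoch; the function names mirror the methods above, but the bodies are not taken from the Determined source.

```python
def is_epoch_start(batch_idx: int, batches_per_epoch: int) -> bool:
    """True when batch_idx is the first batch of a fixed-length epoch."""
    return batch_idx % batches_per_epoch == 0


def is_epoch_end(batch_idx: int, batches_per_epoch: int) -> bool:
    """True when batch_idx is the last batch of a fixed-length epoch."""
    return batch_idx % batches_per_epoch == batches_per_epoch - 1


# With 4 batches per epoch, batches 0 and 4 start epochs and batch 3 ends one.
starts = [idx for idx in range(8) if is_epoch_start(idx, 4)]
ends = [idx for idx in range(8) if is_epoch_end(idx, 4)]
print(starts, ends)
```

Under that assumption, an epoch whose length changes from one pass to the next would put the boundaries in the wrong place, which is consistent with the warning.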
- set_profiler(*args: List[str], **kwargs: Any) None ¶
Sets a torch profiler instance on the trial context to be called in _pytorch_trial when training.
- set_stop_requested(stop_requested: bool) None ¶
Set a flag to request a trial stoppage. When this flag is set to True, we finish the step, checkpoint, then exit.
- step_optimizer(optimizer: torch.optim.optimizer.Optimizer, clip_grads: Optional[Callable[[Iterator], None]] = None, auto_zero_grads: bool = True, scaler: Optional[Any] = None) None ¶
Perform a single optimization step.
This function must be called once for each optimizer. However, the order of different optimizers’ steps can be specified by calling this function in different orders. Also, gradient accumulation across iterations is performed by the Determined training loop by setting the experiment configuration field optimizations.aggregation_frequency.
Here is a code example:
    def clip_grads(params):
        torch.nn.utils.clip_grad_norm_(params, 0.0001)

    self.context.step_optimizer(self.opt1, clip_grads)
- Parameters
optimizer (torch.optim.Optimizer) – Which optimizer should be stepped.
clip_grads (a function, optional) – This function should have one argument for parameters in order to clip the gradients.
auto_zero_grads (bool, optional) – Automatically zero out gradients after stepping the optimizer. If false, you need to call optimizer.zero_grad() manually. Note that if optimizations.aggregation_frequency is greater than 1, auto_zero_grads must be true.
scaler (torch.cuda.amp.GradScaler, optional) – The scaler to use for stepping the optimizer. This should be unset if not using AMP, and is necessary if wrap_scaler() was called directly.
- to_device(data: Union[Dict[str, Union[numpy.ndarray, torch.Tensor]], Sequence[Union[numpy.ndarray, torch.Tensor]], numpy.ndarray, torch.Tensor]) Union[Dict[str, torch.Tensor], Sequence[torch.Tensor], torch.Tensor] ¶
Map generated data to the device allocated by the Determined cluster.
All the data in the data loader and the models are automatically moved to the allocated device. This method aims at providing a function for the data generated on the fly.
- wrap_lr_scheduler(lr_scheduler: torch.optim.lr_scheduler._LRScheduler, step_mode: determined.pytorch._lr_scheduler.LRScheduler.StepMode, frequency: int = 1) torch.optim.lr_scheduler._LRScheduler ¶
Returns a wrapped LR scheduler.
The LR scheduler must use an optimizer wrapped by wrap_optimizer(). If apex.amp is in use, the optimizer must also have been configured with configure_apex_amp().
- wrap_model(model: torch.nn.modules.module.Module) torch.nn.modules.module.Module ¶
Returns a wrapped model.
- wrap_optimizer(optimizer: torch.optim.optimizer.Optimizer, backward_passes_per_step: int = 1) torch.optim.optimizer.Optimizer ¶
Returns a wrapped optimizer.
The optimizer must use the models wrapped by wrap_model(). This function creates a horovod.DistributedOptimizer if using parallel/distributed training.
backward_passes_per_step can be used to specify how many gradient aggregation steps will be performed in a single train_batch call per optimizer step. In most cases, this will just be the default value 1. However, this advanced functionality can be used to support training loops like the one shown below:
    def train_batch(
        self, batch: TorchData, epoch_idx: int, batch_idx: int
    ) -> Dict[str, torch.Tensor]:
        data, labels = batch
        output = self.model(data)
        loss1 = output['loss1']
        loss2 = output['loss2']
        self.context.backward(loss1)
        self.context.backward(loss2)
        # self.optimizer is assumed to have been wrapped in __init__ with
        # wrap_optimizer(..., backward_passes_per_step=2).
        self.context.step_optimizer(self.optimizer)
        return {"loss1": loss1, "loss2": loss2}
- wrap_reducer(reducer: Union[Callable, determined.pytorch._reducer.MetricReducer], name: Optional[str] = None, for_training: bool = True, for_validation: bool = True) determined.pytorch._reducer.MetricReducer ¶
Register a custom reducer that will calculate a metric properly, even with distributed training.
During distributed training and evaluation, many types of metrics must be calculated globally, rather than calculated on each shard of the dataset and then averaged or summed. For example, an accurate ROC AUC for a dataset cannot be derived from the individual ROC AUC metrics calculated by each worker.
Determined solves this problem by offering fully customizable metric reducers which are distributed-aware. These are registered by calling context.wrap_reducer() and are updated by the user during train_batch() or evaluate_batch().
- Parameters
reducer (Union[Callable, pytorch.MetricReducer]) – Either a reducer function or a pytorch.MetricReducer. See below for more details.
name (Optional[str] = None) – Either a string name to associate with the metric returned by the reducer, or None to indicate the reducer will return a dict mapping string names to metric values. This allows for a single reducer to return many metrics, such as for a per-class mean IOU calculation. Note that if name is a string, the returned metric must NOT be a dict-type metric.
for_training (bool = True) – Indicate that the reducer should be used for training workloads.
for_validation (bool = True) – Indicate that the reducer should be used for validation workloads.
- Return Value:
- pytorch.MetricReducer:
If reducer was a function, the returned MetricReducer will have a single user-facing method like def update(value: Any) -> None that you should call during train_batch or evaluate_batch. Otherwise, the return value will just be the reducer that was passed in.
Reducer functions: the simple API
If the reducer parameter is a function, it must have the following properties:
- It accepts a single parameter, which will be a flat list of all inputs the users pass when they call .update() on the object returned by wrap_reducer(). See the code example below for more details.
- It returns either a single (non-dict) metric or a dictionary mapping names to metrics, as described above.
The primary motivation for passing a function as the reducer is simplicity. Metrics from all batches will be buffered in memory and passed over the network before they are reduced all at once. This introduces some overhead, but it is likely unnoticeable for scalar metrics or on validation datasets of small or medium size. This single function strategy can be useful for quick prototyping or for calculating metrics that are difficult or impossible to calculate incrementally.
For example, ROC AUC could be properly calculated by passing a small wrapper function calling sklearn.metrics.roc_auc_score:
    import numpy as np
    import sklearn.metrics
    from determined.pytorch import PyTorchTrial

    # Custom reducer function.
    def roc_auc_reducer(values):
        # values will be a flat list of all inputs to
        # .update(), which in this code example are
        # tuples of (y_true, y_score). We reshape
        # that list into two separate lists:
        y_trues, y_scores = zip(*values)

        # Then we return a metric value:
        return sklearn.metrics.roc_auc_score(
            np.array(y_trues), np.array(y_scores)
        )

    class MyPyTorchTrial(PyTorchTrial):
        def __init__(self, context):
            self.roc_auc = context.wrap_reducer(
                roc_auc_reducer, name="roc_auc"
            )
            ...

        def evaluate_batch(self, batch):
            ...
            # Function-based reducers are updated with .update().
            # The roc_auc_reducer function will get a list of all
            # inputs that we pass in here:
            self.roc_auc.update((y_true, y_score))

            # The "roc_auc" metric will be included in the final
            # metrics after the workload has completed; no need
            # to return it here. If that is your only metric,
            # just return an empty dict.
            return {}
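The reason a reducer function receives one flat list of inputs, rather than per-worker results, can be shown without any Determined imports. The sketch below is a plain-Python illustration: accuracy_reducer is a hypothetical reducer function, and the two worker lists stand in for the values each worker would pass to .update().

```python
from typing import List, Tuple


def accuracy_reducer(values: List[Tuple[int, int]]) -> float:
    """Reduce a flat list of (num_correct, num_examples) pairs to one accuracy."""
    correct = sum(num_correct for num_correct, _ in values)
    total = sum(num_examples for _, num_examples in values)
    return correct / total


# Hypothetical per-worker inputs with unequal shard sizes.
worker_a = [(9, 10)]
worker_b = [(1, 2)]

# Reducing the combined flat list gives the dataset-wide accuracy (10 / 12).
global_accuracy = accuracy_reducer(worker_a + worker_b)

# Averaging each worker's own accuracy gives a different, misleading value.
mean_of_worker_accuracies = (
    accuracy_reducer(worker_a) + accuracy_reducer(worker_b)
) / 2
print(global_accuracy, mean_of_worker_accuracies)
```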
MetricReducer objects: the advanced API
The primary motivation for passing a det.pytorch.MetricReducer as the reducer is performance. det.pytorch.MetricReducer allows the user more control in how values are stored and exposes a per_slot_reduce() call which lets users minimize the cost of the network communication before the final cross_slot_reduce().
An additional reason for using the det.pytorch.MetricReducer is for flexibility of the update mechanism, which is completely user-defined when subclassing MetricReducer.
For the full details and a code example, see: MetricReducer.
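The two-stage reduction described above can be sketched with a plain class that has the same shape. MeanReducerSketch deliberately does not subclass det.pytorch.MetricReducer so that it runs without Determined installed. Only per_slot_reduce() and cross_slot_reduce() are named in the text above; the reset() and update() methods, and the argument to cross_slot_reduce(), are assumptions made for this illustration.

```python
from typing import List, Tuple


class MeanReducerSketch:
    """Stand-in for a MetricReducer subclass that computes a global mean."""

    def __init__(self) -> None:
        self.reset()

    def reset(self) -> None:
        # Assumed hook: clear accumulated state before a new workload.
        self.total = 0.0
        self.count = 0

    def update(self, value: float) -> None:
        # User-defined update: keep a running sum instead of every value.
        self.total += value
        self.count += 1

    def per_slot_reduce(self) -> Tuple[float, int]:
        # Only two numbers per slot need to cross the network.
        return self.total, self.count

    def cross_slot_reduce(self, per_slot_metrics: List[Tuple[float, int]]) -> float:
        totals, counts = zip(*per_slot_metrics)
        return sum(totals) / sum(counts)


# Simulate two slots that saw different numbers of values.
slot0, slot1 = MeanReducerSketch(), MeanReducerSketch()
for value in (1.0, 2.0, 3.0):
    slot0.update(value)
slot1.update(6.0)

result = slot0.cross_slot_reduce([slot0.per_slot_reduce(), slot1.per_slot_reduce()])
print(result)
```

Compared with a reducer function, this design sends a small per-slot summary rather than every buffered input.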
- wrap_scaler(scaler: Any) Any ¶
Prepares to use automatic mixed precision through PyTorch’s native AMP API. The returned scaler should be passed to step_optimizer, but usage does not otherwise differ from vanilla PyTorch APIs. Loss should be scaled before calling backward, unscale_ should be called before clipping gradients, update should be called after stepping all optimizers, etc.
PyTorch 1.6 or greater is required for this feature.
- Parameters
scaler (torch.cuda.amp.GradScaler) – Scaler to wrap and track.
- Returns
The scaler. It may be wrapped to add additional functionality for use in Determined.
determined.pytorch.PyTorchTrialContext.distributed
- class determined.core._distributed.DistributedContext(*, rank: int, size: int, local_rank: int, local_size: int, cross_rank: int, cross_size: int, chief_ip: Optional[str] = None, pub_port: int = 12360, pull_port: int = 12376, port_offset: int = 0, force_tcp: bool = False)
DistributedContext provides useful methods for effective distributed training.
- A DistributedContext has the following required args:
rank: the index of this worker in the entire job
size: the number of workers in the entire job
local_rank: the index of this worker on this machine
local_size: the number of workers on this machine
cross_rank: the index of this machine in the entire job
cross_size: the number of machines in the entire job
- Additionally, any time that cross_size > 1, you must also provide:
chief_ip: the ip address to reach the chief worker (where rank==0)
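The six rank arguments are related to one another. The sketch below shows one common layout, assuming every machine runs the same number of workers and ranks are assigned machine by machine. This layout is an assumption for illustration; the text above does not guarantee it, and global_rank is a hypothetical helper.

```python
def global_rank(cross_rank: int, local_rank: int, local_size: int) -> int:
    """Worker index in the whole job under a machine-by-machine layout."""
    # Assumption: machine 0 holds ranks 0..local_size-1, machine 1 the next
    # local_size ranks, and so on.
    return cross_rank * local_size + local_rank


# Hypothetical job: 2 machines (cross_size=2) with 4 workers each (local_size=4).
cross_size, local_size = 2, 4
size = cross_size * local_size
ranks = [
    global_rank(cross_rank, local_rank, local_size)
    for cross_rank in range(cross_size)
    for local_rank in range(local_size)
]
print(size, ranks)
```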
Note
DistributedContext has .allgather(), .gather(), and .broadcast() methods, which are easy to use and which can be useful for coordinating work across workers, but it is not a replacement for the allgather/gather/broadcast operations in your particular distributed training framework.
- classmethod from_horovod(hvd: Any, chief_ip: Optional[str] = None) determined.core._distributed.DistributedContext
Create a DistributedContext using the provided hvd module to determine rank information.
Example:
    import horovod.torch as hvd
    hvd.init()
    distributed = DistributedContext.from_horovod(hvd)
The IP address for the chief worker is required whenever hvd.cross_size() > 1. The value may be provided using the chief_ip argument or the DET_CHIEF_IP environment variable.
- classmethod from_deepspeed(chief_ip: Optional[str] = None) determined.core._distributed.DistributedContext
Create a DistributedContext using the standard deepspeed environment variables to determine rank information.
The IP address for the chief worker is required whenever CROSS_SIZE > 1. The value may be provided using the chief_ip argument or the DET_CHIEF_IP environment variable.
- classmethod from_torch_distributed(chief_ip: Optional[str] = None) determined.core._distributed.DistributedContext
Create a DistributedContext using the standard torch distributed environment variables to determine rank information.
The IP address for the chief worker is required whenever CROSS_SIZE > 1. The value may be provided via the chief_ip argument or the DET_CHIEF_IP environment variable.
- get_rank() int
Return the rank of the process in the trial. The rank of a process is a unique ID within the trial. That is, no two processes in the same trial are assigned the same rank.
- get_local_rank() int
Return the rank of the process on the agent. The local rank of a process is a unique ID within a given agent and trial; that is, no two processes in the same trial that are executing on the same agent are assigned the same rank.
- get_size() int
Return the number of slots this trial is running on.
- get_num_agents() int
Return the number of agents this trial is running on.
- gather(stuff: Any) Optional[List]
Gather stuff to the chief. The chief returns a list of all stuff, and workers return None.
gather() is not a replacement for the gather functionality of your distributed training framework.
- gather_local(stuff: Any) Optional[List]
Gather stuff to the local chief. The local chief returns a list of all stuff, and local workers return None.
gather_local() is not a replacement for the gather functionality of your distributed training framework.
- allgather(stuff: Any) List
Gather stuff to the chief and broadcast all of it back to the workers.
allgather() is not a replacement for the allgather functionality of your distributed training framework.
- allgather_local(stuff: Any) List
Gather stuff to the local chief and broadcast all of it back to the local workers.
allgather_local() is not a replacement for the allgather functionality of your distributed training framework.
- broadcast(stuff: Any) Any
Every worker gets the stuff sent by the chief.
broadcast() is not a replacement for the broadcast functionality of your distributed training framework.
- broadcast_local(stuff: Optional[Any] = None) Any
Every worker gets the stuff sent by the local chief.
broadcast_local() is not a replacement for the broadcast functionality of your distributed training framework.
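The return values of gather(), allgather(), and broadcast() differ by role, which is easy to get wrong. The single-process simulation below only models what each rank would receive according to the descriptions above. The simulate_* functions are hypothetical, perform no communication, and treat index 0 as the chief.

```python
from typing import Any, List, Optional


def simulate_gather(values: List[Any]) -> List[Optional[List[Any]]]:
    """Per-rank results of gather(): the chief gets everything, workers get None."""
    return [list(values) if rank == 0 else None for rank in range(len(values))]


def simulate_allgather(values: List[Any]) -> List[List[Any]]:
    """Per-rank results of allgather(): every rank gets the full list."""
    return [list(values) for _ in values]


def simulate_broadcast(values: List[Any]) -> List[Any]:
    """Per-rank results of broadcast(): every rank gets the chief's value."""
    return [values[0] for _ in values]


# values[i] is what rank i passes in as `stuff`.
stuff_by_rank = ["a", "b", "c"]
gathered = simulate_gather(stuff_by_rank)
allgathered = simulate_allgather(stuff_by_rank)
broadcasted = simulate_broadcast(stuff_by_rank)
print(gathered, allgathered, broadcasted)
```

Because workers receive None from gather(), code that uses the result should check for None or branch on the rank first.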
determined.pytorch.PyTorchExperimentalContext
- class determined.pytorch.PyTorchExperimentalContext(parent: Any)¶
- disable_auto_to_device() None ¶
Prevent the PyTorchTrialController from automatically moving batched data to device. Call this if you want to override the default behavior of moving all items of a list, tuple, and/or dict to the GPU. Then, you can control how data is moved to the GPU directly in the train_batch and evaluate_batch methods of your PyTorchTrial definition. You should call context.to_device on primitive data types that you do want to move to GPU as in the example below.
    # PyTorchTrial methods.
    def __init__(self, context):  # PyTorchTrial init
        self.context = context
        self.context.experimental.disable_auto_to_device()
        ...

    def train_batch(self, batch, epoch_idx, batch_idx):
        for k, item in batch.items():
            if k == "img":
                batch["img"] = self.context.to_device(batch["img"])
        ...
- disable_dataset_reproducibility_checks() None ¶
disable_dataset_reproducibility_checks() allows you to return an arbitrary DataLoader from build_training_data_loader() or build_validation_data_loader().
Normally you would be required to return a det.pytorch.DataLoader instead, which would guarantee that an appropriate Sampler is used that ensures:
- When shuffle=True, the shuffle is reproducible.
- The dataset will start at the right location, even after pausing/continuing.
- Proper sharding is used during distributed training.
However, there may be cases where either reproducibility of the dataset is not needed or where the nature of the dataset may cause the det.pytorch.DataLoader to be unsuitable.
In those cases, you may call disable_dataset_reproducibility_checks() and you will be free to return any torch.utils.data.DataLoader you like. Dataset reproducibility will still be possible, but it will be your responsibility. If desired, you may find the Sampler classes in determined.pytorch.samplers to be helpful.
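If you take on reproducibility yourself, the three guarantees listed above translate into three small steps over dataset indices. The sketch below is a plain-Python illustration of the idea and is not how the determined.pytorch.samplers classes are implemented. The function shard_indices and all of its parameters are hypothetical.

```python
import random
from typing import List


def shard_indices(
    dataset_len: int, seed: int, epoch: int, rank: int, size: int, skip: int = 0
) -> List[int]:
    """Indices one worker should read, in order, for one epoch."""
    indices = list(range(dataset_len))
    # Reproducible shuffle: the same seed and epoch always give the same order.
    random.Random(seed + epoch).shuffle(indices)
    # Sharding: each worker takes every size-th index, offset by its rank.
    shard = indices[rank::size]
    # Resuming: drop the records already consumed before the pause.
    return shard[skip:]


rank0 = shard_indices(10, seed=7, epoch=0, rank=0, size=2)
rank1 = shard_indices(10, seed=7, epoch=0, rank=1, size=2)
resumed = shard_indices(10, seed=7, epoch=0, rank=0, size=2, skip=2)
print(rank0, rank1, resumed)
```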
- use_amp() None ¶
Handles all operations for the most simple cases automatically with a default gradient scaler. Specifically, wraps forward pass in an autocast context, scales loss before backward pass, unscales before clipping gradients, uses scaler when stepping optimizer(s), and updates scaler afterwards. Do not call wrap_scaler directly when using this method.
PyTorch 1.6 or greater is required for this feature.
determined.pytorch.DataLoader
- class determined.pytorch.DataLoader(dataset: torch.utils.data.dataset.Dataset, batch_size: Optional[int] = 1, shuffle: bool = False, sampler: Optional[torch.utils.data.sampler.Sampler] = None, batch_sampler: Optional[torch.utils.data.sampler.BatchSampler] = None, num_workers: int = 0, collate_fn: Optional[Callable[[List[determined.pytorch._data.T]], Any]] = None, pin_memory: bool = False, drop_last: bool = False, timeout: float = 0, worker_init_fn: Optional[Callable[[int], None]] = None, multiprocessing_context: Optional[Any] = None, generator: Optional[Any] = None, *, prefetch_factor: int = 2, persistent_workers: bool = False)¶
DataLoader is meant to contain a user’s Dataset, configuration for sampling data in batches, and performance configuration like multiprocessing.
The __init__ function determines the defaults in the same way as a torch.utils.data.DataLoader would, so the behavior should be familiar. However, the torch.utils.data.DataLoader that is used for training and validation is not created until get_data_loader(…) is called. This is done so that Determined can ensure that sampling restarts from the right location and distributed sampling is handled correctly.
Note that the arguments are from PyTorch.
- Parameters
dataset (Dataset) – dataset from which to load the data.
batch_size (int, optional) – how many samples per batch to load (default: 1).
shuffle (bool, optional) – set to True to have the data reshuffled at every epoch (default: False).
sampler (Sampler, optional) – defines the strategy to draw samples from the dataset. If specified, shuffle must be False.
batch_sampler (Sampler, optional) – like sampler, but returns a batch of indices at a time. Mutually exclusive with batch_size, shuffle, sampler, and drop_last.
num_workers (int, optional) – how many subprocesses to use for data loading. 0 means that the data will be loaded in the main process. (default: 0)
collate_fn (callable, optional) – merges a list of samples to form a mini-batch of Tensor(s). Used when using batched loading from a map-style dataset.
pin_memory (bool, optional) – If True, the data loader will copy Tensors into CUDA pinned memory before returning them. If your data elements are a custom type, or your collate_fn returns a batch that is a custom type, see the memory pinning section of the torch.utils.data documentation.
drop_last (bool, optional) – set to True to drop the last incomplete batch, if the dataset size is not divisible by the batch size. If False and the size of dataset is not divisible by the batch size, then the last batch will be smaller. (default: False)
timeout (numeric, optional) – if positive, the timeout value for collecting a batch from workers. Should always be non-negative. (default: 0)
worker_init_fn (callable, optional) – If not None, this will be called on each worker subprocess with the worker id (an int in [0, num_workers - 1]) as input, after seeding and before data loading. (default: None)
generator (torch.Generator, optional) – If not None, this RNG will be used by RandomSampler to generate random indexes and multiprocessing to generate base_seed for workers. (default: None)
prefetch_factor (int, optional, keyword-only arg) – Number of batches loaded in advance by each worker. 2 means there will be a total of 2 * num_workers batches prefetched across all workers. (default: 2)
persistent_workers (bool, optional) – If True, the data loader will not shut down the worker processes after a dataset has been consumed once. This keeps the workers' Dataset instances alive. (default: False)
determined.pytorch.LRScheduler
¶
- class determined.pytorch.LRScheduler(scheduler: torch.optim.lr_scheduler._LRScheduler, step_mode: determined.pytorch._lr_scheduler.LRScheduler.StepMode, frequency: int = 1)¶
Wrapper for a PyTorch LRScheduler.
This wrapper fulfills two main functions:
Save and restore the learning rate when a trial is paused, preempted, etc.
Step the learning rate scheduler at the configured frequency (e.g., every batch or every epoch).
- class StepMode(value)¶
Specifies when and how scheduler.step() should be executed.
- STEP_EVERY_EPOCH¶
- STEP_EVERY_BATCH¶
- MANUAL_STEP¶
- STEP_EVERY_OPTIMIZER_STEP¶
- __init__(scheduler: torch.optim.lr_scheduler._LRScheduler, step_mode: determined.pytorch._lr_scheduler.LRScheduler.StepMode, frequency: int = 1)¶
LRScheduler constructor.
- Parameters
scheduler (torch.optim.lr_scheduler._LRScheduler) – Learning rate scheduler to be used by Determined.
step_mode (determined.pytorch.LRSchedulerStepMode) – The strategy Determined will use to call (or not call) scheduler.step().
STEP_EVERY_EPOCH: Determined will call scheduler.step() after every frequency training epoch(s). No arguments will be passed to step().
STEP_EVERY_BATCH: Determined will call scheduler.step() after every frequency training batch(es). No arguments will be passed to step(). This option does not take into account gradient aggregation; STEP_EVERY_OPTIMIZER_STEP does, and is recommended instead.
STEP_EVERY_OPTIMIZER_STEP: Determined will call scheduler.step() in sync with optimizer steps. With optimizations.aggregation_frequency unset, this is equivalent to STEP_EVERY_BATCH; when it is set, it ensures the LR scheduler is stepped every effective batch. If the option frequency is set to some value N, Determined will step the LR scheduler every N optimizer steps.
MANUAL_STEP: Determined will not call scheduler.step() at all. It is up to the user to decide when to call scheduler.step(), and whether to pass any arguments.
frequency – Sets the frequency at which the batch and epoch step modes get triggered.
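The difference between STEP_EVERY_BATCH and STEP_EVERY_OPTIMIZER_STEP under gradient aggregation can be sketched with a small counting model. This is an illustrative simplification in plain Python, not Determined's implementation; the function name count_scheduler_steps and its parameters are hypothetical, and it assumes the optimizer steps once every aggregation_frequency batches.

```python
def count_scheduler_steps(num_batches, aggregation_frequency, frequency=1):
    """Count scheduler.step() calls under two step modes (simplified model)."""
    every_batch = 0
    every_optimizer_step = 0
    optimizer_steps = 0
    for batch_idx in range(num_batches):
        # STEP_EVERY_BATCH: step after every `frequency` training batches.
        if (batch_idx + 1) % frequency == 0:
            every_batch += 1
        # Assumption: the optimizer steps once per `aggregation_frequency` batches.
        if (batch_idx + 1) % aggregation_frequency == 0:
            optimizer_steps += 1
            # STEP_EVERY_OPTIMIZER_STEP: step after every `frequency` optimizer steps.
            if optimizer_steps % frequency == 0:
                every_optimizer_step += 1
    return every_batch, every_optimizer_step


print(count_scheduler_steps(num_batches=8, aggregation_frequency=1))  # (8, 8)
print(count_scheduler_steps(num_batches=8, aggregation_frequency=4))  # (8, 2)
```

With aggregation disabled the two modes agree; with an aggregation frequency of 4, the per-batch mode steps the scheduler four times as often as the optimizer actually steps.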
determined.pytorch.Reducer
¶
determined.pytorch.MetricReducer
¶
- class determined.pytorch.MetricReducer¶
Efficiently aggregating validation metrics during a multi-slot distributed trial is done in three steps:
Gather all the values to be reduced during the reduction window (either a training or a validation workload). In a multi-slot trial, this is done on each slot in parallel.
Calculate the per-slot reduction. This will return some intermediate value that each slot will contribute to the final metric calculation. It can be as simple as a list of all the raw values from step 1, but reducing the intermediate value locally will distribute the final metric calculation more efficiently and will reduce network communication costs.
Reduce the per-slot reduction values from Step 2 into a final metric.
The MetricReducer API makes it possible for users to define a maximally efficient custom metric by exposing these steps to users:
Step 1 is defined by the user; it is not part of the interface. This flexibility gives the user full control when gathering individual values for reduction.
Step 2 is the MetricReducer.per_slot_reduce() interface.
Step 3 is the MetricReducer.cross_slot_reduce() interface.
The MetricReducer.reset() interface allows for MetricReducer reuse across many train and validation workloads.
Example implementation and usage:
from determined import pytorch

class MyAvgMetricReducer(pytorch.MetricReducer):
    def __init__(self):
        self.reset()

    def reset(self):
        self.sum = 0
        self.counts = 0

    # User-defined mechanism for collecting values throughout
    # training or validation. This update() mechanism demonstrates
    # a computationally- and memory-efficient way to store the values.
    def update(self, value):
        self.sum += sum(value)
        self.counts += 1

    def per_slot_reduce(self):
        # Because the chosen update() mechanism is so
        # efficient, this is basically a noop.
        return self.sum, self.counts

    def cross_slot_reduce(self, per_slot_metrics):
        # per_slot_metrics is a list of (sum, counts) tuples
        # returned by self.per_slot_reduce() on each slot
        sums, counts = zip(*per_slot_metrics)
        return sum(sums) / sum(counts)

class MyPyTorchTrial(pytorch.PyTorchTrial):
    def __init__(self, context):
        # Register your custom reducer.
        self.my_avg = context.wrap_reducer(
            MyAvgMetricReducer(), name="my_avg"
        )
        ...

    def train_batch(self, batch, epoch_idx, batch_idx):
        ...
        # You decide how/when you call update().
        self.my_avg.update(my_val)

        # The "my_avg" metric will be included in the final
        # metrics after the workload has completed; no need
        # to return it here.
        return {"loss": loss}
See also:
determined.pytorch.PyTorchExperimentalContext.wrap_reducer().
- abstract reset() None ¶
Reset reducer state for another set of values.
This will be called before any train or validation workload begins.
- abstract per_slot_reduce() Any ¶
This will be called after all workers have finished (even when there is only one worker).
It should return some picklable value that is meaningful for cross_slot_reduce.
This will be called after any train or validation workload ends.
- abstract cross_slot_reduce(per_slot_metrics: List) Any ¶
This will be called after per_slot_reduce has finished (even when there is only one worker).
The per_slot_metrics will be a list containing the output of per_slot_reduce() from each worker.
- The return value should either be:
A dict mapping string metric names to metric values, if the call to context.wrap_reducer() omitted the name parameter, or
A non-dict metric value if the call to context.wrap_reducer() had name set to a string (an error will be raised if a dict-type metric is returned but name was set).
This will be called after per_slot_reduce.
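The three reduction steps can be traced end to end without a cluster. The following is a minimal sketch in plain Python: AvgReducer is a hypothetical stand-in that mirrors the reset(), per_slot_reduce(), and cross_slot_reduce() methods but does not subclass MetricReducer, and simulate() is an illustrative helper that stands in for the gathering Determined performs across slots.

```python
class AvgReducer:
    """Stand-in exposing the same three methods as the MetricReducer interface."""

    def __init__(self):
        self.reset()

    def reset(self):
        self.sum = 0.0
        self.count = 0

    def update(self, values):
        # Step 1: user-defined gathering of values on this slot.
        self.sum += sum(values)
        self.count += len(values)

    def per_slot_reduce(self):
        # Step 2: a small, picklable per-slot summary.
        return self.sum, self.count

    def cross_slot_reduce(self, per_slot_metrics):
        # Step 3: combine the per-slot summaries into the final metric.
        sums, counts = zip(*per_slot_metrics)
        return sum(sums) / sum(counts)


def simulate(slot_batches):
    """Run one workload on several simulated slots and return the final metric."""
    reducers = [AvgReducer() for _ in slot_batches]
    for reducer, batches in zip(reducers, slot_batches):
        for batch in batches:
            reducer.update(batch)
    # In a real trial the per-slot values travel over the network;
    # here they are simply collected into a list.
    per_slot = [r.per_slot_reduce() for r in reducers]
    return reducers[0].cross_slot_reduce(per_slot)


result = simulate([[[1.0, 2.0], [3.0]], [[4.0, 5.0, 6.0]]])
print(result)  # 3.5
```

Each slot sends only a (sum, count) pair rather than every raw value, which is the communication saving that step 2 is meant to provide.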
determined.pytorch.samplers
¶
- class determined.pytorch.samplers.DistributedBatchSampler(batch_sampler: torch.utils.data.sampler.BatchSampler, num_workers: int, rank: int)¶
DistributedBatchSampler will iterate through an underlying batch sampler and return batches which belong to this shard.
DistributedBatchSampler is different from the PyTorch built-in torch.utils.data.distributed.DistributedSampler, because that DistributedSampler expects to be called before the BatchSampler, and additionally the DistributedSampler is meant to be a stand-alone sampler.
DistributedBatchSampler has the potential gotcha that when wrapping a non-repeating BatchSampler, if the length of the BatchSampler is not divisible by the number of replicas, the length of the resulting DistributedBatchSampler will differ based on the rank. In that case, the divergent paths of multiple workers could cause problems during training. PyTorchTrial always uses RepeatBatchSampler during training, and it does not require that the workers stay in step during validation, so this potential gotcha is not a problem in Determined.
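The rank-dependent length can be seen in a small sketch. It assumes batches are assigned to workers round-robin by index, which is an illustrative assumption rather than a statement about the library's internals; shard_batches is a hypothetical helper.

```python
def shard_batches(batches, num_workers, rank):
    """Return every num_workers-th batch, starting at index `rank`."""
    return [b for i, b in enumerate(batches) if i % num_workers == rank]


# Five batches split across two workers: the count is not divisible.
batches = [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]
shards = [shard_batches(batches, num_workers=2, rank=rank) for rank in range(2)]
for rank, shard in enumerate(shards):
    print(rank, len(shard), shard)
```

Rank 0 receives three batches and rank 1 receives two, so a training loop that synchronizes on every batch would stall on the last one unless the underlying sampler repeats.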
- class determined.pytorch.samplers.DistributedSampler(sampler: torch.utils.data.sampler.Sampler, num_workers: int, rank: int)¶
DistributedSampler will iterate through an underlying sampler and return samples which belong to this shard.
DistributedSampler is different from the PyTorch built-in torch.utils.data.DistributedSampler because theirs is meant to be a standalone sampler. Theirs does shuffling and assumes a constant-size dataset as an input. Ours is meant to be used as a building block in a chain of samplers, so it accepts a sampler as input that may or may not be constant-size.
- class determined.pytorch.samplers.RepeatBatchSampler(batch_sampler: torch.utils.data.sampler.BatchSampler)¶
RepeatBatchSampler yields an infinite stream of batches of indices by repeatedly iterating through the batches of another BatchSampler. __len__ is just the length of the underlying BatchSampler.
- class determined.pytorch.samplers.RepeatSampler(sampler: torch.utils.data.sampler.Sampler)¶
RepeatSampler yields an infinite stream of indices by repeatedly iterating through the records of another Sampler. __len__ is just the length of the underlying Sampler.
- class determined.pytorch.samplers.ReproducibleShuffleBatchSampler(batch_sampler: torch.utils.data.sampler.BatchSampler, seed: int)¶
ReproducibleShuffleBatchSampler will apply a deterministic shuffle based on a seed.
Warning
Always shuffle before skipping and before repeating. Skip-before-shuffle would break the reproducibility of the shuffle, and repeat-before-shuffle would cause the shuffle to hang as it iterates through an infinite sampler.
Warning
Always prefer ReproducibleShuffleSampler over this class when possible. The reason is that shuffling at the record level results in a superior shuffle, where the contents of each batch are varied between epochs, rather than just the order of batches.
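The difference between shuffling records and shuffling whole batches can be sketched with the standard library. The helper names below are hypothetical and the code illustrates the idea only; it does not reproduce the samplers' actual shuffling algorithm.

```python
import random


def batched(indices, batch_size):
    """Split a list of indices into consecutive batches."""
    return [indices[i:i + batch_size] for i in range(0, len(indices), batch_size)]


def shuffle_records_then_batch(n, batch_size, seed):
    # Record-level shuffle: batch membership changes with the seed.
    rng = random.Random(seed)
    indices = list(range(n))
    rng.shuffle(indices)
    return batched(indices, batch_size)


def batch_then_shuffle_batches(n, batch_size, seed):
    # Batch-level shuffle: only the order of fixed batches changes.
    rng = random.Random(seed)
    batches = batched(list(range(n)), batch_size)
    rng.shuffle(batches)
    return batches


print(shuffle_records_then_batch(8, 2, seed=0))
print(batch_then_shuffle_batches(8, 2, seed=0))
```

With a batch-level shuffle, records 0 and 1 always land in the same batch regardless of the seed. A fixed seed makes either variant reproducible.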
- class determined.pytorch.samplers.ReproducibleShuffleSampler(sampler: torch.utils.data.sampler.Sampler, seed: int)¶
ReproducibleShuffleSampler will apply a deterministic shuffle based on a seed.
Warning
Always shuffle before skipping and before repeating. Skip-before-shuffle would break the reproducibility of the shuffle, and repeat-before-shuffle would cause the shuffle to hang as it iterates through an infinite sampler.
- class determined.pytorch.samplers.SkipBatchSampler(batch_sampler: torch.utils.data.sampler.BatchSampler, skip: int)¶
SkipBatchSampler skips some batches from an underlying BatchSampler, and yields the rest.
Always repeat before you skip when you are continuing training; if you skip first and then repeat, the skip will be applied on every epoch.
Because the SkipBatchSampler is only meant to be used on a training dataset (we never checkpoint during evaluation), and because the training dataset should always be repeated before applying the skip (so you only skip once rather than many times), the length reported is always the length of the underlying sampler, regardless of the size of the skip.
- class determined.pytorch.samplers.SkipSampler(sampler: torch.utils.data.sampler.BatchSampler, skip: int)¶
SkipSampler skips some records from an underlying Sampler, and yields the rest.
Always repeat before you skip when you are continuing training; if you skip first and then repeat, the skip will be applied on every epoch.
Warning
When trying to achieve reproducibility after pausing and restarting, you should never prefer this SkipSampler over the SkipBatchSampler, unless you are sure that your dataset will always yield identically sized batches. This is due to how Determined counts batches trained but does not count records trained. Reproducibility when skipping records is only possible if the records to skip can be reliably calculated based on batch size and batches trained.
Because the SkipSampler is only meant to be used on a training dataset (we never checkpoint during evaluation), and because the training dataset should always be repeated before applying the skip (so you only skip once rather than many times), the length reported is always the length of the underlying sampler, regardless of the size of the skip.
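Why the dataset should be repeated before the skip is applied can be shown with plain generators. This is a sketch using itertools, not the sampler classes themselves; repeat and skip are hypothetical helpers that mimic the repeating and skipping behavior described above.

```python
import itertools


def repeat(make_epoch):
    """Yield items forever by re-iterating a finite source."""
    while True:
        yield from make_epoch()


def skip(iterable, n):
    """Drop the first n items, then yield the rest."""
    return itertools.islice(iterable, n, None)


epoch = [0, 1, 2, 3]

# Repeat first, then skip: the skip is applied once, at the resume point.
once = list(itertools.islice(skip(repeat(lambda: iter(epoch)), 2), 8))
print(once)  # [2, 3, 0, 1, 2, 3, 0, 1]

# Skip first, then repeat: the skip is re-applied on every pass.
every_epoch = list(itertools.islice(repeat(lambda: skip(epoch, 2)), 8))
print(every_epoch)  # [2, 3, 2, 3, 2, 3, 2, 3]
```

In the second ordering, records 0 and 1 are never seen again after resuming, which is the failure the ordering rule guards against.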
determined.pytorch.PyTorchCallback
¶
- class determined.pytorch.PyTorchCallback¶
Abstract base class used to define a callback that should execute during the lifetime of a PyTorchTrial.
Warning
If you are defining a stateful callback (e.g., it mutates a self attribute over its lifetime), you must also override state_dict() and load_state_dict() to ensure this state can be serialized and deserialized over checkpoints.
Warning
If distributed training is enabled, every GPU will execute a copy of this callback (except for on_validation_end(), on_validation_step_end() and on_checkpoint_write_end()). To configure a callback implementation to execute on a subset of GPUs, please condition your implementation on trial.context.distributed.get_rank().
- load_state_dict(state_dict: Dict[str, Any]) None ¶
Load the state of this callback using the deserialized state_dict.
- on_checkpoint_end(checkpoint_dir: str) None ¶
Deprecated. Please use on_checkpoint_write_end() instead.
Warning
This callback only executes on the chief GPU when doing distributed training.
- on_checkpoint_load_start(checkpoint: Dict[str, Any]) None ¶
Run before state_dict is restored.
- on_checkpoint_save_start(checkpoint: Dict[str, Any]) None ¶
Run before checkpoint is persisted.
- on_checkpoint_upload_end(uuid: str) None ¶
Run after every checkpoint finishes uploading.
- on_checkpoint_write_end(checkpoint_dir: str) None ¶
Run after every checkpoint finishes writing to checkpoint_dir.
Warning
This callback only executes on the chief GPU when doing distributed training.
- on_training_epoch_end(epoch_idx: int) None ¶
Run at the end of a training epoch.
- on_training_epoch_start(epoch_idx: int) None ¶
Run at the start of a new training epoch.
- on_training_start() None ¶
Run after checkpoint loads and before training begins.
- on_training_workload_end(avg_metrics: Dict[str, Any], batch_metrics: Dict[str, Any]) None ¶
Run on end of a training workload. Workloads can contain varying numbers of batches. In the current implementation of PyTorchTrial, the maximum number of batches in a workload is equal to the
scheduling_unit
field defined in the experiment config.
- on_trial_shutdown() None ¶
Runs just before shutting down training to get off of the cluster. This does not imply that the trial is complete; it may just be paused or preempted by a higher-priority task.
Warning
This callback runs each time a Trial shuts down gracefully to come off the cluster. This callback does not mean that the Trial is done training. Additionally, if the trial is killed the container will be destroyed without this callback running.
- on_trial_startup(first_batch_idx: int, checkpoint_uuid: Optional[str]) None ¶
Runs before training, validation, or building dataloaders.
- Parameters
first_batch_idx (int) – The first batch index to be trained. If the trial has already completed some amount of training in a previous allocation on the cluster, this will be nonzero.
checkpoint_uuid (str or None) – The checkpoint from which weight, optimizer state, etc. will be loaded. When first_batch_idx > 0 this will contain the uuid of the most recent checkpoint saved by this trial. Otherwise, it will contain the uuid of the checkpoint from which this trial was configured to warm start (via source_trial_id or source_checkpoint_uuid in the searcher config), or None if no warm start was configured.
- on_validation_end(metrics: Dict[str, Any]) None ¶
Run after every validation ends.
- on_validation_epoch_end(outputs: List[Any]) None ¶
Run after a validation epoch has finished.
- on_validation_epoch_start() None ¶
Run at the start of a new validation epoch.
- on_validation_start() None ¶
Run before every validation begins.
- on_validation_step_end(metrics: Dict[str, Any]) None ¶
Run after every validation step ends.
- on_validation_step_start() None ¶
Run before every validation step begins.
- state_dict() Dict[str, Any] ¶
Serialize the state of this callback to a dictionary. Return value must be pickle-able.
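A stateful callback's save and restore cycle can be sketched as follows. BestMetricTracker is a hypothetical plain class used so the example runs without Determined installed; a real callback would subclass PyTorchCallback. The metric name validation_loss is also an assumption made for illustration.

```python
import pickle


class BestMetricTracker:
    """Stand-in for a stateful callback that remembers the best validation loss."""

    def __init__(self):
        self.best = None

    def on_validation_end(self, metrics):
        # Mutating self.best makes this callback stateful.
        value = metrics["validation_loss"]
        if self.best is None or value < self.best:
            self.best = value

    def state_dict(self):
        # Must be pickle-able so it can be stored with the checkpoint.
        return {"best": self.best}

    def load_state_dict(self, state_dict):
        self.best = state_dict["best"]


tracker = BestMetricTracker()
for loss in (0.9, 0.4, 0.6):
    tracker.on_validation_end({"validation_loss": loss})

# Simulate a checkpoint round trip: serialize, then restore into a new instance.
saved = pickle.dumps(tracker.state_dict())
restored = BestMetricTracker()
restored.load_state_dict(pickle.loads(saved))
print(restored.best)  # 0.4
```

Without the two overrides, a trial resumed from a checkpoint would start with best set to None and lose the history gathered before the pause.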
determined.tensorboard.metric_writers.pytorch.TorchWriter
¶
- class determined.tensorboard.metric_writers.pytorch.TorchWriter¶
TorchWriter uses PyTorch file writers and summary operations to write out tfevent files containing scalar batch metrics. It creates an instance of torch.utils.tensorboard.SummaryWriter, which can be accessed via the writer field, and configures the SummaryWriter to write to the correct directory inside the trial container.
Usage example:
import numpy as np
from determined.pytorch import PyTorchTrial
from determined.tensorboard.metric_writers.pytorch import TorchWriter

class MyModel(PyTorchTrial):
    def __init__(self, context):
        ...
        self.logger = TorchWriter()

    def train_batch(self, batch, epoch_idx, batch_idx):
        # Log a scalar through the wrapped SummaryWriter.
        self.logger.writer.add_scalar('my_metric', np.random.random(), batch_idx)
determined.pytorch.load_trial_from_checkpoint_path
¶
- determined.pytorch.load_trial_from_checkpoint_path(path: str, **kwargs: Any) determined.pytorch._pytorch_trial.PyTorchTrial ¶
Loads a checkpoint written by a PyTorchTrial.
You should have already downloaded the checkpoint files, likely with Checkpoint.download().
The return value will be a restored instance of the PyTorchTrial subclass you used for training.
- Parameters
path (string) – Top level directory to load the checkpoint from.
kwargs (optional) – Any keyword arguments will be applied to
torch.load
. See documentation for torch.load.